# -*- coding: utf-8 -*-
# Copyright © 2015 Yahoo! Inc.
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.

import contextlib
import functools

import consul
from oslo_utils import encodeutils
import requests

import tooz
from tooz import _retry
from tooz import coordination
from tooz import locking
from tooz import utils

def _failure_translator():

    """Translates common consul exceptions into tooz exceptions."""
    except (consul.Timeout, requests.exceptions.RequestException) as e:
    except (consul.ConsulException, ValueError) as e:
        # ValueError = Typically json decoding failed for some reason.

def _translate_failures(func):

    def wrapper(*args, **kwargs):
        with _failure_translator():
            return func(*args, **kwargs)

    return wrapper

class ConsulLock(locking.Lock):
    def __init__(self, name, node, address, session_id, client, token=None):
        super(ConsulLock, self).__init__(name)
        self._name = name
        self._node = node
        self._address = address
        self._acl_token = token
        self._session_id = session_id
        self._client = client
        self.acquired = False

    def acquire(self, blocking=True, shared=False):
        if shared:
            raise tooz.NotImplemented

        def _acquire():
            # Check if we are the owner and if we are simulate
            # blocking (because consul will not block a second
            # acquisition attempt by the same owner).
            _index, value = self._client.kv.get(key=self._name,
            if value and value.get('Session') == self._session_id:
                if blocking is False:
                    return False
                    raise _retry.TryAgain
                # The value can be anything.
                gotten = self._client.kv.put(key=self._name,
                                             value="I got it!",
                if gotten:
                    self.acquired = True
                    return True
                if blocking is False:
                    return False
                    raise _retry.TryAgain

        return _acquire()

    def release(self):
        if not self.acquired:
            return False
        # Get the lock to verify the session ID's are same
        _index, contents = self._client.kv.get(key=self._name,
        if not contents:
            return False
        owner = contents.get('Session')
        if owner == self._session_id:
            removed = self._client.kv.put(key=self._name,
            if removed:
                self.acquired = False
                return True
        return False

[docs]class ConsulDriver(coordination.CoordinationDriverCachedRunWatchers, coordination.CoordinationDriverWithExecutor): """This driver uses `python-consul`_ client against `consul`_ servers. The ConsulDriver implements the coordination driver API(s) so that Consul can be used as an option for Distributed Locking and Group Membership. The data is stored in Consul's key-value store. The Consul driver connection URI should look like:: consul://HOST[:PORT][?OPTION1=VALUE1[&OPTION2=VALUE2[&...]]] If not specified, PORT defaults to 8500. Available options are: ================== ======= Name Default ================== ======= ttl 15 namespace tooz acl_token None ================== ======= For details on the available options, refer to The following Key/Value paths are utilized in Consul to implement the coordination APIs: +-------------------------------------------+--------------+--------------+ | Key | Value | Description | +===========================================+==============+==============+ | <namespace>/groups/<group_id> | None | Group of | | | | members. | +-------------------------------------------+--------------+--------------+ | <namespace>/groups/<group_id>/<member_id> | Member | Member in a | | | capabilities | group. | | | encoded as | | | | msgpack | | +-------------------------------------------+--------------+--------------+ | <namespace>/group_locks/<group_id> | Consul | Lock for | | | session ID | group | | | | membership | +-------------------------------------------+--------------+--------------+ | <namespace>/locks/<name> | Consul | Each key is | | | session ID | a distributed| | | | lock. | +-------------------------------------------+--------------+--------------+ NOTE: A group in tooz is NOT the same as a Consul service, so tooz groups are implemented using Consul Key/Values and not with native services. Groups in tooz do not have host:port details or health checks that report to consul to verify that the service is still alive and listening on that host:port. If you need to use native Consul services, configure the Consul agent directly (not via tooz). .. _python-consul: .. _consul: """ #: Default namespace when none is provided TOOZ_NAMESPACE = "tooz" #: Default TTL DEFAULT_TTL = 15 #: Default consul port if not provided. DEFAULT_PORT = 8500 #: Consul ACL Token if not provided ACL_TOKEN = None CHARACTERISTICS = ( # client liveness is based on more than just timeouts coordination.Characteristics.NON_TIMEOUT_BASED, # multiple service instances (group members) could register # with different ports per thread/proc # but the agent is always on locahost: so not DISTRIBUTED_ACROSS_HOSTS coordination.Characteristics.DISTRIBUTED_ACROSS_THREADS, coordination.Characteristics.DISTRIBUTED_ACROSS_PROCESSES, # # The consul consistency mode determines the history characteristics. # Writes *always* go through a single leader process into raft log. # Reads *may* use all servers depending on the request's mode: # - 'consistent' = LINEARIZABLE # - 'default' = SEQUENTIAL # - 'stale' = CAUSAL coordination.Characteristics.SEQUENTIAL, # 'default' consistency mode # # raft log of servers is snapshotted + compacted coordination.Characteristics.SERIALIZABLE, ) """ Tuple of :py:class:`~tooz.coordination.Characteristics` introspectable enum member(s) that can be used to interogate how this driver works. """
[docs] def __init__(self, member_id, parsed_url, options): super(ConsulDriver, self).__init__(member_id, parsed_url, options) options = utils.collapse(options) self._host = parsed_url.hostname self._port = parsed_url.port or self.DEFAULT_PORT self._session_id = None self._session_name = encodeutils.safe_decode(member_id) self._ttl = int(options.get('ttl', self.DEFAULT_TTL)) namespace = options.get('namespace', self.TOOZ_NAMESPACE) self._namespace = encodeutils.safe_decode(namespace) self._acl_token = options.get('acl_token', self.ACL_TOKEN) # the empty group name adds a trailing / needed for lookups self._groups_prefix = self._paths_join(self._namespace, "groups", "") self._client = None
def _start(self): """Create a client, register a node and create a session.""" # Create a consul client if self._client is None: self._client = consul.Consul(host=self._host, port=self._port, token=self._acl_token) local_agent = self._client.agent.self() self._node = local_agent['Member']['Name'] self._address = local_agent['Member']['Addr'] # implicitly uses the agent's datacenter (set in consul agent config) # Register a Node self._client.catalog.register(node=self._node, address=self._address, token=self._acl_token) # Create a session self._session_id = self._client.session.create( name=self._session_name, node=self._node, ttl=self._ttl, token=self._acl_token) def _stop(self): if self._client is not None: if self._session_id is not None: self._client.session.destroy(self._session_id, token=self._acl_token) self._session_id = None self._client = None
[docs] def heartbeat(self): # THIS IS A REQUIREMENT FOR CONSUL TO WORK PROPERLY. # Consul maintains a "session" token that is used to as the basis # for all operations in the service. The session must be refreshed # periodically or else it assumes that the agent maintaining the # session has died or is unreachable. When a session expires all locks # are released and any services that were registered with that session # are marked as no longer active. self._client.session.renew(self._session_id, token=self._acl_token) # renew the session every half-TTL or 1 second, whatever is larger sleep_sec = max(self._ttl / 2, 1) return sleep_sec
def _get_lock(self, real_name): return ConsulLock(real_name, self._node, self._address, session_id=self._session_id, client=self._client, token=self._acl_token)
[docs] def get_lock(self, name): real_name = self._path_lock(name) return self._get_lock(real_name)
def _get_group_lock(self, group_id): real_name = self._path_group_lock(group_id) return self._get_lock(real_name) @staticmethod def _paths_join(*args): pieces = [] for arg in args: pieces.append(encodeutils.safe_decode(arg)) return "/".join(pieces) def _path_lock(self, name): return self._paths_join(self._namespace, "locks", name) def _path_group_lock(self, name): return self._paths_join(self._namespace, "group_locks", name) def _path_group(self, group_id): # add an extra '/' at the end so that searches with this path # will go down and find their children return self._paths_join(self._namespace, "groups", group_id) + "/" def _path_member(self, group_id, member_id): return self._paths_join( self._namespace, "groups", group_id, member_id ) def _group_path_to_id(self, base_path, group_path): """Translates a path into a group name. The group name is the last part of the path. So, we simply split on the path separator '/' and return the last element. Example: group_id = self._path_to_group_id("tooz/groups/helloworld") print(group_id) # "helloworld" """ if group_path.startswith(base_path): group_id = group_path[len(base_path):] else: group_id = group_path # if a group has members (sub-keys) it will contain a trailing / # we need to strip this to get just the name # if a group has no members there is no trailing / (for some reason) group_id = group_id.strip("/") return utils.to_binary(group_id) def _get_group_members(self, group_path): index, data = self._client.kv.get(group_path, recurse=True) group = None members = [] for kv in (data or []): if kv["Key"] == group_path: group = kv else: members.append(kv) return (group, members)
[docs] def get_groups(self): @_translate_failures def _get_groups(): groups = [] index, data = self._client.kv.get(self._groups_prefix, keys=True, separator="/") for key in (data or []): if key != self._groups_prefix: group_id = self._group_path_to_id(self._groups_prefix, key) groups.append(group_id) return groups return ConsulFutureResult(self._executor.submit(_get_groups))
[docs] def create_group(self, group_id): @_translate_failures def _create_group(): group_path = self._path_group(group_id) # create with Check-And-Set index 0 will only succeed if the key # doesn't exit result = self._client.kv.put(group_path, "", cas=0) if not result: raise coordination.GroupAlreadyExist(group_id) return result return ConsulFutureResult(self._executor.submit(_create_group))
def _destroy_group(self, group_id): """Should only be used in tests...""" with self._get_group_lock(group_id) as lock: group_path = self._path_group(group_id) self._client.kv.delete(group_path, recurse=True) self._client.kv.delete(lock._name)
[docs] def delete_group(self, group_id): @_translate_failures def _delete_group(): # create a lock for the group so that other operations on this # group do not conflict while the group is being deleted with self._get_group_lock(group_id) as lock: group_path = self._path_group(group_id) group, members = self._get_group_members(group_path) if not group: raise coordination.GroupNotCreated(group_id) if members: raise coordination.GroupNotEmpty(group_id) # delete the group recursively result = self._client.kv.delete(group_path, recurse=True) # delete the lock for the group self._client.kv.delete(lock._name) return result return ConsulFutureResult(self._executor.submit(_delete_group))
[docs] def join_group(self, group_id, capabilities=b""): @_translate_failures def _join_group(): # lock the group so that it doesn't get deleted while we join with self._get_group_lock(group_id): group_path = self._path_group(group_id) member_path = self._path_member(group_id, self._member_id) group, members = self._get_group_members(group_path) if not group: raise coordination.GroupNotCreated(group_id) for m in members: if m["Key"] == member_path: raise coordination.MemberAlreadyExist(group_id, self._member_id) # create with Check-And-Set index 0 will only succeed if the # key doesn't exit self._client.kv.put(member_path, utils.dumps(capabilities), cas=0) self._joined_groups.add(group_id) return ConsulFutureResult(self._executor.submit(_join_group))
[docs] def leave_group(self, group_id): @_translate_failures def _leave_group(): # NOTE: We do NOT have to lock the group here because deletes in # Consul are atomic and succeed even if the key doesn't exist # This means that there is no race condition between checking # if the member exists and then deleting it. group_path = self._path_group(group_id) member_path = self._path_member(group_id, self._member_id) group, members = self._get_group_members(group_path) member = None for m in members: if m["Key"] == member_path: member = m break else: raise coordination.MemberNotJoined(group_id, self._member_id) # delete the member key with Check-And-Set semantics based on index # we read above self._client.kv.delete(member_path, cas=member["ModifyIndex"]) self._joined_groups.discard(group_id) return ConsulFutureResult(self._executor.submit(_leave_group))
[docs] def get_members(self, group_id): @_translate_failures def _get_members(): group_path = self._path_group(group_id) group, members = self._get_group_members(group_path) if not group: raise coordination.GroupNotCreated(group_id) result = set() for m in members: member_id = self._group_path_to_id(group_path, m["Key"]) result.add(member_id) return result return ConsulFutureResult(self._executor.submit(_get_members))
[docs] def get_member_capabilities(self, group_id, member_id): @_translate_failures def _get_member_capabilities(): member_path = self._path_member(group_id, member_id) index, data = self._client.kv.get(member_path) if not data: raise coordination.MemberNotJoined(group_id, member_id) return utils.loads(data["Value"]) return ConsulFutureResult( self._executor.submit(_get_member_capabilities))
[docs] def update_capabilities(self, group_id, capabilities): @_translate_failures def _update_capabilities(): member_path = self._path_member(group_id, self._member_id) index, data = self._client.kv.get(member_path) if not data: raise coordination.MemberNotJoined(group_id, self._member_id) # no need to Check-And-Set here, latest write wins self._client.kv.put(member_path, utils.dumps(capabilities)) return ConsulFutureResult(self._executor.submit(_update_capabilities))
[docs] @staticmethod def watch_elected_as_leader(group_id, callback): raise tooz.NotImplemented
[docs] @staticmethod def unwatch_elected_as_leader(group_id, callback): raise tooz.NotImplemented
ConsulFutureResult = functools.partial(coordination.CoordinatorResult, failure_translator=_failure_translator)