Source code for tooz.drivers.etcd3gw

# -*- coding: utf-8 -*-
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
#      http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.

import base64
import functools
import logging
import threading
import uuid

import etcd3gw
from etcd3gw import exceptions as etcd3_exc
from oslo_utils import encodeutils

import tooz
from tooz import _retry
from tooz import coordination
from tooz import locking
from tooz import utils

LOG = logging.getLogger(__name__)


def _encode(data):
    """Safely encode data for consumption of the gateway."""
    return base64.b64encode(data).decode("ascii")


def _translate_failures(func):
    """Translates common requests exceptions into tooz exceptions."""

    @functools.wraps(func)
    def wrapper(*args, **kwargs):
        try:
            return func(*args, **kwargs)
        except etcd3_exc.ConnectionFailedError as e:
            utils.raise_with_cause(coordination.ToozConnectionError,
                                   encodeutils.exception_to_unicode(e),
                                   cause=e)
        except etcd3_exc.ConnectionTimeoutError as e:
            utils.raise_with_cause(coordination.OperationTimedOut,
                                   encodeutils.exception_to_unicode(e),
                                   cause=e)
        except etcd3_exc.Etcd3Exception as e:
            utils.raise_with_cause(coordination.ToozError,
                                   encodeutils.exception_to_unicode(e),
                                   cause=e)

    return wrapper


class Etcd3Lock(locking.Lock):
    """An etcd3-specific lock.

    Thin wrapper over etcd3's lock object basically to provide the heartbeat()
    semantics for the coordination driver.
    """

    LOCK_PREFIX = b"/tooz/locks"

    def __init__(self, coord, name, timeout):
        super(Etcd3Lock, self).__init__(name)
        self._timeout = timeout
        self._coord = coord
        self._key = self.LOCK_PREFIX + name
        self._key_b64 = _encode(self._key)
        self._uuid = _encode(uuid.uuid4().bytes)
        self._exclusive_access = threading.Lock()

    @_translate_failures
    def acquire(self, blocking=True, shared=False, timeout=None):
        if shared:
            raise tooz.NotImplemented
        if timeout is None:
            timeout = self._timeout

        @_retry.retry(stop_max_delay=blocking)
        def _acquire():
            # TODO(jd): save the created revision so we can check it later to
            # make sure we still have the lock
            self._lease = self._coord.client.lease(timeout)
            txn = {
                'compare': [{
                    'key': self._key_b64,
                    'result': 'EQUAL',
                    'target': 'CREATE',
                    'create_revision': 0
                }],
                'success': [{
                    'request_put': {
                        'key': self._key_b64,
                        'value': self._uuid,
                        'lease': self._lease.id
                    }
                }],
                'failure': [{
                    'request_range': {
                        'key': self._key_b64
                    }
                }]
            }
            result = self._coord.client.transaction(txn)
            success = result.get('succeeded', False)

            if success is not True:
                if blocking is False:
                    return False
                raise _retry.TryAgain
            self._coord._acquired_locks.add(self)
            return True

        return _acquire()

    @_translate_failures
    def release(self):
        txn = {
            'compare': [{
                'key': self._key_b64,
                'result': 'EQUAL',
                'target': 'VALUE',
                'value': self._uuid
            }],
            'success': [{
                'request_delete_range': {
                    'key': self._key_b64
                }
            }]
        }

        with self._exclusive_access:
            result = self._coord.client.transaction(txn)
            success = result.get('succeeded', False)
            if success:
                self._coord._acquired_locks.remove(self)
                return True
        return False

    @_translate_failures
    def break_(self):
        if self._coord.client.delete(self._key):
            self._coord._acquired_locks.discard(self)
            return True
        return False

    @property
    def acquired(self):
        return self in self._coord._acquired_locks

    @_translate_failures
    def heartbeat(self):
        with self._exclusive_access:
            if self.acquired:
                self._lease.refresh()
                return True
        return False


[docs] class Etcd3Driver(coordination.CoordinationDriverCachedRunWatchers, coordination.CoordinationDriverWithExecutor): """An etcd based driver. This driver uses etcd provide the coordination driver semantics and required API(s). The Etcd driver connection URI should look like:: etcd3+PROTOCOL://[HOST[:PORT]][?OPTION1=VALUE1[&OPTION2=VALUE2[&...]]] The PROTOCOL can be http or https. If not specified, HOST defaults to localhost and PORT defaults to 2379. Available options are: ================== ======= Name Default ================== ======= api_version None ca_cert None cert_key None cert_cert None timeout 30 lock_timeout 30 membership_timeout 30 ================== ======= """ #: Default socket/lock/member/leader timeout used when none is provided. DEFAULT_TIMEOUT = 30 #: Default hostname used when none is provided. DEFAULT_HOST = "localhost" #: Default port used if none provided (4001 or 2379 are the common ones). DEFAULT_PORT = 2379 GROUP_PREFIX = b"tooz/groups/"
[docs] def __init__(self, member_id, parsed_url, options): super(Etcd3Driver, self).__init__(member_id, parsed_url, options) protocol = 'https' if parsed_url.scheme.endswith('https') else 'http' host = parsed_url.hostname or self.DEFAULT_HOST port = parsed_url.port or self.DEFAULT_PORT options = utils.collapse(options) ca_cert = options.get('ca_cert') cert_key = options.get('cert_key') cert_cert = options.get('cert_cert') timeout = int(options.get('timeout', self.DEFAULT_TIMEOUT)) api_version = options.get("api_version") if api_version: api_path = "/" + api_version + "/" else: api_path = None self.client = etcd3gw.client(host=host, port=port, protocol=protocol, ca_cert=ca_cert, cert_key=cert_key, cert_cert=cert_cert, api_path=api_path, timeout=timeout) self.lock_timeout = int(options.get('lock_timeout', timeout)) self.membership_timeout = int(options.get( 'membership_timeout', timeout)) self._acquired_locks = set() self._membership_lease = None
def _start(self): super(Etcd3Driver, self)._start() self._membership_lease = self.client.lease(self.membership_timeout)
[docs] def get_lock(self, name): return Etcd3Lock(self, name, self.lock_timeout)
[docs] def heartbeat(self): # TODO(kaifeng) use the same lease for locks? if self._membership_lease.refresh() == -1: expired_lease = self._membership_lease.id self._membership_lease = self.client.lease(self.membership_timeout) LOG.debug('Created new lease %s after previous lease %s expired.', self._membership_lease.id, expired_lease) # NOTE(jaypipes): Copying because set can mutate during iteration for lock in self._acquired_locks.copy(): lock.heartbeat() return min(self.lock_timeout, self.membership_timeout)
def _encode_group_id(self, group_id): return _encode(self._prefix_group(group_id)) def _prefix_group(self, group_id): return b"%s%s/" % (self.GROUP_PREFIX, utils.to_binary(group_id))
[docs] def create_group(self, group_id): @_translate_failures def _create_group(): encoded_group = self._encode_group_id(group_id) txn = { 'compare': [{ 'key': encoded_group, 'result': 'EQUAL', 'target': 'VERSION', 'version': 0 }], 'success': [{ 'request_put': { 'key': encoded_group, # We shouldn't need a value, but etcd3gw needs it for # now 'value': encoded_group } }], 'failure': [] } result = self.client.transaction(txn) if not result.get("succeeded"): raise coordination.GroupAlreadyExist(group_id) return coordination.CoordinatorResult( self._executor.submit(_create_group))
def _destroy_group(self, group_id): self.client.delete(group_id)
[docs] def delete_group(self, group_id): @_translate_failures def _delete_group(): prefix_group = self._prefix_group(group_id) members = self.client.get_prefix(prefix_group) if len(members) > 1: raise coordination.GroupNotEmpty(group_id) encoded_group = self._encode_group_id(group_id) txn = { 'compare': [{ 'key': encoded_group, 'result': 'NOT_EQUAL', 'target': 'VERSION', 'version': 0 }], 'success': [{ 'request_delete_range': { 'key': encoded_group, } }], 'failure': [] } result = self.client.transaction(txn) if not result.get("succeeded"): raise coordination.GroupNotCreated(group_id) return coordination.CoordinatorResult( self._executor.submit(_delete_group))
[docs] def join_group(self, group_id, capabilities=b""): @_retry.retry() @_translate_failures def _join_group(): prefix_group = self._prefix_group(group_id) prefix_member = prefix_group + self._member_id members = self.client.get_prefix(prefix_group) encoded_member = _encode(prefix_member) group_metadata = None for cap, metadata in members: if metadata['key'] == prefix_member: raise coordination.MemberAlreadyExist(group_id, self._member_id) if metadata['key'] == prefix_group: group_metadata = metadata if group_metadata is None: raise coordination.GroupNotCreated(group_id) encoded_group = self._encode_group_id(group_id) txn = { 'compare': [{ 'key': encoded_group, 'result': 'EQUAL', 'target': 'VERSION', 'version': int(group_metadata['version']) }], 'success': [{ 'request_put': { 'key': encoded_member, 'value': _encode(utils.dumps(capabilities)), 'lease': self._membership_lease.id } }], 'failure': [] } result = self.client.transaction(txn) if not result.get('succeeded'): raise _retry.TryAgain else: self._joined_groups.add(group_id) return coordination.CoordinatorResult( self._executor.submit(_join_group))
[docs] def leave_group(self, group_id): @_translate_failures def _leave_group(): prefix_group = self._prefix_group(group_id) prefix_member = prefix_group + self._member_id members = self.client.get_prefix(prefix_group) for capabilities, metadata in members: if metadata['key'] == prefix_member: break else: raise coordination.MemberNotJoined(group_id, self._member_id) self.client.delete(prefix_member) self._joined_groups.discard(group_id) return coordination.CoordinatorResult( self._executor.submit(_leave_group))
[docs] def get_members(self, group_id): @_translate_failures def _get_members(): prefix_group = self._prefix_group(group_id) members = set() group_found = False for cap, metadata in self.client.get_prefix(prefix_group): if metadata['key'] == prefix_group: group_found = True else: members.add(metadata['key'][len(prefix_group):]) if not group_found: raise coordination.GroupNotCreated(group_id) return members return coordination.CoordinatorResult( self._executor.submit(_get_members))
[docs] def get_member_capabilities(self, group_id, member_id): @_translate_failures def _get_member_capabilities(): prefix_member = self._prefix_group(group_id) + member_id result = self.client.get(prefix_member) if not result: raise coordination.MemberNotJoined(group_id, member_id) return utils.loads(result[0]) return coordination.CoordinatorResult( self._executor.submit(_get_member_capabilities))
[docs] def update_capabilities(self, group_id, capabilities): @_translate_failures def _update_capabilities(): prefix_member = self._prefix_group(group_id) + self._member_id result = self.client.get(prefix_member) if not result: raise coordination.MemberNotJoined(group_id, self._member_id) self.client.put(prefix_member, utils.dumps(capabilities), lease=self._membership_lease) return coordination.CoordinatorResult( self._executor.submit(_update_capabilities))
[docs] def get_groups(self): @_translate_failures def _get_groups(): groups = self.client.get_prefix(self.GROUP_PREFIX) return [ group[1]['key'][len(self.GROUP_PREFIX):-1] for group in groups] return coordination.CoordinatorResult( self._executor.submit(_get_groups))
[docs] @staticmethod def watch_elected_as_leader(group_id, callback): raise tooz.NotImplemented
[docs] @staticmethod def unwatch_elected_as_leader(group_id, callback): raise tooz.NotImplemented