External usage of internal utility functions and modules should be kept to a minimum as they may be altered, refactored or moved to other locations without notice (and without the typical deprecation cycle).
Make and return a future completed with a given result.
Check if eventlet is available and if not raise a runtime error.
exc (exception) – exception to raise instead of raising a runtime error
- taskflow.utils.iter_utils.fill(it, desired_len, filler=None)¶
Iterates over a provided iterator up to the desired length.
If the source iterator does not have enough values then the filler value is yielded until the desired length is reached.
Returns how many values in the iterator (depletes the iterator).
- taskflow.utils.iter_utils.generate_delays(delay, max_delay, multiplier=2)¶
Generator/iterator that provides back delays values.
The values it generates increments by a given multiple after each iteration (using the max delay as a upper bound). Negative values will never be generated… and it will iterate forever (ie it will never stop generating values).
- taskflow.utils.iter_utils.unique_seen(its, seen_selector=None)¶
Yields unique values from iterator(s) (and retains order).
- taskflow.utils.iter_utils.find_first_match(it, matcher, not_found_value=None)¶
Searches iterator for first value that matcher callback returns true.
- taskflow.utils.iter_utils.while_is_not(it, stop_value)¶
Yields given values from iterator until stop value is passed.
This uses the
isoperator to determine equivalency (and not the
Yields values from iterator until a limit is reached.
if limit is negative, we iterate forever.
- taskflow.utils.kazoo_utils.prettify_failures(failures, limit=-1)¶
Prettifies a checked commits failures (ignores sensitive data…).
- exception taskflow.utils.kazoo_utils.KazooTransactionException(message, failures)¶
Exception raised when a checked commit fails.
Commits a kazoo transcation and validates the result.
NOTE(harlowja): Until https://github.com/python-zk/kazoo/pull/224 is fixed or a similar pull request is merged we have to workaround the transaction failing silently.
Stops and closes a client, even if it wasn’t started.
- taskflow.utils.kazoo_utils.check_compatible(client, min_version=None, max_version=None)¶
Checks if a kazoo client is backed by a zookeeper server version.
This check will verify that the zookeeper server version that the client is connected to satisfies a given minimum version (inclusive) and maximum (inclusive) version range. If the server is not in the provided version range then a exception is raised indiciating this.
conf (dict) – configuration dictionary that will be used to configure the created client
The keys that will be extracted are:
read_only: boolean that specifies whether to allow connections to read only servers, defaults to
randomize_hosts: boolean that specifies whether to randomize host lists provided, defaults to
command_retry: a kazoo retry object (or dict of options which will be used for creating one) that will be used for retrying commands that are executed
connection_retry: a kazoo retry object (or dict of options which will be used for creating one) that will be used for retrying connection failures that occur
hosts: a string, list, set (or dict with host keys) that will specify the hosts the kazoo client should be connected to, if none is provided then
localhost:2181will be used by default
timeout: a float value that specifies the default timeout that the kazoo client will use
keyfile: SSL keyfile to use for authentication
keyfile_password: SSL keyfile password
certfile: SSL certfile to use for authentication
ca: SSL CA file to use for authentication
use_ssl: argument to control whether SSL is used or not
verify_certs: when using SSL, argument to bypass
- class taskflow.utils.kombu_utils.DelayedPretty(message)¶
Wraps a message and delays prettifying it until requested.
TODO(harlowja): remove this when https://github.com/celery/kombu/pull/454/ is merged and a release is made that contains it (since that pull request is equivalent and/or better than this).
- class taskflow.utils.misc.StrEnum(value)¶
An enumeration that is also a string and can be compared to strings.
- class taskflow.utils.misc.StringIO(initial_value='', newline='\n')¶
String buffer with some small additions.
- class taskflow.utils.misc.BytesIO(initial_bytes=b'')¶
Byte buffer with some small additions.
Gets the machines hostname; if not able to returns an invalid one.
- taskflow.utils.misc.match_type(obj, matchers)¶
Matches a given object using the given matchers list/iterable.
NOTE(harlowja): each element of the provided list/iterable must be tuple of (valid types, result).
Returns the result (the second element of the provided tuple) if a type match occurs, otherwise none if no matches are found.
- taskflow.utils.misc.countdown_iter(start_at, decr=1)¶
Generator that decrements after each generation until <= zero.
NOTE(harlowja): we can likely remove this when we can use an
itertools.countthat takes a step (on py2.6 which we still support that step parameter does not exist and therefore can’t be used).
- taskflow.utils.misc.extract_driver_and_conf(conf, conf_key)¶
Common function to get a driver name and its configuration.
Like reversed(enumerate(items)) but with less copying/cloning…
- taskflow.utils.misc.merge_uri(uri, conf)¶
Merges a parsed uri into the given configuration dictionary.
Merges the username, password, hostname, port, and query parameters of a URI into the given configuration dictionary (it does not overwrite existing configuration keys if they already exist) and returns the merged configuration.
NOTE(harlowja): does not merge the path, scheme or fragment.
- taskflow.utils.misc.find_subclasses(locations, base_cls, exclude_hidden=True)¶
Finds subclass types in the given locations.
This will examines the given locations for types which are subclasses of the base class type provided and returns the found subclasses (or fails with exceptions if this introspection can not be accomplished).
If a string is provided as one of the locations it will be imported and examined if it is a subclass of the base class. If a module is given, all of its members will be examined for attributes which are subclasses of the base class. If a type itself is given it will be examined for being a subclass of the base class.
Returns first of values that is not None (or None if all are/were).
Parses a uri into its components.
Frozen checking/raising method decorator.
- taskflow.utils.misc.clamp(value, minimum, maximum, on_clamped=None)¶
Clamps a value to ensure its >= minimum and <= maximum.
- taskflow.utils.misc.fix_newlines(text, replacement='\n')¶
Fixes text that may end with wrong nl by replacing with right nl.
- taskflow.utils.misc.binary_encode(text, encoding='utf-8', errors='strict')¶
Encodes a text string into a binary string using given encoding.
Does nothing if data is already a binary string (raises on unknown types).
- taskflow.utils.misc.binary_decode(data, encoding='utf-8', errors='strict')¶
Decodes a binary string into a text string using given encoding.
Does nothing if data is already a text string (raises on unknown types).
- taskflow.utils.misc.decode_msgpack(raw_data, root_types=(<class 'dict'>, ))¶
Parse raw data to get decoded object.
Decodes a msgback encoded ‘blob’ from a given raw data binary string and checks that the root type of that decoded object is in the allowed set of types (by default a dict should be the root type).
- taskflow.utils.misc.decode_json(raw_data, root_types=(<class 'dict'>, ))¶
Parse raw data to get decoded object.
Decodes a JSON encoded ‘blob’ from a given raw data binary string and checks that the root type of that decoded object is in the allowed set of types (by default a dict should be the root type).
- class taskflow.utils.misc.cachedproperty(fget=None, require_lock=True)¶
A thread-safe descriptor property that is only evaluated once.
This caching descriptor can be placed on instance methods to translate those methods into properties that will be cached in the instance (avoiding repeated attribute checking logic to do the equivalent).
NOTE(harlowja): by default the property that will be saved will be under the decorated methods name prefixed with an underscore. For example if we were to attach this descriptor to an instance method ‘get_thing(self)’ the cached property would be stored under ‘_get_thing’ in the self object after the first call to ‘get_thing’ occurs.
Converts number of milliseconds (from epoch) into a datetime object.
Gets a object’s version as a string.
Returns string representation of object’s version taken from its ‘version’ attribute, or None if object does not have such attribute or its version is None.
- taskflow.utils.misc.sequence_minus(seq1, seq2)¶
Calculate difference of two sequences.
Result contains the elements from first sequence that are not present in second sequence, in original order. Works even if sequence elements are not hashable.
- taskflow.utils.misc.as_int(obj, quiet=False)¶
Converts an arbitrary value into a integer.
Captures the occurring exception and provides a failure object back.
This will save the current exception information and yield back a failure object for the caller to use (it will raise a runtime error if no active exception is being handled).
This is useful since in some cases the exception context can be cleared, resulting in None being attempted to be saved after an exception handler is run. This can happen when eventlet switches greenthreads or when running an exception handler, code raises and catches an exception. In both cases the exception context will be cleared.
To work around this, we save the exception state, yield a failure and then run other code.
>>> from taskflow.utils import misc >>> >>> def cleanup(): ... pass ... >>> >>> def save_failure(f): ... print("Saving %s" % f) ... >>> >>> try: ... raise IOError("Broken") ... except Exception: ... with misc.capture_failure() as fail: ... print("Activating cleanup") ... cleanup() ... save_failure(fail) ... Activating cleanup Saving Failure: IOError: Broken
Tests an object to to determine whether it is iterable.
This function will test the specified object to determine whether it is iterable. String types (both
unicode) are ignored and will return False.
obj – object to be tested for iterable
True if object is iterable and is not a string
Copy an existing dictionary or default to empty dict…
This will return a empty dict if given object is falsey, otherwise it will create a dict of the given object (which if provided a dictionary object will make a shallow copy of that object).
Creates a temporary logbook for temporary usage in the given backend.
Mainly useful for tests and other use cases where a temporary logbook is needed for a short-period of time.
- taskflow.utils.persistence_utils.temporary_flow_detail(backend=None, meta=None)¶
Creates a temporary flow detail and logbook in the given backend.
Mainly useful for tests and other use cases where a temporary flow detail and a temporary logbook is needed for a short-period of time.
- taskflow.utils.persistence_utils.create_flow_detail(flow, book=None, backend=None, meta=None)¶
Creates a flow detail for a flow & adds & saves it in a logbook.
This will create a flow detail for the given flow using the flow name, and add it to the provided logbook and then uses the given backend to save the logbook and then returns the created flow detail.
If no book is provided a temporary one will be created automatically (no reference to the logbook will be returned, so this should nearly always be provided or only used in situations where no logbook is needed, for example in tests). If no backend is provided then no saving will occur and the created flow detail will not be persisted even if the flow detail was added to a given (or temporarily generated) logbook.
- class taskflow.utils.redis_utils.RedisClient(*args, **kwargs)¶
A redis client that can be closed (and raises on-usage after closed).
TODO(harlowja): if https://github.com/andymccurdy/redis-py/issues/613 ever gets resolved or merged or other then we can likely remove this.
- execute_command(*args, **options)¶
Execute a command and return a parsed response
- transaction(func, *watches, **kwargs)¶
Convenience method for executing the callable func as a transaction while watching all keys specified in watches. The ‘func’ callable should expect a single argument which is a Pipeline object.
Return a Publish/Subscribe object. With this object, you can subscribe to channels and listen for messages that get published to them.
- class taskflow.utils.redis_utils.UnknownExpire(value)¶
Non-expiry (not ttls) results return from
- DOES_NOT_EXPIRE = -1¶
The command returns
-1if the key exists but has no associated expire.
- KEY_NOT_FOUND = -2¶
The command returns
-2if the key does not exist.
- taskflow.utils.redis_utils.get_expiry(client, key, prior_version=None)¶
Gets an expiry for a key (using best determined ttl method).
- taskflow.utils.redis_utils.apply_expiry(client, key, expiry, prior_version=None)¶
Applies an expiry to a key (using best determined expiry method).
- taskflow.utils.redis_utils.is_server_new_enough(client, min_version, default=False, prior_version=None)¶
Checks if a client is attached to a new enough redis server.
- taskflow.utils.schema_utils.schema_validate(data, schema)¶
Validates given data using provided json schema.
Helper to determine if a thread is alive (handles none safely).
Return the ‘thread identifier’ of the current thread.
Try to guess optimal thread count for current system.
- taskflow.utils.threading_utils.daemon_thread(target, *args, **kwargs)¶
Makes a daemon thread that calls the given target when started.
- taskflow.utils.threading_utils.no_op(*args, **kwargs)¶
Function that does nothing.
- class taskflow.utils.threading_utils.ThreadBundle¶
A group/bundle of threads that start/stop together.
- bind(thread_factory, before_start=None, after_start=None, before_join=None, after_join=None)¶
Adds a thread (to-be) into this bundle (with given callbacks).
Creates & starts all associated threads (that are not running).
Stops & joins all associated threads (that have been started).