Object¶
Object Auditor¶
- class swift.obj.auditor.AuditorWorker(conf, logger, rcache, devices, zero_byte_only_at_fps=0, watcher_defs=None)¶
Bases:
object
Walk through file system to audit objects
- audit_all_objects(mode='once', device_dirs=None)¶
- create_recon_nested_dict(top_level_key, device_list, item)¶
- failsafe_object_audit(location)¶
Entrypoint to object_audit, with a failsafe generic exception handler.
- object_audit(location)¶
Audits the given object location.
- Parameters:
location – an audit location (from diskfile.object_audit_location_generator)
- record_stats(obj_size)¶
Based on the config’s object_size_stats setting, keeps track of how many objects fall into the specified ranges. For example, with the following:
object_size_stats = 10, 100, 1024
and a system holding three objects of sizes 5, 20, and 10000 bytes, the log will look like: {"10": 1, "100": 1, "1024": 0, "OVER": 1}
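The bucketing described above can be sketched in a few lines. This is a minimal illustration, not the auditor's code: the function name `bucket_object_sizes` is hypothetical, and treating a size equal to a threshold as belonging to that threshold's bucket is an assumption.

```python
def bucket_object_sizes(thresholds, sizes):
    """Count sizes per threshold bucket; anything larger goes to 'OVER'."""
    ordered = sorted(thresholds)
    stats = {str(t): 0 for t in ordered}
    stats["OVER"] = 0
    for size in sizes:
        for t in ordered:
            # Assumption: a size equal to the threshold counts in that bucket.
            if size <= t:
                stats[str(t)] += 1
                break
        else:
            # No threshold was large enough for this object.
            stats["OVER"] += 1
    return stats


# Thresholds and sizes taken from the example in the text.
result = bucket_object_sizes([10, 100, 1024], [5, 20, 10000])
print(result)
```

Each object is counted once, in the first range large enough to hold it.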
- class swift.obj.auditor.ObjectAuditor(conf, logger=None, **options)¶
Bases:
Daemon
Audit objects.
- audit_loop(parent, zbo_fps, override_devices=None, **kwargs)¶
Parallel audit loop
- clear_recon_cache(auditor_type)¶
Clear recon cache entries
- fork_child(zero_byte_fps=False, sleep_between_zbf_scanner=False, **kwargs)¶
Child execution
- run_audit(**kwargs)¶
Run the object audit
- run_forever(*args, **kwargs)¶
Run the object audit until stopped.
- run_once(*args, **kwargs)¶
Run the object audit once
- class swift.obj.auditor.WatcherWrapper(watcher_class, watcher_name, conf, logger)¶
Bases:
object
Run the user-supplied watcher.
Simple and gets the job done. Note that we aren’t doing anything to isolate ourselves from hangs or file descriptor leaks in the plugins.
- end()¶
- see_object(meta, data_file_path)¶
- start(audit_type)¶
- swift.obj.auditor.main()¶
Object Backend¶
Disk File Interface for the Swift Object Server
The DiskFile, DiskFileWriter and DiskFileReader classes combined define the on-disk abstraction layer for supporting the object server REST API interfaces (excluding REPLICATE). Other implementations wishing to provide an alternative backend for the object server must implement the three classes. An example alternative implementation can be found in the mem_server.py and mem_diskfile.py modules alongside this one.
The DiskFileManager is a reference implementation specific class and is not part of the backend API.
The remaining methods in this module are considered implementation specific and are also not considered part of the backend API.
- class swift.obj.diskfile.AuditLocation(path, device, partition, policy)¶
Bases:
object
Represents an object location to be audited.
Other than being a bucket of data, the only useful thing this does is stringify to a filesystem path so the auditor’s logs look okay.
- class swift.obj.diskfile.BaseDiskFile(mgr, device_path, partition, account=None, container=None, obj=None, _datadir=None, policy=None, use_splice=False, pipe_size=None, open_expired=False, next_part_power=None, **kwargs)¶
Bases:
object
Manage object files.
This specific implementation manages object files on a disk formatted with a POSIX-compliant file system that supports extended attributes as metadata on a file or directory.
Note
The arguments to the constructor are considered implementation specific. The API does not define the constructor arguments.
The following path format is used for data file locations: <devices_path>/<device_dir>/<datadir>/<partdir>/<suffixdir>/<hashdir>/<datafile>.<ext>
- Parameters:
mgr – associated DiskFileManager instance
device_path – path to the target device or drive
partition – partition on the device in which the object lives
account – account name for the object
container – container name for the object
obj – object name for the object
_datadir – override the full datadir otherwise constructed here
policy – the StoragePolicy instance
use_splice – if true, use zero-copy splice() to send data
pipe_size – size of pipe buffer used in zero-copy operations
open_expired – if True, open() will not raise a DiskFileExpired if object is expired
next_part_power – the next partition power to be used
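As an illustration of the path format above, the sketch below assembles a data file path from its parts. The helper name `data_file_path` is hypothetical, the sample values are borrowed from the extract_policy example in this module's documentation, and deriving the suffix directory from the last three characters of the hash is an assumption that is merely consistent with that example.

```python
import posixpath


def data_file_path(devices_path, device_dir, datadir, partition,
                   obj_hash, datafile, ext):
    """Join the path components in the documented order."""
    # Assumption: the suffix dir is the last three characters of the hash.
    suffix = obj_hash[-3:]
    return posixpath.join(devices_path, device_dir, datadir,
                          str(partition), suffix, obj_hash, datafile + ext)


path = data_file_path("/srv/node", "d42", "objects-5", 30,
                      "485dc017205a81df3af616d917c90179",
                      "1401811134.873649", ".data")
print(path)
```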
- property account¶
- property container¶
- property content_length¶
- property content_type¶
- property content_type_timestamp¶
- create(size=None, extension='.data')¶
Context manager to create a file. We create a temporary file first, and then return a DiskFileWriter object to encapsulate the state.
Note
An implementation is not required to perform on-disk preallocations even if the parameter is specified. But if it does and it fails, it must raise a DiskFileNoSpace exception.
- Parameters:
size – optional initial size of file to explicitly allocate on disk
extension – file extension to use for the newly-created file; defaults to .data for the sake of tests
- Raises:
DiskFileNoSpace – if a size is specified and allocation fails
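The "temporary file first" approach that create() describes is a common pattern, and a generic version can be sketched with the standard library alone. This is not the DiskFile implementation: `create_via_tempfile` is a hypothetical name, it yields a plain file object rather than a DiskFileWriter, and placing the temporary file in the destination directory is an assumption of the sketch.

```python
import os
import tempfile
from contextlib import contextmanager


@contextmanager
def create_via_tempfile(final_path):
    """Yield a writable temp file; rename it into place only on success."""
    directory = os.path.dirname(final_path) or "."
    fd, tmp_path = tempfile.mkstemp(dir=directory)
    try:
        with os.fdopen(fd, "wb") as fp:
            yield fp
            # Push the data to disk before the file becomes visible.
            fp.flush()
            os.fsync(fp.fileno())
        os.replace(tmp_path, final_path)
    except BaseException:
        # On any failure, leave no partial file behind.
        if os.path.exists(tmp_path):
            os.unlink(tmp_path)
        raise


workdir = tempfile.mkdtemp()
target = os.path.join(workdir, "1401811134.873649.data")
with create_via_tempfile(target) as fp:
    fp.write(b"object body")
```

Readers never observe a half-written file under the final name, because the rename happens only after the data has been written and synced.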
- property data_timestamp¶
- delete(timestamp)¶
Delete the object.
This implementation creates a tombstone file using the given timestamp, and removes any older versions of the object file. Any file that has an older timestamp than timestamp will be deleted.
Note
An implementation is free to use or ignore the timestamp parameter.
- Parameters:
timestamp – timestamp to compare with each file
- Raises:
DiskFileError – this implementation will raise the same errors as the create() method.
- property durable_timestamp¶
Provides the timestamp of the newest data file found in the object directory.
- Returns:
A Timestamp instance, or None if no data file was found.
- Raises:
DiskFileNotOpen – if the open() method has not been previously called on this instance.
- property fragments¶
- classmethod from_hash_dir(mgr, hash_dir_path, device_path, partition, policy)¶
- get_datafile_metadata()¶
Provide the datafile metadata for a previously opened object as a dictionary. This is metadata that was included when the object was first PUT, and does not include metadata set by any subsequent POST.
- Returns:
object’s datafile metadata dictionary
- Raises:
DiskFileNotOpen – if the swift.obj.diskfile.DiskFile.open() method was not previously invoked
- get_metadata()¶
Provide the metadata for a previously opened object as a dictionary.
- Returns:
object’s metadata dictionary
- Raises:
DiskFileNotOpen – if the swift.obj.diskfile.DiskFile.open() method was not previously invoked
- get_metafile_metadata()¶
Provide the metafile metadata for a previously opened object as a dictionary. This is metadata that was written by a POST and does not include any persistent metadata that was set by the original PUT.
- Returns:
object’s .meta file metadata dictionary, or None if there is no .meta file
- Raises:
DiskFileNotOpen – if the swift.obj.diskfile.DiskFile.open() method was not previously invoked
- property manager¶
- property obj¶
- open(modernize=False, current_time=None)¶
Open the object.
This implementation opens the data file representing the object, reads the associated metadata in the extended attributes, additionally combining metadata from fast-POST .meta files.
- Parameters:
modernize – if set, update this diskfile to the latest format. Currently, this means adding metadata checksums if none are present.
current_time – Unix time used in checking expiration. If not present, the current time will be used.
Note
An implementation is allowed to raise any of the following exceptions, but is only required to raise DiskFileNotExist when the object representation does not exist.
- Raises:
DiskFileCollision – on name mis-match with metadata
DiskFileNotExist – if the object does not exist
DiskFileDeleted – if the object was previously deleted
DiskFileQuarantined – if, while reading metadata of the file, some data did not pass cross checks
- Returns:
itself for use as a context manager
- read_metadata(current_time=None)¶
Return the metadata for an object without requiring the caller to open the object first.
- Parameters:
current_time – Unix time used in checking expiration. If not present, the current time will be used.
- Returns:
metadata dictionary for an object
- Raises:
DiskFileError – this implementation will raise the same errors as the open() method.
- reader(keep_cache=False, cooperative_period=0, _quarantine_hook=<function BaseDiskFile.<lambda>>)¶
Return a swift.common.swob.Response class compatible “app_iter” object as defined by swift.obj.diskfile.DiskFileReader.
For this implementation, the responsibility of closing the open file is passed to the swift.obj.diskfile.DiskFileReader object.
- Parameters:
keep_cache – caller’s preference for keeping data read in the OS buffer cache
cooperative_period – the period parameter for cooperative yielding during file read
_quarantine_hook – 1-arg callable called when obj quarantined; the arg is the reason for quarantine. Default is to ignore it. Not needed by the REST layer.
- Returns:
a swift.obj.diskfile.DiskFileReader object
- reader_cls = None¶
- property timestamp¶
- validate_metadata()¶
- write_metadata(metadata)¶
Write a block of metadata to an object without requiring the caller to create the object first. Supports fast-POST behavior semantics.
- Parameters:
metadata – dictionary of metadata to be associated with the object
- Raises:
DiskFileError – this implementation will raise the same errors as the create() method.
- writer(size=None)¶
- writer_cls = None¶
- class swift.obj.diskfile.BaseDiskFileManager(conf, logger)¶
Bases:
object
Management class for devices, providing common place for shared parameters and methods not provided by the DiskFile class (which primarily services the object server REST API layer).
The get_diskfile() method is how this implementation creates a DiskFile object.
Note
This class is reference implementation specific and not part of the pluggable on-disk backend API.
Note
TODO(portante): Not sure what the right name to recommend here, as “manager” seemed generic enough, though suggestions are welcome.
- Parameters:
conf – caller provided configuration object
logger – caller provided logger
- classmethod check_policy(policy)¶
- cleanup_ondisk_files(hsh_path, **kwargs)¶
Clean up on-disk files that are obsolete and gather the set of valid on-disk files for an object.
- Parameters:
hsh_path – object hash path
frag_index – if set, search for a specific fragment index .data file, otherwise accept the first valid .data file
- Returns:
a dict that may contain: valid on disk files keyed by their filename extension; a list of obsolete files stored under the key ‘obsolete’; a list of files remaining in the directory, reverse sorted, stored under the key ‘files’.
- clear_auditor_status(policy, auditor_type='ALL')¶
- static consolidate_hashes(partition_dir)¶
Take what’s in hashes.pkl and hashes.invalid, combine them, write the result back to hashes.pkl, and clear out hashes.invalid.
- Parameters:
partition_dir – absolute path to partition dir containing hashes.pkl and hashes.invalid
- Returns:
a dict, the suffix hashes (if any), the key ‘valid’ will be False if hashes.pkl is corrupt, cannot be read or does not exist
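The merge that consolidate_hashes describes can be pictured with in-memory data. This is a rough sketch only: the function name `consolidate` is hypothetical, no files are read or written, and using None to mark a suffix whose hash must be recalculated is an assumption of the sketch.

```python
def consolidate(hashes, invalid_suffixes):
    """Fold pending invalidations into the hashes mapping.

    hashes: dict mapping suffix dir name -> hash string (stand-in for
        the contents of hashes.pkl)
    invalid_suffixes: list of suffix dir names (stand-in for the
        contents of hashes.invalid)
    Returns the merged mapping and the now-empty invalidation list.
    """
    merged = dict(hashes)
    for suffix in invalid_suffixes:
        # Assumption: None means "this suffix needs rehashing".
        merged[suffix] = None
    return merged, []


merged, remaining = consolidate({"179": "abc123", "07f": "def456"},
                                ["179", "a3c"])
print(merged, remaining)
```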
- construct_dev_path(device)¶
Construct the path to a device without checking if it is mounted.
- Parameters:
device – name of target device
- Returns:
full path to the device
- diskfile_cls = None¶
- get_dev_path(device, mount_check=None)¶
Return the path to a device, first checking to see if either it is a proper mount point, or at least a directory depending on the mount_check configuration option.
- Parameters:
device – name of target device
mount_check – whether or not to check mountedness of device. Defaults to bool(self.mount_check).
- Returns:
full path to the device, None if the path to the device is not a proper mount point or directory.
- get_diskfile(device, partition, account, container, obj, policy, **kwargs)¶
Returns a BaseDiskFile instance for an object based on the object’s partition, path parts and policy.
- Parameters:
device – name of target device
partition – partition on device in which the object lives
account – account name for the object
container – container name for the object
obj – object name for the object
policy – the StoragePolicy instance
- get_diskfile_and_filenames_from_hash(device, partition, object_hash, policy, **kwargs)¶
Returns a tuple of (a DiskFile instance for an object at the given object_hash, the basenames of the files in the object’s hash dir). Just in case someone thinks of refactoring, be sure DiskFileDeleted is not raised, but the DiskFile instance representing the tombstoned object is returned instead.
- Parameters:
device – name of target device
partition – partition on the device in which the object lives
object_hash – the hash of an object path
policy – the StoragePolicy instance
- Raises:
DiskFileNotExist – if the object does not exist
- Returns:
a tuple comprising (an instance of BaseDiskFile, a list of file basenames)
- get_diskfile_from_audit_location(audit_location)¶
Returns a BaseDiskFile instance for an object at the given AuditLocation.
- Parameters:
audit_location – object location to be audited
- get_diskfile_from_hash(device, partition, object_hash, policy, **kwargs)¶
Returns a DiskFile instance for an object at the given object_hash. Just in case someone thinks of refactoring, be sure DiskFileDeleted is not raised, but the DiskFile instance representing the tombstoned object is returned instead.
- Parameters:
device – name of target device
partition – partition on the device in which the object lives
object_hash – the hash of an object path
policy – the StoragePolicy instance
- Raises:
DiskFileNotExist – if the object does not exist
- Returns:
an instance of BaseDiskFile
- get_hashes(device, partition, suffixes, policy, skip_rehash=False)¶
- Parameters:
device – name of target device
partition – partition name
suffixes – a list of suffix directories to be recalculated
policy – the StoragePolicy instance
skip_rehash – just mark the suffixes dirty; return None
- Returns:
a dictionary that maps suffix directories to their hashes
- get_ondisk_files(files, datadir, verify=True, policy=None, **kwargs)¶
Given a simple list of files names, determine the files that constitute a valid fileset i.e. a set of files that defines the state of an object, and determine the files that are obsolete and could be deleted. Note that some files may fall into neither category.
If a file is considered part of a valid fileset then its info dict will be added to the results dict, keyed by <extension>_info. Any files that are no longer required will have their info dicts added to a list stored under the key ‘obsolete’.
The results dict will always contain entries with keys ‘ts_file’, ‘data_file’ and ‘meta_file’. Their values will be the fully qualified path to a file of the corresponding type if there is such a file in the valid fileset, or None.
- Parameters:
files – a list of file names.
datadir – directory name files are from; this is used to construct file paths in the results, but the datadir is not modified by this method.
verify – if True verify that the ondisk file contract has not been violated, otherwise do not verify.
policy – storage policy used to store the files. Used to validate fragment indexes for EC policies.
- Returns:
- a dict that will contain keys:
ts_file -> path to a .ts file or None
data_file -> path to a .data file or None
meta_file -> path to a .meta file or None
ctype_file -> path to a .meta file or None
- and may contain keys:
ts_info -> a file info dict for a .ts file
data_info -> a file info dict for a .data file
meta_info -> a file info dict for a .meta file
ctype_info -> a file info dict for a .meta file which contains the content-type value
unexpected -> a list of file paths for unexpected files
possible_reclaim -> a list of file info dicts for possible reclaimable files
obsolete -> a list of file info dicts for obsolete files
- static invalidate_hash(suffix_dir)¶
Invalidates the hash for a suffix_dir in the partition’s hashes file.
- Parameters:
suffix_dir – absolute path to suffix dir whose hash needs invalidating
- make_on_disk_filename(timestamp, ext=None, ctype_timestamp=None, *a, **kw)¶
Returns filename for given timestamp.
- Parameters:
timestamp – the object timestamp, an instance of Timestamp
ext – an optional string representing a file extension to be appended to the returned file name
ctype_timestamp – an optional content-type timestamp, an instance of Timestamp
- Returns:
a file name
- object_audit_location_generator(policy, device_dirs=None, auditor_type='ALL')¶
Yield an AuditLocation for all objects stored under device_dirs.
- Parameters:
policy – the StoragePolicy instance
device_dirs – directory of target device
auditor_type – either ALL or ZBF
- parse_on_disk_filename(filename, policy)¶
Parse an on disk file name.
- Parameters:
filename – the file name including extension
policy – storage policy used to store the file
- Returns:
a dict, with keys for timestamp, ext and ctype_timestamp:
timestamp is a Timestamp
ctype_timestamp is a Timestamp or None for .meta files, otherwise None
ext is a string, the file extension including the leading dot or the empty string if the filename has no extension.
Subclasses may override this method to add further keys to the returned dict.
- Raises:
DiskFileError – if any part of the filename is not able to be validated.
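To make the shape of the returned dict concrete, here is a simplified sketch of parsing a replication-policy file name such as 1401811134.873649.data. It is not the library's parser: `parse_filename` is a hypothetical name, the timestamp is kept as a plain string rather than a Timestamp instance, ValueError stands in for DiskFileError, and content-type timestamps encoded in .meta file names are ignored (ctype_timestamp is always None here).

```python
KNOWN_EXTS = (".data", ".ts", ".meta")


def parse_filename(filename):
    """Split '<timestamp><ext>' into its parts, validating the timestamp."""
    for ext in KNOWN_EXTS:
        if filename.endswith(ext):
            stem = filename[:-len(ext)]
            try:
                float(stem)  # validation only; the string form is kept
            except ValueError:
                raise ValueError("invalid timestamp in %r" % filename)
            return {"timestamp": stem, "ext": ext, "ctype_timestamp": None}
    raise ValueError("unrecognised extension in %r" % filename)


info = parse_filename("1401811134.873649.data")
print(info)
```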
- partition_lock(device, policy, partition, name=None, timeout=None)¶
A context manager that will lock on the partition given.
- Parameters:
device – device targeted by the lock request
policy – policy targeted by the lock request
partition – partition targeted by the lock request
- Raises:
PartitionLockTimeout – If the lock on the partition cannot be granted within the configured timeout.
- pickle_async_update(device, account, container, obj, data, timestamp, policy)¶
Write data describing a container update notification to a pickle file in the async_pending directory.
- Parameters:
device – name of target device
account – account name for the object
container – container name for the object
obj – object name for the object
data – update data to be written to pickle file
timestamp – a Timestamp
policy – the StoragePolicy instance
- policy = None¶
- static quarantine_renamer(device_path, corrupted_file_path)¶
In the case that a file is corrupted, move it to a quarantined area to allow replication to fix it.
- Parameters:
device_path – the path to the device the corrupted file is on
corrupted_file_path – the path to the file you want quarantined
- Returns:
path (str) of directory the file was moved to
- Raises:
OSError – re-raises non errno.EEXIST / errno.ENOTEMPTY exceptions from rename
- replication_lock(device, policy, partition)¶
A context manager that will lock on the partition and, if configured to do so, on the device given.
- Parameters:
device – name of target device
policy – policy targeted by the replication request
partition – partition targeted by the replication request
- Raises:
ReplicationLockTimeout – If the lock on the device cannot be granted within the configured timeout.
- yield_hashes(device, partition, policy, suffixes=None, **kwargs)¶
Yields tuples of (hash_only, timestamps) for object information stored for the given device, partition, and (optionally) suffixes. If suffixes is None, all stored suffixes will be searched for object hashes. Note that if suffixes is not None but empty, such as [], then nothing will be yielded.
timestamps is a dict which may contain items mapping:
ts_data -> timestamp of data or tombstone file,
ts_meta -> timestamp of meta file, if one exists
ts_ctype -> timestamp of meta file containing most recent content-type value, if one exists
durable -> True if data file at ts_data is durable, False otherwise
where timestamps are instances of Timestamp
- Parameters:
device – name of target device
partition – partition name
policy – the StoragePolicy instance
suffixes – optional list of suffix directories to be searched
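The difference between suffixes=None and suffixes=[] is easy to get wrong, so a toy model may help. The generator below is only an illustration with a hypothetical name (`iter_hashes`) and an in-memory dict in place of the on-disk layout; it yields bare hashes rather than (hash, timestamps) tuples.

```python
def iter_hashes(stored, suffixes=None):
    """Yield object hashes from `stored`, a dict of suffix -> list of hashes.

    None means "search every suffix"; an empty list means "search nothing".
    """
    if suffixes is None:
        suffixes = sorted(stored)
    for suffix in suffixes:
        for obj_hash in stored.get(suffix, []):
            yield obj_hash


stored = {"179": ["485dc017205a81df3af616d917c90179"],
          "07f": ["0b4c2f5d0c6e4b1e9a1d3f7a8e2c607f"]}
everything = list(iter_hashes(stored))
nothing = list(iter_hashes(stored, suffixes=[]))
one_suffix = list(iter_hashes(stored, suffixes=["179"]))
```

The explicit `is None` test is the design point: a truthiness check such as `if not suffixes` would treat an empty list the same as None and search everything.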
- yield_suffixes(device, partition, policy)¶
Yields tuples of (full_path, suffix_only) for suffixes stored on the given device and partition.
- Parameters:
device – name of target device
partition – partition name
policy – the StoragePolicy instance
- class swift.obj.diskfile.BaseDiskFileReader(fp, data_file, obj_size, etag, disk_chunk_size, keep_cache_size, device_path, logger, quarantine_hook, use_splice, pipe_size, diskfile, keep_cache=False, cooperative_period=0)¶
Bases:
object
Encapsulation of the WSGI read context for servicing GET REST API requests. Serves as the context manager object for the swift.obj.diskfile.DiskFile class’s swift.obj.diskfile.DiskFile.reader() method.
Note
The quarantining behavior of this method is considered implementation specific, and is not required of the API.
Note
The arguments to the constructor are considered implementation specific. The API does not define the constructor arguments.
- Parameters:
fp – open file object pointer reference
data_file – on-disk data file name for the object
obj_size – verified on-disk size of the object
etag – expected metadata etag value for entire file
disk_chunk_size – size of reads from disk in bytes
keep_cache_size – maximum object size that will be kept in cache
device_path – on-disk device path, used when quarantining an obj
logger – logger caller wants this object to use
quarantine_hook – 1-arg callable called w/reason when quarantined
use_splice – if true, use zero-copy splice() to send data
pipe_size – size of pipe buffer used in zero-copy operations
diskfile – the diskfile creating this DiskFileReader instance
keep_cache – should resulting reads be kept in the buffer cache
cooperative_period – the period parameter for cooperative yielding during file read
- app_iter_range(start, stop)¶
Returns an iterator over the data file for range (start, stop)
- app_iter_ranges(ranges, content_type, boundary, size)¶
Returns an iterator over the data file for a set of ranges
- can_zero_copy_send()¶
- close()¶
Close the open file handle if present.
For this specific implementation, this method will handle quarantining the file if necessary.
- property manager¶
- zero_copy_send(wsockfd)¶
Does some magic with splice() and tee() to move stuff from disk to network without ever touching userspace.
- Parameters:
wsockfd – file descriptor (integer) of the socket on which to send data
- class swift.obj.diskfile.BaseDiskFileWriter(name, datadir, size, bytes_per_sync, diskfile, next_part_power, extension='.data')¶
Bases:
object
Encapsulation of the write context for servicing PUT REST API requests. Serves as the context manager object for the swift.obj.diskfile.DiskFile class’s swift.obj.diskfile.DiskFile.create() method.
Note
It is the responsibility of the swift.obj.diskfile.DiskFile.create() method context manager to close the open file descriptor.
Note
The arguments to the constructor are considered implementation specific. The API does not define the constructor arguments.
- Parameters:
name – name of object from REST API
datadir – on-disk directory object will end up in on swift.obj.diskfile.DiskFileWriter.put()
fd – open file descriptor of temporary file to receive data
tmppath – full path name of the opened file descriptor
bytes_per_sync – number of bytes written between sync calls
diskfile – the diskfile creating this DiskFileWriter instance
next_part_power – the next partition power to be used
extension – the file extension to be used; may be used internally to distinguish between PUT/POST/DELETE operations
- chunks_finished()¶
Expose internal stats about written chunks.
- Returns:
a tuple, (upload_size, etag)
- close()¶
- commit(timestamp)¶
Perform any operations necessary to mark the object as durable. For replication policy type this is a no-op.
- Parameters:
timestamp – object put timestamp, an instance of Timestamp
- property logger¶
- property manager¶
- open()¶
- put(metadata)¶
Finalize writing the file on disk.
- Parameters:
metadata – dictionary of metadata to be associated with the object
- write(chunk)¶
Write a chunk of data to disk. All invocations of this method must come before invoking the :func:
For this implementation, the data is written into a temporary file.
- Parameters:
chunk – the chunk of data to write as a string object
- class swift.obj.diskfile.DiskFile(mgr, device_path, partition, account=None, container=None, obj=None, _datadir=None, policy=None, use_splice=False, pipe_size=None, open_expired=False, next_part_power=None, **kwargs)¶
Bases:
BaseDiskFile
- reader_cls¶
alias of
DiskFileReader
- writer_cls¶
alias of
DiskFileWriter
- class swift.obj.diskfile.DiskFileManager(conf, logger)¶
Bases:
BaseDiskFileManager
- policy = 'replication'¶
- class swift.obj.diskfile.DiskFileReader(fp, data_file, obj_size, etag, disk_chunk_size, keep_cache_size, device_path, logger, quarantine_hook, use_splice, pipe_size, diskfile, keep_cache=False, cooperative_period=0)¶
Bases:
BaseDiskFileReader
- class swift.obj.diskfile.DiskFileRouter(*args, **kwargs)¶
Bases:
object
- class swift.obj.diskfile.DiskFileWriter(name, datadir, size, bytes_per_sync, diskfile, next_part_power, extension='.data')¶
Bases:
BaseDiskFileWriter
- put(metadata)¶
Finalize writing the file on disk.
- Parameters:
metadata – dictionary of metadata to be associated with the object
- class swift.obj.diskfile.ECDiskFile(*args, **kwargs)¶
Bases:
BaseDiskFile
- property durable_timestamp¶
Provides the timestamp of the newest durable file found in the object directory.
- Returns:
A Timestamp instance, or None if no durable file was found.
- Raises:
DiskFileNotOpen – if the open() method has not been previously called on this instance.
- property fragments¶
Provides information about all fragments that were found in the object directory, including fragments without a matching durable file, and including any fragment chosen to construct the opened diskfile.
- Returns:
A dict mapping <Timestamp instance> -> <list of frag indexes>, or None if the diskfile has not been opened or no fragments were found.
- purge(timestamp, frag_index, nondurable_purge_delay=0, meta_timestamp=None)¶
Remove a tombstone file matching the specified timestamp or datafile matching the specified timestamp and fragment index from the object directory.
This provides the EC reconstructor/ssync process with a way to remove a tombstone or fragment from a handoff node after reverting it to its primary node.
The hash will be invalidated, and if empty the hsh_path will be removed immediately.
- Parameters:
timestamp – the object timestamp, an instance of Timestamp
frag_index – fragment archive index, must be a whole number or None.
nondurable_purge_delay – only remove a non-durable data file if it’s been on disk longer than this many seconds.
meta_timestamp – if not None then remove any meta file with this timestamp
- reader_cls¶
alias of
ECDiskFileReader
- validate_metadata()¶
- writer_cls¶
alias of
ECDiskFileWriter
- class swift.obj.diskfile.ECDiskFileManager(conf, logger)¶
Bases:
BaseDiskFileManager
- diskfile_cls¶
alias of
ECDiskFile
- make_on_disk_filename(timestamp, ext=None, frag_index=None, ctype_timestamp=None, durable=False, *a, **kw)¶
Returns the EC specific filename for given timestamp.
- Parameters:
timestamp – the object timestamp, an instance of Timestamp
ext – an optional string representing a file extension to be appended to the returned file name
frag_index – a fragment archive index, used with .data extension only, must be a whole number.
ctype_timestamp – an optional content-type timestamp, an instance of Timestamp
durable – if True then include a durable marker in data filename.
- Returns:
a file name
- Raises:
DiskFileError – if ext==’.data’ and the kwarg frag_index is not a whole number
- parse_on_disk_filename(filename, policy)¶
Returns timestamp(s) and other info extracted from a policy specific file name. For EC policy the data file name includes a fragment index and possibly a durable marker, both of which must be stripped off to retrieve the timestamp.
- Parameters:
filename – the file name including extension
- Returns:
a dict, with keys for timestamp, frag_index, durable, ext and ctype_timestamp:
timestamp is a Timestamp
frag_index is an int or None
ctype_timestamp is a Timestamp or None for .meta files, otherwise None
ext is a string, the file extension including the leading dot or the empty string if the filename has no extension
durable is a boolean that is True if the filename is a data file that includes a durable marker
- Raises:
DiskFileError – if any part of the filename is not able to be validated.
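The stripping of the fragment index and durable marker can be sketched as follows. The file name layout used here, `<timestamp>#<frag_index>.data` with an extra `#d` before the extension for durable files, is an assumption made for illustration and is not stated in the text above. `parse_ec_data_filename` is a hypothetical name, ValueError stands in for DiskFileError, and the timestamp is kept as a string.

```python
def parse_ec_data_filename(filename):
    """Parse '<timestamp>#<frag_index>[#d].data' (assumed layout)."""
    ext = ".data"
    if not filename.endswith(ext):
        raise ValueError("not a data file: %r" % filename)
    parts = filename[:-len(ext)].split("#")
    if len(parts) not in (2, 3):
        raise ValueError("missing fragment index: %r" % filename)
    if len(parts) == 3 and parts[2] != "d":
        raise ValueError("unknown marker in %r" % filename)
    frag_index = int(parts[1])  # raises ValueError if not an integer
    if frag_index < 0:
        raise ValueError("fragment index must be a whole number")
    return {
        "timestamp": parts[0],
        "frag_index": frag_index,
        "durable": len(parts) == 3,
        "ext": ext,
    }


durable_info = parse_ec_data_filename("1401811134.873649#2#d.data")
pending_info = parse_ec_data_filename("1401811134.873649#2.data")
```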
- policy = 'erasure_coding'¶
- validate_fragment_index(frag_index, policy=None)¶
Return int representation of frag_index, or raise a DiskFileError if frag_index is not a whole number.
- Parameters:
frag_index – a fragment archive index
policy – storage policy used to validate the index against
- class swift.obj.diskfile.ECDiskFileReader(fp, data_file, obj_size, etag, disk_chunk_size, keep_cache_size, device_path, logger, quarantine_hook, use_splice, pipe_size, diskfile, keep_cache=False, cooperative_period=0)¶
Bases:
BaseDiskFileReader
- class swift.obj.diskfile.ECDiskFileWriter(name, datadir, size, bytes_per_sync, diskfile, next_part_power, extension='.data')¶
Bases:
BaseDiskFileWriter
- commit(timestamp)¶
Finalize put by renaming the object data file to include a durable marker. We do this for EC policy because it requires a 2-phase put commit confirmation.
- Parameters:
timestamp – object put timestamp, an instance of Timestamp
- Raises:
DiskFileError – if the diskfile frag_index has not been set (either during initialisation or a call to put())
- put(metadata)¶
The only difference between this method and the replication policy DiskFileWriter method is adding the frag index to the metadata.
- Parameters:
metadata – dictionary of metadata to be associated with object
- swift.obj.diskfile.clear_auditor_status(devices, datadir, auditor_type='ALL')¶
- swift.obj.diskfile.consolidate_hashes(partition_dir)¶
Take what’s in hashes.pkl and hashes.invalid, combine them, write the result back to hashes.pkl, and clear out hashes.invalid.
- Parameters:
partition_dir – absolute path to partition dir containing hashes.pkl and hashes.invalid
- Returns:
a dict, the suffix hashes (if any), the key ‘valid’ will be False if hashes.pkl is corrupt, cannot be read or does not exist
- swift.obj.diskfile.extract_policy(obj_path)¶
Extracts the policy for an object (based on the name of the objects directory) given the device-relative path to the object. Returns None in the event that the path is malformed in some way.
The device-relative path is everything after the mount point; for example:
/srv/node/d42/objects-5/30/179/485dc017205a81df3af616d917c90179/1401811134.873649.data
would have device-relative path:
objects-5/30/179/485dc017205a81df3af616d917c90179/1401811134.873649.data
- Parameters:
obj_path – device-relative path of an object, or the full path
- Returns:
a BaseStoragePolicy or None
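The idea of reading the policy from the objects directory name can be sketched without the storage policy machinery. The helper below is hypothetical (`extract_policy_index`) and returns only the integer policy index, whereas the documented function returns a BaseStoragePolicy; treating a bare `objects` directory as index 0 is an assumption of the sketch.

```python
def extract_policy_index(obj_path):
    """Return the policy index encoded in the objects dir name, or None."""
    for part in obj_path.strip("/").split("/"):
        if part == "objects":
            # Assumption: no suffix means the legacy policy, index 0.
            return 0
        if part.startswith("objects-"):
            try:
                return int(part[len("objects-"):])
            except ValueError:
                return None  # malformed directory name
    return None


relative = "objects-5/30/179/485dc017205a81df3af616d917c90179/1401811134.873649.data"
index_from_relative = extract_policy_index(relative)
index_from_full = extract_policy_index("/srv/node/d42/" + relative)
```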
- swift.obj.diskfile.get_async_dir(policy_or_index)¶
Get the async dir for the given policy.
- Parameters:
policy_or_index – StoragePolicy instance, or an index (string or int); if None, the legacy Policy-0 is assumed.
- Returns:
async_pending or async_pending-<N> as appropriate
- swift.obj.diskfile.get_auditor_status(datadir_path, logger, auditor_type)¶
- swift.obj.diskfile.get_data_dir(policy_or_index)¶
Get the data dir for the given policy.
- Parameters:
policy_or_index – StoragePolicy instance, or an index (string or int); if None, the legacy Policy-0 is assumed.
- Returns:
objects or objects-<N> as appropriate
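The naming rule shared by get_async_dir, get_data_dir and get_tmp_dir can be sketched in one helper. `policy_dir` is a hypothetical name, and the sketch accepts only an index (int, numeric string, or None), not a StoragePolicy instance.

```python
def policy_dir(base, policy_index):
    """Return `base` for policy 0 (or None), else `base-<N>`."""
    index = 0 if policy_index is None else int(policy_index)
    return base if index == 0 else "%s-%d" % (base, index)


print(policy_dir("objects", None))
print(policy_dir("objects", "5"))
print(policy_dir("async_pending", 2))
```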
- swift.obj.diskfile.get_part_path(dev_path, policy, partition)¶
Given the device path, policy, and partition, returns the full path to the partition
- swift.obj.diskfile.get_tmp_dir(policy_or_index)¶
Get the temp dir for the given policy.
- Parameters:
policy_or_index – StoragePolicy instance, or an index (string or int); if None, the legacy Policy-0 is assumed.
- Returns:
tmp or tmp-<N> as appropriate
- swift.obj.diskfile.invalidate_hash(suffix_dir)¶
Invalidates the hash for a suffix_dir in the partition’s hashes file.
- Parameters:
suffix_dir – absolute path to suffix dir whose hash needs invalidating
- swift.obj.diskfile.object_audit_location_generator(devices, datadir, mount_check=True, logger=None, device_dirs=None, auditor_type='ALL')¶
Given a devices path (e.g. “/srv/node”), yield an AuditLocation for all objects stored under that directory for the given datadir (policy), if device_dirs isn’t set. If device_dirs is set, only yield AuditLocation for the objects under the entries in device_dirs. The AuditLocation only knows the path to the hash directory, not to the .data file therein (if any). This is to avoid a double listdir(hash_dir); the DiskFile object will always do one, so we don’t.
- Parameters:
devices – parent directory of the devices to be audited
datadir – objects directory
mount_check – flag to check if a mount check should be performed on devices
logger – a logger object
device_dirs – a list of directories under devices to traverse
auditor_type – either ALL or ZBF
- swift.obj.diskfile.quarantine_renamer(device_path, corrupted_file_path)¶
In the case that a file is corrupted, move it to a quarantined area to allow replication to fix it.
- Parameters:
device_path – The path to the device the corrupted file is on.
corrupted_file_path – The path to the file you want quarantined.
- Returns:
path (str) of directory the file was moved to
- Raises:
OSError – re-raises non errno.EEXIST / errno.ENOTEMPTY exceptions from rename
- swift.obj.diskfile.read_hashes(partition_dir)¶
Read the existing hashes.pkl
- Returns:
a dict, the suffix hashes (if any), the key ‘valid’ will be False if hashes.pkl is corrupt, cannot be read or does not exist
- swift.obj.diskfile.read_metadata(fd, add_missing_checksum=False)¶
Helper function to read the pickled metadata from an object file.
- Parameters:
fd – file descriptor or filename to load the metadata from
add_missing_checksum – if set and checksum is missing, add it
- Returns:
dictionary of metadata
- swift.obj.diskfile.relink_paths(target_path, new_target_path, ignore_missing=True)¶
Hard-links a file located in target_path using the second path new_target_path. Creates intermediate directories if required.
- Parameters:
target_path – current absolute filename
new_target_path – new absolute filename for the hardlink
ignore_missing – if True then no exception is raised if the link could not be made because target_path did not exist, otherwise an OSError will be raised.
- Raises:
OSError – if the hard link could not be created, unless the intended hard link already exists or the target_path does not exist and ignore_missing is True.
- Returns:
True if the link was created by the call to this method, False otherwise.
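The behaviour documented for relink_paths can be approximated with the standard library alone. The following is a hedged sketch, not Swift's code: `relink_sketch` is a hypothetical name, and unlike the documented function it does not verify that an existing file at the new path is really a link to the same inode.

```python
import errno
import os
import tempfile


def relink_sketch(target_path, new_target_path, ignore_missing=True):
    """Hard-link target_path at new_target_path; return True if created."""
    # Create intermediate directories for the new link if needed.
    os.makedirs(os.path.dirname(new_target_path), exist_ok=True)
    try:
        os.link(target_path, new_target_path)
    except OSError as err:
        if err.errno == errno.EEXIST:
            # Something already exists at the new path; this sketch treats
            # that as "already linked" without checking the inode.
            return False
        if err.errno == errno.ENOENT and ignore_missing:
            # The source vanished and the caller asked us not to complain.
            return False
        raise
    return True


with tempfile.TemporaryDirectory() as tmp:
    src = os.path.join(tmp, "a", "obj.data")
    dst = os.path.join(tmp, "b", "c", "obj.data")
    os.makedirs(os.path.dirname(src))
    with open(src, "w") as fp:
        fp.write("payload")
    print(relink_sketch(src, dst))  # first call creates the link
    print(relink_sketch(src, dst))  # second call finds it already present
```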
- swift.obj.diskfile.update_auditor_status(datadir_path, logger, partitions, auditor_type)¶
- swift.obj.diskfile.valid_suffix(value)¶
- swift.obj.diskfile.write_hashes(partition_dir, hashes)¶
Write hashes to hashes.pkl
The updated key is added to hashes before it is written.
- swift.obj.diskfile.write_metadata(fd, metadata, xattr_size=65536)¶
Helper function to write pickled metadata for an object file.
- Parameters:
fd – file descriptor or filename to write the metadata
metadata – metadata to write
Object Replicator¶
- class swift.obj.replicator.ObjectReplicator(conf, logger=None)¶
Bases:
Daemon
Replicate objects.
Encapsulates most logic and data needed by the object replication process. Each call to .replicate() performs one replication pass. It’s up to the caller to do this in a loop.
- aggregate_recon_update()¶
- build_replication_jobs(policy, ips, override_devices=None, override_partitions=None)¶
Helper function for collect_jobs to build jobs for replication using replication style storage policy
- check_ring(object_ring)¶
Check to see if the ring has been updated
- Parameters:
object_ring – the ring to check
- Returns:
boolean indicating whether or not the ring has changed
- collect_jobs(override_devices=None, override_partitions=None, override_policies=None)¶
Returns a sorted list of jobs (dictionaries) that specify the partitions, nodes, etc to be rsynced.
- Parameters:
override_devices – if set, only jobs on these devices will be returned
override_partitions – if set, only jobs on these partitions will be returned
override_policies – if set, only jobs in these storage policies will be returned
- delete_handoff_objs(job, delete_objs)¶
- delete_partition(path)¶
- get_local_devices()¶
Returns a set of all local devices in all replication-type storage policies.
These are the device names, e.g. “sdq” or “d1234”, not the full ring entries.
- get_worker_args(once=False, **kwargs)¶
For each worker yield a (possibly empty) dict of kwargs to pass along to the daemon’s run() method after fork. The number of elements returned from this method will determine the number of processes created.
If the returned iterable is empty, the Strategy will fall back to the run-inline strategy.
- Parameters:
once – False if the worker(s) will be daemonized, True if the worker(s) will be run once
kwargs – plumbed through via command line argparser
- Returns:
an iterable of dicts, each element represents the kwargs to be passed to a single worker’s run() method after fork.
- heartbeat()¶
Loop that runs in the background during replication. It periodically logs progress.
- is_healthy()¶
Check whether our set of local devices remains the same.
If devices have been added or removed, then we return False here so that we can kill off any worker processes and then distribute the new set of local devices across a new set of workers so that all devices are, once again, being worked on.
This function may also cause recon stats to be updated.
- Returns:
False if any local devices have been added or removed, True otherwise
- load_object_ring(policy)¶
Make sure the policy’s rings are loaded.
- Parameters:
policy – the StoragePolicy instance
- Returns:
appropriate ring object
- post_multiprocess_run()¶
Override this to do something after running using multiple worker processes. This method is called in the parent process.
This is probably only useful for run-once mode since there is no “after running” in run-forever mode.
- replicate(override_devices=None, override_partitions=None, override_policies=None, start_time=None)¶
Run a replication pass
- revert(job)¶
High-level method that replicates a single partition that doesn’t belong on this node.
- Parameters:
job – a dict containing info about the partition to be replicated
- rsync(node, job, suffixes)¶
Uses rsync to implement the sync method. This was the first sync method in Swift.
- run_forever(multiprocess_worker_index=None, override_devices=None, *args, **kwargs)¶
Override this to run forever
- run_once(multiprocess_worker_index=None, have_overrides=False, *args, **kwargs)¶
Override this to run the script once
- ssync(node, job, suffixes, remote_check_objs=None)¶
- stats_line()¶
Logs various stats for the currently running replication pass.
- sync(node, job, suffixes, *args, **kwargs)¶
Synchronize local suffix directories from a partition with a remote node.
- Parameters:
node – the “dev” entry for the remote node to sync with
job – information about the partition being synced
suffixes – a list of suffixes which need to be pushed
- Returns:
boolean and dictionary, boolean indicating success or failure
- property total_stats¶
- update(job)¶
High-level method that replicates a single partition.
- Parameters:
job – a dict containing info about the partition to be replicated
- update_recon(total, end_time, override_devices)¶
- class swift.obj.replicator.Stats(attempted=0, failure=0, hashmatch=0, remove=0, rsync=0, success=0, suffix_count=0, suffix_hash=0, suffix_sync=0, failure_nodes=None)¶
Bases:
object
- add_failure_stats(failures)¶
Note the failure of one or more devices.
- Parameters:
failures – a list of (ip, device-name) pairs that failed
- fields = ['attempted', 'failure', 'hashmatch', 'remove', 'rsync', 'success', 'suffix_count', 'suffix_hash', 'suffix_sync', 'failure_nodes']¶
- classmethod from_recon(dct)¶
- to_recon()¶
- swift.obj.replicator.main()¶
- class swift.obj.ssync_sender.Sender(daemon, node, job, suffixes, remote_check_objs=None, include_non_durable=False, max_objects=0)¶
Bases:
object
Sends SSYNC requests to the object server.
These requests are eventually handled by ssync_receiver and full documentation about the process is there.
- connect()¶
Establishes a connection and starts an SSYNC request with the object server.
- disconnect(connection)¶
Closes down the connection to the object server once done with the SSYNC request.
- missing_check(connection, response)¶
Handles the sender-side of the MISSING_CHECK step of an SSYNC request.
Full documentation of this can be found at Receiver.missing_check().
- send_delete(connection, url_path, timestamp)¶
Sends a DELETE subrequest with the given information.
- send_post(connection, url_path, df)¶
- send_put(connection, url_path, df, durable=True)¶
Sends a PUT subrequest for the url_path using the source df (DiskFile) and content_length.
- send_subrequest(connection, method, url_path, headers, df)¶
- updates(connection, response, send_map)¶
Handles the sender-side of the UPDATES step of an SSYNC request.
Full documentation of this can be found at Receiver.updates().
- class swift.obj.ssync_sender.SsyncBufferedHTTPConnection(host, port=None, timeout=<object object>, source_address=None)¶
Bases:
BufferedHTTPConnection
- response_class¶
alias of SsyncBufferedHTTPResponse
- class swift.obj.ssync_sender.SsyncBufferedHTTPResponse(*args, **kwargs)¶
Bases:
BufferedHTTPResponse, object
- readline(size=1024)¶
Reads a line from the SSYNC response body.
httplib has no readline and will block on read(x) until x is read, so we have to do the work ourselves. A bit of this is taken from Python’s httplib itself.
- swift.obj.ssync_sender.decode_wanted(parts)¶
Parse missing_check line parts to determine which parts of local diskfile were wanted by the receiver.
The encoder for parts is encode_wanted()
- swift.obj.ssync_sender.encode_missing(object_hash, ts_data, ts_meta=None, ts_ctype=None, **kwargs)¶
Returns a string representing the object hash, its data file timestamp, the delta forwards to its metafile and content-type timestamps, if non-zero, and its durability, in the form:
<hash> <ts_data> [m:<hex delta to ts_meta>[,t:<hex delta to ts_ctype>] [,durable:False]
The decoder for this line is decode_missing()
- class swift.obj.ssync_receiver.Receiver(app, request)¶
Bases:
object
Handles incoming SSYNC requests to the object server.
These requests come from the object-replicator daemon that uses ssync_sender.
The number of concurrent SSYNC requests is restricted by use of a replication_semaphore and can be configured with the object-server.conf [object-server] replication_concurrency setting.
An SSYNC request is really just an HTTP conduit for sender/receiver replication communication. The overall SSYNC request should always succeed, but it will contain multiple requests within its request and response bodies. This “hack” is done so that replication concurrency can be managed.
The general process inside an SSYNC request is:
1. Initialize the request: Basic request validation, mount check, acquire semaphore lock, etc.
2. Missing check: Sender sends the hashes and timestamps of the object information it can send, receiver sends back the hashes it wants (doesn’t have or has an older timestamp).
3. Updates: Sender sends the object information requested.
4. Close down: Release semaphore lock, etc.
- initialize_request()¶
Basic validation of request and mount check.
This function will be called before attempting to acquire a replication semaphore lock, so contains only quick checks.
- missing_check()¶
Handles the receiver-side of the MISSING_CHECK step of an SSYNC request.
Receives a list of hashes and timestamps of object information the sender can provide and responds with a list of hashes desired, either because they’re missing or have an older timestamp locally.
The process is generally:
1. Sender sends :MISSING_CHECK: START and begins sending hash timestamp lines.
2. Receiver gets :MISSING_CHECK: START and begins reading the hash timestamp lines, collecting the hashes of those it desires.
3. Sender sends :MISSING_CHECK: END.
4. Receiver gets :MISSING_CHECK: END, responds with :MISSING_CHECK: START, followed by the list of <wanted_hash> specifiers it collected as being wanted (one per line), :MISSING_CHECK: END, and flushes any buffers. Each <wanted_hash> specifier has the form <hash>[ <parts>] where <parts> is a string containing characters ‘d’ and/or ‘m’ indicating that only data or meta part of object respectively is required to be sync’d.
5. Sender gets :MISSING_CHECK: START and reads the list of hashes desired by the receiver until reading :MISSING_CHECK: END.
The collection and then response is so the sender doesn’t have to read while it writes to ensure network buffers don’t fill up and block everything.
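The MISSING_CHECK exchange can be walked through with a small in-memory simulation. This is an illustrative sketch only: the function names are hypothetical, the `<parts>` suffix is omitted, and timestamps are assumed to be fixed-width strings so that plain string comparison orders them correctly.

```python
START, END = ":MISSING_CHECK: START", ":MISSING_CHECK: END"


def sender_lines(offered):
    """Yield what the sender writes: START, one line per object, END."""
    yield START
    for object_hash, ts_data in sorted(offered.items()):
        yield "%s %s" % (object_hash, ts_data)
    yield END


def receiver_response(lines, local):
    """Read every sender line first, then build the whole response."""
    lines = iter(lines)
    if next(lines) != START:
        raise ValueError("expected %s" % START)
    wanted = []
    for line in lines:
        if line == END:
            break
        object_hash, ts_data = line.split()
        # Want the object if we have nothing, or only an older timestamp.
        if local.get(object_hash, "") < ts_data:
            wanted.append(object_hash)
    # The response is only assembled after the sender's END was read.
    return [START] + wanted + [END]


offered = {"aaa": "1401811134.00000", "bbb": "1401811135.00000"}
local = {"aaa": "1401811134.00000"}
print(receiver_response(sender_lines(offered), local))
```

Collecting the wanted hashes before answering mirrors the point made above: neither side has to read and write at the same time.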
- updates()¶
Handles the UPDATES step of an SSYNC request.
Receives a set of PUT and DELETE subrequests that will be routed to the object server itself for processing. These contain the information requested by the MISSING_CHECK step.
The PUT and DELETE subrequests are formatted pretty much exactly like regular HTTP requests, excepting the HTTP version on the first request line.
The process is generally:
1. Sender sends :UPDATES: START and begins sending the PUT and DELETE subrequests.
2. Receiver gets :UPDATES: START and begins routing the subrequests to the object server.
3. Sender sends :UPDATES: END.
4. Receiver gets :UPDATES: END and sends :UPDATES: START and :UPDATES: END (assuming no errors).
5. Sender gets :UPDATES: START and :UPDATES: END.
If too many subrequests fail, as configured by replication_failure_threshold and replication_failure_ratio, the receiver will hang up the request early so as to not waste any more time.
At step 4, the receiver will send back an error if there were any failures (that didn’t cause a hangup due to the above thresholds) so the sender knows the whole was not entirely a success. This is so the sender knows if it can remove an out of place partition, for example.
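The early hang-up decision can be pictured as a check on two counters. The text above names the two settings but does not say how they combine, so the combination below (count threshold first, then failure-to-success ratio) is an assumption, and `should_hang_up` is a hypothetical helper rather than part of Swift.

```python
def should_hang_up(failures, successes, threshold, ratio):
    """Return True when both the failure count and ratio are too high."""
    if failures < threshold:
        # Too few failures so far to justify abandoning the request.
        return False
    # With no successes at all the ratio is treated as unbounded.
    return successes == 0 or failures / successes > ratio


# Arbitrary illustrative values for the two settings.
print(should_hang_up(failures=150, successes=100, threshold=100, ratio=1.0))
print(should_hang_up(failures=100, successes=200, threshold=100, ratio=1.0))
```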
- exception swift.obj.ssync_receiver.SsyncClientDisconnected¶
Bases:
Exception
- swift.obj.ssync_receiver.decode_missing(line)¶
Parse a string of the form generated by encode_missing() and return a dict with keys object_hash, ts_data, ts_meta, ts_ctype, durable.
The encoder for this line is encode_missing()
- swift.obj.ssync_receiver.encode_wanted(remote, local)¶
Compare remote and local results and generate a wanted line.
- Parameters:
remote – a dict, with ts_data and ts_meta keys in the form returned by decode_missing()
local – a dict, possibly empty, with ts_data and ts_meta keys in the form returned by Receiver._check_local()
The decoder for this line is decode_wanted()
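A rough sketch of how a wanted line might be derived from the two dicts follows. The comparison rules are assumptions made for illustration (take everything when nothing is held locally, otherwise request each part whose remote timestamp is newer); `wanted_parts` and `wanted_line` are hypothetical names, and timestamps are plain floats here.

```python
def wanted_parts(remote, local):
    """Return '', 'd', 'm' or 'dm' for the parts worth requesting."""
    if not local:
        # Nothing held locally: ask for both data and metadata.
        return "dm"
    parts = ""
    if remote["ts_data"] > local["ts_data"]:
        parts += "d"
    # Assumption: a missing local ts_meta falls back to the data timestamp.
    if remote["ts_meta"] > local.get("ts_meta", local["ts_data"]):
        parts += "m"
    return parts


def wanted_line(object_hash, remote, local):
    """Return '<hash> <parts>', or None when nothing is wanted."""
    parts = wanted_parts(remote, local)
    return "%s %s" % (object_hash, parts) if parts else None


remote = {"ts_data": 2.0, "ts_meta": 3.0}
print(wanted_line("485dc017", remote, {}))
print(wanted_line("485dc017", remote, {"ts_data": 2.0, "ts_meta": 2.0}))
```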
Object Reconstructor¶
- class swift.obj.reconstructor.ObjectReconstructor(conf, logger=None)¶
Bases:
Daemon
Reconstruct objects using erasure code. And also rebalance EC Fragment Archive objects off handoff nodes.
Encapsulates most logic and data needed by the object reconstruction process. Each call to .reconstruct() performs one pass. It’s up to the caller to do this in a loop.
- aggregate_recon_update()¶
Aggregate per-disk rcache updates from child workers.
- build_reconstruction_jobs(part_info)¶
Helper function for collect_jobs to build jobs for reconstruction using EC style storage policy
N.B. If this function ever returns an empty list of jobs the entire partition will be deleted.
- check_ring(object_ring)¶
Check to see if the ring has been updated
- Parameters:
object_ring – the ring to check
- Returns:
boolean indicating whether or not the ring has changed
- collect_parts(override_devices=None, override_partitions=None)¶
Helper for getting partitions in the top level reconstructor
In handoffs_only mode primary partitions will not be included in the returned (possibly empty) list.
- delete_partition(path)¶
- delete_reverted_objs(job, objects)¶
For EC we can potentially revert only some of a partition so we’ll delete reverted objects here. Note that we delete the fragment index of the file we sent to the remote node.
- Parameters:
job – the job being processed
objects – a dict of objects to be deleted, each entry maps hash=>timestamp
- detect_lockups()¶
In testing, the pool.waitall() call very occasionally failed to return. This is an attempt to make sure the reconstructor finishes its reconstruction pass in some eventuality.
- final_recon_dump(total, override_devices=None, **kwargs)¶
Add stats for this worker’s run to recon cache.
When in worker mode (per_disk_stats == True) this worker’s stats are added per device instead of in the top level keys (aggregation is serialized in the parent process).
- Parameters:
total – the runtime of cycle in minutes
override_devices – (optional) list of devices that are being reconstructed
- get_local_devices()¶
Returns a set of all local devices in all EC policies.
- get_policy2devices()¶
- get_suffix_delta(local_suff, local_index, remote_suff, remote_index)¶
Compare the local suffix hashes with the remote suffix hashes for the given local and remote fragment indexes. Return those suffixes which should be synced.
- Parameters:
local_suff – the local suffix hashes (from _get_hashes)
local_index – the local fragment index for the job
remote_suff – the remote suffix hashes (from remote REPLICATE request)
remote_index – the remote fragment index for the job
- Returns:
a list of strings, the suffix dirs to sync
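The comparison can be sketched as follows. The shape of the suffix hashes is an assumption made for this example (a dict mapping each suffix to a dict of fragment index to hash), and `suffix_delta` is a hypothetical stand-in that only compares the per-fragment-index hashes.

```python
def suffix_delta(local_suff, local_index, remote_suff, remote_index):
    """Return the suffixes whose hashes differ between local and remote."""
    to_sync = []
    for suffix, local_hashes in local_suff.items():
        # A suffix the remote has never seen compares against an empty dict.
        remote_hashes = remote_suff.get(suffix, {})
        if local_hashes.get(local_index) != remote_hashes.get(remote_index):
            to_sync.append(suffix)
    return to_sync


local = {"179": {1: "h1"}, "abc": {1: "h2"}, "def": {1: "h3"}}
remote = {"179": {2: "h1"}, "abc": {2: "other"}}
print(suffix_delta(local, 1, remote, 2))
```

Only suffixes present locally are considered, since those are the ones this node is able to push.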
- get_worker_args(once=False, **kwargs)¶
Take the set of all local devices for this node from all the EC policies rings, and distribute them evenly into the number of workers to be spawned according to the configured worker count. If devices is given in kwargs then distribute only those devices.
- Parameters:
once – False if the worker(s) will be daemonized, True if the worker(s) will be run once
kwargs – optional overrides from the command line
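One way to distribute devices evenly across workers is round-robin slicing, sketched below. The function names are hypothetical and the `override_devices` key in the yielded dicts is an assumption about what each worker would receive; the real method may pass additional kwargs.

```python
def distribute_evenly(devices, worker_count):
    """Split devices into at most worker_count near-equal groups."""
    devices = sorted(devices)
    # Never create more groups than there are devices.
    count = min(worker_count, len(devices))
    # Round-robin slicing keeps group sizes within one of each other.
    return [devices[i::count] for i in range(count)]


def worker_args(local_devices, worker_count):
    """Yield one kwargs dict per worker."""
    for group in distribute_evenly(local_devices, worker_count):
        yield {"override_devices": group}


print(list(worker_args({"d1", "d2", "d3"}, 2)))
```

With no devices or a worker count of zero the generator yields nothing, which corresponds to the "empty iterable" case in which a daemon would run inline.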
- heartbeat()¶
Loop that runs in the background during reconstruction. It periodically logs progress.
- is_healthy()¶
Check whether rings have changed, and maybe do a recon update.
- Returns:
False if any ec ring has changed
- kill_coros()¶
Utility function that kills all coroutines currently running.
- load_object_ring(policy)¶
Make sure the policy’s rings are loaded.
- Parameters:
policy – the StoragePolicy instance
- Returns:
appropriate ring object
- make_rebuilt_fragment_iter(responses, path, policy, frag_index)¶
Turn a set of connections from backend object servers into a generator that yields up the rebuilt fragment archive for frag_index.
- post_multiprocess_run()¶
Override this to do something after running using multiple worker processes. This method is called in the parent process.
This is probably only useful for run-once mode since there is no “after running” in run-forever mode.
- process_job(job)¶
Sync the local partition with the remote node(s) according to the parameters of the job. For primary nodes, the SYNC job type will define both left and right hand sync_to nodes to ssync with as defined by this primary nodes index in the node list based on the fragment index found in the partition. For non-primary nodes (either handoff revert, or rebalance) the REVERT job will define a single node in sync_to which is the proper/new home for the fragment index.
N.B. because ring rebalancing can be time consuming and handoff nodes’ fragment indexes do not have a stable order, it’s possible to have more than one REVERT job for a partition, and in some rare failure conditions there may also be a SYNC job for the same partition. Each one will be processed separately because each job will define a separate list of node(s) to ‘sync_to’.
- Parameters:
job – the job dict, with the keys defined in _get_job_info
- reconstruct(**kwargs)¶
Run a reconstruction pass
- reconstruct_fa(job, node, df)¶
Reconstructs a fragment archive. This method is called from ssync after a remote node responds that it is missing this object. The local diskfile is opened to provide metadata, but to reconstruct the missing fragment archive we must connect to multiple object servers.
- Parameters:
job – job from ssync_sender.
node – node to which we’re rebuilding.
df – an instance of BaseDiskFile.
- Returns:
a DiskFile like class for use by ssync.
- Raises:
DiskFileQuarantined – if the fragment archive cannot be reconstructed and has as a result been quarantined.
DiskFileError – if the fragment archive cannot be reconstructed.
- run_forever(multiprocess_worker_index=None, *args, **kwargs)¶
Override this to run forever
- run_once(multiprocess_worker_index=None, *args, **kwargs)¶
Override this to run the script once
- stats_line()¶
Logs various stats for the currently running reconstruction pass.
- class swift.obj.reconstructor.RebuildingECDiskFileStream(datafile_metadata, frag_index, rebuilt_fragment_iter)¶
Bases:
object
This class wraps the reconstructed fragment archive data and metadata in the DiskFile interface for ssync.
- property content_length¶
- get_datafile_metadata()¶
- get_metadata()¶
- reader()¶
- class swift.obj.reconstructor.ResponseBucket¶
Bases:
object
Encapsulates fragment GET response data related to a single timestamp.
- swift.obj.reconstructor.main()¶
Object Server¶
Object Server for Swift
- class swift.obj.server.EventletPlungerString¶
Bases:
bytes
Eventlet won’t send headers until it’s accumulated at least eventlet.wsgi.MINIMUM_CHUNK_SIZE bytes or the app iter is exhausted. If we want to send the response body behind Eventlet’s back, perhaps with some zero-copy wizardry, then we have to unclog the plumbing in eventlet.wsgi to force the headers out, so we use an EventletPlungerString to empty out all of Eventlet’s buffers.
- class swift.obj.server.ObjectController(conf, logger=None)¶
Bases:
BaseStorageServer
Implements the WSGI application for the Swift Object Server.
- DELETE(request)¶
Handle HTTP DELETE requests for the Swift Object Server.
- GET(request)¶
Handle HTTP GET requests for the Swift Object Server.
- HEAD(request)¶
Handle HTTP HEAD requests for the Swift Object Server.
- POST(request)¶
Handle HTTP POST requests for the Swift Object Server.
- PUT(request)¶
Handle HTTP PUT requests for the Swift Object Server.
- REPLICATE(request)¶
Handle REPLICATE requests for the Swift Object Server. This is used by the object replicator to get hashes for directories.
Note that the name REPLICATE is preserved for historical reasons as this verb really just returns the hashes information for the specified parameters and is used, for example, by both replication and EC.
- SSYNC(request)¶
- async_update(op, account, container, obj, host, partition, contdevice, headers_out, objdevice, policy, logger_thread_locals=None, container_path=None, db_state=None)¶
Sends or saves an async update.
- Parameters:
op – operation performed (ex: ‘PUT’, or ‘DELETE’)
account – account name for the object
container – container name for the object
obj – object name
host – host that the container is on
partition – partition that the container is on
contdevice – device name that the container is on
headers_out – dictionary of headers to send in the container request
objdevice – device name that the object is in
policy – the associated BaseStoragePolicy instance
logger_thread_locals – The thread local values to be set on the self.logger to retain transaction logging information.
container_path – optional path in the form <account/container> to which the update should be sent. If given this path will be used instead of constructing a path from the account and container params.
db_state – The current database state of the container as supplied to us by the proxy.
- container_update(op, account, container, obj, request, headers_out, objdevice, policy)¶
Update the container when objects are updated.
- Parameters:
op – operation performed (ex: ‘PUT’, or ‘DELETE’)
account – account name for the object
container – container name for the object
obj – object name
request – the original request object driving the update
headers_out – dictionary of headers to send in the container request(s)
objdevice – device name that the object is in
policy – the BaseStoragePolicy instance
- delete_at_update(op, delete_at, account, container, obj, request, objdevice, policy, extra_headers=None)¶
Update the expiring objects container when objects are updated.
- Parameters:
op – operation performed (ex: ‘PUT’, or ‘DELETE’)
delete_at – scheduled delete in UNIX seconds, int
account – account name for the object
container – container name for the object
obj – object name
request – the original request driving the update
objdevice – device name that the object is in
policy – the BaseStoragePolicy instance (used for tmp dir)
extra_headers – dict of additional headers for the update
- get_diskfile(device, partition, account, container, obj, policy, **kwargs)¶
Utility method for instantiating a DiskFile object supporting a given REST API.
An implementation of the object server that wants to use a different DiskFile class would simply override this method to provide that behavior.
- server_type = 'object-server'¶
- setup(conf)¶
Implementation specific setup. This method is called at the very end by the constructor to allow a specific implementation to modify existing attributes or add its own attributes.
- Parameters:
conf – WSGI configuration parameter
- swift.obj.server.app_factory(global_conf, **local_conf)¶
paste.deploy app factory for creating WSGI object server apps
- swift.obj.server.drain(file_like, read_size, timeout)¶
Read and discard any bytes from file_like.
- Parameters:
file_like – file-like object to read from
read_size – how big a chunk to read at a time
timeout – how long to wait for a read (use None for no timeout)
- Raises:
ChunkReadTimeout – if no chunk was read in time
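The read-and-discard loop can be sketched in a few lines. This simplified version is an illustration, not the Swift function: `drain_sketch` is a hypothetical name, it omits the timeout handling entirely, and it returns the number of bytes discarded purely so the example has something to show.

```python
import io


def drain_sketch(file_like, read_size):
    """Read and discard everything from file_like; return bytes discarded."""
    discarded = 0
    while True:
        chunk = file_like.read(read_size)
        if not chunk:
            # An empty read means the stream is exhausted.
            return discarded
        discarded += len(chunk)


print(drain_sketch(io.BytesIO(b"x" * 10000), 4096))
```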
- swift.obj.server.get_obj_name_and_placement(request)¶
Split and validate path for an object.
- Parameters:
request – a swob request
- Returns:
a tuple of path parts and storage policy
- swift.obj.server.global_conf_callback(preloaded_app_conf, global_conf)¶
Callback for swift.common.wsgi.run_wsgi during the global_conf creation so that we can add our replication_semaphore, used to limit the number of concurrent SSYNC_REQUESTS across all workers.
- Parameters:
preloaded_app_conf – The preloaded conf for the WSGI app. This conf instance will go away, so just read from it, don’t write.
global_conf – The global conf that will eventually be passed to the app_factory function later. This conf is created before the worker subprocesses are forked, so can be useful to set up semaphores, shared memory, etc.
- swift.obj.server.iter_mime_headers_and_bodies(wsgi_input, mime_boundary, read_chunk_size)¶
- swift.obj.server.main()¶
Object Updater¶
- class swift.obj.updater.BucketizedUpdateSkippingLimiter(update_iterable, logger, stats, num_buckets=1000, max_elements_per_group_per_second=50, max_deferred_elements=0, drain_until=0)¶
Bases:
object
Wrap an iterator to rate-limit updates on a per-bucket basis, where updates are mapped to buckets by hashing their destination path. If an update is rate-limited then it is placed on a deferral queue and may be sent later if the wrapped iterator is exhausted before the drain_until time is reached.
The deferral queue has constrained size and once the queue is full updates are evicted using a first-in-first-out policy. This policy is used because updates on the queue may have been made obsolete by newer updates written to disk, and this is more likely for updates that have been on the queue longest.
The iterator increments stats as follows:
The deferrals stat is incremented for each update that is rate-limited. Note that an individual update is rate-limited at most once.
The skips stat is incremented for each rate-limited update that is not eventually yielded. This includes updates that are evicted from the deferral queue and all updates that remain in the deferral queue when drain_until time is reached and the iterator terminates.
The drains stat is incremented for each rate-limited update that is eventually yielded.
Consequently, when this iterator terminates, the sum of skips and drains is equal to the number of deferrals.
- Parameters:
update_iterable – an async_pending update iterable
logger – a logger instance
stats – a SweepStats instance
num_buckets – number of buckets to divide container hashes into; the more buckets in total, the fewer containers per bucket (once a busy container slows down a bucket the whole bucket starts deferring)
max_elements_per_group_per_second – tunable, when deferring kicks in
max_deferred_elements – maximum number of deferred elements before skipping starts. Each bucket may defer updates, but once the total number of deferred updates summed across all buckets reaches this value then all buckets will skip subsequent updates.
drain_until – time at which any remaining deferred elements must be skipped and the iterator stops. Once the wrapped iterator has been exhausted, this iterator will drain deferred elements from its buckets until either all buckets have drained or this time is reached.
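The relationship between deferrals, skips and drains can be demonstrated with a toy bounded queue. This sketch is not the Swift class: `DeferralQueueSketch` is hypothetical, it models the `drain_until` deadline as a simple count of updates that can still be sent, and the order in which deferred updates are drained is an assumption.

```python
from collections import deque


class DeferralQueueSketch:
    """Bounded FIFO of deferred updates that tracks the three stats."""

    def __init__(self, max_deferred):
        self.max_deferred = max_deferred
        self.queue = deque()
        self.stats = {"deferrals": 0, "skips": 0, "drains": 0}

    def defer(self, update):
        """Queue a rate-limited update, evicting the oldest if full."""
        self.stats["deferrals"] += 1
        self.queue.append(update)
        if len(self.queue) > self.max_deferred:
            self.queue.popleft()  # first in, first out
            self.stats["skips"] += 1

    def finish(self, budget):
        """Drain up to `budget` updates, then count the rest as skipped."""
        drained = []
        while self.queue and len(drained) < budget:
            drained.append(self.queue.popleft())
            self.stats["drains"] += 1
        self.stats["skips"] += len(self.queue)
        self.queue.clear()
        return drained


q = DeferralQueueSketch(max_deferred=2)
for update in "abcd":
    q.defer(update)
print(q.finish(budget=1), q.stats)
```

Once `finish` returns, the skips and drains recorded by the sketch add up to the number of deferrals, which is the invariant stated above.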
- next()¶
- class swift.obj.updater.ObjectUpdater(conf, logger=None)¶
Bases:
Daemon
Update object information in container listings.
- get_container_ring()¶
Get the container ring. Load it, if it hasn’t been yet.
- object_sweep(device)¶
If there are async pendings on the device, walk each one and update.
- Parameters:
device – path to device
- object_update(node, part, op, obj, headers_out)¶
Perform the object update to the container
- Parameters:
node – node dictionary from the container ring
part – partition that holds the container
op – operation performed (ex: ‘PUT’ or ‘DELETE’)
obj – object name being updated
headers_out – headers to send with the update
- Returns:
a tuple of (success, node_id, redirect) where success is True if the update succeeded, node_id is the id of the node updated and redirect is either None or a tuple of (a path, a timestamp string).
- process_object_update(update_path, device, policy, update, **kwargs)¶
Process the object information to be updated and update.
- Parameters:
update_path – path to pickled object update file
device – path to device
policy – storage policy of object update
update – the un-pickled update data
kwargs – un-used keys from update_ctx
- run_forever(*args, **kwargs)¶
Run the updater continuously.
- run_once(*args, **kwargs)¶
Run the updater once.
- class swift.obj.updater.RateLimiterBucket(max_updates_per_second)¶
Bases:
EventletRateLimiter
Extends EventletRateLimiter to also maintain a deque of items that have been deferred due to rate-limiting, and to provide a comparator for sorting instances by readiness.
- class swift.obj.updater.SweepStats(errors=0, failures=0, quarantines=0, successes=0, unlinks=0, redirects=0, skips=0, deferrals=0, drains=0)¶
Bases:
object
Stats bucket for an update sweep
A measure of the rate at which updates are being rate-limited is:
deferrals / (deferrals + successes + failures - drains)
A measure of the rate at which updates are not being sent during a sweep is:
skips / (skips + successes + failures)
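The two ratios can be computed directly from the counters. In the sketch below the function names are hypothetical and the sample counts are invented for illustration; a zero denominator is mapped to 0.0 as a convenience, which the formulas above do not specify.

```python
def rate_limited_fraction(deferrals, successes, failures, drains):
    """deferrals / (deferrals + successes + failures - drains)."""
    denominator = deferrals + successes + failures - drains
    return deferrals / denominator if denominator else 0.0


def unsent_fraction(skips, successes, failures):
    """skips / (skips + successes + failures)."""
    denominator = skips + successes + failures
    return skips / denominator if denominator else 0.0


# Invented counts: 20 deferrals, of which 5 drained and 15 were skipped.
print(rate_limited_fraction(deferrals=20, successes=80, failures=5, drains=5))
print(unsent_fraction(skips=15, successes=80, failures=5))
```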
- copy()¶
- reset()¶
- since(other)¶
- swift.obj.updater.main()¶
- swift.obj.updater.random()¶
Returns x in the interval [0, 1).
- swift.obj.updater.split_update_path(update)¶
Split the account and container parts out of the async update data.
N.B. updates to shards set the container_path key while the account and container keys are always the root.
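The note above suggests a simple precedence rule, sketched here under assumptions: `split_update_path_sketch` is a hypothetical helper, the update is treated as a plain dict, and the shard account and container names in the example are made up.

```python
def split_update_path_sketch(update):
    """Return (account, container) for the destination of an update."""
    container_path = update.get("container_path")
    if container_path:
        # A shard update: the destination comes from container_path.
        account, container = container_path.split("/", 1)
    else:
        # A regular update: account and container name the root.
        account, container = update["account"], update["container"]
    return account, container


root_update = {"account": "AUTH_test", "container": "c"}
shard_update = dict(root_update, container_path=".shards_AUTH_test/c-shard-1")
print(split_update_path_sketch(root_update))
print(split_update_path_sketch(shard_update))
```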