#
# Copyright (C) 2011 Red Hat, Inc.
#
# This software is licensed to you under the GNU General Public
# License as published by the Free Software Foundation; either version
# 2 of the License (GPLv2) or (at your option) any later version.
# There is NO WARRANTY for this software, express or implied,
# including the implied warranties of MERCHANTABILITY,
# NON-INFRINGEMENT, or FITNESS FOR A PARTICULAR PURPOSE. You should
# have received a copy of GPLv2 along with this software; if not, see
# http://www.gnu.org/licenses/old-licenses/gpl-2.0.txt.
"""sync_trees - utilities for synchronising trees with rsync"""
from datetime import datetime
import logging
import os
from os import path
import sys # We use stdout for testing purposes
import shellutil
import shutil
import subprocess
import traceback
import collections
import re
import contextlib
import errno
from . import sync_config, util
# Baseline rsync options for fetching a directory. The text is split on
# whitespace into an argument list; _build_fetch_dir_rsync_params() copies
# that list and appends the per-run options to the copy.
_BASE_FETCH_DIR_PARAMS = """
-rlptDvH --delete-after --ignore-errors --progress --stats --human-readable
--timeout=18000 --partial --delay-updates
""".split()
# Rsync statistics collection
# Matches the summary block rsync prints when run with --stats.
# NOTE: the pattern is compiled without re.VERBOSE, so every newline and
# space inside the literal (including the leading newline) must appear in
# the scraped output exactly as written here.
# Each size is captured as a number plus an optional one-letter unit
# suffix (B, K, M or G).
_sync_stats_pattern = re.compile(r"""
Number of files: (?P<total_file_count>\d+)
Number of files transferred: (?P<transferred_file_count>\d+)
Total file size: (?P<total_size>[.\d]+)(?P<total_size_kind>[BKMG]?) bytes
Total transferred file size: (?P<transferred_size>[.\d]+)(?P<transferred_size_kind>[BKMG]?) bytes
Literal data: (?P<literal_size>[.\d]+)(?P<literal_size_kind>[BKMG]?) bytes
Matched data: (?P<matched_size>[.\d]+)(?P<matched_size_kind>[BKMG]?) bytes
File list size: (?P<listing_size>[.\d]+)(?P<listing_size_kind>[BKMG]?)
File list generation time: (?P<listing_creation_seconds>[.\d]+) seconds
File list transfer time: (?P<listing_transfer_seconds>[.\d]+) seconds
Total bytes sent: (?P<sent_size>[.\d]+)(?P<sent_size_kind>[BKMG]?)
Total bytes received: (?P<received_size>[.\d]+)(?P<received_size_kind>[BKMG]?)
sent [.\d]+[BKMG]? bytes\s+received [.\d]+[BKMG]? bytes\s+(?P<transfer_rate>[.\d]+)(?P<transfer_rate_kind>[BKMG]?)\s+bytes/sec
""", re.DOTALL)
# Variant of the pattern above without the two "File list ... time" lines.
# SyncStats.from_rsync_output() selects it when old_daemon is true and
# fills in the two missing *_seconds values with 0.
_old_sync_stats_pattern = re.compile(r"""
Number of files: (?P<total_file_count>\d+)
Number of files transferred: (?P<transferred_file_count>\d+)
Total file size: (?P<total_size>[.\d]+)(?P<total_size_kind>[BKMG]?) bytes
Total transferred file size: (?P<transferred_size>[.\d]+)(?P<transferred_size_kind>[BKMG]?) bytes
Literal data: (?P<literal_size>[.\d]+)(?P<literal_size_kind>[BKMG]?) bytes
Matched data: (?P<matched_size>[.\d]+)(?P<matched_size_kind>[BKMG]?) bytes
File list size: (?P<listing_size>[.\d]+)(?P<listing_size_kind>[BKMG]?)
Total bytes sent: (?P<sent_size>[.\d]+)(?P<sent_size_kind>[BKMG]?)
Total bytes received: (?P<received_size>[.\d]+)(?P<received_size_kind>[BKMG]?)
sent [.\d]+[BKMG]? bytes\s+received [.\d]+[BKMG]? bytes\s+(?P<transfer_rate>[.\d]+)(?P<transfer_rate_kind>[BKMG]?)\s+bytes/sec
""", re.DOTALL)
# Multiplier (in bytes) for each unit suffix that can follow a scraped size.
# A missing suffix (None or the empty string) means the value is in bytes.
_kind_scale = {
    None: 1,
    '': 1,
    'B': 1,
    'K': 1 << 10,
    'M': 1 << 20,
    'G': 1 << 30,
    'T': 1 << 40,
}


def _bytes_from_size_and_kind(size, kind):
    """Convert a scraped size and its unit suffix to a whole number of bytes.

    *size* is anything accepted by float(); *kind* is a key of _kind_scale.
    The scaled value is truncated to an int. An unknown *kind* raises
    KeyError.
    """
    multiplier = _kind_scale[kind]
    return int(float(size) * multiplier)
# Field names of SyncStats, in positional order. The suffix of each name
# (_count, _seconds, _bps, anything else) decides how
# SyncStats.from_rsync_output() converts the scraped text.
_sync_stats_fields = [
    "total_file_count", "transferred_file_count",
    "total_bytes", "transferred_bytes",
    "literal_bytes", "matched_bytes",
    "sent_bytes", "received_bytes",
    "transfer_bps",
    "listing_bytes", "listing_creation_seconds", "listing_transfer_seconds",
]


class SyncStats(collections.namedtuple("SyncStats", _sync_stats_fields)):
    """Numeric rsync transfer statistics that can be summed field by field."""

    def __add__(self, other):
        """Return a new SyncStats holding the pairwise sum of both operands."""
        if isinstance(other, SyncStats):
            return SyncStats(*[mine + theirs
                               for mine, theirs in zip(self, other)])
        return NotImplemented

    @classmethod
    def from_rsync_output(cls, raw_data, old_daemon=False):
        """Scrape every stats summary in *raw_data* and total the values.

        *old_daemon* selects the pattern without the "File list ... time"
        lines; the two corresponding *_seconds fields then count as 0.
        Raises ValueError when *raw_data* contains no stats summary.
        """
        pattern = _old_sync_stats_pattern if old_daemon else _sync_stats_pattern
        totals = collections.defaultdict(int)
        found_any = False
        for match in pattern.finditer(raw_data):
            found_any = True
            data = match.groupdict()
            if old_daemon:
                data["listing_creation_seconds"] = 0
                data["listing_transfer_seconds"] = 0
            for field in SyncStats._fields:
                prefix, _, suffix = field.rpartition('_')
                if suffix == "count":
                    totals[field] += int(data[field])
                elif suffix == "seconds":
                    totals[field] += float(data[field])
                elif suffix == "bps":
                    # Rates are captured as <prefix>_rate / <prefix>_rate_kind
                    rate = data[prefix + "_rate"]
                    kind = data[prefix + "_rate_kind"]
                    totals[field] += _bytes_from_size_and_kind(rate, kind)
                else:
                    # Sizes are captured as <prefix>_size / <prefix>_size_kind
                    size = data[prefix + "_size"]
                    kind = data[prefix + "_size_kind"]
                    totals[field] += _bytes_from_size_and_kind(size, kind)
        if not found_any:
            raise ValueError("No rsync stats found in output")
        return cls(**totals)


# All-zero statistics, used as the starting value when accumulating stats
_null_sync_stats = SyncStats(*(0 for _ in SyncStats._fields))
# rsync remote ls scraping
# Matches one line of an rsync directory listing: the first character of the
# line is the entry kind ('d' and 'l' are the kinds the scraper handles),
# followed by a "YYYY/MM/DD HH:MM:SS" timestamp and then the entry details
# (the name, plus " -> target" for symlinks).
# Raw strings keep \d and \/ as regex escapes instead of relying on Python
# passing unknown string escapes through unchanged.
_remote_ls_entry_pattern = re.compile(
    r"^(?P<entry_kind>.).*"
    r" (?P<mtime>\d\d\d\d\/\d\d\/\d\d \d\d:\d\d:\d\d)"
    r" (?P<entry_details>.*)$", re.MULTILINE)
class BaseSyncCommand(object):
    """Base class for sync operations that drive rsync as a subprocess.

    Subclasses set CONFIG_TYPE to a configuration class and implement
    _do_transfer(), which must return a (result, SyncStats) pair. The
    validated configuration values are copied onto the instance, so they
    are read as plain attributes (self.local_path, self.dry_run_only, ...).
    """

    # Result codes returned by run_sync() and fetch_dir()
    SYNC_UP_TO_DATE = "SYNC_UP_TO_DATE"
    SYNC_COMPLETED = "SYNC_COMPLETED"
    SYNC_PARTIAL = "SYNC_PARTIAL"
    SYNC_FAILED = "SYNC_FAILED"
    SYNC_DISABLED = "SYNC_DISABLED"
    # Appended to the result code of a dry run
    DRY_RUN_SUFFIX = "_DRY_RUN"
    CONFIG_TYPE = None

    def __init__(self, config, log_dest=None):
        """Validate *config* via CONFIG_TYPE and set up the run log.

        Raises NotImplementedError when the subclass did not set CONFIG_TYPE.
        """
        config_type = self.CONFIG_TYPE
        if config_type is None:
            raise NotImplementedError("CONFIG_TYPE not set by subclass")
        config_data = config_type(config)
        config_data.validate()
        self.__dict__.update(config_data.config)
        self._init_run_log(log_dest)

    def _init_run_log(self, log_dest):
        """Prepare the run log.

        *log_dest* is None (logging disabled), a file name (opened for
        writing) or an already open file-like object.
        """
        self._run_log_indent_level = 0
        if log_dest is None:
            self._run_log_file = None
        elif isinstance(log_dest, basestring):
            # Use line buffered output by default
            self._run_log_file = open(log_dest, 'w', 1)
        else:
            self._run_log_file = log_dest
        self._update_run_log("Log initialised: {0} {1}",
                             type(self).__name__,
                             util.__version__)

    @contextlib.contextmanager
    def _indent_run_log(self, level=None):
        """Context manager that changes the log indent level temporarily.

        Without *level* the indent is increased by one. The previous level
        is restored on exit, even if the body raises.
        """
        old_level = self._run_log_indent_level
        if level is None:
            level = old_level + 1
        self._run_log_indent_level = level
        try:
            yield
        finally:
            self._run_log_indent_level = old_level

    def _update_run_log(self, _fmt, *args, **kwds):
        """Write one message to the run log at the current indent level.

        *_fmt* is passed through str.format() only when format arguments
        are supplied, so raw text (e.g. captured shell output containing
        braces) can be logged by passing it alone.
        """
        if self._run_log_file is None:
            return
        fmt = (" " * self._run_log_indent_level) + _fmt
        # Keyword-only calls must be formatted too, not just positional ones
        if args or kwds:
            msg = fmt.format(*args, **kwds)
        else:
            msg = fmt
        self._run_log_file.write(msg.rstrip() + '\n')

    @contextlib.contextmanager
    def _flush_run_log(self):
        """Context manager that flushes the run log (if any) on exit."""
        if self._run_log_file is None:
            yield
        else:
            try:
                yield
            finally:
                self._run_log_file.flush()

    def _run_shell_command(self, cmd):
        """Run *cmd* (an argument list) and log its output line by line.

        stderr is merged into stdout. Returns (return_code, output_text).
        """
        shell_output = []
        with self._indent_run_log(0):
            self._update_run_log("_"*75)
            self._update_run_log("Getting shell output for:\n\n {0}\n\n", cmd)
            proc = subprocess.Popen(cmd, stdout=subprocess.PIPE,
                                    stderr=subprocess.STDOUT)
            for line in proc.stdout:
                shell_output.append(line)
                self._update_run_log(line)
            # Release the pipe explicitly instead of waiting for the GC
            proc.stdout.close()
            result = proc.wait()
            self._update_run_log("^"*75)
        return result, "".join(shell_output)

    def _consolidate_tree(self):
        """Run "hardlink -v" over the local tree and log the outcome.

        Failures are logged, never raised.
        """
        local_path = self.local_path
        hardlink_cmd = ["hardlink", "-v", self.local_path]
        try:
            return_code, __ = self._run_shell_command(hardlink_cmd)
        except Exception:
            # Best effort only, but let KeyboardInterrupt/SystemExit through
            self._update_run_log(traceback.format_exc())
            result_msg = "Exception while hard linking duplicates in {0!r}"
        else:
            if return_code == 0:
                result_msg = "Successfully hard linked duplicates in {0!r}"
            else:
                result_msg = "Failed to hard link duplicates in {0!r}"
        self._update_run_log(result_msg, local_path)

    def _send_amqp_message(self, result, sync_stats):
        """Log the notification that would be sent for this transfer.

        No message is actually sent; only the run log is updated.
        """
        details = "{0!r} transfer {1!r} -> {2!r}: {3}, {4}".format(
            self.tree_name, self.remote_path, self.local_path,
            result, sync_stats)
        if self.dry_run_only:
            msg = "Not sending AMQP message for test run ({0})"
        else:
            msg = "AMQP support not yet implemented ({0})"
        self._update_run_log(msg, details)

    def _run_sync_inner(self):
        """Perform the sync and return (result, start, finish, stats).

        Both timestamps are naive UTC datetimes. A disabled tree returns
        SYNC_DISABLED immediately with identical start and finish times.
        """
        start_time = datetime.utcnow()
        if not self.enabled:
            self._update_run_log("Ignoring sync request for {0!r} at {1}",
                                 self.tree_name, start_time)
            return self.SYNC_DISABLED, start_time, start_time, _null_sync_stats
        self._update_run_log("Syncing tree {0!r} at {1}",
                             self.tree_name, start_time)
        with self._indent_run_log():
            if self.dry_run_only:
                self._update_run_log("Performing test run (no file transfer)")
            elif not path.exists(self.local_path):
                self._update_run_log(
                    "Local path {0!r} does not exist, creating it",
                    self.local_path)
                try:
                    # 0o755 is the octal spelling accepted by 2.6+ and 3.x
                    os.makedirs(self.local_path, 0o755)
                except OSError as ex:
                    if ex.errno != errno.EEXIST:
                        raise
                    self._update_run_log(
                        " Destination directory already created by another process")
            result, sync_stats = self._do_transfer()
            if sync_stats.transferred_file_count > 0:
                self._update_run_log(
                    "Consolidating downloaded data with hard links")
                with self._indent_run_log():
                    self._consolidate_tree()
            self._update_run_log("Sending AMQP message")
            with self._indent_run_log():
                self._send_amqp_message(result, sync_stats)
        finish_time = datetime.utcnow()
        if self.dry_run_only:
            result += self.DRY_RUN_SUFFIX
        msg = "Completed sync of {0!r} at {1} (Result: {2}, Duration: {3})"
        self._update_run_log(msg, self.tree_name,
                             finish_time, result, finish_time - start_time)
        return result, start_time, finish_time, sync_stats

    def run_sync(self):
        """Execute the full synchronisation task

        Ensures the sync log is flushed before returning
        """
        with self._flush_run_log():
            return self._run_sync_inner()

    def _build_common_rsync_params(self):
        """Construct rsync parameters common to all operations"""
        params = []
        if self.old_remote_daemon:
            params.append("--no-implied-dirs")
        if self.rsync_port:
            params.append("--port={0}".format(self.rsync_port))
        return params

    def _build_fetch_dir_rsync_params(self, remote_source_path, local_dest_path,
                                      local_seed_paths=()):
        """Construct rsync parameters to fetch a remote directory"""
        params = _BASE_FETCH_DIR_PARAMS[:]
        params.extend(self._build_common_rsync_params())
        if self.dry_run_only:
            params.append("-n")
        if self.bandwidth_limit:
            params.append("--bwlimit={0}".format(self.bandwidth_limit))
        # Add sync filters
        for rsync_filter in self.sync_filters:
            params.append("--filter={0}".format(rsync_filter))
        # Add exclude filters
        for excluded_file in self.exclude_from_sync:
            params.append("--exclude={0}".format(excluded_file))
        # Protect directories from deletion if they contain a file called PROTECTED
        for dir_info in shellutil.filtered_walk(local_dest_path,
                                                file_pattern='PROTECTED'):
            if dir_info.files:
                rel_path = dir_info.path
                if os.path.isabs(rel_path):
                    rel_path = os.path.relpath(rel_path, local_dest_path)
                params.append("--filter=protect {0}".format(rel_path))
        for seed_path in local_seed_paths:
            params.append("--link-dest={0}".format(seed_path))
        # Source and destination must come last
        params.append(remote_source_path)
        params.append(local_dest_path)
        return params

    def _scrape_fetch_dir_rsync_stats(self, data):
        """Parse rsync stats from *data*; raise RuntimeError if absent."""
        try:
            return SyncStats.from_rsync_output(data, self.old_remote_daemon)
        except ValueError:
            self._update_run_log("No stats data found in rsync output")
            raise RuntimeError("No stats data found in rsync output")

    def _fetch_dir_complete(self, result, remote_source_path, local_dest_path):
        """Hook called after rsync exits with code 0 or 23.

        Subclasses may override it to adjust the nominal *result* or to
        take follow-up actions. The default returns *result* unchanged.
        """
        return result

    def fetch_dir(self, remote_source_path, local_dest_path, local_seed_paths=()):
        """Fetch a single directory from the remote server

        Returns (result, SyncStats). An rsync exit code of 0 or 23 counts as
        a (possibly partial) success; any other exit code, or an exception
        while running rsync, yields SYNC_FAILED with all-zero stats.
        """
        params = self._build_fetch_dir_rsync_params(remote_source_path,
                                                    local_dest_path,
                                                    local_seed_paths)
        rsync_fetch_command = ["rsync"] + params
        rsync_stats = _null_sync_stats
        self._update_run_log("Downloading {0!r} -> {1!r}",
                             remote_source_path, local_dest_path)
        for seed_path in local_seed_paths:
            self._update_run_log("Using {0!r} as local seed data", seed_path)
        if not self.dry_run_only:
            # Remove any previously synchronised files and symlinks that
            # have been changed to directories on the source server
            if (os.path.lexists(local_dest_path) and
                    (os.path.islink(local_dest_path) or
                     not os.path.isdir(local_dest_path))):
                self._update_run_log(
                    "Unlinking {0!r} (replacing with directory)",
                    local_dest_path)
                os.unlink(local_dest_path)
            # Ensure the full path to the destination directory exists locally
            if not os.path.lexists(local_dest_path):
                self._update_run_log("Creating destination directory {0!r}",
                                     local_dest_path)
                try:
                    os.makedirs(local_dest_path)
                except OSError as ex:
                    if ex.errno != errno.EEXIST:
                        raise
                    self._update_run_log(
                        " Destination directory already created by another process")
        with self._indent_run_log():
            try:
                return_code, captured = self._run_shell_command(rsync_fetch_command)
            except Exception:
                self._update_run_log(traceback.format_exc())
                result_msg = "Exception while updating {0!r} from {1!r}"
                # Without this assignment the return below would raise
                # UnboundLocalError and hide the original problem
                result = self.SYNC_FAILED
            else:
                if return_code in (0, 23):
                    with self._indent_run_log():
                        rsync_stats = self._scrape_fetch_dir_rsync_stats(captured)
                        self._update_run_log("Retrieved rsync stats:")
                        with self._indent_run_log():
                            for field, value in zip(rsync_stats._fields, rsync_stats):
                                self._update_run_log("{0}={1}", field, value)
                    if return_code == 23:
                        result_msg = "Partially updated {0!r} from {1!r}"
                        result = self.SYNC_PARTIAL
                    elif rsync_stats.transferred_file_count == 0:
                        result_msg = ("{0!r} already up to date relative to {1!r} "
                                      "(or all updates were found in seed directory)")
                        result = self.SYNC_UP_TO_DATE
                    else:
                        result_msg = "Successfully updated {0!r} from {1!r}"
                        result = self.SYNC_COMPLETED
                    # We give subclasses a chance to second guess the nominal
                    # result as well as taking other actions
                    result = self._fetch_dir_complete(result, remote_source_path,
                                                      local_dest_path)
                else:
                    result_msg = "Non-zero return code (%d) updating {0!r} from {1!r}" % return_code
                    result = self.SYNC_FAILED
            self._update_run_log(result_msg, local_dest_path, remote_source_path)
        return result, rsync_stats
class SyncTree(BaseSyncCommand):
    """Sync the contents of a directory"""
    CONFIG_TYPE = sync_config.TreeSyncConfig

    def _do_transfer(self):
        """Fetch the configured remote path into the configured local path."""
        source_url = "rsync://{0}{1}".format(self.remote_server,
                                             self.remote_path)
        destination = self.local_path
        return self.fetch_dir(source_url, destination)
class SyncVersionedTree(BaseSyncCommand):
    """Sync the contents of a directory containing multiple versions of a tree"""
    CONFIG_TYPE = sync_config.VersionedSyncConfig

    def _build_remote_ls_rsync_params(self, remote_ls_path):
        """Construct rsync parameters to get a remote directory listing"""
        params = ["-nl"]
        if self.old_remote_daemon:
            # The common params handles adding --no-implied-dirs, but the
            # directory listing operation also needs this option
            params.append("--old-d")
        params.extend(self._build_common_rsync_params())
        # Filter out unwanted directories
        for subdir_filter in self.listing_filters:
            params.append("--filter={0}".format(subdir_filter))
        for excluded_pattern in self.exclude_from_listing:
            params.append("--exclude={0}".format(excluded_pattern))
        params.append(remote_ls_path)
        return params

    def _scrape_rsync_remote_ls(self, data):
        """Extract directory and symlink entries from rsync listing output.

        Returns (dir_entries, link_entries): a list of (mtime, name) pairs
        for directories and a list of detail strings for symlinks. Entries
        of any other kind are logged and skipped.
        """
        dir_entries = []
        link_entries = []
        for entry in re.finditer(_remote_ls_entry_pattern, data):
            kind = entry.group("entry_kind")
            details = entry.group("entry_details")
            if kind == 'l':
                link_entries.append(details.strip())
            elif kind == 'd':
                mtime = entry.group("mtime")
                dir_entries.append((mtime, details.strip()))
            else:
                self._update_run_log("Unknown entry kind {0!r}", entry)
        self._update_run_log("Identified directories {0!r}", dir_entries)
        self._update_run_log("Identified symlinks {0!r}", link_entries)
        return dir_entries, link_entries

    def remote_ls(self, remote_ls_path):
        """List *remote_ls_path* with rsync.

        Returns (dir_entries, link_entries) as produced by
        _scrape_rsync_remote_ls(), or two empty tuples when the listing
        fails. Failures are logged, never raised.
        """
        params = self._build_remote_ls_rsync_params(remote_ls_path)
        rsync_ls_command = ["rsync"] + params
        self._update_run_log("Getting remote listing for {0!r}", remote_ls_path)
        dir_entries = link_entries = ()
        with self._indent_run_log():
            try:
                return_code, captured = self._run_shell_command(rsync_ls_command)
            except Exception:
                # Best effort only, but let KeyboardInterrupt/SystemExit through
                self._update_run_log(traceback.format_exc())
                result_msg = "Exception while listing {0!r}"
            else:
                if return_code == 0:
                    result_msg = "Successfully listed {0!r}"
                    with self._indent_run_log():
                        dir_entries, link_entries = self._scrape_rsync_remote_ls(captured)
                else:
                    # The doubled braces survive this format() call so the
                    # path can be substituted by _update_run_log() below
                    result_msg = "Non-zero return code ({0:d}) listing {{0!r}}".format(return_code)
            self._update_run_log(result_msg, remote_ls_path)
        return dir_entries, link_entries

    def _iter_local_versions(self):
        """Yield the full path of each matching local version directory."""
        local_path = self.local_path
        walk = shellutil.filtered_walk(local_path,
                                       dir_pattern=self.listing_pattern,
                                       excluded_dirs=self.exclude_from_listing,
                                       depth=0)
        # next() with a default works on Python 2.6+ and 3.x, and avoids a
        # StopIteration escaping from this generator when the walk is empty
        dir_info = next(walk, None)
        if dir_info is None:
            return
        for d in dir_info.subdirs:
            yield os.path.join(local_path, d)

    def _get_initial_seed_paths(self):
        """Return the seed paths to use for the first fetched directory."""
        # By default, there are no initial seed paths
        return ()

    def _iter_remote_versions(self, remote_dir_entries):
        """Yield (remote_source_path, local_dest_path, seed_paths) triples.

        Entries are visited in sorted (mtime, name) order.
        """
        seed_paths = self._get_initial_seed_paths()
        for mtime, version in sorted(remote_dir_entries):
            remote_version = self.remote_path + version
            remote_source_path = "rsync://{0}{1}/".format(self.remote_server,
                                                          remote_version)
            local_dest_path = os.path.join(self.local_path, version)
            yield remote_source_path, local_dest_path, seed_paths
            # If it exists, use the previous tree as the seed for the next one
            if os.path.isdir(local_dest_path):
                seed_paths = (local_dest_path,)

    def _already_retrieved(self, local_dest_path):
        """Return True if *local_dest_path* needs no further download."""
        # Local directories are overwritten by default
        return False

    def _should_retrieve(self, remote_source_path):
        """Return True if *remote_source_path* is ready to be downloaded."""
        # Remote directories are retrieved by default
        return True

    def _fix_link_entries(self, remote_link_entries):
        """Recreate remote symlinks locally where their target exists.

        Each entry is a listing detail string of the form "link -> target".
        Existing local files, links and unprotected directories that occupy
        a link's path are replaced.
        """
        # ensure local symlinks match remote ones
        self._update_run_log("Ensuring local validity of upstream symlinks")
        with self._indent_run_log():
            local_path = self.local_path
            for ls_entry in remote_link_entries:
                parsed = re.search("([^ ]*) -> ([^ ]*)$", ls_entry)
                if parsed is None:
                    # Skip the entry rather than crash on unexpected output
                    self._update_run_log(
                        "Could not parse symlink entry {0!r}, ignoring",
                        ls_entry)
                    continue
                link_path, target_path = parsed.groups()
                # If those paths are absolute, os.path.join will just ignore 'local_path'
                link_path = os.path.join(local_path, link_path)
                full_target_path = os.path.join(local_path, target_path)
                self._update_run_log("Checking symlink '{0} -> {1}'",
                                     link_path, target_path)
                # Only care about symlinks to directories that exist on the local system
                if not os.path.exists(full_target_path):
                    self._update_run_log(
                        "Local {0!r} does not exist, ignoring symlink {1!r}",
                        full_target_path, ls_entry)
                    continue
                if not os.path.isdir(full_target_path):
                    self._update_run_log(
                        "Local {0!r} is not a directory, ignoring symlink {1!r}",
                        full_target_path, ls_entry)
                    continue
                if os.path.islink(full_target_path):
                    old_target_link = os.path.join(
                        os.path.dirname(full_target_path),
                        os.readlink(full_target_path))
                    # samefile() raises OSError for a missing path, so only
                    # compare when the link location already exists
                    if (os.path.exists(link_path) and
                            os.path.samefile(old_target_link, link_path)):
                        self._update_run_log(
                            "Local {0!r} links back to {1!r}, ignoring symlink {2!r}",
                            full_target_path, link_path, ls_entry)
                        continue
                if os.path.lexists(link_path):
                    if os.path.islink(link_path):
                        old_link_target = os.readlink(link_path)
                        if old_link_target == target_path:
                            self._update_run_log(
                                "Symlink {0!r} already exists at {1!r}",
                                ls_entry, link_path)
                            continue
                        self._update_run_log(
                            "Unlinking old symlink '{0} -> {1}'",
                            link_path, old_link_target)
                        os.unlink(link_path)
                    elif os.path.isdir(link_path):
                        if os.path.exists(os.path.join(link_path, "PROTECTED")):
                            self._update_run_log(
                                "Skipping existing directory {0!r} (PROTECTED file found)",
                                link_path)
                            continue
                        self._update_run_log("Removing old directory {0!r}",
                                             link_path)
                        shutil.rmtree(link_path)
                    else:
                        self._update_run_log("Unlinking old file {0!r}",
                                             link_path)
                        os.unlink(link_path)
                self._update_run_log("Creating symlink '{0} -> {1}'",
                                     link_path, target_path)
                os.symlink(target_path, link_path)

    def _delete_old_dirs(self, remote_dir_entries):
        """Delete local versions missing remotely; return how many."""
        self._update_run_log("Checking for removal of directories on remote server")
        dirs_to_delete = self._get_old_dirs(remote_dir_entries)
        return self._delete_local_dirs(dirs_to_delete)

    def _get_old_dirs(self, remote_dir_entries):
        """Return the sorted names of local versions not listed remotely."""
        local_dirs = set(os.path.basename(d) for d in self._iter_local_versions())
        remote_dirs = set(d for mtime, d in remote_dir_entries)
        return sorted(local_dirs - remote_dirs)

    def _delete_local_dirs(self, dirs_to_delete):
        """Remove the named local directories unless they hold a PROTECTED file.

        Returns the number of directories actually deleted.
        """
        local_path = self.local_path
        deleted = 0
        with self._indent_run_log():
            for dirname in dirs_to_delete:
                dirpath = os.path.join(local_path, dirname)
                if os.path.exists(os.path.join(dirpath, "PROTECTED")):
                    self._update_run_log(
                        "Not deleting {0!r} (PROTECTED file found)", dirpath)
                    continue
                self._update_run_log("Deleting {0!r} (not on remote server)",
                                     dirpath)
                shutil.rmtree(dirpath)
                deleted += 1
        return deleted

    def _do_transfer(self):
        """List the remote versions, fetch each one and summarise the result.

        Returns (result, SyncStats), where the stats are the sum over all
        fetched directories.
        """
        sync_stats = _null_sync_stats
        remote_pattern = os.path.join(self.remote_path, self.listing_pattern)
        remote_ls_path = "rsync://{0}{1}".format(self.remote_server, remote_pattern)
        dir_entries, link_entries = self.remote_ls(remote_ls_path)
        if not dir_entries:
            self._update_run_log("No relevant directories found at {0!r}",
                                 remote_ls_path)
            return self.SYNC_FAILED, sync_stats
        # Number of fetched directories per result code
        tallies = collections.defaultdict(int)
        versions = self._iter_remote_versions(dir_entries)
        for remote_source_path, local_dest_path, local_seed_paths in versions:
            self._update_run_log("Preparing to download {0!r} -> {1!r}",
                                 remote_source_path, local_dest_path)
            if self._already_retrieved(local_dest_path):
                self._update_run_log(
                    "Skipping download for {0!r} -> {1!r} (already completed)",
                    remote_source_path, local_dest_path)
                continue
            if not self._should_retrieve(remote_source_path):
                self._update_run_log(
                    "Skipping download for {0!r} -> {1!r} (source not ready)",
                    remote_source_path, local_dest_path)
                continue
            dir_result, dir_stats = self.fetch_dir(remote_source_path,
                                                   local_dest_path,
                                                   local_seed_paths)
            tallies[dir_result] += 1
            sync_stats += dir_stats
        if link_entries:
            self._fix_link_entries(link_entries)
        up_to_date = tallies[self.SYNC_UP_TO_DATE]
        completed = tallies[self.SYNC_COMPLETED]
        partial = tallies[self.SYNC_PARTIAL]
        failed = tallies[self.SYNC_FAILED]
        deleted = 0
        if self.delete_old_dirs:
            if failed or partial:
                self._update_run_log(
                    "Errors occurred, not deleting old directories in {0!r}",
                    self.local_path)
            else:
                deleted = self._delete_old_dirs(dir_entries)
        if failed and not (partial or completed or up_to_date):
            # Absolutely nothing worked
            result = self.SYNC_FAILED
        elif failed or partial:
            # Got at least some failures
            result = self.SYNC_PARTIAL
        elif completed or deleted:
            # Had to actually do something
            result = self.SYNC_COMPLETED
        else:
            # Everything was already up to date
            result = self.SYNC_UP_TO_DATE
        return result, sync_stats
class SyncSnapshotTree(SyncVersionedTree):
    """Sync the contents of a directory containing multiple snapshots of a tree

    A snapshot counts as complete when it contains a STATUS file whose
    stripped content is "FINISHED".
    """
    CONFIG_TYPE = sync_config.SnapshotSyncConfig

    def _find_latest_remote_version(self, remote_dir_entries):
        """Yield remote snapshots newest first until one is complete locally.

        Yields the same (remote_source_path, local_dest_path, seed_paths)
        triples as _iter_remote_versions().
        """
        seed_paths = self._get_initial_seed_paths()
        for mtime, dir_entry in sorted(remote_dir_entries, reverse=True):
            remote_entry = self.remote_path + dir_entry
            remote_source_path = "rsync://{0}{1}/".format(self.remote_server,
                                                          remote_entry)
            local_dest_path = os.path.join(self.local_path, dir_entry)
            yield remote_source_path, local_dest_path, seed_paths
            # Keep going until we successfully copy a tree to the local system
            if self._already_retrieved(local_dest_path):
                self._update_run_log("Latest remote tree is in {0!r}",
                                     local_dest_path)
                break
        else:
            # Loop ran to the end without finding a completed snapshot
            self._update_run_log("No valid remote tree identified")

    def _iter_remote_versions(self, remote_dir_entries):
        """Yield the snapshots to fetch, honouring sync_latest_only."""
        if self.sync_latest_only:
            return self._find_latest_remote_version(remote_dir_entries)
        return super(SyncSnapshotTree, self)._iter_remote_versions(remote_dir_entries)

    def _already_retrieved(self, local_dest_path):
        """Return True if the local snapshot's STATUS file says FINISHED."""
        local_status_path = os.path.join(local_dest_path, "STATUS")
        with self._indent_run_log():
            self._update_run_log("Checking for STATUS file in {0!r}",
                                 local_dest_path)
            with self._indent_run_log():
                if os.path.exists(local_status_path):
                    with open(local_status_path) as f:
                        status = f.read().strip()
                    self._update_run_log("Current status of {0!r} is {1!r}",
                                         local_dest_path, status)
                    return status == "FINISHED"
                else:
                    self._update_run_log("No STATUS file found in {0!r}",
                                         local_dest_path)
        return False

    def _should_retrieve(self, remote_source_path):
        """Return True if the remote snapshot's STATUS file says FINISHED.

        The remote STATUS file is copied into a temporary directory with
        rsync. Any failure to obtain it is logged and treated as "not ready".
        """
        with shellutil.temp_dir() as tmpdir:
            with self._indent_run_log():
                tmp_local_status = os.path.join(tmpdir, "STATUS")
                remote_status_path = os.path.join(remote_source_path, "STATUS")
                params = self._build_common_rsync_params()
                params.append(remote_status_path)
                params.append(tmp_local_status)
                self._update_run_log("Checking for STATUS file in {0!r}",
                                     remote_source_path)
                with self._indent_run_log():
                    rsync_status_command = ["rsync"] + params
                    try:
                        return_code, __ = self._run_shell_command(rsync_status_command)
                    except Exception:
                        # Best effort only, but let KeyboardInterrupt and
                        # SystemExit through
                        self._update_run_log(traceback.format_exc())
                        result_msg = "Exception while attempting to check status of {0!r}"
                    else:
                        if os.path.exists(tmp_local_status):
                            with open(tmp_local_status) as f:
                                status = f.read().strip()
                            self._update_run_log(
                                "Current status of {0!r} is {1!r}",
                                remote_source_path, status)
                            return status == "FINISHED"
                        else:
                            result_msg = "No STATUS file found in {0!r}"
                    self._update_run_log(result_msg, remote_source_path)
        return False

    def _fetch_dir_complete(self, result, remote_source_path, local_dest_path):
        """Mark a fully fetched snapshot as FINISHED and relink "latest".

        Partial results are returned untouched. An up-to-date result is
        only trusted when a local STATUS file already exists; otherwise the
        snapshot is reported as SYNC_COMPLETED.
        """
        if result == self.SYNC_PARTIAL:
            return result
        status_path = os.path.join(local_dest_path, "STATUS")
        if result == self.SYNC_UP_TO_DATE and os.path.exists(status_path):
            # Tree actually *was* up to date, we didn't just get lucky
            # and manage to hard link everything
            return result
        result = self.SYNC_COMPLETED
        if not self.dry_run_only:
            with open(status_path, 'w') as f:
                f.write("FINISHED\n")
        # _link_to_latest() performs its own dry run check
        self._link_to_latest(local_dest_path)
        return result

    def _get_latest_dir(self):
        """Return the absolute path of the latest local snapshot, or None."""
        # Preferred approach is to use the symbolic link to the latest version
        link_name = self.latest_link_name
        if link_name is not None:
            link_path = os.path.join(self.local_path, link_name)
            # readlink() only works on symlinks; a real directory with the
            # link's name falls through to the mtime based lookup
            if os.path.islink(link_path) and os.path.isdir(link_path):
                # A relative target is relative to the directory containing
                # the link, not to the link itself
                target_path = os.path.join(os.path.dirname(link_path),
                                           os.readlink(link_path))
                return os.path.abspath(target_path)

        # If that's not available, we rely on the local mtime
        def _sort_key(d):
            return os.path.getmtime(d), d

        candidates = self._iter_local_versions()
        try:
            return max(candidates, key=_sort_key)
        except ValueError:
            # max() of an empty sequence: there are no local versions
            pass
        return None

    def _get_initial_seed_paths(self):
        """Return a tuple holding the latest local snapshot, if there is one."""
        # Use the most recent local dir as the initial seed path
        latest_dir = self._get_latest_dir()
        return (latest_dir,) if latest_dir is not None else ()

    def _get_old_dirs(self, remote_dir_entries):
        """Return the deletable local snapshot names, excluding the latest."""
        dirs_to_delete = (super(SyncSnapshotTree, self).
                          _get_old_dirs(remote_dir_entries))
        # Never delete latest entry, even if it's gone from the remote server
        latest_dir = self._get_latest_dir()
        if latest_dir is not None:
            dirname = os.path.basename(latest_dir)
            try:
                dirs_to_delete.remove(dirname)
            except ValueError:
                pass
        return dirs_to_delete

    def _link_to_latest(self, target_path):
        """Point the configured "latest" symlink at *target_path*.

        Does nothing when no link name is configured, on a dry run, or when
        *target_path* is None. Whatever currently occupies the link path is
        removed first. The link target is stored relative to the link's
        directory.
        """
        link_name = self.latest_link_name
        if link_name is None:
            return
        local_path = self.local_path
        link_path = os.path.join(local_path, link_name)
        self._update_run_log("Updating {0!r} symlink to refer to latest version",
                             link_path)
        with self._indent_run_log():
            if self.dry_run_only:
                self._update_run_log("Skipping creation of {0!r} for test run",
                                     link_path)
                return
            if target_path is None:
                self._update_run_log("No valid target versions in {0!r}, skipping",
                                     local_path)
                return
            relative_target = os.path.relpath(target_path,
                                              os.path.dirname(link_path))
            if os.path.isdir(link_path):
                if os.path.islink(link_path):
                    if os.readlink(link_path) == relative_target:
                        self._update_run_log("Link {0!r} -> {1!r} already exists",
                                             link_path, relative_target)
                        return
                    os.unlink(link_path)
                else:
                    self._update_run_log(
                        "Existing latest directory, {0!r}, is not a symbolic link, deleting it",
                        link_path)
                    shutil.rmtree(link_path)
            elif os.path.lexists(link_path):
                self._update_run_log(
                    "Existing entry, {0!r}, is not a directory, deleting it",
                    link_path)
                os.unlink(link_path)
            os.symlink(relative_target, link_path)
            self._update_run_log("Linked {0!r} -> {1!r}",
                                 link_path, relative_target)
class SyncSnapshotDelta(BaseSyncCommand):
    """Create an rsync delta from a snapshot directory"""

    def __init__(self):
        # Placeholder: this command is not implemented yet.
        # NotImplemented is the binary-operator sentinel and is not
        # callable; NotImplementedError is the exception meant here.
        raise NotImplementedError("Depends on Pulp plugin details")
class SyncFromDelta(BaseSyncCommand):
    """Create a new local snapshot from an upstream delta"""

    def __init__(self):
        # Placeholder: this command is not implemented yet.
        # NotImplemented is the binary-operator sentinel and is not
        # callable; NotImplementedError is the exception meant here.
        raise NotImplementedError("Depends on Pulp plugin details")