#!/usr/bin/env python3 # PYTHON_ARGCOMPLETE_OK import collections import duplitab import hashlib import os import re import shlex import subprocess import tabulate import tempfile import time import urllib.parse import yaml def command_join(args): return ' '.join([shlex.quote(a) for a in args]) def sshfs_mount(url, path, print_trace = False): """ > Duplicity uses the URL format [...]. The generic format for a URL is: > scheme://[user[:password]@]host[:port]/[/]path > [...] > In protocols that support it, the path may be preceded by a single slash, '/path', to > represent a relative path to the target home directory, or preceded by a double slash, > '//path', to represent an absolute filesystem path. """ url_attr = urllib.parse.urlparse(url) assert url_attr.scheme in ['sftp'] mount_command = [ 'sshfs', '{}:{}'.format(url_attr.netloc, url_attr.path[1:]), path, ] if print_trace: print('+ {}'.format(command_join(mount_command))) subprocess.check_call(mount_command) def sshfs_unmount(path, retry_delay_seconds = 1.0, retry_count = 2, print_trace = False): unmount_command = [ 'fusermount', '-u', path, ] if print_trace: print('+ {}'.format(command_join(unmount_command))) try: subprocess.check_call(unmount_command) except subprocess.CalledProcessError as ex: if retry_count > 0: time.sleep(retry_delay_seconds) sshfs_unmount( path = path, retry_delay_seconds = retry_delay_seconds, retry_count = retry_count - 1, print_trace = print_trace, ) else: raise ex class InvalidBackupConfigError(ValueError): def __init__(self, message, backup_config): super().__init__(message) self.backup_config = backup_config def __str__(self): return "{}\n\n{}".format( super().__str__(), yaml.dump({'backup config': self.backup_config}, default_flow_style = False), ) def backup(config, duplicity_verbosity, no_print_config, no_print_statistics, tab_dry, print_trace = False): for backup in config: if not no_print_config: print('\n{}'.format(yaml.dump({'backup': backup}, default_flow_style = False).strip())) backup_command = ['duplicity'] # name if backup['name']: backup_command += ['--name', backup['name']] # encryption try: encryption = backup['encryption'] except KeyError: encryption = True if 'encryption' in backup and not backup['encryption']: backup_command += ['--no-encryption'] else: if 'encrypt_key' in backup: backup_command += ['--encrypt-key', backup['encrypt_key']] # determine source source_mount_path = None try: if backup['source_type'] == 'local': local_source_path = backup['source_path'] elif backup['source_type'] == 'sshfs': source_mount_path = tempfile.mkdtemp(prefix = 'duplitab-source-sshfs-') sshfs_mount( url = 'sftp://{}/{}'.format(backup['source_host'], backup['source_path']), path = source_mount_path, print_trace = print_trace, ) local_source_path = source_mount_path backup_command.append('--allow-source-mismatch') else: raise Exception("unsupported source type '{}'".format(backup['source_type'])) # selectors try: selectors = backup['selectors'] except KeyError: selectors = [] for selector in selectors: if selector['option'] in ['include', 'exclude']: shell_pattern = selector['shell_pattern'] if shell_pattern.startswith(backup['source_path']): shell_pattern = shell_pattern.replace( backup['source_path'], local_source_path, 1, ) backup_command += ['--{}'.format(selector['option']), shell_pattern] else: raise Exception("unsupported selector option '{}'".format(selector['option'])) # duplicity verbosity if duplicity_verbosity: backup_command += ['--verbosity', duplicity_verbosity] # statistics if no_print_statistics: backup_command.append('--no-print-statistics') # source path backup_command.append(local_source_path) # target target_mount_path = None try: if 'target_via_sshfs' in backup and backup['target_via_sshfs']: target_mount_path = tempfile.mkdtemp(prefix = 'duplitab-target-sshfs-') backup_command += ['file://' + target_mount_path] sshfs_mount( backup['target_url'], target_mount_path, print_trace = print_trace, ) # set backup name to make archive dir persistent # (default name: hash of target url) if not backup['name']: backup_command += ['--name', hashlib.sha1(backup['target_url'].encode('utf-8')).hexdigest()] else: backup_command += [backup['target_url']] try: if print_trace: print('{} {}'.format( '*' if tab_dry else '+', command_join(backup_command), )) if not tab_dry: subprocess.check_call(backup_command) finally: if target_mount_path: sshfs_unmount(target_mount_path, print_trace = print_trace) finally: if target_mount_path: os.rmdir(target_mount_path) if source_mount_path: sshfs_unmount(source_mount_path, print_trace = print_trace) finally: if source_mount_path: os.rmdir(source_mount_path) def run(command, config_path, quiet, duplicity_verbosity, target_url_filter_regex = None, table_style = 'plain', no_print_config = False, no_print_trace = False, no_print_statistics = False, tab_dry = False): if quiet: if not duplicity_verbosity: duplicity_verbosity = 'warning' no_print_trace = True no_print_statistics = True no_print_config = True with open(config_path) as config_file: config = yaml.load(config_file.read()) for backup_attr in config: if not 'name' in backup_attr: backup_attr['name'] = None elif os.sep in backup_attr['name']: raise InvalidBackupConfigError( "backup name may not contain '{}'".format(os.sep), backup_config = backup_attr, ) if not 'source_type' in backup_attr: backup_attr['source_type'] = 'local' if not 'source_host' in backup_attr: backup_attr['source_host'] = None if not 'encrypt_key' in backup_attr: backup_attr['encrypt_key'] = None filtered_config = [] for backup_attr in config: if (not target_url_filter_regex or re.search('^{}$'.format(target_url_filter_regex), backup_attr['target_url'])): filtered_config.append(backup_attr) if not command or command == 'list': columns = collections.OrderedDict([ ('name', 'name'), ('source type', 'source_type'), ('source host', 'source_host'), ('source path', 'source_path'), ('target url', 'target_url'), ('encrypt key', 'encrypt_key'), ]) table = [[b[c] for c in columns.values()] for b in filtered_config] print(tabulate.tabulate( table, columns.keys(), tablefmt = table_style, )) elif command == 'status': table = [] for backup_attr in filtered_config: collection = duplitab.Collection(url = backup_attr['target_url']) if not quiet: sys.stderr.write('request status of collection {!r}\n'.format(collection.url)) try: status = collection.request_status() except duplitab.DuplicityCommandFailedError as ex: status = None if not quiet: sys.stderr.write(ex.msg) table.append([ collection.url, (status.last_full_backup_time or 'never') if status else None, (status.last_incremental_backup_time or 'never') if status else None, ]) print(tabulate.tabulate( table, ['target_url', 'last full backup', 'last incremental backup'], tablefmt = table_style, )) elif command == 'backup': backup( config = filtered_config, duplicity_verbosity = duplicity_verbosity, no_print_config = no_print_config, no_print_statistics = no_print_statistics, tab_dry = tab_dry, print_trace = not no_print_trace, ) def _init_argparser(): import argparse argparser = argparse.ArgumentParser(description = None) argparser.add_argument( '-c', '--config', dest = 'config_path', default = '/etc/duplitab', ) argparser.add_argument( '-u', '--filter-target-url', dest = 'target_url_filter_regex', metavar = 'REGEXP', default = None, ) argparser.add_argument( '-q', '--quiet', '--silent', action = 'store_true', dest = 'quiet', ) argparser.add_argument( '--duplicity-verbosity', type = str, ) argparser.add_argument( '--no-print-trace', action = 'store_true', ) subparsers = argparser.add_subparsers( dest = 'command', ) subparser_list = subparsers.add_parser('list') subparser_list.add_argument( '--table-style', default = 'plain', ) subparser_list = subparsers.add_parser('status') subparser_list.add_argument( '--table-style', default = 'plain', ) subparser_backup = subparsers.add_parser('backup') subparser_backup.add_argument( '--no-print-config', action = 'store_true', ) subparser_backup.add_argument( '--no-print-statistics', action = 'store_true', ) subparser_backup.add_argument( '--tab-dry', action = 'store_true', ) return argparser def main(argv): argparser = _init_argparser() try: import argcomplete argcomplete.autocomplete(argparser) except ImportError: pass args = argparser.parse_args(argv) run(**vars(args)) return 0 if __name__ == "__main__": import sys sys.exit(main(sys.argv[1:]))