__init__.py 2.5 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101
  1. import datetime
  2. import re
  3. import subprocess
  4. import sys
  5. import time
  def _duplicity(params):
      """Run the ``duplicity`` executable and return its standard output as text.

      Args:
          params: List of command line arguments appended after ``duplicity``.

      Returns:
          The captured standard output, decoded to a string.

      Raises:
          subprocess.CalledProcessError: If duplicity exits with a non-zero
              status.
          OSError: If the executable cannot be started.
      """
      # Imported locally so this function does not depend on a change to the
      # module's import block.
      import locale

      stdout = subprocess.check_output(
          ['duplicity'] + params,
      )
      # sys.stdout may have been replaced or detached, in which case its
      # encoding is missing or None and bytes.decode() would raise TypeError.
      # Fall back to the locale's preferred encoding in that case.
      encoding = (
          getattr(sys.stdout, 'encoding', None)
          or locale.getpreferredencoding(False)
      )
      return stdout.decode(encoding)
  11. def _parse_duplicity_timestamp(timestamp):
  12. return datetime.datetime.fromtimestamp(
  13. time.mktime(time.strptime(timestamp))
  14. )
  class Collection(object):
      """Handle for a duplicity backup collection identified by its URL."""

      def __init__(self, url):
          # URL handed to duplicity as the collection location.
          self.url = url

      def request_status(self):
          """Run ``duplicity collection-status`` for this URL and parse the output.

          Returns:
              The ``_CollectionStatus`` built from duplicity's output.
          """
          status_text = _duplicity(['collection-status', self.url])
          return _CollectionStatus._parse(text=status_text)
  22. class _Status(object):
  23. def __eq__(self, other):
  24. return isinstance(self, type(other)) and vars(self) == vars(other)
  25. def __neq__(self, other):
  26. return not (self == other)
  class _CollectionStatus(_Status):
      """Parsed result of ``duplicity collection-status``."""

      # Regex for the dashed line that delimits a chain section in the output.
      chain_separator_regex = r'-{25}\s'

      def __init__(self, primary_chain):
          # The parsed primary chain, or None when the output reported no chain.
          self.primary_chain = primary_chain

      @property
      def last_incremental_backup_time(self):
          """Return the primary chain's last backup time, or None without a chain."""
          return self.primary_chain.last_backup_time if self.primary_chain else None

      @classmethod
      def _parse(cls, text):
          """Build a collection status from duplicity's textual output.

          Args:
              text: Complete output of ``duplicity collection-status``.

          Returns:
              A new instance whose ``primary_chain`` is None when the output
              states that no backup chains were found, otherwise the parsed
              primary chain.

          Raises:
              ValueError: If the output neither reports the absence of chains
                  nor contains a delimited primary chain section.
          """
          if 'No backup chains with active signatures found' in text:
              return cls(primary_chain=None)

          # Raw string: \s, \w and \W are regex escapes, not string escapes.
          # The chain body is the text between the two separator lines that
          # follow the "Found primary backup chain" line.
          pattern = r'^Found primary backup chain.*\s{sep}([\w\W]*?)\s{sep}'.format(
              sep=cls.chain_separator_regex,
          )
          primary_chain_match = re.search(pattern, text, re.MULTILINE)
          if primary_chain_match is None:
              raise ValueError(
                  'unable to locate primary backup chain in duplicity output'
              )
          return cls(
              primary_chain=_ChainStatus._parse(
                  text=primary_chain_match.group(1),
              ),
          )
  class _ChainStatus(_Status):
      """Status of one backup chain, holding the backup sets it contains."""

      def __init__(self, sets):
          # List of _SetStatus objects belonging to this chain.
          self.sets = sets

      @property
      def last_backup_time(self):
          """Return the latest ``backup_time`` among the chain's sets.

          Raises:
              ValueError: If the chain contains no sets.
          """
          return max(s.backup_time for s in self.sets)

      @classmethod
      def _parse(cls, text):
          """Build a chain status from the chain section of duplicity's output.

          Args:
              text: The chain section, containing a table header that ends in
                  ``Num volumes:`` followed by one line per backup set.

          Returns:
              A new instance with one ``_SetStatus`` per backup set line.

          Raises:
              ValueError: If the table header is missing or a non-blank line
                  below it does not look like a backup set entry.
          """
          parts = re.split(r'Num volumes: *\r?\n', text)
          if len(parts) < 2:
              raise ValueError(
                  'backup set table header not found in chain status'
              )

          sets = []
          for set_line in re.split(r'\r?\n', parts[1]):
              # Blank lines carry no backup set; skip them instead of failing.
              if not set_line.strip():
                  continue
              set_match = re.match(
                  r'\s*(?P<mode>\w+) {2,}(?P<ts>.+?) {2,} (?P<vol>\d+)',
                  set_line,
              )
              if set_match is None:
                  raise ValueError(
                      'unrecognized backup set line: {!r}'.format(set_line)
                  )
              # duplicity uses time.asctime().
              # time.strptime() without format inverts time.asctime().
              sets.append(_SetStatus(
                  backup_time=_parse_duplicity_timestamp(set_match.group('ts')),
              ))
          return cls(sets=sets)
  class _SetStatus(_Status):
      """Status of a single backup set within a chain."""

      def __init__(self, backup_time):
          # Stored as given; _ChainStatus._parse passes the datetime parsed from
          # the set's timestamp column.
          self.backup_time = backup_time