borgmatic/borgmatic/borg.py

243 lines
7.6 KiB
Python

from datetime import datetime
import glob
import itertools
import os
import platform
import sys
import re
import subprocess
import tempfile
from borgmatic.verbosity import VERBOSITY_SOME, VERBOSITY_LOTS
# Integration with Borg for actually handling backups.
COMMAND = 'borg'
def initialize(storage_config, command=COMMAND):
passphrase = storage_config.get('encryption_passphrase')
if passphrase:
os.environ['{}_PASSPHRASE'.format(command.upper())] = passphrase
def _write_exclude_file(exclude_patterns=None):
'''
Given a sequence of exclude patterns, write them to a named temporary file and return it. Return
None if no patterns are provided.
'''
if not exclude_patterns:
return None
exclude_file = tempfile.NamedTemporaryFile('w')
exclude_file.write('\n'.join(exclude_patterns))
exclude_file.flush()
return exclude_file
def create_archive(
verbosity, repository, location_config, storage_config, command=COMMAND,
):
'''
Given a vebosity flag, a storage config dict, a list of source directories, a local or remote
repository path, a list of exclude patterns, and a command to run, create a Borg archive.
'''
sources = tuple(
itertools.chain.from_iterable(
glob.glob(directory) or [directory]
for directory in location_config['source_directories']
)
)
exclude_file = _write_exclude_file(location_config.get('exclude_patterns'))
exclude_flags = ('--exclude-from', exclude_file.name) if exclude_file else ()
compression = storage_config.get('compression', None)
compression_flags = ('--compression', compression) if compression else ()
umask = storage_config.get('umask', None)
umask_flags = ('--umask', str(umask)) if umask else ()
one_file_system_flags = ('--one-file-system',) if location_config.get('one_file_system') else ()
remote_path = location_config.get('remote_path')
remote_path_flags = ('--remote-path', remote_path) if remote_path else ()
verbosity_flags = {
VERBOSITY_SOME: ('--info', '--stats',),
VERBOSITY_LOTS: ('--debug', '--list', '--stats'),
}.get(verbosity, ())
full_command = (
command, 'create',
'{repository}::{hostname}-{timestamp}'.format(
repository=repository,
hostname=platform.node(),
timestamp=datetime.now().isoformat(),
),
) + sources + exclude_flags + compression_flags + one_file_system_flags + \
remote_path_flags + umask_flags + verbosity_flags
subprocess.check_call(full_command)
def _make_prune_flags(retention_config):
'''
Given a retention config dict mapping from option name to value, tranform it into an iterable of
command-line name-value flag pairs.
For example, given a retention config of:
{'keep_weekly': 4, 'keep_monthly': 6}
This will be returned as an iterable of:
(
('--keep-weekly', '4'),
('--keep-monthly', '6'),
)
'''
return (
('--' + option_name.replace('_', '-'), str(retention_config[option_name]))
for option_name, value in retention_config.items()
)
def prune_archives(verbosity, repository, retention_config, command=COMMAND, remote_path=None):
'''
Given a verbosity flag, a local or remote repository path, a retention config dict, and a
command to run, prune Borg archives according the the retention policy specified in that
configuration.
'''
remote_path_flags = ('--remote-path', remote_path) if remote_path else ()
verbosity_flags = {
VERBOSITY_SOME: ('--info', '--stats',),
VERBOSITY_LOTS: ('--debug', '--stats'),
}.get(verbosity, ())
full_command = (
command, 'prune',
repository,
) + tuple(
element
for pair in _make_prune_flags(retention_config)
for element in pair
) + remote_path_flags + verbosity_flags
subprocess.check_call(full_command)
DEFAULT_CHECKS = ('repository', 'archives')
def _parse_checks(consistency_config):
'''
Given a consistency config with a "checks" list, transform it to a tuple of named checks to run.
For example, given a retention config of:
{'checks': ['repository', 'archives']}
This will be returned as:
('repository', 'archives')
If no "checks" option is present, return the DEFAULT_CHECKS. If the checks value is the string
"disabled", return an empty tuple, meaning that no checks should be run.
'''
checks = consistency_config.get('checks', [])
if checks == ['disabled']:
return ()
return tuple(check for check in checks if check.lower() not in ('disabled', '')) or DEFAULT_CHECKS
def _make_check_flags(checks, check_last=None):
'''
Given a parsed sequence of checks, transform it into tuple of command-line flags.
For example, given parsed checks of:
('repository',)
This will be returned as:
('--repository-only',)
Additionally, if a check_last value is given, a "--last" flag will be added.
'''
last_flag = ('--last', str(check_last)) if check_last else ()
if checks == DEFAULT_CHECKS:
return last_flag
return tuple(
'--{}-only'.format(check) for check in checks
if check in DEFAULT_CHECKS
) + last_flag
def check_archives(verbosity, repository, consistency_config, command=COMMAND, remote_path=None):
'''
Given a verbosity flag, a local or remote repository path, a consistency config dict, and a
command to run, check the contained Borg archives for consistency.
If there are no consistency checks to run, skip running them.
'''
checks = _parse_checks(consistency_config)
check_last = consistency_config.get('check_last', None)
if set(checks).intersection(set(DEFAULT_CHECKS)):
remote_path_flags = ('--remote-path', remote_path) if remote_path else ()
verbosity_flags = {
VERBOSITY_SOME: ('--info',),
VERBOSITY_LOTS: ('--debug',),
}.get(verbosity, ())
full_command = (
command, 'check',
repository,
) + _make_check_flags(checks, check_last) + remote_path_flags + verbosity_flags
# The check command spews to stdout/stderr even without the verbose flag. Suppress it.
stdout = None if verbosity_flags else open(os.devnull, 'w')
subprocess.check_call(full_command, stdout=stdout, stderr=subprocess.STDOUT)
if 'extract' in checks:
extract_last_archive_dry_run(verbosity, repository, command, remote_path)
def extract_last_archive_dry_run(verbosity, repository, command=COMMAND, remote_path=None):
'''
Perform an extraction dry-run of just the most recent archive. If there are no archives, skip
the dry-run.
'''
remote_path_flags = ('--remote-path', remote_path) if remote_path else ()
verbosity_flags = {
VERBOSITY_SOME: ('--info',),
VERBOSITY_LOTS: ('--debug',),
}.get(verbosity, ())
full_list_command = (
command, 'list',
'--short',
repository,
) + remote_path_flags + verbosity_flags
list_output = subprocess.check_output(full_list_command).decode(sys.stdout.encoding)
last_archive_name = list_output.strip().split('\n')[-1]
if not last_archive_name:
return
list_flag = ('--list',) if verbosity == VERBOSITY_LOTS else ()
full_extract_command = (
command, 'extract',
'--dry-run',
'{repository}::{last_archive_name}'.format(
repository=repository,
last_archive_name=last_archive_name,
),
) + remote_path_flags + verbosity_flags + list_flag
subprocess.check_call(full_extract_command)