borgmatic/borgmatic/tests/unit/test_borg.py

421 lines
12 KiB
Python

from collections import OrderedDict
from subprocess import STDOUT
import os
from flexmock import flexmock
from borgmatic import borg as module
from borgmatic.tests.builtins import builtins_mock
from borgmatic.verbosity import VERBOSITY_SOME, VERBOSITY_LOTS
def test_initialize_with_passphrase_should_set_environment():
orig_environ = os.environ
try:
os.environ = {}
module.initialize({'encryption_passphrase': 'pass'}, command='attic')
assert os.environ.get('ATTIC_PASSPHRASE') == 'pass'
finally:
os.environ = orig_environ
def test_initialize_without_passphrase_should_not_set_environment():
orig_environ = os.environ
try:
os.environ = {}
module.initialize({}, command='attic')
assert os.environ.get('ATTIC_PASSPHRASE') == None
finally:
os.environ = orig_environ
def insert_subprocess_mock(check_call_command, **kwargs):
subprocess = flexmock(STDOUT=STDOUT)
subprocess.should_receive('check_call').with_args(check_call_command, **kwargs).once()
flexmock(module).subprocess = subprocess
def insert_subprocess_never():
subprocess = flexmock()
subprocess.should_receive('check_call').never()
flexmock(module).subprocess = subprocess
def insert_platform_mock():
flexmock(module.platform).should_receive('node').and_return('host')
def insert_datetime_mock():
flexmock(module).datetime = flexmock().should_receive('now').and_return(
flexmock().should_receive('isoformat').and_return('now').mock
).mock
CREATE_COMMAND_WITHOUT_EXCLUDES = ('attic', 'create', 'repo::host-now', 'foo', 'bar')
CREATE_COMMAND = CREATE_COMMAND_WITHOUT_EXCLUDES + ('--exclude-from', 'excludes')
def test_create_archive_should_call_attic_with_parameters():
insert_subprocess_mock(CREATE_COMMAND)
insert_platform_mock()
insert_datetime_mock()
module.create_archive(
excludes_filename='excludes',
verbosity=None,
storage_config={},
source_directories='foo bar',
repository='repo',
command='attic',
)
def test_create_archive_with_two_spaces_in_source_directories():
insert_subprocess_mock(CREATE_COMMAND)
insert_platform_mock()
insert_datetime_mock()
module.create_archive(
excludes_filename='excludes',
verbosity=None,
storage_config={},
source_directories='foo bar',
repository='repo',
command='attic',
)
def test_create_archive_with_none_excludes_filename_should_call_attic_without_excludes():
insert_subprocess_mock(CREATE_COMMAND_WITHOUT_EXCLUDES)
insert_platform_mock()
insert_datetime_mock()
module.create_archive(
excludes_filename=None,
verbosity=None,
storage_config={},
source_directories='foo bar',
repository='repo',
command='attic',
)
def test_create_archive_with_verbosity_some_should_call_attic_with_stats_parameter():
insert_subprocess_mock(CREATE_COMMAND + ('--stats',))
insert_platform_mock()
insert_datetime_mock()
module.create_archive(
excludes_filename='excludes',
verbosity=VERBOSITY_SOME,
storage_config={},
source_directories='foo bar',
repository='repo',
command='attic',
)
def test_create_archive_with_verbosity_lots_should_call_attic_with_verbose_parameter():
insert_subprocess_mock(CREATE_COMMAND + ('--verbose', '--stats'))
insert_platform_mock()
insert_datetime_mock()
module.create_archive(
excludes_filename='excludes',
verbosity=VERBOSITY_LOTS,
storage_config={},
source_directories='foo bar',
repository='repo',
command='attic',
)
def test_create_archive_with_compression_should_call_attic_with_compression_parameters():
insert_subprocess_mock(CREATE_COMMAND + ('--compression', 'rle'))
insert_platform_mock()
insert_datetime_mock()
module.create_archive(
excludes_filename='excludes',
verbosity=None,
storage_config={'compression': 'rle'},
source_directories='foo bar',
repository='repo',
command='attic',
)
def test_create_archive_with_one_file_system_should_call_attic_with_one_file_system_parameters():
insert_subprocess_mock(CREATE_COMMAND + ('--one-file-system',))
insert_platform_mock()
insert_datetime_mock()
module.create_archive(
excludes_filename='excludes',
verbosity=None,
storage_config={},
source_directories='foo bar',
repository='repo',
command='attic',
one_file_system=True,
)
def test_create_archive_with_umask_should_call_attic_with_umask_parameters():
insert_subprocess_mock(CREATE_COMMAND + ('--umask', '740'))
insert_platform_mock()
insert_datetime_mock()
module.create_archive(
excludes_filename='excludes',
verbosity=None,
storage_config={'umask': 740},
source_directories='foo bar',
repository='repo',
command='attic',
)
def test_create_archive_with_source_directories_glob_expands():
insert_subprocess_mock(('attic', 'create', 'repo::host-now', 'foo', 'food'))
insert_platform_mock()
insert_datetime_mock()
flexmock(module).should_receive('glob').with_args('foo*').and_return(['foo', 'food'])
module.create_archive(
excludes_filename=None,
verbosity=None,
storage_config={},
source_directories='foo*',
repository='repo',
command='attic',
)
def test_create_archive_with_non_matching_source_directories_glob_passes_through():
insert_subprocess_mock(('attic', 'create', 'repo::host-now', 'foo*'))
insert_platform_mock()
insert_datetime_mock()
flexmock(module).should_receive('glob').with_args('foo*').and_return([])
module.create_archive(
excludes_filename=None,
verbosity=None,
storage_config={},
source_directories='foo*',
repository='repo',
command='attic',
)
def test_create_archive_with_glob_should_call_attic_with_expanded_directories():
insert_subprocess_mock(('attic', 'create', 'repo::host-now', 'foo', 'food'))
insert_platform_mock()
insert_datetime_mock()
flexmock(module).should_receive('glob').with_args('foo*').and_return(['foo', 'food'])
module.create_archive(
excludes_filename=None,
verbosity=None,
storage_config={},
source_directories='foo*',
repository='repo',
command='attic',
)
BASE_PRUNE_FLAGS = (
('--keep-daily', '1'),
('--keep-weekly', '2'),
('--keep-monthly', '3'),
)
def test_make_prune_flags_should_return_flags_from_config():
retention_config = OrderedDict(
(
('keep_daily', 1),
('keep_weekly', 2),
('keep_monthly', 3),
)
)
result = module._make_prune_flags(retention_config)
assert tuple(result) == BASE_PRUNE_FLAGS
PRUNE_COMMAND = (
'attic', 'prune', 'repo', '--keep-daily', '1', '--keep-weekly', '2', '--keep-monthly', '3',
)
def test_prune_archives_should_call_attic_with_parameters():
retention_config = flexmock()
flexmock(module).should_receive('_make_prune_flags').with_args(retention_config).and_return(
BASE_PRUNE_FLAGS,
)
insert_subprocess_mock(PRUNE_COMMAND)
module.prune_archives(
verbosity=None,
repository='repo',
retention_config=retention_config,
command='attic',
)
def test_prune_archives_with_verbosity_some_should_call_attic_with_stats_parameter():
retention_config = flexmock()
flexmock(module).should_receive('_make_prune_flags').with_args(retention_config).and_return(
BASE_PRUNE_FLAGS,
)
insert_subprocess_mock(PRUNE_COMMAND + ('--stats',))
module.prune_archives(
repository='repo',
verbosity=VERBOSITY_SOME,
retention_config=retention_config,
command='attic',
)
def test_prune_archives_with_verbosity_lots_should_call_attic_with_verbose_parameter():
retention_config = flexmock()
flexmock(module).should_receive('_make_prune_flags').with_args(retention_config).and_return(
BASE_PRUNE_FLAGS,
)
insert_subprocess_mock(PRUNE_COMMAND + ('--verbose', '--stats',))
module.prune_archives(
repository='repo',
verbosity=VERBOSITY_LOTS,
retention_config=retention_config,
command='attic',
)
def test_parse_checks_returns_them_as_tuple():
checks = module._parse_checks({'checks': 'foo disabled bar'})
assert checks == ('foo', 'bar')
def test_parse_checks_with_missing_value_returns_defaults():
checks = module._parse_checks({})
assert checks == module.DEFAULT_CHECKS
def test_parse_checks_with_blank_value_returns_defaults():
checks = module._parse_checks({'checks': ''})
assert checks == module.DEFAULT_CHECKS
def test_parse_checks_with_disabled_returns_no_checks():
checks = module._parse_checks({'checks': 'disabled'})
assert checks == ()
def test_make_check_flags_with_checks_returns_flags():
flags = module._make_check_flags(('foo', 'bar'))
assert flags == ('--foo-only', '--bar-only')
def test_make_check_flags_with_default_checks_returns_no_flags():
flags = module._make_check_flags(module.DEFAULT_CHECKS)
assert flags == ()
def test_make_check_flags_with_checks_and_last_returns_flags_including_last():
flags = module._make_check_flags(('foo', 'bar'), check_last=3)
assert flags == ('--foo-only', '--bar-only', '--last', 3)
def test_make_check_flags_with_last_returns_last_flag():
flags = module._make_check_flags(module.DEFAULT_CHECKS, check_last=3)
assert flags == ('--last', 3)
def test_check_archives_should_call_attic_with_parameters():
checks = flexmock()
check_last = flexmock()
consistency_config = flexmock().should_receive('get').and_return(check_last).mock
flexmock(module).should_receive('_parse_checks').and_return(checks)
flexmock(module).should_receive('_make_check_flags').with_args(checks, check_last).and_return(())
stdout = flexmock()
insert_subprocess_mock(
('attic', 'check', 'repo'),
stdout=stdout, stderr=STDOUT,
)
insert_platform_mock()
insert_datetime_mock()
builtins_mock().should_receive('open').and_return(stdout)
flexmock(module.os).should_receive('devnull')
module.check_archives(
verbosity=None,
repository='repo',
consistency_config=consistency_config,
command='attic',
)
def test_check_archives_with_verbosity_some_should_call_attic_with_verbose_parameter():
consistency_config = flexmock().should_receive('get').and_return(None).mock
flexmock(module).should_receive('_parse_checks').and_return(flexmock())
flexmock(module).should_receive('_make_check_flags').and_return(())
insert_subprocess_mock(
('attic', 'check', 'repo', '--verbose'),
stdout=None, stderr=STDOUT,
)
insert_platform_mock()
insert_datetime_mock()
module.check_archives(
verbosity=VERBOSITY_SOME,
repository='repo',
consistency_config=consistency_config,
command='attic',
)
def test_check_archives_with_verbosity_lots_should_call_attic_with_verbose_parameter():
consistency_config = flexmock().should_receive('get').and_return(None).mock
flexmock(module).should_receive('_parse_checks').and_return(flexmock())
flexmock(module).should_receive('_make_check_flags').and_return(())
insert_subprocess_mock(
('attic', 'check', 'repo', '--verbose'),
stdout=None, stderr=STDOUT,
)
insert_platform_mock()
insert_datetime_mock()
module.check_archives(
verbosity=VERBOSITY_LOTS,
repository='repo',
consistency_config=consistency_config,
command='attic',
)
def test_check_archives_without_any_checks_should_bail():
consistency_config = flexmock().should_receive('get').and_return(None).mock
flexmock(module).should_receive('_parse_checks').and_return(())
insert_subprocess_never()
module.check_archives(
verbosity=None,
repository='repo',
consistency_config=consistency_config,
command='attic',
)