Initial work on spot check schema and preparatory refactoring (#656).

This commit is contained in:
Dan Helfman 2024-03-20 11:58:59 -07:00
parent ecf5a7e294
commit 3ecd0e731e
8 changed files with 1168 additions and 1104 deletions

1
NEWS
View File

@ -5,6 +5,7 @@
* Add documentation about backing up containerized databases by configuring borgmatic to exec into
a container to run a dump command:
https://torsion.org/borgmatic/docs/how-to/backup-your-databases/#containers
*
1.8.9
* #311: Add custom dump/restore command options for MySQL and MariaDB.

View File

@ -1,12 +1,292 @@
import datetime
import hashlib
import itertools
import logging
import pathlib
import os
import borgmatic.borg.extract
import borgmatic.borg.check
import borgmatic.borg.state
import borgmatic.config.validate
import borgmatic.hooks.command
DEFAULT_CHECKS = (
{'name': 'repository', 'frequency': '1 month'},
{'name': 'archives', 'frequency': '1 month'},
)
logger = logging.getLogger(__name__)
def parse_checks(config, only_checks=None):
'''
Given a configuration dict with a "checks" sequence of dicts and an optional list of override
checks, return a tuple of named checks to run.
For example, given a config of:
{'checks': ({'name': 'repository'}, {'name': 'archives'})}
This will be returned as:
('repository', 'archives')
If no "checks" option is present in the config, return the DEFAULT_CHECKS. If a checks value
has a name of "disabled", return an empty tuple, meaning that no checks should be run.
'''
checks = only_checks or tuple(
check_config['name'] for check_config in (config.get('checks', None) or DEFAULT_CHECKS)
)
checks = tuple(check.lower() for check in checks)
if 'disabled' in checks:
logger.warning(
'The "disabled" value for the "checks" option is deprecated and will be removed from a future release; use "skip_actions" instead'
)
if len(checks) > 1:
logger.warning(
'Multiple checks are configured, but one of them is "disabled"; not running any checks'
)
return ()
return checks
def parse_frequency(frequency):
'''
Given a frequency string with a number and a unit of time, return a corresponding
datetime.timedelta instance or None if the frequency is None or "always".
For instance, given "3 weeks", return datetime.timedelta(weeks=3)
Raise ValueError if the given frequency cannot be parsed.
'''
if not frequency:
return None
frequency = frequency.strip().lower()
if frequency == 'always':
return None
try:
number, time_unit = frequency.split(' ')
number = int(number)
except ValueError:
raise ValueError(f"Could not parse consistency check frequency '{frequency}'")
if not time_unit.endswith('s'):
time_unit += 's'
if time_unit == 'months':
number *= 30
time_unit = 'days'
elif time_unit == 'years':
number *= 365
time_unit = 'days'
try:
return datetime.timedelta(**{time_unit: number})
except TypeError:
raise ValueError(f"Could not parse consistency check frequency '{frequency}'")
def filter_checks_on_frequency(
config,
borg_repository_id,
checks,
force,
archives_check_id=None,
):
'''
Given a configuration dict with a "checks" sequence of dicts, a Borg repository ID, a sequence
of checks, whether to force checks to run, and an ID for the archives check potentially being
run (if any), filter down those checks based on the configured "frequency" for each check as
compared to its check time file.
In other words, a check whose check time file's timestamp is too new (based on the configured
frequency) will get cut from the returned sequence of checks. Example:
config = {
'checks': [
{
'name': 'archives',
'frequency': '2 weeks',
},
]
}
When this function is called with that config and "archives" in checks, "archives" will get
filtered out of the returned result if its check time file is newer than 2 weeks old, indicating
that it's not yet time to run that check again.
Raise ValueError if a frequency cannot be parsed.
'''
if not checks:
return checks
filtered_checks = list(checks)
if force:
return tuple(filtered_checks)
for check_config in config.get('checks', DEFAULT_CHECKS):
check = check_config['name']
if checks and check not in checks:
continue
frequency_delta = parse_frequency(check_config.get('frequency'))
if not frequency_delta:
continue
check_time = probe_for_check_time(config, borg_repository_id, check, archives_check_id)
if not check_time:
continue
# If we've not yet reached the time when the frequency dictates we're ready for another
# check, skip this check.
if datetime.datetime.now() < check_time + frequency_delta:
remaining = check_time + frequency_delta - datetime.datetime.now()
logger.info(
f'Skipping {check} check due to configured frequency; {remaining} until next check (use --force to check anyway)'
)
filtered_checks.remove(check)
return tuple(filtered_checks)
def make_archives_check_id(archive_filter_flags):
'''
Given a sequence of flags to filter archives, return a unique hash corresponding to those
particular flags. If there are no flags, return None.
'''
if not archive_filter_flags:
return None
return hashlib.sha256(' '.join(archive_filter_flags).encode()).hexdigest()
def make_check_time_path(config, borg_repository_id, check_type, archives_check_id=None):
'''
Given a configuration dict, a Borg repository ID, the name of a check type ("repository",
"archives", etc.), and a unique hash of the archives filter flags, return a path for recording
that check's time (the time of that check last occurring).
'''
borgmatic_source_directory = os.path.expanduser(
config.get('borgmatic_source_directory', borgmatic.borg.state.DEFAULT_BORGMATIC_SOURCE_DIRECTORY)
)
if check_type in ('archives', 'data'):
return os.path.join(
borgmatic_source_directory,
'checks',
borg_repository_id,
check_type,
archives_check_id if archives_check_id else 'all',
)
return os.path.join(
borgmatic_source_directory,
'checks',
borg_repository_id,
check_type,
)
def write_check_time(path): # pragma: no cover
'''
Record a check time of now as the modification time of the given path.
'''
logger.debug(f'Writing check time at {path}')
os.makedirs(os.path.dirname(path), mode=0o700, exist_ok=True)
pathlib.Path(path, mode=0o600).touch()
def read_check_time(path):
'''
Return the check time based on the modification time of the given path. Return None if the path
doesn't exist.
'''
logger.debug(f'Reading check time from {path}')
try:
return datetime.datetime.fromtimestamp(os.stat(path).st_mtime)
except FileNotFoundError:
return None
def probe_for_check_time(config, borg_repository_id, check, archives_check_id):
'''
Given a configuration dict, a Borg repository ID, the name of a check type ("repository",
"archives", etc.), and a unique hash of the archives filter flags, return a the corresponding
check time or None if such a check time does not exist.
When the check type is "archives" or "data", this function probes two different paths to find
the check time, e.g.:
~/.borgmatic/checks/1234567890/archives/9876543210
~/.borgmatic/checks/1234567890/archives/all
... and returns the maximum modification time of the files found (if any). The first path
represents a more specific archives check time (a check on a subset of archives), and the second
is a fallback to the last "all" archives check.
For other check types, this function reads from a single check time path, e.g.:
~/.borgmatic/checks/1234567890/repository
'''
check_times = (
read_check_time(group[0])
for group in itertools.groupby(
(
make_check_time_path(config, borg_repository_id, check, archives_check_id),
make_check_time_path(config, borg_repository_id, check),
)
)
)
try:
return max(check_time for check_time in check_times if check_time)
except ValueError:
return None
def upgrade_check_times(config, borg_repository_id):
'''
Given a configuration dict and a Borg repository ID, upgrade any corresponding check times on
disk from old-style paths to new-style paths.
Currently, the only upgrade performed is renaming an archive or data check path that looks like:
~/.borgmatic/checks/1234567890/archives
to:
~/.borgmatic/checks/1234567890/archives/all
'''
for check_type in ('archives', 'data'):
new_path = make_check_time_path(config, borg_repository_id, check_type, 'all')
old_path = os.path.dirname(new_path)
temporary_path = f'{old_path}.temp'
if not os.path.isfile(old_path) and not os.path.isfile(temporary_path):
continue
logger.debug(f'Upgrading archives check time from {old_path} to {new_path}')
try:
os.rename(old_path, temporary_path)
except FileNotFoundError:
pass
os.mkdir(old_path)
os.rename(temporary_path, new_path)
def run_check(
config_filename,
repository,
@ -20,6 +300,8 @@ def run_check(
):
'''
Run the "check" action for the given repository.
Raise ValueError if the Borg repository ID cannot be determined.
'''
if check_arguments.repository and not borgmatic.config.validate.repositories_match(
repository, check_arguments.repository
@ -34,16 +316,69 @@ def run_check(
global_arguments.dry_run,
**hook_context,
)
logger.info(f'{repository.get("label", repository["path"])}: Running consistency checks')
borgmatic.borg.check.check_archives(
repository_id = borgmatic.borg.check.get_repository_id(
repository['path'],
config,
local_borg_version,
check_arguments,
global_arguments,
local_path=local_path,
remote_path=remote_path,
)
upgrade_check_times(config, repository_id)
configured_checks = parse_checks(config, check_arguments.only_checks)
archive_filter_flags = borgmatic.borg.check.make_archive_filter_flags(
local_borg_version, config, configured_checks, check_arguments
)
archives_check_id = make_archives_check_id(archive_filter_flags)
checks = filter_checks_on_frequency(
config,
repository_id,
configured_checks,
check_arguments.force,
archives_check_id,
)
borg_specific_checks = set(checks).intersection({'repository', 'archives', 'data'})
if borg_specific_checks:
borgmatic.borg.check.check_archives(
repository['path'],
config,
local_borg_version,
check_arguments,
global_arguments,
borg_specific_checks,
archive_filter_flags,
local_path=local_path,
remote_path=remote_path,
)
for check in borg_specific_checks:
write_check_time(
make_check_time_path(config, repository_id, check, archives_check_id)
)
if 'extract' in checks:
borgmatic.borg.extract.extract_last_archive_dry_run(
config,
local_borg_version,
global_arguments,
repository['path'],
config.get('lock_wait'),
local_path,
remote_path,
)
write_check_time(make_check_time_path(config, repository_id, 'extract'))
#if 'spot' in checks:
# TODO:
# count the number of files in source directories
# in a loop until the sample percentage (of the total source files) is met:
# pick a random file from source directories and calculate its sha256 sum
# extract the file from the latest archive (to stdout) and calculate its sha256 sum
# if the two checksums are equal, increment the matching files count
# if the percentage of matching files (of the total source files) < tolerance percentage, error
borgmatic.hooks.command.execute_hook(
config.get('after_check'),
config.get('umask'),

View File

@ -1,172 +1,28 @@
import argparse
import datetime
import hashlib
import itertools
import json
import logging
import os
import pathlib
from borgmatic.borg import environment, extract, feature, flags, rinfo, state
from borgmatic.borg import environment, feature, flags, rinfo
from borgmatic.execute import DO_NOT_CAPTURE, execute_command
DEFAULT_CHECKS = (
{'name': 'repository', 'frequency': '1 month'},
{'name': 'archives', 'frequency': '1 month'},
)
logger = logging.getLogger(__name__)
def parse_checks(config, only_checks=None):
def make_archive_filter_flags(local_borg_version, config, checks, check_arguments):
'''
Given a configuration dict with a "checks" sequence of dicts and an optional list of override
checks, return a tuple of named checks to run.
Given the local Borg version, a configuration dict, a parsed sequence of checks, and check
arguments as an argparse.Namespace instance, transform the checks into tuple of command-line
flags for filtering archives in a check command.
For example, given a config of:
{'checks': ({'name': 'repository'}, {'name': 'archives'})}
This will be returned as:
('repository', 'archives')
If no "checks" option is present in the config, return the DEFAULT_CHECKS. If a checks value
has a name of "disabled", return an empty tuple, meaning that no checks should be run.
If "check_last" is set in the configuration and "archives" is in checks, then include a "--last"
flag. And if "prefix" is set in configuration and "archives" is in checks, then include a
"--match-archives" flag.
'''
checks = only_checks or tuple(
check_config['name'] for check_config in (config.get('checks', None) or DEFAULT_CHECKS)
)
checks = tuple(check.lower() for check in checks)
check_last = config.get('check_last', None)
prefix = config.get('prefix')
if 'disabled' in checks:
logger.warning(
'The "disabled" value for the "checks" option is deprecated and will be removed from a future release; use "skip_actions" instead'
)
if len(checks) > 1:
logger.warning(
'Multiple checks are configured, but one of them is "disabled"; not running any checks'
)
return ()
return checks
def parse_frequency(frequency):
'''
Given a frequency string with a number and a unit of time, return a corresponding
datetime.timedelta instance or None if the frequency is None or "always".
For instance, given "3 weeks", return datetime.timedelta(weeks=3)
Raise ValueError if the given frequency cannot be parsed.
'''
if not frequency:
return None
frequency = frequency.strip().lower()
if frequency == 'always':
return None
try:
number, time_unit = frequency.split(' ')
number = int(number)
except ValueError:
raise ValueError(f"Could not parse consistency check frequency '{frequency}'")
if not time_unit.endswith('s'):
time_unit += 's'
if time_unit == 'months':
number *= 30
time_unit = 'days'
elif time_unit == 'years':
number *= 365
time_unit = 'days'
try:
return datetime.timedelta(**{time_unit: number})
except TypeError:
raise ValueError(f"Could not parse consistency check frequency '{frequency}'")
def filter_checks_on_frequency(
config,
borg_repository_id,
checks,
force,
archives_check_id=None,
):
'''
Given a configuration dict with a "checks" sequence of dicts, a Borg repository ID, a sequence
of checks, whether to force checks to run, and an ID for the archives check potentially being
run (if any), filter down those checks based on the configured "frequency" for each check as
compared to its check time file.
In other words, a check whose check time file's timestamp is too new (based on the configured
frequency) will get cut from the returned sequence of checks. Example:
config = {
'checks': [
{
'name': 'archives',
'frequency': '2 weeks',
},
]
}
When this function is called with that config and "archives" in checks, "archives" will get
filtered out of the returned result if its check time file is newer than 2 weeks old, indicating
that it's not yet time to run that check again.
Raise ValueError if a frequency cannot be parsed.
'''
if not checks:
return checks
filtered_checks = list(checks)
if force:
return tuple(filtered_checks)
for check_config in config.get('checks', DEFAULT_CHECKS):
check = check_config['name']
if checks and check not in checks:
continue
frequency_delta = parse_frequency(check_config.get('frequency'))
if not frequency_delta:
continue
check_time = probe_for_check_time(config, borg_repository_id, check, archives_check_id)
if not check_time:
continue
# If we've not yet reached the time when the frequency dictates we're ready for another
# check, skip this check.
if datetime.datetime.now() < check_time + frequency_delta:
remaining = check_time + frequency_delta - datetime.datetime.now()
logger.info(
f'Skipping {check} check due to configured frequency; {remaining} until next check (use --force to check anyway)'
)
filtered_checks.remove(check)
return tuple(filtered_checks)
def make_archive_filter_flags(
local_borg_version, config, checks, check_arguments, check_last=None, prefix=None
):
'''
Given the local Borg version, a configuration dict, a parsed sequence of checks, check arguments
as an argparse.Namespace instance, the check last value, and a consistency check prefix,
transform the checks into tuple of command-line flags for filtering archives in a check command.
If a check_last value is given and "archives" is in checks, then include a "--last" flag. And if
a prefix value is given and "archives" is in checks, then include a "--match-archives" flag.
'''
if 'archives' in checks or 'data' in checks:
return (('--last', str(check_last)) if check_last else ()) + (
(
@ -196,17 +52,6 @@ def make_archive_filter_flags(
return ()
def make_archives_check_id(archive_filter_flags):
'''
Given a sequence of flags to filter archives, return a unique hash corresponding to those
particular flags. If there are no flags, return None.
'''
if not archive_filter_flags:
return None
return hashlib.sha256(' '.join(archive_filter_flags).encode()).hexdigest()
def make_check_flags(checks, archive_filter_flags):
'''
Given a parsed sequence of checks and a sequence of flags to filter archives, transform the
@ -240,144 +85,15 @@ def make_check_flags(checks, archive_filter_flags):
)
def make_check_time_path(config, borg_repository_id, check_type, archives_check_id=None):
def get_repository_id(repository_path, config, local_borg_version, global_arguments, local_path, remote_path):
'''
Given a configuration dict, a Borg repository ID, the name of a check type ("repository",
"archives", etc.), and a unique hash of the archives filter flags, return a path for recording
that check's time (the time of that check last occurring).
'''
borgmatic_source_directory = os.path.expanduser(
config.get('borgmatic_source_directory', state.DEFAULT_BORGMATIC_SOURCE_DIRECTORY)
)
Given a local or remote repository path, a configuration dict, the local Borg version, global
arguments, and local/remote commands to run, return the corresponding Borg repository ID.
if check_type in ('archives', 'data'):
return os.path.join(
borgmatic_source_directory,
'checks',
borg_repository_id,
check_type,
archives_check_id if archives_check_id else 'all',
)
return os.path.join(
borgmatic_source_directory,
'checks',
borg_repository_id,
check_type,
)
def write_check_time(path): # pragma: no cover
'''
Record a check time of now as the modification time of the given path.
'''
logger.debug(f'Writing check time at {path}')
os.makedirs(os.path.dirname(path), mode=0o700, exist_ok=True)
pathlib.Path(path, mode=0o600).touch()
def read_check_time(path):
'''
Return the check time based on the modification time of the given path. Return None if the path
doesn't exist.
'''
logger.debug(f'Reading check time from {path}')
try:
return datetime.datetime.fromtimestamp(os.stat(path).st_mtime)
except FileNotFoundError:
return None
def probe_for_check_time(config, borg_repository_id, check, archives_check_id):
'''
Given a configuration dict, a Borg repository ID, the name of a check type ("repository",
"archives", etc.), and a unique hash of the archives filter flags, return a the corresponding
check time or None if such a check time does not exist.
When the check type is "archives" or "data", this function probes two different paths to find
the check time, e.g.:
~/.borgmatic/checks/1234567890/archives/9876543210
~/.borgmatic/checks/1234567890/archives/all
... and returns the maximum modification time of the files found (if any). The first path
represents a more specific archives check time (a check on a subset of archives), and the second
is a fallback to the last "all" archives check.
For other check types, this function reads from a single check time path, e.g.:
~/.borgmatic/checks/1234567890/repository
'''
check_times = (
read_check_time(group[0])
for group in itertools.groupby(
(
make_check_time_path(config, borg_repository_id, check, archives_check_id),
make_check_time_path(config, borg_repository_id, check),
)
)
)
try:
return max(check_time for check_time in check_times if check_time)
except ValueError:
return None
def upgrade_check_times(config, borg_repository_id):
'''
Given a configuration dict and a Borg repository ID, upgrade any corresponding check times on
disk from old-style paths to new-style paths.
Currently, the only upgrade performed is renaming an archive or data check path that looks like:
~/.borgmatic/checks/1234567890/archives
to:
~/.borgmatic/checks/1234567890/archives/all
'''
for check_type in ('archives', 'data'):
new_path = make_check_time_path(config, borg_repository_id, check_type, 'all')
old_path = os.path.dirname(new_path)
temporary_path = f'{old_path}.temp'
if not os.path.isfile(old_path) and not os.path.isfile(temporary_path):
continue
logger.debug(f'Upgrading archives check time from {old_path} to {new_path}')
try:
os.rename(old_path, temporary_path)
except FileNotFoundError:
pass
os.mkdir(old_path)
os.rename(temporary_path, new_path)
def check_archives(
repository_path,
config,
local_borg_version,
check_arguments,
global_arguments,
local_path='borg',
remote_path=None,
):
'''
Given a local or remote repository path, a configuration dict, the local Borg version, check
arguments as an argparse.Namespace instance, global arguments, and local/remote commands to run,
check the contained Borg archives for consistency.
If there are no consistency checks to run, skip running them.
Raises ValueError if the Borg repository ID cannot be determined.
Raise ValueError if the Borg repository ID cannot be determined.
'''
try:
borg_repository_id = json.loads(
return json.loads(
rinfo.display_repository_info(
repository_path,
config,
@ -391,82 +107,63 @@ def check_archives(
except (json.JSONDecodeError, KeyError):
raise ValueError(f'Cannot determine Borg repository ID for {repository_path}')
upgrade_check_times(config, borg_repository_id)
check_last = config.get('check_last', None)
prefix = config.get('prefix')
configured_checks = parse_checks(config, check_arguments.only_checks)
lock_wait = None
def check_archives(
repository_path,
config,
local_borg_version,
check_arguments,
global_arguments,
checks,
archive_filter_flags,
local_path='borg',
remote_path=None,
):
'''
Given a local or remote repository path, a configuration dict, the local Borg version, check
arguments as an argparse.Namespace instance, global arguments, a set of named Borg checks to run
(some combination "repository", "archives", and/or "data"), archive filter flags, and
local/remote commands to run, check the contained Borg archives for consistency.
'''
lock_wait = config.get('lock_wait')
extra_borg_options = config.get('extra_borg_options', {}).get('check', '')
archive_filter_flags = make_archive_filter_flags(
local_borg_version, config, configured_checks, check_arguments, check_last, prefix
)
archives_check_id = make_archives_check_id(archive_filter_flags)
checks = filter_checks_on_frequency(
config,
borg_repository_id,
configured_checks,
check_arguments.force,
archives_check_id,
verbosity_flags = ()
if logger.isEnabledFor(logging.INFO):
verbosity_flags = ('--info',)
if logger.isEnabledFor(logging.DEBUG):
verbosity_flags = ('--debug', '--show-rc')
full_command = (
(local_path, 'check')
+ (('--repair',) if check_arguments.repair else ())
+ make_check_flags(checks, archive_filter_flags)
+ (('--remote-path', remote_path) if remote_path else ())
+ (('--log-json',) if global_arguments.log_json else ())
+ (('--lock-wait', str(lock_wait)) if lock_wait else ())
+ verbosity_flags
+ (('--progress',) if check_arguments.progress else ())
+ (tuple(extra_borg_options.split(' ')) if extra_borg_options else ())
+ flags.make_repository_flags(repository_path, local_borg_version)
)
if set(checks).intersection({'repository', 'archives', 'data'}):
lock_wait = config.get('lock_wait')
borg_environment = environment.make_environment(config)
borg_exit_codes = config.get('borg_exit_codes')
verbosity_flags = ()
if logger.isEnabledFor(logging.INFO):
verbosity_flags = ('--info',)
if logger.isEnabledFor(logging.DEBUG):
verbosity_flags = ('--debug', '--show-rc')
full_command = (
(local_path, 'check')
+ (('--repair',) if check_arguments.repair else ())
+ make_check_flags(checks, archive_filter_flags)
+ (('--remote-path', remote_path) if remote_path else ())
+ (('--log-json',) if global_arguments.log_json else ())
+ (('--lock-wait', str(lock_wait)) if lock_wait else ())
+ verbosity_flags
+ (('--progress',) if check_arguments.progress else ())
+ (tuple(extra_borg_options.split(' ')) if extra_borg_options else ())
+ flags.make_repository_flags(repository_path, local_borg_version)
# The Borg repair option triggers an interactive prompt, which won't work when output is
# captured. And progress messes with the terminal directly.
if check_arguments.repair or check_arguments.progress:
execute_command(
full_command,
output_file=DO_NOT_CAPTURE,
extra_environment=borg_environment,
borg_local_path=local_path,
borg_exit_codes=borg_exit_codes,
)
borg_environment = environment.make_environment(config)
borg_exit_codes = config.get('borg_exit_codes')
# The Borg repair option triggers an interactive prompt, which won't work when output is
# captured. And progress messes with the terminal directly.
if check_arguments.repair or check_arguments.progress:
execute_command(
full_command,
output_file=DO_NOT_CAPTURE,
extra_environment=borg_environment,
borg_local_path=local_path,
borg_exit_codes=borg_exit_codes,
)
else:
execute_command(
full_command,
extra_environment=borg_environment,
borg_local_path=local_path,
borg_exit_codes=borg_exit_codes,
)
for check in checks:
write_check_time(
make_check_time_path(config, borg_repository_id, check, archives_check_id)
)
if 'extract' in checks:
extract.extract_last_archive_dry_run(
config,
local_borg_version,
global_arguments,
repository_path,
lock_wait,
local_path,
remote_path,
else:
execute_command(
full_command,
extra_environment=borg_environment,
borg_local_path=local_path,
borg_exit_codes=borg_exit_codes,
)
write_check_time(make_check_time_path(config, borg_repository_id, 'extract'))

View File

@ -21,6 +21,19 @@ def insert_newline_before_comment(config, field_name):
)
def get_properties(schema):
'''
Given a schema dict, return its properties. But if it's got sub-schemas with multiple different
potential properties, returned their merged properties instead.
'''
if 'oneOf' in schema:
return dict(
collections.ChainMap(*[sub_schema['properties'] for sub_schema in schema['oneOf']])
)
return schema['properties']
def schema_to_sample_configuration(schema, level=0, parent_is_sequence=False):
'''
Given a loaded configuration schema, generate and return sample config for it. Include comments
@ -40,7 +53,7 @@ def schema_to_sample_configuration(schema, level=0, parent_is_sequence=False):
config = ruamel.yaml.comments.CommentedMap(
[
(field_name, schema_to_sample_configuration(sub_schema, level + 1))
for field_name, sub_schema in schema['properties'].items()
for field_name, sub_schema in get_properties(schema).items()
]
)
indent = (level * INDENT) + (SEQUENCE_INDENT if parent_is_sequence else 0)
@ -151,7 +164,7 @@ def add_comments_to_configuration_sequence(config, schema, indent=0):
return
for field_name in config[0].keys():
field_schema = schema['items']['properties'].get(field_name, {})
field_schema = get_properties(schema['items']).get(field_name, {})
description = field_schema.get('description')
# No description to use? Skip it.
@ -178,7 +191,7 @@ def add_comments_to_configuration_object(config, schema, indent=0, skip_first=Fa
if skip_first and index == 0:
continue
field_schema = schema['properties'].get(field_name, {})
field_schema = get_properties(schema).get(field_name, {})
description = field_schema.get('description', '').strip()
# If this is an optional key, add an indicator to the comment flagging it to be commented

View File

@ -503,37 +503,99 @@ properties:
type: array
items:
type: object
required: ['name']
additionalProperties: false
properties:
name:
type: string
enum:
- repository
- archives
- data
- extract
- disabled
description: |
Name of consistency check to run: "repository",
"archives", "data", and/or "extract". "repository"
checks the consistency of the repository, "archives"
checks all of the archives, "data" verifies the
integrity of the data within the archives, and "extract"
does an extraction dry-run of the most recent archive.
Note that "data" implies "archives". See "skip_actions"
for disabling checks altogether.
example: repository
frequency:
type: string
description: |
How frequently to run this type of consistency check (as
a best effort). The value is a number followed by a unit
of time. E.g., "2 weeks" to run this consistency check
no more than every two weeks for a given repository or
"1 month" to run it no more than monthly. Defaults to
"always": running this check every time checks are run.
example: 2 weeks
oneOf:
- required: [name]
additionalProperties: false
properties:
name:
type: string
enum:
- repository
- archives
- data
- extract
- disabled
description: |
Name of consistency check to run: "repository",
"archives", "data", "spot", and/or "extract".
"repository" checks the consistency of the
repository, "archives" checks all of the
archives, "data" verifies the integrity of the
data within the archives, "spot" checks that
some percentage of source files are found in the
most recent archive (with identical contents),
and "extract" does an extraction dry-run of the
most recent archive. Note that "data" implies
"archives". See "skip_actions" for disabling
checks altogether.
example: spot
frequency:
type: string
description: |
How frequently to run this type of consistency
check (as a best effort). The value is a number
followed by a unit of time. E.g., "2 weeks" to
run this consistency check no more than every
two weeks for a given repository or "1 month" to
run it no more than monthly. Defaults to
"always": running this check every time checks
are run.
example: 2 weeks
- required:
- name
- sample_percentage
- tolerance_percentage
additionalProperties: false
properties:
name:
type: string
enum:
- spot
description: |
Name of consistency check to run: "repository",
"archives", "data", "spot", and/or "extract".
"repository" checks the consistency of the
repository, "archives" checks all of the
archives, "data" verifies the integrity of the
data within the archives, "spot" checks that
some percentage of source files are found in the
most recent archive (with identical contents),
and "extract" does an extraction dry-run of the
most recent archive. Note that "data" implies
"archives". See "skip_actions" for disabling
checks altogether.
example: repository
frequency:
type: string
description: |
How frequently to run this type of consistency
check (as a best effort). The value is a number
followed by a unit of time. E.g., "2 weeks" to
run this consistency check no more than every
two weeks for a given repository or "1 month" to
run it no more than monthly. Defaults to
"always": running this check every time checks
are run.
example: 2 weeks
sample_percentage:
type: number
description: |
The percentage of total files in the source
directories to randomly sample and compare to
their corresponding files in the most recent
backup archive. Only applies to the "spot"
check.
example: 5
tolerance_percentage:
type: number
description: |
The percentage of total files in the source
directories that can fail a spot check
comparison without failing the entire
consistency check. Should be lower than or
equal to the "sample_percentage". Only applies
to the "spot" check.
example: 0.5
description: |
List of one or more consistency checks to run on a periodic basis
(if "frequency" is set) or every time borgmatic runs checks (if

View File

@ -1,18 +1,433 @@
from flexmock import flexmock
import pytest
from borgmatic.actions import check as module
def test_run_check_calls_hooks_for_configured_repository():
def test_parse_checks_returns_them_as_tuple():
checks = module.parse_checks({'checks': [{'name': 'foo'}, {'name': 'bar'}]})
assert checks == ('foo', 'bar')
def test_parse_checks_with_missing_value_returns_defaults():
checks = module.parse_checks({})
assert checks == ('repository', 'archives')
def test_parse_checks_with_empty_list_returns_defaults():
checks = module.parse_checks({'checks': []})
assert checks == ('repository', 'archives')
def test_parse_checks_with_none_value_returns_defaults():
checks = module.parse_checks({'checks': None})
assert checks == ('repository', 'archives')
def test_parse_checks_with_disabled_returns_no_checks():
checks = module.parse_checks({'checks': [{'name': 'foo'}, {'name': 'disabled'}]})
assert checks == ()
def test_parse_checks_prefers_override_checks_to_configured_checks():
checks = module.parse_checks(
{'checks': [{'name': 'archives'}]}, only_checks=['repository', 'extract']
)
assert checks == ('repository', 'extract')
@pytest.mark.parametrize(
'frequency,expected_result',
(
(None, None),
('always', None),
('1 hour', module.datetime.timedelta(hours=1)),
('2 hours', module.datetime.timedelta(hours=2)),
('1 day', module.datetime.timedelta(days=1)),
('2 days', module.datetime.timedelta(days=2)),
('1 week', module.datetime.timedelta(weeks=1)),
('2 weeks', module.datetime.timedelta(weeks=2)),
('1 month', module.datetime.timedelta(days=30)),
('2 months', module.datetime.timedelta(days=60)),
('1 year', module.datetime.timedelta(days=365)),
('2 years', module.datetime.timedelta(days=365 * 2)),
),
)
def test_parse_frequency_parses_into_timedeltas(frequency, expected_result):
assert module.parse_frequency(frequency) == expected_result
@pytest.mark.parametrize(
'frequency',
(
'sometime',
'x days',
'3 decades',
),
)
def test_parse_frequency_raises_on_parse_error(frequency):
with pytest.raises(ValueError):
module.parse_frequency(frequency)
def test_filter_checks_on_frequency_without_config_uses_default_checks():
flexmock(module).should_receive('parse_frequency').and_return(
module.datetime.timedelta(weeks=4)
)
flexmock(module).should_receive('make_check_time_path')
flexmock(module).should_receive('probe_for_check_time').and_return(None)
assert module.filter_checks_on_frequency(
config={},
borg_repository_id='repo',
checks=('repository', 'archives'),
force=False,
archives_check_id='1234',
) == ('repository', 'archives')
def test_filter_checks_on_frequency_retains_unconfigured_check():
assert module.filter_checks_on_frequency(
config={},
borg_repository_id='repo',
checks=('data',),
force=False,
) == ('data',)
def test_filter_checks_on_frequency_retains_check_without_frequency():
flexmock(module).should_receive('parse_frequency').and_return(None)
assert module.filter_checks_on_frequency(
config={'checks': [{'name': 'archives'}]},
borg_repository_id='repo',
checks=('archives',),
force=False,
archives_check_id='1234',
) == ('archives',)
def test_filter_checks_on_frequency_retains_check_with_elapsed_frequency():
flexmock(module).should_receive('parse_frequency').and_return(
module.datetime.timedelta(hours=1)
)
flexmock(module).should_receive('make_check_time_path')
flexmock(module).should_receive('probe_for_check_time').and_return(
module.datetime.datetime(year=module.datetime.MINYEAR, month=1, day=1)
)
assert module.filter_checks_on_frequency(
config={'checks': [{'name': 'archives', 'frequency': '1 hour'}]},
borg_repository_id='repo',
checks=('archives',),
force=False,
archives_check_id='1234',
) == ('archives',)
def test_filter_checks_on_frequency_retains_check_with_missing_check_time_file():
flexmock(module).should_receive('parse_frequency').and_return(
module.datetime.timedelta(hours=1)
)
flexmock(module).should_receive('make_check_time_path')
flexmock(module).should_receive('probe_for_check_time').and_return(None)
assert module.filter_checks_on_frequency(
config={'checks': [{'name': 'archives', 'frequency': '1 hour'}]},
borg_repository_id='repo',
checks=('archives',),
force=False,
archives_check_id='1234',
) == ('archives',)
def test_filter_checks_on_frequency_skips_check_with_unelapsed_frequency():
flexmock(module).should_receive('parse_frequency').and_return(
module.datetime.timedelta(hours=1)
)
flexmock(module).should_receive('make_check_time_path')
flexmock(module).should_receive('probe_for_check_time').and_return(
module.datetime.datetime.now()
)
assert (
module.filter_checks_on_frequency(
config={'checks': [{'name': 'archives', 'frequency': '1 hour'}]},
borg_repository_id='repo',
checks=('archives',),
force=False,
archives_check_id='1234',
)
== ()
)
def test_filter_checks_on_frequency_restains_check_with_unelapsed_frequency_and_force():
assert module.filter_checks_on_frequency(
config={'checks': [{'name': 'archives', 'frequency': '1 hour'}]},
borg_repository_id='repo',
checks=('archives',),
force=True,
archives_check_id='1234',
) == ('archives',)
def test_filter_checks_on_frequency_passes_through_empty_checks():
assert (
module.filter_checks_on_frequency(
config={'checks': [{'name': 'archives', 'frequency': '1 hour'}]},
borg_repository_id='repo',
checks=(),
force=False,
archives_check_id='1234',
)
== ()
)
def test_make_archives_check_id_with_flags_returns_a_value_and_does_not_raise():
assert module.make_archives_check_id(('--match-archives', 'sh:foo-*'))
def test_make_archives_check_id_with_empty_flags_returns_none():
assert module.make_archives_check_id(()) is None
def test_make_check_time_path_with_borgmatic_source_directory_includes_it():
flexmock(module.os.path).should_receive('expanduser').with_args('~/.borgmatic').and_return(
'/home/user/.borgmatic'
)
assert (
module.make_check_time_path(
{'borgmatic_source_directory': '~/.borgmatic'}, '1234', 'archives', '5678'
)
== '/home/user/.borgmatic/checks/1234/archives/5678'
)
def test_make_check_time_path_without_borgmatic_source_directory_uses_default():
flexmock(module.os.path).should_receive('expanduser').with_args(
module.borgmatic.borg.state.DEFAULT_BORGMATIC_SOURCE_DIRECTORY
).and_return('/home/user/.borgmatic')
assert (
module.make_check_time_path({}, '1234', 'archives', '5678')
== '/home/user/.borgmatic/checks/1234/archives/5678'
)
def test_make_check_time_path_with_archives_check_and_no_archives_check_id_defaults_to_all():
flexmock(module.os.path).should_receive('expanduser').with_args('~/.borgmatic').and_return(
'/home/user/.borgmatic'
)
assert (
module.make_check_time_path(
{'borgmatic_source_directory': '~/.borgmatic'},
'1234',
'archives',
)
== '/home/user/.borgmatic/checks/1234/archives/all'
)
def test_make_check_time_path_with_repositories_check_ignores_archives_check_id():
flexmock(module.os.path).should_receive('expanduser').with_args('~/.borgmatic').and_return(
'/home/user/.borgmatic'
)
assert (
module.make_check_time_path(
{'borgmatic_source_directory': '~/.borgmatic'}, '1234', 'repository', '5678'
)
== '/home/user/.borgmatic/checks/1234/repository'
)
def test_read_check_time_does_not_raise():
flexmock(module.os).should_receive('stat').and_return(flexmock(st_mtime=123))
assert module.read_check_time('/path')
def test_read_check_time_on_missing_file_does_not_raise():
flexmock(module.os).should_receive('stat').and_raise(FileNotFoundError)
assert module.read_check_time('/path') is None
def test_probe_for_check_time_uses_maximum_of_multiple_check_times():
flexmock(module).should_receive('make_check_time_path').and_return(
'~/.borgmatic/checks/1234/archives/5678'
).and_return('~/.borgmatic/checks/1234/archives/all')
flexmock(module).should_receive('read_check_time').and_return(1).and_return(2)
assert module.probe_for_check_time(flexmock(), flexmock(), flexmock(), flexmock()) == 2
def test_probe_for_check_time_deduplicates_identical_check_time_paths():
flexmock(module).should_receive('make_check_time_path').and_return(
'~/.borgmatic/checks/1234/archives/5678'
).and_return('~/.borgmatic/checks/1234/archives/5678')
flexmock(module).should_receive('read_check_time').and_return(1).once()
assert module.probe_for_check_time(flexmock(), flexmock(), flexmock(), flexmock()) == 1
def test_probe_for_check_time_skips_none_check_time():
flexmock(module).should_receive('make_check_time_path').and_return(
'~/.borgmatic/checks/1234/archives/5678'
).and_return('~/.borgmatic/checks/1234/archives/all')
flexmock(module).should_receive('read_check_time').and_return(None).and_return(2)
assert module.probe_for_check_time(flexmock(), flexmock(), flexmock(), flexmock()) == 2
def test_probe_for_check_time_uses_single_check_time():
flexmock(module).should_receive('make_check_time_path').and_return(
'~/.borgmatic/checks/1234/archives/5678'
).and_return('~/.borgmatic/checks/1234/archives/all')
flexmock(module).should_receive('read_check_time').and_return(1).and_return(None)
assert module.probe_for_check_time(flexmock(), flexmock(), flexmock(), flexmock()) == 1
def test_probe_for_check_time_returns_none_when_no_check_time_found():
flexmock(module).should_receive('make_check_time_path').and_return(
'~/.borgmatic/checks/1234/archives/5678'
).and_return('~/.borgmatic/checks/1234/archives/all')
flexmock(module).should_receive('read_check_time').and_return(None).and_return(None)
assert module.probe_for_check_time(flexmock(), flexmock(), flexmock(), flexmock()) is None
def test_upgrade_check_times_renames_old_check_paths_to_all():
base_path = '~/.borgmatic/checks/1234'
flexmock(module).should_receive('make_check_time_path').with_args(
object, object, 'archives', 'all'
).and_return(f'{base_path}/archives/all')
flexmock(module).should_receive('make_check_time_path').with_args(
object, object, 'data', 'all'
).and_return(f'{base_path}/data/all')
flexmock(module.os.path).should_receive('isfile').with_args(f'{base_path}/archives').and_return(
True
)
flexmock(module.os.path).should_receive('isfile').with_args(
f'{base_path}/archives.temp'
).and_return(False)
flexmock(module.os.path).should_receive('isfile').with_args(f'{base_path}/data').and_return(
False
)
flexmock(module.os.path).should_receive('isfile').with_args(
f'{base_path}/data.temp'
).and_return(False)
flexmock(module.os).should_receive('rename').with_args(
f'{base_path}/archives', f'{base_path}/archives.temp'
).once()
flexmock(module.os).should_receive('mkdir').with_args(f'{base_path}/archives').once()
flexmock(module.os).should_receive('rename').with_args(
f'{base_path}/archives.temp', f'{base_path}/archives/all'
).once()
module.upgrade_check_times(flexmock(), flexmock())
def test_upgrade_check_times_renames_data_check_paths_when_archives_paths_are_already_upgraded():
base_path = '~/.borgmatic/checks/1234'
flexmock(module).should_receive('make_check_time_path').with_args(
object, object, 'archives', 'all'
).and_return(f'{base_path}/archives/all')
flexmock(module).should_receive('make_check_time_path').with_args(
object, object, 'data', 'all'
).and_return(f'{base_path}/data/all')
flexmock(module.os.path).should_receive('isfile').with_args(f'{base_path}/archives').and_return(
False
)
flexmock(module.os.path).should_receive('isfile').with_args(
f'{base_path}/archives.temp'
).and_return(False)
flexmock(module.os.path).should_receive('isfile').with_args(f'{base_path}/data').and_return(
True
)
flexmock(module.os).should_receive('rename').with_args(
f'{base_path}/data', f'{base_path}/data.temp'
).once()
flexmock(module.os).should_receive('mkdir').with_args(f'{base_path}/data').once()
flexmock(module.os).should_receive('rename').with_args(
f'{base_path}/data.temp', f'{base_path}/data/all'
).once()
module.upgrade_check_times(flexmock(), flexmock())
def test_upgrade_check_times_skips_missing_check_paths():
flexmock(module).should_receive('make_check_time_path').and_return(
'~/.borgmatic/checks/1234/archives/all'
)
flexmock(module.os.path).should_receive('isfile').and_return(False)
flexmock(module.os).should_receive('rename').never()
flexmock(module.os).should_receive('mkdir').never()
module.upgrade_check_times(flexmock(), flexmock())
def test_upgrade_check_times_renames_stale_temporary_check_path():
base_path = '~/.borgmatic/checks/1234'
flexmock(module).should_receive('make_check_time_path').with_args(
object, object, 'archives', 'all'
).and_return(f'{base_path}/archives/all')
flexmock(module).should_receive('make_check_time_path').with_args(
object, object, 'data', 'all'
).and_return(f'{base_path}/data/all')
flexmock(module.os.path).should_receive('isfile').with_args(f'{base_path}/archives').and_return(
False
)
flexmock(module.os.path).should_receive('isfile').with_args(
f'{base_path}/archives.temp'
).and_return(True)
flexmock(module.os.path).should_receive('isfile').with_args(f'{base_path}/data').and_return(
False
)
flexmock(module.os.path).should_receive('isfile').with_args(
f'{base_path}/data.temp'
).and_return(False)
flexmock(module.os).should_receive('rename').with_args(
f'{base_path}/archives', f'{base_path}/archives.temp'
).and_raise(FileNotFoundError)
flexmock(module.os).should_receive('mkdir').with_args(f'{base_path}/archives').once()
flexmock(module.os).should_receive('rename').with_args(
f'{base_path}/archives.temp', f'{base_path}/archives/all'
).once()
module.upgrade_check_times(flexmock(), flexmock())
def test_run_check_checks_archives_for_configured_repository():
flexmock(module.logger).answer = lambda message: None
flexmock(module.borgmatic.config.validate).should_receive('repositories_match').never()
flexmock(module.borgmatic.borg.check).should_receive('get_repository_id').and_return(flexmock())
flexmock(module).should_receive('upgrade_check_times')
flexmock(module).should_receive('parse_checks')
flexmock(module.borgmatic.borg.check).should_receive('make_archive_filter_flags').and_return(())
flexmock(module).should_receive('make_archives_check_id').and_return(None)
flexmock(module).should_receive('filter_checks_on_frequency').and_return({'repository', 'archives'})
flexmock(module.borgmatic.borg.check).should_receive('check_archives').once()
flexmock(module).should_receive('make_check_time_path')
flexmock(module).should_receive('write_check_time')
flexmock(module.borgmatic.borg.extract).should_receive('extract_last_archive_dry_run').never()
flexmock(module.borgmatic.hooks.command).should_receive('execute_hook').times(2)
check_arguments = flexmock(
repository=None,
progress=flexmock(),
repair=flexmock(),
only=flexmock(),
only_checks=flexmock(),
force=flexmock(),
)
global_arguments = flexmock(monitoring_verbosity=1, dry_run=False)
@ -30,17 +445,98 @@ def test_run_check_calls_hooks_for_configured_repository():
)
def test_run_check_runs_with_selected_repository():
def test_run_check_runs_configured_extract_check():
flexmock(module.logger).answer = lambda message: None
flexmock(module.borgmatic.config.validate).should_receive('repositories_match').never()
flexmock(module.borgmatic.borg.check).should_receive('get_repository_id').and_return(flexmock())
flexmock(module).should_receive('upgrade_check_times')
flexmock(module).should_receive('parse_checks')
flexmock(module.borgmatic.borg.check).should_receive('make_archive_filter_flags').and_return(())
flexmock(module).should_receive('make_archives_check_id').and_return(None)
flexmock(module).should_receive('filter_checks_on_frequency').and_return({'extract'})
flexmock(module.borgmatic.borg.check).should_receive('check_archives').never()
flexmock(module.borgmatic.borg.extract).should_receive('extract_last_archive_dry_run').once()
flexmock(module).should_receive('make_check_time_path')
flexmock(module).should_receive('write_check_time')
flexmock(module.borgmatic.hooks.command).should_receive('execute_hook').times(2)
check_arguments = flexmock(
repository=None,
progress=flexmock(),
repair=flexmock(),
only_checks=flexmock(),
force=flexmock(),
)
global_arguments = flexmock(monitoring_verbosity=1, dry_run=False)
module.run_check(
config_filename='test.yaml',
repository={'path': 'repo'},
config={'repositories': ['repo']},
hook_context={},
local_borg_version=None,
check_arguments=check_arguments,
global_arguments=global_arguments,
local_path=None,
remote_path=None,
)
def test_run_check_without_checks_runs_nothing_except_hooks():
flexmock(module.logger).answer = lambda message: None
flexmock(module.borgmatic.config.validate).should_receive('repositories_match').never()
flexmock(module.borgmatic.borg.check).should_receive('get_repository_id').and_return(flexmock())
flexmock(module).should_receive('upgrade_check_times')
flexmock(module).should_receive('parse_checks')
flexmock(module.borgmatic.borg.check).should_receive('make_archive_filter_flags').and_return(())
flexmock(module).should_receive('make_archives_check_id').and_return(None)
flexmock(module).should_receive('filter_checks_on_frequency').and_return({})
flexmock(module.borgmatic.borg.check).should_receive('check_archives').never()
flexmock(module).should_receive('make_check_time_path')
flexmock(module).should_receive('write_check_time').never()
flexmock(module.borgmatic.borg.extract).should_receive('extract_last_archive_dry_run').never()
flexmock(module.borgmatic.hooks.command).should_receive('execute_hook').times(2)
check_arguments = flexmock(
repository=None,
progress=flexmock(),
repair=flexmock(),
only_checks=flexmock(),
force=flexmock(),
)
global_arguments = flexmock(monitoring_verbosity=1, dry_run=False)
module.run_check(
config_filename='test.yaml',
repository={'path': 'repo'},
config={'repositories': ['repo']},
hook_context={},
local_borg_version=None,
check_arguments=check_arguments,
global_arguments=global_arguments,
local_path=None,
remote_path=None,
)
def test_run_check_checks_archives_in_selected_repository():
flexmock(module.logger).answer = lambda message: None
flexmock(module.borgmatic.config.validate).should_receive(
'repositories_match'
).once().and_return(True)
flexmock(module.borgmatic.borg.check).should_receive('get_repository_id').and_return(flexmock())
flexmock(module).should_receive('upgrade_check_times')
flexmock(module).should_receive('parse_checks')
flexmock(module.borgmatic.borg.check).should_receive('make_archive_filter_flags').and_return(())
flexmock(module).should_receive('make_archives_check_id').and_return(None)
flexmock(module).should_receive('filter_checks_on_frequency').and_return({'repository', 'archives'})
flexmock(module.borgmatic.borg.check).should_receive('check_archives').once()
flexmock(module).should_receive('make_check_time_path')
flexmock(module).should_receive('write_check_time')
flexmock(module.borgmatic.borg.extract).should_receive('extract_last_archive_dry_run').never()
check_arguments = flexmock(
repository=flexmock(),
progress=flexmock(),
repair=flexmock(),
only=flexmock(),
only_checks=flexmock(),
force=flexmock(),
)
global_arguments = flexmock(monitoring_verbosity=1, dry_run=False)
@ -68,7 +564,7 @@ def test_run_check_bails_if_repository_does_not_match():
repository=flexmock(),
progress=flexmock(),
repair=flexmock(),
only=flexmock(),
only_checks=flexmock(),
force=flexmock(),
)
global_arguments = flexmock(monitoring_verbosity=1, dry_run=False)

File diff suppressed because it is too large Load Diff

View File

@ -6,9 +6,48 @@ from flexmock import flexmock
from borgmatic.config import generate as module
def test_get_properties_with_simple_object():
schema = {
'type': 'object',
'properties': OrderedDict(
[
('field1', {'example': 'Example'}),
]
),
}
assert module.get_properties(schema) == schema['properties']
def test_get_properties_merges_one_of_list_properties():
schema = {
'type': 'object',
'oneOf': [
{
'properties': OrderedDict(
[
('field1', {'example': 'Example 1'}),
('field2', {'example': 'Example 2'}),
]
),
},
{
'properties': OrderedDict(
[
('field2', {'example': 'Example 2'}),
('field3', {'example': 'Example 3'}),
]
),
},
],
}
assert module.get_properties(schema) == dict(
schema['oneOf'][0]['properties'], **schema['oneOf'][1]['properties']
)
def test_schema_to_sample_configuration_generates_config_map_with_examples():
flexmock(module.ruamel.yaml.comments).should_receive('CommentedMap').replace_with(OrderedDict)
flexmock(module).should_receive('add_comments_to_configuration_object')
schema = {
'type': 'object',
'properties': OrderedDict(
@ -19,6 +58,9 @@ def test_schema_to_sample_configuration_generates_config_map_with_examples():
]
),
}
flexmock(module).should_receive('get_properties').and_return(schema['properties'])
flexmock(module.ruamel.yaml.comments).should_receive('CommentedMap').replace_with(OrderedDict)
flexmock(module).should_receive('add_comments_to_configuration_object')
config = module.schema_to_sample_configuration(schema)
@ -42,9 +84,6 @@ def test_schema_to_sample_configuration_generates_config_sequence_of_strings_wit
def test_schema_to_sample_configuration_generates_config_sequence_of_maps_with_examples():
flexmock(module.ruamel.yaml.comments).should_receive('CommentedSeq').replace_with(list)
flexmock(module).should_receive('add_comments_to_configuration_sequence')
flexmock(module).should_receive('add_comments_to_configuration_object')
schema = {
'type': 'array',
'items': {
@ -54,6 +93,10 @@ def test_schema_to_sample_configuration_generates_config_sequence_of_maps_with_e
),
},
}
flexmock(module).should_receive('get_properties').and_return(schema['items']['properties'])
flexmock(module.ruamel.yaml.comments).should_receive('CommentedSeq').replace_with(list)
flexmock(module).should_receive('add_comments_to_configuration_sequence')
flexmock(module).should_receive('add_comments_to_configuration_object')
config = module.schema_to_sample_configuration(schema)