Initial work on command-line flags for all configuration (#303).

This commit is contained in:
Dan Helfman 2025-03-14 22:59:43 -07:00
commit 92279d3c71
6 changed files with 337 additions and 24 deletions

View file

@ -1,5 +1,7 @@
import collections
import itertools
import json
import re
import sys
from argparse import ArgumentParser
@ -282,12 +284,155 @@ def parse_arguments_for_actions(unparsed_arguments, action_parsers, global_parse
)
def make_parsers():
def add_arguments_from_schema(arguments_group, schema, unparsed_arguments, names=None):
'''
Build a global arguments parser, individual action parsers, and a combined parser containing
both. Return them as a tuple. The global parser is useful for parsing just global arguments
while ignoring actions, and the combined parser is handy for displaying help that includes
everything: global flags, a list of actions, etc.
Given an argparse._ArgumentGroup instance, a configuration schema dict, and a sequence of
unparsed argument strings, convert the entire schema into corresponding command-line flags and
add them to the arguments group.
For instance, given a schema of:
{
'type': 'object',
'properties': {
'foo': {
'type': 'object',
'properties': {
'bar': {'type': 'integer'}
}
}
}
}
... the following flag will be added to the arguments group:
--foo.bar
If "foo" is instead an array of objects, it will get added like this
--foo[0].bar
And if names are also passed in, they are considered to be the name components of an option
(e.g. "foo" and "bar") and are used to construct a resulting flag.
'''
if names is None:
names = ()
schema_type = schema.get('type')
# If this option has multiple types, just use the first one (that isn't "null").
if isinstance(schema_type, list):
try:
schema_type = next(single_type for single_type in schema_type if single_type != 'null')
except StopIteration:
raise ValueError(f'Unknown type in configuration schema: {schema_type}')
# If this is an "object" type, recurse for each child option ("property").
if schema_type in {'object', 'array'}:
properties = (
schema.get('items', {}).get('properties')
if schema_type == 'array'
else schema.get('properties')
)
if properties:
for name, child in properties.items():
add_arguments_from_schema(
arguments_group,
child,
unparsed_arguments,
names + ((name + '[0]',) if child.get('type') == 'array' else (name,)),
)
return
flag_name = '.'.join(names)
description = schema.get('description')
metavar = names[-1].upper()
if schema_type == 'array':
metavar = metavar.rstrip('S')
if description:
if schema_type == 'array':
items_schema = schema.get('items', {})
description += ' Can specify flag multiple times.'
if '[0]' in flag_name:
description += ' To specify a different list element, replace the "[0]" with another array index ("[1]", "[2]", etc.).'
description = description.replace('%', '%%')
try:
argument_type = {'string': str, 'integer': int, 'boolean': bool, 'array': str}[schema_type]
except KeyError:
raise ValueError(f'Unknown type in configuration schema: {schema_type}')
arguments_group.add_argument(
f"--{flag_name.replace('_', '-')}",
type=argument_type,
metavar=metavar,
action='append' if schema_type == 'array' else None,
help=description,
)
# We want to support flags that can have arbitrary indices like:
#
# --foo.bar[1].baz
#
# But argparse doesn't support that natively because the index can be an arbitrary number. We
# won't let that stop us though, will we? So, if the current flag name has an array component in
# it (e.g. a name with "[0]"), then make a pattern that would match the flag name regardless of
# the number that's in it. The idea is that we want to look for unparsed arguments that appear
# like the flag name, but instead of "[0]" they have, say, "[1]" or "[123]".
#
# Next, we check each unparsed argument against that pattern. If one of them matches, add an
# argument flag for it to the argument parser group. Example:
#
# Let's say flag_name is:
#
# --foo.bar[0].baz
#
# ... then the regular expression pattern will be:
#
# ^--foo\.bar\[\d+\]\.baz
#
# ... and, if that matches an unparsed argument of:
#
# --foo.bar[1].baz
#
# ... then an argument flag will get added equal to that unparsed argument. And the unparsed
# argument will match it when parsing is performed! In this manner, we're using the actual user
# CLI input to inform what exact flags we support!
if '[0]' not in flag_name or '--help' in unparsed_arguments:
return
pattern = re.compile(f'^--{flag_name.replace("[0]", r"\[\d+\]").replace(".", r"\.")}$')
existing_flags = set(
itertools.chain(
*(group_action.option_strings for group_action in arguments_group._group_actions)
)
)
for unparsed in unparsed_arguments:
unparsed_flag_name = unparsed.split('=', 1)[0]
if pattern.match(unparsed_flag_name) and unparsed_flag_name not in existing_flags:
arguments_group.add_argument(
unparsed_flag_name,
type=argument_type,
metavar=metavar,
help=description,
)
def make_parsers(schema, unparsed_arguments):
'''
Given a configuration schema dict, build a global arguments parser, individual action parsers,
and a combined parser containing both. Return them as a tuple. The global parser is useful for
parsing just global arguments while ignoring actions, and the combined parser is handy for
displaying help that includes everything: global flags, a list of actions, etc.
'''
config_paths = collect.get_default_config_paths(expand_home=True)
unexpanded_config_paths = collect.get_default_config_paths(expand_home=False)
@ -388,6 +533,7 @@ def make_parsers():
action='store_true',
help='Display installed version number of borgmatic and exit',
)
add_arguments_from_schema(global_group, schema, unparsed_arguments)
global_plus_action_parser = ArgumentParser(
description='''
@ -1523,15 +1669,18 @@ def make_parsers():
return global_parser, action_parsers, global_plus_action_parser
def parse_arguments(*unparsed_arguments):
def parse_arguments(schema, *unparsed_arguments):
'''
Given command-line arguments with which this script was invoked, parse the arguments and return
them as a dict mapping from action name (or "global") to an argparse.Namespace instance.
Given a configuration schema dict and the command-line arguments with which this script was
invoked, parse the arguments and return them as a dict mapping from action name (or "global") to
an argparse.Namespace instance.
Raise ValueError if the arguments cannot be parsed.
Raise SystemExit with an error code of 0 if "--help" was requested.
'''
global_parser, action_parsers, global_plus_action_parser = make_parsers()
global_parser, action_parsers, global_plus_action_parser = make_parsers(
schema, unparsed_arguments
)
arguments, remaining_action_arguments = parse_arguments_for_actions(
unparsed_arguments, action_parsers.choices, global_parser
)

View file

@ -8,6 +8,8 @@ import time
from queue import Queue
from subprocess import CalledProcessError
import ruamel.yaml
import borgmatic.actions.borg
import borgmatic.actions.break_lock
import borgmatic.actions.change_passphrase
@ -33,6 +35,7 @@ import borgmatic.actions.restore
import borgmatic.actions.transfer
import borgmatic.commands.completion.bash
import borgmatic.commands.completion.fish
import borgmatic.config.load
from borgmatic.borg import umount as borg_umount
from borgmatic.borg import version as borg_version
from borgmatic.commands.arguments import parse_arguments
@ -570,14 +573,14 @@ def run_actions(
)
def load_configurations(config_filenames, overrides=None, resolve_env=True):
def load_configurations(config_filenames, global_arguments, overrides=None, resolve_env=True):
'''
Given a sequence of configuration filenames, a sequence of configuration file override strings
in the form of "option.suboption=value", and whether to resolve environment variables, load and
validate each configuration file. Return the results as a tuple of: dict of configuration
filename to corresponding parsed configuration, a sequence of paths for all loaded configuration
files (including includes), and a sequence of logging.LogRecord instances containing any parse
errors.
Given a sequence of configuration filenames, global arguments as an argparse.Namespace, a
sequence of configuration file override strings in the form of "option.suboption=value", and
whether to resolve environment variables, load and validate each configuration file. Return the
results as a tuple of: dict of configuration filename to corresponding parsed configuration, a
sequence of paths for all loaded configuration files (including includes), and a sequence of
logging.LogRecord instances containing any parse errors.
Log records are returned here instead of being logged directly because logging isn't yet
initialized at this point! (Although with the Delayed_logging_handler now in place, maybe this
@ -605,6 +608,7 @@ def load_configurations(config_filenames, overrides=None, resolve_env=True):
configs[config_filename], paths, parse_logs = validate.parse_configuration(
config_filename,
validate.schema_filename(),
global_arguments,
overrides,
resolve_env,
)
@ -928,9 +932,17 @@ def exit_with_help_link(): # pragma: no cover
def main(extra_summary_logs=[]): # pragma: no cover
configure_signals()
configure_delayed_logging()
schema_filename = validate.schema_filename()
try:
arguments = parse_arguments(*sys.argv[1:])
schema = borgmatic.config.load.load_configuration(schema_filename)
except (ruamel.yaml.error.YAMLError, RecursionError) as error:
configure_logging(logging.CRITICAL)
logger.critical(error)
exit_with_help_link()
try:
arguments = parse_arguments(schema, *sys.argv[1:])
except ValueError as error:
configure_logging(logging.CRITICAL)
logger.critical(error)
@ -953,10 +965,10 @@ def main(extra_summary_logs=[]): # pragma: no cover
print(borgmatic.commands.completion.fish.fish_completion())
sys.exit(0)
validate = bool('validate' in arguments)
config_filenames = tuple(collect.collect_config_filenames(global_arguments.config_paths))
configs, config_paths, parse_logs = load_configurations(
config_filenames,
global_arguments,
global_arguments.overrides,
resolve_env=global_arguments.resolve_env and not validate,
)

View file

@ -0,0 +1,137 @@
import io
import re
import ruamel.yaml
LIST_INDEX_KEY_PATTERN = re.compile(r'^(?P<list_name>[a-zA-z-]+)\[(?P<index>\d+)\]$')
def set_values(config, keys, value):
'''
Given a configuration dict, a sequence of parsed key strings, and a string value, descend into
the configuration hierarchy based on the keys to set the value into the right place.
'''
if not keys:
return
first_key = keys[0]
# Support "name[0]"-style list index syntax.
match = LIST_INDEX_KEY_PATTERN.match(first_key)
if match:
list_key = match.group('list_name')
list_index = int(match.group('index'))
if len(keys) == 1:
config[list_key][list_index] = value
return
if list_key not in config:
config[list_key] = []
try:
set_values(config[list_key][list_index], keys[1:], value)
except IndexError:
raise ValueError(f'The list index {first_key} is out of range')
return
if len(keys) == 1:
config[first_key] = value
return
if first_key not in config:
config[first_key] = {}
set_values(config[first_key], keys[1:], value)
def type_for_option(schema, option_keys):
'''
Given a configuration schema dict and a sequence of keys identifying a potentially nested
option, e.g. ('extra_borg_options', 'create'), return the schema type of that option as a
string.
Return None if the option or its type cannot be found in the schema.
'''
option_schema = schema
for key in option_keys:
# Support "name[0]"-style list index syntax.
match = LIST_INDEX_KEY_PATTERN.match(key)
try:
if match:
option_schema = option_schema['properties'][match.group('list_name')]['items']
else:
option_schema = option_schema['properties'][key]
except KeyError:
return None
try:
return option_schema['type']
except KeyError:
return None
def prepare_arguments_for_config(global_arguments, schema):
'''
Given global arguments as an argparse.Namespace and a configuration schema dict, parse each
argument that corresponds to an option in the schema and return a sequence of tuples (keys,
values) for that option, where keys is a sequence of strings. For instance, given the following
arguments:
argparse.Namespace(**{'my_option.sub_option': 'value1', 'other_option': 'value2'})
... return this:
(
(('my_option', 'sub_option'), 'value1'),
(('other_option'), 'value2'),
)
Raise ValueError if an override can't be parsed.
'''
prepared_values = []
for argument_name, value in global_arguments.__dict__.items():
try:
if value is None:
continue
keys = tuple(argument_name.split('.'))
option_type = type_for_option(schema, keys)
# The argument doesn't correspond to any option in the schema, so ignore it. It's
# probably a flag that borgmatic has on the command-line but not in configuration.
if option_type is None:
continue
prepared_values.append(
(
keys,
value,
)
)
except ruamel.yaml.error.YAMLError as error:
raise ValueError(f"Invalid override '{raw_override}': {error.problem}")
return tuple(prepared_values)
def apply_arguments_to_config(config, schema, global_arguments):
'''
Given a configuration dict, a corresponding configuration schema dict, and global arguments as
an argparse.Namespace, set those given argument values into their corresponding configuration
options in the configuration dict.
This supports argument flags of the from "--foo.bar.baz" where each dotted component is a nested
configuration object. Additionally, flags like "--foo.bar[0].baz" are supported to update a list
element in the configuration.
'''
for keys, value in prepare_arguments_for_config(global_arguments, schema):
set_values(config, keys, value)

View file

@ -1,8 +1,12 @@
import io
import logging
import ruamel.yaml
logger = logging.getLogger(__name__)
def set_values(config, keys, value):
'''
Given a hierarchy of configuration dicts, a sequence of parsed key strings, and a string value,
@ -134,6 +138,9 @@ def apply_overrides(config, schema, raw_overrides):
'''
overrides = parse_overrides(raw_overrides, schema)
if overrides:
logger.warning("The --override flag is deprecated and will be removed from a future release. Instead, use a command-line flag corresponding to the configuration option you'd like to set.")
for keys, value in overrides:
set_values(config, keys, value)
set_values(config, strip_section_names(keys), value)

View file

@ -36,9 +36,14 @@ properties:
properties:
path:
type: string
description: The local path or Borg URL of the repository.
example: ssh://user@backupserver/./{fqdn}
label:
type: string
description: |
An optional label for the repository, used in logging
and to make selecting the repository easier on the
command-line.
example: backupserver
description: |
A required list of local or remote repositories with paths and

View file

@ -4,7 +4,7 @@ import os
import jsonschema
import ruamel.yaml
import borgmatic.config
import borgmatic.config.arguments
from borgmatic.config import constants, environment, load, normalize, override
@ -84,13 +84,15 @@ def apply_logical_validation(config_filename, parsed_configuration):
)
def parse_configuration(config_filename, schema_filename, overrides=None, resolve_env=True):
def parse_configuration(config_filename, schema_filename, global_arguments, overrides=None, resolve_env=True):
'''
Given the path to a config filename in YAML format, the path to a schema filename in a YAML
rendition of JSON Schema format, a sequence of configuration file override strings in the form
of "option.suboption=value", and whether to resolve environment variables, return the parsed
configuration as a data structure of nested dicts and lists corresponding to the schema. Example
return value:
rendition of JSON Schema format, global arguments as an argparse.Namespace, a sequence of
configuration file override strings in the form of "option.suboption=value", and whether to
resolve environment variables, return the parsed configuration as a data structure of nested
dicts and lists corresponding to the schema. Example return value.
Example return value:
{
'source_directories': ['/home', '/etc'],
@ -113,6 +115,7 @@ def parse_configuration(config_filename, schema_filename, overrides=None, resolv
except (ruamel.yaml.error.YAMLError, RecursionError) as error:
raise Validation_error(config_filename, (str(error),))
borgmatic.config.arguments.apply_arguments_to_config(config, schema, global_arguments)
override.apply_overrides(config, schema, overrides)
constants.apply_constants(config, config.get('constants') if config else {})