Add end-to-end tests for new credential hooks, along with some related configuration options.

This commit is contained in:
Dan Helfman 2025-02-14 15:33:30 -08:00
parent b283e379d0
commit 2ca23b629c
12 changed files with 342 additions and 10 deletions

View File

@ -2402,3 +2402,25 @@ properties:
description: |
Configuration for integration with Linux LVM (Logical Volume
Manager).
container:
type: object
additionalProperties: false
properties:
secrets_directory:
type: string
description: |
Secrets directory to use instead of "/run/secrets".
example: /path/to/secrets
description: |
Configuration for integration with Docker or Podman secrets.
keepassxc:
type: object
additionalProperties: false
properties:
keepassxc_cli_command:
type: string
description: |
Command to use instead of "keepassxc-cli".
example: /usr/local/bin/keepassxc-cli
description: |
Configuration for integration with the KeePassXC password manager.

View File

@ -6,7 +6,7 @@ logger = logging.getLogger(__name__)
SECRET_NAME_PATTERN = re.compile(r'^\w+$')
SECRETS_DIRECTORY = '/run/secrets'
DEFAULT_SECRETS_DIRECTORY = '/run/secrets'
def load_credential(hook_config, config, credential_parameters):
@ -27,7 +27,11 @@ def load_credential(hook_config, config, credential_parameters):
raise ValueError(f'Cannot load invalid secret name: "{secret_name}"')
try:
with open(os.path.join(SECRETS_DIRECTORY, secret_name)) as secret_file:
with open(
os.path.join(
(hook_config or {}).get('secrets_directory', DEFAULT_SECRETS_DIRECTORY), secret_name
)
) as secret_file:
return secret_file.read().rstrip(os.linesep)
except (FileNotFoundError, OSError) as error:
logger.warning(error)

View File

@ -1,5 +1,6 @@
import logging
import os
import shlex
import borgmatic.execute
@ -27,8 +28,8 @@ def load_credential(hook_config, config, credential_parameters):
)
return borgmatic.execute.execute_command_and_capture_output(
(
'keepassxc-cli',
tuple(shlex.split((hook_config or {}).get('keepassxc_cli_command', 'keepassxc-cli')))
+ (
'show',
'--show-protected',
'--attributes',

View File

@ -176,7 +176,7 @@ def dump_data_sources(
if 'password' in database
else None
)
dump_database_names = database_names_to_dump(database, extra_environment, dry_run)
dump_database_names = database_names_to_dump(database, config, extra_environment, dry_run)
if not dump_database_names:
if dry_run:

View File

@ -191,6 +191,16 @@ For specifics about which options are supported, see the
[configuration
reference](https://torsion.org/borgmatic/docs/reference/configuration/).
You can also optionally override the `/run/secrets` directory that borgmatic reads secrets from
inside a container:
```yaml
container:
secrets_directory: /path/to/secrets
```
But you should only need to do this for development or testing purposes.
### KeePassXC passwords
@ -236,6 +246,14 @@ For specifics about which options are supported, see the
[configuration
reference](https://torsion.org/borgmatic/docs/reference/configuration/).
You can also optionally override the `keepassxc-cli` command that borgmatic calls to load
passwords:
```yaml
keepassxc:
keepassxc_cli_command: /usr/local/bin/keepassxc-cli
```
### File-based credentials

View File

@ -0,0 +1,29 @@
import argparse
import sys
def parse_arguments(*unparsed_arguments):
parser = argparse.ArgumentParser(add_help=False)
parser.add_argument('command')
parser.add_argument('--show-protected', action='store_true')
parser.add_argument('--attributes')
parser.add_argument('database_path')
parser.add_argument('attribute_name')
return parser.parse_args(unparsed_arguments)
def main():
arguments = parse_arguments(*sys.argv[1:])
assert arguments.command == 'show'
assert arguments.show_protected
assert arguments.attributes == 'Password'
assert arguments.database_path.endswith('.kdbx')
assert arguments.attribute_name
print('test')
if __name__ == '__main__':
main()

View File

@ -0,0 +1,68 @@
import json
import os
import shutil
import subprocess
import sys
import tempfile
def generate_configuration(config_path, repository_path, secrets_directory):
'''
Generate borgmatic configuration into a file at the config path, and update the defaults so as
to work for testing, including updating the source directories, injecting the given repository
path, and tacking on an encryption passphrase loaded from container secrets in the given secrets
directory.
'''
subprocess.check_call(f'borgmatic config generate --destination {config_path}'.split(' '))
config = (
open(config_path)
.read()
.replace('ssh://user@backupserver/./sourcehostname.borg', repository_path)
.replace('- path: /mnt/backup', '')
.replace('label: local', '')
.replace('- /home/user/path with spaces', '')
.replace('- /home', f'- {config_path}')
.replace('- /etc', '')
.replace('- /var/log/syslog*', '')
+ '\nencryption_passphrase: "{credential container mysecret}"'
+ f'\ncontainer:\n secrets_directory: {secrets_directory}'
)
config_file = open(config_path, 'w')
config_file.write(config)
config_file.close()
def test_container_secret():
# Create a Borg repository.
temporary_directory = tempfile.mkdtemp()
repository_path = os.path.join(temporary_directory, 'test.borg')
original_working_directory = os.getcwd()
os.chdir(temporary_directory)
try:
config_path = os.path.join(temporary_directory, 'test.yaml')
generate_configuration(config_path, repository_path, secrets_directory=temporary_directory)
secret_path = os.path.join(temporary_directory, 'mysecret')
with open(secret_path, 'w') as secret_file:
secret_file.write('test')
subprocess.check_call(
f'borgmatic -v 2 --config {config_path} repo-create --encryption repokey'.split(' '),
)
# Run borgmatic to generate a backup archive, and then list it to make sure it exists.
subprocess.check_call(
f'borgmatic --config {config_path}'.split(' '),
)
output = subprocess.check_output(
f'borgmatic --config {config_path} list --json'.split(' '),
).decode(sys.stdout.encoding)
parsed_output = json.loads(output)
assert len(parsed_output) == 1
assert len(parsed_output[0]['archives']) == 1
finally:
os.chdir(original_working_directory)
shutil.rmtree(temporary_directory)

View File

@ -0,0 +1,68 @@
import json
import os
import shutil
import subprocess
import sys
import tempfile
def generate_configuration(config_path, repository_path, credential_path):
'''
Generate borgmatic configuration into a file at the config path, and update the defaults so as
to work for testing, including updating the source directories, injecting the given repository
path, and tacking on an encryption passphrase loaded from file at the given credential path.
'''
subprocess.check_call(f'borgmatic config generate --destination {config_path}'.split(' '))
config = (
open(config_path)
.read()
.replace('ssh://user@backupserver/./sourcehostname.borg', repository_path)
.replace('- path: /mnt/backup', '')
.replace('label: local', '')
.replace('- /home/user/path with spaces', '')
.replace('- /home', f'- {config_path}')
.replace('- /etc', '')
.replace('- /var/log/syslog*', '')
+ '\nencryption_passphrase: "{credential file '
+ credential_path
+ '}"'
)
config_file = open(config_path, 'w')
config_file.write(config)
config_file.close()
def test_file_credential():
# Create a Borg repository.
temporary_directory = tempfile.mkdtemp()
repository_path = os.path.join(temporary_directory, 'test.borg')
original_working_directory = os.getcwd()
os.chdir(temporary_directory)
try:
config_path = os.path.join(temporary_directory, 'test.yaml')
credential_path = os.path.join(temporary_directory, 'mycredential')
generate_configuration(config_path, repository_path, credential_path)
with open(credential_path, 'w') as credential_file:
credential_file.write('test')
subprocess.check_call(
f'borgmatic -v 2 --config {config_path} repo-create --encryption repokey'.split(' '),
)
# Run borgmatic to generate a backup archive, and then list it to make sure it exists.
subprocess.check_call(
f'borgmatic --config {config_path}'.split(' '),
)
output = subprocess.check_output(
f'borgmatic --config {config_path} list --json'.split(' '),
).decode(sys.stdout.encoding)
parsed_output = json.loads(output)
assert len(parsed_output) == 1
assert len(parsed_output[0]['archives']) == 1
finally:
os.chdir(original_working_directory)
shutil.rmtree(temporary_directory)

View File

@ -0,0 +1,67 @@
import json
import os
import shutil
import subprocess
import sys
import tempfile
def generate_configuration(config_path, repository_path):
'''
Generate borgmatic configuration into a file at the config path, and update the defaults so as
to work for testing, including updating the source directories, injecting the given repository
path, and tacking on an encryption passphrase loaded from keepassxc-cli.
'''
subprocess.check_call(f'borgmatic config generate --destination {config_path}'.split(' '))
config = (
open(config_path)
.read()
.replace('ssh://user@backupserver/./sourcehostname.borg', repository_path)
.replace('- path: /mnt/backup', '')
.replace('label: local', '')
.replace('- /home/user/path with spaces', '')
.replace('- /home', f'- {config_path}')
.replace('- /etc', '')
.replace('- /var/log/syslog*', '')
+ '\nencryption_passphrase: "{credential keepassxc keys.kdbx mypassword}"'
+ '\nkeepassxc:\n keepassxc_cli_command: python3 /app/tests/end-to-end/commands/fake_keepassxc_cli.py'
)
config_file = open(config_path, 'w')
config_file.write(config)
config_file.close()
def test_keepassxc_password():
# Create a Borg repository.
temporary_directory = tempfile.mkdtemp()
repository_path = os.path.join(temporary_directory, 'test.borg')
original_working_directory = os.getcwd()
os.chdir(temporary_directory)
try:
config_path = os.path.join(temporary_directory, 'test.yaml')
generate_configuration(config_path, repository_path)
database_path = os.path.join(temporary_directory, 'keys.kdbx')
with open(database_path, 'w') as database_file:
database_file.write('fake KeePassXC database to pacify file existence check')
subprocess.check_call(
f'borgmatic -v 2 --config {config_path} repo-create --encryption repokey'.split(' '),
)
# Run borgmatic to generate a backup archive, and then list it to make sure it exists.
subprocess.check_call(
f'borgmatic --config {config_path}'.split(' '),
)
output = subprocess.check_output(
f'borgmatic --config {config_path} list --json'.split(' '),
).decode(sys.stdout.encoding)
parsed_output = json.loads(output)
assert len(parsed_output) == 1
assert len(parsed_output[0]['archives']) == 1
finally:
os.chdir(original_working_directory)
shutil.rmtree(temporary_directory)

View File

@ -30,15 +30,13 @@ def generate_configuration(config_path, repository_path):
config_file.close()
def test_borgmatic_command():
def test_systemd_credential():
# Create a Borg repository.
temporary_directory = tempfile.mkdtemp()
repository_path = os.path.join(temporary_directory, 'test.borg')
extract_path = os.path.join(temporary_directory, 'extract')
original_working_directory = os.getcwd()
os.mkdir(extract_path)
os.chdir(extract_path)
os.chdir(temporary_directory)
try:
config_path = os.path.join(temporary_directory, 'test.yaml')

View File

@ -34,6 +34,21 @@ def test_load_credential_reads_named_secret_from_file():
)
def test_load_credential_with_custom_secrets_directory_looks_there_for_secret_file():
config = {'container': {'secrets_directory': '/secrets'}}
credential_stream = io.StringIO('password')
credential_stream.name = '/secrets/mysecret'
builtins = flexmock(sys.modules['builtins'])
builtins.should_receive('open').with_args('/secrets/mysecret').and_return(credential_stream)
assert (
module.load_credential(
hook_config=config['container'], config=config, credential_parameters=('mysecret',)
)
== 'password'
)
def test_load_credential_with_file_not_found_error_raises():
builtins = flexmock(sys.modules['builtins'])
builtins.should_receive('open').with_args('/run/secrets/mysecret').and_raise(FileNotFoundError)

View File

@ -28,7 +28,19 @@ def test_load_credential_with_present_database_fetches_password_from_keepassxc()
flexmock(module.os.path).should_receive('exists').and_return(True)
flexmock(module.borgmatic.execute).should_receive(
'execute_command_and_capture_output'
).and_return('password').once()
).with_args(
(
'keepassxc-cli',
'show',
'--show-protected',
'--attributes',
'Password',
'database.kdbx',
'mypassword',
)
).and_return(
'password'
).once()
assert (
module.load_credential(
@ -36,3 +48,33 @@ def test_load_credential_with_present_database_fetches_password_from_keepassxc()
)
== 'password'
)
def test_load_credential_with_custom_keepassxc_cli_command_calls_it():
config = {'keepassxc': {'keepassxc_cli_command': '/usr/local/bin/keepassxc-cli --some-option'}}
flexmock(module.os.path).should_receive('exists').and_return(True)
flexmock(module.borgmatic.execute).should_receive(
'execute_command_and_capture_output'
).with_args(
(
'/usr/local/bin/keepassxc-cli',
'--some-option',
'show',
'--show-protected',
'--attributes',
'Password',
'database.kdbx',
'mypassword',
)
).and_return(
'password'
).once()
assert (
module.load_credential(
hook_config=config['keepassxc'],
config=config,
credential_parameters=('database.kdbx', 'mypassword'),
)
== 'password'
)