WIP : InfluxDB support #450 #1058

Closed
gautamaggarwal2810 wants to merge 1 commits from gautamaggarwal2810/borgmatic:issue999 into main
3 changed files with 388 additions and 0 deletions

View File

@@ -1181,6 +1181,81 @@ properties:
description: |
Support for the "borgmatic bootstrap" action, used to extract
borgmatic configuration files from a backup archive.
influxdb_databases:
gautamaggarwal2810 marked this conversation as resolved
Review

I would recommend moving this schema section to the bottom of the existing schema for databases. My thinking is that since this is the last database added, it's less of a major database (for borgmatic users) than the existing ones like PostgreSQL.

I would recommend moving this schema section to the bottom of the existing schema for databases. My thinking is that since this is the last database added, it's less of a major database (for borgmatic users) than the existing ones like PostgreSQL.
type: array
items:
type: object
required:
- token
additionalProperties: false
properties:
influx_command:
type: string
description: |
The InfluxDB executable to use. Defaults to `influx`.
gautamaggarwal2810 marked this conversation as resolved
Review

I think the convention in schema descriptions for this kind of thing is double quotes (e.g. "influx").

I think the convention in schema descriptions for this kind of thing is double quotes (e.g. `"influx"`).
token:
type: string
description: |
Token to authenticate request. Required to use InfluxDB.
gautamaggarwal2810 marked this conversation as resolved
Review

Each of these need examples (e.g. example: ...) so that they show up properly in generated configuration. You can try borgmatic config generate -d test.yaml to verify.

Each of these need examples (e.g. `example: ...`) so that they show up properly in generated configuration. You can try `borgmatic config generate -d test.yaml` to verify.
hostname:
type: string
description: |
HTTP address of InfluxDB. Defaults to http://127.0.0.1:8086.
example: https://example.com:8086
Review

Other database hooks tend to use hostname to mean a bare hostname (e.g. example.com) and then have a separate port option for any port. And in fact I think you'll need something like that so that borgmatic restore works with this.

The main downside I can see of this is that there wouldn't be a way to specify http vs. https. Maybe a separate tls boolean option would make sense here? But then that might need to be plumbed through to restore as well...

I dunno, let me know your thoughts on this. I'm not super familiar with InfluxDB, so maybe giving connection strings as URLs is standard.

Other database hooks tend to use `hostname` to mean a bare hostname (e.g. `example.com`) and then have a separate `port` option for any port. And in fact I think you'll need something like that so that `borgmatic restore` works with this. The main downside I can see of this is that there wouldn't be a way to specify `http` vs. `https`. Maybe a separate `tls` boolean option would make sense here? But then that might need to be plumbed through to `restore` as well... I dunno, let me know your thoughts on this. I'm not super familiar with InfluxDB, so maybe giving connection strings as URLs is standard.

Yes giving connection strings as URL is standard in Influxdb. But it would diverge from conventions used for other database hooks and can cause confusion for users familiar with other hooks.

If consistency with other hooks is a priority, we can take up hostname,port and tls approach.

But if Influxdb users are more accustomed to connection strings, we can retain URL approach but document clearly to avoid confusion.

So how should I proceed on this??

Yes giving connection strings as URL is standard in Influxdb. But it would diverge from conventions used for other database hooks and can cause confusion for users familiar with other hooks. If consistency with other hooks is a priority, we can take up hostname,port and tls approach. But if Influxdb users are more accustomed to connection strings, we can retain URL approach but document clearly to avoid confusion. So how should I proceed on this??
Review

My thinking is that consistency with other hooks should be the priority here, not just because of consistency for consistency's sake, but for the very practical reason that a bunch of the existing database code assumes there's a hostname, port, etc. and it would actually be more work to not have those.

My thinking is that consistency with other hooks should be the priority here, not just because of consistency for consistency's sake, but for the very practical reason that a bunch of the existing database code assumes there's a hostname, port, etc. and it would actually be more work to not have those.
org_id:
Review

borgmatic option names tend towards the super verbose and spelled out. So calling this organization_id might be more in keeping with that convention. (Usually id itself isn't expanded because it's so ubiquitous.)

borgmatic option names tend towards the super verbose and spelled out. So calling this `organization_id` might be more in keeping with that convention. (Usually `id` itself isn't expanded because it's so ubiquitous.)
type: number
description: |
The ID of the organization.
org_name:
Review

Similar here.

Similar here.
type: string
description: |
The name of the organization.
Review

Should probably mention that it's mutually exclutive with org_id.

Should probably mention that it's mutually exclutive with `org_id`.
bucket_name:
type: string
description: |
The name of the bucket to backup.
bucket_id:
type: number
description: |
The ID of the bucket to backup.
Review

IMO this should mention that this option is mutually exclusive with bucket_name.

IMO this should mention that this option is mutually exclusive with `bucket_name`.
configs_path:
Review

Maybe configs_path -> configurations_path?

Maybe `configs_path` -> `configurations_path`?
type: string
description: |
Path to the influx CLI configurations.
active_config:
Review

Similar here.. Maybe active_config -> active_configuration.

Similar here.. Maybe `active_config` -> `active_configuration`.
type: string
description: |
Config name to use for command.
restore_bucket:
type: string
description: |
New name to use for the restored bucket.
Review

It might be good to add similar restore_* option name variants for other options as well. For instance, many database hooks have options like restore_hostname, restore_port, etc.

It might be good to add similar `restore_*` option name variants for other options as well. For instance, many database hooks have options like `restore_hostname`, `restore_port`, etc.
new_org:
Review

And I recommend calling this restore_org or better yet restore_organization.

And I recommend calling this `restore_org` or better yet `restore_organization`.
type: string
description: |
New name to use for the restored organization.
skip_verify:
type: boolean
description: |
Skip TLS certificate chain and host name verification.
Defaults to false.
http_debug:
type: boolean
description: |
Enable debug logging for HTTP requests.
full:
type: boolean
description: |
Fully restore and replace all data on server.
Review

How does this work exactly? Does it ignore the restore_bucket and restore all buckets? Or something else?

How does this work exactly? Does it ignore the `restore_bucket` and restore all buckets? Or something else?
Defaults to false.
compression:
type: string
description: |
Compression to use for local backup files, either 'none' or 'gzip'.
Defaults to gzip.
enum:
- none
- gzip
postgresql_databases:
type: array
items:

View File

@@ -0,0 +1,225 @@
import logging
import os
import shlex
import borgmatic.borg.pattern
import borgmatic.config.paths
from borgmatic.execute import execute_command, execute_command_with_processes
from borgmatic.hooks.data_source import dump
logger = logging.getLogger(__name__)
def make_dump_path(base_directory): # pragma: no cover
'''
Given a base directory, make the corresponding dump path.
'''
return dump.make_data_source_dump_path(base_directory, 'influxdb_databases')
def get_default_port(databases, config): # pragma: no cover
return 8086
def use_streaming(databases, config):
'''
Return whether dump streaming is used for this hook. (Spoiler: It isn't.)
Review

This comment should be updated so it's consistent with what the function does!

This comment should be updated so it's consistent with what the function does!
'''
return True
def dump_data_sources(
databases,
config,
config_paths,
borgmatic_runtime_directory,
patterns,
dry_run,
):
'''
Dump the given InfluxDB databases to a named pipe. The databases are supplied as a sequence of
dicts, one dict describing each database as per the configuration schema. Use the borgmatic
runtime directory to construct the destination path (used for the directory format and the given
log prefix in any log entries.
Review

Missing close parenthesis.

Missing close parenthesis.
Return a sequence of subprocess.Popen instances for the dump processes ready to spew to a named
pipe. But if this is a dry run, then don't actually dump anything and return an empty sequence.
Also append the the parent directory of the database dumps to the given patterns list, so the
dumps actually get backed up.
'''
dry_run_label = ' (dry run; not actually dumping anything)' if dry_run else ''
logger.info(f'Dumping InfluxDB databases{dry_run_label}')
processes = []
for database in databases:
name = database['name']
dump_filename = dump.make_data_source_dump_filename(
make_dump_path(borgmatic_runtime_directory),
name,
database.get('hostname'),
database.get('port'),
Review

It doesn't look like port is actually in the schema—although see above for idea about how it could be!

It doesn't look like `port` is actually in the schema—although see above for idea about how it could be!
)
logger.debug(
f'Dumping InfluxDB database {name} to {dump_filename}{dry_run_label}',
)
command = build_dump_command(database, dump_filename)
if dry_run:
continue
dump.create_named_pipe_for_dump(dump_filename)
try:
execute_command(command, shell=True, run_to_completion=False)
Review

I think you can probably safely omit shell=True, because I don't see anything in the command building below that relies on shell features (redirection, etc.)

I think you can probably safely omit `shell=True`, because I don't see anything in the command building below that relies on shell features (redirection, etc.)
except subprocess.CalledProcessError as error:
logger.error(f"Command failed: {error}")
Review

Feel free to disagree, but I don't feel like this kind of catch-and-log-and-raise adds a ton of value, especially since it's just displaying the error—which presumably borgmatic already does higher up in the call stack...?

Feel free to disagree, but I don't feel like this kind of catch-and-log-and-raise adds a ton of value, especially since it's just displaying the error—which presumably borgmatic already does higher up in the call stack...?
raise
if not dry_run:
patterns.append(
borgmatic.borg.pattern.Pattern(
os.path.join(borgmatic_runtime_directory, 'influxdb_databases')
)
Review

I think this needs:

source=borgmatic.borg.pattern.Pattern_source.HOOK,

... added in, so that downstream hooks like filesystem snapshotting work properly.

I think this needs: ``` source=borgmatic.borg.pattern.Pattern_source.HOOK, ``` ... added in, so that downstream hooks like filesystem snapshotting work properly.
)
return processes
def build_dump_command(database, dump_filename):
'''
Return the backup command.
'''
hostname = database.get('dump_hostname', database.get('hostname')) # Prioritize 'dump_hostname'
Review

It doesn't look like dump_hostname is in the schema...? Was it intended to be?

It doesn't look like `dump_hostname` is in the schema...? Was it intended to be?
token = database.get('token')
skip_verify = database.get('skip_verify')
http_debug = database.get('http_debug')
influx_command = tuple(
shlex.quote(part) for part in shlex.split(database.get('influx_command') or 'influx')
)
return (
influx_command
+ ('backup',)
+ (('--skip-verify',) if skip_verify else ())
+ (('--http-debug',) if http_debug else ())
+ (('--host', shlex.quote(str(hostname))) if hostname else ())
+ (('--configs-path', shlex.quote(str(database['configs_path']))) if 'configs_path' in database else ())
+ (('--active-config', shlex.quote(str(database['active_config']))) if 'active_config' in database else ())
+ (('--token', shlex.quote(str(token))) if token else ())
+ (('--org-id', shlex.quote(str(database['org_id']))) if 'org_id' in database else ())
+ (('--org', shlex.quote(str(database['org_name']))) if 'org_name' in database else ())
+ (('--bucket-id', shlex.quote(str(database['bucket_id']))) if 'bucket_id' in database else ())
+ (('--bucket', shlex.quote(str(database['bucket_name']))) if 'bucket_name' in database else ())
)
def remove_data_source_dumps(
databases, config, borgmatic_runtime_directory, dry_run
): # pragma: no cover
'''
Remove all database dump files for this hook regardless of the given databases. Use the
borgmatic_runtime_directory to construct the destination path and the log prefix in any log
entries. If this is a dry run, then don't actually remove anything.
'''
dump.remove_data_source_dumps(
make_dump_path(borgmatic_runtime_directory), 'InfluxDB', dry_run
)
def make_data_source_dump_patterns(
databases, config, borgmatic_runtime_directory, name=None
): # pragma: no cover
'''
Given a sequence of configurations dicts, a configuration dict, a prefix to log with, the
borgmatic runtime directory, and a database name to match, return the corresponding glob
patterns to match the database dump in an archive.
'''
return (
dump.make_data_source_dump_filename(make_dump_path('borgmatic'), name, hostname='*'),
dump.make_data_source_dump_filename(
make_dump_path(borgmatic_runtime_directory), name, hostname='*'
),
)
def restore_data_source_dump(
hook_config,
config,
data_source,
dry_run,
extract_process,
connection_params,
borgmatic_runtime_directory,
):
'''
Restore a database from the given extract stream. The database is supplied as a data source
configuration dict, but the given hook configuration is ignored. The given configuration dict is
used to construct the destination path, and the given log prefix is used for any log entries. If
this is a dry run, then don't actually restore anything. Trigger the given active extract
process (an instance of subprocess.Popen) to produce output to consume.
If the extract process is None, then restore the dump from the filesystem rather than from an
extract stream.
'''
dry_run_label = ' (dry run; not actually restoring anything)' if dry_run else ''
dump_filename = dump.make_data_source_dump_filename(
make_dump_path(borgmatic_runtime_directory),
data_source['name'],
Review

I didn't actually see a name option in the schema. There probably needs to be one there for the rest of the database machinery to work as well...?

I didn't actually see a `name` option in the schema. There probably needs to be one there for the rest of the database machinery to work as well...?
data_source.get('hostname'),
Review

Probably needs port as well if it's there in the make_data_source_dump_filename() call in dump_data_sources(). Otherwise the filenames won't match.

Probably needs `port` as well if it's there in the `make_data_source_dump_filename()` call in `dump_data_sources()`. Otherwise the filenames won't match.
)
restore_command = build_restore_command(
extract_process, data_source, dump_filename, connection_params
)
logger.debug(f"Restoring InfluxDB database {data_source['name']}{dry_run_label}")
if dry_run:
return
# Don't give Borg local path so as to error on warnings, as "borg extract" only gives a warning
# if the restore paths don't exist in the archive.
execute_command_with_processes(
restore_command,
[extract_process] if extract_process else [],
output_log_level=logging.DEBUG,
input_file=extract_process.stdout if extract_process else None,
)
def build_restore_command(extract_process, database, dump_filename, connection_params):
'''
Return the restore command.
'''
hostname = connection_params['hostname'] or database.get('hostname')
token = connection_params['token'] or database.get('token')
org_id = database.get('org_id')
org_name = database.get('org_name')
bucket_name = database.get('bucket_name')
restore_bucket = database.get('restore_bucket')
new_org = database.get('new_org')
configs_path = database.get('configs_path')
active_config = database.get('active_config')
skip_verify = database.get('skip_verify')
http_debug = database.get('http_debug')
full = database.get('full')
influx_command = tuple(
shlex.quote(part) for part in shlex.split(database.get('influx_command') or 'influx')
)
return (
influx_command
+ ('restore',)
+ (('--host', hostname) if hostname else ())
+ (('--token', token) if token else ())
+ (('--org-id', str(org_id)) if org_id else ())
+ (('--org', org_name) if org_name else ())
+ (('--bucket-id', str(database['bucket_id'])) if 'bucket_id' in database else ())
+ (('--bucket', bucket_name) if bucket_name else ())
+ (('--new-bucket', restore_bucket) if restore_bucket else ())
+ (('--new-org', new_org) if new_org else ())
+ (('--configs-path', configs_path) if configs_path else ())
+ (('--active-config', active_config) if active_config else ())
+ (('--skip-verify',) if skip_verify else ())
+ (('--http-debug',) if http_debug else ())
+ (('--full',) if full else ())
+ (dump_filename,)
)

View File

@@ -0,0 +1,88 @@
import pytest
from flexmock import flexmock
from borgmatic.hooks.data_source import influxdb as module
def test_make_dump_path_creates_correct_path():
assert module.make_dump_path('/tmp') == '/tmp/influxdb_databases'
def test_build_dump_command_creates_correct_command():
database = {
'name': 'testdb',
'hostname': 'localhost',
'token': 'testtoken',
'skip_verify': True,
'http_debug': False,
'influx_command': 'custom_influx',
}
dump_filename = '/tmp/dumpfile'
command = module.build_dump_command(database, dump_filename)
assert command == (
'custom_influx',
'backup',
'--skip-verify',
'--host',
'localhost',
'--token',
'testtoken',
)
def test_dump_data_sources_creates_named_pipe_and_executes_command():
databases = [{'name': 'testdb', 'hostname': 'localhost'}]
flexmock(module.dump).should_receive('make_data_source_dump_filename').and_return('/tmp/dumpfile')
flexmock(module.dump).should_receive('create_named_pipe_for_dump').once()
flexmock(module).should_receive('build_dump_command').and_return(('influx', 'backup'))
flexmock(module).should_receive('execute_command').with_args(
('influx', 'backup'), shell=True, run_to_completion=False
).once()
processes = module.dump_data_sources(
databases,
config={},
config_paths=[],
borgmatic_runtime_directory='/tmp',
patterns=[],
dry_run=False,
)
assert len(processes) == 0 # No subprocess.Popen instances are returned.
def test_restore_data_source_dump_executes_restore_command():
data_source = {'name': 'testdb', 'hostname': 'localhost'}
connection_params = {'hostname': 'restorehost', 'token': 'restoretoken'}
extract_process = flexmock(stdout=flexmock())
flexmock(module).should_receive('build_restore_command').and_return(('influx', 'restore'))
flexmock(module).should_receive('execute_command_with_processes').with_args(
('influx', 'restore'),
[extract_process],
output_log_level=module.logging.DEBUG,
input_file=extract_process.stdout,
).once()
module.restore_data_source_dump(
hook_config={},
config={},
data_source=data_source,
dry_run=False,
extract_process=extract_process,
connection_params=connection_params,
borgmatic_runtime_directory='/tmp',
)
def test_remove_data_source_dumps_removes_dumps():
flexmock(module.dump).should_receive('remove_data_source_dumps').with_args(
'/tmp/influxdb_databases', 'InfluxDB', False
).once()
module.remove_data_source_dumps(
databases=[],
config={},
borgmatic_runtime_directory='/tmp',
dry_run=False,
)