Use a namedtuple for logical volume metadata (#80).

This commit is contained in:
2024-12-03 11:12:27 -08:00
parent 9b77de3d66
commit 8c4b899a13
2 changed files with 31 additions and 23 deletions

View File

@@ -1,3 +1,4 @@
import collections
import glob
import json
import logging
@@ -20,6 +21,9 @@ def use_streaming(hook_config, config, log_prefix): # pragma: no cover
BORGMATIC_SNAPSHOT_PREFIX = 'borgmatic-'
Logical_volume = collections.namedtuple(
'Logical_volume', ('name', 'device_path', 'mount_point', 'contained_source_directories')
)
def get_logical_volumes(lsblk_command, source_directories=None):
@@ -31,8 +35,7 @@ def get_logical_volumes(lsblk_command, source_directories=None):
If source directories is None, include all logical volume mounts points, not just those in
source directories.
Return the result as a sequence of (device name, device path, mount point, sequence of contained
source directories) tuples.
Return the result as a sequence of Logical_volume instances.
'''
try:
devices_info = json.loads(
@@ -50,12 +53,13 @@ def get_logical_volumes(lsblk_command, source_directories=None):
except json.JSONDecodeError as error:
raise ValueError('Invalid {lsblk_command} JSON output: {error}')
candidate_source_directories = set(source_directories or ())
try:
return tuple(
(device['name'], device['path'], device['mountpoint'], contained_source_directories)
Logical_volume(
device['name'], device['path'], device['mountpoint'], contained_source_directories
)
for device in devices_info['blockdevices']
if device['mountpoint'] and device['type'] == 'lvm'
for contained_source_directories in (
@@ -151,22 +155,17 @@ def dump_data_sources(
if not requested_logical_volumes:
logger.warning(f'{log_prefix}: No LVM logical volumes found to snapshot{dry_run_label}')
for (
device_name,
device_path,
mount_point,
contained_source_directories,
) in requested_logical_volumes:
snapshot_name = f'{device_name}_{snapshot_suffix}'
for logical_volume in requested_logical_volumes:
snapshot_name = f'{logical_volume.name}_{snapshot_suffix}'
logger.debug(
f'{log_prefix}: Creating LVM snapshot {snapshot_name} of {mount_point}{dry_run_label}'
f'{log_prefix}: Creating LVM snapshot {snapshot_name} of {logical_volume.mount_point}{dry_run_label}'
)
if not dry_run:
snapshot_logical_volume(
hook_config.get('lvcreate_command', 'lvcreate'),
snapshot_name,
device_path,
logical_volume.device_path,
hook_config.get('snapshot_size', DEFAULT_SNAPSHOT_SIZE),
)
@@ -183,7 +182,7 @@ def dump_data_sources(
snapshot_mount_path = os.path.join(
normalized_runtime_directory,
'lvm_snapshots',
mount_point.lstrip(os.path.sep),
logical_volume.mount_point.lstrip(os.path.sep),
)
logger.debug(
@@ -199,7 +198,7 @@ def dump_data_sources(
# Update the path for each contained source directory, so Borg sees it within the
# mounted snapshot.
for source_directory in contained_source_directories:
for source_directory in logical_volume.contained_source_directories:
try:
source_directories.remove(source_directory)
except ValueError:
@@ -319,8 +318,10 @@ def remove_data_source_dumps(hook_config, config, log_prefix, borgmatic_runtime_
if not dry_run:
shutil.rmtree(snapshots_directory, ignore_errors=True)
for _, _, mount_point, _ in logical_volumes:
snapshot_mount_path = os.path.join(snapshots_directory, mount_point.lstrip(os.path.sep))
for logical_volume in logical_volumes:
snapshot_mount_path = os.path.join(
snapshots_directory, logical_volume.mount_point.lstrip(os.path.sep)
)
if not os.path.isdir(snapshot_mount_path):
continue

View File

@@ -23,7 +23,9 @@ BORGMATIC_SNAPSHOT_PREFIX = 'borgmatic-'
BORGMATIC_USER_PROPERTY = 'org.torsion.borgmatic:backup'
Dataset = collections.namedtuple('Dataset', ('name', 'mount_point', 'user_property_value', 'contained_source_directories'))
Dataset = collections.namedtuple(
'Dataset', ('name', 'mount_point', 'user_property_value', 'contained_source_directories')
)
def get_datasets_to_backup(zfs_command, source_directories):
@@ -69,7 +71,12 @@ def get_datasets_to_backup(zfs_command, source_directories):
return sorted(
tuple(
Dataset(dataset.name, dataset.mount_point, dataset.user_property_value, contained_source_directories)
Dataset(
dataset.name,
dataset.mount_point,
dataset.user_property_value,
contained_source_directories,
)
for dataset in datasets
for contained_source_directories in (
borgmatic.hooks.data_source.snapshot.get_contained_directories(
@@ -99,9 +106,7 @@ def get_all_dataset_mount_points(zfs_command):
)
try:
return tuple(
sorted(line.rstrip() for line in list_output.splitlines())
)
return tuple(sorted(line.rstrip() for line in list_output.splitlines()))
except ValueError:
raise ValueError('Invalid {zfs_command} list output')
@@ -178,7 +183,9 @@ def dump_data_sources(
for dataset in requested_datasets:
full_snapshot_name = f'{dataset.name}@{snapshot_name}'
logger.debug(f'{log_prefix}: Creating ZFS snapshot {full_snapshot_name} of {dataset.mount_point}{dry_run_label}')
logger.debug(
f'{log_prefix}: Creating ZFS snapshot {full_snapshot_name} of {dataset.mount_point}{dry_run_label}'
)
if not dry_run:
snapshot_dataset(zfs_command, full_snapshot_name)