borgmatic/borgmatic/execute.py

46 lines
1.4 KiB
Python

import logging
import subprocess
from borgmatic.logger import get_logger
logger = get_logger(__name__)
def execute_and_log_output(full_command, output_log_level, shell):
process = subprocess.Popen(
full_command, stdout=subprocess.PIPE, stderr=subprocess.STDOUT, shell=shell
)
while process.poll() is None:
line = process.stdout.readline().rstrip().decode()
if not line:
continue
if line.startswith('borg: error:'):
logger.error(line)
else:
logger.log(output_log_level, line)
remaining_output = process.stdout.read().rstrip().decode()
if remaining_output:
logger.info(remaining_output)
exit_code = process.poll()
if exit_code != 0:
raise subprocess.CalledProcessError(exit_code, full_command)
def execute_command(full_command, output_log_level=logging.INFO, shell=False):
'''
Execute the given command (a sequence of command/argument strings) and log its output at the
given log level. If output log level is None, instead capture and return the output. If
shell is True, execute the command within a shell.
'''
logger.debug(' '.join(full_command))
if output_log_level is None:
output = subprocess.check_output(full_command, shell=shell)
return output.decode() if output is not None else None
else:
execute_and_log_output(full_command, output_log_level, shell=shell)