borgmatic/borgmatic/execute.py

44 lines
1.4 KiB
Python

import logging
import subprocess
# Module-level logger named after this module; the functions below log command lines and command
# output through it.
logger = logging.getLogger(__name__)
def execute_and_log_output(full_command, output_log_level, shell):
    '''
    Execute the given command (a sequence of command/argument strings), merging its stderr into
    its stdout, and log each non-blank line of output as it is produced. Lines starting with
    "borg: error:" are logged at error level; all other lines are logged at the given output log
    level. If shell is True, execute the command within a shell.

    Raise subprocess.CalledProcessError if the command exits with a non-zero exit code.
    '''
    # The context manager closes the output pipe (and waits for the process) even if logging or
    # decoding raises part way through.
    with subprocess.Popen(
        full_command, stdout=subprocess.PIPE, stderr=subprocess.STDOUT, shell=shell
    ) as process:
        # Iterating the pipe yields lines until end-of-file, so every line goes through the same
        # filtering below, including output still buffered in the pipe when the process exits.
        for raw_line in process.stdout:
            line = raw_line.rstrip().decode()
            if not line:
                continue

            if line.startswith('borg: error:'):
                logger.error(line)
            else:
                logger.log(output_log_level, line)

        # The pipe is at end-of-file; block until the process has exited to get its exit code.
        exit_code = process.wait()

    if exit_code != 0:
        raise subprocess.CalledProcessError(exit_code, full_command)
def execute_command(full_command, output_log_level=logging.INFO, shell=False):
    '''
    Execute the given command (a sequence of command/argument strings, or a single command string)
    and log its output at the given log level. If output log level is None, instead capture and
    return the output as a decoded string. If shell is True, execute the command within a shell.

    The command itself is logged at debug level before it is run. When the output is logged
    rather than captured, return None.

    Raise subprocess.CalledProcessError if the command exits with a non-zero exit code.
    '''
    # A plain string is itself iterable, so joining it would put a space between every character;
    # log it verbatim instead.
    if isinstance(full_command, str):
        logger.debug(full_command)
    else:
        logger.debug(' '.join(full_command))

    if output_log_level is None:
        output = subprocess.check_output(full_command, shell=shell)
        # NOTE(review): check_output normally returns bytes; the None guard is kept from the
        # original, presumably for stubbed calls -- confirm before removing.
        return output.decode() if output is not None else None

    execute_and_log_output(full_command, output_log_level, shell=shell)