diff --git a/borgmatic/config/schema.yaml b/borgmatic/config/schema.yaml index 752c75ab..a2ba64eb 100644 --- a/borgmatic/config/schema.yaml +++ b/borgmatic/config/schema.yaml @@ -1403,3 +1403,33 @@ properties: Configuration for a monitoring integration with Crunhub. Create an account at https://cronhub.io if you'd like to use this service. See borgmatic monitoring documentation for details. + loki: + type: object + required: ['url', 'labels'] + additionalProperties: false + properties: + url: + type: string + description: | + Grafana loki log URL to notify when a backup begins, + ends, or fails. + example: "http://localhost:3100/loki/api/v1/push" + labels: + type: object + additionalProperties: + type: string + description: | + Allows setting custom labels for the logging stream. At + least one label is required. "__hostname" gets replaced by + the machine hostname automatically. "__config" gets replaced + by just the name of the configuration file. "__config_path" + gets replaced by the full path of the configuration file. + example: + app: "borgmatic" + config: "__config" + hostname: "__hostname" + description: | + Configuration for a monitoring integration with Grafana loki. You + can send the logs to a self-hosted instance or create an account at + https://grafana.com/auth/sign-up/create-user. See borgmatic + monitoring documentation for details. diff --git a/borgmatic/hooks/dispatch.py b/borgmatic/hooks/dispatch.py index 0c003e33..24793b5a 100644 --- a/borgmatic/hooks/dispatch.py +++ b/borgmatic/hooks/dispatch.py @@ -4,6 +4,7 @@ from borgmatic.hooks import ( cronhub, cronitor, healthchecks, + loki, mariadb, mongodb, mysql, @@ -26,6 +27,7 @@ HOOK_NAME_TO_MODULE = { 'pagerduty': pagerduty, 'postgresql_databases': postgresql, 'sqlite_databases': sqlite, + 'loki': loki, } diff --git a/borgmatic/hooks/loki.py b/borgmatic/hooks/loki.py new file mode 100644 index 00000000..9a5dae8f --- /dev/null +++ b/borgmatic/hooks/loki.py @@ -0,0 +1,149 @@ +import json +import logging +import os +import platform +import time + +import requests + +from borgmatic.hooks import monitor + +logger = logging.getLogger(__name__) + +MONITOR_STATE_TO_LOKI = { + monitor.State.START: 'Started', + monitor.State.FINISH: 'Finished', + monitor.State.FAIL: 'Failed', +} + +# Threshold at which logs get flushed to loki +MAX_BUFFER_LINES = 100 + + +class Loki_log_buffer: + ''' + A log buffer that allows to output the logs as loki requests in json. Allows + adding labels to the log stream and takes care of communication with loki. + ''' + + def __init__(self, url, dry_run): + self.url = url + self.dry_run = dry_run + self.root = {'streams': [{'stream': {}, 'values': []}]} + + def add_value(self, value): + ''' + Add a log entry to the stream. + ''' + timestamp = str(time.time_ns()) + self.root['streams'][0]['values'].append((timestamp, value)) + + def add_label(self, label, value): + ''' + Add a label to the logging stream. + ''' + self.root['streams'][0]['stream'][label] = value + + def to_request(self): + return json.dumps(self.root) + + def __len__(self): + ''' + Gets the number of lines currently in the buffer. + ''' + return len(self.root['streams'][0]['values']) + + def flush(self): + if self.dry_run: + # Just empty the buffer and skip + self.root['streams'][0]['values'] = [] + logger.info('Skipped uploading logs to loki due to dry run') + return + + if len(self) == 0: + # Skip as there are not logs to send yet + return + + request_body = self.to_request() + self.root['streams'][0]['values'] = [] + request_header = {'Content-Type': 'application/json'} + try: + result = requests.post(self.url, headers=request_header, data=request_body, timeout=5) + result.raise_for_status() + except requests.RequestException: + logger.warning('Failed to upload logs to loki') + + +class Loki_log_handler(logging.Handler): + ''' + A log handler that sends logs to loki. + ''' + + def __init__(self, url, dry_run): + super().__init__() + self.buffer = Loki_log_buffer(url, dry_run) + + def emit(self, record): + ''' + Add a log record from the logging module to the stream. + ''' + self.raw(record.getMessage()) + + def add_label(self, key, value): + ''' + Add a label to the logging stream. + ''' + self.buffer.add_label(key, value) + + def raw(self, msg): + ''' + Add an arbitrary string as a log entry to the stream. + ''' + self.buffer.add_value(msg) + if len(self.buffer) > MAX_BUFFER_LINES: + self.buffer.flush() + + def flush(self): + ''' + Send the logs to loki and empty the buffer. + ''' + self.buffer.flush() + + +def initialize_monitor(hook_config, config, config_filename, monitoring_log_level, dry_run): + ''' + Add a handler to the root logger to regularly send the logs to loki. + ''' + url = hook_config.get('url') + loki = Loki_log_handler(url, dry_run) + for key, value in hook_config.get('labels').items(): + if value == '__hostname': + loki.add_label(key, platform.node()) + elif value == '__config': + loki.add_label(key, os.path.basename(config_filename)) + elif value == '__config_path': + loki.add_label(key, config_filename) + else: + loki.add_label(key, value) + logging.getLogger().addHandler(loki) + + +def ping_monitor(hook_config, config, config_filename, state, monitoring_log_level, dry_run): + ''' + Add an entry to the loki logger with the current state. + ''' + for handler in tuple(logging.getLogger().handlers): + if isinstance(handler, Loki_log_handler): + if state in MONITOR_STATE_TO_LOKI.keys(): + handler.raw(f'{config_filename}: {MONITOR_STATE_TO_LOKI[state]} backup') + + +def destroy_monitor(hook_config, config, config_filename, monitoring_log_level, dry_run): + ''' + Remove the monitor handler that was added to the root logger. + ''' + logger = logging.getLogger() + for handler in tuple(logger.handlers): + if isinstance(handler, Loki_log_handler): + handler.flush() + logger.removeHandler(handler) diff --git a/borgmatic/hooks/monitor.py b/borgmatic/hooks/monitor.py index c0168178..118639f5 100644 --- a/borgmatic/hooks/monitor.py +++ b/borgmatic/hooks/monitor.py @@ -1,6 +1,6 @@ from enum import Enum -MONITOR_HOOK_NAMES = ('healthchecks', 'cronitor', 'cronhub', 'pagerduty', 'ntfy') +MONITOR_HOOK_NAMES = ('healthchecks', 'cronitor', 'cronhub', 'pagerduty', 'ntfy', 'loki') class State(Enum): diff --git a/tests/integration/hooks/test_loki.py b/tests/integration/hooks/test_loki.py new file mode 100644 index 00000000..3eac29d3 --- /dev/null +++ b/tests/integration/hooks/test_loki.py @@ -0,0 +1,82 @@ +import logging +import platform + +from flexmock import flexmock + +from borgmatic.hooks import loki as module + + +def test_log_handler_label_replacment(): + ''' + Assert that label placeholders get replaced + ''' + hook_config = { + 'url': 'http://localhost:3100/loki/api/v1/push', + 'labels': {'hostname': '__hostname', 'config': '__config', 'config_full': '__config_path'}, + } + config_filename = '/mock/path/test.yaml' + dry_run = True + module.initialize_monitor(hook_config, flexmock(), config_filename, flexmock(), dry_run) + for handler in tuple(logging.getLogger().handlers): + if isinstance(handler, module.Loki_log_handler): + assert handler.buffer.root['streams'][0]['stream']['hostname'] == platform.node() + assert handler.buffer.root['streams'][0]['stream']['config'] == 'test.yaml' + assert handler.buffer.root['streams'][0]['stream']['config_full'] == config_filename + return + assert False + + +def test_initalize_adds_log_handler(): + ''' + Assert that calling initialize_monitor adds our logger to the root logger + ''' + hook_config = {'url': 'http://localhost:3100/loki/api/v1/push', 'labels': {'app': 'borgmatic'}} + module.initialize_monitor( + hook_config, + flexmock(), + config_filename='test.yaml', + monitoring_log_level=flexmock(), + dry_run=True, + ) + for handler in tuple(logging.getLogger().handlers): + if isinstance(handler, module.Loki_log_handler): + return + assert False + + +def test_ping_adds_log_message(): + ''' + Assert that calling ping_monitor adds a message to our logger + ''' + hook_config = {'url': 'http://localhost:3100/loki/api/v1/push', 'labels': {'app': 'borgmatic'}} + config_filename = 'test.yaml' + dry_run = True + module.initialize_monitor(hook_config, flexmock(), config_filename, flexmock(), dry_run) + module.ping_monitor( + hook_config, flexmock(), config_filename, module.monitor.State.FINISH, flexmock(), dry_run + ) + for handler in tuple(logging.getLogger().handlers): + if isinstance(handler, module.Loki_log_handler): + assert any( + map( + lambda log: log + == f'{config_filename}: {module.MONITOR_STATE_TO_LOKI[module.monitor.State.FINISH]} backup', + map(lambda x: x[1], handler.buffer.root['streams'][0]['values']), + ) + ) + return + assert False + + +def test_log_handler_gets_removed(): + ''' + Assert that destroy_monitor removes the logger from the root logger + ''' + hook_config = {'url': 'http://localhost:3100/loki/api/v1/push', 'labels': {'app': 'borgmatic'}} + config_filename = 'test.yaml' + dry_run = True + module.initialize_monitor(hook_config, flexmock(), config_filename, flexmock(), dry_run) + module.destroy_monitor(hook_config, flexmock(), config_filename, flexmock(), dry_run) + for handler in tuple(logging.getLogger().handlers): + if isinstance(handler, module.Loki_log_handler): + assert False diff --git a/tests/unit/hooks/test_loki.py b/tests/unit/hooks/test_loki.py new file mode 100644 index 00000000..33e35ecd --- /dev/null +++ b/tests/unit/hooks/test_loki.py @@ -0,0 +1,98 @@ +import json + +import requests +from flexmock import flexmock + +from borgmatic.hooks import loki as module + + +def test_log_handler_gets_labels(): + ''' + Assert that adding labels works + ''' + buffer = module.Loki_log_buffer(flexmock(), False) + buffer.add_label('test', 'label') + assert buffer.root['streams'][0]['stream']['test'] == 'label' + buffer.add_label('test2', 'label2') + assert buffer.root['streams'][0]['stream']['test2'] == 'label2' + + +def test_log_buffer_gets_raw(): + ''' + Assert that adding values to the log buffer increases it's length + ''' + buffer = module.Loki_log_buffer(flexmock(), False) + assert len(buffer) == 0 + buffer.add_value('Some test log line') + assert len(buffer) == 1 + buffer.add_value('Another test log line') + assert len(buffer) == 2 + + +def test_log_buffer_gets_log_messages(): + ''' + Assert that adding log records works + ''' + handler = module.Loki_log_handler(flexmock(), False) + handler.emit(flexmock(getMessage=lambda: 'Some test log line')) + assert len(handler.buffer) == 1 + + +def test_log_buffer_json(): + ''' + Assert that the buffer correctly serializes when empty + ''' + buffer = module.Loki_log_buffer(flexmock(), False) + assert json.loads(buffer.to_request()) == json.loads('{"streams":[{"stream":{},"values":[]}]}') + + +def test_log_buffer_json_labels(): + ''' + Assert that the buffer correctly serializes with labels + ''' + buffer = module.Loki_log_buffer(flexmock(), False) + buffer.add_label('test', 'label') + assert json.loads(buffer.to_request()) == json.loads( + '{"streams":[{"stream":{"test": "label"},"values":[]}]}' + ) + + +def test_log_buffer_json_log_lines(): + ''' + Assert that log lines end up in the correct place in the log buffer + ''' + buffer = module.Loki_log_buffer(flexmock(), False) + buffer.add_value('Some test log line') + assert json.loads(buffer.to_request())['streams'][0]['values'][0][1] == 'Some test log line' + + +def test_log_handler_post(): + ''' + Assert that the flush function sends a post request after a certain limit + ''' + handler = module.Loki_log_handler(flexmock(), False) + flexmock(module.requests).should_receive('post').and_return( + flexmock(raise_for_status=lambda: '') + ).once() + for num in range(int(module.MAX_BUFFER_LINES * 1.5)): + handler.raw(num) + + +def test_log_handler_post_failiure(): + ''' + Assert that the flush function catches request exceptions + ''' + handler = module.Loki_log_handler(flexmock(), False) + flexmock(module.requests).should_receive('post').and_return( + flexmock(raise_for_status=lambda: (_ for _ in ()).throw(requests.RequestException())) + ).once() + for num in range(int(module.MAX_BUFFER_LINES * 1.5)): + handler.raw(num) + + +def test_log_handler_empty_flush_noop(): + ''' + Test that flushing an empty buffer does indeed nothing + ''' + handler = module.Loki_log_handler(flexmock(), False) + handler.flush()