Added support for grafana loki #747

Merged
witten merged 5 commits from :main into main 2023-08-25 16:28:20 +00:00
Showing only changes of commit 7e419ec995 - Show all commits

View File

@ -1,5 +1,6 @@
import json
import logging
import os
import platform
import time
@ -9,7 +10,7 @@ from borgmatic.hooks import monitor
logger = logging.getLogger(__name__)
witten marked this conversation as resolved
Review

This should probably be renamed to MONITOR_STATE_TO_LOKI. 😄

This should probably be renamed to `MONITOR_STATE_TO_LOKI`. 😄
MONITOR_STATE_TO_HEALTHCHECKS = {
MONITOR_STATE_TO_LOKI = {
monitor.State.START: 'Started',
monitor.State.FINISH: 'Finished',
monitor.State.FAIL: 'Failed',
@ -19,84 +20,107 @@ MONITOR_STATE_TO_HEALTHCHECKS = {
MAX_BUFFER_LINES = 100
witten marked this conversation as resolved
Review

Code style convention: Uppercase first letter of class names, e.g. Loki_log_buffer.

Code style convention: Uppercase first letter of class names, e.g. `Loki_log_buffer`.
class loki_log_buffer:
class Loki_log_buffer:
'''
A log buffer that allows to output the logs as loki requests in json
A log buffer that allows to output the logs as loki requests in json. Allows
adding labels to the log stream and takes care of communication with loki.
'''
def __init__(self, url, dry_run):
self.url = url
self.dry_run = dry_run
self.root = {}
self.root['streams'] = [{}]
self.root['streams'][0]['stream'] = {}
self.root['streams'][0]['values'] = []
self.root = {'streams': [{'stream': {}, 'values': []}]}
def add_value(self, value):
'''
Add a log entry to the stream.
'''
timestamp = str(time.time_ns())
self.root['streams'][0]['values'].append((timestamp, value))
def add_label(self, label, value):
'''
Add a label to the logging stream.
'''
self.root['streams'][0]['stream'][label] = value
def _to_request(self):
def to_request(self):
return json.dumps(self.root)
witten marked this conversation as resolved
Review

Blank lines after some of these ifs would be nice. Maybe before the try as well. In general I find that helps readability even if it's not required by the language.

Blank lines after some of these `if`s would be nice. Maybe before the `try` as well. In general I find that helps readability even if it's not required by the language.
def __len__(self):
'''
Gets the number of lines currently in the buffer.
'''
return len(self.root['streams'][0]['values'])
def flush(self):
if self.dry_run:
# Just empty the buffer and skip
self.root['streams'][0]['values'] = []
witten marked this conversation as resolved
Review

This code looks good, but it's a little counter-intuitive to me that it would be here. For instance, I think of a buffer as a data structure for storing stuff, not necessarily as a data structure that also implicitly has the side effect of pushing logs to an external service. Maybe I'm just not used to OOP. 😄 I don't feel super strongly or anything, but this might be less surprising if the push to Loki took place elsewhere like in ping_monitor().

I'm guessing part of the reason you're doing it this way though is so that logs get sent to Loki as borgmatic runs rather than all at the end once ping_monitor() is called..? The Healthchecks hook for example only sends logs at the end, but the rationale there is that it's explicitly logging the success/failure status of the backup rather than only logs along the way. So the requirements may be a little different.

Anyway, let me know your thoughts. (And then maybe put some of them into docstrings. 😄)

This code looks good, but it's a little counter-intuitive to me that it would be here. For instance, I think of a buffer as a data structure for storing stuff, not necessarily as a data structure that also implicitly has the side effect of pushing logs to an external service. Maybe I'm just not used to OOP. 😄 I don't feel super strongly or anything, but this might be less surprising if the push to Loki took place elsewhere like in `ping_monitor()`. I'm guessing part of the reason you're doing it this way though is so that logs get sent to Loki as borgmatic runs rather than all at the end once `ping_monitor()` is called..? The Healthchecks hook for example only sends logs at the end, but the rationale there is that it's explicitly logging the success/failure status of the backup rather than only logs along the way. So the requirements may be a little different. Anyway, let me know your thoughts. (And then maybe put some of them into docstrings. 😄)
Review

Well I am trying to not hit the max request size limits of a lot of loki instances that run behind e.g. nginx. I think it is much better for large volumes of logs to be pushed incrementally instead of pushing it as one who knows how big request in the end.

Well I am trying to not hit the max request size limits of a lot of loki instances that run behind e.g. nginx. I think it is much better for large volumes of logs to be pushed incrementally instead of pushing it as one who knows how big request in the end.
Review

Gotcha. The Healthchecks hook "solves" that particular problem by reverse truncating the logs so that older messages are not sent if the logs get too big by the time ping_monitor() is called. However I can see why you might not want to do that in this case, since loki seems much more about log aggregation than simply tracking service status.

Gotcha. The Healthchecks hook "solves" that particular problem by reverse truncating the logs so that older messages are not sent if the logs get too big by the time `ping_monitor()` is called. However I can see why you might not want to do that in this case, since loki seems much more about log aggregation than simply tracking service status.
logger.info('Skipped uploading logs to loki due to dry run')
return
if len(self) == 0:
# Skip as there are not logs to send yet
return
request_body = self._to_request()
request_body = self.to_request()
self.root['streams'][0]['values'] = []
request_header = {'Content-Type': 'application/json'}
try:
result = requests.post(self.url, headers=request_header, data=request_body, timeout=5)
result.raise_for_status()
except requests.RequestException:
logger.warn('Failed to upload logs to loki')
logger.warning('Failed to upload logs to loki')
class loki_log_handeler(logging.Handler):
class Loki_log_handler(logging.Handler):
'''
A log handler that sends logs to loki
A log handler that sends logs to loki.
'''
def __init__(self, url, dry_run):
super().__init__()
self.buffer = loki_log_buffer(url, dry_run)
self.buffer = Loki_log_buffer(url, dry_run)
def emit(self, record):
'''
Add a log record from the logging module to the stream.
'''
self.raw(record.getMessage())
IBims1NicerTobi marked this conversation as resolved
Review

Just a code style convention nit: Period at the end of sentences in docstrings.

Just a code style convention nit: Period at the end of sentences in docstrings.
def add_label(self, key, value):
'''
Add a label to the logging stream.
'''
self.buffer.add_label(key, value)
def raw(self, msg):
'''
Add an arbitrary string as a log entry to the stream.
'''
self.buffer.add_value(msg)
if len(self.buffer) > MAX_BUFFER_LINES:
self.buffer.flush()
def flush(self):
if len(self.buffer) > 0:
self.buffer.flush()
'''
Send the logs to loki and empty the buffer.
'''
self.buffer.flush()
def initialize_monitor(hook_config, config, config_filename, monitoring_log_level, dry_run):
'''
Add a handler to the root logger to regularly send the logs to loki
Add a handler to the root logger to regularly send the logs to loki.
'''
url = hook_config.get('url')
loki = loki_log_handeler(url, dry_run)
loki = Loki_log_handler(url, dry_run)
for key, value in hook_config.get('labels').items():
if value == '__hostname':
loki.add_label(key, platform.node())
elif value == '__config':
loki.add_label(key, config_filename.split('/')[-1])
loki.add_label(key, os.path.basename(config_filename))
elif value == '__config_path':
loki.add_label(key, config_filename)
else:
@ -106,13 +130,14 @@ def initialize_monitor(hook_config, config, config_filename, monitoring_log_leve
def ping_monitor(hook_config, config, config_filename, state, monitoring_log_level, dry_run):
'''
Adds an entry to the loki logger with the current state
Add an entry to the loki logger with the current state.
'''
if not dry_run:
for handler in tuple(logging.getLogger().handlers):
if isinstance(handler, loki_log_handeler):
if state in MONITOR_STATE_TO_HEALTHCHECKS.keys():
handler.raw(f'{config_filename} {MONITOR_STATE_TO_HEALTHCHECKS[state]} backup')
if dry_run:
return
for handler in tuple(logging.getLogger().handlers):
if isinstance(handler, Loki_log_handler):
if state in MONITOR_STATE_TO_LOKI.keys():
handler.raw(f'{config_filename}: {MONITOR_STATE_TO_LOKI[state]} backup')
def destroy_monitor(hook_config, config, config_filename, monitoring_log_level, dry_run):
@ -121,6 +146,6 @@ def destroy_monitor(hook_config, config, config_filename, monitoring_log_level,
'''
logger = logging.getLogger()
for handler in tuple(logger.handlers):
if isinstance(handler, loki_log_handeler):
if isinstance(handler, Loki_log_handler):
handler.flush()
logger.removeHandler(handler)