borgmatic/borgmatic/hooks/loki.py

155 lines
4.3 KiB
Python
Raw Permalink Normal View History

2023-08-22 01:13:39 +00:00
import json
2023-08-22 11:40:05 +00:00
import logging
import os
2023-08-22 01:13:39 +00:00
import platform
2023-08-22 11:40:05 +00:00
import time
import requests
2023-08-22 01:13:39 +00:00
from borgmatic.hooks import monitor
logger = logging.getLogger(__name__)
MONITOR_STATE_TO_LOKI = {
2023-08-22 01:13:39 +00:00
monitor.State.START: 'Started',
monitor.State.FINISH: 'Finished',
monitor.State.FAIL: 'Failed',
}
# Threshold at which logs get flushed to loki
MAX_BUFFER_LINES = 100
2023-08-22 11:40:05 +00:00
class Loki_log_buffer:
2023-08-22 01:13:39 +00:00
'''
A log buffer that allows to output the logs as loki requests in json. Allows
adding labels to the log stream and takes care of communication with loki.
2023-08-22 01:13:39 +00:00
'''
2023-08-22 11:40:05 +00:00
2023-08-22 01:13:39 +00:00
def __init__(self, url, dry_run):
self.url = url
self.dry_run = dry_run
self.root = {'streams': [{'stream': {}, 'values': []}]}
2023-08-22 01:13:39 +00:00
def add_value(self, value):
'''
Add a log entry to the stream.
'''
2023-08-22 01:13:39 +00:00
timestamp = str(time.time_ns())
2023-08-22 11:40:05 +00:00
self.root['streams'][0]['values'].append((timestamp, value))
2023-08-22 01:13:39 +00:00
def add_label(self, label, value):
'''
Add a label to the logging stream.
'''
2023-08-22 11:40:05 +00:00
self.root['streams'][0]['stream'][label] = value
2023-08-22 01:13:39 +00:00
def to_request(self):
2023-08-22 01:13:39 +00:00
return json.dumps(self.root)
def __len__(self):
'''
Gets the number of lines currently in the buffer.
'''
2023-08-22 11:40:05 +00:00
return len(self.root['streams'][0]['values'])
2023-08-22 01:13:39 +00:00
def flush(self):
if self.dry_run:
# Just empty the buffer and skip
2023-08-22 11:40:05 +00:00
self.root['streams'][0]['values'] = []
logger.info('Skipped uploading logs to loki due to dry run')
2023-08-22 01:13:39 +00:00
return
2023-08-22 01:13:39 +00:00
if len(self) == 0:
# Skip as there are not logs to send yet
2023-08-22 01:13:39 +00:00
return
request_body = self.to_request()
2023-08-22 11:40:05 +00:00
self.root['streams'][0]['values'] = []
request_header = {'Content-Type': 'application/json'}
2023-08-22 01:13:39 +00:00
try:
2023-08-22 11:40:05 +00:00
result = requests.post(self.url, headers=request_header, data=request_body, timeout=5)
result.raise_for_status()
2023-08-22 01:13:39 +00:00
except requests.RequestException:
logger.warning('Failed to upload logs to loki')
2023-08-22 11:40:05 +00:00
2023-08-22 01:13:39 +00:00
class Loki_log_handler(logging.Handler):
2023-08-22 01:13:39 +00:00
'''
A log handler that sends logs to loki.
2023-08-22 01:13:39 +00:00
'''
2023-08-22 11:40:05 +00:00
2023-08-22 01:13:39 +00:00
def __init__(self, url, dry_run):
super().__init__()
self.buffer = Loki_log_buffer(url, dry_run)
2023-08-22 01:13:39 +00:00
def emit(self, record):
'''
Add a log record from the logging module to the stream.
'''
2023-08-22 01:13:39 +00:00
self.raw(record.getMessage())
def add_label(self, key, value):
'''
Add a label to the logging stream.
'''
2023-08-22 01:13:39 +00:00
self.buffer.add_label(key, value)
def raw(self, msg):
'''
Add an arbitrary string as a log entry to the stream.
'''
2023-08-22 01:13:39 +00:00
self.buffer.add_value(msg)
2023-08-22 01:13:39 +00:00
if len(self.buffer) > MAX_BUFFER_LINES:
self.buffer.flush()
def flush(self):
'''
Send the logs to loki and empty the buffer.
'''
self.buffer.flush()
2023-08-22 01:13:39 +00:00
2023-08-22 11:40:05 +00:00
2023-08-22 01:13:39 +00:00
def initialize_monitor(hook_config, config, config_filename, monitoring_log_level, dry_run):
'''
Add a handler to the root logger to regularly send the logs to loki.
2023-08-22 01:13:39 +00:00
'''
url = hook_config.get('url')
loki = Loki_log_handler(url, dry_run)
2023-08-22 11:40:05 +00:00
for key, value in hook_config.get('labels').items():
if value == '__hostname':
loki.add_label(key, platform.node())
elif value == '__config':
loki.add_label(key, os.path.basename(config_filename))
2023-08-22 11:40:05 +00:00
elif value == '__config_path':
loki.add_label(key, config_filename)
2023-08-22 01:13:39 +00:00
else:
2023-08-22 11:40:05 +00:00
loki.add_label(key, value)
2023-08-22 01:13:39 +00:00
logging.getLogger().addHandler(loki)
2023-08-22 11:40:05 +00:00
2023-08-22 01:13:39 +00:00
def ping_monitor(hook_config, config, config_filename, state, monitoring_log_level, dry_run):
'''
Add an entry to the loki logger with the current state.
2023-08-22 01:13:39 +00:00
'''
for handler in tuple(logging.getLogger().handlers):
if isinstance(handler, Loki_log_handler):
if state in MONITOR_STATE_TO_LOKI.keys():
handler.raw(f'{config_filename}: {MONITOR_STATE_TO_LOKI[state]} backup')
2023-08-22 01:13:39 +00:00
2023-08-22 11:40:05 +00:00
2023-08-22 01:13:39 +00:00
def destroy_monitor(hook_config, config, config_filename, monitoring_log_level, dry_run):
'''
Remove the monitor handler that was added to the root logger.
'''
logger = logging.getLogger()
2023-08-22 01:13:39 +00:00
for handler in tuple(logger.handlers):
if isinstance(handler, Loki_log_handler):
2023-08-22 01:13:39 +00:00
handler.flush()
logger.removeHandler(handler)