Compare commits

...

3 Commits

Author SHA1 Message Date
cadamswaite
976a877a25 Formatting 2021-11-14 22:37:42 +00:00
cadamswaite
b4117916b8 Add timeout and tests 2021-11-14 22:15:22 +00:00
cadamswaite
19cad89978 Add some tests for retry logic 2021-11-14 21:35:23 +00:00
3 changed files with 169 additions and 6 deletions

View File

@ -4,6 +4,7 @@ import json
import logging import logging
import os import os
import sys import sys
import time
from queue import Queue from queue import Queue
from subprocess import CalledProcessError from subprocess import CalledProcessError
@ -54,6 +55,7 @@ def run_configuration(config_filename, config, arguments):
local_path = location.get('local_path', 'borg') local_path = location.get('local_path', 'borg')
remote_path = location.get('remote_path') remote_path = location.get('remote_path')
retries = storage.get('retries', 0) retries = storage.get('retries', 0)
retry_timeout = storage.get('retry_timeout', 0)
borg_environment.initialize(storage) borg_environment.initialize(storage)
encountered_error = None encountered_error = None
error_repository = '' error_repository = ''
@ -124,12 +126,14 @@ def run_configuration(config_filename, config, arguments):
if not encountered_error: if not encountered_error:
repo_queue = Queue() repo_queue = Queue()
for repo in location['repositories']: for repo in location['repositories']:
repo_queue.put( repo_queue.put((repo, 0),)
(repo, 0),
)
while not repo_queue.empty(): while not repo_queue.empty():
repository_path, retry_num = repo_queue.get() repository_path, retry_num = repo_queue.get()
timeout = retry_num * retry_timeout
if timeout:
logger.warning(f'Sleeping {timeout}s before next retry')
time.sleep(timeout)
try: try:
yield from run_actions( yield from run_actions(
arguments=arguments, arguments=arguments,
@ -147,9 +151,7 @@ def run_configuration(config_filename, config, arguments):
'{}: Error running actions for repository'.format(repository_path), error '{}: Error running actions for repository'.format(repository_path), error
) )
if retry_num < retries: if retry_num < retries:
repo_queue.put( repo_queue.put((repository_path, retry_num + 1),)
(repository_path, retry_num + 1),
)
logger.warning(f'Retrying.. attempt {retry_num + 1}/{retries}') logger.warning(f'Retrying.. attempt {retry_num + 1}/{retries}')
continue continue
encountered_error = error encountered_error = error

View File

@ -257,6 +257,12 @@ properties:
Number of times to retry a backup before failing. Defaults Number of times to retry a backup before failing. Defaults
to 0 (i.e. does not attempt retry). to 0 (i.e. does not attempt retry).
example: 3 example: 3
retry_timeout:
type: integer
description: |
Wait time between retries, to allow transient issues to pass
Defaults to 0s.
example: 10
temporary_directory: temporary_directory:
type: string type: string
description: | description: |

View File

@ -1,5 +1,6 @@
import logging import logging
import subprocess import subprocess
import time
from flexmock import flexmock from flexmock import flexmock
@ -184,6 +185,160 @@ def test_run_configuration_bails_for_on_error_hook_soft_failure():
assert results == expected_results assert results == expected_results
def test_run_retries_soft_error():
# Run action first fails, second passes
flexmock(module.borg_environment).should_receive('initialize')
flexmock(module.command).should_receive('execute_hook')
flexmock(module).should_receive('run_actions').and_raise(OSError).and_return([])
expected_results = [flexmock()]
flexmock(module).should_receive('make_error_log_records').and_return(expected_results).once()
config = {'location': {'repositories': ['foo']}, 'storage': {'retries': 1}}
arguments = {'global': flexmock(monitoring_verbosity=1, dry_run=False), 'create': flexmock()}
results = list(module.run_configuration('test.yaml', config, arguments))
assert results == expected_results
def test_run_retries_hard_error():
# Run action fails twice
flexmock(module.borg_environment).should_receive('initialize')
flexmock(module.command).should_receive('execute_hook')
flexmock(module).should_receive('run_actions').and_raise(OSError).times(2)
expected_results = [flexmock(), flexmock()]
flexmock(module).should_receive('make_error_log_records').with_args(
'foo: Error running actions for repository', OSError
).and_return(expected_results[:1]).with_args(
'foo: Error running actions for repository', OSError
).and_return(
expected_results[1:]
).twice()
config = {'location': {'repositories': ['foo']}, 'storage': {'retries': 1}}
arguments = {'global': flexmock(monitoring_verbosity=1, dry_run=False), 'create': flexmock()}
results = list(module.run_configuration('test.yaml', config, arguments))
assert results == expected_results
def test_run_repos_ordered():
flexmock(module.borg_environment).should_receive('initialize')
flexmock(module.command).should_receive('execute_hook')
flexmock(module).should_receive('run_actions').and_raise(OSError).times(2)
expected_results = [flexmock(), flexmock()]
flexmock(module).should_receive('make_error_log_records').with_args(
'foo: Error running actions for repository', OSError
).and_return(expected_results[:1]).ordered()
flexmock(module).should_receive('make_error_log_records').with_args(
'bar: Error running actions for repository', OSError
).and_return(expected_results[1:]).ordered()
config = {'location': {'repositories': ['foo', 'bar']}}
arguments = {'global': flexmock(monitoring_verbosity=1, dry_run=False), 'create': flexmock()}
results = list(module.run_configuration('test.yaml', config, arguments))
assert results == expected_results
def test_run_retries_round_robbin():
flexmock(module.borg_environment).should_receive('initialize')
flexmock(module.command).should_receive('execute_hook')
flexmock(module).should_receive('run_actions').and_raise(OSError).times(4)
expected_results = [flexmock(), flexmock(), flexmock(), flexmock()]
flexmock(module).should_receive('make_error_log_records').with_args(
'foo: Error running actions for repository', OSError
).and_return(expected_results[0:1]).ordered()
flexmock(module).should_receive('make_error_log_records').with_args(
'bar: Error running actions for repository', OSError
).and_return(expected_results[1:2]).ordered()
flexmock(module).should_receive('make_error_log_records').with_args(
'foo: Error running actions for repository', OSError
).and_return(expected_results[2:3]).ordered()
flexmock(module).should_receive('make_error_log_records').with_args(
'bar: Error running actions for repository', OSError
).and_return(expected_results[3:4]).ordered()
config = {'location': {'repositories': ['foo', 'bar']}, 'storage': {'retries': 1}}
arguments = {'global': flexmock(monitoring_verbosity=1, dry_run=False), 'create': flexmock()}
results = list(module.run_configuration('test.yaml', config, arguments))
assert results == expected_results
def test_run_retries_one_passes():
flexmock(module.borg_environment).should_receive('initialize')
flexmock(module.command).should_receive('execute_hook')
flexmock(module).should_receive('run_actions').and_raise(OSError).and_raise(OSError).and_return(
[]
).and_raise(OSError).times(4)
expected_results = [flexmock(), flexmock(), flexmock()]
flexmock(module).should_receive('make_error_log_records').with_args(
'foo: Error running actions for repository', OSError
).and_return(expected_results[0:1]).ordered()
flexmock(module).should_receive('make_error_log_records').with_args(
'bar: Error running actions for repository', OSError
).and_return(expected_results[1:2]).ordered()
flexmock(module).should_receive('make_error_log_records').with_args(
'bar: Error running actions for repository', OSError
).and_return(expected_results[2:3]).ordered()
config = {'location': {'repositories': ['foo', 'bar']}, 'storage': {'retries': 1}}
arguments = {'global': flexmock(monitoring_verbosity=1, dry_run=False), 'create': flexmock()}
results = list(module.run_configuration('test.yaml', config, arguments))
assert results == expected_results
def test_run_retry_timeout():
flexmock(module.borg_environment).should_receive('initialize')
flexmock(module.command).should_receive('execute_hook')
flexmock(module).should_receive('run_actions').and_raise(OSError).times(4)
expected_results = [flexmock(), flexmock(), flexmock(), flexmock()]
flexmock(module).should_receive('make_error_log_records').with_args(
'foo: Error running actions for repository', OSError
).and_return(expected_results[0:1]).ordered()
flexmock(time).should_receive('sleep').with_args(10).and_return().ordered()
flexmock(module).should_receive('make_error_log_records').with_args(
'foo: Error running actions for repository', OSError
).and_return(expected_results[1:2]).ordered()
flexmock(time).should_receive('sleep').with_args(20).and_return().ordered()
flexmock(module).should_receive('make_error_log_records').with_args(
'foo: Error running actions for repository', OSError
).and_return(expected_results[2:3]).ordered()
flexmock(time).should_receive('sleep').with_args(30).and_return().ordered()
flexmock(module).should_receive('make_error_log_records').with_args(
'foo: Error running actions for repository', OSError
).and_return(expected_results[3:4]).ordered()
config = {'location': {'repositories': ['foo']}, 'storage': {'retries': 3, 'retry_timeout': 10}}
arguments = {'global': flexmock(monitoring_verbosity=1, dry_run=False), 'create': flexmock()}
results = list(module.run_configuration('test.yaml', config, arguments))
assert results == expected_results
def test_run_retries_timeout_multiple_repos():
flexmock(module.borg_environment).should_receive('initialize')
flexmock(module.command).should_receive('execute_hook')
flexmock(module).should_receive('run_actions').and_raise(OSError).and_raise(OSError).and_return(
[]
).and_raise(OSError).times(4)
expected_results = [flexmock(), flexmock(), flexmock()]
flexmock(module).should_receive('make_error_log_records').with_args(
'foo: Error running actions for repository', OSError
).and_return(expected_results[0:1]).ordered()
flexmock(module).should_receive('make_error_log_records').with_args(
'bar: Error running actions for repository', OSError
).and_return(expected_results[1:2]).ordered()
# Sleep before retrying foo (and passing)
flexmock(time).should_receive('sleep').with_args(10).and_return().ordered()
# Sleep before retrying bar (and failing)
flexmock(time).should_receive('sleep').with_args(10).and_return().ordered()
flexmock(module).should_receive('make_error_log_records').with_args(
'bar: Error running actions for repository', OSError
).and_return(expected_results[2:3]).ordered()
config = {
'location': {'repositories': ['foo', 'bar']},
'storage': {'retries': 1, 'retry_timeout': 10},
}
arguments = {'global': flexmock(monitoring_verbosity=1, dry_run=False), 'create': flexmock()}
results = list(module.run_configuration('test.yaml', config, arguments))
assert results == expected_results
def test_load_configurations_collects_parsed_configurations(): def test_load_configurations_collects_parsed_configurations():
configuration = flexmock() configuration = flexmock()
other_configuration = flexmock() other_configuration = flexmock()