Add docstrings

This commit is contained in:
daniele 2023-05-29 23:10:29 +02:00
parent 335ad348e5
commit ba97b25e25
Signed by: fuxino
GPG Key ID: 981A2B2A3BBF5514
2 changed files with 93 additions and 29 deletions

1
.gitignore vendored
View File

@ -1,2 +1,3 @@
.idea/
__pycache__/
test/

View File

@ -1,4 +1,15 @@
#!/usr/bin/python3
#!/usr/bin/env python3
"""
A simple python script that calls rsync to perform a backup
Parameters can be specified on the command line or using a configuration file
Backup to a remote server is also supported (experimental)
Classes:
MyFormatter
Backup
"""
# Import libraries
import sys
@ -59,6 +70,11 @@ if journal:
def timing(_logger):
"""Decorator to measure execution time of a function
Parameters:
_logger: Logger object
"""
def decorator_timing(func):
@wraps(func)
def wrapper_timing(*args, **kwargs):
@ -78,10 +94,44 @@ def timing(_logger):
class MyFormatter(argparse.RawTextHelpFormatter, argparse.ArgumentDefaultsHelpFormatter):
pass
"""Custom format for argparse help text"""
class Backup:
"""Main class defining parameters and functions for performing backup
Attributes:
inputs: list
Files and folders that will be backup up
output: str
Path where the backup will be saved
exclude: list
List of files/folders/patterns to exclude from backup
options: str
String representing main backup options for rsync
keep: int
Number of old backup to preserve
host: str
Hostname of server (for remote backup)
username: str
Username for server login (for remote backup)
ssh_keyfile: str
Location of ssh key
remove_before: bool
Indicate if removing old backups will be performed before copying files
Methods:
check_params():
Check if parameters for the backup are valid
define_backup_dir():
Define the actual backup dir
remove_old_backups():
Remove old backups if there are more than indicated by 'keep'
find_last_backup():
Get path of last backup (from last_backup symlink) for rsync --link-dest
run():
Perform the backup
"""
def __init__(self, inputs, output, exclude, keep, options, host=None,
username=None, ssh_keyfile=None, remove_before=False):
@ -104,6 +154,8 @@ class Backup:
self._ssh = None
def check_params(self):
"""Check if parameters for the backup are valid"""
if self.inputs is None or len(self.inputs) == 0:
logger.info('No files or directory specified for backup.')
@ -146,6 +198,7 @@ class Backup:
# Function to create the actual backup directory
def define_backup_dir(self):
"""Define the actual backup dir"""
now = datetime.now().strftime('%Y-%m-%d %H:%M:%S')
self._output_dir = f'{self.output}/simple_backup/{now}'
@ -153,6 +206,8 @@ class Backup:
self._server = f'{self.username}@{self.host}:'
def remove_old_backups(self):
"""Remove old backups if there are more than indicated by 'keep'"""
if self._remote:
_, stdout, _ = self._ssh.exec_command(f'ls {self.output}/simple_backup')
@ -174,7 +229,7 @@ class Backup:
err = stderr.read().decode('utf-8').strip().split('\n')[0]
if err != '':
logger.error(f'Error while removing backup {dirs[i]}.')
logger.error('Error while removing backup %s.', {dirs[i]})
logger.error(err)
else:
count += 1
@ -199,16 +254,18 @@ class Backup:
rmtree(f'{self.output}/simple_backup/{dirs[i]}')
count += 1
except FileNotFoundError:
logger.error(f'Error while removing backup {dirs[i]}. Directory not found')
logger.error('Error while removing backup %s. Directory not found', dirs[i])
except PermissionError:
logger.error(f'Error while removing backup {dirs[i]}. Permission denied')
logger.error('Error while removing backup %s. Permission denied', dirs[i])
if count == 1:
logger.info(f'Removed {count} backup')
logger.info('Removed %d backup', count)
elif count > 1:
logger.info(f'Removed {count} backups')
logger.info('Removed %d backups', count)
def find_last_backup(self):
"""Get path of last backup (from last_backup symlink) for rsync --link-dest"""
if self._remote:
if self._ssh is None:
logger.critical('SSH connection to server failed')
@ -310,10 +367,12 @@ class Backup:
# Function to read configuration file
@timing(logger)
def run(self):
"""Perform the backup"""
logger.info('Starting backup...')
try:
notify('Starting backup...')
_notify('Starting backup...')
except NameError:
pass
@ -323,15 +382,15 @@ class Backup:
_, self._inputs_path = mkstemp(prefix='tmp_inputs', text=True)
_, self._exclude_path = mkstemp(prefix='tmp_exclude', text=True)
with open(self._inputs_path, 'w') as fp:
with open(self._inputs_path, 'w', encoding='utf-8') as fp:
for i in self.inputs:
if not os.path.exists(i):
logger.warning(f'Input {i} not found. Skipping')
logger.warning('Input %s not found. Skipping', i)
else:
fp.write(i)
fp.write('\n')
with open(self._exclude_path, 'w') as fp:
with open(self._exclude_path, 'w', encoding='utf-8') as fp:
for e in self.exclude:
fp.write(e)
fp.write('\n')
@ -351,24 +410,25 @@ class Backup:
'--ignore-missing-args --mkpath --protect-args'
args = shlex.split(rsync)
p = Popen(args, stdin=PIPE, stdout=PIPE, stderr=STDOUT, shell=False)
output, _ = p.communicate()
with Popen(args, stdin=PIPE, stdout=PIPE, stderr=STDOUT, shell=False) as p:
output, _ = p.communicate()
if p.returncode != 0:
self._err_flag = True
if p.returncode != 0:
self._err_flag = True
output = output.decode("utf-8").split('\n')
if self._err_flag:
logger.error(f'rsync: {output[-3]}')
logger.error(f'rsync: {output[-2]}')
logger.error('rsync: %s', output[-3])
logger.error('rsync: %s', output[-2])
else:
logger.info(f'rsync: {output[-3]}')
logger.info(f'rsync: {output[-2]}')
logger.info('rsync: %s', output[-3])
logger.info('rsync: %s', output[-2])
if self._remote:
_, stdout, _ = self._ssh.exec_command(f'if [ -L "{self.output}/simple_backup/last_backup" ]; then echo "ok"; fi')
_, stdout, _ = \
self._ssh.exec_command(f'if [ -L "{self.output}/simple_backup/last_backup" ]; then echo "ok"; fi')
output = stdout.read().decode('utf-8').strip()
@ -392,7 +452,8 @@ class Backup:
self._err_flag = True
if self._remote and not self._err_flag:
_, _, stderr = self._ssh.exec_command(f'ln -s "{self._output_dir}" "{self.output}/simple_backup/last_backup"')
_, _, stderr =\
self._ssh.exec_command(f'ln -s "{self._output_dir}" "{self.output}/simple_backup/last_backup"')
err = stderr.read().decode('utf-8').strip()
@ -425,11 +486,11 @@ class Backup:
logger.warning('Some errors occurred')
try:
notify('Backup finished with errors (check log for details)')
_notify('Backup finished with errors (check log for details)')
except NameError:
pass
else:
notify('Backup finished')
_notify('Backup finished')
return 0
@ -462,9 +523,9 @@ def _parse_arguments():
def _read_config(config_file):
if not os.path.isfile(config_file):
logger.warning(f'Config file {config_file} does not exist')
logger.warning('Config file %s does not exist', config_file)
return None, None, None, None
return None, None, None, None, None, None, None
config = configparser.ConfigParser()
config.read(config_file)
@ -488,10 +549,10 @@ def _read_config(config_file):
return inputs, output, exclude, keep, host, username, ssh_keyfile
def notify(text):
euid = os.geteuid()
def _notify(text):
_euid = os.geteuid()
if euid == 0:
if _euid == 0:
uid = os.getenv('SUDO_UID')
else:
uid = os.geteuid()
@ -503,10 +564,12 @@ def notify(text):
obj = dbus.Interface(obj, 'org.freedesktop.Notifications')
obj.Notify('', 0, '', 'simple_backup', text, [], {'urgency': 1}, 10000)
os.seteuid(int(euid))
os.seteuid(int(_euid))
def simple_backup():
"""Main"""
args = _parse_arguments()
inputs, output, exclude, keep, username, host, ssh_keyfile = _read_config(args.config)