#!/usr/bin/python3
"""Stefan Huber's simplistic backup solution."""

import configparser
import datetime
import hashlib
import os
import random
import re
import shutil
import subprocess
import sys

# Valid backup modes. "full" archives everything; "diff" archives files newer
# than the latest full backup; "incr" archives files newer than the latest
# backup of any mode (see BackupManager.backup).
Mode = ["full", "incr", "diff"]

# Epochs that take part in scheduling and pruning, with their time span.
RealEpoch = {
    "hour": datetime.timedelta(0, 3600),
    "day": datetime.timedelta(1),
    "week": datetime.timedelta(7),
    "month": datetime.timedelta(30),
    "year": datetime.timedelta(365),
}

# All epochs accepted in directory names and on the command line. "sporadic"
# backups are only made on request and are never touched by prune().
Epoch = dict(RealEpoch, sporadic=datetime.timedelta(0, 0))


class Backup:
    """A single backup has a date, an epoch and a mode."""

    # Shape of a backup directory name: YYYYMMDD-HHMM-<epoch>-<mode>.
    _dirname_re = re.compile(r'^\d\d\d\d\d\d\d\d-\d\d\d\d-\w+-\w+$')

    def __init__(self, date, epoch, mode):
        self.date = date
        self.epoch = epoch
        self.mode = mode

    @staticmethod
    def fromDirName(dirname):
        """Parse a backup directory name into a Backup.

        Raises ValueError if the name does not consist of exactly four
        dash-separated parts, or if the epoch or mode is unknown.
        """
        strdate, strtime, epoch, mode = dirname.split("-")

        if epoch not in Epoch:
            raise ValueError("Invalid epoch: " + epoch)

        if mode not in Mode:
            raise ValueError("Invalid mode: " + mode)

        date = datetime.datetime(
            int(strdate[0:4]), int(strdate[4:6]), int(strdate[6:8]),
            int(strtime[0:2]), int(strtime[2:4]))

        return Backup(date, epoch, mode)

    def __str__(self):
        return "[date: " + self.date.ctime() + \
            ", epoch: " + self.epoch + \
            ", mode: " + self.mode + "]"

    def colAlignedString(self):
        """Return a fixed-width, single-line description for listings."""
        return "%16s %8s %4s" % (
            self.date.strftime("%Y-%m-%d %H:%M"), self.epoch, self.mode)

    @staticmethod
    def getDirName(date, epoch, mode):
        """Get directory name of backup by given properties."""
        return date.strftime("%Y%m%d-%H%M") + "-" + epoch + "-" + mode

    @staticmethod
    def isBackupDir(dirname):
        """Is directory a backup directory?

        Returns the regex match object (truthy) or None. Only the shape of
        the name is checked; epoch and mode are validated by fromDirName().
        """
        return Backup._dirname_re.match(dirname)


class Config:
    """Encapsules the configuration for the backup program."""

    class ReadError(RuntimeError):
        """An exception raised when reading configurations."""

        def __init__(self, value):
            # Initialise the base class so str(exc) carries the text too.
            super().__init__(value)
            self.value = value
            self.message = value

    class FileSet:
        """A fileset has a name and a list of directories."""

        def __init__(self, name, dirs):
            self.name = name
            self.dirs = dirs

        def __str__(self):
            return "[name: " + self.name + ", dirs: " + str(self.dirs) + "]"

    formats = ["tar", "tar.gz", "tar.bz2", "tar.xz"]

    # Filename where checksum of config is saved
    checksumfn = "checksum"

    def __init__(self):
        self.directory = "/media/backup"
        self.format = self.formats[0]
        self.epochkeeps = {k: 0 for k in RealEpoch}
        self.epochmodes = {k: "full" for k in RealEpoch}
        self.exclpatterns = []
        self.sets = []
        self.checksum = None
        self.lastchecksum = None

    def __str__(self):
        return "[directory: " + self.directory + \
            ", format: " + self.format + \
            ", keeps: " + str(self.epochkeeps) + \
            ", modes: " + str(self.epochmodes) + \
            ", exclpatterns: " + str(self.exclpatterns) + \
            ", sets: " + str([str(s) for s in self.sets]) + "]"

    def read(self, filename):
        """Read configuration from file.

        Fills in destination, history, exclude patterns and file sets, then
        records the SHA-1 of the config file and the checksum stored in the
        destination directory by the last full backup (None if unreadable).

        Raises Config.ReadError for missing files/sections and invalid
        values; configparser.Error subclasses may propagate for malformed
        files or missing options.
        """
        if not os.path.isfile(filename):
            raise Config.ReadError(
                "Cannot read config file '" + filename + "'.")

        config = configparser.RawConfigParser()
        config.read(filename)

        for reqsec in ["destination"]:
            if not config.has_section(reqsec):
                raise Config.ReadError(
                    "Section '" + reqsec + "' is missing.")

        self.directory = config.get("destination", "directory")
        if not os.path.isdir(self.directory):
            raise Config.ReadError(
                "Directory '{0}' does not exist.".format(self.directory))

        self.format = config.get("destination", "format")
        if self.format not in Config.formats:
            raise Config.ReadError("Invalid 'format' given.")

        if config.has_section("history"):
            for opt in config.options("history"):
                if opt.startswith("keep"):
                    epoch = opt[4:]
                    if epoch not in RealEpoch:
                        raise Config.ReadError(
                            "Invalid option 'keep" + epoch + "'.")
                    try:
                        self.epochkeeps[epoch] = \
                            config.getint("history", opt)
                    except ValueError as err:
                        raise Config.ReadError(
                            "Invalid integer given for '" + opt + "'."
                        ) from err
                elif opt.startswith("mode"):
                    epoch = opt[4:]
                    if epoch not in RealEpoch:
                        raise Config.ReadError(
                            "Invalid option 'mode" + epoch + "'.")
                    self.epochmodes[epoch] = config.get("history", opt)
                    if self.epochmodes[epoch] not in Mode:
                        raise Config.ReadError("Invalid mode given.")
                else:
                    raise Config.ReadError("Invalid option '" + opt + "'.")

        if config.has_section("input"):
            for opt in config.options("input"):
                if opt.startswith("exclude"):
                    self.exclpatterns.append(config.get("input", opt))
                else:
                    raise Config.ReadError("Invalid option '" + opt + "'.")

        for sec in config.sections():
            if sec in ["destination", "history", "input"]:
                continue
            elif sec.startswith("set "):
                name = sec[4:].strip()
                dirs = []
                for opt in config.options(sec):
                    if not opt.startswith("dir"):
                        raise Config.ReadError(
                            "Unknown option '" + opt + "'.")
                    dirs.append(config.get(sec, opt))
                self.sets.append(Config.FileSet(name, dirs))
            else:
                raise Config.ReadError("Unknown section '" + sec + "'.")

        # Compute checksum of config file
        with open(filename, 'rb') as f:
            self.checksum = hashlib.sha1(f.read()).hexdigest()

        # Checksum remembered by the last full backup, if any.
        try:
            with open(os.path.join(self.directory, self.checksumfn)) as f:
                self.lastchecksum = f.read().strip()
        except IOError:
            self.lastchecksum = None


class BackupManager:
    """List and create backups"""

    def __init__(self, conffn, alwaysyes):
        self.conf = Config()
        self.alwaysyes = alwaysyes
        self.conf.read(conffn)

    def listAllDirs(self):
        """List all dirs in destination directory"""
        basedir = self.conf.directory
        return [d for d in os.listdir(basedir)
                if os.path.isdir(os.path.join(basedir, d))]

    def listOldBackups(self):
        """Returns a list of old backups."""
        return [Backup.fromDirName(d) for d in self.listAllDirs()
                if Backup.isBackupDir(d)]

    def getDesiredEpoch(self, backups, now):
        """Get desired epoch based on configuration and list of old backups.

        Returns the longest configured epoch whose time span has elapsed
        since the relevant latest backup, or None if no backup is due.
        """
        # 'latest' deliberately carries over from longer to shorter epochs:
        # a recent backup of a longer epoch also counts for the shorter ones.
        latest = datetime.datetime(1900, 1, 1)
        longestFirst = sorted(
            ((Epoch[e], e) for e in RealEpoch), reverse=True)

        for timespan, e in longestFirst:
            # Epochs with nothing to keep are not scheduled at all.
            if self.conf.epochkeeps[e] == 0:
                continue

            dates = [b.date for b in backups if b.epoch == e]
            if dates:
                latest = max(latest, max(dates))

            # the latest backup is too old
            if now - latest > timespan:
                return e

        # No backup is to be made
        return None

    def backupFileSet(self, fileset, targetdir, since=None):
        """Create an archive for given fileset at given target directory.

        If 'since' is given, it is passed to tar's -N option so only newer
        files are archived. A non-zero tar exit status is reported on stdout
        but does not abort the backup.
        """
        print("Running file set: " + fileset.name)
        tarpath = "/bin/tar"
        fsfn = os.path.join(targetdir, fileset.name) + "." + self.conf.format

        # -a lets tar pick the compression from the archive suffix.
        taropts = ["-cpva"]

        if since is not None:
            taropts += ["-N", since.strftime("%Y-%m-%d %H:%M:%S")]

        for pat in self.conf.exclpatterns:
            taropts += ["--exclude", pat]

        tarargs = [tarpath] + taropts + ["-f", fsfn] + fileset.dirs
        rett = subprocess.call(tarargs)
        if rett != 0:
            print(tarpath + " returned with exit status " + str(rett) + ":")

    def backup(self, epoch=None, mode=None):
        """Make a new backup, if necessary. If epoch is None then determine
        desired epoch automatically. Use given epoch otherwise. If mode is None
        then use mode for given epoch. Use given mode otherwise."""

        now = datetime.datetime.now()
        oldbackups = self.listOldBackups()

        # Get epoch of backup
        if epoch is None:
            epoch = self.getDesiredEpoch(oldbackups, now)
        if epoch is None:
            print("No backup planned.")
            return

        # Get mode of backup
        if mode is None:
            mode = self.conf.epochmodes[epoch]
        print("Making a backup. Epoch: " + epoch + ", mode: " + mode)

        oldfullbackups = [b for b in oldbackups if b.mode == "full"]

        # No old full backups existing: a diff/incr backup has nothing to be
        # relative to, so really switch to a full backup.
        if mode != "full" and not oldfullbackups:
            print("No full backups existing. Making a full backup.")
            mode = "full"

        # Checksum changed -> config file changed
        if self.conf.checksum != self.conf.lastchecksum:
            print("Config file changed since last time.")
            if mode != "full":
                print("** Warning: full backup recommended!")

        # If we have a full backup, we backup everything
        since = None
        if mode == "diff":
            since = max(b.date for b in oldfullbackups)
        elif mode == "incr":
            since = max(b.date for b in oldbackups)

        if since is not None:
            print("Making backup relative to ", since.ctime())

        yesno = self.ask_user_yesno("Proceed? [Y, n] ")
        if yesno == "n":
            return

        # Create new target directory. The random suffix adds a fifth
        # dash-separated part, so an unfinished backup never looks like a
        # valid backup directory.
        basedir = self.conf.directory
        dirname = Backup.getDirName(now, epoch, mode)
        tmpdirname = dirname + ("-%x" % random.getrandbits(64))
        targetdir = os.path.join(basedir, tmpdirname)
        os.mkdir(targetdir)

        # Backup all file sets
        for s in self.conf.sets:
            self.backupFileSet(s, targetdir, since)

        # Rename backup directory to final name
        os.rename(targetdir, os.path.join(basedir, dirname))

        # We made a full backup -- recall checksum of config
        if mode == "full":
            with open(os.path.join(basedir, self.conf.checksumfn), "w") as f:
                f.write(self.conf.checksum)

    def prune(self):
        """Prune old backup files.

        Marks every directory that is not named like a backup, plus all but
        the newest 'keep<epoch>' backups of each real epoch, lists them and
        removes them after confirmation.
        """
        allDirs = self.listAllDirs()
        # Collect all directories not matching backup name
        removeDirs = [d for d in allDirs if not Backup.isBackupDir(d)]

        # Add backups beyond the configured number to keep, per epoch
        backups = self.listOldBackups()
        for e in RealEpoch:
            newestFirst = sorted(
                (b for b in backups if b.epoch == e),
                key=lambda b: b.date, reverse=True)
            keep = self.conf.epochkeeps[e]
            removeDirs += [Backup.getDirName(b.date, b.epoch, b.mode)
                           for b in newestFirst[keep:]]

        marked = set(removeDirs)
        print("List of stale/outdated entries:")
        for d in allDirs:
            if d in marked:
                print("[*] ", end="")
            else:
                print("[ ] ", end="")

            if Backup.isBackupDir(d):
                print(Backup.fromDirName(d).colAlignedString())
            else:
                print(d)

        # Check that dirs to be removed is in list of all dirs. This guards
        # rmtree, so it must not disappear under 'python -O' like an assert.
        known = set(allDirs)
        unknown = [d for d in removeDirs if d not in known]
        if unknown:
            raise RuntimeError(
                "Refusing to prune unknown entries: " + ", ".join(unknown))

        if not removeDirs:
            print("No stale/outdated entries to remove.")
            return

        basedir = self.conf.directory
        yesno = self.ask_user_yesno("Remove entries marked by '*'? [y, N] ")
        if yesno == "y":
            for d in removeDirs:
                shutil.rmtree(os.path.join(basedir, d))

    def ask_user_yesno(self, question):
        """Ask a yes/no question and return the normalized answer.

        Returns "y" without prompting when 'alwaysyes' is set; otherwise the
        user's input, stripped and lower-cased (may be empty).
        """
        if self.alwaysyes:
            print(question + " y")
            return "y"
        return input(question).strip().lower()


def printUsage():
    """Print --help text"""

    print("shbackup - a simple backup solution.")
    print("")
    print("Usage:")
    print(" " + sys.argv[0] + " {options} [cmd]")
    print(" " + sys.argv[0] + " --help")
    print("")
    print("Commands:")
    print(" backup make a new backup, if necessary")
    print(" list list all backups (default)")
    print(" prune prune outdated/old backups")
    print("")
    print("Options:")
    print(" -h, --help print this usage text")
    print(" -c, --conf use given configuration file")
    print(" default: /etc/shbackup.conf")
    print(" -e, --epoch force to create backup for given epoch:")
    print(" year, month, week, day, hour, sporadic")
    print(" -m, --mode override mode: full, diff, or incr")
    print(" -y, --yes always assume 'yes' when user is asked")


def main(argv=None):
    """Parse the command line, run the requested command.

    'argv' defaults to sys.argv. Returns the process exit status: 0 on
    success, 1 on usage or configuration errors.
    """
    if argv is None:
        argv = sys.argv

    conffn = "/etc/shbackup.conf"
    cmd = "list"
    mode = None
    epoch = None
    yes = False

    args = iter(argv[1:])
    for opt in args:
        if opt in ("-h", "--help"):
            printUsage()
            return 0

        elif opt in ("-y", "--yes"):
            yes = True

        elif opt in ("-c", "--conf", "-m", "--mode", "-e", "--epoch"):
            # These options consume the following argument.
            value = next(args, None)
            if value is None:
                print("Option '" + opt + "' requires an argument.")
                return 1

            if opt in ("-c", "--conf"):
                conffn = value
            elif opt in ("-m", "--mode"):
                mode = value
                if mode not in Mode:
                    print("Unknown mode '" + mode + "'.")
                    return 1
            else:
                epoch = value
                if epoch not in Epoch:
                    print("Unknown epoch '" + epoch + "'.")
                    return 1

        elif opt in ("backup", "list", "prune"):
            cmd = opt

        else:
            print("Unknown option: " + opt)
            return 1

    try:
        man = BackupManager(conffn, yes)
    except (Config.ReadError, configparser.Error) as e:
        # configparser.Error covers duplicate options, missing options and
        # parse errors; all of them carry a 'message' attribute.
        print("Error reading config file: " + e.message)
        return 1

    if cmd == "backup":
        man.backup(epoch, mode)
    elif cmd == "list":
        for b in sorted(man.listOldBackups(), key=lambda b: b.date):
            print(b.colAlignedString())
    elif cmd == "prune":
        man.prune()

    return 0


if __name__ == "__main__":
    sys.exit(main())