Source code for AutoArchive._services.archiver._external_tar_archiver_provider._external_tar_incremental_utility

# _external_tar_incremental_utility.py
#
# Project: AutoArchive
# License: GNU GPLv3
#
# Copyright (C) 2003 - 2026 Róbert Čerňanský



import errno
import glob
import itertools
import os
import shutil
import tempfile

from AutoArchive._infrastructure.utils import Utils, Constants



[docs] class _ExternalTarIncrementalUtility: """Utility class for GNU tar incremental backup operations.""" # subdirectory for snapshots __SNAPSHOTS_SUBDIR = "snapshots" # suffix for snapshot files used in incremental backups __SNAPSHOT_SUFFIX = ".snar" def __init__(self, backupId, workDir): self.__backupId = backupId self.__workDir = workDir self.__snapshotsDir = self.getSnapshotsDir(self.__workDir)
[docs] def getMaxBackupLevel(self): """Returns maximal backup level that is possible to create. :raise OSError: If a system error occurred.""" currentBackupLevel = self.__getBackupLevel() return currentBackupLevel + 1 if currentBackupLevel is not None else 0
[docs] def getSnapshotFileName(self, level, keepingId = None): "Returns full path to snapshot file for a certain backup level." keepingToken = "." + keepingId if keepingId else "" return os.path.join(self.__snapshotsDir, self.__backupId + self.__getLevelSuffix(level) + keepingToken + self.__SNAPSHOT_SUFFIX)
[docs] def getSnapshots(self): "Returns sequence of snapshot file names for current backup." return self.getSnapshotsForBackup(self.__snapshotsDir, self.__backupId)
[docs] @classmethod def getSnapshotsForBackup(cls, snapshotsDir, backupId = ""): """Returns sequence of snapshot file names for the archive named ``backupId`` or all of them. :param snapshotsDir: Directory where the snapshot files are stored. Can be obtained with :meth:`getSnapshotsDir` method. :type snapshotsDir: ``str`` :param backupId: Name of the archive for which the snapshot file names shall be returned. If not specified all snapshot files will be returned. :type backupId: ``str`` :return: Sequence of snapshot file names. :rtype: ``Sequence<str>`` :raise OSError: If ``snapshotsDir`` does not exist or is not accessible. The exception contains two parameters: the error message and the name of the directory.""" if not os.path.isdir(snapshotsDir): raise OSError("Snapshots directory does not exists.", snapshotsDir) if not Utils.effectiveAccess(snapshotsDir, os.R_OK | os.X_OK): raise OSError("Snapshots directory is not accessible for reading or listing", snapshotsDir) if backupId == "": backupId = "*" snapshots = itertools.chain( glob.iglob(os.path.join(snapshotsDir, backupId + cls.__SNAPSHOT_SUFFIX)), glob.iglob(os.path.join(snapshotsDir, backupId + ".*" + cls.__SNAPSHOT_SUFFIX))) return tuple(os.path.basename(snapshot) for snapshot in snapshots)
[docs] @classmethod def getSnapshotsDir(cls, workDir): return os.path.join(workDir, cls.__SNAPSHOTS_SUBDIR)
[docs] def manageSnapshotFiles(self, level, latestLevelSnapshotFilePath): """Moves snapshot file to its proper location and name in order to preserve it and removes redundant ones. :raise OSError: If a system error occurred.""" self.removeSnapshotFiles(level + 1) shutil.move(latestLevelSnapshotFilePath, self.getSnapshotFileName(level)) # change the file permissions according to umask umask = os.umask(0) os.umask(umask) os.chmod(self.getSnapshotFileName(level), 0o666 & ~umask)
[docs] def removeSnapshotFiles(self, level): """Remove snapshot files for levels higher or equal to ``level``.""" for snapshot in self.getSnapshots(): if self.getLevelFromFileName(snapshot) >= level: os.remove(os.path.join(self.__snapshotsDir, snapshot))
# SMELL: Currently a snapshot file with a keeping ID other than None will never exists. Snapshots are not kept.
[docs] def tryRemoveSnapshotFile(self, level, keepingId = None): """Removes snapshot file for given backup level if it exists. :param level: Backup level for which the snapshot file shall be removed. :type level: ``int`` :param keepingId: If not ``None`` a kept snapshot with this ID will be removed. :type keepingId: ``str`` :return: ``True`` if the snapshot file was removed; ``False`` if the file does not exists. :raise OSError: If a system error occurred.""" snapshotFileName = self.getSnapshotFileName(level, keepingId) if os.path.exists(snapshotFileName): try: os.remove(snapshotFileName) except OSError as ex: if ex.errno != errno.ENOENT: raise return False return True return False
[docs] def createWorkingSnapshotFile(self, level): """Copies snapshot file for ``level`` to a temporary file.""" tempFileDescriptor, tempFilePath = tempfile.mkstemp(self.__SNAPSHOT_SUFFIX, Constants.TEMP_FILE_PREFIX) # copy the snapshot file for previous level to a temporary one which will be used to create the new # increment; after that it will be moved to the proper location and name according to processed archive and # level; this way backup files for each created level will be preserved and thus it will be possible to # create also lower level backups (otherwise it would be only possible to create level N+1 backup (where N # is the latest/current level) or level 0 (go from the beginning)) if level > 0: with open(self.getSnapshotFileName(level - 1), "rb") as srcSnapshotFile: with open(tempFileDescriptor, "wb") as tempFile: shutil.copyfileobj(srcSnapshotFile, tempFile) else: # we do not need a previous snapshot file if the backup level is 0 os.close(tempFileDescriptor) os.remove(tempFilePath) return tempFilePath
[docs] @classmethod def makeSnapshotsDir(cls, workDir): """Creates the snapshots directory. :raise OSError: If creation of the directory was not successful.""" snapshotsDir = cls.getSnapshotsDir(workDir) if not os.path.exists(snapshotsDir): try: os.mkdir(snapshotsDir) except OSError as ex: if ex.errno != errno.EEXIST: raise
[docs] @staticmethod def getLevelFromFileName(fileName, keptBackup = False): """Extracts backup level number from the file name. :param fileName: Name of the file used to get the backup level from. It should be in the form: '<archive_name>[.<level>].<suffix>'. :type fileName: ``str`` :return: The :term:`backup level` retrieved from the file name. :rtype: ``int``""" root, levelToken = os.path.splitext(os.path.splitext(fileName)[0]) if keptBackup: root, levelToken = os.path.splitext(root) try: level = int(levelToken[1:]) except ValueError: levelToken = os.path.splitext(root)[1] try: level = int(levelToken[1:]) except ValueError: level = 0 return level
def __getBackupLevel(self): level = None snapshotFiles = self.getSnapshots() if len(snapshotFiles) > 0: level = max((self.getLevelFromFileName(snapshotFile) for snapshotFile in snapshotFiles)) return level @staticmethod def __getLevelSuffix(level): """Returns file name suffix according to the backup level. :param level: Backup level for which the suffix shall be returned. :type level: ``int`` :return: File name suffix for backup level in the form '.<level>'. :rtype: ``str``""" return "." + str(level) if level > 0 else ""