# Source code for AutoArchive._services.archiver._external_tar_archiver_provider._external_tar_incremental_utility
# _external_tar_incremental_utility.py
#
# Project: AutoArchive
# License: GNU GPLv3
#
# Copyright (C) 2003 - 2026 Róbert Čerňanský
import errno
import glob
import itertools
import os
import shutil
import tempfile
from AutoArchive._infrastructure.utils import Utils, Constants
# [docs]  (Sphinx viewcode link residue)
class _ExternalTarIncrementalUtility:
    """Utility class for GNU tar incremental backup operations.

    Snapshot files are stored in the ``snapshots`` subdirectory of the working directory and they are named
    ``<backupId>[.<level>][.<keepingId>].snar``; the level token is omitted for the level 0.

    .. note:: The backup level is parsed from the file name only.  An archive name that itself ends with a numeric
       extension (e.g. ``photos.2024``) is therefore indistinguishable from a name followed by a level token.

    :param backupId: Name of the backup (archive) which this instance operates on.
    :type backupId: ``str``
    :param workDir: Working directory; the snapshots directory is its subdirectory.
    :type workDir: ``str``"""

    # subdirectory for snapshots
    __SNAPSHOTS_SUBDIR = "snapshots"

    # suffix for snapshot files used in incremental backups
    __SNAPSHOT_SUFFIX = ".snar"

    def __init__(self, backupId, workDir):
        self.__backupId = backupId
        self.__workDir = workDir
        self.__snapshotsDir = self.getSnapshotsDir(self.__workDir)

    def getMaxBackupLevel(self):
        """Returns maximal backup level that is possible to create.

        It is the highest level for which a snapshot file exists plus one, or 0 if there is no snapshot file.

        :return: Maximal backup level that can be created.
        :rtype: ``int``
        :raise OSError: If a system error occurred."""

        currentBackupLevel = self.__getBackupLevel()
        return currentBackupLevel + 1 if currentBackupLevel is not None else 0

    def getSnapshotFileName(self, level, keepingId = None):
        """Returns full path to snapshot file for a certain backup level.

        :param level: Backup level of the snapshot file.
        :type level: ``int``
        :param keepingId: If not empty it is inserted to the file name between the level token and the suffix.
        :type keepingId: ``str``
        :return: Path to the snapshot file in the snapshots directory; the file does not have to exist.
        :rtype: ``str``"""

        keepingToken = "." + keepingId if keepingId else ""
        return os.path.join(self.__snapshotsDir,
                            self.__backupId + self.__getLevelSuffix(level) + keepingToken + self.__SNAPSHOT_SUFFIX)

    def getSnapshots(self):
        """Returns sequence of snapshot file names for current backup.

        :return: Sequence of snapshot file names (without the directory part).
        :rtype: ``Sequence<str>``
        :raise OSError: If the snapshots directory does not exist or is not accessible."""

        return self.getSnapshotsForBackup(self.__snapshotsDir, self.__backupId)

    @classmethod
    def getSnapshotsForBackup(cls, snapshotsDir, backupId = ""):
        """Returns sequence of snapshot file names for the archive named ``backupId`` or all of them.

        :param snapshotsDir: Directory where the snapshot files are stored.  Can be obtained with
           :meth:`getSnapshotsDir` method.
        :type snapshotsDir: ``str``
        :param backupId: Name of the archive for which the snapshot file names shall be returned.  If not specified
           all snapshot files will be returned.  The name is taken literally; wildcard characters in it are not
           interpreted.
        :type backupId: ``str``
        :return: Sequence of snapshot file names; each name is contained only once.
        :rtype: ``Sequence<str>``
        :raise OSError: If ``snapshotsDir`` does not exist or is not accessible.  The exception contains two
           parameters: the error message and the name of the directory."""

        if not os.path.isdir(snapshotsDir):
            raise OSError("Snapshots directory does not exists.", snapshotsDir)
        if not Utils.effectiveAccess(snapshotsDir, os.R_OK | os.X_OK):
            raise OSError("Snapshots directory is not accessible for reading or listing", snapshotsDir)

        return cls.__findSnapshotNames(snapshotsDir, backupId)

    @classmethod
    def getSnapshotsDir(cls, workDir):
        """Returns path to the snapshots directory for the passed working directory.

        :param workDir: Working directory.
        :type workDir: ``str``
        :return: ``workDir`` joined with the name of the snapshots subdirectory.
        :rtype: ``str``"""

        return os.path.join(workDir, cls.__SNAPSHOTS_SUBDIR)

    def manageSnapshotFiles(self, level, latestLevelSnapshotFilePath):
        """Moves snapshot file to its proper location and name in order to preserve it and removes redundant ones.

        :param level: Backup level that the moved snapshot file belongs to.
        :type level: ``int``
        :param latestLevelSnapshotFilePath: Path to the snapshot file that shall be moved.
        :type latestLevelSnapshotFilePath: ``str``
        :raise OSError: If a system error occurred."""

        # snapshots for higher levels were based on the previous content of this level so they are obsolete now
        self.removeSnapshotFiles(level + 1)

        snapshotFilePath = self.getSnapshotFileName(level)
        shutil.move(latestLevelSnapshotFilePath, snapshotFilePath)

        # change the file permissions according to umask; the umask can be read only by setting it so a dummy
        # value is set and the original one is restored right after
        umask = os.umask(0)
        os.umask(umask)
        os.chmod(snapshotFilePath, 0o666 & ~umask)

    def removeSnapshotFiles(self, level):
        """Remove snapshot files for levels higher or equal to ``level``.

        :param level: The lowest backup level for which the snapshot files shall be removed.
        :type level: ``int``
        :raise OSError: If a system error occurred."""

        for snapshot in self.getSnapshots():
            if self.getLevelFromFileName(snapshot) >= level:
                os.remove(os.path.join(self.__snapshotsDir, snapshot))

    # SMELL: Currently a snapshot file with a keeping ID other than None will never exist.  Snapshots are not kept.
    def tryRemoveSnapshotFile(self, level, keepingId = None):
        """Removes snapshot file for given backup level if it exists.

        :param level: Backup level for which the snapshot file shall be removed.
        :type level: ``int``
        :param keepingId: If not ``None`` a kept snapshot with this ID will be removed.
        :type keepingId: ``str``
        :return: ``True`` if the snapshot file was removed; ``False`` if the file does not exist.
        :rtype: ``bool``
        :raise OSError: If a system error occurred."""

        # just try to remove it; a preceding existence test would be racy and it would also hide real errors
        # (e.g. insufficient permissions) behind the "does not exist" result
        try:
            os.remove(self.getSnapshotFileName(level, keepingId))
        except (FileNotFoundError, NotADirectoryError):
            return False
        return True

    def createWorkingSnapshotFile(self, level):
        """Copies snapshot file for the level preceding ``level`` to a temporary file.

        For the level 0 there is no preceding snapshot; only a unique path is returned and no file exists there.

        :param level: Backup level that is going to be created.
        :type level: ``int``
        :return: Path to the temporary (working) snapshot file.
        :rtype: ``str``
        :raise OSError: If the preceding snapshot file can not be read or the temporary file can not be written."""

        tempFileDescriptor, tempFilePath = tempfile.mkstemp(self.__SNAPSHOT_SUFFIX, Constants.TEMP_FILE_PREFIX)

        if level <= 0:
            # we do not need a previous snapshot file if the backup level is 0
            os.close(tempFileDescriptor)
            os.remove(tempFilePath)
            return tempFilePath

        # copy the snapshot file for previous level to a temporary one which will be used to create the new
        # increment; after that it will be moved to the proper location and name according to processed archive and
        # level; this way backup files for each created level will be preserved and thus it will be possible to
        # create also lower level backups (otherwise it would be only possible to create level N+1 backup (where N
        # is the latest/current level) or level 0 (go from the beginning))
        try:
            # the temporary file object is created first so that it owns (and closes) the descriptor even if the
            # source snapshot can not be opened
            with open(tempFileDescriptor, "wb") as tempFile, \
                 open(self.getSnapshotFileName(level - 1), "rb") as srcSnapshotFile:
                shutil.copyfileobj(srcSnapshotFile, tempFile)
        except BaseException:
            # do not leave an orphaned temporary file behind; the original error is the one to report
            try:
                os.remove(tempFilePath)
            except OSError:
                pass
            raise

        return tempFilePath

    @classmethod
    def makeSnapshotsDir(cls, workDir):
        """Creates the snapshots directory.

        Nothing is done if the path already exists.

        :param workDir: Working directory in which the snapshots directory shall be created.
        :type workDir: ``str``
        :raise OSError: If creation of the directory was not successful."""

        try:
            os.mkdir(cls.getSnapshotsDir(workDir))
        except FileExistsError:
            pass

    @staticmethod
    def getLevelFromFileName(fileName, keptBackup = False):
        """Extracts backup level number from the file name.

        The last extension (the suffix) is stripped and the extension that precedes it is parsed as the level.  If
        it is not a number, one more extension to the left is tried.  If that fails too the level is 0.

        :param fileName: Name of the file used to get the backup level from.  It should be in the form:
           '<archive_name>[.<level>].<suffix>'.
        :type fileName: ``str``
        :param keptBackup: If ``True`` one additional extension that precedes the suffix is stripped before the
           level is parsed.
        :type keptBackup: ``bool``
        :return: The :term:`backup level` retrieved from the file name.
        :rtype: ``int``"""

        root, levelToken = os.path.splitext(os.path.splitext(fileName)[0])
        if keptBackup:
            root, levelToken = os.path.splitext(root)

        # each token has the form ".<text>" or it is empty; the leading dot is skipped before the conversion
        for token in (levelToken, os.path.splitext(root)[1]):
            try:
                return int(token[1:])
            except ValueError:
                continue
        return 0

    @classmethod
    def __findSnapshotNames(cls, snapshotsDir, backupId):
        """Returns names of the snapshot files in ``snapshotsDir`` without checking the directory accessibility.

        :param snapshotsDir: Directory which shall be searched.
        :type snapshotsDir: ``str``
        :param backupId: Name of the archive or an empty string for all archives.
        :type backupId: ``str``
        :return: Sequence of snapshot file names (without the directory part).
        :rtype: ``Sequence<str>``"""

        if backupId == "":
            # a single pattern is enough; it covers also the names with a level or keeping token and using both
            # patterns here would return such names twice
            namePatterns = ("*" + cls.__SNAPSHOT_SUFFIX,)
        else:
            # the archive name has to be matched literally even if it contains wildcard characters
            escapedBackupId = glob.escape(backupId)
            namePatterns = (escapedBackupId + cls.__SNAPSHOT_SUFFIX,
                            escapedBackupId + ".*" + cls.__SNAPSHOT_SUFFIX)

        escapedSnapshotsDir = glob.escape(snapshotsDir)
        snapshots = itertools.chain.from_iterable(
            glob.iglob(os.path.join(escapedSnapshotsDir, namePattern)) for namePattern in namePatterns)
        return tuple(os.path.basename(snapshot) for snapshot in snapshots)

    def __getBackupLevel(self):
        """Returns the highest backup level for which a snapshot file exists or ``None`` if there is none."""

        return max((self.getLevelFromFileName(snapshotFile) for snapshotFile in self.getSnapshots()),
                   default = None)

    @staticmethod
    def __getLevelSuffix(level):
        """Returns file name suffix according to the backup level.

        :param level: Backup level for which the suffix shall be returned.
        :type level: ``int``
        :return: File name suffix for backup level in the form '.<level>'; an empty string for the level 0.
        :rtype: ``str``"""

        return "." + str(level) if level > 0 else ""