# external_tar_archiver_provider.py
#
# Project: AutoArchive
# License: GNU GPLv3
#
# Copyright (C) 2003 - 2026 Róbert Čerňanský
__all__ = ["ExternalTarArchiverProvider"]
import errno
import glob
import itertools
import os
import re
import shutil
import tempfile
from collections.abc import Iterable
from AutoArchive._infrastructure.py_additions import staticproperty
from AutoArchive._infrastructure.utils import Utils, Constants
from AutoArchive._infrastructure.utils.interval import IntervalElement, Interval
from AutoArchive._services.archiver import (
BackupTypes, BackupSubOperations, ArchiverFeatures, BackupOperationErrors, MIN_COMPRESSION_STRENGTH,
MAX_COMPRESSION_STRENGTH)
from AutoArchive._services.archiver._external_tar_archiver_provider._external_tar_incremental_utility import \
_ExternalTarIncrementalUtility
from AutoArchive._services.archiver._external_tar_archiver_provider._tar_executable import _TarExecutable, \
_TarOutputChannel
from AutoArchive._services.archiver.tar_archiver_provider_base import TarArchiverProviderBase, \
_BACKUP_TYPES_TO_EXTENSIONS
[docs]
class ExternalTarArchiverProvider(TarArchiverProviderBase):
    """External archiver service provider.

    Creates the backups by running an external tar binary through :class:`._TarExecutable` and turns the lines the
    binary prints into :meth:`.TarArchiverProviderBase.fileAdd` and
    :meth:`.TarArchiverProviderBase.backupOperationError` events.

    See also: :class:`.TarArchiverProviderBase`.

    :param workDir: Working directory; it is passed to the base class and the snapshots directory is made for it.
    :raise OSError: If creation of the snapshot directory failed."""

    # backup type to GNU tar compress option map
    __BACKUP_TYPE_TO_COMPRESS_OPTION = {BackupTypes.Tar: "",
                                        BackupTypes.TarGz: "--gzip",
                                        BackupTypes.TarBz2: "--bzip2",
                                        BackupTypes.TarXz: "--xz",
                                        BackupTypes.TarZst: "--zstd"}

    # stderr messages that are dropped without raising any event (matched case-sensitively)
    __IGNORED_MESSAGES_PATTERN = ("(: Exiting with failure status due to previous errors)|"
                                  "(: (.*): Directory is new)")

    # per-file failures that come with an OS error text: (pattern, sub-operation)
    __OS_ERROR_PATTERNS = ((": (.*): cannot stat: (.*)", BackupSubOperations.Stat),
                           (": (.*): cannot open: (.*)", BackupSubOperations.Open))

    # per-file messages that map to one fixed error: (pattern, sub-operation, error)
    __FILE_MESSAGE_PATTERNS = (
        (": (.*): socket ignored", BackupSubOperations.Open, BackupOperationErrors.SocketIgnored),
        (": (.*): file changed as we read it", BackupSubOperations.Read, BackupOperationErrors.FileChanged),
        (": (.*): directory has been renamed", BackupSubOperations.Read, BackupOperationErrors.DirectoryRenamed))

    # {{{ TarArchiverProviderBase overrides

    def __init__(self, workDir):
        super().__init__(workDir)

        # stores the error state during the backup operation
        self.__errorOccurred = None

        self.__tarExecutable = _TarExecutable()
        _ExternalTarIncrementalUtility.makeSnapshotsDir(workDir)

    @staticproperty
    def supportedBackupTypes():
        """See :attr:`.TarArchiverProviderBase.supportedBackupTypes`.

        :return: Frozen set with the plain, gzip, bzip2, xz and zstd tar backup types."""
        return frozenset({BackupTypes.Tar,
                          BackupTypes.TarGz,
                          BackupTypes.TarBz2,
                          BackupTypes.TarXz,
                          BackupTypes.TarZst})

    def backupFiles(self, backupDefinition, compressionStrength = None, overwriteAtStart = False):
        """See: :meth:`.TarArchiverProviderBase.backupFiles()`.

        Unless ``overwriteAtStart`` is set, the archive is written to a working path and moved to the final path
        only after the output of the archiver was processed.

        :return: Path to the backup file.
        :raise ValueError: If ``compressionStrength`` is out of the defined interval."""
        super().backupFiles(backupDefinition, compressionStrength, overwriteAtStart)
        self.__raiseIfBadCompressionStrength(compressionStrength)

        backupFilePath = self.getBackupFilePath_(backupDefinition.backupId, backupDefinition.backupType,
                                                 backupDefinition.destination)
        workingBackupFilePath = backupFilePath if overwriteAtStart else self.getWorkingPath_(backupFilePath)

        sysEnvironment = os.environ.copy()
        archiverOptions, environment = self.__arguments(
            backupDefinition.backupType, workingBackupFilePath, backupDefinition.root,
            backupDefinition.includeFiles, backupDefinition.excludeFiles, compressionStrength, sysEnvironment)
        sysEnvironment.update(environment)

        self.__tarExecutable.run(archiverOptions, sysEnvironment)
        self.__processTarOutput()

        if not overwriteAtStart:
            shutil.move(workingBackupFilePath, backupFilePath)
        return backupFilePath

    def backupFilesIncrementally(self, backupDefinition, compressionStrength = None, level = None,
                                 overwriteAtStart = False):
        """See: :meth:`.TarArchiverProviderBase.backupFilesIncrementally()`.

        When ``level`` is ``None`` the maximal backup level reported by the incremental utility is used.  The
        working snapshot file is removed at the end regardless of the result.

        :return: Path to the backup file.
        :raise ValueError: If ``compressionStrength`` is out of the defined interval or ``level`` is not within
           0 and the maximal backup level."""
        super().backupFilesIncrementally(backupDefinition, compressionStrength, level, overwriteAtStart)
        self.__raiseIfBadCompressionStrength(compressionStrength)

        externalTarIncrementalUtility = _ExternalTarIncrementalUtility(backupDefinition.backupId, self.workDir_)
        maxBackupLevel = externalTarIncrementalUtility.getMaxBackupLevel()
        if level is None:
            level = maxBackupLevel
        if level < 0 or level > maxBackupLevel:
            raise ValueError(str.format(
                "'level' must be from interval 0 <= level <= maxBackupLevel ({}). The passed value was {}.",
                maxBackupLevel, level))

        workingSnapshotFilePath = externalTarIncrementalUtility.createWorkingSnapshotFile(level)
        backupFilePath = self.getBackupFilePath_(backupDefinition.backupId, backupDefinition.backupType,
                                                 backupDefinition.destination, level)
        workingBackupFilePath = backupFilePath if overwriteAtStart else self.getWorkingPath_(backupFilePath)

        sysEnvironment = os.environ.copy()
        archiverOptions, environment = self.__arguments(
            backupDefinition.backupType, workingBackupFilePath, backupDefinition.root,
            backupDefinition.includeFiles, backupDefinition.excludeFiles, compressionStrength, sysEnvironment,
            workingSnapshotFilePath)
        sysEnvironment.update(environment)

        try:
            self.__tarExecutable.run(archiverOptions, sysEnvironment)
            self.__processTarOutput()

            if not overwriteAtStart:
                shutil.move(workingBackupFilePath, backupFilePath)
            externalTarIncrementalUtility.manageSnapshotFiles(level, workingSnapshotFilePath)
        finally:
            # the working snapshot must not survive, whether the backup succeeded or not
            if os.path.exists(workingSnapshotFilePath):
                os.remove(workingSnapshotFilePath)

        return backupFilePath

    def removeBackupIncrements(self, backupDefinition, level = None, keepingId = None):
        """See: :meth:`.TarArchiverProviderBase.removeBackupIncrements()`.

        Starting at ``level`` (the maximal backup level if ``None``), removes backup files of consecutive levels
        until a level without a backup file is reached; the snapshot file removal is attempted for each visited
        level.

        :raise ValueError: If ``level`` is negative."""
        externalTarIncrementalUtility = _ExternalTarIncrementalUtility(backupDefinition.backupId, self.workDir_)
        self.raiseIfUnsupportedBackupType_(backupDefinition.backupType)

        if level is None:
            level = self.getMaxBackupLevel(backupDefinition.backupId)
        elif level < 0:
            # level 0 is valid; only negative values are refused
            raise ValueError(str.format("'level' must be >= 0. The passed value was {}.", level))

        removeLevel = level
        while True:
            backupFilePath = self.getBackupFilePath_(
                backupDefinition.backupId, backupDefinition.backupType, backupDefinition.destination, removeLevel,
                keepingId)
            backupExists = os.path.exists(backupFilePath)
            if backupExists:
                os.remove(backupFilePath)

            # attempted also for the first level that has no backup file
            externalTarIncrementalUtility.tryRemoveSnapshotFile(removeLevel, keepingId)
            removeLevel += 1
            if not backupExists:
                break

        if self.getMaxBackupLevel(backupDefinition.backupId) > level and keepingId is None:
            # ouch! some rogue snapshots still exists; deal with them slowly and painfully
            externalTarIncrementalUtility.removeSnapshotFiles(level)

    @classmethod
    def getSupportedFeatures(cls, backupType = None):
        """See: :meth:`.TarArchiverProviderBase.getSupportedFeatures()`.

        :return: Frozen set with the incremental feature for the plain tar type; for any other supported type, and
           for ``None``, the compression strength feature is included as well."""
        if backupType is not None:
            cls.raiseIfUnsupportedBackupType_(backupType)

        if backupType == BackupTypes.Tar:
            return frozenset((ArchiverFeatures.Incremental,))
        return frozenset({ArchiverFeatures.Incremental,
                          ArchiverFeatures.CompressionStrength})

    def getMaxBackupLevel(self, backupId):
        "See: :meth:`.TarArchiverProviderBase.getMaxBackupLevel()`."
        return _ExternalTarIncrementalUtility(backupId, self.workDir_).getMaxBackupLevel()

    @Utils.uniq
    def getStoredBackupIds(self):
        """See: :meth:`.TarArchiverProviderBase.getStoredBackupIds()`.

        The IDs are the base names of the snapshot files with the last two extensions stripped."""
        snapshots = _ExternalTarIncrementalUtility.getSnapshotsForBackup(
            _ExternalTarIncrementalUtility.getSnapshotsDir(self.workDir_))
        return (os.path.splitext(os.path.splitext(os.path.basename(snapshot))[0])[0] for snapshot in snapshots)

    def purgeStoredBackupData(self, backupId):
        """See: :meth:`.TarArchiverProviderBase.purgeStoredBackupData()`.

        Removes all snapshot files that the incremental utility lists for ``backupId``."""
        snapshotsDir = _ExternalTarIncrementalUtility.getSnapshotsDir(self.workDir_)
        for snapshot in _ExternalTarIncrementalUtility.getSnapshotsForBackup(snapshotsDir, backupId):
            os.remove(os.path.join(snapshotsDir, snapshot))

    def doesAnyBackupLevelExist(self, backupDefinition, fromLevel = 0, keepingId = None):
        """See: :meth:`.TarArchiverProviderBase.doesAnyBackupLevelExist()`.

        :return: ``True`` if a backup file with level equal or higher than ``fromLevel`` was found in the
           destination directory."""
        extension = _BACKUP_TYPES_TO_EXTENSIONS[backupDefinition.backupType]

        # the destination, backup ID and keeping ID are literal text; without escaping, characters such as '[' or
        # '?' in them would be interpreted as wildcards and the existing backups would not be found
        destination = glob.escape(backupDefinition.destination)
        backupId = glob.escape(backupDefinition.backupId)
        keepToken = glob.escape("." + keepingId) if keepingId else ""

        # SMELL: Backup path is similarly assembled in super().getBackupFilePath_.
        level0Glob = os.path.join(destination, backupId + keepToken + "." + extension)
        levelGreaterThan0Glob = os.path.join(destination, backupId + ".*" + keepToken + "." + extension)

        backups = itertools.chain(glob.iglob(level0Glob), glob.iglob(levelGreaterThan0Glob))
        return any(
            _ExternalTarIncrementalUtility.getLevelFromFileName(
                os.path.basename(backup), keepingId is not None) >= fromLevel
            for backup in backups)

    def isBackupTypeAvailable_(self, backupType: BackupTypes) -> bool:
        """Finds out whether the tar binary is able to create a backup of the passed type.

        If ``--version`` succeeds, its output has to contain "GNU tar".  Then a temporary file is archived with the
        options for ``backupType``; the type is available if the archiver exits with 0.

        :return: ``True`` if the trial archive was created successfully; ``False`` otherwise."""
        sysEnvironment = os.environ.copy()

        if self.__tarExecutable.run(["--version"], sysEnvironment).wait() == 0:
            if not any(line[0].find("GNU tar") >= 0 for line in self.__tarExecutable.output()):
                return False

        with tempfile.NamedTemporaryFile(prefix = Constants.TEMP_FILE_PREFIX, delete_on_close = False) as tempFile:
            # closed so the archiver can read it; it is deleted when the context is left
            tempFile.close()

            backupFile = tempFile.name + "." + _BACKUP_TYPES_TO_EXTENSIONS[backupType]
            arguments = self.__arguments(
                backupType, backupFile, os.curdir, [tempFile.name], [], None, sysEnvironment)
            try:
                tarRunSuccess = self.__tarExecutable.run(arguments[0], sysEnvironment).wait() == 0
            except OSError:
                return False
            finally:
                try:
                    os.remove(backupFile)
                except FileNotFoundError:
                    # intentional silent handling of the exception; the archiver did not create the file
                    pass
            return tarRunSuccess

    # }}} TarArchiverProviderBase overrides

    def __processTarOutput(self):
        """Propagates everything the archiver printed and evaluates its exit code.

        The error state is tracked by a handler that is subscribed to ``backupOperationError`` only while the
        output is being read."""
        self.__errorOccurred = False
        self.backupOperationError += self.__onBackupOperationError
        try:
            for line, channel in self.__tarExecutable.output():
                # strip only the line terminator; slicing off the last character unconditionally would damage a
                # final line that has no trailing newline
                self.__propagateArchiverMessage(line.rstrip("\r\n"),
                                                sentToStderr = channel == _TarOutputChannel.StdErr)
        finally:
            self.backupOperationError -= self.__onBackupOperationError

        returnCode = self.__tarExecutable.returnCode
        if returnCode:
            self.__handleArchiverExitCode(returnCode)

    def __arguments(self, backupType, backupFilePath, root, includeFiles, excludeFiles, compressionStrength,
                    sysEnvironment, snapshotPath = None):
        """Assembles and returns arguments to the tar binary.

        :return: Tuple of the list of command line options and the dictionary with environment variables that
           shall be added to the environment of the archiver."""
        # operation has to be first one
        archiverOptions = ["--create", "--format=posix", "--verbose"]

        # do not let tar interpret drive letter as remote machine name on Windows
        if Utils.isWindows() and re.match("[a-zA-Z]:\\\\", backupFilePath):
            archiverOptions += ["--force-local"]

        # insert options required for this archiver
        compressOption = self.__BACKUP_TYPE_TO_COMPRESS_OPTION[backupType]
        if compressOption:
            archiverOptions.append(compressOption)
        if snapshotPath is not None:
            archiverOptions.append("--listed-incremental=" + snapshotPath)

        # add options required for this archiver type
        # >if 'root' is just drive letter and backslash, MinGW GNU tar 1.35 on Windows does not recognize it if
        # >--listed-incremental is passed as well; a dot has to be added
        if Utils.isWindows() and re.match("[a-zA-Z]:\\\\$", root):
            root = f"{root}."
        archiverOptions += ["--file=" + backupFilePath, "--directory=" + root]

        # add converted include and exclude files
        archiverOptions += self.__convertIncludesAndExcludes(includeFiles, excludeFiles)

        # create environment
        environment = self.__setupCompressionStrength(backupType, compressionStrength, sysEnvironment)

        return archiverOptions, environment

    def __propagateArchiverMessage(self, message, sentToStderr = False):
        """Propagates archiver message as event.

        Parses the passed ``message``, evaluates it and fires :meth:`.TarArchiverProviderBase.backupOperationError`
        event if it is a (non-fatal) error message or :meth:`.TarArchiverProviderBase.fileAdd` otherwise.

        :raise RuntimeError: If the message reports no space left on the device, an option not recognized by the
           archiver binary or an unrecoverable error of the archiver."""
        if not sentToStderr:
            self.fileAdd(message)
            return

        # messages that will be ignored
        if not message or re.search(self.__IGNORED_MESSAGES_PATTERN, message):
            return

        # the order of the checks matters: the specific patterns have to be tried before the generic ones
        for pattern, subOperation in self.__OS_ERROR_PATTERNS:
            match = re.search(pattern, message, re.IGNORECASE)
            if match:
                self.__reportOsError(subOperation, match.group(1), match.group(2))
                return

        for pattern, subOperation, error in self.__FILE_MESSAGE_PATTERNS:
            match = re.search(pattern, message, re.IGNORECASE)
            if match:
                self.backupOperationError(subOperation, error, match.group(1))
                return

        # fatal conditions
        if re.search(": No space left on device", message, re.IGNORECASE):
            raise RuntimeError("No space left on device.")

        match = re.search("(unrecognized option.*)|(Try.+--help.+for more information.*)", message, re.IGNORECASE)
        if match:
            raise RuntimeError(str.format("Incompatible external archiver binary: {} ({}).",
                                          self.__tarExecutable.path, match.group(0)))

        if re.search(": Error is not recoverable: exiting now", message, re.IGNORECASE):
            raise RuntimeError("External archiver aborted.")

        # anything else that still looks like "<prefix>: <name>: <text>"
        match = re.search(": (.*): (.+)", message, re.IGNORECASE)
        if match:
            self.backupOperationError(BackupSubOperations.UnknownFileOperation,
                                      BackupOperationErrors.UnknownError, match.group(1), match.group(2))
            return

        self.backupOperationError(BackupSubOperations.Unknown, BackupOperationErrors.UnknownError,
                                  unknownErrorString = message)

    def __reportOsError(self, subOperation, filesystemObjectName, osErrorText):
        "Fires the permission denied or the unknown OS error event according to the passed ``osErrorText``."
        if os.strerror(errno.EACCES) in osErrorText:
            self.backupOperationError(subOperation, BackupOperationErrors.PermissionDenied, filesystemObjectName)
        else:
            self.backupOperationError(subOperation, BackupOperationErrors.UnknownOsError, filesystemObjectName,
                                      osErrorText)

    def __handleArchiverExitCode(self, exitCode):
        """Evaluates a non-zero exit code of the archiver.

        :raise RuntimeError: If the code is other than 1 and no error was recorded while the output of the
           archiver was processed."""
        if exitCode == 1:
            self.backupOperationError(BackupSubOperations.Finish, BackupOperationErrors.SomeFilesChanged)
        elif not self.__errorOccurred:
            raise RuntimeError(str.format("Unexpected failure of the archiver program; exit code: {}", exitCode))

    @staticmethod
    def __convertIncludesAndExcludes(includeFiles: Iterable[str], excludeFiles: Iterable[str]) -> list[str]:
        """Converts list of files and list of excluded files to the form suitable for the archiver program.

        :return: List of arguments for the archiver."""
        # materialized so the emptiness test below is reliable for any iterable; a generator is always truthy
        # which would add --anchored even if there is nothing to exclude
        includes = list(includeFiles)
        excludes = list(excludeFiles)

        # MinGW GNU tar 1.35 on Windows does not find a file in some cases if path uses backslash as separator; also,
        # excludes do not work in such case; therefore forward slash has to be used
        if Utils.isWindows():
            includes = [path.replace("\\", "/") for path in includes]
            excludes = [path.replace("\\", "/") for path in excludes]

        archiverOptions = []
        if excludes:
            archiverOptions.append("--anchored")
            archiverOptions.extend("--exclude=" + exclude for exclude in excludes)
        archiverOptions += includes
        return archiverOptions

    @staticmethod
    def __setupCompressionStrength(backupType, compressionStrength, sysEnvironment):
        """Converts compression strength to an environment variable.

        For gzip, bzip2 and xz the strength is appended as an option to the value which the variable already has
        in ``sysEnvironment``; for zstd the variable is set to the bare number.

        :return: Dictionary representing environment with the required environment variable; empty if no
           compression strength was passed.
        :raise RuntimeError: If a compression strength was passed for a backup type which has no variable."""
        strength = IntervalElement(compressionStrength, Interval(MIN_COMPRESSION_STRENGTH, MAX_COMPRESSION_STRENGTH))
        environment = {}
        if strength.value is None:
            return environment

        # SMELL: Each backup type should have its own archiver provider class or at least mapping of type to env name
        if backupType == BackupTypes.TarGz:
            envName = "GZIP"
            strength = strength.remapTo(Interval(1, 9))
        elif backupType == BackupTypes.TarBz2:
            envName = "BZIP2"
            strength = strength.remapTo(Interval(1, 9))
        elif backupType == BackupTypes.TarXz:
            # the value is used as it is, without remapping
            envName = "XZ_OPT"
        elif backupType == BackupTypes.TarZst:
            envName = "ZSTD_CLEVEL"
            strength = strength.remapTo(Interval(1, 19))
        else:
            raise RuntimeError(str.format("Unexpected backup type: {}", backupType))

        if backupType == BackupTypes.TarZst:
            environment[envName] = str(strength.value)
        else:
            # keep the options which are already present in the variable
            sysEnvValue = sysEnvironment[envName] + " " if envName in sysEnvironment else ""
            environment[envName] = sysEnvValue + "-" + str(strength.value)
        return environment

    def __onBackupOperationError(self, operation, error, filesystemObjectName = None, unknownErrorString = None):
        """Handler of the ``backupOperationError`` event which records the error state.

        The state is set once a permission denied, unknown OS or unknown error is reported for an operation other
        than the finish one; it is never reset here."""
        self.__errorOccurred = self.__errorOccurred or \
            (operation != BackupSubOperations.Finish and
             error in (BackupOperationErrors.PermissionDenied,
                       BackupOperationErrors.UnknownOsError,
                       BackupOperationErrors.UnknownError))

    @staticmethod
    def __raiseIfBadCompressionStrength(compressionStrength):
        """Validates the passed compression strength; ``None`` is always accepted.

        :raise ValueError: If the value lies outside of the minimal and maximal compression strength."""
        if compressionStrength is None:
            return
        if compressionStrength < MIN_COMPRESSION_STRENGTH or compressionStrength > MAX_COMPRESSION_STRENGTH:
            raise ValueError(str.format("Compression strength value {} is out of defined interval",
                                        compressionStrength))