# Source code for AutoArchive._services.archiver._external_tar_archiver_provider.external_tar_archiver_provider

# external_tar_archiver_provider.py
#
# Project: AutoArchive
# License: GNU GPLv3
#
# Copyright (C) 2003 - 2026 Róbert Čerňanský



__all__ = ["ExternalTarArchiverProvider"]



import errno
import glob
import itertools
import os
import re
import shutil
import tempfile
from collections.abc import Iterable

from AutoArchive._infrastructure.py_additions import staticproperty
from AutoArchive._infrastructure.utils import Utils, Constants
from AutoArchive._infrastructure.utils.interval import IntervalElement, Interval
from AutoArchive._services.archiver import (
    BackupTypes, BackupSubOperations, ArchiverFeatures, BackupOperationErrors, MIN_COMPRESSION_STRENGTH,
    MAX_COMPRESSION_STRENGTH)
from AutoArchive._services.archiver._external_tar_archiver_provider._external_tar_incremental_utility import \
    _ExternalTarIncrementalUtility
from AutoArchive._services.archiver._external_tar_archiver_provider._tar_executable import _TarExecutable, \
    _TarOutputChannel
from AutoArchive._services.archiver.tar_archiver_provider_base import TarArchiverProviderBase, \
    _BACKUP_TYPES_TO_EXTENSIONS



class ExternalTarArchiverProvider(TarArchiverProviderBase):
    """External archiver service provider.

    Creates backups by running an external tar binary (through :class:`._TarExecutable`) and converts the lines it
    prints into :meth:`.TarArchiverProviderBase.fileAdd` and :meth:`.TarArchiverProviderBase.backupOperationError`
    events.

    See also: :class:`.TarArchiverProviderBase`.

    :raise OSError: If creation of the snapshot directory failed."""

    # backup type to GNU tar compress option map
    __BACKUP_TYPE_TO_COMPRESS_OPTION = {BackupTypes.Tar: "",
                                        BackupTypes.TarGz: "--gzip",
                                        BackupTypes.TarBz2: "--bzip2",
                                        BackupTypes.TarXz: "--xz",
                                        BackupTypes.TarZst: "--zstd"}



    # {{{ TarArchiverProviderBase overrides

    def __init__(self, workDir):
        super().__init__(workDir)

        # stores the error state during the backup operation
        self.__errorOccurred = None

        self.__tarExecutable = _TarExecutable()

        _ExternalTarIncrementalUtility.makeSnapshotsDir(workDir)



    @staticproperty
    def supportedBackupTypes():
        "See :attr:`.TarArchiverProviderBase.supportedBackupTypes`"

        return frozenset({BackupTypes.Tar, BackupTypes.TarGz, BackupTypes.TarBz2, BackupTypes.TarXz,
                          BackupTypes.TarZst})



    def backupFiles(self, backupDefinition, compressionStrength = None, overwriteAtStart = False):
        "See: :meth:`.TarArchiverProviderBase.backupFiles()`."

        super().backupFiles(backupDefinition, compressionStrength, overwriteAtStart)
        self.__raiseIfBadCompressionStrength(compressionStrength)

        backupFilePath = self.getBackupFilePath_(backupDefinition.backupId, backupDefinition.backupType,
                                                 backupDefinition.destination)

        # unless overwriting from the start was requested, tar writes to a working path and the result is moved to
        # the final location only after the run and its output processing went through
        workingBackupFilePath = backupFilePath if overwriteAtStart else self.getWorkingPath_(backupFilePath)

        sysEnvironment = os.environ.copy()
        archiverOptions, environment = self.__arguments(
            backupDefinition.backupType, workingBackupFilePath, backupDefinition.root, backupDefinition.includeFiles,
            backupDefinition.excludeFiles, compressionStrength, sysEnvironment)
        sysEnvironment.update(environment)

        self.__tarExecutable.run(archiverOptions, sysEnvironment)
        self.__processTarOutput()

        if not overwriteAtStart:
            shutil.move(workingBackupFilePath, backupFilePath)

        return backupFilePath



    def backupFilesIncrementally(self, backupDefinition, compressionStrength = None, level = None,
                                 overwriteAtStart = False):
        "See: :meth:`.TarArchiverProviderBase.backupFilesIncrementally()`."

        super().backupFilesIncrementally(backupDefinition, compressionStrength, level, overwriteAtStart)
        self.__raiseIfBadCompressionStrength(compressionStrength)

        externalTarIncrementalUtility = _ExternalTarIncrementalUtility(backupDefinition.backupId, self.workDir_)
        maxBackupLevel = externalTarIncrementalUtility.getMaxBackupLevel()

        # no level requested means the highest one that can be created
        if level is None:
            level = maxBackupLevel
        if level < 0 or level > maxBackupLevel:
            raise ValueError(str.format(
                "'level' must be from interval 0 <= level <= maxBackupLevel ({}). The passed value was {}.",
                maxBackupLevel, level))

        workingSnapshotFilePath = externalTarIncrementalUtility.createWorkingSnapshotFile(level)

        backupFilePath = self.getBackupFilePath_(backupDefinition.backupId, backupDefinition.backupType,
                                                 backupDefinition.destination, level)
        workingBackupFilePath = backupFilePath if overwriteAtStart else self.getWorkingPath_(backupFilePath)

        sysEnvironment = os.environ.copy()
        archiverOptions, environment = self.__arguments(
            backupDefinition.backupType, workingBackupFilePath, backupDefinition.root, backupDefinition.includeFiles,
            backupDefinition.excludeFiles, compressionStrength, sysEnvironment, workingSnapshotFilePath)
        sysEnvironment.update(environment)

        try:
            self.__tarExecutable.run(archiverOptions, sysEnvironment)
            self.__processTarOutput()

            if not overwriteAtStart:
                shutil.move(workingBackupFilePath, backupFilePath)

            externalTarIncrementalUtility.manageSnapshotFiles(level, workingSnapshotFilePath)
        finally:
            # the working snapshot must not survive this call, whether the backup succeeded or not
            if os.path.exists(workingSnapshotFilePath):
                os.remove(workingSnapshotFilePath)

        return backupFilePath



    def removeBackupIncrements(self, backupDefinition, level = None, keepingId = None):
        "See: :meth:`.TarArchiverProviderBase.removeBackupIncrements()`."

        externalTarIncrementalUtility = _ExternalTarIncrementalUtility(backupDefinition.backupId, self.workDir_)

        self.raiseIfUnsupportedBackupType_(backupDefinition.backupType)

        if level is not None:
            if level < 0:
                # level 0 is accepted by the check above, hence ">="
                raise ValueError(str.format("'level' must be >= 0. The passed value was {}.", level))
        else:
            level = self.getMaxBackupLevel(backupDefinition.backupId)

        # walk upwards from 'level' and stop at the first level which has no backup file
        removeLevel = level
        backupExists = True
        while backupExists:
            backupFilePath = self.getBackupFilePath_(
                backupDefinition.backupId, backupDefinition.backupType, backupDefinition.destination, removeLevel,
                keepingId)
            backupExists = os.path.exists(backupFilePath)
            if backupExists:
                os.remove(backupFilePath)
            externalTarIncrementalUtility.tryRemoveSnapshotFile(removeLevel, keepingId)
            removeLevel += 1

        if self.getMaxBackupLevel(backupDefinition.backupId) > level and keepingId is None:

            # ouch! some rogue snapshots still exists; deal with them slowly and painfully
            externalTarIncrementalUtility.removeSnapshotFiles(level)



    @classmethod
    def getSupportedFeatures(cls, backupType = None):
        "See: :meth:`.TarArchiverProviderBase.getSupportedFeatures()`."

        if backupType is not None:
            cls.raiseIfUnsupportedBackupType_(backupType)

        # plain tar does no compression so compression strength applies to all other types only
        if backupType == BackupTypes.Tar:
            supportedFeatures = frozenset((ArchiverFeatures.Incremental,))
        else:
            supportedFeatures = frozenset({ArchiverFeatures.Incremental, ArchiverFeatures.CompressionStrength})

        return supportedFeatures



    def getMaxBackupLevel(self, backupId):
        "See: :meth:`.TarArchiverProviderBase.getMaxBackupLevel()`."

        return _ExternalTarIncrementalUtility(backupId, self.workDir_).getMaxBackupLevel()



    @Utils.uniq
    def getStoredBackupIds(self):
        "See: :meth:`.TarArchiverProviderBase.getStoredBackupIds()`."

        snapshots = _ExternalTarIncrementalUtility.getSnapshotsForBackup(
            _ExternalTarIncrementalUtility.getSnapshotsDir(self.workDir_))

        # the backup ID is the snapshot file name with its last two extensions stripped
        return (os.path.splitext(os.path.splitext(os.path.basename(snapshot))[0])[0] for snapshot in snapshots)



    def purgeStoredBackupData(self, backupId):
        "See: :meth:`.TarArchiverProviderBase.purgeStoredBackupData()`."

        snapshotsDir = _ExternalTarIncrementalUtility.getSnapshotsDir(self.workDir_)
        snapshots = _ExternalTarIncrementalUtility.getSnapshotsForBackup(snapshotsDir, backupId)
        for snapshot in snapshots:
            os.remove(os.path.join(snapshotsDir, snapshot))



    def doesAnyBackupLevelExist(self, backupDefinition, fromLevel = 0, keepingId = None):
        "See: :meth:`.TarArchiverProviderBase.doesAnyBackupLevelExist()`."

        keepToken = "." + keepingId if keepingId else ""
        extension = _BACKUP_TYPES_TO_EXTENSIONS[backupDefinition.backupType]

        # SMELL: Backup path is similarly assembled in super().getBackupFilePath_.
        level0Glob = os.path.join(backupDefinition.destination,
                                  backupDefinition.backupId + keepToken + "." + extension)
        levelGreaterThan0Glob = os.path.join(backupDefinition.destination,
                                             backupDefinition.backupId + ".*" + keepToken + "." + extension)

        backups = itertools.chain(glob.iglob(level0Glob), glob.iglob(levelGreaterThan0Glob))

        # a single backup file of a sufficient level is enough; evaluation stops at the first one found
        return any(
            _ExternalTarIncrementalUtility.getLevelFromFileName(os.path.basename(backup), keepingId is not None)
            >= fromLevel
            for backup in backups)



    def isBackupTypeAvailable_(self, backupType: BackupTypes) -> bool:
        """Finds out whether the tar binary is able to create a backup of the passed type.

        If ``--version`` run succeeds, its output has to contain "GNU tar".  Then a trial archive of type
        ``backupType`` is created from a temporary file; both files are removed afterwards.

        :param backupType: The backup type to check.
        :return: ``True`` if the trial archive was created with exit code 0; ``False`` if not, if the binary did not
            identify itself as GNU tar or if running it for the trial archive raised :class:`OSError`."""

        sysEnvironment = os.environ.copy()
        if self.__tarExecutable.run(["--version"], sysEnvironment).wait() == 0:
            if not any("GNU tar" in line[0] for line in self.__tarExecutable.output()):
                return False

        with tempfile.NamedTemporaryFile(prefix = Constants.TEMP_FILE_PREFIX, delete_on_close = False) as tempFile:

            # closed here so that only tar works with the file; it still exists until the 'with' block is left
            tempFile.close()

            backupFile = tempFile.name + "." + _BACKUP_TYPES_TO_EXTENSIONS[backupType]
            archiverOptions, _ = self.__arguments(
                backupType, backupFile, os.curdir, [tempFile.name], [], None, sysEnvironment)
            try:
                tarRunSuccess = self.__tarExecutable.run(archiverOptions, sysEnvironment).wait() == 0
            except OSError:
                return False
            finally:
                try:
                    os.remove(backupFile)
                except FileNotFoundError:

                    # intentional silent handling of the exception
                    pass

        return tarRunSuccess

    # }}} TarArchiverProviderBase overrides



    def __processTarOutput(self):
        """Reads output of the running tar, propagates it as events and evaluates the exit code.

        :raise RuntimeError: If a fatal message was printed by tar or if it exited with an unexpected code."""

        self.__errorOccurred = False

        # subscribe to own event for the time of processing so that reported errors are tracked
        self.backupOperationError += self.__onBackupOperationError
        try:
            for line, channel in self.__tarExecutable.output():

                # last character of the line (line terminator) is cut off
                self.__propagateArchiverMessage(line[:-1], sentToStderr = channel == _TarOutputChannel.StdErr)
        finally:
            self.backupOperationError -= self.__onBackupOperationError

        if self.__tarExecutable.returnCode:
            self.__handleArchiverExitCode(self.__tarExecutable.returnCode)



    def __arguments(self, backupType, backupFilePath, root, includeFiles, excludeFiles, compressionStrength,
                    sysEnvironment, snapshotPath = None):
        """Assembles and returns arguments to the tar binary.

        :return: Tuple of list of command line options and dictionary of environment variables to be added to the
            environment that tar is run with."""

        compressOption = self.__BACKUP_TYPE_TO_COMPRESS_OPTION[backupType]

        # operation has to be first one
        archiverOptions = ["--create", "--format=posix", "--verbose"]

        # do not let tar interpret drive letter as remote machine name on Windows
        if Utils.isWindows() and re.match(r"[a-zA-Z]:\\", backupFilePath):
            archiverOptions += ["--force-local"]

        # insert options required for this archiver
        if compressOption:
            archiverOptions.append(compressOption)
        if snapshotPath is not None:
            archiverOptions.append("--listed-incremental=" + snapshotPath)

        # add options required for this archiver type
        # >if 'root' is just drive letter and backslash, MinGW GNU tar 1.35 on Windows does not recognize it if
        # >--listed-incremental is passed as well; a dot has to be added
        if Utils.isWindows() and re.match(r"[a-zA-Z]:\\$", root):
            root = f"{root}."
        archiverOptions += ["--file=" + backupFilePath, "--directory=" + root]

        # add converted include and exclude files
        archiverOptions += self.__convertIncludesAndExcludes(includeFiles, excludeFiles)

        # create environment
        environment = self.__setupCompressionStrength(backupType, compressionStrength, sysEnvironment)

        return archiverOptions, environment



    def __propagateArchiverMessage(self, message, sentToStderr = False):
        """Propagates archiver message as event.

        Parses the passed ``message``, evaluates it and fires :meth:`.TarArchiverProviderBase.backupOperationError`
        event if it is a (non-fatal) error message or :meth:`.TarArchiverProviderBase.fileAdd` otherwise.

        :param message: A single line printed by the archiver, without the line terminator.
        :param sentToStderr: Whether the line was printed to the standard error output.
        :raise RuntimeError: If the message says that there is no space left on the device, that an option was not
            recognized by the binary or that the archiver is exiting because of an unrecoverable error."""

        # everything printed to the standard output is reported as an added file
        if not sentToStderr:
            self.fileAdd(message)
            return

        # messages that will be ignored
        if not message or re.search("(: Exiting with failure status due to previous errors)|" +
                                    "(: (.*): Directory is new)", message):
            return

        match = re.search(": (.*): cannot stat: (.*)", message, re.IGNORECASE)
        if match:
            if os.strerror(errno.EACCES) in match.group(2):
                self.backupOperationError(BackupSubOperations.Stat, BackupOperationErrors.PermissionDenied,
                                          match.group(1))
            else:
                self.backupOperationError(BackupSubOperations.Stat, BackupOperationErrors.UnknownOsError,
                                          match.group(1), match.group(2))
            return

        match = re.search(": (.*): cannot open: (.*)", message, re.IGNORECASE)
        if match:
            if os.strerror(errno.EACCES) in match.group(2):
                self.backupOperationError(BackupSubOperations.Open, BackupOperationErrors.PermissionDenied,
                                          match.group(1))
            else:
                self.backupOperationError(BackupSubOperations.Open, BackupOperationErrors.UnknownOsError,
                                          match.group(1), match.group(2))
            return

        match = re.search(": (.*): socket ignored", message, re.IGNORECASE)
        if match:
            self.backupOperationError(BackupSubOperations.Open, BackupOperationErrors.SocketIgnored, match.group(1))
            return

        match = re.search(": (.*): file changed as we read it", message, re.IGNORECASE)
        if match:
            self.backupOperationError(BackupSubOperations.Read, BackupOperationErrors.FileChanged, match.group(1))
            return

        match = re.search(": (.*): directory has been renamed", message, re.IGNORECASE)
        if match:
            self.backupOperationError(BackupSubOperations.Read, BackupOperationErrors.DirectoryRenamed,
                                      match.group(1))
            return

        # fatal messages; these abort the processing
        if re.search(": No space left on device", message, re.IGNORECASE):
            raise RuntimeError("No space left on device.")

        match = re.search("(unrecognized option.*)|(Try.+--help.+for more information.*)", message, re.IGNORECASE)
        if match:
            raise RuntimeError(str.format("Incompatible external archiver binary: {} ({}).",
                                          self.__tarExecutable.path, match.group(0)))

        if re.search(": Error is not recoverable: exiting now", message, re.IGNORECASE):
            raise RuntimeError("External archiver aborted.")

        # anything else that has the "<something>: <object>: <text>" form
        match = re.search(": (.*): (.+)", message, re.IGNORECASE)
        if match:
            self.backupOperationError(BackupSubOperations.UnknownFileOperation, BackupOperationErrors.UnknownError,
                                      match.group(1), match.group(2))
            return

        self.backupOperationError(BackupSubOperations.Unknown, BackupOperationErrors.UnknownError,
                                  unknownErrorString = message)



    def __handleArchiverExitCode(self, exitCode):
        """Evaluates non-zero exit code of the archiver.

        Exit code 1 is reported as :attr:`BackupOperationErrors.SomeFilesChanged` event.  Any other code raises an
        exception unless some error was already reported while the output was processed.

        :raise RuntimeError: If ``exitCode`` is other than 1 and no error was recorded so far."""

        if exitCode == 1:
            self.backupOperationError(BackupSubOperations.Finish, BackupOperationErrors.SomeFilesChanged)
        elif not self.__errorOccurred:
            raise RuntimeError(str.format("Unexpected failure of the archiver program; exit code: {}", exitCode))



    @staticmethod
    def __convertIncludesAndExcludes(includeFiles: Iterable[str], excludeFiles: Iterable[str]):
        """Converts list of files and list of excluded files to the form suitable for the archiver program.

        :param includeFiles: Paths to be archived.
        :param excludeFiles: Paths to be excluded from the archive.
        :return: List of arguments for the archiver."""

        # materialized first because truth value of a generator (or of other non-sequence iterable) says nothing
        # about whether it has any items; "--anchored" would be added even if there was nothing to exclude
        includes = list(includeFiles)
        excludes = list(excludeFiles)

        # MinGW GNU tar 1.35 on Windows does not find a file in some cases if path uses backslash as separator; also,
        # excludes do not work in such case; therefore forward slash has to be used
        if Utils.isWindows():
            includes = [f.replace("\\", "/") for f in includes]
            excludes = [f.replace("\\", "/") for f in excludes]

        archiverOptions = []
        if excludes:
            archiverOptions.append("--anchored")
            archiverOptions.extend("--exclude=" + exclude for exclude in excludes)
        archiverOptions += includes

        return archiverOptions



    @staticmethod
    def __setupCompressionStrength(backupType, compressionStrength, sysEnvironment):
        """Converts compression strength to an environment variable.

        :param backupType: Backup type which determines the name of the variable.
        :param compressionStrength: Requested compression strength or ``None``.
        :param sysEnvironment: Environment that is checked for an already existing value of the variable.
        :return: Dictionary representing environment with the required environment variable.  It is empty if
            ``compressionStrength`` is ``None``.
        :raise RuntimeError: If ``compressionStrength`` is not ``None`` and ``backupType`` is not one of the
            compressed types."""

        compressionStrength = \
            IntervalElement(compressionStrength, Interval(MIN_COMPRESSION_STRENGTH, MAX_COMPRESSION_STRENGTH))
        environment = {}

        # SMELL: Each backup type should have its own archiver provider class or at least mapping of type to env name
        if compressionStrength.value is not None:
            if backupType == BackupTypes.TarGz:
                envName = "GZIP"
                compressionStrength = compressionStrength.remapTo(Interval(1, 9))
            elif backupType == BackupTypes.TarBz2:
                envName = "BZIP2"
                compressionStrength = compressionStrength.remapTo(Interval(1, 9))
            elif backupType == BackupTypes.TarXz:
                envName = "XZ_OPT"
            elif backupType == BackupTypes.TarZst:
                envName = "ZSTD_CLEVEL"
                compressionStrength = compressionStrength.remapTo(Interval(1, 19))
            else:
                raise RuntimeError(str.format("Unexpected backup type: {}", backupType))

            if backupType == BackupTypes.TarZst:

                # this variable carries the bare level
                environment[envName] = str(compressionStrength.value)
            else:

                # the other variables carry command line options; the level is appended to the existing ones
                sysEnvValue = sysEnvironment[envName] + " " if envName in sysEnvironment else ""
                environment[envName] = sysEnvValue + "-" + str(compressionStrength.value)

        return environment



    def __onBackupOperationError(self, operation, error, filesystemObjectName = None, unknownErrorString = None):
        "Remembers that an error other than the one of the finish operation occurred during the backup."

        self.__errorOccurred = self.__errorOccurred or \
            (operation != BackupSubOperations.Finish and
             error in (BackupOperationErrors.PermissionDenied, BackupOperationErrors.UnknownOsError,
                       BackupOperationErrors.UnknownError))



    @staticmethod
    def __raiseIfBadCompressionStrength(compressionStrength):
        """Validates the compression strength value.

        :raise ValueError: If ``compressionStrength`` is not ``None`` and it lies outside of
            ``MIN_COMPRESSION_STRENGTH`` to ``MAX_COMPRESSION_STRENGTH`` interval."""

        if compressionStrength is not None and \
           not MIN_COMPRESSION_STRENGTH <= compressionStrength <= MAX_COMPRESSION_STRENGTH:
            raise ValueError(str.format("Compression strength value {} is out of defined interval",
                                        compressionStrength))