Coverage for src/edwh_files_plugin/compression.py: 81%
217 statements
« prev ^ index » next coverage.py v7.6.10, created at 2025-02-28 11:02 +0100
« prev ^ index » next coverage.py v7.6.10, created at 2025-02-28 11:02 +0100
1import abc
2import os
3import shutil
4import typing
5import warnings
6from pathlib import Path
7from subprocess import run
8from typing import Optional, Self
10from plumbum import local
11from plumbum.commands.processes import CommandNotFound
12from rich import print # noqa: A004
# Anything accepted where a filesystem path is expected: a plain string or a pathlib.Path.
PathLike: typing.TypeAlias = str | Path

# Compression level used as the default for the `level` parameter of the compress methods below.
DEFAULT_COMPRESSION_LEVEL = 5
def run_ok(command: str) -> bool:
    """
    Executes a command and returns whether it ended successfully (with return code 0).

    The command line is tokenised with shell-like quoting rules (no shell is spawned),
    and the output of the command (stdout and stderr) is discarded.

    Args:
        command (str): The command to run.

    Returns:
        bool: True if the command ended successfully, False otherwise. An empty or
            malformed command line, or an executable that cannot be started, also
            yields False instead of raising.
    """
    # Local imports: only `run` is imported from subprocess at module level.
    import shlex
    from subprocess import DEVNULL

    try:
        argv = shlex.split(command)
    except ValueError:
        # e.g. unbalanced quotes: this is not a runnable command line
        return False

    if not argv:
        return False

    try:
        return run(argv, stdout=DEVNULL, stderr=DEVNULL).returncode == 0
    except OSError:
        # executable missing, not executable, ...: the command did not end successfully
        return False
def is_installed(program: str) -> bool:
    """
    Checks if a given program is installed on the system.

    The lookup is done in-process with `shutil.which`, so it does not depend on an
    external `which` binary being present and never spawns a subprocess.

    Args:
        program (str): The name of the program to check.

    Returns:
        bool: True if the program is installed, False otherwise.
    """
    return shutil.which(program) is not None
46# FileLike: typing.TypeAlias = PathLike | typing.BinaryIO | typing.TextIO
47# def filelike_to_binaryio(fl: FileLike) -> typing.BinaryIO: ...
class Compression(abc.ABC):
    """
    Abstract base class and registry for compression backends.

    Subclasses declare the extension(s) they handle and their priority as class keyword
    arguments (`class Foo(Compression, extension="foo", prio=1)`) and are registered
    automatically. `best()` and `for_extension()` then pick the highest-priority backend
    whose `is_available()` returns True.
    """

    # (priority, extension) -> backend class; one registry shared by every subclass
    _registrations: dict[tuple[int, str], typing.Type[Self]] = {}
    extension: str | tuple[str, ...]

    def __init_subclass__(cls, extension: str | tuple[str, ...] = "", prio: int = 0):
        """
        Register the new subclass for each of its extensions.

        Args:
            extension (str | tuple[str, ...]): Extension(s) without leading dot handled by the subclass.
            prio (int): Priority of the subclass; higher values win when multiple backends match.
        """
        super().__init_subclass__()
        cls.extension = extension

        if not extension:
            warnings.warn("Defined compression algorithm without extension, it will be ignored.")
            # actually ignore it, as the warning promises: do not register under ""
            return

        extensions = (extension,) if isinstance(extension, str) else extension
        for ext in extensions:
            Compression._registrations[(prio, ext)] = cls

    @abc.abstractmethod
    def _compress(
        self, source: Path, target: Path, level: int = DEFAULT_COMPRESSION_LEVEL, overwrite: bool = True
    ) -> bool:
        """
        Compresses the source file or directory to the target location.

        Args:
            source (Path): Path to the source file or directory to compress.
            target (Path): Path where the compressed file will be saved.
            level (int, optional): Compression level (1-9), where higher numbers indicate higher compression.
                Defaults to 5.
            overwrite (bool, optional): Whether to overwrite the target file if it already exists. Defaults to True.
        """

    def compress(
        self,
        source: PathLike,
        target: Optional[PathLike] = None,
        level: int = DEFAULT_COMPRESSION_LEVEL,
        overwrite: bool = True,
    ) -> bool:
        """
        Compress `source` to `target` using this backend.

        Args:
            source (PathLike): File or directory to compress; `~` is expanded and the path made absolute.
            target (PathLike, optional): Output path. Defaults to `self.filepath(source)`.
            level (int, optional): Compression level passed on to the backend.
            overwrite (bool, optional): Passed on to the backend.

        Returns:
            bool: The result of the backend; False if the backend raised (the error is printed).
        """
        source = Path(source).expanduser().absolute()
        target = self.filepath(source) if target is None else Path(target)

        try:
            return self._compress(
                source,
                target,
                level=level,
                overwrite=overwrite,
            )
        except Exception as e:
            # public entry point: report the problem instead of propagating it
            print("[red] Something went wrong during compression [/red]")
            print(e)
            return False

    @abc.abstractmethod
    def _decompress(self, source: Path, target: Path, overwrite: bool = True) -> bool:
        """
        Decompresses the source file to the target location.

        Args:
            source (Path): Path to the compressed file.
            target (Path): Path where the decompressed contents will be saved.
            overwrite (bool, optional): Whether to overwrite the target files if they already exist. Defaults to True.
        """

    def decompress(self, source: PathLike, target: Optional[PathLike] = None, overwrite: bool = True) -> bool:
        """
        Decompress `source` to `target` using this backend.

        Args:
            source (PathLike): Compressed file; `~` is expanded and the path made absolute.
            target (PathLike, optional): Output path. When omitted and `source` ends in
                .tgz/.tar/.gz/.zip, that last extension is stripped (`a.txt.gz` -> `a.txt`);
                otherwise `source` itself is used as target.
            overwrite (bool, optional): Passed on to the backend.

        Returns:
            bool: The result of the backend; False if the backend raised (the error is printed).
        """
        source = Path(source).expanduser().absolute()

        if target is not None:
            target = Path(target)
        elif source.suffix in (".tgz", ".tar", ".gz", ".zip"):
            # strip only the last extension (e.g. .tgz); earlier extensions (.txt) are retained.
            # `with_suffix` replaces just the final suffix, so "" is all that is needed here.
            target = source.with_suffix("")
        else:
            target = source

        try:
            return self._decompress(
                source,
                target,
                overwrite=overwrite,
            )
        except Exception as e:
            # public entry point: report the problem instead of propagating it
            print("[red] Something went wrong during decompression [/red]")
            print(e)
            return False

    @classmethod
    @abc.abstractmethod
    def is_available(cls) -> bool:
        """
        Checks if the required compression tool is available.

        Returns:
            bool: True if the compression tool is available, False otherwise.
        """

    @classmethod
    def registrations(
        cls, extension_filter: Optional[str] = None
    ) -> list[tuple[tuple[int, str], typing.Type["Compression"]]]:
        """
        List the registered backends that are currently available.

        Args:
            extension_filter (str, optional): Only include registrations for exactly this extension.

        Returns:
            list: `((prio, extension), backend class)` pairs, sorted descending by `(prio, extension)`.
        """
        return sorted(
            (
                (key, CompressionClass)
                for (key, CompressionClass) in cls._registrations.items()
                if CompressionClass.is_available() and extension_filter in (None, key[1])
            ),
            key=lambda registration: registration[0],
            reverse=True,
        )

    @classmethod
    def available(cls) -> set[str]:
        """
        Return every registered extension.

        Note: this lists all registrations; it does not call `is_available()` on the backends.
        """
        return {extension for (_, extension) in cls._registrations}

    @classmethod
    def best(cls) -> Self | None:
        """
        Find the absolute best (by priority) available compression method.
        """
        if registrations := cls.registrations():
            CompressionClass = registrations[0][1]  # noqa: N806
            return typing.cast(Self, CompressionClass())

        return None

    @classmethod
    def for_extension(cls, extension: str) -> Self | None:
        """
        Find the best (by priority) available compression method for a specific extension (zip, gz).

        Surrounding whitespace and dots are ignored, so "gz", ".gz" and " .gz " are equivalent.
        """
        wanted = extension.strip().strip(".").strip()

        if registrations := cls.registrations(wanted):
            CompressionClass = registrations[0][1]  # noqa: N806
            return typing.cast(Self, CompressionClass())

        return None

    @classmethod
    def filepath(cls, filepath: str | Path) -> Path:
        """
        Generate an output filepath with the right extension

        An existing file keeps its own suffix (`a.txt` -> `a.txt.zip`); for anything else the
        suffix is replaced. Backends registered for several extensions use the first one.
        """
        filepath = Path(filepath)
        # `extension` may be a tuple; formatting it directly would embed its repr in the filename
        own_extension = cls.extension if isinstance(cls.extension, str) else cls.extension[0]
        suffix = f"{filepath.suffix}.{own_extension}" if filepath.is_file() else f".{own_extension}"
        return filepath.with_suffix(suffix)

    @classmethod
    def filename(cls, filepath: str | Path) -> str:
        """
        Generate an output filename with the right extension
        """
        return cls.filepath(filepath).name
class Nocompression(Compression, extension=("none", "tar"), prio=0):
    """
    Backend that stores data uncompressed: directories are packed into a plain tar archive,
    single files are copied as-is.
    """

    def _compress(
        self, source: Path, target: Path, level: int = DEFAULT_COMPRESSION_LEVEL, overwrite: bool = True
    ) -> bool:
        """
        Pack a directory into an uncompressed tar, or copy a single file.

        Args:
            source (Path): File or directory to store.
            target (Path): Output path.
            level (int, optional): Unused; accepted for interface compatibility.
            overwrite (bool, optional): When False, an existing target is left alone and False is returned.

        Returns:
            bool: True on success (including when source and target are the same file, so there is nothing to do).
        """
        if source.is_dir():
            if target.exists() and not overwrite:
                return False

            tar = local["tar"]
            # -C parent + name: store only the folder name in the archive, not the whole path
            cmd = tar["-cf", "-", "-C", source.parent, source.name] > str(target)
            cmd()
        elif source != target:
            if target.exists() and not overwrite:
                return False

            shutil.copyfile(source, target)
        # else: nothing to do

        return True

    def _decompress(self, source: Path, target: Path, overwrite: bool = True) -> bool:
        """
        Unpack a tar archive into `target`, or copy a single file.

        Args:
            source (Path): Tar archive or plain file.
            target (Path): Directory to extract into (tar) or output file path.
            overwrite (bool, optional): When False, an existing target is left alone and False is returned.

        Returns:
            bool: True on success (including when source and target are the same file, so there is nothing to do).
        """
        if source.suffix == ".tar":
            if target.exists() and not overwrite:
                return False

            # `tar -C` requires an existing directory
            target.mkdir(parents=True, exist_ok=True)
            tar = local["tar"]
            cmd = tar["-xvf", source, "--strip-components=1", "-C", target]
            cmd()
        elif source != target:
            if target.exists() and not overwrite:
                return False

            shutil.copyfile(source, target)
        # else: nothing to do

        return True

    @classmethod
    def is_available(cls) -> bool:
        """
        This backend is always reported as available.
        """
        return True

    @classmethod
    def filepath(cls, filepath: str | Path) -> Path:
        """
        Return the output path: `<dir>.tar` for a directory, the unchanged path otherwise.
        """
        filepath = Path(filepath)
        return filepath.with_suffix(".tar") if filepath.is_dir() else filepath
class Zip(Compression, extension="zip"):
    """
    Zip backend implemented with the standard library `zipfile` module (deflate compression).
    """

    def _compress(
        self, source: Path, target: Path, level: int = DEFAULT_COMPRESSION_LEVEL, overwrite: bool = True
    ) -> bool:
        """
        Write `source` (a file, or all files below a directory) into the zip archive `target`.

        Args:
            source (Path): File or directory to compress.
            target (Path): Path of the zip archive to create.
            level (int, optional): Deflate compression level.
            overwrite (bool, optional): When False, an existing target is left alone and False is returned.

        Returns:
            bool: True when the archive was written, False when it was skipped.
        """
        from zipfile import ZIP_DEFLATED, ZipFile

        if target.exists() and not overwrite:
            return False

        with ZipFile(target, "w", compression=ZIP_DEFLATED, compresslevel=level) as zip_object:
            if source.is_dir():
                archive = target.resolve()
                # Traverse all files in directory
                for file_path in source.rglob("*"):
                    if not file_path.is_file():
                        continue
                    if file_path.resolve() == archive:
                        # the archive lives inside the source folder: don't add it to itself
                        continue
                    # Add files to zip file with the correct relative path
                    zip_object.write(file_path, file_path.relative_to(source))
            else:
                zip_object.write(source, source.name)

        return True

    def _decompress(self, source: Path, target: Path, overwrite: bool = True) -> bool:
        """
        Extract the zip archive `source`.

        An archive holding exactly one file is extracted to `target` as a file (or into `target`
        when that is an existing directory); any other archive is extracted into the directory `target`.

        Args:
            source (Path): Zip archive to extract.
            target (Path): Output file or directory.
            overwrite (bool, optional): When False, existing files are not replaced. For a single-file
                archive this returns False; for multi-file archives existing members are skipped.

        Returns:
            bool: True on success, False when the source is not a file or the single target was not overwritten.
        """
        if not source.is_file():
            # covers both "does not exist" and "is not a regular file"
            return False

        from zipfile import ZipFile

        with ZipFile(source, "r") as zip_object:
            namelist = zip_object.namelist()

            # Check if the archive contains exactly one file
            if len(namelist) == 1 and not namelist[0].endswith("/"):
                # The archive contains a single file; treat target as a file
                first_file = namelist[0]

                # If the target is a directory, ensure we create the file inside
                if target.is_dir():
                    target = target / Path(first_file).name

                # Handle overwrite behavior
                if target.exists() and not overwrite:
                    return False

                # Ensure the parent directory exists
                target.parent.mkdir(parents=True, exist_ok=True)

                # Stream the single file to the target instead of loading it into memory at once
                with zip_object.open(first_file) as member_stream, target.open("wb") as f:
                    shutil.copyfileobj(member_stream, f)
            else:
                # Treat target as a directory and extract all files
                target.mkdir(parents=True, exist_ok=True)

                for member in namelist:
                    # Check if file already exists and handle overwrite
                    if not overwrite and (target / member).exists():
                        continue

                    # `extract` sanitises the member name and creates parent directories itself
                    zip_object.extract(member, target)

        return True

    @classmethod
    def is_available(cls) -> bool:
        """
        Check that `zipfile` and `zlib` (required for deflate compression) can be imported.
        """
        try:
            import zipfile  # noqa: F401
            import zlib  # noqa: F401
        except ImportError:
            return False

        return True
class Gzip(Compression, extension=("tgz", "gz"), prio=1):
    """
    Gzip backend that shells out to the `gzip`/`gunzip` (and, for directories, `tar`) binaries.
    """

    def gzip_compress(
        self, source: Path, target: Path, level: int = DEFAULT_COMPRESSION_LEVEL, _tar: str = "tar", _gzip: str = "gzip"
    ) -> bool:
        """
        Compress data using gzip.

        This function compresses data from a source to a target path using the gzip tool.
        Directories are packed with tar first and piped through the gzip program.

        Args:
            source (Path): Path to the file or data to be compressed.
            target (Path): Path where the compressed data will be saved.
            level (int): compression level, passed to the gzip program as `-<level>`.
                                   Defaults to DEFAULT_COMPRESSION_LEVEL.
            _tar (str): For internal usage
            _gzip (str): For internal usage

        Returns:
            bool: True if compression was successful.

        Raises:
            Exception: whatever plumbum raises when a program is missing or exits unsuccessfully.
        """
        tar = local[_tar]
        gzip = local[_gzip]

        if source.is_dir():
            # .tar.gz
            # -C parent + name stores only the folder name in the tar, not the whole path
            cmd = tar["-cf", "-", "-C", source.parent, source.name] | gzip[f"-{level}"] > str(target)
        else:
            cmd = gzip[f"-{level}", "-c", source] > str(target)

        cmd()
        return True

    def _compress(
        self, source: Path, target: Path, level: int = DEFAULT_COMPRESSION_LEVEL, overwrite: bool = True
    ) -> bool:
        """
        Compress `source` to `target` with gzip.

        Errors are not swallowed here: the public `compress` wrapper catches and reports them,
        which matches how `_decompress` behaves.

        Returns:
            bool: True on success, False when the target exists and `overwrite` is False.
        """
        if target.exists() and not overwrite:
            return False

        return self.gzip_compress(source, target, level=level)

    def gzip_decompress(self, source: Path, target: Path, _tar: str = "tar", _gunzip: str = "gunzip") -> bool:
        """
        Decompresses a gzipped file, or extracts a gzipped tar into the specified target directory.

        Args:
            source (Path): The path to the gzipped file.
            target (Path): The directory to extract into (.tar.gz / .tgz) or the output file (plain .gz).
            _tar (str): For internal usage
            _gunzip (str): For internal usage

        Returns:
            bool: True if the decompression and extraction were successful.

        Raises:
            Exception: whatever plumbum raises when a program is missing or exits unsuccessfully.
        """
        gunzip = local[_gunzip]
        tar = local[_tar]

        if ".tar" in source.suffixes or ".tgz" in source.suffixes:
            # tar gz
            target.mkdir(parents=True, exist_ok=True)
            cmd = tar["-xvf", source, "--strip-components=1", f"--use-compress-program={_gunzip}", "-C", target]
        else:
            # assume just a .gz; make sure the output file can be created
            target.parent.mkdir(parents=True, exist_ok=True)
            cmd = gunzip["-c", source] > str(target)

        cmd()
        return True

    def _decompress(self, source: Path, target: Path, overwrite: bool = True) -> bool:
        """
        Decompress `source` to `target` with gunzip.

        Returns:
            bool: True on success, False when the target exists and `overwrite` is False.
        """
        if target.exists() and not overwrite:
            return False

        return self.gzip_decompress(source, target)

    @classmethod
    def is_available(cls) -> bool:
        """
        Check if 'gzip' and 'gunzip' are available in the local context.

        Returns:
            bool: The return value is True if 'gzip' and 'gunzip' are found,
                  False otherwise.
        """
        try:
            # plain lookups instead of `assert`: asserts are stripped under `python -O`,
            # which would skip the lookup and report the tools as always available
            for program in ("gzip", "gunzip"):
                _ = local[program]
        except CommandNotFound:
            return False

        return True

    @classmethod
    def filepath(cls, filepath: str | Path) -> Path:
        """
        Return a Path object with either '.gz' or '.tgz' appended as file extension based on whether
        the provided file path is a file or not.

        Args:
            filepath (str | Path): The input file path in string or Path format.

        Returns:
            Path: The updated file path with appended file extension.
        """
        filepath = Path(filepath)
        extension = f"{filepath.suffix}.gz" if filepath.is_file() else ".tgz"
        return filepath.with_suffix(extension)
class Pigz(Gzip, extension=("tgz", "gz"), prio=2):
    """
    The Pigz class inherits from the Gzip base class.

    Its priority is higher than that of the base class, as indicated by the value 2.

    Pigz (Parallel Implementation of GZip) is a fully functional replacement for gzip
    that exploits multiple processors and multiple cores to the hilt when compressing data.
    Pigz can be a good choice when you're handling large amounts of data,
    and your machine has multiple cores/processors.

    Advantages of pigz over classic gzip:
    - Multithreading: Pigz can split the input data into chunks and process them in parallel.
                      This utilizes multiple cores on your machine,
                      leading to faster compression times.
    - Compatibility: Pigz maintains backward compatibility with gzip, so it can handle any file that gzip can.
    - Speed: In multi-core systems, pigz can be significantly faster than gzip
             because of its ability to process different parts of the data simultaneously.
    """

    def _compress(
        self, source: Path, target: Path, level: int = DEFAULT_COMPRESSION_LEVEL, overwrite: bool = True
    ) -> bool:
        """
        Compress `source` to `target`, using `pigz` as the gzip program.

        Returns:
            bool: True on success, False when the target exists and `overwrite` is False.
        """
        if target.exists() and not overwrite:
            return False

        return self.gzip_compress(source, target, level=level, _gzip="pigz")

    def _decompress(self, source: Path, target: Path, overwrite: bool = True) -> bool:
        """
        Decompress `source` to `target`, using `unpigz` as the gunzip program.

        Returns:
            bool: True on success, False when the target exists and `overwrite` is False.
        """
        if target.exists() and not overwrite:
            return False

        return self.gzip_decompress(source, target, _gunzip="unpigz")

    @classmethod
    def is_available(cls) -> bool:
        """
        Check if 'pigz' and 'unpigz' commands are available in the local environment.

        Returns:
            bool: The return value. True for success, False otherwise.
        """
        try:
            # plain lookups instead of `assert`: asserts are stripped under `python -O`,
            # which would skip the lookup and report the tools as always available
            for program in ("pigz", "unpigz"):
                _ = local[program]
        except CommandNotFound:
            return False

        return True