Coverage for src/edwh_files_plugin/compression.py: 81%

217 statements  

« prev     ^ index     » next       coverage.py v7.6.10, created at 2025-02-28 11:02 +0100

1import abc 

2import os 

3import shutil 

4import typing 

5import warnings 

6from pathlib import Path 

7from subprocess import run 

8from typing import Optional, Self 

9 

10from plumbum import local 

11from plumbum.commands.processes import CommandNotFound 

12from rich import print # noqa: A004 

13 

14PathLike: typing.TypeAlias = str | Path 

15 

16DEFAULT_COMPRESSION_LEVEL = 5 

17 

18 

def run_ok(command: str) -> bool:
    """
    Executes a command and returns whether it ended successfully (with return code 0).

    The command string is tokenised with shell-like rules (``shlex.split``), so quoted
    arguments that contain spaces stay intact and repeated spaces do not produce empty
    arguments. No shell is involved: the program is executed directly and both its
    stdout and stderr are discarded.

    Args:
        command (str): The command to run.

    Returns:
        bool: True if the command ended successfully, False otherwise. An empty command
            or a program that cannot be started also counts as a failure.
    """
    # local imports: keeps this function self-contained without touching the module's import block
    import shlex
    from subprocess import DEVNULL

    argv = shlex.split(command)
    if not argv:
        # nothing to execute; subprocess would raise on an empty argument list
        return False

    try:
        return run(argv, stdout=DEVNULL, stderr=DEVNULL).returncode == 0
    except OSError:
        # e.g. FileNotFoundError / PermissionError: the program could not be started at all
        return False

31 

32 

def is_installed(program: str) -> bool:
    """
    Checks if a given program is installed on the system.

    The lookup is done in-process with ``shutil.which`` (searching the directories on PATH),
    so it does not depend on an external ``which`` binary being present.

    Args:
        program (str): The name of the program to check.

    Returns:
        bool: True if the program is installed, False otherwise.
    """
    return shutil.which(program) is not None

44 

45 

46# FileLike: typing.TypeAlias = PathLike | typing.BinaryIO | typing.TextIO 

47# def filelike_to_binaryio(fl: FileLike) -> typing.BinaryIO: ... 

48 

49 

class Compression(abc.ABC):
    """
    Abstract base class for compression back-ends, doubling as a registry of them.

    Subclasses register themselves through class keyword arguments, e.g.
    ``class Zip(Compression, extension="zip", prio=1)``. ``extension`` may be a single
    extension or a tuple of extensions (without leading dot); ``prio`` decides which
    implementation wins when several are available (higher is preferred).
    """

    # (priority, extension) -> implementing class; populated by __init_subclass__
    _registrations: dict[tuple[int, str], typing.Type[Self]] = {}
    extension: str | tuple[str, ...]

    def __init_subclass__(cls, extension: str | tuple[str, ...] = "", prio: int = 0):
        """
        Register the new subclass for every extension it declares.

        Args:
            extension: one extension or a tuple of extensions handled by the subclass.
            prio: priority of the subclass; higher values are preferred.
        """
        super().__init_subclass__()
        cls.extension = extension

        if not extension:
            # Actually ignore the class, as the warning promises: it is not added to the registry.
            warnings.warn("Defined compression algorithm without extension, it will be ignored.", stacklevel=2)
            return

        extensions = (extension,) if isinstance(extension, str) else extension
        for ext in extensions:
            Compression._registrations[(prio, ext)] = cls

    @abc.abstractmethod
    def _compress(
        self, source: Path, target: Path, level: int = DEFAULT_COMPRESSION_LEVEL, overwrite: bool = True
    ) -> bool:
        """
        Compresses the source file or directory to the target location.

        Args:
            source (Path): Path to the source file or directory to compress.
            target (Path): Path where the compressed file will be saved.
            level (int, optional): Compression level (1-9), where higher numbers indicate higher compression.
                Defaults to 5.
            overwrite (bool, optional): Whether to overwrite the target file if it already exists. Defaults to True.
        """

    def compress(
        self,
        source: PathLike,
        target: Optional[PathLike] = None,
        level: int = DEFAULT_COMPRESSION_LEVEL,
        overwrite: bool = True,
    ) -> bool:
        """
        Compress `source` into `target`, reporting (not raising) any failure.

        Args:
            source: file or directory to compress; `~` is expanded and the path is made absolute.
            target: output path; when omitted it is derived from `source` via `filepath()`.
            level: compression level passed on to the implementation.
            overwrite: whether an existing target may be replaced.

        Returns:
            bool: the result of the implementation, or False when it raised an exception
                (the exception is printed).
        """
        source = Path(source).expanduser().absolute()
        target = self.filepath(source) if target is None else Path(target)

        try:
            return self._compress(
                source,
                target,
                level=level,
                overwrite=overwrite,
            )
        except Exception as e:
            print("[red] Something went wrong during compression [/red]")
            print(e)
            return False

    @abc.abstractmethod
    def _decompress(self, source: Path, target: Path, overwrite: bool = True) -> bool:
        """
        Decompresses the source file to the target location.

        Args:
            source (Path): Path to the compressed file.
            target (Path): Path where the decompressed contents will be saved.
            overwrite (bool, optional): Whether to overwrite the target files if they already exist. Defaults to True.
        """

    def decompress(self, source: PathLike, target: Optional[PathLike] = None, overwrite: bool = True) -> bool:
        """
        Decompress `source` into `target`, reporting (not raising) any failure.

        Args:
            source: compressed file; `~` is expanded and the path is made absolute.
            target: output path. When omitted and `source` ends in .tgz/.tar/.gz/.zip, the last
                extension is dropped (`data.txt.gz` -> `data.txt`, `folder.tgz` -> `folder`);
                for any other extension the source path itself is used.
            overwrite: whether existing output may be replaced.

        Returns:
            bool: the result of the implementation, or False when it raised an exception
                (the exception is printed).
        """
        source = Path(source).expanduser().absolute()

        if target is not None:
            target = Path(target)
        elif source.suffix in (".tgz", ".tar", ".gz", ".zip"):
            # Strip only the last extension (e.g. .tgz) and retain any other one (.txt).
            # `with_suffix("")` does exactly that; re-joining `suffixes` by hand doubles the dot.
            target = source.with_suffix("")
        else:
            target = source

        try:
            return self._decompress(
                source,
                target,
                overwrite=overwrite,
            )
        except Exception as e:
            print("[red] Something went wrong during decompression [/red]")
            print(e)
            return False

    @classmethod
    @abc.abstractmethod
    def is_available(cls) -> bool:
        """
        Checks if the required compression tool is available.

        Returns:
            bool: True if the compression tool is available, False otherwise.
        """

    @classmethod
    def registrations(
        cls, extension_filter: Optional[str] = None
    ) -> list[tuple[tuple[int, str], typing.Type["Compression"]]]:
        """
        List the registered implementations that are currently available.

        Args:
            extension_filter: when given, only registrations for exactly this extension are returned.

        Returns:
            ((priority, extension), class) pairs, sorted descending by (priority, extension),
            so the preferred implementation comes first.
        """
        return sorted(
            (
                (key, CompressionClass)
                for (key, CompressionClass) in cls._registrations.items()
                if CompressionClass.is_available() and extension_filter in (None, key[1])
            ),
            key=lambda registration: registration[0],
            reverse=True,
        )

    @classmethod
    def available(cls) -> set[str]:
        """
        Return every registered extension (whether or not its tool is currently usable).
        """
        return {extension for (_, extension) in cls._registrations}

    @classmethod
    def best(cls) -> Self | None:
        """
        Find the absolute best (by priority) available compression method.
        """
        if registrations := cls.registrations():
            CompressionClass = registrations[0][1]  # noqa: N806
            return typing.cast(Self, CompressionClass())

        return None

    @classmethod
    def for_extension(cls, extension: str) -> Self | None:
        """
        Find the best (by priority) available compression method for a specific extension (zip, gz).

        Surrounding whitespace and dots are ignored, so "gz", ".gz" and " .gz " are equivalent.
        """
        # whitespace first, then dots: otherwise a padded " .gz " would keep its dot
        normalized = extension.strip().strip(".").strip()
        if registrations := cls.registrations(normalized):
            CompressionClass = registrations[0][1]  # noqa: N806
            return typing.cast(Self, CompressionClass())

        return None

    @classmethod
    def filepath(cls, filepath: str | Path) -> Path:
        """
        Generate an output filepath with the right extension

        For an existing file the compression extension is appended to the current suffix
        (`a.txt` -> `a.txt.zip`); for anything else the suffix is replaced. When the class
        registered several extensions, the first one is used.
        """
        filepath = Path(filepath)
        # `extension` may be a tuple; formatting it directly would leak the tuple repr into the name
        ext = cls.extension if isinstance(cls.extension, str) else cls.extension[0]
        extension = f"{filepath.suffix}.{ext}" if filepath.is_file() else f".{ext}"
        return filepath.with_suffix(extension)

    @classmethod
    def filename(cls, filepath: str | Path) -> str:
        """
        Generate an output filename with the right extension
        """
        return cls.filepath(filepath).name

207 

208 

class Nocompression(Compression, extension=("none", "tar"), prio=0):
    """
    Pass-through "compression": directories are bundled into a plain tar archive,
    single files are copied as-is. The compression level is ignored.
    """

    def _compress(
        self, source: Path, target: Path, level: int = DEFAULT_COMPRESSION_LEVEL, overwrite: bool = True
    ) -> bool:
        """
        Store `source` at `target` without compressing it.

        Args:
            source (Path): file or directory to store.
            target (Path): output path (a tar archive when `source` is a directory).
            level (int, optional): unused, accepted for interface compatibility.
            overwrite (bool, optional): whether an existing target may be replaced.

        Returns:
            bool: False when the target exists and `overwrite` is False, True otherwise.
        """
        if source.is_dir():
            if target.exists() and not overwrite:
                return False

            tar = local["tar"]
            # `-C parent name` stores only the folder name in the archive, not the whole path
            cmd = tar["-cf", "-", "-C", source.parent, source.name] > str(target)
            cmd()
        elif source.resolve() != target.resolve():
            # compare resolved paths: a relative target pointing at the source must not reach copyfile
            if target.exists() and not overwrite:
                return False

            shutil.copyfile(source, target)
        # else: nothing to do

        return True

    def _decompress(self, source: Path, target: Path, overwrite: bool = True) -> bool:
        """
        Unpack a tar archive into `target`, or copy a plain file to `target`.

        Args:
            source (Path): `.tar` archive or plain file.
            target (Path): output directory (for archives) or output file.
            overwrite (bool, optional): whether existing output may be replaced.

        Returns:
            bool: False when the target exists and `overwrite` is False, True otherwise.
        """
        if source.suffix == ".tar":
            if target.exists() and not overwrite:
                return False

            # `tar -C` needs an existing directory
            target.mkdir(parents=True, exist_ok=True)
            tar = local["tar"]
            # drop the top-level folder stored by `_compress` so contents land directly in `target`
            cmd = tar["-xvf", source, "--strip-components=1", "-C", target]
            cmd()
        elif source.resolve() != target.resolve():
            if target.exists() and not overwrite:
                return False

            shutil.copyfile(source, target)
        # else: nothing to do

        return True

    @classmethod
    def is_available(cls) -> bool:
        """
        Always reports True: this back-end is treated as unconditionally available.
        """
        return True

    @classmethod
    def filepath(cls, filepath: str | Path) -> Path:
        """
        Directories get a `.tar` suffix; files keep their path unchanged.
        """
        filepath = Path(filepath)
        if filepath.is_dir():
            return filepath.with_suffix(".tar")
        else:
            return filepath

245 

class Zip(Compression, extension="zip"):
    """
    Zip back-end built on the standard library `zipfile` module (deflate compression).
    """

    def _compress(
        self, source: Path, target: Path, level: int = DEFAULT_COMPRESSION_LEVEL, overwrite: bool = True
    ) -> bool:
        """
        Write `source` (a file or a directory tree) into the zip archive `target`.

        For a directory, every regular file below it is stored under its path relative to
        `source`; directory entries themselves are not stored.

        Args:
            source (Path): file or directory to compress.
            target (Path): path of the zip archive to create.
            level (int, optional): deflate level passed to `ZipFile` as `compresslevel`.
            overwrite (bool, optional): whether an existing target may be replaced.

        Returns:
            bool: False when the target exists and `overwrite` is False, True otherwise.
        """
        from zipfile import ZIP_DEFLATED, ZipFile

        if target.exists() and not overwrite:
            return False

        with ZipFile(target, "w", compression=ZIP_DEFLATED, compresslevel=level) as zip_object:
            if source.is_dir():
                # Traverse all files in directory
                for file_path in source.rglob("*"):
                    if not file_path.is_file():
                        continue

                    if file_path.samefile(target):
                        # the archive itself lives inside `source`: never feed it into itself
                        continue

                    # Add files to zip file with the correct relative path
                    arcname = file_path.relative_to(source)
                    zip_object.write(file_path, arcname)
            else:
                zip_object.write(source, source.name)

        return True

    def _decompress(self, source: Path, target: Path, overwrite: bool = True) -> bool:
        """
        Extract the zip archive `source` to `target`.

        An archive holding exactly one file is extracted to `target` as a file (or into
        `target` when that is an existing directory); any other archive is extracted into
        `target` as a directory.

        Args:
            source (Path): zip archive to read.
            target (Path): output file or directory.
            overwrite (bool, optional): when False, existing files are left untouched
                (single-file archives then return False, other archives skip those members).

        Returns:
            bool: False when `source` is not an existing file or a single-file target may not
                be overwritten, True otherwise.
        """
        if not source.exists() or not source.is_file():
            return False

        from zipfile import ZipFile

        with ZipFile(source, "r") as zip_object:
            namelist = zip_object.namelist()

            # Check if the archive contains exactly one file
            if len(namelist) == 1 and not namelist[0].endswith("/"):
                # The archive contains a single file; treat target as a file
                first_file = namelist[0]

                # If the target is a directory, ensure we create the file inside
                if target.is_dir():
                    target = target / Path(first_file).name

                # Handle overwrite behavior
                if target.exists() and not overwrite:
                    return False

                # Ensure the parent directory exists
                target.parent.mkdir(parents=True, exist_ok=True)

                # Stream the single file to the target instead of loading it into memory at once
                with zip_object.open(first_file) as member_stream, target.open("wb") as output:
                    shutil.copyfileobj(member_stream, output)

            else:
                # Treat target as a directory and extract all files
                target.mkdir(parents=True, exist_ok=True)

                for member in namelist:
                    # Resolve full path of the extracted file
                    file_path = target / member

                    # Check if file already exists and handle overwrite
                    if file_path.exists() and not overwrite:
                        continue

                    # Ensure parent directories exist
                    file_path.parent.mkdir(parents=True, exist_ok=True)

                    # Extract the file
                    zip_object.extract(member, target)

        return True

    @classmethod
    def is_available(cls) -> bool:
        """
        Report whether the `zipfile` module can be imported.
        """
        try:
            import zipfile  # noqa: F401

            return True
        except ImportError:
            return False

326 

327 

class Gzip(Compression, extension=("tgz", "gz"), prio=1):
    """
    Gzip back-end driving the `gzip`/`gunzip` (and, for directories, `tar`) command line tools.
    """

    def gzip_compress(
        self, source: Path, target: Path, level: int = DEFAULT_COMPRESSION_LEVEL, _tar: str = "tar", _gzip: str = "gzip"
    ) -> bool:
        """
        Compress data using gzip.

        This function compresses data from a source to a target path using the gzip tool.
        A directory is first bundled with tar (storing only the folder name, not its whole
        path) and the tar stream is then compressed.

        Args:
            source (Path): Path to the file or directory to be compressed.
            target (Path): Path where the compressed data will be saved.
            level (int): compression level passed to the tool as `-<level>`, where 1 is
                fastest and 9 is strongest but slowest.
                Defaults to DEFAULT_COMPRESSION_LEVEL.
            _tar (str): For internal usage
            _gzip (str): For internal usage

        Returns:
            bool: True if compression was successful; a failing command raises instead.
        """
        tar = local[_tar]
        gzip = local[_gzip]

        if source.is_dir():
            # .tar.gz
            # `-C parent name` stores only the folder name in the tar, not the whole path
            cmd = tar["-cf", "-", "-C", source.parent, source.name] | gzip[f"-{level}"] > str(target)
        else:
            cmd = gzip[f"-{level}", "-c", source] > str(target)

        cmd()
        return True

    def _compress(
        self, source: Path, target: Path, level: int = DEFAULT_COMPRESSION_LEVEL, overwrite: bool = True
    ) -> bool:
        """
        Compress `source` to `target` with gzip.

        Errors are deliberately not swallowed here: they propagate to `compress()`, which
        reports them and returns False, so the cause of a failure stays visible.

        Returns:
            bool: False when the target exists and `overwrite` is False, True otherwise.
        """
        if target.exists() and not overwrite:
            return False

        self.gzip_compress(source, target, level=level)
        return True

    def gzip_decompress(self, source: Path, target: Path, _tar: str = "tar", _gunzip: str = "gunzip") -> bool:
        """
        Decompresses a gzipped file and extracts it into the specified target.

        Args:
            source (Path): The path to the gzipped file.
            target (Path): The directory to extract a tar archive into (created if missing),
                or the output file for a plain `.gz`.
            _tar (str): For internal usage
            _gunzip (str): For internal usage

        Returns:
            bool: True if the decompression and extraction were successful; a failing command raises instead.
        """
        gunzip = local[_gunzip]
        tar = local[_tar]

        if ".tar" in source.suffixes or ".tgz" in source.suffixes:
            # tar gz
            target.mkdir(parents=True, exist_ok=True)
            # drop the top-level folder stored at compression time so contents land directly in `target`
            cmd = tar["-xvf", source, "--strip-components=1", f"--use-compress-program={_gunzip}", "-C", target]
        else:
            # assume just a .gz
            cmd = gunzip["-c", source] > str(target)

        cmd()
        return True

    def _decompress(self, source: Path, target: Path, overwrite: bool = True) -> bool:
        """
        Decompress `source` to `target` with gunzip.

        Returns:
            bool: False when the target exists and `overwrite` is False, True otherwise.
        """
        if target.exists() and not overwrite:
            return False

        self.gzip_decompress(source, target)
        return True

    @classmethod
    def is_available(cls) -> bool:
        """
        Check if 'gzip' and 'gunzip' are available in the local context.

        Returns:
            bool: The return value is True if 'gzip' and 'gunzip' are found,
                  False otherwise.
        """
        try:
            # Plain lookups instead of `assert`: assertions are stripped under `python -O`,
            # which would skip the lookups and report the tools as always available.
            for program in ("gzip", "gunzip"):
                _ = local[program]
        except CommandNotFound:
            return False

        return True

    @classmethod
    def filepath(cls, filepath: str | Path) -> Path:
        """
        Return a Path object with either '.gz' or '.tgz' appended as file extension based on whether
        the provided file path is a file or not.

        Args:
            filepath (str | Path): The input file path in string or Path format.

        Returns:
            Path: The updated file path with appended file extension.
        """
        filepath = Path(filepath)
        extension = f"{filepath.suffix}.gz" if filepath.is_file() else ".tgz"
        return filepath.with_suffix(extension)

436 

437 

class Pigz(Gzip, extension=("tgz", "gz"), prio=2):
    """
    The Pigz class inherits from the Gzip base class.

    Its priority is higher than that of the base class, as indicated by the value 2.

    Pigz (Parallel Implementation of GZip) is a fully functional replacement for gzip
    that exploits multiple processors and multiple cores to the hilt when compressing data.
    Pigz can be a good choice when you're handling large amounts of data,
    and your machine has multiple cores/processors.

    Advantages of pigz over classic gzip:
    - Multithreading: Pigz can split the input data into chunks and process them in parallel.
                      This utilizes multiple cores on your machine,
                      leading to faster compression times.
    - Compatibility: Pigz maintains backward compatibility with gzip, so it can handle any file that gzip can.
    - Speed: In multi-core systems, pigz can be significantly faster than gzip
             because of its ability to process different parts of the data simultaneously.
    """

    def _compress(
        self, source: Path, target: Path, level: int = DEFAULT_COMPRESSION_LEVEL, overwrite: bool = True
    ) -> bool:
        """
        Compress `source` to `target` using `pigz` in place of `gzip`.

        Returns:
            bool: False when the target exists and `overwrite` is False, True otherwise.
        """
        if target.exists() and not overwrite:
            return False

        self.gzip_compress(source, target, level=level, _gzip="pigz")
        return True

    def _decompress(self, source: Path, target: Path, overwrite: bool = True) -> bool:
        """
        Decompress `source` to `target` using `unpigz` in place of `gunzip`.

        Returns:
            bool: False when the target exists and `overwrite` is False, True otherwise.
        """
        if target.exists() and not overwrite:
            return False

        self.gzip_decompress(source, target, _gunzip="unpigz")
        return True

    @classmethod
    def is_available(cls) -> bool:
        """
        Check if 'pigz' and 'unpigz' commands are available in the local environment.

        Returns:
            bool: The return value. True for success, False otherwise.
        """
        try:
            # Plain lookups instead of `assert`: assertions are stripped under `python -O`,
            # which would skip the lookups and report the tools as always available.
            for program in ("pigz", "unpigz"):
                _ = local[program]
        except CommandNotFound:
            return False

        return True