Coverage for src/edwh_files_plugin/files_plugin.py: 0%

111 statements  

« prev     ^ index     » next       coverage.py v7.6.10, created at 2025-02-28 11:02 +0100

1import json 

2import sys 

3import tempfile 

4import typing 

5from pathlib import Path 

6from typing import Optional 

7 

8import requests 

9from edwh import improved_task as task 

10from invoke import Context 

11 

12# rich.progress is fancier but much slower (100ms import) 

13# so use simpler progress library (also used by pip, before rich): 

14from progress.bar import ChargingBar 

15from requests_toolbelt.multipart.encoder import MultipartEncoder, MultipartEncoderMonitor 

16from rich import print # noqa: A004 

17from threadful import thread 

18from threadful.bonus import animate 

19 

20from edwh_files_plugin.compression import Compression 

21 

22DEFAULT_TRANSFERSH_SERVER = "https://files.edwh.nl" 

23 

24 

25def require_protocol(url: str) -> str: 

26 """ 

27 Make sure 'url' has an HTTP or HTTPS schema. 

28 """ 

29 return url if url.startswith(("http://", "https://")) else f"https://{url}" 

30 

31 

32def create_callback(encoder: MultipartEncoder) -> typing.Callable[[MultipartEncoderMonitor], None]: 

33 """ 

34 Creates a callback function for monitoring the progress of an upload. 

35 

36 Args: 

37 encoder (MultipartEncoder): The multipart encoder that is uploading the data. 

38 

39 Returns: 

40 A callback function that updates a progress bar based on the amount of data that has been read by the encoder. 

41 

42 Example: 

43 

44 import requests 

45 from requests_toolbelt.multipart.encoder import MultipartEncoder, MultipartEncoderMonitor 

46 

47 def my_callback(monitor: MultipartEncoder): 

48 print('bytes sent: {0}'.format(monitor.bytes_read)) 

49 

50 filename = 'my_file.txt' 

51 

52 with open(filename, 'rb') as f: 

53 encoder = MultipartEncoder( 

54 fields={ 

55 'file': (filename, f, "text/plain"), 

56 } 

57 ) 

58 

59 monitor = MultipartEncoderMonitor(encoder, my_callback) 

60 result = requests.post('http://some-url.com/upload', 

61 data=monitor, 

62 headers={'Content-Type': monitor.content_type}) 

63 

64 """ 

65 bar = ChargingBar("Uploading", max=encoder.len) 

66 

67 def callback(monitor: MultipartEncoderMonitor) -> None: 

68 # goto instead of next because chunk size is unknown 

69 bar.goto(monitor.bytes_read) 

70 

71 return callback 

72 

73 

74# auto = best for directory; none for files 

75# gzip: .tgz for directory; .gz for files. Pigz or Gzip based on availability 

76FullCompressionTypes: typing.TypeAlias = typing.Literal["auto", "gzip", "zip", "tgz", "gz", "none"] 

77CliCompressionTypes: typing.TypeAlias = typing.Literal["auto", "gzip", "zip", "none"] 

78 

79 

80def upload_file( 

81 url: str, 

82 filename: str, 

83 filepath: Path, 

84 headers: Optional[dict[str, typing.Any]] = None, 

85 compression: FullCompressionTypes = "auto", 

86) -> requests.Response: 

87 """ 

88 Upload a file to an url. 

89 """ 

90 if headers is None: 

91 headers = {} 

92 

93 with tempfile.TemporaryDirectory() as tmpdir: 

94 if compression != "auto": 

95 new_filepath = Path(tmpdir) / filepath.name 

96 filename = compress_directory( 

97 filepath, new_filepath, extension="gz" if compression == "gzip" else compression 

98 ) 

99 filepath = new_filepath 

100 

101 with filepath.open("rb") as f: 

102 encoder = MultipartEncoder( 

103 fields={ 

104 filename: (filename, f, "text/plain"), 

105 } 

106 ) 

107 

108 monitor = MultipartEncoderMonitor(encoder, create_callback(encoder)) 

109 return requests.post(url, data=monitor, headers=headers | {"Content-Type": monitor.content_type}) 

110 

111 

112@thread() 

113def _compress_directory(dir_path: str | Path, file_path: str | Path, extension: FullCompressionTypes = "auto") -> str: 

114 """ 

115 Compress a directory into a compressed (zip, gz) file. 

116 """ 

117 compressor = Compression.best() if extension == "auto" else Compression.for_extension(extension) 

118 

119 if not compressor: 

120 print(f"[red] No compression available for {extension} [/red]") 

121 print(f"[blue] Please choose one of : {Compression.available()}[/blue]") 

122 raise RuntimeError("Something went wrong during compression!") 

123 

124 if compressor.compress(dir_path, file_path): 

125 return compressor.filename(dir_path) 

126 else: 

127 raise RuntimeError("Something went wrong during compression!") 

128 

129 

130def compress_directory(dir_path: str | Path, file_path: str | Path, extension: FullCompressionTypes = "auto") -> str: 

131 """ 

132 Compress a directory into a compressed file (zip, gz) and show a spinning animation. 

133 """ 

134 return animate(_compress_directory(dir_path, file_path, extension), text=f"Compressing directory {dir_path}") 

135 

136 

137def upload_directory( 

138 url: str, 

139 filepath: Path, 

140 headers: Optional[dict[str, typing.Any]] = None, 

141 upload_filename: Optional[str] = None, 

142 compression: FullCompressionTypes = "auto", 

143) -> requests.Response: 

144 """ 

145 Zip a directory and upload it to an url. 

146 

147 Args: 

148 url: which transfer.sh server to use 

149 filepath: which directory to upload 

150 headers: upload options 

151 upload_filename: by default, the directory name with compression extension (e.g. .gz, .zip) will be used 

152 compression: which method for compression to use (or best available by default) 

153 """ 

154 filepath = filepath.expanduser().absolute() 

155 filename = filepath.resolve().name 

156 

157 with tempfile.TemporaryDirectory() as tmpdir: 

158 archive_path = Path(tmpdir) / filename 

159 compressed_filename = compress_directory( 

160 filepath, archive_path, extension="tgz" if compression == "gzip" else compression 

161 ) # -> filename.zip e.g. 

162 

163 upload_filename = upload_filename or compressed_filename 

164 

165 return upload_file(url, upload_filename, Path(archive_path), headers=headers) 

166 

167 

168@task(aliases=("add", "send")) 

169def upload( 

170 _: Context, 

171 filename: str | Path, 

172 server: str = DEFAULT_TRANSFERSH_SERVER, 

173 max_downloads: Optional[int] = None, 

174 max_days: Optional[int] = None, 

175 encrypt: Optional[str] = None, 

176 rename: Optional[str] = None, 

177 compression: CliCompressionTypes = "auto", # auto | pigz | gzip | zip 

178) -> None: 

179 """ 

180 Upload a file. 

181 

182 Args: 

183 _: invoke Context 

184 filename (str): path to the file to upload 

185 server (str): which transfer.sh server to use 

186 max_downloads (int): how often can the file be downloaded? 

187 max_days (int): how many days can the file be downloaded? 

188 encrypt (str): encryption password 

189 rename (str): upload the file/folder with a different name than it currently has 

190 compression (str): by default files are not compressed. 

191 For folders it will try pigz (.tgz), gzip (.tgz) then .zip. 

192 You can also explicitly specify a compression method for files and directory, 

193 and nothing else will be tried. 

194 """ 

195 headers: dict[str, str | int] = {} 

196 

197 if max_downloads: 

198 headers["Max-Downloads"] = max_downloads 

199 if max_days: 

200 headers["Max-Days"] = max_days 

201 if encrypt: 

202 headers["X-Encrypt-Password"] = encrypt 

203 

204 url = require_protocol(server) 

205 

206 filepath = Path(filename) 

207 

208 try: 

209 if filepath.is_dir(): 

210 response = upload_directory(url, filepath, headers, upload_filename=rename, compression=compression) 

211 else: 

212 response = upload_file(url, rename or str(filename), filepath, headers, compression=compression) 

213 except RuntimeError as e: 

214 print(f"[red] {e} [/red]") 

215 exit(1) 

216 

217 download_url = response.text.strip() 

218 delete_url = response.headers.get("x-url-delete") 

219 

220 print( 

221 json.dumps( 

222 { 

223 "status": response.status_code, 

224 "url": download_url, 

225 "delete": delete_url, 

226 "download_command": f"edwh file.download {download_url}", 

227 "delete_command": f"edwh file.delete {delete_url}", 

228 }, 

229 indent=2, 

230 ), 

231 ) 

232 

233 

234@task(aliases=("get", "receive")) 

235def download( 

236 _: Context, 

237 download_url: str, 

238 output_file: Optional[str | Path] = None, 

239 decrypt: Optional[str] = None, 

240 unpack: bool = False, 

241) -> None: 

242 """ 

243 Download a file. 

244 

245 Args: 

246 _ (Context) 

247 download_url (str): file to download 

248 output_file (str): path to store the file in 

249 decrypt (str): decryption token 

250 unpack (bool): unpack archive to file(s), removing the archive afterwards 

251 """ 

252 if output_file is None: 

253 output_file = download_url.split("/")[-1] 

254 output_path = Path(output_file) 

255 

256 download_url = require_protocol(download_url) 

257 

258 headers = {} 

259 if decrypt: 

260 headers["X-Decrypt-Password"] = decrypt 

261 

262 response = requests.get(download_url, headers=headers, stream=True) 

263 

264 if response.status_code >= 400: 

265 print("[red] Something went wrong: [/red]", response.status_code, response.content.decode(), file=sys.stderr) 

266 return 

267 

268 total = int(response.headers["Content-Length"]) // 1024 

269 with ( 

270 output_path.open("wb") as f, 

271 ): # <- open file when we're sure the status code is successful! 

272 for chunk in ChargingBar("Downloading", max=total).iter(response.iter_content(chunk_size=1024)): 

273 f.write(chunk) 

274 

275 if unpack: 

276 do_unpack(_, str(output_path), remove=True) 

277 

278 

279@task(aliases=("remove",)) 

280def delete(_: Context, deletion_url: str) -> None: 

281 """ 

282 Delete an uploaded file. 

283 

284 Args: 

285 _ (Context) 

286 deletion_url (str): File url + deletion token (from `x-url-delete`, shown in file.upload output) 

287 """ 

288 deletion_url = require_protocol(deletion_url) 

289 

290 response = requests.delete(deletion_url, timeout=15) 

291 

292 print( 

293 { 

294 "status": response.status_code, 

295 "response": response.text.strip(), 

296 } 

297 ) 

298 

299 

300@task(name="unpack") 

301def do_unpack(_: Context, filename: str, remove: bool = False) -> None: 

302 """ 

303 Decompress a given file. 

304 

305 Args: 

306 _: invoke Context 

307 filename (str): Path of the file to be decompressed 

308 remove (bool, optional): If True, original compressed file will be deleted after decompression. 

309 Defaults to False. 

310 

311 Returns: 

312 None 

313 """ 

314 filepath = Path(filename) 

315 ext = filepath.suffix 

316 

317 compressor = Compression.for_extension(ext) 

318 

319 if compressor and compressor.decompress(filepath, filepath.with_suffix("")): 

320 if remove: 

321 filepath.unlink() 

322 else: 

323 print("[red] Something went wrong unpacking! [/red]")