Coverage for src/edwh_files_plugin/files_plugin.py: 0%
111 statements
« prev ^ index » next coverage.py v7.6.10, created at 2025-02-28 11:02 +0100
« prev ^ index » next coverage.py v7.6.10, created at 2025-02-28 11:02 +0100
1import json
2import sys
3import tempfile
4import typing
5from pathlib import Path
6from typing import Optional
8import requests
9from edwh import improved_task as task
10from invoke import Context
12# rich.progress is fancier but much slower (100ms import)
13# so use simpler progress library (also used by pip, before rich):
14from progress.bar import ChargingBar
15from requests_toolbelt.multipart.encoder import MultipartEncoder, MultipartEncoderMonitor
16from rich import print # noqa: A004
17from threadful import thread
18from threadful.bonus import animate
20from edwh_files_plugin.compression import Compression
# Server used by the `upload` task when no explicit `server` argument is given.
DEFAULT_TRANSFERSH_SERVER = "https://files.edwh.nl"
def require_protocol(url: str) -> str:
    """
    Make sure 'url' has an HTTP or HTTPS schema.

    URL schemes are case-insensitive, so the check ignores casing: 'HTTPS://host' is
    returned untouched instead of being turned into 'https://HTTPS://host'.

    Args:
        url: server or file address, with or without a scheme.

    Returns:
        'url' unchanged when it already starts with http:// or https:// (any casing),
        otherwise 'url' prefixed with 'https://'.
    """
    has_scheme = url.lower().startswith(("http://", "https://"))
    return url if has_scheme else f"https://{url}"
def create_callback(encoder: MultipartEncoder) -> typing.Callable[[MultipartEncoderMonitor], None]:
    """
    Build a progress callback for a multipart upload.

    An "Uploading" ChargingBar is created with `encoder.len` as its maximum; the returned
    function moves that bar to the monitor's `bytes_read` each time it is called.

    Args:
        encoder (MultipartEncoder): the multipart encoder whose `len` sizes the progress bar.

    Returns:
        A function taking a MultipartEncoderMonitor, meant to be passed as the callback of
        `MultipartEncoderMonitor(encoder, callback)`.

    Example:

        with open(filename, 'rb') as f:
            encoder = MultipartEncoder(fields={'file': (filename, f, "text/plain")})
            monitor = MultipartEncoderMonitor(encoder, create_callback(encoder))
            result = requests.post('http://some-url.com/upload',
                                   data=monitor,
                                   headers={'Content-Type': monitor.content_type})
    """
    upload_bar = ChargingBar("Uploading", max=encoder.len)

    def report_progress(monitor: MultipartEncoderMonitor) -> None:
        # the chunk size is unknown, so jump to the absolute position instead of stepping
        upload_bar.goto(monitor.bytes_read)

    return report_progress
# auto = best for directory; none for files
# gzip: .tgz for directory; .gz for files. Pigz or Gzip based on availability
# FullCompressionTypes: values accepted by the helpers in this module
# (upload_file, upload_directory, compress_directory).
FullCompressionTypes: typing.TypeAlias = typing.Literal["auto", "gzip", "zip", "tgz", "gz", "none"]
# CliCompressionTypes: the narrower set offered as the `compression` option of the `upload` task.
CliCompressionTypes: typing.TypeAlias = typing.Literal["auto", "gzip", "zip", "none"]
def upload_file(
    url: str,
    filename: str,
    filepath: Path,
    headers: Optional[dict[str, typing.Any]] = None,
    compression: FullCompressionTypes = "auto",
) -> requests.Response:
    """
    POST a single file to 'url' as a multipart upload, showing a progress bar.

    Args:
        url: address to post to
        filename: name used both as multipart field name and as uploaded file name;
            replaced by the compressor's file name when the file is compressed first
        filepath: location of the file on disk
        headers: extra request headers; a Content-Type header for the multipart body is always added
        compression: "auto" uploads the file as-is; any other value compresses the file into a
            temporary directory first ("gzip" is passed on as extension "gz")

    Returns:
        The response of the POST request.

    NOTE(review): "none" is not "auto", so it is also handed to compress_directory —
    confirm that Compression.for_extension("none") is supported.
    """
    base_headers = {} if headers is None else headers

    with tempfile.TemporaryDirectory() as scratch_dir:
        if compression != "auto":
            extension = "gz" if compression == "gzip" else compression
            compressed_path = Path(scratch_dir) / filepath.name
            filename = compress_directory(filepath, compressed_path, extension=extension)
            filepath = compressed_path

        # upload while the temporary directory (and thus a compressed copy) still exists
        with filepath.open("rb") as handle:
            encoder = MultipartEncoder(fields={filename: (filename, handle, "text/plain")})
            monitor = MultipartEncoderMonitor(encoder, create_callback(encoder))
            request_headers = {**base_headers, "Content-Type": monitor.content_type}
            return requests.post(url, data=monitor, headers=request_headers)
@thread()
def _compress_directory(dir_path: str | Path, file_path: str | Path, extension: FullCompressionTypes = "auto") -> str:
    """
    Compress 'dir_path' into 'file_path' and return the compressor's file name for it.

    With extension "auto" the compressor comes from `Compression.best()`, otherwise from
    `Compression.for_extension(extension)`.

    Raises:
        RuntimeError: when no compressor is available for 'extension' (the available options
            are printed first), or when the compressor reports failure.
    """
    if extension == "auto":
        compressor = Compression.best()
    else:
        compressor = Compression.for_extension(extension)

    if not compressor:
        print(f"[red] No compression available for {extension} [/red]")
        print(f"[blue] Please choose one of : {Compression.available()}[/blue]")
        raise RuntimeError("Something went wrong during compression!")

    if not compressor.compress(dir_path, file_path):
        raise RuntimeError("Something went wrong during compression!")

    return compressor.filename(dir_path)
def compress_directory(dir_path: str | Path, file_path: str | Path, extension: FullCompressionTypes = "auto") -> str:
    """
    Compress 'dir_path' into 'file_path' (zip, gz) while an animation is shown.

    The actual work is started via the thread-decorated `_compress_directory`; its result is
    handed to `animate`, whose return value is returned.
    """
    pending_compression = _compress_directory(dir_path, file_path, extension)
    return animate(pending_compression, text=f"Compressing directory {dir_path}")
def upload_directory(
    url: str,
    filepath: Path,
    headers: Optional[dict[str, typing.Any]] = None,
    upload_filename: Optional[str] = None,
    compression: FullCompressionTypes = "auto",
) -> requests.Response:
    """
    Compress a directory into a temporary archive and upload that archive to an url.

    Args:
        url: which transfer.sh server to use
        filepath: which directory to upload
        headers: upload options
        upload_filename: name to upload the archive as; defaults to the name returned by the
            compressor (directory name with compression extension, e.g. .gz, .zip)
        compression: which method for compression to use (or best available by default);
            "gzip" is passed on as extension "tgz"

    Returns:
        The response of the upload request.
    """
    directory = filepath.expanduser().absolute()
    directory_name = directory.resolve().name
    extension = "tgz" if compression == "gzip" else compression

    with tempfile.TemporaryDirectory() as scratch_dir:
        archive_path = Path(scratch_dir) / directory_name
        # the compressor tells us the final name, e.g. <directory>.zip
        archive_name = compress_directory(directory, archive_path, extension=extension)
        target_name = upload_filename or archive_name

        # the archive is already compressed, so upload_file keeps its default "auto"
        return upload_file(url, target_name, Path(archive_path), headers=headers)
@task(aliases=("add", "send"))
def upload(
    _: Context,
    filename: str | Path,
    server: str = DEFAULT_TRANSFERSH_SERVER,
    max_downloads: Optional[int] = None,
    max_days: Optional[int] = None,
    encrypt: Optional[str] = None,
    rename: Optional[str] = None,
    compression: CliCompressionTypes = "auto",  # auto | gzip | zip | none
) -> None:
    """
    Upload a file.

    Prints a JSON document with the response status, the download url, the delete url and
    ready-to-use download/delete commands. Exits with status 1 when compression fails.

    Args:
        _: invoke Context
        filename (str): path to the file to upload
        server (str): which transfer.sh server to use
        max_downloads (int): how often can the file be downloaded?
        max_days (int): how many days can the file be downloaded?
        encrypt (str): encryption password
        rename (str): upload the file/folder with a different name than it currently has
        compression (str): by default files are not compressed.
            For folders it will try pigz (.tgz), gzip (.tgz) then .zip.
            You can also explicitly specify a compression method for files and directory,
            and nothing else will be tried.
    """
    headers: dict[str, str] = {}

    # requests only accepts str/bytes header values, so numeric options are stringified
    if max_downloads:
        headers["Max-Downloads"] = str(max_downloads)
    if max_days:
        headers["Max-Days"] = str(max_days)
    if encrypt:
        headers["X-Encrypt-Password"] = encrypt

    url = require_protocol(server)

    filepath = Path(filename)

    try:
        if filepath.is_dir():
            response = upload_directory(url, filepath, headers, upload_filename=rename, compression=compression)
        else:
            response = upload_file(url, rename or str(filename), filepath, headers, compression=compression)
    except RuntimeError as e:
        print(f"[red] {e} [/red]")
        # sys.exit instead of the `exit` helper, which only exists when the site module is loaded
        sys.exit(1)

    # the response body holds the download url; the delete url is sent as a header
    download_url = response.text.strip()
    delete_url = response.headers.get("x-url-delete")

    print(
        json.dumps(
            {
                "status": response.status_code,
                "url": download_url,
                "delete": delete_url,
                "download_command": f"edwh file.download {download_url}",
                "delete_command": f"edwh file.delete {delete_url}",
            },
            indent=2,
        ),
    )
@task(aliases=("get", "receive"))
def download(
    _: Context,
    download_url: str,
    output_file: Optional[str | Path] = None,
    decrypt: Optional[str] = None,
    unpack: bool = False,
) -> None:
    """
    Download a file.

    On an HTTP status of 400 or higher the status and response body are printed to stderr
    and nothing is written to disk.

    Args:
        _ (Context)
        download_url (str): file to download
        output_file (str): path to store the file in; defaults to the last path segment of the url
        decrypt (str): decryption token
        unpack (bool): unpack archive to file(s), removing the archive afterwards
    """
    if output_file is None:
        output_file = download_url.split("/")[-1]
    output_path = Path(output_file)

    download_url = require_protocol(download_url)

    headers = {}
    if decrypt:
        headers["X-Decrypt-Password"] = decrypt

    # context manager makes sure the streamed connection is released on every path
    with requests.get(download_url, headers=headers, stream=True) as response:
        if response.status_code >= 400:
            print(
                "[red] Something went wrong: [/red]", response.status_code, response.content.decode(), file=sys.stderr
            )
            return

        # Content-Length may be absent (e.g. chunked responses); fall back to an unsized bar
        content_length = int(response.headers.get("Content-Length", 0))
        # round up: a trailing partial chunk still counts as one iteration
        total = -(-content_length // 1024)

        with output_path.open("wb") as f:  # <- open file when we're sure the status code is successful!
            for chunk in ChargingBar("Downloading", max=total).iter(response.iter_content(chunk_size=1024)):
                f.write(chunk)

    if unpack:
        do_unpack(_, str(output_path), remove=True)
@task(aliases=("remove",))
def delete(_: Context, deletion_url: str) -> None:
    """
    Delete an uploaded file.

    Sends a DELETE request (15 second timeout) and prints the status code together with
    the stripped response text.

    Args:
        _ (Context)
        deletion_url (str): File url + deletion token (from `x-url-delete`, shown in file.upload output)
    """
    response = requests.delete(require_protocol(deletion_url), timeout=15)

    outcome = {
        "status": response.status_code,
        "response": response.text.strip(),
    }
    print(outcome)
@task(name="unpack")
def do_unpack(_: Context, filename: str, remove: bool = False) -> None:
    """
    Decompress a given file.

    The compressor is looked up by the file's suffix and the output is written to the
    same path with that suffix removed. An error is printed when no compressor is found or
    decompression reports failure.

    Args:
        _: invoke Context
        filename (str): Path of the file to be decompressed
        remove (bool, optional): If True, original compressed file will be deleted after
            successful decompression. Defaults to False.

    Returns:
        None
    """
    archive = Path(filename)
    compressor = Compression.for_extension(archive.suffix)

    if not (compressor and compressor.decompress(archive, archive.with_suffix(""))):
        print("[red] Something went wrong unpacking! [/red]")
        return

    if remove:
        archive.unlink()