Coverage for /home/runner/.local/share/hatch/env/virtual/importnb/KA2AwMZG/test.interactive/lib/python3.9/site-packages/importnb/loader.py: 85%
279 statements
« prev ^ index » next coverage.py v7.3.2, created at 2023-11-02 04:03 +0000
« prev ^ index » next coverage.py v7.3.2, created at 2023-11-02 04:03 +0000
1# coding: utf-8
2"""# `loader` 1cedafb
4the loading machinery for notebooks style documents, and less.
5notebooks combine code, markdown, and raw cells to create a complete document.
6the importnb loader provides an interface for transforming these objects to valid python.
7"""
10import ast 1cedafb
11import inspect 1cedafb
12import re 1cedafb
13import shlex 1cedafb
14import sys 1cedafb
15import textwrap 1cedafb
16from contextlib import contextmanager 1cedafb
17from dataclasses import asdict, dataclass, field 1cedafb
18from functools import partial 1cedafb
19from importlib import _bootstrap as bootstrap 1cedafb
20from importlib import reload 1cedafb
21from importlib._bootstrap import _init_module_attrs, _requires_builtin 1cedafb
22from importlib._bootstrap_external import FileFinder, decode_source 1cedafb
23from importlib.machinery import ModuleSpec, SourceFileLoader 1cedafb
24from importlib.util import LazyLoader, find_spec 1cedafb
25from pathlib import Path 1cedafb
26from types import ModuleType 1cedafb
28from . import get_ipython 1cedafb
29from .decoder import LineCacheNotebookDecoder, quote 1cedafb
30from .docstrings import update_docstring 1cedafb
31from .finder import FileModuleSpec, FuzzyFinder, get_loader_details, get_loader_index 1cedafb
# names exported by a star import of this module
__all__ = ("Notebook", "reload")

# (major, minor) of the running interpreter
VERSION = (sys.version_info.major, sys.version_info.minor)

# a `%%` marker preceded only by whitespace at the start of a line
MAGIC = re.compile(r"^\s*%{2}", flags=re.MULTILINE)
# compile flag for top-level await; falls back to 0 when `ast` does not define it
ALLOW_TOP_LEVEL_AWAIT = getattr(ast, "PyCF_ALLOW_TOP_LEVEL_AWAIT", 0)
41def _get_co_flags_set(co_flags): 1cedafb
42 """return a deconstructed set of code flags from a code object."""
43 flags = set() 1tcdanbjolsmuhipkqrvg
44 for i in range(12): 44 ↛ 52line 44 didn't jump to line 52, because the loop on line 44 didn't complete1tcdanbjolsmuhipkqrvg
45 flag = 1 << i 1tcdanbjolsmuhipkqrvg
46 if co_flags & flag: 1tcdanbjolsmuhipkqrvg
47 flags.add(flag) 1tcdanbjolsmuhipkqrvg
48 co_flags ^= flag 1tcdanbjolsmuhipkqrvg
49 if not co_flags: 1tcdanbjolsmuhipkqrvg
50 break 1tcdanbjolsmuhipkqrvg
51 else:
52 flags.intersection_update(flags)
53 return flags 1tcdanbjolsmuhipkqrvg
class SourceModule(ModuleType):
    """a module type that answers the `os.fspath` protocol."""

    def __fspath__(self):
        """the file system path of the module is its `__file__`."""
        location = self.__file__
        return location
@dataclass
class Interface:
    """the configurable options of the python importing interface"""

    name: str = None
    path: str = None
    lazy: bool = False
    extensions: tuple = field(default_factory=lambda: [".ipy", ".ipynb"])
    include_fuzzy_finder: bool = True
    include_markdown_docstring: bool = True
    include_non_defs: bool = True
    include_await: bool = True
    module_type: ModuleType = field(default=SourceModule)
    no_magic: bool = False

    # bookkeeping for the context manager; kept out of the repr
    _loader_hook_position: int = field(default=0, repr=False)

    def __new__(cls, name=None, path=None, **kwargs):
        # allocate the instance and run __init__ right away, with name and path
        # folded in next to the remaining keyword arguments.
        instance = super().__new__(cls)
        instance.__init__(name=name, path=path, **kwargs)
        return instance
85class Loader(Interface, SourceFileLoader): 1cedafb
86 """The simplest implementation of a Notebook Source File Loader.
87 This class breaks down the loading process into finer steps."""
89 extensions: tuple = field(default_factory=[".py"].copy) 1cedafb
@property
def loader(self):
    """a loader factory that carries the configuration of this loader.

    the factory is this loader's type (wrapped by `LazyLoader.factory` when
    `lazy` is set) with every dataclass field except `name` and `path`
    pre-bound as a keyword argument.
    """
    factory = LazyLoader.factory(type(self)) if self.lazy else type(self)
    options = {
        key: value for key, value in asdict(self).items() if key not in ("name", "path")
    }
    return partial(factory, **options)
@property
def finder(self):
    """the finder class to install: `FuzzyFinder` when enabled, else `FileFinder`"""
    if self.include_fuzzy_finder:
        return FuzzyFinder
    return FileFinder
def raw_to_source(self, source):
    """translate the raw text of a file into python source."""
    is_notebook = bool(self.path) and self.path.endswith(".ipynb")
    if not is_notebook:
        # anything that is not a notebook only goes through the code transformer.
        return self.code(source)

    # notebooks are decoded with a separate transformer for each cell type.
    decoder = LineCacheNotebookDecoder(code=self.code, raw=self.raw, markdown=self.markdown)
    return decoder.decode(source, self.path)
def source_to_nodes(self, source, path="<unknown>", *, _optimize=-1):
    """parse a source string into a python ast module"""
    return bootstrap._call_with_frames_removed(
        compile,
        source,
        path,
        "exec",
        flags=ast.PyCF_ONLY_AST,
        dont_inherit=True,
        optimize=_optimize,
    )
def nodes_to_code(self, nodes, path="<unknown>", *, _optimize=-1):
    """compile ast nodes into a python code object (top-level await flag applied)"""
    return bootstrap._call_with_frames_removed(
        compile,
        nodes,
        path,
        "exec",
        flags=ALLOW_TOP_LEVEL_AWAIT,
        dont_inherit=True,
        optimize=_optimize,
    )
def source_to_code(self, source, path="<unknown>", *, _optimize=-1):
    """tangle python source into compiled code in two steps:
    1. `source_to_nodes` parses the source into ast nodes
    2. `nodes_to_code` compiles those nodes into a code object"""
    return self.nodes_to_code(
        self.source_to_nodes(source, path, _optimize=_optimize),
        path,
        _optimize=_optimize,
    )
def get_data(self, path):
    """read the loader's file and hand back transformed python source.

    the raw bytes are decoded and then passed through `raw_to_source`, which lets
    notebook json be turned into python code before anything is compiled.
    """
    # the `path` argument is not consulted; the loader's own path is what is read.
    raw_bytes = super().get_data(self.path)
    text = decode_source(raw_bytes)
    return self.raw_to_source(text)
def create_module(self, spec):
    """create the module for `spec`, applying this loader's name and alias handling."""
    module = self.module_type(str(spec.name))
    _init_module_attrs(spec, module)

    if self.name:
        # a name configured on the loader wins over the one taken from the spec
        module.__name__ = self.name

    notebook_suffixes = (".ipynb", ".ipy")
    if module.__file__.endswith(notebook_suffixes):
        module.get_ipython = get_ipython

    alias = getattr(spec, "alias", None)
    if alias:
        # register the module under the spec's alias as well so it is not
        # imported a second time under that name.
        sys.modules[alias] = module

    return module
def exec_module(self, module):
    """Execute the module."""
    try:
        # the code is looked up by the name kept on the spec, because
        # module.__name__ changes when a module is run as __main__.
        code = self.get_code(module.__spec__.name)

        # same check importlib performs
        if code is None:
            raise ImportError(
                f"cannot load module {module.__name__!r} when " "get_code() returns None"
            )

        has_await = inspect.CO_COROUTINE in _get_co_flags_set(code.co_flags)
        if has_await:
            self.aexec_module_sync(module)
        else:
            # nothing async in the module: execute it the conventional way.
            bootstrap._call_with_frames_removed(exec, code, module.__dict__)

    except BaseException as error:
        # drop the alias entry so a failed module is not left behind in sys.modules
        alias = getattr(module.__spec__, "alias", None)
        if alias:
            sys.modules.pop(alias, None)

        raise error
def aexec_module_sync(self, module):
    """run the async `aexec_module` to completion from synchronous code.

    anyio drives the coroutine when it has already been imported; otherwise an
    asyncio event loop is used.  when asyncio has no current event loop to hand
    out, a new one is created and installed instead of failing.
    """
    if "anyio" in sys.modules:
        import anyio

        anyio.run(self.aexec_module, module)
        return

    import asyncio

    try:
        loop = asyncio.get_event_loop()
    except RuntimeError:
        # no current loop: worker threads, and pythons that no longer create one implicitly
        loop = asyncio.new_event_loop()
        asyncio.set_event_loop(loop)
    loop.run_until_complete(self.aexec_module(module))
async def aexec_module(self, module):
    """an async exec_module method permitting top-level await."""
    # the source is read and parsed again here, which is somewhat redundant.
    tree = self.source_to_nodes(self.get_data(self.path))
    namespace = module.__dict__

    # compile and run the module one top-level statement at a time
    for statement in tree.body:
        compiled = bootstrap._call_with_frames_removed(
            compile,
            ast.Module([statement], []),
            module.__file__,
            "exec",
            flags=ALLOW_TOP_LEVEL_AWAIT,
        )
        if inspect.CO_COROUTINE not in _get_co_flags_set(compiled.co_flags):
            bootstrap._call_with_frames_removed(exec, compiled, namespace, namespace)
            continue

        # an async statement is recompiled in "single" mode so that eval
        # returns the coroutine, which is then awaited.
        compiled = bootstrap._call_with_frames_removed(
            compile,
            ast.Interactive([statement]),
            module.__file__,
            "single",
            flags=ALLOW_TOP_LEVEL_AWAIT,
        )
        await bootstrap._call_with_frames_removed(eval, compiled, namespace, namespace)
def code(self, str):
    """the transformer for code text: pass it through the module level `dedent`."""
    transformed = dedent(str)
    return transformed
@classmethod
@_requires_builtin
def is_package(cls, fullname):
    """report a name without dots as a package, otherwise defer to the parent class.

    NOTE(review): the method is wrapped by importlib's `_requires_builtin`;
    confirm that guard is what is wanted here.
    """
    return True if "." not in fullname else super().is_package(fullname)
def __enter__(self):
    """install this loader on the path hook located by `get_loader_index(".py")`."""
    path_id, loader_id, details = get_loader_index(".py")

    covered = any(all(map(known.__contains__, self.extensions)) for _, known in details)
    if covered:
        # an existing entry already lists every extension: nothing is installed,
        # and None tells __exit__ there is nothing to undo.
        self._loader_hook_position = None
        return self

    # slot the loader in right after the entry found for ".py"
    self._loader_hook_position = loader_id + 1
    details.insert(self._loader_hook_position, (self.loader, self.extensions))
    sys.path_hooks[path_id] = self.finder.path_hook(*details)
    sys.path_importer_cache.clear()
    return self
def __exit__(self, *excepts):
    """remove the loader entry that `__enter__` inserted, when it inserted one."""
    position = self._loader_hook_position
    if position is None:
        return

    path_id, details = get_loader_details()
    details.pop(position)
    sys.path_hooks[path_id] = self.finder.path_hook(*details)
    # cached importers were built from the old hook, so they are discarded
    sys.path_importer_cache.clear()
@classmethod
def load_file(cls, filename, main=True, **kwargs):
    """Import a notebook as a module from a filename.

    filename: The file to load.
    main: Load the module in the __main__ context.
    kwargs: Extra options handed to the loader constructor.

    >>> assert Notebook.load_file('foo.ipynb')
    """
    name = "__main__" if main else filename
    loader = cls(name, str(filename), **kwargs)
    module = loader.create_module(FileModuleSpec(name, loader, origin=loader.path))
    loader.exec_module(module)
    return module
@classmethod
def load_module(cls, module, main=False, **kwargs):
    """Import a notebook as a module.

    module: The module name to resolve while this loader is installed.
    main: Load the module in the __main__ context.
    kwargs: Accepted but not used by this method.

    Raises ModuleNotFoundError when no spec can be found for `module`.

    >>> assert Notebook.load_module('foo')
    """
    with cls():
        spec = find_spec(module)
        if spec is None:
            # fail with a clear error rather than an AttributeError on None
            raise ModuleNotFoundError(f"No module named {module!r}", name=module)
        loaded = spec.loader.create_module(spec)
        if main:
            sys.modules["__main__"] = loaded
            loaded.__name__ = "__main__"
        spec.loader.exec_module(loaded)
        return loaded
@classmethod
def load_argv(cls, argv=None, *, parser=None):
    """load a module based on python arguments

    `argv` may be a list of arguments or a single string that is split shell
    style; when omitted, `sys.argv[1:]` is used.  the parser's help is printed
    when the arguments do not select anything to load.

    load a notebook from its file name
    >>> Notebook.load_argv("foo.ipynb --arg abc")

    load the same notebook from a module alias.
    >>> Notebook.load_argv("-m foo --arg abc")
    """
    parser = cls.get_argparser() if parser is None else parser

    if argv is None:
        argv = sys.argv[1:]

    if isinstance(argv, str):
        argv = shlex.split(argv)

    module = cls.load_ns(parser.parse_args(argv))
    if module is None:
        return parser.print_help()

    return module
@classmethod
def load_ns(cls, ns):
    """load a module from a parsed argument namespace (code, module or file).

    returns the loaded module, or None when the namespace selects nothing.
    with `ns.tasks` set, the module is additionally handed to doit.
    """
    if ns.tasks:
        # the doit imports have to happen before anything is loaded; keep them first.
        from doit.cmd_base import ModuleTaskLoader
        from doit.doit_cmd import DoitMain

    if ns.code:
        with main_argv(sys.argv[0], ns.args):
            result = cls.load_code(ns.code)
    elif ns.module:
        # make the requested directory (or "" when none is given) importable
        search_entry = ns.dir if ns.dir else ""
        if search_entry not in sys.path:
            sys.path.insert(0, search_entry)
        with main_argv(ns.module, ns.args):
            result = cls.load_module(ns.module, main=True)
    elif ns.file:
        # NOTE(review): `where` only becomes argv[0]; the file itself is loaded from ns.file.
        where = Path(ns.dir, ns.file) if ns.dir else Path(ns.file)
        with main_argv(str(where), ns.args):
            result = cls.load_file(ns.file)
    else:
        return

    if ns.tasks:
        DoitMain(ModuleTaskLoader(result)).run(ns.args or ["help"])
    return result
@classmethod
def load_code(cls, code, argv=None, mod_name=None, script_name=None, main=False):
    """load a module from raw source code

    the code goes through a fresh loader's `raw_to_source`, is run with runpy, and
    the resulting namespace comes back wrapped as a module.  the module is named
    "__main__" when `main` is set, else `mod_name`, else "<raw code>".
    """
    from runpy import _run_module_code

    if main:
        name = "__main__"
    else:
        name = mod_name or "<raw code>"

    source = cls().raw_to_source(code)
    namespace = _run_module_code(source, mod_name=name, script_name=script_name)
    return _dict_module(namespace)
@staticmethod
def get_argparser(parser=None):
    """add the importnb command line arguments to `parser`, creating one when needed."""
    import argparse

    if parser is None:
        parser = argparse.ArgumentParser("importnb", description="run notebooks as python code")

    parser.add_argument("file", nargs="?", help="run a file")
    parser.add_argument("args", nargs=argparse.REMAINDER, help="arguments to pass to script")
    for flag, option, text in (
        ("-m", "--module", "run a module"),
        ("-c", "--code", "run raw code"),
        ("-d", "--dir", "path to run script in"),
    ):
        parser.add_argument(flag, option, help=text)
    parser.add_argument("-t", "--tasks", action="store_true", help="run doit tasks")
    return parser
def comment(str):
    """prefix the lines of the text with `# ` using `textwrap.indent`."""
    marker = "# "
    return textwrap.indent(str, marker)
class DefsOnly(ast.NodeTransformer):
    """reduce a module to its imports and its class and function definitions."""

    INCLUDE = ast.Import, ast.ImportFrom, ast.ClassDef, ast.FunctionDef, ast.AsyncFunctionDef

    def visit_Module(self, node):
        kept = [statement for statement in node.body if isinstance(statement, self.INCLUDE)]
        if VERSION >= (3, 8):
            # from 3.8 on the type_ignores are handed over as well
            return ast.Module(kept, node.type_ignores)
        return ast.Module(kept)
class Notebook(Loader):
    """Notebook is a user friendly file finder and module loader for notebook source code.

    > Remember, restart and run all or it didn't happen.

    Notebook provides several useful options.

    * Lazy module loading. A module is executed the first time it is used in a script.
    """

    def markdown(self, str):
        """the transformer for markdown cells."""
        return quote(str)

    def raw(self, str):
        """the transformer for raw cells: they become comments."""
        return comment(str)

    def visit(self, nodes):
        """keep every node, or only the definitions when `include_non_defs` is off."""
        if self.include_non_defs:
            return nodes
        return DefsOnly().visit(nodes)

    def code(self, str):
        """the transformer for code: with `no_magic`, `%%` cells are commented out."""
        if self.no_magic:
            if MAGIC.match(str):
                return comment(str)
        return super().code(str)

    def source_to_nodes(self, source, path="<unknown>", *, _optimize=-1):
        """parse the source, then apply the docstring and definition filters."""
        # _optimize is forwarded so the parse step sees what the caller asked for
        nodes = super().source_to_nodes(source, path, _optimize=_optimize)
        if self.include_markdown_docstring:
            nodes = update_docstring(nodes)
        nodes = self.visit(nodes)
        return ast.fix_missing_locations(nodes)

    def raw_to_source(self, source):
        """transform a string from a raw file to python source."""
        if self.path and self.path.endswith(".ipynb"):
            # notebooks get a different transformer for each cell type
            return LineCacheNotebookDecoder(
                code=self.code, raw=self.raw, markdown=self.markdown
            ).decode(source, self.path)

        # for a normal file we just apply the code transformer.
        return self.code(source)
444def _dict_module(ns): 1cedafb
445 m = ModuleType(ns.get("__name__"), ns.get("__doc__")) 1eB
446 m.__dict__.update(ns) 1eB
447 return m 1eB
@contextmanager
def main_argv(prog, args=None):
    """temporarily replace `sys.argv` with `[prog, *args]`.

    when `args` is None, `sys.argv` is left untouched.  otherwise the previous
    `sys.argv` is put back when the block exits, including when it exits with an
    exception.
    """
    if args is None:
        yield
        return

    prior, sys.argv = sys.argv, [prog] + list(args)
    try:
        yield
    finally:
        # restore even if the managed block raised
        sys.argv = prior
try:
    import IPython
    from IPython.core.inputsplitter import IPythonInputSplitter

    # with IPython importable, `dedent` is the cell transformer of an input splitter
    # configured with the leading_indent, ipy_prompt and cellmagic line transforms.
    dedent = IPythonInputSplitter(
        line_input_checker=False,
        physical_line_transforms=[
            IPython.core.inputsplitter.leading_indent(),
            IPython.core.inputsplitter.ipy_prompt(),
            IPython.core.inputsplitter.cellmagic(end_on_blank_line=False),
        ],
    ).transform_cell
except ModuleNotFoundError:

    def dedent(body):
        """fallback without IPython: comment out `%%` text, dedent everything else."""
        import textwrap

        if MAGIC.match(body):
            return textwrap.indent(body, "# ")
        return textwrap.dedent(body)