Coverage for /home/runner/.local/share/hatch/env/virtual/importnb/KA2AwMZG/test.stdlib/lib/python3.9/site-packages/importnb/loader.py: 87%
279 statements
« prev ^ index » next coverage.py v7.3.2, created at 2023-11-02 04:02 +0000
« prev ^ index » next coverage.py v7.3.2, created at 2023-11-02 04:02 +0000
1# coding: utf-8
2"""# `loader` 1bdcae
4the loading machinery for notebooks style documents, and less.
5notebooks combine code, markdown, and raw cells to create a complete document.
6the importnb loader provides an interface for transforming these objects to valid python.
7"""
10import ast 1bdcae
11import inspect 1bdcae
12import re 1bdcae
13import shlex 1bdcae
14import sys 1bdcae
15import textwrap 1bdcae
16from contextlib import contextmanager 1bdcae
17from dataclasses import asdict, dataclass, field 1bdcae
18from functools import partial 1bdcae
19from importlib import _bootstrap as bootstrap 1bdcae
20from importlib import reload 1bdcae
21from importlib._bootstrap import _init_module_attrs, _requires_builtin 1bdcae
22from importlib._bootstrap_external import FileFinder, decode_source 1bdcae
23from importlib.machinery import ModuleSpec, SourceFileLoader 1bdcae
24from importlib.util import LazyLoader, find_spec 1bdcae
25from pathlib import Path 1bdcae
26from types import ModuleType 1bdcae
28from . import get_ipython 1bdcae
29from .decoder import LineCacheNotebookDecoder, quote 1bdcae
30from .docstrings import update_docstring 1bdcae
31from .finder import FileModuleSpec, FuzzyFinder, get_loader_details, get_loader_index 1bdcae
33__all__ = "Notebook", "reload" 1bdcae
35VERSION = sys.version_info.major, sys.version_info.minor 1bdcae
37MAGIC = re.compile(r"^\s*%{2}", re.MULTILINE) 1bdcae
38ALLOW_TOP_LEVEL_AWAIT = getattr(ast, "PyCF_ALLOW_TOP_LEVEL_AWAIT", 0x0) 1bdcae
41def _get_co_flags_set(co_flags): 1bdcae
42 """return a deconstructed set of code flags from a code object."""
43 flags = set() 1rbcaminklsghojpqtf
44 for i in range(12): 44 ↛ 52line 44 didn't jump to line 52, because the loop on line 44 didn't complete1rbcaminklsghojpqtf
45 flag = 1 << i 1rbcaminklsghojpqtf
46 if co_flags & flag: 1rbcaminklsghojpqtf
47 flags.add(flag) 1rbcaminklsghojpqtf
48 co_flags ^= flag 1rbcaminklsghojpqtf
49 if not co_flags: 1rbcaminklsghojpqtf
50 break 1rbcaminklsghojpqtf
51 else:
52 flags.intersection_update(flags)
53 return flags 1rbcaminklsghojpqtf
56class SourceModule(ModuleType): 1bdcae
57 def __fspath__(self): 1bdcae
58 return self.__file__
61@dataclass 1bdcae
62class Interface: 1bdcae
63 """a configuration python importing interface"""
65 name: str = None 1bdcae
66 path: str = None 1bdcae
67 lazy: bool = False 1bdcae
68 extensions: tuple = field(default_factory=[".ipy", ".ipynb"].copy) 1bdcae
69 include_fuzzy_finder: bool = True 1bdcae
70 include_markdown_docstring: bool = True 1bdcae
71 include_non_defs: bool = True 1bdcae
72 include_await: bool = True 1bdcae
73 module_type: ModuleType = field(default=SourceModule) 1bdcae
74 no_magic: bool = False 1bdcae
76 _loader_hook_position: int = field(default=0, repr=False) 1bdcae
78 def __new__(cls, name=None, path=None, **kwargs): 1bdcae
79 kwargs.update(name=name, path=path) 1rbdcamuvinwxklzsghojpqytf
80 self = super().__new__(cls) 1rbdcamuvinwxklzsghojpqytf
81 self.__init__(**kwargs) 1rbdcamuvinwxklzsghojpqytf
82 return self 1rbdcamuvinwxklzsghojpqytf
85class Loader(Interface, SourceFileLoader): 1bdcae
86 """The simplest implementation of a Notebook Source File Loader.
87 This class breaks down the loading process into finer steps."""
89 extensions: tuple = field(default_factory=[".py"].copy) 1bdcae
91 @property 1bdcae
92 def loader(self): 1bdcae
93 """generate a new loader based on the state of an existing loader."""
94 loader = type(self) 1amuvinwxklghojpqyf
95 if self.lazy: 1amuvinwxklghojpqyf
96 loader = LazyLoader.factory(loader) 1l
97 # Strip the leading underscore from slots
98 params = asdict(self) 1amuvinwxklghojpqyf
99 params.pop("name") 1amuvinwxklghojpqyf
100 params.pop("path") 1amuvinwxklghojpqyf
101 return partial(loader, **params) 1amuvinwxklghojpqyf
103 @property 1bdcae
104 def finder(self): 1bdcae
105 """generate a new finder based on the state of an existing loader"""
106 return self.include_fuzzy_finder and FuzzyFinder or FileFinder 1amuvinwxklghojpqyf
108 def raw_to_source(self, source): 1bdcae
109 """transform a string from a raw file to python source."""
110 if self.path and self.path.endswith(".ipynb"):
111 # when we encounter notebooks we apply different transformers to the diff cell types
112 return LineCacheNotebookDecoder(
113 code=self.code, raw=self.raw, markdown=self.markdown
114 ).decode(source, self.path)
116 # for a normal file we just apply the code transformer.
117 return self.code(source)
119 def source_to_nodes(self, source, path="<unknown>", *, _optimize=-1): 1bdcae
120 """parse source string as python ast"""
121 flags = ast.PyCF_ONLY_AST 1rbcaminklsghojpqtf
122 return bootstrap._call_with_frames_removed( 1rbcaminklsghojpqtf
123 compile, source, path, "exec", flags=flags, dont_inherit=True, optimize=_optimize
124 )
126 def nodes_to_code(self, nodes, path="<unknown>", *, _optimize=-1): 1bdcae
127 """compile ast nodes to python code object"""
128 flags = ALLOW_TOP_LEVEL_AWAIT 1rbcaminklsghojpqtf
129 return bootstrap._call_with_frames_removed( 1rbcaminklsghojpqtf
130 compile, nodes, path, "exec", flags=flags, dont_inherit=True, optimize=_optimize
131 )
133 def source_to_code(self, source, path="<unknown>", *, _optimize=-1): 1bdcae
134 """tangle python source to compiled code by:
135 1. parsing the source as ast nodes
136 2. compiling the ast nodes as python code"""
137 nodes = self.source_to_nodes(source, path, _optimize=_optimize) 1rbcaminklsghojpqtf
138 return self.nodes_to_code(nodes, path, _optimize=_optimize) 1rbcaminklsghojpqtf
140 def get_data(self, path): 1bdcae
141 """get_data injects an input transformation before the raw text.
143 this method allows notebook json to be transformed line for line into vertically sparse python code.
144 """
145 return self.raw_to_source(decode_source(super().get_data(self.path))) 1rbcaminklsghojpqtf
147 def create_module(self, spec): 1bdcae
148 """an overloaded create_module method injecting fuzzy finder setup up logic."""
149 module = self.module_type(str(spec.name)) 1rbcamuinklsghojpqtf
150 _init_module_attrs(spec, module) 1rbcamuinklsghojpqtf
151 if self.name: 151 ↛ 154line 151 didn't jump to line 154, because the condition on line 151 was never false1rbcamuinklsghojpqtf
152 module.__name__ = self.name 1rbcamuinklsghojpqtf
154 if module.__file__.endswith((".ipynb", ".ipy")): 1rbcamuinklsghojpqtf
155 module.get_ipython = get_ipython 1rbcaminklsghojpqtf
157 if getattr(spec, "alias", None): 1rbcamuinklsghojpqtf
158 # put a fuzzy spec on the modules to avoid re importing it.
159 # there is a funky trick you do with the fuzzy finder where you
160 # load multiple versions with different finders.
162 sys.modules[spec.alias] = module 1kf
164 return module 1rbcamuinklsghojpqtf
166 def exec_module(self, module): 1bdcae
167 """Execute the module."""
168 # importlib uses module.__name__, but when running modules as __main__ name will change.
169 # this approach uses the original name on the spec.
170 try: 1rbcaminklsghojpqtf
171 code = self.get_code(module.__spec__.name) 1rbcaminklsghojpqtf
173 # from importlib
174 if code is None: 174 ↛ 175line 174 didn't jump to line 175, because the condition on line 174 was never true1rbcaminklsghojpqtf
175 raise ImportError(
176 f"cannot load module {module.__name__!r} when " "get_code() returns None"
177 )
179 if inspect.CO_COROUTINE not in _get_co_flags_set(code.co_flags): 1rbcaminklsghojpqtf
180 # if there isn't any async non sense then we proceed with convention.
181 bootstrap._call_with_frames_removed(exec, code, module.__dict__) 1rbcaminklsghojpqt
182 else:
183 self.aexec_module_sync(module) 1f
185 except BaseException as e:
186 alias = getattr(module.__spec__, "alias", None)
187 if alias:
188 sys.modules.pop(alias, None)
190 raise e
192 def aexec_module_sync(self, module): 1bdcae
193 if "anyio" in sys.modules: 193 ↛ 194line 193 didn't jump to line 194, because the condition on line 193 was never true1f
194 import anyio
196 __import__("anyio").run(self.aexec_module, module)
197 else:
198 from asyncio import get_event_loop 1f
200 get_event_loop().run_until_complete(self.aexec_module(module)) 1f
202 async def aexec_module(self, module): 1bdcae
203 """an async exec_module method permitting top-level await."""
204 # there is so redudancy in this approach, but it starts getting asynchier.
205 nodes = self.source_to_nodes(self.get_data(self.path)) 1f
207 # iterate through the nodes and compile individual statements
208 for node in nodes.body: 1f
209 co = bootstrap._call_with_frames_removed( 1f
210 compile,
211 ast.Module([node], []),
212 module.__file__,
213 "exec",
214 flags=ALLOW_TOP_LEVEL_AWAIT,
215 )
216 if inspect.CO_COROUTINE in _get_co_flags_set(co.co_flags): 1f
217 # when something async is encountered we compile it with the single flag
218 # this lets us use eval to retreive our coroutine.
219 co = bootstrap._call_with_frames_removed( 1f
220 compile,
221 ast.Interactive([node]),
222 module.__file__,
223 "single",
224 flags=ALLOW_TOP_LEVEL_AWAIT,
225 )
226 await bootstrap._call_with_frames_removed( 1f
227 eval, co, module.__dict__, module.__dict__
228 )
229 else:
230 bootstrap._call_with_frames_removed(exec, co, module.__dict__, module.__dict__) 1f
232 def code(self, str): 1bdcae
233 return dedent(str) 1rbdcaminklzsghojpqtf
235 @classmethod 1bdcae
236 @_requires_builtin 1bdcae
237 def is_package(cls, fullname): 1bdcae
238 """Return False as built-in modules are never packages."""
239 if "." not in fullname:
240 return True
241 return super().is_package(fullname)
243 def __enter__(self): 1bdcae
244 path_id, loader_id, details = get_loader_index(".py") 1amuvinwxklghojpqyf
245 for _, e in details: 1amuvinwxklghojpqyf
246 if all(map(e.__contains__, self.extensions)): 246 ↛ 247line 246 didn't jump to line 247, because the condition on line 246 was never true1amuvinwxklghojpqyf
247 self._loader_hook_position = None
248 return self
249 else:
250 self._loader_hook_position = loader_id + 1 1amuvinwxklghojpqyf
251 details.insert(self._loader_hook_position, (self.loader, self.extensions)) 1amuvinwxklghojpqyf
252 sys.path_hooks[path_id] = self.finder.path_hook(*details) 1amuvinwxklghojpqyf
253 sys.path_importer_cache.clear() 1amuvinwxklghojpqyf
254 return self 1amuvinwxklghojpqyf
256 def __exit__(self, *excepts): 1bdcae
257 if self._loader_hook_position is not None: 257 ↛ exitline 257 didn't return from function '__exit__', because the condition on line 257 was never false1amuvinwxklghojpqyf
258 path_id, details = get_loader_details() 1amuvinwxklghojpqyf
259 details.pop(self._loader_hook_position) 1amuvinwxklghojpqyf
260 sys.path_hooks[path_id] = self.finder.path_hook(*details) 1amuvinwxklghojpqyf
261 sys.path_importer_cache.clear() 1amuvinwxklghojpqyf
263 @classmethod 1bdcae
264 def load_file(cls, filename, main=True, **kwargs): 1bdcae
265 """Import a notebook as a module from a filename.
267 dir: The directory to load the file from.
268 main: Load the module in the __main__ context.
270 >>> assert Notebook.load_file('foo.ipynb')
271 """
272 name = main and "__main__" or filename 1rbcst
273 loader = cls(name, str(filename), **kwargs) 1rbcst
274 spec = FileModuleSpec(name, loader, origin=loader.path) 1rbcst
275 module = loader.create_module(spec) 1rbcst
276 loader.exec_module(module) 1rbcst
277 return module 1rbcst
279 @classmethod 1bdcae
280 def load_module(cls, module, main=False, **kwargs): 1bdcae
281 """Import a notebook as a module.
283 main: Load the module in the __main__ context.
285 >>> assert Notebook.load_module('foo')
286 """
287 from runpy import _run_module_as_main, run_module 1agh
289 with cls() as loader: 1agh
290 spec = find_spec(module) 1agh
291 module = spec.loader.create_module(spec) 1agh
292 if main: 1agh
293 sys.modules["__main__"] = module 1a
294 module.__name__ = "__main__" 1a
295 spec.loader.exec_module(module) 1agh
296 return module 1agh
298 @classmethod 1bdcae
299 def load_argv(cls, argv=None, *, parser=None): 1bdcae
300 """load a module based on python arguments
302 load a notebook from its file name
303 >>> Notebook.load_argv("foo.ipynb --arg abc")
305 load the same notebook from a module alias.
306 >>> Notebook.load_argv("-m foo --arg abc")
307 """
308 if parser is None: 308 ↛ 311line 308 didn't jump to line 311, because the condition on line 308 was never false1bdcae
309 parser = cls.get_argparser() 1bdcae
311 if argv is None: 311 ↛ 316line 311 didn't jump to line 316, because the condition on line 311 was never false1bdcae
312 from sys import argv 1bdcae
314 argv = argv[1:] 1bdcae
316 if isinstance(argv, str): 316 ↛ 317line 316 didn't jump to line 317, because the condition on line 316 was never true1bdcae
317 argv = shlex.split(argv)
319 module = cls.load_ns(parser.parse_args(argv)) 1bdcae
320 if module is None: 1bdcae
321 return parser.print_help() 1e
323 return module 1bdca
325 @classmethod 1bdcae
326 def load_ns(cls, ns): 1bdcae
327 """load a module from a namespace, used when loading module from sys.argv parameters."""
328 if ns.tasks: 1bdcae
329 # i don't quite why we need to do this here, but we do. so don't move it
330 from doit.cmd_base import ModuleTaskLoader 1b
331 from doit.doit_cmd import DoitMain 1b
333 if ns.code: 1bdcae
334 with main_argv(sys.argv[0], ns.args): 1d
335 result = cls.load_code(ns.code) 1d
336 elif ns.module: 1bcae
337 if ns.dir: 337 ↛ 340line 337 didn't jump to line 340, because the condition on line 337 was never false1a
338 if ns.dir not in sys.path: 338 ↛ 344line 338 didn't jump to line 344, because the condition on line 338 was never false1a
339 sys.path.insert(0, ns.dir) 1a
340 elif "" in sys.path:
341 pass
342 else:
343 sys.path.insert(0, "")
344 with main_argv(ns.module, ns.args): 1a
345 result = cls.load_module(ns.module, main=True) 1a
346 elif ns.file: 1bce
347 where = Path(ns.dir, ns.file) if ns.dir else Path(ns.file) 1bc
348 with main_argv(str(where), ns.args): 1bc
349 result = cls.load_file(ns.file) 1bc
350 else:
351 return 1e
352 if ns.tasks: 1bdca
353 DoitMain(ModuleTaskLoader(result)).run(ns.args or ["help"]) 1b
354 return result 1bdca
356 @classmethod 1bdcae
357 def load_code(cls, code, argv=None, mod_name=None, script_name=None, main=False): 1bdcae
358 """load a module from raw source code"""
360 from runpy import _run_module_code 1dz
362 self = cls() 1dz
363 name = main and "__main__" or mod_name or "<raw code>" 1dz
365 return _dict_module( 1dz
366 _run_module_code(self.raw_to_source(code), mod_name=name, script_name=script_name)
367 )
369 @staticmethod 1bdcae
370 def get_argparser(parser=None): 1bdcae
371 from argparse import REMAINDER, ArgumentParser 1bdcae
373 if parser is None: 373 ↛ 375line 373 didn't jump to line 375, because the condition on line 373 was never false1bdcae
374 parser = ArgumentParser("importnb", description="run notebooks as python code") 1bdcae
375 parser.add_argument("file", nargs="?", help="run a file") 1bdcae
376 parser.add_argument("args", nargs=REMAINDER, help="arguments to pass to script") 1bdcae
377 parser.add_argument("-m", "--module", help="run a module") 1bdcae
378 parser.add_argument("-c", "--code", help="run raw code") 1bdcae
379 parser.add_argument("-d", "--dir", help="path to run script in") 1bdcae
380 parser.add_argument("-t", "--tasks", action="store_true", help="run doit tasks") 1bdcae
381 return parser 1bdcae
384def comment(str): 1bdcae
385 return textwrap.indent(str, "# ") 1j
388class DefsOnly(ast.NodeTransformer): 1bdcae
389 INCLUDE = ast.Import, ast.ImportFrom, ast.ClassDef, ast.FunctionDef, ast.AsyncFunctionDef 1bdcae
391 def visit_Module(self, node): 1bdcae
392 args = ([x for x in node.body if isinstance(x, self.INCLUDE)],) 1i
393 if VERSION >= (3, 8): 393 ↛ 395line 393 didn't jump to line 395, because the condition on line 393 was never false1i
394 args += (node.type_ignores,) 1i
395 return ast.Module(*args) 1i
398class Notebook(Loader): 1bdcae
399 """Notebook is a user friendly file finder and module loader for notebook source code.
401 > Remember, restart and run all or it didn't happen.
403 Notebook provides several useful options.
405 * Lazy module loading. A module is executed the first time it is used in a script.
406 """
408 def markdown(self, str): 1bdcae
409 return quote(str) 1rbcaminklsghojpqtf
411 def raw(self, str): 1bdcae
412 return comment(str)
414 def visit(self, nodes): 1bdcae
415 if self.include_non_defs: 1rbcaminklsghojpqtf
416 return nodes 1rbcamnklsghojpqtf
417 return DefsOnly().visit(nodes) 1i
419 def code(self, str): 1bdcae
420 if self.no_magic: 1rbdcaminklzsghojpqtf
421 if MAGIC.match(str): 1j
422 return comment(str) 1j
423 return super().code(str) 1rbdcaminklzsghojpqtf
425 def source_to_nodes(self, source, path="<unknown>", *, _optimize=-1): 1bdcae
426 nodes = super().source_to_nodes(source, path) 1rbcaminklsghojpqtf
427 if self.include_markdown_docstring: 427 ↛ 429line 427 didn't jump to line 429, because the condition on line 427 was never false1rbcaminklsghojpqtf
428 nodes = update_docstring(nodes) 1rbcaminklsghojpqtf
429 nodes = self.visit(nodes) 1rbcaminklsghojpqtf
430 return ast.fix_missing_locations(nodes) 1rbcaminklsghojpqtf
432 def raw_to_source(self, source): 1bdcae
433 """transform a string from a raw file to python source."""
434 if self.path and self.path.endswith(".ipynb"): 1rbdcaminklzsghojpqtf
435 # when we encounter notebooks we apply different transformers to the diff cell types
436 return LineCacheNotebookDecoder( 1rbcaminklsghojpqtf
437 code=self.code, raw=self.raw, markdown=self.markdown
438 ).decode(source, self.path)
440 # for a normal file we just apply the code transformer.
441 return self.code(source) 1dz
444def _dict_module(ns): 1bdcae
445 m = ModuleType(ns.get("__name__"), ns.get("__doc__")) 1dz
446 m.__dict__.update(ns) 1dz
447 return m 1dz
450@contextmanager 1bdcae
451def main_argv(prog, args=None): 1bdcae
452 if args is not None: 452 ↛ 455line 452 didn't jump to line 455, because the condition on line 452 was never false1bdca
453 args = [prog] + list(args) 1bdca
454 prior, sys.argv = sys.argv, args 1bdca
455 yield 1bdca
456 if args is not None: 456 ↛ exitline 456 didn't return from function 'main_argv', because the condition on line 456 was never false1bdca
457 sys.argv = prior 1bdca
460try: 1bdcae
461 import IPython 1bdcae
462 from IPython.core.inputsplitter import IPythonInputSplitter
464 dedent = IPythonInputSplitter(
465 line_input_checker=False,
466 physical_line_transforms=[
467 IPython.core.inputsplitter.leading_indent(),
468 IPython.core.inputsplitter.ipy_prompt(),
469 IPython.core.inputsplitter.cellmagic(end_on_blank_line=False),
470 ],
471 ).transform_cell
472except ModuleNotFoundError: 1bdcae
474 def dedent(body): 1bdcae
475 from textwrap import dedent, indent 1rbdcaminklzsghojpqtf
477 if MAGIC.match(body): 1rbdcaminklzsghojpqtf
478 return indent(body, "# ") 1rbcaminklsghopqt
479 return dedent(body) 1rbdcaminklzsghojpqtf