Coverage for /home/runner/.local/share/hatch/env/virtual/importnb/KA2AwMZG/test.stdlib/lib/python3.9/site-packages/importnb/loader.py: 87%

279 statements  

« prev     ^ index     » next       coverage.py v7.3.2, created at 2023-11-02 04:02 +0000

1# coding: utf-8 

2"""# `loader` 1bdcae

3 

4the loading machinery for notebook-style documents, and less. 

5notebooks combine code, markdown, and raw cells to create a complete document. 

6the importnb loader provides an interface for transforming these objects to valid python. 

7""" 

8 

9 

10import ast 1bdcae

11import inspect 1bdcae

12import re 1bdcae

13import shlex 1bdcae

14import sys 1bdcae

15import textwrap 1bdcae

16from contextlib import contextmanager 1bdcae

17from dataclasses import asdict, dataclass, field 1bdcae

18from functools import partial 1bdcae

19from importlib import _bootstrap as bootstrap 1bdcae

20from importlib import reload 1bdcae

21from importlib._bootstrap import _init_module_attrs, _requires_builtin 1bdcae

22from importlib._bootstrap_external import FileFinder, decode_source 1bdcae

23from importlib.machinery import ModuleSpec, SourceFileLoader 1bdcae

24from importlib.util import LazyLoader, find_spec 1bdcae

25from pathlib import Path 1bdcae

26from types import ModuleType 1bdcae

27 

28from . import get_ipython 1bdcae

29from .decoder import LineCacheNotebookDecoder, quote 1bdcae

30from .docstrings import update_docstring 1bdcae

31from .finder import FileModuleSpec, FuzzyFinder, get_loader_details, get_loader_index 1bdcae

32 

33__all__ = "Notebook", "reload" 1bdcae

34 

# (major, minor) of the interpreter running this module
VERSION = sys.version_info[:2]

# matches source whose first non-whitespace characters are a ``%%`` cell magic
MAGIC = re.compile(r"^\s*%{2}", flags=re.MULTILINE)

# compile flag permitting top-level ``await``; 0 when ``ast`` does not define it
ALLOW_TOP_LEVEL_AWAIT = getattr(ast, "PyCF_ALLOW_TOP_LEVEL_AWAIT", 0)

39 

40 

41def _get_co_flags_set(co_flags): 1bdcae

42 """return a deconstructed set of code flags from a code object.""" 

43 flags = set() 1rbcaminklsghojpqtf

44 for i in range(12): 44 ↛ 52line 44 didn't jump to line 52, because the loop on line 44 didn't complete1rbcaminklsghojpqtf

45 flag = 1 << i 1rbcaminklsghojpqtf

46 if co_flags & flag: 1rbcaminklsghojpqtf

47 flags.add(flag) 1rbcaminklsghojpqtf

48 co_flags ^= flag 1rbcaminklsghojpqtf

49 if not co_flags: 1rbcaminklsghojpqtf

50 break 1rbcaminklsghojpqtf

51 else: 

52 flags.intersection_update(flags) 

53 return flags 1rbcaminklsghojpqtf

54 

55 

class SourceModule(ModuleType):
    """a module type implementing the ``__fspath__`` path protocol hook."""

    def __fspath__(self):
        """answer path requests with the module's ``__file__``."""
        location = self.__file__
        return location

59 

60 

@dataclass
class Interface:
    """a configuration python importing interface

    every option is a dataclass field, so a configured instance can be
    turned back into keyword arguments with ``dataclasses.asdict``.
    """

    # module name and file path the loader is bound to
    name: str = None
    path: str = None
    # when set, ``Loader.loader`` wraps the loader type with ``LazyLoader.factory``
    lazy: bool = False
    # file extensions registered for the loader in ``Loader.__enter__``
    extensions: tuple = field(default_factory=lambda: [".ipy", ".ipynb"])
    # ``Loader.finder`` answers ``FuzzyFinder`` when set, ``FileFinder`` otherwise
    include_fuzzy_finder: bool = True
    # ``Notebook.source_to_nodes`` runs ``update_docstring`` when set
    include_markdown_docstring: bool = True
    # when unset, ``Notebook.visit`` filters the module through ``DefsOnly``
    include_non_defs: bool = True
    # NOTE(review): not read anywhere in this file - confirm where it is used
    include_await: bool = True
    # the class ``Loader.create_module`` instantiates for new modules
    module_type: ModuleType = field(default=SourceModule)
    # when set, ``Notebook.code`` comments out source matching ``MAGIC``
    no_magic: bool = False

    # index of the installed loader entry, or None when nothing was installed;
    # written by ``Loader.__enter__`` and read by ``Loader.__exit__``
    _loader_hook_position: int = field(default=0, repr=False)

    def __new__(cls, name=None, path=None, **kwargs):
        """allocate the instance and initialise it from keyword arguments only.

        ``name`` and ``path`` may be given positionally; they are folded into
        the keyword arguments before ``__init__`` is invoked on the new object.
        """
        instance = super().__new__(cls)
        instance.__init__(**{**kwargs, "name": name, "path": path})
        return instance

83 

84 

class Loader(Interface, SourceFileLoader):
    """The simplest implementation of a Notebook Source File Loader.

    This class breaks down the loading process into finer steps:
    raw text -> python source -> ast nodes -> code object -> executed module.
    Used as a context manager it registers itself on ``sys.path_hooks`` and
    removes that registration again on exit.
    """

    # NOTE(review): ``Loader`` is not decorated with ``@dataclass`` in this file,
    # so this ``field`` looks like a plain class attribute and instances would
    # still receive the ``extensions`` default of ``Interface`` - confirm intent.
    extensions: tuple = field(default_factory=[".py"].copy)

    @property
    def loader(self):
        """generate a new loader based on the state of an existing loader.

        returns a ``functools.partial`` of the loader type (wrapped with
        ``LazyLoader.factory`` when ``self.lazy`` is set) pre-bound with every
        option of this instance except ``name`` and ``path``.
        """
        loader = type(self)
        if self.lazy:
            loader = LazyLoader.factory(loader)
        # name and path stay unbound so the caller of the partial supplies them
        params = asdict(self)
        params.pop("name")
        params.pop("path")
        return partial(loader, **params)

    @property
    def finder(self):
        """generate a new finder based on the state of an existing loader

        ``FuzzyFinder`` when ``include_fuzzy_finder`` is set, else ``FileFinder``.
        """
        return FuzzyFinder if self.include_fuzzy_finder else FileFinder

    def raw_to_source(self, source):
        """transform a string from a raw file to python source."""
        if self.path and self.path.endswith(".ipynb"):
            # when we encounter notebooks we apply different transformers to the
            # different cell types.
            # NOTE(review): ``raw`` and ``markdown`` are not defined on this class
            # in this file; ``Notebook`` provides them.
            return LineCacheNotebookDecoder(
                code=self.code, raw=self.raw, markdown=self.markdown
            ).decode(source, self.path)

        # for a normal file we just apply the code transformer.
        return self.code(source)

    def source_to_nodes(self, source, path="<unknown>", *, _optimize=-1):
        """parse source string as python ast"""
        flags = ast.PyCF_ONLY_AST
        return bootstrap._call_with_frames_removed(
            compile, source, path, "exec", flags=flags, dont_inherit=True, optimize=_optimize
        )

    def nodes_to_code(self, nodes, path="<unknown>", *, _optimize=-1):
        """compile ast nodes to python code object"""
        flags = ALLOW_TOP_LEVEL_AWAIT
        return bootstrap._call_with_frames_removed(
            compile, nodes, path, "exec", flags=flags, dont_inherit=True, optimize=_optimize
        )

    def source_to_code(self, source, path="<unknown>", *, _optimize=-1):
        """tangle python source to compiled code by:
        1. parsing the source as ast nodes
        2. compiling the ast nodes as python code"""
        nodes = self.source_to_nodes(source, path, _optimize=_optimize)
        return self.nodes_to_code(nodes, path, _optimize=_optimize)

    def get_data(self, path):
        """get_data injects an input transformation before the raw text.

        this method allows notebook json to be transformed line for line into
        vertically sparse python code.
        """
        # NOTE(review): the ``path`` argument is ignored in favour of
        # ``self.path`` - confirm this is intentional.
        return self.raw_to_source(decode_source(super().get_data(self.path)))

    def create_module(self, spec):
        """an overloaded create_module method injecting fuzzy finder setup logic.

        builds a ``self.module_type`` instance for ``spec``, renames it to
        ``self.name`` when one is set, exposes ``get_ipython`` on notebook-like
        files and registers the module under ``spec.alias`` when present.
        """
        module = self.module_type(str(spec.name))
        _init_module_attrs(spec, module)
        if self.name:
            module.__name__ = self.name

        if module.__file__.endswith((".ipynb", ".ipy")):
            module.get_ipython = get_ipython

        if getattr(spec, "alias", None):
            # put a fuzzy spec on the modules to avoid re importing it.
            # there is a funky trick you do with the fuzzy finder where you
            # load multiple versions with different finders.
            sys.modules[spec.alias] = module

        return module

    def exec_module(self, module):
        """Execute the module.

        raises ImportError when ``get_code`` returns None. on any failure the
        alias registered for the module in ``sys.modules`` is removed before
        the exception propagates.
        """
        # importlib uses module.__name__, but when running modules as __main__
        # the name will change. this approach uses the original name on the spec.
        try:
            code = self.get_code(module.__spec__.name)

            # from importlib
            if code is None:
                raise ImportError(
                    f"cannot load module {module.__name__!r} when get_code() returns None"
                )

            if inspect.CO_COROUTINE not in _get_co_flags_set(code.co_flags):
                # if there isn't any async nonsense then we proceed with convention.
                bootstrap._call_with_frames_removed(exec, code, module.__dict__)
            else:
                self.aexec_module_sync(module)

        except BaseException:
            alias = getattr(module.__spec__, "alias", None)
            if alias:
                sys.modules.pop(alias, None)

            # bare raise keeps the original traceback untouched
            raise

    def aexec_module_sync(self, module):
        """run ``aexec_module`` to completion from synchronous code.

        ``anyio`` drives the coroutine when it has already been imported,
        otherwise an asyncio event loop is used.
        """
        if "anyio" in sys.modules:
            import anyio

            anyio.run(self.aexec_module, module)
        else:
            import asyncio

            try:
                loop = asyncio.get_event_loop()
            except RuntimeError:
                # no usable current loop (e.g. a non-main thread or an
                # interpreter that no longer creates one implicitly)
                loop = asyncio.new_event_loop()
                asyncio.set_event_loop(loop)
            loop.run_until_complete(self.aexec_module(module))

    async def aexec_module(self, module):
        """an async exec_module method permitting top-level await."""
        # there is some redundancy in this approach, but it starts getting asynchier.
        nodes = self.source_to_nodes(self.get_data(self.path))

        # iterate through the nodes and compile individual statements
        for node in nodes.body:
            co = bootstrap._call_with_frames_removed(
                compile,
                ast.Module([node], []),
                module.__file__,
                "exec",
                flags=ALLOW_TOP_LEVEL_AWAIT,
            )
            if inspect.CO_COROUTINE in _get_co_flags_set(co.co_flags):
                # when something async is encountered we compile it with the single flag
                # this lets us use eval to retrieve our coroutine.
                co = bootstrap._call_with_frames_removed(
                    compile,
                    ast.Interactive([node]),
                    module.__file__,
                    "single",
                    flags=ALLOW_TOP_LEVEL_AWAIT,
                )
                await bootstrap._call_with_frames_removed(
                    eval, co, module.__dict__, module.__dict__
                )
            else:
                bootstrap._call_with_frames_removed(exec, co, module.__dict__, module.__dict__)

    def code(self, str):
        """transform the source of a code cell with the module level ``dedent``."""
        return dedent(str)

    @classmethod
    @_requires_builtin
    def is_package(cls, fullname):
        """report whether ``fullname`` is a package.

        names without a dot are reported as packages, dotted names defer to
        the parent implementation.

        NOTE(review): the previous docstring ("Return False as built-in modules
        are never packages.") did not match this code, and the combination of
        ``classmethod``, ``_requires_builtin`` and ``super()`` looks suspect -
        confirm the intended behaviour.
        """
        if "." not in fullname:
            return True
        return super().is_package(fullname)

    def __enter__(self):
        """register this loader's extensions on the ``sys.path_hooks`` entry.

        nothing is installed (and ``_loader_hook_position`` becomes None) when
        an existing entry already covers every extension of this loader.
        """
        path_id, loader_id, details = get_loader_index(".py")
        already_covered = any(
            all(ext in known for ext in self.extensions) for _, known in details
        )
        if already_covered:
            self._loader_hook_position = None
            return self

        # insert right behind the entry reported by ``get_loader_index(".py")``
        self._loader_hook_position = loader_id + 1
        details.insert(self._loader_hook_position, (self.loader, self.extensions))
        sys.path_hooks[path_id] = self.finder.path_hook(*details)
        sys.path_importer_cache.clear()
        return self

    def __exit__(self, *excepts):
        """remove the entry added by ``__enter__``, if one was added."""
        if self._loader_hook_position is not None:
            path_id, details = get_loader_details()
            details.pop(self._loader_hook_position)
            sys.path_hooks[path_id] = self.finder.path_hook(*details)
            sys.path_importer_cache.clear()

    @classmethod
    def load_file(cls, filename, main=True, **kwargs):
        """Import a notebook as a module from a filename.

        filename: The file to load.
        main: Load the module in the __main__ context.
        kwargs: extra options forwarded to the loader constructor.

        >>> assert Notebook.load_file('foo.ipynb')
        """
        name = "__main__" if main else filename
        loader = cls(name, str(filename), **kwargs)
        spec = FileModuleSpec(name, loader, origin=loader.path)
        module = loader.create_module(spec)
        loader.exec_module(module)
        return module

    @classmethod
    def load_module(cls, module, main=False, **kwargs):
        """Import a notebook as a module.

        module: The importable name of the module.
        main: Load the module in the __main__ context.

        raises ModuleNotFoundError when no spec can be found for ``module``.

        >>> assert Notebook.load_module('foo')
        """
        with cls():
            spec = find_spec(module)
            if spec is None:
                raise ModuleNotFoundError(f"No module named {module!r}", name=module)
            module = spec.loader.create_module(spec)
            if main:
                sys.modules["__main__"] = module
                module.__name__ = "__main__"
            spec.loader.exec_module(module)
            return module

    @classmethod
    def load_argv(cls, argv=None, *, parser=None):
        """load a module based on python arguments

        argv: a list of arguments, a shell-like string, or None for ``sys.argv[1:]``.
        parser: an argument parser; ``get_argparser()`` is used when omitted.

        returns the loaded module; when nothing was requested the parser's
        help is printed and None is returned.

        load a notebook from its file name
        >>> Notebook.load_argv("foo.ipynb --arg abc")

        load the same notebook from a module alias.
        >>> Notebook.load_argv("-m foo --arg abc")
        """
        if parser is None:
            parser = cls.get_argparser()

        if argv is None:
            argv = sys.argv[1:]

        if isinstance(argv, str):
            argv = shlex.split(argv)

        module = cls.load_ns(parser.parse_args(argv))
        if module is None:
            return parser.print_help()

        return module

    @classmethod
    def load_ns(cls, ns):
        """load a module from a namespace, used when loading module from sys.argv parameters.

        ``ns`` needs the attributes defined by ``get_argparser``. raw code takes
        precedence over a module name, which takes precedence over a file;
        None is returned when none of them is given.
        """
        if ns.tasks:
            # i don't quite know why we need to do this here, but we do. so don't move it
            from doit.cmd_base import ModuleTaskLoader
            from doit.doit_cmd import DoitMain

        if ns.code:
            with main_argv(sys.argv[0], ns.args):
                result = cls.load_code(ns.code)
        elif ns.module:
            # make the requested directory (or the working directory, spelled
            # "") importable before resolving the module name
            search_path = ns.dir or ""
            if search_path not in sys.path:
                sys.path.insert(0, search_path)
            with main_argv(ns.module, ns.args):
                result = cls.load_module(ns.module, main=True)
        elif ns.file:
            where = Path(ns.dir, ns.file) if ns.dir else Path(ns.file)
            with main_argv(str(where), ns.args):
                # NOTE(review): ``where`` only feeds argv[0]; the file is still
                # loaded from ``ns.file`` - confirm ``ns.dir`` handling.
                result = cls.load_file(ns.file)
        else:
            return
        if ns.tasks:
            DoitMain(ModuleTaskLoader(result)).run(ns.args or ["help"])
        return result

    @classmethod
    def load_code(cls, code, argv=None, mod_name=None, script_name=None, main=False):
        """load a module from raw source code

        the code is run as ``__main__`` when ``main`` is set, otherwise as
        ``mod_name`` or ``"<raw code>"``. ``argv`` is accepted but not used here.
        """

        from runpy import _run_module_code

        self = cls()
        name = "__main__" if main else (mod_name or "<raw code>")

        return _dict_module(
            _run_module_code(self.raw_to_source(code), mod_name=name, script_name=script_name)
        )

    @staticmethod
    def get_argparser(parser=None):
        """add the importnb command line arguments to ``parser``.

        a new ``ArgumentParser`` is created when ``parser`` is None.
        """
        from argparse import REMAINDER, ArgumentParser

        if parser is None:
            parser = ArgumentParser("importnb", description="run notebooks as python code")
        parser.add_argument("file", nargs="?", help="run a file")
        parser.add_argument("args", nargs=REMAINDER, help="arguments to pass to script")
        parser.add_argument("-m", "--module", help="run a module")
        parser.add_argument("-c", "--code", help="run raw code")
        parser.add_argument("-d", "--dir", help="path to run script in")
        parser.add_argument("-t", "--tasks", action="store_true", help="run doit tasks")
        return parser

382 

383 

def comment(str):
    """turn text into python comments by prefixing its lines with ``"# "``.

    ``textwrap.indent`` leaves whitespace-only lines unprefixed.
    """
    commented = textwrap.indent(text=str, prefix="# ")
    return commented

386 

387 

class DefsOnly(ast.NodeTransformer):
    """reduce a module to its imports and class/function definitions."""

    # the top-level statement types that are kept
    INCLUDE = ast.Import, ast.ImportFrom, ast.ClassDef, ast.FunctionDef, ast.AsyncFunctionDef

    def visit_Module(self, node):
        """build a new ``ast.Module`` holding only the ``INCLUDE`` statements."""
        kept = list(filter(lambda stmt: isinstance(stmt, self.INCLUDE), node.body))
        if VERSION < (3, 8):
            return ast.Module(kept)
        # from 3.8 on the type_ignores of the original are carried over
        return ast.Module(kept, node.type_ignores)

396 

397 

class Notebook(Loader):
    """Notebook is a user friendly file finder and module loader for notebook source code.

    > Remember, restart and run all or it didn't happen.

    Notebook provides several useful options.

    * Lazy module loading. A module is executed the first time it is used in a script.
    """

    def markdown(self, str):
        """transform the source of a markdown cell with ``quote``."""
        return quote(str)

    def raw(self, str):
        """transform the source of a raw cell into python comments."""
        return comment(str)

    def visit(self, nodes):
        """filter the module through ``DefsOnly`` unless ``include_non_defs`` is set."""
        if self.include_non_defs:
            return nodes
        return DefsOnly().visit(nodes)

    def code(self, str):
        """transform the source of a code cell.

        with ``no_magic`` set, source matching ``MAGIC`` is commented out;
        everything else goes through the parent transformation.
        """
        if self.no_magic and MAGIC.match(str):
            return comment(str)
        return super().code(str)

    def source_to_nodes(self, source, path="<unknown>", *, _optimize=-1):
        """parse source to ast, then apply the docstring and definition filters."""
        # forward ``_optimize`` so the parent compiles with the requested level
        nodes = super().source_to_nodes(source, path, _optimize=_optimize)
        if self.include_markdown_docstring:
            nodes = update_docstring(nodes)
        nodes = self.visit(nodes)
        return ast.fix_missing_locations(nodes)

    def raw_to_source(self, source):
        """transform a string from a raw file to python source."""
        # the parent implementation already dispatches on ``.ipynb`` and uses
        # this class's ``code``/``raw``/``markdown`` transformers.
        return super().raw_to_source(source)

442 

443 

444def _dict_module(ns): 1bdcae

445 m = ModuleType(ns.get("__name__"), ns.get("__doc__")) 1dz

446 m.__dict__.update(ns) 1dz

447 return m 1dz

448 

449 

@contextmanager
def main_argv(prog, args=None):
    """temporarily replace ``sys.argv`` with ``[prog, *args]``.

    prog: the value used for ``argv[0]``.
    args: the remaining arguments; when None ``sys.argv`` is left untouched.

    the previous ``sys.argv`` is put back on exit, also when the managed
    block raises.
    """
    if args is None:
        yield
        return

    prior, sys.argv = sys.argv, [prog, *args]
    try:
        yield
    finally:
        # restore even on errors so a failing script cannot leak its argv
        sys.argv = prior

458 

459 

# pick the code-cell transformer: IPython's input splitter when IPython can be
# imported, otherwise a small textwrap based fallback.
try:
    import IPython
    from IPython.core.inputsplitter import IPythonInputSplitter

    dedent = IPythonInputSplitter(
        line_input_checker=False,
        physical_line_transforms=[
            IPython.core.inputsplitter.leading_indent(),
            IPython.core.inputsplitter.ipy_prompt(),
            IPython.core.inputsplitter.cellmagic(end_on_blank_line=False),
        ],
    ).transform_cell
except ModuleNotFoundError:

    def dedent(body):
        """comment out source matching ``MAGIC``; dedent everything else."""
        if MAGIC.match(body) is None:
            return textwrap.dedent(body)
        return textwrap.indent(body, "# ")