Coverage for /home/runner/.local/share/hatch/env/virtual/importnb/KA2AwMZG/test.interactive/lib/python3.9/site-packages/importnb/loader.py: 85%

279 statements  

« prev     ^ index     » next       coverage.py v7.3.2, created at 2023-11-02 04:03 +0000

1# coding: utf-8 

2"""# `loader` 1cedafb

3 

4the loading machinery for notebooks style documents, and less. 

5notebooks combine code, markdown, and raw cells to create a complete document. 

6the importnb loader provides an interface for transforming these objects to valid python. 

7""" 

8 

9 

10import ast 1cedafb

11import inspect 1cedafb

12import re 1cedafb

13import shlex 1cedafb

14import sys 1cedafb

15import textwrap 1cedafb

16from contextlib import contextmanager 1cedafb

17from dataclasses import asdict, dataclass, field 1cedafb

18from functools import partial 1cedafb

19from importlib import _bootstrap as bootstrap 1cedafb

20from importlib import reload 1cedafb

21from importlib._bootstrap import _init_module_attrs, _requires_builtin 1cedafb

22from importlib._bootstrap_external import FileFinder, decode_source 1cedafb

23from importlib.machinery import ModuleSpec, SourceFileLoader 1cedafb

24from importlib.util import LazyLoader, find_spec 1cedafb

25from pathlib import Path 1cedafb

26from types import ModuleType 1cedafb

27 

28from . import get_ipython 1cedafb

29from .decoder import LineCacheNotebookDecoder, quote 1cedafb

30from .docstrings import update_docstring 1cedafb

31from .finder import FileModuleSpec, FuzzyFinder, get_loader_details, get_loader_index 1cedafb

32 

33__all__ = "Notebook", "reload" 1cedafb

34 

35VERSION = sys.version_info.major, sys.version_info.minor 1cedafb

36 

37MAGIC = re.compile(r"^\s*%{2}", re.MULTILINE) 1cedafb

38ALLOW_TOP_LEVEL_AWAIT = getattr(ast, "PyCF_ALLOW_TOP_LEVEL_AWAIT", 0x0) 1cedafb

39 

40 

41def _get_co_flags_set(co_flags): 1cedafb

42 """return a deconstructed set of code flags from a code object.""" 

43 flags = set() 1tcdanbjolsmuhipkqrvg

44 for i in range(12): 44 ↛ 52line 44 didn't jump to line 52, because the loop on line 44 didn't complete1tcdanbjolsmuhipkqrvg

45 flag = 1 << i 1tcdanbjolsmuhipkqrvg

46 if co_flags & flag: 1tcdanbjolsmuhipkqrvg

47 flags.add(flag) 1tcdanbjolsmuhipkqrvg

48 co_flags ^= flag 1tcdanbjolsmuhipkqrvg

49 if not co_flags: 1tcdanbjolsmuhipkqrvg

50 break 1tcdanbjolsmuhipkqrvg

51 else: 

52 flags.intersection_update(flags) 

53 return flags 1tcdanbjolsmuhipkqrvg

54 

55 

56class SourceModule(ModuleType): 1cedafb

57 def __fspath__(self): 1cedafb

58 return self.__file__ 

59 

60 

61@dataclass 1cedafb

62class Interface: 1cedafb

63 """a configuration python importing interface""" 

64 

65 name: str = None 1cedafb

66 path: str = None 1cedafb

67 lazy: bool = False 1cedafb

68 extensions: tuple = field(default_factory=[".ipy", ".ipynb"].copy) 1cedafb

69 include_fuzzy_finder: bool = True 1cedafb

70 include_markdown_docstring: bool = True 1cedafb

71 include_non_defs: bool = True 1cedafb

72 include_await: bool = True 1cedafb

73 module_type: ModuleType = field(default=SourceModule) 1cedafb

74 no_magic: bool = False 1cedafb

75 

76 _loader_hook_position: int = field(default=0, repr=False) 1cedafb

77 

78 def __new__(cls, name=None, path=None, **kwargs): 1cedafb

79 kwargs.update(name=name, path=path) 1tcedanbwxjoyzlsmBuhipkqrAvg

80 self = super().__new__(cls) 1tcedanbwxjoyzlsmBuhipkqrAvg

81 self.__init__(**kwargs) 1tcedanbwxjoyzlsmBuhipkqrAvg

82 return self 1tcedanbwxjoyzlsmBuhipkqrAvg

83 

84 

85class Loader(Interface, SourceFileLoader): 1cedafb

86 """The simplest implementation of a Notebook Source File Loader. 

87 This class breaks down the loading process into finer steps.""" 

88 

89 extensions: tuple = field(default_factory=[".py"].copy) 1cedafb

90 

91 @property 1cedafb

92 def loader(self): 1cedafb

93 """generate a new loader based on the state of an existing loader.""" 

94 loader = type(self) 1anbwxjoyzlsmhipkqrAg

95 if self.lazy: 1anbwxjoyzlsmhipkqrAg

96 loader = LazyLoader.factory(loader) 1m

97 # Strip the leading underscore from slots 

98 params = asdict(self) 1anbwxjoyzlsmhipkqrAg

99 params.pop("name") 1anbwxjoyzlsmhipkqrAg

100 params.pop("path") 1anbwxjoyzlsmhipkqrAg

101 return partial(loader, **params) 1anbwxjoyzlsmhipkqrAg

102 

103 @property 1cedafb

104 def finder(self): 1cedafb

105 """generate a new finder based on the state of an existing loader""" 

106 return self.include_fuzzy_finder and FuzzyFinder or FileFinder 1anbwxjoyzlsmhipkqrAg

107 

108 def raw_to_source(self, source): 1cedafb

109 """transform a string from a raw file to python source.""" 

110 if self.path and self.path.endswith(".ipynb"): 

111 # when we encounter notebooks we apply different transformers to the diff cell types 

112 return LineCacheNotebookDecoder( 

113 code=self.code, raw=self.raw, markdown=self.markdown 

114 ).decode(source, self.path) 

115 

116 # for a normal file we just apply the code transformer. 

117 return self.code(source) 

118 

119 def source_to_nodes(self, source, path="<unknown>", *, _optimize=-1): 1cedafb

120 """parse source string as python ast""" 

121 flags = ast.PyCF_ONLY_AST 1tcdanbjolsmuhipkqrvg

122 return bootstrap._call_with_frames_removed( 1tcdanbjolsmuhipkqrvg

123 compile, source, path, "exec", flags=flags, dont_inherit=True, optimize=_optimize 

124 ) 

125 

126 def nodes_to_code(self, nodes, path="<unknown>", *, _optimize=-1): 1cedafb

127 """compile ast nodes to python code object""" 

128 flags = ALLOW_TOP_LEVEL_AWAIT 1tcdanbjolsmuhipkqrvg

129 return bootstrap._call_with_frames_removed( 1tcdanbjolsmuhipkqrvg

130 compile, nodes, path, "exec", flags=flags, dont_inherit=True, optimize=_optimize 

131 ) 

132 

133 def source_to_code(self, source, path="<unknown>", *, _optimize=-1): 1cedafb

134 """tangle python source to compiled code by: 

135 1. parsing the source as ast nodes 

136 2. compiling the ast nodes as python code""" 

137 nodes = self.source_to_nodes(source, path, _optimize=_optimize) 1tcdanbjolsmuhipkqrvg

138 return self.nodes_to_code(nodes, path, _optimize=_optimize) 1tcdanbjolsmuhipkqrvg

139 

140 def get_data(self, path): 1cedafb

141 """get_data injects an input transformation before the raw text. 

142 

143 this method allows notebook json to be transformed line for line into vertically sparse python code. 

144 """ 

145 return self.raw_to_source(decode_source(super().get_data(self.path))) 1tcdanbjolsmuhipkqrvg

146 

147 def create_module(self, spec): 1cedafb

148 """an overloaded create_module method injecting fuzzy finder setup up logic.""" 

149 module = self.module_type(str(spec.name)) 1tcdanbwjolsmuhipkqrvg

150 _init_module_attrs(spec, module) 1tcdanbwjolsmuhipkqrvg

151 if self.name: 151 ↛ 154line 151 didn't jump to line 154, because the condition on line 151 was never false1tcdanbwjolsmuhipkqrvg

152 module.__name__ = self.name 1tcdanbwjolsmuhipkqrvg

153 

154 if module.__file__.endswith((".ipynb", ".ipy")): 1tcdanbwjolsmuhipkqrvg

155 module.get_ipython = get_ipython 1tcdanbjolsmuhipkqrvg

156 

157 if getattr(spec, "alias", None): 1tcdanbwjolsmuhipkqrvg

158 # put a fuzzy spec on the modules to avoid re importing it. 

159 # there is a funky trick you do with the fuzzy finder where you 

160 # load multiple versions with different finders. 

161 

162 sys.modules[spec.alias] = module 1lg

163 

164 return module 1tcdanbwjolsmuhipkqrvg

165 

166 def exec_module(self, module): 1cedafb

167 """Execute the module.""" 

168 # importlib uses module.__name__, but when running modules as __main__ name will change. 

169 # this approach uses the original name on the spec. 

170 try: 1tcdanbjolsmuhipkqrvg

171 code = self.get_code(module.__spec__.name) 1tcdanbjolsmuhipkqrvg

172 

173 # from importlib 

174 if code is None: 174 ↛ 175line 174 didn't jump to line 175, because the condition on line 174 was never true1tcdanbjolsmuhipkqrvg

175 raise ImportError( 

176 f"cannot load module {module.__name__!r} when " "get_code() returns None" 

177 ) 

178 

179 if inspect.CO_COROUTINE not in _get_co_flags_set(code.co_flags): 1tcdanbjolsmuhipkqrvg

180 # if there isn't any async non sense then we proceed with convention. 

181 bootstrap._call_with_frames_removed(exec, code, module.__dict__) 1tcdanbjolsmuhipkqrv

182 else: 

183 self.aexec_module_sync(module) 1g

184 

185 except BaseException as e: 

186 alias = getattr(module.__spec__, "alias", None) 

187 if alias: 

188 sys.modules.pop(alias, None) 

189 

190 raise e 

191 

192 def aexec_module_sync(self, module): 1cedafb

193 if "anyio" in sys.modules: 193 ↛ 194line 193 didn't jump to line 194, because the condition on line 193 was never true1g

194 import anyio 

195 

196 __import__("anyio").run(self.aexec_module, module) 

197 else: 

198 from asyncio import get_event_loop 1g

199 

200 get_event_loop().run_until_complete(self.aexec_module(module)) 1g

201 

202 async def aexec_module(self, module): 1cedafb

203 """an async exec_module method permitting top-level await.""" 

204 # there is so redudancy in this approach, but it starts getting asynchier. 

205 nodes = self.source_to_nodes(self.get_data(self.path)) 1g

206 

207 # iterate through the nodes and compile individual statements 

208 for node in nodes.body: 1g

209 co = bootstrap._call_with_frames_removed( 1g

210 compile, 

211 ast.Module([node], []), 

212 module.__file__, 

213 "exec", 

214 flags=ALLOW_TOP_LEVEL_AWAIT, 

215 ) 

216 if inspect.CO_COROUTINE in _get_co_flags_set(co.co_flags): 1g

217 # when something async is encountered we compile it with the single flag 

218 # this lets us use eval to retreive our coroutine. 

219 co = bootstrap._call_with_frames_removed( 1g

220 compile, 

221 ast.Interactive([node]), 

222 module.__file__, 

223 "single", 

224 flags=ALLOW_TOP_LEVEL_AWAIT, 

225 ) 

226 await bootstrap._call_with_frames_removed( 1g

227 eval, co, module.__dict__, module.__dict__ 

228 ) 

229 else: 

230 bootstrap._call_with_frames_removed(exec, co, module.__dict__, module.__dict__) 1g

231 

232 def code(self, str): 1cedafb

233 return dedent(str) 1tcedanbjolsmBuhipkqrvg

234 

235 @classmethod 1cedafb

236 @_requires_builtin 1cedafb

237 def is_package(cls, fullname): 1cedafb

238 """Return False as built-in modules are never packages.""" 

239 if "." not in fullname: 

240 return True 

241 return super().is_package(fullname) 

242 

243 def __enter__(self): 1cedafb

244 path_id, loader_id, details = get_loader_index(".py") 1anbwxjoyzlsmhipkqrAg

245 for _, e in details: 1anbwxjoyzlsmhipkqrAg

246 if all(map(e.__contains__, self.extensions)): 246 ↛ 247line 246 didn't jump to line 247, because the condition on line 246 was never true1anbwxjoyzlsmhipkqrAg

247 self._loader_hook_position = None 

248 return self 

249 else: 

250 self._loader_hook_position = loader_id + 1 1anbwxjoyzlsmhipkqrAg

251 details.insert(self._loader_hook_position, (self.loader, self.extensions)) 1anbwxjoyzlsmhipkqrAg

252 sys.path_hooks[path_id] = self.finder.path_hook(*details) 1anbwxjoyzlsmhipkqrAg

253 sys.path_importer_cache.clear() 1anbwxjoyzlsmhipkqrAg

254 return self 1anbwxjoyzlsmhipkqrAg

255 

256 def __exit__(self, *excepts): 1cedafb

257 if self._loader_hook_position is not None: 257 ↛ exitline 257 didn't return from function '__exit__', because the condition on line 257 was never false1anbwxjoyzlsmhipkqrAg

258 path_id, details = get_loader_details() 1anbwxjoyzlsmhipkqrAg

259 details.pop(self._loader_hook_position) 1anbwxjoyzlsmhipkqrAg

260 sys.path_hooks[path_id] = self.finder.path_hook(*details) 1anbwxjoyzlsmhipkqrAg

261 sys.path_importer_cache.clear() 1anbwxjoyzlsmhipkqrAg

262 

263 @classmethod 1cedafb

264 def load_file(cls, filename, main=True, **kwargs): 1cedafb

265 """Import a notebook as a module from a filename. 

266 

267 dir: The directory to load the file from. 

268 main: Load the module in the __main__ context. 

269 

270 >>> assert Notebook.load_file('foo.ipynb') 

271 """ 

272 name = main and "__main__" or filename 1tcdbuv

273 loader = cls(name, str(filename), **kwargs) 1tcdbuv

274 spec = FileModuleSpec(name, loader, origin=loader.path) 1tcdbuv

275 module = loader.create_module(spec) 1tcdbuv

276 loader.exec_module(module) 1tcdbuv

277 return module 1tcdbuv

278 

279 @classmethod 1cedafb

280 def load_module(cls, module, main=False, **kwargs): 1cedafb

281 """Import a notebook as a module. 

282 

283 main: Load the module in the __main__ context. 

284 

285 >>> assert Notebook.load_module('foo') 

286 """ 

287 from runpy import _run_module_as_main, run_module 1ahi

288 

289 with cls() as loader: 1ahi

290 spec = find_spec(module) 1ahi

291 module = spec.loader.create_module(spec) 1ahi

292 if main: 1ahi

293 sys.modules["__main__"] = module 1a

294 module.__name__ = "__main__" 1a

295 spec.loader.exec_module(module) 1ahi

296 return module 1ahi

297 

298 @classmethod 1cedafb

299 def load_argv(cls, argv=None, *, parser=None): 1cedafb

300 """load a module based on python arguments 

301 

302 load a notebook from its file name 

303 >>> Notebook.load_argv("foo.ipynb --arg abc") 

304 

305 load the same notebook from a module alias. 

306 >>> Notebook.load_argv("-m foo --arg abc") 

307 """ 

308 if parser is None: 308 ↛ 311line 308 didn't jump to line 311, because the condition on line 308 was never false1cedafb

309 parser = cls.get_argparser() 1cedafb

310 

311 if argv is None: 311 ↛ 316line 311 didn't jump to line 316, because the condition on line 311 was never false1cedafb

312 from sys import argv 1cedafb

313 

314 argv = argv[1:] 1cedafb

315 

316 if isinstance(argv, str): 316 ↛ 317line 316 didn't jump to line 317, because the condition on line 316 was never true1cedafb

317 argv = shlex.split(argv) 

318 

319 module = cls.load_ns(parser.parse_args(argv)) 1cedafb

320 if module is None: 1cedafb

321 return parser.print_help() 1f

322 

323 return module 1cedab

324 

325 @classmethod 1cedafb

326 def load_ns(cls, ns): 1cedafb

327 """load a module from a namespace, used when loading module from sys.argv parameters.""" 

328 if ns.tasks: 1cedafb

329 # i don't quite why we need to do this here, but we do. so don't move it 

330 from doit.cmd_base import ModuleTaskLoader 1c

331 from doit.doit_cmd import DoitMain 1c

332 

333 if ns.code: 1cedafb

334 with main_argv(sys.argv[0], ns.args): 1e

335 result = cls.load_code(ns.code) 1e

336 elif ns.module: 1cdafb

337 if ns.dir: 337 ↛ 340line 337 didn't jump to line 340, because the condition on line 337 was never false1a

338 if ns.dir not in sys.path: 338 ↛ 344line 338 didn't jump to line 344, because the condition on line 338 was never false1a

339 sys.path.insert(0, ns.dir) 1a

340 elif "" in sys.path: 

341 pass 

342 else: 

343 sys.path.insert(0, "") 

344 with main_argv(ns.module, ns.args): 1a

345 result = cls.load_module(ns.module, main=True) 1a

346 elif ns.file: 1cdfb

347 where = Path(ns.dir, ns.file) if ns.dir else Path(ns.file) 1cdb

348 with main_argv(str(where), ns.args): 1cdb

349 result = cls.load_file(ns.file) 1cdb

350 else: 

351 return 1f

352 if ns.tasks: 1cedab

353 DoitMain(ModuleTaskLoader(result)).run(ns.args or ["help"]) 1c

354 return result 1cedab

355 

356 @classmethod 1cedafb

357 def load_code(cls, code, argv=None, mod_name=None, script_name=None, main=False): 1cedafb

358 """load a module from raw source code""" 

359 

360 from runpy import _run_module_code 1eB

361 

362 self = cls() 1eB

363 name = main and "__main__" or mod_name or "<raw code>" 1eB

364 

365 return _dict_module( 1eB

366 _run_module_code(self.raw_to_source(code), mod_name=name, script_name=script_name) 

367 ) 

368 

369 @staticmethod 1cedafb

370 def get_argparser(parser=None): 1cedafb

371 from argparse import REMAINDER, ArgumentParser 1cedafb

372 

373 if parser is None: 373 ↛ 375line 373 didn't jump to line 375, because the condition on line 373 was never false1cedafb

374 parser = ArgumentParser("importnb", description="run notebooks as python code") 1cedafb

375 parser.add_argument("file", nargs="?", help="run a file") 1cedafb

376 parser.add_argument("args", nargs=REMAINDER, help="arguments to pass to script") 1cedafb

377 parser.add_argument("-m", "--module", help="run a module") 1cedafb

378 parser.add_argument("-c", "--code", help="run raw code") 1cedafb

379 parser.add_argument("-d", "--dir", help="path to run script in") 1cedafb

380 parser.add_argument("-t", "--tasks", action="store_true", help="run doit tasks") 1cedafb

381 return parser 1cedafb

382 

383 

384def comment(str): 1cedafb

385 return textwrap.indent(str, "# ") 1k

386 

387 

388class DefsOnly(ast.NodeTransformer): 1cedafb

389 INCLUDE = ast.Import, ast.ImportFrom, ast.ClassDef, ast.FunctionDef, ast.AsyncFunctionDef 1cedafb

390 

391 def visit_Module(self, node): 1cedafb

392 args = ([x for x in node.body if isinstance(x, self.INCLUDE)],) 1j

393 if VERSION >= (3, 8): 393 ↛ 395line 393 didn't jump to line 395, because the condition on line 393 was never false1j

394 args += (node.type_ignores,) 1j

395 return ast.Module(*args) 1j

396 

397 

398class Notebook(Loader): 1cedafb

399 """Notebook is a user friendly file finder and module loader for notebook source code. 

400 

401 > Remember, restart and run all or it didn't happen. 

402 

403 Notebook provides several useful options. 

404 

405 * Lazy module loading. A module is executed the first time it is used in a script. 

406 """ 

407 

408 def markdown(self, str): 1cedafb

409 return quote(str) 1tcdanbjolmuhipkqrvg

410 

411 def raw(self, str): 1cedafb

412 return comment(str) 

413 

414 def visit(self, nodes): 1cedafb

415 if self.include_non_defs: 1tcdanbjolsmuhipkqrvg

416 return nodes 1tcdanbolsmuhipkqrvg

417 return DefsOnly().visit(nodes) 1j

418 

419 def code(self, str): 1cedafb

420 if self.no_magic: 1tcedanbjolsmBuhipkqrvg

421 if MAGIC.match(str): 1k

422 return comment(str) 1k

423 return super().code(str) 1tcedanbjolsmBuhipkqrvg

424 

425 def source_to_nodes(self, source, path="<unknown>", *, _optimize=-1): 1cedafb

426 nodes = super().source_to_nodes(source, path) 1tcdanbjolsmuhipkqrvg

427 if self.include_markdown_docstring: 427 ↛ 429line 427 didn't jump to line 429, because the condition on line 427 was never false1tcdanbjolsmuhipkqrvg

428 nodes = update_docstring(nodes) 1tcdanbjolsmuhipkqrvg

429 nodes = self.visit(nodes) 1tcdanbjolsmuhipkqrvg

430 return ast.fix_missing_locations(nodes) 1tcdanbjolsmuhipkqrvg

431 

432 def raw_to_source(self, source): 1cedafb

433 """transform a string from a raw file to python source.""" 

434 if self.path and self.path.endswith(".ipynb"): 1tcedanbjolsmBuhipkqrvg

435 # when we encounter notebooks we apply different transformers to the diff cell types 

436 return LineCacheNotebookDecoder( 1tcdanbjolmuhipkqrvg

437 code=self.code, raw=self.raw, markdown=self.markdown 

438 ).decode(source, self.path) 

439 

440 # for a normal file we just apply the code transformer. 

441 return self.code(source) 1esB

442 

443 

444def _dict_module(ns): 1cedafb

445 m = ModuleType(ns.get("__name__"), ns.get("__doc__")) 1eB

446 m.__dict__.update(ns) 1eB

447 return m 1eB

448 

449 

450@contextmanager 1cedafb

451def main_argv(prog, args=None): 1cedafb

452 if args is not None: 452 ↛ 455line 452 didn't jump to line 455, because the condition on line 452 was never false1cedab

453 args = [prog] + list(args) 1cedab

454 prior, sys.argv = sys.argv, args 1cedab

455 yield 1cedab

456 if args is not None: 456 ↛ exitline 456 didn't return from function 'main_argv', because the condition on line 456 was never false1cedab

457 sys.argv = prior 1cedab

458 

459 

460try: 1cedafb

461 import IPython 1cedafb

462 from IPython.core.inputsplitter import IPythonInputSplitter 1cedafb

463 

464 dedent = IPythonInputSplitter( 1cedafb

465 line_input_checker=False, 

466 physical_line_transforms=[ 

467 IPython.core.inputsplitter.leading_indent(), 

468 IPython.core.inputsplitter.ipy_prompt(), 

469 IPython.core.inputsplitter.cellmagic(end_on_blank_line=False), 

470 ], 

471 ).transform_cell 

472except ModuleNotFoundError: 

473 

474 def dedent(body): 

475 from textwrap import dedent, indent 

476 

477 if MAGIC.match(body): 

478 return indent(body, "# ") 

479 return dedent(body)