import importlib
import functools
import sys
import os
import textwrap
import json
import types
import shutil
import pathlib
import importlib.util
from typing import Iterable
from typing import List
from typing import Tuple
import eliot
import trio
import toml
import lark
from voca import utils
from voca import streaming
from voca import log
from voca import parsing
from voca import context
from voca import config
[docs]@log.log_async_call
async def handle_message(wrapper_group: utils.WrapperGroup, data: dict):
"""Execute the command in ``data`` with the ``wrapper_group`` containing the grammar."""
message = data["result"]["hypotheses"][0]["transcript"]
with eliot.start_action(action_type="parse_command") as action:
handler = await make_specific_handler(wrapper_group, data)
tree = handler.parser.parse(message)
commands = parsing.extract_commands(tree)
for command in commands:
rule_name, args = command.data, command.children
function = handler.rule_name_to_function[rule_name]
with eliot.start_action(
action_type="run_command", command=rule_name, args=args, function=function
):
await function(args)
[docs]@log.log_call
def load_from_path(import_path: str, filename: str) -> types.ModuleType:
"""Load a module from a filesystem path."""
spec = importlib.util.spec_from_file_location(import_path, filename)
module = importlib.util.module_from_spec(spec)
spec.loader.exec_module(module)
return module
[docs]@log.log_call
def get_backup_module(import_path: str, backup_dir: pathlib.Path) -> types.ModuleType:
"""Import a module from the backup directory."""
sys.path.insert(0, str(backup_dir))
try:
module = importlib.import_module(import_path)
except Exception:
module = None
finally:
del sys.path[0]
return module
[docs]@log.log_call
def save_backup_module(
module: types.ModuleType, import_path: str, backup_dir: pathlib.Path
) -> None:
"""Save a module in the backup directory."""
new_path = (backup_dir / import_path.replace(".", "/")).with_suffix(".py")
new_path.parent.mkdir(exist_ok=True, parents=True)
shutil.copy2(module.__file__, new_path)
[docs]@log.log_call
def get_module(
import_path: str, backup_dir: pathlib.Path, use_backup_modules: bool
) -> types.ModuleType:
"""Import module and cache it in backup_dir, returning backup on failure."""
try:
with eliot.start_action(
action_type="import_module",
import_path=import_path,
sys_path=sys.path,
sys_meta_path=sys.meta_path,
):
module = importlib.import_module(import_path)
except Exception:
if not use_backup_modules:
raise
module = get_backup_module(import_path, backup_dir)
else:
save_backup_module(module, import_path, backup_dir)
return module
[docs]@log.log_call
def collect_modules(
import_paths: Iterable[str], use_backup_modules: bool
) -> List[types.ModuleType]:
"""Collect modules from import paths, optionally defaulting to backup modules on failure."""
backup_dir = pathlib.Path(config.get_config_dir()) / "backup_modules"
modules = []
for import_path in import_paths:
module = get_module(import_path, backup_dir, use_backup_modules)
if module is not None:
modules.append(module)
return modules
[docs]def combine_registries(registries: utils.Registry) -> utils.Registry:
"""Combine multiple registries into a single one."""
combined = utils.Registry()
for registry in registries:
combined.pattern_to_function.update(registry.pattern_to_function)
combined.patterns.update(registry.patterns)
return combined
[docs]async def make_specific_handler(wrapper_group: utils.WrapperGroup, data: dict):
"""Build the command handler for the specific context."""
filtered = await context.filter_wrappers(wrapper_group, data)
registry = combine_registries([wrapper.registry for wrapper in filtered.wrappers])
rules = parsing.build_rules(registry)
grammar = parsing.build_grammar(registry, rules)
rule_name_to_function = {rule.name: rule.function for rule in rules}
parser = lark.Lark(
grammar, debug=True, lexer="dynamic_complete", maybe_placeholders=True
)
return utils.Handler(
registry=registry, parser=parser, rule_name_to_function=rule_name_to_function
)
[docs]@log.log_async_call
async def async_main(wrapper_group: utils.WrapperGroup):
"""Process input commands as newline-separated json on stdin."""
stream = trio._unix_pipes.PipeReceiveStream(os.dup(0))
receiver = streaming.TerminatedFrameReceiver(stream, b"\n")
async for message_bytes in receiver:
data = json.loads(message_bytes.decode())
try:
with eliot.Action.continue_task(
task_id=data.get("eliot_task_id", "@")
) as action:
await handle_message(wrapper_group=wrapper_group, data=data)
except Exception as e:
action.finish(e)
raise
sys.exit(0)
[docs]@log.log_call
def main(import_paths: Tuple[str], use_backup_modules: bool):
"""Get the wrapper group and start the event loop."""
modules = collect_modules(import_paths, use_backup_modules)
modules = [utils.transform_module(module) for module in modules]
wrapper_group = parsing.combine_modules(modules)
trio.run(functools.partial(async_main, wrapper_group=wrapper_group))