Source code for voca.caster_adapter

from __future__ import annotations

import contextlib
import sys
import types
import re

from typing import Optional
from typing import List
from typing_extensions import Protocol


import attr
import trio
import lark

import voca.plugins
from voca import utils
from voca import context
from voca import log
from voca import patching


class LazyLoader:
    """Proxy that forwards attribute lookups to ``voca.plugins.basic``.

    The plugin module is imported inside ``__getattr__`` instead of at the
    top of this file, so the import only happens once an attribute is
    actually requested.
    """

    def __getattr__(self, name):
        """Return attribute ``name`` of ``voca.plugins.basic``."""
        # Deferred import: resolved at lookup time, not at module import time.
        from voca.plugins import basic as plugin_module

        return getattr(plugin_module, name)


# Module-level stand-in for the ``basic`` plugin used by the actions below.
basic = LazyLoader()
class AsyncActionType(Protocol):
    """Structural type for any object with an async ``execute`` method."""

    async def execute(self, arg=None) -> None:
        """Run the action; ``arg`` is optional and defaults to ``None``."""
@attr.dataclass
class TextAction:
    """Action that writes a format string filled in from ``arg``."""

    text: str

    async def execute(self, arg=None) -> None:
        """Format ``text`` with ``str.format_map(arg)`` and pass it to ``basic.write``."""
        rendered = self.text.format_map(arg)
        await basic.write(rendered)
@attr.dataclass
class KeyAction:
    """Action that presses a single key via ``basic.press``."""

    # In this file ``adapt_Key`` fills this with the result of
    # ``convert_key_name`` rather than a plain string.
    name: str

    async def execute(self, arg=None):
        """Press the stored key; ``arg`` is not used."""
        key = self.name
        await basic.press(key)
@attr.dataclass
class RegisteredAction:
    """Wrapper pairing an action with an optional description string."""

    instruction: AsyncActionType = attr.ib()
    rdescript: Optional[str] = attr.ib(default=None)

    async def execute(self, arg=None) -> None:
        """Delegate to the wrapped instruction's ``execute`` with the same ``arg``."""
        wrapped = self.instruction
        await wrapped.execute(arg)
@attr.dataclass
class ActionSequence:
    """Ordered list of actions executed one after another."""

    actions: List[AsyncActionType]

    def __add__(self, other):
        """Return a copy whose actions are this sequence's followed by ``other.actions``."""
        combined = self.actions + other.actions
        return attr.evolve(self, actions=combined)

    async def execute(self, arg=None):
        """Await each contained action in order, passing ``arg`` to every one."""
        for step in self.actions:
            await step.execute(arg)
@attr.dataclass
class ConditionalAction:
    """Action that runs another action only when an async condition holds."""

    # Awaited with ``arg``; a truthy result triggers ``action``.
    condition: types.FunctionType
    action: RegisteredAction

    async def execute(self, arg=None):
        """Await ``condition(arg)`` and, if truthy, execute ``action`` with ``arg``."""
        if await self.condition(arg):
            # The action is an object exposing ``execute``, not a callable,
            # so it has to be run through that method.
            await self.action.execute(arg)
# @attr.dataclass # class AppContext: # title: str # executable: str
@attr.dataclass
class Dictation:
    """Extra whose definition is a single-word regular expression."""

    name: str

    def make_definitions(self):
        """Return ``{name: pattern}`` where the pattern is the ``/\\w+/`` regex literal."""
        word_pattern = r"/\w+/"
        return {self.name: word_pattern}
@attr.dataclass
class IntegerRefST:
    """Integer extra defined as an alternation of number pronunciations."""

    name: str
    start: int
    end: int

    def make_definitions(self):
        """Return ``{name: "/p1|p2|.../"}`` for the integers in the range.

        The range runs from ``start`` up to, but excluding, ``end``, and the
        upper bound is capped at 20. Pronunciations are looked up in
        ``utils.value_to_pronunciation()`` by the integer's string form.
        """
        pronunciations = utils.value_to_pronunciation()
        upper_bound = min(self.end, 20)
        spoken = []
        for number in range(self.start, upper_bound):
            spoken.append(pronunciations.get(str(number)))
        joined = "|".join(spoken)
        return {self.name: "/" + joined + "/"}
@attr.dataclass
class RepeatedAction:
    """Action that executes another action a spoken number of times."""

    action: AsyncActionType
    extra: str

    async def execute(self, arg=None):
        """Run ``action`` ``int(arg[extra])`` times, passing ``arg`` each time."""
        # ``self.extra`` should be the name of the extra, like 'n', and
        # ``arg`` should be a mapping of the values that were said.
        remaining = int(arg[self.extra])
        while remaining > 0:
            await self.action.execute(arg)
            remaining -= 1
@attr.dataclass
class DelayAction:
    """Action that pauses by awaiting ``trio.sleep``."""

    # Passed unchanged to ``trio.sleep``.
    duration: float

    async def execute(self, arg=None):
        """Sleep for ``duration``; ``arg`` is not used."""
        pause = self.duration
        await trio.sleep(pause)
@attr.dataclass
class Choice:
    """Extra that offers a fixed set of alternatives taken from ``mapping``."""

    name: str
    mapping: dict

    def make_definitions(self):
        """Return one definition per mapping key plus one for ``name``.

        Each key maps to ``utils.quote`` of its value; ``name`` maps to the
        keys joined with ``|``.
        """
        rules = {}
        for key, value in self.mapping.items():
            rules[key] = utils.quote(value)
        # The alternation is computed from the keys before ``name`` is added.
        rules[self.name] = "|".join(rules)
        return rules
@attr.dataclass
class MyRepeat:
    """Marker that turns an action into a ``RepeatedAction`` when multiplied."""

    extra: str

    def __mul__(self, action):
        """Return ``action`` wrapped so it repeats according to ``extra``."""
        return RepeatedAction(action=action, extra=self.extra)

    def __rmul__(self, action):
        """Support ``action * repeat`` by deferring to ``repeat * action``."""
        return self * action
@attr.dataclass
class FunctionAction:
    """Action that calls a stored callable with pre-bound arguments."""

    callable: types.FunctionType
    positional_arguments: tuple
    keyword_arguments: dict

    async def execute(self, arg=None):
        """Call ``callable`` with the stored arguments and return its result.

        ``arg`` is accepted, with a ``None`` default like every other action
        in this module, but it is not forwarded to the callable.
        """
        return self.callable(*self.positional_arguments, **self.keyword_arguments)
class CasterTransformer(lark.Transformer):
    """Lark transformer that turns a spoken ``n`` rule into an integer."""

    def n(self, arg):
        """Return ``{"n": value}`` for the pronunciation held in ``arg[0]``.

        The pieces of ``arg[0]`` are joined into one string, which is looked
        up in ``utils.pronunciation_to_value()`` and converted with ``int``.
        """
        spoken = "".join(arg[0])
        lookup = utils.pronunciation_to_value()
        return {"n": int(lookup[spoken])}
def transform_tree(tree):
    """Transform the first element of ``tree`` with ``CasterTransformer``.

    A falsy ``tree`` (for example ``None`` or an empty sequence) is returned
    unchanged.
    """
    if tree:
        return CasterTransformer().transform(tree[0])
    return tree
async def _run(action, data):
    """Transform ``data`` with ``transform_tree`` and execute ``action`` on the result."""
    await action.execute(transform_tree(data))
def add_to_registry(mapping, registry):
    """Register a handler on ``registry`` for every ``pattern -> action`` pair.

    Each pattern is passed through ``utils.quote`` before registration. The
    handler receives the matched data and runs the action through ``_run``.

    Returns the same ``registry`` object, which is mutated in place.
    """
    # TODO don't mutate registry
    for pattern, action in mapping.items():
        # Bind ``action`` as a default argument: a plain closure would look
        # the name up at call time and every handler would run the action
        # from the final loop iteration.
        registry.register(utils.quote(pattern))(
            lambda data, action=action: _run(action, data)
        )
    return registry
def _function(f, **kwargs):
    """Build a ``FunctionAction`` that calls ``f`` with ``kwargs`` and no positional arguments."""
    return FunctionAction(
        f,
        positional_arguments=(),
        keyword_arguments=kwargs,
    )
class MergeRule:
    """Empty class with no attributes or methods of its own."""
class CCRMerger:
    """Namespace class exposing the ``CORE`` constant."""

    CORE = 1
@attr.dataclass
class Grammar:
    """Record holding a grammar's name and its context."""

    name: str
    # ``AppContext`` is not defined in this module; the annotation is never
    # evaluated because of ``from __future__ import annotations``.
    context: AppContext
class Settings:
    """Settings stand-in: every key yields another ``Settings`` and all are falsy."""

    def __getitem__(self, name):
        """Return a new ``Settings`` regardless of ``name``."""
        nested = Settings()
        return nested

    def __bool__(self):
        """Always evaluate as ``False``."""
        return False
class SpecTransformer(lark.Transformer):
    """Lark transformer that rewrites a parsed spec tree into a pattern string.

    Every callback receives the list of already-transformed children of the
    rule it is named after.
    """

    def name(self, args):
        """Concatenate the children into one string."""
        return "".join(args)

    def literal_name(self, args):
        """Strip the first child, quote it with ``utils.quote`` and pad it with spaces."""
        quoted = utils.quote(args[0].strip())
        return " " + quoted + " "

    def angled_name(self, args):
        """Return the first child unchanged."""
        return args[0]

    def optional_component(self, args):
        """Wrap the first child in square brackets."""
        return "[{}]".format(args[0])

    def phrase_component(self, args):
        """Return the first child unchanged."""
        return args[0]

    def component(self, args):
        """Return the first child unchanged."""
        return args[0]

    def spec(self, args):
        """Concatenate the children and strip surrounding whitespace."""
        joined = "".join(args)
        return joined.strip()

    def group(self, args):
        """Join the children with ``|`` inside parentheses."""
        alternatives = "|".join(args)
        return "(" + alternatives + ")"
def convert_spec(spec):
    """Parse ``spec`` with a small Lark grammar and rewrite it via ``SpecTransformer``.

    The grammar accepts sequences of components: optional components in
    square brackets, parenthesised ``|``-separated groups, ``<name>``
    references and literal words.

    Returns the transformed string with every ``""`` (empty quoted literal)
    removed. Parse errors raised by Lark propagate to the caller.
    """
    pattern = r"""
    ?start : spec
    spec : component+
    component : optional_component | phrase_component | group
    optional_component : "[" component "]"
    group : "(" component ("|" component)+ ")"
    phrase_component : angled_name | literal_name
    angled_name : "<" name ">"
    literal_name : /[\w ]+/
    name : /\w+/

    %import common.WS
    %ignore WS
    """
    # The parser is rebuilt on every call.
    parser = lark.Lark(pattern, debug=True, maybe_placeholders=True)
    tree = parser.parse(spec)
    # Joining is a no-op when the transformer already produced a single string.
    result = "".join(SpecTransformer().transform(tree))
    # Drop empty quoted literals left behind by the transformation.
    result = result.replace('""', "")
    # XXX
    # if result.startswith("[") and result.endswith("]"):
    #     raise NotImplementedError(result + ": Buggy translator")
    return result
def adapt_AppContext(title=None, executable=None):
    """Return a ``context.WindowContext`` for ``title``.

    ``executable`` is accepted but not used.
    """
    window_context = context.WindowContext(title=title)
    return window_context
def adapt_Dictation(text):
    """Return a ``Dictation`` extra whose name is ``text``."""
    return Dictation(name=text)
def convert_key_name(name):
    """Convert a ``modifiers-key`` string into a ``utils.KeyChord``.

    Everything before the last ``-`` is read as one modifier letter per
    character (``c``, ``a``, ``s``, ``w``); an unknown letter raises
    ``KeyError``. The part after it is the key, with ``pgup`` and ``pgdown``
    renamed and any other key passed through unchanged.
    """
    modifier_names = {"c": "ctrl_l", "a": "alt", "s": "shift", "w": "cmd"}
    key_aliases = {"pgup": "page_up", "pgdown": "page_down"}

    prefix, _separator, key = name.rpartition("-")

    chord_modifiers = []
    for letter in prefix:
        chord_modifiers.append(utils.KeyModifier(modifier_names[letter]))

    resolved_key = key_aliases[key] if key in key_aliases else key
    return utils.KeyChord(chord_modifiers, resolved_key)
def adapt_Key(text):
    """Translate a comma-separated key spec into an ``ActionSequence``.

    Each comma-separated entry has the form ``name`` or ``name/pause``. The
    name is converted with ``convert_key_name`` and wrapped in a
    ``KeyAction``; a pause, when present, adds a ``DelayAction`` after it.

    NOTE(review): repeat counts (``name:count``) are not parsed here and
    stay part of ``name`` -- confirm whether they need support.
    """
    actions = []
    for entry in text.split(","):
        name, _slash, delay = entry.strip().partition("/")
        # Tolerate spaces around the slash, e.g. "a / 50".
        actions.append(KeyAction(convert_key_name(name.strip())))
        if delay:
            # Dragonfly key specs express pauses in hundredths of a second,
            # while DelayAction hands its duration to trio.sleep (seconds).
            actions.append(DelayAction(float(delay) / 100))
    return ActionSequence(actions)
def adapt_Text(name):
    """Return an ``ActionSequence`` containing a single ``TextAction`` for ``name``."""
    text_action = TextAction(name)
    return ActionSequence([text_action])
def find_merge_rule_classes(module):
    """Return the classes in ``module`` that strictly subclass ``Placeholder``.

    Only values in ``module.__dict__`` are inspected; ``Placeholder`` itself
    is excluded.
    """
    return [
        candidate
        for candidate in module.__dict__.values()
        if isinstance(candidate, type)
        and issubclass(candidate, Placeholder)
        and candidate is not Placeholder
    ]
@attr.dataclass
class Patch:
    """Description of one attribute replacement: where, which name, and the new value."""

    module: types.ModuleType
    name: str
    new: object
@contextlib.contextmanager
def monkeypatch(module, name, new):
    """Temporarily replace ``module.<name>`` with ``new``.

    The previous value is put back when the ``with`` block ends, including
    when the block raises. ``AttributeError`` is raised on entry if
    ``module`` has no attribute ``name``.
    """
    original = getattr(module, name)
    setattr(module, name, new)
    try:
        yield
    finally:
        # Restore even if the body raised, so a failure cannot leave the
        # patched value in place.
        setattr(module, name, original)
@contextlib.contextmanager
def monkeypatch_each(patches):
    """Temporarily apply every patch in ``patches`` for the duration of a ``with`` block.

    Each patch must expose ``module``, ``name`` and ``new`` attributes.
    Patches are applied in order and undone in reverse order when the block
    ends, including when the block raises or a later patch fails to apply.

    Calling this without ``with`` applies nothing, exactly as before: the
    previous implementation only created context managers and never entered
    them.
    """
    applied = []
    try:
        for patch in patches:
            original = getattr(patch.module, patch.name)
            setattr(patch.module, patch.name, patch.new)
            applied.append((patch.module, patch.name, original))
        yield
    finally:
        # Undo newest-first so repeated patches of one attribute unwind correctly.
        for module, name, original in reversed(applied):
            setattr(module, name, original)
@log.log_call
def add_wrapper(module):
    """Attach a ``utils.Wrapper`` built from the rule classes found in ``module``.

    A module that already has a truthy ``wrapper`` attribute is returned
    untouched. Otherwise each class found by ``find_merge_rule_classes``
    contributes its ``mapping`` (spec -> action) and its ``extras`` to a new
    ``utils.Registry``, which is wrapped and stored as ``module.wrapper``.

    Returns the same ``module`` object that was passed in.
    """
    already_wrapped = getattr(module, "wrapper", False)
    if already_wrapped:
        return module

    rule_classes = find_merge_rule_classes(module)
    registry = utils.Registry()

    for rule in rule_classes:
        for spec, action in rule.mapping.items():
            try:
                pattern = convert_spec(spec)
            except (lark.exceptions.UnexpectedCharacters, NotImplementedError):
                # XXX Specs that cannot be converted get a filler literal.
                pattern = '"aklsdjfkldjfk"'

            # TODO Make this easier to read.
            # ``action=action`` freezes the current loop value in the handler.
            @registry.register(pattern)
            def _handler(data, action=action):
                return _run(action, data)

        for extra in rule.extras:
            registry.define(extra.make_definitions())

    # TODO use module.context
    module.wrapper = utils.Wrapper(registry, context=context.AlwaysContext())
    # XXX Avoid mutate-and-return.
    return module
def patch_all():
    """Install the import finder for ``module_mapping`` and register ``add_wrapper``.

    The finder is placed at the front of ``sys.meta_path`` and
    ``add_wrapper`` is appended to ``utils.MODULE_TRANSFORMERS``.
    """
    # XXX Should fix this to avoid having to actually import castervoice.
    # Currently sys.modules is used to find castervoice in order to find
    # its submodules.
    import castervoice  # noqa: F401  (imported for its side effect only)

    sys.meta_path.insert(0, patching.make_finder(module_mapping))

    # XXX Maybe replace this with an import hook.
    utils.MODULE_TRANSFORMERS.append(add_wrapper)
class AttributeHaver:
    """Object on which any missing attribute resolves to a new ``AttributeHaver``."""

    def __getattr__(self, name):
        """Return a fresh ``AttributeHaver`` regardless of ``name``."""
        stand_in = AttributeHaver()
        return stand_in
class Placeholder:
    """Empty base class; ``find_merge_rule_classes`` collects its subclasses."""
class VirtualModule:
    """Plain object whose attributes are the keyword arguments it was built with."""

    def __init__(self, **kwargs):
        """Store every keyword argument as an instance attribute."""
        vars(self).update(kwargs)
class VirtualPackage:
    """Plain object with a ``__path__`` attribute plus attributes taken from ``contents``."""

    def __init__(self, path, contents):
        """Set ``__path__`` to ``path``, then copy ``contents`` into the instance dict."""
        self.__path__ = path
        # Applied second, so a "__path__" key in ``contents`` overrides ``path``.
        vars(self).update(contents)
# Short aliases for the adapters defined above.
R = RegisteredAction
Text = adapt_Text
Key = adapt_Key

# Maps a dotted module name to the attributes this adapter provides for it;
# ``patch_all`` passes this table to ``patching.make_finder``.
module_mapping = {
    "dragonfly.__init__": {},
    "dragonfly": {
        "Grammar": Grammar,
        "Context": None,
        "AppContext": adapt_AppContext,
        "Dictation": adapt_Dictation,
        "Repeat": MyRepeat,
        "Function": _function,
        "Choice": Choice,
        "Mouse": None,
        "Pause": None,
    },
    # "castervoice.lib": {},
    "castervoice.lib.control": {},
    "castervoice.lib.utilities": {"simple_log": lambda: None},
    # "castervoice.apps": {},
    # "castervoice.apps.__init__": {},
    "castervoice.lib.settings": {"SETTINGS": Settings()},
    "castervoice.lib.actions": {"Key": Key, "Text": Text},
    "castervoice.lib.context": {"AppContext": adapt_AppContext},
    "castervoice.lib.dfplus.additions": {"IntegerRefST": IntegerRefST},
    "castervoice.lib.dfplus.merge": {"gfilter": None},
    "castervoice.lib.dfplus.merge.mergerule": {"MergeRule": Placeholder},
    "castervoice.lib.dfplus.state.short": {"R": R},
    "castervoice.lib.dfplus.merge.ccrmerger": {"CCRMerger": AttributeHaver()},
}