Coverage for src/ramses_cli/client.py: 44%

307 statements  

« prev     ^ index     » next       coverage.py v7.11.3, created at 2026-01-05 21:44 +0100

1#!/usr/bin/env python3 

2"""A CLI for the ramses_rf library.""" 

3 

4from __future__ import annotations 

5 

6import asyncio 

7import json 

8import logging 

9import sys 

10from collections.abc import Mapping 

11from typing import TYPE_CHECKING, Any, Final, Literal 

12 

13import click 

14from colorama import Fore, Style, init as colorama_init 

15 

16from ramses_rf import Gateway, GracefulExit, Message, exceptions as exc 

17from ramses_rf.const import DONT_CREATE_MESSAGES, SZ_ZONE_IDX 

18from ramses_rf.helpers import deep_merge 

19from ramses_rf.schemas import ( 

20 SCH_GLOBAL_CONFIG, 

21 SZ_CONFIG, 

22 SZ_DISABLE_DISCOVERY, 

23 SZ_ENABLE_EAVESDROP, 

24 SZ_REDUCE_PROCESSING, 

25) 

26from ramses_tx import is_valid_dev_id 

27from ramses_tx.logger import CONSOLE_COLS, DEFAULT_DATEFMT, DEFAULT_FMT 

28from ramses_tx.schemas import ( 

29 SZ_DISABLE_QOS, 

30 SZ_DISABLE_SENDING, 

31 SZ_ENFORCE_KNOWN_LIST, 

32 SZ_EVOFW_FLAG, 

33 SZ_FILE_NAME, 

34 SZ_KNOWN_LIST, 

35 SZ_PACKET_LOG, 

36 SZ_SERIAL_PORT, 

37) 

38 

39from .debug import SZ_DBG_MODE, start_debugging 

40from .discovery import GET_FAULTS, GET_SCHED, SET_SCHED, spawn_scripts 

41 

42from ramses_rf.const import ( # noqa: F401, isort: skip, pylint: disable=unused-import 

43 I_, 

44 RP, 

45 RQ, 

46 W_, 

47 DEV_TYPE_MAP, 

48 Code, 

49) 

50 

51if TYPE_CHECKING: 

52 from _typeshed import SupportsRead 

53 

54 

_PROFILE_LIBRARY = False  # NOTE: set to True to profile the library (see main())

if _PROFILE_LIBRARY:
    import cProfile
    import pstats


# Key under which the parse command stores the path of the packet log to read
SZ_INPUT_FILE: Final = "input_file"

# Defaults for the --show-xxx/--no-show-xxx flags of the top-level command group
SHOW_SCHEMA = False
SHOW_PARAMS = False
SHOW_STATUS = False
SHOW_KNOWNS = False
SHOW_TRAITS = False
SHOW_CRAZYS = False

PRINT_STATE = False  # print engine state
# GET_STATE = False  # get engine state
# SET_STATE = False  # set engine state

# NOTE: per the original remark, this must be called after the colour library is
# imported, so that its handlers wrap the correct streams
logging.basicConfig(
    level=logging.WARNING,
    format=DEFAULT_FMT,
    datefmt=DEFAULT_DATEFMT,
)


# The sub-commands, as returned by the click callbacks and consumed by async_main()
EXECUTE: Final = "execute"
LISTEN: Final = "listen"
MONITOR: Final = "monitor"
PARSE: Final = "parse"


# Console colour used for a message, keyed by its verb
COLORS = {
    I_: Fore.GREEN,
    RP: Fore.CYAN,
    RQ: Fore.CYAN,
    W_: Style.BRIGHT + Fore.MAGENTA,
}

CONTEXT_SETTINGS = {"help_option_names": ["-h", "--help"]}

# kwargs with these names are routed to the library (top-level / config section),
# all other kwargs stay with the CLI: see split_kwargs()
LIB_KEYS = (*SCH_GLOBAL_CONFIG({}).keys(), SZ_SERIAL_PORT)
LIB_CFG_KEYS = (*SCH_GLOBAL_CONFIG({})[SZ_CONFIG].keys(), SZ_EVOFW_FLAG)

97 

98 

def normalise_config(
    lib_config: dict[str, dict[str, str | bool | None]],
) -> tuple[str | None, dict[str, Any] | None]:
    """Convert a HA config dict into the client library's own format.

    The serial port entry (if any) is removed from lib_config and returned
    separately. A packet log given as a bare file name is expanded into a
    ``{file_name: ...}`` dict. Both changes are made in place: the dict that is
    returned is the very object that was passed in.
    """

    port = lib_config.pop(SZ_SERIAL_PORT, None)

    # fix for: https://github.com/ramses-rf/ramses_rf/issues/96
    log_cfg: str | Mapping[str, str | bool | None] | None = lib_config.get(
        SZ_PACKET_LOG
    )
    if isinstance(log_cfg, str):  # a bare file name, rather than a dict
        as_dict: dict[str, str | bool | None] = {SZ_FILE_NAME: log_cfg}
        lib_config[SZ_PACKET_LOG] = as_dict

    return port, lib_config  # type: ignore[return-value]

116 

117 

def split_kwargs(
    obj: tuple[dict[str, Any], dict[str, Any]], kwargs: dict[str, Any]
) -> tuple[dict[str, Any], dict[str, Any]]:
    """Split kwargs into cli/library kwargs.

    obj is a (cli_kwargs, lib_kwargs) pair; both dicts are updated in place and
    handed back. A kwarg named in LIB_KEYS goes to the top level of lib_kwargs,
    one named in LIB_CFG_KEYS goes to its config section, and anything else is
    treated as a CLI kwarg.
    """
    cli_kwargs, lib_kwargs = obj
    library_keys = LIB_KEYS + LIB_CFG_KEYS

    # three separate passes, in this order: a top-level key may replace the
    # config section before the config keys are written into it
    for key, value in kwargs.items():
        if key not in library_keys:
            cli_kwargs[key] = value

    for key, value in kwargs.items():
        if key in LIB_KEYS:
            lib_kwargs[key] = value

    config_section = lib_kwargs[SZ_CONFIG]
    for key, value in kwargs.items():
        if key in LIB_CFG_KEYS:
            config_section[key] = value

    return cli_kwargs, lib_kwargs

131 

132 

class DeviceIdParamType(click.ParamType):
    """A click parameter type that accepts only valid device ids."""

    name = "device_id"

    def convert(self, value: str, param: Any, ctx: click.Context | None) -> str:
        """Return the device id in upper case, or fail with a usage error."""
        if not is_valid_dev_id(value):
            self.fail(f"{value!r} is not a valid device_id", param, ctx)
        return value.upper()

140 

141 

142# Args/Params for both RF and file 

@click.group(context_settings=CONTEXT_SETTINGS)  # , invoke_without_command=True)
@click.option("-z", "--debug-mode", count=True, help="enable debugger")
@click.option("-c", "--config-file", type=click.File("r"))
@click.option("-rk", "--restore-schema", type=click.File("r"), help="from a HA store")
@click.option("-rs", "--restore-state", type=click.File("r"), help=" from a HA store")
@click.option("-r", "--reduce-processing", count=True, help="-rrr will give packets")
@click.option("-lf", "--long-format", is_flag=True, help="dont truncate STDOUT")
@click.option("-e/-ne", "--eavesdrop/--no-eavesdrop", default=None)
@click.option("-g", "--print-state", count=True, help="print state (g=schema, gg=all)")
# @click.option("--get-state/--no-get-state", default=GET_STATE, help="get the engine state")
# @click.option("--set-state/--no-set-state", default=SET_STATE, help="set the engine state")
@click.option(  # show_schema
    "-k/-nk",
    "--show-schema/--no-show-schema",
    default=SHOW_SCHEMA,
    help="display system schema",
)
@click.option(  # show_params
    "-p/-np",
    "--show-params/--no-show-params",
    default=SHOW_PARAMS,
    help="display system params",
)
@click.option(  # show_status
    "-s/-ns",
    "--show-status/--no-show-status",
    default=SHOW_STATUS,
    help="display system state",
)
@click.option(  # show_knowns
    "-n/-nn",
    "--show-knowns/--no-show-knowns",
    default=SHOW_KNOWNS,
    help="display known_list (of devices)",
)
@click.option(  # show_traits
    "-t/-nt",
    "--show-traits/--no-show-traits",
    default=SHOW_TRAITS,
    help="display device traits",
)
@click.option(  # show_crazys
    "-x/-nx",
    "--show-crazys/--no-show-crazys",
    default=SHOW_CRAZYS,
    help="display crazy things",
)
@click.pass_context
def cli(
    ctx: click.Context,
    /,
    config_file: SupportsRead[str | bytes] | None = None,
    eavesdrop: None | bool = None,
    **kwargs: Any,
) -> None:
    """A CLI for the ramses_rf library."""
    # NOTE: the docstring above is what click shows as the help text

    debug_level = kwargs[SZ_DBG_MODE]
    if debug_level > 0:  # Do first
        start_debugging(debug_level == 1)

    # start from empty cli/library kwargs; the sub-commands add their own later
    cli_kwargs, lib_kwargs = split_kwargs(({}, {SZ_CONFIG: {}}), kwargs)

    if eavesdrop is not None:  # i.e. the flag was given explicitly
        lib_kwargs[SZ_CONFIG][SZ_ENABLE_EAVESDROP] = eavesdrop

    if config_file:  # TODO: validate file with voluptuous, use YAML
        file_config = json.load(config_file)
        lib_kwargs = deep_merge(lib_kwargs, file_config)  # CLI takes precedence

    # made available to the sub-commands via @click.pass_obj
    ctx.obj = cli_kwargs, lib_kwargs

214 

215 

216# Args/Params for packet log only 

class FileCommand(click.Command):  # client.py parse <file>
    """A click command that takes the packet log to read as its first parameter."""

    def __init__(self, *args: Any, **kwargs: Any) -> None:
        super().__init__(*args, **kwargs)

        input_file = click.Argument(("input-file",))  # input_file name/path only
        self.params.insert(0, input_file)

        # NOTE: a --packet-log option (as per PortCommand) would be useful here only
        # for test/dev, so it is not provided

231 

232 

233# Args/Params for RF packets only 

class PortCommand(
    click.Command
):  # client.py <command> <port> --packet-log xxx --evofw3-flag xxx
    """A click command that takes a serial port, plus the options used with RF."""

    def __init__(self, *args: Any, **kwargs: Any) -> None:
        super().__init__(*args, **kwargs)

        # (index, parameter) pairs, inserted in this order
        # NOTE: --discover/--no-discover is declared on the individual commands
        extra_params = (
            (0, click.Argument(("serial-port",))),
            (
                2,
                click.Option(  # --packet-log
                    ("-o", "--packet-log"),
                    type=click.Path(),
                    help="Log all packets to this file",
                ),
            ),
            (
                3,
                click.Option(  # --evofw-flag
                    ("-T", "--evofw-flag"),
                    type=click.STRING,
                    help="Pass this traceflag to evofw",
                ),
            ),
        )
        for index, param in extra_params:
            self.params.insert(index, param)

266 

267 

268# 

269# 1/4: PARSE (a file, +/- eavesdrop) 

@click.command(cls=FileCommand)  # parse a packet log file, then stop
@click.pass_obj
def parse(
    obj: Any, /, **kwargs: Any
) -> tuple[Literal["parse"], dict[str, str], dict[str, str]]:
    """Command to parse a log file containing messages/packets."""
    # NOTE: the docstring above is what click shows as the help text
    cli_config, lib_config = split_kwargs(obj, kwargs)

    # the input file is a CLI kwarg, but it is the library that has to read it
    input_file = cli_config.pop(SZ_INPUT_FILE)  # just the file path
    lib_config[SZ_INPUT_FILE] = input_file

    return PARSE, lib_config, cli_config

281 

282 

283# 

284# 2/4: MONITOR (listen to RF, +/- discovery, +/- eavesdrop) 

@click.command(cls=PortCommand)  # (optionally) execute a command/script, then monitor
@click.option("-d/-nd", "--discover/--no-discover", default=None)  # --no-discover
@click.option(  # --exec-cmd 'RQ 01:123456 1F09 00'
    "-x", "--exec-cmd", type=click.STRING, help="e.g. 'RQ 01:123456 1F09 00'"
)
@click.option(  # --execute-scr script device_id
    "-X",
    "--exec-scr",
    type=(str, DeviceIdParamType()),
    help="scan_disc|scan_full|scan_hard|bind device_id",
)
@click.option(  # --poll-devices device_id, device_id,...
    "--poll-devices", type=click.STRING, help="e.g. 'device_id, device_id, ...'"
)
@click.pass_obj
def monitor(
    obj: Any, /, discover: None | bool = None, **kwargs: Any
) -> tuple[Literal["monitor"], dict[str, str], dict[str, str]]:
    """Monitor (eavesdrop and/or probe) a serial port for messages/packets."""
    # NOTE: the docstring above is what click shows as the help text
    config, lib_config = split_kwargs(obj, kwargs)

    if discover is None:
        # no explicit flag: discover, unless a script or polling was requested
        discover = kwargs["exec_scr"] is None and kwargs["poll_devices"] is None

    # An explicit --discover/--no-discover is honoured too: previously it was
    # consumed here but never written to the library config, so it had no effect
    if discover:
        print(" - discovery is enabled")
    else:
        print(" - discovery is disabled")
    lib_config[SZ_CONFIG][SZ_DISABLE_DISCOVERY] = not discover

    return MONITOR, lib_config, config

315 

316 

317# 

318# 3/4: EXECUTE (send cmds to RF, +/- discovery, +/- eavesdrop) 

@click.command(cls=PortCommand)  # execute a (complex) script, then stop
@click.option("-d/-nd", "--discover/--no-discover", default=None)  # --no-discover
@click.option(  # --exec-cmd 'RQ 01:123456 1F09 00'
    "-x", "--exec-cmd", type=click.STRING, help="e.g. 'RQ 01:123456 1F09 00'"
)
@click.option(  # --get-faults ctl_id
    "--get-faults", type=DeviceIdParamType(), help="controller_id"
)
@click.option(  # --get-schedule ctl_id zone_idx|HW
    "--get-schedule",
    default=[None, None],
    type=(DeviceIdParamType(), str),
    help="controller_id, zone_idx (e.g. '0A', 'HW')",
)
@click.option(  # --set-schedule ctl_id zone_idx|HW
    "--set-schedule",
    default=[None, None],
    type=(DeviceIdParamType(), click.File("r")),
    help="controller_id, filename.json",
)
@click.pass_obj
def execute(
    obj: Any, /, **kwargs: Any
) -> tuple[Literal["execute"], dict[str | None, str | dict[str, Any]], dict[str, str]]:
    """Execute any specified scripts, return the results, then quit.

    Disables discovery, and enforces a strict allow_list.
    """
    # NOTE: the docstring above is what click shows as the help text
    cli_config, lib_config = split_kwargs(obj, kwargs)

    print(" - discovery is force-disabled")
    engine_config = lib_config[SZ_CONFIG]
    engine_config[SZ_DISABLE_DISCOVERY] = True
    engine_config[SZ_DISABLE_QOS] = False

    # the controller of the first script requested, in this order of precedence
    ctl_id = kwargs[GET_FAULTS] or kwargs[GET_SCHED][0] or kwargs[SET_SCHED][0]

    if ctl_id:
        known_list: dict[str | None, dict[str, Any]] = {ctl_id: {}}

        print(" - known list is force-configured/enforced")
        lib_config[SZ_KNOWN_LIST] = known_list
        lib_config[SZ_CONFIG][SZ_ENFORCE_KNOWN_LIST] = True

    return EXECUTE, lib_config, cli_config  # type: ignore[return-value]

367 

368 

369# 

370# 4/4: LISTEN (to RF, +/- eavesdrop - NO sending/discovery) 

@click.command(cls=PortCommand)  # (optionally) execute a command, then listen
@click.pass_obj
def listen(
    obj: Any, /, **kwargs: Any
) -> tuple[
    Literal["listen"], dict[str, str | dict[str, str | None] | None], dict[str, Any]
]:
    """Listen to (eavesdrop only) a serial port for messages/packets."""
    # NOTE: the docstring above is what click shows as the help text
    cli_config, lib_config = split_kwargs(obj, kwargs)

    print(" - sending is force-disabled")
    lib_config[SZ_CONFIG].update({SZ_DISABLE_SENDING: True})

    return LISTEN, lib_config, cli_config

385 

386 

def print_results(gwy: Gateway, **kwargs: Any) -> None:
    """Print the outcome of the scripts requested via the execute command.

    kwargs are the CLI kwargs: the fault log is printed if GET_FAULTS holds a
    controller id, and a schedule is printed (as JSON) if GET_SCHED holds a
    (controller id, zone idx) pair. Nothing is printed for SET_SCHED.
    """
    if kwargs[GET_FAULTS]:
        fault_log = gwy.system_by_id[kwargs[GET_FAULTS]]._faultlog.faultlog

        if fault_log:
            for idx, entry in fault_log.items():
                print(f"{idx:02X}", entry)
        else:
            print("No fault log, or failed to get the fault log.")

    if kwargs[GET_SCHED][0]:
        system_id, zone_idx = kwargs[GET_SCHED]
        if zone_idx == "HW":
            zone = gwy.system_by_id[system_id].dhw
        else:
            zone = gwy.system_by_id[system_id].zone_by_idx[zone_idx]  # type: ignore[assignment]

        # a missing zone/DHW is reported like any other failure, rather than
        # relying on an assert (which is stripped when running with -O)
        schedule = zone.schedule if zone else None

        if schedule is None:
            print("Failed to get the schedule.")
        else:
            result = {SZ_ZONE_IDX: zone_idx, "schedule": schedule}
            print(">>> Schedule JSON begins <<<")
            print(json.dumps(result, indent=4))
            print(">>> Schedule JSON ended <<<")

    if kwargs[SET_SCHED][0]:
        # TODO: report the outcome of setting the schedule (nothing is printed yet)
        system_id, _ = kwargs[SET_SCHED]  # was read from GET_SCHED (copy/paste slip)

415 

416 

def _save_state(gwy: Gateway) -> None:
    """Write the gateway's state to two files in the current working directory.

    The packets go to state_msgs.log (one "dtm pkt" line each, CRLF-terminated)
    and the schema goes to state_schema.json.
    """
    schema, msgs = gwy.get_state()

    # newline="" stops text mode from translating the "\n" of each explicit
    # "\r\n" a second time (which would give "\r\r\n" on Windows)
    with open("state_msgs.log", "w", encoding="utf-8", newline="") as f:
        f.writelines(f"{dtm} {pkt}\r\n" for dtm, pkt in msgs.items())  # if not m._expired

    with open("state_schema.json", "w", encoding="utf-8") as f:
        f.write(json.dumps(schema, indent=4))

425 

426 

def _print_engine_state(gwy: Gateway, **kwargs: Any) -> None:
    """Print the engine state, as far as kwargs["print_state"] asks for.

    A level above 0 prints the schema, a level above 1 prints the packets too
    (the state is fetched with include_expired=True).
    """
    schema, packets = gwy.get_state(include_expired=True)
    level = kwargs["print_state"]

    # (level that must be exceeded, label, what to dump)
    sections = ((0, "schema", schema), (1, "packets", packets))

    for threshold, label, content in sections:
        if level > threshold:
            print(f"{label}: {json.dumps(content, indent=4)}\r\n")

434 

435 

def print_summary(gwy: Gateway, **kwargs: Any) -> None:
    """Print the summaries selected by the show_xxx flags found in kwargs.

    The schema/params/status are those of the gateway's TCS, or of the gateway
    itself if it has no TCS; params and status are also printed per device.
    """
    entity = gwy.tcs or gwy

    def as_json(obj: Any) -> str:
        return json.dumps(obj, indent=4)

    if kwargs.get("show_schema"):
        print(f"Schema[{entity}] = {as_json(entity.schema)}\r\n")

        # schema = {d.id: d.schema for d in sorted(gwy.devices)}
        # print(f"Schema[devices] = {json.dumps({'schema': schema}, indent=4)}\r\n")

    if kwargs.get("show_params"):
        print(f"Params[{entity}] = {as_json(entity.params)}\r\n")

        params = {d.id: d.params for d in sorted(gwy.devices)}
        print(f"Params[devices] = {as_json({'params': params})}\r\n")

    if kwargs.get("show_status"):
        print(f"Status[{entity}] = {as_json(entity.status)}\r\n")

        status = {d.id: d.status for d in sorted(gwy.devices)}
        print(f"Status[devices] = {as_json({'status': status})}\r\n")

    if kwargs.get("show_knowns"):  # show device hints (show-knowns)
        print(f"allow_list (hints) = {as_json(gwy._include)}\r\n")

    if kwargs.get("show_traits"):  # show device traits
        traits = {
            d.id: d.traits  # {k: v for k, v in d.traits.items() if k[:1] == "_"}
            for d in sorted(gwy.devices)
        }
        print(as_json(traits), "\r\n")

    if not kwargs.get("show_crazys"):
        return

    # controllers: only the packets with these two codes
    wanted_codes = (Code._0005, Code._000C)
    controllers = [d for d in gwy.devices if d.type == DEV_TYPE_MAP.CTL]
    for device in controllers:
        if gwy.msg_db:
            for wanted in wanted_codes:
                for msg in gwy.msg_db.get(device=device.id, code=wanted):
                    print(f"{msg._pkt}")
        else:  # TODO(eb): replace next block by
            # raise NotImplementedError
            for code, verbs in device._msgz.items():
                if code not in wanted_codes:
                    continue
                for pkts in verbs.values():
                    for pkt in pkts.values():
                        print(f"{pkt}")
        print()

    # UFH controllers: all of their packets
    ufh_controllers = [d for d in gwy.devices if d.type == DEV_TYPE_MAP.UFC]
    for device in ufh_controllers:
        if gwy.msg_db:
            for msg in gwy.msg_db.get(device=device.id):
                print(f"{msg._pkt}")
        else:  # TODO(eb): Q1 2026 replace next legacy block by
            # raise NotImplementedError
            for verbs in device._msgz.values():
                for pkts in verbs.values():
                    for pkt in pkts.values():
                        print(f"{pkt}")
        print()

493 

494 

async def async_main(command: str, lib_kwargs: dict[str, Any], **kwargs: Any) -> None:
    """Run a gateway for the given command, then print whatever was asked for.

    command is one of EXECUTE, LISTEN, MONITOR or PARSE; lib_kwargs configure the
    gateway (after normalise_config()) and kwargs are the CLI's own settings.
    """

    def handle_msg(_msg: Message) -> None:
        """Process the message as it arrives (a callback).

        In this case, the message is merely printed.
        """

        if kwargs["long_format"]:  # HACK for test/dev
            timestamp = _msg.dtm.isoformat(timespec="microseconds")
            print(
                f"{timestamp} ... {_msg!r}"
                f" # {_msg.payload}"  # or f' # ("{msg.src!r}", "{msg.dst!r}")'
            )
            return

        dtm = f"{_msg.dtm:%H:%M:%S.%f}"[:-3]  # i.e. to the millisecond

        # pick the colour codes first: the line itself is always "<dtm> <msg>"
        if _msg.code == Code._PUZZ:
            colour = f"{Style.BRIGHT}{Fore.YELLOW}"
        elif _msg.src and _msg.src.type == DEV_TYPE_MAP.HGI:
            colour = f"{Style.BRIGHT}{COLORS.get(_msg.verb)}"
        elif (_msg.code == Code._1F09 and _msg.verb == I_) or (
            _msg.code in (Code._000A, Code._2309, Code._30C9) and _msg._has_array
        ):
            colour = f"{Fore.YELLOW}"
        else:
            colour = f"{COLORS.get(_msg.verb)}"

        print(f"{colour}{dtm} {_msg}"[:CONSOLE_COLS])  # truncated to the console

    serial_port, lib_kwargs = normalise_config(lib_kwargs)  # type: ignore[assignment]

    if kwargs["restore_schema"]:
        print(" - restoring client schema from a HA cache...")
        cached: dict[str, Any] = json.load(kwargs["restore_schema"])["data"][
            "client_state"
        ]
        lib_kwargs = lib_kwargs | cached["schema"]

    # if serial_port == "/dev/ttyMOCK":
    #     from tests.deprecated.mocked_rf import MockGateway  # FIXME: for test/dev
    #     gwy = MockGateway(serial_port, **lib_kwargs)
    # else:
    gwy = Gateway(serial_port, **lib_kwargs)  # passes action to gateway

    if int(lib_kwargs[SZ_CONFIG][SZ_REDUCE_PROCESSING]) < DONT_CREATE_MESSAGES:
        # library will not send MSGs to STDOUT, so we'll send PKTs instead
        colorama_init(autoreset=True)  # WIP: remove strip=True
        gwy.add_msg_handler(handle_msg)

    if kwargs["restore_state"]:
        print(" - restoring packets from a HA cache...")
        cached = json.load(kwargs["restore_state"])["data"]["client_state"]
        await gwy._restore_cached_packets(cached["packets"])

    print("\r\nclient.py: Starting engine...")

    try:  # main code here
        await gwy.start()

        # TODO:
        # python client.py -rrr listen /dev/ttyUSB0
        # cat *.log | head | python client.py parse

        if command == EXECUTE:
            tasks = spawn_scripts(gwy, **kwargs)
            await asyncio.gather(*tasks)

        elif command in (MONITOR, LISTEN, PARSE):
            if command == MONITOR:
                _ = spawn_scripts(gwy, **kwargs)

            # NOTE(review): wait_for() raises a TimeoutError if the wait exceeds
            # 1.0s, which is not caught below - confirm that this is intended
            await asyncio.wait_for(gwy._protocol._wait_connection_lost, 1.0)  # type: ignore[arg-type]

    except asyncio.CancelledError:
        reason = "ended via: CancelledError (e.g. SIGINT)"
    except GracefulExit:
        reason = "ended via: GracefulExit"
    except KeyboardInterrupt:  # FIXME: why isn't this captured here? see main
        reason = "ended via: KeyboardInterrupt"
    except exc.RamsesException as err:
        reason = f"ended via: RamsesException: {err}"
    else:  # if no Exceptions raised, e.g. EOF when parsing, or Ctrl-C?
        reason = "ended without error (e.g. EOF)"
    finally:
        await gwy.stop()  # what happens if we have an exception here?

    print(f"\r\nclient.py: Engine stopped: {reason}")

    # if kwargs["save_state"]:
    #     _save_state(gwy)

    if kwargs["print_state"]:
        _print_engine_state(gwy, **kwargs)

    elif command == EXECUTE:
        print_results(gwy, **kwargs)

    print_summary(gwy, **kwargs)

595 

596 

# Register the four sub-commands with the top-level command group
cli.add_command(parse)
cli.add_command(monitor)
cli.add_command(execute)
cli.add_command(listen)


def main() -> None:
    """Parse the command line, then run the requested command until it ends.

    The process exits early if click returns an exit code rather than a
    (command, lib_kwargs, kwargs) tuple, or with -1 on an unknown option.
    """
    print("\r\nclient.py: Starting ramses_rf...")

    try:
        # standalone_mode=False: click returns the sub-command's result (or an
        # exit code, e.g. after --help) instead of calling sys.exit() itself
        result = cli(standalone_mode=False)
    except click.NoSuchOption as err:
        print(f"Error: {err}")
        sys.exit(-1)

    if isinstance(result, int):
        sys.exit(result)

    (command, lib_kwargs, kwargs) = result

    if sys.platform == "win32":
        print(" - event_loop_policy set for win32")  # do before asyncio.run()
        asyncio.set_event_loop_policy(asyncio.WindowsSelectorEventLoopPolicy())

    profile = None
    try:
        if _PROFILE_LIBRARY:
            profile = cProfile.Profile()
            # runcall() rather than run("..."): the code string named main()
            # instead of async_main(), and a string is executed in the namespace
            # of __main__, where these local variables do not exist
            profile.runcall(asyncio.run, async_main(command, lib_kwargs, **kwargs))
        else:
            asyncio.run(async_main(command, lib_kwargs, **kwargs))
    except KeyboardInterrupt:  # , SystemExit):
        print("\r\nclient.py: Engine stopped: ended via: KeyboardInterrupt")

    if profile is not None:  # i.e. _PROFILE_LIBRARY is set
        ps = pstats.Stats(profile)
        ps.sort_stats(pstats.SortKey.TIME).print_stats(20)

    print(" - finished ramses_rf.\r\n")


if __name__ == "__main__":
    main()