Coverage for src/ramses_tx/frame.py: 19%

216 statements  

« prev     ^ index     » next       coverage.py v7.11.3, created at 2026-01-05 21:46 +0100

1#!/usr/bin/env python3 

2"""RAMSES RF - a RAMSES-II protocol decoder & analyser. 

3 

4Provide the base class for commands (constructed/sent packets) and packets. 

5""" 

6 

7from __future__ import annotations 

8 

9import logging 

10from typing import TYPE_CHECKING 

11 

12from . import exceptions as exc 

13from .address import ALL_DEV_ADDR, NON_DEV_ADDR, Address, pkt_addrs 

14from .const import COMMAND_REGEX, DEV_ROLE_MAP, DEV_TYPE_MAP 

15from .ramses import ( 

16 CODE_IDX_ARE_COMPLEX, 

17 CODE_IDX_ARE_NONE, 

18 CODE_IDX_ARE_SIMPLE, 

19 CODE_IDX_DOMAIN, 

20 CODES_ONLY_FROM_CTL, 

21 CODES_SCHEMA, 

22 CODES_WITH_ARRAYS, 

23 RQ_NO_PAYLOAD, 

24) 

25 

26# TODO: add _has_idx (as func return only one type, or raise) 

27 

28from .const import ( # noqa: F401, isort: skip, pylint: disable=unused-import 

29 I_, 

30 RP, 

31 RQ, 

32 W_, 

33 Code, 

34) 

35from .const import ( # noqa: F401, isort: skip, pylint: disable=unused-import 

36 F8, 

37 F9, 

38 FA, 

39 FC, 

40 FF, 

41) 

42 

43if TYPE_CHECKING: 

44 from .const import VerbT 

45 

46 

# Module-level logger, named after this module.
_LOGGER = logging.getLogger(__name__)


# Type aliases - all three are plain `str` at runtime.
HeaderT = str  # a packet header, as returned by pkt_header() and Frame._hdr
PayloadT = str  # a frame's payload field (see Frame.payload)
_PktIdxT = str  # NOTE(review): not referenced in the visible code - presumably a payload index

53 

54 

class Frame:
    """The Frame class - used as a base by the Command and Packet classes.

    A frame is a single space-separated string of 8 fields: verb, seqn, three
    addresses, code, payload length and payload, for example:

    `RQ --- 01:078710 10:067219 --:------ 3220 005 0000050000`

    Several derived values (`_ctx`, `_hdr`, `_idx`, `_has_array`, `_has_ctl`,
    `_has_payload`) are computed lazily and cached in a same-named attribute with
    a trailing underscore, where None means 'not yet computed'.

    NOTE: this class defines __eq__() without __hash__(), so (unless a subclass
    provides one) its instances are unhashable.
    """

    src: Address  # Address | Device
    dst: Address  # Address | Device
    _addrs: tuple[Address, Address, Address]

    def __init__(self, frame: str) -> None:
        """Create a frame from a string.

        :param frame: the frame, as a string of space-separated fields
        :raises exc.PacketInvalid: if the string does not match COMMAND_REGEX, has
            an invalid address set, or its payload is not the declared length.
        """

        self._frame: str = frame
        if not COMMAND_REGEX.match(self._frame):
            raise exc.PacketInvalid(f"Bad frame: invalid structure: >>>{frame}<<<")

        # a verb may have a leading space (frame[:2]), so strip before splitting
        fields = frame.lstrip().split(" ")

        self.verb: VerbT = frame[:2]  # type: ignore[assignment]
        self.seqn: str = fields[1]  # . frame[3:6]
        self.code: Code = fields[5]  # type: ignore[assignment]
        self.len_: str = fields[6]  # . frame[42:45]  FIXME: len_, _len & len(payload)/2
        self.payload: PayloadT = fields[7]  # frame[46:].split(" ")[0]
        self._len: int = len(self.payload) // 2  # two hex characters per byte

        try:
            self.src, self.dst, *self._addrs = pkt_addrs(  # type: ignore[assignment]
                " ".join(fields[2:5])  # frame[7:36]
            )
        except exc.PacketInvalid as err:  # will be: InvalidAddrSetError
            raise exc.PacketInvalid("Bad frame: invalid address set") from err

        if len(self.payload) != int(self.len_) * 2:
            raise exc.PacketInvalid(
                f"Bad frame: invalid payload: "
                f"len({self.payload}) is not int('{self.len_}') * 2"
            )

        # lazily-evaluated caches: None means 'not yet computed'
        self._ctx_: bool | str = None  # type: ignore[assignment]
        self._hdr_: str = None  # type: ignore[assignment]
        self._idx_: bool | str = None  # type: ignore[assignment]

        self._has_array_: bool = None  # type: ignore[assignment]
        self._has_ctl_: bool = None  # type: ignore[assignment]  # TODO: remove
        self._has_payload_: bool = None  # type: ignore[assignment]

        self._repr: str = None  # type: ignore[assignment]

    # FIXME: this is messy
    def _validate(self, *, strict_checking: bool = False) -> None:
        """Validate the frame: it may be a cmd or a (response) pkt.

        The payload length and the address set are always checked. With
        strict_checking, the combination of verb and addresses is checked too.

        :param strict_checking: also check the verb against the address set
        :raises exc.PacketInvalid: if the frame is not valid.
        """

        if len(self._frame[46:].split(" ")[0]) != int(self._frame[42:45]) * 2:
            raise exc.PacketInvalid("Bad frame: Payload length mismatch")

        try:
            # self.src, self.dst, *self._addrs = pkt_addrs(self._frame[7:36])
            src, dst, *addrs = pkt_addrs(self._frame[7:36])
        except exc.PacketInvalid as err:  # will be: InvalidAddrSetError
            raise exc.PacketInvalid("Bad frame: Invalid address set") from err

        if not strict_checking:
            return

        # Strict checking: helps users avoid constructing bad commands. Explicit
        # tests are used (not assert statements) so that the checks are still
        # performed when Python is run with optimisations enabled (-O).
        reason: str | None = None

        if addrs[0] == NON_DEV_ADDR:
            if self.verb != I_:
                reason = "wrong verb or dst addr should be present"
        elif addrs[2] == NON_DEV_ADDR:
            if self.verb != I_ and src is dst:
                reason = "wrong verb or dst addr should not be src"
        elif addrs[0] is addrs[2]:
            if self.verb != I_:
                reason = "wrong verb or dst addr should not be src"
        elif self.verb not in (I_, W_):
            reason = "wrong verb or dst addr should be src"

        if reason is not None:
            raise exc.PacketInvalid(f"Bad frame: Invalid address set: {reason}")

    def __repr__(self) -> str:
        """Return an unambiguous string representation of this object.

        The string is built once, from the frame's fields, and then cached.
        """

        if self._repr is None:
            self._repr = " ".join(  # type: ignore[unreachable]
                (
                    self.verb,
                    self.seqn,
                    *(repr(a) for a in self._addrs),
                    self.code,
                    self.len_,
                    self.payload,
                )
            )
        return self._repr

    def __str__(self) -> str:
        """Return a brief readable string representation of this object.

        This is the repr, followed by the header (or by the error, if the header
        could not be determined because of an AttributeError).
        """
        # repr(self) == repr(cls(str(self)))

        try:
            return f"{self!r} # {self._hdr}"  # code|ver|device_id|context
        except AttributeError as err:
            return f"{self!r} < {err}"

    def __eq__(self, other: object) -> bool:
        """Return True if both frames are the same from their 5th character on.

        Objects without a _frame attribute are not comparable (NotImplemented).
        """

        if not hasattr(other, "_frame"):
            return NotImplemented
        return self._frame[4:] == other._frame[4:]  # type: ignore[no-any-return]

    @property
    def _has_array(self) -> None | bool:  # TODO: a mess - has false negatives
        """Return True if the payload is an array, False otherwise.

        May return false negatives (e.g. arrays of length 1), and None if undetermined.

        An example of a false negative is evohome with only one zone (i.e. the periodic
        2309/30C9/000A packets).

        :raises AssertionError: if the payload appears to be an array, but its
            length or its addresses are inconsistent with that (pkt_header()
            catches this exception, so these are deliberately left as asserts).
        """

        if self._has_array_ is not None:  # HACK: overridden by detect_array(msg, prev)
            return self._has_array_

        # False -ves (array length is 1) are an acceptable compromise to extensive checking

        # .W --- 01:145038 34:092243 --:------ 1FC9 006 07230906368E
        # .I --- 01:145038 --:------ 01:145038 1FC9 018 07000806368E-FC3B0006368E-071FC906368E
        # .I --- 01:145038 --:------ 01:145038 1FC9 018 FA000806368E-FC3B0006368E-FA1FC906368E
        # .I --- 34:092243 --:------ 34:092243 1FC9 030 0030C9896853-002309896853-001060896853-0010E0896853-001FC9896853
        if self.code == Code._1FC9:  # type: ignore[unreachable]
            self._has_array_ = self.verb != RQ  # safe to treat all as array, even len=1
            return self._has_array_  # don't do any checks for 1FC9 (they will fail)

        elif self.verb != I_ or self.code not in CODES_WITH_ARRAYS:
            self._has_array_ = False

        elif self._len != CODES_WITH_ARRAYS[self.code][0]:  # NOTE: can be false -ves
            # an array if the length is a whole (non-zero) multiple of the element length
            a, b = divmod(self._len, CODES_WITH_ARRAYS[self.code][0])
            self._has_array_ = a > 0 and b == 0

        elif (
            self.code in (Code._22C9, Code._3150)
            and self.src.type == DEV_TYPE_MAP.UFC
            and self.dst is self.src
            and self.payload[:1] != "F"
        ):
            self._has_array_ = True

        # elif self.code == Code._000C:  # anachronism: variable array length
        #     return

        else:
            self._has_array_ = False

        if self._has_array_:
            len_ = CODES_WITH_ARRAYS[self.code][0]

            assert self._len % len_ == 0, (
                f"{self} < array has length ({self._len}) that is not multiple of {len_}"
            )
            assert (
                self.src.type in (DEV_TYPE_MAP.DTS, DEV_TYPE_MAP.DT2)
                or self.src == self.dst  # DEX
            ), f"{self} < array is from a non-controller (01)"
            assert (
                self.src.type not in (DEV_TYPE_MAP.DTS, DEV_TYPE_MAP.DT2)
                or self.dst.id == NON_DEV_ADDR.id  # DEX
            ), f"{self} < array is from a non-controller (02)"

        # .I --- 10:040239 01:223036 --:------ 0009 003 000000        # not array
        # .I --- 01:102458 --:------ 01:102458 0009 006 FC01FF-F901FF
        # .I --- 01:145038 --:------ 01:145038 0009 006 FC00FF-F900FF
        # .I 034 --:------ --:------ 12:126457 2309 006 017EFF-027EFF
        # .I --- 01:223036 --:------ 01:223036 000A 012 081001F40DAC-091001F40DAC  # 2nd fragment
        # .I 024 --:------ --:------ 12:126457 000A 012 010001F40BB8-020001F40BB8
        # .I --- 02:044328 --:------ 02:044328 22C9 018 0001F40A2801-0101F40A2801-0201F40A2801
        # .I --- 23:100224 --:------ 23:100224 2249 007 007EFF7EFFFFFF  # can have 2 zones
        # .I --- 02:044328 --:------ 02:044328 22C9 018 0001F40A2801-0101F40A2801-0201F40A2801
        # .I --- 02:001107 --:------ 02:001107 3150 010 007A-017A-027A-036A-046A

        return self._has_array_

    @property
    def _has_ctl(self) -> None | bool:
        """Return True if the packet is to/from a controller.

        The result is cached; if no rule matches, it is taken to be False.
        """

        # NB: the difference between these (_has_ctl, src, dst, -:-) and above (_has_ctl, src, -:-, src)
        # 2000-01-01T03:00:00.000000 ...  I --- 37:123456 --:------ 37:123456 31DA 030 00C8400518646427102AF82EE031FFFFFFC800C8C83FFF64640AFF0AFF00
        # 2022-11-20T08:32:06.904058 063 RP --- 32:134446 37:171685 --:------ 31DA 030 00EF007FFF2F1B0226069A07EEFFE4F8000038988F0000EFEF1F2420FC00
        # Maybe only use this for CH/DHW, and not HVAC?

        if self._has_ctl_ is not None:
            return self._has_ctl_

        # TODO: handle RQ/RP to/from HGI/RFG, handle HVAC

        if {self.src.type, self.dst.type} & {  # type: ignore[unreachable]
            DEV_TYPE_MAP.CTL,
            DEV_TYPE_MAP.UFC,
            DEV_TYPE_MAP.PRG,
        }:  # DEX
            _LOGGER.debug(f"{self} # HAS controller (10)")
            self._has_ctl_ = True

        # .I --- 12:010740 --:------ 12:010740 30C9 003 0008D9 # not ctl
        elif self.dst is self.src:  # (not needed?) & self.code == I_:
            _LOGGER.debug(
                f"{self} < "
                + (
                    "HAS"
                    if self.code in CODES_ONLY_FROM_CTL + (Code._31D9, Code._31DA)
                    else "no"
                )
                + " controller (20)"
            )
            self._has_ctl_ = any(
                (
                    self.code == Code._3B00 and self.payload[:2] == FC,
                    self.code in CODES_ONLY_FROM_CTL + (Code._31D9, Code._31DA),
                )
            )

        # .I --- --:------ --:------ 10:050360 1FD4 003 002ABE # no ctl
        # .I 095 --:------ --:------ 12:126457 1F09 003 000BC2 # HAS ctl
        # .I --- --:------ --:------ 20:001473 31D9 003 000001 # ctl? (HVAC)
        elif self.dst.id == NON_DEV_ADDR.id:
            _LOGGER.debug(f"{self} # HAS controller (21)")
            self._has_ctl_ = self.src.type != DEV_TYPE_MAP.OTB  # DEX

        # .I --- 10:037879 --:------ 12:228610 3150 002 0000   # HAS ctl
        # .I --- 04:029390 --:------ 12:126457 1060 003 01FF01 # HAS ctl
        elif self.dst.type in (DEV_TYPE_MAP.DTS, DEV_TYPE_MAP.DT2):  # DEX
            _LOGGER.debug(f"{self} # HAS controller (22)")
            self._has_ctl_ = True

        # RQ --- 30:258720 10:050360 --:------ 3EF0 001 00           # UNKNOWN (99)
        # RP --- 10:050360 30:258720 --:------ 3EF0 006 000011010A1C # UNKNOWN (99)

        # RQ --- 18:006402 13:049798 --:------ 1FC9 001 00
        # RP --- 13:049798 18:006402 --:------ 1FC9 006 003EF034C286
        # RQ --- 30:258720 10:050360 --:------ 22D9 001 00
        # RP --- 10:050360 30:258720 --:------ 22D9 003 0003E8
        # RQ --- 30:258720 10:050360 --:------ 3220 005 0000120000
        # RP --- 10:050360 30:258720 --:------ 3220 005 0040120166
        # RQ --- 30:258720 10:050360 --:------ 3EF0 001 00
        # RP --- 10:050360 30:258720 --:------ 3EF0 006 000011010A1C

        # .I --- 34:021943 63:262142 --:------ 10E0 038 000001C8380A01... # unknown
        # .I --- 32:168090 30:082155 --:------ 31E0 004 0000C800          # unknown

        if self._has_ctl_ is None:
            # if DEV_MODE and DEV_TYPE_MAP.HGI not in (
            #     self.src.type,
            #     self.dst.type,
            # ):  # DEX
            #     _LOGGER.warning(f"{self} # has_ctl - undetermined (99)")
            self._has_ctl_ = False

        return self._has_ctl_

    @property
    def _has_idx(self) -> bool:
        """Return True if the payload has an index (or has an array), False otherwise."""

        return self._idx is not False

    @property
    def _has_payload(self) -> bool:
        """Return True if the packet has a non-null payload, False otherwise.

        May return false positives. The payload may still have an idx.
        """

        if self._has_payload_ is not None:
            return self._has_payload_

        self._has_payload_ = not any(  # type: ignore[unreachable]
            (
                self._len == 1,
                self.verb == RQ and self.code in RQ_NO_PAYLOAD,
                self.verb == RQ and self._len == 2 and self.code != Code._0016,
                # self.verb == RQ and self._len == 2 and self.code in (
                #   Code._2309, Code._2349, Code._3EF1
                # ),
            )
        )

        return self._has_payload_

    def _force_has_array(self) -> None:
        """Mark the payload as an array, and discard the cached ctx, hdr and idx.

        The discarded values are re-evaluated when they are next accessed.
        """

        self._has_array_ = True
        self._ctx_ = None  # type: ignore[assignment]
        self._hdr_ = None  # type: ignore[assignment]
        self._idx_ = None  # type: ignore[assignment]

    @property
    def _is_fragment(self) -> bool:
        """Return True if the payload *could* be a fragment, False otherwise."""
        # .I 036 --:------ --:------ 12:126457 000A 012 010001F40BB8-020001F40BB8  # max 2 zones

        return (
            self.code in (Code._000A, Code._22C9)
            and self.verb == I_
            and self.src is self.dst
            # and self._has_array  # not needed
        ) or (self.code == Code._0404 and self.verb in (I_, RP))

    @property
    def _ctx(self) -> bool | str:  # incl. self._idx
        """Return the payload's full context, if any (e.g. for 0404: zone_idx/frag_idx).

        Used to store packets in the entity's message DB. It is a superset of _idx.
        """

        if self._ctx_ is not None:
            return self._ctx_

        if self.code in (  # type: ignore[unreachable]
            Code._0005,
            Code._000C,
        ):  # zone_idx, zone_type (device_role)
            self._ctx_ = self.payload[:4]
        elif self.code == Code._0404:  # zone_idx, frag_idx
            self._ctx_ = self._idx + self.payload[10:12]
        else:
            self._ctx_ = self._idx
        return self._ctx_

    @property
    def _hdr(self) -> HeaderT:  # incl. self._ctx
        """Return the QoS header (fingerprint) of this packet (i.e. device_id|code|verb).

        Used for QoS (timeouts, retries), callbacks, etc.
        """

        if self._hdr_ is not None:
            return self._hdr_

        # FIXME: HACK: sometimes RecursionError
        # a placeholder is cached first, so any re-entrant access (e.g. via str(self))
        # made while pkt_header() is running returns it, rather than recursing
        self._hdr_ = "|".join((self.code, self.verb))  # type: ignore[unreachable]
        self._hdr_ = pkt_header(self)
        return self._hdr_

    @property
    def _idx(self) -> bool | str:  # FIXME: a mess
        """Return the payload's index, if any (e.g. zone_idx, domain_id or log_idx).

        Used to route a packet to the correct entity's (i.e. zone/domain) msg handler.

        Any falsey result from _pkt_idx() (incl. None) is stored as False.
        """

        if self._idx_ is not None:
            return self._idx_

        self._idx_ = _pkt_idx(self) or False  # type: ignore[unreachable]
        return self._idx_

415 

416 

417# TODO: a mess - has false negatives 

def _pkt_idx(pkt: Frame) -> None | bool | str:  # _has_array, _has_ctl
    """Determine the index (e.g. zone_idx, domain_id or log_idx) of a payload.

    The result is usually a two-character string (often pkt.payload[:2]), or:
    - False, when the payload has no index at all
    - True, when the payload is an array
    - None, when the index cannot be determined

    Raises NotImplementedError for a complex code that has no handler here, and
    exc.PacketPayloadInvalid when the payload's leading characters are not
    consistent with what is expected for the code.
    """
    # Codes belong to exactly one of three groups: complex, none or simple

    # FIXME: 0016 is broken

    code = pkt.code
    payload = pkt.payload

    # group 2/4, CODE_IDX_COMPLEX: the index is not simply payload[:2]
    if code == Code._0005:
        return pkt._has_array

    # .I --- 10:040239 01:223036 --:------ 0009 003 000000
    if code == Code._0009 and pkt.src.type == DEV_TYPE_MAP.OTB:  # DEX
        return False

    if code == Code._000C:  # zone_idx/domain_id (complex, payload[0:4])
        role = payload[2:4]
        if role == DEV_ROLE_MAP.APP:  # "000F"
            return str(FC)  # mypy
        if payload[0:4] == f"01{DEV_ROLE_MAP.HTG}":  # "010E"
            return str(F9)  # mypy
        if role in (DEV_ROLE_MAP.DHW, DEV_ROLE_MAP.HTG):  # "000D", "000E"
            return str(FA)  # mypy
        return payload[:2]

    if code == Code._0404:  # assumes only 1 DHW zone (can be 2, but never seen)
        if payload[2:4] == "23":
            return "HW"
        return payload[:2]

    if code in (Code._0418, Code._3220):  # log_idx, or msg_id/data_id
        return payload[4:6]

    if code == Code._1100:  # TODO; can do in parser
        if payload[:1] == "F":  # only FC
            return payload[:2]
        return False

    if code in CODE_IDX_ARE_COMPLEX:  # should have been handled above
        raise NotImplementedError(f"{pkt} # CODE_IDX_COMPLEX")  # a coding error

    # group 1/4, CODE_IDX_NONE: always False (after a sanity check)
    if code in CODE_IDX_ARE_NONE:
        schema_wants_00 = CODES_SCHEMA[code].get(pkt.verb, "")[:3] == "^00"
        if schema_wants_00 and payload[:2] != "00":
            raise exc.PacketPayloadInvalid(
                f"Packet idx is {pkt.payload[:2]}, but expecting no idx (00) (0xAA)"
            )
        return False

    # group 3/4, CODE_IDX_SIMPLE: potentially some false -ves?
    if pkt._has_array:
        return True  # excludes len==1 for 000A, 2309, 30C9

    head = payload[:2]

    # TODO: is this needed?: exceptions to CODE_IDX_SIMPLE
    if head in (F8, F9, FA, FC):  # TODO: F6, F7?, FB, FD
        if code in CODE_IDX_DOMAIN:
            return head
        raise exc.PacketPayloadInvalid(
            f"Packet idx is {pkt.payload[:2]}, but not expecting a domain id"
        )

    # risk of false -ves, TODO: exclude HVAC? pkt.src.type == DEV_TYPE_MAP.HGI too?  # DEX
    # 02:    22C9: would be picked up as an array, if len==1 counted
    # 03:    # .I 028 03:094242 --:------ 03:094242 30C9 003 010B22  # ctl
    # 12/22: 000A|1030|2309|30C9 from (addr0 --:), 1060|3150 (addr0 04:)
    # 23:    0009|10A0
    if pkt._has_ctl or code in (Code._31D9, Code._31DA):
        return head  # tcs._max_zones checked elsewhere

    if head != "00":
        raise exc.PacketPayloadInvalid(
            f"Packet idx is {pkt.payload[:2]}, but expecting no idx (00) (0xAB)"
        )  # TODO: add a test for this

    if code not in CODE_IDX_ARE_SIMPLE:
        # group 4/4, CODE_IDX_UNKNOWN: an unknown code
        _LOGGER.info(f"{pkt} # Unable to determine payload index (is probably OK)")

    return None  # TODO: return None (less precise) or False (risk false -ves)?

511 

512 

def pkt_header(pkt: Frame, /, rx_header: bool = False) -> None | HeaderT:
    """Build the header of a packet, as 'code|verb|device_id', plus '|ctx' if any.

    Headers are used for QoS, amongst other things.

    With rx_header=True, the header of the anticipated response packet is built
    instead, or None is returned if no response is expected.

    Examples include:
        I --- 04:155407 --:------ 04:155407 30C9 003 00092F                   # 30C9| I|04:155407
       RP --- 01:223036 18:005567 --:------ 2349 007 0404B000FFFFFF           # 2349|RP|01:223036|04
        I --- 01:223036 --:------ 01:223036 3B00 002 FCC8                     # 3B00| I|01:223036|FC
       RP --- 01:223036 18:005567 --:------ 000C 012 020800125F91020800125F8D # 000C|RP|01:223036|0208
        I --- 01:223036 --:------ 01:223036 2309 030 00-0640 01-03E8 02-03... # 2309| I|01:223036 (True)
    """

    def compose(verb: str, device_id: str) -> HeaderT:
        # every header starts with the packet's code
        return "|".join((pkt.code, verb, device_id))

    if pkt.code == Code._1FC9:
        # .I --- 34:021943 --:------ 34:021943 1FC9 024 00-2309-8855B7 00-1FC9-8855B7
        # .W --- 01:145038 34:021943 --:------ 1FC9 006 00-2309-06368E  # won't know src until it arrives
        # .I --- 34:021943 01:145038 --:------ 1FC9 006 00-2309-8855B7
        if rx_header:
            if pkt.src == pkt.dst:  # and pkt.verb == I_:
                return compose(W_, pkt.src.id)
            if pkt.verb == W_:  # and pkt.src != pkt.dst:
                return compose(I_, pkt.src.id)  # TODO: why not pkt.dst?
            # if pkt.verb == RQ:  # and pkt.src != pkt.dst:  # TODO: this breaks things
            #     return "|".join((pkt.code, RP, pkt.dst.id))
            return None

        if pkt.src == pkt.dst:
            return compose(pkt.verb, ALL_DEV_ADDR.id)
        return compose(pkt.verb, pkt.dst.id)

    # RQ and W use the dst.id rather than the src.id, as:
    # - cmd.src could be 18:000730, and echo .src will have changed to (say) 18:123456
    # - cmd.dst is the effector

    # I/RP packets, and those sent to oneself, do not solicit a response
    unsolicited = pkt.verb in (I_, RP) or pkt.src == pkt.dst

    if rx_header:
        if unsolicited:  # say: xxxx| W|00:000000|xx
            return None  # no response expected
        reply_verb = RP if pkt.verb == RQ else I_
        header = compose(reply_verb, pkt.dst.id)
    elif unsolicited:
        header = compose(pkt.verb, pkt.src.id)
    else:
        header = compose(pkt.verb, pkt.dst.id)

    try:
        context = pkt._ctx
    except AssertionError:  # the context could not be determined
        return header

    if isinstance(context, str):
        return f"{header}|{context}"
    return header