Coverage for tests/tests_rf/test_database.py: 0%

84 statements  

« prev     ^ index     » next       coverage.py v7.11.3, created at 2026-01-05 21:46 +0100

1#!/usr/bin/env python3 

2"""RAMSES RF - Unit test for MessageIndex.""" 

3 

4import contextlib 

5from datetime import datetime as dt, timedelta as td 

6 

7from ramses_rf.database import MessageIndex 

8from ramses_tx import Message, Packet 

9 

10 

class TestMessageIndex:
    """Test the MessageIndex class: adding, replacing, clearing and querying.

    The class-level ``msg1`` .. ``msg7`` fixtures are built once, at import
    time, from packet lines stamped 10 seconds apart starting at ``_NOW``.
    """

    # NOTE(review): this block was recovered from a rendered coverage report;
    # runs of whitespace inside the string literals below (packet lines and
    # expected str() output) may have been collapsed - confirm them against the
    # repository copy before relying on exact-match assertions.

    _SRC1 = "32:166025"
    _SRC2 = "01:087939"  # (CTR)
    _NONA = "--:------"
    _NOW = dt.now().replace(microsecond=0)

    msg1: Message = Message._from_pkt(
        Packet(_NOW, "... I --- 32:166025 --:------ 32:166025 1298 003 007FFF")
    )
    msg2: Message = Message._from_pkt(
        Packet(
            _NOW + td(seconds=10),
            "... I --- 32:166025 --:------ 32:166025 1298 003 001230",  # co2_level
        )
    )
    msg3: Message = Message._from_pkt(
        Packet(
            _NOW + td(seconds=20),
            "060 I --- 01:087939 --:------ 01:087939 2309 021 0007D00106400201F40301F40401F40501F40601F4",
        )
    )
    msg4: Message = Message._from_pkt(
        Packet(
            _NOW + td(seconds=30),
            "060 I --- 32:166025 --:------ 32:166025 31DA 030 00EF00019E00EF06E17FFF08020766BE09001F0000000000008500850000",
        )
    )
    msg5: Message = Message._from_pkt(
        Packet(
            _NOW + td(seconds=40),
            "... I --- 04:189078 --:------ 01:145038 3150 002 0100",  # heat_demand
        )
    )

    msg6: Message = Message._from_pkt(
        Packet(
            _NOW + td(seconds=50),
            "061 RP --- 10:078099 01:087939 --:------ 3220 005 00C0110000",  # OTB
        )
    )

    msg7: Message = Message._from_pkt(
        Packet(
            _NOW + td(seconds=60),
            "... I --- 04:189078 --:------ 01:145038 12B0 003 040000",
        )
    )

    async def test_add_msg(self) -> None:
        """Add a message to the MessageIndex.

        Covers a first insert, a replacement by a newer message with the same
        code, inserts of other codes, a repeated insert, and clearing the index.
        """
        msg_db = MessageIndex()
        ret: Message | None

        # try/finally: the sqlite3 connection must be closed even when an
        # assertion fails, otherwise a failing test leaks the connection
        try:
            # sanity-check the fixture before indexing it
            assert self.msg1.payload == {
                "co2_level": None,
            }, "unexpected parsed payload"

            ret = msg_db.add(self.msg1)  # first insert: no message is replaced

            assert ret is None
            assert msg_db.contains(code="1298")
            assert len(msg_db.all()) == 1
            assert (
                str(msg_db.all())
                == "( I --- 32:166025 --:------ 32:166025 1298 003 007FFF,)"
            )

            # add another message with same code
            ret = msg_db.add(self.msg2)  # returns the replaced message (msg1)

            assert (
                str(ret)
                == "|| 32:166025 | | I | co2_level | || {'co2_level': None}"
            )
            assert len(msg_db.all()) == 1

            # add another message with different code
            ret = msg_db.add(self.msg3)  # new code

            assert ret is None
            assert len(msg_db.all()) == 2

            ret = msg_db.add(self.msg5)  # new code
            assert ret is None
            ret = msg_db.add(self.msg5)  # same message again: index must not grow
            assert ret is None
            assert len(msg_db.all()) == 3

            # test clear index
            msg_db.clr()
            assert len(msg_db.all()) == 0
        finally:
            msg_db.stop()  # close sqlite3 connection

    async def test_qry_msg(self) -> None:
        """Query the MessageIndex.

        Exercises ``contains()`` with keyword filters, raw SQL via
        ``qry_field()``, and a Message lookup via ``qry()`` that mimics the
        ``<device_id>_<ctx>`` lookup noted below as "entity_base _msgs".
        """
        msg_db = MessageIndex()

        # try/finally: always close the sqlite3 connection, even on failure
        try:
            # msg2 has the same code as msg1 (see test_add_msg), so the six
            # adds below leave five messages in the index
            for msg in (
                self.msg1,
                self.msg2,
                self.msg3,
                self.msg4,
                self.msg5,
                self.msg6,
            ):
                msg_db.add(msg)

            # qry by code
            assert msg_db.contains(code="2309"), "code 2309 missing"
            assert msg_db.contains(code="3150"), "code 3150 missing"
            assert msg_db.contains(src="01:087939", dst="01:087939", code="2309"), (
                "src, dst missing"
            )
            assert not msg_db.contains(src="01:12345", code="2309"), (
                "random src should return False"
            )
            assert not msg_db.contains(code="1234"), "a random code should return False"
            assert msg_db.contains(dst="01:087939"), "dst missing"
            assert not msg_db.contains(plk="co2_level"), (
                "payload keys skipped if value is None"
            )

            # Only SELECT queries are allowed: verify the rejection really
            # happens, rather than merely tolerating a ValueError
            rejected = True
            with contextlib.suppress(ValueError):
                msg_db.qry_field("RANDOM from messages", (self._SRC1, self._SRC1))
                rejected = False  # only reached when nothing was raised
            assert rejected, "Only SELECT queries are allowed"

            # Use simplest SQLite query on MessageIndex
            sql = """
                SELECT code, plk from messages WHERE (src = ? OR dst = ?)
            """
            res: list[tuple[dt | str, str]] = msg_db.qry_field(
                sql, (self._SRC2, self._SRC2)
            )
            assert res == [
                (
                    "2309",
                    "|zone_idx|setpoint|",
                ),
                ("3220", "|msg_id|msg_type|msg_name|value|description|"),
            ]

            # Use multi-field SQLite query on MessageIndex
            sql = """
                SELECT code, plk from messages WHERE verb in (' I', 'RP')
                AND (src = ? OR dst = ?)
                AND code in ('1298', '31DA')
                AND (plk LIKE '%co2_level%')
            """
            res = msg_db.qry_field(sql, (self._SRC1, self._SRC1))
            assert res == [  # key 'co2_level' included since value is not None
                ("1298", "|co2_level|"),
                (
                    "31DA",
                    "|hvac_id|exhaust_fan_speed|fan_info|_unknown_fan_info_flags|co2_level|indoor_humidity|exhaust_temp|indoor_temp|outdoor_temp|speed_capabilities|bypass_position|supply_fan_speed|remaining_mins|post_heat|pre_heat|supply_flow_fault|exhaust_flow_fault|_extra|",
                ),
            ]
            assert msg_db.contains(plk="|co2_level|"), "payload keys missing"

            # src only query on MessageIndex
            sql = """
                SELECT code, dst from messages WHERE verb in (' I', 'RP')
                AND (src = ?)
            """
            res = msg_db.qry_field(sql, ("04:189078",))
            assert res == [("3150", "01:145038")]  # so dst is addrs[2], not --:------

            # Use payload key SQLite query on MessageIndex
            sql = """
                SELECT code, ctx from messages WHERE verb in (' I', 'RP')
                AND (src = ? OR dst = ?)
                AND (plk LIKE '%co2_level%')
            """
            res = msg_db.qry_field(sql, (self._SRC1, self._SRC1))
            assert res == [("1298", "False"), ("31DA", "00")]

            assert msg_db.contains(plk="|co2_level|"), "payload keys missing"

            assert len(msg_db.all()) == 5

            # simulate entity_base _msgs lookup
            _SQL_SLICE = 9  # length of a device id such as "01:145038"
            sql = """
                SELECT dtm from messages WHERE
                verb in (' I', 'RP')
                AND (src = ? OR dst = ?)
                AND ctx = ?
            """

            _id = "01:145038_01"
            # the context is whatever follows "<device_id>_"; "*" if there is none
            _ctx_qry = _id[_SQL_SLICE + 1 :] if len(_id) > _SQL_SLICE else "*"
            m: tuple[Message, ...] = msg_db.qry(
                sql, (_id[:_SQL_SLICE], _id[:_SQL_SLICE], _ctx_qry)
            )  # e.g. 01:123456_01
            assert len(m) == 1
            assert m[0].payload == {"heat_demand": 0.0, "zone_idx": "01"}

            msg_db.add(self.msg7)
            _id = "01:145038_04"
            # recompute the context for the new id: reusing the previous value
            # would silently repeat the "_01" lookup instead of querying "_04"
            # NOTE(review): assumes msg7 (12B0) is indexed with ctx "04" - confirm
            _ctx_qry = _id[_SQL_SLICE + 1 :] if len(_id) > _SQL_SLICE else "*"
            m = msg_db.qry(
                sql, (_id[:_SQL_SLICE], _id[:_SQL_SLICE], _ctx_qry)
            )  # e.g. 01:123456_04
            assert len(m) == 1

            # run maintenance loop
            # assert len(msg_db.all()) == 5
            # await msg_db._housekeeping_loop.housekeeping(self._NOW, _cutoff=dt(second=10))
            # assert len(msg_db.all()) == 5
        finally:
            msg_db.stop()  # close sqlite3 connection