Coverage for tests/test_HA_MQTT/test_transport_callback.py: 95%

58 statements  

« prev     ^ index     » next       coverage.py v7.11.3, created at 2026-01-05 21:44 +0100

1#!/usr/bin/env python3 

2"""Unit tests for the CallbackTransport (Inversion of Control).""" 

3 

4import asyncio 

5import unittest 

6from typing import Any 

7from unittest.mock import AsyncMock, MagicMock 

8 

9from ramses_tx import exceptions as exc 

10from ramses_tx.transport import CallbackTransport 

11 

12 

13class TestCallbackTransport(unittest.IsolatedAsyncioTestCase): 

14 async def asyncSetUp(self) -> None: 

15 self.mock_protocol = MagicMock() 

16 self.mock_writer = AsyncMock() 

17 

18 # Initialize transport with our mocks 

19 self.transport = CallbackTransport( 

20 self.mock_protocol, io_writer=self.mock_writer 

21 ) 

22 

23 async def test_initial_state_is_paused(self) -> None: 

24 """Verify transport starts in PAUSED state (Circuit Breaker default).""" 

25 # It should default to not reading until explicitly resumed 

26 self.assertFalse(self.transport.is_reading()) 

27 

28 async def test_write_frame_delegates_to_writer(self) -> None: 

29 """Verify outbound frames are passed to the injected io_writer.""" 

30 test_frame = "--- RQ --- 18:000730 01:195932 --:------ 1F41 001 00" 

31 

32 await self.transport.write_frame(test_frame) 

33 

34 # Check if the injected writer was called with the exact frame 

35 self.mock_writer.assert_awaited_once_with(test_frame) 

36 

37 async def test_receive_frame_respects_circuit_breaker(self) -> None: 

38 """Verify inbound frames are gated by pause/resume state.""" 

39 test_frame = ( 

40 "059 RP --- 01:195932 04:017982 --:------ 313F 009 00FC2300C4150C07E9" 

41 ) 

42 

43 # 1. Test while PAUSED (Initial State) 

44 self.transport.pause_reading() 

45 self.transport.receive_frame(test_frame) 

46 

47 # Give the loop a chance to spin (in case it tried to process) 

48 await asyncio.sleep(0) 

49 

50 # Protocol should NOT have received data 

51 self.mock_protocol.pkt_received.assert_not_called() 

52 

53 # 2. Test while RESUMED 

54 self.transport.resume_reading() 

55 self.transport.receive_frame(test_frame) 

56 

57 # We must yield control to the loop so 'call_soon' tasks can execute 

58 await asyncio.sleep(0) 

59 

60 # Protocol SHOULD receive data now 

61 # Note: pkt_received is called with a Packet object, so we verify call count 

62 self.assertEqual(self.mock_protocol.pkt_received.call_count, 1) 

63 

64 async def test_write_error_handling(self) -> None: 

65 """Verify writer exceptions are wrapped in TransportError.""" 

66 self.mock_writer.side_effect = Exception("MQTT Connection Lost") 

67 

68 with self.assertRaises(exc.TransportError): 

69 await self.transport.write_frame("test_frame") 

70 

71 async def test_gateway_integration(self) -> None: 

72 """Verify the Gateway accepts the transport via IoC.""" 

73 from ramses_rf import Gateway 

74 

75 # Define a factory that returns our mocked transport 

76 async def mock_factory(protocol: Any, **kwargs: Any) -> CallbackTransport: 

77 # We must return the transport instance we are testing 

78 # but we need to update its protocol reference first 

79 self.transport._protocol = protocol 

80 

81 # 1. Simulate Gateway Identification (Critical for protocol state) 

82 # Use the string literal "active_hgi" to avoid import issues 

83 self.transport._extra["active_hgi"] = "18:000730" 

84 

85 # 2. Manually signal that the connection is made. 

86 # MUST include ramses=True to satisfy the protocol stack 

87 protocol.connection_made(self.transport, ramses=True) 

88 

89 return self.transport 

90 

91 # Initialize Gateway with the factory 

92 gwy = Gateway("/dev/null", transport_constructor=mock_factory) 

93 await gwy.start() 

94 

95 # Verify the Gateway is actually using our transport 

96 self.assertIs(gwy._transport, self.transport) 

97 

98 await gwy.stop() 

99 

100 async def test_factory_propagates_disable_sending(self) -> None: 

101 """Verify transport_factory passes disable_sending=True to the constructor.""" 

102 from ramses_rf import Gateway 

103 

104 # 1. Define a factory that checks if disable_sending was passed 

105 async def strict_factory(protocol: Any, **kwargs: Any) -> CallbackTransport: 

106 # Check if 'disable_sending' made it through 

107 if not kwargs.get("disable_sending"): 

108 raise ValueError("disable_sending flag was lost in the factory!") 

109 

110 # Create the transport 

111 transport = CallbackTransport( 

112 protocol, 

113 io_writer=self.mock_writer, 

114 disable_sending=kwargs["disable_sending"], 

115 ) 

116 

117 # We must tell the protocol we are connected, or gwy.start() will timeout 

118 transport._extra["active_hgi"] = "18:000730" 

119 protocol.connection_made(transport, ramses=True) 

120 

121 return transport 

122 

123 # 2. Initialize Gateway normally (bypass Schema validation) 

124 # We do NOT pass disable_sending here to avoid the Voluptuous error 

125 gwy = Gateway("/dev/null", transport_constructor=strict_factory) 

126 

127 # 3. Force the flag internally. 

128 # This simulates the Gateway being in a read-only state (e.g. reading from a file) 

129 # and ensures we test that this state is propagated to the Transport Factory. 

130 gwy._disable_sending = True 

131 

132 # 4. Start the Gateway (this triggers the factory) 

133 try: 

134 await gwy.start() 

135 except ValueError as err: 

136 self.fail(f"Test failed: {err}") 

137 finally: 

138 await gwy.stop()