Coverage for tests/test_HA_MQTT/test_transport_callback.py: 95%
58 statements
« prev ^ index » next coverage.py v7.11.3, created at 2026-01-05 21:44 +0100
« prev ^ index » next coverage.py v7.11.3, created at 2026-01-05 21:44 +0100
1#!/usr/bin/env python3
2"""Unit tests for the CallbackTransport (Inversion of Control)."""
4import asyncio
5import unittest
6from typing import Any
7from unittest.mock import AsyncMock, MagicMock
9from ramses_tx import exceptions as exc
10from ramses_tx.transport import CallbackTransport
13class TestCallbackTransport(unittest.IsolatedAsyncioTestCase):
14 async def asyncSetUp(self) -> None:
15 self.mock_protocol = MagicMock()
16 self.mock_writer = AsyncMock()
18 # Initialize transport with our mocks
19 self.transport = CallbackTransport(
20 self.mock_protocol, io_writer=self.mock_writer
21 )
23 async def test_initial_state_is_paused(self) -> None:
24 """Verify transport starts in PAUSED state (Circuit Breaker default)."""
25 # It should default to not reading until explicitly resumed
26 self.assertFalse(self.transport.is_reading())
28 async def test_write_frame_delegates_to_writer(self) -> None:
29 """Verify outbound frames are passed to the injected io_writer."""
30 test_frame = "--- RQ --- 18:000730 01:195932 --:------ 1F41 001 00"
32 await self.transport.write_frame(test_frame)
34 # Check if the injected writer was called with the exact frame
35 self.mock_writer.assert_awaited_once_with(test_frame)
37 async def test_receive_frame_respects_circuit_breaker(self) -> None:
38 """Verify inbound frames are gated by pause/resume state."""
39 test_frame = (
40 "059 RP --- 01:195932 04:017982 --:------ 313F 009 00FC2300C4150C07E9"
41 )
43 # 1. Test while PAUSED (Initial State)
44 self.transport.pause_reading()
45 self.transport.receive_frame(test_frame)
47 # Give the loop a chance to spin (in case it tried to process)
48 await asyncio.sleep(0)
50 # Protocol should NOT have received data
51 self.mock_protocol.pkt_received.assert_not_called()
53 # 2. Test while RESUMED
54 self.transport.resume_reading()
55 self.transport.receive_frame(test_frame)
57 # We must yield control to the loop so 'call_soon' tasks can execute
58 await asyncio.sleep(0)
60 # Protocol SHOULD receive data now
61 # Note: pkt_received is called with a Packet object, so we verify call count
62 self.assertEqual(self.mock_protocol.pkt_received.call_count, 1)
64 async def test_write_error_handling(self) -> None:
65 """Verify writer exceptions are wrapped in TransportError."""
66 self.mock_writer.side_effect = Exception("MQTT Connection Lost")
68 with self.assertRaises(exc.TransportError):
69 await self.transport.write_frame("test_frame")
71 async def test_gateway_integration(self) -> None:
72 """Verify the Gateway accepts the transport via IoC."""
73 from ramses_rf import Gateway
75 # Define a factory that returns our mocked transport
76 async def mock_factory(protocol: Any, **kwargs: Any) -> CallbackTransport:
77 # We must return the transport instance we are testing
78 # but we need to update its protocol reference first
79 self.transport._protocol = protocol
81 # 1. Simulate Gateway Identification (Critical for protocol state)
82 # Use the string literal "active_hgi" to avoid import issues
83 self.transport._extra["active_hgi"] = "18:000730"
85 # 2. Manually signal that the connection is made.
86 # MUST include ramses=True to satisfy the protocol stack
87 protocol.connection_made(self.transport, ramses=True)
89 return self.transport
91 # Initialize Gateway with the factory
92 gwy = Gateway("/dev/null", transport_constructor=mock_factory)
93 await gwy.start()
95 # Verify the Gateway is actually using our transport
96 self.assertIs(gwy._transport, self.transport)
98 await gwy.stop()
100 async def test_factory_propagates_disable_sending(self) -> None:
101 """Verify transport_factory passes disable_sending=True to the constructor."""
102 from ramses_rf import Gateway
104 # 1. Define a factory that checks if disable_sending was passed
105 async def strict_factory(protocol: Any, **kwargs: Any) -> CallbackTransport:
106 # Check if 'disable_sending' made it through
107 if not kwargs.get("disable_sending"):
108 raise ValueError("disable_sending flag was lost in the factory!")
110 # Create the transport
111 transport = CallbackTransport(
112 protocol,
113 io_writer=self.mock_writer,
114 disable_sending=kwargs["disable_sending"],
115 )
117 # We must tell the protocol we are connected, or gwy.start() will timeout
118 transport._extra["active_hgi"] = "18:000730"
119 protocol.connection_made(transport, ramses=True)
121 return transport
123 # 2. Initialize Gateway normally (bypass Schema validation)
124 # We do NOT pass disable_sending here to avoid the Voluptuous error
125 gwy = Gateway("/dev/null", transport_constructor=strict_factory)
127 # 3. Force the flag internally.
128 # This simulates the Gateway being in a read-only state (e.g. reading from a file)
129 # and ensures we test that this state is propagated to the Transport Factory.
130 gwy._disable_sending = True
132 # 4. Start the Gateway (this triggers the factory)
133 try:
134 await gwy.start()
135 except ValueError as err:
136 self.fail(f"Test failed: {err}")
137 finally:
138 await gwy.stop()