Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Avoid sending pings when we have traffic #463

Merged
merged 2 commits into from
Jul 11, 2023
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
34 changes: 20 additions & 14 deletions aioesphomeapi/connection.py
Original file line number Diff line number Diff line change
Expand Up @@ -135,6 +135,8 @@ class APIConnection:
"_connect_task",
"_fatal_exception",
"_expected_disconnect",
"_loop",
"_send_pending_ping",
)

def __init__(
Expand Down Expand Up @@ -172,6 +174,8 @@ def __init__(
self._connect_task: Optional[asyncio.Task[None]] = None
self._fatal_exception: Optional[Exception] = None
self._expected_disconnect = False
self._send_pending_ping = False
self._loop = asyncio.get_event_loop()
bdraco marked this conversation as resolved.
Show resolved Hide resolved

@property
def connection_state(self) -> ConnectionState:
Expand Down Expand Up @@ -271,7 +275,7 @@ async def _connect_socket_connect(self, addr: hr.AddrInfo) -> None:
sockaddr = astuple(addr.sockaddr)

try:
coro = asyncio.get_event_loop().sock_connect(self._socket, sockaddr)
coro = self._loop.sock_connect(self._socket, sockaddr)
async with async_timeout.timeout(TCP_CONNECT_TIMEOUT):
await coro
except OSError as err:
Expand All @@ -290,7 +294,7 @@ async def _connect_socket_connect(self, addr: hr.AddrInfo) -> None:
async def _connect_init_frame_helper(self) -> None:
"""Step 3 in connect process: initialize the frame helper and init read loop."""
fh: Union[APIPlaintextFrameHelper, APINoiseFrameHelper]
loop = asyncio.get_event_loop()
loop = self._loop

if self._params.noise_psk is None:
_, fh = await loop.create_connection(
Expand Down Expand Up @@ -357,13 +361,10 @@ async def _connect_hello(self) -> None:
f"Server sent a different name '{resp.name}'", resp.name
)

async def _connect_start_ping(self) -> None:
"""Step 5 in connect process: start the ping loop."""
self._async_schedule_keep_alive(asyncio.get_running_loop())

def _async_schedule_keep_alive(self, loop: asyncio.AbstractEventLoop) -> None:
def _async_schedule_keep_alive(self) -> None:
"""Start the keep alive task."""
self._ping_timer = loop.call_later(
self._send_pending_ping = True
self._ping_timer = self._loop.call_later(
self._keep_alive_interval, self._async_send_keep_alive
)

Expand All @@ -372,14 +373,14 @@ def _async_send_keep_alive(self) -> None:
if not self._is_socket_open:
return

loop = asyncio.get_running_loop()
self.send_message(PING_REQUEST_MESSAGE)
if self._send_pending_ping:
self.send_message(PING_REQUEST_MESSAGE)

if self._pong_timer is None:
# Do not reset the timer if it's already set
# since the only thing we want to reset the timer
# is if we receive a pong.
self._pong_timer = loop.call_later(
self._pong_timer = self._loop.call_later(
self._keep_alive_timeout, self._async_pong_not_received
)
else:
Expand All @@ -399,7 +400,7 @@ def _async_send_keep_alive(self) -> None:
self._keep_alive_interval,
)

self._async_schedule_keep_alive(loop)
self._async_schedule_keep_alive()

def _async_cancel_pong_timer(self) -> None:
"""Cancel the pong timer."""
Expand Down Expand Up @@ -434,7 +435,7 @@ async def _do_connect() -> None:
await self._connect_socket_connect(addr)
await self._connect_init_frame_helper()
await self._connect_hello()
await self._connect_start_ping()
self._async_schedule_keep_alive()
if login:
await self.login(check_connected=False)

Expand Down Expand Up @@ -599,7 +600,7 @@ async def send_message_await_response_complex(

:raises TimeoutAPIError: if a timeout occured
"""
fut = asyncio.get_event_loop().create_future()
fut = self._loop.create_future()
responses = []

def on_message(resp: message.Message) -> None:
Expand Down Expand Up @@ -725,6 +726,11 @@ def _process_packet(self, msg_type_proto: int, data: bytes) -> None:
# as we know the connection is still alive
self._async_cancel_pong_timer()

if self._send_pending_ping:
# Any valid message from the remove cancels the pending ping
# since we know the connection is still alive
self._send_pending_ping = False

for handler in self._message_handlers.get(msg_type, [])[:]:
handler(msg)

Expand Down