From ba3e2e0389345e9a0f801b6e2fb25799dc1d91e4 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Felix=20B=C3=B6hm?= Date: Tue, 2 Jul 2024 01:07:40 +0200 Subject: [PATCH] refactor: Migrate to paho-mqtt callback v2; Close #308 --- aiomqtt/client.py | 34 +++++++++++++++++++++------------- 1 file changed, 21 insertions(+), 13 deletions(-) diff --git a/aiomqtt/client.py b/aiomqtt/client.py index c59ef8e..9ed4c8b 100644 --- a/aiomqtt/client.py +++ b/aiomqtt/client.py @@ -251,7 +251,7 @@ def __init__( # noqa: C901, PLR0912, PLR0913, PLR0915 # Create the underlying paho-mqtt client instance self._client: mqtt.Client = mqtt.Client( - callback_api_version=CallbackAPIVersion.VERSION1, + callback_api_version=CallbackAPIVersion.VERSION2, client_id=identifier, protocol=protocol.value, clean_session=clean_session, @@ -526,8 +526,8 @@ def _on_connect( # noqa: PLR0913 self, client: mqtt.Client, userdata: Any, - flags: dict[str, int], - rc: int | ReasonCode, + flags: mqtt.ConnectFlags, + reason_code: ReasonCode, properties: Properties | None = None, ) -> None: """Called when we receive a CONNACK message from the broker.""" @@ -537,17 +537,18 @@ def _on_connect( # noqa: PLR0913 # self._connected twice (as it raises an asyncio.InvalidStateError). if self._connected.done(): return - if rc == mqtt.CONNACK_ACCEPTED: + if reason_code == mqtt.CONNACK_ACCEPTED: self._connected.set_result(None) else: # We received a negative CONNACK response - self._connected.set_exception(MqttConnectError(rc)) + self._connected.set_exception(MqttConnectError(reason_code)) - def _on_disconnect( + def _on_disconnect( # noqa: PLR0913 self, client: mqtt.Client, userdata: Any, - rc: int | ReasonCode | None, + flags: mqtt.DisconnectFlags, + reason_code: ReasonCode, properties: Properties | None = None, ) -> None: # Return early if the disconnect is already acknowledged. @@ -567,11 +568,11 @@ def _on_disconnect( # See also: https://docs.python.org/3/library/asyncio-dev.html#detect-never-retrieved-exceptions if not self._connected.done() or self._connected.exception() is not None: return - if rc == mqtt.MQTT_ERR_SUCCESS: + if reason_code == mqtt.MQTT_ERR_SUCCESS: self._disconnected.set_result(None) else: self._disconnected.set_exception( - MqttCodeError(rc, "Unexpected disconnection") + MqttCodeError(reason_code, "Unexpected disconnection") ) def _on_subscribe( # noqa: PLR0913 @@ -579,14 +580,14 @@ def _on_subscribe( # noqa: PLR0913 client: mqtt.Client, userdata: Any, mid: int, - granted_qos: tuple[int, ...] | list[ReasonCode], + reason_codes: list[ReasonCode], properties: Properties | None = None, ) -> None: """Called when we receive a SUBACK message from the broker.""" try: fut = self._pending_subscribes.pop(mid) if not fut.done(): - fut.set_result(granted_qos) + fut.set_result(reason_codes) except KeyError: self._logger.exception( 'Unexpected message ID "%d" in on_subscribe callback', mid @@ -597,8 +598,8 @@ def _on_unsubscribe( # noqa: PLR0913 client: mqtt.Client, userdata: Any, mid: int, + reason_codes: list[ReasonCode], properties: Properties | None = None, - reason_codes: list[ReasonCode] | ReasonCode | None = None, ) -> None: """Called when we receive an UNSUBACK message from the broker.""" try: @@ -619,7 +620,14 @@ def _on_message( except asyncio.QueueFull: self._logger.warning("Message queue is full. Discarding message.") - def _on_publish(self, client: mqtt.Client, userdata: Any, mid: int) -> None: + def _on_publish( # noqa: PLR0913 + self, + client: mqtt.Client, + userdata: Any, + mid: int, + reason_code: ReasonCode, + properties: Properties, + ) -> None: try: self._pending_publishes.pop(mid).set() except KeyError: