Skip to content

Commit

Permalink
refactor: Migrate to paho-mqtt callback v2; Close #308
Browse files Browse the repository at this point in the history
  • Loading branch information
empicano committed Jul 1, 2024
1 parent 25aa68f commit ba3e2e0
Showing 1 changed file with 21 additions and 13 deletions.
34 changes: 21 additions & 13 deletions aiomqtt/client.py
Original file line number Diff line number Diff line change
Expand Up @@ -251,7 +251,7 @@ def __init__( # noqa: C901, PLR0912, PLR0913, PLR0915

# Create the underlying paho-mqtt client instance
self._client: mqtt.Client = mqtt.Client(
callback_api_version=CallbackAPIVersion.VERSION1,
callback_api_version=CallbackAPIVersion.VERSION2,
client_id=identifier,
protocol=protocol.value,
clean_session=clean_session,
Expand Down Expand Up @@ -526,8 +526,8 @@ def _on_connect( # noqa: PLR0913
self,
client: mqtt.Client,
userdata: Any,
flags: dict[str, int],
rc: int | ReasonCode,
flags: mqtt.ConnectFlags,
reason_code: ReasonCode,
properties: Properties | None = None,
) -> None:
"""Called when we receive a CONNACK message from the broker."""
Expand All @@ -537,17 +537,18 @@ def _on_connect( # noqa: PLR0913
# self._connected twice (as it raises an asyncio.InvalidStateError).
if self._connected.done():
return
if rc == mqtt.CONNACK_ACCEPTED:
if reason_code == mqtt.CONNACK_ACCEPTED:
self._connected.set_result(None)
else:
# We received a negative CONNACK response
self._connected.set_exception(MqttConnectError(rc))
self._connected.set_exception(MqttConnectError(reason_code))

def _on_disconnect(
def _on_disconnect( # noqa: PLR0913
self,
client: mqtt.Client,
userdata: Any,
rc: int | ReasonCode | None,
flags: mqtt.DisconnectFlags,
reason_code: ReasonCode,
properties: Properties | None = None,
) -> None:
# Return early if the disconnect is already acknowledged.
Expand All @@ -567,26 +568,26 @@ def _on_disconnect(
# See also: https://docs.python.org/3/library/asyncio-dev.html#detect-never-retrieved-exceptions
if not self._connected.done() or self._connected.exception() is not None:
return
if rc == mqtt.MQTT_ERR_SUCCESS:
if reason_code == mqtt.MQTT_ERR_SUCCESS:
self._disconnected.set_result(None)
else:
self._disconnected.set_exception(
MqttCodeError(rc, "Unexpected disconnection")
MqttCodeError(reason_code, "Unexpected disconnection")
)

def _on_subscribe( # noqa: PLR0913
self,
client: mqtt.Client,
userdata: Any,
mid: int,
granted_qos: tuple[int, ...] | list[ReasonCode],
reason_codes: list[ReasonCode],
properties: Properties | None = None,
) -> None:
"""Called when we receive a SUBACK message from the broker."""
try:
fut = self._pending_subscribes.pop(mid)
if not fut.done():
fut.set_result(granted_qos)
fut.set_result(reason_codes)
except KeyError:
self._logger.exception(
'Unexpected message ID "%d" in on_subscribe callback', mid
Expand All @@ -597,8 +598,8 @@ def _on_unsubscribe( # noqa: PLR0913
client: mqtt.Client,
userdata: Any,
mid: int,
reason_codes: list[ReasonCode],
properties: Properties | None = None,
reason_codes: list[ReasonCode] | ReasonCode | None = None,
) -> None:
"""Called when we receive an UNSUBACK message from the broker."""
try:
Expand All @@ -619,7 +620,14 @@ def _on_message(
except asyncio.QueueFull:
self._logger.warning("Message queue is full. Discarding message.")

def _on_publish(self, client: mqtt.Client, userdata: Any, mid: int) -> None:
def _on_publish( # noqa: PLR0913
self,
client: mqtt.Client,
userdata: Any,
mid: int,
reason_code: ReasonCode,
properties: Properties,
) -> None:
try:
self._pending_publishes.pop(mid).set()
except KeyError:
Expand Down

0 comments on commit ba3e2e0

Please sign in to comment.