Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Don't hold the connection when the reply fails #10

Open
wants to merge 1 commit into
base: eayunstack-1.1
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
58 changes: 49 additions & 9 deletions oslo/messaging/_drivers/amqpdriver.py
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@

import logging
import threading
import time
import uuid

import cachetools
Expand Down Expand Up @@ -60,16 +61,19 @@ def _send_reply(self, conn, reply=None, failure=None,
msg['ending'] = True

rpc_amqp._add_unique_id(msg)
unique_id = msg[rpc_amqp.UNIQUE_ID]

# If a reply_q exists, add the msg_id to the reply and pass the
# reply_q to direct_send() to use it as the response queue.
# Otherwise use the msg_id for backward compatibility.
if self.reply_q:
msg['_msg_id'] = self.msg_id
try:
conn.direct_send(self.reply_q, rpc_common.serialize_msg(msg))
except rpc_amqp.AMQPDestinationNotFound:
self._obsolete_reply_queues.add(self.reply_q, self.msg_id)
LOG.debug("sending reply msg_id: %(msg_id)s "
"reply queue: %(reply_q)s" % {
'msg_id': self.msg_id,
'unique_id': unique_id,
'reply_q': self.reply_q})
conn.direct_send(self.reply_q, rpc_common.serialize_msg(msg))
else:
# TODO(sileht): look at which version of oslo-incubator rpc
# send need this, but I guess this is older than icehouse
Expand All @@ -83,16 +87,52 @@ def reply(self, reply=None, failure=None, log_failure=True):
# because reply should not be expected by caller side
return

# NOTE(sileht): return without hold the a connection if possible
# NOTE(sileht): return without using a connection if possible
if (self.reply_q and
not self._obsolete_reply_queues.reply_q_valid(self.reply_q,
self.msg_id)):
return

with self.listener.driver._get_connection(
rpc_amqp.PURPOSE_SEND) as conn:
self._send_reply(conn, reply, failure, log_failure=log_failure)
self._send_reply(conn, ending=True)
duration = 60
amqp_notfound_count = 0
last_amqp_notfound_time = 0
timer = rpc_common.DecayingTimer(duration=duration)
timer.start()

while True:
try:
with self.listener.driver._get_connection(
rpc_amqp.PURPOSE_SEND) as conn:
self._send_reply(conn, reply, failure,
log_failure=log_failure)
self._send_reply(conn, ending=True)
return
except rpc_amqp.AMQPDestinationNotFound:
if timer.check_return() > 0:
current_time = time.time()
amqp_notfound_count += 1
msg = _("The reply %(msg_id)s cannot be sent, "
"reply queue %(reply_q)s doesn't exist "
"after retried %(times)s times, keep retrying...") % {
'msg_id': self.msg_id,
'reply_q': self.reply_q,
'times': amqp_notfound_count}
if amqp_notfound_count == 1 or\
current_time - last_amqp_notfound_time >= 20:
LOG.info(msg)
last_amqp_notfound_time = current_time
time.sleep(0.25)
continue
else:
self._obsolete_reply_queues.add(self.reply_q, self.msg_id)
msg = _("The reply %(msg_id)s cannot be sent, "
"reply queue %(reply_q)s doesn't exist after "
"%(duration)s sec abandoning...") % {
'msg_id': self.msg_id,
'reply_q': self.reply_q,
'duration': duration}
LOG.error(msg)
return

def acknowledge(self):
self.listener.msg_id_cache.add(self.unique_id)
Expand Down
41 changes: 7 additions & 34 deletions oslo/messaging/_drivers/impl_rabbit.py
Original file line number Diff line number Diff line change
Expand Up @@ -1044,18 +1044,10 @@ def declare_fanout_consumer(self, topic, callback):
def direct_send(self, msg_id, msg):
"""Send a 'direct' message."""

duration = 60
amqp_notfound_count = 0
last_amqp_notfound_time = 0
timer = rpc_common.DecayingTimer(duration)
timer.start()
# NOTE(sileht): retry for at least 60 sec; after that there is a good chance
# that the caller is really dead too...

while True:
try:
self.publisher_send(DirectPublisher, msg_id, msg)
except self.connection.channel_errors as exc:
try:
self.publisher_send(DirectPublisher, msg_id, msg)
except self.connection.channel_errors as exc:
if exc.code == 404:
# NOTE(noelbk/sileht):
# If rabbit dies, the consumer can be disconnected before the
# publisher sends, and if the consumer hasn't declared the
Expand All @@ -1064,28 +1056,9 @@ def direct_send(self, msg_id, msg):
# So we set passive=True to the publisher exchange and catch
# the 404 kombu ChannelError and retry until the exchange
# appears
if exc.code == 404 and timer.check_return() > 0:
current_time = time.time()
amqp_notfound_count += 1
msg = _("The exchange to reply to %s doesn't "
"exist for %s times, retrying...") % (
msg_id, amqp_notfound_count)
if amqp_notfound_count == 1 or\
current_time - last_amqp_notfound_time >= 20:
LOG.info(msg)
last_amqp_notfound_time = current_time
time.sleep(0.25)
continue
elif exc.code == 404:
msg = _("The exchange to reply to "
"%(routing_key)s still doesn't exist after "
"%(duration)s sec abandonning...") % {
'duration': duration,
'routing_key': msg_id}
LOG.error(msg)
raise rpc_amqp.AMQPDestinationNotFound(msg)
raise
return
raise rpc_amqp.AMQPDestinationNotFound(
"exchange %s doesn't exits" % msg_id)
raise

def topic_send(self, exchange_name, topic, msg, timeout=None, retry=None):
"""Send a 'topic' message."""
Expand Down