-
Notifications
You must be signed in to change notification settings - Fork 0
/
stomp_test.py
72 lines (52 loc) · 1.98 KB
/
stomp_test.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
# test stomp -> amq
# see https://stackoverflow.com/questions/9328863/stomp-py-return-message-from-listener/10102673#10102673
# TODO: make class as per: https://github.com/jasonrbriggs/stomp.py/issues/325
from stomp import *
from sqlalchemy.engine import create_engine
from sqlalchemy.sql import text
import time, ast, os
# Runtime configuration. Every value can be overridden through an environment
# variable of the same name; the second argument is the fallback default.
# NOTE: despite its name, DB_HOST holds the full SQLAlchemy database URL
# that is handed to create_engine().
DB_HOST = os.environ.get('DB_HOST', 'postgresql+psycopg2://postgres:postgres@localhost/notes')
AMQ_HOST = os.environ.get('AMQ_HOST', '127.0.0.1')
# The destination must be looked up under the variable name QUEUE_NAME with
# '/queue/test' as the default; passing '/queue/test' as the variable name
# always yields None because no such environment variable exists.
QUEUE_NAME = os.environ.get('QUEUE_NAME', '/queue/test')
# db engine for read/write (DB_HOST is the full database URL)
engine = create_engine(DB_HOST)
# STOMP port of the broker; overridable like the other settings, defaults to
# the previously hard-coded 61613.
AMQ_PORT = int(os.environ.get('AMQ_PORT', '61613'))
# connection to AMQ via STOMP (created here, connected further below)
c = Connection([(AMQ_HOST, AMQ_PORT)])
# custom listener class for receiving messages
class MyListener(ConnectionListener):
    """Collect frames delivered on the STOMP connection.

    Received messages are stored in ``msg_list`` as ``(headers, body)``
    tuples. Error frames are stored in the same list as a single string
    prefixed with ``'(ERROR) '``, so consumers must tell the two apart.
    """

    def __init__(self):
        super().__init__()
        # Frames in arrival order: tuples for messages, strings for errors.
        self.msg_list = []

    def on_error(self, frame):
        """Record an error frame as one ``'(ERROR) ...'`` string."""
        # The frame attribute is ``headers`` (a mapping), so both parts are
        # stringified explicitly before concatenation.
        self.msg_list.append('(ERROR) ' + str(frame.headers) + str(frame.body))

    def on_message(self, frame):
        """Record a received message as a ``(headers, body)`` tuple."""
        self.msg_list.append((frame.headers, frame.body))
# End-to-end check: publish every row of the notes table to the queue, then
# subscribe, collect what is delivered, and acknowledge each message.

# connect (wait=True blocks until the connection is established)
c.connect('admin', 'admin', wait=True)
try:
    # instantiate the listener that accumulates incoming frames
    lst = MyListener()

    # send test messages to AMQ: one message per row of the notes table
    with engine.connect() as conn:
        # Wrap the statement in text(): SQLAlchemy 2.x no longer accepts a
        # plain string in Connection.execute().
        rs = conn.execute(text('SELECT * FROM notes;'))
        for row in rs:
            # The payload is the repr of a dict, so a consumer can rebuild it
            # with ast.literal_eval().
            # NOTE(review): "activemq.prefetchSize" looks like a subscription
            # option - confirm whether it has any effect on a SEND frame.
            c.send(
                QUEUE_NAME,
                str({'text': row.text, 'id': row.note_id}),
                headers={"persistent": "true", "activemq.prefetchSize": 100},
            )

    # set listener
    c.set_listener('', lst)
    # subscribe client; with ack='client' messages are acknowledged explicitly below
    c.subscribe(QUEUE_NAME, id=1, ack='client')
    # give the receiver thread a moment to deliver the first frames
    time.sleep(2)

    messages = lst.msg_list
    message_id = []
    # Iterate the live list on purpose: frames appended by the listener while
    # this loop runs are picked up as well.
    for m in messages:
        # Error frames are stored as plain strings and cannot be acked.
        if isinstance(m, str):
            continue
        time.sleep(.5)
        # ack by message-id; 1 is the subscription id used in subscribe() above
        c.ack(m[0]['message-id'], 1)
        message_id.append(m[0]['message-id'])
    print('len', len(message_id))
finally:
    # Always release the broker connection, even if a step above failed.
    c.disconnect()