Skip to content

Commit

Permalink
Add profile events support
Browse files Browse the repository at this point in the history
  • Loading branch information
kozzztik committed Sep 18, 2024
1 parent ca34155 commit 8d2ac70
Show file tree
Hide file tree
Showing 3 changed files with 43 additions and 4 deletions.
8 changes: 6 additions & 2 deletions clickhouse_driver/client.py
Original file line number Diff line number Diff line change
Expand Up @@ -264,6 +264,10 @@ def receive_packet(self):
self.last_query.store_profile(packet.profile_info)
return True

elif packet.type == ServerPacketTypes.PROFILE_EVENTS:
self.last_query.store_profile_events(packet)
return True

else:
return True

Expand Down Expand Up @@ -691,7 +695,7 @@ def receive_end_of_query(self):
pass

elif packet.type == ServerPacketTypes.PROFILE_EVENTS:
self.last_query.store_profile(packet.profile_info)
self.last_query.store_profile_events(packet)

else:
message = self.connection.unexpected_packet_message(
Expand Down Expand Up @@ -734,7 +738,7 @@ def receive_profile_events(self):
packet = self.connection.receive_packet()

if packet.type == ServerPacketTypes.PROFILE_EVENTS:
self.last_query.store_profile(packet.profile_info)
self.last_query.store_profile_events(packet)
break

elif packet.type == ServerPacketTypes.PROGRESS:
Expand Down
12 changes: 12 additions & 0 deletions clickhouse_driver/result.py
Original file line number Diff line number Diff line change
Expand Up @@ -130,6 +130,7 @@ def __init__(self):
self.profile_info = BlockStreamProfileInfo()
self.progress = Progress()
self.elapsed = 0
self.stats = {}

def store_profile(self, profile_info):
self.profile_info = profile_info
Expand All @@ -142,3 +143,14 @@ def store_progress(self, progress):

def store_elapsed(self, elapsed):
self.elapsed = elapsed

def store_profile_events(self, packet):
data = QueryResult([packet]).get_result()
column_names = [i[0] for i in packet.block.columns_with_types]
for row in data:
item = dict(zip(column_names, row))
name = item.get('name', '')
if item['type'] == 'increment':
self.stats[name] = self.stats.get(name, 0) + item['value']
elif item['type'] == 'gauge':
self.stats[name] = item['value']
27 changes: 25 additions & 2 deletions tests/test_query_info.py
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,9 @@ def test_store_last_query_after_execute(self):

self.assertGreater(last_query.elapsed, 0)

self.assertEqual(last_query.stats['SelectQuery'], 1)
self.assertEqual(last_query.stats['SelectedRows'], 42)

def test_last_query_after_execute_iter(self):
with self.sample_table():
list(self.client.execute_iter(self.sample_query))
Expand All @@ -57,6 +60,9 @@ def test_last_query_after_execute_iter(self):

self.assertEqual(last_query.elapsed, 0)

self.assertEqual(last_query.stats['SelectQuery'], 1)
self.assertEqual(last_query.stats['SelectedRows'], 42)

def test_last_query_after_execute_with_progress(self):
with self.sample_table():
progress = self.client.execute_with_progress(self.sample_query)
Expand All @@ -77,6 +83,9 @@ def test_last_query_after_execute_with_progress(self):

self.assertEqual(last_query.elapsed, 0)

self.assertEqual(last_query.stats['SelectQuery'], 1)
self.assertEqual(last_query.stats['SelectedRows'], 42)

def test_last_query_progress_total_rows(self):
self.client.execute('SELECT number FROM numbers(10) LIMIT 10')

Expand All @@ -96,6 +105,9 @@ def test_last_query_progress_total_rows(self):

self.assertGreater(last_query.elapsed, 0)

self.assertEqual(last_query.stats['SelectQuery'], 1)
self.assertEqual(last_query.stats['SelectedRows'], 10)

def test_last_query_after_execute_insert(self):
with self.sample_table():
self.client.execute('INSERT INTO test (foo) VALUES',
Expand All @@ -111,14 +123,19 @@ def test_last_query_after_execute_insert(self):

self.assertGreater(last_query.elapsed, 0)

self.assertEqual(last_query.stats['InsertQuery'], 1)
self.assertEqual(last_query.stats['InsertedRows'], 42)

def test_override_after_subsequent_queries(self):
query = 'SELECT * FROM test WHERE foo < %(i)s ORDER BY foo LIMIT 5'
with self.sample_table():
for i in range(1, 10):
self.client.execute(query, {'i': i})

profile_info = self.client.last_query.profile_info
self.assertEqual(profile_info.rows_before_limit, i)
last_query = self.client.last_query
self.assertEqual(last_query.profile_info.rows_before_limit, i)
self.assertEqual(last_query.stats['SelectQuery'], 1)
self.assertEqual(last_query.stats['SelectedRows'], 42)

def test_reset_last_query(self):
with self.sample_table():
Expand Down Expand Up @@ -150,6 +167,11 @@ def test_progress_info_increment(self):
total_rows = 100000000 if self.server_version > (19, 4) else 0
self.assertEqual(last_query.progress.total_rows, total_rows)

last_query = self.client.last_query
self.assertEqual(last_query.stats['SelectQuery'], 1)
self.assertEqual(last_query.stats['SelectedRows'], 100000000)
self.assertEqual(last_query.stats['SelectedBytes'], 800000000)

def test_progress_info_ddl(self):
self.client.execute('DROP TABLE IF EXISTS foo')

Expand All @@ -162,3 +184,4 @@ def test_progress_info_ddl(self):
self.assertEqual(last_query.progress.elapsed_ns, 0)

self.assertGreater(last_query.elapsed, 0)
self.assertDictEqual(last_query.stats, {})

0 comments on commit 8d2ac70

Please sign in to comment.