diff --git a/clickhouse_driver/client.py b/clickhouse_driver/client.py index ec6cf635..fe57b502 100644 --- a/clickhouse_driver/client.py +++ b/clickhouse_driver/client.py @@ -597,7 +597,7 @@ def process_insert_query(self, query_without_data, data, if sample_block: rv = self.send_data(sample_block, data, types_check=types_check, columnar=columnar) - self.receive_end_of_query() + self.receive_end_of_insert_query() return rv def receive_sample_block(self): @@ -651,8 +651,15 @@ def send_data(self, sample_block, data, types_check=False, columnar=False): self.connection.send_data(block) inserted_rows += block.num_rows + # Starting from the specific revision there are profile events + # sent by server in response to each inserted block + self.receive_profile_events() + # Empty block means end of data. self.connection.send_data(block_cls()) + # If enabled by revision profile events are also sent after empty block + self.receive_profile_events() + return inserted_rows def receive_end_of_query(self): @@ -679,7 +686,59 @@ def receive_end_of_query(self): else: message = self.connection.unexpected_packet_message( - 'Exception, EndOfStream or Log', packet.type + 'Exception, EndOfStream, Progress, TableColumns, ' + 'ProfileEvents or Log', packet.type + ) + raise errors.UnexpectedPacketFromServerError(message) + + def receive_end_of_insert_query(self): + while True: + packet = self.connection.receive_packet() + + if packet.type == ServerPacketTypes.END_OF_STREAM: + break + + elif packet.type == ServerPacketTypes.LOG: + log_block(packet.block) + + elif packet.type == ServerPacketTypes.PROGRESS: + continue + + elif packet.type == ServerPacketTypes.EXCEPTION: + raise packet.exception + + else: + message = self.connection.unexpected_packet_message( + 'EndOfStream, Log, Progress or Exception', packet.type + ) + raise errors.UnexpectedPacketFromServerError(message) + + def receive_profile_events(self): + revision = self.connection.server_info.used_revision + if ( + revision < + defines.DBMS_MIN_PROTOCOL_VERSION_WITH_PROFILE_EVENTS_IN_INSERT + ): + return None + + while True: + packet = self.connection.receive_packet() + + if packet.type == ServerPacketTypes.PROFILE_EVENTS: + break + + elif packet.type == ServerPacketTypes.PROGRESS: + self.last_query.store_progress(packet.progress) + + elif packet.type == ServerPacketTypes.LOG: + log_block(packet.block) + + elif packet.type == ServerPacketTypes.EXCEPTION: + raise packet.exception + + else: + message = self.connection.unexpected_packet_message( + 'ProfileEvents, Progress, Log or Exception', packet.type ) raise errors.UnexpectedPacketFromServerError(message) diff --git a/clickhouse_driver/defines.py b/clickhouse_driver/defines.py index 3f72771b..c86aa262 100644 --- a/clickhouse_driver/defines.py +++ b/clickhouse_driver/defines.py @@ -26,6 +26,7 @@ DBMS_MIN_PROTOCOL_VERSION_WITH_INCREMENTAL_PROFILE_EVENTS = 54451 DBMS_MIN_REVISION_WITH_PARALLEL_REPLICAS = 54453 DBMS_MIN_REVISION_WITH_CUSTOM_SERIALIZATION = 54454 +DBMS_MIN_PROTOCOL_VERSION_WITH_PROFILE_EVENTS_IN_INSERT = 54456 DBMS_MIN_PROTOCOL_VERSION_WITH_ADDENDUM = 54458 DBMS_MIN_PROTOCOL_VERSION_WITH_QUOTA_KEY = 54458 DBMS_MIN_PROTOCOL_VERSION_WITH_PARAMETERS = 54459 diff --git a/tests/test_insert.py b/tests/test_insert.py index 703c28c6..2d76ed8e 100644 --- a/tests/test_insert.py +++ b/tests/test_insert.py @@ -163,6 +163,13 @@ def test_insert_from_input(self): inserted = self.emit_cli(query) self.assertEqual(inserted, '1\n') + def test_profile_events(self): + with self.create_table('x Int32'): + data = [{'x': 1}] + self.client.execute( + 'INSERT INTO test (x) VALUES', data + ) + class InsertColumnarTestCase(BaseTestCase): def test_insert_tuple_ok(self):