Receive packet after each block insert, so profile events are not missed #392

Merged
merged 8 commits into from
Oct 19, 2023
4 changes: 4 additions & 0 deletions clickhouse_driver/client.py
@@ -651,6 +651,10 @@ def send_data(self, sample_block, data, types_check=False, columnar=False):
self.connection.send_data(block)
inserted_rows += block.num_rows

# Starting from a specific revision the server sends profile
# events in response to each inserted block
self.connection.receive_profile_events()

# Empty block means end of data.
self.connection.send_data(block_cls())
return inserted_rows
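The change above makes the client read one ProfileEvents reply per sent block, so the server's responses never pile up unread. A minimal sketch of that flow, using stand-in objects rather than the driver's real `Connection` class (all names here are hypothetical):

```python
# Sketch of the fixed insert loop; FakeConnection is a stand-in, not the
# driver's Connection class.
class FakeConnection:
    def __init__(self):
        self.pending_profile_events = 0

    def send_data(self, block):
        # On new enough revisions the server answers every non-empty block
        # with a ProfileEvents packet, so one piles up per send.
        if block:
            self.pending_profile_events += 1

    def receive_profile_events(self):
        # Draining the reply right away keeps the socket buffer empty.
        if self.pending_profile_events:
            self.pending_profile_events -= 1


conn = FakeConnection()
for block in [[1], [2], [3]]:
    conn.send_data(block)
    conn.receive_profile_events()  # the fix: read the reply per block

conn.send_data([])  # empty block signals end of data
assert conn.pending_profile_events == 0
```

Without the per-block `receive_profile_events()` call, `pending_profile_events` would grow with every block, which models the unread packets that eventually made the server time out.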
21 changes: 21 additions & 0 deletions clickhouse_driver/connection.py
@@ -557,6 +557,27 @@ def ping(self):

return True

def receive_profile_events(self):
revision = self.server_info.used_revision
if (
revision <
defines.DBMS_MIN_PROTOCOL_VERSION_WITH_PROFILE_EVENTS_IN_INSERT
):
return None

packet_type = read_varint(self.fin)
if packet_type == ServerPacketTypes.PROFILE_EVENTS:
self.receive_data(may_be_compressed=False)
return None
elif packet_type == ServerPacketTypes.EXCEPTION:
raise self.receive_exception()
else:
message = self.unexpected_packet_message(
'ProfileEvent or Exception', packet_type
)
self.disconnect()
raise errors.UnexpectedPacketFromServerError(message)

def receive_packet(self):
packet = Packet()

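The new method starts by reading a varint packet type from the wire via `read_varint`, which is part of the driver. For illustration only, a minimal re-implementation of the same LEB128-style unsigned varint decoding might look like this:

```python
import io


def read_varint(f):
    # Unsigned LEB128-style varint, as used for packet types in the
    # ClickHouse native protocol: 7 payload bits per byte, high bit set
    # while more bytes follow.
    shift = 0
    result = 0
    while True:
        b = f.read(1)[0]
        result |= (b & 0x7F) << shift
        if not (b & 0x80):
            return result
        shift += 7


# 300 encodes as 0xAC 0x02: low 7 bits with continuation bit, then high bits.
assert read_varint(io.BytesIO(b'\xac\x02')) == 300
# Small values fit in a single byte.
assert read_varint(io.BytesIO(b'\x0e')) == 14
```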
1 change: 1 addition & 0 deletions clickhouse_driver/defines.py
@@ -26,6 +26,7 @@
DBMS_MIN_PROTOCOL_VERSION_WITH_INCREMENTAL_PROFILE_EVENTS = 54451
DBMS_MIN_REVISION_WITH_PARALLEL_REPLICAS = 54453
DBMS_MIN_REVISION_WITH_CUSTOM_SERIALIZATION = 54454
DBMS_MIN_PROTOCOL_VERSION_WITH_PROFILE_EVENTS_IN_INSERT = 54456
DBMS_MIN_PROTOCOL_VERSION_WITH_ADDENDUM = 54458
DBMS_MIN_PROTOCOL_VERSION_WITH_QUOTA_KEY = 54458
DBMS_MIN_PROTOCOL_VERSION_WITH_PARAMETERS = 54459
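The new constant follows the file's existing gating pattern: a protocol feature applies only when the negotiated revision is at least the listed value. A tiny illustration (the helper name is made up, not part of the driver):

```python
DBMS_MIN_PROTOCOL_VERSION_WITH_PROFILE_EVENTS_IN_INSERT = 54456


def insert_sends_profile_events(used_revision):
    # Servers at or above this revision answer each inserted block with a
    # ProfileEvents packet; older ones do not, so the client must not wait.
    return (
        used_revision >= DBMS_MIN_PROTOCOL_VERSION_WITH_PROFILE_EVENTS_IN_INSERT
    )


assert not insert_sends_profile_events(54454)
assert insert_sends_profile_events(54456)
```

This is why `receive_profile_events` returns early for older revisions: reading a packet that an old server never sends would block.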
33 changes: 33 additions & 0 deletions tests/test_long_insert.py
@@ -0,0 +1,33 @@
from tests.testcase import BaseTestCase


class LongInsertTestCase(BaseTestCase):
client_kwargs = {
Contributor:
I would say that these settings deserve some comments:

  • insert_block_size=1 - to produce as many blocks as possible
  • send_timeout/receive_timeout - to trigger a timeout in case the client does not consume the profile events packets

Member:
@insomnes we have a time limit of 10s for each test. Is 5s enough for reproducing? It seems that pytest-timer applies the timeout to all tests without any exceptions.

Should we patch connection.receive_profile_events or read_varint with a mock like the following to ensure this test works?

        def side_receive_profile_events(*args, **kwargs):
            # do something, toggle a flag, etc.
            return receive_profile_events(*args, **kwargs)

        with patch.object(
            Connection, 'receive_profile_events'
        ) as mocked_receive_profile_events:
            mocked_receive_profile_events.side_effect = \
                side_receive_profile_events
            self.client.execute('INSERT INTO ...')
            # assert something happened in side_receive_profile_events

See tests.test_connect.ConnectTestCase.test_alt_hosts for inspiration.
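The wrap-and-delegate pattern suggested here can be shown in isolation with `unittest.mock.patch.object`: the mock's `side_effect` records the call and then forwards to the real method. The `Conn` class below is a stand-in for illustration, not the driver's `Connection`:

```python
from unittest.mock import patch


class Conn:
    def receive_profile_events(self):
        return 'events'


calls = []
original = Conn.receive_profile_events


def side_receive_profile_events(self, *args, **kwargs):
    calls.append('called')                    # observe that the method ran
    return original(self, *args, **kwargs)    # delegate to the real method


with patch.object(Conn, 'receive_profile_events', autospec=True) as mocked:
    mocked.side_effect = side_receive_profile_events
    result = Conn().receive_profile_events()

assert result == 'events'       # real behavior is preserved
assert calls == ['called']      # and the call was observed
```

With `autospec=True` the mock accepts `self` like the real method, so the wrapper can pass it straight through to the original.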

Contributor Author:

@xzkostyan thank you for the clarification. I would need a bit of time to wrap my head around this. This test depends on server behavior and I can't come up with an appropriate mock from the get-go.

With the unpatched function and this synthetic data, this will for sure error within the first 5 seconds, either as a ServerException if we were inserting quickly enough, or as our socket's ConnectionResetError.

With the patched one it would take a long time to finish properly, because we process 100,000 ProfileEvents packets on new server versions.
Maybe a mock that passes if we don't catch a ServerException or ConnectionResetError in the first 7 seconds would be ok.

I will try to think about it a bit more and come back with a solution.

Contributor Author:

@xzkostyan sorry for the long pause.
I've tried to emulate the bug by adding a small sleep before calling receive_end_of_query in Client, and lowered the row count. This configuration still reproduces the bug on the unpatched version, but runs normally and in under 7-8 seconds on the patched one.

What do you think?

Contributor:

Just out of curiosity I looked at this change, and the hack looks nice, though it was not clear how it works at first glance (when the driver does not receive profile events, it calls receive_end_of_query, which reads all ProfileEvents and also EndOfStream, but sleeps for 2 seconds each time, so eventually the server times out; it does not time out for EndOfStream, because that is written to the socket buffer).

But I think it should be fixed differently: instead, receive_end_of_query should not handle ProfileEvents for INSERT (they should still be handled for SELECT), so there should be a separate receive_end_of_insert_query that does not handle them; then it will be enough to send one block in this test, without any timeout trickery.

And this will match how ClickHouse handles this protocol internally for INSERT, so this is preferable I would say.

Contributor (@azat, Oct 12, 2023):

Something like this - https://gist.github.com/azat/da85ff0bde3f0da259144b0ba361cd64

BTW it also found one problem in this patch - a missing read of ProfileEvents after the empty block

P.S. @insomnes this patch is done on top of your latest changes
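The linked gist proposes a dedicated end-of-insert handler that drains trailing packets without treating ProfileEvents as part of query results. A rough sketch of the idea, with plain integers standing in for the driver's ServerPacketTypes constants (the values here are assumptions; check the driver's definitions):

```python
# Stand-in packet type codes (hypothetical; see ServerPacketTypes in the driver)
EXCEPTION, PROGRESS, END_OF_STREAM, PROFILE_EVENTS = 2, 3, 5, 14


def receive_end_of_insert_query(packet_types):
    # Drain Progress/ProfileEvents packets that may follow the terminating
    # empty block, stopping cleanly at EndOfStream; anything else is a
    # protocol violation.
    drained = 0
    for p in packet_types:
        if p in (PROGRESS, PROFILE_EVENTS):
            drained += 1
            continue
        if p == END_OF_STREAM:
            return drained
        raise ValueError('unexpected packet type: %r' % p)
    raise ValueError('connection closed before EndOfStream')


# A Progress packet may arrive before ProfileEvents, as noted below.
assert receive_end_of_insert_query([PROGRESS, PROFILE_EVENTS, END_OF_STREAM]) == 2
```

Keeping this loop separate from receive_end_of_query means SELECT result handling stays untouched while INSERT gets the stricter, drain-only behavior.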

Contributor Author:

@xzkostyan hello again!

I've applied @azat suggestion with a slight adjustment of moving method to client, and accepting possible Progress event in it (I found the possibility of Progress packet coming before ProfileEvents in test). Azat confirmed that it is normal and expected behavior from the server side.

There is only a simple insert test now. I have deleted the previous one, cause it still fails due to timeout inside github.

If you think, that some more direct test should be applied, maybe we can check that receive_end_of_query is not called and receive_end_of_insert_query has not get any ProfileEvents packet in it.

This breaks backward compatibility in some sens, so If you think this should be made other way, I would be happy to change code according to your vision.

'settings': {
'insert_block_size': 1,
'send_timeout': 1,
'receive_timeout': 1,
},
}

def test_long_insert(self):
"""
In this test we are trying to emulate the situation where we have a
lot of insert blocks. Starting from a specific ClickHouse version, the
server sends a ProfileEvents packet in response to each insert.

This insert should work normally for all ClickHouse versions,
even without a ProfileEvents response on each insert.
The 100_000 rows used to provide somewaht consistent experience of
Contributor:
Suggested change
The 100_000 rows used to provide somewaht consistent experience of
The 100_000 rows used to provide somewhat consistent experience of

bug reproducibility without too long a test duration.

`send_timeout` & `receive_timeout` are set to 1,
so we can emulate the real-world situation on synthetic data.
The server will raise an exception and time out if the client does not
receive the ProfileEvents packets during this time.
"""
with self.create_table('x Int32'):
data = [{'x': 1}] * 100_000
self.client.execute(
'INSERT INTO test (x) VALUES', data
)