Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Receive packet after each block insert, so profile events are not missed #392

Merged
merged 8 commits into from
Oct 19, 2023
11 changes: 11 additions & 0 deletions clickhouse_driver/client.py
Original file line number Diff line number Diff line change
Expand Up @@ -645,12 +645,23 @@ def send_data(self, sample_block, data, types_check=False, columnar=False):
else:
slicer = column_chunks if columnar else chunks

revision = self.connection.server_info.used_revision
should_wait_for_profile_event = (
revision >=
defines.DBMS_MIN_PROTOCOL_VERSION_WITH_PROFILE_EVENTS_IN_INSERT
)

for chunk in slicer(data, client_settings['insert_block_size']):
block = block_cls(sample_block.columns_with_types, chunk,
types_check=types_check)
self.connection.send_data(block)
inserted_rows += block.num_rows

# Starting from the specific revision there are profile events
# sent by server in response to each inserted block
if should_wait_for_profile_event:
self.receive_packet()
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think it is better to explicitly allow only ServerPacketTypes.PROFILE_EVENTS (to make it compatible with ClickHouse code base).

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I've used the existing function cause it receives the packet and also correctly handles the exception type. And as I get from sources, there is no extra processing function for ProfileEvent.
My goal was a minimal intervention, cause I am not familiar enough with overall packet processing in the library.

Copy link
Contributor Author

@insomnes insomnes Sep 22, 2023

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

As I get it you propose adding some method like this on connection class?

def receive_profile_event(self):
    packet_type = read_varint(self.fin)
    if packet_type == ServerPacketTypes.PROFILE_EVENTS:
        self.receive_data(may_be_compressed=False)
        return None
    elif packet_type == ServerPacketTypes.EXCEPTION:
        raise self.receive_exception()
    else:
        message = self.unexpected_packet_message('ProfileEvent or Exception', packet_type)
        self.disconnect()
        raise errors.UnexpectedPacketFromServerError(message)

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

As I get it you propose adding some method like this on connection class?

Exactly.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Done


# Empty block means end of data.
self.connection.send_data(block_cls())
return inserted_rows
Expand Down
1 change: 1 addition & 0 deletions clickhouse_driver/defines.py
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@
DBMS_MIN_PROTOCOL_VERSION_WITH_INCREMENTAL_PROFILE_EVENTS = 54451
DBMS_MIN_REVISION_WITH_PARALLEL_REPLICAS = 54453
DBMS_MIN_REVISION_WITH_CUSTOM_SERIALIZATION = 54454
DBMS_MIN_PROTOCOL_VERSION_WITH_PROFILE_EVENTS_IN_INSERT = 54456
DBMS_MIN_PROTOCOL_VERSION_WITH_ADDENDUM = 54458
DBMS_MIN_PROTOCOL_VERSION_WITH_QUOTA_KEY = 54458
DBMS_MIN_PROTOCOL_VERSION_WITH_PARAMETERS = 54459
Expand Down
19 changes: 19 additions & 0 deletions tests/test_long_insert.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,19 @@
from tests.testcase import BaseTestCase


class LongInsertTestCase(BaseTestCase):
client_kwargs = {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I would say that this settings deserves some comments:

  • insert_block_size=1 - to produce as much blocks as possible
  • send_timeout/receive_timeout - to trigger timeout in case of client does not consumes profile events packets

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@insomnes we have time limit of 10s for each test. Is 5s enough for reproducing? It seems that pytest-timer applies timeout to all test without any exceptions.

Should we patch connection.receive_profile_events or read_varint with following mock to ensure this test works?

        def side_receive_profile_events(*args, **kwargs):
            # do something, toggle flag, etc.
            return receive_profile_events(*args, **kwargs)

        with patch('socket.getaddrinfo') as side_receive_profile_events:
            receive_profile_events.side_effect = side_receive_profile_events
            self.client.execute('INSERT INTO ...')
           # assert something happened in side_receive_profile_events

See tests.test_connect.ConnectTestCase.test_alt_hosts for inspiration.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@xzkostyan thank you for the clarification. I would need a bit of time to wrap my head around this. This test depends on server behavior and I can't come up with an appropriate mock from the get-go.

With the unpatched function and this synthetic data, this will for sure error in the first 5 seconds as ServerException if we were inserting quickly enough or as our socket's ConnectionResetError.

With the patched one it would take a long for the proper ending cause we process 100 000 ProfileEvents packets on new server versions.
Maybe a mock that passes if we don't catch ServerException or ConnectionResetError in the first 7 seconds would be ok.

I will try to think about it a bit more and come back with a solution.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@xzkostyan sorry for the long pause.
I've tried to emulate the situation of bug with adding of small sleep before calling receive_end_of_query in Client.
And lowered the row count. This configuration still reproduces the bug on unpatched version, but runs normally and under 7-8 seconds in patched one.

What do you think?

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Just out of curiosity I looked at this change, and this hack looks nice, though it was not clear how it works at first glance (when the driver do not receive profile events it will call receive_end_of_query, which will read all ProfileEvents and also EndOfStream, but it will sleep each time to 2 seconds, so eventually the server will timed out, and it does not timed out for EndOfStream, because it will be written to socket buffer)

But I think that it should be fixed differently, instead receive_end_of_query should not handle ProfileEvents for INSERT (they still should be handled for SELECT though), so I think there should be separate receive_end_of_insert_query that will not handle them, and then it will be enough to send one block in this test, without any timeouts trickery.

And this will match with how ClickHouse handle this protocol internally for INSERT, so this is preferable I would say.

Copy link
Contributor

@azat azat Oct 12, 2023

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Something like this - https://gist.github.com/azat/da85ff0bde3f0da259144b0ba361cd64

BTW it also founds one problem in this patch - missing reading of ProfileEvents after empty block

P.S. @insomnes this patch is done on top of your latest changes

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@xzkostyan hello again!

I've applied @azat suggestion with a slight adjustment of moving method to client, and accepting possible Progress event in it (I found the possibility of Progress packet coming before ProfileEvents in test). Azat confirmed that it is normal and expected behavior from the server side.

There is only a simple insert test now. I have deleted the previous one, cause it still fails due to timeout inside github.

If you think, that some more direct test should be applied, maybe we can check that receive_end_of_query is not called and receive_end_of_insert_query has not get any ProfileEvents packet in it.

This breaks backward compatibility in some sens, so If you think this should be made other way, I would be happy to change code according to your vision.

'settings': {
'insert_block_size': 1,
'send_timeout': 1,
'receive_timeout': 1,
},
'send_receive_timeout': 2,
}

def test_long_insert(self):
data = [{'x': 1}] * 100_000
self.client.execute(
'insert into function null(\'x Int\') (x) values',
data
)
Loading