From bae2767707d615fbcc96f213e5ae1991298409c8 Mon Sep 17 00:00:00 2001 From: Borys Date: Wed, 11 Sep 2024 23:00:53 +0300 Subject: [PATCH] test: fix test_cluster_replication_migration (#3699) --- tests/dragonfly/cluster_test.py | 42 ++++++++++++++++++++------------- 1 file changed, 25 insertions(+), 17 deletions(-) diff --git a/tests/dragonfly/cluster_test.py b/tests/dragonfly/cluster_test.py index af38ca171491..d2b88461f655 100644 --- a/tests/dragonfly/cluster_test.py +++ b/tests/dragonfly/cluster_test.py @@ -1527,7 +1527,6 @@ async def test_cluster_config_reapply(df_factory: DflyInstanceFactory): await close_clients(*[node.client for node in nodes], *[node.admin_client for node in nodes]) -@pytest.mark.skip("broken") @dfly_args({"proactor_threads": 4, "cluster_mode": "yes"}) async def test_cluster_replication_migration( df_factory: DflyInstanceFactory, df_seeder_factory: DflySeederFactory @@ -1544,7 +1543,6 @@ async def test_cluster_replication_migration( df_factory.create(port=BASE_PORT + i, admin_port=BASE_PORT + 1000 + i) for i in range(4) ] df_factory.start_all(instances) - m1, r1, m2, r2 = instances nodes = [await create_node_info(n) for n in instances] m1_node, r1_node, m2_node, r2_node = nodes @@ -1556,27 +1554,26 @@ async def test_cluster_replication_migration( m2_node.slots = [(8001, 16383)] m2_node.replicas = [r2_node] - # generate some data with seederv1 - seeder = df_seeder_factory.create(keys=2000, port=m1.port, cluster_mode=True) + logging.debug("Push initial config") + await push_config( + json.dumps(generate_config(master_nodes)), [node.admin_client for node in nodes] + ) + + logging.debug("create data") + seeder = df_seeder_factory.create(keys=2000, port=m1_node.instance.port, cluster_mode=True) await seeder.run(target_deviation=0.1) - # start replication from replicas + logging.debug("start replication") await r1_node.admin_client.execute_command(f"replicaof localhost {m1_node.instance.port}") await r2_node.admin_client.execute_command(f"replicaof localhost {m2_node.instance.port}") await wait_available_async(r1_node.admin_client) await wait_available_async(r2_node.admin_client) - # push this config - await push_config( - json.dumps(generate_config(master_nodes)), [node.admin_client for node in nodes] - ) - - # Create caputres on the replicas with v2 seeder - r1_caputre = await SeederBase.capture(r1_node.admin_client) - r2_caputre = await SeederBase.capture(r2_node.admin_client) + r1_capture = await seeder.capture(r1_node.instance.port) + r2_capture = await seeder.capture(r2_node.instance.port) - # add migration and update config + logging.debug("start migration") m1_node.migrations = [ MigrationInfo("127.0.0.1", m2_node.instance.admin_port, [(0, 8000)], m2_node.id) ] @@ -1587,16 +1584,27 @@ async def test_cluster_replication_migration( json.dumps(generate_config(master_nodes)), [node.admin_client for node in nodes] ) - # wait for migration to finish await wait_for_status(m1_node.admin_client, m2_node.id, "FINISHED") await wait_for_status(m2_node.admin_client, m1_node.id, "FINISHED") + logging.debug("finish migration") + m1_node.migrations = [] + m1_node.slots = [(8001, 16383)] + m2_node.migrations = [] + m2_node.slots = [(0, 8000)] + + await push_config( + json.dumps(generate_config(master_nodes)), [node.admin_client for node in nodes] + ) + # wait for replicas to catch up await asyncio.sleep(2) # ensure captures got exchanged - assert (await SeederBase.capture(r1_node.admin_client)) == r2_caputre - assert (await SeederBase.capture(r2_node.admin_client)) == r1_caputre + assert await seeder.compare(r2_capture, r1_node.instance.port) + assert await seeder.compare(r1_capture, r2_node.instance.port) + + await close_clients(*[node.client for node in nodes], *[node.admin_client for node in nodes]) @dfly_args({"proactor_threads": 4, "cluster_mode": "yes"})