From d1464da947f04554e8680009539b80723c65cef7 Mon Sep 17 00:00:00 2001 From: Earle Lowe <30607889+emlowe@users.noreply.github.com> Date: Wed, 7 Aug 2024 11:54:45 -0700 Subject: [PATCH] CHIA-783: Stop auto-subscribing to local stores (#18166) * less auto-subscribing * Make sure to include owned stores in the managed data loop * Rearrange flaky test * trying out flaky and increase timeout * Test adjustments * Change to not actually subscribing to owned stores * Fix up typo * More test adjustments * return error when unsubscribing from not-subscribed store * Add flaky * flaky is optional dependency * Increase timeout testing * Ignore errors in getting owned stores during data loop * Undo changes to test code * Need to self-subscribe in unsub tests * Add tests for auto_subscribe_to_local_stores * Add negative test and remove flaky * Linting * Some test adjustments * Linting fixes * updates per PR comments --- chia/_tests/core/data_layer/test_data_rpc.py | 102 ++++++++++++++++++- chia/data_layer/data_layer.py | 42 ++++++-- 2 files changed, 136 insertions(+), 8 deletions(-) diff --git a/chia/_tests/core/data_layer/test_data_rpc.py b/chia/_tests/core/data_layer/test_data_rpc.py index 1dda28e3c7a5..979eb583c815 100644 --- a/chia/_tests/core/data_layer/test_data_rpc.py +++ b/chia/_tests/core/data_layer/test_data_rpc.py @@ -5,6 +5,7 @@ import copy import enum import json +import logging import os import random import sqlite3 @@ -14,7 +15,7 @@ from dataclasses import dataclass from enum import IntEnum from pathlib import Path -from typing import Any, AsyncIterator, Dict, List, Optional, Tuple, cast +from typing import Any, AsyncIterator, Dict, List, Optional, Set, Tuple, cast import anyio import pytest @@ -2239,6 +2240,16 @@ async def test_maximum_full_file_count( assert filename not in filenames +@pytest.mark.limit_consensus_modes(reason="does not depend on consensus rules") +@pytest.mark.anyio +async def test_unsubscribe_unknown( + bare_data_layer_api: DataLayerRpcApi, + seeded_random: random.Random, +) -> None: + with pytest.raises(RuntimeError, match="No subscription found for the given store_id."): + await bare_data_layer_api.unsubscribe(request={"id": bytes32.random(seeded_random).hex(), "retain": False}) + + @pytest.mark.parametrize("retain", [True, False]) @pytest.mark.limit_consensus_modes(reason="does not depend on consensus rules") @pytest.mark.anyio @@ -2266,6 +2277,8 @@ async def test_unsubscribe_removes_files( store_id = bytes32.from_hexstr(res["id"]) await farm_block_check_singleton(data_layer, full_node_api, ph, store_id, wallet=wallet_rpc_api.service) + # subscribe to ourselves + await data_rpc_api.subscribe(request={"id": store_id.hex()}) update_count = 10 for batch_count in range(update_count): key = batch_count.to_bytes(2, "big") @@ -3712,3 +3725,90 @@ class ModifiedStatus(IntEnum): await farm_block_with_spend(full_node_api, ph, update_tx_rec1, wallet_rpc_api) keys = await data_rpc_api.get_keys({"id": store_id.hex()}) assert keys == {"keys": ["0x30303031", "0x30303030"]} + + +@pytest.mark.limit_consensus_modes(reason="does not depend on consensus rules") +@boolean_datacases(name="auto_subscribe_to_local_stores", false="do not auto subscribe", true="auto subscribe") +@pytest.mark.anyio +async def test_auto_subscribe_to_local_stores( + self_hostname: str, + one_wallet_and_one_simulator_services: SimulatorsAndWalletsServices, + tmp_path: Path, + monkeypatch: Any, + auto_subscribe_to_local_stores: bool, +) -> None: + wallet_rpc_api, full_node_api, wallet_rpc_port, ph, bt = await init_wallet_and_node( + self_hostname, one_wallet_and_one_simulator_services + ) + manage_data_interval = 5 + fake_store = bytes32([1] * 32) + + async def mock_get_store_ids(self: Any) -> Set[bytes32]: + return {fake_store} + + async def mock_dl_track_new(self: Any, request: Dict[str, Any]) -> Dict[str, Any]: + # ignore and just return empty response + return {} + + with monkeypatch.context() as m: + m.setattr("chia.data_layer.data_store.DataStore.get_store_ids", mock_get_store_ids) + m.setattr("chia.rpc.wallet_rpc_client.WalletRpcClient.dl_track_new", mock_dl_track_new) + + config = bt.config + config["data_layer"]["auto_subscribe_to_local_stores"] = auto_subscribe_to_local_stores + bt.change_config(new_config=config) + + async with init_data_layer( + wallet_rpc_port=wallet_rpc_port, + bt=bt, + db_path=tmp_path, + manage_data_interval=manage_data_interval, + maximum_full_file_count=100, + ) as data_layer: + data_rpc_api = DataLayerRpcApi(data_layer) + + await asyncio.sleep(manage_data_interval) + + response = await data_rpc_api.subscriptions(request={}) + + if auto_subscribe_to_local_stores: + assert fake_store.hex() in response["store_ids"] + else: + assert fake_store.hex() not in response["store_ids"] + + +@pytest.mark.limit_consensus_modes(reason="does not depend on consensus rules") +@pytest.mark.anyio +async def test_local_store_exception( + self_hostname: str, + one_wallet_and_one_simulator_services: SimulatorsAndWalletsServices, + tmp_path: Path, + monkeypatch: Any, + caplog: pytest.LogCaptureFixture, +) -> None: + wallet_rpc_api, full_node_api, wallet_rpc_port, ph, bt = await init_wallet_and_node( + self_hostname, one_wallet_and_one_simulator_services + ) + manage_data_interval = 5 + fake_store = bytes32([1] * 32) + + async def mock_get_store_ids(self: Any) -> Set[bytes32]: + return {fake_store} + + with monkeypatch.context() as m, caplog.at_level(logging.INFO): + m.setattr("chia.data_layer.data_store.DataStore.get_store_ids", mock_get_store_ids) + + config = bt.config + config["data_layer"]["auto_subscribe_to_local_stores"] = True + bt.change_config(new_config=config) + + async with init_data_layer( + wallet_rpc_port=wallet_rpc_port, + bt=bt, + db_path=tmp_path, + manage_data_interval=manage_data_interval, + maximum_full_file_count=100, + ): + await asyncio.sleep(manage_data_interval) + + assert f"Can't subscribe to local store {fake_store.hex()}:" in caplog.text diff --git a/chia/data_layer/data_layer.py b/chia/data_layer/data_layer.py index 76e3c59386dd..b9d7b1a3c991 100644 --- a/chia/data_layer/data_layer.py +++ b/chia/data_layer/data_layer.py @@ -772,6 +772,10 @@ async def remove_subscriptions(self, store_id: bytes32, urls: List[str]) -> None async def unsubscribe(self, store_id: bytes32, retain_data: bool) -> None: async with self.subscription_lock: + subscriptions = await self.data_store.get_subscriptions() + if store_id not in (subscription.store_id for subscription in subscriptions): + raise RuntimeError("No subscription found for the given store_id.") + # Unsubscribe is processed later, after all fetching of data is done, to avoid races. self.unsubscribe_data_queue.append(UnsubscribeData(store_id, retain_data)) @@ -869,22 +873,46 @@ async def periodically_manage_data(self) -> None: await asyncio.sleep(0.1) while not self._shut_down: + # Add existing subscriptions async with self.subscription_lock: subscriptions = await self.data_store.get_subscriptions() - # Subscribe to all local store_ids that we can find on chain. - local_store_ids = await self.data_store.get_store_ids() + # pseudo-subscribe to all unsubscribed owned stores + # Need this to make sure we process updates and generate DAT files + try: + owned_stores = await self.get_owned_stores() + except ValueError: + # Sometimes the DL wallet isn't available, so we can't get the owned stores. + # We'll try again next time. + owned_stores = [] subscription_store_ids = {subscription.store_id for subscription in subscriptions} - for local_id in local_store_ids: - if local_id not in subscription_store_ids: + for record in owned_stores: + store_id = record.launcher_id + if store_id not in subscription_store_ids: try: - subscription = await self.subscribe(local_id, []) - subscriptions.insert(0, subscription) + # don't actually subscribe, just add to the list + subscriptions.insert(0, Subscription(store_id=store_id, servers_info=[])) except Exception as e: self.log.info( - f"Can't subscribe to locally stored {local_id}: {type(e)} {e} {traceback.format_exc()}" + f"Can't subscribe to owned store {store_id}: {type(e)} {e} {traceback.format_exc()}" ) + # Optionally + # Subscribe to all local non-owned store_ids that we can find on chain. + # This is the prior behavior where all local stores, both owned and not owned, are subscribed to. + if self.config.get("auto_subscribe_to_local_stores", False): + local_store_ids = await self.data_store.get_store_ids() + subscription_store_ids = {subscription.store_id for subscription in subscriptions} + for local_id in local_store_ids: + if local_id not in subscription_store_ids: + try: + subscription = await self.subscribe(local_id, []) + subscriptions.insert(0, subscription) + except Exception as e: + self.log.info( + f"Can't subscribe to local store {local_id}: {type(e)} {e} {traceback.format_exc()}" + ) + work_queue: asyncio.Queue[Job[Subscription]] = asyncio.Queue() async with QueuedAsyncPool.managed( name="DataLayer subscription update pool",