From 462e3b65327e4e6f45278d36c53c0b954e4ee9be Mon Sep 17 00:00:00 2001 From: root Date: Fri, 22 Dec 2023 16:53:42 +0000 Subject: [PATCH] Reorged logs support initial changes --- Cargo.lock | 341 ++++++++++++------------- bin/reth/src/args/rpc_server_args.rs | 6 +- crates/rpc/rpc-builder/src/auth.rs | 34 ++- crates/rpc/rpc-builder/src/eth.rs | 2 +- crates/rpc/rpc-builder/src/lib.rs | 16 +- crates/rpc/rpc-types/Cargo.toml | 4 + crates/rpc/rpc-types/src/eth/filter.rs | 31 ++- crates/rpc/rpc/src/eth/filter.rs | 119 ++++++++- crates/rpc/rpc/src/eth/pubsub.rs | 5 + 9 files changed, 348 insertions(+), 210 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index af3eb7ed77ba..77577157c3e7 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -140,9 +140,9 @@ checksum = "0942ffc6dcaadf03badf6e6a2d0228460359d5e34b57ccdc720b7382dfbd5ec5" [[package]] name = "alloy-dyn-abi" -version = "0.5.2" +version = "0.5.3" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "fafc3b20c6d069d9db47037f34acfb0e079c050fa5c1ff9e79855609b359b92b" +checksum = "74ab9cc043cd4b0a806f79e32624c148efecd9c9395e4a75000d51fdc9726be0" dependencies = [ "alloy-json-abi", "alloy-primitives", @@ -158,9 +158,9 @@ dependencies = [ [[package]] name = "alloy-json-abi" -version = "0.5.2" +version = "0.5.3" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "8d32061da2f184e5defab8e65a3057f88b7017cfe1ea9e2d6b413edb5ca76a54" +checksum = "cf8889c85658aae27e96515ff2c9200cb8d8c78baefad5aee088e49b47f5f6f3" dependencies = [ "alloy-primitives", "alloy-sol-type-parser", @@ -170,9 +170,9 @@ dependencies = [ [[package]] name = "alloy-primitives" -version = "0.5.2" +version = "0.5.3" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "08ca2c09d5911548a5cb620382ea0e1af99d3c898ce0efecbbd274a4676cf53e" +checksum = "6c0e5e60ff0e0c34c553822dabcfe0f5fece5a8c52f08a915be8c737de4b03fa" dependencies = [ "alloy-rlp", "arbitrary", @@ -195,32 +195,31 @@ dependencies = [ [[package]] name = "alloy-rlp" -version = "0.3.3" +version = "0.3.4" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "cc0fac0fc16baf1f63f78b47c3d24718f3619b0714076f6a02957d808d52cbef" +checksum = "8d58d9f5da7b40e9bfff0b7e7816700be4019db97d4b6359fe7f94a9e22e42ac" dependencies = [ "alloy-rlp-derive", "arrayvec", "bytes", - "smol_str", ] [[package]] name = "alloy-rlp-derive" -version = "0.3.3" +version = "0.3.4" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "c0391754c09fab4eae3404d19d0d297aa1c670c1775ab51d8a5312afeca23157" +checksum = "1a047897373be4bbb0224c1afdabca92648dc57a9c9ef6e7b0be3aff7a859c83" dependencies = [ "proc-macro2", "quote", - "syn 2.0.39", + "syn 2.0.42", ] [[package]] name = "alloy-sol-macro" -version = "0.5.2" +version = "0.5.3" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "e40cea54ac58080a1b88ea6556866eac1902b321186c77d53ad2b5ebbbf0e038" +checksum = "a8c9d43ca0a56b356f35775deecc8f660ac99e34cbf4a33462d4bd8addd9ab6f" dependencies = [ "const-hex", "dunce", @@ -229,25 +228,25 @@ dependencies = [ "proc-macro-error", "proc-macro2", "quote", - "syn 2.0.39", + "syn 2.0.42", "syn-solidity", "tiny-keccak", ] [[package]] name = "alloy-sol-type-parser" -version = "0.5.2" +version = "0.5.3" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "f23cb462613b2046da46dbf69ebaee458b7bfd3e9d7fe05adcce38a8d4b8a14f" +checksum = "41cdf1064e9b5160ae47b5190171a0655c8b4966b9657b04f48ff5d868684ade" dependencies = [ "winnow", ] [[package]] name = "alloy-sol-types" -version = "0.5.2" +version = "0.5.3" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "f81aa34725607be118c395d62c1d8d97c8a343dd1ada5370ed508ed5232eab6a" +checksum = "c169266a4b5ecf6f471947be10690f0aa295063774853b50540708b267a96e51" dependencies = [ "alloy-primitives", "alloy-sol-macro", @@ -326,15 +325,15 @@ dependencies = [ [[package]] name = "anyhow" -version = "1.0.75" +version = "1.0.76" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "a4668cab20f66d8d020e1fbc0ebe47217433c1b6c8f2040faf858554e394ace6" +checksum = "59d2a3357dde987206219e78ecfbbb6e8dad06cbb65292758d3270e6254f7355" [[package]] name = "aquamarine" -version = "0.3.2" +version = "0.3.3" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "df752953c49ce90719c7bf1fc587bc8227aed04732ea0c0f85e5397d7fdbd1a1" +checksum = "d1da02abba9f9063d786eab1509833ebb2fac0f966862ca59439c76b9c566760" dependencies = [ "include_dir", "itertools 0.10.5", @@ -558,18 +557,18 @@ checksum = "16e62a023e7c117e27523144c5d2459f4397fcc3cab0085af8e2224f643a0193" dependencies = [ "proc-macro2", "quote", - "syn 2.0.39", + "syn 2.0.42", ] [[package]] name = "async-trait" -version = "0.1.74" +version = "0.1.75" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "a66537f1bb974b254c98ed142ff995236e81b9d0fe4db0575f46612cb15eb0f9" +checksum = "fdf6721fb0140e4f897002dd086c06f6c27775df19cfe1fccb21181a48fd2c98" dependencies = [ "proc-macro2", "quote", - "syn 2.0.39", + "syn 2.0.42", ] [[package]] @@ -750,7 +749,7 @@ dependencies = [ "regex", "rustc-hash", "shlex", - "syn 2.0.39", + "syn 2.0.42", "which", ] @@ -771,7 +770,7 @@ dependencies = [ "regex", "rustc-hash", "shlex", - "syn 2.0.39", + "syn 2.0.42", ] [[package]] @@ -975,7 +974,7 @@ checksum = "005fa0c5bd20805466dda55eb34cd709bb31a2592bb26927b47714eeed6914d8" dependencies = [ "proc-macro2", "quote", - "syn 2.0.39", + "syn 2.0.42", "synstructure", ] @@ -1281,7 +1280,7 @@ dependencies = [ "heck", "proc-macro2", "quote", - "syn 2.0.39", + "syn 2.0.42", ] [[package]] @@ -1315,7 +1314,7 @@ dependencies = [ "proc-macro2", "quote", "serde", - "syn 2.0.39", + "syn 2.0.42", ] [[package]] @@ -1436,9 +1435,9 @@ dependencies = [ [[package]] name = "const-oid" -version = "0.9.5" +version = "0.9.6" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "28c122c3980598d243d63d9a704629a2d748d101f278052ff068be5a4423ab6f" +checksum = "c2459377285ad874054d797f3ccebf984978aa39129f6eafde5cdc8315b612f8" [[package]] name = "const-str" @@ -1565,9 +1564,9 @@ checksum = "7059fff8937831a9ae6f0fe4d658ffabf58f2ca96aa9dec1c889f936f705f216" [[package]] name = "crossbeam-channel" -version = "0.5.8" +version = "0.5.9" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "a33c2bf77f2df06183c3aa30d1e96c0695a313d4f9c453cc3762a6db39f99200" +checksum = "14c3242926edf34aec4ac3a77108ad4854bffaa2e4ddc1824124ce59231302d5" dependencies = [ "cfg-if", "crossbeam-utils", @@ -1575,9 +1574,9 @@ dependencies = [ [[package]] name = "crossbeam-deque" -version = "0.8.3" +version = "0.8.4" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "ce6fd6f855243022dcecf8702fef0c297d4338e226845fe067f6341ad9fa0cef" +checksum = "fca89a0e215bab21874660c67903c5f143333cab1da83d041c7ded6053774751" dependencies = [ "cfg-if", "crossbeam-epoch", @@ -1586,22 +1585,21 @@ dependencies = [ [[package]] name = "crossbeam-epoch" -version = "0.9.15" +version = "0.9.16" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "ae211234986c545741a7dc064309f67ee1e5ad243d0e48335adc0484d960bcc7" +checksum = "2d2fe95351b870527a5d09bf563ed3c97c0cffb87cf1c78a591bf48bb218d9aa" dependencies = [ "autocfg", "cfg-if", "crossbeam-utils", "memoffset", - "scopeguard", ] [[package]] name = "crossbeam-utils" -version = "0.8.16" +version = "0.8.17" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "5a22b2d63d4d1dc0b7f1b6b2747dd0088008a9be28b6ddf0b1e7d335e3037294" +checksum = "c06d96137f14f244c37f989d9fff8f95e6c18b918e71f36638f8c49112e4c78f" dependencies = [ "cfg-if", ] @@ -1741,7 +1739,7 @@ checksum = "f46882e17999c6cc590af592290432be3bce0428cb0d5f8b6715e4dc7b383eb3" dependencies = [ "proc-macro2", "quote", - "syn 2.0.39", + "syn 2.0.42", ] [[package]] @@ -1813,7 +1811,7 @@ dependencies = [ "proc-macro2", "quote", "strsim 0.10.0", - "syn 2.0.39", + "syn 2.0.42", ] [[package]] @@ -1846,7 +1844,7 @@ checksum = "836a9bbc7ad63342d6d6e7b815ccab164bc77a2d95d84bc3117a8c0d5c98e2d5" dependencies = [ "darling_core 0.20.3", "quote", - "syn 2.0.39", + "syn 2.0.42", ] [[package]] @@ -1932,7 +1930,7 @@ checksum = "67e77553c4162a157adbf834ebae5b415acbecbeafc7a74b0e886657506a7611" dependencies = [ "proc-macro2", "quote", - "syn 2.0.39", + "syn 2.0.42", ] [[package]] @@ -2106,7 +2104,7 @@ checksum = "487585f4d0c6655fe74905e2504d8ad6908e4db67f744eb140876906c2f3175d" dependencies = [ "proc-macro2", "quote", - "syn 2.0.39", + "syn 2.0.42", ] [[package]] @@ -2307,7 +2305,7 @@ dependencies = [ "heck", "proc-macro2", "quote", - "syn 2.0.39", + "syn 2.0.42", ] [[package]] @@ -2320,7 +2318,7 @@ dependencies = [ "num-traits", "proc-macro2", "quote", - "syn 2.0.39", + "syn 2.0.42", ] [[package]] @@ -2331,7 +2329,7 @@ checksum = "c2ad8cef1d801a4686bfd8919f0b30eac4c8e48968c437a6405ded4fb5272d2b" dependencies = [ "proc-macro2", "quote", - "syn 2.0.39", + "syn 2.0.42", ] [[package]] @@ -2479,7 +2477,7 @@ dependencies = [ "regex", "serde", "serde_json", - "syn 2.0.39", + "syn 2.0.42", "toml 0.8.2", "walkdir", ] @@ -2497,7 +2495,7 @@ dependencies = [ "proc-macro2", "quote", "serde_json", - "syn 2.0.39", + "syn 2.0.42", ] [[package]] @@ -2523,7 +2521,7 @@ dependencies = [ "serde", "serde_json", "strum", - "syn 2.0.39", + "syn 2.0.42", "tempfile", "thiserror", "tiny-keccak", @@ -2644,9 +2642,9 @@ dependencies = [ [[package]] name = "eyre" -version = "0.6.10" +version = "0.6.11" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "8bbb8258be8305fb0237d7b295f47bb24ff1b136a535f473baf40e70468515aa" +checksum = "b6267a1fa6f59179ea4afc8e50fd8612a3cc60bc858f786ff877a4a8cb042799" dependencies = [ "indenter", "once_cell", @@ -2861,7 +2859,7 @@ checksum = "53b153fd91e4b0147f4aced87be237c98248656bb01050b96bf3ee89220a8ddb" dependencies = [ "proc-macro2", "quote", - "syn 2.0.39", + "syn 2.0.42", ] [[package]] @@ -3162,9 +3160,9 @@ checksum = "6fe2267d4ed49bc07b63801559be28c718ea06c4738b7a03c94df7386d2cde46" [[package]] name = "hkdf" -version = "0.12.3" +version = "0.12.4" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "791a029f6b9fc27657f6f188ec6e5e43f6911f6f878e0dc5501396e09809d437" +checksum = "7b5f8eb2ad728638ea2c7d47a21db23b7b58a72ed6a38256b8a1849f15fbbdf7" dependencies = [ "hmac", ] @@ -3180,11 +3178,11 @@ dependencies = [ [[package]] name = "home" -version = "0.5.5" +version = "0.5.9" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "5444c27eef6923071f7ebcc33e3444508466a76f7a2b93da00ed6e19f30c1ddb" +checksum = "e3d1354bf6b7235cb4a0576c2619fd4ed18183f689b12b006a0ee7329eeff9a5" dependencies = [ - "windows-sys 0.48.0", + "windows-sys 0.52.0", ] [[package]] @@ -3282,9 +3280,9 @@ dependencies = [ [[package]] name = "hyper" -version = "0.14.27" +version = "0.14.28" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "ffb1cfd654a8219eaef89881fdb3bb3b1cdc5fa75ded05d6933b2b382e395468" +checksum = "bf96e135eb83a2a8ddf766e426a841d8ddd7449d5f00d34ea02b41d2f19eef80" dependencies = [ "bytes", "futures-channel", @@ -3297,7 +3295,7 @@ dependencies = [ "httpdate", "itoa", "pin-project-lite", - "socket2 0.4.10", + "socket2 0.5.5", "tokio", "tower-service", "tracing", @@ -3351,7 +3349,7 @@ dependencies = [ "iana-time-zone-haiku", "js-sys", "wasm-bindgen", - "windows-core", + "windows-core 0.51.1", ] [[package]] @@ -3724,6 +3722,15 @@ dependencies = [ "either", ] +[[package]] +name = "itertools" +version = "0.12.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "25db6b064527c5d482d0423354fcd07a89a2dfe07b67892e62411946db7f07b0" +dependencies = [ + "either", +] + [[package]] name = "itoa" version = "1.0.10" @@ -3994,9 +4001,9 @@ checksum = "830d08ce1d1d941e6b30645f1a0eb5643013d835ce3779a5fc208261dbe10f55" [[package]] name = "libc" -version = "0.2.150" +version = "0.2.151" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "89d92a4743f9a61002fae18374ed11e7973f530cb3a3255fb354818118b2203c" +checksum = "302d7ab3130588088d277783b1e2d2e10c9e9e4a16dd9050e6ec93fb3e7048f4" [[package]] name = "libloading" @@ -4114,9 +4121,9 @@ checksum = "3ea9b256699eda7b0387ffbc776dd625e28bde3918446381781245b7a50349d8" [[package]] name = "mach2" -version = "0.4.1" +version = "0.4.2" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "6d0d1830bcd151a6fc4aea1369af235b36c1528fe976b8ff678683c9995eade8" +checksum = "19b955cdeb2a02b9117f121ce63aa52d08ade45de53e48fe6a38b39c10f6f709" dependencies = [ "libc", ] @@ -4175,9 +4182,9 @@ dependencies = [ [[package]] name = "memmap2" -version = "0.9.0" +version = "0.9.3" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "deaba38d7abf1d4cca21cc89e932e542ba2b9258664d2a9ef0e61512039c9375" +checksum = "45fd3a57831bf88bc63f8cebc0cf956116276e97fef3966103e96416209f7c92" dependencies = [ "libc", ] @@ -4204,9 +4211,9 @@ dependencies = [ [[package]] name = "metrics-exporter-prometheus" -version = "0.12.1" +version = "0.12.2" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "8a4964177ddfdab1e3a2b37aec7cf320e14169abb0ed73999f558136409178d5" +checksum = "1d4fa7ce7c4862db464a37b0b31d89bca874562f034bd7993895572783d02950" dependencies = [ "base64 0.21.5", "hyper", @@ -4228,14 +4235,14 @@ checksum = "ddece26afd34c31585c74a4db0630c376df271c285d682d1e55012197830b6df" dependencies = [ "proc-macro2", "quote", - "syn 2.0.39", + "syn 2.0.42", ] [[package]] name = "metrics-process" -version = "1.0.13" +version = "1.0.14" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "2674a02f6ad51326c2106d9aa5a07d1f759695b655c06df0bba5d5fb338ac0a4" +checksum = "6aa2a67e2580fbeba4d5a96e659945981e700a383b4cea1432e0cfc18f58c5da" dependencies = [ "libproc", "mach2", @@ -4553,7 +4560,7 @@ dependencies = [ "proc-macro-crate 1.3.1", "proc-macro2", "quote", - "syn 2.0.39", + "syn 2.0.42", ] [[package]] @@ -4565,7 +4572,7 @@ dependencies = [ "proc-macro-crate 2.0.1", "proc-macro2", "quote", - "syn 2.0.39", + "syn 2.0.42", ] [[package]] @@ -4875,7 +4882,7 @@ dependencies = [ "phf_shared", "proc-macro2", "quote", - "syn 2.0.39", + "syn 2.0.42", ] [[package]] @@ -4904,7 +4911,7 @@ checksum = "4359fd9c9171ec6e8c62926d6faaf553a8dc3f64e1507e76da7911b4f6a04405" dependencies = [ "proc-macro2", "quote", - "syn 2.0.39", + "syn 2.0.42", ] [[package]] @@ -4931,9 +4938,9 @@ dependencies = [ [[package]] name = "pkg-config" -version = "0.3.27" +version = "0.3.28" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "26072860ba924cbfa98ea39c8c19b4dd6a4a25423dbdf219c1eca91aa0cf6964" +checksum = "69d3587f8a9e599cc7ec2c00e331f71c4e69a5f9a4b8a6efd5b07466b9736f9a" [[package]] name = "plain_hasher" @@ -5095,7 +5102,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "ae005bd773ab59b4725093fd7df83fd7892f7d8eafb48dbd7de6e024e4215f9d" dependencies = [ "proc-macro2", - "syn 2.0.39", + "syn 2.0.42", ] [[package]] @@ -5158,9 +5165,9 @@ dependencies = [ [[package]] name = "proc-macro2" -version = "1.0.70" +version = "1.0.71" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "39278fbbf5fb4f646ce651690877f89d1c5811a3d4acb27700c1cb3cdb78fd3b" +checksum = "75cb1540fadbd5b8fbccc4dddad2734eba435053f725621c070711a14bb5f4b8" dependencies = [ "unicode-ident", ] @@ -5490,9 +5497,9 @@ dependencies = [ [[package]] name = "reqwest" -version = "0.11.22" +version = "0.11.23" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "046cd98826c46c2ac8ddecae268eb5c2e58628688a5fc7a2643704a73faba95b" +checksum = "37b1ae8d9ac08420c66222fb9096fc5de435c3c48542bc5336c51892cffafb41" dependencies = [ "base64 0.21.5", "bytes", @@ -6051,7 +6058,7 @@ dependencies = [ "quote", "regex", "serial_test", - "syn 2.0.39", + "syn 2.0.42", "trybuild", ] @@ -6470,6 +6477,8 @@ dependencies = [ "serde_with", "similar-asserts", "thiserror", + "tokio", + "tokio-util", "url", ] @@ -7181,7 +7190,7 @@ checksum = "43576ca501357b9b071ac53cdc7da8ef0cbd9493d8df094cd821777ea6e894d3" dependencies = [ "proc-macro2", "quote", - "syn 2.0.39", + "syn 2.0.42", ] [[package]] @@ -7208,9 +7217,9 @@ dependencies = [ [[package]] name = "serde_spanned" -version = "0.6.4" +version = "0.6.5" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "12022b835073e5b11e90a14f86838ceb1c8fb0325b72416845c487ac0fa95e80" +checksum = "eb3622f419d1296904700073ea6cc23ad690adbd66f13ea683df73298736f0c1" dependencies = [ "serde", ] @@ -7253,7 +7262,7 @@ dependencies = [ "darling 0.20.3", "proc-macro2", "quote", - "syn 2.0.39", + "syn 2.0.42", ] [[package]] @@ -7278,7 +7287,7 @@ checksum = "91d129178576168c589c9ec973feedf7d3126c01ac2bf08795109aa35b69fb8f" dependencies = [ "proc-macro2", "quote", - "syn 2.0.39", + "syn 2.0.42", ] [[package]] @@ -7464,15 +7473,6 @@ dependencies = [ "serde", ] -[[package]] -name = "smol_str" -version = "0.2.0" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "74212e6bbe9a4352329b2f68ba3130c15a3f26fe88ff22dbdc6cdd58fa85e99c" -dependencies = [ - "serde", -] - [[package]] name = "snap" version = "1.1.1" @@ -7595,17 +7595,7 @@ dependencies = [ "proc-macro2", "quote", "rustversion", - "syn 2.0.39", -] - -[[package]] -name = "subprocess" -version = "0.2.9" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "0c2e86926081dda636c546d8c5e641661049d7562a68f5488be4a1f7f66f6086" -dependencies = [ - "libc", - "winapi", + "syn 2.0.42", ] [[package]] @@ -7653,7 +7643,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "1cccfffbc6bb3bb2d3a26cd2077f4d055f6808d266f9d4d158797a4c60510dfe" dependencies = [ "debugid", - "memmap2 0.9.0", + "memmap2 0.9.3", "stable_deref_trait", "uuid 1.6.1", ] @@ -7682,9 +7672,9 @@ dependencies = [ [[package]] name = "syn" -version = "2.0.39" +version = "2.0.42" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "23e78b90f2fcf45d3e842032ce32e3f2d1545ba6636271dcbf24fa306d87be7a" +checksum = "5b7d0a2c048d661a1a59fcd7355baa232f7ed34e0ee4df2eef3c1c1c0d3852d8" dependencies = [ "proc-macro2", "quote", @@ -7693,14 +7683,14 @@ dependencies = [ [[package]] name = "syn-solidity" -version = "0.5.2" +version = "0.5.3" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "e2c7ad08db24862d5b787a94714ff6b047935c3e3f60af944ac969404debd7ff" +checksum = "c4e95b65f5854377a31ebfa69d71b87c9d0d9922fddbfeb91a8eda4a0c5868eb" dependencies = [ "paste", "proc-macro2", "quote", - "syn 2.0.39", + "syn 2.0.42", ] [[package]] @@ -7711,7 +7701,7 @@ checksum = "285ba80e733fac80aa4270fbcdf83772a79b80aa35c97075320abfee4a915b06" dependencies = [ "proc-macro2", "quote", - "syn 2.0.39", + "syn 2.0.42", "unicode-xid", ] @@ -7772,9 +7762,9 @@ checksum = "3369f5ac52d5eb6ab48c6b4ffdc8efbcad6b89c765749064ba298f2c68a16a76" [[package]] name = "test-fuzz" -version = "4.0.4" +version = "4.0.5" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "de8cb3597f1463b9c98b21c08d11033166a57942e60e8044e7e3bb4a8ca5416b" +checksum = "470137c4c87413dd450bef3d516e6bf88a6dfc70e3f5e359dedbc7f9fc6992b3" dependencies = [ "serde", "test-fuzz-internal", @@ -7784,9 +7774,9 @@ dependencies = [ [[package]] name = "test-fuzz-internal" -version = "4.0.4" +version = "4.0.5" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "3dd8da182ee4e8b195da3aa38f72b84d267bda3874cd6ef8dd29c03a71f866f2" +checksum = "9dc636f7d25fd41828408b08931216b93a4d18454a72e2d55d78bc3b78dc167e" dependencies = [ "bincode", "cargo_metadata", @@ -7795,26 +7785,25 @@ dependencies = [ [[package]] name = "test-fuzz-macro" -version = "4.0.4" +version = "4.0.5" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "86cb030b9e51def5bd7bf98b3ee6e81aae7f021ebf2e05e70029b768508c376f" +checksum = "4da89cfe93508da3676114d5a15aa26e29aa88d5230e22bc5ea5f3c1f08ef5f6" dependencies = [ "darling 0.20.3", "if_chain", - "itertools 0.11.0", + "itertools 0.12.0", "once_cell", + "prettyplease", "proc-macro2", "quote", - "subprocess", - "syn 2.0.39", - "toolchain_find", + "syn 2.0.42", ] [[package]] name = "test-fuzz-runtime" -version = "4.0.4" +version = "4.0.5" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "dd6e7a964e6c5b20df8b03572f7fa43aa28d80fa4871b3083e597ed32664f614" +checksum = "09f701acb832f8c7911fa32863683cd9e02fe83976ed6bc9a9f56c2e4fbea936" dependencies = [ "hex", "num-traits", @@ -7831,22 +7820,22 @@ checksum = "a38c90d48152c236a3ab59271da4f4ae63d678c5d7ad6b7714d7cb9760be5e4b" [[package]] name = "thiserror" -version = "1.0.50" +version = "1.0.51" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "f9a7210f5c9a7156bb50aa36aed4c95afb51df0df00713949448cf9e97d382d2" +checksum = "f11c217e1416d6f036b870f14e0413d480dbf28edbee1f877abaf0206af43bb7" dependencies = [ "thiserror-impl", ] [[package]] name = "thiserror-impl" -version = "1.0.50" +version = "1.0.51" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "266b2e40bc00e5a6c09c3584011e08b06f123c00362c92b975ba9843aaaa14b8" +checksum = "01742297787513b79cf8e29d1056ede1313e2420b7b3b15d0a768b4921f549df" dependencies = [ "proc-macro2", "quote", - "syn 2.0.39", + "syn 2.0.42", ] [[package]] @@ -7870,9 +7859,9 @@ dependencies = [ [[package]] name = "time" -version = "0.3.30" +version = "0.3.31" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "c4a34ab300f2dee6e562c10a046fc05e358b29f9bf92277f30c3c8d82275f6f5" +checksum = "f657ba42c3f86e7680e53c8cd3af8abbe56b5491790b46e22e19c0d57463583e" dependencies = [ "deranged", "itoa", @@ -7892,9 +7881,9 @@ checksum = "ef927ca75afb808a4d64dd374f00a2adf8d0fcff8e7b184af886c3c87ec4a3f3" [[package]] name = "time-macros" -version = "0.2.15" +version = "0.2.16" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "4ad70d68dba9e1f8aceda7aa6711965dfec1cac869f311a51bd08b3a2ccbce20" +checksum = "26197e33420244aeb70c3e8c78376ca46571bc4e701e4791c2cd9f57dcb3a43f" dependencies = [ "time-core", ] @@ -7946,9 +7935,9 @@ checksum = "1f3ccbac311fea05f86f61904b462b55fb3df8837a366dfc601a0161d0532f20" [[package]] name = "tokio" -version = "1.35.0" +version = "1.35.1" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "841d45b238a16291a4e1584e61820b8ae57d696cc5015c459c229ccc6990cc1c" +checksum = "c89b4efa943be685f629b149f53829423f8f5531ea21249408e8e2f8671ec104" dependencies = [ "backtrace", "bytes", @@ -7971,7 +7960,7 @@ checksum = "5b8a1e28f2deaa14e508979454cb3a223b10b938b45af148bc0986de36f1923b" dependencies = [ "proc-macro2", "quote", - "syn 2.0.39", + "syn 2.0.42", ] [[package]] @@ -8078,19 +8067,6 @@ dependencies = [ "winnow", ] -[[package]] -name = "toolchain_find" -version = "0.4.0" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "ebc8c9a7f0a2966e1acdaf0461023d0b01471eeead645370cf4c3f5cff153f2a" -dependencies = [ - "home", - "once_cell", - "regex", - "semver 1.0.20", - "walkdir", -] - [[package]] name = "tower" version = "0.4.13" @@ -8198,7 +8174,7 @@ checksum = "34704c8d6ebcbc939824180af020566b01a7c01f80641264eba0999f6c2b6be7" dependencies = [ "proc-macro2", "quote", - "syn 2.0.39", + "syn 2.0.42", ] [[package]] @@ -8372,9 +8348,9 @@ checksum = "e421abadd41a4225275504ea4d6566923418b7f05506fbc9c0fe86ba7396114b" [[package]] name = "trybuild" -version = "1.0.85" +version = "1.0.86" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "196a58260a906cedb9bf6d8034b6379d0c11f552416960452f267402ceeddff1" +checksum = "8419ecd263363827c5730386f418715766f584e2f874d32c23c5b00bd9727e7e" dependencies = [ "basic-toml", "glob", @@ -8663,7 +8639,7 @@ dependencies = [ "once_cell", "proc-macro2", "quote", - "syn 2.0.39", + "syn 2.0.42", "wasm-bindgen-shared", ] @@ -8697,7 +8673,7 @@ checksum = "f0eb82fcb7930ae6219a7ecfd55b217f5f0893484b7a13022ebb2b2bf20b5283" dependencies = [ "proc-macro2", "quote", - "syn 2.0.39", + "syn 2.0.42", "wasm-bindgen-backend", "wasm-bindgen-shared", ] @@ -8794,12 +8770,12 @@ checksum = "712e227841d057c1ee1cd2fb22fa7e5a5461ae8e48fa2ca79ec42cfc1931183f" [[package]] name = "windows" -version = "0.51.1" +version = "0.52.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "ca229916c5ee38c2f2bc1e9d8f04df975b4bd93f9955dc69fabb5d91270045c9" +checksum = "e48a53791691ab099e5e2ad123536d0fff50652600abaf43bbf952894110d0be" dependencies = [ - "windows-core", - "windows-targets 0.48.5", + "windows-core 0.52.0", + "windows-targets 0.52.0", ] [[package]] @@ -8811,6 +8787,15 @@ dependencies = [ "windows-targets 0.48.5", ] +[[package]] +name = "windows-core" +version = "0.52.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "33ab640c8d7e35bf8ba19b884ba838ceb4fba93a4e8c65a9059d08afcfc683d9" +dependencies = [ + "windows-targets 0.52.0", +] + [[package]] name = "windows-sys" version = "0.45.0" @@ -9011,9 +8996,9 @@ checksum = "dff9641d1cd4be8d1a070daf9e3773c5f67e78b4d9d42263020c057706765c04" [[package]] name = "winnow" -version = "0.5.26" +version = "0.5.30" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "b67b5f0a4e7a27a64c651977932b9dc5667ca7fc31ac44b03ed37a0cf42fdfff" +checksum = "9b5c3db89721d50d0e2a673f5043fc4722f76dcc352d7b1ab8b8288bed4ed2c5" dependencies = [ "memchr", ] @@ -9118,28 +9103,28 @@ checksum = "9e6936f0cce458098a201c245a11bef556c6a0181129c7034d10d76d1ec3a2b8" dependencies = [ "proc-macro2", "quote", - "syn 2.0.39", + "syn 2.0.42", "synstructure", ] [[package]] name = "zerocopy" -version = "0.7.30" +version = "0.7.32" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "306dca4455518f1f31635ec308b6b3e4eb1b11758cefafc782827d0aa7acb5c7" +checksum = "74d4d3961e53fa4c9a25a8637fc2bfaf2595b3d3ae34875568a5cf64787716be" dependencies = [ "zerocopy-derive", ] [[package]] name = "zerocopy-derive" -version = "0.7.30" +version = "0.7.32" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "be912bf68235a88fbefd1b73415cb218405958d1655b2ece9035a19920bdf6ba" +checksum = "9ce1b18ccd8e73a9321186f97e46f9f04b778851177567b1975109d26a08d2a6" dependencies = [ "proc-macro2", "quote", - "syn 2.0.39", + "syn 2.0.42", ] [[package]] @@ -9159,7 +9144,7 @@ checksum = "e6a647510471d372f2e6c2e6b7219e44d8c574d24fdc11c610a61455782f18c3" dependencies = [ "proc-macro2", "quote", - "syn 2.0.39", + "syn 2.0.42", "synstructure", ] @@ -9180,7 +9165,7 @@ checksum = "ce36e65b0d2999d2aafac989fb249189a141aee1f53c612c1f37d72631959f69" dependencies = [ "proc-macro2", "quote", - "syn 2.0.39", + "syn 2.0.42", ] [[package]] @@ -9203,7 +9188,7 @@ checksum = "7a4a1638a1934450809c2266a70362bfc96cd90550c073f5b8a55014d1010157" dependencies = [ "proc-macro2", "quote", - "syn 2.0.39", + "syn 2.0.42", ] [[package]] diff --git a/bin/reth/src/args/rpc_server_args.rs b/bin/reth/src/args/rpc_server_args.rs index 54f0f040fda0..93a0a0cb5a57 100644 --- a/bin/reth/src/args/rpc_server_args.rs +++ b/bin/reth/src/args/rpc_server_args.rs @@ -283,7 +283,8 @@ impl RpcServerArgs { } /// Create Engine API server. - pub async fn start_auth_server( + #[allow(clippy::too_many_arguments)] + pub async fn start_auth_server( &self, provider: Provider, pool: Pool, @@ -291,6 +292,7 @@ impl RpcServerArgs { executor: Tasks, engine_api: EngineApi, jwt_secret: JwtSecret, + events: Events, ) -> Result where Provider: BlockReaderIdExt @@ -304,6 +306,7 @@ impl RpcServerArgs { Pool: TransactionPool + Clone + 'static, Network: NetworkInfo + Peers + Clone + 'static, Tasks: TaskSpawner + Clone + 'static, + Events: CanonStateSubscriptions + Clone + 'static, { let socket_address = SocketAddr::new(self.auth_addr, self.auth_port); @@ -315,6 +318,7 @@ impl RpcServerArgs { engine_api, socket_address, jwt_secret, + events, ) .await } diff --git a/crates/rpc/rpc-builder/src/auth.rs b/crates/rpc/rpc-builder/src/auth.rs index 7f1158e1cf5c..112143c5aade 100644 --- a/crates/rpc/rpc-builder/src/auth.rs +++ b/crates/rpc/rpc-builder/src/auth.rs @@ -12,15 +12,15 @@ use jsonrpsee::{ }; use reth_network_api::{NetworkInfo, Peers}; use reth_provider::{ - BlockReaderIdExt, ChainSpecProvider, EvmEnvProvider, HeaderProvider, ReceiptProviderIdExt, - StateProviderFactory, + BlockReaderIdExt, CanonStateSubscriptions, ChainSpecProvider, EvmEnvProvider, HeaderProvider, + ReceiptProviderIdExt, StateProviderFactory, }; use reth_rpc::{ eth::{ cache::EthStateCache, gas_oracle::GasPriceOracle, EthFilterConfig, FeeHistoryCache, FeeHistoryCacheConfig, }, - AuthLayer, BlockingTaskPool, Claims, EngineEthApi, EthApi, EthFilter, + AuthLayer, BlockingTaskPool, Claims, EngineEthApi, EthApi, EthFilter, EthPubSub, EthSubscriptionIdProvider, JwtAuthValidator, JwtSecret, }; use reth_rpc_api::{servers::*, EngineApiServer}; @@ -33,7 +33,7 @@ use std::{ /// Configure and launch a _standalone_ auth server with `engine` and a _new_ `eth` namespace. #[allow(clippy::too_many_arguments)] -pub async fn launch( +pub async fn launch( provider: Provider, pool: Pool, network: Network, @@ -41,6 +41,7 @@ pub async fn launch( engine_api: EngineApi, socket_addr: SocketAddr, secret: JwtSecret, + events: Events, ) -> Result where Provider: BlockReaderIdExt @@ -56,6 +57,7 @@ where Network: NetworkInfo + Peers + Clone + 'static, Tasks: TaskSpawner + Clone + 'static, EngineApi: EngineApiServer, + Events: CanonStateSubscriptions + Clone + 'static, { // spawn a new cache task let eth_cache = @@ -68,7 +70,7 @@ where let eth_api = EthApi::with_spawner( provider.clone(), pool.clone(), - network, + network.clone(), eth_cache.clone(), gas_oracle, EthConfig::default().rpc_gas_cap, @@ -79,15 +81,28 @@ where let config = EthFilterConfig::default() .max_logs_per_response(DEFAULT_MAX_LOGS_PER_RESPONSE) .max_blocks_per_filter(DEFAULT_MAX_BLOCKS_PER_FILTER); - let eth_filter = - EthFilter::new(provider, pool, eth_cache.clone(), config, Box::new(executor.clone())); + let eth_pubsub = EthPubSub::with_spawner( + provider.clone(), + pool.clone(), + events.clone(), + network.clone(), + Box::new(executor.clone()), + ); + let eth_filter = EthFilter::new( + provider.clone(), + pool.clone(), + eth_cache.clone(), + config, + Box::new(executor.clone()), + eth_pubsub.clone(), + ); launch_with_eth_api(eth_api, eth_filter, engine_api, socket_addr, secret).await } /// Configure and launch a _standalone_ auth server with existing EthApi implementation. -pub async fn launch_with_eth_api( +pub async fn launch_with_eth_api( eth_api: EthApi, - eth_filter: EthFilter, + eth_filter: EthFilter, engine_api: EngineApi, socket_addr: SocketAddr, secret: JwtSecret, @@ -104,6 +119,7 @@ where Pool: TransactionPool + Clone + 'static, Network: NetworkInfo + Peers + Clone + 'static, EngineApi: EngineApiServer, + Events: CanonStateSubscriptions + Clone + 'static, { // Configure the module and start the server. let mut module = RpcModule::new(()); diff --git a/crates/rpc/rpc-builder/src/eth.rs b/crates/rpc/rpc-builder/src/eth.rs index 8da3405368fd..1e5b5fb3c627 100644 --- a/crates/rpc/rpc-builder/src/eth.rs +++ b/crates/rpc/rpc-builder/src/eth.rs @@ -19,7 +19,7 @@ pub struct EthHandlers { /// The async caching layer used by the eth handlers pub cache: EthStateCache, /// Polling based filter handler available on all transports - pub filter: EthFilter, + pub filter: EthFilter, /// Handler for subscriptions only available for transports that support it (ws, ipc) pub pubsub: EthPubSub, /// The configured tracing call pool diff --git a/crates/rpc/rpc-builder/src/lib.rs b/crates/rpc/rpc-builder/src/lib.rs index b2c7ff3af966..33404ee44a35 100644 --- a/crates/rpc/rpc-builder/src/lib.rs +++ b/crates/rpc/rpc-builder/src/lib.rs @@ -1210,20 +1210,20 @@ where blocking_task_pool.clone(), fee_history_cache, ); - let filter = EthFilter::new( + let pubsub = EthPubSub::with_spawner( self.provider.clone(), self.pool.clone(), - cache.clone(), - self.config.eth.filter_config(), + self.events.clone(), + self.network.clone(), executor.clone(), ); - - let pubsub = EthPubSub::with_spawner( + let filter = EthFilter::new( self.provider.clone(), self.pool.clone(), - self.events.clone(), - self.network.clone(), - executor, + cache.clone(), + self.config.eth.filter_config(), + executor.clone(), + pubsub.clone(), ); let eth = EthHandlers { api, cache, filter, pubsub, blocking_task_pool }; diff --git a/crates/rpc/rpc-types/Cargo.toml b/crates/rpc/rpc-types/Cargo.toml index 7074686e39ec..cc654b7deabd 100644 --- a/crates/rpc/rpc-types/Cargo.toml +++ b/crates/rpc/rpc-types/Cargo.toml @@ -15,6 +15,10 @@ alloy-primitives = { workspace = true, features = ["rand", "rlp", "serde"] } ethereum_ssz_derive = { version = "0.5", optional = true } ethereum_ssz = { version = "0.5", optional = true } +#async +tokio = { workspace = true, features = ["sync"] } +tokio-util = "0.7" + # misc thiserror.workspace = true itertools.workspace = true diff --git a/crates/rpc/rpc-types/src/eth/filter.rs b/crates/rpc/rpc-types/src/eth/filter.rs index 74ead5cfc85b..5c18de1dd759 100644 --- a/crates/rpc/rpc-types/src/eth/filter.rs +++ b/crates/rpc/rpc-types/src/eth/filter.rs @@ -10,7 +10,9 @@ use std::{ collections::HashSet, hash::Hash, ops::{Range, RangeFrom, RangeTo}, + sync::Arc, }; +use tokio::sync::Mutex; /// Helper type to represent a bloom filter used for matching logs. #[derive(Default, Debug)] @@ -245,7 +247,7 @@ impl FilterBlockOption { } /// Filter for -#[derive(Default, Debug, PartialEq, Eq, Clone)] +#[derive(Default, Debug, Clone)] pub struct Filter { /// Filter block options, specifying on which blocks the filter should /// match. @@ -253,10 +255,21 @@ pub struct Filter { pub block_option: FilterBlockOption, /// Address pub address: FilterSet
, - /// Topics (maxmimum of 4) + /// Topics (maximum of 4) pub topics: [Topic; 4], + /// Reorged logs + pub reorged_logs: Arc>>>, } +impl PartialEq for Filter { + fn eq(&self, other: &Self) -> bool { + (&self.block_option, &self.address, &self.topics) == + (&other.block_option, &other.address, &other.topics) + } +} + +impl Eq for Filter {} + impl Filter { /// Creates a new, empty filter pub fn new() -> Self { @@ -619,7 +632,10 @@ impl<'de> Deserialize<'de> for Filter { FilterBlockOption::Range { from_block, to_block } }; - Ok(Filter { block_option, address, topics }) + let reorged_logs: Arc>>> = + Arc::new(Mutex::new(Some(Vec::new()))); + + Ok(Filter { block_option, address, topics, reorged_logs }) } } @@ -1108,6 +1124,7 @@ mod tests { Default::default(), Default::default(), ], + reorged_logs: Arc::new(Mutex::new(Some(Vec::new()))), } } @@ -1147,6 +1164,7 @@ mod tests { block_option: Default::default(), address: Default::default(), topics: Default::default(), + reorged_logs: Arc::new(Mutex::new(Some(Vec::new()))), }; let topics = filter.topics; @@ -1173,6 +1191,7 @@ mod tests { Default::default(), Default::default(), ], + reorged_logs: Arc::new(Mutex::new(Some(Vec::new()))), }; let topics = filter.topics; @@ -1204,6 +1223,7 @@ mod tests { Default::default(), Default::default(), ], + reorged_logs: Arc::new(Mutex::new(Some(Vec::new()))), }; let topics = filter.topics; @@ -1225,6 +1245,7 @@ mod tests { Default::default(), Default::default(), ], + reorged_logs: Arc::new(Mutex::new(Some(Vec::new()))), }; let topics_input = filter.topics; @@ -1242,6 +1263,7 @@ mod tests { block_option: Default::default(), address: rng_address.into(), topics: Default::default(), + reorged_logs: Arc::new(Mutex::new(Some(Vec::new()))), }; let address_bloom = FilteredParams::address_filter(&filter.address); assert!(FilteredParams::matches_address( @@ -1258,6 +1280,7 @@ mod tests { block_option: Default::default(), address: rng_address.into(), topics: Default::default(), + reorged_logs: Arc::new(Mutex::new(Some(Vec::new()))), }; let address_bloom = FilteredParams::address_filter(&filter.address); assert!(!FilteredParams::matches_address( @@ -1308,6 +1331,7 @@ mod tests { .into(), Default::default(), ], + reorged_logs: Arc::new(Mutex::new(Some(Vec::new()))) } ); } @@ -1333,6 +1357,7 @@ mod tests { }, address: Default::default(), topics: Default::default(), + reorged_logs: Arc::new(Mutex::new(Some(Vec::new()))) } ); } diff --git a/crates/rpc/rpc/src/eth/filter.rs b/crates/rpc/rpc/src/eth/filter.rs index 96cb687e6375..86dd93e4cec7 100644 --- a/crates/rpc/rpc/src/eth/filter.rs +++ b/crates/rpc/rpc/src/eth/filter.rs @@ -3,16 +3,23 @@ use crate::{ eth::{ error::{EthApiError, EthResult}, logs_utils, + pubsub::EthPubSub, }, result::{rpc_error_with_code, ToRpcResult}, EthSubscriptionIdProvider, }; use core::fmt; +use futures::{executor, StreamExt}; +use reth_network_api::NetworkInfo; +use tokio_stream::{wrappers::BroadcastStream, Stream}; +use alloy_primitives::B256; use async_trait::async_trait; use jsonrpsee::{core::RpcResult, server::IdProvider}; use reth_primitives::{BlockHashOrNumber, IntoRecoveredTransaction, Receipt, SealedBlock, TxHash}; -use reth_provider::{BlockIdReader, BlockReader, EvmEnvProvider, ProviderError}; +use reth_provider::{ + BlockIdReader, BlockReader, CanonStateSubscriptions, EvmEnvProvider, ProviderError, +}; use reth_rpc_api::EthFilterApiServer; use reth_rpc_types::{ Filter, FilterBlockOption, FilterChanges, FilterId, FilteredParams, Log, @@ -23,6 +30,7 @@ use reth_transaction_pool::{NewSubpoolTransactionStream, PoolTransaction, Transa use std::{ collections::HashMap, iter::StepBy, + marker::PhantomData, ops::RangeInclusive, sync::Arc, time::{Duration, Instant}, @@ -37,15 +45,19 @@ use tracing::trace; const MAX_HEADERS_RANGE: u64 = 1_000; // with ~530bytes per header this is ~500kb /// `Eth` filter RPC implementation. -pub struct EthFilter { +pub struct EthFilter { /// All nested fields bundled together inner: Arc>, + phantom_events: PhantomData, + phantom_network: PhantomData, } -impl EthFilter +impl EthFilter where - Provider: Send + Sync + 'static, + Provider: BlockReader + EvmEnvProvider + Send + Sync + 'static, Pool: Send + Sync + 'static, + Events: CanonStateSubscriptions + 'static, + Network: NetworkInfo + 'static, { /// Creates a new, shareable instance. /// @@ -61,6 +73,7 @@ where eth_cache: EthStateCache, config: EthFilterConfig, task_spawner: Box, + pubsub: EthPubSub, ) -> Self { let EthFilterConfig { max_blocks_per_filter, max_logs_per_response, stale_filter_ttl } = config; @@ -78,7 +91,11 @@ where max_logs_per_response: max_logs_per_response.unwrap_or(usize::MAX), }; - let eth_filter = Self { inner: Arc::new(inner) }; + let eth_filter = Self { + inner: Arc::new(inner), + phantom_events: PhantomData::, + phantom_network: PhantomData::, + }; let this = eth_filter.clone(); eth_filter.inner.task_spawner.clone().spawn_critical( @@ -88,6 +105,14 @@ where }), ); + let also_this = eth_filter.clone(); + eth_filter.inner.task_spawner.clone().spawn_critical( + "eth-filters_update_reorged_logs", + Box::pin(async move { + also_this.update_reorged_logs(pubsub).await; + }), + ); + eth_filter } @@ -120,9 +145,69 @@ where is_valid }) } + + async fn update_reorged_logs(&self, pubsub: EthPubSub) { + let mut stream = self.reorged_logs_stream(pubsub).await; + while let Some((id, logs)) = stream.next().await { + let mut filters = executor::block_on(self.active_filters().inner.lock()); + let active_filter = + filters.get_mut(&id).ok_or(FilterError::FilterNotFound(id)).unwrap(); + if let FilterKind::Log(ref mut filter) = active_filter.kind { + filter.reorged_logs = Arc::new(Mutex::new(Some(logs))); + } + } + } + + /// Reacts to reorged blocks, checks impacted log filters, stores reorged logs in the log filter + pub async fn reorged_logs_stream( + &self, + pubsub: EthPubSub, + ) -> impl Stream)> + '_ { + let mut temp_reorged_blocks = Vec::new(); + + BroadcastStream::new(pubsub.get_chain_events().subscribe_to_canonical_state()) + .map(move |canon_state| { + canon_state.expect("new block subscription never ends; qed").block_receipts() + }) + .flat_map(futures::stream::iter) + .flat_map(move |(block_receipts, removed)| { + let mut reorged_logs: Vec<(FilterId, Vec)> = Vec::new(); + + if removed { + temp_reorged_blocks.push(block_receipts); + + let filters = executor::block_on(self.active_filters().inner.lock()); + + filters.iter().for_each(|(id, active_filter)| { + if let FilterKind::Log(ref filter) = active_filter.kind { + let mut reverted_logs: Vec = Vec::new(); + let filtered_params = FilteredParams::new(Some(*filter.clone())); + + for reorged_block in &mut temp_reorged_blocks { + let mut matching_logs = logs_utils::matching_block_logs( + &filtered_params, + reorged_block.block, + reorged_block.tx_receipts.clone(), + true, + ); + + reverted_logs.append(&mut matching_logs); + if Some(reorged_block.block.hash) == active_filter.block_hash { + reorged_logs.push((id.clone(), reverted_logs.clone())); + } + } + } + }); + } else { + temp_reorged_blocks.clear(); + } + + futures::stream::iter(reorged_logs) + }) + } } -impl EthFilter +impl EthFilter where Provider: BlockReader + BlockIdReader + EvmEnvProvider + 'static, Pool: TransactionPool + 'static, @@ -221,10 +306,13 @@ where } #[async_trait] -impl EthFilterApiServer for EthFilter +impl EthFilterApiServer + for EthFilter where Provider: BlockReader + BlockIdReader + EvmEnvProvider + 'static, Pool: TransactionPool + 'static, + Events: CanonStateSubscriptions + 'static, + Network: NetworkInfo + 'static, { /// Handler for `eth_newFilter` async fn new_filter(&self, filter: Filter) -> RpcResult { @@ -303,15 +391,21 @@ where } } -impl std::fmt::Debug for EthFilter { +impl std::fmt::Debug + for EthFilter +{ fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { f.debug_struct("EthFilter").finish_non_exhaustive() } } -impl Clone for EthFilter { +impl Clone for EthFilter { fn clone(&self) -> Self { - Self { inner: Arc::clone(&self.inner) } + Self { + inner: Arc::clone(&self.inner), + phantom_events: PhantomData::, + phantom_network: PhantomData::, + } } } @@ -389,12 +483,15 @@ where /// Installs a new filter and returns the new identifier. async fn install_filter(&self, kind: FilterKind) -> RpcResult { let last_poll_block_number = self.provider.best_block_number().to_rpc_result()?; + let last_poll_block_hash = + self.provider.block_hash(last_poll_block_number).to_rpc_result()?; let id = FilterId::from(self.id_provider.next_id()); let mut filters = self.active_filters.inner.lock().await; filters.insert( id.clone(), ActiveFilter { block: last_poll_block_number, + block_hash: last_poll_block_hash, last_poll_timestamp: Instant::now(), kind, }, @@ -549,6 +646,8 @@ pub struct ActiveFilters { struct ActiveFilter { /// At which block the filter was polled last. block: u64, + /// Hash of the block at which the filter was polled last. + block_hash: Option, /// Last time this filter was polled. last_poll_timestamp: Instant, /// What kind of filter it is. diff --git a/crates/rpc/rpc/src/eth/pubsub.rs b/crates/rpc/rpc/src/eth/pubsub.rs index b720ebec1647..bda53d427910 100644 --- a/crates/rpc/rpc/src/eth/pubsub.rs +++ b/crates/rpc/rpc/src/eth/pubsub.rs @@ -60,6 +60,11 @@ impl EthPubSub let inner = EthPubSubInner { provider, pool, chain_events, network }; Self { inner: Arc::new(inner), subscription_task_spawner } } + + /// Method to get chain events for canonical state subscription + pub fn get_chain_events(&self) -> &Events { + &self.inner.chain_events + } } #[async_trait::async_trait]