diff --git a/Cargo.lock b/Cargo.lock index d3757dd..7d1072d 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -90,7 +90,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "e01ed3140b2f8d422c68afa1ed2e85d996ea619c988ac834d255db32138655cb" dependencies = [ "quote", - "syn 2.0.75", + "syn 2.0.77", ] [[package]] @@ -207,35 +207,29 @@ dependencies = [ "actix-router", "proc-macro2", "quote", - "syn 2.0.75", + "syn 2.0.77", ] [[package]] name = "actix_derive" -version = "0.6.1" +version = "0.6.2" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "7c7db3d5a9718568e4cf4a537cfd7070e6e6ff7481510d0237fb529ac850f6d3" +checksum = "b6ac1e58cded18cb28ddc17143c4dea5345b3ad575e14f32f66e4054a56eb271" dependencies = [ "proc-macro2", "quote", - "syn 2.0.75", + "syn 2.0.77", ] [[package]] name = "addr2line" -version = "0.22.0" +version = "0.24.1" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "6e4503c46a5c0c7844e948c9a4d6acd9f50cccb4de1c48eb9e291ea17470c678" +checksum = "f5fb1d8e4442bd405fdfd1dacb42792696b0cf9cb15882e5d097b742a676d375" dependencies = [ "gimli", ] -[[package]] -name = "adler" -version = "1.0.2" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "f26201604c87b1e01bd3d98f8d5d9a8fcbb815e8cedb41ffccbeb4bf593a35fe" - [[package]] name = "adler2" version = "2.0.0" @@ -466,9 +460,9 @@ dependencies = [ [[package]] name = "anyhow" -version = "1.0.86" +version = "1.0.89" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "b3d1d046238990b9cf5bcde22a3fb3584ee5cf65fb2765f454ed428c7a0063da" +checksum = "86fdf8605db99b54d3cd748a44c6d04df638eb5dafb219b135d0149bd0db01f6" [[package]] name = "arc-swap" @@ -478,9 +472,9 @@ checksum = "69f7f8c3906b62b754cd5326047894316021dcfe5a194c8ea52bdd94934a3457" [[package]] name = "arrayref" -version = "0.3.8" +version = "0.3.9" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "9d151e35f61089500b617991b791fc8bfd237ae50cd5950803758a179b41e67a" +checksum = "76a2e8124351fda1ef8aaaa3bbd7ebbcb486bbcd4225aca0aa0d84bb2db8fecb" [[package]] name = "arrayvec" @@ -518,7 +512,7 @@ checksum = "965c2d33e53cb6b267e148a4cb0760bc01f4904c1cd4bb4002a085bb016d1490" dependencies = [ "proc-macro2", "quote", - "syn 2.0.75", + "syn 2.0.77", "synstructure", ] @@ -530,7 +524,7 @@ checksum = "7b18050c2cd6fe86c3a76584ef5e0baf286d038cda203eb6223df2cc413565f7" dependencies = [ "proc-macro2", "quote", - "syn 2.0.75", + "syn 2.0.77", ] [[package]] @@ -568,13 +562,13 @@ dependencies = [ [[package]] name = "async-executor" -version = "1.13.0" +version = "1.13.1" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "d7ebdfa2ebdab6b1760375fa7d6f382b9f486eac35fc994625a00e89280bdbb7" +checksum = "30ca9a001c1e8ba5149f91a74362376cc6bc5b919d92d988668657bd570bdcec" dependencies = [ "async-task", "concurrent-queue", - "fastrand 2.1.0", + "fastrand 2.1.1", "futures-lite 2.3.0", "slab", ] @@ -639,7 +633,7 @@ dependencies = [ "futures-lite 2.3.0", "parking", "polling 3.7.3", - "rustix 0.38.34", + "rustix 0.38.37", "slab", "tracing", "windows-sys 0.59.0", @@ -700,20 +694,20 @@ dependencies = [ [[package]] name = "async-std" -version = "1.12.0" +version = "1.13.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "62565bb4402e926b29953c785397c6dc0391b7b446e45008b0049eb43cec6f5d" +checksum = "c634475f29802fde2b8f0b505b1bd00dfe4df7d4a000f0b36f7671197d5c3615" dependencies = [ "async-attributes", "async-channel 1.9.0", "async-global-executor", - "async-io 1.13.0", - "async-lock 2.8.0", + "async-io 2.3.4", + "async-lock 3.4.0", "crossbeam-utils", "futures-channel", "futures-core", "futures-io", - "futures-lite 1.13.0", + "futures-lite 2.3.0", "gloo-timers", "kv-log-macro", "log", @@ -744,7 +738,7 @@ checksum = "16e62a023e7c117e27523144c5d2459f4397fcc3cab0085af8e2224f643a0193" dependencies = [ "proc-macro2", "quote", - "syn 2.0.75", + "syn 2.0.77", ] [[package]] @@ -755,13 +749,13 @@ checksum = "8b75356056920673b02621b35afd0f7dda9306d03c79a30f5c56c44cf256e3de" [[package]] name = "async-trait" -version = "0.1.81" +version = "0.1.82" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "6e0c28dcc82d7c8ead5cb13beb15405b57b8546e93215673ff8ca0349a028107" +checksum = "a27b8a3a6e1a44fa4c8baf1f653e4172e81486d4941f2237e20dc2d0cf4ddff1" dependencies = [ "proc-macro2", "quote", - "syn 2.0.75", + "syn 2.0.77", ] [[package]] @@ -785,66 +779,19 @@ version = "1.3.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "0c4b4d0bd25bd0b74681c0ad21497610ce1b7c91b1022cd21c80c6fbdd9476b0" -[[package]] -name = "axum" -version = "0.7.5" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "3a6c9af12842a67734c9a2e355436e5d03b22383ed60cf13cd0c18fbfe3dcbcf" -dependencies = [ - "async-trait", - "axum-core", - "bytes", - "futures-util", - "http 1.1.0", - "http-body 1.0.1", - "http-body-util", - "itoa", - "matchit", - "memchr", - "mime", - "percent-encoding", - "pin-project-lite", - "rustversion", - "serde", - "sync_wrapper 1.0.1", - "tower 0.4.13", - "tower-layer", - "tower-service", -] - -[[package]] -name = "axum-core" -version = "0.4.3" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "a15c63fd72d41492dc4f497196f5da1fb04fb7529e631d73630d1b491e47a2e3" -dependencies = [ - "async-trait", - "bytes", - "futures-util", - "http 1.1.0", - "http-body 1.0.1", - "http-body-util", - "mime", - "pin-project-lite", - "rustversion", - "sync_wrapper 0.1.2", - "tower-layer", - "tower-service", -] - [[package]] name = "backtrace" -version = "0.3.73" +version = "0.3.74" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "5cc23269a4f8976d0a4d2e7109211a419fe30e8d88d677cd60b6bc79c5732e0a" +checksum = "8d82cb332cdfaed17ae235a638438ac4d4839913cc2af585c3c6746e8f8bee1a" dependencies = [ "addr2line", - "cc", "cfg-if 1.0.0", "libc", - "miniz_oxide 0.7.4", + "miniz_oxide", "object", "rustc-demangle", + "windows-targets 0.52.6", ] [[package]] @@ -945,7 +892,7 @@ checksum = "23285ad32269793932e830392f2fe2f83e26488fd3ec778883a93c8323735780" dependencies = [ "arrayref", "arrayvec 0.7.6", - "constant_time_eq 0.3.0", + "constant_time_eq 0.3.1", ] [[package]] @@ -1021,10 +968,10 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "c3ef8005764f53cd4dca619f5bf64cafd4664dada50ece25e4d81de54c80cc0b" dependencies = [ "once_cell", - "proc-macro-crate 3.1.0", + "proc-macro-crate 3.2.0", "proc-macro2", "quote", - "syn 2.0.75", + "syn 2.0.77", "syn_derive", ] @@ -1091,9 +1038,9 @@ checksum = "1fd0f2584146f6f2ef48085050886acf353beff7305ebd1ae69500e27c67f64b" [[package]] name = "bytes" -version = "1.7.1" +version = "1.7.2" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "8318a53db07bb3f8dca91a600466bdb3f2eaadeedfdbcf02e1accbad9271ba50" +checksum = "428d9aa8fbc0670b7b8d6030a7fadd0f86151cae55e4dbbece15f3780a3dfaf3" dependencies = [ "serde", ] @@ -1107,41 +1054,11 @@ dependencies = [ "bytes", ] -[[package]] -name = "capnp" -version = "0.19.6" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "de71387912cac7dd3cb7c219e09628411620a18061bba58c71453c26ae7bf66a" -dependencies = [ - "embedded-io", -] - -[[package]] -name = "capnp-futures" -version = "0.19.0" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "5fac483cb34e3bc0be251dba7ce318f465143dd18f948c7bd7ad035f6fecfb1b" -dependencies = [ - "capnp", - "futures", -] - -[[package]] -name = "capnp-rpc" -version = "0.19.4" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "04f09758ec79a2825c9bc87cbe1c8ded306c99a83b52cf89577667ebff2d3ff7" -dependencies = [ - "capnp", - "capnp-futures", - "futures", -] - [[package]] name = "cc" -version = "1.1.13" +version = "1.1.21" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "72db2f7947ecee9b03b510377e8bb9077afa27176fdbff55c51027e976fdcc48" +checksum = "07b1695e2c7e8fc85310cde85aeaab7e3097f593c91d209d3f9df76c928100f0" dependencies = [ "jobserver", "libc", @@ -1247,7 +1164,7 @@ dependencies = [ "heck 0.5.0", "proc-macro2", "quote", - "syn 2.0.75", + "syn 2.0.77", ] [[package]] @@ -1299,9 +1216,9 @@ checksum = "245097e9a4535ee1e3e3931fcfcd55a796a44c643e8596ff6566d68f09b87bbc" [[package]] name = "constant_time_eq" -version = "0.3.0" +version = "0.3.1" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "f7144d30dcf0fafbce74250a3963025d8d52177934239851c917d29f1df280c2" +checksum = "7c74b8349d32d297c9134b8c88677813a227df8f779daa29bfc29c183fe3dca6" [[package]] name = "convert_case" @@ -1361,9 +1278,9 @@ checksum = "773648b94d0e5d620f64f280777445740e61fe701025087ec8b57f45c791888b" [[package]] name = "cpufeatures" -version = "0.2.13" +version = "0.2.14" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "51e852e6dc9a5bed1fae92dd2375037bf2b768725bf3be87811edee3249d09ad" +checksum = "608697df725056feaccfa42cffdaeeec3fccc4ffc38358ecd19b243e716a78e0" dependencies = [ "libc", ] @@ -1454,7 +1371,7 @@ dependencies = [ "rand_chacha", "ring", "rust-argon2", - "secp256k1 0.29.0", + "secp256k1 0.29.1", "serde", "serde_json", "sha2 0.10.8", @@ -1611,7 +1528,7 @@ dependencies = [ "proc-macro2", "quote", "rustc_version", - "syn 2.0.75", + "syn 2.0.77", ] [[package]] @@ -1649,7 +1566,7 @@ checksum = "97369cbbc041bc366949bc74d34658d6cda5621039731c6310521892a3a20ae0" dependencies = [ "proc-macro2", "quote", - "syn 2.0.75", + "syn 2.0.77", ] [[package]] @@ -1679,7 +1596,7 @@ dependencies = [ "enum-ordinalize", "proc-macro2", "quote", - "syn 2.0.75", + "syn 2.0.77", ] [[package]] @@ -1691,12 +1608,6 @@ dependencies = [ "serde", ] -[[package]] -name = "embedded-io" -version = "0.6.1" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "edd0f118536f44f5ccd48bcb8b111bdc3de888b58c74639dfb034a357d0f206d" - [[package]] name = "encoding_rs" version = "0.8.34" @@ -1723,7 +1634,7 @@ checksum = "0d28318a75d4aead5c4db25382e8ef717932d0346600cacae6357eb5941bc5ff" dependencies = [ "proc-macro2", "quote", - "syn 2.0.75", + "syn 2.0.77", ] [[package]] @@ -1743,7 +1654,7 @@ checksum = "de0d48a183585823424a4ce1aa132d174a6a81bd540895822eb4c8373a8e49e8" dependencies = [ "proc-macro2", "quote", - "syn 2.0.75", + "syn 2.0.77", ] [[package]] @@ -1920,9 +1831,9 @@ dependencies = [ [[package]] name = "fastrand" -version = "2.1.0" +version = "2.1.1" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "9fc0510504f03c51ada170672ac806f1f105a88aa97a5281117e1ddc3368e51a" +checksum = "e8c02a5121d4ea3eb16a80748c74f5549a5665e4c21333c6098f283870fbdea6" [[package]] name = "fixed-hash" @@ -1936,20 +1847,14 @@ dependencies = [ "static_assertions", ] -[[package]] -name = "fixedbitset" -version = "0.4.2" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "0ce7134b9999ecaf8bcd65542e436736ef32ddca1b3e06094cb6ec5755203b80" - [[package]] name = "flate2" -version = "1.0.32" +version = "1.0.33" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "9c0596c1eac1f9e04ed902702e9878208b336edc9d6fddc8a48387349bab3666" +checksum = "324a1be68054ef05ad64b861cc9eaf1d623d2d8cb25b4bf2cb9cdd902b4bf253" dependencies = [ "crc32fast", - "miniz_oxide 0.8.0", + "miniz_oxide", ] [[package]] @@ -2079,7 +1984,7 @@ version = "2.3.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "52527eb5074e35e9339c6b4e8d12600c7128b68fb25dcb9fa9dec18f7c25f3a5" dependencies = [ - "fastrand 2.1.0", + "fastrand 2.1.1", "futures-core", "futures-io", "parking", @@ -2094,7 +1999,7 @@ checksum = "87750cf4b7a4c0625b1529e4c543c2182106e4dedc60a2a6455e00d212c489ac" dependencies = [ "proc-macro2", "quote", - "syn 2.0.75", + "syn 2.0.77", ] [[package]] @@ -2168,9 +2073,9 @@ dependencies = [ [[package]] name = "gimli" -version = "0.29.0" +version = "0.31.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "40ecd4077b5ae9fd2e9e169b102c6c330d0605168eb0e8bf79952b256dbefffd" +checksum = "32085ea23f3234fc7846555e85283ba4de91e21016dc0455a16286d87a292d64" [[package]] name = "glob" @@ -2180,9 +2085,9 @@ checksum = "d2fabcfbdc87f4758337ca535fb41a6d701b65693ce38287d856d1674551ec9b" [[package]] name = "gloo-timers" -version = "0.2.6" +version = "0.3.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "9b995a66bb87bebce9a0f4a95aed01daca4872c050bfcb21653361c03bc35e5c" +checksum = "bbb143cf96099802033e0d4f4963b19fd2e0b728bcf076cd9cf7f6634f092994" dependencies = [ "futures-channel", "futures-core", @@ -2202,7 +2107,7 @@ dependencies = [ "futures-sink", "futures-util", "http 0.2.12", - "indexmap 2.4.0", + "indexmap", "slab", "tokio", "tokio-util", @@ -2221,7 +2126,7 @@ dependencies = [ "futures-core", "futures-sink", "http 1.1.0", - "indexmap 2.4.0", + "indexmap", "slab", "tokio", "tokio-util", @@ -2382,8 +2287,6 @@ dependencies = [ "actix-web", "base58", "bytes", - "capnp", - "capnp-rpc", "chacha20", "chrono", "clap", @@ -2395,7 +2298,7 @@ dependencies = [ "futures", "futures-util", "hex", - "indexmap 2.4.0", + "indexmap", "is_type", "log", "log4rs", @@ -2403,8 +2306,8 @@ dependencies = [ "once_cell", "percentage", "pretty_env_logger", - "prost 0.13.1", - "prost-types 0.13.1", + "prost", + "prost-types", "rand", "rand_chacha", "rayon", @@ -2422,8 +2325,6 @@ dependencies = [ "thiserror", "tokio", "tokio-stream", - "tonic", - "tonic-build", "tracing", "tracing-subscriber", "uuid", @@ -2551,16 +2452,16 @@ dependencies = [ [[package]] name = "hyper-rustls" -version = "0.27.2" +version = "0.27.3" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "5ee4be2c948921a1a5320b629c4193916ed787a7f7f293fd3f7f5a6c9de74155" +checksum = "08afdbb5c31130e3034af566421053ab03787c640246a446327f550d11bcb333" dependencies = [ "futures-util", "http 1.1.0", "hyper 1.4.1", "hyper-util", "log", - "rustls 0.23.12", + "rustls 0.23.13", "rustls-native-certs", "rustls-pki-types", "tokio", @@ -2568,19 +2469,6 @@ dependencies = [ "tower-service", ] -[[package]] -name = "hyper-timeout" -version = "0.5.1" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "3203a961e5c83b6f5498933e78b6b263e208c197b63e9c6c53cc82ffd3f63793" -dependencies = [ - "hyper 1.4.1", - "hyper-util", - "pin-project-lite", - "tokio", - "tower-service", -] - [[package]] name = "hyper-tls" version = "0.5.0" @@ -2612,9 +2500,9 @@ dependencies = [ [[package]] name = "hyper-util" -version = "0.1.7" +version = "0.1.8" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "cde7055719c54e36e95e8719f95883f22072a48ede39db7fc17a4e1d5281e9b9" +checksum = "da62f120a8a37763efb0cf8fdf264b884c7b8b9ac8660b900c8661030c00e6ba" dependencies = [ "bytes", "futures-channel", @@ -2632,9 +2520,9 @@ dependencies = [ [[package]] name = "iana-time-zone" -version = "0.1.60" +version = "0.1.61" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "e7ffbb5a1b541ea2561f8c41c087286cc091e21e556a4f09a8f6cbf17b69b141" +checksum = "235e081f3925a06703c2d0117ea8b91f042756fd6e7a6e5d901e8ca1a996b220" dependencies = [ "android_system_properties", "core-foundation-sys", @@ -2725,19 +2613,9 @@ checksum = "ce23b50ad8242c51a442f3ff322d56b02f08852c77e4c0b4d3fd684abc89c683" [[package]] name = "indexmap" -version = "1.9.3" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "bd070e393353796e801d209ad339e89596eb4c8d430d18ede6a1cced8fafbd99" -dependencies = [ - "autocfg", - "hashbrown 0.12.3", -] - -[[package]] -name = "indexmap" -version = "2.4.0" +version = "2.5.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "93ead53efc7ea8ed3cfb0c79fc8023fbb782a5432b52830b6518941cebe6505c" +checksum = "68b900aa2f7301e21c36462b170ee99994de34dff39a4a6a528e80e7376d07e5" dependencies = [ "equivalent", "hashbrown 0.14.5", @@ -2752,7 +2630,7 @@ checksum = "0122b7114117e64a63ac49f752a5ca4624d534c7b1c7de796ac196381cd2d947" dependencies = [ "proc-macro2", "quote", - "syn 2.0.75", + "syn 2.0.77", ] [[package]] @@ -2792,9 +2670,9 @@ dependencies = [ [[package]] name = "ipnet" -version = "2.9.0" +version = "2.10.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "8f518f335dce6725a761382244631d86cf0ccb2863413590b31338feb467f9c3" +checksum = "187674a687eed5fe42285b40c6291f9a01517d415fad1c3cbc6a9f778af7fcd4" [[package]] name = "is-terminal" @@ -2819,15 +2697,6 @@ version = "0.2.1" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "fee7cda167bfafe6eb1c70926de545028f3169c9a8bcfa6db9d53f67aab2f3ec" -[[package]] -name = "itertools" -version = "0.12.1" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "ba291022dbbd398a455acf126c1e341954079855bc60dfdda641363bd6922569" -dependencies = [ - "either", -] - [[package]] name = "itertools" version = "0.13.0" @@ -3083,12 +2952,6 @@ dependencies = [ "regex-automata 0.1.10", ] -[[package]] -name = "matchit" -version = "0.7.3" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "0e7465ac9959cc2b1404e8e2367b43684a6d13790fe23056cc8c6c5a6b7bcb94" - [[package]] name = "md-5" version = "0.10.6" @@ -3145,15 +3008,6 @@ version = "0.2.1" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "68354c5c6bd36d73ff3feceb05efa59b6acb7626617f4962be322a825e61f79a" -[[package]] -name = "miniz_oxide" -version = "0.7.4" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "b8a240ddb74feaf34a79a7add65a741f3167852fba007066dcac1ca548d89c08" -dependencies = [ - "adler", -] - [[package]] name = "miniz_oxide" version = "0.8.0" @@ -3418,18 +3272,18 @@ dependencies = [ [[package]] name = "object" -version = "0.36.3" +version = "0.36.4" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "27b64972346851a39438c60b341ebc01bba47464ae329e55cf343eb93964efd9" +checksum = "084f1a5821ac4c651660a94a7153d27ac9d8a53736203f58b31945ded098070a" dependencies = [ "memchr", ] [[package]] name = "oid-registry" -version = "0.7.0" +version = "0.7.1" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "1c958dd45046245b9c3c2547369bb634eb461670b2e7e0de552905801a648d1d" +checksum = "a8d8034d9489cdaf79228eb9f6a3b8d7bb32ba00d6645ebd48eef4077ceb5bd9" dependencies = [ "asn1-rs", ] @@ -3469,7 +3323,7 @@ checksum = "a948666b637a0f465e8564c73e89d4dde00d72d4d473cc972f390fc3dcee7d9c" dependencies = [ "proc-macro2", "quote", - "syn 2.0.75", + "syn 2.0.77", ] [[package]] @@ -3561,7 +3415,7 @@ dependencies = [ "proc-macro-error", "proc-macro2", "quote", - "syn 2.0.75", + "syn 2.0.77", ] [[package]] @@ -3590,7 +3444,7 @@ version = "3.6.12" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "d830939c76d294956402033aee57a6da7b438f2294eb94864c37b0569053a42c" dependencies = [ - "proc-macro-crate 3.1.0", + "proc-macro-crate 3.2.0", "proc-macro2", "quote", "syn 1.0.109", @@ -3598,9 +3452,9 @@ dependencies = [ [[package]] name = "parking" -version = "2.2.0" +version = "2.2.1" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "bb813b8af86854136c6922af0598d719255ecb2179515e6e7730d468f05c9cae" +checksum = "f38d5652c16fde515bb1ecef450ab0f6a219d619a7274976324d5e377f7dceba" [[package]] name = "parking_lot" @@ -3620,7 +3474,7 @@ checksum = "1e401f977ab385c9e4e3ab30627d6f26d00e2c73eef317493c4ec6d468726cf8" dependencies = [ "cfg-if 1.0.0", "libc", - "redox_syscall 0.5.3", + "redox_syscall", "smallvec", "windows-targets 0.52.6", ] @@ -3680,16 +3534,6 @@ dependencies = [ "num", ] -[[package]] -name = "petgraph" -version = "0.6.5" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "b4c5cc86750666a3ed20bdaf5ca2a0344f9c67674cae0515bec2da16fbaa47db" -dependencies = [ - "fixedbitset", - "indexmap 2.4.0", -] - [[package]] name = "pin-project" version = "1.1.5" @@ -3707,7 +3551,7 @@ checksum = "2f38a4412a78282e09a2cf38d195ea5420d15ba0602cb375210efbc877243965" dependencies = [ "proc-macro2", "quote", - "syn 2.0.75", + "syn 2.0.77", ] [[package]] @@ -3741,7 +3585,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "96c8c490f422ef9a4efd2cb5b42b76c8613d7e7dfc1caf667b8a3350a5acc066" dependencies = [ "atomic-waker", - "fastrand 2.1.0", + "fastrand 2.1.1", "futures-io", ] @@ -3798,7 +3642,7 @@ dependencies = [ "concurrent-queue", "hermit-abi 0.4.0", "pin-project-lite", - "rustix 0.38.34", + "rustix 0.38.37", "tracing", "windows-sys 0.59.0", ] @@ -3851,16 +3695,6 @@ dependencies = [ "log", ] -[[package]] -name = "prettyplease" -version = "0.2.20" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "5f12335488a2f3b0a83b14edad48dca9879ce89b2edd10e80237e4e852dd645e" -dependencies = [ - "proc-macro2", - "syn 2.0.75", -] - [[package]] name = "primitive-types" version = "0.12.2" @@ -3886,11 +3720,11 @@ dependencies = [ [[package]] name = "proc-macro-crate" -version = "3.1.0" +version = "3.2.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "6d37c51ca738a55da99dc0c4a34860fd675453b8b36209178c2249bb13651284" +checksum = "8ecf48c7ca261d60b74ab1a7b20da18bede46776b2e55535cb958eb595c5fa7b" dependencies = [ - "toml_edit 0.21.1", + "toml_edit 0.22.21", ] [[package]] @@ -3934,94 +3768,41 @@ checksum = "af066a9c399a26e020ada66a034357a868728e72cd426f3adcd35f80d88d88c8" dependencies = [ "proc-macro2", "quote", - "syn 2.0.75", + "syn 2.0.77", "version_check", "yansi", ] [[package]] name = "prost" -version = "0.12.6" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "deb1435c188b76130da55f17a466d252ff7b1418b2ad3e037d127b94e3411f29" -dependencies = [ - "bytes", - "prost-derive 0.12.6", -] - -[[package]] -name = "prost" -version = "0.13.1" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "e13db3d3fde688c61e2446b4d843bc27a7e8af269a69440c0308021dc92333cc" -dependencies = [ - "bytes", - "prost-derive 0.13.1", -] - -[[package]] -name = "prost-build" -version = "0.12.6" +version = "0.13.2" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "22505a5c94da8e3b7c2996394d1c933236c4d743e81a410bcca4e6989fc066a4" +checksum = "3b2ecbe40f08db5c006b5764a2645f7f3f141ce756412ac9e1dd6087e6d32995" dependencies = [ "bytes", - "heck 0.5.0", - "itertools 0.12.1", - "log", - "multimap", - "once_cell", - "petgraph", - "prettyplease", - "prost 0.12.6", - "prost-types 0.12.6", - "regex", - "syn 2.0.75", - "tempfile", -] - -[[package]] -name = "prost-derive" -version = "0.12.6" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "81bddcdb20abf9501610992b6759a4c888aef7d1a7247ef75e2404275ac24af1" -dependencies = [ - "anyhow", - "itertools 0.12.1", - "proc-macro2", - "quote", - "syn 2.0.75", + "prost-derive", ] [[package]] name = "prost-derive" -version = "0.13.1" +version = "0.13.2" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "18bec9b0adc4eba778b33684b7ba3e7137789434769ee3ce3930463ef904cfca" +checksum = "acf0c195eebb4af52c752bec4f52f645da98b6e92077a04110c7f349477ae5ac" dependencies = [ "anyhow", - "itertools 0.13.0", + "itertools", "proc-macro2", "quote", - "syn 2.0.75", + "syn 2.0.77", ] [[package]] name = "prost-types" -version = "0.12.6" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "9091c90b0a32608e984ff2fa4091273cbdd755d54935c51d520887f4a1dbd5b0" -dependencies = [ - "prost 0.12.6", -] - -[[package]] -name = "prost-types" -version = "0.13.1" +version = "0.13.2" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "cee5168b05f49d4b0ca581206eb14a7b22fafd963efe729ac48eb03266e25cc2" +checksum = "60caa6738c7369b940c3d49246a8d1749323674c65cb13010134f5c9bad5b519" dependencies = [ - "prost 0.13.1", + "prost", ] [[package]] @@ -4061,9 +3842,9 @@ dependencies = [ [[package]] name = "quinn" -version = "0.11.3" +version = "0.11.5" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "b22d8e7369034b9a7132bc2008cac12f2013c8132b45e0554e6e20e2617f2156" +checksum = "8c7c5fdde3cdae7203427dc4f0a68fe0ed09833edc525a03456b153b79828684" dependencies = [ "bytes", "futures-io", @@ -4071,7 +3852,7 @@ dependencies = [ "quinn-proto", "quinn-udp", "rustc-hash 2.0.0", - "rustls 0.23.12", + "rustls 0.23.13", "socket2 0.5.7", "thiserror", "tokio", @@ -4080,15 +3861,15 @@ dependencies = [ [[package]] name = "quinn-proto" -version = "0.11.6" +version = "0.11.8" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "ba92fb39ec7ad06ca2582c0ca834dfeadcaf06ddfc8e635c80aa7e1c05315fdd" +checksum = "fadfaed2cd7f389d0161bb73eeb07b7b78f8691047a6f3e73caaeae55310a4a6" dependencies = [ "bytes", "rand", "ring", "rustc-hash 2.0.0", - "rustls 0.23.12", + "rustls 0.23.13", "slab", "thiserror", "tinyvec", @@ -4097,22 +3878,22 @@ dependencies = [ [[package]] name = "quinn-udp" -version = "0.5.4" +version = "0.5.5" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "8bffec3605b73c6f1754535084a85229fa8a30f86014e6c81aeec4abb68b0285" +checksum = "4fe68c2e9e1a1234e218683dbdf9f9dfcb094113c5ac2b938dfcb9bab4c4140b" dependencies = [ "libc", "once_cell", "socket2 0.5.7", "tracing", - "windows-sys 0.52.0", + "windows-sys 0.59.0", ] [[package]] name = "quote" -version = "1.0.36" +version = "1.0.37" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "0fa76aaf39101c457836aec0ce2316dbdc3ab723cdda1c6bd4e6ad4208acaca7" +checksum = "b5b9d34b8991d19d98081b46eacdd8eb58c6f2b201139f7c5f643cc155a633af" dependencies = [ "proc-macro2", ] @@ -4298,18 +4079,9 @@ dependencies = [ [[package]] name = "redox_syscall" -version = "0.4.1" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "4722d768eff46b75989dd134e5c353f0d6296e5aaa3132e776cbdb56be7731aa" -dependencies = [ - "bitflags 1.3.2", -] - -[[package]] -name = "redox_syscall" -version = "0.5.3" +version = "0.5.4" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "2a908a6e00f1fdd0dfd9c0eb08ce85126f6d8bbda50017e74bc4a4b7d4a926a4" +checksum = "0884ad60e090bf1345b93da0a5de8923c93884cd03f40dfcfddd3b4bee661853" dependencies = [ "bitflags 2.6.0", ] @@ -4447,7 +4219,7 @@ dependencies = [ "serde_json", "serde_urlencoded", "sync_wrapper 1.0.1", - "system-configuration 0.6.0", + "system-configuration 0.6.1", "tokio", "tokio-native-tls", "tokio-util", @@ -4555,7 +4327,7 @@ checksum = "9d9848531d60c9cbbcf9d166c885316c24bc0e2a9d3eba0956bb6cbbd79bc6e8" dependencies = [ "base64 0.21.7", "blake2b_simd", - "constant_time_eq 0.3.0", + "constant_time_eq 0.3.1", ] [[package]] @@ -4578,7 +4350,7 @@ dependencies = [ "proc-macro2", "quote", "rust-embed-utils", - "syn 2.0.75", + "syn 2.0.77", "walkdir", ] @@ -4634,9 +4406,9 @@ checksum = "3e75f6a532d0fd9f7f13144f392b6ad56a32696bfcd9c78f797f16bbb6f072d6" [[package]] name = "rustc_version" -version = "0.4.0" +version = "0.4.1" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "bfa0f585226d2e68097d4f95d113b15b83a82e819ab25717ec0590d9584ef366" +checksum = "cfcb3a22ef46e85b45de6ee7e79d063319ebb6594faafcf1c225ea92ab6e9b92" dependencies = [ "semver", ] @@ -4666,9 +4438,9 @@ dependencies = [ [[package]] name = "rustix" -version = "0.38.34" +version = "0.38.37" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "70dc5ec042f7a43c4a73241207cecc9873a06d45debb38b329f8541d85c2730f" +checksum = "8acb788b847c24f28525660c4d7758620a7210875711f79e7f663cc152726811" dependencies = [ "bitflags 2.6.0", "errno", @@ -4690,24 +4462,24 @@ dependencies = [ [[package]] name = "rustls" -version = "0.23.12" +version = "0.23.13" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "c58f8c84392efc0a126acce10fa59ff7b3d2ac06ab451a33f2741989b806b044" +checksum = "f2dabaac7466917e566adb06783a81ca48944c6898a1b08b9374106dd671f4c8" dependencies = [ "log", "once_cell", "ring", "rustls-pki-types", - "rustls-webpki 0.102.6", + "rustls-webpki 0.102.8", "subtle", "zeroize", ] [[package]] name = "rustls-native-certs" -version = "0.7.2" +version = "0.8.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "04182dffc9091a404e0fc069ea5cd60e5b866c3adf881eff99a32d048242dffa" +checksum = "fcaf18a4f2be7326cd874a5fa579fae794320a0f388d365dca7e480e55f83f8a" dependencies = [ "openssl-probe", "rustls-pemfile 2.1.3", @@ -4753,21 +4525,15 @@ dependencies = [ [[package]] name = "rustls-webpki" -version = "0.102.6" +version = "0.102.8" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "8e6b52d4fda176fd835fdc55a835d4a89b8499cad995885a21149d5ad62f852e" +checksum = "64ca1bc8749bd4cf37b5ce386cc146580777b4e8572c7b97baf22c83f444bee9" dependencies = [ "ring", "rustls-pki-types", "untrusted", ] -[[package]] -name = "rustversion" -version = "1.0.17" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "955d28af4278de8121b7ebeb796b6a45735dc01436d898801014aced2773a3d6" - [[package]] name = "ryu" version = "1.0.18" @@ -4818,7 +4584,7 @@ dependencies = [ "bytes", "flate2", "futures-util", - "indexmap 2.4.0", + "indexmap", "salvo_core", "tokio", "tokio-util", @@ -4878,7 +4644,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "12d7e8ed235fa6cbf34d3d39afb3070a1b8f6e5a0127ff70c78d77ed731cfbd2" dependencies = [ "bytes", - "fastrand 2.1.0", + "fastrand 2.1.1", "futures-util", "http 1.1.0", "pin-project-lite", @@ -4920,7 +4686,7 @@ dependencies = [ "chrono", "futures-util", "http 1.1.0", - "indexmap 2.4.0", + "indexmap", "inventory", "mime-infer", "parking_lot", @@ -4948,13 +4714,13 @@ version = "0.71.1" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "3b3e39d4fdceefc3d9876a529e8b1824c200d89d60fc0d6b100451861757235e" dependencies = [ - "proc-macro-crate 3.1.0", + "proc-macro-crate 3.2.0", "proc-macro2", "proc-macro2-diagnostics", "quote", "regex", "salvo-serde-util", - "syn 2.0.75", + "syn 2.0.77", ] [[package]] @@ -4975,7 +4741,7 @@ version = "0.71.1" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "ca52a800cdbe4081dccc3e692b24edb643f8aa2e13b905f3367ce1506a543a08" dependencies = [ - "fastrand 2.1.0", + "fastrand 2.1.1", "futures-util", "hyper 1.4.1", "hyper-rustls", @@ -5009,7 +4775,7 @@ checksum = "020a02e832a763c1c8bc4626027e7ba512fe5e68d9539e36d7f87553d972167e" dependencies = [ "proc-macro2", "quote", - "syn 2.0.75", + "syn 2.0.77", ] [[package]] @@ -5069,7 +4835,7 @@ dependencies = [ "hyper 1.4.1", "hyper-rustls", "hyper-util", - "indexmap 2.4.0", + "indexmap", "mime", "mime-infer", "multer", @@ -5124,7 +4890,7 @@ dependencies = [ "serde_json", "tokio", "tokio-tungstenite", - "tower 0.5.0", + "tower 0.5.1", "tracing", "ulid", ] @@ -5135,12 +4901,12 @@ version = "0.71.1" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "acaac9df10ca8df373381a6a46faa9ae7cadd53f07307c7c208920ba02dc4a78" dependencies = [ - "proc-macro-crate 3.1.0", + "proc-macro-crate 3.2.0", "proc-macro2", "quote", "regex", "salvo-serde-util", - "syn 2.0.75", + "syn 2.0.77", ] [[package]] @@ -5164,11 +4930,11 @@ dependencies = [ [[package]] name = "schannel" -version = "0.1.23" +version = "0.1.24" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "fbc91545643bcf3a0bbb6569265615222618bdf33ce4ffbbd13c4bbd4c093534" +checksum = "e9aaafd5a2b6e3d657ff009d82fbd630b6bd54dd4eb06f21693925cdf80f9b8b" dependencies = [ - "windows-sys 0.52.0", + "windows-sys 0.59.0", ] [[package]] @@ -5197,7 +4963,7 @@ dependencies = [ "proc-macro-error", "proc-macro2", "quote", - "syn 2.0.75", + "syn 2.0.77", ] [[package]] @@ -5277,7 +5043,7 @@ dependencies = [ "proc-macro2", "quote", "sea-bae", - "syn 2.0.75", + "syn 2.0.77", "unicode-ident", ] @@ -5291,7 +5057,7 @@ dependencies = [ "proc-macro2", "quote", "sea-bae", - "syn 2.0.75", + "syn 2.0.77", "unicode-ident", ] @@ -5376,7 +5142,7 @@ dependencies = [ "heck 0.4.1", "proc-macro2", "quote", - "syn 2.0.75", + "syn 2.0.77", "thiserror", ] @@ -5420,12 +5186,12 @@ dependencies = [ [[package]] name = "secp256k1" -version = "0.29.0" +version = "0.29.1" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "0e0cc0f1cf93f4969faf3ea1c7d8a9faed25918d96affa959720823dfe86d4f3" +checksum = "9465315bc9d4566e1724f0fffcbcc446268cb522e60f9a27bcded6b19c108113" dependencies = [ "rand", - "secp256k1-sys 0.10.0", + "secp256k1-sys 0.10.1", ] [[package]] @@ -5439,9 +5205,9 @@ dependencies = [ [[package]] name = "secp256k1-sys" -version = "0.10.0" +version = "0.10.1" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "1433bd67156263443f14d603720b082dd3121779323fce20cba2aa07b874bc1b" +checksum = "d4387882333d3aa8cb20530a17c69a3752e97837832f34f6dccc760e715001d9" dependencies = [ "cc", ] @@ -5477,9 +5243,9 @@ checksum = "61697e0a1c7e512e84a621326239844a24d8207b4669b41bc18b32ea5cbf988b" [[package]] name = "serde" -version = "1.0.208" +version = "1.0.210" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "cff085d2cb684faa248efb494c39b68e522822ac0de72ccf08109abde717cfb2" +checksum = "c8e3592472072e6e22e0a54d5904d9febf8508f65fb8552499a1abc7d1078c3a" dependencies = [ "serde_derive", ] @@ -5508,22 +5274,22 @@ dependencies = [ [[package]] name = "serde_derive" -version = "1.0.208" +version = "1.0.210" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "24008e81ff7613ed8e5ba0cfaf24e2c2f1e5b8a0495711e44fcd4882fca62bcf" +checksum = "243902eda00fad750862fc144cea25caca5e20d615af0a81bee94ca738f1df1f" dependencies = [ "proc-macro2", "quote", - "syn 2.0.75", + "syn 2.0.77", ] [[package]] name = "serde_json" -version = "1.0.125" +version = "1.0.128" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "83c8e735a073ccf5be70aa8066aa984eaf2fa000db6c8d0100ae605b366d31ed" +checksum = "6ff5456707a1de34e7e37f2a6fd3d3f808c318259cbd01ab6377795054b483d8" dependencies = [ - "indexmap 2.4.0", + "indexmap", "itoa", "memchr", "ryu", @@ -5548,7 +5314,7 @@ version = "0.9.34+deprecated" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "6a8b1a1a2ebf674015cc02edccce75287f1a0130d394307b36743c2f5d504b47" dependencies = [ - "indexmap 2.4.0", + "indexmap", "itoa", "ryu", "serde", @@ -5742,9 +5508,9 @@ dependencies = [ [[package]] name = "sqlformat" -version = "0.2.4" +version = "0.2.6" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "f895e3734318cc55f1fe66258926c9b910c124d47520339efecbb6c59cec7c1f" +checksum = "7bba3a93db0cc4f7bdece8bb09e77e2e785c20bfebf79eb8340ed80708048790" dependencies = [ "nom", "unicode_categories", @@ -5786,7 +5552,7 @@ dependencies = [ "futures-util", "hashlink", "hex", - "indexmap 2.4.0", + "indexmap", "log", "memchr", "once_cell", @@ -6026,9 +5792,9 @@ dependencies = [ [[package]] name = "syn" -version = "2.0.75" +version = "2.0.77" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "f6af063034fc1935ede7be0122941bafa9bacb949334d090b77ca98b5817c7d9" +checksum = "9f35bcdf61fd8e7be6caf75f429fdca8beb3ed76584befb503b1569faee373ed" dependencies = [ "proc-macro2", "quote", @@ -6044,7 +5810,7 @@ dependencies = [ "proc-macro-error", "proc-macro2", "quote", - "syn 2.0.75", + "syn 2.0.77", ] [[package]] @@ -6070,7 +5836,7 @@ checksum = "c8af7666ab7b6390ab78131fb5b0fce11d6b7a6951602017c35fa82800708971" dependencies = [ "proc-macro2", "quote", - "syn 2.0.75", + "syn 2.0.77", ] [[package]] @@ -6086,9 +5852,9 @@ dependencies = [ [[package]] name = "system-configuration" -version = "0.6.0" +version = "0.6.1" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "658bc6ee10a9b4fcf576e9b0819d95ec16f4d2c02d39fd83ac1c8789785c4a42" +checksum = "3c879d448e9d986b661742763247d3693ed13609438cf3d006f51f5368a5ba6b" dependencies = [ "bitflags 2.6.0", "core-foundation", @@ -6143,9 +5909,9 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "04cbcdd0c794ebb0d4cf35e88edd2f7d2c4c3e9a5a6dab322839b321c6a87a64" dependencies = [ "cfg-if 1.0.0", - "fastrand 2.1.0", + "fastrand 2.1.1", "once_cell", - "rustix 0.38.34", + "rustix 0.38.37", "windows-sys 0.59.0", ] @@ -6185,7 +5951,7 @@ checksum = "a4558b58466b9ad7ca0f102865eccc95938dca1a74a856f2b57b6629050da261" dependencies = [ "proc-macro2", "quote", - "syn 2.0.75", + "syn 2.0.77", ] [[package]] @@ -6284,9 +6050,9 @@ checksum = "1f3ccbac311fea05f86f61904b462b55fb3df8837a366dfc601a0161d0532f20" [[package]] name = "tokio" -version = "1.39.3" +version = "1.40.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "9babc99b9923bfa4804bd74722ff02c0381021eafa4db9949217e3be8e84fff5" +checksum = "e2b070231665d27ad9ec9b8df639893f46727666c6767db40317fbe920a5d998" dependencies = [ "backtrace", "bytes", @@ -6319,7 +6085,7 @@ checksum = "693d596312e88961bc67d7f1f97af8a70227d9f90c31bba5806eec004978d752" dependencies = [ "proc-macro2", "quote", - "syn 2.0.75", + "syn 2.0.77", ] [[package]] @@ -6334,11 +6100,10 @@ dependencies = [ [[package]] name = "tokio-openssl" -version = "0.6.4" +version = "0.6.5" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "6ffab79df67727f6acf57f1ff743091873c24c579b1e2ce4d8f53e47ded4d63d" +checksum = "59df6849caa43bb7567f9a36f863c447d95a11d5903c9cc334ba32576a27eadd" dependencies = [ - "futures-util", "openssl", "openssl-sys", "tokio", @@ -6350,16 +6115,16 @@ version = "0.26.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "0c7bc40d0e5a97695bb96e27995cd3a08538541b0a846f65bba7a359f36700d4" dependencies = [ - "rustls 0.23.12", + "rustls 0.23.13", "rustls-pki-types", "tokio", ] [[package]] name = "tokio-stream" -version = "0.1.15" +version = "0.1.16" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "267ac89e0bec6e691e5813911606935d77c476ff49024f98abcea3e7b15e37af" +checksum = "4f4e6ce100d0eb49a2734f8c0812bcd324cf357d21810932c5df6b96ef2b86f1" dependencies = [ "futures-core", "pin-project-lite", @@ -6380,9 +6145,9 @@ dependencies = [ [[package]] name = "tokio-util" -version = "0.7.11" +version = "0.7.12" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "9cf6b47b3771c49ac75ad09a6162f53ad4b8088b76ac60e8ec1455b31a189fe1" +checksum = "61e7c3654c13bcd040d4a03abee2c75b1d14a37b423cf5a813ceae1cc903ec6a" dependencies = [ "bytes", "futures-core", @@ -6404,63 +6169,20 @@ version = "0.19.15" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "1b5bb770da30e5cbfde35a2d7b9b8a2c4b8ef89548a7a6aeab5c9a576e3e7421" dependencies = [ - "indexmap 2.4.0", + "indexmap", "toml_datetime", - "winnow", + "winnow 0.5.40", ] [[package]] name = "toml_edit" -version = "0.21.1" +version = "0.22.21" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "6a8534fd7f78b5405e860340ad6575217ce99f38d4d5c8f2442cb5ecb50090e1" +checksum = "3b072cee73c449a636ffd6f32bd8de3a9f7119139aff882f44943ce2986dc5cf" dependencies = [ - "indexmap 2.4.0", + "indexmap", "toml_datetime", - "winnow", -] - -[[package]] -name = "tonic" -version = "0.12.1" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "38659f4a91aba8598d27821589f5db7dddd94601e7a01b1e485a50e5484c7401" -dependencies = [ - "async-stream", - "async-trait", - "axum", - "base64 0.22.1", - "bytes", - "h2 0.4.6", - "http 1.1.0", - "http-body 1.0.1", - "http-body-util", - "hyper 1.4.1", - "hyper-timeout", - "hyper-util", - "percent-encoding", - "pin-project", - "prost 0.13.1", - "socket2 0.5.7", - "tokio", - "tokio-stream", - "tower 0.4.13", - "tower-layer", - "tower-service", - "tracing", -] - -[[package]] -name = "tonic-build" -version = "0.10.2" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "9d021fc044c18582b9a2408cd0dd05b1596e3ecdb5c4df822bb0183545683889" -dependencies = [ - "prettyplease", - "proc-macro2", - "prost-build", - "quote", - "syn 2.0.75", + "winnow 0.6.18", ] [[package]] @@ -6471,23 +6193,18 @@ checksum = "b8fa9be0de6cf49e536ce1851f987bd21a43b771b09473c3549a6c853db37c1c" dependencies = [ "futures-core", "futures-util", - "indexmap 1.9.3", "pin-project", "pin-project-lite", - "rand", - "slab", "tokio", - "tokio-util", "tower-layer", "tower-service", - "tracing", ] [[package]] name = "tower" -version = "0.5.0" +version = "0.5.1" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "36b837f86b25d7c0d7988f00a54e74739be6477f2aac6201b8f429a7569991b7" +checksum = "2873938d487c3cfb9aed7546dc9f2711d867c9f90c46b889989a2cb84eba6b4f" dependencies = [ "futures-core", "futures-util", @@ -6532,7 +6249,7 @@ checksum = "34704c8d6ebcbc939824180af020566b01a7c01f80641264eba0999f6c2b6be7" dependencies = [ "proc-macro2", "quote", - "syn 2.0.75", + "syn 2.0.77", ] [[package]] @@ -6655,15 +6372,15 @@ checksum = "08f95100a766bf4f8f28f90d77e0a5461bbdb219042e7679bebe79004fed8d75" [[package]] name = "unicode-ident" -version = "1.0.12" +version = "1.0.13" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "3354b9ac3fae1ff6755cb6db53683adb661634f67557942dea4facebec0fee4b" +checksum = "e91b56cd4cadaeb79bbf1a5645f6b4f8dc5bde8834ad5894a8db35fda9efa1fe" [[package]] name = "unicode-normalization" -version = "0.1.23" +version = "0.1.24" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "a56d1686db2308d901306f92a263857ef59ea39678a5458e7cb17f01415101f5" +checksum = "5033c97c4262335cded6d6fc3e5c18ab755e1a3dc96376350f3d8e9f009ad956" dependencies = [ "tinyvec", ] @@ -6676,9 +6393,9 @@ checksum = "52ea75f83c0137a9b98608359a5f1af8144876eb67bcb1ce837368e906a9f524" [[package]] name = "unicode-segmentation" -version = "1.11.0" +version = "1.12.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "d4c87d22b6e3f4a18d4d40ef354e97c90fcb14dd91d7dc0aa9d8a1172ebf7202" +checksum = "f6ccf251212114b54433ec949fd6a7841275f9ada20dddd2f29e9ceea4501493" [[package]] name = "unicode_categories" @@ -6810,7 +6527,7 @@ dependencies = [ "once_cell", "ring", "rust-argon2", - "secp256k1 0.29.0", + "secp256k1 0.29.1", "serde", "serde_json", "sha2 0.10.8", @@ -6864,7 +6581,7 @@ dependencies = [ "once_cell", "proc-macro2", "quote", - "syn 2.0.75", + "syn 2.0.77", "wasm-bindgen-shared", ] @@ -6898,7 +6615,7 @@ checksum = "afc340c74d9005395cf9dd098506f7f44e38f2b4a21c6aaacf9a105ea5e1e836" dependencies = [ "proc-macro2", "quote", - "syn 2.0.75", + "syn 2.0.77", "wasm-bindgen-backend", "wasm-bindgen-shared", ] @@ -6998,11 +6715,11 @@ checksum = "5f20c57d8d7db6d3b86154206ae5d8fba62dd39573114de97c2cb0578251f8e1" [[package]] name = "whoami" -version = "1.5.1" +version = "1.5.2" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "a44ab49fad634e88f55bf8f9bb3abd2f27d7204172a112c7c9987e01c1c94ea9" +checksum = "372d5b87f58ec45c384ba03563b03544dc5fadc3983e434b286913f5b4a9bb6d" dependencies = [ - "redox_syscall 0.4.1", + "redox_syscall", "wasite", ] @@ -7233,6 +6950,15 @@ dependencies = [ "memchr", ] +[[package]] +name = "winnow" +version = "0.6.18" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "68a9bda4691f099d435ad181000724da8e5899daa10713c2d432552b9ccd3a6f" +dependencies = [ + "memchr", +] + [[package]] name = "winreg" version = "0.50.0" @@ -7271,9 +6997,9 @@ dependencies = [ [[package]] name = "xml-rs" -version = "0.8.21" +version = "0.8.22" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "539a77ee7c0de333dcc6da69b177380a0b81e0dacfa4f7344c465a36871ee601" +checksum = "af4e2e2f7cba5a093896c1e150fbfe177d1883e7448200efb81d40b9d339ef26" [[package]] name = "xxhash-rust" @@ -7314,7 +7040,7 @@ checksum = "fa4f8080344d4671fb4e831a13ad1e68092748387dfc4f55e356242fae12ce3e" dependencies = [ "proc-macro2", "quote", - "syn 2.0.75", + "syn 2.0.77", ] [[package]] @@ -7334,7 +7060,7 @@ checksum = "ce36e65b0d2999d2aafac989fb249189a141aee1f53c612c1f37d72631959f69" dependencies = [ "proc-macro2", "quote", - "syn 2.0.75", + "syn 2.0.77", ] [[package]] diff --git a/Cargo.toml b/Cargo.toml index f463cf5..f7c91a3 100755 --- a/Cargo.toml +++ b/Cargo.toml @@ -12,7 +12,6 @@ members = [ [dependencies] migration = { path = "migration", name = "migration" } crypter = { path = "crypter", name = "crypter" } -capnp-rpc = "0.19.4" salvo = { version = "0.71.1", features = ["full"] } actix = "0.13.5" actix-web = "4" @@ -30,7 +29,6 @@ is_type = "0.2.1" rayon = "1.10.0" rand = "0.8" rslock = "0.4.0" -tonic = "0.12" futures = "0.3" deadpool-lapin = { version = "0.12.1", features = ["serde", "rt_tokio_1"] } deadpool-redis = "0.16.0" @@ -68,11 +66,6 @@ rand_chacha = "0.3" chacha20 = "0.9" indexmap = "2.4" - -[[bin]] -name = "playground" -path = "playground/main.rs" - [[bin]] path = "src/app.rs" name = "hoopoe" @@ -84,11 +77,5 @@ lto = true codegen-units = 1 overflow-checks = true # panic on any overflow -[build-dependencies] -tonic-build = "0.10.2" - -[dependencies.capnp] -version = "0.19.0" - [profile.dev.package.sqlx-macros] opt-level = 3 \ No newline at end of file diff --git a/docs/Rmq.md b/docs/Rmq.md index 63073da..8a52ad0 100644 --- a/docs/Rmq.md +++ b/docs/Rmq.md @@ -115,8 +115,8 @@ async fn main() -> Result<(), Box> { channel .basic_publish( - "", - request_queue, + "", // "" means direct exchange + request_queue, // in direct exchange routing key is the same as the queue name BasicPublishOptions::default(), request_payload.to_vec(), BasicProperties::default() @@ -207,8 +207,8 @@ async fn main() -> Result<(), Box> { let response_payload = b"Response Payload"; channel .basic_publish( - "", - reply_to.as_str(), + "", // "" means direct exchange + reply_to.as_str(), // in direct exchange routing key is the same as the queue name BasicPublishOptions::default(), response_payload.to_vec(), BasicProperties::default() diff --git a/playground/main.rs b/playground/main.rs deleted file mode 100644 index 5b5aaf8..0000000 --- a/playground/main.rs +++ /dev/null @@ -1,305 +0,0 @@ - - - -use rand::{thread_rng, Rng}; -use rand::seq::SliceRandom; -use std::error::Error; -use std::task::Context; -use std::thread; -use std::{collections::HashMap, sync::atomic::AtomicUsize}; -use std::net::SocketAddr; -use deadpool_redis::redis::{AsyncCommands, RedisResult}; -use futures::sink::Buffer; -use once_cell::sync::Lazy; -use serde::{Serialize, Deserialize}; -use tokio::net::TcpStream; -use deadpool_redis::{Config as DeadpoolRedisConfig, Runtime as DeadPoolRedisRuntime}; -use std::sync::Arc; -use tokio::sync::Mutex; -use tokio::io::{AsyncReadExt, AsyncWriteExt}; - -// store users along with their tcp streaming channels -pub static ID_TRACKER: AtomicUsize = AtomicUsize::new(1); -pub static USERS_TCP_STREAM: Lazy>>, - tokio::sync::mpsc::Sender)>>>> = - Lazy::new(|| { - Arc::new(Mutex::new( - HashMap::new() - )) - } -); -pub static ONLINE_USERS: Lazy>>> = - Lazy::new(|| { - let users = HashMap::default(); - Arc::new(Mutex::new( - users - )) - } -); - - - -// Error part is an object safe trait which will be dispatched dynamically -#[tokio::main] -async fn main() -> Result<(), Box>{ - - let redis_password = ""; - let redis_host = ""; - let redis_port = ""; - let redis_username = ""; - let redis_conn_url = if !redis_password.is_empty(){ - format!("redis://:{}@{}:{}", redis_password, redis_host, redis_port) - } else if !redis_password.is_empty() && !redis_username.is_empty(){ - format!("redis://{}:{}@{}:{}", redis_username, redis_password, redis_host, redis_port) - } else{ - format!("redis://{}:{}", redis_host, redis_port) - }; - - - let redis_pool_cfg = DeadpoolRedisConfig::from_url(&redis_conn_url); - let redis_pool = redis_pool_cfg.create_pool(Some(DeadPoolRedisRuntime::Tokio1)).unwrap(); - - let listener = tokio::net::TcpListener::bind("0.0.0.0:8753").await.unwrap(); - tokio::spawn(async move{ - while let Ok((mut stream, addr)) = listener.accept().await{ - - let get_id_tracker = ID_TRACKER.fetch_add(1, std::sync::atomic::Ordering::Relaxed); - - let online_users = ONLINE_USERS.clone(); - let mut get_online_users = online_users.lock().await; - - // try to find an existing user with this address - // otherwise insert it into the map - *get_online_users.entry( - addr.to_string() - ).and_modify(|v| { *v; } /* keep the old id */ ) - .or_insert(get_id_tracker); - - let cloned_redis_pool = redis_pool.clone(); - // try to connect the user to a random online one - tokio::spawn(async move{ - let stream = std::sync::Arc::new(tokio::sync::Mutex::new(stream)); - // execute the connect logic inside another light io thread since - // the logic contains some async locking mechanism as well as io processing - // it's better not to suspend any async function for the locking process - // and gets done in a separate thread - connectMe(stream, addr, cloned_redis_pool).await; - }); - } - }); - - // since the socket stream needs to be mutable hence moving it - // between threads requires to be Arced and Mutexed to use it - // mutably in a safe manner inside other threads. - pub async fn connectMe(mut current_user_stream: std::sync::Arc>, - current_user: SocketAddr, redis_pool: deadpool_redis::Pool){ - - let users_streams = USERS_TCP_STREAM.clone(); - let mut get_users_streams = users_streams.lock().await; - - let online_users = ONLINE_USERS.clone(); - let mut get_online_users = online_users.lock().await; - let cloned_current_user_stream = current_user_stream.clone(); - - - let mut redis_conn = redis_pool.get().await.unwrap(); - let get_connected_users: String = redis_conn.get("connectedUsers").await.unwrap(); - let mut decoded_connected_users = serde_json::from_str::< - HashMap - >(&get_connected_users) - .unwrap(); - - let mut map_keys = get_online_users - .clone() - .into_iter() - .map(|(user, id)| user) - .collect::>(); - - map_keys.shuffle(&mut thread_rng()); - let mut found_user: String = String::from(""); - let mut found_id: usize = 0; - - for user in map_keys.clone(){ - - let id = get_online_users.get(&user).unwrap(); - - let mut redis_conn = redis_pool.get().await.unwrap(); - let get_connected_users: String = redis_conn.get("connectedUsers").await.unwrap(); - let mut decoded_connected_users = serde_json::from_str::< - HashMap - >(&get_connected_users) - .unwrap(); - - // the user to connect to must not be the current user as well as - // must not on redis in during the 2 mins period - // a user can't connect to a user which was connected 2 mins ago - if user == current_user.to_string() || decoded_connected_users.contains_key(&user){ - continue; - } else{ - - // start chatting with the first found user - found_id = *id; - found_user = user; - break; - } - } - - // store the user on redis, for 2 mins he won't be able to - // connect to the previous user - if !found_user.is_empty() && found_id != 0{ - decoded_connected_users.insert(found_user, found_id); - } - let encoded_connected_user = serde_json::to_string(&decoded_connected_users).unwrap(); - let _: () = redis_conn.set_ex("connectedUsers", &encoded_connected_user, 120).await.unwrap(); - - // receive msg bytes from the user tcp stream channel - let getUserSender = get_users_streams.get(&found_id).unwrap().1.clone(); - let getUserReceiver = get_users_streams.get(&found_id).unwrap().0.clone(); - let cloned_current_user_stream = cloned_current_user_stream.clone(); - - tokio::spawn(async move{ - - let mut buff = vec![]; - let mut getStream = cloned_current_user_stream.lock().await; - - while let Ok(rcvd_bytes) = getStream.read(&mut buff).await{ - if rcvd_bytes == 0{ - getStream.shutdown().await; // shutdown the stream, disconnect the connection - } - let current_user_msg = std::str::from_utf8(&buff[..rcvd_bytes]).unwrap(); - // send the msg bytes of the current user to the connected user channel - // connected user will use his receiver to receive the msg - getUserSender.send(current_user_msg.to_string()).await; - - let cloned_getUserReceiver = getUserReceiver.clone(); - let mut getReceiver = cloned_getUserReceiver.lock().await; - // receive the connected user (user2) msg in here and send it through the - // current user (user1) tcp stream channel to the current user - while let Some(connected_user_msg) = getReceiver.recv().await{ - getStream.write_all(connected_user_msg.as_bytes()).await; - } - } - }); - - } - - pub async fn disconnectMe(mut current_user_stream: std::sync::Arc>, - user_id: usize, current_user: SocketAddr, redis_pool: deadpool_redis::Pool){ - - let cloned_current_user_stream = current_user_stream.clone(); - // lockcing as an async task inside a light io thread - tokio::spawn(async move{ - let mut getStream = cloned_current_user_stream.lock().await; - getStream.shutdown().await; // close the current user tcp streaming channel - - // try to remove the user from online users - let online_users = ONLINE_USERS.clone(); - let mut get_online_users = online_users.lock().await; - (*get_online_users).remove(¤t_user.to_string()).unwrap(); - - }); - - } - - // ---====---====---====---====---====---====---====---====---====---====---====---==== - - // an eventloop is a thread safe mpsc receiver queue - #[derive(Clone)] - struct EventLoop{ - // a thread safe receiver queue - pub queue: std::sync::Arc>>, - } - impl EventLoop{ - pub async fn on(&mut self, event_name: &str, triggerer: F) - where F: Fn(T) -> R + Send + Sync, - R: std::future::Future + Send + Sync - { - println!("[*] triggering {:?} event", event_name); - let mut get_queue = self.queue.lock().await; - while let Some(event) = get_queue.recv().await{ - triggerer(event); - } - } - - pub async fn cronScheduling(&mut self, func: F, period: u64) - where F: Fn() -> R + Send + Sync + 'static + Clone, - R: std::future::Future + Send + Sync + 'static - { - - let mut interval = tokio::time::interval(tokio::time::Duration::from_secs(period)); - interval.set_missed_tick_behavior(tokio::time::MissedTickBehavior::Skip); - - // defining an async context or future object - let fut = async move{ - let cloned_task = func.clone(); - loop{ - interval.tick().await; - // since the closure returns a future we can run it - // in the background tokio thread - tokio::spawn(func()); // the closure however returns a future - } - }; - - // execute the future object in the background thread worker - // without waiting for the result - tokio::spawn(fut); - - } - - } - - #[derive(Clone)] - struct BufferEvent{ - pub data: std::sync::Arc>> - } - #[derive(Serialize, Deserialize, Clone, Debug)] - struct EventData{ - pub owner: String, - pub data: String, - pub recv_time: i64 - } - - let (tx, mut rx) = tokio::sync::mpsc::channel::(100); - let mut eventloop = EventLoop::{ - queue: std::sync::Arc::new(tokio::sync::Mutex::new(rx)) - }; - - /* ------------------------------------- - .await; suspend the task and tell runtime i need the result, if the - result wasn't ready runtime continue executing other task by poping - them out from the eventloop and suspend the awaited task in there until - the future completes and result becomes ready, not awaiting means we - don't care about the result, runtime executes task in the background - light io thread without having any suspention hence we could continue - with the rest of the code, thus if you need the result of async task - like sending it to channel, await on it, HOWEVER this won't block the - light thread. - also we could use tokio::select to control the execution flow of - the app in async context and get the result of whatever the async - task has solved sooner than the other. - tokio::spawn() is a place where async task can be executed by the runtime - scheduler, it's a lightweight thread of execution where async tasks - will be awaited in there without blocking the thread. - */ - tokio::spawn( - { - let mut eventloop = eventloop.clone(); - async move{ - // once we receive a buffer event we'll be decoding it - // into EventData structure - eventloop.on("receive", |e| async move{ - let get_event_data = e.data.lock().await; - let event_data = serde_json:: - from_slice::(&get_event_data) - .unwrap(); - println!("[*] received event: {:?}", event_data); - - }).await; - } - } - ); - - Ok(()) - -} \ No newline at end of file diff --git a/proto/notif.proto b/proto/notif.proto new file mode 100644 index 0000000..e69de29 diff --git a/src/.DS_Store b/src/.DS_Store index 43faeb6..abb2f7a 100644 Binary files a/src/.DS_Store and b/src/.DS_Store differ diff --git a/src/app.rs b/src/app.rs index eca734f..11fe778 100644 --- a/src/app.rs +++ b/src/app.rs @@ -82,7 +82,6 @@ use indexmap::IndexMap; use models::event::NotifData; use salvo::http::response; use serde_json::Value; -use tests::{actor, orex}; use workers::notif::{self, NotifBrokerActor}; use std::io::BufWriter; use std::str::FromStr; diff --git a/src/middlewares/check_token.rs b/src/middlewares/check_token.rs index aba7609..7918c71 100644 --- a/src/middlewares/check_token.rs +++ b/src/middlewares/check_token.rs @@ -1,7 +1,6 @@ -use capnp::data; use context::AppContext; use interfaces::passport::Passport; use models::user::UserData; diff --git a/src/storage.rs b/src/storage.rs index d6a85f7..20a5c1c 100644 --- a/src/storage.rs +++ b/src/storage.rs @@ -1,4 +1,5 @@ -pub mod engine; \ No newline at end of file +pub mod engine; +pub mod cruft; \ No newline at end of file diff --git a/src/tests/engine.rs b/src/storage/cruft.rs similarity index 99% rename from src/tests/engine.rs rename to src/storage/cruft.rs index 79cbfb5..470ff0b 100644 --- a/src/tests/engine.rs +++ b/src/storage/cruft.rs @@ -6,6 +6,7 @@ use std::{sync::atomic::AtomicU8, time::{SystemTime, UNIX_EPOCH}}; use tokio::task::JoinHandle; use actix::prelude::*; +use uuid::Uuid; use crate::*; diff --git a/src/tests/.DS_Store b/src/tests/.DS_Store deleted file mode 100644 index 5008ddf..0000000 Binary files a/src/tests/.DS_Store and /dev/null differ diff --git a/src/tests/actor.rs b/src/tests/actor.rs deleted file mode 100644 index e14d0ad..0000000 --- a/src/tests/actor.rs +++ /dev/null @@ -1,1238 +0,0 @@ - - - -// ------------------------------------------------------------ -/* actor worker threadpool implementations from scratch - - https://ryhl.io/blog/actors-with-tokio/ - https://medium.com/@maturationofthe/leveraging-rusts-tokio-library-for-asynchronous-actor-model-cf6d477afb19 - https://www.reddit.com/r/rust/comments/xec77k/rayon_or_tokio_for_heavy_filesystem_io_workloads/ - - - What is an Actor? - Actor is a threadpool or a single threaded structure which has its own mailbox and cron scheduler - to receive and execute tasks inside its thread of execution it can use tokio or os threads to execute - async io or cpu tasks, they talk through message sending patterns like mpsc in local and grpc remotely - the message or task execution can be happened by receiving the task from the actor eventloop which is - the receiver of the mailbox jobq mpsc channel in local or the grpc remotely then execute in a free - thread if it has threadpool or its own single thread of execution, the thread can be either a light - or os thread for executing async io or intensive cpu tasks. - - runtime takes the async task and put it inside the thread queue - then at an appropriate time the scheduler pop the task out of the - thread queue to execute it and if a thread is free it tries to steal - the task from other threads, once the task gets executed the runtime - waker of future object poll the task result out and other codes can - fill the placeholder to use the actual value of the solved task. - async tasks execution flow in user space level (not os threads): - >_ code : developer put a future on the stack - >_ code : hey runtime scheduler, developer did an await on the task in a lightweight thread of execution of an actor which has its own stack! - >_ code : if you don't mind me asking, can i have the result right now? - >_ runtime scheduler: allow me to poll the result out of the future stack,... oh no sorry the future is not ready yet! i'll go for other tasks and notify you once it gets ready - >_ code : alright sir, i'll keep executing other tasks, perhaps sending the result through an mpsc channel to.... - >_ runtime scheduler: hey i've just received the result of the future you wanted earlier from the waker, here is the result. - >_ code : as i was saying, ....to outside of this current thread! great other threads and scopes can now use the result as they're receiving the result from the channel that i've sent earlier. - - the threadpool of tokio is used for light io and rayon is used for cpu heavy io and is global and shared - across the program to speedup the execution with jobq mpsc channel like the one in actor object to execute - received message or future task/job in its own thread. however in those days that async wasn't there we had - been using the queue concepts to queue some tasks then execute them one by one but with the arrival of the - async tasks/jobs/processes we can just wait for multiple tasks while we're executing others simultaneously - and concurrently, here is the concurrency model for actor worker object (actix): - > example of executing async task: - - handle each socket in the background asyncly in a tokio spawn like writing bytes to a networking interface like tcp based protocols (ws, http, rmq, redis and rpc) - - mutating an object in a separate thread by locking on the mutex to acquire the lock to avoid blocking the current thread - - waiting for some io to get received while other tasks are being executed simultaneously - ::> in this case the runtime scheduler check for the task to poll out of the stack if - it has completed otherwise the waker fo the future object notify the scheduler as - soon as the task gets completed. - > creating new actor object creates new lightweight thread of execution using tokio spawn to execute asyc tasks inside that - > thread management: each actor has its own lighweight thread of execution (an actor object is a lightweight worker thread) - > async task process execution in the background: use tokio::spawn over tokio runtime scheduler - > send async task resp across threads: use jobq based channels mailboxes, mpsc or rpc - > async task example: atomic syncing for mutating data in threads using arc mutex - > control over the execution flow: use tokio select to only join that thread which is completed sooner than others - > distributed clustering talking: use rpc for sending message and calling each other methdos - conclusion: - A threadpool has its own internal eventloop queue for popping out tasks. - actor is a simple structure that can be used to execute async tasks and jobs in the whole - actor system threadpool they can also communicate and send message to each other by using - their mailbox, mailbox gives each actor a unique address to send message togehter using - channels like mpsc and oneshot in local and rpc and rmq in a remote manner. - if there are two many variables to be stored on CPU registers they'll be stored on the stack - related to the current thread cause each thread gets a seprate stack. - tokio runtime will execute all async tasks in its lightweight threads when we put the - #[tokio::main] above the main function we can also spawn async task inside a separate - lightweight thread manually by using tokio::spawn which contains lightweight thread workers - can be used to build actor to execute tasks inside of it hence talk with mpsc channels, we - can build multithreaded web servers upon tokio runtime in which each socket will be handled - inside tokio spawn threads as well as executing each api - - ==================================== - HOW 2 USE THESE IMPLEMENTATIONS: - ==================================== - for worker in 0..10{ //// spawning tokio green threads for 10 workers - tokio::spawn(async move{ //// spawning tokio worker green threadpool to solve async task - - //// any heavy logic here that must be shared using tokio channels inside a threadpool - //// ... - - }); - } - - let mut tokio_async_worker = AsyncWorker::new(); - let mut native_sync_worker = NativeSyncWorker::spawn(n_workers); - let mut rayon_sync_worker = RayonSyncWorker::new(); - let (sender, receiver) = std_mpsc::channel::(); - let cloned_sender = sender.clone(); - - native_sync_worker.execute(move ||{ - let async_heavy_method = || async move{ - // mining(); - let big_end_bytes = number.to_be_bytes(); - let index = 0; - let new_chunk = cloned_ops(big_end_bytes[index]); - cloned_sender.send(new_chunk).unwrap(); - } - block_on(async_heavy_method()); - }); - - rayon_sync_worker.spawn(move ||{ - block_on(async_heavy_method()); - }); - - tokio_async_worker.spawn(async move{ - async_heavy_method().await; - Ok(()) - }) - tokio_async_worker.execute().await // wait for all the workers of this worker to complete if there were any - - - let bytes: Vec = receiver.iter().take(n_workers).collect() // collecting data from all workers - -*/ - -use std::any::Any; -use std::ops::{Deref, DerefMut}; -use std::{default, fmt, thread}; -use std::sync::mpsc as std_mpsc; -use futures::future::select; -use sha2::digest::generic_array::functional; -use tokio::sync::mpsc; -use futures_util::Future; -use is_type::Is; -use uuid::Uuid; -use std::sync::{Arc, Mutex}; -use log::info; -use crate::*; - - -struct ActerMessage{ - pub to: String, - pub from: String, - pub body: String, -} - -#[derive(Clone)] -struct Acter{ - // std::sync::Mutex is not Send so we can't move it into tokio spawn - // we must use tokio Mutex - pub mailbox: Arc>>, - pub communicator: tokio::sync::mpsc::Sender, -} - -impl Acter{ - - pub async fn send(&mut self) -> std::pin::Pin>>{ - todo!() - } - - // execute tasks and messages in its threadpool (mainly tokio spawn) - pub async fn execute(&mut self){ - - // use custom worker threadpool - let this = self.clone(); - let mut pool = workerthreadpool::sync::RayonSyncWorker::new(); - pool.spawn(Box::new(||{ - // don't deref the arced_mutex_mailbox since Clone is not implemented for that - // and can't be move out of the type since derefing return the owned type it's - // kinda like clone the type - tokio::spawn(async move{ - // PROBLEM: can't move out self because it's behind a mutable pointer - // some how we should move it to tokio scope without losing ownership - // passing its ref to tokio scope is not ok since the reference won't - // be valid and must be static cause self is only valid inside the method - // body and by moving the &self.mailbox, the pointer will be in tokio scope - // and the `self` itself will be dropped out once the method gets executed - // so it escapes the method body, - // SOLUTION: use clone, Box, Arc, (Rc is for single thread) - // we've used Arc and Mutex to make it sendable, shareable and safe to share - let arced_mutex_mailbox = this.mailbox.clone(); - let mut mailbox = arced_mutex_mailbox.lock().await; - while let Some(task) = (*mailbox).recv().await{ - // ... - } - - }); - })); - - pool.execute().await; // it will receive the spawned task inside a free thread then it call it - - } - - pub async fn start(&mut self) -> Self { - // create mailbox and communicator - // ... - todo!() - } - -} - - -pub mod workerthreadpool{ - - /* - --------------- GUIDE TO CREATE A MULTITHREADED WEB SERVER --------------- - every worker is a thread with an id along with the thread itself, a threadpool is a vector containing the number - of spawned workers then we'll send the async job to the sender channel by calling the execute method and while we're - streaming over the arced mutexed receiver inside each thread we receives the task in on of those free threads and - finally call the async task, the spawning threads in the background part are done inside the spawn() method also every - worker threadpool needs a channel and multiple spawned threads in the background so we could send the task to the - channel and receives it in one of the free spawned thread so the steps would be: - 1 - create channel and spawn threads in the background waiting to receive from the channel by calling new() method - 2 - pass the task to spawn() or execute() method then send it to channel - 3 - make sure receiver is of type Arc> - - how threadpool works? - once the execute method is called the task is sent to the jobq channel - spawned threads on the other hand are thrilling to receive the task - coming from the channel that's why we should put the receiver inside - mutex and make it arc to move it between threads, once a free thread - acquire the lock on the receiver then the task can be called inside - that thread - */ - pub use super::*; - - // ------------------------------------------------------------ - // ------------ aync worker threadpool using tokio ------------ - // ------------------------------------------------------------ - // async worker pool scheduler using tokio based on mpsc jobq channel protocol - // this scheduler is used for asynchronous IO by not blocking the thread using tokio green threads - pub mod _async{ - - use super::*; - - pub struct AsyncWorker{ - count: usize, // number of workers - sender: mpsc::UnboundedSender>, // sender async side with no byte limitation - receiver: mpsc::UnboundedReceiver>, // receiver async side with no byte limitation - } - - - impl AsyncWorker{ // E can be shared between threads - - pub fn new() -> Self{ - let (sender, - receiver) = mpsc::unbounded_channel(); // async mpsc jobq channel channel with no byte limitation to avoid deadlocks and race conditions - AsyncWorker{ - count: 0, // will be equaled to the number of workers by solving all the jobs which are comming to the downside of the mpsc jobq channel - sender, - receiver - } - } - - pub fn spawn(&mut self, task: T) - where - T: Future> + Send + 'static, // T can be shared between threads - T::Output: Is>, // T is a future and now we can access the Output type to make sure that is of type Result<(), E> - T::Output is the GAT of the Future trait - { - let sender = self.sender.clone(); - tokio::spawn(async move{ // spawn the task inside tokio green threads - let res = task.await; - match sender.send(res.into_val()){ - Ok(()) => (), - Err(_) => panic!("Impossible Panic for Sender"), - } - }); - self.count += 1; - } - - - pub async fn execute(mut self) -> Result<(), E>{ - - std::mem::drop(self.sender); // make sure that the sender is dead since we want to receive all the messages and avoid deadlocks and race condition - let mut index = 0; - - loop{ // we can use while let Some() syntax - match self.receiver.recv().await{ - Some(Ok(())) => { - assert!(index < self.count); - } - Some(Err(e)) => { - assert!(index < self.count); - return Err(e); - } - None => { - assert_eq!(index, self.count); - break Ok(()); // return this to the main - } - } - index+=1; - } - - } - - } - - type AsyncJob = Box F + Send + Sync + 'static>; - fn create_async_job(job: impl FnOnce(I) -> F + Send + Sync + 'static) -> AsyncJob // static dispatch for job - where F: std::future::Future, - O: Send + Sync + 'static - { - Box::new(job) - } - async fn run(){ - let async_job = create_async_job::(|status|async move{String::from("")}); - tokio::spawn(async move{ - async_job(0).await; - }); - } - - - } - - - // ---------------------------------------------------------------------------------- - // ------------ none async worker threadpool using rayon and std::thread ------------ - // ---------------------------------------------------------------------------------- - // a sync task scheduler (worker pool) with mpsc as the jobq channel protocol - // this scheduler is used for synchronous IO by blocking the thread using rust native std thread - alternative to this is rayon - pub mod sync{ - - - use super::*; - - type Job = Box; // a job is of type closure which must be Send and static across all threads inside a Box on the heap - - - //// there is no guaranteed order of execution for spawns, given that other threads - //// may steal tasks at any time, however, they are generally prioritized in a LIFO order - //// on the thread from which they were spawned, other threads always steal from the - //// other end of the deque, like FIFO order, the idea is that recent tasks are most - //// likely to be fresh in the local CPU's cache, while other threads can steal older stale tasks. - pub struct RayonSyncWorker{ - count: usize, // number of workers - sender: mpsc::UnboundedSender, // sender async side with no byte limitation - receiver: mpsc::UnboundedReceiver, // receiver async side with no byte limitation - } - - - impl RayonSyncWorker{ - - pub fn new() -> Self{ - let (sender, - receiver) = mpsc::unbounded_channel(); // async mpsc jobq channel channel with no byte limitation to avoid deadlocks and race conditions - RayonSyncWorker{ - count: 0, // will be equaled to the number of workers by solving all the jobs which are comming to the downside of the mpsc jobq channel - sender, - receiver - } - } - - pub fn spawn(&mut self, task: Job) - where - { - let sender = self.sender.clone(); - rayon::spawn(move || { // firing off a task into the rayon threadpool in the 'static or global scope - match sender.send(task){ - Ok(()) => (), - Err(_) => panic!("Impossible Panic for Sender"), - } - }); - self.count += 1; - } - - pub async fn execute(mut self) -> Result<(), Box>{ - - std::mem::drop(self.sender); // make sure that the sender is dead since we want to receive all the messages and avoid deadlocks and race condition - let mut index = 0; - - loop{ // we can use while let Some() syntax - match self.receiver.recv().await{ - Some(job) => { - job(); - assert!(index < self.count); - }, - None => { - assert_eq!(index, self.count); - break Ok(()); // return this to the main - } - } - index+=1; - } - - } - - } - - /* - NOTE: the process of heavy cpu io must not be blocking that's why rayon is not - going to be used for none blocking operations cause it moves the tasks into the cpu - core instead of using threads per cpu it uses one thread per each cpu core, tokio - however can be used for io blocking tasks cause it uses lightweight thread of - execution and it blocks a light thread. - lightweight threads in tokio is user thread space and naitive threads in rayon is os - threads, the first ones have less overhead than the seconds ones. - spawning native threads are too slow since thread handling in rust is depends - on user base context switching means that based on the load of the IO in the - app rust might solve the data load inside another cpu core and use multiprocessing - approach, it's like rayon threadpool which are global threads and shared across the app - which causes the race with other threads and steal tasks - • https://www.reddit.com/r/rust/comments/az9ogy/help_am_i_doing_something_wrong_or_do_threads/ - • https://www.reddit.com/r/rust/comments/cz4bt8/is_there_a_simple_way_to_create_lightweight/ - */ - struct Worker{ - id: Uuid, - thread: Option>, //// thread is of type JoinHandld struct which return nothing or () - } - - pub struct NativeSyncWorker { - workers: Vec, - sender: std_mpsc::Sender, // all sends will be asynchronous and they never block - } - - enum Message { - NewJob(Job), - Terminate, - } - - impl NativeSyncWorker{ - - pub fn spawn(size: usize) -> NativeSyncWorker { - assert!(size > 0); - let (sender, receiver) = std_mpsc::channel(); - let receiver = Arc::new(Mutex::new(receiver)); // reading and writing from an IO must be mutable thus the receiver must be inside a Mutex cause data inside Arc can't be borrows as mutable since the receiver read operation is a mutable process - let mut workers = Vec::with_capacity(size); // capacity is not always equals to the length and the capacity of this vector is same as the maximum size based on the system arch, on 32 bits arch usize is 4 bytes and on 64 bits arch usize is 8 bytes - for _ in 0..size { // since the receiver is not bounded to trait Clone we must clone it using Arc in each iteration cause we want to share it between multiple threads to get what the sender has sent - workers.push(Worker::new(Uuid::new_v4(), Arc::clone(&receiver))); - } - NativeSyncWorker{workers, sender} - } - - pub fn execute(&self, f: F) where F: FnOnce() + Send + 'static { // calling this method means send the incoming task from the process through the mpsc sender to down side of the channel in order to block a free thread using the receiver on locking the mutex - let job = Box::new(f); - self.sender.send(Message::NewJob(job)).unwrap(); // by executing the task handler sender will send a job asynchronously and only one receiver at a time can get that job and solve it by locking on the mutex to block the choosen thread since thread safe programming is all about this pattern! - } - } - - impl Drop for NativeSyncWorker{ // shutting down all threads on ctrl + c by dropping all of them - fn drop(&mut self) { // destructor for NativeSyncWorker struct - info!("Sending terminate message to all workers."); - for _ in &self.workers { - self.sender.send(Message::Terminate).unwrap(); - } - info!("Shutting down all workers."); - for worker in &mut self.workers { - info!("Shutting down worker {}", worker.id); - if let Some(thread) = worker.thread.take(){ // take() takes the value out of the option, leaving a None in its place - thread.join().unwrap(); // joining on thread will block the current thread to get the computation result and stop the thread from being processed in the background - } - } - } - } - - impl Worker{ - fn new(id: Uuid, receiver: Arc>>) -> Worker { - let thread = thread::spawn(move || loop { // spawning a thread inside the new() method and waiting for the receiver until a job becomes available to down side of the channel - // following is called the eventloop handler, handles and execute - // all coming tasks from the channel in a loop using while let some - // streaming and execute them in a separate thread as they're getting - // received. - while let Ok(message) = receiver.lock().unwrap().recv(){ // iterate through the receiver to get all incoming messages - since other thread shouldn't mutate this message while this thread is waiting for the job we must do a locking on the message received from the sender to acquire the mutex by blocking the current thread to avoid being in dead lock, shared state and race condition situation - match message { - Message::NewJob(job) => { - info!("Worker {} got a job; executing.", id); - job(); // this might be an async task or job spawned by the tokio spawner in the background - } - Message::Terminate => { - info!("Worker {} was told to terminate.", id); - break; // break the loop of this worker so we are not interested in solving task any more - } - } - } - }); - Worker { - id, - thread: Some(thread), - } - } - } - - - } -} - -// see actor.rs for a real world example of writing a threadpool actor object -// executor uses an event loop to execute tasks in their own thread. -pub async fn ExecutorEventLoop(){ - - // actor worker streaming with while let some, ws, atomic addr, mailbox, rpc, redis, rmq) - // actor worker task execution with io vs os threads (lazy static &mut rc arc mutex rwlock select spawn channels scheudler interval) - // thread queue and runtime scheduler to pop the task out of the queue, send sync static separate io threads vs os threads thread joining - - - // concurrency : (&mut <-> Mutex RwLock) - // threadpool : light or os threads, eventloop (threadpool channel queue to handle events in a loop see actor.rs with threadpool) - // atomic syncing : channels mutex rwlock arc select - // future objects : async io task, thread joining on main (caller) thread - // purchase locking : lock the object when someone is minting it using select mutex spawn - - /* - simple executor without threadpool but with eventloop: - task is a unit of work thread or groutine that must be executed - by runtime executor by poping it out of the queue, where each task - runs to completion before the next task starts. - it's similar to how tokio event loop manages async io future object - using an eventloop in its lightweight thread of execution - */ - - use tokio::sync::{mpsc::channel, Mutex}; - enum EventData{ - Task(Task), - Quit - } - // traits as separate objects must be behind dyn keyword - // and boxed into the heap cause they're unsized - type Function = fn(); - struct Fucntions + Send + Sync + 'static>{ - pub func: Function, - pub cls: F, - pub boxed_cls: Box, - pub fut: A, - pub fut1: std::pin::Pin + Send + Sync + 'static>> - } - struct Task{ - name: String, - func: Box, // closures are traits, traits are not sized thus we use dyn and need to behind pointer hence Box + Send + Sync + 'static>>>, - } - impl Task{ - fn new(name: String, func: F) -> Self{ - Task { name, func: Box::new(func), fut: None } - } - fn run(self){ // don't use &self in here cause we can't move out of it since func is not cloned - (self.func)() // calling the the function - } - } - struct Executor{ - pub tx: mpsc::Sender, - pub rx: mpsc::Receiver - } - impl Executor{ - fn new(buffer_size: usize) -> Self{ - let (tx, rx) = tokio::sync::mpsc::channel::(buffer_size); - Executor{ - tx, - rx - } - } - fn spawn(&mut self, name: String, func: F){ - self.tx.send(EventData::Task(Task::new(name, func))); - } - // the event lopp - async fn run(&mut self){ - // await puases the execution and run the future until it completes - // futures don't block the thread of execution - while let Some(event_data) = self.rx.recv().await{ - match event_data{ - EventData::Task(task) => { - log::info!("executing the task with name: {}", task.name); - std::thread::spawn(move || task.run()); - }, - EventData::Quit => { - break; - }, - _ => { - panic!("invalid event data, event loop is panicked"); - } - } - } - } - } - -} - -pub fn StockPriceEvent(){ - - /* -------------------------------------------------------------------- - condvar is used to check that either a condition var is met - inside a mutex or not, this will block the mutex thread by - waiting on it until this cond var receives a notification in - somewhere else. - CondVar: block the thread such that it consumes no CPU time while - waiting for an event to occur, other threads can do their jobs - functions in this module will block the current thread of - execution, condvars are typically associated with a boolean - predicate (a condition) and a mutex, the predicate is always - verified inside of the mutex before determining that a - thread must block. - condvar blocks the mutex thread by waiting until new changes - is received which has notified in other threads. - what we're doing here is basically we're monitoring a stock price in - a safe manner in such a way that we're creating 10 threads, each of - them wants to mutate the price of the stock instance but the actual - instance is wrapped through a mutex and has a condvar, the updating - process is happened like we start by locking on the stock instance - then call update price method after that notify the condvar that the - value of the price of the stock instance has changed so the notification - process is happening inside each thread. - then in the function we're calling the wait for release which lock the - stock again and checks its price against a limit causes to block the - main (caller) thread until the price of the stock is smaller than the limit, it - depends on the update price every time the update price function update - the price of the stock a notif gets triggered which will be checked - by the wait for release method to check the price agains the limit this - process continues constantly the main (caller) thread is blocked until the price - reaches a higher amount than the limit. - */ - - - use std::sync::{Arc, Mutex, Condvar}; - - struct Buffer{ - pub data: Arc>>, - pub size: usize - } - - #[derive(Debug, Clone)] - struct Stock{ - name: String, - price: f64 - } - impl Stock{ - fn new(name: &str, price: f64) -> Self{ - Self { name: name.to_string(), price } - } - fn getPrice(&self) -> f64{ - self.price - } - fn getName(&self) -> &str{ // ret pointer, use the lifetime of the self - &self.name - } - fn updatePrice(&mut self, new_price: f64){ - self.price = new_price; - } - } - - // worker, locker - struct Monitor{ - pub event: std::sync::Mutex, - pub events: Option>, - pub event_signal: Condvar, - pub std_worker: thread::JoinHandle<()>, - pub tokio_worker: tokio::task::JoinHandle<()>, - pub locker: std::sync::Mutex<()> - } - - impl Monitor{ - fn new(init_event: Stock) -> Self{ - Self { - events: None, - event: std::sync::Mutex::new(init_event), - event_signal: Condvar::new(), - std_worker: thread::spawn(move ||{}), - tokio_worker: tokio::spawn(async move{}), - locker: std::sync::Mutex::new(()) - } - } - - fn update_price(&self, new_price: f64){ - let mut get_stock = self.event.lock().unwrap(); - (*get_stock).updatePrice(new_price); - - // wakes up one blocked mutex thread on this condvar - // we notify the condvar that the stock price is changed - // the update_price method locks the Stock instance, updates - // its price, and then calls notify_one() on the Condvar. - // this notifies any thread waiting or blocking on the Condvar - // that the stock price has changed. - self.event_signal.notify_one() // notify the blocked thread that the value has changed, wake it up from the wait status - } - - /* - once the price of the locked stock reaches the limit we wait, wait blocks the current thread - until this condition variable receives a notification which will be triggered inside the - update_price method means that the price of the stock has changed and we need to block the - mutex thread again until we reaches the limit again for the stock price. - in the wait_for_release() method, we lock the Stock object. it then enters a loop where - it continually checks if the price of the Stock is less than a certain limit. if the price - is less than the limit, the method calls the self.event_signal.wait(get_stock) - method. this block the current (main) thread of the mutex, until another thread calls notify_one() - or notify_all() on the same Condvar - the consequence of this, is that if the price of the Stock is initially less than the limit, - this method will block the current (main) thread until the price increases to the limit or above. - this will allow other threads to update the price of the Stock while the current (main) thread is - blocked. once the prices reaches the limit, the wait() method will return. the method will - exit the loop and continue executing. - using a Condvar in this way, we can effectively manage access to the Stock. By using the - wait_for_release() method, the main (caller) thread waits for the price of the Stock to reach a certain - limit before proceeding. this is useful in scenarios where the order of operations matters, - for example when one operation depends on the result of another. example scenarios would be - things like managing stocks, purchasing a product, or a warehouse ledger system. - */ - fn wait_for_release(&self){ - let limit = 115.0; - let mut get_stock = self.event.lock().unwrap(); - while get_stock.getPrice() < limit{ // we block and wait as long as the the price is smaller than the limit - get_stock = self.event_signal.wait(get_stock).unwrap(); - } - - } - - } - - - /* - testing: - basically in here we're updating the price - in 10 threads and block the main (caller) thread if - the price is smaller than the limit until - we notify the blocked thread by the condvar - that the price value is changed, then there - would be no need to wait for the notif until - another thread tries to update the price. - we spawn the update_price() method inside 10 - threads then block the main (caller) thread if the price - is not met the limit finally we iterate through - all the threads to join them on the main (caller) thread - and wait for them to finish. - waiting in os threads means blocking the thread - until we get the result. - */ - - // arc monitor to move it between threads - let monitor = Arc::new(Monitor::new(Stock::new("DOGTOKEN", 100.0))); - let threads = (0..10) - .map(|counter|{ - let cloned_monitor = monitor.clone(); - // we'll update the price of the monitor instance in a separate 10 of threads - thread::spawn(move ||{ - cloned_monitor.update_price(110.0 + 2.0*(counter as f64)); - }) - }) - .collect::>(); // if you don't know the type use _ - - // we'll check the price of the stock against the limit - // if it was less than the limit then we'll block the main - // thread until the notifier notify the condvar in another - // thread with a new value of the price, then we'll wait and - // block the thread until the price reaches higher than the limit again. - // ------- this blocks the main (caller) thread ------- - monitor.wait_for_release(); - - // join on all threads in main (caller) thread to execute the stock price task - for thread in threads{ - thread.join().unwrap(); - } - - // finally get the final value of the stock event after all mutations - let final_value = monitor.event.lock().unwrap(); - println!("final value of the stock is {:?}", final_value); - - - // wait_for_release() method blocks the main (caller) thread until we reach - // the limit, or receives a notification from the condvar which might - // happens in another thread by updating the price of the stock. - - /* - product minting: - - condvar with tokio spawn threads: use a condvar with a mutex and lock the product id in a tokio - io thread then check while the product is still locked or its state is not minted yet, - we'll block the current thread until the notifier notifies the condvar once the - product gets minted successfully and its state changed to minted then we'll remove it - from the lock_ids. - - channels with tokio spawn thrads: use a mutex and lock the product id in a tokio io thread - then send a true flag to a channel if the product id is being locked then start minting - product in another tokio io thread finally use select to control the flow of execution - of each joinhandle task. - */ - - -} - -pub fn MutexCondvarPlayground(){ - - /* - in monolithic : use mutex, rwlock, arc, channels, spawn, select for atomic syncing and avoiding deadlocks - in microservice: use distributed lock tools like redlock, zookeeper, k8s - eventloop streaming with channels as publisher and subscriber of events like condvar - atomic syncing with mutex, condvar and channels in async and none async block - - none async version: std threadpool executor eventloop, channels, mutex will block the current thread and wait for the task to gets executed - async version : tokio threadpool executor eventloop, channel, mutex won't block the thread and execute tasks in the background thread - - NOTE: don't use Condvar in async environment cause it blocks the thread - NOTE: Mutex in both std and tokio will block the thread to make sure that only one thread can access data for writing - - use channel to share data between threads instead of Mutex or RwLock - tokio::spawn(), tokio::select! {}, channels, tokio::sync::Mutex - execute async io future obejcts in an io thread in a none blocking manner - there is no need to block threads cause we want to execute other tasks - without blocking any code flow or section in a concurrent manner. - - use mutex to ensure that only one thread can access the protected data. - mutex always block the current thread to ensure that only one thread can mutate the data. - do the locking in a separate thread if you care about executing rest of the codes in a none blocking manner. - use condvar to block the thread or wait for a notification on some value changes. - joining on thread block the thread till we get the result from that - in none async env we should use os threads which might get block during the execution when we need the result inside the thread. - - tokio runtime scheduler and executor can execute async io tasks in a none blocking manner - by awaiting on async tasks the executor pause the execution in there but don't block the thread - it goes to run and execute other tasks in other threads concurrently in a none blocking manner - it then notify the caller and fill the placeholder once the task completes. - */ - - - /* ---------------- BUCKET ---------------- - a bucket that contains a queue which is a pool of events - all the events are safe to be shared and mutated between threads. - it has a locker, worker, condvar and channels, eventloop streamer - to receive the task using receiver, use condvar with mutex - to lock the thread and wait for the notifier then if we want - to fill the bucket we should use either channels, or condvar mutex - */ - pub struct Bucket + Send + Sync>{ - pub signal: std::sync::Condvar, - pub broadcaster: std::sync::mpsc::Sender, - pub receiver: std::sync::mpsc::Receiver, - pub fut: F, - pub dep_injection_fut: std::pin::Pin>>, - pub worker_handler: std::thread::JoinHandle<()>, - pub queue: BufferEvent // a thread safe queue to mutate it - while !self.queue.is_empty() { pop the card out } - } - pub struct BufferEvent{ - pub data: std::sync::Arc>>, // Mutex always block the thread for mutating avoid accessing the data by other threads at the same time - pub size: usize - } - impl Drop for BufferEvent{ - fn drop(&mut self) { - self.data.lock().unwrap().clear(); // clear the whole events - } - } - /* ---------------------------------------- */ - - - // example: safe atomic data syncing between threads using mutex and condvar - #[derive(Debug, Clone)] - pub enum Color{ - Yellow, - Red, - Blue - } - #[derive(Clone, Debug)] - pub struct ExpensiveCar{ - pub brand: String, - pub color: Color - } - - impl fmt::Display for ExpensiveCar{ - fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { - let color_string = format!("{:#?}", self.color); - write!(f, "brand: {}, color: {}", self.brand, color_string) // write to the current buffer formatter - } - } - - impl ExpensiveCar { - fn new(brand: &str, color: Color) -> ExpensiveCar { - ExpensiveCar { - brand: brand.to_string(), - color, - } - } - } - - #[derive(Debug)] - pub struct Garage{ - queue: Buffer, - guard: std::sync::Condvar, // the condvar notifier notifies the blocked or waiting thread that there might be data to be consumed. - } - - #[derive(Debug)] - pub struct Buffer{ - // Arc allows us to share between multiple threads by cloning the data (each thread gets its own ref to it) - // Mutex ensures only one thread at a time are mutating the data - pub data: std::sync::Arc>>, - } - impl Buffer{ - pub fn new() -> Self{ - Self{data: std::sync::Arc::new(std::sync::Mutex::new(vec![]))} - } - - pub fn size(&self) -> usize{ - let get_data = self.data.lock().unwrap(); - get_data.len() - } - } - - #[derive(Debug, Clone)] - pub struct PooledObject{ - pub item: T, // used to push back the last item into the data buffer when the pooled object is being dropped - pub pool: std::sync::Arc>>, - } - - // if you want to park means the garage must get blocked - // by the mutex and tells other threads that i'm busy with - // parking a car wait (block) until i release the lock and - // the car gets parked in there. - impl Garage{ - - pub fn new() -> Self{ - Self { queue: Buffer::new(), guard: std::sync::Condvar::new() } - } - - pub fn pool_size(&self) -> usize{ - self.queue.size() - } - - pub fn acquire(&self) -> Option>{ - let mut get_cars = self.queue.data.lock().unwrap(); - let pool_of_expensive_cars = (*get_cars).pop() - .map(|car| PooledObject{ // mapping the poped out element into a PooledObject - item: car, - pool: self.queue.data.clone(), - }); - pool_of_expensive_cars - } - - pub fn park(&self, car: ExpensiveCar){ // try to park an expensive car inside the garage - let mut get_queue = self.queue.data.lock().unwrap(); - (*get_queue).push(car); - - // notify the blocked thread that we have pushed a new car into the garage - // since the thread will be blocked if there would be no data in queue - // hence we should notify the thread as soon as a car comes into the garage - self.guard.notify_one(); - } - pub fn get(&self) -> ExpensiveCar{ - let mut get_queue = self.queue.data.lock().unwrap(); - while get_queue.is_empty(){ - get_queue = self.guard.wait(get_queue).unwrap(); - } - - // if the blocked thread gets woken up in here we remove the - // first item from the queue and return it since the get method - // method is about returning an expensive car - (*get_queue).remove(0) - } - - } - - - /* ---------------------------------------------------------------------- - impl ownership and borrowing traits for the PooledObject - deref mutable/immutable pointer will be called by the compiler at runtime - we can also use * to deref a type and get the value out of the pointer - deref any pointer (immutable or mutable) to the item of type T. - change the value of a pointer or &mut by dereferencing it like *v = 1; - - the drop, deref and derefmut will be called automatically at runtime - when the type wants to gets dropped out of the ram and dereferenced - by * or its inner value gets accessed by during the execution. - - note: pointers contain inner value of a type so accessing the inner - value requires to deref the pointer so when we use * it allows us to - access the inner value for mutating or reading, the Deref/DerefMut - traits however can be implemented for smart pointers like Mutex, Arc, Box - making them behave like regular references. - reimplementing the Deref/DerefMut trait methods for a type allows us - to call their methods on the object when we try to access the inner value - of the type by * or - - when we use * operator the deref or derefmut trait methods will be called - by the compiler as well as for smart pointers cause smart pointers implement - the traits any type that implements the deref or derefmut traits, trait methods - will be invoked once it’s dereferenced by * - */ - impl Drop for PooledObject{ - fn drop(&mut self) { - // we must push back the item into the pool cause we've poped it out - // in acquire method - println!("dropping pooled object, pushing back the item into the pool"); - let mut get_items = self.pool.lock().unwrap(); - (*get_items).push(self.item.clone()); - } - } - impl Deref for PooledObject{ // dereference the immutable pointer of type T which is the type of the pooled object item - type Target = T; - // compiler calls the deref() method once the type gets dereferenced by * immutably - fn deref(&self) -> &Self::Target { - println!("dereferencing pooled object"); - &self.item - } - } - impl DerefMut for PooledObject{ // dereference the mutable pointer of type T which is the type of the pooled object item - // this is useful when we have a mutable pointer to the last item like `ref mut` - // compiler calls the deref() method once the type gets dereferenced by * mutably - fn deref_mut(&mut self) -> &mut Self::Target { - println!("dereferencing pooled object mutably"); - &mut self.item - } - } - - /* - actor workers have os or ligh thread of executions (see actor.rs) - they can communicate with other parts through message passing techniques - using jobq channels, they're basically an smart threadpool object - with an isolated state from each other and are safe to mutate data - in their states. there must also be a task executor or some runtime - scheduler to handle task execution process in each thread. - tools: threadpool, mutex, spawn, arc, channels, executor and eventloop - */ - // worker threads - pub struct Handlers{ - pub producer: std::thread::JoinHandle<()>, - pub consumer: std::thread::JoinHandle<()> - } - - impl Handlers{ - pub fn new() -> Self{ - Self { producer: std::thread::spawn(move||{}), consumer: std::thread::spawn(move ||{}) } - } - pub fn produce(&mut self, garage: std::sync::Arc){ - // create 10 new expensive cars in a thread - // the producer threads, produces 10 expensive cars to park them - let cloned_garage = garage.clone(); - self.producer = thread::spawn(move || { - for i in 0..10{ - let new_expensive_car = ExpensiveCar::new("BMW", Color::Red); - cloned_garage.park(new_expensive_car); - } - }); - } - pub fn consume(&mut self, garage: std::sync::Arc){ - // get all the 10 cars from the garage one by one - // the consumer thread, consumes or get 10 expensive car from the garage - // since the garage queue is a lock hence we should wait and block the - // thread per each consumed data - self.consumer = thread::spawn(move ||{ - for i in 0..10{ - let get_expensive_car_from_queue = garage.get(); - println!("got a car: {}", get_expensive_car_from_queue); - } - }); - } - - } - - // with mutex and condvar we can control the producer and consumer - // threads at the same time, the first one is used to produce the - // data and the second one is used to consume the data by locking - // the data one at a time. - - let garage = std::sync::Arc::new(Garage::new()); - - - // ---------------------------------------------------------- - // poping all the cars out of the garage using consumer thread - // ---------------------------------------------------------- - let mut handler = Handlers::new(); - handler.produce(garage.clone()); // fill the garage - // handler.consume(garage.clone()); // pop out the cars one by one - - // join the both producer and consumer threads at the same time - // the first one is responsible for creating then parking cars - // inside the garage queue and second one is responsible for - // getting cars one by one from the garage, since the garage - // queue is a locker we should consider this that at the time of - // parking a new car we should notify the blocked thread since - // the thread or the consumer is blocked until a new card comes - // into the parking. - let Handlers { producer, consumer } = handler; - producer.join().unwrap(); - // consumer.join().unwrap(); - - // ---------------------------------------------------------- - // poping all the cars out of the garage using pooled object - // ---------------------------------------------------------- - - // testing pooledobject with mutable and immutable pointers to the - // the last item of type T - let pool = garage.clone(); - - // scope1 - { // define a new scope for fetching cars - let mut get_poped_out_item_with_pool = pool.acquire(); // acquireing the pooledobject - match get_poped_out_item_with_pool{ - // poped_out_item_with_pool is a mutable poitner to the PooledObject - // when dropping the object it calls the deref mut trait - Some(ref mut poped_out_item_with_pool) => { - println!("scope1 pooledobject : {:#?}", poped_out_item_with_pool.pool); - poped_out_item_with_pool.color = Color::Yellow; - }, - None => println!("there is no item!"), - } - - // after the scope has ended this object is dropped and automatically the last item returned to the pool - // since we're dereferencing mutably to mutate the color of the car the DerefMut trait will be called during - // this process and the log message will be printed out to the console. - } - - println!("-------------------------------------------"); - println!("pool size is : {:#?}", pool.pool_size()); - - // scope2 - let get_car = pool.acquire(); - match get_car{ - Some(ref car) => println!("scope2 pooledobject: {:?}", car), - None => println!("there is no item!"), - }; - - println!("-------------------------------------------"); - println!("pool size is : {:#?}", pool.pool_size()); - - // scope3 - let last_car = pool.acquire(); - match last_car{ - Some(ref car) => println!("scope3 pooledobject: {:?}", car), - None => println!("there is no item!"), - }; - - println!("-------------------------------------------"); - println!("pool size is : {:#?}", pool.pool_size()); - - -} - -pub async fn jobQChannelFromScratch(){ - - // use trait to pass different types to a function through a single interface - // use Any to try to cast any type that impls Any trait into an specific type - // pass different functions to a method using Fn closure - // dependency injection for observer field inside the Mutex (it can be other smart pointers like Arc and Box also) - - trait GetVal{ - fn getVal(&self) -> Self; - } - impl GetVal for T{ - fn getVal(&self) -> Self { - self.clone() - } - } - - struct Person{ - info: V, - // dependency injection with Mutex like Box smart pointers - subscribers: Vec>>, - } - - impl Person{ - pub fn new(info: V) -> Self{ - Self { info, subscribers: Vec::new() } - } - - // hanlding dynamic dispatch, supports any type through a single interface - pub fn set_info(&mut self, val: V){ - - // cast the value into the Any trait, we could use Box also - let any_val = &val as &dyn Any; - match any_val.downcast_ref::(){ - Some(string) => { - // ... - }, - None => { - println!("can't downcast it to string"); - } - }; - - self.info = val.getVal(); - self.publish(val); // name has changed - } - - // moving out of a type which is behind pointer takes - // the ownership of the type so we can't do this if the type is - // being used by the function scope and is behind a pointer, - // like self.name which returning it takes the ownership of - // the &self, Rust won't allow to return a pointer from the - // function since once the function gets executed all the inner - // types will be dropped out of the ram and having a pointer outside - // of the function would be meaningless unless we specify a valid - // lifetime for the pointer like using the self lifetime or pass - // directly to the function call addressing this issue would be either - // returning a pointer to the self.name or clone the self. - pub fn get_info(&self) -> &V{ - &self.info - } - - // this method adds new subscriber closure function to the current ones - pub fn subscribe(&mut self, f: F){ - self.subscribers.push(std::sync::Arc::new(std::sync::Mutex::new(f))); - } - - // this method publish a new value to all subscribers by iterating through - // all of them and call each closure function of them by passing the value - // the closure function body will be executed for each of the subscriber - fn publish(&self, value: V){ // this method notify all subscribers by passing the new value to each subscriber function - for subscriber in &self.subscribers{ // use & or clone the vector - if let Ok(subscriber_function) = subscriber.lock(){ - subscriber_function(value.clone()) // notify the subscriber with a new name, call the closure function and pass the new name to see the log - } - } - } - - } - - - // since the subscribe method has &mut self, in order to call - // the method on the instance in another thread we should wrap - // the instance in Mutex - let person = std::sync::Arc::new( - std::sync::Mutex::new( - Person::new(String::from("wildonion")) - ) - ); - - let cloned_person = std::sync::Arc::clone(&person); - - // -------------------------------------------------------- - // working with person object completely in another thread. - // -------------------------------------------------------- - let thread1 = thread::spawn(move ||{ - let mut person = cloned_person.lock().unwrap(); - person.subscribe(move |info| { - // the subscription logic goes here - // for now we're just logging things! - println!("[subthread subscriber]"); - println!("subscribing > value is : {}", info); - }); - - // updating the info field, will notify all subscribers - // and call their functions, which the logic has written right above - person.set_info(String::from("new wildonion")); - }); - - // ------------------------------------------------------------- - // working with person object completely inside the main (caller) thread. - // ------------------------------------------------------------- - // block the main (caller) thread for subscription - // subscribe() method push a new subscriber to the vector only - person.lock().unwrap().subscribe(move |info|{ - println!("[main (caller) thread subscriber]"); - println!("subscribing > value is : {}", info) - }); - - // set_info() change the info field as well as notify subscribers - // with the updated value - // block the main (caller) thread for changing the ingo - person.lock().unwrap().set_info(String::from("28")); - - // block the main (caller) thread to wait for the thread to complete the task - // wait for the thread to finish, this method returns immediately if the - // thread has already finished, so joining on the thread can be important - // if we need a result coming from the thread otherwise the thread will - // be solved in the background like tokio spawn threads. - thread1.join().unwrap(); - - -} \ No newline at end of file diff --git a/src/tests/mod.rs b/src/tests/mod.rs old mode 100755 new mode 100644 index e6c2d97..fab8330 --- a/src/tests/mod.rs +++ b/src/tests/mod.rs @@ -1,11 +1,4 @@ - -// https://crates.io/crates/testcontainers -// https://nexte.st/ - pub mod orex; -pub mod tx; -pub mod actor; -pub mod engine; -pub mod task; \ No newline at end of file +pub mod tx; \ No newline at end of file diff --git a/src/tests/task.rs b/src/tests/task.rs deleted file mode 100644 index 608e7d5..0000000 --- a/src/tests/task.rs +++ /dev/null @@ -1,345 +0,0 @@ - - - -use std::{ops::DerefMut, sync::atomic::{AtomicU64, AtomicUsize}}; -use crate::*; -use interfaces::task::TaskExt; -use tracing::span_enabled; -use types::Job; - - - -/* - async tasks or jobs that will be executed in the background inside a lightweight thread of - execution using tokio::spawn() task scheduler and jobq based channels; actor workers will run - these tasks in their own execution context like what i've simulated here. - typically any instance of the Task which is kina actor must contains: - - the future job itself - - the sender to send the result of executed task to the channel for using outside of the thread - - a thread safe (Mutex) worker as the background worker thread to execute the future job in it - - a locker to lock the task when it's executing the task - the task can be awaited to complete its future or aborted during the dropping process of the - task instance, tokio::spawn() is the backbone of each background worker, it gets a future and - move it into a lightweight thread of execution. - lock the instance using the lock field to check that if we're doing some heavy process or not - then in switching the task or doing other heavy process check the lock that if the instance - is already locked or not also we should lock the worker if we want to execute something in the - background worker of the instance thread to tell obj caller that the worker is busy rn. - - more details: - locker, threadpool, worker, future io task, eventloop, - sender, signal condvar, job tree, dep inj future job: - - cron scheduler method - - execute in worker method - - receive from eventloop and exec in threadpool method - - instance locker method - - future object - job tree to push the job into the current tree - sender to broadcast or publish some data to a channel - an eventloop to receive a data from the channel or the queue to execute it in the background worker thread - background worker to run the job - locker to lock the task instance when a task is being executed - worker thread of type joinHandle to execute task or job of type async io or cpu tasks - threadpool to execute each task when receives them from mpsc recevier eventloop - atomic syncing with channels and mutex -*/ -// #[derive(Debug)] // don't implement this cause Pin doesn't implement Debug -pub struct Task, S, O> where // J is a Future object and must be executed with Box::pin(job); - J: std::future::Future + Send + Sync + 'static + Clone, - O: Send + Sync + 'static, - J::Output: Send + Sync + 'static -{ - pub status: TaskStatus, - pub id: String, - pub name: String, // like send_mail task - /* - Pin is a wrapper around some kind of pointer Ptr which makes - that pointer "pin" its pointee value in place, thus preventing - the value referenced by that pointer from being moved or otherwise - invalidated at that place in memory unless it implements Unpin - which means tha type type doesn't require to be pinned into - the ram, self ref types must implement !Unpin or must be pinned - */ - pub dep_injection_fut_obj: std::pin::Pin + Send + Sync + 'static>>, // a future as separate type to move between scopes - pub job: J, - pub job_tree: Vec>, - pub sender: tokio::sync::mpsc::Sender, // use this to send the result of the task into the channel to share between other lightweight thread workers - pub eventloop_queue: std::sync::Arc>>, // use this as eventloop to execute tasks as they're coming from the channel in the background worker thread - pub pool: Vec>, - pub worker: std::sync::Mutex>, // execute the task inside the background worker, this is a thread which is safe to be mutated in other threads - pub lock: std::sync::Mutex<()>, // the task itself is locked and can't be used by other threads - pub state: std::sync::Arc>> // the state of the worker must be safe to be shared between threads -} - -// thread safe eventloop and queue: arc mutex vec T vs arc mutex receiver T -pub struct QueueAndEventLoop{ - pub eventloop: std::sync::Arc>>, - pub queue: std::sync::Arc>>, -} - - -impl + Send + Sync + 'static + Clone, S: Sync + Send + 'static> - Task - where O: std::any::Any + Send + Sync + 'static{ - - pub async fn new(job: J, num_threads: usize, - sender: tokio::sync::mpsc::Sender, - eventloop_queue: std::sync::Arc>>, - fut_output: O) -> Self{ - - // sender and receiver - - let task = Self{ - status: TaskStatus::Initializing, - id: Uuid::new_v4().to_string(), - name: String::from("KJHS923"), - job: job.clone(), - dep_injection_fut_obj: Box::pin(async move{ fut_output }), // pinning the future into the ram with the output of type O - sender, - pool: { - (0..num_threads) - .map(|_| tokio::spawn(job.clone())) - .collect::>>() - }, - eventloop_queue, - job_tree: vec![], - worker: { // this is the worker that can execute the task inside of itself, it's basically a lightweight thread - std::sync::Mutex::new( // lock the worker - tokio::spawn(job) - ) - }, - state: std::sync::Arc::new(tokio::sync::Mutex::new(vec![])), - lock: Default::default(), - }; - - task - - } - - pub async fn send(&self, d: S){ - let sender = self.sender.clone(); - sender.send(d).await; - } - - pub fn is_busy(&mut self) -> bool{ - self.lock.try_lock().is_err() // is_err() can be either true or false, trying to acquire the lock - } - - pub async fn spawn(&self){ - - let job = self.job.clone(); - tokio::spawn(job); - } - - // method to execute the job in the task worker - pub async fn execute(&mut self){ - - // wailt until the lock gets freed cause we're pushing tasks into the tree - // if we slide down into the while loop means the method returns true which - // means the lock couldn't get acquired - while self.is_busy(){ - tokio::time::sleep(tokio::time::Duration::from_secs(10)).await; - } - - let t = self.job.clone(); // clone to prevent from moving - let mut get_worker = self.worker.try_lock().unwrap(); // lock the worker - (*get_worker) = tokio::spawn(t); - } - - pub async fn switch_task(&mut self, task: J){ - - // wailt until the lock gets freed cause we're pushing tasks into the tree - // if we slide down into the while loop means the method returns true which - // means the lock couldn't get acquired - while self.is_busy(){ - tokio::time::sleep(tokio::time::Duration::from_secs(10)).await; - } - - let mut get_worker = self.worker.lock().unwrap(); - (*get_worker) = tokio::spawn(task); - - } - - pub fn push(mut self, tasks: Vec>) -> Task{ - - // lock the instance to push tasks into the tree - self.lock.lock().unwrap(); - self.job_tree.extend(tasks); - self - } - - // task lifecycles - pub fn halt(&mut self){ - self.status = TaskStatus::Hanlted; - } - - /* ------------------------------------------------------------- - since futures are object safe trait hence they have all traits - features we can pass them to the functions in an static or dynamic - dispatch way using Arc or Box or impl Future or event as the return - type of a closure trait method: - returning reference or box to dyn trait by casting the type who impls the trait into the trait - dep injection object safe trait using & and smart pointers dyn - future as generic in return type of closure or function or pinning its box - std::pin::Pin + Send + Sync + 'static> - Arc R + Send + Sync + 'static> where R: std::future::Future + Send + Sync + 'static - Box R + Send + Sync + 'static> where R: std::future::Future + Send + Sync + 'static - Arc R + Send + Sync + 'static>> where R: std::future::Future + Send + Sync + 'static - F: std::future::Future + Send + Sync + 'static - param: impl std::future::Future + Send + Sync + 'static - - NOTE: mutex requires the type to be Sized and since traits are - not sized at compile time we should annotate them with dyn keyword - and put them behind a pointer with valid lifetime or Box and Arc smart pointers - so for the mutexed_job we must wrap the whole mutex inside an Arc or annotate it - with something like &'valid tokio::sync::Mutex R + Send + Sync + 'static> - the reason is that Mutex is a guard and not an smart pointer which can hanlde - an automatic pointer with lifetime - */ - pub async fn cron_scheduler(&mut self, - boxed_job: Box R + Send + Sync + 'static>, - mutexed_job: std::sync::Arc R + Send + Sync + 'static>>, - arced_job: std::sync::Arc R + Send + Sync + 'static>) - where - R: std::future::Future + Send + Sync + 'static{ - - let mut interval = tokio::time::interval(tokio::time::Duration::from_secs(10)); - tokio::spawn(async move{ - loop{ - interval.tick().await; - arced_job().await; - } - }); - } - -} - -// once the task gets dropped drop any incomplete futures inside the worker -impl + Send + Sync + 'static + Clone, S> Drop for Task where - O: Send + Sync + 'static{ - fn drop(&mut self) { // use std::sync::Mutex instead of tokio cause drop() method is not async - if let Ok(job) = self.worker.lock(){ - job.abort(); // abort the current future inside the joinhandle - } - } -} - -#[derive(Clone, Debug, Default)] -pub enum TaskStatus{ - #[default] - Initializing, - Executed, - Hanlted -} - -type FutureTraitObject = std::pin::Pin + Send + Sync + 'static>>; -impl + Send + Sync + 'static + Clone, - S: Send + Sync + 'static, O: Send + Sync + 'static> - Task - where J::Output: Send + Sync + 'static{ - - pub async fn spawn_task< // none assoc method - F: std::future::Future + Send + Sync + 'static, - R: Send + Sync + 'static, - V: FnOnce(O) -> R + Send + Sync + 'static - >( - fut1: F, - pinned_fut: FutureTraitObject, // future as trait object - input: O, - fut: impl std::future::Future + Send + Sync + 'static, - func: V - ){ - tokio::spawn(fut); - tokio::spawn(async move{ func(input) }); - } - -} - -impl + - Send + Sync + 'static + Clone, S> TaskExt for Task - where - J::Output: Send + Sync + 'static, - ::Output: Send + Sync + 'static{ - - type State = String; - type Task = Self; - - async fn execute_this(&self, t: String) { - - let this = self.clone(); - let job = this.job.clone(); - tokio::spawn(job); // job is of type future, we're executing it inside another free thread - - } -} - -/* ----------------------------------------------------- */ -// a thread safe task tree executor -/* ----------------------------------------------------- -| use smart pointers to break the cycle of self ref -| types, in here we're creating a node for the entire -| task tree which contains a reference to the itself -| it gets executed in BFS order. -| -*/ - -pub struct TaskTree< - J: std::future::Future + Send + Sync + 'static + Clone, - S, O: Send + Sync + 'static>{ - // wrap it around mutex to share the task between threads cause we - // want to execute the task in a light thread without blocking so - // we need to move the reference of the task into the thread which - // can be done via mutex since it's an smart pointer for sharing data - // safely between threads - pub task: tokio::sync::Mutex>, - pub weight: std::sync::atomic::AtomicU8, - pub parent: std::sync::Arc>, // the parent itself - pub children: std::sync::Mutex>>> // vector of children -} - -impl + Send + Sync + 'static + Clone + std::fmt::Debug, - S: std::fmt::Debug + Send + Sync + 'static, O: std::fmt::Debug + Send + Sync + 'static> - TaskTree{ - - // execute all tasks in bfs order in none binary tree - pub fn execute_all_tasks(&mut self, root: std::sync::Arc>){ - let mut queue = vec![root]; - while !queue.is_empty(){ - let get_node = queue.pop(); // pop the child out - if get_node.is_some(){ - let node = get_node.unwrap(); - let cloned_node = node.clone(); - - // executing the task in the background light thread in a none - // blocking io manner, we tried to acquire the lock of the value - // in a separate thread to avoid blocking the current thread for doing so - tokio::spawn(async move{ - let mut task = cloned_node.task.lock().await; - println!("[*] executing the task with id: {:?}", task.id); - // this method contains a locking process on the task itself so it's better - // to execute it in a separate light io thread - task.execute().await; - }); - - let get_children = node.children.try_lock().unwrap(); - let children = get_children.to_vec(); - for child in children{ - queue.push(child); - } - } - } - } - - pub fn push_task(&mut self, child: std::sync::Arc>){ - let mut get_children = self.children.try_lock().unwrap(); - (*get_children).push(child); - } - - pub fn pop_task(&mut self) -> Option>>{ - let mut get_children = self.children.try_lock().unwrap(); - let poped_task = (*get_children).pop(); - poped_task - } - -} \ No newline at end of file diff --git a/src/tests/tx.rs b/src/tests/tx.rs index 55f0aec..beba12f 100644 --- a/src/tests/tx.rs +++ b/src/tests/tx.rs @@ -4,8 +4,6 @@ use std::sync::atomic::AtomicU8; use interfaces::payment::PaymentProcess; use models::event::ActionType; use notif::PublishNotifToRmq; -use rand_chacha::ChaCha12Core; -use salvo::rate_limiter::QuotaGetter; use wallexerr::misc::Wallet; use crate::interfaces::tx::TransactionExt; use crate::*; @@ -358,132 +356,66 @@ impl ActixMessageHandler for StatelessTransactionPool{ } -pub async fn any_type_dyn_stat_dispatch(){ - - // --------------------------------------------------------- - // SOLID BASED DESIGN PATTERN FOR PAYMENT - // --------------------------------------------------------- - // there should be always a rely solution on abstractions like - // implementing trait for struct and extending its behaviour - // instead of changing the actual code base and concrete impls. - struct PayPal; // paypal gateway - struct ZarinPal; // zarinpal gateway - - struct PaymentWallet{ - pub owner: String, - pub id: uuid::Uuid, - pub transactions: Vec // transactions that need to be executed - } - - impl PaymentProcess for PaymentWallet{ - type Status = AtomicU8; - - // future traits as objects must be completelly a separate type - // which can be achieved by pinning the boxed version of future obj - type Output = std::pin::Pin>>; - - type Wallet = Self; - - async fn pay(&self, gateway: PayPal) -> Self::Output { - - // either clone or borrow it to avoid from moving out of the self - // cause self is behind reference which is not allowed by Rust - // to move it around or take its ownership. - let txes: &Vec = self.transactions.as_ref(); - - // process all transactions with the paypal gateway - // ... - - todo!() - - } - } - - impl PaymentProcess for PaymentWallet{ - type Status = AtomicU8; +// --------------------------------------------------------- +// SOLID BASED DESIGN PATTERN FOR PAYMENT +// --------------------------------------------------------- +// there should be always a rely solution on abstractions like +// implementing trait for struct and extending its behaviour +// instead of changing the actual code base and concrete impls. +struct PayPal; // paypal gateway +struct ZarinPal; // zarinpal gateway + +struct PaymentWallet{ + pub owner: String, + pub id: uuid::Uuid, + pub transactions: Vec // transactions that need to be executed +} + +impl PaymentProcess for PaymentWallet{ + type Status = AtomicU8; + + // future traits as objects must be completelly a separate type + // which can be achieved by pinning the boxed version of future obj + type Output = std::pin::Pin>>; - // future traits as objects must be completelly a separate type - // which can be achieved by pinning the boxed version of future obj - type Output = std::pin::Pin>>; + type Wallet = Self; + + async fn pay(&self, gateway: PayPal) -> Self::Output { + + // either clone or borrow it to avoid from moving out of the self + // cause self is behind reference which is not allowed by Rust + // to move it around or take its ownership. + let txes: &Vec = self.transactions.as_ref(); - type Wallet = Self; - - async fn pay(&self, gateway: ZarinPal) -> Self::Output { - - // either clone or borrow it to avoid from moving out of the self - // cause self is behind reference which is not allowed by Rust - // to move it around or take its ownership. - let txes: &Vec = self.transactions.as_ref(); - - // process all transactions with the paypal gateway - // ... - - - todo!() - - } - } + // process all transactions with the paypal gateway + // ... + + todo!() - // spawn a tokio thread for every request in a lightweight - async fn getCode(param: O) - -> impl std::future::Future + Send + Sync + 'static{ - // the return type is a type which impls the trait directly through - // static dispatch - async move{ - param - } - } - tokio::spawn(getCode(String::from("wildonion"))); - - /* - Access different types through a single interface to use common method of traits with either default - or trait implementations we can impl the trait broadly for any possible types using impl Trair for T{} - instead of implementing for every single type manually box pin, box dyn trait impl trait for dyn stat - and poly implementations. - is, the type has been erased. As such, a dyn Trait reference contains two pointers. One pointer goes - to the data (e.g., an instance of a struct). Another pointer goes to a map of method call names to - function pointers (known as a virtual method table or vtable). - At run-time, when a method needs to be called on the dyn Trait, the vtable is consulted to get the - function pointer and then that function pointer is called. - See the Reference for more information on trait objects and object safety. - Trade-offs - The above indirection is the additional runtime cost of calling a function on a dyn Trait. Methods - called by dynamic dispatch generally cannot be inlined by the compiler. - However, dyn Trait is likely to produce smaller code than impl Trait / generic parameters as the - method won't be duplicated for each concrete type. - */ - trait AnyTypeCanBe1: Send + Sync + 'static{ - fn getNickName(&self) -> String{ - String::from("") - } } - impl AnyTypeCanBe1 for T{} - struct InGamePlayer{} - let player = InGamePlayer{}; - player.getNickName(); // don't need to impl AnyTypeCanBe1 for InGamePlayer cause it's already implemented for any T +} + +impl PaymentProcess for PaymentWallet{ + type Status = AtomicU8; + // future traits as objects must be completelly a separate type + // which can be achieved by pinning the boxed version of future obj + type Output = std::pin::Pin>>; - // handling pushing into the map using trait polymorphism - trait AnyTypeCanBe{} - impl AnyTypeCanBe for T{} // impl AnyTypeCanBe for every T, reduces the time of implementing trait - let any_map1: std::collections::HashMap>; - let mut any_map1 = std::collections::HashMap::new(); - any_map1.insert(String::from("wildonion"), Box::new(0)); - // or - // any_map1.insert(String::from("wildonion"), Box::new(String::from(""))); - - // to have any types we can dynamically dispatch the Any trait which is an object safe trait - type AnyType = Box; - let any_map: std::collections::HashMap; // the value can be any type impls the Any trait - let boxed_trait_object: Box; // Boxed trait object - let arced_trait_object: std::sync::Arc; // thread safe trait object - - fn getTrait(t: &(dyn AnyTypeCanBe + Send)){ // dynamic dispatch + type Wallet = Self; - } - fn getTrait1(t: impl AnyTypeCanBe + Send){ // static dispatch + async fn pay(&self, gateway: ZarinPal) -> Self::Output { + + // either clone or borrow it to avoid from moving out of the self + // cause self is behind reference which is not allowed by Rust + // to move it around or take its ownership. + let txes: &Vec = self.transactions.as_ref(); + + // process all transactions with the paypal gateway + // ... - } + todo!() + } } \ No newline at end of file diff --git a/src/workers/cqrs/mutators/hoop.rs b/src/workers/cqrs/mutators/hoop.rs index bba07fa..5942db4 100644 --- a/src/workers/cqrs/mutators/hoop.rs +++ b/src/workers/cqrs/mutators/hoop.rs @@ -5,7 +5,6 @@ use salvo::rate_limiter::QuotaGetter; use sea_orm::{ActiveModelTrait, ActiveValue, ColumnTrait, ConnectionTrait, EntityTrait, QueryFilter, Statement, TryIntoModel, Value}; use serde::{Serialize, Deserialize}; use actix::prelude::*; -use tonic::IntoRequest; use std::sync::Arc; use std::thread; use actix::{Actor, AsyncContext, Context}; diff --git a/src/workers/notif/mod.rs b/src/workers/notif/mod.rs index 6de5d59..a19c556 100644 --- a/src/workers/notif/mod.rs +++ b/src/workers/notif/mod.rs @@ -43,9 +43,6 @@ use workers::scheduler::CronScheduler; use crate::*; use deadpool_lapin::lapin::protocol::channel; use deadpool_redis::redis::AsyncCommands; -use base58::FromBase58; -use constants::CRYPTER_THEMIS_ERROR_CODE; -use constants::FILE_ERROR_CODE; use deadpool_redis::redis::RedisResult; use futures::executor; use models::event::NotifData; @@ -89,6 +86,10 @@ use crate::interfaces::crypter::Crypter; queue per each consumer. offset in kafka is an strategy which determines the way of tracking the sequential order of receiving messages by kafka topics it's like routing key in rmq + in kafka you should create consumer and producer separately but in rmq everything is + started from a channel, we'll create a channel to declare the queue, exchange, consumer + and producer in that channel, channel is a thread that can manage multiple connection + to the broker through a single tcp connection. BROKER TYPES: (preferred stack: RMQ + RPC + WebSocket + ShortPollingJobId) → REDIS PUBSUB => light task queue @@ -1712,7 +1713,9 @@ impl NotifBrokerActor{ // -ˋˏ✄┈┈┈┈ publishing to exchange from this channel, // later consumer bind its queue to this exchange and its // routing key so messages go inside its queue, later they - // can be consumed from the queue by the consumer + // can be consumed from the queue by the consumer. + // in direct exchange ("") it has assumed that the queue + // name is the same as the routing key name. use deadpool_lapin::lapin::options::BasicPublishOptions; let payload = data.as_bytes(); match chan @@ -1743,6 +1746,10 @@ impl NotifBrokerActor{ return; // needs to terminate the caller in let else pattern }; + if confirmation.is_ack(){ + log::info!("publisher confirmation is acked"); + } + }, Err(e) => { use crate::error::{ErrorKind, HoopoeErrorResponse};