Skip to content

Commit

Permalink
chore:
Browse files Browse the repository at this point in the history
  • Loading branch information
wildonion committed Sep 27, 2024
1 parent d34ecb6 commit 3ac947c
Show file tree
Hide file tree
Showing 7 changed files with 123 additions and 24 deletions.
Binary file modified .DS_Store
Binary file not shown.
67 changes: 61 additions & 6 deletions .github/workflows/ui.yml
Original file line number Diff line number Diff line change
Expand Up @@ -7,21 +7,76 @@ on:
branches: [main]

jobs:
build:
setup:
runs-on: ubuntu-latest
steps:
- name: Deploy & Build
# Step 1: Install NVM and Node.js on the remote server
- name: Install NVM and Node.js
uses: appleboy/ssh-action@master
with:
host: ${{ secrets.SERVER_HOST }}
username: ${{ secrets.SERVER_USER }}
password: ${{ secrets.SERVER_PASSWORD }}
port: 22
script: |
# Install NVM
curl -o- https://raw.githubusercontent.com/nvm-sh/nvm/v0.40.1/install.sh | bash
export NVM_DIR="$HOME/.nvm"
[ -s "$NVM_DIR/nvm.sh" ] && \. "$NVM_DIR/nvm.sh" # Load nvm
[ -s "$NVM_DIR/bash_completion" ] && \. "$NVM_DIR/bash_completion" # Load nvm bash_completion
# Install Node.js using NVM
nvm install --lts
nvm use --lts
node -v # Check the installed version of Node.js
npm -v # Check the installed version of npm
# Step 2: Clone the repository via SSH and navigate to the correct directory
- name: Deploy & Clone Repository
uses: appleboy/ssh-action@master
with:
host: ${{ secrets.SERVER_HOST }}
username: ${{ secrets.SERVER_USER }}
password: ${{ secrets.SERVER_PASSWORD }}
port: 22
script: | # cd SomeUiRepo directly cause we're already in root
if [ ! -d "/root/ui" ]; then
script: |
if [ ! -d "/root/GOPAT" ]; then
git clone https://wildonion:${{ secrets.ACCESS_TOKEN }}@github.com/wildonion/SomeUiRepo.git
fi
cd SomeUiRepo
cd GOPAT
git stash
git pull origin main
npm install
pwd
# Step 3: Run npm install to set up the dependencies
- name: Install NPM Dependencies
uses: appleboy/ssh-action@master
with:
host: ${{ secrets.SERVER_HOST }}
username: ${{ secrets.SERVER_USER }}
password: ${{ secrets.SERVER_PASSWORD }}
port: 22
script: |
export NVM_DIR="$HOME/.nvm"
[ -s "$NVM_DIR/nvm.sh" ] && \. "$NVM_DIR/nvm.sh" # Load nvm
nvm use --lts
cd /root/GOPAT
npm install # Install npm dependencies
build:
runs-on: ubuntu-latest
needs: setup
steps:
# Step 4: Run npm build after dependencies are installed
- name: Build Project
uses: appleboy/ssh-action@master
with:
host: ${{ secrets.SERVER_HOST }}
username: ${{ secrets.SERVER_USER }}
password: ${{ secrets.SERVER_PASSWORD }}
port: 22
script: |
export NVM_DIR="$HOME/.nvm"
[ -s "$NVM_DIR/nvm.sh" ] && \. "$NVM_DIR/nvm.sh" # Load nvm
nvm use --lts
cd /root/GOPAT
npm run build
2 changes: 1 addition & 1 deletion README.md
Original file line number Diff line number Diff line change
Expand Up @@ -93,7 +93,7 @@ i'm hoopoe, a realtime social event platform allows your hoop get heard!
```bash
# -----------------------
# ---- read/write access
sudo chmod -R 777 .
sudo chmod -R 777 . && sudo chmod +x /root/
```

#### step0) install necessary packages on Linux:
Expand Down
2 changes: 1 addition & 1 deletion src/server/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -347,7 +347,7 @@ impl HoopoeWsServerActor{
// thread of tokio runtime.
scheduler::runInterval(move || async move{
log::info!("websocket session for user#{} is alive", user_id);
}, 10).await;
}, 10, 0).await;

/* -------------------------
can't move pointers which is not live long enough into
Expand Down
3 changes: 3 additions & 0 deletions src/tests/tx.rs
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,9 @@ pub static WALLET: Lazy<std::sync::Arc<tokio::sync::Mutex<wallexerr::misc::Walle
safe tx execution without double spending issue using tokio spawn select mutex and channels
finally tx pool service publishes the result of executed each tx into the exchange
create tx object with unique id -> publish to rmq
receive tx in txpool -> commit tx -> execute tx -> update records -> record in treasury
all or none to avoid double spending which is sending same amount for two destinations but charge only once
once a tx object is made publish it to the rmq exchange so consumer
can consume it for committing and executing all tx objects finally
Expand Down
60 changes: 46 additions & 14 deletions src/workers/notif/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,17 @@
through producing messages to the broker. we can send either producing or consuming
message to this actor to start producing or consuming in the background.
how capnpc works?
in RPC every method call is a round trip in networking, canpnp pack all calls together
in only one round trip, it uses the promise pipelining feature which every call is a
future object which can be solved by awaiting in which it returns all the results from
all the calls sent to the server it's like `foo().bar().end()` takes only 1 round trip
which by awaiting on them it returns all the result from the server, it can call methods
without waiting just take a round trip. call results are returned to the client before
the request even arrives at the server, this is the feature of promise it's a place
holder for the result of each call and once we await on them all the results will be
arrived in one round trip.
************************************************************************************
it's notable that for realtime push notif streaming we MUST start consuming from
the specified broker passed in to the message structure when talking with actor, in
Expand Down Expand Up @@ -112,7 +123,7 @@ use crate::interfaces::crypter::Crypter;
----------> partition-key1 queue(m1, m2, m3, ...) - all messages with key1
--------- ------------ |
|consumer1| <-----consume-----> |Kafka Broker| <-----topic-----> partition-key3 queue(m1, m2, m3, ...) - all messages with key3
--------- ------------ |
--------- ------------ | |
|_______partition1&2______________| | |----------> partition-key2 queue(m1, m2, m3, ...) - all messages with key2
--------- | | |
|consumer2| | | ----------> partition-key4 queue(m1, m2, m3, ...) - all messages with key4
Expand Down Expand Up @@ -153,11 +164,19 @@ use crate::interfaces::crypter::Crypter;
========================================================================================
*/

#[derive(Message, Clone, Serialize, Deserialize, Debug, Default, ToSchema)]
#[rtype(result = "()")]
pub struct SendRpcMessage{ // used to send rpc request through rmq queue, good for actor communication directly through rpc backed by rmq
pub reqQueue: String,
pub repQueue: String,
pub payload: String, // json string maybe! we'll convert it to u8 bytes eventually
}

#[derive(Message, Clone, Serialize, Deserialize, Debug, Default, ToSchema)]
#[rtype(result = "()")]
pub struct PublishNotifToKafka{
pub topic: String,
pub brokers: String,
pub brokers: String, // kafka servers separated by comma
pub partitions: u64,
pub headers: Vec<KafkaHeader>,
pub local_spawn: bool,
Expand All @@ -176,7 +195,7 @@ pub struct KafkaHeader{
pub struct ConsumeNotifFromKafka{ // we must build a unique consumer per each consuming process
pub topics: Vec<String>,
pub consumerGroupId: String,
pub brokers: String,
pub brokers: String, // kafka servers separated by comma
pub redis_cache_exp: u64,
pub decryptionConfig: Option<CryptoConfig>
}
Expand Down Expand Up @@ -295,7 +314,7 @@ pub struct ConsumeNotifFromRmq{ // we'll create a channel then start consuming b

#[derive(Clone)]
pub struct NotifBrokerActor{
pub notif_broker_sender: tokio::sync::mpsc::Sender<String>, // use to send notif data to mpsc channel for ws
pub notif_broker_ws_sender: tokio::sync::mpsc::Sender<String>, // use to send notif data to mpsc channel for ws
pub app_storage: std::option::Option<Arc<Storage>>, // REQUIRED: communicating with third party storage
pub notif_mutator_actor: Addr<NotifMutatorActor>, // REQUIRED: communicating with mutator actor to write into redis and db
pub zerlog_producer_actor: Addr<ZerLogProducerActor> // REQUIRED: send any error log to the zerlog rmq queue
Expand All @@ -307,13 +326,16 @@ impl Actor for NotifBrokerActor{

fn started(&mut self, ctx: &mut Self::Context) {

log::info!("🎬 NotifBrokerActor has started");
log::info!("🎬 NotifBrokerActor has started");

ctx.run_interval(PING_INTERVAL, |actor, ctx|{

let this = actor.clone();
let addr = ctx.address();
let actorState = ctx.state();

log::info!("NotifBrokerActor state is: {:#?}", actorState);

tokio::spawn(async move{

// check something constantly, schedule to be executed
Expand All @@ -325,6 +347,15 @@ impl Actor for NotifBrokerActor{
});

}
fn stopped(&mut self, ctx: &mut Self::Context) {

// stop internal states
// since db is only Arced we can't mutate it we should wrap it around Mutex
// cause Arc requires data to be mutexed
// ...

}

}

impl NotifBrokerActor{
Expand All @@ -341,7 +372,7 @@ impl NotifBrokerActor{
let zerlog_producer_actor = self.clone().zerlog_producer_actor;
// will be used to send received notif data from the broker to ws mpsc channel,
// later we can receive the notif in ws server setup and send it to the owner
let notif_data_sender = self.notif_broker_sender.clone();
let notif_data_sender = self.notif_broker_ws_sender.clone();

match redis_pool.get().await{
Ok(mut redis_conn) => {
Expand Down Expand Up @@ -372,8 +403,9 @@ impl NotifBrokerActor{
} else{
dataString
};

// we should keep sending until a consumer receive the data!

// we should keep sending until a consumer receives the data!
// do this in the background thread constantly
tokio::spawn(async move{
let mut int = tokio::time::interval(tokio::time::Duration::from_secs(1));
loop{
Expand Down Expand Up @@ -419,7 +451,7 @@ impl NotifBrokerActor{
let zerlog_producer_actor = self.clone().zerlog_producer_actor;
// will be used to send received notif data from the broker to ws mpsc channel,
// later we can receive the notif in ws server setup and send it to the owner
let notif_data_sender = self.notif_broker_sender.clone();
let notif_data_sender = self.notif_broker_ws_sender.clone();

// first thing first check the redis is up!
match redis_pool.get().await{
Expand Down Expand Up @@ -731,7 +763,7 @@ impl NotifBrokerActor{
let zerlog_producer_actor = self.clone().zerlog_producer_actor;
// will be used to send received notif data from the broker to ws mpsc channel,
// later we can receive the notif in ws server setup and send it to the owner
let notif_data_sender = self.notif_broker_sender.clone();
let notif_data_sender = self.notif_broker_ws_sender.clone();

// since the redis is important, so we can't move forward without it
match redis_pool.get().await{
Expand Down Expand Up @@ -920,7 +952,7 @@ impl NotifBrokerActor{
let zerlog_producer_actor = self.clone().zerlog_producer_actor;
// will be used to send received notif data from the broker to ws mpsc channel,
// later we can receive the notif in ws server setup and send it to the owner
let notif_data_sender = self.notif_broker_sender.clone();
let notif_data_sender = self.notif_broker_ws_sender.clone();

// since the redis is important, so we can't move forward without it
match redis_pool.get().await{
Expand Down Expand Up @@ -1272,7 +1304,7 @@ impl NotifBrokerActor{
let redis_pool = storage.unwrap().get_redis_pool().await.unwrap();
let notif_mutator_actor = self.notif_mutator_actor.clone();
let zerlog_producer_actor = self.clone().zerlog_producer_actor;
let notif_data_sender = self.notif_broker_sender.clone();
let notif_data_sender = self.notif_broker_ws_sender.clone();

match rmq_pool.get().await{
Ok(conn) => {
Expand Down Expand Up @@ -1902,8 +1934,8 @@ impl NotifBrokerActor{
pub fn new(app_storage: std::option::Option<Arc<Storage>>,
notif_mutator_actor: Addr<NotifMutatorActor>,
zerlog_producer_actor: Addr<ZerLogProducerActor>,
notif_broker_sender: tokio::sync::mpsc::Sender<String>) -> Self{
Self { app_storage, notif_mutator_actor, zerlog_producer_actor, notif_broker_sender }
notif_broker_ws_sender: tokio::sync::mpsc::Sender<String>) -> Self{
Self { app_storage, notif_mutator_actor, zerlog_producer_actor, notif_broker_ws_sender }
}

}
Expand Down
13 changes: 11 additions & 2 deletions src/workers/scheduler/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -36,15 +36,23 @@ pub async fn runInterval1<M, R, O>(task: std::sync::Arc<dyn Fn() -> R + Send + S
}

// task is a closure that returns a future object
pub async fn runInterval<M, R, O>(task: M, period: u64)
pub async fn runInterval<M, R, O>(task: M, period: u64, mut retries: u8)
where M: Fn() -> R + Send + Sync + 'static,
R: std::future::Future<Output = O> + Send + Sync + 'static,
{
tokio::spawn(async move{
let mut int = tokio::time::interval(tokio::time::Duration::from_secs(period));
int.set_missed_tick_behavior(tokio::time::MissedTickBehavior::Skip);

let mut attempts = 0;

loop{
if retries == 0{
continue;
}
if attempts >= retries{
break;
}
int.tick().await;
task().await;
}
Expand Down Expand Up @@ -126,6 +134,7 @@ impl CronScheduler{
let cloned_task = task.clone();
loop{
interval.tick().await;
println!("ticking...");
tokio::spawn(cloned_task()); // the closure however returns a future
}
});
Expand All @@ -143,7 +152,7 @@ impl CronScheduler{
tokio::spawn(async move{
runInterval(|| async move{
println!("i'm executing intervally in the background thread ...");
}, period)
}, period, 10)
.await;
});

Expand Down

0 comments on commit 3ac947c

Please sign in to comment.