diff --git a/core/main/src/broker/thunder/mod.rs b/core/main/src/broker/thunder/mod.rs index 816e08e0c..c280a6468 100644 --- a/core/main/src/broker/thunder/mod.rs +++ b/core/main/src/broker/thunder/mod.rs @@ -15,3 +15,4 @@ // SPDX-License-Identifier: Apache-2.0 // pub mod thunder_plugins_status_mgr; +pub mod user_data_migrator; diff --git a/core/main/src/broker/thunder/user_data_migrator.rs b/core/main/src/broker/thunder/user_data_migrator.rs new file mode 100644 index 000000000..71e9e27be --- /dev/null +++ b/core/main/src/broker/thunder/user_data_migrator.rs @@ -0,0 +1,302 @@ +// Copyright 2023 Comcast Cable Communications Management, LLC +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. +// +// SPDX-License-Identifier: Apache-2.0 +// + +use std::collections::HashMap; +use std::fs::{File, OpenOptions}; +use std::path::Path; +use std::sync::Arc; + +use ripple_sdk::tokio::net::TcpStream; +use ripple_sdk::{ + log::{debug, error, info}, + tokio::{ + self, + sync::mpsc::{self, Receiver, Sender}, + sync::Mutex, + time::{timeout, Duration}, + }, +}; +use serde::{Deserialize, Serialize}; +use serde_json::{json, Value}; + +use crate::broker::endpoint_broker::{ + BrokerCallback, BrokerOutput, BrokerRequest, EndpointBrokerState, +}; +use futures::stream::SplitSink; +use futures_util::SinkExt; + +use crate::broker::thunder_broker::ThunderBroker; +use tokio_tungstenite::{tungstenite::Message, WebSocketStream}; +const USER_DATA_MIGRATION_CONFIG_FILE_NAME: &str = "user_data_migration_config.json"; + +#[derive(Clone, Debug, Deserialize, Serialize)] +pub struct MigrationConfigEntry { + namespace: String, + key: String, + default: Value, + getter: String, + setter: String, + migrated: bool, +} + +type MigrationMap = HashMap; +// This struct is responsible for migrating user data from the legacy storage to the new storage. +#[derive(Clone, Debug)] +pub struct UserDataMigrator { + migration_config: Arc>, // persistent migration map + config_file_path: String, // path to the migration map file + response_tx: Sender, + response_rx: Arc>>, +} + +impl UserDataMigrator { + pub fn create() -> Option { + let possible_config_file_paths = vec![ + format!("/etc/{}", USER_DATA_MIGRATION_CONFIG_FILE_NAME), + format!( + "/opt/persistent/ripple/{}", + USER_DATA_MIGRATION_CONFIG_FILE_NAME + ), + format!("./{}", USER_DATA_MIGRATION_CONFIG_FILE_NAME), + ]; + + for path in possible_config_file_paths { + if Path::new(&path).exists() { + debug!("Found migration map file: {}", path); + if let Some(migration_map) = Self::load_migration_config(&path) { + let (response_tx, response_rx) = mpsc::channel(16); + return Some(UserDataMigrator { + migration_config: Arc::new(Mutex::new(migration_map)), + config_file_path: path.to_string(), + response_tx, + response_rx: Arc::new(Mutex::new(response_rx)), + }); + } + } + } + debug!("No migration map file found"); + None + } + + async fn get_matching_migration_entry_on_method( + &self, + method: &str, + ) -> Option { + let migration_map = self.migration_config.lock().await; + migration_map + .values() + .find(|entry| entry.getter == method || entry.setter == method) + .cloned() + } + + // function to intercept and handle broker request. Perform migration if needed + pub async fn intercept_broker_request( + &self, + broker: &ThunderBroker, + ws_tx: Arc, Message>>>, + request: &mut BrokerRequest, + ) -> (bool, Option) { + let method = request.rpc.method.clone(); + if let Some(config_entry) = self.get_matching_migration_entry_on_method(&method).await { + // migration entry found for either getter or setter method + // for setter case, irrespective of the migration status, update the new value in the new storage and sync + // with the legacy storage + + if config_entry.setter == method { + // perform the setter update and sync up logic asynchronously + // update legacy storage with the new value as fire and forget operation + self.set_migration_status(&config_entry.namespace, &config_entry.key) + .await; + // TBD: apply transform rule if any and get the params. + self.write_to_legacy_storage( + &config_entry.namespace, + &config_entry.key, + &broker, + ws_tx.clone(), + &request, + &config_entry.default, + ) + .await; + // returning false to continue with the original setter request + return (false, None); + } else { + // perform the getter migration logic asynchronously + if !config_entry.migrated { + let migrated_value = self + .perform_getter_migration(&broker, &request, &config_entry) + .await; + return (false, Some(migrated_value)); + } else { + // the migration is already done, continue with the original request + return (false, None); + } + } + } + + // continue with the original request + (false, None) + } + + async fn write_to_legacy_storage( + &self, + namespace: &str, + key: &str, + broker: &ThunderBroker, + ws_tx: Arc, Message>>>, + _request: &BrokerRequest, + value: &Value, + ) { + let request_id = EndpointBrokerState::get_next_id(); + let call_sign = "org.rdk.PersistentStore.1.".to_owned(); + let thunder_request = json!({ + "jsonrpc": "2.0", + "id": request_id, + "method": format!("{}setValue", call_sign), + "params": json!({ + "namespace": namespace, + "key": key, + "value": value.to_string(), + "scope": "device", + }) + }) + .to_string(); + + // Register custom callback to handle the response + broker + .register_custom_callback( + request_id, + BrokerCallback { + sender: self.response_tx.clone(), + }, + ) + .await; + + // send the request to the legacy storage + if let Err(e) = self.send_thunder_request(&ws_tx, &thunder_request).await { + error!("Failed to send thunder request: {:?}", e); + return; + } + + // Spawn a task to wait for the response + let response_rx = self.response_rx.clone(); + let broker_clone = broker.clone(); + tokio::spawn(async move { + if let Err(e) = + UserDataMigrator::wait_for_response(response_rx, broker_clone, request_id).await + { + error!("Error waiting for response: {:?}", e); + } + }); + } + + async fn send_thunder_request( + &self, + ws_tx: &Arc, Message>>>, + request: &str, + ) -> Result<(), Box> { + let mut ws_tx = ws_tx.lock().await; + ws_tx.feed(Message::Text(request.to_string())).await?; + ws_tx.flush().await?; + Ok(()) + } + + async fn wait_for_response( + response_rx: Arc>>, + broker: ThunderBroker, + request_id: u64, + ) -> Result<(), Box> { + let mut response_rx = response_rx.lock().await; + match timeout(Duration::from_secs(30), response_rx.recv()).await { + Ok(Some(response)) => { + info!( + "Received response at custom write_to_legacy_storage: {:?}", + response + ); + } + Ok(None) => { + error!("Failed to receive response"); + } + Err(_) => { + error!("Timeout waiting for response"); + } + } + broker.unregister_custom_callback(request_id).await; + Ok(()) + } + // function to perform the getter migration logic asynchronously + async fn perform_getter_migration( + &self, + broker: &ThunderBroker, + request: &BrokerRequest, + config_entry: &MigrationConfigEntry, + ) -> Value { + let mut new_storage_value = Value::Null; + // Get the value from the new storage + //new_storage_value = self.get_new_storage_value(&broker, &request).await; + new_storage_value + } + + // function to set the migration flag to true and update the migration map in the config file + async fn set_migration_status(&self, namespace: &str, key: &str) { + let mut config_entry_changed = false; + { + let mut migration_map = self.migration_config.lock().await; + if let Some(mut config_entry) = migration_map + .values_mut() + .find(|entry| entry.namespace == namespace && entry.key == key) + { + if !config_entry.migrated { + config_entry.migrated = true; + config_entry_changed = true; + } + } + } + + // save the migration map to the config file after releasing the lock in case config_entry_changed + if config_entry_changed { + if let Err(e) = self.update_migration_config_file().await { + error!("Failed to update migration config file: {}", e); + } + } + } + // load the migration map from the file + pub fn load_migration_config(config_file_path: &str) -> Option { + let file = File::open(config_file_path).ok()?; + let reader = std::io::BufReader::new(file); + Some(serde_json::from_reader(reader).unwrap_or_else(|_| HashMap::new())) + } + + // function to update the migration status in the config file + async fn update_migration_config_file(&self) -> Result<(), String> { + if Path::new(&self.config_file_path).exists() { + let migration_map = self.migration_config.lock().await; + let file = OpenOptions::new() + .write(true) + .truncate(true) + .open(&self.config_file_path) + .map_err(|e| format!("Failed to open migration config file: {}", e))?; + serde_json::to_writer_pretty(file, &*migration_map) + .map_err(|e| format!("Failed to write to migration config file: {}", e))?; + Ok(()) + } else { + Err(format!( + "Migration config file not found at path {}", + self.config_file_path + )) + } + } +} diff --git a/core/main/src/broker/thunder_broker.rs b/core/main/src/broker/thunder_broker.rs index 4872ad063..da33fd3e5 100644 --- a/core/main/src/broker/thunder_broker.rs +++ b/core/main/src/broker/thunder_broker.rs @@ -20,6 +20,7 @@ use super::{ BrokerSender, BrokerSubMap, EndpointBroker, }, thunder::thunder_plugins_status_mgr::StatusManager, + thunder::user_data_migrator::UserDataMigrator, }; use crate::broker::broker_utils::BrokerUtils; use futures_util::{SinkExt, StreamExt}; @@ -27,11 +28,13 @@ use futures_util::{SinkExt, StreamExt}; use ripple_sdk::{ api::gateway::rpc_gateway_api::JsonRpcApiResponse, log::{debug, error, info}, + tokio::sync::Mutex, tokio::{self, sync::mpsc}, utils::error::RippleError, }; use serde_json::json; use std::{ + collections::HashMap, sync::{Arc, RwLock}, vec, }; @@ -42,43 +45,90 @@ pub struct ThunderBroker { subscription_map: Arc>, cleaner: BrokerCleaner, status_manager: StatusManager, + default_callback: BrokerCallback, + data_migrator: Option, + custom_callback_list: Arc>>, } impl ThunderBroker { + fn new( + sender: BrokerSender, + subscription_map: Arc>, + cleaner: BrokerCleaner, + default_callback: BrokerCallback, + ) -> Self { + Self { + sender, + subscription_map, + cleaner, + status_manager: StatusManager::new(), + default_callback, + data_migrator: None, + custom_callback_list: Arc::new(Mutex::new(HashMap::new())), + } + } + + fn with_data_migtator(mut self) -> Self { + self.data_migrator = UserDataMigrator::create(); + self + } + + fn get_default_callback(&self) -> BrokerCallback { + self.default_callback.clone() + } + + pub async fn register_custom_callback(&self, id: u64, callback: BrokerCallback) { + let mut custom_callback_list = self.custom_callback_list.lock().await; + custom_callback_list.insert(id, callback); + } + + pub async fn unregister_custom_callback(&self, id: u64) { + let mut custom_callback_list = self.custom_callback_list.lock().await; + custom_callback_list.remove(&id); + } + + async fn get_broker_callback(&self, id: Option) -> BrokerCallback { + if id.is_none() { + return self.default_callback.clone(); + } + let custom_callback_list = self.custom_callback_list.lock().await; + if let Some(callback) = custom_callback_list.get(&id.unwrap()) { + return callback.clone(); + } + self.default_callback.clone() + } + fn start(request: BrokerConnectRequest, callback: BrokerCallback) -> Self { let endpoint = request.endpoint.clone(); let (tx, mut tr) = mpsc::channel(10); let (c_tx, mut c_tr) = mpsc::channel(2); let sender = BrokerSender { sender: tx }; let subscription_map = Arc::new(RwLock::new(request.sub_map.clone())); - let broker = Self { - sender, - subscription_map, - cleaner: BrokerCleaner { - cleaner: Some(c_tx.clone()), - }, - status_manager: StatusManager::new(), + let cleaner = BrokerCleaner { + cleaner: Some(c_tx.clone()), }; + let broker = Self::new(sender, subscription_map, cleaner, callback).with_data_migtator(); let broker_c = broker.clone(); let broker_for_cleanup = broker.clone(); - let callback_for_sender = callback.clone(); let broker_for_reconnect = broker.clone(); tokio::spawn(async move { - let (mut ws_tx, mut ws_rx) = - BrokerUtils::get_ws_broker(&endpoint.get_url(), None).await; + let (ws_tx, mut ws_rx) = BrokerUtils::get_ws_broker(&endpoint.get_url(), None).await; + let ws_tx_wrap = Arc::new(Mutex::new(ws_tx)); // send the first request to the broker. This is the controller statechange subscription request let status_request = broker_c .status_manager .generate_state_change_subscribe_request(); + { + let mut ws_tx = ws_tx_wrap.lock().await; - let _feed = ws_tx - .feed(tokio_tungstenite::tungstenite::Message::Text( - status_request.to_string(), - )) - .await; - let _flush = ws_tx.flush().await; - + let _feed = ws_tx + .feed(tokio_tungstenite::tungstenite::Message::Text( + status_request.to_string(), + )) + .await; + let _flush = ws_tx.flush().await; + } tokio::pin! { let read = ws_rx.next(); } @@ -88,12 +138,13 @@ impl ThunderBroker { match value { Ok(v) => { if let tokio_tungstenite::tungstenite::Message::Text(t) = v { - if broker_c.status_manager.is_controller_response(broker_c.get_sender(), callback.clone(), t.as_bytes()).await { - broker_c.status_manager.handle_controller_response(broker_c.get_sender(), callback.clone(), t.as_bytes()).await; + if broker_c.status_manager.is_controller_response(broker_c.get_sender(), broker_c.get_default_callback(), t.as_bytes()).await { + broker_c.status_manager.handle_controller_response(broker_c.get_sender(), broker_c.get_default_callback(), t.as_bytes()).await; } else { // send the incoming text without context back to the sender - Self::handle_jsonrpc_response(t.as_bytes(),callback.clone()) + let id = Self::get_id_from_result(t.as_bytes()); + Self::handle_jsonrpc_response(t.as_bytes(),broker_c.get_broker_callback(id).await) } } }, @@ -105,14 +156,44 @@ impl ThunderBroker { } }, - Some(request) = tr.recv() => { + Some(mut request) = tr.recv() => { debug!("Got request from receiver for broker {:?}", request); - match broker_c.prepare_request(&request) { - Ok(updated_request) => { - debug!("Sending request to broker {:?}", updated_request); - for r in updated_request { - let _feed = ws_tx.feed(tokio_tungstenite::tungstenite::Message::Text(r)).await; - let _flush = ws_tx.flush().await; + + match broker_c.check_and_generate_plugin_activation_request(&request) { + Ok(requests) => { + if !requests.is_empty() { + let mut ws_tx = ws_tx_wrap.lock().await; + for r in requests { + let _feed = ws_tx.feed(tokio_tungstenite::tungstenite::Message::Text(r)).await; + let _flush = ws_tx.flush().await; + } + } + else { + // empty request means plugin is activated and ready to process the request + // Intercept the request for data migration + let mut request_consumed = false; + let mut response = None; + if let Some(user_data_migrator) = broker_c.data_migrator.clone() { + (request_consumed, response) = user_data_migrator.intercept_broker_request(&broker_c, ws_tx_wrap.clone(), &mut request).await; + } + + // If the request is not consumed by the data migrator, continue with the request + if !request_consumed { + match broker_c.prepare_request(&request) { + Ok(updated_request) => { + debug!("Sending request to broker {:?}", updated_request); + let binding = ws_tx_wrap.clone(); + let mut ws_tx = binding.lock().await; + for r in updated_request { + let _feed = ws_tx.feed(tokio_tungstenite::tungstenite::Message::Text(r)).await; + let _flush = ws_tx.flush().await; + } + } + Err(e) => { + broker_c.get_default_callback().send_error(request,e).await + } + } + } } } Err(e) => { @@ -121,10 +202,11 @@ impl ThunderBroker { info!("Thunder Service not ready, request is now in pending list {:?}", request); }, _ => - callback_for_sender.send_error(request,e).await + broker_c.get_default_callback().send_error(request,e).await + } } } - } + }, Some(cleanup_request) = c_tr.recv() => { let value = { @@ -168,6 +250,13 @@ impl ThunderBroker { new_response } + fn get_id_from_result(result: &[u8]) -> Option { + if let Ok(data) = serde_json::from_slice::(result) { + return data.id; + } + None + } + fn get_callsign_and_method_from_alias(alias: &str) -> (String, Option<&str>) { let mut collection: Vec<&str> = alias.split('.').collect(); let method = collection.pop(); @@ -207,29 +296,14 @@ impl ThunderBroker { } response } -} - -impl EndpointBroker for ThunderBroker { - fn get_broker(request: BrokerConnectRequest, callback: BrokerCallback) -> Self { - Self::start(request, callback) - } - - fn get_sender(&self) -> BrokerSender { - self.sender.clone() - } - - fn get_cleaner(&self) -> BrokerCleaner { - self.cleaner.clone() - } - fn prepare_request( + fn check_and_generate_plugin_activation_request( &self, rpc_request: &super::endpoint_broker::BrokerRequest, ) -> Result, RippleError> { let mut requests = Vec::new(); - let rpc = rpc_request.clone().rpc; - let id = rpc.ctx.call_id; let (callsign, method) = Self::get_callsign_and_method_from_alias(&rpc_request.rule.alias); + if method.is_none() { return Err(RippleError::InvalidInput); } @@ -269,6 +343,31 @@ impl EndpointBroker for ThunderBroker { requests.push(request.to_string()); return Ok(requests); } + Ok(requests) + } +} + +impl EndpointBroker for ThunderBroker { + fn get_broker(request: BrokerConnectRequest, callback: BrokerCallback) -> Self { + Self::start(request, callback) + } + + fn get_sender(&self) -> BrokerSender { + self.sender.clone() + } + + fn get_cleaner(&self) -> BrokerCleaner { + self.cleaner.clone() + } + + fn prepare_request( + &self, + rpc_request: &super::endpoint_broker::BrokerRequest, + ) -> Result, RippleError> { + let rpc = rpc_request.clone().rpc; + let id = rpc.ctx.call_id; + let (callsign, method) = Self::get_callsign_and_method_from_alias(&rpc_request.rule.alias); + let mut requests = Vec::new(); let method = method.unwrap(); // Below chunk of code is basically for subscription where thunder needs some special care based on