diff --git a/core/main/src/broker/endpoint_broker.rs b/core/main/src/broker/endpoint_broker.rs index 9431d6ef6..76ca7191b 100644 --- a/core/main/src/broker/endpoint_broker.rs +++ b/core/main/src/broker/endpoint_broker.rs @@ -688,7 +688,7 @@ impl BrokerOutputForwarder { if let Some(filter) = broker_request.rule.transform.get_transform_data( super::rules_engine::RuleTransformType::Response, ) { - apply_response(filter, &rpc_request, &mut response); + apply_response(filter, &rpc_request.ctx.method, &mut response); } else if response.result.is_none() && response.error.is_none() { response.result = Some(Value::Null); } @@ -792,9 +792,9 @@ async fn forward_extn_event( } } -fn apply_response( +pub fn apply_response( result_response_filter: String, - rpc_request: &RpcRequest, + method: &str, response: &mut JsonRpcApiResponse, ) { match serde_json::to_value(response.clone()) { @@ -802,7 +802,7 @@ fn apply_response( match jq_compile( input, &result_response_filter, - format!("{}_response", rpc_request.ctx.method), + format!("{}_response", method), ) { Ok(jq_out) => { trace!( @@ -862,7 +862,6 @@ fn apply_filter(broker_request: &BrokerRequest, result: &Value, rpc_request: &Rp &filter, format!("{}_event filter", rpc_request.ctx.method), ) { - println!("apply_filter: {:?}", r); if r.is_null() { return false; } else { @@ -1005,7 +1004,7 @@ mod tests { let filter = "if .result and .result.success then (.result.stbVersion | split(\"_\") [0]) elif .error then if .error.code == -32601 then {error: { code: -1, message: \"Unknown method.\" }} else \"Error occurred with a different code\" end else \"No result or recognizable error\" end".to_string(); //let mut response = JsonRpcApiResponse::mock(); //response.error = Some(error); - apply_response(filter, &rpc_request, &mut output.data); + apply_response(filter, &rpc_request.ctx.method, &mut output.data); //let msg = output.data.error.unwrap().get("message").unwrap().clone(); assert_eq!( 
output.data.error.unwrap().get("message").unwrap().clone(), @@ -1020,7 +1019,7 @@ mod tests { let filter = "if .result and .result.success then .result.value elif .error.code==22 or .error.code==43 then null else .error end".to_string(); //let mut response = JsonRpcApiResponse::mock(); //response.error = Some(error); - apply_response(filter, &rpc_request, &mut output.data); + apply_response(filter, &rpc_request.ctx.method, &mut output.data); assert_eq!(output.data.error, None); assert_eq!(output.data.result.unwrap(), serde_json::Value::Null); @@ -1032,7 +1031,7 @@ mod tests { let filter = "if .result and .result.success then .result.value elif .error.code==22 or .error.code==43 then null else { error: .error } end".to_string(); //let mut response = JsonRpcApiResponse::mock(); //response.error = Some(error.clone()); - apply_response(filter, &rpc_request, &mut output.data); + apply_response(filter, &rpc_request.ctx.method, &mut output.data); assert_eq!(output.data.error, Some(error)); } @@ -1059,7 +1058,7 @@ mod tests { let mut data = JsonRpcApiResponse::mock(); data.result = Some(result); let mut output: BrokerOutput = BrokerOutput { data: data.clone() }; - apply_response(filter, &rpc_request, &mut output.data); + apply_response(filter, &rpc_request.ctx.method, &mut output.data); assert_eq!(output.data.result.unwrap(), "SCXI11BEI".to_string()); // device.videoResolution @@ -1069,7 +1068,7 @@ mod tests { response.result = Some(result); //let data = JsonRpcApiResponse::mock(); //let mut output: BrokerOutput = BrokerOutput { data: data.clone() }; - apply_response(filter, &rpc_request, &mut response); + apply_response(filter, &rpc_request.ctx.method, &mut response); assert_eq!(response.result.unwrap(), json!([1920, 1080])); // device.audio @@ -1079,7 +1078,7 @@ mod tests { let filter = "if .result and .result.success then .result | {\"stereo\": (.supportedAudioFormat | index(\"PCM\") > 0),\"dolbyDigital5.1\": (.supportedAudioFormat | index(\"DOLBY AC3\") > 
0),\"dolbyDigital5.1plus\": (.supportedAudioFormat | index(\"DOLBY EAC3\") > 0),\"dolbyAtmos\": (.supportedAudioFormat | index(\"DOLBY EAC3 ATMOS\") > 0)} elif .error then if .error.code == -32601 then \"Unknown method.\" else \"Error occurred with a different code\" end else \"No result or recognizable error\" end".to_string(); let mut response = JsonRpcApiResponse::mock(); response.result = Some(result); - apply_response(filter, &rpc_request, &mut response); + apply_response(filter, &rpc_request.ctx.method, &mut response); assert_eq!( response.result.unwrap(), json!({"dolbyAtmos": true, "dolbyDigital5.1": true, "dolbyDigital5.1plus": false, "stereo": true}) @@ -1094,7 +1093,7 @@ mod tests { let filter = "if .result and .result.success then (.result.interfaces | .[] | select(.connected) | {\"state\": \"connected\",\"type\": .interface | ascii_downcase }) elif .error then if .error.code == -32601 then \"Unknown method.\" else \"Error occurred with a different code\" end else \"No result or recognizable error\" end".to_string(); let mut response = JsonRpcApiResponse::mock(); response.result = Some(result); - apply_response(filter, &rpc_request, &mut response); + apply_response(filter, &rpc_request.ctx.method, &mut response); assert_eq!( response.result.unwrap(), json!({"state":"connected", "type":"wifi"}) @@ -1107,7 +1106,7 @@ mod tests { let filter = "if .result.success then (if .result.friendlyName | length == 0 then \"Living Room\" else .result.friendlyName end) else \"Living Room\" end".to_string(); let mut response = JsonRpcApiResponse::mock(); response.result = Some(result); - apply_response(filter, &rpc_request, &mut response); + apply_response(filter, &rpc_request.ctx.method, &mut response); assert_eq!(response.result.unwrap(), json!("my_device")); // localization.language @@ -1118,7 +1117,7 @@ mod tests { .to_string(); let mut response = JsonRpcApiResponse::mock(); response.result = Some(result); - apply_response(filter, &rpc_request, &mut response); + 
apply_response(filter, &rpc_request.ctx.method, &mut response); assert_eq!(response.result.unwrap(), json!("FR")); @@ -1129,7 +1128,7 @@ mod tests { let filter = "if .result.success then (if .result.friendlyName | length == 0 then \"Living Room\" else .result.friendlyName end) else \"Living Room\" end".to_string(); let mut response = JsonRpcApiResponse::mock(); response.result = Some(result); - apply_response(filter, &rpc_request, &mut response); + apply_response(filter, &rpc_request.ctx.method, &mut response); assert_eq!(response.result.unwrap(), json!("my_device")); @@ -1140,7 +1139,7 @@ mod tests { let filter = "if .result.success then null else { code: -32100, message: \"couldn't set skip restriction\" } end".to_string(); let mut response = JsonRpcApiResponse::mock(); response.result = Some(result); - apply_response(filter, &rpc_request, &mut response); + apply_response(filter, &rpc_request.ctx.method, &mut response); assert_eq!(response.result.unwrap(), serde_json::Value::Null); @@ -1151,7 +1150,7 @@ mod tests { let filter = "if .result.success then .result.value elif .error.code==22 or .error.code==43 then \"null\" else .error end".to_string(); let mut response = JsonRpcApiResponse::mock(); response.result = Some(result); - apply_response(filter, &rpc_request, &mut response); + apply_response(filter, &rpc_request.ctx.method, &mut response); assert_eq!(response.result.unwrap(), "some_value"); // localization.countryCode @@ -1161,7 +1160,7 @@ mod tests { let filter = "if .result.success then if .result.territory == \"ITA\" then \"IT\" elif .result.territory == \"GBR\" then \"GB\" elif .result.territory == \"IRL\" then \"IE\" elif .result.territory == \"DEU\" then \"DE\" elif .result.territory == \"AUS\" then \"AU\" else \"GB\" end end".to_string(); let mut response = JsonRpcApiResponse::mock(); response.result = Some(result); - apply_response(filter, &rpc_request, &mut response); + apply_response(filter, &rpc_request.ctx.method, &mut response); 
assert_eq!(response.result.unwrap(), "GB"); } } diff --git a/core/main/src/broker/rules_engine.rs b/core/main/src/broker/rules_engine.rs index c2de717b0..ca82c2f21 100644 --- a/core/main/src/broker/rules_engine.rs +++ b/core/main/src/broker/rules_engine.rs @@ -24,7 +24,7 @@ use ripple_sdk::{ serde_json::Value, utils::error::RippleError, }; -use serde::Deserialize; +use serde::{Deserialize, Serialize}; use std::collections::HashMap; use std::{fs, path::Path}; @@ -83,21 +83,27 @@ pub enum RuleEndpointProtocol { Thunder, } -#[derive(Debug, Clone, Deserialize)] +#[derive(Debug, Clone, Deserialize, Serialize)] pub struct Rule { pub alias: String, // Not every rule needs transform #[serde(default)] pub transform: RuleTransform, + #[serde(skip_serializing_if = "Option::is_none")] pub filter: Option, + #[serde(skip_serializing_if = "Option::is_none")] pub endpoint: Option, } -#[derive(Debug, Clone, Deserialize, Default)] +#[derive(Debug, Clone, Deserialize, Default, Serialize)] pub struct RuleTransform { + #[serde(skip_serializing_if = "Option::is_none")] pub request: Option, + #[serde(skip_serializing_if = "Option::is_none")] pub response: Option, + #[serde(skip_serializing_if = "Option::is_none")] pub event: Option, + #[serde(skip_serializing_if = "Option::is_none")] pub event_decorator_method: Option, } @@ -202,6 +208,12 @@ impl RuleEngine { } None } + pub fn get_rule_by_method(&self, method: &str) -> Option { + if let Some(rule) = self.rules.rules.get(&method.to_lowercase()).cloned() { + return Some(rule); + } + None + } } pub fn jq_compile(input: Value, filter: &str, reference: String) -> Result { diff --git a/core/main/src/broker/thunder/mod.rs b/core/main/src/broker/thunder/mod.rs index 816e08e0c..c280a6468 100644 --- a/core/main/src/broker/thunder/mod.rs +++ b/core/main/src/broker/thunder/mod.rs @@ -15,3 +15,4 @@ // SPDX-License-Identifier: Apache-2.0 // pub mod thunder_plugins_status_mgr; +pub mod user_data_migrator; diff --git 
a/core/main/src/broker/thunder/user_data_migrator.rs b/core/main/src/broker/thunder/user_data_migrator.rs new file mode 100644 index 000000000..89804d813 --- /dev/null +++ b/core/main/src/broker/thunder/user_data_migrator.rs @@ -0,0 +1,918 @@ +// Copyright 2023 Comcast Cable Communications Management, LLC +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. +// +// SPDX-License-Identifier: Apache-2.0 +// + +use std::{ + collections::HashMap, + fmt, + fs::{File, OpenOptions}, + path::Path, + sync::Arc, +}; + +use ripple_sdk::{ + api::{device::device_peristence::StorageData, gateway::rpc_gateway_api::JsonRpcApiResponse}, + log::{debug, error, info}, + tokio::{ + self, + net::TcpStream, + sync::{ + mpsc::{self, Receiver, Sender}, + Mutex, + }, + time::{timeout, Duration}, + }, + utils::error::RippleError, +}; + +use serde::{Deserialize, Serialize}; +use serde_json::{json, Value}; + +use crate::broker::endpoint_broker::{ + self, BrokerCallback, BrokerOutput, BrokerRequest, EndpointBrokerState, +}; +use crate::broker::rules_engine::{Rule, RuleTransformType}; + +use futures::stream::SplitSink; +use futures_util::SinkExt; + +use crate::broker::thunder_broker::ThunderBroker; +use tokio_tungstenite::{tungstenite::Message, WebSocketStream}; + +// TBD get the storage dir from manifest or other Ripple config file +const RIPPLE_STORAGE_DIR: &str = "/opt/persistent/ripple"; +const USER_DATA_MIGRATION_CONFIG_FILE_NAME: &str = "user_data_migration_config.json"; + 
+#[derive(Debug)] +enum UserDataMigratorError { + ThunderRequestError(String), + ResponseError(String), + SetterRuleNotAvailable, + RequestTransformError(String), + TimeoutError, +} + +impl fmt::Display for UserDataMigratorError { + fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { + match self { + UserDataMigratorError::ThunderRequestError(msg) => { + write!(f, "Thunder request error: {}", msg) + } + UserDataMigratorError::ResponseError(msg) => write!(f, "Response error: {}", msg), + UserDataMigratorError::TimeoutError => write!(f, "Timeout error"), + UserDataMigratorError::SetterRuleNotAvailable => { + write!(f, "Setter rule is not available") + } + UserDataMigratorError::RequestTransformError(msg) => { + write!(f, "Request transform error: {}", msg) + } + } + } +} +impl std::error::Error for UserDataMigratorError {} + +#[derive(Clone, Debug, Deserialize, Serialize)] +pub struct CoversionRule { + conversion_rule: String, +} +#[derive(Clone, Debug, Deserialize, Serialize)] +pub struct MigrationConfigEntry { + namespace: String, + key: String, + default: Value, + getter: String, + setter: String, + #[serde(skip_serializing_if = "Option::is_none")] + setter_rule: Option, + #[serde(skip_serializing_if = "Option::is_none")] + legacy_to_plugin_value_conversion: Option, + migrated: bool, +} + +type MigrationMap = HashMap; +#[derive(Clone, Debug)] +pub struct UserDataMigrator { + migration_config: Arc>, // persistent migration map + config_file_path: String, // path to the migration map file + response_tx: Sender, + response_rx: Arc>>, +} + +impl UserDataMigrator { + pub fn create() -> Option { + let possible_config_file_paths = vec![ + format!("/etc/{}", USER_DATA_MIGRATION_CONFIG_FILE_NAME), + format!( + "{}/{}", + RIPPLE_STORAGE_DIR, USER_DATA_MIGRATION_CONFIG_FILE_NAME + ), + format!("./{}", USER_DATA_MIGRATION_CONFIG_FILE_NAME), + ]; + + for path in possible_config_file_paths { + if Path::new(&path).exists() { + debug!("Found migration map file: {}", path); 
+ if let Some(migration_map) = Self::load_migration_config(&path) { + let (response_tx, response_rx) = mpsc::channel(16); + return Some(UserDataMigrator { + migration_config: Arc::new(Mutex::new(migration_map)), + config_file_path: path.to_string(), + response_tx, + response_rx: Arc::new(Mutex::new(response_rx)), + }); + } + } + } + debug!("No migration map file found"); + None + } + + async fn get_matching_migration_entry_by_method( + &self, + method: &str, + ) -> Option { + let migration_map = self.migration_config.lock().await; + migration_map + .values() + .find(|entry| entry.getter == method || entry.setter == method) + .cloned() + } + + /// function to intercept and handle broker request. Perform migration if needed + pub async fn intercept_broker_request( + &self, + broker: &ThunderBroker, + ws_tx: Arc, Message>>>, + request: &mut BrokerRequest, + ) -> bool { + let method = request.rpc.method.clone(); + info!( + "intercept_broker_request: Intercepting broker request for method: {:?}", + method + ); + if let Some(config_entry) = self.get_matching_migration_entry_by_method(&method).await { + if config_entry.setter == method { + return self + .handle_setter_request(broker, ws_tx, request, &config_entry) + .await; + } else { + return self + .handle_getter_request(broker, ws_tx, request, &config_entry) + .await; + } + } + + info!( + "intercept_broker_request: No migration entry found for method: {:?}", + method + ); + // Continue with the original request if no migration entry is found + false + } + + async fn handle_setter_request( + &self, + broker: &ThunderBroker, + ws_tx: Arc, Message>>>, + request: &BrokerRequest, + config_entry: &MigrationConfigEntry, + ) -> bool { + info!( + "intercept_broker_request: Handling setter request for method: {:?}", + config_entry.setter + ); + self.set_migration_status(&config_entry.namespace, &config_entry.key) + .await; + + let params = self.extract_params(&request.rpc.params_json); + info!( + "intercept_broker_request: 
Updating legacy storage with new value: {:?}", + params + ); + + let _ = self + .write_to_legacy_storage( + &config_entry.namespace, + &config_entry.key, + broker, + ws_tx.clone(), + ¶ms, + ) + .await; + + // Return false to continue with the original setter request + false + } + + async fn handle_getter_request( + &self, + broker: &ThunderBroker, + ws_tx: Arc, Message>>>, + request: &BrokerRequest, + config_entry: &MigrationConfigEntry, + ) -> bool { + info!( + "intercept_broker_request: Handling getter request for method: {:?}", + config_entry.getter + ); + if !config_entry.migrated { + let self_arc = Arc::new(self.clone()); + self_arc + .invoke_perform_getter_migration(broker, ws_tx.clone(), request, config_entry) + .await; + return true; + } + + // The migration already done, continue with the original request + info!( + "intercept_broker_request: Migration already done for method: {:?}", + config_entry.getter + ); + false + } + + fn extract_params(&self, params_json: &str) -> Value { + if let Ok(mut extract) = serde_json::from_str::>(params_json) { + if let Some(last) = extract.pop() { + return last.get("value").cloned().unwrap_or(last); + } + } + Value::Null + } + + async fn read_from_legacy_storage( + &self, + namespace: &str, + key: &str, + broker: &ThunderBroker, + ws_tx: Arc, Message>>>, + ) -> Result { + let request_id = EndpointBrokerState::get_next_id(); + let call_sign = "org.rdk.PersistentStore.1.".to_owned(); + + // Register custom callback to handle the response + broker + .register_custom_callback( + request_id, + BrokerCallback { + sender: self.response_tx.clone(), + }, + ) + .await; + + // create the request to the legacy storage + let thunder_request = json!({ + "jsonrpc": "2.0", + "id": request_id, + "method": format!("{}getValue", call_sign), + "params": json!({ + "namespace": namespace, + "key": key, + "scope": "device", + }) + }) + .to_string(); + + info!( + "read_from_legacy_storage: Sending request : {:?}", + thunder_request + ); + + // 
send the request to the legacy storage + if let Err(e) = self.send_thunder_request(&ws_tx, &thunder_request).await { + error!("read_from_legacy_storage: Failed to send request: {:?}", e); + broker.unregister_custom_callback(request_id).await; + return Err(e); + } + + let response = + Self::wait_for_response(self.response_rx.clone(), broker.clone(), request_id).await; + self.process_response_from_legacy_storage(response) + } + + fn process_response_from_legacy_storage( + &self, + response: Result, + ) -> Result { + info!("process_response_from_legacy_storage: Processing response"); + let response = response.map_err(|e| { + error!("Failed to get response: {}", e); + UserDataMigratorError::ResponseError(e.to_string()) + })?; + + let result = response.data.result.ok_or_else(|| { + UserDataMigratorError::ResponseError("No result field in response".to_string()) + })?; + + let value = result.get("value").ok_or_else(|| { + UserDataMigratorError::ResponseError("No value field in response".to_string()) + })?; + + let value_str = value.as_str().ok_or_else(|| { + UserDataMigratorError::ResponseError("Value is not a string".to_string()) + })?; + + let storage_data: StorageData = serde_json::from_str(value_str).map_err(|_e| { + UserDataMigratorError::ResponseError("Failed to deserialize JSON".to_string()) + })?; + + let final_value = storage_data.value; + + info!( + "process_response_from_legacy_storage: Successfully read from legacy storage: {:?}", + final_value + ); + + Ok(final_value.clone()) + } + + async fn write_to_legacy_storage( + &self, + namespace: &str, + key: &str, + broker: &ThunderBroker, + ws_tx: Arc, Message>>>, + params_json: &Value, + ) -> Result<(), UserDataMigratorError> { + let request_id = EndpointBrokerState::get_next_id(); + let call_sign = "org.rdk.PersistentStore.1.".to_owned(); + + // Register custom callback to handle the response + broker + .register_custom_callback( + request_id, + BrokerCallback { + sender: self.response_tx.clone(), + }, + ) + 
.await; + + // set storage data in the format required by the legacy storage + let data = StorageData::new(params_json.clone()); + // create the request to the legacy storage + let thunder_request = json!({ + "jsonrpc": "2.0", + "id": request_id, + "method": format!("{}setValue", call_sign), + "params": json!({ + "namespace": namespace, + "key": key, + "value": data, + "scope": "device", + }) + }) + .to_string(); + info!( + "write_to_legacy_storage: Sending request : {:?}", + thunder_request + ); + + // send the request to the legacy storage + if let Err(e) = self.send_thunder_request(&ws_tx, &thunder_request).await { + error!("write_to_legacy_storage: Failed to send request: {:?}", e); + // Unregister the custom callback and return + broker.unregister_custom_callback(request_id).await; + return Ok(()); + } + + // Spawn a task to wait for the response as we don't want to block the main thread + let response_rx = self.response_rx.clone(); + let broker_clone = broker.clone(); + tokio::spawn(async move { + match UserDataMigrator::wait_for_response(response_rx, broker_clone, request_id).await { + Ok(response) => { + // Handle the successful response here + info!( + "write_to_legacy_storage: Successfully received response: {:?}", + response + ); + } + Err(e) => { + error!("Error waiting for response: {:?}", e); + } + } + }); + Ok(()) + } + + async fn read_from_thunder_plugin( + &self, + broker: &ThunderBroker, + ws_tx: Arc, Message>>>, + request: &BrokerRequest, + ) -> Result { + let request_id = EndpointBrokerState::get_next_id(); + + // Register custom callback to handle the response + broker + .register_custom_callback( + request_id, + BrokerCallback { + sender: self.response_tx.clone(), + }, + ) + .await; + + // Create the request to the new plugin + // The current implementation assumes no params for the getter function + // extend the migration configuration to support params if needed + let thunder_plugin_request = json!({ + "jsonrpc": "2.0", + "id": request_id, 
+ "method": request.rule.alias, + }) + .to_string(); + + info!( + "read_from_thunder_plugin: Sending request to plugin: {:?}", + thunder_plugin_request + ); + + // send the request to the new pluin as thunder request + if let Err(e) = self + .send_thunder_request(&ws_tx, &thunder_plugin_request) + .await + { + error!( + "perform_getter_migration: Failed to send thunder request: {:?}", + e + ); + broker.unregister_custom_callback(request_id).await; + return Err(e); + } + + // get the response from the custom callback + Self::wait_for_response(self.response_rx.clone(), broker.clone(), request_id).await + } + + fn transform_requets_params( + params_json: &Value, + rule: &Rule, + method: &str, + ) -> Result { + let data: Value = json!({ + "value": params_json + }); + + if let Some(filter) = rule + .transform + .get_transform_data(RuleTransformType::Request) + { + return crate::broker::rules_engine::jq_compile( + data, + &filter, + format!("{}_request", method), + ); + } + serde_json::to_value(&data).map_err(|e| { + error!( + "Failed to serialize data in transform_requets_params: {}", + e + ); + RippleError::BrokerError(e.to_string()) + }) + } + + async fn write_to_thunder_plugin( + &self, + broker: &ThunderBroker, + ws_tx: Arc, Message>>>, + config_entry: &MigrationConfigEntry, + params_json: &Value, // param from the legacy storage + ) -> Result { + // get the setter rule from the rule engine by giving the setter method name + let setter_rule = Self::retrive_setter_rule_from_rule_engine(config_entry)?; + // apply the setter rule to the params_json + let transformed_params = + Self::transform_requets_params(params_json, &setter_rule, &config_entry.setter); + // rerurn error if the transform fails + let transformed_params = match transformed_params { + Ok(params) => params, + Err(e) => { + return Err(UserDataMigratorError::RequestTransformError(e.to_string())); + } + }; + + let request_id = EndpointBrokerState::get_next_id(); + + // Register custom callback to handle 
the response + broker + .register_custom_callback( + request_id, + BrokerCallback { + sender: self.response_tx.clone(), + }, + ) + .await; + + // create the request to the new plugin + let thunder_plugin_request = json!({ + "jsonrpc": "2.0", + "id": request_id, + "method": setter_rule.alias, + "params": transformed_params, + }) + .to_string(); + + info!( + "write_to_thunder_plugin: Sending request to plugin: {:?}", + thunder_plugin_request + ); + + // send the request to the new plugin as thunder request + if let Err(e) = self + .send_thunder_request(&ws_tx, &thunder_plugin_request) + .await + { + error!( + "write_to_thunder_plugin: Failed to send thunder request: {:?}", + e + ); + broker.unregister_custom_callback(request_id).await; + return Err(e); + } + + // get the response from the custom callback, unregister the callback and return the response + Self::wait_for_response(self.response_rx.clone(), broker.clone(), request_id).await + } + + async fn send_thunder_request( + &self, + ws_tx: &Arc, Message>>>, + request: &str, + ) -> Result<(), UserDataMigratorError> { + let mut ws_tx = ws_tx.lock().await; + ws_tx + .feed(Message::Text(request.to_string())) + .await + .map_err(|e| UserDataMigratorError::ThunderRequestError(e.to_string()))?; + ws_tx + .flush() + .await + .map_err(|e| UserDataMigratorError::ThunderRequestError(e.to_string()))?; + Ok(()) + } + + async fn wait_for_response( + response_rx: Arc>>, + broker: ThunderBroker, + request_id: u64, + ) -> Result { + let mut response_rx = response_rx.lock().await; + let response = match timeout(Duration::from_secs(30), response_rx.recv()).await { + Ok(Some(response)) => { + info!("wait_for_response : Received response : {:?}", response); + response + } + Ok(None) => { + error!("No response received at custom write_to_legacy_storage"); + broker.unregister_custom_callback(request_id).await; + return Err(UserDataMigratorError::TimeoutError); + } + Err(_) => { + error!("Error receiving response at custom 
write_to_legacy_storage"); + broker.unregister_custom_callback(request_id).await; + return Err(UserDataMigratorError::TimeoutError); + } + }; + broker.unregister_custom_callback(request_id).await; + Ok(response) + } + + fn retrive_setter_rule_from_rule_engine( + config_entry: &MigrationConfigEntry, + ) -> Result { + // TBD: get the getter rule from the rule engine by giving the setter method name + let setter_rule = config_entry.setter_rule.clone(); + // return rule if available else return error + if let Some(rule) = setter_rule { + Ok(rule) + } else { + Err(UserDataMigratorError::SetterRuleNotAvailable) + } + } + + async fn invoke_perform_getter_migration( + self: Arc, + broker: &ThunderBroker, + ws_tx: Arc, Message>>>, + request: &BrokerRequest, + config_entry: &MigrationConfigEntry, + ) { + // Clone the parameters to move into the spawned task + let broker_clone = broker.clone(); + let ws_tx_clone = ws_tx.clone(); + let request_clone = request.clone(); + let config_entry_clone = config_entry.clone(); + let self_clone = Arc::clone(&self); + + tokio::spawn(async move { + match self_clone + .perform_getter_migration( + &broker_clone, + ws_tx_clone.clone(), + &request_clone, + &config_entry_clone, + ) + .await + { + Ok((_, Some(mut output))) => { + // Handle the case where output is returned. Send the response to the default callback of the broker. + // The response structure will be upated with the originial request id for the callback handler to match the response. + output.data.id = Some(request_clone.rpc.ctx.call_id); + if let Err(e) = broker_clone + .get_default_callback() + .sender + .send(output) + .await + { + error!("Failed to send response: {:?}", e); + } + } + Ok((_, None)) | Err(_) => { + // Handle the case where no output is returned. Read the value from the plugin and send the response + // The response will be sent to the default callback of the broker. 
+ // The response structure will be updated with the original request id for the callback handler to match the response. + let value_from_thunder_plugin = self_clone + .read_from_thunder_plugin( + &broker_clone, + ws_tx_clone.clone(), + &request_clone, + ) + .await; + match value_from_thunder_plugin { + Ok(mut value_from_thunder_plugin) => { + value_from_thunder_plugin.data.id = Some(request_clone.rpc.ctx.call_id); + if let Err(e) = broker_clone + .get_default_callback() + .sender + .send(value_from_thunder_plugin) + .await + { + error!("Failed to send response: {:?}", e); + } + } + Err(_e) => { + broker_clone + .get_default_callback() + .send_error(request_clone, RippleError::ProcessorError) + .await + } + } + } + } + }); + } + + async fn perform_getter_migration( + &self, + broker: &ThunderBroker, + ws_tx: Arc, Message>>>, + request: &BrokerRequest, + config_entry: &MigrationConfigEntry, + ) -> Result<(bool, Option), UserDataMigratorError> { + let legacy_value = self + .read_from_legacy_storage( + &config_entry.namespace, + &config_entry.key, + broker, + ws_tx.clone(), + ) + .await?; + info!( + "perform_getter_migration: Read from legacy storage: {:?}", + legacy_value + ); + + let output_from_thunder_plugin = self + .read_from_thunder_plugin(broker, ws_tx.clone(), request) + .await?; + + let mut response = output_from_thunder_plugin.clone().data; + let data_for_callback = response.clone(); + + if let Some(filter) = request + .rule + .transform + .get_transform_data(RuleTransformType::Response) + { + endpoint_broker::apply_response(filter, &request.rule.alias, &mut response); + } + + if let Some(result) = response.result { + // legacy storage has some value. 
It needs to be migrated to the plugin in case plugin has default value + return self + .check_migration_cases( + result, + legacy_value, + broker, + ws_tx, + config_entry, + data_for_callback, + ) + .await; + } + + Err(UserDataMigratorError::ResponseError( + "No data collected from Legacy Storage".to_string(), + )) + } + + async fn check_migration_cases( + &self, + result: Value, + legacy_value: Value, + broker: &ThunderBroker, + ws_tx: Arc, Message>>>, + config_entry: &MigrationConfigEntry, + data_for_callback: JsonRpcApiResponse, + ) -> Result<(bool, Option), UserDataMigratorError> { + info!( + "perform_getter_migration: Checking migration cases. Legacy value: {:?}, Plugin value: {:?}, Config default: {:?}", + legacy_value, result, config_entry.default + ); + if result != config_entry.default { + // Case 1: Plugin has non-default value. Updating legacy storage with new value + return self + .handle_non_default_plugin_value( + result, + legacy_value, + broker, + ws_tx, + config_entry, + data_for_callback, + ) + .await; + } + + if legacy_value != config_entry.default { + // Case 2: Plugin has default value and Legacy storage has the latest value + return self + .handle_default_plugin_value(legacy_value, broker, ws_tx, config_entry) + .await; + } + + // Case 3: Both plugin and legacy storage have default value + info!( + "perform_getter_migration: Both plugin and legacy storage have default value. No migration needed." + ); + self.set_migration_status(&config_entry.namespace, &config_entry.key) + .await; + Ok(( + true, + Some(BrokerOutput { + data: data_for_callback, + }), + )) + } + + async fn handle_non_default_plugin_value( + &self, + result: Value, + legacy_value: Value, + broker: &ThunderBroker, + ws_tx: Arc, Message>>>, + config_entry: &MigrationConfigEntry, + data_for_callback: JsonRpcApiResponse, + ) -> Result<(bool, Option), UserDataMigratorError> { + info!( + "perform_getter_migration: Plugin has non-default value. 
Updating legacy storage with new value: {:?}", + result + ); + if result != legacy_value { + self.update_legacy_storage( + &config_entry.namespace, + &config_entry.key, + broker, + ws_tx.clone(), + &result, + ) + .await?; + } + self.set_migration_status(&config_entry.namespace, &config_entry.key) + .await; + Ok(( + true, + Some(BrokerOutput { + data: data_for_callback, + }), + )) + } + + async fn handle_default_plugin_value( + &self, + legacy_value: Value, + broker: &ThunderBroker, + ws_tx: Arc, Message>>>, + config_entry: &MigrationConfigEntry, + ) -> Result<(bool, Option), UserDataMigratorError> { + info!( + "perform_getter_migration: Plugin has default value and Legacy storage has the latest value. Updating plugin with value from legacy storage: {:?}", + legacy_value + ); + let mut response = self + .update_plugin_from_legacy(broker, ws_tx.clone(), config_entry, &legacy_value) + .await?; + self.set_migration_status(&config_entry.namespace, &config_entry.key) + .await; + + response.data.result = Some(legacy_value.clone()); + if let Some(conversion_rule) = &config_entry.legacy_to_plugin_value_conversion { + let data = crate::broker::rules_engine::jq_compile( + json!({ "value": legacy_value }), + &conversion_rule.conversion_rule, + "legacy_to_plugin_value_conversion".to_string(), + ); + if let Ok(data) = data { + response.data.result = Some(data); + } + } + Ok((true, Some(response))) + } + + async fn update_legacy_storage( + &self, + namespace: &str, + key: &str, + broker: &ThunderBroker, + ws_tx: Arc, Message>>>, + value: &Value, + ) -> Result<(), UserDataMigratorError> { + self.write_to_legacy_storage(namespace, key, broker, ws_tx, value) + .await + } + + async fn update_plugin_from_legacy( + &self, + broker: &ThunderBroker, + ws_tx: Arc, Message>>>, + config_entry: &MigrationConfigEntry, + value: &Value, + ) -> Result { + self.write_to_thunder_plugin(broker, ws_tx, config_entry, value) + .await + } + + // function to set the migration flag to true and update the 
migration map in the config file + async fn set_migration_status(&self, namespace: &str, key: &str) { + let mut config_entry_changed = false; + { + let mut migration_map = self.migration_config.lock().await; + if let Some(config_entry) = migration_map + .values_mut() + .find(|entry| entry.namespace == namespace && entry.key == key) + { + if !config_entry.migrated { + config_entry.migrated = true; + config_entry_changed = true; + } + } + } + + // save the migration map to the config file after releasing the lock in case config_entry_changed + if config_entry_changed { + if let Err(e) = self.update_migration_config_file().await { + error!("Failed to update migration config file: {}", e); + } + } + } + // load the migration map from the file + pub fn load_migration_config(config_file_path: &str) -> Option { + let file = File::open(config_file_path).ok()?; + let reader = std::io::BufReader::new(file); + Some(serde_json::from_reader(reader).unwrap_or_else(|_| HashMap::new())) + } + + // function to update the migration status in the config file + async fn update_migration_config_file(&self) -> Result<(), String> { + if Path::new(&self.config_file_path).exists() { + let migration_map = self.migration_config.lock().await; + let file = OpenOptions::new() + .write(true) + .truncate(true) + .open(&self.config_file_path) + .map_err(|e| format!("Failed to open migration config file: {}", e))?; + serde_json::to_writer_pretty(file, &*migration_map) + .map_err(|e| format!("Failed to write to migration config file: {}", e))?; + Ok(()) + } else { + Err(format!( + "Migration config file not found at path {}", + self.config_file_path + )) + } + } +} diff --git a/core/main/src/broker/thunder_broker.rs b/core/main/src/broker/thunder_broker.rs index 4872ad063..2a1058f31 100644 --- a/core/main/src/broker/thunder_broker.rs +++ b/core/main/src/broker/thunder_broker.rs @@ -20,6 +20,7 @@ use super::{ BrokerSender, BrokerSubMap, EndpointBroker, }, 
thunder::thunder_plugins_status_mgr::StatusManager, + thunder::user_data_migrator::UserDataMigrator, }; use crate::broker::broker_utils::BrokerUtils; use futures_util::{SinkExt, StreamExt}; @@ -27,11 +28,13 @@ use futures_util::{SinkExt, StreamExt}; use ripple_sdk::{ api::gateway::rpc_gateway_api::JsonRpcApiResponse, log::{debug, error, info}, + tokio::sync::Mutex, tokio::{self, sync::mpsc}, utils::error::RippleError, }; use serde_json::json; use std::{ + collections::HashMap, sync::{Arc, RwLock}, vec, }; @@ -42,43 +45,90 @@ pub struct ThunderBroker { subscription_map: Arc>, cleaner: BrokerCleaner, status_manager: StatusManager, + default_callback: BrokerCallback, + data_migrator: Option, + custom_callback_list: Arc>>, } impl ThunderBroker { + fn new( + sender: BrokerSender, + subscription_map: Arc>, + cleaner: BrokerCleaner, + default_callback: BrokerCallback, + ) -> Self { + Self { + sender, + subscription_map, + cleaner, + status_manager: StatusManager::new(), + default_callback, + data_migrator: None, + custom_callback_list: Arc::new(Mutex::new(HashMap::new())), + } + } + + fn with_data_migtator(mut self) -> Self { + self.data_migrator = UserDataMigrator::create(); + self + } + + pub fn get_default_callback(&self) -> BrokerCallback { + self.default_callback.clone() + } + + pub async fn register_custom_callback(&self, id: u64, callback: BrokerCallback) { + let mut custom_callback_list = self.custom_callback_list.lock().await; + custom_callback_list.insert(id, callback); + } + + pub async fn unregister_custom_callback(&self, id: u64) { + let mut custom_callback_list = self.custom_callback_list.lock().await; + custom_callback_list.remove(&id); + } + + async fn get_broker_callback(&self, id: Option) -> BrokerCallback { + if id.is_none() { + return self.default_callback.clone(); + } + let custom_callback_list = self.custom_callback_list.lock().await; + if let Some(callback) = custom_callback_list.get(&id.unwrap()) { + return callback.clone(); + } + 
self.default_callback.clone() + } + fn start(request: BrokerConnectRequest, callback: BrokerCallback) -> Self { let endpoint = request.endpoint.clone(); let (tx, mut tr) = mpsc::channel(10); let (c_tx, mut c_tr) = mpsc::channel(2); let sender = BrokerSender { sender: tx }; let subscription_map = Arc::new(RwLock::new(request.sub_map.clone())); - let broker = Self { - sender, - subscription_map, - cleaner: BrokerCleaner { - cleaner: Some(c_tx.clone()), - }, - status_manager: StatusManager::new(), + let cleaner = BrokerCleaner { + cleaner: Some(c_tx.clone()), }; + let broker = Self::new(sender, subscription_map, cleaner, callback).with_data_migtator(); let broker_c = broker.clone(); let broker_for_cleanup = broker.clone(); - let callback_for_sender = callback.clone(); let broker_for_reconnect = broker.clone(); tokio::spawn(async move { - let (mut ws_tx, mut ws_rx) = - BrokerUtils::get_ws_broker(&endpoint.get_url(), None).await; + let (ws_tx, mut ws_rx) = BrokerUtils::get_ws_broker(&endpoint.get_url(), None).await; + let ws_tx_wrap = Arc::new(Mutex::new(ws_tx)); // send the first request to the broker. This is the controller statechange subscription request let status_request = broker_c .status_manager .generate_state_change_subscribe_request(); + { + let mut ws_tx = ws_tx_wrap.lock().await; - let _feed = ws_tx - .feed(tokio_tungstenite::tungstenite::Message::Text( - status_request.to_string(), - )) - .await; - let _flush = ws_tx.flush().await; - + let _feed = ws_tx + .feed(tokio_tungstenite::tungstenite::Message::Text( + status_request.to_string(), + )) + .await; + let _flush = ws_tx.flush().await; + } tokio::pin! 
{ let read = ws_rx.next(); } @@ -88,12 +138,13 @@ impl ThunderBroker { match value { Ok(v) => { if let tokio_tungstenite::tungstenite::Message::Text(t) = v { - if broker_c.status_manager.is_controller_response(broker_c.get_sender(), callback.clone(), t.as_bytes()).await { - broker_c.status_manager.handle_controller_response(broker_c.get_sender(), callback.clone(), t.as_bytes()).await; + if broker_c.status_manager.is_controller_response(broker_c.get_sender(), broker_c.get_default_callback(), t.as_bytes()).await { + broker_c.status_manager.handle_controller_response(broker_c.get_sender(), broker_c.get_default_callback(), t.as_bytes()).await; } else { // send the incoming text without context back to the sender - Self::handle_jsonrpc_response(t.as_bytes(),callback.clone()) + let id = Self::get_id_from_result(t.as_bytes()); + Self::handle_jsonrpc_response(t.as_bytes(),broker_c.get_broker_callback(id).await) } } }, @@ -105,14 +156,43 @@ impl ThunderBroker { } }, - Some(request) = tr.recv() => { + Some(mut request) = tr.recv() => { debug!("Got request from receiver for broker {:?}", request); - match broker_c.prepare_request(&request) { - Ok(updated_request) => { - debug!("Sending request to broker {:?}", updated_request); - for r in updated_request { - let _feed = ws_tx.feed(tokio_tungstenite::tungstenite::Message::Text(r)).await; - let _flush = ws_tx.flush().await; + + match broker_c.check_and_generate_plugin_activation_request(&request) { + Ok(requests) => { + if !requests.is_empty() { + let mut ws_tx = ws_tx_wrap.lock().await; + for r in requests { + let _feed = ws_tx.feed(tokio_tungstenite::tungstenite::Message::Text(r)).await; + let _flush = ws_tx.flush().await; + } + } + else { + // empty request means plugin is activated and ready to process the request + // Intercept the request for data migration + let mut request_consumed = false; + if let Some(user_data_migrator) = broker_c.data_migrator.clone() { + request_consumed = 
user_data_migrator.intercept_broker_request(&broker_c, ws_tx_wrap.clone(), &mut request).await; + } + + // If the request is not consumed by the data migrator, continue with the request + if !request_consumed { + match broker_c.prepare_request(&request) { + Ok(updated_request) => { + debug!("Sending request to broker {:?}", updated_request); + let binding = ws_tx_wrap.clone(); + let mut ws_tx = binding.lock().await; + for r in updated_request { + let _feed = ws_tx.feed(tokio_tungstenite::tungstenite::Message::Text(r)).await; + let _flush = ws_tx.flush().await; + } + } + Err(e) => { + broker_c.get_default_callback().send_error(request,e).await + } + } + } } } Err(e) => { @@ -121,10 +201,11 @@ impl ThunderBroker { info!("Thunder Service not ready, request is now in pending list {:?}", request); }, _ => - callback_for_sender.send_error(request,e).await + broker_c.get_default_callback().send_error(request,e).await + } } } - } + }, Some(cleanup_request) = c_tr.recv() => { let value = { @@ -168,6 +249,13 @@ impl ThunderBroker { new_response } + fn get_id_from_result(result: &[u8]) -> Option { + if let Ok(data) = serde_json::from_slice::(result) { + return data.id; + } + None + } + fn get_callsign_and_method_from_alias(alias: &str) -> (String, Option<&str>) { let mut collection: Vec<&str> = alias.split('.').collect(); let method = collection.pop(); @@ -207,29 +295,14 @@ impl ThunderBroker { } response } -} - -impl EndpointBroker for ThunderBroker { - fn get_broker(request: BrokerConnectRequest, callback: BrokerCallback) -> Self { - Self::start(request, callback) - } - - fn get_sender(&self) -> BrokerSender { - self.sender.clone() - } - - fn get_cleaner(&self) -> BrokerCleaner { - self.cleaner.clone() - } - fn prepare_request( + fn check_and_generate_plugin_activation_request( &self, rpc_request: &super::endpoint_broker::BrokerRequest, ) -> Result, RippleError> { let mut requests = Vec::new(); - let rpc = rpc_request.clone().rpc; - let id = rpc.ctx.call_id; let (callsign, 
method) = Self::get_callsign_and_method_from_alias(&rpc_request.rule.alias); + if method.is_none() { return Err(RippleError::InvalidInput); } @@ -269,6 +342,31 @@ impl EndpointBroker for ThunderBroker { requests.push(request.to_string()); return Ok(requests); } + Ok(requests) + } +} + +impl EndpointBroker for ThunderBroker { + fn get_broker(request: BrokerConnectRequest, callback: BrokerCallback) -> Self { + Self::start(request, callback) + } + + fn get_sender(&self) -> BrokerSender { + self.sender.clone() + } + + fn get_cleaner(&self) -> BrokerCleaner { + self.cleaner.clone() + } + + fn prepare_request( + &self, + rpc_request: &super::endpoint_broker::BrokerRequest, + ) -> Result, RippleError> { + let rpc = rpc_request.clone().rpc; + let id = rpc.ctx.call_id; + let (callsign, method) = Self::get_callsign_and_method_from_alias(&rpc_request.rule.alias); + let mut requests = Vec::new(); let method = method.unwrap(); // Below chunk of code is basically for subscription where thunder needs some special care based on