diff --git a/core/main/src/broker/endpoint_broker.rs b/core/main/src/broker/endpoint_broker.rs
index e45fc2db2..76ca7191b 100644
--- a/core/main/src/broker/endpoint_broker.rs
+++ b/core/main/src/broker/endpoint_broker.rs
@@ -862,7 +862,6 @@ fn apply_filter(broker_request: &BrokerRequest, result: &Value, rpc_request: &Rp
         &filter,
         format!("{}_event filter", rpc_request.ctx.method),
     ) {
-        println!("apply_filter: {:?}", r);
         if r.is_null() {
             return false;
         } else {
diff --git a/core/main/src/broker/rules_engine.rs b/core/main/src/broker/rules_engine.rs
index 2be82dca3..ca82c2f21 100644
--- a/core/main/src/broker/rules_engine.rs
+++ b/core/main/src/broker/rules_engine.rs
@@ -89,15 +89,21 @@ pub struct Rule {
     // Not every rule needs transform
     #[serde(default)]
     pub transform: RuleTransform,
+    #[serde(skip_serializing_if = "Option::is_none")]
     pub filter: Option<String>,
+    #[serde(skip_serializing_if = "Option::is_none")]
     pub endpoint: Option<String>,
 }
 
 #[derive(Debug, Clone, Deserialize, Default, Serialize)]
 pub struct RuleTransform {
+    #[serde(skip_serializing_if = "Option::is_none")]
     pub request: Option<String>,
+    #[serde(skip_serializing_if = "Option::is_none")]
     pub response: Option<String>,
+    #[serde(skip_serializing_if = "Option::is_none")]
     pub event: Option<String>,
+    #[serde(skip_serializing_if = "Option::is_none")]
     pub event_decorator_method: Option<String>,
 }
diff --git a/core/main/src/broker/thunder/user_data_migrator.rs b/core/main/src/broker/thunder/user_data_migrator.rs
index 3468ad070..629fa8677 100644
--- a/core/main/src/broker/thunder/user_data_migrator.rs
+++ b/core/main/src/broker/thunder/user_data_migrator.rs
@@ -15,24 +15,26 @@
 // SPDX-License-Identifier: Apache-2.0
 //
-use std::collections::HashMap;
-use std::fmt;
-use std::fs::{File, OpenOptions};
-use std::path::Path;
-use std::sync::Arc;
-
-use openrpc_validator::jsonschema::output;
-use ripple_sdk::tokio::net::TcpStream;
-use ripple_sdk::utils::error::RippleError;
+use std::{
+    collections::HashMap,
+    fmt,
+    fs::{File, OpenOptions},
+    path::Path,
+    sync::Arc,
+};
+
 use ripple_sdk::{
     log::{debug, error, info},
     tokio::{
         self,
+        net::TcpStream,
         sync::mpsc::{self, Receiver, Sender},
         sync::Mutex,
         time::{timeout, Duration},
     },
+    utils::error::RippleError,
 };
+
 use serde::{Deserialize, Serialize};
 use serde_json::{json, Value};
@@ -46,6 +48,9 @@ use futures_util::SinkExt;
 use crate::broker::thunder_broker::ThunderBroker;
 use tokio_tungstenite::{tungstenite::Message, WebSocketStream};
+
+// TBD get the storage dir from manifest or other Ripple config file
+const RIPPLE_STORAGE_DIR: &str = "/opt/persistent/ripple";
 const USER_DATA_MIGRATION_CONFIG_FILE_NAME: &str = "user_data_migration_config.json";
 
 #[derive(Debug)]
@@ -75,19 +80,26 @@ impl fmt::Display for UserDataMigratorError {
     }
 }
 impl std::error::Error for UserDataMigratorError {}
+
+#[derive(Clone, Debug, Deserialize, Serialize)]
+pub struct CoversionRule {
+    conversion_rule: String,
+}
 #[derive(Clone, Debug, Deserialize, Serialize)]
 pub struct MigrationConfigEntry {
     namespace: String,
     key: String,
     default: Value,
     getter: String,
-    setter_rule: Option<Rule>,
     setter: String,
+    #[serde(skip_serializing_if = "Option::is_none")]
+    setter_rule: Option<Rule>,
+    #[serde(skip_serializing_if = "Option::is_none")]
+    legacy_to_plugin_value_conversion: Option<CoversionRule>,
     migrated: bool,
 }
 
 type MigrationMap = HashMap<String, MigrationConfigEntry>;
-// This struct is responsible for migrating user data from the legacy storage to the new storage.
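// --- Illustrative sketch (not part of the patch) -----------------------------
// Shape of a user_data_migration_config.json entry as implied by the
// MigrationConfigEntry struct above, assuming the file deserializes straight
// into MigrationMap (HashMap<String, MigrationConfigEntry>). The map key, the
// namespace/key/method names and the commented-out optional fields are made-up
// examples, not values taken from this patch. With the new
// #[serde(skip_serializing_if = "Option::is_none")] attributes, absent optional
// fields are simply omitted when the map is written back to disk instead of
// being serialized as null.
#[cfg(test)]
mod migration_config_shape_example {
    use serde_json::json;

    #[test]
    fn example_entry_shape() {
        let example = json!({
            "voice_guidance_enabled": {
                "namespace": "accessibility",          // legacy PersistentStore namespace
                "key": "voiceGuidanceEnabled",         // legacy PersistentStore key
                "default": false,                      // default used to decide the migration direction
                "getter": "voiceguidance.enabled",     // getter method intercepted by the migrator
                "setter": "voiceguidance.setEnabled",  // setter method intercepted by the migrator
                // "setter_rule": { ... },             // optional Rule used when writing to the plugin
                // "legacy_to_plugin_value_conversion": { "conversion_rule": ".value" }, // optional jq rule
                "migrated": false
            }
        });
        assert!(example["voice_guidance_enabled"]["default"].is_boolean());
    }
}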
#[derive(Clone, Debug)] pub struct UserDataMigrator { migration_config: Arc>, // persistent migration map @@ -101,8 +113,8 @@ impl UserDataMigrator { let possible_config_file_paths = vec![ format!("/etc/{}", USER_DATA_MIGRATION_CONFIG_FILE_NAME), format!( - "/opt/persistent/ripple/{}", - USER_DATA_MIGRATION_CONFIG_FILE_NAME + "{}/{}", + RIPPLE_STORAGE_DIR, USER_DATA_MIGRATION_CONFIG_FILE_NAME ), format!("./{}", USER_DATA_MIGRATION_CONFIG_FILE_NAME), ]; @@ -136,7 +148,7 @@ impl UserDataMigrator { .cloned() } - // function to intercept and handle broker request. Perform migration if needed + /// function to intercept and handle broker request. Perform migration if needed pub async fn intercept_broker_request( &self, broker: &ThunderBroker, @@ -144,68 +156,96 @@ impl UserDataMigrator { request: &mut BrokerRequest, ) -> bool { let method = request.rpc.method.clone(); + info!( + "intercept_broker_request: Intercepting broker request for method: {:?}", + method + ); if let Some(config_entry) = self.get_matching_migration_entry_by_method(&method).await { - // migration entry found for either getter or setter method - // for setter case, irrespective of the migration status, update the new value in the new storage and sync - // with the legacy storage - if config_entry.setter == method { - // perform the setter update and sync up logic asynchronously - // update legacy storage with the new value as fire and forget operation - self.set_migration_status(&config_entry.namespace, &config_entry.key) + return self + .handle_setter_request(broker, ws_tx, request, &config_entry) .await; + } else { + return self + .handle_getter_request(broker, ws_tx, request, &config_entry) + .await; + } + } - let mut params = Value::Null; + // Continue with the original request if no migration entry is found + false + } - // Get the params from the request as is. 
No need to transform the params for legacy storage - if let Ok(mut extract) = - serde_json::from_str::>(&request.rpc.params_json) - { - // Get the param to use in write to legacy storage - if let Some(last) = extract.pop() { - // Extract the value field from the last - params = last.get("value").cloned().unwrap_or(last); - } - } + async fn handle_setter_request( + &self, + broker: &ThunderBroker, + ws_tx: Arc, Message>>>, + request: &BrokerRequest, + config_entry: &MigrationConfigEntry, + ) -> bool { + info!( + "intercept_broker_request: Handling setter request for method: {:?}", + config_entry.setter + ); + self.set_migration_status(&config_entry.namespace, &config_entry.key) + .await; - info!( - "intercept_broker_request: Updating legacy storage with new value: {:?}", - params - ); - let _ = self - .write_to_legacy_storage( - &config_entry.namespace, - &config_entry.key, - &broker, - ws_tx.clone(), - params.to_string().as_str(), - ) - .await; - // returning false to continue with the original setter request - return false; - } else { - // perform the getter migration logic asynchronously - if !config_entry.migrated { - let self_arc = Arc::new(self.clone()); - let _ = self_arc - .call_perform_getter_migration( - &broker, - ws_tx.clone(), - &request, - &config_entry, - ) - .await; - return true; - } else { - // the migration is already done, continue with the original request - return false; - } - } + let params = self.extract_params(&request.rpc.params_json); + info!( + "intercept_broker_request: Updating legacy storage with new value: {:?}", + params + ); + + let _ = self + .write_to_legacy_storage( + &config_entry.namespace, + &config_entry.key, + broker, + ws_tx.clone(), + ¶ms, + ) + .await; + + // Return false to continue with the original setter request + false + } + + async fn handle_getter_request( + &self, + broker: &ThunderBroker, + ws_tx: Arc, Message>>>, + request: &BrokerRequest, + config_entry: &MigrationConfigEntry, + ) -> bool { + info!( + "intercept_broker_request: Handling getter request for method: {:?}", + config_entry.getter + ); + if !config_entry.migrated { + let self_arc = Arc::new(self.clone()); + self_arc + .invoke_perform_getter_migration(broker, ws_tx.clone(), request, config_entry) + .await; + return true; } - // continue with the original request + + // The migration already done, continue with the original request + info!( + "intercept_broker_request: Migration already done for method: {:?}", + config_entry.getter + ); false } + fn extract_params(&self, params_json: &str) -> Value { + if let Ok(mut extract) = serde_json::from_str::>(params_json) { + if let Some(last) = extract.pop() { + return last.get("value").cloned().unwrap_or(last); + } + } + Value::Null + } + async fn read_from_legacy_storage( &self, namespace: &str, @@ -215,6 +255,18 @@ impl UserDataMigrator { ) -> Result { let request_id = EndpointBrokerState::get_next_id(); let call_sign = "org.rdk.PersistentStore.1.".to_owned(); + + // Register custom callback to handle the response + broker + .register_custom_callback( + request_id, + BrokerCallback { + sender: self.response_tx.clone(), + }, + ) + .await; + + // create the request to the legacy storage let thunder_request = json!({ "jsonrpc": "2.0", "id": request_id, @@ -227,15 +279,10 @@ impl UserDataMigrator { }) .to_string(); - // Register custom callback to handle the response - broker - .register_custom_callback( - request_id, - BrokerCallback { - sender: self.response_tx.clone(), - }, - ) - .await; + info!( + "read_from_legacy_storage: Sending 
request to plugin: {:?}", + thunder_request + ); // send the request to the legacy storage if let Err(e) = self.send_thunder_request(&ws_tx, &thunder_request).await { @@ -247,13 +294,10 @@ impl UserDataMigrator { broker.unregister_custom_callback(request_id).await; return Err(e); } - // get the response from the custom callback - let response_rx = self.response_rx.clone(); - let broker_clone = broker.clone(); // get the response and check if the response is successful by checking result or error field. // Value::Null is a valid response, return Err if the response is not successful let response = - UserDataMigrator::wait_for_response(response_rx, broker_clone, request_id).await; + Self::wait_for_response(self.response_rx.clone(), broker.clone(), request_id).await; match response { Ok(response) => { if let Some(result) = response.data.result { @@ -280,10 +324,22 @@ impl UserDataMigrator { key: &str, broker: &ThunderBroker, ws_tx: Arc, Message>>>, - params_json: &str, + params_json: &Value, ) -> Result<(), UserDataMigratorError> { let request_id = EndpointBrokerState::get_next_id(); let call_sign = "org.rdk.PersistentStore.1.".to_owned(); + + // Register custom callback to handle the response + broker + .register_custom_callback( + request_id, + BrokerCallback { + sender: self.response_tx.clone(), + }, + ) + .await; + + // create the request to the legacy storage let thunder_request = json!({ "jsonrpc": "2.0", "id": request_id, @@ -296,15 +352,10 @@ impl UserDataMigrator { }) }) .to_string(); - // Register custom callback to handle the response - broker - .register_custom_callback( - request_id, - BrokerCallback { - sender: self.response_tx.clone(), - }, - ) - .await; + info!( + "write_to_legacy_storage: Sending request to plugin: {:?}", + thunder_request + ); // send the request to the legacy storage if let Err(e) = self.send_thunder_request(&ws_tx, &thunder_request).await { @@ -337,20 +388,13 @@ impl UserDataMigrator { Ok(()) } - async fn read_from_true_north_plugin( + async fn read_from_thunder_plugin( &self, broker: &ThunderBroker, ws_tx: Arc, Message>>>, request: &BrokerRequest, ) -> Result { - // no params for the getter function let request_id = EndpointBrokerState::get_next_id(); - let thunder_plugin_request = json!({ - "jsonrpc": "2.0", - "id": request_id, - "method": request.rule.alias, - }) - .to_string(); // Register custom callback to handle the response broker @@ -362,6 +406,21 @@ impl UserDataMigrator { ) .await; + // Create the request to the new plugin + // The current implementation assumes no params for the getter function + // extend the migration configuration to support params if needed + let thunder_plugin_request = json!({ + "jsonrpc": "2.0", + "id": request_id, + "method": request.rule.alias, + }) + .to_string(); + + info!( + "read_from_thunder_plugin: Sending request to plugin: {:?}", + thunder_plugin_request + ); + // send the request to the new pluin as thunder request if let Err(e) = self .send_thunder_request(&ws_tx, &thunder_plugin_request) @@ -376,14 +435,11 @@ impl UserDataMigrator { } // get the response from the custom callback - let response_rx = self.response_rx.clone(); - let broker_clone = broker.clone(); - - UserDataMigrator::wait_for_response(response_rx, broker_clone, request_id).await + Self::wait_for_response(self.response_rx.clone(), broker.clone(), request_id).await } fn transform_requets_params( - params_json: &str, + params_json: &Value, rule: &Rule, method: &str, ) -> Result { @@ -403,12 +459,13 @@ impl UserDataMigrator { } 
Ok(serde_json::to_value(&data).unwrap()) } - async fn write_to_true_north_plugin( + + async fn write_to_thunder_plugin( &self, broker: &ThunderBroker, ws_tx: Arc, Message>>>, config_entry: &MigrationConfigEntry, - params_json: &str, // param from the legacy storage + params_json: &Value, // param from the legacy storage ) -> Result { // get the setter rule from the rule engine by giving the setter method name let setter_rule = Self::retrive_setter_rule_from_rule_engine(config_entry)?; @@ -422,15 +479,8 @@ impl UserDataMigrator { return Err(UserDataMigratorError::RequestTransformError(e.to_string())); } }; - // create the request to the new plugin + let request_id = EndpointBrokerState::get_next_id(); - let thunder_plugin_request = json!({ - "jsonrpc": "2.0", - "id": request_id, - "method": setter_rule.alias, - "params": transformed_params, - }) - .to_string(); // Register custom callback to handle the response broker @@ -442,13 +492,27 @@ impl UserDataMigrator { ) .await; + // create the request to the new plugin + let thunder_plugin_request = json!({ + "jsonrpc": "2.0", + "id": request_id, + "method": setter_rule.alias, + "params": transformed_params, + }) + .to_string(); + + info!( + "write_to_thunder_plugin: Sending request to plugin: {:?}", + thunder_plugin_request + ); + // send the request to the new plugin as thunder request if let Err(e) = self .send_thunder_request(&ws_tx, &thunder_plugin_request) .await { error!( - "write_to_true_north_plugin: Failed to send thunder request: {:?}", + "write_to_thunder_plugin: Failed to send thunder request: {:?}", e ); broker.unregister_custom_callback(request_id).await; @@ -456,10 +520,7 @@ impl UserDataMigrator { } // get the response from the custom callback - let response_rx = self.response_rx.clone(); - let broker_clone = broker.clone(); - - UserDataMigrator::wait_for_response(response_rx, broker_clone, request_id).await + Self::wait_for_response(self.response_rx.clone(), broker.clone(), request_id).await } async fn send_thunder_request( @@ -516,13 +577,13 @@ impl UserDataMigrator { } } - async fn call_perform_getter_migration( + async fn invoke_perform_getter_migration( self: Arc, broker: &ThunderBroker, ws_tx: Arc, Message>>>, request: &BrokerRequest, config_entry: &MigrationConfigEntry, - ) -> (bool, Option) { + ) { // Clone the parameters to move into the spawned task let broker_clone = broker.clone(); let ws_tx_clone = ws_tx.clone(); @@ -541,42 +602,40 @@ impl UserDataMigrator { .await { Ok((_, Some(mut output))) => { - let broker_clone = broker_clone.clone(); + // Handle the case where output is returned. Send the response to the default callback of the broker. + // The response structure will be upated with the originial request id for the callback handler to match the response. output.data.id = Some(request_clone.rpc.ctx.call_id); - tokio::spawn(async move { - if let Err(e) = broker_clone - .get_default_callback() - .sender - .send(output) - .await - { - error!("Failed to send response: {:?}", e); - } - }); + if let Err(e) = broker_clone + .get_default_callback() + .sender + .send(output) + .await + { + error!("Failed to send response: {:?}", e); + } } Ok((_, None)) | Err(_) => { - // Handle the case where no output is returned - let value_from_plugin = self_clone - .read_from_true_north_plugin( + // Handle the case where no output is returned. Read the value from the plugin and send the response + // The response will be sent to the default callback of the broker. 
+ // The response structure will be upated with the originial request id for the callback handler to match the response. + let value_from_thunder_plugin = self_clone + .read_from_thunder_plugin( &broker_clone, ws_tx_clone.clone(), &request_clone, ) .await; - match value_from_plugin { - Ok(mut value_from_plugin) => { - let broker_clone = broker_clone.clone(); - value_from_plugin.data.id = Some(request_clone.rpc.ctx.call_id); - tokio::spawn(async move { - if let Err(e) = broker_clone - .get_default_callback() - .sender - .send(value_from_plugin) - .await - { - error!("Failed to send response: {:?}", e); - } - }); + match value_from_thunder_plugin { + Ok(mut value_from_thunder_plugin) => { + value_from_thunder_plugin.data.id = Some(request_clone.rpc.ctx.call_id); + if let Err(e) = broker_clone + .get_default_callback() + .sender + .send(value_from_thunder_plugin) + .await + { + error!("Failed to send response: {:?}", e); + } } Err(_e) => { broker_clone @@ -588,9 +647,6 @@ impl UserDataMigrator { } } }); - - // Always return (false, None) to free up the calling thread - (false, None) } async fn perform_getter_migration( @@ -608,12 +664,16 @@ impl UserDataMigrator { ws_tx.clone(), ) .await?; - - let value_from_plugin = self - .read_from_true_north_plugin(broker, ws_tx.clone(), request) + info!( + "perform_getter_migration: Read from legacy storage: {:?}", + legacy_value + ); + let value_from_thunder_plugin = self + .read_from_thunder_plugin(broker, ws_tx.clone(), request) .await?; - let mut data = value_from_plugin.clone().data; + // apply the response transform to the data from the plugin + let mut data = value_from_thunder_plugin.clone().data; if let Some(filter) = request .rule .transform @@ -633,31 +693,43 @@ impl UserDataMigrator { &config_entry.key, broker, ws_tx.clone(), - &result.to_string(), + &result, ) .await?; self.set_migration_status(&config_entry.namespace, &config_entry.key) .await; - return Ok((true, Some(BrokerOutput { data }))); + // this BrokerOutput has the data from the plugin + Ok((true, Some(BrokerOutput { data }))) } else if legacy_value != config_entry.default { info!( "perform_getter_migration: Plugin has default value and Legacy storage has the latest value. Updating plugin with value from legacy storage: {:?}", legacy_value ); - let response = self - .update_plugin_from_legacy( - broker, - ws_tx.clone(), - config_entry, - &legacy_value.to_string(), - ) + let mut response = self + .update_plugin_from_legacy(broker, ws_tx.clone(), config_entry, &legacy_value) .await?; self.set_migration_status(&config_entry.namespace, &config_entry.key) .await; + + // this BrokerOutput has the data from the legacy storage. + // prepare the response to send back to the caller as response from plugin but with the data from the legacy storage + response.data.result = Some(legacy_value.clone()); + // check if there is any value conversion rule available + if let Some(conversion_rule) = &config_entry.legacy_to_plugin_value_conversion { + // apply the conversion rule to the data from the legacy storage + let data = crate::broker::rules_engine::jq_compile( + json!({ "value": legacy_value }), + &conversion_rule.conversion_rule, + "legacy_to_plugin_value_conversion".to_string(), + ); + if let Ok(data) = data { + response.data.result = Some(data); + } + } return Ok((true, Some(response))); } else { info!( - "perform_getter_migration: Both plugin and legacy storage have default value. 
Continuing with the original request" + "perform_getter_migration: Both plugin and legacy storage have default value. No migration needed." ); self.set_migration_status(&config_entry.namespace, &config_entry.key) .await; @@ -676,7 +748,7 @@ impl UserDataMigrator { key: &str, broker: &ThunderBroker, ws_tx: Arc, Message>>>, - value: &str, + value: &Value, ) -> Result<(), UserDataMigratorError> { self.write_to_legacy_storage(namespace, key, broker, ws_tx, value) .await @@ -687,118 +759,12 @@ impl UserDataMigrator { broker: &ThunderBroker, ws_tx: Arc, Message>>>, config_entry: &MigrationConfigEntry, - value: &str, + value: &Value, ) -> Result { - self.write_to_true_north_plugin(broker, ws_tx, config_entry, value) + self.write_to_thunder_plugin(broker, ws_tx, config_entry, value) .await } - /* - async fn perform_getter_migration( - &self, - broker: &ThunderBroker, - ws_tx: Arc, Message>>>, - request: &BrokerRequest, - config_entry: &MigrationConfigEntry, - ) -> Result<(bool, Option), UserDataMigratorError> { - let legacy_value = self - .read_from_legacy_storage( - &config_entry.namespace, - &config_entry.key, - broker, - ws_tx.clone(), - ) - .await; - - match legacy_value { - Ok(legacy_value) => { - let value_from_plugin = self - .read_from_true_north_plugin(broker, ws_tx.clone(), request) - .await; - match value_from_plugin { - Ok(value_from_plugin) => { - // apply the response transform rule if any - let mut data = value_from_plugin.clone().data; - if let Some(filter) = request - .rule - .transform - .get_transform_data(RuleTransformType::Response) - { - endpoint_broker::apply_response(filter, &request.rule.alias, &mut data); - } - if let Some(result) = data.clone().result { - // if the plugins has a non default value, assuming that it is holding the latest value - // update the legacy storage with the new value - if result != config_entry.default { - self.write_to_legacy_storage( - &config_entry.namespace, - &config_entry.key, - broker, - ws_tx.clone(), - &result.to_string(), - ) - .await; - self.set_migration_status( - &config_entry.namespace, - &config_entry.key, - ) - .await; - - // create broker output with the result - let response = BrokerOutput { data }; - Ok((true, Some(response))) - } else { - // plugin has the default value, now check if the legacy storage has a non default value - // if so, update the plugin with the value from the legacy storage - if legacy_value != config_entry.default { - let response = self - .write_to_true_north_plugin( - broker, - ws_tx.clone(), - config_entry, - &legacy_value.to_string(), - ) - .await; - match response { - Ok(response) => { - self.set_migration_status( - &config_entry.namespace, - &config_entry.key, - ) - .await; - Ok((true, Some(response))) - } - Err(e) => Err(e), - } - } else { - // both the plugin and the legacy storage has the default value, no need to update - // continue with the original request - self.set_migration_status( - &config_entry.namespace, - &config_entry.key, - ) - .await; - // create broker output with the result - let response = BrokerOutput { data }; - Ok((true, Some(response))) - } - } - } else { - Err(UserDataMigratorError::ResponseError( - "No result in response".to_string(), - )) - } - } - Err(e) => Err(e), - } - } - Err(e) => { - // TBD : Add more detailed error code like no entry in legacy storage, etc - Err(e) - } - } - } - */ // function to set the migration flag to true and update the migration map in the config file async fn set_migration_status(&self, namespace: &str, key: &str) { let mut 
config_entry_changed = false;
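// --- Illustrative sketch (not part of the patch) -----------------------------
// The value-comparison logic inside perform_getter_migration above, reduced to
// a pure function over values that have already been fetched. This is only a
// reading aid under the assumption that both reads succeeded; the real code
// additionally registers per-request callbacks, applies the response/jq
// transforms, updates legacy storage or the plugin, and persists the migrated
// flag via set_migration_status.
#[allow(dead_code)]
enum GetterMigrationOutcome {
    /// The plugin already holds a non-default value: copy it back to legacy
    /// storage and answer the caller with the plugin value.
    PluginIsLatest(serde_json::Value),
    /// The plugin still holds the default but legacy storage does not: push the
    /// legacy value into the plugin and answer the caller with the legacy value
    /// (optionally run through legacy_to_plugin_value_conversion).
    LegacyIsLatest(serde_json::Value),
    /// Both sides hold the default: mark the entry migrated, nothing to copy.
    NothingToCopy,
}

#[allow(dead_code)]
fn decide_getter_migration(
    plugin_value: &serde_json::Value,
    legacy_value: &serde_json::Value,
    default: &serde_json::Value,
) -> GetterMigrationOutcome {
    if plugin_value != default {
        GetterMigrationOutcome::PluginIsLatest(plugin_value.clone())
    } else if legacy_value != default {
        GetterMigrationOutcome::LegacyIsLatest(legacy_value.clone())
    } else {
        GetterMigrationOutcome::NothingToCopy
    }
}
// e.g. decide_getter_migration(&json!(false), &json!(true), &json!(false))
// yields LegacyIsLatest(true).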