From 715a8fad0cf96799a3fd0caddf5753db9704930f Mon Sep 17 00:00:00 2001 From: VinodSathyaseelan Date: Mon, 14 Oct 2024 12:25:26 -0700 Subject: [PATCH 1/6] feat: Implement UserDataMigrator Framework - Rule based Shim Layer --- core/main/src/broker/thunder/mod.rs | 1 + .../src/broker/thunder/user_data_migrator.rs | 302 ++++++++++++++++++ core/main/src/broker/thunder_broker.rs | 191 ++++++++--- 3 files changed, 448 insertions(+), 46 deletions(-) create mode 100644 core/main/src/broker/thunder/user_data_migrator.rs diff --git a/core/main/src/broker/thunder/mod.rs b/core/main/src/broker/thunder/mod.rs index 816e08e0c..c280a6468 100644 --- a/core/main/src/broker/thunder/mod.rs +++ b/core/main/src/broker/thunder/mod.rs @@ -15,3 +15,4 @@ // SPDX-License-Identifier: Apache-2.0 // pub mod thunder_plugins_status_mgr; +pub mod user_data_migrator; diff --git a/core/main/src/broker/thunder/user_data_migrator.rs b/core/main/src/broker/thunder/user_data_migrator.rs new file mode 100644 index 000000000..71e9e27be --- /dev/null +++ b/core/main/src/broker/thunder/user_data_migrator.rs @@ -0,0 +1,302 @@ +// Copyright 2023 Comcast Cable Communications Management, LLC +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. +// +// SPDX-License-Identifier: Apache-2.0 +// + +use std::collections::HashMap; +use std::fs::{File, OpenOptions}; +use std::path::Path; +use std::sync::Arc; + +use ripple_sdk::tokio::net::TcpStream; +use ripple_sdk::{ + log::{debug, error, info}, + tokio::{ + self, + sync::mpsc::{self, Receiver, Sender}, + sync::Mutex, + time::{timeout, Duration}, + }, +}; +use serde::{Deserialize, Serialize}; +use serde_json::{json, Value}; + +use crate::broker::endpoint_broker::{ + BrokerCallback, BrokerOutput, BrokerRequest, EndpointBrokerState, +}; +use futures::stream::SplitSink; +use futures_util::SinkExt; + +use crate::broker::thunder_broker::ThunderBroker; +use tokio_tungstenite::{tungstenite::Message, WebSocketStream}; +const USER_DATA_MIGRATION_CONFIG_FILE_NAME: &str = "user_data_migration_config.json"; + +#[derive(Clone, Debug, Deserialize, Serialize)] +pub struct MigrationConfigEntry { + namespace: String, + key: String, + default: Value, + getter: String, + setter: String, + migrated: bool, +} + +type MigrationMap = HashMap; +// This struct is responsible for migrating user data from the legacy storage to the new storage. 
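+// An illustrative (not normative) sketch of one entry in user_data_migration_config.json;
+// the map key and the getter/setter method names below are placeholders, only the field
+// shape mirrors MigrationConfigEntry as deserialized by serde:
+//
+// {
+//   "someSetting": {
+//     "namespace": "SomeNamespace",
+//     "key": "someKey",
+//     "default": false,
+//     "getter": "module.getSomeSetting",
+//     "setter": "module.setSomeSetting",
+//     "migrated": false
+//   }
+// }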
+#[derive(Clone, Debug)] +pub struct UserDataMigrator { + migration_config: Arc>, // persistent migration map + config_file_path: String, // path to the migration map file + response_tx: Sender, + response_rx: Arc>>, +} + +impl UserDataMigrator { + pub fn create() -> Option { + let possible_config_file_paths = vec![ + format!("/etc/{}", USER_DATA_MIGRATION_CONFIG_FILE_NAME), + format!( + "/opt/persistent/ripple/{}", + USER_DATA_MIGRATION_CONFIG_FILE_NAME + ), + format!("./{}", USER_DATA_MIGRATION_CONFIG_FILE_NAME), + ]; + + for path in possible_config_file_paths { + if Path::new(&path).exists() { + debug!("Found migration map file: {}", path); + if let Some(migration_map) = Self::load_migration_config(&path) { + let (response_tx, response_rx) = mpsc::channel(16); + return Some(UserDataMigrator { + migration_config: Arc::new(Mutex::new(migration_map)), + config_file_path: path.to_string(), + response_tx, + response_rx: Arc::new(Mutex::new(response_rx)), + }); + } + } + } + debug!("No migration map file found"); + None + } + + async fn get_matching_migration_entry_on_method( + &self, + method: &str, + ) -> Option { + let migration_map = self.migration_config.lock().await; + migration_map + .values() + .find(|entry| entry.getter == method || entry.setter == method) + .cloned() + } + + // function to intercept and handle broker request. Perform migration if needed + pub async fn intercept_broker_request( + &self, + broker: &ThunderBroker, + ws_tx: Arc, Message>>>, + request: &mut BrokerRequest, + ) -> (bool, Option) { + let method = request.rpc.method.clone(); + if let Some(config_entry) = self.get_matching_migration_entry_on_method(&method).await { + // migration entry found for either getter or setter method + // for setter case, irrespective of the migration status, update the new value in the new storage and sync + // with the legacy storage + + if config_entry.setter == method { + // perform the setter update and sync up logic asynchronously + // update legacy storage with the new value as fire and forget operation + self.set_migration_status(&config_entry.namespace, &config_entry.key) + .await; + // TBD: apply transform rule if any and get the params. 
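+                // Descriptive note: write_to_legacy_storage registers a one-off custom callback,
+                // sends the PersistentStore setValue request, and awaits the reply on a spawned
+                // task, so this call does not block; the original setter request still continues
+                // to the target plugin because (false, None) is returned below.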
+ self.write_to_legacy_storage( + &config_entry.namespace, + &config_entry.key, + &broker, + ws_tx.clone(), + &request, + &config_entry.default, + ) + .await; + // returning false to continue with the original setter request + return (false, None); + } else { + // perform the getter migration logic asynchronously + if !config_entry.migrated { + let migrated_value = self + .perform_getter_migration(&broker, &request, &config_entry) + .await; + return (false, Some(migrated_value)); + } else { + // the migration is already done, continue with the original request + return (false, None); + } + } + } + + // continue with the original request + (false, None) + } + + async fn write_to_legacy_storage( + &self, + namespace: &str, + key: &str, + broker: &ThunderBroker, + ws_tx: Arc, Message>>>, + _request: &BrokerRequest, + value: &Value, + ) { + let request_id = EndpointBrokerState::get_next_id(); + let call_sign = "org.rdk.PersistentStore.1.".to_owned(); + let thunder_request = json!({ + "jsonrpc": "2.0", + "id": request_id, + "method": format!("{}setValue", call_sign), + "params": json!({ + "namespace": namespace, + "key": key, + "value": value.to_string(), + "scope": "device", + }) + }) + .to_string(); + + // Register custom callback to handle the response + broker + .register_custom_callback( + request_id, + BrokerCallback { + sender: self.response_tx.clone(), + }, + ) + .await; + + // send the request to the legacy storage + if let Err(e) = self.send_thunder_request(&ws_tx, &thunder_request).await { + error!("Failed to send thunder request: {:?}", e); + return; + } + + // Spawn a task to wait for the response + let response_rx = self.response_rx.clone(); + let broker_clone = broker.clone(); + tokio::spawn(async move { + if let Err(e) = + UserDataMigrator::wait_for_response(response_rx, broker_clone, request_id).await + { + error!("Error waiting for response: {:?}", e); + } + }); + } + + async fn send_thunder_request( + &self, + ws_tx: &Arc, Message>>>, + request: &str, + ) -> Result<(), Box> { + let mut ws_tx = ws_tx.lock().await; + ws_tx.feed(Message::Text(request.to_string())).await?; + ws_tx.flush().await?; + Ok(()) + } + + async fn wait_for_response( + response_rx: Arc>>, + broker: ThunderBroker, + request_id: u64, + ) -> Result<(), Box> { + let mut response_rx = response_rx.lock().await; + match timeout(Duration::from_secs(30), response_rx.recv()).await { + Ok(Some(response)) => { + info!( + "Received response at custom write_to_legacy_storage: {:?}", + response + ); + } + Ok(None) => { + error!("Failed to receive response"); + } + Err(_) => { + error!("Timeout waiting for response"); + } + } + broker.unregister_custom_callback(request_id).await; + Ok(()) + } + // function to perform the getter migration logic asynchronously + async fn perform_getter_migration( + &self, + broker: &ThunderBroker, + request: &BrokerRequest, + config_entry: &MigrationConfigEntry, + ) -> Value { + let mut new_storage_value = Value::Null; + // Get the value from the new storage + //new_storage_value = self.get_new_storage_value(&broker, &request).await; + new_storage_value + } + + // function to set the migration flag to true and update the migration map in the config file + async fn set_migration_status(&self, namespace: &str, key: &str) { + let mut config_entry_changed = false; + { + let mut migration_map = self.migration_config.lock().await; + if let Some(mut config_entry) = migration_map + .values_mut() + .find(|entry| entry.namespace == namespace && entry.key == key) + { + if !config_entry.migrated { + 
config_entry.migrated = true; + config_entry_changed = true; + } + } + } + + // save the migration map to the config file after releasing the lock in case config_entry_changed + if config_entry_changed { + if let Err(e) = self.update_migration_config_file().await { + error!("Failed to update migration config file: {}", e); + } + } + } + // load the migration map from the file + pub fn load_migration_config(config_file_path: &str) -> Option { + let file = File::open(config_file_path).ok()?; + let reader = std::io::BufReader::new(file); + Some(serde_json::from_reader(reader).unwrap_or_else(|_| HashMap::new())) + } + + // function to update the migration status in the config file + async fn update_migration_config_file(&self) -> Result<(), String> { + if Path::new(&self.config_file_path).exists() { + let migration_map = self.migration_config.lock().await; + let file = OpenOptions::new() + .write(true) + .truncate(true) + .open(&self.config_file_path) + .map_err(|e| format!("Failed to open migration config file: {}", e))?; + serde_json::to_writer_pretty(file, &*migration_map) + .map_err(|e| format!("Failed to write to migration config file: {}", e))?; + Ok(()) + } else { + Err(format!( + "Migration config file not found at path {}", + self.config_file_path + )) + } + } +} diff --git a/core/main/src/broker/thunder_broker.rs b/core/main/src/broker/thunder_broker.rs index 4872ad063..da33fd3e5 100644 --- a/core/main/src/broker/thunder_broker.rs +++ b/core/main/src/broker/thunder_broker.rs @@ -20,6 +20,7 @@ use super::{ BrokerSender, BrokerSubMap, EndpointBroker, }, thunder::thunder_plugins_status_mgr::StatusManager, + thunder::user_data_migrator::UserDataMigrator, }; use crate::broker::broker_utils::BrokerUtils; use futures_util::{SinkExt, StreamExt}; @@ -27,11 +28,13 @@ use futures_util::{SinkExt, StreamExt}; use ripple_sdk::{ api::gateway::rpc_gateway_api::JsonRpcApiResponse, log::{debug, error, info}, + tokio::sync::Mutex, tokio::{self, sync::mpsc}, utils::error::RippleError, }; use serde_json::json; use std::{ + collections::HashMap, sync::{Arc, RwLock}, vec, }; @@ -42,43 +45,90 @@ pub struct ThunderBroker { subscription_map: Arc>, cleaner: BrokerCleaner, status_manager: StatusManager, + default_callback: BrokerCallback, + data_migrator: Option, + custom_callback_list: Arc>>, } impl ThunderBroker { + fn new( + sender: BrokerSender, + subscription_map: Arc>, + cleaner: BrokerCleaner, + default_callback: BrokerCallback, + ) -> Self { + Self { + sender, + subscription_map, + cleaner, + status_manager: StatusManager::new(), + default_callback, + data_migrator: None, + custom_callback_list: Arc::new(Mutex::new(HashMap::new())), + } + } + + fn with_data_migtator(mut self) -> Self { + self.data_migrator = UserDataMigrator::create(); + self + } + + fn get_default_callback(&self) -> BrokerCallback { + self.default_callback.clone() + } + + pub async fn register_custom_callback(&self, id: u64, callback: BrokerCallback) { + let mut custom_callback_list = self.custom_callback_list.lock().await; + custom_callback_list.insert(id, callback); + } + + pub async fn unregister_custom_callback(&self, id: u64) { + let mut custom_callback_list = self.custom_callback_list.lock().await; + custom_callback_list.remove(&id); + } + + async fn get_broker_callback(&self, id: Option) -> BrokerCallback { + if id.is_none() { + return self.default_callback.clone(); + } + let custom_callback_list = self.custom_callback_list.lock().await; + if let Some(callback) = custom_callback_list.get(&id.unwrap()) { + return 
callback.clone(); + } + self.default_callback.clone() + } + fn start(request: BrokerConnectRequest, callback: BrokerCallback) -> Self { let endpoint = request.endpoint.clone(); let (tx, mut tr) = mpsc::channel(10); let (c_tx, mut c_tr) = mpsc::channel(2); let sender = BrokerSender { sender: tx }; let subscription_map = Arc::new(RwLock::new(request.sub_map.clone())); - let broker = Self { - sender, - subscription_map, - cleaner: BrokerCleaner { - cleaner: Some(c_tx.clone()), - }, - status_manager: StatusManager::new(), + let cleaner = BrokerCleaner { + cleaner: Some(c_tx.clone()), }; + let broker = Self::new(sender, subscription_map, cleaner, callback).with_data_migtator(); let broker_c = broker.clone(); let broker_for_cleanup = broker.clone(); - let callback_for_sender = callback.clone(); let broker_for_reconnect = broker.clone(); tokio::spawn(async move { - let (mut ws_tx, mut ws_rx) = - BrokerUtils::get_ws_broker(&endpoint.get_url(), None).await; + let (ws_tx, mut ws_rx) = BrokerUtils::get_ws_broker(&endpoint.get_url(), None).await; + let ws_tx_wrap = Arc::new(Mutex::new(ws_tx)); // send the first request to the broker. This is the controller statechange subscription request let status_request = broker_c .status_manager .generate_state_change_subscribe_request(); + { + let mut ws_tx = ws_tx_wrap.lock().await; - let _feed = ws_tx - .feed(tokio_tungstenite::tungstenite::Message::Text( - status_request.to_string(), - )) - .await; - let _flush = ws_tx.flush().await; - + let _feed = ws_tx + .feed(tokio_tungstenite::tungstenite::Message::Text( + status_request.to_string(), + )) + .await; + let _flush = ws_tx.flush().await; + } tokio::pin! { let read = ws_rx.next(); } @@ -88,12 +138,13 @@ impl ThunderBroker { match value { Ok(v) => { if let tokio_tungstenite::tungstenite::Message::Text(t) = v { - if broker_c.status_manager.is_controller_response(broker_c.get_sender(), callback.clone(), t.as_bytes()).await { - broker_c.status_manager.handle_controller_response(broker_c.get_sender(), callback.clone(), t.as_bytes()).await; + if broker_c.status_manager.is_controller_response(broker_c.get_sender(), broker_c.get_default_callback(), t.as_bytes()).await { + broker_c.status_manager.handle_controller_response(broker_c.get_sender(), broker_c.get_default_callback(), t.as_bytes()).await; } else { // send the incoming text without context back to the sender - Self::handle_jsonrpc_response(t.as_bytes(),callback.clone()) + let id = Self::get_id_from_result(t.as_bytes()); + Self::handle_jsonrpc_response(t.as_bytes(),broker_c.get_broker_callback(id).await) } } }, @@ -105,14 +156,44 @@ impl ThunderBroker { } }, - Some(request) = tr.recv() => { + Some(mut request) = tr.recv() => { debug!("Got request from receiver for broker {:?}", request); - match broker_c.prepare_request(&request) { - Ok(updated_request) => { - debug!("Sending request to broker {:?}", updated_request); - for r in updated_request { - let _feed = ws_tx.feed(tokio_tungstenite::tungstenite::Message::Text(r)).await; - let _flush = ws_tx.flush().await; + + match broker_c.check_and_generate_plugin_activation_request(&request) { + Ok(requests) => { + if !requests.is_empty() { + let mut ws_tx = ws_tx_wrap.lock().await; + for r in requests { + let _feed = ws_tx.feed(tokio_tungstenite::tungstenite::Message::Text(r)).await; + let _flush = ws_tx.flush().await; + } + } + else { + // empty request means plugin is activated and ready to process the request + // Intercept the request for data migration + let mut request_consumed = false; + let mut response = 
None; + if let Some(user_data_migrator) = broker_c.data_migrator.clone() { + (request_consumed, response) = user_data_migrator.intercept_broker_request(&broker_c, ws_tx_wrap.clone(), &mut request).await; + } + + // If the request is not consumed by the data migrator, continue with the request + if !request_consumed { + match broker_c.prepare_request(&request) { + Ok(updated_request) => { + debug!("Sending request to broker {:?}", updated_request); + let binding = ws_tx_wrap.clone(); + let mut ws_tx = binding.lock().await; + for r in updated_request { + let _feed = ws_tx.feed(tokio_tungstenite::tungstenite::Message::Text(r)).await; + let _flush = ws_tx.flush().await; + } + } + Err(e) => { + broker_c.get_default_callback().send_error(request,e).await + } + } + } } } Err(e) => { @@ -121,10 +202,11 @@ impl ThunderBroker { info!("Thunder Service not ready, request is now in pending list {:?}", request); }, _ => - callback_for_sender.send_error(request,e).await + broker_c.get_default_callback().send_error(request,e).await + } } } - } + }, Some(cleanup_request) = c_tr.recv() => { let value = { @@ -168,6 +250,13 @@ impl ThunderBroker { new_response } + fn get_id_from_result(result: &[u8]) -> Option { + if let Ok(data) = serde_json::from_slice::(result) { + return data.id; + } + None + } + fn get_callsign_and_method_from_alias(alias: &str) -> (String, Option<&str>) { let mut collection: Vec<&str> = alias.split('.').collect(); let method = collection.pop(); @@ -207,29 +296,14 @@ impl ThunderBroker { } response } -} - -impl EndpointBroker for ThunderBroker { - fn get_broker(request: BrokerConnectRequest, callback: BrokerCallback) -> Self { - Self::start(request, callback) - } - - fn get_sender(&self) -> BrokerSender { - self.sender.clone() - } - - fn get_cleaner(&self) -> BrokerCleaner { - self.cleaner.clone() - } - fn prepare_request( + fn check_and_generate_plugin_activation_request( &self, rpc_request: &super::endpoint_broker::BrokerRequest, ) -> Result, RippleError> { let mut requests = Vec::new(); - let rpc = rpc_request.clone().rpc; - let id = rpc.ctx.call_id; let (callsign, method) = Self::get_callsign_and_method_from_alias(&rpc_request.rule.alias); + if method.is_none() { return Err(RippleError::InvalidInput); } @@ -269,6 +343,31 @@ impl EndpointBroker for ThunderBroker { requests.push(request.to_string()); return Ok(requests); } + Ok(requests) + } +} + +impl EndpointBroker for ThunderBroker { + fn get_broker(request: BrokerConnectRequest, callback: BrokerCallback) -> Self { + Self::start(request, callback) + } + + fn get_sender(&self) -> BrokerSender { + self.sender.clone() + } + + fn get_cleaner(&self) -> BrokerCleaner { + self.cleaner.clone() + } + + fn prepare_request( + &self, + rpc_request: &super::endpoint_broker::BrokerRequest, + ) -> Result, RippleError> { + let rpc = rpc_request.clone().rpc; + let id = rpc.ctx.call_id; + let (callsign, method) = Self::get_callsign_and_method_from_alias(&rpc_request.rule.alias); + let mut requests = Vec::new(); let method = method.unwrap(); // Below chunk of code is basically for subscription where thunder needs some special care based on From f7915883a6e4aaf891c1c4dd7155b6fa47fb4c05 Mon Sep 17 00:00:00 2001 From: VinodSathyaseelan Date: Tue, 15 Oct 2024 23:39:57 -0700 Subject: [PATCH 2/6] feat : Added migraton logic for getters --- core/main/src/broker/endpoint_broker.rs | 57 +-- core/main/src/broker/rules_engine.rs | 12 +- .../src/broker/thunder/user_data_migrator.rs | 428 ++++++++++++++++-- core/main/src/broker/thunder_broker.rs | 10 + 4 files 
changed, 440 insertions(+), 67 deletions(-) diff --git a/core/main/src/broker/endpoint_broker.rs b/core/main/src/broker/endpoint_broker.rs index 9431d6ef6..c4f1789d7 100644 --- a/core/main/src/broker/endpoint_broker.rs +++ b/core/main/src/broker/endpoint_broker.rs @@ -610,10 +610,11 @@ impl BrokerOutputForwarder { apply_rule_for_event( &broker_request, &result, - &rpc_request, + &rpc_request.ctx.method, &mut response, ); - if !apply_filter(&broker_request, &result, &rpc_request) { + if !apply_filter(&broker_request, &result, &rpc_request.ctx.method) + { continue; } // check if the request transform has event_decorator_method @@ -674,7 +675,7 @@ impl BrokerOutputForwarder { } response.result = Some(json!({ "listening" : rpc_request.is_listening(), - "event" : rpc_request.ctx.method + "event" : &rpc_request.ctx.method })); platform_state.endpoint_state.update_unsubscribe_request(id); } else { @@ -688,7 +689,7 @@ impl BrokerOutputForwarder { if let Some(filter) = broker_request.rule.transform.get_transform_data( super::rules_engine::RuleTransformType::Response, ) { - apply_response(filter, &rpc_request, &mut response); + apply_response(filter, &rpc_request.ctx.method, &mut response); } else if response.result.is_none() && response.error.is_none() { response.result = Some(Value::Null); } @@ -792,9 +793,9 @@ async fn forward_extn_event( } } -fn apply_response( +pub fn apply_response( result_response_filter: String, - rpc_request: &RpcRequest, + method: &str, response: &mut JsonRpcApiResponse, ) { match serde_json::to_value(response.clone()) { @@ -802,7 +803,7 @@ fn apply_response( match jq_compile( input, &result_response_filter, - format!("{}_response", rpc_request.ctx.method), + format!("{}_response", method), ) { Ok(jq_out) => { trace!( @@ -837,7 +838,7 @@ fn apply_response( fn apply_rule_for_event( broker_request: &BrokerRequest, result: &Value, - rpc_request: &RpcRequest, + method: &str, response: &mut JsonRpcApiResponse, ) { if let Some(filter) = broker_request @@ -845,23 +846,15 @@ fn apply_rule_for_event( .transform .get_transform_data(super::rules_engine::RuleTransformType::Event) { - if let Ok(r) = jq_compile( - result.clone(), - &filter, - format!("{}_event", rpc_request.ctx.method), - ) { + if let Ok(r) = jq_compile(result.clone(), &filter, format!("{}_event", method)) { response.result = Some(r); } } } -fn apply_filter(broker_request: &BrokerRequest, result: &Value, rpc_request: &RpcRequest) -> bool { +fn apply_filter(broker_request: &BrokerRequest, result: &Value, method: &str) -> bool { if let Some(filter) = broker_request.rule.filter.clone() { - if let Ok(r) = jq_compile( - result.clone(), - &filter, - format!("{}_event filter", rpc_request.ctx.method), - ) { + if let Ok(r) = jq_compile(result.clone(), &filter, format!("{}_event filter", method)) { println!("apply_filter: {:?}", r); if r.is_null() { return false; @@ -1005,7 +998,7 @@ mod tests { let filter = "if .result and .result.success then (.result.stbVersion | split(\"_\") [0]) elif .error then if .error.code == -32601 then {error: { code: -1, message: \"Unknown method.\" }} else \"Error occurred with a different code\" end else \"No result or recognizable error\" end".to_string(); //let mut response = JsonRpcApiResponse::mock(); //response.error = Some(error); - apply_response(filter, &rpc_request, &mut output.data); + apply_response(filter, &rpc_request.ctx.method, &mut output.data); //let msg = output.data.error.unwrap().get("message").unwrap().clone(); assert_eq!( 
output.data.error.unwrap().get("message").unwrap().clone(), @@ -1020,7 +1013,7 @@ mod tests { let filter = "if .result and .result.success then .result.value elif .error.code==22 or .error.code==43 then null else .error end".to_string(); //let mut response = JsonRpcApiResponse::mock(); //response.error = Some(error); - apply_response(filter, &rpc_request, &mut output.data); + apply_response(filter, &rpc_request.ctx.method, &mut output.data); assert_eq!(output.data.error, None); assert_eq!(output.data.result.unwrap(), serde_json::Value::Null); @@ -1032,7 +1025,7 @@ mod tests { let filter = "if .result and .result.success then .result.value elif .error.code==22 or .error.code==43 then null else { error: .error } end".to_string(); //let mut response = JsonRpcApiResponse::mock(); //response.error = Some(error.clone()); - apply_response(filter, &rpc_request, &mut output.data); + apply_response(filter, &rpc_request.ctx.method, &mut output.data); assert_eq!(output.data.error, Some(error)); } @@ -1059,7 +1052,7 @@ mod tests { let mut data = JsonRpcApiResponse::mock(); data.result = Some(result); let mut output: BrokerOutput = BrokerOutput { data: data.clone() }; - apply_response(filter, &rpc_request, &mut output.data); + apply_response(filter, &rpc_request.ctx.method, &mut output.data); assert_eq!(output.data.result.unwrap(), "SCXI11BEI".to_string()); // device.videoResolution @@ -1069,7 +1062,7 @@ mod tests { response.result = Some(result); //let data = JsonRpcApiResponse::mock(); //let mut output: BrokerOutput = BrokerOutput { data: data.clone() }; - apply_response(filter, &rpc_request, &mut response); + apply_response(filter, &rpc_request.ctx.method, &mut response); assert_eq!(response.result.unwrap(), json!([1920, 1080])); // device.audio @@ -1079,7 +1072,7 @@ mod tests { let filter = "if .result and .result.success then .result | {\"stereo\": (.supportedAudioFormat | index(\"PCM\") > 0),\"dolbyDigital5.1\": (.supportedAudioFormat | index(\"DOLBY AC3\") > 0),\"dolbyDigital5.1plus\": (.supportedAudioFormat | index(\"DOLBY EAC3\") > 0),\"dolbyAtmos\": (.supportedAudioFormat | index(\"DOLBY EAC3 ATMOS\") > 0)} elif .error then if .error.code == -32601 then \"Unknown method.\" else \"Error occurred with a different code\" end else \"No result or recognizable error\" end".to_string(); let mut response = JsonRpcApiResponse::mock(); response.result = Some(result); - apply_response(filter, &rpc_request, &mut response); + apply_response(filter, &rpc_request.ctx.method, &mut response); assert_eq!( response.result.unwrap(), json!({"dolbyAtmos": true, "dolbyDigital5.1": true, "dolbyDigital5.1plus": false, "stereo": true}) @@ -1094,7 +1087,7 @@ mod tests { let filter = "if .result and .result.success then (.result.interfaces | .[] | select(.connected) | {\"state\": \"connected\",\"type\": .interface | ascii_downcase }) elif .error then if .error.code == -32601 then \"Unknown method.\" else \"Error occurred with a different code\" end else \"No result or recognizable error\" end".to_string(); let mut response = JsonRpcApiResponse::mock(); response.result = Some(result); - apply_response(filter, &rpc_request, &mut response); + apply_response(filter, &rpc_request.ctx.method, &mut response); assert_eq!( response.result.unwrap(), json!({"state":"connected", "type":"wifi"}) @@ -1107,7 +1100,7 @@ mod tests { let filter = "if .result.success then (if .result.friendlyName | length == 0 then \"Living Room\" else .result.friendlyName end) else \"Living Room\" end".to_string(); let mut response = 
JsonRpcApiResponse::mock(); response.result = Some(result); - apply_response(filter, &rpc_request, &mut response); + apply_response(filter, &rpc_request.ctx.method, &mut response); assert_eq!(response.result.unwrap(), json!("my_device")); // localization.language @@ -1118,7 +1111,7 @@ mod tests { .to_string(); let mut response = JsonRpcApiResponse::mock(); response.result = Some(result); - apply_response(filter, &rpc_request, &mut response); + apply_response(filter, &rpc_request.ctx.method, &mut response); assert_eq!(response.result.unwrap(), json!("FR")); @@ -1129,7 +1122,7 @@ mod tests { let filter = "if .result.success then (if .result.friendlyName | length == 0 then \"Living Room\" else .result.friendlyName end) else \"Living Room\" end".to_string(); let mut response = JsonRpcApiResponse::mock(); response.result = Some(result); - apply_response(filter, &rpc_request, &mut response); + apply_response(filter, &rpc_request.ctx.method, &mut response); assert_eq!(response.result.unwrap(), json!("my_device")); @@ -1140,7 +1133,7 @@ mod tests { let filter = "if .result.success then null else { code: -32100, message: \"couldn't set skip restriction\" } end".to_string(); let mut response = JsonRpcApiResponse::mock(); response.result = Some(result); - apply_response(filter, &rpc_request, &mut response); + apply_response(filter, &rpc_request.ctx.method, &mut response); assert_eq!(response.result.unwrap(), serde_json::Value::Null); @@ -1151,7 +1144,7 @@ mod tests { let filter = "if .result.success then .result.value elif .error.code==22 or .error.code==43 then \"null\" else .error end".to_string(); let mut response = JsonRpcApiResponse::mock(); response.result = Some(result); - apply_response(filter, &rpc_request, &mut response); + apply_response(filter, &rpc_request.ctx.method, &mut response); assert_eq!(response.result.unwrap(), "some_value"); // localization.countryCode @@ -1161,7 +1154,7 @@ mod tests { let filter = "if .result.success then if .result.territory == \"ITA\" then \"IT\" elif .result.territory == \"GBR\" then \"GB\" elif .result.territory == \"IRL\" then \"IE\" elif .result.territory == \"DEU\" then \"DE\" elif .result.territory == \"AUS\" then \"AU\" else \"GB\" end end".to_string(); let mut response = JsonRpcApiResponse::mock(); response.result = Some(result); - apply_response(filter, &rpc_request, &mut response); + apply_response(filter, &rpc_request.ctx.method, &mut response); assert_eq!(response.result.unwrap(), "GB"); } } diff --git a/core/main/src/broker/rules_engine.rs b/core/main/src/broker/rules_engine.rs index c2de717b0..2be82dca3 100644 --- a/core/main/src/broker/rules_engine.rs +++ b/core/main/src/broker/rules_engine.rs @@ -24,7 +24,7 @@ use ripple_sdk::{ serde_json::Value, utils::error::RippleError, }; -use serde::Deserialize; +use serde::{Deserialize, Serialize}; use std::collections::HashMap; use std::{fs, path::Path}; @@ -83,7 +83,7 @@ pub enum RuleEndpointProtocol { Thunder, } -#[derive(Debug, Clone, Deserialize)] +#[derive(Debug, Clone, Deserialize, Serialize)] pub struct Rule { pub alias: String, // Not every rule needs transform @@ -93,7 +93,7 @@ pub struct Rule { pub endpoint: Option, } -#[derive(Debug, Clone, Deserialize, Default)] +#[derive(Debug, Clone, Deserialize, Default, Serialize)] pub struct RuleTransform { pub request: Option, pub response: Option, @@ -202,6 +202,12 @@ impl RuleEngine { } None } + pub fn get_rule_by_method(&self, method: &str) -> Option { + if let Some(rule) = self.rules.rules.get(&method.to_lowercase()).cloned() { + return 
Some(rule); + } + None + } } pub fn jq_compile(input: Value, filter: &str, reference: String) -> Result { diff --git a/core/main/src/broker/thunder/user_data_migrator.rs b/core/main/src/broker/thunder/user_data_migrator.rs index 71e9e27be..05d0cb0df 100644 --- a/core/main/src/broker/thunder/user_data_migrator.rs +++ b/core/main/src/broker/thunder/user_data_migrator.rs @@ -16,11 +16,13 @@ // use std::collections::HashMap; +use std::fmt; use std::fs::{File, OpenOptions}; use std::path::Path; use std::sync::Arc; use ripple_sdk::tokio::net::TcpStream; +use ripple_sdk::utils::error::RippleError; use ripple_sdk::{ log::{debug, error, info}, tokio::{ @@ -34,8 +36,10 @@ use serde::{Deserialize, Serialize}; use serde_json::{json, Value}; use crate::broker::endpoint_broker::{ - BrokerCallback, BrokerOutput, BrokerRequest, EndpointBrokerState, + self, BrokerCallback, BrokerOutput, BrokerRequest, EndpointBrokerState, }; +use crate::broker::rules_engine::{Rule, RuleTransformType}; + use futures::stream::SplitSink; use futures_util::SinkExt; @@ -43,12 +47,40 @@ use crate::broker::thunder_broker::ThunderBroker; use tokio_tungstenite::{tungstenite::Message, WebSocketStream}; const USER_DATA_MIGRATION_CONFIG_FILE_NAME: &str = "user_data_migration_config.json"; +#[derive(Debug)] +enum UserDataMigratorError { + ThunderRequestError(String), + ResponseError(String), + SetterRuleNotAvailable, + RequestTransformError(String), + TimeoutError, +} + +impl fmt::Display for UserDataMigratorError { + fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { + match self { + UserDataMigratorError::ThunderRequestError(msg) => { + write!(f, "Thunder request error: {}", msg) + } + UserDataMigratorError::ResponseError(msg) => write!(f, "Response error: {}", msg), + UserDataMigratorError::TimeoutError => write!(f, "Timeout error"), + UserDataMigratorError::SetterRuleNotAvailable => { + write!(f, "Setter rule is not available") + } + UserDataMigratorError::RequestTransformError(msg) => { + write!(f, "Request transform error: {}", msg) + } + } + } +} +impl std::error::Error for UserDataMigratorError {} #[derive(Clone, Debug, Deserialize, Serialize)] pub struct MigrationConfigEntry { namespace: String, key: String, default: Value, getter: String, + setter_rule: Option, setter: String, migrated: bool, } @@ -92,7 +124,7 @@ impl UserDataMigrator { None } - async fn get_matching_migration_entry_on_method( + async fn get_matching_migration_entry_by_method( &self, method: &str, ) -> Option { @@ -109,9 +141,9 @@ impl UserDataMigrator { broker: &ThunderBroker, ws_tx: Arc, Message>>>, request: &mut BrokerRequest, - ) -> (bool, Option) { + ) -> (bool, Option) { let method = request.rpc.method.clone(); - if let Some(config_entry) = self.get_matching_migration_entry_on_method(&method).await { + if let Some(config_entry) = self.get_matching_migration_entry_by_method(&method).await { // migration entry found for either getter or setter method // for setter case, irrespective of the migration status, update the new value in the new storage and sync // with the legacy storage @@ -127,8 +159,7 @@ impl UserDataMigrator { &config_entry.key, &broker, ws_tx.clone(), - &request, - &config_entry.default, + &request.rpc.params_json, ) .await; // returning false to continue with the original setter request @@ -137,9 +168,18 @@ impl UserDataMigrator { // perform the getter migration logic asynchronously if !config_entry.migrated { let migrated_value = self - .perform_getter_migration(&broker, &request, &config_entry) + 
.perform_getter_migration(&broker, ws_tx.clone(), &request, &config_entry) .await; - return (false, Some(migrated_value)); + match migrated_value { + Ok((status, value)) => { + return (status, value); + } + Err(e) => { + error!("Error performing getter migration and continuing without migration {:?}", e); + // return false to continue with the original request + return (false, None); + } + } } else { // the migration is already done, continue with the original request return (false, None); @@ -151,14 +191,77 @@ impl UserDataMigrator { (false, None) } + async fn read_from_legacy_storage( + &self, + namespace: &str, + key: &str, + broker: &ThunderBroker, + ws_tx: Arc, Message>>>, + ) -> Result { + let request_id = EndpointBrokerState::get_next_id(); + let call_sign = "org.rdk.PersistentStore.1.".to_owned(); + let thunder_request = json!({ + "jsonrpc": "2.0", + "id": request_id, + "method": format!("{}getValue", call_sign), + "params": json!({ + "namespace": namespace, + "key": key, + "scope": "device", + }) + }) + .to_string(); + + // Register custom callback to handle the response + broker + .register_custom_callback( + request_id, + BrokerCallback { + sender: self.response_tx.clone(), + }, + ) + .await; + + // send the request to the legacy storage + if let Err(e) = self.send_thunder_request(&ws_tx, &thunder_request).await { + error!( + "read_from_legacy_storage: Failed to send thunder request: {:?}", + e + ); + // Unregister the custom callback and return + broker.unregister_custom_callback(request_id).await; + return Err(e); + } + // get the response from the custom callback + let response_rx = self.response_rx.clone(); + let broker_clone = broker.clone(); + // get the response and check if the response is successful by checking result or error field. 
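+        // wait_for_response blocks (with a 30-second timeout) on the shared response channel
+        // and unregisters the custom callback for this request_id before returning.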
+ // Value::Null is a valid response, return Err if the response is not successful + let response = + UserDataMigrator::wait_for_response(response_rx, broker_clone, request_id).await; + match response { + Ok(response) => { + if let Some(result) = response.data.result { + return Ok(result); + } else { + return Err(UserDataMigratorError::ResponseError( + "No result in response".to_string(), + )); + } + } + Err(e) => { + return Err(e); + } + } + } + async fn write_to_legacy_storage( &self, namespace: &str, key: &str, broker: &ThunderBroker, ws_tx: Arc, Message>>>, - _request: &BrokerRequest, - value: &Value, + params_json: &str, ) { let request_id = EndpointBrokerState::get_next_id(); let call_sign = "org.rdk.PersistentStore.1.".to_owned(); @@ -169,7 +272,7 @@ impl UserDataMigrator { "params": json!({ "namespace": namespace, "key": key, - "value": value.to_string(), + "value": params_json, "scope": "device", }) }) @@ -187,7 +290,12 @@ impl UserDataMigrator { // send the request to the legacy storage if let Err(e) = self.send_thunder_request(&ws_tx, &thunder_request).await { - error!("Failed to send thunder request: {:?}", e); + error!( + "write_to_legacy_storage: Failed to send thunder request: {:?}", + e + ); + // Unregister the custom callback and return + broker.unregister_custom_callback(request_id).await; return; } @@ -195,22 +303,165 @@ impl UserDataMigrator { let response_rx = self.response_rx.clone(); let broker_clone = broker.clone(); tokio::spawn(async move { - if let Err(e) = - UserDataMigrator::wait_for_response(response_rx, broker_clone, request_id).await - { - error!("Error waiting for response: {:?}", e); + match UserDataMigrator::wait_for_response(response_rx, broker_clone, request_id).await { + Ok(response) => { + // Handle the successful response here + info!( + "write_to_legacy_storage: Successfully received response: {:?}", + response + ); + } + Err(e) => { + error!("Error waiting for response: {:?}", e); + } } }); } + async fn read_from_true_north_plugin( + &self, + broker: &ThunderBroker, + ws_tx: Arc, Message>>>, + request: &BrokerRequest, + ) -> Result { + // no params for the getter function + let request_id = EndpointBrokerState::get_next_id(); + let thunder_plugin_request = json!({ + "jsonrpc": "2.0", + "id": request_id, + "method": request.rule.alias, + }) + .to_string(); + + // Register custom callback to handle the response + broker + .register_custom_callback( + request_id, + BrokerCallback { + sender: self.response_tx.clone(), + }, + ) + .await; + + // send the request to the new pluin as thunder request + if let Err(e) = self + .send_thunder_request(&ws_tx, &thunder_plugin_request) + .await + { + error!( + "perform_getter_migration: Failed to send thunder request: {:?}", + e + ); + broker.unregister_custom_callback(request_id).await; + return Err(e); + } + + // get the response from the custom callback + let response_rx = self.response_rx.clone(); + let broker_clone = broker.clone(); + + UserDataMigrator::wait_for_response(response_rx, broker_clone, request_id).await + } + + fn transform_requets_params( + params_json: &str, + rule: &Rule, + method: &str, + ) -> Result { + if let Ok(mut params) = serde_json::from_str::>(¶ms_json) { + if params.len() > 1 { + if let Some(last) = params.pop() { + if let Some(filter) = rule + .transform + .get_transform_data(RuleTransformType::Request) + { + return crate::broker::rules_engine::jq_compile( + last, + &filter, + format!("{}_request", method), + ); + } + return Ok(serde_json::to_value(&last).unwrap()); + } + } else { + 
return Ok(Value::Null); + } + } + Err(RippleError::ParseError) + } + async fn write_to_true_north_plugin( + &self, + broker: &ThunderBroker, + ws_tx: Arc, Message>>>, + config_entry: &MigrationConfigEntry, + params_json: &str, // param from the legacy storage + ) -> Result { + // get the setter rule from the rule engine by giving the setter method name + let setter_rule = Self::retrive_setter_rule_from_rule_engine(config_entry)?; + // apply the setter rule to the params_json + let transformed_params = + Self::transform_requets_params(params_json, &setter_rule, &config_entry.setter); + // rerurn error if the transform fails + let transformed_params = match transformed_params { + Ok(params) => params, + Err(e) => { + return Err(UserDataMigratorError::RequestTransformError(e.to_string())); + } + }; + // create the request to the new plugin + let request_id = EndpointBrokerState::get_next_id(); + let thunder_plugin_request = json!({ + "jsonrpc": "2.0", + "id": request_id, + "method": setter_rule.alias, + "params": transformed_params, + }) + .to_string(); + + // Register custom callback to handle the response + broker + .register_custom_callback( + request_id, + BrokerCallback { + sender: self.response_tx.clone(), + }, + ) + .await; + + // send the request to the new plugin as thunder request + if let Err(e) = self + .send_thunder_request(&ws_tx, &thunder_plugin_request) + .await + { + error!( + "write_to_true_north_plugin: Failed to send thunder request: {:?}", + e + ); + broker.unregister_custom_callback(request_id).await; + return Err(e); + } + + // get the response from the custom callback + let response_rx = self.response_rx.clone(); + let broker_clone = broker.clone(); + + UserDataMigrator::wait_for_response(response_rx, broker_clone, request_id).await + } + async fn send_thunder_request( &self, ws_tx: &Arc, Message>>>, request: &str, - ) -> Result<(), Box> { + ) -> Result<(), UserDataMigratorError> { let mut ws_tx = ws_tx.lock().await; - ws_tx.feed(Message::Text(request.to_string())).await?; - ws_tx.flush().await?; + ws_tx + .feed(Message::Text(request.to_string())) + .await + .map_err(|e| UserDataMigratorError::ThunderRequestError(e.to_string()))?; + ws_tx + .flush() + .await + .map_err(|e| UserDataMigratorError::ThunderRequestError(e.to_string()))?; Ok(()) } @@ -218,36 +469,149 @@ impl UserDataMigrator { response_rx: Arc>>, broker: ThunderBroker, request_id: u64, - ) -> Result<(), Box> { + ) -> Result { let mut response_rx = response_rx.lock().await; - match timeout(Duration::from_secs(30), response_rx.recv()).await { + let response = match timeout(Duration::from_secs(30), response_rx.recv()).await { Ok(Some(response)) => { info!( "Received response at custom write_to_legacy_storage: {:?}", response ); + response } Ok(None) => { - error!("Failed to receive response"); + error!("No response received at custom write_to_legacy_storage"); + return Err(UserDataMigratorError::TimeoutError); } Err(_) => { - error!("Timeout waiting for response"); + error!("Error receiving response at custom write_to_legacy_storage"); + return Err(UserDataMigratorError::TimeoutError); } - } + }; broker.unregister_custom_callback(request_id).await; - Ok(()) + Ok(response) } - // function to perform the getter migration logic asynchronously + + fn retrive_setter_rule_from_rule_engine( + config_entry: &MigrationConfigEntry, + ) -> Result { + // TBD: get the getter rule from the rule engine by giving the setter method name + let setter_rule = config_entry.setter_rule.clone(); + // return rule if available else 
return error + if let Some(rule) = setter_rule { + return Ok(rule); + } else { + return Err(UserDataMigratorError::SetterRuleNotAvailable); + } + } + async fn perform_getter_migration( &self, broker: &ThunderBroker, + ws_tx: Arc, Message>>>, request: &BrokerRequest, config_entry: &MigrationConfigEntry, - ) -> Value { - let mut new_storage_value = Value::Null; - // Get the value from the new storage - //new_storage_value = self.get_new_storage_value(&broker, &request).await; - new_storage_value + ) -> Result<(bool, Option), UserDataMigratorError> { + let legacy_value = self + .read_from_legacy_storage( + &config_entry.namespace, + &config_entry.key, + broker, + ws_tx.clone(), + ) + .await; + + match legacy_value { + Ok(legacy_value) => { + let value_from_plugin = self + .read_from_true_north_plugin(broker, ws_tx.clone(), request) + .await; + match value_from_plugin { + Ok(value_from_plugin) => { + // apply the response transform rule if any + let mut data = value_from_plugin.clone().data; + if let Some(filter) = request + .rule + .transform + .get_transform_data(RuleTransformType::Response) + { + endpoint_broker::apply_response(filter, &request.rule.alias, &mut data); + } + if let Some(result) = data.clone().result { + // if the plugins has a non default value, assuming that it is holding the latest value + // update the legacy storage with the new value + if result != config_entry.default { + self.write_to_legacy_storage( + &config_entry.namespace, + &config_entry.key, + broker, + ws_tx.clone(), + &result.to_string(), + ) + .await; + self.set_migration_status( + &config_entry.namespace, + &config_entry.key, + ) + .await; + + // create broker output with the result + let response = BrokerOutput { data }; + return Ok((true, Some(response))); + } else { + // plugin has the default value, now check if the legacy storage has a non default value + // if so, update the plugin with the value from the legacy storage + if legacy_value != config_entry.default { + let response = self + .write_to_true_north_plugin( + broker, + ws_tx.clone(), + config_entry, + &legacy_value.to_string(), + ) + .await; + match response { + Ok(response) => { + self.set_migration_status( + &config_entry.namespace, + &config_entry.key, + ) + .await; + return Ok((true, Some(response))); + } + Err(e) => { + return Err(e); + } + } + } else { + // both the plugin and the legacy storage has the default value, no need to update + // continue with the original request + self.set_migration_status( + &config_entry.namespace, + &config_entry.key, + ) + .await; + // create broker output with the result + let response = BrokerOutput { data }; + return Ok((true, Some(response))); + } + } + } else { + return Err(UserDataMigratorError::ResponseError( + "No result in response".to_string(), + )); + } + } + Err(e) => { + return Err(e); + } + } + } + Err(e) => { + // TBD : Add more detailed error code like no entry in legacy storage, etc + return Err(e); + } + } } // function to set the migration flag to true and update the migration map in the config file @@ -255,7 +619,7 @@ impl UserDataMigrator { let mut config_entry_changed = false; { let mut migration_map = self.migration_config.lock().await; - if let Some(mut config_entry) = migration_map + if let Some(config_entry) = migration_map .values_mut() .find(|entry| entry.namespace == namespace && entry.key == key) { diff --git a/core/main/src/broker/thunder_broker.rs b/core/main/src/broker/thunder_broker.rs index da33fd3e5..e84d0a009 100644 --- a/core/main/src/broker/thunder_broker.rs +++ 
b/core/main/src/broker/thunder_broker.rs @@ -194,6 +194,16 @@ impl ThunderBroker { } } } + else { // Data migrator consumed the request + if let Some(response) = response { + let broker_clone = broker_c.clone(); + tokio::spawn(async move { + if let Err(e) = broker_clone.get_default_callback().sender.send(response).await { + error!("Failed to send response: {:?}", e); + } + }); + } + } } } Err(e) => { From 01da4fb8ecb82792005bc33228efd3682305e780 Mon Sep 17 00:00:00 2001 From: VinodSathyaseelan Date: Wed, 16 Oct 2024 22:25:14 -0700 Subject: [PATCH 3/6] test: fixed issues in setter migration param extraction --- core/main/src/broker/endpoint_broker.rs | 23 ++++--- .../src/broker/thunder/user_data_migrator.rs | 67 +++++++++++-------- 2 files changed, 55 insertions(+), 35 deletions(-) diff --git a/core/main/src/broker/endpoint_broker.rs b/core/main/src/broker/endpoint_broker.rs index c4f1789d7..e45fc2db2 100644 --- a/core/main/src/broker/endpoint_broker.rs +++ b/core/main/src/broker/endpoint_broker.rs @@ -610,11 +610,10 @@ impl BrokerOutputForwarder { apply_rule_for_event( &broker_request, &result, - &rpc_request.ctx.method, + &rpc_request, &mut response, ); - if !apply_filter(&broker_request, &result, &rpc_request.ctx.method) - { + if !apply_filter(&broker_request, &result, &rpc_request) { continue; } // check if the request transform has event_decorator_method @@ -675,7 +674,7 @@ impl BrokerOutputForwarder { } response.result = Some(json!({ "listening" : rpc_request.is_listening(), - "event" : &rpc_request.ctx.method + "event" : rpc_request.ctx.method })); platform_state.endpoint_state.update_unsubscribe_request(id); } else { @@ -838,7 +837,7 @@ pub fn apply_response( fn apply_rule_for_event( broker_request: &BrokerRequest, result: &Value, - method: &str, + rpc_request: &RpcRequest, response: &mut JsonRpcApiResponse, ) { if let Some(filter) = broker_request @@ -846,15 +845,23 @@ fn apply_rule_for_event( .transform .get_transform_data(super::rules_engine::RuleTransformType::Event) { - if let Ok(r) = jq_compile(result.clone(), &filter, format!("{}_event", method)) { + if let Ok(r) = jq_compile( + result.clone(), + &filter, + format!("{}_event", rpc_request.ctx.method), + ) { response.result = Some(r); } } } -fn apply_filter(broker_request: &BrokerRequest, result: &Value, method: &str) -> bool { +fn apply_filter(broker_request: &BrokerRequest, result: &Value, rpc_request: &RpcRequest) -> bool { if let Some(filter) = broker_request.rule.filter.clone() { - if let Ok(r) = jq_compile(result.clone(), &filter, format!("{}_event filter", method)) { + if let Ok(r) = jq_compile( + result.clone(), + &filter, + format!("{}_event filter", rpc_request.ctx.method), + ) { println!("apply_filter: {:?}", r); if r.is_null() { return false; diff --git a/core/main/src/broker/thunder/user_data_migrator.rs b/core/main/src/broker/thunder/user_data_migrator.rs index 05d0cb0df..8b41fe19f 100644 --- a/core/main/src/broker/thunder/user_data_migrator.rs +++ b/core/main/src/broker/thunder/user_data_migrator.rs @@ -21,6 +21,7 @@ use std::fs::{File, OpenOptions}; use std::path::Path; use std::sync::Arc; +use jsonrpsee::types::params; use ripple_sdk::tokio::net::TcpStream; use ripple_sdk::utils::error::RippleError; use ripple_sdk::{ @@ -153,13 +154,30 @@ impl UserDataMigrator { // update legacy storage with the new value as fire and forget operation self.set_migration_status(&config_entry.namespace, &config_entry.key) .await; - // TBD: apply transform rule if any and get the params. 
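+                // Note (illustrative): request.rpc.params_json is parsed as a JSON array and the
+                // last element is assumed to carry the setter payload; its "value" field (when
+                // present) is what gets mirrored into the legacy PersistentStore below.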
+ + let mut params = Value::Null; + + // Get the params from the request as is. No need to transform the params for legacy storage + if let Ok(mut extract) = + serde_json::from_str::>(&request.rpc.params_json) + { + // Get the param to use in write to legacy storage + if let Some(last) = extract.pop() { + // Extract the value field from the last + params = last.get("value").cloned().unwrap_or(last); + } + } + + info!( + "intercept_broker_request: Updating legacy storage with new value: {:?}", + params + ); self.write_to_legacy_storage( &config_entry.namespace, &config_entry.key, &broker, ws_tx.clone(), - &request.rpc.params_json, + params.to_string().as_str(), ) .await; // returning false to continue with the original setter request @@ -277,7 +295,10 @@ impl UserDataMigrator { }) }) .to_string(); - + println!( + "write_to_legacy_storage: thunder_request: {:?}", + thunder_request + ); // Register custom callback to handle the response broker .register_custom_callback( @@ -299,7 +320,7 @@ impl UserDataMigrator { return; } - // Spawn a task to wait for the response + // Spawn a task to wait for the response as we don't want to block the main thread let response_rx = self.response_rx.clone(); let broker_clone = broker.clone(); tokio::spawn(async move { @@ -368,26 +389,21 @@ impl UserDataMigrator { rule: &Rule, method: &str, ) -> Result { - if let Ok(mut params) = serde_json::from_str::>(¶ms_json) { - if params.len() > 1 { - if let Some(last) = params.pop() { - if let Some(filter) = rule - .transform - .get_transform_data(RuleTransformType::Request) - { - return crate::broker::rules_engine::jq_compile( - last, - &filter, - format!("{}_request", method), - ); - } - return Ok(serde_json::to_value(&last).unwrap()); - } - } else { - return Ok(Value::Null); - } + let data: Value = json!({ + "value": params_json + }); + + if let Some(filter) = rule + .transform + .get_transform_data(RuleTransformType::Request) + { + return crate::broker::rules_engine::jq_compile( + data, + &filter, + format!("{}_request", method), + ); } - Err(RippleError::ParseError) + Ok(serde_json::to_value(&data).unwrap()) } async fn write_to_true_north_plugin( &self, @@ -473,10 +489,7 @@ impl UserDataMigrator { let mut response_rx = response_rx.lock().await; let response = match timeout(Duration::from_secs(30), response_rx.recv()).await { Ok(Some(response)) => { - info!( - "Received response at custom write_to_legacy_storage: {:?}", - response - ); + info!("wait_for_response : Received response : {:?}", response); response } Ok(None) => { From df66b3261c4cf1261dff4892e9b8370c785733ab Mon Sep 17 00:00:00 2001 From: VinodSathyaseelan Date: Wed, 16 Oct 2024 22:38:08 -0700 Subject: [PATCH 4/6] test: fixed clippy errors --- .../src/broker/thunder/user_data_migrator.rs | 410 +++++++++++++----- core/main/src/broker/thunder_broker.rs | 15 +- 2 files changed, 293 insertions(+), 132 deletions(-) diff --git a/core/main/src/broker/thunder/user_data_migrator.rs b/core/main/src/broker/thunder/user_data_migrator.rs index 8b41fe19f..3468ad070 100644 --- a/core/main/src/broker/thunder/user_data_migrator.rs +++ b/core/main/src/broker/thunder/user_data_migrator.rs @@ -21,7 +21,7 @@ use std::fs::{File, OpenOptions}; use std::path::Path; use std::sync::Arc; -use jsonrpsee::types::params; +use openrpc_validator::jsonschema::output; use ripple_sdk::tokio::net::TcpStream; use ripple_sdk::utils::error::RippleError; use ripple_sdk::{ @@ -142,7 +142,7 @@ impl UserDataMigrator { broker: &ThunderBroker, ws_tx: Arc, Message>>>, request: &mut BrokerRequest, - 
) -> (bool, Option) { + ) -> bool { let method = request.rpc.method.clone(); if let Some(config_entry) = self.get_matching_migration_entry_by_method(&method).await { // migration entry found for either getter or setter method @@ -172,41 +172,38 @@ impl UserDataMigrator { "intercept_broker_request: Updating legacy storage with new value: {:?}", params ); - self.write_to_legacy_storage( - &config_entry.namespace, - &config_entry.key, - &broker, - ws_tx.clone(), - params.to_string().as_str(), - ) - .await; + let _ = self + .write_to_legacy_storage( + &config_entry.namespace, + &config_entry.key, + &broker, + ws_tx.clone(), + params.to_string().as_str(), + ) + .await; // returning false to continue with the original setter request - return (false, None); + return false; } else { // perform the getter migration logic asynchronously if !config_entry.migrated { - let migrated_value = self - .perform_getter_migration(&broker, ws_tx.clone(), &request, &config_entry) + let self_arc = Arc::new(self.clone()); + let _ = self_arc + .call_perform_getter_migration( + &broker, + ws_tx.clone(), + &request, + &config_entry, + ) .await; - match migrated_value { - Ok((status, value)) => { - return (status, value); - } - Err(e) => { - error!("Error performing getter migration and continuing without migration {:?}", e); - // return false to continue with the original request - return (false, None); - } - } + return true; } else { // the migration is already done, continue with the original request - return (false, None); + return false; } } } - // continue with the original request - (false, None) + false } async fn read_from_legacy_storage( @@ -260,16 +257,20 @@ impl UserDataMigrator { match response { Ok(response) => { if let Some(result) = response.data.result { - return Ok(result); + // extract the value field from the result + if let Some(value) = result.get("value") { + return Ok(value.clone()); + } + Err(UserDataMigratorError::ResponseError( + "No value field in response".to_string(), + )) } else { - return Err(UserDataMigratorError::ResponseError( + Err(UserDataMigratorError::ResponseError( "No result in response".to_string(), - )); + )) } } - Err(e) => { - return Err(e); - } + Err(e) => Err(e), } } @@ -280,7 +281,7 @@ impl UserDataMigrator { broker: &ThunderBroker, ws_tx: Arc, Message>>>, params_json: &str, - ) { + ) -> Result<(), UserDataMigratorError> { let request_id = EndpointBrokerState::get_next_id(); let call_sign = "org.rdk.PersistentStore.1.".to_owned(); let thunder_request = json!({ @@ -295,10 +296,6 @@ impl UserDataMigrator { }) }) .to_string(); - println!( - "write_to_legacy_storage: thunder_request: {:?}", - thunder_request - ); // Register custom callback to handle the response broker .register_custom_callback( @@ -317,7 +314,7 @@ impl UserDataMigrator { ); // Unregister the custom callback and return broker.unregister_custom_callback(request_id).await; - return; + return Ok(()); } // Spawn a task to wait for the response as we don't want to block the main thread @@ -337,6 +334,7 @@ impl UserDataMigrator { } } }); + Ok(()) } async fn read_from_true_north_plugin( @@ -512,12 +510,89 @@ impl UserDataMigrator { let setter_rule = config_entry.setter_rule.clone(); // return rule if available else return error if let Some(rule) = setter_rule { - return Ok(rule); + Ok(rule) } else { - return Err(UserDataMigratorError::SetterRuleNotAvailable); + Err(UserDataMigratorError::SetterRuleNotAvailable) } } + async fn call_perform_getter_migration( + self: Arc, + broker: &ThunderBroker, + ws_tx: Arc, 
Message>>>, + request: &BrokerRequest, + config_entry: &MigrationConfigEntry, + ) -> (bool, Option) { + // Clone the parameters to move into the spawned task + let broker_clone = broker.clone(); + let ws_tx_clone = ws_tx.clone(); + let request_clone = request.clone(); + let config_entry_clone = config_entry.clone(); + let self_clone = Arc::clone(&self); + + tokio::spawn(async move { + match self_clone + .perform_getter_migration( + &broker_clone, + ws_tx_clone.clone(), + &request_clone, + &config_entry_clone, + ) + .await + { + Ok((_, Some(mut output))) => { + let broker_clone = broker_clone.clone(); + output.data.id = Some(request_clone.rpc.ctx.call_id); + tokio::spawn(async move { + if let Err(e) = broker_clone + .get_default_callback() + .sender + .send(output) + .await + { + error!("Failed to send response: {:?}", e); + } + }); + } + Ok((_, None)) | Err(_) => { + // Handle the case where no output is returned + let value_from_plugin = self_clone + .read_from_true_north_plugin( + &broker_clone, + ws_tx_clone.clone(), + &request_clone, + ) + .await; + match value_from_plugin { + Ok(mut value_from_plugin) => { + let broker_clone = broker_clone.clone(); + value_from_plugin.data.id = Some(request_clone.rpc.ctx.call_id); + tokio::spawn(async move { + if let Err(e) = broker_clone + .get_default_callback() + .sender + .send(value_from_plugin) + .await + { + error!("Failed to send response: {:?}", e); + } + }); + } + Err(_e) => { + broker_clone + .get_default_callback() + .send_error(request_clone, RippleError::ProcessorError) + .await + } + } + } + } + }); + + // Always return (false, None) to free up the calling thread + (false, None) + } + async fn perform_getter_migration( &self, broker: &ThunderBroker, @@ -532,101 +607,198 @@ impl UserDataMigrator { broker, ws_tx.clone(), ) - .await; + .await?; + + let value_from_plugin = self + .read_from_true_north_plugin(broker, ws_tx.clone(), request) + .await?; + + let mut data = value_from_plugin.clone().data; + if let Some(filter) = request + .rule + .transform + .get_transform_data(RuleTransformType::Response) + { + endpoint_broker::apply_response(filter, &request.rule.alias, &mut data); + } - match legacy_value { - Ok(legacy_value) => { - let value_from_plugin = self - .read_from_true_north_plugin(broker, ws_tx.clone(), request) + if let Some(result) = data.clone().result { + if result != config_entry.default { + info!( + "perform_getter_migration: Plugin has non-default value. 
Updating legacy storage with new value: {:?}", + result + ); + self.update_legacy_storage( + &config_entry.namespace, + &config_entry.key, + broker, + ws_tx.clone(), + &result.to_string(), + ) + .await?; + self.set_migration_status(&config_entry.namespace, &config_entry.key) .await; - match value_from_plugin { - Ok(value_from_plugin) => { - // apply the response transform rule if any - let mut data = value_from_plugin.clone().data; - if let Some(filter) = request - .rule - .transform - .get_transform_data(RuleTransformType::Response) - { - endpoint_broker::apply_response(filter, &request.rule.alias, &mut data); - } - if let Some(result) = data.clone().result { - // if the plugins has a non default value, assuming that it is holding the latest value - // update the legacy storage with the new value - if result != config_entry.default { - self.write_to_legacy_storage( - &config_entry.namespace, - &config_entry.key, - broker, - ws_tx.clone(), - &result.to_string(), - ) - .await; - self.set_migration_status( - &config_entry.namespace, - &config_entry.key, - ) - .await; - - // create broker output with the result - let response = BrokerOutput { data }; - return Ok((true, Some(response))); - } else { - // plugin has the default value, now check if the legacy storage has a non default value - // if so, update the plugin with the value from the legacy storage - if legacy_value != config_entry.default { - let response = self - .write_to_true_north_plugin( - broker, - ws_tx.clone(), - config_entry, - &legacy_value.to_string(), - ) - .await; - match response { - Ok(response) => { - self.set_migration_status( - &config_entry.namespace, - &config_entry.key, - ) - .await; - return Ok((true, Some(response))); - } - Err(e) => { - return Err(e); - } - } - } else { - // both the plugin and the legacy storage has the default value, no need to update - // continue with the original request + return Ok((true, Some(BrokerOutput { data }))); + } else if legacy_value != config_entry.default { + info!( + "perform_getter_migration: Plugin has default value and Legacy storage has the latest value. Updating plugin with value from legacy storage: {:?}", + legacy_value + ); + let response = self + .update_plugin_from_legacy( + broker, + ws_tx.clone(), + config_entry, + &legacy_value.to_string(), + ) + .await?; + self.set_migration_status(&config_entry.namespace, &config_entry.key) + .await; + return Ok((true, Some(response))); + } else { + info!( + "perform_getter_migration: Both plugin and legacy storage have default value. 
Continuing with the original request" + ); + self.set_migration_status(&config_entry.namespace, &config_entry.key) + .await; + return Ok((true, Some(BrokerOutput { data }))); + } + } else { + Err(UserDataMigratorError::ResponseError( + "No result in response".to_string(), + )) + } + } + + async fn update_legacy_storage( + &self, + namespace: &str, + key: &str, + broker: &ThunderBroker, + ws_tx: Arc, Message>>>, + value: &str, + ) -> Result<(), UserDataMigratorError> { + self.write_to_legacy_storage(namespace, key, broker, ws_tx, value) + .await + } + + async fn update_plugin_from_legacy( + &self, + broker: &ThunderBroker, + ws_tx: Arc, Message>>>, + config_entry: &MigrationConfigEntry, + value: &str, + ) -> Result { + self.write_to_true_north_plugin(broker, ws_tx, config_entry, value) + .await + } + + /* + async fn perform_getter_migration( + &self, + broker: &ThunderBroker, + ws_tx: Arc, Message>>>, + request: &BrokerRequest, + config_entry: &MigrationConfigEntry, + ) -> Result<(bool, Option), UserDataMigratorError> { + let legacy_value = self + .read_from_legacy_storage( + &config_entry.namespace, + &config_entry.key, + broker, + ws_tx.clone(), + ) + .await; + + match legacy_value { + Ok(legacy_value) => { + let value_from_plugin = self + .read_from_true_north_plugin(broker, ws_tx.clone(), request) + .await; + match value_from_plugin { + Ok(value_from_plugin) => { + // apply the response transform rule if any + let mut data = value_from_plugin.clone().data; + if let Some(filter) = request + .rule + .transform + .get_transform_data(RuleTransformType::Response) + { + endpoint_broker::apply_response(filter, &request.rule.alias, &mut data); + } + if let Some(result) = data.clone().result { + // if the plugins has a non default value, assuming that it is holding the latest value + // update the legacy storage with the new value + if result != config_entry.default { + self.write_to_legacy_storage( + &config_entry.namespace, + &config_entry.key, + broker, + ws_tx.clone(), + &result.to_string(), + ) + .await; self.set_migration_status( &config_entry.namespace, &config_entry.key, ) .await; + // create broker output with the result let response = BrokerOutput { data }; - return Ok((true, Some(response))); + Ok((true, Some(response))) + } else { + // plugin has the default value, now check if the legacy storage has a non default value + // if so, update the plugin with the value from the legacy storage + if legacy_value != config_entry.default { + let response = self + .write_to_true_north_plugin( + broker, + ws_tx.clone(), + config_entry, + &legacy_value.to_string(), + ) + .await; + match response { + Ok(response) => { + self.set_migration_status( + &config_entry.namespace, + &config_entry.key, + ) + .await; + Ok((true, Some(response))) + } + Err(e) => Err(e), + } + } else { + // both the plugin and the legacy storage has the default value, no need to update + // continue with the original request + self.set_migration_status( + &config_entry.namespace, + &config_entry.key, + ) + .await; + // create broker output with the result + let response = BrokerOutput { data }; + Ok((true, Some(response))) + } } + } else { + Err(UserDataMigratorError::ResponseError( + "No result in response".to_string(), + )) } - } else { - return Err(UserDataMigratorError::ResponseError( - "No result in response".to_string(), - )); } - } - Err(e) => { - return Err(e); + Err(e) => Err(e), } } - } - Err(e) => { - // TBD : Add more detailed error code like no entry in legacy storage, etc - return Err(e); + Err(e) => { + // TBD 
: Add more detailed error code like no entry in legacy storage, etc + Err(e) + } } } - } - + */ // function to set the migration flag to true and update the migration map in the config file async fn set_migration_status(&self, namespace: &str, key: &str) { let mut config_entry_changed = false; diff --git a/core/main/src/broker/thunder_broker.rs b/core/main/src/broker/thunder_broker.rs index e84d0a009..2a1058f31 100644 --- a/core/main/src/broker/thunder_broker.rs +++ b/core/main/src/broker/thunder_broker.rs @@ -73,7 +73,7 @@ impl ThunderBroker { self } - fn get_default_callback(&self) -> BrokerCallback { + pub fn get_default_callback(&self) -> BrokerCallback { self.default_callback.clone() } @@ -172,9 +172,8 @@ impl ThunderBroker { // empty request means plugin is activated and ready to process the request // Intercept the request for data migration let mut request_consumed = false; - let mut response = None; if let Some(user_data_migrator) = broker_c.data_migrator.clone() { - (request_consumed, response) = user_data_migrator.intercept_broker_request(&broker_c, ws_tx_wrap.clone(), &mut request).await; + request_consumed = user_data_migrator.intercept_broker_request(&broker_c, ws_tx_wrap.clone(), &mut request).await; } // If the request is not consumed by the data migrator, continue with the request @@ -194,16 +193,6 @@ impl ThunderBroker { } } } - else { // Data migrator consumed the request - if let Some(response) = response { - let broker_clone = broker_c.clone(); - tokio::spawn(async move { - if let Err(e) = broker_clone.get_default_callback().sender.send(response).await { - error!("Failed to send response: {:?}", e); - } - }); - } - } } } Err(e) => { From fb34f405ef65f74ddd22b1fd7644cfb0f85bd969 Mon Sep 17 00:00:00 2001 From: VinodSathyaseelan Date: Thu, 17 Oct 2024 14:33:00 -0700 Subject: [PATCH 5/6] test: Cleanup and test completed for getter migration use cases --- core/main/src/broker/endpoint_broker.rs | 1 - core/main/src/broker/rules_engine.rs | 6 + .../src/broker/thunder/user_data_migrator.rs | 512 ++++++++---------- 3 files changed, 245 insertions(+), 274 deletions(-) diff --git a/core/main/src/broker/endpoint_broker.rs b/core/main/src/broker/endpoint_broker.rs index e45fc2db2..76ca7191b 100644 --- a/core/main/src/broker/endpoint_broker.rs +++ b/core/main/src/broker/endpoint_broker.rs @@ -862,7 +862,6 @@ fn apply_filter(broker_request: &BrokerRequest, result: &Value, rpc_request: &Rp &filter, format!("{}_event filter", rpc_request.ctx.method), ) { - println!("apply_filter: {:?}", r); if r.is_null() { return false; } else { diff --git a/core/main/src/broker/rules_engine.rs b/core/main/src/broker/rules_engine.rs index 2be82dca3..ca82c2f21 100644 --- a/core/main/src/broker/rules_engine.rs +++ b/core/main/src/broker/rules_engine.rs @@ -89,15 +89,21 @@ pub struct Rule { // Not every rule needs transform #[serde(default)] pub transform: RuleTransform, + #[serde(skip_serializing_if = "Option::is_none")] pub filter: Option, + #[serde(skip_serializing_if = "Option::is_none")] pub endpoint: Option, } #[derive(Debug, Clone, Deserialize, Default, Serialize)] pub struct RuleTransform { + #[serde(skip_serializing_if = "Option::is_none")] pub request: Option, + #[serde(skip_serializing_if = "Option::is_none")] pub response: Option, + #[serde(skip_serializing_if = "Option::is_none")] pub event: Option, + #[serde(skip_serializing_if = "Option::is_none")] pub event_decorator_method: Option, } diff --git a/core/main/src/broker/thunder/user_data_migrator.rs 
b/core/main/src/broker/thunder/user_data_migrator.rs index 3468ad070..629fa8677 100644 --- a/core/main/src/broker/thunder/user_data_migrator.rs +++ b/core/main/src/broker/thunder/user_data_migrator.rs @@ -15,24 +15,26 @@ // SPDX-License-Identifier: Apache-2.0 // -use std::collections::HashMap; -use std::fmt; -use std::fs::{File, OpenOptions}; -use std::path::Path; -use std::sync::Arc; - -use openrpc_validator::jsonschema::output; -use ripple_sdk::tokio::net::TcpStream; -use ripple_sdk::utils::error::RippleError; +use std::{ + collections::HashMap, + fmt, + fs::{File, OpenOptions}, + path::Path, + sync::Arc, +}; + use ripple_sdk::{ log::{debug, error, info}, tokio::{ self, + net::TcpStream, sync::mpsc::{self, Receiver, Sender}, sync::Mutex, time::{timeout, Duration}, }, + utils::error::RippleError, }; + use serde::{Deserialize, Serialize}; use serde_json::{json, Value}; @@ -46,6 +48,9 @@ use futures_util::SinkExt; use crate::broker::thunder_broker::ThunderBroker; use tokio_tungstenite::{tungstenite::Message, WebSocketStream}; + +// TBD get the storage dir from manifest or other Ripple config file +const RIPPLE_STORAGE_DIR: &str = "/opt/persistent/ripple"; const USER_DATA_MIGRATION_CONFIG_FILE_NAME: &str = "user_data_migration_config.json"; #[derive(Debug)] @@ -75,19 +80,26 @@ impl fmt::Display for UserDataMigratorError { } } impl std::error::Error for UserDataMigratorError {} + +#[derive(Clone, Debug, Deserialize, Serialize)] +pub struct CoversionRule { + conversion_rule: String, +} #[derive(Clone, Debug, Deserialize, Serialize)] pub struct MigrationConfigEntry { namespace: String, key: String, default: Value, getter: String, - setter_rule: Option, setter: String, + #[serde(skip_serializing_if = "Option::is_none")] + setter_rule: Option, + #[serde(skip_serializing_if = "Option::is_none")] + legacy_to_plugin_value_conversion: Option, migrated: bool, } type MigrationMap = HashMap; -// This struct is responsible for migrating user data from the legacy storage to the new storage. #[derive(Clone, Debug)] pub struct UserDataMigrator { migration_config: Arc>, // persistent migration map @@ -101,8 +113,8 @@ impl UserDataMigrator { let possible_config_file_paths = vec![ format!("/etc/{}", USER_DATA_MIGRATION_CONFIG_FILE_NAME), format!( - "/opt/persistent/ripple/{}", - USER_DATA_MIGRATION_CONFIG_FILE_NAME + "{}/{}", + RIPPLE_STORAGE_DIR, USER_DATA_MIGRATION_CONFIG_FILE_NAME ), format!("./{}", USER_DATA_MIGRATION_CONFIG_FILE_NAME), ]; @@ -136,7 +148,7 @@ impl UserDataMigrator { .cloned() } - // function to intercept and handle broker request. Perform migration if needed + /// function to intercept and handle broker request. 
Perform migration if needed pub async fn intercept_broker_request( &self, broker: &ThunderBroker, @@ -144,68 +156,96 @@ impl UserDataMigrator { request: &mut BrokerRequest, ) -> bool { let method = request.rpc.method.clone(); + info!( + "intercept_broker_request: Intercepting broker request for method: {:?}", + method + ); if let Some(config_entry) = self.get_matching_migration_entry_by_method(&method).await { - // migration entry found for either getter or setter method - // for setter case, irrespective of the migration status, update the new value in the new storage and sync - // with the legacy storage - if config_entry.setter == method { - // perform the setter update and sync up logic asynchronously - // update legacy storage with the new value as fire and forget operation - self.set_migration_status(&config_entry.namespace, &config_entry.key) + return self + .handle_setter_request(broker, ws_tx, request, &config_entry) .await; + } else { + return self + .handle_getter_request(broker, ws_tx, request, &config_entry) + .await; + } + } - let mut params = Value::Null; + // Continue with the original request if no migration entry is found + false + } - // Get the params from the request as is. No need to transform the params for legacy storage - if let Ok(mut extract) = - serde_json::from_str::>(&request.rpc.params_json) - { - // Get the param to use in write to legacy storage - if let Some(last) = extract.pop() { - // Extract the value field from the last - params = last.get("value").cloned().unwrap_or(last); - } - } + async fn handle_setter_request( + &self, + broker: &ThunderBroker, + ws_tx: Arc, Message>>>, + request: &BrokerRequest, + config_entry: &MigrationConfigEntry, + ) -> bool { + info!( + "intercept_broker_request: Handling setter request for method: {:?}", + config_entry.setter + ); + self.set_migration_status(&config_entry.namespace, &config_entry.key) + .await; - info!( - "intercept_broker_request: Updating legacy storage with new value: {:?}", - params - ); - let _ = self - .write_to_legacy_storage( - &config_entry.namespace, - &config_entry.key, - &broker, - ws_tx.clone(), - params.to_string().as_str(), - ) - .await; - // returning false to continue with the original setter request - return false; - } else { - // perform the getter migration logic asynchronously - if !config_entry.migrated { - let self_arc = Arc::new(self.clone()); - let _ = self_arc - .call_perform_getter_migration( - &broker, - ws_tx.clone(), - &request, - &config_entry, - ) - .await; - return true; - } else { - // the migration is already done, continue with the original request - return false; - } - } + let params = self.extract_params(&request.rpc.params_json); + info!( + "intercept_broker_request: Updating legacy storage with new value: {:?}", + params + ); + + let _ = self + .write_to_legacy_storage( + &config_entry.namespace, + &config_entry.key, + broker, + ws_tx.clone(), + ¶ms, + ) + .await; + + // Return false to continue with the original setter request + false + } + + async fn handle_getter_request( + &self, + broker: &ThunderBroker, + ws_tx: Arc, Message>>>, + request: &BrokerRequest, + config_entry: &MigrationConfigEntry, + ) -> bool { + info!( + "intercept_broker_request: Handling getter request for method: {:?}", + config_entry.getter + ); + if !config_entry.migrated { + let self_arc = Arc::new(self.clone()); + self_arc + .invoke_perform_getter_migration(broker, ws_tx.clone(), request, config_entry) + .await; + return true; } - // continue with the original request + + // The migration 
already done, continue with the original request + info!( + "intercept_broker_request: Migration already done for method: {:?}", + config_entry.getter + ); false } + fn extract_params(&self, params_json: &str) -> Value { + if let Ok(mut extract) = serde_json::from_str::>(params_json) { + if let Some(last) = extract.pop() { + return last.get("value").cloned().unwrap_or(last); + } + } + Value::Null + } + async fn read_from_legacy_storage( &self, namespace: &str, @@ -215,6 +255,18 @@ impl UserDataMigrator { ) -> Result { let request_id = EndpointBrokerState::get_next_id(); let call_sign = "org.rdk.PersistentStore.1.".to_owned(); + + // Register custom callback to handle the response + broker + .register_custom_callback( + request_id, + BrokerCallback { + sender: self.response_tx.clone(), + }, + ) + .await; + + // create the request to the legacy storage let thunder_request = json!({ "jsonrpc": "2.0", "id": request_id, @@ -227,15 +279,10 @@ impl UserDataMigrator { }) .to_string(); - // Register custom callback to handle the response - broker - .register_custom_callback( - request_id, - BrokerCallback { - sender: self.response_tx.clone(), - }, - ) - .await; + info!( + "read_from_legacy_storage: Sending request to plugin: {:?}", + thunder_request + ); // send the request to the legacy storage if let Err(e) = self.send_thunder_request(&ws_tx, &thunder_request).await { @@ -247,13 +294,10 @@ impl UserDataMigrator { broker.unregister_custom_callback(request_id).await; return Err(e); } - // get the response from the custom callback - let response_rx = self.response_rx.clone(); - let broker_clone = broker.clone(); // get the response and check if the response is successful by checking result or error field. // Value::Null is a valid response, return Err if the response is not successful let response = - UserDataMigrator::wait_for_response(response_rx, broker_clone, request_id).await; + Self::wait_for_response(self.response_rx.clone(), broker.clone(), request_id).await; match response { Ok(response) => { if let Some(result) = response.data.result { @@ -280,10 +324,22 @@ impl UserDataMigrator { key: &str, broker: &ThunderBroker, ws_tx: Arc, Message>>>, - params_json: &str, + params_json: &Value, ) -> Result<(), UserDataMigratorError> { let request_id = EndpointBrokerState::get_next_id(); let call_sign = "org.rdk.PersistentStore.1.".to_owned(); + + // Register custom callback to handle the response + broker + .register_custom_callback( + request_id, + BrokerCallback { + sender: self.response_tx.clone(), + }, + ) + .await; + + // create the request to the legacy storage let thunder_request = json!({ "jsonrpc": "2.0", "id": request_id, @@ -296,15 +352,10 @@ impl UserDataMigrator { }) }) .to_string(); - // Register custom callback to handle the response - broker - .register_custom_callback( - request_id, - BrokerCallback { - sender: self.response_tx.clone(), - }, - ) - .await; + info!( + "write_to_legacy_storage: Sending request to plugin: {:?}", + thunder_request + ); // send the request to the legacy storage if let Err(e) = self.send_thunder_request(&ws_tx, &thunder_request).await { @@ -337,20 +388,13 @@ impl UserDataMigrator { Ok(()) } - async fn read_from_true_north_plugin( + async fn read_from_thunder_plugin( &self, broker: &ThunderBroker, ws_tx: Arc, Message>>>, request: &BrokerRequest, ) -> Result { - // no params for the getter function let request_id = EndpointBrokerState::get_next_id(); - let thunder_plugin_request = json!({ - "jsonrpc": "2.0", - "id": request_id, - "method": request.rule.alias, - 
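// Illustrative sketch, not taken from the patch: assuming a hypothetical getter rule
// whose alias is "org.rdk.UserSettings.1.getAudioDescription", the request serialized
// here would look roughly like
//
//   {"jsonrpc":"2.0","id":<value from EndpointBrokerState::get_next_id()>,"method":"org.rdk.UserSettings.1.getAudioDescription"}
//
// i.e. with no "params" object, matching the no-params assumption noted in the
// comments above.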
}) - .to_string(); // Register custom callback to handle the response broker @@ -362,6 +406,21 @@ impl UserDataMigrator { ) .await; + // Create the request to the new plugin + // The current implementation assumes no params for the getter function + // extend the migration configuration to support params if needed + let thunder_plugin_request = json!({ + "jsonrpc": "2.0", + "id": request_id, + "method": request.rule.alias, + }) + .to_string(); + + info!( + "read_from_thunder_plugin: Sending request to plugin: {:?}", + thunder_plugin_request + ); + // send the request to the new pluin as thunder request if let Err(e) = self .send_thunder_request(&ws_tx, &thunder_plugin_request) @@ -376,14 +435,11 @@ impl UserDataMigrator { } // get the response from the custom callback - let response_rx = self.response_rx.clone(); - let broker_clone = broker.clone(); - - UserDataMigrator::wait_for_response(response_rx, broker_clone, request_id).await + Self::wait_for_response(self.response_rx.clone(), broker.clone(), request_id).await } fn transform_requets_params( - params_json: &str, + params_json: &Value, rule: &Rule, method: &str, ) -> Result { @@ -403,12 +459,13 @@ impl UserDataMigrator { } Ok(serde_json::to_value(&data).unwrap()) } - async fn write_to_true_north_plugin( + + async fn write_to_thunder_plugin( &self, broker: &ThunderBroker, ws_tx: Arc, Message>>>, config_entry: &MigrationConfigEntry, - params_json: &str, // param from the legacy storage + params_json: &Value, // param from the legacy storage ) -> Result { // get the setter rule from the rule engine by giving the setter method name let setter_rule = Self::retrive_setter_rule_from_rule_engine(config_entry)?; @@ -422,15 +479,8 @@ impl UserDataMigrator { return Err(UserDataMigratorError::RequestTransformError(e.to_string())); } }; - // create the request to the new plugin + let request_id = EndpointBrokerState::get_next_id(); - let thunder_plugin_request = json!({ - "jsonrpc": "2.0", - "id": request_id, - "method": setter_rule.alias, - "params": transformed_params, - }) - .to_string(); // Register custom callback to handle the response broker @@ -442,13 +492,27 @@ impl UserDataMigrator { ) .await; + // create the request to the new plugin + let thunder_plugin_request = json!({ + "jsonrpc": "2.0", + "id": request_id, + "method": setter_rule.alias, + "params": transformed_params, + }) + .to_string(); + + info!( + "write_to_thunder_plugin: Sending request to plugin: {:?}", + thunder_plugin_request + ); + // send the request to the new plugin as thunder request if let Err(e) = self .send_thunder_request(&ws_tx, &thunder_plugin_request) .await { error!( - "write_to_true_north_plugin: Failed to send thunder request: {:?}", + "write_to_thunder_plugin: Failed to send thunder request: {:?}", e ); broker.unregister_custom_callback(request_id).await; @@ -456,10 +520,7 @@ impl UserDataMigrator { } // get the response from the custom callback - let response_rx = self.response_rx.clone(); - let broker_clone = broker.clone(); - - UserDataMigrator::wait_for_response(response_rx, broker_clone, request_id).await + Self::wait_for_response(self.response_rx.clone(), broker.clone(), request_id).await } async fn send_thunder_request( @@ -516,13 +577,13 @@ impl UserDataMigrator { } } - async fn call_perform_getter_migration( + async fn invoke_perform_getter_migration( self: Arc, broker: &ThunderBroker, ws_tx: Arc, Message>>>, request: &BrokerRequest, config_entry: &MigrationConfigEntry, - ) -> (bool, Option) { + ) { // Clone the parameters to move into the spawned 
task let broker_clone = broker.clone(); let ws_tx_clone = ws_tx.clone(); @@ -541,42 +602,40 @@ impl UserDataMigrator { .await { Ok((_, Some(mut output))) => { - let broker_clone = broker_clone.clone(); + // Handle the case where output is returned. Send the response to the default callback of the broker. + // The response structure will be upated with the originial request id for the callback handler to match the response. output.data.id = Some(request_clone.rpc.ctx.call_id); - tokio::spawn(async move { - if let Err(e) = broker_clone - .get_default_callback() - .sender - .send(output) - .await - { - error!("Failed to send response: {:?}", e); - } - }); + if let Err(e) = broker_clone + .get_default_callback() + .sender + .send(output) + .await + { + error!("Failed to send response: {:?}", e); + } } Ok((_, None)) | Err(_) => { - // Handle the case where no output is returned - let value_from_plugin = self_clone - .read_from_true_north_plugin( + // Handle the case where no output is returned. Read the value from the plugin and send the response + // The response will be sent to the default callback of the broker. + // The response structure will be upated with the originial request id for the callback handler to match the response. + let value_from_thunder_plugin = self_clone + .read_from_thunder_plugin( &broker_clone, ws_tx_clone.clone(), &request_clone, ) .await; - match value_from_plugin { - Ok(mut value_from_plugin) => { - let broker_clone = broker_clone.clone(); - value_from_plugin.data.id = Some(request_clone.rpc.ctx.call_id); - tokio::spawn(async move { - if let Err(e) = broker_clone - .get_default_callback() - .sender - .send(value_from_plugin) - .await - { - error!("Failed to send response: {:?}", e); - } - }); + match value_from_thunder_plugin { + Ok(mut value_from_thunder_plugin) => { + value_from_thunder_plugin.data.id = Some(request_clone.rpc.ctx.call_id); + if let Err(e) = broker_clone + .get_default_callback() + .sender + .send(value_from_thunder_plugin) + .await + { + error!("Failed to send response: {:?}", e); + } } Err(_e) => { broker_clone @@ -588,9 +647,6 @@ impl UserDataMigrator { } } }); - - // Always return (false, None) to free up the calling thread - (false, None) } async fn perform_getter_migration( @@ -608,12 +664,16 @@ impl UserDataMigrator { ws_tx.clone(), ) .await?; - - let value_from_plugin = self - .read_from_true_north_plugin(broker, ws_tx.clone(), request) + info!( + "perform_getter_migration: Read from legacy storage: {:?}", + legacy_value + ); + let value_from_thunder_plugin = self + .read_from_thunder_plugin(broker, ws_tx.clone(), request) .await?; - let mut data = value_from_plugin.clone().data; + // apply the response transform to the data from the plugin + let mut data = value_from_thunder_plugin.clone().data; if let Some(filter) = request .rule .transform @@ -633,31 +693,43 @@ impl UserDataMigrator { &config_entry.key, broker, ws_tx.clone(), - &result.to_string(), + &result, ) .await?; self.set_migration_status(&config_entry.namespace, &config_entry.key) .await; - return Ok((true, Some(BrokerOutput { data }))); + // this BrokerOutput has the data from the plugin + Ok((true, Some(BrokerOutput { data }))) } else if legacy_value != config_entry.default { info!( "perform_getter_migration: Plugin has default value and Legacy storage has the latest value. 
Updating plugin with value from legacy storage: {:?}", legacy_value ); - let response = self - .update_plugin_from_legacy( - broker, - ws_tx.clone(), - config_entry, - &legacy_value.to_string(), - ) + let mut response = self + .update_plugin_from_legacy(broker, ws_tx.clone(), config_entry, &legacy_value) .await?; self.set_migration_status(&config_entry.namespace, &config_entry.key) .await; + + // this BrokerOutput has the data from the legacy storage. + // prepare the response to send back to the caller as response from plugin but with the data from the legacy storage + response.data.result = Some(legacy_value.clone()); + // check if there is any value conversion rule available + if let Some(conversion_rule) = &config_entry.legacy_to_plugin_value_conversion { + // apply the conversion rule to the data from the legacy storage + let data = crate::broker::rules_engine::jq_compile( + json!({ "value": legacy_value }), + &conversion_rule.conversion_rule, + "legacy_to_plugin_value_conversion".to_string(), + ); + if let Ok(data) = data { + response.data.result = Some(data); + } + } return Ok((true, Some(response))); } else { info!( - "perform_getter_migration: Both plugin and legacy storage have default value. Continuing with the original request" + "perform_getter_migration: Both plugin and legacy storage have default value. No migration needed." ); self.set_migration_status(&config_entry.namespace, &config_entry.key) .await; @@ -676,7 +748,7 @@ impl UserDataMigrator { key: &str, broker: &ThunderBroker, ws_tx: Arc, Message>>>, - value: &str, + value: &Value, ) -> Result<(), UserDataMigratorError> { self.write_to_legacy_storage(namespace, key, broker, ws_tx, value) .await @@ -687,118 +759,12 @@ impl UserDataMigrator { broker: &ThunderBroker, ws_tx: Arc, Message>>>, config_entry: &MigrationConfigEntry, - value: &str, + value: &Value, ) -> Result { - self.write_to_true_north_plugin(broker, ws_tx, config_entry, value) + self.write_to_thunder_plugin(broker, ws_tx, config_entry, value) .await } - /* - async fn perform_getter_migration( - &self, - broker: &ThunderBroker, - ws_tx: Arc, Message>>>, - request: &BrokerRequest, - config_entry: &MigrationConfigEntry, - ) -> Result<(bool, Option), UserDataMigratorError> { - let legacy_value = self - .read_from_legacy_storage( - &config_entry.namespace, - &config_entry.key, - broker, - ws_tx.clone(), - ) - .await; - - match legacy_value { - Ok(legacy_value) => { - let value_from_plugin = self - .read_from_true_north_plugin(broker, ws_tx.clone(), request) - .await; - match value_from_plugin { - Ok(value_from_plugin) => { - // apply the response transform rule if any - let mut data = value_from_plugin.clone().data; - if let Some(filter) = request - .rule - .transform - .get_transform_data(RuleTransformType::Response) - { - endpoint_broker::apply_response(filter, &request.rule.alias, &mut data); - } - if let Some(result) = data.clone().result { - // if the plugins has a non default value, assuming that it is holding the latest value - // update the legacy storage with the new value - if result != config_entry.default { - self.write_to_legacy_storage( - &config_entry.namespace, - &config_entry.key, - broker, - ws_tx.clone(), - &result.to_string(), - ) - .await; - self.set_migration_status( - &config_entry.namespace, - &config_entry.key, - ) - .await; - - // create broker output with the result - let response = BrokerOutput { data }; - Ok((true, Some(response))) - } else { - // plugin has the default value, now check if the legacy storage has a non default value - 
// if so, update the plugin with the value from the legacy storage - if legacy_value != config_entry.default { - let response = self - .write_to_true_north_plugin( - broker, - ws_tx.clone(), - config_entry, - &legacy_value.to_string(), - ) - .await; - match response { - Ok(response) => { - self.set_migration_status( - &config_entry.namespace, - &config_entry.key, - ) - .await; - Ok((true, Some(response))) - } - Err(e) => Err(e), - } - } else { - // both the plugin and the legacy storage has the default value, no need to update - // continue with the original request - self.set_migration_status( - &config_entry.namespace, - &config_entry.key, - ) - .await; - // create broker output with the result - let response = BrokerOutput { data }; - Ok((true, Some(response))) - } - } - } else { - Err(UserDataMigratorError::ResponseError( - "No result in response".to_string(), - )) - } - } - Err(e) => Err(e), - } - } - Err(e) => { - // TBD : Add more detailed error code like no entry in legacy storage, etc - Err(e) - } - } - } - */ // function to set the migration flag to true and update the migration map in the config file async fn set_migration_status(&self, namespace: &str, key: &str) { let mut config_entry_changed = false; From 5da2381802b83c724f307ec8f9891bfd461114c0 Mon Sep 17 00:00:00 2001 From: VinodSathyaseelan Date: Sat, 19 Oct 2024 00:17:41 -0700 Subject: [PATCH 6/6] fix: Validated some corner cases in getter migration use case. Code cleanup. --- .../src/broker/thunder/user_data_migrator.rs | 287 ++++++++++++------ 1 file changed, 194 insertions(+), 93 deletions(-) diff --git a/core/main/src/broker/thunder/user_data_migrator.rs b/core/main/src/broker/thunder/user_data_migrator.rs index 629fa8677..89804d813 100644 --- a/core/main/src/broker/thunder/user_data_migrator.rs +++ b/core/main/src/broker/thunder/user_data_migrator.rs @@ -24,12 +24,15 @@ use std::{ }; use ripple_sdk::{ + api::{device::device_peristence::StorageData, gateway::rpc_gateway_api::JsonRpcApiResponse}, log::{debug, error, info}, tokio::{ self, net::TcpStream, - sync::mpsc::{self, Receiver, Sender}, - sync::Mutex, + sync::{ + mpsc::{self, Receiver, Sender}, + Mutex, + }, time::{timeout, Duration}, }, utils::error::RippleError, @@ -172,6 +175,10 @@ impl UserDataMigrator { } } + info!( + "intercept_broker_request: No migration entry found for method: {:?}", + method + ); // Continue with the original request if no migration entry is found false } @@ -280,42 +287,56 @@ impl UserDataMigrator { .to_string(); info!( - "read_from_legacy_storage: Sending request to plugin: {:?}", + "read_from_legacy_storage: Sending request : {:?}", thunder_request ); // send the request to the legacy storage if let Err(e) = self.send_thunder_request(&ws_tx, &thunder_request).await { - error!( - "read_from_legacy_storage: Failed to send thunder request: {:?}", - e - ); - // Unregister the custom callback and return + error!("read_from_legacy_storage: Failed to send request: {:?}", e); broker.unregister_custom_callback(request_id).await; return Err(e); } - // get the response and check if the response is successful by checking result or error field. 
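// Illustrative sketch with hypothetical sample data, not taken from the patch: the
// legacy PersistentStore getValue response is assumed to carry the persisted data as
// a JSON-encoded StorageData string under "value", e.g.
//
//   {"jsonrpc":"2.0","id":42,"result":{"value":"{\"value\":\"dark\"}","success":true}}
//
// process_response_from_legacy_storage() below then unwraps it layer by layer:
// result -> result["value"] as &str -> StorageData -> StorageData.value.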
- // Value::Null is a valid response, return Err if the response is not successful + let response = Self::wait_for_response(self.response_rx.clone(), broker.clone(), request_id).await; - match response { - Ok(response) => { - if let Some(result) = response.data.result { - // extract the value field from the result - if let Some(value) = result.get("value") { - return Ok(value.clone()); - } - Err(UserDataMigratorError::ResponseError( - "No value field in response".to_string(), - )) - } else { - Err(UserDataMigratorError::ResponseError( - "No result in response".to_string(), - )) - } - } - Err(e) => Err(e), - } + self.process_response_from_legacy_storage(response) + } + + fn process_response_from_legacy_storage( + &self, + response: Result, + ) -> Result { + info!("process_response_from_legacy_storage: Processing response"); + let response = response.map_err(|e| { + error!("Failed to get response: {}", e); + UserDataMigratorError::ResponseError(e.to_string()) + })?; + + let result = response.data.result.ok_or_else(|| { + UserDataMigratorError::ResponseError("No result field in response".to_string()) + })?; + + let value = result.get("value").ok_or_else(|| { + UserDataMigratorError::ResponseError("No value field in response".to_string()) + })?; + + let value_str = value.as_str().ok_or_else(|| { + UserDataMigratorError::ResponseError("Value is not a string".to_string()) + })?; + + let storage_data: StorageData = serde_json::from_str(value_str).map_err(|_e| { + UserDataMigratorError::ResponseError("Failed to deserialize JSON".to_string()) + })?; + + let final_value = storage_data.value; + + info!( + "process_response_from_legacy_storage: Successfully read from legacy storage: {:?}", + final_value + ); + + Ok(final_value.clone()) } async fn write_to_legacy_storage( @@ -339,6 +360,8 @@ impl UserDataMigrator { ) .await; + // set storage data in the format required by the legacy storage + let data = StorageData::new(params_json.clone()); // create the request to the legacy storage let thunder_request = json!({ "jsonrpc": "2.0", @@ -347,22 +370,19 @@ impl UserDataMigrator { "params": json!({ "namespace": namespace, "key": key, - "value": params_json, + "value": data, "scope": "device", }) }) .to_string(); info!( - "write_to_legacy_storage: Sending request to plugin: {:?}", + "write_to_legacy_storage: Sending request : {:?}", thunder_request ); // send the request to the legacy storage if let Err(e) = self.send_thunder_request(&ws_tx, &thunder_request).await { - error!( - "write_to_legacy_storage: Failed to send thunder request: {:?}", - e - ); + error!("write_to_legacy_storage: Failed to send request: {:?}", e); // Unregister the custom callback and return broker.unregister_custom_callback(request_id).await; return Ok(()); @@ -457,7 +477,13 @@ impl UserDataMigrator { format!("{}_request", method), ); } - Ok(serde_json::to_value(&data).unwrap()) + serde_json::to_value(&data).map_err(|e| { + error!( + "Failed to serialize data in transform_requets_params: {}", + e + ); + RippleError::BrokerError(e.to_string()) + }) } async fn write_to_thunder_plugin( @@ -519,7 +545,7 @@ impl UserDataMigrator { return Err(e); } - // get the response from the custom callback + // get the response from the custom callback, unregister the callback and return the response Self::wait_for_response(self.response_rx.clone(), broker.clone(), request_id).await } @@ -553,10 +579,12 @@ impl UserDataMigrator { } Ok(None) => { error!("No response received at custom write_to_legacy_storage"); + 
broker.unregister_custom_callback(request_id).await; return Err(UserDataMigratorError::TimeoutError); } Err(_) => { error!("Error receiving response at custom write_to_legacy_storage"); + broker.unregister_custom_callback(request_id).await; return Err(UserDataMigratorError::TimeoutError); } }; @@ -668,78 +696,151 @@ impl UserDataMigrator { "perform_getter_migration: Read from legacy storage: {:?}", legacy_value ); - let value_from_thunder_plugin = self + + let output_from_thunder_plugin = self .read_from_thunder_plugin(broker, ws_tx.clone(), request) .await?; - // apply the response transform to the data from the plugin - let mut data = value_from_thunder_plugin.clone().data; + let mut response = output_from_thunder_plugin.clone().data; + let data_for_callback = response.clone(); + if let Some(filter) = request .rule .transform .get_transform_data(RuleTransformType::Response) { - endpoint_broker::apply_response(filter, &request.rule.alias, &mut data); + endpoint_broker::apply_response(filter, &request.rule.alias, &mut response); } - if let Some(result) = data.clone().result { - if result != config_entry.default { - info!( - "perform_getter_migration: Plugin has non-default value. Updating legacy storage with new value: {:?}", - result - ); - self.update_legacy_storage( - &config_entry.namespace, - &config_entry.key, + if let Some(result) = response.result { + // legacy storage has some value. It needs to be migrated to the plugin in case plugin has default value + return self + .check_migration_cases( + result, + legacy_value, broker, - ws_tx.clone(), - &result, + ws_tx, + config_entry, + data_for_callback, ) - .await?; - self.set_migration_status(&config_entry.namespace, &config_entry.key) - .await; - // this BrokerOutput has the data from the plugin - Ok((true, Some(BrokerOutput { data }))) - } else if legacy_value != config_entry.default { - info!( - "perform_getter_migration: Plugin has default value and Legacy storage has the latest value. Updating plugin with value from legacy storage: {:?}", - legacy_value - ); - let mut response = self - .update_plugin_from_legacy(broker, ws_tx.clone(), config_entry, &legacy_value) - .await?; - self.set_migration_status(&config_entry.namespace, &config_entry.key) - .await; + .await; + } - // this BrokerOutput has the data from the legacy storage. - // prepare the response to send back to the caller as response from plugin but with the data from the legacy storage - response.data.result = Some(legacy_value.clone()); - // check if there is any value conversion rule available - if let Some(conversion_rule) = &config_entry.legacy_to_plugin_value_conversion { - // apply the conversion rule to the data from the legacy storage - let data = crate::broker::rules_engine::jq_compile( - json!({ "value": legacy_value }), - &conversion_rule.conversion_rule, - "legacy_to_plugin_value_conversion".to_string(), - ); - if let Ok(data) = data { - response.data.result = Some(data); - } - } - return Ok((true, Some(response))); - } else { - info!( - "perform_getter_migration: Both plugin and legacy storage have default value. No migration needed." 
- ); - self.set_migration_status(&config_entry.namespace, &config_entry.key) - .await; - return Ok((true, Some(BrokerOutput { data }))); + Err(UserDataMigratorError::ResponseError( + "No data collected from Legacy Storage".to_string(), + )) + } + + async fn check_migration_cases( + &self, + result: Value, + legacy_value: Value, + broker: &ThunderBroker, + ws_tx: Arc, Message>>>, + config_entry: &MigrationConfigEntry, + data_for_callback: JsonRpcApiResponse, + ) -> Result<(bool, Option), UserDataMigratorError> { + info!( + "perform_getter_migration: Checking migration cases. Legacy value: {:?}, Plugin value: {:?}, Config default: {:?}", + legacy_value, result, config_entry.default + ); + if result != config_entry.default { + // Case 1: Plugin has non-default value. Updating legacy storage with new value + return self + .handle_non_default_plugin_value( + result, + legacy_value, + broker, + ws_tx, + config_entry, + data_for_callback, + ) + .await; + } + + if legacy_value != config_entry.default { + // Case 2: Plugin has default value and Legacy storage has the latest value + return self + .handle_default_plugin_value(legacy_value, broker, ws_tx, config_entry) + .await; + } + + // Case 3: Both plugin and legacy storage have default value + info!( + "perform_getter_migration: Both plugin and legacy storage have default value. No migration needed." + ); + self.set_migration_status(&config_entry.namespace, &config_entry.key) + .await; + Ok(( + true, + Some(BrokerOutput { + data: data_for_callback, + }), + )) + } + + async fn handle_non_default_plugin_value( + &self, + result: Value, + legacy_value: Value, + broker: &ThunderBroker, + ws_tx: Arc, Message>>>, + config_entry: &MigrationConfigEntry, + data_for_callback: JsonRpcApiResponse, + ) -> Result<(bool, Option), UserDataMigratorError> { + info!( + "perform_getter_migration: Plugin has non-default value. Updating legacy storage with new value: {:?}", + result + ); + if result != legacy_value { + self.update_legacy_storage( + &config_entry.namespace, + &config_entry.key, + broker, + ws_tx.clone(), + &result, + ) + .await?; + } + self.set_migration_status(&config_entry.namespace, &config_entry.key) + .await; + Ok(( + true, + Some(BrokerOutput { + data: data_for_callback, + }), + )) + } + + async fn handle_default_plugin_value( + &self, + legacy_value: Value, + broker: &ThunderBroker, + ws_tx: Arc, Message>>>, + config_entry: &MigrationConfigEntry, + ) -> Result<(bool, Option), UserDataMigratorError> { + info!( + "perform_getter_migration: Plugin has default value and Legacy storage has the latest value. Updating plugin with value from legacy storage: {:?}", + legacy_value + ); + let mut response = self + .update_plugin_from_legacy(broker, ws_tx.clone(), config_entry, &legacy_value) + .await?; + self.set_migration_status(&config_entry.namespace, &config_entry.key) + .await; + + response.data.result = Some(legacy_value.clone()); + if let Some(conversion_rule) = &config_entry.legacy_to_plugin_value_conversion { + let data = crate::broker::rules_engine::jq_compile( + json!({ "value": legacy_value }), + &conversion_rule.conversion_rule, + "legacy_to_plugin_value_conversion".to_string(), + ); + if let Ok(data) = data { + response.data.result = Some(data); } - } else { - Err(UserDataMigratorError::ResponseError( - "No result in response".to_string(), - )) } + Ok((true, Some(response))) } async fn update_legacy_storage(