From 5da2381802b83c724f307ec8f9891bfd461114c0 Mon Sep 17 00:00:00 2001 From: VinodSathyaseelan Date: Sat, 19 Oct 2024 00:17:41 -0700 Subject: [PATCH] fix: Validated some corner cases in getter migration use case. Code cleanup. --- .../src/broker/thunder/user_data_migrator.rs | 287 ++++++++++++------ 1 file changed, 194 insertions(+), 93 deletions(-) diff --git a/core/main/src/broker/thunder/user_data_migrator.rs b/core/main/src/broker/thunder/user_data_migrator.rs index 629fa8677..89804d813 100644 --- a/core/main/src/broker/thunder/user_data_migrator.rs +++ b/core/main/src/broker/thunder/user_data_migrator.rs @@ -24,12 +24,15 @@ use std::{ }; use ripple_sdk::{ + api::{device::device_peristence::StorageData, gateway::rpc_gateway_api::JsonRpcApiResponse}, log::{debug, error, info}, tokio::{ self, net::TcpStream, - sync::mpsc::{self, Receiver, Sender}, - sync::Mutex, + sync::{ + mpsc::{self, Receiver, Sender}, + Mutex, + }, time::{timeout, Duration}, }, utils::error::RippleError, @@ -172,6 +175,10 @@ impl UserDataMigrator { } } + info!( + "intercept_broker_request: No migration entry found for method: {:?}", + method + ); // Continue with the original request if no migration entry is found false } @@ -280,42 +287,56 @@ impl UserDataMigrator { .to_string(); info!( - "read_from_legacy_storage: Sending request to plugin: {:?}", + "read_from_legacy_storage: Sending request : {:?}", thunder_request ); // send the request to the legacy storage if let Err(e) = self.send_thunder_request(&ws_tx, &thunder_request).await { - error!( - "read_from_legacy_storage: Failed to send thunder request: {:?}", - e - ); - // Unregister the custom callback and return + error!("read_from_legacy_storage: Failed to send request: {:?}", e); broker.unregister_custom_callback(request_id).await; return Err(e); } - // get the response and check if the response is successful by checking result or error field. - // Value::Null is a valid response, return Err if the response is not successful + let response = Self::wait_for_response(self.response_rx.clone(), broker.clone(), request_id).await; - match response { - Ok(response) => { - if let Some(result) = response.data.result { - // extract the value field from the result - if let Some(value) = result.get("value") { - return Ok(value.clone()); - } - Err(UserDataMigratorError::ResponseError( - "No value field in response".to_string(), - )) - } else { - Err(UserDataMigratorError::ResponseError( - "No result in response".to_string(), - )) - } - } - Err(e) => Err(e), - } + self.process_response_from_legacy_storage(response) + } + + fn process_response_from_legacy_storage( + &self, + response: Result, + ) -> Result { + info!("process_response_from_legacy_storage: Processing response"); + let response = response.map_err(|e| { + error!("Failed to get response: {}", e); + UserDataMigratorError::ResponseError(e.to_string()) + })?; + + let result = response.data.result.ok_or_else(|| { + UserDataMigratorError::ResponseError("No result field in response".to_string()) + })?; + + let value = result.get("value").ok_or_else(|| { + UserDataMigratorError::ResponseError("No value field in response".to_string()) + })?; + + let value_str = value.as_str().ok_or_else(|| { + UserDataMigratorError::ResponseError("Value is not a string".to_string()) + })?; + + let storage_data: StorageData = serde_json::from_str(value_str).map_err(|_e| { + UserDataMigratorError::ResponseError("Failed to deserialize JSON".to_string()) + })?; + + let final_value = storage_data.value; + + info!( + "process_response_from_legacy_storage: Successfully read from legacy storage: {:?}", + final_value + ); + + Ok(final_value.clone()) } async fn write_to_legacy_storage( @@ -339,6 +360,8 @@ impl UserDataMigrator { ) .await; + // set storage data in the format required by the legacy storage + let data = StorageData::new(params_json.clone()); // create the request to the legacy storage let thunder_request = json!({ "jsonrpc": "2.0", @@ -347,22 +370,19 @@ impl UserDataMigrator { "params": json!({ "namespace": namespace, "key": key, - "value": params_json, + "value": data, "scope": "device", }) }) .to_string(); info!( - "write_to_legacy_storage: Sending request to plugin: {:?}", + "write_to_legacy_storage: Sending request : {:?}", thunder_request ); // send the request to the legacy storage if let Err(e) = self.send_thunder_request(&ws_tx, &thunder_request).await { - error!( - "write_to_legacy_storage: Failed to send thunder request: {:?}", - e - ); + error!("write_to_legacy_storage: Failed to send request: {:?}", e); // Unregister the custom callback and return broker.unregister_custom_callback(request_id).await; return Ok(()); @@ -457,7 +477,13 @@ impl UserDataMigrator { format!("{}_request", method), ); } - Ok(serde_json::to_value(&data).unwrap()) + serde_json::to_value(&data).map_err(|e| { + error!( + "Failed to serialize data in transform_requets_params: {}", + e + ); + RippleError::BrokerError(e.to_string()) + }) } async fn write_to_thunder_plugin( @@ -519,7 +545,7 @@ impl UserDataMigrator { return Err(e); } - // get the response from the custom callback + // get the response from the custom callback, unregister the callback and return the response Self::wait_for_response(self.response_rx.clone(), broker.clone(), request_id).await } @@ -553,10 +579,12 @@ impl UserDataMigrator { } Ok(None) => { error!("No response received at custom write_to_legacy_storage"); + broker.unregister_custom_callback(request_id).await; return Err(UserDataMigratorError::TimeoutError); } Err(_) => { error!("Error receiving response at custom write_to_legacy_storage"); + broker.unregister_custom_callback(request_id).await; return Err(UserDataMigratorError::TimeoutError); } }; @@ -668,78 +696,151 @@ impl UserDataMigrator { "perform_getter_migration: Read from legacy storage: {:?}", legacy_value ); - let value_from_thunder_plugin = self + + let output_from_thunder_plugin = self .read_from_thunder_plugin(broker, ws_tx.clone(), request) .await?; - // apply the response transform to the data from the plugin - let mut data = value_from_thunder_plugin.clone().data; + let mut response = output_from_thunder_plugin.clone().data; + let data_for_callback = response.clone(); + if let Some(filter) = request .rule .transform .get_transform_data(RuleTransformType::Response) { - endpoint_broker::apply_response(filter, &request.rule.alias, &mut data); + endpoint_broker::apply_response(filter, &request.rule.alias, &mut response); } - if let Some(result) = data.clone().result { - if result != config_entry.default { - info!( - "perform_getter_migration: Plugin has non-default value. Updating legacy storage with new value: {:?}", - result - ); - self.update_legacy_storage( - &config_entry.namespace, - &config_entry.key, + if let Some(result) = response.result { + // legacy storage has some value. It needs to be migrated to the plugin in case plugin has default value + return self + .check_migration_cases( + result, + legacy_value, broker, - ws_tx.clone(), - &result, + ws_tx, + config_entry, + data_for_callback, ) - .await?; - self.set_migration_status(&config_entry.namespace, &config_entry.key) - .await; - // this BrokerOutput has the data from the plugin - Ok((true, Some(BrokerOutput { data }))) - } else if legacy_value != config_entry.default { - info!( - "perform_getter_migration: Plugin has default value and Legacy storage has the latest value. Updating plugin with value from legacy storage: {:?}", - legacy_value - ); - let mut response = self - .update_plugin_from_legacy(broker, ws_tx.clone(), config_entry, &legacy_value) - .await?; - self.set_migration_status(&config_entry.namespace, &config_entry.key) - .await; + .await; + } - // this BrokerOutput has the data from the legacy storage. - // prepare the response to send back to the caller as response from plugin but with the data from the legacy storage - response.data.result = Some(legacy_value.clone()); - // check if there is any value conversion rule available - if let Some(conversion_rule) = &config_entry.legacy_to_plugin_value_conversion { - // apply the conversion rule to the data from the legacy storage - let data = crate::broker::rules_engine::jq_compile( - json!({ "value": legacy_value }), - &conversion_rule.conversion_rule, - "legacy_to_plugin_value_conversion".to_string(), - ); - if let Ok(data) = data { - response.data.result = Some(data); - } - } - return Ok((true, Some(response))); - } else { - info!( - "perform_getter_migration: Both plugin and legacy storage have default value. No migration needed." - ); - self.set_migration_status(&config_entry.namespace, &config_entry.key) - .await; - return Ok((true, Some(BrokerOutput { data }))); + Err(UserDataMigratorError::ResponseError( + "No data collected from Legacy Storage".to_string(), + )) + } + + async fn check_migration_cases( + &self, + result: Value, + legacy_value: Value, + broker: &ThunderBroker, + ws_tx: Arc, Message>>>, + config_entry: &MigrationConfigEntry, + data_for_callback: JsonRpcApiResponse, + ) -> Result<(bool, Option), UserDataMigratorError> { + info!( + "perform_getter_migration: Checking migration cases. Legacy value: {:?}, Plugin value: {:?}, Config default: {:?}", + legacy_value, result, config_entry.default + ); + if result != config_entry.default { + // Case 1: Plugin has non-default value. Updating legacy storage with new value + return self + .handle_non_default_plugin_value( + result, + legacy_value, + broker, + ws_tx, + config_entry, + data_for_callback, + ) + .await; + } + + if legacy_value != config_entry.default { + // Case 2: Plugin has default value and Legacy storage has the latest value + return self + .handle_default_plugin_value(legacy_value, broker, ws_tx, config_entry) + .await; + } + + // Case 3: Both plugin and legacy storage have default value + info!( + "perform_getter_migration: Both plugin and legacy storage have default value. No migration needed." + ); + self.set_migration_status(&config_entry.namespace, &config_entry.key) + .await; + Ok(( + true, + Some(BrokerOutput { + data: data_for_callback, + }), + )) + } + + async fn handle_non_default_plugin_value( + &self, + result: Value, + legacy_value: Value, + broker: &ThunderBroker, + ws_tx: Arc, Message>>>, + config_entry: &MigrationConfigEntry, + data_for_callback: JsonRpcApiResponse, + ) -> Result<(bool, Option), UserDataMigratorError> { + info!( + "perform_getter_migration: Plugin has non-default value. Updating legacy storage with new value: {:?}", + result + ); + if result != legacy_value { + self.update_legacy_storage( + &config_entry.namespace, + &config_entry.key, + broker, + ws_tx.clone(), + &result, + ) + .await?; + } + self.set_migration_status(&config_entry.namespace, &config_entry.key) + .await; + Ok(( + true, + Some(BrokerOutput { + data: data_for_callback, + }), + )) + } + + async fn handle_default_plugin_value( + &self, + legacy_value: Value, + broker: &ThunderBroker, + ws_tx: Arc, Message>>>, + config_entry: &MigrationConfigEntry, + ) -> Result<(bool, Option), UserDataMigratorError> { + info!( + "perform_getter_migration: Plugin has default value and Legacy storage has the latest value. Updating plugin with value from legacy storage: {:?}", + legacy_value + ); + let mut response = self + .update_plugin_from_legacy(broker, ws_tx.clone(), config_entry, &legacy_value) + .await?; + self.set_migration_status(&config_entry.namespace, &config_entry.key) + .await; + + response.data.result = Some(legacy_value.clone()); + if let Some(conversion_rule) = &config_entry.legacy_to_plugin_value_conversion { + let data = crate::broker::rules_engine::jq_compile( + json!({ "value": legacy_value }), + &conversion_rule.conversion_rule, + "legacy_to_plugin_value_conversion".to_string(), + ); + if let Ok(data) = data { + response.data.result = Some(data); } - } else { - Err(UserDataMigratorError::ResponseError( - "No result in response".to_string(), - )) } + Ok((true, Some(response))) } async fn update_legacy_storage(