Skip to content

Commit

Permalink
fix: Validated some corner cases in getter migration use case. Code c…
Browse files Browse the repository at this point in the history
…leanup.
  • Loading branch information
Vinodsathyaseelan committed Oct 19, 2024
1 parent b263a80 commit 5da2381
Showing 1 changed file with 194 additions and 93 deletions.
287 changes: 194 additions & 93 deletions core/main/src/broker/thunder/user_data_migrator.rs
Original file line number Diff line number Diff line change
Expand Up @@ -24,12 +24,15 @@ use std::{
};

use ripple_sdk::{
api::{device::device_peristence::StorageData, gateway::rpc_gateway_api::JsonRpcApiResponse},
log::{debug, error, info},
tokio::{
self,
net::TcpStream,
sync::mpsc::{self, Receiver, Sender},
sync::Mutex,
sync::{
mpsc::{self, Receiver, Sender},
Mutex,
},
time::{timeout, Duration},
},
utils::error::RippleError,
Expand Down Expand Up @@ -172,6 +175,10 @@ impl UserDataMigrator {
}
}

info!(
"intercept_broker_request: No migration entry found for method: {:?}",
method
);
// Continue with the original request if no migration entry is found
false
}
Expand Down Expand Up @@ -280,42 +287,56 @@ impl UserDataMigrator {
.to_string();

info!(
"read_from_legacy_storage: Sending request to plugin: {:?}",
"read_from_legacy_storage: Sending request : {:?}",
thunder_request
);

// send the request to the legacy storage
if let Err(e) = self.send_thunder_request(&ws_tx, &thunder_request).await {
error!(
"read_from_legacy_storage: Failed to send thunder request: {:?}",
e
);
// Unregister the custom callback and return
error!("read_from_legacy_storage: Failed to send request: {:?}", e);
broker.unregister_custom_callback(request_id).await;
return Err(e);
}
// get the response and check if the response is successful by checking result or error field.
// Value::Null is a valid response, return Err if the response is not successful

let response =
Self::wait_for_response(self.response_rx.clone(), broker.clone(), request_id).await;
match response {
Ok(response) => {
if let Some(result) = response.data.result {
// extract the value field from the result
if let Some(value) = result.get("value") {
return Ok(value.clone());
}
Err(UserDataMigratorError::ResponseError(
"No value field in response".to_string(),
))
} else {
Err(UserDataMigratorError::ResponseError(
"No result in response".to_string(),
))
}
}
Err(e) => Err(e),
}
self.process_response_from_legacy_storage(response)
}

fn process_response_from_legacy_storage(
&self,
response: Result<BrokerOutput, UserDataMigratorError>,
) -> Result<Value, UserDataMigratorError> {
info!("process_response_from_legacy_storage: Processing response");
let response = response.map_err(|e| {
error!("Failed to get response: {}", e);
UserDataMigratorError::ResponseError(e.to_string())
})?;

let result = response.data.result.ok_or_else(|| {
UserDataMigratorError::ResponseError("No result field in response".to_string())
})?;

let value = result.get("value").ok_or_else(|| {
UserDataMigratorError::ResponseError("No value field in response".to_string())
})?;

let value_str = value.as_str().ok_or_else(|| {
UserDataMigratorError::ResponseError("Value is not a string".to_string())
})?;

let storage_data: StorageData = serde_json::from_str(value_str).map_err(|_e| {
UserDataMigratorError::ResponseError("Failed to deserialize JSON".to_string())
})?;

let final_value = storage_data.value;

info!(
"process_response_from_legacy_storage: Successfully read from legacy storage: {:?}",
final_value
);

Ok(final_value.clone())
}

async fn write_to_legacy_storage(
Expand All @@ -339,6 +360,8 @@ impl UserDataMigrator {
)
.await;

// set storage data in the format required by the legacy storage
let data = StorageData::new(params_json.clone());
// create the request to the legacy storage
let thunder_request = json!({
"jsonrpc": "2.0",
Expand All @@ -347,22 +370,19 @@ impl UserDataMigrator {
"params": json!({
"namespace": namespace,
"key": key,
"value": params_json,
"value": data,
"scope": "device",
})
})
.to_string();
info!(
"write_to_legacy_storage: Sending request to plugin: {:?}",
"write_to_legacy_storage: Sending request : {:?}",
thunder_request
);

// send the request to the legacy storage
if let Err(e) = self.send_thunder_request(&ws_tx, &thunder_request).await {
error!(
"write_to_legacy_storage: Failed to send thunder request: {:?}",
e
);
error!("write_to_legacy_storage: Failed to send request: {:?}", e);
// Unregister the custom callback and return
broker.unregister_custom_callback(request_id).await;
return Ok(());
Expand Down Expand Up @@ -457,7 +477,13 @@ impl UserDataMigrator {
format!("{}_request", method),
);
}
Ok(serde_json::to_value(&data).unwrap())
serde_json::to_value(&data).map_err(|e| {
error!(
"Failed to serialize data in transform_requets_params: {}",
e
);
RippleError::BrokerError(e.to_string())
})
}

async fn write_to_thunder_plugin(
Expand Down Expand Up @@ -519,7 +545,7 @@ impl UserDataMigrator {
return Err(e);
}

// get the response from the custom callback
// get the response from the custom callback, unregister the callback and return the response
Self::wait_for_response(self.response_rx.clone(), broker.clone(), request_id).await
}

Expand Down Expand Up @@ -553,10 +579,12 @@ impl UserDataMigrator {
}
Ok(None) => {
error!("No response received at custom write_to_legacy_storage");
broker.unregister_custom_callback(request_id).await;
return Err(UserDataMigratorError::TimeoutError);
}
Err(_) => {
error!("Error receiving response at custom write_to_legacy_storage");
broker.unregister_custom_callback(request_id).await;
return Err(UserDataMigratorError::TimeoutError);
}
};
Expand Down Expand Up @@ -668,78 +696,151 @@ impl UserDataMigrator {
"perform_getter_migration: Read from legacy storage: {:?}",
legacy_value
);
let value_from_thunder_plugin = self

let output_from_thunder_plugin = self
.read_from_thunder_plugin(broker, ws_tx.clone(), request)
.await?;

// apply the response transform to the data from the plugin
let mut data = value_from_thunder_plugin.clone().data;
let mut response = output_from_thunder_plugin.clone().data;
let data_for_callback = response.clone();

if let Some(filter) = request
.rule
.transform
.get_transform_data(RuleTransformType::Response)
{
endpoint_broker::apply_response(filter, &request.rule.alias, &mut data);
endpoint_broker::apply_response(filter, &request.rule.alias, &mut response);
}

if let Some(result) = data.clone().result {
if result != config_entry.default {
info!(
"perform_getter_migration: Plugin has non-default value. Updating legacy storage with new value: {:?}",
result
);
self.update_legacy_storage(
&config_entry.namespace,
&config_entry.key,
if let Some(result) = response.result {
// legacy storage has some value. It needs to be migrated to the plugin in case plugin has default value
return self
.check_migration_cases(
result,
legacy_value,
broker,
ws_tx.clone(),
&result,
ws_tx,
config_entry,
data_for_callback,
)
.await?;
self.set_migration_status(&config_entry.namespace, &config_entry.key)
.await;
// this BrokerOutput has the data from the plugin
Ok((true, Some(BrokerOutput { data })))
} else if legacy_value != config_entry.default {
info!(
"perform_getter_migration: Plugin has default value and Legacy storage has the latest value. Updating plugin with value from legacy storage: {:?}",
legacy_value
);
let mut response = self
.update_plugin_from_legacy(broker, ws_tx.clone(), config_entry, &legacy_value)
.await?;
self.set_migration_status(&config_entry.namespace, &config_entry.key)
.await;
.await;
}

// this BrokerOutput has the data from the legacy storage.
// prepare the response to send back to the caller as response from plugin but with the data from the legacy storage
response.data.result = Some(legacy_value.clone());
// check if there is any value conversion rule available
if let Some(conversion_rule) = &config_entry.legacy_to_plugin_value_conversion {
// apply the conversion rule to the data from the legacy storage
let data = crate::broker::rules_engine::jq_compile(
json!({ "value": legacy_value }),
&conversion_rule.conversion_rule,
"legacy_to_plugin_value_conversion".to_string(),
);
if let Ok(data) = data {
response.data.result = Some(data);
}
}
return Ok((true, Some(response)));
} else {
info!(
"perform_getter_migration: Both plugin and legacy storage have default value. No migration needed."
);
self.set_migration_status(&config_entry.namespace, &config_entry.key)
.await;
return Ok((true, Some(BrokerOutput { data })));
Err(UserDataMigratorError::ResponseError(
"No data collected from Legacy Storage".to_string(),
))
}

async fn check_migration_cases(
&self,
result: Value,
legacy_value: Value,
broker: &ThunderBroker,
ws_tx: Arc<Mutex<SplitSink<WebSocketStream<TcpStream>, Message>>>,
config_entry: &MigrationConfigEntry,
data_for_callback: JsonRpcApiResponse,
) -> Result<(bool, Option<BrokerOutput>), UserDataMigratorError> {
info!(
"perform_getter_migration: Checking migration cases. Legacy value: {:?}, Plugin value: {:?}, Config default: {:?}",
legacy_value, result, config_entry.default
);
if result != config_entry.default {
// Case 1: Plugin has non-default value. Updating legacy storage with new value
return self
.handle_non_default_plugin_value(
result,
legacy_value,
broker,
ws_tx,
config_entry,
data_for_callback,
)
.await;
}

if legacy_value != config_entry.default {
// Case 2: Plugin has default value and Legacy storage has the latest value
return self
.handle_default_plugin_value(legacy_value, broker, ws_tx, config_entry)
.await;
}

// Case 3: Both plugin and legacy storage have default value
info!(
"perform_getter_migration: Both plugin and legacy storage have default value. No migration needed."
);
self.set_migration_status(&config_entry.namespace, &config_entry.key)
.await;
Ok((
true,
Some(BrokerOutput {
data: data_for_callback,
}),
))
}

async fn handle_non_default_plugin_value(
&self,
result: Value,
legacy_value: Value,
broker: &ThunderBroker,
ws_tx: Arc<Mutex<SplitSink<WebSocketStream<TcpStream>, Message>>>,
config_entry: &MigrationConfigEntry,
data_for_callback: JsonRpcApiResponse,
) -> Result<(bool, Option<BrokerOutput>), UserDataMigratorError> {
info!(
"perform_getter_migration: Plugin has non-default value. Updating legacy storage with new value: {:?}",
result
);
if result != legacy_value {
self.update_legacy_storage(
&config_entry.namespace,
&config_entry.key,
broker,
ws_tx.clone(),
&result,
)
.await?;
}
self.set_migration_status(&config_entry.namespace, &config_entry.key)
.await;
Ok((
true,
Some(BrokerOutput {
data: data_for_callback,
}),
))
}

async fn handle_default_plugin_value(
&self,
legacy_value: Value,
broker: &ThunderBroker,
ws_tx: Arc<Mutex<SplitSink<WebSocketStream<TcpStream>, Message>>>,
config_entry: &MigrationConfigEntry,
) -> Result<(bool, Option<BrokerOutput>), UserDataMigratorError> {
info!(
"perform_getter_migration: Plugin has default value and Legacy storage has the latest value. Updating plugin with value from legacy storage: {:?}",
legacy_value
);
let mut response = self
.update_plugin_from_legacy(broker, ws_tx.clone(), config_entry, &legacy_value)
.await?;
self.set_migration_status(&config_entry.namespace, &config_entry.key)
.await;

response.data.result = Some(legacy_value.clone());
if let Some(conversion_rule) = &config_entry.legacy_to_plugin_value_conversion {
let data = crate::broker::rules_engine::jq_compile(
json!({ "value": legacy_value }),
&conversion_rule.conversion_rule,
"legacy_to_plugin_value_conversion".to_string(),
);
if let Ok(data) = data {
response.data.result = Some(data);
}
} else {
Err(UserDataMigratorError::ResponseError(
"No result in response".to_string(),
))
}
Ok((true, Some(response)))
}

async fn update_legacy_storage(
Expand Down

0 comments on commit 5da2381

Please sign in to comment.