Skip to content

Commit

Permalink
Add appId to push/pull data for x-provided-by (#618)
Browse files Browse the repository at this point in the history
  • Loading branch information
pahearn73 authored Sep 27, 2024
1 parent 3270c70 commit d19ad9f
Show file tree
Hide file tree
Showing 6 changed files with 357 additions and 378 deletions.
141 changes: 111 additions & 30 deletions core/main/src/firebolt/handlers/provider_registrar.rs
Original file line number Diff line number Diff line change
Expand Up @@ -44,10 +44,10 @@ use ripple_sdk::{
},
gateway::rpc_gateway_api::{CallContext, CallerSession},
},
log::{error, info},
log::{error, info, warn},
tokio::{sync::oneshot, time::timeout},
};
use serde_json::Value;
use serde_json::{Map, Value};

// TODO: Add to config
const DEFAULT_PROVIDER_RESPONSE_TIMEOUT_MS: u64 = 15000;
Expand Down Expand Up @@ -290,22 +290,67 @@ impl ProviderRegistrar {
);
if let Some(event) = &context.provider_relation_set.provides_to {
let mut params_sequence = params.sequence();
let _call_context: Option<CallContext> = params_sequence.next().ok();
let call_context: Option<CallContext> = params_sequence.next().ok();

let result: Value = match params_sequence.next() {
let mut event_data: Value = match params_sequence.next() {
Ok(r) => r,
Err(e) => {
error!("callback_app_event_emitter: Error: {:?}", e);
return Err(Error::Custom("Missing result".to_string()));
return Err(Error::Custom("Missing event_data".to_string()));
}
};

AppEvents::emit(
&context.platform_state,
&FireboltOpenRpcMethod::name_with_lowercase_module(event),
&result,
)
.await;
if let Some(event_data_map) = event_data.as_object_mut() {
if let Some(event_schema_map) = context
.platform_state
.open_rpc_state
.get_openrpc_validator()
.get_result_properties_schema_by_name(event)
{
// Populate the event result, injecting the app ID if the field exists in the event schema

let mut result_map = Map::new();

for key in event_schema_map.keys() {
if let Some(event_value) = event_data_map.get(key) {
result_map.insert(key.clone(), event_value.clone());
} else if key.eq("appId") {
if let Some(context) = call_context.clone() {
result_map.insert(key.clone(), Value::String(context.app_id));
} else {
error!("callback_app_event_emitter: Missing call context, could not determine app ID");
result_map.insert(key.clone(), Value::Null);
}
} else {
error!(
"callback_app_event_emitter: Missing field in event data: field={}",
key
);
result_map.insert(key.clone(), Value::Null);
}
}

AppEvents::emit(
&context.platform_state,
&FireboltOpenRpcMethod::name_with_lowercase_module(event),
&Value::Object(result_map),
)
.await;
} else {
error!("callback_app_event_emitter: Result schema not found");
return Err(Error::Custom(String::from("Result schema not found")));
}
} else {
warn!(
"callback_app_event_emitter: event data is not an object: event_data={:?}",
event_data
);
return Err(Error::Custom(String::from("Event data is not an object")));
}
} else {
return Err(Error::Custom(String::from(
"Unexpected schema configuration",
)));
}

Ok(None)
Expand Down Expand Up @@ -391,36 +436,72 @@ impl ProviderRegistrar {
app_id: None,
};

ProviderBroker::invoke_method(&context.platform_state, provider_broker_request)
.await;
let provider_app_id = ProviderBroker::invoke_method(
&context.platform_state,
provider_broker_request,
)
.await;

match timeout(
if let Ok(result) = timeout(
Duration::from_millis(DEFAULT_PROVIDER_RESPONSE_TIMEOUT_MS),
provider_response_payload_rx,
)
.await
{
Ok(result) => match result {
Ok(provider_response_payload) => {
return if let Some(e) = provider_response_payload.is_error() {
Err(Error::Call(CallError::Custom {
if let Ok(provider_response_payload) = result {
match provider_response_payload {
ProviderResponsePayload::GenericResponse(
provider_response_value,
) => {
if let Some(result_properties_map) = context
.platform_state
.open_rpc_state
.get_openrpc_validator()
.get_result_properties_schema_by_name(&context.method)
{
// Inject the provider app ID if the field exists in the provided-to response schema, the other field will be
// the provider response. The firebolt spec is not ideal in that the provider response data is captured
// within a field of the provided-to's response object, hence the somewhat arbritrary logic here. Ideally
// the provided-to response object would be identical to the provider response object aside from an optional
// appId field.

let mut response_map = Map::new();
for key in result_properties_map.keys() {
if key.eq("appId") {
response_map.insert(
key.clone(),
Value::String(
provider_app_id.clone().unwrap_or_default(),
),
);
} else {
response_map.insert(
key.clone(),
provider_response_value.clone(),
);
}
}
return Ok(Value::Object(response_map));
}
}
ProviderResponsePayload::GenericError(e) => {
return Err(Error::Call(CallError::Custom {
code: e.code,
message: e.message,
data: None,
}))
} else {
Ok(provider_response_payload.as_value())
};
}
Err(_) => {
return Err(Error::Custom(String::from(
"Error returning from provider",
)));
}));
}
_ => {
return Ok(provider_response_payload.as_value());
}
}
},
Err(_) => {
return Err(Error::Custom(String::from("Provider response timeout")));
} else {
return Err(Error::Custom(String::from(
"Error returning from provider",
)));
}
} else {
return Err(Error::Custom(String::from("Provider response timeout")));
}
}
}
Expand Down
22 changes: 17 additions & 5 deletions core/main/src/service/apps/provider_broker.rs
Original file line number Diff line number Diff line change
Expand Up @@ -220,28 +220,36 @@ impl ProviderBroker {
ProviderResult::new(result)
}

pub async fn invoke_method(pst: &PlatformState, request: ProviderBrokerRequest) {
pub async fn invoke_method(
pst: &PlatformState,
request: ProviderBrokerRequest,
) -> Option<String> {
let mut provider_app_id = None;

let cap_method = format!(
"{}:{}",
request.capability,
FireboltOpenRpcMethod::name_with_lowercase_module(&request.method)
);

debug!("invoking provider for {}", cap_method);

let provider_opt = {
let provider_methods = pst.provider_broker_state.provider_methods.read().unwrap();
provider_methods.get(&cap_method).cloned()
};
if let Some(provider) = provider_opt {
let event_name = provider.event_name.clone();

if let Some(provider_method) = provider_opt {
let event_name = provider_method.event_name.clone();
let req_params = request.request.clone();
let app_id_opt = request.app_id.clone();
let c_id = ProviderBroker::start_provider_session(pst, request, provider);
let c_id =
ProviderBroker::start_provider_session(pst, request, provider_method.clone());
if let Some(app_id) = app_id_opt {
debug!("Sending request to specific app {}", app_id);
AppEvents::emit_to_app(
pst,
app_id,
app_id.clone(),
&event_name,
&serde_json::to_value(ProviderRequest {
correlation_id: c_id,
Expand All @@ -250,6 +258,7 @@ impl ProviderBroker {
.unwrap(),
)
.await;
provider_app_id = Some(app_id.clone());
} else {
debug!("Broadcasting request to all the apps!!");
AppEvents::emit(
Expand All @@ -262,11 +271,14 @@ impl ProviderBroker {
.unwrap(),
)
.await;
provider_app_id = Some(provider_method.provider.app_id);
}
} else {
debug!("queuing provider request");
ProviderBroker::queue_provider_request(pst, request);
}

provider_app_id
}

fn start_provider_session(
Expand Down
21 changes: 0 additions & 21 deletions core/sdk/src/api/firebolt/provider.rs
Original file line number Diff line number Diff line change
Expand Up @@ -136,14 +136,6 @@ impl ProviderResponsePayload {
ProviderResponsePayload::GenericResponse(res) => res.clone(),
}
}

pub fn is_error(&self) -> Option<GenericProviderError> {
if let ProviderResponsePayload::GenericError(e) = self {
Some(e.clone())
} else {
None
}
}
}

#[derive(Serialize, Deserialize)]
Expand Down Expand Up @@ -380,17 +372,4 @@ mod tests {
})
);
}

#[test]
fn test_error_payload() {
let error = ProviderResponsePayload::GenericError(GenericProviderError {
code: -32121,
message: "some error message".into(),
data: None,
});
let er = error.is_error().unwrap();
assert!(er.code.eq(&-32121));
assert!(er.message.eq("some error message"));
assert!(er.data.is_none());
}
}
4 changes: 2 additions & 2 deletions device/thunder_ripple_sdk/src/bootstrap/boot_thunder.rs
Original file line number Diff line number Diff line change
Expand Up @@ -36,10 +36,10 @@ pub async fn boot_thunder(
SetupThunderProcessor::setup(state.clone()).await;
return Some(state);
} else {
error!("Unable to connect to Thuner, error in ThunderPoolStep");
error!("Unable to connect to Thunder, error in ThunderPoolStep");
}
} else {
error!("Unable to connect to Thuner, error in ThunderGetConfigStep");
error!("Unable to connect to Thunder, error in ThunderGetConfigStep");
}
None
}
54 changes: 52 additions & 2 deletions openrpc_validator/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@ use std::{collections::HashMap, fs};

use jsonschema::JSONSchema;
use serde::{Deserialize, Serialize};
use serde_json::{json, Value};
use serde_json::{json, Map, Value};

pub extern crate jsonschema;

Expand Down Expand Up @@ -35,6 +35,56 @@ impl FireboltOpenRpc {
None
}

fn get_result_ref_schemas(
&self,
result_schema_map: &Map<String, Value>,
) -> Option<Map<String, Value>> {
if let Some(result_schema_value) = result_schema_map.get("$ref") {
if let Some(result_schema_string) = result_schema_value.as_str() {
let result_type_string = result_schema_string.split('/').last().unwrap();
for spec in self.apis.values() {
if let Value::Object(components) = &spec.components {
if let Some(Value::Object(schemas_map)) = components.get("schemas") {
if let Some(result_type_value) = schemas_map.get(result_type_string) {
if let Some(result_type_map) = result_type_value.as_object() {
if let Some(Value::Object(result_properties_map)) =
result_type_map.get("properties")
{
return Some(result_properties_map.clone());
}
}
}
}
}
}
}
}
None
}

pub fn get_result_properties_schema_by_name(&self, name: &str) -> Option<Map<String, Value>> {
if let Some(method) = self.get_method_by_name(name) {
if let Some(result_schema_map) = method.result.schema.as_object() {
if let Some(any_of_map) = result_schema_map.get("anyOf") {
if let Some(any_of_array) = any_of_map.as_array() {
for value in any_of_array.iter() {
if let Some(result_properties_map) =
self.get_result_ref_schemas(value.as_object().unwrap())
{
return Some(result_properties_map);
}
}
} else {
return None;
}
} else {
return self.get_result_ref_schemas(result_schema_map);
}
}
}
None
}

pub fn params_validator(
&self,
version: String,
Expand Down Expand Up @@ -205,7 +255,7 @@ pub struct RpcParam {
#[derive(Debug, Serialize, Deserialize, Clone)]
pub struct RpcResult {
name: String,
schema: Value,
pub schema: Value,
}

#[derive(Debug, Serialize, Deserialize, Clone)]
Expand Down
Loading

0 comments on commit d19ad9f

Please sign in to comment.