Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: cleanup devicename handlers -wip #566

Open
wants to merge 29 commits into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
29 commits
Select commit Hold shift + click to select a range
0c43e88
feat: Supporting Non Json Rpc HTTP/WS Brokers
satlead Jun 16, 2024
eadec6a
feat: add unsubscribe support
satlead Jun 27, 2024
782f297
merge from main
satlead Jun 27, 2024
56c3ff1
feat: Make subscription more cleaner
satlead Jun 28, 2024
b60eff4
merge from main
satlead Jul 1, 2024
488cb2f
feat: add more coverag
satlead Jul 1, 2024
1804639
feat: Add mock websocket support
satlead Jul 3, 2024
7ddd952
feat: Add cleanup tests
satlead Jul 7, 2024
84b487b
merge to main
satlead Jul 8, 2024
5872a1f
fix: mock unit test
satlead Jul 8, 2024
95f7d62
fix: cleaner mock
satlead Jul 8, 2024
e21ebad
fix: add more refactoring
satlead Jul 10, 2024
fe1b217
Merge branch 'main' of https://github.com/rdkcentral/Ripple into http…
satlead Jul 10, 2024
4e2019e
Merge branch 'main' of https://github.com/rdkcentral/Ripple into http…
satlead Jul 11, 2024
85f4de1
feat: cleanup devicename handlers -wip
bsenth200 Jul 11, 2024
c221c8e
fix: Changes to support subscription
satlead Jul 11, 2024
ce2c66c
Merge branch 'main' of https://github.com/rdkcentral/Ripple into http…
satlead Jul 11, 2024
2a3cc73
feat: cleanup devicename handlers -wip
bsenth200 Jul 11, 2024
2115df7
Merge branch 'cleanup-devicename-handlers' of https://github.com/rdkc…
bsenth200 Jul 12, 2024
f82fa51
feat: Updated code to fix rpc processor issue
bsenth200 Jul 12, 2024
f773672
feat: merge main
bsenth200 Jul 16, 2024
81420f9
feat: fixed settings_processor friendlyName event subscription
bsenth200 Jul 19, 2024
147a5a9
feat: fixed/reverted test change
bsenth200 Jul 19, 2024
3798b00
Merge branch 'main' into cleanup-devicename-handlers
bsenth200 Jul 19, 2024
07d8ce9
feat: code cleanup
bsenth200 Jul 19, 2024
c04a4fa
build: Merge branch 'main' into cleanup-devicename-handlers
Vinodsathyaseelan Jul 25, 2024
e9a106f
build: Merge branch 'main' into cleanup-devicename-handlers
Vinodsathyaseelan Jul 26, 2024
fe5751e
Merge branch 'main' into cleanup-devicename-handlers
Vinodsathyaseelan Aug 1, 2024
8964da7
build: Merge branch 'main' into cleanup-devicename-handlers
Vinodsathyaseelan Aug 2, 2024
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion core/main/src/bootstrap/start_communication_broker.rs
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,7 @@ impl Bootstep<BootstrapState> for StartCommunicationBroker {
state
.platform_state
.get_client()
.add_request_processor(RpcGatewayProcessor::new(state.platform_state.get_client()));
.add_request_processor(RpcGatewayProcessor::new(state.platform_state.clone()));

// Start the Broker Reciever
if let Ok(rx) = state.channels_state.get_broker_receiver() {
Expand Down
40 changes: 38 additions & 2 deletions core/main/src/broker/broker_utils.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,15 +15,20 @@
// SPDX-License-Identifier: Apache-2.0
//

use std::time::Duration;

use crate::state::platform_state::PlatformState;
use crate::utils::rpc_utils::extract_tcp_port;
use futures::stream::{SplitSink, SplitStream};
use futures_util::StreamExt;
use jsonrpsee::{core::RpcResult, types::error::CallError};
use ripple_sdk::api::firebolt::fb_capabilities::CAPABILITY_NOT_AVAILABLE;
use ripple_sdk::{
api::gateway::rpc_gateway_api::{ApiProtocol, CallContext, RpcRequest},
extn::extn_client_message::ExtnResponse,
log::{error, info},
tokio::{self, net::TcpStream},
};
use serde_json::from_value;
use std::time::Duration;
use tokio_tungstenite::{client_async, tungstenite::Message, WebSocketStream};

pub struct BrokerUtils;
Expand Down Expand Up @@ -66,4 +71,35 @@ impl BrokerUtils {
tokio::time::sleep(Duration::from_secs(1)).await;
}
}

pub async fn get_device_name(ctx: &CallContext, state: &PlatformState) -> RpcResult<String> {
let mut new_ctx = ctx.clone();
new_ctx.protocol = ApiProtocol::Extn;

let rpc_request = RpcRequest {
ctx: new_ctx.clone(),
method: "device.name".into(),
params_json: RpcRequest::prepend_ctx(None, &new_ctx),
};

let resp = state
.get_client()
.get_extn_client()
.main_internal_request(rpc_request.clone())
.await;

if let Ok(res) = resp {
if let Some(ExtnResponse::Value(val)) = res.payload.extract::<ExtnResponse>() {
if let Ok(v) = from_value::<String>(val) {
return Ok(v);
}
}
}

Err(jsonrpsee::core::Error::Call(CallError::Custom {
code: CAPABILITY_NOT_AVAILABLE,
message: "device.name is not available".into(),
data: None,
}))
}
}
3 changes: 2 additions & 1 deletion core/main/src/broker/endpoint_broker.rs
Original file line number Diff line number Diff line change
Expand Up @@ -232,7 +232,7 @@ pub struct EndpointBrokerState {
callback: BrokerCallback,
request_map: Arc<RwLock<HashMap<u64, BrokerRequest>>>,
extension_request_map: Arc<RwLock<HashMap<u64, ExtnMessage>>>,
rule_engine: RuleEngine,
pub rule_engine: RuleEngine,
cleaner_list: Arc<RwLock<Vec<BrokerCleaner>>>,
reconnect_tx: Sender<BrokerConnectRequest>,
}
Expand Down Expand Up @@ -605,6 +605,7 @@ impl BrokerOutputForwarder {

// Step 3: Handle Non Extension
if matches!(rpc_request.ctx.protocol, ApiProtocol::Extn) {
is_event = rpc_request.is_subscription();
if let Ok(extn_message) =
platform_state.endpoint_state.get_extn_message(id, is_event)
{
Expand Down
1 change: 0 additions & 1 deletion core/main/src/broker/websocket_broker.rs
Original file line number Diff line number Diff line change
Expand Up @@ -222,7 +222,6 @@ mod tests {
) -> WebsocketBroker {
// setup mock websocket server
let port = MockWebsocket::start(send_data, Vec::new(), tx, on_close).await;

let endpoint = RuleEndpoint {
url: format!("ws://127.0.0.1:{}", port),
protocol: crate::broker::rules_engine::RuleEndpointProtocol::Websocket,
Expand Down
71 changes: 4 additions & 67 deletions core/main/src/firebolt/handlers/device_rpc.rs
Original file line number Diff line number Diff line change
Expand Up @@ -18,11 +18,9 @@
use std::{collections::HashMap, env, time::Duration};

use crate::{
firebolt::rpc::RippleRPCProvider,
processor::storage::storage_manager::StorageManager,
service::apps::app_events::AppEvents,
state::platform_state::PlatformState,
utils::rpc_utils::{rpc_add_event_listener, rpc_err},
firebolt::rpc::RippleRPCProvider, processor::storage::storage_manager::StorageManager,
service::apps::app_events::AppEvents, state::platform_state::PlatformState,
utils::rpc_utils::rpc_err,
};

use jsonrpsee::{
Expand All @@ -41,7 +39,6 @@ use ripple_sdk::{
},
device_info_request::{DeviceInfoRequest, DeviceResponse, FirmwareInfo},
device_operator::DEFAULT_DEVICE_OPERATION_TIMEOUT_SECS,
device_peristence::SetStringProperty,
device_request::{
AudioProfile, DeviceVersionResponse, HdcpProfile, HdrProfile, NetworkResponse,
},
Expand All @@ -53,10 +50,7 @@ use ripple_sdk::{
},
gateway::rpc_gateway_api::CallContext,
session::{AccountSessionRequest, ProvisionRequest},
storage_property::{
StorageProperty, StoragePropertyData, EVENT_DEVICE_DEVICE_NAME_CHANGED,
EVENT_DEVICE_NAME_CHANGED, KEY_FIREBOLT_DEVICE_UID, SCOPE_DEVICE,
},
storage_property::{StoragePropertyData, KEY_FIREBOLT_DEVICE_UID, SCOPE_DEVICE},
},
extn::extn_client_message::ExtnResponse,
log::error,
Expand Down Expand Up @@ -88,30 +82,10 @@ pub const DEVICE_UID: &str = "device.uid";

#[rpc(server)]
pub trait Device {
#[method(name = "device.name")]
async fn name(&self, ctx: CallContext) -> RpcResult<String>;
#[method(name = "device.setName")]
async fn set_name(
&self,
ctx: CallContext,
_setname_request: SetStringProperty,
) -> RpcResult<()>;
#[method(name = "device.id")]
async fn id(&self, ctx: CallContext) -> RpcResult<String>;
#[method(name = "device.uid")]
async fn uid(&self, ctx: CallContext) -> RpcResult<String>;
#[method(name = "device.onNameChanged")]
async fn on_name_changed(
&self,
ctx: CallContext,
request: ListenRequest,
) -> RpcResult<ListenerResponse>;
#[method(name = "device.onDeviceNameChanged")]
async fn on_device_name_changed(
&self,
ctx: CallContext,
request: ListenRequest,
) -> RpcResult<ListenerResponse>;
#[method(name = "device.model")]
async fn model(&self, ctx: CallContext) -> RpcResult<String>;
#[method(name = "device.sku")]
Expand Down Expand Up @@ -275,15 +249,6 @@ pub async fn get_ll_mac_addr(state: PlatformState) -> RpcResult<String> {
))),
}
}

pub async fn set_device_name(state: &PlatformState, prop: SetStringProperty) -> RpcResult<()> {
StorageManager::set_string(state, StorageProperty::DeviceName, prop.value, None).await
}

pub async fn get_device_name(state: &PlatformState) -> RpcResult<String> {
StorageManager::get_string(state, StorageProperty::DeviceName).await
}

#[derive(Debug)]
pub struct DeviceImpl {
pub state: PlatformState,
Expand Down Expand Up @@ -313,34 +278,6 @@ impl DeviceImpl {

#[async_trait]
impl DeviceServer for DeviceImpl {
async fn name(&self, _ctx: CallContext) -> RpcResult<String> {
get_device_name(&self.state).await
}

async fn set_name(
&self,
_ctx: CallContext,
setname_request: SetStringProperty,
) -> RpcResult<()> {
set_device_name(&self.state, setname_request).await
}

async fn on_name_changed(
&self,
ctx: CallContext,
request: ListenRequest,
) -> RpcResult<ListenerResponse> {
rpc_add_event_listener(&self.state, ctx, request, EVENT_DEVICE_NAME_CHANGED).await
}

async fn on_device_name_changed(
&self,
ctx: CallContext,
request: ListenRequest,
) -> RpcResult<ListenerResponse> {
rpc_add_event_listener(&self.state, ctx, request, EVENT_DEVICE_DEVICE_NAME_CHANGED).await
}

async fn id(&self, _ctx: CallContext) -> RpcResult<String> {
get_device_id(&self.state).await
}
Expand Down
29 changes: 1 addition & 28 deletions core/main/src/firebolt/handlers/second_screen_rpc.rs
Original file line number Diff line number Diff line change
Expand Up @@ -34,17 +34,14 @@ use ripple_sdk::api::{
gateway::rpc_gateway_api::CallContext,
};

use super::device_rpc::{get_device_id, get_device_name};
use super::device_rpc::get_device_id;

pub const EVENT_SECOND_SCREEN_ON_CLOSE_REQUEST: &str = "secondscreen.onCloseRequest";
pub const EVENT_SECOND_SCREEN_ON_FRIENDLY_NAME_CHANGED: &str = "secondscreen.onFriendlyNameChanged";

#[rpc(server)]
pub trait SecondScreen {
#[method(name = "secondscreen.device")]
async fn device(&self, ctx: CallContext, param: SecondScreenDeviceInfo) -> RpcResult<String>;
#[method(name = "secondscreen.friendlyName")]
async fn friendly_name(&self, ctx: CallContext) -> RpcResult<String>;
#[method(name = "secondscreen.protocols")]
async fn protocols(&self, _ctx: CallContext) -> RpcResult<HashMap<String, bool>>;
#[method(name = "secondscreen.onLaunchRequest")]
Expand All @@ -59,12 +56,6 @@ pub trait SecondScreen {
ctx: CallContext,
request: ListenRequest,
) -> RpcResult<ListenerResponse>;
#[method(name = "secondscreen.onFriendlyNameChanged")]
async fn on_friendly_name_changed(
&self,
ctx: CallContext,
request: ListenRequest,
) -> RpcResult<ListenerResponse>;
}

pub struct SecondScreenImpl {
Expand All @@ -77,10 +68,6 @@ impl SecondScreenServer for SecondScreenImpl {
get_device_id(&self.state).await
}

async fn friendly_name(&self, _ctx: CallContext) -> RpcResult<String> {
get_device_name(&self.state).await
}

async fn protocols(&self, _ctx: CallContext) -> RpcResult<HashMap<String, bool>> {
let mut protocols = HashMap::<String, bool>::new();
protocols.insert("dial2".into(), true);
Expand Down Expand Up @@ -114,20 +101,6 @@ impl SecondScreenServer for SecondScreenImpl {
)
.await
}

async fn on_friendly_name_changed(
&self,
ctx: CallContext,
request: ListenRequest,
) -> RpcResult<ListenerResponse> {
rpc_add_event_listener(
&self.state,
ctx,
request,
EVENT_SECOND_SCREEN_ON_FRIENDLY_NAME_CHANGED,
)
.await
}
}

pub struct SecondScreenRPCProvider;
Expand Down
45 changes: 29 additions & 16 deletions core/main/src/processor/rpc_gateway_processor.rs
Original file line number Diff line number Diff line change
Expand Up @@ -24,34 +24,35 @@ use ripple_sdk::{
},
extn_client_message::ExtnMessage,
},
log::debug,
tokio::sync::mpsc::Sender,
};

use crate::{
firebolt::firebolt_gateway::FireboltGatewayCommand, service::extn::ripple_client::RippleClient,
firebolt::firebolt_gateway::FireboltGatewayCommand, state::platform_state::PlatformState,
};

/// Processor to service incoming RPC Requests used by extensions and other local rpc handlers for aliasing.
#[derive(Debug)]
pub struct RpcGatewayProcessor {
client: RippleClient,
state: PlatformState,
streamer: DefaultExtnStreamer,
}

impl RpcGatewayProcessor {
pub fn new(client: RippleClient) -> RpcGatewayProcessor {
pub fn new(state: PlatformState) -> RpcGatewayProcessor {
RpcGatewayProcessor {
client,
state,
streamer: DefaultExtnStreamer::new(),
}
}
}

impl ExtnStreamProcessor for RpcGatewayProcessor {
type STATE = RippleClient;
type STATE = PlatformState;
type VALUE = RpcRequest;
fn get_state(&self) -> Self::STATE {
self.client.clone()
self.state.clone()
}

fn sender(&self) -> Sender<ExtnMessage> {
Expand All @@ -66,23 +67,34 @@ impl ExtnStreamProcessor for RpcGatewayProcessor {
#[async_trait]
impl ExtnRequestProcessor for RpcGatewayProcessor {
fn get_client(&self) -> ripple_sdk::extn::client::extn_client::ExtnClient {
self.client.get_extn_client()
self.state.get_client().get_extn_client()
}

async fn process_request(state: Self::STATE, msg: ExtnMessage, request: Self::VALUE) -> bool {
debug!("Inside RPC gateway processor");
match request.ctx.protocol {
ApiProtocol::Extn => {
// Notice how this processor is different from others where it doesnt respond to
// Self::respond this processor delegates the request down
// to the gateway which does more complex inter connected operations. The design for
// Extn Processor is built in such a way to support transient processors which do not
// necessarily need to provide response
if let Err(e) =
state.send_gateway_command(FireboltGatewayCommand::HandleRpcForExtn {
msg: msg.clone(),
})
{
return Self::handle_error(state.get_extn_client(), msg, e).await;
if state.endpoint_state.rule_engine.has_rule(&request) {
if !state
.endpoint_state
.handle_brokerage(request, Some(msg.clone()))
{
return Self::handle_error(
state.get_client().get_extn_client(),
msg,
ripple_sdk::utils::error::RippleError::InvalidAccess,
)
.await;
}
} else if let Err(e) = state.get_client().send_gateway_command(
FireboltGatewayCommand::HandleRpcForExtn { msg: msg.clone() },
) {
return Self::handle_error(state.get_client().get_extn_client(), msg, e).await;
}
}
_ =>
Expand All @@ -92,10 +104,11 @@ impl ExtnRequestProcessor for RpcGatewayProcessor {
// Extn Processor is built in such a way to support transient processors which do not
// necessarily need to provide response
{
if let Err(e) =
state.send_gateway_command(FireboltGatewayCommand::HandleRpc { request })
if let Err(e) = state
.get_client()
.send_gateway_command(FireboltGatewayCommand::HandleRpc { request })
{
return Self::handle_error(state.get_extn_client(), msg, e).await;
return Self::handle_error(state.get_client().get_extn_client(), msg, e).await;
}
}
}
Expand Down
Loading
Loading