Skip to content

Commit

Permalink
wait until feature on MG
Browse files Browse the repository at this point in the history
new feature: it is possible to add the condition "WaitUntil" to a
`MatchMessageType` action result.
The condition idea is to wait until the desired message arrives. In the
condition config there is a timeout, that avoids the MG test getting
stuck. There is also a list of allowed messages. If the test awaits a
certain message, then all the other messages arrived must belong to this
user specified list of allowed messages. This config is passed to the
MG in the json file.
A `MatchMessageField` with the `WaitUntil` condition has the following
shape:
```json
 "results": [
     {
         "type": "match_message_type",
         "value": "0x1b",
         "condition": {"WaitUntil": {"WaitUntilConfig": {"timeout": 120, "allowed_messages": ["0x1b", "0x16"] } } }
     }
 ],
```
This action ignores a message with id `0x16`, fails if arrives a message
with id `0x17` (for example) and succeeds if arrive the message with desired id
`0x1b`.

Add README notes for WaitUntil feature
  • Loading branch information
lorbax committed Jul 29, 2024
1 parent 5583b3d commit 1cb5896
Show file tree
Hide file tree
Showing 5 changed files with 206 additions and 35 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -147,10 +147,11 @@
"results": [
{
"type": "match_message_type",
"value": "0x1b"
"value": "0x1b",
"condition": {"WaitUntil": {"WaitUntilConfig": {"timeout": 120, "allowed_messages": ["0x1b", "0x16"] } } }
}
],
"actiondoc": "Sends NewExtendedMiningJob and SetNewPrevHash and waits that a SubmitsShareExtended is submitted"
"actiondoc": "Sends NewExtendedMiningJob and SetNewPrevHash and waits until a SubmitsShareExtended is submitted"
}
],
"setup_commands": [
Expand Down
20 changes: 20 additions & 0 deletions utils/message-generator/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -255,6 +255,26 @@ message will be the abbreviated with `setup_connection_success_template_distribu
]
}
```
For the result `MatchMessageType` there is the optional feature `WaitUntil`. The sintax is like this
```json
"results": [
{
"type": "match_message_type",
"value": "0x1b",
"condition": {"WaitUntil": {"WaitUntilConfig": {"timeout": 120, "allowed_messages": ["0x1b", "0x16"] } } }
}
],
```
This action will have the following behaviour:
1. Checks all the messages that arrive.
The `allowed_messages` describe the message types that are allowed, i.e. `0x1b` and `0x16`.
If arrives one message that has a different type, the test will fail. This is needed for making
the MG a finite state machine.
2. THe test will listen all the allowed messages until the target message is received. In the
previous case, the target message has type `0x1b`.
3. A timeout that indicates when the test is considered to be failed. This is used in the case that
the MG received only messages of allowed type, but hasn't received the target message for a long
time, making the MG test stuck.

If the test version is "1", each object is composed by:
1. `messages_ids`: an array of strings, that are ids of sv1_messages previously defined.
Expand Down
120 changes: 97 additions & 23 deletions utils/message-generator/src/executor.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,13 +3,14 @@ use crate::{
into_static::into_static,
net::{setup_as_downstream, setup_as_upstream},
parser::sv2_messages::ReplaceField,
Action, ActionResult, Command, Role, SaveField, Sv2Type, Test,
Action, ActionResult, Command, Condition, Role, SaveField, Sv2Type, Test,
};
use async_channel::{Receiver, Sender};
use binary_sv2::Serialize;
use codec_sv2::{StandardEitherFrame as EitherFrame, Sv2Frame};
use roles_logic_sv2::parsers::{self, AnyMessage};
use std::{collections::HashMap, convert::TryInto, sync::Arc};
use tokio::{pin, time::sleep};

use tracing::{debug, error, info};

Expand Down Expand Up @@ -200,32 +201,105 @@ impl Executor {
);

match result {
ActionResult::MatchMessageType(message_type) => {
let message = match recv.recv().await {
Ok(message) => message,
Err(_) => {
ActionResult::MatchMessageType(message_type, condition) => match condition {
None => {
let message = match recv.recv().await {
Ok(message) => message,
Err(_) => {
success = false;
error!("Connection closed before receiving the message");
break;
}
};

let message: Sv2Frame<AnyMessage<'static>, _> =
message.try_into().unwrap();
debug!("RECV {:#?}", message);
let header = message.get_header().unwrap();

if header.msg_type() != *message_type {
error!(
"WRONG MESSAGE TYPE expected: {} received: {}",
message_type,
header.msg_type()
);
success = false;
error!("Connection closed before receiving the message");
break;
} else {
info!("MATCHED MESSAGE TYPE {}", message_type);
}
};

let message: Sv2Frame<AnyMessage<'static>, _> = message.try_into().unwrap();
debug!("RECV {:#?}", message);
let header = message.get_header().unwrap();

if header.msg_type() != *message_type {
error!(
"WRONG MESSAGE TYPE expected: {} received: {}",
message_type,
header.msg_type()
);
success = false;
break;
} else {
info!("MATCHED MESSAGE TYPE {}", message_type);
}
}
Some(condition_inner) => {
match condition_inner {
Condition::WaitUntil(wait_until_config) => {
let duration = std::time::Duration::from_secs(
wait_until_config.timeout as u64,
);
let timer = sleep(duration);
pin!(timer);
let async_block = async {
loop {
let message = recv.recv().await;
let message = match message {
Ok(message) => message,
Err(_) => {
success = false;
error!("Connection closed before receiving the message");
break;
}
};
let message: Sv2Frame<AnyMessage<'static>, _> =
message.try_into().unwrap();
debug!("RECV {:#?}", message);
let header = match message.get_header() {
Some(header_) => header_,
None => {
error!("Failed to get message header");
success = false;
break;
}
};
let allowed_messages =
&wait_until_config.allowed_messages;
let received_message_type = header.msg_type();
if allowed_messages.contains(&received_message_type) {
if received_message_type == *message_type {
info!(
"MATCHED WAITED MESSAGE TYPE {}",
received_message_type
);
break;
} else {
info!(
"RECEIVED {}, WAITING MESSAGE TYPE {}",
received_message_type, message_type
);
continue;
}
} else {
error!(
"RECEIVED MESSAGE OF NOT ALLOWED TYPE {:?}",
received_message_type
);
success = false;
break;
}
}
};
tokio::select! {
_ = &mut timer => {
error!("Timer has elapsed before wanted message has arrived");
success = false;
break;
},
_ = async_block => {
info!("MATCHED WAITED MESSAGE TYPE {}", message_type);
}
}
}
};
}
},
ActionResult::MatchMessageField((
subprotocol,
message_type,
Expand Down
47 changes: 39 additions & 8 deletions utils/message-generator/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ extern crate load_file;
use crate::parser::sv2_messages::ReplaceField;
use binary_sv2::{Deserialize, Serialize};
use codec_sv2::StandardEitherFrame as EitherFrame;
use core::fmt::Display;
use external_commands::*;
use key_utils::{Secp256k1PublicKey, Secp256k1SecretKey};
use rand::Rng;
Expand Down Expand Up @@ -178,10 +179,31 @@ pub struct SaveField {
field_name: String,
keyword: String,
}
#[derive(Debug, PartialEq, Eq, Clone, Serialize, Deserialize)]
pub struct WaitUntilConfig {
timeout: u32, // in seconds
allowed_messages: Vec<u8>,
}

// enum as a placeholder for new potential conditions to be implemented in the future
#[derive(Debug, PartialEq, Eq, Clone, Serialize, Deserialize)]
pub enum Condition {
WaitUntil(WaitUntilConfig),
}

impl Display for Condition {
fn fmt(&self, f: &mut std::fmt::Formatter) -> std::fmt::Result {
match self {
Condition::WaitUntil(wait_until_config) => {
write!(f, "WaitUntil condition with config {:?}", wait_until_config)
}
}
}
}

#[derive(Debug, PartialEq, Eq, Clone, Serialize, Deserialize)]
enum ActionResult {
MatchMessageType(u8),
MatchMessageType(u8, Option<Condition>),
MatchMessageField((String, String, Vec<(String, Sv2Type)>)),
GetMessageField {
subprotocol: String,
Expand Down Expand Up @@ -209,13 +231,22 @@ enum Sv1ActionResult {
impl std::fmt::Display for ActionResult {
fn fmt(&self, f: &mut std::fmt::Formatter) -> std::fmt::Result {
match self {
ActionResult::MatchMessageType(message_type) => {
write!(
f,
"MatchMessageType: {} ({:#x})",
message_type, message_type
)
}
ActionResult::MatchMessageType(message_type, condition) => match condition {
None => {
write!(
f,
"MatchMessageType: {} ({:#x})",
message_type, message_type
)
}
Some(condition_inner) => {
write!(
f,
"MatchMessageType: {} ({:#x}) with condition {}",
message_type, message_type, condition_inner
)
}
},
ActionResult::MatchMessageField(message_field) => {
write!(f, "MatchMessageField: {:?}", message_field)
}
Expand Down
49 changes: 47 additions & 2 deletions utils/message-generator/src/parser/actions.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,7 @@
use crate::{Action, ActionResult, Role, SaveField, Sv1Action, Sv1ActionResult, Sv2Type};
use crate::{
Action, ActionResult, Condition, Role, SaveField, Sv1Action, Sv1ActionResult, Sv2Type,
WaitUntilConfig,
};
use codec_sv2::{buffer_sv2::Slice, StandardEitherFrame, Sv2Frame};
use roles_logic_sv2::parsers::AnyMessage;
use serde_json::{Map, Value};
Expand Down Expand Up @@ -52,7 +55,49 @@ impl Sv2ActionParser {
match result.get("type").unwrap().as_str().unwrap() {
"match_message_type" => {
let message_type = u8::from_str_radix(&result.get("value").unwrap().as_str().unwrap()[2..], 16).expect("Result message_type should be an hex value starting with 0x and not bigger than 0xff");
action_results.push(ActionResult::MatchMessageType(message_type));
match result.get("condition") {
Some(condition_inner) => {
dbg!(&condition_inner);
let condition_inner = condition_inner
.get("WaitUntil")
.unwrap()
.get("WaitUntilConfig")
.unwrap();
let timeout = serde_json::from_value::<u32>(
condition_inner.get("timeout").unwrap().clone(),
)
.unwrap();
let allowed_messages = condition_inner
.get("allowed_messages")
.unwrap()
.as_array()
.unwrap();
let allowed_messages: Vec<u8> = allowed_messages.iter().map(|t| u8::from_str_radix(&t.as_str().unwrap()[2..], 16).expect("Result message_type should be an hex value starting with 0x and not bigger than 0xff")).collect();
let wait_until_config = WaitUntilConfig {
timeout,
allowed_messages,
};
action_results.push(ActionResult::MatchMessageType(
message_type,
Some(Condition::WaitUntil(wait_until_config)),
));

//dbg!(serde_json::from_value::<WaitUntilConfig>(condition_inner.clone()).unwrap());
//match condition_inner.get("WaitUntilConfig") {
// Some(wait_until_config) => {
// let wait_until_config = serde_json::from_value::<WaitUntilConfig>(wait_until_config.clone())
// .unwrap();
// action_results.push(ActionResult::MatchMessageType(
// message_type,
// Some(Condition::WaitUntil(wait_until_config)),
// ))
// },
// None => panic!("WaitUntilConfig not present"),
//};
}
None => action_results
.push(ActionResult::MatchMessageType(message_type, None)),
};
}
"get_message_field" => {
let sv2_type = result.get("value").unwrap().clone();
Expand Down

0 comments on commit 1cb5896

Please sign in to comment.