diff --git a/README.md b/README.md index cde656b..114b85e 100644 --- a/README.md +++ b/README.md @@ -283,6 +283,40 @@ Then you can set the parameter by using `-P`: latte run -P row_count=200 ``` +### Multi-row partitions with different row counts + +If there is a need to simulate a real-life case where partitions contain multiple rows +and differ in size, latte can cover it easily. + +The first step is to call the following function in the `prepare` function of a rune script: +``` + pub async fn prepare(db) { + ... + db.init_partition_row_distribution_preset( + "foo", ROW_COUNT, ROWS_PER_PARTITION, "70:1,20:2.5,10:3.5").await?; + ... + } +``` + +This call creates a preset named `foo` +and instructs it to calculate the number of partitions and their sizes in rows as follows: +- `70%` of partitions will have `ROWS_PER_PARTITION` rows +- `20%` will have `2.5*ROWS_PER_PARTITION` rows +- `10%` will have `3.5*ROWS_PER_PARTITION` rows. + +Then, in the target functions, we can reuse the preset as follows: +``` + pub async fn insert(db, i) { + let idx = i % ROW_COUNT + OFFSET; + let partition_idx = db.get_partition_idx("foo", idx).await? + OFFSET; + ... + } +``` + +As a result, we get multi-row partitions in the requested size proportions. + +The number of presets is unlimited. Any rune script may use multiple different presets for different tables. 
+ ### Error handling Errors during execution of a workload script are divided into three classes: diff --git a/src/context.rs b/src/context.rs index 9052309..c988f64 100644 --- a/src/context.rs +++ b/src/context.rs @@ -11,7 +11,7 @@ use std::sync::Arc; use anyhow::anyhow; use chrono::Utc; use hdrhistogram::Histogram; -use itertools::Itertools; +use itertools::{enumerate, Itertools}; use metrohash::{MetroHash128, MetroHash64}; use openssl::error::ErrorStack; use openssl::ssl::{SslContext, SslContextBuilder, SslFiletype, SslMethod}; @@ -82,7 +82,7 @@ pub async fn connect(conf: &ConnectionConf) -> Result { .await .map_err(|e| CassError(CassErrorKind::FailedToConnect(conf.addresses.clone(), e)))?; Ok(Context::new( - scylla_session, + Some(scylla_session), dc.to_string(), conf.retry_number, conf.retry_interval, @@ -228,6 +228,7 @@ pub enum CassErrorKind { SslConfiguration(ErrorStack), FailedToConnect(Vec, NewSessionError), PreparedStatementNotFound(String), + PartitionRowPresetNotFound(String), QueryRetriesExceeded(String), QueryParamConversion(TypeInfo, ColumnType), ValueOutOfRange(String, ColumnType), @@ -237,6 +238,7 @@ pub enum CassErrorKind { Prepare(String, QueryError), Overloaded(QueryInfo, QueryError), QueryExecution(QueryInfo, QueryError), + Error(String), } impl CassError { @@ -252,6 +254,9 @@ impl CassError { CassErrorKind::PreparedStatementNotFound(s) => { write!(buf, "Prepared statement not found: {s}") } + CassErrorKind::PartitionRowPresetNotFound(s) => { + write!(buf, "Partition-row preset not found: {s}") + } CassErrorKind::QueryRetriesExceeded(s) => { write!(buf, "QueryRetriesExceeded: {s}") } @@ -282,6 +287,9 @@ impl CassError { CassErrorKind::QueryExecution(q, e) => { write!(buf, "Failed to execute query {q}: {e}") } + CassErrorKind::Error(s) => { + write!(buf, "Error: {s}") + } } } } @@ -434,11 +442,14 @@ pub async fn handle_retry_error( /// It also tracks query execution metrics such as number of requests, rows, response times etc. 
#[derive(Any)] pub struct Context { - session: Arc, + // NOTE: 'session' is defined as optional for being able to test methods + // which don't 'depend on'/'use' the 'session' object. + session: Option>, statements: HashMap>, stats: TryLock, retry_number: u64, retry_interval: RetryInterval, + partition_row_presets: HashMap)>, #[rune(get, set, add_assign, copy)] pub load_cycle_count: u64, #[rune(get)] @@ -458,17 +469,18 @@ unsafe impl Sync for Context {} impl Context { pub fn new( - session: scylla::Session, + session: Option, preferred_datacenter: String, retry_number: u64, retry_interval: RetryInterval, ) -> Context { Context { - session: Arc::new(session), + session: session.map(Arc::new), statements: HashMap::new(), stats: TryLock::new(SessionStats::new()), retry_number, retry_interval, + partition_row_presets: HashMap::new(), load_cycle_count: 0, preferred_datacenter: preferred_datacenter, data: Value::Object(Shared::new(Object::new())), @@ -488,6 +500,7 @@ impl Context { stats: TryLock::new(SessionStats::default()), retry_number: self.retry_number, retry_interval: self.retry_interval, + partition_row_presets: self.partition_row_presets.clone(), load_cycle_count: self.load_cycle_count, preferred_datacenter: self.preferred_datacenter.clone(), data: deserialized, @@ -497,73 +510,297 @@ impl Context { /// Returns cluster metadata such as cluster name and cassandra version. 
pub async fn cluster_info(&self) -> Result, CassError> { let cql = "SELECT cluster_name, release_version FROM system.local"; - let rs = self - .session - .query(cql, ()) - .await - .map_err(|e| CassError::query_execution_error(cql, &[], e)); - match rs { - Ok(rs) => { - if let Some(rows) = rs.rows { - if let Some(row) = rows.into_iter().next() { - if let Ok((name, cassandra_version)) = row.into_typed() { - return Ok(Some(ClusterInfo { - name, - cassandra_version, - })); + + match &self.session { + Some(session) => { + let rs = session + .query(cql, ()) + .await + .map_err(|e| CassError::query_execution_error(cql, &[], e)); + match rs { + Ok(rs) => { + if let Some(rows) = rs.rows { + if let Some(row) = rows.into_iter().next() { + if let Ok((name, cassandra_version)) = row.into_typed() { + return Ok(Some(ClusterInfo { + name, + cassandra_version, + })); + } + } } + Ok(None) + } + Err(e) => { + eprintln!("WARNING: {e}", e=e); + Ok(None) } } - Ok(None) + }, + None => Err(CassError(CassErrorKind::Error("'session' is not defined".to_string()))) + } + } + + /// Creates a preset for uneven row distribution among partitions + pub async fn init_partition_row_distribution_preset( + &mut self, + preset_name: &str, + row_count: u64, + rows_per_partitions_base: u64, + mut rows_per_partitions_groups: &str, // "percent:base_multiplier, ..." 
-> "80:1,15:2,5:4" + ) -> Result<(), CassError> { + // Validate input data + if preset_name.is_empty() { + return Err(CassError(CassErrorKind::Error( + "init_partition_row_distribution_preset: 'preset_name' cannot be empty".to_string() + ))) + } + if row_count < 1 { + return Err(CassError(CassErrorKind::Error( + "init_partition_row_distribution_preset: 'row_count' cannot be less than '1'.".to_string() + ))) + } + if rows_per_partitions_base < 1 { + return Err(CassError(CassErrorKind::Error( + "init_partition_row_distribution_preset: 'rows_per_partitions_base' cannot be less than '1'.".to_string() + ))) + } + + // Parse the 'rows_per_partitions_groups' string parameter into a HashMap + let mut partn_multipliers: HashMap = HashMap::new(); + if rows_per_partitions_groups.is_empty() { + rows_per_partitions_groups = "95:1,4:2,1:4"; + } + let mut summary_percentage: f64 = 0.0; + let mut duplicates_dump: Vec = Vec::new(); + for pair in rows_per_partitions_groups.split(',') { + let processed_pair = &pair.replace(" ", ""); + if duplicates_dump.contains(processed_pair) { + return Err(CassError(CassErrorKind::Error(format!( + "init_partition_row_distribution_preset: found duplicates pairs - '{processed_pair}'") + ))) } - Err(e) => { - eprintln!("WARNING: {e}", e=e); - Ok(None) + let parts: Vec<&str> = processed_pair.split(':').collect(); + if let (Some(key), Some(value)) = (parts.get(0), parts.get(1)) { + if let (Ok(k), Ok(v)) = (key.parse::(), value.parse::()) { + let current_pair_key = format!("{k}:{v}"); + partn_multipliers.insert(current_pair_key.clone(), (k, v)); + summary_percentage += k; + duplicates_dump.push(current_pair_key); + } else { + return Err(CassError(CassErrorKind::Error(format!( + "init_partition_row_distribution_preset: \ + Wrong sub-value provided in the 'rows_per_partitions_groups' parameter: '{processed_pair}'. \ + It must be set of integer pairs separated with a ':' symbol. 
Example: '49.1:1,49:2,1.9:2.5'") + ))) + } + } + } + if (summary_percentage - 100.0).abs() > 0.01 { + return Err(CassError(CassErrorKind::Error(format!( + "init_partition_row_distribution_preset: \ + summary of partition percentage must be '100'. Got '{summary_percentage}' instead") + ))) + } + + // Calculate values + let mut partn_sizes: HashMap = HashMap::new(); + let mut partn_counts: HashMap = HashMap::new(); + let mut partn_cycle_size: f64 = 0.0; + for (key, (partn_percent, partn_multiplier)) in &partn_multipliers { + partn_sizes.insert( + key.to_string(), + (*partn_percent, ((rows_per_partitions_base as f64) * partn_multiplier) as u64) + ); + let partition_type_size: f64 = rows_per_partitions_base as f64 * partn_multiplier * partn_percent / 100.0; + partn_cycle_size += partition_type_size; + } + let mut partn_count: u64 = (row_count as f64 / partn_cycle_size) as u64; + for (key, (partn_percent, _partn_multiplier)) in &partn_multipliers { + let current_partn_count: u64 = ((partn_count as f64) * partn_percent / 100.0) as u64; + partn_counts.insert(key.to_string(), (*partn_percent, current_partn_count)); + } + partn_count = partn_counts.values().map(|&(_, last)| last).sum(); + + // Combine calculated data into a vector of tuples + let mut actual_row_count: u64 = 0; + let mut partitions: Vec<(f64, u64, u64, f64)> = Vec::new(); + for (key, (_partn_percent, partn_cnt)) in &partn_counts { + if let Some((_partn_percent, partn_size)) = partn_sizes.get(key) { + if let Some((partn_percent, partn_multiplier)) = partn_multipliers.get(key) { + partitions.push((*partn_percent, *partn_cnt, *partn_size, *partn_multiplier)); + actual_row_count += partn_cnt * partn_size; + } + } + } + partitions.sort_by(|a, b| b.1.cmp(&a.1).then(b.2.cmp(&a.2))); + + // Adjust partitions based on the difference between requested and total row count + let mut row_count_diff: u64 = 0; + if row_count > actual_row_count { + row_count_diff = row_count - actual_row_count; + let 
smallest_partn_count_diff = row_count_diff / partitions[0].2; + if smallest_partn_count_diff > 0 { + partn_count += smallest_partn_count_diff; + partitions[0].1 += smallest_partn_count_diff; + let additional_rows: u64 = smallest_partn_count_diff * partitions[0].2; + actual_row_count += additional_rows; + row_count_diff -= additional_rows; + } + } else if row_count < actual_row_count { + row_count_diff = actual_row_count - row_count; + let mut smallest_partn_count_diff = row_count_diff / partitions[0].2; + if row_count_diff % partitions[0].2 > 0 { + smallest_partn_count_diff += 1; } + if smallest_partn_count_diff > 0 { + partn_count -= smallest_partn_count_diff; + partitions[0].1 -= smallest_partn_count_diff; + actual_row_count -= smallest_partn_count_diff * partitions[0].2; + let additional_rows: u64 = smallest_partn_count_diff * partitions[0].2; + actual_row_count -= additional_rows; + row_count_diff = additional_rows - row_count_diff; + } + } + if row_count_diff > 0 { + partn_count += 1; + let mut same_size_exists = false; + for (i, partition) in enumerate(partitions.clone()) { + if partition.2 == row_count_diff { + partitions[i].1 += 1; + same_size_exists = true; + break + } + } + if !same_size_exists { + partitions.push(((100000.0 / (partn_count as f64)).round() / 1000.0, 1, row_count_diff, 1.0)); + } + actual_row_count += row_count_diff; } + partitions.sort_by(|a, b| b.1.cmp(&a.1).then(b.2.cmp(&a.2))); + + // Print calculated values + let partitions_str = partitions + .iter() + .map(|(_percent, partns, rows, _multiplier)| { + let percent = *partns as f64 / partn_count as f64 * 100.0; + let percent_str = format!("{:.10}", percent); + let parts = percent_str.split('.').collect::>(); + if parts.len() == 2 { + let int_part = parts[0]; + let mut frac_part: String = "".to_string(); + if parts[1].matches("0").count() != parts[1].len() { + frac_part = parts[1].chars() + .take_while(|&ch| ch == '0') + .chain(parts[1].chars().filter(|&ch| ch != '0').take(2)) + 
.collect::(); + } + if frac_part.len() > 0 { + frac_part = format!(".{}", frac_part); + } + format!("{}(~{}{}%):{}", partns, int_part, frac_part, rows) + } else { + format!("{}(~{}%):{}", partns, parts[0], rows) + } + }) + .collect::>() + .join(", "); + println!( + "info: init_partition_row_distribution_preset: \ + preset_name={preset_name}, total_partitions={partn_count}, total_rows={total_rows}\ + , partitions/rows -> {partitions}", + preset_name=preset_name, + partn_count=partn_count, + total_rows=actual_row_count, + partitions=partitions_str, + ); + + // Save data for further usage + partitions.retain(|&(_, current_partn_count, _, _)| current_partn_count != 0); + self.partition_row_presets.insert(preset_name.to_string(), (actual_row_count, partitions.clone())); + + Ok(()) + } + + /// Returns a partition index based on the stress operation index and a preset of values + pub async fn get_partition_idx(&self, preset_name: &str, idx: u64) -> Result { + let preset = self.partition_row_presets + .get(preset_name) + .ok_or_else(|| CassError(CassErrorKind::PartitionRowPresetNotFound(preset_name.to_string())))?; + let row_count = preset.0; + let partitions = &preset.1; + let current_idx = idx - ((idx / row_count) * row_count); + + let mut idx_offset: u64 = 0; + let mut partition_offset: u64 = 0; + for current_partition in partitions { + let current_partition_count = current_partition.1; + let current_partition_size = current_partition.2; + if current_idx < current_partition_count * current_partition_size + idx_offset { + return Ok(partition_offset + ((current_idx - idx_offset) / current_partition_size)) + } + idx_offset += current_partition_count * current_partition_size; + partition_offset += current_partition_count; + } + + Ok(partition_offset) } /// Returns list of datacenters used by nodes pub async fn get_datacenters(&self) -> Result, CassError> { - let dc_info = self.session.get_cluster_data().get_datacenters_info(); - let mut datacenters: Vec = 
dc_info.keys().cloned().collect(); - datacenters.sort(); - Ok(datacenters) + match &self.session { + Some(session) => { + let dc_info = session.get_cluster_data().get_datacenters_info(); + let mut datacenters: Vec = dc_info.keys().cloned().collect(); + datacenters.sort(); + Ok(datacenters) + }, + None => Err(CassError(CassErrorKind::Error("'session' is not defined".to_string()))), + } } /// Prepares a statement and stores it in an internal statement map for future use. pub async fn prepare(&mut self, key: &str, cql: &str) -> Result<(), CassError> { - let statement = self - .session - .prepare(cql) - .await - .map_err(|e| CassError::prepare_error(cql, e))?; - self.statements.insert(key.to_string(), Arc::new(statement)); - Ok(()) + match &self.session { + Some(session) => { + let statement = session + .prepare(cql) + .await + .map_err(|e| CassError::prepare_error(cql, e))?; + self.statements.insert(key.to_string(), Arc::new(statement)); + Ok(()) + }, + None => Err(CassError(CassErrorKind::Error("'session' is not defined".to_string()))), + } } /// Executes an ad-hoc CQL statement with no parameters. Does not prepare. 
pub async fn execute(&self, cql: &str) -> Result<(), CassError> { - for current_attempt_num in 0..self.retry_number + 1 { - let start_time = self.stats.try_lock().unwrap().start_request(); - let rs = self.session.query(cql, ()).await; - let duration = Instant::now() - start_time; - match rs { - Ok(_) => {} - Err(e) => { - let current_error = CassError::query_execution_error(cql, &[], e.clone()); - handle_retry_error(self, current_attempt_num, current_error).await; - continue; + match &self.session { + Some(session) => { + for current_attempt_num in 0..self.retry_number + 1 { + let start_time = self.stats.try_lock().unwrap().start_request(); + let rs = session.query(cql, ()).await; + let duration = Instant::now() - start_time; + match rs { + Ok(_) => {} + Err(e) => { + let current_error = CassError::query_execution_error(cql, &[], e.clone()); + handle_retry_error(self, current_attempt_num, current_error).await; + continue; + } + } + self.stats + .try_lock() + .unwrap() + .complete_request(duration, &rs); + rs.map_err(|e| CassError::query_execution_error(cql, &[], e.clone()))?; + return Ok(()); } - } - self.stats - .try_lock() - .unwrap() - .complete_request(duration, &rs); - rs.map_err(|e| CassError::query_execution_error(cql, &[], e.clone()))?; - return Ok(()); + Err(CassError::query_retries_exceeded(self.retry_number)) + }, + None => Err(CassError(CassErrorKind::Error("'session' is not defined".to_string()))), } - Err(CassError::query_retries_exceeded(self.retry_number)) } /// Executes a statement prepared and registered earlier by a call to `prepare`. 
@@ -574,32 +811,37 @@ impl Context { .ok_or_else(|| CassError(CassErrorKind::PreparedStatementNotFound(key.to_string())))?; let params = bind::to_scylla_query_params(¶ms, statement.get_variable_col_specs())?; - for current_attempt_num in 0..self.retry_number + 1 { - let start_time = self.stats.try_lock().unwrap().start_request(); - let rs = self.session.execute(statement, params.clone()).await; - let duration = Instant::now() - start_time; - match rs { - Ok(_) => {} - Err(e) => { - let current_error = CassError::query_execution_error( - statement.get_statement(), - ¶ms, - e.clone(), - ); - handle_retry_error(self, current_attempt_num, current_error).await; - continue; + match &self.session { + Some(session) => { + for current_attempt_num in 0..self.retry_number + 1 { + let start_time = self.stats.try_lock().unwrap().start_request(); + let rs = session.execute(statement, params.clone()).await; + let duration = Instant::now() - start_time; + match rs { + Ok(_) => {} + Err(e) => { + let current_error = CassError::query_execution_error( + statement.get_statement(), + ¶ms, + e.clone(), + ); + handle_retry_error(self, current_attempt_num, current_error).await; + continue; + } + } + self.stats + .try_lock() + .unwrap() + .complete_request(duration, &rs); + rs.map_err(|e| { + CassError::query_execution_error(statement.get_statement(), ¶ms, e) + })?; + return Ok(()); } - } - self.stats - .try_lock() - .unwrap() - .complete_request(duration, &rs); - rs.map_err(|e| { - CassError::query_execution_error(statement.get_statement(), ¶ms, e) - })?; - return Ok(()); + Err(CassError::query_retries_exceeded(self.retry_number)) + }, + None => Err(CassError(CassErrorKind::Error("'session' is not defined".to_string()))), } - Err(CassError::query_retries_exceeded(self.retry_number)) } /// Returns the current accumulated request stats snapshot and resets the stats. 
@@ -1126,3 +1368,238 @@ pub fn read_resource_lines(path: &str) -> io::Result> { .map(|s| s.to_string()) .collect_vec()) } + +#[cfg(test)] +mod tests { + use super::*; + + // NOTE: if tests which use session object get added + // then need to define the 'SCYLLA_URI="172.17.0.2:9042"' env var + // and create a DB session like following: + // let session = tokio::runtime::Runtime::new() + // .unwrap() + // .block_on(async { + // let uri = std::env::var("SCYLLA_URI").unwrap_or_else(|_| "127.0.0.1:9042".to_string()); + // SessionBuilder::new().known_node(uri).build().await.unwrap() + // }); + // let mut ctxt: Context = Context::new(Some(session), ...); + + fn init_and_use_partition_row_distribution_preset( + row_count: u64, + rows_per_partitions_base: u64, + rows_per_partitions_groups: String, // "95:1,4:2,1:4" + expected_result: (u64, Vec<(f64, u64, u64, f64)>), // partn_percent, partn_cnt, partn_size, partn_multiplier + expected_idx_partition_idx_mapping: Vec<(u64, u64)>, + ) { + let mut ctxt: Context = Context::new( + None, "foo-dc".to_string(), 0, RetryInterval::new("1,2").expect("REASON"), + ); + let preset_name = "foo_name"; + + assert!(ctxt.partition_row_presets.is_empty(), "The 'partition_row_presets' HashMap should not be empty"); + + tokio::runtime::Runtime::new().unwrap().block_on(async { + let _ = ctxt.init_partition_row_distribution_preset( + preset_name, row_count, rows_per_partitions_base, &rows_per_partitions_groups).await; + }); + + assert!(!ctxt.partition_row_presets.is_empty(), "The 'partition_row_presets' HashMap should not be empty"); + let actual_value = ctxt.partition_row_presets.get(preset_name) + .unwrap_or_else(|| panic!("Preset with name '{}' was not found", preset_name)); + assert_eq!(&expected_result, actual_value); + + for (idx, expected_partition_idx) in expected_idx_partition_idx_mapping { + let partition_idx = tokio::runtime::Runtime::new().unwrap().block_on(async { + ctxt.get_partition_idx(preset_name, idx).await + }).expect("Failed to 
get partition index"); + assert_eq!( + expected_partition_idx, partition_idx, "{}", + format_args!( + "Using '{}' idx expected partition_idx is '{}', but got '{}'", + idx, expected_partition_idx, partition_idx + ) + ); + } + } + + #[test] + fn test_partition_row_distribution_preset_01_pos_single_group_evenly_divisible() { + // total_partitions=40, total_rows=1000, partitions/rows -> 40(~100%):25 + init_and_use_partition_row_distribution_preset( + 1000, 25, "100:1".to_string(), + (1000, vec![(100.0, 40, 25, 1.0)]), + vec![ + // The only partition group as 1st cycle + (0, 0), (24, 0), (25, 1), (49, 1), (50, 2), (999, 39), + // Next cycle + (1000, 0), (1001, 0), (1024, 0), (1025, 1), + ], + ); + } + + #[test] + fn test_partition_row_distribution_preset_02_pos_single_group_unevenly_divisible() { + // total_partitions=77, total_rows=1000, partitions/rows -> 76(~98.71%):13, 1(~1.29%):12 + init_and_use_partition_row_distribution_preset( + 1000, 13, "100:1".to_string(), + (1000, vec![(100.0, 76, 13, 1.0), (1.299, 1, 12, 1.0)]), + vec![ + // 1st partition group + (0, 0), (12, 0), (13, 1), (25, 1), (26, 2), (987, 75), + // 2nd partition group + (988, 76), (999, 76), + // Next cycle + (1000, 0), (1001, 0), + ], + ); + } + + #[test] + fn test_partition_row_distribution_preset_03_pos_multiple_groups_with_implicit_one() { + // total_partitions=90, total_rows=1000, + // partitions/rows -> 46(~51.11%):6, 26(~28.88%):12, 17(~18.88%):24, 1(~1.11%):4 + init_and_use_partition_row_distribution_preset( + 1000, 6, "50:1,30:2,20:4".to_string(), + (1000, vec![(50.0, 46, 6, 1.0), (30.0, 26, 12, 2.0), (20.0, 17, 24, 4.0), (1.111, 1, 4, 1.0)]), + vec![ + // 1st cycle, 1st partition group + (0, 0), (5, 0), (6, 1), (11, 1), (12, 2), (275, 45), + // 1st cycle, 2nd partition group + (276, 46), (287, 46), (288, 47), (587, 71), + // 1st cycle, 3rd partition group + (588, 72), (611, 72), (612, 73), (995, 88), + // 1st cycle, 4th partition group + (996, 89), (997, 89), (998, 89), (999, 89), + // 2nd 
cycle, 1st partition group + (1000, 0), (1001, 0), (1005, 0), (1006, 1), (1999, 89), + // 3rd cycle, 2nd partition group + (2000, 0), (2001, 0), (2005, 0), (2006, 1), + ], + ); + } + + #[test] + fn test_partition_row_distribution_preset_04_pos_multiple_groups_with_implicit_one_different_input_same_result() { + // total_partitions=90, total_rows=1000, + // partitions/rows -> 46(~51.11%):6, 26(~28.88%):12, 17(~18.88%):24, 1(~1.11%):4 + init_and_use_partition_row_distribution_preset( + // NOTE: this set of data differs from the test above with different value for + // the 'rows_per_partitions_base' and multipliers, but resulting values must be the same + 1000, 12, "50:0.5,30:1,20:2".to_string(), + (1000, vec![(50.0, 46, 6, 0.5), (30.0, 26, 12, 1.0), (20.0, 17, 24, 2.0), (1.111, 1, 4, 1.0)]), + vec![ + // 1st cycle, 1st partition group + (0, 0), (5, 0), (6, 1), (11, 1), (12, 2), (275, 45), + // 1st cycle, 2nd partition group + (276, 46), (287, 46), (288, 47), (587, 71), + // 1st cycle, 3rd partition group + (588, 72), (611, 72), (612, 73), (995, 88), + // 1st cycle, 4th partition group + (996, 89), (997, 89), (998, 89), (999, 89), + // 2nd cycle, 1st partition group + (1000, 0), (1001, 0), (1005, 0), (1006, 1), (1999, 89), + // 3rd cycle, 2nd partition group + (2000, 0), (2001, 0), (2005, 0), (2006, 1), + ], + ); + } + + #[test] + fn test_partition_row_distribution_preset_05_pos_multiple_groups_without_implicit_one() { + // total_partitions=664, total_rows=10000, + // partitions/rows -> 332(~50%):20, 330(~49.69%):10, 1(~0.15%):50, 1(~0.15%):10 + init_and_use_partition_row_distribution_preset( + 10000, 10, "49.9:1,49.9:2, 0.2:5".to_string(), + (10000, vec![(49.9, 332, 20, 2.0), (49.9, 331, 10, 1.0), (0.2, 1, 50, 5.0)]), + vec![ + (0, 0), (19, 0), (20, 1), (39, 1), (40, 2), (6639, 331), + (6640, 332), (9949, 662), + (9950, 663), (9999, 663), + (10000, 0), (10019, 0), (10020, 1), + ], + ); + } + + #[test] + fn 
test_partition_row_distribution_preset_06_pos_multiple_presets() { + let name_foo: String = "foo".to_string(); + let name_bar: String = "bar".to_string(); + let mut ctxt: Context = Context::new( + None, "foo-dc".to_string(), 0, RetryInterval::new("1,2").expect("REASON"), + ); + + assert!(ctxt.partition_row_presets.is_empty(), "The 'partition_row_presets' HashMap should be empty"); + let foo_value = ctxt.partition_row_presets.get(&name_foo); + assert_eq!(None, foo_value); + + tokio::runtime::Runtime::new().unwrap().block_on(async { + ctxt.init_partition_row_distribution_preset( + &name_foo, 1000, 10, "100:1").await + }).unwrap_or_else(|_| panic!("The '{}' preset must have been created successfully", name_foo)); + assert!(!ctxt.partition_row_presets.is_empty(), "The 'partition_row_presets' HashMap should not be empty"); + ctxt.partition_row_presets.get(&name_foo) + .unwrap_or_else(|| panic!("Preset with name '{}' was not found", name_foo)); + + let absent_bar = ctxt.partition_row_presets.get(&name_bar); + assert_eq!(None, absent_bar, "{}", format_args!("The '{}' preset was expected to be absent", name_bar)); + + tokio::runtime::Runtime::new().unwrap().block_on(async { + ctxt.init_partition_row_distribution_preset( + &name_bar, 1000, 10, "90:1,10:2").await + }).unwrap_or_else(|_| panic!("The '{}' preset must have been created successfully", name_bar)); + ctxt.partition_row_presets.get(&name_bar) + .unwrap_or_else(|| panic!("Preset with name '{}' was not found", name_bar)); + } + + fn false_input_for_partition_row_distribution_preset( + preset_name: String, + row_count: u64, + rows_per_partitions_base: u64, + rows_per_partitions_groups: String, + ) { + let mut ctxt: Context = Context::new( + None, "foo-dc".to_string(), 0, RetryInterval::new("1,2").expect("REASON"), + ); + let result = tokio::runtime::Runtime::new().unwrap().block_on(async { + ctxt.init_partition_row_distribution_preset( + &preset_name, row_count, rows_per_partitions_base, 
&rows_per_partitions_groups).await + }); + + assert!(matches!(result, Err(ref _e)), "Error result was expected, but got: {:?}", result); + } + + #[test] + fn test_partition_row_distribution_preset_07_neg_empty_preset_name() { + false_input_for_partition_row_distribution_preset("".to_string(), 1000, 10, "100:1".to_string()) + } + + #[test] + fn test_partition_row_distribution_preset_08_neg_zero_rows() { + false_input_for_partition_row_distribution_preset("foo".to_string(), 0, 10, "100:1".to_string()) + } + + #[test] + fn test_partition_row_distribution_preset_09_neg_zero_base() { + false_input_for_partition_row_distribution_preset("foo".to_string(), 1000, 0, "100:1".to_string()) + } + + #[test] + fn test_partition_row_distribution_preset_10_neg_percentage_is_less_than_100() { + false_input_for_partition_row_distribution_preset("foo".to_string(), 1000, 10, "90:1,9.989:2".to_string()) + } + + #[test] + fn test_partition_row_distribution_preset_11_neg_percentage_is_more_than_100() { + false_input_for_partition_row_distribution_preset("foo".to_string(), 1000, 10, "90:1,10.011:2".to_string()) + } + + #[test] + fn test_partition_row_distribution_preset_12_neg_duplicated_percentages() { + false_input_for_partition_row_distribution_preset("foo".to_string(), 1000, 10, "50:1 , 50:1".to_string()) + } + + #[test] + fn test_partition_row_distribution_preset_13_neg_wrong_percentages() { + false_input_for_partition_row_distribution_preset("foo".to_string(), 1000, 10, "90:1,ten:1".to_string()) + } +} diff --git a/src/workload.rs b/src/workload.rs index 7e7531b..4d3b7af 100644 --- a/src/workload.rs +++ b/src/workload.rs @@ -101,6 +101,12 @@ impl Program { pub fn new(source: Source, params: HashMap) -> Result { let mut context_module = Module::default(); context_module.ty::().unwrap(); + context_module + .async_inst_fn("init_partition_row_distribution_preset", Context::init_partition_row_distribution_preset) + .unwrap(); + context_module + .async_inst_fn("get_partition_idx", 
Context::get_partition_idx) + .unwrap(); context_module .async_inst_fn("get_datacenters", Context::get_datacenters) .unwrap();