Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat(air): introduce explicit types for generation numbers [fixes VM-261] #530

Merged
merged 17 commits into from
Apr 10, 2023
Merged
Show file tree
Hide file tree
Changes from 6 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
42 changes: 21 additions & 21 deletions air/src/execution_step/boxed_value/stream.rs
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ use crate::ExecutionError;
use crate::JValue;
use crate::UncatchableError;

use air_interpreter_data::GenerationIdx;
use air_trace_handler::merger::ValueSource;
use air_trace_handler::TraceHandler;

Expand All @@ -34,20 +35,19 @@ pub struct Stream {
values: Vec<Vec<ValueAggregate>>,

/// Count of values from previous data.
previous_gens_count: usize,
previous_gens_count: GenerationIdx,
}

impl Stream {
pub(crate) fn from_generations_count(previous_count: usize, current_count: usize) -> Self {
let last_generation_count = 1;
pub(crate) fn from_generations_count(previous_count: GenerationIdx, current_count: GenerationIdx) -> Self {
let last_generation_count: GenerationIdx = 1.into();
// TODO: bubble up an overflow error instead of expect
let overall_count = previous_count
.checked_add(current_count)
.and_then(|value| value.checked_add(last_generation_count))
.expect("it shouldn't overflow");

Self {
values: vec![vec![]; overall_count],
values: vec![vec![]; overall_count.try_into().unwrap()],
previous_gens_count: previous_count,
}
}
Expand All @@ -68,11 +68,11 @@ impl Stream {
value: ValueAggregate,
generation: Generation,
source: ValueSource,
) -> ExecutionResult<u32> {
) -> ExecutionResult<GenerationIdx> {
let generation_number = match (generation, source) {
(Generation::Last, _) => self.values.len() - 1,
(Generation::Nth(previous_gen), ValueSource::PreviousData) => previous_gen as usize,
(Generation::Nth(current_gen), ValueSource::CurrentData) => self.previous_gens_count + current_gen as usize,
(Generation::Last, _) => self.values.len().into() - 1,
(Generation::Nth(previous_gen), ValueSource::PreviousData) => previous_gen,
(Generation::Nth(current_gen), ValueSource::CurrentData) => self.previous_gens_count + current_gen,
};

if generation_number >= self.values.len() {
Expand All @@ -87,16 +87,16 @@ impl Stream {
Ok(generation_number as u32)
}

pub(crate) fn generations_count(&self) -> usize {
pub(crate) fn generations_count(&self) -> GenerationIdx {
// the last generation could be empty due to the logic of from_generations_count ctor
if self.values.last().unwrap().is_empty() {
self.values.len() - 1
self.values.len().try_into().unwrap() - 1
} else {
self.values.len()
self.values.len().try_into().unwrap()
}
}

pub(crate) fn last_non_empty_generation(&self) -> usize {
pub(crate) fn last_non_empty_generation(&self) -> GenerationIdx {
self.values
.iter()
.rposition(|generation| !generation.is_empty())
Expand Down Expand Up @@ -133,10 +133,10 @@ impl Stream {
should_remove_generation
}

pub(crate) fn elements_count(&self, generation: Generation) -> Option<usize> {
pub(crate) fn elements_count(&self, generation: Generation) -> Option<GenerationIdx> {
match generation {
Generation::Nth(generation) if generation as usize > self.generations_count() => None,
Generation::Nth(generation) => Some(self.values.iter().take(generation as usize).map(|v| v.len()).sum()),
Generation::Nth(generation) if generation.into() > self.generations_count() => None,
Generation::Nth(generation) => Some(self.values.iter().take(generation.into()).map(|v| v.len()).sum()),
Generation::Last => Some(self.values.iter().map(|v| v.len()).sum()),
}
}
Expand All @@ -160,7 +160,7 @@ impl Stream {

pub(crate) fn iter(&self, generation: Generation) -> Option<StreamIter<'_>> {
let iter: Box<dyn Iterator<Item = &ValueAggregate>> = match generation {
Generation::Nth(generation) if generation as usize >= self.generations_count() => return None,
Generation::Nth(generation) if generation >= self.generations_count() => return None,
Generation::Nth(generation) => {
Box::new(self.values.iter().take(generation as usize + 1).flat_map(|v| v.iter()))
}
Expand All @@ -179,7 +179,7 @@ impl Stream {
return None;
}

let generations_count = self.generations_count() as u32 - 1;
let generations_count = self.generations_count().into() - 1;
let (start, end) = match (start, end) {
(Generation::Nth(start), Generation::Nth(end)) => (start, end),
(Generation::Nth(start), Generation::Last) => (start, generations_count),
Expand All @@ -200,13 +200,13 @@ impl Stream {
}

/// Removes empty generations updating data and returns final generation count.
pub(crate) fn compactify(mut self, trace_ctx: &mut TraceHandler) -> ExecutionResult<usize> {
pub(crate) fn compactify(mut self, trace_ctx: &mut TraceHandler) -> ExecutionResult<GenerationIdx> {
self.remove_empty_generations();

for (generation, values) in self.values.iter().enumerate() {
for value in values.iter() {
trace_ctx
.update_generation(value.trace_pos, generation as u32)
.update_generation(value.trace_pos, generation)
.map_err(|e| ExecutionError::Uncatchable(UncatchableError::GenerationCompatificationError(e)))?;
}
}
Expand All @@ -223,7 +223,7 @@ impl Stream {
#[derive(Clone, Copy, Debug, Eq, PartialEq)]
pub enum Generation {
Last,
Nth(u32),
Nth(GenerationIdx),
}

pub(crate) struct StreamIter<'result> {
Expand Down
30 changes: 16 additions & 14 deletions air/src/execution_step/execution_context/streams_variables.rs
Original file line number Diff line number Diff line change
Expand Up @@ -21,11 +21,13 @@ mod utils;
use crate::execution_step::ExecutionResult;
use crate::execution_step::Stream;
use crate::ExecutionError;

use stream_descriptor::*;
pub(crate) use stream_value_descriptor::StreamValueDescriptor;

use air_interpreter_data::GlobalStreamGens;
use air_interpreter_data::RestrictedStreamGens;
use air_interpreter_data::GenerationIdx;
use air_parser::ast::Span;
use air_parser::AirPos;
use air_trace_handler::TraceHandler;
Expand Down Expand Up @@ -82,7 +84,7 @@ impl Streams {
.and_then(|descriptors| find_closest_mut(descriptors.iter_mut(), position))
}

pub(crate) fn add_stream_value(&mut self, value_descriptor: StreamValueDescriptor<'_>) -> ExecutionResult<u32> {
pub(crate) fn add_stream_value(&mut self, value_descriptor: StreamValueDescriptor<'_>) -> ExecutionResult<GenerationIdx> {
let StreamValueDescriptor {
value,
name,
Expand All @@ -105,17 +107,17 @@ impl Streams {
let descriptor = StreamDescriptor::global(stream);
self.streams.insert(name.to_string(), vec![descriptor]);
let generation = 0;
Ok(generation)
Ok(generation.into())
}
}
}

pub(crate) fn meet_scope_start(&mut self, name: impl Into<String>, span: Span, iteration: u32) {
pub(crate) fn meet_scope_start(&mut self, name: impl Into<String>, span: Span, iteration: GenerationIdx) {
let name = name.into();
let (prev_gens_count, current_gens_count) =
self.stream_generation_from_data(&name, span.left, iteration as usize);
self.stream_generation_from_data(&name, span.left, iteration);

let new_stream = Stream::from_generations_count(prev_gens_count as usize, current_gens_count as usize);
let new_stream = Stream::from_generations_count(prev_gens_count.into(), current_gens_count.into());
let new_descriptor = StreamDescriptor::restricted(new_stream, span);
match self.streams.entry(name) {
Occupied(mut entry) => {
Expand Down Expand Up @@ -143,7 +145,7 @@ impl Streams {
}
let gens_count = last_descriptor.stream.compactify(trace_ctx)?;

self.collect_stream_generation(name, position, gens_count as u32);
self.collect_stream_generation(name, position, gens_count /*as u32*/);
mikevoronov marked this conversation as resolved.
Show resolved Hide resolved
Ok(())
}

Expand All @@ -164,14 +166,14 @@ impl Streams {
// of the execution
let stream = descriptors.pop().unwrap().stream;
let gens_count = stream.compactify(trace_ctx)?;
Ok((name, gens_count as u32))
Ok((name, gens_count.into()))
})
.collect::<Result<GlobalStreamGens, _>>()?;

Ok((global_streams, self.new_restricted_stream_gens))
}

fn stream_generation_from_data(&self, name: &str, position: AirPos, iteration: usize) -> (u32, u32) {
fn stream_generation_from_data(&self, name: &str, position: AirPos, iteration: GenerationIdx) -> (GenerationIdx, GenerationIdx) {
let previous_generation =
Self::restricted_stream_generation(&self.previous_restricted_stream_gens, name, position, iteration)
.unwrap_or_default();
Expand All @@ -186,25 +188,25 @@ impl Streams {
restricted_stream_gens: &RestrictedStreamGens,
name: &str,
position: AirPos,
iteration: usize,
) -> Option<u32> {
iteration: GenerationIdx,
) -> Option<GenerationIdx> {
restricted_stream_gens
.get(name)
.and_then(|scopes| scopes.get(&position).and_then(|iterations| iterations.get(iteration)))
.copied()
}

fn collect_stream_generation(&mut self, name: String, position: AirPos, generation: u32) {
fn collect_stream_generation(&mut self, name: String, position: AirPos, generation: GenerationIdx) {
match self.new_restricted_stream_gens.entry(name) {
Occupied(mut streams) => match streams.get_mut().entry(position) {
Occupied(mut iterations) => iterations.get_mut().push(generation),
Occupied(mut iterations) => iterations.get_mut().push(generation.into()),
Vacant(entry) => {
entry.insert(vec![generation]);
entry.insert(vec![generation.into()]);
}
},
Vacant(entry) => {
let iterations = maplit::hashmap! {
position => vec![generation],
position => vec![generation.into()],
};
entry.insert(iterations);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,7 @@ pub(super) fn merge_global_streams(
.iter()
.map(|(stream_name, &prev_gens_count)| {
let current_gens_count = current_global_streams.get(stream_name).cloned().unwrap_or_default();
let global_stream = Stream::from_generations_count(prev_gens_count as usize, current_gens_count as usize);
let global_stream = Stream::from_generations_count(prev_gens_count.into(), current_gens_count.into());
let descriptor = StreamDescriptor::global(global_stream);
(stream_name.to_string(), vec![descriptor])
})
Expand All @@ -40,7 +40,7 @@ pub(super) fn merge_global_streams(
continue;
}

let global_stream = Stream::from_generations_count(0, current_gens_count as usize);
let global_stream = Stream::from_generations_count(0, current_gens_count.into());
let descriptor = StreamDescriptor::global(global_stream);
global_streams.insert(stream_name, vec![descriptor]);
}
Expand Down
5 changes: 3 additions & 2 deletions air/src/execution_step/instructions/ap.rs
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@ use utils::*;
use air_parser::ast;
use air_parser::ast::Ap;
use air_trace_handler::merger::MergerApResult;
use air_interpreter_data::GenerationIdx;

use std::rc::Rc;

Expand Down Expand Up @@ -75,7 +76,7 @@ fn populate_context<'ctx>(
merger_ap_result: &MergerApResult,
result: ValueAggregate,
exec_ctx: &mut ExecutionCtx<'ctx>,
) -> ExecutionResult<Option<u32>> {
) -> ExecutionResult<Option<GenerationIdx>> {
match ap_result {
ast::ApResult::Scalar(scalar) => exec_ctx.scalars.set_scalar_value(scalar.name, result).map(|_| None),
ast::ApResult::Stream(stream) => {
Expand All @@ -85,7 +86,7 @@ fn populate_context<'ctx>(
}
}

fn maybe_update_trace(maybe_generation: Option<u32>, trace_ctx: &mut TraceHandler) {
fn maybe_update_trace(maybe_generation: Option<GenerationIdx>, trace_ctx: &mut TraceHandler) {
use air_interpreter_data::ApResult;

if let Some(generation) = maybe_generation {
Expand Down
17 changes: 7 additions & 10 deletions air/src/execution_step/instructions/call/call_result_setter.rs
Original file line number Diff line number Diff line change
Expand Up @@ -58,16 +58,13 @@ pub(crate) fn populate_context_from_peer_service_result<'i>(
Generation::Last,
stream.position,
);
let generation = exec_ctx.streams.add_stream_value(value_descriptor)?;
Ok(CallResult::executed_stream(service_result_agg_cid, generation))
}
CallOutputValue::None => {
let value_cid = value_to_json_cid(&*executed_result.result)
.map_err(UncatchableError::from)?
.into();

Ok(CallResult::executed_unused(value_cid))
}
let generation = exec_ctx.streams.add_stream_value(value_descriptor)?.into();
Ok(CallResult::executed_stream(cid, generation))
}
// by the internal conventions if call has no output value,
// corresponding data should have scalar type
CallOutputValue::None => Ok(CallResult::executed_scalar(cid)),
}
}

Expand Down Expand Up @@ -100,7 +97,7 @@ pub(crate) fn populate_context_from_data<'i>(

let result = ValueRef::Stream {
cid,
generation: resulted_generation,
generation: resulted_generation.into(),
};
Ok(result)
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -14,26 +14,28 @@
* limitations under the License.
*/

use air_interpreter_data::GenerationIdx;

use super::construct_stream_iterable_values;
use crate::execution_step::boxed_value::Generation;
use crate::execution_step::boxed_value::Stream;
use crate::execution_step::instructions::fold::IterableValue;

pub(super) struct StreamCursor {
last_seen_generation: u32,
last_seen_generation: GenerationIdx,
}

impl StreamCursor {
pub(super) fn new() -> Self {
Self {
last_seen_generation: 0,
last_seen_generation: 0.into(),
}
}

pub(super) fn construct_iterables(&mut self, stream: &Stream) -> Vec<IterableValue> {
let iterables =
construct_stream_iterable_values(stream, Generation::Nth(self.last_seen_generation), Generation::Last);
self.last_seen_generation = stream.last_non_empty_generation() as u32;
self.last_seen_generation = stream.last_non_empty_generation() /*as u32*/;
mikevoronov marked this conversation as resolved.
Show resolved Hide resolved

iterables
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -138,7 +138,7 @@ impl InstructionTracker {
}

impl NewTracker {
pub fn get_iteration(&self, position: AirPos) -> u32 {
pub fn get_iteration(&self, position: AirPos) -> u32{
self.executed_count
.get(&position)
.copied()
Expand Down
9 changes: 5 additions & 4 deletions crates/air-lib/interpreter-data/src/executed_state.rs
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ mod se_de;

use crate::JValue;
use crate::TracePos;
use crate::GenerationIdx;

use air_interpreter_cid::CID;
use polyplets::SecurityTetraplet;
Expand Down Expand Up @@ -85,8 +86,8 @@ pub enum ValueRef {
Scalar(Rc<CID<ServiceResultAggregate>>),
/// The call value is stored to a stream variable.
Stream {
cid: Rc<CID<ServiceResultAggregate>>,
generation: u32,
cid: Rc<CID<JValue>>,
mikevoronov marked this conversation as resolved.
Show resolved Hide resolved
generation: GenerationIdx,
},
/// The call value is not stored.
Unused(Rc<CID<JValue>>),
Expand Down Expand Up @@ -128,7 +129,7 @@ pub struct ServiceResultAggregate {
/// (call 3)
/// (call 4)
/// )
///
///x
mikevoronov marked this conversation as resolved.
Show resolved Hide resolved
/// Having started with stream with two elements {v1, v2} the resulted trace would looks like
/// [(1) (2)] [(1) (2)] [(3) (4)] [(3) (4)] <--- the sequence of call states
/// v1 v2 v2 v1 <---- corresponding values from $stream that
Expand Down Expand Up @@ -179,7 +180,7 @@ pub struct FoldResult {
#[serde(rename_all = "snake_case")]
pub struct ApResult {
#[serde(rename = "gens")]
pub res_generations: Vec<u32>,
pub res_generations: Vec<GenerationIdx>,
}

/// Contains ids of element that were on a stream at the moment of an appropriate canon call.
Expand Down
Loading