Skip to content

Commit

Permalink
Merge branch 'main' into flow_editor_step_selector
Browse files Browse the repository at this point in the history
  • Loading branch information
rubenfiszel committed Oct 1, 2024
2 parents 1d5dd80 + 161c3fe commit 69b4112
Show file tree
Hide file tree
Showing 13 changed files with 186 additions and 16 deletions.
9 changes: 8 additions & 1 deletion backend/tests/worker.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1126,6 +1126,7 @@ async fn test_deno_flow(db: Pool<Postgres>) {
priority: None,
delete_after_use: None,
continue_on_error: None,
skip_if: None,
},
FlowModule {
id: "b".to_string(),
Expand Down Expand Up @@ -1166,6 +1167,7 @@ async fn test_deno_flow(db: Pool<Postgres>) {
priority: None,
delete_after_use: None,
continue_on_error: None,
skip_if: None,
}],
}
.into(),
Expand All @@ -1181,6 +1183,7 @@ async fn test_deno_flow(db: Pool<Postgres>) {
priority: None,
delete_after_use: None,
continue_on_error: None,
skip_if: None,
},
],
same_worker: false,
Expand Down Expand Up @@ -1286,6 +1289,7 @@ async fn test_deno_flow_same_worker(db: Pool<Postgres>) {
priority: None,
delete_after_use: None,
continue_on_error: None,
skip_if: None,
},
FlowModule {
id: "b".to_string(),
Expand Down Expand Up @@ -1336,6 +1340,7 @@ async fn test_deno_flow_same_worker(db: Pool<Postgres>) {
priority: None,
delete_after_use: None,
continue_on_error: None,
skip_if: None,
},
FlowModule {
id: "e".to_string(),
Expand Down Expand Up @@ -1372,7 +1377,7 @@ async fn test_deno_flow_same_worker(db: Pool<Postgres>) {
priority: None,
delete_after_use: None,
continue_on_error: None,

skip_if: None,
},
],
}.into(),
Expand All @@ -1388,6 +1393,7 @@ async fn test_deno_flow_same_worker(db: Pool<Postgres>) {
priority: None,
delete_after_use: None,
continue_on_error: None,
skip_if: None,
},
FlowModule {
id: "c".to_string(),
Expand Down Expand Up @@ -1431,6 +1437,7 @@ async fn test_deno_flow_same_worker(db: Pool<Postgres>) {
priority: None,
delete_after_use: None,
continue_on_error: None,
skip_if: None,
},
],
same_worker: true,
Expand Down
4 changes: 4 additions & 0 deletions backend/windmill-api/src/flows.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1176,6 +1176,7 @@ mod tests {
priority: None,
delete_after_use: None,
continue_on_error: None,
skip_if: None,
},
FlowModule {
id: "b".to_string(),
Expand Down Expand Up @@ -1205,6 +1206,7 @@ mod tests {
priority: None,
delete_after_use: None,
continue_on_error: None,
skip_if: None,
},
FlowModule {
id: "c".to_string(),
Expand Down Expand Up @@ -1232,6 +1234,7 @@ mod tests {
priority: None,
delete_after_use: None,
continue_on_error: None,
skip_if: None,
},
],
failure_module: Some(Box::new(FlowModule {
Expand All @@ -1258,6 +1261,7 @@ mod tests {
priority: None,
delete_after_use: None,
continue_on_error: None,
skip_if: None,
})),
preprocessor_module: None,
same_worker: false,
Expand Down
8 changes: 8 additions & 0 deletions backend/windmill-common/src/flows.rs
Original file line number Diff line number Diff line change
Expand Up @@ -269,6 +269,13 @@ pub struct FlowModule {
pub delete_after_use: Option<bool>,
#[serde(skip_serializing_if = "Option::is_none")]
pub continue_on_error: Option<bool>,
#[serde(skip_serializing_if = "Option::is_none")]
pub skip_if: Option<SkipIf>,
}

#[derive(Deserialize, Serialize, Debug, Clone)]
pub struct SkipIf {
pub expr: String,
}

#[derive(Deserialize)]
Expand Down Expand Up @@ -631,6 +638,7 @@ pub fn add_virtual_items_if_necessary(modules: &mut Vec<FlowModule>) {
priority: None,
delete_after_use: None,
continue_on_error: None,
skip_if: None,
});
}
}
1 change: 1 addition & 0 deletions backend/windmill-queue/src/jobs.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3536,6 +3536,7 @@ pub async fn push<'c, 'd, R: rsmq_async::RsmqConnection + Send + 'c>(
priority: None,
delete_after_use: None,
continue_on_error: None,
skip_if: None,
}],
same_worker: false,
failure_module: None,
Expand Down
45 changes: 34 additions & 11 deletions backend/windmill-worker/src/worker_flow.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2210,6 +2210,23 @@ async fn push_next_flow_job<R: rsmq_async::RsmqConnection + Send + Sync + Clone>

drop(resume_messages);

let is_skipped = if let Some(skip_if) = &module.skip_if {
let idcontext = get_transform_context(&flow_job, previous_id.as_str(), &status).await?;
compute_bool_from_expr(
skip_if.expr.to_string(),
arc_flow_job_args.clone(),
arc_last_job_result.clone(),
None,
Some(idcontext.clone()),
Some(client),
Some((resumes.clone(), resume.clone(), approvers.clone())),
None,
)
.await?
} else {
false
};

let args: windmill_common::error::Result<_> =
if module.mock.is_some() && module.mock.as_ref().unwrap().enabled {
let mut hm = HashMap::new();
Expand Down Expand Up @@ -2249,7 +2266,16 @@ async fn push_next_flow_job<R: rsmq_async::RsmqConnection + Send + Sync + Clone>
);
Ok(Marc::new(hm))
} else {
match &module.get_value() {
let value = module.get_value();
match &value {
Ok(_) if matches!(value, Ok(FlowModuleValue::Identity)) || is_skipped => serde_json::from_str(
&serde_json::to_string(&PreviousResult {
previous_result: Some(&arc_last_job_result),
})
.unwrap(),
)
.map(Marc::new)
.map_err(|e| error::Error::InternalErr(format!("identity: {e:#}"))),
Ok(
FlowModuleValue::Script { input_transforms, .. }
| FlowModuleValue::RawScript { input_transforms, .. }
Expand All @@ -2270,16 +2296,7 @@ async fn push_next_flow_job<R: rsmq_async::RsmqConnection + Send + Sync + Clone>
)
.await
.map(Marc::new)
}
Ok(FlowModuleValue::Identity) => serde_json::from_str(
&serde_json::to_string(&PreviousResult {
previous_result: Some(&arc_last_job_result),
})
.unwrap(),
)
.map(Marc::new)
.map_err(|e| error::Error::InternalErr(format!("identity: {e:#}"))),

},
Ok(_) => Ok(arc_flow_job_args.clone()),
Err(e) => {
return Err(error::Error::InternalErr(format!(
Expand All @@ -2305,6 +2322,7 @@ async fn push_next_flow_job<R: rsmq_async::RsmqConnection + Send + Sync + Clone>
resumes.clone(),
resume.clone(),
approvers.clone(),
is_skipped,
)
.await?;
tracing::info!(id = %flow_job.id, root_id = %job_root, "next flow transform computed");
Expand Down Expand Up @@ -2971,6 +2989,7 @@ async fn compute_next_flow_transform(
resumes: Arc<Box<RawValue>>,
resume: Arc<Box<RawValue>>,
approvers: Arc<Box<RawValue>>,
is_skipped: bool,
) -> error::Result<NextFlowTransform> {
if module.mock.is_some() && module.mock.as_ref().unwrap().enabled {
return Ok(NextFlowTransform::Continue(
Expand All @@ -2997,6 +3016,9 @@ async fn compute_next_flow_transform(
let delete_after_use = module.delete_after_use.unwrap_or(false);

tracing::debug!(id = %flow_job.id, "computing next flow transform for {:?}", &module.value);
if is_skipped {
return trivial_next_job(JobPayload::Identity);
}
match &module.get_value()? {
FlowModuleValue::Identity => trivial_next_job(JobPayload::Identity),
FlowModuleValue::Flow { path, .. } => {
Expand Down Expand Up @@ -3490,6 +3512,7 @@ fn is_simple_modules(modules: &Vec<FlowModule>, flow: &FlowValue) -> bool {
&& modules[0].retry.is_none()
&& modules[0].stop_after_if.is_none()
&& modules[0].stop_after_all_iters_if.is_none()
&& modules[0].skip_if.is_none()
&& (modules[0].mock.is_none() || modules[0].mock.as_ref().is_some_and(|m| !m.enabled))
&& flow.failure_module.is_none();
is_simple
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -14,10 +14,12 @@
import FlowModuleMock from './FlowModuleMock.svelte'
import FlowModuleDeleteAfterUse from './FlowModuleDeleteAfterUse.svelte'
import { enterpriseLicense } from '$lib/stores'
import FlowModuleSkip from './FlowModuleSkip.svelte'
export let noEditor: boolean
export let flowModule: FlowModule
export let previousModule: FlowModule | undefined
export let parentModule: FlowModule | undefined
let value = flowModule.value as BranchAll
$: value = flowModule.value as BranchAll
Expand Down Expand Up @@ -77,6 +79,7 @@
<Pane size={40}>
<Tabs bind:selected>
<Tab value="early-stop">Early Stop/Break</Tab>
<Tab value="skip">Skip</Tab>
<Tab value="suspend">Suspend/Approval/Prompt</Tab>
<Tab value="sleep">Sleep</Tab>
<Tab value="mock">Mock</Tab>
Expand All @@ -88,6 +91,11 @@
<FlowModuleEarlyStop bind:flowModule />
</div>
</TabContent>
<TabContent value="skip" class="flex flex-col flex-1 h-full">
<div class="p-4 overflow-y-auto">
<FlowModuleSkip bind:flowModule {parentModule} {previousModule} />
</div>
</TabContent>
<TabContent value="suspend" class="flex flex-col flex-1 h-full">
<div class="p-4 overflow-y-auto">
<FlowModuleSuspend previousModuleId={previousModule?.id} bind:flowModule />
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -14,10 +14,12 @@
import SplitPanesWrapper from '../../splitPanes/SplitPanesWrapper.svelte'
import FlowModuleMock from './FlowModuleMock.svelte'
import { enterpriseLicense } from '$lib/stores'
import FlowModuleSkip from './FlowModuleSkip.svelte'
// import FlowRetries from './FlowRetries.svelte'
export let flowModule: FlowModule
export let previousModule: FlowModule | undefined
export let parentModule: FlowModule | undefined
export let noEditor: boolean
export let enableAi = false
Expand Down Expand Up @@ -89,6 +91,7 @@
<Pane size={40}>
<Tabs bind:selected>
<Tab value="early-stop">Early Stop/Break</Tab>
<Tab value="skip">Skip</Tab>
<Tab value="suspend">Suspend/Approval/Prompt</Tab>
<Tab value="sleep">Sleep</Tab>
<Tab value="mock">Mock</Tab>
Expand All @@ -100,6 +103,11 @@
<FlowModuleEarlyStop bind:flowModule />
</div>
</TabContent>
<TabContent value="skip" class="flex flex-col flex-1 h-full">
<div class="p-4 overflow-y-auto">
<FlowModuleSkip bind:flowModule {parentModule} {previousModule} />
</div>
</TabContent>
<TabContent value="suspend" class="flex flex-col flex-1 h-full">
<div class="p-4 overflow-y-auto">
<FlowModuleSuspend previousModuleId={previousModule?.id} bind:flowModule />
Expand Down
8 changes: 7 additions & 1 deletion frontend/src/lib/components/flows/content/FlowLoop.svelte
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@
import FlowLoopIterationPreview from '$lib/components/FlowLoopIterationPreview.svelte'
import FlowModuleDeleteAfterUse from './FlowModuleDeleteAfterUse.svelte'
import IteratorGen from '$lib/components/copilot/IteratorGen.svelte'
import FlowModuleSkip from './FlowModuleSkip.svelte'
const { previewArgs, flowStateStore, flowStore } =
getContext<FlowEditorContext>('FlowEditorContext')
Expand Down Expand Up @@ -210,6 +211,7 @@
<Tabs bind:selected>
<!-- <Tab value="retries">Retries</Tab> -->
<Tab value="early-stop">Early Stop/Break</Tab>
<Tab value="skip">Skip</Tab>
<Tab value="suspend">Suspend/Approval/Prompt</Tab>
<Tab value="sleep">Sleep</Tab>
<Tab value="mock">Mock</Tab>
Expand All @@ -228,7 +230,11 @@
<FlowModuleEarlyStop bind:flowModule={mod} />
</div>
</TabContent>

<TabContent value="skip" class="flex flex-col flex-1 h-full">
<div class="p-4 overflow-y-auto">
<FlowModuleSkip bind:flowModule={mod} {parentModule} {previousModule} />
</div>
</TabContent>
<TabContent value="suspend" class="flex flex-col flex-1 h-full">
<div class="p-4 overflow-y-auto">
<FlowModuleSuspend previousModuleId={previousModule?.id} bind:flowModule={mod} />
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,7 @@
import { computeFlowStepWarning, initFlowStepWarnings } from '../utils'
import { debounce } from '$lib/utils'
import { dfs } from '../dfs'
import FlowModuleSkip from './FlowModuleSkip.svelte'
const {
selectedId,
Expand Down Expand Up @@ -418,6 +419,7 @@
>
Early Stop
</Tab>
<Tab value="skip" active={Boolean(flowModule.skip_if)}>Skip</Tab>
<Tab value="suspend" active={Boolean(flowModule.suspend)}>Suspend</Tab>
<Tab value="sleep" active={Boolean(flowModule.sleep)}>Sleep</Tab>
<Tab value="mock" active={Boolean(flowModule.mock?.enabled)}>Mock</Tab>
Expand Down Expand Up @@ -581,6 +583,8 @@
</div>
{:else if advancedSelected === 'early-stop'}
<FlowModuleEarlyStop bind:flowModule />
{:else if advancedSelected === 'skip'}
<FlowModuleSkip bind:flowModule {parentModule} {previousModule} />
{:else if advancedSelected === 'suspend'}
<div>
<FlowModuleSuspend previousModuleId={previousModule?.id} bind:flowModule />
Expand Down
Loading

0 comments on commit 69b4112

Please sign in to comment.