Skip to content

Commit

Permalink
update documentation
Browse files Browse the repository at this point in the history
  • Loading branch information
jbsmith7741 committed Sep 25, 2024
1 parent 9513d6b commit ce26cbf
Show file tree
Hide file tree
Showing 3 changed files with 142 additions and 2 deletions.
28 changes: 26 additions & 2 deletions apps/flowlord/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -87,7 +87,7 @@ template = "?date={yyyy}-{mm}-{dd}T{hh}"
### files
schedule a task after a specified file is written. This should be used with the filewatcher taskmaster or GCP file watching service. File matching is done using the stdlib filepath.Match which does not support `**` matching. Flowlord will attempt to pull the timestamp from the filepath which will be used to populate the date-time in the phase's template `{yyyy}|{dd}|{mm}|{hh}`. The matching file can be referenced in the phase's template with `{meta:file}`. The filename can be referenced with `{meta:filename}`.

```
``` toml
[[Phase]]
task = "topic"
rule = "files=/folder/*/*/*.txt"
Expand All @@ -98,7 +98,7 @@ template = "{meta:file}?opt=true"

used to indicate a required field or value before starting a child process.

```
``` toml
[[Phase]]
task = "child:job"
rule = "require:{meta:file}"
Expand All @@ -115,3 +115,27 @@ batching is a way to create multiple tasks when the phase is run. This can be do
* meta: comma-separated data associated with a key. Each item will generate a new task. Ex: meta=key:a,b,c|key2:1,2,3
* meta_file: a line-delimited data file. Each row (line) will generate a new task.

``` toml
# run every day at 2:01:00 for multiple items
# generates 3 tasks with the info of
# ?name=a&value=1 AND ?name=b&value=2 AND ?name=c&value=3
[[Phase]]
task = "worker:job-meta"
rule = "cron=0 1 2 * * *&meta=name:a,b,c|value:1,2,3"
template = "?name={meta:name}&value={meta:value}&day={yyyy}-{mm}-{dd}"

# run every day at 5:05:00
# generates a task for every line in the file
[[Phase]]
task = "worker:job-file"
rule = "cron=0 5 5 * * *&meta_file=data.json"
template = "?name={meta:name}&value={meta:value}&day={yyyy}-{mm}-{dd}"

# run every day for the last week
# generates 8 tasks from today to 7 days ago
[[Phase]]
task = "worker:lastweek"
rule = "cron=0 7 7 * * *&for=-168h&by=day"
template = "?day={yyyy}-{mm}-{dd}"
```
12 changes: 12 additions & 0 deletions apps/flowlord/taskmaster_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -368,6 +368,18 @@ func TestTaskMaster_Batch(t *testing.T) {
{Type: "batch-date", Info: "?day=2024-01-13", Meta: "cron=2024-01-13T00&workflow=batch.toml"},
},
},
"for-48h +offset": {
Input: workflow.Phase{
Task: "batch-date",
Rule: "for=-48h&offset=-48h",
Template: "?day={yyyy}-{mm}-{dd}",
},
Expected: []task.Task{
{Type: "batch-date", Info: "?day=2024-01-13", Meta: "cron=2024-01-13T00&workflow=batch.toml"},
{Type: "batch-date", Info: "?day=2024-01-12", Meta: "cron=2024-01-12T00&workflow=batch.toml"},
{Type: "batch-date", Info: "?day=2024-01-11", Meta: "cron=2024-01-11T00&workflow=batch.toml"},
},
},
"metas": {
Input: workflow.Phase{
Task: "meta-batch",
Expand Down
104 changes: 104 additions & 0 deletions internal/Docs/PG_Tables.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,104 @@
# Postgres Tables
Suggested tables for tracking and storing processed task records.

### Required Functions

``` sql
-- parse_param converts a URL query string (without the leading '?') into a
-- jsonb object of key/value pairs, e.g. 'a=1&b=2' -> {"a": "1", "b": "2"}.
-- A key with no '=' is stored with an empty-string value.
-- NOTE(review): values are kept verbatim; no percent-decoding is performed.
CREATE OR REPLACE FUNCTION parse_param(url text)
RETURNS jsonb AS $$
DECLARE
    result jsonb := '{}';
    param  text;
    eq     integer; -- position of the first '=' within a single param
    key    text;
    value  text;
BEGIN
    FOREACH param IN ARRAY regexp_split_to_array(url, '&')
    LOOP
        -- Split on the FIRST '=' only so values containing '=' survive
        -- (e.g. 'token=a=b' -> key 'token', value 'a=b'). The previous
        -- split_part(param, '=', 2) silently dropped everything after a
        -- second '='. The coalesce around it was dead code: split_part
        -- returns '' (never NULL) when the separator is absent.
        eq := position('=' in param);
        IF eq = 0 THEN
            key   := param;
            value := '';
        ELSE
            key   := substring(param from 1 for eq - 1);
            value := substring(param from eq + 1);
        END IF;

        IF key IS NOT NULL AND key != '' THEN
            result := jsonb_set(result, ARRAY [key], to_jsonb(value), true);
        END IF;
    END LOOP;

    RETURN result;
END;
$$ LANGUAGE plpgsql;

-- parse_url extracts a URL's origin (everything before the query string:
-- optional scheme://authority plus the path) and its query parameters into
-- a single jsonb object. The origin is stored under the 'origin' key; each
-- query parameter becomes its own key via parse_param.
CREATE OR REPLACE FUNCTION parse_url(url text)
RETURNS jsonb AS $$
DECLARE
    result jsonb := '{}';
    query  text;
    origin text;
BEGIN
    -- Construct origin: substring() returns the first capture group, i.e.
    -- the whole span before any '?' or '#'.
    origin := coalesce(substring(url from '^((?:([^:/?#]+)://([^/?#]*))?([^?#]*))'), '');
    result := jsonb_set(result, '{origin}', to_jsonb(origin), true);

    -- Extract query parameters. The capture group excludes the leading '?'
    -- and stops at a '#' fragment; the previous pattern '\?.*$' wrongly
    -- swallowed any trailing fragment into the query string.
    query := substring(url from '\?([^#]*)');
    IF query IS NOT NULL THEN
        result := result || parse_param(query);
    END IF;

    RETURN result;
END;
$$ LANGUAGE plpgsql;
```

## Task-logs
Used to store completed (done) tasks.

``` sql
-- task_log stores one row per finished task.
CREATE TABLE public.task_log
(
    id      text,       -- task identifier (NOTE(review): presumably unique, but no constraint is enforced — confirm)
    type    text,       -- task type
    job     text,
    info    text,       -- task info URL; parsed into json by the tasks view
    result  text,
    meta    text,       -- url-param formatted metadata; parsed into json by the tasks view
    msg     text,
    created timestamp,  -- when the task was created
    started timestamp,  -- when processing began
    ended   timestamp   -- when processing finished
);

-- Indexes on the common lookup/filter columns.
CREATE INDEX task_log_created
    ON public.task_log (created);

CREATE INDEX task_log_started
    ON public.task_log (started);

CREATE INDEX task_log_type
    ON public.task_log (type);

CREATE INDEX task_log_job
    ON public.task_log (job);

## Task View
A user-friendly view of the task_log table. It adds the time taken to complete each task (as _**task_seconds**_ and _**task_time**_) and the parsed URL values of _**info**_ and _**meta**_ (as _**infoJSON**_ and _**metaJSON**_).

``` sql
-- tasks: readable wrapper over task_log with parsed info/meta and derived
-- task duration columns. Column names and order match the original view
-- definition (unquoted identifiers fold to lowercase: infojson, metajson).
CREATE OR REPLACE VIEW public.tasks AS
SELECT
    t.id,
    t.type,
    t.job,
    t.info,
    parse_url(t.info)                                 AS infojson,
    t.meta,
    parse_param(t.meta)                               AS metajson,
    t.msg,
    t.result,
    -- duration in seconds (numeric) ...
    date_part('epoch'::text, t.ended - t.started)     AS task_seconds,
    -- ... and as a formatted HH:MM:SS string
    to_char(t.ended - t.started, 'HH24:MI:SS'::text)  AS task_time,
    t.created,
    t.started,
    t.ended
FROM task_log AS t;
```

0 comments on commit ce26cbf

Please sign in to comment.