6. Samples

6.1. Basic

This sample demonstrates how to create a basic one-step chain with parameters. It uses CTE to directly update the timetable schema tables.

 1WITH 
 2chain_insert(task_id) AS (
 3    INSERT INTO timetable.task 
 4        (task_id, kind, command, ignore_error)
 5    VALUES 
 6        (DEFAULT, 'SQL', 'SELECT pg_notify($1, $2)', TRUE)
 7    RETURNING task_id
 8),
 9chain_config(id) as (
10    INSERT INTO timetable.chain (
11        chain_id, 
12        task_id, 
13        chain_name, 
14        run_at, 
15        max_instances, 
16        live,
17        self_destruct, 
18        exclusive_execution
19    ) VALUES (
20        DEFAULT, -- chain_id, 
21        (SELECT task_id FROM chain_insert), -- task_id, 
22        'notify every minute', -- chain_name, 
23        '* * * * *', -- run_at, 
24        1, -- max_instances, 
25        TRUE, -- live, 
26        FALSE, -- self_destruct,
27        FALSE -- exclusive_execution, 
28    )
29    RETURNING  chain_id
30)
31INSERT INTO timetable.parameter 
32    (chain_id, task_id, order_id, value)
33VALUES (
34    (SELECT id FROM chain_config),
35    (SELECT task_id FROM chain_insert),
36    1,
37    '[ "TT_CHANNEL", "Ahoj from SQL base task" ]' :: jsonb) 

6.2. Download, Transform and Import

This sample demonstrates how to create enhanced three-step chain with parameters. It uses DO statement to directly update the timetable schema tables.

 1-- An enhanced example consisting of three tasks:
 2-- 1. Download text file from internet using BUILT-IN command
 3-- 2. Remove accents (diacritic signs) from letters using PROGRAM command (can be done with `unaccent` PostgreSQL extension) 
 4-- 3. Import text file as CSV file using BUILT-IN command (can be down with `psql -c /copy`)
 5DO $$
 6DECLARE
 7    v_head_id bigint;
 8    v_task_id bigint;
 9    v_chain_id bigint;
10BEGIN
11    -- Step 1. Download file from the server
12    -- Create the chain
13    INSERT INTO timetable.task (kind, command, ignore_error)
14    VALUES ('BUILTIN', 'Download', TRUE)
15    RETURNING
16        task_id INTO v_head_id;
17
18    -- Create the chain with default values executed every minute (NULL == '* * * * *' :: timetable.cron)
19    INSERT INTO timetable.chain (task_id, chain_name, live)
20    VALUES (v_head_id, 'Download locations and aggregate', TRUE)
21    RETURNING
22        chain_id INTO v_chain_id;
23
24    -- Create the parameters for the step 1:
25    INSERT INTO timetable.parameter (chain_id, task_id, order_id, value)
26        VALUES (v_chain_id, v_head_id, 1, '
27                {
28                    "workersnum": 1,
29                    "fileurls": ["https://www.cybertec-postgresql.com/secret/orte.txt"], 
30                    "destpath": "."
31                }'::jsonb);
32    
33    RAISE NOTICE 'Step 1 completed. DownloadFile task added with ID: %', v_chain_id;
34
35    -- Step 2. Transform Unicode characters into ASCII
36    -- Create the program task to call 'uconv' and name it 'unaccent'
37    INSERT INTO timetable.task (parent_id, kind, command, ignore_error, task_name)
38        VALUES (v_head_id, 'PROGRAM', 'uconv', TRUE, 'unaccent')
39    RETURNING
40        task_id INTO v_task_id;
41
42    -- Create the parameters for the 'unaccent' task. Input and output files in this case
43    -- Under Windows we should call PowerShell instead of "uconv" with command:
44    -- Set-content "orte_ansi.txt" ((Get-content "orte.txt").Normalize("FormD") -replace '\p{M}', '')
45    INSERT INTO timetable.parameter (chain_id, task_id, order_id, value)
46        VALUES (v_chain_id, v_task_id, 1, '["-x", "Latin-ASCII", "-o", "orte_ansi.txt", "orte.txt"]'::jsonb);
47
48    RAISE NOTICE 'Step 2 completed. Unacent task added';
49
50    -- Step 3. Import ASCII file to PostgreSQL table using "CopyFromFile" built-in command
51    INSERT INTO timetable.task (parent_id, kind, command)
52        VALUES (v_task_id, 'BUILTIN', 'CopyFromFile')
53    RETURNING
54        task_id INTO v_task_id;
55
56    -- Prepare the destination table 'location'
57    CREATE TABLE IF NOT EXISTS location(name text);
58
59    -- Add the parameters for the download task. Execute client side COPY to 'location' from 'orte_ansi.txt'
60    INSERT INTO timetable.parameter (chain_id, task_id, order_id, value)
61        VALUES (v_chain_id, v_task_id, 1, '{"sql": "COPY location FROM STDIN", "filename": "orte_ansi.txt" }'::jsonb);
62
63    RAISE NOTICE 'Step 3 completed. Import task added';
64END;
65$$ LANGUAGE PLPGSQL;

6.3. Run tasks in autonomous transaction

This sample demonstrates how to run special tasks out of chain transaction context. This is useful for special routines and/or non-transactional operations, e.g. CREATE DATABASE, REINDEX, VACUUM, CREATE TABLESPACE, etc.

 1-- An advanced example showing how to use atutonomous tasks.
 2-- This one-task chain will execute test_proc() procedure.
 3-- Since procedure will make two commits (after f1() and f2())
 4-- we cannot use it as a regular task, because all regular tasks 
 5-- must be executed in the context of a single chain transaction.
 6-- Same rule applies for some other SQL commands, 
 7-- e.g. CREATE DATABASE, REINDEX, VACUUM, CREATE TABLESPACE, etc.
 8CREATE OR REPLACE FUNCTION f (msg TEXT) RETURNS void AS $$
 9BEGIN 
10    RAISE notice '%', msg; 
11END;
12$$ LANGUAGE PLPGSQL;
13
14CREATE OR REPLACE PROCEDURE test_proc () AS $$
15BEGIN
16    PERFORM f('hey 1');
17    COMMIT;
18    PERFORM f('hey 2');
19    COMMIT;
20END;
21$$
22LANGUAGE PLPGSQL;
23
24WITH chain_insert(task_id) AS (
25    INSERT INTO timetable.task(command, ignore_error, autonomous)
26    VALUES('CALL test_proc()', TRUE, TRUE)
27    RETURNING task_id
28)
29INSERT INTO timetable.chain (
30    chain_id, 
31    task_id, 
32    chain_name, 
33    run_at, 
34    max_instances, 
35    live,
36    self_destruct, 
37    exclusive_execution
38)  VALUES (
39    DEFAULT, -- chain_id, 
40    (SELECT task_id FROM chain_insert), -- task_id, 
41    'call proc() every 10 sec', -- chain_name, 
42    '@every 10 seconds', -- run_at, 
43    1, -- max_instances, 
44    TRUE, -- live, 
45    FALSE, -- self_destruct,
46    FALSE -- exclusive_execution, 
47)
48RETURNING  chain_id, run_at;