6. Samples

6.1. Basic

This sample demonstrates how to create a basic one-step chain with parameters. It uses the timetable.add_job() function instead of updating the timetable schema tables directly.

 -- Basic sample: register a one-task chain (a job) with timetable.add_job().
 -- The task command is the parameterised statement SELECT pg_notify($1, $2);
 -- job_parameters supplies a two-element JSON array for the $1 and $2 placeholders.
 -- The result of the call is returned in a single column aliased chain_id.
 SELECT timetable.add_job(
     job_name            => 'notify every minute',
     job_schedule        => '* * * * *',  -- five-field cron expression: every minute
     job_command         => 'SELECT pg_notify($1, $2)',
     job_parameters      => '[ "TT_CHANNEL", "Ahoj from SQL base task" ]'::jsonb,
     job_kind            => 'SQL'::timetable.command_kind,
     job_client_name     => NULL,
     job_max_instances   => 1,
     job_live            => TRUE,
     job_self_destruct   => FALSE,
     job_ignore_errors   => TRUE
 ) AS chain_id;

6.2. Download, Transform and Import

This sample demonstrates how to create an enhanced three-step chain with parameters. It uses a DO statement to directly update the timetable schema tables.

 -- An enhanced example consisting of three tasks:
 -- 1. Download a text file from the internet using a BUILTIN command
 -- 2. Remove accents (diacritic signs) from letters using a PROGRAM command
 --    (can also be done with the `unaccent` PostgreSQL extension)
 -- 3. Import the text file as a CSV file using a BUILTIN command
 --    (can also be done with `psql -c \copy`)
 DO $$
 DECLARE
     v_task_id  bigint;  -- id of the task created most recently
     v_chain_id bigint;  -- id of the chain that owns all three tasks
 BEGIN
     -- Create the chain with default values. run_at is not supplied, so the chain
     -- is executed every minute (NULL == '* * * * *' :: timetable.cron)
     INSERT INTO timetable.chain (chain_name, live)
     VALUES ('Download locations and aggregate', TRUE)
     RETURNING chain_id INTO v_chain_id;

     -- Step 1. Download file from the server
     -- Create the task
     INSERT INTO timetable.task (chain_id, task_order, kind, command, ignore_error)
     VALUES (v_chain_id, 1, 'BUILTIN', 'Download', TRUE)
     RETURNING task_id INTO v_task_id;

     -- Create the parameters for step 1:
     INSERT INTO timetable.parameter (task_id, order_id, value)
     VALUES (v_task_id, 1,
            '{
                 "workersnum": 1,
                 "fileurls": ["https://www.cybertec-postgresql.com/secret/orte.txt"],
                 "destpath": "."
             }'::jsonb);

     RAISE NOTICE 'Step 1 completed. Chain added with ID: %; DownloadFile task added with ID: %', v_chain_id, v_task_id;

     -- Step 2. Transform Unicode characters into ASCII
     -- Create the program task to call 'uconv' and name it 'unaccent'
     INSERT INTO timetable.task (chain_id, task_order, kind, command, ignore_error, task_name)
     VALUES (v_chain_id, 2, 'PROGRAM', 'uconv', TRUE, 'unaccent')
     RETURNING task_id INTO v_task_id;

     -- Create the parameters for the 'unaccent' task: input and output files in this case.
     -- Under Windows we should call PowerShell instead of "uconv" with the command:
     -- Set-content "orte_ansi.txt" ((Get-content "orte.txt").Normalize("FormD") -replace '\p{M}', '')
     INSERT INTO timetable.parameter (task_id, order_id, value)
     VALUES (v_task_id, 1, '["-x", "Latin-ASCII", "-o", "orte_ansi.txt", "orte.txt"]'::jsonb);

     RAISE NOTICE 'Step 2 completed. Unaccent task added with ID: %', v_task_id;

     -- Step 3. Import ASCII file to PostgreSQL table using "CopyFromFile" built-in command
     INSERT INTO timetable.task (chain_id, task_order, kind, command)
     VALUES (v_chain_id, 3, 'BUILTIN', 'CopyFromFile')
     RETURNING task_id INTO v_task_id;

     -- Prepare the destination table 'location'
     CREATE TABLE IF NOT EXISTS location(name text);

     -- Add the parameters for the import task. Execute client side COPY to 'location' from 'orte_ansi.txt'
     INSERT INTO timetable.parameter (task_id, order_id, value)
     VALUES (v_task_id, 1, '{"sql": "COPY location FROM STDIN", "filename": "orte_ansi.txt" }'::jsonb);

     RAISE NOTICE 'Step 3 completed. Import task added with ID: %', v_task_id;
 END;
 $$ LANGUAGE PLPGSQL;

6.3. Run tasks in autonomous transaction

This sample demonstrates how to run special tasks outside of the chain transaction context. This is useful for special routines and/or non-transactional operations, e.g. CREATE DATABASE, REINDEX, VACUUM, CREATE TABLESPACE, etc.

 -- An advanced example showing how to use autonomous tasks.
 -- This one-task chain will execute the test_proc() procedure.
 -- Since the procedure commits twice (once after each call to f())
 -- we cannot use it as a regular task, because all regular tasks
 -- must be executed in the context of a single chain transaction.
 -- The same rule applies to some other SQL commands,
 -- e.g. CREATE DATABASE, REINDEX, VACUUM, CREATE TABLESPACE, etc.

 -- f(msg): raise msg as a NOTICE; returns nothing.
 CREATE OR REPLACE FUNCTION f (msg TEXT) RETURNS void AS $$
 BEGIN
     RAISE NOTICE '%', msg;
 END;
 $$ LANGUAGE PLPGSQL;

 -- test_proc(): call f() twice and COMMIT after each call.
 CREATE OR REPLACE PROCEDURE test_proc () AS $$
 BEGIN
     PERFORM f('hey 1');
     COMMIT;
     PERFORM f('hey 2');
     COMMIT;
 END;
 $$
 LANGUAGE PLPGSQL;

 -- Create the chain and its single autonomous task in one statement and
 -- return both generated ids.
 WITH
     cte_chain (v_chain_id) AS (
         INSERT INTO timetable.chain (chain_name, run_at, max_instances, live, self_destruct)
         VALUES (
             'call proc() every 10 sec', -- chain_name
             '@every 10 seconds',        -- run_at
             1,                          -- max_instances
             TRUE,                       -- live
             FALSE                       -- self_destruct
         )
         RETURNING chain_id
     ),
     cte_task (v_task_id) AS (
         -- autonomous = TRUE is what marks this task to run out of the chain transaction
         INSERT INTO timetable.task (chain_id, task_order, kind, command, ignore_error, autonomous)
         SELECT cte_chain.v_chain_id, 10, 'SQL', 'CALL test_proc()', TRUE, TRUE
         FROM cte_chain
         RETURNING task_id
     )
 -- Each CTE yields exactly one row, so the cross join returns a single row.
 SELECT
     cte_chain.v_chain_id,
     cte_task.v_task_id
 FROM cte_task
 CROSS JOIN cte_chain;

6.4. Shutdown the scheduler and terminate the session

This sample demonstrates how to shut down the scheduler using a special built-in task. This can be used to control maintenance windows, to restart the scheduler for update purposes, or to stop the session before the database is dropped.

 -- This one-task chain (aka job) will terminate the pg_timetable session.
 -- This is useful for maintenance purposes or before the database is destroyed.
 -- One should take care of restarting pg_timetable if needed.
 --
 -- The schedule is a five-field cron expression (minute, hour, day of month,
 -- month, day of week): '* * 1 * *' matches every minute of the 1st day of
 -- each month.
 -- NOTE(review): if pg_timetable is restarted on that same day, the job will
 -- presumably fire again on the next matching minute - confirm, and narrow the
 -- minute/hour fields if a single shutdown is wanted.

 SELECT timetable.add_job (
     job_name     => 'Shutdown pg_timetable session on schedule',
     job_schedule => '* * 1 * *',
     job_command  => 'Shutdown',
     job_kind     => 'BUILTIN'::timetable.command_kind
 );

6.5. Access previous task result code and output from the next task

This sample demonstrates how to check the result code and output of a previous task. When a task fails, the following task can run only if ignore_error = true is set for the failed task; otherwise, the scheduler stops the chain. This sample shows how to calculate the number of failed, successful, and total tasks executed. Based on these values, we can calculate the success ratio.

 -- Build a chain of 500 tasks that fail at random, followed by one reporting
 -- task that summarises how many of them succeeded. The statement returns the
 -- id of the new chain. The data-modifying CTEs are executed even though the
 -- final SELECT reads only cte_chain.
 WITH
     cte_chain (v_chain_id) AS ( -- let's create a new chain and add tasks to it later
         INSERT INTO timetable.chain (chain_name, run_at, max_instances, live)
         VALUES ('many tasks', '* * * * *', 1, TRUE)
         RETURNING chain_id
     ),
     cte_tasks (v_task_id) AS ( -- now we'll add 500 tasks to the chain, some of them will fail
         -- round(random()) is either 0 or 1, so the division raises
         -- "division by zero" for part of the tasks; ignore_error = true
         -- lets the chain continue past those failures.
         INSERT INTO timetable.task (chain_id, task_order, kind, command, ignore_error)
         SELECT cte_chain.v_chain_id, g.s, 'SQL', 'SELECT 1.0 / round(random())::int4;', TRUE
         FROM cte_chain
         CROSS JOIN generate_series(1, 500) AS g(s)
         RETURNING task_id
     ),
     report_task (v_task_id) AS ( -- and the last reporting task will calculate the statistic
         -- The command below counts the timetable.execution_log rows of the
         -- current chain and current transaction, split by returncode, and
         -- raises the totals and the success ratio as a NOTICE.
         INSERT INTO timetable.task (chain_id, task_order, kind, command)
         SELECT cte_chain.v_chain_id, 501, 'SQL', $CMD$DO
$$
DECLARE
    s TEXT;
BEGIN
    WITH report AS (
        SELECT 
        count(*) FILTER (WHERE returncode = 0) AS success,
        count(*) FILTER (WHERE returncode != 0) AS fail,
        count(*) AS total
        FROM timetable.execution_log 
        WHERE chain_id = current_setting('pg_timetable.current_chain_id')::bigint
          AND txid = txid_current()
    )
    SELECT 'Tasks executed:' || total || 
         '; succeeded: ' || success || 
         '; failed: ' || fail || 
         '; ratio: ' || 100.0*success/GREATEST(total,1)
    INTO s
    FROM report;
    RAISE NOTICE '%', s;
END;
$$
$CMD$
         FROM cte_chain
         RETURNING task_id
     )
 SELECT cte_chain.v_chain_id
 FROM cte_chain;