6. Samples

6.1. Basic

This sample demonstrates how to create a basic one-step chain with parameters. It uses CTE to directly update the timetable schema tables.

 1SELECT timetable.add_job(
 2    job_name            => 'notify every minute',
 3    job_schedule        => '* * * * *',
 4    job_command         => 'SELECT pg_notify($1, $2)',
 5    job_parameters      => '[ "TT_CHANNEL", "Ahoj from SQL base task" ]' :: jsonb,
 6    job_kind            => 'SQL'::timetable.command_kind,
 7    job_client_name     => NULL,
 8    job_max_instances   => 1,
 9    job_live            => TRUE,
10    job_self_destruct   => FALSE,
11    job_ignore_errors   => TRUE
12) as chain_id;

6.2. Send email

This sample demonstrates how to create an advanced email job. It will check if there are emails to send, will send them and log the status of the command execution. You don’t need to setup anything, every parameter can be specified during the chain creation.

 1DO $$
 2    -- An example for using the SendMail task.
 3DECLARE
 4    v_mail_task_id bigint;
 5    v_log_task_id bigint;
 6    v_chain_id bigint;
 7BEGIN
 8    -- Get the chain id
 9    INSERT INTO timetable.chain (chain_name, max_instances, live) VALUES ('Send Mail', 1, TRUE)
10    RETURNING chain_id INTO v_chain_id;
11
12    -- Add SendMail task
13    INSERT INTO timetable.task (chain_id, task_order, kind, command) 
14    SELECT v_chain_id, 10, 'BUILTIN', 'SendMail'
15    RETURNING task_id INTO v_mail_task_id;
16
17    -- Create the parameters for the SensMail task
18        -- "username":	      The username used for authenticating on the mail server
19        -- "password":        The password used for authenticating on the mail server
20        -- "serverhost":      The IP address or hostname of the mail server
21        -- "serverport":      The port of the mail server
22        -- "senderaddr":      The email that will appear as the sender
23        -- "ccaddr":	      String array of the recipients(Cc) email addresses
24        -- "bccaddr":	      String array of the recipients(Bcc) email addresses
25        -- "toaddr":          String array of the recipients(To) email addresses
26        -- "subject":	      Subject of the email
27        -- "attachment":      String array of the attachments (local file)
28        -- "attachmentdata":  Pairs of name and base64-encoded content
29        -- "msgbody":	      The body of the email
30
31    INSERT INTO timetable.parameter (task_id, order_id, value)
32        VALUES (v_mail_task_id, 1, '{
33                "username":     "user@example.com",
34                "password":     "password",
35                "serverhost":   "smtp.example.com",
36                "serverport":   587,
37                "senderaddr":   "user@example.com",
38                "ccaddr":       ["recipient_cc@example.com"],
39                "bccaddr":      ["recipient_bcc@example.com"],
40                "toaddr":       ["recipient@example.com"],
41                "subject":      "pg_timetable - No Reply",
42                "attachment":   ["D:\\Go stuff\\Books\\Concurrency in Go.pdf","report.yaml"],
43                "attachmentdata": [{"name": "File.txt", "base64data": "RmlsZSBDb250ZW50"}],
44                "msgbody":      "<b>Hello User,</b> <p>I got some Go books for you enjoy</p> <i>pg_timetable</i>!",
45                "contenttype":  "text/html; charset=UTF-8"
46                }'::jsonb);
47    
48    -- Add Log task and make it the last task using `task_order` column (=30)
49    INSERT INTO timetable.task (chain_id, task_order, kind, command) 
50    SELECT v_chain_id, 30, 'BUILTIN', 'Log'
51    RETURNING task_id INTO v_log_task_id;
52
53    -- Add housekeeping task, that will delete sent mail and update parameter for the previous logging task
54    -- Since we're using special add_task() function we don't need to specify the `chain_id`.
55    -- Function will take the same `chain_id` from the parent task, SendMail in this particular case
56    PERFORM timetable.add_task(
57        kind => 'SQL', 
58        parent_id => v_mail_task_id,
59        command => format(
60$query$WITH sent_mail(toaddr) AS (DELETE FROM timetable.parameter WHERE task_id = %s RETURNING value->>'username')
61INSERT INTO timetable.parameter (task_id, order_id, value) 
62SELECT %s, 1, to_jsonb('Sent emails to: ' || string_agg(sent_mail.toaddr, ';'))
63FROM sent_mail
64ON CONFLICT (task_id, order_id) DO UPDATE SET value = EXCLUDED.value$query$, 
65                v_mail_task_id, v_log_task_id
66            ),
67        order_delta => 10
68    );
69
70-- In the end we should have something like this. Note, that even Log task was created earlier it will be executed later
71-- due to `task_order` column.
72
73-- timetable=> SELECT task_id, chain_id, kind, left(command, 50) FROM timetable.task ORDER BY task_order;  
74--  task_id | chain_id | task_order |  kind   |                             left
75-- ---------+----------+------------+---------+---------------------------------------------------------------
76--       45 |       24 |         10 | BUILTIN | SendMail
77--       47 |       24 |         20 | SQL     | WITH sent_mail(toaddr) AS (DELETE FROM timetable.p
78--       46 |       24 |         30 | BUILTIN | Log
79-- (3 rows)
80
81END;
82$$
83LANGUAGE PLPGSQL;

6.3. Download, Transform and Import

This sample demonstrates how to create enhanced three-step chain with parameters. It uses DO statement to directly update the timetable schema tables.

 1-- Prepare the destination table 'location'
 2CREATE TABLE IF NOT EXISTS city(
 3    city text,
 4    lat numeric,
 5    lng numeric,
 6    country text,
 7    iso2 text,
 8    admin_name text,
 9    capital text,
10    population bigint,
11    population_proper bigint);
12
13-- An enhanced example consisting of three tasks:
14-- 1. Download text file from internet using BUILT-IN command
15-- 2. Remove accents (diacritic signs) from letters using PROGRAM command (can be done with `unaccent` PostgreSQL extension) 
16-- 3. Import text file as CSV file using BUILT-IN command (can be down with `psql -c /copy`)
17DO $$
18DECLARE
19    v_head_id bigint;
20    v_task_id bigint;
21    v_chain_id bigint;
22BEGIN
23    -- Create the chain with default values executed every minute (NULL == '* * * * *' :: timetable.cron)
24    INSERT INTO timetable.chain (chain_name, live)
25    VALUES ('Download locations and aggregate', TRUE)
26    RETURNING chain_id INTO v_chain_id;
27
28    -- Step 1. Download file from the server
29    -- Create the chain
30    INSERT INTO timetable.task (chain_id, task_order, kind, command, ignore_error)
31    VALUES (v_chain_id, 1, 'BUILTIN', 'Download', TRUE)
32    RETURNING task_id INTO v_task_id;
33
34    -- Create the parameters for the step 1:
35    INSERT INTO timetable.parameter (task_id, order_id, value)
36        VALUES (v_task_id, 1, 
37           '{
38                "workersnum": 1,
39                "fileurls": ["https://simplemaps.com/static/data/country-cities/mt/mt.csv"], 
40                "destpath": "."
41            }'::jsonb);
42    
43    RAISE NOTICE 'Step 1 completed. Chain added with ID: %; DownloadFile task added with ID: %', v_chain_id, v_task_id;
44
45    -- Step 2. Transform Unicode characters into ASCII
46    -- Create the program task to call 'uconv' and name it 'unaccent'
47    INSERT INTO timetable.task (chain_id, task_order, kind, command, ignore_error, task_name)
48    VALUES (v_chain_id, 2, 'PROGRAM', 'uconv', TRUE, 'unaccent')
49    RETURNING task_id INTO v_task_id;
50
51    -- Create the parameters for the 'unaccent' task. Input and output files in this case
52    -- Under Windows we should call PowerShell instead of "uconv" with command:
53    -- Set-content "orte_ansi.txt" ((Get-content "orte.txt").Normalize("FormD") -replace '\p{M}', '')
54    INSERT INTO timetable.parameter (task_id, order_id, value)
55        VALUES (v_task_id, 1, '["-x", "Latin-ASCII", "-o", "mt_ansi.csv", "mt.csv"]'::jsonb);
56
57    RAISE NOTICE 'Step 2 completed. Unacent task added with ID: %', v_task_id;
58
59    -- Step 3. Import ASCII file to PostgreSQL table using "CopyFromFile" built-in command
60    INSERT INTO timetable.task (chain_id, task_order, kind, command)
61        VALUES (v_chain_id, 3, 'BUILTIN', 'CopyFromFile')
62    RETURNING task_id INTO v_task_id;
63
64    -- Add the parameters for the download task. Execute client side COPY to 'location' from 'orte_ansi.txt'
65    INSERT INTO timetable.parameter (task_id, order_id, value)
66        VALUES (v_task_id, 1, '{"sql": "COPY city FROM STDIN (FORMAT csv, HEADER true)", "filename": "mt_ansi.csv" }'::jsonb);
67
68    RAISE NOTICE 'Step 3 completed. Import task added with ID: %', v_task_id;
69
70    INSERT INTO timetable.task (chain_id, task_order, kind, command, ignore_error, task_name)
71    VALUES (v_chain_id, 4, 'PROGRAM', 'bash', TRUE, 'remove .csv')
72    RETURNING task_id INTO v_task_id;
73
74    INSERT INTO timetable.parameter (task_id, order_id, value)
75    VALUES (v_task_id, 1, '["-c", "rm *.csv"]'::jsonb);
76END;
77$$ LANGUAGE PLPGSQL;

6.4. Run tasks in autonomous transaction

This sample demonstrates how to run special tasks out of chain transaction context. This is useful for special routines and/or non-transactional operations, e.g. CREATE DATABASE, REINDEX, VACUUM, CREATE TABLESPACE, etc.

 1-- An advanced example showing how to use atutonomous tasks.
 2-- This one-task chain will execute test_proc() procedure.
 3-- Since procedure will make two commits (after f1() and f2())
 4-- we cannot use it as a regular task, because all regular tasks 
 5-- must be executed in the context of a single chain transaction.
 6-- Same rule applies for some other SQL commands, 
 7-- e.g. CREATE DATABASE, REINDEX, VACUUM, CREATE TABLESPACE, etc.
 8CREATE OR REPLACE FUNCTION f (msg TEXT) RETURNS void AS $$
 9BEGIN 
10    RAISE notice '%', msg; 
11END;
12$$ LANGUAGE PLPGSQL;
13
14CREATE OR REPLACE PROCEDURE test_proc () AS $$
15BEGIN
16    PERFORM f('hey 1');
17    COMMIT;
18    PERFORM f('hey 2');
19    COMMIT;
20END;
21$$
22LANGUAGE PLPGSQL;
23
24WITH
25    cte_chain (v_chain_id) AS (
26        INSERT INTO timetable.chain (chain_name, run_at, max_instances, live, self_destruct) 
27        VALUES (
28            'call proc() every 10 sec', -- chain_name, 
29            '@every 10 seconds',        -- run_at,
30            1,     -- max_instances, 
31            TRUE,  -- live, 
32            FALSE -- self_destruct
33        ) RETURNING chain_id
34    ),
35    cte_task(v_task_id) AS (
36        INSERT INTO timetable.task (chain_id, task_order, kind, command, ignore_error, autonomous)
37        SELECT v_chain_id, 10, 'SQL', 'CALL test_proc()', TRUE, TRUE
38        FROM cte_chain
39        RETURNING task_id
40    )
41SELECT v_chain_id, v_task_id FROM cte_task, cte_chain;

6.5. Shutdown the scheduler and terminate the session

This sample demonstrates how to shutdown the scheduler using special built-in task. This can be used to control maintenance windows, to restart the scheduler for update purposes, or to stop session before the database should be dropped.

 1-- This one-task chain (aka job) will terminate pg_timetable session.
 2-- This is useful for maintaining purposes or before database being destroyed.
 3-- One should take care of restarting pg_timetable if needed.
 4
 5SELECT timetable.add_job (
 6    job_name     => 'Shutdown pg_timetable session on schedule',
 7    job_schedule => '* * 1 * *',
 8    job_command  => 'Shutdown',
 9    job_kind     => 'BUILTIN'
10);

6.6. Access previous task result code and output from the next task

This sample demonstrates how to check the result code and output of a previous task. If the last task failed, that is possible only if ignore_error boolean = true is set for that task. Otherwise, a scheduler will stop the chain. This sample shows how to calculate failed, successful, and the total number of tasks executed. Based on these values, we can calculate the success ratio.

 1WITH 
 2    cte_chain (v_chain_id) AS ( -- let's create a new chain and add tasks to it later
 3        INSERT INTO timetable.chain (chain_name, run_at, max_instances, live) 
 4        VALUES ('many tasks', '* * * * *', 1, true)
 5        RETURNING chain_id
 6    ),
 7    cte_tasks(v_task_id) AS ( -- now we'll add 500 tasks to the chain, some of them will fail
 8        INSERT INTO timetable.task (chain_id, task_order, kind, command, ignore_error)
 9        SELECT v_chain_id, g.s, 'SQL', 'SELECT 1.0 / round(random())::int4;', true
10        FROM cte_chain, generate_series(1, 500) AS g(s)
11        RETURNING task_id
12    ),
13    report_task(v_task_id) AS ( -- and the last reporting task will calculate the statistic
14        INSERT INTO timetable.task (chain_id, task_order, kind, command)
15        SELECT v_chain_id, 501, 'SQL', $CMD$DO
16$$
17DECLARE
18    s TEXT;
19BEGIN
20    WITH report AS (
21        SELECT 
22        count(*) FILTER (WHERE returncode = 0) AS success,
23        count(*) FILTER (WHERE returncode != 0) AS fail,
24        count(*) AS total
25        FROM timetable.execution_log 
26        WHERE chain_id = current_setting('pg_timetable.current_chain_id')::bigint
27          AND txid = txid_current()
28    )
29    SELECT 'Tasks executed:' || total || 
30         '; succeeded: ' || success || 
31         '; failed: ' || fail || 
32         '; ratio: ' || 100.0*success/GREATEST(total,1)
33    INTO s
34    FROM report;
35    RAISE NOTICE '%', s;
36END;
37$$
38$CMD$
39        FROM cte_chain
40        RETURNING task_id
41    )
42SELECT v_chain_id FROM cte_chain