datatable_*

Data tables are wick's shared key/value store. Each data table is a named rowset with a typed schema, declared in the Data Tables tab and bound to a workflow through that workflow's data_tables: block. Seven node types share one executor:

| Type | Operation |
| --- | --- |
| datatable_get | Load one row by primary key. Branches on found / not_found. |
| datatable_exists | Check whether any row matches. Branches on true / false. |
| datatable_query | Multi-row search with where / order_by / limit. |
| datatable_count | Count rows matching where without loading them. |
| datatable_insert | Insert a new row; fails on PK conflict. |
| datatable_upsert | Insert or update by primary key. Returns action: insert or update. |
| datatable_delete | Delete rows matching where. |

Source: internal/agents/workflow/nodes/datatable.go

Engine: In-memory by default; see the data tables design doc for the Postgres-backed roadmap.

Schema (per type)

datatable_get

| Field | Required | Notes |
| --- | --- | --- |
| table | | Data table alias from the workflow's data_tables: block. |
| key | | Primary key value (templated). |
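
As a rough illustration of the two fields above, a datatable_get node and its outgoing edges might look like the fragment below. It reuses the processed_events table and the template expression from the example later on this page. The node ids (load_event, use_row, handle_missing) are hypothetical, and expressing the found / not_found branches as "case" values on edges is an assumption carried over from how that example routes true / false.

```json
{
  "nodes": [
    {
      "id": "load_event",
      "type": "datatable_get",
      "table": "processed_events",
      "key": "{{index .Event.Payload \"id\"}}"
    }
  ],
  "edges": [
    {"from": "load_event", "to": "use_row",        "case": "found"},
    {"from": "load_event", "to": "handle_missing", "case": "not_found"}
  ]
}
```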

datatable_exists, datatable_delete

| Field | Required | Notes |
| --- | --- | --- |
| table | | |
| where | | YAML map of field equality filters (templated). |

datatable_query

| Field | Required | Notes |
| --- | --- | --- |
| table | | |
| where | | Equality filters. |
| order_by | | Column name (prefix - for DESC). |
| limit | | Row cap. |
| offset | | Skip rows. |
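
A sketch of how these fields could combine in one node, assuming limit and offset take plain integers and that the table has a status column (both are assumptions for illustration; the page's own example only shows event_id and processed_at). The node id recent_done is hypothetical. The leading - on order_by requests descending order, so this would return the 20 most recently processed matching rows.

```json
{
  "id": "recent_done",
  "type": "datatable_query",
  "table": "processed_events",
  "where": {"status": "done"},
  "order_by": "-processed_at",
  "limit": 20,
  "offset": 0
}
```

Raising offset in steps of limit (20, 40, and so on) would page through the remaining rows under the same ordering.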

datatable_count

| Field | Required | Notes |
| --- | --- | --- |
| table | | |
| where | | Same shape as datatable_query. |

datatable_insert, datatable_upsert

| Field | Required | Notes |
| --- | --- | --- |
| table | | |
| key | | PK value(s) (templated). |
| row_values | | Column → value map (templated). |
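
For comparison with datatable_insert, which fails on a PK conflict, a datatable_upsert node might be written as in the fragment below. This is a sketch only: the node id touch_event is hypothetical, the row_values field name is taken from the table above, and the table, columns, and template expressions are borrowed from the example later on this page.

```json
{
  "id": "touch_event",
  "type": "datatable_upsert",
  "table": "processed_events",
  "key": "{{index .Event.Payload \"id\"}}",
  "row_values": {
    "event_id": "{{index .Event.Payload \"id\"}}",
    "processed_at": "{{now}}"
  }
}
```

Running this node twice for the same event id should not fail the second time; the action output field reports whether the row was inserted or updated.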

Output

| Op | Fields |
| --- | --- |
| datatable_get | row: map[string]any (nil if the not_found branch is taken). |
| datatable_exists | Branches on true / false. |
| datatable_query | rows: []map[string]any, count: int. |
| datatable_count | count: int. |
| datatable_insert | key: string (the inserted primary key). |
| datatable_upsert | action: string, either insert or update. |
| datatable_delete | (no output fields beyond status). |

Example: idempotency by primary key

```json
{
  "data_tables": [
    {"name": "processed_events", "ref": "processed_events", "mode": "read_write"}
  ],
  "graph": {
    "entry": "check_seen",
    "nodes": [
      {
        "id": "check_seen",
        "type": "datatable_exists",
        "table": "processed_events",
        "where": {"event_id": "{{index .Event.Payload \"id\"}}"}
      },
      {"id": "skip", "type": "end", "result": "already_processed"},
      {"id": "handle", "type": "agent", "prompt_file": "nodes/handle.md"},
      {
        "id": "mark_done",
        "type": "datatable_insert",
        "table": "processed_events",
        "key": "{{index .Event.Payload \"id\"}}",
        "row_values": {
          "event_id": "{{index .Event.Payload \"id\"}}",
          "processed_at": "{{now}}"
        }
      }
    ],
    "edges": [
      {"from": "check_seen", "to": "skip",      "case": "true"},
      {"from": "check_seen", "to": "handle",    "case": "false"},
      {"from": "handle",     "to": "mark_done"}
    ]
  }
}
```

Pair with

  • branch — route on datatable_exists verdict or datatable_get found / not_found.
  • schedule_at — write a "fire later" row, then a later trigger reads it.