This feature is experimental and may change in future releases.

Overview

The PostgreSQL aggregation sink enables real-time aggregations directly in PostgreSQL using database triggers. Data flows into a landing table, and a trigger function automatically maintains aggregated values in a separate aggregation table. This is useful for:
  • Account balances
  • Liquidity pools
  • Running totals
  • Trade statistics
  • Event counts by category

How it works

  1. Landing table: Stores raw records in append-only mode with a composite primary key (primary_key + _gs_op)
  2. Aggregation table: Stores aggregated results, updated incrementally by a PostgreSQL trigger
  3. Trigger function: Automatically generated and executed on each insert to update aggregations
Source Data → Landing Table → Trigger → Aggregation Table

Configuration

sinks:
  account_balances:
    type: postgres_aggregate
    from: transfers
    schema: public
    landing_table: transfer_log
    agg_table: account_balances
    primary_key: transfer_id
    secret_name: MY_POSTGRES
    group_by:
      account:
        type: text
    aggregate:
      balance:
        from: amount
        fn: sum
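
Given this configuration, the sink creates tables shaped roughly as follows. This is an illustrative sketch, not the exact generated DDL: the column types and the insert marker 'i' are assumptions (only the delete marker 'd' appears in the generated SQL shown later).

-- Illustrative sketch only; the sink generates the actual DDL
CREATE TABLE "public"."transfer_log" (          -- landing table
  "transfer_id" text NOT NULL,
  "_gs_op" text NOT NULL,                       -- operation marker, e.g. 'd' for delete
  "_gs_checkpoint_epoch" bigint NOT NULL,
  "account" text,
  "amount" numeric,
  PRIMARY KEY ("transfer_id", "_gs_op")
);

CREATE TABLE "public"."account_balances" (      -- aggregation table
  "account" text PRIMARY KEY,
  "balance" numeric
);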

Parameters

  • type (string, required): Must be postgres_aggregate
  • from (string, required): The transform or source to read data from
  • schema (string, required): PostgreSQL schema name (e.g., public, analytics)
  • landing_table (string, required): Table name for raw records. Created automatically if it doesn’t exist.
  • agg_table (string, required): Table name for aggregated results. Created automatically if it doesn’t exist.
  • primary_key (string, required): Column to use for deduplication in the landing table
  • secret_name (string, required): Name of the secret containing the PostgreSQL connection string
  • group_by (object, optional): Columns to group by. Omit for global aggregations.
  • aggregate (object, required): Aggregation columns. At least one is required.

Group by columns

Each group by column can have:
  • from (string, optional): Source column name. Defaults to the key name if omitted.
  • type (string, optional): PostgreSQL type override (e.g., varchar(100), text)

Aggregate columns

Each aggregate column accepts:
  • fn (string, required): Aggregation function: sum, count, avg, min, or max
  • from (string, optional): Source column name. Defaults to the key name if omitted. Not required for count.
  • type (string, optional): PostgreSQL type override (e.g., numeric(30,5))

Supported aggregation functions

Function | Supports inserts | Supports deletes | Supports updates | Notes
sum      | Yes              | Yes              | No               | Updates treated as new values
count    | Yes              | Yes              | Yes              | Correctly handles all operations
avg      | Yes              | Yes              | No               | Stored as _sum and _count columns
min      | Yes              | No               | No               | Use only with insert-only streams
max      | Yes              | No               | No               | Use only with insert-only streams

SUM and AVG do not support updates: these functions cannot correctly handle updates because they would need the old value to compute the delta. Use them only with insert/delete streams.

MIN and MAX do not support deletes or updates: these functions cannot retract values without rescanning the entire landing table. Use them only with insert-only streams.
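
Because avg is maintained as separate sum and count columns, one way to read the average is to divide at query time. A minimal sketch for the market_stats example below, assuming the backing columns follow the _sum/_count convention (the exact generated column names are an assumption):

-- Assumes backing columns avg_price_sum and avg_price_count (illustrative names)
SELECT market,
       avg_price_sum / NULLIF(avg_price_count, 0) AS avg_price
FROM "trading"."market_stats";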

Deduplication

The landing table uses append-only mode with a composite primary key (primary_key + _gs_op). This enables:
  • Deduplication within checkpoint window: Duplicate records (same primary key and operation type) arriving within the same checkpoint epoch are deduplicated via upsert. Records with the same key but different operations (e.g., insert and delete) are stored separately.
  • Checkpoint-based truncation: The landing table includes a _gs_checkpoint_epoch column. When a checkpoint is finalized, data from previous epochs is deleted, keeping the landing table small.
Deduplication only works within the checkpoint window. Records arriving in different checkpoint epochs are not deduplicated.
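
Conceptually, these two behaviors correspond to statements of the following shape. This is a sketch only; the sink issues the real SQL, and the 'i' marker and literal values are illustrative.

-- Within an epoch, a duplicate (same key and op) collapses via upsert:
INSERT INTO "public"."transfer_log"
  ("transfer_id", "_gs_op", "_gs_checkpoint_epoch", "account", "amount")
VALUES ('tx-123', 'i', 7, 'acct-1', 100)
ON CONFLICT ("transfer_id", "_gs_op")
DO UPDATE SET "amount" = EXCLUDED."amount",
              "_gs_checkpoint_epoch" = EXCLUDED."_gs_checkpoint_epoch";

-- After checkpoint epoch 7 is finalized, earlier epochs are removed:
DELETE FROM "public"."transfer_log" WHERE "_gs_checkpoint_epoch" < 7;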

Examples

Multiple aggregations with type override

sinks:
  market_stats:
    type: postgres_aggregate
    from: trades
    schema: trading
    landing_table: trade_log
    agg_table: market_stats
    primary_key: trade_id
    secret_name: MY_POSTGRES
    group_by:
      market:
        type: varchar(100)
    aggregate:
      trade_count:
        fn: count
      total_volume:
        from: volume
        fn: sum
        type: numeric(30,5)
      avg_price:
        from: price
        fn: avg

Global aggregation (no group by)

For aggregations across all records without grouping:
sinks:
  totals:
    type: postgres_aggregate
    from: events
    schema: analytics
    landing_table: event_log
    agg_table: global_totals
    primary_key: event_id
    secret_name: MY_POSTGRES
    aggregate:
      total_count:
        fn: count
When no group_by is defined, a sentinel key column is added automatically to the aggregation table.
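Reading the result is then a single-row query; the sentinel column is an internal detail and can be ignored:

SELECT total_count FROM "analytics"."global_totals";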

Column renaming

Use from to map source columns to different output names:
sinks:
  balances:
    type: postgres_aggregate
    from: transfers
    schema: public
    landing_table: transfer_log
    agg_table: balances
    primary_key: id
    secret_name: MY_POSTGRES
    group_by:
      wallet:
        from: account_address
    aggregate:
      total:
        from: amount
        fn: sum
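
With this mapping, the aggregation table carries the renamed columns, so downstream queries use wallet and total rather than the source names:

SELECT wallet, total
FROM "public"."balances"
ORDER BY total DESC
LIMIT 10;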

Generated SQL

The sink automatically creates the trigger function and trigger. Here’s an example of the generated SQL for a balance aggregation:
CREATE OR REPLACE FUNCTION "public"."transfer_log_to_account_balances_fn"()
RETURNS TRIGGER
LANGUAGE plpgsql
AS $$
BEGIN
  INSERT INTO "public"."account_balances" ("account", "balance")
  SELECT new_table."account" AS "account",
         SUM(CASE WHEN new_table."_gs_op" = 'd' THEN -new_table."amount"
                  ELSE new_table."amount" END) AS "balance"
  FROM new_table
  GROUP BY new_table."account"
  ON CONFLICT ("account")
  DO UPDATE SET "balance" = "public"."account_balances"."balance" + EXCLUDED."balance";
  RETURN NULL;
END;
$$;

DROP TRIGGER IF EXISTS "transfer_log_to_account_balances_trigger" ON "public"."transfer_log";
CREATE TRIGGER "transfer_log_to_account_balances_trigger"
AFTER INSERT ON "public"."transfer_log"
REFERENCING NEW TABLE AS new_table
FOR EACH STATEMENT
EXECUTE FUNCTION "public"."transfer_log_to_account_balances_fn"();
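
Because the trigger is statement-level and uses a transition table (REFERENCING NEW TABLE AS new_table), each batch inserted into the landing table is aggregated in a single pass rather than invoking the function once per row.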

Complete pipeline example

name: token-balances
resource_size: m
description: Track token balances from transfer events

sources:
  transfers:
    type: dataset
    dataset_name: ethereum.erc20_transfers
    version: 1.2.0
    start_at: latest

transforms:
  usdc_transfers:
    type: sql
    primary_key: id
    sql: |
      SELECT
        id,
        lower(recipient) as account,
        CAST(value AS BIGINT) as amount
      FROM transfers
      WHERE lower(contract_address) = '0xa0b86991c6218b36c1d19d4a2e9eb0ce3606eb48'

sinks:
  balances:
    type: postgres_aggregate
    from: usdc_transfers
    schema: public
    landing_table: usdc_transfer_log
    agg_table: usdc_balances
    primary_key: id
    secret_name: MY_POSTGRES
    group_by:
      account:
        type: text
    aggregate:
      balance:
        from: amount
        fn: sum
        type: numeric(30,0)

Best practices

  • Use count for counting events (supports all operations)
  • Use sum for totals when you have insert/delete streams
  • Use min/max only for insert-only streams
  • Avoid avg with updates; use sum and count separately if needed
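For the avg workaround, a minimal sketch that maintains the pieces explicitly (illustrative column names):
aggregate:
  price_sum:
    from: price
    fn: sum
  price_count:
    fn: count
The average is then price_sum / price_count at query time.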
For financial data, specify explicit numeric precision:
aggregate:
  balance:
    from: amount
    fn: sum
    type: numeric(30,5)
The landing table is automatically truncated after checkpoint finalization, but monitor its size during high-throughput periods.
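One way to check the landing table’s footprint, using standard PostgreSQL functions (table name taken from the pipeline example above):
SELECT pg_size_pretty(pg_total_relation_size('public.usdc_transfer_log'));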
Choose a unique, stable identifier for the primary_key to ensure proper deduplication:
primary_key: transaction_hash  # Good: unique per transaction
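By contrast, a column shared by many records breaks deduplication (illustrative field name):
primary_key: block_number  # Bad: many records share a block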