This feature is experimental and may change in future releases.
## Overview
The PostgreSQL aggregation sink enables real-time aggregations directly in PostgreSQL using database triggers. Data flows into a landing table, and a trigger function automatically maintains aggregated values in a separate aggregation table. This is useful for:

- Account balances
- Liquidity pools
- Running totals
- Trade statistics
- Event counts by category
## How it works
- Landing table: Stores raw records in append-only mode with a composite primary key (`primary_key` + `_gs_op`)
- Aggregation table: Stores aggregated results, updated incrementally by a PostgreSQL trigger
- Trigger function: Automatically generated and executed on each insert to update aggregations
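Concretely, the three pieces above are ordinary PostgreSQL objects. A minimal sketch for an event-count aggregation (all table and column names here are hypothetical, except `_gs_op`):

```sql
-- Landing table: append-only raw records with a composite primary key.
CREATE TABLE IF NOT EXISTS events_landing (
    event_id text NOT NULL,   -- the configured primary_key column
    _gs_op   text NOT NULL,   -- operation type (insert, delete, ...)
    category text NOT NULL,
    PRIMARY KEY (event_id, _gs_op)
);

-- Aggregation table: incrementally maintained results.
CREATE TABLE IF NOT EXISTS events_by_category (
    category    text   PRIMARY KEY,
    event_count bigint NOT NULL DEFAULT 0
);
```

A generated trigger then fires after each insert into `events_landing` and upserts the corresponding row in `events_by_category`.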
## Configuration

### Parameters
- Must be `postgres_aggregate`
- The transform or source to read data from
- PostgreSQL schema name (e.g., `public`, `analytics`)
- Table name for raw records. Created automatically if it doesn’t exist.
- Table name for aggregated results. Created automatically if it doesn’t exist.
- Column to use for deduplication in the landing table
- Name of the secret containing the PostgreSQL connection string
- Columns to group by. Omit for global aggregations.
- Aggregation columns. At least one is required.
### Group by columns

Each group by column can have:

- Source column name. Defaults to the key name if omitted.
- PostgreSQL type override (e.g., `varchar(100)`, `text`)

### Aggregate columns

Each aggregate column requires:

- Aggregation function: `sum`, `count`, `avg`, `min`, or `max`
- Source column name. Defaults to the key name if omitted. Not required for `count`.
- PostgreSQL type override (e.g., `numeric(30,5)`)

### Supported aggregation functions
| Function | Supports inserts | Supports deletes | Supports updates | Notes |
|---|---|---|---|---|
| `sum` | Yes | Yes | No | Updates treated as new values |
| `count` | Yes | Yes | Yes | Correctly handles all operations |
| `avg` | Yes | Yes | No | Stored as `_sum` and `_count` columns |
| `min` | Yes | No | No | Use only with insert-only streams |
| `max` | Yes | No | No | Use only with insert-only streams |
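Because `avg` is persisted as separate `_sum` and `_count` columns, the average itself is computed at read time. For example (table and column names hypothetical):

```sql
-- Reconstruct the average from the stored _sum and _count columns,
-- guarding against division by zero for empty groups.
SELECT symbol,
       price_sum / NULLIF(price_count, 0) AS price_avg
FROM trade_stats;
```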
## Deduplication

The landing table uses append-only mode with a composite primary key (`primary_key` + `_gs_op`). This enables:
- Deduplication within checkpoint window: Duplicate records (same primary key and operation type) arriving within the same checkpoint epoch are deduplicated via upsert. Records with the same key but different operations (e.g., insert and delete) are stored separately.
- Checkpoint-based truncation: The landing table includes a `_gs_checkpoint_epoch` column. When a checkpoint is finalized, data from previous epochs is deleted, keeping the landing table small.
Deduplication only works within the checkpoint window. Records arriving in different checkpoint epochs are not deduplicated.
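In PostgreSQL terms, both behaviors are plain SQL. A sketch with hypothetical table and column names (only `_gs_op` and `_gs_checkpoint_epoch` come from the description above):

```sql
-- Within an epoch, a duplicate (same key and operation type) is upserted,
-- so the landing table keeps one row per (id, _gs_op) pair.
INSERT INTO balances_landing (id, _gs_op, amount, _gs_checkpoint_epoch)
VALUES ('tx-1', 'insert', 100, 42)
ON CONFLICT (id, _gs_op) DO UPDATE
SET amount = EXCLUDED.amount,
    _gs_checkpoint_epoch = EXCLUDED._gs_checkpoint_epoch;

-- Once checkpoint epoch 42 is finalized, older epochs are dropped
-- to keep the landing table small.
DELETE FROM balances_landing WHERE _gs_checkpoint_epoch < 42;
```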
## Examples

### Multiple aggregations with type override
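In schema terms, a sink that groups trades by symbol and computes a `sum` with a `numeric(30,5)` override, a `count`, and an `avg` would maintain an aggregation table like this sketch (all names hypothetical):

```sql
-- Illustrative result of multiple aggregations with a type override.
-- avg is stored as separate _sum and _count columns, as noted above.
CREATE TABLE IF NOT EXISTS trade_stats (
    symbol      text          PRIMARY KEY,                  -- group-by key
    amount_sum  numeric(30,5) NOT NULL DEFAULT 0,           -- sum, overridden type
    trade_count bigint        NOT NULL DEFAULT 0,           -- count
    price_sum   numeric       NOT NULL DEFAULT 0,           -- avg numerator
    price_count bigint        NOT NULL DEFAULT 0            -- avg denominator
);
```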
### Global aggregation (no group by)
For aggregations across all records, omit `group_by`. When no `group_by` is defined, a sentinel key column is added automatically to the aggregation table.
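A sketch of the resulting single-row aggregation table for a global event count; the sentinel column's real name is generated by the sink, so `_gs_key` here is purely hypothetical:

```sql
-- Illustrative aggregation table for a global (ungrouped) aggregation.
-- The sentinel key column keeps the single-row table addressable by
-- primary key, which the trigger's upserts require.
CREATE TABLE IF NOT EXISTS event_totals (
    _gs_key     text   PRIMARY KEY DEFAULT 'all',  -- hypothetical sentinel name
    event_count bigint NOT NULL DEFAULT 0
);
```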
### Column renaming
Use `from` to map source columns to different output names:
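For instance (hypothetical names), a group-by key declared as `account` with `from` set to `account_id` reads the source column `account_id` but stores results under the new name:

```sql
-- Illustrative aggregation table produced by the renamed key: the output
-- column is "account" even though values come from source column "account_id".
CREATE TABLE IF NOT EXISTS balances (
    account     text    PRIMARY KEY,   -- renamed via from: account_id
    balance_sum numeric NOT NULL DEFAULT 0
);
```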
## Generated SQL

The sink automatically creates the trigger function and trigger, so you never write this SQL by hand. For a balance aggregation, it generates a trigger function that applies each inserted amount to the account’s running balance.

### Complete pipeline example
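Putting the pieces together for a balance aggregation, the generated database objects might look like the following sketch (all table, column, and function names are hypothetical; the sink creates its own equivalents automatically):

```sql
-- 1. Landing table: raw records with composite primary key for deduplication.
CREATE TABLE IF NOT EXISTS balances_landing (
    transaction_id       text          NOT NULL,  -- configured primary_key
    _gs_op               text          NOT NULL,  -- operation type
    _gs_checkpoint_epoch bigint        NOT NULL,  -- used for truncation
    account_id           text          NOT NULL,
    amount               numeric(30,5) NOT NULL,
    PRIMARY KEY (transaction_id, _gs_op)
);

-- 2. Aggregation table: one row per account, maintained by the trigger.
CREATE TABLE IF NOT EXISTS balances (
    account_id  text          PRIMARY KEY,
    balance_sum numeric(30,5) NOT NULL DEFAULT 0
);

-- 3. Trigger function: applies each landing-table insert incrementally.
--    Deletes subtract, since sum supports deletes (see the table above).
CREATE OR REPLACE FUNCTION balances_aggregate() RETURNS trigger AS $$
BEGIN
    INSERT INTO balances (account_id, balance_sum)
    VALUES (NEW.account_id,
            CASE WHEN NEW._gs_op = 'delete' THEN -NEW.amount
                 ELSE NEW.amount END)
    ON CONFLICT (account_id) DO UPDATE
    SET balance_sum = balances.balance_sum + EXCLUDED.balance_sum;
    RETURN NEW;
END;
$$ LANGUAGE plpgsql;

-- 4. Trigger: fires on every insert into the landing table.
CREATE TRIGGER balances_aggregate_trg
AFTER INSERT ON balances_landing
FOR EACH ROW EXECUTE FUNCTION balances_aggregate();
```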
## Best practices

### Choose the right aggregation function

- Use `count` for counting events (supports all operations)
- Use `sum` for totals when you have insert/delete streams
- Use `min`/`max` only for insert-only streams
- Avoid `avg` with updates; use `sum` and `count` separately if needed
### Use type overrides for precision

For financial data, specify explicit numeric precision:
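As a sketch (names hypothetical), a `numeric(30,5)` override yields an aggregation column with an explicit scale rather than PostgreSQL’s unconstrained `numeric`:

```sql
CREATE TABLE IF NOT EXISTS balances (
    account_id  text          PRIMARY KEY,
    -- 30 total digits, 5 after the decimal point: exact arithmetic for money
    balance_sum numeric(30,5) NOT NULL DEFAULT 0
);
```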
### Monitor landing table size

The landing table is automatically truncated after checkpoint finalization, but monitor its size during high-throughput periods.
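A quick way to watch it, using PostgreSQL’s built-in size functions (table name hypothetical):

```sql
-- Total on-disk size of the landing table, including indexes and TOAST.
SELECT pg_size_pretty(pg_total_relation_size('public.balances_landing'));
```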
### Use appropriate primary keys

Choose a unique, stable identifier for the `primary_key` to ensure proper deduplication.