
# PostgreSQL aggregation

> Perform real-time aggregations directly in PostgreSQL using database triggers

<Note>
  This feature is experimental and may change in future releases.
</Note>

## Overview

The PostgreSQL aggregation sink enables real-time aggregations directly in PostgreSQL using database triggers. Data flows into a landing table, and a trigger function automatically maintains aggregated values in a separate aggregation table.

This is useful for:

* Account balances
* Liquidity pools
* Running totals
* Trade statistics
* Event counts by category

## How it works

1. **Landing table**: Stores raw records in append-only mode with a composite primary key (`primary_key` + `_gs_op`)
2. **Aggregation table**: Stores aggregated results, updated incrementally by a PostgreSQL trigger
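3. **Trigger function**: Automatically generated. An `AFTER INSERT ... FOR EACH STATEMENT` trigger runs it once per insert statement on the landing table, and it updates the aggregations from all rows that statement inserted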

```text theme={null}
Source Data → Landing Table → Trigger → Aggregation Table
```

## Configuration

```yaml theme={null}
sinks:
  account_balances:
    type: postgres_aggregate
    from: transfers
    schema: public
    landing_table: transfer_log
    agg_table: account_balances
    primary_key: transfer_id
    secret_name: MY_POSTGRES
    group_by:
      account:
        type: text
    aggregate:
      balance:
        from: amount
        fn: sum
```

## Parameters

<ParamField path="type" type="string" required>
  Must be `postgres_aggregate`
</ParamField>

<ParamField path="from" type="string" required>
  The transform or source to read data from
</ParamField>

<ParamField path="schema" type="string" required>
  PostgreSQL schema name (e.g., `public`, `analytics`)
</ParamField>

<ParamField path="landing_table" type="string" required>
  Table name for raw records. Created automatically if it doesn't exist.
</ParamField>

<ParamField path="agg_table" type="string" required>
  Table name for aggregated results. Created automatically if it doesn't exist.
</ParamField>

<ParamField path="primary_key" type="string" required>
  Column that uniquely identifies each record. Combined with `_gs_op` to form the landing table's composite primary key, which is used for deduplication.
</ParamField>

<ParamField path="secret_name" type="string" required>
  Name of a Goldsky secret holding the PostgreSQL connection details. Uses the same format as the [PostgreSQL sink](/turbo-pipelines/sinks/postgres#secret-format).
</ParamField>

<ParamField path="group_by" type="object">
  Columns to group by. Omit for global aggregations.
</ParamField>

<ParamField path="aggregate" type="object" required>
  Aggregation columns. At least one is required.
</ParamField>

<ParamField path="batch_size" type="integer">
  Maximum number of rows to accumulate before flushing to the landing table. Falls back to the engine default when unset.
</ParamField>

<ParamField path="batch_flush_interval" type="string">
  Maximum time to wait before flushing a partial batch, parsed as a [humantime](https://docs.rs/humantime/latest/humantime/fn.parse_duration.html) duration (for example, `"500ms"`, `"1s"`, `"2s"`). Falls back to the engine default when unset.
</ParamField>
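
The two batching parameters act together: a batch is written when it reaches `batch_size` rows or when it has waited `batch_flush_interval`, whichever comes first. The Python sketch below models that size-or-time policy. It is an illustration under stated assumptions, not the engine's code: the `Batcher` class and its methods are hypothetical, the interval is assumed to be measured from the batch's first row, and time is passed in explicitly instead of being read from a clock.

```python
from dataclasses import dataclass, field
from typing import Optional


@dataclass
class Batcher:
    """Hypothetical size-or-time flush policy; not the engine's actual code."""

    batch_size: int
    flush_interval_s: float
    rows: list = field(default_factory=list)
    started_at: Optional[float] = None  # arrival time of the first row in the open batch
    flushed: list = field(default_factory=list)  # each entry models one write to the landing table

    def add(self, row: dict, now: float) -> None:
        if not self.rows:
            self.started_at = now
        self.rows.append(row)
        if len(self.rows) >= self.batch_size:  # size limit reached
            self.flush()

    def tick(self, now: float) -> None:
        # Flush a partial batch once it has waited for the full interval.
        if self.rows and now - self.started_at >= self.flush_interval_s:
            self.flush()

    def flush(self) -> None:
        self.flushed.append(self.rows)
        self.rows = []
        self.started_at = None


b = Batcher(batch_size=3, flush_interval_s=0.5)  # like batch_size: 3, batch_flush_interval: "500ms"
for i, t in enumerate([0.0, 0.1, 0.2, 0.3]):
    b.add({"id": i}, now=t)
b.tick(now=0.9)  # the fourth row has waited 0.6 s, longer than the interval
print([len(batch) for batch in b.flushed])  # [3, 1]
```

If each flush becomes a single `INSERT` into the landing table (an assumption), the statement-level trigger runs once per flush, so larger batches mean fewer, larger aggregation passes.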

### Group by columns

Each `group_by` column accepts:

<ParamField path="from" type="string">
  Source column name. Defaults to the key name if omitted.
</ParamField>

<ParamField path="type" type="string">
  PostgreSQL type override (e.g., `varchar(100)`, `text`)
</ParamField>

### Aggregate columns

Each aggregate column accepts:

<ParamField path="fn" type="string" required>
  Aggregation function: `sum`, `count`, `avg`, `min`, or `max`
</ParamField>

<ParamField path="from" type="string">
  Source column name. Defaults to the key name if omitted. Not required for `count`.
</ParamField>

<ParamField path="type" type="string">
  PostgreSQL type override (e.g., `numeric(30,5)`)
</ParamField>

## Supported aggregation functions

The trigger inspects each record's `_gs_op` column (`i` = insert, `u` = update, `d` = delete) to decide how to contribute the value to the running aggregate.

| Function | Supports inserts | Supports deletes | Supports updates | Notes                                             |
| -------- | ---------------- | ---------------- | ---------------- | ------------------------------------------------- |
| `sum`    | Yes              | Yes              | No               | Updates treated as new values                     |
| `count`  | Yes              | Yes              | Yes              | Inserts add +1, updates add 0, deletes add -1     |
| `avg`    | Yes              | Yes              | No               | Stored as `<name>_sum` and `<name>_count` columns |
| `min`    | Yes              | No               | No               | Use only with insert-only streams                 |
| `max`    | Yes              | No               | No               | Use only with insert-only streams                 |

<Warning>
  **`sum` and `avg` do not support updates**: These functions cannot correctly handle updates because they would need the old value to compute the delta. The trigger treats an update like an insert and adds the new value on top of the previous aggregate, double-counting the row. Use only with insert/delete streams.

  **`min` and `max` do not support deletes or updates**: These functions cannot retract values without rescanning the entire landing table. Use only with insert-only streams.
</Warning>
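
To make the table concrete, the Python sketch below models how each function folds `(_gs_op, value)` rows into a stored aggregate. It is a simplified model inferred from the table and warning above, not the generated trigger; the `fold_*` helpers are hypothetical. Where a function cannot handle an operation, the model raises an error to flag it, whereas the real trigger does not reject such rows and silently produces a wrong result. The first example reproduces the update problem: `sum` adds the updated value on top of the original.

```python
# Hypothetical model of how each aggregate folds landing-table rows, keyed on `_gs_op`.
# Rows are (op, value) pairs; op is "i" (insert), "u" (update) or "d" (delete).

COUNT_DELTA = {"i": 1, "u": 0, "d": -1}  # count: inserts +1, updates 0, deletes -1


def fold_sum(total, rows):
    # Deletes subtract and everything else adds, so an update counts as a new value.
    for op, value in rows:
        total += -value if op == "d" else value
    return total


def fold_count(n, rows):
    return n + sum(COUNT_DELTA[op] for op, _ in rows)


def fold_avg(avg_sum, avg_count, rows):
    # avg is stored as two columns (<name>_sum, <name>_count); read it as sum / count.
    for op, value in rows:
        if op == "u":
            raise ValueError("avg does not support updates")
        sign = -1 if op == "d" else 1
        avg_sum += sign * value
        avg_count += sign
    return avg_sum, avg_count


def fold_extreme(current, rows, pick):
    # min/max cannot retract a value, so only inserts are accepted; pick is min or max.
    for op, value in rows:
        if op != "i":
            raise ValueError("min/max cannot retract or revise a value")
        current = value if current is None else pick(current, value)
    return current


# One record inserted with 100, then updated to 120: the correct total is 120.
print(fold_sum(0, [("i", 100), ("u", 120)]))    # 220: the update is double-counted
print(fold_count(0, [("i", 100), ("u", 120)]))  # 1
s, c = fold_avg(0, 0, [("i", 10), ("i", 20), ("d", 10)])
print(s / c)                                     # 20.0
print(fold_extreme(None, [("i", 3), ("i", 7), ("i", 5)], max))  # 7
```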

## Deduplication

The landing table uses append-only mode with a composite primary key (`primary_key` + `_gs_op`). This enables:

* **Deduplication within checkpoint window**: Duplicate records (same primary key and operation type) arriving within the same checkpoint epoch are deduplicated via upsert. Records with the same key but different operations (e.g., insert and delete) are stored separately.
* **Checkpoint-based truncation**: The landing table includes a `_gs_checkpoint_epoch` column. When a checkpoint is finalized, data from previous epochs is deleted, keeping the landing table small.

<Note>
  Deduplication only works within the checkpoint window. Records arriving in different checkpoint epochs are not deduplicated.
</Note>
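
The deduplication rules can be modeled with a dictionary keyed on `(primary_key, _gs_op)`. In the hypothetical Python sketch below, `write` stands in for the upsert into the landing table and returns only newly inserted rows, on the assumption (implied by the deduplication guarantee) that a duplicate does not reach the aggregation a second time. `finalize_checkpoint` stands in for the epoch cleanup. The class, its methods, and the `id` key column are illustrative assumptions. The last example shows why the key must be unique per record: two different records that share a key collapse into one.

```python
class LandingTable:
    """Hypothetical model of the landing table's dedup key and epoch cleanup."""

    def __init__(self):
        self.epoch = 0
        self.rows = {}  # (primary_key, _gs_op) -> (epoch, record)

    def write(self, batch):
        """Upsert a batch and return only the rows that were newly inserted."""
        inserted = []
        for record in batch:
            key = (record["id"], record["_gs_op"])
            if key not in self.rows:
                inserted.append(record)  # assumed to be the rows the trigger aggregates
            self.rows[key] = (self.epoch, record)  # a duplicate overwrites instead of appending
        return inserted

    def finalize_checkpoint(self):
        """Advance to a new epoch and delete rows from earlier epochs."""
        self.epoch += 1
        self.rows = {k: v for k, v in self.rows.items() if v[0] >= self.epoch}


landing = LandingTable()
t = {"id": "t1", "_gs_op": "i", "amount": 5}
print(len(landing.write([t])))                     # 1: new row
print(len(landing.write([t])))                     # 0: same key and op, same epoch
print(len(landing.write([{**t, "_gs_op": "d"}])))  # 1: same key, different op
landing.finalize_checkpoint()
print(len(landing.write([t])))                     # 1: earlier copy was deleted, so not deduplicated

# Two different records that share a key collapse into one.
a = {"id": "k", "_gs_op": "i", "amount": 5}
b = {"id": "k", "_gs_op": "i", "amount": 7}
print(len(landing.write([a, b])))                  # 1: only `a` is aggregated
```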

## Examples

### Multiple aggregations with type override

```yaml theme={null}
sinks:
  market_stats:
    type: postgres_aggregate
    from: trades
    schema: trading
    landing_table: trade_log
    agg_table: market_stats
    primary_key: trade_id
    secret_name: MY_POSTGRES
    group_by:
      market:
        type: varchar(100)
    aggregate:
      trade_count:
        fn: count
      total_volume:
        from: volume
        fn: sum
        type: numeric(30,5)
      avg_price:
        from: price
        fn: avg
```

### Global aggregation (no group by)

For aggregations across all records without grouping:

```yaml theme={null}
sinks:
  totals:
    type: postgres_aggregate
    from: events
    schema: analytics
    landing_table: event_log
    agg_table: global_totals
    primary_key: event_id
    secret_name: MY_POSTGRES
    aggregate:
      total_count:
        fn: count
```

When no `group_by` is defined, a sentinel `key` column is added automatically to the aggregation table.

### Column renaming

Use `from` to map source columns to different output names:

```yaml theme={null}
sinks:
  balances:
    type: postgres_aggregate
    from: transfers
    schema: public
    landing_table: transfer_log
    agg_table: balances
    primary_key: id
    secret_name: MY_POSTGRES
    group_by:
      wallet:
        from: account_address
    aggregate:
      total:
        from: amount
        fn: sum
```

## Generated SQL

The sink creates the trigger function and the trigger automatically. For the balance aggregation in [Configuration](#configuration), the generated SQL looks like this:

```sql theme={null}
CREATE OR REPLACE FUNCTION "public"."transfer_log_to_account_balances_fn"()
RETURNS TRIGGER
LANGUAGE plpgsql
AS $$
BEGIN
  INSERT INTO "public"."account_balances" ("account", "balance")
  SELECT new_table."account" AS "account",
         SUM(CASE WHEN new_table."_gs_op" = 'd' THEN -new_table."amount"
                  ELSE new_table."amount" END) AS "balance"
  FROM new_table
  GROUP BY new_table."account"
  ON CONFLICT ("account")
  DO UPDATE SET "balance" = "public"."account_balances"."balance" + EXCLUDED."balance";
  RETURN NULL;
END;
$$;

DROP TRIGGER IF EXISTS "transfer_log_to_account_balances_trigger" ON "public"."transfer_log";
CREATE TRIGGER "transfer_log_to_account_balances_trigger"
AFTER INSERT ON "public"."transfer_log"
REFERENCING NEW TABLE AS new_table
FOR EACH STATEMENT
EXECUTE FUNCTION "public"."transfer_log_to_account_balances_fn"();
```
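
The trigger aggregates the whole batch in `new_table` first and then merges each group into the stored row. The Python sketch below mirrors that arithmetic for the `balance` example: a `GROUP BY account` over the batch with deletes negated, followed by an upsert that adds `EXCLUDED.balance` to the existing value. It illustrates the SQL above and is not code that Goldsky runs; `run_trigger` is a hypothetical name.

```python
from collections import defaultdict


def run_trigger(agg_table: dict, new_table: list) -> None:
    """Mirror of transfer_log_to_account_balances_fn for one INSERT statement."""
    # SELECT account, SUM(CASE WHEN _gs_op = 'd' THEN -amount ELSE amount END) ... GROUP BY account
    batch = defaultdict(int)
    for row in new_table:
        batch[row["account"]] += -row["amount"] if row["_gs_op"] == "d" else row["amount"]
    # ON CONFLICT (account) DO UPDATE SET balance = balance + EXCLUDED.balance
    for account, delta in batch.items():
        agg_table[account] = agg_table.get(account, 0) + delta


balances = {}
run_trigger(balances, [
    {"account": "alice", "amount": 50, "_gs_op": "i"},
    {"account": "alice", "amount": 20, "_gs_op": "i"},
    {"account": "bob", "amount": 5, "_gs_op": "i"},
])
run_trigger(balances, [{"account": "alice", "amount": 20, "_gs_op": "d"}])
print(balances)  # {'alice': 50, 'bob': 5}
```

Because the upsert uses `ON CONFLICT ("account")`, PostgreSQL requires a unique constraint or unique index on the `group_by` columns of the aggregation table. Keep this in mind if you create that table yourself instead of letting the sink create it.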

## Complete pipeline example

```yaml theme={null}
name: token-balances
resource_size: m
description: Track token balances from transfer events

sources:
  transfers:
    type: dataset
    dataset_name: ethereum.erc20_transfers
    version: 1.2.0
    start_at: latest

transforms:
  usdc_transfers:
    type: sql
    primary_key: id
    sql: |
      -- Credits each recipient only; outgoing transfers are not subtracted here.
      SELECT
        id,
        lower(recipient) as account,
        CAST(amount AS BIGINT) as amount
      FROM transfers
      WHERE lower(address) = '0xa0b86991c6218b36c1d19d4a2e9eb0ce3606eb48'

sinks:
  balances:
    type: postgres_aggregate
    from: usdc_transfers
    schema: public
    landing_table: usdc_transfer_log
    agg_table: usdc_balances
    primary_key: id
    secret_name: MY_POSTGRES
    group_by:
      account:
        type: text
    aggregate:
      balance:
        from: amount
        fn: sum
        type: numeric(30,0)
```

## Best practices

<AccordionGroup>
  <Accordion title="Choose the right aggregation function">
    * Use `count` for counting events (supports all operations)
    * Use `sum` for totals when you have insert/delete streams
    * Use `min`/`max` only for insert-only streams
    * Use `avg` only with insert/delete streams; like `sum`, it cannot handle updates
  </Accordion>

  <Accordion title="Use type overrides for precision">
    For financial data, specify explicit numeric precision:

    ```yaml theme={null}
    aggregate:
      balance:
        from: amount
        fn: sum
        type: numeric(30,5)
    ```
  </Accordion>

  <Accordion title="Monitor landing table size">
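    The landing table is cleaned up automatically: rows from earlier checkpoint epochs are deleted when a checkpoint is finalized. Between checkpoints it still accumulates the current epoch's records, so monitor its size during high-throughput periods.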
  </Accordion>

  <Accordion title="Use appropriate primary keys">
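    Choose an identifier that is unique and stable for each record. Rows that share a `primary_key` and `_gs_op` within a checkpoint epoch are deduplicated into one, so a key that is not unique per record silently drops data. For example, a transaction hash does not identify a single ERC-20 transfer, because one transaction can emit several transfers:

    ```yaml theme={null}
    primary_key: id  # Good: unique per transfer event
    # primary_key: transaction_hash  # Risky here: shared by every transfer in the same transaction
    ```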
  </Accordion>
</AccordionGroup>
