> ## Documentation Index
> Fetch the complete documentation index at: https://docs.goldsky.com/llms.txt
> Use this file to discover all available pages before exploring further.

# Dynamic Tables

> Create real-time lookup tables for filtering and enrichment

Dynamic tables are a powerful feature that allows you to maintain updatable lookup tables within your pipeline. Unlike static SQL transforms, dynamic tables can be modified in real-time without redeploying your pipeline.

## Key Features

* **Real-time Updates**: Add or remove values without pipeline restart by updating a postgres table.
* **SQL Integration**: Use `dynamic_table_check()` function in any SQL transform
* **Pipeline Updates**: Update the table with on-chain data in the same pipeline.

## Use Cases

<CardGroup cols={3}>
  <Card title="Wallet Tracking" icon="wallet">
    Monitor transfers to/from specific wallet addresses
  </Card>

  <Card title="Deduplication" icon="check-check">
    Track processed records to avoid duplicates
  </Card>

  <Card title="Factory Pattern" icon="factory">
    Track contracts created by a factory
  </Card>
</CardGroup>

## Basic Configuration

```yaml theme={null}
transforms:
  my_dynamic_table:
    type: dynamic_table
    backend_type: Postgres
    backend_entity_name: <table_name>
    secret_name: <secret_name> # A Goldsky Postgres secret
    sql: | # Optional - auto-populate from pipeline data
      SELECT column FROM source
    # Optional PostgreSQL customization
    schema: custom_schema # Default: streamling
    column: custom_column # Default: value
    time_column: custom_time # Default: updated_at
```

### Parameters

<ParamField path="type" type="string" required>
  Must be `dynamic_table`
</ParamField>

<ParamField path="backend_type" type="string" required>
  Storage backend: `Postgres`
</ParamField>

<ParamField path="backend_entity_name" type="string" required>
  The table name in the backend storage. For Postgres, this creates a table in
  the `streamling` schema (configurable via the `schema` field).
</ParamField>

<ParamField path="secret_name" type="string" required>
  The name of a Goldsky secret containing Postgres credentials. Required for
  the Postgres backend.
</ParamField>

<ParamField path="sql" type="string">
  Optional. SQL query to automatically populate the table from pipeline data.
</ParamField>

<ParamField path="schema" type="string">
  Optional. PostgreSQL schema name for the table. Defaults to `streamling`.
</ParamField>

<ParamField path="column" type="string">
  Optional. Name of the primary key column storing values. Defaults to `value`.
</ParamField>

<ParamField path="time_column" type="string">
  Optional. Name of the timestamp column. Defaults to `updated_at`.
</ParamField>

## Backend Types

### PostgreSQL Backend (Recommended)

Best for production deployments requiring persistence:

```yaml theme={null}
transforms:
  tracked_contracts:
    type: dynamic_table
    backend_type: Postgres
    backend_entity_name: tracked_contracts
    secret_name: MY_POSTGRES
```

**Benefits:**

* Data persists across pipeline restarts and failures
* Can be updated externally via direct SQL — no redeploy needed
* Indexed primary-key lookups scale to millions of rows

**Table Structure:**

PostgreSQL dynamic tables are created with two columns:

* A primary key column (default: `value`) storing the lookup values
* A timestamp column (default: `updated_at`) automatically set to the insertion time

**Table Location:**
By default, tables are created in the `streamling` schema: `streamling.tracked_contracts`

#### Custom schema and column names

You can customize the schema, column name, and timestamp column name:

```yaml theme={null}
transforms:
  tracked_contracts:
    type: dynamic_table
    backend_type: Postgres
    backend_entity_name: tracked_contracts
    secret_name: MY_POSTGRES
    schema: my_app          # Custom schema (default: streamling)
    column: contract_addr   # Custom column name (default: value)
    time_column: created_at # Custom timestamp column (default: updated_at)
```

This creates a table at `my_app.tracked_contracts`:

```sql theme={null}
CREATE TABLE my_app.tracked_contracts (
  contract_addr TEXT PRIMARY KEY,
  created_at TIMESTAMP WITH TIME ZONE DEFAULT NOW()
);
```

<Tip>
  Use custom schemas to organize dynamic tables by application or environment. For example, use `production` and `staging` schemas to separate data.
</Tip>

## Using Dynamic Tables in SQL

Once defined, use the `dynamic_table_check()` function in SQL transforms:

```yaml theme={null}
transforms:
  # Define the dynamic table
  tracked_wallets:
    type: dynamic_table
    backend_type: Postgres
    backend_entity_name: user_wallets
    secret_name: MY_POSTGRES

  # Use it in SQL
  filtered_transfers:
    type: sql
    primary_key: id
    sql: |
      SELECT *
      FROM erc20_transfers
      WHERE dynamic_table_check('tracked_wallets', to_address)
```

### Function Signature

```sql theme={null}
dynamic_table_check(table_name, value) -> BOOLEAN
```

* **table\_name** (`TEXT`): The transform name of the dynamic table in your pipeline (not the `backend_entity_name`). Must be a string literal — the same value on every row.
* **value** (`TEXT`): The value to check for existence.
* **Returns**: `true` if the value exists in the table, `false` otherwise.

See also [`dynamic_table_check` in SQL functions reference](/turbo-pipelines/reference/sql-functions#dynamic-table-check).

## Auto-Population with SQL

You can automatically populate a dynamic table from your pipeline data:

```yaml theme={null}
transforms:
  # Auto-populate from pipeline
  usdc_senders:
    type: dynamic_table
    backend_type: Postgres
    backend_entity_name: usdc_senders
    secret_name: MY_POSTGRES
    sql: |
      SELECT DISTINCT from_address
      FROM erc20_transfers
      WHERE contract_address = lower('0x...')

  # Use in subsequent transform
  all_usdc_activity:
    type: sql
    primary_key: id
    sql: |
      SELECT *
      FROM erc20_transfers
      WHERE dynamic_table_check('usdc_senders', from_address)
         OR dynamic_table_check('usdc_senders', to_address)
```

<Info>
  When using SQL to populate, the query only supports projections and filters
  (no joins or aggregations).
</Info>

## Manual Updates

For Postgres backends, you can update the table directly using any Postgres client. (Substitute the schema and column names if you customized them.)

```sql theme={null}
-- Add values
INSERT INTO streamling.tracked_contracts (value) VALUES
  (lower('0x2791Bca1f2de4661ED88A30C99A7a9449Aa84174')),
  (lower('0xA0b86991c6218b36c1d19D4a2e9Eb0cE3606eB48'));

-- Remove values
DELETE FROM streamling.tracked_contracts
WHERE value = lower('0x...');

-- Check contents (includes updated_at timestamp)
SELECT * FROM streamling.tracked_contracts;

-- Query recently added entries
SELECT * FROM streamling.tracked_contracts
WHERE updated_at > NOW() - INTERVAL '1 hour';
```

<Tip>
  Changes take effect immediately - within seconds, your pipeline will start
  filtering based on the updated values!
</Tip>

## Example: Track Specific Token Contracts

Monitor transfers for specific ERC-20 tokens like USDC:

```yaml theme={null}
name: token-tracker
resource_size: s

sources:
  polygon_transfers:
    type: dataset
    dataset_name: matic.erc20_transfers
    version: 1.2.0
    start_at: latest

transforms:
  # Dynamic table to store token contract addresses we want to track
  tracked_tokens:
    type: dynamic_table
    backend_type: Postgres
    backend_entity_name: tracked_tokens
    secret_name: MY_POSTGRES

  # Filter to only transfers of tracked tokens
  filtered_transfers:
    type: sql
    primary_key: id
    sql: |
      SELECT *
      FROM polygon_transfers
      WHERE dynamic_table_check('tracked_tokens', address)

sinks:
  postgres_output:
    type: postgres
    from: filtered_transfers
    schema: public
    table: tracked_token_transfers
    secret_name: MY_POSTGRES
    primary_key: id
```

**To start tracking USDC on Polygon:**

```sql theme={null}
-- Add USDC contract address (Polygon)
INSERT INTO streamling.tracked_tokens (value) VALUES (lower('0x2791Bca1f2de4661ED88A30C99A7a9449Aa84174'));

-- Add USDT contract address (Polygon)
INSERT INTO streamling.tracked_tokens (value) VALUES (lower('0xc2132D05D31c914a87C6611C10748AEb04B58e8F'));
```

Within seconds, your pipeline will start capturing transfers for these tokens!

## Example: Track Wallet Activity

Monitor all ERC-20 transfers for specific wallets:

```yaml theme={null}
name: wallet-tracker
resource_size: s

sources:
  polygon_transfers:
    type: dataset
    dataset_name: matic.erc20_transfers
    version: 1.2.0
    start_at: latest

transforms:
  # Dynamic table for wallets we're tracking
  tracked_wallets:
    type: dynamic_table
    backend_type: Postgres
    backend_entity_name: user_wallets
    secret_name: MY_POSTGRES

  # Filter to only transfers involving tracked wallets
  wallet_transfers:
    type: sql
    primary_key: id
    sql: |
      SELECT
        *,
        CASE
          WHEN dynamic_table_check('tracked_wallets', sender) THEN 'outgoing'
          WHEN dynamic_table_check('tracked_wallets', recipient) THEN 'incoming'
          ELSE 'unknown'
        END as direction
      FROM polygon_transfers
      WHERE
        dynamic_table_check('tracked_wallets', sender)
        OR dynamic_table_check('tracked_wallets', recipient)

sinks:
  postgres_sink:
    type: postgres
    from: wallet_transfers
    schema: public
    table: wallet_activity
    secret_name: MY_POSTGRES
```

**To add a wallet to track:**

```sql theme={null}
INSERT INTO streamling.user_wallets (value) VALUES (lower('0x...your-wallet...'));
```

## Example: Complete Pipeline with Dynamic Tables

This example shows a complete pipeline that uses dynamic tables to filter ERC-20 transfers to specific contracts:

```yaml theme={null}
name: erc20-filtered-transfers
resource_size: s

sources:
  polygon_erc20_transfers:
    type: dataset
    dataset_name: matic.erc20_transfers
    version: 1.2.0
    start_at: latest

transforms:
  # Define a dynamic table to store contract addresses we want to track
  tracked_contracts:
    type: dynamic_table
    backend_type: Postgres
    backend_entity_name: tracked_contracts
    secret_name: MY_POSTGRES_SECRET

  # Filter transfers to only include our tracked contracts
  filtered_transfers:
    type: sql
    primary_key: id
    sql: |
      SELECT
        *,
        'polygon' as network
      FROM polygon_erc20_transfers
      WHERE dynamic_table_check('tracked_contracts', address)

sinks:
  postgres_sink:
    type: postgres
    schema: public
    table: erc20_transfers
    secret_name: MY_POSTGRES_SECRET
    from: filtered_transfers
    primary_key: id
```

**Add contracts to track:**

The dynamic table allows you to control which contracts to track without redeploying your pipeline.

```sql theme={null}
-- Add USDC contract on Polygon
INSERT INTO streamling.tracked_contracts (value) VALUES (
  lower('0x2791Bca1f2de4661ED88A30C99A7a9449Aa84174')
);

-- Add USDT contract on Polygon
INSERT INTO streamling.tracked_contracts (value) VALUES (
  lower('0xc2132D05D31c914a87C6611C10748AEb04B58e8F')
);

-- View tracked contracts (includes updated_at timestamp)
SELECT * FROM streamling.tracked_contracts;
```

Within seconds, your pipeline will start processing transfers for these contracts!

## Example: Factory Pattern

Track all contracts created by a factory and filter events from those contracts:

```yaml theme={null}
name: uniswap-pool-tracker
resource_size: m

sources:
  base_logs:
    type: dataset
    dataset_name: base.raw_logs
    version: 1.0.0
    start_at: earliest

transforms:
  # Decode PoolCreated events from factory
  pool_created_events:
    type: sql
    primary_key: id
    sql: |
      SELECT
        id,
        address,
        evm_decode_log(
          '[{"anonymous":false,"inputs":[{"indexed":true,"name":"token0","type":"address"},{"indexed":true,"name":"token1","type":"address"},{"indexed":true,"name":"fee","type":"uint24"},{"indexed":false,"name":"tickSpacing","type":"int24"},{"indexed":false,"name":"pool","type":"address"}],"name":"PoolCreated","type":"event"}]',
          topics,
          data
        ) as decoded,
        block_number,
        block_timestamp
      FROM base_logs
      WHERE
        address = lower('0x33128a8fC17869897dcE68Ed026d694621f6FDfD')
        AND SPLIT_INDEX(topics, ',', 0) = '0x783cca1c0412dd0d695e784568c96da2e9c22ff989357a2e8b1d9b2b4e6b7118'

  # Auto-populate dynamic table with factory-created pools
  factory_pools:
    type: dynamic_table
    backend_type: Postgres
    backend_entity_name: factory_addresses
    secret_name: MY_POSTGRES
    sql: |
      SELECT lower(decoded.event_params[5]) as contract_address
      FROM pool_created_events

  # Decode swap events from raw logs
  decoded_swaps:
    type: sql
    primary_key: id
    sql: |
      SELECT
        id,
        transaction_hash,
        address as pool_address,
        evm_decode_log(
          '[{"anonymous":false,"inputs":[{"indexed":true,"name":"sender","type":"address"},{"indexed":true,"name":"recipient","type":"address"},{"indexed":false,"name":"amount0","type":"int256"},{"indexed":false,"name":"amount1","type":"int256"},{"indexed":false,"name":"sqrtPriceX96","type":"uint160"},{"indexed":false,"name":"liquidity","type":"uint128"},{"indexed":false,"name":"tick","type":"int24"}],"name":"Swap","type":"event"}]',
          topics,
          data
        ) as decoded,
        block_timestamp
      FROM base_logs
      WHERE
        SPLIT_INDEX(topics, ',', 0) = '0xc42079f94a6350d7e6235f29174924f928cc2ac818eb64fed8004e115fbcca67'
        AND dynamic_table_check('factory_pools', address)

  # Extract swap data
  pool_swaps:
    type: sql
    primary_key: id
    sql: |
      SELECT
        id,
        transaction_hash,
        pool_address,
        decoded.event_params[1] as sender,
        decoded.event_params[2] as recipient,
        decoded.event_params[3] as amount0,
        decoded.event_params[4] as amount1,
        decoded.event_params[5] as sqrt_price_x96,
        decoded.event_params[6] as liquidity,
        decoded.event_params[7] as tick,
        block_timestamp
      FROM decoded_swaps

sinks:
  clickhouse_sink:
    type: clickhouse
    from: pool_swaps
    primary_key: id
    table: uniswap_swaps
    secret_name: MY_CLICKHOUSE
```

This pattern automatically tracks new pools as they're created and immediately starts capturing their swap events!

## Source Validation

<Warning>
  When a dynamic table uses SQL to auto-populate, **both the dynamic table and any SQL transform using it must reference the same source or upstream transform**.

  This ensures data consistency and proper synchronization.
</Warning>

Good example:

```yaml theme={null}
transforms:
  my_table:
    sql: SELECT value FROM source_1 # Uses source_1

  my_transform:
    sql: |
      SELECT * FROM source_1  # Also uses source_1
      WHERE dynamic_table_check('my_table', value)
```

Bad example:

```yaml theme={null}
transforms:
  my_table:
    sql: SELECT value FROM source_1 # Uses source_1

  my_transform:
    sql: |
      SELECT * FROM source_2  # Uses source_2 - ERROR!
      WHERE dynamic_table_check('my_table', value)
```

## Performance Considerations

<AccordionGroup>
  <Accordion title="Lookup Performance">
    * Lookups are batched and executed in parallel against Postgres (`ANY(ARRAY[...])` queries).
    * The value column is `PRIMARY KEY`, so Postgres uses its unique index automatically.
    * Large tables (millions of entries) work fine as long as the Postgres instance has adequate resources.
  </Accordion>

  <Accordion title="Update Latency">
    * Postgres backend: each `dynamic_table_check()` call queries the table directly (no in-process cache), so changes take effect on the next batch — typically within a second or two.
    * Auto-population via SQL: updates flow through with normal pipeline latency.
  </Accordion>

  <Accordion title="Table Size">
    * No hard row limit is enforced, but lookup cost scales with table size — keep tables as small as your use case allows.
    * Use specific filters in auto-population SQL to avoid unbounded growth.
    * For long-running pipelines, consider a cleanup strategy (`DELETE` old rows by `updated_at`).
  </Accordion>
</AccordionGroup>

## Best Practices

<Steps>
  <Step title="Use Postgres for production">
    Always use the Postgres backend for production deployments to ensure data persistence and external updatability.
  </Step>

  <Step title="Use specific filters in auto-population SQL">
    Use specific filters in auto-population SQL to ensure data consistency and proper synchronization.
  </Step>

  <Step title="Lowercase string values">
    Store addresses and other identifiers in lowercase for consistent matching:

    ```sql theme={null}
    INSERT INTO streamling.my_table (value) VALUES (lower('0x...'));
    ```
  </Step>

  <Step title="Monitor table growth">
    Periodically check table sizes and clean up old entries by `updated_at` if needed.
  </Step>
</Steps>
