Dynamic tables are a powerful feature that lets you maintain updatable lookup tables within your pipeline. Unlike static SQL transforms, dynamic tables can be modified in real time without redeploying your pipeline.

Key Features

  • Real-time Updates: Add or remove values without a pipeline restart by updating a Postgres table.
  • SQL Integration: Use the dynamic_table_check() function in any SQL transform.
  • Pipeline Updates: Populate the table with on-chain data in the same pipeline.

Use Cases

  • Wallet Tracking: Monitor transfers to/from specific wallet addresses.
  • Deduplication: Track processed records to avoid duplicates.
  • Factory Pattern: Track contracts created by a factory.

Basic Configuration

transforms:
  my_dynamic_table:
    type: dynamic_table
    backend_type: Postgres
    backend_entity_name: <table_name>
    secret_name: <secret_name> # A Goldsky Postgres secret
    sql: | # Optional - auto-populate from pipeline data
      SELECT column FROM source
    # Optional PostgreSQL customization
    schema: custom_schema # Default: streamling
    column: custom_column # Default: value
    time_column: custom_time # Default: updated_at

Parameters

  • type (string, required): Must be dynamic_table.
  • backend_type (string, required): Storage backend. Currently Postgres.
  • backend_entity_name (string, required): The table name in the backend storage. For Postgres, this creates a table in the streamling schema (configurable via the schema field).
  • secret_name (string, required): The name of a Goldsky secret containing Postgres credentials. Required for the Postgres backend.
  • sql (string, optional): SQL query to automatically populate the table from pipeline data.
  • schema (string, optional): PostgreSQL schema name for the table. Defaults to streamling.
  • column (string, optional): Name of the primary key column storing values. Defaults to value.
  • time_column (string, optional): Name of the timestamp column. Defaults to updated_at.

Backend Types

Postgres is the supported backend and is well suited for production deployments requiring persistence:
transforms:
  tracked_contracts:
    type: dynamic_table
    backend_type: Postgres
    backend_entity_name: tracked_contracts
    secret_name: MY_POSTGRES
Benefits:
  • Data persists across pipeline restarts and failures
  • Can be updated externally via direct SQL — no redeploy needed
  • Indexed primary-key lookups scale to millions of rows
Table Structure: PostgreSQL dynamic tables are created with two columns:
  • A primary key column (default: value) storing the lookup values
  • A timestamp column (default: updated_at) automatically set to the insertion time
Table Location: By default, tables are created in the streamling schema: streamling.tracked_contracts
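With the default settings, the table Goldsky creates is equivalent in shape to the following (an illustrative sketch based on the default column names described above; the pipeline creates it for you, so you do not need to run this yourself):
-- Equivalent shape of a default dynamic table
CREATE TABLE streamling.tracked_contracts (
  value TEXT PRIMARY KEY,
  updated_at TIMESTAMP WITH TIME ZONE DEFAULT NOW()
);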

Custom schema and column names

You can customize the schema, column name, and timestamp column name:
transforms:
  tracked_contracts:
    type: dynamic_table
    backend_type: Postgres
    backend_entity_name: tracked_contracts
    secret_name: MY_POSTGRES
    schema: my_app          # Custom schema (default: streamling)
    column: contract_addr   # Custom column name (default: value)
    time_column: created_at # Custom timestamp column (default: updated_at)
This creates a table at my_app.tracked_contracts:
CREATE TABLE my_app.tracked_contracts (
  contract_addr TEXT PRIMARY KEY,
  created_at TIMESTAMP WITH TIME ZONE DEFAULT NOW()
);
Use custom schemas to organize dynamic tables by application or environment. For example, use production and staging schemas to separate data.

Using Dynamic Tables in SQL

Once defined, use the dynamic_table_check() function in SQL transforms:
transforms:
  # Define the dynamic table
  tracked_wallets:
    type: dynamic_table
    backend_type: Postgres
    backend_entity_name: user_wallets
    secret_name: MY_POSTGRES

  # Use it in SQL
  filtered_transfers:
    type: sql
    primary_key: id
    sql: |
      SELECT *
      FROM erc20_transfers
      WHERE dynamic_table_check('tracked_wallets', to_address)

Function Signature

dynamic_table_check(table_name, value) -> BOOLEAN
  • table_name (TEXT): The transform name of the dynamic table in your pipeline (not the backend_entity_name). Must be a string literal — the same value on every row.
  • value (TEXT): The value to check for existence.
  • Returns: true if the value exists in the table, false otherwise.
See also dynamic_table_check in SQL functions reference.

Auto-Population with SQL

You can automatically populate a dynamic table from your pipeline data:
transforms:
  # Auto-populate from pipeline
  usdc_senders:
    type: dynamic_table
    backend_type: Postgres
    backend_entity_name: usdc_senders
    secret_name: MY_POSTGRES
    sql: |
      SELECT DISTINCT from_address
      FROM erc20_transfers
      WHERE contract_address = lower('0x...')

  # Use in subsequent transform
  all_usdc_activity:
    type: sql
    primary_key: id
    sql: |
      SELECT *
      FROM erc20_transfers
      WHERE dynamic_table_check('usdc_senders', from_address)
         OR dynamic_table_check('usdc_senders', to_address)
When using SQL to populate, the query only supports projections and filters (no joins or aggregations).
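To illustrate the restriction (using the erc20_transfers source from the example above), a projection combined with a filter is supported, while a query with an aggregation is not:
-- Supported: projection + filter
SELECT DISTINCT from_address
FROM erc20_transfers
WHERE contract_address = lower('0x...');

-- Not supported: aggregation (likewise joins)
SELECT from_address, COUNT(*)
FROM erc20_transfers
GROUP BY from_address;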

Manual Updates

For Postgres backends, you can update the table directly using any Postgres client. (Substitute the schema and column names if you customized them.)
-- Add values
INSERT INTO streamling.tracked_contracts (value) VALUES
  (lower('0x2791Bca1f2de4661ED88A30C99A7a9449Aa84174')),
  (lower('0xA0b86991c6218b36c1d19D4a2e9Eb0cE3606eB48'));

-- Remove values
DELETE FROM streamling.tracked_contracts
WHERE value = lower('0x...');

-- Check contents (includes updated_at timestamp)
SELECT * FROM streamling.tracked_contracts;

-- Query recently added entries
SELECT * FROM streamling.tracked_contracts
WHERE updated_at > NOW() - INTERVAL '1 hour';
Changes take effect almost immediately: within seconds, your pipeline will start filtering based on the updated values!

Example: Track Specific Token Contracts

Monitor transfers for specific ERC-20 tokens like USDC:
name: token-tracker
resource_size: s

sources:
  polygon_transfers:
    type: dataset
    dataset_name: matic.erc20_transfers
    version: 1.2.0
    start_at: latest

transforms:
  # Dynamic table to store token contract addresses we want to track
  tracked_tokens:
    type: dynamic_table
    backend_type: Postgres
    backend_entity_name: tracked_tokens
    secret_name: MY_POSTGRES

  # Filter to only transfers of tracked tokens
  filtered_transfers:
    type: sql
    primary_key: id
    sql: |
      SELECT *
      FROM polygon_transfers
      WHERE dynamic_table_check('tracked_tokens', address)

sinks:
  postgres_output:
    type: postgres
    from: filtered_transfers
    schema: public
    table: tracked_token_transfers
    secret_name: MY_POSTGRES
    primary_key: id
To start tracking USDC on Polygon:
-- Add USDC contract address (Polygon)
INSERT INTO streamling.tracked_tokens (value) VALUES (lower('0x2791Bca1f2de4661ED88A30C99A7a9449Aa84174'));

-- Add USDT contract address (Polygon)
INSERT INTO streamling.tracked_tokens (value) VALUES (lower('0xc2132D05D31c914a87C6611C10748AEb04B58e8F'));
Within seconds, your pipeline will start capturing transfers for these tokens!

Example: Track Wallet Activity

Monitor all ERC-20 transfers for specific wallets:
name: wallet-tracker
resource_size: s

sources:
  polygon_transfers:
    type: dataset
    dataset_name: matic.erc20_transfers
    version: 1.2.0
    start_at: latest

transforms:
  # Dynamic table for wallets we're tracking
  tracked_wallets:
    type: dynamic_table
    backend_type: Postgres
    backend_entity_name: user_wallets
    secret_name: MY_POSTGRES

  # Filter to only transfers involving tracked wallets
  wallet_transfers:
    type: sql
    primary_key: id
    sql: |
      SELECT
        *,
        CASE
          WHEN dynamic_table_check('tracked_wallets', sender) THEN 'outgoing'
          WHEN dynamic_table_check('tracked_wallets', recipient) THEN 'incoming'
          ELSE 'unknown'
        END as direction
      FROM polygon_transfers
      WHERE
        dynamic_table_check('tracked_wallets', sender)
        OR dynamic_table_check('tracked_wallets', recipient)

sinks:
  postgres_sink:
    type: postgres
    from: wallet_transfers
    schema: public
    table: wallet_activity
    secret_name: MY_POSTGRES
To add a wallet to track:
INSERT INTO streamling.user_wallets (value) VALUES (lower('0x...your-wallet...'));

Example: Complete Pipeline with Dynamic Tables

This example shows a complete pipeline that uses dynamic tables to filter ERC-20 transfers to specific contracts:
name: erc20-filtered-transfers
resource_size: s

sources:
  polygon_erc20_transfers:
    type: dataset
    dataset_name: matic.erc20_transfers
    version: 1.2.0
    start_at: latest

transforms:
  # Define a dynamic table to store contract addresses we want to track
  tracked_contracts:
    type: dynamic_table
    backend_type: Postgres
    backend_entity_name: tracked_contracts
    secret_name: MY_POSTGRES_SECRET

  # Filter transfers to only include our tracked contracts
  filtered_transfers:
    type: sql
    primary_key: id
    sql: |
      SELECT
        *,
        'polygon' as network
      FROM polygon_erc20_transfers
      WHERE dynamic_table_check('tracked_contracts', address)

sinks:
  postgres_sink:
    type: postgres
    schema: public
    table: erc20_transfers
    secret_name: MY_POSTGRES_SECRET
    from: filtered_transfers
    primary_key: id
Add contracts to track: The dynamic table lets you control which contracts are captured without redeploying your pipeline.
-- Add USDC contract on Polygon
INSERT INTO streamling.tracked_contracts (value) VALUES (
  lower('0x2791Bca1f2de4661ED88A30C99A7a9449Aa84174')
);

-- Add USDT contract on Polygon
INSERT INTO streamling.tracked_contracts (value) VALUES (
  lower('0xc2132D05D31c914a87C6611C10748AEb04B58e8F')
);

-- View tracked contracts (includes updated_at timestamp)
SELECT * FROM streamling.tracked_contracts;
Within seconds, your pipeline will start processing transfers for these contracts!

Example: Factory Pattern

Track all contracts created by a factory and filter events from those contracts:
name: uniswap-pool-tracker
resource_size: m

sources:
  base_logs:
    type: dataset
    dataset_name: base.raw_logs
    version: 1.0.0
    start_at: earliest

transforms:
  # Decode PoolCreated events from factory
  pool_created_events:
    type: sql
    primary_key: id
    sql: |
      SELECT
        id,
        address,
        evm_decode_log(
          '[{"anonymous":false,"inputs":[{"indexed":true,"name":"token0","type":"address"},{"indexed":true,"name":"token1","type":"address"},{"indexed":true,"name":"fee","type":"uint24"},{"indexed":false,"name":"tickSpacing","type":"int24"},{"indexed":false,"name":"pool","type":"address"}],"name":"PoolCreated","type":"event"}]',
          topics,
          data
        ) as decoded,
        block_number,
        block_timestamp
      FROM base_logs
      WHERE
        address = lower('0x33128a8fC17869897dcE68Ed026d694621f6FDfD')
        AND SPLIT_INDEX(topics, ',', 0) = '0x783cca1c0412dd0d695e784568c96da2e9c22ff989357a2e8b1d9b2b4e6b7118'

  # Auto-populate dynamic table with factory-created pools
  factory_pools:
    type: dynamic_table
    backend_type: Postgres
    backend_entity_name: factory_addresses
    secret_name: MY_POSTGRES
    sql: |
      SELECT lower(decoded.event_params[5]) as contract_address
      FROM pool_created_events

  # Decode swap events from raw logs
  decoded_swaps:
    type: sql
    primary_key: id
    sql: |
      SELECT
        id,
        transaction_hash,
        address as pool_address,
        evm_decode_log(
          '[{"anonymous":false,"inputs":[{"indexed":true,"name":"sender","type":"address"},{"indexed":true,"name":"recipient","type":"address"},{"indexed":false,"name":"amount0","type":"int256"},{"indexed":false,"name":"amount1","type":"int256"},{"indexed":false,"name":"sqrtPriceX96","type":"uint160"},{"indexed":false,"name":"liquidity","type":"uint128"},{"indexed":false,"name":"tick","type":"int24"}],"name":"Swap","type":"event"}]',
          topics,
          data
        ) as decoded,
        block_timestamp
      FROM base_logs
      WHERE
        SPLIT_INDEX(topics, ',', 0) = '0xc42079f94a6350d7e6235f29174924f928cc2ac818eb64fed8004e115fbcca67'
        AND dynamic_table_check('factory_pools', address)

  # Extract swap data
  pool_swaps:
    type: sql
    primary_key: id
    sql: |
      SELECT
        id,
        transaction_hash,
        pool_address,
        decoded.event_params[1] as sender,
        decoded.event_params[2] as recipient,
        decoded.event_params[3] as amount0,
        decoded.event_params[4] as amount1,
        decoded.event_params[5] as sqrt_price_x96,
        decoded.event_params[6] as liquidity,
        decoded.event_params[7] as tick,
        block_timestamp
      FROM decoded_swaps

sinks:
  clickhouse_sink:
    type: clickhouse
    from: pool_swaps
    primary_key: id
    table: uniswap_swaps
    secret_name: MY_CLICKHOUSE
This pattern automatically tracks new pools as they’re created and immediately starts capturing their swap events!

Source Validation

When a dynamic table uses SQL to auto-populate, both the dynamic table and any SQL transform using it must reference the same source or upstream transform. This ensures data consistency and proper synchronization.
Good example:
transforms:
  my_table:
    sql: SELECT value FROM source_1 # Uses source_1

  my_transform:
    sql: |
      SELECT * FROM source_1  # Also uses source_1
      WHERE dynamic_table_check('my_table', value)
Bad example:
transforms:
  my_table:
    sql: SELECT value FROM source_1 # Uses source_1

  my_transform:
    sql: |
      SELECT * FROM source_2  # Uses source_2 - ERROR!
      WHERE dynamic_table_check('my_table', value)

Performance Considerations

  • Lookups are batched and executed in parallel against Postgres (ANY(ARRAY[...]) queries).
  • The value column is PRIMARY KEY, so Postgres uses its unique index automatically.
  • Large tables (millions of entries) work fine as long as the Postgres instance has adequate resources.
  • Postgres backend: each dynamic_table_check() call queries the table directly (no in-process cache), so changes take effect on the next batch — typically within a second or two.
  • Auto-population via SQL: updates flow through with normal pipeline latency.
  • No hard row limit is enforced, but lookup cost scales with table size — keep tables as small as your use case allows.
  • Use specific filters in auto-population SQL to avoid unbounded growth.
  • For long-running pipelines, consider a cleanup strategy (DELETE old rows by updated_at).
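A cleanup pass can be a simple time-based delete, shown here against the default schema and column names (substitute yours if customized):
-- Remove entries not updated in the last 30 days
DELETE FROM streamling.tracked_contracts
WHERE updated_at < NOW() - INTERVAL '30 days';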

Best Practices

1. Use Postgres for production: Always use the Postgres backend for production deployments to ensure data persistence and external updatability.
2. Use specific filters in auto-population SQL: Filter auto-population queries tightly to keep tables small and avoid unbounded growth.
3. Lowercase string values: Store addresses and other identifiers in lowercase for consistent matching:
   INSERT INTO streamling.my_table (value) VALUES (lower('0x...'));
4. Monitor table growth: Periodically check table sizes and clean up old entries by updated_at if needed.
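A quick way to check growth from any Postgres client (illustrative; pg_total_relation_size and pg_size_pretty are standard PostgreSQL functions, and the table name assumes the defaults):
-- Row count and on-disk size of a dynamic table
SELECT
  COUNT(*) AS row_count,
  pg_size_pretty(pg_total_relation_size('streamling.tracked_contracts')) AS total_size
FROM streamling.tracked_contracts;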