Overview

Dynamic tables are a powerful feature that lets you maintain updatable lookup tables within your pipeline. Unlike static SQL transforms, dynamic tables can be modified in real time without redeploying your pipeline.

Key Features

  • Real-time Updates: Add or remove values without restarting the pipeline, simply by updating a Postgres table.
  • SQL Integration: Use the dynamic_table_check() function in any SQL transform.
  • Auto-Population: Populate the table with on-chain data from within the same pipeline.

Use Cases

Contract Filtering

Track specific smart contracts and filter events from only those addresses

Wallet Tracking

Monitor transfers to/from specific wallet addresses

Deduplication

Track processed records to avoid duplicates

Factory Pattern

Dynamically track contracts created by a factory contract

Basic Configuration

transforms:
  my_dynamic_table:
    type: dynamic_table
    backend_type: Postgres 
    backend_entity_name: <table_name>
    secret_name: <secret_name>  # A Goldsky Postgres secret
    sql: |  # Optional - auto-populate from pipeline data
      SELECT column FROM source

Parameters

  • type (string, required): Must be dynamic_table.
  • backend_type (string, required): Storage backend: Postgres (recommended) or InMemory.
  • backend_entity_name (string, required): The table name in the backend storage. For Postgres, this creates a table in the turbo schema.
  • secret_name (string, required for the Postgres backend): The name of the Goldsky secret containing database credentials.
  • sql (string, optional): SQL query to automatically populate the table from pipeline data.

Backend Types

Postgres Backend

Best for production deployments requiring persistence:
transforms:
  tracked_contracts:
    type: dynamic_table
    backend_type: Postgres
    backend_entity_name: tracked_contracts
    secret_name: MY_POSTGRES
Benefits:
  • Data persists across pipeline restarts
  • Can be updated externally via direct SQL
  • Survives failures
  • Good performance with proper indexing
Table Location: The table is created in the turbo schema: turbo.tracked_contracts

In-Memory Backend

Best for development and testing:
transforms:
  temp_tracking:
    type: dynamic_table
    backend_type: InMemory
    backend_entity_name: temp_data
Benefits:
  • Fastest possible lookups
  • No external dependencies
Limitations:
  • Data lost on restart
  • No external updates possible
  • Limited by memory

Using Dynamic Tables in SQL

Once defined, use the dynamic_table_check() function in SQL transforms:
transforms:
  # Define the dynamic table
  tracked_wallets:
    type: dynamic_table
    backend_type: Postgres
    backend_entity_name: user_wallets
    secret_name: MY_POSTGRES

  # Use it in SQL
  filtered_transfers:
    type: sql
    primary_key: id
    sql: |
      SELECT *
      FROM erc20_transfers
      WHERE dynamic_table_check('tracked_wallets', to_address)

Function Signature

dynamic_table_check(table_name: string, value: string) -> boolean
  • table_name: The reference name of the dynamic table, not its backend_entity_name ('tracked_wallets' in the example above, not 'user_wallets')
  • value: The value to check for existence
  • Returns: true if the value exists in the table, false otherwise

Auto-Population with SQL

You can automatically populate a dynamic table from your pipeline data:
transforms:
  # Auto-populate from pipeline
  usdc_senders:
    type: dynamic_table
    backend_type: Postgres
    backend_entity_name: usdc_senders
    secret_name: MY_POSTGRES
    sql: |
      SELECT DISTINCT from_address
      FROM erc20_transfers
      WHERE contract_address = lower('0x...')

  # Use in subsequent transform
  all_usdc_activity:
    type: sql
    primary_key: id
    sql: |
      SELECT *
      FROM erc20_transfers
      WHERE dynamic_table_check('usdc_senders', from_address)
         OR dynamic_table_check('usdc_senders', to_address)
When using SQL to populate, the query only supports projections and filters (no joins or aggregations).
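For example, a projection with a filter (like the usdc_senders query above) is supported, while a join or aggregation is not. A sketch, reusing the table and column names from the example above:
-- Supported: projection plus filter
SELECT DISTINCT from_address
FROM erc20_transfers
WHERE contract_address = lower('0x...')

-- Not supported: aggregation
SELECT from_address, count(*) AS transfer_count
FROM erc20_transfers
GROUP BY from_address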

Manual Updates

For Postgres backends, you can update the table directly:
-- Add values
INSERT INTO turbo.tracked_contracts VALUES
  (lower('0x2791Bca1f2de4661ED88A30C99A7a9449Aa84174')),
  (lower('0xA0b86991c6218b36c1d19D4a2e9Eb0cE3606eB48'));

-- Remove values
DELETE FROM turbo.tracked_contracts
WHERE value = lower('0x...');

-- Check contents
SELECT * FROM turbo.tracked_contracts;
Changes take effect almost immediately; within seconds, your pipeline will start filtering based on the updated values!

Example: Track Specific Token Contracts

Monitor transfers for specific ERC-20 tokens like USDC:
name: token-tracker
resource_size: s

sources:
  polygon_transfers:
    type: dataset
    dataset_name: matic.erc20_transfers
    version: 1.2.0
    start_at: latest

transforms:
  # Dynamic table to store token contract addresses we want to track
  tracked_tokens:
    type: dynamic_table
    backend_type: Postgres
    backend_entity_name: tracked_tokens
    secret_name: MY_POSTGRES

  # Filter to only transfers of tracked tokens
  filtered_transfers:
    type: sql
    primary_key: id
    sql: |
      SELECT *
      FROM polygon_transfers
      WHERE dynamic_table_check('tracked_tokens', contract_address)

sinks:
  postgres_output:
    type: postgres
    from: filtered_transfers
    schema: public
    table: tracked_token_transfers
    secret_name: MY_POSTGRES
    primary_key: id
To start tracking USDC on Polygon:
-- Add USDC contract address (Polygon)
INSERT INTO turbo.tracked_tokens VALUES (lower('0x2791Bca1f2de4661ED88A30C99A7a9449Aa84174'));

-- Add USDT contract address (Polygon)
INSERT INTO turbo.tracked_tokens VALUES (lower('0xc2132D05D31c914a87C6611C10748AEb04B58e8F'));
Within seconds, your pipeline will start capturing transfers for these tokens!
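To confirm data is flowing, you can query the sink table directly; the schema and table names come from the sink configuration above:
-- Spot-check rows written by the pipeline
SELECT * FROM public.tracked_token_transfers LIMIT 10;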

Example: Track Wallet Activity

Monitor all ERC-20 transfers for specific wallets:
name: wallet-tracker
resource_size: s

sources:
  polygon_transfers:
    type: dataset
    dataset_name: matic.erc20_transfers
    version: 1.2.0
    start_at: latest

transforms:
  # Dynamic table for wallets we're tracking
  tracked_wallets:
    type: dynamic_table
    backend_type: Postgres
    backend_entity_name: user_wallets
    secret_name: MY_POSTGRES

  # Filter to only transfers involving tracked wallets
  wallet_transfers:
    type: sql
    primary_key: id
    sql: |
      SELECT
        *,
        CASE
          WHEN dynamic_table_check('tracked_wallets', from_address) THEN 'outgoing'
          WHEN dynamic_table_check('tracked_wallets', to_address) THEN 'incoming'
          ELSE 'unknown'
        END as direction
      FROM polygon_transfers
      WHERE
        dynamic_table_check('tracked_wallets', from_address)
        OR dynamic_table_check('tracked_wallets', to_address)

sinks:
  postgres_sink:
    type: postgres
    from: wallet_transfers
    schema: public
    table: wallet_activity
    secret_name: MY_POSTGRES
To add a wallet to track:
INSERT INTO turbo.user_wallets VALUES (lower('0x...your-wallet...'));
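Once transfers arrive, the direction column computed by the transform makes it easy to summarize activity; wallet_activity is the sink table configured above:
-- Summarize captured transfers by direction
SELECT direction, count(*) AS transfers
FROM public.wallet_activity
GROUP BY direction;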

Example: Complete Pipeline with Dynamic Tables

This example shows a complete pipeline that uses dynamic tables to filter ERC-20 transfers to specific contracts:
name: erc20-filtered-transfers
resource_size: s

sources:
  polygon_erc20_transfers:
    type: dataset
    dataset_name: matic.erc20_transfers
    version: 1.2.0
    start_at: latest

transforms:
  # Define a dynamic table to store contract addresses we want to track
  tracked_contracts:
    type: dynamic_table
    backend_type: Postgres
    backend_entity_name: tracked_contracts
    secret_name: MY_POSTGRES_SECRET

  # Filter transfers to only include our tracked contracts
  filtered_transfers:
    type: sql
    primary_key: id
    sql: |
      SELECT
        *,
        'polygon' as network
      FROM polygon_erc20_transfers
      WHERE dynamic_table_check('tracked_contracts', contract_address)

sinks:
  postgres_sink:
    type: postgres
    schema: public
    table: erc20_transfers
    secret_name: MY_POSTGRES_SECRET
    from: filtered_transfers
    primary_key: id
Add contracts to track: the dynamic table lets you control which contracts are tracked without redeploying your pipeline.
-- Add USDC contract on Polygon
INSERT INTO turbo.tracked_contracts VALUES (
  lower('0x2791Bca1f2de4661ED88A30C99A7a9449Aa84174')
);

-- Add USDT contract on Polygon
INSERT INTO turbo.tracked_contracts VALUES (
  lower('0xc2132D05D31c914a87C6611C10748AEb04B58e8F')
);

-- View tracked contracts
SELECT * FROM turbo.tracked_contracts;
Within seconds, your pipeline will start processing transfers for these contracts!

Example: Factory Pattern

Track all contracts created by a factory and filter events from those contracts:
name: uniswap-pool-tracker
resource_size: m

sources:
  base_logs:
    type: dataset
    dataset_name: base.raw_logs
    version: 1.0.0
    start_at: earliest

transforms:
  # Decode PoolCreated events from factory
  pool_created_events:
    type: sql
    primary_key: id
    sql: |
      SELECT
        id,
        address,
        evm_decode_log(
          'event PoolCreated(address indexed token0, address indexed token1, uint24 indexed fee, int24 tickSpacing, address pool)',
          topics,
          data
        ) as decoded,
        block_number,
        block_timestamp
      FROM base_logs
      WHERE
        address = lower('0x33128a8fC17869897dcE68Ed026d694621f6FDfD')
        AND topics[1] = '0x783cca1c0412dd0d695e784568c96da2e9c22ff989357a2e8b1d9b2b4e6b7118'

  # Auto-populate dynamic table with factory-created pools
  factory_pools:
    type: dynamic_table
    backend_type: Postgres
    backend_entity_name: factory_addresses
    secret_name: MY_POSTGRES
    sql: |
      SELECT lower(decoded['pool']) as contract_address
      FROM pool_created_events

  # Decode swap events from raw logs
  decoded_swaps:
    type: sql
    primary_key: id
    sql: |
      SELECT
        id,
        transaction_hash,
        address as pool_address,
        evm_decode_log(
          'event Swap(address indexed sender, address indexed recipient, int256 amount0, int256 amount1, uint160 sqrtPriceX96, uint128 liquidity, int24 tick)',
          topics,
          data
        ) as decoded,
        block_timestamp
      FROM base_logs
      WHERE
        topics[1] = '0xc42079f94a6350d7e6235f29174924f928cc2ac818eb64fed8004e115fbcca67'
        AND dynamic_table_check('factory_pools', address)

  # Extract swap data
  pool_swaps:
    type: sql
    primary_key: id
    sql: |
      SELECT
        id,
        transaction_hash,
        pool_address,
        decoded['sender'] as sender,
        decoded['recipient'] as recipient,
        decoded['amount0'] as amount0,
        decoded['amount1'] as amount1,
        decoded['sqrtPriceX96'] as sqrt_price_x96,
        decoded['liquidity'] as liquidity,
        decoded['tick'] as tick,
        block_timestamp
      FROM decoded_swaps

sinks:
  clickhouse_sink:
    type: clickhouse
    from: pool_swaps
    primary_key: id
    table: uniswap_swaps
    secret_name: MY_CLICKHOUSE
This pattern automatically tracks new pools as they’re created and immediately starts capturing their swap events!
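Because factory_pools is backed by Postgres, you can also watch pool discovery from outside the pipeline; turbo.factory_addresses is the table created from backend_entity_name above:
-- Count pools discovered so far
SELECT count(*) FROM turbo.factory_addresses;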

Example: Deduplication

Track processed records to avoid duplicates using an in-memory backend:
transforms:
  # In-memory deduplication - fast but lost on restart
  seen_transactions:
    type: dynamic_table
    backend_type: InMemory
    backend_entity_name: processed_txs
    sql: |
      SELECT transaction_hash
      FROM ethereum_transactions

  new_transactions_only:
    type: sql
    primary_key: transaction_hash
    sql: |
      SELECT *
      FROM ethereum_transactions
      WHERE NOT dynamic_table_check('seen_transactions', transaction_hash)
InMemory vs Postgres for Deduplication:
  • Use InMemory for development, testing, or when processing a bounded dataset where persistence isn’t needed
  • Use Postgres for production when you need deduplication state to survive restarts
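For the persistent variant, here is a minimal sketch that swaps in the Postgres backend, assuming the same ethereum_transactions source and a MY_POSTGRES secret:
transforms:
  # Postgres-backed deduplication - state survives restarts
  seen_transactions:
    type: dynamic_table
    backend_type: Postgres
    backend_entity_name: processed_txs
    secret_name: MY_POSTGRES
    sql: |
      SELECT transaction_hash
      FROM ethereum_transactions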

Source Validation

When a dynamic table uses SQL to auto-populate, both the dynamic table and any SQL transform using it must reference the same source or upstream transform. This ensures data consistency and proper synchronization.
Good example:
transforms:
  my_table:
    sql: SELECT value FROM source_1  # Uses source_1

  my_transform:
    sql: |
      SELECT * FROM source_1  # Also uses source_1
      WHERE dynamic_table_check('my_table', value)
Bad example:
transforms:
  my_table:
    sql: SELECT value FROM source_1  # Uses source_1

  my_transform:
    sql: |
      SELECT * FROM source_2  # Uses source_2 - ERROR!
      WHERE dynamic_table_check('my_table', value)

Performance Considerations

Lookup performance:
  • Dynamic table checks are optimized for fast lookups
  • Postgres backend automatically creates indexes
  • In-memory backend provides sub-microsecond lookups
  • Large tables (millions of entries) work fine with proper indexing
Update latency:
  • Postgres backend: Changes visible within 1-2 seconds
  • In-memory backend: Not externally updatable
  • Auto-population via SQL: Updates flow through with normal pipeline latency
Sizing:
  • Keep dynamic tables reasonably sized (< 10M entries for best performance)
  • Use specific filters in auto-population SQL
  • Consider table cleanup strategies for growing datasets (one sketch follows)
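As one sketch of a cleanup strategy for a Postgres-backed table, prune entries your application no longer needs; my_app.active_contracts is a hypothetical application-owned table, and turbo.tracked_contracts follows the earlier examples:
-- Hypothetical cleanup: keep only addresses still marked active in an
-- application-owned table (my_app.active_contracts is illustrative)
DELETE FROM turbo.tracked_contracts
WHERE value NOT IN (SELECT lower(address) FROM my_app.active_contracts);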

Best Practices

1. Use Postgres for production

Always use the Postgres backend for production deployments to ensure data persistence and external updatability.

2. Lowercase string values

Store addresses and other identifiers in lowercase for consistent matching:
INSERT INTO turbo.my_table VALUES (lower('0x...'));

3. Index large tables

For custom Postgres tables, add indexes:
CREATE INDEX idx_value ON turbo.my_table(value);

4. Monitor table growth

Periodically check table sizes and clean up old entries if needed.
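A quick size check using standard Postgres functions; turbo.my_table stands in for your table name:
-- Row count
SELECT count(*) FROM turbo.my_table;

-- On-disk size, human readable
SELECT pg_size_pretty(pg_total_relation_size('turbo.my_table'));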
