Dynamic tables are a powerful feature that lets you maintain updatable lookup tables within your pipeline. Unlike static SQL transforms, dynamic tables can be modified in real time without redeploying your pipeline.

Key Features

  • Real-time Updates: Add or remove values without restarting the pipeline by updating a Postgres table.
  • SQL Integration: Use the dynamic_table_check() function in any SQL transform.
  • Pipeline Updates: Populate the table with on-chain data in the same pipeline.

Use Cases

  • Wallet Tracking: Monitor transfers to/from specific wallet addresses
  • Deduplication: Track processed records to avoid duplicates
  • Factory Pattern: Track contracts created by a factory

Basic Configuration

transforms:
  my_dynamic_table:
    type: dynamic_table
    backend_type: Postgres
    backend_entity_name: <table_name>
    secret_name: <secret_name> # A Goldsky Postgres secret
    schema: turbo # Optional - defaults to 'turbo'
    column: value # Optional - defaults to 'value'
    sql: | # Optional - auto-populate from pipeline data
      SELECT column FROM source

Parameters

type (string, required)
Must be dynamic_table.

backend_type (string, required)
Storage backend: Postgres (recommended) or InMemory.

backend_entity_name (string, required)
The table name in the backend storage. For Postgres, this creates a table in the turbo schema.

secret_name (string, required for Postgres backend)
The name of the secret containing database credentials.

sql (string, optional)
SQL query to automatically populate the table from pipeline data.

schema (string, default: "turbo")
The schema where the dynamic table is created. Defaults to turbo.

column (string, default: "value")
The column name in the dynamic table. Defaults to value. The table is created with a single column unless you specify a custom schema (see the example below).
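
For example, a dynamic table that stores contract addresses in a custom column (a minimal sketch using the optional schema and column parameters above; the transform name and secret name are placeholders):
transforms:
  allowlisted_contracts:
    type: dynamic_table
    backend_type: Postgres
    backend_entity_name: allowlisted_contracts
    secret_name: MY_POSTGRES # A Goldsky Postgres secret
    schema: turbo # Optional - 'turbo' is the default
    column: contract_address # Optional - 'value' is the default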

Backend Types

Postgres Backend

Best for production deployments requiring persistence:
transforms:
  tracked_contracts:
    type: dynamic_table
    backend_type: Postgres
    backend_entity_name: tracked_contracts
    secret_name: MY_POSTGRES
Benefits:
  • Data persists across pipeline restarts
  • Can be updated externally via direct SQL
  • Survives failures
  • Good performance with proper indexing
Table Location: The table is created in the turbo schema: turbo.tracked_contracts
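Since the table lives in your Postgres database, you can inspect or seed it with ordinary SQL (the same pattern as the Manual Updates section below):
-- Inspect the current contents
SELECT * FROM turbo.tracked_contracts;

-- Seed it with an address, stored lowercase for consistent matching
INSERT INTO turbo.tracked_contracts VALUES (lower('0x...'));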

In-Memory Backend

Best for development and testing:
transforms:
  temp_tracking:
    type: dynamic_table
    backend_type: InMemory
    backend_entity_name: temp_data
Benefits:
  • Fastest possible lookups
  • No external dependencies
Limitations:
  • Data lost on restart
  • No external updates possible
  • Limited by memory

Using Dynamic Tables in SQL

Once defined, use the dynamic_table_check() function in SQL transforms:
transforms:
  # Define the dynamic table
  tracked_wallets:
    type: dynamic_table
    backend_type: Postgres
    backend_entity_name: user_wallets
    secret_name: MY_POSTGRES

  # Use it in SQL
  filtered_transfers:
    type: sql
    primary_key: id
    sql: |
      SELECT *
      FROM erc20_transfers
      WHERE dynamic_table_check('tracked_wallets', to_address)

Function Signature

dynamic_table_check(table_name: string, value: string) -> boolean
  • table_name: The reference name of the dynamic table (not the backend_entity_name)
  • value: The value to check for existence
  • Returns: true if the value exists in the table, false otherwise
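
Because the function returns a boolean, it composes with ordinary SQL predicates. A small sketch reusing the tracked_wallets table defined above (note the first argument is the transform name tracked_wallets, not the backend_entity_name user_wallets):
SELECT *
FROM erc20_transfers
WHERE dynamic_table_check('tracked_wallets', to_address)
  AND NOT dynamic_table_check('tracked_wallets', from_address)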

Auto-Population with SQL

You can automatically populate a dynamic table from your pipeline data:
transforms:
  # Auto-populate from pipeline
  usdc_senders:
    type: dynamic_table
    backend_type: Postgres
    backend_entity_name: usdc_senders
    secret_name: MY_POSTGRES
    sql: |
      SELECT DISTINCT from_address
      FROM erc20_transfers
      WHERE contract_address = lower('0x...')

  # Use in subsequent transform
  all_usdc_activity:
    type: sql
    primary_key: id
    sql: |
      SELECT *
      FROM erc20_transfers
      WHERE dynamic_table_check('usdc_senders', from_address)
         OR dynamic_table_check('usdc_senders', to_address)
When using SQL to populate, the query only supports projections and filters (no joins or aggregations).
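For example, a projection with a filter is supported, while joins and aggregations are not (an illustrative sketch; column names follow the examples above):
-- Supported: projection + filter
SELECT DISTINCT from_address
FROM erc20_transfers
WHERE contract_address = lower('0x...')

-- Not supported for auto-population (aggregation):
-- SELECT from_address, count(*) FROM erc20_transfers GROUP BY from_address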

Manual Updates

For Postgres backends, you can update the table directly:
-- Add values
INSERT INTO turbo.tracked_contracts VALUES
  (lower('0x2791Bca1f2de4661ED88A30C99A7a9449Aa84174')),
  (lower('0xA0b86991c6218b36c1d19D4a2e9Eb0cE3606eB48'));

-- Remove values
DELETE FROM turbo.tracked_contracts
WHERE value = lower('0x...');

-- Check contents
SELECT * FROM turbo.tracked_contracts;
Changes take effect immediately - within seconds, your pipeline will start filtering based on the updated values!

Example: Track Specific Token Contracts

Monitor transfers for specific ERC-20 tokens like USDC:
name: token-tracker
resource_size: s

sources:
  polygon_transfers:
    type: dataset
    dataset_name: matic.erc20_transfers
    version: 1.2.0
    start_at: latest

transforms:
  # Dynamic table to store token contract addresses we want to track
  tracked_tokens:
    type: dynamic_table
    backend_type: Postgres
    backend_entity_name: tracked_tokens
    secret_name: MY_POSTGRES

  # Filter to only transfers of tracked tokens
  filtered_transfers:
    type: sql
    primary_key: id
    sql: |
      SELECT *
      FROM polygon_transfers
      WHERE dynamic_table_check('tracked_tokens', contract_address)

sinks:
  postgres_output:
    type: postgres
    from: filtered_transfers
    schema: public
    table: tracked_token_transfers
    secret_name: MY_POSTGRES
    primary_key: id
To start tracking USDC on Polygon:
-- Add USDC contract address (Polygon)
INSERT INTO turbo.tracked_tokens VALUES (lower('0x2791Bca1f2de4661ED88A30C99A7a9449Aa84174'));

-- Add USDT contract address (Polygon)
INSERT INTO turbo.tracked_tokens VALUES (lower('0xc2132D05D31c914a87C6611C10748AEb04B58e8F'));
Within seconds, your pipeline will start capturing transfers for these tokens!
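To stop tracking a token, delete its row, the same way as in the Manual Updates section:
-- Stop tracking USDT on Polygon
DELETE FROM turbo.tracked_tokens
WHERE value = lower('0xc2132D05D31c914a87C6611C10748AEb04B58e8F');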

Example: Track Wallet Activity

Monitor all ERC-20 transfers for specific wallets:
name: wallet-tracker
resource_size: s

sources:
  polygon_transfers:
    type: dataset
    dataset_name: matic.erc20_transfers
    version: 1.2.0
    start_at: latest

transforms:
  # Dynamic table for wallets we're tracking
  tracked_wallets:
    type: dynamic_table
    backend_type: Postgres
    backend_entity_name: user_wallets
    secret_name: MY_POSTGRES

  # Filter to only transfers involving tracked wallets
  wallet_transfers:
    type: sql
    primary_key: id
    sql: |
      SELECT
        *,
        CASE
          WHEN dynamic_table_check('tracked_wallets', from_address) THEN 'outgoing'
          WHEN dynamic_table_check('tracked_wallets', to_address) THEN 'incoming'
          ELSE 'unknown'
        END as direction
      FROM polygon_transfers
      WHERE
        dynamic_table_check('tracked_wallets', from_address)
        OR dynamic_table_check('tracked_wallets', to_address)

sinks:
  postgres_sink:
    type: postgres
    from: wallet_transfers
    schema: public
    table: wallet_activity
    secret_name: MY_POSTGRES
To add a wallet to track:
INSERT INTO turbo.user_wallets VALUES (lower('0x...your-wallet...'));
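Once rows land in the sink, the derived direction column makes activity queries straightforward (a hypothetical query against the wallet_activity table defined above; adjust the filter to your needs):
-- Incoming transfers for tracked wallets
SELECT *
FROM public.wallet_activity
WHERE direction = 'incoming'
LIMIT 100;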

Example: Complete Pipeline with Dynamic Tables

This example shows a complete pipeline that uses dynamic tables to filter ERC-20 transfers to specific contracts:
name: erc20-filtered-transfers
resource_size: s

sources:
  polygon_erc20_transfers:
    type: dataset
    dataset_name: matic.erc20_transfers
    version: 1.2.0
    start_at: latest

transforms:
  # Define a dynamic table to store contract addresses we want to track
  tracked_contracts:
    type: dynamic_table
    backend_type: Postgres
    backend_entity_name: tracked_contracts
    secret_name: MY_POSTGRES_SECRET

  # Filter transfers to only include our tracked contracts
  filtered_transfers:
    type: sql
    primary_key: id
    sql: |
      SELECT
        *,
        'polygon' as network
      FROM polygon_erc20_transfers
      WHERE dynamic_table_check('tracked_contracts', contract_address)

sinks:
  postgres_sink:
    type: postgres
    schema: public
    table: erc20_transfers
    secret_name: MY_POSTGRES_SECRET
    from: filtered_transfers
    primary_key: id
Add contracts to track: The dynamic table lets you control which contracts are tracked without redeploying your pipeline.
-- Add USDC contract on Polygon
INSERT INTO turbo.tracked_contracts VALUES (
  lower('0x2791Bca1f2de4661ED88A30C99A7a9449Aa84174')
);

-- Add USDT contract on Polygon
INSERT INTO turbo.tracked_contracts VALUES (
  lower('0xc2132D05D31c914a87C6611C10748AEb04B58e8F')
);

-- View tracked contracts
SELECT * FROM turbo.tracked_contracts;
Within seconds, your pipeline will start processing transfers for these contracts!

Example: Factory Pattern

Track all contracts created by a factory and filter events from those contracts:
name: uniswap-pool-tracker
resource_size: m

sources:
  base_logs:
    type: dataset
    dataset_name: base.raw_logs
    version: 1.0.0
    start_at: earliest

transforms:
  # Decode PoolCreated events from factory
  pool_created_events:
    type: sql
    primary_key: id
    sql: |
      SELECT
        id,
        address,
        evm_decode_log(
          'event PoolCreated(address indexed token0, address indexed token1, uint24 indexed fee, int24 tickSpacing, address pool)',
          topics,
          data
        ) as decoded,
        block_number,
        block_timestamp
      FROM base_logs
      WHERE
        address = lower('0x33128a8fC17869897dcE68Ed026d694621f6FDfD')
        AND topics[1] = '0x783cca1c0412dd0d695e784568c96da2e9c22ff989357a2e8b1d9b2b4e6b7118'

  # Auto-populate dynamic table with factory-created pools
  factory_pools:
    type: dynamic_table
    backend_type: Postgres
    backend_entity_name: factory_addresses
    secret_name: MY_POSTGRES
    sql: |
      SELECT lower(decoded['pool']) as contract_address
      FROM pool_created_events

  # Decode swap events from raw logs
  decoded_swaps:
    type: sql
    primary_key: id
    sql: |
      SELECT
        id,
        transaction_hash,
        address as pool_address,
        evm_decode_log(
          'event Swap(address indexed sender, address indexed recipient, int256 amount0, int256 amount1, uint160 sqrtPriceX96, uint128 liquidity, int24 tick)',
          topics,
          data
        ) as decoded,
        block_timestamp
      FROM base_logs
      WHERE
        topics[1] = '0xc42079f94a6350d7e6235f29174924f928cc2ac818eb64fed8004e115fbcca67'
        AND dynamic_table_check('factory_pools', address)

  # Extract swap data
  pool_swaps:
    type: sql
    primary_key: id
    sql: |
      SELECT
        id,
        transaction_hash,
        pool_address,
        decoded['sender'] as sender,
        decoded['recipient'] as recipient,
        decoded['amount0'] as amount0,
        decoded['amount1'] as amount1,
        decoded['sqrtPriceX96'] as sqrt_price_x96,
        decoded['liquidity'] as liquidity,
        decoded['tick'] as tick,
        block_timestamp
      FROM decoded_swaps

sinks:
  clickhouse_sink:
    type: clickhouse
    from: pool_swaps
    primary_key: id
    table: uniswap_swaps
    secret_name: MY_CLICKHOUSE
This pattern automatically tracks new pools as they’re created and immediately starts capturing their swap events!
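Because factory_pools is Postgres-backed, you can also check which pools have been discovered so far directly in your database (the backing table is turbo.factory_addresses, per its backend_entity_name):
-- List pools discovered from PoolCreated events so far
SELECT * FROM turbo.factory_addresses;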

Example: Deduplication

Track processed records to avoid duplicates using an in-memory backend:
transforms:
  # In-memory deduplication - fast but lost on restart
  seen_transactions:
    type: dynamic_table
    backend_type: InMemory
    backend_entity_name: processed_txs
    sql: |
      SELECT transaction_hash
      FROM ethereum_transactions

  new_transactions_only:
    type: sql
    primary_key: transaction_hash
    sql: |
      SELECT *
      FROM ethereum_transactions
      WHERE NOT dynamic_table_check('seen_transactions', transaction_hash)
InMemory vs Postgres for Deduplication:
  • Use InMemory for development, testing, or when processing a bounded dataset where persistence isn’t needed
  • Use Postgres for production when you need deduplication state to survive restarts
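
A persistent variant of the same pattern needs only a small change to the transform (a sketch that swaps the backend for Postgres and adds the required secret so the seen set survives restarts):
transforms:
  seen_transactions:
    type: dynamic_table
    backend_type: Postgres
    backend_entity_name: processed_txs
    secret_name: MY_POSTGRES
    sql: |
      SELECT transaction_hash
      FROM ethereum_transactions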

Source Validation

When a dynamic table uses SQL to auto-populate, both the dynamic table and any SQL transform using it must reference the same source or upstream transform. This ensures data consistency and proper synchronization.
Good example:
transforms:
  my_table:
    sql: SELECT value FROM source_1 # Uses source_1

  my_transform:
    sql: |
      SELECT * FROM source_1  # Also uses source_1
      WHERE dynamic_table_check('my_table', value)
Bad example:
transforms:
  my_table:
    sql: SELECT value FROM source_1 # Uses source_1

  my_transform:
    sql: |
      SELECT * FROM source_2  # Uses source_2 - ERROR!
      WHERE dynamic_table_check('my_table', value)

Performance Considerations

  • Dynamic table checks are optimized for fast lookups
  • Postgres backend automatically creates indexes
  • In-memory backend provides sub-microsecond lookups
  • Large tables (millions of entries) work fine with proper indexing
  • Postgres backend: Changes visible within 1-2 seconds
  • In-memory backend: Not externally updatable
  • Auto-population via SQL: Updates flow through with normal pipeline latency
  • Keep dynamic tables reasonably sized (< 10M entries for best performance)
  • Use specific filters in auto-population SQL
  • Consider table cleanup strategies for growing datasets

Best Practices

1. Use Postgres for production: Always use the Postgres backend for production deployments to ensure data persistence and external updatability.

2. Use specific filters in auto-population SQL: Filter the auto-population query down to just the values you need so the dynamic table stays reasonably sized and lookups stay fast.

3. Lowercase string values: Store addresses and other identifiers in lowercase for consistent matching: INSERT INTO turbo.my_table VALUES (lower('0x...'));

4. Index large tables: For custom Postgres tables, add indexes: CREATE INDEX idx_value ON turbo.my_table(value);

5. Monitor table growth: Periodically check table sizes and clean up old entries if needed (see the sketch below).
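
For example, a periodic check and cleanup against the backing table (a sketch; adjust the table name and values to your setup):
-- Check how large the table has grown
SELECT count(*) FROM turbo.my_table;

-- Remove entries you no longer need
DELETE FROM turbo.my_table WHERE value = lower('0x...');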