Skip to main content
Dynamic tables are a powerful feature that allows you to maintain updatable lookup tables within your pipeline. Unlike static SQL transforms, dynamic tables can be modified in real-time without redeploying your pipeline.

Key Features

  • Real-time Updates: Add or remove values without pipeline restart by updating a postgres table.
  • SQL Integration: Use dynamic_table_check() function in any SQL transform
  • Pipeline Updates: Update the table with on-chain data in the same pipeline.

Use Cases

Wallet Tracking

Monitor transfers to/from specific wallet addresses

Deduplication

Track processed records to avoid duplicates

Factory Pattern

Track contracts created by a factory

Basic Configuration

transforms:
  my_dynamic_table:
    type: dynamic_table
    backend_type: Postgres
    backend_entity_name: <table_name>
    secret_name: <secret_name> # A Goldsky Postgres secret
    schema: streamling # Optional - defaults to 'streamling'
    column: value # Optional - defaults to 'value'
    sql: | # Optional - auto-populate from pipeline data
      SELECT column FROM source
    # Optional PostgreSQL customization
    schema: custom_schema # Default: streamling
    column: custom_column # Default: value
    time_column: custom_time # Default: updated_at

Parameters

type
string
required
Must be dynamic_table
backend_type
string
required
Storage backend: Postgres (recommended) or InMemory
backend_entity_name
string
required
The table name in the backend storage. For Postgres, this creates a table in the turbo schema.
secret_name
string
Required for Postgres backend. The name of the secret containing database credentials.
sql
string
Optional. SQL query to automatically populate the table from pipeline data.
schema
string
Optional. PostgreSQL schema name for the table. Defaults to streamling.
column
string
Optional. Name of the primary key column storing values. Defaults to value.
time_column
string
Optional. Name of the timestamp column. Defaults to updated_at.

Backend Types

Best for production deployments requiring persistence:
transforms:
  tracked_contracts:
    type: dynamic_table
    backend_type: Postgres
    backend_entity_name: tracked_contracts
    secret_name: MY_POSTGRES
Benefits:
  • Data persists across pipeline restarts
  • Can be updated externally via direct SQL
  • Survives failures
  • Good performance with proper indexing
Table Structure: PostgreSQL dynamic tables are created with two columns:
  • A primary key column (default: value) storing the lookup values
  • A timestamp column (default: updated_at) automatically set to the insertion time
Table Location: By default, tables are created in the streamling schema: streamling.tracked_contracts

Custom schema and column names

You can customize the schema, column name, and timestamp column name:
transforms:
  tracked_contracts:
    type: dynamic_table
    backend_type: Postgres
    backend_entity_name: tracked_contracts
    secret_name: MY_POSTGRES
    schema: my_app          # Custom schema (default: streamling)
    column: contract_addr   # Custom column name (default: value)
    time_column: created_at # Custom timestamp column (default: updated_at)
This creates a table at my_app.tracked_contracts:
CREATE TABLE my_app.tracked_contracts (
  contract_addr TEXT PRIMARY KEY,
  created_at TIMESTAMP WITH TIME ZONE DEFAULT NOW()
);
Use custom schemas to organize dynamic tables by application or environment. For example, use production and staging schemas to separate data.

In-Memory Backend

Best for development and testing:
transforms:
  temp_tracking:
    type: dynamic_table
    backend_type: InMemory
    backend_entity_name: temp_data
Benefits:
  • Fastest possible lookups
  • No external dependencies
Limitations:
  • Data lost on restart
  • No external updates possible
  • Limited by memory

Using Dynamic Tables in SQL

Once defined, use the dynamic_table_check() function in SQL transforms:
transforms:
  # Define the dynamic table
  tracked_wallets:
    type: dynamic_table
    backend_type: Postgres
    backend_entity_name: user_wallets
    secret_name: MY_POSTGRES

  # Use it in SQL
  filtered_transfers:
    type: sql
    primary_key: id
    sql: |
      SELECT *
      FROM erc20_transfers
      WHERE dynamic_table_check('tracked_wallets', to_address)

Function Signature

dynamic_table_check(table_name: string, value: string) -> boolean
  • table_name: The reference name of the dynamic table (not the backend_entity_name)
  • value: The value to check for existence
  • Returns: true if the value exists in the table, false otherwise

Auto-Population with SQL

You can automatically populate a dynamic table from your pipeline data:
transforms:
  # Auto-populate from pipeline
  usdc_senders:
    type: dynamic_table
    backend_type: Postgres
    backend_entity_name: usdc_senders
    secret_name: MY_POSTGRES
    sql: |
      SELECT DISTINCT from_address
      FROM erc20_transfers
      WHERE contract_address = lower('0x...')

  # Use in subsequent transform
  all_usdc_activity:
    type: sql
    primary_key: id
    sql: |
      SELECT *
      FROM erc20_transfers
      WHERE dynamic_table_check('usdc_senders', from_address)
         OR dynamic_table_check('usdc_senders', to_address)
When using SQL to populate, the query only supports projections and filters (no joins or aggregations).

Manual Updates

For Postgres backends, you can update the table directly:
-- Add values
INSERT INTO streamling.tracked_contracts (value) VALUES
  (lower('0x2791Bca1f2de4661ED88A30C99A7a9449Aa84174')),
  (lower('0xA0b86991c6218b36c1d19D4a2e9Eb0cE3606eB48'));

-- Remove values
DELETE FROM streamling.tracked_contracts
WHERE value = lower('0x...');

-- Check contents (includes updated_at timestamp)
SELECT * FROM streamling.tracked_contracts;

-- Query recently added entries
SELECT * FROM streamling.tracked_contracts
WHERE updated_at > NOW() - INTERVAL '1 hour';
Changes take effect immediately - within seconds, your pipeline will start filtering based on the updated values!

Example: Track Specific Token Contracts

Monitor transfers for specific ERC-20 tokens like USDC:
name: token-tracker
resource_size: s

sources:
  polygon_transfers:
    type: dataset
    dataset_name: matic.erc20_transfers
    version: 1.2.0
    start_at: latest

transforms:
  # Dynamic table to store token contract addresses we want to track
  tracked_tokens:
    type: dynamic_table
    backend_type: Postgres
    backend_entity_name: tracked_tokens
    secret_name: MY_POSTGRES

  # Filter to only transfers of tracked tokens
  filtered_transfers:
    type: sql
    primary_key: id
    sql: |
      SELECT *
      FROM polygon_transfers
      WHERE dynamic_table_check('tracked_tokens', contract_address)

sinks:
  postgres_output:
    type: postgres
    from: filtered_transfers
    schema: public
    table: tracked_token_transfers
    secret_name: MY_POSTGRES
    primary_key: id
To start tracking USDC on Polygon:
-- Add USDC contract address (Polygon)
INSERT INTO streamling.tracked_tokens (value) VALUES (lower('0x2791Bca1f2de4661ED88A30C99A7a9449Aa84174'));

-- Add USDT contract address (Polygon)
INSERT INTO streamling.tracked_tokens (value) VALUES (lower('0xc2132D05D31c914a87C6611C10748AEb04B58e8F'));
Within seconds, your pipeline will start capturing transfers for these tokens!

Example: Track Wallet Activity

Monitor all ERC-20 transfers for specific wallets:
name: wallet-tracker
resource_size: s

sources:
  polygon_transfers:
    type: dataset
    dataset_name: matic.erc20_transfers
    version: 1.2.0
    start_at: latest

transforms:
  # Dynamic table for wallets we're tracking
  tracked_wallets:
    type: dynamic_table
    backend_type: Postgres
    backend_entity_name: user_wallets
    secret_name: MY_POSTGRES

  # Filter to only transfers involving tracked wallets
  wallet_transfers:
    type: sql
    primary_key: id
    sql: |
      SELECT
        *,
        CASE
          WHEN dynamic_table_check('tracked_wallets', from_address) THEN 'outgoing'
          WHEN dynamic_table_check('tracked_wallets', to_address) THEN 'incoming'
          ELSE 'unknown'
        END as direction
      FROM polygon_transfers
      WHERE
        dynamic_table_check('tracked_wallets', from_address)
        OR dynamic_table_check('tracked_wallets', to_address)

sinks:
  postgres_sink:
    type: postgres
    from: wallet_transfers
    schema: public
    table: wallet_activity
    secret_name: MY_POSTGRES
To add a wallet to track:
INSERT INTO streamling.user_wallets (value) VALUES (lower('0x...your-wallet...'));

Example: Complete Pipeline with Dynamic Tables

This example shows a complete pipeline that uses dynamic tables to filter ERC-20 transfers to specific contracts:
name: erc20-filtered-transfers
resource_size: s

sources:
  polygon_erc20_transfers:
    type: dataset
    dataset_name: matic.erc20_transfers
    version: 1.2.0
    start_at: latest

transforms:
  # Define a dynamic table to store contract addresses we want to track
  tracked_contracts:
    type: dynamic_table
    backend_type: Postgres
    backend_entity_name: tracked_contracts
    secret_name: MY_POSTGRES_SECRET

  # Filter transfers to only include our tracked contracts
  filtered_transfers:
    type: sql
    primary_key: id
    sql: |
      SELECT
        *,
        'polygon' as network
      FROM polygon_erc20_transfers
      WHERE dynamic_table_check('tracked_contracts', address)

sinks:
  postgres_sink:
    type: postgres
    schema: public
    table: erc20_transfers
    secret_name: MY_POSTGRES_SECRET
    from: filtered_transfers
    primary_key: id
Add contracts to track: The dynamic table allows you to control which contracts to track without redeploying your pipeline.
-- Add USDC contract on Polygon
INSERT INTO streamling.tracked_contracts (value) VALUES (
  lower('0x2791Bca1f2de4661ED88A30C99A7a9449Aa84174')
);

-- Add USDT contract on Polygon
INSERT INTO streamling.tracked_contracts (value) VALUES (
  lower('0xc2132D05D31c914a87C6611C10748AEb04B58e8F')
);

-- View tracked contracts (includes updated_at timestamp)
SELECT * FROM streamling.tracked_contracts;
Within seconds, your pipeline will start processing transfers for these contracts!

Example: Factory Pattern

Track all contracts created by a factory and filter events from those contracts:
name: uniswap-pool-tracker
resource_size: m

sources:
  base_logs:
    type: dataset
    dataset_name: base.raw_logs
    version: 1.0.0
    start_at: earliest

transforms:
  # Decode PoolCreated events from factory
  pool_created_events:
    type: sql
    primary_key: id
    sql: |
      SELECT
        id,
        address,
        evm_decode_log(
          'event PoolCreated(address indexed token0, address indexed token1, uint24 indexed fee, int24 tickSpacing, address pool)',
          topics,
          data
        ) as decoded,
        block_number,
        block_timestamp
      FROM base_logs
      WHERE
        address = lower('0x33128a8fC17869897dcE68Ed026d694621f6FDfD')
        AND topics[1] = '0x783cca1c0412dd0d695e784568c96da2e9c22ff989357a2e8b1d9b2b4e6b7118'

  # Auto-populate dynamic table with factory-created pools
  factory_pools:
    type: dynamic_table
    backend_type: Postgres
    backend_entity_name: factory_addresses
    secret_name: MY_POSTGRES
    sql: |
      SELECT lower(decoded['pool']) as contract_address
      FROM pool_created_events

  # Decode swap events from raw logs
  decoded_swaps:
    type: sql
    primary_key: id
    sql: |
      SELECT
        id,
        transaction_hash,
        address as pool_address,
        evm_decode_log(
          'event Swap(address indexed sender, address indexed recipient, int256 amount0, int256 amount1, uint160 sqrtPriceX96, uint128 liquidity, int24 tick)',
          topics,
          data
        ) as decoded,
        block_timestamp
      FROM base_logs
      WHERE
        topics[1] = '0xc42079f94a6350d7e6235f29174924f928cc2ac818eb64fed8004e115fbcca67'
        AND dynamic_table_check('factory_pools', address)

  # Extract swap data
  pool_swaps:
    type: sql
    primary_key: id
    sql: |
      SELECT
        id,
        transaction_hash,
        pool_address,
        decoded['sender'] as sender,
        decoded['recipient'] as recipient,
        decoded['amount0'] as amount0,
        decoded['amount1'] as amount1,
        decoded['sqrtPriceX96'] as sqrt_price_x96,
        decoded['liquidity'] as liquidity,
        decoded['tick'] as tick,
        block_timestamp
      FROM decoded_swaps

sinks:
  clickhouse_sink:
    type: clickhouse
    from: pool_swaps
    primary_key: id
    table: uniswap_swaps
    secret_name: MY_CLICKHOUSE
This pattern automatically tracks new pools as they’re created and immediately starts capturing their swap events!

Example: Deduplication

Track processed records to avoid duplicates using an in-memory backend:
transforms:
  # In-memory deduplication - fast but lost on restart
  seen_transactions:
    type: dynamic_table
    backend_type: InMemory
    backend_entity_name: processed_txs
    sql: |
      SELECT transaction_hash
      FROM ethereum_transactions

  new_transactions_only:
    type: sql
    primary_key: transaction_hash
    sql: |
      SELECT *
      FROM ethereum_transactions
      WHERE NOT dynamic_table_check('seen_transactions', transaction_hash)
InMemory vs Postgres for Deduplication: - Use InMemory for development, testing, or when processing a bounded dataset where persistence isn’t needed - Use Postgres for production when you need deduplication state to survive restarts

Source Validation

When a dynamic table uses SQL to auto-populate, both the dynamic table and any SQL transform using it must reference the same source or upstream transform.This ensures data consistency and proper synchronization.
Good example:
transforms:
  my_table:
    sql: SELECT value FROM source_1 # Uses source_1

  my_transform:
    sql: |
      SELECT * FROM source_1  # Also uses source_1
      WHERE dynamic_table_check('my_table', value)
Bad example:
transforms:
  my_table:
    sql: SELECT value FROM source_1 # Uses source_1

  my_transform:
    sql: |
      SELECT * FROM source_2  # Uses source_2 - ERROR!
      WHERE dynamic_table_check('my_table', value)

Performance Considerations

  • Dynamic table checks are optimized for fast lookups
  • Postgres backend automatically creates indexes
  • In-memory backend provides sub-microsecond lookups
  • Large tables (millions of entries) work fine with proper indexing
  • Postgres backend: Changes visible within 1-2 seconds - In-memory backend: Not externally updatable - Auto-population via SQL: Updates flow through with normal pipeline latency
  • Keep dynamic tables reasonably sized (< 10M entries for best performance)
  • Use specific filters in auto-population SQL
  • Consider table cleanup strategies for growing datasets

Best Practices

1

Use Postgres for production

Always use the Postgres backend for production deployments to ensure data persistence and external updatability.
2

Use specific filters in auto-population SQL

Use specific filters in auto-population SQL to ensure data consistency and proper synchronization.
3

Lowercase string values

Store addresses and other identifiers in lowercase for consistent matching: sql INSERT INTO turbo.my_table VALUES (lower('0x...'));
4

Index large tables

For custom Postgres tables, add indexes: sql CREATE INDEX idx_value ON streamling.my_table(value);
5

Monitor table growth

Periodically check table sizes and clean up old entries if needed.