Prerequisites: Before starting, install the Turbo CLI and authenticate with goldsky login.

Overview

Goldsky provides curated datasets for all major EVM chains, making it easy to build real-time data pipelines without managing Kafka topics or schemas. All datasets follow consistent schemas across chains, so you can write multi-chain pipelines with minimal code changes.
For complete field schemas, see the EVM Data Schemas and Curated Data Schemas references.

Quick Start

The simplest way to get started is with curated token transfer datasets:
sources:
  polygon_transfers:
    type: dataset
    dataset_name: matic.erc20_transfers
    version: 1.2.0
    start_at: latest
Available EVM datasets:
  • erc20_transfers - Fungible token transfers
  • erc721_transfers - NFT transfers
  • erc1155_transfers - Multi-token transfers
  • blocks - Block data
  • transactions - Transaction data
  • logs - Raw event logs
  • traces - Internal transaction traces

Guide: Track Specific Tokens

Use dynamic tables to monitor transfers for tokens you care about:
name: token-tracker
resource_size: s

sources:
  polygon_transfers:
    type: dataset
    dataset_name: matic.erc20_transfers
    version: 1.2.0
    start_at: latest

transforms:
  # Track specific token contracts
  tracked_tokens:
    type: dynamic_table
    backend_type: Postgres
    backend_entity_name: tracked_tokens
    secret_name: MY_POSTGRES

  # Filter to tracked tokens only
  filtered_transfers:
    type: sql
    primary_key: id
    sql: |
      SELECT
        id,
        address as token,
        sender,
        recipient,
        amount,
        block_timestamp,
        transaction_hash,
        _gs_op
      FROM polygon_transfers
      WHERE dynamic_table_check('tracked_tokens', address)

sinks:
  postgres_output:
    type: postgres
    from: filtered_transfers
    schema: public
    table: token_transfers
    secret_name: MY_POSTGRES
    primary_key: id
Add tokens to track:
-- USDC on Polygon
INSERT INTO turbo.tracked_tokens VALUES
  (lower('0x2791Bca1f2de4661ED88A30C99A7a9449Aa84174'));

-- USDT on Polygon
INSERT INTO turbo.tracked_tokens VALUES
  (lower('0xc2132D05D31c914a87C6611C10748AEb04B58e8F'));
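Note the lower() wrappers: every example in this guide lowercases addresses before inserting or comparing them, because the address values in the curated datasets are lowercase hex, so a checksummed (mixed-case) address inserted as-is would never match in dynamic_table_check. An illustrative helper (not part of Turbo) for normalizing addresses before insertion:

```python
def normalize_address(addr: str) -> str:
    """Lowercase a 0x-prefixed EVM address so it matches the lowercase
    hex values stored in the curated datasets."""
    addr = addr.strip().lower()
    if not (addr.startswith("0x") and len(addr) == 42):
        raise ValueError(f"not a 20-byte hex address: {addr}")
    return addr

# Checksummed USDC (Polygon) address from the example above:
print(normalize_address("0x2791Bca1f2de4661ED88A30C99A7a9449Aa84174"))
```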

Guide: Decode Custom Contract Events

Use _gs_log_decode together with a contract's ABI to decode raw event logs from any contract:
name: custom-event-tracker
resource_size: m

sources:
  base_logs:
    type: dataset
    dataset_name: base.raw_logs
    version: 1.0.0
    start_at: latest

transforms:
  # Decode events using contract ABI
  decoded_events:
    type: sql
    primary_key: id
    sql: |
      SELECT
        id,
        address,
        _gs_log_decode(
          _gs_fetch_abi('https://gist.githubusercontent.com/JavierTrujilloG/0fe5b2a9fc33b2ecce2c3d9490fce638/raw/14483e1d174e28361753bdd2a4eab03fab9ca3d4/friendtech.json', 'raw'),
          topics,
          data
        ) as decoded,
        block_number,
        block_timestamp,
        transaction_hash,
        _gs_op
      FROM base_logs
      WHERE address = lower('0xcf205808ed36593aa40a44f10c7f7c2f67d4a4d4')

  # Extract decoded parameters
  clean_events:
    type: sql
    primary_key: id
    sql: |
      SELECT
        id,
        address,
        decoded.event_signature AS `event_signature`,
        decoded.event_params[1] as param1,
        decoded.event_params[2] as param2,
        decoded.event_params[3] as param3,
        block_number,
        block_timestamp,
        transaction_hash,
        _gs_op
      FROM decoded_events
      WHERE decoded IS NOT NULL

sinks:
  postgres_events:
    type: postgres
    from: clean_events
    schema: public
    table: decoded_events
    secret_name: MY_POSTGRES
    primary_key: id
Decoding functions:
  • _gs_log_decode(abi, topics, data) - Decode event logs
  • _gs_fetch_abi(url, 'etherscan') - Fetch ABI from Etherscan
  • _gs_fetch_abi(url, 'raw') - Fetch raw JSON ABI from URL
Accessing decoded parameters:
decoded.event_signature  -- Event name (e.g., "Transfer")
decoded.event_params[1]  -- First parameter
decoded.event_params[2]  -- Second parameter
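The positional indices follow the declaration order of the inputs in the event's ABI, so event_params[1] is the event's first declared input. If you post-process sink rows elsewhere, mapping positions back to names keeps queries readable; a small sketch (the list shape of event_params here is an assumption for illustration):

```python
# Map positional event_params back to names using the ABI input order.
# ERC-20 Transfer inputs, in standard ABI declaration order:
TRANSFER_INPUT_NAMES = ["from", "to", "value"]

def name_params(event_params, input_names):
    """Pair positional decoded parameters with their ABI input names."""
    return dict(zip(input_names, event_params))

named = name_params(["0xsender", "0xrecipient", "1000"], TRANSFER_INPUT_NAMES)
```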

Guide: Multi-Chain Monitoring

Track the same event across multiple chains:
name: multi-chain-transfers
resource_size: m

sources:
  ethereum_transfers:
    type: dataset
    dataset_name: ethereum.erc20_transfers
    version: 1.2.0
    start_at: latest

  polygon_transfers:
    type: dataset
    dataset_name: matic.erc20_transfers
    version: 1.2.0
    start_at: latest

  base_transfers:
    type: dataset
    dataset_name: base.erc20_transfers
    version: 1.2.0
    start_at: latest

transforms:
  # Combine all chains
  all_transfers:
    type: sql
    primary_key: id
    sql: |
      SELECT
        id,
        'ethereum' as chain,
        1 as chain_id,
        address as token,
        sender,
        recipient,
        amount,
        block_timestamp,
        _gs_op
      FROM ethereum_transfers

      UNION ALL

      SELECT
        id,
        'polygon' as chain,
        137 as chain_id,
        address as token,
        sender,
        recipient,
        amount,
        block_timestamp,
        _gs_op
      FROM polygon_transfers

      UNION ALL

      SELECT
        id,
        'base' as chain,
        8453 as chain_id,
        address as token,
        sender,
        recipient,
        amount,
        block_timestamp,
        _gs_op
      FROM base_transfers

sinks:
  clickhouse_all_transfers:
    type: clickhouse
    from: all_transfers
    table: multi_chain_transfers
    primary_key: id
    secret_name: MY_CLICKHOUSE
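The three per-chain SELECTs differ only in the source name, chain label, and chain ID. When tracking many chains, it can help to generate the UNION ALL body from a chain list while authoring the config; a sketch of such a generator (illustrative tooling, not a Turbo feature):

```python
# Generate the SQL body of the all_transfers transform from a chain list.
CHAINS = [
    ("ethereum", 1, "ethereum_transfers"),
    ("polygon", 137, "polygon_transfers"),
    ("base", 8453, "base_transfers"),
]

SELECT_TEMPLATE = """\
SELECT
  id,
  '{chain}' as chain,
  {chain_id} as chain_id,
  address as token,
  sender,
  recipient,
  amount,
  block_timestamp,
  _gs_op
FROM {source}"""

sql = "\n\nUNION ALL\n\n".join(
    SELECT_TEMPLATE.format(chain=chain, chain_id=chain_id, source=source)
    for chain, chain_id, source in CHAINS
)
print(sql)
```

Adding a new chain then only requires a new source block and one more tuple in the list.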

Guide: High-Value Transfer Alerts

Send real-time alerts for large transfers:
name: whale-alerts
resource_size: s

sources:
  ethereum_transfers:
    type: dataset
    dataset_name: ethereum.erc20_transfers
    version: 1.2.0
    start_at: latest

transforms:
  # Filter to transfers above 1,000,000 whole tokens
  # (~$1M for a dollar-pegged token, assuming 18 decimals)
  high_value:
    type: sql
    primary_key: id
    sql: |
      SELECT
        id,
        address as token,
        sender,
        recipient,
        amount,
        block_timestamp,
        transaction_hash,
        _gs_op
      FROM ethereum_transfers
      WHERE amount > 1000000000000000000000000

sinks:
  webhook_alert:
    type: webhook
    from: high_value
    url: https://alerts.example.com/whale-transfer
    one_row_per_request: true
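The amount column holds the raw on-chain integer, so the threshold in the WHERE clause is 1,000,000 whole tokens scaled by 18 decimals. A quick sanity check of that arithmetic (illustrative, not part of the pipeline):

```python
def to_raw_units(whole_tokens: int, decimals: int = 18) -> int:
    """Scale a whole-token amount to raw on-chain units."""
    return whole_tokens * 10**decimals

# 1,000,000 tokens at 18 decimals -> the literal used in the WHERE clause
assert to_raw_units(1_000_000) == 1000000000000000000000000
```

Note that tokens with fewer decimals need a different literal: a 6-decimal token like USDC would use to_raw_units(1_000_000, 6) instead.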

Guide: UniswapV3 Swap Detection with Log Decoding

Track UniswapV3-style Swap events on Base for specific pools of interest.
name: uniswapv3-swaps-base
resource_size: m

sources:
  base_logs:
    type: dataset
    dataset_name: base.raw_logs
    version: 1.0.0
    start_at: latest

transforms:
  # Track UniswapV3 pool addresses dynamically
  uniswap_pools:
    type: dynamic_table
    backend_type: Postgres
    backend_entity_name: uniswap_v3_pools
    secret_name: MY_POSTGRES

  # Filter to Swap events from tracked pools
  swap_logs:
    type: sql
    primary_key: id
    sql: |
      SELECT
        id,
        address as pool_address,
        topics,
        data,
        block_number,
        block_timestamp,
        transaction_hash,
        log_index,
        _gs_op
      FROM base_logs
      WHERE
        -- Swap event signature: Swap(address,address,int256,int256,uint160,uint128,int24)
        topics[1] = lower('0xc42079f94a6350d7e6235f29174924f928cc2ac818eb64fed8004e115fbcca67')
        AND dynamic_table_check('uniswap_pools', address)

  # Decode the Swap event parameters
  decoded_swaps:
    type: sql
    primary_key: id
    sql: |
      SELECT
        id,
        pool_address,
        _gs_log_decode(
          '[{
            "anonymous": false,
            "inputs": [
              {"indexed": true, "name": "sender", "type": "address"},
              {"indexed": true, "name": "recipient", "type": "address"},
              {"indexed": false, "name": "amount0", "type": "int256"},
              {"indexed": false, "name": "amount1", "type": "int256"},
              {"indexed": false, "name": "sqrtPriceX96", "type": "uint160"},
              {"indexed": false, "name": "liquidity", "type": "uint128"},
              {"indexed": false, "name": "tick", "type": "int24"}
            ],
            "name": "Swap",
            "type": "event"
          }]',
          topics,
          data
        ) as decoded,
        block_number,
        block_timestamp,
        transaction_hash,
        log_index,
        _gs_op
      FROM swap_logs

  # Extract and clean decoded swap data
  clean_swaps:
    type: sql
    primary_key: id
    sql: |
      SELECT
        id,
        pool_address,
        decoded.event_params[1] as sender,
        decoded.event_params[2] as recipient,
        CAST(decoded.event_params[3] AS DECIMAL(38, 0)) as amount0,
        CAST(decoded.event_params[4] AS DECIMAL(38, 0)) as amount1,
        CAST(decoded.event_params[5] AS DECIMAL(38, 0)) as sqrt_price_x96,
        CAST(decoded.event_params[6] AS DECIMAL(38, 0)) as liquidity,
        CAST(decoded.event_params[7] AS INT) as tick,
        block_number,
        block_timestamp,
        transaction_hash,
        log_index,
        _gs_op
      FROM decoded_swaps
      WHERE decoded IS NOT NULL

sinks:
  postgres_swaps:
    type: postgres
    from: clean_swaps
    schema: public
    table: uniswap_v3_swaps
    secret_name: MY_POSTGRES
    primary_key: id
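The sqrt_price_x96 column that lands in Postgres still encodes the pool price in UniswapV3's Q64.96 fixed-point format: price = (sqrtPriceX96 / 2^96)^2, quoted as raw token1 per raw token0. A sketch of converting it downstream (the decimals arguments are per-pool values you would look up; this helper is illustrative, not part of the pipeline):

```python
def sqrt_price_x96_to_price(sqrt_price_x96: int,
                            decimals0: int = 18,
                            decimals1: int = 18) -> float:
    """Convert a UniswapV3 sqrtPriceX96 value to a human-readable
    token1-per-token0 price, adjusting for token decimals."""
    raw_price = (sqrt_price_x96 / 2**96) ** 2   # raw token1 per raw token0
    return raw_price * 10 ** (decimals0 - decimals1)

# A pool trading 1:1 between equal-decimal tokens encodes sqrtPriceX96 = 2**96
assert sqrt_price_x96_to_price(2**96) == 1.0
```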
Populate pools from UniswapV3 Factory: Instead of manually adding pool addresses, you can automatically discover pools from factory PoolCreated events:
# Add this transform before swap_logs in the pipeline above
factory_pool_created:
  type: sql
  primary_key: id
  sql: |
    SELECT
      id,
      _gs_log_decode(
        '[{
          "anonymous": false,
          "inputs": [
            {"indexed": true, "name": "token0", "type": "address"},
            {"indexed": true, "name": "token1", "type": "address"},
            {"indexed": true, "name": "fee", "type": "uint24"},
            {"indexed": false, "name": "tickSpacing", "type": "int24"},
            {"indexed": false, "name": "pool", "type": "address"}
          ],
          "name": "PoolCreated",
          "type": "event"
        }]',
        topics,
        data
      ) as decoded,
      block_number,
      block_timestamp,
      _gs_op
    FROM base_logs
    WHERE
      -- UniswapV3 Factory address on Base
      address = lower('0x33128a8fC17869897dcE68Ed026d694621f6FDfD')
      -- PoolCreated event signature
      AND topics[1] = lower('0x783cca1c0412dd0d695e784568c96da2e9c22ff989357a2e8b1d9b2b4e6b7118')

pool_addresses:
  type: sql
  primary_key: pool_address
  sql: |
    SELECT
      lower(decoded.event_params[5].value) as pool_address,
      lower(decoded.event_params[1].value) as token0,
      lower(decoded.event_params[2].value) as token1,
      CAST(decoded.event_params[3].value AS INT) as fee,
      block_number,
      block_timestamp,
      _gs_op
    FROM factory_pool_created
    WHERE decoded IS NOT NULL

# Add sink to populate the dynamic table
populate_pools:
  type: postgres
  from: pool_addresses
  schema: turbo
  table: uniswap_v3_pools
  secret_name: MY_POSTGRES
  primary_key: pool_address
Or manually add specific pools:
-- WETH/USDC 0.05% pool on Base
INSERT INTO turbo.uniswap_v3_pools (pool_address) VALUES
  (lower('0xd0b53D9277642d899DF5C87A3966A349A798F224'));

-- WETH/USDbC 0.05% pool on Base
INSERT INTO turbo.uniswap_v3_pools (pool_address) VALUES
  (lower('0x4C36388bE6F416A29C8d8Eee81C771cE6bE14B18'));
Filter to specific tokens:
-- Only track pools containing WETH
INSERT INTO turbo.uniswap_v3_pools (pool_address)
SELECT pool_address
FROM pool_addresses_table
WHERE token0 = lower('0x4200000000000000000000000000000000000006')
   OR token1 = lower('0x4200000000000000000000000000000000000006');
Key points:
  • Uses the base.raw_logs dataset instead of base.decoded_logs for more control over decoding
  • Swap event signature (0xc4207...) filters for UniswapV3 Swap events
  • _gs_log_decode decodes the raw log using inline ABI
  • Dynamic table pattern allows adding/removing pools without redeploying
  • Extracts all swap parameters: amounts, price, liquidity, and tick
  • Factory pattern automatically discovers new pools as they’re created

Next Steps