> ## Documentation Index
> Fetch the complete documentation index at: https://docs.goldsky.com/llms.txt
> Use this file to discover all available pages before exploring further.

# EVM Sources

> Build data pipelines with Ethereum, Polygon, Base, and any other supported EVM chain.

## Overview

Goldsky provides curated datasets for EVM chains, making it easy to build real-time data pipelines without managing Kafka topics or schemas. All datasets follow consistent schemas across chains, so you can write multi-chain pipelines with minimal code changes. See the [supported chains](/chains/supported-networks) page for the full list.

The [Datasource explorer](https://app.goldsky.com/data-sources) contains all the chain and dataset names. You can also see datasets with `goldsky dataset list` using the CLI.

<Note>
  All Mirror datasets work with Turbo. For complete field schemas, see the [EVM Data Schemas](/mirror/reference/schema/EVM-schemas) and [Curated Data
  Schemas](/mirror/reference/schema/curated-schemas) references.
</Note>

## Quick Start

The simplest way to get started is with curated token transfer datasets:

```yaml theme={null}
sources:
  polygon_transfers:
    type: dataset
    dataset_name: matic.erc20_transfers
    version: 1.2.0
    start_at: latest
```

**Available EVM datasets:**

Datasets are namespaced by the network and type. The chain prefix is the network slug (for example `matic` for Polygon PoS, not `polygon`). See each chain's page in [supported chains](/chains/supported-networks) for the correct slug.

* `ethereum.raw_logs` for Ethereum mainnet event logs
* `matic.erc20_transfers` for Polygon ERC-20 transfers
* `base.raw_logs` for Base event logs

The following dataset types are available for most EVM chains:

* `erc20_transfers` - Fungible token transfers (curated)
* `erc721_transfers` - NFT transfers (curated)
* `erc1155_transfers` - Multi-token transfers (curated)
* `raw_blocks` - Block headers
* `raw_transactions` - Transactions including receipt fields
* `raw_logs` - Event logs
* `raw_traces` - Internal transaction traces

## Fast scan

Processing full datasets (starting from `earliest`) requires the pipeline to process a significant amount of data, which affects how quickly it reaches the chain tip. This is especially true for larger chains.

When you only need a subset of historical data, you can enable **fast scan** by adding a `filter` to your source configuration. The filter is pre-applied at the source level, making initial ingestion of historical data much faster by skipping irrelevant blocks.

When defining a `filter`, use attributes that exist in the dataset. You can get the schema of a dataset by running `goldsky dataset get <dataset_name>`.

```yaml theme={null}
sources:
  base_logs:
    type: dataset
    dataset_name: base.raw_logs
    version: 1.0.0
    start_at: earliest
    filter: address = '0x21552aeb494579c772a601f655e9b3c514fda960'
```

The filter expression follows the SQL standard for what comes after the `WHERE` clause:

```yaml theme={null}
# Filter by a single address
filter: address = '0x21552aeb494579c772a601f655e9b3c514fda960'

# Filter by multiple addresses
filter: address = '0xb794f5ea0ba39494ce839613ff2a60353dc3b38e' OR address = '0x21552aeb494579c772a601f655e9b3c514fda960'

# Combine conditions (example on matic.erc20_transfers, which has an `amount` column)
filter: address = '0x2791bca1f2de4661ed88a30c99a7a9449aa84174' AND amount > 500
```

<Info>
  Fast scan speeds up **backfills only** — when processing historical data with `start_at: earliest`. During real-time processing (when the pipeline has caught up to the chain tip), all blocks are processed regardless of the filter.
</Info>

Fast scan also works with [job mode](/turbo-pipelines/job-mode). When you set a `filter:` together with `start_at: earliest` (or omit `start_at`), the source becomes bounded and job mode can terminate it cleanly. To make the backfill finite, include an upper bound on `block_number` in the `filter:` expression:

```yaml theme={null}
sources:
  base_usdc_logs:
    type: dataset
    dataset_name: base.raw_logs
    version: 1.0.0
    start_at: earliest
    filter: address = '0x833589fcd6edb6e08f4c7c32d4f71b54bda02913' AND block_number <= 20000000
```

Deploy with `job: true` and the pipeline will self-terminate once block 20,000,000 is processed. Without a `filter:`, an EVM dataset becomes a streaming Kafka source and cannot be used with `job: true`.

<Note>
  `end_block` is **not** supported on EVM dataset sources — it is silently ignored. To bound an EVM job, put the block range in the `filter:` expression (e.g., `block_number <= 20000000` or `block_number BETWEEN 18000000 AND 20000000`). `end_block` is Solana-only.
</Note>

## Guide: Track Specific Tokens

Use dynamic tables to monitor transfers for tokens you care about:

```yaml theme={null}
name: token-tracker
resource_size: s

sources:
  polygon_transfers:
    type: dataset
    dataset_name: matic.erc20_transfers
    version: 1.2.0
    start_at: latest

transforms:
  # Track specific token contracts
  tracked_tokens:
    type: dynamic_table
    backend_type: Postgres
    backend_entity_name: tracked_tokens
    secret_name: MY_POSTGRES

  # Filter to tracked tokens only
  filtered_transfers:
    type: sql
    primary_key: id
    sql: |
      SELECT
        id,
        address as token,
        sender,
        recipient,
        amount,
        block_timestamp,
        transaction_hash,
        _gs_op
      FROM polygon_transfers
      WHERE dynamic_table_check('tracked_tokens', address)

sinks:
  postgres_output:
    type: postgres
    from: filtered_transfers
    schema: public
    table: token_transfers
    secret_name: MY_POSTGRES
    primary_key: id
```

**Add tokens to track:**

Dynamic tables default to the `streamling` schema with a `value` column. See [Dynamic Tables](/turbo-pipelines/transforms/dynamic-tables) for how to customize this.

```sql theme={null}
-- USDC on Polygon
INSERT INTO streamling.tracked_tokens (value) VALUES
  (lower('0x2791Bca1f2de4661ED88A30C99A7a9449Aa84174'));

-- USDT on Polygon
INSERT INTO streamling.tracked_tokens (value) VALUES
  (lower('0xc2132D05D31c914a87C6611C10748AEb04B58e8F'));
```

## Guide: Decode Custom Contract Events

Use `_gs_log_decode` to decode events from any contract with raw logs:

```yaml theme={null}
name: custom-event-tracker
resource_size: m

sources:
  base_logs:
    type: dataset
    dataset_name: base.raw_logs
    version: 1.0.0
    start_at: latest

transforms:
  # Decode events using contract ABI
  decoded_events:
    type: sql
    primary_key: id
    sql: |
      SELECT
        id,
        address,
        _gs_log_decode(
          _gs_fetch_abi('https://gist.githubusercontent.com/JavierTrujilloG/0fe5b2a9fc33b2ecce2c3d9490fce638/raw/14483e1d174e28361753bdd2a4eab03fab9ca3d4/friendtech.json', 'raw'),
          topics,
          data
        ) as decoded,
        block_number,
        block_timestamp,
        transaction_hash,
        _gs_op
      FROM base_logs
      WHERE address = lower('0xcf205808ed36593aa40a44f10c7f7c2f67d4a4d4')

  # Extract decoded parameters
  clean_events:
    type: sql
    primary_key: id
    sql: |
      SELECT
        id,
        address,
        decoded.event_signature AS `event_signature`,
        decoded.event_params[1] as param1,
        decoded.event_params[2] as param2,
        decoded.event_params[3] as param3,
        block_number,
        block_timestamp,
        transaction_hash,
        _gs_op
      FROM decoded_events
      WHERE decoded IS NOT NULL

sinks:
  postgres_events:
    type: postgres
    from: clean_events
    schema: public
    table: decoded_events
    secret_name: MY_POSTGRES
    primary_key: id
```

**Decoding functions:**

* `_gs_log_decode(abi, topics, data)` - Decode event logs
* `_gs_fetch_abi(url, 'etherscan')` - Fetch ABI from Etherscan
* `_gs_fetch_abi(url, 'raw')` - Fetch raw JSON ABI from URL

**Accessing decoded parameters:**

```sql theme={null}
decoded.event_signature  -- Event name (e.g., "Transfer")
decoded.event_params[1]  -- First parameter
decoded.event_params[2]  -- Second parameter
```

## Guide: Multi-Chain Monitoring

Track the same event across multiple chains:

```yaml theme={null}
name: multi-chain-transfers
resource_size: m

sources:
  ethereum_transfers:
    type: dataset
    dataset_name: ethereum.erc20_transfers
    version: 1.2.0
    start_at: latest

  polygon_transfers:
    type: dataset
    dataset_name: matic.erc20_transfers
    version: 1.2.0
    start_at: latest

  base_transfers:
    type: dataset
    dataset_name: base.erc20_transfers
    version: 1.2.0
    start_at: latest

transforms:
  # Combine all chains
  all_transfers:
    type: sql
    primary_key: id
    sql: |
      SELECT
        id,
        'ethereum' as chain,
        1 as chain_id,
        address as token,
        sender,
        recipient,
        amount,
        block_timestamp,
        _gs_op
      FROM ethereum_transfers

      UNION ALL

      SELECT
        id,
        'polygon' as chain,
        137 as chain_id,
        address as token,
        sender,
        recipient,
        amount,
        block_timestamp,
        _gs_op
      FROM polygon_transfers

      UNION ALL

      SELECT
        id,
        'base' as chain,
        8453 as chain_id,
        address as token,
        sender,
        recipient,
        amount,
        block_timestamp,
        _gs_op
      FROM base_transfers

sinks:
  clickhouse_all_transfers:
    type: clickhouse
    from: all_transfers
    table: multi_chain_transfers
    primary_key: id
    secret_name: MY_CLICKHOUSE
```

## Guide: High-Value Transfer Alerts

Send real-time alerts for large transfers:

```yaml theme={null}
name: whale-alerts
resource_size: s

sources:
  ethereum_transfers:
    type: dataset
    dataset_name: ethereum.erc20_transfers
    version: 1.2.0
    start_at: latest

transforms:
  # Filter to high-value transfers (>$1M assuming 18 decimals)
  high_value:
    type: sql
    primary_key: id
    sql: |
      SELECT
        id,
        address as token,
        sender,
        recipient,
        amount,
        block_timestamp,
        transaction_hash,
        _gs_op
      FROM ethereum_transfers
      WHERE amount > 1000000000000000000000000

sinks:
  webhook_alert:
    type: webhook
    from: high_value
    url: https://alerts.example.com/whale-transfer
    one_row_per_request: true
```

## Guide: UniswapV3 Swap Detection with Log Decoding

Track UniswapV3-like swaps on Base on specific pools of interest.

```yaml theme={null}
name: uniswapv3-swaps-base
resource_size: m

sources:
  base_logs:
    type: dataset
    dataset_name: base.raw_logs
    version: 1.0.0
    start_at: latest

transforms:
  # Track UniswapV3 pool addresses dynamically
  uniswap_pools:
    type: dynamic_table
    backend_type: Postgres
    backend_entity_name: uniswap_v3_pools
    secret_name: MY_POSTGRES

  # Filter to Swap events from tracked pools
  swap_logs:
    type: sql
    primary_key: id
    sql: |
      SELECT
        id,
        address as pool_address,
        topics,
        data,
        block_number,
        block_timestamp,
        transaction_hash,
        log_index,
        _gs_op
      FROM base_logs
      WHERE
        -- Swap event signature: Swap(address,address,int256,int256,uint160,uint128,int24)
        SPLIT_INDEX(topics, ',', 0) = lower('0xc42079f94a6350d7e6235f29174924f928cc2ac818eb64fed8004e115fbcca67')
        AND dynamic_table_check('uniswap_pools', address)

  # Decode the Swap event parameters
  decoded_swaps:
    type: sql
    primary_key: id
    sql: |
      SELECT
        id,
        pool_address,
        _gs_log_decode(
          '[{
            "anonymous": false,
            "inputs": [
              {"indexed": true, "name": "sender", "type": "address"},
              {"indexed": true, "name": "recipient", "type": "address"},
              {"indexed": false, "name": "amount0", "type": "int256"},
              {"indexed": false, "name": "amount1", "type": "int256"},
              {"indexed": false, "name": "sqrtPriceX96", "type": "uint160"},
              {"indexed": false, "name": "liquidity", "type": "uint128"},
              {"indexed": false, "name": "tick", "type": "int24"}
            ],
            "name": "Swap",
            "type": "event"
          }]',
          topics,
          data
        ) as decoded,
        block_number,
        block_timestamp,
        transaction_hash,
        log_index,
        _gs_op
      FROM swap_logs

  # Extract and clean decoded swap data
  clean_swaps:
    type: sql
    primary_key: id
    sql: |
      SELECT
        id,
        pool_address,
        decoded.event_params[1] as sender,
        decoded.event_params[2] as recipient,
        CAST(decoded.event_params[3] AS DECIMAL(38, 0)) as amount0,
        CAST(decoded.event_params[4] AS DECIMAL(38, 0)) as amount1,
        CAST(decoded.event_params[5] AS DECIMAL(38, 0)) as sqrt_price_x96,
        CAST(decoded.event_params[6] AS DECIMAL(38, 0)) as liquidity,
        CAST(decoded.event_params[7] AS INT) as tick,
        block_number,
        block_timestamp,
        transaction_hash,
        log_index,
        _gs_op
      FROM decoded_swaps
      WHERE decoded IS NOT NULL

sinks:
  postgres_swaps:
    type: postgres
    from: clean_swaps
    schema: public
    table: uniswap_v3_swaps
    secret_name: MY_POSTGRES
    primary_key: id
```

**Populate pools from UniswapV3 Factory:**

Instead of manually adding pool addresses, you can automatically discover pools from factory PoolCreated events:

```yaml theme={null}
# Add this transform before swap_logs in the pipeline above
factory_pool_created:
  type: sql
  primary_key: id
  sql: |
    SELECT
      id,
      _gs_log_decode(
        '[{
          "anonymous": false,
          "inputs": [
            {"indexed": true, "name": "token0", "type": "address"},
            {"indexed": true, "name": "token1", "type": "address"},
            {"indexed": true, "name": "fee", "type": "uint24"},
            {"indexed": false, "name": "tickSpacing", "type": "int24"},
            {"indexed": false, "name": "pool", "type": "address"}
          ],
          "name": "PoolCreated",
          "type": "event"
        }]',
        topics,
        data
      ) as decoded,
      block_number,
      block_timestamp,
      _gs_op
    FROM base_logs
    WHERE
      -- UniswapV3 Factory address on Base
      address = lower('0x33128a8fC17869897dcE68Ed026d694621f6FDfD')
      -- PoolCreated event signature
      AND SPLIT_INDEX(topics, ',', 0) = lower('0x783cca1c0412dd0d695e784568c96da2e9c22ff989357a2e8b1d9b2b4e6b7118')

pool_addresses:
  type: sql
  primary_key: pool_address
  sql: |
    SELECT
      lower(decoded.event_params[5]) as pool_address,
      lower(decoded.event_params[1]) as token0,
      lower(decoded.event_params[2]) as token1,
      CAST(decoded.event_params[3] AS INT) as fee,
      block_number,
      block_timestamp,
      _gs_op
    FROM factory_pool_created
    WHERE decoded IS NOT NULL

# Add sink to populate the dynamic table
populate_pools:
  type: postgres
  from: pool_addresses
  schema: streamling
  table: uniswap_v3_pools
  secret_name: MY_POSTGRES
  primary_key: pool_address
```

**Or manually add specific pools:**

```sql theme={null}
-- WETH/USDC 0.05% pool on Base
INSERT INTO streamling.uniswap_v3_pools (value) VALUES
  (lower('0xd0b53D9277642d899DF5C87A3966A349A798F224'));

-- WETH/USDbC 0.05% pool on Base
INSERT INTO streamling.uniswap_v3_pools (value) VALUES
  (lower('0x4C36388bE6F416A29C8d8Eee81C771cE6bE14B18'));
```

**Filter to specific tokens:**

```sql theme={null}
-- Only track pools containing WETH
INSERT INTO streamling.uniswap_v3_pools (value)
SELECT pool_address
FROM pool_addresses_table
WHERE token0 = lower('0x4200000000000000000000000000000000000006')
   OR token1 = lower('0x4200000000000000000000000000000000000006');
```

**Key points:**

* Uses the `base.raw_logs` dataset for full event coverage and custom decoding
* Swap event signature (`0xc4207...`) filters for UniswapV3 Swap events
* `_gs_log_decode` decodes the raw log using inline ABI
* Dynamic table pattern allows adding/removing pools without redeploying
* Extracts all swap parameters: amounts, price, liquidity, and tick
* Factory pattern automatically discovers new pools as they're created
