
Overview

In this quickstart, you’ll create a simple Turbo pipeline that:
  1. Reads ERC-20 transfer data from a Goldsky dataset
  2. Writes the results to a blackhole sink
  3. Inspects the data live
  4. [Optional] Writes the data into a PostgreSQL database
This replaces the need for complex Mirror v1 configurations with a simple, declarative YAML file.
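Every Turbo pipeline file follows the same declarative shape: named sources, optional transforms, and sinks that consume them. As a skeleton (placeholder values, not a runnable pipeline):

name: my-pipeline       # unique name for the pipeline
resource_size: s        # compute size

sources: {}             # where data comes from (e.g. Goldsky datasets)
transforms: {}          # optional SQL or TypeScript processing steps
sinks: {}               # where data goes (blackhole, postgres, ...)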

Prerequisites

You'll need a Goldsky account and the Goldsky CLI installed and authenticated.

Step 1: Create Your Pipeline

Create a file named erc20-pipeline.yaml:
erc20-pipeline.yaml
name: erc20-transfers
resource_size: s

sources:
  base_erc20_transfers:
    type: dataset
    dataset_name: base.erc20_transfers
    version: 1.2.0
    start_at: latest

transforms: {}

sinks:
  blackhole_sink:
    type: blackhole
    from: base_erc20_transfers
Sources:
  • We’re using the base.erc20_transfers dataset (version 1.2.0)
  • start_at: latest means we’ll only process new transfers going forward
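If you want historical transfers as well, the dataset source takes a different starting position; a minimal sketch, assuming start_at: earliest is supported for this dataset:

sources:
  base_erc20_transfers:
    type: dataset
    dataset_name: base.erc20_transfers
    version: 1.2.0
    start_at: earliest  # assumption: backfills from the start of the dataset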
Sinks:
  • The blackhole sink discards the data but allows you to test the pipeline
  • Perfect for development and testing before adding real outputs

Step 2: Deploy Your Pipeline

Apply your pipeline configuration:
goldsky turbo apply erc20-pipeline.yaml
You should see output confirming the deployment:
✓ Pipeline erc20-transfers created

Step 3: Inspect the Data Live

Now that the pipeline is running, you can refer to it in commands either by its file (erc20-pipeline.yaml) or by its name (erc20-transfers).
Use the inspect command to see data flowing through your pipeline:
goldsky turbo inspect erc20-pipeline.yaml
Use the logs command to view the running output of the pipeline:
goldsky turbo logs erc20-pipeline.yaml
You should see output showing the pipeline processing ERC-20 transfers in real-time.

Step 4: Add a Filter for USDC Only

Now let’s add a SQL transform to filter only USDC transfers. Update your erc20-pipeline.yaml:
erc20-pipeline.yaml
name: erc20-transfers
resource_size: s

sources:
  base_erc20_transfers:
    type: dataset
    dataset_name: base.erc20_transfers
    version: 1.2.0
    start_at: latest

transforms:
  # Filter to only USDC transfers
  usdc_transfers:
    type: sql
    primary_key: id
    sql: |
      SELECT
        id,
        sender,
        recipient,
        amount,
        to_timestamp(block_timestamp) as block_time
      FROM base_erc20_transfers
      WHERE address = lower('0x833589fCD6eDb6E08f4c7C32D4f71b54bdA02913')

sinks:
  blackhole_sink:
    type: blackhole
    from: usdc_transfers
Transforms:
  • Added a SQL transform named usdc_transfers that filters for the USDC contract on Base
  • The contract address 0x833589fCD6eDb6E08f4c7C32D4f71b54bdA02913 is USDC on Base
  • We use lower() to ensure consistent case-insensitive matching
  • Selected only the essential columns: id, sender, recipient, amount, and block_time
  • Converted the Unix timestamp to a human-readable format using to_timestamp()
Sinks:
  • Updated the sink to read from usdc_transfers instead of the raw source
  • Now only USDC transfers will flow through the pipeline
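Transforms can also be chained: a later SQL transform can read from an earlier one by name, exactly as it reads from a source. For example, a sketch that narrows the stream to larger transfers (the threshold assumes amount is the raw 6-decimal USDC integer, so 1000000 = 1 USDC; the exact cast syntax depends on the SQL dialect Turbo exposes):

transforms:
  usdc_transfers:
    # ... as above ...
  large_usdc_transfers:
    type: sql
    primary_key: id
    sql: |
      SELECT *
      FROM usdc_transfers
      WHERE CAST(amount AS DECIMAL(38, 0)) >= 1000000

sinks:
  blackhole_sink:
    type: blackhole
    from: large_usdc_transfers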
You can achieve the same filtering using a TypeScript transform instead of SQL. Return null to filter out records:
name: erc20-transfers
resource_size: s

sources:
  base_erc20_transfers:
    type: dataset
    dataset_name: base.erc20_transfers
    version: 1.2.0
    start_at: latest

transforms:
  usdc_transfers:
    type: script
    primary_key: id
    language: typescript
    from: base_erc20_transfers
    schema:
      id: string
      sender: string
      recipient: string
      amount: string
      block_time: string
    script: |
      function transform(input) {
        const USDC_ADDRESS = '0x833589fcd6edb6e08f4c7c32d4f71b54bda02913';

        // Return null to filter out non-USDC transfers
        if (!input?.address || input.address.toLowerCase() !== USDC_ADDRESS) {
          return null;
        }

        // Return a custom schema with only the fields we need
        return {
          id: input.id,
          sender: input.sender,
          recipient: input.recipient,
          amount: input.amount,
          block_time: new Date(input.block_timestamp * 1000).toISOString()
        };
      }

sinks:
  blackhole_sink:
    type: blackhole
    from: usdc_transfers
Key features:
  • Return null to filter: Records that don’t match your criteria can be filtered out by returning null
  • Custom output schema: Use the schema field to define a different output schema than the input. This lets you reshape data, rename fields, or include only specific columns
  • Flexible transformations: Perform calculations, string manipulation, and conditional logic
TypeScript Benefits:
  • More flexible data transformations and complex logic
  • Familiar syntax for developers
  • Type safety and autocompletion support
  • Can perform calculations, string manipulation, and conditional logic
SQL Benefits:
  • Generally faster for simple filtering and aggregations
  • More concise for straightforward queries
  • Better for set-based operations
Choose TypeScript when you need complex transformations or custom logic. Choose SQL for simple filters and aggregations.
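As an example of logic that is clumsy in SQL but natural in TypeScript, here is a sketch of the script transform extended to emit a human-readable USDC amount. The amount_usdc field is our own addition; it assumes amount arrives as the raw 6-decimal integer string and that the script runtime supports BigInt:

transforms:
  usdc_transfers:
    type: script
    primary_key: id
    language: typescript
    from: base_erc20_transfers
    schema:
      id: string
      sender: string
      recipient: string
      amount_usdc: string
      block_time: string
    script: |
      function transform(input) {
        const USDC_ADDRESS = '0x833589fcd6edb6e08f4c7c32d4f71b54bda02913';
        if (!input?.address || input.address.toLowerCase() !== USDC_ADDRESS) {
          return null;
        }
        // USDC has 6 decimals; BigInt avoids float precision loss on large amounts
        const raw = BigInt(input.amount);
        const whole = raw / 1000000n;
        const frac = (raw % 1000000n).toString().padStart(6, '0');
        return {
          id: input.id,
          sender: input.sender,
          recipient: input.recipient,
          amount_usdc: `${whole}.${frac}`,  // e.g. "12.500000"
          block_time: new Date(input.block_timestamp * 1000).toISOString()
        };
      }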

Redeploy and Inspect

Apply the updated configuration:
goldsky turbo apply erc20-pipeline.yaml
Now inspect the output of your new usdc_transfers transform:
# the -n argument allows you to target one or more sources or transforms
goldsky turbo inspect erc20-pipeline.yaml -n usdc_transfers
You should now see only USDC transfers! Compare this to the original source data:
# View the filtered USDC data
goldsky turbo inspect erc20-pipeline.yaml -n usdc_transfers

# View all ERC-20 transfers (before filtering)
goldsky turbo inspect erc20-pipeline.yaml -n base_erc20_transfers

# View multiple stages at once
goldsky turbo inspect erc20-pipeline.yaml -n base_erc20_transfers,usdc_transfers
The -n (--topology_node_keys) flag lets you inspect data at any point in your pipeline. Use transform names to see filtered or transformed data, or source names to see raw data. You can pass multiple keys separated by commas to view several stages at once.

Optional: Write to PostgreSQL

To persist your data to a PostgreSQL database, update your pipeline configuration:

1. Create a Secret

Store your PostgreSQL credentials:
goldsky secret create MY_POSTGRES_SECRET
When prompted, enter your PostgreSQL connection string:
postgres://username:password@host:port/database
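For example, with hypothetical values:
postgres://app_user:s3cret@db.example.com:5432/analytics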

2. Update Your Pipeline

Modify erc20-pipeline.yaml to add a PostgreSQL sink:
erc20-pipeline.yaml
name: erc20-transfers
resource_size: s

sources:
  base_erc20_transfers:
    type: dataset
    dataset_name: base.erc20_transfers
    version: 1.2.0
    start_at: latest

transforms:
  # Filter to only USDC transfers
  usdc_transfers:
    type: sql
    primary_key: id
    sql: |
      SELECT
        id,
        sender,
        recipient,
        amount,
        to_timestamp(block_timestamp) as block_time
      FROM base_erc20_transfers
      WHERE address = lower('0x833589fCD6eDb6E08f4c7C32D4f71b54bdA02913')

sinks:
  postgres_sink:
    type: postgres
    schema: public
    table: usdc_transfers
    secret_name: MY_POSTGRES_SECRET
    from: usdc_transfers
    primary_key: id
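You don't have to drop the blackhole sink when you add Postgres. sinks is a map, so a pipeline can write to both at once (a sketch, assuming Turbo allows multiple sinks per pipeline as Mirror does), which is handy for inspecting while persisting:

sinks:
  blackhole_sink:
    type: blackhole
    from: usdc_transfers
  postgres_sink:
    type: postgres
    schema: public
    table: usdc_transfers
    secret_name: MY_POSTGRES_SECRET
    from: usdc_transfers
    primary_key: id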

3. Redeploy

Apply the updated configuration:
goldsky turbo apply erc20-pipeline.yaml

4. Query Your Data

Connect to PostgreSQL and query the USDC transfers:
SELECT
  sender,
  recipient,
  amount,
  block_time
FROM public.usdc_transfers
ORDER BY block_time DESC
LIMIT 10;
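From here, any SQL works. For example, a sketch of a daily volume rollup; it assumes amount is stored as the raw 6-decimal USDC integer and can be cast to numeric:

-- Daily USDC volume (amount assumed to be the raw 6-decimal integer)
SELECT
  date_trunc('day', block_time) AS day,
  SUM(amount::numeric) / 1e6    AS usdc_volume
FROM public.usdc_transfers
GROUP BY day
ORDER BY day DESC;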