Mirror lets you either ELT (extract > load > transform) or ETL (extract > transform > load) your data. Other guides focus on the ELT use case; here we’ll walk through an ETL example that streams Uniswap swaps.

What you’ll need

  1. A high-level understanding of Uniswap’s contract design, namely how the factory contract creates pool contracts.

  2. A destination to write your data to; for this use case specifically we recommend a Postgres sink.

Walkthrough

1

Write logic to collect Uniswap pool contract addresses

To find all Uniswap swap events across pools, we first need every Uniswap pool contract address.

Here, we’re using Base as an example, and can find the relevant UniswapV3Factory deployment address from Uniswap’s docs.

We can use Mirror to watch the factory address for PoolCreated() events; the SQL to do this using Goldsky’s decoded event logs schema is below. The new pool’s address is the event’s fifth parameter, so we select event_params[5].

SELECT
    lower(event_params[5]) AS contract_address
FROM
    base.decoded_logs
WHERE
    lower(address) = lower('0x33128a8fC17869897dcE68Ed026d694621f6FDfD')
    AND event_signature = 'PoolCreated(address,address,uint24,int24,address)'
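
For reference, Uniswap V3's factory declares this event as PoolCreated(token0, token1, fee, tickSpacing, pool). The sketch below applies the same filter but names every parameter, which can help when sanity-checking the output. The column aliases other than contract_address are illustrative choices, not part of Goldsky's schema.

```sql
-- Sketch: same filter as above, with each PoolCreated parameter named.
-- Aliases are illustrative; event_params is indexed from 1, as in the query above.
SELECT
    lower(event_params[1]) AS token0,
    lower(event_params[2]) AS token1,
    event_params[3]        AS fee,
    event_params[4]        AS tick_spacing,
    lower(event_params[5]) AS contract_address  -- the newly created pool
FROM
    base.decoded_logs
WHERE
    lower(address) = lower('0x33128a8fC17869897dcE68Ed026d694621f6FDfD')
    AND event_signature = 'PoolCreated(address,address,uint24,int24,address)'
```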

2

Write logic to collect trades

Next, we filter the decoded logs for Uniswap Swap() events. We reuse the query above as a subquery so that only events emitted by Uniswap pools are kept; other contracts that emit a Swap() event with the same signature are filtered out of our stream.

SELECT
  id,
  raw_log.block_number AS block_num,
  raw_log.block_timestamp AS block_time,
  transaction_hash AS tx_hash,
  'base' as chain,
  address,
  event_params[1] AS sender,
  event_params[2] AS receiver,
  event_params[3] AS amount0,
  event_params[4] AS amount1,
  event_params[5] AS price, -- sqrtPriceX96 from the Swap event, not a decimal price
  event_params[6] AS liquidity,
  event_params[7] AS tick
FROM
  base.decoded_logs
WHERE
  event_signature = 'Swap(address,address,int256,int256,uint160,uint128,int24)'
  AND lower(address) IN (
    SELECT
      lower(event_params[5]) AS contract_address
    FROM
      base.decoded_logs
    WHERE
      lower(address) = lower('0x33128a8fC17869897dcE68Ed026d694621f6FDfD')
      AND event_signature = 'PoolCreated(address,address,uint24,int24,address)'
  );
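
Note that the fifth Swap parameter is Uniswap V3's sqrtPriceX96: the square root of the token1/token0 price, stored as a Q64.96 fixed-point number rather than a decimal price. As a rough sketch, assuming the Postgres sink table defined in the configuration in the next step (public.base_univ3_swaps) and that the price column can be cast to numeric, a raw price could be derived after loading. The result is expressed in the tokens' smallest units and would still need adjusting for each token's decimals, which this guide does not collect.

```sql
-- Sketch (Postgres, run against the sink table): convert sqrtPriceX96 to a raw price.
-- raw price = (sqrtPriceX96 / 2^96)^2, i.e. token1 per token0 in smallest units.
SELECT
  tx_hash,
  address AS pool,
  power(price::numeric / power(2::numeric, 96), 2) AS raw_price_token1_per_token0
FROM
  public.base_univ3_swaps
LIMIT 10;
```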

3

Write full pipeline configuration

Now that we have the core logic for our streaming transformation written, we can combine it into a Mirror pipeline configuration file. As a refresher, the high-level outline for a pipeline configuration is as follows.

overview.yaml
sources:
transforms:
sinks:

We’ll embed the SQL from the previous step in a transform, and write out a full configuration file.

config.yaml
sources:
  - referenceName: base.decoded_logs
    version: 1.0.0
    type: dataset
    startAt: earliest
transforms:
  - sql: |
      SELECT
        id,
        raw_log.block_number AS block_num,
        raw_log.block_timestamp AS block_time,
        transaction_hash AS tx_hash,
        'base' as chain,
        address,
        event_params[1] AS sender,
        event_params[2] AS receiver,
        event_params[3] AS amount0,
        event_params[4] AS amount1,
        event_params[5] AS price,
        event_params[6] AS liquidity,
        event_params[7] AS tick
      FROM
        base.decoded_logs
      WHERE
        event_signature = 'Swap(address,address,int256,int256,uint160,uint128,int24)'
        AND lower(address) IN (
          SELECT lower(event_params[5]) AS contract_address
          FROM base.decoded_logs
          WHERE lower(address) = lower('0x33128a8fC17869897dcE68Ed026d694621f6FDfD')
          AND event_signature = 'PoolCreated(address,address,uint24,int24,address)'
        );
    referenceName: univ3swaps
    type: sql
    primaryKey: id
sinks:
  - type: postgres
    table: base_univ3_swaps
    schema: public
    secretName: <YOUR_SECRET>
    referenceName: base_univ3_swaps_sink
    sourceStreamName: univ3swaps

4

Deploy pipeline

Once we have our config.yaml configuration file, we can deploy it from the CLI with a single command:

goldsky pipeline create <pipeline-name> --definition-path config.yaml

After a minute or so, the Mirror pipeline will start walking through the decoded logs. You can track its progress by comparing max(block_num) in the sink table against the latest block shown in a block explorer.
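
A minimal progress check, assuming the Postgres sink table and column names from the configuration above (public.base_univ3_swaps, block_num), might look like this:

```sql
-- Sketch (Postgres, run against the sink table): how far has the backfill progressed?
SELECT
  max(block_num) AS latest_block,
  count(*)       AS swaps_loaded
FROM
  public.base_univ3_swaps;
```

When latest_block is close to the chain's current block height, the pipeline has caught up.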

You can speed up the backfill by scaling the pipeline up to a larger resource size, then scaling back down to an S worker once it reaches edge (the chain head). An M pipeline caught up to edge (~600K swaps as of October 2023) in approximately 45 minutes.

5

Create streams for other chains (or one multi-chain stream)

Once you’ve iterated on a single-chain use case (with the exact schema, sink indexes, etc. that you need), you can update your pipeline configuration to write multiple chains’ worth of Uniswap data in one pipeline. To do so, add sources, transforms, and sinks to your config for each chain, replacing the referenceName and factory deployment address accordingly.
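
One way to produce a single multi-chain stream is to UNION ALL the per-chain queries inside one transform. The sketch below rests on several assumptions: that a second source named ethereum.decoded_logs has been added under sources, that the Ethereum mainnet UniswapV3Factory address shown matches Uniswap's docs, and that a trimmed column list is acceptable for illustration. Verify dataset names and factory addresses for each chain before relying on it, and check that id stays unique across chains if it remains the primary key.

```sql
-- Sketch: one transform emitting swaps from two chains into a single stream.
-- Assumes both base.decoded_logs and ethereum.decoded_logs are declared as sources.
SELECT
  id,
  'base' AS chain,
  address,
  transaction_hash AS tx_hash,
  event_params[3] AS amount0,
  event_params[4] AS amount1
FROM base.decoded_logs
WHERE event_signature = 'Swap(address,address,int256,int256,uint160,uint128,int24)'
  AND lower(address) IN (
    SELECT lower(event_params[5])
    FROM base.decoded_logs
    WHERE lower(address) = lower('0x33128a8fC17869897dcE68Ed026d694621f6FDfD')
      AND event_signature = 'PoolCreated(address,address,uint24,int24,address)'
  )
UNION ALL
SELECT
  id,
  'ethereum' AS chain,
  address,
  transaction_hash AS tx_hash,
  event_params[3] AS amount0,
  event_params[4] AS amount1
FROM ethereum.decoded_logs
WHERE event_signature = 'Swap(address,address,int256,int256,uint160,uint128,int24)'
  AND lower(address) IN (
    SELECT lower(event_params[5])
    FROM ethereum.decoded_logs
    -- Assumed Ethereum mainnet UniswapV3Factory address; confirm against Uniswap's docs.
    WHERE lower(address) = lower('0x1F98431c8aD98523631AE4a59f267346ea31F984')
      AND event_signature = 'PoolCreated(address,address,uint24,int24,address)'
  )
```

Keeping a chain column in every branch lets a single sink table be filtered or indexed per chain later.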
