Welcome to our guide on using Mirror to create pipelines for decoding and streaming data from decentralized exchanges (DEXs) into your data warehouse. Mirror is a powerful tool designed to facilitate the seamless integration of blockchain data into different sinks, enabling you to leverage the vast amounts of information generated by DEXs for analytics, reporting, and more.

What you’ll need

  1. A Goldsky account and the CLI installed
  2. A basic understanding of the Mirror product
  3. A destination sink to write your data to. In this example, we will use the PostgreSQL Sink

Introduction

Most decentralized exchanges today are based entirely on the Uniswap protocol or share strong similarities with it.

If you need a high-level overview of how Uniswap works, you can check out this reference page.

With that in mind, we can narrow our focus to identifying events emitted by Uniswap contracts and use them to identify similar events emitted by all DEXs on the blockchain. There are a number of different events we could track. In this guide we will track the Swap and PoolCreated events, as they are arguably two of the most important events for making sense of trading activity in a DEX.

For this example implementation, we will choose the Raw Log Direct Indexing dataset for the Base chain as the source of our pipeline, but you could choose any other chain for which a Raw Log dataset is available. Raw logs need to be decoded for us to be able to identify the events we want to track. For that purpose, we will use Decoding Transform Functions to dynamically fetch the ABIs of the UniswapV3Factory and UniswapV3Pool contracts from the Basescan API, since they contain the actual definitions of the PoolCreated and Swap events.

It’s worth mentioning that Uniswap has different versions, and some event definitions might differ between them. In this example we’ll focus on UniswapV3. Depending on the events you are interested in tracking, you might want to refine this example accordingly, but the principles explained will stay the same.

Let’s now see all these concepts applied in an example pipeline definition:

Pipeline Definition

base-dex-trades.yaml
sources:
  - type: dataset
    referenceName: base.logs
    version: 1.0.0
    filter: topics like '0x783cca1c0412dd0d695e784568c96da2e9c22ff989357a2e8b1d9b2b4e6b7118%' OR topics like '0xc42079f94a6350d7e6235f29174924f928cc2ac818eb64fed8004e115fbcca67%'
transforms:
  - referenceName: factory_decoded
    type: sql
    primaryKey: id
    # Fetch the ABI of UniswapV3Factory in Base and use it to decode PoolCreated events
    sql: >
      SELECT 
        `id`,
        _gs_log_decode(
            _gs_fetch_abi('https://api.basescan.org/api?module=contract&action=getabi&address=0x33128a8fC17869897dcE68Ed026d694621f6FDfD', 'etherscan'), 
            `topics`, 
            `data`
        ) AS `decoded`, 
        block_number, 
        transaction_hash 
      FROM base.logs  
  - referenceName: pool_decoded
    type: sql
    primaryKey: id
    # Fetch the ABI of a UniswapV3Pool in Base and use it to decode Swap events
    sql: >
      SELECT 
        `id`,
        _gs_log_decode(
            _gs_fetch_abi('https://api.basescan.org/api?module=contract&action=getabi&address=0x614d2ff8b2591e648f67d6f8c4a1fc1502add5f5', 'etherscan'), 
            `topics`, 
            `data`
        ) AS `decoded`, 
        block_number, 
        transaction_hash 
      FROM base.logs
  - referenceName: factory_clean
    primaryKey: id
    type: sql
    # Clean up the previous transform, unnest the values from the `decoded` object to get PoolCreated event data
    sql: >
      SELECT 
        `id`, 
        decoded.event_params AS `event_params`, 
        decoded.event_signature AS `event_signature`,
        block_number,
        transaction_hash
        FROM factory_decoded 
        WHERE decoded IS NOT NULL
        AND decoded.event_signature = 'PoolCreated'
  - referenceName: pool_clean
    primaryKey: id
    type: sql
    # Clean up the previous transform, unnest the values from the `decoded` object to get Swap event data
    sql: >
      SELECT 
        `id`, 
        decoded.event_params AS `event_params`, 
        decoded.event_signature AS `event_signature`,
        block_number,
        transaction_hash
        FROM pool_decoded 
        WHERE decoded IS NOT NULL
        AND decoded.event_signature = 'Swap'
sinks:
  - referenceName: poolcreated_events_sink
    secretName: <YOUR_SECRET>
    type: postgres
    sourceStreamName: factory_clean
    schema: decoded_events
    table: poolcreated
  - referenceName: swaps_event_sink
    secretName: <YOUR_SECRET>
    type: postgres
    sourceStreamName: pool_clean
    schema: decoded_events
    table: swaps

If you copy and use this configuration file, make sure to update:

  1. Your secretName. If you already created a secret, you can find it via the CLI command goldsky secret list.
  2. The schema and table you want the data written to; by default this example writes to the decoded_events schema.
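For example, to write the Swap data to a custom schema and table, the sink could be adjusted as follows (a sketch; MY_PG_SECRET, analytics, and dex_swaps are placeholder names to replace with your own):

sinks:
  - referenceName: swaps_event_sink
    # Placeholder: use a secret name returned by `goldsky secret list`
    secretName: MY_PG_SECRET
    type: postgres
    sourceStreamName: pool_clean
    # Placeholder schema and table for the decoded Swap events
    schema: analytics
    table: dex_swaps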

Let’s deconstruct this pipeline starting at the top:

Quick Ingestion Source

source
sources:
  - type: dataset
    referenceName: base.logs
    version: 1.0.0
    filter: topics like '0x783cca1c0412dd0d695e784568c96da2e9c22ff989357a2e8b1d9b2b4e6b7118%' OR topics like '0xc42079f94a6350d7e6235f29174924f928cc2ac818eb64fed8004e115fbcca67%'

We start the pipeline by defining a filter on the data source. This is a new feature called Quick Ingestion, which lets us filter the original dataset at a much faster rate than normal filtering with transforms. Not all datasets currently have this feature, but base.logs does, so we make use of it in this example. As source filters we add the event signature hashes of the PoolCreated and Swap events:

  • PoolCreated (index_topic_1 address token0, index_topic_2 address token1, index_topic_3 uint24 fee, int24 tickSpacing, address pool) maps to 0x783cca1c0412dd0d695e784568c96da2e9c22ff989357a2e8b1d9b2b4e6b7118
  • Swap (index_topic_1 address sender, index_topic_2 address recipient, int256 amount0, int256 amount1, uint160 sqrtPriceX96, uint128 liquidity, int24 tick) maps to 0xc42079f94a6350d7e6235f29174924f928cc2ac818eb64fed8004e115fbcca67

It’s worth highlighting that we add a % to the end of each filter, as the topics column contains more than just the event signature.

If the dataset we are using has the option for Quick Ingestion, this filter will be pre-applied and will speed up ingestion dramatically. Because not all datasets have this option enabled yet, we’ll add some redundancy in the transforms with extra filters on these events, to make sure we are targeting these two events regardless of whether this feature is available.
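If you want to apply the same filter at the transform level, for example on a dataset where Quick Ingestion is not available, a pre-decoding transform could look like this (a minimal sketch; early_topic_filter is a hypothetical transform name):

transforms:
  - referenceName: early_topic_filter
    type: sql
    primaryKey: id
    # Apply the same topic0 prefixes as the source filter, before decoding
    sql: >
      SELECT *
      FROM base.logs
      WHERE topics LIKE '0x783cca1c0412dd0d695e784568c96da2e9c22ff989357a2e8b1d9b2b4e6b7118%'
         OR topics LIKE '0xc42079f94a6350d7e6235f29174924f928cc2ac818eb64fed8004e115fbcca67%'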

Next, there are 4 transforms in this pipeline definition. We’ll explain how each of them works, starting from the top:

Decoding Transforms

Transform: factory_decoded
SELECT 
    `id`,
    _gs_log_decode(
        _gs_fetch_abi('https://api.basescan.org/api?module=contract&action=getabi&address=0x33128a8fC17869897dcE68Ed026d694621f6FDfD', 'etherscan'), 
        `topics`, 
        `data`
    ) AS `decoded`, 
    block_number, 
    transaction_hash 
    FROM base.logs 
Transform: pool_decoded
SELECT 
    `id`,
    _gs_log_decode(
        _gs_fetch_abi('https://api.basescan.org/api?module=contract&action=getabi&address=0x614d2ff8b2591e648f67d6f8c4a1fc1502add5f5', 'etherscan'), 
        `topics`, 
        `data`
    ) AS `decoded`, 
    block_number, 
    transaction_hash 
    FROM base.logs

The first two transforms fetch the ABIs for UniswapV3Factory and a UniswapV3Pool, allowing us to decode DEX events and filter for PoolCreated and Swap events in the subsequent transforms. As explained in the Decoding Contract Events guide, we first use the _gs_fetch_abi function to get each ABI from Basescan and pass it as the first argument to the _gs_log_decode function, which decodes the log’s topics and data. We store the result in a decoded ROW, which we unnest in the next transform.
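As a variation, if you would rather not call the Basescan API at pipeline runtime, the ABI JSON could be inlined as a string literal instead of fetched (a sketch, assuming _gs_log_decode accepts any ABI JSON string as its first argument; the ABI below is trimmed to just the Swap event):

SELECT 
    `id`,
    _gs_log_decode(
        -- Inline ABI JSON instead of _gs_fetch_abi; trimmed to the Swap event only
        '[{"type":"event","name":"Swap","inputs":[{"indexed":true,"name":"sender","type":"address"},{"indexed":true,"name":"recipient","type":"address"},{"indexed":false,"name":"amount0","type":"int256"},{"indexed":false,"name":"amount1","type":"int256"},{"indexed":false,"name":"sqrtPriceX96","type":"uint160"},{"indexed":false,"name":"liquidity","type":"uint128"},{"indexed":false,"name":"tick","type":"int24"}]}]', 
        `topics`, 
        `data`
    ) AS `decoded`, 
    block_number, 
    transaction_hash 
    FROM base.logs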

Event Filtering Transforms

Transform: factory_clean
SELECT 
    `id`, 
    decoded.event_params AS `event_params`, 
    decoded.event_signature AS `event_signature`,
    block_number,
    transaction_hash
    FROM factory_decoded 
    WHERE decoded IS NOT NULL
    AND decoded.event_signature = 'PoolCreated'
Transform: pool_clean
SELECT 
    `id`, 
    decoded.event_params AS `event_params`, 
    decoded.event_signature AS `event_signature`,
    block_number,
    transaction_hash
    FROM pool_decoded 
    WHERE decoded IS NOT NULL
    AND decoded.event_signature = 'Swap'

In the next two transforms we take the result of the previous decoding for each contract and filter for the PoolCreated and Swap events:

  • id: This is the Goldsky-provided id. It is a string composed of the dataset name, block hash, and log index, which is unique per event. Here’s an example: log_0x60eaf5a2ab37c73cf1f3bbd32fc17f2709953192b530d75aadc521111f476d6c_18
  • decoded.event_params AS 'event_params': event_params is an array containing the params associated with each event. For instance, in the case of Swap events, event_params[1] is the sender. You could use this for further analysis in downstream processing; see the sketch after this list.
  • decoded.event_signature AS 'event_signature': the decoder outputs the event name as event_signature, excluding its arguments.
  • WHERE decoded IS NOT NULL: to leave out potential null results from the decoder.
  • AND decoded.event_signature = 'PoolCreated': we use this value to filter only for ‘PoolCreated’ or ‘Swap’ events. As explained before, this filter is redundant for datasets with Quick Ingestion enabled, but we add it here in case you would like to try a different dataset which doesn’t have that option enabled.
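As a sketch of such downstream processing, an additional transform could unnest the Swap parameters into named columns (swaps_flat is a hypothetical transform name; the positions follow the Swap parameter order listed earlier):

Transform: swaps_flat
SELECT 
    `id`, 
    -- Param order from the Swap event: sender, recipient, amount0, amount1, sqrtPriceX96, liquidity, tick
    event_params[1] AS `sender`,
    event_params[2] AS `recipient`,
    event_params[3] AS `amount0`,
    event_params[4] AS `amount1`,
    event_params[5] AS `sqrtPriceX96`,
    event_params[6] AS `liquidity`,
    event_params[7] AS `tick`,
    block_number,
    transaction_hash
    FROM pool_clean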

If you would like to filter for other events such as Mint, you could easily add them to these queries; for example: WHERE decoded.event_signature IN ('Swap', 'Mint')

Both resulting datasets will be used as sources for two different tables at our sink: decoded_events.poolcreated & decoded_events.swaps

Deploying the pipeline

Assuming we are using the same filename for the pipeline configuration as in this example, we can deploy the pipeline with the CLI pipeline create command:

goldsky pipeline create base-dex-trades --definition-path base-dex-trades.yaml

Here’s an example Swap record from our sink:

id: log_0x18db9278e431b3bb65c151857448227a649d9f8fe3fd0cdf2b9835eb8c71d8ae_4
event_params: (omitted)
event_signature: Swap
block_number: 1472162
transaction_hash: 0xd8a1b2c1296479f31f048aaf753e16f3d7d908fd17e6697b8850fdf209f080f6

We can see that it corresponds to the Swap event of this transaction.
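You can verify records like this one directly from your PostgreSQL client; a quick sanity check might look like the following (a sketch, assuming the default decoded_events schema used in this example):

-- Latest ten decoded Swap events written by the pipeline
SELECT id, event_signature, block_number, transaction_hash
FROM decoded_events.swaps
ORDER BY block_number DESC
LIMIT 10;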

This concludes our successful deployment of a Mirror pipeline streaming DEX Trade events from Base chain into our database using inline decoders. Congrats! 🎉

Conclusion

In this guide, we’ve walked through the process of using Mirror to decode and stream DEX events, specifically Swap and PoolCreated events, into a PostgreSQL database. Along the way, we have seen an example implementation of inline decoding using the ABIs of the factory and pool contracts with the Decoding Transform Functions.

By understanding and leveraging these events, you can harness the power of real-time blockchain data to enhance your trading strategies, optimize liquidity management, and perform detailed market analysis. Experience the transformative power of Mirror today and redefine your approach to blockchain data integration.

Can't find what you're looking for? Reach out to us at support@goldsky.com for help.