# Stream DEX trades

> Stream Decentralized Exchange trade events to your database

Welcome to our guide on using Mirror to create pipelines for decoding and streaming data from decentralized exchanges (DEXs) into your data warehouse.
Mirror is a powerful tool designed to facilitate the seamless integration of blockchain data into different [sinks](/mirror/sinks/supported-sinks), enabling you to leverage the vast amounts of information generated by DEXs for analytics, reporting, and more.

## What you'll need

1. A Goldsky account and the CLI installed

<Accordion title="Install Goldsky's CLI and log in">
  1) Install the Goldsky CLI:

     **For macOS/Linux:**

     ```shell theme={null}
     curl https://goldsky.com | sh
     ```

     **For Windows:**

     ```shell theme={null}
     npm install -g @goldskycom/cli
     ```

     <Note>Windows users need to have Node.js and npm installed first. Download from [nodejs.org](https://nodejs.org) if not already installed.</Note>
  2) Go to your [Project Settings](https://app.goldsky.com/dashboard/settings) page and create an API key.
  3) Back in your Goldsky CLI, log into your Project by running the command `goldsky login` and paste your API key.
  4) Now that you are logged in, run `goldsky` to get started:
     ```shell theme={null}
     goldsky
     ```
</Accordion>

2. A basic understanding of the [Mirror product](/mirror)
3. A destination sink to write your data to. In this example, we will use [the PostgreSQL Sink](/mirror/sinks/postgres)

## Introduction

Most of the Decentralized Exchanges these days are based entirely on the Uniswap protocol or have strong similarities with it.

<Note>
  If you need a high-level overview of how Uniswap works, you can check out [this reference page](https://docs.uniswap.org/contracts/v2/concepts/protocol-overview/how-uniswap-works).
</Note>

With that in mind, we can focus on the events emitted by Uniswap contracts and use them to identify similar events emitted by other DEXs on the same chain.
There are a number of different events we could track. In this guide, we will track the `Swap` and `PoolCreated` events, as they are arguably two of the most important events for making sense of trading activity in a DEX.

For this example implementation, we will use the [Raw Log Direct Indexing](https://docs.goldsky.com/mirror/sources/direct-indexing) dataset for the Base chain as the source of our pipeline, but you could choose any other chain for which the Raw Log dataset is available.
Raw logs need to be decoded before we can identify the events we want to track. For that purpose, we will use [Decoding Transform Functions](/reference/mirror-functions/decoding-functions) to dynamically fetch the ABIs of both the [UniswapV3Factory](https://basescan.org/address/0x33128a8fc17869897dce68ed026d694621f6fdfd) and [UniswapV3Pool](https://basescan.org/address/0xcccc03b23cd798c06828c377466f267e59bb9739) contracts (in this pipeline, from raw JSON files hosted on GitHub Gist), since they contain the actual definitions of the PoolCreated and Swap events.

<Note>
  It's worth mentioning that Uniswap has different versions and it's possible that some event definitions might differ. In this example we'll focus on UniswapV3. Depending on the events you are interested in tracking you might want to refine this example accordingly but the principles explained will stay the same.
</Note>

Let's now see all these concepts applied in an example pipeline definition:

## Pipeline Definition

```yaml base-dex-trades.yaml expandable theme={null}
name: base-dex-trades
apiVersion: 3
sources:
  base_logs:
    type: dataset
    dataset_name: base.logs
    version: 1.0.0
    filter: topics like '0x783cca1c0412dd0d695e784568c96da2e9c22ff989357a2e8b1d9b2b4e6b7118%' OR topics like '0xc42079f94a6350d7e6235f29174924f928cc2ac818eb64fed8004e115fbcca67%'
transforms:
  factory_decoded:
    primary_key: id
    # Fetch the ABI of UniswapV3Factory in Base and use it to decode PoolCreated events
    sql: >
      SELECT 
        `id`,
        _gs_log_decode(
            _gs_fetch_abi('https://gist.githubusercontent.com/JavierTrujilloG/7df78272e689bf102cbe97ae86607d94/raw/9733aaa132a2c3e82cccbe5b0681d3270d696c83/UniswapV3Factory-ABI.json', 'raw'), 
            `topics`, 
            `data`
        ) AS `decoded`, 
        block_number, 
        transaction_hash 
      FROM base_logs  
  pool_decoded: 
    primary_key: id
    # Fetch the ABI of a UniswapV3Pool in Base and use it to decode Swap events
    sql: >
      SELECT 
        `id`,
        _gs_log_decode(
            _gs_fetch_abi('https://gist.githubusercontent.com/JavierTrujilloG/d3d2d80fbfd3415dd8e11aa498bd0909/raw/b8df8303e51ac7ad9ac921f25bfa84936bb4bc63/UniswapV3Pool-ABI.json', 'raw'), 
            `topics`, 
            `data`
        ) AS `decoded`, 
        block_number, 
        transaction_hash 
      FROM base_logs
  factory_clean:
    primary_key: id
    # Clean up the previous transform, unnest the values from the `decoded` object to get PoolCreated event data
    sql: >
      SELECT 
        `id`, 
        decoded.event_params AS `event_params`, 
        decoded.event_signature AS `event_signature`,
        block_number,
        transaction_hash
        FROM factory_decoded 
        WHERE decoded IS NOT NULL
        AND decoded.event_signature = 'PoolCreated'
  pool_clean:
    primary_key: id
    # Clean up the previous transform, unnest the values from the `decoded` object to get Swap event data
    sql: >
      SELECT 
        `id`, 
        decoded.event_params AS `event_params`, 
        decoded.event_signature AS `event_name`,
        block_number,
        transaction_hash
        FROM pool_decoded 
        WHERE decoded IS NOT NULL
        AND decoded.event_signature = 'Swap'
sinks:
  poolcreated_events_sink:
    secret_name: <YOUR_SECRET>
    type: postgres
    schema: decoded_events
    table: poolcreated
    from: factory_clean
  swaps_event_sink:
    secret_name: <YOUR_SECRET>
    type: postgres
    schema: decoded_events
    table: swaps
    from: pool_clean
```

<Note>
  If you copy and use this configuration file, make sure to update:

  1. Your `secret_name`. If you already [created a secret](/mirror/manage-secrets), you can find it via the [CLI command](/reference/cli#secret) `goldsky secret list`.
  2. The schema and table you want the data written to. By default, the pipeline writes to the `decoded_events` schema.
</Note>

Let's deconstruct this pipeline starting at the top:

### Fast Scan Source

```yaml source theme={null}
sources:
  base_logs:
    type: dataset
    dataset_name: base.logs
    version: 1.0.0
    filter: topics like '0x783cca1c0412dd0d695e784568c96da2e9c22ff989357a2e8b1d9b2b4e6b7118%' OR topics like '0xc42079f94a6350d7e6235f29174924f928cc2ac818eb64fed8004e115fbcca67%'
```

We start the pipeline by defining a filter on the data source. This is a new feature called Quick Ingestion, which lets us filter the original dataset at a much faster rate than normal filtering with transforms. Not all datasets currently have this feature, but `base.logs` does, so we make use of it in this example.
As source filters, we add the event signature hashes (the first topic) of the PoolCreated and Swap events:

* `PoolCreated (index_topic_1 address token0, index_topic_2 address token1, index_topic_3 uint24 fee, int24 tickSpacing, address pool)` maps to `0x783cca1c0412dd0d695e784568c96da2e9c22ff989357a2e8b1d9b2b4e6b7118`
* `Swap (index_topic_1 address sender, index_topic_2 address recipient, int256 amount0, int256 amount1, uint160 sqrtPriceX96, uint128 liquidity, int24 tick)` maps to `0xc42079f94a6350d7e6235f29174924f928cc2ac818eb64fed8004e115fbcca67`

It's worth highlighting that we add a `%` to the end of each filter because `topics` contains more than just the event signature hash: the values of the indexed parameters follow it.
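As a rough illustration of what the prefix match is doing, the sketch below filters on the first topic only, using an exact comparison inside a transform. It assumes that `topics` is stored as a comma-separated string with the event signature hash first (an assumption, not something stated in this guide), and `swap_logs_only` is a hypothetical transform name. `SPLIT_INDEX` is a built-in Flink SQL string function with a zero-based index.

```sql
-- Hypothetical transform `swap_logs_only`: keep only Swap logs by comparing the first topic exactly.
-- Assumption: `topics` is a comma-separated string whose first element is the event signature hash.
SELECT
    `id`,
    SPLIT_INDEX(`topics`, ',', 0) AS `topic0`, -- zero-based: index 0 is the first topic
    block_number,
    transaction_hash
FROM base_logs
WHERE SPLIT_INDEX(`topics`, ',', 0) = '0xc42079f94a6350d7e6235f29174924f928cc2ac818eb64fed8004e115fbcca67'
```

A transform-level filter like this does not benefit from the faster source-side filtering, so it is best treated as a complement to the source filter rather than a replacement.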

If the dataset we are using has the option for Quick Ingestion, this filter will be pre-applied and will speed up ingestion dramatically. Because not all datasets have this option enabled yet, we add some redundancy in the transforms with extra filters on these events, so that we target these two events regardless of whether the feature is available.

Next, there are four transforms in this pipeline definition. We'll explain how they work, starting from the top:

### Decoding Transforms

```sql Transform: factory_decoded theme={null}
SELECT 
    `id`,
    _gs_log_decode(
        _gs_fetch_abi('https://gist.githubusercontent.com/JavierTrujilloG/7df78272e689bf102cbe97ae86607d94/raw/9733aaa132a2c3e82cccbe5b0681d3270d696c83/UniswapV3Factory-ABI.json', 'raw'), 
        `topics`, 
        `data`
    ) AS `decoded`, 
    block_number, 
    transaction_hash 
    FROM base_logs
```

```sql Transform: pool_decoded theme={null}
SELECT 
    `id`,
    _gs_log_decode(
        _gs_fetch_abi('https://gist.githubusercontent.com/JavierTrujilloG/d3d2d80fbfd3415dd8e11aa498bd0909/raw/b8df8303e51ac7ad9ac921f25bfa84936bb4bc63/UniswapV3Pool-ABI.json', 'raw'), 
        `topics`, 
        `data`
    ) AS `decoded`, 
    block_number, 
    transaction_hash 
    FROM base_logs
```

The first two transforms fetch the ABIs for UniswapV3Factory and a UniswapV3Pool, which allows us to decode DEX events and filter by `PoolCreated` and `Swap` events in the following transforms.
As explained in the [Decoding Contract Events guide](/mirror/guides/decoding-contract-events), we first use the `_gs_fetch_abi` function to fetch each ABI (in this pipeline, from a raw JSON URL) and pass it as the first argument
to the `_gs_log_decode` function, which decodes the log's topics and data. We store the result in a `decoded` [ROW](https://nightlies.apache.org/flink/flink-docs-stable/docs/dev/table/types/#row), which we unnest in the next transform.

### Event Filtering Transforms

```sql Transform: factory_clean theme={null}
SELECT 
    `id`, 
    decoded.event_params AS `event_params`, 
    decoded.event_signature AS `event_signature`,
    block_number,
    transaction_hash
    FROM factory_decoded 
    WHERE decoded IS NOT NULL
    AND decoded.event_signature = 'PoolCreated'
```

```sql Transform: pool_clean theme={null}
SELECT 
    `id`, 
    decoded.event_params AS `event_params`, 
    decoded.event_signature AS `event_name`,
    block_number,
    transaction_hash
    FROM pool_decoded 
    WHERE decoded IS NOT NULL
    AND decoded.event_signature = 'Swap'
```

In the next two transforms, we take the decoded result for each contract and filter by the PoolCreated and Swap events:

* `id`: This is the Goldsky-provided `id`, a string composed of the dataset name, block hash, and log index, which is unique per event. Here's an example: `log_0x60eaf5a2ab37c73cf1f3bbd32fc17f2709953192b530d75aadc521111f476d6c_18`
* `decoded.event_params AS 'event_params'`: event\_params is an array containing the parameters associated with each event. For instance, in the case of Swap events, `event_params[1]` is the sender. You could use this for further analysis in downstream processing.
* `decoded.event_signature AS 'event_name'`: the decoder will output the event name as event\_signature, excluding its arguments.
* `WHERE decoded IS NOT NULL`: to leave out potential null results from the decoder
* `AND decoded.event_signature = 'PoolCreated'`: we use this value to filter only for 'PoolCreated' or 'Swap' events. As explained before, this filter will be redundant for datasets with Quick Ingestion enabled but we add it here in this example in case you would like to try out with a different dataset which doesn't have that option enabled.
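To make the `event_params` indexing more concrete, here is a minimal sketch of an extra transform that gives each Swap parameter its own column. The transform name `swaps_named` and the output column names are hypothetical; the positions follow the order of the `Swap` event definition listed earlier (sender, recipient, amount0, amount1, sqrtPriceX96, liquidity, tick), with array indexes starting at 1.

```sql
-- Hypothetical transform `swaps_named`: expose each Swap parameter as a named column.
-- Positions follow the Swap event definition; array indexes start at 1.
SELECT
    `id`,
    event_params[1] AS `sender`,
    event_params[2] AS `recipient`,
    event_params[3] AS `amount0`,
    event_params[4] AS `amount1`,
    event_params[5] AS `sqrt_price_x96`,
    event_params[6] AS `liquidity`,
    event_params[7] AS `tick`,
    block_number,
    transaction_hash
FROM pool_clean
```

Depending on how the decoder types these values, you may need to `CAST` the numeric fields before doing arithmetic on them. To write this shape to the database, a sink would read `from: swaps_named` instead of `from: pool_clean`.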

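If you would like to filter by other events, such as `Mint`, you can add them to these queries, for example: `WHERE decoded.event_signature IN ('Swap', 'Mint')`. Remember to also add the corresponding topic hash to the source filter; otherwise the source filter will exclude those logs before they reach the transforms.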

Both resulting datasets are used as sources for two different tables in our sink: `decoded_events.poolcreated` and `decoded_events.swaps`.

## Deploying the pipeline

Assuming we are using the same filename for the pipeline configuration as in this example, we can deploy this pipeline with the CLI (see the [pipeline commands reference](/reference/cli#pipeline-create)):

`goldsky pipeline apply base-dex-trades.yaml --status ACTIVE`

Here's an example Swap record from our sink:

| id                                                                         | event\_params                                                                                                                                                                      | event\_name | block\_number | transaction\_hash                                                  |
| -------------------------------------------------------------------------- | ---------------------------------------------------------------------------------------------------------------------------------------------------------------------------------- | ----------- | ------------- | ------------------------------------------------------------------ |
| log\_0x18db9278e431b3bb65c151857448227a649d9f8fe3fd0cdf2b9835eb8c71d8ae\_4 | 0x508fdf90951c1a31faa5dcd119f3b60e0e0e87fb,0x508fdf90951c1a31faa5dcd119f3b60e0e0e87fb,-256654458505550,500000000000000000,3523108129873998835611448265535,631691941157619701,75899 | Swap        | 1472162       | 0xd8a1b2c1296479f31f048aaf753e16f3d7d908fd17e6697b8850fdf209f080f6 |

We can see that it corresponds to the Swap event of this transaction:

<img className="block mx-auto" width="450" src="https://mintcdn.com/goldsky-38/djvhUUMseW21frQF/images/mirror/guides/stream-dex-trades/swap.png?fit=max&auto=format&n=djvhUUMseW21frQF&q=85&s=820e7d0bf9ff710eaab340d991af6948" data-path="images/mirror/guides/stream-dex-trades/swap.png" />
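Once records are flowing, a quick sanity check is to query the sink directly. The sketch below assumes the default `decoded_events.swaps` table from this guide and a standard PostgreSQL client. It only uses the `block_number` column that the pipeline writes.

```sql
-- Count Swap records per block for the ten most recent blocks in the sink table.
SELECT
    block_number,
    COUNT(*) AS swap_count
FROM decoded_events.swaps
GROUP BY block_number
ORDER BY block_number DESC
LIMIT 10;
```

If the query returns no rows shortly after deployment, the pipeline may still be starting up or backfilling earlier blocks.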

This concludes our successful deployment of a Mirror pipeline streaming DEX trade events from the Base chain into our database using inline decoders. Congrats! 🎉

## Conclusion

In this guide, we've walked through the process of using Mirror to decode and stream DEX events, specifically Swap and PoolCreated events, into a PostgreSQL database.
Along the way, we have seen an example implementation of inline decoding using the ABIs of the Factory and Pool contracts with the [Decoding Transform Functions](/reference/mirror-functions/decoding-functions).

By understanding and leveraging these events, you can harness the power of real-time blockchain data to enhance your trading strategies, optimize liquidity management, and perform detailed market analysis.
Experience the transformative power of Mirror today and redefine your approach to blockchain data integration.

Can't find what you're looking for? Reach out to us at [support@goldsky.com](mailto:support@goldsky.com) for help.
