Skip to main content
This guide will cover more advanced topics in transforming our decoded datasets and some of the functionality of the Flink SQL backend. In other guides we focus on the ELT use-case; here we’ll walk through an ETL example considering Uniswap trades.

What you’ll need

  1. A basic understanding of the Mirror product’s more basic ETL use case.
  2. A basic understanding of SQL, though we use the syntax and functionality of Flink v1.17.
  3. A destination sink to write your data to

Preface

In this example we’ll provide a YAML file that includes the transforms needed to sync token transfers to your database. Several steps are needed to accomplish this and we’ll go into detail about the following subjects:
  1. We need to differentate ERC-721 (NFT) transfers from ERC-20 token transfers since they have the same event signature in decoded data: Transfer(address,address,uint256)
  2. We need to extract and deduplicate ERC-1155 batch transfers.
In this example we’ve included two ERC-721 contract addresses and one ERC-1155 contract address, but you can change these two any token address or addresses of interest for you, or remove the address filters to get all transfers. Be careful though, removing the event filters will sync many millions of rows to your DB We’ve also included a number of columns that you may or may not need, the main columns needed for most purposes are: id, address (if you are syncing multiple contract addresses), sender, recipient, token_id, and value.

Pipeline YAML

There are two main transforms in this configuration and we’ll go through each one to explain how they work. If you copy and use this configuration file, make sure to update:
  1. Your secretName. If you already created a secret, you can find it via the CLI command goldsky secret list.
  2. The schema and table you want the data written to, by default it writes to mirror.transfers.
  3. The contract address or addresses you want to sync.
sources:
  - referenceName: ethereum.decoded_logs
    version: 1.0.0
    type: dataset
    startAt: earliest
    description: Decoded logs for events emitted from contracts. Contains the
      decoded event signature and event parameters, contract address, data,
      topics, and metadata for the block and transaction.
transforms:
  - type: sql
    referenceName: ethereum_721_transfers
    primaryKey: id
    description: ERC721 Transfers
    sql: >-
      SELECT
              address AS contract_address,
              lower(event_params[1]) AS sender,
              lower(event_params[2]) AS recipient,
              COALESCE(TRY_CAST(event_params[3] AS NUMERIC), -999) AS token_id,
              1 AS `value`,
              raw_log.block_number       AS block_number,
              raw_log.block_hash         AS block_hash,
              raw_log.log_index          AS log_index,
              raw_log.transaction_hash   AS transaction_hash,
              raw_log.transaction_index  AS transaction_index,
              id
              FROM ethereum.decoded_logs WHERE address IN ('0x22c1f6050e56d2876009903609a2cc3fef83b415', '0xbc4ca0eda7647a8ab7c2061c2e118a18a936f13d')
              AND raw_log.topics LIKE '0xddf252ad1be2c89b69c2b068fc378daa952ba7f163c4a11628f55a4df523b3ef%'
              AND SPLIT_INDEX(raw_log.topics, ',', 3) IS NOT NULL
  - type: sql
    referenceName: ethereum_1155_transfers
    primaryKey: id
    description: ERC1155 Transform
    sql: >-
      SELECT
              address AS contract_address,
              lower(event_params[2]) AS sender,
              lower(event_params[3]) AS recipient,
              COALESCE(TRY_CAST(event_params[4] AS NUMERIC), -999) AS token_id,
              COALESCE(TRY_CAST(event_params[5] AS NUMERIC), -999) AS `value`,
              raw_log.block_number       AS block_number,
              raw_log.block_hash         AS block_hash,
              raw_log.log_index          AS log_index,
              raw_log.transaction_hash   AS transaction_hash,
              raw_log.transaction_index  AS transaction_index,
              id FROM ethereum.decoded_logs
              WHERE address = '0xc36cf0cfcb5d905b8b513860db0cfe63f6cf9f5c'
              AND raw_log.topics LIKE '0xc3d58168c5ae7397731d063d5bbf3d657854427343f4c083240f7aacaa2d0f62%'
              UNION ALL SELECT
              address AS contract_address,
              lower(event_params[2]) AS sender,
              lower(event_params[3]) AS recipient,
              COALESCE(TRY_CAST(token_ids.token_id AS NUMERIC),-999) AS token_id,
              COALESCE(TRY_CAST(`values`.`token_value` AS NUMERIC),0) AS `value`,
              raw_log.block_number       AS block_number,
              raw_log.block_hash         AS block_hash,
              raw_log.log_index          AS log_index,
              raw_log.transaction_hash   AS transaction_hash,
              raw_log.transaction_index  AS transaction_index,
              id || '_' || token_ids.token_id AS id FROM ethereum.decoded_logs
              CROSS JOIN UNNEST(_gs_split_string_by(TRIM(LEADING '[' FROM TRIM(TRAILING ']' FROM event_params[4])))) AS token_ids (token_id)
              CROSS JOIN UNNEST(_gs_split_string_by(TRIM(LEADING '[' FROM TRIM(TRAILING ']' FROM event_params[5])))) AS `values` (token_value)
              WHERE raw_log.topics LIKE '0x4a39dc06d4c0dbc64b70af90fd698a233a518aa5d07e595d983b8c0526c8f7fb%'
              AND address = '0xc36cf0cfcb5d905b8b513860db0cfe63f6cf9f5c'
sinks:
  - type: postgres
    table: transfers
    schema: mirror
    secretName: <YOUR SECRET>
    description: Postgres sink for 721 NFT transfers
    referenceName: ethereum_721_sink
    sourceStreamName: ethereum_721_transfers
  - type: postgres
    table: transfers
    schema: mirror
    secretName: <YOUR SECRET>
    description: Postgres sink for 1155 transfers
    referenceName: ethereum_1155_sink
    sourceStreamName: ethereum_1155_transfers

ERC-721 (NFT) Transfers

Let’s first look at NFT transfers. This is the simpler of the two transformations. The main thing we need to do here is to make sure we’re pulling the sender, recipient, and token_id from the event_params array, and only getting NFT transfers rather than other transfers that may share the same event signature. This isn’t usually a problem when filtering for a specific contract address, but can become one when looking at all contract addresses, or contracts that may implement multiple types of tokens. We’ll start at the top.

Contract Address

SELECT address AS contract_address,
We use the lower function here to lower-case the address to make using this data simpler downstream, we also rename the column to contract_address to make it more explicit.

Sender

lower(event_params[1]) AS sender,
Here we continue to lower-case values for consistency. In this case we’re using the first element of the event_params array (using a 1-based index), and renaming it to sender. Each event parameter maps to an argument to the event_signature, for ERC-721, they are the sender, recipient, and token_id respectively.

Recipient

lower(event_params[2]) AS recipient,
Like the previous column, we’re pulling the second element in the event_params array and renaming it to recipient.

Token ID

COALESCE(TRY_CAST(event_params[3] AS NUMERIC), -999) AS token_id,
Here we introduce a few more SQL functions, we’ll start from the inside and work our way out.
  1. event_params[3] is the third element of the event_params array, and for ERC-721 this is the token ID. Although not covered in this example, since ERC-20 shares the same signature, this element represents a token balance rather than token ID if you’re decoding ERC-20 transfers.
  2. TRY_CAST(event_params[3] AS NUMERIC) is casting the string element event_params[3] to NUMERIC - token IDs can be as large as an unsigned 256 bit integer, so make sure your database can handle that, if not, you can cast it to a different data type that your sink can handle. We use TRY_CAST because it will prevent the pipeline from failing in case the cast fails returning a NULL value instead.
  3. COALESCE(TRY_CAST(event_params[3] AS NUMERIC), -999): COALESCE can take an arbitrary number of arguments and returns the first non-NULL value. Since TRY_CAST can return a NULL we’re returning -999 in case it does. This isn’t strictly necessary but is useful to do in case you want to find offending values that were unable to be cast.

Token Value

1 AS `value`,
NFTs are meant to be unique, so they don’t have a value or balance associated with them when transfered, but since we’re combining these transfers with ERC-1155 tokens which do have a value, we need to normalize the columns, so each transfer is treated a transfer of a single NFT with a value of 1.
Some columns are surrounded by backticks, this is because they are reserved words in Flink SQL. Common columns that need backticks are: data, output, value, and a full list can be found here.

Block Metadata

raw_log.block_number       AS block_number,
raw_log.block_hash         AS block_hash,
raw_log.log_index          AS log_index,
raw_log.transaction_hash   AS transaction_hash,
raw_log.transaction_index  AS transaction_index,
These columns aren’t necessarily needed for this example, but they’re included so you’re aware of them. A complete list of block metadata columns is available here. They are often useful for looking up the transfers in another tool, such as a block explorer like Etherscan.

ID Primary Key

id
This is the Goldsky provided id, it is a string composed of the dataset name, block hash, and log index, which is unique per event, here’s an example: decoded_log_0x60eaf5a2ab37c73cf1f3bbd32fc17f2709953192b530d75aadc521111f476d6c_18.
You may can save some space when storing the ID by using md5(id) as id in your transform. One reason you may want to keep the existing id format is that it makes it easier to order events in the same block without also syncing block hash and log index.

Address Filter

WHERE address IN ('0x22c1f6050e56d2876009903609a2cc3fef83b415', '0xbc4ca0eda7647a8ab7c2061c2e118a18a936f13d')
Here we’re filtering for two contract addresses. If we want all NFT transfers on the chain we can remove this line entirely.

Topic Filter and Length Check

AND raw_log.topics LIKE '0xddf252ad1be2c89b69c2b068fc378daa952ba7f163c4a11628f55a4df523b3ef%'
AND SPLIT_INDEX(raw_log.topics, ',', 3) IS NOT NULL
As mentioned in the preface, ERC-721 transfers share the same event_signature as ERC-20 transfers. What differentiates ERC-721 transfers from ERC-20 transfers are the number of topics associated with the event. ERC-721 transfers have four topics, and ERC-20 transfers have three. If you want to get into the nitty gritty you may enjoy the Solidity developer documentation for events, but for now know that in Mirror, raw_log.topics is a comma separated string. Each value in the string is a hash. The first is the hash of the event_signature and event_params, in our case Transfer(address,address,uint256) for ERC-721, which is hashed to 0xddf252ad1be2c89b69c2b068fc378daa952ba7f163c4a11628f55a4df523b3ef as seen in our WHERE clause. We use LIKE to only consider the first signature, with a % at the end, which acts as a wildcard. We could also use a filter such as event_signature = 'Transfer(address,address,uint256)', but we wanted to introduce the idea of topics as they can be a useful filter for some older contracts that may not completely follow the specification for ERC-721 contracts. SPLIT_INDEX splits the first argument by the second argument, and then extracts the 0-indexed argument, in this case 3 which would be the fourth element. Here’s an example topic string to consider:
0xddf252ad1be2c89b69c2b068fc378daa952ba7f163c4a11628f55a4df523b3ef,0x000000000000000000000000441e1e47a6fa2dbfd3cd9b54291e9ab3a58d7975,0x00000000000000000000000097d2e8eeb59e521f10c2e7716eac3dd805ea9a46,0x0000000000000000000000000000000000000000000000000000000000043321
We check that the fourth element after splitting is NOT NULL to make sure this is an NFT transfer. An ERC-20 transfer would only have three elements when the topics are split, so SPLIT_INDEX would return NULL.

ERC-1155 Transfers

ERC-1155 combines the features of ERC-20 and ERC-721 contracts and adds a few features. Each transfer has both a token ID and a value representing the quantity being transfered for funglible tokens, the number 1 for tokens intended to represent NFTs, but how these work depends on how the contract is imlpemented. ERC-1155 also introduces new event signatures for transfers: TransferSingle(address,address,address,uint256,uint256) and TransferBatch(address,address,address,uint256[],uint256[]) which lets the contract transfer multiple tokens at once to a single recipient. This causes us some trouble since we want one row per transfer in our database, so we’ve got some extra SQL in our transform to deal with this. We won’t cover details that overlap with the ERC-721 example and will focus on the differences for ERC-1155.

Event Parameters for ERC-1155

address AS contract_address,
lower(event_params[2]) AS sender,
lower(event_params[3]) AS recipient,
COALESCE(TRY_CAST(event_params[4] AS NUMERIC), -999) AS token_id,
COALESCE(TRY_CAST(event_params[5] AS NUMERIC), -999) AS `value`,
Similar to the ERC-721 event_params we pull out the sender, recipient and token ID, note the indexes we use are different since ERC-1155 tokens have a different event_signature. We also get a value column from element five.

ID Primary Key for Batch Transfers

id || '_' || token_ids.token_id AS id
We modify the id column for batch transfers. The id coming from the source represents an entire batch transfer event, which can contain multiple tokens, so we concatenate the token_id to the id to make the unnested rows unique.

Topic Filter for Single Transfers

AND raw_log.topics LIKE '0xc3d58168c5ae7397731d063d5bbf3d657854427343f4c083240f7aacaa2d0f62%'
We filter for a different topic to get ERC-1155 single transfers, the above topic is for the event_signature TransferSingle(address,address,address,uint256,uint256). As with ERC-721, we could use the event signature as a filter instead.

Combining Single and Batch Transfers

UNION ALL SELECT
This directive creates a combined stream of all single transfers and batch transfers.

Array Splitting and Unnesting for Batch Transfers

CROSS JOIN UNNEST(_gs_split_string_by(TRIM(LEADING '[' FROM TRIM(TRAILING ']' FROM event_params[4])))) AS token_ids (token_id)
CROSS JOIN UNNEST(_gs_split_string_by(TRIM(LEADING '[' FROM TRIM(TRAILING ']' FROM event_params[5])))) AS `values` (token_value)
This is the trickiest part of the transformation and involves some functionality that is niche to both Goldsky and Flink v1.17. We’ll start from the inside and work our way out again.
  1. TRIM(LEADING '[' FROM TRIM(TRAILING ']' FROM event_params[4]): The string for batch transfers in element 4 looks like this when decoded: [1 2 3 4 5 6]. We need to trim the leading and trailing [ and ] characters before splitting it out into individual token IDs.
  2. _gs_split_string_by(...): This is a Goldsky UDF which splits strings by the space character only. If you need to split by another character, for now you can use REGEXP_REPLACE(column, ',', ' ') to replace commas with spaces.
  3. CROSS JOIN UNNEST ... AS token_ids (token_id): This works like UNNEST in most other SQL dialects, but is a special case in Flink. It may be confusing that we have two separate CROSS JOINs, but they don’t work like CROSS JOIN in other SQL dialects, we’ll get a single row with a token_id and token_value that map correctly to each other.

Topic Filter for Batch Transfers

raw_log.topics LIKE '0x4a39dc06d4c0dbc64b70af90fd698a233a518aa5d07e595d983b8c0526c8f7fb%'
This is the same as the other topic filters, but is using the topic hash of the batch transfer event signature.

Using the Data

With this table in place, you can create views that show you a number of useful pieces of information:
  1. All mints. For ERC-721 and ERC-1155 a mint is identified by being from the sender 0x0000000000000000000000000000000000000000
  2. All current holders of a token, or balances for ERC-1155 holders.
Example balance query:
(SELECT id, block_number, contract_address, recipient as owner_address, token_id, value FROM transfers
   UNION ALL SELECT id, block_number, contract_address, sender as owner_address, token_id, -value FROM transfers)
SELECT contract_address || '_' || owner_address || '_' || CAST(token_id AS TEXT) as id, contract_address, owner_address,
token_id, sum(value) as balance, min(block_number) as first_updated, max(block_number) as last_updated
FROM ledger WHERE owner_address <> '0x0000000000000000000000000000000000000000'
GROUP BY contract_address, token_id, owner_address
Can’t find what you’re looking for? Reach out to us at support@goldsky.com for help.
I