This guide covers more advanced topics in transforming raw log data using decoding functions, along with some of the functionality of the Flink SQL backend. Other guides focus on the ELT use case; here we’ll walk through an ETL example that decodes NFT transfers from raw logs.

What you’ll need

  1. A basic understanding of the Mirror product’s simpler ELT use case.
  2. A basic understanding of SQL. Note that we use the syntax and functionality of Flink v1.17.
  3. A destination sink to write your data to.

Preface

In this example we’ll provide a YAML file that includes the transforms needed to sync token transfers to your database. Several steps are needed to accomplish this and we’ll go into detail about the following subjects:
  1. We need to differentiate ERC-721 (NFT) transfers from ERC-20 token transfers, since they have the same event signature: Transfer(address,address,uint256)
  2. We need to decode ERC-1155 transfers from the raw topics and data fields. This guide covers single transfers; batch transfers are covered in the ERC-1155 Transfers guide.
In this example we’ve included two ERC-721 contract addresses and one ERC-1155 contract address, but you can change these to any token address or addresses of interest to you, or remove the address filters to get all transfers. Be careful though: removing the address filters will sync many millions of rows to your DB. We’ve also included a number of columns that you may or may not need. The main columns needed for most purposes are: id, contract_address (if you are syncing multiple contract addresses), sender, recipient, token_id, and value.

Pipeline YAML

There are two main transforms in this configuration and we’ll go through each one to explain how they work. If you copy and use this configuration file, make sure to update:
  1. Your secretName. If you already created a secret, you can find it via the CLI command goldsky secret list.
  2. The schema and table you want the data written to. By default it writes to mirror.transfers.
  3. The contract address or addresses you want to sync.
Advanced NFT transfers pipeline
sources:
  - referenceName: ethereum.raw_logs
    version: 1.0.0
    type: dataset
    startAt: earliest
    description: Raw logs for events emitted from contracts. Contains the
      raw topics, data, contract address, and metadata for the block and transaction.
transforms:
  - type: sql
    referenceName: ethereum_721_transfers
    primaryKey: id
    description: ERC721 Transfers
    sql: >-
      SELECT
              address AS contract_address,
              CONCAT('0x', SUBSTRING(SPLIT_INDEX(topics, ',', 1), 27)) AS sender,
              CONCAT('0x', SUBSTRING(SPLIT_INDEX(topics, ',', 2), 27)) AS recipient,
              COALESCE(TRY_CAST(CONV(SUBSTRING(SPLIT_INDEX(topics, ',', 3), 3), 16, 10) AS NUMERIC), -999) AS token_id,
              1 AS `value`,
              block_number,
              block_hash,
              log_index,
              transaction_hash,
              transaction_index,
              id
              FROM ethereum.raw_logs WHERE address IN ('0x22c1f6050e56d2876009903609a2cc3fef83b415', '0xbc4ca0eda7647a8ab7c2061c2e118a18a936f13d')
              AND topics LIKE '0xddf252ad1be2c89b69c2b068fc378daa952ba7f163c4a11628f55a4df523b3ef%'
              AND SPLIT_INDEX(topics, ',', 3) IS NOT NULL
  - type: sql
    referenceName: ethereum_1155_transfers
    primaryKey: id
    description: ERC1155 Transfers (TransferSingle only - for TransferBatch see the ERC-1155 guide)
    sql: >-
      SELECT
              address AS contract_address,
              CONCAT('0x', SUBSTRING(SPLIT_INDEX(topics, ',', 2), 27)) AS sender,
              CONCAT('0x', SUBSTRING(SPLIT_INDEX(topics, ',', 3), 27)) AS recipient,
              COALESCE(TRY_CAST(CONV(SUBSTRING(`data`, 3, 64), 16, 10) AS NUMERIC), -999) AS token_id,
              COALESCE(TRY_CAST(CONV(SUBSTRING(`data`, 67, 64), 16, 10) AS NUMERIC), -999) AS `value`,
              block_number,
              block_hash,
              log_index,
              transaction_hash,
              transaction_index,
              id FROM ethereum.raw_logs
              WHERE address = '0xc36cf0cfcb5d905b8b513860db0cfe63f6cf9f5c'
              AND topics LIKE '0xc3d58168c5ae7397731d063d5bbf3d657854427343f4c083240f7aacaa2d0f62%'
sinks:
  - type: postgres
    table: transfers
    schema: mirror
    secretName: <YOUR SECRET>
    description: Postgres sink for 721 NFT transfers
    referenceName: ethereum_721_sink
    sourceStreamName: ethereum_721_transfers
  - type: postgres
    table: transfers
    schema: mirror
    secretName: <YOUR SECRET>
    description: Postgres sink for 1155 transfers
    referenceName: ethereum_1155_sink
    sourceStreamName: ethereum_1155_transfers
For a complete ERC-1155 implementation including batch transfers, see the ERC-1155 Transfers guide which provides the full decoding logic.

ERC-721 (NFT) Transfers

Let’s first look at NFT transfers. This is the simpler of the two transformations. The main thing we need to do here is to make sure we’re pulling the sender, recipient, and token_id from the raw topics, and only getting NFT transfers rather than other transfers that may share the same event signature. This isn’t usually a problem when filtering for a specific contract address, but can become one when looking at all contract addresses, or contracts that may implement multiple types of tokens. We’ll start at the top.

Contract Address

SELECT address AS contract_address,
We select the address column as-is and rename it to contract_address to make it more explicit and simpler to use downstream.

Sender

CONCAT('0x', SUBSTRING(SPLIT_INDEX(topics, ',', 1), 27)) AS sender,
Here we extract the sender address from the second topic (index 1). The topic is a 32-byte hex value with the address in the last 20 bytes, so we take the substring starting at position 27 and prepend 0x.

Recipient

CONCAT('0x', SUBSTRING(SPLIT_INDEX(topics, ',', 2), 27)) AS recipient,
Like the previous column, we’re pulling the recipient address from the third topic (index 2).
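To sanity-check the offsets used for sender and recipient outside of Flink, the same slicing can be sketched in Python. This is only an illustration under assumptions: split_index, topic_to_address, and pad_topic are hypothetical helper names that imitate the SQL expressions above, not Mirror or Flink code, and the topics string reuses the values from the example topic string in this guide.

```python
# Illustrative helpers only; these names are hypothetical, not part of Mirror or Flink.

def split_index(text, delimiter, index):
    """Stand-in for SPLIT_INDEX: 0-based lookup, None when the index is out of range."""
    parts = text.split(delimiter)
    return parts[index] if 0 <= index < len(parts) else None


def topic_to_address(topic):
    """Stand-in for CONCAT('0x', SUBSTRING(topic, 27))."""
    # SQL SUBSTRING is 1-based, so position 27 is Python index 26: that skips
    # the 2 characters of '0x' and the 24 zero characters of left padding.
    return "0x" + topic[26:]


def pad_topic(hex_digits):
    """Build a 32-byte topic by left-padding hex digits with zeros."""
    return "0x" + hex_digits.rjust(64, "0")


topics = ",".join([
    "0xddf252ad1be2c89b69c2b068fc378daa952ba7f163c4a11628f55a4df523b3ef",
    pad_topic("441e1e47a6fa2dbfd3cd9b54291e9ab3a58d7975"),
    pad_topic("97d2e8eeb59e521f10c2e7716eac3dd805ea9a46"),
    pad_topic("43321"),
])

sender = topic_to_address(split_index(topics, ",", 1))
recipient = topic_to_address(split_index(topics, ",", 2))
print(sender)
print(recipient)
```

Both results are 42 characters long: the 0x prefix plus the 40 hex characters of a 20-byte address.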

Token ID

COALESCE(TRY_CAST(CONV(SUBSTRING(SPLIT_INDEX(topics, ',', 3), 3), 16, 10) AS NUMERIC), -999) AS token_id,
Here we introduce a few more SQL functions. We’ll start from the inside and work our way out.
  1. SPLIT_INDEX(topics, ',', 3) extracts the fourth topic, which contains the token ID for ERC-721 transfers.
  2. SUBSTRING(..., 3) removes the 0x prefix from the hex string.
  3. CONV(..., 16, 10) converts the hexadecimal value to decimal.
  4. TRY_CAST(... AS NUMERIC) casts the result to NUMERIC. Token IDs can be as large as an unsigned 256-bit integer, so make sure your database can handle that. We use TRY_CAST because it returns NULL when the cast fails instead of failing the pipeline.
  5. COALESCE(..., -999) returns -999 if the cast returned NULL. This isn’t strictly necessary but is useful for finding offending values that could not be cast.
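The five steps above can be mirrored in a few lines of Python to check expected token IDs by hand. This is a sketch under assumptions: decode_token_id is a hypothetical name, and because Python integers have arbitrary precision it does not reproduce any numeric range limits that the SQL functions or your database may have.

```python
def decode_token_id(topic, fallback=-999):
    """Rough mirror of COALESCE(TRY_CAST(CONV(SUBSTRING(topic, 3), 16, 10) AS NUMERIC), -999)."""
    if topic is None:
        # SPLIT_INDEX returned NULL, so every later step yields NULL too.
        return fallback
    hex_digits = topic[2:]  # SUBSTRING(..., 3): drop the '0x' prefix
    try:
        return int(hex_digits, 16)  # CONV(..., 16, 10) followed by the cast
    except ValueError:
        return fallback  # COALESCE(..., -999): mark values that failed to parse


example_topic = "0x" + "43321".rjust(64, "0")
print(decode_token_id(example_topic))
print(decode_token_id("0xnot-hex"))
```

The first call decodes the fourth topic from the example topic string used in this guide; the second shows the -999 marker for a value that is not valid hex.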

Token Value

1 AS `value`,
NFTs are meant to be unique, so they don’t have a value or balance associated with them when transferred. Since we’re combining these transfers with ERC-1155 tokens, which do have a value, we need to normalize the columns, so each transfer is treated as a transfer of a single NFT with a value of 1.
Some columns are surrounded by backticks because they are reserved words in Flink SQL. Common columns that need backticks are data, output, and value; a full list of reserved keywords is available in the Flink SQL documentation.

Block Metadata

block_number,
block_hash,
log_index,
transaction_hash,
transaction_index,
These columns aren’t necessarily needed for this example, but they’re included so you’re aware of them. A complete list of block metadata columns is available here. They are often useful for looking up the transfers in another tool, such as a block explorer like Etherscan.

ID Primary Key

id
This is the Goldsky-provided id. It is a string composed of the dataset name, block hash, and log index, which is unique per event. Here’s an example: log_0x60eaf5a2ab37c73cf1f3bbd32fc17f2709953192b530d75aadc521111f476d6c_18.
You can save some space when storing the ID by using md5(id) as id in your transform. One reason you may want to keep the existing id format is that it makes it easier to order events in the same block without also syncing block hash and log index.
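The trade-off between the two id formats can be seen in a small Python sketch. It assumes the id layout described above (dataset name, block hash, log index, joined by underscores) and uses Python's hashlib as a stand-in for the SQL md5 function; the variable names are illustrative.

```python
import hashlib

event_id = "log_0x60eaf5a2ab37c73cf1f3bbd32fc17f2709953192b530d75aadc521111f476d6c_18"

# The original id can be split back into its parts. Splitting from the right
# keeps this working even if a dataset name were to contain underscores.
dataset, block_hash, log_index = event_id.rsplit("_", 2)
print(dataset, block_hash, int(log_index))

# The hashed id is shorter and fixed-length, but the block hash and log index
# can no longer be recovered from it.
compact_id = hashlib.md5(event_id.encode("utf-8")).hexdigest()
print(len(event_id), len(compact_id))
```

If you hash the id and still need to order events within a block, sync block_number and log_index as separate columns.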

Address Filter

WHERE address IN ('0x22c1f6050e56d2876009903609a2cc3fef83b415', '0xbc4ca0eda7647a8ab7c2061c2e118a18a936f13d')
Here we’re filtering for two contract addresses. If we want all NFT transfers on the chain we can remove this line entirely.

Topic Filter and Length Check

AND topics LIKE '0xddf252ad1be2c89b69c2b068fc378daa952ba7f163c4a11628f55a4df523b3ef%'
AND SPLIT_INDEX(topics, ',', 3) IS NOT NULL
As mentioned in the preface, ERC-721 transfers share the same event_signature as ERC-20 transfers. What differentiates ERC-721 transfers from ERC-20 transfers is the number of topics associated with the event: ERC-721 transfers have four topics, and ERC-20 transfers have three. If you want to get into the nitty gritty you may enjoy the Solidity developer documentation for events, but for now know that in Mirror, topics is a comma-separated string. Each value in the string is a 32-byte hex string. The first is the hash of the event_signature, in our case Transfer(address,address,uint256) for ERC-721, which is hashed to 0xddf252ad1be2c89b69c2b068fc378daa952ba7f163c4a11628f55a4df523b3ef as seen in our WHERE clause.
We use LIKE with a % at the end, which acts as a wildcard, so that only the first topic is matched. SPLIT_INDEX splits the first argument by the second argument and then returns the element at the 0-based index given by the third argument, in this case 3, which is the fourth element. Here’s an example topic string to consider:
0xddf252ad1be2c89b69c2b068fc378daa952ba7f163c4a11628f55a4df523b3ef,0x000000000000000000000000441e1e47a6fa2dbfd3cd9b54291e9ab3a58d7975,0x00000000000000000000000097d2e8eeb59e521f10c2e7716eac3dd805ea9a46,0x0000000000000000000000000000000000000000000000000000000000043321
We check that the fourth element after splitting is NOT NULL to make sure this is an NFT transfer. An ERC-20 transfer would only have three elements when the topics are split, so SPLIT_INDEX would return NULL.
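The combined filter can be checked against hand-built topic strings with a short Python sketch. The helper names (split_index, pad_topic, is_erc721_transfer) are hypothetical and only imitate the two SQL conditions; the addresses and token ID come from the example topic string above.

```python
TRANSFER_SIGNATURE = "0xddf252ad1be2c89b69c2b068fc378daa952ba7f163c4a11628f55a4df523b3ef"


def split_index(text, delimiter, index):
    """Stand-in for SPLIT_INDEX: 0-based lookup, None when the index is out of range."""
    parts = text.split(delimiter)
    return parts[index] if 0 <= index < len(parts) else None


def pad_topic(hex_digits):
    """Build a 32-byte topic by left-padding hex digits with zeros."""
    return "0x" + hex_digits.rjust(64, "0")


def is_erc721_transfer(topics):
    """Mirror of: topics LIKE '<signature>%' AND SPLIT_INDEX(topics, ',', 3) IS NOT NULL."""
    has_signature = topics.startswith(TRANSFER_SIGNATURE)
    has_fourth_topic = split_index(topics, ",", 3) is not None
    return has_signature and has_fourth_topic


sender = pad_topic("441e1e47a6fa2dbfd3cd9b54291e9ab3a58d7975")
recipient = pad_topic("97d2e8eeb59e521f10c2e7716eac3dd805ea9a46")

# ERC-20: signature plus two indexed addresses (the amount lives in data).
erc20_topics = ",".join([TRANSFER_SIGNATURE, sender, recipient])
# ERC-721: signature, two indexed addresses, and the indexed token ID.
erc721_topics = ",".join([TRANSFER_SIGNATURE, sender, recipient, pad_topic("43321")])

print(is_erc721_transfer(erc20_topics))
print(is_erc721_transfer(erc721_topics))
```

Only the four-topic string passes both conditions; the three-topic ERC-20 string matches the signature but fails the fourth-topic check.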

ERC-1155 Transfers

ERC-1155 combines the features of ERC-20 and ERC-721 contracts and adds a few features. Each transfer has both a token ID and a value representing the quantity being transferred: an amount for fungible tokens, or the number 1 for tokens intended to represent NFTs, though how these work depends on how the contract is implemented. ERC-1155 also introduces new event signatures for transfers: TransferSingle(address,address,address,uint256,uint256) and TransferBatch(address,address,address,uint256[],uint256[]), the latter of which lets the contract transfer multiple tokens at once to a single recipient.

Extracting Data from Raw Logs

For ERC-1155 TransferSingle events, we extract the sender and recipient from topics, and the token_id and value from the data field:
address AS contract_address,
CONCAT('0x', SUBSTRING(SPLIT_INDEX(topics, ',', 2), 27)) AS sender,
CONCAT('0x', SUBSTRING(SPLIT_INDEX(topics, ',', 3), 27)) AS recipient,
COALESCE(TRY_CAST(CONV(SUBSTRING(`data`, 3, 64), 16, 10) AS NUMERIC), -999) AS token_id,
COALESCE(TRY_CAST(CONV(SUBSTRING(`data`, 67, 64), 16, 10) AS NUMERIC), -999) AS `value`,
The sender and recipient are extracted from topics at indices 2 and 3, because index 1 holds the operator address for this event. The token_id and value are extracted from the data field: the first 64 hex characters (after the 0x prefix) contain the token_id, and the next 64 contain the value.
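The data offsets can be verified with a small Python sketch that slices the same positions as the SQL. It is an illustration under assumptions: decode_transfer_single_data is a hypothetical name, and the sample payload is made up for the example (token ID 7, value 3).

```python
def decode_transfer_single_data(data, fallback=-999):
    """Rough mirror of the two SUBSTRING(`data`, start, 64) expressions."""

    def word(sql_start):
        # SQL SUBSTRING is 1-based, so position 3 is Python index 2.
        chunk = data[sql_start - 1 : sql_start - 1 + 64]
        try:
            return int(chunk, 16)
        except ValueError:
            return fallback  # same role as COALESCE(..., -999)

    token_id = word(3)   # first 32-byte word after '0x'
    value = word(67)     # second 32-byte word: 3 + 64 = 67
    return token_id, value


# Made-up payload: two 32-byte words, zero-padded on the left.
sample_data = "0x" + format(7, "064x") + format(3, "064x")
print(decode_transfer_single_data(sample_data))
```

An empty payload such as 0x yields the -999 marker for both columns, which makes malformed rows easy to find downstream.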

Topic Filter for Single Transfers

AND topics LIKE '0xc3d58168c5ae7397731d063d5bbf3d657854427343f4c083240f7aacaa2d0f62%'
We filter for the TransferSingle(address,address,address,uint256,uint256) event signature hash.
For batch transfers which require more complex array handling, see the complete ERC-1155 Transfers guide.

Using the Data

With this table in place, you can create views that show you a number of useful pieces of information:
  1. All mints. For ERC-721 and ERC-1155 a mint is identified by being from the sender 0x0000000000000000000000000000000000000000
  2. All current holders of a token, or balances for ERC-1155 holders.
Example balance query:
WITH ledger AS (SELECT id, block_number, contract_address, recipient as owner_address, token_id, value FROM transfers
   UNION ALL SELECT id, block_number, contract_address, sender as owner_address, token_id, -value FROM transfers)
SELECT contract_address || '_' || owner_address || '_' || CAST(token_id AS TEXT) as id, contract_address, owner_address,
token_id, sum(value) as balance, min(block_number) as first_updated, max(block_number) as last_updated
FROM ledger WHERE owner_address <> '0x0000000000000000000000000000000000000000'
GROUP BY contract_address, token_id, owner_address
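To try a balance query of this shape without a Postgres instance, it can be run against an in-memory SQLite database from Python. This is a sketch under assumptions: SQLite only stands in for your real sink, the ledger is defined as a common table expression, the rows and short addresses are made up, and the token IDs are kept small because SQLite integers cannot hold 256-bit values.

```python
import sqlite3

ZERO_ADDRESS = "0x" + "0" * 40

conn = sqlite3.connect(":memory:")
conn.execute(
    "CREATE TABLE transfers (id TEXT PRIMARY KEY, block_number INTEGER, "
    "contract_address TEXT, sender TEXT, recipient TEXT, token_id INTEGER, value INTEGER)"
)
# Made-up rows: a mint of 5 units to alice, then alice sends 2 units to bob.
conn.executemany(
    "INSERT INTO transfers VALUES (?, ?, ?, ?, ?, ?, ?)",
    [
        ("log_a_1", 100, "0xc0ffee", ZERO_ADDRESS, "0xalice", 1, 5),
        ("log_b_2", 105, "0xc0ffee", "0xalice", "0xbob", 1, 2),
    ],
)

BALANCES_SQL = """
WITH ledger AS (
    SELECT id, block_number, contract_address, recipient AS owner_address, token_id, value FROM transfers
    UNION ALL
    SELECT id, block_number, contract_address, sender AS owner_address, token_id, -value FROM transfers
)
SELECT contract_address || '_' || owner_address || '_' || CAST(token_id AS TEXT) AS id,
       contract_address, owner_address, token_id,
       SUM(value) AS balance, MIN(block_number) AS first_updated, MAX(block_number) AS last_updated
FROM ledger
WHERE owner_address <> '0x0000000000000000000000000000000000000000'
GROUP BY contract_address, token_id, owner_address
"""

# Map each owner to its balance (row[2] is owner_address, row[4] is balance).
balances = {row[2]: row[4] for row in conn.execute(BALANCES_SQL)}
# Mints are transfers whose sender is the zero address.
mints = [row[0] for row in conn.execute("SELECT id FROM transfers WHERE sender = ?", (ZERO_ADDRESS,))]
print(balances)
print(mints)
```

Each transfer appears twice in the ledger, once as a credit to the recipient and once as a debit from the sender, so summing value per owner gives the current balance.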
Can’t find what you’re looking for? Reach out to us at support@goldsky.com for help.