This guide is part of a series of tutorials on exporting transfer data for different ERC standards into your data warehouse. Here we will be focusing on ERC-1155 transfers; see the companion guides in this series for other types of transfers.

In other guides we focus on the ELT use case; here we will cover more advanced topics in transforming our decoded datasets and some of the functionality of the Flink SQL backend.

What you’ll need

  1. A basic understanding of the Mirror product’s core ETL use case.
  2. A basic understanding of SQL, though we use the syntax and functionality of Flink v1.17.
  3. A destination sink to write your data to.

Preface

In this example we’ll provide a YAML file that includes the transforms needed to sync token transfers to your database.

One particular aspect of this dataset is that we need to extract and unnest ERC-1155 batch transfers into individual rows.

In this example we’ve included one ERC-1155 contract address, Town Star Token, but you can change it to any token address or addresses of interest to you, or remove the address filters to get all transfers. Be careful, though: removing the address filters will sync many millions of rows to your DB.

We’ve also included a number of columns that you may or may not need. The main columns needed for most purposes are: id, contract_address (if you are syncing multiple contract addresses), sender, recipient, token_id, and amount.

Pipeline YAML

There are three transforms in this configuration and we’ll go through each one to explain how they work. If you copy and use this configuration file, make sure to update:

  1. Your secretName. If you already created a secret, you can find it via the CLI command goldsky secret list.
  2. The schema and table you want the data written to; by default it writes to mirror.erc1155_transfers.
  3. The contract address or addresses you want to sync.
sources:
  - referenceName: ethereum.decoded_logs
    version: 1.0.0
    type: dataset
    startAt: earliest
    description: Decoded logs for events emitted from contracts. Contains the
      decoded event signature and event parameters, contract address, data,
      topics, and metadata for the block and transaction.
transforms:
  - type: sql
    referenceName: erc1155_transfer_single
    primaryKey: id
    sql: >-
      SELECT
          lower(address) AS contract_address,
          lower(event_params[2]) AS sender,
          lower(event_params[3]) AS recipient,
          COALESCE(TRY_CAST(event_params[4] AS NUMERIC), -999) AS token_id,
          COALESCE(TRY_CAST(event_params[5] AS NUMERIC), -999) AS amount,
          raw_log.block_number       AS block_number,
          raw_log.block_hash         AS block_hash,
          raw_log.log_index          AS log_index,
          raw_log.transaction_hash   AS transaction_hash,
          raw_log.transaction_index  AS transaction_index,
          id
      FROM ethereum.decoded_logs
      WHERE raw_log.topics LIKE '0xc3d58168c5ae7397731d063d5bbf3d657854427343f4c083240f7aacaa2d0f62%'
      AND lower(address) = '0xc36cf0cfcb5d905b8b513860db0cfe63f6cf9f5c'
  - type: sql
    referenceName: erc1155_transfer_batch
    primaryKey: id
    description: ERC1155 Transform
    sql: >-
      WITH transfer_batch_logs AS (
        SELECT
          *,
          _gs_split_string_by(
            REPLACE(TRIM(LEADING '[' FROM TRIM(TRAILING ']' FROM event_params[4])), ',', ' ')
          ) AS token_ids,
          _gs_split_string_by(
            REPLACE(TRIM(LEADING '[' FROM TRIM(TRAILING ']' FROM event_params[5])), ',', ' ')
          ) AS amounts
        FROM
          ethereum.decoded_logs
        WHERE raw_log.topics LIKE '0x4a39dc06d4c0dbc64b70af90fd698a233a518aa5d07e595d983b8c0526c8f7fb%'
        AND lower(address) = '0xc36cf0cfcb5d905b8b513860db0cfe63f6cf9f5c'
        )
      SELECT
        lower(address) AS contract_address,
        lower(event_params[2]) AS sender,
        lower(event_params[3]) AS recipient,
        CAST(token_ids[t.idx] AS NUMERIC(78)) as token_id,
        CAST(amounts[t.idx] AS NUMERIC(78)) as amount,
        raw_log.block_number       AS block_number,
        raw_log.block_hash         AS block_hash,
        raw_log.log_index          AS log_index,
        raw_log.transaction_hash   AS transaction_hash,
        raw_log.transaction_index  AS transaction_index,
        id || '_' || CAST(t.idx AS STRING) AS `id`
      FROM transfer_batch_logs
      CROSS JOIN UNNEST(
        CAST(
          _gs_generate_series(
            CAST(1 AS BIGINT),
            CAST(COALESCE(CARDINALITY(token_ids), 0) AS BIGINT)
          ) AS ARRAY<INTEGER>
        )
      ) AS t (idx)
  - type: sql
    referenceName: ethereum_1155_transfers
    primaryKey: id
    sql: >-
      SELECT * FROM erc1155_transfer_single
      UNION ALL
      SELECT * FROM erc1155_transfer_batch WHERE amount > 0
sinks:
  - type: postgres
    table: erc1155_transfers
    schema: mirror
    secretName: <YOUR_SECRET>
    description: Postgres sink for 1155 transfers
    referenceName: transfers
    sourceStreamName: ethereum_1155_transfers

ERC-1155 Transfers

ERC-1155 combines the features of ERC-20 and ERC-721 contracts and adds a few of its own. Each transfer has both a token ID and a value: for fungible tokens the value represents the quantity being transferred, while tokens intended to represent NFTs typically use a value of 1, though the exact semantics depend on how the contract is implemented. ERC-1155 also introduces new event signatures for transfers: TransferSingle(address,address,address,uint256,uint256) and TransferBatch(address,address,address,uint256[],uint256[]), the latter of which lets a contract transfer multiple tokens at once to a single recipient. This causes us some trouble, since we want one row per transfer in our database, so our transforms need some extra SQL to deal with it. To manage this complexity we have created two transforms, handling Single and Batch transfers separately, and a third transform that combines both into a single view.

We’ll start at the top.

SingleTransfers Transform

We’ll start by analysing the transform for the TransferSingle event:

Event Parameters for ERC-1155

lower(address) AS contract_address,
lower(event_params[2]) AS sender,
lower(event_params[3]) AS recipient,

Similar to the ERC-721 example, we use event_params to pull out the sender, recipient, and token ID. Note that the indexes we use are different, since ERC-1155 tokens have a different event signature. We also get an amount column from element five.

COALESCE(TRY_CAST(event_params[4] AS NUMERIC), -999) AS token_id,
COALESCE(TRY_CAST(event_params[5] AS NUMERIC), -999) AS amount,
  1. event_params[4] is the fourth element of the event_params array, and for ERC-1155 this is the token ID.
  2. TRY_CAST(event_params[4] AS NUMERIC) casts the string element event_params[4] to NUMERIC. Token IDs can be as large as an unsigned 256-bit integer, so make sure your database can handle that; if not, you can cast to a different data type that your sink can handle. We use TRY_CAST because it prevents the pipeline from failing if the cast fails, returning a NULL value instead.
  3. COALESCE(TRY_CAST(event_params[4] AS NUMERIC), -999): COALESCE can take an arbitrary number of arguments and returns the first non-NULL value. Since TRY_CAST can return NULL, we return -999 in that case. This isn’t strictly necessary, but it is useful if you want to find offending values that could not be cast.

We repeat this process for event_params[5], which represents the amount of a token.
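As a minimal sketch of how these two functions interact (the literal strings here are hypothetical stand-ins for decoded event_params values):

COALESCE(TRY_CAST('123' AS NUMERIC), -999)           -- cast succeeds: returns 123
COALESCE(TRY_CAST('not_a_number' AS NUMERIC), -999)  -- TRY_CAST returns NULL: COALESCE falls back to -999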

Topic Filter for Single Transfers

AND raw_log.topics LIKE '0xc3d58168c5ae7397731d063d5bbf3d657854427343f4c083240f7aacaa2d0f62%'

We filter for a specific topic to get ERC-1155 single transfers; the topic above is the hash of the event signature TransferSingle(address,address,address,uint256,uint256). As with ERC-721, we could use the event signature as a filter instead.
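For example, assuming the decoded dataset exposes the signature in an event_signature column (as the source description above suggests), a sketch of an equivalent filter would be:

WHERE event_signature = 'TransferSingle(address,address,address,uint256,uint256)'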

BatchTransfers Transform

Now, let’s look at the transform for TransferBatch events:

Array Splitting and Unnesting for Batch Transfers

WITH transfer_batch_logs AS (
    SELECT
      *,
      _gs_split_string_by(
        REPLACE(TRIM(LEADING '[' FROM TRIM(TRAILING ']' FROM event_params[4])), ',', ' ')
      ) AS token_ids,
      _gs_split_string_by(
        REPLACE(TRIM(LEADING '[' FROM TRIM(TRAILING ']' FROM event_params[5])), ',', ' ')
      ) AS amounts
    FROM
      ethereum.decoded_logs
    WHERE raw_log.topics LIKE '0x4a39dc06d4c0dbc64b70af90fd698a233a518aa5d07e595d983b8c0526c8f7fb%'
    AND lower(address) = '0xc36cf0cfcb5d905b8b513860db0cfe63f6cf9f5c'
    )

The first thing we want to achieve is to decompose the string representations of the token IDs and their respective amounts into arrays, which we add as columns to each log row. This will allow us to unnest them into one row per tokenId - amount pair much more easily as a second step.

This is the trickiest part of the transformation and involves some functionality that is niche to both Goldsky and Flink v1.17. We’ll start from the inside and work our way out again.

  1. TRIM(LEADING '[' FROM TRIM(TRAILING ']' FROM event_params[4])): Similar to the ERC-721 example, we use event_params to access the token ID information. For ERC-1155, the string for batch transfers in element 4 looks like this when decoded: [1 2 3 4 5 6]. We need to trim the leading and trailing [ and ] characters before splitting it out into individual token IDs.
  2. _gs_split_string_by(...): This is a Goldsky UDF which splits strings by the space character only. If you need to split by another character, for now you can use REPLACE(column, ',', ' ') to replace it with a space first, as we do here for commas (see the worked example after this list).
  3. CROSS JOIN UNNEST(...) AS t (idx): This works like UNNEST in most other SQL dialects, but is a special case in Flink. Note that we unnest a generated series of indexes rather than the token arrays themselves; this gives us a single row per index, with a token_id and amount that map correctly to each other. We cover this step in detail below.
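As a worked example, here is how a hypothetical decoded value for event_params[4] moves through these functions:

-- event_params[4], decoded:         '[1 2 3]'
-- after the two TRIMs:              '1 2 3'
-- after REPLACE(',', ' '):          '1 2 3'   (a no-op here; it normalizes comma-separated variants)
-- after _gs_split_string_by(...):   ['1', '2', '3']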

Topic Filter for Batch Transfers

raw_log.topics LIKE '0x4a39dc06d4c0dbc64b70af90fd698a233a518aa5d07e595d983b8c0526c8f7fb%'

This is the same as the other topic filters, but uses the topic hash of the TransferBatch event signature.

Creating an index for each tokenId - amount pair

FROM transfer_batch_logs
  CROSS JOIN UNNEST(
    CAST(
      _gs_generate_series(
        CAST(1 AS BIGINT),
        CAST(COALESCE(CARDINALITY(token_ids), 0) AS BIGINT)
      ) AS ARRAY<INTEGER>
    )
  ) AS t (idx)

In this step we generate a series of indexes that we can use to access each individual tokenId - amount pair within a transfer. We do this with a Goldsky UDF called _gs_generate_series, which generates an array of indexes for as many tokens as there are in the batch. We combine these indexes with our existing table via CROSS JOIN UNNEST and use them to access each token - amount pair:

CAST(token_ids[t.idx] AS NUMERIC(78)) as token_id,
CAST(amounts[t.idx] AS NUMERIC(78)) as amount,
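To make the expansion concrete, consider a hypothetical batch log where token_ids = ['7', '8', '9'] and amounts = ['100', '1', '50']. _gs_generate_series(1, 3) yields [1, 2, 3], and the CROSS JOIN UNNEST expands that single log row into three rows:

t.idx = 1  ->  token_id = 7, amount = 100
t.idx = 2  ->  token_id = 8, amount = 1
t.idx = 3  ->  token_id = 9, amount = 50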

ID Primary Key for Batch Transfers

id || '_' || CAST(t.idx AS STRING) AS `id`

We modify the id column for batch transfers. The id coming from the source represents an entire batch transfer event, which can contain multiple tokens, so we append the array index t.idx to the id to make each unnested row unique.
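For example, a hypothetical source id of 'abc123' on a three-token batch produces the unnested ids 'abc123_1', 'abc123_2', and 'abc123_3', one per tokenId - amount pair.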

Combining Single and Batch Transfers

SELECT * FROM erc1155_transfer_single
UNION ALL
SELECT * FROM erc1155_transfer_batch
WHERE amount > 0

This final directive in the third transform creates a combined stream of all single transfers and batch transfers. Note that the WHERE amount > 0 clause binds only to the second SELECT, so it filters zero-amount rows out of the unnested batch transfers.

Using the Data

With this table in place, you can create views that show you a number of useful pieces of information:

  1. All mints. For ERC-721 and ERC-1155, a mint is identified by the sender being 0x0000000000000000000000000000000000000000 (see the example query below).
  2. All current holders of a token, or balances for ERC-1155 holders.
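For example, a sketch of a mints query against the sink table defined above (mirror.erc1155_transfers by default):

SELECT * FROM mirror.erc1155_transfers
WHERE sender = '0x0000000000000000000000000000000000000000';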

Example balance query:

WITH ledger AS (
  SELECT id, block_number, contract_address, recipient AS owner_address, token_id, amount FROM mirror.erc1155_transfers
  UNION ALL
  SELECT id, block_number, contract_address, sender AS owner_address, token_id, -amount FROM mirror.erc1155_transfers
)
SELECT contract_address || '_' || owner_address || '_' || CAST(token_id AS TEXT) AS id,
  contract_address, owner_address, token_id,
  SUM(amount) AS balance, MIN(block_number) AS first_updated, MAX(block_number) AS last_updated
FROM ledger
WHERE owner_address <> '0x0000000000000000000000000000000000000000'
GROUP BY contract_address, token_id, owner_address

Can't find what you're looking for? Reach out to us at support@goldsky.com for help.