Advanced NFT Transfers
Create a table containing ERC-721 and ERC-1155 Transfers for several, or all token contracts.
This guide will cover more advanced topics in transforming our decoded datasets and some of the functionality of the Flink SQL backend.
In other guides we focus on the ELT use-case; here we’ll walk through an ETL example considering Uniswap trades.
What you’ll need
- A basic understanding of the Mirror product’s more basic ETL use case.
- A basic understanding of SQL, though we use the syntax and functionality of Flink v1.17.
- A destination sink to write your data to
Preface
In this example we’ll provide a YAML file that includes the transforms needed to sync token transfers to your database. Several steps are needed to accomplish this and we’ll go into detail about the following subjects:
- We need to differentate ERC-721 (NFT) transfers from ERC-20 token transfers since they have the same event signature in decoded data:
Transfer(address,address,uint256)
- We need to extract and deduplicate ERC-1155 batch transfers.
In this example we’ve included two ERC-721 contract addresses and one ERC-1155 contract address, but you can change these two any token address or addresses of interest for you, or remove the address filters to get all transfers. Be careful though, removing the event filters will sync many millions of rows to your DB
We’ve also included a number of columns that you may or may not need, the main columns needed for most purposes are: id
, address
(if you are syncing multiple contract addresses), sender
, recipient
, token_id
, and value
.
Pipeline YAML
There are two main transforms in this configuration and we’ll go through each one to explain how they work. If you copy and use this configuration file, make sure to update:
- Your
secretName
. If you already created a secret, you can find it via the CLI commandgoldsky secret list
. - The schema and table you want the data written to, by default it writes to
mirror.transfers
. - The contract address or addresses you want to sync.
sources:
- referenceName: ethereum.decoded_logs
version: 1.0.0
type: dataset
startAt: earliest
description: Decoded logs for events emitted from contracts. Contains the
decoded event signature and event parameters, contract address, data,
topics, and metadata for the block and transaction.
transforms:
- type: sql
referenceName: ethereum_721_transfers
primaryKey: id
description: ERC721 Transfers
sql: >-
SELECT
lower(address) AS contract_address,
lower(event_params[1]) AS sender,
lower(event_params[2]) AS recipient,
COALESCE(TRY_CAST(event_params[3] AS NUMERIC), -999) AS token_id,
1 AS `value`,
raw_log.block_number AS block_number,
raw_log.block_hash AS block_hash,
raw_log.log_index AS log_index,
raw_log.transaction_hash AS transaction_hash,
raw_log.transaction_index AS transaction_index,
id
FROM ethereum.decoded_logs WHERE lower(address) IN ('0x22c1f6050e56d2876009903609a2cc3fef83b415', '0xbc4ca0eda7647a8ab7c2061c2e118a18a936f13d')
AND raw_log.topics LIKE '0xddf252ad1be2c89b69c2b068fc378daa952ba7f163c4a11628f55a4df523b3ef%'
AND SPLIT_INDEX(raw_log.topics, ',', 3) IS NOT NULL
- type: sql
referenceName: ethereum_1155_transfers
primaryKey: id
description: ERC1155 Transform
sql: >-
SELECT
lower(address) AS contract_address,
lower(event_params[2]) AS sender,
lower(event_params[3]) AS recipient,
COALESCE(TRY_CAST(event_params[4] AS NUMERIC), -999) AS token_id,
COALESCE(TRY_CAST(event_params[5] AS NUMERIC), -999) AS `value`,
raw_log.block_number AS block_number,
raw_log.block_hash AS block_hash,
raw_log.log_index AS log_index,
raw_log.transaction_hash AS transaction_hash,
raw_log.transaction_index AS transaction_index,
id FROM ethereum.decoded_logs
WHERE lower(address) = '0xc36cf0cfcb5d905b8b513860db0cfe63f6cf9f5c'
AND raw_log.topics LIKE '0xc3d58168c5ae7397731d063d5bbf3d657854427343f4c083240f7aacaa2d0f62%'
UNION ALL SELECT
lower(address) AS contract_address,
lower(event_params[2]) AS sender,
lower(event_params[3]) AS recipient,
COALESCE(TRY_CAST(token_ids.token_id AS NUMERIC),-999) AS token_id,
COALESCE(TRY_CAST(`values`.`token_value` AS NUMERIC),0) AS `value`,
raw_log.block_number AS block_number,
raw_log.block_hash AS block_hash,
raw_log.log_index AS log_index,
raw_log.transaction_hash AS transaction_hash,
raw_log.transaction_index AS transaction_index,
id || '_' || token_ids.token_id AS id FROM ethereum.decoded_logs
CROSS JOIN UNNEST(_gs_split_string_by(TRIM(LEADING '[' FROM TRIM(TRAILING ']' FROM event_params[4])))) AS token_ids (token_id)
CROSS JOIN UNNEST(_gs_split_string_by(TRIM(LEADING '[' FROM TRIM(TRAILING ']' FROM event_params[5])))) AS `values` (token_value)
WHERE raw_log.topics LIKE '0x4a39dc06d4c0dbc64b70af90fd698a233a518aa5d07e595d983b8c0526c8f7fb%'
AND lower(address) = '0xc36cf0cfcb5d905b8b513860db0cfe63f6cf9f5c'
sinks:
- type: postgres
table: transfers
schema: mirror
secretName: <YOUR SECRET>
description: Postgres sink for 721 NFT transfers
referenceName: ethereum_721_sink
sourceStreamName: ethereum_721_transfers
- type: postgres
table: transfers
schema: mirror
secretName: <YOUR SECRET>
description: Postgres sink for 1155 transfers
referenceName: ethereum_1155_sink
sourceStreamName: ethereum_1155_transfers
ERC-721 (NFT) Transfers
Let’s first look at NFT transfers. This is the simpler of the two transformations. The main thing we need to do here is to make sure we’re pulling the sender
, recipient
, and token_id
from the event_params
array, and only getting NFT transfers rather than other transfers that may share the same event signature. This isn’t usually a problem when filtering for a specific contract address, but can become one when looking at all contract addresses, or contracts that may implement multiple types of tokens.
We’ll start at the top.
Contract Address
SELECT lower(address) AS contract_address,
We use the lower
function here to lower-case the address to make using this data simpler downstream, we also rename the column to contract_address
to make it more explicit.
Sender
lower(event_params[1]) AS sender,
Here we continue to lower-case values for consistency. In this case we’re using the first element of the event_params
array (using a 1-based index), and renaming it to sender
. Each event parameter maps to an argument to the event_signature
, for ERC-721, they are the sender
, recipient
, and token_id
respectively.
Recipient
lower(event_params[2]) AS recipient,
Like the previous column, we’re pulling the second element in the event_params
array and renaming it to recipient
.
Token ID
COALESCE(TRY_CAST(event_params[3] AS NUMERIC), -999) AS token_id,
Here we introduce a few more SQL functions, we’ll start from the inside and work our way out.
event_params[3]
is the third element of theevent_params
array, and for ERC-721 this is the token ID. Although not covered in this example, since ERC-20 shares the same signature, this element represents a token balance rather than token ID if you’re decoding ERC-20 transfers.TRY_CAST(event_params[3] AS NUMERIC)
is casting the string elementevent_params[3]
toNUMERIC
- token IDs can be as large as an unsigned 256 bit integer, so make sure your database can handle that, if not, you can cast it to a different data type that your sink can handle. We useTRY_CAST
because it will prevent the pipeline from failing in case the cast fails returning aNULL
value instead.COALESCE(TRY_CAST(event_params[3] AS NUMERIC), -999)
:COALESCE
can take an arbitrary number of arguments and returns the first non-NULL value. SinceTRY_CAST
can return aNULL
we’re returning-999
in case it does. This isn’t strictly necessary but is useful to do in case you want to find offending values that were unable to be cast.
Token Value
1 AS `value`,
NFTs are meant to be unique, so they don’t have a value or balance associated with them when transfered, but since we’re combining these transfers with ERC-1155 tokens which do have a value, we need to normalize the columns, so each transfer is treated a transfer of a single NFT with a value of 1
.
Block Metadata
raw_log.block_number AS block_number,
raw_log.block_hash AS block_hash,
raw_log.log_index AS log_index,
raw_log.transaction_hash AS transaction_hash,
raw_log.transaction_index AS transaction_index,
These columns aren’t necessarily needed for this example, but they’re included so you’re aware of them. A complete list of block metadata columns is available here. They are often useful for looking up the transfers in another tool, such as a block explorer like Etherscan.
ID Primary Key
id
This is the Goldsky provided id
, it is a string composed of the dataset name, block hash, and log index, which is unique per event, here’s an example: decoded_log_0x60eaf5a2ab37c73cf1f3bbd32fc17f2709953192b530d75aadc521111f476d6c_18
.
md5(id) as id
in your transform. One reason you may want to keep the existing id
format is that it makes it easier to order events in the same block without also syncing block hash and log index.Address Filter
WHERE lower(address) IN ('0x22c1f6050e56d2876009903609a2cc3fef83b415', '0xbc4ca0eda7647a8ab7c2061c2e118a18a936f13d')
Here we’re filtering for two contract addresses. If we want all NFT transfers on the chain we can remove this line entirely.
Topic Filter and Length Check
AND raw_log.topics LIKE '0xddf252ad1be2c89b69c2b068fc378daa952ba7f163c4a11628f55a4df523b3ef%'
AND SPLIT_INDEX(raw_log.topics, ',', 3) IS NOT NULL
As mentioned in the preface, ERC-721 transfers share the same event_signature
as ERC-20 transfers. What differentiates ERC-721 transfers from ERC-20 transfers are the number of topics associated with the event. ERC-721 transfers have four topics, and ERC-20 transfers have three. If you want to get into the nitty gritty you may enjoy the Solidity developer documentation for events, but for now know that in Mirror, raw_log.topics
is a comma separated string. Each value in the string is a hash. The first is the hash of the event_signature
and event_params
, in our case Transfer(address,address,uint256)
for ERC-721, which is hashed to 0xddf252ad1be2c89b69c2b068fc378daa952ba7f163c4a11628f55a4df523b3ef
as seen in our WHERE
clause.
We use LIKE
to only consider the first signature, with a %
at the end, which acts as a wildcard.
We could also use a filter such as event_signature = 'Transfer(address,address,uint256)'
, but we wanted to introduce the idea of topics as they can be a useful filter for some older contracts that may not completely follow the specification for ERC-721 contracts.
SPLIT_INDEX
splits the first argument by the second argument, and then extracts the 0-indexed argument, in this case 3
which would be the fourth element. Here’s an example topic string to consider:
0xddf252ad1be2c89b69c2b068fc378daa952ba7f163c4a11628f55a4df523b3ef,0x000000000000000000000000441e1e47a6fa2dbfd3cd9b54291e9ab3a58d7975,0x00000000000000000000000097d2e8eeb59e521f10c2e7716eac3dd805ea9a46,0x0000000000000000000000000000000000000000000000000000000000043321
We check that the fourth element after splitting is NOT NULL
to make sure this is an NFT transfer. An ERC-20 transfer would only have three elements when the topics are split, so SPLIT_INDEX
would return NULL
.
ERC-1155 Transfers
ERC-1155 combines the features of ERC-20 and ERC-721 contracts and adds a few features. Each transfer has both a token ID and a value representing the quantity being transfered for funglible tokens, the number 1
for tokens intended to represent NFTs, but how these work depends on how the contract is imlpemented. ERC-1155 also introduces new event signatures for transfers: TransferSingle(address,address,address,uint256,uint256)
and TransferBatch(address,address,address,uint256[],uint256[])
which lets the contract transfer multiple tokens at once to a single recipient. This causes us some trouble since we want one row per transfer in our database, so we’ve got some extra SQL in our transform to deal with this. We won’t cover details that overlap with the ERC-721 example and will focus on the differences for ERC-1155.
Event Parameters for ERC-1155
lower(address) AS contract_address,
lower(event_params[2]) AS sender,
lower(event_params[3]) AS recipient,
COALESCE(TRY_CAST(event_params[4] AS NUMERIC), -999) AS token_id,
COALESCE(TRY_CAST(event_params[5] AS NUMERIC), -999) AS `value`,
Similar to the ERC-721 event_params
we pull out the sender, recipient and token ID, note the indexes we use are different since ERC-1155 tokens have a different event_signature
. We also get a value
column from element five.
ID Primary Key for Batch Transfers
id || '_' || token_ids.token_id AS id
We modify the id
column for batch transfers. The id
coming from the source represents an entire batch transfer event, which can contain multiple tokens, so we concatenate the token_id to the id
to make the unnested rows unique.
Topic Filter for Single Transfers
AND raw_log.topics LIKE '0xc3d58168c5ae7397731d063d5bbf3d657854427343f4c083240f7aacaa2d0f62%'
We filter for a different topic to get ERC-1155 single transfers, the above topic is for the event_signature
TransferSingle(address,address,address,uint256,uint256)
. As with ERC-721, we could use the event signature as a filter instead.
Combining Single and Batch Transfers
UNION ALL SELECT
This directive creates a combined stream of all single transfers and batch transfers.
Array Splitting and Unnesting for Batch Transfers
CROSS JOIN UNNEST(_gs_split_string_by(TRIM(LEADING '[' FROM TRIM(TRAILING ']' FROM event_params[4])))) AS token_ids (token_id)
CROSS JOIN UNNEST(_gs_split_string_by(TRIM(LEADING '[' FROM TRIM(TRAILING ']' FROM event_params[5])))) AS `values` (token_value)
This is the trickiest part of the transformation and involves some functionality that is niche to both Goldsky and Flink v1.17.
We’ll start from the inside and work our way out again.
TRIM(LEADING '[' FROM TRIM(TRAILING ']' FROM event_params[4])
: The string for batch transfers in element 4 looks like this when decoded:[1 2 3 4 5 6]
. We need to trim the leading and trailing[
and]
characters before splitting it out into individual token IDs._gs_split_string_by(...)
: This is a Goldsky UDF which splits strings by the space character only. If you need to split by another character, for now you can useREGEXP_REPLACE(column, ',', ' ')
to replace commas with spaces.CROSS JOIN UNNEST ... AS token_ids (token_id)
: This works likeUNNEST
in most other SQL dialects, but is a special case in Flink. It may be confusing that we have two separate CROSS JOINs, but they don’t work like CROSS JOIN in other SQL dialects, we’ll get a single row with atoken_id
andtoken_value
that map correctly to each other.
Topic Filter for Batch Transfers
raw_log.topics LIKE '0x4a39dc06d4c0dbc64b70af90fd698a233a518aa5d07e595d983b8c0526c8f7fb%'
This is the same as the other topic filters, but is using the topic hash of the batch transfer event signature.
Using the Data
With this table in place, you can create views that show you a number of useful pieces of information:
- All mints. For ERC-721 and ERC-1155 a mint is identified by being from the sender
0x0000000000000000000000000000000000000000
- All current holders of a token, or balances for ERC-1155 holders.
Example balance query:
(SELECT id, block_number, contract_address, recipient as owner_address, token_id, value FROM transfers
UNION ALL SELECT id, block_number, contract_address, sender as owner_address, token_id, -value FROM transfers)
SELECT contract_address || '_' || owner_address || '_' || CAST(token_id AS TEXT) as id, contract_address, owner_address,
token_id, sum(value) as balance, min(block_number) as first_updated, max(block_number) as last_updated
FROM ledger WHERE owner_address <> '0x0000000000000000000000000000000000000000'
GROUP BY contract_address, token_id, owner_address
Can't find what you're looking for? Reach out to us at support@goldsky.com for help.
Was this page helpful?