Create a table containing ERC-1155 Transfers for several or all token contracts.
ERC-1155 is a standard for EVM ecosystems that allows the creation of both fungible and non-fungible assets within a single contract. Streaming ERC-1155 transfer data into a database is a foundational step, unlocking opportunities for data analysis, tracking, and the development of innovative solutions.
This guide is part of a series of tutorials on how you can stream transfer data into your data warehouse using Mirror pipelines. Here we will be focusing on ERC-1155 Transfers; visit the following two other guides for other types of Transfers:
Use the readily available ERC-1155 dataset for the chain you are interested in: this is the easiest and quickest way to start streaming token transfers into your sink of choice with minimal code.
Build the ERC-1155 Transfers pipeline from scratch using raw or decoded logs: this method takes more code and time to implement, but it’s a great way to learn how to use decoding functions in case you want to build more customized pipelines.
Every EVM chain has its own ERC-1155 dataset available for you to use as a source in your pipelines. You can check this by running the goldsky dataset list command and finding the EVM chain of your choice.
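For example:

goldsky dataset list

Then pick the chain you are interested in and look for its erc1155_transfers dataset.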
For this example, let’s use the Apex chain and create a simple pipeline definition using its ERC-1155 dataset that writes the data into a PostgreSQL instance:
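A minimal sketch of what that definition could look like, following the apiVersion 3 format used throughout this guide (the exact dataset_name for Apex is an assumption here; confirm it with goldsky dataset list):

name: apex-erc1155-transfers
apiVersion: 3
sources:
  apex_erc1155_transfers:
    type: dataset
    dataset_name: apex.erc1155_transfers  # assumed dataset name; verify with `goldsky dataset list`
    version: 1.0.0
sinks:
  postgres_sink:
    type: postgres
    table: apex_erc1155_transfers
    schema: public
    secret_name: <YOUR_SECRET>
    from: apex_erc1155_transfers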
If you copy and use this configuration file, make sure to update the schema and table you want the data written to; by default it writes to public.apex_erc1155_transfers.
If you use ClickHouse as a sink for this dataset you’ll need to add the following schema_override to avoid potential data precision errors for big numbers; see the example below.
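A sketch of what that override might look like (the option placement and the ClickHouse column types here are assumptions; check the ClickHouse sink documentation for the exact syntax):

sinks:
  clickhouse_sink:
    type: clickhouse
    table: apex_erc1155_transfers
    secret_name: <YOUR_SECRET>
    from: apex_erc1155_transfers
    schema_override:
      # Wide types so 256-bit token IDs and amounts don't lose precision (assumed types)
      token_id: UInt256
      amount: UInt256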
Building ERC-1155 Transfers from scratch using logs
In the previous method we just explored, the ERC-1155 dataset that we used as the source to the pipeline encapsulates all the decoding logic that’s explained in this section.
Read on if you are interested in learning how it’s implemented, in case you want to extend or modify this logic yourself.
There are two ways we can go about building this token transfers pipeline from scratch:
Use the raw_logs Direct Indexing dataset for that chain in combination with Decoding Transform Functions using the ABI of a specific ERC-1155 Contract.
Use the decoded_logs Direct Indexing dataset for that chain in which the decoding process has already been done by Goldsky. This is only available for certain chains as you can check in this list.
We’ll primarily focus on the first decoding method using raw_logs and decoding functions as it’s the default and most used way of decoding; we’ll also present an example using decoded_logs and highlight the differences between the two.
Building ERC-1155 Transfers using Decoding Transform Functions
In this example, we will stream all the Transfer events of all the ERC-1155 tokens on the Scroll chain. To that end, we will dynamically fetch the ABI of the Rubyscore_Scroll token from the Scrollscan API (available here) and use it to identify the same events across all the tokens in the chain. We have decided to use the ABI of this token for this example, but any other ERC-1155 compliant token would also work.
ERC-1155 combines the features of ERC-20 and ERC-721 contracts and adds a few features.
Each transfer carries both a token ID and a value: the value represents the quantity being transferred for fungible tokens, and is typically 1 for tokens intended to represent NFTs, though exactly how these are used depends on how the contract is implemented.
ERC-1155 also introduces new event signatures for transfers: TransferSingle(address,address,address,uint256,uint256) and TransferBatch(address,address,address,uint256[],uint256[]), the latter of which lets the contract transfer multiple tokens at once to a single recipient.
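For reference, these are the topic0 hashes (the keccak256 of each event signature) that we will filter on later in this guide:

TransferSingle: 0xc3d58168c5ae7397731d063d5bbf3d657854427343f4c083240f7aacaa2d0f62
TransferBatch:  0x4a39dc06d4c0dbc64b70af90fd698a233a518aa5d07e595d983b8c0526c8f7fb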
This causes us some trouble since we want one row per transfer in our database, so we’ll need some extra SQL logic in our pipeline to deal with it. To mitigate this complexity we have created two different transforms, one dealing with Single transfers and one with Batch transfers.
We then aggregate both tables into a single view using a third transform.
Let’s now see all these concepts applied in an example pipeline definition:
scroll-erc1155-transfers.yaml

name: scroll-erc1155-transfers
apiVersion: 3
sources:
  my_scroll_mainnet_raw_logs:
    type: dataset
    dataset_name: scroll_mainnet.raw_logs
    version: 1.0.0
transforms:
  scroll_decoded:
    primary_key: id
    # Fetch the ABI from scrollscan for Rubyscore_Scroll token
    sql: >
      SELECT
          *,
          _gs_log_decode(
              _gs_fetch_abi('https://api.scrollscan.com/api?module=contract&action=getabi&address=0xdc3d8318fbaec2de49281843f5bba22e78338146', 'etherscan'),
              `topics`,
              `data`
          ) AS `decoded`
      FROM my_scroll_mainnet_raw_logs
  scroll_clean:
    primary_key: id
    # Clean up the previous transform, unnest the values from the `decoded` object
    sql: >
      SELECT
          *,
          decoded.event_params AS `event_params`,
          decoded.event_signature AS `event_name`
      FROM scroll_decoded
      WHERE decoded IS NOT NULL
  erc1155_transfer_single:
    primary_key: id
    sql: >
      SELECT
          id,
          address AS contract_address,
          lower(event_params[2]) AS sender,
          lower(event_params[3]) AS recipient,
          COALESCE(TRY_CAST(event_params[4] AS DECIMAL(78)), 0) AS token_id,
          COALESCE(TRY_CAST(event_params[5] AS DECIMAL(78)), 0) AS amount,
          block_number,
          block_hash,
          log_index,
          transaction_hash,
          transaction_index
      FROM scroll_clean
      WHERE topics LIKE '0xc3d58168c5ae7397731d063d5bbf3d657854427343f4c083240f7aacaa2d0f62%'
  erc1155_transfer_batch:
    primary_key: id
    sql: >
      WITH transfer_batch_logs AS (
          SELECT
              *,
              _gs_split_string_by(
                  REPLACE(TRIM(LEADING '[' FROM TRIM(TRAILING ']' FROM event_params[4])), ',', ' ')
              ) AS token_ids,
              _gs_split_string_by(
                  REPLACE(TRIM(LEADING '[' FROM TRIM(TRAILING ']' FROM event_params[5])), ',', ' ')
              ) AS amounts
          FROM scroll_clean
          WHERE topics LIKE '0x4a39dc06d4c0dbc64b70af90fd698a233a518aa5d07e595d983b8c0526c8f7fb%'
      )
      SELECT
          id || '_' || CAST(t.idx AS STRING) AS `id`,
          address AS contract_address,
          lower(event_params[2]) AS sender,
          lower(event_params[3]) AS recipient,
          COALESCE(TRY_CAST(token_ids[t.idx] AS DECIMAL(78)), 0) AS token_id,
          COALESCE(TRY_CAST(amounts[t.idx] AS DECIMAL(78)), 0) AS amount,
          block_number,
          block_hash,
          log_index,
          transaction_hash,
          transaction_index
      FROM transfer_batch_logs
      CROSS JOIN UNNEST(
          CAST(
              _gs_generate_series(
                  CAST(1 AS BIGINT),
                  CAST(COALESCE(CARDINALITY(token_ids), 0) AS BIGINT)
              ) AS ARRAY<INTEGER>
          )
      ) AS t (idx)
  scroll_1155_transfers:
    primary_key: id
    sql: >
      SELECT * FROM erc1155_transfer_single
      UNION ALL
      SELECT * FROM erc1155_transfer_batch WHERE amount > 0
sinks:
  scroll_1155_sink:
    type: postgres
    table: erc1155_transfers
    schema: mirror
    secret_name: <YOUR_SECRET>
    description: Postgres sink for ERC1155 transfers
    from: scroll_1155_transfers
If you copy and use this configuration file, make sure to update the schema and table you want the data written to (by default it writes to mirror.erc1155_transfers) and to replace <YOUR_SECRET> with the name of your own secret.
There are five transforms in this pipeline definition; we’ll explain how each of them works, starting from the top.
Decoding Transforms
Transform: scroll_decoded
SELECT
    *,
    _gs_log_decode(
        _gs_fetch_abi('https://api.scrollscan.com/api?module=contract&action=getabi&address=0xdc3d8318fbaec2de49281843f5bba22e78338146', 'etherscan'),
        `topics`,
        `data`
    ) AS `decoded`
FROM my_scroll_mainnet_raw_logs
As explained in the Decoding Contract Events guide, we first use the _gs_fetch_abi function to fetch the ABI from Scrollscan and pass it as the first argument to the _gs_log_decode function, which decodes the log’s topics and data. We store the result in a decoded ROW, which we unnest in the next transform.
Transform: scroll_clean
SELECT
    *,
    decoded.event_params AS `event_params`,
    decoded.event_signature AS `event_name`
FROM scroll_decoded
WHERE decoded IS NOT NULL
In this second transform, we take the event_params and event_signature from the result of the decoding. We then filter on decoded IS NOT NULL to leave out potential null results from the decoder.
Transform: erc1155_transfer_single

SELECT
    id,
    address AS contract_address,
    lower(event_params[2]) AS sender,
    lower(event_params[3]) AS recipient,
    COALESCE(TRY_CAST(event_params[4] AS DECIMAL(78)), 0) AS token_id,
    COALESCE(TRY_CAST(event_params[5] AS DECIMAL(78)), 0) AS amount,
    block_number,
    block_hash,
    log_index,
    transaction_hash,
    transaction_index
FROM scroll_clean
WHERE topics LIKE '0xc3d58168c5ae7397731d063d5bbf3d657854427343f4c083240f7aacaa2d0f62%'
In this transform we focus on TransferSingle events. Similar to the ERC-721 example, we use event_params to pull out the sender, recipient, and token ID. Note that the indexes we use are different, since ERC-1155 events have a different event_signature.
COALESCE(TRY_CAST(event_params[4] AS DECIMAL(78)), 0) AS token_id,
COALESCE(TRY_CAST(event_params[5] AS DECIMAL(78)), 0) AS amount,
event_params[4] is the fourth element of the event_params array, and for ERC-1155 this is the token ID.
TRY_CAST(event_params[4] AS DECIMAL(78)) casts the string element event_params[4] to DECIMAL(78). Token IDs can be as large as an unsigned 256-bit integer, so make sure your database can handle that; if not, you can cast to a different data type that your sink can handle. We use TRY_CAST because it prevents the pipeline from failing if the cast fails, returning a NULL value instead.
COALESCE(TRY_CAST(event_params[4] AS DECIMAL(78)), 0): COALESCE can take an arbitrary number of arguments and returns the first non-NULL value. Since TRY_CAST can return NULL, we return 0 in its place. This isn’t strictly necessary, but it makes it easy to spot offending values that could not be cast.
We repeat this process for event_params[5], which represents the amount of a token.
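As a quick illustration of how these two functions interact on a malformed value (illustrative Flink SQL, not part of the pipeline):

SELECT
    TRY_CAST('not_a_number' AS DECIMAL(78)),              -- cast fails, returns NULL
    COALESCE(TRY_CAST('not_a_number' AS DECIMAL(78)), 0)  -- the NULL becomes 0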
WHERE topics LIKE '0xc3d58168c5ae7397731d063d5bbf3d657854427343f4c083240f7aacaa2d0f62%'
We filter for a specific topic to get ERC-1155 single transfers; the above topic hash corresponds to the event_signature TransferSingle(address,address,address,uint256,uint256). As with ERC-721, we could use the event signature as a filter instead.
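A sketch of that alternative filter (this assumes the decoder writes the full signature into event_name; verify against your own decoded output):

SELECT *
FROM scroll_clean
WHERE event_name = 'TransferSingle(address,address,address,uint256,uint256)'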
Transform: erc1155_transfer_batch
Now, let’s look at the TransferBatch events:
erc1155_transfer_batch (subquery)
WITH transfer_batch_logs AS (
    SELECT
        *,
        _gs_split_string_by(
            REPLACE(TRIM(LEADING '[' FROM TRIM(TRAILING ']' FROM event_params[4])), ',', ' ')
        ) AS token_ids,
        _gs_split_string_by(
            REPLACE(TRIM(LEADING '[' FROM TRIM(TRAILING ']' FROM event_params[5])), ',', ' ')
        ) AS amounts
    FROM scroll_clean
    WHERE topics LIKE '0x4a39dc06d4c0dbc64b70af90fd698a233a518aa5d07e595d983b8c0526c8f7fb%'
)
The first thing we want to achieve is to decompose the string representations of the token IDs and their respective amounts into arrays that we can later unnest into one row per tokenId - amount pair. This will make it much easier to index on tokenId - amount pairs as a second step.
This is the trickiest part of the transformation and involves some functionality that is niche to both Goldsky and Flink v1.17.
We’ll start from the inside and work our way out again.
TRIM(LEADING '[' FROM TRIM(TRAILING ']' FROM event_params[4])): Similar to the ERC-721 example, we use event_params to access the token_id information. For ERC-1155, the decoded string for batch transfers in element 4 looks like [1,2,3,4,5,6]. We need to trim the leading and trailing [ and ] characters before splitting it into individual token IDs.
_gs_split_string_by(...): This is a Goldsky UDF which splits strings by the space character only, which is why we first use REPLACE(column, ',', ' ') to turn the commas into spaces. The same trick applies if you need to split by any other character.
CROSS JOIN UNNEST(...) AS t (idx): This works like UNNEST in most other SQL dialects, but is a special case in Flink. Although it’s written as a CROSS JOIN, it doesn’t behave like CROSS JOIN in other SQL dialects: each unnested index is joined back to its source row, so we get one row per index with a token_id and amount that map correctly to each other (see the worked example below).
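To make the whole chain concrete, here is a worked example on one hypothetical TransferBatch log:

-- Suppose the decoded log has:
--   event_params[4] = '[10,11]'  (token IDs)
--   event_params[5] = '[5,1]'    (amounts)
-- TRIM leading/trailing brackets -> '10,11'       and '5,1'
-- REPLACE(',', ' ')              -> '10 11'       and '5 1'
-- _gs_split_string_by            -> ['10', '11']  and ['5', '1']
-- CROSS JOIN UNNEST over the generated indexes [1, 2] yields two rows:
--   idx = 1 -> token_id = 10, amount = 5
--   idx = 2 -> token_id = 11, amount = 1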
Lastly, we filter on topic:
topics LIKE '0x4a39dc06d4c0dbc64b70af90fd698a233a518aa5d07e595d983b8c0526c8f7fb%'
This is the same as the other topic filters but it is using the topic hash of the batch transfer event signature.
Next, onto creating an index for each tokenId - amount pair:
erc1155_transfer_batch (series generation)
FROM transfer_batch_logs
CROSS JOIN UNNEST(
    CAST(
        _gs_generate_series(
            CAST(1 AS BIGINT),
            CAST(COALESCE(CARDINALITY(token_ids), 0) AS BIGINT)
        ) AS ARRAY<INTEGER>
    )
) AS t (idx)
In this step we generate a series of indexes that we can use to access each individual tokenId - amount pair within a transfer.
We do this with a Goldsky UDF called _gs_generate_series, which generates an array of indexes for as many tokens as there are in the batch.
We combine these indexes with our existing table and use them to access each token - amount pair:
COALESCE(TRY_CAST(token_ids[t.idx] AS DECIMAL(78)), 0) AS token_id,
COALESCE(TRY_CAST(amounts[t.idx] AS DECIMAL(78)), 0) AS amount,
We also use this logic to generate the resulting ID Primary Key for batch transfers:
id || '_' || CAST(t.idx AS STRING) AS `id`
The id coming from the source represents an entire batch transfer event, which can contain multiple tokens, so we concatenate the generated index to the id to make the unnested rows unique. For example, a batch log with id abc123 containing three tokens produces rows with ids abc123_1, abc123_2, and abc123_3.
Our last step is to deploy this pipeline and start sinking ERC-1155 transfer data into our database. Assuming we are using the same file name for the pipeline configuration as in this example, we can use the CLI pipeline create command like this:
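A command along these lines should work (the exact flags are our assumption; check goldsky pipeline create --help for your CLI version):

goldsky pipeline create scroll-erc1155-transfers --definition-path scroll-erc1155-transfers.yaml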
This concludes our successful deployment of a Mirror pipeline streaming ERC-1155 Tokens from Scroll chain into our database using inline decoders. Congrats! 🎉
Building ERC-1155 Transfers using decoded_logs

As explained in the Introduction, Goldsky provides decoded datasets for Raw Logs and Raw Traces for a number of different chains. You can check this list to see if the chain you are interested in has these decoded datasets.
In these cases, there is no need for us to run Decoding Transform Functions as the dataset itself will already contain the event signature and event params decoded.
Below is an example pipeline definition for streaming ERC-1155 tokens on the Ethereum chain using the decoded_logs dataset.
ethereum-decoded-logs-erc1155-transfers.yaml
sources:
  - referenceName: ethereum.decoded_logs
    version: 1.0.0
    type: dataset
    startAt: earliest
    description: Decoded logs for events emitted from contracts. Contains the decoded event signature and event parameters, contract address, data, topics, and metadata for the block and transaction.
transforms:
  - type: sql
    referenceName: erc1155_transfer_single
    primaryKey: id
    sql: >-
      SELECT
          address AS contract_address,
          lower(event_params[2]) AS sender,
          lower(event_params[3]) AS recipient,
          COALESCE(TRY_CAST(event_params[4] AS DECIMAL(78)), 0) AS token_id,
          COALESCE(TRY_CAST(event_params[5] AS DECIMAL(78)), 0) AS amount,
          raw_log.block_number AS block_number,
          raw_log.block_hash AS block_hash,
          raw_log.log_index AS log_index,
          raw_log.transaction_hash AS transaction_hash,
          raw_log.transaction_index AS transaction_index,
          id
      FROM ethereum.decoded_logs
      WHERE raw_log.topics LIKE '0xc3d58168c5ae7397731d063d5bbf3d657854427343f4c083240f7aacaa2d0f62%'
        AND address = '0xc36cf0cfcb5d905b8b513860db0cfe63f6cf9f5c'
  - type: sql
    referenceName: erc1155_transfer_batch
    primaryKey: id
    description: ERC1155 Transform
    sql: >-
      WITH transfer_batch_logs AS (
          SELECT
              *,
              _gs_split_string_by(
                  REPLACE(TRIM(LEADING '[' FROM TRIM(TRAILING ']' FROM event_params[4])), ',', ' ')
              ) AS token_ids,
              _gs_split_string_by(
                  REPLACE(TRIM(LEADING '[' FROM TRIM(TRAILING ']' FROM event_params[5])), ',', ' ')
              ) AS amounts
          FROM ethereum.decoded_logs
          WHERE raw_log.topics LIKE '0x4a39dc06d4c0dbc64b70af90fd698a233a518aa5d07e595d983b8c0526c8f7fb%'
            AND address = '0xc36cf0cfcb5d905b8b513860db0cfe63f6cf9f5c'
      )
      SELECT
          address AS contract_address,
          lower(event_params[2]) AS sender,
          lower(event_params[3]) AS recipient,
          CAST(token_ids[t.idx] AS NUMERIC(78)) AS token_id,
          CAST(amounts[t.idx] AS NUMERIC(78)) AS amount,
          raw_log.block_number AS block_number,
          raw_log.block_hash AS block_hash,
          raw_log.log_index AS log_index,
          raw_log.transaction_hash AS transaction_hash,
          raw_log.transaction_index AS transaction_index,
          id || '_' || CAST(t.idx AS STRING) AS `id`
      FROM transfer_batch_logs
      CROSS JOIN UNNEST(
          CAST(
              _gs_generate_series(
                  CAST(1 AS BIGINT),
                  CAST(COALESCE(CARDINALITY(token_ids), 0) AS BIGINT)
              ) AS ARRAY<INTEGER>
          )
      ) AS t (idx)
  - type: sql
    referenceName: ethereum_1155_transfers
    primaryKey: id
    sql: SELECT * FROM erc1155_transfer_single UNION ALL SELECT * FROM erc1155_transfer_batch WHERE amount > 0
sinks:
  - type: postgres
    table: erc1155_transfers
    schema: mirror
    secretName: <YOUR_SECRET>
    description: Postgres sink for 1155 transfers
    referenceName: transfers
    sourceStreamName: ethereum_1155_transfers
If you copy and use this configuration file, make sure to update the schema and table you want the data written to; by default it writes to mirror.erc1155_transfers.
Notice that it’s quite similar to the inline decoding pipeline method, but since the dataset already comes decoded, we skip the decoding transforms and simply filter on raw_log.topics just as we did in the previous method.
Assuming we are using the same file name for the pipeline configuration as in this example, we can deploy this pipeline with the CLI pipeline create command:
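Again, assuming the same flags as before (verify with goldsky pipeline create --help):

goldsky pipeline create ethereum-decoded-logs-erc1155-transfers --definition-path ethereum-decoded-logs-erc1155-transfers.yaml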
In this guide, we have learnt how Mirror simplifies streaming ERC-1155 Transfer events into your database.
We first looked into the easy way of achieving this: simply making use of the readily available ERC-1155 dataset of the EVM chain and using it as the source to our pipeline.
We then took a deep dive into the standard decoding method using Decoding Transform Functions, implementing an example on the Scroll chain.
We have also looked into an example implementation using the decoded_logs dataset for Ethereum. Both are great decoding methods and depending on your use case and dataset availability you might prefer one over the other.
With Mirror, developers gain flexibility and efficiency in integrating blockchain data, opening up new possibilities for applications and insights. Experience the transformative power of Mirror today and redefine your approach to blockchain data integration.
Can't find what you're looking for? Reach out to us at support@goldsky.com for help.