Let's see how this works in practice. Imagine we want to iterate over all of the owners of the Bored Ape Yacht Club NFT collection. There are 10,000 NFTs in the collection, so we could write the query below, which returns all 10k NFTs with their owners in one call:
    import { SpiceClient } from "@spiceai/spice";

    const spiceClient = new SpiceClient(process.env.API_KEY);

    const query = `SELECT token_id, owner
    FROM eth.nft_owners
    WHERE token_address = '0xbc4ca0eda7647a8ab7c2061c2e118a18a936f13d'
    ORDER BY CAST(token_id AS NUMERIC)`;

    // Wait for the full result set, then process every row.
    const allBaycOwners = await spiceClient.query(query);
    allBaycOwners.toArray().forEach((row) => {
      processNFTOwner(row.toJSON());
    });
The result is that we call the processNFTOwner function 10k times, but we have to wait for all of the data to arrive before we can even begin the processing.
We can do better by processing the NFT owners as the results are streamed to the SDK. To take advantage of this, move the processing logic into the onData callback, passed as the second argument to query, instead of applying it to the result that query returns. Rewritten in this form, the code above looks like:
    import { SpiceClient } from "@spiceai/spice";

    const spiceClient = new SpiceClient(process.env.API_KEY);

    const query = `SELECT token_id, owner
    FROM eth.nft_owners
    WHERE token_address = '0xbc4ca0eda7647a8ab7c2061c2e118a18a936f13d'
    ORDER BY CAST(token_id AS NUMERIC)`;

    // Process each partial result as soon as it is streamed to the SDK.
    await spiceClient.query(query, (partialData) => {
      partialData.toArray().forEach((row) => {
        processNFTOwner(row.toJSON());
      });
    });
This will yield the same result as before: processNFTOwner will be called 10k times, but it can start processing the data earlier.
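To make the difference in ordering concrete, here is a minimal sketch that does not call the Spice SDK at all. It uses a hypothetical `mockQuery` function (an assumption for illustration, not a real API) that delivers a small dataset in batches and records when each batch arrives and when each row is processed. The six rows and the batch size of 2 are made-up values.

```javascript
// Hypothetical mock, NOT the Spice SDK: delivers `rows` in batches of
// `batchSize`, logs each arrival, and calls `onData` per batch if provided.
function mockQuery(rows, batchSize, log, onData) {
  const all = [];
  for (let i = 0; i < rows.length; i += batchSize) {
    const batch = rows.slice(i, i + batchSize);
    log.push(`arrived:${batch.length}`);
    all.push(...batch);
    if (onData) onData(batch);
  }
  return all;
}

// Made-up sample rows standing in for NFT owners.
const rows = Array.from({ length: 6 }, (_, i) => ({
  token_id: i,
  owner: `0xowner${i}`,
}));

// Buffered: processing starts only after every batch has arrived.
const bufferedLog = [];
mockQuery(rows, 2, bufferedLog).forEach((row) => {
  bufferedLog.push(`processed:${row.token_id}`);
});

// Streaming: each batch is processed as soon as it arrives.
const streamingLog = [];
mockQuery(rows, 2, streamingLog, (batch) => {
  batch.forEach((row) => streamingLog.push(`processed:${row.token_id}`));
});

// Position in the event log at which the first row gets processed.
console.log(bufferedLog.indexOf("processed:0")); // 3 (after all three arrivals)
console.log(streamingLog.indexOf("processed:0")); // 1 (right after the first arrival)
```

Both logs contain the same six `processed` events, but in the streaming case the first row is handled after a single batch has arrived rather than after all of them.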
To demonstrate the effect of streaming the data, the following snippet keeps track of all the data seen so far and writes out how much of the dataset it has processed.
    import { SpiceClient } from "@spiceai/spice";

    const API_KEY = process.env['API_KEY'];

    // Retrieve all Bored Ape Yacht Club owners in order by token_id
    const query = `SELECT token_id, owner
    FROM eth.nft_owners
    WHERE token_address = '0xbc4ca0eda7647a8ab7c2061c2e118a18a936f13d'
    ORDER BY CAST(token_id AS NUMERIC)`;

    let baycOwners = {};

    const processNFTOwner = (row) => {
      // Some processing of the NFT Owner.
      baycOwners[row.token_id] = row.owner;
    };

    const spiceClient = new SpiceClient(API_KEY);
    await spiceClient.query(query, (data) => {
      data.toArray().forEach((row) => {
        processNFTOwner(row.toJSON());
      });
      console.log("Current size", Object.keys(baycOwners).length);
    });
Run this snippet yourself to see how it works using Replit. (Click the Fork Repl button)
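As a variation on the progress output above, the running count can be turned into a percentage when the expected size of the result is known in advance (10,000 for this collection). The sketch below uses a hypothetical helper, `makeProgressTracker`, which is an assumption for illustration and not part of the Spice SDK. The sample batches are generated locally so the block runs on its own.

```javascript
// Hypothetical helper, NOT part of the Spice SDK: tracks distinct token IDs
// seen so far and reports progress against an expected total.
function makeProgressTracker(expectedTotal) {
  const seen = new Set();
  return {
    // Record a batch of plain row objects and return percent complete.
    add(rows) {
      rows.forEach((row) => seen.add(String(row.token_id)));
      return Math.min(100, (seen.size / expectedTotal) * 100);
    },
    // Number of distinct token IDs recorded so far.
    get size() {
      return seen.size;
    },
  };
}

// Build a made-up batch of rows with token IDs in [start, start + count).
const makeBatch = (start, count) =>
  Array.from({ length: count }, (_, i) => ({
    token_id: start + i,
    owner: "0x0000000000000000000000000000000000000000", // placeholder owner
  }));

const tracker = makeProgressTracker(10000);
const afterFirst = tracker.add(makeBatch(0, 2500));
const afterSecond = tracker.add(makeBatch(2500, 2500));
// Re-adding a batch that was already seen does not inflate the count.
const afterRepeat = tracker.add(makeBatch(0, 2500));

console.log(afterFirst, afterSecond, afterRepeat, tracker.size);
```

Inside the onData callback from the snippet above, the equivalent call would be something like `tracker.add(data.toArray().map((row) => row.toJSON()))`, assuming the rows expose `token_id` as they do in that snippet.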