A worker is the fundamental unit of a Node, encompassing a set of logics to index, structure, and store Open Information.
While many workers have already been implmented (see supported workers ), you can extend its capabilities by contributing new workers.
RSS3 Node is open-source and conforms to the RSS3 Protocol .
You can find the Node's source code here .
Two repositories are critical to the develop of the Node:
Protocol-Go : A Go implementation of the RSS3 Protocol .
Node : The RSS3 Data Sublayer (DSL) component that is responsible for indexing, transforming, storing, and ultimately serving the Open Information to the end users.
Protocol-Go is a Go implement of the RSS3 Protocol.
Popular networks like Ethereum, Arweave, and Near are already supported, along with various structures and schemas.
If your contribution does not involve adding a new network or defining a new structure, you can simply use the latest Protocol-Go
version in Node (go get github.com/rss3-network/protocol-go
).
The repository implements core logics for indexing, transforming, storing, and serving Open Information to end users.
It includes these the following main components:
Engine
Protocol : The indexing logic for specific networks.
Worker : The transforming logic for specific dApps (the focus of this tutorial).
Indexer : Handles data indexing, transforming, and storing.
Core API : Serves Node APIs, including decentralized, federated, and RSS data querying logics, as well as Node information APIs for operators.
Broadcaster : Registers nodes to Global Indexers (GIs).
Learn more here .
Monitor : Monitors data indexing progress and prunes outdated data.
Don't worry if this seems complex!
The worker component is designed as a standalone module within the engine.
You can integrate and test a new worker by modifying just a few files related to data transformation, without dealing with data indexing or storage logic.
Node is written in Go, so basic knowledge of Go is recommended.
Familiarity with SQL, Docker, Git, and Linux command line is also beneficial.
You'll need to have Go and Git installed on your machine.
git clone https://github.com/RSS3-Network/Node.git
cd Node
For GoLand users, dependencies are automatically managed upon launching.
Users of other Integrated Development Environments (IDEs) may need to install dependencies manually via the following command:
The project structure is organized as follows:
.
├── cmd
│ └── main.go
├── common
│ ├── http
│ └── sync
├── config
│ ├── flag
│ ├── parameter
│ ├── config.go
│ └── config_test.go
├── deploy
│ ├── default
│ ├── min
│ ├── README.md
│ └── config.example.yaml
├── docs
│ ├── openapi.go
│ └── openapi.json
├── internal
│ ├── constant
│ ├── database
│ ├── engine
│ ├── node
│ ├── stream
│ └── telemetry
├── provider
│ ├── activitypub
│ ├── arweave
│ ├── ethereum
│ ├── farcaster
│ ├── httpx
│ ├── ipfs
│ ├── near
│ ├── redis
│ └── telemetry
├── schema
│ └── worker
├── tool
│ └── testcase
├── CODEOWNERS
├── Dockerfile
├── LICENSE
├── Makefile
├── README.md
├── go.mod
└── go.sum
cmd
: the entrypoint for the application, execute go run cmd/main.go
to launch.
common
: utility functions and shared components.
config
: configuration files and related logic.
deploy
: deployment-specific files and instructions.
docs
: OpenAPI documentation.
internal
: core Node components, such as the engine, database, core API, and telemetry.
provider
: custom data providers for various networks, IPFS, Redis, and other services.
schema
: data structures and schemas for workers.
tool
: test generation utilities for workers.
Additional files located at the root level serve various purposes.
In the internal/engine/worker
directory, you'll find three subdirectories:
.
├── decentralized/
├── federated/
├── rss/
Node currently supports three distinct categories of workers:
For integrating new decentralized applications (dApps), implement the transformation logic in the decentralized/contract
directory.
When contributing a fediverse platform, develop the transformation logic in the federated/core/<fediverse_name>
directory.
For RSS workers, implement the logic in the rss/core/<rss_app_name>
directory.
The standard structure for a worker directory should look like this:
.
├── worker.go
├── worker_test.go
This guide explains how to define a worker schema in the schema/worker
directory, specifically for decentralized workers.
First, extend the Worker
enum in worker.go
:
//go:generate go run --mod=mod github.com/dmarkham/enumer@v1.5.9 --values --type=Worker --linecomment --output worker_string.go --json --yaml --sql
type Worker int
const (
Aave Worker = iota + 1 // aave
// ...other workers
Zerion // zerion
)
Generate the corresponding string utilities by running the generate command:
go:generate go run --mod=mod github.com/dmarkham/enumer@v1.5.9 --values --type=Worker --linecomment --output worker_string.go --json --yaml --sql
Update the Tag
mapping in the ToTagsMap
:
// ToTagsMap is a map of worker to tags
var ToTagsMap = map [ Worker ][] tag . Tag {
Aave: {tag.Exchange},
// ...other tag maps
Zerion: {tag.Exchange, tag.Transaction},
}
Add worker platform to the Platform
enum:
//go:generate go run --mod=mod github.com/dmarkham/enumer@v1.5.9 --values --type=Platform --linecomment --output platform_string.go --json --yaml --sql
type Platform uint64
const (
PlatformUnknown Platform = iota // Unknown
// ...other platforms
PlatformZerion // Zerion
)
Generate the platform string utilities similarly.
Finally, map the worker to its corresponding platform in ToPlatformMap
:
// ToPlatformMap is a map of worker to platform
var ToPlatformMap = map [ Worker ] Platform {
Aave: PlatformAAVE,
// ...other platform maps
Zerion: PlatformZerion,
}
Before integrating a new dApp, it is essential to fully understand its functionality.
We highly recommend carefully reviewing the project’s documentation and smart contracts to identify key elements, including contract addresses, ABIs, and critical event hashes.
For instance, Zerion's Router
contract ABI, transactions, and event logs can be examined at Optimism Scan .
It then sets up all contract addresses, event hashes, and the Go SDK for the target contract in the provider/ethereum/contract
directory.
A typical Ethereum contract directory structure is as follows:
.
├── contract.go
├── Router.abi
├── router.go
contract.go
sets the contract address and event hash:
//go:generate go run --mod=mod github.com/ethereum/go-ethereum/cmd/abigen --abi ./Router.abi --pkg zerion --type Router --out router.go
var (
AddressRouter = common. HexToAddress ( "0xd7F1Dd5D49206349CaE8b585fcB0Ce3D96f1696F" )
AddressNativeToken = common. HexToAddress ( "0xEeeeeEeeeEeEeeEeEeEeeEEEeeeeEeeeeeeeEEeE" )
EventHashExecuted = contract. EventHash ( "Executed(address,uint256,uint256,address,uint256,uint256,uint256,uint256,(uint8,(uint256,address),(uint256,address),address,address,bytes),address)" )
)
And you can execute the following command in the provider/ethereum/contract
directory to generate a router.go
file automatically:
go:generate go run --mod=mod github.com/ethereum/go-ethereum/cmd/abigen --abi ./Router.abi --pkg zerion --type Router --out router.go
All these packages will be imported in the worker.go
.
Here is a simple example of an EVM-based swap worker:
package zerion
import (
" context "
" fmt "
" math/big "
" github.com/ethereum/go-ethereum/common "
" github.com/rss3-network/node/config "
" github.com/rss3-network/node/internal/engine "
source " github.com/rss3-network/node/internal/engine/protocol/ethereum "
" github.com/rss3-network/node/provider/ethereum "
" github.com/rss3-network/node/provider/ethereum/contract "
" github.com/rss3-network/node/provider/ethereum/contract/erc20 "
" github.com/rss3-network/node/provider/ethereum/contract/zerion "
" github.com/rss3-network/node/provider/ethereum/token "
" github.com/rss3-network/node/schema/worker/decentralized "
" github.com/rss3-network/protocol-go/schema "
activityx " github.com/rss3-network/protocol-go/schema/activity "
" github.com/rss3-network/protocol-go/schema/metadata "
" github.com/rss3-network/protocol-go/schema/network "
" github.com/rss3-network/protocol-go/schema/tag "
" github.com/rss3-network/protocol-go/schema/typex "
" github.com/samber/lo "
" github.com/shopspring/decimal "
" go.uber.org/zap "
)
var _ engine . Worker = ( * worker )( nil )
// worker represents the Zerion worker, which processes Zerion-related transactions.
type worker struct {
config * config . Module
ethereumClient ethereum . Client
tokenClient token . Client
erc20Filterer * erc20 . ERC20Filterer
routerFilterer * zerion . RouterFilterer
}
// Name returns the name of the worker.
func ( w * worker ) Name () string {
return decentralized.Zerion. String ()
}
// Platform returns the platform name for the Zerion worker.
func ( w * worker ) Platform () string {
return decentralized.PlatformZerion. String ()
}
// Network returns the list of networks supported by the Zerion worker.
func ( w * worker ) Network () [] network . Network {
return [] network . Network {
network.BinanceSmartChain,
network.Polygon,
network.Arbitrum,
network.Avalanche,
network.Optimism,
network.Linea,
network.Gnosis,
network.XLayer,
network.Base,
}
}
// Tags returns the list of tags associated with the Zerion worker.
func ( w * worker ) Tags () [] tag . Tag {
return [] tag . Tag {
tag.Exchange,
tag.Transaction,
}
}
// Types returns the list of activity types that the Zerion worker can process.
func ( w * worker ) Types () [] schema . Type {
return [] schema . Type {
typex.ExchangeSwap,
typex.TransactionTransfer,
}
}
// Filter returns the data protocol filter for the Zerion worker.
func ( w * worker ) Filter () engine . DataSourceFilter {
return & source . Filter {
LogAddresses: [] common . Address {
zerion.AddressRouter,
},
LogTopics: [] common . Hash {
zerion.EventHashExecuted,
},
}
}
// Transform processes the input task and transforms it into an activity.
func ( w * worker ) Transform ( ctx context . Context , task engine . Task ) ( * activityx . Activity , error ) {
ethereumTask, ok := task.( * source . Task )
if ! ok {
return nil , fmt. Errorf ( "invalid task type %T " , task)
}
activity, err := ethereumTask. BuildActivity (activityx. WithActivityPlatform (w. Platform ()))
if err != nil {
return nil , fmt. Errorf ( "build activity: %w " , err)
}
// Iterate through all logs in the transaction receipt
for _, log := range ethereumTask.Receipt.Logs {
if len (log.Topics) == 0 {
continue
}
switch {
case w. matchSwapLog (ethereumTask, log):
actions, err := w. transformSwapLog (ctx, ethereumTask, log)
if err != nil {
zap. L (). Warn ( "handle settlement trade log" , zap. Error (err), zap. String ( "worker" , w. Name ()), zap. String ( "task" , ethereumTask. ID ()))
continue
}
activity.Actions = append (activity.Actions, actions ... )
default :
zap. L (). Debug ( "unsupported log" , zap. String ( "worker" , w. Name ()), zap. String ( "task" , ethereumTask. ID ()), zap. Stringer ( "topic" , log.Topics[ 0 ]))
}
}
if len (activity.Actions) == 0 {
return nil , fmt. Errorf ( "no actions" )
}
zap. L (). Info ( "Processing task" , zap. Any ( "task" , ethereumTask))
zap. L (). Info ( "activity is: " , zap. Any ( "activity" , activity))
activity.Type = typex.ExchangeSwap
return activity, nil
}
// matchSwapLog checks if the given log matches a Zerion swap event.
func ( w * worker ) matchSwapLog ( _ * source . Task , log * ethereum . Log ) bool {
return contract. MatchEventHashes (log.Topics[ 0 ], zerion.EventHashExecuted) &&
contract. MatchAddresses (log.Address, zerion.AddressRouter)
}
// transformSwapLog transforms a Zerion swap log into a list of actions.
func ( w * worker ) transformSwapLog ( ctx context . Context , task * source . Task , log * ethereum . Log ) ([] * activityx . Action , error ) {
event, err := w.routerFilterer. ParseExecuted (log. Export ())
if err != nil {
return nil , fmt. Errorf ( "parse Executed event: %w " , err)
}
var actions [] * activityx . Action
// Handle protocol fee if present
if event.ProtocolFeeAmount. Sign () == 1 {
feeAction, err := w. buildTransactionTransferAction (ctx, task, event.Sender, zerion.AddressRouter, lo. Ternary (event.OutputToken == zerion.AddressNativeToken, nil , & event.OutputToken), event.ProtocolFeeAmount)
if err != nil {
return nil , fmt. Errorf ( "build transaction transfer action for fee: %w " , err)
}
actions = append (actions, feeAction)
}
// Build the main swap action
swapAction, err := w. buildExchangeSwapAction (ctx, task, event.Sender, event.Sender, event.InputToken, event.OutputToken, event.AbsoluteInputAmount, event.ReturnedAmount)
if err != nil {
return nil , fmt. Errorf ( "build exchange swap action: %w " , err)
}
actions = append (actions, swapAction)
return actions, nil
}
// buildTransactionTransferAction creates a TransactionTransfer action for a given transfer.
func ( w * worker ) buildTransactionTransferAction ( ctx context . Context , task * source . Task , from , to common . Address , tokenAddress * common . Address , amount * big . Int ) ( * activityx . Action , error ) {
tokenMetadata, err := w.tokenClient. Lookup (ctx, task.ChainID, tokenAddress, nil , task.Header.Number)
if err != nil {
return nil , fmt. Errorf ( "lookup token metadata: %w " , err)
}
tokenMetadata.Value = lo. ToPtr (decimal. NewFromBigInt (amount, 0 ))
return & activityx . Action {
Type: typex.TransactionTransfer,
Platform: w. Platform (),
From: from. String (),
To: to. String (),
Metadata: metadata. TransactionTransfer ( * tokenMetadata),
}, nil
}
// buildExchangeSwapAction creates an ExchangeSwap action for a given swap.
func ( w * worker ) buildExchangeSwapAction ( ctx context . Context , task * source . Task , from , to , tokenIn , tokenOut common . Address , amountIn , amountOut * big . Int ) ( * activityx . Action , error ) {
tokenInAddress := lo. Ternary (tokenIn != zerion.AddressNativeToken, & tokenIn, nil )
tokenOutAddress := lo. Ternary (tokenOut != zerion.AddressNativeToken, & tokenOut, nil )
tokenInMetadata, err := w.tokenClient. Lookup (ctx, task.ChainID, tokenInAddress, nil , task.Header.Number)
if err != nil {
return nil , fmt. Errorf ( "lookup token in metadata: %w " , err)
}
tokenInMetadata.Value = lo. ToPtr (decimal. NewFromBigInt (amountIn, 0 ))
tokenOutMetadata, err := w.tokenClient. Lookup (ctx, task.ChainID, tokenOutAddress, nil , task.Header.Number)
if err != nil {
return nil , fmt. Errorf ( "lookup token out metadata: %w " , err)
}
tokenOutMetadata.Value = lo. ToPtr (decimal. NewFromBigInt (amountOut, 0 ))
return & activityx . Action {
Type: typex.ExchangeSwap,
Platform: w. Platform (),
From: from. String (),
To: to. String (),
Metadata: metadata . ExchangeSwap {
From: * tokenInMetadata,
To: * tokenOutMetadata,
},
}, nil
}
// NewWorker creates and initializes a new Zerion worker.
func NewWorker ( config * config . Module ) ( engine . Worker , error ) {
var err error
instance := worker {
config: config,
}
// Initialize token client.
instance.tokenClient = token. NewClient (instance.ethereumClient)
// Initialize filterers.
instance.erc20Filterer = lo. Must (erc20. NewERC20Filterer (ethereum.AddressGenesis, nil ))
instance.routerFilterer = lo. Must (zerion. NewRouterFilterer (zerion.AddressRouter, nil ))
if instance.ethereumClient, err = ethereum. Dial (context. Background (), config.Endpoint.URL, config.Endpoint. BuildEthereumOptions () ... ); err != nil {
return nil , fmt. Errorf ( "initialize ethereum client: %w " , err)
}
instance.tokenClient = token. NewClient (instance.ethereumClient)
return & instance, nil
}
For a comprehensive understanding of the implementation, please refer to the source code available here .
For a more structured analysis, let’s break down the code into its key components and examine each in detail.
// worker represents the Zerion worker, which processes Zerion-related transactions.
type worker struct {
config * config . Module
ethereumClient ethereum . Client
tokenClient token . Client
erc20Filterer * erc20 . ERC20Filterer
routerFilterer * zerion . RouterFilterer
}
It incorporates several key elements:
token.Client
: component is responsible for retrieving and parsing token metadata.
erc20.ERC20Filterer
: filterer to filter ERC20 transfer logs.
zerion.RouterFilterer
: filterer to filter Zerion swap logs.
These components are essential for the worker's operations and are therefore defined as fields within the struct.
Implement the NewWorker
function to instantiate and configure the worker.
This function sets up all necessary components.
// NewWorker creates and initializes a new Zerion worker.
func NewWorker ( config * config . Module ) ( engine . Worker , error ) {
var err error
instance := worker {
config: config,
}
// Initialize token client.
instance.tokenClient = token. NewClient (instance.ethereumClient)
// Initialize filterers.
instance.erc20Filterer = lo. Must (erc20. NewERC20Filterer (ethereum.AddressGenesis, nil ))
instance.routerFilterer = lo. Must (zerion. NewRouterFilterer (zerion.AddressRouter, nil ))
if instance.ethereumClient, err = ethereum. Dial (context. Background (), config.Endpoint.URL, config.Endpoint. BuildEthereumOptions () ... ); err != nil {
return nil , fmt. Errorf ( "initialize ethereum client: %w " , err)
}
instance.tokenClient = token. NewClient (instance.ethereumClient)
return & instance, nil
}
The Worker
interface, defined in internal/engine/worker/worker.go
, serves as a blueprint for all other workers.
This interface enforces consistency and standardization across all worker implementations.
Each worker must comply with the interface, implementing its methods to maintain a uniform structure, facilitating development and maintenance.
type Worker interface {
// Name is the name of the worker.
Name () string
// Platform returns the display name of the worker as the `platform` in the final Activity response.
Platform () string
// Network returns all networks where the worker runs on and displayed as the `network` in the final Activity response.
Network () [] network . Network
// Tags the possible `tag` of the worker, displayed in the final Activity response.
Tags () [] tag . Tag
// Types the possible `type` of the worker, displayed in the final Activity response.
Types () [] schema . Type
// Filter the DataSourceFilter of the worker(network, state, start logics, etc.).
Filter () DataSourceFilter
// Transform the core logic of the worker and returns the Activity.
Transform ( ctx context . Context , task Task ) ( * activityx . Activity , error )
}
The Name
, Platform
, Network
, Tags
, and Types
functions are simple implementations that return their respective values.
In the Filter
function, passing the contract address and event hash to the source.
Filter struct, when applicable, enables the Node to efficiently filter logs from specific contracts and events.
The Transform
function, which forms the core logic of the worker, is responsible for parsing events from logs, constructing actions, and assembling the final activity.
This section explores the Transform
function in detail in.
ethereumTask, ok := task.( * source . Task )
if ! ok {
return nil , fmt. Errorf ( "invalid task type %T " , task)
}
activity, err := ethereumTask. BuildActivity (activityx. WithActivityPlatform (w. Platform ()))
if err != nil {
return nil , fmt. Errorf ( "build activity: %w " , err)
}
The Transform
function verifies the task type for compatibility, then constructs the activity object using the provided task data.
This process ensures type safety and initializes the activity with the required platform information.
// Iterate through all logs in the transaction receipt
for _, log := range ethereumTask.Receipt.Logs {
if len (log.Topics) == 0 {
continue
}
switch {
case w. matchSwapLog (ethereumTask, log):
actions, err := w. transformSwapLog (ctx, ethereumTask, log)
if err != nil {
zap. L (). Warn ( "handle settlement trade log" , zap. Error (err), zap. String ( "worker" , w. Name ()), zap. String ( "task" , ethereumTask. ID ()))
continue
}
activity.Actions = append (activity.Actions, actions ... )
default :
zap. L (). Debug ( "unsupported log" , zap. String ( "worker" , w. Name ()), zap. String ( "task" , ethereumTask. ID ()), zap. Stringer ( "topic" , log.Topics[ 0 ]))
}
}
if len (activity.Actions) == 0 {
return nil , fmt. Errorf ( "no actions" )
}
// matchSwapLog checks if the given log matches a Zerion swap event.
func ( w * worker ) matchSwapLog ( _ * source . Task , log * ethereum . Log ) bool {
return contract. MatchEventHashes (log.Topics[ 0 ], zerion.EventHashExecuted) &&
contract. MatchAddresses (log.Address, zerion.AddressRouter)
}
It then iterates through each log entry in the transaction receipt, verifying whether each log matches the expected event hash.
Upon a successful match, it parses the event data and constructs the corresponding action using the extracted information.
// transformSwapLog transforms a Zerion swap log into a list of actions.
func ( w * worker ) transformSwapLog ( ctx context . Context , task * source . Task , log * ethereum . Log ) ([] * activityx . Action , error ) {
event, err := w.routerFilterer. ParseExecuted (log. Export ())
if err != nil {
return nil , fmt. Errorf ( "parse Executed event: %w " , err)
}
var actions [] * activityx . Action
// Handle protocol fee if present
if event.ProtocolFeeAmount. Sign () == 1 {
feeAction, err := w. buildTransactionTransferAction (ctx, task, event.Sender, zerion.AddressRouter, lo. Ternary (event.OutputToken == zerion.AddressNativeToken, nil , & event.OutputToken), event.ProtocolFeeAmount)
if err != nil {
return nil , fmt. Errorf ( "build transaction transfer action for fee: %w " , err)
}
actions = append (actions, feeAction)
}
// Build the main swap action
swapAction, err := w. buildExchangeSwapAction (ctx, task, event.Sender, event.Sender, event.InputToken, event.OutputToken, event.AbsoluteInputAmount, event.ReturnedAmount)
if err != nil {
return nil , fmt. Errorf ( "build exchange swap action: %w " , err)
}
actions = append (actions, swapAction)
return actions, nil
}
The event data from the log must be parsed to extract key details for the swap activity, including the sender’s address, token addresses involved, and transaction amounts.
This parsed data serves as the basis for constructing a detailed representation of the swap activity.
// buildExchangeSwapAction creates an ExchangeSwap action for a given swap.
func ( w * worker ) buildExchangeSwapAction ( ctx context . Context , task * source . Task , from , to , tokenIn , tokenOut common . Address , amountIn , amountOut * big . Int ) ( * activityx . Action , error ) {
tokenInAddress := lo. Ternary (tokenIn != zerion.AddressNativeToken, & tokenIn, nil )
tokenOutAddress := lo. Ternary (tokenOut != zerion.AddressNativeToken, & tokenOut, nil )
tokenInMetadata, err := w.tokenClient. Lookup (ctx, task.ChainID, tokenInAddress, nil , task.Header.Number)
if err != nil {
return nil , fmt. Errorf ( "lookup token in metadata: %w " , err)
}
tokenInMetadata.Value = lo. ToPtr (decimal. NewFromBigInt (amountIn, 0 ))
tokenOutMetadata, err := w.tokenClient. Lookup (ctx, task.ChainID, tokenOutAddress, nil , task.Header.Number)
if err != nil {
return nil , fmt. Errorf ( "lookup token out metadata: %w " , err)
}
tokenOutMetadata.Value = lo. ToPtr (decimal. NewFromBigInt (amountOut, 0 ))
return & activityx . Action {
Type: typex.ExchangeSwap,
Platform: w. Platform (),
From: from. String (),
To: to. String (),
Metadata: metadata . ExchangeSwap {
From: * tokenInMetadata,
To: * tokenOutMetadata,
},
}, nil
}
Following the construction of the necessary components, it proceeds to assemble and return the action, which serves as the foundation for the final activity.
This step is crucial in encapsulating the parsed event data into a structured format that accurately represents the swap operation within our system.
For a comprehensive understanding of the testing cases, please refer to the source code .
This resource provides detailed insights into the testing logic implemented.
Please note that a unit test is required for all new workers.
The following section outlines the fundamental structure of the test suite:
package zerion_test
import (
" context "
" math/big "
" testing "
" github.com/ethereum/go-ethereum/common "
" github.com/ethereum/go-ethereum/common/hexutil "
" github.com/rss3-network/node/config "
source " github.com/rss3-network/node/internal/engine/protocol/ethereum "
worker " github.com/rss3-network/node/internal/engine/worker/decentralized/contract/zerion "
" github.com/rss3-network/node/provider/ethereum "
" github.com/rss3-network/node/provider/ethereum/contract/zerion "
" github.com/rss3-network/node/provider/ethereum/endpoint "
workerx " github.com/rss3-network/node/schema/worker/decentralized "
activityx " github.com/rss3-network/protocol-go/schema/activity "
" github.com/rss3-network/protocol-go/schema/metadata "
" github.com/rss3-network/protocol-go/schema/network "
" github.com/rss3-network/protocol-go/schema/typex "
" github.com/samber/lo "
" github.com/shopspring/decimal "
" github.com/stretchr/testify/require "
)
func TestWorker_Zerion ( t * testing . T ) {
t. Parallel ()
type arguments struct {
task * source . Task
config * config . Module
}
testcases := [] struct {
name string
arguments arguments
want * activityx . Activity
wantError require . ErrorAssertionFunc
}{
{
name: "Swap MATIC and SAND on Polygon" ,
arguments: struct {
task * source . Task
config * config . Module
}{
task: & source . Task {},
config: & config . Module {
Network: network.Polygon,
Endpoint: config . Endpoint {
URL: endpoint. MustGet (network.Polygon),
},
},
},
want: & activityx . Activity {},
wantError: require.NoError,
},
}
for _, testcase := range testcases {
testcase := testcase
t. Run (testcase.name, func ( t * testing . T ) {
t. Parallel ()
ctx := context. Background ()
instance, err := worker. NewWorker (testcase.arguments.config)
require. NoError (t, err)
activity, err := instance. Transform (ctx, testcase.arguments.task)
testcase. wantError (t, err)
t. Log ( string (lo. Must (json. MarshalIndent (activity, "" , " \x20\x20 " ))))
require. Equal (t, testcase.want, activity)
})
}
}
To obtain mock transaction data for EVM worker testing (represented as task: &source.Task{}
in the test case), execute the following command:
go run ./tool/testcase/main.go --source=ethereum --endpoint=https://rpc.ankr.com/eth --activity=0x0000000000000000000000000000000000000000000000000000000000000000
func newNonCoreWorker ( config * config . Module , databaseClient database . Client , redisClient rueidis . Client ) ( engine . Worker , error ) {
switch config.Worker {
case decentralized.Zerion:
return zerion. NewWorker (config)
default :
return nil , fmt. Errorf ( "unsupported worker %s " , config.Worker)
}
}
Upon successful implementation of the worker, it is necessary to integrate it into the factory located in internal/engine/worker/decentralized/factory.go
.
This integration ensures the worker's availability within the Node ecosystem.
To enable operator access to the new worker, it must be added to the node configurations in internal/node/component/info/network_config.go
.
This process involves updating two critical areas:
NetworkToWorkersMap
var NetworkToWorkersMap = map [ network . Network ][] worker . Worker {
network.Arbitrum: {
// ...workers
decentralized.Zerion,
},
network.Arweave: { // ...workers},
network.Avalanche: {
// ...workers
decentralized.Zerion,
},
network.Base: {
// ...workers
decentralized.Zerion,
},
// ...other networks
}
WorkerToConfigMap
// WorkerToConfigMap is a map of worker to config.
var WorkerToConfigMap = map [ network . Protocol ] map [ worker . Worker ] workerConfig {
network.ActivityPubProtocol: { // ...configs},
network.ArweaveProtocol: { // ...configs},
network.EthereumProtocol: {
// ...configs
decentralized.Zerion: defaultWorkerConfig (decentralized.Zerion, network.EthereumProtocol, nil ),
},
network.FarcasterProtocol: { // ...configs},
network.NearProtocol: { // ...configs},
network.RSSProtocol: { // ...configs},
}
To facilitate local testing, execute make service_up
to initialize and launch essential services such as PostgreSQL, Redis, and other dependencies.
If necessary, you can adjust the port configurations in the ./devcontainer
directory.
To terminate all services, use the make service_down
command.
Configure the config.yaml
file in accordance with the specifications outlined in the documentation .
To incorporate a new worker, add an entry to the components section of the configuration file.
For example:
components :
decentralized :
- id : optimism-zerion
network : optimism
endpoint : https://rpc.ankr.com/optimism
worker : zerion
To initiate the Node with the newly integrated worker, execute the following command:
ENVIRONMENT = development go run ./cmd --worker.id=optimism-zerion
Verify the data in database to confirm the presence of activities generated by the newly implemented worker.
This step is crucial for ensuring the proper functionality and integration of your worker within the system.
To evaluate the functionality of your newly implemented worker through the API, follow these steps:
Initiate the API server by executing the following command:
ENVIRONMENT = development go run ./cmd --module=core
Once the server is running, you can use the API to retrieve activity data, enabling you to verify that the worker is properly generating and storing activities.
You can query the activity data by making an API request:
curl --location 'http://localhost:80/decentralized/platform/Zerion' \
--header 'Authorization: Bearer test'
This tutorial has offered a detailed guide to contributing a new worker to the RSS3 Node.
By following these steps, you have learned how to extend the Node’s capabilities and integrate new Open Information applications.
For a more in-depth understanding of the implementation details, refer to the following Pull Request: feat(worker/zerion): add zerion worker #564 .
This PR provides a practical example, illustrating the application of the concepts covered in this tutorial.
We welcome any contributions and look forward to seeing your next Pull Request!
If you have any questions or require further clarification, join our Discord .