A worker is the fundamental unit of a Node, encompassing the logic to index, structure, and store Open Information.
While many workers have already been implemented (see supported workers), you can extend the Node's capabilities by contributing new ones.
RSS3 Node is open-source and conforms to the RSS3 Protocol.
You can find the Node's source code here.
Node: The RSS3 Data Sublayer (DSL) component that is responsible for indexing, transforming, storing, and ultimately serving the Open Information to the end users.
Protocol-Go is a Go implementation of the RSS3 Protocol.
Popular networks like Ethereum, Arweave, and Near are already supported, along with various structures and schemas.
If your contribution does not involve adding a new network or defining a new structure, you can simply use the latest Protocol-Go version in Node (go get github.com/rss3-network/protocol-go).
The repository implements the core logic for indexing, transforming, storing, and serving Open Information to end users.
It includes the following main components:
- Engine
  - Protocol: The indexing logic for specific networks.
  - Worker: The transforming logic for specific dApps (the focus of this tutorial).
- Indexer: Handles data indexing, transforming, and storing.
- Core API: Serves Node APIs, including decentralized, federated, and RSS data querying logic, as well as Node information APIs for operators.
- Broadcaster: Registers nodes to Global Indexers (GIs). Learn more here.
- Monitor: Monitors data indexing progress and prunes outdated data.
Don't worry if this seems complex!
The worker component is designed as a standalone module within the engine.
You can integrate and test a new worker by modifying just a few files related to data transformation, without dealing with data indexing or storage logic.
For GoLand users, dependencies are automatically managed upon launching.
Users of other Integrated Development Environments (IDEs) may need to install dependencies manually via the following command:
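For a typical Go module, this is usually the standard dependency download (assuming the Go toolchain is installed):

```bash
go mod download
```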
Next, register the worker's tags in ToTagsMap:

```go
// ToTagsMap is a map of worker to tags
var ToTagsMap = map[Worker][]tag.Tag{
	Aave: {tag.Exchange},
	// ...other tag maps
	Zerion: {tag.Exchange, tag.Transaction},
}
```
Finally, map the worker to its corresponding platform in ToPlatformMap:
```go
// ToPlatformMap is a map of worker to platform
var ToPlatformMap = map[Worker]Platform{
	Aave: PlatformAAVE,
	// ...other platform maps
	Zerion: PlatformZerion,
}
```
Before integrating a new dApp, it is essential to fully understand its functionality.
We highly recommend carefully reviewing the project’s documentation and smart contracts to identify key elements, including contract addresses, ABIs, and critical event hashes.
For instance, Zerion's Router contract ABI, transactions, and event logs can be examined at Optimism Scan.
Next, set up all contract addresses, event hashes, and the Go SDK for the target contract in the provider/ethereum/contract directory.
A typical Ethereum contract directory structure is as follows:
```
.
├── contract.go
├── Router.abi
├── router.go
```
contract.go sets the contract address and event hash:
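The following is only a rough sketch of what contract.go might contain; the address and hash values are placeholders to be replaced with the real values from the Zerion Router contract:

```go
package zerion

import "github.com/ethereum/go-ethereum/common"

var (
	// AddressRouter is the Zerion Router contract address on Optimism
	// (placeholder; copy the real address from the block explorer).
	AddressRouter = common.HexToAddress("0x0000000000000000000000000000000000000000")

	// AddressNativeToken is the sentinel address the Router uses to represent
	// the native token (confirm the exact value against the contract).
	AddressNativeToken = common.HexToAddress("0xEeeeeEeeeEeEeeEeEeEeeEEEeeeeEeeeeeeeEEeE")

	// EventHashExecuted is the hash of the Router's Executed event,
	// computed from the event signature defined in Router.abi.
	EventHashExecuted = common.HexToHash("0x0000000000000000000000000000000000000000000000000000000000000000")
)
```

router.go is the Go SDK generated from Router.abi (for example with go-ethereum's abigen); it provides bindings such as the ParseExecuted method used later in the Transform logic.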
The Worker interface, defined in internal/engine/worker/worker.go, serves as a blueprint for all other workers.
This interface enforces consistency and standardization across all worker implementations.
Each worker must comply with the interface, implementing its methods to maintain a uniform structure, facilitating development and maintenance.
```go
type Worker interface {
	// Name is the name of the worker.
	Name() string

	// Platform returns the display name of the worker as the `platform` in the final Activity response.
	Platform() string

	// Network returns all networks where the worker runs on and displayed as the `network` in the final Activity response.
	Network() []network.Network

	// Tags the possible `tag` of the worker, displayed in the final Activity response.
	Tags() []tag.Tag

	// Types the possible `type` of the worker, displayed in the final Activity response.
	Types() []schema.Type

	// Filter the DataSourceFilter of the worker(network, state, start logics, etc.).
	Filter() DataSourceFilter

	// Transform the core logic of the worker and returns the Activity.
	Transform(ctx context.Context, task Task) (*activityx.Activity, error)
}
```
The Name, Platform, Network, Tags, and Types functions are simple implementations that return their respective values.
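As an illustration, the Zerion worker's simple getters might look like the following; the enum and package names (decentralized.Zerion, typex.TransactionTransfer, etc.) are assumptions based on the registrations above:

```go
// Name returns the worker's identifier.
func (w *worker) Name() string {
	return decentralized.Zerion.String()
}

// Platform returns the display name used as `platform` in the Activity response.
func (w *worker) Platform() string {
	return decentralized.PlatformZerion.String()
}

// Network lists the networks this worker runs on.
func (w *worker) Network() []network.Network {
	return []network.Network{network.Optimism}
}

// Tags mirrors the ToTagsMap registration.
func (w *worker) Tags() []tag.Tag {
	return []tag.Tag{tag.Exchange, tag.Transaction}
}

// Types lists the activity types this worker can produce.
func (w *worker) Types() []schema.Type {
	return []schema.Type{typex.ExchangeSwap, typex.TransactionTransfer}
}
```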
In the Filter function, passing the contract address and event hash to the source.Filter struct, when applicable, enables the Node to efficiently filter logs from specific contracts and events.
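For the Zerion worker, this looks roughly like the sketch below; the source.Filter field names follow the pattern used by other Ethereum workers and may differ in detail:

```go
// Filter narrows indexing to logs emitted by the Zerion Router with the Executed topic.
func (w *worker) Filter() engine.DataSourceFilter {
	return &source.Filter{
		LogAddresses: []common.Address{zerion.AddressRouter},
		LogTopics:    []common.Hash{zerion.EventHashExecuted},
	}
}
```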
The Transform function, which forms the core logic of the worker, is responsible for parsing events from logs, constructing actions, and assembling the final activity.
It first verifies that the task type is compatible, then constructs the activity object from the provided task data. This ensures type safety and initializes the activity with the required platform information.
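A minimal sketch of that opening step is shown below; helper names such as BuildActivity and WithActivityPlatform follow the pattern used by other Ethereum workers and are assumptions here:

```go
// Transform parses the task's logs and assembles the final Activity.
func (w *worker) Transform(ctx context.Context, task engine.Task) (*activityx.Activity, error) {
	// Ensure the task is an Ethereum task before accessing chain-specific fields.
	ethereumTask, ok := task.(*source.Task)
	if !ok {
		return nil, fmt.Errorf("invalid task type: %T", task)
	}

	// Build the base activity from the task and attach the worker's platform.
	activity, err := ethereumTask.BuildActivity(activityx.WithActivityPlatform(w.Platform()))
	if err != nil {
		return nil, fmt.Errorf("build activity: %w", err)
	}

	// ...iterate over the receipt logs and append actions (see the snippet below)...

	return activity, nil
}
```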
```go
// Iterate through all logs in the transaction receipt
for _, log := range ethereumTask.Receipt.Logs {
	if len(log.Topics) == 0 {
		continue
	}

	switch {
	case w.matchSwapLog(ethereumTask, log):
		actions, err := w.transformSwapLog(ctx, ethereumTask, log)
		if err != nil {
			zap.L().Warn("handle settlement trade log", zap.Error(err), zap.String("worker", w.Name()), zap.String("task", ethereumTask.ID()))

			continue
		}

		activity.Actions = append(activity.Actions, actions...)
	default:
		zap.L().Debug("unsupported log", zap.String("worker", w.Name()), zap.String("task", ethereumTask.ID()), zap.Stringer("topic", log.Topics[0]))
	}
}

if len(activity.Actions) == 0 {
	return nil, fmt.Errorf("no actions")
}
```
```go
// matchSwapLog checks if the given log matches a Zerion swap event.
func (w *worker) matchSwapLog(_ *source.Task, log *ethereum.Log) bool {
	return contract.MatchEventHashes(log.Topics[0], zerion.EventHashExecuted) &&
		contract.MatchAddresses(log.Address, zerion.AddressRouter)
}
```
It then iterates through each log entry in the transaction receipt, verifying whether each log matches the expected event hash.
Upon a successful match, it parses the event data and constructs the corresponding action using the extracted information.
```go
// transformSwapLog transforms a Zerion swap log into a list of actions.
func (w *worker) transformSwapLog(ctx context.Context, task *source.Task, log *ethereum.Log) ([]*activityx.Action, error) {
	event, err := w.routerFilterer.ParseExecuted(log.Export())
	if err != nil {
		return nil, fmt.Errorf("parse Executed event: %w", err)
	}

	var actions []*activityx.Action

	// Handle protocol fee if present
	if event.ProtocolFeeAmount.Sign() == 1 {
		feeAction, err := w.buildTransactionTransferAction(ctx, task, event.Sender, zerion.AddressRouter, lo.Ternary(event.OutputToken == zerion.AddressNativeToken, nil, &event.OutputToken), event.ProtocolFeeAmount)
		if err != nil {
			return nil, fmt.Errorf("build transaction transfer action for fee: %w", err)
		}

		actions = append(actions, feeAction)
	}

	// Build the main swap action
	swapAction, err := w.buildExchangeSwapAction(ctx, task, event.Sender, event.Sender, event.InputToken, event.OutputToken, event.AbsoluteInputAmount, event.ReturnedAmount)
	if err != nil {
		return nil, fmt.Errorf("build exchange swap action: %w", err)
	}

	actions = append(actions, swapAction)

	return actions, nil
}
```
The event data from the log must be parsed to extract key details for the swap activity, including the sender’s address, token addresses involved, and transaction amounts.
This parsed data serves as the basis for constructing a detailed representation of the swap activity.
```go
// buildExchangeSwapAction creates an ExchangeSwap action for a given swap.
func (w *worker) buildExchangeSwapAction(ctx context.Context, task *source.Task, from, to, tokenIn, tokenOut common.Address, amountIn, amountOut *big.Int) (*activityx.Action, error) {
	tokenInAddress := lo.Ternary(tokenIn != zerion.AddressNativeToken, &tokenIn, nil)
	tokenOutAddress := lo.Ternary(tokenOut != zerion.AddressNativeToken, &tokenOut, nil)

	tokenInMetadata, err := w.tokenClient.Lookup(ctx, task.ChainID, tokenInAddress, nil, task.Header.Number)
	if err != nil {
		return nil, fmt.Errorf("lookup token in metadata: %w", err)
	}

	tokenInMetadata.Value = lo.ToPtr(decimal.NewFromBigInt(amountIn, 0))

	tokenOutMetadata, err := w.tokenClient.Lookup(ctx, task.ChainID, tokenOutAddress, nil, task.Header.Number)
	if err != nil {
		return nil, fmt.Errorf("lookup token out metadata: %w", err)
	}

	tokenOutMetadata.Value = lo.ToPtr(decimal.NewFromBigInt(amountOut, 0))

	return &activityx.Action{
		Type:     typex.ExchangeSwap,
		Platform: w.Platform(),
		From:     from.String(),
		To:       to.String(),
		Metadata: metadata.ExchangeSwap{
			From: *tokenInMetadata,
			To:   *tokenOutMetadata,
		},
	}, nil
}
```
After constructing these components, the worker assembles and returns the actions, which form the foundation of the final activity. This step encapsulates the parsed event data in a structured format that accurately represents the swap operation within the system.
For a comprehensive understanding of the test cases and the testing logic they implement, please refer to the source code.
Please note that a unit test is required for all new workers.
The following section outlines the fundamental structure of the test suite:
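The sketch below only illustrates the typical table-driven layout; the constructor signature, config fields, and helper packages are assumptions, so follow the existing worker tests in the repository:

```go
func TestWorker_Ethereum(t *testing.T) {
	t.Parallel()

	type arguments struct {
		task   *source.Task
		config *config.Module
	}

	testcases := []struct {
		name      string
		arguments arguments
		want      *activityx.Activity
		wantError require.ErrorAssertionFunc
	}{
		{
			name: "Swap tokens via the Zerion Router",
			arguments: arguments{
				// Mock transaction data generated with the testcase tool shown below.
				task:   &source.Task{},
				config: &config.Module{Network: network.Optimism},
			},
			want:      &activityx.Activity{ /* expected activity fields */ },
			wantError: require.NoError,
		},
	}

	for _, testcase := range testcases {
		testcase := testcase

		t.Run(testcase.name, func(t *testing.T) {
			t.Parallel()

			// Instantiate the worker and run Transform against the mock task.
			instance, err := worker.NewWorker(testcase.arguments.config)
			require.NoError(t, err)

			activity, err := instance.Transform(context.Background(), testcase.arguments.task)
			testcase.wantError(t, err)
			require.Equal(t, testcase.want, activity)
		})
	}
}
```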
To obtain mock transaction data for EVM worker testing (represented as task: &source.Task{} in the test case), execute the following command:
```bash
go run ./tool/testcase/main.go --source=ethereum --endpoint=https://rpc.ankr.com/eth --activity=0x0000000000000000000000000000000000000000000000000000000000000000
```
Upon successful implementation of the worker, it is necessary to integrate it into the factory located in internal/engine/worker/decentralized/factory.go.
This integration ensures the worker's availability within the Node ecosystem.
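A hedged sketch of this registration is shown below; the factory's actual constructor takes additional dependencies (database client, Ethereum client, etc.), so mirror the neighbouring entries in factory.go:

```go
// New creates a worker instance for the given module configuration.
func New(config *config.Module) (engine.Worker, error) {
	switch config.Worker {
	// ...existing workers...
	case decentralized.Zerion:
		return zerion.NewWorker(config)
	default:
		return nil, fmt.Errorf("unsupported worker: %s", config.Worker)
	}
}
```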
To enable operator access to the new worker, it must be added to the node configurations in internal/node/component/info/network_config.go.
This process involves updating two critical areas:
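The map names below are hypothetical and only illustrate the pattern; mirror the actual structures defined in network_config.go:

```go
// Area 1 (illustrative): list the new worker under every network it supports.
var NetworkToWorkersMap = map[network.Network][]worker.Worker{
	network.Optimism: {
		// ...existing workers...
		decentralized.Zerion,
	},
}

// Area 2 (illustrative): expose the worker's default configuration to operators.
var WorkerToConfigMap = map[worker.Worker]workerConfig{
	// ...existing workers...
	decentralized.Zerion: {},
}
```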
To facilitate local testing, execute make service_up to initialize and launch essential services such as PostgreSQL, Redis, and other dependencies.
If necessary, you can adjust the port configurations in the ./devcontainer directory.
To terminate all services, use the make service_down command.
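For quick reference:

```bash
# Start PostgreSQL, Redis, and other local dependencies
make service_up

# Tear everything down when you are finished
make service_down
```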
To initiate the Node with the newly integrated worker, execute the following command:
```bash
ENVIRONMENT=development go run ./cmd --worker.id=optimism-zerion
```
Verify the data in the database to confirm the presence of activities generated by the newly implemented worker.
This step is crucial for ensuring the proper functionality and integration of your worker within the system.
To evaluate the functionality of your newly implemented worker through the API, follow these steps:
Initiate the API server by executing the following command:
```bash
ENVIRONMENT=development go run ./cmd --module=core
```
Once the server is running, you can use the API to retrieve activity data, enabling you to verify that the worker is properly generating and storing activities.
You can query the activity data by making an API request:
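For example, assuming the Core API is listening on its default local port, a request along these lines returns the activities of an account (the route and port here are assumptions; consult the API reference for the exact endpoint):

```bash
# Replace the address with an account involved in an activity your worker indexed
curl "http://localhost:8080/decentralized/0xYourAccountAddress?limit=10"
```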
This tutorial has offered a detailed guide to contributing a new worker to the RSS3 Node.
By following these steps, you have learned how to extend the Node’s capabilities and integrate new Open Information applications.
For a more in-depth understanding of the implementation details, refer to the following Pull Request: feat(worker/zerion): add zerion worker #564.
This PR provides a practical example, illustrating the application of the concepts covered in this tutorial.
We welcome any contributions and look forward to seeing your next Pull Request!
If you have any questions or require further clarification, join our Discord.