Firehose
Firehose is a server that bridges WebSocket clients with libp2p Pubsub,
enabling browsers and Node.js clients to participate in a P2P network.
Import
Section titled “Import”import { Firehose } from "@aikyo/firehose";Constructor
Section titled “Constructor”constructor(port: number, libp2pConfig?: Libp2pOptions<Services>)Parameters
Section titled “Parameters”| Parameter | Type | Description |
|---|---|---|
port | number | Port number for the WebSocket server |
libp2pConfig | Libp2pOptions<Services> | Custom libp2p config |
Usage Example
Section titled “Usage Example”import { Firehose } from "@aikyo/firehose";
const firehose = new Firehose(8080);await firehose.start();
// Subscribe to each topicawait firehose.subscribe("messages", (data) => { firehose.broadcastToClients(data);});
await firehose.subscribe("queries", (data) => { firehose.broadcastToClients(data);});
await firehose.subscribe("actions", (data) => { firehose.broadcastToClients(data);});Client Example:
import WebSocket from 'ws';import { randomUUID } from "node:crypto";
const firehoseUrl = 'ws://localhost:8080';const ws = new WebSocket(firehoseUrl);
const companionId = 'companion_aya'; // Specify Aya's IDconst userId = 'user_yamada'; // Specify the user's name
ws.on('open', () => { const message = { topic: "messages", body: { jsonrpc: '2.0', method: 'message.send', params: { id: randomUUID(), from: userId, to: [companionId], message: 'Hello Aya!', } } };
ws.send(JSON.stringify(message));});
ws.on('message', (data) => { console.log(JSON.parse(data.toString()));});For using custom libp2p configuration, you must provide complete settings.
import { Firehose } from "@aikyo/firehose";import { gossipsub } from "@chainsafe/libp2p-gossipsub";import { noise } from "@chainsafe/libp2p-noise";import { yamux } from "@chainsafe/libp2p-yamux";import { identify } from "@libp2p/identify";import { mdns } from "@libp2p/mdns";import { tcp } from "@libp2p/tcp";
const customFirehose = new Firehose(8080, { addresses: { listen: ["/ip4/0.0.0.0/tcp/9000"] }, transports: [tcp()], peerDiscovery: [mdns()], connectionEncrypters: [noise()], streamMuxers: [yamux()], services: { pubsub: gossipsub({ allowPublishToZeroTopicPeers: true }), identify: identify(), },});await customFirehose.start();Properties
Section titled “Properties”libp2p
Section titled “libp2p”private libp2p: Libp2p<Services>Instance of the libp2p node (configured similarly to CompanionServer).
this.libp2p = await createLibp2p({ addresses: { listen: ["/ip4/0.0.0.0/tcp/0"], }, transports: [tcp()], peerDiscovery: [mdns()], connectionEncrypters: [noise()], streamMuxers: [yamux()], services: { pubsub: gossipsub({ allowPublishToZeroTopicPeers: true }), identify: identify(), },});private wss: WebSocketServerInstance of the WebSocket server.
this.wss = new WebSocketServer({ port: this.port });clients
Section titled “clients”private clients: Set<WebSocket>Set managing connected WebSocket clients.
this.clients = new Set();New clients are added when they connect and removed when disconnecting.
private readonly port: numberPort number for the WebSocket server.
topicHandlers
Section titled “topicHandlers”private topicHandlers: { [K in keyof TopicPayloads]: ((data: TopicPayloads[K]) => void)[];}Object managing handler functions for each topic, providing type-safe event handling.
type TopicPayloads = { messages: Message; queries: Query | QueryResult; actions: Action; states: State;};Multiple handlers can be registered for each topic and will be executed sequentially when messages are received.
libp2pConfig
Section titled “libp2pConfig”private libp2pConfig?: Libp2pOptions<Services>Optional. Custom configuration for the libp2p node. If not specified, default settings will be used.
Methods
Section titled “Methods”receiveHandler
Section titled “receiveHandler”private receiveHandler?: ReceiveHandlerThe handler function that processes data received from the WebSocket client.
Type Definition:
const RequestSchema = z.object({ topic: z.string(), body: z.record(z.any()) });type RequestData = z.infer<typeof RequestSchema>;type ReceiveHandler = ( data: Record<string, unknown>,) => RequestData | Promise<RequestData>;Configured via the setReceiveHandler() method. When a handler is set, all
incoming data from WebSockets will pass through this handler, and the returned
RequestData will be published to libp2p pubsub.
If no handler is configured, the received data will be parsed according to the
RequestSchema and published directly (default behavior).
start()
Section titled “start()”Starts the Firehose server.
async start(): Promise<void>Process Flow:
- Initializes the libp2p node
- Starts the WebSocket server
- Registers event listeners
Example Output:
aikyo firehose server running on ws://localhost:8080subscribe()
Section titled “subscribe()”Subscribes to specified topics and optionally registers handlers.
async subscribe<K extends keyof TopicPayloads>( topic: K, handler?: (data: TopicPayloads[K]) => void): Promise<void>Parameters:
| Parameter | Type | Description |
|---|---|---|
topic | keyof TopicPayloads | Topic to subscribe to |
handler | function | Handler for messages |
Topic Details:
topic: One of “messages”, “queries”, “actions”, or “states”handler: Optional. Executed when messages are received
Usage Example:
// Subscribe to a topic onlyawait firehose.subscribe("messages");
// Subscribe with a handlerawait firehose.subscribe("messages", (data) => { console.log("Message received:", data); firehose.broadcastToClients(data);});addHandler()
Section titled “addHandler()”Adds a handler to an existing topic.
addHandler<K extends keyof TopicPayloads>( topic: K, handler: (data: TopicPayloads[K]) => void): voidParameters:
| Parameter | Type | Description |
|---|---|---|
topic | keyof TopicPayloads | Topic name |
handler | function | Handler for messages |
Usage Example:
firehose.addHandler("actions", (action) => { console.log("Action received:", action);});setReceiveHandler()
Section titled “setReceiveHandler()”Configure a handler to process data received from WebSocket clients.
setReceiveHandler(handler: ReceiveHandler): voidParameters:
| Parameter | Type | Description |
|---|---|---|
handler | ReceiveHandler | Transforms WebSocket data |
Handler Type:
(data: Record<string, unknown>) => RequestData | Promise<RequestData>- Transforms arbitrary WebSocket payloads into
RequestData
Type Definitions:
const RequestSchema = z.object({ topic: z.string(), body: z.record(z.any()) });type RequestData = z.infer<typeof RequestSchema>;Usage Example:
// Register a custom data-processing handlerfirehose.setReceiveHandler(async (rawData) => { // Perform custom validation or transformation const validated = await validateAndTransform(rawData);
return { topic: "messages", body: { jsonrpc: "2.0", method: "message.send", params: validated } };});
// When a handler is set, the upstream payload can be any shapews.send(JSON.stringify({ customField: "value", anotherField: 123}));Behavior:
- When configured, every inbound WebSocket payload is routed through the handler.
- The handler’s return value (
RequestData) is published to libp2p pubsub. - Without a handler, the payload is parsed via
RequestSchemaand published as-is.
broadcastToClients()
Section titled “broadcastToClients()”Broadcasts data to all connected WebSocket clients.
broadcastToClients(data: unknown): voidParameters:
| Parameter | Type | Description |
|---|---|---|
data | unknown | Data to broadcast (will be JSON.stringified) |