Firehose

Firehose is a server that bridges WebSocket clients with libp2p Pubsub, enabling browsers and Node.js clients to participate in a P2P network.

import { Firehose } from "@aikyo/firehose";

constructor(port: number, libp2pConfig?: Libp2pOptions<Services>)

| Parameter | Type | Description |
| --- | --- | --- |
| port | number | Port number for the WebSocket server |
| libp2pConfig | Libp2pOptions<Services> | Optional. Custom libp2p configuration |

import { Firehose } from "@aikyo/firehose";
const firehose = new Firehose(8080);
await firehose.start();
// Subscribe to each topic
await firehose.subscribe("messages", (data) => {
firehose.broadcastToClients(data);
});
await firehose.subscribe("queries", (data) => {
firehose.broadcastToClients(data);
});
await firehose.subscribe("actions", (data) => {
firehose.broadcastToClients(data);
});

Client Example:

import WebSocket from 'ws';
import { randomUUID } from "node:crypto";
const firehoseUrl = 'ws://localhost:8080';
const ws = new WebSocket(firehoseUrl);
const companionId = 'companion_aya'; // Specify Aya's ID
const userId = 'user_yamada'; // Specify the user's name
ws.on('open', () => {
const message = {
topic: "messages",
body: {
jsonrpc: '2.0',
method: 'message.send',
params: {
id: randomUUID(),
from: userId,
to: [companionId],
message: 'Hello Aya!',
}
}
};
ws.send(JSON.stringify(message));
});
ws.on('message', (data) => {
console.log(JSON.parse(data.toString()));
});
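
The envelope sent in the client example above (a `topic` plus a JSON-RPC 2.0 `body`) can be factored into a small helper. The following is a minimal sketch of that idea: `buildMessageRequest`, `MessageParams`, and `MessageRequest` are hypothetical names used for illustration and are not part of @aikyo/firehose.

```typescript
// Hypothetical helper, NOT part of @aikyo/firehose.
// It builds the same { topic, body } envelope used in the client example.
interface MessageParams {
  id: string; // unique message ID, e.g. from randomUUID()
  from: string; // sender ID
  to: string[]; // recipient companion IDs
  message: string; // message text
}

interface MessageRequest {
  topic: "messages";
  body: {
    jsonrpc: "2.0";
    method: "message.send";
    params: MessageParams;
  };
}

function buildMessageRequest(params: MessageParams): MessageRequest {
  return {
    topic: "messages",
    body: { jsonrpc: "2.0", method: "message.send", params },
  };
}

// Serialize the envelope the same way the client example does before ws.send(...)
const payload = JSON.stringify(
  buildMessageRequest({
    id: "00000000-0000-4000-8000-000000000000", // placeholder; use randomUUID() in practice
    from: "user_yamada",
    to: ["companion_aya"],
    message: "Hello Aya!",
  }),
);
```

The resulting string can be passed to `ws.send(payload)` inside the `open` handler.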

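To use a custom libp2p configuration, you must provide the complete settings (addresses, transports, peer discovery, connection encrypters, stream muxers, and services), as in the following example.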

import { Firehose } from "@aikyo/firehose";
import { gossipsub } from "@chainsafe/libp2p-gossipsub";
import { noise } from "@chainsafe/libp2p-noise";
import { yamux } from "@chainsafe/libp2p-yamux";
import { identify } from "@libp2p/identify";
import { mdns } from "@libp2p/mdns";
import { tcp } from "@libp2p/tcp";
const customFirehose = new Firehose(8080, {
addresses: { listen: ["/ip4/0.0.0.0/tcp/9000"] },
transports: [tcp()],
peerDiscovery: [mdns()],
connectionEncrypters: [noise()],
streamMuxers: [yamux()],
services: {
pubsub: gossipsub({ allowPublishToZeroTopicPeers: true }),
identify: identify(),
},
});
await customFirehose.start();

private libp2p: Libp2p<Services>

Instance of the libp2p node (configured similarly to CompanionServer).

this.libp2p = await createLibp2p({
addresses: {
listen: ["/ip4/0.0.0.0/tcp/0"],
},
transports: [tcp()],
peerDiscovery: [mdns()],
connectionEncrypters: [noise()],
streamMuxers: [yamux()],
services: {
pubsub: gossipsub({ allowPublishToZeroTopicPeers: true }),
identify: identify(),
},
});

private wss: WebSocketServer

Instance of the WebSocket server.

this.wss = new WebSocketServer({ port: this.port });

private clients: Set<WebSocket>

Set managing connected WebSocket clients.

this.clients = new Set();

Clients are added to the set when they connect and removed when they disconnect.
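
The add-on-connect, remove-on-disconnect bookkeeping can be sketched as follows. This is an illustration only, not the Firehose source: `ClientRegistry` and `ClosableSocket` are hypothetical names, and `ClosableSocket` is a minimal stand-in for the `ws` WebSocket type so the example has no dependencies.

```typescript
// Minimal stand-in for a WebSocket connection (assumption: only "close" matters here).
interface ClosableSocket {
  on(event: "close", listener: () => void): void;
}

// Hypothetical registry that mirrors the behavior described for `clients`.
class ClientRegistry<S extends ClosableSocket> {
  private readonly clients = new Set<S>();

  // Intended to be called from the server's "connection" event.
  track(socket: S): void {
    this.clients.add(socket);
    // Drop the client as soon as its connection closes.
    socket.on("close", () => {
      this.clients.delete(socket);
    });
  }

  // Number of clients currently tracked.
  get size(): number {
    return this.clients.size;
  }
}
```

Using a `Set` keeps both insertion and removal cheap and guarantees that the same connection is never stored twice.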

private readonly port: number

Port number for the WebSocket server.

private topicHandlers: {
[K in keyof TopicPayloads]: ((data: TopicPayloads[K]) => void)[];
}

Object managing handler functions for each topic, providing type-safe event handling.

type TopicPayloads = {
messages: Message;
queries: Query | QueryResult;
actions: Action;
states: State;
};

Multiple handlers can be registered for each topic and will be executed sequentially when messages are received.
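
To illustrate how a mapped type keeps per-topic handlers type-safe while still allowing several handlers per topic, here is a minimal sketch. It is not the Firehose implementation: `HandlerRegistry` is a hypothetical class, and the two payload shapes are simplified placeholders for the real `Message` and `Action` types.

```typescript
// Placeholder payloads (assumption); the real types are Message, Action, etc.
type TopicPayloads = {
  messages: { text: string };
  actions: { name: string };
};

// Same shape as the topicHandlers property: one handler array per topic.
type TopicHandlers = {
  [K in keyof TopicPayloads]: ((data: TopicPayloads[K]) => void)[];
};

// Hypothetical registry used only for illustration.
class HandlerRegistry {
  private handlers: TopicHandlers = { messages: [], actions: [] };

  // Append a handler; the compiler ties the handler's argument to the topic.
  add<K extends keyof TopicPayloads>(
    topic: K,
    handler: (data: TopicPayloads[K]) => void,
  ): void {
    this.handlers[topic].push(handler);
  }

  // Run every handler for the topic, in registration order.
  dispatch<K extends keyof TopicPayloads>(topic: K, data: TopicPayloads[K]): void {
    this.handlers[topic].forEach((handler) => handler(data));
  }
}

const registry = new HandlerRegistry();
const calls: string[] = [];
registry.add("messages", (m) => calls.push(`first:${m.text}`));
registry.add("messages", (m) => calls.push(`second:${m.text}`));
registry.dispatch("messages", { text: "hi" });
```

With this shape, registering a handler for "messages" that reads `m.name` would fail to compile, because `name` exists only on the "actions" payload.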

private libp2pConfig?: Libp2pOptions<Services>

Optional. Custom configuration for the libp2p node. If not specified, default settings will be used.

private receiveHandler?: ReceiveHandler

The handler function that processes data received from the WebSocket client.

Type Definition:

const RequestSchema = z.object({ topic: z.string(), body: z.record(z.any()) });
type RequestData = z.infer<typeof RequestSchema>;
type ReceiveHandler = (
data: Record<string, unknown>,
) => RequestData | Promise<RequestData>;

Configured via the setReceiveHandler() method. When a handler is set, all incoming data from WebSockets will pass through this handler, and the returned RequestData will be published to libp2p pubsub.

If no handler is configured, the received data will be parsed according to the RequestSchema and published directly (default behavior).
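
The two paths described above (custom handler versus default parsing) can be sketched as a single function. This is a simplified illustration under stated assumptions, not the Firehose source: `resolveRequest` and `parseRequest` are hypothetical names, and `parseRequest` is a hand-written stand-in for `RequestSchema.parse()` so the example runs without zod. Whether Firehose re-validates a custom handler's return value is not stated in this document, so the sketch does not.

```typescript
type RequestData = { topic: string; body: Record<string, unknown> };
type ReceiveHandler = (
  data: Record<string, unknown>,
) => RequestData | Promise<RequestData>;

// Stand-in for RequestSchema.parse() (assumption: the real code uses zod).
function parseRequest(data: Record<string, unknown>): RequestData {
  const { topic, body } = data;
  if (
    typeof topic !== "string" ||
    typeof body !== "object" ||
    body === null ||
    Array.isArray(body)
  ) {
    throw new Error("Invalid request: expected { topic: string, body: object }");
  }
  return { topic, body: body as Record<string, unknown> };
}

// Decide what to publish for one inbound WebSocket payload.
async function resolveRequest(
  raw: string,
  receiveHandler?: ReceiveHandler,
): Promise<RequestData> {
  const data = JSON.parse(raw) as Record<string, unknown>;
  // With a handler: let it transform the payload. Without: validate and pass through.
  return receiveHandler ? await receiveHandler(data) : parseRequest(data);
}
```

The returned `RequestData` is what would then be published to the pubsub topic named in `topic`.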

Starts the Firehose server.

async start(): Promise<void>

Process Flow:

  1. Initializes the libp2p node
  2. Starts the WebSocket server
  3. Registers event listeners

Example Output:

aikyo firehose server running on ws://localhost:8080

Subscribes to the specified topic and optionally registers a handler for it.

async subscribe<K extends keyof TopicPayloads>(
topic: K,
handler?: (data: TopicPayloads[K]) => void
): Promise<void>

Parameters:

| Parameter | Type | Description |
| --- | --- | --- |
| topic | keyof TopicPayloads | Topic to subscribe to |
| handler | function | Handler for messages |

Topic Details:

  • topic: One of "messages", "queries", "actions", or "states" (the keys of TopicPayloads)
  • handler: Optional. Executed each time a message is received on the topic

Usage Example:

// Subscribe to a topic only
await firehose.subscribe("messages");
// Subscribe with a handler
await firehose.subscribe("messages", (data) => {
console.log("Message received:", data);
firehose.broadcastToClients(data);
});

Adds a handler to an existing topic.

addHandler<K extends keyof TopicPayloads>(
topic: K,
handler: (data: TopicPayloads[K]) => void
): void

Parameters:

| Parameter | Type | Description |
| --- | --- | --- |
| topic | keyof TopicPayloads | Topic name |
| handler | function | Handler for messages |

Usage Example:

firehose.addHandler("actions", (action) => {
console.log("Action received:", action);
});

Configures a handler that processes data received from WebSocket clients.

setReceiveHandler(handler: ReceiveHandler): void

Parameters:

| Parameter | Type | Description |
| --- | --- | --- |
| handler | ReceiveHandler | Transforms WebSocket data |

Handler Type:

  • (data: Record<string, unknown>) => RequestData | Promise<RequestData>
  • Transforms arbitrary WebSocket payloads into RequestData

Type Definitions:

const RequestSchema = z.object({ topic: z.string(), body: z.record(z.any()) });
type RequestData = z.infer<typeof RequestSchema>;

Usage Example:

// Register a custom data-processing handler
firehose.setReceiveHandler(async (rawData) => {
// Perform custom validation or transformation
// (validateAndTransform is a placeholder for your own function, not a Firehose API)
const validated = await validateAndTransform(rawData);
return {
topic: "messages",
body: {
jsonrpc: "2.0",
method: "message.send",
params: validated
}
};
});
// Client side: when a handler is set, the payload sent over the WebSocket can be any shape
ws.send(JSON.stringify({
customField: "value",
anotherField: 123
}));

Behavior:

  • When configured, every inbound WebSocket payload is routed through the handler.
  • The handler’s return value (RequestData) is published to libp2p pubsub.
  • Without a handler, the payload is parsed via RequestSchema and published as-is.

Broadcasts data to all connected WebSocket clients.

broadcastToClients(data: unknown): void

Parameters:

| Parameter | Type | Description |
| --- | --- | --- |
| data | unknown | Data to broadcast (serialized with JSON.stringify) |
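
A broadcast of this kind can be sketched as follows. This is a rough illustration, not the Firehose source: `broadcast` and `BroadcastTarget` are hypothetical names, `BroadcastTarget` is a minimal stand-in for a `ws` WebSocket, and the check that skips sockets that are not open is an assumption about sensible behavior rather than something this document confirms.

```typescript
// Minimal stand-in for a WebSocket (assumption: only these two members are needed).
interface BroadcastTarget {
  readyState: number;
  send(data: string): void;
}

const OPEN = 1; // numeric value of WebSocket.OPEN

// Serialize once, then send the same string to every open client.
// Returns the number of clients the payload was sent to.
function broadcast(clients: Iterable<BroadcastTarget>, data: unknown): number {
  const payload = JSON.stringify(data);
  let sent = 0;
  for (const client of clients) {
    // Assumption: skip sockets that are connecting, closing, or closed.
    if (client.readyState === OPEN) {
      client.send(payload);
      sent++;
    }
  }
  return sent;
}
```

Serializing once outside the loop avoids repeating `JSON.stringify` for every connected client.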