Building Custom Nodes¶
This guide covers how to create, test, and publish custom nodes for FlowForge.
defineNode() API¶
The defineNode() function from @flowforgejs/sdk creates a NodeDefinition object:
import { z } from 'zod';
import { defineNode } from '@flowforgejs/sdk';
export const sentimentNode = defineNode({
name: 'custom/sentiment-analysis',
version: '1.0.0',
description: 'Analyze sentiment of text using an LLM',
category: 'ai',
inputSchema: z.object({
text: z.string().min(1),
language: z.string().default('en'),
}),
outputSchema: z.object({
sentiment: z.enum(['positive', 'negative', 'neutral']),
confidence: z.number().min(0).max(1),
explanation: z.string(),
}),
configSchema: z.object({
model: z.string().default('gpt-4o-mini'),
threshold: z.number().default(0.7),
}),
handler: async (ctx) => {
const { text, language } = ctx.input as { text: string; language: string };
const { model } = ctx.config as { model: string };
const result = await ctx.ai.generateObject({
model,
schema: z.object({
sentiment: z.enum(['positive', 'negative', 'neutral']),
confidence: z.number(),
explanation: z.string(),
}),
prompt: `Analyze the sentiment of the following ${language} text: "${text}"`,
});
return result.object;
},
// Optional lifecycle hooks
onInit: async (config) => {
console.log('Node initialized with config:', config);
},
onDestroy: async () => {
console.log('Node destroyed, cleaning up resources');
},
onError: async (error, ctx) => {
ctx.logger.error({ err: error }, 'Sentiment analysis failed');
},
// Optional metadata
retries: 2,
timeout: 30_000,
tags: ['ai', 'nlp', 'sentiment'],
author: 'your-name',
repository: 'https://github.com/your-org/flowforge-sentiment',
});
Full Options Reference¶
| Option | Type | Required | Default | Description |
|---|---|---|---|---|
name |
string |
Yes | -- | Unique node identifier (convention: category/name) |
version |
string |
Yes | -- | Semver version string |
description |
string |
Yes | -- | Human-readable description |
category |
NodeCategory |
Yes | -- | One of: data, communication, ai, control, transform, custom |
inputSchema |
z.ZodTypeAny |
Yes | -- | Zod schema for input validation |
outputSchema |
z.ZodTypeAny |
Yes | -- | Zod schema for output validation |
configSchema |
z.ZodTypeAny |
Yes | -- | Zod schema for static configuration |
handler |
(ctx: NodeContext) => Promise<unknown> |
Yes | -- | Execution logic |
onInit |
(config: unknown) => Promise<void> |
No | -- | Called once when node is initialized |
onDestroy |
() => Promise<void> |
No | -- | Called when node is torn down |
onError |
(error: Error, ctx: NodeContext) => Promise<void> |
No | -- | Called on handler errors (before retry) |
retries |
number |
No | 3 |
Max retry attempts on failure |
timeout |
number |
No | -- | Handler timeout in milliseconds |
tags |
string[] |
No | -- | Searchable tags |
author |
string |
No | -- | Author name |
repository |
string |
No | -- | Source repository URL |
Input/Output/Config Schemas¶
All three schemas use Zod. They serve two purposes:
- Runtime validation -- the engine validates input, output, and config before and after handler execution.
- Documentation -- schemas are introspectable and can generate JSON Schema for dashboard UIs and API docs.
// Schemas with descriptions for better documentation
const inputSchema = z.object({
query: z.string().min(1).describe('Search query string'),
limit: z.number().int().min(1).max(100).default(10).describe('Max results'),
filters: z.record(z.string()).optional().describe('Key-value filter pairs'),
});
Use .describe()
Adding .describe() to schema fields improves auto-generated documentation and helps LLM agents understand what each field does when the node is used as a tool.
Handler Function¶
The handler receives a NodeContext object with full access to the FlowForge runtime:
handler: async (ctx) => {
// Access validated input and config
const input = ctx.input as MyInput;
const config = ctx.config as MyConfig;
// Use data adaptors
const records = await ctx.pull('postgres', { query: 'SELECT * FROM t' });
// Use integrations
await ctx.integrate('slack', 'sendMessage', { channel: '#alerts', text: 'Alert!' });
// Use AI
const response = await ctx.ai.generateText({ model: 'gpt-4o', prompt: 'Hello' });
// Access previous step outputs
const previousResult = ctx.steps['previous-step-name'];
// Emit events
await ctx.emit('processing.complete', { id: 123 });
// Log
ctx.logger.info({ recordCount: 42 }, 'Processing complete');
// Return output (validated against outputSchema)
return { result: response.text, count: 42 };
};
Lifecycle Hooks¶
onInit¶
Called once when the node is first loaded. Use it for setup work like establishing connections or validating configuration.
onInit: async (config) => {
const { apiKey } = config as { apiKey: string };
if (!apiKey) throw new Error('apiKey is required');
// Pre-warm connections, load models, etc.
},
onDestroy¶
Called when the engine shuts down. Use it to clean up resources.
onError¶
Called when the handler throws an error, before the retry mechanism kicks in. Use it for custom error logging or alerting.
onError: async (error, ctx) => {
ctx.logger.error({ err: error, runId: ctx.metadata.runId }, 'Node failed');
await ctx.integrate('slack', 'sendMessage', {
connectionId: 'alerts',
channel: '#errors',
text: `Node ${ctx.metadata.workflowId} failed: ${error.message}`,
});
},
Testing with @flowforgejs/test-utils¶
The @flowforgejs/test-utils package provides createMockContext() for unit testing nodes:
import { describe, it, expect } from 'vitest';
import { createMockContext } from '@flowforgejs/test-utils';
import { sentimentNode } from './sentiment';
describe('sentimentNode', () => {
it('should analyze positive text', async () => {
const { ctx, calls } = createMockContext({
input: { text: 'I love this product!', language: 'en' },
config: { model: 'gpt-4o-mini', threshold: 0.7 },
});
// Override AI mock to return a specific response
ctx.ai.generateObject = async () => ({
object: {
sentiment: 'positive',
confidence: 0.95,
explanation: 'Strong positive language detected',
},
});
const result = await sentimentNode.handler(ctx);
expect(result).toEqual({
sentiment: 'positive',
confidence: 0.95,
explanation: 'Strong positive language detected',
});
});
it('should track integration calls', async () => {
const { ctx, calls } = createMockContext({
input: { text: 'test' },
config: { model: 'gpt-4o-mini' },
});
await ctx.integrate('slack', 'sendMessage', { channel: '#test', text: 'hi' });
expect(calls).toContainEqual({
method: 'integrate',
args: ['slack', 'sendMessage', { channel: '#test', text: 'hi' }],
});
});
});
MockContext capabilities¶
The mock context provides:
- Stub implementations for
pull,push,integrate,emit,wait,sleep,checkpoint - Call tracking via the
callsarray -- every method invocation is recorded - Noop AI -- all AI methods return empty responses by default; override them for specific test scenarios
- MockLogger -- captures log output without printing
Publishing as an npm Package¶
Custom nodes are plain TypeScript objects. To share them:
my-flowforge-nodes/
src/
index.ts # Re-exports all nodes
sentiment.ts # Node implementation
package.json
tsconfig.json
{
"name": "@my-org/flowforge-nodes",
"version": "1.0.0",
"main": "dist/index.js",
"types": "dist/index.d.ts",
"peerDependencies": {
"@flowforgejs/sdk": "^0.1.0",
"zod": "^3.0.0"
}
}
// src/index.ts
export { sentimentNode } from './sentiment.js';
export { classifyNode } from './classify.js';
Consumers install and use:
import { sentimentNode } from '@my-org/flowforge-nodes';
import { workflow } from '@flowforgejs/sdk';
const wf = workflow('analyze')
.trigger({ type: 'manual' })
.node('sentiment', sentimentNode, {
config: { model: 'gpt-4o', threshold: 0.8 },
input: (ctx) => ({ text: (ctx.event.data as { text: string }).text }),
})
.build();
Peer dependencies
Always declare @flowforgejs/sdk and zod as peer dependencies to avoid version conflicts and duplicate instances.