SDK Reference¶
The @flowforgejs/sdk package provides the primary API for defining nodes, agents, and workflows.
defineNode(options)¶
Creates a NodeDefinition from the provided options.
Parameters¶
interface DefineNodeInput {
  name: string; // Unique identifier (e.g. "custom/my-node")
  version: string; // Semver version
  description: string; // Human-readable description
  category: NodeCategory; // "data" | "communication" | "ai" | "control" | "transform" | "custom"
  inputSchema: z.ZodTypeAny; // Zod schema for input validation
  outputSchema: z.ZodTypeAny; // Zod schema for output validation
  configSchema: z.ZodTypeAny; // Zod schema for static configuration
  handler: (ctx: NodeContext) => Promise<unknown>; // Execution logic

  // Optional lifecycle hooks
  onInit?: (config: unknown) => Promise<void>;
  onDestroy?: () => Promise<void>;
  onError?: (error: Error, ctx: NodeContext) => Promise<void>;

  // Optional execution configuration
  retries?: number; // Default: 3
  timeout?: number; // In milliseconds, no default
  tags?: string[];
  author?: string;
  repository?: string;
}
Returns¶
NodeDefinition -- a frozen object that can be used in workflows or published as a package.
Example¶
import { z } from 'zod';
import { defineNode } from '@flowforgejs/sdk';

const myNode = defineNode({
  name: 'custom/hello',
  version: '1.0.0',
  description: 'A simple greeting node',
  category: 'custom',
  inputSchema: z.object({ name: z.string() }),
  outputSchema: z.object({ greeting: z.string() }),
  configSchema: z.object({ prefix: z.string().default('Hello') }),
  handler: async (ctx) => {
    const { name } = ctx.input as { name: string };
    const { prefix } = ctx.config as { prefix: string };
    return { greeting: `${prefix}, ${name}!` };
  },
});
defineAgentNode(options)¶
Creates a NodeDefinition that runs an LLM tool-calling loop.
Parameters¶
interface AgentNodeOptions {
  name: string;
  version: string;
  description: string;
  model: string; // Default model ID (e.g. "gpt-4o")
  systemPrompt: string; // System prompt for the agent
  tools: Record<string, AgentToolDef>; // Tool definitions
  outputSchema: z.ZodTypeAny; // Schema for structured final output

  // Optional
  maxIterations?: number; // Default: 10
  temperature?: number;
  maxTokens?: number;
  tags?: string[];
  author?: string;
}

interface AgentToolDef {
  description: string;
  inputSchema: z.ZodTypeAny;
  handler: (ctx: NodeContext, input: unknown) => Promise<unknown>;
}
Returns¶
NodeDefinition with:
- category: 'ai'
- retries: 1
- timeout: 120_000
- Tags: includes 'agent' and 'ai' automatically
Behavior¶
- Builds AI SDK tool definitions from the tools record.
- Enters a loop calling ctx.ai.generateText() with tools.
- Executes tool calls and feeds results back.
- When the LLM stops calling tools, attempts to produce structured output via ctx.ai.generateObject() with the outputSchema.
- If maxIterations is reached, produces best-effort output.
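The loop described above can be illustrated with a small, synchronous toy model. Everything in it is a hypothetical stand-in: `runAgentLoop`, `ModelReply`, and the scripted model are invented for illustration, and the real node calls ctx.ai.generateText() and ctx.ai.generateObject() asynchronously with signatures that are not shown in this reference.

```typescript
// Toy model of a tool-calling loop. All names are hypothetical;
// this is not the SDK's implementation.
type ToolCall = { tool: string; input: unknown };
type ModelReply = { text: string; toolCalls: ToolCall[] };
type Tool = (input: unknown) => unknown;
type Model = (history: string[]) => ModelReply;

function runAgentLoop(
  model: Model,
  tools: Record<string, Tool>,
  maxIterations: number,
): { text: string; iterations: number; hitLimit: boolean } {
  const history: string[] = [];
  let lastText = '';
  for (let i = 1; i <= maxIterations; i++) {
    const reply = model(history);
    lastText = reply.text;
    // No tool calls: the model is done, so stop looping.
    if (reply.toolCalls.length === 0) {
      return { text: lastText, iterations: i, hitLimit: false };
    }
    // Execute each requested tool and feed the result back as history.
    for (const call of reply.toolCalls) {
      const tool = tools[call.tool];
      const result =
        tool !== undefined ? tool(call.input) : `unknown tool: ${call.tool}`;
      history.push(`${call.tool} -> ${JSON.stringify(result)}`);
    }
  }
  // Iteration budget exhausted: return whatever text we have (best effort).
  return { text: lastText, iterations: maxIterations, hitLimit: true };
}

// Scripted model: asks for one tool call, then answers using its result.
const scriptedModel: Model = (history) =>
  history.length === 0
    ? { text: '', toolCalls: [{ tool: 'add', input: { a: 2, b: 3 } }] }
    : { text: `Done after seeing: ${history[0]}`, toolCalls: [] };

const agentTools: Record<string, Tool> = {
  add: (input) => {
    const { a, b } = input as { a: number; b: number };
    return a + b;
  },
};

const agentResult = runAgentLoop(scriptedModel, agentTools, 10);

// A model that never stops calling tools runs into the iteration cap.
const runawayResult = runAgentLoop(
  () => ({ text: 'partial', toolCalls: [{ tool: 'add', input: { a: 1, b: 1 } }] }),
  agentTools,
  3,
);
```

The sketch leaves out the final structured-output step; in the real node that is where outputSchema comes into play.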
workflow(id)¶
Creates a WorkflowBuilder for declaratively constructing workflow definitions.
WorkflowBuilder Methods¶
.name(name: string)¶
Set the workflow display name.
.version(version: string)¶
Set the workflow version (default: "1.0.0").
.description(desc: string)¶
Set the workflow description.
.trigger(trigger: TriggerDefinition)¶
Required. Set the workflow trigger.
interface TriggerDefinition {
  type: 'event' | 'cron' | 'webhook' | 'manual' | 'sub-workflow';
  event?: string; // For type: 'event'
  cron?: string; // For type: 'cron' (cron expression)
  webhook?: {
    // For type: 'webhook'
    path: string;
    method?: string;
  };
}
Examples:
.trigger({ type: 'manual' })
.trigger({ type: 'event', event: 'order.placed' })
.trigger({ type: 'cron', cron: '0 * * * *' }) // Every hour
.trigger({ type: 'webhook', webhook: { path: '/api/ingest', method: 'POST' } })
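Because event, cron, and webhook are all optional in the interface, the type alone does not guarantee that a trigger carries the field its type needs. A minimal sketch of such a check follows; `checkTrigger` is a hypothetical helper, and whether the SDK enforces these rules itself is an assumption.

```typescript
// Mirrors the TriggerDefinition shape documented above.
interface TriggerDefinition {
  type: 'event' | 'cron' | 'webhook' | 'manual' | 'sub-workflow';
  event?: string;
  cron?: string;
  webhook?: { path: string; method?: string };
}

// Hypothetical helper: returns a list of problems, empty when the trigger
// carries the field that its type relies on.
function checkTrigger(t: TriggerDefinition): string[] {
  const problems: string[] = [];
  if (t.type === 'event' && !t.event) {
    problems.push("'event' trigger needs an event name");
  }
  if (t.type === 'cron' && !t.cron) {
    problems.push("'cron' trigger needs a cron expression");
  }
  if (t.type === 'webhook' && !(t.webhook && t.webhook.path)) {
    problems.push("'webhook' trigger needs a webhook path");
  }
  return problems;
}

const okTrigger = checkTrigger({ type: 'cron', cron: '0 * * * *' });
const badTrigger = checkTrigger({ type: 'event' });
```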
.node(stepName, nodeOrHandler, options?)¶
Add a node step to the workflow.
// With a NodeDefinition
.node('step-name', myNodeDefinition, {
  config: { key: 'value' },
  input: (ctx) => ({ field: ctx.event.data }),
  when: (ctx) => ctx.event.type === 'specific-type',
  dependsOn: ['other-step'],
})

// With an inline handler
.node('step-name', (ctx) => {
  return { processed: ctx.event.data };
})
NodeStepOptions:
| Option | Type | Description |
|---|---|---|
| config | Record<string, unknown> | Static configuration passed to the node |
| input | ((ctx: StepContext) => unknown) \| Record<string, unknown> | Dynamic or static input |
| when | (ctx: StepContext) => boolean \| Promise<boolean> | Conditional execution |
| dependsOn | string[] | Step names this step depends on |
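To illustrate what dependsOn expresses, the sketch below derives one valid execution order from a set of steps. `StepSpec` and `executionOrder` are hypothetical names; how the engine actually schedules steps is not documented here, so treat this as a model of the constraint rather than of the runtime.

```typescript
// Hypothetical step shape: only the fields needed for ordering.
type StepSpec = { name: string; dependsOn?: string[] };

// Returns step names in an order where every step follows its dependencies.
function executionOrder(steps: StepSpec[]): string[] {
  const names = steps.map((s) => s.name);
  for (const s of steps) {
    for (const dep of s.dependsOn ?? []) {
      if (names.indexOf(dep) === -1) {
        throw new Error(`Step "${s.name}" depends on unknown step "${dep}"`);
      }
    }
  }
  const done: string[] = [];
  let pending = steps.slice();
  while (pending.length > 0) {
    // A step is ready once all of its dependencies have been scheduled.
    const ready = pending.filter((s) =>
      (s.dependsOn ?? []).every((dep) => done.indexOf(dep) !== -1),
    );
    if (ready.length === 0) {
      throw new Error('Dependency cycle detected');
    }
    for (const s of ready) done.push(s.name);
    pending = pending.filter((s) => ready.indexOf(s) === -1);
  }
  return done;
}

// Declaration order does not matter; dependencies decide the result.
const order = executionOrder([
  { name: 'notify-team', dependsOn: ['generate-summary'] },
  { name: 'save-order' },
  { name: 'generate-summary', dependsOn: ['save-order'] },
]);
```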
.if(stepName, options)¶
Conditional branching. Runs the then steps when condition returns true and the else steps otherwise.
.if('check', {
  condition: (ctx) => (ctx.event.data as { active: boolean }).active,
  then: [
    ['active-step', activeNode, { config: {} }],
  ],
  else: [
    ['inactive-step', inactiveNode],
  ],
})
.parallel(stepName, options)¶
Fan-out parallel execution.
.parallel('process-all', {
  items: (ctx) => (ctx.event.data as { list: unknown[] }).list,
  concurrency: 5,
  pipeline: (item) => [
    ['process', processNode, { input: () => ({ item }) }],
  ],
})
.forEach(stepName, options)¶
Iterate with concurrency control.
.forEach('iterate', {
  items: (ctx) => (ctx.event.data as { records: unknown[] }).records,
  concurrency: 3,
  pipeline: (item, index) => [
    ['transform', transformNode, { input: () => ({ item, index }) }],
  ],
})
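The concurrency option used by .parallel() and .forEach() caps how many items are processed at once. One common way to implement such a cap is a fixed pool of workers pulling from a shared index, sketched below. `mapWithConcurrency` is a hypothetical helper, not an SDK function, and the engine may implement the limit differently.

```typescript
// Hypothetical helper: run `worker` over `items` with at most `limit`
// invocations in flight. Results keep the order of `items`.
async function mapWithConcurrency<T, R>(
  items: T[],
  limit: number,
  worker: (item: T, index: number) => Promise<R>,
): Promise<R[]> {
  const results: R[] = new Array(items.length);
  let next = 0;
  // Each runner claims the next unprocessed index until none remain.
  async function runner(): Promise<void> {
    while (next < items.length) {
      const i = next++;
      results[i] = await worker(items[i] as T, i);
    }
  }
  const runnerCount = Math.min(Math.max(1, limit), items.length);
  const runners: Promise<void>[] = [];
  for (let r = 0; r < runnerCount; r++) runners.push(runner());
  await Promise.all(runners);
  return results;
}

// Track how many workers overlap to confirm the cap is respected.
let inFlight = 0;
let peak = 0;
const doubled = mapWithConcurrency([1, 2, 3, 4, 5, 6, 7], 3, async (n) => {
  inFlight++;
  peak = Math.max(peak, inFlight);
  await new Promise<void>((resolve) => setTimeout(resolve, 5));
  inFlight--;
  return n * 2;
});
```

Here `doubled` is a promise; awaiting it yields the doubled values in input order, and `peak` never exceeds the limit of 3.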
.switch(stepName, options)¶
Multi-way branching. Runs the steps of the case whose key matches value, or the default steps when no case matches.
.switch('route', {
  value: (ctx) => (ctx.event.data as { type: string }).type,
  cases: {
    'typeA': [['handle-a', nodeA]],
    'typeB': [['handle-b', nodeB]],
  },
  default: [['handle-default', defaultNode]],
})
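The selection that .switch() expresses can be modeled as a keyed lookup with a fallback. In this sketch `selectCase` and `StepTuple` are hypothetical, steps are reduced to string pairs instead of real node definitions, and exact-match semantics are an assumption.

```typescript
// A step is modeled as a [stepName, nodeLabel] pair for illustration.
type StepTuple = [string, string];

// Hypothetical helper: pick the steps registered for `value`,
// or the fallback steps when no case matches.
function selectCase(
  value: string,
  cases: Record<string, StepTuple[]>,
  fallback: StepTuple[],
): StepTuple[] {
  // hasOwnProperty avoids matching inherited keys such as "toString".
  const matched = Object.prototype.hasOwnProperty.call(cases, value)
    ? cases[value]
    : undefined;
  return matched !== undefined ? matched : fallback;
}

const routes: Record<string, StepTuple[]> = {
  typeA: [['handle-a', 'nodeA']],
  typeB: [['handle-b', 'nodeB']],
};
const fallbackSteps: StepTuple[] = [['handle-default', 'defaultNode']];

const forTypeB = selectCase('typeB', routes, fallbackSteps);
const forOther = selectCase('typeZ', routes, fallbackSteps);
```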
.while(stepName, options)¶
Loop with a guard. Repeats the pipeline while condition returns true, up to maxIterations.
.while('poll', {
  condition: (ctx) => !(ctx.steps as { done?: boolean }).done,
  maxIterations: 50,
  pipeline: [
    ['check', checkNode, { config: {} }],
  ],
})
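The role of maxIterations as a guard can be shown with a small standalone loop. `runWhile` is a hypothetical helper; in particular, stopping quietly at the cap (rather than throwing) is an assumption, since this reference does not say what the engine does when the limit is reached.

```typescript
// Hypothetical model of a guarded loop: repeat `body` while `condition`
// holds, but never more than `maxIterations` times.
function runWhile<S>(
  initial: S,
  condition: (state: S) => boolean,
  body: (state: S) => S,
  maxIterations: number,
): { state: S; iterations: number; hitLimit: boolean } {
  let state = initial;
  let iterations = 0;
  while (condition(state)) {
    if (iterations >= maxIterations) {
      return { state, iterations, hitLimit: true };
    }
    state = body(state);
    iterations++;
  }
  return { state, iterations, hitLimit: false };
}

// Poll until a simulated job reports done on the third check.
const polled = runWhile(
  { checks: 0, done: false },
  (s) => !s.done,
  (s) => ({ checks: s.checks + 1, done: s.checks + 1 >= 3 }),
  50,
);

// A condition that never turns false stops at the cap instead of spinning.
const capped = runWhile({ n: 0 }, () => true, (s) => ({ n: s.n + 1 }), 5);
```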
.timeout(ms: number)¶
Set workflow-level timeout in milliseconds.
.retry(config)¶
Set workflow-level retry configuration.
.retry({
  maxAttempts: 3,
  backoff: 'exponential', // 'fixed' | 'exponential' | 'linear'
  delayMs: 1000,
  maxDelayMs: 30_000,
})
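To make the three backoff modes concrete, the sketch below computes the wait before each retry. The formulas are the conventional ones and are assumptions here: fixed keeps delayMs constant, linear multiplies it by the retry number, exponential doubles it each time, and maxDelayMs caps the result. It also assumes maxAttempts counts the first attempt, so there are maxAttempts - 1 waits. `retrySchedule` is a hypothetical helper, not part of the SDK.

```typescript
type Backoff = 'fixed' | 'exponential' | 'linear';

interface RetryConfig {
  maxAttempts: number;
  backoff: Backoff;
  delayMs: number;
  maxDelayMs?: number;
}

// Uncapped delay before retry number `retry` (1 = first retry).
function baseDelay(cfg: RetryConfig, retry: number): number {
  if (cfg.backoff === 'fixed') return cfg.delayMs;
  if (cfg.backoff === 'linear') return cfg.delayMs * retry;
  return cfg.delayMs * Math.pow(2, retry - 1); // exponential
}

// Delays between consecutive attempts, with the optional cap applied.
function retrySchedule(cfg: RetryConfig): number[] {
  const delays: number[] = [];
  for (let retry = 1; retry < cfg.maxAttempts; retry++) {
    const raw = baseDelay(cfg, retry);
    delays.push(
      cfg.maxDelayMs === undefined ? raw : Math.min(raw, cfg.maxDelayMs),
    );
  }
  return delays;
}

const exponentialDelays = retrySchedule({
  maxAttempts: 8,
  backoff: 'exponential',
  delayMs: 1000,
  maxDelayMs: 30_000,
});
const linearDelays = retrySchedule({
  maxAttempts: 4,
  backoff: 'linear',
  delayMs: 500,
});
```

Under these assumptions the exponential schedule climbs from 1000 ms and flattens at the 30 000 ms cap from the sixth retry onward.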
.metadata(meta)¶
Attach arbitrary metadata to the workflow. meta is a key-value record.
.build()¶
Required. Finalize and return the WorkflowDefinition. Throws if no trigger or steps are defined.
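The validation that .build() performs can be pictured with a stripped-down stand-in builder. `MiniWorkflowBuilder` is hypothetical and far simpler than the real WorkflowBuilder; the error messages are invented, and only the documented rules (a trigger and at least one step are required, version defaults to "1.0.0") are taken from this reference.

```typescript
interface MiniWorkflow {
  id: string;
  version: string;
  trigger: { type: string };
  steps: string[];
}

// Minimal stand-in for a fluent builder; not the SDK's WorkflowBuilder.
class MiniWorkflowBuilder {
  private readonly id: string;
  private versionValue = '1.0.0';
  private triggerValue: { type: string } | undefined;
  private readonly stepNames: string[] = [];

  constructor(id: string) {
    this.id = id;
  }

  version(v: string): this {
    this.versionValue = v;
    return this;
  }

  trigger(t: { type: string }): this {
    this.triggerValue = t;
    return this;
  }

  node(stepName: string): this {
    this.stepNames.push(stepName);
    return this;
  }

  // Fails fast when a required part of the definition is missing.
  build(): MiniWorkflow {
    if (this.triggerValue === undefined) {
      throw new Error(`Workflow "${this.id}" has no trigger`);
    }
    if (this.stepNames.length === 0) {
      throw new Error(`Workflow "${this.id}" has no steps`);
    }
    return {
      id: this.id,
      version: this.versionValue,
      trigger: this.triggerValue,
      steps: this.stepNames.slice(),
    };
  }
}

// Demo helper: capture the error message instead of letting it propagate.
function tryBuild(b: MiniWorkflowBuilder): string {
  try {
    b.build();
    return 'ok';
  } catch (e) {
    return (e as Error).message;
  }
}

const noTrigger = tryBuild(new MiniWorkflowBuilder('a').node('step'));
const noSteps = tryBuild(new MiniWorkflowBuilder('b').trigger({ type: 'manual' }));
const built = new MiniWorkflowBuilder('c')
  .trigger({ type: 'manual' })
  .node('step')
  .build();
```

Since a missing trigger or step only surfaces when build() runs, wrapping the call in try/catch (or building at module load, so errors appear at startup) keeps such mistakes from reaching runtime.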
Complete Example¶
import { workflow } from '@flowforgejs/sdk';
import { generateTextNode, slackNode, postgresNode } from '@flowforgejs/nodes';

const wf = workflow('order-notification')
  .name('Order Notification Pipeline')
  .version('2.0.0')
  .description('Process new orders and notify the team')
  .trigger({ type: 'event', event: 'order.placed' })
  .timeout(60_000)
  .retry({ maxAttempts: 3, backoff: 'exponential', delayMs: 2000 })
  .node('save-order', postgresNode, {
    config: { connectionId: 'main-db' },
    input: (ctx) => ({
      action: 'insert' as const,
      table: 'orders',
      data: ctx.event.data as Record<string, unknown>,
      returning: ['id'],
    }),
  })
  .node('generate-summary', generateTextNode, {
    config: { model: 'gpt-4o-mini', temperature: 0.3 },
    input: (ctx) => ({
      prompt: `Summarize this order for the sales team: ${JSON.stringify(ctx.event.data)}`,
    }),
    dependsOn: ['save-order'],
  })
  .node('notify-team', slackNode, {
    config: { connectionId: 'slack-bot' },
    input: (ctx) => ({
      action: 'sendMessage' as const,
      channel: '#orders',
      text: (ctx.steps as { 'generate-summary': { text: string } })['generate-summary'].text,
    }),
    dependsOn: ['generate-summary'],
  })
  .build();