# Core Concepts
FlowForge has six core primitives: nodes, workflows, the engine, context, tools, and integrations.
## Nodes
A node is a unit of work. It is defined by a NodeDefinition object that declares its identity, schemas, handler, and optional lifecycle hooks.
```ts
import { z } from 'zod';
import { defineNode } from '@flowforgejs/sdk';

const myNode = defineNode({
  name: 'my-node',
  version: '1.0.0',
  description: 'Does something useful',
  category: 'transform',

  // Zod schemas -- validated at runtime by the engine
  inputSchema: z.object({ text: z.string() }),
  outputSchema: z.object({ upper: z.string() }),
  configSchema: z.object({ prefix: z.string().default('') }),

  // The main execution function
  handler: async (ctx) => {
    const prefix = (ctx.config as { prefix: string }).prefix;
    return { upper: prefix + ctx.input.text.toUpperCase() };
  },

  // Optional lifecycle hooks
  onInit: async (config) => {
    /* called once when the node is first loaded */
  },
  onDestroy: async () => {
    /* clean up resources */
  },
  onError: async (error, ctx) => {
    /* custom error handling */
  },

  // Retry and timeout
  retries: 3, // default: 3
  timeout: 10_000, // milliseconds
});
```
### Node categories
Every node belongs to one of six categories:
| Category | Purpose | Examples |
|---|---|---|
| `data` | Read and write external data stores | PostgreSQL, Redis, MongoDB, S3, Elasticsearch |
| `communication` | Interact with third-party services | Slack, GitHub, Jira, Salesforce, Email |
| `ai` | LLM operations | generate-text, generate-object, embed, agent, mcp-client |
| `control` | Workflow control flow | if, switch, forEach, parallel, while, delay, sub-workflow |
| `transform` | Data transformation | map, filter, reduce, template |
| `custom` | User-defined nodes | Anything you build with `defineNode()` |
### Handler function
The handler receives a NodeContext and must return a value that conforms to the outputSchema. The engine validates both the input before calling the handler and the output after it returns.
### Lifecycle hooks
| Hook | When it runs |
|---|---|
| `onInit(config)` | Once, when the node is first loaded with its config |
| `onDestroy()` | Once, during engine shutdown |
| `onError(error, ctx)` | On handler failure, before the retry mechanism kicks in |
## Workflows
A workflow is a directed acyclic graph (DAG) of steps, initiated by a trigger. You build workflows with the fluent workflow() builder.
```ts
import { workflow } from '@flowforgejs/sdk';

const myWorkflow = workflow('my-workflow')
  .name('My Workflow')
  .version('1.0.0')
  .description('Processes incoming data')
  .trigger({ type: 'event', event: 'data.received' })
  .timeout(60_000)
  .retry({ maxAttempts: 3 })
  .node('step-1', nodeA)
  .node('step-2', nodeB, {
    input: (ctx) => ctx.steps['step-1'],
    when: (ctx) => ctx.steps['step-1'] !== null,
    dependsOn: ['step-1'],
  })
  .build();
```
### Triggers
A trigger determines how a workflow is started:
| Type | Description | Example |
|---|---|---|
| `manual` | Started explicitly via `engine.trigger()` | `{ type: "manual" }` |
| `event` | Started when a matching event is emitted | `{ type: "event", event: "order.created" }` |
| `cron` | Started on a schedule | `{ type: "cron", cron: "0 */6 * * *" }` |
| `webhook` | Started by an incoming HTTP request | `{ type: "webhook", webhook: { path: "/hooks/deploy" } }` |
| `sub-workflow` | Started by another workflow | `{ type: "sub-workflow" }` |
### Step options
Each .node() call accepts optional configuration:
- `input` -- A function `(ctx: StepContext) => unknown` or a static object. Maps data from the trigger event or previous step outputs into this step's input.
- `config` -- Static configuration passed to the node's `configSchema`.
- `when` -- A predicate `(ctx: StepContext) => boolean`. If it returns `false`, the step is skipped.
- `dependsOn` -- An array of step names. The engine uses these to build the DAG and determine execution order.
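The two function-valued options can be written and unit-tested on their own. A minimal sketch, approximating `StepContext` with a local type (the real type comes from `@flowforgejs/sdk`):

```ts
// Local stand-in for StepContext (an approximation for illustration).
type StepContext = {
  event: { type: string; payload?: unknown };
  steps: Record<string, unknown>;
};

// input: map a previous step's output into this step's input.
const inputMapper = (ctx: StepContext) => ({
  text: String(ctx.steps['step-1']),
});

// when: skip this step unless step-1 produced a value.
const whenPredicate = (ctx: StepContext) => ctx.steps['step-1'] != null;

const exampleCtx: StepContext = {
  event: { type: 'manual' },
  steps: { 'step-1': 'hello' },
};
```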
### Control flow
The builder exposes control flow methods that compose with regular node steps:
| Method | Description |
|---|---|
| `.if(name, { condition, then, else })` | Conditional branching |
| `.switch(name, { value, cases, default })` | Multi-way branching |
| `.forEach(name, { items, concurrency, pipeline })` | Iterate over a collection |
| `.parallel(name, { items, concurrency, pipeline })` | Fan-out parallel execution |
| `.while(name, { condition, maxIterations, pipeline })` | Loop while the condition holds |
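`forEach` and `parallel` both fan out over a collection with a concurrency cap. The scheduling pattern behind them can be sketched in isolation (this is the general technique, not the engine's actual implementation):

```ts
// Concurrency-limited fan-out: `concurrency` workers pull indices
// from a shared cursor until the items run out.
async function mapWithConcurrency<T, R>(
  items: T[],
  concurrency: number,
  fn: (item: T) => Promise<R>,
): Promise<R[]> {
  const results: R[] = new Array(items.length);
  let next = 0;
  const workers = Array.from(
    { length: Math.min(concurrency, items.length) },
    async () => {
      while (next < items.length) {
        const i = next++; // safe: JavaScript's event loop is single-threaded
        results[i] = await fn(items[i]);
      }
    },
  );
  await Promise.all(workers);
  return results;
}
```

At most `concurrency` calls to `fn` are in flight at once, and results come back in input order regardless of completion order.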
## Engine
The Engine is the runtime that registers workflows and executes them.
```ts
import { Engine } from '@flowforgejs/engine';

const engine = new Engine();

// Register workflows
engine.register(myWorkflow);

// Trigger a workflow directly
const run = await engine.trigger('my-workflow', { key: 'value' });

// Or emit an event -- all workflows listening for it will trigger
const runs = await engine.emit('data.received', { key: 'value' });

// Inspect runs
console.log(run.status); // "completed" | "failed" | "cancelled"
console.log(run.output); // output of the last step

// Cleanup
await engine.destroy();
```
### Key methods
| Method | Description |
|---|---|
| `register(workflow)` | Register a workflow definition. Event-triggered workflows auto-subscribe. |
| `unregister(workflowId)` | Remove a workflow. |
| `trigger(workflowId, input?)` | Execute a workflow and return a `RunRecord`. |
| `emit(event, payload?)` | Emit an event. Returns `RunRecord[]` for all triggered workflows. |
| `registerAdaptor(adaptor)` | Register a data adaptor for `ctx.pull()`/`ctx.push()`. |
| `registerIntegration(adaptor)` | Register an integration adaptor for `ctx.integrate()`. |
| `getRun(runId)` | Retrieve a run record by ID. |
| `listRuns(workflowId?)` | List all runs, optionally filtered by workflow. |
| `cancelRun(runId)` | Cancel a running workflow. |
| `destroy()` | Shut down the engine and release all resources. |
## Context
Every node handler receives a NodeContext object. This is the primary interface between your code and the engine runtime.
```ts
interface NodeContext<TInput = unknown, TConfig = unknown> {
  input: TInput;                  // Validated input data
  config: TConfig;                // Validated config data
  event: WorkflowEvent;           // The trigger event
  steps: Record<string, unknown>; // Outputs from previous steps
  logger: Logger;                 // Structured logger (Pino-compatible)
  signal: AbortSignal;            // Abort signal for cancellation
  metadata: WorkflowMetadata;     // Run ID, workflow ID, attempt number

  // Data operations
  pull(source: string, params: unknown): Promise<unknown>;
  push(target: string, params: unknown): Promise<unknown>;

  // Integration operations
  integrate(name: string, action: string, params: unknown): Promise<unknown>;

  // AI operations
  ai: AIContext;

  // Event operations
  emit(event: string, data: unknown): Promise<void>;
  wait(event: string, match?: unknown, timeout?: number): Promise<unknown>;
  sleep(ms: number): Promise<void>;
  checkpoint(): Promise<void>;
}
```
### Data operations: pull and push
These connect to registered data adaptors (PostgreSQL, Redis, MongoDB, etc.):
```ts
handler: async (ctx) => {
  const users = await ctx.pull('postgres', {
    query: 'SELECT * FROM users WHERE active = true',
  });
  await ctx.push('redis', {
    key: 'active-users',
    value: JSON.stringify(users),
  });
  return users;
};
```
### Integration operations: integrate
Calls a registered integration adaptor (Composio-backed or custom):
```ts
handler: async (ctx) => {
  await ctx.integrate('slack', 'send_message', {
    channel: '#alerts',
    text: `Pipeline completed: ${ctx.steps['report']}`,
  });
};
```
### AI operations: ai
Wraps the Vercel AI SDK for LLM interactions:
```ts
handler: async (ctx) => {
  const result = await ctx.ai.generateText({
    model: 'gpt-4o',
    prompt: `Summarize: ${ctx.input.text}`,
  });
  return { summary: result.text };
};
```
The AIContext interface exposes four methods:
| Method | Description |
|---|---|
| `generateText(params)` | Generate text, optionally with tool calling |
| `streamText(params)` | Stream text generation |
| `generateObject(params)` | Generate a structured object validated against a Zod schema |
| `embed(params)` | Generate vector embeddings |
### Event operations
| Method | Description |
|---|---|
| `emit(event, data)` | Emit an event that can trigger other workflows |
| `wait(event, match?, timeout?)` | Pause execution until a matching event arrives |
| `sleep(ms)` | Pause execution for a duration |
| `checkpoint()` | Persist the current workflow state for recovery |
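Together, `emit` and `wait` support a request/response pattern across workflows. A sketch using only the two event methods above; the step itself (`approvalStep`) and its event names are hypothetical:

```ts
// Only the two event methods are needed for this sketch.
type EventCtx = {
  emit(event: string, data: unknown): Promise<void>;
  wait(event: string, match?: unknown, timeout?: number): Promise<unknown>;
};

const approvalStep = async (ctx: EventCtx) => {
  // Announce that a decision is needed...
  await ctx.emit('approval.requested', { reason: 'deploy to prod' });
  // ...then suspend until a matching event arrives, or one hour passes
  // (a timeout surfaces as a step failure).
  return ctx.wait('approval.decided', undefined, 60 * 60 * 1000);
};
```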
## Tools
Tool nodes are specialized nodes designed to be called by AI agents. They wrap capabilities like filesystem access, shell execution, and web browsing behind Zod-validated interfaces with configurable permission scopes.
FlowForge ships 9 tool nodes:
| Tool | Description |
|---|---|
| Filesystem | Read, write, list, and delete files |
| Shell | Execute shell commands |
| Code Interpreter | Run Python/JS in a sandboxed E2B environment |
| Web Search | Search the web via DuckDuckGo |
| Web Scrape | Extract content from web pages with Cheerio |
| Browser | Automate browsers with Puppeteer |
| Git | Clone, commit, push, and manage repositories |
| Document Parser | Parse PDFs and structured documents |
| Math | Evaluate mathematical expressions with math.js |
Tool nodes are regular NodeDefinition objects. You can use them as standalone workflow steps or pass them as tools to an agent node.
> **Safety:** Tool nodes that access the filesystem, shell, or network should be gated by permission scopes. See Safety and Permissions for configuration details.
## Integrations
FlowForge provides 46 communication nodes backed by the Composio integration platform. Each integration is a NodeDefinition that calls ctx.integrate() under the hood, routing requests through the integration service to the appropriate Composio adaptor.
Integrations span messaging, email, developer tools, project management, CRM, payments, social media, support, and analytics. See Integrations Overview for the full list.
### Custom integrations
You can register your own integration adaptor by implementing the IntegrationAdaptor interface:
```ts
const myAdaptor: IntegrationAdaptor = {
  name: 'my-service',
  actions: ['send', 'fetch'],
  execute: async (action, params, connectionId) => {
    // Call your service
  },
  healthCheck: async () => true,
};

engine.registerIntegration(myAdaptor);
```
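Once registered, any handler can reach the adaptor through `ctx.integrate`. A small sketch (the `notify` handler and its params are illustrative, and the context type is trimmed to the one method used):

```ts
// Only ctx.integrate is needed for this sketch.
type IntegrateCtx = {
  integrate(name: string, action: string, params: unknown): Promise<unknown>;
};

// 'my-service' and its 'send' action come from the adaptor defined above.
const notify = async (ctx: IntegrateCtx) =>
  ctx.integrate('my-service', 'send', { to: 'ops', body: 'pipeline done' });
```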
## Safety and Permissions
FlowForge supports a human-in-the-loop approval model for sensitive operations. The human-approval control node pauses workflow execution and waits for external approval before allowing downstream steps to proceed.
Combined with configurable permission scopes on tool nodes, this provides layered safety:
- Permission scopes -- Restrict what a tool node is allowed to do (read-only filesystem, no network, etc.)
- Human approval gates -- Require explicit approval before executing dangerous steps
- Abort signals -- Cancel running workflows at any time via `engine.cancelRun()`
- Timeouts -- Node-level and workflow-level timeout enforcement
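Abort signals only work if handlers cooperate: long-running work should check `ctx.signal` between iterations. A self-contained sketch of that pattern (`pollUntilDone` is a hypothetical helper, not part of the SDK):

```ts
// Honor the abort signal so engine.cancelRun() can stop long-running work.
async function pollUntilDone(
  signal: AbortSignal,
  check: () => Promise<boolean>,
  intervalMs = 1000,
): Promise<void> {
  while (!(await check())) {
    if (signal.aborted) throw new Error('run cancelled');
    await new Promise((resolve) => setTimeout(resolve, intervalMs));
  }
}
```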
See Safety and Permissions for implementation details.