
Nodes

Nodes are the fundamental building blocks of FlowForge workflows. Each node encapsulates a discrete unit of work -- reading from a database, calling an API, transforming data, or invoking an LLM -- and exposes a typed contract of inputs, outputs, and configuration.

What is a Node?

A node is defined by the NodeDefinition interface from @flowforgejs/shared:

interface NodeDefinition {
  name: string; // Unique identifier, e.g. "ai/generate-text"
  version: string; // Semver version
  description: string; // Human-readable description
  category: NodeCategory; // One of: data, communication, ai, control, transform, custom

  inputSchema: z.ZodTypeAny; // Zod schema for input validation
  outputSchema: z.ZodTypeAny; // Zod schema for output validation
  configSchema: z.ZodTypeAny; // Zod schema for static configuration

  handler: (ctx: NodeContext) => Promise<unknown>; // Execution logic

  // Lifecycle hooks (optional)
  onInit?: (config: unknown) => Promise<void>;
  onDestroy?: () => Promise<void>;
  onError?: (error: Error, ctx: NodeContext) => Promise<void>;

  // Execution options (optional)
  retries?: number;
  timeout?: number;
  tags?: string[];
  author?: string;
  repository?: string;
}

Every node receives a NodeContext at runtime, which provides access to data sources, integrations, AI capabilities, event operations, and workflow metadata.
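Conceptually, the runtime invokes a node by validating the raw input against inputSchema, running handler, and then validating the result against outputSchema. The stand-alone sketch below mimics that contract; it is illustrative only: the MiniSchema/MiniNode types and the runNode helper are stand-ins invented here (real FlowForge nodes use Zod schemas), not part of the FlowForge API.

```typescript
// Illustrative stand-in: a "schema" is anything with a parse() that
// returns the value or throws. Real nodes use Zod, which has the same shape.
interface MiniSchema<T> {
  parse(value: unknown): T;
}

// Minimal slice of NodeDefinition needed to show the execution contract.
interface MiniNode<I, O> {
  name: string;
  inputSchema: MiniSchema<I>;
  outputSchema: MiniSchema<O>;
  handler: (ctx: { input: I }) => Promise<O>;
}

// Hypothetical runner: validate input -> execute -> validate output.
async function runNode<I, O>(node: MiniNode<I, O>, rawInput: unknown): Promise<O> {
  const input = node.inputSchema.parse(rawInput); // reject bad input early
  const output = await node.handler({ input });   // run the node's logic
  return node.outputSchema.parse(output);         // enforce the output contract
}

const echoNode: MiniNode<{ message: string }, { echoed: string }> = {
  name: 'custom/echo',
  inputSchema: {
    parse(v) {
      if (typeof v !== 'object' || v === null || typeof (v as { message?: unknown }).message !== 'string') {
        throw new Error('expected { message: string }');
      }
      return v as { message: string };
    },
  },
  outputSchema: { parse: (v) => v as { echoed: string } },
  handler: async (ctx) => ({ echoed: ctx.input.message }),
};
```

Under these assumptions, `runNode(echoNode, { message: 'hi' })` resolves to `{ echoed: 'hi' }`, while `runNode(echoNode, 42)` rejects at the input-validation step before the handler ever runs.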

Node Categories

FlowForge ships with over 80 built-in nodes organized into the following categories:

| Category | Description | Count | Examples |
| --- | --- | --- | --- |
| AI | LLM generation, embeddings, agents, MCP | 5 | generate-text, agent, mcp-client |
| Data | Database and storage connectors | 9 | postgres, redis, s3, http |
| Communication | Third-party service integrations | 47 | slack, github, stripe, notion |
| Control | Flow control and orchestration | 9 | if, parallel, forEach, human-approval |
| Transform | Data transformation primitives | 4 | map, filter, reduce, template |
| Tools | System-level utilities | 10 | filesystem, shell, browser, web-search |
| Log | Logging and observability | 1 | console |

Using Nodes in Workflows

Nodes are used as steps inside workflows. You can reference built-in nodes or custom ones:

import { workflow } from '@flowforgejs/sdk';
import { generateTextNode } from '@flowforgejs/nodes';

const myWorkflow = workflow('summarize')
  .name('Summarize Document')
  .trigger({ type: 'event', event: 'document.uploaded' })
  .node('summarize', generateTextNode, {
    config: { model: 'gpt-4o', temperature: 0.3 },
    input: (ctx) => ({
      prompt: `Summarize: ${(ctx.event.data as { text: string }).text}`,
    }),
  })
  .build();

You can also use inline functions as steps without defining a full node:

const wf = workflow('quick')
  .trigger({ type: 'manual' })
  .node('process', (ctx) => {
    return { processed: true, input: ctx.event.data };
  })
  .build();
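Later steps can read earlier steps' outputs through ctx.steps, keyed by step name. The self-contained sketch below simulates that behavior with a tiny sequential runner; runSteps and its StepCtx type are stand-ins invented for illustration, not the FlowForge engine.

```typescript
// Illustrative stand-in for the slice of NodeContext used here.
interface StepCtx {
  event: { data: unknown };
  steps: Record<string, unknown>; // outputs of previously executed steps
}

type StepFn = (ctx: StepCtx) => unknown;

// Hypothetical sequential runner: each step's return value is stored
// under its name and visible to every later step via ctx.steps.
function runSteps(
  event: { data: unknown },
  steps: Array<[string, StepFn]>,
): Record<string, unknown> {
  const outputs: Record<string, unknown> = {};
  for (const [name, fn] of steps) {
    outputs[name] = fn({ event, steps: { ...outputs } });
  }
  return outputs;
}

const results = runSteps({ data: { n: 2 } }, [
  ['double', (ctx) => (ctx.event.data as { n: number }).n * 2],
  ['describe', (ctx) => `doubled to ${ctx.steps['double']}`],
]);
// results.double === 4, results.describe === 'doubled to 4'
```

The key property this models: a step never sees its own output or any later step's output, only what ran before it.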

Creating Custom Nodes

Use defineNode() from @flowforgejs/sdk to create reusable nodes:

import { z } from 'zod';
import { defineNode } from '@flowforgejs/sdk';

export const myNode = defineNode({
  name: 'custom/my-node',
  version: '1.0.0',
  description: 'A custom node that does something useful',
  category: 'custom',
  inputSchema: z.object({
    message: z.string(),
  }),
  outputSchema: z.object({
    result: z.string(),
    processedAt: z.string(),
  }),
  configSchema: z.object({
    uppercase: z.boolean().default(false),
  }),
  handler: async (ctx) => {
    const { message } = ctx.input as { message: string };
    const { uppercase } = ctx.config as { uppercase: boolean };
    return {
      result: uppercase ? message.toUpperCase() : message,
      processedAt: new Date().toISOString(),
    };
  },
});
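Because a node is ultimately a plain object, its handler can also be exercised directly with a hand-built context, which is convenient for unit tests. The sketch below re-implements the handler logic from myNode above as a plain function and calls it with a minimal mock context; the mock's shape is an assumption, populating only the input and config fields this handler reads.

```typescript
// Same logic as myNode's handler above, extracted as a plain function
// so it can be called without the FlowForge runtime.
const handler = async (ctx: { input: unknown; config: unknown }) => {
  const { message } = ctx.input as { message: string };
  const { uppercase } = ctx.config as { uppercase: boolean };
  return {
    result: uppercase ? message.toUpperCase() : message,
    processedAt: new Date().toISOString(),
  };
};

// Minimal mock context -- just the fields this handler touches.
handler({ input: { message: 'hello' }, config: { uppercase: true } }).then((out) => {
  console.log(out.result); // 'HELLO'
});
```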

Publishing Nodes

Custom nodes are plain TypeScript objects that conform to NodeDefinition. You can publish them as npm packages and import them in any FlowForge project. See the Custom Nodes Guide for details.

NodeContext

Every node handler receives a NodeContext object with these capabilities:

| Property/Method | Description |
| --- | --- |
| ctx.input | Validated input data |
| ctx.config | Validated static configuration |
| ctx.event | The workflow trigger event |
| ctx.steps | Outputs from previous steps (keyed by step name) |
| ctx.logger | Pino-compatible logger |
| ctx.ai | AI operations (generateText, generateObject, embed, streamText) |
| ctx.pull(source, params) | Read from a registered data adaptor |
| ctx.push(target, params) | Write to a registered data adaptor |
| ctx.integrate(name, action, params) | Call a registered integration |
| ctx.emit(event, data) | Emit an event for downstream workflows |
| ctx.wait(event, match?, timeout?) | Pause and wait for an event |
| ctx.sleep(ms) | Pause execution for a duration |
| ctx.checkpoint() | Save state for durable execution |
| ctx.metadata | Run metadata (runId, workflowId, attempt, etc.) |
| ctx.signal | AbortSignal for cancellation |
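Handlers that use context capabilities such as ctx.pull or ctx.emit can be tested the same way as any other handler: pass stubs for just the methods the handler touches. A hedged sketch follows; the CtxSubset interface and the countRows handler are assumptions made for illustration (the real NodeContext carries far more than this subset).

```typescript
// Illustrative subset of NodeContext -- only what this handler uses.
interface CtxSubset {
  input: unknown;
  pull(source: string, params: Record<string, unknown>): Promise<unknown>;
  emit(event: string, data: unknown): Promise<void>;
}

// Hypothetical handler: read rows from a data source, emit a summary event.
const countRows = async (ctx: CtxSubset) => {
  const rows = (await ctx.pull('postgres', { table: 'users' })) as unknown[];
  await ctx.emit('users.counted', { count: rows.length });
  return { count: rows.length };
};

// Stub context that records what the handler did with it.
const emitted: Array<[string, unknown]> = [];
const stub: CtxSubset = {
  input: {},
  pull: async () => [{ id: 1 }, { id: 2 }, { id: 3 }],
  emit: async (event, data) => {
    emitted.push([event, data]);
  },
};

countRows(stub).then((out) => {
  console.log(out.count); // 3
});
```

Because pull and emit are plain async methods on the context, no framework-level mocking is needed; a literal object with the right shape is enough.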