Skip to content

Control Flow Nodes

Control flow nodes manage the execution order, branching, iteration, and orchestration of workflows. FlowForge uses a DAG-based execution model where steps can declare dependencies, and the runner resolves execution order via topological sort.

Overview

Node Description
control/delay Pause execution for a configurable duration
control/if Conditional branching (then/else)
control/switch Multi-way branching based on a value
control/parallel Fan-out items for concurrent processing
control/forEach Iterate over an array with concurrency control
control/while Repeat while a condition holds (with max iteration guard)
control/wait-for-event Pause and wait for an external event
control/sub-workflow Trigger another workflow via event emission
control/human-approval Pause for human approval before proceeding

delay

Pause workflow execution for a configurable duration. Data passes through unchanged.

Config: { ms: number } -- delay in milliseconds.

Input: { data?: unknown } -- optional passthrough data.

Output: { data: unknown, delayMs: number, resumedAt: string }

import { delayNode } from '@flowforgejs/nodes';

workflow('rate-limited')
  .trigger({ type: 'event', event: 'batch.ready' })
  .node('process', processNode, {
    /* ... */
  })
  .node('wait', delayNode, {
    config: { ms: 2000 },
    input: (ctx) => ({ data: ctx.steps }),
  })
  .node('next-batch', processNode, {
    /* ... */
  })
  .build();

if

Conditional branching. Evaluates a boolean condition and returns the matching branch value.

Input: { condition: boolean, thenValue?: unknown, elseValue?: unknown }

Output: { branch: 'then' | 'else', value: unknown }

import { ifNode } from '@flowforgejs/nodes';

workflow('conditional')
  .trigger({ type: 'event', event: 'order.placed' })
  .node('check-amount', ifNode, {
    input: (ctx) => ({
      condition: (ctx.event.data as { amount: number }).amount > 1000,
      thenValue: 'high-value',
      elseValue: 'standard',
    }),
  })
  .build();

For structural branching (running different step pipelines per branch), use the workflow builder's .if() method instead:

workflow('branching')
  .trigger({ type: 'manual' })
  .if('route', {
    condition: (ctx) => (ctx.event.data as { premium: boolean }).premium,
    then: [['premium-flow', premiumNode]],
    else: [['standard-flow', standardNode]],
  })
  .build();

switch

Multi-way branching. Routes to one of N outputs based on a value.

Input: { value: unknown, cases: Record<string, unknown>, default?: unknown }

Output: { matched: string | null, value: unknown }

import { switchNode } from '@flowforgejs/nodes';

workflow('route-by-type')
  .trigger({ type: 'event', event: 'ticket.created' })
  .node('route', switchNode, {
    input: (ctx) => ({
      value: (ctx.event.data as { priority: string }).priority,
      cases: {
        critical: { escalate: true, team: 'on-call' },
        high: { escalate: false, team: 'engineering' },
        low: { escalate: false, team: 'support' },
      },
      default: { escalate: false, team: 'triage' },
    }),
  })
  .build();

parallel

Fan-out items for concurrent processing with configurable concurrency.

Config: { concurrency: number } -- max concurrent executions (default: 10).

Input: { items: unknown[] }

Output: { results: unknown[], totalItems: number }

For the full fan-out pattern with sub-pipelines, use the workflow builder:

workflow('parallel-processing')
  .trigger({ type: 'manual' })
  .parallel('process-all', {
    items: (ctx) => (ctx.event.data as { urls: string[] }).urls,
    concurrency: 5,
    pipeline: (item) => [
      ['fetch', httpNode, { input: () => ({ action: 'get', url: item as string }) }],
      [
        'summarize',
        generateTextNode,
        {
          config: { model: 'gpt-4o' },
          input: (ctx) => ({
            prompt: `Summarize: ${JSON.stringify(ctx.steps)}`,
          }),
        },
      ],
    ],
  })
  .build();

forEach

Iterate over an array, running a sub-pipeline for each item with concurrency control.

Config: { concurrency: number } -- max concurrent iterations (default: 1).

Input: { items: unknown[] }

Output: { results: unknown[], processedCount: number }

workflow('process-each')
  .trigger({ type: 'event', event: 'batch.ready' })
  .forEach('process-items', {
    items: (ctx) => (ctx.event.data as { records: unknown[] }).records,
    concurrency: 3,
    pipeline: (item, index) => [
      ['transform', transformNode, { input: () => ({ data: item, index }) }],
      [
        'save',
        postgresNode,
        {
          config: { connectionId: 'db' },
          input: (ctx) => ({
            action: 'insert' as const,
            table: 'results',
            data: ctx.steps as Record<string, unknown>,
          }),
        },
      ],
    ],
  })
  .build();

while

Repeat a sub-pipeline while a condition is true. Includes a maxIterations guard to prevent infinite loops.

Config: { maxIterations: number } -- upper bound (default: 100).

Input: { condition: boolean, data?: unknown }

workflow('polling')
  .trigger({ type: 'manual' })
  .while('poll-until-done', {
    condition: (ctx) => !(ctx.steps as { done?: boolean }).done,
    maxIterations: 20,
    pipeline: [
      [
        'check-status',
        httpNode,
        {
          input: () => ({ action: 'get', url: 'https://api.example.com/status' }),
        },
      ],
    ],
  })
  .build();

wait-for-event

Pause workflow execution and wait for a specific event to arrive. Supports matching criteria and timeout.

Input:

Field Type Required Description
event string Yes Event type to wait for
match Record<string, unknown> No Matching criteria for the event payload
timeout number No Timeout in milliseconds

Config: { defaultTimeout: number } -- default 300,000ms (5 minutes).

Output: { event: string, data: unknown, receivedAt: string, timedOut: boolean }

import { waitForEventNode } from '@flowforgejs/nodes';

workflow('approval-flow')
  .trigger({ type: 'event', event: 'deployment.requested' })
  .node('deploy', deployNode, {
    /* ... */
  })
  .node('wait-for-verification', waitForEventNode, {
    config: { defaultTimeout: 600_000 },
    input: () => ({
      event: 'deployment.verified',
      match: { environment: 'production' },
    }),
  })
  .build();

sub-workflow

Trigger another workflow as a sub-workflow via event emission. Optionally wait for its completion.

Input:

Field Type Required Description
workflowId string Yes Target workflow identifier
data unknown No Data to pass to the sub-workflow
waitForCompletion boolean No Wait for the sub-workflow to finish (default: false)

Config: { eventPrefix: string, timeout: number }

import { subWorkflowNode } from '@flowforgejs/nodes';

workflow('orchestrator')
  .trigger({ type: 'manual' })
  .node('run-etl', subWorkflowNode, {
    config: { eventPrefix: 'workflow:trigger', timeout: 300_000 },
    input: () => ({
      workflowId: 'etl-pipeline',
      data: { source: 'orders', date: '2025-01-01' },
      waitForCompletion: true,
    }),
  })
  .node('run-report', subWorkflowNode, {
    input: (ctx) => ({
      workflowId: 'generate-report',
      data: ctx.steps,
      waitForCompletion: true,
    }),
  })
  .build();

human-approval

Pause workflow execution and wait for a human to approve or reject an action. This is the primary human-in-the-loop mechanism in FlowForge.

Input:

Field Type Required Description
action string Yes Description of what needs approval
details unknown No Detailed payload for reviewer inspection
urgency 'low' \| 'normal' \| 'high' \| 'critical' No Urgency level (default: normal)
timeout number No Override timeout in milliseconds

Config:

Field Type Default Description
defaultTimeout number 3,600,000 (1 hour) Default approval timeout
autoApprove boolean false Skip approval, auto-approve (for dev/testing)
autoReject boolean false Skip approval, auto-reject (for CI/headless)

Output: { approved: boolean, approvedBy?: string, reason?: string, respondedAt: string, timedOut: boolean, originalAction: string, details?: unknown }

import { humanApprovalNode } from '@flowforgejs/nodes';

workflow('deploy-production')
  .trigger({ type: 'event', event: 'deploy.requested' })
  .node('approval', humanApprovalNode, {
    config: {
      defaultTimeout: 1_800_000, // 30 minutes
      autoApprove: process.env.NODE_ENV === 'development',
      autoReject: process.env.CI === 'true',
    },
    input: (ctx) => ({
      action: 'Deploy to production',
      details: ctx.event.data,
      urgency: 'high' as const,
    }),
  })
  .if('check-approval', {
    condition: (ctx) => (ctx.steps as { approval: { approved: boolean } }).approval.approved,
    then: [['deploy', deployNode]],
    else: [
      [
        'notify-rejected',
        slackNode,
        {
          config: { connectionId: 'slack' },
          input: () => ({
            action: 'sendMessage' as const,
            channel: '#deploys',
            text: 'Production deploy was rejected.',
          }),
        },
      ],
    ],
  })
  .build();

Timeout behavior

If a human-approval times out, the node returns approved: false and timedOut: true. Your workflow should handle this case explicitly.

Events

The node emits a human-approval.requested event that external systems (dashboards, Slack bots, email) can subscribe to. It then waits for a human-approval.response event containing { approved, approvedBy?, reason? }.


DAG Execution Model

When workflow steps declare dependsOn arrays, the runner builds a DAG and resolves execution order via topological sort. Steps in the same layer run in parallel:

workflow('dag-example')
  .trigger({ type: 'manual' })
  .node('fetch-a', fetchNode, {
    /* ... */
  })
  .node('fetch-b', fetchNode, {
    /* ... */
  })
  .node('combine', combineNode, {
    dependsOn: ['fetch-a', 'fetch-b'],
    input: (ctx) => ({
      a: (ctx.steps as Record<string, unknown>)['fetch-a'],
      b: (ctx.steps as Record<string, unknown>)['fetch-b'],
    }),
  })
  .build();

In this example, fetch-a and fetch-b run in parallel (no dependencies), and combine runs only after both complete.