
# Data Nodes

Data nodes connect workflows to databases, object stores, message brokers, and HTTP endpoints. All data nodes follow a consistent pattern: they use `ctx.pull()` for reads and `ctx.push()` for writes, routing through registered data adaptors.

## Overview

| Node | Description | Actions |
| --- | --- | --- |
| `data/postgres` | PostgreSQL queries and mutations | `query`, `insert`, `upsert`, `update`, `delete` |
| `data/redis` | Redis key-value, list, and pub/sub operations | `get`, `set`, `delete`, `lpush`, `lrange`, `publish`, `subscribe` |
| `data/mongodb` | MongoDB document operations | `find`, `findOne`, `insertOne`, `insertMany`, `updateOne`, `deleteOne` |
| `data/s3` | AWS S3 object storage | `getObject`, `putObject`, `deleteObject`, `listObjects`, `copyObject` |
| `data/http` | HTTP client for REST APIs | `get`, `post`, `put`, `patch`, `delete`, `head` |
| `data/elasticsearch` | Elasticsearch search and indexing | `search`, `index`, `update`, `delete`, `bulk` |
| `data/kafka` | Apache Kafka produce and consume | `produce`, `consume`, `createTopic` |
| `data/pinecone` | Pinecone vector database | `upsert`, `query`, `delete`, `fetch`, `listIndexes` |

## The pull/push Pattern

Every data node uses the engine's adaptor system. When a node calls `ctx.pull('postgres', params)` or `ctx.push('postgres', params)`, the engine routes the request to the registered data adaptor for that source:

```ts
// Register adaptors at engine startup
engine.registerAdaptor(postgresAdaptor);
engine.registerAdaptor(redisAdaptor);

// In a node handler, pull reads data:
const result = await ctx.pull('postgres', {
  connectionId: 'main-db',
  query: 'SELECT * FROM users WHERE active = $1',
  params: [true],
});

// push writes data:
await ctx.push('postgres', {
  connectionId: 'main-db',
  query: 'INSERT INTO logs (message) VALUES ($1)',
  params: ['event processed'],
});
```
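This page does not show the adaptor interface itself. As a rough sketch only (the `DataAdaptor` shape below is an illustrative assumption, not the engine's actual type), an adaptor pairs a source name with `pull`/`push` handlers, which makes it easy to register an in-memory fake for tests:

```ts
// Illustrative sketch: the real engine's adaptor types may differ.
interface DataAdaptor {
  // Name matched against the first argument of ctx.pull()/ctx.push()
  source: string;
  pull(params: Record<string, unknown>): Promise<unknown>;
  push(params: Record<string, unknown>): Promise<unknown>;
}

// A minimal in-memory adaptor, handy for unit-testing node handlers
const memoryStore = new Map<string, unknown>();

const memoryAdaptor: DataAdaptor = {
  source: 'memory',
  async pull(params) {
    // Read a value by key; resolves to undefined when the key is absent
    return memoryStore.get(params.key as string);
  },
  async push(params) {
    // Write a value under a key
    memoryStore.set(params.key as string, params.value);
    return { ok: true };
  },
};
```

An adaptor like this could stand in for `postgresAdaptor` during tests, so node handlers can be exercised without a live database.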

## postgres

Execute queries and mutations against a PostgreSQL database.

Input:

| Field | Type | Required | Description |
| --- | --- | --- | --- |
| `action` | `'query' \| 'insert' \| 'upsert' \| 'update' \| 'delete'` | Yes | Operation type |
| `table` | `string` | For mutations | Target table |
| `query` | `string` | For `query` | Raw SQL query |
| `params` | `unknown[]` | No | Query parameters |
| `data` | `Record<string, unknown>` | For `insert`/`upsert`/`update` | Column values |
| `where` | `Record<string, unknown>` | For `update`/`delete` | WHERE conditions |
| `conflictColumns` | `string[]` | For `upsert` | `ON CONFLICT` columns |
| `returning` | `string[]` | No | `RETURNING` clause columns |

Config: `{ connectionId: string }`

Output: `{ rows: Record<string, unknown>[], rowCount: number, command: string }`

```ts
import { postgresNode } from '@flowforgejs/nodes';

// Query
workflow('db-query')
  .trigger({ type: 'manual' })
  .node('fetch-users', postgresNode, {
    config: { connectionId: 'main-db' },
    input: () => ({
      action: 'query' as const,
      query: 'SELECT id, name, email FROM users WHERE active = $1 LIMIT $2',
      params: [true, 100],
    }),
  })
  .build();

// Upsert
workflow('db-upsert')
  .trigger({ type: 'event', event: 'user.synced' })
  .node('upsert-user', postgresNode, {
    config: { connectionId: 'main-db' },
    input: (ctx) => ({
      action: 'upsert' as const,
      table: 'users',
      data: ctx.event.data as Record<string, unknown>,
      conflictColumns: ['email'],
      returning: ['id'],
    }),
  })
  .build();
```

## redis

Perform key-value, list, and pub/sub operations on Redis.

Common actions: `get`, `set`, `delete`, `lpush`, `lrange`, `publish`, `subscribe`

```ts
import { redisNode } from '@flowforgejs/nodes';

workflow('cache')
  .trigger({ type: 'event', event: 'data.computed' })
  .node('cache-result', redisNode, {
    config: { connectionId: 'cache' },
    input: (ctx) => ({
      action: 'set',
      key: `result:${(ctx.event.data as { id: string }).id}`,
      value: JSON.stringify(ctx.event.data),
      ttl: 3600,
    }),
  })
  .build();
```

## http

General-purpose HTTP client for calling REST APIs.

Common actions: `get`, `post`, `put`, `patch`, `delete`, `head`

```ts
import { httpNode } from '@flowforgejs/nodes';

workflow('api-call')
  .trigger({ type: 'manual' })
  .node('fetch-data', httpNode, {
    config: {},
    input: () => ({
      action: 'get',
      url: 'https://api.example.com/data',
      headers: { Authorization: 'Bearer token123' },
      timeout: 10_000,
    }),
  })
  .build();
```

## mongodb

Perform document operations on MongoDB collections.

Common actions: `find`, `findOne`, `insertOne`, `insertMany`, `updateOne`, `deleteOne`

```ts
import { mongodbNode } from '@flowforgejs/nodes';

workflow('mongo-insert')
  .trigger({ type: 'event', event: 'order.placed' })
  .node('save-order', mongodbNode, {
    config: { connectionId: 'orders-db' },
    input: (ctx) => ({
      action: 'insertOne',
      collection: 'orders',
      document: ctx.event.data,
    }),
  })
  .build();
```

## s3

Interact with AWS S3 or S3-compatible object storage.

Common actions: `getObject`, `putObject`, `deleteObject`, `listObjects`, `copyObject`

```ts
import { s3Node } from '@flowforgejs/nodes';

workflow('upload')
  .trigger({ type: 'event', event: 'file.ready' })
  .node('upload-to-s3', s3Node, {
    config: { connectionId: 'assets' },
    input: (ctx) => ({
      action: 'putObject',
      bucket: 'my-bucket',
      key: `uploads/${(ctx.event.data as { filename: string }).filename}`,
      body: (ctx.event.data as { content: string }).content,
      contentType: 'application/octet-stream',
    }),
  })
  .build();
```

## elasticsearch

Search and index documents in Elasticsearch.

Common actions: `search`, `index`, `update`, `delete`, `bulk`
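This section does not yet include an example. Following the shape of the other data nodes, a search might look like the sketch below; the input field names (`index`, `query`, `size`) are assumptions modeled on common Elasticsearch client parameters, not confirmed by this page:

```ts
import { elasticsearchNode } from '@flowforgejs/nodes';

// Sketch only: field names beyond `action` are assumed, not documented here.
workflow('search-logs')
  .trigger({ type: 'manual' })
  .node('find-errors', elasticsearchNode, {
    config: { connectionId: 'logs-cluster' },
    input: () => ({
      action: 'search',
      index: 'app-logs',
      query: { match: { level: 'error' } },
      size: 50,
    }),
  })
  .build();
```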


## kafka

Produce and consume messages with Apache Kafka.

Common actions: `produce`, `consume`, `createTopic`
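This section does not yet include an example. As a hedged sketch in the style of the other data nodes (the `topic` and `messages` input fields are assumptions based on common Kafka client conventions, not confirmed by this page), producing a message from an event trigger might look like:

```ts
import { kafkaNode } from '@flowforgejs/nodes';

// Sketch only: input fields beyond `action` are assumed, not documented here.
workflow('publish-order')
  .trigger({ type: 'event', event: 'order.placed' })
  .node('emit-to-kafka', kafkaNode, {
    config: { connectionId: 'events-cluster' },
    input: (ctx) => ({
      action: 'produce',
      topic: 'orders',
      messages: [{ value: JSON.stringify(ctx.event.data) }],
    }),
  })
  .build();
```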


## pinecone

Operate on a Pinecone vector database for similarity search and retrieval.

Common actions: `upsert`, `query`, `delete`, `fetch`, `listIndexes`

```ts
import { pineconeNode } from '@flowforgejs/nodes';

workflow('semantic-search')
  .trigger({ type: 'manual' })
  .node('search-vectors', pineconeNode, {
    config: { connectionId: 'vectors' },
    input: (ctx) => ({
      action: 'query',
      vector: (ctx.steps as { embed: { embedding: number[] } }).embed.embedding,
      topK: 10,
      namespace: 'documents',
    }),
  })
  .build();
```

### Combining with embed

Pinecone pairs naturally with the `ai/embed` node: generate an embedding first, then query Pinecone for similar vectors in a two-step workflow.
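The two-step pattern could be sketched as follows; note that the `embedNode` import name, its `model`/`text` fields, and the `ctx.trigger.data` accessor are all assumptions for illustration (only the `pineconeNode` query shape appears earlier on this page):

```ts
import { embedNode, pineconeNode } from '@flowforgejs/nodes';

// Sketch only: embedNode's name and input/config fields are assumed.
workflow('embed-and-search')
  .trigger({ type: 'manual' })
  .node('embed', embedNode, {
    config: { model: 'text-embedding-3-small' },
    input: (ctx) => ({
      text: (ctx.trigger.data as { query: string }).query,
    }),
  })
  .node('search-vectors', pineconeNode, {
    config: { connectionId: 'vectors' },
    input: (ctx) => ({
      action: 'query',
      // Feed the embedding produced by the previous step into Pinecone
      vector: (ctx.steps as { embed: { embedding: number[] } }).embed.embedding,
      topK: 10,
      namespace: 'documents',
    }),
  })
  .build();
```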