Certyo Developer Portal

MongoDB Integration

Use MongoDB Change Streams for real-time CDC, or timestamp-based polling for simpler deployments. This guide covers building a watcher process that detects document changes and pushes them to Certyo for blockchain anchoring.

Change Streams require a replica set
MongoDB Change Streams require a replica set or sharded cluster. Atlas deployments always have a replica set. Standalone mongod instances must be converted to a single-node replica set before Change Streams will work.
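
Before starting a watcher, it can help to confirm that the deployment can serve Change Streams at all. The sketch below assumes you already hold the reply of the server's `hello` command (for example from `db.admin().command({ hello: 1 })`; older servers expose the same fields through `isMaster`). `supportsChangeStreams` is a hypothetical helper name, not part of the driver. It relies on replica set members reporting a `setName` field and mongos routers reporting `msg: "isdbgrid"`.

```javascript
// Hypothetical helper: decide from a `hello` reply whether Change Streams can work.
// Replica set members include `setName`; mongos routers report msg "isdbgrid".
function supportsChangeStreams(helloReply) {
  if (!helloReply || typeof helloReply !== "object") return false;
  const isReplicaSetMember = typeof helloReply.setName === "string";
  const isMongos = helloReply.msg === "isdbgrid";
  return isReplicaSetMember || isMongos;
}

// Example replies (shapes abbreviated for illustration)
console.log(supportsChangeStreams({ setName: "rs0", isWritablePrimary: true })); // true
console.log(supportsChangeStreams({ isWritablePrimary: true })); // false (standalone)
```

A watcher could run this check once at startup and exit with a clear message instead of failing later on the first `watch()` call.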

Overview

MongoDB Change Streams provide a real-time, ordered stream of document-level changes using the oplog. A watcher process subscribes to changes on a collection (or entire database), transforms each change event into a Certyo record, and calls the ingestion API. For environments where Change Streams are not available, a polling approach using timestamp fields provides a simpler alternative.
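
The polling alternative can be sketched as two small pure functions: one builds the query filter for documents changed since the last poll, the other advances the watermark. This is a minimal sketch that assumes every document carries an application-maintained `updatedAt` Date; the helper names `buildPollFilter` and `nextWatermark` are hypothetical.

```javascript
// Sketch of timestamp-based polling, assuming each document has an `updatedAt` Date
// that the application sets on every write (an assumption, not a MongoDB built-in).

// Build the filter for documents modified since the last poll.
function buildPollFilter(lastSeen) {
  return lastSeen ? { updatedAt: { $gt: lastSeen } } : {};
}

// Advance the watermark to the newest `updatedAt` found in the batch.
function nextWatermark(docs, lastSeen) {
  return docs.reduce(
    (latest, doc) => (!latest || doc.updatedAt > latest ? doc.updatedAt : latest),
    lastSeen || null
  );
}

// Usage with a hypothetical batch, as returned by collection.find(filter).toArray()
const batch = [
  { _id: "a", updatedAt: new Date("2024-01-01T00:00:05Z") },
  { _id: "b", updatedAt: new Date("2024-01-01T00:00:09Z") },
];
const watermark = nextWatermark(batch, new Date("2024-01-01T00:00:00Z"));
console.log(buildPollFilter(watermark));
```

With a strict `$gt`, documents written with exactly the watermark timestamp after the poll ran could be skipped; switching to `$gte` and relying on the idempotencyKey for deduplication is one way to trade duplicates for completeness.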

Architecture

Data flow
┌──────────────────────┐       oplog tailing (real-time)     ┌───────────────────────┐
│  MongoDB             │ ──── Change Stream ────────────────> │  Watcher Process      │
│  products collection │                                      │  (Node.js / Python)   │
│  (insert/update/del) │                                      └───────────┬───────────┘
└──────────────────────┘                                                  │
                                                               transform + map fields
                                                                          │
                                                                          v
                                                          ┌────────────────────────────┐
                                                          │  POST /api/v1/records      │
                                                          │  (per change event)        │
                                                          │  or POST /api/v1/records/  │
                                                          │       bulk (batched)       │
                                                          └─────────────┬──────────────┘
                                                                        │
                                                                        v
                                                               ┌─────────────────┐
                                                               │  Certyo API     │
                                                               │  (202 Accepted) │
                                                               └────────┬────────┘
                                                                        │
                                                              ~60-90s anchoring
                                                                        │
                                                                        v
                                                               ┌─────────────────┐
                                                               │  Polygon        │
                                                               │  (on-chain)     │
                                                               └─────────────────┘

Prerequisites

  • MongoDB 4.0+ with a replica set (or MongoDB Atlas, which always provides one)
  • Node.js 18+ with the mongodb driver, or Python 3.9+ with pymongo
  • A Certyo API key (obtain from your Certyo dashboard under Settings → API Keys)
  • For the Debezium approach: Docker and Docker Compose

Change Streams approach (preferred)

Change Streams provide a real-time, resumable stream of document changes. The watcher below persists its resume token in a _certyo_cursors collection so it can survive restarts without missing events.

certyo-mongo-watcher.js: Change Streams with resume token persistence (JavaScript)
/**
 * MongoDB Change Streams watcher for Certyo.
 * Watches a collection for insert/update/replace/delete events,
 * transforms them into Certyo records, and ingests via the API.
 *
 * Install: npm install mongodb (fetch is built into Node.js 18+)
 * Usage:   CERTYO_API_KEY=xxx CERTYO_TENANT_ID=yyy node certyo-mongo-watcher.js
 */

const { MongoClient } = require("mongodb");

const MONGO_URI = process.env.MONGO_URI || "mongodb://localhost:27017/?replicaSet=rs0";
const DATABASE = process.env.MONGO_DATABASE || "productdb";
const COLLECTION = process.env.MONGO_COLLECTION || "products";

const CERTYO_API_KEY = process.env.CERTYO_API_KEY;
const CERTYO_TENANT_ID = process.env.CERTYO_TENANT_ID;
const CERTYO_BASE_URL = process.env.CERTYO_BASE_URL || "https://www.certyos.com";

if (!CERTYO_API_KEY || !CERTYO_TENANT_ID) {
  console.error("CERTYO_API_KEY and CERTYO_TENANT_ID are required");
  process.exit(1);
}

async function main() {
  const client = new MongoClient(MONGO_URI);
  await client.connect();
  console.log("Connected to MongoDB");

  const db = client.db(DATABASE);
  const collection = db.collection(COLLECTION);
  const cursorsCollection = db.collection("_certyo_cursors");

  // Load persisted resume token (if any)
  const cursor = await cursorsCollection.findOne({ _id: COLLECTION });
  const resumeAfter = cursor?.resumeToken || undefined;

  const pipeline = [
    {
      $match: {
        operationType: { $in: ["insert", "update", "replace", "delete"] },
      },
    },
  ];

  const options = {
    fullDocument: "updateLookup",
    ...(resumeAfter && { resumeAfter }),
  };

  console.log(
    resumeAfter
      ? "Resuming Change Stream from saved token"
      : "Starting Change Stream from current position"
  );

  const changeStream = collection.watch(pipeline, options);

  changeStream.on("change", async (event) => {
    try {
      const record = mapChangeEvent(event);
      const result = await ingestRecord(record);

      if (!result) {
        // Ingestion failed: do not advance the resume token for this event
        return;
      }
      console.log(
        `Ingested ${record.recordId}: hash=${result.recordHash}`
      );

      // Persist resume token after successful ingestion
      await cursorsCollection.updateOne(
        { _id: COLLECTION },
        {
          $set: {
            resumeToken: event._id,
            lastProcessedAt: new Date(),
            lastRecordId: record.recordId,
          },
        },
        { upsert: true }
      );
    } catch (err) {
      console.error("Error processing change event:", err.message);
    }
  });

  changeStream.on("error", (err) => {
    console.error("Change Stream error:", err.message);
    // Resumable errors are retried by the driver internally. An error surfaced here
    // usually means the stream has closed; restart the process to resume from the saved token.
  });

  // Graceful shutdown
  process.on("SIGINT", async () => {
    console.log("Shutting down...");
    await changeStream.close();
    await client.close();
    process.exit(0);
  });
}

function mapChangeEvent(event) {
  const operationType =
    event.operationType === "replace" ? "update" : event.operationType;

  // For deletes, fullDocument is null — use documentKey
  const doc = event.fullDocument || {};
  const docId =
    event.documentKey?._id?.toString() || doc._id?.toString() || "unknown";

  return {
    tenantId: CERTYO_TENANT_ID,
    database: DATABASE,
    collection: COLLECTION,
    recordId: docId,
    recordVersion: event._id?._data || "1",
    operationType,
    recordPayload:
      operationType === "delete"
        ? { _id: docId, _deleted: true }
        : sanitizeDocument(doc),
    // clusterTime is a BSON Timestamp: the high 32 bits hold seconds since the Unix epoch
    sourceTimestamp: event.clusterTime
      ? new Date(event.clusterTime.getHighBits() * 1000).toISOString()
      : new Date().toISOString(),
    idempotencyKey: `mongo-${COLLECTION}-${docId}-${event._id?._data?.substring(0, 16) || Date.now()}`,
  };
}

function sanitizeDocument(doc) {
  // Convert MongoDB-specific types to plain JSON
  const sanitized = { ...doc };
  if (sanitized._id) {
    sanitized._id = sanitized._id.toString();
  }
  return sanitized;
}

async function ingestRecord(record) {
  const response = await fetch(`${CERTYO_BASE_URL}/api/v1/records`, {
    method: "POST",
    headers: {
      "Content-Type": "application/json",
      "X-API-Key": CERTYO_API_KEY,
    },
    body: JSON.stringify(record),
  });

  if (!response.ok) {
    const body = await response.text();
    console.error(`Ingestion failed for ${record.recordId}: ${response.status} ${body}`);
    return null;
  }

  return response.json();
}

main().catch((err) => {
  console.error("Fatal error:", err);
  process.exit(1);
});

Field mapping

Map MongoDB document fields to the Certyo record schema:

MongoDB → Certyo field mapping
MongoDB Field            Certyo Field       Notes
──────────────────────   ─────────────────  ────────────────────────────────────
_id (ObjectId)           recordId           Cast to string via .toString()
collection name          collection         e.g. "products"
database name            database           e.g. "productdb"
Full document (JSON)     recordPayload      Sanitize ObjectIds and Dates
clusterTime              sourceTimestamp    From change event; convert to ISO 8601
operationType            operationType      insert, update, replace → update, delete
_id + resume token       idempotencyKey     Ensures deduplication on retry
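
The "Sanitize ObjectIds and Dates" note can be sketched as a recursive conversion, since nested documents and arrays may also hold driver-specific types. This is a minimal sketch, not the portal's own implementation: `toPlainJson` is a hypothetical helper, and it assumes driver values expose a `_bsontype` property (as objects from the `bson` package do) and a meaningful `toString()`.

```javascript
// Sketch: recursively convert BSON-style values to plain JSON-safe values.
// Assumes driver values expose `_bsontype` (as objects from the `bson` package do).
function toPlainJson(value) {
  if (value === null || value === undefined) return value;
  if (value instanceof Date) return value.toISOString();
  if (Array.isArray(value)) return value.map(toPlainJson);
  if (typeof value === "object") {
    // ObjectId, Decimal128, Long, etc.: fall back to their string form
    if (value._bsontype) return value.toString();
    return Object.fromEntries(
      Object.entries(value).map(([key, inner]) => [key, toPlainJson(inner)])
    );
  }
  return value; // strings, numbers, booleans pass through unchanged
}

// Stand-in for a real ObjectId so the example runs without the driver installed
const fakeObjectId = { _bsontype: "ObjectId", toString: () => "65a1b2c3d4e5f60718293a4b" };
const doc = {
  _id: fakeObjectId,
  name: "Widget",
  tags: ["a"],
  audit: { createdAt: new Date(0) },
};
console.log(toPlainJson(doc));
```

Converting everything to strings keeps the payload stable across driver versions, at the cost of losing type information; whether that matters depends on how the payload is later compared or verified.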

Verification

After anchoring completes (~60-90 seconds), verify the record and write the status back to the source document. This creates an audit trail directly in MongoDB.

verify-and-update.js: Verify anchoring and update source document (JavaScript)
const { MongoClient } = require("mongodb");

const MONGO_URI = process.env.MONGO_URI || "mongodb://localhost:27017/?replicaSet=rs0";
const DATABASE = process.env.MONGO_DATABASE || "productdb";
const COLLECTION = process.env.MONGO_COLLECTION || "products";
const CERTYO_API_KEY = process.env.CERTYO_API_KEY;
const CERTYO_TENANT_ID = process.env.CERTYO_TENANT_ID;
const CERTYO_BASE_URL = process.env.CERTYO_BASE_URL || "https://www.certyos.com";

async function verifyAndUpdate() {
  const client = new MongoClient(MONGO_URI);
  await client.connect();

  const db = client.db(DATABASE);
  const collection = db.collection(COLLECTION);

  // Find documents not yet marked as anchored (up to 100 per run)
  const docs = await collection
    .find({ certyo_status: { $ne: "anchored" } })
    .limit(100)
    .toArray();

  console.log(`Verifying ${docs.length} documents...`);

  for (const doc of docs) {
    const docId = doc._id.toString();

    const response = await fetch(`${CERTYO_BASE_URL}/api/v1/verify/record`, {
      method: "POST",
      headers: {
        "Content-Type": "application/json",
        "X-API-Key": CERTYO_API_KEY,
      },
      body: JSON.stringify({
        tenantId: CERTYO_TENANT_ID,
        database: DATABASE,
        collection: COLLECTION,
        recordId: docId,
      }),
    });

    if (!response.ok) {
      console.log(`  ${docId}: verification request failed (${response.status})`);
      continue;
    }

    const result = await response.json();

    if (result.verified && result.anchoredOnChain) {
      await collection.updateOne(
        { _id: doc._id },
        {
          $set: {
            certyo_status: "anchored",
            certyo_record_hash: result.recordHash,
            certyo_verified_at: new Date(),
            certyo_tx_hash: result.transactionHash,
          },
        }
      );
      console.log(`  ${docId}: anchored (tx: ${result.transactionHash})`);
    } else {
      console.log(`  ${docId}: not yet anchored`);
    }
  }

  await client.close();
}

verifyAndUpdate().catch(console.error);

Resume token persistence

The Change Streams watcher stores its resume token in a _certyo_cursors collection in the same database. This allows the watcher to:

  • Survive restarts — On startup, the watcher loads the last resume token and continues from where it left off
  • Handle connection drops — The MongoDB driver automatically attempts to resume using the token when a connection error occurs
  • Avoid duplicates — Combined with the idempotencyKey, replayed events are deduplicated by Certyo

Token expiry

Resume tokens are only valid while the corresponding oplog entries exist. MongoDB Atlas retains oplog entries for at least 24 hours (configurable). If the watcher is offline longer than the oplog window, the resume attempt fails; recent MongoDB versions report this as a ChangeStreamHistoryLost error, while InvalidResumeToken indicates a malformed or unrecognized token. In either case, delete the saved resume token from _certyo_cursors and perform a full collection sync before restarting the watcher.
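
Detecting this condition in code can be sketched as a small classifier over the error the driver surfaces. The sketch assumes a MongoServerError-like object with a numeric `code` and a string `codeName`; `mustDiscardResumeToken` is a hypothetical helper name. Server error code 286 is ChangeStreamHistoryLost and 260 is InvalidResumeToken.

```javascript
// Sketch: classify a Change Stream error to decide whether the saved token is unusable.
// Assumes a MongoServerError-like object with numeric `code` and string `codeName`.
const CHANGE_STREAM_HISTORY_LOST = 286; // resume point is no longer in the oplog
const INVALID_RESUME_TOKEN = 260; // token is malformed or not recognized

function mustDiscardResumeToken(err) {
  if (!err) return false;
  return (
    err.code === CHANGE_STREAM_HISTORY_LOST ||
    err.code === INVALID_RESUME_TOKEN ||
    err.codeName === "ChangeStreamHistoryLost" ||
    err.codeName === "InvalidResumeToken"
  );
}

// Example error shapes (abbreviated)
console.log(mustDiscardResumeToken({ code: 286, codeName: "ChangeStreamHistoryLost" })); // true
console.log(mustDiscardResumeToken({ code: 11600, codeName: "InterruptedAtShutdown" })); // false
```

When the helper returns true, the watcher would delete its entry in _certyo_cursors and trigger a full resync rather than retrying with the same token.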


Bulk optimization

For collections with high write throughput, batch multiple change events before calling the API:

  • POST /api/v1/records/bulk accepts up to 1000 records per request
  • Buffer change events for a short window (e.g., 5 seconds) and send them as a single bulk request
  • The polling approach naturally batches — it sends all modified documents found in each cycle
  • For the Debezium approach, use a Kafka consumer with a batch listener that accumulates messages before posting to Certyo
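
The buffering described above can be sketched as a batcher that flushes when either the size limit or the time window is reached. This is a minimal sketch: `RecordBatcher` and its `sendBatch` callback are hypothetical, and the callback is where a real watcher would POST the batch to /api/v1/records/bulk (the request body shape is not shown here because this guide does not specify it).

```javascript
// Sketch of a size- and time-bounded buffer for bulk ingestion.
// `sendBatch` is a hypothetical callback that would POST to /api/v1/records/bulk.
class RecordBatcher {
  constructor(sendBatch, { maxSize = 1000, maxWaitMs = 5000 } = {}) {
    this.sendBatch = sendBatch;
    this.maxSize = maxSize;
    this.maxWaitMs = maxWaitMs;
    this.buffer = [];
    this.timer = null;
  }

  add(record) {
    this.buffer.push(record);
    if (this.buffer.length >= this.maxSize) {
      return this.flush(); // size limit reached: send immediately
    }
    if (!this.timer) {
      // First record of a new window: schedule a time-based flush
      this.timer = setTimeout(() => {
        Promise.resolve(this.flush()).catch((err) =>
          console.error("Bulk flush failed:", err.message)
        );
      }, this.maxWaitMs);
    }
    return undefined;
  }

  flush() {
    if (this.timer) {
      clearTimeout(this.timer);
      this.timer = null;
    }
    if (this.buffer.length === 0) return undefined;
    const batch = this.buffer;
    this.buffer = [];
    return this.sendBatch(batch);
  }
}

// Usage with a stand-in sender that only records what it was given
const sent = [];
const batcher = new RecordBatcher((batch) => sent.push(batch), { maxSize: 2 });
batcher.add({ recordId: "1" });
batcher.add({ recordId: "2" }); // reaches maxSize, triggers a flush
batcher.add({ recordId: "3" });
batcher.flush(); // e.g. during graceful shutdown
console.log(sent.map((b) => b.length)); // [ 2, 1 ]
```

When batching, persist the resume token of the last event in a batch only after that batch has been accepted, so a crash replays the unsent events instead of skipping them.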

Troubleshooting

  • "MongoError: The $changeStream stage is only supported on replica sets" — Your MongoDB instance is running as a standalone. Convert to a single-node replica set: add --replSet rs0 to your mongod startup and run rs.initiate() in the shell.
  • Missing updates in Change Stream — Ensure fullDocument: "updateLookup" is set in the watch options. Without it, update events only contain the changed fields, not the full document.
  • Resume token invalid — The watcher was offline longer than the oplog retention window. Delete the token from _certyo_cursors and restart. Consider a full collection resync.
  • Polling misses deletes — The polling approach only catches documents that exist and have an updatedAt field. It cannot detect deletes. Use Change Streams or soft deletes for full coverage.
  • Duplicate records in Certyo — The idempotencyKey prevents true duplicates. If you see apparent duplicates, check that different operations on the same document (insert then update) are generating distinct keys.
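
For the duplicate-key check in the last item, one option is to derive the key from the full resume token rather than a truncated prefix, since tokens for events close together in time may share a long common prefix. The sketch below hashes the token to keep the key short; `buildIdempotencyKey` and the `mongo-<collection>-<id>-<digest>` layout are assumptions for illustration, not a Certyo requirement.

```javascript
const { createHash } = require("node:crypto");

// Sketch: derive a fixed-length idempotency key from the full resume token,
// so two operations on the same document do not share a key.
// The "mongo-<collection>-<id>-<digest>" layout is an assumption for illustration.
function buildIdempotencyKey(collectionName, docId, resumeTokenData) {
  const digest = createHash("sha256")
    .update(String(resumeTokenData))
    .digest("hex")
    .slice(0, 32);
  return `mongo-${collectionName}-${docId}-${digest}`;
}

// Two made-up token strings that differ only near the end
const insertKey = buildIdempotencyKey("products", "abc123", "8265A1B2C3000000012B0229");
const updateKey = buildIdempotencyKey("products", "abc123", "8265A1B2C3000000022B0229");
console.log(insertKey !== updateKey); // true
```

Replaying the same event yields the same key, so retries stay deduplicated while distinct operations remain distinct.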

AI Integration · v1.0.0

AI Integration Skill

Download a skill file that enables AI agents to generate working MongoDB + Certyo integration code for any language or framework.

What is this?
A markdown file containing MongoDB-specific field mappings, authentication setup, code examples, and integration patterns for Certyo. Drop it into your AI agent's context and ask it to generate integration code.

What's inside

  • Authentication: MongoDB connection string and Certyo API key configuration
  • Architecture: Change Streams real-time watcher with resume token persistence
  • Field mapping: _id, collection, database to Certyo record schema
  • Code examples: Node.js Change Streams, Python polling, Debezium CDC
  • Verification: Document update with certyo_status and recordHash fields
  • Patterns: Real-time streaming, batch polling, and Debezium CDC approaches

How to use

Claude Code

Place the file in your project's .claude/commands/ directory, then use it as a slash command:

# Download the skill file
mkdir -p .claude/commands
curl -o .claude/commands/certyo-mongodb.md \
  https://www.certyos.com/developers/skills/certyo-mongodb-skill.md

# Use it in Claude Code
/certyo-mongodb "Generate a Node.js watcher that uses Change Streams to push document changes to Certyo"

Cursor / Copilot / Any AI Agent

Add the file to your project root or attach it to a conversation. The AI agent will use the MongoDB-specific patterns, field mappings, and code examples to generate correct integration code.

# Add to your project
curl -o CERTYO_MONGODB.md \
  https://www.certyos.com/developers/skills/certyo-mongodb-skill.md

# Then in your AI agent:
"Using the Certyo MongoDB spec in CERTYO_MONGODB.md,
 generate a Node.js watcher that uses Change Streams to push document changes to Certyo"

CLAUDE.md Context File

Append the skill file to your project's CLAUDE.md so every Claude conversation has MongoDB + Certyo context automatically.

# Append to your project's CLAUDE.md
echo "" >> CLAUDE.md
echo "## Certyo MongoDB Integration" >> CLAUDE.md
cat CERTYO_MONGODB.md >> CLAUDE.md