MongoDB Integration
Use MongoDB Change Streams for real-time CDC, or timestamp-based polling for simpler deployments. This guide covers building a watcher process that detects document changes and pushes them to Certyo for blockchain anchoring.
Overview
MongoDB Change Streams provide a real-time, ordered stream of document-level changes using the oplog. A watcher process subscribes to changes on a collection (or entire database), transforms each change event into a Certyo record, and calls the ingestion API. For environments where Change Streams are not available, a polling approach using timestamp fields provides a simpler alternative.
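Concretely, the watcher reduces each change event to a flat record before calling the ingestion API. A minimal sketch of that transform (field names follow the examples later in this guide; the event dict is a simplified stand-in for a real change event, which carries BSON types that must be stringified first):

```python
# Sketch: reduce a MongoDB change event to a Certyo record.
def to_certyo_record(event, tenant_id, database, collection):
    doc = event.get("fullDocument") or {}
    doc_id = str(event["documentKey"]["_id"])
    return {
        "tenantId": tenant_id,
        "database": database,
        "collection": collection,
        "recordId": doc_id,
        "operationType": event["operationType"],
        "recordPayload": doc,
    }

record = to_certyo_record(
    {"operationType": "insert",
     "documentKey": {"_id": "665f1a"},
     "fullDocument": {"_id": "665f1a", "sku": "A-100"}},
    "tenant-1", "productdb", "products",
)
print(record["recordId"], record["operationType"])  # 665f1a insert
```

The full Node.js and Python implementations below add version tracking, timestamp conversion, and idempotency keys on top of this shape.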
Architecture
┌──────────────────────┐                          ┌───────────────────────┐
│ MongoDB              │  real-time Change Stream │ Watcher Process       │
│ products collection  │ ──(oplog tailing)─────>  │ (Node.js / Python)    │
│ (insert/update/del)  │                          └───────────┬───────────┘
└──────────────────────┘                                      │
                                                   transform + map fields
                                                              │
                                                              v
                                               ┌────────────────────────────┐
                                               │ POST /api/v1/records       │
                                               │ (per change event)         │
                                               │ or POST /api/v1/records/   │
                                               │ bulk (batched)             │
                                               └──────────────┬─────────────┘
                                                              │
                                                              v
                                                     ┌─────────────────┐
                                                     │ Certyo API      │
                                                     │ (202 Accepted)  │
                                                     └────────┬────────┘
                                                              │
                                                      ~60-90s anchoring
                                                              │
                                                              v
                                                     ┌─────────────────┐
                                                     │ Polygon         │
                                                     │ (on-chain)      │
                                                     └─────────────────┘
Prerequisites
- MongoDB 4.0+ with a replica set (or MongoDB Atlas, which always provides one)
- Node.js 18+ with the mongodb driver, or Python 3.9+ with pymongo
- A Certyo API key (obtain from your Certyo dashboard under Settings → API Keys)
- For the Debezium approach: Docker and Docker Compose
Change Streams approach (preferred)
Change Streams provide a real-time, resumable stream of document changes. The watcher below persists its resume token in a _certyo_cursors collection so it can survive restarts without missing events.
/**
 * MongoDB Change Streams watcher for Certyo.
 * Watches a collection for insert/update/replace/delete events,
 * transforms them into Certyo records, and ingests via the API.
 *
 * Install: npm install mongodb (global fetch is built into Node.js 18+)
 * Usage: CERTYO_API_KEY=xxx CERTYO_TENANT_ID=yyy node certyo-mongo-watcher.js
 */
const { MongoClient } = require("mongodb");

const MONGO_URI = process.env.MONGO_URI || "mongodb://localhost:27017/?replicaSet=rs0";
const DATABASE = process.env.MONGO_DATABASE || "productdb";
const COLLECTION = process.env.MONGO_COLLECTION || "products";
const CERTYO_API_KEY = process.env.CERTYO_API_KEY;
const CERTYO_TENANT_ID = process.env.CERTYO_TENANT_ID;
const CERTYO_BASE_URL = process.env.CERTYO_BASE_URL || "https://www.certyos.com";

if (!CERTYO_API_KEY || !CERTYO_TENANT_ID) {
  console.error("CERTYO_API_KEY and CERTYO_TENANT_ID are required");
  process.exit(1);
}

async function main() {
  const client = new MongoClient(MONGO_URI);
  await client.connect();
  console.log("Connected to MongoDB");

  const db = client.db(DATABASE);
  const collection = db.collection(COLLECTION);
  const cursorsCollection = db.collection("_certyo_cursors");

  // Load persisted resume token (if any)
  const cursor = await cursorsCollection.findOne({ _id: COLLECTION });
  const resumeAfter = cursor?.resumeToken || undefined;

  const pipeline = [
    {
      $match: {
        operationType: { $in: ["insert", "update", "replace", "delete"] },
      },
    },
  ];
  const options = {
    fullDocument: "updateLookup",
    ...(resumeAfter && { resumeAfter }),
  };

  console.log(
    resumeAfter
      ? "Resuming Change Stream from saved token"
      : "Starting Change Stream from current position"
  );

  const changeStream = collection.watch(pipeline, options);

  changeStream.on("change", async (event) => {
    try {
      const record = mapChangeEvent(event);
      const result = await ingestRecord(record);
      if (result) {
        console.log(`Ingested ${record.recordId}: hash=${result.recordHash}`);
      }
      // Persist resume token after successful ingestion
      await cursorsCollection.updateOne(
        { _id: COLLECTION },
        {
          $set: {
            resumeToken: event._id,
            lastProcessedAt: new Date(),
            lastRecordId: record.recordId,
          },
        },
        { upsert: true }
      );
    } catch (err) {
      console.error("Error processing change event:", err.message);
    }
  });

  changeStream.on("error", (err) => {
    console.error("Change Stream error:", err.message);
    // The driver will attempt to resume automatically using the resume token
  });

  // Graceful shutdown
  process.on("SIGINT", async () => {
    console.log("Shutting down...");
    await changeStream.close();
    await client.close();
    process.exit(0);
  });
}

function mapChangeEvent(event) {
  const operationType =
    event.operationType === "replace" ? "update" : event.operationType;
  // For deletes, fullDocument is null — use documentKey
  const doc = event.fullDocument || {};
  const docId =
    event.documentKey?._id?.toString() || doc._id?.toString() || "unknown";
  return {
    tenantId: CERTYO_TENANT_ID,
    database: DATABASE,
    collection: COLLECTION,
    recordId: docId,
    recordVersion: event._id?._data || "1",
    operationType,
    recordPayload:
      operationType === "delete"
        ? { _id: docId, _deleted: true }
        : sanitizeDocument(doc),
    // clusterTime is a BSON Timestamp: the high 32 bits hold seconds since epoch
    sourceTimestamp: event.clusterTime
      ? new Date(event.clusterTime.getHighBits() * 1000).toISOString()
      : new Date().toISOString(),
    idempotencyKey: `mongo-${COLLECTION}-${docId}-${event._id?._data?.substring(0, 16) || Date.now()}`,
  };
}

function sanitizeDocument(doc) {
  // Convert MongoDB-specific types to plain JSON
  const sanitized = { ...doc };
  if (sanitized._id) {
    sanitized._id = sanitized._id.toString();
  }
  return sanitized;
}

async function ingestRecord(record) {
  const response = await fetch(`${CERTYO_BASE_URL}/api/v1/records`, {
    method: "POST",
    headers: {
      "Content-Type": "application/json",
      "X-API-Key": CERTYO_API_KEY,
    },
    body: JSON.stringify(record),
  });
  if (!response.ok) {
    const body = await response.text();
    console.error(`Ingestion failed for ${record.recordId}: ${response.status} ${body}`);
    return null;
  }
  return response.json();
}

main().catch((err) => {
  console.error("Fatal error:", err);
  process.exit(1);
});
Polling approach
For deployments where Change Streams are unavailable (for example, a standalone mongod without a replica set), poll on an updatedAt timestamp field instead:
"""
MongoDB polling worker for Certyo.
Queries documents modified since the last sync timestamp and ingests them.
Install: pip install pymongo requests
Usage: CERTYO_API_KEY=xxx CERTYO_TENANT_ID=yyy python certyo_mongo_poll.py
"""
import os
import time
import json
import requests
from datetime import datetime, timezone
from pymongo import MongoClient
from bson import ObjectId
MONGO_URI = os.environ.get("MONGO_URI", "mongodb://localhost:27017/?replicaSet=rs0")
DATABASE = os.environ.get("MONGO_DATABASE", "productdb")
COLLECTION = os.environ.get("MONGO_COLLECTION", "products")
CERTYO_API_KEY = os.environ["CERTYO_API_KEY"]
CERTYO_TENANT_ID = os.environ["CERTYO_TENANT_ID"]
CERTYO_BASE_URL = os.environ.get("CERTYO_BASE_URL", "https://www.certyos.com")
POLL_INTERVAL = int(os.environ.get("POLL_INTERVAL", "30"))
BULK_THRESHOLD = 10
def get_last_sync(db):
    """Load the last sync timestamp from the _certyo_cursors collection."""
    cursor_doc = db["_certyo_cursors"].find_one({"_id": f"poll_{COLLECTION}"})
    if cursor_doc and "lastSyncAt" in cursor_doc:
        return cursor_doc["lastSyncAt"]
    return datetime(2000, 1, 1, tzinfo=timezone.utc)

def save_last_sync(db, sync_time):
    """Persist the sync timestamp."""
    db["_certyo_cursors"].update_one(
        {"_id": f"poll_{COLLECTION}"},
        {
            "$set": {
                "lastSyncAt": sync_time,
                "updatedAt": datetime.now(timezone.utc),
            }
        },
        upsert=True,
    )

def map_document(doc):
    """Map a MongoDB document to a Certyo record payload."""
    doc_id = str(doc["_id"])
    # Convert ObjectId and datetime fields for JSON serialization
    payload = {}
    for key, value in doc.items():
        if isinstance(value, ObjectId):
            payload[key] = str(value)
        elif isinstance(value, datetime):
            payload[key] = value.isoformat()
        else:
            payload[key] = value
    return {
        "tenantId": CERTYO_TENANT_ID,
        "database": DATABASE,
        "collection": COLLECTION,
        "recordId": doc_id,
        "recordVersion": "1",
        "operationType": "upsert",
        "recordPayload": payload,
        "sourceTimestamp": doc.get("updatedAt", datetime.now(timezone.utc)).isoformat()
        if isinstance(doc.get("updatedAt"), datetime)
        else datetime.now(timezone.utc).isoformat(),
        "idempotencyKey": f"mongo-poll-{COLLECTION}-{doc_id}-"
        f"{datetime.now(timezone.utc).strftime('%Y-%m-%d')}",
    }

def ingest_records(records):
    """Send records to Certyo — bulk if above threshold."""
    headers = {
        "X-API-Key": CERTYO_API_KEY,
        "Content-Type": "application/json",
    }
    if len(records) > BULK_THRESHOLD:
        # Chunk into batches of 1000
        for i in range(0, len(records), 1000):
            batch = records[i : i + 1000]
            payload = {"tenantId": CERTYO_TENANT_ID, "records": batch}
            resp = requests.post(
                f"{CERTYO_BASE_URL}/api/v1/records/bulk",
                headers=headers,
                json=payload,
            )
            if resp.ok:
                print(f"  Bulk ingested {len(batch)} records")
            else:
                print(f"  Bulk failed: {resp.status_code} {resp.text}")
    else:
        for record in records:
            resp = requests.post(
                f"{CERTYO_BASE_URL}/api/v1/records",
                headers=headers,
                json=record,
            )
            if resp.ok:
                result = resp.json()
                print(f"  Ingested {record['recordId']}: {result['recordHash']}")
            else:
                print(f"  Failed {record['recordId']}: {resp.status_code}")

def poll_cycle():
    """Run one polling cycle."""
    client = MongoClient(MONGO_URI)
    db = client[DATABASE]
    collection = db[COLLECTION]
    last_sync = get_last_sync(db)
    now = datetime.now(timezone.utc)
    # Query documents modified since last sync
    query = {"updatedAt": {"$gte": last_sync}}
    docs = list(collection.find(query).sort("updatedAt", 1).limit(5000))
    if not docs:
        client.close()
        return
    print(f"Found {len(docs)} modified documents since {last_sync.isoformat()}")
    records = [map_document(doc) for doc in docs]
    ingest_records(records)
    save_last_sync(db, now)
    client.close()

if __name__ == "__main__":
    print(f"MongoDB polling worker started (interval: {POLL_INTERVAL}s)")
    print(f"Watching {DATABASE}.{COLLECTION}")
    while True:
        try:
            poll_cycle()
        except Exception as e:
            print(f"Poll cycle error: {e}")
        time.sleep(POLL_INTERVAL)
Debezium approach
For high-throughput deployments that already run Kafka, Debezium can capture MongoDB changes and publish them to a Kafka topic. The Docker Compose stack below runs ZooKeeper, Kafka, and Debezium Connect:
version: "3.9"
services:
  zookeeper:
    image: confluentinc/cp-zookeeper:7.6.0
    environment:
      ZOOKEEPER_CLIENT_PORT: 2181
  kafka:
    image: confluentinc/cp-kafka:7.6.0
    depends_on: [zookeeper]
    ports: ["9092:9092"]
    environment:
      KAFKA_BROKER_ID: 1
      KAFKA_ZOOKEEPER_CONNECT: zookeeper:2181
      KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://kafka:29092,EXTERNAL://localhost:9092
      KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: PLAINTEXT:PLAINTEXT,EXTERNAL:PLAINTEXT
      KAFKA_INTER_BROKER_LISTENER_NAME: PLAINTEXT
      KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 1
  debezium:
    image: debezium/connect:2.5
    depends_on: [kafka]
    ports: ["8083:8083"]
    environment:
      BOOTSTRAP_SERVERS: kafka:29092
      GROUP_ID: certyo-mongo-cdc
      CONFIG_STORAGE_TOPIC: _debezium_configs
      OFFSET_STORAGE_TOPIC: _debezium_offsets
      STATUS_STORAGE_TOPIC: _debezium_status
      CONFIG_STORAGE_REPLICATION_FACTOR: 1
      OFFSET_STORAGE_REPLICATION_FACTOR: 1
      STATUS_STORAGE_REPLICATION_FACTOR: 1
# After startup, register the MongoDB connector:
#
# curl -X POST http://localhost:8083/connectors -H "Content-Type: application/json" -d '{
# "name": "certyo-mongo-source",
# "config": {
# "connector.class": "io.debezium.connector.mongodb.MongoDbConnector",
# "mongodb.connection.string": "mongodb://mongo:27017/?replicaSet=rs0",
# "topic.prefix": "certyo",
# "collection.include.list": "productdb.products",
# "capture.mode": "change_streams_update_full",
# "snapshot.mode": "initial"
# }
# }'
#
# This creates a Kafka topic: certyo.productdb.products
# Write a consumer that reads from this topic and POSTs to Certyo /api/v1/records
Field mapping
Map MongoDB document fields to the Certyo record schema:
MongoDB Field          Certyo Field      Notes
────────────────────── ───────────────── ────────────────────────────────────
_id (ObjectId)         recordId          Cast to string via .toString()
collection name        collection        e.g. "products"
database name          database          e.g. "productdb"
Full document (JSON)   recordPayload     Sanitize ObjectIds and Dates
clusterTime            sourceTimestamp   From change event; convert to ISO 8601
operationType          operationType     insert, update, replace → update, delete
_id + resume token     idempotencyKey    Ensures deduplication on retry
Verification
After anchoring completes (~60-90 seconds), verify the record and write the status back to the source document. This creates an audit trail directly in MongoDB.
const { MongoClient } = require("mongodb");

const MONGO_URI = process.env.MONGO_URI || "mongodb://localhost:27017/?replicaSet=rs0";
const DATABASE = process.env.MONGO_DATABASE || "productdb";
const COLLECTION = process.env.MONGO_COLLECTION || "products";
const CERTYO_API_KEY = process.env.CERTYO_API_KEY;
const CERTYO_TENANT_ID = process.env.CERTYO_TENANT_ID;
const CERTYO_BASE_URL = process.env.CERTYO_BASE_URL || "https://www.certyos.com";

async function verifyAndUpdate() {
  const client = new MongoClient(MONGO_URI);
  await client.connect();
  const db = client.db(DATABASE);
  const collection = db.collection(COLLECTION);

  // Find documents not yet marked as anchored
  const docs = await collection
    .find({ certyo_status: { $ne: "anchored" } })
    .limit(100)
    .toArray();
  console.log(`Verifying ${docs.length} documents...`);

  for (const doc of docs) {
    const docId = doc._id.toString();
    const response = await fetch(`${CERTYO_BASE_URL}/api/v1/verify/record`, {
      method: "POST",
      headers: {
        "Content-Type": "application/json",
        "X-API-Key": CERTYO_API_KEY,
      },
      body: JSON.stringify({
        tenantId: CERTYO_TENANT_ID,
        database: DATABASE,
        collection: COLLECTION,
        recordId: docId,
      }),
    });
    if (!response.ok) {
      console.log(`  ${docId}: verification request failed (${response.status})`);
      continue;
    }
    const result = await response.json();
    if (result.verified && result.anchoredOnChain) {
      await collection.updateOne(
        { _id: doc._id },
        {
          $set: {
            certyo_status: "anchored",
            certyo_record_hash: result.recordHash,
            certyo_verified_at: new Date(),
            certyo_tx_hash: result.transactionHash,
          },
        }
      );
      console.log(`  ${docId}: anchored (tx: ${result.transactionHash})`);
    } else {
      console.log(`  ${docId}: not yet anchored`);
    }
  }
  await client.close();
}
verifyAndUpdate().catch(console.error);
Resume token persistence
The Change Streams watcher stores its resume token in a _certyo_cursors collection in the same database. This allows the watcher to:
- Survive restarts — On startup, the watcher loads the last resume token and continues from where it left off
- Handle connection drops — The MongoDB driver automatically attempts to resume using the token when a connection error occurs
- Avoid duplicates — Combined with the idempotencyKey, replayed events are deduplicated by Certyo
Token expiry
Resume tokens are only valid while the corresponding oplog entries exist. MongoDB Atlas retains oplog entries for at least 24 hours (configurable). If the watcher is offline longer than the oplog window, it will receive an InvalidResumeToken error. In that case, drop the resume token from _certyo_cursors and perform a full collection sync before restarting the watcher.
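When the token has expired, the only safe recovery is a full resync. A minimal sketch of that flow, with the MongoDB and Certyo calls injected as callables so the control flow is clear (`drop_cursor`, `fetch_all_docs`, and `ingest_bulk` are illustrative names standing in for the cursor deletion, collection scan, and bulk POST shown elsewhere in this guide):

```python
# Full-resync sketch: drop the stale cursor, then re-ingest every document
# in batches that respect the bulk endpoint's 1000-record limit.
def full_resync(drop_cursor, fetch_all_docs, ingest_bulk, batch_size=1000):
    drop_cursor()                      # delete the stale resume token
    batch, sent = [], 0
    for doc in fetch_all_docs():
        batch.append(doc)
        if len(batch) == batch_size:
            ingest_bulk(batch)
            sent += len(batch)
            batch = []
    if batch:                          # flush the final partial batch
        ingest_bulk(batch)
        sent += len(batch)
    return sent

# Dry run with fakes: 2500 docs → one drop, then batches of 1000/1000/500
calls = []
sent = full_resync(
    drop_cursor=lambda: calls.append("drop"),
    fetch_all_docs=lambda: ({"_id": i} for i in range(2500)),
    ingest_bulk=lambda b: calls.append(len(b)),
)
print(sent, calls)  # 2500 ['drop', 1000, 1000, 500]
```

Restart the watcher only after the resync completes, so the new Change Stream position covers everything ingested.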
Bulk optimization
For collections with high write throughput, batch multiple change events before calling the API:
- POST /api/v1/records/bulk accepts up to 1000 records per request
- Buffer change events for a short window (e.g., 5 seconds) and send them as a single bulk request
- The polling approach naturally batches — it sends all modified documents found in each cycle
- For the Debezium approach, use a Kafka consumer with a batch listener that accumulates messages before posting to Certyo
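The buffering described above amounts to accumulating events and flushing when either the batch cap or the time window is hit first. A sketch of that logic (a real worker would also flush on a timer and on shutdown; `send_bulk` stands in for the POST /api/v1/records/bulk call):

```python
import time

# Buffered sender sketch: flush at max_batch events or when the window elapses.
class BulkBuffer:
    def __init__(self, send_bulk, max_batch=1000, window_s=5.0, clock=time.monotonic):
        self.send_bulk, self.max_batch, self.window_s = send_bulk, max_batch, window_s
        self.clock = clock
        self.events, self.opened_at = [], None

    def add(self, event):
        if not self.events:
            self.opened_at = self.clock()   # window starts at first buffered event
        self.events.append(event)
        if len(self.events) >= self.max_batch or self.clock() - self.opened_at >= self.window_s:
            self.flush()

    def flush(self):
        if self.events:
            self.send_bulk(self.events)
            self.events = []

# Dry run: a cap of 3 turns 7 events into batches of 3, 3, and 1
batches = []
buf = BulkBuffer(batches.append, max_batch=3, window_s=9999)
for i in range(7):
    buf.add(i)
buf.flush()  # drain the remainder on shutdown
print([len(b) for b in batches])  # [3, 3, 1]
```

Flushing on shutdown (as in the dry run) matters in practice: otherwise events buffered during the final window are lost.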
Troubleshooting
- "MongoError: The $changeStream stage is only supported on replica sets" — Your MongoDB instance is running as a standalone. Convert it to a single-node replica set: add --replSet rs0 to your mongod startup command and run rs.initiate() in the shell.
- Missing updates in Change Stream — Ensure fullDocument: "updateLookup" is set in the watch options. Without it, update events only contain the changed fields, not the full document.
- Resume token invalid — The watcher was offline longer than the oplog retention window. Delete the token from _certyo_cursors and restart. Consider a full collection resync.
- Polling misses deletes — The polling approach only catches documents that exist and have an updatedAt field. It cannot detect deletes. Use Change Streams or soft deletes for full coverage.
- Duplicate records in Certyo — The idempotencyKey prevents true duplicates. If you see apparent duplicates, check that different operations on the same document (insert then update) are generating distinct keys.
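To check the last point in isolation, the key derivation used by the Node.js watcher can be reproduced as a pure function: two operations on the same document must yield distinct keys because their resume-token fragments differ (the token strings below are made-up examples, not real tokens):

```python
# Mirrors the watcher's key format: mongo-<collection>-<docId>-<tokenPrefix>.
def idempotency_key(collection, doc_id, resume_token_data):
    return f"mongo-{collection}-{doc_id}-{resume_token_data[:16]}"

insert_key = idempotency_key("products", "665f1a", "82660A1B0000000129aa")
update_key = idempotency_key("products", "665f1a", "82660A1C0000000229bb")
print(insert_key == update_key)  # False: distinct tokens give distinct keys
```

If your keys collapse to the same value for different operations (for example, because they are derived only from the document _id), Certyo will deduplicate the later event away.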
AI Integration Skill
Download a skill file that enables AI agents to generate working MongoDB + Certyo integration code for any language or framework.
What's inside
- Authentication — MongoDB connection string and Certyo API key configuration
- Architecture — Change Streams real-time watcher with resume token persistence
- Field mapping — _id, collection, database to Certyo record schema
- Code examples — Node.js Change Streams, Python polling, Debezium CDC
- Verification — Document update with certyo_status and recordHash fields
- Patterns — Real-time streaming, batch polling, and Debezium CDC approaches
How to use
Claude Code
Place the file in your project's .claude/commands/ directory, then use it as a slash command:
# Download the skill file
mkdir -p .claude/commands
curl -o .claude/commands/certyo-mongodb.md \
https://www.certyos.com/developers/skills/certyo-mongodb-skill.md
# Use it in Claude Code
/certyo-mongodb "Generate a Node.js watcher that uses Change Streams to push document changes to Certyo"
Cursor / Copilot / Any AI Agent
Add the file to your project root or attach it to a conversation. The AI agent will use the MongoDB-specific patterns, field mappings, and code examples to generate correct integration code.
# Add to your project
curl -o CERTYO_MONGODB.md \
https://www.certyos.com/developers/skills/certyo-mongodb-skill.md
# Then in your AI agent:
"Using the Certyo MongoDB spec in CERTYO_MONGODB.md,
generate a Node.js watcher that uses Change Streams to push document changes to Certyo"
CLAUDE.md Context File
Append the skill file to your project's CLAUDE.md so every Claude conversation has MongoDB + Certyo context automatically.
# Append to your project's CLAUDE.md
echo "" >> CLAUDE.md
echo "## Certyo MongoDB Integration" >> CLAUDE.md
cat CERTYO_MONGODB.md >> CLAUDE.md