---
name: Certyo + MongoDB Integration
version: 1.0.0
description: Generate MongoDB → Certyo integration code with Change Streams, polling, field mappings, auth, and working examples
api_base: https://www.certyos.com
auth: X-API-Key header
last_updated: 2026-04-14
---

# Certyo + MongoDB Integration Skill

This skill generates production-ready Node.js and Python code to integrate MongoDB with Certyo's blockchain-backed authenticity platform. It uses Change Streams for real-time CDC or timestamp-based polling to detect document changes and ingest them into Certyo for immutable anchoring on Polygon.

## Certyo API Reference

All requests require the `X-API-Key` header for authentication.

### Endpoints

| Method | Path | Description | Response |
|--------|------|-------------|----------|
| `POST` | `/api/v1/records` | Ingest a single record | `202 Accepted` |
| `POST` | `/api/v1/records/bulk` | Ingest up to 1000 records | `202 Accepted` |
| `POST` | `/api/v1/verify/record` | Verify blockchain anchoring | `200 OK` |
| `GET` | `/api/v1/records` | Query records | `200 OK` |

### Record Payload

```json
{
  "tenantId": "string (required)",
  "database": "string (required)",
  "collection": "string (required)",
  "recordId": "string (required)",
  "recordPayload": { "...any JSON (required)" },
  "clientId": "string (optional)",
  "recordVersion": "string (optional, default '1')",
  "operationType": "upsert|insert|update|delete (optional)",
  "sourceTimestamp": "ISO 8601 (optional)",
  "idempotencyKey": "string (optional)"
}
```
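
A concrete payload for a hypothetical `products` document might look like this (field values are illustrative; the `idempotencyKey` format is one convention, not mandated by the API):

```json
{
  "tenantId": "acme-corp",
  "database": "productdb",
  "collection": "products",
  "recordId": "665a1b2c3d4e5f6a7b8c9d0e",
  "recordPayload": { "name": "Widget", "sku": "W-100", "updatedAt": "2026-04-14T10:29:55Z" },
  "recordVersion": "1",
  "operationType": "insert",
  "sourceTimestamp": "2026-04-14T10:29:55Z",
  "idempotencyKey": "mongo-products-665a1b2c3d4e5f6a7b8c9d0e-8264f3a1"
}
```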

### Ingestion Response (202 Accepted)

```json
{
  "recordId": "665a1b2c3d4e5f6a7b8c9d0e",
  "recordHash": "sha256:ab12cd34...",
  "tenantId": "acme-corp",
  "acceptedAt": "2026-04-14T10:30:00Z",
  "idempotencyReplayed": false
}
```

### Pipeline Timing

Records flow through: Kafka -> Accumulate (1000 records or 60s) -> Merkle tree -> IPFS pin -> Polygon anchor. Total anchoring latency is approximately 60-90 seconds after accumulation flush.

## Integration Patterns

### Pattern 1: Change Streams (Preferred)

```
MongoDB collection with replica set
  -> Node.js watcher calls collection.watch() with resumeAfter token
    -> Receives real-time insert/update/replace/delete events
      -> Maps document to Certyo record payload
        -> POST /api/v1/records (or /bulk for batched mode)
          -> Persists resume token in _certyo_cursors collection
            -> Verification worker updates source document with certyo_status
```

### Pattern 2: Timestamp-Based Polling

```
MongoDB collection with updatedAt field
  -> Python script polls every N seconds with find({ updatedAt: { $gte: lastSync } })
    -> Maps each document to Certyo record payload
      -> POST /api/v1/records (or /bulk if > 10 docs)
        -> Persists lastSyncAt in _certyo_cursors collection
```

### Pattern 3: Debezium CDC

```
MongoDB replica set
  -> Debezium MongoDB connector reads oplog via Change Streams
    -> Publishes to Kafka topic (certyo.productdb.products)
      -> Consumer reads Kafka, maps to Certyo payload
        -> POST /api/v1/records
```

## Authentication

### MongoDB Connection

```javascript
// Node.js
const MONGO_URI = "mongodb+srv://certyo_reader:PASSWORD@cluster0.abc123.mongodb.net/?retryWrites=true&w=majority";

// Python
MONGO_URI = "mongodb+srv://certyo_reader:PASSWORD@cluster0.abc123.mongodb.net/?retryWrites=true&w=majority"
```

For local development with a replica set:

```
mongodb://localhost:27017/?replicaSet=rs0
```

### Certyo API

```json
{
  "CERTYO_BASE_URL": "https://www.certyos.com",
  "CERTYO_API_KEY": "cty_xxxxxxxxxxxxxxxxxxxx",
  "CERTYO_TENANT_ID": "acme-corp"
}
```

Store the API key in environment variables or a secrets manager. Never commit credentials to source control.

## Field Mapping

| MongoDB Field | Certyo Field | Notes |
|--------------|-------------|-------|
| `_id` (ObjectId) | `recordId` | Cast to string via `.toString()` |
| Collection name | `collection` | e.g. `"products"` |
| Database name | `database` | e.g. `"productdb"` |
| Full document (JSON) | `recordPayload` | Sanitize ObjectIds and Dates to strings |
| `clusterTime` | `sourceTimestamp` | From change event; convert to ISO 8601 |
| `operationType` | `operationType` | `replace` is mapped to `update`; see Operation Mapping below |
| `_id` + resume token data | `idempotencyKey` | Ensures deduplication on retry |

### Operation Mapping

| Change Stream `operationType` | Certyo `operationType` |
|------------------------------|----------------------|
| `insert` | `insert` |
| `update` | `update` |
| `replace` | `update` |
| `delete` | `delete` |
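
The table above can be expressed as a small lookup. This is a sketch; the `"upsert"` fallback for unrecognized operations is an assumption, not part of the Certyo spec:

```javascript
// Map a Change Stream operationType to the Certyo operationType per the table.
// Unknown operations fall back to "upsert" (an assumption, not Certyo-specified).
const OPERATION_MAP = {
  insert: "insert",
  update: "update",
  replace: "update",
  delete: "delete",
};

function mapOperationType(changeStreamOp) {
  return OPERATION_MAP[changeStreamOp] ?? "upsert";
}
```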

### Document Sanitization

MongoDB documents may contain BSON types that are not valid JSON. Sanitize before sending to Certyo:

- `ObjectId` -> `.toString()` (string)
- `Date` -> `.toISOString()` (ISO 8601 string)
- `Decimal128` -> `.toString()` (string)
- `Binary` -> base64 encode or hex string
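
For nested documents these conversions need to be applied recursively. A sketch that detects BSON values by their `_bsontype` tag, so it runs without the `bson` package; with real driver objects the same checks apply (this assumes `Binary#toString` accepts an encoding, as in recent `bson` releases):

```javascript
// Recursively convert BSON values to JSON-safe equivalents.
// Dates become ISO 8601 strings; Binary becomes base64; other tagged BSON
// types (ObjectId, Decimal128, Long, ...) fall back to toString().
function sanitizeValue(value) {
  if (value instanceof Date) return value.toISOString();
  if (Array.isArray(value)) return value.map(sanitizeValue);
  if (value && typeof value === "object") {
    if (value._bsontype === "Binary") return value.toString("base64");
    if (value._bsontype) return value.toString();
    const out = {};
    for (const [key, inner] of Object.entries(value)) out[key] = sanitizeValue(inner);
    return out;
  }
  return value;
}
```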

## Code Examples

### 1. Node.js Change Streams Watcher (Complete)

```javascript
/**
 * MongoDB Change Streams watcher for Certyo.
 * Watches a collection for document changes and ingests them.
 * Persists resume tokens in _certyo_cursors for crash recovery.
 *
 * Install: npm install mongodb
 * Usage:   CERTYO_API_KEY=xxx CERTYO_TENANT_ID=yyy node certyo-mongo-watcher.js
 */

const { MongoClient } = require("mongodb");

const MONGO_URI = process.env.MONGO_URI || "mongodb://localhost:27017/?replicaSet=rs0";
const DATABASE = process.env.MONGO_DATABASE || "productdb";
const COLLECTION = process.env.MONGO_COLLECTION || "products";
const CERTYO_API_KEY = process.env.CERTYO_API_KEY;
const CERTYO_TENANT_ID = process.env.CERTYO_TENANT_ID;
const CERTYO_BASE_URL = process.env.CERTYO_BASE_URL || "https://www.certyos.com";

async function main() {
  const client = new MongoClient(MONGO_URI);
  await client.connect();

  const db = client.db(DATABASE);
  const collection = db.collection(COLLECTION);
  const cursorsCollection = db.collection("_certyo_cursors");

  // Load persisted resume token
  const cursorDoc = await cursorsCollection.findOne({ _id: COLLECTION });
  const resumeAfter = cursorDoc?.resumeToken || undefined;

  const pipeline = [
    { $match: { operationType: { $in: ["insert", "update", "replace", "delete"] } } },
  ];

  const options = {
    fullDocument: "updateLookup",
    ...(resumeAfter && { resumeAfter }),
  };

  const changeStream = collection.watch(pipeline, options);

  changeStream.on("change", async (event) => {
    try {
      const record = mapChangeEvent(event);
      const result = await ingestRecord(record);

      if (result) {
        console.log(`Ingested ${record.recordId}: hash=${result.recordHash}`);
      }

      // Persist resume token
      await cursorsCollection.updateOne(
        { _id: COLLECTION },
        {
          $set: {
            resumeToken: event._id,
            lastProcessedAt: new Date(),
            lastRecordId: record.recordId,
          },
        },
        { upsert: true }
      );
    } catch (err) {
      console.error("Error processing change:", err.message);
    }
  });

  changeStream.on("error", (err) => {
    console.error("Change Stream error:", err.message);
  });

  process.on("SIGINT", async () => {
    await changeStream.close();
    await client.close();
    process.exit(0);
  });
}

function mapChangeEvent(event) {
  const operationType = event.operationType === "replace" ? "update" : event.operationType;
  const doc = event.fullDocument || {};
  const docId = event.documentKey?._id?.toString() || doc._id?.toString() || "unknown";

  return {
    tenantId: CERTYO_TENANT_ID,
    database: DATABASE,
    collection: COLLECTION,
    recordId: docId,
    recordVersion: event._id?._data || "1",
    operationType,
    recordPayload:
      operationType === "delete"
        ? { _id: docId, _deleted: true }
        : sanitizeDocument(doc),
    // clusterTime is a BSON Timestamp; its high 32 bits are epoch seconds
    sourceTimestamp: event.clusterTime
      ? new Date(event.clusterTime.getHighBits() * 1000).toISOString()
      : new Date().toISOString(),
    idempotencyKey: `mongo-${COLLECTION}-${docId}-${event._id?._data?.substring(0, 16) || Date.now()}`,
  };
}

function sanitizeDocument(doc) {
  // Shallow conversion of BSON values (ObjectId, Date, Decimal128, ...) to JSON-safe types
  const sanitized = {};
  for (const [key, value] of Object.entries(doc)) {
    if (value instanceof Date) sanitized[key] = value.toISOString();
    else if (value && typeof value === "object" && value._bsontype) sanitized[key] = value.toString();
    else sanitized[key] = value;
  }
  return sanitized;
}

async function ingestRecord(record) {
  const response = await fetch(`${CERTYO_BASE_URL}/api/v1/records`, {
    method: "POST",
    headers: { "Content-Type": "application/json", "X-API-Key": CERTYO_API_KEY },
    body: JSON.stringify(record),
  });

  if (!response.ok) {
    const body = await response.text();
    console.error(`Ingestion failed: ${response.status} ${body}`);
    return null;
  }
  return response.json();
}

main().catch(console.error);
```

### 2. Python Polling Worker

```python
"""
MongoDB polling worker for Certyo.
Queries documents modified since last sync and ingests them.

Install: pip install pymongo requests
Usage:   CERTYO_API_KEY=xxx CERTYO_TENANT_ID=yyy python certyo_mongo_poll.py
"""

import os
import time
import requests
from datetime import datetime, timezone
from pymongo import MongoClient
from bson import ObjectId

MONGO_URI = os.environ.get("MONGO_URI", "mongodb://localhost:27017/?replicaSet=rs0")
DATABASE = os.environ.get("MONGO_DATABASE", "productdb")
COLLECTION = os.environ.get("MONGO_COLLECTION", "products")
CERTYO_API_KEY = os.environ["CERTYO_API_KEY"]
CERTYO_TENANT_ID = os.environ["CERTYO_TENANT_ID"]
CERTYO_BASE_URL = os.environ.get("CERTYO_BASE_URL", "https://www.certyos.com")
POLL_INTERVAL = int(os.environ.get("POLL_INTERVAL", "30"))
BULK_THRESHOLD = 10


def get_last_sync(db):
    cursor_doc = db["_certyo_cursors"].find_one({"_id": f"poll_{COLLECTION}"})
    if cursor_doc and "lastSyncAt" in cursor_doc:
        return cursor_doc["lastSyncAt"]
    return datetime(2000, 1, 1, tzinfo=timezone.utc)


def save_last_sync(db, sync_time):
    db["_certyo_cursors"].update_one(
        {"_id": f"poll_{COLLECTION}"},
        {"$set": {"lastSyncAt": sync_time, "updatedAt": datetime.now(timezone.utc)}},
        upsert=True,
    )


def map_document(doc):
    doc_id = str(doc["_id"])
    payload = {}
    for key, value in doc.items():
        if isinstance(value, ObjectId):
            payload[key] = str(value)
        elif isinstance(value, datetime):
            payload[key] = value.isoformat()
        else:
            payload[key] = value

    return {
        "tenantId": CERTYO_TENANT_ID,
        "database": DATABASE,
        "collection": COLLECTION,
        "recordId": doc_id,
        "recordVersion": "1",
        "operationType": "upsert",
        "recordPayload": payload,
        "sourceTimestamp": doc.get("updatedAt", datetime.now(timezone.utc)).isoformat()
            if isinstance(doc.get("updatedAt"), datetime)
            else datetime.now(timezone.utc).isoformat(),
        "idempotencyKey": f"mongo-poll-{COLLECTION}-{doc_id}-"
            f"{datetime.now(timezone.utc).strftime('%Y-%m-%d')}",
    }


def ingest_records(records):
    headers = {"X-API-Key": CERTYO_API_KEY, "Content-Type": "application/json"}

    if len(records) > BULK_THRESHOLD:
        for i in range(0, len(records), 1000):
            batch = records[i : i + 1000]
            resp = requests.post(
                f"{CERTYO_BASE_URL}/api/v1/records/bulk",
                headers=headers,
                json={"tenantId": CERTYO_TENANT_ID, "records": batch},
            )
            if resp.ok:
                print(f"  Bulk ingested {len(batch)} records")
            else:
                print(f"  Bulk failed: {resp.status_code} {resp.text}")
    else:
        for record in records:
            resp = requests.post(
                f"{CERTYO_BASE_URL}/api/v1/records",
                headers=headers,
                json=record,
            )
            if resp.ok:
                result = resp.json()
                print(f"  Ingested {record['recordId']}: {result['recordHash']}")
            else:
                print(f"  Failed {record['recordId']}: {resp.status_code}")


def poll_cycle():
    client = MongoClient(MONGO_URI)
    db = client[DATABASE]
    collection = db[COLLECTION]

    last_sync = get_last_sync(db)

    docs = list(
        collection.find({"updatedAt": {"$gte": last_sync}})
        .sort("updatedAt", 1)
        .limit(5000)
    )

    if not docs:
        client.close()
        return

    print(f"Found {len(docs)} modified documents since {last_sync.isoformat()}")
    records = [map_document(doc) for doc in docs]
    ingest_records(records)
    # Advance the cursor to the newest processed document rather than "now",
    # so a cycle that hits the 5000-document limit does not skip the remainder
    save_last_sync(db, docs[-1]["updatedAt"])
    client.close()


if __name__ == "__main__":
    print(f"MongoDB polling worker started (interval: {POLL_INTERVAL}s)")
    while True:
        try:
            poll_cycle()
        except Exception as e:
            print(f"Poll cycle error: {e}")
        time.sleep(POLL_INTERVAL)
```

### 3. Debezium MongoDB Connector

```yaml
# docker-compose.yml for Debezium MongoDB CDC
version: "3.9"
services:
  zookeeper:
    image: confluentinc/cp-zookeeper:7.6.0
    environment:
      ZOOKEEPER_CLIENT_PORT: 2181

  kafka:
    image: confluentinc/cp-kafka:7.6.0
    depends_on: [zookeeper]
    ports: ["9092:9092"]
    environment:
      KAFKA_BROKER_ID: 1
      KAFKA_ZOOKEEPER_CONNECT: zookeeper:2181
      KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://kafka:29092,EXTERNAL://localhost:9092
      KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: PLAINTEXT:PLAINTEXT,EXTERNAL:PLAINTEXT
      KAFKA_INTER_BROKER_LISTENER_NAME: PLAINTEXT
      KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 1

  debezium:
    image: debezium/connect:2.5
    depends_on: [kafka]
    ports: ["8083:8083"]
    environment:
      BOOTSTRAP_SERVERS: kafka:29092
      GROUP_ID: certyo-mongo-cdc
      CONFIG_STORAGE_TOPIC: _debezium_configs
      OFFSET_STORAGE_TOPIC: _debezium_offsets
      STATUS_STORAGE_TOPIC: _debezium_status
```

Register the connector:

```bash
curl -X POST http://localhost:8083/connectors \
  -H "Content-Type: application/json" \
  -d '{
    "name": "certyo-mongo-source",
    "config": {
      "connector.class": "io.debezium.connector.mongodb.MongoDbConnector",
      "mongodb.connection.string": "mongodb://mongo:27017/?replicaSet=rs0",
      "topic.prefix": "certyo",
      "collection.include.list": "productdb.products",
      "capture.mode": "change_streams_update_full",
      "snapshot.mode": "initial"
    }
  }'
```
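
The consumer side of this pattern has to unwrap the Debezium envelope before building the Certyo payload. A sketch of that mapping, assuming the default envelope in which the MongoDB connector serializes `after` as an extended-JSON string; Kafka consumer wiring (e.g. via `kafkajs`) is omitted, and `fallbackId` is a hypothetical parameter for the id carried in the Kafka message key:

```javascript
// Map one Debezium MongoDB change event (parsed Kafka message value) to a
// Certyo record. Debezium ops: c=create, u=update, r=snapshot read, d=delete.
const DEBEZIUM_OP_MAP = { c: "insert", u: "update", r: "upsert", d: "delete" };

function mapDebeziumEvent(envelope, { tenantId, database, collection, fallbackId }) {
  const { after, op, ts_ms } = envelope.payload;
  // The MongoDB connector serializes "after" as an extended-JSON string (null on deletes)
  const doc = after ? JSON.parse(after) : null;
  // Deletes carry no "after"; the document id then comes from the Kafka
  // message key, passed in here as fallbackId
  const rawId = doc?._id?.$oid ?? doc?._id ?? fallbackId ?? "unknown";
  const recordId = String(rawId);

  return {
    tenantId,
    database,
    collection,
    recordId,
    operationType: DEBEZIUM_OP_MAP[op] ?? "upsert",
    recordPayload: doc ? { ...doc, _id: recordId } : { _id: recordId, _deleted: true },
    sourceTimestamp: new Date(ts_ms).toISOString(),
    idempotencyKey: `debezium-${collection}-${recordId}-${ts_ms}`,
  };
}
```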

### 4. Verification Worker

```javascript
// verify-and-update.js — Verify anchored records and update source documents
const { MongoClient } = require("mongodb");

const MONGO_URI = process.env.MONGO_URI || "mongodb://localhost:27017/?replicaSet=rs0";
const DATABASE = process.env.MONGO_DATABASE || "productdb";
const COLLECTION = process.env.MONGO_COLLECTION || "products";
const CERTYO_API_KEY = process.env.CERTYO_API_KEY;
const CERTYO_TENANT_ID = process.env.CERTYO_TENANT_ID;
const CERTYO_BASE_URL = process.env.CERTYO_BASE_URL || "https://www.certyos.com";

async function verifyAndUpdate() {
  const client = new MongoClient(MONGO_URI);
  await client.connect();
  const db = client.db(DATABASE);
  const collection = db.collection(COLLECTION);

  // Find documents not yet verified
  const docs = await collection
    .find({ certyo_status: { $ne: "anchored" } })
    .limit(100)
    .toArray();

  console.log(`Verifying ${docs.length} documents...`);

  for (const doc of docs) {
    const docId = doc._id.toString();
    const response = await fetch(`${CERTYO_BASE_URL}/api/v1/verify/record`, {
      method: "POST",
      headers: { "Content-Type": "application/json", "X-API-Key": CERTYO_API_KEY },
      body: JSON.stringify({
        tenantId: CERTYO_TENANT_ID,
        database: DATABASE,
        collection: COLLECTION,
        recordId: docId,
      }),
    });

    if (!response.ok) {
      console.log(`  ${docId}: verification failed (${response.status})`);
      continue;
    }

    const result = await response.json();
    if (result.verified && result.anchoredOnChain) {
      await collection.updateOne(
        { _id: doc._id },
        {
          $set: {
            certyo_status: "anchored",
            certyo_record_hash: result.recordHash,
            certyo_verified_at: new Date(),
            certyo_tx_hash: result.transactionHash,
          },
        }
      );
      console.log(`  ${docId}: anchored (tx: ${result.transactionHash})`);
    } else {
      console.log(`  ${docId}: not yet anchored`);
    }
  }

  await client.close();
}

verifyAndUpdate().catch(console.error);
```

## Verification & Write-back

### Verification Flow

1. A verification script runs periodically (e.g., every 2 minutes via cron or setInterval)
2. Queries documents where `certyo_status` is not `"anchored"`
3. Calls `POST /api/v1/verify/record` with `tenantId`, `database`, `collection`, and `recordId`
4. On success, updates the source document:
   - `certyo_status` = `"anchored"`
   - `certyo_record_hash` = hash from verification response
   - `certyo_verified_at` = current timestamp
   - `certyo_tx_hash` = Polygon transaction hash

### Write-back Fields

```javascript
// Fields added to source documents after verification
{
  certyo_status: "anchored",           // "pending" | "anchored" | "failed"
  certyo_record_hash: "sha256:ab12...", // Hash returned by ingestion API
  certyo_verified_at: ISODate("..."),   // When verification succeeded
  certyo_tx_hash: "0xabc123..."         // Polygon transaction hash
}
```

### Change Stream and Write-back Interaction

When the verification worker updates `certyo_status` fields, the Change Stream watcher will see those updates. To avoid an infinite loop, filter out changes that only affect Certyo tracking fields:

```javascript
// In mapChangeEvent, skip update events that only touch certyo_ tracking fields
// (the caller must then treat a null return as "nothing to ingest")
const CERTYO_FIELDS = ["certyo_status", "certyo_record_hash", "certyo_verified_at", "certyo_tx_hash"];

// For update events, check whether only certyo_ fields were modified
if (event.operationType === "update" && event.updateDescription) {
  const updatedFields = Object.keys(event.updateDescription.updatedFields || {});
  const onlyCertyoFields = updatedFields.length > 0 && updatedFields.every(f => CERTYO_FIELDS.includes(f));
  if (onlyCertyoFields) return null; // Skip this event
}
```

## Code Generation Rules

1. **Always persist the resume token in the `_certyo_cursors` collection.** Update the token after every successful ingestion. This ensures the watcher can resume from the correct position after a restart.

2. **Use the bulk endpoint (`POST /api/v1/records/bulk`) when the batch contains more than 10 records.** The bulk endpoint accepts up to 1000 records per call. For batches exceeding 1000, split into multiple bulk calls. For 10 or fewer, use the single-record endpoint.

3. **Change Streams require a MongoDB replica set.** Atlas deployments always have a replica set. Standalone `mongod` instances must be converted to a single-node replica set by adding `--replSet rs0` and running `rs.initiate()`.

4. **Use `fullDocument: "updateLookup"` in Change Stream options.** Without this, update events only contain the changed fields (`updateDescription`), not the full document needed for `recordPayload`.

5. **Sanitize BSON types before sending to Certyo.** Convert `ObjectId` to string, `Date` to ISO 8601, and `Decimal128` to string. The Certyo API expects standard JSON types.

6. **Implement exponential backoff on Certyo HTTP 429 (Too Many Requests) responses.** Start with a 1-second delay, doubling on each retry up to 3 retries. Never retry immediately on 429.

7. **Map `replace` operations to `update` in the Certyo `operationType`.** MongoDB Change Streams distinguish between `update` (partial) and `replace` (full document replacement), but Certyo treats both as updates.

8. **For delete events, send `{ _id: docId, _deleted: true }` as the `recordPayload`.** The full document is not available for deletes. Include the document key so the deletion is recorded with a meaningful payload.

9. **Construct idempotency keys from `_id` + resume token data.** This guarantees uniqueness per change event. If the watcher restarts and replays events, Certyo will deduplicate via the idempotency key.

10. **Exclude `certyo_*` fields from the `recordPayload`.** Fields like `certyo_status`, `certyo_record_hash`, and `certyo_verified_at` are metadata. Including them causes hash mismatches when these fields are updated by the verification worker.
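
Rule 6's backoff can be sketched as a thin wrapper around any fetch-style call. Delays and retry count follow the rule; the injectable `sleep` parameter exists only to make the sketch testable:

```javascript
// Retry a request on HTTP 429 with exponential backoff: 1s, 2s, 4s, then give up.
async function withBackoff(makeRequest, {
  retries = 3,
  baseDelayMs = 1000,
  sleep = (ms) => new Promise((resolve) => setTimeout(resolve, ms)),
} = {}) {
  for (let attempt = 0; ; attempt++) {
    const response = await makeRequest();
    if (response.status !== 429 || attempt >= retries) return response;
    await sleep(baseDelayMs * 2 ** attempt); // 1000, 2000, 4000 ms
  }
}
```

Usage: wrap the ingestion call, e.g. `await withBackoff(() => fetch(url, options))`.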
