---
name: Certyo + MuleSoft Anypoint Integration
version: 1.0.0
description: Generate MuleSoft Anypoint → Certyo integration code with DataWeave transforms, API-led connectivity, auth, and working examples
api_base: https://www.certyos.com
auth: X-API-Key header
last_updated: 2026-04-14
---

# Certyo + MuleSoft Anypoint Integration Skill

This skill generates production-ready MuleSoft Anypoint integrations that ingest records into Certyo's blockchain-anchored authenticity platform. It follows API-led connectivity patterns with DataWeave transforms, encrypted credential management, and robust error handling.

## Certyo API Reference

### Endpoints

| Method | Path | Description | Response |
|--------|------|-------------|----------|
| `POST` | `/api/v1/records` | Ingest a single record | `202 Accepted` |
| `POST` | `/api/v1/records/bulk` | Ingest up to 1000 records | `202 Accepted` |
| `POST` | `/api/v1/verify/record` | Verify blockchain anchoring | `200 OK` |
| `GET` | `/api/v1/records` | Query records | `200 OK` |

### Authentication

All requests require the `X-API-Key` header with a valid Certyo API key.

### Record Payload

```json
{
  "tenantId": "string (required)",
  "database": "string (required)",
  "collection": "string (required)",
  "recordId": "string (required)",
  "recordPayload": { "...any JSON (required)" },
  "clientId": "string (optional)",
  "recordVersion": "string (optional, default '1')",
  "operationType": "upsert|insert|update|delete (optional)",
  "sourceTimestamp": "ISO 8601 (optional)",
  "idempotencyKey": "string (optional)"
}
```

### Response (202 Accepted)

```json
{
  "recordId": "string",
  "recordHash": "string (SHA-256)",
  "tenantId": "string",
  "acceptedAt": "ISO 8601",
  "idempotencyReplayed": false
}
```

### Pipeline

Record ingestion flows through four stages: Kafka accumulation (flushed at 1000 records or 60 seconds), Merkle tree computation, IPFS pinning, and Polygon anchoring. End to end, this takes roughly 60-90 seconds.

## Integration Pattern

This integration follows MuleSoft's **API-led connectivity** three-layer architecture:

```
Source System API (Experience/System)
        |
   Process API (DataWeave transform + orchestration)
        |
   Certyo System API (HTTP Requester to Certyo)
```

- **Source System API**: Receives data from upstream systems (ERP, CRM, databases) via HTTP Listener, polling, or watermark-based triggers.
- **Process API**: Transforms source payloads into Certyo record format using DataWeave 2.0, applies business rules, and routes to the correct endpoint.
- **Certyo System API**: Manages the HTTP connection to Certyo, handles authentication, retries, and error responses.

## Authentication

### Anypoint Connected App (for Anypoint Platform access)

Use a Connected App with client credentials for deploying and managing the Mule application via Anypoint Platform APIs.

### Certyo API Key in Mule Secure Properties

Store the Certyo API key in an encrypted properties file using the Mule Secure Properties module. Never place API keys in plain-text `.properties` or `.yaml` files.

**`src/main/resources/config-secure.yaml`** (encrypted values):

```yaml
certyo:
  apiKey: "![encryptedValue]"
  baseUrl: "https://www.certyos.com"
  tenantId: "your-tenant-id"
```

**Encryption command** (AES algorithm with a key stored in a runtime property):

```bash
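# Encrypt the API key value with the Secure Properties Tool JAR
# (available from the MuleSoft Secure Configuration Properties documentation)
java -cp secure-properties-tool.jar com.mulesoft.tools.SecurePropertiesTool \
  string encrypt AES CBC \
  "${MULE_ENCRYPTION_KEY}" \
  "your-certyo-api-key"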
```

**Global configuration referencing secure properties:**

```xml
<secure-properties:config name="secureConfig"
    file="config-secure.yaml"
    key="${mule.encryption.key}"
    doc:name="Secure Properties Config">
    <secure-properties:encrypt algorithm="AES" mode="CBC"/>
</secure-properties:config>
```

## Field Mapping

MuleSoft connects to any source system, so the mapping is generic. Adapt the source fields to match your upstream data.

| Certyo Field | Source (Generic) | DataWeave Expression | Notes |
|---|---|---|---|
| `tenantId` | Config property | `p('certyo.tenantId')` | From externalized properties; use `p('secure::certyo.tenantId')` if the value lives only in `config-secure.yaml` |
| `database` | Source system name | `vars.sourceSystem` | Set as flow variable |
| `collection` | Source entity type | `vars.entityType` | e.g., "orders", "invoices" |
| `recordId` | Source record ID | `payload.id` | Unique in source system |
| `recordPayload` | Full source record | `payload` | Entire source payload |
| `clientId` | Anypoint client ID | `p('anypoint.clientId')` | Optional traceability |
| `operationType` | Derived from trigger | `vars.operationType` | "insert", "update", etc. |
| `sourceTimestamp` | Source timestamp | `payload.lastModified` | ISO 8601 format |
| `idempotencyKey` | Composite key | `vars.sourceSystem ++ ":" ++ payload.id ++ ":" ++ payload.version` | Prevents duplicates |

## Code Examples

### DataWeave 2.0 Transform: Source Record to Certyo Payload

**`src/main/resources/dw/transform-to-certyo-record.dwl`**:

```dataweave
%dw 2.0
output application/json

var sourceSystem = vars.sourceSystem default "unknown"
var entityType = vars.entityType default "records"
var opType = vars.operationType default "upsert"

---
{
    tenantId: p('certyo.tenantId'),
    database: sourceSystem,
    collection: entityType,
    recordId: payload.id as String,
    recordPayload: payload,
    clientId: p('anypoint.clientId') default null,
    recordVersion: (payload.version default "1") as String,
    operationType: opType,
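    // Shift now() to UTC first: the format appends a literal 'Z', which is only correct for UTC
    sourceTimestamp: payload.lastModified
        default payload.updatedAt
        default (now() >> "UTC") as String { format: "yyyy-MM-dd'T'HH:mm:ss'Z'" },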
    idempotencyKey: sourceSystem ++ ":" ++ (payload.id as String) ++ ":" ++ (payload.version default "1") as String
}
```

### DataWeave 2.0 Transform: Bulk Records

**`src/main/resources/dw/transform-to-certyo-bulk.dwl`**:

```dataweave
%dw 2.0
output application/json

var sourceSystem = vars.sourceSystem default "unknown"
var entityType = vars.entityType default "records"

---
payload map ((item, index) -> {
    tenantId: p('certyo.tenantId'),
    database: sourceSystem,
    collection: entityType,
    recordId: item.id as String,
    recordPayload: item,
    recordVersion: (item.version default "1") as String,
    operationType: vars.operationType default "upsert",
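    // Shift now() to UTC first: the format appends a literal 'Z', which is only correct for UTC
    sourceTimestamp: item.lastModified
        default (now() >> "UTC") as String { format: "yyyy-MM-dd'T'HH:mm:ss'Z'" },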
    idempotencyKey: sourceSystem ++ ":" ++ (item.id as String) ++ ":" ++ (item.version default "1") as String
})
```
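
The bulk endpoint accepts up to 1000 records per request, so an array larger than that has to be split when it is sent outside a Batch Job. The following is a minimal sketch of one way to do that with `divideBy` from `dw::core::Arrays`. It assumes the payload is already an array of Certyo-format records (for example, the output of the bulk transform above); the variable name `maxBulkSize` is illustrative and not part of any Certyo or MuleSoft API.

```dataweave
%dw 2.0
import divideBy from dw::core::Arrays
output application/json

// Assumption: 1000 is the bulk endpoint limit listed in the endpoint table.
var maxBulkSize = 1000
---
// payload is assumed to be an array of records already in Certyo format.
// The result is an array of arrays, each holding at most maxBulkSize records.
payload divideBy maxBulkSize
```

Each inner array can then be posted to `/api/v1/records/bulk` as its own request, for instance from a `foreach` scope. For large datasets the Batch Job shown later remains the better fit, since it also tracks per-record failures.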

### Mule 4 Flow XML: Single Record Ingestion

**`src/main/mule/certyo-process-api.xml`**:

```xml
<?xml version="1.0" encoding="UTF-8"?>
<mule xmlns:http="http://www.mulesoft.org/schema/mule/http"
      xmlns:ee="http://www.mulesoft.org/schema/mule/ee/core"
      xmlns="http://www.mulesoft.org/schema/mule/core"
      xmlns:doc="http://www.mulesoft.org/schema/mule/documentation"
      xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
      xsi:schemaLocation="
        http://www.mulesoft.org/schema/mule/core
        http://www.mulesoft.org/schema/mule/core/current/mule.xsd
        http://www.mulesoft.org/schema/mule/http
        http://www.mulesoft.org/schema/mule/http/current/mule-http.xsd
        http://www.mulesoft.org/schema/mule/ee/core
        http://www.mulesoft.org/schema/mule/ee/core/current/mule-ee.xsd">

    <!-- HTTP Requester for Certyo API -->
    <http:request-config name="Certyo_HTTP_Request"
        doc:name="Certyo HTTP Request Config">
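        <!-- host takes a bare hostname (e.g. www.certyos.com), not a URL with a scheme.
             certyo.host is an assumed property; define it alongside certyo.baseUrl. -->
        <http:request-connection
            host="${certyo.host}"
            port="443"
            protocol="HTTPS">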
            <http:default-headers>
                <http:default-header key="X-API-Key"
                    value="${secure::certyo.apiKey}"/>
                <http:default-header key="Content-Type"
                    value="application/json"/>
            </http:default-headers>
        </http:request-connection>
    </http:request-config>

    <!-- HTTP Listener for Source System API -->
    <http:listener-config name="Source_HTTP_Listener"
        doc:name="Source HTTP Listener Config">
        <http:listener-connection host="0.0.0.0" port="8081"/>
    </http:listener-config>

    <!-- Main flow: Receive record and send to Certyo -->
    <flow name="certyo-ingest-single-record-flow"
        doc:name="Certyo Single Record Ingestion">

        <http:listener config-ref="Source_HTTP_Listener"
            path="/api/records" allowedMethods="POST"
            doc:name="HTTP Listener - Receive Record"/>

        <set-variable variableName="sourceSystem"
            value="#[attributes.headers.'x-source-system' default 'external']"
            doc:name="Set Source System"/>

        <set-variable variableName="entityType"
            value="#[attributes.headers.'x-entity-type' default 'records']"
            doc:name="Set Entity Type"/>

        <set-variable variableName="operationType"
            value="#[attributes.headers.'x-operation-type' default 'upsert']"
            doc:name="Set Operation Type"/>

        <!-- Transform to Certyo record format -->
        <ee:transform doc:name="Transform to Certyo Record">
            <ee:message>
                <ee:set-payload resource="dw/transform-to-certyo-record.dwl"/>
            </ee:message>
        </ee:transform>

        <logger level="INFO"
            category="com.certyo.integration"
            message="#['Ingesting record: ' ++ payload.recordId ++ ' to Certyo']"
            doc:name="Log Ingestion Start"/>

        <!-- Send to Certyo API -->
        <http:request config-ref="Certyo_HTTP_Request"
            method="POST" path="/api/v1/records"
            doc:name="POST Record to Certyo">
            <http:response-validator>
                <http:success-status-code-validator values="202"/>
            </http:response-validator>
        </http:request>

        <logger level="INFO"
            category="com.certyo.integration"
            message="#['Record accepted: ' ++ payload.recordId ++ ' hash: ' ++ payload.recordHash]"
            doc:name="Log Ingestion Success"/>

        <!-- Error handling -->
        <error-handler>
            <on-error-propagate type="HTTP:UNAUTHORIZED"
                doc:name="Auth Error">
                <set-payload value='#[{"error": "Invalid Certyo API key", "status": 401}]'
                    mimeType="application/json"
                    doc:name="Auth Error Response"/>
                <logger level="ERROR"
                    category="com.certyo.integration"
                    message="Certyo authentication failed. Check API key in secure properties."
                    doc:name="Log Auth Error"/>
            </on-error-propagate>

            <on-error-propagate type="HTTP:CONNECTIVITY, HTTP:TIMEOUT"
                doc:name="Connectivity Error">
                <set-variable variableName="retryCount"
                    value="#[(vars.retryCount default 0) + 1]"
                    doc:name="Increment Retry"/>
                <logger level="WARN"
                    category="com.certyo.integration"
                    message="#['Certyo connectivity error, retry ' ++ vars.retryCount as String]"
                    doc:name="Log Retry"/>
                <set-payload value='#[{"error": "Certyo service unavailable", "retryCount": vars.retryCount}]'
                    mimeType="application/json"
                    doc:name="Connectivity Error Response"/>
            </on-error-propagate>

            <on-error-propagate type="ANY"
                doc:name="General Error">
                <logger level="ERROR"
                    category="com.certyo.integration"
                    message="#['Unexpected error ingesting record: ' ++ error.description]"
                    doc:name="Log General Error"/>
                <set-payload value='#[{"error": error.description, "errorType": error.errorType.identifier}]'
                    mimeType="application/json"
                    doc:name="Error Response"/>
            </on-error-propagate>
        </error-handler>
    </flow>
</mule>
```

### Mule 4 Flow XML: Batch Job for Bulk Ingestion

**`src/main/mule/certyo-batch-flow.xml`**:

```xml
<?xml version="1.0" encoding="UTF-8"?>
<mule xmlns:batch="http://www.mulesoft.org/schema/mule/batch"
      xmlns:http="http://www.mulesoft.org/schema/mule/http"
      xmlns:ee="http://www.mulesoft.org/schema/mule/ee/core"
      xmlns:os="http://www.mulesoft.org/schema/mule/os"
      xmlns="http://www.mulesoft.org/schema/mule/core"
      xmlns:doc="http://www.mulesoft.org/schema/mule/documentation"
      xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
      xsi:schemaLocation="
        http://www.mulesoft.org/schema/mule/batch
        http://www.mulesoft.org/schema/mule/batch/current/mule-batch.xsd
        http://www.mulesoft.org/schema/mule/http
        http://www.mulesoft.org/schema/mule/http/current/mule-http.xsd
        http://www.mulesoft.org/schema/mule/ee/core
        http://www.mulesoft.org/schema/mule/ee/core/current/mule-ee.xsd
        http://www.mulesoft.org/schema/mule/os
        http://www.mulesoft.org/schema/mule/os/current/mule-os.xsd
        http://www.mulesoft.org/schema/mule/core
        http://www.mulesoft.org/schema/mule/core/current/mule.xsd">

    <!-- Object Store for tracking batch results -->
    <os:object-store name="certyo_batch_store"
        doc:name="Certyo Batch Object Store"
        persistent="true" entryTtl="86400" entryTtlUnit="SECONDS"/>

    <!-- Batch job: process large datasets in chunks of 1000 -->
    <flow name="certyo-batch-trigger-flow"
        doc:name="Batch Ingestion Trigger">

        <http:listener config-ref="Source_HTTP_Listener"
            path="/api/records/batch" allowedMethods="POST"
            doc:name="HTTP Listener - Batch Trigger"/>

        <logger level="INFO"
            category="com.certyo.integration.batch"
            message="#['Starting batch ingestion: ' ++ sizeOf(payload) ++ ' records']"
            doc:name="Log Batch Start"/>

        <batch:job jobName="certyo-bulk-ingestion-batch"
            maxFailedRecords="-1"
            doc:name="Certyo Bulk Ingestion Batch">

            <batch:process-records>
                <!-- Step 1: Transform each record -->
                <batch:step name="transform-records"
                    doc:name="Transform Records">
                    <ee:transform doc:name="Transform Single Record">
                        <ee:message>
                            <ee:set-payload resource="dw/transform-to-certyo-record.dwl"/>
                        </ee:message>
                    </ee:transform>
                </batch:step>

                <!-- Step 2: Aggregate into chunks of 1000 and POST bulk -->
                <batch:step name="send-to-certyo"
                    doc:name="Send to Certyo">
                    <batch:aggregator size="1000"
                        doc:name="Aggregate 1000 Records">
                        <logger level="INFO"
                            category="com.certyo.integration.batch"
                            message="#['Sending bulk batch of ' ++ sizeOf(payload) ++ ' records']"
                            doc:name="Log Chunk"/>

                        <!-- Serialize the aggregated records into one JSON array body -->
                        <ee:transform doc:name="Build Bulk Body">
                            <ee:message>
                                <ee:set-payload><![CDATA[%dw 2.0
output application/json
---
payload]]></ee:set-payload>
                            </ee:message>
                        </ee:transform>

                        <http:request config-ref="Certyo_HTTP_Request"
                            method="POST" path="/api/v1/records/bulk"
                            doc:name="POST Bulk to Certyo">
                            <http:response-validator>
                                <http:success-status-code-validator values="202"/>
                            </http:response-validator>
                        </http:request>
                    </batch:aggregator>
                </batch:step>
            </batch:process-records>

            <batch:on-complete>
                <logger level="INFO"
                    category="com.certyo.integration.batch"
                    message="#['Batch complete. Processed: ' ++ payload.processedRecords
                        ++ ' Successful: ' ++ payload.successfulRecords
                        ++ ' Failed: ' ++ payload.failedRecords]"
                    doc:name="Log Batch Complete"/>
            </batch:on-complete>
        </batch:job>
    </flow>
</mule>
```

### Retry Configuration with Until-Successful

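For critical record ingestion that must not be lost, wrap the HTTP request in an `until-successful` scope. The scope retries on any error at the fixed interval set by `millisBetweenRetries`, and raises `MULE:RETRY_EXHAUSTED` once `maxRetries` is spent: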

```xml
<until-successful maxRetries="5"
    millisBetweenRetries="2000"
    doc:name="Retry Certyo Ingestion">
    <http:request config-ref="Certyo_HTTP_Request"
        method="POST" path="/api/v1/records"
        doc:name="POST Record to Certyo (with retry)">
        <http:response-validator>
            <http:success-status-code-validator values="202"/>
        </http:response-validator>
    </http:request>
</until-successful>
```

### Dead Letter Queue (DLQ) Flow

Route permanently failed records to a DLQ for manual review:

```xml
<flow name="certyo-dlq-flow" doc:name="Certyo DLQ Handler">
    <!-- Flow variables are not carried across a VM queue, so the publisher is
         assumed to send an envelope: { record, errorDescription, retryCount } -->
    <vm:listener queueName="certyo-dlq" config-ref="VM_Config"
        doc:name="VM Listener - DLQ"/>

    <logger level="ERROR"
        category="com.certyo.integration.dlq"
        message="#['DLQ record: ' ++ (payload.record.recordId default 'unknown')
            ++ ' error: ' ++ (payload.errorDescription default 'unknown')]"
        doc:name="Log DLQ Entry"/>

    <os:store key="#[payload.record.recordId]"
        objectStore="certyo_batch_store"
        doc:name="Store Failed Record">
        <os:value>#[output application/json --- {
            record: payload.record,
            error: payload.errorDescription,
            failedAt: (now() >> "UTC") as String {format: "yyyy-MM-dd'T'HH:mm:ss'Z'"},
            retryCount: payload.retryCount default 0
        }]</os:value>
    </os:store>
</flow>
```

## Verification & Write-back

After records are anchored (~60-90 seconds), verify and write the blockchain proof back to the source system.

### Verification Flow

```xml
<flow name="certyo-verification-flow"
    doc:name="Certyo Verification Polling">

    <!-- Poll every 2 minutes for recently ingested records -->
    <scheduler doc:name="Verification Scheduler">
        <scheduling-strategy>
            <fixed-frequency frequency="2" timeUnit="MINUTES"/>
        </scheduling-strategy>
    </scheduler>

    <!-- Query records pending verification from local store -->
    <os:retrieve-all objectStore="certyo_pending_verification_store"
        doc:name="Get Pending Records"/>

    <!-- os:retrieve-all returns a key/value map, so iterate over its values -->
    <foreach collection="#[payload pluck $]" doc:name="For Each Pending Record">
        <!-- Keep the record ID: later steps replace the payload with HTTP responses -->
        <set-variable variableName="sourceRecordId"
            value="#[payload.recordId]"
            doc:name="Set Source Record ID"/>

        <ee:transform doc:name="Build Verify Request">
            <ee:message>
                <ee:set-payload><![CDATA[%dw 2.0
output application/json
---
{
    tenantId: payload.tenantId,
    recordId: payload.recordId,
    recordHash: payload.recordHash
}]]></ee:set-payload>
            </ee:message>
        </ee:transform>

        <http:request config-ref="Certyo_HTTP_Request"
            method="POST" path="/api/v1/verify/record"
            doc:name="Verify Record">
            <http:response-validator>
                <http:success-status-code-validator values="200"/>
            </http:response-validator>
        </http:request>

        <choice doc:name="Check Verification Result">
            <when expression="#[payload.verified == true]">
                <logger level="INFO"
                    category="com.certyo.integration.verify"
                    message="#['Record ' ++ vars.sourceRecordId ++ ' verified on-chain']"
                    doc:name="Log Verified"/>

                <!-- Write back to source system -->
                <flow-ref name="certyo-writeback-sub-flow"
                    doc:name="Write Back to Source"/>

                <!-- Remove from pending store (payload is now the write-back response) -->
                <os:remove key="#[vars.sourceRecordId]"
                    objectStore="certyo_pending_verification_store"
                    doc:name="Remove from Pending"/>
            </when>
            <otherwise>
                <logger level="DEBUG"
                    category="com.certyo.integration.verify"
                    message="#['Record ' ++ vars.sourceRecordId ++ ' not yet anchored, will retry']"
                    doc:name="Log Not Yet Anchored"/>
            </otherwise>
        </choice>
    </foreach>
</flow>

<!-- Sub-flow: Write blockchain proof back to source system -->
<sub-flow name="certyo-writeback-sub-flow"
    doc:name="Write Back Blockchain Proof">
    <ee:transform doc:name="Build Write-back Payload">
        <ee:message>
            <ee:set-payload><![CDATA[%dw 2.0
output application/json
---
{
    certyo_verified: true,
    certyo_record_hash: payload.recordHash,
    certyo_merkle_root: payload.merkleRoot default null,
    certyo_anchor_tx: payload.transactionHash default null,
    certyo_verified_at: (now() >> "UTC") as String {format: "yyyy-MM-dd'T'HH:mm:ss'Z'"}
}]]></ee:set-payload>
        </ee:message>
    </ee:transform>

    <http:request config-ref="Source_System_HTTP_Request"
        method="PATCH"
        path="#['/api/records/' ++ vars.sourceRecordId ++ '/certyo-status']"
        doc:name="PATCH Source System Record"/>
</sub-flow>
```
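
The verification flow reads from `certyo_pending_verification_store`, but nothing above shows how entries get there. One possible approach, sketched here under assumptions, is to build a small entry from the `202 Accepted` response right after ingestion and store it under the record ID. The field names come from the response body documented earlier; the idea of storing exactly these four fields is an assumption, not a Certyo requirement.

```dataweave
%dw 2.0
output application/json
---
// payload is assumed to be the 202 response body from POST /api/v1/records.
// Only the fields the verification request needs are kept, plus acceptedAt
// so stale entries can be spotted later.
{
    tenantId: payload.tenantId,
    recordId: payload.recordId,
    recordHash: payload.recordHash,
    acceptedAt: payload.acceptedAt
}
```

In the ingestion flow, the result would be written with an Object Store `store` operation keyed by `payload.recordId`, so the verification flow can later remove the entry by the same key.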

## Code Generation Rules

1. **Use API-led three-layer architecture.** Always separate Source System API, Process API, and Certyo System API into distinct Mule applications or at minimum distinct flows. Never combine source-system polling and Certyo API calls in a single flow without a process layer.

2. **Store the Certyo API key in Secure Properties (encrypted), never in plain-text properties files.** Use `secure-properties:config` with AES/CBC encryption. Reference values with `${secure::certyo.apiKey}`. Never hard-code API keys in flow XML.

3. **Use DataWeave 2.0 for all data transformations.** Never use Set Payload with inline expressions for building JSON. Always create `.dwl` files in `src/main/resources/dw/` and reference them with `resource="dw/filename.dwl"`. This keeps transforms testable and maintainable.

4. **Implement `on-error-propagate` with retry for HTTP errors.** Use `until-successful` for transient errors (HTTP:CONNECTIVITY, HTTP:TIMEOUT). It retries at the fixed interval set by `millisBetweenRetries` and has no built-in exponential backoff. Use `on-error-propagate` to catch and categorize errors. Route permanently failed records to a DLQ (VM queue or Object Store).

5. **Use Batch Job for datasets exceeding 100 records.** Configure `batch:aggregator` with `size="1000"` to match the Certyo bulk endpoint limit. Set `maxFailedRecords="-1"` to process all records even if some fail. Log batch completion stats in `batch:on-complete`.

6. **Log with a custom logger category (`com.certyo.integration`).** Always specify `category` on every `<logger>` element. Use INFO for successful operations, WARN for retries, ERROR for failures. Include `recordId` and `recordHash` in log messages for traceability.

7. **Use externalized properties for all URLs, tenant IDs, and configuration values.** Never hard-code `https://www.certyos.com` or tenant IDs in flow XML. Use `${certyo.baseUrl}` and `${certyo.tenantId}` from properties files, with environment-specific overrides via `config-${env}.yaml`.

8. **Validate payloads before sending to Certyo.** Add a DataWeave validation step that checks all required fields (`tenantId`, `database`, `collection`, `recordId`, `recordPayload`) are present and non-null. Route invalid records to an error flow rather than sending them to the API.

9. **Use Object Store for state management across flows.** Track pending verifications, batch results, and DLQ entries in persistent Object Stores with appropriate TTLs. Never rely on in-memory state for cross-flow coordination.

10. **Generate `mule-artifact.json` with correct classloader isolation.** Declare the required modules (HTTP, Secure Properties, VM, Object Store) as dependencies in `pom.xml`; Batch ships with the Mule EE runtime and needs no separate dependency. Set `minMuleVersion` to `4.4.0` or higher in `mule-artifact.json`. Include MUnit test scaffolding for every flow.
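
Rule 8 can be sketched in DataWeave as a script that splits an array of candidate records into valid and invalid groups. This is an illustration under assumptions rather than a prescribed implementation: the output shape (`valid` / `invalid`) and the helper name `missingFields` are hypothetical, and the required field list is taken from the Record Payload section.

```dataweave
%dw 2.0
output application/json

// Required fields as listed in the Record Payload section.
var requiredFields = ["tenantId", "database", "collection", "recordId", "recordPayload"]

// Names of required fields that are absent or null on a record.
fun missingFields(record) =
    requiredFields filter ((name) -> record[name] == null)
---
// payload is assumed to be an array of records already in Certyo format.
{
    // Records that can be sent to Certyo as-is.
    valid: payload filter ((record) -> isEmpty(missingFields(record))),
    // Records to route to an error flow, with the reason attached.
    invalid: payload
        filter ((record) -> !isEmpty(missingFields(record)))
        map ((record) -> {
            recordId: record.recordId,
            missing: missingFields(record)
        })
}
```

A `choice` router or a second transform can then forward `valid` to the Certyo request and `invalid` to the error or DLQ flow. The check only covers presence and null values; type or format checks (for example, ISO 8601 timestamps) would need additional rules.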
