---
name: Certyo + SQL Server Integration
version: 1.0.0
description: Generate SQL Server / Azure SQL → Certyo integration code with field mappings, auth, and working examples
api_base: https://www.certyos.com
auth: X-API-Key header
last_updated: 2026-04-14
---

# Certyo + SQL Server (CDC) Integration Skill

This skill generates production-ready C# and SQL code to integrate Microsoft SQL Server (or Azure SQL) with Certyo's blockchain-backed authenticity platform. It uses Change Data Capture (CDC) to detect row-level changes and ingest them into Certyo for immutable anchoring on Polygon.

## Certyo API Reference

All requests require the `X-API-Key` header for authentication.

### Endpoints

| Method | Path | Description | Response |
|--------|------|-------------|----------|
| `POST` | `/api/v1/records` | Ingest a single record | `202 Accepted` |
| `POST` | `/api/v1/records/bulk` | Ingest up to 1000 records | `202 Accepted` |
| `POST` | `/api/v1/verify/record` | Verify blockchain anchoring | `200 OK` |
| `GET` | `/api/v1/records` | Query records | `200 OK` |

### Record Payload

```json
{
  "tenantId": "string (required)",
  "database": "string (required)",
  "collection": "string (required)",
  "recordId": "string (required)",
  "recordPayload": { "...any JSON (required)" },
  "clientId": "string (optional)",
  "recordVersion": "string (optional, default '1')",
  "operationType": "upsert|insert|update|delete (optional)",
  "sourceTimestamp": "ISO 8601 (optional)",
  "idempotencyKey": "string (optional)"
}
```

### Ingestion Response (202 Accepted)

```json
{
  "recordId": "PROD-42",
  "recordHash": "sha256:ab12cd34...",
  "tenantId": "acme-corp",
  "acceptedAt": "2026-04-14T10:30:00Z",
  "idempotencyReplayed": false
}
```

### Pipeline Timing

Records flow through: Kafka -> Accumulate (1000 records or 60s) -> Merkle tree -> IPFS pin -> Polygon anchor. Total anchoring latency is approximately 60-90 seconds after accumulation flush.

## Integration Pattern

```
SQL Server table with CDC enabled
  -> .NET 8 BackgroundService polls cdc.fn_cdc_get_net_changes every 10s
    -> Reads changes since last tracked LSN
      -> Maps rows to Certyo record payloads
        -> If batch > 10: POST /api/v1/records/bulk
        -> If batch <= 10: POST /api/v1/records (per record)
          -> Updates CdcTracker table with new LSN high-water mark
            -> Updates source row with CertyoRecordHash, CertyoAnchorStatus
              -> Separate verification worker polls POST /api/v1/verify/record
```

### Alternative: Azure Function with SQL Trigger

For Azure SQL databases, an Azure Function with a SQL trigger binding can replace the BackgroundService polling pattern. See the Azure Function example below.

## Authentication

### SQL Server Connection

```json
{
  "ConnectionStrings": {
    "SqlServer": "Server=myserver.database.windows.net;Database=ProductDb;User Id=certyo_cdc_reader;Password=***;Encrypt=True;TrustServerCertificate=False;"
  }
}
```

For Windows Authentication (on-premises):

```json
{
  "ConnectionStrings": {
    "SqlServer": "Server=SQLPROD01;Database=ProductDb;Integrated Security=True;Encrypt=True;"
  }
}
```

### Certyo API

```json
{
  "Certyo": {
    "BaseUrl": "https://www.certyos.com",
    "ApiKey": "cty_xxxxxxxxxxxxxxxxxxxx",
    "TenantId": "acme-corp"
  }
}
```

Store the API key in Azure Key Vault or .NET User Secrets for development. Never commit `appsettings.json` with real credentials.

## Field Mapping

| SQL Server Field | Certyo Field | Notes |
|-----------------|-------------|-------|
| `Products.ProductId` | `recordId` | Primary key, cast to string |
| `Products.ModifiedDate` | `sourceTimestamp` | ISO 8601 format |
| Hardcoded `"sqlserver"` | `database` | Identifies source system |
| `"dbo.Products"` (schema.table) | `collection` | Fully qualified table name |
| `__$operation` value | `operationType` | 2=insert, 4=update, 1=delete |
| `@@SERVERNAME + DB_NAME()` | `clientId` | Identifies source server+database |
| `ProductId + '-' + __$start_lsn` | `idempotencyKey` | LSN ensures uniqueness per change |
| Full row as JSON | `recordPayload` | All columns serialized to JSON |

### CDC Operation Mapping

| `__$operation` | Certyo `operationType` |
|---------------|----------------------|
| 1 | `delete` |
| 2 | `insert` |
| 3 | `update` (before image) -- skip |
| 4 | `update` (after image) -- use this |

## Code Examples

### 1. Enable CDC on SQL Server

```sql
-- Step 1: Enable CDC on the database
USE ProductDb;
GO
EXEC sys.sp_cdc_enable_db;
GO

-- Step 2: Enable CDC on the target table
EXEC sys.sp_cdc_enable_table
    @source_schema = N'dbo',
    @source_name   = N'Products',
    @role_name     = N'cdc_reader',
    @supports_net_changes = 1;
GO

-- Step 3: Create the LSN tracking table
CREATE TABLE dbo.CdcTracker (
    Id              INT IDENTITY(1,1) PRIMARY KEY,
    TableName       NVARCHAR(256)   NOT NULL UNIQUE,
    LastLsn         BINARY(10)      NOT NULL,
    LastProcessedAt DATETIME2       NOT NULL DEFAULT SYSUTCDATETIME(),
    RecordsIngested BIGINT          NOT NULL DEFAULT 0
);
GO

-- Seed with the current minimum valid LSN
INSERT INTO dbo.CdcTracker (TableName, LastLsn)
VALUES ('dbo.Products', sys.fn_cdc_get_min_lsn('dbo_Products'));
GO

-- Step 4: Add Certyo tracking columns to the source table
ALTER TABLE dbo.Products ADD
    CertyoRecordHash    NVARCHAR(128)   NULL,
    CertyoAnchorStatus  NVARCHAR(20)    NULL DEFAULT 'Pending',
    CertyoVerifiedAt    DATETIME2       NULL;
GO

-- Step 5: Verify CDC is working
SELECT is_cdc_enabled FROM sys.databases WHERE name = 'ProductDb';
SELECT name, is_tracked_by_cdc FROM sys.tables WHERE name = 'Products';
GO
```

### 2. .NET 8 BackgroundService (CDC Polling + Certyo Ingestion)

```csharp
// File: CdcPollingWorker.cs
using System.Data;
using System.Net.Http.Json;
using System.Text.Json;
using Microsoft.Data.SqlClient;
using Microsoft.Extensions.Options;

namespace CertyoCdcIntegration;

public class CertyoOptions
{
    public string BaseUrl { get; set; } = "https://www.certyos.com";
    public string ApiKey { get; set; } = "";
    public string TenantId { get; set; } = "";
}

public record CertyoRecord(
    string TenantId,
    string Database,
    string Collection,
    string RecordId,
    object RecordPayload,
    string? ClientId = null,
    string? OperationType = null,
    string? SourceTimestamp = null,
    string? IdempotencyKey = null
);

public record CertyoResponse(
    string RecordId,
    string RecordHash,
    string TenantId,
    DateTime AcceptedAt,
    bool IdempotencyReplayed
);

public record CertyoBulkRequest(List<CertyoRecord> Records);

public class CdcPollingWorker : BackgroundService
{
    private readonly ILogger<CdcPollingWorker> _logger;
    private readonly IHttpClientFactory _httpClientFactory;
    private readonly CertyoOptions _certyo;
    private readonly string _connectionString;
    private readonly TimeSpan _pollInterval = TimeSpan.FromSeconds(10);
    private readonly int _bulkThreshold = 10;

    public CdcPollingWorker(
        ILogger<CdcPollingWorker> logger,
        IHttpClientFactory httpClientFactory,
        IOptions<CertyoOptions> certyoOptions,
        IConfiguration configuration)
    {
        _logger = logger;
        _httpClientFactory = httpClientFactory;
        _certyo = certyoOptions.Value;
        _connectionString = configuration.GetConnectionString("SqlServer")
            ?? throw new InvalidOperationException("SqlServer connection string is required");
    }

    protected override async Task ExecuteAsync(CancellationToken stoppingToken)
    {
        _logger.LogInformation("CDC Polling Worker started. Polling every {Interval}s", _pollInterval.TotalSeconds);

        while (!stoppingToken.IsCancellationRequested)
        {
            try
            {
                await PollAndIngestAsync(stoppingToken);
            }
            catch (Exception ex)
            {
                _logger.LogError(ex, "Error during CDC poll cycle");
            }

            await Task.Delay(_pollInterval, stoppingToken);
        }
    }

    private async Task PollAndIngestAsync(CancellationToken ct)
    {
        await using var connection = new SqlConnection(_connectionString);
        await connection.OpenAsync(ct);

        // Read the last processed LSN
        byte[] lastLsn = await GetLastLsnAsync(connection, "dbo.Products", ct);

        // Get the current maximum LSN
        byte[] maxLsn = await GetMaxLsnAsync(connection, ct);

        if (maxLsn == null || CompareByteArrays(lastLsn, maxLsn) >= 0)
        {
            return; // No new changes
        }

        // Query net changes since last LSN
        var changes = await GetNetChangesAsync(connection, lastLsn, maxLsn, ct);

        if (changes.Count == 0)
        {
            // Update LSN even if no relevant changes (cleanup records, etc.)
            await UpdateLastLsnAsync(connection, "dbo.Products", maxLsn, 0, ct);
            return;
        }

        _logger.LogInformation("Found {Count} CDC changes to ingest", changes.Count);

        // Ingest via bulk or single endpoint
        List<(string ProductId, string RecordHash)> results;
        if (changes.Count > _bulkThreshold)
        {
            results = await IngestBulkAsync(changes, ct);
        }
        else
        {
            results = await IngestSingleAsync(changes, ct);
        }

        // Write-back record hashes to source table
        await WriteBackHashesAsync(connection, results, ct);

        // Update LSN high-water mark
        await UpdateLastLsnAsync(connection, "dbo.Products", maxLsn, results.Count, ct);

        _logger.LogInformation("Ingested {Count} records, LSN updated", results.Count);
    }

    private async Task<List<Dictionary<string, object>>> GetNetChangesAsync(
        SqlConnection conn, byte[] fromLsn, byte[] toLsn, CancellationToken ct)
    {
        var changes = new List<Dictionary<string, object>>();

        await using var cmd = conn.CreateCommand();
        cmd.CommandText = @"
            SELECT *
            FROM cdc.fn_cdc_get_net_changes_dbo_Products(@from_lsn, @to_lsn, 'all with merge')
            WHERE __$operation IN (2, 4, 1)
            ORDER BY __$start_lsn";
        cmd.Parameters.Add("@from_lsn", SqlDbType.Binary, 10).Value = fromLsn;
        cmd.Parameters.Add("@to_lsn", SqlDbType.Binary, 10).Value = toLsn;

        await using var reader = await cmd.ExecuteReaderAsync(ct);
        while (await reader.ReadAsync(ct))
        {
            var row = new Dictionary<string, object>();
            for (int i = 0; i < reader.FieldCount; i++)
            {
                string name = reader.GetName(i);
                object value = reader.IsDBNull(i) ? null! : reader.GetValue(i);
                row[name] = value;
            }
            changes.Add(row);
        }

        return changes;
    }

    private CertyoRecord MapChangeToRecord(Dictionary<string, object> change)
    {
        int operation = Convert.ToInt32(change["__$operation"]);
        string operationType = operation switch
        {
            1 => "delete",
            2 => "insert",
            4 => "update",
            _ => "upsert"
        };

        string productId = change["ProductId"]?.ToString() ?? "";
        byte[] lsn = change["__$start_lsn"] as byte[] ?? Array.Empty<byte>();
        string lsnHex = BitConverter.ToString(lsn).Replace("-", "");

        // Build payload excluding CDC system columns
        var payload = new Dictionary<string, object>();
        foreach (var kvp in change)
        {
            if (!kvp.Key.StartsWith("__$") &&
                kvp.Key != "CertyoRecordHash" &&
                kvp.Key != "CertyoAnchorStatus" &&
                kvp.Key != "CertyoVerifiedAt")
            {
                payload[kvp.Key] = kvp.Value;
            }
        }

        string sourceTimestamp = change.ContainsKey("ModifiedDate") && change["ModifiedDate"] != null
            ? ((DateTime)change["ModifiedDate"]).ToString("o")
            : DateTime.UtcNow.ToString("o");

        return new CertyoRecord(
            TenantId: _certyo.TenantId,
            Database: "sqlserver",
            Collection: "dbo.Products",
            RecordId: productId,
            RecordPayload: payload,
            ClientId: _connectionString.Split(';')
                .FirstOrDefault(s => s.TrimStart().StartsWith("Server=", StringComparison.OrdinalIgnoreCase))
                ?.Split('=')[1],
            OperationType: operationType,
            SourceTimestamp: sourceTimestamp,
            IdempotencyKey: $"{productId}-{lsnHex}"
        );
    }

    private async Task<List<(string ProductId, string RecordHash)>> IngestBulkAsync(
        List<Dictionary<string, object>> changes, CancellationToken ct)
    {
        var results = new List<(string, string)>();
        var client = CreateHttpClient();

        // Process in batches of 1000 (Certyo bulk limit)
        foreach (var batch in changes.Chunk(1000))
        {
            var records = batch.Select(MapChangeToRecord).ToList();
            var bulkRequest = new CertyoBulkRequest(records);

            var response = await client.PostAsJsonAsync(
                $"{_certyo.BaseUrl}/api/v1/records/bulk", bulkRequest, ct);

            if (response.IsSuccessStatusCode)
            {
                var body = await response.Content.ReadFromJsonAsync<List<CertyoResponse>>(ct);
                if (body != null)
                {
                    results.AddRange(body.Select(r => (r.RecordId, r.RecordHash)));
                }
            }
            else
            {
                _logger.LogError("Bulk ingestion failed: {Status} {Body}",
                    response.StatusCode,
                    await response.Content.ReadAsStringAsync(ct));
            }
        }

        return results;
    }

    private async Task<List<(string ProductId, string RecordHash)>> IngestSingleAsync(
        List<Dictionary<string, object>> changes, CancellationToken ct)
    {
        var results = new List<(string, string)>();
        var client = CreateHttpClient();

        foreach (var change in changes)
        {
            var record = MapChangeToRecord(change);
            int retries = 0;
            const int maxRetries = 3;

            while (retries <= maxRetries)
            {
                try
                {
                    var response = await client.PostAsJsonAsync(
                        $"{_certyo.BaseUrl}/api/v1/records", record, ct);

                    if (response.StatusCode == System.Net.HttpStatusCode.Accepted)
                    {
                        var body = await response.Content.ReadFromJsonAsync<CertyoResponse>(ct);
                        if (body != null)
                        {
                            results.Add((body.RecordId, body.RecordHash));
                        }
                        break;
                    }
                    else if ((int)response.StatusCode == 429)
                    {
                        // Exponential backoff on rate limit
                        int delay = (int)Math.Pow(2, retries) * 1000;
                        _logger.LogWarning("Rate limited, retrying in {Delay}ms", delay);
                        await Task.Delay(delay, ct);
                        retries++;
                    }
                    else
                    {
                        _logger.LogError("Ingestion failed for {RecordId}: {Status}",
                            record.RecordId, response.StatusCode);
                        break;
                    }
                }
                catch (HttpRequestException ex)
                {
                    _logger.LogError(ex, "HTTP error ingesting {RecordId}", record.RecordId);
                    int delay = (int)Math.Pow(2, retries) * 1000;
                    await Task.Delay(delay, ct);
                    retries++;
                }
            }
        }

        return results;
    }

    private async Task WriteBackHashesAsync(
        SqlConnection conn, List<(string ProductId, string RecordHash)> results, CancellationToken ct)
    {
        foreach (var (productId, recordHash) in results)
        {
            await using var cmd = conn.CreateCommand();
            cmd.CommandText = @"
                UPDATE dbo.Products
                SET CertyoRecordHash = @hash,
                    CertyoAnchorStatus = 'Pending'
                WHERE ProductId = @productId";
            cmd.Parameters.AddWithValue("@hash", recordHash);
            cmd.Parameters.AddWithValue("@productId", productId);
            await cmd.ExecuteNonQueryAsync(ct);
        }
    }

    private async Task<byte[]> GetLastLsnAsync(SqlConnection conn, string tableName, CancellationToken ct)
    {
        await using var cmd = conn.CreateCommand();
        cmd.CommandText = "SELECT LastLsn FROM dbo.CdcTracker WHERE TableName = @table";
        cmd.Parameters.AddWithValue("@table", tableName);
        var result = await cmd.ExecuteScalarAsync(ct);
        return result as byte[] ?? Array.Empty<byte>();
    }

    private async Task<byte[]> GetMaxLsnAsync(SqlConnection conn, CancellationToken ct)
    {
        await using var cmd = conn.CreateCommand();
        cmd.CommandText = "SELECT sys.fn_cdc_get_max_lsn()";
        var result = await cmd.ExecuteScalarAsync(ct);
        return result as byte[] ?? Array.Empty<byte>();
    }

    private async Task UpdateLastLsnAsync(
        SqlConnection conn, string tableName, byte[] lsn, int count, CancellationToken ct)
    {
        await using var cmd = conn.CreateCommand();
        cmd.CommandText = @"
            UPDATE dbo.CdcTracker
            SET LastLsn = @lsn,
                LastProcessedAt = SYSUTCDATETIME(),
                RecordsIngested = RecordsIngested + @count
            WHERE TableName = @table";
        cmd.Parameters.Add("@lsn", SqlDbType.Binary, 10).Value = lsn;
        cmd.Parameters.AddWithValue("@count", count);
        cmd.Parameters.AddWithValue("@table", tableName);
        await cmd.ExecuteNonQueryAsync(ct);
    }

    private HttpClient CreateHttpClient()
    {
        var client = _httpClientFactory.CreateClient("Certyo");
        client.DefaultRequestHeaders.Add("X-API-Key", _certyo.ApiKey);
        client.Timeout = TimeSpan.FromSeconds(30);
        return client;
    }

    private static int CompareByteArrays(byte[] a, byte[] b)
    {
        for (int i = 0; i < Math.Min(a.Length, b.Length); i++)
        {
            int cmp = a[i].CompareTo(b[i]);
            if (cmp != 0) return cmp;
        }
        return a.Length.CompareTo(b.Length);
    }
}
```

### 3. Program.cs (Service Registration)

```csharp
// File: Program.cs
using CertyoCdcIntegration;

var builder = Host.CreateApplicationBuilder(args);

builder.Services.Configure<CertyoOptions>(
    builder.Configuration.GetSection("Certyo"));

builder.Services.AddHttpClient("Certyo");
builder.Services.AddHostedService<CdcPollingWorker>();
builder.Services.AddHostedService<CertyoVerificationWorker>();

var host = builder.Build();
host.Run();
```

### 4. Verification Worker (BackgroundService)

```csharp
// File: CertyoVerificationWorker.cs
using System.Net.Http.Json;
using Microsoft.Data.SqlClient;
using Microsoft.Extensions.Options;

namespace CertyoCdcIntegration;

public record VerifyRequest(string TenantId, string RecordId, string RecordHash);

public record VerifyResponse(
    bool Verified,
    string? PolygonTxHash,
    string? AnchorStatus
);

public class CertyoVerificationWorker : BackgroundService
{
    private readonly ILogger<CertyoVerificationWorker> _logger;
    private readonly IHttpClientFactory _httpClientFactory;
    private readonly CertyoOptions _certyo;
    private readonly string _connectionString;
    private readonly TimeSpan _pollInterval = TimeSpan.FromMinutes(2);

    public CertyoVerificationWorker(
        ILogger<CertyoVerificationWorker> logger,
        IHttpClientFactory httpClientFactory,
        IOptions<CertyoOptions> certyoOptions,
        IConfiguration configuration)
    {
        _logger = logger;
        _httpClientFactory = httpClientFactory;
        _certyo = certyoOptions.Value;
        _connectionString = configuration.GetConnectionString("SqlServer")!;
    }

    protected override async Task ExecuteAsync(CancellationToken stoppingToken)
    {
        _logger.LogInformation("Verification Worker started. Polling every {Interval}m", _pollInterval.TotalMinutes);

        // Initial delay to let ingestion process first
        await Task.Delay(TimeSpan.FromSeconds(30), stoppingToken);

        while (!stoppingToken.IsCancellationRequested)
        {
            try
            {
                await VerifyPendingRecordsAsync(stoppingToken);
            }
            catch (Exception ex)
            {
                _logger.LogError(ex, "Error during verification cycle");
            }

            await Task.Delay(_pollInterval, stoppingToken);
        }
    }

    private async Task VerifyPendingRecordsAsync(CancellationToken ct)
    {
        await using var connection = new SqlConnection(_connectionString);
        await connection.OpenAsync(ct);

        // Query records pending verification (older than 2 minutes)
        await using var cmd = connection.CreateCommand();
        cmd.CommandText = @"
            SELECT TOP 100 ProductId, CertyoRecordHash
            FROM dbo.Products
            WHERE CertyoAnchorStatus = 'Pending'
              AND CertyoRecordHash IS NOT NULL
              AND ModifiedDate < DATEADD(MINUTE, -2, SYSUTCDATETIME())
            ORDER BY ModifiedDate ASC";

        var pendingRecords = new List<(string ProductId, string RecordHash)>();
        await using var reader = await cmd.ExecuteReaderAsync(ct);
        while (await reader.ReadAsync(ct))
        {
            pendingRecords.Add((
                reader["ProductId"].ToString()!,
                reader["CertyoRecordHash"].ToString()!
            ));
        }
        await reader.CloseAsync();

        if (pendingRecords.Count == 0) return;

        _logger.LogInformation("Verifying {Count} pending records", pendingRecords.Count);

        var client = _httpClientFactory.CreateClient("Certyo");
        client.DefaultRequestHeaders.Add("X-API-Key", _certyo.ApiKey);

        foreach (var (productId, recordHash) in pendingRecords)
        {
            try
            {
                var request = new VerifyRequest(_certyo.TenantId, productId, recordHash);
                var response = await client.PostAsJsonAsync(
                    $"{_certyo.BaseUrl}/api/v1/verify/record", request, ct);

                if (response.IsSuccessStatusCode)
                {
                    var result = await response.Content.ReadFromJsonAsync<VerifyResponse>(ct);
                    if (result?.Verified == true)
                    {
                        await using var updateCmd = connection.CreateCommand();
                        updateCmd.CommandText = @"
                            UPDATE dbo.Products
                            SET CertyoAnchorStatus = 'Anchored',
                                CertyoVerifiedAt = SYSUTCDATETIME()
                            WHERE ProductId = @productId";
                        updateCmd.Parameters.AddWithValue("@productId", productId);
                        await updateCmd.ExecuteNonQueryAsync(ct);

                        _logger.LogInformation("Record {ProductId} verified and anchored", productId);
                    }
                }
            }
            catch (Exception ex)
            {
                _logger.LogError(ex, "Failed to verify record {ProductId}", productId);
            }
        }
    }
}
```

### 5. appsettings.json Template

```json
{
  "ConnectionStrings": {
    "SqlServer": "Server=localhost;Database=ProductDb;User Id=certyo_cdc_reader;Password=CHANGE_ME;Encrypt=True;TrustServerCertificate=True;"
  },
  "Certyo": {
    "BaseUrl": "https://www.certyos.com",
    "ApiKey": "cty_CHANGE_ME",
    "TenantId": "acme-corp"
  },
  "Logging": {
    "LogLevel": {
      "Default": "Information",
      "CertyoCdcIntegration": "Information"
    }
  }
}
```

### 6. Azure Function with SQL Trigger (Alternative)

```csharp
// File: CertyoCdcFunction.cs
// For Azure SQL databases, use an Azure Function with SQL trigger binding
// instead of the BackgroundService polling pattern.

using Microsoft.Azure.Functions.Worker;
using Microsoft.Azure.Functions.Worker.Extensions.Sql;
using Microsoft.Extensions.Logging;
using System.Net.Http.Json;

namespace CertyoCdcFunction;

public class ProductChange
{
    public int ProductId { get; set; }
    public string ProductName { get; set; } = "";
    public string SKU { get; set; } = "";
    public decimal Price { get; set; }
    public string Category { get; set; } = "";
    public DateTime ModifiedDate { get; set; }
}

public class CertyoCdcFunction
{
    private readonly ILogger<CertyoCdcFunction> _logger;
    private readonly HttpClient _httpClient;
    private readonly string _tenantId;

    public CertyoCdcFunction(
        ILogger<CertyoCdcFunction> logger,
        IHttpClientFactory httpClientFactory)
    {
        _logger = logger;
        _httpClient = httpClientFactory.CreateClient("Certyo");
        _httpClient.DefaultRequestHeaders.Add("X-API-Key",
            Environment.GetEnvironmentVariable("CERTYO_API_KEY"));
        _tenantId = Environment.GetEnvironmentVariable("CERTYO_TENANT_ID") ?? "";
    }

    [Function("IngestProductChanges")]
    public async Task Run(
        [SqlTrigger("[dbo].[Products]", "SqlServer")]
        IReadOnlyList<SqlChange<ProductChange>> changes)
    {
        if (changes.Count == 0) return;

        _logger.LogInformation("Processing {Count} product changes", changes.Count);

        var records = changes.Select(change => new
        {
            tenantId = _tenantId,
            database = "sqlserver",
            collection = "dbo.Products",
            recordId = change.Item.ProductId.ToString(),
            recordPayload = new
            {
                change.Item.ProductId,
                change.Item.ProductName,
                change.Item.SKU,
                change.Item.Price,
                change.Item.Category,
                change.Item.ModifiedDate,
            },
            operationType = change.Operation switch
            {
                SqlChangeOperation.Insert => "insert",
                SqlChangeOperation.Update => "update",
                SqlChangeOperation.Delete => "delete",
                _ => "upsert"
            },
            sourceTimestamp = change.Item.ModifiedDate.ToString("o"),
            idempotencyKey = $"{change.Item.ProductId}-{change.Item.ModifiedDate:yyyyMMddHHmmss}"
        }).ToList();

        string baseUrl = Environment.GetEnvironmentVariable("CERTYO_BASE_URL")
            ?? "https://www.certyos.com";

        if (records.Count > 10)
        {
            var response = await _httpClient.PostAsJsonAsync(
                $"{baseUrl}/api/v1/records/bulk",
                new { Records = records });

            _logger.LogInformation("Bulk ingestion: {Status}", response.StatusCode);
        }
        else
        {
            foreach (var record in records)
            {
                var response = await _httpClient.PostAsJsonAsync(
                    $"{baseUrl}/api/v1/records", record);

                _logger.LogInformation("Ingested product {Id}: {Status}",
                    record.recordId, response.StatusCode);
            }
        }
    }
}
```

### 7. Python Alternative (pyodbc)

```python
"""
CDC Polling + Certyo Ingestion using Python + pyodbc.
Suitable for teams that prefer Python or run on Linux without .NET.
"""

import pyodbc
import requests
import json
import time
import logging
from datetime import datetime, timezone

logging.basicConfig(level=logging.INFO)
logger = logging.getLogger("certyo-cdc")

# --- Configuration ---
SQL_CONNECTION_STRING = (
    "DRIVER={ODBC Driver 18 for SQL Server};"
    "SERVER=localhost;"
    "DATABASE=ProductDb;"
    "UID=certyo_cdc_reader;"
    "PWD=CHANGE_ME;"
    "Encrypt=yes;"
    "TrustServerCertificate=yes;"
)

CERTYO_BASE_URL = "https://www.certyos.com"
CERTYO_API_KEY = "cty_CHANGE_ME"
CERTYO_TENANT_ID = "acme-corp"

POLL_INTERVAL_SECONDS = 10
BULK_THRESHOLD = 10
MAX_RETRIES = 3


def get_connection():
    return pyodbc.connect(SQL_CONNECTION_STRING)


def get_last_lsn(cursor, table_name):
    cursor.execute(
        "SELECT LastLsn FROM dbo.CdcTracker WHERE TableName = ?", table_name
    )
    row = cursor.fetchone()
    return row[0] if row else None


def get_max_lsn(cursor):
    cursor.execute("SELECT sys.fn_cdc_get_max_lsn()")
    row = cursor.fetchone()
    return row[0] if row else None


def get_net_changes(cursor, from_lsn, to_lsn):
    cursor.execute(
        """SELECT *
           FROM cdc.fn_cdc_get_net_changes_dbo_Products(?, ?, 'all with merge')
           WHERE __$operation IN (2, 4, 1)
           ORDER BY __$start_lsn""",
        from_lsn,
        to_lsn,
    )
    columns = [desc[0] for desc in cursor.description]
    return [dict(zip(columns, row)) for row in cursor.fetchall()]


def map_operation(op_code):
    return {1: "delete", 2: "insert", 4: "update"}.get(op_code, "upsert")


def map_change_to_record(change):
    payload = {
        k: (str(v) if isinstance(v, (datetime, bytes)) else v)
        for k, v in change.items()
        if not k.startswith("__$") and k not in (
            "CertyoRecordHash", "CertyoAnchorStatus", "CertyoVerifiedAt"
        )
    }

    modified = change.get("ModifiedDate")
    source_ts = (
        modified.isoformat() if isinstance(modified, datetime)
        else datetime.now(timezone.utc).isoformat()
    )

    lsn_hex = change.get("__$start_lsn", b"").hex()

    return {
        "tenantId": CERTYO_TENANT_ID,
        "database": "sqlserver",
        "collection": "dbo.Products",
        "recordId": str(change["ProductId"]),
        "recordPayload": payload,
        "operationType": map_operation(change["__$operation"]),
        "sourceTimestamp": source_ts,
        "idempotencyKey": f"{change['ProductId']}-{lsn_hex}",
    }


def ingest_records(records):
    headers = {"Content-Type": "application/json", "X-API-Key": CERTYO_API_KEY}
    results = []

    if len(records) > BULK_THRESHOLD:
        for i in range(0, len(records), 1000):
            batch = records[i : i + 1000]
            for attempt in range(MAX_RETRIES):
                try:
                    resp = requests.post(
                        f"{CERTYO_BASE_URL}/api/v1/records/bulk",
                        json={"records": batch},
                        headers=headers,
                        timeout=30,
                    )
                    if resp.status_code == 202:
                        results.extend(resp.json())
                        break
                    elif resp.status_code == 429:
                        time.sleep(2 ** attempt)
                    else:
                        logger.error("Bulk ingest failed: %s %s", resp.status_code, resp.text)
                        break
                except requests.RequestException as e:
                    logger.error("HTTP error: %s", e)
                    time.sleep(2 ** attempt)
    else:
        for record in records:
            for attempt in range(MAX_RETRIES):
                try:
                    resp = requests.post(
                        f"{CERTYO_BASE_URL}/api/v1/records",
                        json=record,
                        headers=headers,
                        timeout=30,
                    )
                    if resp.status_code == 202:
                        results.append(resp.json())
                        break
                    elif resp.status_code == 429:
                        time.sleep(2 ** attempt)
                    else:
                        logger.error(
                            "Ingest failed for %s: %s", record["recordId"], resp.status_code
                        )
                        break
                except requests.RequestException as e:
                    logger.error("HTTP error for %s: %s", record["recordId"], e)
                    time.sleep(2 ** attempt)

    return results


def update_last_lsn(cursor, table_name, lsn, count):
    cursor.execute(
        """UPDATE dbo.CdcTracker
           SET LastLsn = ?, LastProcessedAt = SYSUTCDATETIME(),
               RecordsIngested = RecordsIngested + ?
           WHERE TableName = ?""",
        lsn,
        count,
        table_name,
    )


def write_back_hashes(cursor, results):
    for r in results:
        cursor.execute(
            """UPDATE dbo.Products
               SET CertyoRecordHash = ?, CertyoAnchorStatus = 'Pending'
               WHERE ProductId = ?""",
            r["recordHash"],
            r["recordId"],
        )


def poll_loop():
    logger.info("Starting CDC poll loop (interval=%ds)", POLL_INTERVAL_SECONDS)

    while True:
        try:
            conn = get_connection()
            cursor = conn.cursor()

            from_lsn = get_last_lsn(cursor, "dbo.Products")
            to_lsn = get_max_lsn(cursor)

            if from_lsn and to_lsn and from_lsn < to_lsn:
                changes = get_net_changes(cursor, from_lsn, to_lsn)

                if changes:
                    records = [map_change_to_record(c) for c in changes]
                    results = ingest_records(records)
                    write_back_hashes(cursor, results)
                    update_last_lsn(cursor, "dbo.Products", to_lsn, len(results))
                    conn.commit()
                    logger.info("Ingested %d records", len(results))
                else:
                    update_last_lsn(cursor, "dbo.Products", to_lsn, 0)
                    conn.commit()

            cursor.close()
            conn.close()
        except Exception:
            logger.exception("Error in poll cycle")

        time.sleep(POLL_INTERVAL_SECONDS)


if __name__ == "__main__":
    poll_loop()
```

## Verification & Write-back

### Verification Flow

1. `CertyoVerificationWorker` runs every 2 minutes
2. Queries `dbo.Products` rows where `CertyoAnchorStatus = 'Pending'` and `ModifiedDate` is older than 2 minutes
3. Calls `POST /api/v1/verify/record` with `tenantId`, `recordId` (ProductId), and `recordHash`
4. On success, updates the source row:
   - `CertyoAnchorStatus` = `'Anchored'`
   - `CertyoVerifiedAt` = `SYSUTCDATETIME()`

### Write-back SQL

```sql
UPDATE dbo.Products
SET CertyoAnchorStatus = 'Anchored',
    CertyoVerifiedAt   = SYSUTCDATETIME()
WHERE ProductId = @ProductId;
```

### CDC and Write-back Interaction

When the verification worker updates `CertyoAnchorStatus` and `CertyoVerifiedAt`, CDC will capture that change. The CDC polling worker should filter out changes that only affect Certyo tracking columns to avoid infinite loops. The `MapChangeToRecord` method already excludes these columns from the `recordPayload`, but you should also add a filter:

```csharp
// In PollAndIngestAsync, after reading changes:
changes = changes.Where(c =>
{
    // Skip changes where only Certyo columns were modified
    var nonCertyoColumns = c.Keys
        .Where(k => !k.StartsWith("__$") &&
                     k != "CertyoRecordHash" &&
                     k != "CertyoAnchorStatus" &&
                     k != "CertyoVerifiedAt")
        .ToList();
    return nonCertyoColumns.Any();
}).ToList();
```

## Code Generation Rules

1. **Always track the LSN high-water mark in the `CdcTracker` table.** Update the `LastLsn` after every successful ingestion cycle, even if zero records were ingested. This prevents reprocessing old changes after a service restart.

2. **Use the bulk endpoint (`POST /api/v1/records/bulk`) when the batch contains more than 10 records.** The bulk endpoint accepts up to 1000 records per call. For batches exceeding 1000, split into multiple bulk calls. For 10 or fewer, use the single-record endpoint.

3. **CDC is available on Azure SQL Database (Hyperscale and General Purpose tiers).** It is not available on Azure SQL Basic or Standard S0-S2 tiers. Verify tier compatibility before enabling CDC. On Azure SQL, use the Azure Function with SQL trigger binding as an alternative.

4. **Never delete CDC tracking tables or disable CDC on production tables.** Disabling CDC drops the change tables and all unprocessed changes are lost permanently. Only disable CDC during planned migrations with explicit confirmation.

5. **Use `appsettings.json` and environment variables for all configuration.** Never hardcode connection strings, API keys, tenant IDs, or endpoint URLs in source code. Use Azure Key Vault or .NET User Secrets for sensitive values.

6. **Implement exponential backoff on Certyo HTTP 429 (Too Many Requests) responses.** Start with a 1-second delay, doubling on each retry up to 3 retries. Log rate-limit events for monitoring. Never retry immediately on 429.

7. **Use `fn_cdc_get_net_changes` with the `'all with merge'` option.** This returns one row per primary key with the final state, which is what Certyo needs. Do not use `fn_cdc_get_all_changes` unless you need the full change history.

8. **Filter out CDC operation code 3 (update before-image).** Only process operation codes 1 (delete), 2 (insert), and 4 (update after-image). The before-image row is not needed for Certyo ingestion.

9. **Construct idempotency keys from `ProductId` + `__$start_lsn` hex.** This guarantees uniqueness per change event. If a service restart causes re-reading of the same LSN range, Certyo will deduplicate via the idempotency key and return `idempotencyReplayed: true`.

10. **Exclude Certyo tracking columns from the `recordPayload`.** Columns like `CertyoRecordHash`, `CertyoAnchorStatus`, and `CertyoVerifiedAt` are metadata, not business data. Including them in the payload would cause hash mismatches on subsequent verifications when these columns change.
