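Amazon Web Services Integration
A reference architecture for integrating Certyo with AWS using EventBridge, Lambda, Step Functions, and API Gateway.
Prerequisites
- An AWS account with IAM permissions for EventBridge, Lambda, Step Functions, and Secrets Manager
- AWS CLI v2 configured with credentials
- A Certyo API key, stored in AWS Secrets Manager (see the Authentication section below)
- Node.js 20+ or Python 3.12+ for Lambda development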
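Reference architecture
This architecture uses AWS-native event routing and serverless compute to integrate with Certyo:
Source System ──► EventBridge ──► API Destination (Certyo) ──► POST /api/v1/records
                       │                     │
                   DLQ (SQS)          Step Functions
                   for failed     (verification workflow)
                   deliveries                │
                                      ┌──────┴──────┐
                                      │  Wait 90s   │
                                      │  Verify     │
                                      │  Branch     │
                                      └──────┬──────┘
                                             │
                                     SNS notification
                                    (anchored / failed)
EventBridge receives domain events from your source systems (S3, DynamoDB Streams, custom applications). For simple forwarding, API Destinations call Certyo directly with no Lambda in between. For complex transformations, Lambda processes events before ingestion. Step Functions orchestrates the multi-step verification workflow.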
EventBridge API Destinations
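The simplest integration pattern: EventBridge calls the Certyo API directly through an API Destination, so simple event forwarding needs no Lambda code.
How it works
- Connection: stores the Certyo API key using the API_KEY authorization type. EventBridge holds the credential for you; when the key changes, update the connection and later invocations use the new value.
- API Destination: defines the Certyo endpoint URL, the HTTP method, and the invocation rate limit.
- Rule: matches incoming events by source, detail type, or a custom pattern and routes them to the API Destination.
- Input Transformer: maps your event schema to the Certyo record payload without any Lambda code.
- Retry + DLQ: built-in retries with exponential backoff (up to 185 attempts over 24 hours). Events that still fail land in an SQS dead-letter queue.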
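To see what the Input Transformer step produces, the mapping can be mirrored locally. The following is a minimal Python sketch, not EventBridge itself: `FIELD_PATHS`, `lookup`, and `to_certyo_record` are hypothetical names, and the event shape (`detail.tenantId`, `detail.recordId`, and so on) is assumed to match the rule used in the code examples on this page.

```python
from typing import Any

# Certyo field -> key path into the EventBridge event.
# Assumption: events carry tenantId, recordId, version, payload and
# idempotencyKey under "detail", as in the rule target on this page.
FIELD_PATHS = {
    "tenantId": ("detail", "tenantId"),
    "database": ("source",),
    "collection": ("detail-type",),
    "recordId": ("detail", "recordId"),
    "recordVersion": ("detail", "version"),
    "recordPayload": ("detail", "payload"),
    "sourceTimestamp": ("time",),
    "idempotencyKey": ("detail", "idempotencyKey"),
}


def lookup(event: dict[str, Any], path: tuple[str, ...]) -> Any:
    """Follow a key path into a nested dict; raises KeyError if a key is missing."""
    value: Any = event
    for key in path:
        value = value[key]
    return value


def to_certyo_record(event: dict[str, Any]) -> dict[str, Any]:
    """Build the record body that the transformed event would carry."""
    record = {field: lookup(event, path) for field, path in FIELD_PATHS.items()}
    record["operationType"] = "upsert"  # constant value, as in the rule target
    return record


# Hypothetical sample event for local checks.
sample_event = {
    "source": "com.yourorg.orders",
    "detail-type": "RecordCreated",
    "time": "2024-01-01T12:00:00Z",
    "detail": {
        "tenantId": "tenant-123",
        "recordId": "order-1001",
        "version": "1",
        "payload": {"total": 42},
        "idempotencyKey": "order-1001-1",
    },
}

if __name__ == "__main__":
    print(to_certyo_record(sample_event))
```

Raising `KeyError` on a missing field is a choice made for this local check so that incomplete events surface early in tests.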
Lambda Functions
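For events that need transformation, enrichment, or conditional logic before ingestion, use Lambda as the processing layer:
- S3 trigger: a new object in a bucket invokes Lambda, which reads the file, extracts the records, and sends each one to Certyo
- DynamoDB Streams: captures change data from a DynamoDB table and forwards modified items as Certyo records
- Webhook handler: receives webhooks from third-party systems through API Gateway, then transforms and ingests them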
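For the DynamoDB Streams case, the core of the Lambda is converting a stream record into a Certyo record body. The sketch below shows one way to do that with the standard library only. It assumes the stream view type includes the new image, and the choices of `recordVersion` (the stream sequence number), `recordId` (key values joined with `#`), and `idempotencyKey` (the stream `eventID`) are illustrative assumptions, not Certyo requirements. Both function names are hypothetical.

```python
from datetime import datetime, timezone
from typing import Any, Optional


def from_attribute_value(av: dict[str, Any]) -> Any:
    """Convert one DynamoDB AttributeValue, e.g. {"S": "abc"}, to a plain value."""
    type_tag, value = next(iter(av.items()))
    if type_tag == "S":
        return value
    if type_tag == "N":
        try:
            return int(value)
        except ValueError:
            return float(value)
    if type_tag == "BOOL":
        return bool(value)
    if type_tag == "NULL":
        return None
    if type_tag == "M":
        return {k: from_attribute_value(v) for k, v in value.items()}
    if type_tag == "L":
        return [from_attribute_value(v) for v in value]
    # Binary and set types are left out of this sketch.
    raise ValueError(f"Unsupported attribute type: {type_tag}")


def stream_record_to_certyo(record: dict[str, Any], tenant_id: str) -> Optional[dict[str, Any]]:
    """Map one stream record to a Certyo record body; returns None for REMOVE."""
    if record["eventName"] not in ("INSERT", "MODIFY"):
        return None
    ddb = record["dynamodb"]
    # ARN shape: arn:aws:dynamodb:<region>:<account>:table/<name>/stream/<label>
    table = record["eventSourceARN"].split("/")[1]
    keys = {k: from_attribute_value(v) for k, v in ddb["Keys"].items()}
    created = datetime.fromtimestamp(ddb["ApproximateCreationDateTime"], tz=timezone.utc)
    return {
        "tenantId": tenant_id,
        "database": "dynamodb",
        "collection": table,
        "recordId": "#".join(str(keys[k]) for k in sorted(keys)),
        "recordVersion": ddb["SequenceNumber"],  # assumption: sequence number as version
        "operationType": "upsert",
        "recordPayload": {k: from_attribute_value(v) for k, v in ddb["NewImage"].items()},
        "sourceTimestamp": created.isoformat(),
        "idempotencyKey": record["eventID"],  # assumption: one key per stream event
    }
```

Each returned body can then be posted to `/api/v1/records` in the same way as the S3 handler in the code examples.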
Step Functions
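Orchestrate multi-step workflows: ingest a record, wait for anchoring, verify the result, and branch on success or failure:
- Standard Workflow: suited to long-running verification flows (executions can run for up to 1 year). Use a Wait state to pause through Certyo's anchoring window of roughly 60-90 seconds.
- Express Workflow: suited to high-volume, short-lived executions (up to 5 minutes). A good fit for bulk ingestion when verification is handled separately.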
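The wait, verify, and branch logic that the workflow orchestrates can be expressed as a plain loop, which is handy for reasoning about retry limits before writing the state machine. This is a minimal Python sketch: `wait_for_anchoring` is a hypothetical helper, `verify` stands in for a call to the Certyo verification endpoint, and the response fields `verified` and `anchoredOnChain` are taken from the state machine definition on this page.

```python
import time
from typing import Any, Callable


def wait_for_anchoring(
    verify: Callable[[], dict[str, Any]],
    initial_wait: float = 90.0,
    retry_wait: float = 30.0,
    max_retries: int = 5,
    sleep: Callable[[float], None] = time.sleep,
) -> str:
    """Return "anchored" or "failed", following Wait -> Verify -> Branch."""
    sleep(initial_wait)  # initial Wait state covering the anchoring window
    retries = 0
    while True:
        result = verify()
        if result.get("verified") is True and result.get("anchoredOnChain") is True:
            return "anchored"
        retries += 1
        if retries > max_retries:  # same comparison as a "greater than" retry check
            return "failed"
        sleep(retry_wait)
```

Passing `sleep` as a parameter keeps the loop testable without real delays. In Step Functions the same pauses are Wait states, so no compute runs while waiting.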
API Gateway
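Put API Gateway in front of Certyo for internal consumers to get usage plans, throttling, and API key management:
- Create a REST API proxy that forwards requests to https://www.certyos.com/api/v1/
- Add a usage plan with throttling (for example, 100 requests per second per consumer) and a monthly quota
- Inject the Certyo X-API-Key through a Lambda authorizer or a VTL mapping template
- Internal consumers authenticate with their own API Gateway keys and never see the Certyo key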
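For the Lambda authorizer option, one possible approach is to return the Certyo key in the authorizer's `context` map so that the integration request can copy it into the upstream header. The sketch below only builds the authorizer response. `build_authorizer_response` and the context key `certyoApiKey` are hypothetical names, and loading the key from Secrets Manager is left to the caller.

```python
from typing import Any


def build_authorizer_response(
    principal_id: str, method_arn: str, certyo_api_key: str
) -> dict[str, Any]:
    """Build an Allow response for an API Gateway REST API Lambda authorizer."""
    return {
        "principalId": principal_id,
        "policyDocument": {
            "Version": "2012-10-17",
            "Statement": [
                {
                    "Action": "execute-api:Invoke",
                    "Effect": "Allow",
                    "Resource": method_arn,
                }
            ],
        },
        # Context values are exposed to the integration as context.authorizer.<key>.
        # Assumption: the key name certyoApiKey is chosen for this example only.
        "context": {"certyoApiKey": certyo_api_key},
    }
```

In the integration request, a header mapping such as `integration.request.header.X-API-Key` set to `context.authorizer.certyoApiKey` would then pass the value upstream; treat that mapping as an assumption to confirm against the API Gateway documentation. Avoid writing the context value to access logs, since it contains the secret.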
Code examples
Python: Lambda handler (S3 trigger)
import json
import os
import urllib.parse

import boto3
import urllib3

secrets_client = boto3.client("secretsmanager")
s3_client = boto3.client("s3")
http = urllib3.PoolManager()

CERTYO_API_URL = os.environ.get("CERTYO_API_URL", "https://www.certyos.com")
CERTYO_TENANT_ID = os.environ["CERTYO_TENANT_ID"]
SECRET_ARN = os.environ["CERTYO_SECRET_ARN"]

_api_key_cache = None


def get_api_key():
    global _api_key_cache
    if _api_key_cache is None:
        secret = secrets_client.get_secret_value(SecretId=SECRET_ARN)
        _api_key_cache = json.loads(secret["SecretString"])["apiKey"]
    return _api_key_cache


def handler(event, context):
    """
    Triggered by S3 PutObject events. Reads the uploaded JSON file,
    extracts records, and sends each to Certyo for blockchain anchoring.
    """
    api_key = get_api_key()
    results = []

    for record in event["Records"]:
        bucket = record["s3"]["bucket"]["name"]
        key = urllib.parse.unquote_plus(record["s3"]["object"]["key"])
        print(f"Processing s3://{bucket}/{key}")

        # Read the S3 object
        obj = s3_client.get_object(Bucket=bucket, Key=key)
        body = json.loads(obj["Body"].read().decode("utf-8"))

        # Support single record or array of records
        items = body if isinstance(body, list) else [body]

        for item in items:
            record_id = item.get("id", key.split("/")[-1].replace(".json", ""))
            payload = {
                "tenantId": CERTYO_TENANT_ID,
                "database": f"s3-{bucket}",
                "collection": key.rsplit("/", 1)[0] if "/" in key else "root",
                "recordId": record_id,
                "recordVersion": str(item.get("version", "1")),
                "operationType": "upsert",
                "recordPayload": item,
                "sourceTimestamp": record["eventTime"],
                "idempotencyKey": f"{record_id}-{record['eventTime'][:10]}",
            }

            response = http.request(
                "POST",
                f"{CERTYO_API_URL}/api/v1/records",
                body=json.dumps(payload),
                headers={
                    "X-API-Key": api_key,
                    "Content-Type": "application/json",
                },
            )
            result = json.loads(response.data.decode("utf-8"))

            if response.status != 202:
                print(f"ERROR: Certyo returned {response.status} for {record_id}: {result}")
                raise Exception(f"Ingestion failed for {record_id}: {response.status}")

            print(f"Record {record_id} accepted. Hash: {result['recordHash']}")
            results.append(result)

    return {
        "statusCode": 200,
        "body": json.dumps({
            "message": f"Ingested {len(results)} records",
            "records": results,
        }),
    }

TypeScript: AWS CDK stack (EventBridge API Destination)
import * as cdk from "aws-cdk-lib";
import * as events from "aws-cdk-lib/aws-events";
import * as targets from "aws-cdk-lib/aws-events-targets";
import * as sqs from "aws-cdk-lib/aws-sqs";
import * as secretsmanager from "aws-cdk-lib/aws-secretsmanager";
import { Construct } from "constructs";

export class CertyoEventBridgeStack extends cdk.Stack {
  constructor(scope: Construct, id: string, props?: cdk.StackProps) {
    super(scope, id, props);

    // Store the Certyo API key in Secrets Manager
    const certyoSecret = new secretsmanager.Secret(this, "CertyoApiKey", {
      secretName: "certyo/api-key",
      description: "Certyo API key for record ingestion",
      secretObjectValue: {
        apiKey: cdk.SecretValue.unsafePlainText("REPLACE_WITH_ACTUAL_KEY"),
      },
    });

    // Dead-letter queue for failed deliveries
    const dlq = new sqs.Queue(this, "CertyoDlq", {
      queueName: "certyo-eventbridge-dlq",
      retentionPeriod: cdk.Duration.days(14),
      encryption: sqs.QueueEncryption.SQS_MANAGED,
    });

    // EventBridge Connection: stores Certyo API key credentials
    const connection = new events.Connection(this, "CertyoConnection", {
      connectionName: "certyo-api-connection",
      description: "Connection to Certyo Records API",
      authorization: events.Authorization.apiKey(
        "X-API-Key",
        cdk.SecretValue.secretsManager(certyoSecret.secretArn, {
          jsonField: "apiKey",
        })
      ),
    });

    // EventBridge API Destination: the Certyo endpoint
    const apiDestination = new events.ApiDestination(
      this,
      "CertyoApiDestination",
      {
        apiDestinationName: "certyo-records-api",
        connection,
        endpoint: "https://www.certyos.com/api/v1/records",
        httpMethod: events.HttpMethod.POST,
        rateLimitPerSecond: 100,
        description: "Certyo record ingestion endpoint",
      }
    );

    // Custom event bus for your domain events
    const bus = new events.EventBus(this, "DomainEventBus", {
      eventBusName: "domain-events",
    });

    // Rule: route matching events to Certyo API Destination
    new events.Rule(this, "IngestToCertyoRule", {
      eventBus: bus,
      ruleName: "route-to-certyo",
      description: "Forward domain record events to Certyo for blockchain anchoring",
      eventPattern: {
        source: ["com.yourorg.orders", "com.yourorg.inventory"],
        detailType: ["RecordCreated", "RecordUpdated"],
      },
      targets: [
        new targets.ApiDestination(apiDestination, {
          // Transform the EventBridge event into Certyo record format
          event: events.RuleTargetInput.fromObject({
            tenantId: events.EventField.fromPath("$.detail.tenantId"),
            database: events.EventField.fromPath("$.source"),
            collection: events.EventField.fromPath("$.detail-type"),
            recordId: events.EventField.fromPath("$.detail.recordId"),
            recordVersion: events.EventField.fromPath("$.detail.version"),
            operationType: "upsert",
            recordPayload: events.EventField.fromPath("$.detail.payload"),
            sourceTimestamp: events.EventField.fromPath("$.time"),
            idempotencyKey: events.EventField.fromPath("$.detail.idempotencyKey"),
          }),
          deadLetterQueue: dlq,
          retryAttempts: 185,
          maxEventAge: cdk.Duration.hours(24),
        }),
      ],
    });

    // Outputs
    new cdk.CfnOutput(this, "EventBusArn", { value: bus.eventBusArn });
    new cdk.CfnOutput(this, "DlqUrl", { value: dlq.queueUrl });
    new cdk.CfnOutput(this, "SecretArn", { value: certyoSecret.secretArn });
  }
}

Step Functions: ASL state machine definition
{
"Comment": "Certyo record ingestion and verification workflow",
"StartAt": "IngestRecord",
"States": {
"IngestRecord": {
"Type": "Task",
"Resource": "arn:aws:states:::http:invoke",
"Parameters": {
"ApiEndpoint": "https://www.certyos.com/api/v1/records",
"Method": "POST",
"Authentication": {
"ConnectionArn.$": "$.connectionArn"
},
"RequestBody": {
"tenantId.$": "$.tenantId",
"database.$": "$.database",
"collection.$": "$.collection",
"recordId.$": "$.recordId",
"recordVersion.$": "$.recordVersion",
"operationType": "upsert",
"recordPayload.$": "$.recordPayload",
"sourceTimestamp.$": "$$.State.EnteredTime",
"idempotencyKey.$": "States.Format('{}-{}-{}', $.recordId, $.recordVersion, States.ArrayGetItem(States.StringSplit($$.State.EnteredTime, 'T'), 0))"
},
"Headers": {
"Content-Type": "application/json"
}
},
"ResultPath": "$.ingestionResult",
"Retry": [
{
"ErrorEquals": ["States.Http.StatusCode.429", "States.Http.StatusCode.503"],
"IntervalSeconds": 5,
"MaxAttempts": 3,
"BackoffRate": 2.0
}
],
"Catch": [
{
"ErrorEquals": ["States.ALL"],
"Next": "IngestionFailed",
"ResultPath": "$.error"
}
],
"Next": "WaitForAnchoring"
},
"WaitForAnchoring": {
"Type": "Wait",
"Seconds": 90,
"Comment": "Wait for Certyo's accumulation window (60s) + Polygon confirmation (~30s)",
"Next": "VerifyRecord"
},
"VerifyRecord": {
"Type": "Task",
"Resource": "arn:aws:states:::http:invoke",
"Parameters": {
"ApiEndpoint": "https://www.certyos.com/api/v1/verify/record",
"Method": "POST",
"Authentication": {
"ConnectionArn.$": "$.connectionArn"
},
"RequestBody": {
"tenantId.$": "$.tenantId",
"database.$": "$.database",
"collection.$": "$.collection",
"recordId.$": "$.recordId",
"payload.$": "$.recordPayload"
},
"Headers": {
"Content-Type": "application/json"
}
},
"ResultPath": "$.verificationResult",
"Retry": [
{
"ErrorEquals": ["States.Http.StatusCode.429"],
"IntervalSeconds": 2,
"MaxAttempts": 3,
"BackoffRate": 2.0
}
],
"Catch": [
{
"ErrorEquals": ["States.ALL"],
"Next": "VerificationFailed",
"ResultPath": "$.error"
}
],
"Next": "CheckVerificationResult"
},
"CheckVerificationResult": {
"Type": "Choice",
"Choices": [
{
"And": [
{ "Variable": "$.verificationResult.ResponseBody.verified", "BooleanEquals": true },
{ "Variable": "$.verificationResult.ResponseBody.anchoredOnChain", "BooleanEquals": true }
],
"Next": "RecordAnchored"
},
{
"Variable": "$.verificationResult.ResponseBody.verified",
"BooleanEquals": false,
"Next": "RetryVerification"
}
],
"Default": "RetryVerification"
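},
"RetryVerification": {
"Type": "Pass",
"Comment": "Increments the running retry counter. The execution input must include retryState.retryCount, starting at 0.",
"Parameters": {
"retryCount.$": "States.MathAdd($.retryState.retryCount, 1)"
},
"ResultPath": "$.retryState",
"Next": "CheckRetryLimit"
},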
"CheckRetryLimit": {
"Type": "Choice",
"Choices": [
{
"Variable": "$.retryState.retryCount",
"NumericGreaterThan": 5,
"Next": "VerificationFailed"
}
],
"Default": "WaitBeforeRetry"
},
"WaitBeforeRetry": {
"Type": "Wait",
"Seconds": 30,
"Next": "VerifyRecord"
},
"RecordAnchored": {
"Type": "Task",
"Resource": "arn:aws:states:::sns:publish",
"Parameters": {
"TopicArn.$": "$.snsTopicArn",
"Subject": "Certyo Record Anchored",
"Message": {
"recordId.$": "$.recordId",
"tenantId.$": "$.tenantId",
"recordHash.$": "$.ingestionResult.ResponseBody.recordHash",
"verified": true,
"anchoredOnChain": true,
"polygonScanUrl.$": "$.verificationResult.ResponseBody.onChainProof.polygonScanEventUrl"
}
},
"End": true
},
"IngestionFailed": {
"Type": "Task",
"Resource": "arn:aws:states:::sns:publish",
"Parameters": {
"TopicArn.$": "$.snsTopicArn",
"Subject": "Certyo Ingestion Failed",
"Message": {
"recordId.$": "$.recordId",
"error.$": "$.error"
}
},
"End": true
},
"VerificationFailed": {
"Type": "Task",
"Resource": "arn:aws:states:::sns:publish",
"Parameters": {
"TopicArn.$": "$.snsTopicArn",
"Subject": "Certyo Verification Failed",
"Message": {
"recordId.$": "$.recordId",
"error": "Verification did not succeed after maximum retries"
}
},
"End": true
}
}
}

AWS CLI: EventBridge pipeline setup
# Store the Certyo API key in Secrets Manager
aws secretsmanager create-secret \
--name certyo/api-key \
--description "Certyo API key for record ingestion" \
--secret-string '{"apiKey":"certyo_sk_live_your_key_here"}'
# Create an EventBridge Connection with API Key auth
# The connection securely stores the Certyo API key
aws events create-connection \
--name certyo-api-connection \
--authorization-type API_KEY \
--auth-parameters '{
"ApiKeyAuthParameters": {
"ApiKeyName": "X-API-Key",
"ApiKeyValue": "certyo_sk_live_your_key_here"
}
}' \
--description "Connection to Certyo Records API"
# Wait for connection to be AUTHORIZED
aws events describe-connection \
--name certyo-api-connection \
--query 'ConnectionState'
# Create the API Destination pointing to Certyo
CONNECTION_ARN=$(aws events describe-connection \
--name certyo-api-connection \
--query 'ConnectionArn' --output text)
aws events create-api-destination \
--name certyo-records-api \
--connection-arn $CONNECTION_ARN \
--invocation-endpoint "https://www.certyos.com/api/v1/records" \
--http-method POST \
--invocation-rate-limit-per-second 100 \
--description "Certyo record ingestion endpoint"
# Create a custom event bus
aws events create-event-bus --name domain-events
# Create a dead-letter queue for failed deliveries
aws sqs create-queue \
--queue-name certyo-eventbridge-dlq \
--attributes '{
"MessageRetentionPeriod": "1209600",
"SqsManagedSseEnabled": "true"
}'
DLQ_ARN=$(aws sqs get-queue-attributes \
--queue-url $(aws sqs get-queue-url --queue-name certyo-eventbridge-dlq --query 'QueueUrl' --output text) \
--attribute-names QueueArn --query 'Attributes.QueueArn' --output text)
# Look up the API Destination ARN (needed for the IAM policy and the rule target)
API_DEST_ARN=$(aws events describe-api-destination \
--name certyo-records-api \
--query 'ApiDestinationArn' --output text)
# Create IAM role for EventBridge to invoke the API Destination
ROLE_ARN=$(aws iam create-role \
--role-name EventBridgeCertyoRole \
--assume-role-policy-document '{
"Version": "2012-10-17",
"Statement": [{
"Effect": "Allow",
"Principal": {"Service": "events.amazonaws.com"},
"Action": "sts:AssumeRole"
}]
}' --query 'Role.Arn' --output text)
aws iam put-role-policy \
--role-name EventBridgeCertyoRole \
--policy-name InvokeApiDestination \
--policy-document '{
"Version": "2012-10-17",
"Statement": [{
"Effect": "Allow",
"Action": ["events:InvokeApiDestination"],
"Resource": ["'"$API_DEST_ARN"'"]
}]
}'
aws events put-rule \
--name route-to-certyo \
--event-bus-name domain-events \
--event-pattern '{
"source": ["com.yourorg.orders", "com.yourorg.inventory"],
"detail-type": ["RecordCreated", "RecordUpdated"]
}' \
--description "Forward domain record events to Certyo"
# Attach the API Destination as the rule target with input transformation
aws events put-targets \
--rule route-to-certyo \
--event-bus-name domain-events \
--targets '[{
"Id": "certyo-api-destination",
"Arn": "'"$API_DEST_ARN"'",
"RoleArn": "'"$ROLE_ARN"'",
"InputTransformer": {
"InputPathsMap": {
"tenantId": "$.detail.tenantId",
"database": "$.source",
"collection": "$.detail-type",
"recordId": "$.detail.recordId",
"version": "$.detail.version",
"payload": "$.detail.payload",
"timestamp": "$.time",
"idempotencyKey": "$.detail.idempotencyKey"
},
"InputTemplate": "{\"tenantId\": <tenantId>, \"database\": <database>, \"collection\": <collection>, \"recordId\": <recordId>, \"recordVersion\": <version>, \"operationType\": \"upsert\", \"recordPayload\": <payload>, \"sourceTimestamp\": <timestamp>, \"idempotencyKey\": <idempotencyKey>}"
},
"RetryPolicy": {
"MaximumRetryAttempts": 185,
"MaximumEventAgeInSeconds": 86400
},
"DeadLetterConfig": {
"Arn": "'"$DLQ_ARN"'"
}
}]'
echo "EventBridge pipeline configured. Send events to the 'domain-events' bus."
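echo "Events matching the rule will be forwarded to Certyo automatically."

Authentication
The recommended authentication pattern pairs AWS-native secret management with IAM for service-to-service authorization:
- Secrets Manager: store the Certyo X-API-Key as a secret. Enable automatic rotation if your organization requires it.
- EventBridge connection: the connection object stores the API key securely and injects it into every API Destination request, so your application code never handles the key.
- Lambda execution role: grant the Lambda function secretsmanager:GetSecretValue on the specific secret ARN only, following the principle of least privilege.
- API Gateway API keys: for internal consumers, issue API Gateway keys bound to a usage plan. The gateway injects the Certyo key through a Lambda authorizer or a mapping template.
An example least-privilege IAM policy for the Lambda execution role: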
{
"Version": "2012-10-17",
"Statement": [
{
"Sid": "ReadCertyoApiKey",
"Effect": "Allow",
"Action": [
"secretsmanager:GetSecretValue"
],
"Resource": "arn:aws:secretsmanager:us-east-1:123456789012:secret:certyo/api-key-*"
}
]
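}

Cost optimization
Choose the integration pattern that fits your workload to minimize AWS cost:
- EventBridge API Destinations (most economical): no Lambda invocation cost. You pay only for EventBridge events ($1.00 per million events) and API Destination invocations. Use for simple event forwarding that needs no transformation.
- EventBridge + Input Transformer: if you only need to reshape the event payload (rename fields, extract nested values), use the EventBridge Input Transformer instead of Lambda. No compute cost.
- Lambda (for transformation): use Lambda only when you need to read from S3, call external APIs, or apply complex business logic before ingestion. Running on ARM64 (Graviton) can save about 20%.
- Step Functions Express: for high-volume verification workflows (more than 100K executions per month), Express Workflows can be up to 80% cheaper than Standard Workflows.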
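A rough way to compare the first and third options is to work the arithmetic for a given monthly volume. The sketch below is illustrative only: the $1.00 per million events figure comes from the list above, while the API Destination invocation price, the Lambda request price, and the Lambda GB-second price are assumed defaults that should be checked against current AWS pricing for your region. Both function names are hypothetical.

```python
def monthly_cost_api_destination(
    events: int,
    event_price_per_million: float = 1.00,       # from the list above
    invocation_price_per_million: float = 0.20,  # assumption: check AWS pricing
) -> float:
    """Estimated monthly cost in USD of EventBridge + API Destination forwarding."""
    millions = events / 1_000_000
    return millions * (event_price_per_million + invocation_price_per_million)


def monthly_cost_lambda(
    events: int,
    avg_duration_ms: float,
    memory_gb: float,
    event_price_per_million: float = 1.00,    # from the list above
    request_price_per_million: float = 0.20,  # assumption: check AWS pricing
    gb_second_price: float = 0.0000133334,    # assumption: ARM64 rate, check AWS pricing
) -> float:
    """Estimated monthly cost in USD of EventBridge + Lambda, ignoring any free tier."""
    millions = events / 1_000_000
    gb_seconds = events * (avg_duration_ms / 1000.0) * memory_gb
    return (
        millions * (event_price_per_million + request_price_per_million)
        + gb_seconds * gb_second_price
    )


if __name__ == "__main__":
    volume = 5_000_000
    print(f"API Destination: ${monthly_cost_api_destination(volume):.2f}")
    print(f"Lambda:          ${monthly_cost_lambda(volume, 200, 0.128):.2f}")
```

With these assumed prices the per-event charges are the same on both paths, so the difference comes from Lambda compute time, which grows with duration and memory size.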
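Next steps
- Deploy the CDK stack above to provision the complete EventBridge pipeline in a few minutes
- Publish a test event to the domain-events bus and confirm that it reaches Certyo
- Deploy the Step Functions verification workflow for end-to-end anchoring confirmation
- See the Ingestion Guide for record schema and idempotency details
- See the Verification Guide for details on the cryptographic proofs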
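For the test-event step, the entry passed to EventBridge has to match the rule's pattern (source and detail type) and carry the fields the input transformation reads. A minimal Python sketch of such an entry follows. `build_test_entry` is a hypothetical helper, the sample values are placeholders, and the bus, source, and detail-type names are the ones used in the code examples on this page.

```python
import json
from datetime import datetime, timezone
from typing import Any


def build_test_entry(
    record_id: str, tenant_id: str, payload: dict[str, Any], version: str = "1"
) -> dict[str, Any]:
    """Build one PutEvents entry that matches the route-to-certyo rule."""
    return {
        "EventBusName": "domain-events",
        "Source": "com.yourorg.orders",
        "DetailType": "RecordCreated",
        "Time": datetime.now(timezone.utc),
        "Detail": json.dumps(
            {
                "tenantId": tenant_id,
                "recordId": record_id,
                "version": version,
                "payload": payload,
                # Assumption: record id plus version is unique enough for a test.
                "idempotencyKey": f"{record_id}-{version}",
            }
        ),
    }


# Sending the entry requires boto3 and AWS credentials:
#   import boto3
#   response = boto3.client("events").put_events(
#       Entries=[build_test_entry("order-1001", "tenant-123", {"total": 42})]
#   )
#   print(response["FailedEntryCount"])
```

A `FailedEntryCount` of 0 only confirms that the bus accepted the event. Delivery to Certyo is confirmed separately, for example by checking that nothing arrives in the dead-letter queue.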
AI Integration Skill
Download a skill file that enables AI agents to generate working AWS + Certyo integration code for any language or framework.
What's inside
- Authentication: EventBridge connections with API key auth, Lambda with Secrets Manager
- Architecture: EventBridge → API Destination or Lambda → Certyo → Step Functions verification
- Code examples: Python Lambda, CDK TypeScript stack, Step Functions ASL workflow
- API Destinations: direct forwarding to Certyo with no Lambda, plus retry policies
- Step Functions: verification workflow with Wait states, a retry loop, and branching
- Cost optimization: API Destinations vs Lambda vs Express Workflows compared
How to use
Claude Code
Place the file in your project's .claude/commands/ directory, then use it as a slash command:
# Download the skill file
mkdir -p .claude/commands
curl -o .claude/commands/certyo-aws.md \
https://www.certyos.com/developers/skills/certyo-aws-skill.md
# Use it in Claude Code
/certyo-aws "Generate an EventBridge API Destination that forwards events to Certyo"

Cursor / Copilot / Any AI Agent
Add the file to your project root or attach it to a conversation. The AI agent will use the AWS-specific patterns, field mappings, and code examples to generate correct integration code.
# Add to your project
curl -o CERTYO_AWS.md \
https://www.certyos.com/developers/skills/certyo-aws-skill.md
# Then in your AI agent:
"Using the Certyo AWS spec in CERTYO_AWS.md,
generate an EventBridge API Destination that forwards events to Certyo"

CLAUDE.md Context File
Append the skill file to your project's CLAUDE.md so every Claude conversation has AWS + Certyo context automatically.
# Append to your project's CLAUDE.md
echo "" >> CLAUDE.md
echo "## Certyo AWS Integration" >> CLAUDE.md
cat CERTYO_AWS.md >> CLAUDE.md