Building a TDD-Driven Self-Service Delta Lake Provisioning Service with Crossplane and WebAuthn


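Demand within our team for isolated data experimentation environments has been growing explosively. Every data scientist wants a dedicated S3 Bucket, a dedicated Glue Catalog, and purpose-built IAM policies for running Delta Lake queries without interfering with anyone else. Initially this work fell to the platform engineering team, who fulfilled requests by running Terraform scripts by hand. It quickly became our bottleneck: the request queue kept growing, misconfigurations crept in, and managing a large number of Terraform state files and AWS credentials introduced security risks of its own.

We needed a self-service platform, but handing data scientists direct access to the AWS console or to Terraform was out of the question. Our goal was to build an internal API that meets several strict requirements:

  1. Declarative and idempotent: users only describe the DeltaLakeEnvironment they want, and the system converges to that state and keeps it there. Repeating a request has no side effects.
  2. Strong security: credential management must be automated, and user authentication must not rely on traditional passwords, so that phishing and credential leaks are much harder to exploit.
  3. High reliability: as a core infrastructure provisioning service, its logic must be rigorously tested so that each operation behaves as specified.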
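
To make the first requirement concrete, here is a minimal sketch of an idempotent reconcile step. The `Cloud` type and the resource names are hypothetical stand-ins invented for this illustration and are not part of Crossplane; the only point is that reconciling the same desired state twice issues no additional create calls.

```go
package main

import (
	"fmt"
	"sort"
)

// Cloud is a hypothetical in-memory stand-in for a cloud provider's state.
type Cloud struct {
	resources map[string]bool
	creates   int // number of create calls actually issued
}

// NewCloud returns an empty Cloud.
func NewCloud() *Cloud { return &Cloud{resources: map[string]bool{}} }

// Reconcile drives the actual state toward the desired set of resource names.
// It only creates what is missing, so calling it repeatedly is safe.
func (c *Cloud) Reconcile(desired []string) {
	for _, name := range desired {
		if c.resources[name] {
			continue // already exists: nothing to do
		}
		c.resources[name] = true
		c.creates++
	}
}

// Names returns the existing resource names in sorted order.
func (c *Cloud) Names() []string {
	names := make([]string, 0, len(c.resources))
	for n := range c.resources {
		names = append(names, n)
	}
	sort.Strings(names)
	return names
}

func main() {
	cloud := NewCloud()
	desired := []string{"bucket/project-x", "glue-db/project-x_db", "iam-role/delta-access-role-project-x"}
	cloud.Reconcile(desired)
	cloud.Reconcile(desired) // the second pass finds nothing to create
	fmt.Println(cloud.Names(), "creates:", cloud.creates)
}
```

A wrapper around an imperative command has to implement this "check before acting" logic itself, which is the burden the declarative approach is meant to remove.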

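These requirements pushed our technology choices toward a combination that looks unrelated at first glance: Crossplane for declarative infrastructure, WebAuthn for passwordless authentication, and test-driven development (TDD) to guarantee the quality of the core control logic.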

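Technology Choices

In real projects, technology choices are never about chasing trends; they are trade-offs made to solve specific problems.

  • Why Crossplane instead of a Terraform wrapper?
    Building an API that wraps the terraform apply command is a common first attempt. The problem is that the API layer then has to handle state locking, idempotency checks, error retries, and other complex logic itself. Crossplane treats infrastructure as a first-class citizen of Kubernetes and relies on the mature Kubernetes reconciliation loop, which handles state management and idempotency by design. We can define a high-level abstraction such as a CompositeResourceDefinition (XRD) to describe a DeltaLakeEnvironment, and encapsulate the orchestration of the underlying AWS resources (s3.Bucket, iam.Role, glue.Database) in a Composition. This lets the platform team offer a clean, stable platform API rather than a fragile script runner.

  • Why WebAuthn?
    This is a high-privilege entry point for operating infrastructure. Any static credential, whether an API key or a password, can leak. WebAuthn is based on public-key cryptography and authenticates with a device (such as a YubiKey or a laptop's Touch ID); the private key never leaves the device. Because each credential is bound to the origin it was registered for, a look-alike site cannot obtain a usable signature, which removes the main avenue for credential phishing and makes WebAuthn a strong fit for identity verification in a zero-trust architecture.

  • Why insist on TDD?
    We are writing a custom controller whose core responsibility is to turn a DeltaLakeRequest, created by users through the API, into a Crossplane CompositeResourceClaim. This translation and reconciliation logic is the heart of the system; a bug here can lead to wrongly created resources, overly broad permissions, or leaked resources. TDD forces us to define the behavior and the edge cases first (for example, naming conflicts in a request or failed parameter validation) and only then write the implementation. This not only protects code quality, it also produces "living documentation" in the form of the test cases themselves.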

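Developing the Controller with TDD

Our starting point is neither YAML nor the API service, but a test file in Go. We will build a small Kubernetes controller that does not talk to AWS directly; instead, it acts as the bridge between the API service and Crossplane.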

provisioner/internal/controller/deltarequest_controller_test.go

//go:build unit

package controller

import (
	"context"
	"testing"

	"github.com/stretchr/testify/assert"
	"github.com/stretchr/testify/require"

	// Crossplane
	"github.com/crossplane/crossplane-runtime/pkg/logging"
	"github.com/crossplane/crossplane-runtime/pkg/test"
	xrv1 "github.com/crossplane/crossplane/apis/apiextensions/v1"

	// Our own API types
	platformv1alpha1 "github.com/our-org/delta-provisioner/apis/platform/v1alpha1"

	kerrors "k8s.io/apimachinery/pkg/api/errors"
	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
	"k8s.io/apimachinery/pkg/runtime"
	"k8s.io/apimachinery/pkg/runtime/schema"
	"k8s.io/apimachinery/pkg/types"
	"sigs.k8s.io/controller-runtime/pkg/client"
	"sigs.k8s.io/controller-runtime/pkg/reconcile"
)

// TestDeltaRequestReconciliation defines the core behavior.
// When a new DeltaLakeRequest is created, the controller must create a
// corresponding CompositeResourceClaim (XRClaim) with the correct spec.
//
// NOTE: xrv1.CompositeResourceClaim is the typed claim representation this
// article assumes. Verify it against your Crossplane version; claims may
// instead need to be built as unstructured objects of the kind declared in
// the XRD's claimNames.
func TestDeltaRequestReconciliation(t *testing.T) {
	// Arrange
	ctx := context.Background()
	requestName := "data-science-project-x"
	requestNamespace := "provisioning-system"
	ownerEmail := "owner@example.com" // placeholder address
	teamName := "alpha-squad"

	// Define our custom resource (the input)
	deltaRequest := &platformv1alpha1.DeltaLakeRequest{
		ObjectMeta: metav1.ObjectMeta{
			Name:      requestName,
			Namespace: requestNamespace,
			UID:       types.UID("test-uid"),
		},
		Spec: platformv1alpha1.DeltaLakeRequestSpec{
			Owner:          ownerEmail,
			Team:           teamName,
			Region:         "us-east-1",
			DeletionPolicy: "Orphan", // Important for test environments
		},
	}

	// Define the expected output: a Crossplane Claim
	expectedClaim := &xrv1.CompositeResourceClaim{
		ObjectMeta: metav1.ObjectMeta{
			Name:      requestName,
			Namespace: requestNamespace,
			// The owner reference ensures that if the DeltaLakeRequest is deleted,
			// the Claim is garbage collected by Kubernetes. This is a critical link.
			OwnerReferences: []metav1.OwnerReference{
				{
					APIVersion: platformv1alpha1.GroupVersion.String(),
					Kind:       "DeltaLakeRequest",
					Name:       requestName,
					UID:        types.UID("test-uid"),
				},
			},
		},
		Spec: xrv1.CompositeResourceClaimSpec{
			CompositeResourceDefinitionName: "deltalakeenvironments.platform.our-org.io",
			Parameters: runtime.RawExtension{
				Raw: []byte(`{"bucketName":"` + requestName + `","team":"` + teamName + `","region":"us-east-1"}`),
			},
		},
	}

	// Register our types and the Crossplane types with the scheme.
	scheme := runtime.NewScheme()
	require.NoError(t, platformv1alpha1.AddToScheme(scheme))
	require.NoError(t, xrv1.AddToScheme(scheme))

	// Mock Kubernetes API client. createdObj records what the reconciler creates.
	var createdObj client.Object
	mockClient := &test.MockClient{
		MockGet: func(_ context.Context, key client.ObjectKey, obj client.Object) error {
			// The request and the claim share a name, so dispatch on the type.
			switch o := obj.(type) {
			case *platformv1alpha1.DeltaLakeRequest:
				// The DeltaLakeRequest already exists in the mocked cluster.
				*o = *deltaRequest
				return nil
			default:
				// Simulate "Not Found" for the Claim initially.
				return kerrors.NewNotFound(schema.GroupResource{}, key.Name)
			}
		},
		MockCreate: func(_ context.Context, obj client.Object, _ ...client.CreateOption) error {
			createdObj = obj
			return nil
		},
	}

	// Create the reconciler instance with the mocked client.
	// A no-op logger avoids a nil dereference when the reconciler logs.
	reconciler := &DeltaLakeRequestReconciler{
		Client: mockClient,
		Scheme: scheme,
		Log:    logging.NewNopLogger(),
	}

	// Act
	req := reconcile.Request{NamespacedName: types.NamespacedName{Name: requestName, Namespace: requestNamespace}}
	res, err := reconciler.Reconcile(ctx, req)

	// Assert
	require.NoError(t, err, "Reconcile should not return an error")
	assert.False(t, res.Requeue, "Reconcile should not requeue")

	// Verify that client.Create was called with the correct Claim object
	require.NotNil(t, createdObj, "Expected Create to be called")
	createdClaim, ok := createdObj.(*xrv1.CompositeResourceClaim)
	require.True(t, ok, "Expected a CompositeResourceClaim to be created")

	assert.Equal(t, expectedClaim.Name, createdClaim.Name, "Claim name mismatch")
	assert.Equal(t, expectedClaim.Namespace, createdClaim.Namespace, "Claim namespace mismatch")
	assert.Equal(t, expectedClaim.Spec.CompositeResourceDefinitionName, createdClaim.Spec.CompositeResourceDefinitionName, "XRD name mismatch")
	assert.JSONEq(t, string(expectedClaim.Spec.Parameters.Raw), string(createdClaim.Spec.Parameters.Raw), "Claim parameters mismatch")
	require.Len(t, createdClaim.OwnerReferences, 1, "Expected exactly one owner reference")
	assert.Equal(t, expectedClaim.OwnerReferences[0].UID, createdClaim.OwnerReferences[0].UID, "Owner reference UID mismatch")
}

This test case defines the controller's core contract. Now we write the minimal implementation that makes it pass.

provisioner/internal/controller/deltarequest_controller.go

package controller

import (
	"context"
	"encoding/json"

	"github.com/crossplane/crossplane-runtime/pkg/errors"
	"github.com/crossplane/crossplane-runtime/pkg/logging"
	xrv1 "github.com/crossplane/crossplane/apis/apiextensions/v1"
	ctrl "sigs.k8s.io/controller-runtime"
	"sigs.k8s.io/controller-runtime/pkg/client"
	"sigs.k8s.io/controller-runtime/pkg/controller/controllerutil"

	platformv1alpha1 "github.com/our-org/delta-provisioner/apis/platform/v1alpha1"

	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
	"k8s.io/apimachinery/pkg/runtime"
)

const (
	errGetRequest     = "cannot get DeltaLakeRequest"
	errCreateClaim    = "cannot create CompositeResourceClaim"
	errConstructClaim = "cannot construct CompositeResourceClaim from request"
)

// DeltaLakeRequestReconciler reconciles a DeltaLakeRequest object
type DeltaLakeRequestReconciler struct {
	client.Client
	Scheme *runtime.Scheme
	Log    logging.Logger
}

func (r *DeltaLakeRequestReconciler) Reconcile(ctx context.Context, req ctrl.Request) (ctrl.Result, error) {
	r.Log.Debug("Reconciling DeltaLakeRequest", "request", req.NamespacedName.String())

	// 1. Fetch the DeltaLakeRequest instance.
	// A missing request means it was deleted, which is not an error.
	request := &platformv1alpha1.DeltaLakeRequest{}
	if err := r.Get(ctx, req.NamespacedName, request); err != nil {
		return ctrl.Result{}, errors.Wrap(client.IgnoreNotFound(err), errGetRequest)
	}

	// 2. Check if a corresponding claim already exists.
	// In a real implementation, we'd handle updates here too. For now, we focus on creation.
	claim := &xrv1.CompositeResourceClaim{}
	err := r.Get(ctx, req.NamespacedName, claim)
	if err == nil {
		// Claim already exists, our job is done for now.
		r.Log.Debug("Claim already exists, skipping creation.", "claim", claim.Name)
		return ctrl.Result{}, nil
	}
	// Any error other than "not found" is unexpected and should be retried.
	if client.IgnoreNotFound(err) != nil {
		return ctrl.Result{}, errors.Wrap(err, "failed to get existing claim")
	}

	// 3. Construct the claim from the request
	desiredClaim, err := r.constructClaimForRequest(request)
	if err != nil {
		// This is a programming error, should not happen if logic is correct
		return ctrl.Result{}, errors.Wrap(err, errConstructClaim)
	}

	// Set the DeltaLakeRequest as the owner of the Claim.
	// This is crucial for garbage collection.
	if err := controllerutil.SetControllerReference(request, desiredClaim, r.Scheme); err != nil {
		return ctrl.Result{}, errors.Wrap(err, "cannot set owner reference on claim")
	}
	
	// 4. Create the CompositeResourceClaim in the cluster
	r.Log.Info("Creating a new CompositeResourceClaim", "claimName", desiredClaim.Name)
	if err := r.Create(ctx, desiredClaim); err != nil {
		return ctrl.Result{}, errors.Wrap(err, errCreateClaim)
	}

	return ctrl.Result{}, nil
}

// constructClaimForRequest translates our internal API object into a Crossplane claim.
func (r *DeltaLakeRequestReconciler) constructClaimForRequest(req *platformv1alpha1.DeltaLakeRequest) (*xrv1.CompositeResourceClaim, error) {
	// Define the parameters that our Crossplane Composition expects.
	// This is the contract between our controller and the platform team's Composition.
	params := map[string]interface{}{
		"bucketName": req.Name, // Use the request name for deterministic bucket naming
		"team":       req.Spec.Team,
		"region":     req.Spec.Region,
	}

	rawParams, err := json.Marshal(params)
	if err != nil {
		return nil, err
	}
	
	claim := &xrv1.CompositeResourceClaim{
		ObjectMeta: metav1.ObjectMeta{
			Name:      req.Name,
			Namespace: req.Namespace,
		},
		Spec: xrv1.CompositeResourceClaimSpec{
			CompositeResourceDefinitionName: "deltalakeenvironments.platform.our-org.io",
			Parameters: runtime.RawExtension{
				Raw: rawParams,
			},
		},
	}
	return claim, nil
}

// SetupWithManager sets up the controller with the Manager.
func (r *DeltaLakeRequestReconciler) SetupWithManager(mgr ctrl.Manager) error {
	return ctrl.NewControllerManagedBy(mgr).
		For(&platformv1alpha1.DeltaLakeRequest{}).
		Owns(&xrv1.CompositeResourceClaim{}). // Watch claims it creates
		Complete(r)
}
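
The reconciler above returns early when the claim already exists and leaves updates for later. One building block for that later step is deciding whether the stored parameters have drifted from the desired ones. The following is a minimal sketch under the assumption that both sides are available as raw JSON; `ParametersDiffer` is a hypothetical helper, not part of the controller shown above.

```go
package main

import (
	"encoding/json"
	"fmt"
	"reflect"
)

// ParametersDiffer reports whether two raw JSON parameter documents differ
// semantically, ignoring key order and whitespace.
func ParametersDiffer(existing, desired []byte) (bool, error) {
	var a, b map[string]interface{}
	if err := json.Unmarshal(existing, &a); err != nil {
		return false, fmt.Errorf("decode existing parameters: %w", err)
	}
	if err := json.Unmarshal(desired, &b); err != nil {
		return false, fmt.Errorf("decode desired parameters: %w", err)
	}
	return !reflect.DeepEqual(a, b), nil
}

func main() {
	existing := []byte(`{"bucketName":"project-x","team":"alpha-squad","region":"us-east-1"}`)
	reordered := []byte(`{"region":"us-east-1", "team":"alpha-squad", "bucketName":"project-x"}`)
	moved := []byte(`{"bucketName":"project-x","team":"alpha-squad","region":"eu-west-1"}`)

	for _, desired := range [][]byte{reordered, moved} {
		differ, err := ParametersDiffer(existing, desired)
		if err != nil {
			fmt.Println("error:", err)
			continue
		}
		fmt.Println("update needed:", differ)
	}
}
```

Comparing decoded values rather than raw bytes avoids spurious updates when only the key order changes, which is the same reason the test compares parameters with assert.JSONEq.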

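Defining the Platform Abstraction: Crossplane XRD and Composition

The controller now has its logic, but it still needs to know what kind of CompositeResourceClaim to create. This is where Crossplane's XRD and Composition come in.

xrd-deltalakeenvironment.yaml - defines the API we expose to users (DeltaLakeEnvironment).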

apiVersion: apiextensions.crossplane.io/v1
kind: CompositeResourceDefinition
metadata:
  name: deltalakeenvironments.platform.our-org.io
spec:
  group: platform.our-org.io
  names:
    kind: DeltaLakeEnvironment
    plural: deltalakeenvironments
  claimNames:
    kind: DeltaLakeEnvironmentClaim # We use this claim name, but our controller creates it directly
    plural: deltalakeenvironmentclaims
  versions:
  - name: v1alpha1
    served: true
    referenceable: true
    schema:
      openAPIV3Schema:
        type: object
        properties:
          spec:
            type: object
            properties:
              parameters:
                type: object
                properties:
                  bucketName:
                    type: string
                    description: "The name of the S3 bucket for the Delta Lake tables."
                  team:
                    type: string
                    description: "The team tag that owns this environment for cost allocation."
                  region:
                    type: string
                    description: "The AWS region for the resources."
                required:
                  - bucketName
                  - team
                  - region
            required:
              - parameters

composition-aws-deltalake.yaml - the implementation detail: it tells Crossplane how to satisfy a DeltaLakeEnvironment request with actual AWS resources.

apiVersion: apiextensions.crossplane.io/v1
kind: Composition
metadata:
  name: aws.deltalakeenvironment.v1alpha1
  labels:
    provider: aws
spec:
  compositeTypeRef:
    apiVersion: platform.our-org.io/v1alpha1
    kind: DeltaLakeEnvironment
  resources:
    - name: s3-bucket
      base:
        apiVersion: s3.aws.upbound.io/v1beta1
        kind: Bucket
        spec:
          forProvider:
            acl: private
            tags:
              ManagedBy: "Crossplane"
      patches:
        - fromFieldPath: "spec.parameters.bucketName"
          toFieldPath: "metadata.name"
        - fromFieldPath: "spec.parameters.region"
          toFieldPath: "spec.forProvider.region"
        - fromFieldPath: "spec.parameters.team"
          toFieldPath: "spec.forProvider.tags.Team"
          
    - name: glue-database
      base:
        apiVersion: glue.aws.upbound.io/v1beta1
        kind: CatalogDatabase
        spec:
          forProvider:
            region: "us-east-1" # Default only; overridden by the region patch below
      patches:
        - fromFieldPath: "spec.parameters.bucketName"
          toFieldPath: "spec.forProvider.name"
          transforms:
            - type: string
              string:
                fmt: "%s_db"
        - fromFieldPath: "spec.parameters.region"
          toFieldPath: "spec.forProvider.region"

    - name: iam-role-for-access
      base:
        apiVersion: iam.aws.upbound.io/v1beta1
        kind: Role
        spec:
          forProvider:
            assumeRolePolicy: |
              {
                "Version": "2012-10-17",
                "Statement": [
                  {
                    "Effect": "Allow",
                    "Principal": { "Service": "glue.amazonaws.com" },
                    "Action": "sts:AssumeRole"
                  }
                ]
              }
      patches:
        - fromFieldPath: "spec.parameters.bucketName"
          toFieldPath: "metadata.name"
          transforms:
            - type: string
              string:
                fmt: "delta-access-role-%s"

Now, once our controller creates a CompositeResourceClaim and this Composition is correctly installed, Crossplane automatically creates the S3 Bucket, the Glue Database, and the IAM Role, and ties their status together.
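
To see what the patches in the Composition produce for a given input, the sketch below reproduces the two string transforms in plain Go. Crossplane's string fmt transform takes a Go format string, so `fmt.Sprintf` models it directly; the `Parameters` and `DerivedNames` types are hypothetical and exist only for this illustration.

```go
package main

import "fmt"

// Parameters mirrors spec.parameters from the XRD.
type Parameters struct {
	BucketName string
	Team       string
	Region     string
}

// DerivedNames holds the names the Composition's patches would produce.
type DerivedNames struct {
	Bucket       string // metadata.name of the Bucket
	GlueDatabase string // spec.forProvider.name of the CatalogDatabase
	IAMRole      string // metadata.name of the Role
}

// Derive applies the same format strings as the Composition's transforms.
func Derive(p Parameters) DerivedNames {
	return DerivedNames{
		Bucket:       p.BucketName,
		GlueDatabase: fmt.Sprintf("%s_db", p.BucketName),
		IAMRole:      fmt.Sprintf("delta-access-role-%s", p.BucketName),
	}
}

func main() {
	names := Derive(Parameters{
		BucketName: "data-science-project-x",
		Team:       "alpha-squad",
		Region:     "us-east-1",
	})
	fmt.Printf("bucket:   %s\n", names.Bucket)
	fmt.Printf("glue db:  %s\n", names.GlueDatabase)
	fmt.Printf("iam role: %s\n", names.IAMRole)
}
```

Because every name derives from bucketName, that single parameter has to satisfy the naming rules of all three resource types at once.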

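WebAuthn API Authentication Layer

The last step is to build a secure API entry point through which users can create DeltaLakeRequest resources. We will build this API service in Go with a WebAuthn library (such as github.com/go-webauthn/webauthn).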

sequenceDiagram
    participant User as User (browser)
    participant RP as Provisioning API (Go)
    participant AuthDB as Auth database
    participant K8s as Kubernetes API
    participant Controller as Custom controller
    participant Crossplane as Crossplane

    User->>RP: 1. POST /register/begin (username)
    RP->>RP: 2. Generate challenge
    RP->>AuthDB: 3. Store user info and challenge
    RP-->>User: 4. Return WebAuthn creation options
    User->>User: 5. Call navigator.credentials.create()
    User->>RP: 6. POST /register/finish (public key and signature)
    RP->>RP: 7. Verify signature and challenge
    RP->>AuthDB: 8. Store user's public key credential

    Note over User, RP: --- Login flow ---

    User->>RP: 9. POST /login/begin (username)
    RP->>RP: 10. Generate new challenge
    RP-->>User: 11. Return WebAuthn request options
    User->>User: 12. Call navigator.credentials.get()
    User->>RP: 13. POST /login/finish (signed response)
    RP->>RP: 14. Verify signature
    RP-->>User: 15. Issue JWT

    Note over User, K8s: --- Provisioning flow ---

    User->>RP: 16. POST /provision (JWT, {spec})
    RP->>RP: 17. Verify JWT
    RP->>K8s: 18. Create DeltaLakeRequest CR
    K8s-->>Controller: 19. Notify of resource creation
    Controller->>K8s: 20. Create CompositeResourceClaim
    K8s-->>Crossplane: 21. Notify of Claim creation
    Crossplane->>Crossplane: 22. Reconcile according to Composition
    Crossplane-->>AWS: 23. Create S3, Glue, IAM...

Below is a simplified implementation of a key endpoint in the API service; it creates the CR after WebAuthn authentication has succeeded.

api/server.go

package main

import (
	"context"
	"net/http"
	
	"github.com/gin-gonic/gin"
	// Assume webAuthnMiddleware handles JWT validation from WebAuthn login
	// and injects user info into the context.
	
	platformv1alpha1 "github.com/our-org/delta-provisioner/apis/platform/v1alpha1"
	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
	"sigs.k8s.io/controller-runtime/pkg/client"
)

// ProvisioningRequest defines the JSON body for a new environment request.
type ProvisioningRequest struct {
	Name   string `json:"name" binding:"required"`
	Team   string `json:"team" binding:"required"`
	Region string `json:"region" binding:"required"`
}

// createProvisioningHandler handles the creation of a DeltaLakeRequest resource.
// This endpoint is protected by our WebAuthn middleware.
func createProvisioningHandler(k8sClient client.Client) gin.HandlerFunc {
	return func(c *gin.Context) {
		var req ProvisioningRequest
		if err := c.ShouldBindJSON(&req); err != nil {
			c.JSON(http.StatusBadRequest, gin.H{"error": err.Error()})
			return
		}
		
		// Extract user identity from the context (populated by middleware).
		// The checked type assertion avoids a panic if the value is not a string.
		ownerValue, exists := c.Get("user_email")
		ownerEmail, isString := ownerValue.(string)
		if !exists || !isString || ownerEmail == "" {
			c.JSON(http.StatusUnauthorized, gin.H{"error": "invalid authentication token"})
			return
		}

		// Construct the custom resource object
		deltaRequest := &platformv1alpha1.DeltaLakeRequest{
			ObjectMeta: metav1.ObjectMeta{
				Name:      req.Name,
				Namespace: "provisioning-system", // All requests go to a dedicated namespace
				Labels: map[string]string{
					"team": req.Team,
				},
				// An email address is not a valid label value ('@' is not allowed),
				// so the owner is recorded as an annotation instead.
				Annotations: map[string]string{
					"owner": ownerEmail,
				},
			},
			Spec: platformv1alpha1.DeltaLakeRequestSpec{
				Owner:          ownerEmail,
				Team:           req.Team,
				Region:         req.Region,
				DeletionPolicy: "Orphan", // Use a safe default
			},
		}

		// Use the controller-runtime client to create the resource in the cluster
		if err := k8sClient.Create(context.Background(), deltaRequest); err != nil {
			// Handle potential conflicts (e.g., name already exists)
			if client.IgnoreAlreadyExists(err) != nil {
				c.JSON(http.StatusInternalServerError, gin.H{"error": "failed to create provisioning request"})
				return
			}
			c.JSON(http.StatusConflict, gin.H{"error": "an environment with this name already exists"})
			return
		}
		
		c.JSON(http.StatusAccepted, gin.H{
			"message": "Delta Lake environment provisioning initiated",
			"name":    req.Name,
		})
	}
}

// main function to setup router, k8s client, webauthn, etc.
// ...

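Limitations and Future Outlook

Although this approach solves the core pain points, there are several boundaries to keep in mind in production and several directions for iteration.
First, the current controller logic is very simple and only handles creation. A complete controller also needs to handle updates to a DeltaLakeRequest (such as changing labels) and deletion, and to write progress back to the status field of the DeltaLakeRequest so that the API can report provisioning progress.
Second, error handling and feedback need to be strengthened. When Crossplane fails while provisioning downstream cloud resources (for example, because of AWS API throttling or insufficient permissions), that state needs to be propagated back to our DeltaLakeRequest resource and exposed to end users through the API.
Finally, although Crossplane supports multiple clouds, our Composition is currently hard-coded for AWS. A future iteration would add Compositions for GCP or Azure and let users choose the target cloud provider through a request parameter, achieving a genuinely multi-cloud abstraction at the platform layer. The architecture described here is a reasonable foundation for that extension.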
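
As a starting point for the status feedback described above, the sketch below collapses Crossplane's Ready and Synced conditions into a single phase that an API could return. The `Condition` struct is a minimal stand-in for those conditions, and the `Phase` values and `Summarize` function are hypothetical; the original controller does not define them.

```go
package main

import "fmt"

// Condition is a minimal stand-in for the Ready/Synced conditions that
// Crossplane reports on claims and composite resources.
type Condition struct {
	Type    string // "Ready" or "Synced"
	Status  string // "True", "False" or "Unknown"
	Message string
}

// Phase is a hypothetical summary value for DeltaLakeRequest.status.
type Phase string

const (
	PhasePending      Phase = "Pending"
	PhaseProvisioning Phase = "Provisioning"
	PhaseReady        Phase = "Ready"
	PhaseFailed       Phase = "Failed"
)

// Summarize maps a set of conditions to one phase plus a message for the user.
func Summarize(conds []Condition) (Phase, string) {
	if len(conds) == 0 {
		return PhasePending, "" // nothing reported yet
	}
	byType := map[string]Condition{}
	for _, c := range conds {
		byType[c.Type] = c
	}
	// A failed sync means reconciliation itself is erroring (throttling, permissions).
	if s, ok := byType["Synced"]; ok && s.Status == "False" {
		return PhaseFailed, s.Message
	}
	if r, ok := byType["Ready"]; ok && r.Status == "True" {
		return PhaseReady, ""
	}
	return PhaseProvisioning, byType["Ready"].Message
}

func main() {
	phase, msg := Summarize([]Condition{
		{Type: "Synced", Status: "False", Message: "AccessDenied: not authorized to create bucket"},
		{Type: "Ready", Status: "False"},
	})
	fmt.Println(phase, "-", msg)

	phase, _ = Summarize([]Condition{
		{Type: "Synced", Status: "True"},
		{Type: "Ready", Status: "True"},
	})
	fmt.Println(phase)
}
```

Treating every sync failure as Failed is a simplification; a production controller would likely distinguish transient errors that Crossplane retries on its own from ones that need user action.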

