09-Pub/Sub

发布时间:2023年12月18日

1 Channel和Subscription

  • 关于Channel
    • Eventing中的Channel CRD负责定义名称空间级别的消息总线
    • 它的后端要基于特定的实现,如In-Memory Channel(简称imc)、NATS Channel或Kafka Channel等
    • 每个Channel应该对应于一个特定Topic
    • 通常,Channels and Subscriptions消息投递模式中才需要自行创建Channel
      • Sources to Sink模式不需要Channel
      • Brokers and Triggers无须自行配置Channel
  • 关于Subscription
    • Eventing中的Subscription CRD负责将Sink(例如Service或KService)连接至一个Channel之上;
    • 何时需要自行创建Subscription
      • Sources to Sink模式不需要Subscription,因为没有Channel可以订阅
      • ◆Channels and Subscriptions消息投递模式,需要创建订阅至Channel的Subscription
      • Brokers and Triggers消息投递模式,需要创建订阅至Trigger的Subscription
1.1 Channel和Subscription实践
  • 示例环境说明

    • 基于imc的channel/imc01作为消息总线
    • kservice/event-display订阅channel/imc01
    • curl命令作为event source,基于HTTP协议推送消息至channel/imc01

    在这里插入图片描述

  • 具体步骤

    • 创建一个channel

      kn channel create imc01 --type messaging.knative.dev:v1:InMemoryChannel
      
      apiVersion: messaging.knative.dev/v1
      kind: Channel
      metadata:
        name: imc01
      spec:
        channelTemplate:
          apiVersion: messaging.knative.dev/v1
          kind: InMemoryChannel
      

      查看channel

      kn channel list
      

      在这里插入图片描述

    • 创建2个Sink: kservice/event-display01/kservice/event-display02

      kn service create event-display01 --image ikubernetes/event_display --port 8080 --scale-min 1
      kn service create event-display02 --image ikubernetes/event_display --port 8080 --scale-min 1
      
      ---
      apiVersion: serving.knative.dev/v1
      kind: Service
      metadata:
        name: event-display01
      spec:
        template:
          metadata:
            annotations:
              autoscaling.knative.dev/min-scale: "1"
          spec:
            containers:
              - image: ikubernetes/event_display
                ports:
                  - containerPort: 8080
      ---
      apiVersion: serving.knative.dev/v1
      kind: Service
      metadata:
        name: event-display02
      spec:
        template:
          metadata:
            annotations:
              autoscaling.knative.dev/min-scale: "1"
          spec:
            containers:
              - image: ikubernetes/event_display
                ports:
                  - containerPort: 8080
      
    • 创建subscription: 负责连接channel和sink。这里模拟使用同一channel,创建多个subscription,验证分发效果

       # /sub01负责连接kservice/event-display01至channel/imc01
       kn subscription create sub01 --channel imc01 --sink ksvc:event-display01 
       #/sub02负责连接kservice/event-display02至channel/imc01
       kn subscription create sub02 --channel imc01 --sink ksvc:event-display02
      
      apiVersion: messaging.knative.dev/v1
      kind: Subscription
      metadata:
        name: subscription1
      spec:
        channel:
          apiVersion: messaging.knative.dev/v1
          kind: Channel
          name: imc01
        subscriber:
          ref:
            apiVersion: serving.knative.dev/v1
            kind: Service
            name: event-display01
      ---
      apiVersion: messaging.knative.dev/v1
      kind: Subscription
      metadata:
        name: subscription2
      spec:
        channel:
          apiVersion: messaging.knative.dev/v1
          kind: Channel
          name: imc01
        subscriber:
          ref:
            apiVersion: serving.knative.dev/v1
            kind: Service
            name: event-display02
      

      查看subscription:

      在这里插入图片描述

    • 验证

      • 创建一个客户端Pod,使用curl命令基于HTTP协议推送event

        kubectl run client-$RANDOM --image=ikubernetes/admin-box:v1.2 --restart=Never --rm -it --command -- /bin/bash
        
      • 进入测试的pod里面,向channel/imc01的URL发起事件推送请求

         curl -v "http://imc01-kn-channel.default.svc.cluster.local" -X POST -H "Content-Type: application/cloudevents+json" \
        -d '{"id": "say-hello", "specversion": "1.0", "type": "com.icloud2native.sayhi", "source": "sendoff", "data": {"msg":"Hello Knative imc01 Channel"}}'
        

        在这里插入图片描述

      • 在sink1: event-display01查看log,能收到相关的event

        kubectl logs -f event-display01-00001-deployment-64cd5c8866-48t4m
        

        在这里插入图片描述

      • 在sink2: event-display02查看log,能收到相关的event

        kubectl logs -f event-display02-00001-deployment-664ccfc45d-k96zz
        

        在这里插入图片描述

2 Broker/Triger

2.1 Message Broker
  • Broker

    • 承载消息队列的组件,它从生产者接收消息,并根据消息交换规则将其交换至相应的队列(或Topic)
      • 生产者通过特定的协议将Message投递至Broker
    • 然后,通过队列(或Topic),将消息传递给消费者
    • Kafka、RabbitMQ、ActiveMQ和RocketMQ是较为常见的代表产品

    在这里插入图片描述

  • 消息代理模式

    • Point-to-point messaging

      • 消息的发送者与接收者之间存在“一对一”的关系,队列中的每条消息只发送一个接收者,并且只能被消费一次
      • 适合消息仅能被处理一次的场景,例工资单处理、金融交易处理等
    • Publish/subscribe messaging

      • 即“发布/订阅”模式,每条消息的生产者将消息发布到一个主题(Topic),多个消费者可以访问他们希望从中接收消息的Topic
      • 发布到Topic的所有消息,都会分发给订阅该Topic的消费者
        • Kafka的Topic内部由一到多个队列(Queue)组成,这些内部队列称为Partition
        • 消费者也可以在Partition级别订阅
      • 广播式分发机制,消息的发布者与消息的消费者之间存在“一对多”的关系

      在这里插入图片描述

2.2 Knative的Broker/Trigger 消息传递框架
  • Broker

    • Knative Eventing提供的CRD,负责收集CloudEvents类型的事件
    • Broker对象会提供一个用于事件传入的入口端点,各生产者可以调用该入口将事件发往Broker
    • 将事件投递至目的地的任务则由Trigger资源负责
    • Trigger基于属性过滤事件,并将筛选出的的事件投递给订阅该Trigger的Subscriber
    • Subscriber还可生成响应事件,并将这些新生成的事件传入Broker

    在这里插入图片描述

2.3 knative Broker
  • Knative Eventing支持以下几种类型的Broker

    • 基于Channel的多租户Broker (Multi-tenant channel-based broker,简称为MT-Channel-based Broker)

      • 基于Channel进行事件路由
      • 需要部署至少一种Channel的实现
        • InMemoryChannel:可用于开发和测试目的,但不为生产环境提供适当的事件交付保证
        • KafkaChannel:提供生产环境所需的事件交付保证
    • 其它的可用的Broker类型

      • Apache Kafka Broker
      • RabbitMQ Broker
      • GCP Broker

      在这里插入图片描述

  • Knative Serving在名称空间级别提供了一个名为default的默认Broker,但使用前需要通过某种方式先行完成创建

  • 创建默认Broker的方法

    • 命令式命令,或使用配置文件

      kn broker create default --namespace NS_NAME
      
    • 在Trigger资源上使用特定的Annotation自动创建

      • eventing.knative.dev/injection=enabled
    • 在名称空间上添加特定的Label自动创建

      • eventing.knative.dev/injection=enabled
  • 删除默认的Broker

    • 第一种方法创建的默认Broker可直接进行删除
    • 后面两种是通过Injection的方式进行的资源创建,这类资源需要由管理员手动才能完成删除
2.4 Broker/Trigger 实践
  • 示例环境说明

    • 基于MT-Channel-based的Broker
    • Trigger1过滤“type=sayhi”类的事件
      • Sink为ksvc/event-display-hi
    • Triiger2过滤“type=saybye”类的事件
      • Sink为ksvc/event-display-bye
    • curl命令作为event source
      • 基于HTTP协议推送消息至broker
      • Trigger基于类型过滤事件并完成分发

    在这里插入图片描述

  • 具体步骤

    • 创建两个ksvc作为sink: event-display-hi/event-display-bye

      $ kn service create event-display-hi --image ikubernetes/event_display --port 8080 --scale-min 1
      $ kn service create event-display-bye --image ikubernetes/event_display --port 8080 --scale-min 1
      

      在这里插入图片描述

      ---
      apiVersion: serving.knative.dev/v1
      kind: Service
      metadata:
        name: event-display-hi
      spec:
        template:
          metadata:
            annotations:
              autoscaling.knative.dev/min-scale: "1"
          spec:
            containers:
              - image: ikubernetes/event_display
                ports:
                  - containerPort: 8080
      ---
      apiVersion: serving.knative.dev/v1
      kind: Service
      metadata:
        name: event-display-bye
      spec:
        template:
          metadata:
            annotations:
              autoscaling.knative.dev/min-scale: "1"
          spec:
            containers:
              - image: ikubernetes/event_display
                ports:
                  - containerPort: 8080
      
    • 创建broker

      $ kn broker create default --class MTChannelBasedBroker
      # 其中的“--class”选项在使用默认的Broker类时,可以省略
      
      apiVersion: eventing.knative.dev/v1
      kind: Broker
      metadata:
       name: default
      

      在这里插入图片描述

    • 创建trigger:两个trigger分别过滤了不同类型的事件

      kn trigger create trigger1 --broker default --sink ksvc:event-display-hi --filter type=com.icloud2native.sayhi
       kn trigger create trigger2 --broker default --sink ksvc:event-display-bye --filter type=com.icloud2native.saybye
      
      ---
      apiVersion: eventing.knative.dev/v1
      kind: Trigger
      metadata:
        name: trigger1
      spec:
        broker: default
        filter:
          attributes:
            type: com.icloud2native.sayhi
        subscriber:
          ref:
            apiVersion: serving.knative.dev/v1
            kind: Service
            name: event-display-hi
      ---
      apiVersion: eventing.knative.dev/v1
      kind: Trigger
      metadata:
        name: trigger2
      spec:
        broker: default
        filter:
          attributes:
            type: com.icloud2native.saybye
        subscriber:
          ref:
            apiVersion: serving.knative.dev/v1
            kind: Service
            name: event-display-bye
      

      列出两个trigger

      kn trigger list
      

      在这里插入图片描述

    • 测试验证:event-display-hi的sink只收到hi的event,event-display-bye的sink只收到bye的event

      • 创建一个客户端Pod,使用curl命令基于HTTP协议推送event

         kubectl run client-$RANDOM --image=ikubernetes/admin-box:v1.2 --restart=Never --rm -it --command -- /bin/bash
        
      • 在启动的客户端Pod中,向Broker/default的URL发起事件推送事件,sayhi和saybye类型的事件都要推送

         curl -v "http://broker-ingress.knative-eventing.svc.cluster.local/default/default" -X POST -H "Content-Type: application/cloudevents+json" \
        -d '{"id": "say-hi", "specversion": "1.0", "type": "com.icloud2native.sayhi", "source": "sendoff", "data": {"msg":"Hello Knative default Broker Say HI"}}'
        
        curl -v "http://broker-ingress.knative-eventing.svc.cluster.local/default/default" -X POST -H "Content-Type: application/cloudevents+json" \
        -d '{"id": "say-bye", "specversion": "1.0", "type": "com.icloud2native.saybye", "source": "sendoff", "data": {"msg":"Hello Knative default Broker Say BYE"}}'
        

        在这里插入图片描述

      • 获取event-display-hi中的日志信息,验证是否仅存在sayhi类型的事件

        在这里插入图片描述

      • 获取event-display-bye中的日志信息,验证是否仅存在saybye类型的事件

        在这里插入图片描述

文章来源:https://blog.csdn.net/weixin_38753143/article/details/135069016
本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。