Spring Cloud Stream

在学习尚硅谷Spring Cloud的视频中,发现Spring官网对于Spring Cloud Stream文档最老都是3.1.6,且3.1版本后的Spring Cloud Stream与视频中的2.x版本发生了较大的更改,官网也不再建议使用2.x版本。另外尚硅谷视频的老师在讲这个知识点时感觉讲的很不清晰,很多时候都是走马观花。因此,笔者打算自己整理一版Spring Cloud 3.x的文档。

文档的整理过程也是个人学习的过程,鉴于笔者水平有限,文档可能存在很多问题,望大家多多指正。

文档参考了很多其他人的博客和笔记,笔者在文章的最后都有标出,如有侵权,烦请告知。

1. 为什么要有Spring Cloud Stream

很自然的第一个问题:为什么要有Spring Cloud Stream?

我看很多回答都是“为了屏蔽消息队列的差异,使我们在使用消息队列的时候能够用统一的一套API,无需关心具体的消息队列实现”。

这样理解是有些不全面的,Spring Cloud Stream的核心是Stream,准确来讲Spring Cloud Stream提供了一整套数据流走向(流向)的API, 它的最终目的是使我们不关心数据的流入和写出,而只关心对数据的业务处理

我们举一个例子:你们公司有一套系统,这套系统由多个模块组成,你负责其中一个模块。数据会从第一个模块流入,处理完后再交给下一个模块。对于你负责的这个模块来说,它的功能就是接收上一个模块处理完成的数据,自己再加工加工,扔给下一个模块。

image-20221002231325892

我们很容易总结出每个模块的流程:

  1. 从上一个模块拉取数据
  2. 处理数据
  3. 将处理完成的数据发给下一个模块

其中流程1和3代表两个模块间的数据交互,这种数据交互往往会采用一些中间件(middleware)。比如模块1和模块2间数据可能使用的是kafka,模块1向kafka中push数据,模块2向kafka中poll数据。而模块2和模块3可能使用的是rabbitMQ。很明显,它们的功能都是一样的:提供数据的流向,让数据可以流入自己同时又可以从自己流出发给别人。但由于中间件的不同,需要使用不同的API。

为了消除这种数据流入(输入)和数据流出(输出)实现上的差异性,因此便出现了Spring Cloud Stream。

Spring Cloud Stream想让我们不关心如何获取数据,如何发送数据,而只专心处理自己的业务。还拿上面的例子来说,假设你现在负责的是系统里的模块3,它的功能是将模块2传来的字符串全部转成大写,然后再将这个转化后的字符串发给模块4,也即:

public String handle(String source){
    return source.toUpperCase();
}

其中方法的的参数String source就是模块2传给你的数据,方法的返回就是你要给模块4发送的数据。

如果你使用Spring Cloud Stream来开发,我们的模块其实基本完成了(配置文件回来再说),至于你以前开发时关心的要如何操作中间件API从模块2拉取数据,拉取完成后要怎么解码(反序列化,将字节流转成Java 对象),处理完成后又要如何操作中间件API将数据写给模块4,写的时候还得将你返回的对象转成字节流写出(序列化)等,这些你都不用再关心,Spring Cloud Stream帮你做了。

看到这可能很多同学会问:你上面举得这个例子不还是在说Spring Cloud Stream屏蔽了消息队列的差异吗?

Spring Cloud Stream屏蔽的是一些中间件对于数据流入和数据流出的差异,消息队列自然是属于这种中间件的,也是最常用的。除此以外Spring Cloud Stream还支持其他中间件如Amazon Kinesis,它就不是消息队列。

2. 一些概念

要理解和使用Spring Cloud Stream需要先明白Spring Cloud Stream提出的一些概念。

2.1 Binder

什么是Binder?一句话概括就是具体中间件的统一抽象。一个kafka中间件在Spring Cloud Stream里是一个Binder,一个rabbitMQ中间件也是一个Binder。官方文档中写道:当你引入spring-cloud-stream依赖的时候,Spring Cloud Stream就会为你的那个中间件生成一个Binder实例,你就可以通过这个Binder实例来和这个消息中间件通信(收发数据)。

很容易得出结论,Spring Cloud Stream对底层中间件的差异屏蔽都是基于我们的Binder,Binder适配了不同的消息中间件(官方文档中写道:Spring Cloud Stream为kafka和rabbitMQ提供了Binder的实现了)。

2.2 Binding

Binding是个比较抽象的概念,我们这里还拿之前的例子来说:

public String handle(String source){
    return source.toUpperCase();
}

这是你写的模块3中的业务代码,我们假设你与模块2交互使用的是中间件kafka和与模块4交互使用的是中间件rabbitMQ。也即你的模块的功能就变为了从kafka中获取数据,将获取的字符串数据全转为大写并写出给rabbitMQ。

很明显,这里有两个Binder,一个kafka Binder一个rabbitMQ Binder。而你这个业务处理函数其实也有两个功能:接收中间件的输入和将返回数据输出。再结合Binder,我们可以理解为:

  1. 函数接收kafka Binder中的输入
  2. 函数将返回结果写出给rabbitMQ binder。

但是如何表示这种关系呢?也即你现在写了一个函数,怎么表示这个函数的参数是从kafka入的,函数的返回是向rabbitMQ输出的呢?这就需要Binding。

Binding其实就是一座桥,桥的一头是Binder,另一头是你的业务处理函数。Bindings将外部消息中间件与你的业务处理代码连接在了一起(官方原话是:外部消息系统和应用程序之间的桥梁,提供消息的生产者和消费者(由Binder创建))。

了解了这些其实也就了解Spring Cloud Stream的架构图,Spring Cloud Stream官网中有一张图讲了它的架构:

SCSt with binder

首先最底层的Middleware是中间件,我们的kafka,rabbitMQ都属于中间件。上一层的Binder已经讲了,是对中间件的一层抽象和封装。再上一层的inputs和outputs其实就是Bindings,我们与Binder的交互就是通过Binding,其中写出数据就是output,而获取数据就是input。再上层的Application Core就是我们自己的业务代码,可以看到我们的业务代码通过Binding(input、output)与Binder交互,而Binder又负责和具体中间件交互。

3. 函数式接口

Spring Cloud Stream 2.x与Spring Cloud Stream 3.x最大的不同就是2.x是基于注解的,而3.x是基于函数式编程的。

还拿上面的例子来说:对于你开发的一个模块而言,它无非三种情况:

  1. 从上一个模块获取数据,将这个数据转发到下一个模块
  2. 从上一个模块获取数据,自己处理完后不再将这个数据发给别的模块
  3. 不需要从别处获取数据,自己就是数据源,将自己的数据发送到下一个模块。

这三种模式其实就对应Java 8函数式编程中的三个接口:FunctionConsumerSupplier(不了解这三个接口的可自行搜索相关资料,关键字:Java 8;函数式接口)。

现在我们还来模拟之前的系统,首先模块1是系统的入口模块,不需要其他模块提供数据源,换言之它是个生产者,那么模块1就可以使用接口Supplier(只有返回没有入参)。我们假设模块1的功能是生成字符串,那模块1的代码可以写为:

public Supplier<String> produceStr(){
    return () -> "hello spring cloud stream";
}

模块2会消费模块1的字符串,并将它全部转为大写,然后再将转化后的字符串写出。很明显模块2既是生产者也是消费者,那模块2就可以使用接口Function(既有返回也有入参)。模块2的代码为:

public Function<String,String> upperCase(){
    return String::toUpperCase;
}

模块3会消费模块2的字符串,并将它直接打印到控制台,且模块3不再将字符串写出,很明显模块3只是一个消费者,那模块3就可以使用接口Consumer(只有入参没有返回)。模块3的代码为:

public Consumer<String> log(){
    return System.out::println;
}

其中上例中的String::toUpperCaseSystem.out::println为Java 8的方法引用(不了解的可自行搜索相关资料,关键字:Java8;方法引用)。

可以看到,我们将自己的业务处理都封装成了一个函数式接口,并作为一个函数的返回。

在实际的开发中上面的那些函数都会被标上@Bean注解,注入到Spring容器,也即:

@Bean
public Supplier<String> produceStr(){
    return () -> "hello spring cloud stream";
}

@Bean
public Function<String,String> upperCase(){
    return String::toUpperCase;
}

@Bean
public Consumer<String> log(){
    return System.out::println;
}

我们知道这代表向Spring中注入一个Bean,其中Bean的名字就是函数名,而Bean本身就是函数的返回也即我们将自己的业务处理逻辑包装成一个对象(函数式接口)注入到了Spring IOC中。

现在假设Binder收到了一条数据,那它会寻找Binding,而Binding是一个桥梁,它会连接一个我们的处理函数,处理函数其实就是这里的Bean,Binder拿到Bean后,自然就会调用Bean的处理函数来处理(因为是函数式接口)。

如果用一张图来描述的话,大概就是这样:

image-20221004193943143

这里主要讲的就是我们之前的业务处理被函数式接口包装成了对象,包装成对象后就可以注入到Spring IOC中,这样的一个Bean对象就可以对应一个Binding,通过Binding与Binder交互。

4. 案例

说了那么多,还是没讲怎么使用。我们不妨还以文档一开始的那个例子来作为编码案例:

现在的需求如下:

模块1生产字符串,并将字符串写出到kafka

模块2消费模块1的字符串,并将字符串转为大写,输出到rabbitMQ

模块3消费模块2的字符串,并将字符串打印到控制台。

也即:

image-20221004194535980

为了项目的简洁,我们将上述模块1、模块2和模块3写在一个项目中。

本文使用的SpringBoot版本是2.6.3,使用的Spring Cloud版本是2021.0.1。这里需要注意以下,如果你使用的是尚硅谷视频里老师讲的SpringBoot2.2.2.RELEASESpring Cloud Hoxton.SR1是无法复现本例的代码的,因为官方是在3.1版本后废弃了使用注解的方案,转而推荐使用Java函数模式的方式。尚硅谷视频中老师使用的版本Spring Cloud Stream是3.0.1版本,相比3.1版本后的API发生了较大变化,所以如果大家想动手实验本文中的案例,最好与笔者配置的环境一致。

在开始创建项目前,我们默认大家都是已经配置好了自己的kafka与rabbitMQ并已经正常启动了。

4.1 maven依赖

我们需要的依赖并不多,其实只需要rabbit和kafka的依赖,整个项目的maven配置如下:

<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 https://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>
    <groupId>com.coderzoe</groupId>
    <artifactId>logging-consumer</artifactId>
    <version>0.0.1-SNAPSHOT</version>
    <name>logging-consumer</name>
    <description>logging-consumer</description>

    <properties>
        <java.version>1.8</java.version>
        <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
        <project.reporting.outputEncoding>UTF-8</project.reporting.outputEncoding>
    </properties>
    <parent>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-parent</artifactId>
        <version>2.6.3</version>
    </parent>

    <dependencyManagement>

        <dependencies>

            <dependency>
                <groupId>org.springframework.cloud</groupId>
                <artifactId>spring-cloud-dependencies</artifactId>
                <version>2021.0.1</version>
                <type>pom</type>
                <scope>import</scope>
            </dependency>

        </dependencies>

    </dependencyManagement>

    <dependencies>
        <!-- 主要依赖 -->
        <dependency>
            <groupId>org.springframework.cloud</groupId>
            <artifactId>spring-cloud-starter-stream-rabbit</artifactId>
        </dependency>
        <dependency>
            <groupId>org.springframework.cloud</groupId>
            <artifactId>spring-cloud-starter-stream-kafka</artifactId>
        </dependency>

    </dependencies>

</project>

4.2 配置文件之Binder

之前已经讲了一个kafka实例或者rabbitMQ实例其实就是一个binder,那你现在有了一个kafka,要如何告诉Spring Cloud呢?最简单的就是通过配置文件,配置文件配置Binder的思想很简单,就是告诉Spring Cloud Stream,我要创建一个Binder,这个Binder的类型是kafka或者rabbitMQ,然后它的IP,端口都是啥以及用户名密码等都是啥就好了。

我们先以kafka为例,配置kafka为Binder有两种方式:

spring:
  cloud:
    stream:
      kafka:
        binder:
          # kafka的Ip和端口,可以是集群
          brokers: ip:port

或者:

spring:
  cloud:
    stream:
      binders:
        # 你的binder名字,自己随意取,我取的名字叫myKafka
        myKafka:
          # 你的binder类型,我们这里类型是kafka
          type: kafka
          # 下面的环境配置与上面的一模一样
          environment:
            spring:
              cloud:
                stream:
                  kafka:
                    binder:
                      # kafka的Ip和端口,可以是集群
                      brokers: ip:port

很明显,第二种比第一种更复杂,你如果只有一个kafka实例,那直接用第一种就可以了,但如果你的项目中有多个kafka实例,比如项目2和项目1之间用的是kafka,项目2和项目3间也用的kafka,这两个kafka又是不是同一套kafka。

所以,第二种配置可以配备多个kafka实例,如:

spring:
  cloud:
    stream:
      binders:
        myKafka1:
          type: kafka
          environment:
            spring:
              cloud:
                stream:
                  kafka:
                    binder:
                      brokers: ip1:port1
        myKafka2:
          type: kafka
          environment:
            spring:
              cloud:
                stream:
                  kafka:
                    binder:
                      brokers: ip2:port2

另外,如果你有多个kafka实例,但使用第一种方式下配备的属性信息会被这多个kafka实例共享,如:

spring:
  cloud:
    stream:
      kafka:
        binder:
          configuration:
            security.protocol: SASL_PLAINTEXT
            sasl.mechanism: PLAIN
      binders:
        myKafka1:
          type: kafka
          environment:
            spring:
              cloud:
                stream:
                  kafka:
                    binder:
                      brokers: ip1:port1
                      configuration.sasl.jaas.config: "org.apache.kafka.common.security.plain.PlainLoginModule required username=\"admin\" password=\"admin-secret\";"
        myKafka2:
          type: kafka
          environment:
            spring:
              cloud:
                stream:
                  kafka:
                    binder:
                      brokers: ip2:port2
                      configuration.sasl.jaas.config: "org.apache.kafka.common.security.plain.PlainLoginModule required username=\"user1\" password=\"user1-secret\";"

我们现在有两个kafka实例myKafka1和myKafka2,但我们在一开头配置了security.protocol = SASL_PLAINTEXTsasl.mechanism = PLAIN,这是kafka的安全配置,这个配置信息会被myKafka1和myKafka2都具备。也即在一开始的这些配置会被每个kafka实例都具有,因此一些公共的配置可以放在一开始。

同理rabbitMQ的配置也有两种:

spring:
  rabbitmq:
    host: 你的rabbitMQ的IP
    port: 你的rabbitMQ的端口
    username: 用户名
    password: 密码
spring:
  cloud:
    stream:
      binders:
        myRabbit:
          type: rabbit
          environment:
            spring:
              rabbitmq:
                host: 你的rabbitMQ的IP
                port: 你的rabbitMQ的端口
                username: 用户名
                password: 密码

是的,如果你只有一个rabbitMQ实例可以使用第一种,但如果有多个,就得使用第二种,它与kafka配置的思路一模一样,这里不再赘述。

虽然我们只有一个kafka实例和一个rabitMQ实例,但笔者依然采取了第二种配置文件,一则是考虑到以后实例增多改动比较小的可能,二则是第二种配置笔者认为更清晰。项目对于Binder的配置全部信息为:

spring:
  cloud:
    stream:
      binders:
        myRabbit:
          type: rabbit
          environment:
            spring:
              rabbitmq:
                host: 你的rabbitMQ的IP
                port: 你的rabbitMQ的端口
                username: 用户名
                password: 密码
        myKafka:
          type: kafka
          environment:
            spring:
              cloud:
                stream:
                  kafka:
                    binder:
                      brokers: ip:port

关于rabbitMQ与kafka更详细的配置,如自动提交,ACK等信息可以参考Spring官网,本文不再列出。

4.3 编写自己的业务代码

配置完Binder就代表你已经具备和外部消息中间件通信的能力了,现在你可以写自己的业务代码了:

package com.coderzoe.loggingconsumer;

import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.context.annotation.Bean;

import java.util.function.Consumer;
import java.util.function.Function;
import java.util.function.Supplier;

/**
 * @author coderZoe
 */
@SpringBootApplication
public class LoggingConsumerApplication {
    public static void main(String[] args) {
        SpringApplication.run(LoggingConsumerApplication.class, args);
    }

    /**
     * 模块1 生产字符串
     */
    @Bean
    public Supplier<String> produceStr(){
        return () -> "hello spring cloud stream";
    }

    /**
     * 模块2,将生产的字符串转为大写
     */
    @Bean
    public Function<String,String> upperCase(){
        return String::toUpperCase;
    }

    /**
     * 模块3 将字符串打印
     */
    @Bean
    public Consumer<String> log(){
        return System.out::println;
    }
}

由于项目代码不是很多,我将Bean的注入就写到了启动类中,实际上上面这段代码就是整个项目的所有代码了。

写完这些Bean后,我们还需要将它写到配置文件,告诉Spring Cloud,这些都是用于函数处理的Bean:

spring:
  cloud:
    function:
      definition: produceStr;upperCase;log

4.4 配置文件之Bindings

现在我们有了Binder,也有了处理业务函数,肯定还差一个Binding,将Binder与业务处理联系起来。联系起来的方法很简单,就是通过配置文件来配置。

在讲配置文件前,我们先讲Binding的名称规范。Binding的命名是:<functionName>-in/out-<index>

比如:

@Bean
public Supplier<String> produceStr(){
    return () -> "hello spring cloud stream";
}

它的Binding名字就是produceStr-out-0。其中produceStr是函数名(也是Bean名),out代表这个Binding是向外写出的,而index是输入或输出绑定的索引。对于典型的单个输入/输出函数,它始终为 0,因此它仅与具有多个输入和输出参数的函数相关(一个函数被多次作为输出/输出,比如这个函数被kafka和rabbitMQ都作为输出,那就是一个index0一个index1)。

再比如:

@Bean
public Function<String,String> upperCase(){
    return String::toUpperCase;
}

它对应两个Binding,因为它既是输入又是输出(从kafka入数据,向rabbitMQ出数据),它们的名字是:upperCase-in-0upperCase-out-0

可以看到,我们通过名字就将Binding和处理函数做了关联。关联了Binding与处理函数,还需要关联Binding与Binder,它的配置写法如下:

spring:
  application:
  cloud:
    stream:
      bindings:
        produceStr-out-0:
          binder: myKafka

通过在你的Binding中指明使用的是哪个binder就可以了。

这样我们配置好了Binding,项目的Bindings完整配置如下:

spring:
  cloud:
    stream:
      bindings:
        produceStr-out-0:
          binder: myKafka
          destination: topic1
        upperCase-in-0:
          binder: myKafka
          group: group1
          destination: topic1
        upperCase-out-0:
          destination: topic2
          binder: myRabbit
        log-in-0:
          binder: myRabbit
          group: group1
          destination: topic2

这里在配置Binding的时候比上面多了group和destination两个属性,其中group是消费组的意思,而destination是主题(topic)。如果你不了解这两个概念,我建议你查阅一下kafka的相关资料。

4.5 主动发送消息

这样其实我们就完成了整个项目,启动项目你会发现:控制台会不断的打印HELLO SPRING CLOUD STREAM

image-20221004212533640

但这个消息我们是被动发送的,因为Binder调用我们的produceStr-out-0这个Binding来不断的发送消息。很多时候我们是希望主动的发送消息的,比如处理完一条用户请求后,将处理结果发送出去。Spring Cloud Stream主动发送消息借助于StreamBridge,它的用法如下:

@Service
public class SendService {
    private StreamBridge streamBridge;

    public void send(String message){
        streamBridge.send("upperCase-in-0",message);
    }
    @Autowired
    public void setStreamBridge(StreamBridge streamBridge) {
        this.streamBridge = streamBridge;
    }
}

可以看到,就是使用StreamBridge给一个in的Binding发送消息。

4.6 代码地址

本文代码已经上传至github,大家可以自行参阅:https://github.com/coderZoe/spring-cloud-stream.git

5. 一些补充

5.1 Message

我们刚才的文档一直在以字符串作为消息传递的数据,实际上消息传递的准确对象是org.springframework.messaging.Message,这是一个接口:

public interface Message<T> {

   /**
    * Return the message payload.
    */
   T getPayload();

   /**
    * Return message headers for the message (never {@code null} but may be empty).
    */
   MessageHeaders getHeaders();

}

也即生产者和消费者交互的对象其实是Message,我们之前写的String只是在生产时被Spring Cloud Stream封装为了Message,而在消费时又从Message转为了String,因此我们其实完全可以这样写:

@Bean
public Supplier<Message<String>> produceStr(){
    return () -> MessageBuilder.withPayload("hello spring cloud stream").build();
}

但大部分场景下没有必要,还是那句话,因为Spring Cloud Stream会为我们自动“装箱”和“拆箱”。

另外,消息是支持发送Java对象的,比如:

public static class User{
    String name;
    int age;
    //省略 getter setter和toString
}
@Bean
public Supplier<User> produceUser(){
    return () -> new User("tom",18);
}

消费者可以写为:

@Bean
public Consumer<User> logUser(){
    return s-> System.out.println(s.name+"_"+s.age);
}

很明显,对象想要被发送需要被序列化,且想要被消费也需要被反序列化,在Spring Cloud Stream中默认的序列化是json。也即对象会被以application/json的形式发送出去。

这可以在配置文件中进行修改:

produceUser-out-0:
  binder: myKafka
  destination: topic1
  content-type: application/json

是不是发现和Spring MVC有点眼熟,是的,其实就是Spring MVC那一套。

5.2 消费组

我们在《4.4 配置文件之Bindings》中为消费的Binding配置了一个group,Spring Cloud Stream建议大家为每个消费者都显示声明一个消费组,因为这样可以保证“断点续传”的功能。比如你消费者挂了,如果指明了消费组,重启后可以从之前挂掉的地方继续消费,但如果没有指明消费组,Spring Cloud Stream会分配一个匿名的消费组,但每次启动这个名字可能都会变,这样可能会导致重启后重复消费。

5.3 其他

关于更多信息,如分区(partition)或者路由,以及kafka与rabbitMQ的一些定制化配置大家可以参考Spring Cloud官网。

另外,Spring Cloud官网给出了Spring Cloud Stream的一些实践案例:spring-cloud/spring-cloud-stream-samples: Samples for Spring Cloud Stream (github.com)

参考文档

Spring官网

SpringCloud-Stream3.x版本使用教程及如何整合rabbitmq_ShuSheng007的博客-CSDN博客_springcloud整合rabbitmq

Spring Cloud Stream 体系及原理介绍-阿里云开发者社区 (aliyun.com)

最后修改:2022 年 10 月 04 日
如果觉得我的文章对你有用,请随意赞赏