目录
4.4.2 需求二,统计topic1中10秒内的wordcount写到topic2
随着大数据技术的发展越来越成熟,大数据涉及的领域也越来越多,从以往的T+1到如今的实时处理,得益于底层技术的强大支撑,尤其是流式计算技术的发展让众多的业务场景价值得以深度挖掘,聊到流式计算,涌入入脑海中的Spark Streaming,Flink等,本文接下来将介绍另一种流式计算技术kafka stream。
Kafka Stream是一款开源、分布式和水平扩展的流处理平台,其在Apache Kafka之上进行构建,借助其高性能、可伸缩性和容错性,可以实现高效的流处理应用程序。
在处理流式计算的场景中,发展到今天出现了很多成熟的性能高效的技术框架,比如老牌的Apache Storm,大数据处理框架Spark Streaming,Flink等,而且像Spark 与flink都能与SQL紧密结合,集成便捷,功能也很强大,为何还需要kafka stream呢?
kafka可以说在很多互联网公司都有着广泛的使用,只要维护了kafka的环境,即可集成和使用kafka stream。
相比于部署spark,storm等这样的大数据处理框架需要的计算资源,部署kafka占用的服务器资源更少,而且维护起来也相对节省人力。
相比与spark和flink这样的大数据框架,kafka在日常的开发中接触和使用会更多,学习和上手成本会低很多。 ?
Kafka Stream是Apache Kafka从0.10版本引入的一个新Feature。它是提供了对存储于Kafka内的数据进行流式处理和分析的功能。具有如下特点:
Kafka Stream提供了一个非常简单而轻量的Library,可以方便的嵌入任意Java应用中,也可以任意方式打包和部署;
充分利用Kafka分区机制实现水平扩展和顺序性保证;
提供记录级的处理能力,从而实现毫秒级的低延迟;
支持基于事件时间的窗口操作,并且可处理晚到的数据(late arrival of records),这点与spark和flink中的时间窗口处理机制很像;
提供底层的处理原语Processor(类似于Storm的spout和bolt),以及高层抽象的DSL(类似于Spark的map/group/reduce);
通过可容错的state store实现高效的状态操作(如windowed join和aggregation);
除了Kafka外,无任何外部依赖,且支持正好一次处理语义;
在kafka stream中,KStream和KTable是理解kafka stream时非常核心的两个概念。
KStream
KStream是一个数据流,是一段顺序的、可以无限长、不断更新的数据集,可以认为所有的记录都通过Insert only的方式插入进这个数据流中。
KTable
KTable代表一个完整的数据集,可对照mysql理解为数据库中的表。每条记录都有KV键值对,key可理解为数据库中的主键,是唯一的,而value代表一条记录,记录通常是一段可序列化的字符串。可以认为KTable中的数据时通过Update only的方式进入的。如果是相同的key,会覆盖掉原来那条记录。
综上来说:
KStream是数据流,即不断传输过来的流式数据记录,以Insert only的方式不断插入;
KTable是数据集(逻辑概念),相同key的数据只保留最新的记录,也就是Update only;
Kafka Streams主要用于以下应用场景:
在开始使用kafka stream之前,先快速搭建起kafka的环境,参照下面的步骤快速部署kafka的环境。
docker network create zk-kafka --driver bridge
docker pull zookeeper:3.8.1
docker run -d --name zk-server -p 2181:2181 --network zk-kafka -e ALLOW_ANONYMOUS_LOGIN=yes zookeeper:3.8.1
?
下载地址:Apache Kafka,这里我使用 kafka_2.12-3.1.1.tgz
tar -zxvf kafka_2.12-3.1.1.tgz
cd kafka_2.12-3.1.1
mkdir logs
进到config目录下,找到server.properties配置文件,主要修改下面几个核心配置即可(覆盖原有的默认的配置参数)
broker.id=0
listeners=PLAINTEXT://云服务器内网IP:9092
zookeeper.connect=内外网均可,如果不对外暴露使用内网IP:2181
log.dirs=/usr/local/kafka/kafka_2.12-3.1.1/logs
advertised.listeners=PLAINTEXT://外网IP:9092
参数说明:
在主目录下,使用下面的命令启动kafka服务前台启动
./bin/kafka-server-start.sh ./config/server.properties
或者使用下面的命令后台启动
./bin/kafka-server-start.sh -daemon ./config/server.properties
kafka服务启动之后,接下来创建一个测试用的topic并测试是否能够正常生产和消费消息
使用下面的命令创建一个名为zcy的topic
bin/kafka-topics.sh --create --topic zcy --bootstrap-server 公网IP:9092
使用下面的命令,开启一个生产者的控制台窗口,并发送一条消息
bin/kafka-console-producer.sh --broker-list 公网IP:9092 --topic zcy
使用下面的命令,开启一个消费端的控制台窗口,检查是否能够正常消费消息
bin/kafka-console-consumer.sh --bootstrap-server 公网IP:9092 --topic zcy
或者
bin/kafka-console-consumer.sh --bootstrap-server 公网IP:9092 --topic zcy --from-beginning
引入kafka的客户端依赖
<dependency>
<groupId>org.apache.kafka</groupId>
<artifactId>kafka-clients</artifactId>
</dependency>
编写如下的测试代码,向上述kafka的zcy这个topic中发送一条消息
public static void main(String[] args) throws Exception {
// 1. 创建 kafka 生产者的配置对象
Properties properties = new Properties();
// 2. 给 kafka 配置对象添加配置信息:bootstrap.servers
properties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "公网IP:9092");
properties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer");
properties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer");
// 3. 创建 kafka 生产者对象
KafkaProducer<String, String> kafkaProducer = new KafkaProducer<String, String>(properties);
System.out.println("开始发送数据");
// 4. 调用 send 方法,发送消息
for (int i = 0; i < 5; i++) {
kafkaProducer.send(new ProducerRecord<>("zcy","congge " + i));
}
// 5. 关闭资源
kafkaProducer.close();
}
运行上面的代码,运行成功后,可以看到上面的kafka的消费端的控制台正确接收到了5条消息
介绍了kafka stream的相关概念之后,接下来通过一些案例感受下如何使用
创建一个maven工程,引入如下依赖
<parent>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-parent</artifactId>
<version>2.3.4.RELEASE</version>
</parent>
<dependencies>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-web</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.kafka</groupId>
<artifactId>spring-kafka</artifactId>
<exclusions>
<exclusion>
<groupId>org.apache.kafka</groupId>
<artifactId>kafka-clients</artifactId>
</exclusion>
</exclusions>
</dependency>
<dependency>
<groupId>org.apache.kafka</groupId>
<artifactId>kafka-clients</artifactId>
</dependency>
<dependency>
<groupId>com.alibaba</groupId>
<artifactId>fastjson</artifactId>
</dependency>
<dependency>
<groupId>org.apache.kafka</groupId>
<artifactId>kafka-streams</artifactId>
<exclusions>
<exclusion>
<artifactId>connect-json</artifactId>
<groupId>org.apache.kafka</groupId>
</exclusion>
<exclusion>
<groupId>org.apache.kafka</groupId>
<artifactId>kafka-clients</artifactId>
</exclusion>
</exclusions>
</dependency>
</dependencies>
再创建另一个topic
bin/kafka-console-consumer.sh --bootstrap-server IP:9092 --topic zcy-out
使用kafka stream进行应用的业务开发,即相关的API使用,按照下面几步操作:
Properties props = new Properties();
props.put(StreamsConfig.APPLICATION_ID_CONFIG, "定义本次实例名称,保持全局唯一");
props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "kafka连接IP地址:9092");
//... 更多其他的属性可以点击到StreamsConfig配置类进行查看
StreamsBuilder builder = new StreamsBuilder();
KafkaStreams streams = new KafkaStreams(builder.build(), props);
参数说明:
props.put(StreamsConfig.APPLICATION_ID_CONFIG, "my-stream")
指定本次流处理应用的唯一标识符;
props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092")
指定连接的 Kafka 集群的地址;
StreamsBuilder builder = new StreamsBuilder()
创建 StreamsBuilder 实例,并用其构建 TOPOLOGY;
final String inputTopic = "topic-input";
final String outputTopic = "topic-output";
KStream<String, String> inputStream = builder.stream(inputTopic);
//从input-topic中拿到数据进行逻辑处理
KStream<String, String> outputStream = inputStream.mapValues(value -> value.toUpperCase());
//将处理后的数据输出到其他的topic中
outputStream.to(outputTopic);
streams.start();
以上几步可以说是Kafka Streams编程的一种固定的方法模板,需重点关注。
业务场景如下,从topic1中接收到消息,将消息内容转换为大写之后,输出到topic2
完整的代码如下:
public static void main(String[] args) {
Properties props = new Properties();
props.put(StreamsConfig.APPLICATION_ID_CONFIG, "stream-convert-app");
props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "IP:9092");
props.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass());
props.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG,Serdes.String().getClass());
StreamsBuilder builder = new StreamsBuilder();
KStream<String, String> inputStream = builder.stream("zcy");
KStream<String, String> outputStream = inputStream
.mapValues(value -> value.toUpperCase());
outputStream.to("zcy-out", Produced.with(Serdes.String(), Serdes.String()));
KafkaStreams streams = new KafkaStreams(builder.build(), props);
streams.start();
}
运行代码之前,我们将zcy-out这个topic的消费端的终端打开,便于看到程序中处理之后的结果
运行上面的程序,通过观察控制台日志可以发现当前处于等待接收消息输入的状态
由于之前zcy这个topic中已经有消息了,可以看到,经过程序的处理,窗口中能够获取到之前的消息,并且已经将消息转为大写了
此时通过生产端的控制台发送一条消息,然后再在zcy-out消息控制台中就能近乎实时看到被转换后的消息了
注意:如果实际业务中想适当节省计算资源,即不需要实时计算,而是间隔计算之后提交结果,可以通过设置下面的这个参数,即3秒提交一次结果
prop.put(StreamsConfig.COMMIT_INTERVAL_MS_CONFIG,3000); //提交时间设置为3秒
业务场景如下,topic1接收外部消息,然后转发到topic2中
实际开发中,可能需要将原始的消息经过简单的处理之后发到另一个topic中,以供后面的业务使用,可以考虑使用下面这种方式
public class StreamCopy {
public static void main(String[] args) {
Properties prop =new Properties();
prop.put(StreamsConfig.APPLICATION_ID_CONFIG,"copy-stream");
prop.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG,"IP:9092");
prop.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass());
prop.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG,Serdes.String().getClass());
prop.put(StreamsConfig.COMMIT_INTERVAL_MS_CONFIG,3000);
StreamsBuilder builder = new StreamsBuilder();
KStream<String, String> inputStream = builder.stream("zcy");
inputStream.to("zcy-out", Produced.with(Serdes.String(), Serdes.String()));
KafkaStreams streams = new KafkaStreams(builder.build(), prop);
streams.start();
}
}
运行代码之后,仍然采用上面的方式做测试,在zcy的生产者窗口发送一条消息,可以看到zcy-out
中接收到相同的消息
需求场景如下,通过kafka stream将第一个topic中接收到的消息经过计算之后输出到topic2中
完整代码如下
public class KafkaStreamWordCount {
public static void main(String[] args) {
//kafka的配置
Properties prop = new Properties();
prop.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "IP:9092");
prop.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass());
prop.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass());
prop.put(StreamsConfig.APPLICATION_ID_CONFIG, "streams-count");
StreamsBuilder streamsBuilder = new StreamsBuilder();
KStream<String, String> stream = streamsBuilder.stream("zcy");
stream.flatMapValues(new ValueMapper<String, Iterable<String>>() {
@Override
public Iterable<String> apply(String value) {
return Arrays.asList(value.split(" "));
}
})
//按照value进行聚合处理
.groupBy((key, value) -> value)
//时间窗口
.windowedBy(TimeWindows.of(Duration.ofSeconds(10)))
//统计单词的个数
.count()
//转换为kStream
.toStream()
.map((key, value) -> {
System.out.println("key:" + key + " ,vlaue:" + value);
return new KeyValue<>(key.key().toString(), value.toString());
})
//发送消息
.to("zcy-out");
KafkaStreams kafkaStreams = new KafkaStreams(streamsBuilder.build(), prop);
kafkaStreams.start();
}
}
窗口函数在很多技术框架中都有着广泛的使用,比如spark,flink,hive,甚至在mysql8也开始支持窗口函数了,利用窗口函数可以对某个时间窗口内的数据进行统计、聚合和计算,接下来通过几个案例展示下在kafka stream中窗口函数的使用。
这里每隔3秒输出一次从topic1中过去10秒的数据到topic2中
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.streams.*;
import org.apache.kafka.streams.kstream.*;
import java.time.Duration;
import java.util.Arrays;
import java.util.Properties;
public class WindowStream1 {
public static void main(String[] args) {
Properties prop = new Properties();
prop.put(StreamsConfig.APPLICATION_ID_CONFIG, "WindowCountStream");
prop.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "IP:9092");
prop.put(StreamsConfig.COMMIT_INTERVAL_MS_CONFIG, 3000);
prop.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass());
prop.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass());
StreamsBuilder builder = new StreamsBuilder();
KStream<Object, Object> source = builder.stream("zcy");
KTable<Windowed<String>, Long> countTable = source
.flatMapValues(value -> Arrays.asList(value.toString().split("\\s+")))
.map((x, y) -> {
return new KeyValue<String, String>(y, "1");
}).groupByKey()
//加10秒窗口,按步长3秒滑动
.windowedBy(TimeWindows.of(Duration.ofSeconds(10).toMillis()).advanceBy(Duration.ofSeconds(3).toMillis()))
.count();
countTable.toStream().foreach((key, val) -> {
System.out.println("key: " + key + " val: " + val);
});
countTable.toStream().map((key, val) -> {
return new KeyValue<String, String>(key.toString(), val.toString());
}).to("zcy-out");
final Topology topo = builder.build();
final KafkaStreams streams = new KafkaStreams(topo, prop);
streams.start();
}
}
运行代码,按照上述相同的方式测试,然后再在控制台可以看到统计到的时间窗口内的单词数
一个典型的场景就是,通过session会话的时间窗口统计用户访问网站的时长,对某个特定的用户来说,用户从登录开始,即该用户的窗口开始,直到发生退出或者会话超时,窗口期结束,可以统计在窗口期间发生的各种动作,比如点击某些按钮,浏览某个页面的时长等行为。
public class WindowStream2 {
public static void main(String[] args) {
Properties prop = new Properties();
prop.put(StreamsConfig.APPLICATION_ID_CONFIG, "WindowCountStream");
prop.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "IP:9092");
prop.put(StreamsConfig.COMMIT_INTERVAL_MS_CONFIG, 2000);
prop.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass());
prop.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass());
StreamsBuilder builder = new StreamsBuilder();
KStream<Object, Object> source = builder.stream("zcy");
KTable<Windowed<String>, Long> countTable = source.flatMapValues(value -> Arrays.asList(value.toString().split("\\s+")))
.map((x, y) -> {
return new KeyValue<>(y, "1");
}).groupByKey()
.windowedBy(SessionWindows.with(Duration.ofSeconds(15).toMillis()))
.count();
countTable.toStream().foreach((key, val) -> {
System.out.println("key: " + key + " val: " + val);
});
countTable
.toStream()
.map((key, val) -> {
return new KeyValue<String, String>(key.toString(), val.toString());
})
.to("zcy-out");
KafkaStreams streams = new KafkaStreams(builder.build(), prop);
streams.start();
}
}
假如现在需要对某系统中实时上报到topic-1的错误或告警日志进行转换,并输出到下游的topic-2中做大屏监控,如下为原始的从topic-1中获取到的日志数据格式
{
"timestamp" : "2023-12-11 23:25:13",
"method": "GET",
"endpoint": "http://10.1.63.112:9098/fox/message/get",
"status_code": 500,
"source_ip":"192.168.9.138",
"request_params":"type=5&status=3",
"operation_user":"6613"
}
假如下游的应用需要实时可视化用户请求日志,需要的数据格式如下:
{
"ope_time": "2023-12-11 23:25:13",
"ope_user": [
{"user_id": "6613", "source_ip": "192.168.9.138","endpoint":"http://10.1.63.112:9098/fox/message/get"}
]
}
如果使用Kafka Stream来处理,可以考虑下面的思路
根据业务需求对原始日志进行聚合和转换,重新组装结果的格式,并将结果写到下游的topic中;
下游应用从topic中获取处理的结果,按照大屏的数据格式再次组装数据,最后展示到大屏;
比如某电商网站或app的后台需要统计用户某些指标的数据,从而分析用户的消费习惯为后续做促销提供数据决策依据,现在从原始的topic中可以拿到下面几类指标信息
{
"enter_type": app,
"online_time": 16m,
"user_type": "level_1" ,
"buy_time_in_month":2,
"user_id":1003
}
有了这些信息,就可以计算某种类型的用户,在过去一年内产生在app或网站来浏览的时长,购买的总次数,如果需要汇聚更多的信息,可以要求上游的topic中传入更详细的参数。
kafka stream可以作为简单的实时计算框架,对数据进行准实时的聚合统计,快速汇总计算数据按业务维度进行数据分发,承载一部分大数据实时计算的功能。
基于现有的数据模型进行相关的指标计算,预测某些指标的行为,进一步指导业务决策,比如上面统计电商网站中用户的网站浏览动作。
检测系统异常指标,通过准实时计算汇聚结果,将异常行为进行上报。
这个与消息中间件的作用类似,为了减少源系统的计算压力,可以通过kafka stream进行解耦,所有的计算动作在kafka stream中进行,然后再将计算结果推送到下游的topic进行后续的使用。
有了上面对kafka stream的了解和使用,接下来演示下如何在springboot中整合kafka stream
?在上述已经导入的依赖的基础上补充下面几个依赖
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-autoconfigure</artifactId>
</dependency>
<dependency>
<groupId>org.projectlombok</groupId>
<artifactId>lombok</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-web</artifactId>
</dependency>
在配置文件中添加如下配置信息
server:
port: 8088
spring:
application:
name: kafka-sream-app
kafka:
bootstrap-servers: kafka连接IP:9092
producer:
retries: 5
key-serializer: org.apache.kafka.common.serialization.StringSerializer
value-serializer: org.apache.kafka.common.serialization.StringSerializer
consumer:
group-id: ${spring.application.name}-test
key-deserializer: org.apache.kafka.common.serialization.StringDeserializer
value-deserializer: org.apache.kafka.common.serialization.StringDeserializer
kafka:
hosts: kafka连接IP:9092
group: ${spring.application.name}
@Setter
@Getter
@Configuration
@EnableKafkaStreams
@ConfigurationProperties(prefix="kafka")
public class KafkaStreamConfig {
private static final int MAX_MESSAGE_SIZE = 16 * 1024 * 1024;
private String hosts;
private String group;
@Bean(name = KafkaStreamsDefaultConfiguration.DEFAULT_STREAMS_CONFIG_BEAN_NAME)
public KafkaStreamsConfiguration defaultKafkaStreamsConfig() {
Map<String, Object> props = new HashMap<>();
props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, hosts);
props.put(StreamsConfig.APPLICATION_ID_CONFIG, this.getGroup()+"_stream_aid");
props.put(StreamsConfig.CLIENT_ID_CONFIG, this.getGroup()+"_stream_id");
props.put(StreamsConfig.RETRIES_CONFIG, 5);
props.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass());
props.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass());
return new KafkaStreamsConfiguration(props);
}
}
还记得在编写消息中间件客户端程序的时候添加的那些监听器吗,原理类似,这里自定义一个监听器处理类,接收上游的topic消息进行处理之后再发送到下一个topic中,相当于是把上面的代码搬过来放到spring的ioc容器中
import org.apache.kafka.streams.KeyValue;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.kstream.KStream;
import org.apache.kafka.streams.kstream.TimeWindows;
import org.apache.kafka.streams.kstream.ValueMapper;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import java.time.Duration;
import java.util.Arrays;
@Configuration
public class StreamCountListener {
@Bean
public KStream<String, String> kStream(StreamsBuilder streamsBuilder) {
KStream<String, String> stream = streamsBuilder.stream("zcy");
stream.flatMapValues(new ValueMapper<String, Iterable<String>>() {
@Override
public Iterable<String> apply(String value) {
return Arrays.asList(value.split(" "));
}
})
//按照value进行聚合处理
.groupBy((key, value) -> value)
//时间窗口
.windowedBy(TimeWindows.of(Duration.ofSeconds(10)))
//统计单词的个数
.count()
//转换为kStream
.toStream()
.map((key, value) -> {
System.out.println("key:" + key + " ,vlaue:" + value);
return new KeyValue<>(key.key().toString(), value.toString());
})
//发送消息
.to("zcy-out");
return stream;
}
}
运行项目,运行之后,使用下面的代码,往zcy这个topic中发送一些消息
public static void main(String[] args) throws Exception {
// 1. 创建 kafka 生产者的配置对象
Properties properties = new Properties();
// 2. 给 kafka 配置对象添加配置信息:bootstrap.servers
properties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "IP:9092");
properties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer");
properties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer");
// 3. 创建 kafka 生产者对象
KafkaProducer<String, String> kafkaProducer = new KafkaProducer<String, String>(properties);
System.out.println("开始发送数据");
// 4. 调用 send 方法,发送消息
for (int i = 0; i < 5; i++) {
kafkaProducer.send(new ProducerRecord<>("zcy","congge_" + i));
}
// 5. 关闭资源
kafkaProducer.close();
}
发送成功后,在控制台中可以看到经过上面的监听类处理得到的结果输出信息
本篇通过较大得篇幅详细分享了kafka stream的使用,流式计算可以说是当下非常火热的技术之一,对于非大数据场景下的业务处理,kafka stream提供了一种很好的解决思路,希望对看到的同学有所帮助,本篇到此介绍,感谢观看。