在生产环境中,不使用 Apache Kafka 等流平台进行数据迁移并不是一个好的做法。 在这篇文章中,我们将详细探讨 Apache Kafka 和 Logstash 的关系。
但首先让我们简单了解一下 Apache Kafka 的含义。 Apache Kafka 是分布式流平台,擅长实时数据集成和消息传递。
Kafka 架构不复杂且直接。 生产者将给定主题的数据发送到 Kafka Broker; Kafka 集群包含一个或多个 broker,用于存储从生产者接收到的消息,订阅 Kafka 主题的消费者将接收数据。 由于它是一个分布式平台,Zookeeper 有助于管理架构。
安装并运行 Kafka 实例是测试迁移的必要条件。如果你想了解更多关于 Kafka 的安装知识,请参考:
启动 Kafka 的过程围绕以下命令进行。
# To create topic in Kafka.
kafka-topics.bat --create --bootstrap-server localhost:9092 --topic <topic_name>
# To produce data or to ingest data using producer module.
kafka-console-producer.bat --broker-list localhost:9092 --topic <topic_name>
# To see the data in the topic.
kafka-console-consumer.bat --topic <topic_name> --bootstrap-server localhost:9092 --from-beginning
注意:在上面显示的命令是针对 Windows 系统的。针对 Linux 系统,这些命令变成了?kafka-console-consumer.sh。
使用下面的配置文件,我们可以使用 JDBC 驱动程序从任何数据库中提取数据,将数据迁移到 Kafka,然后使用 Logstash 从 Kafka 将数据迁移到 Elasticsearch。
我们在 “config” 文件夹中创建管道配置文件来定义 Logstash 的处理阶段。 Logstash 仅加载 “config” 目录中的 “.conf” 文件,而忽略其他文件。 基本配置包括输入、过滤器和输出插件。 输入插件读取源事件,过滤器插件处理事件,输出插件将数据发送到特定目的地
在下面的配置模板中,我们在输入部分使用了 JDBC 插件,在输出部分使用了 Elasticsearch 插件。
input
{
id => "jdbc_input"
# path to third party driver library
# replace it with where you placed the driver.
jdbc_driver_library => "/path/to/mysql-connector-java.jar"
# class to load
jdbc_driver_class => "com.microsoft.sqlserver.jdbc.SQLServerDriver"
# Replace the JDBC connection string with your actual database details
jdbc_connection_string => "jdbc:<sqlserverip>://:<port>;databaseName=<DbName>;encrypt=true;trustServerCertificate=true;user=<username>;password=<password>;"
# database Credentials
# replace it with your own credentials.
jdbc_user => "<username>"
jdbc_password => "<password>"
statement => "<SQL STATEMENT>"
}
filter{}
output
{
kafka {
codec => json
# topic created within the Kafka.
topic_id => "mytopic"
bootstrap_servers => "localhost:9092"
}
}
input
{
kafka
{
## decoding the input data
codec => json
## URL of kafka instance to establish initial connection
bootstrap_servers => "<IP_Address>:<Kafka_Port>"
## topics to subscribe to
topics => ["<topic_name>"]
}
}
filter{}
output
{
elasticsearch
{
# index to write the data
index => "index_name"
# Set the host's of the remote instance
hosts => ["<IP_Address>:<Port>"]
}
}
# replace the conf_file_name
logstash.bat -f .\config\<conf_file_name>.conf
# replace the conf_file_name
bin/logstash -f ./config/<conf_file_name>.conf
更多关于如何在 Logstash 中配置 JDBC 驱动的文章,请阅读 “Logstash:如何使用 Logstash 和 JDBC 确保 Elasticsearch 与关系型数据库保持同步”。
在数据迁移领域,Logstash 遇到了它的完美伴侣 ---? Apache Kafka。 显然,他们的协同合作提供了强大的解决方案,确保将数据从不同来源无缝高效地传输到 Elasticsearch,从而为组织提供实时洞察力和敏捷性。