1、首先下载安装logstash
https://www.elastic.co/guide/en/logstash/current/installing-logstash.html
2、安装Logstash JDBC Input插件
Logstash需要使用JDBC Input插件从MySQL中读取数据。你可以通过以下命令安装插件:
bin/logstash-plugin install logstash-input-jdbc
3. 下载MySQL JDBC驱动
下载MySQL JDBC驱动并将其放在Logstash的lib目录下。你可以从MySQL官方网站下载驱动。
4. 配置Logstash
创建一个Logstash配置文件(例如mysql_to_es.conf),内容如下:
input { jdbc { jdbc_driver_library => "/path/to/mysql-connector-java-x.x.xx-bin.jar" jdbc_driver_class => "com.mysql.jdbc.Driver" jdbc_connection_string => "jdbc:mysql://localhost:3306/your_database" jdbc_user => "your_username" jdbc_password => "your_password" schedule => "* * * * *" # 每分钟运行一次 statement => "SELECT * FROM your_table WHERE updated_at > :sql_last_value" use_column_value => true tracking_column => "updated_at" tracking_column_type => "timestamp" last_run_metadata_path => "/path/to/.logstash_jdbc_last_run" } } output { elasticsearch { hosts => ["http://localhost:9200"] index => "your_index" document_id => "%{id}" } stdout { codec => json_lines } }
配置说明:
jdbc_driver_library: MySQL JDBC驱动的路径。jdbc_driver_class: JDBC驱动类。jdbc_connection_string: MySQL数据库连接字符串。jdbc_user: 数据库用户名。jdbc_password: 数据库密码。schedule: 调度频率,这里设置为每分钟运行一次。statement: SQL查询语句,用于从MySQL读取数据。updated_at是用来跟踪数据变化的时间戳字段。use_column_value、tracking_column、tracking_column_type: 用于跟踪数据变化。last_run_metadata_path: 保存上次运行时间的文件路径。hosts: Elasticsearch的地址。index: Elasticsearch索引名称。document_id: Elasticsearch文档ID,使用MySQL表中的id字段。
5. 运行Logstash
在终端中运行Logstash并指定配置文件:
bin/logstash -f /path/to/mysql_to_es.conf
Logstash会根据配置文件中的调度频率,从MySQL中读取数据并将其同步到Elasticsearch中。
6. 验证数据
在Elasticsearch中查询数据,确保数据已经成功同步:
curl -X GET "localhost:9200/your_index/_search?pretty"
网友回复