+
95
-

回答

1、首先下载安装logstash

https://www.elastic.co/guide/en/logstash/current/installing-logstash.html

2、安装Logstash JDBC Input插件

Logstash需要使用JDBC Input插件从MySQL中读取数据。你可以通过以下命令安装插件:

bin/logstash-plugin install logstash-input-jdbc

3. 下载MySQL JDBC驱动

下载MySQL JDBC驱动并将其放在Logstash的lib目录下。你可以从MySQL官方网站下载驱动。

4. 配置Logstash

创建一个Logstash配置文件(例如mysql_to_es.conf),内容如下:

input {
  jdbc {
    jdbc_driver_library => "/path/to/mysql-connector-java-x.x.xx-bin.jar"
    jdbc_driver_class => "com.mysql.jdbc.Driver"
    jdbc_connection_string => "jdbc:mysql://localhost:3306/your_database"
    jdbc_user => "your_username"
    jdbc_password => "your_password"
    schedule => "* * * * *"  # 每分钟运行一次
    statement => "SELECT * FROM your_table WHERE updated_at > :sql_last_value"
    use_column_value => true
    tracking_column => "updated_at"
    tracking_column_type => "timestamp"
    last_run_metadata_path => "/path/to/.logstash_jdbc_last_run"
  }
}

output {
  elasticsearch {
    hosts => ["http://localhost:9200"]
    index => "your_index"
    document_id => "%{id}"
  }
  stdout { codec => json_lines }
}

配置说明:

jdbc_driver_library: MySQL JDBC驱动的路径。jdbc_driver_class: JDBC驱动类。jdbc_connection_string: MySQL数据库连接字符串。jdbc_user: 数据库用户名。jdbc_password: 数据库密码。schedule: 调度频率,这里设置为每分钟运行一次。statement: SQL查询语句,用于从MySQL读取数据。updated_at是用来跟踪数据变化的时间戳字段。use_column_value、tracking_column、tracking_column_type: 用于跟踪数据变化。last_run_metadata_path: 保存上次运行时间的文件路径。hosts: Elasticsearch的地址。index: Elasticsearch索引名称。document_id: Elasticsearch文档ID,使用MySQL表中的id字段。

5. 运行Logstash

在终端中运行Logstash并指定配置文件:

bin/logstash -f /path/to/mysql_to_es.conf

Logstash会根据配置文件中的调度频率,从MySQL中读取数据并将其同步到Elasticsearch中。

6. 验证数据

在Elasticsearch中查询数据,确保数据已经成功同步:

curl -X GET "localhost:9200/your_index/_search?pretty"

网友回复

我知道答案,我要回答