且构网

分享程序员开发的那些事...
且构网 - 分享程序员编程开发的那些事

使用Logstash实现mysql同步数据到ES——《我的Java打怪日记》

更新时间:2021-07-15 11:33:22

mysql 作为成熟稳定的数据持久化解决方案,广泛地应用在各种领域,但是有些时候,我们在做查询时,由于查询条件的多样、变化多端(比如根据时间查、根据名称模糊查、根据id查等等),或者查询的数据来自很多不同的库表或者系统,这时就很难以一个较快的速度(几百毫秒)直接获取我们想要的数据,而 elasticsearch 作为数据分析领域的佼佼者,刚好可以弥补这项不足,而我们要做的只需要将 mysql 中的数据同步到 elasticsearch 中即可,而 logstash 刚好就是一个同步神器,能够很好的满足我们的需求。

一:版本说明

logstash:7.6.1
elasticsearch:7.5.2
jdk:1.8
下载对应平台的压缩包,解压即可使用。

二:原理

使用logstash自带的logstash-inpu-jdbc插件,实现从mysql向es同步数据
官方插件使用说明:https://www.elastic.co/guide/en/logstash/current/plugins-inputs-jdbc.html

三:配置

使用 logstash-input-jdbc 插件读取 mysql 的数据,原理就是定时执行一个 sql,然后将 sql 执行的结果写入到流中;增量获取的方式没有通过 binlog 方式同步,而是用一个递增字段作为条件去查询,每次都记录当前查询的位置,由于递增的特性,只需要查询比当前大的记录即可获取这段时间内的全部增量,一般的递增字段有两种,AUTO_INCREMENT 的主键 id 和 ON UPDATE CURRENT_TIMESTAMP 的 update_time 字段,id 字段只适用于那种只有插入没有更新的表,update_time 更加通用一些,建议在 mysql 表设计的时候都增加一个 update_time 字段.

input {
# 使用logstash-input-jdbc插件
  jdbc {
    # mysql连接驱动包
    jdbc_driver_library => "/data/www/logstash-7.6.1/mysql-connector-java-5.1.44.jar"
    # mysql连接驱动类
    jdbc_driver_class => "com.mysql.jdbc.Driver"
    # mysql jdbc连接串
    jdbc_connection_string => "jdbc:mysql://xxxxxxxxxxx"
    # 设置时区
    jdbc_default_timezone => "Asia/Shanghai"
    # 设置连接数据库的用户名和密码
    jdbc_user => "xxx"
    jdbc_password => "xxxxx"
    # 开启分页
    jdbc_paging_enabled => "true"
    # 开启分页后,每页数据条数大小
    jdbc_page_size => "1000"
    # 使用本地时区
    plugin_timezone => "local"
    # 开启大小写敏感
    lowercase_column_names => "false"
    # 同时时间间隔,分别对应”分 时 月 周 年“ 后面可以设置时区,也可不设,如果注释schedule,则只运行一次同步,进程及停止
    schedule => "*/5 * * * * Asia/Shanghai"
    # 读取数据库使用的SQL文件,其中有一个需要特别说明的关键字:sql_last_value,此值是读取last_run_metadata_path文件内记录的值
    statement_filepath => "/data/www/logstash-7.6.1/conf/xxxxxxx.sql"
    # 记录每次同步tracking_column_type追踪的值
    last_run_metadata_path => "/data/www/logstash-7.6.1/conf/xxxx_modify_time"
    # 开启记录最后同步的值
    record_last_run => "true"
    # 设置false为记录时间戳,true为其他字段的值
    use_column_value => "false"
    # 追踪字段的类型,可选timestamp、numeric
    tracking_column_type => timestamp
    # 具体跟踪的同步表字段
    tracking_column => "modify_time"
    # 是否清除 last_run_metadata_path 的记录,如果为真那么每次都相当于从头开始查询所有的数据库记录
    clean_run => "false"
  }
 
}
 
#filter部分对数据进行处理,此处只对@timestamp字段处理,设置为本地时间
filter {
 
    ruby {
      code => "event.set('timestamp', event.get('@timestamp').time.localtime + 8*60*60)"
    }
    ruby {
      code => "event.set('@timestamp',event.get('timestamp'))"
    }
    mutate {
      remove_field => ["timestamp"]
    }
}
 
output {
#向es同步数据的相关配置
  elasticsearch {
    hosts => ["127.0.0.1:9200","127.0.0.1:9300"]
    #将数据中id字段设置为es中索引的_id,不使用es自动生成的随机_id,提供写入速度
    document_id => "%{id}"
    #定义es中索引名称,如es没有创建或不存在,会自动创建,自动创建的索引可能会不满足要求,建议使用自行创建的索引
    index => "xxxx_index"
    document_type => "_doc"
  }
 
# 以下配置为debug,可在测试中开启,方便定位问题,生产或正式使用务必关闭,以免产生大量日志
  stdout {
    codec => rubydebug
  }
 
}
  • jdbc_driver_library

mysql-connector-java-5.1.44.jar的存放目录,这个一定要配置正确,支持全路径和相对路径。

  • jdbc_driver_class

驱动类的名字,mysql 填 com.mysql.jdbc.Driver 就好了

  • sql_last_value

标志目前logstash同步的位置信息(类似offset)。比如id、updatetime。logstash通过这个标志,可以判断目前同步到哪一条数据。

  • statement

执行同步的sql语句,可以同步部分数据;

  • statement_filepath

存储执行同步的sql语句,不和statement同时使用;

  • schedule

定时器,表示每隔多长时间同步一次数据,格式类似crontab;

  • tracking_column

表示表中哪一列用于判断logstash同步的位置信息,与sql_last_value比较判断是否需要同步这条数据;

  • tracking_column_type

racking_column指定列的类型,支持两种类型:numeric(默认)、timestamp;

  • last_run_metadata_path

存储sql_last_value值的文件名称及位置。

  • document_id

生成elasticsearch的文档值,尽量使用同步的数据中已有的唯一标识。

注意⚠️

1、同步单张表可以使用:logstash -f /path/to/xxxx.conf

2、同步多张表时,可通过pipeline管道的方式

3、当表数据超过百万级别时,建议分批同步,可按照每100w条数据同步一次来进行,以加快同步速度,具体可使用表id的区间来确定同步数量,则在SQL中增加id区间值。

多表同步的pipeline.yml配置如下:

- pipeline.id: xxxx1
  pipeline.workers: 1
  path.config: /data/www/logstash-7.6.1/member_conf/xxxx1.conf
 
- pipeline.id: xxxx2
  pipeline.workers: 1
  path.config: /data/www/logstash-7.6.1/member_conf/xxxx2.conf
 
- pipeline.id: xxxx3
  pipeline.workers: 1
  path.config: /data/www/logstash-7.6.1/member_conf/xxxx3.conf
 
- pipeline.id: xxxx4
  pipeline.workers: 1
  path.config: /data/www/logstash-7.6.1/member_conf/xxxx4.conf

四:启动

单表启动,指定conf配置文件的位置

bin/logstash -f config/logstash-mysql-es.conf > llogstash.log 2>&1 &

当使用多表同步时,启动时,不需要指定具体的conf配置文件,即:

bin/logstash > logstash.log 2>&1 &