且构网

分享程序员开发的那些事...
且构网 - 分享程序员编程开发的那些事

logstash同步MySQL数据到ElasticSearch

更新时间:2022-08-22 14:24:14

Jdbc input plugin 配置选项

Setting

Input type

Required

Default

clean_run

boolean

No

false

columns_charset

hash

No

{}

connection_retry_attempts

number

No

1

connection_retry_attempts_wait_time

number

No

0.5

jdbc_connection_string

string

Yes

-

jdbc_default_timezone

string

No

-

jdbc_driver_class

string

Yes

-

jdbc_driver_library

string

No

-

jdbc_fetch_size

number

No

-

jdbc_page_size

number

No

100000

jdbc_paging_enabled

boolean

No

false

jdbc_password

password

No

-

jdbc_password_filepath

string

No

-

jdbc_pool_timeout

number

No

5

jdbc_user

string

Yes

-

jdbc_validate_connection

boolean

No

false

jdbc_validation_timeout

number

No

3600

last_run_metadata_path

string

No

"$HOME/.logstash_jdbc_last_run"

lowercase_column_names

boolean

No

true

parameters

hash

No

{}

plugin_timezone

string, one of ["local", "utc"]

No

"utc"

prepared_statement_bind_values

array

No

[]

prepared_statement_name

string

No

""

record_last_run

boolean

No

true

schedule

string

No

-

sql_log_level

string, one of ["fatal", "error", "warn", "info", "debug"]

No

"info"

statement

string

No

-

statement_filepath

string

No

-

tracking_column

string

No

-

tracking_column_type

string, one of ["numeric", "timestamp"]

No

"numeric"

use_column_value

boolean

No

false

use_prepared_statements

boolean

No

false


配置文档

https://www.elastic.co/guide/en/logstash/current/plugins-inputs-jdbc.html

配置示例

# Periodically pulls changed rows from the MySQL `user` table through the
# JDBC input plugin, using `update_time` as the incremental sync marker.
input {
  jdbc {
    # Location of the JDBC driver jar
    jdbc_driver_library => "mysql-connector-java-8.0.16.jar"
    # JDBC driver class
    jdbc_driver_class => "com.mysql.cj.jdbc.Driver"
    # Database connection URL. For driver version 8.0 and later,
    # serverTimezone=UTC must be included in the URL.
    jdbc_connection_string => "jdbc:mysql://127.0.0.1:3306/data?characterEncoding=utf8&useSSL=false&serverTimezone=UTC&rewriteBatchedStatements=true"
    # Database user
    jdbc_user => "root"
    # Database password
    jdbc_password => "123456"
    # Cron-style schedule; "* * * * *" runs the statement once every minute
    schedule => "* * * * *"
    # Whether to discard the previously recorded sql_last_value
    clean_run => false
    # Statement to execute. Rows are ordered by the tracking column so that
    # the paged (LIMIT/OFFSET) queries see a stable row order and the last
    # row read carries the highest update_time, which becomes the next
    # :sql_last_value. Because the comparison is ">=", rows whose update_time
    # equals the stored value are fetched again on the next run.
    statement => "SELECT * FROM user WHERE update_time >= :sql_last_value ORDER BY update_time ASC"
    # Paging: split the result set into pages of jdbc_page_size rows
    jdbc_paging_enabled => true
    jdbc_page_size => 5000
    # Take :sql_last_value from the tracking column instead of the last run time
    use_column_value => true
    # Type of the tracking column
    tracking_column_type => "timestamp"
    # Name of the tracking column
    tracking_column => "update_time"
    # File in which the sync point (sql_last_value) is persisted between runs
    last_run_metadata_path => "syncpoint_table"
  }
}
# Writes every event to the local Elasticsearch node, keyed by the row's id.
output {
  elasticsearch {
    # Elasticsearch address and port
    hosts => ["http://127.0.0.1:9200"]
    # Target index name (can be customized)
    index => "user"
    # The source table must expose an `id` column; its value is used as the
    # Elasticsearch document id
    document_id => "%{id}"
  }
  # Uncomment to also print each event to stdout as JSON lines
  # stdout {
  #   codec => json_lines
  # }
}