一) mysql 数据使用logstash 导入elasticsearch
1. 安装 logstash 插件 logstash-input-jdbc
bin/logstash-plug install --no-verify logstash-input-jdbc
2. 启动
input {
jdbc {
#驱动绝对路径
jdbc_driver_library => "/root/mysql-connector-java-5.1.43-bin.jar"
#驱动类名
jdbc_driver_class => "com.mysql.jdbc.Driver"
#连接池配置, mysql数据库连接 DAHLIA 为数据库名
jdbc_connection_string => "jdbc:mysql://10.15.206.203:3306/DAHLIA"
#连接池配置, 使用前是否验证连接
jdbc_validate_connection => true
#连接池配置, 数据库用户名
jdbc_user => "foo"
#连接池配置, 数据库密码
jdbc_password => "bar"
#连接池配置, 是否启用分页, 启用后,需要使用 jdbc_page_size 设置每次查询的结果集大小
jdbc_paging_enabled => true
#连接池配置, 每次查询的结果集大小, 必须设置 jdbc_paging_enabled 为 true 才有效
jdbc_page_size => "1000"
#查询结果集的语句
statement => "select * from tablename where update_time >:sql_last_value"
#使用增量列值,而不是时间戳; 默认值为false
#use_column_value => true
#当设置 use_column_value 为true时(不跟踪时间戳查询), 将跟踪此列值
#tracking_column => "update_time"
#不设置时,默认值为 numeric; 可选 numeric, timestamp
tracking_column_type => "numeric"
#每分钟执行一次
schedule => "* * * * *"
}
}
filter {
mutate {
#删除默认message字段
remove_field => ["@version"]
#把数据写入到 source 字段
source => "message"
}
}
output {
elasticsearch {
hosts => ["127.0.0.1:9200"]
user => "foo"
password => "bar"
index => "news_info_shadow"
document_id => "%{id}"
}
stdout { codec => rubydebug }
}
statement
参数可以替换为 statement_filepath
, 从文件中执行sql查询; 参照- 本机启动多个logstash时,需要
--path.data
参数: logstash --path.data /path/to -f path/to/shipper.conf
jdbc_paging_enabled
jdbc_page_size
控制是否开启查询分页, 及分页大小; 默认100000, 根据机器内存抉择
二) 定时增量导入
input参数 scheduling
增量更新原理
把mysql的id 设为 es _id; 根据 sql_last_value 查出需要更新的数据, 如果 _id 存在则更新,不存在则新增
注意:
use_column_value => false
时, 才可以跟踪时间戳
官方文档里的参数还是要仔细看的
发现网上有些文章, 在没有设置 use_column_value
为true的情况下 指定 tracking_column
为某个列值, 根据官方文档的说明,这样设置并没有什么卵用.
三) 使用 ruby filter 对字符进行转码
filter {
ruby {
init => "require 'iconv'"
code => "
row = event.get('title')
#row = row.encoding.name
ic = Iconv.new('UTF-8', 'GBK')
row = ic.iconv(row)
event.set('test-covert', row)
"
}
}
四) 使用ik对title进行分词
PUT /news_info_shadow
{
"mappings": {
"_default_" : {
"_all" : {
"enabled" : false
}
},
"participle": {
"dynamic" : true,
"properties" : {
"id": {
"type": "long"
},
"title":{
"type": "text",
"analyzer": "ik_smart"
}
}
}
}
}
配置说明:
创建一个名叫 news_info_shadow 的索引,分词器用 ik_smart,并创建一个 participle 的类型,里面有一个 title 的字段,指定其使用 ik_max_word 分词器