一) mysql 数据使用logstash 导入elasticsearch

1. 安装 logstash 插件 logstash-input-jdbc

bin/logstash-plug install --no-verify logstash-input-jdbc

2. 启动

input {
  jdbc {
    #驱动绝对路径
    jdbc_driver_library => "/root/mysql-connector-java-5.1.43-bin.jar"
    #驱动类名
    jdbc_driver_class => "com.mysql.jdbc.Driver"
    #连接池配置, mysql数据库连接 DAHLIA 为数据库名
    jdbc_connection_string => "jdbc:mysql://10.15.206.203:3306/DAHLIA"
    #连接池配置, 使用前是否验证连接
    jdbc_validate_connection => true
    #连接池配置, 数据库用户名
    jdbc_user => "foo"
    #连接池配置, 数据库密码
    jdbc_password => "bar"
    #连接池配置, 是否启用分页, 启用后,需要使用 jdbc_page_size 设置每次查询的结果集大小
    jdbc_paging_enabled => true
    #连接池配置, 每次查询的结果集大小, 必须设置 jdbc_paging_enabled 为 true 才有效
    jdbc_page_size => "1000"
    #查询结果集的语句
    statement => "select * from tablename where update_time >:sql_last_value"
    #使用增量列值,而不是时间戳; 默认值为false
    #use_column_value => true
    #当设置 use_column_value 为true时(不跟踪时间戳查询), 将跟踪此列值
    #tracking_column => "update_time"
    #不设置时,默认值为 numeric; 可选 numeric, timestamp
    tracking_column_type => "numeric"
    #每分钟执行一次
    schedule => "* * * * *"
  }
}


filter {
    mutate {
        #删除默认message字段
        remove_field => ["@version"]
        #把数据写入到 source 字段
        source => "message"
    }
}

output {
    elasticsearch {
        hosts => ["127.0.0.1:9200"]
        user => "foo"
        password => "bar"
        index => "news_info_shadow"
        document_id => "%{id}"
    }
    stdout { codec => rubydebug }
}
  • statement 参数可以替换为 statement_filepath, 从文件中执行sql查询; 参照
  • 本机启动多个logstash时,需要--path.data参数: logstash --path.data /path/to -f path/to/shipper.conf
  • jdbc_paging_enabled jdbc_page_size 控制是否开启查询分页, 及分页大小; 默认100000, 根据机器内存抉择

二) 定时增量导入

input参数 scheduling

增量更新原理

把mysql的id 设为 es _id; 根据 sql_last_value 查出需要更新的数据, 如果 _id 存在则更新,不存在则新增

注意:

use_column_value => false 时, 才可以跟踪时间戳

官方文档里的参数还是要仔细看的

发现网上有些文章, 在没有设置 use_column_value 为true的情况下 指定 tracking_column 为某个列值, 根据官方文档的说明,这样设置并没有什么卵用.

三) 使用 ruby filter 对字符进行转码

filter {
    ruby {
        init => "require 'iconv'"
        code => "
            row = event.get('title')
            #row = row.encoding.name
            ic = Iconv.new('UTF-8', 'GBK')
            row = ic.iconv(row)
            event.set('test-covert', row)
        "
    }
}

四) 使用ik对title进行分词

PUT /news_info_shadow
{
  "mappings": {
    "_default_" : {
      "_all" : {
        "enabled" : false
      }
    },
    "participle": {
      "dynamic" : true,
      "properties" : {
        "id": {
          "type": "long"
        },
        "title":{
          "type": "text",
          "analyzer": "ik_smart"
        }
      }
    }
  }
}

分词插件 ik

配置说明:

创建一个名叫 news_info_shadow 的索引,分词器用 ik_smart,并创建一个 participle 的类型,里面有一个 title 的字段,指定其使用 ik_max_word 分词器