Elasticsearch和Mysql的同步-logstash-input-jdbc

配置好elasticsearch后,需要将mysql的数据导入到elasticsearch中。网上查看,有好几种同步插件。我在elasticsearch-jdbc和 logstash-input-jdbc 中选择。两种我都尝试了下,由于我的elasticsearch版本是官网最新的6.1.1版本,官方现在推荐的都是logstash,于是把精力放在了logstash上。具体步骤如下:

1、logstash安装

Logstash是一款轻量级的日志搜集处理框架,可以方便的把分散的、多样化的日志搜集起来,并进行自定义的处理,然后传输到指定的位置,比如某个服务器或者文件。直接下载解压缩放在适当目录就行,免安装。下面是我的目录地址:

1
2
3
cd /usr/local/src
wget https://artifacts.elastic.co/downloads/logstash/logstash-6.1.1.tar.gz
tar -zxvf logstash-6.1.1.tar.gz

2、logstash-input-jdbc安装

Logstash5.x以上版本本身自带有logstash-input-jdbc,但是插件默认没有绑定,还需安装,只需要简单命令行即可。

1
2
cd /usr/local/src/logstash-6.1.1
bin/logstash-plugin install logstash-input-jdbc

等待几分钟安装成功后(可能需要多等待一会儿),我们可以在logstash根目录下的以下目录查看对应的插件版本

1
cd /usr/local/src/logstash-6.1.1/vendor/bundle/jruby/2.3.0/gems/logstash-input-jdbc-4.3.2/

对应的jdbc插件版本是4.3.2

3、下载mysql数据库连接包

1
2
3
cd /usr/local/src
wget https://dev.mysql.com/get/Downloads/Connector-J/mysql-connector-java-5.1.45.tar.gz
tar -zxvf mysql-connector-java-5.1.45.tar.gz

4、编写同步测试的配置文件

1
2
3
cd /usr/local/src/logstash-6.1.1/bin/
mkdir mysql-config
vim jdbc.conf

jdbc.conf的内容如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
input {
stdin {
}
jdbc {
    #mysql相关jdbc配置
    jdbc_connection_string => "jdbc:mysql://127.0.0.1:3306/hety.net?serverTimezone=UTC&zeroDateTimeBehavior=convertToNull"
    jdbc_user => "myuser"
    jdbc_password => "mypass"
    #jdbc连接mysql驱动的文件目录,第3步下载的文件目录
    jdbc_driver_library => "/usr/local/src/mysql-connector-java-5.1.45/mysql-connector-java-5.1.45-bin.jar"
    jdbc_driver_class => "com.mysql.jdbc.Driver"
    jdbc_paging_enabled => "true"
    jdbc_page_size => "50000"
    #mysql文件, 也可以直接写SQL语句在此处
    #statement => "SELECT * from member"
    statement_filepath => "/usr/local/src/logstash-6.1.1/bin/mysql-config/jdbc.sql"
    #定时操作,和crontab一样
    schedule => "*/2 * * * *"
    type => "jdbc_member"
    #是否记录上次执行结果, 如果为真,将会把上次执行到的 tracking_column 字段的值记录下来,保存到
    last_run_metadata_path 指定的文件中
    record_last_run => "true"
    #是否需要记录某个column 的值,如果record_last_run为真,可以自定义我们需要 track 的 column 名称,此时该参数就要为 true. 否则默认 track 的是 timestamp 的值.
    use_column_value => "true"
    #如果 use_column_value 为真,需配置此参数. track 的数据库 column 名,该 column 必须是递增的. 一般是mysql主键
    tracking_column => "member_id"
    last_run_metadata_path => "/usr/local/src/logstash-6.1.1/config/last_id"
    #是否清除 last_run_metadata_path 的记录,如果为真那么每次都相当于从头开始查询所有的数据库记录
    clean_run => "false"
    #是否将 字段(column) 名称转小写
    lowercase_column_names => "false"
   }
}
#此处暂不做过滤处理,如果需要可添加
filter {}
output {
#输出到elasticsearch的配置,配置很简单,一看就懂,就不作说明了
elasticsearch {
    hosts => ["127.0.0.1:9200"]
    index => "hetynet"
    document_id => "%{member_id}"
    document_type => "member"
    template_overwrite => true
   }

}

下面是mysql文件jdbc.sql,注意:sql_last_value

1
select * from member where member_id > (:sql_last_value-100)

4、遇到的问题

以下内容转自网络,但是很有用处。我基本上也遇到了同样的问题和疑问。

1)elasticsearch数据重复以及增量同步

在默认配置下,tracking_column这个值是@timestamp,存在elasticsearch就是_id值,是logstash存入elasticsearch的时间,这个值的主要作用类似mysql的主键,是唯一的,但是我们的时间戳其实是一直在变的,所以我们每次使用select语句查询的数据都会存入elasticsearch中,导致数据重复。
解决方法:在要查询的表中,找主键或者自增值的字段,将它设置为_id的值,因为_id值是唯一的,所以,当有重复的_id的时候,数据就不会重复


1
2
3
4
5
6
7
8
  <span class="hljs-comment">// 是否记录上次执行结果, 如果为真,将会把上次执行到的 tracking_column 字段的值记录下来,保存到 last_run_metadata_path 指定的文件中</span>
  record_last_run =&gt; <span class="hljs-string">"true"</span>

  <span class="hljs-comment">// 是否需要记录某个column 的值,如果record_last_run为真,可以自定义我们需要 track 的 column 名称,此时该参数就要为 true. 否则默认 track 的是 timestamp 的值.</span>
  use_column_value =&gt; <span class="hljs-string">"true"</span>

  <span class="hljs-comment">// 如果 use_column_value 为真,需配置此参数. track 的数据库 column 名,该 column 必须是递增的. 一般是mysql主键</span>
  tracking_column =&gt; <span class="hljs-string">"autoid"</span>

2).数据同步频繁,影响mysql数据库性能

我们写入jdbc.sql文件的mysql语句是写死的,所以每次查询的数据库有很多是已经不需要去查询的,尤其是每次select * from table;的时候,对mysql数据库造成了非常大的压力

解决:A 根据业务需求,可以适当修改定时同步时间,我这里对实时性相对要求较高,因此设置了10分钟


1
2
  <span class="hljs-comment">// 这里类似crontab,可以定制定时操作,比如每10分钟执行一次同步(分 时 天 月 年)</span>
  schedule =&gt; <span class="hljs-string">"*/10 * * * *"</span>

B 设置mysql查询范围,防止大量的查询拖死数据库


1
2
3
4
5
6
7
8
  <span class="hljs-comment">// 如果 use_column_value 为真,需配置此参数. track 的数据库 column 名,该 column 必须是递增的. 一般是mysql主键</span>
  tracking_column =&gt; <span class="hljs-string">"autoid"</span>

  <span class="hljs-comment">// 上次执行数据库的值,该值是上次查询时tracking_column设置的字段最大值</span>
  last_run_metadata_path =&gt; <span class="hljs-string">"/opt/logstash/conf/last_id"</span>

  <span class="hljs-comment">// 是否清除 last_run_metadata_path 的记录,如果为真那么每次都相当于从头开始查询所有的数据库记录</span>
  clean_run =&gt; <span class="hljs-string">"false"</span>

在sql语句这里设置select * from WHERE autoid > :sql_last_value;
注意:如果你的语句比较复杂,autoid > :sql_last_value一定要写在WHERE后面,然后接AND即可

3).elasticsearch存储容量不断上升

稍微观察下就会发现,即使没有新的数据写入到elasticsearch里面,但只要logstash定时每次运行,elasticsearch容量就不断上升

过一段时间看,占用空间增大,其实elasticsearch数据是一样的

原因:在elasticsearch/nodes/0/indices/jdbc/{0,1,2,3,4}/下有个translog,这个是elasticsearch的事务日志,类似mysql的binlog。elasticsearch为了数据安全,接收到数据后,先将数据写入内存和translog,然后再建立索引写入到磁盘,这样即使突然断电,重启后,还可以通过translog恢复,不过这里由于我们每次查询都有很多重复的数据,而这些重复的数据又没有写入到elasticsearch的索引中,所以就囤积了下来

解决:查询官网说会定期refresh,会自动清理掉老的日志,因此可不做处理

4).增量同步和mysql范围查询导致mysql数据库有修改时无法同步到以前的数据

增量同步解决了,mysql每次都小范围查询,解决了数据库压力的问题,不过却导致无法同步老数据的修改问题

解决:可根据业务状态来做,如果你数据库是修改频繁类型,那只能做全量更新了,但是高频率大范围扫描数据库来做的索引还不如不做索引了(因为建立索引也是有成本的),我们做索引主要是针对一些数据量大,不常修改,很消耗数据库性能的情况。我这里是数据修改较少,而且修改也一般是近期数据,因为同步时,我在mysql范围上面稍微调整一下

如:autoid > (:sql_last_value-100000),每次扫描上次扫描范围往之前再多10W行,这样扫描的数据量相对较小,也照顾到了可能会修改的数据类型

但是范围扫描还存在一个问题,就是过往的数据写入了elasticsearch之后,如果有修改,而又不在范围扫描以内,那么elasticsearch就不会同步到。因此,我们还可以定期做一次全量或者更大范围的同步,只需要修改范围值即可。具体的值当然可以根据业务来定

Leave a Comment.