Loading [MathJax]/jax/output/CommonHTML/config.js
首页
学习
活动
专区
圈层
工具
发布
首页
学习
活动
专区
圈层
工具
MCP广场
社区首页 >专栏 >怎样将 MySQL 数据表导入到 Elasticsearch

怎样将 MySQL 数据表导入到 Elasticsearch

作者头像
netkiller old
发布于 2018-03-05 10:36:00
发布于 2018-03-05 10:36:00
5.5K00
代码可运行
举报
文章被收录于专栏:NetkillerNetkiller
运行总次数:0
代码可运行

本文节选自《Netkiller Database 手札》

MySQL 导入 Elasticsearch 的方法有很多,通常是使用ETL工具,但我觉得太麻烦。于是想到 logstash 。

23.8. Migrating MySQL Data into Elasticsearch using logstash

23.8.1. 安装 logstash

安装 JDBC 驱动 和 Logstash

代码语言:javascript
代码运行次数:0
运行
AI代码解释
复制
curl -s https://raw.githubusercontent.com/oscm/shell/master/database/mysql/5.7/mysql-connector-java.sh	 | bash			
curl -s https://raw.githubusercontent.com/oscm/shell/master/log/kibana/logstash-5.x.sh | bash			

mysql 驱动文件位置在 /usr/share/java/mysql-connector-java.jar

23.8.2. 配置 logstash

创建配置文件 /etc/logstash/conf.d/jdbc-mysql.conf

代码语言:javascript
代码运行次数:0
运行
AI代码解释
复制
			mysql> desc article;
+-------------+--------------+------+-----+---------+-------+
| Field       | Type         | Null | Key | Default | Extra |
+-------------+--------------+------+-----+---------+-------+
| id          | int(11)      | NO   |     | 0       |       |
| title       | mediumtext   | NO   |     | NULL    |       |
| description | mediumtext   | YES  |     | NULL    |       |
| author      | varchar(100) | YES  |     | NULL    |       |
| source      | varchar(100) | YES  |     | NULL    |       |
| ctime       | datetime     | NO   |     | NULL    |       |
| content     | longtext     | YES  |     | NULL    |       |
+-------------+--------------+------+-----+---------+-------+
7 rows in set (0.00 sec)			
代码语言:javascript
代码运行次数:0
运行
AI代码解释
复制
			input {
  jdbc {
    jdbc_driver_library => "/usr/share/java/mysql-connector-java.jar"
    jdbc_driver_class => "com.mysql.jdbc.Driver"
    jdbc_connection_string => "jdbc:mysql://localhost:3306/cms"
    jdbc_user => "cms"
    jdbc_password => "password"
    schedule => "* * * * *"
    statement => "select * from article"
  }
}
output {
    elasticsearch {
    		hosts => "localhost:9200"
        index => "information"
        document_type => "article"
        document_id => "%{id}"
        
    }
}			

23.8.3. 启动 Logstash

代码语言:javascript
代码运行次数:0
运行
AI代码解释
复制
			root@netkiller /var/log/logstash % systemctl restart logstash

root@netkiller /var/log/logstash % systemctl status logstash
● logstash.service - logstash
   Loaded: loaded (/etc/systemd/system/logstash.service; enabled; vendor preset: disabled)
   Active: active (running) since Mon 2017-07-31 09:35:00 CST; 11s ago
 Main PID: 10434 (java)
   CGroup: /system.slice/logstash.service
           └─10434 /usr/bin/java -XX:+UseParNewGC -XX:+UseConcMarkSweepGC -XX:CMSInitiatingOccupancyFraction=75 -XX:+UseCMSInitiatingOccupancyOnly -XX:+DisableExplicitGC -Djava.awt.headless=true -Dfi...

Jul 31 09:35:00 VM_3_2_centos systemd[1]: Started logstash.
Jul 31 09:35:00 VM_3_2_centos systemd[1]: Starting logstash...
			
root@netkiller /var/log/logstash % cat logstash-plain.log 
[2017-07-31T09:35:28,169][INFO ][logstash.outputs.elasticsearch] Elasticsearch pool URLs updated {:changes=>{:removed=>[], :added=>[http://localhost:9200/]}}
[2017-07-31T09:35:28,172][INFO ][logstash.outputs.elasticsearch] Running health check to see if an Elasticsearch connection is working {:healthcheck_url=>http://localhost:9200/, :path=>"/"}
[2017-07-31T09:35:28,298][WARN ][logstash.outputs.elasticsearch] Restored connection to ES instance {:url=>#<Java::JavaNet::URI:0x453a18e9>}
[2017-07-31T09:35:28,299][INFO ][logstash.outputs.elasticsearch] Using mapping template from {:path=>nil}
[2017-07-31T09:35:28,337][INFO ][logstash.outputs.elasticsearch] Attempting to install template {:manage_template=>{"template"=>"logstash-*", "version"=>50001, "settings"=>{"index.refresh_interval"=>"5s"}, "mappings"=>{"_default_"=>{"_all"=>{"enabled"=>true, "norms"=>false}, "dynamic_templates"=>[{"message_field"=>{"path_match"=>"message", "match_mapping_type"=>"string", "mapping"=>{"type"=>"text", "norms"=>false}}}, {"string_fields"=>{"match"=>"*", "match_mapping_type"=>"string", "mapping"=>{"type"=>"text", "norms"=>false, "fields"=>{"keyword"=>{"type"=>"keyword", "ignore_above"=>256}}}}}], "properties"=>{"@timestamp"=>{"type"=>"date", "include_in_all"=>false}, "@version"=>{"type"=>"keyword", "include_in_all"=>false}, "geoip"=>{"dynamic"=>true, "properties"=>{"ip"=>{"type"=>"ip"}, "location"=>{"type"=>"geo_point"}, "latitude"=>{"type"=>"half_float"}, "longitude"=>{"type"=>"half_float"}}}}}}}}
[2017-07-31T09:35:28,344][INFO ][logstash.outputs.elasticsearch] Installing elasticsearch template to _template/logstash
[2017-07-31T09:35:28,465][INFO ][logstash.outputs.elasticsearch] New Elasticsearch output {:class=>"LogStash::Outputs::ElasticSearch", :hosts=>[#<Java::JavaNet::URI:0x66df34ae>]}
[2017-07-31T09:35:28,483][INFO ][logstash.pipeline        ] Starting pipeline {"id"=>"main", "pipeline.workers"=>8, "pipeline.batch.size"=>125, "pipeline.batch.delay"=>5, "pipeline.max_inflight"=>1000}
[2017-07-31T09:35:29,562][INFO ][logstash.pipeline        ] Pipeline main started
[2017-07-31T09:35:29,700][INFO ][logstash.agent           ] Successfully started Logstash API endpoint {:port=>9600}
[2017-07-31T09:36:01,019][INFO ][logstash.inputs.jdbc     ] (0.006000s) select * from article	

23.8.4. 验证

代码语言:javascript
代码运行次数:0
运行
AI代码解释
复制
			% curl -XGET 'http://localhost:9200/_all/_search?pretty'			

23.8.5. 配置模板

23.8.5.1. 全量导入

适合数据没有改变的归档数据或者只能增加没有修改的数据

代码语言:javascript
代码运行次数:0
运行
AI代码解释
复制
				input {
  jdbc {
    jdbc_driver_library => "/usr/share/java/mysql-connector-java.jar"
    jdbc_driver_class => "com.mysql.jdbc.Driver"
    jdbc_connection_string => "jdbc:mysql://localhost:3306/cms"
    jdbc_user => "cms"
    jdbc_password => "password"
    schedule => "* * * * *"
    statement => "select * from article"
  }
}
output {
    elasticsearch {
    		hosts => "localhost:9200"
        index => "information"
        document_type => "article"
        document_id => "%{id}"
        
    }
}				
23.8.5.2. 多表导入

多张数据表导入到 Elasticsearch

代码语言:javascript
代码运行次数:0
运行
AI代码解释
复制
				# multiple inputs on logstash jdbc

input {
  jdbc {
    jdbc_driver_library => "/usr/share/java/mysql-connector-java.jar"
    jdbc_driver_class => "com.mysql.jdbc.Driver"
    jdbc_connection_string => "jdbc:mysql://localhost:3306/cms"
    jdbc_user => "cms"
    jdbc_password => "password"
    schedule => "* * * * *"
    statement => "select * from article"
    type => "article"
  }
  jdbc {
    jdbc_driver_library => "/usr/share/java/mysql-connector-java.jar"
    jdbc_driver_class => "com.mysql.jdbc.Driver"
    jdbc_connection_string => "jdbc:mysql://localhost:3306/cms"
    jdbc_user => "cms"
    jdbc_password => "password"
    schedule => "* * * * *"
    statement => "select * from comment"
    type => "comment"
  } 
}
output {
    elasticsearch {
    		hosts => "localhost:9200"
        index => "information"
        document_type => "%{type}"
        document_id => "%{id}"
        
    }
}				

需要在每一个jdbc配置项中加入 type 配置,然后 elasticsearch 配置项中加入 document_type => "%{type}"

23.8.5.3. 通过 ID 主键字段增量复制数据
代码语言:javascript
代码运行次数:0
运行
AI代码解释
复制
				input {
  jdbc {
    statement => "SELECT id, mycolumn1, mycolumn2 FROM my_table WHERE id > :sql_last_value"
    use_column_value => true
    tracking_column => "id"
    tracking_column_type => "numeric"
    # ... other configuration bits
  }
}				

tracking_column_type => "numeric" 可以声明 id 字段的数据类型, 如果不指定将会默认为日期

代码语言:javascript
代码运行次数:0
运行
AI代码解释
复制
[2017-07-31T11:08:00,193][INFO ][logstash.inputs.jdbc     ] (0.020000s) select * from article where id > '2017-07-31 02:47:00'				

如果复制不对称可以加入 clean_run => true 配置项,清楚数据

23.8.5.4. 通过日期字段增量复制数据
代码语言:javascript
代码运行次数:0
运行
AI代码解释
复制
				input {
  jdbc {
    statement => "SELECT * FROM my_table WHERE create_date > :sql_last_value"
    use_column_value => true
    tracking_column => "create_date"
    # ... other configuration bits
  }
}				

如果复制不对称可以加入 clean_run => true 配置项,清楚数据

23.8.5.5. 指定SQL文件

statement_filepath 指定 SQL 文件,有时SQL太复杂写入 statement 配置项维护部方便,可以将 SQL 写入一个文本文件,然后使用 statement_filepath 配置项引用该文件。

代码语言:javascript
代码运行次数:0
运行
AI代码解释
复制
				input {
    jdbc {
        jdbc_driver_library => "/path/to/driver.jar"
        jdbc_driver_class => "org.postgresql.Driver"
        jdbc_url => "jdbc://postgresql"
        jdbc_user => "neo"
        jdbc_password => "password"
        statement_filepath => "query.sql"
    }
}				
23.8.5.6. 参数传递

将需要复制的条件参数写入 parameters 配置项

代码语言:javascript
代码运行次数:0
运行
AI代码解释
复制
				input {
  jdbc {
    jdbc_driver_library => "mysql-connector-java-5.1.36-bin.jar"
    jdbc_driver_class => "com.mysql.jdbc.Driver"
    jdbc_connection_string => "jdbc:mysql://localhost:3306/mydb"
    jdbc_user => "mysql"
    parameters => { "favorite_artist" => "Beethoven" }
    schedule => "* * * * *"
    statement => "SELECT * from songs where artist = :favorite_artist"
  }
}				
23.8.5.7. 控制返回JDBC数据量
代码语言:javascript
代码运行次数:0
运行
AI代码解释
复制
	jdbc_fetch_size => 1000  #jdbc获取数据的数量大小
	jdbc_page_size => 1000 #jdbc一页的大小,
	jdbc_paging_enabled => true  #和jdbc_page_size组合,将statement的查询分解成多个查询,相当于: SELECT * FROM table LIMIT 1000 OFFSET 4000 				

23.8.6. example

代码语言:javascript
代码运行次数:0
运行
AI代码解释
复制
			input {
  jdbc {
    jdbc_driver_library => "/usr/share/java/mysql-connector-java.jar"
    jdbc_driver_class => "com.mysql.jdbc.Driver"
    jdbc_connection_string => "jdbc:mysql://localhost:3306/cms"
    jdbc_user => "cms"
    jdbc_password => "password"
    schedule => "* * * * *"	#定时cron的表达式,这里是每分钟执行一次
    statement => "select id, title, description, author, source, ctime, content from article where id > :sql_last_value"
    use_column_value => true
    tracking_column => "id"
    tracking_column_type => "numeric" 
    record_last_run => true
    last_run_metadata_path => "/var/tmp/article.last"
  }
}
output {
    elasticsearch {
    		hosts => "localhost:9200"
        index => "information"
        document_type => "article"
        document_id => "%{id}"
        action => "update"  # 操作执行的动作,可选值有["index", "delete", "create", "update"]
        doc_as_upsert => true  #支持update模式
    }
}
本文参与 腾讯云自媒体同步曝光计划,分享自微信公众号。
原始发表:2017-07-31,如有侵权请联系 cloudcommunity@tencent.com 删除

本文分享自 Netkiller 微信公众号,前往查看

如有侵权,请联系 cloudcommunity@tencent.com 删除。

本文参与 腾讯云自媒体同步曝光计划  ,欢迎热爱写作的你一起参与!

评论
登录后参与评论
暂无评论
推荐阅读
编辑精选文章
换一批
利用Logstash插件进行Elasticsearch与Mysql的数据
Logstash与Elasticsearch的安装就不多说了,我之前有两篇文章写的比较详细了ElasticSearch + Logstash + Kibana 搭建笔记 和 Filebeat+Logstash+ElasticSearch+Kibana搭建Apache访问日志解析平台。
大江小浪
2018/07/24
1.2K0
ElasticSearch 使用 Logstash 从 MySQL 中同步数据
目的是希望将现有的数据导入到 ElasticSearch 中,研究了好几种,除了写代码的方式,最简便的就是使用 Logstash 来导入数据到 ElasticSearch 中了。
皇上得了花柳病
2020/05/06
3.9K0
Logstash-input-jdbc 同步 mysql 准实时数据至 ElasticSearch 搜索引擎
logstash-input-jdbc 插件将 Zabbix 数据库中 alerts 表告警数据推送至 ElasticSearch 搜索引擎。
Kevin song
2020/08/28
2.8K0
Logstash-input-jdbc 同步 mysql 准实时数据至 ElasticSearch 搜索引擎
Logstash-解决elasticsearch和Mysql数据库的同步问题
Logstash是一款轻量级的日志搜集处理框架,可以方便的把分散的、多样化的日志搜集起来,并进行自定义的处理,然后传输到指定的位置,比如某个服务器或者文件。
名字是乱打的
2022/05/13
8610
Logstash-解决elasticsearch和Mysql数据库的同步问题
MySQL数据以全量和增量方式,向ES搜索引擎同步流程
/usr/local/logstash/sync-config/cicadaes.conf
知了一笑
2019/09/09
1.2K0
MySQL数据以全量和增量方式,向ES搜索引擎同步流程
unbuntu下安装Elasticsrearch+logstash+elasticsearch-analysis-ik
1. 安装 elasticsearch 第一次使用的是 apt-get 的安装方式, 应该是软件源没设置为最新的, 结果安装的版本为1.7x的, 果断删除. 第二次直接将 elasticsearch 的 zip 包下载下来安装. wget https://artifacts.elastic.co/downloads/elasticsearch/elasticsearch-5.5.1.zip unzip elasticsearch-5.5.1.zip 配置. 将elasticsearch.yml 的 clus
zcqshine
2018/05/11
8500
Linux 安装 logstash 并同步 MySQL 数据库
前两篇文章分别讲了 Linux 和 Windows 环境安装 Elasticsearch,有兴趣可以点击以下链接查看: 《windows10 安装 ElasticSearch》 《Linux 下安装 Elasticsearch》
王图思睿
2021/06/15
3.7K0
Elasticsearch父子关系
Elasticsearch父子关系 5.x参考 官网join介绍 es6.x一对多方案参考
丁D
2022/08/12
5870
Logstash使用jdbc_input同步Mysql数据时遇到的空时间SQLException问题
今天在使用Logstash的jdbc_input插件同步Mysql数据时,本来应该能搜索出10条数据,结果在Elasticsearch中只看到了4条,终端中只给出了如下信息
大江小浪
2018/07/24
1.4K0
ElasticSearch 同步的方式
ElasticSearch 是一款强大的分布式搜索和分析引擎,支持多种方式同步数据和日志。下面介绍几种常见的同步方式:
疯狂的KK
2023/07/24
7290
ElasticSearch 同步的方式
ELK —— Logstash 将 MySQL 数据同步至 ElasticSearch
Author:Gorit Date:2021/4/7 Refer:各种同类文章参考融合 + 自己的思考总结 2021年发表博文: 16/50
Gorit
2021/12/08
1.5K0
ELK —— Logstash 将 MySQL 数据同步至 ElasticSearch
logstash使用template提前设置好maping同步mysql数据到Elasticsearch5.5.2
上篇blog说到采用logstash-input-jdbc将mysql数据同步到ES(http://www.cnblogs.com/jstarseven/p/7704893.html),但是这里有一个问题,即假如我不需要logstash自动对mysql数据提供的mapping模板怎么办,毕竟我的数据需要ik分词,同义词解析等。。。
大道七哥
2019/09/10
2.5K0
logstash使用template提前设置好maping同步mysql数据到Elasticsearch5.5.2
logstash sql 数据采集
Elasticsearch 6.3 发布SQL模块作为C-Pack的一部分使用 kabana官方工具查询 Dev Tools - console 查看 POST /_xpack/sql?form
以谁为师
2019/05/30
1.5K0
通过Logstash复制MySQL数据到ElasticSearch
-P: 随机端口映射,容器内部端口随机映射到主机的端口,格式为:主机(宿主)端口:容器端口
iiopsd
2022/12/23
8630
logstash同步数据到es
ES采用UTC 时区,比北京时间早8小时,所以ES读取数据时让最后更新时间加8小时
周杰伦本人
2022/10/25
5340
[Springboot实战系列]整合ElasticSearch实现数据模糊搜索(Logstash同步Mysql数据)
本文介绍了如何整合搜索引擎elasticsearch与springboot,对外提供数据查询接口。
蛮三刀酱
2019/03/26
6.2K0
[Springboot实战系列]整合ElasticSearch实现数据模糊搜索(Logstash同步Mysql数据)
logstash-input-jdbc
https://segmentfault.com/a/1190000011784259
丁D
2022/08/12
6220
elasticsearch5xx使用logstash同步mysql
下载地址:https://www.elastic.co/downloads/logstash
全科
2018/08/15
6240
【Elasticsearch专栏 10】深入探索:Elasticsearch如何进行数据导入和导出
在Elasticsearch中,数据导入和导出是常见的操作,通常涉及到将数据从外部数据源导入到Elasticsearch索引中,或者从Elasticsearch索引导出数据到外部数据源。Elasticsearch提供了多种方法来进行数据导入和导出,包括使用官方提供的工具、API以及第三方工具。以下将详细描述这些方法和相关的代码片段或命令。
夏之以寒
2024/03/04
3.4K0
mysql同步elasticsearch调研
目前项目采用的是更新数据后再更新elasticsearch,各种历史原因导致很多数据并不是同步的,业务互相紧耦合, 所以需要调研适合团队发展的 db同步es机制,从业务层面剔除这部分功能维护。 下面是本人在搭建、配置、调试过程中一些总结和踩完坑后整理的配置。
用户2825413
2019/07/15
1.1K0
mysql同步elasticsearch调研
推荐阅读
相关推荐
利用Logstash插件进行Elasticsearch与Mysql的数据
更多 >
领券
问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档