Contents

1. Full sync from MySQL to a non-partitioned Hive table
2. Incremental sync from MySQL to a non-partitioned Hive table
3. Full sync from MySQL to a partitioned Hive table
4. Full sync from Hive to MySQL
5. Sync from a Hive partition to MySQL
6. Full sync from PostgreSQL to a partitioned Hive table
7. Incremental sync from PostgreSQL to a partitioned Hive table
8. Full sync from MySQL to Doris
DataX is the open-source version of Alibaba Cloud DataWorks Data Integration, an offline data synchronization tool/platform widely used inside Alibaba Group. DataX implements efficient data synchronization between heterogeneous data sources, including MySQL, Oracle, OceanBase, SQL Server, PostgreSQL, HDFS, Hive, ADS, HBase, TableStore (OTS), MaxCompute (ODPS), Hologres, and DRDS.
As an offline synchronization framework, DataX is built on a Framework + plugin architecture: reading from and writing to data sources are abstracted into Reader and Writer plugins that slot into the common framework.
For example, suppose a user submits a DataX job configured with 20 concurrent channels to sync a MySQL database sharded into 100 tables into ODPS. DataX schedules it roughly as follows: the job is split into 100 tasks, one per table shard; given 20 channels and the default of 5 channels per task group, DataX allocates 4 task groups; each task group then runs 25 of the 100 tasks at a concurrency of 5.
| Type | Data source | Reader | Writer |
|---|---|---|---|
| RDBMS (relational databases) | MySQL | MysqlReader | MysqlWriter |
| | Oracle | OracleReader | OracleWriter |
| | OceanBase | oceanbasev10reader | oceanbasev10writer |
| | SQLServer | SqlServerReader | SqlServerWriter |
| | PostgreSQL | PostgresqlReader | PostgresqlWriter |
| | DRDS | DrdsReader | DRDSWriter |
| | Kudu | | kuduwriter |
| | ClickHouse | | clickwriter |
| | Generic RDBMS (any relational database) | RDBMSReader | RDBMSWriter |
| Alibaba Cloud data warehouse storage | ODPS | ODPSReader | ODPSWriter |
| | ADS | | ADSWriter |
| | OSS | OSSReader | OSSWriter |
| | OCS | | OCSWriter |
| NoSQL data stores | OTS | OTSReader / otsstreamreader | OTSWriter |
| | HBase 0.94 | Hbase094XReader | Hbase094XWriter |
| | HBase 1.1 | Hbase11XReader | Hbase11XWriter |
| | Phoenix 4.x | hbase11xsqlreader | HBase11xsqlwriter |
| | Phoenix 5.x | hbase20xsqlreader | HBase20xsqlwriter |
| | MongoDB | MongoDBReader | MongoDBWriter |
| | Hive | HdfsReader | HdfsWriter |
| | Cassandra | CassandraReader | CassandraWriter |
| Unstructured data storage | TxtFile | TxtFileReader | TxtFileWriter |
| | FTP | FtpReader | FtpWriter |
| | HDFS | HdfsReader | HdfsWriter |
| | Elasticsearch | | ElasticSearchWriter |
| Time-series databases | OpenTSDB | OpenTSDBReader | |
| | TSDB | TSDBReader | TSDBWriter |
| | TDengine | TDengineReader | TDengineWriter |
Data source reference guide: GitHub - alibaba/DataX (the open-source version of Alibaba Cloud DataWorks Data Integration).
1. Full sync from MySQL to a non-partitioned Hive table

{
"job": {
"content": [
{
"reader": {
"name": "mysqlreader",
"parameter": {
"connection": [
{
"jdbcUrl": ["jdbc:mysql://ip:port/db_name?useSSL=false"],
"querySql": ["select * from table_name"],
}
],
"username": "username",
"password": "password"
}
},
"writer": {
"name": "hdfswriter",
"parameter": {
"defaultFS": "hdfs://ip:port",
"fileType": "text",
"path": "/user/hive/warehouse/db_name.db/hive_table_name_da",
"fileName": "hive_table_name",
"column": [
{"name":"id","type":"int"},
{"name":"name","type":"string"}
],
"writeMode": "append",
"fieldDelimiter": "\t",
"encoding": "utf-8"
}
}
}],
"setting": {
"speed": {
"channel": "1"
},
"errorLimit": {
"record": 0,
"percentage": 0.02
}
}
}
}
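To run the job, save the JSON to a file and pass it to datax.py (the file name below is hypothetical). Note that hdfswriter requires the target directory to already exist, and with writeMode "append" the files from earlier runs are kept, so a true full refresh needs the old files cleared first:

$ hdfs dfs -mkdir -p /user/hive/warehouse/db_name.db/hive_table_name_da
$ hdfs dfs -rm -f /user/hive/warehouse/db_name.db/hive_table_name_da/hive_table_name*   # optional cleanup of previous runs
$ python datax.py mysql2hive_full.json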
2. Incremental sync from MySQL to a non-partitioned Hive table

{
"job": {
"content": [
{
"reader": {
"name": "mysqlreader",
"parameter": {
"connection": [
{
"jdbcUrl": ["jdbc:mysql://ip:port/db_name?useSSL=false"],
"querySql": ["select * from mysql_table_name where date(date_created)='${date_create}'"],
}
],
"username": "username",
"password": "password"
}
},
"writer": {
"name": "hdfswriter",
"parameter": {
"defaultFS": "hdfs://ip:port",
"fileType": "text",
"path": "/user/hive/warehouse/db_name.db/hive_table_name_da",
"fileName": "hive_table_name",
"column": [
{"name":"id","type":"int"},
{"name":"name","type":"string"}
],
"writeMode": "append",
"fieldDelimiter": "\t",
"encoding": "utf-8"
}
}
}],
"setting": {
"speed": {
"channel": "1"
},
"errorLimit": {
"record": 0,
"percentage": 0.02
}
}
}
}
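DataX does not fill in ${date_create} by itself; the value comes from a -D property passed through datax.py's -p option. A minimal invocation, assuming GNU date and a hypothetical file name:

$ dt=$(date -d "-1 day" +%F)   # e.g. 2022-09-16, syncing yesterday's records
$ python datax.py -p "-Ddate_create=${dt}" mysql2hive_incr.json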
3. Full sync from MySQL to a partitioned Hive table

{
"job": {
"content": [
{
"reader": {
"name": "mysqlreader",
"parameter": {
"connection": [
{
"jdbcUrl": ["jdbc:mysql://ip:port/db_name?useSSL=false"],
"querySql": ["select * from mysql_table_name"],
}
],
"username": "username",
"password": "password"
}
},
"writer": {
"name": "hdfswriter",
"parameter": {
"defaultFS": "hdfs://ip:port",
"fileType": "text",
"path": "/user/hive/warehouse/db_name.db/hive_table_name_ds/ds=2022-09-16",
"fileName": "hive_table_name",
"column": [
{"name":"id","type":"int"},
{"name":"name","type":"string"}
],
"writeMode": "append",
"fieldDelimiter": "\t",
"encoding": "utf-8"
}
}
}],
"setting": {
"speed": {
"channel": "1"
},
"errorLimit": {
"record": 0,
"percentage": 0.02
}
}
}
}
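The partition value is hardcoded here as ds=2022-09-16; in practice it is usually parameterized as ${ds}, as in the later examples. Either way, hdfswriter only writes files into the given directory: it neither creates the partition directory nor registers the partition in the Hive metastore, so both steps have to happen around the job. A sketch with hypothetical file and table names:

$ hdfs dfs -mkdir -p /user/hive/warehouse/db_name.db/hive_table_name_ds/ds=2022-09-16
$ python datax.py mysql2hive_partition.json
$ hive -e "ALTER TABLE db_name.hive_table_name_ds ADD IF NOT EXISTS PARTITION (ds='2022-09-16')"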
4. Full sync from Hive to MySQL

{
"job": {
"setting": {
"speed": {
"channel": 1
},
"errorLimit": {
"record": 0,
"percentage": 0.02
}
},
"content": [{
"reader": {
"name": "hdfsreader",
"parameter": {
"path":"/user/hive/warehouse/db_name.db/hive_table_name",
"defaultFS": "hdfs://ip:port",
"column": [
{
"index": 0,
"type": "long"
},
{
"index": 1,
"type": "string"
},
{
"index": 3,
"type": "long"
}
],
"fileType": "text",
"encoding": "UTF-8",
"fieldDelimiter": "\t"
}
},
"writer": {
"name": "mysqlwriter",
"parameter": {
"writeMode": "insert",
"username": "username",
"password": "password",
"column": [
"id",
"name",
"age"
],
"session": [
"set session sql_mode='ANSI'"
],
"preSql": [
"truncate table mysql_table_name"
],
"connection": [{
"jdbcUrl": "jdbc:mysql://ip:port/db_name?useUnicode=true&characterEncoding=utf8",
"table": [
"mysql_table_name"
]
}]
}
}
}]
}
}
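Two details in this job are easy to miss: hdfsreader selects HDFS columns by position, and index 2 is skipped (indices 0, 1, 3 map to id, name, age), so verify the index list against your actual Hive column order; and the preSql truncates the MySQL table before each run, which is what makes this a full refresh. A quick post-run sanity check, reusing the hosts and names from the example:

$ hive -e "SELECT COUNT(*) FROM db_name.hive_table_name"
$ mysql -h ip -P port -u username -p -e "SELECT COUNT(*) FROM db_name.mysql_table_name"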
5. Sync from a Hive partition to MySQL

{
"job": {
"setting": {
"speed": {
"channel": 1
},
"errorLimit": {
"record": 0,
"percentage": 0.02
}
},
"content": [{
"reader": {
"name": "hdfsreader",
"parameter": {
"path":"/user/hive/warehouse/db_name.db/hive_table_name/ds=${ds}",
"defaultFS": "hdfs://ip:port",
"column": [
{
"index": 0,
"type": "long"
},
{
"index": 1,
"type": "string"
},
{
"index": 3,
"type": "long"
}
],
"fileType": "text",
"encoding": "UTF-8",
"fieldDelimiter": "\t"
}
},
"writer": {
"name": "mysqlwriter",
"parameter": {
"writeMode": "insert",
"username": "username",
"password": "password",
"column": [
"id",
"name",
"age"
],
"session": [
"set session sql_mode='ANSI'"
],
"preSql": [
"truncate table mysql_table_name"
],
"connection": [{
"jdbcUrl": "jdbc:mysql://ip:port/db_name?useUnicode=true&characterEncoding=utf8",
"table": [
"mysql_table_name"
]
}]
}
}
}]
}
}
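The ${ds} in the reader path selects one Hive partition per run, while the preSql still truncates the entire MySQL table, so each run replaces the whole table with a single partition's data; drop the preSql if partitions should accumulate instead. Passing the partition value, with a hypothetical file name:

$ ds=$(date -d "-1 day" +%F)   # assuming GNU date
$ python datax.py -p "-Dds=${ds}" hive2mysql_partition.json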
6. Full sync from PostgreSQL to a partitioned Hive table

{
"job": {
"content": [
{
"reader": {
"name": "postgresqlreader",
"parameter": {
"connection": [
{
"jdbcUrl": ["jdbc:postgresql://ip:port/pg_db_name"],
"querySql": ["select * from pg_table_name"],
}
],
"username": "username",
"password": "password"
}
},
"writer": {
"name": "hdfswriter",
"parameter": {
"defaultFS": "hdfs://ip:port",
"fileType": "text",
"path": "/user/hive/warehouse/hive_db_name.db/hive_table_name/ds=${ds}",
"fileName": "hive_table_name",
"column": [
{"name":"id","type":"bigint"},
{"name":"name","type":"string"},
{"name":"date_create","type":"string"}
],
"writeMode": "append",
"fieldDelimiter": "\t",
"encoding": "utf-8"
}
}
}],
"setting": {
"speed": {
"channel": "1"
}
}
}
}
7. Incremental sync from PostgreSQL to a partitioned Hive table

{
"job": {
"content": [
{
"reader": {
"name": "postgresqlreader",
"parameter": {
"connection": [
{
"jdbcUrl": ["jdbc:postgresql://ip:[ort/pg_db_name"],
"querySql": ["select * from pg_table_name where date_create='${date_create}'"],
}
],
"username": "username",
"password": "password"
}
},
"writer": {
"name": "hdfswriter",
"parameter": {
"defaultFS": "hdfs://ip:port",
"fileType": "text",
"path": "/user/hive/warehouse/hive_db_name.db/hive_table_name/ds=${ds}",
"fileName": "hive_table_name",
"column": [
{"name":"id","type":"bigint"},
{"name":"name","type":"string"},
{"name":"date_create","type":"string"}
],
"writeMode": "append",
"fieldDelimiter": "\t",
"encoding": "utf-8"
}
}
}],
"setting": {
"speed": {
"channel": "1"
},
"errorLimit": {
"record": 0,
"percentage": 0.02
}
}
}
}
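This job takes two placeholders, one filtering the source rows and one naming the target partition; several -D properties can be passed in a single -p string. With a hypothetical file name:

$ dt=$(date -d "-1 day" +%F)
$ python datax.py -p "-Ddate_create=${dt} -Dds=${dt}" pg2hive_incr.json

As in example 3, remember to create the partition directory beforehand and register the partition with Hive afterwards.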
8. Full sync from MySQL to Doris

{
"core":{
"transport": {
"channel": {
"speed": {
"byte": 104857600,
"record": 200000
}
}
}
},
"job": {
"setting": {
"speed": {
"channel": 1
},
"errorLimit": {
"record": 0,
"percentage": 0
}
},
"content": [
{
"reader": {
"name": "mysqlreader",
"parameter": {
"username": "username",
"password": "password",
"connection": [
{
"jdbcUrl": [
"jdbc:mysql://ip:port/mysql_db_name"
],
"querySql": [
"select * from mysql_table_name;"
]
}
]
}
},
"writer": {
"name": "doriswriter",
"parameter": {
"username": "username",
"password": "password",
"database": "db_name",
"table": "table_name",
"column": [ "column1","column2","column3"],
"preSql": [],
"postSql": [],
"jdbcUrl": "jdbc:mysql://ip:port/",
"feLoadUrl": ["cdh3:port"],
"beLoadUrl": ["cdh1:port", "cdh2:port", "cdh3:port"],
"loadProps": {
},
"maxBatchRows" : 200000,
"maxBatchByteSize" : 104857600,
"lineDelimiter": "\n"
}
}
}
]
}
}
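A note on this writer, based on common Doris deployments (verify the ports against your own cluster): doriswriter loads data through Doris Stream Load over HTTP, so feLoadUrl and beLoadUrl should point at the FE and BE HTTP ports (typically 8030 and 8040), while jdbcUrl points at the FE's MySQL-protocol port (typically 9030), not the ports used elsewhere in this file. The core.transport.channel.speed block rate-limits each channel to roughly 100 MB (104857600 bytes) and 200,000 records, values chosen here to line up with the writer's maxBatchByteSize and maxBatchRows.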
Run the job:
$ python datax.py conf.json