That is, tables are split by day: one day's data is stored in one table, and the rowkey uses a random value with no particular pattern, hashed as evenly as possible.
Rowkey design must be worked out case by case: sometimes the key is stored in reverse order, and sometimes a random number is prepended to the rowkey (that number is usually computed modulo the number of HRegionServers).
Estimate the data volume from the business up front and create pre-split regions in advance, to avoid the performance cost of regions repeatedly splitting and compacting.
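As a minimal salting sketch (the helper method and its parameters are illustrative assumptions, not project code), prepending a salt derived from the number of HRegionServers could look like this:
// Prepend a salt computed as (hash of the original key) mod (number of HRegionServers),
// so that otherwise sequential keys are spread evenly across regions.
public static String saltRowKey(String originalKey, int regionServerCount) {
    int salt = Math.abs(originalKey.hashCode()) % regionServerCount;
    // e.g. "2_20151220_4B16B8BB-..." -- the numeric prefix decides which region the row lands in
    return salt + "_" + originalKey;
}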
Processing flow: read the data from HBase -> InputFormat -> map -> shuffle -> reduce -> OutputFormat -> MySQL
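A minimal driver sketch for such a pipeline, assuming hypothetical StatsMapper/StatsReducer classes and MySQL database/table/column names that are not part of the project code (DBConfiguration and DBOutputFormat come from org.apache.hadoop.mapreduce.lib.db; the reduce output key would have to implement DBWritable, since DBOutputFormat writes the key to MySQL and ignores the value):
// Sketch: scan an HBase table with TableInputFormat and write aggregated rows to MySQL.
Configuration conf = HBaseConfiguration.create();
DBConfiguration.configureDB(conf, "com.mysql.jdbc.Driver",
        "jdbc:mysql://hadoop102:3306/report", "root", "123456");
Job job = Job.getInstance(conf, "hbase-to-mysql");
TableMapReduceUtil.initTableMapperJob("event_logs20151220", new Scan(),
        StatsMapper.class, Text.class, LongWritable.class, job);
job.setReducerClass(StatsReducer.class);
job.setOutputFormatClass(DBOutputFormat.class);
DBOutputFormat.setOutput(job, "stats_result", "dimension", "pv_count");
System.exit(job.waitForCompletion(true) ? 0 : 1);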
In a multidimensional-analysis business intelligence solution, the common models can be divided, according to the relationship between the fact table and the dimension tables, into the star schema and the snowflake schema. When designing the logical data model, you should already decide whether the data will be organized as a star schema or a snowflake schema.
The snowflake schema removes data redundancy.
When loading a data set, the ETL for a snowflake schema is more complex to design, and because of the dependencies between its sub-dimension tables it cannot be fully parallelized.
The star schema loads the dimension tables without adding dependent tables between dimensions, so its ETL is relatively simple and can be highly parallelized.
Code structure diagram
Partial sample code follows:
pom.xml
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>
<groupId>com.z</groupId>
<artifactId>transformer</artifactId>
<version>0.0.1-SNAPSHOT</version>
<name>transformer</name>
<url>http://maven.apache.org</url>
<properties>
<project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
</properties>
<dependencies>
<dependency>
<groupId>junit</groupId>
<artifactId>junit</artifactId>
<version>3.8.1</version>
<scope>test</scope>
</dependency>
<!-- https://mvnrepository.com/artifact/org.apache.hadoop/hadoop-client -->
<dependency>
<groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-client</artifactId>
<version>2.7.2</version>
</dependency>
<!-- https://mvnrepository.com/artifact/org.apache.hadoop/hadoop-yarn-server-resourcemanager -->
<dependency>
<groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-yarn-server-resourcemanager</artifactId>
<version>2.7.2</version>
</dependency>
<!-- https://mvnrepository.com/artifact/org.apache.hadoop/hadoop-client -->
<dependency>
<groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-client</artifactId>
<version>2.7.2</version>
</dependency>
<!-- https://mvnrepository.com/artifact/org.apache.hbase/hbase-client -->
<dependency>
<groupId>org.apache.hbase</groupId>
<artifactId>hbase-client</artifactId>
<version>1.3.1</version>
</dependency>
<!-- https://mvnrepository.com/artifact/org.apache.hbase/hbase-server -->
<dependency>
<groupId>org.apache.hbase</groupId>
<artifactId>hbase-server</artifactId>
<version>1.3.1</version>
</dependency>
<!-- https://mvnrepository.com/artifact/org.apache.hive/hive-exec -->
<dependency>
<groupId>org.apache.hive</groupId>
<artifactId>hive-exec</artifactId>
<version>1.2.1</version>
</dependency>
<!-- mysql start -->
<dependency>
<groupId>mysql</groupId>
<artifactId>mysql-connector-java</artifactId>
<version>5.1.27</version>
</dependency>
<!-- mysql end -->
<!-- user-agent (browser) parsing -->
<dependency>
<groupId>cz.mallat.uasparser</groupId>
<artifactId>uasparser</artifactId>
<version>0.6.1</version>
</dependency>
<!-- JSON library -->
<dependency>
<groupId>com.google.code.gson</groupId>
<artifactId>gson</artifactId>
<version>2.6.2</version>
</dependency>
</dependencies>
<profiles>
<profile>
<!-- unique id; represents the local profile -->
<id>local</id>
<activation>
<!-- default profile for the Maven build; only one profile may set this to true -->
<activeByDefault>true</activeByDefault>
</activation>
<build>
<!-- plugin configuration -->
<plugins>
<plugin>
<!-- compile and package the Java sources under the specified directory -->
<groupId>org.codehaus.mojo</groupId>
<artifactId>build-helper-maven-plugin</artifactId>
<version>1.4</version>
<executions>
<execution>
<id>add-source</id>
<phase>generate-sources</phase>
<goals>
<goal>add-source</goal>
</goals>
<configuration>
<sources>
<source>${basedir}/src/main/java</source>
</sources>
</configuration>
</execution>
</executions>
</plugin>
</plugins>
</build>
</profile>
<profile>
<!-- builds the final jar file -->
<id>dev</id>
<build>
<plugins>
<plugin>
<groupId>org.codehaus.mojo</groupId>
<artifactId>build-helper-maven-plugin</artifactId>
<version>1.4</version>
<executions>
<execution>
<id>add-source</id>
<phase>generate-sources</phase>
<goals>
<goal>add-source</goal>
</goals>
<configuration>
<sources>
<source>${basedir}/src/main/java</source>
</sources>
</configuration>
</execution>
</executions>
</plugin>
<plugin>
<!-- bundle the third-party dependency jars into the final jar -->
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-shade-plugin</artifactId>
<version>2.1</version>
<executions>
<execution>
<phase>package</phase>
<goals>
<goal>shade</goal>
</goals>
<configuration>
<artifactSet>
<includes>
<include>cz.mallat.uasparser:uasparser</include>
<include>net.sourceforge.jregex:jregex</include>
<include>mysql:mysql-connector-java</include>
</includes>
</artifactSet>
</configuration>
</execution>
</executions>
</plugin>
</plugins>
</build>
</profile>
</profiles>
<build>
<testSourceDirectory>src/test/java</testSourceDirectory>
<plugins>
<plugin>
<artifactId>maven-compiler-plugin</artifactId>
<version>3.3</version>
<configuration>
<source>1.7</source>
<target>1.7</target>
</configuration>
</plugin>
</plugins>
<pluginManagement>
<plugins>
<!--This plugin's configuration is used to store Eclipse m2e settings
only. It has no influence on the Maven build itself. -->
<plugin>
<groupId>org.eclipse.m2e</groupId>
<artifactId>lifecycle-mapping</artifactId>
<version>1.0.0</version>
<configuration>
<lifecycleMappingMetadata>
<pluginExecutions>
<pluginExecution>
<pluginExecutionFilter>
<groupId>org.codehaus.mojo</groupId>
<artifactId>
build-helper-maven-plugin
</artifactId>
<versionRange>[1.4,)</versionRange>
<goals>
<goal>add-source</goal>
</goals>
</pluginExecutionFilter>
<action>
<ignore></ignore>
</action>
</pluginExecution>
</pluginExecutions>
</lifecycleMappingMetadata>
</configuration>
</plugin>
</plugins>
</pluginManagement>
</build>
</project>
Files under the resources directory
core-site.xml
<?xml version="1.0" encoding="UTF-8"?>
<?xml-stylesheet type="text/xsl" href="configuration.xsl"?>
<!--
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License. See accompanying LICENSE file.
-->
<!-- Put site-specific property overrides in this file. -->
<configuration>
<!-- address of the HDFS NameNode -->
<property>
<name>fs.defaultFS</name>
<value>hdfs://hadoop102:9000</value>
</property>
<!-- storage directory for files generated by Hadoop at runtime -->
<property>
<name>hadoop.tmp.dir</name>
<value>/opt/module/hadoop-2.7.2/data/tmp</value>
</property>
<property>
<name>hadoop.proxyuser.admin.hosts</name>
<value>*</value>
</property>
<property>
<name>hadoop.proxyuser.admin.groups</name>
<value>*</value>
</property>
<property>
<name>hadoop.proxyuser.httpfs.hosts</name>
<value>*</value>
</property>
<property>
<name>hadoop.proxyuser.httpfs.groups</name>
<value>*</value>
</property>
<!-- set the trash retention interval to 1 minute
<property>
<name>fs.trash.interval</name>
<value>1</value>
</property>
-->
<!-- change the user name for accessing the trash to atguigu
<property>
<name>hadoop.http.staticuser.user</name>
<value>atguigu</value>
</property>
-->
</configuration>
hbase-site.xml
<?xml version="1.0"?>
<?xml-stylesheet type="text/xsl" href="configuration.xsl"?>
<!--
/**
*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
-->
<configuration>
<property>
<name>hbase.rootdir</name>
<value>hdfs://hadoop102:9000/hbase</value>
</property>
<property>
<name>hbase.cluster.distributed</name>
<value>true</value>
</property>
<!-- changed after 0.98: earlier versions had no .port property; the default port is 16000 -->
<property>
<name>hbase.master.port</name>
<value>16000</value>
</property>
<property>
<name>hbase.zookeeper.quorum</name>
<value>hadoop102:2181,hadoop103:2181,hadoop104:2181</value>
</property>
<property>
<name>hbase.zookeeper.property.dataDir</name>
<value>/opt/module/zookeeper-3.4.10/zkData</value>
</property>
<property>
<name>hbase.coprocessor.region.classes</name>
<value>com.china.hbase.CalleeWriteObserver</value>
</property>
<property>
<name>zookeeper.session.timeout</name>
<value>90000</value>
</property>
</configuration>
hdfs-site.xml
<?xml version="1.0" encoding="UTF-8"?>
<?xml-stylesheet type="text/xsl" href="configuration.xsl"?>
<!--
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License. See accompanying LICENSE file.
-->
<!-- Put site-specific property overrides in this file. -->
<configuration>
<!-- number of HDFS replicas; the default is 3 -->
<property>
<name>dfs.replication</name>
<value>1</value>
</property>
<!-- host of the Hadoop Secondary NameNode -->
<property>
<name>dfs.namenode.secondary.http-address</name>
<value>hadoop104:50090</value>
</property>
<!-- disable permission checking -->
<property>
<name>dfs.permissions.enabled</name>
<value>false</value>
</property>
<property>
<name>dfs.webhdfs.enabled</name>
<value>true</value>
</property>
<!-- The NameNode local directory can be configured as multiple directories; each directory holds the same content, which improves reliability.
<property>
<name>dfs.namenode.name.dir</name>
<value>file:///${hadoop.tmp.dir}/dfs/name1,file:///${hadoop.tmp.dir}/dfs/name2</value>
</property>
-->
<!-- The DataNode can also be configured with multiple directories; each directory stores different data, i.e. the data are not replicas.
<property>
<name>dfs.datanode.data.dir</name>
<value>file:///${hadoop.tmp.dir}/dfs/data1,file:///${hadoop.tmp.dir}/dfs/data2</value>
</property>
-->
<!-- whitelist (allowed hosts)
<property>
<name>dfs.hosts</name>
<value>/opt/module/hadoop-2.7.2/etc/hadoop/dfs.hosts</value>
</property>
-->
<!-- blacklist (hosts to decommission)
<property>
<name>dfs.hosts.exclude</name>
<value>/opt/module/hadoop-2.7.2/etc/hadoop/dfs.hosts.exclude</value>
</property>
-->
</configuration>
log4j.properties
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
# Define some default values that can be overridden by system properties
hadoop.root.logger=INFO,console
hadoop.log.dir=.
hadoop.log.file=hadoop.log
# Define the root logger to the system property "hadoop.root.logger".
log4j.rootLogger=${hadoop.root.logger}, EventCounter
# Logging Threshold
log4j.threshold=ALL
# Null Appender
log4j.appender.NullAppender=org.apache.log4j.varia.NullAppender
#
# Rolling File Appender - cap space usage at 5gb.
#
hadoop.log.maxfilesize=256MB
hadoop.log.maxbackupindex=20
log4j.appender.RFA=org.apache.log4j.RollingFileAppender
log4j.appender.RFA.File=${hadoop.log.dir}/${hadoop.log.file}
log4j.appender.RFA.MaxFileSize=${hadoop.log.maxfilesize}
log4j.appender.RFA.MaxBackupIndex=${hadoop.log.maxbackupindex}
log4j.appender.RFA.layout=org.apache.log4j.PatternLayout
# Pattern format: Date LogLevel LoggerName LogMessage
log4j.appender.RFA.layout.ConversionPattern=%d{ISO8601} %p %c: %m%n
# Debugging Pattern format
#log4j.appender.RFA.layout.ConversionPattern=%d{ISO8601} %-5p %c{2} (%F:%M(%L)) - %m%n
#
# Daily Rolling File Appender
#
log4j.appender.DRFA=org.apache.log4j.DailyRollingFileAppender
log4j.appender.DRFA.File=${hadoop.log.dir}/${hadoop.log.file}
# Rollover at midnight
log4j.appender.DRFA.DatePattern=.yyyy-MM-dd
# 30-day backup
#log4j.appender.DRFA.MaxBackupIndex=30
log4j.appender.DRFA.layout=org.apache.log4j.PatternLayout
# Pattern format: Date LogLevel LoggerName LogMessage
log4j.appender.DRFA.layout.ConversionPattern=%d{ISO8601} %p %c: %m%n
# Debugging Pattern format
#log4j.appender.DRFA.layout.ConversionPattern=%d{ISO8601} %-5p %c{2} (%F:%M(%L)) - %m%n
#
# console
# Add "console" to rootlogger above if you want to use this
#
log4j.appender.console=org.apache.log4j.ConsoleAppender
log4j.appender.console.target=System.err
log4j.appender.console.layout=org.apache.log4j.PatternLayout
log4j.appender.console.layout.ConversionPattern=%d{yy/MM/dd HH:mm:ss} %p %c{2}: %m%n
#
# TaskLog Appender
#
#Default values
hadoop.tasklog.taskid=null
hadoop.tasklog.iscleanup=false
hadoop.tasklog.noKeepSplits=4
hadoop.tasklog.totalLogFileSize=100
hadoop.tasklog.purgeLogSplits=true
hadoop.tasklog.logsRetainHours=12
log4j.appender.TLA=org.apache.hadoop.mapred.TaskLogAppender
log4j.appender.TLA.taskId=${hadoop.tasklog.taskid}
log4j.appender.TLA.isCleanup=${hadoop.tasklog.iscleanup}
log4j.appender.TLA.totalLogFileSize=${hadoop.tasklog.totalLogFileSize}
log4j.appender.TLA.layout=org.apache.log4j.PatternLayout
log4j.appender.TLA.layout.ConversionPattern=%d{ISO8601} %p %c: %m%n
#
# HDFS block state change log from block manager
#
# Uncomment the following to suppress normal block state change
# messages from BlockManager in NameNode.
#log4j.logger.BlockStateChange=WARN
#
#Security appender
#
hadoop.security.logger=INFO,NullAppender
hadoop.security.log.maxfilesize=256MB
hadoop.security.log.maxbackupindex=20
log4j.category.SecurityLogger=${hadoop.security.logger}
hadoop.security.log.file=SecurityAuth-${user.name}.audit
log4j.appender.RFAS=org.apache.log4j.RollingFileAppender
log4j.appender.RFAS.File=${hadoop.log.dir}/${hadoop.security.log.file}
log4j.appender.RFAS.layout=org.apache.log4j.PatternLayout
log4j.appender.RFAS.layout.ConversionPattern=%d{ISO8601} %p %c: %m%n
log4j.appender.RFAS.MaxFileSize=${hadoop.security.log.maxfilesize}
log4j.appender.RFAS.MaxBackupIndex=${hadoop.security.log.maxbackupindex}
#
# Daily Rolling Security appender
#
log4j.appender.DRFAS=org.apache.log4j.DailyRollingFileAppender
log4j.appender.DRFAS.File=${hadoop.log.dir}/${hadoop.security.log.file}
log4j.appender.DRFAS.layout=org.apache.log4j.PatternLayout
log4j.appender.DRFAS.layout.ConversionPattern=%d{ISO8601} %p %c: %m%n
log4j.appender.DRFAS.DatePattern=.yyyy-MM-dd
#
# hadoop configuration logging
#
# Uncomment the following line to turn off configuration deprecation warnings.
# log4j.logger.org.apache.hadoop.conf.Configuration.deprecation=WARN
#
# hdfs audit logging
#
hdfs.audit.logger=INFO,NullAppender
hdfs.audit.log.maxfilesize=256MB
hdfs.audit.log.maxbackupindex=20
log4j.logger.org.apache.hadoop.hdfs.server.namenode.FSNamesystem.audit=${hdfs.audit.logger}
log4j.additivity.org.apache.hadoop.hdfs.server.namenode.FSNamesystem.audit=false
log4j.appender.RFAAUDIT=org.apache.log4j.RollingFileAppender
log4j.appender.RFAAUDIT.File=${hadoop.log.dir}/hdfs-audit.log
log4j.appender.RFAAUDIT.layout=org.apache.log4j.PatternLayout
log4j.appender.RFAAUDIT.layout.ConversionPattern=%d{ISO8601} %p %c{2}: %m%n
log4j.appender.RFAAUDIT.MaxFileSize=${hdfs.audit.log.maxfilesize}
log4j.appender.RFAAUDIT.MaxBackupIndex=${hdfs.audit.log.maxbackupindex}
#
# mapred audit logging
#
mapred.audit.logger=INFO,NullAppender
mapred.audit.log.maxfilesize=256MB
mapred.audit.log.maxbackupindex=20
log4j.logger.org.apache.hadoop.mapred.AuditLogger=${mapred.audit.logger}
log4j.additivity.org.apache.hadoop.mapred.AuditLogger=false
log4j.appender.MRAUDIT=org.apache.log4j.RollingFileAppender
log4j.appender.MRAUDIT.File=${hadoop.log.dir}/mapred-audit.log
log4j.appender.MRAUDIT.layout=org.apache.log4j.PatternLayout
log4j.appender.MRAUDIT.layout.ConversionPattern=%d{ISO8601} %p %c{2}: %m%n
log4j.appender.MRAUDIT.MaxFileSize=${mapred.audit.log.maxfilesize}
log4j.appender.MRAUDIT.MaxBackupIndex=${mapred.audit.log.maxbackupindex}
# Custom Logging levels
#log4j.logger.org.apache.hadoop.mapred.JobTracker=DEBUG
#log4j.logger.org.apache.hadoop.mapred.TaskTracker=DEBUG
#log4j.logger.org.apache.hadoop.hdfs.server.namenode.FSNamesystem.audit=DEBUG
# Jets3t library
log4j.logger.org.jets3t.service.impl.rest.httpclient.RestS3Service=ERROR
# AWS SDK & S3A FileSystem
log4j.logger.com.amazonaws=ERROR
log4j.logger.com.amazonaws.http.AmazonHttpClient=ERROR
log4j.logger.org.apache.hadoop.fs.s3a.S3AFileSystem=WARN
#
# Event Counter Appender
# Sends counts of logging messages at different severity levels to Hadoop Metrics.
#
log4j.appender.EventCounter=org.apache.hadoop.log.metrics.EventCounter
#
# Job Summary Appender
#
# Use following logger to send summary to separate file defined by
# hadoop.mapreduce.jobsummary.log.file :
# hadoop.mapreduce.jobsummary.logger=INFO,JSA
#
hadoop.mapreduce.jobsummary.logger=${hadoop.root.logger}
hadoop.mapreduce.jobsummary.log.file=hadoop-mapreduce.jobsummary.log
hadoop.mapreduce.jobsummary.log.maxfilesize=256MB
hadoop.mapreduce.jobsummary.log.maxbackupindex=20
log4j.appender.JSA=org.apache.log4j.RollingFileAppender
log4j.appender.JSA.File=${hadoop.log.dir}/${hadoop.mapreduce.jobsummary.log.file}
log4j.appender.JSA.MaxFileSize=${hadoop.mapreduce.jobsummary.log.maxfilesize}
log4j.appender.JSA.MaxBackupIndex=${hadoop.mapreduce.jobsummary.log.maxbackupindex}
log4j.appender.JSA.layout=org.apache.log4j.PatternLayout
log4j.appender.JSA.layout.ConversionPattern=%d{yy/MM/dd HH:mm:ss} %p %c{2}: %m%n
log4j.logger.org.apache.hadoop.mapred.JobInProgress$JobSummary=${hadoop.mapreduce.jobsummary.logger}
log4j.additivity.org.apache.hadoop.mapred.JobInProgress$JobSummary=false
#
# Yarn ResourceManager Application Summary Log
#
# Set the ResourceManager summary log filename
yarn.server.resourcemanager.appsummary.log.file=rm-appsummary.log
# Set the ResourceManager summary log level and appender
yarn.server.resourcemanager.appsummary.logger=${hadoop.root.logger}
#yarn.server.resourcemanager.appsummary.logger=INFO,RMSUMMARY
# To enable AppSummaryLogging for the RM,
# set yarn.server.resourcemanager.appsummary.logger to
# <LEVEL>,RMSUMMARY in hadoop-env.sh
# Appender for ResourceManager Application Summary Log
# Requires the following properties to be set
# - hadoop.log.dir (Hadoop Log directory)
# - yarn.server.resourcemanager.appsummary.log.file (resource manager app summary log filename)
# - yarn.server.resourcemanager.appsummary.logger (resource manager app summary log level and appender)
log4j.logger.org.apache.hadoop.yarn.server.resourcemanager.RMAppManager$ApplicationSummary=${yarn.server.resourcemanager.appsummary.logger}
log4j.additivity.org.apache.hadoop.yarn.server.resourcemanager.RMAppManager$ApplicationSummary=false
log4j.appender.RMSUMMARY=org.apache.log4j.RollingFileAppender
log4j.appender.RMSUMMARY.File=${hadoop.log.dir}/${yarn.server.resourcemanager.appsummary.log.file}
log4j.appender.RMSUMMARY.MaxFileSize=256MB
log4j.appender.RMSUMMARY.MaxBackupIndex=20
log4j.appender.RMSUMMARY.layout=org.apache.log4j.PatternLayout
log4j.appender.RMSUMMARY.layout.ConversionPattern=%d{ISO8601} %p %c{2}: %m%n
# HS audit log configs
#mapreduce.hs.audit.logger=INFO,HSAUDIT
#log4j.logger.org.apache.hadoop.mapreduce.v2.hs.HSAuditLogger=${mapreduce.hs.audit.logger}
#log4j.additivity.org.apache.hadoop.mapreduce.v2.hs.HSAuditLogger=false
#log4j.appender.HSAUDIT=org.apache.log4j.DailyRollingFileAppender
#log4j.appender.HSAUDIT.File=${hadoop.log.dir}/hs-audit.log
#log4j.appender.HSAUDIT.layout=org.apache.log4j.PatternLayout
#log4j.appender.HSAUDIT.layout.ConversionPattern=%d{ISO8601} %p %c{2}: %m%n
#log4j.appender.HSAUDIT.DatePattern=.yyyy-MM-dd
# Http Server Request Logs
#log4j.logger.http.requests.namenode=INFO,namenoderequestlog
#log4j.appender.namenoderequestlog=org.apache.hadoop.http.HttpRequestLogAppender
#log4j.appender.namenoderequestlog.Filename=${hadoop.log.dir}/jetty-namenode-yyyy_mm_dd.log
#log4j.appender.namenoderequestlog.RetainDays=3
#log4j.logger.http.requests.datanode=INFO,datanoderequestlog
#log4j.appender.datanoderequestlog=org.apache.hadoop.http.HttpRequestLogAppender
#log4j.appender.datanoderequestlog.Filename=${hadoop.log.dir}/jetty-datanode-yyyy_mm_dd.log
#log4j.appender.datanoderequestlog.RetainDays=3
#log4j.logger.http.requests.resourcemanager=INFO,resourcemanagerrequestlog
#log4j.appender.resourcemanagerrequestlog=org.apache.hadoop.http.HttpRequestLogAppender
#log4j.appender.resourcemanagerrequestlog.Filename=${hadoop.log.dir}/jetty-resourcemanager-yyyy_mm_dd.log
#log4j.appender.resourcemanagerrequestlog.RetainDays=3
#log4j.logger.http.requests.jobhistory=INFO,jobhistoryrequestlog
#log4j.appender.jobhistoryrequestlog=org.apache.hadoop.http.HttpRequestLogAppender
#log4j.appender.jobhistoryrequestlog.Filename=${hadoop.log.dir}/jetty-jobhistory-yyyy_mm_dd.log
#log4j.appender.jobhistoryrequestlog.RetainDays=3
#log4j.logger.http.requests.nodemanager=INFO,nodemanagerrequestlog
#log4j.appender.nodemanagerrequestlog=org.apache.hadoop.http.HttpRequestLogAppender
#log4j.appender.nodemanagerrequestlog.Filename=${hadoop.log.dir}/jetty-nodemanager-yyyy_mm_dd.log
#log4j.appender.nodemanagerrequestlog.RetainDays=3
Log parsing
The logs are stored in HDFS, one log record per line. Parse the concrete key-value pairs out of each user action, then URL-decode them.
IP address parsing/completion
Browser (user-agent) information parsing
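For reference, a minimal sketch of what a user-agent parsing helper built on the cz.mallat.uasparser dependency from the pom could look like (the project's actual UserAgentUtil is used but not shown later; the definition-file path and initialization below are assumptions):
import cz.mallat.uasparser.UASparser;
import cz.mallat.uasparser.UserAgentInfo;
public class UserAgentUtil {
    private static UASparser parser;
    static {
        try {
            // load a local uas.ini browser-definition file (path is an assumption)
            parser = new UASparser("src/main/resources/uas.ini");
        } catch (Exception e) {
            throw new RuntimeException("failed to initialize UASparser", e);
        }
    }
    // returns null when the user agent is blank or cannot be parsed
    public static UserAgentInfo analyticUserAgent(String userAgent) {
        if (userAgent == null || userAgent.trim().isEmpty()) {
            return null;
        }
        try {
            return parser.parse(userAgent);
        } catch (Exception e) {
            return null;
        }
    }
}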
HBase rowkey design
Rules to note: keep the rowkey as short as possible (less memory), and distribute it as evenly as possible (i.e., hash it).
HBase table creation
Create the tables with the Java API.
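A standalone sketch of creating such a table with the Admin API (the project's actual version, including the column-family constant it uses, appears later in AnalysisDataRunner; the table name, the family name "info" and the split keys here are only illustrative):
Configuration conf = HBaseConfiguration.create();
try (Connection conn = ConnectionFactory.createConnection(conf);
        Admin admin = conn.getAdmin()) {
    TableName tn = TableName.valueOf("event_logs20151220");
    HTableDescriptor htd = new HTableDescriptor(tn);
    htd.addFamily(new HColumnDescriptor("info"));
    if (!admin.tableExists(tn)) {
        // pre-split into a few regions up front to avoid early hot-spotting
        byte[][] splits = { Bytes.toBytes("1"), Bytes.toBytes("2"), Bytes.toBytes("3") };
        admin.createTable(htd, splits);
    }
}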
Key class:
LoggerUtil.java
Sample code follows:
package com.z.transformer.util;
import java.net.URLDecoder;
import java.util.HashMap;
import java.util.Map;
import org.apache.commons.lang.StringUtils;
import org.apache.log4j.Logger;
import com.z.transformer.common.EventLogConstants;
import com.z.transformer.util.IPSeekerExt.RegionInfo;
import cz.mallat.uasparser.UserAgentInfo;
public class LoggerUtil {
// 日志输出提示
private static final Logger logger = Logger.getLogger(LoggerUtil.class);
/**
* 解析给定的日志行,如果解析成功返回一个有值的 map 集合,如果解析失败,返回一个 empty 集合
*
* @param logText
* @return
*/
public static Map<String, String> handleLogText(String logText) {
Map<String, String> result = new HashMap<String, String>();
// 1、开始解析
// hadoop 集群中默认只有 org.apache.commons.lang.StringUtils 所在的 jar 包,如果使用其他
// StringUtils,hadoop 集群中需要导入该 StringUtils 依赖的 jar 包方可使用
if (StringUtils.isNotBlank(logText)) {
// 日志行非空,可以进行解析
String[] splits = logText.trim().split(EventLogConstants.LOG_SEPARTIOR); // 日志分隔符
// ^A
// 192.168.25.102^A1555318954.798^A/what.png?u_nu=1&u_sd=6D4F89C0-E17B-45D0-BFE0-059644C1878D&c_time=......
if (splits.length == 3) {
// 日志格式是正确的,进行解析
String ip = splits[0].trim();
// 将 ip 地址封装进 Map 集合中
result.put(EventLogConstants.LOG_COLUMN_NAME_IP, ip);
long serverTime = TimeUtil.parseNginxServerTime2Long(splits[1].trim());
if (serverTime != -1L) {
// 表示服务器时间解析正确,而且 serverTime 就是对于的毫秒级的时间戳
// 将 serverTime 封装进 Map 集合中
result.put(EventLogConstants.LOG_COLUMN_NAME_SERVER_TIME, String.valueOf(serverTime));
}
// 获取请求体
String requestBody = splits[2].trim();
int index = requestBody.indexOf("?"); // ? 符号所在的索引位置
if (index >= 0 && index != requestBody.length() - 1) {
// 在请求参数中存在 ?,而且 ? 不是最后一个字符的情况,则截取?后面的内容
requestBody = requestBody.substring(index + 1);
} else {
requestBody = null;
}
if (StringUtils.isNotBlank(requestBody)) {
// 非空,开始处理请求参数
handleRequestBody(result, requestBody);
// 开始补全 ip 地址
RegionInfo info = IPSeekerExt.getInstance().analysisIp(result.get(EventLogConstants.LOG_COLUMN_NAME_IP)); // 用户ip地址
if (info != null) {
result.put(EventLogConstants.LOG_COLUMN_NAME_COUNTRY, info.getCountry()); // 国家
result.put(EventLogConstants.LOG_COLUMN_NAME_PROVINCE, info.getProvince()); // 省份
result.put(EventLogConstants.LOG_COLUMN_NAME_CITY, info.getCity()); // 城市
}
// 开始补全浏览器信息
UserAgentInfo uaInfo = UserAgentUtil.analyticUserAgent(result.get(EventLogConstants.LOG_COLUMN_NAME_USER_AGENT)); // 浏览器user agent参数
if (uaInfo != null) {
// 浏览器名称
result.put(EventLogConstants.LOG_COLUMN_NAME_BROWSER_NAME, uaInfo.getUaFamily()); // 浏览器名称
// 浏览器版本号
result.put(EventLogConstants.LOG_COLUMN_NAME_BROWSER_VERSION, uaInfo.getBrowserVersionInfo()); // 浏览器版本
// 浏览器所在操作系统
result.put(EventLogConstants.LOG_COLUMN_NAME_OS_NAME, uaInfo.getOsFamily()); // 操作系统名称
// 浏览器所在操作系统的版本
result.put(EventLogConstants.LOG_COLUMN_NAME_OS_VERSION, uaInfo.getOsName()); // 操作系统版本
}
} else {
// logger
logger.debug("请求参数为空:" + logText);
result.clear(); // 清空
}
} else {
// log记录一下
logger.debug("日志行内容格式不正确:" + logText);
}
} else {
logger.debug("日志行内容为空,无法进行解析:" + logText);
}
return result;
}
/**
* 处理请求参数<br/>
* 处理结果保存到参数 result 集合(Map 集合)
*
* @param clientInfo
* 保存最终用户行为数据的 map 集合
* @param requestBody
* 请求参数中,用户行为数据,格式为:
* u_nu=1&u_sd=6D4F89C0-E17B-45D0-BFE0-059644C1878D&c_time=
* 1450569596991&ver=1&en=e_l&pl=website&sdk=js&b_rst=1440*900&
* u_ud=4B16B8BB-D6AA-4118-87F8-C58680D22657&b_iev=Mozilla%2F5.0%
* 20(Windows%20NT%205.1)%20AppleWebKit%2F537.36%20(KHTML%2C%
* 20like%20Gecko)%20Chrome%2F45.0.2454.101%20Safari%2F537.36&l=
* zh-CN&bf_sid=33cbf257-3b11-4abd-ac70-c5fc47afb797_11177014
*/
private static void handleRequestBody(Map<String, String> clientInfo, String requestBody) {
// 将请求参数体按照 & 切割
String[] parameters = requestBody.split("&");
for (String parameter : parameters) {
// 循环处理参数,parameter 格式为: c_time=1450569596991 = 只会出现一次
String[] params = parameter.split("=");
String key, value = null;
try {
// 使用 utf-8 解码
key = URLDecoder.decode(params[0].trim(), "utf-8");
value = URLDecoder.decode(params[1].trim(), "utf-8");
// 添加到结果集合 Map 中
clientInfo.put(key, value);
} catch (Exception e) {
logger.warn("解码失败:" + parameter, e);
}
}
}
}
Parsing IP addresses with the Taobao API
Example: REST API: http://ip.taobao.com/service/getIpInfo.php?ip=123.125.71.38
Limit: 10 QPS (queries per second)
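A minimal sketch of calling this REST API and parsing the JSON response with the Gson dependency from the pom (the response field names below follow the API's documented format at the time and should be treated as assumptions):
import java.io.InputStreamReader;
import java.net.URL;
import com.google.gson.JsonObject;
import com.google.gson.JsonParser;
public class TaobaoIpResolver {
    // Resolve an IP via the Taobao REST API (subject to the 10 QPS limit).
    public static String resolve(String ip) throws Exception {
        URL url = new URL("http://ip.taobao.com/service/getIpInfo.php?ip=" + ip);
        try (InputStreamReader reader = new InputStreamReader(url.openStream(), "UTF-8")) {
            JsonObject resp = new JsonParser().parse(reader).getAsJsonObject();
            if (resp.get("code").getAsInt() != 0) {
                return null; // lookup failed
            }
            JsonObject data = resp.getAsJsonObject("data");
            // country / region / city are the fields the API returned at the time
            return data.get("country").getAsString() + " "
                    + data.get("region").getAsString() + " "
                    + data.get("city").getAsString();
        }
    }
}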
Using a third-party IP database
Resolve IPs against a file that already stores the IP-to-region mapping. Because such files are not updated promptly, a small number of IPs may resolve incorrectly (a low-probability event). (Recommended: the 纯真 (CZ88) IP database.)
Using your own IP database
Gradually build your own IP database from third-party ones and manage it yourself.
IP database table design
startip (starting IP)
endip (ending IP)
country
province
city
Hot tip: the key operation is deciding whether a given IP falls within a region's starting-IP to ending-IP range (see the lookup sketch after the conversion utility below).
Utility for converting between an IP address and a long:
Sample code follows:
// Convert an IP address of the form 127.0.0.1 into a decimal integer
public long IpToLong(String strIp){
long[] ip = new long[4];
int position1 = strIp.indexOf(".");
int position2 = strIp.indexOf(".", position1 + 1);
int position3 = strIp.indexOf(".", position2 + 1);
// parse the string between each "." into an integer
ip[0] = Long.parseLong(strIp.substring(0, position1));
ip[1] = Long.parseLong(strIp.substring(position1 + 1, position2));
ip[2] = Long.parseLong(strIp.substring(position2 + 1, position3));
ip[3] = Long.parseLong(strIp.substring(position3 + 1));
// shift each octet into place
return (ip[0] << 24) + (ip[1] << 16) + (ip[2] << 8) + ip[3];
}
// Convert a decimal integer back into an IP address of the form 127.0.0.1
public String LongToIp(long ip) {
StringBuilder sb = new StringBuilder();
// shift right by 24 bits for the first octet
sb.append(ip >> 24);
sb.append(".");
// mask off the high 8 bits, then shift right by 16
sb.append((ip & 0x00FFFFFF) >> 16);
sb.append(".");
// mask off the high 16 bits, then shift right by 8
sb.append((ip & 0x0000FFFF) >> 8);
sb.append(".");
// mask off the high 24 bits
sb.append((ip & 0x000000FF));
return sb.toString();
}
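Building on the conversion above, a minimal sketch of the range check from the tip against an IP table sorted by startip (the IpRange class is a stand-in for however the rows are actually loaded, not project code):
// Hypothetical record for one row of the IP table (startip/endip already converted to long).
public class IpRange {
    long startIp;
    long endIp;
    String country, province, city;
}
// ranges must be sorted by startIp and non-overlapping; returns the matching range or null.
public IpRange lookup(java.util.List<IpRange> ranges, long ip) {
    int low = 0, high = ranges.size() - 1;
    while (low <= high) {
        int mid = (low + high) >>> 1;
        IpRange r = ranges.get(mid);
        if (ip < r.startIp) {
            high = mid - 1;
        } else if (ip > r.endIp) {
            low = mid + 1;
        } else {
            return r; // startIp <= ip <= endIp
        }
    }
    return null; // not covered by the table
}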
Create new classes:
AnalysisDataMapper.java
AnalysisDataRunner.java
Goal: read the data from HDFS, clean it, and write it into HBase.
Core flow:
Each log line is parsed into a Map<String, String> collection, clientInfo. The handleEventData(Map<String, String> clientInfo, EventEnum event, Context context, Text value) method handles the event, and the filterEventData(Map<String, String> clientInfo, EventEnum event) method filters it.
Filter rule: for data coming from java_server, the member id must be present; for data coming from website, the session id and user id must be present.
outPutData(Map<String, String> clientInfo, Context context) writes the output. generateRowKey(String uuid, long serverTime, Map<String, String> clientInfo) builds the rowKey; once the rowKey has been generated, the record is added to the HBase table. Sample code follows:
AnalysisDataMapper.java
package com.z.transformer.mr.etl;
import java.io.IOException;
import java.util.Map;
import java.util.zip.CRC32;
import org.apache.commons.lang.StringUtils;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.log4j.Logger;
import com.z.transformer.common.EventLogConstants;
import com.z.transformer.common.EventLogConstants.EventEnum;
import com.z.transformer.util.LoggerUtil;
import com.z.transformer.util.TimeUtil;
public class AnalysisDataMapper extends Mapper<Object, Text, NullWritable, Put> {
// Object 是偏移量,Text 表示输入,NullWritable, Put 可以互换
// 如果无法处理给定的事件类型,则使用 log4j 输出, Logger 可以在运行 jar 包的控制台输出
private static final Logger logger = Logger.getLogger(AnalysisDataMapper.class);
private CRC32 crc1 = null;
private CRC32 crc2 = null;
private byte[] family = null;
private long currentDayInMills = -1;
/**
* 初始化数据
*/
@Override
protected void setup(Mapper<Object, Text, NullWritable, Put>.Context context)
throws IOException, InterruptedException {
crc1 = new CRC32();
crc2 = new CRC32();
this.family = EventLogConstants.BYTES_EVENT_LOGS_FAMILY_NAME;
currentDayInMills = TimeUtil.getTodayInMillis();
}
// 1、覆写 map 方法
@Override
protected void map(Object key, Text value, Context context) throws IOException, InterruptedException {
// 2、将原始数据通过 LoggerUtil 解析成 Map 键值对
Map<String, String> clientInfo = LoggerUtil.handleLogText(value.toString());
// 2.1、如果解析失败,则 Map 集合中无数据,通过日志输出当前数据
if (clientInfo.isEmpty()) {
logger.debug("日志解析失败:" + value.toString());
return;
}
// 3、根据解析后的数据,生成对应的 Event 事件类型(通过枚举类型的别名来解析)
EventEnum event = EventEnum.valueOfAlias(clientInfo.get(EventLogConstants.LOG_COLUMN_NAME_EVENT_NAME));
if (event == null) {
// 4、无法处理的事件,直接输出事件类型
logger.debug("无法匹配对应的事件类型:" + clientInfo.get(EventLogConstants.LOG_COLUMN_NAME_EVENT_NAME));
} else {
// 5、处理具体的事件
handleEventData(clientInfo, event, context, value);
// clientInfo 数据集, event 事件类型, context 上下文(通过上下文写入到HBase), value 当前行的数据(可能会有新的过滤操作)
}
}
/**
* 处理具体的事件的方法
*
* @param clientInfo
* @param event
* @param context
* @param value
* @throws InterruptedException
* @throws IOException
*/
public void handleEventData(Map<String, String> clientInfo, EventEnum event, Context context, Text value)
throws IOException, InterruptedException {
// 6、如果事件成功通过过滤,则准备处理具体事件
if (filterEventData(clientInfo, event)) {
outPutData(clientInfo, context);
} else {
// 如果事件没有通过过滤,则通过日志输出当前数据
logger.debug("事件格式不正确:" + value.toString());
}
}
/**
* 6、如果事件成功通过过滤,则准备处理具体事件(我们的 HBase 只存成功通过过滤的事件)
*
* @param clientInfo
* @param event
* @return
*/
public boolean filterEventData(Map<String, String> clientInfo, EventEnum event) {
// 事件数据全局过滤(具体全局过滤条件视情况而定,这里的 “服务器时间” 和 “平台” 是例子)
boolean result = StringUtils.isNotBlank(clientInfo.get(EventLogConstants.LOG_COLUMN_NAME_SERVER_TIME))
&& StringUtils.isNotBlank(clientInfo.get(EventLogConstants.LOG_COLUMN_NAME_PLATFORM));
// 后面几乎全部是&&操作,只要有一个 false,那么该 Event 事件就无法处理
// public static final String PC_WEBSITE_SDK = "website";
// public static final String JAVA_SERVER_SDK = "java_server";
// 先确定平台
switch (clientInfo.get(EventLogConstants.LOG_COLUMN_NAME_PLATFORM)) {
// Java Server 平台发来的数据
case EventLogConstants.PlatformNameConstants.JAVA_SERVER_SDK:
result = result && StringUtils.isNotBlank(clientInfo.get(EventLogConstants.LOG_COLUMN_NAME_MEMBER_ID)); // 先判断会员 ID 是否存在
// 再确定事件
switch (event) {
case CHARGEREFUND:
// 退款事件
// ......
break;
case CHARGESUCCESS:
// 订单支付成功事件
result = result && StringUtils.isNotBlank(clientInfo.get(EventLogConstants.LOG_COLUMN_NAME_ORDER_ID));
break;
default:
logger.debug("无法处理指定事件:" + clientInfo);
result = false;
break;
}
break;
// WebSite 平台发来的数据
case EventLogConstants.PlatformNameConstants.PC_WEBSITE_SDK:
// 再确定事件
switch (event) {
case CHARGEREQUEST:
// 下单事件
result = result && StringUtils.isNotBlank(clientInfo.get(EventLogConstants.LOG_COLUMN_NAME_ORDER_ID))
&& StringUtils.isNotBlank(clientInfo.get(EventLogConstants.LOG_COLUMN_NAME_ORDER_CURRENCY_TYPE))
&& StringUtils.isNotBlank(clientInfo.get(EventLogConstants.LOG_COLUMN_NAME_ORDER_PAYMENT_TYPE))
&& StringUtils.isNotBlank(clientInfo.get(EventLogConstants.LOG_COLUMN_NAME_ORDER_CURRENCY_AMOUNT));
break;
case EVENT:
// Event 事件
result = result && StringUtils.isNotBlank(clientInfo.get(EventLogConstants.LOG_COLUMN_NAME_EVENT_CATEGORY))
&& StringUtils.isNotBlank(clientInfo.get(EventLogConstants.LOG_COLUMN_NAME_EVENT_ACTION));
break;
case LAUNCH:
// Launch 访问事件
// ......
break;
case PAGEVIEW:
// PV 事件
result = result && StringUtils.isNotBlank(clientInfo.get(EventLogConstants.LOG_COLUMN_NAME_CURRENT_URL));
break;
default:
logger.debug("无法处理指定事件:" + clientInfo);
result = false;
break;
}
break;
default:
result = false;
logger.debug("无法确定的数据来源:" + clientInfo);
break;
}
return result;
}
/**
* 7 和 8、如果事件成功通过过滤,则输出事件到 HBase 的方法
*
* @param clientInfo
* @param context
* @throws IOException
* @throws InterruptedException
*/
public void outPutData(Map<String, String> clientInfo, Context context) throws IOException, InterruptedException {
String uuid = clientInfo.get(EventLogConstants.LOG_COLUMN_NAME_UUID);
long serverTime = Long.valueOf(clientInfo.get(EventLogConstants.LOG_COLUMN_NAME_SERVER_TIME));
// 因为浏览器信息已经解析完成,所以此时删除原始的浏览器信息
clientInfo.remove(EventLogConstants.LOG_COLUMN_NAME_USER_AGENT);
// 创建 rowKey
byte[] rowkey = generateRowKey(uuid, serverTime, clientInfo);
Put put = new Put(rowkey);
for (Map.Entry<String, String> entry : clientInfo.entrySet()) {
if (StringUtils.isNotBlank(entry.getKey()) && StringUtils.isNotBlank(entry.getValue())) {
put.addColumn(family, Bytes.toBytes(entry.getKey()), Bytes.toBytes(entry.getValue()));
}
}
context.write(NullWritable.get(), put);
}
/**
* 9、为向 HBase 中写入数据依赖 Put 对象,Put 对象的创建依赖 RowKey,所以如下方法
*
* rowKey=时间+uuid的crc32编码+数据内容的hash码的crc32编码
*
* @return
*/
public byte[] generateRowKey(String uuid, long serverTime, Map<String, String> clientInfo) {
// 先清空 crc1 和 crc2 集合中的数据内容
crc1.reset();
crc2.reset();
// 时间=当前数据访问服务器的时间-当天00:00点的时间戳 ,得到最大值是8位数字=3600*24*1000=86400000 ,可以用int来存储,大小是 4个字节
byte[] timeBytes = Bytes.toBytes(serverTime - this.currentDayInMills);
// uuid 的 crc 编码
if (StringUtils.isNotBlank(uuid)) {
this.crc1.update(Bytes.toBytes(uuid));
}
byte[] uuidBytes = Bytes.toBytes(this.crc1.getValue());
// 数据内容的 hash 码的 crc 编码
this.crc2.update(Bytes.toBytes(clientInfo.hashCode()));
byte[] clientInfoBytes = Bytes.toBytes(this.crc2.getValue());
// 综合字节数组
byte[] buffer = new byte[timeBytes.length + uuidBytes.length + clientInfoBytes.length];
// 数组合并
System.arraycopy(timeBytes, 0, buffer, 0, timeBytes.length);
System.arraycopy(uuidBytes, 0, buffer, timeBytes.length, uuidBytes.length);
System.arraycopy(clientInfoBytes, 0, buffer, timeBytes.length + uuidBytes.length, clientInfoBytes.length);
return buffer;
}
}
AnalysisDataRunner.java
package com.z.transformer.mr.etl;
import java.io.File;
import java.io.IOException;
import org.apache.commons.lang.StringUtils;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.HColumnDescriptor;
import org.apache.hadoop.hbase.HTableDescriptor;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.Admin;
import org.apache.hadoop.hbase.client.Connection;
import org.apache.hadoop.hbase.client.ConnectionFactory;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.mapreduce.TableMapReduceUtil;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;
import com.z.transformer.common.EventLogConstants;
import com.z.transformer.common.GlobalConstants;
import com.z.transformer.util.TimeUtil;
public class AnalysisDataRunner implements Tool {
private Configuration conf = null;
public static void main(String[] args) {
try {
int resultCode = ToolRunner.run(new AnalysisDataRunner(), args);
if (resultCode == 0) {
System.out.println("Success!");
} else {
System.out.println("Fail!");
}
System.exit(resultCode);
} catch (Exception e) {
e.printStackTrace();
System.exit(1);
}
}
@Override
public void setConf(Configuration conf) {
// 先实例化 Configuration
this.conf = HBaseConfiguration.create(conf);
}
@Override
public Configuration getConf() {
// 全局的访问方法
return this.conf;
}
@Override
public int run(String[] args) throws Exception {
Configuration conf = this.getConf();
// 处理传入的时间参数,默认或不合法时间则直接使用昨天日期
this.processArgs(conf, args);
// 开始创建 Job
Job job = Job.getInstance(conf, "Event-ETL");
// 设置 Job 参数
job.setJarByClass(AnalysisDataRunner.class);
// Mapper 参数设置
job.setMapperClass(AnalysisDataMapper.class);
job.setMapOutputKeyClass(NullWritable.class);
job.setMapOutputValueClass(Put.class);
// Reducer 参数设置
job.setNumReduceTasks(0);
// 设置数据输入
initJobInputPath(job);
// 设置输出到 HBase 的信息
initHBaseOutPutConfig(job);
// job.setJar("target/transformer-0.0.1-SNAPSHOT.jar");
// Job 提交
return job.waitForCompletion(true) ? 0 : 1;
}
/**
* 初始化 Job 数据输入目录
*
* @param job
* @throws IOException
*/
private void initJobInputPath(Job job) throws IOException {
Configuration conf = job.getConfiguration();
// 获取要执行ETL操作的那一天的数据
String date = conf.get(GlobalConstants.RUNNING_DATE_PARAMES); // 2017-08-14
// 格式化 HDFS 文件路径
String hdfsPath = TimeUtil.parseLong2String(TimeUtil.parseString2Long(date), "yyyy/MM/dd");// 2017/08/14
if (GlobalConstants.HDFS_LOGS_PATH_PREFIX.endsWith("/")) {
hdfsPath = GlobalConstants.HDFS_LOGS_PATH_PREFIX + hdfsPath; // /event-logs/2017/08/14
} else {
hdfsPath = GlobalConstants.HDFS_LOGS_PATH_PREFIX + File.separator + hdfsPath; // /event-logs/2017/08/14
// File.separator 的作用是:根据当前操作系统获取对应的文件分隔符,windows中是 \ ,Linux中是 /
}
FileSystem fs = FileSystem.get(conf);
Path inPath = new Path(hdfsPath);
if (fs.exists(inPath)) {
FileInputFormat.addInputPath(job, inPath);
} else {
throw new RuntimeException("HDFS 中该文件目录不存在:" + hdfsPath);
}
}
/**
* 设置输出到 HBase 的一些操作选项
*
* @throws IOException
*/
private void initHBaseOutPutConfig(Job job) throws IOException {
Configuration conf = job.getConfiguration();
// 获取要执行ETL操作的那一天的数据
String date = conf.get(GlobalConstants.RUNNING_DATE_PARAMES); // 2017-08-14
// 格式化 HBase 表的后缀名
String tableNameSuffix = TimeUtil.parseLong2String(TimeUtil.parseString2Long(date), TimeUtil.HBASE_TABLE_NAME_SUFFIX_FORMAT); // 20170814
// 构建表名
String tableName = EventLogConstants.HBASE_NAME_EVENT_LOGS + tableNameSuffix; // event_logs20170814
// 指定输出(初始化 ReducerJob)
TableMapReduceUtil.initTableReducerJob(tableName, null, job);
Connection conn = null;
Admin admin = null;
// 使用 HBase 的新 API
conn = ConnectionFactory.createConnection(conf);
admin = conn.getAdmin();
// 创建表描述器(即通过表名实例化表描述器)
TableName tn = TableName.valueOf(tableName);
HTableDescriptor htd = new HTableDescriptor(tn);
// 设置列族
htd.addFamily(new HColumnDescriptor(EventLogConstants.EVENT_LOGS_FAMILY_NAME));
// 判断表是否存在
if (admin.tableExists(tn)) {
// 存在,则删除
if (admin.isTableEnabled(tn)) {
// 先将表设置为不可用
admin.disableTable(tn);
}
// 再删除表
admin.deleteTable(tn);
}
// 创建表,在创建的过程中可以考虑预分区操作
// 假设预分区为 3个分区
// byte[][] keySplits = new byte[3][];
// keySplits[0] = Bytes.toBytes("1"); // (-∞, 1]
// keySplits[1] = Bytes.toBytes("2"); // (1, 2]
// keySplits[2] = Bytes.toBytes("3"); // (2, ∞]
// admin.createTable(htd, keySplits);
admin.createTable(htd);
admin.close();
conn.close();
}
/**
* 处理时间参数,如果没有传递参数的话,则默认清洗前一天的。
*
* Job脚本如下: bin/yarn jar ETL.jar com.z.transformer.mr.etl.AnalysisDataRunner -date 2017-08-14
*
* @param args
*/
private void processArgs(Configuration conf, String[] args) {
String date = null;
for (int i = 0; i < args.length; i++) {
if ("-date".equals(args[i]) && i + 1 < args.length) { // 找到 "-date" 标记
date = args[i + 1]; // 获取时间
break;
}
}
if (StringUtils.isBlank(date) || !TimeUtil.isValidateRunningDate(date)) {
// 如果没有传递参数,默认清洗昨天的数据然后存储到 HBase 中
date = TimeUtil.getYesterday();
}
// 将要清洗的目标时间字符串保存到 conf 对象中(这样全局中就可以引用)
conf.set(GlobalConstants.RUNNING_DATE_PARAMES, date);
}
}
$ /opt/module/hadoop-2.7.2/bin/hdfs dfs -mkdir -p /event-logs/2015/12/20
$ /opt/module/hadoop-2.7.2/bin/hdfs dfs -put /opt/software/20151220.log /event-logs/2015/12/20
Option 1:
Modify the HADOOP_CLASSPATH setting in etc/hadoop/hadoop-env.sh.
For example:
export HADOOP_CLASSPATH=$HADOOP_CLASSPATH:/opt/module/hbase/lib/*
Option 2:
Use the maven-shade-plugin Maven plugin to bundle all the third-party dependency jars into the artifact; the plugin must be configured in pom.xml. See the pom.xml file in the chapter "17. Importing the utility code".
Build parameters:
1. -P local clean package (does not bundle the third-party jars)
2. -P dev clean package install (bundles the third-party jars; recommended -- see the example command below)
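For example, from the project root the recommended build would be run as: mvn -P dev clean package install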
After the build succeeds, upload the resulting jar to the Linux machine and run the following command:
/opt/module/hadoop-2.7.2/bin/yarn jar /opt/software/transformer-0.0.1-SNAPSHOT.jar com.z.transformer.mr.etl.AnalysisDataRunner -date 2015-12-20
The test succeeded. Screenshots:
1. Console
2. HBase web UI: http://hadoop102:16010/master-status
3. History server: http://hadoop102:19888/jobhistory/attempts/job_1555404378493_0005/m/SUCCESSFUL
Hot tip: if the shade plugin (org.apache.maven.plugins) did not include a jar your code depends on when packaging, you need to add that jar to HADOOP_CLASSPATH.
For example, if the code depends on HBase but the HBase jars were not included when the MR job was packaged, run the following on the command line:
export HADOOP_CLASSPATH=$HADOOP_CLASSPATH:/opt/module/hbase/lib/*
Before running the job, first manually delete the existing table and namespace in HBase with the following commands:
hbase(main):002:0> disable 'event_logs20151220'
hbase(main):003:0> drop 'event_logs20151220'
hbase(main):005:0> drop_namespace 'ns_ct'
Problem: when viewing the Logs in the history server, we find a decoding exception: java.lang.IllegalArgumentException: URLDecoder: Incomplete trailing escape (%) pattern (screenshot omitted).
Link to the fix: https://cloud.tencent.com/developer/article/1417287
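One common workaround (a sketch, not necessarily the exact fix described in the linked article) is to escape any '%' that is not followed by two hex digits before calling URLDecoder:
// Replace stray '%' characters that do not start a valid %XX escape with "%25",
// so URLDecoder.decode() no longer throws "Incomplete trailing escape (%) pattern".
public static String safeDecode(String value) {
    try {
        String sanitized = value.replaceAll("%(?![0-9a-fA-F]{2})", "%25");
        return java.net.URLDecoder.decode(sanitized, "utf-8");
    } catch (Exception e) {
        return value; // fall back to the raw value if decoding still fails
    }
}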
Prerequisite: access to MySQL must be granted on the Linux machine.
grant all on *.* to root@'%' identified by '123456';
flush privileges;
exit;