60分钟

MapReduce 实现 TopN 操作实战

MapReduce实现TopN

实验预计耗时:60分钟

前置实验教程:MapReduce实战

1. 课程背景

1.1 课程目的

在企业数据分析需求中,常常需要根据某字段的不同类型对数据进行TopN统计。TopN就是统计结果排序后,得出的最高的N个类型。以制造业为例,生产车间每天需要对每天生产的不良品类型进行统计并排序,借助柏拉图等图形化展示,优先解决最能影响生产的生产问题,从而提高改善良率的效率。

本实验分步带领学员通过MapReduce开发程序,在腾讯云EMR上实现小猫数据的TopN分析,并熟悉腾讯云EMR服务的基本操作和使用。

1.2 课前知识准备

学习本课程前,学员需要掌握以下前置知识:

1、技术基础

  • Java编程基础:包括数据结构、逻辑语句、函数、面向对象编程以及Maven工程构建。
  • HDFS基本命令的使用:包括文件的移动、文件的上传、文件权限的修改等。
  • MapReduce编程基础:理解MapReduce编程思想和开发基础。

2、相关技术

  • Top N算法如图。首先将输入分区为小块,每个小快发送到一个映射器。每个映射器会创建一个本地top 10列表,然后讲这个本地top 10列表发送到归约器。发出映射器输出时,我们使用了一个归约器键,这样所有映射器的输出都将由一个归约器处理。
  • TopN的Java实现:
    • Java中实现TopN有一种简单方法,可以使用SortedMap<K,V>接口和TreeMap<K,V>。TreeMap作为Map接口有一个重要的实现类,可以实现存储元素的自动排序。在TreeMap中,键值对之间按键有序。
  • TreeMap使用的存储结构是平衡二叉树,也称为红黑树。它首先是一棵二叉树,具有二叉树所有的特性,即树中的任何节点的值大于它的左子节点,且小于它的右子节点,如果是一棵左右完全均衡的二叉树,元素的查找效率将获得极大提高。
  • 同时map.firstKey(),和map.lastKey()方法分别可以输出排好序之后第一个键值和最后一个键值,可以由此得到相对应的value。

2. 实验环境

2.1 实验操作环境

本课程需要以下实验操作环境:

  1. 可以接入互联网的笔记本电脑或者台式机,本实验使用Windows系统
  2. 实验环境:计算机本地+腾讯云控制台
  3. Java软件开发工具包JDK(版本:1.8)
  4. Maven(版本:3.5及以上)
  5. Eclipse或者IDEA,此实验采用IDEA作为开发工具

2.2 实验架构图

本实验将开发的MapReduce程序提交给EMR三节点集群(Master节点和两个Core节点),其架构图如下:

2.3 实验的数据规划表

资源名称

数据

说明

腾讯云账号

账号:XXXXXXXX、密码:XXXXXXXX

涉及产品如下:VPC、EMR

top_N_cat.csv数据集

下载数据集top_N_cat.csv

2.4 实验数据

实验数据共包含三个字段,分别为不良品产生阶段,不良品类型,不良品数量,数据样本如下:

小猫编号

小猫名字

小猫重量

000001

name001

1.11

为指导生产优先改善最突出问题,我们需要统计不良品类型的数量,输出每种不良品类型总不良数的TopN。

3. 实验流程

实验共分为三个步骤,若您的计算机上还没有Java开发环境,请请学习MapReduce实战,准备好开发MapReduce程序需要基本的Java环境。

第一步,我们开始编写TopN程序代码,其中我们将Cat数据封装到Bean中,并在Map和Reduce阶段分别借助TreeMap来进行排序和剔除。项目代码确认无误后可以借助Maven将项目封装成一个jar包,等待后面步骤的计算使用。

随后我们需要借助腾讯云弹性MapReduce服务快速搭建一个Hadoop集群,用来运行TopN程序。

集群购买完成且运行成功后,我们将前面步骤生成的jar包上传至主节点,通过hadoop命令进行运行,体验MapReduce在YARN上的运行。

4. 实验步骤

任务1 编写TopN程序

【任务目标】

根据业务需求完成对于小猫数据的TopN项目编程。

【任务步骤】

1、Idea中创建项目

1.打开IDEA后,点击Create New Project

2.选择项目类型为Maven;

3.GroupId为:com.test;ArtifactId为:top-n;

4.Project name为:top_n;

项目创建成功后配置Maven的pom.xml,内容如下;

<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>

    <groupId>com.test</groupId>
    <artifactId>top-n</artifactId>
    <version>1.0-SNAPSHOT</version>
    <properties>
        <hadoop_version>2.8.5</hadoop_version>
    </properties>
    <dependencies>
        <dependency>
            <groupId>org.apache.hadoop</groupId>
            <artifactId>hadoop-common</artifactId>
            <version>${hadoop_version}</version>
        </dependency>
        <dependency>
            <groupId>org.apache.hadoop</groupId>
            <artifactId>hadoop-mapreduce-client-core</artifactId>
            <version>${hadoop_version}</version>
        </dependency>
    </dependencies>
    <build>
        <plugins>
            <plugin>
                <groupId>org.apache.maven.plugins</groupId>
                <artifactId>maven-compiler-plugin</artifactId>
                <version>3.8.1</version>
                <executions>
                    <execution>
                        <id>default-compile</id>
                        <phase>compile</phase>
                        <goals>
                            <goal>compile</goal>
                        </goals>
                        <configuration>
                            <encoding>UTF-8</encoding>
                        </configuration>
                    </execution>
                </executions>
            </plugin>
            <plugin>
                <artifactId>maven-compiler-plugin</artifactId>
                <version>3.8.1</version>
                <configuration>
                    <source>1.8</source>
                    <target>1.8</target>
                </configuration>
            </plugin>
            <plugin>
                <groupId>org.apache.maven.plugins</groupId>
                <artifactId>maven-jar-plugin</artifactId>
                <configuration>

                    <archive>
                        <manifest>
                            <mainClass>com.test.mr.TopNDriver</mainClass>
                            <addClasspath>true</addClasspath>
                            <classpathPrefix>lib/</classpathPrefix>
                        </manifest>

                    </archive>
                    <classesDirectory>
                    </classesDirectory>
                </configuration>
            </plugin>
        </plugins>
    </build>
</project>

点击Import ChangesEnable Auto-Import,开始下载依赖(此处建议每次更改POM文件后手动点击Import Changes)。

2、编写TopN程序

1.创建package名:com.test.mr,需要在src-main-java右键,选择【new】-【package】创建。在该package下创建Java类。

创建一个CatBean类,实现WritableComparable,并重写compareTo方法,从而TreeMap可以自动根据此方法对Cat数据进行排序;另外我们补充Getter/Setter以及构造方法等:

package com.test.mr;
import org.apache.hadoop.io.WritableComparable;
import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;

public class CatBean implements WritableComparable<CatBean> {
    private String cat_id;
    private String cat_name;
    private Double cat_weight;

    @Override
    public int compareTo(CatBean cat) {
        int result;
        if (this.cat_weight > cat.getCat_weight()) {
            result = 1;
        }else if (this.cat_weight < cat.getCat_weight()) {
            result = -1;
        }else {
            result = 0;
        }
        return result;

    }

    @Override
    public void write(DataOutput out) throws IOException {
        out.writeUTF(cat_id);
        out.writeUTF(cat_name);
        out.writeDouble(cat_weight);

    }

    @Override
    public void readFields(DataInput in) throws IOException {
        cat_id = in.readUTF();
        cat_name = in.readUTF();
        cat_weight = in.readDouble();
    }

    public String getCat_id() {
        return cat_id;
    }

    public void setCat_id(String cat_id) {
        this.cat_id = cat_id;
    }

    public String getCat_name() {
        return cat_name;
    }

    public void setCat_name(String cat_name) {
        this.cat_name = cat_name;
    }

    public Double getCat_weight() {
        return cat_weight;
    }

    public void setCat_weight(Double cat_weight) {
        this.cat_weight = cat_weight;
    }

    @Override
    public String toString() {
        return "CatBean{" +
                "cat_id='" + cat_id + '\'' +
                ", cat_name='" + cat_name + '\'' +
                ", cat_weight=" + cat_weight +
                '}';
    }

    public CatBean(String cat_id, String cat_name, Double cat_weight) {
        this.cat_id = cat_id;
        this.cat_name = cat_name;
        this.cat_weight = cat_weight;
    }

    public CatBean() {
    }
}

2.在com.test.mr包内编写TopNMapper类,首先定义一个全局的TreeMap容器,在map阶段实现数据的获取和插入TreeMap操作;在cleanup阶段将TreeMap内的数据写入到上下文context中:

package com.test.mr;

import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;

import java.io.IOException;
import java.util.Iterator;
import java.util.TreeMap;

public class TopNMapper extends Mapper<LongWritable, Text, CatBean, Text> {
    // 定义一个TreeMap作为存储数据的容器(天然按key排序)
    private TreeMap<CatBean, Text> catMap = new TreeMap<CatBean, Text>();
    private CatBean cat;

    @Override
    protected void cleanup(Context context) throws IOException, InterruptedException {
        Iterator<CatBean> bean = catMap.keySet().iterator();

        while (bean.hasNext()) {
            CatBean k = bean.next();
            context.write(k, catMap.get(k));
        }

    }

    @Override
    protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
        cat = new CatBean();
        Text v = new Text();

        String line = value.toString();
        String[] fields = line.split(",");

        String cat_id = fields[0];
        String cat_name = fields[1];
        Double cat_weight = Double.parseDouble(fields[2]);

        cat.setCat_id(cat_id);
        cat.setCat_name(cat_name);
        cat.setCat_weight(cat_weight);

        v.set(cat_id);
        catMap.put(cat, v);

        if (catMap.size() > 10) {
            catMap.remove(catMap.firstKey());
        }
    }
}

3.在com.test.mr包内编写TopNReducer类,同样借助TreeMap容器将每个Mapper传来的数据进行再次排序,得到最终的Top10:

package com.test.mr;

import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;

import java.io.IOException;
import java.util.Iterator;
import java.util.TreeMap;

public class TopNReducer extends Reducer<CatBean, Text, Text, CatBean> {

    // 定义一个TreeMap作为存储数据的容器(天然按key排序)
    private TreeMap<CatBean, Text> catMap = new TreeMap<CatBean, Text>();

    @Override
    protected void reduce(CatBean key, Iterable<Text> values, Context context) throws IOException, InterruptedException {
        for (Text value : values) {
            CatBean bean = new CatBean(key.getCat_id(), key.getCat_name(), key.getCat_weight());
            catMap.put(bean, new Text(value));

            if (catMap.size() > 10) {
                catMap.remove(catMap.firstKey());
            }
        }
    }

    @Override
    protected void cleanup(Context context) throws IOException, InterruptedException {
        Iterator<CatBean> it = catMap.keySet().iterator();
        while (it.hasNext()) {
            CatBean v = it.next();
            context.write(new Text(catMap.get(v)), v);
        }

    }
}

4.在com.test.mr包内编写TopNDriver类:

package com.test.mr;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

public class TopNDriver {

    public static void main(String[] args) throws Exception {

        // 1 获取配置信息,或者job对象实例
        Configuration configuration = new Configuration();
        Job job = Job.getInstance(configuration);

        // 2 指定本程序的jar包所在的本地路径
        job.setJarByClass(TopNDriver.class);

        // 3 指定本业务job要使用的mapper/Reducer业务类
        job.setMapperClass(TopNMapper.class);
        job.setReducerClass(TopNReducer.class);

        // 4 指定mapper输出数据的kv类型
        job.setMapOutputKeyClass(CatBean.class);
        job.setMapOutputValueClass(Text.class);

        // 5 指定最终输出的数据的kv类型
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(CatBean.class);

        // 6 指定job的输入原始文件所在目录
        FileInputFormat.setInputPaths(job, new Path(args[0]));
        FileOutputFormat.setOutputPath(job, new Path(args[1]));

        // 7 将job中配置的相关参数,以及job所用的java类所在的jar包,提交给yarn去运行
        boolean result = job.waitForCompletion(true);
        System.exit(result ? 0 : 1);
    }

}

完成全部便写后,项目目录如图所示:

3、编译打包

1.代码编写确认后,我们使用Maven对程序生成jar包。

使用IDEA,点击右侧Maven选项卡,点击每一模块下的Lifecycle,双击生命周期中的package,即可开始构建jar包。

package运行结果大致如下,其中我们看到BUILD SUCCESS时,表示构建成功:

2.打包成功后,我们可以在该模块目录的target文件夹下,查看到已有jar包。将jar包移至D盘等待上传EMR。

任务2 EMR集群部署

【任务目标】

借助腾讯云EMR快速搭建一个Hadoop集群,用来运行TopN程序。如已有EMR集群,可跳过本任务。

【任务步骤】

1、EMR集群选购

1.在腾讯云官网,找到弹性MapReduce首页,点击立即购买

2.可用区与软件配置如下:

配置项

配置项说明

计费模式

按量计费

地域/可用区

广州/广州四区(可根据所在地自定义选择)

产品版本

EMR-V3.0.0

必选组件

hadoop、zookeeper、knox

可选组件

确认配置无误后,点击下一步:硬件配置

3.硬件配置如下:

配置项

配置项说明

节点高可用

不启用

Master配置1台

EMR标准型S4 / 2核8G,CBS云盘:100G高效云盘 X 1

Core配置2台

EMR标准型S4 / 2核8G,CBS云盘:100G高效云盘 X 1

集群外网

开启集群Master节点公网

集群网络

新建或选择已有的私有网络

启动高可用选项可以自定义选择,默认是选择的,如果取消需要手动取消选择。由于我们这里的实验环境仅仅是一个学习的实验环境所以这里我们将此选项取消,实际生产中要根据实际环境合理选择是否需要这个配置。

确认硬件配置信息无误后,点击下一步:基础配置

4.基础配置如下:

配置项

配置项说明

集群名称

emr-test

安全组

创建新安全组(或选择已有安全组)

对象存储

不开启

登录密码

EMR集群云主机root用户登录的密码

确认信息无误后,点击购买,会自动跳转至集群页。图中的集群实例状态中显示集群创建中

等待5min左右,集群构建成功,截图如下:

2、第三方工具连接EMR集群

1.复制集群页的主节点外网IP,打开PuTTY创建连接,将复制的外网IP粘贴至Host Name,端口默认22,如图:

2.点击Open,第一次连接会弹出安全警告,点击是(Y)

3.接下在login as:后填写用户名为root,密码为构建EMR的时候设置的密码:

备注:这里只能使用root用户进行连接。

回车确认后,我们即可成功访问主节点实例。输入java进程查看命令jps,可看到应用进程已经启动。

任务3 程序上传并运行

【任务目标】

通过YARN执行TopN程序,并查看任务运行的结果。

【任务步骤】

1、上传程序jar包

1.使用mkdir命令在Master节点创建一个/test目录。

创建文件夹test;

mkdir /test

切换到test路径下;

cd /test

2.找到PuTTY的安装目录,在上方地址栏输入cmd并执行。

3.上传jar包

在弹出的黑窗口首先输入psftp,打开psftp工具用来传输文件;

psftp

接下来连接服务器,回车后需要输入用户名和密码;

open xxx.xxx.xxx.xxx 

用于切换远程Linux 服务器上的目录;

cd /test/

lcd命令用于切换本地的路径;

lcd D:\

上传文件;

put top-n-1.0-SNAPSHOT.jar

命令使用可以参考下图:

上传成功后在Master节点的/test目录查看到上传的jar包。

2、运行jar包

1.下载数据集words.dat到/test目录下。

wget https://course-public-resources-1252758970.cos.ap-chengdu.myqcloud.com/PracticalApplication/202001bigdata/2-1-topn/top_N_cat.csv

4.上传文件。

切换至hadoop用户;

su hadoop

字HDFS中创建一个/test文件夹;

hdfs dfs -mkdir /test

上传words.dat文件至hdfs的/test目录下(执行时没有任何输出,图示输出仅为日志配置信息);

hdfs dfs -put /test/top_N_cat.csv /test

上传成功后,使用hadoop命令运行jar包,使用命令如下:

# hadoop jar jar包名称.jar 主类 输入文件(HDFS) 输出路径(HDFS)
hadoop jar top-n-1.0-SNAPSHOT.jar /test/top_N_cat.csv /test/output

验证:直接查看输出目录及文件结果,使用命令如下:

hdfs dfs -ls /test/output

使用cat命令查看part-r-00000内记录的输出结果;

hdfs dfs -cat /test/output/part-r-00000

查看输出结果内容:

至此,您已完成了实验的全部任务。相信您已经掌握了MapReduce的基本开发和部署流程,您还可以参考官方文档,体验hadoop的其他功能。

5. 注意事项

如实验资源无需保留,请在实验结束后及时销毁,以免产生额外费用。