前往小程序,Get更优阅读体验!
立即前往
首页
学习
活动
专区
工具
TVP
发布
社区首页 >专栏 >BigData--MapReduce入门

BigData--MapReduce入门

作者头像
MiChong
发布2020-09-24 16:37:30
3310
发布2020-09-24 16:37:30
举报
文章被收录于专栏:米虫的家

MapReduce入门

一、MapReduce概述

MapReduce是一个分布式运算程序的编程框架,是用户开发“基于Hadoop的数据分析应用”的核心框架

MapReduce核心功能是将用户编写的业务逻辑代码和自带默认组件整合成一个完整的分布式运算程序,并发运行在一个Hadoop集群上。

1、优点
  • MapReduce易于编程
  • 良好的扩展性
  • 高容错性
  • 适合海量数据的离线处理
2、缺点
  • 不擅长实时计算,无法像MySQL一样,在毫秒或者秒级内返回结果。
  • 不擅长流式计算,MapReduce的输入数据是静态。
  • 不擅长DAG(有向图)计算,如果每个MapReduce作业的输出结果都写入到磁盘,会造成大量的磁盘IO,导致性能非常的低下。
3、MapReduce核心编程思想
  • 1)分布式的运算程序往往需要分成至少2个阶段。
  • 2)第一个阶段的MapTask并发实例,完全并行运行,互不相干。
  • 3)第二个阶段的ReduceTask并发实例互不相干,但是他们的数据依赖于上一个阶段的所有MapTask并发实例的输出。
  • 4)MapReduce编程模型只能包含一个Map阶段和一个Reduce阶段,如果用户的业务逻辑非常复杂,那就只能多个MapReduce程序,串行运行。
4、MapReduce进程

一个完整的MapReduce程序在分布式运行时有三类实例进程:

  • 1)MrAppMaster:负责整个程序的过程调度及状态协调。
  • 2)MapTask:负责Map阶段的整个数据处理流程
  • 3)ReduceTask:负责Reduce阶段的整个数据处理流程。
5、MapReduce编程规范
1) Mapper阶段
2)Reducer阶段
3)Driver阶段

用于提交封装了MapReduce程序相关运行参数的job对象。

二、WordCount案例实操

主要实现的是对文件中单词出现频率的分析,统计出单词出现的次数,这也是官方的示例教程

1、WcMapper ,负责数据的切分

java

代码语言:javascript
复制
package cn.buildworld.mapreduce.wordcount;

import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;

import java.io.IOException;

/**
 * @author MiChong
 * @date 2020-05-24 11:32
 */
public class WcMapper extends Mapper<LongWritable, Text, Text, IntWritable> {

    private Text word = new Text();
    private IntWritable one = new IntWritable(1);


    @Override
    protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
        // 拿到这行数据
        String line = value.toString();
        //按照空格切分数据
        String[] words = line.split(" ");
        //遍历数组
        for (String word : words) {
            this.word.set(word);
            context.write(this.word, this.one);
        }

    }
}
2、WcReducer,负责数据的统计

java

代码语言:javascript
复制
package cn.buildworld.mapreduce.wordcount;

import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;

import java.io.IOException;

/**
 * @author MiChong
 * @date 2020-05-24 11:33
 */
public class WcReducer extends Reducer<Text, IntWritable, Text, IntWritable> {

    private IntWritable total = new IntWritable();

    @Override
    protected void reduce(Text key, Iterable<IntWritable> values, Context context) throws IOException, InterruptedException {
        int sum = 0;

        //累加
        for (IntWritable value : values) {
            sum += value.get();
        }

        //包装结果并输出
        total.set(sum);
        context.write(key, this.total);
    }
}
3、WcDriver,代码相对固定,负责提交我们的Job

java

代码语言:javascript
复制
package cn.buildworld.mapreduce.wordcount;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

import java.io.IOException;

/**
 * @author MiChong
 * @date 2020-05-24 11:33
 */
public class WcDriver {

    public static void main(String[] args) throws IOException, ClassNotFoundException, InterruptedException {
        // 1、获取一个job实例
        Job job = Job.getInstance(new Configuration());
        // 2、设置类路径
        job.setJarByClass(WcDriver.class);
        // 3、设置Mapper和Reducer
        job.setMapperClass(WcMapper.class);
        job.setReducerClass(WcReducer.class);

        // 4、设置Mapper和Reducer输出类型
        job.setMapOutputKeyClass(Text.class);
        job.setMapOutputValueClass(IntWritable.class);

        // 5 设置最终输出kv类型
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(IntWritable.class);

        //6、设置输入输出数据
        FileInputFormat.setInputPaths(job, new Path(args[0]));
        FileOutputFormat.setOutputPath(job, new Path(args[1]));

        // 7、提交我们的job
        boolean b = job.waitForCompletion(true);
        System.exit(b ? 0 : 1);
    }
}

三、Hadoop序列化

不可以使用Java自带的序列化,要使用自定义bean对象实现序列化接口(Writable)

示例代码

java

代码语言:javascript
复制
package cn.buildworld.mapreduce.flow;

import org.apache.hadoop.io.Writable;

import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;

/**
 * @author MiChong
 * @date 2020-05-24 18:15
 */

/**
 * 必须实现Writable接口
 */
public class FlowBean implements Writable {
    private long upFlow;
    private long downFlow;
    private long sumFlow;

    /**
     * 反序列化时,需要反射调用空参构造函数,所以必须有空参构造
     */
    public FlowBean() {
    }

    public void set(long upFlow, long downFlow) {
        this.upFlow = upFlow;
        this.downFlow = downFlow;
        this.sumFlow = upFlow + downFlow;
    }

    public long getUpFlow() {
        return upFlow;
    }

    public void setUpFlow(long upFlow) {
        this.upFlow = upFlow;
    }

    public long getDownFlow() {
        return downFlow;
    }

    public void setDownFlow(long downFlow) {
        this.downFlow = downFlow;
    }

    public long getSumFlow() {
        return sumFlow;
    }

    public void setSumFlow(long sumFlow) {
        this.sumFlow = sumFlow;
    }

    /**
     * 最后会按照下面的格式显示在输出文件中
     *
     * 要想把结果显示在文件中,需要重写toString(),可用”\t”分开,方便后续用。
     * @return
     */
    @Override
    public String toString() {
        return "FlowBean{" +
                "upFlow=" + upFlow +
                ", downFlow=" + downFlow +
                ", sumFlow=" + sumFlow +
                '}';
    }

    /**
     * 重写序列化方法
     *
     *注意反序列化的顺序和序列化的顺序完全一致
     *
     * @param out 框架给我们提供的数据出口
     * @throws IOException
     */
    @Override
    public void write(DataOutput out) throws IOException {
        out.writeLong(upFlow);
        out.writeLong(downFlow);
        out.writeLong(sumFlow);
    }

    /**
     * 重写反序列化方法
     *
     * 注意反序列化的顺序和序列化的顺序完全一致
     *
     * @param in 框架给我们提供的数据来源
     * @throws IOException
     */
    @Override
    public void readFields(DataInput in) throws IOException {
        upFlow = in.readLong();
        downFlow = in.readLong();
        sumFlow = in.readLong();
    }
}
本文参与 腾讯云自媒体同步曝光计划,分享自作者个人站点/博客。
原始发表:2020-05-24,如有侵权请联系 cloudcommunity@tencent.com 删除

本文分享自 作者个人站点/博客 前往查看

如有侵权,请联系 cloudcommunity@tencent.com 删除。

本文参与 腾讯云自媒体同步曝光计划  ,欢迎热爱写作的你一起参与!

评论
登录后参与评论
0 条评论
热度
最新
推荐阅读
目录
  • MapReduce入门
    • 一、MapReduce概述
      • 1、优点
      • 2、缺点
      • 3、MapReduce核心编程思想
      • 4、MapReduce进程
      • 5、MapReduce编程规范
    • 二、WordCount案例实操
      • 1、WcMapper ,负责数据的切分
      • 2、WcReducer,负责数据的统计
      • 3、WcDriver,代码相对固定,负责提交我们的Job
    • 三、Hadoop序列化
      • 示例代码
相关产品与服务
文件存储
文件存储(Cloud File Storage,CFS)为您提供安全可靠、可扩展的共享文件存储服务。文件存储可与腾讯云服务器、容器服务、批量计算等服务搭配使用,为多个计算节点提供容量和性能可弹性扩展的高性能共享存储。腾讯云文件存储的管理界面简单、易使用,可实现对现有应用的无缝集成;按实际用量付费,为您节约成本,简化 IT 运维工作。
领券
问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档