前往小程序,Get更优阅读体验!
立即前往
首页
学习
活动
专区
工具
TVP
发布
社区首页 >专栏 >2021年大数据Hadoop(十九):​​​​​​​MapReduce分区

2021年大数据Hadoop(十九):​​​​​​​MapReduce分区

作者头像
Lansonli
发布2021-10-11 15:48:12
5680
发布2021-10-11 15:48:12
举报
文章被收录于专栏:Lansonli技术博客

​​​​​​​MapReduce分区

​​​​​​​分区概述

在 MapReduce 中, 通过我们指定分区, 会将同一个分区的数据发送到同一个Reduce当中进行处理。例如: 为了数据的统计, 可以把一批类似的数据发送到同一个 Reduce 当中, 在同一个 Reduce 当中统计相同类型的数据, 就可以实现类似的数据分区和统计等

其实就是相同类型的数据, 有共性的数据, 送到一起去处理, 在Reduce过程中,可以根据实际需求(比如按某个维度进行归档,类似于数据库的分组),把Map完的数据Reduce到不同的文件中。分区的设置需要与ReduceTaskNum配合使用。比如想要得到5个分区的数据结果。那么就得设置5个ReduceTask。

需求:将以下数据进行分开处理

详细数据参见partition.csv  这个文本文件,其中第五个字段表示开奖结果数值,现在需求将15以上的结果以及15以下的结果进行分开成两个文件进行保存

​​​​​​​分区步骤

1、定义 Mapper

这个 Mapper 程序不做任何逻辑, 也不对 Key-Value 做任何改变, 只是接收数据, 然后往下发送

代码语言:javascript
复制
public class MyMapper extends Mapper<LongWritable,Text,Text,NullWritable>{

    @Override

    protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {

        context.write(value,NullWritable.get());

    }

}

2、自定义Partitioner

主要的逻辑就在这里, 这也是这个案例的意义, 通过 Partitioner 将数据分发给不同的 Reducer

代码语言:javascript
复制
/**

 * 这里的输入类型与我们map阶段的输出类型相同

 */

public class MyPartitioner extends Partitioner<Text,NullWritable>{

    /**

     * 返回值表示我们的数据要去到哪个分区

     * 返回值只是一个分区的标记,标记所有相同的数据去到指定的分区

     */

    @Override

    public int getPartition(Text text, NullWritable nullWritable, int i) {

        String result = text.toString().split("\t")[5];

        if (Integer.parseInt(result) > 15){

            return 1;

        }else{

            return 0;

        }

    }

}

3、定义 Reducer 逻辑

这个 Reducer 也不做任何处理, 将数据原封不动的输出即可

代码语言:javascript
复制
public class MyReducer extends Reducer<Text,NullWritable,Text,NullWritable> {

    @Override

    protected void reduce(Text key, Iterable<NullWritable> values, Context context) throws IOException, InterruptedException {

        context.write(key,NullWritable.get());

    }

}

4、主类中设置分区类和ReduceTask个数

代码语言:javascript
复制
public class PartitionerRunner {

    public static void main(String[] args) throws IOException, ClassNotFoundException, InterruptedException {

        //1、创建建一个job任务对象

        Configuration configuration = new Configuration();

        Job job = Job.getInstance(configuration, "mypartitioner");



        //2、指定job所在的jar包

        job.setJarByClass(PartitionerRunner.class);



        //3、指定源文件的读取方式类和源文件的读取路径

        job.setInputFormatClass(TextInputFormat.class); //按照行读取

        TextInputFormat.addInputPath(job, new Path("hdfs://node1:8020/input/partitioner")); //只需要指定源文件所在的目录即可

        // TextInputFormat.addInputPath(job, new Path("file:///E:\\input\\partitioner")); //只需要指定源文件所在的目录即可



        //4、指定自定义的Mapper类和K2、V2类型

        job.setMapperClass(PartitionerMapper.class); //指定Mapper类

        job.setMapOutputKeyClass(Text.class); //K2类型

        job.setMapOutputValueClass(NullWritable.class);//V2类型



        //5、指定自定义分区类(如果有的话)

        job.setPartitionerClass(MyPartitioner.class);

        //6、指定自定义分组类(如果有的话)

        //7、指定自定义的Reducer类和K3、V3的数据类型

        job.setReducerClass(PartitionerReducer.class); //指定Reducer类

        job.setOutputKeyClass(Text.class); //K3类型

        job.setOutputValueClass(NullWritable.class);  //V3类型





        //设置Reduce的个数

        job.setNumReduceTasks(2);



        //8、指定输出方式类和结果输出路径

        job.setOutputFormatClass(TextOutputFormat.class);

        TextOutputFormat.setOutputPath(job, new  Path("hdfs://node1:8020/output/partitioner")); //目标目录不能存在,否则报错

        //TextOutputFormat.setOutputPath(job, new  Path("file:///E:\\output\\partitoner")); //目标目录不能存在,否则报错



        //9、将job提交到yarn集群

        boolean bl = job.waitForCompletion(true); //true表示可以看到任务的执行进度



        //10.退出执行进程

        System.exit(bl?0:1);

    }

}

  • 📢博客主页:https://lansonli.blog.csdn.net
  • 📢欢迎点赞 👍 收藏 ⭐留言 📝 如有错误敬请指正!
  • 📢本文由 Lansonli 原创,首发于 CSDN博客🙉
  • 📢大数据系列文章会每天更新,停下休息的时候不要忘了别人还在奔跑,希望大家抓紧时间学习,全力奔赴更美好的生活✨
本文参与 腾讯云自媒体同步曝光计划,分享自作者个人站点/博客。
原始发表:2021/05/30 ,如有侵权请联系 cloudcommunity@tencent.com 删除

本文分享自 作者个人站点/博客 前往查看

如有侵权,请联系 cloudcommunity@tencent.com 删除。

本文参与 腾讯云自媒体同步曝光计划  ,欢迎热爱写作的你一起参与!

评论
登录后参与评论
0 条评论
热度
最新
推荐阅读
目录
  • ​​​​​​​MapReduce分区
    • ​​​​​​​分区概述
      • ​​​​​​​分区步骤
        • 1、定义 Mapper
        • 2、自定义Partitioner
        • 3、定义 Reducer 逻辑
        • 4、主类中设置分区类和ReduceTask个数
    领券
    问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档