Python入门到实战 tws mirror 多线程 numpy 金融信贷 methods printing jaxb Keys.js 后台界面模板 建站一条龙 河南普通话报名 bitlocker加密好慢 mysql分页查询sql语句 less的比较级 安装python教程 python安装教程 python用什么数据库 java基础教学 java中tostring方法 java调用接口 shell编程学习 linux镜像安装 易语言进度条 淘宝自动发货软件 mathcad15 网卡驱动安装包 经典雅黑 quickchm android应用开发入门 ass转srt 程序流程图软件 jquery添加样式 pandas中文手册 音乐制作器 origin柱状图 getdata软件 qq个人文件夹清理 instagram怎么读 ps魔棒工具在哪
当前位置: 首页 > 学习教程  > 编程语言

大数据知识专栏 - MapReduce 的自定义分组求TopN

2021/1/28 23:54:03 文章标签:

自定义分组求取topN 分组是mapreduce当中reduce端的一个功能组件,主要的作用是决定哪些数据作为一组,调用一次reduce的逻辑,默认是每个不同的key,作为多个不同的组,每个组调用一次reduce逻辑,我们可以自定…

自定义分组求取topN

分组是mapreduce当中reduce端的一个功能组件,主要的作用是决定哪些数据作为一组,调用一次reduce的逻辑,默认是每个不同的key,作为多个不同的组,每个组调用一次reduce逻辑,我们可以自定义分组实现不同的key作为同一个组,调用一次reduce逻辑

3.1 需求

有如下订单数据

订单id商品id成交金额
Order_0000001Pdt_01222.8
Order_0000001Pdt_0525.8
Order_0000002Pdt_03522.8
Order_0000002Pdt_04122.4
Order_0000002Pdt_05722.4
Order_0000003Pdt_01222.8

现在需要求出每一个订单中成交金额最大的一笔交易

3.2 分析

1、利用“订单id和成交金额”作为key,可以将map阶段读取到的所有订单数据按照id分区,按照金额排序,发送到reduce

2、在reduce端利用分组将订单id相同的kv聚合成组,然后取第一个即是最大值

3.3 实现

**第一步:**定义OrderBean

定义一个OrderBean,里面定义两个字段,第一个字段是我们的orderId,第二个字段是我们的金额(注意金额一定要使用Double或者DoubleWritable类型,否则没法按照金额顺序排序)

public class OrderBean  implements WritableComparable<OrderBean>{
    private  String orderId;
    private  Double price;

    public String getOrderId() {
        return orderId;
    }

    public void setOrderId(String orderId) {
        this.orderId = orderId;
    }

    public Double getPrice() {
        return price;
    }

    public void setPrice(Double price) {
        this.price = price;
    }

    @Override
    public String toString() {
        return  orderId + "\t" + price;
    }

    //指定排序规则
    @Override
    public int compareTo(OrderBean orderBean) {
        //先比较订单ID,如果订单ID一致,则排序订单金额(降序)
        int i = this.orderId.compareTo(orderBean.orderId);
        if(i == 0){
            i = this.price.compareTo(orderBean.price) * -1;
        }
        return i;
    }

    //实现对象的序列化
    @Override
    public void write(DataOutput out) throws IOException {
         out.writeUTF(orderId);
         out.writeDouble(price);
    }

    //实现对象反序列化
    @Override
    public void readFields(DataInput in) throws IOException {
        this.orderId = in.readUTF();
        this.price  = in.readDouble();
    }
}

第二步: 定义Mapper类

public class GroupMapper extends Mapper<LongWritable,Text,OrderBean,Text> {
    @Override
    protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
        //1:拆分行文本数据,得到订单的ID,订单的金额
        String[] split = value.toString().split("\t");

        //2:封装OrderBean,得到K2
        OrderBean orderBean = new OrderBean();
        orderBean.setOrderId(split[0]);
        orderBean.setPrice(Double.valueOf(split[2]));

        //3:将K2和V2写入上下文中
        context.write(orderBean, value);
    }
}

**第三步:**自定义分区

自定义分区,按照订单id进行分区,把所有订单id相同的数据,都发送到同一个reduce中去

public class OrderPartition extends Partitioner<OrderBean,Text> {
    //分区规则: 根据订单的ID实现分区

    /**
     *
     * @param orderBean K2
     * @param text  V2
     * @param i  ReduceTask个数
     * @return 返回分区的编号
     */
    @Override
    public int getPartition(OrderBean orderBean, Text text, int i) {
        return (orderBean.getOrderId().hashCode() & 2147483647) % i;
    }
}

**第四步:**自定义分组

按照我们自己的逻辑进行分组,通过比较相同的订单id,将相同的订单id放到一个组里面去,进过分组之后当中的数据,已经全部是排好序的数据,我们只需要取前topN即可

// 1: 继承WriteableComparator
public class OrderGroupComparator extends WritableComparator {
    // 2: 调用父类的有参构造
    public OrderGroupComparator() {
        super(OrderBean.class,true);
    }

    //3: 指定分组的规则(重写方法)
    @Override
    public int compare(WritableComparable a, WritableComparable b) {
        //3.1 对形参做强制类型转换
        OrderBean first = (OrderBean)a;
        OrderBean second = (OrderBean)b;

        //3.2 指定分组规则
        return first.getOrderId().compareTo(second.getOrderId());
    }
}

第五步:定义Reducer类

public class GroupReducer extends Reducer<OrderBean,Text,Text,NullWritable> {
    @Override
    protected void reduce(OrderBean key, Iterable<Text> values, Context context) throws IOException, InterruptedException {
        int i = 0;
        //获取集合中的前N条数据
        for (Text value : values) {
            context.write(value, NullWritable.get());
            i++;
            if(i >= 1){
                break;
            }
        }
    }
}

**第六步:**程序main函数入口

public class JobMain extends Configured implements Tool {
    @Override
    public int run(String[] args) throws Exception {
        //1:获取Job对象
        Job job = Job.getInstance(super.getConf(), "mygroup_job");

        //2:设置job任务
            //第一步:设置输入类和输入路径
            job.setInputFormatClass(TextInputFormat.class);
            TextInputFormat.addInputPath(job, new Path("file:///D:\\input\\mygroup_input"));

            //第二步:设置Mapper类和数据类型
            job.setMapperClass(GroupMapper.class);
            job.setMapOutputKeyClass(OrderBean.class);
            job.setMapOutputValueClass(Text.class);

            //第三,四,五,六
            //设置分区
            job.setPartitionerClass(OrderPartition.class);
            //设置分组
            job.setGroupingComparatorClass(OrderGroupComparator.class);

            //第七步:设置Reducer类和数据类型
            job.setReducerClass(GroupReducer.class);
            job.setOutputKeyClass(Text.class);
            job.setOutputValueClass(NullWritable.class);

            //第八步:设置输出类和输出的路径
            job.setOutputFormatClass(TextOutputFormat.class);
            TextOutputFormat.setOutputPath(job, new Path("file:///D:\\out\\mygroup_out"));

        //3:等待job任务结束
        boolean bl = job.waitForCompletion(true);



        return bl ? 0: 1;
    }

    public static void main(String[] args) throws Exception {
        Configuration configuration = new Configuration();

        //启动job任务
        int run = ToolRunner.run(configuration, new JobMain(), args);

        System.exit(run);
    }
}


本文链接: http://www.dtmao.cc/news_show_650325.shtml

附件下载

相关教程

    暂无相关的数据...

共有条评论 网友评论

验证码: 看不清楚?