首次使用 Hadoop,MapReduce Job 不运行 Reduce Phase

Using Hadoop for the First Time, MapReduce Job does not run Reduce Phase(首次使用 Hadoop,MapReduce Job 不运行 Reduce Phase)

本文介绍了首次使用 Hadoop,MapReduce Job 不运行 Reduce Phase的处理方法,对大家解决问题具有一定的参考价值,需要的朋友们下面随着小编来一起学习吧!

问题描述

我编写了一个简单的 map reduce 作业,它会从 DFS 中读取数据并在其上运行一个简单的算法.在尝试调试它时,我决定简单地让映射器输出一组键和值,而减速器输出完全不同的一组.我在单节点 Hadoop 20.2 集群上运行此作业.作业完成后,输出仅包含映射器输出的值,使我相信减速器没有运行.如果有人对我的代码为什么会产生这样的输出提供任何见解,我将不胜感激.我尝试将 outputKeyClass 和 outputValueClass 设置为不同的事物,并将 setMapOutputKeyClass 和 setMapOutputValueClass 设置为不同的事物.目前,注释我们的代码部分是我正在运行的算法,但我已经更改了 map 和 reduce 方法以简单地输出某些值.同样,作业的输出仅包含映射器输出的值.这是我用来运行作业的类:

I wrote a simple map reduce job that would read in data from the DFS and run a simple algorithm on it. When trying to debug it I decided to simply make the mappers output a single set of keys and values, and the reducers output an entirely different set. I am running this job on a single node Hadoop 20.2 cluster. When the job is finished the output contains simply the values that were outputted by the mappers leading me to believe that the reducer is not being run. I would greatly appreciate it if anyone provide any insight as to why my code is producing such output. I have tried setting the outputKeyClass and outputValueClass to different things as well as the setMapOutputKeyClass and setMapOutputValueClass to different things. Currently the commented our sections of code are the algorithm that I am running, but I have changed the map and reduce methods to simply output certain values. Once again, the output from the job contains only the values that were outputted by the mapper. Here is the class I used to run the job:

import java.io.IOException;
import java.util.*;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.util.GenericOptionsParser;

public class CalculateHistogram {

    public static class HistogramMap extends Mapper<LongWritable, Text, LongWritable, Text> {

        private static final int R = 100;
        private int n = 0;

        @Override
        public void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
            if (n == 0) {
                StringTokenizer tokens = new StringTokenizer(value.toString(), ",");
                int counter = 0;
                while (tokens.hasMoreTokens()) {
                    String token = tokens.nextToken();
                    if (tokens.hasMoreTokens()) {
                        context.write(new LongWritable(-2), new Text("HI"));
                        //context.write(new LongWritable(counter), new Text(token));
                    }
                    counter++;
                    n++;
                }
            } else {
                n++;
                if (n == R) {
                    n = 0;
                }
                
            }
        }
    }

    public static class HistogramReduce extends Reducer<LongWritable, Text, LongWritable, HistogramBucket> {

        private final static int R = 10;

        public void reduce(LongWritable key, Iterator<Text> values, Context context)
                                            throws IOException, InterruptedException {
            if (key.toString().equals("-1")) {
                //context.write(key, new HistogramBucket(key));
            }
            Text t = values.next();
            for (char c : t.toString().toCharArray()) {
                if (!Character.isDigit(c) && c != '.') {
                    //context.write(key, new HistogramBucket(key));//if this isnt a numerical attribute we ignore it
                }
            }
            context.setStatus("Building Histogram");
            HistogramBucket i = new HistogramBucket(key);
            i.add(new DoubleWritable(Double.parseDouble(t.toString())));
            while (values.hasNext()) {
                for (int j = 0; j < R; j++) {
                    t = values.next();
                }
                if (!i.contains(Double.parseDouble(t.toString()))) {
                    context.setStatus("Writing a value to the Histogram");
                    i.add(new DoubleWritable(Double.parseDouble(t.toString())));
                }
            }
            
            context.write(new LongWritable(55555555), new HistogramBucket(new LongWritable(55555555)));
        }
    }

    public static void main(String[] args) throws Exception {
        Configuration conf = new Configuration();
        String[] otherArgs = new GenericOptionsParser(conf, args).getRemainingArgs();
        if (otherArgs.length != 2) {
            System.err.println("Usage: wordcount <in> <out>");
            System.exit(2);
        }

        Job job = new Job(conf, "MRDT - Generate Histogram");
        job.setJarByClass(CalculateHistogram.class);
        job.setMapperClass(HistogramMap.class);
        job.setReducerClass(HistogramReduce.class);

        //job.setOutputValueClass(HistogramBucket.class);
        
        //job.setMapOutputKeyClass(LongWritable.class);
        //job.setMapOutputValueClass(Text.class);

        FileInputFormat.addInputPath(job, new Path(otherArgs[0]));
        FileOutputFormat.setOutputPath(job, new Path(otherArgs[1]));

        System.exit(job.waitForCompletion(true) ? 0 : 1);
    }
}

推荐答案

你的reduce方法的签名错误.您的方法签名包含 Iterator<Text>.你必须传递一个 Iterable<Text>.

The signature of your reduce method is wrong. Your method signature contains Iterator<Text>. You have to pass an Iterable<Text>.

您的代码不会覆盖 Reducer 基类的 reduce 方法.因此,使用 Reducer 基类提供的默认实现.这个实现是一个标识函数.

Your code does not override the reduce method of the Reducer base class. Because of this, the default imlementation provided by the Reducer base class is used. This implementation is an identity function.

使用 @Override 注释来预测类似这样的错误.

Use the @Override annotation to anticipate errors like this one.

这篇关于首次使用 Hadoop,MapReduce Job 不运行 Reduce Phase的文章就介绍到这了,希望我们推荐的答案对大家有所帮助,也希望大家多多支持编程学习网!

本文标题为:首次使用 Hadoop,MapReduce Job 不运行 Reduce Phase

基础教程推荐