package org.myorg;
import java.io.IOException;
import java.util.StringTokenizer;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
public class myWordCount {

  public static class Map extends Mapper<LongWritable, Text, Text, IntWritable> {
    private final static IntWritable one = new IntWritable(1);
    private Text word = new Text();

    @Override
    public void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
      // Split the line on whitespace and emit (word, 1) for every token.
      String line = value.toString();
      StringTokenizer tokenizer = new StringTokenizer(line);
      while (tokenizer.hasMoreTokens()) {
        word.set(tokenizer.nextToken());
        context.write(word, one);
      }
    }
  }
  public static class Reduce extends Reducer<Text, IntWritable, Text, IntWritable> {
    @Override
    public void reduce(Text key, Iterable<IntWritable> values, Context context) throws IOException, InterruptedException {
      // The new mapreduce API hands reduce an Iterable, not an Iterator; with
      // Iterator the method would not override Reducer.reduce, and the default
      // identity reducer would run instead.
      int sum = 0;
      for (IntWritable value : values) {
        sum += value.get();
      }
      context.write(key, new IntWritable(sum));
    }
  }
  public static void main(String[] args) throws Exception {
    Configuration conf = new Configuration();
    // Submit to a specific YARN queue; adjust the queue name for your cluster.
    conf.set("mapreduce.job.queuename", "apg_p7");
    Job job = Job.getInstance(conf, "myWordCount"); // new Job(conf) is deprecated
    job.setJarByClass(myWordCount.class);
    job.setOutputKeyClass(Text.class);
    job.setOutputValueClass(IntWritable.class);
    job.setMapperClass(myWordCount.Map.class);
    // Word count sums are associative, so the reducer doubles as a combiner.
    job.setCombinerClass(myWordCount.Reduce.class);
    job.setReducerClass(myWordCount.Reduce.class);
    FileInputFormat.addInputPath(job, new Path(args[0]));
    FileOutputFormat.setOutputPath(job, new Path(args[1]));
    System.exit(job.waitForCompletion(true) ? 0 : 1);
  }
}
Compared with the old mapred API, the arguments of map and reduce have changed (reduce now receives an Iterable instead of an Iterator), as has the job setup in main. To run the program under MapReduce, the following steps need to be done.
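For comparison, here is a minimal sketch of the same reducer written against the old org.apache.hadoop.mapred API (the class name OldApiReduce is just for illustration):

package org.myorg;
import java.io.IOException;
import java.util.Iterator;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapred.MapReduceBase;
import org.apache.hadoop.mapred.OutputCollector;
import org.apache.hadoop.mapred.Reducer;
import org.apache.hadoop.mapred.Reporter;

public class OldApiReduce extends MapReduceBase implements Reducer<Text, IntWritable, Text, IntWritable> {
  public void reduce(Text key, Iterator<IntWritable> values, OutputCollector<Text, IntWritable> output, Reporter reporter) throws IOException {
    // Old API: values arrive as an Iterator, and results are written
    // through an OutputCollector rather than a Context.
    int sum = 0;
    while (values.hasNext()) {
      sum += values.next().get();
    }
    output.collect(key, new IntWritable(sum));
  }
}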
1. Put source code under this location
/project/src/org/myorg/myWordCount.java
2. Compile the Java code (`yarn classpath` prints the Hadoop jars needed on the compile classpath)
mkdir /project/class;
cd /project;
javac -classpath `yarn classpath` -d ./class ./src/org/myorg/*.java
3. Create the manifest.txt file
cd /project/class;
vim manifest.txt;
The content of manifest.txt is
Main-Class: org.myorg.myWordCount
Leave an empty line at the end of manifest.txt.
4. Generate the jar file
jar -cvmf manifest.txt myWordCount.jar org
Flag meanings:
c: Indicates that you want to create a JAR file.
v: Produces verbose output on stdout while the JAR file is being built. The verbose output tells you the name of each file as it is added to the JAR file.
m: Used to include manifest information from an existing manifest file. The format for using this option is: jar cmf existing-manifest jar-file input-file(s)
f: Indicates that you want the output to go to a JAR file rather than to stdout.
Because m precedes f in cvmf, manifest.txt is listed before myWordCount.jar in the command above.
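To double-check the archive, you can list its contents with `jar tf myWordCount.jar` (t prints the table of contents). Assuming the layout above, the listing should look roughly like:
META-INF/MANIFEST.MF
org/myorg/myWordCount.class
org/myorg/myWordCount$Map.class
org/myorg/myWordCount$Reduce.class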
5. Put input data on HDFS
mkdir input
echo "hadoop is fast" > input/file1
echo "Hadoop is amazing" > input/file2
hadoop fs -put input /user/hadoop
6. Run the program
hadoop jar myWordCount.jar /user/hadoop/input /user/hadoop/output
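Once the job finishes, the counts land in part files under the output directory. With the two sample files above, `hadoop fs -cat /user/hadoop/output/part-r-00000` should print something like the following (StringTokenizer does no case folding, so "Hadoop" and "hadoop" are counted separately, and the uppercase key sorts first):
Hadoop	1
amazing	1
fast	1
hadoop	1
is	2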
Note:
Sometimes I ran into these warnings:
14/12/05 03:59:03 WARN mapreduce.JobSubmitter: Hadoop command-line option parsing not performed. Implement the Tool interface and execute your application with ToolRunner to remedy this.
14/12/05 03:59:03 WARN mapreduce.JobSubmitter: No job jar file set. User classes may not be found. See Job or Job#setJar(String).
This happens because I previously needed to run some Hadoop Java class files directly, which required `export HADOOP_CLASSPATH=<Location of java class file>`. When running jar files instead, I need to `unset HADOOP_CLASSPATH`; once I do, the warnings are gone.
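The first warning can also be fixed the way it suggests: implement the Tool interface and launch the driver through ToolRunner, which additionally enables the standard Hadoop generic options (for example -D mapreduce.job.queuename=apg_p7 on the command line). A minimal sketch, reusing the Map and Reduce classes above (the driver name WordCountDriver is just for illustration):

package org.myorg;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;

public class WordCountDriver extends Configured implements Tool {
  @Override
  public int run(String[] args) throws Exception {
    // getConf() already contains any generic options parsed by ToolRunner.
    Job job = Job.getInstance(getConf(), "myWordCount");
    job.setJarByClass(WordCountDriver.class);
    job.setOutputKeyClass(Text.class);
    job.setOutputValueClass(IntWritable.class);
    job.setMapperClass(myWordCount.Map.class);
    job.setCombinerClass(myWordCount.Reduce.class);
    job.setReducerClass(myWordCount.Reduce.class);
    FileInputFormat.addInputPath(job, new Path(args[0]));
    FileOutputFormat.setOutputPath(job, new Path(args[1]));
    return job.waitForCompletion(true) ? 0 : 1;
  }

  public static void main(String[] args) throws Exception {
    System.exit(ToolRunner.run(new Configuration(), new WordCountDriver(), args));
  }
}

If you use this driver, remember to point the manifest's Main-Class entry at org.myorg.WordCountDriver instead.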