Hadoop Example 3: Merging Identical Data
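
For concreteness, here is a hypothetical sample (not from the original post), assuming comma-separated order records in which field 0 is the order ID and field 2 is a numeric amount:

1001,iphone,5999.0
1001,ipad,3299.0
1002,book,29.9

With this job, both records for order 1001 land in an output file named 1001-r-00000 and the record for order 1002 in 1002-r-00000, because MultipleOutputs uses the third argument of write() as the base name of the output file.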

import java.io.IOException;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.mapreduce.lib.output.MultipleOutputs;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.util.GenericOptionsParser;
/**
 * Put records that share the same order ID into one output file, named after the order ID.
 * @author Administrator
 */
public class MultipleOutputTest {

    static class SortMapper extends Mapper<LongWritable, Text, Text, Text> {

        @Override
        protected void map(LongWritable key, Text value, Context context)
                throws IOException, InterruptedException {
            try {
                System.out.println("Before Mapper: " + key + ", " + value);
                String line = value.toString();
                // Records are comma-separated: field 0 is the order ID, field 2 is a numeric amount.
                String[] fields = line.split(",");
                System.out.println("fields[0]: " + fields[0] + ", fields[2]="
                        + Double.parseDouble(fields[2]));
                // Emit the order ID as the key and the whole record as the value.
                context.write(new Text(fields[0]), value);
                System.out.println("After Mapper: ");
            } catch (Exception ex) {
                // Skip malformed records instead of failing the whole task.
                ex.printStackTrace();
            }
        }
    }

    static class SortReducer extends Reducer<Text, Text, NullWritable, Text> {
        private MultipleOutputs<NullWritable, Text> multipleOutputs;

        @Override
        protected void setup(Context context) throws IOException, InterruptedException {
            // Create the MultipleOutputs helper once per reduce task.
            multipleOutputs = new MultipleOutputs<NullWritable, Text>(context);
        }

        @Override
        protected void reduce(Text key, Iterable<Text> values, Context context)
                throws IOException, InterruptedException {
            System.out.println("Before Reduce: " + key);
            // Write every record of this order ID to a file whose name starts with the order ID.
            for (Text value : values) {
                multipleOutputs.write(NullWritable.get(), value, key.toString());
            }
        }

        @Override
        protected void cleanup(Context context) throws IOException, InterruptedException {
            // Flush and close all side outputs.
            multipleOutputs.close();
        }
    }

    // Driver
    public static void main(String[] args) throws Exception {
        // Load the configuration and parse generic command-line options.
        Configuration conf = new Configuration();
        String[] otherArgs = new GenericOptionsParser(conf, args).getRemainingArgs();
        // Check the command syntax.
        if (otherArgs.length != 2) {
            System.err.println("Usage: MultipleOutputTest <in> <out>");
            System.exit(2);
        }
        // Define the job.
        Job job = new Job(conf, "multiple");
        // Register the jar containing this class.
        job.setJarByClass(MultipleOutputTest.class);
        // Output key/value classes (also used as the map output classes here).
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(Text.class);
        // Register the Mapper class.
        job.setMapperClass(SortMapper.class);
        // Register the Reducer class.
        job.setReducerClass(SortReducer.class);
        // Set the input and output paths.
        FileInputFormat.setInputPaths(job, new Path(otherArgs[0]));
        FileOutputFormat.setOutputPath(job, new Path(otherArgs[1]));
        // Submit the job and exit with its status.
        System.exit(job.waitForCompletion(true) ? 0 : 1);
    }
}
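
Because all reducer output goes through MultipleOutputs, the default part-r-* files in the output directory are created but stay empty. A minimal, optional sketch (not part of the original code, and assuming the default TextOutputFormat is wanted for the per-order files) that suppresses them by registering the output format lazily in the driver:

// Optional, in main() before submitting the job; requires
// import org.apache.hadoop.mapreduce.lib.output.LazyOutputFormat;
// import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;
// With this, a part file is only created when something is actually written to it.
LazyOutputFormat.setOutputFormatClass(job, TextOutputFormat.class);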
