Hadoop Example 2: Analyzing Student Scores (Max, Min, Average)

Input data:

computer,huangxiaoming,85
computer,xuzheng,54
computer,huangbo,86
computer,liutao,85
computer,huanglei,99
computer,liujialing,85
computer,liuyifei,75
computer,huangdatou,48
computer,huangjiaju,88
computer,huangzitao,85
english,zhaobenshan,57
english,liuyifei,85
english,liuyifei,76
english,huangdatou,48
english,zhouqi,85
english,huangbo,85
english,huangxiaoming,96
english,huanglei,85
english,liujialing,75
algorithm,liuyifei,75
algorithm,huanglei,76
algorithm,huangjiaju,85
algorithm,liutao,85
algorithm,huangdou,42
algorithm,huangzitao,81
math,wangbaoqiang,85
math,huanglei,76
math,huangjiaju,85
math,liutao,48
math,xuzheng,54
math,huangxiaoming,85
math,liujialing,85

Each record has the form course,name,score. First, define a custom Writable, StudentBean, that carries a score from the mapper to the reducer and formats the per-course results:

import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;

import org.apache.hadoop.io.Writable;

public class StudentBean implements Writable {

    private String course;
    private long maxScore;
    private long minScore;
    private long avgScore;
    private long score;

    public StudentBean() {
    }

    public StudentBean(String course, long score) {
        this.course = course;
        this.score = score;
    }

    public String getCourse() {
        return course;
    }

    public void setCourse(String course) {
        this.course = course;
    }

    public long getMaxScore() {
        return maxScore;
    }

    public void setMaxScore(long maxScore) {
        this.maxScore = maxScore;
    }

    public long getMinScore() {
        return minScore;
    }

    public void setMinScore(long minScore) {
        this.minScore = minScore;
    }

    public long getAvgScore() {
        return avgScore;
    }

    public void setAvgScore(long avgScore) {
        this.avgScore = avgScore;
    }

    public long getScore() {
        return score;
    }

    public void setScore(long score) {
        this.score = score;
    }

    // Only score crosses the wire between map and reduce: the course travels
    // as the map output key, and max/min/avg are set only on the reducer side,
    // where toString() renders them. readFields must mirror write exactly.
    @Override
    public void readFields(DataInput in) throws IOException {
        score = in.readLong();
    }

    @Override
    public void write(DataOutput out) throws IOException {
        out.writeLong(score);
    }

    @Override
    public String toString() {
        return "\tmax=" + maxScore + "\tmin=" + minScore + "\tavg=" + avgScore;
    }
}
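
As a quick sanity check, the Writable contract can be exercised off-cluster with plain Java streams. A minimal sketch (the StudentBeanTest class name is mine, not part of the original post):

import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.IOException;

public class StudentBeanTest {
    public static void main(String[] args) throws IOException {
        // Serialize a bean the same way Hadoop would between map and reduce...
        StudentBean original = new StudentBean("computer", 85);
        ByteArrayOutputStream buffer = new ByteArrayOutputStream();
        original.write(new DataOutputStream(buffer));

        // ...then deserialize into a fresh instance and confirm the score survives.
        StudentBean copy = new StudentBean();
        copy.readFields(new DataInputStream(new ByteArrayInputStream(buffer.toByteArray())));
        System.out.println(copy.getScore()); // prints 85
    }
}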

Next, the job itself. The mapper parses each line and emits (course, StudentBean) pairs; the reducer then computes the maximum, minimum, and average for each course:

import java.io.IOException;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.util.GenericOptionsParser;

/**
 * Computes the highest, lowest, and average score for each course.
 * Input records have the form: course,name,score
 */
public class StudentScore {

    static class ScoreMapper extends Mapper<LongWritable, Text, Text, StudentBean> {

        @Override
        protected void map(LongWritable key, Text value, Context context)
                throws IOException, InterruptedException {
            try {
                String line = value.toString();
                if (line != null && line.length() > 0) {
                    String[] fields = line.split(",");
                    String course = fields[0];
                    long score = Long.parseLong(fields[2]);
                    // Key by course so each reduce call sees every score for one course.
                    context.write(new Text(course), new StudentBean(course, score));
                }
            } catch (Exception ex) {
                // Log and skip malformed records rather than failing the whole task.
                ex.printStackTrace();
            }
        }
    }

    static class ScoreReducer extends Reducer<Text, StudentBean, Text, StudentBean> {

        @Override
        protected void reduce(Text key, Iterable<StudentBean> values, Context context)
                throws IOException, InterruptedException {
            long sumScore = 0;
            long maxScore = Long.MIN_VALUE;
            long minScore = Long.MAX_VALUE;
            int count = 0;
            for (StudentBean bean : values) {
                long score = bean.getScore();
                sumScore += score;
                count++;
                maxScore = Math.max(maxScore, score);
                minScore = Math.min(minScore, score);
            }
            StudentBean resultBean = new StudentBean();
            resultBean.setAvgScore(sumScore / count); // integer division truncates
            resultBean.setMaxScore(maxScore);
            resultBean.setMinScore(minScore);
            context.write(key, resultBean);
        }
    }

    public static void main(String[] args) throws Exception {
        // Parse generic Hadoop options; whatever remains should be <in> and <out>.
        Configuration conf = new Configuration();
        String[] otherArgs = new GenericOptionsParser(conf, args).getRemainingArgs();
        if (otherArgs.length != 2) {
            System.err.println("Usage: StudentScore <in> <out>");
            System.exit(2);
        }
        Job job = Job.getInstance(conf, "StudentScore");
        job.setJarByClass(StudentScore.class);
        job.setMapperClass(ScoreMapper.class);
        job.setReducerClass(ScoreReducer.class);
        // Map and reduce both emit <Text, StudentBean>.
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(StudentBean.class);
        FileInputFormat.setInputPaths(job, new Path(otherArgs[0]));
        FileOutputFormat.setOutputPath(job, new Path(otherArgs[1]));
        System.exit(job.waitForCompletion(true) ? 0 : 1);
    }
}
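
To run the job, compile StudentBean and StudentScore into a jar and pass the input and output paths on the command line; the jar name and HDFS paths below are illustrative:

hadoop jar studentscore.jar StudentScore /input/scores.txt /output/scores

With a single reducer and the sample input above, the part file should look like the following (keys sorted, averages truncated by integer division; since toString() begins with a tab, the key is followed by two tab characters):

algorithm		max=85	min=42	avg=74
computer		max=99	min=48	avg=79
english		max=96	min=48	avg=76
math		max=85	min=48	avg=74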