gitweixin
  • 首页
  • 小程序代码
    • 资讯读书
    • 工具类
    • O2O
    • 地图定位
    • 社交
    • 行业软件
    • 电商类
    • 互联网类
    • 企业类
    • UI控件
  • 开发博客
  • bug清单
  • 大数据开发
    • Hadoop
    • Spark
    • Hbase
    • Kafka

分类归档Hadoop

精品微信小程序开发门户,代码全部亲测可用

  • 首页   /  大数据开发
  • 分类归档: "Hadoop"
bug清单, Hadoop 4月 24,2020

namenode 日志提示 storage directory does not exist or is not accessible

移植hadoop到另一台机器,启动hadoop时,发现namenode没启动起来。检查配置文件没问题,ssh又免登录了,于是删除hadoop几个临时文件夹,重新格式化,果然就正常了。

作者 east
bug清单, Hadoop 3月 2,2020

hadoop DataNode服务无法启动解决(报java.net.BindException:地址已在使用)

启动DataNode服务后马上又Shutdown, 在操作系统没看到有DataNode的日志(可能是服务启动失败, 自动删除了日志文件),幸好在界面上可以查看报错的日志:

 点开报错信息, 可以看到如下信息:

 HDFS的端口为50010, 但是使用netstat -ntulp | grep 50010查看不到此端口。

分析:

原因:当应用程序崩溃后, 它会留下一个滞留的socket,以便能够提前重用socket, 当尝试绑定socket并重用它,你需要将socket的flag设置为SO_REUSEADDR,但是HDFS不是这么做的。解决办法是使用设置SO_REUSEADDR的应用程序绑定到这个端口, 然后停止这个应用程序。可以使用netcat工具实现。解决办法: 安装nc工具, 使用nc工具占用50010端口, 然后关闭nc服务, 再次启动DataNode后正常。

参考链接:http://www.nosql.se/2013/10/hadoop-hdfs-datanode-java-net-bindexception-address-already-in-use/参考文字:

    After an application crashes it might leave a lingering socket, so to reuse that socket early you need to set the socket flag SO_REUSEADDR when attempting to bind to it to be allowed to reuse it. The HDFS datanode doesn’t do that, and I didn’t want to restart the HBase regionserver (which was locking the socket with a connection it hadn’t realized was dead).The solution was to bind to the port with an application that sets SO_REUSEADDR andthen stop that application, I used netcat for that:# nc -l 50010

2017-02-17 20:54:52,250 INFO org.apache.hadoop.hdfs.server.datanode.DataNode: Shutdown complete.2017-02-17 20:54:52,251 FATAL org.apache.hadoop.hdfs.server.datanode.DataNode: Exception in secureMainjava.net.BindException: Address already in use	at sun.nio.ch.Net.bind0(Native Method)	at sun.nio.ch.Net.bind(Net.java:444)	at sun.nio.ch.Net.bind(Net.java:436)	at sun.nio.ch.ServerSocketChannelImpl.bind(ServerSocketChannelImpl.java:214)	at sun.nio.ch.ServerSocketAdaptor.bind(ServerSocketAdaptor.java:74)	at com.cloudera.io.netty.channel.socket.nio.NioServerSocketChannel.doBind(NioServerSocketChannel.java:125)	at com.cloudera.io.netty.channel.AbstractChannel$AbstractUnsafe.bind(AbstractChannel.java:475)	at com.cloudera.io.netty.channel.DefaultChannelPipeline$HeadContext.bind(DefaultChannelPipeline.java:1021)	at com.cloudera.io.netty.channel.AbstractChannelHandlerContext.invokeBind(AbstractChannelHandlerContext.java:455)	at com.cloudera.io.netty.channel.AbstractChannelHandlerContext.bind(AbstractChannelHandlerContext.java:440)	at com.cloudera.io.netty.channel.DefaultChannelPipeline.bind(DefaultChannelPipeline.java:844)	at com.cloudera.io.netty.channel.AbstractChannel.bind(AbstractChannel.java:194)	at com.cloudera.io.netty.bootstrap.AbstractBootstrap$2.run(AbstractBootstrap.java:340)	at com.cloudera.io.netty.util.concurrent.SingleThreadEventExecutor.runAllTasks(SingleThreadEventExecutor.java:380)	at com.cloudera.io.netty.channel.nio.NioEventLoop.run(NioEventLoop.java:357)	at com.cloudera.io.netty.util.concurrent.SingleThreadEventExecutor$2.run(SingleThreadEventExecutor.java:116)	at com.cloudera.io.netty.util.concurrent.DefaultThreadFactory$DefaultRunnableDecorator.run(DefaultThreadFactory.java:137)	at java.lang.Thread.run(Thread.java:745)2017-02-17 20:54:52,262 INFO org.apache.hadoop.util.ExitUtil: Exiting with status 12017-02-17 20:54:52,264 INFO org.apache.hadoop.hdfs.server.datanode.DataNode: SHUTDOWN_MSG: /************************************************************SHUTDOWN_MSG: Shutting down DataNode at cdh1/192.168.5.78
作者 east
bug清单, Hadoop 3月 2,2020

一个启动不了namenode的诡异问题

安装好hadoop后,访问ip:50070,发现只显示NateNode on ip:50070,在终端输入 jps,也没有发现namenode进程。

在百度上各种抓狂,修改host,修改配置文件,删除创建临时文件,重新格式化,还是没有作用。

发现shell执行start-all.sh 提示:

Add correct host key in /root/.ssh/known_hosts to get rid of this message.

于是联想到配置ssh免密时是否操作有问题,发现/root/.ssh多了一个
known_hosts文件,于是删除重新启动,果然好了

作者 east
Hadoop 2月 12,2019

yarn权限问题

① YARN 执行作业遇到 Unauthorized request to start container 问题:

datanode 与 namenode 之间未设置时间同步,所以引起该异常。解决方案:多个 datanode 与 namenode 进行时间同步。

②HDFS 客户端的权限错误:Permission denied

1、在系统的环境变量或 Java JVM 变量里面添加HADOOP_USER_NAME,这个值具体等于多少看自己的情况,以后会运行 Hadoop 上的 Linux 的用户名。(修改完重启 eclipse,不然可能不生效)

2、将当前系统的帐号修改为 hadoop

3、使用 HDFS 的命令行接口修改相应目录的权限,hadoop fs -chmod 777 /user,后面的/user 是要上传文件的路径,不同的情况可能不一样,比如要上传的文件路径为hdfs://namenode/user/xxx.doc,则这样的修改可以,如果要上传的文件路径为hdfs://namenode/Java/xxx.doc,则要修改的为 hadoop fs -chmod 777 /java 或者 hadoop fs – chmod 777 /,java 的那个需要先在 HDFS 里面建立 Java 目录,后面的这个是为根目录调整权限。

推荐第一种

作者 east
Hadoop 2月 12,2019

hadoop环境变量问题

bash:haoop:command not found

今天配置完 Hadoop 运行 hadoop 命令 hadoop fs -ls 时出现:

bash:haoop:command not found

后来经过检查,原来是配置文件中 HADOOP_HOME 变量引用前少了个$,修改完毕后可以正常运行了。

vi /etc/profile

export JAVA_HOME=/usr/Java/jdk1.7.0_75 export HADOOP_HOME=/opt/hadoop-2.6.0

export PATH=.:$HADOOP_HOME/bin:$JAVA_HOME/bin:$PATH

修改完后运行 source /etc/profile 让其立即生效!

在 start-all.sh 是出现异常:

Error:Cannot find configuration directory:/etc/hadoop

解决办法:

在 hadoop-env.sh 配置文件中将

export HADOOP_CONF_DIR=${HADOOP_CONF_DIR:-“etc/hadoop”}

替换成

export HADOOP_CONF_DIR=/usr/hadoop-2.6.0/etc/hadoop

修改完后运行 source hadoop-env.sh 让其立即生效!

作者 east
Hadoop 2月 12,2019

namenode常见问题

1、cannot delete name node is in safe mode

问题: 向 hdfs put 数据的时候,导致了 name node is in safe mode,然后使

用 Hadoop dfsadmin -safemode leave 后, 解除了安全模式。可是再次使用 hdfs put 或 rm

数据,仍旧导致 name node 进入安全模式。

答案:分析了一下,问题是 namenode 所在机器的硬盘满了。因此即使使用了 hadoop dfsadmin -safemode leave 之后, 仍旧不能使用 hdfs。

解决办法:

1,删除 namenode 所在机器的一些数据(本地数据) 2,结束安全模式 hadoop dfsadmin -safemode leave 3,可以正常使用 hdfs 了

2、Hadoop namenode 无法启动

最近遇到了一个问题,执行 start-all.sh 的时候发现 JPS 一下 namenode 没有启动。每次开机都得重新格式化一下 namenode 才可以

其实问题就出在tmp 文件,默认的 tmp 文件每次重新开机会被清空,与此同时 namenode

的格式化信息就会丢失

于是我们得重新配置一个 tmp 文件目录

首先在 home 目录下建立一个 hadoop_tmp 目录

sudo mkdir ~/hadoop_tmp

然后修改 Hadoop/conf 目录里面的 core-site.xml 文件,加入以下节点:

<property>

<name>hadoop.tmp.dir</name>

<value>/home/chjzh/hadoop_tmp</value>

<description>A base for other temporary directories.</description>

</property>

注意:我的用户是 chjzh 所以目录是/home/chjzh/hadoop_tmp OK 了,重新格式化Namenode

hadoop namenode –format

然后启动 hadoop

start-all.sh

执行下 JPS 命令就可以看到 NameNode 了

作者 east
Hadoop 12月 29,2018

hadoop权威指南第四版中英文代码资料合集一键下载

Hadoop权威指南从Hadoop的缘起开始,由浅入深,结合理论和实践,全方位地介绍Hadoop这一高性能处理海量数据集的理想工具。涉及的主题包括:Haddoop简介;MapReduce简介:Hadoop分布式文件系统;Hadoop的I/O、MapReduee应用程序开发:MapReduee的工作机制;MapReduee的类型和格式;MapReduce的特性;如何安装Hadoop集群,如何管理Hadoop:Pig简介;Hbase简介;ZooKeeper简介,最后还提供了丰富的案例分析。

hadoop权威指南第四版英文原版、中文版以及配套的代码,内容以Hadoop2.x为主,包含一些hadoop的stable版本的新特性,与第三版本比较增加了介绍YARN , Parquet , Flume, Crunch , Spark的章节。

百度网盘下载地址:

链接:https://pan.baidu.com/s/19yvs8qUSR-0K2tOwDbQ4yQ
提取码:vsy4

作者 east
Hadoop 12月 14,2018

hadoop实例4:分析网站日志

public class LogCleanJob extends Configured implements Tool {

public static void main(String[] args) {
Configuration conf = new Configuration();
try {
int res = ToolRunner.run(conf, new LogCleanJob(), args);
System.exit(res);
} catch (Exception e) {
e.printStackTrace();
}
}

@Override
public int run(String[] args) throws Exception {
Configuration conf = new Configuration();
//设置mapper的配置,既就是hadoop/conf/mapred-site.xml的配置信息
conf.set(“mapred.job.tracker”, “hadoop01:9001”);
final Job job = new Job(new Configuration(),
LogCleanJob.class.getSimpleName());
// 设置为可以打包运行
job.setJarByClass(LogCleanJob.class);
// FileInputFormat.setInputPaths(job, args[0]);
FileInputFormat.addInputPath(job, new Path(“hdfs://localhost:9000/user/logclean”));
job.setMapperClass(MyMapper.class);
job.setMapOutputKeyClass(LongWritable.class);
job.setMapOutputValueClass(Text.class);
job.setReducerClass(MyReducer.class);
job.setOutputKeyClass(Text.class);
job.setOutputValueClass(NullWritable.class);
/*
FileOutputFormat.setOutputPath(job, new Path(args[1]));
// 清理已存在的输出文件
FileSystem fs = FileSystem.get(new URI(args[0]), getConf());
Path outPath = new Path(args[1]);
if (fs.exists(outPath)) {
fs.delete(outPath, true);
}*/

Path outputPath = new Path(“hdfs://localhost:9000/user/logcleanOutput”);
FileSystem fs = outputPath.getFileSystem(conf);
if(fs.exists(outputPath)){
fs.delete(outputPath, true);
}

FileOutputFormat.setOutputPath(job, outputPath);

boolean success = job.waitForCompletion(true);
if (success) {
System.out.println(“Clean process success!”);
} else {
System.out.println(“Clean process failed!”);
}
return 0;
}

// 静态内部类
static class MyMapper extends Mapper {
LogParser logParser = new LogParser();
Text outputValue = new Text();

protected void map(LongWritable key, Text value,
org.apache.hadoop.mapreduce.Mapper.Context context)
throws java.io.IOException, InterruptedException {
final String[] parsed = logParser.parse(value.toString());

// step1.过滤掉静态资源访问请求
if (parsed[2].startsWith(“GET /static/”)
|| parsed[2].startsWith(“GET /uc_server”)) {
return;
}
// step2.过滤掉开头的指定字符串
if (parsed[2].startsWith(“GET /”)) {
parsed[2] = parsed[2].substring(“GET /”.length());
} else if (parsed[2].startsWith(“POST /”)) {
parsed[2] = parsed[2].substring(“POST /”.length());
}
// step3.过滤掉结尾的特定字符串
if (parsed[2].endsWith(” HTTP/1.1″)) {
parsed[2] = parsed[2].substring(0, parsed[2].length()
– ” HTTP/1.1″.length());
}
// step4.只写入前三个记录类型项
outputValue.set(parsed[0] + “\t” + parsed[1] + “\t” + parsed[2]);
context.write(key, outputValue);
}
}

// 静态内部类
static class MyReducer extends Reducer {
protected void reduce(LongWritable k2, java.lang.Iterable<Text> v2s,
org.apache.hadoop.mapreduce.Reducer.Context context)
throws java.io.IOException, InterruptedException {
for (Text v2 : v2s) {
context.write(v2, NullWritable.get());
}
};
}

/*
* 日志解析类 静态内部类
*/
static class LogParser {
public static final SimpleDateFormat FORMAT = new SimpleDateFormat(
“d/MMM/yyyy:HH:mm:ss”, Locale.ENGLISH);
public static final SimpleDateFormat dateformat1 = new SimpleDateFormat(
“yyyyMMddHHmmss”);

public static void main(String[] args) throws ParseException {
final String S1 = “27.19.74.143 – – [30/May/2013:17:38:20 +0800] \”GET /static/image/common/faq.gif HTTP/1.1\” 200 1127″;
LogParser parser = new LogParser();
final String[] array = parser.parse(S1);
System.out.println(“样例数据: ” + S1);
System.out.format(
“解析结果: ip=%s, time=%s, url=%s, status=%s, traffic=%s”,
array[0], array[1], array[2], array[3], array[4]);
}

/**
* 解析英文时间字符串
*
* @param string
* @return
* @throws ParseException
*/
private Date parseDateFormat(String string) {
Date parse = null;
try {
parse = FORMAT.parse(string);
} catch (ParseException e) {
e.printStackTrace();
}
return parse;
}

/**
* 解析日志的行记录
*
* @param line
* @return 数组含有5个元素,分别是ip、时间、url、状态、流量
*/
public String[] parse(String line) {
String ip = parseIP(line);
String time = parseTime(line);
String url = parseURL(line);
String status = parseStatus(line);
String traffic = parseTraffic(line);
return new String[] { ip, time, url, status, traffic };
}

private String parseTraffic(String line) {
final String trim = line.substring(line.lastIndexOf(“\””) + 1)
.trim();
String traffic = trim.split(” “)[1];
return traffic;
}

private String parseStatus(String line) {
final String trim = line.substring(line.lastIndexOf(“\””) + 1)
.trim();
String status = trim.split(” “)[0];
return status;
}

private String parseURL(String line) {
final int first = line.indexOf(“\””);
final int last = line.lastIndexOf(“\””);
String url = line.substring(first + 1, last);
return url;
}

private String parseTime(String line) {
final int first = line.indexOf(“[“);
final int last = line.indexOf(“+0800]”);
String time = line.substring(first + 1, last).trim();
Date date = parseDateFormat(time);
return dateformat1.format(date);
}

private String parseIP(String line) {
String ip = line.split(“- -“)[0].trim();
return ip;
}
}
}

作者 east
Hadoop 12月 14,2018

hadoop实例3:合并相同数据

import java.io.IOException;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.mapreduce.lib.output.MultipleOutputs;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.util.GenericOptionsParser;
/**
* 需要把相同订单id的记录放在一个文件中,并以订单id命名
* @author Administrator
*
*/
public class MultipleOutputTest {

static class SortMapper extends
Mapper<LongWritable, Text, Text, Text> {

@Override
protected void map(LongWritable key, Text value, Context context)
throws IOException, InterruptedException {
// TODO Auto-generated method stub
try {
System.out.println(“Before Mapper: ” + value + “, ” + value);
String line = value.toString();
String[] fields = line.split(“,”);
System.out.println(“fields[0]: ” + fields[0] + “, fields[2]=”
+ Double.parseDouble(fields[2]));
context.write(new Text(fields[0]), value);
System.out.println(“After Mapper: “);
} catch (Exception ex) {
ex.printStackTrace();
}
}
}

static class SortReducer extends Reducer<Text, Text, NullWritable, Text> {
private MultipleOutputs<NullWritable, Text> multipleOutputs;

@Override
protected void setup(org.apache.hadoop.mapreduce.Reducer.Context context)
throws IOException, InterruptedException {
// TODO Auto-generated method stub
multipleOutputs = new MultipleOutputs<NullWritable, Text>(context);
}

@Override
protected void reduce(Text key, Iterable<Text> values, Context context)
throws IOException, InterruptedException {
System.out.print(“Before Reduce: ” + key + “, ” + key);
for (Text value : values) {
multipleOutputs.write(NullWritable.get(), value, key.toString());
}
}

@Override
protected void cleanup(
org.apache.hadoop.mapreduce.Reducer.Context context)
throws IOException, InterruptedException {
// TODO Auto-generated method stub
multipleOutputs.close();
}

// 主函数
public static void main(String[] args) throws Exception {
// 获取配置参数
Configuration conf = new Configuration();
String[] otherArgs = new GenericOptionsParser(conf, args)
.getRemainingArgs();
// 检查命令语法
if (otherArgs.length != 2) {
System.err.println(“Usage: Dedup <in> <out>”);
System.exit(2);
}
// 定义作业对象
Job job = new Job(conf, “multiple”);
// 注册分布式类
job.setJarByClass(MultipleOutputTest.class);
// 注册输出格式类
job.setOutputKeyClass(Text.class);
job.setOutputValueClass(Text.class);
// 注册Mapper类
job.setMapperClass(SortMapper.class);
// 注册Reducer类
job.setReducerClass(SortReducer.class);
// 设置输入输出路径
FileInputFormat.setInputPaths(job, new Path(otherArgs[0]));
FileOutputFormat.setOutputPath(job, new Path(otherArgs[1]));
// 运行程序
System.exit(job.waitForCompletion(true) ? 0 : 1);
}

}

}

作者 east
Hadoop 12月 14,2018

hadoop实例2:分析学生成绩(最高、最低、平均分)

输入数据:

computer,huangxiaoming,85
computer,xuzheng,54
computer,huangbo,86
computer,liutao,85
computer,huanglei,99
computer,liujialing,85
computer,liuyifei,75
computer,huangdatou,48
computer,huangjiaju,88
computer,huangzitao,85
english,zhaobenshan,57
english,liuyifei,85
english,liuyifei,76
english,huangdatou,48
english,zhouqi,85
english,huangbo,85
english,huangxiaoming,96
english,huanglei,85
english,liujialing,75
algorithm,liuyifei,75
algorithm,huanglei,76
algorithm,huangjiaju,85
algorithm,liutao,85
algorithm,huangdou,42
algorithm,huangzitao,81
math,wangbaoqiang,85
math,huanglei,76
math,huangjiaju,85
math,liutao,48
math,xuzheng,54
math,huangxiaoming,85
math,liujialing,85

 

import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;
import org.apache.hadoop.io.Writable;
public class StudentBean implements Writable{
private String course;
private long maxScore;
private long minScore;
private long avgScore;
private long score;

public StudentBean(){

}

public StudentBean(String course, long score){
this.course = course;
this.score = score;
}

public String getCourse() {
return course;
}

public void setCourse(String course) {
this.course = course;
}

public long getMaxScore() {
return maxScore;
}

public void setMaxScore(long maxScore) {
this.maxScore = maxScore;
}

public long getMinScore() {
return minScore;
}

public void setMinScore(long minScore) {
this.minScore = minScore;
}

public long getAvgScore() {
return avgScore;
}

public void setAvgScore(long avgScore) {
this.avgScore = avgScore;
}

public long getScore() {
return score;
}

public void setScore(long score) {
this.score = score;
}

@Override
public void readFields(DataInput in) throws IOException {
// TODO Auto-generated method stub
try{

// course = in.readUTF();
/*maxScore = in.readLong();
minScore = in.readLong();
avgScore = in.readLong();*/
score = in.readLong();
}catch(Exception ex){
ex.printStackTrace();
}

}

@Override
public void write(DataOutput out) throws IOException {
// TODO Auto-generated method stub
//out.writeBytes(course);
// out.writeLong(maxScore);
// out.writeLong(minScore);
out.writeLong(score);
}

@Override
public String toString() {
// TODO Auto-generated method stub
return “\tmax=” + maxScore + “\tmin=” + minScore + “\tavg=” + avgScore;
}
}

 

import java.io.IOException;
/**
* 需要统计手机用户流量日志,日志内容实例:
要把同一个用户的上行流量、下行流量进行累加,并计算出综合。
*/
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Mapper.Context;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.util.GenericOptionsParser;
import test.Dedup;
import test.Dedup.Map;
import test.Dedup.Reduce;

public class StudentScore {

static class FlowCountMapper extends
Mapper<LongWritable, Text, Text, StudentBean> {

@Override
protected void map(LongWritable key, Text value, Context context)
throws IOException, InterruptedException {
// TODO Auto-generated method stub
try {
String line = value.toString();
if (null != line && line.length() > 0) {
String[] fields = line.split(“,”);
String course = fields[0];
long score = Long.parseLong(fields[2]);
System.out.println(“map: course=” + course + ” score=”
+ score);
context.write(new Text(fields[0]), new StudentBean(course,
score));
}
} catch (Exception ex) {
ex.printStackTrace();
}
}
}

static class FlowCountReducer extends
Reducer<Text, StudentBean, Text, StudentBean> {

@Override
protected void reduce(
Text key,
Iterable<StudentBean> values,
org.apache.hadoop.mapreduce.Reducer<Text, StudentBean, Text, StudentBean>.Context context)
throws IOException, InterruptedException {
try {
// TODO Auto-generated method stub
long sum_score = 0;
long maxScore = Integer.MIN_VALUE;
long minScore = Integer.MAX_VALUE;
int index = 0;
for (StudentBean bean : values) {
System.out.println(“reduce: score=” + bean.getScore());
sum_score += bean.getScore();
index++;
if (maxScore < bean.getScore()) {
maxScore = bean.getScore();
}
if (minScore > bean.getScore()) {
minScore = bean.getScore();
}
}
System.out.println(“reduce: maxScore=” + maxScore
+ ” minScore=” + minScore);
StudentBean resultBean = new StudentBean();
resultBean.setAvgScore(sum_score / index);
resultBean.setMaxScore(maxScore);
resultBean.setMinScore(minScore);
context.write(key, resultBean);
} catch (Exception ex) {
ex.printStackTrace();
}
}

// 主函数
public static void main(String[] args) throws Exception {
// 获取配置参数
Configuration conf = new Configuration();
String[] otherArgs = new GenericOptionsParser(conf, args)
.getRemainingArgs();
// 检查命令语法
if (otherArgs.length != 2) {
System.err.println(“Usage: Dedup <in> <out>”);
System.exit(2);
}
// 定义作业对象
Job job = new Job(conf, “FlowCount”);
// 注册分布式类
job.setJarByClass(StudentScore.class);
// 注册Mapper类
job.setMapperClass(FlowCountMapper.class);
// 注册Reducer类
job.setReducerClass(FlowCountReducer.class);
// 指定我们自定义的分区器
// job.setPartitionerClass(ProvincePartitioner.class);
// 同时指定相应分区数量的reducetask
// job.setNumReduceTasks(5);
// 注册输出格式类
job.setOutputKeyClass(Text.class);
job.setOutputValueClass(StudentBean.class);
// 设置输入输出路径
FileInputFormat.setInputPaths(job, new Path(otherArgs[0]));
FileOutputFormat.setOutputPath(job, new Path(otherArgs[1]));
// 运行程序
System.exit(job.waitForCompletion(true) ? 0 : 1);
}

}

}

 

作者 east
Hadoop 12月 14,2018

hadoop实例1:采集的气象数据分析最高气温

输入数据:

2014010114
2014010216
2014010317
2014010410
2014010506
2012010609
2012010732
2012010812
2012010919
2012011023
2001010116
2001010212
2001010310
2001010411
2001010529
2013010619
2013010722
2013010812
2013010929
2013011023
2008010105
2008010216
2008010337
2008010414
2008010516
2007010619
2007010712
2007010812
2007010999
2007011023
2010010114
2010010216
2010010317
2010010410
2010010506
2015010649
2015010722
2015010812
2015010999
2015011023

 

import java.io.IOException;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Mapper.Context;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.util.GenericOptionsParser;

public class Temperature {
/**
* 四个泛型类型分别代表:
* KeyIn Mapper的输入数据的Key,这里是每行文字的起始位置(0,11,…)
* ValueIn Mapper的输入数据的Value,这里是每行文字
* KeyOut Mapper的输出数据的Key,这里是每行文字中的“年份”
* ValueOut Mapper的输出数据的Value,这里是每行文字中的“气温”
*/
static class TempMapper extends Mapper<LongWritable, Text, Text, IntWritable>{
@Override
protected void map(LongWritable key, Text value, Context context)
throws IOException, InterruptedException {
// TODO Auto-generated method stub
// 打印样本: Before Mapper: 0, 2000010115
System.out.println(“Before Mapper: ” + key + “, ” + value);
if(null == value || value.getLength() == 0){
return;
}
String line = value.toString();
String year = line.substring(0, 4);
int temperature = Integer.parseInt(line.substring(8));
context.write(new Text(year), new IntWritable(temperature));
// 打印样本: After Mapper:2000, 15
System.out.println(
“======”+
“After Mapper:” + new Text(year) + “, ” + new IntWritable(temperature));

}
}

/**
* 四个泛型类型分别代表:
* KeyIn Reducer的输入数据的Key,这里是每行文字中的“年份”
* ValueIn Reducer的输入数据的Value,这里是每行文字中的“气温”
* KeyOut Reducer的输出数据的Key,这里是不重复的“年份”
* ValueOut Reducer的输出数据的Value,这里是这一年中的“最高气温”
*/
static class TempReducer extends
Reducer<Text, IntWritable, Text, IntWritable> {
@Override
public void reduce(Text key, Iterable<IntWritable> values,
Context context) throws IOException, InterruptedException {
int maxValue = Integer.MIN_VALUE;
StringBuffer sb = new StringBuffer();
for(IntWritable value: values){
maxValue = Math.max(maxValue, value.get());
sb.append(value).append(“, “);
}
// 打印样本: Before Reduce: 2000, 15, 23, 99, 12, 22,
System.out.print(“Before Reduce: ” + key + “, ” + sb.toString());
context.write(key,new IntWritable(maxValue));
// 打印样本: After Reduce: 2000, 99
System.out.println(
“======”+
“After Reduce: ” + key + “, ” + maxValue);
}
}

public static void main(String[] args) throws Exception {
Configuration hadoopConfig = new Configuration();
String[] otherArgs = new GenericOptionsParser(hadoopConfig, args).getRemainingArgs();
// 检查命令语法
if(otherArgs.length != 2){
System.err.println(“Usage: Dedup <in> <out>”);
System.exit(2);
}
//输入路径
// String dst = “hdfs://localhost:9000/user/tminput/”;
//输出路径,必须是不存在的,空文件加也不行。
// String dstOut = “hdfs://localhost:9000/user/tmoutput/”;

/* hadoopConfig.set(“fs.hdfs.impl”,
org.apache.hadoop.hdfs.DistributedFileSystem.class.getName()
);
hadoopConfig.set(“fs.file.impl”,
org.apache.hadoop.fs.LocalFileSystem.class.getName()
);*/
Job job = new Job(hadoopConfig, “Temperature”);
//如果需要打成jar运行,需要下面这句
job.setJarByClass(Temperature.class);

FileInputFormat.setInputPaths(job,new Path(otherArgs[0]));
FileOutputFormat.setOutputPath(job,new Path(otherArgs[1]));

//指定自定义的Mapper和Reducer作为两个阶段的任务处理类
job.setMapperClass(TempMapper.class);
job.setReducerClass(TempReducer.class);
//job执行作业时输入和输出文件的路径

//设置最后输出结果的Key和Value的类型
job.setOutputKeyClass(Text.class);
job.setOutputValueClass(IntWritable.class);

//执行job,直到完成
job.waitForCompletion(true);
System.out.println(“Finished”);
}

}

 

作者 east

标签

flex布局 github mysql O2O UI控件 不含后台 交流 体育 共享经济 出行 单机类 图像 地图定位 外卖 多媒体 娱乐 小程序 布局 带后台完整项目 开源项目 搜索 支付 效率 教育 旅游 日历 时钟 流量主 物流 用户系统 电商 画图 画布(canvas) 社交 签到 算命 联网 装修 解锁 评论 读书 读音 资讯 阅读 预订

官方QQ群

1群:74052405

薅羊毛交流群: 952493060

近期文章

  • android http请求带中文参数会乱码(url编码)
  • mysql主从复制
  • 修改my.ini导致启动不了服务
  • Android聊天输入框控件
  • Android直播间刷礼物动画控件
  • Android自定义交易密码框
  • Android记录文件日志工具类
  • Android常用图片操作工具类
  • Android倒计时工具类
  • Android压缩解压工具

文章归档

  • 2021年一月
  • 2020年十二月
  • 2020年十一月
  • 2020年十月
  • 2020年九月
  • 2020年八月
  • 2020年七月
  • 2020年六月
  • 2020年五月
  • 2020年四月
  • 2020年三月
  • 2020年二月
  • 2020年一月
  • 2019年七月
  • 2019年六月
  • 2019年五月
  • 2019年四月
  • 2019年三月
  • 2019年二月
  • 2019年一月
  • 2018年十二月
  • 2018年七月
  • 2018年六月

分类目录

  • Android (30)
  • bug清单 (26)
  • Fuchsia (15)
  • php (1)
  • python (2)
  • 人工智能 (1)
  • 大数据开发 (119)
    • Elasticsearch (6)
    • Flink (3)
    • Hadoop (11)
    • Hbase (8)
    • Hive (1)
    • Java (27)
    • Kafka (1)
    • solr (1)
    • Spark (42)
    • spring (8)
    • 数据仓库 (1)
    • 数据挖掘 (5)
    • 运维 (4)
  • 小游戏代码 (1)
  • 小程序代码 (111)
    • O2O (15)
    • UI控件 (3)
    • 互联网类 (17)
    • 企业类 (5)
    • 地图定位 (9)
    • 多媒体 (5)
    • 工具类 (19)
    • 电商类 (18)
    • 社交 (5)
    • 行业软件 (7)
    • 资讯读书 (7)
  • 开发博客 (5)
  • 数据库 (2)
  • 未分类 (6)

功能

  • 登录
  • 文章RSS
  • 评论RSS
  • WordPress.org

All Rights Reserved by Gitweixin.本站收集网友上传代码, 如有侵犯版权,请发邮件联系yiyuyos@gmail.com删除.