gitweixin
  • 首页
  • 小程序代码
    • 资讯读书
    • 工具类
    • O2O
    • 地图定位
    • 社交
    • 行业软件
    • 电商类
    • 互联网类
    • 企业类
    • UI控件
  • 大数据开发
    • Hadoop
    • Spark
    • Hbase
    • Elasticsearch
    • Kafka
    • Flink
    • 数据仓库
    • 数据挖掘
    • flume
    • Kafka
    • Hive
    • shardingsphere
    • solr
  • 开发博客
    • Android
    • php
    • python
    • 运维
    • 技术架构
    • 数据库
  • 程序员网赚
  • bug清单
  • 量化投资
  • 在线查询工具
    • 去行号
    • 在线时间戳转换工具
    • 免费图片批量修改尺寸在线工具
    • SVG转JPG在线工具

hadoop实例3:合并相同数据

精品微信小程序开发门户,代码全部亲测可用

  • 首页   /  
  • 作者: east
  • ( 页面84 )
Hadoop 12月 14,2018

hadoop实例3:合并相同数据

import java.io.IOException;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.mapreduce.lib.output.MultipleOutputs;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.util.GenericOptionsParser;
/**
* 需要把相同订单id的记录放在一个文件中,并以订单id命名
* @author Administrator
*
*/
public class MultipleOutputTest {

static class SortMapper extends
Mapper<LongWritable, Text, Text, Text> {

@Override
protected void map(LongWritable key, Text value, Context context)
throws IOException, InterruptedException {
// TODO Auto-generated method stub
try {
System.out.println(“Before Mapper: ” + value + “, ” + value);
String line = value.toString();
String[] fields = line.split(“,”);
System.out.println(“fields[0]: ” + fields[0] + “, fields[2]=”
+ Double.parseDouble(fields[2]));
context.write(new Text(fields[0]), value);
System.out.println(“After Mapper: “);
} catch (Exception ex) {
ex.printStackTrace();
}
}
}

static class SortReducer extends Reducer<Text, Text, NullWritable, Text> {
private MultipleOutputs<NullWritable, Text> multipleOutputs;

@Override
protected void setup(org.apache.hadoop.mapreduce.Reducer.Context context)
throws IOException, InterruptedException {
// TODO Auto-generated method stub
multipleOutputs = new MultipleOutputs<NullWritable, Text>(context);
}

@Override
protected void reduce(Text key, Iterable<Text> values, Context context)
throws IOException, InterruptedException {
System.out.print(“Before Reduce: ” + key + “, ” + key);
for (Text value : values) {
multipleOutputs.write(NullWritable.get(), value, key.toString());
}
}

@Override
protected void cleanup(
org.apache.hadoop.mapreduce.Reducer.Context context)
throws IOException, InterruptedException {
// TODO Auto-generated method stub
multipleOutputs.close();
}

// 主函数
public static void main(String[] args) throws Exception {
// 获取配置参数
Configuration conf = new Configuration();
String[] otherArgs = new GenericOptionsParser(conf, args)
.getRemainingArgs();
// 检查命令语法
if (otherArgs.length != 2) {
System.err.println(“Usage: Dedup <in> <out>”);
System.exit(2);
}
// 定义作业对象
Job job = new Job(conf, “multiple”);
// 注册分布式类
job.setJarByClass(MultipleOutputTest.class);
// 注册输出格式类
job.setOutputKeyClass(Text.class);
job.setOutputValueClass(Text.class);
// 注册Mapper类
job.setMapperClass(SortMapper.class);
// 注册Reducer类
job.setReducerClass(SortReducer.class);
// 设置输入输出路径
FileInputFormat.setInputPaths(job, new Path(otherArgs[0]));
FileOutputFormat.setOutputPath(job, new Path(otherArgs[1]));
// 运行程序
System.exit(job.waitForCompletion(true) ? 0 : 1);
}

}

}

作者 east
Hadoop 12月 14,2018

hadoop实例2:分析学生成绩(最高、最低、平均分)

输入数据:

computer,huangxiaoming,85
computer,xuzheng,54
computer,huangbo,86
computer,liutao,85
computer,huanglei,99
computer,liujialing,85
computer,liuyifei,75
computer,huangdatou,48
computer,huangjiaju,88
computer,huangzitao,85
english,zhaobenshan,57
english,liuyifei,85
english,liuyifei,76
english,huangdatou,48
english,zhouqi,85
english,huangbo,85
english,huangxiaoming,96
english,huanglei,85
english,liujialing,75
algorithm,liuyifei,75
algorithm,huanglei,76
algorithm,huangjiaju,85
algorithm,liutao,85
algorithm,huangdou,42
algorithm,huangzitao,81
math,wangbaoqiang,85
math,huanglei,76
math,huangjiaju,85
math,liutao,48
math,xuzheng,54
math,huangxiaoming,85
math,liujialing,85

 

import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;
import org.apache.hadoop.io.Writable;
public class StudentBean implements Writable{
private String course;
private long maxScore;
private long minScore;
private long avgScore;
private long score;

public StudentBean(){

}

public StudentBean(String course, long score){
this.course = course;
this.score = score;
}

public String getCourse() {
return course;
}

public void setCourse(String course) {
this.course = course;
}

public long getMaxScore() {
return maxScore;
}

public void setMaxScore(long maxScore) {
this.maxScore = maxScore;
}

public long getMinScore() {
return minScore;
}

public void setMinScore(long minScore) {
this.minScore = minScore;
}

public long getAvgScore() {
return avgScore;
}

public void setAvgScore(long avgScore) {
this.avgScore = avgScore;
}

public long getScore() {
return score;
}

public void setScore(long score) {
this.score = score;
}

@Override
public void readFields(DataInput in) throws IOException {
// TODO Auto-generated method stub
try{

// course = in.readUTF();
/*maxScore = in.readLong();
minScore = in.readLong();
avgScore = in.readLong();*/
score = in.readLong();
}catch(Exception ex){
ex.printStackTrace();
}

}

@Override
public void write(DataOutput out) throws IOException {
// TODO Auto-generated method stub
//out.writeBytes(course);
// out.writeLong(maxScore);
// out.writeLong(minScore);
out.writeLong(score);
}

@Override
public String toString() {
// TODO Auto-generated method stub
return “\tmax=” + maxScore + “\tmin=” + minScore + “\tavg=” + avgScore;
}
}

 

import java.io.IOException;
/**
* Computes the highest, lowest and average score of every course.
* Each input line has the form "course,student,score".
*/
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Mapper.Context;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.util.GenericOptionsParser;
import test.Dedup;
import test.Dedup.Map;
import test.Dedup.Reduce;

public class StudentScore {

static class FlowCountMapper extends
Mapper<LongWritable, Text, Text, StudentBean> {

@Override
protected void map(LongWritable key, Text value, Context context)
throws IOException, InterruptedException {
// TODO Auto-generated method stub
try {
String line = value.toString();
if (null != line && line.length() > 0) {
String[] fields = line.split(“,”);
String course = fields[0];
long score = Long.parseLong(fields[2]);
System.out.println(“map: course=” + course + ” score=”
+ score);
context.write(new Text(fields[0]), new StudentBean(course,
score));
}
} catch (Exception ex) {
ex.printStackTrace();
}
}
}

static class FlowCountReducer extends
Reducer<Text, StudentBean, Text, StudentBean> {

@Override
protected void reduce(
Text key,
Iterable<StudentBean> values,
org.apache.hadoop.mapreduce.Reducer<Text, StudentBean, Text, StudentBean>.Context context)
throws IOException, InterruptedException {
try {
// TODO Auto-generated method stub
long sum_score = 0;
long maxScore = Integer.MIN_VALUE;
long minScore = Integer.MAX_VALUE;
int index = 0;
for (StudentBean bean : values) {
System.out.println(“reduce: score=” + bean.getScore());
sum_score += bean.getScore();
index++;
if (maxScore < bean.getScore()) {
maxScore = bean.getScore();
}
if (minScore > bean.getScore()) {
minScore = bean.getScore();
}
}
System.out.println(“reduce: maxScore=” + maxScore
+ ” minScore=” + minScore);
StudentBean resultBean = new StudentBean();
resultBean.setAvgScore(sum_score / index);
resultBean.setMaxScore(maxScore);
resultBean.setMinScore(minScore);
context.write(key, resultBean);
} catch (Exception ex) {
ex.printStackTrace();
}
}

// 主函数
public static void main(String[] args) throws Exception {
// 获取配置参数
Configuration conf = new Configuration();
String[] otherArgs = new GenericOptionsParser(conf, args)
.getRemainingArgs();
// 检查命令语法
if (otherArgs.length != 2) {
System.err.println(“Usage: Dedup <in> <out>”);
System.exit(2);
}
// 定义作业对象
Job job = new Job(conf, “FlowCount”);
// 注册分布式类
job.setJarByClass(StudentScore.class);
// 注册Mapper类
job.setMapperClass(FlowCountMapper.class);
// 注册Reducer类
job.setReducerClass(FlowCountReducer.class);
// 指定我们自定义的分区器
// job.setPartitionerClass(ProvincePartitioner.class);
// 同时指定相应分区数量的reducetask
// job.setNumReduceTasks(5);
// 注册输出格式类
job.setOutputKeyClass(Text.class);
job.setOutputValueClass(StudentBean.class);
// 设置输入输出路径
FileInputFormat.setInputPaths(job, new Path(otherArgs[0]));
FileOutputFormat.setOutputPath(job, new Path(otherArgs[1]));
// 运行程序
System.exit(job.waitForCompletion(true) ? 0 : 1);
}

}

}

 

作者 east
Hadoop 12月 14,2018

hadoop实例1:采集的气象数据分析最高气温

输入数据:

2014010114
2014010216
2014010317
2014010410
2014010506
2012010609
2012010732
2012010812
2012010919
2012011023
2001010116
2001010212
2001010310
2001010411
2001010529
2013010619
2013010722
2013010812
2013010929
2013011023
2008010105
2008010216
2008010337
2008010414
2008010516
2007010619
2007010712
2007010812
2007010999
2007011023
2010010114
2010010216
2010010317
2010010410
2010010506
2015010649
2015010722
2015010812
2015010999
2015011023

 

import java.io.IOException;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Mapper.Context;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.util.GenericOptionsParser;

public class Temperature {
/**
* 四个泛型类型分别代表:
* KeyIn Mapper的输入数据的Key,这里是每行文字的起始位置(0,11,…)
* ValueIn Mapper的输入数据的Value,这里是每行文字
* KeyOut Mapper的输出数据的Key,这里是每行文字中的“年份”
* ValueOut Mapper的输出数据的Value,这里是每行文字中的“气温”
*/
static class TempMapper extends Mapper<LongWritable, Text, Text, IntWritable>{
@Override
protected void map(LongWritable key, Text value, Context context)
throws IOException, InterruptedException {
// TODO Auto-generated method stub
// 打印样本: Before Mapper: 0, 2000010115
System.out.println(“Before Mapper: ” + key + “, ” + value);
if(null == value || value.getLength() == 0){
return;
}
String line = value.toString();
String year = line.substring(0, 4);
int temperature = Integer.parseInt(line.substring(8));
context.write(new Text(year), new IntWritable(temperature));
// 打印样本: After Mapper:2000, 15
System.out.println(
“======”+
“After Mapper:” + new Text(year) + “, ” + new IntWritable(temperature));

}
}

/**
* 四个泛型类型分别代表:
* KeyIn Reducer的输入数据的Key,这里是每行文字中的“年份”
* ValueIn Reducer的输入数据的Value,这里是每行文字中的“气温”
* KeyOut Reducer的输出数据的Key,这里是不重复的“年份”
* ValueOut Reducer的输出数据的Value,这里是这一年中的“最高气温”
*/
static class TempReducer extends
Reducer<Text, IntWritable, Text, IntWritable> {
@Override
public void reduce(Text key, Iterable<IntWritable> values,
Context context) throws IOException, InterruptedException {
int maxValue = Integer.MIN_VALUE;
StringBuffer sb = new StringBuffer();
for(IntWritable value: values){
maxValue = Math.max(maxValue, value.get());
sb.append(value).append(“, “);
}
// 打印样本: Before Reduce: 2000, 15, 23, 99, 12, 22,
System.out.print(“Before Reduce: ” + key + “, ” + sb.toString());
context.write(key,new IntWritable(maxValue));
// 打印样本: After Reduce: 2000, 99
System.out.println(
“======”+
“After Reduce: ” + key + “, ” + maxValue);
}
}

public static void main(String[] args) throws Exception {
Configuration hadoopConfig = new Configuration();
String[] otherArgs = new GenericOptionsParser(hadoopConfig, args).getRemainingArgs();
// 检查命令语法
if(otherArgs.length != 2){
System.err.println(“Usage: Dedup <in> <out>”);
System.exit(2);
}
//输入路径
// String dst = “hdfs://localhost:9000/user/tminput/”;
//输出路径,必须是不存在的,空文件加也不行。
// String dstOut = “hdfs://localhost:9000/user/tmoutput/”;

/* hadoopConfig.set(“fs.hdfs.impl”,
org.apache.hadoop.hdfs.DistributedFileSystem.class.getName()
);
hadoopConfig.set(“fs.file.impl”,
org.apache.hadoop.fs.LocalFileSystem.class.getName()
);*/
Job job = new Job(hadoopConfig, “Temperature”);
//如果需要打成jar运行,需要下面这句
job.setJarByClass(Temperature.class);

FileInputFormat.setInputPaths(job,new Path(otherArgs[0]));
FileOutputFormat.setOutputPath(job,new Path(otherArgs[1]));

//指定自定义的Mapper和Reducer作为两个阶段的任务处理类
job.setMapperClass(TempMapper.class);
job.setReducerClass(TempReducer.class);
//job执行作业时输入和输出文件的路径

//设置最后输出结果的Key和Value的类型
job.setOutputKeyClass(Text.class);
job.setOutputValueClass(IntWritable.class);

//执行job,直到完成
job.waitForCompletion(true);
System.out.println(“Finished”);
}

}

 

作者 east
Spark 12月 14,2018

spark实例8:读取mysql数据

/**
 * Reads one MySQL table through Spark's JDBC data source and prints it.
 *
 * Created by Administrator on 2017/11/6.
 */
public class SparkMysql {
    public static org.apache.log4j.Logger logger = org.apache.log4j.Logger.getLogger(SparkMysql.class);

    /**
     * Starts a local Spark context, dumps the table and always stops the context.
     *
     * @param args unused
     */
    public static void main(String[] args) {
        JavaSparkContext sparkContext = new JavaSparkContext(new SparkConf().setAppName("SparkMysql").setMaster("local[5]"));
        try {
            SQLContext sqlContext = new SQLContext(sparkContext);
            // Read the MySQL data.
            readMySQL(sqlContext);
        } finally {
            // Stop the context even when the read fails, so local executors are released.
            sparkContext.stop();
        }
    }

    /**
     * Loads the {@code hb_links} table over JDBC and shows its rows.
     *
     * @param sqlContext context used to create the JDBC reader
     */
    private static void readMySQL(SQLContext sqlContext) {
        // URL format: jdbc:mysql://host:port/database
        String url = "jdbc:mysql://localhost:3306/test";
        // Table to read.
        String table = "hb_links";
        // Connection settings: user, password and JDBC driver class.
        // NOTE(review): credentials are hard-coded; move them to external configuration.
        Properties connectionProperties = new Properties();
        connectionProperties.put("user", "root");
        connectionProperties.put("password", "168168");
        connectionProperties.put("driver", "com.mysql.jdbc.Driver");

        System.out.println("读取rangfei数据库中的hb_links表内容");
        // Read every column of every row of the table.
        Dataset<Row> jdbcDF = sqlContext.read().jdbc(url, table, connectionProperties).select("*");

        // Print the rows.
        jdbcDF.show();
    }
}
作者 east
Spark 12月 14,2018

spark实例7:使用Spark SQL来分析网络日志

/**
  * One parsed line of an Apache access log.
  *
  * @param ipAddress    client IP address
  * @param clientId     client identifier
  * @param userId       user identifier
  * @param serverTime   server timestamp, e.g. 07/Mar/2004:16:05:49 -0800
  * @param method       request method
  * @param endpoint     requested resource
  * @param protocol     request protocol
  * @param responseCode response status, e.g. 200 or 401
  * @param contentSize  size of the returned content
  */
case class ApacheAccessLog(
                            ipAddress: String,
                            clientId: String,
                            userId: String,
                            serverTime: String,
                            method: String,
                            endpoint: String,
                            protocol: String,
                            responseCode: Int,
                            contentSize: Long
                          )

object ApacheAccessLog {
  // Matches lines such as:
  // 64.242.88.10 - - [07/Mar/2004:16:05:49 -0800]
  // "GET /twiki/bin/edit/Main/Double_bounce_sender?topicparent=Main.ConfigurationVariables HTTP/1.1"
  // 401 12846
  // The timezone sign is [+-]; the earlier [+|-] also accepted a literal '|'.
  val PARTTERN =
  """^(\S+) (\S+) (\S+) \[([\w:/]+\s[+-]\d{4})\] "(\S+) (\S+) (\S+)" (\d{3}) (\d+)""".r

  /**
    * Tells whether `log` can be parsed by [[parseLogLine]].
    *
    * @param log raw log line; `null` is reported as invalid
    */
  def isValidatelogLine(log: String): Boolean =
    log != null && PARTTERN.findFirstMatchIn(log).isDefined

  /**
    * Parses one raw log line.
    *
    * @param log raw log line
    * @return the parsed record
    * @throws RuntimeException if the line does not match the expected format
    */
  def parseLogLine(log: String): ApacheAccessLog = {
    val matched = if (log == null) None else PARTTERN.findFirstMatchIn(log)
    matched match {
      case Some(m) =>
        ApacheAccessLog(
          m.group(1),
          m.group(2),
          m.group(3),
          m.group(4),
          m.group(5),
          m.group(6),
          m.group(7),
          m.group(8).toInt,
          m.group(9).toLong
        )
      case None =>
        throw new RuntimeException("Cannot parse log line: " + log)
    }
  }
}


import org.apache.spark.sql.{DataFrame, SQLContext}
import org.apache.spark.{SparkContext, SparkConf}

/**
  * Analyses an Apache access log with Spark SQL: content-size statistics,
  * response-code counts, frequent client IPs and the most requested endpoints.
  *
  * Created by Administrator on 2017/4/25.
  */
object LogAnalysis {
  def main(args: Array[String]): Unit = {
    val conf = new SparkConf()
      .setMaster("local[*]")
      .setAppName("log-analysis-sparksql")
    val sc = SparkContext.getOrCreate(conf)
    try {
      val sqlContext = new SQLContext(sc)
      import sqlContext.implicits._ // required by toDF() and by map() on a DataFrame below

      // Parse the valid log lines and turn them into a DataFrame.
      val path = "E:\\newcode\\MyFirstProject\\data\\test.log"
      val rdd = sc.textFile(path)
      val apacheAccessDataFrame = rdd
        .filter(line => ApacheAccessLog.isValidatelogLine(line))
        .map(line => {
          ApacheAccessLog.parseLogLine(line)
        }).cache().toDF()

      // Register the temp table queried by all statements below.
      apacheAccessDataFrame.registerTempTable("log_analysis_temp_table")
      sqlContext.sql("select * from log_analysis_temp_table limit 1").show()

      // Requirement 1: average, maximum and minimum of contentSize.
      val resultDataFrame1 = sqlContext.sql(

      """
        |SELECT
        |AVG(contentSize) as avg_contentSize,
        |MAX(contentSize) as max_contentSize,
        |MIN(contentSize) as min_contentSize
        |FROM log_analysis_temp_table
      """.stripMargin
      )
      resultDataFrame1.show()

      // Save the statistics as a text file in a timestamped directory.
      val resultRdd = resultDataFrame1.map(row => {
        val avgSize = row.getAs[Double]("avg_contentSize")
        val minSize = row.getAs[Long]("min_contentSize")
        val maxSize = row.getAs[Long]("max_contentSize")
        (avgSize, minSize, maxSize)
      })
      resultRdd.rdd.saveAsTextFile(s"E:/newcode/MyFirstProject/data/output/sql_${System.currentTimeMillis()}")


      // Requirement 2: number of occurrences of every response code.
      val resultDataFrame2 = sqlContext.sql(
        """
          |SELECT
          |responseCode AS code,
          |COUNT(1) AS count
          |FROM log_analysis_temp_table
          |GROUP BY responseCode
        """.stripMargin
      )
      resultDataFrame2.show()
      resultDataFrame2.repartition(1).write.format("com.databricks.spark.csv").save("E:\\newcode\\MyFirstProject\\data\\output\\responseCode")


      // Requirement 3: IPs with more than N requests, excluding the black list.
      val blackIP = Array("200-55-104-193.ds1.prima.net.ar", "10.0.0.153", "208-38-57-205.ip.cal.radiant.net")
      val N = 10
      val resultDataFrame3 = sqlContext.sql(
        s"""
           |SELECT
           |ipAddress AS ip,
           |COUNT(1) AS count
           |FROM log_analysis_temp_table
           |WHERE not(ipAddress in(${blackIP.map(ip => s"'${ip}'").mkString(",")}))
           |GROUP BY ipAddress
           |HAVING count>${N}
       """.stripMargin)
      resultDataFrame3.show()


      // Requirement 4: the k most requested endpoints.
      val k = 50
      val resultDataFrame4 = sqlContext.sql(
        s"""
           |SELECT
           |  t.endpoint,
           |  t.count
           |FROM(
           |SELECT
           |  endpoint,
           |  COUNT(1) AS count
           |FROM log_analysis_temp_table
           |GROUP BY endpoint) t
           |ORDER BY t.count DESC
           |limit ${k}
        """.stripMargin)
      resultDataFrame4.show()
      resultDataFrame4.repartition(1).write.format("com.databricks.spark.csv").save("E:\\newcode\\MyFirstProject\\data\\output\\maxendpoint")
    } finally {
      // Release the context even when one of the queries or writes fails.
      sc.stop()
    }
  }
}
作者 east
Spark 12月 14,2018

spark实例6:kafka生产者和消费者实例

mvn配置:

<dependency>
      <groupId>org.apache.spark</groupId>
      <artifactId>spark-streaming-kafka-0-10_2.11</artifactId>
      <version>2.4.0</version>
  </dependency>
<dependency>
    <groupId>com.alibaba</groupId>
    <artifactId>fastjson</artifactId>
    <version>1.2.54</version>
</dependency>
<dependency>
    <groupId>redis.clients</groupId>
    <artifactId>jedis</artifactId>
    <version>2.9.0</version>
</dependency>


生产者代码:
import java.util.Properties
import com.alibaba.fastjson.JSONObject
import kafka.producer.{KeyedMessage, Producer, ProducerConfig}

import scala.util.Random

/**
  * Endlessly publishes random order events, as JSON with an `id` and a
  * `price`, to the Kafka topic `order`.
  */
object OrderProducer {
  def main(args: Array[String]): Unit = {

    // Kafka settings
    val topic = "order"
    val brokers = "192.168.0.219:9092"
    val props = new Properties()
    props.put("metadata.broker.list", brokers)
    props.put("serializer.class", "kafka.serializer.StringEncoder")
    val kafkaConfig = new ProducerConfig(props)

    val producer = new Producer[String, String](kafkaConfig)
    try {
      while (true) {
        // Random product id below 10
        val id = Random.nextInt(10)
        val event = new JSONObject()
        event.put("id", id)
        // Random deal price below 10000
        event.put("price", Random.nextInt(10000))

        producer.send(new KeyedMessage[String, String](topic, event.toString))
        println("Message sent: " + event)

        // Pause for a random time of under 100 ms
        Thread.sleep(Random.nextInt(100))
      }
    } finally {
      // Reached when send/sleep throws (e.g. on interrupt): release the producer.
      producer.close()
    }
  }
}


消费者代码:

import com.alibaba.fastjson.JSON
import kafka.click.RedisClient
import kafka.serializer.StringDecoder
import org.apache.kafka.common.serialization.StringDeserializer
import org.apache.spark.SparkConf
import org.apache.spark.streaming.kafka010.KafkaUtils
import org.apache.kafka.clients.consumer.ConsumerRecord
import org.apache.kafka.common.serialization.StringDeserializer
import org.apache.spark.streaming.{Seconds, StreamingContext}
import org.apache.spark.streaming.kafka010._
import org.apache.spark.streaming.kafka010.LocationStrategies.PreferConsistent
import org.apache.spark.streaming.kafka010.ConsumerStrategies.Subscribe

/**
  * Spark Streaming job that consumes order events from the Kafka topic `order`,
  * aggregates count and total price per product id for every batch and stores
  * the totals in Redis.
  *
  * Created by Administrator on 2017/8/30.
  */
object OrderConsumer {
  // Redis database index
  val dbIndex = 0
  // Hash: accumulated sales per product
  val orderTotalKey = "app::order::total"
  // Hash: sales per product of the latest batch
  val oneMinTotalKey = "app::order::product"
  // Counter: accumulated sales over all products
  val totalKey = "app::order::all"


  def main(args: Array[String]): Unit = {

    // Streaming context with a batch interval of one second
    val conf = new SparkConf().setMaster("local").setAppName("UserClickCountStat")
    val ssc = new StreamingContext(conf, Seconds(1))

    // Kafka settings
    val topics = Set("order")
    val kafkaParams = Map[String, Object](
      "bootstrap.servers" -> "192.168.0.219:9092,192.168.0.220:9092",
      "key.deserializer" -> classOf[StringDeserializer],
      "value.deserializer" -> classOf[StringDeserializer],
      "group.id" -> "use_a_separate_group_id_for_each_stream",
      "auto.offset.reset" -> "latest",
      "enable.auto.commit" -> (false: java.lang.Boolean)
    )

    val kafkaStream = KafkaUtils.createDirectStream[String, String](
      ssc,
      PreferConsistent,
      Subscribe[String, String](topics, kafkaParams)
    )

    // Parse every record value as JSON
    val events = kafkaStream.map(record => JSON.parseObject(record.value()))

    // Per product id: number of orders and sum of prices. reduceByKey combines
    // on the map side instead of materialising every price of a key.
    val orders = events
      .map(event => (event.getString("id"), (1, event.getLong("price").longValue())))
      .reduceByKey((left, right) => (left._1 + right._1, left._2 + right._2))
      .map { case (id, (count, total)) => (id, count, total) }

    orders.foreachRDD { rdd =>
      rdd.foreachPartition { partition =>
        if (partition.nonEmpty) {
          // One connection per partition, always handed back to the pool.
          val jedis = RedisClient.pool.getResource
          try {
            jedis.auth("123456")
            jedis.select(dbIndex)
            partition.foreach { case (id, count, total) =>
              println("id=" + id + " count=" + count + " price=" + total)

              // Accumulated sales of this product
              jedis.hincrBy(orderTotalKey, id, total)
              // Sales of this product in the latest batch
              jedis.hset(oneMinTotalKey, id.toString, total.toString)
              // Accumulated sales over all products
              jedis.incrBy(totalKey, total)
            }
          } finally {
            RedisClient.pool.returnResource(jedis)
          }
        }
      }
    }


    ssc.start()
    ssc.awaitTermination()
  }

}


作者 east
Spark 12月 14,2018

spark实例5:找出词频最高的前K个词

输入数据:

Hello World Bye World
Hello Hadoop Bye Hadoop
Bye Hadoop Hello Hadoop

 

输出结果:

(Hadoop,4)
(Bye,3)
(Hello,3)
(World,2)

 

/**
  * Counts how often every word occurs in a text file and prints the four
  * most frequent words together with their counts.
  */
object TopNBasic {
  def main(args: Array[String]): Unit = {
    val sparkConf = new SparkConf()
      .setAppName(TopNBasic.getClass.getSimpleName)
      .setMaster("local")
    val context = new SparkContext(sparkConf)

    // Word frequencies: split on single spaces, pair with 1, sum per word.
    val frequencies = context
      .textFile("E:\\newcode\\MyFirstProject\\data\\topn.txt")
      .flatMap(_.split(" "))
      .map(token => token -> 1)
      .reduceByKey(_ + _)

    // Highest counts first; keep the leading four entries.
    val mostFrequent = frequencies.sortBy(pair => pair._2, ascending = false).take(4)

    println("wordCounts: ")
    for (entry <- mostFrequent) {
      println(entry)
    }
  }
}
作者 east
Spark 12月 14,2018

spark实例4:求中位数

输入数据:

1 2 3 4 5 6 8 9 11 12 34

 

输出结果:

6

/**
  * Finds the median of the integers in a text file by bucketing.
  *
  * Every number goes into bucket `value / 4`. The per-bucket counts are scanned
  * in key order until the bucket holding the median position is found, and only
  * that bucket is then ordered. For an even number of values the lower of the
  * two middle values is printed.
  */

object Median {

  def main (args: Array[String]) {

    val conf = new SparkConf().setAppName("Median").setMaster("local")
    val sc = new SparkContext(conf)
    sc.setLogLevel("ERROR")

    try {
      val data = sc.textFile("E:\\newcode\\MyFirstProject\\data\\Median.txt")

      // Empty tokens (blank lines, repeated spaces) would make toInt throw.
      val words = data.flatMap(_.split(" ")).filter(_.nonEmpty).map(word => word.toInt)
      words.collect().foreach(println)
      val number = words.map(word => (word / 4, word)).sortByKey()
      number.collect().foreach(println)
      val pariCount = words.map(word => (word / 4, 1)).reduceByKey(_ + _).sortByKey()
      // Collected once; the previous code ran a Spark job for every lookup and
      // assumed bucket keys were exactly 0..n-1, which fails on gaps.
      val bucketCounts = pariCount.collect()
      bucketCounts.foreach(println)

      val count = words.count().toInt
      if (count == 0) {
        println("no numbers in input")
      } else {
        // 1-based position of the median among the ordered values.
        val mid = if (count % 2 != 0) count / 2 + 1 else count / 2

        var seenBefore = 0 // values in buckets before the median bucket
        var index = 0      // key of the bucket holding the median
        var foundIt = false
        val buckets = bucketCounts.iterator
        while (!foundIt && buckets.hasNext) {
          val (bucket, size) = buckets.next()
          println(size.toString)
          if (seenBefore + size >= mid) {
            index = bucket
            foundIt = true
          } else {
            seenBefore += size
          }
        }

        // Position of the median inside its bucket (1-based).
        val tonginneroffset = mid - seenBefore
        val median = number.filter(_._1 == index).takeOrdered(tonginneroffset)
        println(median(tonginneroffset - 1)._2)
      }
    } finally {
      sc.stop()
    }

  }
}
作者 east
Spark 12月 14,2018

spark实例3:倒排索引

   输入数据文件格式:
   cx1:a,b,c,d,e,f
   cx2:c,d,e,f
   cx3:a,b,c,f
   cx4:a,b,c,d,e,f
   cx5:a,b,e,f
   cx6:a,b,c,d
   cx7:a,b,c,f
   cx8:d,e,f
   cx9:b,c,d,e,f
  
  输出结果:
  a|cx1,cx3,cx4,cx5,cx6,cx7
  b|cx1,cx3,cx4,cx5,cx6,cx7,cx9
  c|cx1,cx2,cx3,cx4,cx6,cx7,cx9
  d|cx1,cx2,cx4,cx6,cx8,cx9
  e|cx1,cx2,cx4,cx5,cx8,cx9
  f|cx1,cx2,cx3,cx4,cx5,cx7,cx8,cx9

import scala.io.Source
/**
  * Builds an inverted index from lines of the form `doc:term1,term2,...` and
  * prints one `term|doc1,doc2,...` line per term, ordered by term.
  */
object InvertedIndex {

  def main(args: Array[String]) {
    val conf = new SparkConf().setAppName(InvertedIndex.getClass.getSimpleName)
    conf.setMaster("local")
    val sc = new SparkContext(conf)

    try {
      // Read the file on the driver and close the handle once it is in memory.
      val input = Source.fromFile("D:\\java\\spark\\data\\invertIndex.txt")
      val source = try input.getLines.toArray finally input.close()

      val cxRDD0 = sc.parallelize(source)

      val rdd2 = cxRDD0
        .map(_.trim)
        // Lines without a colon have no term list and would fail on parts(1).
        .filter(_.contains(":"))
        .flatMap { entry =>
          // Split on the colon: parts(0) is the document, parts(1) its terms.
          val parts = entry.split(":", -1)
          // One (term, document) pair per comma-separated term.
          parts(1).split(",", -1).map(term => (term, parts(0)))
        }
      rdd2.collect().foreach {println}

      rdd2.groupByKey() // documents per term
        .sortBy(_._1, true) // order by term
        .collect() // bring to the driver so the lines print in sorted order
        .foreach(x => println(s"${x._1}|${x._2.mkString(",")}"))
    } finally {
      sc.stop()
    }

  }
}
作者 east
Hbase 12月 14,2018

利用JavaAPI来操作Hbase

例子采用的是完全分布式集群,不是hbase自带的zookeeper,是独立的zookeeper

mvn的依赖如下:

<dependency>
    <groupId>org.apache.hbase</groupId>
    <artifactId>hbase-client</artifactId>
    <version>1.4.8</version>
</dependency>

<dependency>
    <groupId>org.apache.hbase</groupId>
    <artifactId>hbase-server</artifactId>
    <version>1.4.8</version>
</dependency>
在hbase中创建表、插入数据、查询数据等操作

import java.text.SimpleDateFormat
import java.util

import hbase.TestHbaeJavaApi.conf
import org.apache.hadoop.hbase.{HBaseConfiguration, TableName}
import org.apache.hadoop.hbase.client._
import org.apache.hadoop.hbase.filter.CompareFilter.CompareOp
import org.apache.hadoop.hbase.filter.{FilterList, SingleColumnValueFilter}
import org.apache.hadoop.hbase.util.Bytes

/**
  * 利用JavaAPI来操作Hbase
  */
/**
  * Small helper around the HBase Java client: builds Put/Get/Scan requests
  * for a single column family and demonstrates them in `main`.
  */
object HBaseTool {
  val zkQuorum = "192.168.0.219"
  val port = "2181"
  val table = "test"
  val cf = "cf1"
  val config = HBaseConfiguration.create()
  // Reuse the constants above so the connection settings live in one place.
  config.set("hbase.zookeeper.property.clientPort", port)
  config.set("hbase.zookeeper.quorum", zkQuorum)
  // The original value used port 600000, which is not a valid TCP port
  // (max 65535); 60000 is the presumable intent.
  // NOTE(review): confirm the master port actually used by this cluster.
  config.set("hbase.master", "192.168.0.219:60000")

  /**
    * Builds a Put that writes every (qualifier, value) pair into one row.
    *
    * @param rowKey row key of the target row
    * @param cf     column family to write into (defaults to the object's `cf`)
    * @param kv     (qualifier, value) pairs to store
    * @return the populated Put
    */
  def putData(rowKey:String, cf:String = cf, kv:Seq[(String,String)]): Put ={
    val put = new Put(Bytes.toBytes(rowKey))
    kv.foreach { case (qualifier, value) =>
      put.addColumn(Bytes.toBytes(cf), Bytes.toBytes(qualifier), Bytes.toBytes(value))
    }
    put
  }

  /**
    * Builds a Get for one row, optionally restricted to a single column of
    * the object's column family.
    *
    * @param rowKey    row key to read
    * @param qualifier column qualifier to fetch, or null for the whole row
    * @return the configured Get
    */
  def getData(rowKey:String, qualifier:String =null): Get={
    val get = new Get(Bytes.toBytes(rowKey))
    if (qualifier != null) {
      get.addColumn(Bytes.toBytes(cf), Bytes.toBytes(qualifier))
    }
    get
  }

  /**
    * Builds a Scan over a row-key range.
    *
    * @param startRow first row key of the range
    * @param stopRow  stop row key of the range
    * @param filters  optional qualifier -> value equality filters (null for none)
    * @param columns  optional qualifiers to return (null for all columns)
    * @return the configured Scan
    */
  def getScan(startRow:String, stopRow:String, filters:Map[String,String]= null, columns:Seq[String]= null): Scan= {
    val scan = new Scan()
      .setStartRow(Bytes.toBytes(startRow))
      .setStopRow(Bytes.toBytes(stopRow))

    if (filters != null) {
      val filterList = getFilters(filters)
      // Only attach the filter list when it actually contains filters.
      if (filterList.getFilters.size() > 0) {
        scan.setFilter(filterList)
      }
    }

    if (columns != null) {
      columns.foreach { column =>
        scan.addColumn(Bytes.toBytes(cf), Bytes.toBytes(column))
      }
    }

    scan
  }

  /**
    * Turns qualifier -> value pairs into a FilterList of equality filters on
    * the object's column family.
    *
    * @param kv qualifier -> expected value
    * @return a FilterList with one SingleColumnValueFilter per entry
    */
  def getFilters(kv:Map[String,String]): FilterList = {
    val filterList = new FilterList()
    kv.foreach { case (qualifier, expected) =>
      val filter = new SingleColumnValueFilter(
        Bytes.toBytes(cf),
        Bytes.toBytes(qualifier),
        CompareOp.EQUAL,
        Bytes.toBytes(expected)
      )
      // Rows that lack the column are filtered out as well.
      filter.setFilterIfMissing(true)
      filterList.addFilter(filter)
    }
    filterList
  }

  /**
    * Demo: writes one row keyed by today's date, then reads it back via Get
    * (whole row and a single column) and via Scan.
    *
    * @param args command-line arguments (unused)
    */
  def main(args: Array[String]): Unit = {
    val rowKey = new SimpleDateFormat("yyyy-MM-dd").format(System.currentTimeMillis())
    val testSchema = Seq("id","name","age")
    val testData = Seq("10001","Jack","22")

    val hTable: HTable = new HTable(config, TableName.valueOf(table))
    try {
      /**
        * 1. Insert data into HBase
        */
      hTable.setAutoFlush(false)
      hTable.setWriteBufferSize(10 * 1024 * 1024)
      hTable.put(putData(rowKey, cf=cf, testSchema zip testData))

      // Auto-flush is off, so push the buffered Put explicitly.
      hTable.flushCommits()

      /**
        * 2. Query HBase with Get
        */
      val wholeRow: Result = hTable.get(getData(rowKey))
      for (cell <- wholeRow.raw()) {
        println("key="+Bytes.toString(cell.getQualifier)+", value="+Bytes.toString(cell.getValue))
      }

      val singleColumn: Result = hTable.get(getData(rowKey, testSchema(2)))
      for (cell <- singleColumn.raw()) {
        println("value="+Bytes.toString(cell.getValue))
      }

      /**
        * 3. Query HBase with Scan
        */
      val scan = getScan(rowKey, rowKey)
      val resultScan: ResultScanner = hTable.getScanner(scan)
      try {
        val ite: util.Iterator[Result] = resultScan.iterator()
        while (ite.hasNext) {
          val row = ite.next()
          for (cell <- row.raw()) {
            println("rowKey="+Bytes.toString(cell.getRow)+", key="+Bytes.toString(cell.getQualifier)+", value="+Bytes.toString(cell.getValue))
          }
        }
      } finally {
        // The scanner was never closed in the original code.
        resultScan.close()
      }
    } finally {
      // Close the table even when one of the operations above throws.
      hTable.close()
    }
  }

}

 

作者 east
Spark 12月 14,2018

spark实例2:统计亚马逊联盟的导出的费用明细的csv

统计出自己关心的数据,并把部分关心的数据保存为csv。从这个实例可以学习到spark2.0+如何来读写csv文件,如何用spark sql来统计数据。

 

/**
  * Loads the Amazon Associates fee report (CSV) with an explicit schema,
  * runs several Spark SQL summaries over it and saves one result as CSV.
  */
object AmazonFeeSQL {

  /**
    * Runs the report queries and prints each result.
    *
    * @param arg command-line arguments (unused)
    */
  def main(arg: Array[String]): Unit = {
    val spark = SparkSession.builder().appName("UmengPV").master("local[*]").getOrCreate()

    try {
      // Explicit schema for the CSV columns.
      val taxiSchema = StructType(Array(
        StructField("Category", StringType, true),
        StructField("Name", StringType, true),
        StructField("ASIN", StringType, true),
        StructField("Seller", StringType, true),
        StructField("Tracking ID", StringType, true),
        StructField("Date Shipped", StringType, true),
        StructField("Price", StringType, true),
        StructField("Items Shipped", IntegerType, true),
        StructField("Returns", IntegerType, true),
        StructField("Revenue", DoubleType, true),
        StructField("Ad Fees", DoubleType, true),
        StructField("Device Type Group", StringType, true),
        StructField("Direct", StringType, true)
      ))
      val path = "E:\\newcode\\MyFirstProject\\data\\amazon\\fee"
      // .option("header","true") skips the title row of each file.
      val data = spark.read.option("header","true").schema(taxiSchema).csv(path)
      // createOrReplaceTempView does not fail if the view already exists in
      // a reused session.
      data.createOrReplaceTempView("amazon_fee")

      // Categories ordered by number of rows, most frequent first.
      val categoryCounts = spark.sql("select Category, count(Category) as cateNum from amazon_fee GROUP BY Category order by cateNum DESC")
      categoryCounts.show()

      // All rows of the 'Home' category.
      val homeRows = spark.sql("select * from amazon_fee WHERE Category = 'Home'")
      homeRows.show()

      // All rows ordered by revenue, highest first.
      // The original statement read "... WHERE ORDER BY ...", which is a
      // SQL syntax error; the dangling WHERE is removed.
      val byRevenue = spark.sql("SELECT * FROM amazon_fee ORDER BY Revenue DESC")
      byRevenue.show()

      // Rows with returns, most returns first.
      val byReturns = spark.sql("SELECT * FROM amazon_fee WHERE Returns > 0  ORDER BY Returns DESC")
      byReturns.show()

      // Number of rows per price range. Half-open ranges [lo, hi) are used
      // so that boundaries neither overlap (the original had "<= 10" and
      // ">= 10") nor leave gaps (e.g. between 4.99 and 5).
      // NOTE(review): rows whose Price is NULL or negative still land in
      // the ELSE branch ('100+'), as in the original query.
      val priceRanges = spark.sql(
        """SELECT price_range, count(*) AS number FROM (
          |  SELECT CASE
          |    WHEN Price >= 0  AND Price < 5   THEN '0-5'
          |    WHEN Price >= 5  AND Price < 10  THEN '005-10'
          |    WHEN Price >= 10 AND Price < 15  THEN '010-15'
          |    WHEN Price >= 15 AND Price < 20  THEN '015-20'
          |    WHEN Price >= 20 AND Price < 25  THEN '020-25'
          |    WHEN Price >= 25 AND Price < 50  THEN '025-50'
          |    WHEN Price >= 50 AND Price < 100 THEN '050-100'
          |    ELSE '100+' END AS price_range
          |  FROM amazon_fee) AS price_summaries
          |GROUP BY price_range ORDER BY price_range""".stripMargin)
      priceRanges.show()

      // Rows of the 'Home' and 'Toys & Games' categories, also saved as CSV.
      val homeAndToys = spark.sql("SELECT * FROM amazon_fee WHERE Category = 'Home' OR Category = 'Toys & Games'")
      homeAndToys.show()
      homeAndToys.write.format("com.databricks.spark.csv").save("E:\\newcode\\MyFirstProject\\data\\Home_ToysGames_BestSeller")
    } finally {
      spark.stop()
    }
  }
}
作者 east
Spark 12月 14,2018

spark实例1:统计一个 1000 万人口的所有人的平均年龄

生成 1000 万人口数据文件的代码:

/**
  * Generates the sample input file: one line per person in the form
  * `<id> <age>`, with ids 1..10000000 and a random age in [0, 100).
  */
object SampleDataFileGenerator {

  /**
    * Writes the sample file to d:\sample_age_data.txt, overwriting any
    * existing file.
    *
    * @param args command-line arguments (unused)
    */
  def main(args:Array[String]): Unit = {
    val writer = new FileWriter(new File("d:\\sample_age_data.txt"),false)
    try {
      val rand = new Random()
      // Look the separator up once instead of on every iteration.
      val lineSeparator = System.getProperty("line.separator")
      for (i <- 1 to 10000000) {
        writer.write(s"$i ${rand.nextInt(100)}")
        writer.write(lineSeparator)
      }
      writer.flush()
    } finally {
      // Release the file handle even if a write fails part-way through.
      writer.close()
    }
  }
}



使用RDD进行计算平均年龄:
 要计算平均年龄,那么首先需要对源文件对应的 RDD 进行处理,也就是将它转化成一个只包含年龄信息的 RDD,
 其次是计算元素个数即为总人数,然后是把所有年龄数加起来,最后平均年龄=总年龄/人数。  
 对于第一步我们需要使用 map 算子把源文件对应的 RDD 映射成一个新的只包含年龄数据的 RDD,
 很显然需要在 map 算子的传入函数中使用 split 方法,得到数组后只取第二个元素即为年龄信息;
 第二步计算数据元素总数需要对于第一步映射的结果 RDD 使用 count 算子;
 第三步则是使用 reduce 算子对只包含年龄信息的 RDD 的所有元素用加法求和;最后使用除法计算平均年龄即可。
/**
  * Computes the average age over a text file whose lines have the form
  * `<id> <age>` (space separated).
  */
object AvgAgeCalculator {

  /**
    * Sums the ages and counts the records in a single distributed pass,
    * then prints the totals and the average.
    *
    * @param args optional: args(0) is the data file path; when absent the
    *             default d:\sample_age_data.txt is used
    */
  def main(args:Array[String]): Unit = {
    // Backward compatible: with no arguments the original hard-coded path
    // is used.
    val path = if (args.nonEmpty) args(0) else "d:\\sample_age_data.txt"

    val conf = new SparkConf().setAppName("Spark Exercise:Average Age Calculator")
    conf.setMaster("local")
    val sc = new SparkContext(conf)
    try {
      val dataFile = sc.textFile(path, 5)
      // Second field of each line is the age.
      val ageData = dataFile.map(line => line.split(" ")(1).toInt)

      // One pass over the data computing (sum of ages, number of people) on
      // the executors. The original collected every age to the driver
      // before reducing and scanned the file a second time for the count.
      // Long accumulators avoid Int overflow on large inputs.
      val (totalAge, count) = ageData.aggregate((0L, 0L))(
        (acc, age) => (acc._1 + age, acc._2 + 1L),
        (left, right) => (left._1 + right._1, left._2 + right._2)
      )

      println("Total Age:" + totalAge + ";Number of People:" + count )
      if (count == 0L) {
        // Avoid a meaningless 0/0 result on an empty input file.
        println("No age records found in " + path)
      } else {
        val avgAge : Double = totalAge.toDouble / count.toDouble
        println("Average Age is " + avgAge)
      }
    } finally {
      sc.stop()
    }
  }
}


作者 east

上一 1 … 83 84 85 … 93 下一个

关注公众号“大模型全栈程序员”回复“小程序”获取1000个小程序打包源码。回复“chatgpt”获取免注册可用chatgpt。回复“大数据”获取多本大数据电子书

标签

AIGC AI创作 bert chatgpt github GPT-3 gpt3 GTP-3 hive mysql O2O tensorflow UI控件 不含后台 交流 共享经济 出行 图像 地图定位 外卖 多媒体 娱乐 小程序 布局 带后台完整项目 开源项目 搜索 支付 效率 教育 日历 机器学习 深度学习 物流 用户系统 电商 画图 画布(canvas) 社交 签到 联网 读书 资讯 阅读 预订

官方QQ群

小程序开发群:74052405

大数据开发群: 952493060

近期文章

  • 如何在Chrome中设置启动时自动打开多个默认网页
  • spark内存溢出怎样区分是软件还是代码原因
  • MQTT完全解析和实践
  • 解决运行Selenium报错:self.driver = webdriver.Chrome(service=service) TypeError: __init__() got an unexpected keyword argument ‘service’
  • python 3.6使用mysql-connector-python报错:SyntaxError: future feature annotations is not defined
  • 详解Python当中的pip常用命令
  • AUTOSAR如何在多个供应商交付的配置中避免ARXML不兼容?
  • C++thread pool(线程池)设计应关注哪些扩展性问题?
  • 各类MCAL(Microcontroller Abstraction Layer)如何与AUTOSAR工具链解耦?
  • 如何设计AUTOSAR中的“域控制器”以支持未来扩展?

文章归档

  • 2025年8月
  • 2025年7月
  • 2025年6月
  • 2025年5月
  • 2025年4月
  • 2025年3月
  • 2025年2月
  • 2025年1月
  • 2024年12月
  • 2024年11月
  • 2024年10月
  • 2024年9月
  • 2024年8月
  • 2024年7月
  • 2024年6月
  • 2024年5月
  • 2024年4月
  • 2024年3月
  • 2023年11月
  • 2023年10月
  • 2023年9月
  • 2023年8月
  • 2023年7月
  • 2023年6月
  • 2023年5月
  • 2023年4月
  • 2023年3月
  • 2023年1月
  • 2022年11月
  • 2022年10月
  • 2022年9月
  • 2022年8月
  • 2022年7月
  • 2022年6月
  • 2022年5月
  • 2022年4月
  • 2022年3月
  • 2022年2月
  • 2022年1月
  • 2021年12月
  • 2021年11月
  • 2021年9月
  • 2021年8月
  • 2021年7月
  • 2021年6月
  • 2021年5月
  • 2021年4月
  • 2021年3月
  • 2021年2月
  • 2021年1月
  • 2020年12月
  • 2020年11月
  • 2020年10月
  • 2020年9月
  • 2020年8月
  • 2020年7月
  • 2020年6月
  • 2020年5月
  • 2020年4月
  • 2020年3月
  • 2020年2月
  • 2020年1月
  • 2019年7月
  • 2019年6月
  • 2019年5月
  • 2019年4月
  • 2019年3月
  • 2019年2月
  • 2019年1月
  • 2018年12月
  • 2018年7月
  • 2018年6月

分类目录

  • Android (73)
  • bug清单 (79)
  • C++ (34)
  • Fuchsia (15)
  • php (4)
  • python (45)
  • sklearn (1)
  • 云计算 (20)
  • 人工智能 (61)
    • chatgpt (21)
      • 提示词 (6)
    • Keras (1)
    • Tensorflow (3)
    • 大模型 (1)
    • 智能体 (4)
    • 深度学习 (14)
  • 储能 (44)
  • 前端 (5)
  • 大数据开发 (493)
    • CDH (6)
    • datax (4)
    • doris (31)
    • Elasticsearch (15)
    • Flink (79)
    • flume (7)
    • Hadoop (19)
    • Hbase (23)
    • Hive (41)
    • Impala (2)
    • Java (71)
    • Kafka (10)
    • neo4j (5)
    • shardingsphere (6)
    • solr (5)
    • Spark (100)
    • spring (11)
    • 数据仓库 (9)
    • 数据挖掘 (7)
    • 海豚调度器 (10)
    • 运维 (35)
      • Docker (3)
  • 小游戏代码 (1)
  • 小程序代码 (139)
    • O2O (16)
    • UI控件 (5)
    • 互联网类 (23)
    • 企业类 (6)
    • 地图定位 (9)
    • 多媒体 (6)
    • 工具类 (25)
    • 电商类 (22)
    • 社交 (7)
    • 行业软件 (7)
    • 资讯读书 (11)
  • 嵌入式 (71)
    • autosar (63)
    • RTOS (1)
    • 总线 (1)
  • 开发博客 (16)
    • Harmony (9)
  • 技术架构 (6)
  • 数据库 (32)
    • mongodb (1)
    • mysql (13)
    • pgsql (2)
    • redis (1)
    • tdengine (4)
  • 未分类 (7)
  • 程序员网赚 (20)
    • 广告联盟 (3)
    • 私域流量 (5)
    • 自媒体 (5)
  • 量化投资 (4)
  • 面试 (14)

功能

  • 登录
  • 文章RSS
  • 评论RSS
  • WordPress.org

All Rights Reserved by Gitweixin.本站收集网友上传代码, 如有侵犯版权,请发邮件联系yiyuyos@gmail.com删除.