gitweixin
  • 首页
  • 小程序代码
    • 资讯读书
    • 工具类
    • O2O
    • 地图定位
    • 社交
    • 行业软件
    • 电商类
    • 互联网类
    • 企业类
    • UI控件
  • 大数据开发
    • Hadoop
    • Spark
    • Hbase
    • Elasticsearch
    • Kafka
    • Flink
    • 数据仓库
    • 数据挖掘
    • flume
    • Kafka
    • Hive
    • shardingsphere
    • solr
  • 开发博客
    • Android
    • php
    • python
    • 运维
    • 技术架构
    • 数据库
  • 程序员网赚
  • bug清单
  • 量化投资
  • 在线查询工具
    • 去行号
    • 在线时间戳转换工具
    • 免费图片批量修改尺寸在线工具
    • SVG转JPG在线工具

月度归档12月 2018

精品微信小程序开发门户,代码全部亲测可用

  • 首页   /  2018   /  
  • 12月
  • ( 页面3 )
Spark 12月 14,2018

spark实例7:使用Spark SQL来分析网络日志

case class ApacheAccessLog(
                            ipAddress: String, // IP地址
                              clientId: String, // 客户端唯一标识符
                              userId: String, // 用户唯一标识符
                              serverTime: String, // 服务器时间
                              method: String, // 请求类型/方式
                              endpoint: String, // 请求的资源
                              protocol: String, // 请求的协议名称
                              responseCode: Int, // 请求返回值:比如:200、401
                              contentSize: Long // 返回的结果数据大小
                          ) {

}

object ApacheAccessLog {
  // regex
  // 64.242.88.10 - - [   07/Mar/2004:16:05:49 -0800       ]
  // "GET /twiki/bin/edit/Main/Double_bounce_sender?topicparent=Main.ConfigurationVariables HTTP/1.1"
  // 401 12846
  val PARTTERN =
  """^(\S+) (\S+) (\S+) \[([\w:/]+\s[+|-]\d{4})\] "(\S+) (\S+) (\S+)" (\d{3}) (\d+)""".r

  def isValidatelogLine(log: String): Boolean = {
    val res = PARTTERN.findFirstMatchIn(log)
    if (res.isEmpty) {
      false
    } else {
      true
    }

  }

  def parseLogLine(log: String): ApacheAccessLog = {
    val res = PARTTERN.findFirstMatchIn(log)
    if (res.isEmpty) {
      throw new RuntimeException("Cannot parse log line: " + log)
    }
    val m = res.get

    ApacheAccessLog(
      m.group(1),
      m.group(2),
      m.group(3),
      m.group(4),
      m.group(5),
      m.group(6),
      m.group(7),
      m.group(8).toInt,
      m.group(9).toLong
    )
  }
}


import org.apache.spark.sql.{DataFrame, SQLContext}
import org.apache.spark.{SparkContext, SparkConf}

/**
  * Created by Administrator on 2017/4/25.
  */
object LogAnalysis {
  def main(args: Array[String]): Unit = {
    //sqlContext
    val conf = new SparkConf()
      .setMaster("local[*]")
      .setAppName("log-analysis-sparksql")
    val sc = SparkContext.getOrCreate(conf)
    val sqlContext = new SQLContext(sc)
    import sqlContext.implicits._ //如果不写,下面的转换不成功

    //transform
    val path = "E:\\newcode\\MyFirstProject\\data\\test.log"
    val rdd = sc.textFile(path)
    val apacheAccessDataFrame = rdd
      .filter(line => ApacheAccessLog.isValidatelogLine(line))
      .map(line => {
        ApacheAccessLog.parseLogLine(line)
      }).cache().toDF() //rdd转换为DataFrame

    //register temptable
    apacheAccessDataFrame.registerTempTable("log_analysis_temp_table")
      sqlContext.sql("select * from log_analysis_temp_table limit 1").show()

      //需求一:求contentSize的平均值,最大值以及最小值
      val resultDataFrame1 = sqlContext.sql(

      """
        |SELECT
        |AVG(contentSize) as avg_contentSize,
        |MAX(contentSize) as max_contentSize,
        |MIN(contentSize) as min_contentSize
        |FROM log_analysis_temp_table
      """.stripMargin
      )
      resultDataFrame1.show()

      //save                                         //save as HDFS
      val resultRdd = resultDataFrame1.map(row => {
        val avgSize = row.getAs[Double]("avg_contentSize")
        val minSize = row.getAs[Long]("min_contentSize")
        val maxSize = row.getAs[Long]("max_contentSize")
        (avgSize, minSize, maxSize)
      })
      resultRdd.rdd.saveAsTextFile(s"E:/newcode/MyFirstProject/data/output/sql_${System.currentTimeMillis()}")


    //需求二:求各个返回值出现的数据个数
    val resultDataFrame2 = sqlContext.sql(
      """
        |SELECT
        |responseCode AS code,
        |COUNT(1) AS count
        |FROM log_analysis_temp_table
        |GROUP BY responseCode
      """.stripMargin
    )
    resultDataFrame2.show()
    resultDataFrame2.repartition(1).write.format("com.databricks.spark.csv").save("E:\\newcode\\MyFirstProject\\data\\output\\responseCode")


    //需求三:求访问次数大于N的IP地址,并对黑名单进行限制
    val blackIP = Array("200-55-104-193.ds1.prima.net.ar", "10.0.0.153", "208-38-57-205.ip.cal.radiant.net")
    val N = 10
    val resultDataFrame3 = sqlContext.sql(
      s"""
         |SELECT
         |ipAddress AS ip,
         |COUNT(1) AS count
         |FROM log_analysis_temp_table
         |WHERE not(ipAddress in(${blackIP.map(ip => s"'${ip}'").mkString(",")}))
         |GROUP BY ipAddress
         |HAVING count>${N}
     """.stripMargin)
    resultDataFrame3.show()


    //需求四:求访问次数最多的前k个endpoint的值
    val k = 50
    val resultDataFrame4 = sqlContext.sql(
      s"""
         |SELECT
         |  t.endpoint,
         |  t.count
         |FROM(
         |SELECT
         |  endpoint,
         |  COUNT(1) AS count
         |FROM log_analysis_temp_table
         |GROUP BY endpoint) t
         |ORDER BY t.count DESC
         |limit ${k}
      """.stripMargin)
    resultDataFrame4.show()
    resultDataFrame4.repartition(1).write.format("com.databricks.spark.csv").save("E:\\newcode\\MyFirstProject\\data\\output\\maxendpoint")

  }
}
作者 east
Spark 12月 14,2018

spark实例6:kafka生产者和消费者实例

mvn配置:

<dependency>
      <groupId>org.apache.spark</groupId>
      <artifactId>spark-streaming-kafka-0-10_2.11</artifactId>
      <version>2.4.0</version>
  </dependency>
<dependency>
    <groupId>com.alibaba</groupId>
    <artifactId>fastjson</artifactId>
    <version>1.2.54</version>
</dependency>
<dependency>
    <groupId>redis.clients</groupId>
    <artifactId>jedis</artifactId>
    <version>2.9.0</version>
</dependency>


生产者代码:
import java.util.Properties
import com.alibaba.fastjson.JSONObject
import kafka.producer.{KeyedMessage, Producer, ProducerConfig}

import scala.util.Random

object OrderProducer {
def main(args: Array[String]): Unit = {

//Kafka参数设置
val topic = "order"
val brokers = "192.168.0.219:9092"
val props = new Properties()
props.put("metadata.broker.list", brokers)
props.put("serializer.class", "kafka.serializer.StringEncoder")
val kafkaConfig = new ProducerConfig(props)
//创建生产者
val producer = new Producer[String, String](kafkaConfig)

while (true) {
//随机生成10以内ID
val id = Random.nextInt(10)
//创建订单成交事件
val event = new JSONObject();
//商品ID
event.put("id", id)
//商品成交价格
event.put("price", Random.nextInt(10000))

//发送信息
producer.send(new KeyedMessage[String, String](topic, event.toString))
println("Message sent: " + event)
//随机暂停一段时间
Thread.sleep(Random.nextInt(100))
}
}
}


消费者代码:

import com.alibaba.fastjson.JSON
import kafka.click.RedisClient
import kafka.serializer.StringDecoder
import org.apache.kafka.common.serialization.StringDeserializer
import org.apache.spark.SparkConf
import org.apache.spark.streaming.kafka010.KafkaUtils
import org.apache.kafka.clients.consumer.ConsumerRecord
import org.apache.kafka.common.serialization.StringDeserializer
import org.apache.spark.streaming.{Seconds, StreamingContext}
import org.apache.spark.streaming.kafka010._
import org.apache.spark.streaming.kafka010.LocationStrategies.PreferConsistent
import org.apache.spark.streaming.kafka010.ConsumerStrategies.Subscribe

/**
  * Created by Administrator on 2017/8/30.
  */
object OrderConsumer {
  //Redis配置
  val dbIndex = 0
  //每件商品总销售额
  val orderTotalKey = "app::order::total"
  //每件商品每分钟销售额
  val oneMinTotalKey = "app::order::product"
  //总销售额
  val totalKey = "app::order::all"


  def main(args: Array[String]): Unit = {

    // 创建 StreamingContext 时间片为1秒
    val conf = new SparkConf().setMaster("local").setAppName("UserClickCountStat")
    val ssc = new StreamingContext(conf, Seconds(1))

    // Kafka 配置
    val topics = Set("order")
    val brokers = "192.168.0.219:9092"
    /*val kafkaParams = Map[String, String](
      "metadata.broker.list" -> brokers,
      "serializer.class" -> "kafka.serializer.StringEncoder")
    */

    val kafkaParams = Map[String, Object](
      "bootstrap.servers" -> "192.168.0.219:9092,192.168.0.220:9092",
      "key.deserializer" -> classOf[StringDeserializer],
      "value.deserializer" -> classOf[StringDeserializer],
      "group.id" -> "use_a_separate_group_id_for_each_stream",
      "auto.offset.reset" -> "latest",
      "enable.auto.commit" -> (false: java.lang.Boolean)
    )

   // val topics = Array("topicA", "topicB")
    val kafkaStream = KafkaUtils.createDirectStream[String, String](
      ssc,
      PreferConsistent,
      Subscribe[String, String](topics, kafkaParams)
    )

    // 创建一个 direct stream
 //   val kafkaStream = KafkaUtils.createDirectStream[String, String](ssc, kafkaParams, topics)

    //解析JSON
  //  val events = kafkaStream.flatMap(line => Some(JSON.parseObject(line._2)))
  val events = kafkaStream.flatMap(line => Some(JSON.parseObject(line.value())))

    // 按ID分组统计个数与价格总合
    val orders = events.map(x => (x.getString("id"), x.getLong("price"))).groupByKey().map(x => (x._1, x._2.size, x._2.reduceLeft(_ + _)))

    //输出
    orders.foreachRDD(x =>
      x.foreachPartition(partition =>
        partition.foreach(x => {

          println("id=" + x._1 + " count=" + x._2 + " price=" + x._3)

          //保存到Redis中
          val jedis = RedisClient.pool.getResource
          jedis.auth("123456")
          jedis.select(dbIndex)
          //每个商品销售额累加
          jedis.hincrBy(orderTotalKey, x._1, x._3)
          //上一分钟第每个商品销售额
          jedis.hset(oneMinTotalKey, x._1.toString, x._3.toString)
          //总销售额累加
          jedis.incrBy(totalKey, x._3)
          RedisClient.pool.returnResource(jedis)


        })
      ))


    ssc.start()
    ssc.awaitTermination()
  }

}


作者 east
Spark 12月 14,2018

spark实例5:找出词频最高的前K个词

输入数据:

Hello World Bye World
Hello Hadoop Bye Hadoop
Bye Hadoop Hello Hadoop

 

输出结果:

(Hadoop,4)
(Bye,3)
(Hello,3)
(World,2)

 

/**
  * 一是统计词频,二是找出词频最高的前K个词
  */
object TopNBasic {
  def main(args: Array[String]) {
    val conf = new SparkConf().setAppName(TopNBasic.getClass.getSimpleName)
    conf.setMaster("local")
    val sc = new SparkContext(conf)
    val textFile = sc.textFile("E:\\newcode\\MyFirstProject\\data\\topn.txt")
    val words = textFile.flatMap(line => line.split(" "))
    val wordPairs = words.map(word => (word, 1))
    val wordCounts = wordPairs.reduceByKey((a,b) => a + b)
 // val groupRDD = textFile.map(line => (line.split(" ")(0),line.split(" ")(1).toInt)).groupByKey()
  //  val top2 = groupRDD.map(pair => (pair._1, pair._2.toList.sortWith(_>_).take(2)))
   val top2 = wordCounts.sortBy(_._2, false).take(4)

    println("wordCounts: ")
    top2.foreach(println)
  }
}
作者 east
Spark 12月 14,2018

spark实例4:求中位数

输入数据:

1 2 3 4 5 6 8 9 11 12 34

 

输出结果:

6

/**
  * 求中位数
  */

object Median {

  def main (args: Array[String]) {

    val conf = new SparkConf().setAppName("Median").setMaster("local")
    val sc = new SparkContext(conf)

    val data = sc.textFile("E:\\newcode\\MyFirstProject\\data\\Median.txt")

    val words = data.flatMap(_.split(" ")).map(word => word.toInt)
    words.collect().foreach(println)
    val number = words.map(word =>(word/4,word)).sortByKey()
    number.collect().foreach(println)
    val pariCount = words.map(word => (word/4,1)).reduceByKey(_+_).sortByKey()
    pariCount.collect().foreach(println)
    val count = words.count().toInt
    var mid =0
    if(count%2 != 0)
    {
      mid = count/2+1
    }else
    {
      mid = count/2
    }

    var temp =0
    var temp1= 0
    var index = 0
    val tongNumber = pariCount.count().toInt

    var foundIt = false
    for(i <- 0 to tongNumber-1 if !foundIt)
    {
      println(pariCount.collectAsMap()(i).toString);
      temp = temp + pariCount.collectAsMap()(i)
      temp1 = temp - pariCount.collectAsMap()(i)
      if(temp >= mid)
      {
        index = i
        foundIt = true
      }
    }
    val tonginneroffset = mid - temp1

    val median = number.filter(_._1==index).takeOrdered(tonginneroffset)
    sc.setLogLevel("ERROR")
    println(median(tonginneroffset-1)._2)
    sc.stop()

  }
}
作者 east
Spark 12月 14,2018

spark实例3:倒排索引

   输入数据文件格式:
   cx1:a,b,c,d,e,f
   cx2:c,d,e,f
   cx3:a,b,c,f
   cx4:a,b,c,d,e,f
   cx5:a,b,e,f
   cx6:a,b,c,d
   cx7:a,b,c,f
   cx8:d,e,f
   cx9:b,c,d,e,f
  
  输出结果:
  a|cx1,cx3,cx4,cx5,cx6,cx7
  b|cx1,cx3,cx4,cx5,cx6,cx7,cx9
  c|cx1,cx2,cx3,cx4,cx6,cx7,cx9
  d|cx1,cx2,cx4,cx6,cx8,cx9
  e|cx1,cx2,cx4,cx5,cx8,cx9
  f|cx1,cx2,cx3,cx4,cx5,cx7,cx8,cx9

import scala.io.Source
object InvertedIndex {

  def main(args: Array[String]) {
    val conf = new SparkConf().setAppName(InvertedIndex.getClass.getSimpleName)
    conf.setMaster("local")
    val sc = new SparkContext(conf)
    /* 倒排索引InvertedIndex */

    val source = Source.fromFile("D:\\java\\spark\\data\\invertIndex.txt").getLines.toArray

    val cxRDD0 = sc.parallelize(source) /* spark单机读取数据 */

   val rdd2 = cxRDD0.flatMap {

      lines =>

        val line = lines.split(":", -1) /* 拆分数据,以竖杠为拆分条件 */

        line(1).split(",", -1).map {
          /* 再对拆分后的数据,进行第二次拆分 */

          v =>

            (v, line(0)) /* 拼接数据 */

        }

    }
    rdd2.collect().foreach {println}
     rdd2.groupByKey() /* 分组 */

      .sortBy(_._1, true) /* 排序 */

      .foreach(x => println(s"${x._1}|${x._2.mkString(",")}")) /* 格式化输出 */

  }
}
作者 east
Hbase 12月 14,2018

利用JavaAPI来操作Hbase

例子采用的是完全分布式集群,不是hbase自带的zookeeper,是独立的zookeeper

mvn的依赖如下:

<dependency>
    <groupId>org.apache.hbase</groupId>
    <artifactId>hbase-client</artifactId>
    <version>1.4.8</version>
</dependency>

<dependency>
    <groupId>org.apache.hbase</groupId>
    <artifactId>hbase-server</artifactId>
    <version>1.4.8</version>
</dependency>
在hbase中创建表、插入数据、查询数据等操作

import java.text.SimpleDateFormat
import java.util

import hbase.TestHbaeJavaApi.conf
import org.apache.hadoop.hbase.{HBaseConfiguration, TableName}
import org.apache.hadoop.hbase.client._
import org.apache.hadoop.hbase.filter.CompareFilter.CompareOp
import org.apache.hadoop.hbase.filter.{FilterList, SingleColumnValueFilter}
import org.apache.hadoop.hbase.util.Bytes

/**
  * 利用JavaAPI来操作Hbase
  */
object HBaseTool {
  val zkQuorum = "192.168.0.219"
  val port = "2181"
  val table = "test"
  val cf = "cf1"
  val config = HBaseConfiguration.create()
  config.set("hbase.zookeeper.property.clientPort", "2181")
  config.set("hbase.zookeeper.quorum", "192.168.0.219")
  config.set("hbase.master", "192.168.0.219:600000")

  def putData(rowKey:String, cf:String = cf, kv:Seq[(String,String)]): Put ={
    val put = new Put(Bytes.toBytes(rowKey))
    kv.foreach{ kv =>
      put.addColumn(Bytes.toBytes(cf), Bytes.toBytes(kv._1), Bytes.toBytes(kv._2))
    }
    put
  }

  def getData(rowKey:String, qualifier:String =null): Get={
    val get = new Get(Bytes.toBytes(rowKey))
    if(qualifier != null)
      get.addColumn(Bytes.toBytes(cf), Bytes.toBytes(qualifier))
    get
  }

  def getScan(startRow:String, stopRow:String, filters:Map[String,String]= null, columns:Seq[String]= null): Scan= {
    var filterList: FilterList = null
    val scan = new Scan()
      .setStartRow(Bytes.toBytes(startRow))
      .setStopRow(Bytes.toBytes(stopRow))

    if(filters != null){
      filterList = getFilters(filters)
      if(filterList.getFilters.size() > 0)
        scan.setFilter(filterList)
    }

    if(columns != null) {
      columns.foreach{ column =>
        scan.addColumn(Bytes.toBytes(cf), Bytes.toBytes(column))
      }
    }

    scan
  }

  def getFilters(kv:Map[String,String]): FilterList = {
    val filterList = new FilterList()
    kv.toSeq.foreach{ kv =>
      val filter = new SingleColumnValueFilter(
        Bytes.toBytes(cf),
        Bytes.toBytes(kv._1),
        CompareOp.EQUAL,
        Bytes.toBytes(kv._2)
      )
      filter.setFilterIfMissing(true)
      filterList.addFilter(filter)
    }
    filterList
  }

  def main(args: Array[String]): Unit = {
    val rowKey = new SimpleDateFormat("yyyy-MM-dd").format(System.currentTimeMillis())
    val testSchema = Seq("id","name","age")
    val testData = Seq("10001","Jack","22")
    /**
      * 1. 插入数据到HBase
      */
    val hTable: HTable = new HTable(config, TableName.valueOf(table))
    hTable.setAutoFlush(false)
    hTable.setWriteBufferSize(10 * 1024 * 1024)
    //处理任务功能
    hTable.put(putData(rowKey, cf=cf, testSchema zip testData))

    hTable.flushCommits()

    /**
      * 2. 通过Get查询HBase
      */
    val result: Result = hTable.get(getData(rowKey))
    for(kv <- result.raw()) {
      println("key="+Bytes.toString(kv.getQualifier)+", value="+Bytes.toString(kv.getValue))
    }

    val result1: Result = hTable.get(getData(rowKey, testSchema.toList(2)))
    for(kv <- result1.raw()) {
      println("value="+Bytes.toString(kv.getValue))
    }

    /**
      * 3. 通过Scan查询HBase
      */
    val scan = getScan(rowKey, rowKey)
    val resultScan: ResultScanner = hTable.getScanner(scan)
    val ite: util.Iterator[Result] = resultScan.iterator()
    while(ite.hasNext) {
      val result = ite.next()
      for(kv <- result.raw()) {
        println("rowKey="+Bytes.toString(kv.getRow)+", key="+Bytes.toString(kv.getQualifier)+", value="+Bytes.toString(kv.getValue))
      }
    }

    hTable.close()
  }



}

 

作者 east
Spark 12月 14,2018

spark实例2:统计亚马逊联盟的导出的费用明细的csv

统计出自己关心的数据,并把部分关心的数据保存为csv。从这个实例可以学习到spark2.0+如何来读写csv文件,如何用spark sql来统计数据。

 

object AmazonFeeSQL {
  def main(arg: Array[String]): Unit = {
    val spark = SparkSession.builder().appName("UmengPV").master("local[*]").getOrCreate(); //为读取的数据创建schema

    val taxiSchema = StructType(Array(
      StructField("Category", StringType, true),
      StructField("Name", StringType, true),
      StructField("ASIN", StringType, true),
      StructField("Seller", StringType, true),
      StructField("Tracking ID", StringType, true),
      StructField("Date Shipped", StringType, true),
      StructField("Price", StringType, true),
      StructField("Items Shipped", IntegerType, true),
      StructField("Returns", IntegerType, true),
      StructField("Revenue", DoubleType, true),
      StructField("Ad Fees", DoubleType, true),
      StructField("Device Type Group", StringType, true),
      StructField("Direct", StringType, true)
    ))
    val path = "E:\\newcode\\MyFirstProject\\data\\amazon\\fee"
    //跳过第一行的标题 .option("header","true")
    val data = spark.read.option("header","true").schema(taxiSchema).csv(path)
    //data.show()
    data.createTempView("amazon_fee")
    val df = data.toDF()

    //按受欢迎的分类倒序排列
    val resultRdd = df.sqlContext.sql("select Category, count(Category) as cateNum from amazon_fee GROUP BY Category order by cateNum DESC")
    resultRdd.show()

    //最受欢迎的商品排列
    val top1Rdd = df.sqlContext.sql("select * from amazon_fee WHERE Category = 'Home'")
    top1Rdd.show()

    //最受欢迎的商品排列
    val earnTopRdd = df.sqlContext.sql("SELECT * FROM amazon_fee WHERE ORDER BY Revenue DESC")
    earnTopRdd.show()

    //被退回次数最多的
    val returnTopRdd = df.sqlContext.sql("SELECT * FROM amazon_fee WHERE Returns > 0  ORDER BY Returns DESC")
    returnTopRdd.show()

    //统计价格区间内的商品数量
    val priceRangeRdd = df.sqlContext.sql("SELECT price_range, count(*) AS number FROM(select case when Price >= 0 and Price <= 4.99 then '0-5'  when Price >= 5 and Price <= 10 then '005-10'  when Price >= 10 and Price <= 14.99 then '010-15'   when Price >= 15 and Price <= 19.99 then '015-20'  when Price >= 20 and Price <= 24.99 then '020-25' when Price >= 25 and Price <= 49.99 then '025-50'   when Price >= 50 and Price <= 99.99 then '050-100'    else '100+' end as price_range FROM amazon_fee WHERE true) AS  price_summaries GROUP BY price_range ORDER BY price_range")
    priceRangeRdd.show()


    //购买前2名的类型的商品
    val top3Rdd = df.sqlContext.sql("SELECT * FROM amazon_fee WHERE Category = 'Home' OR Category = 'Toys & Games'")
    top3Rdd.show()
    top3Rdd.write.format("com.databricks.spark.csv").save("E:\\newcode\\MyFirstProject\\data\\Home_ToysGames_BestSeller")


  }
}
作者 east
Spark 12月 14,2018

spark实例1:统计一个 10 万人口的所有人的平均年龄

生成10万人口的生成文件代码:

object SampleDataFileGenerator {

  def main(args:Array[String]) {
    val writer = new FileWriter(new File("d:\\sample_age_data.txt"),false)
    val rand = new Random()
    for ( i <- 1 to 10000000) {
      writer.write( i + " " + rand.nextInt(100))
      writer.write(System.getProperty("line.separator"))
    }
    writer.flush()
    writer.close()
  }
}



使用RDD进行计算平均年龄:
 要计算平均年龄,那么首先需要对源文件对应的 RDD 进行处理,也就是将它转化成一个只包含年龄信息的 RDD,
 其次是计算元素个数即为总人数,然后是把所有年龄数加起来,最后平均年龄=总年龄/人数。  
 对于第一步我们需要使用 map 算子把源文件对应的 RDD 映射成一个新的只包含年龄数据的 RDD,
 很显然需要对在 map 算子的传入函数中使用 split 方法,得到数组后只取第二个元素即为年龄信息;
 第二步计算数据元素总数需要对于第一步映射的结果 RDD 使用 count 算子;
 第三步则是使用 reduce 算子对只包含年龄信息的 RDD 的所有元素用加法求和;最后使用除法计算平均年龄即可。
object AvgAgeCalculator {
  def main(args:Array[String]) {
   /* if (args.length < 1){
      println("Usage:AvgAgeCalculator datafile")
      System.exit(1)
    }*/
    val conf = new SparkConf().setAppName("Spark Exercise:Average Age Calculator")
    conf.setMaster("local")
    val sc = new SparkContext(conf)
    val dataFile = sc.textFile("d:\\sample_age_data.txt", 5);
    val count = dataFile.count()
    val ageData = dataFile.map(line => line.split(" ")(1))
    val totalAge = ageData.map(age => Integer.parseInt(
      String.valueOf(age))).collect().reduce((a,b) => a+b)
    println("Total Age:" + totalAge + ";Number of People:" + count )
    val avgAge : Double = totalAge.toDouble / count.toDouble
    println("Average Age is " + avgAge)
  }
}


作者 east
Spark 12月 14,2018

使用spark-shell从本地读取文件

利用spark-shell 写个小例子,读取本地文件。

想当然写成val file=“E:///test/word“,结果报错Input path does not exist,认真检查路径觉得路径没错,后来发现,从本地读文件应该是这样写:

file:///test/word。

作者 east
Spark 12月 14,2018

spark2.0+读写csv文件

spark2.0+不用集成第三方的库,可以很方便进行读写csv文件

import org.apache.spark.sql.{DataFrame, SQLContext}
import org.apache.spark.{SparkConf, SparkContext}

object SparkReadFile {
def main(args: Array[String]): Unit = {
val localpath=”E:\\input\\test.csv”
val outpath=”E:\\output\\word”
val conf = new SparkConf()
conf.setAppName(“SparkReadFile”)
conf.setMaster(“local”)
val sparkContext = new SparkContext(conf)
val sqlContext = new SQLContext(sparkContext)
//读csv文件
val data: DataFrame = sqlContext.read.format(“com.databricks.spark.csv”)
.option(“header”, “false”) //在csv第一行有属性”true”,没有就是”false”
.option(“inferSchema”, true.toString) //这是自动推断属性列的数据类型
.load(localpath)
// data.show()
// 写csv文件
data.repartition(1).write.format(“com.databricks.spark.csv”)
.option(“header”, “false”)//在csv第一行有属性”true”,没有就是”false”
.option(“delimiter”,”,”)//默认以”,”分割
.save(outpath)
sparkContext.stop()
}
}

 

作者 east

上一 1 2 3

关注公众号“大模型全栈程序员”回复“小程序”获取1000个小程序打包源码。回复”chatgpt”获取免注册可用chatgpt。回复“大数据”获取多本大数据电子书

标签

AIGC AI创作 bert chatgpt github GPT-3 gpt3 GTP-3 hive mysql O2O tensorflow UI控件 不含后台 交流 共享经济 出行 图像 地图定位 外卖 多媒体 娱乐 小程序 布局 带后台完整项目 开源项目 搜索 支付 效率 教育 日历 机器学习 深度学习 物流 用户系统 电商 画图 画布(canvas) 社交 签到 联网 读书 资讯 阅读 预订

官方QQ群

小程序开发群:74052405

大数据开发群: 952493060

近期文章

  • 详解Python当中的pip常用命令
  • AUTOSAR如何在多个供应商交付的配置中避免ARXML不兼容?
  • C++thread pool(线程池)设计应关注哪些扩展性问题?
  • 各类MCAL(Microcontroller Abstraction Layer)如何与AUTOSAR工具链解耦?
  • 如何设计AUTOSAR中的“域控制器”以支持未来扩展?
  • C++ 中避免悬挂引用的企业策略有哪些?
  • 嵌入式电机:如何在低速和高负载状态下保持FOC(Field-Oriented Control)算法的电流控制稳定?
  • C++如何在插件式架构中使用反射实现模块隔离?
  • C++如何追踪内存泄漏(valgrind/ASan等)并定位到业务代码?
  • C++大型系统中如何组织头文件和依赖树?

文章归档

  • 2025年6月
  • 2025年5月
  • 2025年4月
  • 2025年3月
  • 2025年2月
  • 2025年1月
  • 2024年12月
  • 2024年11月
  • 2024年10月
  • 2024年9月
  • 2024年8月
  • 2024年7月
  • 2024年6月
  • 2024年5月
  • 2024年4月
  • 2024年3月
  • 2023年11月
  • 2023年10月
  • 2023年9月
  • 2023年8月
  • 2023年7月
  • 2023年6月
  • 2023年5月
  • 2023年4月
  • 2023年3月
  • 2023年1月
  • 2022年11月
  • 2022年10月
  • 2022年9月
  • 2022年8月
  • 2022年7月
  • 2022年6月
  • 2022年5月
  • 2022年4月
  • 2022年3月
  • 2022年2月
  • 2022年1月
  • 2021年12月
  • 2021年11月
  • 2021年9月
  • 2021年8月
  • 2021年7月
  • 2021年6月
  • 2021年5月
  • 2021年4月
  • 2021年3月
  • 2021年2月
  • 2021年1月
  • 2020年12月
  • 2020年11月
  • 2020年10月
  • 2020年9月
  • 2020年8月
  • 2020年7月
  • 2020年6月
  • 2020年5月
  • 2020年4月
  • 2020年3月
  • 2020年2月
  • 2020年1月
  • 2019年7月
  • 2019年6月
  • 2019年5月
  • 2019年4月
  • 2019年3月
  • 2019年2月
  • 2019年1月
  • 2018年12月
  • 2018年7月
  • 2018年6月

分类目录

  • Android (73)
  • bug清单 (79)
  • C++ (34)
  • Fuchsia (15)
  • php (4)
  • python (43)
  • sklearn (1)
  • 云计算 (20)
  • 人工智能 (61)
    • chatgpt (21)
      • 提示词 (6)
    • Keras (1)
    • Tensorflow (3)
    • 大模型 (1)
    • 智能体 (4)
    • 深度学习 (14)
  • 储能 (44)
  • 前端 (4)
  • 大数据开发 (488)
    • CDH (6)
    • datax (4)
    • doris (30)
    • Elasticsearch (15)
    • Flink (78)
    • flume (7)
    • Hadoop (19)
    • Hbase (23)
    • Hive (40)
    • Impala (2)
    • Java (71)
    • Kafka (10)
    • neo4j (5)
    • shardingsphere (6)
    • solr (5)
    • Spark (99)
    • spring (11)
    • 数据仓库 (9)
    • 数据挖掘 (7)
    • 海豚调度器 (10)
    • 运维 (34)
      • Docker (3)
  • 小游戏代码 (1)
  • 小程序代码 (139)
    • O2O (16)
    • UI控件 (5)
    • 互联网类 (23)
    • 企业类 (6)
    • 地图定位 (9)
    • 多媒体 (6)
    • 工具类 (25)
    • 电商类 (22)
    • 社交 (7)
    • 行业软件 (7)
    • 资讯读书 (11)
  • 嵌入式 (70)
    • autosar (63)
    • RTOS (1)
    • 总线 (1)
  • 开发博客 (16)
    • Harmony (9)
  • 技术架构 (6)
  • 数据库 (32)
    • mongodb (1)
    • mysql (13)
    • pgsql (2)
    • redis (1)
    • tdengine (4)
  • 未分类 (6)
  • 程序员网赚 (20)
    • 广告联盟 (3)
    • 私域流量 (5)
    • 自媒体 (5)
  • 量化投资 (4)
  • 面试 (14)

功能

  • 登录
  • 文章RSS
  • 评论RSS
  • WordPress.org

All Rights Reserved by Gitweixin.本站收集网友上传代码, 如有侵犯版权,请发邮件联系yiyuyos@gmail.com删除.