gitweixin
  • 首页
  • 小程序代码
    • 资讯读书
    • 工具类
    • O2O
    • 地图定位
    • 社交
    • 行业软件
    • 电商类
    • 互联网类
    • 企业类
    • UI控件
  • 大数据开发
    • Hadoop
    • Spark
    • Hbase
    • Elasticsearch
    • Kafka
    • Flink
    • 数据仓库
    • 数据挖掘
    • flume
    • Kafka
    • Hive
    • shardingsphere
    • solr
  • 开发博客
    • Android
    • php
    • python
    • 运维
    • 技术架构
    • 数据库
  • 程序员网赚
  • bug清单
  • 量化投资
  • 在线查询工具
    • 去行号
    • 在线时间戳转换工具
    • 免费图片批量修改尺寸在线工具
    • SVG转JPG在线工具

分类归档Spark

精品微信小程序开发门户,代码全部亲测可用

  • 首页   /  大数据开发
  • 分类归档: "Spark"
  • ( 页面8 )
Spark 2月 11,2019

Spark ML机器学习:连续型数据处理之给定分位数离散化-QuantileDiscretizer

QuantileDiscretizer输入连续的特征列,输出分箱的类别特征。分箱数是通过参数numBuckets来指定的。 箱的范围是通过使用近似算法(见approxQuantile )来得到的。 近似的精度可以通过relativeError参数来控制。当这个参数设置为0时,将会计算精确的分位数。箱的上边界和下边界分别是正无穷和负无穷时, 取值将会覆盖所有的实数值。

例子

  假设我们有下面的DataFrame,它的列名是id,hour。

 id | hour
----|------
 0  | 18.0
----|------
 1  | 19.0
----|------
 2  | 8.0
----|------
 3  | 5.0
----|------
 4  | 2.2

  hour是类型为DoubleType的连续特征。我们想将连续特征转换为一个分类特征。给定numBuckets为3,我们可以得到下面的结果。

id  | hour | result
----|------|------
 0  | 18.0 | 2.0
----|------|------
 1  | 19.0 | 2.0
----|------|------
 2  | 8.0  | 1.0
----|------|------
 3  | 5.0  | 1.0
----|------|------
 4  | 2.2  | 0.0

// $example on$
import org.apache.spark.SparkConf
import org.apache.spark.ml.feature.QuantileDiscretizer
// $example off$
import org.apache.spark.sql.SparkSession

/**
* 连续型数据处理之给定分位数离散化
*/
object QuantileDiscretizerExample {
def main(args: Array[String]) {
val sparkConf = new SparkConf();
sparkConf.setMaster("local[*]").setAppName(this.getClass.getSimpleName)
val spark = SparkSession
.builder
.config(sparkConf)
.appName("QuantileDiscretizerExample")
.getOrCreate()

// $example on$
val data = Array((0, 18.0), (1, 19.0), (2, 8.0), (3, 5.0), (4, 2.2))
val df = spark.createDataFrame(data).toDF("id", "hour")
// $example off$
// Output of QuantileDiscretizer for such small datasets can depend on the number of
// partitions. Here we force a single partition to ensure consistent results.
// Note this is not necessary for normal use cases
.repartition(1)

// $example on$
val discretizer = new QuantileDiscretizer()
.setInputCol("hour")
.setOutputCol("result")
.setNumBuckets(3)

val result = discretizer.fit(df).transform(df)
result.show(false)
// $example off$

spark.stop()
}
}

结果:

+—+—-+——+
|id |hour|result|
+—+—-+——+
|0 |18.0|2.0 |
|1 |19.0|2.0 |
|2 |8.0 |1.0 |
|3 |5.0 |1.0 |
|4 |2.2 |0.0 |
+—+—-+——+

作者 east
Spark 2月 11,2019

Spark ML机器学习:连续型数据处理之给定边界离散化-Bucketizer

Bucketizer将连续的特征列转换成特征桶(buckets)列。这些桶由用户指定。它拥有一个splits参数。 例如商城的人群,觉得把人分为50以上和50以下太不精准了,应该分为20岁以下,20-30岁,30-40岁,36-50岁,50以上,那么就得用到数值离散化的处理方法了。离散化就是把特征进行适当的离散处理,比如上面所说的年龄是个连续的特征,但是我把它分为不同的年龄阶段就是把它离散化了,这样更利于我们分析用户行为进行精准推荐。Bucketizer能方便的将一堆数据分成不同的区间。

  • splits:如果有n+1个splits,那么将有n个桶。桶将由split x和split y共同确定,它的值范围为[x,y),如果是最后 一个桶,范围将是[x,y]。splits应该严格递增。负无穷和正无穷必须明确的提供用来覆盖所有的双精度值,否则,超出splits的值将会被 认为是一个错误。splits的两个例子是Array(Double.NegativeInfinity, 0.0, 1.0, Double.PositiveInfinity) 和 Array(0.0, 1.0, 2.0)。

  注意,如果你并不知道目标列的上界和下界,你应该添加Double.NegativeInfinity和Double.PositiveInfinity作为边界从而防止潜在的 超过边界的异常。下面是程序调用的例子。

object BucketizerDemo {
  def main(args: Array[String]): Unit = {
    var spark = SparkSession.builder().appName("BucketizerDemo").master("local[2]").getOrCreate();
    val array = Array((1,13.0),(2,16.0),(3,23.0),(4,35.0),(5,56.0),(6,44.0))
    //将数组转为DataFrame
    val df = spark.createDataFrame(array).toDF("id","age")
    // 设定边界,分为5个年龄组:[0,20),[20,30),[30,40),[40,50),[50,正无穷)
    // 注:人的年龄当然不可能正无穷,我只是为了给大家演示正无穷PositiveInfinity的用法,负无穷是NegativeInfinity。
    val splits = Array(0, 20, 30, 40, 50, Double.PositiveInfinity)
    //初始化Bucketizer对象并进行设定:setSplits是设置我们的划分依据
    val bucketizer = new Bucketizer().setSplits(splits).setInputCol("age").setOutputCol("bucketizer_feature")
    //transform方法将DataFrame二值化。
    val bucketizerdf = bucketizer.transform(df)
    //show是用于展示结果
    bucketizerdf.show
  }

}

输出结果:

+---+----+------------------+
| id| age|bucketizer_feature|
+---+----+------------------+
|  1|13.0|               0.0|
|  2|16.0|               0.0|
|  3|23.0|               1.0|
|  4|35.0|               2.0|
|  5|56.0|               4.0|
|  6|44.0|               3.0|
+---+----+------------------+
作者 east
Spark 2月 11,2019

Spark ML机器学习:连续型数据处理之二值化-Binarizer


Binarization是一个将数值特征转换为二值特征的处理过程。threshold参数表示决定二值化的阈值。 值大于阈值的特征二值化为1,否则二值化为0。 例如商城有个需求, 根据年龄来进行物品推荐,把50以上的人分为老年,50以下分为非老年人,那么我们根据二值化可以很简单的把50以上的定为1,50以下的定为0。这样就方便我们后续的推荐了。Binarizer就是根据阈值进行二值化,大于阈值的为1.0,小于等于阈值的为0.0


// $example on$
import org.apache.spark.SparkConf
import org.apache.spark.ml.feature.Binarizer
// $example off$
import org.apache.spark.sql.SparkSession

/**
* 二值化
*/
object BinarizerExample {
def main(args: Array[String]): Unit = {
val sparkConf = new SparkConf();
sparkConf.setMaster("local[*]").setAppName(this.getClass.getSimpleName)
val spark = SparkSession
.builder
.config(sparkConf)
.appName("BinarizerExample")
.getOrCreate()

// $example on$
val data = Array((0, 0.1), (1, 0.8), (2, 0.6))
val dataFrame = spark.createDataFrame(data).toDF("id", "feature")
// transform 开始转换,将该列数据二值化,大于阈值的为1.0,否则为0.0
val binarizer: Binarizer = new Binarizer()
.setInputCol("feature")
.setOutputCol("binarized_feature")
.setThreshold(0.5)

val binarizedDataFrame = binarizer.transform(dataFrame)

println(s"Binarizer output with Threshold = ${binarizer.getThreshold}")
binarizedDataFrame.show()
// $example off$

spark.stop()
}
}

输出结果:

+---+----+-----------------+
| id| age|binarized_feature|
+---+----+-----------------+
|  1|34.0|              0.0|
|  2|56.0|              1.0|
|  3|58.0|              1.0|
|  4|23.0|              0.0|
+---+----+-----------------+
作者 east
Spark 1月 8,2019

Spark数据挖掘实例1:基于 Audioscrobbler 数据集音乐推荐

本实例来源于《Spark高级数据分析》,这是一个很好的spark数据挖掘的实例。从经验上讲,推荐引擎属于大规模机器学习,在日常购物中大家或许深有体会,比如:你在淘宝上浏览了一些商品,或者购买了一些商品,那么淘宝就会根据你的偏好给你推荐一些其他类似的商品。然而,相比较其他机器学习算法,推荐引擎的输出更加的直观,有时候的推荐效果让人吃惊。作为机器学习开篇文章,本篇文章会系统的介绍基于Audioscrobbler数据集的音乐推荐。

数据集介绍

Audioscrobbler数据集是一个公开发布的数据集,读者可以在(https://github.com/libaoquan95/aasPractice/tree/master/c3/profiledata_06-May-2005)网站获取。数据集主要有三部分组成,user_artist_data.txt文件是主要的数据集文件记录了约2420条用户id、艺术家id以及用户收听艺术家歌曲的次数数据,包含141000个用户和160万个艺术家;artist_data.txt文件记录了艺术家id和对应的名字;artist_alias.txt记录了艺术家id和对应的别称id。

推荐算法介绍

由于所选取的数据集只记录了用户和歌曲之间的交互情况,除了艺术家名字之外没有其他信息。因此要找的学习算法不需要用户和艺术家的属性信息,这类算法通常被称为协同过滤。如果根据两个用户的年龄相同来判断他们可能具有相似的偏好,这不叫协同过滤。相反,根据两个用户播放过许多相同歌曲来判断他们可能都喜欢某首歌,这是协调过滤。

本篇所用的算法在数学上称为迭代最小二乘,把用户播放数据当成矩阵A,矩阵低i行第j列上的元素的值,代表用户i播放艺术家j的音乐。矩阵A是稀疏的,绝大多数元素是0,算法将A分解成两个小矩阵X和Y,既A=XYT,X代表用户特征矩阵,Y代表特征艺术家矩阵。两个矩阵的乘积当做用户-艺术家关系矩阵的估计。可以通过下边一组图直观的反映:

现在假如有5个听众,音乐有5首,那么A是一个5*5的矩阵,假如评分如下:

图2.1 用户订阅矩阵

假如d是三个属性,那么X的矩阵如下:

 

图2.2 用户-特征矩阵

Y的矩阵如下:

图2.3 特征-电影矩阵

实际的求解过程中通常先随机的固定矩阵Y,则,为提高计算效率,通常采用并行计算X的每一行,既。得到X之后,再反求出Y,不断的交替迭代,最终使得XYT与A的平方误差小于指定阈值,停止迭代,得到最终的X(代表用户特征矩阵)和Y矩阵(代表特征艺术家矩阵)。在根据最终X和Y矩阵结果,向用户进行推荐。

 

数据准备

首先将样例数据上传到HDFS,如果想要在本地测试这些功能的话,需要内存数量至少 6g, 当然可以通过减少数据量来达到通用的测试。

object RunRecommender {

  def main(args: Array[String]): Unit = {
    val conf = new SparkConf();
    conf.setMaster("local[*]")
    val spark = SparkSession.builder().config(conf).getOrCreate()

    // Optional, but may help avoid errors due to long lineage
   // spark.sparkContext.setCheckpointDir("hdfs:///tmp/")
    spark.sparkContext.setCheckpointDir("d:///tmp/")

    //val base = "hdfs:///user/ds/"
    val base =  "E:/newcode/spark/aas/data/";
    val rawUserArtistData = spark.read.textFile(base + "user_artist_data.txt")
    val rawArtistData = spark.read.textFile(base + "artist_data.txt")
    val rawArtistAlias = spark.read.textFile(base + "artist_alias.txt")

    val runRecommender = new RunRecommender(spark)
    runRecommender.preparation(rawUserArtistData, rawArtistData, rawArtistAlias)
    runRecommender.model(rawUserArtistData, rawArtistData, rawArtistAlias)
    runRecommender.evaluate(rawUserArtistData, rawArtistAlias)
    runRecommender.recommend(rawUserArtistData, rawArtistData, rawArtistAlias)
  }

}


def preparation(
    rawUserArtistData: Dataset[String],
    rawArtistData: Dataset[String],
    rawArtistAlias: Dataset[String]): Unit = {

  rawUserArtistData.take(5).foreach(println)

  val userArtistDF = rawUserArtistData.map { line =>
    val Array(user, artist, _*) = line.split(' ')
    (user.toInt, artist.toInt)
  }.toDF("user", "artist")

  userArtistDF.agg(min("user"), max("user"), min("artist"), max("artist")).show()

  val artistByID = buildArtistByID(rawArtistData)
  val artistAlias = buildArtistAlias(rawArtistAlias)

  val (badID, goodID) = artistAlias.head
  artistByID.filter($"id" isin (badID, goodID)).show()
}

/**
  * 过滤无效的用户艺术家ID和名字行,将格式不正确的数据行剔除掉。
  * @param rawArtistData
  * @return
  */
def buildArtistByID(rawArtistData: Dataset[String]): DataFrame = {
  rawArtistData.flatMap { line =>
    val (id, name) = line.span(_ != '\t')
    if (name.isEmpty) {
      None
    } else {
      try {
        Some((id.toInt, name.trim))
      } catch {
        case _: NumberFormatException => None
      }
    }
  }.toDF("id", "name")
}

/**
  * 过滤艺术家id和对应的别名id,将格式拼写错误的行剔除掉。
  * @param rawArtistAlias
  * @return
  */
def buildArtistAlias(rawArtistAlias: Dataset[String]): Map[Int,Int] = {
  rawArtistAlias.flatMap { line =>
    val Array(artist, alias) = line.split('\t')
    if (artist.isEmpty) {
      None
    } else {
      Some((artist.toInt, alias.toInt))
    }
  }.collect().toMap
}

代码中模型训练好之后,预测了用户 2093760 的推荐结果,我测试结果如下,由于里面代码使用了随机生成初始矩阵,每个人的结果都有可能不一样。

Some((2814,50 Cent))
Some((829,Nas))
Some((1003249,Ludacris))
Some((1001819,2Pac))
Some((1300642,The Game))

代码中也给出了该用户以前听过的艺术家的名字如下:

Some((1180,David Gray))
Some((378,Blackalicious))
Some((813,Jurassic 5))
Some((1255340,The Saw Doctors))
Some((942,Xzibit))

模型评价

auc评价方法

def areaUnderCurve(
    positiveData: DataFrame,
    bAllArtistIDs: Broadcast[Array[Int]],
    predictFunction: (DataFrame => DataFrame)): Double = {

  // What this actually computes is AUC, per user. The result is actually something
  // that might be called "mean AUC".

  // Take held-out data as the "positive".
  // Make predictions for each of them, including a numeric score
  val positivePredictions = predictFunction(positiveData.select("user", "artist")).
    withColumnRenamed("prediction", "positivePrediction")

  // BinaryClassificationMetrics.areaUnderROC is not used here since there are really lots of
  // small AUC problems, and it would be inefficient, when a direct computation is available.

  // Create a set of "negative" products for each user. These are randomly chosen
  // from among all of the other artists, excluding those that are "positive" for the user.
  val negativeData = positiveData.select("user", "artist").as[(Int,Int)].
    groupByKey { case (user, _) => user }.
    flatMapGroups { case (userID, userIDAndPosArtistIDs) =>
      val random = new Random()
      val posItemIDSet = userIDAndPosArtistIDs.map { case (_, artist) => artist }.toSet
      val negative = new ArrayBuffer[Int]()
      val allArtistIDs = bAllArtistIDs.value
      var i = 0
      // Make at most one pass over all artists to avoid an infinite loop.
      // Also stop when number of negative equals positive set size
      while (i < allArtistIDs.length && negative.size < posItemIDSet.size) {
        val artistID = allArtistIDs(random.nextInt(allArtistIDs.length))
        // Only add new distinct IDs
        if (!posItemIDSet.contains(artistID)) {
          negative += artistID
        }
        i += 1
      }
      // Return the set with user ID added back
      negative.map(artistID => (userID, artistID))
    }.toDF("user", "artist")

  // Make predictions on the rest:
  val negativePredictions = predictFunction(negativeData).
    withColumnRenamed("prediction", "negativePrediction")

  // Join positive predictions to negative predictions by user, only.
  // This will result in a row for every possible pairing of positive and negative
  // predictions within each user.
  val joinedPredictions = positivePredictions.join(negativePredictions, "user").
    select("user", "positivePrediction", "negativePrediction").cache()

  // Count the number of pairs per user
  val allCounts = joinedPredictions.
    groupBy("user").agg(count(lit("1")).as("total")).
    select("user", "total")
  // Count the number of correctly ordered pairs per user
  val correctCounts = joinedPredictions.
    filter($"positivePrediction" > $"negativePrediction").
    groupBy("user").agg(count("user").as("correct")).
    select("user", "correct")

  // Combine these, compute their ratio, and average over all users
  val meanAUC = allCounts.join(correctCounts, Seq("user"), "left_outer").
    select($"user", (coalesce($"correct", lit(0)) / $"total").as("auc")).
    agg(mean("auc")).
    as[Double].first()

  joinedPredictions.unpersist()

  meanAUC
}

完整代码下载:RunRecommender.scala


						
作者 east
Spark 12月 27,2018

Spark集群的不同启动模式

1、Yarn-Client
spark-submit \
–master spark://SparkMaster:7077 \
–executor-memory 1g \
–total-executor-cores 2 \
–class MySpark \
MyFirstProject.jar
yarn-client可以在终端看到交互结果
2、 Yarn-Cluster
spark-submit \
–master yarn-cluster \
–executor-memory 1g \
–total-executor-cores 2 \
–class MySpark \
MyFirstProject.jar
yarn-cluster不能在终端看到结果,要在http://sparkmaster:8088/cluster,点左侧的FINISHED,找到执行的ApplicationID -> Logs -> stdout

作者 east
Spark 12月 17,2018

spark实例9:Spark Streaming小例子

在服务端安装nc

yum install nmap-ncat.x86_64

并启动

nc -lk 9999

 

客户端代码如下:

import java.io.{BufferedReader, InputStreamReader}
import java.net.Socket
import java.nio.charset.StandardCharsets

import org.apache.spark.SparkConf
import org.apache.spark.internal.Logging
import org.apache.spark.storage.StorageLevel
import org.apache.spark.streaming.{Seconds, StreamingContext}
import org.apache.spark.streaming.receiver.Receiver

/**
 * Custom Receiver that receives data over a socket. Received bytes are interpreted as
 * text and \n delimited lines are considered as records. They are then counted and printed.
 *
 * To run this on your local machine, you need to first run a Netcat server
 *    `$ nc -lk 9999`
 * and then run the example
 *    `$ bin/run-example org.apache.spark.examples.streaming.CustomReceiver localhost 9999`
 */
object CustomReceiver {
  def main(args: Array[String]) {
    if (args.length < 2) {
      System.err.println("Usage: CustomReceiver <hostname> <port>")
      System.exit(1)
    }

    StreamingExamples.setStreamingLogLevels()

    // Create the context with a 1 second batch size
    val sparkConf = new SparkConf().setAppName("CustomReceiver").setMaster("local[*]")
    val ssc = new StreamingContext(sparkConf, Seconds(1))

    // Create an input stream with the custom receiver on target ip:port and count the
    // words in input stream of \n delimited text (eg. generated by 'nc')
    val lines = ssc.receiverStream(new CustomReceiver(args(0), args(1).toInt))
    val words = lines.flatMap(_.split(" "))
    val wordCounts = words.map(x => (x, 1)).reduceByKey(_ + _)
    wordCounts.print()
    ssc.start()
    ssc.awaitTermination()
  }
}


class CustomReceiver(host: String, port: Int)
  extends Receiver[String](StorageLevel.MEMORY_AND_DISK_2) with Logging {

  def onStart() {
    // Start the thread that receives data over a connection
    new Thread("Socket Receiver") {
      override def run() { receive() }
    }.start()
  }

  def onStop() {
   // There is nothing much to do as the thread calling receive()
   // is designed to stop by itself isStopped() returns false
  }

  /** Create a socket connection and receive data until receiver is stopped */
  private def receive() {
   var socket: Socket = null
   var userInput: String = null
   try {
     logInfo(s"Connecting to $host : $port")
     socket = new Socket(host, port)
     logInfo(s"Connected to $host : $port")
     val reader = new BufferedReader(
       new InputStreamReader(socket.getInputStream(), StandardCharsets.UTF_8))
     userInput = reader.readLine()
     while(!isStopped && userInput != null) {
       store(userInput)
       userInput = reader.readLine()
     }
     reader.close()
     socket.close()
     logInfo("Stopped receiving")
     restart("Trying to connect again")
   } catch {
     case e: java.net.ConnectException =>
       restart(s"Error connecting to $host : $port", e)
     case t: Throwable =>
       restart("Error receiving data", t)
   }
  }
}
作者 east
Spark 12月 14,2018

spark实例8:读取mysql数据

/**
 * Created by Administrator on 2017/11/6.
 */
public class SparkMysql {
    public static org.apache.log4j.Logger logger = org.apache.log4j.Logger.getLogger(SparkMysql.class);

    public static void main(String[] args) {
        JavaSparkContext sparkContext = new JavaSparkContext(new SparkConf().setAppName("SparkMysql").setMaster("local[5]"));
        SQLContext sqlContext = new SQLContext(sparkContext);
        //读取mysql数据
        readMySQL(sqlContext);

        //停止SparkContext
        sparkContext.stop();
    }
    private static void readMySQL(SQLContext sqlContext){
        //jdbc.url=jdbc:mysql://localhost:3306/database
        String url = "jdbc:mysql://localhost:3306/test";
        //查找的表名
        String table = "hb_links";
        //增加数据库的用户名(user)密码(password),指定test数据库的驱动(driver)
        Properties connectionProperties = new Properties();
        connectionProperties.put("user","root");
        connectionProperties.put("password","168168");
        connectionProperties.put("driver","com.mysql.jdbc.Driver");

        //SparkJdbc读取Postgresql的products表内容
        System.out.println("读取rangfei数据库中的hb_links表内容");
        // 读取表中所有数据
      //  DataFrame jdbcDF = sqlContext.read().jdbc(url,table,connectionProperties).select("*");
        Dataset<Row> jdbcDF = sqlContext.read().jdbc(url,table,connectionProperties).select("*");

        //显示数据
        jdbcDF.show();
    }
}
作者 east
Spark 12月 14,2018

spark实例7:使用Spark SQL来分析网络日志

case class ApacheAccessLog(
                            ipAddress: String, // IP地址
                              clientId: String, // 客户端唯一标识符
                              userId: String, // 用户唯一标识符
                              serverTime: String, // 服务器时间
                              method: String, // 请求类型/方式
                              endpoint: String, // 请求的资源
                              protocol: String, // 请求的协议名称
                              responseCode: Int, // 请求返回值:比如:200、401
                              contentSize: Long // 返回的结果数据大小
                          ) {

}

object ApacheAccessLog {
  // regex
  // 64.242.88.10 - - [   07/Mar/2004:16:05:49 -0800       ]
  // "GET /twiki/bin/edit/Main/Double_bounce_sender?topicparent=Main.ConfigurationVariables HTTP/1.1"
  // 401 12846
  val PARTTERN =
  """^(\S+) (\S+) (\S+) \[([\w:/]+\s[+|-]\d{4})\] "(\S+) (\S+) (\S+)" (\d{3}) (\d+)""".r

  def isValidatelogLine(log: String): Boolean = {
    val res = PARTTERN.findFirstMatchIn(log)
    if (res.isEmpty) {
      false
    } else {
      true
    }

  }

  def parseLogLine(log: String): ApacheAccessLog = {
    val res = PARTTERN.findFirstMatchIn(log)
    if (res.isEmpty) {
      throw new RuntimeException("Cannot parse log line: " + log)
    }
    val m = res.get

    ApacheAccessLog(
      m.group(1),
      m.group(2),
      m.group(3),
      m.group(4),
      m.group(5),
      m.group(6),
      m.group(7),
      m.group(8).toInt,
      m.group(9).toLong
    )
  }
}


import org.apache.spark.sql.{DataFrame, SQLContext}
import org.apache.spark.{SparkContext, SparkConf}

/**
  * Created by Administrator on 2017/4/25.
  */
object LogAnalysis {
  def main(args: Array[String]): Unit = {
    //sqlContext
    val conf = new SparkConf()
      .setMaster("local[*]")
      .setAppName("log-analysis-sparksql")
    val sc = SparkContext.getOrCreate(conf)
    val sqlContext = new SQLContext(sc)
    import sqlContext.implicits._ //如果不写,下面的转换不成功

    //transform
    val path = "E:\\newcode\\MyFirstProject\\data\\test.log"
    val rdd = sc.textFile(path)
    val apacheAccessDataFrame = rdd
      .filter(line => ApacheAccessLog.isValidatelogLine(line))
      .map(line => {
        ApacheAccessLog.parseLogLine(line)
      }).cache().toDF() //rdd转换为DataFrame

    //register temptable
    apacheAccessDataFrame.registerTempTable("log_analysis_temp_table")
      sqlContext.sql("select * from log_analysis_temp_table limit 1").show()

      //需求一:求contentSize的平均值,最大值以及最小值
      val resultDataFrame1 = sqlContext.sql(

      """
        |SELECT
        |AVG(contentSize) as avg_contentSize,
        |MAX(contentSize) as max_contentSize,
        |MIN(contentSize) as min_contentSize
        |FROM log_analysis_temp_table
      """.stripMargin
      )
      resultDataFrame1.show()

      //save                                         //save as HDFS
      val resultRdd = resultDataFrame1.map(row => {
        val avgSize = row.getAs[Double]("avg_contentSize")
        val minSize = row.getAs[Long]("min_contentSize")
        val maxSize = row.getAs[Long]("max_contentSize")
        (avgSize, minSize, maxSize)
      })
      resultRdd.rdd.saveAsTextFile(s"E:/newcode/MyFirstProject/data/output/sql_${System.currentTimeMillis()}")


    //需求二:求各个返回值出现的数据个数
    val resultDataFrame2 = sqlContext.sql(
      """
        |SELECT
        |responseCode AS code,
        |COUNT(1) AS count
        |FROM log_analysis_temp_table
        |GROUP BY responseCode
      """.stripMargin
    )
    resultDataFrame2.show()
    resultDataFrame2.repartition(1).write.format("com.databricks.spark.csv").save("E:\\newcode\\MyFirstProject\\data\\output\\responseCode")


    //需求三:求访问次数大于N的IP地址,并对黑名单进行限制
    val blackIP = Array("200-55-104-193.ds1.prima.net.ar", "10.0.0.153", "208-38-57-205.ip.cal.radiant.net")
    val N = 10
    val resultDataFrame3 = sqlContext.sql(
      s"""
         |SELECT
         |ipAddress AS ip,
         |COUNT(1) AS count
         |FROM log_analysis_temp_table
         |WHERE not(ipAddress in(${blackIP.map(ip => s"'${ip}'").mkString(",")}))
         |GROUP BY ipAddress
         |HAVING count>${N}
     """.stripMargin)
    resultDataFrame3.show()


    //需求四:求访问次数最多的前k个endpoint的值
    val k = 50
    val resultDataFrame4 = sqlContext.sql(
      s"""
         |SELECT
         |  t.endpoint,
         |  t.count
         |FROM(
         |SELECT
         |  endpoint,
         |  COUNT(1) AS count
         |FROM log_analysis_temp_table
         |GROUP BY endpoint) t
         |ORDER BY t.count DESC
         |limit ${k}
      """.stripMargin)
    resultDataFrame4.show()
    resultDataFrame4.repartition(1).write.format("com.databricks.spark.csv").save("E:\\newcode\\MyFirstProject\\data\\output\\maxendpoint")

  }
}
作者 east
Spark 12月 14,2018

spark实例6:kafka生产者和消费者实例

mvn配置:

<dependency>
      <groupId>org.apache.spark</groupId>
      <artifactId>spark-streaming-kafka-0-10_2.11</artifactId>
      <version>2.4.0</version>
  </dependency>
<dependency>
    <groupId>com.alibaba</groupId>
    <artifactId>fastjson</artifactId>
    <version>1.2.54</version>
</dependency>
<dependency>
    <groupId>redis.clients</groupId>
    <artifactId>jedis</artifactId>
    <version>2.9.0</version>
</dependency>


生产者代码:
import java.util.Properties
import com.alibaba.fastjson.JSONObject
import kafka.producer.{KeyedMessage, Producer, ProducerConfig}

import scala.util.Random

object OrderProducer {
def main(args: Array[String]): Unit = {

//Kafka参数设置
val topic = "order"
val brokers = "192.168.0.219:9092"
val props = new Properties()
props.put("metadata.broker.list", brokers)
props.put("serializer.class", "kafka.serializer.StringEncoder")
val kafkaConfig = new ProducerConfig(props)
//创建生产者
val producer = new Producer[String, String](kafkaConfig)

while (true) {
//随机生成10以内ID
val id = Random.nextInt(10)
//创建订单成交事件
val event = new JSONObject();
//商品ID
event.put("id", id)
//商品成交价格
event.put("price", Random.nextInt(10000))

//发送信息
producer.send(new KeyedMessage[String, String](topic, event.toString))
println("Message sent: " + event)
//随机暂停一段时间
Thread.sleep(Random.nextInt(100))
}
}
}


消费者代码:

import com.alibaba.fastjson.JSON
import kafka.click.RedisClient
import kafka.serializer.StringDecoder
import org.apache.kafka.common.serialization.StringDeserializer
import org.apache.spark.SparkConf
import org.apache.spark.streaming.kafka010.KafkaUtils
import org.apache.kafka.clients.consumer.ConsumerRecord
import org.apache.kafka.common.serialization.StringDeserializer
import org.apache.spark.streaming.{Seconds, StreamingContext}
import org.apache.spark.streaming.kafka010._
import org.apache.spark.streaming.kafka010.LocationStrategies.PreferConsistent
import org.apache.spark.streaming.kafka010.ConsumerStrategies.Subscribe

/**
  * Created by Administrator on 2017/8/30.
  */
object OrderConsumer {
  //Redis配置
  val dbIndex = 0
  //每件商品总销售额
  val orderTotalKey = "app::order::total"
  //每件商品每分钟销售额
  val oneMinTotalKey = "app::order::product"
  //总销售额
  val totalKey = "app::order::all"


  def main(args: Array[String]): Unit = {

    // 创建 StreamingContext 时间片为1秒
    val conf = new SparkConf().setMaster("local").setAppName("UserClickCountStat")
    val ssc = new StreamingContext(conf, Seconds(1))

    // Kafka 配置
    val topics = Set("order")
    val brokers = "192.168.0.219:9092"
    /*val kafkaParams = Map[String, String](
      "metadata.broker.list" -> brokers,
      "serializer.class" -> "kafka.serializer.StringEncoder")
    */

    val kafkaParams = Map[String, Object](
      "bootstrap.servers" -> "192.168.0.219:9092,192.168.0.220:9092",
      "key.deserializer" -> classOf[StringDeserializer],
      "value.deserializer" -> classOf[StringDeserializer],
      "group.id" -> "use_a_separate_group_id_for_each_stream",
      "auto.offset.reset" -> "latest",
      "enable.auto.commit" -> (false: java.lang.Boolean)
    )

   // val topics = Array("topicA", "topicB")
    val kafkaStream = KafkaUtils.createDirectStream[String, String](
      ssc,
      PreferConsistent,
      Subscribe[String, String](topics, kafkaParams)
    )

    // 创建一个 direct stream
 //   val kafkaStream = KafkaUtils.createDirectStream[String, String](ssc, kafkaParams, topics)

    //解析JSON
  //  val events = kafkaStream.flatMap(line => Some(JSON.parseObject(line._2)))
  val events = kafkaStream.flatMap(line => Some(JSON.parseObject(line.value())))

    // 按ID分组统计个数与价格总合
    val orders = events.map(x => (x.getString("id"), x.getLong("price"))).groupByKey().map(x => (x._1, x._2.size, x._2.reduceLeft(_ + _)))

    //输出
    orders.foreachRDD(x =>
      x.foreachPartition(partition =>
        partition.foreach(x => {

          println("id=" + x._1 + " count=" + x._2 + " price=" + x._3)

          //保存到Redis中
          val jedis = RedisClient.pool.getResource
          jedis.auth("123456")
          jedis.select(dbIndex)
          //每个商品销售额累加
          jedis.hincrBy(orderTotalKey, x._1, x._3)
          //上一分钟第每个商品销售额
          jedis.hset(oneMinTotalKey, x._1.toString, x._3.toString)
          //总销售额累加
          jedis.incrBy(totalKey, x._3)
          RedisClient.pool.returnResource(jedis)


        })
      ))


    ssc.start()
    ssc.awaitTermination()
  }

}


作者 east
Spark 12月 14,2018

spark实例5:找出词频最高的前K个词

输入数据:

Hello World Bye World
Hello Hadoop Bye Hadoop
Bye Hadoop Hello Hadoop

 

输出结果:

(Hadoop,4)
(Bye,3)
(Hello,3)
(World,2)

 

/**
  * 一是统计词频,二是找出词频最高的前K个词
  */
object TopNBasic {
  def main(args: Array[String]) {
    val conf = new SparkConf().setAppName(TopNBasic.getClass.getSimpleName)
    conf.setMaster("local")
    val sc = new SparkContext(conf)
    val textFile = sc.textFile("E:\\newcode\\MyFirstProject\\data\\topn.txt")
    val words = textFile.flatMap(line => line.split(" "))
    val wordPairs = words.map(word => (word, 1))
    val wordCounts = wordPairs.reduceByKey((a,b) => a + b)
 // val groupRDD = textFile.map(line => (line.split(" ")(0),line.split(" ")(1).toInt)).groupByKey()
  //  val top2 = groupRDD.map(pair => (pair._1, pair._2.toList.sortWith(_>_).take(2)))
   val top2 = wordCounts.sortBy(_._2, false).take(4)

    println("wordCounts: ")
    top2.foreach(println)
  }
}
作者 east
Spark 12月 14,2018

spark实例4:求中位数

输入数据:

1 2 3 4 5 6 8 9 11 12 34

 

输出结果:

6

/**
  * 求中位数
  */

object Median {

  def main (args: Array[String]) {

    val conf = new SparkConf().setAppName("Median").setMaster("local")
    val sc = new SparkContext(conf)

    val data = sc.textFile("E:\\newcode\\MyFirstProject\\data\\Median.txt")

    val words = data.flatMap(_.split(" ")).map(word => word.toInt)
    words.collect().foreach(println)
    val number = words.map(word =>(word/4,word)).sortByKey()
    number.collect().foreach(println)
    val pariCount = words.map(word => (word/4,1)).reduceByKey(_+_).sortByKey()
    pariCount.collect().foreach(println)
    val count = words.count().toInt
    var mid =0
    if(count%2 != 0)
    {
      mid = count/2+1
    }else
    {
      mid = count/2
    }

    var temp =0
    var temp1= 0
    var index = 0
    val tongNumber = pariCount.count().toInt

    var foundIt = false
    for(i <- 0 to tongNumber-1 if !foundIt)
    {
      println(pariCount.collectAsMap()(i).toString);
      temp = temp + pariCount.collectAsMap()(i)
      temp1 = temp - pariCount.collectAsMap()(i)
      if(temp >= mid)
      {
        index = i
        foundIt = true
      }
    }
    val tonginneroffset = mid - temp1

    val median = number.filter(_._1==index).takeOrdered(tonginneroffset)
    sc.setLogLevel("ERROR")
    println(median(tonginneroffset-1)._2)
    sc.stop()

  }
}
作者 east
Spark 12月 14,2018

spark实例3:倒排索引

   输入数据文件格式:
   cx1:a,b,c,d,e,f
   cx2:c,d,e,f
   cx3:a,b,c,f
   cx4:a,b,c,d,e,f
   cx5:a,b,e,f
   cx6:a,b,c,d
   cx7:a,b,c,f
   cx8:d,e,f
   cx9:b,c,d,e,f
  
  输出结果:
  a|cx1,cx3,cx4,cx5,cx6,cx7
  b|cx1,cx3,cx4,cx5,cx6,cx7,cx9
  c|cx1,cx2,cx3,cx4,cx6,cx7,cx9
  d|cx1,cx2,cx4,cx6,cx8,cx9
  e|cx1,cx2,cx4,cx5,cx8,cx9
  f|cx1,cx2,cx3,cx4,cx5,cx7,cx8,cx9

import scala.io.Source
object InvertedIndex {

  def main(args: Array[String]) {
    val conf = new SparkConf().setAppName(InvertedIndex.getClass.getSimpleName)
    conf.setMaster("local")
    val sc = new SparkContext(conf)
    /* 倒排索引InvertedIndex */

    val source = Source.fromFile("D:\\java\\spark\\data\\invertIndex.txt").getLines.toArray

    val cxRDD0 = sc.parallelize(source) /* spark单机读取数据 */

   val rdd2 = cxRDD0.flatMap {

      lines =>

        val line = lines.split(":", -1) /* 拆分数据,以竖杠为拆分条件 */

        line(1).split(",", -1).map {
          /* 再对拆分后的数据,进行第二次拆分 */

          v =>

            (v, line(0)) /* 拼接数据 */

        }

    }
    rdd2.collect().foreach {println}
     rdd2.groupByKey() /* 分组 */

      .sortBy(_._1, true) /* 排序 */

      .foreach(x => println(s"${x._1}|${x._2.mkString(",")}")) /* 格式化输出 */

  }
}
作者 east

上一 1 … 7 8 9 下一个

关注公众号“大模型全栈程序员”回复“小程序”获取1000个小程序打包源码。回复”chatgpt”获取免注册可用chatgpt。回复“大数据”获取多本大数据电子书

标签

AIGC AI创作 bert chatgpt github GPT-3 gpt3 GTP-3 hive mysql O2O tensorflow UI控件 不含后台 交流 共享经济 出行 图像 地图定位 外卖 多媒体 娱乐 小程序 布局 带后台完整项目 开源项目 搜索 支付 效率 教育 日历 机器学习 深度学习 物流 用户系统 电商 画图 画布(canvas) 社交 签到 联网 读书 资讯 阅读 预订

官方QQ群

小程序开发群:74052405

大数据开发群: 952493060

近期文章

  • 如何在Chrome中设置启动时自动打开多个默认网页
  • spark内存溢出怎样区分是软件还是代码原因
  • MQTT完全解析和实践
  • 解决运行Selenium报错:self.driver = webdriver.Chrome(service=service) TypeError: __init__() got an unexpected keyword argument ‘service’
  • python 3.6使用mysql-connector-python报错:SyntaxError: future feature annotations is not defined
  • 详解Python当中的pip常用命令
  • AUTOSAR如何在多个供应商交付的配置中避免ARXML不兼容?
  • C++thread pool(线程池)设计应关注哪些扩展性问题?
  • 各类MCAL(Microcontroller Abstraction Layer)如何与AUTOSAR工具链解耦?
  • 如何设计AUTOSAR中的“域控制器”以支持未来扩展?

文章归档

  • 2025年7月
  • 2025年6月
  • 2025年5月
  • 2025年4月
  • 2025年3月
  • 2025年2月
  • 2025年1月
  • 2024年12月
  • 2024年11月
  • 2024年10月
  • 2024年9月
  • 2024年8月
  • 2024年7月
  • 2024年6月
  • 2024年5月
  • 2024年4月
  • 2024年3月
  • 2023年11月
  • 2023年10月
  • 2023年9月
  • 2023年8月
  • 2023年7月
  • 2023年6月
  • 2023年5月
  • 2023年4月
  • 2023年3月
  • 2023年1月
  • 2022年11月
  • 2022年10月
  • 2022年9月
  • 2022年8月
  • 2022年7月
  • 2022年6月
  • 2022年5月
  • 2022年4月
  • 2022年3月
  • 2022年2月
  • 2022年1月
  • 2021年12月
  • 2021年11月
  • 2021年9月
  • 2021年8月
  • 2021年7月
  • 2021年6月
  • 2021年5月
  • 2021年4月
  • 2021年3月
  • 2021年2月
  • 2021年1月
  • 2020年12月
  • 2020年11月
  • 2020年10月
  • 2020年9月
  • 2020年8月
  • 2020年7月
  • 2020年6月
  • 2020年5月
  • 2020年4月
  • 2020年3月
  • 2020年2月
  • 2020年1月
  • 2019年7月
  • 2019年6月
  • 2019年5月
  • 2019年4月
  • 2019年3月
  • 2019年2月
  • 2019年1月
  • 2018年12月
  • 2018年7月
  • 2018年6月

分类目录

  • Android (73)
  • bug清单 (79)
  • C++ (34)
  • Fuchsia (15)
  • php (4)
  • python (45)
  • sklearn (1)
  • 云计算 (20)
  • 人工智能 (61)
    • chatgpt (21)
      • 提示词 (6)
    • Keras (1)
    • Tensorflow (3)
    • 大模型 (1)
    • 智能体 (4)
    • 深度学习 (14)
  • 储能 (44)
  • 前端 (5)
  • 大数据开发 (491)
    • CDH (6)
    • datax (4)
    • doris (31)
    • Elasticsearch (15)
    • Flink (78)
    • flume (7)
    • Hadoop (19)
    • Hbase (23)
    • Hive (41)
    • Impala (2)
    • Java (71)
    • Kafka (10)
    • neo4j (5)
    • shardingsphere (6)
    • solr (5)
    • Spark (100)
    • spring (11)
    • 数据仓库 (9)
    • 数据挖掘 (7)
    • 海豚调度器 (10)
    • 运维 (34)
      • Docker (3)
  • 小游戏代码 (1)
  • 小程序代码 (139)
    • O2O (16)
    • UI控件 (5)
    • 互联网类 (23)
    • 企业类 (6)
    • 地图定位 (9)
    • 多媒体 (6)
    • 工具类 (25)
    • 电商类 (22)
    • 社交 (7)
    • 行业软件 (7)
    • 资讯读书 (11)
  • 嵌入式 (71)
    • autosar (63)
    • RTOS (1)
    • 总线 (1)
  • 开发博客 (16)
    • Harmony (9)
  • 技术架构 (6)
  • 数据库 (32)
    • mongodb (1)
    • mysql (13)
    • pgsql (2)
    • redis (1)
    • tdengine (4)
  • 未分类 (7)
  • 程序员网赚 (20)
    • 广告联盟 (3)
    • 私域流量 (5)
    • 自媒体 (5)
  • 量化投资 (4)
  • 面试 (14)

功能

  • 登录
  • 文章RSS
  • 评论RSS
  • WordPress.org

All Rights Reserved by Gitweixin.本站收集网友上传代码, 如有侵犯版权,请发邮件联系yiyuyos@gmail.com删除.