gitweixin
  • 首页
  • 小程序代码
    • 资讯读书
    • 工具类
    • O2O
    • 地图定位
    • 社交
    • 行业软件
    • 电商类
    • 互联网类
    • 企业类
    • UI控件
  • 开发博客
  • bug清单
  • 大数据开发
    • Hadoop
    • Spark
    • Hbase
    • Kafka

分类归档数据挖掘

精品微信小程序开发门户,代码全部亲测可用

  • 首页   /  大数据开发
  • 分类归档: "数据挖掘"
Spark, 数据挖掘 11月 17,2020

Spark新闻App点击率预估实践案例

import org.apache.spark.sql.{SparkSession}

//action:userid~ docid ~behaivor(label)~time~ip
//160520092238579653~160704235940001~0~20160705000040909~1.49.185.165
//160520092238579653~160704235859003~0~20160705000040909~1.49.185.165
//define case class for action data
case class Action(docid: String, label:Int)

//document:docid ~ channelname ~ source ~ keyword:score
//160705131650005~科技~偏执电商~支付宝:0.17621 医疗:0.14105 复星:0.07106 动作:0.05235 邮局:0.04428
//160705024106002~体育~平大爷的刺~阿杜:0.23158 杜兰特:0.09447 巨头:0.08470 拯救者:0.06638 勇士:0.05453
//define case class for document data
case class Dccument(docid: String, channal: String, source: String, tags: String)

object GenTrainingData {
  def main(args: Array[String]): Unit = {

    //2rd_data/ch09/action.txt 2rd_data/ch09/document.txt output/ch11 local[2]
    val Array(actionPath, documentPath, output, mode) = args
    // 创建Spark实例
    val spark = SparkSession.builder
      .master(mode)
      .appName(this.getClass.getName)
      .getOrCreate()

    import spark.implicits._
    val ActionDF = spark.sparkContext.textFile(actionPath).map(_.split("~"))
      .map(x => Action(x(1).trim.toString, x(2).trim.toInt))
      .toDF()
    // Register the DataFrame as a temporary view
    //ActionDF.createOrReplaceTempView("actiondf")

    val documentDF = spark.sparkContext.textFile(documentPath).map(_.split("~")).filter(_.length > 3)
      .map { case x =>
        val xtags = x(3).split(" ").filter(_.length > 0).map { b => b.substring(0, b.indexOf(":")) }.mkString("|")
        Dccument(x(0).trim.toString, x(1).trim.toString, x(2).trim.toString, xtags.toString)
      }
      .toDF()
    // Register the DataFrame as a temporary view
    //documentDF.createOrReplaceTempView("documentdf")

    // 将查询结果放到tempDF中,完成dataframe转化
    //val tempDF = spark.sql("select actiondf.docid,actiondf.label,documentdf.channal,documentdf.source,documentdf.tags from actiondf,documentdf where actiondf.docid = documentdf.docid")
    val tempDF = documentDF.join(ActionDF, documentDF("docid").equalTo(ActionDF("docid")))
    //tempDF.select($"tags").show(100)

    // 编码格式转换
    val minDF = tempDF.select($"tags").rdd
      .flatMap{ x => x.toString.replace("[","").replace("]","").split('|') }.distinct
    //minDF.coalesce(1).saveAsTextFile(output+"/tags")
    val indexes = minDF.collect().zipWithIndex.toMap
    println(indexes.toList.length) //23937
    //
    val libsvmDF = tempDF.select($"label", $"tags").map {
      x =>
        val label = x(0)
        val terms = x(1).toString.replace("[","").replace("]","")
          .split('|') //使用单引号
          .map(v => (indexes.get(v).getOrElse(-1)+1, 1)) //索引从0开始
          .sortBy(_._1) //libsvm 需要升序
          .map(x => x._1 + ":" + x._2)
          .mkString(" ")
        (label.toString + " " + terms)
    }
    libsvmDF.show(100)

    //保存模型时存在:Exception while deleting local spark dir,不影响结果生成,作为已知问题暂时搁置。
    //libsvmDF.coalesce(1).write.format("text").save(output+"/model")
    //libsvmDF.rdd.coalesce(1).saveAsTextFile(output+"/model")
    val Array(trainingdata, testdata) = libsvmDF.randomSplit(Array(0.7, 0.3))
    trainingdata.rdd.coalesce(1).saveAsTextFile(output+"/training")
    testdata.rdd.coalesce(1).saveAsTextFile(output+"/test")
    //
    //spark.stop()
  }
}
import org.apache.spark.SparkContext
import org.apache.spark.SparkConf
import org.apache.spark.mllib.classification.LogisticRegressionWithLBFGS
import org.apache.spark.mllib.util.MLUtils
import org.apache.spark.mllib.evaluation.BinaryClassificationMetrics

object LRTrainAndTest {

  def main(args: Array[String]) {

    if (args.length < 8) {
      System.err.println("Usage:LRTrainAndTest <trainingPath> <testPath> <output> <numFeatures> <partitions> <RegParam> <NumIterations> <NumCorrections>")
      System.exit(1)
    }

    //2rd_data/ch11/test/part-00000 2rd_data/ch11/training/part-00000 output/ch11/label 23937 50 0.01 100 10
    val conf = new SparkConf()
      .setMaster("local")
      .setAppName("ADTest with logistic regression")
    val sc = new SparkContext(conf)
    val numFeatures = args(3).toInt //特征数23937
    val partitions = args(4).toInt //一般50-1000

    //label channal source tags
    //依次为:类别(是否点击,点击为1,没有点击为0)、频道、来源、关键词
    //样例:1 娱乐 腾讯娱乐 曲妖精|棉袄|王子文|老大爷|黑色

    // 导入训练样本和测试样本
    val training = MLUtils.loadLibSVMFile(sc,args(0),numFeatures,partitions)
    val test = MLUtils.loadLibSVMFile(sc,args(1),numFeatures,partitions)

    val lr = new LogisticRegressionWithLBFGS()

    //训练参数设置
    lr.optimizer.setRegParam(args(5).toDouble) //0.01
      .setNumIterations(args(6).toInt) //100
      .setNumCorrections(args(7).toInt) //10

    //训练
    val lrModel = lr.setNumClasses(2).run(training)//2分类
    lrModel.clearThreshold()

    //预测打分
    val predictionAndLabel = test.map(p=>(lrModel.predict(p.features),p.label))
    predictionAndLabel.map(x=>x._1+"\t"+x._2).repartition(1)
      .saveAsTextFile(args(2))
    val metrics = new BinaryClassificationMetrics(predictionAndLabel)

    //计算AUC
    val str = s"the value of auc is ${metrics.areaUnderROC()}"
    println(str)
  }
}
作者 east
Spark, 数据挖掘 11月 17,2020

Spark企业法人建模案例

数据格式如下:

字段含义参考上一节。

样例如下:

package com.koala.ch12

import org.apache.log4j.{Level, Logger}
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.DataFrame
import org.apache.spark.ml.feature.VectorAssembler
import org.apache.spark.ml.classification.LogisticRegression

object CreditModel {

  // 创建评分模型属性class,对字段进行命名
  // 0,1,37,10,0,3,18,7,4
  case class Credit(load_label:Double,gender:Double,age:Double,yearsmarried:Double,children:Double,housenumber:Double,captiallevel:Double,facarnumber:Double,pacarnumber:Double)

  def main(args: Array[String]) {
    Logger.getLogger("org.apache.spark").setLevel(Level.WARN)
    if (args.length < 3){
      System.err.println("Usage:CreditModel <creaditInPath> <outPut> <model>")
      System.exit(1)
    }

    //2rd_data/ch12/creditdata.txt output/ch12/model local[2]
    val Array(creaditInPath,output,mode) = args

    // 创建Spark实例
    val spark = SparkSession.builder
      .master(mode)
      .appName("CreditModel Example")
      .getOrCreate()

    // 加载文本,并创建RDD数据源,将变量的名称赋予各个字段
    // Create an RDD of Credit objects from a text file, convert it to a Dataframe
    import spark.implicits._
    val creditDF = spark.sparkContext.textFile(creaditInPath).map(_.split(","))
      .map(attributes => Credit(attributes(0).trim.toDouble,attributes(1).trim.toDouble,attributes(2).trim.toDouble,attributes(3).trim.toDouble,attributes(4).trim.toDouble,attributes(5).trim.toDouble,attributes(6).trim.toDouble,attributes(7).trim.toDouble,attributes(8).trim.toDouble))
      .toDF()

    // Register the DataFrame as a temporary view
    // 创建临时视图
    creditDF.createOrReplaceTempView("creditdf")

    // 将查询结果放到sqlDF中,完成dataframe转化
    val sqlDF = spark.sql("select * from creditdf")
    sqlDF.show()

    // 自变量的列名
    val colArray2 = Array("gender","age","yearsmarried","children","housenumber","captiallevel","facarnumber","pacarnumber")
    // 设置DataFrame自变量集,并将这些变量统称为"features"
    val vecDF: DataFrame = new VectorAssembler().setInputCols(colArray2).setOutputCol("features").transform(sqlDF)

    // 按7:3划分成训练集和测试集,训练集为trainingDF,测试集为testDF
    val Array(trainingDF,testDF) = vecDF.randomSplit(Array(0.7, 0.3), seed=132) //seed随机算法从该数字开始生成随机数字

    // 建立逻辑回归模型,设置目标变量(标签)和自变量集,在训练集上训练
    val lrModel = new LogisticRegression().setLabelCol("load_label").setFeaturesCol("features").fit(trainingDF)
    // 输出逻辑回归的系数和截距
    println(s"Coefficients: ${lrModel.coefficients} Intercept: ${lrModel.intercept}")

    // 惩罚项,如果是0,是L2惩罚,如果是0-1之间是混合,如果是1,则是L1惩罚,默认是L2
    lrModel.getElasticNetParam
    // 正则化的参数,一般大于等于0,默认是0
    lrModel.getRegParam
    // 拟合之前是否需要标准化,默认是true
    lrModel.getStandardization
    // 二分类中设置阈值,范围为[0,1],如果类标签的1的概率大于该阈值,则会判定为1,默认是0.5
    lrModel.getThreshold
    // 设置迭代的收敛容限,默认值为1e-6
    lrModel.getTol

    // 使用测试集进行预测,包括原始的字段,在加上综合的自变量集字段features,预测的原始值,转化的概率值,预测的类别
    lrModel.transform(testDF).show

    //具体的查看features,预测的原始值,转化的概率值,预测的类别
    lrModel.transform(testDF).select("features","rawPrediction","probability","prediction").show(30,false)

    //查看模型训练过程中损失的迭代情况
    val trainingSummary = lrModel.summary
    val objectiveHistory = trainingSummary.objectiveHistory
    objectiveHistory.foreach(loss => println(loss))
    //保存模型
    lrModel.save(output)
    //
    spark.close()
  }
}
作者 east
Spark, 数据挖掘 11月 17,2020

Spark道路拥堵模式聚类案例

从基础的道路通行状态统计、分析、聚类等维度开展对某个城市道路拥堵情况的分析和研究。

13.3.2 数据预处理根据给定的某地图路况数据,首先进行数据预处理工作,清洗原始数据并去除缺失数据、错误数据,根据道路ID进行数据汇集,计算拥堵指数。1)清除缺失数据:清除字段为空记录;

2)清除错误数据:清除字段错误记录;

3)根据道路ID进行道路拥堵指数聚合;

4)根据时间进行道路拥堵指数排序。

13.3.3 特征构建仍然以半小时为最小时间粒度(每日24小时划分为48维时间片),并对道路拥堵指数按时间片进行聚合计算,同时按照48维时间片进行拥堵指数排列。具体处理过程以及代码如下:

package com.koala.ch13

import org.apache.spark.{SparkConf, SparkContext}
import java.text.SimpleDateFormat
import java.util.Calendar

import breeze.linalg.Counter
import org.apache.log4j.{Level, Logger}

object CrowdModel {

  def main(args: Array[String]){
    Logger.getLogger("org.apache.spark").setLevel(Level.WARN)
    if (args.length < 2) {
      System.err.println("Usage:CrowdModel <InPath> <OutPut> <Model>")
      System.exit(1)
    }

    // 2rd_data/ch13/user_location_sample.txt output/ch13/CrowdModel local[2]
    val Array(input,output,mode) = args

    //初始化SparkContext
    val conf = new SparkConf()
      .setMaster(mode)//.setMaster("local")
      .setAppName(this.getClass.getSimpleName)
    val sc = new SparkContext(conf)

    // 位置筛选
    // 清洗数据,通过split(",")切分数据,得到 User_id Time_stamp Cell_id三个维度的数据列表。
    // (Time_stamp,Cell_id,User_id)-> (User_id,Time_stamp,Cell_id)
    // 20160501055052,209059,898
    val data = sc.textFile(input).map(_.split(",")).map {
      x => (x(2), x(0), x(1))
    }
    //data.coalesce(1).saveAsTextFile(output)

    // 根据Time_stamp分析当前日期为工作日或节假日,并添加time标签标识HH:mm,work_flag标签标识工作日(work_falg=1)或节假日(work_flag=0)
    // 输出:(User_id,work_flag,date_time,Cell_id)
    val preData = data.map {
      case (preUser_id, preTime_stamp, preCell_id) => {

        //将日期转变成星期,获取工作日(星期一至星期五)和非工作日(星期六、星期日)
        // 20160501 055052
        val sdf = new SimpleDateFormat("yyyyMMddHHmmss") //24小时工作制
        val date = sdf.parse(preTime_stamp)
        val cal = Calendar.getInstance
        cal.setTime(date)
        var w = cal.get(Calendar.DAY_OF_WEEK) - 1
        // 工作日默认为1 非工作日默认为0
        var work_flag = 1
        if (w <= 0 || w >= 6) {
          work_flag = 0
        }

        // 按照30分钟间隔处理时间
        val time_ = preTime_stamp.substring(8, 12)
        // 截取指定位置的元素,前包括后不包括
        var minute_ = "00"
        if (time_.substring(2).toInt >= 30) {
          minute_ = "30"
        }
        val date_time = time_.toString.substring(0, 2) + minute_
        ((preUser_id, work_flag, date_time, preCell_id), 1)
      }
    }
    //preData.coalesce(1).saveAsTextFile(output)

    //使用reduceByKey(_+_)对(User_id,work_flag,date_time,Cell_id)访问次数进行聚合,根据聚合结果,选择用户某段时间在30分钟内划分访问次数最多的基站为标准访问地点。
    val aggData = preData.reduceByKey(_ + _)
      .map { x => ((x._1._1, x._1._2, x._1._3), (x._1._4, x._2)) }
      .reduceByKey((a, b) => if (a._2 > b._2) a else b)//选取访问次数最多的cell
    //aggData.coalesce(1).saveAsTextFile(output)

    //获取用户工作日24小时访问地点cell_id、节假日24小时访问地点cell_id,以30分钟为最小时间粒度划分时间片,得到user_id工作日48维时间片访问cell_id和节假日48维时间片访问cell_id,共计96维时间片。
    //(User_id,work_flag,date_time),(Cell_id,nums)->(User_id,work_flag),(date_time,Cell_id)

    val slotData = aggData.map { x => ((x._1._1, x._1._2), (x._1._3 + ":" + x._2._1)) }.reduceByKey(_ + ";" + _)
    //slotData.coalesce(1).saveAsTextFile(output)

    // 位置编码
    // 根据聚合结果,提取所有用户访问的基站进行重新编码,获得用户访问位置列表cell_id,并进行排序去重
    // (User_id,work_flag,date_time),(Cell_id,nums)
    val minCell = aggData.map(x => x._2._1).sortBy(x=>x.toLong,true).collect().distinct
    println(minCell.toList)
    //使用zip方法从1开始对用户访问地址进行编码,并将编码进行保存。
    val index_list = minCell.zip(Stream from 1).toMap
    println(index_list)
    //得到的index_list即是用户访问位置编码特征向量。
  }
}
import org.apache.log4j.{Level, Logger}
import org.apache.spark.{SparkConf, SparkContext}

object CleanCongestionData {

  def main(args: Array[String]) {
    Logger.getLogger("org.apache.spark").setLevel(Level.WARN)
    if (args.length < 2) {
      System.err.println("Usage:CleanCongestionData <InPath> <OutPut> <Model>")
      System.exit(1)
    }

    // 2rd_data/ch13/road_congestion_sample.txt output/ch13/CongestionModel local[2]
    val Array(input,output,mode) = args

    // 初始化SparkContext
    val conf = new SparkConf()
      .setMaster(mode)//.setMaster("local")
      .setAppName(this.getClass.getSimpleName)
    val sc = new SparkContext(conf)

    // 计算link的拥堵情况,指定道路、工作日状态、时间片时,link拥堵指数的平均值(四舍五入)取整,
    // key (linkid, work_flag, hour) value (congestion)
    // 85349482;1;20.5;1
    val data = sc.textFile(input).map(_.split(";"))
      .map {x => ((x(0),x(1),x(2)),x(3))}
      .groupByKey().mapValues(x=>{
        val a = x.toList.reduceLeft((sum,i)=>sum +i)//拥堵指数求和
        val b = x.toList.length
        Math.round(a.toInt/b)//平均拥堵指数
      })
    //data.coalesce(1).saveAsTextFile(output)

    // 根据key聚合数据后,使用hour 进行排序 并删除hour数据
    // key (linkid,work_flag, hour) value (congestion)->(linkid) value(work_flag,congestion)
    val collectData = data.sortBy(x=>x._1._3).map(x => ((x._1._1),(x._1._2+":"+x._2))).reduceByKey(_ + ";" + _)
    collectData.coalesce(1).saveAsTextFile(output)
  }
}
作者 east
python, 人工智能, 数据挖掘 10月 8,2020

python多项式回归代码实现

多项式回归是在上文python源码实现线性回归并绘图

基础上实现的,要实现下面的多项式

可以用矩阵相乘来实现

代码如下:

import numpy as np
import matplotlib.pyplot as plt

# 读入训练数据
train = np.loadtxt('click.csv', delimiter=',', dtype='int', skiprows=1)
train_x = train[:,0]
train_y = train[:,1]

# 标准化
mu = train_x.mean()
sigma = train_x.std()
def standardize(x):
    return (x - mu) / sigma

train_z = standardize(train_x)

# 参数初始化
theta = np.random.rand(3)

# 创建训练数据的矩阵
def to_matrix(x):
    return np.vstack([np.ones(x.size), x, x ** 2]).T

X = to_matrix(train_z)

# 预测函数
def f(x):
    return np.dot(x, theta)

# 目标函数
def E(x, y):
    return 0.5 * np.sum((y - f(x)) ** 2)

# 学习率
ETA = 1e-3

# 误差的差值
diff = 1

# 更新次数
count = 0

# 直到误差的差值小于 0.01 为止,重复参数更新
error = E(X, train_y)
while diff > 1e-2:
    # 更新结果保存到临时变量
    theta = theta - ETA * np.dot(f(X) - train_y, X)

    # 计算与上一次误差的差值
    current_error = E(X, train_y)
    diff = error - current_error
    error = current_error

    # 输出日志
    count += 1
    log = '第 {} 次 : theta = {}, 差值 = {:.4f}'
    print(log.format(count, theta, diff))

# 绘图确认
x = np.linspace(-3, 3, 100)
plt.plot(train_z, train_y, 'o')
plt.plot(x, f(to_matrix(x)))
plt.show()

最后输出效果如下:

作者 east
python, 数据挖掘 10月 8,2020

python源码实现线性回归并绘图

用python实现一次函数 的线性回归。把fθ(x)作为一次函数来实现吧。我们要实现下面这样的fθ(x)和目标函数E(θ)。

要把下面的训练数据变成平均值为0、方差为1的数据。作用是参数的收敛会更快。这种做法也被称为标准化或者z-score规范化,变换表达式是这样的。µ是训练数据的平均值,σ是标准差。

参数更新表达式如下:

训练数据如下:

x	y
235	591
216	539
148	413
35	310
85	308
204	519
49	325
25	332
173	498
191	498
134	392
99	334
117	385
112	387
162	425
272	659
159	400
159	427
59	319
198	522

import numpy as np
import matplotlib.pyplot as plt

# 读入训练数据
train = np.loadtxt('click.csv', delimiter=',', dtype='int', skiprows=1)
train_x = train[:,0]
train_y = train[:,1]

# 标准化
mu = train_x.mean()
sigma = train_x.std()
def standardize(x):
    return (x - mu) / sigma

train_z = standardize(train_x)

# 参数初始化
theta0 = np.random.rand()
theta1 = np.random.rand()

# 预测函数
def f(x):
    return theta0 + theta1 * x

# 目标函数
def E(x, y):
    return 0.5 * np.sum((y - f(x)) ** 2)

# 学习率
ETA = 1e-3

# 误差的差值
diff = 1

# 更新次数
count = 0

# 直到误差的差值小于 0.01 为止,重复参数更新
error = E(train_z, train_y)
while diff > 1e-2:
    # 更新结果保存到临时变量
    tmp_theta0 = theta0 - ETA * np.sum((f(train_z) - train_y))
    tmp_theta1 = theta1 - ETA * np.sum((f(train_z) - train_y) * train_z)

    # 更新参数
    theta0 = tmp_theta0
    theta1 = tmp_theta1

    # 计算与上一次误差的差值
    current_error = E(train_z, train_y)
    diff = error - current_error
    error = current_error

    # 输出日志
    count += 1
    log = '第 {} 次 : theta0 = {:.3f}, theta1 = {:.3f}, 差值 = {:.4f}'
    print(log.format(count, theta0, theta1, diff))

# 绘图确认
x = np.linspace(-3, 3, 100)
plt.plot(train_z, train_y, 'o')
plt.plot(x, f(x))
plt.show()

最终输出图形如下:

作者 east

标签

flex布局 github mysql O2O UI控件 不含后台 交流 体育 共享经济 出行 单机类 图像 地图定位 外卖 多媒体 娱乐 小程序 布局 带后台完整项目 开源项目 搜索 支付 效率 教育 旅游 日历 时钟 流量主 物流 用户系统 电商 画图 画布(canvas) 社交 签到 算命 联网 装修 解锁 评论 读书 读音 资讯 阅读 预订

官方QQ群

1群:74052405

薅羊毛交流群: 952493060

近期文章

  • android http请求带中文参数会乱码(url编码)
  • mysql主从复制
  • 修改my.ini导致启动不了服务
  • Android聊天输入框控件
  • Android直播间刷礼物动画控件
  • Android自定义交易密码框
  • Android记录文件日志工具类
  • Android常用图片操作工具类
  • Android倒计时工具类
  • Android压缩解压工具

文章归档

  • 2021年一月
  • 2020年十二月
  • 2020年十一月
  • 2020年十月
  • 2020年九月
  • 2020年八月
  • 2020年七月
  • 2020年六月
  • 2020年五月
  • 2020年四月
  • 2020年三月
  • 2020年二月
  • 2020年一月
  • 2019年七月
  • 2019年六月
  • 2019年五月
  • 2019年四月
  • 2019年三月
  • 2019年二月
  • 2019年一月
  • 2018年十二月
  • 2018年七月
  • 2018年六月

分类目录

  • Android (30)
  • bug清单 (26)
  • Fuchsia (15)
  • php (1)
  • python (2)
  • 人工智能 (1)
  • 大数据开发 (119)
    • Elasticsearch (6)
    • Flink (3)
    • Hadoop (11)
    • Hbase (8)
    • Hive (1)
    • Java (27)
    • Kafka (1)
    • solr (1)
    • Spark (42)
    • spring (8)
    • 数据仓库 (1)
    • 数据挖掘 (5)
    • 运维 (4)
  • 小游戏代码 (1)
  • 小程序代码 (111)
    • O2O (15)
    • UI控件 (3)
    • 互联网类 (17)
    • 企业类 (5)
    • 地图定位 (9)
    • 多媒体 (5)
    • 工具类 (19)
    • 电商类 (18)
    • 社交 (5)
    • 行业软件 (7)
    • 资讯读书 (7)
  • 开发博客 (5)
  • 数据库 (2)
  • 未分类 (6)

功能

  • 登录
  • 文章RSS
  • 评论RSS
  • WordPress.org

All Rights Reserved by Gitweixin.本站收集网友上传代码, 如有侵犯版权,请发邮件联系yiyuyos@gmail.com删除.