Spark道路拥堵模式聚类案例

从基础的道路通行状态统计、分析、聚类等维度开展对某个城市道路拥堵情况的分析和研究。

13.3.2 数据预处理根据给定的某地图路况数据,首先进行数据预处理工作,清洗原始数据并去除缺失数据、错误数据,根据道路ID进行数据汇集,计算拥堵指数。1)清除缺失数据:清除字段为空记录;

2)清除错误数据:清除字段错误记录;

3)根据道路ID进行道路拥堵指数聚合;

4)根据时间进行道路拥堵指数排序。

13.3.3 特征构建仍然以半小时为最小时间粒度(每日24小时划分为48维时间片),并对道路拥堵指数按时间片进行聚合计算,同时按照48维时间片进行拥堵指数排列。具体处理过程以及代码如下:

package com.koala.ch13

import org.apache.spark.{SparkConf, SparkContext}
import java.text.SimpleDateFormat
import java.util.Calendar

import breeze.linalg.Counter
import org.apache.log4j.{Level, Logger}

object CrowdModel {

  def main(args: Array[String]){
    Logger.getLogger("org.apache.spark").setLevel(Level.WARN)
    if (args.length < 2) {
      System.err.println("Usage:CrowdModel <InPath> <OutPut> <Model>")
      System.exit(1)
    }

    // 2rd_data/ch13/user_location_sample.txt output/ch13/CrowdModel local[2]
    val Array(input,output,mode) = args

    //初始化SparkContext
    val conf = new SparkConf()
      .setMaster(mode)//.setMaster("local")
      .setAppName(this.getClass.getSimpleName)
    val sc = new SparkContext(conf)

    // 位置筛选
    // 清洗数据,通过split(",")切分数据,得到 User_id Time_stamp Cell_id三个维度的数据列表。
    // (Time_stamp,Cell_id,User_id)-> (User_id,Time_stamp,Cell_id)
    // 20160501055052,209059,898
    val data = sc.textFile(input).map(_.split(",")).map {
      x => (x(2), x(0), x(1))
    }
    //data.coalesce(1).saveAsTextFile(output)

    // 根据Time_stamp分析当前日期为工作日或节假日,并添加time标签标识HH:mm,work_flag标签标识工作日(work_falg=1)或节假日(work_flag=0)
    // 输出:(User_id,work_flag,date_time,Cell_id)
    val preData = data.map {
      case (preUser_id, preTime_stamp, preCell_id) => {

        //将日期转变成星期,获取工作日(星期一至星期五)和非工作日(星期六、星期日)
        // 20160501 055052
        val sdf = new SimpleDateFormat("yyyyMMddHHmmss") //24小时工作制
        val date = sdf.parse(preTime_stamp)
        val cal = Calendar.getInstance
        cal.setTime(date)
        var w = cal.get(Calendar.DAY_OF_WEEK) - 1
        // 工作日默认为1 非工作日默认为0
        var work_flag = 1
        if (w <= 0 || w >= 6) {
          work_flag = 0
        }

        // 按照30分钟间隔处理时间
        val time_ = preTime_stamp.substring(8, 12)
        // 截取指定位置的元素,前包括后不包括
        var minute_ = "00"
        if (time_.substring(2).toInt >= 30) {
          minute_ = "30"
        }
        val date_time = time_.toString.substring(0, 2) + minute_
        ((preUser_id, work_flag, date_time, preCell_id), 1)
      }
    }
    //preData.coalesce(1).saveAsTextFile(output)

    //使用reduceByKey(_+_)对(User_id,work_flag,date_time,Cell_id)访问次数进行聚合,根据聚合结果,选择用户某段时间在30分钟内划分访问次数最多的基站为标准访问地点。
    val aggData = preData.reduceByKey(_ + _)
      .map { x => ((x._1._1, x._1._2, x._1._3), (x._1._4, x._2)) }
      .reduceByKey((a, b) => if (a._2 > b._2) a else b)//选取访问次数最多的cell
    //aggData.coalesce(1).saveAsTextFile(output)

    //获取用户工作日24小时访问地点cell_id、节假日24小时访问地点cell_id,以30分钟为最小时间粒度划分时间片,得到user_id工作日48维时间片访问cell_id和节假日48维时间片访问cell_id,共计96维时间片。
    //(User_id,work_flag,date_time),(Cell_id,nums)->(User_id,work_flag),(date_time,Cell_id)

    val slotData = aggData.map { x => ((x._1._1, x._1._2), (x._1._3 + ":" + x._2._1)) }.reduceByKey(_ + ";" + _)
    //slotData.coalesce(1).saveAsTextFile(output)

    // 位置编码
    // 根据聚合结果,提取所有用户访问的基站进行重新编码,获得用户访问位置列表cell_id,并进行排序去重
    // (User_id,work_flag,date_time),(Cell_id,nums)
    val minCell = aggData.map(x => x._2._1).sortBy(x=>x.toLong,true).collect().distinct
    println(minCell.toList)
    //使用zip方法从1开始对用户访问地址进行编码,并将编码进行保存。
    val index_list = minCell.zip(Stream from 1).toMap
    println(index_list)
    //得到的index_list即是用户访问位置编码特征向量。
  }
}
import org.apache.log4j.{Level, Logger}
import org.apache.spark.{SparkConf, SparkContext}

object CleanCongestionData {

  def main(args: Array[String]) {
    Logger.getLogger("org.apache.spark").setLevel(Level.WARN)
    if (args.length < 2) {
      System.err.println("Usage:CleanCongestionData <InPath> <OutPut> <Model>")
      System.exit(1)
    }

    // 2rd_data/ch13/road_congestion_sample.txt output/ch13/CongestionModel local[2]
    val Array(input,output,mode) = args

    // 初始化SparkContext
    val conf = new SparkConf()
      .setMaster(mode)//.setMaster("local")
      .setAppName(this.getClass.getSimpleName)
    val sc = new SparkContext(conf)

    // 计算link的拥堵情况,指定道路、工作日状态、时间片时,link拥堵指数的平均值(四舍五入)取整,
    // key (linkid, work_flag, hour) value (congestion)
    // 85349482;1;20.5;1
    val data = sc.textFile(input).map(_.split(";"))
      .map {x => ((x(0),x(1),x(2)),x(3))}
      .groupByKey().mapValues(x=>{
        val a = x.toList.reduceLeft((sum,i)=>sum +i)//拥堵指数求和
        val b = x.toList.length
        Math.round(a.toInt/b)//平均拥堵指数
      })
    //data.coalesce(1).saveAsTextFile(output)

    // 根据key聚合数据后,使用hour 进行排序 并删除hour数据
    // key (linkid,work_flag, hour) value (congestion)->(linkid) value(work_flag,congestion)
    val collectData = data.sortBy(x=>x._1._3).map(x => ((x._1._1),(x._1._2+":"+x._2))).reduceByKey(_ + ";" + _)
    collectData.coalesce(1).saveAsTextFile(output)
  }
}

关注公众号“大模型全栈程序员”回复“小程序”获取1000个小程序打包源码。更多免费资源在http://www.gitweixin.com/?p=2627

发表评论

邮箱地址不会被公开。 必填项已用*标注