spark实例3:倒排索引

   输入数据文件格式:
   cx1:a,b,c,d,e,f
   cx2:c,d,e,f
   cx3:a,b,c,f
   cx4:a,b,c,d,e,f
   cx5:a,b,e,f
   cx6:a,b,c,d
   cx7:a,b,c,f
   cx8:d,e,f
   cx9:b,c,d,e,f
  
  输出结果:
  a|cx1,cx3,cx4,cx5,cx6,cx7
  b|cx1,cx3,cx4,cx5,cx6,cx7,cx9
  c|cx1,cx2,cx3,cx4,cx6,cx7,cx9
  d|cx1,cx2,cx4,cx6,cx8,cx9
  e|cx1,cx2,cx4,cx5,cx8,cx9
  f|cx1,cx2,cx3,cx4,cx5,cx7,cx8,cx9

import scala.io.Source
object InvertedIndex {

  def main(args: Array[String]) {
    val conf = new SparkConf().setAppName(InvertedIndex.getClass.getSimpleName)
    conf.setMaster("local")
    val sc = new SparkContext(conf)
    /* 倒排索引InvertedIndex */

    val source = Source.fromFile("D:\\java\\spark\\data\\invertIndex.txt").getLines.toArray

    val cxRDD0 = sc.parallelize(source) /* spark单机读取数据 */

   val rdd2 = cxRDD0.flatMap {

      lines =>

        val line = lines.split(":", -1) /* 拆分数据,以竖杠为拆分条件 */

        line(1).split(",", -1).map {
          /* 再对拆分后的数据,进行第二次拆分 */

          v =>

            (v, line(0)) /* 拼接数据 */

        }

    }
    rdd2.collect().foreach {println}
     rdd2.groupByKey() /* 分组 */

      .sortBy(_._1, true) /* 排序 */

      .foreach(x => println(s"${x._1}|${x._2.mkString(",")}")) /* 格式化输出 */

  }
}

关注公众号“大模型全栈程序员”回复“小程序”获取1000个小程序打包源码。更多免费资源在http://www.gitweixin.com/?p=2627

发表评论

邮箱地址不会被公开。 必填项已用*标注