输入数据文件格式:
cx1:a,b,c,d,e,f
cx2:c,d,e,f
cx3:a,b,c,f
cx4:a,b,c,d,e,f
cx5:a,b,e,f
cx6:a,b,c,d
cx7:a,b,c,f
cx8:d,e,f
cx9:b,c,d,e,f
输出结果:
a|cx1,cx3,cx4,cx5,cx6,cx7
b|cx1,cx3,cx4,cx5,cx6,cx7,cx9
c|cx1,cx2,cx3,cx4,cx6,cx7,cx9
d|cx1,cx2,cx4,cx6,cx8,cx9
e|cx1,cx2,cx4,cx5,cx8,cx9
f|cx1,cx2,cx3,cx4,cx5,cx7,cx8,cx9
import scala.io.Source
object InvertedIndex {
def main(args: Array[String]) {
val conf = new SparkConf().setAppName(InvertedIndex.getClass.getSimpleName)
conf.setMaster("local")
val sc = new SparkContext(conf)
/* 倒排索引InvertedIndex */
val source = Source.fromFile("D:\\java\\spark\\data\\invertIndex.txt").getLines.toArray
val cxRDD0 = sc.parallelize(source) /* spark单机读取数据 */
val rdd2 = cxRDD0.flatMap {
lines =>
val line = lines.split(":", -1) /* 拆分数据,以竖杠为拆分条件 */
line(1).split(",", -1).map {
/* 再对拆分后的数据,进行第二次拆分 */
v =>
(v, line(0)) /* 拼接数据 */
}
}
rdd2.collect().foreach {println}
rdd2.groupByKey() /* 分组 */
.sortBy(_._1, true) /* 排序 */
.foreach(x => println(s"${x._1}|${x._2.mkString(",")}")) /* 格式化输出 */
}
}