Spark Streaming Example: Reading Data from Kafka and Writing to HBase

Java Sample Code

Function Description

In this Spark application, Spark Streaming consumes data through the Kafka interface, analyzes each message, looks up the corresponding record in an HBase table, and writes the combined result back to that table.
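Both the Java and Scala samples assume that an HBase table named table1 with a column family cf already exists, and that its row keys and cf:cid values are numeric strings (the job adds the two numbers together). Below is a minimal preparation sketch using the HBase 1.x client API; the class name PrepareTable and the seeded row are illustrative and not part of the sample:

import java.io.IOException;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.HColumnDescriptor;
import org.apache.hadoop.hbase.HTableDescriptor;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.Admin;
import org.apache.hadoop.hbase.client.Connection;
import org.apache.hadoop.hbase.client.ConnectionFactory;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.client.Table;
import org.apache.hadoop.hbase.util.Bytes;

// Hypothetical one-off setup tool, not part of the published sample.
public class PrepareTable {
  public static void main(String[] args) throws IOException {
    Configuration conf = HBaseConfiguration.create();
    try (Connection connection = ConnectionFactory.createConnection(conf);
         Admin admin = connection.getAdmin()) {
      TableName tableName = TableName.valueOf("table1");
      if (!admin.tableExists(tableName)) {
        // Create table1 with the single column family "cf" used by the job.
        HTableDescriptor desc = new HTableDescriptor(tableName);
        desc.addFamily(new HColumnDescriptor("cf"));
        admin.createTable(desc);
      }
      // Seed one row whose key and cf:cid value are numeric strings,
      // since the streaming job parses both with Integer.valueOf.
      try (Table table = connection.getTable(tableName)) {
        Put put = new Put(Bytes.toBytes("1"));
        put.addColumn(Bytes.toBytes("cf"), Bytes.toBytes("cid"), Bytes.toBytes("100"));
        table.put(put);
      }
    }
  }
}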

Sample Code

The following code snippet is for demonstration only; for the complete code, see com.huawei.bigdata.spark.examples.SparkOnStreamingToHbase.

import java.io.IOException;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Iterator;
import java.util.List;
import java.util.Set;

import kafka.serializer.StringDecoder;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.Connection;
import org.apache.hadoop.hbase.client.ConnectionFactory;
import org.apache.hadoop.hbase.client.Get;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.client.Table;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.function.Function;
import org.apache.spark.api.java.function.VoidFunction;
import org.apache.spark.streaming.Duration;
import org.apache.spark.streaming.Durations;
import org.apache.spark.streaming.api.java.JavaDStream;
import org.apache.spark.streaming.api.java.JavaStreamingContext;
import org.apache.spark.streaming.kafka.KafkaUtils;
import scala.Tuple2;

/**
 * Run a Spark Streaming job: use each Kafka message value as a row key to read
 * from HBase table1, combine the two values, and write the result back to table1.
 */
public class SparkOnStreamingToHbase {
  public static void main(String[] args) throws Exception {
    if (args.length < 3) {
      printUsage();
    }
    String checkPointDir = args[0];
    String topics = args[1];
    final String brokers = args[2];
    Duration batchDuration = Durations.seconds(5);

    SparkConf sparkConf = new SparkConf().setAppName("SparkOnStreamingToHbase");
    JavaStreamingContext jssc = new JavaStreamingContext(sparkConf, batchDuration);

    // Set the Spark Streaming checkpoint directory.
    if (!"nocp".equals(checkPointDir)) {
      jssc.checkpoint(checkPointDir);
    }

    final String columnFamily = "cf";
    HashMap<String, String> kafkaParams = new HashMap<String, String>();
    kafkaParams.put("metadata.broker.list", brokers);

    String[] topicArr = topics.split(",");
    Set<String> topicSet = new HashSet<String>(Arrays.asList(topicArr));

    // Create a direct Kafka stream from the brokers and topics,
    // receiving the Kafka data as a DStream.
    JavaDStream<String> lines = KafkaUtils.createDirectStream(jssc, String.class, String.class,
      StringDecoder.class, StringDecoder.class, kafkaParams, topicSet).map(
      new Function<Tuple2<String, String>, String>() {
        public String call(Tuple2<String, String> tuple2) {
          // tuple2._1() is the message key; tuple2._2() is the message value.
          return tuple2._2();
        }
      }
    );

    lines.foreachRDD(
      new Function<JavaRDD<String>, Void>() {
        public Void call(JavaRDD<String> rdd) throws Exception {
          // Each partition is processed on an executor.
          rdd.foreachPartition(
            new VoidFunction<Iterator<String>>() {
              public void call(Iterator<String> iterator) throws Exception {
                hBaseWriter(iterator, columnFamily);
              }
            }
          );
          return null;
        }
      }
    );

    jssc.start();
    jssc.awaitTermination();
  }

  /**
   * Write data on the executor side.
   * @param iterator      messages
   * @param columnFamily  column family to update
   */
  private static void hBaseWriter(Iterator<String> iterator, String columnFamily) throws IOException {
    Configuration conf = HBaseConfiguration.create();
    Connection connection = null;
    Table table = null;
    try {
      connection = ConnectionFactory.createConnection(conf);
      table = connection.getTable(TableName.valueOf("table1"));
      List<Get> rowList = new ArrayList<Get>();
      while (iterator.hasNext()) {
        Get get = new Get(iterator.next().getBytes());
        rowList.add(get);
      }
      // Read the existing rows from table1.
      Result[] resultDataBuffer = table.get(rowList);
      // Build the updates for table1.
      List<Put> putList = new ArrayList<Put>();
      for (int i = 0; i < resultDataBuffer.length; i++) {
        String row = new String(rowList.get(i).getRow());
        Result resultData = resultDataBuffer[i];
        if (!resultData.isEmpty()) {
          // Look up the old value by column family and qualifier.
          String aCid = Bytes.toString(resultData.getValue(columnFamily.getBytes(), "cid".getBytes()));
          Put put = new Put(Bytes.toBytes(row));
          // Compute the new value.
          int resultValue = Integer.valueOf(row) + Integer.valueOf(aCid);
          put.addColumn(Bytes.toBytes(columnFamily), Bytes.toBytes("cid"), Bytes.toBytes(String.valueOf(resultValue)));
          putList.add(put);
        }
      }
      if (putList.size() > 0) {
        table.put(putList);
      }
    } catch (IOException e) {
      e.printStackTrace();
    } finally {
      if (table != null) {
        try {
          table.close();
        } catch (IOException e) {
          e.printStackTrace();
        }
      }
      if (connection != null) {
        try {
          // Close the HBase connection.
          connection.close();
        } catch (IOException e) {
          e.printStackTrace();
        }
      }
    }
  }

  private static void printUsage() {
    System.out.println("Usage: {checkPointDir} {topic} {brokerList}");
    System.exit(1);
  }
}
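The snippet above uses the decoder-based direct stream from the spark-streaming-kafka 0.8 integration (metadata.broker.list plus StringDecoder). Newer Spark releases ship only the 0.10 integration, so as a rough sketch under that assumption (spark-streaming-kafka-0-10 on the classpath; the class name Kafka010Source and the group id are illustrative), the receive step could be rewritten as follows. The rest of the job (foreachRDD, foreachPartition, hBaseWriter) stays the same:

import java.util.HashMap;
import java.util.Map;
import java.util.Set;

import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.apache.spark.streaming.api.java.JavaDStream;
import org.apache.spark.streaming.api.java.JavaInputDStream;
import org.apache.spark.streaming.api.java.JavaStreamingContext;
import org.apache.spark.streaming.kafka010.ConsumerStrategies;
import org.apache.spark.streaming.kafka010.KafkaUtils;
import org.apache.spark.streaming.kafka010.LocationStrategies;

public class Kafka010Source {
  // Hypothetical helper: build the same value DStream with the 0.10 integration.
  public static JavaDStream<String> createLines(JavaStreamingContext jssc, String brokers, Set<String> topicSet) {
    Map<String, Object> kafkaParams = new HashMap<>();
    kafkaParams.put("bootstrap.servers", brokers);
    kafkaParams.put("key.deserializer", StringDeserializer.class);
    kafkaParams.put("value.deserializer", StringDeserializer.class);
    kafkaParams.put("group.id", "sparkOnStreamingToHbase"); // illustrative group id
    kafkaParams.put("auto.offset.reset", "latest");
    kafkaParams.put("enable.auto.commit", false);

    JavaInputDStream<ConsumerRecord<String, String>> stream =
        KafkaUtils.createDirectStream(
            jssc,
            LocationStrategies.PreferConsistent(),
            ConsumerStrategies.<String, String>Subscribe(topicSet, kafkaParams));
    // Keep only the message value, as the 0.8 sample does with tuple2._2().
    return stream.map(record -> record.value());
  }
}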


Scala Sample Code

Function Description

In this Spark application, Spark Streaming consumes data through the Kafka interface, analyzes each message, looks up the corresponding record in an HBase table, and writes the combined result back to that table.

Sample Code

The following code snippet is for demonstration only; for the complete code, see com.huawei.bigdata.spark.examples.SparkOnStreamingToHbase.

import java.io.IOException
import java.util

import kafka.serializer.StringDecoder
import org.apache.hadoop.hbase.client.{Connection, ConnectionFactory, Get, Put, Table}
import org.apache.hadoop.hbase.util.Bytes
import org.apache.hadoop.hbase.{HBaseConfiguration, TableName}
import org.apache.spark.SparkConf
import org.apache.spark.streaming.kafka.KafkaUtils
import org.apache.spark.streaming.{Seconds, StreamingContext}

/**
 * Run a Spark Streaming job: use each Kafka message value as a row key to read
 * from HBase table1, combine the two values, and write the result back to table1.
 */
object SparkOnStreamingToHbase {
  def main(args: Array[String]) {
    if (args.length < 3) {
      printUsage()
    }

    val Array(checkPointDir, topics, brokers) = args
    val sparkConf = new SparkConf().setAppName("SparkOnStreamingToHbase")
    val ssc = new StreamingContext(sparkConf, Seconds(5))

    // Set the Spark Streaming checkpoint directory.
    if (!"nocp".equals(checkPointDir)) {
      ssc.checkpoint(checkPointDir)
    }

    val columnFamily = "cf"
    val kafkaParams = Map[String, String](
      "metadata.broker.list" -> brokers
    )

    val topicArr = topics.split(",")
    val topicSet = topicArr.toSet
    // map(_._1) is the message key; map(_._2) is the message value.
    val lines = KafkaUtils.createDirectStream[String, String, StringDecoder, StringDecoder](ssc, kafkaParams, topicSet).map(_._2)
    lines.foreachRDD(rdd => {
      // Each partition is processed on an executor.
      rdd.foreachPartition(iterator => hBaseWriter(iterator, columnFamily))
    })

    ssc.start()
    ssc.awaitTermination()
  }

  
  /**
   * Write data on the executor side.
   * @param iterator      messages
   * @param columnFamily  column family to update
   */
  def hBaseWriter(iterator: Iterator[String], columnFamily: String): Unit = {
    val conf = HBaseConfiguration.create()
    var table: Table = null
    var connection: Connection = null
    try {
      connection = ConnectionFactory.createConnection(conf)
      table = connection.getTable(TableName.valueOf("table1"))
      val iteratorArray = iterator.toArray
      val rowList = new util.ArrayList[Get]()
      for (row <- iteratorArray) {
        val get = new Get(row.getBytes)
        rowList.add(get)
      }
      // Read the existing rows from table1.
      val resultDataBuffer = table.get(rowList)
      // Build the updates for table1.
      val putList = new util.ArrayList[Put]()
      for (i <- 0 until iteratorArray.size) {
        val row = iteratorArray(i)
        val resultData = resultDataBuffer(i)
        if (!resultData.isEmpty) {
          // Look up the old value by column family and qualifier.
          val aCid = Bytes.toString(resultData.getValue(columnFamily.getBytes, "cid".getBytes))
          val put = new Put(Bytes.toBytes(row))
          // Compute the new value.
          val resultValue = row.toInt + aCid.toInt
          put.addColumn(Bytes.toBytes(columnFamily), Bytes.toBytes("cid"), Bytes.toBytes(resultValue.toString))
          putList.add(put)
        }
      }
      if (putList.size() > 0) {
        table.put(putList)
      }
    } catch {
      case e: IOException =>
        e.printStackTrace()
    } finally {
      if (table != null) {
        try {
          table.close()
        } catch {
          case e: IOException =>
            e.printStackTrace()
        }
      }
      if (connection != null) {
        try {
          // Close the HBase connection.
          connection.close()
        } catch {
          case e: IOException =>
            e.printStackTrace()
        }
      }
    }
  }
  

  private def printUsage(): Unit = {
    System.out.println("Usage: {checkPointDir} {topic} {brokerList}")
    System.exit(1)
  }
}
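Both samples take the same three positional arguments shown in printUsage: a checkpoint directory (pass the literal string nocp to run without checkpointing), a comma-separated list of Kafka topics, and the Kafka broker list. Note that the HBase Connection is opened inside foreachPartition rather than on the driver: the connection is not serializable, and creating one connection per partition on the executor amortizes the setup cost across all messages in that partition.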
