Java样例代码
功能介绍
在Spark应用中,通过使用Spark Streaming调用Kafka接口来获取数据,然后把数据经过分析后,找到对应的HBase表记录,再写到HBase表。
代码样例
下面代码片段仅为演示,具体代码参见:com.huawei.bigdata.spark.examples.SparkOnStreamingToHbase
/** * 运行Spark Streaming任务,根据value值从hbase table1表读取数据,把两者数据做操作后,更新到hbase table1表 */ public class SparkOnStreamingToHbase { public static void main(String[] args) throws Exception { if (args.length < 3) { printUsage(); } String checkPointDir = args[0]; String topics = args[1]; final String brokers = args[2]; Duration batchDuration = Durations.seconds(5); SparkConf sparkConf = new SparkConf().setAppName("SparkOnStreamingToHbase"); JavaStreamingContext jssc = new JavaStreamingContext(sparkConf, batchDuration); // 设置Spark Streaming的CheckPoint目录 if (!"nocp".equals(checkPointDir)) { jssc.checkpoint(checkPointDir); } final String columnFamily = "cf"; HashMap<String, String> kafkaParams = new HashMap<String, String>(); kafkaParams.put("metadata.broker.list", brokers); String[] topicArr = topics.split(","); Set<String> topicSet = new HashSet<String>(Arrays.asList(topicArr)); // 通过brokers和topics直接创建kafka stream // 接收Kafka中数据,生成相应DStream JavaDStream<String> lines = KafkaUtils.createDirectStream(jssc, String.class, String.class, StringDecoder.class, StringDecoder.class, kafkaParams, topicSet).map( new Function<Tuple2<String, String>, String>() { public String call(Tuple2<String, String> tuple2) { // map(_._1)是消息的key, map(_._2)是消息的value return tuple2._2(); } } ); lines.foreachRDD( new Function<JavaRDD<String>, Void>() { public Void call(JavaRDD<String> rdd) throws Exception { rdd.foreachPartition( new VoidFunction<Iterator<String>>() { public void call(Iterator<String> iterator) throws Exception { hBaseWriter(iterator, columnFamily); } } ); return null; } } ); jssc.start(); jssc.awaitTermination(); } /** * 在executor端写入数据 * @param iterator 消息 * @param columnFamily */ private static void hBaseWriter(Iterator<String> iterator, String columnFamily) throws IOException { Configuration conf = HBaseConfiguration.create(); Connection connection = null; Table table = null; try { connection = ConnectionFactory.createConnection(conf); table = 
connection.getTable(TableName.valueOf("table1")); List<Get> rowList = new ArrayList<Get>(); while (iterator.hasNext()) { Get get = new Get(iterator.next().getBytes()); rowList.add(get); } // 获取table1的数据 Result[] resultDataBuffer = table.get(rowList); // 设置table1的数据 List<Put> putList = new ArrayList<Put>(); for (int i = 0; i < resultDataBuffer.length; i++) { String row = new String(rowList.get(i).getRow()); Result resultData = resultDataBuffer[i]; if (!resultData.isEmpty()) { // 根据列簇和列,获取旧值 String aCid = Bytes.toString(resultData.getValue(columnFamily.getBytes(), "cid".getBytes())); Put put = new Put(Bytes.toBytes(row)); // 计算结果 int resultValue = Integer.valueOf(row) + Integer.valueOf(aCid); put.addColumn(Bytes.toBytes(columnFamily), Bytes.toBytes("cid"), Bytes.toBytes(String.valueOf(resultValue))); putList.add(put); } } if (putList.size() > 0) { table.put(putList); } } catch (IOException e) { e.printStackTrace(); } finally { if (table != null) { try { table.close(); } catch (IOException e) { e.printStackTrace(); } } if (connection != null) { try { // 关闭Hbase连接. connection.close(); } catch (IOException e) { e.printStackTrace(); } } } } private static void printUsage() { System.out.println("Usage: {checkPointDir} {topic} {brokerList}"); System.exit(1); } }
Scala样例代码
功能介绍
在Spark应用中,通过使用Spark Streaming调用Kafka接口来获取数据,然后把数据经过分析后,找到对应的HBase表记录,再写到HBase表。
代码样例
下面代码片段仅为演示,具体代码参见:com.huawei.bigdata.spark.examples.SparkOnStreamingToHbase
/**
 * Spark Streaming job: for each message value received from Kafka, look up the
 * matching row in HBase "table1", combine the stored value with the row key,
 * and write the result back to "table1".
 */
object SparkOnStreamingToHbase {
  def main(args: Array[String]) {
    if (args.length < 3) {
      printUsage() // exits the JVM
    }
    // FIX: destructure only the first three arguments. The original
    // `val Array(a, b, c) = args` requires EXACTLY three elements and throws
    // a MatchError when extra arguments are passed, even though the guard
    // above (`args.length < 3`) permits them.
    val Array(checkPointDir, topics, brokers) = args.take(3)
    val sparkConf = new SparkConf().setAppName("SparkOnStreamingToHbase")
    val ssc = new StreamingContext(sparkConf, Seconds(5))
    // Set the Spark Streaming checkpoint directory (pass "nocp" to disable).
    if (!"nocp".equals(checkPointDir)) {
      ssc.checkpoint(checkPointDir)
    }
    val columnFamily = "cf"
    val kafkaParams = Map[String, String](
      "metadata.broker.list" -> brokers
    )
    val topicSet = topics.split(",").toSet
    // Direct Kafka stream; _._1 is the message key, _._2 is the message value.
    val lines = KafkaUtils.createDirectStream[String, String, StringDecoder, StringDecoder](ssc, kafkaParams, topicSet).map(_._2)
    lines.foreachRDD(rdd => {
      // Each partition is processed on an executor with its own connection.
      rdd.foreachPartition(iterator => hBaseWriter(iterator, columnFamily))
    })
    ssc.start()
    ssc.awaitTermination()
  }

  /**
   * Executor-side writer: reads the old "cid" value for each incoming row key,
   * adds it to the row key (both interpreted as integers) and writes the sum back.
   *
   * @param iterator     message values of one partition, used as HBase row keys
   * @param columnFamily column family that holds the "cid" qualifier
   */
  def hBaseWriter(iterator: Iterator[String], columnFamily: String): Unit = {
    val rows = iterator.toArray
    // FIX: nothing to do for an empty partition; avoid opening a connection.
    if (rows.nonEmpty) {
      val conf = HBaseConfiguration.create()
      var connection: Connection = null
      var table: Table = null
      try {
        connection = ConnectionFactory.createConnection(conf)
        table = connection.getTable(TableName.valueOf("table1"))
        // FIX: encode row keys with Bytes.toBytes (UTF-8) instead of
        // String.getBytes (platform-default charset) so the Get keys match
        // the Bytes.* encoding used for the puts below.
        val rowList = new util.ArrayList[Get]()
        for (row <- rows) {
          rowList.add(new Get(Bytes.toBytes(row)))
        }
        // Batch-read the current contents of table1 for all row keys.
        val resultDataBuffer = table.get(rowList)
        // Build the updated rows.
        val putList = new util.ArrayList[Put]()
        for (i <- rows.indices) {
          val row = rows(i)
          val resultData = resultDataBuffer(i)
          if (!resultData.isEmpty) {
            // Old value stored under columnFamily:cid.
            val aCid = Bytes.toString(resultData.getValue(Bytes.toBytes(columnFamily), Bytes.toBytes("cid")))
            val put = new Put(Bytes.toBytes(row))
            // Sum of row key and stored cid, written back as a string.
            val resultValue = row.toInt + aCid.toInt
            put.addColumn(Bytes.toBytes(columnFamily), Bytes.toBytes("cid"), Bytes.toBytes(resultValue.toString))
            putList.add(put)
          }
        }
        if (!putList.isEmpty) {
          table.put(putList)
        }
      } catch {
        case e: IOException =>
          e.printStackTrace()
      } finally {
        if (table != null) {
          try {
            table.close()
          } catch {
            case e: IOException =>
              e.printStackTrace()
          }
        }
        if (connection != null) {
          try {
            // Close the HBase connection.
            connection.close()
          } catch {
            case e: IOException =>
              e.printStackTrace()
          }
        }
      }
    }
  }

  // Side-effecting (calls System.exit), so declared and called with parens.
  private def printUsage(): Unit = {
    System.out.println("Usage: {checkPointDir} {topic} {brokerList}")
    System.exit(1)
  }
}