利用 Java API 操作 HBase
本例使用完全分布式集群；ZooKeeper 为独立部署的集群，而不是 HBase 自带（由 HBase 管理）的 ZooKeeper。
Maven 依赖如下：
<dependency>
    <groupId>org.apache.hbase</groupId>
    <artifactId>hbase-client</artifactId>
    <version>1.4.8</version>
</dependency>
<dependency>
    <groupId>org.apache.hbase</groupId>
    <artifactId>hbase-server</artifactId>
    <version>1.4.8</version>
</dependency>
在hbase中创建表、插入数据、查询数据等操作

import java.text.SimpleDateFormat
import java.util

// NOTE(review): `conf` is not used in this file; this import presumably comes from a
// sibling example and can be dropped if TestHbaeJavaApi is not part of the project.
import hbase.TestHbaeJavaApi.conf
import org.apache.hadoop.hbase.{Cell, CellUtil, HBaseConfiguration, TableName}
import org.apache.hadoop.hbase.client._
import org.apache.hadoop.hbase.filter.CompareFilter.CompareOp
import org.apache.hadoop.hbase.filter.{FilterList, SingleColumnValueFilter}
import org.apache.hadoop.hbase.util.Bytes

/**
 * Operates on HBase through the Java client API: writes one row, reads it back
 * with Get (whole row and a single column), then reads it again with Scan.
 *
 * This example does not create the table; it assumes table `test` with column
 * family `cf1` already exists on the cluster.
 */
object HBaseTool {
  val zkQuorum = "192.168.0.219"
  val port = "2181"
  val table = "test"
  val cf = "cf1"

  /** Client configuration pointing at the standalone ZooKeeper ensemble. */
  val config = HBaseConfiguration.create()
  config.set("hbase.zookeeper.property.clientPort", port)
  config.set("hbase.zookeeper.quorum", zkQuorum)
  // "hbase.master" is intentionally not set: HBase 1.x clients discover the active
  // master via ZooKeeper (and "192.168.0.219:600000" was not a valid port anyway).

  /**
   * Builds a Put for `rowKey` with one column per (qualifier, value) pair.
   *
   * @param rowKey row key
   * @param cf     column family to write into (defaults to the object's `cf`)
   * @param kv     (qualifier, value) pairs; each becomes one column in `cf`
   * @return the Put; nothing is sent to HBase by this method
   */
  def putData(rowKey: String, cf: String = cf, kv: Seq[(String, String)]): Put = {
    val put = new Put(Bytes.toBytes(rowKey))
    val family = Bytes.toBytes(cf)
    kv.foreach { case (qualifier, value) =>
      put.addColumn(family, Bytes.toBytes(qualifier), Bytes.toBytes(value))
    }
    put
  }

  /**
   * Builds a Get for `rowKey`.
   *
   * @param rowKey    row to fetch
   * @param qualifier if non-null, restricts the Get to this column of `cf`;
   *                  if null, the whole row is requested
   */
  def getData(rowKey: String, qualifier: String = null): Get = {
    val get = new Get(Bytes.toBytes(rowKey))
    Option(qualifier).foreach(q => get.addColumn(Bytes.toBytes(cf), Bytes.toBytes(q)))
    get
  }

  /**
   * Builds a Scan over [startRow, stopRow).
   *
   * NOTE: main() calls this with startRow == stopRow. setStartRow/setStopRow are kept
   * (rather than withStartRow/withStopRow) because HBase 1.x treats equal start and
   * stop rows set this way as a single-row "get scan" instead of an empty range.
   *
   * @param filters optional qualifier -> value equality filters on `cf`; null means none
   * @param columns optional qualifiers of `cf` to return; null means all columns
   */
  def getScan(startRow: String, stopRow: String,
              filters: Map[String, String] = null,
              columns: Seq[String] = null): Scan = {
    val scan = new Scan()
      .setStartRow(Bytes.toBytes(startRow))
      .setStopRow(Bytes.toBytes(stopRow))
    // Only attach a FilterList that actually contains filters.
    Option(filters)
      .map(getFilters)
      .filterNot(_.getFilters.isEmpty)
      .foreach(filterList => scan.setFilter(filterList))
    Option(columns).foreach { cols =>
      cols.foreach(column => scan.addColumn(Bytes.toBytes(cf), Bytes.toBytes(column)))
    }
    scan
  }

  /**
   * Builds one SingleColumnValueFilter (column of `cf` EQUAL to value) per map entry,
   * combined in a FilterList created with the no-arg constructor.
   */
  def getFilters(kv: Map[String, String]): FilterList = {
    val filterList = new FilterList()
    kv.foreach { case (qualifier, value) =>
      val filter = new SingleColumnValueFilter(
        Bytes.toBytes(cf), Bytes.toBytes(qualifier), CompareOp.EQUAL, Bytes.toBytes(value))
      // Drop rows that do not have the column at all instead of letting them through.
      filter.setFilterIfMissing(true)
      filterList.addFilter(filter)
    }
    filterList
  }

  /**
   * Sends `put` through a BufferedMutator with a 10 MB write buffer, flushing it
   * before returning; the mutator is closed even if the write fails.
   */
  private def writeBuffered(connection: Connection, tableName: TableName, put: Put): Unit = {
    val params = new BufferedMutatorParams(tableName).writeBufferSize(10L * 1024 * 1024)
    val mutator = connection.getBufferedMutator(params)
    try {
      // application-specific processing goes here
      mutator.mutate(put)
      // Flush explicitly so the row is visible to the reads that follow.
      mutator.flush()
    } finally mutator.close()
  }

  /** Prints `render(cell)` for every cell of `result`. */
  private def printCells(result: Result)(render: Cell => String): Unit =
    // Guarded with Option in case rawCells() is null for an empty Result.
    Option(result.rawCells()).foreach(_.foreach(cell => println(render(cell))))

  def main(args: Array[String]): Unit = {
    val rowKey = new SimpleDateFormat("yyyy-MM-dd").format(System.currentTimeMillis())
    val testSchema = Seq("id", "name", "age")
    val testData = Seq("10001", "Jack", "22")
    val tableName = TableName.valueOf(table)

    val connection = ConnectionFactory.createConnection(config)
    try {
      // 1. Insert data into HBase.
      writeBuffered(connection, tableName, putData(rowKey, cf = cf, kv = testSchema.zip(testData)))

      val hTable = connection.getTable(tableName)
      try {
        // 2. Query HBase with Get: the whole row, then only the third schema column.
        printCells(hTable.get(getData(rowKey))) { cell =>
          s"key=${Bytes.toString(CellUtil.cloneQualifier(cell))}, " +
            s"value=${Bytes.toString(CellUtil.cloneValue(cell))}"
        }
        printCells(hTable.get(getData(rowKey, testSchema(2)))) { cell =>
          s"value=${Bytes.toString(CellUtil.cloneValue(cell))}"
        }

        // 3. Query HBase with Scan.
        val resultScan: ResultScanner = hTable.getScanner(getScan(rowKey, rowKey))
        try {
          val results: util.Iterator[Result] = resultScan.iterator()
          while (results.hasNext) {
            printCells(results.next()) { cell =>
              s"rowKey=${Bytes.toString(CellUtil.cloneRow(cell))}, " +
                s"key=${Bytes.toString(CellUtil.cloneQualifier(cell))}, " +
                s"value=${Bytes.toString(CellUtil.cloneValue(cell))}"
            }
          }
        } finally resultScan.close()
      } finally hTable.close()
    } finally connection.close()
  }
}