本例使用完全分布式的 HBase 集群;ZooKeeper 为独立部署,而不是 HBase 自带的 ZooKeeper。
Maven 依赖如下:
<dependency>
<groupId>org.apache.hbase</groupId>
<artifactId>hbase-client</artifactId>
<version>1.4.8</version>
</dependency>
<dependency>
<groupId>org.apache.hbase</groupId>
<artifactId>hbase-server</artifactId>
<version>1.4.8</version>
</dependency>
在hbase中创建表、插入数据、查询数据等操作
import java.text.SimpleDateFormat
import java.util
import hbase.TestHbaeJavaApi.conf
import org.apache.hadoop.hbase.{HBaseConfiguration, TableName}
import org.apache.hadoop.hbase.client._
import org.apache.hadoop.hbase.filter.CompareFilter.CompareOp
import org.apache.hadoop.hbase.filter.{FilterList, SingleColumnValueFilter}
import org.apache.hadoop.hbase.util.Bytes
/**
 * Demonstrates basic HBase access through the Java client API: building
 * `Put`, `Get` and `Scan` requests and running them against a single table.
 *
 * All connection settings and the table / column family names are fixed
 * constants of this object.
 */
object HBaseTool {
  // Local import so the block stays self-contained; both types live outside
  // the `client` package that the file already wildcard-imports.
  import org.apache.hadoop.hbase.{Cell, CellUtil}

  /** ZooKeeper quorum host used to locate the cluster. */
  val zkQuorum: String = "192.168.0.219"

  /** ZooKeeper client port. */
  val port: String = "2181"

  /** Name of the table every operation in this object targets. */
  val table: String = "test"

  /** Default column family for puts, gets, scans and filters. */
  val cf: String = "cf1"

  /** Write-buffer size (10 MiB) used for the buffered insert in `main`. */
  private val WriteBufferBytes: Long = 10L * 1024 * 1024

  /** Client configuration shared by every connection opened from this object. */
  val config = HBaseConfiguration.create()
  config.set("hbase.zookeeper.property.clientPort", port)
  config.set("hbase.zookeeper.quorum", zkQuorum)
  // NOTE(review): 600000 is above the maximum TCP port (65535), so this looks
  // like a typo. The value is left untouched because the intended port cannot
  // be determined from this file -- confirm against the cluster setup.
  config.set("hbase.master", "192.168.0.219:600000")

  /**
   * Builds a `Put` that writes the given qualifier/value pairs into one row.
   *
   * @param rowKey row to write
   * @param cf     column family; defaults to the object-level [[cf]]
   * @param kv     (qualifier, value) pairs, each stored as one column
   * @return the populated `Put`; nothing is sent to HBase here
   */
  def putData(rowKey: String, cf: String = cf, kv: Seq[(String, String)]): Put = {
    val put = new Put(Bytes.toBytes(rowKey))
    kv.foreach { case (qualifier, value) =>
      put.addColumn(Bytes.toBytes(cf), Bytes.toBytes(qualifier), Bytes.toBytes(value))
    }
    put
  }

  /**
   * Builds a `Get` for one row.
   *
   * @param rowKey    row to read
   * @param qualifier column in family [[cf]] to restrict the read to;
   *                  `null` (the default) adds no column restriction
   * @return the configured `Get`
   */
  def getData(rowKey: String, qualifier: String = null): Get = {
    val get = new Get(Bytes.toBytes(rowKey))
    // Option(...) maps the null default to None, so no column is added then.
    Option(qualifier).foreach { q =>
      get.addColumn(Bytes.toBytes(cf), Bytes.toBytes(q))
    }
    get
  }

  /**
   * Builds a `Scan` over the given row range.
   *
   * @param startRow first row of the range
   * @param stopRow  end row of the range
   * @param filters  qualifier -> required value pairs turned into equality
   *                 filters via [[getFilters]]; `null` or an empty map adds no filter
   * @param columns  qualifiers in family [[cf]] to return; `null` adds no
   *                 column restriction
   * @return the configured `Scan`
   */
  def getScan(
      startRow: String,
      stopRow: String,
      filters: Map[String, String] = null,
      columns: Seq[String] = null): Scan = {
    val scan = new Scan()
      .setStartRow(Bytes.toBytes(startRow))
      .setStopRow(Bytes.toBytes(stopRow))

    // Only attach the filter list when it actually contains filters.
    Option(filters)
      .map(getFilters)
      .filter(_.getFilters.size() > 0)
      .foreach(scan.setFilter)

    Option(columns).foreach { cols =>
      cols.foreach(column => scan.addColumn(Bytes.toBytes(cf), Bytes.toBytes(column)))
    }
    scan
  }

  /**
   * Turns qualifier/value pairs into a `FilterList` of equality filters on
   * column family [[cf]].
   *
   * Each filter has `setFilterIfMissing(true)`, so rows that lack the column
   * are filtered out rather than passed through.
   *
   * @param kv qualifier -> required value
   * @return a `FilterList` (default operator) holding one filter per entry
   */
  def getFilters(kv: Map[String, String]): FilterList = {
    val filterList = new FilterList()
    kv.foreach { case (qualifier, expected) =>
      val filter = new SingleColumnValueFilter(
        Bytes.toBytes(cf),
        Bytes.toBytes(qualifier),
        CompareOp.EQUAL,
        Bytes.toBytes(expected)
      )
      filter.setFilterIfMissing(true)
      filterList.addFilter(filter)
    }
    filterList
  }

  /** Cells of a result; `rawCells()` may return null when the result is empty. */
  private def cellsOf(result: Result): Array[Cell] =
    Option(result.rawCells()).getOrElse(Array.empty[Cell])

  /**
   * Demo entry point: writes one row keyed by today's date, reads it back
   * with two `Get`s, then reads it again with a `Scan`, printing every cell.
   *
   * Every HBase resource is closed in a `finally` block, so a failing call
   * does not leak the connection, table or scanner.
   */
  def main(args: Array[String]): Unit = {
    val rowKey = new SimpleDateFormat("yyyy-MM-dd").format(System.currentTimeMillis())
    val testSchema = Seq("id", "name", "age")
    val testData = Seq("10001", "Jack", "22")
    val tableName = TableName.valueOf(table)

    val connection = ConnectionFactory.createConnection(config)
    try {
      // 1. Insert data into HBase through a buffered writer, flushed explicitly
      //    so the row is persisted before it is read back below.
      val mutator = connection.getBufferedMutator(
        new BufferedMutatorParams(tableName).writeBufferSize(WriteBufferBytes))
      try {
        mutator.mutate(putData(rowKey, cf = cf, kv = testSchema.zip(testData)))
        mutator.flush()
      } finally {
        mutator.close()
      }

      val hTable = connection.getTable(tableName)
      try {
        // 2. Query HBase with Get: first the whole row, then a single column.
        cellsOf(hTable.get(getData(rowKey))).foreach { cell =>
          val key = Bytes.toString(CellUtil.cloneQualifier(cell))
          val value = Bytes.toString(CellUtil.cloneValue(cell))
          println(s"key=$key, value=$value")
        }

        cellsOf(hTable.get(getData(rowKey, testSchema(2)))).foreach { cell =>
          println(s"value=${Bytes.toString(CellUtil.cloneValue(cell))}")
        }

        // 3. Query HBase with Scan.
        // NOTE(review): start row == stop row; this relies on setStartRow /
        // setStopRow treating an equal pair as a single-row scan -- confirm
        // before switching to withStartRow / withStopRow.
        val scanner = hTable.getScanner(getScan(rowKey, rowKey))
        try {
          val rows = scanner.iterator()
          while (rows.hasNext) {
            cellsOf(rows.next()).foreach { cell =>
              val row = Bytes.toString(CellUtil.cloneRow(cell))
              val key = Bytes.toString(CellUtil.cloneQualifier(cell))
              val value = Bytes.toString(CellUtil.cloneValue(cell))
              println(s"rowKey=$row, key=$key, value=$value")
            }
          }
        } finally {
          scanner.close()
        }
      } finally {
        hTable.close()
      }
    } finally {
      connection.close()
    }
  }
}