利用JavaAPI来操作Hbase

例子采用的是完全分布式集群,不是hbase自带的zookeeper,是独立的zookeeper

mvn的依赖如下:

<dependency>
    <groupId>org.apache.hbase</groupId>
    <artifactId>hbase-client</artifactId>
    <version>1.4.8</version>
</dependency>

<dependency>
    <groupId>org.apache.hbase</groupId>
    <artifactId>hbase-server</artifactId>
    <version>1.4.8</version>
</dependency>
在hbase中创建表、插入数据、查询数据等操作

import java.text.SimpleDateFormat
import java.util

import hbase.TestHbaeJavaApi.conf
import org.apache.hadoop.hbase.{HBaseConfiguration, TableName}
import org.apache.hadoop.hbase.client._
import org.apache.hadoop.hbase.filter.CompareFilter.CompareOp
import org.apache.hadoop.hbase.filter.{FilterList, SingleColumnValueFilter}
import org.apache.hadoop.hbase.util.Bytes

/**
  * 利用JavaAPI来操作Hbase
  */
object HBaseTool {
  val zkQuorum = "192.168.0.219"
  val port = "2181"
  val table = "test"
  val cf = "cf1"
  val config = HBaseConfiguration.create()
  config.set("hbase.zookeeper.property.clientPort", "2181")
  config.set("hbase.zookeeper.quorum", "192.168.0.219")
  config.set("hbase.master", "192.168.0.219:600000")

  def putData(rowKey:String, cf:String = cf, kv:Seq[(String,String)]): Put ={
    val put = new Put(Bytes.toBytes(rowKey))
    kv.foreach{ kv =>
      put.addColumn(Bytes.toBytes(cf), Bytes.toBytes(kv._1), Bytes.toBytes(kv._2))
    }
    put
  }

  def getData(rowKey:String, qualifier:String =null): Get={
    val get = new Get(Bytes.toBytes(rowKey))
    if(qualifier != null)
      get.addColumn(Bytes.toBytes(cf), Bytes.toBytes(qualifier))
    get
  }

  def getScan(startRow:String, stopRow:String, filters:Map[String,String]= null, columns:Seq[String]= null): Scan= {
    var filterList: FilterList = null
    val scan = new Scan()
      .setStartRow(Bytes.toBytes(startRow))
      .setStopRow(Bytes.toBytes(stopRow))

    if(filters != null){
      filterList = getFilters(filters)
      if(filterList.getFilters.size() > 0)
        scan.setFilter(filterList)
    }

    if(columns != null) {
      columns.foreach{ column =>
        scan.addColumn(Bytes.toBytes(cf), Bytes.toBytes(column))
      }
    }

    scan
  }

  def getFilters(kv:Map[String,String]): FilterList = {
    val filterList = new FilterList()
    kv.toSeq.foreach{ kv =>
      val filter = new SingleColumnValueFilter(
        Bytes.toBytes(cf),
        Bytes.toBytes(kv._1),
        CompareOp.EQUAL,
        Bytes.toBytes(kv._2)
      )
      filter.setFilterIfMissing(true)
      filterList.addFilter(filter)
    }
    filterList
  }

  def main(args: Array[String]): Unit = {
    val rowKey = new SimpleDateFormat("yyyy-MM-dd").format(System.currentTimeMillis())
    val testSchema = Seq("id","name","age")
    val testData = Seq("10001","Jack","22")
    /**
      * 1. 插入数据到HBase
      */
    val hTable: HTable = new HTable(config, TableName.valueOf(table))
    hTable.setAutoFlush(false)
    hTable.setWriteBufferSize(10 * 1024 * 1024)
    //处理任务功能
    hTable.put(putData(rowKey, cf=cf, testSchema zip testData))

    hTable.flushCommits()

    /**
      * 2. 通过Get查询HBase
      */
    val result: Result = hTable.get(getData(rowKey))
    for(kv <- result.raw()) {
      println("key="+Bytes.toString(kv.getQualifier)+", value="+Bytes.toString(kv.getValue))
    }

    val result1: Result = hTable.get(getData(rowKey, testSchema.toList(2)))
    for(kv <- result1.raw()) {
      println("value="+Bytes.toString(kv.getValue))
    }

    /**
      * 3. 通过Scan查询HBase
      */
    val scan = getScan(rowKey, rowKey)
    val resultScan: ResultScanner = hTable.getScanner(scan)
    val ite: util.Iterator[Result] = resultScan.iterator()
    while(ite.hasNext) {
      val result = ite.next()
      for(kv <- result.raw()) {
        println("rowKey="+Bytes.toString(kv.getRow)+", key="+Bytes.toString(kv.getQualifier)+", value="+Bytes.toString(kv.getValue))
      }
    }

    hTable.close()
  }



}

 

关注公众号“大模型全栈程序员”回复“小程序”获取1000个小程序打包源码。更多免费资源在http://www.gitweixin.com/?p=2627

发表评论

邮箱地址不会被公开。 必填项已用*标注