Resolving a CallQueueTooBigException When Writing to HBase
On CDH 6.3.2, inserting data into HBase through a Hive external table fails with the following error:
```
24/09/18 10:43:04 ERROR status.SparkJobMonitor: Spark job[3] failed
[INFO] 2024-09-18 10:43:04.156 - [taskAppId=TASK-41-192147-585089]:[127] - -> java.util.concurrent.ExecutionException: Exception thrown by job
    at org.apache.spark.JavaFutureActionWrapper.getImpl(FutureAction.scala:337)
    at org.apache.spark.JavaFutureActionWrapper.get(FutureAction.scala:342)
    at org.apache.hive.spark.client.RemoteDriver$JobWrapper.call(RemoteDriver.java:404)
    at org.apache.hive.spark.client.RemoteDriver$JobWrapper.call(RemoteDriver.java:365)
    at java.util.concurrent.FutureTask.run(FutureTask.java:266)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
    at java.lang.Thread.run(Thread.java:748)
Caused by: org.apache.spark.SparkException: Job aborted due to stage failure: Task 5 in stage 8.0 failed 4 times, most recent failure: Lost task 5.3 in stage 8.0 (TID 85, cdh02, executor 8): java.lang.RuntimeException: Hive Runtime Error while closing operators: org.apache.hadoop.hbase.client.RetriesExhaustedWithDetailsException: Failed 4 actions: CallQueueTooBigException: 4 times, servers with issues: cdh02,16020,1720579381747
    at org.apache.hadoop.hive.ql.exec.spark.SparkReduceRecordHandler.close(SparkReduceRecordHandler.java:463)
    at org.apache.hadoop.hive.ql.exec.spark.HiveReduceFunctionResultList.closeRecordProcessor(HiveReduceFunctionResultList.java:67)
    at org.apache.hadoop.hive.ql.exec.spark.HiveBaseFunctionResultList.hasNext(HiveBaseFunctionResultList.java:96)
    at scala.collection.convert.Wrappers$JIteratorWrapper.hasNext(Wrappers.scala:42)
    at scala.collection.Iterator$class.foreach(Iterator.scala:891)
    at scala.collection.AbstractIterator.foreach(Iterator.scala:1334)
    at org.apache.spark.rdd.AsyncRDDActions$$anonfun$foreachAsync$1$$anonfun$apply$12.apply(AsyncRDDActions.scala:127)
    at org.apache.spark.rdd.AsyncRDDActions$$anonfun$foreachAsync$1$$anonfun$apply$12.apply(AsyncRDDActions.scala:127)
    at org.apache.spark.SparkContext$$anonfun$38.apply(SparkContext.scala:2232)
    at org.apache.spark.SparkContext$$anonfun$38.apply(SparkContext.scala:2232)
    at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:90)
    at org.apache.spark.scheduler.Task.run(Task.scala:121)
    at org.apache.spark.executor.Executor$TaskRunner$$anonfun$11.apply(Executor.scala:407)
    at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1408)
    at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:413)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
    at java.lang.Thread.run(Thread.java:748)
Caused by: org.apache.hadoop.hive.ql.metadata.HiveException: org.apache.hadoop.hbase.client.RetriesExhaustedWithDetailsException: Failed 4 actions: CallQueueTooBigException: 4 times, servers with issues: cdh02,16020,1720579381747
    at org.apache.hadoop.hive.ql.exec.FileSinkOperator$FSPaths.closeWriters(FileSinkOperator.java:198)
    at org.apache.hadoop.hive.ql.exec.FileSinkOperator.closeOp(FileSinkOperator.java:1058)
    at org.apache.hadoop.hive.ql.exec.Operator.close(Operator.java:686)
    at org.apache.hadoop.hive.ql.exec.Operator.close(Operator.java:700)
    at org.apache.hadoop.hive.ql.exec.Operator.close(Operator.java:700)
    at org.apache.hadoop.hive.ql.exec.Operator.close(Operator.java:700)
    at org.apache.hadoop.hive.ql.exec.spark.SparkReduceRecordHandler.close(SparkReduceRecordHandler.java:447)
    ... 17 more
Caused by: org.apache.hadoop.hbase.client.RetriesExhaustedWithDetailsException: Failed 4 actions: CallQueueTooBigException: 4 times, servers with issues: cdh02,16020,1720579381747
    at org.apache.hadoop.hbase.client.BatchErrors.makeException(BatchErrors.java:54)
    at org.apache.hadoop.hbase.client.AsyncRequestFutureImpl.getErrors(AsyncRequestFutureImpl.java:1226)
    at org.apache.hadoop.hbase.client.BufferedMutatorImpl.doFlush(BufferedMutatorImpl.java:309)
    at org.apache.hadoop.hbase.client.BufferedMutatorImpl.close(BufferedMutatorImpl.java:241)
    at org.apache.hadoop.hive.hbase.HiveHBaseTableOutputFormat$MyRecordWriter.close(HiveHBaseTableOutputFormat.java:130)
    at org.apache.hadoop.hive.hbase.HiveHBaseTableOutputFormat$MyRecordWriter.close(HiveHBaseTableOutputFormat.java:168)
    at org.apache.hadoop.hive.ql.exec.FileSinkOperator$FSPaths.closeWriters(FileSinkOperator.java:195)
```
The log shows that the failure happens while Hive is writing data into HBase, and the specific error is `CallQueueTooBigException`: HBase's RPC call queue is full, so the writes are rejected and the job fails.
Error analysis:

- `CallQueueTooBigException`: this is one of HBase's load-protection mechanisms. When a RegionServer's RPC call queue is full and the server has reached its processing capacity, it throws this exception to signal that it cannot accept any more requests. It usually means the RegionServer was under heavy load while the write was running.
- Spark/Hive interaction with HBase: when Hive on Spark writes to HBase, the data is sent in parallel and in large batches. If HBase is already busy or the write concurrency is too high, requests pile up in the call queue and trigger `CallQueueTooBigException`.
- Retry mechanism: the log also contains `RetriesExhaustedWithDetailsException`, which means that even after several retries the write still could not be completed, so the task finally failed.
Possible causes:

- HBase is overloaded: the cluster is receiving more requests than it can handle, so the call queue overflows.
- Insufficient HBase resources: the RegionServer hosts lack the hardware (memory, CPU, etc.) to keep up with highly concurrent writes.
- Too much data written at once: the volume of data Hive pushes into HBase through Spark exceeds what HBase can absorb.
- Unsuitable HBase configuration: write-related settings may be too low, for example `hbase.regionserver.handler.count` (the number of RPC handlers) or `hbase.rpc.timeout` (the RPC timeout) are not configured appropriately.
Solutions:

- Tune the HBase cluster:
  - Increase HBase's processing capacity:
    - Add more RegionServer instances.
    - Upgrade the cluster hardware, adding memory and CPU.
  - Adjust the HBase configuration (see the sketch after this item):
    - Increase `hbase.regionserver.handler.count`, which controls how many concurrent RPC requests a RegionServer can handle.
    - Increase `hbase.ipc.server.max.callqueue.size` to enlarge the RPC call queue so it can hold more pending requests.
    - Increase `hbase.rpc.timeout` so that requests are not failed prematurely by timeouts.
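Of the properties above, `hbase.regionserver.handler.count` and `hbase.ipc.server.max.callqueue.size` are RegionServer-side settings: on CDH they are changed in the RegionServers' hbase-site.xml (via Cloudera Manager) and only take effect after a restart. `hbase.rpc.timeout` is also read by the client. The sketch below is only an illustration of raising the client-side timeouts with the plain HBase Java API; the table name, the values, and the extra `hbase.client.operation.timeout` knob are assumptions, not recommendations. In the Hive case the storage handler builds its connection from the job configuration, so the same properties would be set there rather than in code.

```java
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.Connection;
import org.apache.hadoop.hbase.client.ConnectionFactory;
import org.apache.hadoop.hbase.client.Table;

public class ClientTimeoutSketch {
    public static void main(String[] args) throws Exception {
        // Starts from the hbase-site.xml found on the classpath.
        Configuration conf = HBaseConfiguration.create();

        // Give queued requests more time before the client gives up.
        // Both values are illustrative assumptions, not tested recommendations.
        conf.setInt("hbase.rpc.timeout", 120000);               // single RPC timeout, ms
        conf.setInt("hbase.client.operation.timeout", 300000);  // overall per-operation budget, ms (extra knob, not in the article)

        try (Connection connection = ConnectionFactory.createConnection(conf);
             Table table = connection.getTable(TableName.valueOf("my_table"))) { // "my_table" is a hypothetical table
            // ... issue Puts/Gets against the table here ...
        }
    }
}
```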
- Reduce the write pressure:
  - Control the batch size: use the Spark job configuration to limit how much data each write request carries, for example by adjusting `spark.sql.shuffle.partitions` to reduce the number of partitions (a client-side sketch of the same idea follows this item).
  - Limit write concurrency: if the data volume is very large, cap the number of jobs writing to HBase at the same time so each write puts less pressure on the cluster.
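The stack trace shows the writes going through `BufferedMutatorImpl.doFlush`: the HBase client buffers Puts and sends them to the RegionServer in batches, so shrinking the client write buffer is one concrete way to make each request smaller. The sketch below illustrates this with the plain HBase Java client; the table name, column family, and buffer size are assumptions. In the Hive job the equivalent knob would be the `hbase.client.write.buffer` property in the job configuration.

```java
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.BufferedMutator;
import org.apache.hadoop.hbase.client.BufferedMutatorParams;
import org.apache.hadoop.hbase.client.Connection;
import org.apache.hadoop.hbase.client.ConnectionFactory;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.util.Bytes;

public class SmallBatchWriteSketch {
    public static void main(String[] args) throws Exception {
        Configuration conf = HBaseConfiguration.create();

        // Same effect as hbase.client.write.buffer: flush after ~1 MB instead of the
        // default 2 MB, so each flush sends a smaller batch to the RegionServer.
        // "my_table" and "cf" are hypothetical; the size is an illustrative assumption.
        BufferedMutatorParams params = new BufferedMutatorParams(TableName.valueOf("my_table"))
                .writeBufferSize(1L * 1024 * 1024);

        try (Connection connection = ConnectionFactory.createConnection(conf);
             BufferedMutator mutator = connection.getBufferedMutator(params)) {
            for (int i = 0; i < 10_000; i++) {
                Put put = new Put(Bytes.toBytes("row-" + i));
                put.addColumn(Bytes.toBytes("cf"), Bytes.toBytes("q"), Bytes.toBytes("value-" + i));
                mutator.mutate(put); // buffered; flushed automatically when the buffer fills
            }
            mutator.flush(); // push out whatever is still buffered
        }
    }
}
```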
- Configure the retry mechanism:
  - If the error is transient, increase the number of retries in the HBase client or the Spark job, for example by raising `hbase.client.retries.number`, so that the writes get more chances to succeed under load (see the sketch after this item).
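A minimal sketch of the retry tuning with the HBase Java client, assuming the connection is created from this configuration. `hbase.client.pause` (the base sleep between retries) is an extra knob not mentioned above, and both values are illustrative. In the Hive-on-Spark case the same `hbase.client.*` properties would be passed through the job configuration rather than set in code.

```java
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.client.Connection;
import org.apache.hadoop.hbase.client.ConnectionFactory;

public class RetryTuningSketch {
    public static void main(String[] args) throws Exception {
        Configuration conf = HBaseConfiguration.create();

        // Allow more attempts before the client gives up with
        // RetriesExhaustedWithDetailsException. Illustrative value only.
        conf.setInt("hbase.client.retries.number", 20);

        // Base sleep between retries; the client backs off from this value,
        // which also gives an overloaded call queue time to drain.
        // Extra knob not mentioned in the article; illustrative value only.
        conf.setLong("hbase.client.pause", 200);

        try (Connection connection = ConnectionFactory.createConnection(conf)) {
            // ... create a Table or BufferedMutator from this connection and write as usual ...
        }
    }
}
```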
- Watch the HBase monitoring data: use the HBase Web UI or a monitoring tool such as Ganglia to watch RegionServer load, call queue length, memory usage, and other metrics, and locate the actual bottleneck.