gitweixin
  • 首页
  • 小程序代码
    • 资讯读书
    • 工具类
    • O2O
    • 地图定位
    • 社交
    • 行业软件
    • 电商类
    • 互联网类
    • 企业类
    • UI控件
  • 大数据开发
    • Hadoop
    • Spark
    • Hbase
    • Elasticsearch
    • Kafka
    • Flink
    • 数据仓库
    • 数据挖掘
    • flume
    • Kafka
    • Hive
    • shardingsphere
    • solr
  • 开发博客
    • Android
    • php
    • python
    • 运维
    • 技术架构
    • 数据库
  • 程序员网赚
  • bug清单
  • 量化投资
  • 在线查询工具
    • 去行号
    • 在线时间戳转换工具
    • 免费图片批量修改尺寸在线工具
    • SVG转JPG在线工具

分类归档Hbase

精品微信小程序开发门户,代码全部亲测可用

  • 首页   /  大数据开发
  • 分类归档: "Hbase"
Hbase 9月 27,2024

HBase协处理器详解及高级应用技巧

一、引言

HBase,作为Apache Hadoop生态系统中的核心组件之一,是一款分布式、面向列的NoSQL数据库。它凭借其高可靠性、高性能以及强大的横向扩展能力,在大数据处理领域得到了广泛的应用。HBase协处理器(Coprocessor)作为HBase的重要特性之一,为用户提供了在RegionServer端执行自定义代码的能力,从而进一步提升了HBase的灵活性和功能性。

协处理器这一概念的引入,可以追溯到HBase 0.92版本。随着大数据时代的到来,数据量呈现爆炸式增长,传统的数据处理方式已经无法满足日益复杂的需求。HBase协处理器作为一种创新的技术手段,应运而生,旨在为用户提供更加高效、便捷的数据处理方式。

协处理器允许用户在RegionServer端执行自定义代码,这意味着用户可以将数据处理逻辑下沉到数据存储层,从而减少了数据在网络中的传输,提高了处理效率。同时,协处理器还支持实时数据处理,为用户提供了更加实时的数据分析和处理能力。

HBase协处理器具有以下显著特点:

灵活性:用户可以根据自己的需求编写自定义代码,实现各种复杂的数据处理逻辑。

高效性:协处理器将数据处理逻辑下沉到RegionServer端,减少了数据在网络中的传输,提高了处理效率。

实时性:协处理器支持实时数据处理,为用户提供了更加实时的数据分析和处理能力。

扩展性:协处理器可以方便地与其他Hadoop组件集成,如MapReduce、Spark等,实现更加复杂的数据处理任务。

二、HBase协处理器类型

1. Endpoint协处理器

Endpoint协处理器允许用户在RegionServer端执行自定义的RPC(远程过程调用)方法。用户可以通过定义自己的Endpoint类,实现特定的业务逻辑,并通过HBase客户端调用这些方法。这种方式可以实现数据的实时处理和分析,提高数据处理效率。

2. Observer协处理器

Observer协处理器允许用户在RegionServer端监听并处理各种事件,如数据的插入、更新、删除等。用户可以通过定义自己的Observer类,实现对这些事件的实时响应和处理。这种方式可以用于实现数据的实时监控、触发特定业务流程等功能。

Observer协处理器又可以分为以下几种类型:

  • RegionObserver:监听Region级别的事件,如Region的打开、关闭等。
  • WALObserver:监听Write-Ahead Log(WAL)级别的事件,如WAL的写入、滚动等。
  • RegionServerObserver:监听RegionServer级别的事件,如RegionServer的启动、停止等。

3. Load Balancer协处理器

Load Balancer协处理器用于实现HBase集群的负载均衡。它可以根据集群中各个RegionServer的负载情况,自动调整Region的分布,以实现负载均衡。这种方式可以提高集群的整体性能和稳定性。

三、HBase协处理器的高级使用技巧

1. 自定义Endpoint协处理器

通过自定义Endpoint协处理器,用户可以实现更加复杂的数据处理逻辑。例如,用户可以在Endpoint协处理器中实现数据的聚合、计算等功能,从而减少数据在网络中的传输,提高处理效率。

实例分析:

假设我们有一个电商网站,需要实时统计每个用户的购物车金额。我们可以编写一个自定义的Endpoint协处理器,在用户添加商品到购物车时,实时计算购物车金额,并将结果存储到HBase中。这样,我们就可以避免在查询时进行复杂的数据计算,提高查询效率。

代码示例:

public class ShoppingCartEndpoint extends BaseRegionObserver {
    @Override
    public void prePut(ObserverContext<RegionCoprocessorEnvironment> e, Put put, WALEdit edit, Durability durability) throws IOException {
        // 获取用户ID和商品金额
        byte[] userId = put.getRow();
        long amount = 0;
        for (Cell cell : put.getFamilyCellMap().get("cf".getBytes())) {
            if (Bytes.equals(cell.getQualifierArray(), cell.getQualifierOffset(), "amount".getBytes(), 0, "amount".length())) {
                amount += Bytes.toLong(CellUtil.cloneValue(cell));
            }
        }
        // 计算购物车金额
        long cartAmount = calculateCartAmount(userId);
        // 将购物车金额存储到HBase中
        Put cartPut = new Put(userId);
        cartPut.addColumn("cf".getBytes(), "cartAmount".getBytes(), Bytes.toBytes(cartAmount));
        e.getEnvironment().getRegion().put(cartPut);
    }

    private long calculateCartAmount(byte[] userId) {
        // 实现购物车金额的计算逻辑
        return 0;
    }
}

2. 自定义Observer协处理器

通过自定义Observer协处理器,用户可以实现对数据的实时监控和处理。例如,用户可以在Observer协处理器中实现数据的清洗、过滤等功能,从而提高数据的质量和准确性。

实例分析:

假设我们有一个日志分析系统,需要实时监控并处理用户访问日志。我们可以编写一个自定义的Observer协处理器,在用户访问日志写入HBase时,实时清洗和过滤日志数据,去除无效数据和异常值,从而提高数据的质量和准确性。

代码示例:

public class LogObserver extends BaseRegionObserver {
    @Override
    public void prePut(ObserverContext<RegionCoprocessorEnvironment> e, Put put, WALEdit edit, Durability durability) throws IOException {
        // 获取日志数据
        byte[] logData = put.getRow();
        // 清洗和过滤日志数据
        byte[] cleanedData = cleanLogData(logData);
        if (cleanedData != null) {
            // 将清洗后的日志数据存储到HBase中
            Put cleanedPut = new Put(cleanedData);
            cleanedPut.addColumn("cf".getBytes(), "log".getBytes(), logData);
            e.getEnvironment().getRegion().put(cleanedPut);
        }
    }

    private byte[] cleanLogData(byte[] logData) {
        // 实现日志数据的清洗和过滤逻辑
        return logData;
    }
}

3. 使用协处理器实现二级索引

HBase本身不支持二级索引,但通过使用协处理器,我们可以实现二级索引的功能。用户可以在Observer协处理器中监听数据的插入、更新、删除等事件,并在这些事件发生时,自动维护二级索引表。

实例分析:

假设我们有一个用户信息表,需要根据用户的年龄进行查询。由于HBase本身不支持二级索引,我们无法直接根据年龄进行查询。但是,我们可以通过编写一个自定义的Observer协处理器,在用户信息插入、更新、删除时,自动维护一个年龄索引表。这样,我们就可以根据年龄进行查询了。

代码示例:

public class AgeIndexObserver extends BaseRegionObserver {
    @Override
    public void prePut(ObserverContext<RegionCoprocessorEnvironment> e, Put put, WALEdit edit, Durability durability) throws IOException {
        // 获取用户信息
        byte[] userId = put.getRow();
        byte[] age = null;
        for (Cell cell : put.getFamilyCellMap().get("cf".getBytes())) {
            if (Bytes.equals(cell.getQualifierArray(), cell.getQualifierOffset(), "age".getBytes(), 0, "age".length())) {
                age = CellUtil.cloneValue(cell);
                break;
            }
        }
        if (age != null) {
            // 维护年龄索引表
            Put indexPut = new Put(age);
            indexPut.addColumn("cf".getBytes(), "userId".getBytes(), userId);
            e.getEnvironment().getRegion().put(indexPut);
        }
    }

    @Override
    public void preDelete(ObserverContext<RegionCoprocessorEnvironment> e, Delete delete, WALEdit edit, Durability durability) throws IOException {
        // 获取用户信息
        byte[] userId = delete.getRow();
        // 删除年龄索引表中的记录
        Scan scan = new Scan();
        scan.setFilter(new PrefixFilter(userId));
        ResultScanner scanner = e.getEnvironment().getRegion().getScanner(scan);
        for (Result result : scanner) {
            Delete indexDelete = new Delete(result.getRow());
            e.getEnvironment().getRegion().delete(indexDelete);
        }
    }
}

4. 使用协处理器实现数据分片

HBase支持数据的分片存储,但默认情况下,数据的分片是根据RowKey的哈希值进行分配的。通过使用协处理器,我们可以实现自定义的数据分片策略,从而更好地满足业务需求。

实例分析:

假设我们有一个电商网站,需要根据用户的地理位置进行数据分片。我们可以编写一个自定义的Load Balancer协处理器,在RegionServer启动时,根据用户的地理位置信息,自动调整Region的分布,从而实现数据的分片存储。

代码示例:

public class GeoLoadBalancer extends LoadBalancer {
    @Override
    public void balanceCluster(Configuration conf, RegionInfo[] regions, ServerManager serverManager) throws IOException {
        // 获取集群中各个RegionServer的负载情况
        Map<ServerName, List<RegionInfo>> serverRegionsMap = serverManager.getRegions();
        // 根据用户的地理位置信息,自动调整Region的分布
        for (Map.Entry<ServerName, List<RegionInfo>> entry : serverRegionsMap.entrySet()) {
            ServerName serverName = entry.getKey();
            List<RegionInfo> regionsList = entry.getValue();
            for (RegionInfo regionInfo : regionsList) {
                // 获取Region的RowKey
                byte[] rowKey = regionInfo.getStartKey();
                // 根据RowKey的地理位置信息,调整Region的分布
                ServerName targetServer = getTargetServer(rowKey);
                if (!serverName.equals(targetServer)) {
                    serverManager.move(regionInfo.getRegionName(), targetServer);
                }
            }
        }
    }

    private ServerName getTargetServer(byte[] rowKey) {
        // 实现自定义的数据分片策略
        return null;
    }
}

5. 协处理器与MapReduce集成

HBase协处理器可以与MapReduce集成,实现更加复杂的数据处理任务。用户可以在MapReduce作业中调用自定义的Endpoint协处理器,实现数据的实时处理和分析。

实例分析:

假设我们有一个日志分析系统,需要定期统计用户访问日志中的异常情况。我们可以编写一个自定义的Endpoint协处理器,在MapReduce作业中调用该协处理器,实现日志数据的实时处理和分析。

代码示例:

public class LogAnalyzerJob extends Configured implements Tool {
    @Override
    public int run(String[] args) throws Exception {
        Job job = Job.getInstance(getConf(), "Log Analyzer");
        job.setJarByClass(LogAnalyzerJob.class);
        job.setMapperClass(LogMapper.class);
        job.setReducerClass(LogReducer.class);
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(IntWritable.class);
        TableMapReduceUtil.initTableMapperJob("logTable", new Scan(), LogMapper.class, Text.class, IntWritable.class, job);
        TableMapReduceUtil.initTableReducerJob("resultTable", LogReducer.class, job);
        return job.waitForCompletion(true) ? 0 : 1;
    }

    public static class LogMapper extends TableMapper<Text, IntWritable> {
        @Override
        protected void map(ImmutableBytesWritable key, Result value, Context context) throws IOException, InterruptedException {
            // 调用自定义的Endpoint协处理器,实现日志数据的实时处理和分析
            byte[] logData = value.getValue(Bytes.toBytes("cf"), Bytes.toBytes("log"));
            byte[] result = LogEndpoint.processLogData(logData);
            context.write(new Text(result), new IntWritable(1));
        }
    }

    public static class LogReducer extends TableReducer<Text, IntWritable, ImmutableBytesWritable> {
        @Override
        protected void reduce(Text key, Iterable<IntWritable> values, Context context) throws IOException, InterruptedException {
            int sum = 0;
            for (IntWritable value : values) {
                sum += value.get();
            }
            Put put = new Put(Bytes.toString(key.toString()).getBytes());
            put.addColumn(Bytes.toBytes("cf"), Bytes.toBytes("count"), Bytes.toBytes(sum));
            context.write(null, put);
        }
    }
}

6. 协处理器性能优化

在使用HBase协处理器时,需要注意性能优化。以下是一些性能优化的建议:

  • 减少数据传输:尽量在RegionServer端完成数据处理逻辑,减少数据在网络中的传输。
  • 批量处理:尽量使用批量处理的方式,减少RPC调用的次数。
  • 缓存数据:对于频繁访问的数据,可以使用缓存的方式进行优化。
  • 并发控制:合理控制协处理器的并发度,避免对RegionServer造成过大的压力。

四、HBase协处理器的使用场景

1. 数据实时处理

HBase协处理器可以实现数据的实时处理和分析,为用户提供更加实时的数据支持。例如,用户可以在Endpoint协处理器中实现数据的实时聚合、计算等功能。

2. 数据清洗和过滤

HBase协处理器可以实现数据的清洗和过滤,提高数据的质量和准确性。例如,用户可以在Observer协处理器中实现数据的实时清洗和过滤。

3. 数据分片和负载均衡

HBase协处理器可以实现数据的分片存储和负载均衡,提高集群的整体性能和稳定性。例如,用户可以在Load Balancer协处理器中实现自定义的数据分片策略和负载均衡算法。

4. 数据备份和恢复

HBase协处理器可以实现数据的备份和恢复,为用户提供更加可靠的数据支持。例如,用户可以在Observer协处理器中实现数据的实时备份和恢复。

5. 数据分析和挖掘

HBase协处理器可以实现数据分析和挖掘,为用户提供更加深入的数据洞察。例如,用户可以在Endpoint协处理器中实现数据的实时分析和挖掘。

五、HBase协处理器在实际应用中的案例分析

案例一:电商网站的用户行为分析

在一个电商网站中,用户行为数据是非常重要的数据资产。通过分析用户行为数据,可以了解用户的购物习惯、兴趣偏好等信息,从而为用户提供更加个性化的服务。

在该电商网站中,使用了HBase协处理器来实现用户行为数据的实时处理和分析。具体实现方式如下:

  • 使用Endpoint协处理器实现用户行为的实时聚合和计算。
  • 使用Observer协处理器实现用户行为的实时监控和触发特定业务流程。

通过这种方式,可以实现对用户行为数据的实时处理和分析,为用户提供更加个性化的服务。

案例二:金融领域的风险控制

在金融领域,风险控制是非常重要的工作。通过分析用户的交易数据、信用数据等信息,可以及时发现潜在的风险,从而采取相应的措施进行风险控制。

在该金融领域中,使用了HBase协处理器来实现风险数据的实时处理和分析。具体实现方式如下:

  • 使用Endpoint协处理器实现风险数据的实时聚合和计算。
  • 使用Observer协处理器实现风险数据的实时监控和触发特定业务流程。

通过这种方式,可以实现对风险数据的实时处理和分析,及时发现潜在的风险,从而采取相应的措施进行风险控制。

六、HBase协处理器与Spark的集成应用

Apache Spark 是一个快速且通用的集群计算系统,提供了包括 SQL、流处理、机器学习和图计算等一系列数据处理功能。HBase 协处理器与 Spark 的集成,可以充分利用两者的优势,实现更加高效和强大的数据处理能力。

集成方式

1. 使用 Spark 的 HBase 连接器

Spark 提供了专门的 HBase 连接器(spark-hbase-connector),可以方便地读取和写入 HBase 中的数据。通过这个连接器,可以在 Spark 应用程序中直接调用 HBase 协处理器。

2. 在 Spark 任务中嵌入 HBase 协处理器逻辑

可以将 HBase 协处理器的逻辑封装成独立的 Java 或 Scala 类,然后在 Spark 任务中通过反射或其他方式加载并执行这些类。

应用场景

1. 实时数据处理与分析

结合 Spark Streaming 和 HBase 协处理器,可以实现实时数据的处理和分析。例如,从 Kafka 中获取实时数据流,通过 Spark Streaming 进行初步处理后,再利用 HBase 协处理器进行深入的数据分析和计算。

2. 批量数据处理优化

对于大规模的批量数据处理任务,可以利用 Spark 的分布式计算能力进行数据处理,然后将处理结果存储到 HBase 中。在这个过程中,可以调用 HBase 协处理器来执行一些特定的业务逻辑,如数据清洗、格式转换等。

实例分析

假设我们有一个实时推荐系统,需要根据用户的实时行为数据进行个性化推荐。我们可以采用以下步骤来实现:

1. 数据采集

从用户行为日志中采集实时数据,并将数据存储到 Kafka 中。

2. 数据处理

使用 Spark Streaming 从 Kafka 中获取实时数据流,并进行初步的数据清洗和处理。

3. 数据分析

在 Spark Streaming 任务中,调用 HBase 协处理器来执行复杂的推荐算法。协处理器可以根据用户的实时行为数据,计算用户的兴趣偏好,并生成个性化的推荐结果。

4. 结果存储

将推荐结果存储到 HBase 中,以便后续查询和展示。

通过这种方式,我们可以充分利用 Spark 和 HBase 协处理器的优势,实现高效、实时的个性化推荐系统。

性能优化

1. 减少数据传输

在集成 Spark 和 HBase 协处理器时,应尽量减少不必要的数据传输。例如,可以在 HBase 协处理器中完成一些初步的数据处理逻辑,减少传输到 Spark 中的数据量。

2. 批量操作

对于大量的小规模操作,可以采用批量操作的方式进行优化。例如,在 Spark 任务中,可以将多个小规模的 HBase 操作合并成一个大规模的操作,从而提高处理效率。

3. 并行处理

充分利用 Spark 的并行处理能力,将任务分解成多个子任务并行执行。同时,在 HBase 协处理器中也可以采用并行处理的方式,提高处理速度。

七、HBase协处理器在大数据处理中的挑战与对策

挑战

1. 性能瓶颈

随着数据量的不断增长,HBase协处理器可能会成为性能瓶颈。特别是在处理大规模数据时,协处理器的执行效率可能会受到限制。

2. 数据一致性

在分布式环境中,保证数据一致性是一个重要的挑战。HBase协处理器需要在多个节点上执行,如何确保数据的一致性和正确性是一个需要解决的问题。

3. 容错性

在分布式环境中,节点故障是不可避免的。HBase协处理器需要具备良好的容错性,能够在节点故障时自动恢复并继续执行。

对策

1. 性能优化

  • 并行处理:充分利用 HBase 协处理器的并行处理能力,将任务分解成多个子任务并行执行。
  • 批量操作:对于大量的小规模操作,可以采用批量操作的方式进行优化。
  • 缓存机制:引入缓存机制,减少对 HBase 的访问次数,提高处理效率。

2. 数据一致性保证

  • 分布式锁:使用分布式锁来保证数据的一致性。在协处理器执行过程中,通过分布式锁来控制对数据的访问,确保同一时间只有一个节点能够修改数据。
  • 事务管理:引入事务管理机制,确保协处理器执行的原子性和一致性。在协处理器执行过程中,通过事务管理来保证数据的正确性和一致性。

3. 容错性提升

  • 故障检测:引入故障检测机制,及时发现节点故障并进行处理。在协处理器执行过程中,通过故障检测来监控节点的状态,一旦发现节点故障,
作者 east
Hbase 9月 26,2024

HBase中的Observer型协处理器通常用于哪些场合?

HBase中的Observer型协处理器主要用于以下几种场合:

  1. 增强数据操作的功能:RegionObserver提供了客户端的数据操纵事件钩子,如Get、Put、Delete、Scan等,可以在这些操作发生时插入自定义逻辑,以实现如审计、权限检查、数据验证等附加功能。 
  2. 监控和维护WAL(Write-Ahead Log):WALObserver提供了WAL相关操作的钩子,可以用于优化WAL的写入性能或实施WAL数据的额外安全措施。 
  3. 管理元数据和执行DDL操作:MasterObserver提供了对数据表生命周期管理的钩子,如创建、删除、修改表等,可以用于实现自动分区、负载均衡或其他集群级别的管理任务。 
  4. 实现触发器式的自动化处理:Observer型协处理器可以根据预设的规则和条件,在数据变更时自动执行特定的操作,类似于关系数据库中的触发器,提高了系统的自动化水平和响应能力。 
  5. 扩展HBase的功能:用户可以利用Observer接口开发新的功能,并将这些功能作为协处理器嵌入到HBase中,从而为HBase增加定制化的业务逻辑或性能优化
作者 east
Hbase 9月 26,2024

HBase协处理器如何影响HBase的性能表现?

HBase协处理器(Coprocessor)是HBase中的一个高级功能,它允许用户将自定义的逻辑代码运行在HBase的RegionServer上,直接接触和处理数据。协处理器的引入可以显著影响HBase的性能表现,具体体现在以下几个方面:

  1. 减少数据通信开销:通过在RegionServer端执行计算,协处理器可以减少客户端和服务器之间的数据通信量,从而提高计算性能。 
  2. 实现高效的数据操作:协处理器可以用来执行求和、计数、排序等操作,这些操作在RegionServer端完成后,只需将处理结果返回给客户端,这样可以大幅提升操作效率。 
  3. 扩展HBase功能:协处理器不仅可以优化现有操作,还可以用来扩展HBase的功能,如实现二级索引、访问控制等,这些功能的加入可以改善数据检索速度和系统的安全性。 
  4. 潜在的性能风险:尽管协处理器可以提升性能,但如果协处理器的实现不善,可能会成为性能瓶颈。没有资源隔离的协处理器可能会消耗过多的系统资源,影响集群的稳定性和响应速度。 
作者 east
Hbase 9月 26,2024

HBase协处理器与传统数据库中的触发器有何不同?

HBase协处理器与传统数据库中的触发器主要有以下几点不同:

  1. 应用场景和目的:HBase协处理器是NoSQL数据库HBase中的一个高级特性,用于在RegionServer级别执行自定义逻辑,如建立二级索引、复杂过滤器和访问控制等。而传统数据库中的触发器通常用于在数据修改前后自动执行特定的操作,以维护数据完整性或执行自动化任务。
  2. 执行时机和位置:协处理器的代码直接运行在RegionServer上,可以在数据操作发生时(如Put、Get等)被触发,执行与数据相关的计算或操作。触发器则是数据库管理系统内置的功能,在数据库层面上监控和响应数据变化事件。
  3. 功能和灵活性:协处理器不仅限于触发器的功能,它们可以执行更广泛的操作,包括但不限于数据验证、计算聚合、执行存储过程等。触发器的功能相对受限,通常专注于对数据变更的即时响应。
  4. 性能影响:由于协处理器在数据存储的地方执行计算,可以减少网络通信开销,提高数据处理的效率。触发器虽然可以优化数据库操作,但可能不会像协处理器那样显著减少数据在网络中的传输。
  5. 安全性和风险:协处理器具有较高的权限,可以直接访问和修改数据,这可能带来安全风险。触发器通常运行在数据库的权限模型之下,受到更严格的安全控制。
作者 east
Hbase, Hive 9月 18,2024

写入Hbase报错CallQueueTooBigException的解决

在CDH6.3.2,通过hive外部表插入数据到hbase时报错:

24/09/18 10:43:04 ERROR status.SparkJobMonitor: Spark job[3] failed [INFO] 2024-09-18 10:43:04.156 – [taskAppId=TASK-41-192147-585089]:[127] – -> java.util.concurrent.ExecutionException: Exception thrown by job at org.apache.spark.JavaFutureActionWrapper.getImpl(FutureAction.scala:337) at org.apache.spark.JavaFutureActionWrapper.get(FutureAction.scala:342) at org.apache.hive.spark.client.RemoteDriver$JobWrapper.call(RemoteDriver.java:404) at org.apache.hive.spark.client.RemoteDriver$JobWrapper.call(RemoteDriver.java:365) at java.util.concurrent.FutureTask.run(FutureTask.java:266) at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149) at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624) at java.lang.Thread.run(Thread.java:748) Caused by: org.apache.spark.SparkException: Job aborted due to stage failure: Task 5 in stage 8.0 failed 4 times, most recent failure: Lost task 5.3 in stage 8.0 (TID 85, cdh02, executor 8): java.lang.RuntimeException: Hive Runtime Error while closing operators: org.apache.hadoop.hbase.client.RetriesExhaustedWithDetailsException: Failed 4 actions: CallQueueTooBigException: 4 times, servers with issues: cdh02,16020,1720579381747 at org.apache.hadoop.hive.ql.exec.spark.SparkReduceRecordHandler.close(SparkReduceRecordHandler.java:463) at org.apache.hadoop.hive.ql.exec.spark.HiveReduceFunctionResultList.closeRecordProcessor(HiveReduceFunctionResultList.java:67) at org.apache.hadoop.hive.ql.exec.spark.HiveBaseFunctionResultList.hasNext(HiveBaseFunctionResultList.java:96) at scala.collection.convert.Wrappers$JIteratorWrapper.hasNext(Wrappers.scala:42) at scala.collection.Iterator$class.foreach(Iterator.scala:891) at scala.collection.AbstractIterator.foreach(Iterator.scala:1334) at org.apache.spark.rdd.AsyncRDDActions$$anonfun$foreachAsync$1$$anonfun$apply$12.apply(AsyncRDDActions.scala:127) at org.apache.spark.rdd.AsyncRDDActions$$anonfun$foreachAsync$1$$anonfun$apply$12.apply(AsyncRDDActions.scala:127) at org.apache.spark.SparkContext$$anonfun$38.apply(SparkContext.scala:2232) at org.apache.spark.SparkContext$$anonfun$38.apply(SparkContext.scala:2232) at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:90) at org.apache.spark.scheduler.Task.run(Task.scala:121) at org.apache.spark.executor.Executor$TaskRunner$$anonfun$11.apply(Executor.scala:407) at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1408) at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:413) at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149) at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624) at java.lang.Thread.run(Thread.java:748) Caused by: org.apache.hadoop.hive.ql.metadata.HiveException: org.apache.hadoop.hbase.client.RetriesExhaustedWithDetailsException: Failed 4 actions: CallQueueTooBigException: 4 times, servers with issues: cdh02,16020,1720579381747 at org.apache.hadoop.hive.ql.exec.FileSinkOperator$FSPaths.closeWriters(FileSinkOperator.java:198) at org.apache.hadoop.hive.ql.exec.FileSinkOperator.closeOp(FileSinkOperator.java:1058) at org.apache.hadoop.hive.ql.exec.Operator.close(Operator.java:686) at org.apache.hadoop.hive.ql.exec.Operator.close(Operator.java:700) at org.apache.hadoop.hive.ql.exec.Operator.close(Operator.java:700) at org.apache.hadoop.hive.ql.exec.Operator.close(Operator.java:700) at org.apache.hadoop.hive.ql.exec.spark.SparkReduceRecordHandler.close(SparkReduceRecordHandler.java:447) … 17 more Caused by: org.apache.hadoop.hbase.client.RetriesExhaustedWithDetailsException: Failed 4 actions: CallQueueTooBigException: 4 times, servers with issues: cdh02,16020,1720579381747 at org.apache.hadoop.hbase.client.BatchErrors.makeException(BatchErrors.java:54) at org.apache.hadoop.hbase.client.AsyncRequestFutureImpl.getErrors(AsyncRequestFutureImpl.java:1226) at org.apache.hadoop.hbase.client.BufferedMutatorImpl.doFlush(BufferedMutatorImpl.java:309) at org.apache.hadoop.hbase.client.BufferedMutatorImpl.close(BufferedMutatorImpl.java:241) at org.apache.hadoop.hive.hbase.HiveHBaseTableOutputFormat$MyRecordWriter.close(HiveHBaseTableOutputFormat.java:130) at org.apache.hadoop.hive.hbase.HiveHBaseTableOutputFormat$MyRecordWriter.close(HiveHBaseTableOutputFormat.java:168) at org.apache.hadoop.hive.ql.exec.FileSinkOperator$FSPaths.closeWriters(FileSinkOperator.java:195)

从错误日志来看,问题出现在通过Hive向HBase写入数据时,具体错误是CallQueueTooBigException,即HBase的请求队列太大,导致数据写入失败。

错误分析:

  1. CallQueueTooBigException:这是HBase的一种负载保护机制。当HBase的RPC队列过载,达到最大处理能力时,会抛出此异常,表示服务器无法处理更多的请求。这通常意味着HBase服务器在写入期间负载过重。
  2. Spark/Hive 与 HBase交互问题:在通过Hive和Spark写入HBase时,数据可能被并行地、大批量地发送给HBase。如果HBase负载过高或者写入并发量过大,可能会出现请求堆积,导致CallQueueTooBigException。
  3. 重试机制:日志中提到”RetriesExhaustedWithDetailsException”,说明Spark在多次重试之后仍然无法成功完成写入操作,导致任务最终失败。

可能的原因:

  1. HBase负载过大:HBase集群可能承受了过多的请求,导致队列超载。
  2. HBase资源不足:HBase服务器的硬件资源(如内存、CPU等)不足,无法处理高并发写入请求。
  3. 写入数据量过大:Hive通过Spark向HBase写入的数据量太大,超过了HBase的处理能力。
  4. 不合理的HBase配置:HBase的写入配置可能设置得过低,例如hbase.regionserver.handler.count(处理器数量)、hbase.rpc.timeout(RPC超时设置)等参数未合理配置。

解决方案:

  1. 调优HBase集群:
    • 增加HBase的处理能力:
      • 增加HBase的RegionServer实例数量。
      • 调整HBase集群的硬件配置,增加内存和CPU资源。
    • 调整HBase配置:
      • 增加hbase.regionserver.handler.count,该参数控制RegionServer可以同时处理的并发请求数量。
      • 调整hbase.ipc.server.max.callqueue.size,增大HBase RPC队列的大小以处理更多的并发请求。
      • 增加hbase.rpc.timeout的值,避免超时过早导致任务失败。
  2. 减少数据写入压力:
    • 批量写入控制:通过Spark作业配置控制每次写入的数据批量大小,减少单次请求的数据量。可以调整Spark配置参数,如spark.sql.shuffle.partitions,以减少分区数。
    • 限制并发写入:如果数据量特别大,可以控制并发写入HBase的作业数量,减小每次写入的压力。
  3. 重试机制配置:
    • 如果错误为临时性问题,可以在HBase客户端或Spark作业中增加重试次数,例如调整hbase.client.retries.number参数,确保在负载压力下仍然有更大的重试机会。
  4. 观察HBase监控日志:使用HBase的监控工具如HBase Web UI或者通过Ganglia等监控工具,观察HBase的RegionServer、请求队列、内存等资源使用情况,找出具体的瓶颈。

作者 east
CDH, Hbase, Hive 4月 25,2024

hive创建hbase外部关联表

在cdh6.3.2已经做好hbase和hive相关配置,这里不阐述。

要创建上述的表结构,你需要先在HBase中创建相应的表,然后在Hive中创建一个EXTERNAL TABLE来映射到这个HBase表。以下是详细的步骤:

步骤1:在HBase中创建表

  1. 确定HBase的安装和配置是否正确。
  2. 确定HBase的shell工具能够正常使用。

打开HBase shell:

hbase shell

在HBase shell中,创建一个表analysis_minute和一个列族bls:

create 'analysis_minute', 'bls'

退出HBase shell:

quit

步骤2:在Hive中创建EXTERNAL TABLE

  1. 确保Hive安装和配置正确。
  2. 确保HBaseStorageHandler已经包含在Hive的配置中。

使用Hive的命令行界面或者你的IDE来执行下面的SQL语句:

CREATE EXTERNAL TABLE analysis_minute_hbase (
  key STRING,
  pid STRING,
  time STRING,
  val STRING
)
STORED BY 'org.apache.hadoop.hive.hbase.HBaseStorageHandler'
WITH SERDEPROPERTIES (
  'hbase.columns.mapping' = ':key,bls:pid,bls:time,bls:val',
  'serialization.format' = '1'
)
TBLPROPERTIES (
  'hbase.table.name' = 'analysis_minute',
  'last_modified_by' = 'hdfs',
  'last_modified_time' = '1713990161',
  'numFiles' = '0',
  'numFilesErasureCoded' = '0',
  'numRows' = '0',
  'rawDataSize' = '0',
  'storage_handler' = 'org.apache.hadoop.hive.hbase.HBaseStorageHandler',
  'totalSize' = '0'
);

这段SQL语句做了以下几件事情:

  • 创建了一个EXTERNAL TABLE,意味着数据存储在HBase中,而不是Hive的数据仓库目录中。
  • 使用了HBaseStorageHandler来允许Hive与HBase交互。
  • 定义了四个字段,分别对应于HBase表中的列。
  • 通过WITH SERDEPROPERTIES定义了Hive字段到HBase列族的映射。
  • 通过TBLPROPERTIES定义了HBase表的名称和其他属性。

注意事项:

  • 确保HBase表analysis_minute和列族bls已经存在,Hive才能正确创建EXTERNAL TABLE。
  • 确认Hive配置文件中已经包含了HBaseStorageHandler的jar包。
  • 如果HBase表或列族名称与Hive表结构中的名称不匹配,Hive将无法找到对应的HBase表。
  • 在执行上述Hive SQL语句之前,你可能需要根据你的Hadoop和Hive版本调整配置参数。
作者 east
CDH, Hbase 4月 25,2024

如何在docker上面使用hbase shell

在新公司上班,hbase是cdh6.3.2安装在docker上面,如何直接在shell上面使用hbase shell是访问不到的。使用教程如下:

要在Docker上使用CDH 6.3.2中的HBase shell,你需要按照以下步骤操作:

步骤1:启动HBase服务

  1. 确保你的Docker环境已经启动,并且CDH 6.3.2的HBase服务已经随着CDH的安装被正确配置和启动。这通常通过Cloudera Manager完成,它提供了一个用户界面来管理CDH集群中的所有服务。
  2. 如果HBase服务尚未启动,你可以使用Cloudera Manager启动HBase Master和HBase RegionServer服务。

步骤2:进入HBase Shell

  1. 确定HBase服务运行正常后,使用Docker exec命令进入到运行HBase的容器中:docker exec -it <container_name_or_id> /bin/bash请将<container_name_or_id>替换为实际的容器名称或ID。
  2. 在容器内部,你可以使用HBase shell命令来启动HBase的交互式命令行界面:hbase shell

步骤3:使用HBase Shell命令

在HBase shell中,你可以执行多种操作,例如:

  • 创建表:create 'table_name', 'column_family'
  • 列出表:list
  • 插入数据:put 'table_name', 'row_id', 'column_family:column', 'value'
  • 获取数据:get 'table_name', 'row_id'
  • 扫描表:scan 'table_name'
  • 禁用表:disable 'table_name'
  • 启用表:enable 'table_name'
  • 删除表:drop 'table_name'

示例:在HBase shell中创建表并插入数据

  1. 启动HBase shell。
  2. 创建一个名为my_table的表,使用名为cf的列族:create 'my_table', 'cf'
  3. 向my_table表中插入一行数据,行键为row1,列族为cf,列名为data,值为my_value:put 'my_table', 'row1', 'cf:data', 'my_value'
  4. 扫描my_table表以查看数据:scan 'my_table'
  5. 退出HBase shell:exit

注意事项:

  • 请确保你有权限访问Docker容器以及HBase服务。
  • 根据你的CDH版本和配置,某些命令和文件路径可能略有不同。
  • 确保在操作过程中遵循安全最佳实践,如使用安全的密码和权限。

在上面步骤2如果不知道HBase的
container_name_or_id ,可以用下面的方法:

要查看 Docker 容器的 ID 或名称,您可以使用以下命令:

docker ps 

这将显示当前正在运行的 Docker 容器列表,包括容器的 ID、名称、状态等信息。您可以在列表中找到正在运行的 HBase 容器的 ID 或名称。

如果您有多个容器在运行,并且需要筛选特定的容器,您可以使用 grep 命令来过滤结果,例如:

docker ps | grep hbase 

这将仅显示包含 “hbase” 字符串的容器信息,以帮助您找到 HBase 容器的 ID 或名称。实际上关键字可能是cdh

一旦找到了容器的 ID 或名称,您可以将其替换到 <container_id_or_name> 的位置,并继续执行进入容器的命令。

作者 east
Hbase 4月 20,2024

hbase MultiRowRangeFilter的原理、作用和实例

MultiRowRangeFilter是HBase中的一个过滤器,用于在扫描操作中过滤多个行键范围

原理

MultiRowRangeFilter的原理是将多个行键范围组合成一个过滤器,然后在扫描操作中应用这个过滤器。当扫描器遍历HBase表的行时,它会检查每一行的行键是否在MultiRowRangeFilter中指定的行键范围内。如果行键在范围内,扫描器会返回这一行;否则,扫描器会跳过这一行。

作用

MultiRowRangeFilter的主要作用是在扫描操作中过滤多个行键范围。它可以帮助用户更高效地查询数据,因为它可以减少从HBase表中读取的数据量。这对于大型数据集和复杂查询非常有用,因为它可以显著提高查询性能。

使用例子

以下是一个使用MultiRowRangeFilter的示例:

import org.apache.hadoop.hbase.filter.MultiRowRangeFilter;

public class MultiRowRangeFilterExample {
    public static void main(String[] args) {
        // 创建一个 MultiRowRangeFilter 对象
        MultiRowRangeFilter filter = new MultiRowRangeFilter(
                Arrays.asList(
                        new RowRange("row1", true, "row10", false),
                        new RowRange("row20", true, "row30", false)
                )
        );

        // 执行 HBase 扫描操作,并传递 MultiRowRangeFilter 对象
        HBaseAdmin admin = new HBaseAdmin();
        try {
            Scan scan = new Scan();
            scan.setFilter(filter);
            ResultScanner scanner = admin.scan("my_table", scan);
            try {
                // 遍历扫描结果
                for (Result result : scanner) {
                    // 处理扫描结果
                    String rowKey = new String(result.getRow());
                    System.out.println("Row Key: " + rowKey);
                }
            } finally {
                scanner.close();
            }
        } finally {
            admin.close();
        }
    }
}

在这个示例中,我们首先创建了一个MultiRowRangeFilter对象,并添加了两个行键范围。第一个范围是row1到row5(不包括row5),第二个范围是row10到row20(包括row20)。

然后,我们创建了一个Scan对象,并设置了过滤器为MultiRowRangeFilter。最后,我们执行了扫描操作,并遍历查询结果。在这个过程中,扫描器会过滤掉不在指定行键范围内的行。

注意事项

在使用MultiRowRangeFilter时,需要注意以下几点:

  1. 行键范围的顺序:MultiRowRangeFilter中的行键范围应该是有序的。如果行键范围重叠或无序,可能会导致查询结果不正确。
  2. 行键范围的数量:MultiRowRangeFilter支持多个行键范围,但是行键范围的数量不能超过一定的限制。在HBase 1.x和2.x版本中,这个限制是128。如果超过这个限制,可能会导致查询失败。
  3. 性能优化:MultiRowRangeFilter可以帮助用户更高效地查询数据,但是在某些情况下,它可能会导致性能下降。例如,当行键范围分布不均匀时,可能会导致扫描器在某些Region上花费更多的时间。为了提高查询性能,可以考虑使用其他过滤器或者优化行键范围的设计。

总之,MultiRowRangeFilter是一个强大的HBase过滤器,可以帮助用户更高效地查询数据。在使用它时,需要注意行键范围的顺序、数量和分布,以确保查询结果的正确性和性能。

作者 east
doris, Hbase 4月 13,2024

用Flink开发实时数仓,用hase还是doris存储维度表选型对比

在使用Flink开发实时数仓时,选择合适的存储方案对于保证数据的实时性、一致性和可用性至关重要。在这里,我们将对比Doris和HBase,以帮助您做出决策。

  1. Doris

Doris是一个基于MPP(Massively Parallel Processing)架构的分布式列式存储数据库,具有高性能、高可用性和高扩展性。Doris适用于大规模数据分析和实时查询场景,特别是对实时性要求较高的场景。

优点:

  • 高性能:Doris具有高性能的列式存储和向量化查询引擎,能够快速处理大量数据。
  • 易用性:Doris提供了简单易用的SQL接口,支持标准SQL语法,方便用户进行数据查询和分析。
  • 可扩展性:Doris支持水平扩展,可以根据业务需求动态调整集群规模。

缺点:

  • 对实时性要求较高的场景,Doris的实时性可能不如HBase。
  • 数据更新和删除操作相对较慢,不适合频繁更新的场景。
  1. HBase

HBase是一个基于Hadoop的分布式、可扩展、大规模列式存储的NoSQL数据库。HBase适用于需要实时读写的场景,特别是对实时性要求较高的场景。

优点:

  • 高实时性:HBase具有低延迟的读写性能,适用于实时数据处理和分析场景。
  • 可扩展性:HBase支持水平扩展,可以根据业务需求动态调整集群规模。
  • 数据一致性:HBase提供了强一致性保证,适用于对数据一致性要求较高的场景。

缺点:

  • 查询性能相对较低,不适合大规模数据分析场景。
  • 数据更新和删除操作相对较慢,不适合频繁更新的场景。

总结:

在选择存储方案时,需要根据实际业务场景和需求来权衡。如果实时性要求较高,可以选择HBase;如果需要大规模数据分析和查询,可以选择Doris。在实际应用中,也可以将两者结合使用,以满足不同场景的需求。

作者 east
Hbase, Hive 3月 24,2022

生产环境选型考虑:Hive全方位对比HBase

Apache Hive 和 Apache HBase 是用于大数据的令人难以置信的工具。虽然它们的功能有一些相同之处,但 Apache Hive 和 Apache HBase 都具有独特的作用,使它们更适合特定的场景。一些主要区别包括:

Apache Hive 是建立在 Hadoop 之上的数据仓库系统,而 Apache HBase 是在 HDFS 或 Alluxio 之上的 NoSQL 键/值。

Hive 为 Spark/Hadoop 数据提供 SQL 功能,HBase 实时存储和处理 Hadoop 数据。

HBase 用于实时查询或大数据,而 Hive 不适合实时查询。

Hive 最适合用于数据的分析查询,而 HBase 主要用于将非结构化 Hadoop 数据作为湖存储或处理。

归根结底,将 Apache Hive 与 Apache HBase 进行比较就像将苹果与橘子,或 Google 与 Facebook 进行比较。虽然这两个实体相似,但它们不为用户提供相同的功能。然而,尽管存在差异,Apache Hive 和 Apache HBase 都是处理大数据时使用的绝佳工具。继续阅读以了解有关 Apache Hive、Apache HBase 的更多信息,以及它们的各种功能如何在处理大数据时改善您的业务。

什么是 Apache Hive?

让我们从 Apache Hive 开始“Hive 与 Hbase”的考试。 Apache Hive 是一个构建在 Hadoop 之上的数据仓库系统。它为大型 Hadoop 非结构化数据池提供数据汇总、分析和查询。您可以查询存储在 Apache HDFS 中的数据,甚至可以查询存储在 Apache HBase 中的数据。 MapReduce、Spark 或 Tez 执行该数据。

Apache Hive 使用一种称为 HiveQL(或 HQL)的类似 SQL 的语言来查询批量 MapReduce 作业。 Hive 还支持 ACID 事务,例如 INSERT/DELETE/UPDATE/MERGE 语句。从更新 3.0 开始,Hive 通过减少表模式约束和提供对矢量化查询的访问权限为此添加了一些额外的功能。

简而言之,Apache Hive 为 Spark/Hadoop 数据提供了 SQL 特性(MapReduce 的 Java API 不太容易使用),它既是一个数据仓库系统,也是一个具有丰富集成和大量用户友好的 ETL 工具特征。与许多类似的产品(例如 Apache Pig)一样,Hive 在技术上可以处理许多不同的功能。例如,Hive 允许您使用 SQL,而不是为 MapReduce 作业编写冗长的 Java。您在堆栈中使用 Hive 的原因将因您的需求而异。

Hive 的核心功能

Hive 可以帮助精通 SQL 查询与 Hadoop 集成的各种数据存储中的数据。由于它符合 JDBC,它还与现有的基于 SQL 的工具集成。运行 Hive 查询可能需要一段时间,因为默认情况下它们会遍历表中的所有数据。尽管如此,Hive 的分区功能限制了数据量。分区允许对存储在单独文件夹中的数据运行过滤查询,并且只读取与查询匹配的数据。例如,如果文件将日期格式作为其名称的一部分,它可以用于仅处理在特定日期之间创建的文件。

以下是 Hive 的一些功能:

它使用 SQL。

出色的 Apache Spark 和 Tez 集成。

您可以使用用户定义函数 (UDF)。

它有很棒的带有 Hive 3+ 的 ACID 表。

您可以查询庞大的 Hadoop 数据集。

大量集成(例如,BI 工具、Pig、Spark、HBase 等)。

其他基于 Hive 的功能(例如 HiveMall)可以提供一些额外的独特功能。

什么是 Apache HBase?

Apache HBase 是运行在 HDFS 或 Alluxio 之上的 NoSQL 键/值存储。与 Hive 不同,HBase 操作在其数据库而不是 MapReduce 作业上实时运行。所以,你有随机访问能力——这是 HDFS 所缺少的。由于 HDFS 不是为处理具有随机读/写操作的实时分析而构建的,因此 HBase 为 HDFS 带来了大量功能。您可以将其设置为通过 Hadoop 处理的实时数据的数据存储。您可以将它与 MapReduce 集成。更好的是,您可以将它与 Hive 和 MapReduce 集成以获得 SQL 功能。

HBase 包含表,并且表被拆分为列族。列族(在架构中声明)将一组特定的列组合在一起(列不需要架构定义)。例如,“message”列族可以包括以下列:“to”、“from”、“date”、“subject”和“body”。 HBase 中的每个键/值对都定义为一个单元格,每个键由 row-key、c​​olumn family、column 和 time-stamp 组成。 HBase 中的一行是由行键标识的一组键/值映射。 HBase 享有 Hadoop 的基础设施并横向扩展。

简而言之,HBase 可以存储或处理具有近乎实时读/写需求的 Hadoop 数据。这包括结构化和非结构化数据,尽管 HBase 擅长后者。 HBase 具有低延迟,可通过 shell 命令、Java API、Thrift 或 REST 访问。 HBase 通常是 Hadoop 集群中的存储层,Adobe 等大型品牌利用 HBase 来满足其所有 Hadoop 存储需求。

HBase的核心特性

HBase 通过将数据存储为模仿 Google 的 Bigtable 的键/值来工作。它支持四种主要操作:添加或更新行、扫描以检索一系列单元格、返回指定行的单元格以及删除以从表中删除行、列或列版本。版本控制是可用的,因此它可以获取数据的先前值(历史记录不时删除以通过 HBase 压缩清理空间)。尽管 HBase 包含表,但仅表和列族需要模式,列不需要模式,并且它包括增量/计数器功能。

以下是 HBase 的一些功能:

它支持键值

它是一个支持随机读/写操作的 NoSQL 数据库

中型对象 (MOB) 支持

HBase 支持协处理器。这对于计算海量数据非常有用,并且操作类似于 MapReduce 作业,并具有一些额外的好处。

允许您利用 Apache Phoenix

您可以执行扫描操作

Hive 和 HBase 的局限性是什么?

每个工具都有自己的优缺点。因此,Hive 和 HBase 总是存在某些限制。阅读下面的这些限制。

Hive限制

首先,Hive 具有非常基本的 ACID 功能。他们到达了 Hive 0.14,但没有 MYSQL 等产品的成熟度。也就是说,仍然有 ACID 支持,并且每个补丁都会变得更好。

Hive 查询通常也具有高延迟。由于它在 Hadoop 上运行批处理,因此可能需要几分钟甚至几小时才能获得查询结果。此外,更新数据可能既复杂又耗时。

Hive 在小数据查询(尤其是大容量数据)方面并不是最好的,大多数用户倾向于依靠传统的 RDBMS 来处理这些数据集。

HBase 限制

HBase 查询采用自定义语言,需要经过培训才能学习。类似 SQL 的功能可以通过 Apache Phoenix 实现,尽管它是以维护模式为代价的。此外,HBase 并不完全符合 ACID,尽管它确实支持某些属性。最后但同样重要的是——为了运行 HBase,你需要 ZooKeeper——一个用于分布式协调的服务器,例如配置、维护和命名。

HBase 可以通过协同处理来处理小数据,但它仍然不如 RDBMS 有用。

实践中的 Hive 和 HBase

正如 Hive 和 HBase 在某些场景中有其局限性一样,它们也有它们蓬勃发展的特定场景。在下面的实践中阅读 Hive 和 HBase。

Hive使用场景

Hive 应该用于对一段时间内收集的数据进行分析查询——例如,计算趋势或网站日志。

我们通常会看到两个 Hive 用例:

HDFS 的 SQL 查询引擎 – Hive 可以成为 SQL 查询的重要来源。您可以利用 Hive 处理 Hadoop 数据湖并将它们连接到您的 BI 工具(如 Tableau)以实现可见性。

具有 HBase、Pig、Spark 或 Tez 的表存储层。大量 HDFS 工具使用 Hive 作为表存储层。从技术上讲,这可能是其最大的全球用例。

Hive 使用的真实例子

目前有超过 4,330 家公司品牌使用 Hive。这比使用 HBase 少,但仍然有很多品牌——尤其是因为大多数公司仍在运行 SQL 堆栈。

Scribd 将 Hive 典型的数据科学用例与 Hadoop 结合使用。这包括机器学习、数据挖掘和 BI 工具的临时查询。确实,Scribd 使用 Hive 作为其整体 Hadoop 堆栈的一部分——这是它最适合的地方。您可以将 Hive 和 HBase 放在同一个集群上进行存储、处理和即席查询。

MedHelp 将 Hive 用于其 Find a Doctor 功能。他们每天在 Hadoop 堆栈上处理数百万个查询,而 Hive 像专业人士一样处理它。

Last.fm 还使用 Hive 进行临时查询。再次,这就是 Hive 的亮点。如果您需要在 Hadoop 上进行临时查询,请使用 Hive。

HubSpot、hi5、eHarmony 和 CNET 也使用 Hive 进行查询。

HBase 使用场景

HBase 非常适合实时查询大数据(例如 Facebook 曾经将其用于消息传递)。 Hive 不应该用于实时查询,因为结果需要一段时间。

HBase 主要用于将非结构化 Hadoop 数据作为湖存储和处理。您也可以使用 HBase 作为所有 Hadoop 数据的仓库,但我们主要看到它用于写入繁重的操作。

HBase 使用的真实使用场景

几乎所有这些案例都将使用 HBase 作为 Hadoop 的存储和处理工具——这是它自然适合的地方。

Adobe 自推出以来一直在运行 HBase。他们的第一个节点早在 2008 年就启动了,他们目前将 HBase 用于他们的 30 个 HDFS 节点。他们将其用于内部结构化数据和非结构化外部数据。

Flurry 使用 HBase 运行 50 个 HDFS 节点,它使用 HBase 处理数百亿行。

HubSpot 主要使用 HBase 进行客户数据存储。作为 HDFS 堆栈的一部分,他们还使用 Hive 对该 HBase 数据运行查询。

Twitter 也在他们的 Hadoop 堆栈中使用 HBase。它用于用户搜索的内部数据。

Streamy 从 SQL 切换到带有 HBase 的 Hadoop 堆栈。他们声称能够比以往更快地处理。

Sematext(为 HBase 创建 SMP)使用 HBase 和 MapReduce 堆栈。同样,这两者可以很好地协同工作(通常通过 Hive 加以利用),因为它们完美地互补了彼此的优缺点。 超过 10,000 家企业使用 HBase。而且大部分都很大。在当前的技术生态系统中,大品牌倾向于更频繁地利用 Hadoop,因此 HBase 往往处于一些大堆栈中(例如,TCS、Marin Software、Taboola、KEYW Corp 等)

作者 east
Hbase 6月 4,2021

使用Hbase出现KeeperErrorCode = Session expired for /hbase/hbaseid

在使用FusionInsight HD大数据平台安全认证模式时,用hbase Shell或代码调用hbase时,有时出现”KeeperErrorCode = Session expired for /hbase/hbaseid”等错误。这是因为安全认证session过时了。

可以切换到hd,使用HBase客户端安装目录 ,输入下面命令

source bigdata_env
kinit 组件业务用户
按提示输入密码

再使用hbase shell就不会上面的问题。
如果是在Spark上调用的话,可以在调用的shell脚本上kinit 指定机机认证的安全认证文件。或者干脆写个定时执行的脚本配置在crontab上。

作者 east
Hbase 3月 10,2021

Hbase统计海量数据行数

业务需要统计每天hbase的数据量,而且每天增量有上百万条。

网上找了些代码,很多推荐使用Coprocessor的方式,执行效率高。但在我的大数据环境 运行出错,报“No registered coprocessor service found for name AggregateService in region xxx”。后来发现是第一次运行时需要下面这些代码来修改配置

 String coprocessorClass = "org.apache.hadoop.hbase.coprocessor.AggregateImplementation";
        if (! descriptor.hasCoprocessor(coprocessorClass)) {
            descriptor.addCoprocessor(coprocessorClass);
        }
        admin.modifyTable(name, descriptor);
        admin.enableTable(name);
public void rowCount(String tablename){
    try {
        //提前创建connection和conf
        Admin admin = connection.getAdmin();
        TableName name=TableName.valueOf(tablename);
        //先disable表,添加协处理器后再enable表
        admin.disableTable(name);
        HTableDescriptor descriptor = admin.getTableDescriptor(name);
        String coprocessorClass = "org.apache.hadoop.hbase.coprocessor.AggregateImplementation";
        if (! descriptor.hasCoprocessor(coprocessorClass)) {
            descriptor.addCoprocessor(coprocessorClass);
        }
        admin.modifyTable(name, descriptor);
        admin.enableTable(name);

        //计时
        StopWatch stopWatch = new StopWatch();
        stopWatch.start();

        Scan scan = new Scan();
        AggregationClient aggregationClient = new AggregationClient(conf);

        System.out.println("RowCount: " + aggregationClient.rowCount(name, new LongColumnInterpreter(), scan));
        stopWatch.stop();
        System.out.println("统计耗时:" +stopWatch.getTotalTimeMillis());
    } catch (Throwable e) {
        e.printStackTrace();
    }
}
作者 east

1 2 下一个

关注公众号“大模型全栈程序员”回复“小程序”获取1000个小程序打包源码。回复”chatgpt”获取免注册可用chatgpt。回复“大数据”获取多本大数据电子书

标签

AIGC AI创作 bert chatgpt github GPT-3 gpt3 GTP-3 hive mysql O2O tensorflow UI控件 不含后台 交流 共享经济 出行 图像 地图定位 外卖 多媒体 娱乐 小程序 布局 带后台完整项目 开源项目 搜索 支付 效率 教育 日历 机器学习 深度学习 物流 用户系统 电商 画图 画布(canvas) 社交 签到 联网 读书 资讯 阅读 预订

官方QQ群

小程序开发群:74052405

大数据开发群: 952493060

近期文章

  • 详解Python当中的pip常用命令
  • AUTOSAR如何在多个供应商交付的配置中避免ARXML不兼容?
  • C++thread pool(线程池)设计应关注哪些扩展性问题?
  • 各类MCAL(Microcontroller Abstraction Layer)如何与AUTOSAR工具链解耦?
  • 如何设计AUTOSAR中的“域控制器”以支持未来扩展?
  • C++ 中避免悬挂引用的企业策略有哪些?
  • 嵌入式电机:如何在低速和高负载状态下保持FOC(Field-Oriented Control)算法的电流控制稳定?
  • C++如何在插件式架构中使用反射实现模块隔离?
  • C++如何追踪内存泄漏(valgrind/ASan等)并定位到业务代码?
  • C++大型系统中如何组织头文件和依赖树?

文章归档

  • 2025年6月
  • 2025年5月
  • 2025年4月
  • 2025年3月
  • 2025年2月
  • 2025年1月
  • 2024年12月
  • 2024年11月
  • 2024年10月
  • 2024年9月
  • 2024年8月
  • 2024年7月
  • 2024年6月
  • 2024年5月
  • 2024年4月
  • 2024年3月
  • 2023年11月
  • 2023年10月
  • 2023年9月
  • 2023年8月
  • 2023年7月
  • 2023年6月
  • 2023年5月
  • 2023年4月
  • 2023年3月
  • 2023年1月
  • 2022年11月
  • 2022年10月
  • 2022年9月
  • 2022年8月
  • 2022年7月
  • 2022年6月
  • 2022年5月
  • 2022年4月
  • 2022年3月
  • 2022年2月
  • 2022年1月
  • 2021年12月
  • 2021年11月
  • 2021年9月
  • 2021年8月
  • 2021年7月
  • 2021年6月
  • 2021年5月
  • 2021年4月
  • 2021年3月
  • 2021年2月
  • 2021年1月
  • 2020年12月
  • 2020年11月
  • 2020年10月
  • 2020年9月
  • 2020年8月
  • 2020年7月
  • 2020年6月
  • 2020年5月
  • 2020年4月
  • 2020年3月
  • 2020年2月
  • 2020年1月
  • 2019年7月
  • 2019年6月
  • 2019年5月
  • 2019年4月
  • 2019年3月
  • 2019年2月
  • 2019年1月
  • 2018年12月
  • 2018年7月
  • 2018年6月

分类目录

  • Android (73)
  • bug清单 (79)
  • C++ (34)
  • Fuchsia (15)
  • php (4)
  • python (43)
  • sklearn (1)
  • 云计算 (20)
  • 人工智能 (61)
    • chatgpt (21)
      • 提示词 (6)
    • Keras (1)
    • Tensorflow (3)
    • 大模型 (1)
    • 智能体 (4)
    • 深度学习 (14)
  • 储能 (44)
  • 前端 (4)
  • 大数据开发 (488)
    • CDH (6)
    • datax (4)
    • doris (30)
    • Elasticsearch (15)
    • Flink (78)
    • flume (7)
    • Hadoop (19)
    • Hbase (23)
    • Hive (40)
    • Impala (2)
    • Java (71)
    • Kafka (10)
    • neo4j (5)
    • shardingsphere (6)
    • solr (5)
    • Spark (99)
    • spring (11)
    • 数据仓库 (9)
    • 数据挖掘 (7)
    • 海豚调度器 (10)
    • 运维 (34)
      • Docker (3)
  • 小游戏代码 (1)
  • 小程序代码 (139)
    • O2O (16)
    • UI控件 (5)
    • 互联网类 (23)
    • 企业类 (6)
    • 地图定位 (9)
    • 多媒体 (6)
    • 工具类 (25)
    • 电商类 (22)
    • 社交 (7)
    • 行业软件 (7)
    • 资讯读书 (11)
  • 嵌入式 (70)
    • autosar (63)
    • RTOS (1)
    • 总线 (1)
  • 开发博客 (16)
    • Harmony (9)
  • 技术架构 (6)
  • 数据库 (32)
    • mongodb (1)
    • mysql (13)
    • pgsql (2)
    • redis (1)
    • tdengine (4)
  • 未分类 (6)
  • 程序员网赚 (20)
    • 广告联盟 (3)
    • 私域流量 (5)
    • 自媒体 (5)
  • 量化投资 (4)
  • 面试 (14)

功能

  • 登录
  • 文章RSS
  • 评论RSS
  • WordPress.org

All Rights Reserved by Gitweixin.本站收集网友上传代码, 如有侵犯版权,请发邮件联系yiyuyos@gmail.com删除.