gitweixin
  • 首页
  • 小程序代码
    • 资讯读书
    • 工具类
    • O2O
    • 地图定位
    • 社交
    • 行业软件
    • 电商类
    • 互联网类
    • 企业类
    • UI控件
  • 大数据开发
    • Hadoop
    • Spark
    • Hbase
    • Elasticsearch
    • Kafka
    • Flink
    • 数据仓库
    • 数据挖掘
    • flume
    • Kafka
    • Hive
    • shardingsphere
    • solr
  • 开发博客
    • Android
    • php
    • python
    • 运维
    • 技术架构
    • 数据库
  • 程序员网赚
  • bug清单
  • 量化投资
  • 在线查询工具
    • 去行号
    • 在线时间戳转换工具
    • 免费图片批量修改尺寸在线工具
    • SVG转JPG在线工具

月度归档9月 2024

精品微信小程序开发门户,代码全部亲测可用

  • 首页   /  2024   /  
  • 9月
  • ( 页面3 )
Hive, Impala 9月 23,2024

Hive/Impala利用时间窗口函数巧妙实现2种不同类型数据间隔出现

在做一个需求,要求计算在不同时间段的多个最大值(波峰)和最小值(波谷),并且要求波峰和波谷是间隔出现的。

原始数据如下:

要求按时间(ptime)排序,同1个soc_id必须是1个peak和1个valley间隔,可能会有波峰波谷间隔出现多个;有多个peak连续出现时,取pvalue最大值(如果都相同取第一个值);有多个valley连续出现时,取pvalue最小值(如果都相同取第一个值)

实现代码如下:

WITH LagResult AS (
— 计算每一行的前一行的 peak_or_valley 值,用于后续分组
SELECT
soc_id,
ds,
ptime,
pvalue,
peak_or_valley,
LAG(peak_or_valley) OVER (PARTITION BY soc_id ORDER BY ptime) AS prev_peak_valley
FROM
your_table
),
GroupedPeaksAndValleys AS (
— 基于 LAG 结果生成每个 peak 和 valley 的分组编号
SELECT
soc_id,
ds,
ptime,
pvalue,
peak_or_valley,
— 通过对比当前值和前一个值是否不同来创建组号
SUM(CASE WHEN peak_or_valley != prev_peak_valley THEN 1 ELSE 0 END)
OVER (PARTITION BY soc_id ORDER BY ptime ASC) AS group_id
FROM
LagResult
),
FilteredPeaksAndValleys AS (
— 按每个分组的 peak 和 valley 排序,并选取最大或最小的 pvalue
SELECT
soc_id,
ds,
ptime,
pvalue,
peak_or_valley,
group_id,
ROW_NUMBER() OVER (PARTITION BY soc_id, group_id ORDER BY
CASE WHEN peak_or_valley = ‘peak’ THEN pvalue END DESC, — 对 peak 按 pvalue 降序
CASE WHEN peak_or_valley = ‘valley’ THEN pvalue END ASC, — 对 valley 按 pvalue 升序
ptime ASC — 在相同 pvalue 的情况下按 ptime 升序
) AS rn
FROM
GroupedPeaksAndValleys
)
SELECT
soc_id,
ds,
ptime,
pvalue,
peak_or_valley
FROM
FilteredPeaksAndValleys
WHERE
rn = 1 — 只保留每个 group 中的第一个,即 pvalue 最大/最小且时间最早的记录
ORDER BY
soc_id, ptime;

在上面的代码:

  1. LagResult CTE: 首先,我们通过 LAG() 函数计算出每行的前一个 peak_or_valley,这为后续分组做准备。
  2. GroupedPeaksAndValleys CTE: 使用 SUM(CASE ...) OVER 来生成分组编号(group_id)。当当前的 peak_or_valley 与前一个不同的时候,我们将分组编号加 1,从而将连续的相同 peak 或 valley 分为一组。
  3. FilteredPeaksAndValleys CTE: 对每个 group_id 中的 peak 和 valley 排序,选择 pvalue 最大(对于 peak)或最小(对于 valley)的记录,确保在 pvalue 相同时选择时间最早的记录。
  4. 最终结果: 按时间 (ptime) 排序,输出满足要求的 peak 和 valley 数据。

这个查询避免了嵌套窗口函数的限制,能够正确处理连续的 peak 和 valley,并选取最大或最小的 pvalue。

作者 east
Android 9月 23,2024

如何处理Activity在不同设备配置变化时的生命周期问题?

在Android开发中,处理Activity在不同设备配置变化时的生命周期问题通常涉及到对onConfigurationChanged()方法的重写以及配置变更监听器的设置。以下是一些关键步骤和建议:

  1. 声明配置变更监听:
    在AndroidManifest.xml文件中,对于需要响应配置变化的Activity,需要在<activity>标签内添加android:configChanges属性,并指定相应的配置类型,如屏幕方向、键盘可用性等。
  2. 重写onConfigurationChanged()方法:
    在Activity中重写onConfigurationChanged(Configuration newConfig)方法,以便在配置发生变化时执行自定义逻辑。这个方法会在配置实际改变之前被调用,允许开发者保存状态或更新UI。
  3. 管理资源和状态:
    在onConfigurationChanged()方法内部,根据新的配置信息来管理资源和应用状态。例如,如果配置变化包括屏幕方向的改变,可能需要重新加载布局资源或者调整控件的大小和位置。
  4. 避免不必要的Activity重建:
    通过声明android:configChanges,可以告诉系统在这些配置变化发生时不需要销毁并重新建Activity。这样可以避免不必要的资源消耗和潜在的性能问题。
  5. 测试不同配置:
    确保在多种设备配置上进行测试,以验证Activity能够正确地处理配置变化。这包括不同尺寸的屏幕、不同的语言和地区设置等。
  6. 考虑使用Fragment:
    对于复杂的界面,可以考虑使用Fragment来封装可重用的UI部分,这样即使在配置变化时也能更灵活地管理界面的重构。
作者 east
Android 9月 23,2024

Activity的生命周期中哪个方法是在Activity首次创建时唯一必须实现的?

在Activity的生命周期中,onCreate()方法是在Activity首次创建时唯一必须实现的。这个方法在Activity对象被第一次创建时被调用,用于执行基本的应用启动逻辑,如设置布局、初始化控件等。这些操作在Activity的整个生命周期中只应该发生一次。onCreate()方法接收一个savedInstanceState参数,这是一个Bundle对象,包含了Activity之前保存的状态信息。如果Activity之前从未存在过,这个Bundle对象的值为null. 

作者 east
Flink 9月 23,2024

Flink Lookup Join在性能优化方面都提供了哪些机制?

Apache Flink的Lookup Join是一种特殊类型的连接操作,它允许流表与外部数据库中的维表进行实时关联。在性能优化方面,Flink提供了以下几种机制:

  1. 同步和异步查询机制:Flink支持通过查询提示开启同步和异步查询模式。在异步模式下,可以并发地向维表发送多个数据项的查询请求,这样可以减少等待单个查询响应的时间,从而提高整体的查询性能。 
  2. Ordered和Unordered查询机制:在异步查询模式中,Flink还提供了Ordered和Unordered两种输出模式。Ordered模式需要等待所有数据查询完成并对数据进行排序后才能发送给下游,而Unordered模式不对输出顺序做要求,可以在查询到结果后立即发送,这样可以进一步提升性能。 
  3. Cache机制:Flink支持使用Cache机制来优化Lookup Join的性能。通过将维表数据缓存到内存中,可以减少对数据库的直接查询次数,加快数据检索速度。Flink提供了Full Caching(将所有数据缓存)和Partial Caching(使用LRU算法缓存最近使用的数据)两种缓存策略。 
  4. 状态管理优化:Flink的状态后端(State Backends)和Checkpoint机制也对Lookup Join的性能有影响。选择合适的状态后端和调整Checkpoint间隔可以优化状态的存储和恢复,从而间接提升Lookup Join的性能。
  5. 查询提示(Query Hints):Flink允许用户通过查询提示来指导优化器如何执行Lookup Join,这可以帮助优化器生成更高效的执行计划。 
作者 east
Flink 9月 23,2024

Flink Lookup Join 相比传统Join有哪些优势?

Flink Lookup Join 的优势

Flink Lookup Join 相比传统的 Join 操作,在实时数据处理方面具有明显的优势:

  1. 低延迟查询:Lookup Join 能够根据键值动态查找外部维表中的数据,这对于需要快速响应的实时数据流处理场景非常适用。 
  2. 异步查询:Flink 支持异步查找机制,这有助于提高查询性能和吞吐量,尤其是在处理大规模数据时。 
  3. 高效的维度查询:通过将维表数据加载到内存中并进行索引,Lookup Join 提供了快速查找和关联维度数据的能力,减少了对外部存储系统的访问次数。 
  4. 实时维度关联:Lookup Join 允许实时流数据与维表进行关联,这对于获取最新的维度信息至关重要,如在电商领域中将订单流与商品维表关联。 
  5. 动态数据关联:适用于维表数据动态变化的场景,确保流数据处理过程中能够及时获取最新的维度信息。 
  6. 事件时间关联:支持基于事件时间的关联操作,这在处理时间敏感的数据时尤为重要,如用户行为分析等。 
作者 east
doris 9月 18,2024

Doris Unique模型用于离线数仓的ODS层的好处

在离线数仓的ODS(Operational Data Store,操作型数据存储)层使用 Doris 的 Unique 模型有多个好处。Doris 提供多种数据模型,其中 Unique 模型专门用于保证数据唯一性,适合一些有去重需求的场景。将 Unique 模型应用于 ODS 层的主要优点包括:

1. 保证数据的唯一性

ODS 层通常存储从源系统抽取的原始数据,这些数据可能存在重复问题。Doris 的 Unique 模型通过唯一键(Primary Key)确保数据的唯一性,避免重复数据的出现。这在整合来自多个源的相同实体时非常重要,比如处理重复的用户、订单、交易等。

2. 便于数据更新

ODS 层的数据可能会发生变化,数据会不断地被更新或增量加载。Doris 的 Unique 模型允许基于唯一键进行记录的更新和覆盖,这使得数据变更(如状态更新、数据修正等)能够方便地应用到表中,而不必执行复杂的删除和插入操作。

3. 提高查询效率

尽管 ODS 层主要用作数据存储的中间层,但在有需要时,也会进行数据查询或分析。Unique 模型通过主键索引提高查询效率,特别是针对带有唯一键的查询。在处理大规模数据时,索引结构可以加速数据检索和聚合查询。

4. 适合维度更新频繁的场景

ODS 层的数据源自多个系统,维度数据的更新可能非常频繁。Doris 的 Unique 模型允许在 ODS 层快速处理这种频繁更新的数据,从而保持数据的最新状态。

5. 简化数据清洗和预处理

在离线数仓的 ODS 层,数据清洗和预处理是重要的步骤。Unique 模型帮助通过去重和数据规范化简化这一过程,确保数据质量,并为后续 DWH 层(数据仓库层)或 DM(数据集市层)的构建奠定坚实基础。

6. 高并发性能

Doris 具有高性能的并行处理能力,支持高并发的数据写入和查询,这对于离线数据仓库中的批量数据处理非常重要。Unique 模型能够高效处理大规模数据的批量导入和更新操作。

7. 灵活的扩展性

ODS 层的数据量通常非常庞大,并且随时间逐步扩展。Doris 的分布式架构和 Unique 模型的设计,支持在集群中进行水平扩展,适应离线数仓 ODS 层的扩展需求。

作者 east
Hbase, Hive 9月 18,2024

写入Hbase报错CallQueueTooBigException的解决

在CDH6.3.2,通过hive外部表插入数据到hbase时报错:

24/09/18 10:43:04 ERROR status.SparkJobMonitor: Spark job[3] failed [INFO] 2024-09-18 10:43:04.156 – [taskAppId=TASK-41-192147-585089]:[127] – -> java.util.concurrent.ExecutionException: Exception thrown by job at org.apache.spark.JavaFutureActionWrapper.getImpl(FutureAction.scala:337) at org.apache.spark.JavaFutureActionWrapper.get(FutureAction.scala:342) at org.apache.hive.spark.client.RemoteDriver$JobWrapper.call(RemoteDriver.java:404) at org.apache.hive.spark.client.RemoteDriver$JobWrapper.call(RemoteDriver.java:365) at java.util.concurrent.FutureTask.run(FutureTask.java:266) at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149) at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624) at java.lang.Thread.run(Thread.java:748) Caused by: org.apache.spark.SparkException: Job aborted due to stage failure: Task 5 in stage 8.0 failed 4 times, most recent failure: Lost task 5.3 in stage 8.0 (TID 85, cdh02, executor 8): java.lang.RuntimeException: Hive Runtime Error while closing operators: org.apache.hadoop.hbase.client.RetriesExhaustedWithDetailsException: Failed 4 actions: CallQueueTooBigException: 4 times, servers with issues: cdh02,16020,1720579381747 at org.apache.hadoop.hive.ql.exec.spark.SparkReduceRecordHandler.close(SparkReduceRecordHandler.java:463) at org.apache.hadoop.hive.ql.exec.spark.HiveReduceFunctionResultList.closeRecordProcessor(HiveReduceFunctionResultList.java:67) at org.apache.hadoop.hive.ql.exec.spark.HiveBaseFunctionResultList.hasNext(HiveBaseFunctionResultList.java:96) at scala.collection.convert.Wrappers$JIteratorWrapper.hasNext(Wrappers.scala:42) at scala.collection.Iterator$class.foreach(Iterator.scala:891) at scala.collection.AbstractIterator.foreach(Iterator.scala:1334) at org.apache.spark.rdd.AsyncRDDActions$$anonfun$foreachAsync$1$$anonfun$apply$12.apply(AsyncRDDActions.scala:127) at org.apache.spark.rdd.AsyncRDDActions$$anonfun$foreachAsync$1$$anonfun$apply$12.apply(AsyncRDDActions.scala:127) at org.apache.spark.SparkContext$$anonfun$38.apply(SparkContext.scala:2232) at org.apache.spark.SparkContext$$anonfun$38.apply(SparkContext.scala:2232) at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:90) at org.apache.spark.scheduler.Task.run(Task.scala:121) at org.apache.spark.executor.Executor$TaskRunner$$anonfun$11.apply(Executor.scala:407) at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1408) at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:413) at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149) at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624) at java.lang.Thread.run(Thread.java:748) Caused by: org.apache.hadoop.hive.ql.metadata.HiveException: org.apache.hadoop.hbase.client.RetriesExhaustedWithDetailsException: Failed 4 actions: CallQueueTooBigException: 4 times, servers with issues: cdh02,16020,1720579381747 at org.apache.hadoop.hive.ql.exec.FileSinkOperator$FSPaths.closeWriters(FileSinkOperator.java:198) at org.apache.hadoop.hive.ql.exec.FileSinkOperator.closeOp(FileSinkOperator.java:1058) at org.apache.hadoop.hive.ql.exec.Operator.close(Operator.java:686) at org.apache.hadoop.hive.ql.exec.Operator.close(Operator.java:700) at org.apache.hadoop.hive.ql.exec.Operator.close(Operator.java:700) at org.apache.hadoop.hive.ql.exec.Operator.close(Operator.java:700) at org.apache.hadoop.hive.ql.exec.spark.SparkReduceRecordHandler.close(SparkReduceRecordHandler.java:447) … 17 more Caused by: org.apache.hadoop.hbase.client.RetriesExhaustedWithDetailsException: Failed 4 actions: CallQueueTooBigException: 4 times, servers with issues: cdh02,16020,1720579381747 at org.apache.hadoop.hbase.client.BatchErrors.makeException(BatchErrors.java:54) at org.apache.hadoop.hbase.client.AsyncRequestFutureImpl.getErrors(AsyncRequestFutureImpl.java:1226) at org.apache.hadoop.hbase.client.BufferedMutatorImpl.doFlush(BufferedMutatorImpl.java:309) at org.apache.hadoop.hbase.client.BufferedMutatorImpl.close(BufferedMutatorImpl.java:241) at org.apache.hadoop.hive.hbase.HiveHBaseTableOutputFormat$MyRecordWriter.close(HiveHBaseTableOutputFormat.java:130) at org.apache.hadoop.hive.hbase.HiveHBaseTableOutputFormat$MyRecordWriter.close(HiveHBaseTableOutputFormat.java:168) at org.apache.hadoop.hive.ql.exec.FileSinkOperator$FSPaths.closeWriters(FileSinkOperator.java:195)

从错误日志来看,问题出现在通过Hive向HBase写入数据时,具体错误是CallQueueTooBigException,即HBase的请求队列太大,导致数据写入失败。

错误分析:

  1. CallQueueTooBigException:这是HBase的一种负载保护机制。当HBase的RPC队列过载,达到最大处理能力时,会抛出此异常,表示服务器无法处理更多的请求。这通常意味着HBase服务器在写入期间负载过重。
  2. Spark/Hive 与 HBase交互问题:在通过Hive和Spark写入HBase时,数据可能被并行地、大批量地发送给HBase。如果HBase负载过高或者写入并发量过大,可能会出现请求堆积,导致CallQueueTooBigException。
  3. 重试机制:日志中提到”RetriesExhaustedWithDetailsException”,说明Spark在多次重试之后仍然无法成功完成写入操作,导致任务最终失败。

可能的原因:

  1. HBase负载过大:HBase集群可能承受了过多的请求,导致队列超载。
  2. HBase资源不足:HBase服务器的硬件资源(如内存、CPU等)不足,无法处理高并发写入请求。
  3. 写入数据量过大:Hive通过Spark向HBase写入的数据量太大,超过了HBase的处理能力。
  4. 不合理的HBase配置:HBase的写入配置可能设置得过低,例如hbase.regionserver.handler.count(处理器数量)、hbase.rpc.timeout(RPC超时设置)等参数未合理配置。

解决方案:

  1. 调优HBase集群:
    • 增加HBase的处理能力:
      • 增加HBase的RegionServer实例数量。
      • 调整HBase集群的硬件配置,增加内存和CPU资源。
    • 调整HBase配置:
      • 增加hbase.regionserver.handler.count,该参数控制RegionServer可以同时处理的并发请求数量。
      • 调整hbase.ipc.server.max.callqueue.size,增大HBase RPC队列的大小以处理更多的并发请求。
      • 增加hbase.rpc.timeout的值,避免超时过早导致任务失败。
  2. 减少数据写入压力:
    • 批量写入控制:通过Spark作业配置控制每次写入的数据批量大小,减少单次请求的数据量。可以调整Spark配置参数,如spark.sql.shuffle.partitions,以减少分区数。
    • 限制并发写入:如果数据量特别大,可以控制并发写入HBase的作业数量,减小每次写入的压力。
  3. 重试机制配置:
    • 如果错误为临时性问题,可以在HBase客户端或Spark作业中增加重试次数,例如调整hbase.client.retries.number参数,确保在负载压力下仍然有更大的重试机会。
  4. 观察HBase监控日志:使用HBase的监控工具如HBase Web UI或者通过Ganglia等监控工具,观察HBase的RegionServer、请求队列、内存等资源使用情况,找出具体的瓶颈。

作者 east
doris, Hive 9月 18,2024

Doris在离线数仓代替hive的理由

Apache Doris 是一个现代化的MPP(Massively Parallel Processing)数据库,特别适合数据分析和在线查询场景。Doris 在某些情况下可以替代 Hive 作为离线数仓,主要是因为以下几个关键原因:

1. 查询性能

  • 高效的查询执行:Doris 专为高并发和低延迟的查询设计,支持秒级查询响应,特别适合高性能的实时分析需求。
  • 列存储格式:Doris 使用列式存储,能够在查询时只扫描需要的列,大幅减少 I/O,优化了查询性能。相比之下,Hive 的查询速度通常较慢,尤其是对大规模数据集的多维分析。
  • 向量化执行引擎:Doris 引入了向量化执行引擎,这种设计能高效利用 CPU 资源,在批处理数据时能加快计算速度。而 Hive 在执行复杂查询时,性能可能受到较大的开销影响。

2. 支持实时数据加载和更新

  • Doris 支持 流式数据导入,能够做到近实时的数据更新,这对于需要处理不断更新的业务数据的离线数仓场景非常有用。
  • Hive 通常依赖批处理方式更新数据,延迟较高,不能很好地处理实时数据需求。

3. 易用性和生态

  • SQL 兼容性高:Doris 提供丰富的 SQL 语法支持,接近 ANSI SQL 标准,开发人员可以轻松上手,不需要学习新的查询语言。
  • 操作简便:Doris 集成度高,操作简单,支持集群自动容错、自动扩展、负载均衡等功能。而 Hive 的操作复杂度较高,通常依赖 Hadoop 生态中的多个组件(如 YARN、HDFS 等),维护难度较大。
  • 轻量级部署:Doris 的架构更加轻量,不依赖像 Hadoop 这样复杂的集群,降低了基础设施和运维成本。

4. 支持复杂分析场景

  • 多维分析:Doris 支持类似 OLAP(Online Analytical Processing)的多维数据分析,适合进行复杂的聚合和多维度的数据查询。通过并行处理和多种索引机制,Doris 可以在大规模数据场景下快速响应复杂查询需求。
  • Hive 的局限性:Hive 主要是批处理工具,虽然它支持复杂查询,但执行时间通常较长,尤其在需要多维分析和实时交互式查询时表现较差。

5. 数据模型和压缩

  • 自动分区和索引:Doris 支持自动分区和多种索引方式,如倒排索引、位图索引等,极大优化了数据查询的效率。相比之下,Hive 的分区管理较为复杂且手动操作较多。
  • 压缩和存储优化:Doris 采用高效的压缩算法,降低了存储成本。与之相比,Hive 对于数据的存储优化较为有限,依赖底层的 HDFS 文件系统,压缩和解压缩需要额外配置和操作。

6. 高可扩展性和分布式架构

  • Doris 拥有强大的分布式架构,支持大规模集群扩展,能够处理海量数据并保持良好的性能表现。
  • Hive 依赖 Hadoop 分布式文件系统(HDFS),尽管具备扩展性,但性能受限于 Hadoop 集群的配置和调度方式。

7. 统一的存算架构

  • Doris 提供存储和计算紧密耦合的架构,这让数据读取和处理变得更加高效。Hive 则是存算分离的架构,往往需要依赖外部的计算引擎(如 Tez 或 Spark)来执行查询,这带来了额外的计算和数据调度开销。

8. 社区和企业支持

  • Doris 拥有活跃的开源社区,并且在国内外的大型互联网公司中得到了广泛应用。它的发展速度快,更新频繁,支持企业级的使用场景。
  • Hive 作为 Hadoop 生态的一部分,虽然历史悠久,但其发展较为缓慢,主要适用于传统的批处理场景。

总结

Doris 在替代 Hive 作为离线数仓的过程中,主要依赖于其高性能查询、实时数据处理、易用性和更适合复杂多维分析的优势。Doris 对于需要实时响应的数仓场景或大规模多维分析的企业特别具有吸引力,而 Hive 由于其批处理的特性,更适合传统的离线批量计算场景。在新兴的数仓建设中,Doris 是更为现代化的选择。

作者 east
doris, Flink 9月 18,2024

流批一体化,Flink+Doris取代Spark+Hive来开发离线数仓

Flink + Doris 可以作为替代方案,用于开发离线数仓,替代 Spark + Hive 组合。这个架构能够实现更高的性能和实时性,同时简化了架构的复杂性。让我们详细分析一下 Flink + Doris 取代 Spark + Hive 的优点及适用场景:

1. 实时性与流处理能力

  • Flink 的优势:Flink 是一个分布式流处理框架,支持事件驱动和批处理模式,特别擅长处理实时数据流和复杂事件处理(CEP)。Flink 能够以低延迟处理海量数据,确保数据能够迅速进入下游系统。
  • Doris 的实时写入和分析:Doris 支持实时数据加载,能够高效处理来自 Flink 的流式数据,提供秒级查询的响应能力,适合实时数据分析场景。

Spark + Hive 的组合主要用于批处理,Hive 本身不擅长处理实时数据,Spark 的流处理能力也不如 Flink 专注且强大。因此,Flink + Doris 更适合有实时数据需求的场景,尤其是在需要低延迟的业务数据分析和展示时。

2. 查询性能和并发处理

  • Doris 的高效查询:Doris 作为一个专为分析设计的 MPP 数据库,擅长处理大规模数据查询,支持高并发的 SQL 查询并提供低延迟响应,适合在线分析型应用(OLAP)。
  • 列式存储和向量化执行:Doris 使用列式存储和向量化执行引擎,在处理复杂查询时效率更高,能够大幅缩短查询时间。相比之下,Hive 在执行复杂查询时较慢,尤其是基于 HDFS 的查询,依赖磁盘 I/O,性能较差。

Spark + Hive 组合通常需要大量的计算资源来执行批量查询任务,且在查询复杂度上性能不如 Doris。

3. 简化架构与运维成本

  • Flink 和 Doris 的简洁架构:Flink 与 Doris 都具备较高的集成度和易用性,且不依赖于像 Hadoop 这样的复杂生态系统。Flink + Doris 的组合能够简化系统架构,减少数据流动中的延迟,且维护成本相对较低。
  • 轻量级部署:Doris 作为轻量级 OLAP 数据库,部署和运维都较为简单,不需要 Hadoop 的支持。相比之下,Hive 依赖于 Hadoop 生态系统,需要更多的组件(如 HDFS、YARN 等)来保证其工作,这增加了架构的复杂度和运维难度。

Spark + Hive 架构复杂,需要更多的组件支持,部署和维护较为繁琐,Flink + Doris 在这一方面更具优势。

4. 批处理和流处理的统一

  • Flink 批流一体化:Flink 提供了统一的编程模型,能够同时处理批处理和流处理任务,使得离线数仓架构能够更灵活地应对各种数据处理需求。
  • 批处理能力:尽管 Flink 主要以流处理见长,但它在批处理方面的表现也相当出色。通过批流一体化架构,开发人员可以更高效地处理历史数据和实时数据,极大简化了数据处理的开发和运维。

Spark + Hive 则主要关注批处理任务,对于流数据的处理能力相对弱一些。Spark 的流处理框架(Spark Streaming)比 Flink 在复杂流处理上的能力有限。

5. 灵活的数据集成

  • 数据集成与传输:Flink 可以轻松集成各种数据源,包括 Kafka、文件系统、数据库等。它可以将流式和批量数据统一处理后,通过 Doris 实现实时分析和查询。
  • Doris 的多种导入方式:Doris 支持多种数据导入方式,能够高效地处理 Flink 输出的数据流(例如通过 HTTP、Broker、Stream Load 等方式),这使得两者之间的集成非常顺畅。

Spark + Hive 在数据集成的实时性上不如 Flink + Doris。Spark 处理数据后通常还需要依赖 Hive 进行存储和管理,数据查询和更新的延迟较高。

6. 高可扩展性

  • 分布式处理:Flink 作为流处理框架具备出色的可扩展性,能够处理大规模的数据流。Doris 也是一个分布式架构,能够扩展到数百个节点,适应大规模数仓需求。
  • 集群管理:Flink 和 Doris 都支持分布式集群管理,能够根据业务需求动态扩展计算和存储能力。

Spark + Hive 也具有可扩展性,但其扩展性受 Hadoop 生态的限制,复杂性更高。

适用场景

Flink + Doris 适用于需要实时数据处理、高性能查询以及复杂多维分析的场景,如:

  • 实时数据流分析(用户行为分析、监控告警系统)
  • 实时数据仓库(T+0 数据仓库)
  • 多维度的在线查询(报表系统、BI 工具)
  • 需要兼顾批处理和流处理的场景

Spark + Hive 更适用于需要处理大规模离线批量数据且对实时性要求不高的传统数仓场景。

作者 east
mysql, 海豚调度器 9月 18,2024

解决Sqoop从mysql导出数据到hive结果为空时的报错

在海豚调度器1.3.5,用Sqoop从hive导入数据到mysql,有时由于计算结果为空,导致hive的表当天分区的数据为空,Sqoop导出到mysql时报错。海豚调度器设置了失败继续策略也没用,导致后面的工作流没办法继续执行下去。

可以在执行sqoop之前增加判断当天分区的数量是否为空,如果为空就不执行导出。

# 检查分区是否为空
empty=$(hive -S -e "SELECT COUNT(*) FROM cnsaas.ads_bigdata_iot_data WHERE dt='$yesday_date';" | awk '{print $1}')

if [ "$empty" -eq 0 ]; then
  echo "分区为空,不执行 Sqoop 导出。"
else
  echo "分区非空,执行 Sqoop 导出。"

完整的sqoop脚本如下:

#!/bin/bash

# 定义变量
host="your_host"
user="your_user"
pwd="your_password"
yesday_date=$(date -d "yesterday" +%Y-%m-%d)

# 检查分区是否为空
empty=$(hive -S -e "SELECT COUNT(*) FROM cnsaas.ads_bigdata_iot_data WHERE dt='$yesday_date';" | awk '{print $1}')

if [ "$empty" -eq 0 ]; then
  echo "分区为空,不执行 Sqoop 导出。"
else
  echo "分区非空,执行 Sqoop 导出。"
  sqoop export \
    --connect jdbc:mysql://$host:3306/zgcn?characterEncoding=UTF-8 \
    --username $user \
    --password $pwd \
    -m 1 \
    --table ads_bigdata_iot_data\
    --columns stat_date,data_type,stat_type,cu_pid_system_code,max_value,min_value,avg_value \
    --fields-terminated-by '\001' \
    --update-key stat_date,data_type,stat_type,cu_pid_system_code \
    --update-mode allowinsert \
    --input-null-string '\\N' \
    --input-null-non-string '\\N' \
    --null-string '\\N' --null-non-string '\\N' \
    --export-dir /user/hive/warehouse/cnsaas.db/ads_bigdata_iot_data/dt=$yesday_date/*
fi
作者 east
python 9月 18,2024

selenium解决调用Chrome str’ object has no attribute ‘capabilities’ Process finished

使用selenium调用chrome进行自动化测试,时不时报错:

原来代码如下:

path_to_chromedriver = 'C:/Program Files (x86)/Google/Chrome/Application/chromedriver.exe'
5driver = webdriver.Chrome(executable_path=path_to_chromedriver)

报错如下:

上面的代码修改后报错:Traceback (most recent call last): File “D:\program files\Python\Python39\lib\site-packages\selenium\webdriver\common\driver_finder.py”, line 38, in get_path path = SeleniumManager().driver_location(options) if path is None else path File “D:\program files\Python\Python39\lib\site-packages\selenium\webdriver\common\selenium_manager.py”, line 76, in driver_location browser = options.capabilities[“browserName”] AttributeError: ‘str’ object has no attribute ‘capabilities’ During handling of the above exception, another exception occurred: Traceback (most recent call last): File “D:\newcode\pythonProject\gpt\doubao_csdn.py”, line 130, in <module> reader = FunnyScriptsReader(directory) File “D:\newcode\pythonProject\gpt\doubao_csdn.py”, line 21, in __init__ self.driver = webdriver.Chrome(“executable_path=C:\Program Files (x86)\Google\Chrome\Application\chromedriver.exe”) File “D:\program files\Python\Python39\lib\site-packages\selenium\webdriver\chrome\webdriver.py”, line 45, in __init__ super().__init__( File “D:\program files\Python\Python39\lib\site-packages\selenium\webdriver\chromium\webdriver.py”, line 51, in __init__ self.service.path = DriverFinder.get_path(self.service, options) File “D:\program files\Python\Python39\lib\site-packages\selenium\webdriver\common\driver_finder.py”, line 40, in get_path msg = f”Unable to obtain driver for {options.capabilities[‘browserName’]} using Selenium Manager.” AttributeError: ‘str’ object has no attribute ‘capabilities’ Process finished with exit code 1

在最新版本的Selenium中,推荐的做法是使用 service 模块,并且不再推荐使用 executable_path。因此,更好的实践是创建一个 Service 实例,,并传入 ChromeDriver 的路径:(
顺利解决了上面的问题 )


from selenium import webdriver
from selenium.webdriver.chrome.service import Service

# 替换为实际的ChromeDriver路径
path_to_chromedriver = ‘C:/Program Files (x86)/Google/Chrome/Application/chromedriver.exe’
service = Service(executable_path=path_to_chromedriver)

# 创建WebDriver实例
driver = webdriver.Chrome(service=service)

作者 east
Flink 9月 14,2024

Flink中的窗口与传统数据库中的窗函数有何不同?

Flink中的窗口与传统数据库中的窗函数主要有以下几个方面的不同:

  1. 实时处理与批处理的差异:Flink是专为实时数据流处理设计的,其窗口机制能够处理无限数据流,并支持事件时间和处理时间的概念。相比之下,传统数据库中的窗函数通常用于批处理,处理的是有界数据集。 
  2. 窗口类型的多样性:Flink提供了多种窗口类型,包括滚动窗口、滑动窗口、会话窗口和全局窗口,这些窗口可以根据时间或计数来定义。而传统数据库中的窗函数通常较为基础,主要是基于时间的窗口聚合。 
  3. 窗口函数的实现:Flink中的窗口函数不仅支持全量聚合,还支持增量聚合,后者在性能上更为优越,特别是在处理大规模数据流时。此外,Flink的窗口函数可以与触发器结合使用,以控制窗口的计算时机。 
  4. 时间语义的明确性:Flink的窗口函数在处理事件时间时提供了明确的时间语义,这对于确保实时数据分析的准确性至关重要。而传统数据库中的窗函数通常不涉及事件时间的概念。 
  5. 窗口的动态创建:Flink中的窗口是动态创建的,只有当窗口内的数据到达时才会创建相应的窗口,这有助于优化内存使用和计算资源。 
作者 east

上一 1 2 3 4 下一个

关注公众号“大模型全栈程序员”回复“小程序”获取1000个小程序打包源码。回复”chatgpt”获取免注册可用chatgpt。回复“大数据”获取多本大数据电子书

标签

AIGC AI创作 bert chatgpt github GPT-3 gpt3 GTP-3 hive mysql O2O tensorflow UI控件 不含后台 交流 共享经济 出行 图像 地图定位 外卖 多媒体 娱乐 小程序 布局 带后台完整项目 开源项目 搜索 支付 效率 教育 日历 机器学习 深度学习 物流 用户系统 电商 画图 画布(canvas) 社交 签到 联网 读书 资讯 阅读 预订

官方QQ群

小程序开发群:74052405

大数据开发群: 952493060

近期文章

  • 详解Python当中的pip常用命令
  • AUTOSAR如何在多个供应商交付的配置中避免ARXML不兼容?
  • C++thread pool(线程池)设计应关注哪些扩展性问题?
  • 各类MCAL(Microcontroller Abstraction Layer)如何与AUTOSAR工具链解耦?
  • 如何设计AUTOSAR中的“域控制器”以支持未来扩展?
  • C++ 中避免悬挂引用的企业策略有哪些?
  • 嵌入式电机:如何在低速和高负载状态下保持FOC(Field-Oriented Control)算法的电流控制稳定?
  • C++如何在插件式架构中使用反射实现模块隔离?
  • C++如何追踪内存泄漏(valgrind/ASan等)并定位到业务代码?
  • C++大型系统中如何组织头文件和依赖树?

文章归档

  • 2025年6月
  • 2025年5月
  • 2025年4月
  • 2025年3月
  • 2025年2月
  • 2025年1月
  • 2024年12月
  • 2024年11月
  • 2024年10月
  • 2024年9月
  • 2024年8月
  • 2024年7月
  • 2024年6月
  • 2024年5月
  • 2024年4月
  • 2024年3月
  • 2023年11月
  • 2023年10月
  • 2023年9月
  • 2023年8月
  • 2023年7月
  • 2023年6月
  • 2023年5月
  • 2023年4月
  • 2023年3月
  • 2023年1月
  • 2022年11月
  • 2022年10月
  • 2022年9月
  • 2022年8月
  • 2022年7月
  • 2022年6月
  • 2022年5月
  • 2022年4月
  • 2022年3月
  • 2022年2月
  • 2022年1月
  • 2021年12月
  • 2021年11月
  • 2021年9月
  • 2021年8月
  • 2021年7月
  • 2021年6月
  • 2021年5月
  • 2021年4月
  • 2021年3月
  • 2021年2月
  • 2021年1月
  • 2020年12月
  • 2020年11月
  • 2020年10月
  • 2020年9月
  • 2020年8月
  • 2020年7月
  • 2020年6月
  • 2020年5月
  • 2020年4月
  • 2020年3月
  • 2020年2月
  • 2020年1月
  • 2019年7月
  • 2019年6月
  • 2019年5月
  • 2019年4月
  • 2019年3月
  • 2019年2月
  • 2019年1月
  • 2018年12月
  • 2018年7月
  • 2018年6月

分类目录

  • Android (73)
  • bug清单 (79)
  • C++ (34)
  • Fuchsia (15)
  • php (4)
  • python (43)
  • sklearn (1)
  • 云计算 (20)
  • 人工智能 (61)
    • chatgpt (21)
      • 提示词 (6)
    • Keras (1)
    • Tensorflow (3)
    • 大模型 (1)
    • 智能体 (4)
    • 深度学习 (14)
  • 储能 (44)
  • 前端 (4)
  • 大数据开发 (488)
    • CDH (6)
    • datax (4)
    • doris (30)
    • Elasticsearch (15)
    • Flink (78)
    • flume (7)
    • Hadoop (19)
    • Hbase (23)
    • Hive (40)
    • Impala (2)
    • Java (71)
    • Kafka (10)
    • neo4j (5)
    • shardingsphere (6)
    • solr (5)
    • Spark (99)
    • spring (11)
    • 数据仓库 (9)
    • 数据挖掘 (7)
    • 海豚调度器 (10)
    • 运维 (34)
      • Docker (3)
  • 小游戏代码 (1)
  • 小程序代码 (139)
    • O2O (16)
    • UI控件 (5)
    • 互联网类 (23)
    • 企业类 (6)
    • 地图定位 (9)
    • 多媒体 (6)
    • 工具类 (25)
    • 电商类 (22)
    • 社交 (7)
    • 行业软件 (7)
    • 资讯读书 (11)
  • 嵌入式 (70)
    • autosar (63)
    • RTOS (1)
    • 总线 (1)
  • 开发博客 (16)
    • Harmony (9)
  • 技术架构 (6)
  • 数据库 (32)
    • mongodb (1)
    • mysql (13)
    • pgsql (2)
    • redis (1)
    • tdengine (4)
  • 未分类 (6)
  • 程序员网赚 (20)
    • 广告联盟 (3)
    • 私域流量 (5)
    • 自媒体 (5)
  • 量化投资 (4)
  • 面试 (14)

功能

  • 登录
  • 文章RSS
  • 评论RSS
  • WordPress.org

All Rights Reserved by Gitweixin.本站收集网友上传代码, 如有侵犯版权,请发邮件联系yiyuyos@gmail.com删除.