gitweixin
  • 首页
  • 小程序代码
    • 资讯读书
    • 工具类
    • O2O
    • 地图定位
    • 社交
    • 行业软件
    • 电商类
    • 互联网类
    • 企业类
    • UI控件
  • 大数据开发
    • Hadoop
    • Spark
    • Hbase
    • Elasticsearch
    • Kafka
    • Flink
    • 数据仓库
    • 数据挖掘
    • flume
    • Kafka
    • Hive
    • shardingsphere
    • solr
  • 开发博客
    • Android
    • php
    • python
    • 运维
    • 技术架构
    • 数据库
  • 程序员网赚
  • bug清单
  • 量化投资
  • 在线查询工具
    • 去行号
    • 在线时间戳转换工具
    • 免费图片批量修改尺寸在线工具
    • SVG转JPG在线工具

分类归档Kafka

精品微信小程序开发门户,代码全部亲测可用

  • 首页   /  大数据开发
  • 分类归档: "Kafka"
Kafka, 运维 6月 27,2024

CDH6.3.2一台服务器宕机后kafka集群无法选举leader

cdh6.3.2集群有一台服务器宕机了,重新恢复后,发现kafka集群无法正常启动,报错日志如下,其中 TopicRunData 是kafka消费的topic。

1、错误分析

[Controller id=469 epoch=67] Controller 469 epoch 67 failed to change state for partition TopicRunData-2 from OfflinePartition to OnlinePartition kafka.common.StateChangeFailedException: Failed to elect leader for partition
TopicRunData -2 under strategy OfflinePartitionLeaderElectionStrategy at kafka.controller.PartitionStateMachine$$anonfun$doElectLeaderForPartitions$3.apply(PartitionStateMachine.scala:390) at kafka.controller.PartitionStateMachine$$anonfun$doElectLeaderForPartitions$3.apply(PartitionStateMachine.scala:388) at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59) at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:48) at kafka.controller.PartitionStateMachine.doElectLeaderForPartitions(PartitionStateMachine.scala:388) at kafka.controller.PartitionStateMachine.electLeaderForPartitions(PartitionStateMachine.scala:315) at kafka.controller.PartitionStateMachine.doHandleStateChanges(PartitionStateMachine.scala:225) at kafka.controller.PartitionStateMachine.handleStateChanges(PartitionStateMachine.scala:141) at kafka.controller.PartitionStateMachine.triggerOnlinePartitionStateChange(PartitionStateMachine.scala:123) at kafka.controller.PartitionStateMachine.triggerOnlinePartitionStateChange(PartitionStateMachine.scala:109) at kafka.controller.KafkaController.kafka$controller$KafkaController$$onBrokerStartup(KafkaController.scala:382) at kafka.controller.KafkaController$BrokerChange$.process(KafkaController.scala:1318) at kafka.controller.ControllerEventManager$ControllerEventThread$$anonfun$doWork$1.apply$mcV$sp(ControllerEventManager.scala:94) at kafka.controller.ControllerEventManager$ControllerEventThread$$anonfun$doWork$1.apply(ControllerEventManager.scala:94) at kafka.controller.ControllerEventManager$ControllerEventThread$$anonfun$doWork$1.apply(ControllerEventManager.scala:94) at kafka.metrics.KafkaTimer.time(KafkaTimer.scala:31) at kafka.controller.ControllerEventManager$ControllerEventThread.doWork(ControllerEventManager.scala:93) at kafka.utils.ShutdownableThread.run(ShutdownableThread.scala:82)

从提供的错误信息来看,Kafka Broker中存在4个离线分区(offline partitions),并且在尝试将其中一个分区
TopicRunData -2从OfflinePartition状态转换到OnlinePartition状态时失败了。具体原因是未能为该分区选举出领导者(leader)。这个异常是由kafka.common.StateChangeFailedException引发的,并指出在使用OfflinePartitionLeaderElectionStrategy策略下无法选出分区领导者。

结合上面的场景, 这是由于服务器宕机造成分区的数据不完整或元数据损坏 。

使用kafka-topics.sh命令检查主题状态:

kafka-topics.sh --describe --topic TopicRunData --bootstrap-server cdh01:9092

看到信息如下:

Topic:TopicRunData	PartitionCount:3	ReplicationFactor:1	Configs:min.insync.replicas=1,segment.bytes=1073741824,retention.ms=604800000,max.message.bytes=1000000,min.cleanable.dirty.ratio=0.5,unclean.leader.election.enable=false,retention.bytes=-1,delete.retention.ms=604800000
Topic: TopicRunData	Partition: 0	Leader: 299	Replicas: 299	Isr: 299
Topic: TopicRunData	Partition: 1	Leader: 384	Replicas: 384	Isr: 384
Topic: TopicRunData	Partition: 2	Leader: none	Replicas: 298	Isr:

这意味着每个分区的数据只在一个broker上保存,没有副本。这样如果该broker发生故障,相应的分区数据将不可用,从而影响到数据的高可用性。通常建议至少设置Replication Factor为3以确保高可用。 可惜之前由于存储压力等原因只有1个副本。

分区2显示没有Leader(Leader: none),且ISR(In-Sync Replicas)列表为空。这表明分区2目前处于未分配状态,可能是由于负责该分区的broker(Replicas: 298)出现故障或者与ZooKeeper的通信出现问题。这种情况下,该分区的数据无法被消费或生产

2、手动选举leader

为
TopicRunData 主题中无leader的分区手动分配并重新选举leader。执行以下命令:

kafka-preferred-replica-election.sh --bootstrap-server cdh01:9092 --path-to-json-file partition.json

创建一个名为partition.json的文件,其中包含以下内容:

{
  "partitions": [
    {
      "topic": "TopicRunData",
      "partition": 2
    }
  ]
}

这将触发platformMutiRunData主题第2分区的leader重新选举。

partition.json 有可能需要更多参数,请根据实际情况调整或找更详细的教程。

3、清空Topic来解决问题

由于无法手动选择,解决时间又紧迫,根据分析可能丢失数据少,所以想清空TopicRunData 主题的数据,从外部重新导入数据到 TopicRunData 。

注意:这将删除
TopicRunData 主题的所有数据。在执行此操作之前,请确保您了解此操作的后果,并备份好相关数据。

清空 TopicRunData 主题的数据:

kafka-topics.sh --delete --topic
TopicRunData --bootstrap-server cdh01:9092

然后重新创建该主题(如果需要):

kafka-topics.sh --create --topic
TopicRunData --bootstrap-server cdh01:9092 --replication-factor 1 --partitions 3

检查Kafka集群状态:

kafka-consumer-groups.sh --bootstrap-server cdh01:9092 --describe --group

your_consumer_group请将your_consumer_group替换为您要检查的实际消费者组ID。

确保所有Kafka broker正常运行。

作者 east
Kafka 5月 18,2023

2020 年精选:Kafka 优化:四个最佳实践

2020 年不会成为十年来最好的年份之一,但我们可以肯定地说,至少有一件好事发生了:Kafka 优化的四个最佳实践的总结。这篇博文最初发表于 5 月,并迅速成为我们读者的最爱。鉴于最佳实践至今仍然有效,我们想再次强调它们,以圆满结束这一年。阅读并享受它。

阿帕奇卡夫卡很棒。它允许创建易于扩展的实时、高吞吐量、低延迟数据流。优化的 Kafka 性能还带来其他好处,例如抵抗集群内发生的机器/节点故障以及集群上数据和消息的持久化。 Kafka 框架的性能优化应该是重中之重。

但优化是一项复杂的工作。优化您的 Apache Kafka 部署可能是一个挑战,因为分布式架构有很多层,并且可以在这些层内调整参数。

例如: 通常,具有自动数据冗余的高吞吐量发布-订阅 (pub/sub) 模式是一件好事。但是,当您的消费者努力跟上您的数据流,或者如果他们无法阅读消息,因为这些消息在消费者到达它们之前就消失了,那么就需要做一些工作来支持消费应用程序的性能需求。

Kafka 优化是一个广泛的主题,可以非常深入和精细,但这里有一些关键的最佳实践可以帮助您入门:

这听起来可能非常明显,但您会惊讶于有多少人使用旧版本的 Kafka。一个非常简单的 Kafka 优化举措是升级并使用最新版本的平台。您必须确定您的客户是否使用旧版本的 Kafka(0.10 或更早版本)。如果是,他们应该立即升级。

最新版本的 Kafka(版本 0.8x)附带 Apache ZooKeeper,主要用于协调消费者群体。使用过时版本的 Kafka 会导致重新平衡运行时间过长以及重新平衡算法失败。

优化 Apache Kafka 部署是优化平台堆栈层的练习。分区是吞吐量性能所基于的存储层。每个分区的数据速率是消息的平均大小乘以每秒消息数。简而言之,它是数据通过分区的速率。所需的吞吐率决定了分区的目标架构。

解决方案架构师希望每个分区都支持相似的数据量和吞吐率。实际上,数据速率会随着时间的推移而变化,生产者和消费者的原始数量也会随之变化。

可变性带来的性能挑战是消费者滞后的可能性,也就是消费者读取率落后于生产者写入率。随着 Kafka 环境的扩展,随机分区是一种有效的方法,可确保您不会在不必要地尝试将静态定义应用于移动性能目标时引入人为瓶颈。

分区领导通常是通过由 Zookeeper 维护的元数据进行简单选举的产物。然而,领导选举并没有考虑到各个分区的性能。根据您的 Kafka 发行版,可以利用专有的平衡器,但由于缺乏此类工具,随机分区提供了最不干涉的路径来平衡性能。

外卖?在写入主题时坚持随机分区,除非体系结构要求另有要求。

在较旧的 Kafka 版本中,参数 receive.buffer.bytes 默认设置为 64kB。在较新的 Kafka 版本中,参数为 socket.receive.buffer.bytes,默认为 100kB。

这对 Kafka 优化意味着什么?对于高吞吐量环境,这些默认值太小,因此不够用。当代理和消费者之间的网络带宽延迟乘积大于 LAN(局域网)时,情况就很明显了。

如果您的网络以 10 Gbps 或更高的速度运行并且延迟为 1 毫秒或更长,建议您将套接字缓冲区调整为 8 或 16 MB。如果内存有问题,请考虑 1 MB。

优化 Apache Kafka 部署是一项持续的工作,但这五个最佳实践应该是一个坚实的开始。上面提到的性能优化技巧只是用户可以实施以提高 Kafka 性能的一些优化方法。 Kafka 越来越受到应用程序开发人员、IT 专业人员和数据管理人员的欢迎。并且有充分的理由。查看我们的其他资源,其中详细讨论了 Kafka 应用于应用程序开发和数据管理的特定领域时的最佳实践。

已经在使用 Kafka 了吗?使用 Pepperdata Streaming Spotlight 监控和改进其性能。

回顾一下,我们建议您升级到最新版本的 Kafka。这是一件小事,但可以发挥重要作用。接下来,是确保您了解数据吞吐率。除非架构需求另有要求,否则我们建议您在写入主题时选择随机分区。如果你想实现高速摄取,调整消费者套接字缓冲区。我们希望您喜欢这篇 2020 年最佳博文,其中重点介绍了我们为 Kafka 优化推荐的最佳实践。

作者 east
Kafka 5月 16,2023

Kafka Streams 最佳实践:今天要尝试的 3 个

Kafka Streams 最好定义为专门为构建应用程序和微服务而设计的客户端库。考虑 Kafka 流的一种简洁方法是将其视为一种消息服务,其中数据(以消息的形式)在 Kafka 集群中从一个应用程序传输到另一个应用程序,从一个位置传输到另一个仓库。所有输入和输出数据都存放在 Apache Kafka 集群中。

为了形象化,这是 Kafka 环境中数据传输的样子:数据请求(消费者,用 Kafka 术语来说)被创建并发送到另一端。在这里,生产者响应请求生成内容,并通过 Kafka 架构将其直接传送回消费者,消费者随后消费信息。

Kafka Streams 是开发人员非常流行的工具,主要是因为它可以处理从消费者到生产者的数百万个请求,并将它们分布在数十台服务器上,以确保快速和连续传输,同时保持准确性。相反,该平台可以将大量数据从生产者转移到消费者,同时保证消费者实际上能够以他们需要的速度和他们需要的顺序消费数据。

但人们真正喜欢 Kafka Streams 的地方在于数据流没有停顿。大规模实时数据处理对于许多应用程序来说至关重要。 (没有它,Facebook 或 Uber 就会有大麻烦。)Kafka 不间断地传输数据,这与使用过时遗留应用程序的传统环境不同。 Kafka Streams 可在其集群内实现无阻碍的数据流,确保数据在来回循环中从一个应用程序传输到另一个应用程序,从消费者传输到生产者,中间没有任何停顿。其内置冗余可确保数据在传输过程中不会丢失,并完好无损地到达预定目的地。

所以:Kafka Streams 很棒。 Kafka Streams 为任何应用程序开发项目带来的优势直接证明了该平台越来越受欢迎和无处不在。但要真正从 Kafka Streams 中获得最大价值,您需要将一些最佳实践应用于底层 Kafka 平台:

如果你想优化 Kafka Streams 环境中的数据流,你需要跟踪很多事情。其中之一是速度,因为它涉及:

速度是一个关键组成部分。 Kafka Streams 可以有效地促进数据在集群内的移动。但是,如果消息移动的速度对您的应用程序来说不够快,则可能意味着麻烦。确保数据和消息以您需要的速度移动。

要完全优化 Kafka Streams,您的架构必须使用适量的资源构建,以便它获得并保持必要的数据流速度以实现其目标。简而言之,您需要解决以下问题:

必须构建消息路由以满足您的应用程序要求并且吞吐量要令您满意。这是关于为工作而建造的。你不会想要一辆半卡车来运送比萨饼。同样的原则适用于比特,而不仅仅是原子。

如果 Kafka 开始表现不佳,问题可能出在 Kafka 指标上。但这也可能是另一个问题,例如硬盘驱动器或某些性能不佳的内存。团队需要能够尽可能快速有效地进行故障排除。

借助正确的大数据分析工具,您可以集中查看堆栈中的所有硬件、应用程序和监控指标,包括来自 Kafka Streams 的指标。您得到的是一个单一的统一界面,其中指标和消息传递相互关联。这使您可以查看哪个应用程序遇到问题以及它如何影响 Kafka 及其功能。

在大多数大数据监控设置中,硬件监控不同于大数据应用程序监控。将 Kafka 监控添加到图片中,您将拥有一个大的脱节监控环境。但是使用像 Pepperdata 这样的统一监控套件,您可以查看和跟踪所有内容。您对大数据堆栈享有绝对且无与伦比的可见性和可观察性。

Kafka 是一个丰富而复杂的平台。还有很多东西要学。但是 Kafka Streams 的这三个最佳实践是让您获得成功的强大基础。

作者 east
Kafka 5月 16,2023

从 Kafka 吞吐量指标中获得最大价值

Kafka 支持在系统之间快速、安全、高效地移动大量流数据。出于这个原因,它已成为我们大数据时代的强大工具,在这个时代,数据速度和安全性比以往任何时候都更加重要。

了解和优化您对 Kafka 吞吐量指标的使用是成功支持基于 Kafka 构建的用例(例如实时 Kafka 流和流分析框架)的重要组成部分。吞吐量与在给定时间范围内可以在系统或应用程序之间移动的数据量有关。它广泛用于衡量 RAM、硬盘驱动器、网络连接和互联网的性能。对于 Kafka,吞吐量仅与消息从一个点移动到另一个点的速度有关。

各种组件的性能影响 Kafka 集群内的整体吞吐量:生产者生产内容的速度有多快?代理如何处理消息的移动?消费者消费消息的速度有多快?所有这些因素都会影响吞吐量。衡量这些组件及其性能可帮助您形成一组基线数字。 Kafka JMX 指标是您确定 Kafka 集群是否以最佳方式运行的能力的基础。

一旦捕获了 JMX 指标,集群所有者和/或架构师就可以使用这些指标,从可视化这些指标到创建图表并最终收集洞察力。这种捕获、分析和获得可操作见解的过程几乎不可能手动解决。解决方案需要尽可能多地自动化流程,同时不代表 Kafka 平台所有者自己陡峭的学习曲线和上下文切换。

Kafka 为您提供 JMX 指标,但它们仅代表您评估和维护平台的健康和性能所需的数据的一个子集。要问的大问题是:单独使用 JMX 指标是否足以完成这项工作? (剧透:没有)。您能否将这些指标综合为关键决策的燃料?您能否使用这些 Kafka 吞吐量指标来提高集群的性能并避免因性能问题而感到惊讶?您将如何将基于 JMX 的性能数据与基于 Kafka 构建的平台硬件和应用程序的性能指标相关联?

作者 east
Kafka 5月 14,2023

Kafka有什么用?以及注意事项

如果您刚刚开始使用 Apache Kafka,您就会知道有很多东西需要学习和注意。卡夫卡有什么用?我如何充分利用它?这些可能只是您脑海中闪过的几个问题,而尝试在线搜索答案可能会让人不知所措。我们已经为您完成了研究,并将答案放在这里以便于访问。继续阅读以了解它的用途以及使用 Kafka 时应注意的事项。

Apache Kafka 是由 LinkedIn 创建的开源流处理软件平台,目前由 Apache Software Foundation 开发。应用程序开发人员、IT 专业人员和数据管理员只是使用 Kafka 的一部分人。

据 Apache 软件基金会称,超过 80% 的财富 100 强公司都在使用这项技术。以下是一些快速统计数据,可以直观地了解有多少 Kafka 用户:10/10 的制造公司、7/10 的银行、10/10 的保险公司和 8/10 的电信公司使用该技术。

阿帕奇卡夫卡文档

Kafka 用于快速摄取、移动和消耗大量数据。它允许创建易于扩展的实时、高吞吐量、低延迟数据流。由于这些原因,该平台在大数据领域可靠、快速且广为人知。

在用例方面,Kafka 可用于网站活动跟踪,提供操作跟踪数据、日志聚合、流处理、事件溯源,作为消息代理的替代品,以及作为分布式系统的外部提交日志。

举一个具体的例子,纽约时报曾一度使用 Kafka 来存储他们发表的每一篇文章。除此之外,他们还使用 Kafka 和 Streams API 将实时发布的内容提供给读者访问其内容所依赖的各种应用程序和系统。

在与我们的客户合作时,我们发现 Kafka 的成功始于确保您的平台得到优化。由于平台内有如此多的潜力,因此确保您能充分利用它是关键。这里有 4 个最佳实践——我们将在另一篇文章中深入探讨——我们建议在优化 Kafka 时:

1.升级到最新版本。

使用过时版本的 Kafka 会导致重新平衡运行时间过长以及重新平衡算法失败。确保您使用的是最新版本的 Kafka 可以防止这些平衡问题并确保您充分利用该框架。

2. 了解如何提高数据吞吐率。

Kafka 具有控制数据如何在堆栈中移动的设置。了解和调整这些设置是提高数据吞吐率和充分利用 Kafka 架构的第一步。

3. 在编写主题时坚持随机分区,除非体系结构要求另有要求。

Kafka 支持随机写入。在调整 Kafka 时,您可能会想要指定数据写入的位置。但是,在大多数情况下,随机写入会产生更好的性能。

4. 调整消费者套接字缓冲区以实现高速摄取,同时保持数据完整性。

要调整消费者套接字缓冲区,能力更强的网络可以支持更大的缓冲区大小。例如,10Gbps 网络可能需要高达 16MB 的套接字缓冲区。

作者 east
Kafka 5月 7,2023

Kafka 优化:四个最佳实践

Apache Kafka 是一个强大的工具。它允许创建易于扩展的实时、高吞吐量、低延迟数据流。优化后,Kafka 会带来其他好处,例如抵抗集群内发生的机器/节点故障以及集群上数据和消息的持久化。这就是 Kafka 优化如此重要的原因。

优化你的 Kafka 框架应该是一个优先事项。但是,可能很难知道究竟如何优化 Kafka。这就是为什么我们为您带来四个 Kafka 最佳实践,您可以实施这些最佳实践以充分利用该框架。

以下是四个基本的 Kafka 优化技巧:

您的 Kafka 部署可能是一个挑战,因为分布式架构有很多层,并且可以在这些层内调整许多参数。

例如,通常情况下,具有自动数据冗余的高吞吐量发布-订阅 (pub/sub) 模式是一件好事。但是,当您的消费者努力跟上您的数据流,或者如果他们无法阅读消息,因为这些消息在消费者到达它们之前就消失了,那么就需要做一些工作来支持消费应用程序的性能需求。

但是这四种基本的做法应该是你Kafka优化的基础。继续阅读以深入了解这些方法。

实现和维护 Kafka 部署需要持续监控。 Kafka 是一个强大的实时数据流框架。未能优化会导致流式传输缓慢和性能滞后。

Kafka 优化是一个广泛的主题,可以非常深入和精细,但这里有四个高度利用的 Kafka 最佳实践可以帮助您入门:

1.升级到最新版本的Kafka。

这听起来可能非常明显,但您会惊讶于有多少人使用旧版本的 Kafka。一个非常简单的 Kafka 优化举措是升级并使用最新版本的平台。您必须确定您的客户是否使用旧版本的 Kafka(0.10 或更早版本)。如果是,他们应该立即升级。

Kafka 每次更新都会略有变化。最新的 Kafka 版本于 2021 年 4 月发布,提供了 KIP-500 的早期访问版本,使用户即使没有 Apache ZooKeeper 也可以运行 Kafka 代理。这消除了对内部 Raft 实现的需要。其他变化包括支持每个集群更多的分区、更无缝的操作和更严格的安全性。

2. 了解数据吞吐率。

优化 Apache Kafka 部署是优化平台堆栈层的练习。分区是吞吐量性能所基于的存储层。

每个分区的数据速率是消息的平均大小乘以每秒消息数。简而言之,它是数据通过分区的速率。所需的吞吐率决定了分区的目标架构。

这是一个关键的 Kafka 优化技巧:为了提高吞吐量,您可以扩大请求中获取的最小数据量。这导致更少的请求。然后以更大的批次传递消息。这一点至关重要,尤其是在生成的数据量较少时。对 Kafka 吞吐量指标的广泛了解将帮助用户在这种情况下充分优化他们的 Kafka 系统。

3. 在编写主题时坚持随机分区,除非体系结构要求另有要求。

解决方案架构师希望每个分区都支持相似的数据量和吞吐率。实际上,数据速率会随着时间的推移而变化,生产者和消费者的原始数量也会随之变化。

可变性带来的性能挑战是消费者滞后的可能性,也就是消费者读取率落后于生产者写入率。随着 Kafka 环境的扩展,随机分区是一种有效的方法,可确保您不会在不必要地尝试将静态定义应用于移动性能目标时引入人为瓶颈。

分区领导通常是通过由 Zookeeper 维护的元数据进行简单选举的产物。然而,领导选举并没有考虑到各个分区的性能。

根据您的 Kafka 发行版,可以利用专有的平衡器。但由于缺少此类工具,随机分区提供了实现平衡性能的最不干涉途径。

这就是为什么随机分区是我们推荐的关键 Apache Kafka 最佳实践之一。它为消费者平均分配负载。因此,扩展消费者变得更加容易。当您使用默认分区程序而不手动识别特定分区或消息密钥时,这实际上会发生这种情况。随机分区最适合无状态或“令人尴尬的并行”服务。

外卖?在写入主题时坚持随机分区,除非体系结构要求另有要求。

4.调整consumer socket buffer,实现高速摄取。

在较旧的 Kafka 版本中,参数 receive.buffer.bytes 默认设置为 64kB。在较新的 Kafka 版本中,参数为 socket.receive.buffer.bytes,默认为 100kB。

这对 Kafka 优化意味着什么?对于高吞吐量环境,这些默认值太小,因此不够用。当代理和消费者之间的网络带宽延迟乘积大于 LAN(局域网)时,情况就很明显了。

当没有足够的磁盘时,线程会变慢并变得有限。 Apache Kafka 最重要的最佳实践之一是增加网络请求缓冲区的大小。这样做将帮助您提高吞吐量。
如果您的网络以 10 Gbps 或更高的速度运行并且延迟为 1 毫秒或更长,建议您将套接字缓冲区调整为 8 或 16 MB。如果内存有问题,请考虑 1 MB。

优化 Apache Kafka 部署是一项持续的工作,但 Kafka 的这四个最佳实践应该是一个坚实的开始。上面提到的性能优化技巧只是用户可以实施以提高 Kafka 性能的一些优化方法。

Kafka 越来越受到应用程序开发人员、IT 专业人员和数据管理人员的欢迎。并且有充分的理由。有关 Kafka 的更多信息,请查看我们的另一篇博文,其中讨论了将 Kafka 应用于应用程序开发和数据管理的特定领域时的最佳实践。

作者 east
bug清单, Kafka, Spark 6月 10,2021

运行 Spark Streaming出现”Could not find KafkaClient entry in the JAAS configuration”

在使用FusionInsight HD大数据平台,用Spark Streaming来处理数据接入,kafka作为消费者,运行程序时出现”Could not find KafkaClient entry in the JAAS configuration”,当时怀疑是FusionInsight HD 的客户端相关配置有问题。

采用替换法思维,在另一台已经验证 FusionInsight HD 的客户端没问题的服务上运行,果然这个问题没做出现,仔细对比了这2台服务FusionInsight HD 的客户端 的配置,发现在spark配置文件(hd安装目录/Spark2x/spark/conf/Jaas.conf)要修改为下面的配置:

KafkaClient{
com.sun.security.auth.module.Krb5LoginModule required
useKeyTab=true
principal="大数据平台账号"
useTicketCache=false
keyTab="user.keytab的路径"
storeKey=true;
};

作者 east
Kafka, Spark 3月 3,2021

SparkStreaming Direct方式读取kafka优缺点及示例(Redis保存offset)

在Spark1.3之后,引入了Direct方式。不同于Receiver的方式,Direct方式没有Receiver这一层,其会周期性地获取Kafka中每个topic(主题)的每个partition(分区)中的最新offsets(偏移量),之后根据设定的maxRatePerPartition来处理每个batch。其形式如下图所示。

这种方法相较于Receiver方式的优势在于:

● 简化的并行。Direct方式中,Kafka中的partition与Spark内部的partition是一一对应的,这点使得我们可以很容易地通过增加Kafka中的partition来提高数据整体传输的并行度,而不像Receiver方式中还需要创建多个Receiver然后利用union再合并成统一的Dstream。

● 高效。Direct方式中,我们可以自由地根据offset来从Kafka中拉取想要的数据(前提是Kafka保留了足够长时间的数据),这对错误恢复提供了很好的灵活性。然而在Receiver的方式中,还需要将数据存入Write Ahead Log中,存在数据冗余的问题。

● 一次性接收精确的数据记录Direct方式中我们直接使用了低阶Kafka的API接口,offset默认会利用Spark Steaming的checkpoints来存储,同样也可以将其存到数据库等其他地方。然而在Receiver的方式中,由于使用了Kafka的高阶API接口,其默认是从ZooKeeper中拉取offset记录(通常Kafka取数据都是这样的),但是Spark Streaming消费数据的情况和ZooKeeper记录的情况是不同步的,当程序发生中断或者错误时,可能会造成数据重复消费的情况。

不同于Receiver的方式,是从Zookeeper中读取offset值,那么自然Zookeeper就保存了当前消费的offset值,如果重新启动开始消费就会接着上一次offset值继续消费。而在Direct的方式中,是直接从Kafka来读数据,offset需要自己记录,可以利用checkpoint、数据库或文件记录或者回写到ZooKeeper中进行记录。这里我们给出利用Kafka底层API接口,将offset及时同步到ZooKeeper的通用类中。下面示范用redis保存offset

object Demo {


  val IP_RANG: Array[String] = "91,92,93,94,95".split(",")
  val PORT_RANG: Array[String] = "22420,22421,22422,22423,22424,22425,22426,22427".split(",")
  val hosts = new util.HashSet[HostAndPort]()

  val sdf:SimpleDateFormat = new SimpleDateFormat("yyyyMMddHHmmss")

  def main(args: Array[String]) {

      val Array(checkPointDir, topic, brokers, groupId, cf, offset, dw_all_tn, dw_track_tn, dw_unique_tn, batchIntervel) = args

      login

      val client: JedisCluster = new JedisCluster(hosts, 5000)


      var topicPartitions: Map[TopicPartition, Long] = Map()

      if (client.exists(topic)) {
        val offsetMap: util.Map[String, String] = client.hgetAll(topic)
        val iterator: util.Iterator[String] = offsetMap.keySet().iterator()
        while (iterator.hasNext) {
          val key: String = iterator.next()
          val value: String = offsetMap.get(key)
          println(key + "------" + value)
          topicPartitions += (new TopicPartition(topic, key.toInt) -> value.toLong)
        }
      }
      client.close()

      val kafkaParams = Map[String, Object](
        "bootstrap.servers" -> brokers,
        "value.deserializer" -> classOf[StringDeserializer],
        "key.deserializer" -> classOf[StringDeserializer],
        "group.id" -> groupId,
        "security.protocol" -> "SASL_PLAINTEXT",
        "sasl.kerberos.service.name" -> "kafka",
        "auto.offset.reset" -> offset,
        "kerberos.domain.name" -> "hadoop.hadoop.com",
        "enable.auto.commit" -> (false: java.lang.Boolean)
      )


      def functionToCreateContext(): StreamingContext = {

        //      val topicArr = topic.split(",")
        //      val topicSet = topicArr.toSet


        val locationStrategy = LocationStrategies.PreferConsistent
        //      val consumerStrategy = ConsumerStrategies.Subscribe[String, String](topicSet, kafkaParams)

        val sparkConf: SparkConf = new SparkConf().setAppName("jingyi_xn_dw_all&track")

        val ssc = new StreamingContext(sparkConf, Seconds(batchIntervel.toInt))
        //      if (!"nocp".equals(checkPointDir)) {
        //        ssc.checkpoint(checkPointDir)
        //      }


        val config = HBaseConfiguration.create()
        val hbaseContext = new HBaseContext(ssc.sparkContext, config)

        val stream = KafkaUtils.createDirectStream[String, String](ssc,
          locationStrategy,
          //        consumerStrategy
          ConsumerStrategies.Assign[String, String](topicPartitions.keys.toList, kafkaParams, topicPartitions)
        )
    }
}

def setRedisHost: Unit ={
    for (host <- IP_RANG) {
      for (port <- PORT_RANG) {
        hosts.add(new HostAndPort("192.68.196." + host, port.toInt))
      }
    }
  }

       
作者 east
Kafka, Spark 2月 21,2021

spark和kafka在数据流处理对比

2625 / 5000

在对Spark Streaming和Kafka Streaming进行比较并得出何时使用哪个比较之前,让我们首先对Data Streaming的基础知识有一个清晰的了解:它是如何出现的,流是什么,如何运行,其协议和用例。 。 数据流如何诞生? 从那时起,数据一直是操作的重要组成部分。数据构成了整个操作结构的基础,其中数据被进一步处理以在系统的不同实体模块中使用。这就是为什么它已成为IT领域的典型代表。 随着技术的发展,数据的重要性变得更加突出。数据处理中使用的方法已经发生了显着变化,以适应软件机构对数据输入的不断增长的需求。 随着时间的增长,数据处理的时间框架急剧缩短,以至于立即处理的输出有望满足最终用户的更高期望。随着人工智能的出现,人们强烈希望为看起来像人类的最终用户提供实时帮助。 此要求仅取决于数据处理强度。越快越好。因此,结果是处理数据的方式发生了变化。较早之前,在指定的延迟之后,有成批的输入被输入到系统中,从而将处理后的数据作为输出。 目前,这种延迟(延迟)是输入性能,处理时间和输出的结果,这已成为性能的主要标准之一。为了确保高性能,延迟必须最小化到几乎是实时的程度。 这就是数据流出现的方式。在数据流处理中,实时数据流作为输入传递,必须立即进行处理,并实时传递输出信息流。

什么是数据流?

数据流传输是一种方法,其中不按常规的批处理方式发送输入,而是以连续流的形式发布该流,并按原样使用算法进行处理。还以连续数据流的形式检索输出。 该数据流是使用数千个源生成的,这些源同时以小尺寸发送数据。这些文件背对背发送时形成连续的流程。这些可能是大量发送的日志文件以进行处理。 这种作为流出现的数据必须被顺序处理以满足(几乎)连续实时数据处理的要求。

为什么需要数据流?


随着企业在线人数的增加以及随之而来的对数据的依赖,人们已经意识到了数据的方式。数据科学和分析技术的出现导致大量数据的处理,为实时数据分析,复杂数据分析,实时流分析和事件处理提供了可能性。

当输入数据大小庞大时,需要进行数据流传输。我们需要先存储数据,然后再将其移动以进行批处理。由于数据以多批次的形式存储,因此涉及大量时间和基础架构。为了避免所有这些情况,信息以小数据包的形式连续流传输以进行处理。数据流提供超可伸缩性,这仍然是批处理的挑战。

使用数据流传输的另一个原因是要提供近乎实时的体验,其中最终用户在输入数据时会在几秒钟或几毫秒内获得输出流。

当数据源似乎无穷无尽且无法为批处理中断时,也需要进行数据流传输。 IoT传感器在此类别中发挥了重要作用,因为它们会生成连续的读数,需要对其进行处理以得出推论。

数据流如何发生?


为了通过实时处理数据做出即时决策,可以进行数据流传输。 根据系统的规模,复杂性,容错性和可靠性要求,您可以使用工具,也可以自己构建。

自行构建它意味着您需要在编码角色之前将事件放置在诸如Kafka之类的消息代理主题中。 这里的参与者是一段代码,旨在接收来自代理中的问题的事件(即数据流),然后将输出发布回代理。

Spark是第一代Streaming Engine,它要求用户编写代码并将其放置在actor中,他们可以进一步将这些actor连接在一起。 为了避免这种情况,人们经常使用Streaming SQL进行查询,因为它使用户可以轻松地查询数据而无需编写代码。 流SQL是对SQL的扩展支持,可以运行流数据。 此外,由于SQL在数据库专业人员中已得到很好的实践,因此执行流式SQL查询将更加容易,因为它基于SQL。

这是用例的流式SQL代码,在这种情况下,如果池中的温度在2分钟内下降了7度,则必须向用户发送警报邮件。

@App:name("Low Pool Temperature Alert")

@App: description('An application which detects an abnormal decrease in swimming pools temperature.')

@source(type='kafka',@map(type='json'),bootstrap.servers='localhost:9092',topic.list='inputStream',group.id='option_value',threading.option='single.thread')

define stream PoolTemperatureStream(pool string, temperature double);

@sink(type='email', @map(type='text'), ssl.enable='true',auth='true',content.type='text/html', username='sender.account', address='sender.account@gmail.com',password='account.password', subject="Low Pool Temperature Alert", to="receiver.account@gmail.com")

define stream EmailAlertStream(roomNo string, initialTemperature double, finalTemperature double);

--Capture a pattern where the temperature of a pool decreases by 7 degrees within 2 minutes

@info(name='query1')

from every( e1 = PoolTemperatureStream ) -> e2 = PoolTemperatureStream [e1.pool == pool and (e1.temperature + 7.0) >= temperature]

    within 2 min

select e1.pool, e1.temperature as initialTemperature, e2.temperature as finalTemperature

insert into EmailAlertStream;

Spark SQL提供DSL(特定于域的语言),这将有助于以不同的编程语言(例如Scala,Java,R和Python)操纵DataFrame。 它使您可以使用SQL或DataFrame API对Spark程序内部的结构化数据执行查询。 Kafka等新一代流引擎也支持Kafka SQL或KSQL形式的Streaming SQL。

尽管流处理的过程大致相同,但此处重要的是根据用例要求和可用的基础结构选择流引擎。 在得出结论之前,什么时候使用Spark Streaming和什么时候使用Kafka Streaming,让我们首先探索Spark Streaming和Kafka Streaming的基础知识,以更好地理解。

什么是Spark Streaming?

Spark Streaming是核心Spark API的扩展,可让其用户执行实时数据流的流处理。 它从Kafka,Flume,Kinesis或TCP套接字等来源获取数据。 可以使用复杂的算法对这些数据进行进一步处理,这些复杂的算法使用诸如map,reduce,join和window之类的高级功能表示。 最终输出(即处理后的数据)可以推送到诸如HDFS文件系统,数据库和实时仪表板之类的目标。

让我们仔细看看Spark Streaming的工作原理。 Spark Streaming从数据源以数据流的形式获取实时输入,并将其进一步分为几批,然后由Spark引擎处理以生成大量输出。 Spark Streaming允许您将机器学习和图形处理用于数据流以进行高级数据处理。它还提供了代表连续数据流的高级抽象。 数据流的这种抽象称为离散流或DStream。该DStream可以通过对Kafka,Flume和Kinesis等来源的数据流或其他DStream进行高级操作来创建。 这些DStream是RDD(弹性分布式数据集)的序列,RDD是分布在计算机集群上的多个只读数据集。这些RDD以容错方式进行维护,使其具有高度鲁棒性和可靠性。DStreams序列Spark Streaming使用Spark Core的快速数据调度功能来执行流分析。从诸如Kafka,Flume,Kinesis等之类的源中以迷你批的形式摄取的数据用于执行数据流处理所需的RDD转换。


Spark Streaming使您可以根据需要使用Scala,Java或Python编写程序来处理数据流(DStreams)。由于此处将用于批处理的代码用于流处理,因此使用Spark Streaming实现Lambda体系结构(将批处理和流处理混合在一起)变得容易得多。但这是以等于最小批处理持续时间的延迟为代价的。 Spark Streaming中的输入源 Spark支持主要来源,例如文件系统和套接字连接。另一方面,它也支持高级资源,例如Kafka,Flume,Kinesis。只有添加额外的实用程序类,才能获得这些出色的资源。 您可以使用以下工件链接Kafka,Flume和Kinesis。

kafka:spark-streaming-kafka-0-10_2.12

flume:spark-streaming-flume_2.12

Kinesis:spark-streaming-kinesis-asl_2.12

什么是Kafka流媒体?

Kafka Stream是一个客户端库,可让您处理和分析从Kafka接收的数据输入,并将输出发送到Kafka或其他指定的外部系统。 Kafka依赖于流处理概念,例如: 准确区分事件时间和处理时间 窗口支持 高效直接的应用程序状态管理 通过利用Kafka中的生产者和消费者库来利用Kafka的本机功能,从而简化了应用程序开发,从而使其更加直接和快捷。正是由于这种原生的Kafka潜力,使得Kafka流式传输可以提供数据并行性,分布式协调,容错性和操作简便性。 Kafka Streaming中的主要API是提供多个高级运算符的流处理DSL(特定于域的语言)。这些运算符包括:筛选器,映射,分组,窗口,聚合,联接和表的概念。 Kafka中的消息传递层对进一步存储和传输的数据进行分区。根据状态事件在Kafka流中对数据进行分区,以进行进一步处理。通过将拓扑划分为多个任务来缩放拓扑,其中为每个任务分配了输入流中的分区列表(Kafka主题),从而提供了并行性和容错能力。

Kafka可以进行状态转换,与Spark Streaming中的批处理不同。 它在其主题内存储状态,流处理应用程序将其用于存储和查询数据。 因此,其所有操作均受状态控制。 这些状态还用于连接主题以形成事件任务.Kafka中基于状态的操作 这是由于Kafka中基于状态的操作使其具有容错能力,并允许从本地状态存储中自动恢复。 Kafka Streaming中的数据流是使用表和KStreams的概念构建的,这有助于它们提供事件时间处理。

Spark Streaming与Kafka Streaming:

何时使用什么 Spark Streaming使您可以灵活地选择任何类型的系统,包括具有lambda架构的系统。但是,Spark Streaming的延迟范围从毫秒到几秒。 如果延迟不是一个重要的问题,并且您正在寻找在源兼容性方面的灵活性,那么Spark Streaming是最佳选择。可以在EC2,Hadoop YARN,Mesos或Kubernetes上使用独立的集群模式运行Spark Streaming。 它可以访问HDFS,Alluxio,Apache Cassandra,Apache HBase,Apache Hive和许多其他数据源中的数据。它提供了容错能力,还提供了Hadoop分发。 此外,在Spark流式传输的情况下,您不必为批处理和流式传输应用程序分别编写多个代码,在这种情况下,单个系统可以同时满足这两种情况。 另一方面,如果延迟是一个重要问题,并且必须坚持以短于毫秒的时间范围进行实时处理,则必须考虑使用Kafka Streaming。由于事件驱动处理,Kafka Streaming提供了高级的容错能力,但是与其他类型的系统的兼容性仍然是一个重要的问题。此外,在高可伸缩性要求的情况下,Kafka具有最佳的可伸缩性,因此非常适合。

如果您要处理从Kafka到Kafka的本机应用程序(输入和输出数据源都在Kafka中),则Kafka流式传输是您的理想选择。 虽然Kafka Streaming仅在Scala和Java中可用,但Spark Streaming代码可以用Scala,Python和Java编写。 结束语 随着技术的发展,数据也随着时间大量增长。处理此类海量数据的需求以及对实时数据处理的日益增长的需求导致了数据流的使用。通过几种数据流方法,尤其是Spark Streaming和Kafka Streaming,全面了解用例以做出最适合需求的最佳选择变得至关重要。 在用例中优先考虑需求对于选择最合适的流技术至关重要。鉴于事实,Spark Streaming和Kafka Streaming都是高度可靠的,并且广泛推荐作为Streaming方法,它在很大程度上取决于用例和应用程序,以确保最佳效果。 在本文中,我们指出了两种流传输方法的专业领域,以便为您提供更好的分类,这可以帮助您确定优先级并做出更好的决策。

作者 east
Kafka, Spark 9月 5,2020

Spark Streaming读取kafka直连案例

Spark Streaming读取kafka有2种方式:基于Receiver的方式和 基于Direct的方式 。 Direct 更高效, 负责追踪消费的offset 可以用redis或mysql来保存。

 
import org.apache.spark.streaming.kafka.{HasOffsetRanges, KafkaUtils, OffsetRange}
import kafka.serializer.StringDecoder
import org.apache.spark.streaming._
import org.apache.spark.{SparkConf, SparkContext}
object DirectKafka {
def main(args: Array[String]): Unit = {
if (args.length < 2){ System.err.println( s""" |DirectKafka
| is a list of one or more kafka brokers
| is a list of one or more kafka topics
""".stripMargin)
System.exit(1)
}


/*启动zookeeper
cd /root/kafka/kafka_2.11-1.0.0/bin
./zookeeper-server-start.sh /root/kafka/kafka_2.11-1.0.0/config/zookeeper.properties

启动kafka
cd /root/kafka/kafka_2.11-1.0.0/bin
./kafka-server-start.sh /root/kafka/kafka_2.11-1.0.0/config/server.properties

启动kafka生产者
./kafka-console-producer.sh --broker-list master:9092 --topic kafka_test

提交任务
spark-submit --class com.kafka.DirectKafka \
--master spark://master:7077 \
--deploy-mode client \
--driver-memory 512m \
--executor-memory 512m \
--executor-cores 1 \
/root/datafile/SparkStreamingKafka-1.0-SNAPSHOT-jar-with-dependencies.jar \
master:9092 kafka_test
*/

val sparkConf = new SparkConf().setAppName("SparkStreaming-DirectKafka")
val sc = new SparkContext(sparkConf)
val Array(brokers, topics) = args
val ssc = new StreamingContext(sc, Seconds(2))
val topicset = topics.split(",").toSet
val KafkaParams = Map[String,String]("metadata.broker.list" -> brokers)
val directKafkaStream = KafkaUtils.createDirectStream[String, String, StringDecoder, StringDecoder](
ssc, KafkaParams, topicset)
directKafkaStream.print()
var offsetRanges = Array.empty[OffsetRange]
directKafkaStream.transform { rdd =>
offsetRanges = rdd.asInstanceOf[HasOffsetRanges].offsetRanges
rdd
}.map(._2) .flatMap(.split(" "))
.map(x => (x, 1L))
.reduceByKey(_ + _)
.foreachRDD { rdd =>
for (o <- offsetRanges) {
println(s"${o.topic} ${o.partition} ${o.fromOffset} ${o.untilOffset}")
}
rdd.take(10).foreach(println)
}
ssc.start()
ssc.awaitTermination()
}
}


import com.alibaba.fastjson.JSON
import main.scala.until.dataschema
import org.apache.spark.sql.SaveMode
import org.apache.spark.streaming._
import org.apache.spark.{SparkConf, SparkContext}
import org.joda.time.DateTime
import scalikejdbc.config.{DBs, DBsWithEnv}
import scalikejdbc._
import main.scala.until.ParamsUtils
import main.scala.until.SparkUtils

object readkafka {
  def main(args: Array[String]): Unit = {
    if (args.length != 1){
      System.err.println("please input data args")
      System.exit(1)
    }

    val sparkConf = new SparkConf().setAppName("SparkStreaming-test")
      .setMaster("local[*]")
      .set("spark.testing.memory","2147480000")
    val sc = new SparkContext(sparkConf)

//topic : spark_example_topic , countly_event ,countly_imp
//broker : 172.31.2.6:9292,172.31.2.7:9292,172.31.2.8:9292

//    val ssc = new StreamingContext(sc, Seconds(2))
    val ssc = new StreamingContext(sc, Seconds(2))

    val messages = new SparkUtils(ssc).getDirectStream(ParamsUtils.kafka.KAFKA_TOPIC)
    messages.print()

//    SparkUtils.apply(null).getDirectStream()

//-------------------------------------------------------------------------------

    // messages 从kafka获取数据,将数据转为RDD
    messages.foreachRDD((rdd, batchTime) => {
      import org.apache.spark.streaming.kafka.HasOffsetRanges
      val offsetRanges = rdd.asInstanceOf[HasOffsetRanges].offsetRanges   // 获取偏移量信息
      /**
        * OffsetRange 是对topic name,partition id,fromOffset(当前消费的开始偏移),untilOffset(当前消费的结束偏移)的封装。
        * *  所以OffsetRange 包含信息有:topic名字,分区Id,开始偏移,结束偏移
        */
      println("===========================> count: " + rdd.map(x => x + "1").count())
     // offsetRanges.foreach(offset => println(offset.topic, offset.partition, offset.fromOffset, offset.untilOffset))
      for (offset <- offsetRanges) {
        // 遍历offsetRanges,里面有多个partition
        println(offset.topic, offset.partition, offset.fromOffset, offset.untilOffset)
        DBs.setupAll()
        // 将partition及对应的untilOffset存到MySQL中
        val saveoffset = DB localTx {
          implicit session =>
           sql"DELETE FROM offsetinfo WHERE topic = ${offset.topic} AND partitionname = ${offset.partition}".update.apply()
            sql"INSERT INTO offsetinfo (topic, partitionname, untilOffset) VALUES (${offset.topic},${offset.partition},${offset.untilOffset})".update.apply()
        }
      }
    })

    // 处理从kafka获取的message信息
    val parameter = messages.flatMap(line => {
      //获取服务端事件日期 reqts_day
      val reqts_day = try {
        new DateTime(JSON.parseObject(line._2).getJSONObject("i").getLong("timestamp") * 1000).toDateTime.toString("yyyy-MM-dd HH:mm:ss")
      } catch {
        case ex: Exception => "(unknown)"
      }

      //获取 设备号
      val cookieid = try {
        JSON.parseObject(line._2).getJSONObject("d").get("d")    //将Json字符串转化为相应的对象  .getString("kid")
      } catch {
        case ex: Exception => "(unknown)"
      }

      //组合成一个字符串
      val data = reqts_day + "##" + cookieid
      Some(data)       //some是一定有值的, some.get获取值,如果没有值,会报异常
    }).map(_.split("##")).map(x => (x(0),x(1)))

    println("------------------")

    parameter.foreachRDD{ rdd =>

      val sqlContext = new org.apache.spark.sql.SQLContext(sc)
      import sqlContext.implicits._
      // 转换成DataFrame
      val SaveParameter = rdd.map(w => dataschema(w._1.toString,w._2.toString)).toDF("data_date","cookies_num")
      // 注册视图
      SaveParameter.createOrReplaceTempView("dau_tmp_table")
      val insertsql =sqlContext.sql("select * from dau_tmp_table")
      insertsql.write.mode(SaveMode.Append).jdbc("jdbc:mysql://localhost:3306/userprofile_test","dau_tmp_table",ParamsUtils.mysql.mysqlProp)

    }

    messages.print()
    ssc.start()
    ssc.awaitTermination()

  }
}
作者 east

关注公众号“大模型全栈程序员”回复“小程序”获取1000个小程序打包源码。回复”chatgpt”获取免注册可用chatgpt。回复“大数据”获取多本大数据电子书

标签

AIGC AI创作 bert chatgpt github GPT-3 gpt3 GTP-3 hive mysql O2O tensorflow UI控件 不含后台 交流 共享经济 出行 图像 地图定位 外卖 多媒体 娱乐 小程序 布局 带后台完整项目 开源项目 搜索 支付 效率 教育 日历 机器学习 深度学习 物流 用户系统 电商 画图 画布(canvas) 社交 签到 联网 读书 资讯 阅读 预订

官方QQ群

小程序开发群:74052405

大数据开发群: 952493060

近期文章

  • 详解Python当中的pip常用命令
  • AUTOSAR如何在多个供应商交付的配置中避免ARXML不兼容?
  • C++thread pool(线程池)设计应关注哪些扩展性问题?
  • 各类MCAL(Microcontroller Abstraction Layer)如何与AUTOSAR工具链解耦?
  • 如何设计AUTOSAR中的“域控制器”以支持未来扩展?
  • C++ 中避免悬挂引用的企业策略有哪些?
  • 嵌入式电机:如何在低速和高负载状态下保持FOC(Field-Oriented Control)算法的电流控制稳定?
  • C++如何在插件式架构中使用反射实现模块隔离?
  • C++如何追踪内存泄漏(valgrind/ASan等)并定位到业务代码?
  • C++大型系统中如何组织头文件和依赖树?

文章归档

  • 2025年6月
  • 2025年5月
  • 2025年4月
  • 2025年3月
  • 2025年2月
  • 2025年1月
  • 2024年12月
  • 2024年11月
  • 2024年10月
  • 2024年9月
  • 2024年8月
  • 2024年7月
  • 2024年6月
  • 2024年5月
  • 2024年4月
  • 2024年3月
  • 2023年11月
  • 2023年10月
  • 2023年9月
  • 2023年8月
  • 2023年7月
  • 2023年6月
  • 2023年5月
  • 2023年4月
  • 2023年3月
  • 2023年1月
  • 2022年11月
  • 2022年10月
  • 2022年9月
  • 2022年8月
  • 2022年7月
  • 2022年6月
  • 2022年5月
  • 2022年4月
  • 2022年3月
  • 2022年2月
  • 2022年1月
  • 2021年12月
  • 2021年11月
  • 2021年9月
  • 2021年8月
  • 2021年7月
  • 2021年6月
  • 2021年5月
  • 2021年4月
  • 2021年3月
  • 2021年2月
  • 2021年1月
  • 2020年12月
  • 2020年11月
  • 2020年10月
  • 2020年9月
  • 2020年8月
  • 2020年7月
  • 2020年6月
  • 2020年5月
  • 2020年4月
  • 2020年3月
  • 2020年2月
  • 2020年1月
  • 2019年7月
  • 2019年6月
  • 2019年5月
  • 2019年4月
  • 2019年3月
  • 2019年2月
  • 2019年1月
  • 2018年12月
  • 2018年7月
  • 2018年6月

分类目录

  • Android (73)
  • bug清单 (79)
  • C++ (34)
  • Fuchsia (15)
  • php (4)
  • python (43)
  • sklearn (1)
  • 云计算 (20)
  • 人工智能 (61)
    • chatgpt (21)
      • 提示词 (6)
    • Keras (1)
    • Tensorflow (3)
    • 大模型 (1)
    • 智能体 (4)
    • 深度学习 (14)
  • 储能 (44)
  • 前端 (4)
  • 大数据开发 (488)
    • CDH (6)
    • datax (4)
    • doris (30)
    • Elasticsearch (15)
    • Flink (78)
    • flume (7)
    • Hadoop (19)
    • Hbase (23)
    • Hive (40)
    • Impala (2)
    • Java (71)
    • Kafka (10)
    • neo4j (5)
    • shardingsphere (6)
    • solr (5)
    • Spark (99)
    • spring (11)
    • 数据仓库 (9)
    • 数据挖掘 (7)
    • 海豚调度器 (10)
    • 运维 (34)
      • Docker (3)
  • 小游戏代码 (1)
  • 小程序代码 (139)
    • O2O (16)
    • UI控件 (5)
    • 互联网类 (23)
    • 企业类 (6)
    • 地图定位 (9)
    • 多媒体 (6)
    • 工具类 (25)
    • 电商类 (22)
    • 社交 (7)
    • 行业软件 (7)
    • 资讯读书 (11)
  • 嵌入式 (70)
    • autosar (63)
    • RTOS (1)
    • 总线 (1)
  • 开发博客 (16)
    • Harmony (9)
  • 技术架构 (6)
  • 数据库 (32)
    • mongodb (1)
    • mysql (13)
    • pgsql (2)
    • redis (1)
    • tdengine (4)
  • 未分类 (6)
  • 程序员网赚 (20)
    • 广告联盟 (3)
    • 私域流量 (5)
    • 自媒体 (5)
  • 量化投资 (4)
  • 面试 (14)

功能

  • 登录
  • 文章RSS
  • 评论RSS
  • WordPress.org

All Rights Reserved by Gitweixin.本站收集网友上传代码, 如有侵犯版权,请发邮件联系yiyuyos@gmail.com删除.