gitweixin
  • 首页
  • 小程序代码
    • 资讯读书
    • 工具类
    • O2O
    • 地图定位
    • 社交
    • 行业软件
    • 电商类
    • 互联网类
    • 企业类
    • UI控件
  • 大数据开发
    • Hadoop
    • Spark
    • Hbase
    • Elasticsearch
    • Kafka
    • Flink
    • 数据仓库
    • 数据挖掘
    • flume
    • Kafka
    • Hive
    • shardingsphere
    • solr
  • 开发博客
    • Android
    • php
    • python
    • 运维
    • 技术架构
    • 数据库
  • 程序员网赚
  • bug清单
  • 量化投资
  • 在线查询工具
    • 去行号
    • 在线时间戳转换工具
    • 免费图片批量修改尺寸在线工具
    • SVG转JPG在线工具
    • SVG转PDF/Word
    • SVG转Draw.io可二次编辑格式
    • js代码混淆
    • json格式化及任意折叠展开
    • PDF常用工具

分类归档大数据开发

精品微信小程序开发门户,代码全部亲测可用

  • 首页   /  
  • 分类归档: "大数据开发"
  • ( 页面8 )
大数据开发, 面试 8月 2,2024

剑指Offer-大数据最全面试题整理

​

大数据时代已经到来,数据科学家、大数据工程师、数据分析师等岗位成为了热门职业。如果你正准备面试,想要脱颖而出,那么《大数据最全面试题-Offer直通车》是你的不二选择。

全面大数据面试知识体系:本专栏汇集了多篇超过1万字的精华内容,总计超百万字的面试题总结。包括程序员入职新公司如何快速上手项目、大数据面试英文自我介绍参考、大数据运维应用场景面试题汇总及参考答案等。无论是数据仓库、Flink/Spark技术,还是大数据各类技术面试,本书都为你提供了最全面的试题和参考答案。由于Flink实时计算是面试难点,更多多篇Flink难点详解!

从面试到入职全面保驾护航:面试应有尽有的各类技术面试题,还提供英文面试和综合素质建议。面试有结果时,提供谈薪建议;入职后,提供如何快速上手项目,如何利用AI快速熟悉代码。

物超所值代码和工具分享:本专栏分享超多自己工作珍藏,亲试可用的ETL工具,包括监控大数据组件异常并重启、自动远程监控磁盘日志空间和清理、API启动海豚调度器工作流等等,马上可以应用到你的新工作当中,为新工作加分!

无论你是大数据领域的新手还是有一定经验的老手,本专栏都能为你提供宝贵的参考和指导。无论你是准备面试还是想要提升自己的技能,本书都能帮助你更好地应对挑战。

现在就加入我们,开启你的大数据面试之旅吧!让《大数据最全面试题-Offer直通车》成为你的得力助手,助你顺利通过面试,迈向大数据领域的成功之路!

本书共分为以下几个部分:

  1. 程序员入职新公司如何快速上手项目:本部分将为你提供实用的建议,帮助你在入职新公司后迅速融入团队,快速上手项目。
  2. 大数据面试英文自我介绍参考:本部分为你提供了英文自我介绍的参考范文,帮助你在面试中展现自信、流利的一面。
  3. 大数据运维应用场景面试题汇总及参考答案:本部分汇总了大量大数据运维应用场景的面试题及参考答案,让你在面试中轻松应对各种问题。
  4. 数据仓库篇:本部分详细介绍了数据仓库的面试题、数据质量监控和处理方法最佳实践、数据仓库建模方法等内容。
  5. Flink/Spark技术篇:本部分重点讲解了Flink和Spark技术的面试题及参考答案,包括生产实践应用场景、Join相关问题、水印(Watermark)等方面的问题。
  6. 大数据各类技术面试篇:本部分涵盖了Hive、Elasticsearch、Kafka、Zookeeper等大数据技术的面试题及参考答案,让你在面试中全面展示自己的技术实力。
  7. 公司篇:本部分针对汇丰银行、华为云等知名企业的大数据面试题进行了汇总和分析,帮助你更好地了解各大公司的面试要求和侧重点。 目录  综合篇 数据仓库篇 精通SQL篇 Flink/Spark技术篇 BI报表篇 调度器篇 ETL工具篇 大数据各类技术面试篇 公司篇

​

  综合篇

装上大模型翅膀,程序员入职新公司如何快速上手代码(老员工如何选择大模型编程如虎添翼)

如何把自己卖个好价钱:实战面试谈薪水

做好这些不用担心试用期不通过:程序员入职新公司如何快速上手项目

本人遇到大数据面试题和参考答案(超过1万字精华版)

大数据面试英文自我介绍参考(万字长文)

大数据运维应用场景面试题汇总及参考答案(持续更新)

大数据大厂校招网申入口最全集合(持续更新)

 技术简历优化秘籍:迈向求职成功第一步

 最全大数据学习路线指南:大数据知识点汇总保姆级教程(2万字长文)

 外企面企必备:IT技术面试英文高频面试题

大厂面试智力题大全(详细解题思路,持续更新)

从上千份大厂面经呕心沥血整理:大厂高频手撕面试题(数据结构和算法篇 ,Java实现亲试可跑)

面试高频高阶问题:2万字长文详解JDK源码用到哪些设计模式

数据仓库篇

万字数据仓库面试题及参考答案

数据仓库数据质量监控和处理方法最佳实践

数据仓库建模方法万字详解

Doris的3种数据模型详解和数据仓库每一层的模型选用

大数据面试临阵磨枪不知看什么?看这份心理就有底了-大数据常用技术栈常见面试100道题

数据中台高频面试题及参考答案(持续更新)

 大数据面试必问的数据治理面试题大全及参考答案

数据中台/数据仓库必问的数量质量控制面试题

大数据架构师选型必懂:大数据离线数仓开发框架详解与对比(hive、Spark SQL、Impala、Doris) 大数据平台符合信创(CDH国产化代替)详细方案(企业内部不外传方案)

数据分析师必懂知识和高频问题:如何平衡数据分析需求与个人隐私保护之间的矛盾

数据中台或数仓如何避免数仓模型 “烟囱式” 建设保姆级教程

精通SQL篇

3万字长文:SQL Server面试题和参考答案(持续更新)

DBA必懂选型:MySQL、PostgreSQL与Oracle对比研究

Flink/Spark技术篇

KeyedProcessFunction 在 Flink项目中的应用实战

吃透Flink State面试题和参考答案

Flink面试必问题:时间和窗口处理面试题及参考答案(3万字长文)

Flink必问面试题:生产实践应用场景相关问题汇总及参考答案(3万字长文)

Flink必问面试题:Join相关问题汇总及参考答案

Flink必问面试题:水印(Watermark)30题及参考答案

3万字长文-大数据Yarn最全面试题及参考答案(持续更新)

PySpark面试题精选及参考答案(3万字长文)

Pyspark和Pandas语法差异和调试技巧(附总结出来直接用代码)PySpark JDBC 读写 MySQL 数据库保姆级指南

Spark Mahout入门和精通必懂问题(3万字长文)

Spark MLLib面试题你会几道?(万字长文)

从 Spark 离线数仓到 Flink 实时数仓:实战指南

Apache Flink在IoT指标开发流处理全过程案例

Flink assignTimestampsAndWatermarks 深度解析:时间语义与水印生成

万字长文讲解团队使用Spark中选型,使用Scala、Java还是Python?

Flink Lookup Join的工作原理、性能优化和应用场景

 Flink难点和高阶面试题:Flink的状态管理机制如何保证数据处理的准确性和完整性

 万字长文看懂Flink的架构及原理

 万字长文看懂Flink窗口基本理论、实现原理及和传统SQL窗函数区别

企业实战干货分享:Flink的实时数仓误差原因详解,如何利用离线计算修正结果保姆级教程

Hive/Hadoop篇 

大数据必懂知识点:Parquet、ORC还是Avro作为数据存储格式,哪种在性能和压缩率上更优

 万字长文详解Hadoop切片原理及高频面试题

万字长文讲透HDFS的高可用机制

Hive时间窗口函数保姆级教程(最全解析、应用和优化)(持续更新)

大数据开发工程师必懂的Hive调优与实战保姆指南

开发和面试必懂:Hive在开发和运维各种常见坑分析

编程语言篇

Java/Scala篇

Java多线程和并发编程面试题和参考答案100多道(持续更新)

深入解析大数据Scala面试题及参考答案(持续更新)

大数据手写面试题Scala语言实现大全(持续更新)

NIO和Netty保姆级5万字面试题及参考答案

Java中的Lock、synchronize、CAS关系及其应用场景

 进阶面试题:Java反射机制最全面试题及参考答案

大厂校招必懂:Java反射机制原理详解(2万字长文)

 Java架构师必知的 JVM 调优知识

Python篇

数据分析必问:Pandas面试题及参考答案

用python工具实现自动检测报表缺失哪些天日期的数据(亲测可用)

Nosql篇

2万字长文Doris运维问题大全及参考答案(持续更新)

Apache kylin面试题50道题及参考答案(2万字长文)

一文搞懂MongoDB面试题(2万字长文)

精通Opentsdb面试(3万字长文)

Cassandra面试题及答案详解(3万字长文)

时序数据库InfluxDB面试题和参考答案

4万字长文:TDengine 100道面试题及参考答案

Hbase高阶知识:HBase的协处理器(Coprocessor)原理、使用实例、高级技巧和案例分析

Lucene最新最全面试题及参考答案

BI报表篇

Tableau面试题及参考答案

Quick BI最全最新面试题及参考答案(2万字长文)

FineReport高频面试题及参考答案

调度器篇

海豚调度器自动监测每日报表及自动重跑异常工作流(综合应用可用代码

2万字长文:海豚调度器(DolphinScheduler)面试题深入了解

海豚调度器(DolphinScheduler)生产环境问题及解决方案汇总(持续更新)

一文看懂Oozie面试题及参考答案

海豚调度器利用API来自动补数的源码分析和亲测可用实例

3万字长文:Azkaban最全参考答案和面试题(持续更新)

 海豚调度器用得好,运维人员少加班 —— 高级技巧与使用教程

无人值守大数据平台(CDH6.3.2+Flink+海豚调度器)如何实现大数据平台稳定及顺利跑出离线报表和实时报表(持续更新方案

CDH清理磁盘空间完全攻略和完整实现自动化脚本(大数据清除日志)

ETL工具篇

海豚调度器调用api接口启动工作流(亲试可用)

利用Cloudera Manager API来监控CDH大数据组件并异常重启实例

CDH远程监控所有HDFS节点磁盘空间和自动清除日志

ETL利器:Kettle 2万字长文详解面试题

2万字长文带你看懂Talend常见面试题及参考答案

 Apache NiFi最全面试题及参考答案

 大厂篇

腾讯大数据开发面试题及参考答案(持续更新)

字节跳动后端或大数据基础知识面试题及参考答案(2万字长文)

阿里大数据面试题集锦及参考答案(3万字长文:持续更新)

 百度大数据开发面试题集锦及参考答案(持续更新)

美团大数据开发最新最全面试题及参考答案(持续更新)

万字长文-汇丰银行大数据面试题(持续更新)

 虾皮Shopee大数据面试题及参考答案

欢聚时代(BIGO)大数据面试题及参考答案(4万字长文)

 汇量科技大数据面试题及参考答案

 作业帮大数据面试题及参考答案

 唯品会大数据面试题及参考答案(3万字长文)

B站(哔哩哔哩/bilibili)大数据面试题及参考答案(3万字长文)

 大厂面试:小米大数据面试题大全及参考答案(130+面试题 12万长文)

2024年携程大数据开发面试题及参考答案

 2024年携程大数据分析面试题及参考答案

 进BAT必懂:大厂高频八股文面试题及参考答案(6万字长文)

 大厂面试:小红书大数据面试题及参考答案(3万字长文)

 大厂面经:京东大数据面试题及参考答案(3万字长文)

 大厂面经:滴滴大数据面试题及参考答案(3万字长文)

5万字长文吃透快手大数据面试题及参考答案(持续更新)

 2024年最全网易大数据面试题及参考答案(3万字长文持续更新)

字节跳动数据分析面试题及参考答案

知乎大数据开发面试题及参考答案

知乎数据分析面试题及参考答案

腾讯数据分析面试题及参考答案

腾讯微信大数据面试题及参考答案

soul大数据面试题及参考答案

米哈游大数据面试题及参考答案

富途证券大数据面试题及参考答案

OPPO 数据分析面试题及参考答案

新浪微博大数据面试题及参考答案(数据开发和数据分析)

滴滴数据分析80道面试题及参考答案

昆仑万维大数据面试题及参考答案

消息队列篇

Kafka 面试题及参考答案(持续更新)

ZeroMQ最全面试题解读(3万字长文)

StormMQ从入门到精通面试题及参考答案

行业场景案例篇

一文吃透物联网(IoT)的面试题及参考答案

面试或开发必懂场景案例:物联网(Iot)把数据补齐和转换成分钟级数据的详细案例(完整代码实现和解释)

管理监控篇

Prometheus面试题精选及参考答案(2万字长文)

Grafana面试题精选和参考答案

Nagios高频面试题及参考答案(2万字长文)

Ganglia面试大全及参考答案(2万字长文 )

数据安全篇

密码学与信息安全面试题及参考答案(2万字长文)

Linux/Shell

Linux Shell面试题大全及参考答案(3万字长文)

大数据各类技术面试篇

最全Hive面试题2024年(2万字详解)

Elasticsearch 面试题及参考答案:深入解析与实战应用

深入解析Zookeeper面试题及参考答案(超过万字精华篇)

Apache Iceberg最新最全面试题及详细参考答案(持续更新)

最新最全Sqoop面试题及参考答案(持续更新)

Hudi面试题及参考答案:全面解析与实战应用

最新最全Delta Lake面试题及参考答案详解2万字精华(持续更新)

Kudu面试题及参考答案详解

Impala面试题及参考答案2万字详解

StarRocks 面试题及参考答案详解(万字详解)

万字长文:FineBI面试题及参考答案详解

万字长文,大数据PowerBI面试题及参考答案

HBase面试题及参考答案:深入理解大数据存储技术(2万字长文)

Pulsar高频面试题及参考答案(持续更新)

通往大厂之路:Solr面试题及参考答案100道题

Apache Atlas 50道面试题及参考答案

Metacat最新最全面试题及参考答案(持续更新)

大数据数据埋点技术面试题及参考答案(持续更新)

大厂PostgreSQL面试题100道及参考答案(5万字长文)

Presto最新最全面试题及参考答案(3万字长文)

最新5万字长文:Docker 100道面试题及参考答案

大厂Storm的100道面试题及参考答案(5万字长文)

万字长文:华为云DataArts面试题及参考答案

2万字长文:ELK高频面试题及参考答案

3万字长文:Airflow最新最全面试题及参考答案

通晓Git操作-git面试题及参考答案

物联网(IoT)及物联网网络协议面试题及参考答案(2万字长文)

Jenkins从入门到精通面试题及参考答案(3万字长文)

SVN 80道面试题及参考答案(2万字长文)

2万字长文详解Ambari面试题及参考答案

Apache Drill 2万字面试题及参考答案

SonarQube面试题一卷到底60问及参考答案(3万字长文) ​

作者 east
运维 8月 2,2024

笔记本电脑设备管理器找不到以太网适配器的原因分析和解决方案

可能的原因:

  1. 驱动程序问题:适配器的驱动程序可能已损坏、过时或未正确安装。
  2. 硬件故障:适配器本身可能存在物理损坏。
  3. 配置问题:BIOS/UEFI设置可能禁用了以太网适配器。
  4. 操作系统问题:系统文件损坏或者不兼容。
  5. 连接问题:适配器可能没有正确连接到主板上。
  6. 电脑没有配置:有的没有网口的笔记本电脑是没有以太网卡的,需要通过扩展坞来访问,可以具体咨询电脑厂商的技术支持。

解决方案:

  1. 重启电脑:有时候简单的重启就能解决问题。
  2. 检查硬件连接:
    • 如果是可拆卸的笔记本,尝试重新插入适配器模块。
    • 检查适配器的物理连接是否松动。
  3. 检查BIOS/UEFI设置:
    • 重启电脑并进入BIOS/UEFI设置(通常按F2、F10、Del等键)。
    • 查找与网络相关的设置,并确保以太网适配器没有被禁用。
    • 保存更改并退出。
  4. 更新或重新安装驱动程序:
    • 打开“设备管理器”,找到“网络适配器”类别。
    • 如果适配器显示为灰色,右击它选择“启用设备”。
    • 如果适配器列表为空,尝试右击空白处选择“扫描检测硬件改动”。
    • 如果适配器出现但有黄色感叹号,右击它选择“更新驱动程序”。
    • 如果无法自动更新,访问制造商网站下载最新驱动程序并手动安装。
  5. 使用系统还原:
    • 如果您之前创建了系统还原点,可以尝试回滚到一个较早的时间点。
    • 开始菜单 > “控制面板” > “系统和安全” > “系统” > “系统保护” > “系统还原”。
  6. 检查操作系统问题:
    • 使用Windows自带的故障排除工具进行诊断。
    • 运行命令提示符作为管理员,输入sfc /scannow来检查和修复系统文件。
  7. 专业维修:
    • 如果以上方法都无法解决问题,可能是硬件故障,需要专业的技术支持人员检查。
作者 east
运维 8月 2,2024

联想笔记本电脑用了扩展坞不到以太网适配器原因分析及解决方案

可能的原因:

  1. 扩展坞问题:扩展坞本身可能存在问题,如硬件故障或不兼容。
  2. 驱动程序问题:扩展坞所需的驱动程序可能未正确安装或已损坏。
  3. 连接问题:Type-C接口或线缆可能存在接触不良或故障。
  4. 系统设置问题:某些系统设置可能会导致适配器不可见。
  5. BIOS/UEFI设置问题:BIOS/UEFI中的某些设置可能会影响扩展坞的功能。
  6. 操作系统的兼容性问题:您的操作系统可能不完全支持扩展坞上的以太网适配器。

解决方案:

  1. 检查扩展坞和线缆:
    • 确保Type-C线缆和扩展坞之间的连接牢固且无损。
    • 尝试更换Type-C线缆或使用不同的Type-C端口。
    • 确认扩展坞上的以太网接口指示灯是否正常工作。
  2. 检查驱动程序:
    • 访问联想官网,查找适用于您笔记本型号和扩展坞的最新驱动程序。
    • 下载并安装最新的以太网适配器驱动程序。
    • 如果扩展坞附带有驱动程序或软件,请确保它们也被正确安装。
  3. 检查BIOS/UEFI设置:
    • 重启电脑并进入BIOS/UEFI设置。
    • 检查与USB或Type-C端口相关的设置,确保它们处于启用状态。
    • 保存更改并退出。
  4. 系统故障排除:
    • 在“设备管理器”中,展开“网络适配器”类别,查看是否能看到通过扩展坞连接的适配器。
    • 如果看到适配器但有黄色警告标志,尝试更新驱动程序或禁用后重新启用适配器。
    • 如果适配器完全不可见,尝试“扫描检测硬件改动”来刷新设备列表。
  5. 运行Windows故障排除工具:
    • 打开“设置” > “更新与安全” > “故障排除”。
    • 寻找“以太网连接”或“网络适配器”的故障排除选项并运行它。
  6. 检查操作系统版本:
    • 确保您的操作系统是最新的,并且安装了所有必要的更新。
  7. 尝试其他扩展坞或电脑:
    • 将扩展坞连接到另一台电脑上测试,看是否能正常工作。
    • 将另一台已知工作的扩展坞连接到您的笔记本电脑上,确认是否能识别以太网适配器。

如果上述步骤都不能解决问题,建议通过联想官方网站联系联想的技术支持获取更进一步的帮助。他们可能会建议您进行更深入的硬件检查或提供具体的解决方案。

作者 east
运维 7月 25,2024

解决aide严重影响大数据计算时间问题

大数据离线数仓上线后,由于源头数据倍增到几十亿,发现有的耗时任务跑了几个小时也跑不出结果。明明服务器配置不错,计算内存也还可以。调大计算资源内存后也发现无济于事。

后来发现服务器有aide在运行,严重影响磁盘IO。

iotop -oP
Total DISK READ : 234.02 M/s | Total DISK WRITE : 27.09 M/s
Actual DISK READ: 238.46 M/s | Actual DISK WRITE: 11.52 M/s
PID PRIO USER DISK READ DISK WRITE SWAPIN IO> COMMAND

49757 be/4 unbound 239.94 K/s 0.00 B/s 0.00 % 99.99 % du -sk /data/dfs/dn/current/BP-1594034144-10.0.0.1-1704683739059
33597 be/4 root 7.50 M/s 0.00 B/s 0.00 % 83.99 % aide –check
54216 be/4 root 9.19 M/s 0.00 B/s 0.00 % 83.77 % aide –check
13435 be/4 root 1622.13 K/s 0.00 B/s 0.00 % 83.35 % aide –check
7996 be/4 root 3.06 M/s 0.00 B/s 0.00 % 82.80 % aide –check
25673 be/4 root 8.77 M/s 0.00 B/s 0.00 % 82.68 % aide –check
25721 be/4 root 9.50 M/s 0.00 B/s 0.00 % 81.80 % aide –check
60644 be/4 root 2.11 M/s 0.00 B/s 0.00 % 81.00 % aide –check
44128 be/4 root 10.46 M/s 0.00 B/s 0.00 % 80.85 % aide –check
3670 be/4 root 10.14 M/s 0.00 B/s 0.00 % 80.18 % aide –check
38900 be/4 root 3.06 M/s 0.00 B/s 0.00 % 79.62 % aide –check
46920 be/4 root 9.72 M/s 0.00 B/s 0.00 % 79.49 % aide –check
36099 be/4 root 10.14 M/s 0.00 B/s 0.00 % 79.35 % aide –check
32724 be/4 root 10.46 M/s 0.00 B/s 0.00 % 79.04 % aide –check
21047 be/4 root 9.50 M/s 0.00 B/s 0.00 % 78.96 % aide –check
51881 be/4 root 12.46 M/s 0.00 B/s 0.00 % 77.87 % aide –check
13147 be/4 root 10.77 M/s 0.00 B/s 0.00 % 77.56 % aide –check
36436 be/4 root 10.56 M/s 0.00 B/s 0.00 % 77.34 % aide –check

原来, AIDE 是一款入侵检测工具,它的作用是监控文件系统的完整性,防止未经授权的更改。听起来很不错,对吧?但问题就出在这里。AIDE 在工作时,需要频繁地读取和比对磁盘上的大量文件信息,这就导致了磁盘 IO 操作的大幅增加。

而大数据计算过程,如果内存不够时,需要缓存到磁盘,这时AIDE占用了大量磁盘IO,就会严重拖慢整个大数据计算的进度。

不运行AIDE后,发现耗时的离线计算运行时间缩短为之前的几分之一。

作者 east
Spark 7月 18,2024

解决pyspark的py4j.protocol.Py4JError: An error occurred while calling o84.getstate错误

在pyspark使用了udf,然后就报错下面错误:

 File "D:\ProgramData\Anaconda3\lib\site-packages\pyspark\serializers.py", line 587, in dumps
return cloudpickle.dumps(obj, 2)
File "D:\ProgramData\Anaconda3\lib\site-packages\pyspark\cloudpickle.py", line 863, in dumps
cp.dump(obj)
File "D:\ProgramData\Anaconda3\lib\site-packages\pyspark\cloudpickle.py", line 260, in dump
return Pickler.dump(self, obj)
File "D:\ProgramData\Anaconda3\lib\pickle.py", line 409, in dump
self.save(obj)
File "D:\ProgramData\Anaconda3\lib\pickle.py", line 476, in save
f(self, obj) # Call unbound method with explicit self
File "D:\ProgramData\Anaconda3\lib\pickle.py", line 736, in save_tuple
save(element)
File "D:\ProgramData\Anaconda3\lib\pickle.py", line 476, in save
f(self, obj) # Call unbound method with explicit self
File "D:\ProgramData\Anaconda3\lib\site-packages\pyspark\cloudpickle.py", line 400, in save_function
self.save_function_tuple(obj)
File "D:\ProgramData\Anaconda3\lib\site-packages\pyspark\cloudpickle.py", line 549, in save_function_tuple
save(state)
File "D:\ProgramData\Anaconda3\lib\pickle.py", line 476, in save
f(self, obj) # Call unbound method with explicit self
File "D:\ProgramData\Anaconda3\lib\pickle.py", line 821, in save_dict
self._batch_setitems(obj.items())
File "D:\ProgramData\Anaconda3\lib\pickle.py", line 847, in _batch_setitems
save(v)
File "D:\ProgramData\Anaconda3\lib\pickle.py", line 476, in save
f(self, obj) # Call unbound method with explicit self
File "D:\ProgramData\Anaconda3\lib\pickle.py", line 821, in save_dict
self._batch_setitems(obj.items())
File "D:\ProgramData\Anaconda3\lib\pickle.py", line 847, in _batch_setitems
save(v)
File "D:\ProgramData\Anaconda3\lib\pickle.py", line 521, in save
self.save_reduce(obj=obj, *rv)
File "D:\ProgramData\Anaconda3\lib\pickle.py", line 634, in save_reduce
save(state)
File "D:\ProgramData\Anaconda3\lib\pickle.py", line 476, in save
f(self, obj) # Call unbound method with explicit self
File "D:\ProgramData\Anaconda3\lib\pickle.py", line 821, in save_dict
self._batch_setitems(obj.items())
File "D:\ProgramData\Anaconda3\lib\pickle.py", line 852, in _batch_setitems
save(v)
File "D:\ProgramData\Anaconda3\lib\pickle.py", line 496, in save
rv = reduce(self.proto)
File "D:\ProgramData\Anaconda3\lib\site-packages\py4j\java_gateway.py", line 1257, in call
answer, self.gateway_client, self.target_id, self.name)
File "D:\ProgramData\Anaconda3\lib\site-packages\pyspark\sql\utils.py", line 63, in deco
return f(*a, **kw)
File "D:\ProgramData\Anaconda3\lib\site-packages\py4j\protocol.py", line 332, in get_return_value
format(target_id, ".", name, value))
py4j.protocol.Py4JError: An error occurred while calling o84.getstate. Trace:
py4j.Py4JException: Method getstate([]) does not exist
at py4j.reflection.ReflectionEngine.getMethod(ReflectionEngine.java:318)
at py4j.reflection.ReflectionEngine.getMethod(ReflectionEngine.java:326)
at py4j.Gateway.invoke(Gateway.java:274)
at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
at py4j.commands.CallCommand.execute(CallCommand.java:79)
at py4j.GatewayConnection.run(GatewayConnection.java:238)
at java.lang.Thread.run(Thread.java:748)

Traceback (most recent call last):
File "D:\ProgramData\Anaconda3\lib\site-packages\pyspark\serializers.py", line 587, in dumps
return cloudpickle.dumps(obj, 2)
File "D:\ProgramData\Anaconda3\lib\site-packages\pyspark\cloudpickle.py", line 863, in dumps
cp.dump(obj)
File "D:\ProgramData\Anaconda3\lib\site-packages\pyspark\cloudpickle.py", line 260, in dump
return Pickler.dump(self, obj)
File "D:\ProgramData\Anaconda3\lib\pickle.py", line 409, in dump
self.save(obj)
File "D:\ProgramData\Anaconda3\lib\pickle.py", line 476, in save
f(self, obj) # Call unbound method with explicit self
File "D:\ProgramData\Anaconda3\lib\pickle.py", line 736, in save_tuple
save(element)
File "D:\ProgramData\Anaconda3\lib\pickle.py", line 476, in save
f(self, obj) # Call unbound method with explicit self
File "D:\ProgramData\Anaconda3\lib\site-packages\pyspark\cloudpickle.py", line 400, in save_function
self.save_function_tuple(obj)
File "D:\ProgramData\Anaconda3\lib\site-packages\pyspark\cloudpickle.py", line 549, in save_function_tuple
save(state)
File "D:\ProgramData\Anaconda3\lib\pickle.py", line 476, in save
f(self, obj) # Call unbound method with explicit self
File "D:\ProgramData\Anaconda3\lib\pickle.py", line 821, in save_dict
self._batch_setitems(obj.items())
File "D:\ProgramData\Anaconda3\lib\pickle.py", line 847, in _batch_setitems
save(v)
File "D:\ProgramData\Anaconda3\lib\pickle.py", line 476, in save
f(self, obj) # Call unbound method with explicit self
File "D:\ProgramData\Anaconda3\lib\pickle.py", line 821, in save_dict
self._batch_setitems(obj.items())
File "D:\ProgramData\Anaconda3\lib\pickle.py", line 847, in _batch_setitems
save(v)
File "D:\ProgramData\Anaconda3\lib\pickle.py", line 521, in save
self.save_reduce(obj=obj, *rv)
File "D:\ProgramData\Anaconda3\lib\pickle.py", line 634, in save_reduce
save(state)
File "D:\ProgramData\Anaconda3\lib\pickle.py", line 476, in save
f(self, obj) # Call unbound method with explicit self
File "D:\ProgramData\Anaconda3\lib\pickle.py", line 821, in save_dict
self._batch_setitems(obj.items())
File "D:\ProgramData\Anaconda3\lib\pickle.py", line 852, in _batch_setitems
save(v)
File "D:\ProgramData\Anaconda3\lib\pickle.py", line 496, in save
rv = reduce(self.proto)
File "D:\ProgramData\Anaconda3\lib\site-packages\py4j\java_gateway.py", line 1257, in call
answer, self.gateway_client, self.target_id, self.name)
File "D:\ProgramData\Anaconda3\lib\site-packages\pyspark\sql\utils.py", line 63, in deco
return f(*a, **kw)
File "D:\ProgramData\Anaconda3\lib\site-packages\py4j\protocol.py", line 332, in get_return_value
format(target_id, ".", name, value))
py4j.protocol.Py4JError: An error occurred while calling o84.getstate. Trace:
py4j.Py4JException: Method getstate([]) does not exist
at py4j.reflection.ReflectionEngine.getMethod(ReflectionEngine.java:318)
at py4j.reflection.ReflectionEngine.getMethod(ReflectionEngine.java:326)
at py4j.Gateway.invoke(Gateway.java:274)
at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
at py4j.commands.CallCommand.execute(CallCommand.java:79)
at py4j.GatewayConnection.run(GatewayConnection.java:238)
at java.lang.Thread.run(Thread.java:748)

During handling of the above exception, another exception occurred:

Traceback (most recent call last):

刚开始以为pyspark的包有问题,后来仔细研究一下,才发现是下面的问题:

这个错误通常发生在尝试在 PySpark 中注册一个用户定义的函数(UDF)时,而该函数使用了不支持序列化的对象或方法。错误信息中的 __getstate__ 方法缺失提示 Py4J 无法正确地将 Python 对象转换为 Java 对象,这通常是因为 UDF 内部引用了 Spark DataFrame 函数或某些类实例,这些在 Py4J 上下文里是不可序列化的。

原来是在UDF用了lag 函数,这是不允许的,因为 UDF 必须是可序列化的,而 DataFrame 函数是不允许。

由于需要使用lag函数,解决方法是不用UDF来实现。

作者 east
数据仓库 7月 15,2024

大数据质量监控方法与实现

一、引言

在大数据时代,数据的质量直接关系到企业决策的准确性和业务发展的稳定性。本文旨在详细介绍大数据环境下数据质量的标准、监控方法以及相应的代码实现,确保数据的准确性、完整性、一致性和可靠性。我们将结合具体中间件和代码示例,全面阐述如何实现高效的数据质量监控。

二、数据质量标准

数据质量通常通过以下几个维度来衡量:

  1. 准确性:数据应真实反映实际情况,无错误或偏差。
  2. 完整性:数据应包含所有必需的信息,无遗漏。
  3. 一致性:同一实体在不同数据源或不同时间点的数据应保持一致。
  4. 时效性:数据应及时更新,满足业务需求。
  5. 可用性:数据应易于访问和使用,无格式或权限障碍。

三、数据质量监控方法

数据质量监控可以从多个层次进行,包括任务基线级别、任务级别与表级别、字段级别以及报表级别。

1. 任务基线级别监控

任务基线级别监控主要关注整个数据流水线(ETL任务)的运行状态和产出情况。

  • 监控内容:
    • 所有任务运行时长:与昨天运行时长对比,异常则报警。
    • 结果任务产出时间:与基线规定时间对比,未按时产出则预警。

实现方式:

  • 使用Apache Airflow等调度工具管理ETL任务,通过任务日志和执行时间监控任务运行时长和产出时间。
  • 配置Airflow的DAG(Directed Acyclic Graph)依赖关系,确保任务按序执行。

2. 任务级别 & 表级别监控

任务级别和表级别监控关注单个任务或表的运行状态和产出数据。

  • 监控内容:
    • 任务运行时长:与昨天运行时长对比。
    • 任务产出时间:与任务规定产出时间对比。
    • 表产出大小:与昨日分区大小对比。

实现方式:

  • 在ETL任务中添加日志记录功能,记录任务开始时间、结束时间和产出数据大小。
  • 使用Shell脚本或Python脚本定期检查日志文件,对比任务运行时长、产出时间和产出大小,异常则发送邮件或消息通知。

3. 字段级别监控

字段级别监控关注具体数据字段的质量,包括指标字段和维度字段。

  • 监控内容:
    • 指标字段:均值、最大值、最小值、中位数等,与昨天、近7天、近30天的数据进行对比。
    • 维度字段:维度覆盖率、维度占比、维度下指标的波动。

实现方式:

  • 使用Apache Spark进行大规模数据处理,通过Spark SQL计算字段的统计指标。
  • 结合Deequ等开源数据质量监控工具,编写DQC(Data Quality Center)任务,自动化监控字段质量。

代码示例:使用Deequ监控字段质量

import com.amazon.deequ.VerificationSuite
import com.amazon.deequ.checks.Check
import com.amazon.deequ.checks.CheckLevel

val verificationSuite = VerificationSuite()
  .onData(spark.table("your_table"))
  .addCheck(
    Check(CheckLevel.Error, "Mean of metric field")
      .isComplete("metric_field")
      .hasMeanEqualTo(0.5, 0.01)
  )
  .addCheck(
    Check(CheckLevel.Warning, "Dimension coverage of gender")
      .isComplete("gender")
      .hasMin("gender", 0.9)
  )

val result = verificationSuite.run()
result.show()

4. 报表级别监控

报表级别监控将监控结果可视化,便于项目组所有人直观查看数据变化。

  • 实现方式:
    • 使用Tableau、Power BI等数据可视化工具,将监控结果绘制成趋势图、仪表盘等。
    • 配置定时任务,自动更新报表数据。
作者 east
Hive, Spark 7月 7,2024

Hive、Spark SQL与Impala在大数据处理中的性能对比及应用分析

一、引言

在使用CDH6.3.2处理离线报表时,Hive、Spark SQL与Impala是三种主流的大数据处理工具。下面将对这三种工具进行详细的对比分析优缺点和适用场景。

二、Hive:Apache Hive

  1. 性能分析:Hive使用MapReduce作为其数据处理后端,适用于处理大规模数据集的批量查询和分析。然而,由于MapReduce的特性,Hive在处理实时或交互式查询时性能较差。一项由Hortonworks进行的测试显示,在处理1TB数据集时,Hive的平均查询时间约为20分钟。
  2. 优点:Hive提供了类似SQL的查询语言HiveQL,使得用户可以轻松地进行数据查询和管理。此外,Hive还支持多种数据格式,包括文本文件、序列文件、Avro等。
  3. 缺点:Hive的主要缺点是其查询速度慢,不适合实时或交互式查询。此外,Hive的资源消耗也较大,需要较大的内存和磁盘空间。
  4. 适用场景:Hive适合用于处理大规模数据集的批量查询和分析,例如日志分析、用户行为分析等。

三、Spark SQL:Apache Spark SQL

  1. 性能分析:Spark SQL使用Spark作为其数据处理后端,相较于Hive,Spark SQL的查询速度更快。一项由Databricks进行的测试显示,在处理1TB数据集时,Spark SQL的平均查询时间约为2分钟,比Hive快了近10倍。
  2. 优点:Spark SQL不仅查询速度快,而且支持实时查询和交互式查询。此外,Spark SQL还支持多种数据源,包括HDFS、Hive、Parquet、JSON、JDBC等。
  3. 缺点:Spark SQL的主要缺点是其资源消耗较大,需要较大的内存和CPU资源。此外,Spark SQL的学习曲线也比Hive陡峭。
  4. 适用场景:Spark SQL适合用于处理大规模数据集的实时查询和交互式查询,例如实时数据分析、交互式数据探索等。

四、Impala:Cloudera Impala

  1. 性能分析:Impala使用MPP(大规模并行处理)架构,相较于Hive和Spark SQL,Impala的查询速度更快。一项由Cloudera进行的测试显示,在处理1TB数据集时,Impala的平均查询时间约为1分钟,比Spark SQL快了一倍。
  2. 优点:Impala不仅查询速度快,而且支持实时查询和交互式查询。此外,Impala还支持多种数据格式,包括Parquet、ORC、Avro等。
  3. 缺点:Impala的主要缺点是其资源消耗较大,需要较大的内存和CPU资源。此外,Impala的兼容性也比Hive和Spark SQL差,不支持所有的Hive表类型和函数。用Impala做ETL时,会出现显示执行成功,但时间没有把查询结果成功写入。
  4. 适用场景:Impala适合用于处理大规模数据集的实时查询和交互式查询,例如实时数据分析、交互式数据探索等。

五、结论

总的来说,Hive、Spark SQL与Impala各有优劣。Hive适用于处理大规模数据集的批量查询和分析,其查询语言易于理解,但查询速度相对较慢。Spark SQL则在查询速度上有了显著提升,同时支持实时和交互式查询,但在资源消耗上相对较高。Impala在查询速度上更胜一筹,尤其适合实时和交互式查询,但其资源消耗和兼容性是其主要缺点。

在选择这些工具时,应根据具体的应用场景和需求来决定。例如,如果是在进行大规模数据的批量查询和分析,且对查询速度的要求不高,那么Hive是一个不错的选择。如果是在进行大规模数据的实时查询和交互式查询,且对查询速度有较高的要求,那么Spark SQL或Impala可能更适合。

此外,还应考虑到系统的资源状况。如果系统资源充足,那么可以选择Spark SQL或Impala,但如果系统资源有限,那么可能需要考虑Hive或其他资源消耗较小的工具。

总的来说,Hive、Spark SQL与Impala都是优秀的大数据处理工具,它们各有优势和劣势,没有绝对的好坏之分,只有是否适合具体的应用场景和需求。

在实际应用中,我们也可以考虑将这些工具结合使用,以发挥各自的长处。例如,可以使用Hive进行大规模数据的批量查询和分析,使用Spark SQL或Impala进行实时和交互式查询,或者使用Spark SQL进行数据预处理,然后使用Hive或Impala进行数据查询和分析。

作者 east
Kafka, 运维 6月 27,2024

CDH6.3.2一台服务器宕机后kafka集群无法选举leader

cdh6.3.2集群有一台服务器宕机了,重新恢复后,发现kafka集群无法正常启动,报错日志如下,其中 TopicRunData 是kafka消费的topic。

1、错误分析

[Controller id=469 epoch=67] Controller 469 epoch 67 failed to change state for partition TopicRunData-2 from OfflinePartition to OnlinePartition kafka.common.StateChangeFailedException: Failed to elect leader for partition
TopicRunData -2 under strategy OfflinePartitionLeaderElectionStrategy at kafka.controller.PartitionStateMachine$$anonfun$doElectLeaderForPartitions$3.apply(PartitionStateMachine.scala:390) at kafka.controller.PartitionStateMachine$$anonfun$doElectLeaderForPartitions$3.apply(PartitionStateMachine.scala:388) at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59) at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:48) at kafka.controller.PartitionStateMachine.doElectLeaderForPartitions(PartitionStateMachine.scala:388) at kafka.controller.PartitionStateMachine.electLeaderForPartitions(PartitionStateMachine.scala:315) at kafka.controller.PartitionStateMachine.doHandleStateChanges(PartitionStateMachine.scala:225) at kafka.controller.PartitionStateMachine.handleStateChanges(PartitionStateMachine.scala:141) at kafka.controller.PartitionStateMachine.triggerOnlinePartitionStateChange(PartitionStateMachine.scala:123) at kafka.controller.PartitionStateMachine.triggerOnlinePartitionStateChange(PartitionStateMachine.scala:109) at kafka.controller.KafkaController.kafka$controller$KafkaController$$onBrokerStartup(KafkaController.scala:382) at kafka.controller.KafkaController$BrokerChange$.process(KafkaController.scala:1318) at kafka.controller.ControllerEventManager$ControllerEventThread$$anonfun$doWork$1.apply$mcV$sp(ControllerEventManager.scala:94) at kafka.controller.ControllerEventManager$ControllerEventThread$$anonfun$doWork$1.apply(ControllerEventManager.scala:94) at kafka.controller.ControllerEventManager$ControllerEventThread$$anonfun$doWork$1.apply(ControllerEventManager.scala:94) at kafka.metrics.KafkaTimer.time(KafkaTimer.scala:31) at kafka.controller.ControllerEventManager$ControllerEventThread.doWork(ControllerEventManager.scala:93) at kafka.utils.ShutdownableThread.run(ShutdownableThread.scala:82)

从提供的错误信息来看,Kafka Broker中存在4个离线分区(offline partitions),并且在尝试将其中一个分区
TopicRunData -2从OfflinePartition状态转换到OnlinePartition状态时失败了。具体原因是未能为该分区选举出领导者(leader)。这个异常是由kafka.common.StateChangeFailedException引发的,并指出在使用OfflinePartitionLeaderElectionStrategy策略下无法选出分区领导者。

结合上面的场景, 这是由于服务器宕机造成分区的数据不完整或元数据损坏 。

使用kafka-topics.sh命令检查主题状态:

kafka-topics.sh --describe --topic TopicRunData --bootstrap-server cdh01:9092

看到信息如下:

Topic:TopicRunData	PartitionCount:3	ReplicationFactor:1	Configs:min.insync.replicas=1,segment.bytes=1073741824,retention.ms=604800000,max.message.bytes=1000000,min.cleanable.dirty.ratio=0.5,unclean.leader.election.enable=false,retention.bytes=-1,delete.retention.ms=604800000
Topic: TopicRunData	Partition: 0	Leader: 299	Replicas: 299	Isr: 299
Topic: TopicRunData	Partition: 1	Leader: 384	Replicas: 384	Isr: 384
Topic: TopicRunData	Partition: 2	Leader: none	Replicas: 298	Isr:

这意味着每个分区的数据只在一个broker上保存,没有副本。这样如果该broker发生故障,相应的分区数据将不可用,从而影响到数据的高可用性。通常建议至少设置Replication Factor为3以确保高可用。 可惜之前由于存储压力等原因只有1个副本。

分区2显示没有Leader(Leader: none),且ISR(In-Sync Replicas)列表为空。这表明分区2目前处于未分配状态,可能是由于负责该分区的broker(Replicas: 298)出现故障或者与ZooKeeper的通信出现问题。这种情况下,该分区的数据无法被消费或生产

2、手动选举leader

为
TopicRunData 主题中无leader的分区手动分配并重新选举leader。执行以下命令:

kafka-preferred-replica-election.sh --bootstrap-server cdh01:9092 --path-to-json-file partition.json

创建一个名为partition.json的文件,其中包含以下内容:

{
  "partitions": [
    {
      "topic": "TopicRunData",
      "partition": 2
    }
  ]
}

这将触发platformMutiRunData主题第2分区的leader重新选举。

partition.json 有可能需要更多参数,请根据实际情况调整或找更详细的教程。

3、清空Topic来解决问题

由于无法手动选择,解决时间又紧迫,根据分析可能丢失数据少,所以想清空TopicRunData 主题的数据,从外部重新导入数据到 TopicRunData 。

注意:这将删除
TopicRunData 主题的所有数据。在执行此操作之前,请确保您了解此操作的后果,并备份好相关数据。

清空 TopicRunData 主题的数据:

kafka-topics.sh --delete --topic
TopicRunData --bootstrap-server cdh01:9092

然后重新创建该主题(如果需要):

kafka-topics.sh --create --topic
TopicRunData --bootstrap-server cdh01:9092 --replication-factor 1 --partitions 3

检查Kafka集群状态:

kafka-consumer-groups.sh --bootstrap-server cdh01:9092 --describe --group

your_consumer_group请将your_consumer_group替换为您要检查的实际消费者组ID。

确保所有Kafka broker正常运行。

作者 east
mysql, 运维 6月 27,2024

centos离线安装mysql客户端(亲测可用)

由于项目的需要,需要在内网的centos服务器,有mysql客户端远程访问mysql服务器。

1. 下载MySQL客户端

首先,你需要下载MySQL客户端软件包。你可以从MySQL官方网站下载,或者使用CentOS的软件仓库。

https://dev.mysql.com/get/Downloads/MySQL-5.7/mysql-5.7.34-1.el7.x86_64.rpm-bundle.tar

2. 安装MySQL客户端

解压缩下载的文件,并安装RPM包:

tar xvf mysql-5.7.34-1.el7.x86_64.rpm-bundle.tar
rpm -ivh mysql-community-client-5.7.34-1.el7.x86_64.rpm

用命令 which mysql 看到下面结果时说明安装成功了:

/usr/bin/mysql

3. 连接到远程MySQL服务器

使用以下命令连接到远程MySQL服务器:

mysql -u your_username -p -h remote_server_ip

注意事项:

  • 确保远程MySQL服务器允许来自你的CentOS服务器的连接。你可能需要在远程服务器的MySQL配置文件(通常是my.cnf)中设置bind-address为0.0.0.0,并在mysql.user表中授予远程访问权限。
  • 确保防火墙规则允许两个服务器之间的3306端口(MySQL默认端口)的流量。
  • 如果你使用的是较新或较旧的MySQL版本,请下载相应的安装包。
作者 east
Hive, 数据仓库 6月 25,2024

在大数据如何检测源数据中的脏数据和异常数据

在大数据Hive中,检测源数据中的脏数据和异常数据可以通过以下几个步骤来实现:

  1. 数据质量检查规则定义:
    首先,需要根据业务需求和数据特点,定义一系列数据质量检查规则。这些规则可以包括:
    • 字段值域检查:例如,性别字段只允许有”男”、”女”或”未知”等值。
    • 字段必填性检查:确保某些字段不能为空。
    • 字段唯一性检查:确保某些字段(如身份证号)具有唯一性。
    • 日期范围检查:确保日期字段在合理的范围内。
    • 数值范围检查:确保数值字段在合理的范围内。
    • 格式检查:例如,电话号码、邮箱地址等字段需要符合特定的格式。
  2. 使用Hive SQL和UDF进行数据质量检查:
    根据定义的数据质量检查规则,使用Hive SQL查询和用户自定义函数(UDF)来检测脏数据和异常数据。以下是一些示例:
    • 字段值域检查:SELECT * FROM your_table WHERE gender NOT IN ('男', '女', '未知');
    • 字段必填性检查:SELECT * FROM your_table WHERE name IS NULL;
    • 字段唯一性检查:SELECT id, COUNT(*) as cnt FROM your_table GROUP BY id HAVING cnt > 1;
    • 日期范围检查(假设有一个名为date_column的日期字段):SELECT * FROM your_table WHERE date_column < '2000-01-01' OR date_column > '2099-12-31';
    • 数值范围检查(假设有一个名为age的数值字段):SELECT * FROM your_table WHERE age < 0 OR age > 120;
    • 格式检查(使用正则表达式):SELECT * FROM your_table WHERE NOT (email RLIKE '^[a-zA-Z0-9._%+-]+@[a-zA-Z0-9.-]+\.[a-zA-Z]{2,}$');
  3. 创建自定义函数(UDF):
    如果需要进行复杂的格式检查或计算,可以使用Java或Python编写自定义函数(UDF),然后在Hive SQL查询中调用这些函数。
  4. 定期执行数据质量检查:
    为了确保数据的持续质量,可以定期(如每天、每周或每月)执行数据质量检查任务。这可以通过设置定时任务(如使用Cron Job)或使用调度工具(如Apache Airflow)来实现。
  5. 数据清洗和处理:
    对于检测到的脏数据和异常数据,可以采取以下措施进行处理:
    • 删除:直接删除不符合要求的数据行。
    • 修正:根据业务需求修正错误的数据。
    • 填充缺失值:对于缺失的数据,可以根据业务规则填充合适的默认值或通过插值方法进行填充。
    • 记录日志:记录检测到的脏数据和异常数据,以便后续分析和处理。
作者 east
海豚调度器 6月 24,2024

用海豚调度器api启动工作流错误排查

在海豚调度器用api启动工作流,在海豚调度器的工作流实例看到是启动失败,但在任务失例又没看到。看启动工作流的代码觉得好像没问题,一时不得其解。

后来找到海豚调度器的日志dolphinscheduler-master.log, 看到日志如下:

[ERROR] 2024-06-24 12:27:24.466 org.apache.dolphinscheduler.dao.utils.DagHelper:[101] - start node name [ads_bigdata_xxxx] is not in task node list [[TaskNode{id='tasks-71533', name='ads_bigdata_battery_warning_detail.sh', desc='null', type='SHELL', runFlag='NORMAL', loc='null', maxRetryTimes=0, retryInterval=1, params='{"rawScript":"sh cnsaas/ads_bigdata_battery_warning_detail.sh ${complement_date}","localParams":[],"resourceList":[{"res":"cnsaas/ads_bigdata_xxxx.sh","name":"ads_bigdata_xxxx.sh","id":6},{"res":"cnsaas/ads_bigdata_xxxx.sql","name":"ads_bigdata_xxxx.sql","id":102}]}', preTasks='[]', extras='null', depList=[], dependence='{}', taskInstancePriority=MEDIUM, timeout='{"enable":false,"strategy":""}', workerGroup='default'}]] 
[ERROR] 2024-06-24 12:27:24.466 org.apache.dolphinscheduler.server.master.runner.MasterExecThread:[347] - processDag is null
[INFO] 2024-06-24 12:27:24.468 org.apache.dolphinscheduler.server.master.runner.MasterExecThread:[315] - prepare process :10712 end
[ERROR] 2024-06-24 12:27:24.468 org.apache.dolphinscheduler.server.master.runner.MasterExecThread:[184] - master exec thread exception
java.lang.NullPointerException: null
	at org.apache.dolphinscheduler.dao.utils.DagHelper.parsePostNodes(DagHelper.java:284)
	at org.apache.dolphinscheduler.server.master.runner.MasterExecThread.submitPostNode(MasterExecThread.java:482)
	at org.apache.dolphinscheduler.server.master.runner.MasterExecThread.runProcess(MasterExecThread.java:832)
	at org.apache.dolphinscheduler.server.master.runner.MasterExecThread.executeProcess(MasterExecThread.java:200)
	at org.apache.dolphinscheduler.server.master.runner.MasterExecThread.run(MasterExecThread.java:181)
	at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
	at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
	at java.lang.Thread.run(Thread.java:748)
[ERROR] 2024-06-24 12:27:24.468 org.apache.dolphinscheduler.server.master.runner.MasterExecThread:[185] - process execute failed, process id:10712

原来启动节点名称(ads_bigdata_xxx)不在任务节点列表中。这意味着在DAG定义中,可能没有包含这个节点,或者节点的名称有误 。认真检查一下,果然是启动工作流时,任务节点的名称写得有问题,重新修改后果然正常启动工作流了。

作者 east
Flink 6月 19,2024

Apache Flink处理IoT复杂数据流程案例

使用Apache Flink处理IoT复杂数据是一项涉及多个步骤和组件的任务,包括数据接入、数据清洗、实时处理、状态管理、窗口计算、以及结果输出等。以下是一个全面且详细的Flink流处理框架,结合理论和实际应用,以处理IoT数据为主线。

1. 引入依赖和设置环境

首先,需要在你的项目中引入Flink所需的依赖。

<dependencies>
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-java</artifactId>
        <version>1.14.0</version>
    </dependency>
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-streaming-java_2.12</artifactId>
        <version>1.14.0</version>
    </dependency>
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-connector-kafka_2.12</artifactId>
        <version>1.14.0</version>
    </dependency>
</dependencies>
xmlCopy Code<dependencies>
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-java</artifactId>
        <version>1.14.0</version>
    </dependency>
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-streaming-java_2.12</artifactId>
        <version>1.14.0</version>
    </dependency>
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-connector-kafka_2.12</artifactId>
        <version>1.14.0</version>
    </dependency>
</dependencies>

2. 创建Flink执行环境

import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class IoTDataProcessing {
    public static void main(String[] args) throws Exception {
        // 创建执行环境
        final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        // 设置并行度
        env.setParallelism(4);

        // 其他环境配置...
    }
}
javaCopy Codeimport org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class IoTDataProcessing {
    public static void main(String[] args) throws Exception {
        // 创建执行环境
        final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        // 设置并行度
        env.setParallelism(4);

        // 其他环境配置...
    }
}

3. 数据接入

通常,IoT数据会通过Kafka或其他消息队列接入。假设使用Kafka作为数据源:

import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.common.serialization.StringDeserializer;

import java.util.Properties;

public class IoTDataProcessing {
    public static void main(String[] args) throws Exception {
        final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        
        Properties properties = new Properties();
        properties.setProperty(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        properties.setProperty(ConsumerConfig.GROUP_ID_CONFIG, "flink-group");
        properties.setProperty(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
        properties.setProperty(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());

        FlinkKafkaConsumer<String> kafkaConsumer = new FlinkKafkaConsumer<>("iot-topic", new SimpleStringSchema(), properties);
        DataStream<String> input = env.addSource(kafkaConsumer);

        // 进一步处理...
    }
}




4. 数据清洗和解析

实际的IoT数据通常是JSON格式的字符串,需要进行解析和清洗:

import org.apache.flink.api.common.functions.MapFunction;
import com.fasterxml.jackson.databind.ObjectMapper;

public class IoTDataProcessing {
    public static void main(String[] args) throws Exception {
        final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        
        // Kafka consumer setup...

        DataStream<String> input = env.addSource(kafkaConsumer);

        DataStream<IoTEvent> parsedStream = input.map(new MapFunction<String, IoTEvent>() {
            private ObjectMapper mapper = new ObjectMapper();

            @Override
            public IoTEvent map(String value) throws Exception {
                return mapper.readValue(value, IoTEvent.class);
            }
        });

        // 进一步处理...
    }

    public static class IoTEvent {
        public String deviceId;
        public long timestamp;
        public double temperature;
        public double humidity;
        // 其他字段和构造方法...
    }
}
javaCopy Codeimport org.apache.flink.api.common.functions.MapFunction;
import com.fasterxml.jackson.databind.ObjectMapper;

public class IoTDataProcessing {
    public static void main(String[] args) throws Exception {
        final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        
        // Kafka consumer setup...

        DataStream<String> input = env.addSource(kafkaConsumer);

        DataStream<IoTEvent> parsedStream = input.map(new MapFunction<String, IoTEvent>() {
            private ObjectMapper mapper = new ObjectMapper();

            @Override
            public IoTEvent map(String value) throws Exception {
                return mapper.readValue(value, IoTEvent.class);
            }
        });

        // 进一步处理...
    }

    public static class IoTEvent {
        public String deviceId;
        public long timestamp;
        public double temperature;
        public double humidity;
        // 其他字段和构造方法...
    }
}

5. 定义时间窗口和处理函数

import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.streaming.api.windowing.windows.TimeWindow;
import org.apache.flink.streaming.api.functions.windowing.ProcessWindowFunction;
import org.apache.flink.util.Collector;

public class IoTDataProcessing {
    public static void main(String[] args) throws Exception {
        final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        
        // Kafka consumer and parsing setup...

        DataStream<IoTEvent> parsedStream = input.map(new MapFunction<String, IoTEvent>() {
            private ObjectMapper mapper = new ObjectMapper();

            @Override
            public IoTEvent map(String value) throws Exception {
                return mapper.readValue(value, IoTEvent.class);
            }
        });

        DataStream<AggregatedResult> resultStream = parsedStream
            .keyBy(event -> event.deviceId)
            .timeWindow(Time.minutes(1))
            .process(new AggregateTemperatureHumidity());

        // 进一步处理...
    }

    public static class AggregateTemperatureHumidity extends ProcessWindowFunction<IoTEvent, AggregatedResult, String, TimeWindow> {
        @Override
        public void process(String key, Context context, Iterable<IoTEvent> elements, Collector<AggregatedResult> out) {
            double sumTemp = 0;
            double sumHumidity = 0;
            int count = 0;

            for (IoTEvent event : elements) {
                sumTemp += event.temperature;
                sumHumidity += event.humidity;
                count++;
            }

            double avgTemp = sumTemp / count;
            double avgHumidity = sumHumidity / count;

            out.collect(new AggregatedResult(key, context.window().getStart(), context.window().getEnd(), avgTemp, avgHumidity));
        }
    }

    public static class AggregatedResult {
        public String deviceId;
        public long windowStart;
        public long windowEnd;
        public double avgTemperature;
        public double avgHumidity;

        public AggregatedResult(String deviceId, long windowStart, long windowEnd, double avgTemperature, double avgHumidity) {
            this.deviceId = deviceId;
            this.windowStart = windowStart;
            this.windowEnd = windowEnd;
            this.avgTemperature = avgTemperature;
            this.avgHumidity = avgHumidity;
        }
    }
}

6. 输出结果到外部系统

处理后的数据通常需要写到数据库、文件系统或者其他外部系统。以写入到Kafka为例:

import org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer;
import org.apache.flink.streaming.util.serialization.SimpleStringSchema;

public class IoTDataProcessing {
    public static void main(String[] args) throws Exception {
        final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        
        // Kafka consumer, parsing, and processing setup...

        DataStream<AggregatedResult> resultStream = parsedStream
            .keyBy(event -> event.deviceId)
            .timeWindow(Time.minutes(1))
            .process(new AggregateTemperatureHumidity());

        resultStream.map(result -> result.toString())
            .addSink(new FlinkKafkaProducer<>("output-topic", new SimpleStringSchema(), properties));

        env.execute("IoT Data Processing with Flink");
    }
}
javaCopy Codeimport org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer;
import org.apache.flink.streaming.util.serialization.SimpleStringSchema;

public class IoTDataProcessing {
    public static void main(String[] args) throws Exception {
        final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        
        // Kafka consumer, parsing, and processing setup...

        DataStream<AggregatedResult> resultStream = parsedStream
            .keyBy(event -> event.deviceId)
            .timeWindow(Time.minutes(1))
            .process(new AggregateTemperatureHumidity());

        resultStream.map(result -> result.toString())
            .addSink(new FlinkKafkaProducer<>("output-topic", new SimpleStringSchema(), properties));

        env.execute("IoT Data Processing with Flink");
    }
}

7. 完整代码示例

import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.streaming.api.windowing.windows.TimeWindow;
import org.apache.flink.streaming.api.functions.windowing.ProcessWindowFunction;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer;
import org.apache.flink.util.Collector;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.apache.flink.streaming.util.serialization.SimpleStringSchema;
import com.fasterxml.jackson.databind.ObjectMapper;

import java.util.Properties;

public class IoTDataProcessing {

    public static void main(String[] args) throws Exception {
        final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(4);

        Properties properties = new Properties();
        properties.setProperty(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        properties.setProperty(ConsumerConfig.GROUP_ID_CONFIG, "flink-group");
        properties.setProperty(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
        properties.setProperty(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());

        FlinkKafkaConsumer<String> kafkaConsumer = new FlinkKafkaConsumer<>("iot-topic", new SimpleStringSchema(), properties);
        DataStream<String> input = env.addSource(kafkaConsumer);

        DataStream<IoTEvent> parsedStream = input.map(new MapFunction<String, IoTEvent>() {
            private ObjectMapper mapper = new ObjectMapper();

            @Override
            public IoTEvent map(String value) throws Exception {
                return mapper.readValue(value, IoTEvent.class);
            }
        });

        DataStream<AggregatedResult> resultStream = parsedStream
            .keyBy(event -> event.deviceId)
            .timeWindow(Time.minutes(1))
            .process(new AggregateTemperatureHumidity());

        resultStream.map(result -> result.toString())
            .addSink(new FlinkKafkaProducer<>("output-topic", new SimpleStringSchema(), properties));

        env.execute("IoT Data Processing with Flink");
    }

    public static class IoTEvent {
        public String deviceId;
        public long timestamp;
        public double temperature;
        public double humidity;

        // constructor, getters, setters...
    }

    public static class AggregateTemperatureHumidity extends ProcessWindowFunction<IoTEvent, AggregatedResult, String, TimeWindow> {
        @Override
        public void process(String key, Context context, Iterable<IoTEvent> elements, Collector<AggregatedResult> out) {
            double sumTemp = 0;
            double sumHumidity = 0;
            int count = 0;

            for (IoTEvent event : elements) {
                sumTemp += event.temperature;
                sumHumidity += event.humidity;
                count++;
            }

            double avgTemp = sumTemp / count;
            double avgHumidity = sumHumidity / count;

            out.collect(new AggregatedResult(key, context.window().getStart(), context.window().getEnd(), avgTemp, avgHumidity));
        }
    }

    public static class AggregatedResult {
        public String deviceId;
        public long windowStart;
        public long windowEnd;
        public double avgTemperature;
        public double avgHumidity;

        public AggregatedResult(String deviceId, long windowStart, long windowEnd, double avgTemperature, double avgHumidity) {
            this.deviceId = deviceId;
            this.windowStart = windowStart;
            this.windowEnd = windowEnd;
            this.avgTemperature = avgTemperature;
            this.avgHumidity = avgHumidity;
        }

        @Override
        public String toString() {
            return "AggregatedResult{" +
                    "deviceId='" + deviceId + '\'' +
                    ", windowStart=" + windowStart +
                    ", windowEnd=" + windowEnd +
                    ", avgTemperature=" + avgTemperature +
                    ", avgHumidity=" + avgHumidity +
                    '}';
        }
    }
}
作者 east

上一 1 … 7 8 9 … 42 下一个

关注公众号“大模型全栈程序员”回复“小程序”获取1000个小程序打包源码。回复”chatgpt”获取免注册可用chatgpt。回复“大数据”获取多本大数据电子书

标签

AIGC AI创作 bert chatgpt github GPT-3 gpt3 GTP-3 hive mysql O2O tensorflow UI控件 不含后台 交流 共享经济 出行 图像 地图定位 外卖 多媒体 娱乐 小程序 布局 带后台完整项目 开源项目 搜索 支付 效率 教育 日历 机器学习 深度学习 物流 用户系统 电商 画图 画布(canvas) 社交 签到 联网 读书 资讯 阅读 预订

官方QQ群

小程序开发群:74052405

大数据开发群: 952493060

近期文章

  • 解决gitlab配置Webhooks,提示 Invalid url given的问题
  • 如何在Chrome中设置启动时自动打开多个默认网页
  • spark内存溢出怎样区分是软件还是代码原因
  • MQTT完全解析和实践
  • 解决运行Selenium报错:self.driver = webdriver.Chrome(service=service) TypeError: __init__() got an unexpected keyword argument ‘service’
  • python 3.6使用mysql-connector-python报错:SyntaxError: future feature annotations is not defined
  • 详解Python当中的pip常用命令
  • AUTOSAR如何在多个供应商交付的配置中避免ARXML不兼容?
  • C++thread pool(线程池)设计应关注哪些扩展性问题?
  • 各类MCAL(Microcontroller Abstraction Layer)如何与AUTOSAR工具链解耦?

文章归档

  • 2025年12月
  • 2025年10月
  • 2025年8月
  • 2025年7月
  • 2025年6月
  • 2025年5月
  • 2025年4月
  • 2025年3月
  • 2025年2月
  • 2025年1月
  • 2024年12月
  • 2024年11月
  • 2024年10月
  • 2024年9月
  • 2024年8月
  • 2024年7月
  • 2024年6月
  • 2024年5月
  • 2024年4月
  • 2024年3月
  • 2023年11月
  • 2023年10月
  • 2023年9月
  • 2023年8月
  • 2023年7月
  • 2023年6月
  • 2023年5月
  • 2023年4月
  • 2023年3月
  • 2023年1月
  • 2022年11月
  • 2022年10月
  • 2022年9月
  • 2022年8月
  • 2022年7月
  • 2022年6月
  • 2022年5月
  • 2022年4月
  • 2022年3月
  • 2022年2月
  • 2022年1月
  • 2021年12月
  • 2021年11月
  • 2021年9月
  • 2021年8月
  • 2021年7月
  • 2021年6月
  • 2021年5月
  • 2021年4月
  • 2021年3月
  • 2021年2月
  • 2021年1月
  • 2020年12月
  • 2020年11月
  • 2020年10月
  • 2020年9月
  • 2020年8月
  • 2020年7月
  • 2020年6月
  • 2020年5月
  • 2020年4月
  • 2020年3月
  • 2020年2月
  • 2020年1月
  • 2019年7月
  • 2019年6月
  • 2019年5月
  • 2019年4月
  • 2019年3月
  • 2019年2月
  • 2019年1月
  • 2018年12月
  • 2018年7月
  • 2018年6月

分类目录

  • Android (73)
  • bug清单 (79)
  • C++ (34)
  • Fuchsia (15)
  • php (4)
  • python (45)
  • sklearn (1)
  • 云计算 (20)
  • 人工智能 (61)
    • chatgpt (21)
      • 提示词 (6)
    • Keras (1)
    • Tensorflow (3)
    • 大模型 (1)
    • 智能体 (4)
    • 深度学习 (14)
  • 储能 (44)
  • 前端 (5)
  • 大数据开发 (495)
    • CDH (6)
    • datax (4)
    • doris (31)
    • Elasticsearch (15)
    • Flink (79)
    • flume (7)
    • Hadoop (19)
    • Hbase (23)
    • Hive (41)
    • Impala (2)
    • Java (71)
    • Kafka (10)
    • neo4j (5)
    • shardingsphere (6)
    • solr (5)
    • Spark (100)
    • spring (11)
    • 数据仓库 (9)
    • 数据挖掘 (7)
    • 海豚调度器 (10)
    • 运维 (37)
      • Docker (3)
  • 小游戏代码 (1)
  • 小程序代码 (139)
    • O2O (16)
    • UI控件 (5)
    • 互联网类 (23)
    • 企业类 (6)
    • 地图定位 (9)
    • 多媒体 (6)
    • 工具类 (25)
    • 电商类 (22)
    • 社交 (7)
    • 行业软件 (7)
    • 资讯读书 (11)
  • 嵌入式 (71)
    • autosar (63)
    • RTOS (1)
    • 总线 (1)
  • 开发博客 (16)
    • Harmony (9)
  • 技术架构 (6)
  • 数据库 (32)
    • mongodb (1)
    • mysql (13)
    • pgsql (2)
    • redis (1)
    • tdengine (4)
  • 未分类 (8)
  • 程序员网赚 (20)
    • 广告联盟 (3)
    • 私域流量 (5)
    • 自媒体 (5)
  • 量化投资 (4)
  • 面试 (14)

功能

  • 登录
  • 文章RSS
  • 评论RSS
  • WordPress.org

All Rights Reserved by Gitweixin.本站收集网友上传代码, 如有侵犯版权,请发邮件联系yiyuyos@gmail.com删除.