gitweixin
  • 首页
  • 小程序代码
    • 资讯读书
    • 工具类
    • O2O
    • 地图定位
    • 社交
    • 行业软件
    • 电商类
    • 互联网类
    • 企业类
    • UI控件
  • 大数据开发
    • Hadoop
    • Spark
    • Hbase
    • Elasticsearch
    • Kafka
    • Flink
    • 数据仓库
    • 数据挖掘
    • flume
    • Kafka
    • Hive
    • shardingsphere
    • solr
  • 开发博客
    • Android
    • php
    • python
    • 运维
    • 技术架构
    • 数据库
  • 程序员网赚
  • bug清单
  • 量化投资
  • 在线查询工具
    • 去行号
    • 在线时间戳转换工具
    • 免费图片批量修改尺寸在线工具
    • SVG转JPG在线工具

分类归档flume

精品微信小程序开发门户,代码全部亲测可用

  • 首页   /  大数据开发
  • 分类归档: "flume"
flume 3月 19,2022

Logstash和flume全方位对比

Logstash架构如下:

Flume架构如下:



在这里插入图片描述

首先从结构对比,我们会惊人的发现,两者是多么的相似!Logstash的Shipper、Broker、Indexer分别和Flume的Source、Channel、Sink各自对应!只不过是Logstash集成了,Broker可以不需要,而Flume需要单独配置,且缺一不可,但这再一次说明了计算机的设计思想都是通用的!只是实现方式会不同而已。

从程序员的角度来说,上文也提到过了,Flume是真的很繁琐,你需要分别作source、channel、sink的手工配置,而且涉及到复杂的数据采集环境,你可能还要做多个配置,这在上面提过了,反过来说Logstash的配置就非常简洁清晰,三个部分的属性都定义好了,程序员自己去选择就行,就算没有,也可以自行开发插件,非常方便。当然了,Flume的插件也很多,但Channel就只有内存和文件这两种(其实现在不止了,但常用的也就两种)。读者可以看得出来,两者其实配置都是非常灵活的,只不过看场景取舍罢了。

其实从作者和历史背景来看,两者最初的设计目的就不太一样。Flume本身最初设计的目的是为了把数据传入HDFS中(并不是为了采集日志而设计,这和Logstash有根本的区别),所以理所应当侧重于数据的传输,程序员要非常清楚整个数据的路由,并且比Logstash还多了一个可靠性策略,上文中的channel就是用于持久化目的,数据除非确认传输到下一位置了,否则不会删除,这一步是通过事务来控制的,这样的设计使得可靠性非常好。相反,Logstash则明显侧重对数据的预处理,因为日志的字段需要大量的预处理,为解析做铺垫。

为什么先讲Logstash然后讲Flume?这里面有几个考虑,

其一:Logstash其实更有点像通用的模型,所以对新人来说理解起来更简单,而Flume这样轻量级的线程,可能有一定的计算机编程基础理解起来更好;

其二:目前大部分的情况下,Logstash用的更加多,这个数据我自己没有统计过,但是根据经验判断,Logstash可以和ELK其他组件配合使用,开发、应用都会简单很多,技术成熟,使用场景广泛。相反Flume组件就需要和其他很多工具配合使用,场景的针对性会比较强,更不用提Flume的配置过于繁琐复杂了。

作者 east
flume, Spark 2月 20,2022

大数据运维一些常见批量操作命令

在使用flume过程,由于故障停止采集,堆积文件很多,想迁移到新目录,但如果文件数目太多的话,想直接用mv 命令会报错。这时我们需要利用管道技术和xargs命令。

xargs(英文全拼: eXtended ARGuments)是给命令传递参数的一个过滤器,也是组合多个命令的一个工具。

xargs 可以将管道或标准输入(stdin)数据转换成命令行参数,也能够从文件的输出中读取数据。

常用操作1:把当前目录的文件移到新的目录newdir

find . -name '2106*.json' | xargs -i mv {} ../newdir

常用操作2:把当前目录json.1后缀的批量删除

find . -name '*.json.1' | xargs -i rm -f {}

常用操作3:shell批量kill掉java进程

ps aux | grep test.jar | grep -v grep | awk '{print $2}' | xargs kill -9
作者 east
flume 7月 10,2021

Flume对接数据遇到的坑

业务是这样的:别的地方服务器通过ftp传来一些压缩包,对压缩包进行解压,然后flume进行采集发到kafka,spark Streaming进行处理。

进行解压的脚本代码如下:

#/bin/bash
end = "${dirname "$0"}"/"$1"
source = "${dirname "$0"}"/"$2"
final =  "${dirname "$0"}"/"$3"
while [2 -gt 1 ]
do
   for i in 'ls ${source}‘
   do
   if [[ ${i} != "" && ${i} != *.tmp ]]
   then
       unzip ${source}/${i} -d ${end}/
       mv -f ${source}/${i} ${final}
   fi
done
sleep 5

原本一直持续能解压文件,最近出现停止。

执行命令时,还出现提示是否覆盖。这时才明白可能是这个原因导致脚本没能顺利执行。为了不提示是否覆盖,可以加参数 -o。修改命令如下:

unzip -o ${source}/${i} -d ${end}/

作者 east
bug清单, flume 6月 13,2021

Flume pollDelay设置不正确停止采集

使用FusionInsight HD Flume从本地采集静态日志( Spooling Source )保存到Kafka,由于采集堆积太多了,flume配置参数做了一些修改。后来发现一个诡异问题:每次重启flume采集,只采集1、2个文件就停止采集了,也没报什么错误。

采用对比法排查问题,对比正常运行的flume配置,看到pollDelay跟之前的不同。才想起之前一顿三百五的操作:想加快速度。pollDelay的设置值从5000改成500。

采集方案采用的Spooling Source + Memory Channel + kfaka

Spooling Source常用配置 :

Memory Channel使用内存作为缓存区,Events存放在内存队列中。常用配置如下表所示:

Kafka Sink将数据写入到Kafka中。常用配置如下表所示:

参考配置如下:

a1.channels = c1
a1.sources = s1
a1.sinks = sink1

a1.sources.s1.type = spooldir
a1.sources.s1.channels = c1
a1.sources.s1.spoolDir = /home/ftp (填写实际的路径)
a1.sources.s1.bufferMaxLineLength = 1073741824
a1.sources.s1.pollDelay = 5000
a1.sources.s1.consumeOrder = random

a1.channels.c1.type = memory
a1.channels.c1.capacity = 30000
a1.channels.c1.tansactionCapacity = 30000

a1.sinks.sink1.channel = c1
a1.sinks.sink1.type = org.apache.kafka.kafkaSink
a1.sinks.sink1.bootstrap.servers=192.168.1.1:210007  (根据实际填写)
a1.sinks.sink1.topic = mytopic (根据实际填写)
a1.sinks.sink1.batchSize = 200
a1.sinks.sink1.producer.requiredAcks = 1



作者 east
flume 2月 21,2021

Flume案例研究:接收Twitter数据


问题陈述
在此案例研究中,将flume代理配置为从Twitter检索数据。我们知道,Twitter是巨大的数据来源,具有人们的意见和偏好。数据可用于分析舆论或对特定主题或产品进行评论。基于推文数据和位置可以进行各种类型的分析。来自flume的数据可用于通过Streaming API使用Apache Spark进行实时处理。 Spark Streaming用于使用各种数据源(例如Kafka,Flume或TCP套接字)处理实时数据。它还支持Twitter流API。通过使用Flume,我们可以构建一个容错系统,该系统提供实时数据并将数据的副本保存在所需的位置。 Spark还内置了机器学习算法,可以使分析更快,更可靠且具有容错能力。

这样,我们可以使用Spark实时获取所需的结果,并将数据存储在数据库中,以便使用Hadoop进行更深入的分析。现在,我们构建一个简单的flume代理,该代理具有Twitter源和接收器,Spark可通过接收器进行数据检索。为了防止数据丢失,我们将使用自定义接收器构建flume代理。即使spark产生故障,由于数据传输中的事务处理功能,数据仍保留在通道中。

拟议的解决方案
现在,必须为我们架构中的各种重要组件设置配置。这样的组件之一就是被配置为从Twitter读取数据的源。源为“ source_read”的flume代理“ agent1”已配置为自定义类型源。为了访问数据,通过注册应用程序,twitter提供了凭证,用户可以使用凭证来检索数据。如果我们需要包含这些单词的特定推文,我们也可以设置关键字。在对特定主题或产品进行分析时,这非常有用。

Cloudera提供了必须包含在Flume类路径中的jar文件才能访问这些类。可以通过在“ flume-env.sh”配置文件中添加jar的路径来完成。如果需要设置其他参数(例如代理),则必须使用源代码重新构建jar。

agent1.sources.source_read.type =
    com.cloudera.flume.source.TwitterSource
agent1.sources.source_read.channels = MemChannel
agent1.sources.source_read.consumerKey = 
agent1.sources.source_read.consumerSecret = 
agent1.sources.source_read.accessToken = 
agent1.sources.source_read.accessTokenSecret = 
agent1.sources.source_read.keywords = hadoop

channel的配置如下:

agent1.channels.memory1.type = memory
agent1.channels.memory1.capacity = 1000
agent1.channels.memory1.transactionCapacity = 100

使用参数“ type”传递自定义sink的代码。 代码的jar文件必须添加到flume类路径中。 定义了spark的IP地址和端口。

agent1.sinks = spark_dump
agent1.sinks.spark_dump.type = org.apache.spark.streaming.flume.sink.SparkSink agent1.sinks.spark_dump.hostname =
agent1.sinks.spark_dump.port =
agent1.sinks.spark_dump.channel = memory1

启动flume:

$ bin/flume-ng agent -n $agent_name -c conf -f
    conf/flume-conf.properties.template
作者 east
flume 2月 20,2021

Flume处理Spooling Directory Source数据太慢优化

一个数据采集处理系统,架构如下:

日志数据 -> flume -> kafka -> Spark Streaming。

flume到kafka的数据处理时间是1秒多;而spark Streaming的数据处理时间是十几毫秒。

flume方面:

flume数量不够:增加日志服务器以增加并行度;

(1)自身:增加内存flume-env.sh 4-6g

-Xmx与-Xms最好设置一致,减少内存抖动带来的性能影响,如果设置不一致容易导致频繁fullgc。

(2)找朋友:多个目录,多个spooling directory source同时采集

(3)taildir source的batchsize修改为合适的值

( 4 ) flume要读取文件夹如果文件太多,要按最新或最早顺序读取时,会很影响速度。

# batchsize是每次处理的数据条数越高,处理的数据越多,延迟越高。

kafka数据积压的问题,主要的解决办法是:

(1)增加Kafka对应的分区数(比如:期望处理数据的总吞吐量是100M/s。但是实际最多每个分区的生产能力和消费能力的最小值是20M/s,那么我们就需要设置5个或者6个分区),2)要求下一级消费者配套增加CPU核数,动态增加Kafka服务器集群。

(2)kafka ack设成0(ack有0有1有-1。0的可靠性最差,但是速度最快)

注:ack有3个可选值,分别是1,0,-1。

ack=1,简单来说就是,producer只要收到一个分区副本成功写入的通知就认为推送消息成功了。这个副本必须是leader副本。只有leader副本成功写入了,producer才会认为消息发送成功。

ack=0,简单来说就是,producer发送一次就不再发送了,不管是否发送成功。

ack=-1,简单来说就是,producer只有收到分区内所有副本的成功写入的通知才认为推送消息成功了。

(3) a1.channels.c1.type = memory memory类型可能会丢失数据,但是速度最快。

作者 east
bug清单, flume 2月 20,2021

Flume Spooling Directory Source 采集NullPointerException

采用flume的Spooling Directory Source,突然遇到下面的错误

(org.apache.flume.source.SpoolDirectoryExtSource2$SpoolDirectoryRunnable.run:277)  - FATAL: Spool Directory source source1: { spoolDir: /home/work/local/log }: Uncaught exception in SpoolDirectorySource thread. Restart or reconfigure Flume to continue processing.
java.lang.NullPointerException
    at org.apache.avro.io.ResolvingDecoder.readLong(ResolvingDecoder.java:159)
    at org.apache.avro.generic.GenericDatumReader.read(GenericDatumReader.java:157)
    at org.apache.avro.generic.GenericDatumReader.readRecord(GenericDatumReader.java:177)
    at org.apache.avro.generic.GenericDatumReader.read(GenericDatumReader.java:148)
    at org.apache.avro.generic.GenericDatumReader.read(GenericDatumReader.java:139)
    at org.apache.avro.file.DataFileStream.next(DataFileStream.java:233)
    at org.apache.flume.serialization.DurablePositionTracker.initReader(DurablePositionTracker.java:171)
    at org.apache.flume.serialization.DurablePositionTracker.<init>(DurablePositionTracker.java:158)
    at org.apache.flume.serialization.DurablePositionTracker.getInstance(DurablePositionTracker.java:76)
    at org.apache.flume.client.avro.ReliableSpoolingFileEventExtReader2.openFile(ReliableSpoolingFileEventExtReader2.java:561)
    at org.apache.flume.client.avro.ReliableSpoolingFileEventExtReader2.getNextFile(ReliableSpoolingFileEventExtReader2.java:511)
    at org.apache.flume.client.avro.ReliableSpoolingFileEventExtReader2.readEvents(ReliableSpoolingFileEventExtReader2.java:264)
    at org.apache.flume.source.SpoolDirectoryExtSource2$SpoolDirectoryRunnable.run(SpoolDirectoryExtSource2.java:252)
    at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:471)
    at java.util.concurrent.FutureTask.runAndReset(FutureTask.java:304)
    at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$301(ScheduledThreadPoolExecutor.java:178)
    at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:293)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)

重启flume 后发现问题依旧。试验把flume 采集的日志放到新目录,重启flume采集新目录发现没问题了。看来是旧目录/home/work/local/log 有文件变动造成的。

例如 有.flumespool隐藏目录,该目录下有文件.flumespool-main.meta隐藏文件,用来记录flume读取文件的位置,发现该记录停止在flume出问题的时间。再查看其它正常运行的机器的相同路径及文件,并没有发现该文件,于是将该文件移到其它目录下,重启flume,此时发现flume成功运行!

作者 east

关注公众号“大模型全栈程序员”回复“小程序”获取1000个小程序打包源码。回复”chatgpt”获取免注册可用chatgpt。回复“大数据”获取多本大数据电子书

标签

AIGC AI创作 bert chatgpt github GPT-3 gpt3 GTP-3 hive mysql O2O tensorflow UI控件 不含后台 交流 共享经济 出行 图像 地图定位 外卖 多媒体 娱乐 小程序 布局 带后台完整项目 开源项目 搜索 支付 效率 教育 日历 机器学习 深度学习 物流 用户系统 电商 画图 画布(canvas) 社交 签到 联网 读书 资讯 阅读 预订

官方QQ群

小程序开发群:74052405

大数据开发群: 952493060

近期文章

  • 详解Python当中的pip常用命令
  • AUTOSAR如何在多个供应商交付的配置中避免ARXML不兼容?
  • C++thread pool(线程池)设计应关注哪些扩展性问题?
  • 各类MCAL(Microcontroller Abstraction Layer)如何与AUTOSAR工具链解耦?
  • 如何设计AUTOSAR中的“域控制器”以支持未来扩展?
  • C++ 中避免悬挂引用的企业策略有哪些?
  • 嵌入式电机:如何在低速和高负载状态下保持FOC(Field-Oriented Control)算法的电流控制稳定?
  • C++如何在插件式架构中使用反射实现模块隔离?
  • C++如何追踪内存泄漏(valgrind/ASan等)并定位到业务代码?
  • C++大型系统中如何组织头文件和依赖树?

文章归档

  • 2025年6月
  • 2025年5月
  • 2025年4月
  • 2025年3月
  • 2025年2月
  • 2025年1月
  • 2024年12月
  • 2024年11月
  • 2024年10月
  • 2024年9月
  • 2024年8月
  • 2024年7月
  • 2024年6月
  • 2024年5月
  • 2024年4月
  • 2024年3月
  • 2023年11月
  • 2023年10月
  • 2023年9月
  • 2023年8月
  • 2023年7月
  • 2023年6月
  • 2023年5月
  • 2023年4月
  • 2023年3月
  • 2023年1月
  • 2022年11月
  • 2022年10月
  • 2022年9月
  • 2022年8月
  • 2022年7月
  • 2022年6月
  • 2022年5月
  • 2022年4月
  • 2022年3月
  • 2022年2月
  • 2022年1月
  • 2021年12月
  • 2021年11月
  • 2021年9月
  • 2021年8月
  • 2021年7月
  • 2021年6月
  • 2021年5月
  • 2021年4月
  • 2021年3月
  • 2021年2月
  • 2021年1月
  • 2020年12月
  • 2020年11月
  • 2020年10月
  • 2020年9月
  • 2020年8月
  • 2020年7月
  • 2020年6月
  • 2020年5月
  • 2020年4月
  • 2020年3月
  • 2020年2月
  • 2020年1月
  • 2019年7月
  • 2019年6月
  • 2019年5月
  • 2019年4月
  • 2019年3月
  • 2019年2月
  • 2019年1月
  • 2018年12月
  • 2018年7月
  • 2018年6月

分类目录

  • Android (73)
  • bug清单 (79)
  • C++ (34)
  • Fuchsia (15)
  • php (4)
  • python (43)
  • sklearn (1)
  • 云计算 (20)
  • 人工智能 (61)
    • chatgpt (21)
      • 提示词 (6)
    • Keras (1)
    • Tensorflow (3)
    • 大模型 (1)
    • 智能体 (4)
    • 深度学习 (14)
  • 储能 (44)
  • 前端 (4)
  • 大数据开发 (488)
    • CDH (6)
    • datax (4)
    • doris (30)
    • Elasticsearch (15)
    • Flink (78)
    • flume (7)
    • Hadoop (19)
    • Hbase (23)
    • Hive (40)
    • Impala (2)
    • Java (71)
    • Kafka (10)
    • neo4j (5)
    • shardingsphere (6)
    • solr (5)
    • Spark (99)
    • spring (11)
    • 数据仓库 (9)
    • 数据挖掘 (7)
    • 海豚调度器 (10)
    • 运维 (34)
      • Docker (3)
  • 小游戏代码 (1)
  • 小程序代码 (139)
    • O2O (16)
    • UI控件 (5)
    • 互联网类 (23)
    • 企业类 (6)
    • 地图定位 (9)
    • 多媒体 (6)
    • 工具类 (25)
    • 电商类 (22)
    • 社交 (7)
    • 行业软件 (7)
    • 资讯读书 (11)
  • 嵌入式 (70)
    • autosar (63)
    • RTOS (1)
    • 总线 (1)
  • 开发博客 (16)
    • Harmony (9)
  • 技术架构 (6)
  • 数据库 (32)
    • mongodb (1)
    • mysql (13)
    • pgsql (2)
    • redis (1)
    • tdengine (4)
  • 未分类 (6)
  • 程序员网赚 (20)
    • 广告联盟 (3)
    • 私域流量 (5)
    • 自媒体 (5)
  • 量化投资 (4)
  • 面试 (14)

功能

  • 登录
  • 文章RSS
  • 评论RSS
  • WordPress.org

All Rights Reserved by Gitweixin.本站收集网友上传代码, 如有侵犯版权,请发邮件联系yiyuyos@gmail.com删除.