gitweixin
  • 首页
  • 小程序代码
    • 资讯读书
    • 工具类
    • O2O
    • 地图定位
    • 社交
    • 行业软件
    • 电商类
    • 互联网类
    • 企业类
    • UI控件
  • 大数据开发
    • Hadoop
    • Spark
    • Hbase
    • Elasticsearch
    • Kafka
    • Flink
    • 数据仓库
    • 数据挖掘
    • flume
    • Kafka
    • Hive
    • shardingsphere
    • solr
  • 开发博客
    • Android
    • php
    • python
    • 运维
    • 技术架构
    • 数据库
  • 程序员网赚
  • bug清单

分类归档flume

精品微信小程序开发门户,代码全部亲测可用

  • 首页   /  大数据开发
  • 分类归档: "flume"
flume 2月 21,2021

Flume案例研究:接收Twitter数据


问题陈述
在此案例研究中,将flume代理配置为从Twitter检索数据。我们知道,Twitter是巨大的数据来源,具有人们的意见和偏好。数据可用于分析舆论或对特定主题或产品进行评论。基于推文数据和位置可以进行各种类型的分析。来自flume的数据可用于通过Streaming API使用Apache Spark进行实时处理。 Spark Streaming用于使用各种数据源(例如Kafka,Flume或TCP套接字)处理实时数据。它还支持Twitter流API。通过使用Flume,我们可以构建一个容错系统,该系统提供实时数据并将数据的副本保存在所需的位置。 Spark还内置了机器学习算法,可以使分析更快,更可靠且具有容错能力。

这样,我们可以使用Spark实时获取所需的结果,并将数据存储在数据库中,以便使用Hadoop进行更深入的分析。现在,我们构建一个简单的flume代理,该代理具有Twitter源和接收器,Spark可通过接收器进行数据检索。为了防止数据丢失,我们将使用自定义接收器构建flume代理。即使spark产生故障,由于数据传输中的事务处理功能,数据仍保留在通道中。

拟议的解决方案
现在,必须为我们架构中的各种重要组件设置配置。这样的组件之一就是被配置为从Twitter读取数据的源。源为“ source_read”的flume代理“ agent1”已配置为自定义类型源。为了访问数据,通过注册应用程序,twitter提供了凭证,用户可以使用凭证来检索数据。如果我们需要包含这些单词的特定推文,我们也可以设置关键字。在对特定主题或产品进行分析时,这非常有用。

Cloudera提供了必须包含在Flume类路径中的jar文件才能访问这些类。可以通过在“ flume-env.sh”配置文件中添加jar的路径来完成。如果需要设置其他参数(例如代理),则必须使用源代码重新构建jar。

agent1.sources.source_read.type =
    com.cloudera.flume.source.TwitterSource
agent1.sources.source_read.channels = MemChannel
agent1.sources.source_read.consumerKey = 
agent1.sources.source_read.consumerSecret = 
agent1.sources.source_read.accessToken = 
agent1.sources.source_read.accessTokenSecret = 
agent1.sources.source_read.keywords = hadoop

channel的配置如下:

agent1.channels.memory1.type = memory
agent1.channels.memory1.capacity = 1000
agent1.channels.memory1.transactionCapacity = 100

使用参数“ type”传递自定义sink的代码。 代码的jar文件必须添加到flume类路径中。 定义了spark的IP地址和端口。

agent1.sinks = spark_dump
agent1.sinks.spark_dump.type = org.apache.spark.streaming.flume.sink.SparkSink agent1.sinks.spark_dump.hostname =
agent1.sinks.spark_dump.port =
agent1.sinks.spark_dump.channel = memory1

启动flume:

$ bin/flume-ng agent -n $agent_name -c conf -f
    conf/flume-conf.properties.template
作者 east
flume 2月 20,2021

Flume处理Spooling Directory Source数据太慢优化

一个数据采集处理系统,架构如下:

日志数据 -> flume -> kafka -> Spark Streaming。

flume到kafka的数据处理时间是1秒多;而spark Streaming的数据处理时间是十几毫秒。

flume方面:

flume数量不够:增加日志服务器以增加并行度;

(1)自身:增加内存flume-env.sh 4-6g

-Xmx与-Xms最好设置一致,减少内存抖动带来的性能影响,如果设置不一致容易导致频繁fullgc。

(2)找朋友:多个目录,多个spooling directory source同时采集

(3)taildir source的batchsize修改为合适的值

( 4 ) flume要读取文件夹如果文件太多,要按最新或最早顺序读取时,会很影响速度。

# batchsize是每次处理的数据条数越高,处理的数据越多,延迟越高。

kafka数据积压的问题,主要的解决办法是:

(1)增加Kafka对应的分区数(比如:期望处理数据的总吞吐量是100M/s。但是实际最多每个分区的生产能力和消费能力的最小值是20M/s,那么我们就需要设置5个或者6个分区),2)要求下一级消费者配套增加CPU核数,动态增加Kafka服务器集群。

(2)kafka ack设成0(ack有0有1有-1。0的可靠性最差,但是速度最快)

注:ack有3个可选值,分别是1,0,-1。

ack=1,简单来说就是,producer只要收到一个分区副本成功写入的通知就认为推送消息成功了。这个副本必须是leader副本。只有leader副本成功写入了,producer才会认为消息发送成功。

ack=0,简单来说就是,producer发送一次就不再发送了,不管是否发送成功。

ack=-1,简单来说就是,producer只有收到分区内所有副本的成功写入的通知才认为推送消息成功了。

(3) a1.channels.c1.type = memory memory类型可能会丢失数据,但是速度最快。

作者 east
bug清单, flume 2月 20,2021

Flume Spooling Directory Source 采集NullPointerException

采用flume的Spooling Directory Source,突然遇到下面的错误

(org.apache.flume.source.SpoolDirectoryExtSource2$SpoolDirectoryRunnable.run:277)  - FATAL: Spool Directory source source1: { spoolDir: /home/work/local/log }: Uncaught exception in SpoolDirectorySource thread. Restart or reconfigure Flume to continue processing.
java.lang.NullPointerException
    at org.apache.avro.io.ResolvingDecoder.readLong(ResolvingDecoder.java:159)
    at org.apache.avro.generic.GenericDatumReader.read(GenericDatumReader.java:157)
    at org.apache.avro.generic.GenericDatumReader.readRecord(GenericDatumReader.java:177)
    at org.apache.avro.generic.GenericDatumReader.read(GenericDatumReader.java:148)
    at org.apache.avro.generic.GenericDatumReader.read(GenericDatumReader.java:139)
    at org.apache.avro.file.DataFileStream.next(DataFileStream.java:233)
    at org.apache.flume.serialization.DurablePositionTracker.initReader(DurablePositionTracker.java:171)
    at org.apache.flume.serialization.DurablePositionTracker.<init>(DurablePositionTracker.java:158)
    at org.apache.flume.serialization.DurablePositionTracker.getInstance(DurablePositionTracker.java:76)
    at org.apache.flume.client.avro.ReliableSpoolingFileEventExtReader2.openFile(ReliableSpoolingFileEventExtReader2.java:561)
    at org.apache.flume.client.avro.ReliableSpoolingFileEventExtReader2.getNextFile(ReliableSpoolingFileEventExtReader2.java:511)
    at org.apache.flume.client.avro.ReliableSpoolingFileEventExtReader2.readEvents(ReliableSpoolingFileEventExtReader2.java:264)
    at org.apache.flume.source.SpoolDirectoryExtSource2$SpoolDirectoryRunnable.run(SpoolDirectoryExtSource2.java:252)
    at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:471)
    at java.util.concurrent.FutureTask.runAndReset(FutureTask.java:304)
    at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$301(ScheduledThreadPoolExecutor.java:178)
    at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:293)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)

重启flume 后发现问题依旧。试验把flume 采集的日志放到新目录,重启flume采集新目录发现没问题了。看来是旧目录/home/work/local/log 有文件变动造成的。

例如 有.flumespool隐藏目录,该目录下有文件.flumespool-main.meta隐藏文件,用来记录flume读取文件的位置,发现该记录停止在flume出问题的时间。再查看其它正常运行的机器的相同路径及文件,并没有发现该文件,于是将该文件移到其它目录下,重启flume,此时发现flume成功运行!

作者 east

标签

flex布局 github mysql O2O UI控件 不含后台 交流 体育 共享经济 出行 单机类 图像 地图定位 外卖 多媒体 娱乐 小程序 布局 带后台完整项目 开源项目 搜索 支付 效率 教育 旅游 日历 时钟 流量主 物流 用户系统 电商 画图 画布(canvas) 社交 签到 算命 联网 装修 解锁 评论 读书 读音 资讯 阅读 预订

关注公众号“康波之道”回复“小程序”获取1000个小程序打包源码

官方QQ群

小程序开发群:74052405

大数据开发群: 952493060

近期文章

  • 微信小程序语音识别、语音合成(微信同声传译)使用代码实例
  • 切换城市微信小程序代码
  • 辩论倒计时微信小程序源码
  • 东航订机票微信小程序源码
  • 仿车源宝微信小程序代码
  • 语音跟读微信小程序代码
  • 各国货币汇率微信小程序源代码
  • 仿小红书购物推荐微信小程序
  • 带富文本解析折线图的财经微信小程序
  • 摇一摇切换文章微信小程序代码

文章归档

  • 2021年4月
  • 2021年3月
  • 2021年2月
  • 2021年1月
  • 2020年12月
  • 2020年11月
  • 2020年10月
  • 2020年9月
  • 2020年8月
  • 2020年7月
  • 2020年6月
  • 2020年5月
  • 2020年4月
  • 2020年3月
  • 2020年2月
  • 2020年1月
  • 2019年7月
  • 2019年6月
  • 2019年5月
  • 2019年4月
  • 2019年3月
  • 2019年2月
  • 2019年1月
  • 2018年12月
  • 2018年7月
  • 2018年6月

分类目录

  • Android (34)
  • bug清单 (63)
  • Fuchsia (15)
  • php (2)
  • python (6)
  • 人工智能 (4)
  • 大数据开发 (160)
    • Elasticsearch (12)
    • Flink (9)
    • flume (3)
    • Hadoop (11)
    • Hbase (12)
    • Hive (4)
    • Java (31)
    • Kafka (3)
    • shardingsphere (3)
    • solr (2)
    • Spark (47)
    • spring (8)
    • 数据仓库 (1)
    • 数据挖掘 (5)
    • 运维 (8)
  • 小游戏代码 (1)
  • 小程序代码 (133)
    • O2O (16)
    • UI控件 (4)
    • 互联网类 (22)
    • 企业类 (5)
    • 地图定位 (9)
    • 多媒体 (6)
    • 工具类 (23)
    • 电商类 (21)
    • 社交 (7)
    • 行业软件 (7)
    • 资讯读书 (11)
  • 开发博客 (6)
  • 技术架构 (4)
  • 数据库 (2)
  • 未分类 (5)
  • 程序员网赚 (1)

功能

  • 登录
  • 文章RSS
  • 评论RSS
  • WordPress.org

All Rights Reserved by Gitweixin.本站收集网友上传代码, 如有侵犯版权,请发邮件联系yiyuyos@gmail.com删除.