gitweixin
  • 首页
  • 小程序代码
    • 资讯读书
    • 工具类
    • O2O
    • 地图定位
    • 社交
    • 行业软件
    • 电商类
    • 互联网类
    • 企业类
    • UI控件
  • 大数据开发
    • Hadoop
    • Spark
    • Hbase
    • Elasticsearch
    • Kafka
    • Flink
    • 数据仓库
    • 数据挖掘
    • flume
    • Kafka
    • Hive
    • shardingsphere
    • solr
  • 开发博客
    • Android
    • php
    • python
    • 运维
    • 技术架构
    • 数据库
  • 程序员网赚
  • bug清单
  • 量化投资
  • 在线查询工具
    • 去行号
    • 在线时间戳转换工具
    • 免费图片批量修改尺寸在线工具
    • SVG转JPG在线工具

分类归档大数据开发

精品微信小程序开发门户,代码全部亲测可用

  • 首页   /  
  • 分类归档: "大数据开发"
  • ( 页面26 )
flume, Spark 2月 20,2022

大数据运维一些常见批量操作命令

在使用flume过程,由于故障停止采集,堆积文件很多,想迁移到新目录,但如果文件数目太多的话,想直接用mv 命令会报错。这时我们需要利用管道技术和xargs命令。

xargs(英文全拼: eXtended ARGuments)是给命令传递参数的一个过滤器,也是组合多个命令的一个工具。

xargs 可以将管道或标准输入(stdin)数据转换成命令行参数,也能够从文件的输出中读取数据。

常用操作1:把当前目录的文件移到新的目录newdir

find . -name '2106*.json' | xargs -i mv {} ../newdir

常用操作2:把当前目录json.1后缀的批量删除

find . -name '*.json.1' | xargs -i rm -f {}

常用操作3:shell批量kill掉java进程

ps aux | grep test.jar | grep -v grep | awk '{print $2}' | xargs kill -9
作者 east
Hive, 数据仓库 2月 19,2022

Hive构建数据仓库常用的函数

concat()函数。

concat()函数用于连接字符串,在连接字符串时,只要其中一个字符串是NULL,结果就返回NULL。

concat_ws()函数。

concat_ws()函数同样用于连接字符串,在连接字符串时,只要有一个字符串不是NULL,结果就不会返回NULL。concat_ws()函数需要指定分隔符。

str_to_map()函数。

● 语法描述。str_to_map(VARCHAR text,VARCHAR listDelimiter,VARCHARkeyValueDelimiter)。

● 功能描述。使用listDelimiter将text分隔成key-value对,然后使用keyValueDelimiter分隔每个keyvalue对,并组装成MAP返回。默认listDelimiter为“,”,keyValueDelimiter为“=”。

nvl()函数

基本语法:nvl(表达式1,表达式2)。如果表达式1为空值,则nvl()函数返回表达式2的值,否则返回表达式1的值。nvl()函数的作用是把一个空值(null)转换成一个实际的值。其表达式的数据类型可以是数字型、字符型和日期型。需要注意的是,表达式1和表达式2的数据类型必须相同。

日期处理函数

1)date_format()函数(根据格式整理日期)

hive> select date_format('2020-03-18',''yyyy-MM');
hive> 2020-03

2)date_add()函数(加减日期)

hive> select date_add('2020-03-11',1);
hive> 2020-03-12

3)next_day()函数

(1)获取当前日期的下一个星期一。

hive> select next_day('2020-03-13','MO');
hive> 2020-03-16

(2)获取当前周的星期一。

hive> select date_add(next_day('2020-03-13','MO'),-7);
hive> 2020-03-11

4)last_day()函数(获取当月最后一天的日期)

作者 east
Hive 2月 19,2022

Tez运行报TezSeeion has already shutdown错误

运行Tez出现下面错误:

Caused by: org.apache.tez.dag.api.SessionNotRunning: TezSeeion has already shutdown.Application application_15xxxx  failed 2 times due to AM Container for appattempt_xxx exited with exitCode: -103 For more detailed output,check application tracking page:http://hadoop1:8088/cluster/app/application_15xxxThen,click on links to logs of each attempt.

产生原因:

这是由于Container使用过多内存而被NodeManager杀死进程

解决方法:

修改Hadoop的配置文件yarn-site.xml,增加如下配置,关掉虚拟内存检查,修改后,分发配置文件,并重启集群。

<property>
   <name>yarn.nodemanager.vmem-check-enabled</name>
   <value>false</value>
</property>
作者 east
Spark 1月 6,2022

Spark如何在生产环境调试

接手别人开发的spark程序,大概弄懂整个流程,但一些细节总是猜不透,在生产环境运行效果也达不到理想。

想去修改,遇到下面的问题:

一、由于生产环境是运行在linux服务上的,在华为HD Insight大数据平台上,在开发机不知怎样调试。

解决方式:后来发现其实在idea是可以远程调试的:

  1. 打开工程,在菜单栏中选择“Run > Edit Configurations”。
  2. 在弹出的配置窗口中用鼠标左键单击左上角的号,在下拉菜单中选择Remote,如图1所示。 图1 选择Remote

3. 选择对应要调试的源码模块路径,并配置远端调试参数Host和Port,如图2所示。
其中Host为Spark运行机器IP地址,Port为调试的端口号(确保该端口在运行机器上没被占用)。

说明: 当改变Port端口号时,For JDK1.4.x对应的调试命令也跟着改变,比如Port设置为5006,对应调试命令会变更为-Xdebug -Xrunjdwp:transport=dt_socket,server=y,suspend=y,address=5006,这个调试命令在启动Spark程序时要用到。

4.执行以下命令,远端启动Spark运行SparkPi。 ./spark-submit –master yarn-client –driver-java-options “-Xdebug -Xrunjdwp:transport=dt_socket,server=y,suspend=y,address=5006” –class org.apache.spark.examples.SparkPi /opt/FI-Client/Spark2x/spark/examples/jars/spark-examples_2.11-2.1.0.jar 用户调试时需要把–class和jar包换成自己的程序,-Xdebug -Xrunjdwp:transport=dt_socket,server=y,suspend=y,address=5006需要换成3获取到的For JDK1.4.x对应的调试命令。

5.设置调试断点。 在IDEA代码编辑窗口左侧空白处单击鼠标左键设置相应代码行断点,如图4所示,在SparkPi.scala的29行设置断点。

6.启动调试。 在IDEA菜单栏中选择“Run > Debug ‘Unnamed’”开启调试窗口,接着开始SparkPi的调试,比如单步调试、查看调用栈、跟踪变量值等,如图5所示。

二、在spark executor执行的,如果看调试结果?

解决方式是在相应的rdd加上collect()方法,把结果传送到driver来看

作者 east
Java 12月 14,2021

Java多线程使用总结

Java线程的3种创建方式,使用继承方式的好处是方便传参,你可以在子类里面添加成员变量,通过set方法设置参数或者通过构造函数进行传递,而如果使用Runnable方式,则只能使用主线程里面被声明为final的变量。不好的地方是Java不支持多继承,如果继承了Thread类,那么子类不能再继承其他类,而Runable则没有这个限制。前两种方式都没办法拿到任务的返回结果,但是Futuretask方式可以。

sleep与yield方法的区别在于,当线程调用sleep方法时调用线程会被阻塞挂起指定的时间,在这期间线程调度器不会去调度该线程。而调用yield方法时,线程只是让出自己剩余的时间片,并没有被阻塞挂起,而是处于就绪状态,线程调度器下一次调度时就有可能调度到当前线程执行。

守护线程并非只有虚拟机内部可以提供,用户也可以手动将一个用户线程设定/转换为守护线程。在Thread类中提供了一个setDaemon(true)方法来将一个普通的线程(用户线程)设置为守护线

public final void setDaemon(boolean on);

如果你希望在主线程结束后JVM进程马上结束,那么在创建线程时可以将其设置为守护线程,如果你希望在主线程结束后子线程继续工作,等子线程结束后再让JVM进程结束,那么就将子线程设置为用户线程。

作者 east
Java, 数据库 12月 14,2021

华为大数据平台FusionInsight HD Redis批量删除key方法

spark程序突然跑不起来,排查后发现原来是内存满了。(可以通过redis客户端的 info memory命令查看)

./redis-cli -h 127.0.0.1 -p 6379
info memory

网上查到的批量方案

./redis-cli -h 127.0.0.1 -p 6379 keys "mykeys*" | xargs ./redis-cli -h 127.0.0.1 -p 6379 del

运行后报错。好像在华为的FusionInsight HD集群这种方案行不通。

后来通过阅读源码,在ClusterUtil的类发现可以批量删除key的方法。

  public void batchDelete(String pattern, int tryTimes)
  {
    if (tryTimes <= 0) {
      throw new IllegalArgumentException("tryTimes must be greater than or equal to 0");
    }
    ScanParams scanRarams = new ScanParams().match(pattern).count(1000);
    Set<JedisPool> pools = this.jedisCluster.getServingNodes();
    CountDownLatch latch = new CountDownLatch(pools.size() * tryTimes);
    try
    {
      for (int i = 0; i < tryTimes; i++) {
        for (JedisPool jedisPool : pools) {
          this.threadPool.submit(new DelRunnable(jedisPool, scanRarams, latch));
        }
      }
      latch.await();
    }
    catch (InterruptedException e)
    {
      throw new JedisException(e);
    }
  }

作者 east
shardingsphere 12月 10,2021

Shardingsphere 4.0.0-RC1使用遇到的坑

1、使用pagehelper后跨表查询奇慢无比

shardingsphere后面的版本还没试过,在目前版本时,结合mybatis + pagehelper组件是有存在问题的。在
Shardingsphere 4.0.0-RC1和pagehelper分页时报错解决 一文解决了冲突,没想到后来又发现新的问题,在进行分表时,每个表数据几百万,查询跨几个表时,发现要等几分钟才出来结果。这样比单库查询加索引查还慢,一时很崩溃,本来分库分表为了提高速度,现在反而速度更慢了。后来在查询前把

PageHelper.startPage(pageNum, pageSize);

注释掉,刷刷一下子出来结果了。看来如果要实现分页还是自己实现。

2、查询语句太复杂,不走分片规则,进行全库查询

原来sql语句片段如下:

(VALUE1 LIKE concat(#(value,jdbcType=VARCHAR},%) AND TYPE = 1) OR  
(VALUE2 LIKE concat(#(value,jdbcType=VARCHAR},%) AND TYPE = 2) OR
(VALUE2 LIKE concat(#(value,jdbcType=VARCHAR},%) AND TYPE = 3)

没有调用到分片规则,后来把sql改简单后就正常了。例如下面的

TYPE != 4 AND VALUE1 LIKE concat(#(value,jdbcType=VARCHAR},%) OR VALUE2 LIKE concat(#(value,jdbcType=VARCHAR},%)

作者 east
Spark 12月 6,2021

Spark Streaming多个输入流

由于业务需要,一个地方部署1个Spark Streaming程序,由于业务扩展部署了多个地方,导致大数据平台的yarn资源不足了,CPU和内存经常是100%的。而且多套只是配置不同的程序,一旦有修改,维护起来也不方便。于是想到提升Spark Streaming的并行度,同时接收多个Dstream的输入。

通过网络接收数据(如Kafka、Flume、套接字等)需要将数据反序列化并存储在Spark上,如果数据接收成为系统中的瓶颈,则需要并行接收数据。主要通过提升Receiver的并发度和调整Receiver的RDD数据分区时间隔。提升Receiver的并发度:在Worker节点上对每个输入DStream创建一个Receiver并运行,以接收一个数据流。通过创建多个输入DStream并配置从数据源接收不同分区的数据流,从而实现接收多数据流。例如,一个单Kafka输入DStream接收两个主题的数据,可以分成两个Kafka的输入流,每个仅仅接收一个主题。输入DStream运行在两个Worker节点的接收器上,从而能够并行接受并行,提高整体的吞吐量。多DStream可以通过联合(union)在一起从而创建一个DStream,这样一些应用在一个输入DStream的转换操作便可以用在联合后的DStream上。

JavaDstream<string> sources1=ssc.receiverstream(new JavacustomReceiver2(ip1, port, StorageLevel.MEMORY_ONLY-2()));

JavaDStream<String> sources2 = ssc.receiverStream(new JavaCustomReceiver2(ip2, port, StorageLevel.MEMORY_ONLY-2()));
JavaDStream<String> sources3 = ssc.receiverstream(new JavaCustomreceiver2(whip, port, StorageLeve1.MEMORY_ONLY-2()));

Javadstream<string> sources3 = ssc.socketTextstream(ip3, port, storagetevel.MEMORY ONLY2())); 
JavaDStream<String> sources = sources1.union(sources2).union(sources3);
作者 east
shardingsphere 11月 28,2021

Shardingsphere 4.0.0-RC1和pagehelper分页时报错解决

项目架构使用Shardingsphere和pagehelper架构,对一个复杂sql语句,运行后报错“Must have sharding column with subquery. "

原来 PageHelper里面有个机制是,当解析的sql比较复杂的时候,会加上别名,而Sharding-jdbc执行这个带有别名的sql会报错。如果不用pageHelper,自己来分页是可以避免这个问题。但这样做比较麻烦。

解决办法是在另加一个XXX_COUNT的sql,不要让PageHelper给原始sql加上别名。 官网的做法解释如下:

增加 countSuffix count 查询后缀配置参数,该参数是针对 PageInterceptor 配置的,默认值为 _COUNT。

分页插件会优先通过当前查询的 msId + countSuffix 查找手写的分页查询。

如果存在就使用手写的 count 查询,如果不存在,仍然使用之前的方式自动创建 count 查询。

例如,如果存在下面两个查询:

<select id="selectLeftjoin" resultType="com.github.pagehelper.model.User">     select a.id,b.name,a.py from user a     left join user b on a.id = b.id     order by a.id
</select>

<select id="selectLeftjoin_COUNT" resultType="Long">
select count(distinct a.id) from user a left join user b on a.id = b.id
</select>

上面的 countSuffix 使用的默认值 _COUNT,分页插件会自动获取到 selectLeftjoin_COUNT 查询,这个查询需要自己保证结果数正确。

返回值的类型必须是resultType="Long",入参使用的和 selectLeftjoin 查询相同的参数,所以在 SQL 中要按照 selectLeftjoin 的入参来使用。

因为 selectLeftjoin_COUNT 方法是自动调用的,所以不需要在接口提供相应的方法,如果需要单独调用,也可以提供。

作者 east
Java 11月 25,2021

优化后台接口经验总结

开发一个后台接口不难,可能花几个小时就能跑通;而做一个好用的接口,可能要花几天时间精雕细磨。尤其是业务复杂,数据量大的,如果没有优化导致速度很慢,用户根本没耐心等。

下面谈谈优化思路:

1、串行操作改为并发操作。如果接口有串行操作,而且其中一些操作是比较耗时的,而且它们的操作没有因果关系需要等待前面的结果,那么可以把这些操作改为并发线程去操作。使用多线程遇到坑会多一些,例如线程池使用FutureTask时如果把拒绝策略设置为DiscardPolicy和DiscardOldestPolicy,并且在被拒绝的任务的Future对象上调用了无参get方法,那么调用线程会一直被阻塞。在日常开发中尽量使用带超时参数的get方法以避免线程一直阻塞。

2、对一些常用的可复用的数据库查询加上缓存。做一个系统需要经常查询点位信息,这些点位信息不会经常变,对相关的数据库查询加上缓存。如果用mybatis,可以考虑直接在xml上添加,或者利用redis进行添加。

3、数据的优化,如果单表数据过大,可以考虑进行分库分表。

4、对查询慢的语句,考虑加上相关的索引。频繁单个查询或插入,考虑是否能改为批量操作。对一些频繁查询可复用的,可以考虑一次批量查询出来并缓存起来。

5、考虑是否可以后台先计算出中间结果或最终结果,用户查询时不用从头开始计算。在接口查询计算过程,如果有频繁重复计算的,可以考虑采用备忘录算法。

作者 east
Java, 运维 11月 7,2021

一次诡异系统变慢排查

给客户上线了系统(Spring Cloud、微服务,centos上运行),运行1、2年后,客户投诉系统很慢。

自己打开系统,刚开始很快,用一些时间就变慢。看前端请求的接口,是挂起状态,有的要几分钟才有结果。

检查服务器内存和CPU。这是引起系统慢常见的问题,发现这2方面改善后还是变慢,后来干脆重启服务器,依然无解。

检查磁盘坏道。之前用电脑时,如果磁盘有坏道,如果有写操作,有时也会为坏道。用centos检查坏道的命令也没发现。

检查接口代理。由于是前后端分离,用户在前端的请求,都经过第三方代理。如果直接测后台接口是很快,但通过前端访问就变慢了,于是怀疑是第三方代理搞的鬼。于是咨询第三方代理,第三方代理说他们服务的客户,都没有发现这种情况,建议我们排查网络。

检查前端。由于后来又上线几个类似的系统,前端基本一样的,没有安装新的插件。所以觉得可能性不大。

检查浏览器。网上有的说是chrome浏览器早期的bug,如果是已经请求过的接口,会复用之前的。想找不同浏览器或更新到最新chrome的。但几个一样的系统,用同样浏览器也没有变慢,也解释不通。

终极答案。经反复排查,最后发现是前端有个页面每几秒钟请求1次,而请求相关的数据库年长月久数据很多,数据库不能及时响应请求。后来根据业务需要改成几分钟请求1次,果然这个诡异问题没再出现。

作者 east
flume 7月 10,2021

Flume对接数据遇到的坑

业务是这样的:别的地方服务器通过ftp传来一些压缩包,对压缩包进行解压,然后flume进行采集发到kafka,spark Streaming进行处理。

进行解压的脚本代码如下:

#/bin/bash
end = "${dirname "$0"}"/"$1"
source = "${dirname "$0"}"/"$2"
final =  "${dirname "$0"}"/"$3"
while [2 -gt 1 ]
do
   for i in 'ls ${source}‘
   do
   if [[ ${i} != "" && ${i} != *.tmp ]]
   then
       unzip ${source}/${i} -d ${end}/
       mv -f ${source}/${i} ${final}
   fi
done
sleep 5

原本一直持续能解压文件,最近出现停止。

执行命令时,还出现提示是否覆盖。这时才明白可能是这个原因导致脚本没能顺利执行。为了不提示是否覆盖,可以加参数 -o。修改命令如下:

unzip -o ${source}/${i} -d ${end}/

作者 east

上一 1 … 25 26 27 … 41 下一个

关注公众号“大模型全栈程序员”回复“小程序”获取1000个小程序打包源码。回复”chatgpt”获取免注册可用chatgpt。回复“大数据”获取多本大数据电子书

标签

AIGC AI创作 bert chatgpt github GPT-3 gpt3 GTP-3 hive mysql O2O tensorflow UI控件 不含后台 交流 共享经济 出行 图像 地图定位 外卖 多媒体 娱乐 小程序 布局 带后台完整项目 开源项目 搜索 支付 效率 教育 日历 机器学习 深度学习 物流 用户系统 电商 画图 画布(canvas) 社交 签到 联网 读书 资讯 阅读 预订

官方QQ群

小程序开发群:74052405

大数据开发群: 952493060

近期文章

  • AUTOSAR如何在多个供应商交付的配置中避免ARXML不兼容?
  • C++thread pool(线程池)设计应关注哪些扩展性问题?
  • 各类MCAL(Microcontroller Abstraction Layer)如何与AUTOSAR工具链解耦?
  • 如何设计AUTOSAR中的“域控制器”以支持未来扩展?
  • C++ 中避免悬挂引用的企业策略有哪些?
  • 嵌入式电机:如何在低速和高负载状态下保持FOC(Field-Oriented Control)算法的电流控制稳定?
  • C++如何在插件式架构中使用反射实现模块隔离?
  • C++如何追踪内存泄漏(valgrind/ASan等)并定位到业务代码?
  • C++大型系统中如何组织头文件和依赖树?
  • 如何进行AUTOSAR模块的持续集成(CI)部署与版本控制?

文章归档

  • 2025年5月
  • 2025年4月
  • 2025年3月
  • 2025年2月
  • 2025年1月
  • 2024年12月
  • 2024年11月
  • 2024年10月
  • 2024年9月
  • 2024年8月
  • 2024年7月
  • 2024年6月
  • 2024年5月
  • 2024年4月
  • 2024年3月
  • 2023年11月
  • 2023年10月
  • 2023年9月
  • 2023年8月
  • 2023年7月
  • 2023年6月
  • 2023年5月
  • 2023年4月
  • 2023年3月
  • 2023年1月
  • 2022年11月
  • 2022年10月
  • 2022年9月
  • 2022年8月
  • 2022年7月
  • 2022年6月
  • 2022年5月
  • 2022年4月
  • 2022年3月
  • 2022年2月
  • 2022年1月
  • 2021年12月
  • 2021年11月
  • 2021年9月
  • 2021年8月
  • 2021年7月
  • 2021年6月
  • 2021年5月
  • 2021年4月
  • 2021年3月
  • 2021年2月
  • 2021年1月
  • 2020年12月
  • 2020年11月
  • 2020年10月
  • 2020年9月
  • 2020年8月
  • 2020年7月
  • 2020年6月
  • 2020年5月
  • 2020年4月
  • 2020年3月
  • 2020年2月
  • 2020年1月
  • 2019年7月
  • 2019年6月
  • 2019年5月
  • 2019年4月
  • 2019年3月
  • 2019年2月
  • 2019年1月
  • 2018年12月
  • 2018年7月
  • 2018年6月

分类目录

  • Android (73)
  • bug清单 (79)
  • C++ (34)
  • Fuchsia (15)
  • php (4)
  • python (42)
  • sklearn (1)
  • 云计算 (20)
  • 人工智能 (61)
    • chatgpt (21)
      • 提示词 (6)
    • Keras (1)
    • Tensorflow (3)
    • 大模型 (1)
    • 智能体 (4)
    • 深度学习 (14)
  • 储能 (44)
  • 前端 (4)
  • 大数据开发 (484)
    • CDH (6)
    • datax (4)
    • doris (28)
    • Elasticsearch (15)
    • Flink (78)
    • flume (7)
    • Hadoop (19)
    • Hbase (23)
    • Hive (40)
    • Impala (2)
    • Java (71)
    • Kafka (10)
    • neo4j (5)
    • shardingsphere (6)
    • solr (5)
    • Spark (99)
    • spring (11)
    • 数据仓库 (9)
    • 数据挖掘 (7)
    • 海豚调度器 (9)
    • 运维 (33)
      • Docker (2)
  • 小游戏代码 (1)
  • 小程序代码 (139)
    • O2O (16)
    • UI控件 (5)
    • 互联网类 (23)
    • 企业类 (6)
    • 地图定位 (9)
    • 多媒体 (6)
    • 工具类 (25)
    • 电商类 (22)
    • 社交 (7)
    • 行业软件 (7)
    • 资讯读书 (11)
  • 嵌入式 (70)
    • autosar (63)
    • RTOS (1)
    • 总线 (1)
  • 开发博客 (16)
    • Harmony (9)
  • 技术架构 (6)
  • 数据库 (32)
    • mongodb (1)
    • mysql (13)
    • pgsql (2)
    • redis (1)
    • tdengine (4)
  • 未分类 (6)
  • 程序员网赚 (20)
    • 广告联盟 (3)
    • 私域流量 (5)
    • 自媒体 (5)
  • 量化投资 (4)
  • 面试 (14)

功能

  • 登录
  • 文章RSS
  • 评论RSS
  • WordPress.org

All Rights Reserved by Gitweixin.本站收集网友上传代码, 如有侵犯版权,请发邮件联系yiyuyos@gmail.com删除.