gitweixin
  • 首页
  • 小程序代码
    • 资讯读书
    • 工具类
    • O2O
    • 地图定位
    • 社交
    • 行业软件
    • 电商类
    • 互联网类
    • 企业类
    • UI控件
  • 大数据开发
    • Hadoop
    • Spark
    • Hbase
    • Elasticsearch
    • Kafka
    • Flink
    • 数据仓库
    • 数据挖掘
    • flume
    • Kafka
    • Hive
    • shardingsphere
    • solr
  • 开发博客
    • Android
    • php
    • python
    • 运维
    • 技术架构
    • 数据库
  • 程序员网赚
  • bug清单
  • 量化投资
  • 在线查询工具
    • 去行号
    • 在线时间戳转换工具
    • 免费图片批量修改尺寸在线工具
    • SVG转JPG在线工具

分类归档大数据开发

精品微信小程序开发门户,代码全部亲测可用

  • 首页   /  
  • 分类归档: "大数据开发"
  • ( 页面25 )
Hadoop, Spark 3月 23,2022

企业生产环境考虑:Spark 全方位对比 Hadoop MapReduce

Apache Spark 与 Hadoop MapReduce 的五个主要区别:
1、Apache Spark 可能比 Hadoop MapReduce 快 100 倍。
2、Apache Spark 使用 内存,并且不依赖于 Hadoop 的两阶段范式。
3、Apache Spark 适用于可以全部放入服务器 内存 的较小数据集。
4、Hadoop 处理海量数据集更具成本效益。
5、Apache Spark 现在比 Hadoop MapReduce 更受欢迎。
多年来,Hadoop 一直是大数据无可争议的首选——直到 Spark 出现。自 2014 年首次发布以来,Apache Spark 一直在点燃大数据世界。凭借 Spark 便捷的 API 和承诺的速度比 Hadoop MapReduce 快 100 倍,一些分析人士认为,Spark 标志着大数据新时代的到来。

Spark 是一个开源数据处理框架,如何能够如此快速地处理所有这些信息?秘诀在于 Spark 在集群上运行在内存中,它不依赖于 Hadoop 的 MapReduce 两阶段范式。这使得重复访问相同数据的速度更快。 Spark 可以作为独立应用程序运行,也可以在 Hadoop YARN 之上运行,它可以直接从 HDFS 读取数据。雅虎、英特尔、百度、Yelp 和 Zillow 等数十家主要科技公司已经将 Spark 作为其技术堆栈的一部分。

虽然 Spark 似乎注定要取代 Hadoop MapReduce,但您现在还不应该指望 MapReduce。在这篇文章中,我们将比较这两个平台,看看 Spark 是否真的非常有优势。

什么是 Apache Spark?
Apache Spark 是“用于大规模数据处理的统一分析引擎”。 Spark 由非营利性的 Apache Software Foundation 维护,该基金会已经发布了数百个开源软件项目。自项目启动以来,已有 1200 多名开发人员为 Spark 做出了贡献。

Spark 最初是在加州大学伯克利分校的 AMPLab 开发的,于 2010 年首次作为开源项目发布。Spark 使用 Hadoop MapReduce 分布式计算框架作为其基础。 Spark 旨在改进 MapReduce 项目的几个方面,例如性能和易用性,同时保留 MapReduce 的许多优点。

Spark 包括一个核心数据处理引擎,以及用于 SQL、机器学习和流处理的库。凭借适用于 Java、Scala、Python 和 R 的 API,Spark 在开发人员中享有广泛的吸引力——为其赢得了大数据处理领域“瑞士军刀”的美誉。

什么是 Hadoop MapReduce?
Hadoop MapReduce 将自己描述为“一个用于轻松编写应用程序的软件框架,该应用程序以可靠、容错的方式在大型商用硬件集群(数千个节点)上并行处理大量数据(多 TB 数据集)。”

MapReduce 范式由两个顺序任务组成:Map 和 Reduce(因此得名)。 Map 过滤和排序数据,同时将其转换为键值对。然后,Reduce 接受此输入并通过对数据集执行某种汇总操作来减小其大小。

MapReduce 可以通过分解大型数据集并并行处理它们来极大地加速大数据任务。 MapReduce 范式由 Google 员工 Jeff Dean 和 Sanjay Ghemawat 于 2004 年首次提出;后来它被整合到 Apache 的 Hadoop 框架中以进行分布式处理。

Spark 和 MapReduce 的区别
Apache Spark 和 Hadoop MapReduce 之间的主要区别是:

>性能
>易于使用
>数据处理
>安全
然而,Spark 和 MapReduce 之间也有一些相似之处——这并不奇怪,因为 Spark 使用 MapReduce 作为其基础。 Spark 和 MapReduce 的相似点包括:

>成本
>兼容性
>容错
下面,我们将在每个部分详细介绍 Spark 和 MapReduce 之间的差异(以及相似之处)。

Spark VS MapReduce:性能
Apache Spark 在随机存取存储器 (RAM) 中处理数据,而 Hadoop MapReduce 在执行映射或归约操作后将数据持久化回磁盘。那么理论上,Spark 的性能应该优于 Hadoop MapReduce。尽管如此,Spark 需要大量内存。与标准数据库非常相似,Spark 将进程加载到内存中并保留在那里,直到进一步通知以进行缓存。如果您在 Hadoop YARN 上运行 Spark 和其他需要资源的服务,或者如果数据太大而无法完全放入内存,那么 Spark 可能会遭受严重的性能下降。

MapReduce 会在作业完成后立即终止其进程,因此它可以轻松地与性能差异很小的其他服务一起运行。

对于需要多次传递相同数据的迭代计算,Spark 具有优势。但是,当涉及到类似 ETL 的一次性作业时——例如,数据转换或数据集成——这正是 MapReduce 的设计目的。

小结:当所有数据都适合内存时,Spark 性能更好,尤其是在专用集群上。 Hadoop MapReduce 专为无法放入内存的数据而设计,并且可以与其他服务一起很好地运行。

Spark VS Hadoop MapReduce:易用性
Spark 为 Java、Scala 和 Python 提供了预构建的 API,还包括用于 SQL 的 Spark SQL(以前称为 Shark)。由于 Spark 的简单构建块,编写用户定义的函数很容易。 Spark 甚至包括用于运行命令并立即反馈的交互模式。

MapReduce 是用 Java 编写的,并且非常难以编程。 Apache Pig 让它变得更容易(尽管它需要一些时间来学习语法),而 Apache Hive 则增加了 SQL 兼容性。一些 Hadoop 工具也可以在没有任何编程的情况下运行 MapReduce 作业。

此外,虽然 Hive 包含命令行界面,但 MapReduce 没有交互模式。 Apache Impala 和 Apache Tez 等项目希望将完整的交互式查询引入 Hadoop。

在安装和维护方面,Spark 不受 Hadoop 的约束。 Spark 和 Hadoop MapReduce 都包含在 Hortonworks (HDP 3.1) 和 Cloudera (CDH 5.13) 的发行版中。

小结:Spark 更易于编程,并且包含交互模式。 Hadoop MapReduce 更难编程,但有几个工具可以使它更容易。

Spark VS Hadoop MapReduce:成本
Spark 和 MapReduce 是开源解决方案,但您仍然需要在机器和人员上花钱。 Spark 和 MapReduce 都可以使用商品服务器并在云上运行。此外,这两种工具都有相似的硬件要求:


Spark 集群中的内存至少应该与您需要处理的数据量一样大,因为数据必须适合内存才能获得最佳性能。如果您需要处理大量数据,Hadoop 肯定是更便宜的选择,因为硬盘空间比内存空间便宜得多。

另一方面,考虑到 Spark 和 MapReduce 的性能,Spark 应该更划算。 Spark 需要更少的硬件来更快地执行相同的任务,尤其是在计算能力按使用付费的云服务器上。

人员配备问题呢?尽管 Hadoop 自 2005 年就已经存在,但市场上仍然缺乏 MapReduce 专家。根据 Gartner 的一份研究报告,57% 的使用 Hadoop 的组织表示“获得必要的技能和能力”是他们最大的 Hadoop 挑战。

那么这对于自 2010 年才出现的 Spark 来说意味着什么呢?虽然它可能有更快的学习曲线,但 Spark 也缺乏合格的专家。好消息是,有大量 Hadoop 即服务产品和基于 Hadoop 的服务(如 Integrate.io 自己的数据集成服务),这有助于缓解这些硬件和人员配备要求。同时,Spark 即服务选项可通过 Amazon Web Services 等提供商获得。

小结:根据基准,Spark 更具成本效益,但人员配备成本可能更高。 Hadoop MapReduce 可能会更便宜,因为可用的人员更多,而且对于海量数据量来说可能更便宜。

Spark VS Hadoop MapReduce:兼容性
Apache Spark 可以作为独立应用程序在 Hadoop YARN 或 Apache Mesos 内部部署或云中运行。 Spark 支持实现 Hadoop 输入格式的数据源,因此它可以与 Hadoop 支持的所有相同数据源和文件格式集成。

Spark 还通过 JDBC 和 ODBC 与商业智能工具一起工作。

底线:Spark 对各种数据类型和数据源的兼容性与 Hadoop MapReduce 相同。

Spark vs Hadoop MapReduce:数据处理
Spark 可以做的不仅仅是简单的数据处理:它还可以处理图形,它包括 MLlib 机器学习库。由于其高性能,Spark 可以进行实时处理和批处理。 Spark 提供了一个“一刀切”的平台供您使用,而不是在不同的平台上拆分任务,这会增加您的 IT 复杂性。

Hadoop MapReduce 非常适合批处理。如果你想要一个实时选项,你需要使用另一个平台,比如 Impala 或 Apache Storm,而对于图形处理,你可以使用 Apache Giraph。 MapReduce 曾经有 Apache Mahout 用于机器学习,但后来被 Spark 和 H2O 抛弃了。

小结:Spark 是数据处理的瑞士军刀,而 Hadoop MapReduce 是批处理的突击刀。

Spark vs Hadoop MapReduce:容错
Spark 具有每个任务的重试和推测执行,就像 MapReduce 一样。尽管如此,MapReduce 在这里有一点优势,因为它依赖于硬盘驱动器,而不是 RAM。如果 MapReduce 进程在执行过程中崩溃,它可以从中断的地方继续,而 Spark 必须从头开始处理。

小结:Spark 和 Hadoop MapReduce 都具有良好的容错性,但 Hadoop MapReduce 的容错性稍强一些。

Spark VS Hadoop MapReduce:安全性
在安全性方面,与 MapReduce 相比,Spark 没有那么先进。事实上,Spark 中的安全性默认设置为“关闭”,这会使您容易受到攻击。 RPC 通道支持通过共享密钥在 Spark 中进行身份验证。 Spark 将事件日志记录作为一项功能,并且可以通过 javax servlet 过滤器保护 Web UI。此外,由于 Spark 可以运行在 YARN 上并使用 HDFS,因此还可以享受 Kerberos 身份验证、HDFS 文件权限以及节点之间的加密。

Hadoop MapReduce 可以享受所有 Hadoop 安全优势并与 Hadoop 安全项目集成,例如 Knox Gateway 和 Apache Sentry。旨在提高 Hadoop 安全性的 Project Rhino 仅在添加 Sentry 支持方面提到了 Spark。否则,Spark 开发人员将不得不自己提高 Spark 的安全性。

小结:与具有更多安全功能和项目的 MapReduce 相比,Spark 安全性仍然欠发达。

Spark 的常用场景
虽然两者都是大规模数据处理的强大选项,但某些情况下,其中一种比另一种更理想。

流数据处理
随着公司走向数字化转型,他们正在寻找实时分析数据的方法。 Spark 的内存数据处理使其成为处理流数据的理想选择。 Spark Streaming 是 Spark 的一个变体,它使这个用例成为可能。那么,公司可以通过哪些方式利用 Spark Streaming?

流式 ETL – 在传统的 ETL 过程中,数据被读取、转换为兼容格式并保存到目标数据存储中。使用 Streaming ETL 的过程效率更高,因为数据在保存到目标数据存储之前会在内存中不断清理和聚合。

数据丰富——公司在尝试适应和提供更增强的客户体验时处于不断变化的状态。通过将实时数据与静态数据相结合,公司可以构建更可靠的客户画像,从而为他们提供个性化体验。

触发事件检测——实时响应事件的能力是一项重要的业务能力,有助于提高敏捷性和适应变化的能力。借助 Spark Streaming,公司可以实时分析数据,以识别需要立即关注的异常活动。

机器学习
在预测分析方面,Spark 的机器学习库 (MLib) 提供了一套强大的工具,可以轻松完成它。当用户对一组数据进行重复查询时,他们本质上是在构建类似于机器学习的算法。例如,机器学习可以帮助公司出于营销目的进行客户细分。它还可以帮助执行情绪分析。

交互式查询
想象一下能够对实时数据执行交互式查询。从本质上讲,您可以分析大型数据集,而无需依赖外部数据存储来处理信息。使用 Spark Streaming,您可以查询数据流,而无需将其持久化到外部数据库。

MapReduce 的常用场景
当处理对于内存中操作来说太大的数据时,MapReduce 是要走的路。因此,MapReduce 最适合处理大型数据集。

处理大型数据集(PB或TB)
考虑到实施和维护所需的时间和费用,千兆字节大小不足以证明 MapReduce 的合理性。希望管理PB或TB数据的组织是 MapReduce 的理想选择。

以不同格式存储数据
公司可以使用 MapReduce 处理多种文件类型,例如文本、图像、纯文本等。由于这些文件对于内存中的处理来说太大了,使用 MapReduce 进行批处理更经济。

数据处理
MapReduce 具有对大型数据集执行基本和复杂分析的强大功能。通过使用基于磁盘的存储而不是内存中的处理,对大型数据集进行汇总、过滤和连接等任务的效率要高得多。

Spark 与 Hadoop MapReduce 趋势

随着公司寻找在拥挤的市场中保持竞争力的新方法,他们将需要适应即将到来的数据管理趋势。这些趋势包括:

XOps – 使用 DevOps 的最佳实践,XOps 的目标是在数据管理过程中实现可靠性、可重用性和可重复性。

Data Fabric – 作为一个架构框架,Data Fabric 的目标是在一个无缝的数据管理平台中结合多种类型的数据存储、分析、处理和安全性

数据分析作为核心业务功能 – 传统上,数据管理由一个单独的团队处理,该团队分析数据并将其提供给关键业务领导者。然而,一种新方法将这些数据直接交到组织领导者手中,这样他们就可以立即访问这些信息以进行决策。

结论
Apache Spark 可能比 Hadoop MapReduce 快 100 倍。
Apache Spark 使用 RAM,并且不依赖于 Hadoop 的两阶段范式。
Apache Spark 适用于可以全部放入服务器 RAM 的较小数据集。
Hadoop 处理海量数据集更具成本效益。
Apache Spark 现在比 Hadoop MapReduce 更受欢迎。
Apache Spark 是大数据平台上闪亮的新玩具,但仍有使用 Hadoop MapReduce 的用例。无论您选择 Apache Spark 还是 Hadoop MapReduce,

Spark具有出色的性能,并且具有很高的成本效益,这得益于其内存数据处理。它与 Hadoop 的所有数据源和文件格式兼容,并且学习曲线更快,并具有可用于多种编程语言的友好 API。 Spark 甚至包括图形处理和机器学习功能。

Hadoop MapReduce 是一个更成熟的平台,它是专门为批处理而构建的。对于无法放入内存的超大数据,MapReduce 比 Spark 更具成本效益,而且可能更容易找到具有 MapReduce 经验的员工。此外,由于许多支持项目、工具和云服务,MapReduce 生态系统目前更大。

但即使你认为 Spark 看起来像这里的赢家,你也很可能不会单独使用它。您仍然需要 HDFS 来存储数据,并且您可能想要使用 HBase、Hive、Pig、Impala 或其他 Hadoop 项目。这意味着您仍然需要与 Spark 一起运行 Hadoop 和 MapReduce 以获得完整的大数据包。

作者 east
Elasticsearch, spring 3月 22,2022

Spring Boot直接输出到Logstash

Spring Boot应用程序可以直接远程输出到Logstash。这里以Logback日志为例,新建项目,在项目中加入Logstash依赖。

1、 要使用logback一个插件来将数据转成json,引入maven配置

dependency>

      <groupId>net.logstash.logback</groupId>

      <artifactId>logstash-logback-encoder</artifactId>

     <version>5.3</version>

</dependency>



2、配置 logback-spring.xml

接下来,在src/resources目录下创建logback-spring.xml配置文件,在配置文件中将对日志进行格式化,并且输出到控制台和Logstash。需要注意的是,在destination属性中配置的地址和端口要与Logstash输入源的地址和端口一致,比如这里使用的是127.0.0.1:4560,则在Logstash输入源中要与这个配置一致。其中logback-spring.xml内容如

<xml version="1.0" encoding="UTF-8"?>

    <configuration scan="true" scanPeriod="60 seconds" debug="true">

         <contextName>logstash-test</contextName>

   <!-- 这个是控制台日志输出格式 方便调试对比--->

    <appender name="console" class="ch.qos.logback.core.ConsoleAppender">

      <encoder>

          <pattern>%d{yyyy-MM-dd HH:mm:ss} %contextName %-5level %logger{50} -%msg%n</pattern>

   </encoder>

</appender>

<appender name="stash" class="net.logstash.logback.appender.LogstashTcpSocketAppender">

  <destination>127.0.0.1:4560</destination> 这是是logstash服务器地址 端口

  <encoder class="net.logstash.logback.encoder.LogstashEncoder" /> 输出的格式,推荐使用这个

</appender>

<root level="info">

   <appender-ref ref="console"/>

   <appender-ref ref="stash"/>

</root>

启动项目,就会请求127.0.0.1:4560,如果有监听,就会自动发送日志。

3、logstash配置

input {

             tcp {

                        host => "localhost" #这个一定要是logstash本机ip,不然logstash 无法启动,也可以去除

                        port => 4560

                       codec => json_lines

                       mode => "server"

                  }

}

filter {

                 grok {

               match => {

"message" => "%{URIPATH:request} %{IP:clientip} %{NUMBER:response:int} \"%{WORD:sources}\" (?:%{URI:referrer}|-) \[%{GREEDYDATA:agent}\] \{%{GREEDYDATA:params}\}"

     }

}


output {

       stdout { codec => rubydebug } #标准输出,在命令行中输出方便调试

elasticsearch { hosts => [ "localhost:9200" ]

index => "pybbs"

document_type => "weblog"

    }

}

如果数据特别多,上述方案就会为Elasticsearch带来很大的压力。为了缓解Elasticsearch的压力,可以将Logstash收集的内容不直接输出到Elasticsearch中,而是输出到缓冲层,比如Redis或者Kafka,然后使用一个Logstash从缓冲层输出到Elasticsearch。当然,还有很多种方案进行日志收集,比如使用Filebeat替换Logstash等。笔者在生产环节搭建过ELK配置,这里提几点建议:

(1)根据日志量判断Elasticsearch的集群选择,不要盲目追求高可用,实际应用需要根据实际场景的预算等因素使用。

(2)缓冲层选择,一般来说选择Kafka和Redis。虽然Kafka作为日志消息很适合,具备高吞吐量等,但是如果需求不是很大,并且环境中不存在Kafka,就没有必要使用Kafka作为消息缓存层,使用现有的Redis也未尝不可。

(3)内存分配,ELK三者部署都是占有一定内存的,并且官网建议的配置都很大。建议结合场景来修改配置,毕竟预算是很重要的一环。

作者 east
Elasticsearch, neo4j, solr 3月 22,2022

neo4j、solr、es数据同步和增量更新

一、Neo4j的数据增量更新:

图数据除了节点的增量更新还牵扯到边的增量更新,节点其实还好说,无论是新增节点还是新增节点属性或者是节点属性更新,实际上都是在节点表可以完成,不是很消耗资源;比较复杂且影响更新效率的其实边的增量更新。

1、节点更新

这个需求其实很普遍,比如我有一个节点:

(n:Person {id: 'argan', name: 'argan', age: 32})

然后用户又传递了一个人数据过来:

{id: 'argan', age: 30, sex: 'male', email: 'arganzheng@gmail.com'} 

可以看到更新了一个属性:年龄,新增了两个属性:性别和电子邮件。我们希望最后的结果是:

(n:Person {id: 'argan', name: 'argan', age: 30, sex: 'male', email: 'arganzheng@gmail.com'})。

需要注意的是name是没有传递的,但还是保留着的。如果要删除一个属性,需要把它的值显式的设置为空。

在Neo4j的要怎么做到呢?

Neo4j的提供了合并语句来实现这个功能。

与ON CREATE和ON MATCH合并

如果需要创建节点,请合并节点并设置属性。

MERGE (n:Person { id: 'argan' })
ON CREATE SET n.created = timestamp()
ON MATCH SET n.lastAccessed = timestamp()
RETURN n.name, n.created, n.lastAccessed

上面的例子可以这么写:

MERGE (n:Node {id: 'argan'}) SET n += {id: 'argan', age: 30, sex: 'male', email: 'arganzheng@gmail.com'} 
RETURN n 

因为这里采用了+=本身就是合并属性,所以区分不需要的英文ON CREATE还是ON MATCH。

同样关系也可以用合并保证只创建一次:

MATCH (n), (m) WHERE n.id = "argan" AND m.id = "magi" CREATE (n)-[:KNOWS]->(m)

写成这样子就可以保证唯一了:

MATCH (n:User {name: "argan"}), (m:User {name: "magi"}) MERGE (n)-[:KNOWS]->(m)

2、neo4j如何支持动态节点标签和关系类型?

上面的合并语句能够实现“存在更新,否则创建”的逻辑,但是还有一个问题没有解决,就是没有设置节点的标签。我们希望构建的节点数据完全是运行时根据用户提供的数据构造的,包括。标签比如用户提供如下数据:

:param batch: [{properties: {name: "argan", label: "Person", id: "1", age: 31}}, {properties: {name: "magi", label: "Person", id: "2", age: 28}}]

下面的暗号语句并没有设置节点的标签,虽然节点有一个叫做标签的属性:

UNWIND {batch} as row 
MERGE (n {id: row.id})
SET n += row.properties

那么我们能不能简单的指定标签呢?

UNWIND {batch} as row 
MERGE (n:row.properties.label {id: row.id})
SET n += row.properties

但是遗憾的是这个语句会报错,因为neo4j不支持动态的节点标签。把row.properties.label去掉或者改成一个固定的字符串就没有问题。

改成这样子也不行:

UNWIND {batch} as row   MERGE (n {id: row.id} )   SET n:row.properties.label,  n += row.properties

绑定变量也不行:

UNWIND {batch} as row   MERGE (n {id: row.id} )   SET n:{label},  n += row.properties

直接指定标签就可以了:

UNWIND {batch} as row   MERGE (n {id: row.id} )   SET n:Test,  n += row.properties

也就是说3.3.13.9。在节点上设置标签也并不支持动态标签..

笔记

neo4j的设置标签还有一个问题,就是它其实是新增标签,不是修改标签。要到更新的效果,你需要先删除掉,再新增..

MATCH (n) WHERE ID(n) = 14  REMOVE n:oldLabel SET n:newLabel

如果是单条数据更新,那其实很简单,我们只需要做字符串拼接就可以了:

String label = vertex.getLabel(); "MERGE (n:" + label + " {id: {id}} " + "SET n += {properties}"

但是关键是我们这里是在Neo4j的内部用开卷展开的服务端变量,如果它不允许动态变量,根本搞不定。难道真的要一条条的插入,那会非常慢的!Neo4j的的插入性能是众所周知的差。一种做法就是先批量插入数据,设置一个临时的标签,然后再批量的更新标签。不过需要两次操作,性能肯定至少慢两倍。

有没有什么方式呢?谷歌了很久,发现了也有人遇到这样的问题:功能请求:apoc支持MERGE节点和rels#271和是否可以使用数据驱动的节点或关系标签进行合并?。

原理跟单条数据插入一样,只是由于退绕是在服务端(Neo4j的)进行的,所以拼接也只能在服务端进行,怎么拼接的就是用?apoc.cypher.doIt拼接后让它在服务端执行:

UNWIND {batch} as row  WITH 'MERGE (n:' + row.properties.label + ' { id: row.id }) SET n += row.properties return n' AS cypher CALL apoc.cypher.doIt(cypher, {}) YIELD value return value.n

但是可惜,会报这样的异常:

org.neo4j.driver.v1.exceptions.ClientException: 
Failed to invoke procedure `apoc.cypher.doIt`: 
Caused by: org.neo4j.graphdb.QueryExecutionException: 
Variable `row` not defined (line 1, column 23 (offset: 22)) "MERGE (n:Person { id: row.id }) SET n += row.properties return n"

所以还是要分两步进行,不过可以合并在一起SET标签:传递标签名称作为参数:

UNWIND {batch} as row  MERGE (n { id: row.id }) 
SET n += row.properties  WITH n  CALL apoc.create.addLabels(id(n), [n.label]) 
YIELD node RETURN node

这样就可以正确的保存数据并且动态设置标签了。笔

本来我们是可以直接使用APOC库的apoc.merge.node状语从句:apoc.create.relationship动态的更新节点标签,关系和节点的。但是正如前面分析的,apoc.merge.node状语从句:apoc.create.relationship现在的实现其实的英文一个防重复CREATE而已,不能达到更新的目的。否则我们的实现将非常简单明了:

更新节点:

UWNIND {batch} as row CALL apoc.merge.node(row.labels, {id: row.id} , row.properties) 
yield node RETURN count(*)

更新关系:

UWNIND {batch} as row MATCH (from) WHERE id(from) = row.from 
MATCH (to:Label) where to.key = row.to CALL apoc.merge.relationship(from, row.type, {id: row.id}, row.properties, to) 
yield rel RETURN count(*)

一种做法就是叉一个分支出来,修改源码,部署自己的罐子包。

二、solr 的增量更新

1.首先要弄懂几个必要的属性,以及数据库建表事项,和dataimporter.properties 、data-config.xml里面的数据

<!–  transformer 格式转化:HTMLStripTransformer 索引中忽略HTML标签   —> 
  <!–  query:查询数据库表符合记录数据   —> 
  <!–  deltaQuery:增量索引查询主键ID    —>    注意这个只能返回ID字段 
  <!–  deltaImportQuery:增量索引查询导入数据  —> 
  <!–  deletedPkQuery:增量索引删除主键ID查询  —> 注意这个只能返回ID字段

2.数据库配置注意事项

1.如果只涉及添加,与修改业务,那么数据库里只需额外有一个timpstamp字段 
就可以了,默认值为当前系统时间,CURRENT_TIMESTAMP
2.如果还涉及删除业务,那么数据里就需额外再多添加一个字段isdelete,int类型的 
用0,1来标识,此条记录是否被删除

3.dataimporter.properties 

这个配置文件很重要,它是用来记录当前时间与上一次修改时间的,通过它能够找出,那些,新添加的,修改的,或删除的记录标识,此条记录是否被删除的记录

4.增量更新就是在全量更新的基础上加上一些配置,配置如下:

<?xml version="1.0" encoding="UTF-8" ?>
<dataConfig> 
    <!--数据源-->
    <dataSource type="JdbcDataSource"
                driver="com.mysql.jdbc.Driver"
                url="jdbc:mysql://192.168.2.10:3306/xtjkqyfw"
                user="root"
                password="Biaopu8888"/>
    <document> 

        <entity name="solrTest" 
        query="SELECT fid,ftime,fcontent,ftitle,flastupdatetime FROM solrTest where flag = '0'"
        deltaImportQuery = "SELECT fid,ftime,fcontent,ftitle,flastupdatetime FROM solrTest where fid = '${dataimporter.delta.fid}'"
        deltaQuery = "SELECT fid FROM solrTest where flastupdatetime > '${dataimporter.last_index_time}' and flag = '0'"
        deletedPkQuery = "SELECT fid FROM solrTest where flag = '1'"
        >
            <!--查询的数据和数据库索引意义对应column 是查询的字段name 是solr索引对应的字段-->
            <field column="fid" name="fid"/>
            <field column="ftitle" name="ftitle"/>
            <field column="fcontent" name="fcontent"/>
            <field column="flastupdatetime" name="flastupdatetime"/>
            <field column="ftime" name="ftime"/>
        </entity>
        
    </document> 
</dataConfig>

三、LOGSTASH-INPUT-JDBC 实现数据库同步ES

在数据方面碰到第一个问题是怎么将postgres中的数据库中同步到es中,在网上找了下相关文档,只有logstash-input-jdbc这个插件还在维护,而且在es中logstash高版本已经集成了这一插件,所以就省去了安装ruby和安装插件这一步了

1 安装elasticsearch logstash kibana 三件套

2 下载数据库驱动

图方便的话可以直接拷贝maven仓库里面的即可

3 添加 .conf文件

input {  
jdbc {
# mysql 数据库链接,shop为数据库名
jdbc_connection_string => "jdbc:postgresql://ip:5432/chongqing_gis"
# 用户名和密码
jdbc_user => ""
jdbc_password => ""
# 驱动
jdbc_driver_library => "E:/ES/logstash-7.8.0/postgres/postgresql-42.2.9.jar"
# 驱动类名
jdbc_driver_class => "org.postgresql.Driver" jdbc_paging_enabled => "true"
jdbc_page_size => "50000"
# 执行的sql 文件路径+名称
statement_filepath => "E:/ES/logstash-7.8.0/postgres/jdbc.sql" # 设置监听间隔 各字段含义(由左至右)分、时、天、月、年,全部为*默认含义为每分钟都更新
# schedule => "* * * * *"
}
}
filter {
json {
source => "message"
remove_field => ["message"]
}
}
output {
elasticsearch {
# ES的IP地址及端口
hosts => ["localhost:9200"]
# 索引名称
index => "test_index"
# 需要关联的数据库中有有一个id字段,对应类型中的id document_id => "%{gid}"
}
stdout {
# JSON格式输出
codec => json_lines
}
}

修改输入参数:数据库ip、端口、账号、密码等

修改输出参数:es的ip、端口、索引、document_id等

输出参数中索引如果还没有创建,在启动logstash时会自动根据默认模板创建索引,其中有些教程中出现了index_type的设置,这个类型在es的高版本中已经取消,不需要再设置,如果设置了,启动logstash会报错

statement_filepath 保存准备执行的sql文件

jdbc.sql文件:

select gid,name,address from qtpoi

在qtpoi表中有个字段是保存的空间地理信息geom字段,当我加上这个时,启动logstash一直报错,可能对空间字段需要做进一步的处理

Exception when executing JDBC query {:exception=>#<Sequel::DatabaseError: Java::OrgLogstash::MissingConverterException: Missing Converter handling for full class name=org.postgresql.util.PGobject, simple name=PGobject

4 启动logstash即可同步

bin/logstash -f config/postgres.conf

5 打开kibana即可查看到刚同步的数据

GET test_index/_doc/_search

6 如果设置了定时任务,logstash会定时去访问数据同步到es中,但是上面jdbc.sql文件中获取的是整张表的数据,也就是说每次同步都会对全表进行同步,但是我的需求是只需要第一次同步整张表后面只对更新的和修改的数据做同步,

在网上找了下,思路大概是给表增加一个新的字段,保存当前创建的时间或者是当前更新的时间,然后根据:sql_last_value这个函数获取大于这个函数的时间,就过滤出新增的数据和更新的数据,实现对增量数据同步到es,:sql_last_value这个函数官网也没说的太清楚,我大致认为是最后一个更新的值或者最后一次更新的时间

作者 east
flume 3月 19,2022

Logstash和flume全方位对比

Logstash架构如下:

Flume架构如下:



在这里插入图片描述

首先从结构对比,我们会惊人的发现,两者是多么的相似!Logstash的Shipper、Broker、Indexer分别和Flume的Source、Channel、Sink各自对应!只不过是Logstash集成了,Broker可以不需要,而Flume需要单独配置,且缺一不可,但这再一次说明了计算机的设计思想都是通用的!只是实现方式会不同而已。

从程序员的角度来说,上文也提到过了,Flume是真的很繁琐,你需要分别作source、channel、sink的手工配置,而且涉及到复杂的数据采集环境,你可能还要做多个配置,这在上面提过了,反过来说Logstash的配置就非常简洁清晰,三个部分的属性都定义好了,程序员自己去选择就行,就算没有,也可以自行开发插件,非常方便。当然了,Flume的插件也很多,但Channel就只有内存和文件这两种(其实现在不止了,但常用的也就两种)。读者可以看得出来,两者其实配置都是非常灵活的,只不过看场景取舍罢了。

其实从作者和历史背景来看,两者最初的设计目的就不太一样。Flume本身最初设计的目的是为了把数据传入HDFS中(并不是为了采集日志而设计,这和Logstash有根本的区别),所以理所应当侧重于数据的传输,程序员要非常清楚整个数据的路由,并且比Logstash还多了一个可靠性策略,上文中的channel就是用于持久化目的,数据除非确认传输到下一位置了,否则不会删除,这一步是通过事务来控制的,这样的设计使得可靠性非常好。相反,Logstash则明显侧重对数据的预处理,因为日志的字段需要大量的预处理,为解析做铺垫。

为什么先讲Logstash然后讲Flume?这里面有几个考虑,

其一:Logstash其实更有点像通用的模型,所以对新人来说理解起来更简单,而Flume这样轻量级的线程,可能有一定的计算机编程基础理解起来更好;

其二:目前大部分的情况下,Logstash用的更加多,这个数据我自己没有统计过,但是根据经验判断,Logstash可以和ELK其他组件配合使用,开发、应用都会简单很多,技术成熟,使用场景广泛。相反Flume组件就需要和其他很多工具配合使用,场景的针对性会比较强,更不用提Flume的配置过于繁琐复杂了。

作者 east
Docker 3月 18,2022

解决Docker搭建kibana访问出现[Kibana server is not ready yet]的问题

在采用单机版的docker搭建elk,照着网上的教程如下:

1、下载es7.3.0镜像
docker pull docker.elastic.co/elasticsearch/elasticsearch:7.3.0

2、创建一个网络,方便elk使用
docker network create esnet

3、启动es的docker镜像
docker run --name es  -p 9200:9200 -p 9300:9300  --network esnet -e "discovery.type=single-node" bdaab402b220


4、下载docker安装es对应版本的kibana
docker run --name kibana --net esnet -e ELASTICSEARCH_URL=http://127.0.0.1:9200 -p 5601:5601 -d 8bcee4a4f79d

在这一步,满心欢喜想访问kibana后台。

http://localhost:5601

却发现提示
Kibana server is not ready yet

原因是因为 ELASTICSEARCH_URL 配置的应该是容器的 ip,而不是本地ip。

1、首先查看ElasticSearch的容器内部的ip

docker inspect es (es是 ElasticSearch 在容器内部的名称)

Docker容器中启动服务和直接在Linux环境下安装服务,会有ip上的区别,不在是服务器的物理ip, 而是容器对外暴露对的ip, 通过docker inspect elasticsearch查看ES容器暴露出来的ip

2、 然后进入 Kibana 容器内部,修改 kibana.yml 中的ip

$ docker exec -it kibana容器id /bin/bash
$ cd config
$ vi kibana.yml
#
# ** THIS IS AN AUTO-GENERATED FILE **
#

# Default Kibana configuration for docker target
server.name: kibana
server.host: "0"
elasticsearch.hosts: [ "http://elasticsearch:9200" ]
xpack.monitoring.ui.container.elasticsearch.enabled: true

3需要将上面的 "http://elasticsearch:9200" 中的 elasticsearch 替换成上一步的es容器内部ip就可以了。

3、修改完成之后退出容器,重新启动即可

docker stop kibana
docker start kibana

重新访问http://localhost:5601,发现出来后台界面了。

作者 east
Docker 3月 18,2022

Docker网络名词解释和例子

最近在学习docker,研究如何用docker安装elk。看到有一个教程是这样说的:

docker创建一个网络,方便elk使用
docker network create esnet

那么网络是什么?

Docker在容器内部运行应用,这些应用之间的交互依赖于大量不同的网络。Docker对于容器之间、容器与外部网络和VLAN之间的连接均有相应的解决方案。在顶层设计中,Docker网络架构由3个主要部分构成:CNM、Libnetwork和驱动。

Docker网络架构的设计规范是CNM。网络就是需要交互的终端的集合,并且终端之间相互独立。Docker环境中最小的调度单位就是容器,沙盒被放置在容器内部,为容器提供网络连接。

容器A只有一个接口(终端)并连接到了网络A。容器B有两个接口(终端)并且分别接入了网络A和网络B。容器A与B之间是可以相互通信的,因为都接入了网络A。但是,如果没有三层路由器的支持,容器B的两个终端之间是不能进行通信的。

每个Docker主机都有一个默认的单机桥接网络。在Linux上网络名称为bridge,在Windows上叫作nat。除非读者通过命令行创建容器时指定参数–network,否则默认情况下,新创建的容器都会连接到该网络。

查看docker网络的命令:

docker network ls

作者 east
spring 3月 17,2022

Springboot接口使用注意事项

Spring boot接口传参,如果接口用到整型,最好用Integer类型而不是int类型,例如

public ReturnResult deleteByPrimaryKey(@RequestParam("id") Integer id)

如果接口传参用int类型,最好带有默认值,例如:

@RequestParam(value="pageNum", required=false, defaultValue="1") int pageNum,
                                                @RequestParam(value="pageSize", required=false, defaultValue="10") int pageSize

给传参的接口,最好统一封装带有状态码和状态信息,这样客户端调用接口时,方便知道异常信息。


public class ReturnResult {

    //1是成功,0是失败
	private int code;
	
	private String msg;
	
	private Object data;
	
	
	
	

	public ReturnResult() {
		super();
	}


	public ReturnResult(int code, String msg, Object data) {
		super();
		this.code = code;
		this.msg = msg;
		this.data = data;
	}
	

	public ReturnResult(int code, String msg) {
		super();
		this.code = code;
		this.msg = msg;
	}

	public int getCode() {
		return code;
	}

	public void setCode(int code) {
		this.code = code;
	}

	public String getMsg() {
		return msg;
	}

	public void setMsg(String msg) {
		this.msg = msg;
	}

	public Object getData() {
		return data;
	}

	public void setData(Object data) {
		this.data = data;
	}
	
	
	
}
作者 east
solr 3月 17,2022

封装HttpSolrServer设置超时时间

凡是要连接网络的,都要设置超时时间,这样防止网络卡住了没返回结果,例子可以参考 Spark Streaming调用http接口导致卡住了

/**
 * HttpSolrServer扩展对象服务 .<br>
 * 专门设置请求连接超时时间和socket超时时间,调用solr请求服务,防止网络断开时无法中断调用线程<br>
 * 
 */
public class HttpSolrExServer extends HttpSolrServer {

    /** serialVersionUID */
    private static final long serialVersionUID = 4068028650816903817L;

    /**
     * 连接超时值
     */
    private static Integer connectionTimeoutNum;

    /**
     * socket超时值
     */
    private static Integer socketTimeoutNum;



    /**
     * 有参构造函数
     * 
     * @param baseUrl url地址
     */
    public HttpSolrExServer(String baseUrl) {
        super(baseUrl);
        // 设置请求连接超时时间,以毫秒为单位
        this.setConnectionTimeout(getConnectionTimeout());
        // 设置socket超时时间,以毫秒为单位
        this.setSoTimeout(getSocketTimeout());
    }

    public HttpSolrExServer(String baseUrl, HttpClient client){
        super(baseUrl,client);
        // 设置请求连接超时时间,以毫秒为单位
        this.setConnectionTimeout(getConnectionTimeout());
        // 设置socket超时时间,以毫秒为单位
        this.setSoTimeout(getSocketTimeout());
    }

    /**
     * 得到solr的连接超时时间,以毫秒为单位,默认为60秒<br>
     * 
     * @return
     */
    private static int getConnectionTimeout() {
        if (connectionTimeoutNum == null || connectionTimeoutNum <= 0) {
            // solr的连接超时时间,以毫秒为单位,默认为60秒
            int defaultConnectionTimeout = 60000;
            // solr的连接超时时间字符串变量值

            String connectionTimeoutStr = ConfigUtil.getPropsValueByKey("ac.httpSolr.connectionTimeout");
            

            try {
                int configValue = Integer.parseInt(connectionTimeoutStr);

                if (configValue > 0) {
                    defaultConnectionTimeout = configValue;
                }
            } catch (Exception e) {
                e.printStackTrace();
            }
            connectionTimeoutNum = defaultConnectionTimeout;
        }
        return connectionTimeoutNum.intValue();
    }

    /**
     * 得到solr的socket超时时间,以毫秒为单位,默认为60秒<br>
     * 
     * @return
     */
    private static int getSocketTimeout() {
        if (socketTimeoutNum == null || socketTimeoutNum <= 0) {
            // solr的socket超时时间,以毫秒为单位,默认为60秒
            int defaultSocketTimeout = 60000;
            // solr的socket超时时间字符串变量值
            String socketTimeoutStr = ConfigUtil.getPropsValueByKey("ac.httpSolr.socketTimeout");

            try {
                int configValue = Integer.parseInt(socketTimeoutStr);

                if (configValue > 0) {
                    defaultSocketTimeout = configValue;
                }
            } catch (Exception e) {
                e.printStackTrace();
            }
            socketTimeoutNum = defaultSocketTimeout;
        }
        return socketTimeoutNum.intValue();
    }
}
/**
     * 根据属性key得到对应的属性值<br>
     * 配置文件按spring boot的约定的配置文件命名规则进行加载<br>
     * <font color=red>注意: 暂不支持yml文件属性读取</font>
     * 
     * @param key 键名称
     * @return
     */
    public static String getPropsValueByKey(String key) {
        // TODO 暂时不支持读取yml格式文件,yml文件支持map和list格式数据,需要另写方法支持
        if (!props.containsKey(CONFIG_FILENAME)) {
            Properties prop = new Properties();
            prop = getPropertiesByFileName(CONFIG_FILENAME);

            if (prop.get(SPRING_PROFILES_ACTIVE) != null) {
                // 依次读取指定的配置文件
                for (String partName : prop.get(SPRING_PROFILES_ACTIVE).toString().split(",")) {
                    prop.putAll(getPropertiesByFileName(SPRING_BOOT_PROFILE_TEMPLATE.replace("{profile}", partName)));
                }
            }
            props.put(CONFIG_FILENAME, prop);
        }
        Object obj = props.get(CONFIG_FILENAME).get(key);
        if (obj == null) {
            return null;
        } else {
            return obj.toString();
        }
    }
作者 east
Spark 3月 16,2022

Spark大数据平台调度任务的优化

在大数据平台生产环境上,遇到一个头疼的问题,每天都要定时运行一个任务。刚开始数据量小和简单,用cron来定时调用可以满足需求。

后来数据量大,出现昨天的任务没跑完,今天的任务又要开始了,在大数据平台上运行的任务越来越多,大数据平台资源被占满了。

考虑进行下面的优化:

1、某个任务如果运行超过30小时时,进行中断。

2、采用DelayQueue来实现延时队列,等前面的任务执行完或被中断,又到它的开始时间时才进行执行。

延时队列相比于普通队列最大的区别就体现在其延时的属性上,普通队列的元素是先进先出,按入队顺序进行处理,而延时队列中的元素在入队时会指定一个延迟时间,表示其希望能够在经过该指定时间后处理。

入门例子

DelayQueue 非常适合指定时间之后,才能让消费者获取到的场景。

private static class DelayElem implements Delayed {
    /**
     * 延迟时间
     */
    private final long delay;
    /**
     * 到期时间
     */
    private final long expire;
    /**
     * 数据
     */
    private final String msg;

    private DelayElem(long delay, String msg) {
        this.delay = delay;
        this.msg = msg;
        //到期时间 = 当前时间+延迟时间
        this.expire = System.currentTimeMillis() + this.delay;
    }
    /**
     * 需要实现的接口,获得延迟时间
     *
     * 用过期时间-当前时间
     * @param unit 时间单位
     * @return 延迟时间
     */
    @Override
    public long getDelay(TimeUnit unit) {
        return unit.convert(this.expire - System.currentTimeMillis() , TimeUnit.MILLISECONDS);
    }
    /**
     * 用于延迟队列内部比较排序
     * <p>
     * 当前时间的延迟时间 - 比较对象的延迟时间
     *
     * @param o 比较对象
     * @return 结果
     */
    @Override
    public int compareTo(Delayed o) {
        return (int) (this.getDelay(TimeUnit.MILLISECONDS) - o.getDelay(TimeUnit.MILLISECONDS));
    }
    @Override
    public String toString() {
        return "DelayElem{" +
                "delay=" + delay +
                ", expire=" + expire +
                ", msg='" + msg + '\'' +
                '}';
    }
}
private static class WriteThread extends Thread {
    private final DelayQueue<DelayElem> delayQueue;
    private WriteThread(DelayQueue<DelayElem> delayQueue) {
        this.delayQueue = delayQueue;
    }
    @Override
    public void run() {
        for(int i = 0; i < 3; i++) {
            try {
                TimeUnit.MILLISECONDS.sleep(100);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
            DelayElem element = new DelayElem(1000,i+"test");
            delayQueue.offer(element);
            System.out.println(System.currentTimeMillis() + " 放入元素 " + i);
        }
    }
}
private static class ReadThread extends Thread {
    private final DelayQueue<DelayElem> delayQueue;
    private ReadThread(DelayQueue<DelayElem> delayQueue) {
        this.delayQueue = delayQueue;
    }
    @Override
    public void run() {
        while (true){
            try {
                DelayElem element =  delayQueue.take();
                System.out.println(System.currentTimeMillis() +" 获取元素:" + element);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }
    }
}
public static void main(String[] args) throws InterruptedException {
    DelayQueue<DelayElem> delayQueue = new DelayQueue<>();
    new WriteThread(delayQueue).start();
    new ReadThread(delayQueue).start();
}
作者 east
运维 3月 10,2022

如何恢复MySQL误删除数据文件

在生产环境上,误删除ibdata1、ib_logfile0和ib_logfile1。如果mysql进程还在运行,此时select读取数据还是insert、update更新数据都正常。

这时可以通过命令查看mysql的进程

# ps -ef | grep mysqld|awk '{print $1,$2}'|grep mysql
# mysql 4070

查看mysql误删除文件在proc的位置

ll /proc/4070fd|grep -e ibdata -e ib_

可以看到,被删除的3个文件ibdata1、ib_logfile0和ib_logfile1在内存中已经标记为deleted了,可见文件确实是被删除了。

那么为什么MySQL还能正常使用呢?其实,mysqld在运行状态下,会保持ibdata1、ib_logfile0、ib_logfile1这些文件为打开状态,即使把它们删除了,它们仍旧存在于内存文件系统中,所以,mysqld仍然可以对其进行读写。只要mysqld进程不结束(MySQL服务不重启),就可以通过proc文件系统找到这几个被删除的文件。

现在数据库还一直对外提供服务,也就是有数据会持续写入,而在InnoDB存储引擎的buffer pool中,有许多dirty page(脏数据,就是内存中的数据已经被修改,但是没有写到磁盘中)还没提交,如果直接把文件复制回去,肯定会丢失数据,甚至还有可能导致ibdata1文件损坏。在复制数据文件之前,必须保证所有buffer pool中的数据修改都保存到了硬盘上,因此,首先需要停止目前的写入、更新和删除等操作,然后刷新内存数据到磁盘,最后才能复制文件。如何操作呢?可执行下面几个SQL:

FLUSH TABLES WITH READ LOCK;
SHOW engine innodb STATUS\G;
show variables like '%innodb_max_dirty_pages_pct%';
SET global innodb_max_dirty_pages_pct=0;
这样设置后,脏页会迅速减少,磁盘写操作会迅速完成。等待所有脏数据刷新到磁盘后,就可以进行文件复制了。
cp /proc/4070/fd/10  /data1/mysql/ib_logfile1
cp /proc/4070/fd/4   /data1/mysql/ibdata1
cp /proc/4070/fd/9   /data1/myql/ib_logfile0

修改文件权限为MySQL,操作如下:

chown mysql:mysql /data1/mysql/ib*

所有操作完成后,还需要重启数据库服务:

/etc/init.d/mysqld restart
作者 east
Java, python 3月 6,2022

Python基础语法规则和Java不同的地方

Java是现在最流行的语言,也是广大程序员最熟悉的语言,而Python作为在人工智能领域的新星,通过对比Java语言来学习Python语言,可以起到事半功倍的效果。

和Java单行注释不同,Python注释更像shell等脚本语言, python语言单行注释通常是以“#”号开头,在“#”号后面紧跟注释说明的文字。

Java语言用{ } 来区分代码块, Python是用缩进代码 , 缩进相同的一组语句构成一个代码块,也称为代码组。

在数学运算上,Java使用除法,如果除数和被除数都是整数,那么结果还是整数。如果结果要为准确的浮点数,要对其中一个数进行强制转换。Python就没有那么麻烦。 Python数值的除法包含两个运算符:“/”返回一个浮点数,“//”返回一个整数 , 在混合计算时,Python会把整数转换成为浮点数 。

在字符串的定义上,Python更加灵活。 可以使用单引号(’)、双引号(”)或三引号(”’或”””)来标识字符串,引号的开始与结束必须是相同类型的 。

Python中没有switch和case语句 , 多路分支语句只能通过if…elif…else流程控制语句来实现,并且每个分支语句都要有严格的缩进。 for循环中也可以使用else语句 。

作者 east
Spark 2月 25,2022

Spark Streaming调用http接口导致卡住了

在生产环境上,之前一直正常,最近运行Spark Streaming出不来结果,通过打印日志和远程调试,发现是调用一个接口,是用HttpUrlConnection来请求,在下面的语句一直没返回结果导致卡住了。

connection.getResonseCode();

解决方法,是对
HttpUrlConnection 设置超时时间

HttpURLConnection connection= (HttpURLConnection)url.openConnection();
connection.setConnectTimeout(30000);
connection.setReadTimeout(30000);
作者 east

上一 1 … 24 25 26 … 41 下一个

关注公众号“大模型全栈程序员”回复“小程序”获取1000个小程序打包源码。回复”chatgpt”获取免注册可用chatgpt。回复“大数据”获取多本大数据电子书

标签

AIGC AI创作 bert chatgpt github GPT-3 gpt3 GTP-3 hive mysql O2O tensorflow UI控件 不含后台 交流 共享经济 出行 图像 地图定位 外卖 多媒体 娱乐 小程序 布局 带后台完整项目 开源项目 搜索 支付 效率 教育 日历 机器学习 深度学习 物流 用户系统 电商 画图 画布(canvas) 社交 签到 联网 读书 资讯 阅读 预订

官方QQ群

小程序开发群:74052405

大数据开发群: 952493060

近期文章

  • AUTOSAR如何在多个供应商交付的配置中避免ARXML不兼容?
  • C++thread pool(线程池)设计应关注哪些扩展性问题?
  • 各类MCAL(Microcontroller Abstraction Layer)如何与AUTOSAR工具链解耦?
  • 如何设计AUTOSAR中的“域控制器”以支持未来扩展?
  • C++ 中避免悬挂引用的企业策略有哪些?
  • 嵌入式电机:如何在低速和高负载状态下保持FOC(Field-Oriented Control)算法的电流控制稳定?
  • C++如何在插件式架构中使用反射实现模块隔离?
  • C++如何追踪内存泄漏(valgrind/ASan等)并定位到业务代码?
  • C++大型系统中如何组织头文件和依赖树?
  • 如何进行AUTOSAR模块的持续集成(CI)部署与版本控制?

文章归档

  • 2025年5月
  • 2025年4月
  • 2025年3月
  • 2025年2月
  • 2025年1月
  • 2024年12月
  • 2024年11月
  • 2024年10月
  • 2024年9月
  • 2024年8月
  • 2024年7月
  • 2024年6月
  • 2024年5月
  • 2024年4月
  • 2024年3月
  • 2023年11月
  • 2023年10月
  • 2023年9月
  • 2023年8月
  • 2023年7月
  • 2023年6月
  • 2023年5月
  • 2023年4月
  • 2023年3月
  • 2023年1月
  • 2022年11月
  • 2022年10月
  • 2022年9月
  • 2022年8月
  • 2022年7月
  • 2022年6月
  • 2022年5月
  • 2022年4月
  • 2022年3月
  • 2022年2月
  • 2022年1月
  • 2021年12月
  • 2021年11月
  • 2021年9月
  • 2021年8月
  • 2021年7月
  • 2021年6月
  • 2021年5月
  • 2021年4月
  • 2021年3月
  • 2021年2月
  • 2021年1月
  • 2020年12月
  • 2020年11月
  • 2020年10月
  • 2020年9月
  • 2020年8月
  • 2020年7月
  • 2020年6月
  • 2020年5月
  • 2020年4月
  • 2020年3月
  • 2020年2月
  • 2020年1月
  • 2019年7月
  • 2019年6月
  • 2019年5月
  • 2019年4月
  • 2019年3月
  • 2019年2月
  • 2019年1月
  • 2018年12月
  • 2018年7月
  • 2018年6月

分类目录

  • Android (73)
  • bug清单 (79)
  • C++ (34)
  • Fuchsia (15)
  • php (4)
  • python (42)
  • sklearn (1)
  • 云计算 (20)
  • 人工智能 (61)
    • chatgpt (21)
      • 提示词 (6)
    • Keras (1)
    • Tensorflow (3)
    • 大模型 (1)
    • 智能体 (4)
    • 深度学习 (14)
  • 储能 (44)
  • 前端 (4)
  • 大数据开发 (484)
    • CDH (6)
    • datax (4)
    • doris (28)
    • Elasticsearch (15)
    • Flink (78)
    • flume (7)
    • Hadoop (19)
    • Hbase (23)
    • Hive (40)
    • Impala (2)
    • Java (71)
    • Kafka (10)
    • neo4j (5)
    • shardingsphere (6)
    • solr (5)
    • Spark (99)
    • spring (11)
    • 数据仓库 (9)
    • 数据挖掘 (7)
    • 海豚调度器 (9)
    • 运维 (33)
      • Docker (2)
  • 小游戏代码 (1)
  • 小程序代码 (139)
    • O2O (16)
    • UI控件 (5)
    • 互联网类 (23)
    • 企业类 (6)
    • 地图定位 (9)
    • 多媒体 (6)
    • 工具类 (25)
    • 电商类 (22)
    • 社交 (7)
    • 行业软件 (7)
    • 资讯读书 (11)
  • 嵌入式 (70)
    • autosar (63)
    • RTOS (1)
    • 总线 (1)
  • 开发博客 (16)
    • Harmony (9)
  • 技术架构 (6)
  • 数据库 (32)
    • mongodb (1)
    • mysql (13)
    • pgsql (2)
    • redis (1)
    • tdengine (4)
  • 未分类 (6)
  • 程序员网赚 (20)
    • 广告联盟 (3)
    • 私域流量 (5)
    • 自媒体 (5)
  • 量化投资 (4)
  • 面试 (14)

功能

  • 登录
  • 文章RSS
  • 评论RSS
  • WordPress.org

All Rights Reserved by Gitweixin.本站收集网友上传代码, 如有侵犯版权,请发邮件联系yiyuyos@gmail.com删除.