gitweixin
  • 首页
  • 小程序代码
    • 资讯读书
    • 工具类
    • O2O
    • 地图定位
    • 社交
    • 行业软件
    • 电商类
    • 互联网类
    • 企业类
    • UI控件
  • 大数据开发
    • Hadoop
    • Spark
    • Hbase
    • Elasticsearch
    • Kafka
    • Flink
    • 数据仓库
    • 数据挖掘
    • flume
    • Kafka
    • Hive
    • shardingsphere
    • solr
  • 开发博客
    • Android
    • php
    • python
    • 运维
    • 技术架构
    • 数据库
  • 程序员网赚
  • bug清单
  • 量化投资
  • 在线查询工具
    • 去行号
    • 在线时间戳转换工具
    • 免费图片批量修改尺寸在线工具
    • SVG转JPG在线工具

分类归档大数据开发

精品微信小程序开发门户,代码全部亲测可用

  • 首页   /  
  • 分类归档: "大数据开发"
  • ( 页面34 )
Hbase 10月 26,2020

Hbase使用过滤器Filter例子

使用过滤器Filter

功能简介

HBase Filter主要在Scan和Get过程中进行数据过滤,通过设置一些过滤条件来实现,如设置RowKey、列名或者列值的过滤条件。

代码样例

以下代码片段在com.huawei.bigdata.hbase.examples包的“HBaseSample”类的testSingleColumnValueFilter方法中。

public void testSingleColumnValueFilter() {    
LOG.info("Entering testSingleColumnValueFilter.");
Table table = null;
ResultScanner rScanner = null;
try {
table = conn.getTable(tableName);
Scan scan = new Scan();
scan.addColumn(Bytes.toBytes("info"), Bytes.toBytes("name"));
// Set the filter criteria.
SingleColumnValueFilter filter = new SingleColumnValueFilter( Bytes.toBytes("info"), Bytes.toBytes("name"), CompareOp.EQUAL, Bytes.toBytes("Xu Bing"));
scan.setFilter(filter);
// Submit a scan request.
rScanner = table.getScanner(scan);
// Print query results.
for (Result r = rScanner.next(); r != null; r = rScanner.next()) { for (Cell cell : r.rawCells()) { LOG.info(Bytes.toString(CellUtil.cloneRow(cell)) + ":" + Bytes.toString(CellUtil.cloneFamily(cell)) + "," + Bytes.toString(CellUtil.cloneQualifier(cell)) + "," + Bytes.toString(CellUtil.cloneValue(cell))); } } LOG.info("Single column value filter successfully."); } catch (IOException e) {
LOG.error("Single column value filter failed " ,e);
} finally {
if (rScanner != null) {
// Close the scanner object.
rScanner.close();
}
if (table != null) {
try {
// Close the HTable object.
table.close();
} catch (IOException e) {
LOG.error("Close table failed " ,e);
}
}
}
LOG.info("Exiting testSingleColumnValueFilter.");
}

注意事项

当前二级索引不支持使用SubstringComparator类定义的对象作为Filter的比较器。

例如,如下示例中的用法当前不支持:

Scan scan = new Scan();
filterList = new FilterList(FilterList.Operator.MUST_PASS_ALL);
filterList.addFilter(new SingleColumnValueFilter(Bytes
.toBytes(columnFamily), Bytes.toBytes(qualifier),
CompareOp.EQUAL, new SubstringComparator(substring)));
scan.setFilter(filterList);
作者 east
Hive 10月 26,2020

Hive用户自定义函数

用户自定义函数

当Hive的内置函数不能满足需要时,可以通过编写用户自定义函数UDF(User-Defined Functions)插入自己的处理代码并在查询中使用它们。

按实现方式,UDF分如下分类:

  • 普通的UDF,用于操作单个数据行,且产生一个数据行作为输出。
  • 用户定义聚集函数UDAF(User-Defined Aggregating Functions),用于接受多个输入数据行,并产生一个输出数据行。
  • 用户定义表生成函数UDTF(User-Defined Table-Generating Functions),用于操作单个输入行,产生多个输出行。

按使用方法,UDF有如下分类:

  • 临时函数,只能在当前会话使用,重启会话后需要重新创建。
  • 永久函数,可以在多个会话中使用,不需要每次创建。

下面以编写一个AddDoublesUDF为例,说明UDF的编写和使用方法:

功能介绍

AddDoublesUDF主要用来对两个及多个浮点数进行相加。在该样例中可以掌握如何编写和使用UDF。

说明:

  • 一个普通UDF必须继承自“org.apache.hadoop.hive.ql.exec.UDF”。
  • 一个普通UDF必须至少实现一个evaluate()方法,evaluate函数支持重载。
  • 开发自定义函数需要在工程中添加hive-exec-1.3.0.jar依赖包,可从hive安装目录下获取。

样例代码

以下为UDF示例代码:

package com.huawei.bigdata.hive.example.udf;
import org.apache.hadoop.hive.ql.exec.UDF;

public class AddDoublesUDF extends UDF { 
 public Double evaluate(Double... a) { 
    Double total = 0.0; 
    // 处理逻辑部分. 
    for (int i = 0; i < a.length; i++) 
      if (a[i] != null) 
        total += a[i]; 
    return total; 
  } 
} 

如何使用

  1. 把以上程序打包成AddDoublesUDF.jar,并上传到HDFS指定目录下(如“/user/hive_examples_jars/”)且创建函数的用户与使用函数的用户有该文件的可读权限。示例语句: hdfs dfs -put ./hive_examples_jars /user/hive_examples_jars hdfs dfs -chmod 777 /user/hive_examples_jars
  2. 需要使用一个具有admin权限的用户登录beeline客户端,执行如下命令: kinit Hive业务用户 beeline set role admin;
  3. 在Hive Server中定义该函数,以下语句用于创建永久函数: CREATE FUNCTION addDoubles AS ‘com.bigdata.hive.example.udf.AddDoublesUDF’ using jar ‘hdfs://hacluster/user/hive_examples_jars/AddDoublesUDF.jar’; 其中addDoubles是该函数的别名,用于SELECT查询中使用。 以下语句用于创建临时函数: CREATE TEMPORARY FUNCTION addDoubles AS ‘com.bigdata.hive.example.udf.AddDoublesUDF’ using jar ‘hdfs://hacluster/user/hive_examples_jars/AddDoublesUDF.jar’;
    • addDoubles是该函数的别名,用于SELECT查询中使用。
    • 关键字TEMPORARY说明该函数只在当前这个Hive Server的会话过程中定义使用。
  4. 在Hive Server中使用该函数,执行SQL语句: SELECT addDoubles(1,2,3); 说明: 若重新连接客户端再使用函数出现[Error 10011]的错误,可执行reload function;命令后再使用该函数。
  5. 在Hive Server中删除该函数,执行SQL语句: DROP FUNCTION addDoubles;
作者 east
Flink 10月 26,2020

flink调优经验

数据倾斜

当数据发生倾斜(某一部分数据量特别大),虽然没有GC(Gabage Collection,垃圾回收),但是task执行时间严重不一致。

  • 需要重新设计key,以更小粒度的key使得task大小合理化。
  • 修改并行度。
  • 调用rebalance操作,使数据分区均匀。

缓冲区超时设置

  • 由于task在执行过程中存在数据通过网络进行交换,数据在不同服务器之间传递的缓冲区超时时间可以通过setBufferTimeout进行设置。
  • 当设置“setBufferTimeout(-1)”,会等待缓冲区满之后才会刷新,使其达到最大吞吐量;当设置“setBufferTimeout(0)”时,可以最小化延迟,数据一旦接收到就会刷新;当设置“setBufferTimeout”大于0时,缓冲区会在该时间之后超时,然后进行缓冲区的刷新。 示例可以参考如下:env.setBufferTimeout(timeoutMillis); env.generateSequence(1,10).map(new MyMapper()).setBufferTimeout(timeoutMillis);
作者 east
Spark 10月 26,2020

Spark Streaming调优经验

Spark Streaming调优

操作场景

Streaming作为一种mini-batch方式的流式处理框架,它主要的特点是:秒级时延和高吞吐量。因此Streaming调优的目标:在秒级延迟的情景下,提高Streaming的吞吐能力,在单位时间处理尽可能多的数据。

说明:

本章节适用于输入数据源为Kafka的使用场景。

操作步骤

一个简单的流处理系统由以下三部分组件组成:数据源 + 接收器 + 处理器。数据源为Kafka,接受器为Streaming中的Kafka数据源接收器,处理器为Streaming。

对Streaming调优,就必须使该三个部件的性能都最优化。

  • 数据源调优 在实际的应用场景中,数据源为了保证数据的容错性,会将数据保存在本地磁盘中,而Streaming的计算结果全部在内存中完成,数据源很有可能成为流式系统的最大瓶颈点。 对Kafka的性能调优,有以下几个点:
    • 使用Kafka-0.8.2以后版本,可以使用异步模式的新Producer接口。
    • 配置多个Broker的目录,设置多个IO线程,配置Topic合理的Partition个数。
    详情请参见Kafka开源文档中的“性能调优”部分:http://kafka.apache.org/documentation.html
  • 接收器调优 Streaming中已有多种数据源的接收器,例如Kafka、Flume、MQTT、ZeroMQ等,其中Kafka的接收器类型最多,也是最成熟一套接收器。 Kafka包括三种模式的接收器API:
    • KafkaReceiver:直接接收Kafka数据,进程异常后,可能出现数据丢失。
    • ReliableKafkaReceiver:通过ZooKeeper记录接收数据位移。
    • DirectKafka:直接通过RDD读取Kafka每个Partition中的数据,数据高可靠。
    从实现上来看,DirectKafka的性能会是最好的,实际测试上来看,DirectKafka也确实比其他两个API性能好了不少。因此推荐使用DirectKafka的API实现接收器。 数据接收器作为一个Kafka的消费者,对于它的配置优化,请参见Kafka开源文档:http://kafka.apache.org/documentation.html
  • 处理器调优 Streaming的底层由Spark执行,因此大部分对于Spark的调优措施,都可以应用在Streaming之中,例如:
    • 数据序列化
    • 配置内存
    • 设置并行度
    • 使用External Shuffle Service提升性能
    说明: 在做Spark Streaming的性能优化时需注意一点,越追求性能上的优化,Streaming整体的可靠性会越差。例如: “spark.streaming.receiver.writeAheadLog.enable”配置为“false”的时候,会明显减少磁盘的操作,提高性能,但由于缺少WAL机制,会出现异常恢复时,数据丢失。 因此,在调优Streaming的时候,这些保证数据可靠性的配置项,在生产环境中是不能关闭的。
  • 日志归档调优 参数“spark.eventLog.group.size”用来设置一个应用的JobHistory日志按照指定job个数分组,每个分组会单独创建一个文件记录日志,从而避免应用长期运行时形成单个过大日志造成JobHistory无法读取的问题,设置为“0”时表示不分组。 大部分Spark Streaming任务属于小型job,而且产生速度较快,会导致频繁的分组,产生大量日志小文件消耗磁盘I/O。建议增大此值,例如改为“1000”或更大值。
作者 east
Spark 10月 26,2020

Spark Core调优经验

使用mapPartitions,按每个分区计算结果

如果每条记录的开销太大,例:

rdd.map{x=>conn=getDBConn;conn.write(x.toString);conn.close}

则可以使用MapPartitions,按每个分区计算结果,如

rdd.mapPartitions(records => conn.getDBConn;for(item <- records)
write(item.toString); conn.close)

使用mapPartitions可以更灵活地操作数据,例如对一个很大的数据求TopN,当N不是很大时,可以先使用mapPartitions对每个partition求TopN,collect结果到本地之后再做排序取TopN。这样相比直接对全量数据做排序取TopN效率要高很多。

使用coalesce调整分片的数量

coalesce可以调整分片的数量。coalesce函数有两个参数:

coalesce(numPartitions: Int, shuffle: Boolean = false)

当shuffle为true的时候,函数作用与repartition(numPartitions: Int)相同,会将数据通过Shuffle的方式重新分区;当shuffle为false的时候,则只是简单的将父RDD的多个partition合并到同一个task进行计算,shuffle为false时,如果numPartitions大于父RDD的切片数,那么分区不会重新调整。

遇到下列场景,可选择使用coalesce算子:

  • 当之前的操作有很多filter时,使用coalesce减少空运行的任务数量。此时使用coalesce(numPartitions, false),numPartitions小于父RDD切片数。
  • 当输入切片个数太大,导致程序无法正常运行时使用。
  • 当任务数过大时候Shuffle压力太大导致程序挂住不动,或者出现linux资源受限的问题。此时需要对数据重新进行分区,使用coalesce(numPartitions, true)。

localDir配置

Spark的Shuffle过程需要写本地磁盘,Shuffle是Spark性能的瓶颈,I/O是Shuffle的瓶颈。配置多个磁盘则可以并行的把数据写入磁盘。如果节点中挂载多个磁盘,则在每个磁盘配置一个Spark的localDir,这将有效分散Shuffle文件的存放,提高磁盘I/O的效率。如果只有一个磁盘,配置了多个目录,性能提升效果不明显。

Collect小数据

大数据量不适用collect操作。

collect操作会将Executor的数据发送到Driver端,因此使用collect前需要确保Driver端内存足够,以免Driver进程发生OutOfMemory异常。当不确定数据量大小时,可使用saveAsTextFile等操作把数据写入HDFS中。只有在能够大致确定数据大小且driver内存充足的时候,才能使用collect。

使用reduceByKey

reduceByKey会在Map端做本地聚合,使得Shuffle过程更加平缓,而groupByKey等Shuffle操作不会在Map端做聚合。因此能使用reduceByKey的地方尽量使用该算子,避免出现groupByKey().map(x=>(x._1,x._2.size))这类实现方式。

广播map代替数组

当每条记录需要查表,如果是Driver端用广播方式传递的数据,数据结构优先采用set/map而不是Iterator,因为Set/Map的查询速率接近O(1),而Iterator是O(n)。

数据倾斜

当数据发生倾斜(某一部分数据量特别大),虽然没有GC(Gabage Collection,垃圾回收),但是task执行时间严重不一致。

  • 需要重新设计key,以更小粒度的key使得task大小合理化。
  • 修改并行度。

优化数据结构

  • 把数据按列存放,读取数据时就可以只扫描需要的列。
  • 使用Hash Shuffle时,通过设置spark.shuffle.consolidateFiles为true,来合并shuffle中间文件,减少shuffle文件的数量,减少文件IO操作以提升性能。最终文件数为reduce tasks数目。
作者 east
python, 人工智能, 数据挖掘 10月 8,2020

python多项式回归代码实现

多项式回归是在上文python源码实现线性回归并绘图

基础上实现的,要实现下面的多项式

可以用矩阵相乘来实现

代码如下:

import numpy as np
import matplotlib.pyplot as plt

# 读入训练数据
train = np.loadtxt('click.csv', delimiter=',', dtype='int', skiprows=1)
train_x = train[:,0]
train_y = train[:,1]

# 标准化
mu = train_x.mean()
sigma = train_x.std()
def standardize(x):
    return (x - mu) / sigma

train_z = standardize(train_x)

# 参数初始化
theta = np.random.rand(3)

# 创建训练数据的矩阵
def to_matrix(x):
    return np.vstack([np.ones(x.size), x, x ** 2]).T

X = to_matrix(train_z)

# 预测函数
def f(x):
    return np.dot(x, theta)

# 目标函数
def E(x, y):
    return 0.5 * np.sum((y - f(x)) ** 2)

# 学习率
ETA = 1e-3

# 误差的差值
diff = 1

# 更新次数
count = 0

# 直到误差的差值小于 0.01 为止,重复参数更新
error = E(X, train_y)
while diff > 1e-2:
    # 更新结果保存到临时变量
    theta = theta - ETA * np.dot(f(X) - train_y, X)

    # 计算与上一次误差的差值
    current_error = E(X, train_y)
    diff = error - current_error
    error = current_error

    # 输出日志
    count += 1
    log = '第 {} 次 : theta = {}, 差值 = {:.4f}'
    print(log.format(count, theta, diff))

# 绘图确认
x = np.linspace(-3, 3, 100)
plt.plot(train_z, train_y, 'o')
plt.plot(x, f(to_matrix(x)))
plt.show()

最后输出效果如下:

作者 east
python, 数据挖掘 10月 8,2020

python源码实现线性回归并绘图

用python实现一次函数 的线性回归。把fθ(x)作为一次函数来实现吧。我们要实现下面这样的fθ(x)和目标函数E(θ)。

要把下面的训练数据变成平均值为0、方差为1的数据。作用是参数的收敛会更快。这种做法也被称为标准化或者z-score规范化,变换表达式是这样的。µ是训练数据的平均值,σ是标准差。

参数更新表达式如下:

训练数据如下:

x	y
235	591
216	539
148	413
35	310
85	308
204	519
49	325
25	332
173	498
191	498
134	392
99	334
117	385
112	387
162	425
272	659
159	400
159	427
59	319
198	522

import numpy as np
import matplotlib.pyplot as plt

# 读入训练数据
train = np.loadtxt('click.csv', delimiter=',', dtype='int', skiprows=1)
train_x = train[:,0]
train_y = train[:,1]

# 标准化
mu = train_x.mean()
sigma = train_x.std()
def standardize(x):
    return (x - mu) / sigma

train_z = standardize(train_x)

# 参数初始化
theta0 = np.random.rand()
theta1 = np.random.rand()

# 预测函数
def f(x):
    return theta0 + theta1 * x

# 目标函数
def E(x, y):
    return 0.5 * np.sum((y - f(x)) ** 2)

# 学习率
ETA = 1e-3

# 误差的差值
diff = 1

# 更新次数
count = 0

# 直到误差的差值小于 0.01 为止,重复参数更新
error = E(train_z, train_y)
while diff > 1e-2:
    # 更新结果保存到临时变量
    tmp_theta0 = theta0 - ETA * np.sum((f(train_z) - train_y))
    tmp_theta1 = theta1 - ETA * np.sum((f(train_z) - train_y) * train_z)

    # 更新参数
    theta0 = tmp_theta0
    theta1 = tmp_theta1

    # 计算与上一次误差的差值
    current_error = E(train_z, train_y)
    diff = error - current_error
    error = current_error

    # 输出日志
    count += 1
    log = '第 {} 次 : theta0 = {:.3f}, theta1 = {:.3f}, 差值 = {:.4f}'
    print(log.format(count, theta0, theta1, diff))

# 绘图确认
x = np.linspace(-3, 3, 100)
plt.plot(train_z, train_y, 'o')
plt.plot(x, f(x))
plt.show()

最终输出图形如下:

作者 east
Kafka, Spark 9月 5,2020

Spark Streaming读取kafka直连案例

Spark Streaming读取kafka有2种方式:基于Receiver的方式和 基于Direct的方式 。 Direct 更高效, 负责追踪消费的offset 可以用redis或mysql来保存。

 
import org.apache.spark.streaming.kafka.{HasOffsetRanges, KafkaUtils, OffsetRange}
import kafka.serializer.StringDecoder
import org.apache.spark.streaming._
import org.apache.spark.{SparkConf, SparkContext}
object DirectKafka {
def main(args: Array[String]): Unit = {
if (args.length < 2){ System.err.println( s""" |DirectKafka
| is a list of one or more kafka brokers
| is a list of one or more kafka topics
""".stripMargin)
System.exit(1)
}


/*启动zookeeper
cd /root/kafka/kafka_2.11-1.0.0/bin
./zookeeper-server-start.sh /root/kafka/kafka_2.11-1.0.0/config/zookeeper.properties

启动kafka
cd /root/kafka/kafka_2.11-1.0.0/bin
./kafka-server-start.sh /root/kafka/kafka_2.11-1.0.0/config/server.properties

启动kafka生产者
./kafka-console-producer.sh --broker-list master:9092 --topic kafka_test

提交任务
spark-submit --class com.kafka.DirectKafka \
--master spark://master:7077 \
--deploy-mode client \
--driver-memory 512m \
--executor-memory 512m \
--executor-cores 1 \
/root/datafile/SparkStreamingKafka-1.0-SNAPSHOT-jar-with-dependencies.jar \
master:9092 kafka_test
*/

val sparkConf = new SparkConf().setAppName("SparkStreaming-DirectKafka")
val sc = new SparkContext(sparkConf)
val Array(brokers, topics) = args
val ssc = new StreamingContext(sc, Seconds(2))
val topicset = topics.split(",").toSet
val KafkaParams = Map[String,String]("metadata.broker.list" -> brokers)
val directKafkaStream = KafkaUtils.createDirectStream[String, String, StringDecoder, StringDecoder](
ssc, KafkaParams, topicset)
directKafkaStream.print()
var offsetRanges = Array.empty[OffsetRange]
directKafkaStream.transform { rdd =>
offsetRanges = rdd.asInstanceOf[HasOffsetRanges].offsetRanges
rdd
}.map(._2) .flatMap(.split(" "))
.map(x => (x, 1L))
.reduceByKey(_ + _)
.foreachRDD { rdd =>
for (o <- offsetRanges) {
println(s"${o.topic} ${o.partition} ${o.fromOffset} ${o.untilOffset}")
}
rdd.take(10).foreach(println)
}
ssc.start()
ssc.awaitTermination()
}
}


import com.alibaba.fastjson.JSON
import main.scala.until.dataschema
import org.apache.spark.sql.SaveMode
import org.apache.spark.streaming._
import org.apache.spark.{SparkConf, SparkContext}
import org.joda.time.DateTime
import scalikejdbc.config.{DBs, DBsWithEnv}
import scalikejdbc._
import main.scala.until.ParamsUtils
import main.scala.until.SparkUtils

object readkafka {
  def main(args: Array[String]): Unit = {
    if (args.length != 1){
      System.err.println("please input data args")
      System.exit(1)
    }

    val sparkConf = new SparkConf().setAppName("SparkStreaming-test")
      .setMaster("local[*]")
      .set("spark.testing.memory","2147480000")
    val sc = new SparkContext(sparkConf)

//topic : spark_example_topic , countly_event ,countly_imp
//broker : 172.31.2.6:9292,172.31.2.7:9292,172.31.2.8:9292

//    val ssc = new StreamingContext(sc, Seconds(2))
    val ssc = new StreamingContext(sc, Seconds(2))

    val messages = new SparkUtils(ssc).getDirectStream(ParamsUtils.kafka.KAFKA_TOPIC)
    messages.print()

//    SparkUtils.apply(null).getDirectStream()

//-------------------------------------------------------------------------------

    // messages 从kafka获取数据,将数据转为RDD
    messages.foreachRDD((rdd, batchTime) => {
      import org.apache.spark.streaming.kafka.HasOffsetRanges
      val offsetRanges = rdd.asInstanceOf[HasOffsetRanges].offsetRanges   // 获取偏移量信息
      /**
        * OffsetRange 是对topic name,partition id,fromOffset(当前消费的开始偏移),untilOffset(当前消费的结束偏移)的封装。
        * *  所以OffsetRange 包含信息有:topic名字,分区Id,开始偏移,结束偏移
        */
      println("===========================> count: " + rdd.map(x => x + "1").count())
     // offsetRanges.foreach(offset => println(offset.topic, offset.partition, offset.fromOffset, offset.untilOffset))
      for (offset <- offsetRanges) {
        // 遍历offsetRanges,里面有多个partition
        println(offset.topic, offset.partition, offset.fromOffset, offset.untilOffset)
        DBs.setupAll()
        // 将partition及对应的untilOffset存到MySQL中
        val saveoffset = DB localTx {
          implicit session =>
           sql"DELETE FROM offsetinfo WHERE topic = ${offset.topic} AND partitionname = ${offset.partition}".update.apply()
            sql"INSERT INTO offsetinfo (topic, partitionname, untilOffset) VALUES (${offset.topic},${offset.partition},${offset.untilOffset})".update.apply()
        }
      }
    })

    // 处理从kafka获取的message信息
    val parameter = messages.flatMap(line => {
      //获取服务端事件日期 reqts_day
      val reqts_day = try {
        new DateTime(JSON.parseObject(line._2).getJSONObject("i").getLong("timestamp") * 1000).toDateTime.toString("yyyy-MM-dd HH:mm:ss")
      } catch {
        case ex: Exception => "(unknown)"
      }

      //获取 设备号
      val cookieid = try {
        JSON.parseObject(line._2).getJSONObject("d").get("d")    //将Json字符串转化为相应的对象  .getString("kid")
      } catch {
        case ex: Exception => "(unknown)"
      }

      //组合成一个字符串
      val data = reqts_day + "##" + cookieid
      Some(data)       //some是一定有值的, some.get获取值,如果没有值,会报异常
    }).map(_.split("##")).map(x => (x(0),x(1)))

    println("------------------")

    parameter.foreachRDD{ rdd =>

      val sqlContext = new org.apache.spark.sql.SQLContext(sc)
      import sqlContext.implicits._
      // 转换成DataFrame
      val SaveParameter = rdd.map(w => dataschema(w._1.toString,w._2.toString)).toDF("data_date","cookies_num")
      // 注册视图
      SaveParameter.createOrReplaceTempView("dau_tmp_table")
      val insertsql =sqlContext.sql("select * from dau_tmp_table")
      insertsql.write.mode(SaveMode.Append).jdbc("jdbc:mysql://localhost:3306/userprofile_test","dau_tmp_table",ParamsUtils.mysql.mysqlProp)

    }

    messages.print()
    ssc.start()
    ssc.awaitTermination()

  }
}
作者 east
Elasticsearch 8月 31,2020

Elsticsearch 数据查询流程

Elasticsearch在存储数据的时候默认是根据每条记录的_id字段做路由分发的,这意味着es服务端是准确知道每个document分布在那个shard上的。

相对比于CURD上操作,search一个比较复杂的执行模式,因为我们不知道那些document会被匹配到,任何一个shard上都有可能,所以一个search请求必须查询一个索引或多个索引里面的所有shard才能完整的查询到我们想要的结果。

找到所有匹配的结果是查询的第一步,来自多个shard上的数据集在分页返回到客户端的之前会被合并到一个排序后的list列表,由于需要经过一步取top N的操作,所以search需要进过两个阶段才能完成,分别是query和fetch。

如下图所示:

(一)query(查询阶段)

当一个search请求发出的时候,这个query会被广播到索引里面的每一个shard(主shard或副本shard),每个shard会在本地执行查询请求后会生成一个命中文档的优先级队列。

这个队列是一个排序好的top N数据的列表,它的size等于from+size的和,也就是说如果你的from是10,size是10,那么这个队列的size就是20,所以这也是为什么深度分页不能用from+size这种方式,因为from越大,性能就越低。

es里面分布式search的查询流程如下:

1,客户端发送一个search请求到Node 3上,然后Node 3会创建一个优先级队列它的大小=from+size

2,接着Node 3转发这个search请求到索引里面每一个主shard或者副本shard上,每个shard会在本地查询然后添加结果到本地的排序好的优先级队列里面。

3,每个shard返回docId和所有参与排序字段的值例如_score到优先级队列里面,然后再返回给coordinating节点也就是Node 3,然后Node 3负责将所有shard里面的数据给合并到一个全局的排序的列表。

上面提到一个术语叫coordinating node,这个节点是当search请求随机负载的发送到一个节点上,然后这个节点就会成为一个coordinating node,它的职责是广播search请求到所有相关的shard上,然后合并他们的响应结果到一个全局的排序列表中然后进行第二个fetch阶段,注意这个结果集仅仅包含docId和所有排序的字段值,search请求可以被主shard或者副本shard处理,这也是为什么我们说增加副本的个数就能增加搜索吞吐量的原因,coordinating节点将会通过round-robin的方式自动负载均衡。

(二)fetch(读取阶段)

query阶段标识了那些文档满足了该次的search请求,但是我们仍然需要检索回document整条数据,这个阶段称为fetch。

流程如下:

1,coordinating 节点标识了那些document需要被拉取出来,并发送一个批量的mutil get请求到相关的shard上。

2,每个shard加载相关document,如果需要他们将会被返回到coordinating 节点上。

3,一旦所有的document被拉取回来,coordinating节点将会返回结果集到客户端上。

作者 east
Elasticsearch 8月 31,2020

Elsticsearch 数据写入流程

前言

由于Elasticsearch使用Lucene来处理shard级别的索引和查询,因此数据目录中的文件由Elasticsearch和Lucene共同编写。

Lucene负责编写和维护Lucene索引文件,而Elasticsearch在Lucene之上编写与功能相关的元数据,例如字段映射,索引设置和其他集群元数据,用户和支持功能由Elasticsearch提供。

ES数据

Node Data

只需启动ES就会有生成下面的目录树:

node.lock:为了确保一次只有一个ES应用从这个数据目录读取/写入。

global-0.st:global-prefix表示这是一个全局状态的文件,而.st扩展名表示这是一个包含元数据状态的文件。这个二进制文件是包含集群的全局元数据,前缀后的数字表示集群元数据版本,这是一个严格增加的版本控制方案。版本号跟随集群而变。

不要去编辑这些文件,因为有可能会导致数据丢失。

Index Data

当我们创建index,文件目录发生变化:

此时可以看出,已经创建了与索引名称对应的新目录。目录有两类子文件夹:_state和0.。

_state包含state-{version}.st索引状态文件,包含了索引的元数据,例如:创建的时间戳。还包含唯一标识符以及索引的settings和mappings。

数字0表示的shard编号,这个文件夹包含了shard相关的数据。

Shard Data

_state目录包含shard的状态文件,其中包含版本控制以及有关分片是主分片还是副本的信息。

index目录包含Lucene拥有的文件。ES通常不直接写入此文件夹。这些文件构成了ES数据目录的大小。

translog:ES事物日志。ES事物日志确保数据可以安全的写入到索引中,而无需为每个文档执行Lucene commit。提交Lucene index会产生一个新的segment,即fsync()-ed,会导致大量的磁盘I/O,从而影响性能。

Lucene数据

Lucene文件如下表所示:

Name Extension Brief Description
Segemnts File segments_N 存储lucene数据文件的元信息,记录所有segment的元数据信息。主要记录了当前有多少个segment,每个segment有一些基本信息,更新这些信息能定位到每个segment的元信息文件。
Lock File write.lock 防止多个IndexWriters写入同一个文件
Segement Info .si 存储有关段的元数据
Compound File .cfs,.cfe 一个segment包含了如下表的各个文件,为减少打开文件的数量,在segment小的时候,segment的所有文件内容都保存在cfs文件中,cfe文件保存了lucene各文件在cfs文件的位置信息
Fields .fnm 存储fileds的相关信息
Fields Index .fdx 正排存储文件的元数据信息
Fields Data .fdt 存储了正排存储数据,写入的原文存储在这
Term Dictionary .tim 倒排索引的元数据信息
Term Index .tip 倒排索引文件,存储了所有的倒排索引数据
Frequencies .doc 保存了每个term的doc id列表和term在doc中位置
Positions .pos 全文索引的字段,会有该文件,保存了term在doc中的位置
Playloads .pay Stores additional per-position metadata information such as character offsets and user payloads全文索引的字段,使用了一些像payloads的高级特性会有该文件,保存了term在doc中的一些高级特性
Norms .nvd,.nvm 存储索引字段加权数据
Per-Document Values .dvd,.dvm Lucene的docvalues文件,即列式存储,用作聚合和排序
Term Vector Data .tvx,.tvd,.tvf 保存索引字段的矢量信息,用在对term进行高亮,计算文本相关性中使用
Live Documents .liv 记录了segment中删除的doc

数据写入流程

1.segment file: 存储倒排索引的文件,每个segment本质上就是一个倒排索引,每秒都会生成一个segment文件,当文件过多时es会自动进行segment merge(合并文件),合并时会同时将已经标注删除的文档物理删除;

2.commit point(重点理解): 记录当前所有可用的segment,每个commit point都会维护一个.del文件(es删除数据本质是不属于物理删除),当es做删改操作时首先会在.del文件中声明某个document已经被删除,文件内记录了在某个segment内某个文档已经被删除,当查询请求过来时在segment中被删除的文件是能够查出来的,但是当返回结果时会根据commit point维护的那个.del文件把已经删除的文档过滤掉;

3.translog日志文件: 为了防止elasticsearch宕机造成数据丢失保证可靠存储,es会将每次写入数据同时写到translog日志中。

translog的写入也可以设置,默认是request,每次请求都会写入磁盘(fsync),这样就保证所有数据不会丢,但写入性能会受影响;如果改成async,则按照配置触发trangslog写入磁盘,注意这里说的只是trangslog本身的写盘。

4. flush:translog什么时候清空?默认是512mb,或30分钟。这个动作就是flush,同时伴随着segment提交(写入磁盘)。flush之后,这段translog的使命就完成了,因为segment已经写入磁盘,就算故障,也可以从segment文件恢复。

5.fsync:另外,有一个/_flush/sync命令,在做数据节点维护时很有用。其逻辑就是flush translog并且将sync_id同步到各个分片。可以实现快速恢复。

综述:fsync指的是translog本身被写入磁盘的动作;flush指的是逻辑上的刷新,包含一系列逻辑操作。

其实flush API的作用是强制停止translog的fsync行为,提交segment并清除translog。间隔要尽量大一点(需要优化写速度的话,就稍微调大一点flush间隔。)

作者 east
Elasticsearch 8月 31,2020

Elasticsearch写入性能调优

bulk批量写入

如果业务场景支持将一批数据聚合起来,一次性写入Elasticsarch,那么尽量采用bulk的方式,bulk批量写入的速度远高于一条一条写入大量document的速度。

并不是bulk size越大越好,而是根据写入数据量具体来定的,因为越大的bulk size会导致内存压力过大,因此最好一个请求不要发送超过10MB的数据量,以5-10MB为最佳值。计算公式为:

一次bulk写入的数据量大小=一次bulk批量写入的文档数*每条数据量的大小。

多线程写入

单线程发送bulk请求是无法最大化Elasticsearch集群写入的吞吐量的。如果要利用集群的所有资源,就需要使用多线程并发将数据bulk写入集群中。多线程并发写入同时可以减少每次底层磁盘fsync的次数和开销。

一旦发现ES返回了TOO_MANY_REQUESTS的错误,JavaClient也就是EsRejectedExecutionException。此时说明Elasticsearch已经达到了一个并发写入的最大瓶颈了。

推荐使用CPU核数的2倍线程数。

修改索引刷新时间及副本数

默认“index.refresh_interval”为“1s”,即每秒都会强制生成1个新的segments文件,增大索引刷新时间,可以生成更大的segments文件,有效降低IO并减少segments merge的压力,该配置项可以建索引时指定(或者配置到template里去)。

如果只是单纯导入数据,不需要做实时查询,可以把refresh禁用(即设置index.refresh_interval为-1),并设置“index.number_of_replicas”为“0”,当然这样设置会有数据丢失风险。等到数据完成导入后,再把参数设置为合适的值。

命令为单索引下操作如下所示,同时也支持多索引(索引名按逗号分隔)和全索引(用*通配符)操作。

curl -XPUT --tlsv1.2 --negotiate -k -v -u : "https://ip:httpport/myindex/_settings" -H 'Content-Type: application/json' -d'
{
    "number_of_replicas": 0,
    "refresh_interval": "180s"
}'

修改事务日志translog参数

默认设置下,translog 的持久化策略是每个请求都flush(durability参数值为request),这样能保证写操作的可靠性,但是对性能会有很严重的影响,实际测试发现如果使用默认设置进行导数据磁盘IO会持续占满。如果系统可以接受一定几率的数据丢失(或有手段补录丢失数据),可以通过调整 translog 持久化策略、周期性和一定大小的时候 flush,能大大提升导入性能。该配置项可以建索引时指定(或者配置到template里去)。执行命令如下所示:

curl -XPUT --tlsv1.2 --negotiate -k -v -u : "https://ip:httpport/myindex/_settings" -H 'Content-Type: application/json' -d'
{
"index": {
    "translog": {
          "flush_threshold_size": "1GB",
          "sync_interval": "180s",
          "durability": "async"
          }
     }
}'

备注:数据丢失风险

参数描述清楚

修改merge参数以及线程数

Elasticsearch写入数据时,refresh刷新会生成1个新的segment,segments会按照一定的策略进行索引段合并merge。merge的频率对写入和查询的速度都有一定的影响,如果merge频率比较快,会占用较多的IO,影响写入的速度,但同时segment个数也会比较少,可以提高查询速度。所以merge频率的设定需要根据具体业务去权衡,同时保证写入和查询都相对快速。Elasticsearch默认使用TieredMergePolicy,可以通过参数去控制索引段合并merge的频率:

  1. 参数“index.merge.policy.floor_segment”,Elasticsearch避免产生很小的segment,小于这个阀值的所有的非常小的segment都会merge直到达到这个floor的size,默认是2MB。
  2. 参数“index.merge.policy.max_merge_at_once”,一次最多只merge多少个segments,默认是10。
  3. 参数“index.merge.policy.max_merged_segment”,超过多大size的segment不会再做merge,默认是5g。
  4. 参数“index.merge.policy.segment_per_tier”默认为10,表示每个tier允许的segment个数,注意这个值要大于等于“index.merge.policy.max_merge_at_once”值,否则这个值会先于最大可操作数到达,就会立刻做merge,这样会造成频繁merge。
  5. 参数“ index.merge.scheduler.max_thread_count ”,单个shard上可能同时合并的最大线程数。默认会启动 Math.max(1, Math.min(4, Runtime.getRuntime().availableProcessors() / 2)) 个线程进行merge操作,适用于SSD固态硬盘。但是如果硬盘是机械硬盘,很容易出现IO阻塞,将线程数设置为1。

一般情况下,通过调节参数“index.merge.policy.max_merge_at_once”和“index.merge.policy.segment_per_tier”去控制merge的频率。

参数修改 好处 坏处
提高“index.merge.policy.max_merge_at_once” 和“index.merge.policy.segment_per_tier”参数值(eg :50) 提升indexing速度 减少了segment merge动作的发生,意味着更多的segments,会降低searching速度
降低“index.merge.policy.max_merge_at_once” 和“index.merge.policy.segment_per_tier”参数值(eg :5) Segments更少,即能够提升searching速度 更多的segments merge操作,会花费更多系统资源(CPU/IO/RAM),会降低indexing速度

修改参数命令如下示例:

curl -XPUT --tlsv1.2 --negotiate -k -v -u : "https://ip:httpport/myindex-001/_settings?pretty" -H 'Content-Type: application/json' -d' {      "merge":{          "scheduler":{             "max_thread_count" : "1"          },          "policy":{               "segments_per_tier" : "20",               "max_merge_at_once": "20",               "floor_segment" : "2m",               "max_merged_segment" : "5g"          }       } }'


禁用Doc Values

禁用Doc Values

默认情况下,支持doc values 的所有字段都是开启的。因为 Doc Values 默认启用,可以选择对数据集里面的大多数字段进行聚合和排序操作。但是如果确定不需要在字段上进行排序和聚合,或从脚本中访问字段值,则可以禁用 doc values 来节省磁盘空间。

要禁用 Doc Values ,在字段的映射(mapping)设置 “doc_values”为“false”即可。例如,这里我们创建了一个新的索引,字段 “session_id” 禁用了 Doc Values:

curl -XPUT --tlsv1.2 --negotiate -k -v -u : "http://ip:httpport/myindex" -H 'Content-Type: application/json' -d'
{
"mappings": {
      "my_type": {
           "properties": {
                 "session_id": {
                         "type": "keyword",
                         "doc_values": false
                  }
            }
       }
    }
}'

禁用_source字段

“_source”字段包含在索引时传递的原始JSON文档正文。该“_source”字段本身不被索引(因此是不可搜索的),但它被存储,以便在执行撷取请求时可以返回,例如get或search。

虽然很方便,但是“_source”字段确实在索引中有不小的存储开销。因此,可以使用如下方式禁用:

curl -XPUT --tlsv1.2 --negotiate -k -v -u : 'https://ip:httpport/tweets?pretty' -H 'Content-Type: application/json' -d'
{
   "mappings": {
          "tweet": {
               "_source": {
                       "enabled": false
                 }
           }
     }
}'

说明: 在禁用_source 字段之前请注意:如果_source字段不可用,则不支持以下功能:

  • update,update_by_query,reindex APIs.
  • 高亮
  • 将索引从一个Elasticsearch索引reindex(重索引)到另一个索引的能力,以便更改映射或分析,或将索引升级到新的主要版本。
  • 通过查看索引时使用的原始文档来调试查询或聚合的能力。
  • 潜在的未来可能会自动修复索引损坏的能力。
作者 east
Elasticsearch 8月 31,2020

Elasticsearch查询性能调优

查询语句优化

查询语句优化的内容包括:查询范围,单次查询数量等。

  1. 根据实际业务需求去规划查询范围,查询越少的字段越快,过大的查询范围不仅会导致查询效率低,而且会使Elasticsearch集群资源耗费急剧增加,甚至可能造成集群崩溃。通过_source参数可以控制返回字段信息,尽量避免读取大字段;
  2. 单次查询数量限制是为了保证内存不会被查询内存大量占用,Elasticsearch默认的查询请求通常返回排序后的前10条记录,最多一次读取10000条记录。通过from和size参数控制读取记录范围,避免一次读取过多的记录。一次性查询大于10000条的数据,使用scroll查询,请参考3.7.6。

安全模式下查询示例:

curl -XGET --tlsv1.2 --negotiate -k -v -u : "https://ip:httpport/myindex-001/_search?pretty"  -H 'Content-Type: application/json' -d' 
{
  "from": 0,
  "size": 10,
  "_source": "age",
  "query": {
      "match": {
        "age": "56"
      }
  },
  "sort": [
    {
      "age": {
        "order": "asc"
      }
    }
  ]
}'

强制段合并(force merge)

每个shard是基于多个segment组成创建的,segment的个数的减少可以大幅的提高查询的速度,定时的进行手动索引段合并,可以提高查询速度。支持单索引和多索引批量操作。

单索引安全模式下示例:

curl -XPOST --tlsv1.2 --negotiate -k -v -u : 'https://ip:httpport/myindex-001/_forcemerge?only_expunge_deletes=false&max_num_segments=1&flush=true&pretty'

多索引安全模式下示例:

curl -XPOST --tlsv1.2 --negotiate -k -v -u : 'https://ip:httpport/myindex-001,myindex-002/_forcemerge?only_expunge_deletes=false&max_num_segments=1&flush=true&pretty'
curl -XPOST --tlsv1.2 --negotiate -k -v -u : 'https://ip:httpport/_all/_forcemerge?only_expunge_deletes=false&max_num_segments=1&flush=true&pretty'

说明:

max_num_segments:merge到多少个segments,1的意思是强行merge到1个segment;

only_expunge_deletes:只清理有deleted标记的segments,推荐值false;

flush:清理完执行一下flush,默认是true。

过滤查询(filter)

Elasticsearch的查询操作分为2种:查询(query)和过滤(filter),查询(query)默认会计算每个返回文档的得分,然后根据得分排序;而过滤(filter)只会筛选出符合的文档,并不计算得分,且可以缓存文档。

对于非全文检索的使用场景,如果不关心查询结果和查询条件的相关度,只是想查找目标数据,可以使用filter来提高查询效率。

query安全模式下查询示例:

curl -XGET --tlsv1.2 --negotiate -k -v -u : "https://ip:httpport/myindex-001/_search?pretty" -H 'Content-Type: application/json' -d'
{
  "query": {
    "match": {
      "age": "56"
    }
  }
}'

filter安全模式下查询示例:

curl -XGET --tlsv1.2 --negotiate -k -v -u : "https://ip:httpport/myindex-001/_search?pretty" -H 'Content-Type: application/json' -d' {   "query": {     "bool": {       "filter": {          "match": {           "age": "56"         }       }     }   } }'

路由(routing)

Elasticsearch写入文档时,文档会通过一个公式路由到一个索引中的一个分片上。默认公式如下:

shard_num = hash(_routing) % num_primary_shards

_routing字段的取值,默认是_id字段,可以根据业务场景设置经常查询的字段作为路由字段。例如可以考虑将用户id、地区作为路由字段,查询时可以过滤不必要的分片,加快查询速度。

安全模式下写入时指定路由:

curl -XPUT --tlsv1.2 --negotiate -k -v -u : "https://ip:httpport/my_index/my_type/1?routing=user1&refresh=true" -H 'Content-Type: application/json' -d' 
{
  "title": "This is a document"
}'

安全模式下查询时不指定路由示例:

curl -XGET --tlsv1.2 --negotiate -k -v -u : "https://ip:httpport/my_index/_search?pretty" -H 'Content-Type: application/json' -d'
{
  "query": {
    "match": {
      "title": "document"
    }
  }
}'

需要查询所有的分片,返回结果:

{
  "took" : 5,
  "timed_out" : false,
  "_shards" : {
    "total" : 5,
    "successful" : 5,
    "skipped" : 0,
    "failed" : 0
  },
  "hits" : {
    "total" : 1,
    "max_score" : 0.2876821,
    "hits" : [
      {
        "_index" : "my_index",
        "_type" : "my_type",
        "_id" : "1",
        "_score" : 0.2876821,
        "_routing" : "user1",
        "_source" : {
          "title" : "This is a document"
        }
      }
    ]
  }
}

安全模式下查询时指定路由示例:

curl -XGET --tlsv1.2 --negotiate -k -v -u : "https://ip:httpport/my_index/_search?routing=user1&pretty" -H 'Content-Type: application/json' -d'
{
  "query": {
    "match": {
      "title": "document"
    }
  }
}'

查询时只需要查询一个分片,查询结果:

{
  "took" : 8,
  "timed_out" : false,
  "_shards" : {
    "total" : 1,
    "successful" : 1,
    "skipped" : 0,
    "failed" : 0
  },
  "hits" : {
    "total" : 1,
    "max_score" : 0.2876821,
    "hits" : [
      {
        "_index" : "my_index",
        "_type" : "my_type",
        "_id" : "1",
        "_score" : 0.2876821,
        "_routing" : "user1",
        "_source" : {
          "title" : "This is a document"
        }
      }
    ]
  }
}

游标查询(scroll)

Elasticsearch为了避免深分页,不允许使用分页(from&size)查询10000条以后的数据,需要使用游标(scroll)查询。

安全模式下scroll查询示例:

curl -XGET --tlsv1.2 --negotiate -k -v -u : "https://ip:httpport/myindex-001/_search?scroll=1m&pretty" -H 'Content-Type: application/json' -d'
{
  "query": {
    "match": {
      "age": "36"
    }
  },
  "size":1000
}'

说明:

使用scroll查询,应该在初始搜索请求中指定scroll参数,这个参数告诉Elasticsearch保持游标窗口期多长时间。例如:scroll=1m,表示1分钟。

结果返回:

{
  "_scroll_id" : "DnF1ZXJ5VGhlbkZldGNoMgAAAAAAAABPFlFHZzExcFdnUWJDU0d5bU==",
  "took" : 55,
  "timed_out" : false,
  "_shards" : {
    "total" : 50,
    "successful" : 50,
    "skipped" : 0,
    "failed" : 0
  },
  "hits" : {
    "total" : 16692062,
    "max_score" : 0.0,
    "hits" : [...1000 data ]
  }
}

优化scroll:在一般场景下,scroll用来取得排序好的大量数据,但很多时候只需要返回数据,这时候可以对scroll进行优化。使用_doc去sort返回的结果不会有排序,此时执行效率最快。

安全模式下示例:

curl -XGET --tlsv1.2 --negotiate -k -v -u : "https://ip:httpport/myindex-001/_search?scroll=1m&pretty" -H 'Content-Type: application/json' -d'
{
  "query": {
    "match": {
      "age": "36"
    }
  },
  "size":1000,
  "sort": "_doc"
}'

避免使用wildcard模糊匹配查询

Elasticsearch默认支持通过*?正则表达式来做模糊匹配,数据量级别达到TB+甚至更高之后,模糊匹配查询通常会耗时比较长,甚至可能导致内存溢出,卡死乃至崩溃宕机的情况。所以数据量大的情况下,不要使用模糊匹配查询。

安全模式下模糊匹配查询示例:

curl -XGET --tlsv1.2 --negotiate -k -v -u : "https://ip:httpport/myindex-001/_search?pretty" -H 'Content-Type: application/json' -d'
{
  "query": {
    "wildcard" : {
	"name" : "*优" 
	}
  }
}'

聚合优化

大多时候对单个字段的聚合查询还是比较快的,但是当需要聚合多个字段时,就会产生大量的分组,最终结果就是占用Elasticsearch大量的内存,从而导致内存溢出的情况发生。尽量根据业务优化,减少聚合次数。

默认深度优化聚合改为广度优先聚合

添加设置:”collect_mode”: “breadth_first”。

depth_first:直接进行子聚合的计算。

breadth_first:先计算出当前聚合的结果,针对这个结果在对子聚合进行计算。

优化聚合执行方式

在每一层terms aggregation内部加一个 “execution_hint”: “map”。

添加设置:”execution_hint”: “map”。

  1. 查询结果直接放入到内存中构建map,在查询结果集小的场景下,速度极快;
  2. 但如果查询结果集合很大(百万-亿级别)的时候,传统聚合方式会比map方式快。

安全模式下聚合查询示例:

curl -XGET --tlsv1.2 --negotiate -k -v -u : "https://ip:httpport/myindex-001/_search?pretty" -H 'Content-Type: application/json' -d'
{
  "size" : 0,
  "aggregations": {
    "count_age" : {
	"terms" : {
		   "field" : "age"
		} 
	}
  }
}'

安全模式下聚合优化后查询示例:

curl -XGET --tlsv1.2 --negotiate -k -v -u : "https://ip:httpport/myindex-001/_search?pretty" -H 'Content-Type: application/json' -d'
{
  "size" : 0,
  "aggregations": {
    "count_age" : {
	"terms" : {
		   "field" : "age",
		   "execution_hint": "map",
		   "collect_mode": "breadth_first"
		} 
	}
  }
}'

配置EsClient角色(协调节点)

EsClient角色可以用于发送查询请求到其他节点,收集和合并结果,以及响应发出查询的客户端。通过配置EsClient角色可以加快查询运算速度,提升缓存命中数。

mappings优化

请确认mappings设置是否合理。

  • 对于只需要精确查询的字段,例如时间戳,应该设置为keyword。
  • 对需要进行全文检索的字段设置合理的分词器,不同的分词器查询效率相差较大。

超时参数

在对查询结果的精确度要求较低的场景下,如果低响应时间比搜索结果更重要,可以使用如下两个参数来提升查询性能:

  1. terminate_after:表示每个分片收集的文档的最大数量,一旦达到该数量,查询请求提前终止。
  2. timeout:表示每个分片上的查询超时时间,在请求超时之前,Elasticsearch将会返回已经成功从每个分片上获取的结果。 安全模式下使用示例 : curl -XGET –tlsv1.2 –negotiate -k -v -u : “https://ip:httpport/_search?pretty&timeout=10ms&terminate_after=10″
作者 east

上一 1 … 33 34 35 … 41 下一个

关注公众号“大模型全栈程序员”回复“小程序”获取1000个小程序打包源码。回复”chatgpt”获取免注册可用chatgpt。回复“大数据”获取多本大数据电子书

标签

AIGC AI创作 bert chatgpt github GPT-3 gpt3 GTP-3 hive mysql O2O tensorflow UI控件 不含后台 交流 共享经济 出行 图像 地图定位 外卖 多媒体 娱乐 小程序 布局 带后台完整项目 开源项目 搜索 支付 效率 教育 日历 机器学习 深度学习 物流 用户系统 电商 画图 画布(canvas) 社交 签到 联网 读书 资讯 阅读 预订

官方QQ群

小程序开发群:74052405

大数据开发群: 952493060

近期文章

  • 详解Python当中的pip常用命令
  • AUTOSAR如何在多个供应商交付的配置中避免ARXML不兼容?
  • C++thread pool(线程池)设计应关注哪些扩展性问题?
  • 各类MCAL(Microcontroller Abstraction Layer)如何与AUTOSAR工具链解耦?
  • 如何设计AUTOSAR中的“域控制器”以支持未来扩展?
  • C++ 中避免悬挂引用的企业策略有哪些?
  • 嵌入式电机:如何在低速和高负载状态下保持FOC(Field-Oriented Control)算法的电流控制稳定?
  • C++如何在插件式架构中使用反射实现模块隔离?
  • C++如何追踪内存泄漏(valgrind/ASan等)并定位到业务代码?
  • C++大型系统中如何组织头文件和依赖树?

文章归档

  • 2025年6月
  • 2025年5月
  • 2025年4月
  • 2025年3月
  • 2025年2月
  • 2025年1月
  • 2024年12月
  • 2024年11月
  • 2024年10月
  • 2024年9月
  • 2024年8月
  • 2024年7月
  • 2024年6月
  • 2024年5月
  • 2024年4月
  • 2024年3月
  • 2023年11月
  • 2023年10月
  • 2023年9月
  • 2023年8月
  • 2023年7月
  • 2023年6月
  • 2023年5月
  • 2023年4月
  • 2023年3月
  • 2023年1月
  • 2022年11月
  • 2022年10月
  • 2022年9月
  • 2022年8月
  • 2022年7月
  • 2022年6月
  • 2022年5月
  • 2022年4月
  • 2022年3月
  • 2022年2月
  • 2022年1月
  • 2021年12月
  • 2021年11月
  • 2021年9月
  • 2021年8月
  • 2021年7月
  • 2021年6月
  • 2021年5月
  • 2021年4月
  • 2021年3月
  • 2021年2月
  • 2021年1月
  • 2020年12月
  • 2020年11月
  • 2020年10月
  • 2020年9月
  • 2020年8月
  • 2020年7月
  • 2020年6月
  • 2020年5月
  • 2020年4月
  • 2020年3月
  • 2020年2月
  • 2020年1月
  • 2019年7月
  • 2019年6月
  • 2019年5月
  • 2019年4月
  • 2019年3月
  • 2019年2月
  • 2019年1月
  • 2018年12月
  • 2018年7月
  • 2018年6月

分类目录

  • Android (73)
  • bug清单 (79)
  • C++ (34)
  • Fuchsia (15)
  • php (4)
  • python (43)
  • sklearn (1)
  • 云计算 (20)
  • 人工智能 (61)
    • chatgpt (21)
      • 提示词 (6)
    • Keras (1)
    • Tensorflow (3)
    • 大模型 (1)
    • 智能体 (4)
    • 深度学习 (14)
  • 储能 (44)
  • 前端 (4)
  • 大数据开发 (489)
    • CDH (6)
    • datax (4)
    • doris (31)
    • Elasticsearch (15)
    • Flink (78)
    • flume (7)
    • Hadoop (19)
    • Hbase (23)
    • Hive (40)
    • Impala (2)
    • Java (71)
    • Kafka (10)
    • neo4j (5)
    • shardingsphere (6)
    • solr (5)
    • Spark (99)
    • spring (11)
    • 数据仓库 (9)
    • 数据挖掘 (7)
    • 海豚调度器 (10)
    • 运维 (34)
      • Docker (3)
  • 小游戏代码 (1)
  • 小程序代码 (139)
    • O2O (16)
    • UI控件 (5)
    • 互联网类 (23)
    • 企业类 (6)
    • 地图定位 (9)
    • 多媒体 (6)
    • 工具类 (25)
    • 电商类 (22)
    • 社交 (7)
    • 行业软件 (7)
    • 资讯读书 (11)
  • 嵌入式 (70)
    • autosar (63)
    • RTOS (1)
    • 总线 (1)
  • 开发博客 (16)
    • Harmony (9)
  • 技术架构 (6)
  • 数据库 (32)
    • mongodb (1)
    • mysql (13)
    • pgsql (2)
    • redis (1)
    • tdengine (4)
  • 未分类 (6)
  • 程序员网赚 (20)
    • 广告联盟 (3)
    • 私域流量 (5)
    • 自媒体 (5)
  • 量化投资 (4)
  • 面试 (14)

功能

  • 登录
  • 文章RSS
  • 评论RSS
  • WordPress.org

All Rights Reserved by Gitweixin.本站收集网友上传代码, 如有侵犯版权,请发邮件联系yiyuyos@gmail.com删除.