gitweixin
  • 首页
  • 小程序代码
    • 资讯读书
    • 工具类
    • O2O
    • 地图定位
    • 社交
    • 行业软件
    • 电商类
    • 互联网类
    • 企业类
    • UI控件
  • 大数据开发
    • Hadoop
    • Spark
    • Hbase
    • Elasticsearch
    • Kafka
    • Flink
    • 数据仓库
    • 数据挖掘
    • flume
    • Kafka
    • Hive
    • shardingsphere
    • solr
  • 开发博客
    • Android
    • php
    • python
    • 运维
    • 技术架构
    • 数据库
  • 程序员网赚
  • bug清单
  • 量化投资
  • 在线查询工具
    • 去行号
    • 在线时间戳转换工具
    • 免费图片批量修改尺寸在线工具
    • SVG转JPG在线工具

分类归档Flink

精品微信小程序开发门户,代码全部亲测可用

  • 首页   /  大数据开发
  • 分类归档: "Flink"
  • ( 页面3 )
Flink 6月 11,2024

Flink ProcessFunction不同流异同及应用场景

ProcessFunction系列对比概览

函数类别关键特性应用场景示例
ProcessFunction基础类,处理单个事件,支持事件时间、水位线、状态管理、定时器。单独处理每个事件,执行复杂逻辑,如基于事件内容动态响应。
KeyedProcessFunction基于键的处理,每个键有自己的状态。支持事件时间、水位线、状态管理、定时器。按用户分组统计点击量,用户会话管理,状态跟踪。
CoProcessFunction处理两个数据流,独立处理来自两流的事件,支持事件时间、水位线、状态管理、定时器。实时融合交易流与价格流,实时计算订单总价;日志与用户信息流的匹配处理。
ProcessJoinFunction专为流连接设计,处理两个数据流,简化版的CoProcessFunction,不支持定时器。简单的流连接操作,如订单ID与用户信息的关联。
BroadcastProcessFunction处理普通流与广播流,广播流的每个元素发给所有普通流元素,适用于全局状态更新。实时规则更新,广播新的规则至所有交易验证逻辑。
KeyedBroadcastProcessFunction类似BroadcastProcessFunction,但作用于键控流,每个键控流元素接收广播流所有元素。每个用户个性化推荐算法更新,全局规则变化按用户分发。
ProcessWindowFunction在窗口聚合后处理窗口内所有元素,提供窗口上下文信息,如窗口开始/结束时间,适合窗口内复杂计算。计算每小时温度波动,统计窗口内中位数、分位数等。
ProcessAllWindowFunction处理全窗口数据,非键控,适用于全局操作,如计算整个数据流的汇总统计信息。计算整个数据流的总和或平均值,无需考虑分组。

异同点总结

  • 状态管理与事件时间:所有函数均支持事件时间和水位线处理,状态管理(除了ProcessJoinFunction),但Keyed系列额外支持键控状态。
  • 流处理:CoProcessFunction、ProcessJoinFunction处理多个流,而BroadcastProcessFunction和KeyedBroadcastProcessFunction支持广播状态传播。
  • 窗口处理:ProcessWindowFunction和ProcessAllWindowFunction专用于窗口处理,前者基于键控窗口,后者处理全窗口数据。
  • 灵活性:ProcessFunction和KeyedProcessFunction最为灵活,适用于广泛的复杂逻辑处理;ProcessWindowFunction在窗口上下文中提供了额外的处理能力。

1. ProcessFunction

概述:ProcessFunction是最基本的形式,它不依赖于任何键或窗口,为每个输入事件提供完全的控制权。它允许访问事件的时间戳和水位线信息,并提供了注册和处理定时器的能力。

应用场景:适合需要对每个事件进行独立、复杂处理的场景,如基于事件的复杂逻辑判断、状态更新或基于时间的操作。

示例:处理单个事件,根据事件的内容动态注册定时器,进行后续处理。

2. KeyedProcessFunction

概述:KeyedProcessFunction是对ProcessFunction的扩展,用于处理已经按照某个键(key)分组的数据流。它除了具备ProcessFunction的所有功能外,还可以访问键控状态,即每个键都有独立的状态。

应用场景:适用于需要基于键的聚合或状态管理的场景,如统计每个用户的点击次数、维持每个商品的库存状态等。

示例:统计每个用户的登录次数,同时在特定事件后发送通知。

3. CoProcessFunction

概述:用于处理两个数据流的连接操作,每个流可以有不同的类型。它允许独立地处理来自两个流的事件,并提供了注册定时器的功能。

应用场景:当需要根据两个不同的数据流进行联合处理时使用,例如在实时交易系统中,将订单流和价格流合并,实时计算订单的最新总价。

示例:实时融合两个数据源,比如订单流和用户流,根据订单ID匹配用户信息,进行个性化推荐。

4. ProcessJoinFunction

概述:专用于处理两个流的连接操作,但与CoProcessFunction相比,它更专注于流的连接逻辑,而不提供事件时间处理或定时器功能。

应用场景:适用于简单的流连接,当只需要对两个流进行匹配和简单的处理时使用。

示例:基于键匹配两个流的记录,如用户行为日志与用户详情表的关联查询。

5. BroadcastProcessFunction

概述:用于处理一个普通数据流和一个广播数据流。广播流的每个元素都会被发送给所有普通流的元素,适合实现广播状态模式。

应用场景:当需要将某些全局配置或规则广播给所有流的处理逻辑时,比如实时更新的黑名单列表应用于每一条交易验证。

示例:实时更新规则引擎,当规则发生变化时,广播新规则至所有交易流,进行动态规则匹配。

6. KeyedBroadcastProcessFunction

概述:类似于BroadcastProcessFunction,但作用于键控流上,每个键控流的元素会接收到广播流的所有元素,同时保持了键控状态。

应用场景:在需要根据键进行状态管理和同时应用全局更新的场景,如每个用户个性化推荐算法的更新。

示例:根据用户偏好动态调整推荐算法,当推荐算法模型更新时,广播更新至每个用户的推荐逻辑中。

7. ProcessWindowFunction

概述:在窗口聚合操作结束后,对窗口内所有元素进行进一步处理。提供了窗口上下文信息,如窗口的开始和结束时间,可以访问窗口内所有元素并执行复杂计算。

应用场景:当窗口聚合后还需要进行复杂的计算或转换时,如计算窗口内的中位数、分位数等。

示例:计算每个小时内的温度变化率,不仅统计平均温度,还计算温度的最大波动。

8. ProcessAllWindowFunction

概述:与ProcessWindowFunction类似,但处理的是非键控的全窗口,即所有输入数据被视为一个整体处理,常用于全局窗口。

应用场景:适用于需要在整个数据集上执行全局操作,而不考虑键的场景,如计算整个数据流的总体统计信息。

示例:计算整个数据流的总和或平均值,不考虑数据的分组。

作者 east
Flink 6月 11,2024

Flink 时间窗口在 IoT 项目中的应用实战

一、引言

在物联网(IoT)项目中,实时数据处理和分析至关重要。Apache Flink 作为一款高性能的流处理框架,提供了多种时间窗口机制,以支持复杂的时序数据处理需求。本文将通过实际案例,详细介绍 Flink 中的滚动窗口(Tumbling Window)、滑动窗口(Sliding Window)和会话窗口(Session Window)在 IoT 项目中的应用。

二、时间窗口概述

在 Flink 中,时间窗口是一种处理时序数据的重要机制。它允许我们将数据按照时间范围进行分组,并对每个分组内的数据进行聚合。Flink 提供了三种主要的时间窗口:滚动窗口、滑动窗口和会话窗口。

  1. 滚动窗口(Tumbling Window):滚动窗口是一种固定大小、不重叠的时间窗口。它将数据流划分为一系列相等的时间段,并对每个时间段内的数据进行聚合。滚动窗口常用于计算每个时间段内的统计信息,如平均值、总和等。
  2. 滑动窗口(Sliding Window):滑动窗口是一种可以重叠的时间窗口。它允许我们指定一个滑动间隔,从而在每个滑动间隔内对数据进行聚合。滑动窗口常用于检测数据流中的趋势和周期性变化。
  3. 会话窗口(Session Window):会话窗口是一种基于数据活跃度的动态时间窗口。它将数据流中相邻的、活跃度较高的数据分组到一起,形成一个个会话。会话窗口常用于分析用户行为、设备连接状态等场景。

三、时间窗口在 IoT 项目中的应用

在 IoT 项目中,时间窗口的应用主要体现在以下几个方面:

  1. 实时监控和告警:通过滚动窗口或滑动窗口,可以实时计算设备的温度、湿度等指标的统计信息,并在异常情况下触发告警。
  2. 数据分析和预测:利用滑动窗口或会话窗口,可以对设备的历史数据进行分析,发现潜在的趋势和周期性变化,从而进行更精确的预测和优化。
  3. 用户行为分析:在智能家居等场景中,通过会话窗口分析用户的操作行为,可以更好地了解用户需求,提供个性化的服务。

四、实战案例分析

接下来,我们将通过三个实际的 IoT 项目案例,详细介绍如何在 Flink 中应用这三种时间窗口。

案例一:实时监控和告警

假设我们有一个 IoT 项目,需要实时监控工厂设备的温度数据,并在温度过高时触发告警。在这个项目中,我们可以使用滚动窗口来计算每个时间段内的平均温度,并设置阈值进行告警。

DataStream<TemperatureData> temperatureStream = ...; // 从设备读取温度数据
DataStream<Tuple2<Long, Double>> averagedTemperatures = temperatureStream
    .keyBy(data -> data.getDeviceId()) // 按设备ID分组
    .timeWindow(Time.minutes(1)) // 设置滚动窗口大小为1分钟
    .reduce((t1, t2) -> new TemperatureData(t1.getDeviceId(), (t1.getTemperature() + t2.getTemperature()) / 2)); // 计算平均温度

averagedTemperatures.addSink(new AlertSink()); // 添加告警接收器

案例二:数据分析和预测

假设我们有一个智能电网项目,需要分析电力消耗数据,预测未来的电力需求。在这个项目中,我们可以使用滑动窗口来计算每小时的电力消耗量,并基于历史数据进行预测。

DataStream<ElectricityData> electricityStream = ...; // 从电网读取电力消耗数据
DataStream<Tuple2<Long, Double>> hourlyConsumptions = electricityStream
    .keyBy(data -> data.getLocation()) // 按地点分组
    .timeWindow(Time.hours(1), Time.minutes(30)) // 设置滑动窗口大小为1小时,滑动间隔为30分钟
    .sum(0); // 计算每小时的总电力消耗量

hourlyConsumptions.addSink(new PredictionSink()); // 添加预测接收器

案例三:用户行为分析

假设我们有一个智能家居项目,需要分析用户的操作行为,以便提供个性化的服务。在这个项目中,我们可以使用会话窗口来分析用户在一定时间内的操作记录,识别用户的活跃度和偏好。

DataStream<UserAction> userActionStream = ...; // 从智能家居设备读取用户操作数据
DataStream<Tuple2<String, Integer>> userSessions = userActionStream
    .keyBy(action -> action.getUserId()) // 按用户ID分组
    .window(TumblingEventTimeWindows.of(Time.minutes(5))) // 设置会话窗口大小为5分钟
    .reduce(new CountReducer()); // 计算每个用户的操作次数

userSessions.addSink(new PersonalizedServiceSink()); // 添加个性化服务接收器

在Flink IoT项目中,时间窗口是处理和分析流数据的强大工具。滚动窗口适用于需要固定时间间隔统计的场景,滑动窗口适用于需要连续更新统计的场景,而会话窗口适用于需要检测活动会话的场景。每种窗口类型都有其特定的应用场景和优势,选择合适的窗口类型对于实现有效的流数据处理至关重要。

作者 east
Flink, 储能 6月 11,2024

RichSinkFunction 在 Flink IoT 项目中的应用实战

一、引言

随着物联网(IoT)技术的快速发展,实时数据处理和分析的需求日益增长。Apache Flink 作为一款高性能的流处理框架,广泛应用于 IoT 项目中。在 Flink 中,RichSinkFunction 是一种特殊的函数,它允许用户在数据流输出到外部系统之前,对数据进行进一步的转换和处理。本文将通过一个实际的 Flink IoT 项目案例,详细介绍 RichSinkFunction 的应用。

二、RichSinkFunction 概述

在 Flink 中,SinkFunction 是用于将数据流输出到外部系统的函数。与普通 SinkFunction 不同,RichSinkFunction 提供了更多的功能和灵活性。它允许用户访问 Flink 运行时的上下文信息,如状态管理、计时器和广播变量等。此外,RichSinkFunction 还可以处理异步 I/O 操作,提高数据输出的效率。

三、RichSinkFunction 的应用

在 IoT 项目中,RichSinkFunction 的应用主要体现在以下几个方面:

  1. 数据清洗和转换:在将数据输出到外部系统之前,可能需要对数据进行清洗、过滤和转换等操作。RichSinkFunction 可以方便地实现这些功能,提高数据质量。
  2. 异步输出:为了提高数据处理的效率,可以使用 RichSinkFunction 的异步输出功能。通过异步输出,可以将数据流的输出操作与 Flink 主线程分离,从而减少数据处理的延迟。
  3. 状态管理和计时器:在处理 IoT 数据时,可能需要根据历史数据或时间窗口内的数据进行决策。RichSinkFunction 可以利用 Flink 的状态管理和计时器功能,实现这些复杂的数据处理逻辑。

在物联网项目中,常见的数据输出需求包括:

  • 实时数据存储:将实时处理的传感器数据写入数据库,如MySQL、Cassandra或MongoDB,供后续查询分析。
  • 消息传递:将数据推送到消息队列如Kafka、RabbitMQ,用于数据集成或后续处理。
  • 持久化存储:将数据写入HDFS、S3等分布式文件系统,实现数据备份或离线分析。
  • 报警通知:根据实时数据触发警报,发送邮件、短信或推送通知。

实例应用:将Flink处理的IoT数据写入MySQL数据库

假设我们有一个物联网项目,需要实时收集来自智能设备的温度和湿度数据,并将处理后的数据实时插入到MySQL数据库中进行长期存储和分析。下面是使用RichSinkFunction实现这一需求的示例代码:

准备工作

  1. 依赖准备:确保项目中添加了Flink和MySQL驱动的依赖。
<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-java</artifactId>
    <version>${flink.version}</version>
</dependency>
<dependency>
    <groupId>mysql</groupId>
    <artifactId>mysql-connector-java</artifactId>
    <version>${mysql.connector.version}</version>
</dependency>
  1. 数据库表结构:假设我们已经创建了一个名为iot_data的表,用于存储温度和湿度数据。
Sql1CREATE TABLE iot_data (
2    device_id INT PRIMARY KEY,
3    temperature DOUBLE,
4    humidity DOUBLE,
5    timestamp TIMESTAMP
6);

RichSinkFunction实现

import org.apache.flink.api.common.functions.RuntimeContext;
import org.apache.flink.streaming.api.functions.sink.RichSinkFunction;

public class MySQLSink extends RichSinkFunction<TemperatureHumidityRecord> {

    private transient Connection connection;
    private final String url;
    private final String user;
    private final String password;

    public MySQLSink(String url, String user, String password) {
        this.url = url;
        this.user = user;
        this.password = password;
    }

    @Override
    public void open(Configuration parameters) throws Exception {
        super.open(parameters);
        // 初始化数据库连接
        Class.forName("com.mysql.jdbc.Driver");
        connection = DriverManager.getConnection(url, user, password);
    }

    @Override
    public void invoke(TemperatureHumidityRecord record, Context context) throws Exception {
        String sql = "INSERT INTO iot_data(device_id, temperature, humidity, timestamp) VALUES(?,?,?,?)";
        try (PreparedStatement statement = connection.prepareStatement(sql)) {
            statement.setInt(1, record.getDeviceId());
            statement.setDouble(2, record.getTemperature());
            statement.setDouble(3, record.getHumidity());
            statement.setTimestamp(4, new Timestamp(record.getTimestamp().getTime()));
            statement.executeUpdate();
        }
    }

    @Override
    public void close() throws Exception {
        if (connection != null) {
            connection.close();
        }
        super.close();
    }
}




应用集成

在Flink流处理作业中集成上述自定义sink:

public class IotDataStreamJob {
    public static void main(String[] args) throws Exception {
        // 设置Flink环境
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        // 假设source为模拟的IoT数据流
        DataStreamSource<TemperatureHumidityRecord> source = env.addSource(new SimulatedIoTDataSource());

        // 定义转换逻辑,如过滤、聚合等

        // 将处理后的数据写入MySQL
        source.addSink(new MySQLSink("jdbc:mysql://localhost:3306/mydb", "username", "password"));

        // 启动任务
        env.execute("IoT Data to MySQL");
    }
}
Java1public class IotDataStreamJob {
2    public static void main(String[] args) throws Exception {
3        // 设置Flink环境
4        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
5
6        // 假设source为模拟的IoT数据流
7        DataStreamSource<TemperatureHumidityRecord> source = env.addSource(new SimulatedIoTDataSource());
8
9        // 定义转换逻辑,如过滤、聚合等
10
11        // 将处理后的数据写入MySQL
12        source.addSink(new MySQLSink("jdbc:mysql://localhost:3306/mydb", "username", "password"));
13
14        // 启动任务
15        env.execute("IoT Data to MySQL");
16    }
17}
作者 east
Flink 4月 28,2024

Flink ValueStateDescriptor使用实例

在Apache Flink中,ValueStateDescriptor 是用于定义状态的一种数据结构,它允许你为每个键(key)存储一个值(value)。状态是 Flink 流处理模型的核心概念之一,它允许你在任务失败和恢复时保持数据的一致性。

ValueStateDescriptor 的主要特性:

  1. 键控状态(Keyed State):
  • ValueStateDescriptor 是一种键控状态,意味着它总是与一个特定的键相关联。在 Flink 中,键控状态是根据键来分配和访问的,这意味着相同键的状态总是会被同一任务处理。
  1. 单值状态:
  • 与其他状态类型(如 ListState、MapState 等)不同,ValueStateDescriptor 只能存储一个值。这个值可以是任何类型,包括复杂的对象。
  1. 类型信息:
  • ValueStateDescriptor 需要一个类型信息参数,这个参数指定了存储在状态中的值的类型。这是为了序列化和反序列化状态时能够正确处理数据。
  1. 名称:
  • ValueStateDescriptor 需要一个字符串名称,这个名称用于在内部标识状态,并在作业的元数据中引用。

如何使用 ValueStateDescriptor:

  1. 创建状态描述符:
  • 使用 ValueStateDescriptor 的构造函数创建一个实例,需要提供状态名称和类型信息。
  1. 访问状态:
  • 在 Flink 的 RichFunction(如 RichMapFunction 或 RichFlatMapFunction)中,可以使用 RuntimeContext 来访问键控状态。
  1. 状态操作:
  • 可以通过 ValueState 对象来获取、更新或清空状态。
  1. 状态后端:
  • ValueStateDescriptor 需要与 Flink 的状态后端集成,状态后端负责实际的状态存储、检索和持久化。

示例代码:

public class MyMapper extends RichMapFunction<String, String> {
   private transient ValueState<String> state;
 
   @Override
   public void open(Configuration parameters) throws Exception {
       ValueStateDescriptor<String> descriptor = new ValueStateDescriptor<>("myState", String.class);
       state = getRuntimeContext().getState(descriptor);
   }
 
   @Override
   public String map(String value) throws Exception {
       String currentState = state.value();
       // 更新状态
       state.update("new value");
       return currentState;
   }
}




在上面的示例中,我们创建了一个 ValueStateDescriptor 来描述一个字符串类型的键控状态,并在 open 方法中获取了状态实例。在 map 方法中,我们通过 state.value() 来获取当前的状态值,并通过 state.update("new value") 来更新状态。

总之,ValueStateDescriptor 是 Flink 中定义和操作单值键控状态的关键组件,它简化了状态的管理,并确保了状态的一致性和容错性。

作者 east
Flink, Hive, Spark, 大数据开发 4月 20,2024

数仓开发LAG 和 LEAD 函数详细解析和用例

在做Iot大数据开发时,需要用到lag和lead函数来计算设备故障。下面详细解析lag和lead函数的作用和例子。

LAG 和 LEAD 函数是用于在 Spark SQL 中进行窗口函数操作时常用的两个函数,它们用于获取某一行在分组内的前一行或后一行的数值。下面详细解释它们的用法:

LAG 函数:

LAG 函数用于获取某一行在分组内的前一行的数值。其语法如下:

sqlCopy CodeLAG(column, offset, default) OVER (PARTITION BY partition_column ORDER BY order_column)
  • column: 要获取值的列。
  • offset: 指定要获取的偏移量,即前面第几行,默认为 1。
  • default: 当无法获取到前一行时的默认值,默认为 NULL。
  • PARTITION BY partition_column: 指定分组的列。
  • ORDER BY order_column: 指定排序的列。

LEAD 函数:

LEAD 函数用于获取某一行在分组内的后一行的数值。其语法如下:

sqlCopy CodeLEAD(column, offset, default) OVER (PARTITION BY partition_column ORDER BY order_column)
  • column: 要获取值的列。
  • offset: 指定要获取的偏移量,即后面第几行,默认为 1。
  • default: 当无法获取到后一行时的默认值,默认为 NULL。
  • PARTITION BY partition_column: 指定分组的列。
  • ORDER BY order_column: 指定排序的列。

示例:

假设有以下数据:

idvalue
110
220
330
440
550

我们可以使用 LAG 函数获取每一行的前一行值:

sqlCopy CodeSELECT id, value, LAG(value, 1) OVER (ORDER BY id) AS lag_value FROM table;

这将返回以下结果:

idvaluelag_value
110NULL
22010
33020
44030
55040

而使用 LEAD 函数则可以获取每一行的后一行值,以类似的方式进行操作。

作者 east
Flink, Spark 4月 11,2024

Spark的Master、Worker、Dirver和Executor,对比Flink的Jobmanager、Taskmanager、Slot异同

首先,我们来了解一下Spark和Flink的基本概念。Spark是一个快速、通用的大规模数据处理引擎,而Flink是一个流式和批处理的开源数据处理框架。它们都用于处理大量数据,但在架构和组件方面有所不同。接下来,我们将用通俗易懂的语言和比喻来解释它们的异同。

  1. Master vs Jobmanager

Spark中的Master负责管理整个集群的资源分配和任务调度。它就像一个公司的CEO,负责制定战略和协调各个部门的工作。而Flink中的Jobmanager也负责任务的调度和资源管理,但它更像是一个项目经理,负责具体项目的执行和监控。

  1. Worker vs Taskmanager

Spark中的Worker负责执行具体的任务,就像公司的员工,按照CEO的指示完成各自的工作。而Flink中的Taskmanager也负责执行任务,但它更像是一个团队,成员之间可以共享资源,协同完成任务。

  1. Driver vs Slot

Spark中的Driver负责协调任务的执行,收集结果并返回给客户端。它就像一个出租车司机,负责接送乘客(任务)到达目的地。而Flink中的Slot是Taskmanager的资源单元,可以理解为一台计算机的一个CPU核心。它就像一个工厂的机床,用于加工生产产品(任务)。

相同之处:

  1. 两者的Master/Jobmanager都负责任务的调度和资源管理。
  2. 两者的Worker/Taskmanager都负责执行具体的任务。
  3. Driver和Taskmanager:在执行用户程序时,都需要接收用户的代码并将其转换为可执行的任务。

不同之处:

  1. Spark的Driver是一个独立的进程,负责协调任务的执行;而Flink没有Driver的概念,任务直接在Taskmanager中执行。
  2. Flink的Slot是资源分配的单位,可以共享资源;而Spark中没有Slot的概念,资源分配是通过Master来实现的。
  3. Spark的Executor一旦启动就会占用固定的资源直到应用程序结束,而Flink的Taskmanager可以通过Slot动态地分配和释放资源,这使得Flink在资源利用上更加灵活。
  4. Flink中的Slot是一个独特的概念,它允许更细粒度的资源管理和任务并发。而Spark的Executor则是单一的执行单元,没有类似Slot的细分。
    Spark的Driver是运行在客户端机器上的一个进程,负责将用户程序转换为RDD(弹性分布式数据集)的操作序列,而Flink的Jobmanager则是负责整个作业的生命周期管理。
作者 east
Flink, Spark 3月 17,2024

Flink跟Spark Streaming的区别

  1. 架构模型:
    • Spark Streaming:基于 Spark 框架,其运行时主要角色包括 Master、Worker、Driver 和 Executor。Driver 负责创建和管理作业,Executor 则执行任务。
    • Flink:独立的实时处理引擎,主要包含 Jobmanager、Taskmanager 和 Slot。Jobmanager 负责作业的管理和调度,Taskmanager 执行具体的任务。
  2. 任务调度:
    • Spark Streaming:通过连续不断地生成微小的数据批次来处理数据。它构建有向无环图DAG,并依次创建 DStreamGraph、JobGenerator 和 JobScheduler。
    • Flink:根据用户提交的代码生成 StreamGraph,经过优化生成 JobGraph,然后提交给 JobManager 进行处理。JobManager 根据 JobGraph 生成 ExecutionGraph,用于任务调度。
  3. 时间机制:
    • Spark Streaming:仅支持处理时间,即处理数据的时间。
    • Flink:支持处理时间、事件时间和注入时间的定义,并引入 watermark 机制来处理滞后数据。
  4. 容错机制:
    • Spark Streaming:可以设置 checkpoint 来恢复任务,但可能会导致重复处理,无法保证恰好一次处理语义。
    • Flink:使用两阶段提交协议来确保精确的一次处理语义,更好地处理容错。
  5. 数据模型:
    • Spark Streaming:基于 DStream(Discretized Stream)模型,将流数据看作是一系列微小批次的静态数据。
    • Flink:采用更灵活的 DataStream 模型,支持各种数据结构和操作。
  6. 应用场景:
    • Spark Streaming:适用于需要与现有 Spark 生态系统集成的场景,如批处理和交互式查询。
    • Flink:更专注于实时处理,提供更丰富的实时处理特性和更好的低延迟性能。
  7. 性能和扩展性:
    • Flink:在处理大流量和高并发场景时通常具有更好的性能和扩展性。
    • Spark Streaming:在某些情况下可能受到 Spark 核心框架的限制。

通过以上对比,我们可以看出Flink和Spark Streaming在架构模型、任务调度、时间机制和容错机制等方面存在显著差异。Flink作为一个基于事件驱动的实时处理引擎,具有更好的时间机制和容错机制,适用于对准确性要求较高的场景。而Spark Streaming作为一个基于微批的流处理引擎,具有较低的延迟和较高的吞吐量,适用于对性能要求较高的场景。在选择流处理框架时,应根据具体需求和场景选择合适的框架。

作者 east
Flink 9月 5,2023

Flink CDC对接数据报错:you need (at least one of) the REPLICATION SLAVE privilege(s) for this operation Error code

这个错误消息表明在运行 Flink CDC 连接到 MySQL 数据库时,出现了权限问题。错误消息中提到需要 “REPLICATION SLAVE privilege” 权限来执行操作,但是当前用户似乎没有该权限。

错误原因:

  • Flink CDC 需要通过 MySQL 的二进制日志来捕获数据库的变更,以便进行实时流式处理。这需要 MySQL 用户具有 REPLICATION SLAVE 权限,以允许 Flink CDC 作为 MySQL 复制从机来读取二进制日志。

解决方案: 为了解决这个问题,您可以采取以下步骤:

  1. 授予 REPLICATION SLAVE 权限:
    • 通过 MySQL 的 root 或具有足够权限的用户登录。
    • 执行以下 SQL 命令,将 REPLICATION SLAVE 权限授予 Flink CDC 使用的用户名(在 Flink 配置中指定的用户名):sql复制代码GRANT REPLICATION SLAVE ON *.* TO 'your_cdc_user'@'%' IDENTIFIED BY 'your_password';
      • your_cdc_user 替换为 Flink CDC 使用的用户名。
      • your_password 替换为 Flink CDC 使用的密码。
  2. 重新启动 Flink CDC 应用:
    • 确保 Flink CDC 应用程序重新启动,并尝试重新连接到 MySQL 数据库。
  3. 检查 Flink CDC 配置:
    • 确保 Flink CDC 配置文件中的连接字符串、用户名和密码正确配置,以匹配 MySQL 数据库的设置。
  4. 检查防火墙和网络配置:
    • 确保 MySQL 数据库的防火墙和网络配置允许 Flink CDC 应用程序连接到数据库端口。
  5. 查看 MySQL 错误日志:
    • 检查 MySQL 错误日志以获取更多关于访问被拒绝的详细信息。可能会提供有关错误原因的更多线索。
  6. 升级或重新配置 Flink CDC:
    • 如果问题仍然存在,考虑升级 Flink CDC 或重新配置其版本,以确保与 MySQL 数据库兼容性。

通过执行上述步骤,您应该能够解决 Flink CDC 连接到 MySQL 数据库时出现的权限问题。确保授予足够的权限,并检查配置以确保准确性。

关注公众号“大模型全栈程序员”回复“大数据面试”获取800页左右大数据面试宝典 ,回复“大数据”获取多本大数据电子书

作者 east
Flink 8月 22,2023

Flink SQL 配方:窗口 Top-N 和连续 Top-N

Flink SQL 已成为低代码流分析的标准,并设法统一批处理和流处理,同时保持 SQL 标准。此外,它还提供了一组丰富的实时数据分析高级功能。简而言之,Flink SQL 是两全其美的:它使您能够使用 SQL 处理流数据,但它还支持批处理。

Ververica Platform 使 Flink SQL 更易于跨团队访问和高效扩展。该平台附带了用于开发 SQL 脚本、管理用户定义函数 (UDF)、目录和连接器以及操作生成的长时间运行查询的附加工具。

我们已经看到了 Flink SQL 的许多用例,我们很高兴向您展示你可以用它构建什么。在本系列博文中,我们将探讨如何使用 Flink SQL 以多种方式处理数据。这篇文章将特别关注两个查询:Window Top-N 和 Continuous Top-N。

提示:访问我们的案例研究,探索其他人如何使用 Apache Flink。

什么是 Window Top-N 和 Continuous Top-N 查询?

Window Top-N 和 Continuous Top-N 是两种相似但略有不同的数据处理方式。在这两种情况下,我们都希望找到数据流中的前 N 项,但存在一些关键差异:

在 Window Top-N 中,我们在固定大小的窗口中处理数据。例如,我们可能希望每分钟找到前 10 个项目。

在 Continuous Top-N 中,我们连续处理数据。我们不使用窗口,而是在数据到达时对其进行处理。连续 Top-N 比窗口 Top-N 更难实现,但它有一些优点。例如,它可以更快地为我们提供结果,因为我们不必等待窗口关闭才能看到结果。

窗口 Top-N 和连续 Top-N 查询的常见用例

窗口 Top-N 和连续 Top-N 查询对于各种任务都很有用。例如,它们可以用于:

  • 欺诈检测:在金融交易流中,我们可能希望找到每分钟按金额排名前 10 的交易。它可以帮助我们识别可疑活动。
  • 用户交互流:在用户交互流中,我们可能希望找到正在查看或购买的前 10 件商品。它可以帮助我们向用户提出建议。
  • 异常检测:在传感器读数流中,我们可能希望找到读数最高的前 10 个传感器。它可以帮助我们识别监控中出现故障的传感器。
  • 日志消息流:在日志消息流中,我们可能希望按数量查找前 10 条日志消息。它可以帮助我们识别系统问题。

如何使用 Flink SQL 编写 Window Top-N 查询

首先我们来看看如何使用 Flink SQL 编写 Window Top-N 查询。我们将向您展示如何计算每个翻滚 5 分钟窗口中销售额最高的前 3 名供应商。

sql复制代码
CREATE TABLE orders (
  bidtime TIMESTAMP(3),
  price DOUBLE,
  item STRING,
  supplier STRING,
  WATERMARK FOR bidtime AS bidtime - INTERVAL '5' SECONDS
) WITH (
  'connector' = 'faker',
  'fields.bidtime.expression' = '#{date.past ''30'',''SECONDS''}',
  'fields.price.expression' = '#{Number.randomDouble ''2'',''1'',''150''}',
  'fields.item.expression' = '#{Commerce.productName}',
  'fields.supplier.expression' = '#{regexify ''(Alice|Bob|Carol|Alex|Joe|James|Jane|Jack)''}',
  'rows-per-second' = '100'
);

SELECT *
FROM (
  SELECT *, ROW_NUMBER() OVER (PARTITION BY window_start, window_end ORDER BY price DESC) as rownum
  FROM (
    SELECT window_start, window_end, supplier, SUM(price) as price, COUNT(*) as cnt
    FROM TABLE(TUMBLE(TABLE orders, DESCRIPTOR(bidtime), INTERVAL '5' MINUTE))
    GROUP BY window_start, window_end, supplier
  )
) WHERE rownum <= 3;

源表(orders)由 thefaker 连接器支持,该连接器根据 Java Faker 表达式在内存中不断生成行。注意:此示例利用 Window Top-N 功能来显示排名前 3 的供应商每 5 分钟最高销售额。

如何使用 Flink SQL 编写连续 Top-N 查询

编写连续 Top-N 查询比编写 Window Top-N 查询更困难。这样做的原因是,在 Continuous Top-N 中,我们在数据到达时对其进行处理,而不是使用窗口。这个示例将带我们进入神奇的领域,因为流处理通常被外行认为是这样。然而,它实际上只是在数据流上执行的一组指令。

我们将展示如何使用 OVER window 和 ROW_NUMBER() 函数根据给定属性连续计算“Top-N”行。源表 (spells_cast) 由 thefaker 连接器支持,该连接器基于 Java Faker 在内存中连续生成行。

sql复制代码
CREATE TABLE spells_cast (
  wizard STRING,
  spell STRING
) WITH (
  'connector' = 'faker',
  'fields.wizard.expression' = '#{harry_potter.characters}',
  'fields.spell.expression' = '#{harry_potter.spells}'
);

SELECT wizard, spell, COUNT(*) AS times_cast
FROM spells_cast
GROUP BY wizard, spell;

此结果可以在 OVER 窗口中用于计算 Top-N。使用 wizard 列对行进行分区,然后根据施法次数(times_cast DESC)进行排序。内置函数 ROW_NUMBER() 根据排序顺序为每个分区的行分配序号。通过筛选序号小于等于 N 的行,我们可以获得每个巫师施法次数前 N 的法术。

以下是一个示例查询:

sql复制代码
SELECT wizard, spell, times_cast
FROM (
  SELECT wizard, spell, times_cast,
         ROW_NUMBER() OVER (PARTITION BY wizard ORDER BY times_cast DESC) AS row_num
  FROM (
    SELECT wizard, spell, COUNT(*) AS times_cast
    FROM spells_cast
    GROUP BY wizard, spell
  )
)
WHERE row_num <= 3;

在此查询中,我们首先计算每个巫师施法次数的统计信息。然后,我们在内部查询中使用 ROW_NUMBER() 函数对每个巫师的法术按照施法次数进行降序排列,为每个法术分配行号。最后,在外部查询中,我们筛选出行号小于等于 3 的记录,以获取每个巫师施法次数前 3 的法术。

这就是如何使用 Flink SQL 编写连续 Top-N 查询的方式。通过以上方法,您可以处理实时数据流并获取持续更新的 Top-N 数据。

请注意,上述示例中的 SQL 查询是根据您提供的上下文进行的翻译和还原,可能会因为特定上下文的变化而略有不同。

关注公众号“大模型全栈程序员”回复“大数据面试”获取800页左右大数据面试宝典 ,回复“大数据”获取多本大数据电子书

作者 east
Flink 8月 21,2023

磁盘对 Flink 中 RocksDB 状态后端的影响:案例研究

RocksDB 在 Flink 中的性能问题分析

正如最近的博客文章所述,RocksDB 是 Flink 中的一个状态后端,它允许作业的状态大于可用内存量,因为状态后端可以将状态溢出到本地磁盘。这意味着磁盘性能可能会对使用 RocksDB 的 Flink 作业的性能产生影响。通过一个案例研究,这篇博文说明了使用 RocksDB 的 Flink 作业的吞吐量下降问题,并演示了我们如何将底层磁盘的性能确定为根本原因。

背景和问题描述

我们正在处理一个典型的物联网 (IoT) 作业,该作业处理从数百万台设备发出的事件流。每个事件都包含设备标识符 (ID)、事件类型以及事件生成时的时间戳。该作业根据设备 ID 对流进行分区,并在状态中存储从每个事件类型到接收到该类型事件时的最新时间戳的映射。事件类型可能有数百种。对于每个传入事件,作业需要从接收事件类型的状态读取时间戳,并将其与传入事件进行比较。如果传入的时间戳较新,它会更新状态中存储的时间戳。

该作业在使用官方 AWS 命令​​行工具 eksctl 创建的 Amazon Elastic Kubernetes Service (EKS) 集群上运行,并具有所有默认设置。 Flink TaskManager 分配有 1.5 个 CPU 核和 4 GB 内存。该作业使用 RocksDB 状态后端,该后端配置为使用 Flink 的托管内存。state.backend.rocksdb.localdir 配置选项未显式设置,因此默认情况下底层 EC2 实例根卷上的 /tmp 目录用于 RocksDB 运行状态(即工作状态)。

吞吐量下降问题的观察

这篇博文指出,该作业最初在 EKS 上运行良好。但一段时间后(几小时或几天,具体取决于传入事件)作业吞吐量突然大幅下降。该下降可以很容易地再现。吞吐量指标图表显示,在某一天的 23:50 后不久,从每秒超过 10k 个事件下降到每秒几百个事件。此外,使用保存点停止作业然后从中恢复并没有帮助:重启后作业吞吐量仍然很低。尽管当作业从空状态重新启动时恢复了高吞吐量,但这不是一个选择,因为(1)作业状态会丢失,(2)作业吞吐量会在较短的时间后再次下降。

性能问题的定位

通过检查CPU指标,我们发现当吞吐量下降时,TaskManager 容器的 CPU 利用率也会降低。由于TaskManager容器可能会使用更多的CPU资源,因此CPU使用率的减少在这里只是一个症状。TaskManager容器的内存使用率在吞吐量下降之前很长时间就达到了分配限制,并且在 23:50 左右没有明显变化。

为了进一步调查性能问题,我们启用了 TaskManager 的 JMX 监控,并使用 VisualVM 进行 CPU 采样。结果显示,93% 的 CPU 时间都被 threadUpdateState 消耗了,这是运行 operatorUpdateState 的线程,该线程读取并更新 RocksDB 中的状态。几乎所有的CPU时间都被本机方法 org.rocksdb.RocksDB.get() 占用。这表明作业在从 RocksDB 读取状态时遇到了瓶颈。

磁盘性能分析

为了深入了解 RocksDB 的性能问题,我们启用了 Flink RocksDB 指标。块缓存是在内存中缓存数据以供读取的地方。块缓存在作业启动后的前几分钟内迅速被填满,主要是状态条目。然而,这并不能完全解释在 23:50 左右吞吐量下降的原因。

我们继续检查根卷的磁盘指标。读取吞吐量下降至每秒约 230 次,写入吞吐量也出现类似的下降。检查磁盘每秒输入/输出操作数 (IOPS) 容量,我们发现默认情况下,使用 eksctl 创建的 EKS 集群中的每个 EC2 实例都是 am5.large 实例,并带有一个通用 (gp2) 弹性块存储 (EBS) 根卷。根卷的大小为 80GB,提供 240 IOPS 的基准速率。这表明作业在磁盘 IO 上遇到了瓶颈。一开始能够实现更高 IOPS 的原因是 AWS 为每个 gp2 卷提供了初始 I/O 信用来维持突发 IO 请求。然而,初始 I/O 积分耗尽后,问题就出现了。

解决方案

为了解决性能问题,我们建议附加具有高 IOPS 率的专用卷,如 gp3 或 io1/io2 卷,并将 Flink 配置 state.backend.rocksdb.localdir 设置为该卷上的目录。需要注意的是,RocksDB 本机指标在默认情况下处于禁用状态,因为它们可能会对作业性能产生负面影响。但是在你面临性能问题时,启用这些指标可以帮助你更好地了解 RocksDB 的内部行为,以便更好地诊断和优化问题。

要实施这个解决方案,你可以按照以下步骤进行操作:

  1. 创建高性能的磁盘卷:
    • 使用 AWS 控制台或 AWS 命令行工具创建一个 gp3 或 io1/io2 卷,它提供足够的 IOPS 来支持你的作业需求。你可以根据作业的负载情况来选择适当的磁盘类型和大小。
  2. 将 RocksDB 目录配置到新卷上:
    • 在 Flink 配置中,将 state.backend.rocksdb.localdir 配置选项设置为新创建的高性能卷的挂载路径。这将使 RocksDB 在新卷上运行,并获得更高的磁盘性能。
  3. 启用 RocksDB 指标(可选):
    • 如果你想深入了解 RocksDB 的性能状况,你可以在 Flink 配置中启用 RocksDB 本机指标。这些指标将提供更多关于 RocksDB 内部运行情况的信息,帮助你更好地监视和优化作业。
  4. 重新部署作业:
    • 在进行了上述更改后,重新部署你的 Flink 作业。确保作业配置正确地指向新的 RocksDB 目录,并验证作业在新磁盘上运行。
  5. 监控和调整:
    • 监控你的作业性能,特别是 CPU 利用率、磁盘 IOPS 和延迟等指标。根据观察到的情况,你可能需要调整作业配置、磁盘类型或作业规模来进一步优化性能。

总之,通过将 RocksDB 目录配置到高性能的磁盘卷上,你可以显著改善 Flink 作业的性能,并避免在处理大量数据时出现吞吐量下降的问题。同时,启用 RocksDB 指标可以让你更深入地了解 RocksDB 在作业中的行为,从而更好地优化和监控作业性能。

关注公众号“大模型全栈程序员”回复“大数据面试”获取800页左右大数据面试宝典 ,回复“大数据”获取多本大数据电子书

作者 east
Flink 8月 21,2023

Flink SQL 连接 – 第 1 部分

Flink SQL 已成为低代码数据分析的事实标准。它成功地统一了批处理和流处理,同时保持了 SQL 标准。此外,它还为实时用例提供了一组丰富的高级功能。简而言之,Flink SQL 提供了两全其美的功能:它使您能够使用 SQL 处理流数据,但它还支持批处理。由于 Flink SQL 始终遵循 ANSI-SQL 2011 标准,因此所有功能都来自兼容的数据库应该管用。这包括内部联接和外部联接,以及 SQL 标准中描述的所有其他联接类型。

常规连接、间隔连接和查找连接

在这个由三部分组成的博客文章系列中,我们将向您展示 Flink SQL 中不同类型的联接以及如何使用它们以各种方式处理数据。这篇文章将重点介绍常规连接、间隔连接和查找连接。

常规连接

常规连接在 SQL 中用于组合两个或多个表中的数据。使用联接时,您可以指定每个表中要用于创建新表的列。您还可以使用联接来创建包含多个表中的数据的单个表。例如,如果您有一个包含客户信息的表和另一个包含订单信息的表,则可以使用联接创建一个同时包含客户信息和订单信息的表。

sql复制代码
CREATE TABLE NOC (
    agent_id STRING,
    codename STRING
);

CREATE TABLE RealNames (
    agent_id STRING,
    name STRING
);

SELECT
    name,
    codename
FROM NOC
INNER JOIN RealNames ON NOC.agent_id = RealNames.agent_id;

间隔连接

间隔连接用于比较相隔一定时间的两组数据。每组数据被分为多个区间,每个区间由开始时间和结束时间定义。间隔连接在处理具有时间上下文的事件时非常有用。例如,您可以将销售数据按小时间隔与客户数据按天间隔连接起来。

sql复制代码
CREATE TABLE orders (
    id INT,
    order_time TIMESTAMP
);

CREATE TABLE shipments (
    id INT,
    order_id INT,
    shipment_time TIMESTAMP
);

SELECT
    o.id AS order_id,
    o.order_time,
    s.shipment_time,
    TIMESTAMPDIFF(DAY, o.order_time, s.shipment_time) AS day_diff
FROM orders o
JOIN shipments s ON o.id = s.order_id
WHERE o.order_time BETWEEN s.shipment_time - INTERVAL '3' DAY AND s.shipment_time;

查找连接

查找连接用于在公共键上连接两个数据集,其中一个数据集是静态的,不会随时间变化。通过查找连接,您可以在流数据中丰富外部参考数据表中的信息。这对于实时数据分析非常有用。

sql复制代码
CREATE TABLE subscriptions (
    id STRING,
    user_id INT,
    type STRING,
    start_date TIMESTAMP,
    end_date TIMESTAMP,
    payment_expiration TIMESTAMP,
    proc_time AS PROCTIME()
);

CREATE TABLE users (
    user_id INT PRIMARY KEY,
    username VARCHAR(255) NOT NULL,
    age INT NOT NULL
);

SELECT
    s.id AS subscription_id,
    s.type AS subscription_type,
    u.age,
    CASE WHEN u.age < 18 THEN 1 ELSE 0 END AS is_minor
FROM subscriptions s
JOIN users FOR SYSTEM_TIME AS OF s.proc_time AS u ON s.user_id = u.user_id;

关注公众号“大模型全栈程序员”回复“大数据面试”获取800页左右大数据面试宝典 ,回复“大数据”获取多本大数据电子书

作者 east
Flink 8月 21,2023

Flink SQL:使用 MATCH_RECOGNIZE 检测模式

Flink SQL 已成为低代码数据分析的事实上的标准。它成功地统一了批处理和流处理,同时保持了 SQL 标准。此外,它还为实时用例提供了一组丰富的高级功能。简而言之,Flink SQL 是两全其美的:它使您能够使用 SQL 处理流数据,但它还支持批处理。

Ververica Platform 使 Flink SQL 更易于跨团队访问和高效扩展。该平台附带了额外的工具,用于开发 SQL 脚本、管理用户定义函数 (UDF)、目录和连接器,以及操作生成的长时间运行的查询。

我们已经看到 Flink SQL 有很多用例,我们很兴奋看看您将用它构建什么。在这篇博文中,我们将向您展示 MATCH_RECOGNIZE 函数可以做什么。

什么是 MATCH_RECOGNIZE?

MATCH_RECOGNIZE 是 SQL 标准中的一个子句,允许您检测数据中的模式。它类似于许多编程语言中的正则表达式功能。MATCH_RECOGNIZE 允许您:

  • 定义模式
  • 根据这些模式匹配数据
  • 提取与模式匹配的数据部分
  • 对与模式匹配的数据执行操作

例如,您可以使用 MATCH_RECOGNIZE 查找所有表中代表股票价格趋势的行。然后,您可以提取与模式匹配的数据并对其执行进一步分析。

SQL 日常工作中的一个常见(但历史上很复杂)任务是识别数据集中有意义的事件序列 – 也称为复杂事件处理(CEP)。在处理流数据时,这一点变得更加重要,因为您希望对已知模式或不断变化的趋势做出快速反应,以提供最新的业务洞察。在 Flink SQL 中,您可以使用标准 SQL 子句 MATCH_RECOGNIZE 轻松执行此类任务。

如何使用 MATCH_RECOGNIZE 的示例

在此示例中,您将使用 Flink SQL 和 MATCH_RECOGNIZE 来查找从高级级别之一降级其服务订阅的用户( type IN (‘premium’,’platinum’)) 到基本层。完整的 Flink SQL 查询源表(订阅)由 thefaker 连接器支持,它基于 Java Faker 表达式在内存中不断生成行。

 
CREATE TABLE 订阅 (
    ID STRING,
    user ID INT,
    输入 STRING,
    开始日期 TIMESTAMP(3),
    结束日期 TIMESTAMP(3),
    payment_expiration TIMESTAMP(3),
    proc_time AS PROCTIME()
)
WITH (
    'connector' = 'faker',
    'fields.id.expression' = '#{Internet.uuid}',
    'fields.user_id.expression' = '#{number.numberBetween ''1'',''50''}',
    'fields.type.expression' = '#{regexify ''(basic|premium|platinum){1}''}',
    'fields.start_date.expression' = '#{date.past ''30'',''DAYS''}',
    'fields.end_date.expression' = '#{date.future ''15'',''DAYS''}',
    'fields.payment_expiration.expression' = '#{date.future ''365'',''DAYS''}'
);

SELECT *
FROM 订阅
MATCH_RECOGNIZE (
    PARTITION BY user ID
    ORDER BY proc_time
    MEASURES
        LAST(PREMIUM.type) AS premium_type,
        AVG(TIMESTAMPDIFF(DAY, PREMIUM.start_date, PREMIUM.end_date)) AS premium_avg_duration,
        BASIC.start_date AS downgrade_date
    AFTER MATCH SKIP TO LAST ROW
    PATTERN (PREMIUM+ BASIC)
    DEFINE
        PREMIUM AS PREMIUM.type IN ('premium', 'platinum'),
        BASIC AS BASIC.type = 'basic'
) AS MR;

MATCH_RECOGNIZE 的输入参数将是基于订阅的行模式表。第一步,必须对输入行模式表应用逻辑分区和排序,以确保事件处理正确且具有确定性:

 
PARTITION BY user ID
ORDER BY proc_time

然后在 MEASURES 子句中定义 ORDER BY proc_timeOutputRow 模式列,可以将其视为 MATCH_RECOGNIZE 的 SELECT。如果您有兴趣获取与降级之前的最后一个事件关联的高级订阅类型,可以使用逻辑偏移运算符 LAST 获取它。降级日期可以从任何现有高级订阅事件之后的第一个基本订阅事件的开始日期推断出来。AFTER MATCH SKIP 子句指定在非空之后模式匹配恢复的位置找到 y 匹配项。

sql复制代码
AFTER MATCH SKIP TO LAST ROW

模式定义模式在 PATTERN 子句中使用行模式变量(即事件类型)和正则表达式来指定。这些变量还必须使用 DEFINE 子句与事件必须满足才能包含在模式中的匹配条件相关联。在这里,您有兴趣匹配一个或多个高级订阅事件 (PREMIUM+),后跟基本订阅事件 (BASIC)。您可以使用正则表达式语法来定义模式的结构。在这个例子中,PREMIUM+ 表示一个或多个连续的高级订阅事件,而 BASIC 表示一个基本订阅事件。

在 MEASURES 子句中,您定义了要在匹配后提取的数据。在这个查询中,您从匹配的最后一个 PREMIUM 事件中提取高级订阅类型(premium_type),从所有匹配的 PREMIUM 事件中计算平均持续时间(premium_avg_duration),以及降级的日期(downgrade_date)。

最终的查询将返回按用户 ID 分区的每个用户的匹配结果。这些结果显示了每个用户从高级订阅降级到基本订阅的情况,以及有关此过程的相关信息。

 

关注公众号“大模型全栈程序员”回复“大数据面试”获取800页左右大数据面试宝典 ,回复“大数据”获取多本大数据电子书

作者 east

上一 1 2 3 4 … 7 下一个

关注公众号“大模型全栈程序员”回复“小程序”获取1000个小程序打包源码。回复”chatgpt”获取免注册可用chatgpt。回复“大数据”获取多本大数据电子书

标签

AIGC AI创作 bert chatgpt github GPT-3 gpt3 GTP-3 hive mysql O2O tensorflow UI控件 不含后台 交流 共享经济 出行 图像 地图定位 外卖 多媒体 娱乐 小程序 布局 带后台完整项目 开源项目 搜索 支付 效率 教育 日历 机器学习 深度学习 物流 用户系统 电商 画图 画布(canvas) 社交 签到 联网 读书 资讯 阅读 预订

官方QQ群

小程序开发群:74052405

大数据开发群: 952493060

近期文章

  • 如何在Chrome中设置启动时自动打开多个默认网页
  • spark内存溢出怎样区分是软件还是代码原因
  • MQTT完全解析和实践
  • 解决运行Selenium报错:self.driver = webdriver.Chrome(service=service) TypeError: __init__() got an unexpected keyword argument ‘service’
  • python 3.6使用mysql-connector-python报错:SyntaxError: future feature annotations is not defined
  • 详解Python当中的pip常用命令
  • AUTOSAR如何在多个供应商交付的配置中避免ARXML不兼容?
  • C++thread pool(线程池)设计应关注哪些扩展性问题?
  • 各类MCAL(Microcontroller Abstraction Layer)如何与AUTOSAR工具链解耦?
  • 如何设计AUTOSAR中的“域控制器”以支持未来扩展?

文章归档

  • 2025年7月
  • 2025年6月
  • 2025年5月
  • 2025年4月
  • 2025年3月
  • 2025年2月
  • 2025年1月
  • 2024年12月
  • 2024年11月
  • 2024年10月
  • 2024年9月
  • 2024年8月
  • 2024年7月
  • 2024年6月
  • 2024年5月
  • 2024年4月
  • 2024年3月
  • 2023年11月
  • 2023年10月
  • 2023年9月
  • 2023年8月
  • 2023年7月
  • 2023年6月
  • 2023年5月
  • 2023年4月
  • 2023年3月
  • 2023年1月
  • 2022年11月
  • 2022年10月
  • 2022年9月
  • 2022年8月
  • 2022年7月
  • 2022年6月
  • 2022年5月
  • 2022年4月
  • 2022年3月
  • 2022年2月
  • 2022年1月
  • 2021年12月
  • 2021年11月
  • 2021年9月
  • 2021年8月
  • 2021年7月
  • 2021年6月
  • 2021年5月
  • 2021年4月
  • 2021年3月
  • 2021年2月
  • 2021年1月
  • 2020年12月
  • 2020年11月
  • 2020年10月
  • 2020年9月
  • 2020年8月
  • 2020年7月
  • 2020年6月
  • 2020年5月
  • 2020年4月
  • 2020年3月
  • 2020年2月
  • 2020年1月
  • 2019年7月
  • 2019年6月
  • 2019年5月
  • 2019年4月
  • 2019年3月
  • 2019年2月
  • 2019年1月
  • 2018年12月
  • 2018年7月
  • 2018年6月

分类目录

  • Android (73)
  • bug清单 (79)
  • C++ (34)
  • Fuchsia (15)
  • php (4)
  • python (45)
  • sklearn (1)
  • 云计算 (20)
  • 人工智能 (61)
    • chatgpt (21)
      • 提示词 (6)
    • Keras (1)
    • Tensorflow (3)
    • 大模型 (1)
    • 智能体 (4)
    • 深度学习 (14)
  • 储能 (44)
  • 前端 (5)
  • 大数据开发 (491)
    • CDH (6)
    • datax (4)
    • doris (31)
    • Elasticsearch (15)
    • Flink (78)
    • flume (7)
    • Hadoop (19)
    • Hbase (23)
    • Hive (41)
    • Impala (2)
    • Java (71)
    • Kafka (10)
    • neo4j (5)
    • shardingsphere (6)
    • solr (5)
    • Spark (100)
    • spring (11)
    • 数据仓库 (9)
    • 数据挖掘 (7)
    • 海豚调度器 (10)
    • 运维 (34)
      • Docker (3)
  • 小游戏代码 (1)
  • 小程序代码 (139)
    • O2O (16)
    • UI控件 (5)
    • 互联网类 (23)
    • 企业类 (6)
    • 地图定位 (9)
    • 多媒体 (6)
    • 工具类 (25)
    • 电商类 (22)
    • 社交 (7)
    • 行业软件 (7)
    • 资讯读书 (11)
  • 嵌入式 (71)
    • autosar (63)
    • RTOS (1)
    • 总线 (1)
  • 开发博客 (16)
    • Harmony (9)
  • 技术架构 (6)
  • 数据库 (32)
    • mongodb (1)
    • mysql (13)
    • pgsql (2)
    • redis (1)
    • tdengine (4)
  • 未分类 (7)
  • 程序员网赚 (20)
    • 广告联盟 (3)
    • 私域流量 (5)
    • 自媒体 (5)
  • 量化投资 (4)
  • 面试 (14)

功能

  • 登录
  • 文章RSS
  • 评论RSS
  • WordPress.org

All Rights Reserved by Gitweixin.本站收集网友上传代码, 如有侵犯版权,请发邮件联系yiyuyos@gmail.com删除.