Flink 的测试工具揭秘

使用 Flink 的测试工具测试用户定义函数 (UDF)

使用 Apache Flink 时,开发人员在测试利用状态和计时器的用户定义函数 (UDF) 时经常面临挑战。在本文中,我们将回答一个问题“如何使用 Flink 的测试工具来测试用户定义函数 (UDF)”。

使用 Flink 的测试工具测试用户定义函数 (UDF)

测试使用 Flink 状态和计时器的用户定义函数 (UDF) 可能会很复杂,尤其是在使用诸如 KeyedProcessFunction 之类的函数时。 Flink 包含一组专门为简化此任务而设计的测试工具。这些测试工具是在 Flink 1.15 中引入的,并且被认为是实验性的。

测试工具的配置

要使用 Flink 的测试工具,您需要添加一些依赖项到您的项目。要测试 DataStream 作业,您可以在 Maven 项目的 pom.xml 的依赖项块中添加以下内容:

XML
<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-test-utils</artifactId>
    <version>1.17.0</version>
    <scope>test</scope>
</dependency>

要测试 Table/SQL 作业,除了前面提到的 flink-test-utils 之外,您还可以在 Maven 项目的 pom.xml 的依赖项块中添加以下内容:

XML
<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-table-test-utils</artifactId>
    <version>1.17.0</version>
    <scope>test</scope>
</dependency>

通过运算符测试用户函数

通过运算符测试用户函数可能是一种有用的方法,因为它允许您测试函数的功能。需要注意的是,Flink 只为算子提供测试工具,因此您需要手动实例化适当的算子才能使用这些测试工具。这意味着您需要创建运算符的实例并使用必要的输入和参数对其进行配置。完成此操作后,您可以使用测试工具来验证用户功能的正确性。值得注意的是,这种方法可能比使用其他测试方法更复杂,因为它需要您手动设置操作员并使用适当的输入对其进行配置。

Java
@Test
public void testMyProcessFunction() throws Exception {
    KeyedProcessOperator<String, String, String> operator =
        new KeyedProcessOperator<>(new MyKeyedProcessFunction());

    // 设置测试工具
    // 推送数据
    // 验证结果
}

测试计时器行为

在 Flink 中测试用户函数时,必须测试其行为的各个方面。测试的一件常见事情是计时器的创建和触发。您可以使用 TestHarness 通过操作员发送测试数据并验证是否创建了计时器。然后,您可以推进水印并验证计时器是否已触发。除了测试计时器之外,您可能还想测试处理元素是否会创建状态并验证此处理的结果。假设我们希望在 20 毫秒内为我们的测试目标触发计时器。下面的示例代码演示了如何使用测试工具对其进行测试。

Java
@Test
public void testTimelyOperator() throws Exception {
    // 设置初始条件
    testHarness.processWatermark(0L);
    assertThat(testHarness.numEventTimeTimers(), is(0));

    // 发送一些数据
    testHarness.processElement(3L, 100L);

    // 验证定时器
    assertThat(testHarness.numEventTimeTimers(), is(1));

    // 将时间提前到 20 毫秒以触发计时器。
    testHarness.processWatermark(20);
    assertThat(testHarness.numEventTimeTimers(), is(0));

    // 验证结果
    assertThat(testHarness.getOutput(), containsInExactlyThisOrder(3L));
    assertThat(testHarness.getSideOutput(new OutputTag<>("invalidRecords")), hasSize(0));

关注公众号“大模型全栈程序员”回复“大数据面试”获取800页左右大数据面试宝典 ,回复“大数据”获取多本大数据电子书

关注公众号“大模型全栈程序员”回复“小程序”获取1000个小程序打包源码。更多免费资源在http://www.gitweixin.com/?p=2627