Skip to content

测试

译者:flink.sojb.cn

本页简要讨论如何在IDE或本地环境中测试Flink应用程序。

单元测试

通常,可以假设Flink在用户定义之外产生正确的结果Function。因此,建议Function尽可能使用单元测试来测试包含主业务逻辑的类。

例如,如果实现以下内容ReduceFunction

public class SumReduce implements ReduceFunction<Long> {

    @Override
    public Long reduce(Long value1, Long value2) throws Exception {
        return value1 + value2;
    }
}
class SumReduce extends ReduceFunction[Long] {

    override def reduce(value1: java.lang.Long, value2: java.lang.Long): java.lang.Long = {
        value1 + value2
    }
}

通过传递合适的参数并验证输出,可以很容易地使用您喜欢的框架对其进行单元测试:

public class SumReduceTest {

    @Test
    public void testSum() throws Exception {
        // instantiate your function
        SumReduce sumReduce = new SumReduce();

        // call the methods that you have implemented
        assertEquals(42L, sumReduce.reduce(40L, 2L));
    }
}
class SumReduceTest extends FlatSpec with Matchers {

    "SumReduce" should "add values" in {
        // instantiate your function
        val sumReduce: SumReduce = new SumReduce()

        // call the methods that you have implemented
        sumReduce.reduce(40L, 2L) should be (42L)
    }
}

集成测试

为了端到端测试Flink流管道,您还可以编写针对本地Flink迷你集群执行的集成测试。

为此,添加测试依赖项flink-test-utils

<dependency>
  <groupId>org.apache.flink</groupId>
  <artifactId>flink-test-utils_2.11</artifactId>
  <version>1.7-SNAPSHOT</version>
</dependency>

例如,如果要测试以下内容MapFunction

public class MultiplyByTwo implements MapFunction<Long, Long> {

    @Override
    public Long map(Long value) throws Exception {
        return value * 2;
    }
}
class MultiplyByTwo extends MapFunction[Long, Long] {

    override def map(value: Long): Long = {
        value * 2
    }
}

您可以编写以下集成测试:

public class ExampleIntegrationTest extends AbstractTestBase {

    @Test
    public void testMultiply() throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        // configure your test environment
        env.setParallelism(1);

        // values are collected in a static variable
        CollectSink.values.clear();

        // create a stream of custom elements and apply transformations
        env.fromElements(1L, 21L, 22L)
                .map(new MultiplyByTwo())
                .addSink(new CollectSink());

        // execute
        env.execute();

        // verify your results
        assertEquals(Lists.newArrayList(2L, 42L, 44L), CollectSink.values);
    }

    // create a testing sink
    private static class CollectSink implements SinkFunction<Long> {

        // must be static
        public static final List<Long> values = new ArrayList<>();

        @Override
        public synchronized void invoke(Long value) throws Exception {
            values.add(value);
        }
    }
}
class ExampleIntegrationTest extends AbstractTestBase {

    @Test
    def testMultiply(): Unit = {
        val env = StreamExecutionEnvironment.getExecutionEnvironment

        // configure your test environment
        env.setParallelism(1)

        // values are collected in a static variable
        CollectSink.values.clear()

        // create a stream of custom elements and apply transformations
        env
            .fromElements(1L, 21L, 22L)
            .map(new MultiplyByTwo())
            .addSink(new CollectSink())

        // execute
        env.execute()

        // verify your results
        assertEquals(Lists.newArrayList(2L, 42L, 44L), CollectSink.values)
    }
}    

// create a testing sink class CollectSink extends SinkFunction[Long] {

    override def invoke(value: java.lang.Long): Unit = {
        synchronized {
            values.add(value)
        }
    }
}

object CollectSink {

    // must be static
    val values: List[Long] = new ArrayList()
}

CollectSink此处使用静态变量in ,因为Flink在将所有 算子分布到集群之前将其序列化。通过静态变量与本地Flink迷你集群实例化的算子进行通信是解决此问题的一种方法。或者,您可以使用测试接收器将数据写入临时目录中的文件。您还可以实现自己的自定义源以发出水印。

测试检查点和状态处理

测试状态处理的一种方法是在集成测试中启用检查点。

您可以通过StreamExecutionEnvironment在测试中配置来完成此 算子操作:

env.enableCheckpointing(500);
env.setRestartStrategy(RestartStrategies.fixedDelayRestart(3, 100));
env.enableCheckpointing(500)
env.setRestartStrategy(RestartStrategies.fixedDelayRestart(3, 100))

例如,向Flink应用程序添加一个身份映射器 算子,该 算子将每次抛出一次异常1000ms。但是,由于动作之间存在时间依赖关系,因此编写此类测试可能会非常棘手。

另一种方法是写使用Flink内部测试效用一个单元测试AbstractStreamOperatorTestHarnessflink-streaming-java模块。

对于如何做到这一点,请看看在一个例子org.apache.flink.streaming.runtime.operators.windowing.WindowOperatorTest中也flink-streaming-java模块。

请注意,AbstractStreamOperatorTestHarness目前它不是公共API的一部分,可能会有所变化。



回到顶部