Skip to content

API 迁移指南

TypeSerializer 的变化

这部分主要与实现TypeSerializer接口来自定义序列化的用户有关。

原来的 TypeSerializerConfigSnapshot 抽象接口被弃用了, 并且将在将来完全删除,取而代之的是新的 TypeSerializerSnapshot. 详情请参考 Migrating from deprecated serializer snapshot APIs before Flink 1.7

自Flink 1.2以来,有一些API已被更改。大多数更改都记录在其特定文档中。以下是API更改的综合列表以及升级到Flink 1.3时迁移详细信息的链接。

TypeSerializer 接口变化

这主要适用于自定义 TypeSerializer接口的用户

从Flink 1.3开始,添加了两个与保存点恢复的串行器兼容性相关的其他方法。 有关如何实现这些方法的更多详细信息,请参阅 序列化升级和兼容性

ProcessFunctionRichFunction

从Flink 1.2, ProcessFunction 引入,并有了多种实现例如 RichProcessFunction 。 从Flink 1.3,开始RichProcessFunction 被移除了, 现在 ProcessFunction 始终是 RichFunction 并且可以访问运行时上下文。

Flink 1.3中的CEP库新增了许多新函数,请参阅 CEP迁移文档

Flink1.3以后,用户可以选用自己期望的日志框架了,Flink移除了日志记录框架的依赖。

实例和快速入门的demo已经指定了日志记录器,不会有问题,其他项目,请确保添加日志依赖,比如Maven的 pom.xml,中需要增加

<dependency>
    <groupId>org.slf4j</groupId>
    <artifactId>slf4j-log4j12</artifactId>
    <version>1.7.7</version>
</dependency>

<dependency>
    <groupId>log4j</groupId>
    <artifactId>log4j</artifactId>
    <version>1.2.17</version>
</dependency>

正如 状态文档中所说,Flink 有两种状态: keyednon-keyed 状态 (也被称作 operator 状态). 这两种类型都可用于 算子和用户定义的函数。文档将指导您完成从Flink 1.1函数代码迁移到Flink 1.2的过程,并介绍Flink 1.2中引入的一些重要内部更改,这些改变涉及到Flink 1.1中对齐窗口操作的弃用。 (请参阅 时间对齐窗口算子Aligned Processing Time Window Operators).

迁移有两个目标:

  1. 引入Flink1.2中引入的新函数,比如自适应(rescaling)

  2. 确保新Flink 1.2作业能够从Flink 1.1的保存点恢复执行

按照本指南操作可以把正在运行的Flink1.1作业迁移到Flink1.2中。前提需要在Flink1.1中使用 保存点 并把保存点作为Flink1.2作业的起点。这样,Flink1.2就可以从之前中断的位置恢复执行了。

用户函数示例

本文档其余部分使用 CountMapperBufferingSink 函数作为示例。第一个函数是 keyed 状态,第二个不是 s non-keyed 状态,Flink 1.1中上述两个函数的代码如下:

public class CountMapper extends RichFlatMapFunction<Tuple2<String, Integer>, Tuple2<String, Integer>> {

    private transient ValueState<Integer> counter;

    private final int numberElements;

    public CountMapper(int numberElements) {
        this.numberElements = numberElements;
    }

    @Override
    public void open(Configuration parameters) throws Exception {
        counter = getRuntimeContext().getState(
            new ValueStateDescriptor<>("counter", Integer.class, 0));
    }

    @Override
    public void flatMap(Tuple2<String, Integer> value, Collector<Tuple2<String, Integer>> out) throws Exception {
        int count = counter.value() + 1;
        counter.update(count);

        if (count % numberElements == 0) {
            out.collect(Tuple2.of(value.f0, count));
            counter.update(0); // reset to 0
        }
    }
}

public class BufferingSink implements SinkFunction<Tuple2<String, Integer>>,
    Checkpointed<ArrayList<Tuple2<String, Integer>>> {

    private final int threshold;

    private ArrayList<Tuple2<String, Integer>> bufferedElements;

    BufferingSink(int threshold) {
        this.threshold = threshold;
        this.bufferedElements = new ArrayList<>();
    }

    @Override
    public void invoke(Tuple2<String, Integer> value) throws Exception {
        bufferedElements.add(value);
        if (bufferedElements.size() == threshold) {
            for (Tuple2<String, Integer> element: bufferedElements) {
                // send it to the sink
            }
            bufferedElements.clear();
        }
    }

    @Override
    public ArrayList<Tuple2<String, Integer>> snapshotState(
        long checkpointId, long checkpointTimestamp) throws Exception {
        return bufferedElements;
    }

    @Override
    public void restoreState(ArrayList<Tuple2<String, Integer>> state) throws Exception {
        bufferedElements.addAll(state);
    }
}

CountMapper 是一个按表格分组输入(word, 1)RichFlatMapFunction , 函数为每个传入的key保存一个计数器 (ValueState&lt;Integer&gt; counter) 并且 ,如果某个单词的出现次数超过用户提供的阈值,则会发出一个包含单词本身和出现次数的元组。

BufferingSink 是一个 SinkFunction 接收方 ( CountMapper可能的输出) ,直到达到用户定义的最终状态之前会一直缓存数据,这可以避免频繁的对数据库或者是存储系统的操作,通常这些操作都是比较耗时或开销比较大的。为了以容错方式进行缓冲,缓冲数据元保存在列表(bufferedElements) 列表会被定期被检查点保存。

状态 API 迁移

要使用Flink 1.2的新函数,应修改上面的代码来完成新的状态抽象。完成这些更改后,就可以实现作业的并行度的修改(向上或向下扩展),并确保新版本的作业将从之前作业停止的位置开始执行。

Keyed State: 需要注意的是,如果代码中只有keyed state,那么Flink1.1的代码也适用于1.2版本,并且完全支持新函数和向下兼容。可以仅针代码格式进行更改,但这只是风格/习惯问题。

综上所述,本章我们重点阐述 non-keyed state的迁移

自适应和新状态抽象

第一个修改是 Checkpointed&lt;T extends Serializable&gt; 接口有了新的实现。在 Flink 1.2中,有状态函数可以实现更通用的 CheckpointedFunction 接口或 ListCheckpointed&lt;T extends Serializable&gt; 接口 ,和之前版本的 Checkpointed 类似。

在这两种情况中,非键合状态预期是一个 可序列化List ,对象彼此独立,这样可以在自适应的时候重新分配,意味着, 这些对象是可以重新分区非被Keys化状态的最细粒度。例如,如果并行度为1的 BufferingSink(test1, 2)(test2, 2)两个数据,当并行度增加到2时, (test1, 2) 可能在task 0中,而 (test2, 2) 可能在task 1中。

更详细的信息可以参阅状态文档.

ListCheckpointed

ListCheckpointed 接口需要实现两个方法:

List<T> snapshotState(long checkpointId, long timestamp) throws Exception;

void restoreState(List<T> state) throws Exception;

它的语义和之前的 Checkpointed 接口类似,唯一区别是,现在 snapshotState() 返回的是检查点对象列表, 如前所述, restoreState 必须在恢复的时候,处理这个列表。如果状态不是重新分区,可以随时返回 Collections.singletonList(MY_STATE)snapshotState()。 更新的代码 BufferingSink 如下:

public class BufferingSinkListCheckpointed implements
        SinkFunction<Tuple2<String, Integer>>,
        ListCheckpointed<Tuple2<String, Integer>>,
        CheckpointedRestoring<ArrayList<Tuple2<String, Integer>>> {

    private final int threshold;

    private transient ListState<Tuple2<String, Integer>> checkpointedState;

    private List<Tuple2<String, Integer>> bufferedElements;

    public BufferingSinkListCheckpointed(int threshold) {
        this.threshold = threshold;
        this.bufferedElements = new ArrayList<>();
    }

    @Override
    public void invoke(Tuple2<String, Integer> value) throws Exception {
        this.bufferedElements.add(value);
        if (bufferedElements.size() == threshold) {
            for (Tuple2<String, Integer> element: bufferedElements) {
                // send it to the sink
            }
            bufferedElements.clear();
        }
    }

    @Override
    public List<Tuple2<String, Integer>> snapshotState(
            long checkpointId, long timestamp) throws Exception {
        return this.bufferedElements;
    }

    @Override
    public void restoreState(List<Tuple2<String, Integer>> state) throws Exception {
        if (!state.isEmpty()) {
            this.bufferedElements.addAll(state);
        }
    }

    @Override
    public void restoreState(ArrayList<Tuple2<String, Integer>> state) throws Exception {
        // this is from the CheckpointedRestoring interface.
        this.bufferedElements.addAll(state);
    }
}

更新后的函数也实现了 CheckpointedRestoring 接口。这是出于向后兼容性原因,更多细节将在本节末尾解释。

CheckpointedFunction

CheckpointedFunction 接口也需要实现这两个方法。

void snapshotState(FunctionSnapshotContext context) throws Exception;

void initializeState(FunctionInitializationContext context) throws Exception;

在Flink 1.1中, 检查点执行会调用snapshotState() 方法,但是现在当用户每次初始化自定义函数时,会调用 initializeState() (对应 restoreState()) ,而不是在恢复的情况下调用,鉴于此, initializeState() 不仅是初始化不同类型状态的地方,而且还包括状态恢复逻辑。实现了 CheckpointedFunction 接口的 BufferingSink 代码如下所示

public class BufferingSink implements SinkFunction<Tuple2<String, Integer>>,
        CheckpointedFunction, CheckpointedRestoring<ArrayList<Tuple2<String, Integer>>> {

    private final int threshold;

    private transient ListState<Tuple2<String, Integer>> checkpointedState;

    private List<Tuple2<String, Integer>> bufferedElements;

    public BufferingSink(int threshold) {
        this.threshold = threshold;
        this.bufferedElements = new ArrayList<>();
    }

    @Override
    public void invoke(Tuple2<String, Integer> value) throws Exception {
        bufferedElements.add(value);
        if (bufferedElements.size() == threshold) {
            for (Tuple2<String, Integer> element: bufferedElements) {
                // send it to the sink
            }
            bufferedElements.clear();
        }
    }

    @Override
    public void snapshotState(FunctionSnapshotContext context) throws Exception {
        checkpointedState.clear();
        for (Tuple2<String, Integer> element : bufferedElements) {
            checkpointedState.add(element);
        }
    }

    @Override
    public void initializeState(FunctionInitializationContext context) throws Exception {
        checkpointedState = context.getOperatorStateStore().
            getSerializableListState("buffered-elements");

        if (context.isRestored()) {
            for (Tuple2<String, Integer> element : checkpointedState.get()) {
                bufferedElements.add(element);
            }
        }
    }

    @Override
    public void restoreState(ArrayList<Tuple2<String, Integer>> state) throws Exception {
        // this is from the CheckpointedRestoring interface.
        this.bufferedElements.addAll(state);
    }
}

initializeState 方法是需要传入 FunctionInitializationContext,用于初始化non-keyed 状态的 “容器”,容器的类型是 ListState,供 non-keyed 状态的对象被检查点存储时使用:

this.checkpointedState = context.getOperatorStateStore().getSerializableListState("buffered-elements");

初始化之后,调用 isRestored() 方法可以获取当前是否在恢复。如果是 true, 表示正在恢复。

正如下面的代码所示,在状态初始化期间恢复 BufferingSinkListState 中保存的变量可以被 snapshotState()使用, ListState 会清除掉之前检查点存储的对象,然后存储当前检查点的对象。

当然, keyed 状态也可以在 initializeState() 方法中初始化, 可以使用 FunctionInitializationContext 来完成初始化,而不是使用Flink1.1中的 RuntimeContext,如果CheckpointedFunction 要在 CountMapper 中使用该接口,则可以不使用 open() 方法, snapshotState()initializeState() 方法如下所示:

public class CountMapper extends RichFlatMapFunction<Tuple2<String, Integer>, Tuple2<String, Integer>>
        implements CheckpointedFunction {

    private transient ValueState<Integer> counter;

    private final int numberElements;

    public CountMapper(int numberElements) {
        this.numberElements = numberElements;
    }

    @Override
    public void flatMap(Tuple2<String, Integer> value, Collector<Tuple2<String, Integer>> out) throws Exception {
        int count = counter.value() + 1;
        counter.update(count);

        if (count % numberElements == 0) {
            out.collect(Tuple2.of(value.f0, count));
            counter.update(0); // reset to 0
        }
    }

    @Override
    public void snapshotState(FunctionSnapshotContext context) throws Exception {
        // all managed, nothing to do.
    }

    @Override
    public void initializeState(FunctionInitializationContext context) throws Exception {
        counter = context.getKeyedStateStore().getState(
            new ValueStateDescriptor<>("counter", Integer.class, 0));
    }
}

请注意, snapshotState() 方法为空,因为Flink本身负责在检查点时就会存储Keys化的对象

到目前为止,我们已经了解如何修改函数来引入Flink 1.2的新函数。剩下的问题是“我可以确保我的修改后的(Flink 1.2)作业将从我从Flink 1.1运行的作业停止的位置开始吗?”。

答案是肯定的,而这样做的方式非常简单。对于被Keys化状态,什么都不需要做。Flink将负责从Flink 1.1恢复状态。对于非被Keys化状态,新函数必须像上面代码一样实现 CheckpointedRestoring 接口,还有个办法,需要熟悉Flink1.1的 restoreState()Checkpointed 接口,然后修改 BufferingSink, restoreState() 方法完成和之前一样的功能。

时间对齐窗口算子

在Flink 1.1中,只有在没有指定的触发器的时, timeWindow() 才会实例化特殊的数据类型 WindowOperator。 它可以是 AggregatingProcessingTimeWindowOperatorAccumulatingProcessingTimeWindowOperator.。这两个算子都可以称之为时间对齐窗口算子,因为它们假定输入数据是按顺序到达,当在处理时间中操作时,这是有效的,元素到达窗口操作时获得时间。算子仅限于使用内存状态,并且优化了用于存储数据的元素结构。

在Flink 1.2中,不推荐使用对齐窗口算子,并且所有窗口算子操作都通过泛型WindowOperator来实现。迁移不需要更改Flink 1.1作业的代码,因为Flink将读取Flink 1.1保存点中对齐的窗口 算子存储的状态,将其转换为与泛型相兼容的格式 WindowOperator,并使用通用的 WindowOperator

虽然是已经弃用这个方法,但是在Flink 1.2 中仍然是可以使用的,通过特殊的 WindowAssigners 可以实现这个目的。 SlidingAlignedProcessingTimeWindowsTumblingAlignedProcessingTimeWindows assigners,分别是滑动窗口和滚动窗口,使用对齐窗口的Flink 1.2作业必须是一项新作业,因为在使用这些 算子时无法从Flink 1.1保存点恢复执行。

注意时间对齐窗口算子不提供自适应 而且 不向下兼容 Flink 1.1.

在Flink 1.2中使用对齐窗口 算子的代码如下所示:

// for tumbling windows
DataStream<Tuple2<String, Integer>> window1 = source
    .keyBy(0)
    .window(TumblingAlignedProcessingTimeWindows.of(Time.of(1000, TimeUnit.MILLISECONDS)))
    .apply(your-function)

// for sliding windows
DataStream<Tuple2<String, Integer>> window1 = source
    .keyBy(0)
    .window(SlidingAlignedProcessingTimeWindows.of(Time.seconds(1), Time.milliseconds(100)))
    .apply(your-function)
// for tumbling windows val window1 = source
    .keyBy(0)
    .window(TumblingAlignedProcessingTimeWindows.of(Time.of(1000, TimeUnit.MILLISECONDS)))
    .apply(your-function)

// for sliding windows val window2 = source
    .keyBy(0)
    .window(SlidingAlignedProcessingTimeWindows.of(Time.seconds(1), Time.milliseconds(100)))
    .apply(your-function)


回到顶部