Skip to content

Operators

运算符将一个或多个数据流转换为新的Datastream。程序可以将多个转换组合成复杂的数据流拓扑。

本节介绍了基本转换、应用这些转换后的有效物理分区以及对FLink的操作员链接的见解。

DataStream Transformations 数据流转换

转换 描述
Map
DataStream → DataStream 获取一个元素并生成一个元素。将输入流的值加倍的map函数:
DataStream<Integer> dataStream = //...
dataStream.map(new MapFunction<Integer, Integer>() {
    @Override
    public Integer map(Integer value) throws Exception {
        return 2 * value;
    }
}); 

| | FlatMap DataStream → DataStream | 获取一个元素并生成零个、一个或多个元素。将语句拆分为单词的FlatMap函数:

dataStream.flatMap(new FlatMapFunction<String, String>() {
    @Override
    public void flatMap(String value, Collector<String> out)
        throws Exception {
        for(String word: value.split(" ")){
            out.collect(word);
        }
    }
}); 

| | Filter DataStream → DataStream | 为每个元素评估布尔函数,并保留函数返回true的布尔函数。筛选零值的筛选器:

dataStream.filter(new FilterFunction<Integer>() {
    @Override
    public boolean filter(Integer value) throws Exception {
        return value != 0;
    }
}); 

| | KeyBy 数据流(keyedstream)|在逻辑上将数据流分割为不相交的分区。具有相同密钥的所有记录被分配给相同的分区。内部,KEYBY()_是用哈希分区实现的。指定键有不同的方法。此转换返回a_keyedstream,它是使用键入状态所需的其他事项。

dataStream.keyBy("someKey") // Key by field "someKey"
dataStream.keyBy(0) // Key by the first element of a Tuple 

注意,如果下列情况,类型不能是键

  1. 它是POJO类型,但不重写hashCode()方法,并且依赖于Object.hashCode()实现。
  2. 它是任何类型的数组。

| | Reduce KeyedStream → DataStream | 键控数据流上的“滚动”还原。将当前元素与最后一个约简值组合起来,并发出新值。

A reduce function that creates a stream of partial sums: 创建部分和流的还原函数:

keyedStream.reduce(new ReduceFunction<Integer>() {
    @Override
    public Integer reduce(Integer value1, Integer value2)
    throws Exception {
        return value1 + value2;
    }
}); 

| | Fold KeyedStream → DataStream | 具有初始值的键控数据流上的“滚动”折叠。将当前元素与最后折叠的值组合起来,并发出新值。

A fold function that, when applied on the sequence (1,2,3,4,5), emits the sequence "start-1", "start-1-2", "start-1-2-3", ... 当应用于序列(1,2,3,4,5)时,折叠函数发出序列"启动-1"、"启动-1-2"、"启动-1-2-3"等。

DataStream<String> result =
  keyedStream.fold("start", new FoldFunction<Integer, String>() {
    @Override
    public String fold(String current, Integer value) {
        return current + "-" + value;
    }
  }); 

| | Aggregations KeyedStream → DataStream | 键控数据流上的滚动聚合。min和minBy之间的区别是min返回最小值,而minBy返回该字段中具有最小值的元素(max和maxBy相同)。

keyedStream.sum(0);
keyedStream.sum("key");
keyedStream.min(0);
keyedStream.min("key");
keyedStream.max(0);
keyedStream.max("key");
keyedStream.minBy(0);
keyedStream.minBy("key");
keyedStream.maxBy(0);
keyedStream.maxBy("key"); 

| | Window KeyedStream → WindowedStream | 可以在已分区的KeyedStreams上定义窗口。Windows根据某些特性(例如,在最后5秒内到达的数据)对每个键中的数据进行分组。有关Windows的完整描述,请参见windows

dataStream.keyBy(0).window(TumblingEventTimeWindows.of(Time.seconds(5))); // Last 5 seconds of data 

| | WindowAll DataStream → AllWindowedStream | Windows可以在常规的数据流上定义。Windows根据某些特性(例如,在最后5秒内到达的数据)对所有流事件进行分组。有关windows的完整描述,请参见windows警告:这在很多情况下是非并行的转换。所有记录将在一个任务中为windowAll操作符收集。

dataStream.windowAll(TumblingEventTimeWindows.of(Time.seconds(5))); // Last 5 seconds of data 

| | Window Apply WindowedStream → DataStream AllWindowedStream → DataStream | 将一般函数应用于整个窗口。下面是手动对窗口元素进行求和的函数。注意:如果您正在使用窗口墙转换,则需要使用AllWindowFunction。

windowedStream.apply (new WindowFunction<Tuple2<String,Integer>, Integer, Tuple, Window>() {
    public void apply (Tuple tuple,
            Window window,
            Iterable<Tuple2<String, Integer>> values,
            Collector<Integer> out) throws Exception {
        int sum = 0;
        for (value t: values) {
            sum += t.f1;
        }
        out.collect (new Integer(sum));
    }
});

// applying an AllWindowFunction on non-keyed window stream
allWindowedStream.apply (new AllWindowFunction<Tuple2<String,Integer>, Integer, Window>() {
    public void apply (Window window,
            Iterable<Tuple2<String, Integer>> values,
            Collector<Integer> out) throws Exception {
        int sum = 0;
        for (value t: values) {
            sum += t.f1;
        }
        out.collect (new Integer(sum));
    }
}); 

| | Window Reduce WindowedStream → DataStream | 将函数约简函数应用到窗口并返回约简值。

windowedStream.reduce (new ReduceFunction<Tuple2<String,Integer>>() {
    public Tuple2<String, Integer> reduce(Tuple2<String, Integer> value1, Tuple2<String, Integer> value2) throws Exception {
        return new Tuple2<String,Integer>(value1.f0, value1.f1 + value2.f1);
    }
}); 

| | Window Fold WindowedStream → DataStream | 将函数折叠函数应用于窗口并返回折叠值。当应用于序列(1,2,3,4,5)时,示例函数将序列折叠为字符串“start-1-2-3-4-5”:

windowedStream.fold("start", new FoldFunction<Integer, String>() {
    public String fold(String current, Integer value) {
        return current + "-" + value;
    }
}); 

| | Aggregations on windows WindowedStream → DataStream | 聚合窗口的内容。min和minBy的区别在于min返回最小值,而minBy返回在此字段中具有最小值的元素(max和maxBy相同)。

windowedStream.sum(0);
windowedStream.sum("key");
windowedStream.min(0);
windowedStream.min("key");
windowedStream.max(0);
windowedStream.max("key");
windowedStream.minBy(0);
windowedStream.minBy("key");
windowedStream.maxBy(0);
windowedStream.maxBy("key"); 

| | Union DataStream* → DataStream | 两个或多个数据流的联合,创建一个新的流,其中包含来自所有流的所有元素。注意:如果将数据流与自身合并,则会在结果流中获得两次每个元素。

dataStream.union(otherStream1, otherStream2, ...); 

| | Window Join DataStream,DataStream → DataStream | 在给定键和公用窗口上联接两个数据流。

dataStream.join(otherStream)
    .where(<key selector>).equalTo(<key selector>)
    .window(TumblingEventTimeWindows.of(Time.seconds(3)))
    .apply (new JoinFunction () {...}); 

| | Interval Join KeyedStream,KeyedStream → DataStream | 在给定的时间间隔内,将两个键控流的两个元素E1和E2与公共密钥一起加入,以便E1.timestamp+下限<=e2.timestamp<=E1.timestamp+Upperbound

// this will join the two streams so that
// key1 == key2 && leftTs - 2 < rightTs < leftTs + 2
keyedStream.intervalJoin(otherKeyedStream)
    .between(Time.milliseconds(-2), Time.milliseconds(2)) // lower and upper bound
    .upperBoundExclusive(true) // optional
    .lowerBoundExclusive(true) // optional
    .process(new IntervalJoinFunction() {...}); 

| | Window CoGroup DataStream,DataStream → DataStream | 在给定的密钥和公共窗口上合并两个数据流。

dataStream.coGroup(otherStream)
    .where(0).equalTo(1)
    .window(TumblingEventTimeWindows.of(Time.seconds(3)))
    .apply (new CoGroupFunction () {...}); 

| | Connect DataStream,DataStream → ConnectedStreams | "连接"两个数据流保留它们的类型。连接允许两个流之间共享状态。

DataStream<Integer> someStream = //...
DataStream<String> otherStream = //...

ConnectedStreams<Integer, String> connectedStreams = someStream.connect(otherStream); 

| | CoMap, CoFlatMap ConnectedStreams → DataStream | 与连接的数据流上的映射和平面映射类似

connectedStreams.map(new CoMapFunction<Integer, String, Boolean>() {
    @Override
    public Boolean map1(Integer value) {
        return true;
    }

    @Override
    public Boolean map2(String value) {
        return false;
    }
});
connectedStreams.flatMap(new CoFlatMapFunction<Integer, String, String>() {

   @Override
   public void flatMap1(Integer value, Collector<String> out) {
       out.collect(value.toString());
   }

   @Override
   public void flatMap2(String value, Collector<String> out) {
       for (String word: value.split(" ")) {
         out.collect(word);
       }
   }
}); 

| | Split DataStream → SplitStream | 根据某种标准将流分成两个或多个流。

SplitStream<Integer> split = someDataStream.split(new OutputSelector<Integer>() {
    @Override
    public Iterable<String> select(Integer value) {
        List<String> output = new ArrayList<String>();
        if (value % 2 == 0) {
            output.add("even");
        }
        else {
            output.add("odd");
        }
        return output;
    }
}); 

| | Select SplitStream → DataStream | 从分离流中选择一个或多个流。

SplitStream<Integer> split;
DataStream<Integer> even = split.select("even");
DataStream<Integer> odd = split.select("odd");
DataStream<Integer> all = split.select("even","odd"); 

| | Iterate DataStream → IterativeStream → DataStream | 通过将一个运算符的输出重定向到某个以前的运算符,在流程中创建"反馈"循环。这对于定义连续更新模型的算法尤其有用。下面的代码以一个流开始,并连续地应用迭代体。大于0的元素被发送回反馈信道,其余元素被转发到下游。有关完整说明,请参见迭代

IterativeStream<Long> iteration = initialStream.iterate();
DataStream<Long> iterationBody = iteration.map (/*do something*/);
DataStream<Long> feedback = iterationBody.filter(new FilterFunction<Long>(){
    @Override
    public boolean filter(Long value) throws Exception {
        return value > 0;
    }
});
iteration.closeWith(feedback);
DataStream<Long> output = iterationBody.filter(new FilterFunction<Long>(){
    @Override
    public boolean filter(Long value) throws Exception {
        return value <= 0;
    }
}); 

| | Extract Timestamps DataStream → DataStream | 从记录中提取时间戳,以便与使用事件时间语义的窗口一起工作。请参见事件时间

stream.assignTimestamps (new TimeStampExtractor() {...}); 

|

Transformation Description
Map
DataStream → DataStream 获取一个元素并生成一个元素。将输入流的值加倍的map函数:
dataStream.map { x => x * 2 } 

| | FlatMap DataStream → DataStream | 获取一个元素并生成零个、一个或多个元素。将语句拆分为单词的FlatMap函数:

dataStream.flatMap { str => str.split(" ") } 

| | Filter DataStream → DataStream | 为每个元素评估布尔函数,并保留函数返回true的布尔函数。筛选零值的筛选器:

dataStream.filter { _ != 0 } 

| | KeyBy DataStream → KeyedStream | 逻辑地将流分割为不相交的分区,每个分区包含相同密钥的元素。在内部,这是通过哈希分区实现的。有关如何指定密钥的Keys。此转换返回一个KeypedStream。

dataStream.keyBy("someKey") // Key by field "someKey" dataStream.keyBy(0) // Key by the first element of a Tuple 

| | Reduce KeyedStream → DataStream | 键控数据流上的“滚动”还原。将当前元素与最后一个约简值组合起来,并发出新值。

A reduce function that creates a stream of partial sums: 创建部分和流的还原函数:

keyedStream.reduce { _ + _ } 

</p> | | Fold KeyedStream → DataStream | 具有初始值的键控数据流上的“滚动”折叠。将当前元素与最后折叠的值组合起来,并发出新值。

当应用于序列(1,2,3,4,5)时,折叠函数发出序列"启动-1"、"启动-1-2"、"启动-1-2-3"等。

val result: DataStream[String] =
    keyedStream.fold("start")((str, i) =&gt; { str + "-" + i }) 

| | Aggregations KeyedStream → DataStream | 键控数据流上的滚动聚合。min和minBy之间的区别是min返回最小值,而minBy返回该字段中具有最小值的元素(max和maxBy相同)。

keyedStream.sum(0)
keyedStream.sum("key")
keyedStream.min(0)
keyedStream.min("key")
keyedStream.max(0)
keyedStream.max("key")
keyedStream.minBy(0)
keyedStream.minBy("key")
keyedStream.maxBy(0)
keyedStream.maxBy("key") 

| | Window KeyedStream → WindowedStream | 可以在已经分区的Keyed Streams上定义Windows。Windows根据某种特性(例如,在最后5秒内到达的数据)对每个键中的数据进行分组。有关窗口的描述,请参见[windows](windows.html)。

dataStream.keyBy(0).window(TumblingEventTimeWindows.of(Time.seconds(5))) // Last 5 seconds of data 

| | WindowAll DataStream → AllWindowedStream | Windows可以在常规的数据流上定义。Windows根据某些特性(例如,在最后5秒内到达的数据)对所有流事件进行分组。有关windows的完整描述,请参见windows警告:这在很多情况下是非并行的转换。所有记录将在一个任务中为windowAll操作符收集。

dataStream.windowAll(TumblingEventTimeWindows.of(Time.seconds(5))) // Last 5 seconds of data 

| | Window Apply WindowedStream → DataStream AllWindowedStream → DataStream | 将一般函数应用于整个窗口。下面是手动对窗口元素进行求和的函数。注意:如果您正在使用窗口墙转换,则需要使用AllWindowFunction。

windowedStream.apply { WindowFunction }

// applying an AllWindowFunction on non-keyed window stream allWindowedStream.apply { AllWindowFunction } 

| | Window Reduce WindowedStream → DataStream | 将函数约简函数应用到窗口并返回约简值。

windowedStream.reduce { _ + _ } 

| | Window Fold WindowedStream → DataStream | 将函数折叠函数应用于窗口并返回折叠值。当应用于序列(1,2,3,4,5)时,示例函数将序列折叠为字符串“start-1-2-3-4-5”:

val result: DataStream[String] =
    windowedStream.fold("start", (str, i) =&gt; { str + "-" + i }) 

| | Aggregations on windows WindowedStream → DataStream | 聚合窗口的内容。min和minBy的区别在于min返回最小值,而minBy返回在此字段中具有最小值的元素(max和maxBy相同)。

windowedStream.sum(0)
windowedStream.sum("key")
windowedStream.min(0)
windowedStream.min("key")
windowedStream.max(0)
windowedStream.max("key")
windowedStream.minBy(0)
windowedStream.minBy("key")
windowedStream.maxBy(0)
windowedStream.maxBy("key") 

| | Union DataStream* → DataStream | 两个或多个数据流的联合,创建一个新的流,其中包含来自所有流的所有元素。注意:如果将数据流与自身合并,则会在结果流中获得两次每个元素。

dataStream.union(otherStream1, otherStream2, ...) 

| | Window Join DataStream,DataStream → DataStream | 在给定键和公用窗口上联接两个数据流。

dataStream.join(otherStream)
    .where(&lt;key selector&gt;).equalTo(&lt;key selector&gt;)
    .window(TumblingEventTimeWindows.of(Time.seconds(3)))
    .apply { ... } 

| | Window CoGroup DataStream,DataStream → DataStream | 在给定的密钥和公共窗口上合并两个数据流。

dataStream.coGroup(otherStream)
    .where(0).equalTo(1)
    .window(TumblingEventTimeWindows.of(Time.seconds(3)))
    .apply {} 

| | Connect DataStream,DataStream → ConnectedStreams | “连接”两个数据流,保留它们的类型,允许在两个数据流之间共享状态。

someStream : DataStream[Int] = ...
otherStream : DataStream[String] = ...

val connectedStreams = someStream.connect(otherStream) 

| | CoMap, CoFlatMap ConnectedStreams → DataStream | 与连接的数据流上的映射和平面映射类似

connectedStreams.map(
    (_ : Int) =&gt; true,
    (_ : String) =&gt; false
)
connectedStreams.flatMap(
    (_ : Int) =&gt; true,
    (_ : String) =&gt; false
) 

| | Split DataStream → SplitStream | 根据某种标准将流分成两个或多个流。

val split = someDataStream.split(
  (num: Int) =&gt;
    (num % 2) match {
      case 0 =&gt; List("even")
      case 1 =&gt; List("odd")
    }
) 

| | Select SplitStream → DataStream | 从分离流中选择一个或多个流。

val even = split select "even"
val odd = split select "odd"
val all = split.select("even","odd") 

| | Iterate DataStream → IterativeStream → DataStream | 通过将一个运算符的输出重定向到某个以前的运算符,在流程中创建"反馈"循环。这对于定义连续更新模型的算法尤其有用。下面的代码以一个流开始,并连续地应用迭代体。大于0的元素被发送回反馈信道,其余元素被转发到下游。有关完整说明,请参见迭代

initialStream.iterate {
  iteration =&gt; {
    val iterationBody = iteration.map {/*do something*/}
    (iterationBody.filter(_ &gt; 0), iterationBody.filter(_ &lt;= 0))
  }
} 

| | Extract Timestamps DataStream → DataStream | 从记录中提取时间戳,以便与使用事件时间语义的窗口一起工作。请参见事件时间

stream.assignTimestamps { timestampExtractor } 

|

通过匿名模式匹配从元组、案例类和集合中提取,如下所示:

val data: DataStream[(Int, String, Double)] = // [...] data.map {
  case (id, name, temperature) => // [...] }

不支持APIOut-of-the-Box。要使用此功能,您应该使用ScalaAPI扩展

以下转换可用于元组的数据流:

转换 描述
Project
DataStream → DataStream 从元组中选择字段子集。
DataStream&lt;Tuple3&lt;Integer, Double, String&gt;&gt; in = // [...]
DataStream&lt;Tuple2&lt;String, Integer&gt;&gt; out = in.project(2,0);

|

Physical partitioning 物理分区

FLink还通过以下功能对转换后的精确流分区给出低级控制(如果需要)。

Transformation Description
Custom partitioning
DataStream → DataStream 使用用户定义的Partitioner为每个元素选择目标任务。
dataStream.partitionCustom(partitioner, "someKey");
dataStream.partitionCustom(partitioner, 0); 

| | Random partitioning DataStream → DataStream | 根据均匀分布随机划分元素。

dataStream.shuffle(); 

| | Rebalancing (Round-robin partitioning) DataStream → DataStream | 分区元素循环,每个分区创建相等的负载。用于在数据偏斜的存在下进行性能优化。

dataStream.rebalance(); 

| | Rescaling DataStream → DataStream | 将元素(循环)划分为下游操作的子集。如果您希望有管道,例如,从源的每个并行实例到几个映射器的子集来分配负载,但不希望重新平衡会引起的重新平衡,这是非常有用的。这将只需要本地数据传输,而不是通过网络传输数据,这取决于其他配置值,例如TaskManager的时隙数。上游操作向其发送元素的下游操作的子集取决于上游和下游操作的并行度。例如,如果上游操作具有并行性2,而下游操作具有并行性6,则一个上游操作将单元分配到三个下游操作,而另一个上游操作将分配给其他三个下游操作。另一方面,如果下游操作具有并行性2,而上游操作具有并行性6,则三个上游操作将分布到一个下游操作,而其他三个上游操作将分配给另一个下游操作。如果不同的并行性不是彼此的倍数,则一个或几个下游操作将有来自上游操作的不同数量的输入。请参见上图中的连接模式可视化图:!(数据流中的检查点屏障)(./img/rescale.svg)

dataStream.rescale(); 

| | Broadcasting DataStream → DataStream | 向每个分区广播元素。

dataStream.broadcast(); 

|

Transformation Description
Custom partitioning
DataStream → DataStream 使用用户定义的Partitioner为每个元素选择目标任务。
dataStream.partitionCustom(partitioner, "someKey")
dataStream.partitionCustom(partitioner, 0) 

| | Random partitioning DataStream → DataStream | 根据均匀分布随机划分元素。

dataStream.shuffle() 

| | Rebalancing (Round-robin partitioning) DataStream → DataStream | 分区元素循环,每个分区创建相等的负载。用于在数据偏斜的存在下进行性能优化。

dataStream.rebalance() 

| | Rescaling DataStream → DataStream | 分区元素,循环,到下游操作的子集。如果您希望具有管线(例如,从源的每个并行实例导出到多个映射器的子集以分配负载),但不希望重新平衡()将产生的全部重新平衡,则这是有用的。这将仅需要本地数据传送而不是通过网络传送数据,这取决于诸如任务管理器的时隙数目的其它配置值。上游操作发送元件的下行操作的子集取决于上游和下游操作两者的并行度。例如,如果上游操作具有平行度2且下游操作具有平行度4,则一个上游操作将将元素分配到两个下游操作,而另一个上游操作将分配到另两个下游操作。另一方面,如果下游操作具有并行度2,而上游操作具有并行4,则两个上游操作将分配到一个下游操作,而另两个上游操作将分配到另一个下游操作。在不同的并行不是彼此的倍数的情况下,一个或多个下游操作将具有来自上游操作的不同数量的输入。&lt;/p&gt;请参见该图用于在上述示例中用于连接模式的可视化:&lt;/p&gt;![数据流中的检查点屏障](../IMG/rescale.svg)

dataStream.rescale() 

| | Broadcasting DataStream → DataStream | 向每个分区广播元素。

dataStream.broadcast() 

|

Task chaining and resource groups 任务链接和资源组

将两个后续转换链接在一起意味着将它们放在同一个线程中,以获得更好的性能。默认情况下,Flink操作符(如果可能的话)(例如,两个后续的映射转换)。如果需要,API可以对链接进行细粒度控制:

如果要禁用整个作业中的链接,请使用“StreameExecutionEnvironment.DisableOperator链接()”。对于更精细的控制,可以使用以下功能。请注意,这些函数只能在数据流转换后使用,因为它们引用了以前的转换。例如,可以使用“someStream.map(...).StartNewChain()”,但不能使用“SomeStream.StartNewChain()”。

资源组是Flink中的一个槽,请参见slots.如果需要,可以在单独的槽中手动隔离运算符。

Transformation Description
Start new chain 开始一个新的链,从这个操作符开始。这两个映射器将被链子化,过滤器将不被链接到第一个映射器。
someStream.filter(...).map(...).startNewChain().map(...);

| | 禁用链 | 不要对地图操作员链进行链

someStream.map(...).disableChaining();

| | Set slot sharing group | 设置操作的时隙共享组。Flink将把具有相同时隙共享组的操作放到相同的时隙中,同时保持在其他时隙中没有时隙共享组的操作。这可以用来隔离插槽。如果所有输入操作都在同一个时隙共享组中,则从输入操作继承时隙共享组。默认插槽共享组的名称为“默认”,可以通过调用slotSharingGroup(“Default”)显式地将操作放入该组。

someStream.filter(...).slotSharingGroup("name");

|

Transformation Description
Start new chain 开始一个新的链,从这个操作符开始。这两个映射器将被链子化,过滤器将不被链接到第一个映射器。
someStream.filter(...).map(...).startNewChain().map(...)

| | Disable chaining | Do not chain the map operator

someStream.map(...).disableChaining()

| | Set slot sharing group | 设置操作的时隙共享组。Flink将把具有相同时隙共享组的操作放到相同的时隙中,同时保持在其他时隙中没有时隙共享组的操作。这可以用来隔离插槽。如果所有输入操作都在同一个时隙共享组中,则从输入操作继承时隙共享组。默认插槽共享组的名称为“默认”,可以通过调用slotSharingGroup(“Default”)显式地将操作放入该组。

someStream.filter(...).slotSharingGroup("name")

|



回到顶部