算子
算子将一个或多个DataStream转换为新的DataStream。程序可以将多个转换组合成复杂的数据流拓扑。
本节介绍了基本转换,应用这些转换后的有效物理分区以及对Flink 算子链接的见解。
DataStream转换
转换:映射 DataStream→DataStream
描述:采用一个数据元并生成一个数据元。一个map函数,它将输入流的值加倍:
DataStream<Integer> dataStream = //...
dataStream.map(new MapFunction<Integer, Integer>() {
@Override
public Integer map(Integer value) throws Exception {
return 2 * value;
}
});
转换:FlatMap DataStream→DataStream
描述:采用一个数据元并生成零个,一个或多个数据元。将句子分割为单词的flatmap函数:
dataStream.flatMap(new FlatMapFunction<String, String>() {
@Override
public void flatMap(String value, Collector<String> out)
throws Exception {
for(String word: value.split(" ")){
out.collect(word);
}
}
});
转换:Filter DataStream→DataStream
描述:计算每个数据元的布尔函数,并保存函数返回true的数据元。过滤掉零值的过滤器:
dataStream.filter(new FilterFunction<Integer>() {
@Override
public boolean filter(Integer value) throws Exception {
return value != 0;
}
});
转换:KeyBy DataStream→KeyedStream
描述:逻辑上将流分区为不相交的分区。具有相同Keys的所有记录都分配给同一分区。在内部,keyBy()_是使用散列分区实现的。指定键有不同的方法。此转换返回_KeyedStream,其中包括使用被Keys化状态所需的_KeyedStream_。
dataStream.keyBy("someKey") // Key by field "someKey"
dataStream.keyBy(0) // Key by the first element of a Tuple
注意 如果出现以下情况,则类型不能成为关键:
- 它是POJO类型但不覆盖_hashCode()_方法并依赖于_Object.hashCode()_实现。
- 它是任何类型的数组。
转换:Reduce KeyedStream→DataStream
描述:被Keys化数据流上的“滚动”Reduce。将当前数据元与最后一个Reduce的值组合并发出新值。
reduce函数,用于创建部分和的流:
keyedStream.reduce(new ReduceFunction<Integer>() {
@Override
public Integer reduce(Integer value1, Integer value2)
throws Exception {
return value1 + value2;
}
});
转换:折叠 KeyedStream→DataStream
描述:具有初始值的被Keys化数据流上的“滚动”折叠。将当前数据元与最后折叠的值组合并发出新值。
折叠函数,当应用于序列(1,2,3,4,5)时,发出序列“start-1”,“start-1-2”,“start-1-2-3”,. ..
DataStream<String> result =
keyedStream.fold("start", new FoldFunction<Integer, String>() {
@Override
public String fold(String current, Integer value) {
return current + "-" + value;
}
});
转换:聚合 KeyedStream→DataStream
描述:在被Keys化数据流上滚动聚合。min和minBy之间的差异是min返回最小值,而minBy返回该字段中具有最小值的数据元(max和maxBy相同)。
keyedStream.sum(0);
keyedStream.sum("key");
keyedStream.min(0);
keyedStream.min("key");
keyedStream.max(0);
keyedStream.max("key");
keyedStream.minBy(0);
keyedStream.minBy("key");
keyedStream.maxBy(0);
keyedStream.maxBy("key");
转换:Window KeyedStream→WindowedStream
描述:可以在已经分区的KeyedStream上定义Windows。Windows根据某些特征(例如,在最后5秒内到达的数据)对每个Keys中的数据进行分组。有关窗口的完整说明,请参见windows。
dataStream.keyBy(0).window(TumblingEventTimeWindows.of(Time.seconds(5))); // Last 5 seconds of data
转换:WindowAll DataStream→AllWindowedStream
描述:Windows可以在常规DataStream上定义。Windows根据某些特征(例如,在最后5秒内到达的数据)对所有流事件进行分组。有关窗口的完整说明,请参见windows。警告:在许多情况下,这是非并行转换。所有记录将收集在windowAll 算子的一个任务中。
dataStream.windowAll(TumblingEventTimeWindows.of(Time.seconds(5))); // Last 5 seconds of data
转换:Window Apply WindowedStream→DataStream AllWindowedStream→DataStream
描述:将一般函数应用于整个窗口。下面是一个手动求和窗口数据元的函数。注意:如果您正在使用windowAll转换,则需要使用AllWindowFunction。
windowedStream.apply (new WindowFunction<Tuple2<String,Integer>, Integer, Tuple, Window>() {
public void apply (Tuple tuple,
Window window,
Iterable<Tuple2<String, Integer>> values,
Collector<Integer> out) throws Exception {
int sum = 0;
for (value t: values) {
sum += t.f1;
}
out.collect (new Integer(sum));
}
});
// applying an AllWindowFunction on non-keyed window stream
allWindowedStream.apply (new AllWindowFunction<Tuple2<String,Integer>, Integer, Window>() {
public void apply (Window window,
Iterable<Tuple2<String, Integer>> values,
Collector<Integer> out) throws Exception {
int sum = 0;
for (value t: values) {
sum += t.f1;
}
out.collect (new Integer(sum));
}
});
转换:Window Reduce WindowedStream→DataStream
描述:将函数缩减函数应用于窗口并返回缩小的值。
windowedStream.reduce (new ReduceFunction<Tuple2<String,Integer>>() {
public Tuple2<String, Integer> reduce(Tuple2<String, Integer> value1, Tuple2<String, Integer> value2) throws Exception {
return new Tuple2<String,Integer>(value1.f0, value1.f1 + value2.f1);
}
});
转换:Window Fold WindowedStream→DataStream
描述:将函数折叠函数应用于窗口并返回折叠值。示例函数应用于序列(1,2,3,4,5)时,将序列折叠为字符串“start-1-2-3-4-5”:
windowedStream.fold("start", new FoldFunction<Integer, String>() {
public String fold(String current, Integer value) {
return current + "-" + value;
}
});
转换:Windows上的聚合 WindowedStream→DataStream
描述:聚合窗口的内容。min和minBy之间的差异是min返回最小值,而minBy返回该字段中具有最小值的数据元(max和maxBy相同)。
windowedStream.sum(0);
windowedStream.sum("key");
windowedStream.min(0);
windowedStream.min("key");
windowedStream.max(0);
windowedStream.max("key");
windowedStream.minBy(0);
windowedStream.minBy("key");
windowedStream.maxBy(0);
windowedStream.maxBy("key");
转换:Union DataStream *→DataStream
描述:两个或多个数据流的联合,创建包含来自所有流的所有数据元的新流。注意:如果将数据流与自身联合,则会在结果流中获取两次数据元。
dataStream.union(otherStream1, otherStream2, ...);
转换:Window Join DataStream,DataStream→DataStream
描述:在给定Keys和公共窗口上连接两个数据流。
dataStream.join(otherStream)
.where(<key selector>).equalTo(<key selector>)
.window(TumblingEventTimeWindows.of(Time.seconds(3)))
.apply (new JoinFunction () {...});
转换:Interval Join KeyedStream,KeyedStream→DataStream
描述:在给定的时间间隔内使用公共Keys关联两个被Key化的数据流的两个数据元e1和e2,以便e1.timestamp + lowerBound <= e2.timestamp <= e1.timestamp + upperBound
// this will join the two streams so that
// key1 == key2 && leftTs - 2 < rightTs < leftTs + 2
keyedStream.intervalJoin(otherKeyedStream)
.between(Time.milliseconds(-2), Time.milliseconds(2)) // lower and upper bound
.upperBoundExclusive(true) // optional
.lowerBoundExclusive(true) // optional
.process(new IntervalJoinFunction() {...});
转换:Window CoGroup DataStream,DataStream→DataStream
描述:在给定Keys和公共窗口上对两个数据流进行Cogroup。
dataStream.coGroup(otherStream)
.where(0).equalTo(1)
.window(TumblingEventTimeWindows.of(Time.seconds(3)))
.apply (new CoGroupFunction () {...});
转换:连接 DataStream,DataStream→ConnectedStreams
描述:“连接”两个保存其类型的数据流。连接允许两个流之间的共享状态。
DataStream<Integer> someStream = //...
DataStream<String> otherStream = //...
ConnectedStreams<Integer, String> connectedStreams = someStream.connect(otherStream);
转换:CoMap,CoFlatMap ConnectedStreams→DataStream
描述:类似于连接数据流上的map和flatMap
connectedStreams.map(new CoMapFunction<Integer, String, Boolean>() {
@Override
public Boolean map1(Integer value) {
return true;
}
@Override
public Boolean map2(String value) {
return false;
}
});
connectedStreams.flatMap(new CoFlatMapFunction<Integer, String, String>() {
@Override
public void flatMap1(Integer value, Collector<String> out) {
out.collect(value.toString());
}
@Override
public void flatMap2(String value, Collector<String> out) {
for (String word: value.split(" ")) {
out.collect(word);
}
}
});
转换:拆分 DataStream→SplitStream
描述:根据某些标准将流拆分为两个或更多个流。
SplitStream<Integer> split = someDataStream.split(new OutputSelector<Integer>() {
@Override
public Iterable<String> select(Integer value) {
List<String> output = new ArrayList<String>();
if (value % 2 == 0) {
output.add("even");
}
else {
output.add("odd");
}
return output;
}
});
转换:选择 SplitStream→DataStream
描述:从拆分流中选择一个或多个流。
SplitStream<Integer> split;
DataStream<Integer> even = split.select("even");
DataStream<Integer> odd = split.select("odd");
DataStream<Integer> all = split.select("even","odd");
转换:迭代 DataStream→IterativeStream→DataStream
描述:通过将一个 算子的输出重定向到某个先前的 算子,在流中创建“反馈”循环。这对于定义不断更新模型的算法特别有用。以下代码以流开头并连续应用迭代体。大于0的数据元将被发送回反馈通道,其余数据元将向下游转发。有关完整说明,请参阅迭代。
IterativeStream<Long> iteration = initialStream.iterate();
DataStream<Long> iterationBody = iteration.map (/*do something*/);
DataStream<Long> feedback = iterationBody.filter(new FilterFunction<Long>(){
@Override
public boolean filter(Integer value) throws Exception {
return value > 0;
}
});
iteration.closeWith(feedback);
DataStream<Long> output = iterationBody.filter(new FilterFunction<Long>(){
@Override
public boolean filter(Integer value) throws Exception {
return value <= 0;
}
});
转换:提取时间戳 DataStream→DataStream
描述:从记录中提取时间戳,以便使用使用事件时间语义的窗口。查看活动时间。
stream.assignTimestamps (new TimeStampExtractor() {...});
转换:Map DataStream → DataStream
描述:Takes one element and produces one element. A map function that doubles the values of the input stream:
dataStream.map { x => x * 2 }
转换:FlatMap DataStream → DataStream
描述:Takes one element and produces zero, one, or more elements. A flatmap function that splits sentences to words:
dataStream.flatMap { str => str.split(" ") }
转换:Filter DataStream → DataStream
描述:Evaluates a boolean function for each element and retains those for which the function returns true. A filter that filters out zero values:
dataStream.filter { _ != 0 }
转换:KeyBy DataStream → KeyedStream
描述:Logically partitions a stream into disjoint partitions, each partition containing elements of the same key. Internally, this is implemented with hash partitioning. See keys on how to specify keys. This transformation returns a KeyedStream.
dataStream.keyBy("someKey") // Key by field "someKey" dataStream.keyBy(0) // Key by the first element of a Tuple
转换:Reduce KeyedStream → DataStream
描述:A "rolling" reduce on a keyed data stream. Combines the current element with the last reduced value and emits the new value.
A reduce function that creates a stream of partial sums:
keyedStream.reduce { _ + _ }
转换:Fold KeyedStream → DataStream
描述:A "rolling" fold on a keyed data stream with an initial value. Combines the current element with the last folded value and emits the new value.
A fold function that, when applied on the sequence (1,2,3,4,5), emits the sequence "start-1", "start-1-2", "start-1-2-3", ...
val result: DataStream[String] =
keyedStream.fold("start")((str, i) => { str + "-" + i })
转换:Aggregations KeyedStream → DataStream
描述:Rolling aggregations on a keyed data stream. The difference between min and minBy is that min returns the minimum value, whereas minBy returns the element that has the minimum value in this field (same for max and maxBy).
keyedStream.sum(0)
keyedStream.sum("key")
keyedStream.min(0)
keyedStream.min("key")
keyedStream.max(0)
keyedStream.max("key")
keyedStream.minBy(0)
keyedStream.minBy("key")
keyedStream.maxBy(0)
keyedStream.maxBy("key")
转换:Window KeyedStream → WindowedStream
描述:Windows can be defined on already partitioned KeyedStreams. Windows group the data in each key according to some characteristic (e.g., the data that arrived within the last 5 seconds). See windows for a description of windows.
dataStream.keyBy(0).window(TumblingEventTimeWindows.of(Time.seconds(5))) // Last 5 seconds of data
转换:WindowAll DataStream → AllWindowedStream
描述:Windows can be defined on regular DataStreams. Windows group all the stream events according to some characteristic (e.g., the data that arrived within the last 5 seconds). See windows for a complete description of windows.WARNING: This is in many cases a non-parallel transformation. All records will be gathered in one task for the windowAll operator.
dataStream.windowAll(TumblingEventTimeWindows.of(Time.seconds(5))) // Last 5 seconds of data
转换:Window Apply WindowedStream → DataStream AllWindowedStream → DataStream | Applies a general function to the window as a whole. Below is a function that manually sums the elements of a window.Note: If you are using a windowAll transformation, you need to use an AllWindowFunction instead.
windowedStream.apply { WindowFunction }
// applying an AllWindowFunction on non-keyed window stream allWindowedStream.apply { AllWindowFunction }
转换:Window Reduce WindowedStream → DataStream
描述:Applies a functional reduce function to the window and returns the reduced value.
windowedStream.reduce { _ + _ }
转换:Window Fold WindowedStream → DataStream
描述:Applies a functional fold function to the window and returns the folded value. The example function, when applied on the sequence (1,2,3,4,5), folds the sequence into the string "start-1-2-3-4-5":
val result: DataStream[String] =
windowedStream.fold("start", (str, i) => { str + "-" + i })
转换:Aggregations on windows WindowedStream → DataStream
描述:Aggregates the contents of a window. The difference between min and minBy is that min returns the minimum value, whereas minBy returns the element that has the minimum value in this field (same for max and maxBy).
windowedStream.sum(0)
windowedStream.sum("key")
windowedStream.min(0)
windowedStream.min("key")
windowedStream.max(0)
windowedStream.max("key")
windowedStream.minBy(0)
windowedStream.minBy("key")
windowedStream.maxBy(0)
windowedStream.maxBy("key")
转换:Union DataStream* → DataStream
描述:Union of two or more data streams creating a new stream containing all the elements from all the streams. Note: If you union a data stream with itself you will get each element twice in the resulting stream.
dataStream.union(otherStream1, otherStream2, ...)
转换:Window Join DataStream,DataStream → DataStream
描述:Join two data streams on a given key and a common window.
dataStream.join(otherStream)
.where(<key selector>).equalTo(<key selector>)
.window(TumblingEventTimeWindows.of(Time.seconds(3)))
.apply { ... }
转换:Window CoGroup DataStream,DataStream → DataStream
描述:Cogroups two data streams on a given key and a common window.
dataStream.coGroup(otherStream)
.where(0).equalTo(1)
.window(TumblingEventTimeWindows.of(Time.seconds(3)))
.apply {}
转换:Connect DataStream,DataStream → ConnectedStreams
描述:"Connects" two data streams retaining their types, allowing for shared state between the two streams.
someStream : DataStream[Int] = ...
otherStream : DataStream[String] = ...
val connectedStreams = someStream.connect(otherStream)
转换:CoMap, CoFlatMap ConnectedStreams → DataStream
描述:Similar to map and flatMap on a connected data stream
connectedStreams.map(
(_ : Int) => true,
(_ : String) => false
)
connectedStreams.flatMap(
(_ : Int) => true,
(_ : String) => false
)
转换:Split DataStream → SplitStream
描述:Split the stream into two or more streams according to some criterion.
val split = someDataStream.split(
(num: Int) =>
(num % 2) match {
case 0 => List("even")
case 1 => List("odd")
}
)
转换:Select SplitStream → DataStream
描述:Select one or more streams from a split stream.
val even = split select "even"
val odd = split select "odd"
val all = split.select("even","odd")
转换:Iterate DataStream → IterativeStream → DataStream
描述:Creates a "feedback" loop in the flow, by redirecting the output of one operator to some previous operator. This is especially useful for defining algorithms that continuously update a model. The following code starts with a stream and applies the iteration body continuously. Elements that are greater than 0 are sent back to the feedback channel, and the rest of the elements are forwarded downstream. See iterations for a complete description.
initialStream.iterate {
iteration => {
val iterationBody = iteration.map {/*do something*/}
(iterationBody.filter(_ > 0), iterationBody.filter(_ <= 0))
}
}
转换:Extract Timestamps DataStream → DataStream
描述:Extracts timestamps from records in order to work with windows that use event time semantics. See Event Time.
stream.assignTimestamps { timestampExtractor }
Extraction from tuples, case classes and collections via anonymous pattern matching, like the following:
val data: DataStream[(Int, String, Double)] = // [...] data.map {
case (id, name, temperature) => // [...] }
is not supported by the API out-of-the-box. To use this feature, you should use a Scala API extension.
以下转换可用于元组的数据流:
转换:Project DataStream→DataStream
描述:从元组中选择字段的子集
DataStream<Tuple3<Integer, Double, String>> in = // [...]
DataStream<Tuple2<String, Integer>> out = in.project(2,0);
物理分区
Flink还通过以下函数对转换后的精确流分区进行低级控制(如果需要)。
转换:自定义分区 DataStream→DataStream
描述:使用用户定义的分区程序为每个数据元选择目标任务。
dataStream.partitionCustom(partitioner, "someKey");
dataStream.partitionCustom(partitioner, 0);
转换:随机分区 DataStream→DataStream
描述:根据均匀分布随机分配数据元。
dataStream.shuffle();
转换:Rebalance (循环分区) DataStream→DataStream
描述:分区数据元循环,每个分区创建相等的负载。在存在数据偏斜时用于性能优化。
dataStream.rebalance();
转换:重新调整 DataStream→DataStream
描述:分区数据元,循环,到下游 算子操作的子集。如果您希望拥有管道,例如,从源的每个并行实例扇出到多个映射器的子集以分配负载但又不希望发生rebalance()会产生完全Rebalance ,那么这非常有用。这将仅需要本地数据传输而不是通过网络传输数据,具体取决于其他配置值,例如TaskManagers的插槽数。上游 算子操作发送数据元的下游 算子操作的子集取决于上游和下游 算子操作的并行度。例如,如果上游 算子操作具有并行性2并且下游 算子操作具有并行性6,则一个上游 算子操作将分配元件到三个下游 算子操作,而另一个上游 算子操作将分配到其他三个下游 算子操作。另一方面,如果下游 算子操作具有并行性2而上游 算子操作具有并行性6,则三个上游 算子操作将分配到一个下游 算子操作,而其他三个上游 算子操作将分配到另一个下游 算子操作。在不同并行度不是彼此的倍数的情况下,一个或多个下游 算子操作将具有来自上游 算子操作的不同数量的输入。请参阅此图以获取上例中连接模式的可视化:
dataStream.rescale();
转换:广播 DataStream→DataStream
描述:向每个分区广播数据元。
dataStream.broadcast();
转换:Custom partitioning DataStream → DataStream
描述:Uses a user-defined Partitioner to select the target task for each element.
dataStream.partitionCustom(partitioner, "someKey")
dataStream.partitionCustom(partitioner, 0)
转换:Random partitioning DataStream → DataStream
描述:Partitions elements randomly according to a uniform distribution.
dataStream.shuffle()
转换:Rebalancing (Round-robin partitioning) DataStream → DataStream
描述:Partitions elements round-robin, creating equal load per partition. Useful for performance optimization in the presence of data skew.
dataStream.rebalance()
转换:Rescaling DataStream → DataStream
描述:Partitions elements, round-robin, to a subset of downstream operations. This is useful if you want to have pipelines where you, for example, fan out from each parallel instance of a source to a subset of several mappers to distribute load but don't want the full rebalance that rebalance() would incur. This would require only local data transfers instead of transferring data over network, depending on other configuration values such as the number of slots of TaskManagers.The subset of downstream operations to which the upstream operation sends elements depends on the degree of parallelism of both the upstream and downstream operation. For example, if the upstream operation has parallelism 2 and the downstream operation has parallelism 4, then one upstream operation would distribute elements to two downstream operations while the other upstream operation would distribute to the other two downstream operations. If, on the other hand, the downstream operation has parallelism 2 while the upstream operation has parallelism 4 then two upstream operations would distribute to one downstream operation while the other two upstream operations would distribute to the other downstream operations.In cases where the different parallelisms are not multiples of each other one or several downstream operations will have a differing number of inputs from upstream operations.</p> Please see this figure for a visualization of the connection pattern in the above example: </p>
dataStream.rescale()
转换:Broadcasting DataStream → DataStream
描述:Broadcasts elements to every partition.
dataStream.broadcast()
任务链和资源组
链接两个后续转换意味着将它们共同定位在同一个线程中以获得更好的性能。如果可能的话,Flink默认链算子(例如,两个后续的映射转换)。如果需要,API可以对链接进行细粒度控制:
使用StreamExecutionEnvironment.disableOperatorChaining()
如果要禁用整个工作链。对于更细粒度的控制,可以使用以下函数。请注意,这些函数只能在DataStream转换后立即使用,因为它们引用了前一个转换。例如,您可以使用someStream.map(...).startNewChain()
,但不能使用someStream.startNewChain()
。
资源组是Flink中的一个插槽,请参阅 插槽。如果需要,您可以在单独的插槽中手动隔离算子
转换:开始新的链条
描述:从这个 算子开始,开始一个新的链。两个映射器将被链接,并且过滤器将不会链接到第一个映射器。
someStream.filter(...).map(...).startNewChain().map(...);
转换:禁用链接
描述:不要链接Map 算子
someStream.map(...).disableChaining();
转换:设置插槽共享组
描述:设置 算子操作的插槽共享组。Flink将把具有相同插槽共享组的 算子操作放入同一个插槽,同时保持其他插槽中没有插槽共享组的 算子操作。这可用于隔离插槽。如果所有输入 算子操作都在同一个插槽共享组中,则插槽共享组将继承输入 算子操作。默认插槽共享组的名称为“default”,可以通过调用slotSharingGroup(“default”)将 算子操作显式放入此组中。
someStream.filter(...).slotSharingGroup("name");
转换:Start new chain
描述:Begin a new chain, starting with this operator. The two mappers will be chained, and filter will not be chained to the first mapper.
someStream.filter(...).map(...).startNewChain().map(...)
转换:Disable chaining
描述:Do not chain the map operator
someStream.map(...).disableChaining()
转换:Set slot sharing group
描述:Set the slot sharing group of an operation. Flink will put operations with the same slot sharing group into the same slot while keeping operations that don't have the slot sharing group in other slots. This can be used to isolate slots. The slot sharing group is inherited from input operations if all input operations are in the same slot sharing group. The name of the default slot sharing group is "default", operations can explicitly be put into this group by calling slotSharingGroup("default").
someStream.filter(...).slotSharingGroup("name")