Skip to content

并行执行

译者:flink.sojb.cn

本节介绍如何在Flink中配置程序的并行执行。Flink程序由多个任务(转换/ 算子,数据源和接收器)组成。任务被分成几个并行实例以供执行,每个并行实例处理任务输入数据的子集。任务的并行实例数称为_并行性_。

如果要使用保存点,还应考虑设置最大并行度(或max parallelism)。从保存点恢复时,您可以更改特定 算子或整个程序的并行度,此设置指定并行度的上限。这是必需的,因为Flink在内部将状态划分为Keys组,并且我们不能拥有+Inf多个Keys组,因为这会对性能产生不利影响。

设置并行性

可以在不同级别的Flink中指定任务的并行性:

算子级别

可以通过调用其setParallelism()方法来定义单个 算子,数据源或数据接收器的并行性 。例如,像这样:

final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

DataStream<String> text = [...]
DataStream<Tuple2<String, Integer>> wordCounts = text
    .flatMap(new LineSplitter())
    .keyBy(0)
    .timeWindow(Time.seconds(5))
    .sum(1).setParallelism(5);

wordCounts.print();

env.execute("Word Count Example");
val env = StreamExecutionEnvironment.getExecutionEnvironment

val text = [...]
val wordCounts = text
    .flatMap{ _.split(" ") map { (_, 1) } }
    .keyBy(0)
    .timeWindow(Time.seconds(5))
    .sum(1).setParallelism(5)
wordCounts.print()

env.execute("Word Count Example")

运行环境级别

如此处所述 Flink程序在运行环境的上下文中执行。运行环境为其执行的所有算子,数据源和数据接收器定义默认并行性。可以通过显式配置 算子的并行性来覆盖运行环境并行性。

可以通过调用setParallelism()方法来指定运行环境的默认并行性 。要以并行方式执行所有 算子,数据源和数据接收器,请3按如下方式设置运行环境的默认并行度:

final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.setParallelism(3);

DataStream<String> text = [...]
DataStream<Tuple2<String, Integer>> wordCounts = [...]
wordCounts.print();

env.execute("Word Count Example");
val env = StreamExecutionEnvironment.getExecutionEnvironment
env.setParallelism(3)

val text = [...]
val wordCounts = text
    .flatMap{ _.split(" ") map { (_, 1) } }
    .keyBy(0)
    .timeWindow(Time.seconds(5))
    .sum(1)
wordCounts.print()

env.execute("Word Count Example")

客户级别

在向Flink提交作业时,可以在客户端设置并行性。客户端可以是Java或Scala程序。这种客户端的一个例子是Flink的命令行界面(CLI)。

对于CLI客户端,可以使用指定parallelism参数-p。例如:

./bin/flink run -p 10 ../examples/*WordCount-java*.jar 

在Java / Scala程序中,并行性设置如下:

try {
    PackagedProgram program = new PackagedProgram(file, args);
    InetSocketAddress jobManagerAddress = RemoteExecutor.getInetFromHostport("localhost:6123");
    Configuration config = new Configuration();

    Client client = new Client(jobManagerAddress, config, program.getUserCodeClassLoader());

    // set the parallelism to 10 here
    client.run(program, 10, true);

} catch (ProgramInvocationException e) {
    e.printStackTrace();
}
try {
    PackagedProgram program = new PackagedProgram(file, args)
    InetSocketAddress jobManagerAddress = RemoteExecutor.getInetFromHostport("localhost:6123")
    Configuration config = new Configuration()

    Client client = new Client(jobManagerAddress, new Configuration(), program.getUserCodeClassLoader())

    // set the parallelism to 10 here
    client.run(program, 10, true)

} catch {
    case e: Exception => e.printStackTrace
}

系统级别

可以通过设置parallelism.default属性来定义所有运行环境的系统范围默认并行度 ./conf/flink-conf.yaml。有关详细信息,请参阅 配置文档

设置最大并行度

可以在可以设置并行度的位置设置最大并行度(客户端级别和系统级别除外)。而不是调用,setParallelism()你调用 setMaxParallelism()设置最大并行度。

最大并行度的默认设置大致operatorParallelism + (operatorParallelism / 2)为下限127和上限32768

注意将最大并行度设置为非常大的值可能对性能有害,因为某些状态后台必须保持内部数据结构随Keys组的数量(这是可重新缓存状态的内部实现机制)进行扩展。



回到顶部