Skip to content

最佳实践

这里是Flink常见问题的解决办法

大部分Flink程序(无论是批处理还是流式计算)都依赖于外部的参数配置. 比如指定输入输出位置(路径或地址),系统参数(并行度和运行时配置),程序相关的参数配置。

Flink提供一个叫 ParameterTool 的工具类来解决此问题。 ParameterTool 并不是解决这个问题的唯一办法,其他框架,比如 Commons CLIargparse4j 也可以解决这个问题.

通过 ParameterTool 来获取配置值

ParameterTool 工具提供了很多读取配置的静态方法, 工具类内部实现类似于 Map<String, String>, 所以和项目代码风格不会有冲突

.properties 文件获取配置

下面是个读 Properties 文件并将内容转换成key-value的例子

String propertiesFilePath = "/home/sam/flink/myjob.properties";
ParameterTool parameter = ParameterTool.fromPropertiesFile(propertiesFilePath);

File propertiesFile = new File(propertiesFilePath);
ParameterTool parameter = ParameterTool.fromPropertiesFile(propertiesFile);

InputStream propertiesFileInputStream = new FileInputStream(file);
ParameterTool parameter = ParameterTool.fromPropertiesFile(propertiesFileInputStream);

从命令行中获取配置

下面是命令行参数的例子,例如在命令行后加入 --input hdfs:///mydata --elements 42 参数。

public static void main(String[] args) {
    ParameterTool parameter = ParameterTool.fromArgs(args);
    // .. regular code ..

从系统属性中获取配置

启动JVM时,可以把系统属性传递给程序,例如: -Dinput=hdfs:///mydata。 可以通过 ParameterTool 来获取配置,例如:

ParameterTool parameter = ParameterTool.fromSystemProperties();

根据上文,我们已经获取了参数,下面是如何使用它们。

使用 ParameterTool 工具类

ParameterTool 工具类有直接访问值的方法,例如:

ParameterTool parameters = // ...
parameter.getRequired("input");
parameter.get("output", "myDefaultValue");
parameter.getLong("expectedCount", -1L);
parameter.getNumberOfParameters()
// .. there are more methods available.

您可以直接在 main() 中使用, 您可以通过如下代码来设置算子的并行度:

ParameterTool parameters = ParameterTool.fromArgs(args);
int parallelism = parameters.get("mapParallelism", 2);
DataSet<Tuple2<String, Integer>> counts = text.flatMap(new Tokenizer()).setParallelism(parallelism);

因为 ParameterTool 是可序列化的,所以你可以把它作为函数的参数:

ParameterTool parameters = ParameterTool.fromArgs(args);
DataSet<Tuple2<String, Integer>> counts = text.flatMap(new Tokenizer(parameters));

然后在函数内通过上文方法获取值.

注册全局参数

通过 ExecutionConfig 注册为全局参数后,可以被JobManager Web界面中的配置值和代码内的所有函数来访问.

注册为全局参数:

ParameterTool parameters = ParameterTool.fromArgs(args);

// set up the execution environment
final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
env.getConfig().setGlobalJobParameters(parameters);

在代码中的函数内访问:

public static final class Tokenizer extends RichFlatMapFunction<String, Tuple2<String, Integer>> {

    @Override
    public void flatMap(String value, Collector<Tuple2<String, Integer>> out) {
    ParameterTool parameters = (ParameterTool)
        getRuntimeContext().getExecutionConfig().getGlobalJobParameters();
    parameters.getRequired("input");
    // .. do more ..

超大TupleX类型命令

强烈推荐使用 POJO 来代替有很多字段的 TupleX, POJO 可以代替多字段的 Tuple类型.

Example

而不是使用:

Tuple11<String, String, ..., String> var = new ...;

把大型元组类型继承为自定义类型也会方便很多

CustomType var = new ...;

public static class CustomType extends Tuple11<String, String, ..., String> {
    // constructor matching super
}

用Logback而不是Log4j

注意:本教程适用于Flink 0.10及其以上版本

Apache Flink 使用 slf4j 作为记录日志的抽象.,建议用户在使用事采用sfl4j来记录日志。

Sfl4j 是一个日志记录的抽象接口,可以在代码中使用不同的日志实现,例如 log4j 或者 Logback

Flink 默认依赖于Log4j. 本页介绍如何使用在Flink中使用Logback. 用户报告说, 他们可以通过 Graylog来建立一个集中式日志收集处理系统.

以下代码可以获取Logger实例:

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class MyClass implements MapFunction {
    private static final Logger LOG = LoggerFactory.getLogger(MyClass.class);
    // ...

当通过 IDE 或者 Java程序运行Flink程序时如何使用Logback

在任何情况下,类都是通过依赖管理器(例如maven)创建的路径来执行, Flink 会把log4j的相关依赖拉入到classpath中.

因此, 需要排除掉Flink的log4j依赖. 下面的pom文件从 Flink quickstart创建的maven项目.

pom.xml文件需要进行如下更改:

<dependencies>
    <!-- Add the two required logback dependencies -->
    <dependency>
        <groupId>ch.qos.logback</groupId>
        <artifactId>logback-core</artifactId>
        <version>1.1.3</version>
    </dependency>
    <dependency>
        <groupId>ch.qos.logback</groupId>
        <artifactId>logback-classic</artifactId>
        <version>1.1.3</version>
    </dependency>

    <!-- Add the log4j -> sfl4j (-> logback) bridge into the classpath
     Hadoop is logging to log4j! -->
    <dependency>
        <groupId>org.slf4j</groupId>
        <artifactId>log4j-over-slf4j</artifactId>
        <version>1.7.7</version>
    </dependency>

    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-java</artifactId>
        <version>1.7.1</version>
        <exclusions>
            <exclusion>
                <groupId>log4j</groupId>
                <artifactId>*</artifactId>
            </exclusion>
            <exclusion>
                <groupId>org.slf4j</groupId>
                <artifactId>slf4j-log4j12</artifactId>
            </exclusion>
        </exclusions>
    </dependency>
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-streaming-java_2.11</artifactId>
        <version>1.7.1</version>
        <exclusions>
            <exclusion>
                <groupId>log4j</groupId>
                <artifactId>*</artifactId>
            </exclusion>
            <exclusion>
                <groupId>org.slf4j</groupId>
                <artifactId>slf4j-log4j12</artifactId>
            </exclusion>
        </exclusions>
    </dependency>
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-clients_2.11</artifactId>
        <version>1.7.1</version>
        <exclusions>
            <exclusion>
                <groupId>log4j</groupId>
                <artifactId>*</artifactId>
            </exclusion>
            <exclusion>
                <groupId>org.slf4j</groupId>
                <artifactId>slf4j-log4j12</artifactId>
            </exclusion>
        </exclusions>
    </dependency>
</dependencies>

&lt;dependencies&gt; 部分进行了以下更改:

  • 从Flink中排除了所有的 log4j 依赖项: maven会忽略所有Flink对log4j的依赖传递。
  • 从Flink中排除了所有的 slf4j-log4j12 依赖项: 因为我们要使用sl4j来进行logback绑定, 所以我们必须将slf4j从log4j中的绑定删除。
  • 增加 logback-corelogback-classic的依赖项。
  • 增加 log4j-over-slf4j依赖项。 log4j-over-slf4j 是一个可以通过Log4j API来使用Slf4j接口的工具。 Flink依赖于Hadoop,然而Hadoop直接使用Log4j进行日志记录。 因此, 我们需要将所有日志记录器调用从Log4j重定向到Slf4j,后者又记录到Logback。

请注意,您需要对新添加到pom中的Flink依赖都进行手动排除。

您还需要检查其他非Flink依赖项是不是引入了log4j绑定,可以通过mvn dependency:tree来对项目依赖进行分析.

在集群上的Flink程序如何使用Logback

This tutorial is applicable when running Flink on YARN or as a standalone cluster. 本教程适用于在YARN上运行Flink或其他独立集群。

为了在Flink中使用Logback而不是Log4j, 您首先需要从 lib/ 目录移除 log4j-1.2.xx.jarsfl4j-log4j12-xxx.jar

然后, 您需要把以下Jar文件放到 lib/ 目录下:

  • logback-classic.jar
  • logback-core.jar
  • log4j-over-slf4j.jar: 此桥接器需要存在于classpath中,以便Hadoop将日志记录器调用从Log4j重定向到Slf4j。

请注意,此情况你需要在向YARN提交Flink作业时显式指定 lib/ 目录。 例如:

./bin/flink run -yt $FLINK_HOME/lib &lt;... remaining arguments ...&gt;



回到顶部