Skip to content

为Flink程序注册自定义序列化程序

译者:flink.sojb.cn

如果您在Flink程序中使用自定义类型无法通过Flink类型序列化程序进行序列化,则Flink将回退到使用通用Kryo序列化程序。您可以注册自己的序列化程序或序列化系统,如Google Protobuf或Apache Thrift with Kryo。为此,只需在ExecutionConfigFlink程序中注册类型类和序列化程序即可。

final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();

// register the class of the serializer as serializer for a type
env.getConfig().registerTypeWithKryoSerializer(MyCustomType.class, MyCustomSerializer.class);

// register an instance as serializer for a type
MySerializer mySerializer = new MySerializer();
env.getConfig().registerTypeWithKryoSerializer(MyCustomType.class, mySerializer);

请注意,您的自定义序列化程序必须扩展Kryo的Serializer类。对于Google Protobuf或Apache Thrift,已经为您完成了这项工作:

final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();

// register the Google Protobuf serializer with Kryo
env.getConfig().registerTypeWithKryoSerializer(MyCustomType.class, ProtobufSerializer.class);

// register the serializer included with Apache Thrift as the standard serializer
// TBaseSerializer states it should be initialized as a default Kryo serializer
env.getConfig().addDefaultKryoSerializer(MyCustomType.class, TBaseSerializer.class);

要使上述示例起作用,您需要在Maven项目文件(pom.xml)中包含必要的依赖项。在依赖项部分中,为Apache Thrift添加以下内容:

<dependency>
    <groupId>com.twitter</groupId>
    <artifactId>chill-thrift</artifactId>
    <version>0.5.2</version>
</dependency>
<!-- libthrift is required by chill-thrift -->
<dependency>
    <groupId>org.apache.thrift</groupId>
    <artifactId>libthrift</artifactId>
    <version>0.6.1</version>
    <exclusions>
        <exclusion>
            <groupId>javax.servlet</groupId>
            <artifactId>servlet-api</artifactId>
        </exclusion>
        <exclusion>
            <groupId>org.apache.httpcomponents</groupId>
            <artifactId>httpclient</artifactId>
        </exclusion>
    </exclusions>
</dependency>

对于Google Protobuf,您需要以下Maven依赖项:

<dependency>
    <groupId>com.twitter</groupId>
    <artifactId>chill-protobuf</artifactId>
    <version>0.5.2</version>
</dependency>
<!-- We need protobuf for chill-protobuf -->
<dependency>
    <groupId>com.google.protobuf</groupId>
    <artifactId>protobuf-java</artifactId>
    <version>2.5.0</version>
</dependency>

请根据需要调整两个库的版本。

使用Kryo JavaSerializer的问题

如果您JavaSerializer为自定义类型注册Kryo ,即使您的自定义类型类包含在提交的用户代码jar中,您也可能遇到ClassNotFoundException。这是由于Kryo的已知问题JavaSerializer,可能会错误地使用错误的类加载器。

在这种情况下,您应该使用它org.apache.flink.api.java.typeutils.runtime.kryo.JavaSerializer 来解决问题。这是JavaSerializer在Flink中重新实现的,确保使用用户代码类加载器。

有关详细信息,请参阅FLINK-6025



回到顶部