为Flink程序注册自定义序列化程序
如果您在Flink程序中使用自定义类型无法通过Flink类型序列化程序进行序列化,则Flink将回退到使用通用Kryo序列化程序。您可以注册自己的序列化程序或序列化系统,如Google Protobuf或Apache Thrift with Kryo。为此,只需在ExecutionConfig
Flink程序中注册类型类和序列化程序即可。
final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
// register the class of the serializer as serializer for a type
env.getConfig().registerTypeWithKryoSerializer(MyCustomType.class, MyCustomSerializer.class);
// register an instance as serializer for a type
MySerializer mySerializer = new MySerializer();
env.getConfig().registerTypeWithKryoSerializer(MyCustomType.class, mySerializer);
请注意,您的自定义序列化程序必须扩展Kryo的Serializer类。对于Google Protobuf或Apache Thrift,已经为您完成了这项工作:
final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
// register the Google Protobuf serializer with Kryo
env.getConfig().registerTypeWithKryoSerializer(MyCustomType.class, ProtobufSerializer.class);
// register the serializer included with Apache Thrift as the standard serializer
// TBaseSerializer states it should be initialized as a default Kryo serializer
env.getConfig().addDefaultKryoSerializer(MyCustomType.class, TBaseSerializer.class);
要使上述示例起作用,您需要在Maven项目文件(pom.xml)中包含必要的依赖项。在依赖项部分中,为Apache Thrift添加以下内容:
<dependency>
<groupId>com.twitter</groupId>
<artifactId>chill-thrift</artifactId>
<version>0.5.2</version>
</dependency>
<!-- libthrift is required by chill-thrift -->
<dependency>
<groupId>org.apache.thrift</groupId>
<artifactId>libthrift</artifactId>
<version>0.6.1</version>
<exclusions>
<exclusion>
<groupId>javax.servlet</groupId>
<artifactId>servlet-api</artifactId>
</exclusion>
<exclusion>
<groupId>org.apache.httpcomponents</groupId>
<artifactId>httpclient</artifactId>
</exclusion>
</exclusions>
</dependency>
对于Google Protobuf,您需要以下Maven依赖项:
<dependency>
<groupId>com.twitter</groupId>
<artifactId>chill-protobuf</artifactId>
<version>0.5.2</version>
</dependency>
<!-- We need protobuf for chill-protobuf -->
<dependency>
<groupId>com.google.protobuf</groupId>
<artifactId>protobuf-java</artifactId>
<version>2.5.0</version>
</dependency>
请根据需要调整两个库的版本。
使用Kryo JavaSerializer的问题
如果您JavaSerializer
为自定义类型注册Kryo ,即使您的自定义类型类包含在提交的用户代码jar中,您也可能遇到ClassNotFoundException。这是由于Kryo的已知问题JavaSerializer
,可能会错误地使用错误的类加载器。
在这种情况下,您应该使用它org.apache.flink.api.java.typeutils.runtime.kryo.JavaSerializer
来解决问题。这是JavaSerializer
在Flink中重新实现的,确保使用用户代码类加载器。
有关详细信息,请参阅FLINK-6025。