Skip to content

用户定义的源和接收器

译者:flink.sojb.cn

A TableSource提供对存储在外部系统(数据库,键值存储,消息队列)或文件中的数据的访问。TableSourceTableEnvironment中注册后,可以通过 Table APISQL查询访问它。

A TableSink 将表发送到外部存储系统,例如数据库,键值存储,消息队列或文件系统(在不同的编码中,例如,CSV,Parquet或ORC)。

A TableFactory允许将与外部系统的连接声明与实际实现分开。表工厂从规范化的基于字符串的属性创建表源和接收器的已配置实例。可以使用SQL客户端Descriptor或通过YAML配置文件以编程方式生成属性。

有关如何注册TableSource以及如何通过TableSink发出表的详细信息,请查看常见概念和API页面。有关如何使用工厂的示例请参阅内置源,接收器和格式页面。

定义TableSource

A TableSource是一个通用接口,它使 Table API和SQL查询可以访问存储在外部系统中的数据。它提供了表的模式以及使用表的模式映射到行的记录。根据TableSource是在流式查询还是批量查询中使用,记录将生成为DataSetDataStream

如果TableSource在流式查询中使用a,则必须实现该StreamTableSource接口,如果在批处理查询中使用它,则必须实现该BatchTableSource接口。A TableSource还可以实现两个接口,并用于流式和批量查询。

StreamTableSourceBatchTableSource扩展TableSource定义以下方法的基接口:

TableSource<T> {

  public TableSchema getTableSchema();

  public TypeInformation<T> getReturnType();

  public String explainSource();
}
TableSource[T] {

  def getTableSchema: TableSchema

  def getReturnType: TypeInformation[T]

  def explainSource: String

}
  • getTableSchema():返回表的架构,即表的字段的名称和类型。字段类型使用Flink定义TypeInformation(请参阅 Table API类型SQL类型)。

  • getReturnType():返回DataStreamStreamTableSource)或DataSetBatchTableSource)的物理类型以及由此生成的记录TableSource

  • explainSource():返回描述TableSource。的String 。此方法是可选的,仅用于显示目的。

所述TableSource界面分离从物理类型的返回的逻辑表模式DataStreamDataSet。因此,表schema(getTableSchema())的所有字段都必须映射到具有相应类型的物理返回类型(getReturnType())的字段。默认情况下,此映射基于字段名称完成。例如,TableSource定义具有两个字段的表模式的a [name: String, size: Integer]要求TypeInformation具有至少两个被调用的字段namesize类型String和的字段Integer。这可能是一个PojoTypeInfoRowTypeInfo有两个名为领域name,并size与匹配的类型。

但是,某些类型(如Tuple或CaseClass类型)确实支持自定义字段名称。如果a TableSource返回a DataStreamDataSet具有固定字段名称的类型,则它可以实现DefinedFieldMapping接口以将字段名称从表模式映射到物理返回类型的字段名称。

定义BatchTableSource

BatchTableSource接口扩展了TableSource接口,并定义一个额外的方法:

BatchTableSource<T> extends TableSource<T> {

  public DataSet<T> getDataSet(ExecutionEnvironment execEnv);
}
BatchTableSource[T] extends TableSource[T] {

  def getDataSet(execEnv: ExecutionEnvironment): DataSet[T]
}
  • getDataSet(execEnv):返回DataSet带有表数据的a 。类型DataSet必须与TableSource.getReturnType()方法定义的返回类型相同。在DataSet通过使用常规创建罐数据源的数据集的API。通常,a BatchTableSource通过打包InputFormat批处理连接器来实​​现

定义StreamTableSource

StreamTableSource接口扩展了TableSource接口,并定义一个额外的方法:

StreamTableSource<T> extends TableSource<T> {

  public DataStream<T> getDataStream(StreamExecutionEnvironment execEnv);
}
StreamTableSource[T] extends TableSource[T] {

  def getDataStream(execEnv: StreamExecutionEnvironment): DataStream[T]
}
  • getDataStream(execEnv):返回DataStream带有表数据的a 。类型DataStream必须与TableSource.getReturnType()方法定义的返回类型相同。在DataStream通过使用常规罐创建数据源数据流的API。通常,a StreamTableSource通过打包SourceFunction流连接器来实​​现

使用时间属性定义TableSource

Table APISQL查询的基于时间的 算子操作(例如窗口化聚合或连接)需要明确指定的时间属性

A TableSource将时间属性定义Types.SQL_TIMESTAMP为其表模式中的类型字段。与模式中的所有常规字段相比,时间属性不能与表源的返回类型中的物理字段匹配。相反,a TableSource通过实现某个接口来定义时间属性。

定义处理时间属性

处理时间属性通常用于流式查询。处理时间属性返回访问它的算子的当前挂钟时间。A TableSource通过实现DefinedProctimeAttribute接口来定义处理时间属性。界面如下:

DefinedProctimeAttribute {

  public String getProctimeAttribute();
}
DefinedProctimeAttribute {

  def getProctimeAttribute: String
}
  • getProctimeAttribute():返回处理时间属性的名称。必须Types.SQL_TIMESTAMP在表模式中定义指定的属性类型,并且可以在基于时间的 算子操作中使用该属性。一个DefinedProctimeAttribute表源可以通过返回没有定义的处理时间属性null

注意两者StreamTableSourceBatchTableSource可以实现DefinedProctimeAttribute并定义的处理时间属性。在BatchTableSource处理时间字段的情况下,在表扫描期间使用当前时间戳初始化字段。

定义行时属性

Rowtime属性是类型的属性,TIMESTAMP并在流和批处理查询中以统一的方式处理。

SQL_TIMESTAMP可以通过指定将类型的表模式字段声明为rowtime属性

  • 该字段的名称,
  • a TimestampExtractor计算属性的实际值(通常来自一个或多个其他字段),和
  • a WatermarkStrategy指定如何为rowtime属性生成水印。

A TableSource通过实现DefinedRowtimeAttributes接口来定义行时属性。界面如下:

DefinedRowtimeAttribute {

  public List<RowtimeAttributeDescriptor> getRowtimeAttributeDescriptors();
}
DefinedRowtimeAttributes {

  def getRowtimeAttributeDescriptors: util.List[RowtimeAttributeDescriptor]
}
  • getRowtimeAttributeDescriptors():返回列表RowtimeAttributeDescriptor。A RowtimeAttributeDescriptor描述了具有以下属性的rowtime属性:
    • attributeName:表架构中rowtime属性的名称。必须使用类型定义该字段Types.SQL_TIMESTAMP
    • timestampExtractor:时间戳提取器从具有返回类型的记录中提取时间戳。例如,它可以将Long字段转换为时间戳或解析字符串编码的时间戳。Flink附带了一组TimestampExtractor针对常见用例的内置实现。还可以提供自定义实现。
    • watermarkStrategy:水印策略定义如何为rowtime属性生成水印。Flink附带了一组WatermarkStrategy针对常见用例的内置实现。还可以提供自定义实现。

注意虽然该getRowtimeAttributeDescriptors()方法返回描述符列表,但目前仅支持单个rowtime属性。我们计划在将来删除此限制,并支持具有多个rowtime属性的表。

注意两者,StreamTableSource并且BatchTableSource,可以实现DefinedRowtimeAttributes并定义rowtime属性。在任何一种情况下,都使用提取行时字段TimestampExtractor。因此,TableSource实现StreamTableSourceBatchTableSource定义行时属性的实现为流和批处理查询提供完全相同的数据。

提供时间戳提取器

Flink提供TimestampExtractor了常见用例的实现。

TimestampExtractor目前提供以下实现:

  • ExistingField(fieldName):提取一个rowtime属性的从现有的值LONGSQL_TIMESTAMP或时间戳记格式化STRING字段。这种字符串的一个例子是'2018-05-28 12:34:56.000'。
  • StreamRecordTimestamp():从时间戳中提取rowtime属性的值DataStream StreamRecord。请注意,这TimestampExtractor不适用于批处理表源。

TimestampExtractor可以通过实现相应的接口来定义自定义。

提供水印策略

Flink提供WatermarkStrategy了常见用例的实现。

WatermarkStrategy目前提供以下实现:

  • AscendingTimestamps:用于提升时间戳的水印策略。带有无序时间戳的记录将被视为迟到。
  • BoundedOutOfOrderTimestamps(delay):时间戳的水印策略,指定的延迟最多是无序的。
  • PreserveWatermarks():一种策略,指示应从底层保存水印DataStream

WatermarkStrategy可以通过实现相应的接口来定义自定义。

使用Projection Push-Down定义TableSource

A TableSource通过实现ProjectableTableSource接口支持Projection下推。该接口定义了一个方法:

ProjectableTableSource<T> {

  public TableSource<T> projectFields(int[] fields);
}
ProjectableTableSource[T] {

  def projectFields(fields: Array[Int]): TableSource[T]
}
  • projectFields(fields):返回一个_副本_的的TableSource与调整身体返回类型。该fields参数提供必须由提供的字段的索引TableSource。索引与TypeInformation物理返回类型有关,而_与逻辑表模式_无关。复制TableSource必须调整其返回类型和返回的DataStreamDataSet。在TableSchema复制的TableSource不能改变,即它必须跟原来一样TableSource。如果TableSource实现DefinedFieldMapping接口,则必须将字段映射调整为新的返回类型。

ProjectableTableSource增加支持项目平场。如果TableSource使用嵌套模式定义表,则可以实现NestedFieldsProjectableTableSource将Projection扩展到嵌套字段。的NestedFieldsProjectableTableSource定义如下:

NestedFieldsProjectableTableSource<T> {

  public TableSource<T> projectNestedFields(int[] fields, String[][] nestedFields);
}
NestedFieldsProjectableTableSource[T] {

  def projectNestedFields(fields: Array[Int], nestedFields: Array[Array[String]]): TableSource[T]
}
  • projectNestedField(fields, nestedFields):返回一个_副本_的的TableSource与调整身体返回类型。可以删除或重新排序物理返回类型的字段,但不得更改其类型。该方法的合同与该方法基本相同ProjectableTableSource.projectFields()。此外,该nestedFields参数包含fields列表中每个字段索引,该列表指向查询访问的所有嵌套字段的路径。所有其他嵌套字段不需要在由TableSource。生成的记录中读取,解析和设置。重要信息不得更改Projection字段的类型,但可以将未使用的字段设置为空或默认值。

使用过滤器下推定义TableSource

FilterableTableSource界面增加了对过滤器下推的支持TableSource。一个TableSource扩展这个接口能够过滤记录,从而使返回DataStream或者DataSet返回较少的记录。

界面如下:

FilterableTableSource<T> {

  public TableSource<T> applyPredicate(List<Expression> predicates);

  public boolean isFilterPushedDown();
}
FilterableTableSource[T] {

  def applyPredicate(predicates: java.util.List[Expression]): TableSource[T]

  def isFilterPushedDown: Boolean
}
  • applyPredicate(predicates):返回一个_副本_的TableSource添加了谓词。该predicates参数是一个可变的联合谓词列表,它被“提供”给TableSource。在TableSource受理用户从列表中删除它来评估一个谓语。列表中剩余的谓词将由后续过滤器 算子进行评估。
  • isFilterPushedDown():如果applyPredicate()之前调用该方法,则返回true 。因此,isFilterPushedDown()必须对TableSourceapplyPredicate()调用返回的所有实例返回true 。

定义TableSink

A TableSink指定如何向Table外部系统或位置发射。该接口是通用的,因此它可以支持不同的存储位置和格式。批处理表和流表有不同的表接收器。

一般界面如下所示:

TableSink<T> {

  public TypeInformation<T> getOutputType();

  public String[] getFieldNames();

  public TypeInformation[] getFieldTypes();

  public TableSink<T> configure(String[] fieldNames, TypeInformation[] fieldTypes);
}
TableSink[T] {

  def getOutputType: TypeInformation<T>

  def getFieldNames: Array[String]

  def getFieldTypes: Array[TypeInformation]

  def configure(fieldNames: Array[String], fieldTypes: Array[TypeInformation]): TableSink[T]
}

TableSink#configure调用该方法以传递表的模式(字段名称和类型)以发送到TableSink。该方法必须返回TableSink的新实例,该实例配置为发出提供的表模式。

BatchTableSink

定义外部TableSink以发出批处理表。

界面如下:

BatchTableSink<T> extends TableSink<T> {

  public void emitDataSet(DataSet<T> dataSet);
}
BatchTableSink[T] extends TableSink[T] {

  def emitDataSet(dataSet: DataSet[T]): Unit
}

AppendStreamTableSink

定义外部TableSink以发出仅具有插入更改的流表。

界面如下:

AppendStreamTableSink<T> extends TableSink<T> {

  public void emitDataStream(DataStream<T> dataStream);
}
AppendStreamTableSink[T] extends TableSink[T] {

  def emitDataStream(dataStream: DataStream<T>): Unit
}

如果还通过更新或删除更改来修改表,TableException则将抛出a。

RetractStreamTableSink

定义外部TableSink以发出包含插入,更新和删除更改的流表。

界面如下:

RetractStreamTableSink<T> extends TableSink<Tuple2<Boolean, T>> {

  public TypeInformation<T> getRecordType();

  public void emitDataStream(DataStream<Tuple2<Boolean, T>> dataStream);
}
RetractStreamTableSink[T] extends TableSink[Tuple2[Boolean, T]] {

  def getRecordType: TypeInformation[T]

  def emitDataStream(dataStream: DataStream[Tuple2[Boolean, T]]): Unit
}

该表将被转换为累积和撤销消息流,这些消息被编码为Java Tuple2。第一个字段是一个布尔标志,用于指示消息类型(true表示插入,false表示删除)。第二个字段保存所请求类型的记录T

UpsertStreamTableSink

定义外部TableSink以发出包含插入,更新和删除更改的流表。

界面如下:

UpsertStreamTableSink<T> extends TableSink<Tuple2<Boolean, T>> {

  public void setKeyFields(String[] keys);

  public void setIsAppendOnly(boolean isAppendOnly);

  public TypeInformation<T> getRecordType();

  public void emitDataStream(DataStream<Tuple2<Boolean, T>> dataStream);
}
UpsertStreamTableSink[T] extends TableSink[Tuple2[Boolean, T]] {

  def setKeyFields(keys: Array[String]): Unit

  def setIsAppendOnly(isAppendOnly: Boolean): Unit

  def getRecordType: TypeInformation[T]

  def emitDataStream(dataStream: DataStream[Tuple2[Boolean, T]]): Unit
}

该表必须具有唯一的键字段(原子或复合)或仅附加。如果表没有唯一键并且不是仅附加,TableException则将抛出a。表的唯一键由该UpsertStreamTableSink#setKeyFields()方法配置。

该表将被转换为upsert和delete消息流,这些消息被编码为Java Tuple2。第一个字段是一个布尔标志,用于指示消息类型。第二个字段保存所请求类型的记录T

具有true布尔字段的消息是已配置Keys的upsert消息。带有错误标志的消息是已配置Keys的删除消息。如果表是仅附加的,则所有消息都将具有true标志,并且必须解释为插入。

定义TableFactory

A TableFactory允许从基于字符串的属性创建不同的表相关实例。调用所有可用工厂以匹配给定的属性集和相应的工厂类。

工厂利用Java服务提供者接口(SPI)进行发现。这意味着每个依赖项和JAR文件都应该org.apache.flink.table.factories.TableFactoryMETA_INF/services资源目录中包含一个文件,该文件列出了它提供的所有可用表工厂。

每个表工厂都需要实现以下接口:

package org.apache.flink.table.factories;

interface TableFactory {

  Map<String, String> requiredContext();

  List<String> supportedProperties();
}
package org.apache.flink.table.factories

trait TableFactory {

  def requiredContext(): util.Map[String, String]

  def supportedProperties(): util.List[String]
}
  • requiredContext():指定已为此工厂实现的上下文。如果满足指定的属性和值集,框架保证仅匹配此工厂。典型属性可能是connector.typeformat.typeupdate-mode。诸如connector.property-version和的属性键format.property-version保存用于将来的向后兼容性情况。
  • supportedProperties:此工厂可以处理的属性键列表。此方法将用于验证。如果传递了该工厂无法处理的属性,则会抛出异常。该列表不得包含上下文指定的键。

为了创建特定实例,工厂类可以实现以下提供的一个或多个接口org.apache.flink.table.factories

  • BatchTableSourceFactory:创建批处理表源。
  • BatchTableSinkFactory:创建批处理表接收器。
  • StreamTableSoureFactory:创建流表源。
  • StreamTableSinkFactory:创建流表接收器。
  • DeserializationSchemaFactory:创建反序列化架构格式。
  • SerializationSchemaFactory:创建序列化架构格式。

工厂的发现发生在多个阶段:

  • 发现所有可用的工厂。
  • 按工厂类别过滤(例如StreamTableSourceFactory)。
  • 通过匹配上下文过滤。
  • 按支持的属性过滤。
  • 验证一个工厂是否匹配,否则抛出一个AmbiguousTableFactoryExceptionNoMatchingTableFactoryException

以下示例显示如何为参数化提供带有附加connector.debug属性标志的自定义流式源。

import org.apache.flink.table.sources.StreamTableSource;
import org.apache.flink.types.Row;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

class MySystemTableSourceFactory extends StreamTableSourceFactory<Row> {

  @Override
  public Map<String, String> requiredContext() {
    Map<String, String> context = new HashMap<>();
    context.put("update-mode", "append");
    context.put("connector.type", "my-system");
    return context;
  }

  @Override
  public List<String> supportedProperties() {
    List<String> list = new ArrayList<>();
    list.add("connector.debug");
    return list;
  }

  @Override
  public StreamTableSource<Row> createStreamTableSource(Map<String, String> properties) {
    boolean isDebug = Boolean.valueOf(properties.get("connector.debug"));

    # additional validation of the passed properties can also happen here

    return new MySystemAppendTableSource(isDebug);
  }
}
import java.util
import org.apache.flink.table.sources.StreamTableSource
import org.apache.flink.types.Row

class MySystemTableSourceFactory extends StreamTableSourceFactory[Row] {

  override def requiredContext(): util.Map[String, String] = {
    val context = new util.HashMap[String, String]()
    context.put("update-mode", "append")
    context.put("connector.type", "my-system")
    context
  }

  override def supportedProperties(): util.List[String] = {
    val properties = new util.ArrayList[String]()
    properties.add("connector.debug")
    properties
  }

  override def createStreamTableSource(properties: util.Map[String, String]): StreamTableSource[Row] = {
    val isDebug = java.lang.Boolean.valueOf(properties.get("connector.debug"))

    # additional validation of the passed properties can also happen here

    new MySystemAppendTableSource(isDebug)
  }
}

在SQL客户端中使用TableFactory

在SQL客户端环境文件中,先前显示的工厂可以声明为:

tables:
 - name: MySystemTable
   type: source
   update-mode: append
   connector:
     type: my-system
     debug: true

YAML文件被转换为扁平字符串属性,并使用描述与外部系统的连接的属性调用表工厂:

update-mode=append
connector.type=my-system
connector.debug=true

注意属性,例如tables.#.name或是tables.#.typeSQL客户端细节,不会传递给任何工厂。该type属性决定,取决于运行环境,无论是BatchTableSourceFactory/ StreamTableSourceFactory(对source),一个BatchTableSinkFactory/ StreamTableSinkFactory(对于sink),或两者(对both)需要发现。

在Table&SQL API中使用TableFactory

对于具有解释性Scaladoc / Javadoc的类型安全的编程方法,Table&SQL API提供org.apache.flink.table.descriptors了转换为基于字符串的属性的描述符。请参阅源,接收器和格式的内置描述符作为参考。

MySystem我们示例中的连接器可以扩展ConnectorDescriptor,如下所示:

import org.apache.flink.table.descriptors.ConnectorDescriptor;
import org.apache.flink.table.descriptors.DescriptorProperties;

/**
  * Connector to MySystem with debug mode.
  */
public class MySystemConnector extends ConnectorDescriptor {

  public final boolean isDebug;

  public MySystemConnector(boolean isDebug) {
    super("my-system", 1, false);
    this.isDebug = isDebug;
  }

  @Override
  public void addConnectorProperties(DescriptorProperties properties) {
    properties.putString("connector.debug", Boolean.toString(isDebug));
  }
}
import org.apache.flink.table.descriptors.ConnectorDescriptor
import org.apache.flink.table.descriptors.DescriptorProperties

/**
  * Connector to MySystem with debug mode.
  */
class MySystemConnector(isDebug: Boolean) extends ConnectorDescriptor("my-system", 1, formatNeeded = false) {

  override protected def addConnectorProperties(properties: DescriptorProperties): Unit = {
    properties.putString("connector.debug", isDebug.toString)
  }
}

然后可以在API中使用描述符,如下所示:

StreamTableEnvironment tableEnv = // ...

tableEnv
  .connect(new MySystemConnector(true))
  .inAppendMode()
  .registerTableSource("MySystemTable");
val tableEnv: StreamTableEnvironment = // ... 
tableEnv
  .connect(new MySystemConnector(isDebug = true))
  .inAppendMode()
  .registerTableSource("MySystemTable")


回到顶部