
Connect to External Systems

Flink's Table API & SQL programs can be connected to other external systems for reading and writing both batch and streaming tables. A table source provides access to data which is stored in external systems (such as a database, key-value store, message queue, or file system). A table sink emits a table to an external storage system. Depending on the type of source and sink, they support different formats such as CSV, Parquet, or ORC.

This page describes how to declare built-in table sources and/or table sinks and register them in Flink. After a source or sink has been registered, it can be accessed by Table API & SQL statements.

Attention: If you want to implement your own custom table source or sink, have a look at the user-defined sources & sinks section later in the documentation.

Dependencies

The following tables list all available connectors and formats. Their mutual compatibility is tagged in the corresponding sections for table connectors and table formats. The tables provide dependency information for both projects using a build automation tool (such as Maven or SBT) and SQL Client with SQL JAR bundles.

Connectors

| Name | Version | Maven dependency | SQL Client JAR |
| --- | --- | --- | --- |
| Filesystem | | Built-in | Built-in |
| Elasticsearch | 6 | flink-connector-elasticsearch6 | Download |
| Apache Kafka | 0.8 | flink-connector-kafka-0.8 | Not available |
| Apache Kafka | 0.9 | flink-connector-kafka-0.9 | Download |
| Apache Kafka | 0.10 | flink-connector-kafka-0.10 | Download |
| Apache Kafka | 0.11 | flink-connector-kafka-0.11 | Download |
| Apache Kafka | 0.11+ (universal) | flink-connector-kafka | Download |

Formats

| Name | Maven dependency | SQL Client JAR |
| --- | --- | --- |
| CSV | Built-in | Built-in |
| JSON | flink-json | Download |
| Apache Avro | flink-avro | Download |

Overview

Beginning from Flink 1.6, the declaration of a connection to an external system is separated from its actual implementation.

Connections can be specified either

  • programmatically, using a Descriptor under org.apache.flink.table.descriptors for the Table & SQL API
  • or declaratively, via YAML configuration files for the SQL Client

This allows not only for better unification of APIs and SQL Client but also for better extensibility in case of custom implementations, without changing the actual declaration.

Every declaration is similar to a SQL CREATE TABLE statement. The name of the table, the table's schema, a connector, and a data format for connecting to the external system can all be defined upfront.

The connector describes the external system in which the data of a table is stored. Storage systems such as Apache Kafka or a regular file system can be declared here. The connector might already provide a fixed format with fields and schema.

Some systems support different data formats. For example, a table that is stored in Kafka or in files can encode its rows with CSV, JSON, or Avro. A database connector might need the table schema here. Every connector documents whether a storage system requires the definition of a format or not. Different systems also require different types of formats (e.g., column-oriented formats vs. row-oriented formats). The documentation states which format types and connectors are compatible.

The table schema defines the schema of a table that is exposed to SQL queries. It describes how a source maps the data format to the table schema, and a sink vice versa. The schema has access to fields defined by the connector or format. It can use one or more fields for extracting or inserting time attributes. If input fields have no deterministic field order, the schema clearly defines column names, their order, and origin.

The subsequent sections cover each definition part (connector, format, and schema) in more detail. The following examples show how to pass them, first via the Table API and then as the equivalent SQL Client YAML:

Java:

tableEnvironment
  .connect(...)
  .withFormat(...)
  .withSchema(...)
  .inAppendMode()
  .registerTableSource("MyTable")

YAML:

name: MyTable
type: source
update-mode: append
connector: ...
format: ...
schema: ...

The table's type (source, sink, or both) determines how a table is registered. In case of table type both, both a table source and a table sink are registered under the same name. Logically, this means that we can both read and write to such a table, similarly to a table in a regular DBMS.

For streaming queries, an update mode declares how to communicate between a dynamic table and the storage system for continuous queries.

The following code shows a full example of how to connect to Kafka for reading Avro records, first via the Table API and then as the equivalent SQL Client YAML.

Java:

tableEnvironment
  // declare the external system to connect to
  .connect(
    new Kafka()
      .version("0.10")
      .topic("test-input")
      .startFromEarliest()
      .property("zookeeper.connect", "localhost:2181")
      .property("bootstrap.servers", "localhost:9092")
  )

  // declare a format for this system
  .withFormat(
    new Avro()
      .avroSchema(
        "{" +
        "  \"namespace\": \"org.myorganization\"," +
        "  \"type\": \"record\"," +
        "  \"name\": \"UserMessage\"," +
        "    \"fields\": [" +
        "      {\"name\": \"timestamp\", \"type\": \"string\"}," +
        "      {\"name\": \"user\", \"type\": \"long\"}," +
        "      {\"name\": \"message\", \"type\": [\"string\", \"null\"]}" +
        "    ]" +
        "}" +
      )
  )

  // declare the schema of the table
  .withSchema(
    new Schema()
      .field("rowtime", Types.SQL_TIMESTAMP)
        .rowtime(new Rowtime()
          .timestampsFromField("timestamp")
          .watermarksPeriodicBounded(60000)
        )
      .field("user", Types.LONG)
      .field("message", Types.STRING)
  )

  // specify the update-mode for streaming tables
  .inAppendMode()

  // register as source, sink, or both and under a name
  .registerTableSource("MyUserTable");

YAML:

tables:
  - name: MyUserTable      # name the new table
    type: source           # declare if the table should be "source", "sink", or "both"
    update-mode: append    # specify the update-mode for streaming tables

    # declare the external system to connect to
    connector:
      type: kafka
      version: "0.10"
      topic: test-input
      startup-mode: earliest-offset
      properties:
        - key: zookeeper.connect
          value: localhost:2181
        - key: bootstrap.servers
          value: localhost:9092

    # declare a format for this system
    format:
      type: avro
      avro-schema: >
        {
          "namespace": "org.myorganization",
          "type": "record",
          "name": "UserMessage",
            "fields": [
              {"name": "ts", "type": "string"},
              {"name": "user", "type": "long"},
              {"name": "message", "type": ["string", "null"]}
            ]
        }

    # declare the schema of the table
    schema:
      - name: rowtime
        type: TIMESTAMP
        rowtime:
          timestamps:
            type: from-field
            from: ts
          watermarks:
            type: periodic-bounded
            delay: "60000"
      - name: user
        type: BIGINT
      - name: message
        type: VARCHAR
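
Once registered, the table can be queried like any other table. A minimal sketch (the windowed aggregation and result variable below are illustrative, not part of the original example):

// count messages per user in one-minute tumbling windows over the rowtime attribute
Table result = tableEnvironment.sqlQuery(
  "SELECT `user`, COUNT(*) AS cnt " +
  "FROM MyUserTable " +
  "GROUP BY TUMBLE(rowtime, INTERVAL '1' MINUTE), `user`");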

In both ways, the desired connection properties are converted into normalized, string-based key-value pairs. So-called table factories create configured table sources, table sinks, and corresponding formats from the key-value pairs. All table factories that can be found via Java's Service Provider Interfaces (SPI) are taken into account when searching for exactly-one matching table factory.

If no factory can be found, or if multiple factories match the given properties, an exception will be thrown with additional information about the considered factories and the supported properties.

Table Schema

The table schema defines the names and types of columns, similar to the column definitions of a SQL CREATE TABLE statement. In addition, it allows specifying how columns are mapped to and from the fields of the format in which the table data is encoded. The origin of a field might be important if the name of the column should differ from the input/output format; for instance, a column user_name should reference the field $$-user-name of a JSON format. Additionally, the schema is needed to map types from an external system to Flink's representation. In case of a table sink, it ensures that only data with a valid schema is written to the external system.

The following example shows a simple schema without time attributes and a one-to-one field mapping between input/output and table columns.

Java:

.withSchema(
  new Schema()
    .field("MyField1", Types.INT)     // required: specify the fields of the table (in this order)
    .field("MyField2", Types.STRING)
    .field("MyField3", Types.BOOLEAN)
)

YAML:

schema:
  - name: MyField1    # required: specify the fields of the table (in this order)
    type: INT
  - name: MyField2
    type: VARCHAR
  - name: MyField3
    type: BOOLEAN

For each field, the following properties can be declared in addition to the column's name and type:

Java:

.withSchema(
  new Schema()
    .field("MyField1", Types.SQL_TIMESTAMP)
      .proctime()      // optional: declares this field as a processing-time attribute
    .field("MyField2", Types.SQL_TIMESTAMP)
      .rowtime(...)    // optional: declares this field as an event-time attribute
    .field("MyField3", Types.BOOLEAN)
      .from("mf3")     // optional: original field in the input that is referenced/aliased by this field
)

YAML:

schema:
  - name: MyField1
    type: TIMESTAMP
    proctime: true    # optional: boolean flag whether this field should be a processing-time attribute
  - name: MyField2
    type: TIMESTAMP
    rowtime: ...      # optional: whether this field should be an event-time attribute
  - name: MyField3
    type: BOOLEAN
    from: mf3         # optional: original field in the input that is referenced/aliased by this field

Time attributes are essential when working with unbounded streaming tables. Therefore both processing-time and event-time (also known as "rowtime") attributes can be defined as part of the schema.

For more information about time handling in Flink, and especially event-time, we recommend the general event-time section.

Rowtime Attributes

In order to control the event-time behavior of tables, Flink provides predefined timestamp extractors and watermark strategies.

The following timestamp extractors are supported:

Java:

// Converts an existing LONG or SQL_TIMESTAMP field in the input into the rowtime attribute.
.rowtime(
  new Rowtime()
    .timestampsFromField("ts_field")    // required: original field name in the input
)

// Converts the assigned timestamps from a DataStream API record into the rowtime attribute
// and thus preserves the assigned timestamps from the source.
// This requires a source that assigns timestamps (e.g., Kafka 0.10+).
.rowtime(
  new Rowtime()
    .timestampsFromSource()
)

// Sets a custom timestamp extractor to be used for the rowtime attribute.
// The extractor must extend `org.apache.flink.table.sources.tsextractors.TimestampExtractor`.
.rowtime(
  new Rowtime()
    .timestampsFromExtractor(...)
)

YAML:

# Converts an existing BIGINT or TIMESTAMP field in the input into the rowtime attribute.
rowtime:
  timestamps:
    type: from-field
    from: "ts_field"                 # required: original field name in the input

# Converts the assigned timestamps from a DataStream API record into the rowtime attribute
# and thus preserves the assigned timestamps from the source.
rowtime:
  timestamps:
    type: from-source

The following watermark strategies are supported:

Java:

// Sets a watermark strategy for ascending rowtime attributes. Emits a watermark of the maximum
// observed timestamp so far minus 1. Rows that have a timestamp equal to the max timestamp
// are not late.
.rowtime(
  new Rowtime()
    .watermarksPeriodicAscending()
)

// Sets a built-in watermark strategy for rowtime attributes which are out-of-order by a bounded time interval.
// Emits watermarks which are the maximum observed timestamp minus the specified delay.
.rowtime(
  new Rowtime()
    .watermarksPeriodicBounded(2000)    // delay in milliseconds
)

// Sets a built-in watermark strategy which indicates the watermarks should be preserved from the
// underlying DataStream API and thus preserves the assigned watermarks from the source.
.rowtime(
  new Rowtime()
    .watermarksFromSource()
)

YAML:

# Sets a watermark strategy for ascending rowtime attributes. Emits a watermark of the maximum
# observed timestamp so far minus 1. Rows that have a timestamp equal to the max timestamp
# are not late.
rowtime:
  watermarks:
    type: periodic-ascending

# Sets a built-in watermark strategy for rowtime attributes which are out-of-order by a bounded time interval.
# Emits watermarks which are the maximum observed timestamp minus the specified delay.
rowtime:
  watermarks:
    type: periodic-bounded
    delay: ...                # required: delay in milliseconds

# Sets a built-in watermark strategy which indicates the watermarks should be preserved from the
# underlying DataStream API and thus preserves the assigned watermarks from the source.
rowtime:
  watermarks:
    type: from-source

Make sure to always declare both timestamps and watermarks. Watermarks are required for triggering time-based operations.
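
Putting the two together, a rowtime attribute typically combines a timestamp extractor with a watermark strategy inside the schema declaration, as in this sketch (the field names are illustrative):

// a rowtime column extracted from "ts_field" with a 2-second bounded watermark
.withSchema(
  new Schema()
    .field("rowtime", Types.SQL_TIMESTAMP)
      .rowtime(new Rowtime()
        .timestampsFromField("ts_field")
        .watermarksPeriodicBounded(2000)
      )
)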

Type Strings

Because type information is only available in a programming language, the following type strings are supported for definitions in a YAML file:

VARCHAR
BOOLEAN
TINYINT
SMALLINT
INT
BIGINT
FLOAT
DOUBLE
DECIMAL
DATE
TIME
TIMESTAMP
MAP<fieldtype, fieldtype>        # generic map; e.g. MAP<VARCHAR, INT> that is mapped to Flink's MapTypeInfo
MULTISET<fieldtype>              # multiset; e.g. MULTISET<VARCHAR> that is mapped to Flink's MultisetTypeInfo
PRIMITIVE_ARRAY<fieldtype>       # primitive array; e.g. PRIMITIVE_ARRAY<INT> that is mapped to Flink's PrimitiveArrayTypeInfo
OBJECT_ARRAY<fieldtype>          # object array; e.g. OBJECT_ARRAY<POJO(org.mycompany.MyPojoClass)> that is mapped to
                                 #   Flink's ObjectArrayTypeInfo
ROW<fieldtype, ...>              # unnamed row; e.g. ROW<VARCHAR, INT> that is mapped to Flink's RowTypeInfo
                                 #   with indexed fields names f0, f1, ...
ROW<fieldname fieldtype, ...>    # named row; e.g., ROW<myField VARCHAR, myOtherField INT> that
                                 #   is mapped to Flink's RowTypeInfo
POJO<class>                      # e.g., POJO<org.mycompany.MyPojoClass> that is mapped to Flink's PojoTypeInfo
ANY<class>                       # e.g., ANY<org.mycompany.MyClass> that is mapped to Flink's GenericTypeInfo
ANY<class, serialized>           # used for type information that is not supported by Flink's Table & SQL API

Update Modes

For streaming queries, it is required to declare how to perform the conversion between a dynamic table and an external connector. The update mode specifies which kind of messages should be exchanged with the external system:

Append Mode: In append mode, a dynamic table and an external connector only exchange INSERT messages.

Retract Mode: In retract mode, a dynamic table and an external connector exchange ADD and RETRACT messages. An INSERT change is encoded as an ADD message, a DELETE change as a RETRACT message, and an UPDATE change as a RETRACT message for the updated (previous) row and an ADD message for the updating (new) row. In this mode, a key must not be defined, as opposed to upsert mode. However, every update consists of two messages, which is less efficient.

Upsert Mode: In upsert mode, a dynamic table and an external connector exchange UPSERT and DELETE messages. This mode requires a (possibly composite) unique key by which updates can be propagated. The external connector needs to be aware of the unique key attribute in order to apply messages correctly. INSERT and UPDATE changes are encoded as UPSERT messages, DELETE changes as DELETE messages. The main difference to a retract stream is that UPDATE changes are encoded with a single message and are therefore more efficient.

Attention: The documentation of each connector states which update modes are supported.

Java:

.connect(...)
  .inAppendMode()    // otherwise: inUpsertMode() or inRetractMode()

YAML:

tables:
  - name: ...
    update-mode: append    # otherwise: "retract" or "upsert"

See the general streaming concepts documentation for more information.

Table Connectors

Flink provides a set of connectors for connecting to external systems.

Please note that not all connectors are available in both batch and streaming yet. Furthermore, not every streaming connector supports every streaming mode. Therefore, each connector is tagged accordingly. A format tag indicates that the connector requires a certain type of format.

File System Connector

Source: Batch · Source: Streaming Append Mode · Sink: Batch · Sink: Streaming Append Mode · Format: CSV-only

The file system connector allows for reading and writing from a local or distributed filesystem. A filesystem can be defined as:

Java:

.connect(
  new FileSystem()
    .path("file:///path/to/whatever")    // required: path to a file or directory
)

YAML:

connector:
  type: filesystem
  path: "file:///path/to/whatever"    # required: path to a file or directory

The file system connector is included in Flink and does not require an additional dependency. A corresponding format needs to be specified for reading and writing rows from and to a filesystem.

Attention: Make sure to include Flink's filesystem-specific dependencies.

Attention: File system sources and sinks for streaming are only experimental. In the future, we will support actual streaming use cases, i.e., directory monitoring and bucket output.
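
For example, a CSV-backed batch table could be registered as follows (a sketch; the path and fields are illustrative):

// read a CSV file as a table with two columns
tableEnv
  .connect(
    new FileSystem()
      .path("file:///path/to/users.csv")
  )
  .withFormat(
    new Csv()
      .field("user", Types.LONG)
      .field("message", Types.STRING)
  )
  .withSchema(
    new Schema()
      .field("user", Types.LONG)
      .field("message", Types.STRING)
  )
  .registerTableSource("CsvUsers");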

Kafka Connector

Source: Streaming Append Mode · Sink: Streaming Append Mode · Format: Serialization Schema · Format: Deserialization Schema

The Kafka connector allows for reading from and writing to an Apache Kafka topic. It can be defined as follows:

Java:

.connect(
  new Kafka()
    .version("0.11")    // required: valid connector versions are
                        //   "0.8", "0.9", "0.10", "0.11", and "universal"
    .topic("...")       // required: topic name from which the table is read

    // optional: connector specific properties
    .property("zookeeper.connect", "localhost:2181")
    .property("bootstrap.servers", "localhost:9092")
    .property("group.id", "testGroup")

    // optional: select a startup mode for Kafka offsets
    .startFromEarliest()
    .startFromLatest()
    .startFromSpecificOffsets(...)

    // optional: output partitioning from Flink's partitions into Kafka's partitions
    .sinkPartitionerFixed()         // each Flink partition ends up in at-most one Kafka partition (default)
    .sinkPartitionerRoundRobin()    // a Flink partition is distributed to Kafka partitions round-robin
    .sinkPartitionerCustom(MyCustom.class)    // use a custom FlinkKafkaPartitioner subclass
)

YAML:

connector:
  type: kafka
  version: "0.11"     # required: valid connector versions are
                      #   "0.8", "0.9", "0.10", "0.11", and "universal"
  topic: ...          # required: topic name from which the table is read

  properties:         # optional: connector specific properties
    - key: zookeeper.connect
      value: localhost:2181
    - key: bootstrap.servers
      value: localhost:9092
    - key: group.id
      value: testGroup

  startup-mode: ...   # optional: valid modes are "earliest-offset", "latest-offset",
                      # "group-offsets", or "specific-offsets"
  specific-offsets:   # optional: used in case of startup mode with specific offsets
    - partition: 0
      offset: 42
    - partition: 1
      offset: 300

  sink-partitioner: ...    # optional: output partitioning from Flink's partitions into Kafka's partitions
                           # valid are "fixed" (each Flink partition ends up in at most one Kafka partition),
                           # "round-robin" (a Flink partition is distributed to Kafka partitions round-robin)
                           # "custom" (use a custom FlinkKafkaPartitioner subclass)
  sink-partitioner-class: org.mycompany.MyPartitioner  # optional: used in case of sink partitioner custom

Specify the start reading position: By default, the Kafka source starts reading from the group offsets committed in Zookeeper or the Kafka brokers. You can specify other start positions, which correspond to the configurations described in the Kafka Consumers Start Position Configuration section.
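
As a sketch, the specific-offsets YAML above corresponds roughly to the following Java calls (assuming the descriptor accepts a partition-to-offset map; offsets are illustrative):

// java.util.Map / java.util.HashMap: start partition 0 at offset 42 and partition 1 at offset 300
Map<Integer, Long> offsets = new HashMap<>();
offsets.put(0, 42L);
offsets.put(1, 300L);

new Kafka()
  .version("0.11")
  .topic("test-input")
  .startFromSpecificOffsets(offsets)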

Flink-Kafka sink partitioning: By default, a Kafka sink writes to at most as many partitions as its own parallelism (each parallel sink instance writes to exactly one partition). In order to distribute the writes to more partitions or to control the routing of rows into partitions, a custom sink partitioner can be provided. The round-robin partitioner is useful for avoiding an unbalanced partitioning. However, it will cause a lot of network connections between all the Flink instances and all the Kafka brokers.

Consistency guarantees: By default, a Kafka sink ingests data into a Kafka topic with at-least-once guarantees if the query is executed with checkpointing enabled.

Kafka 0.10+ timestamps: Since Kafka 0.10, Kafka messages carry a timestamp as metadata that specifies when the record was written into the Kafka topic. These timestamps can be used for a rowtime attribute by selecting timestamps: from-source in YAML and timestampsFromSource() in Java/Scala respectively.
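
For instance, a schema field can be bound to the Kafka message timestamp like this (a sketch based on the extractors and strategies shown above; the watermark delay is illustrative):

.withSchema(
  new Schema()
    .field("rowtime", Types.SQL_TIMESTAMP)
      .rowtime(new Rowtime()
        .timestampsFromSource()              // use the timestamps attached by the Kafka source
        .watermarksPeriodicBounded(60000)    // illustrative: 1-minute bounded out-of-orderness
      )
)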

Kafka 0.11+ versioning: Since Flink 1.7, the Kafka connector definition should be independent of a hard-coded Kafka version. Use the connector version universal as a wildcard for Flink's Kafka connector that is compatible with all Kafka versions starting from 0.11.

Make sure to add the version-specific Kafka dependency. In addition, a corresponding format needs to be specified for reading and writing rows from and to Kafka.

Elasticsearch Connector

Sink: Streaming Append Mode · Sink: Streaming Upsert Mode · Format: JSON-only

The Elasticsearch connector allows for writing into an index of the Elasticsearch search engine.

The connector can operate in upsert mode for exchanging UPSERT/DELETE messages with the external system, using a key defined by the query.

For append-only queries, the connector can also operate in append mode for exchanging only INSERT messages with the external system. If no key is defined by the query, a key is automatically generated by Elasticsearch.

The connector can be defined as follows:

Java:

.connect(
  new Elasticsearch()
    .version("6")                      // required: valid connector versions are "6"
    .host("localhost", 9200, "http")   // required: one or more Elasticsearch hosts to connect to
    .index("MyUsers")                  // required: Elasticsearch index
    .documentType("user")              // required: Elasticsearch document type

    .keyDelimiter("$")        // optional: delimiter for composite keys ("_" by default)
                              //   e.g., "$" would result in IDs "KEY1$KEY2$KEY3"
    .keyNullLiteral("n/a")    // optional: representation for null fields in keys ("null" by default)

    // optional: failure handling strategy in case a request to Elasticsearch fails (fail by default)
    .failureHandlerFail()          // optional: throws an exception if a request fails and causes a job failure
    .failureHandlerIgnore()        //   or ignores failures and drops the request
    .failureHandlerRetryRejected() //   or re-adds requests that have failed due to queue capacity saturation
    .failureHandlerCustom(...)     //   or custom failure handling with a ActionRequestFailureHandler subclass

    // optional: configure how to buffer elements before sending them in bulk to the cluster for efficiency
    .disableFlushOnCheckpoint()    // optional: disables flushing on checkpoint (see notes below!)
    .bulkFlushMaxActions(42)       // optional: maximum number of actions to buffer for each bulk request
    .bulkFlushMaxSize("42 mb")     // optional: maximum size of buffered actions in bytes per bulk request
                                   //   (only MB granularity is supported)
    .bulkFlushInterval(60000L)     // optional: bulk flush interval (in milliseconds)

    .bulkFlushBackoffConstant()    // optional: use a constant backoff type
    .bulkFlushBackoffExponential() //   or use an exponential backoff type
    .bulkFlushBackoffMaxRetries(3) // optional: maximum number of retries
    .bulkFlushBackoffDelay(30000L) // optional: delay between each backoff attempt (in milliseconds)

    // optional: connection properties to be used during REST communication to Elasticsearch
    .connectionMaxRetryTimeout(3)  // optional: maximum timeout (in milliseconds) between retries
    .connectionPathPrefix("/v1")   // optional: prefix string to be added to every REST communication
)

YAML:

connector:
  type: elasticsearch
  version: 6                # required: valid connector versions are "6"
  hosts:                    # required: one or more Elasticsearch hosts to connect to
    - hostname: "localhost"
      port: 9200
      protocol: "http"
  index: "MyUsers"          # required: Elasticsearch index
  document-type: "user"     # required: Elasticsearch document type

  key-delimiter: "$"        # optional: delimiter for composite keys ("_" by default)
                            #   e.g., "$" would result in IDs "KEY1$KEY2$KEY3"
  key-null-literal: "n/a"   # optional: representation for null fields in keys ("null" by default)

  # optional: failure handling strategy in case a request to Elasticsearch fails ("fail" by default)
  failure-handler: ...      # valid strategies are "fail" (throws an exception if a request fails and
                            #   thus causes a job failure), "ignore" (ignores failures and drops the request),
                            #   "retry-rejected" (re-adds requests that have failed due to queue capacity
                            #   saturation), or "custom" for failure handling with a
                            #   ActionRequestFailureHandler subclass

  # optional: configure how to buffer elements before sending them in bulk to the cluster for efficiency
  flush-on-checkpoint: true   # optional: disables flushing on checkpoint (see notes below!) ("true" by default)
  bulk-flush:
    max-actions: 42           # optional: maximum number of actions to buffer for each bulk request
    max-size: 42 mb           # optional: maximum size of buffered actions in bytes per bulk request
                              #   (only MB granularity is supported)
    interval: 60000           # optional: bulk flush interval (in milliseconds)
    back-off:                 # optional: backoff strategy ("disabled" by default)
      type: ...               #   valid strategies are "disabled", "constant", or "exponential"
      max-retries: 3          # optional: maximum number of retries
      delay: 30000            # optional: delay between each backoff attempt (in milliseconds)

  # optional: connection properties to be used during REST communication to Elasticsearch
  connection-max-retry-timeout: 3   # optional: maximum timeout (in milliseconds) between retries
  connection-path-prefix: "/v1"     # optional: prefix string to be added to every REST communication

Bulk flushing: For more information about the characteristics of the optional flushing parameters, see the corresponding low-level documentation.

Disabling flushing on checkpoint: When disabled, a sink will not wait for all pending action requests to be acknowledged by Elasticsearch on checkpoints. Thus, the sink does not provide any strong guarantee for at-least-once delivery of action requests.

Key extraction: Flink automatically extracts valid keys from a query. For example, the query SELECT a, b, c FROM t GROUP BY a, b defines a composite key of the fields a and b. The Elasticsearch connector generates a document ID string for every row by concatenating all key fields, in the order defined in the query, using the key delimiter. A custom representation of null literals for key fields can be defined.
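
As an illustration (the sink table name is hypothetical), the following query would produce document IDs of the form "KEY1_KEY2" with the default "_" delimiter:

Table counts = tableEnv.sqlQuery(
  "SELECT a, b, COUNT(*) AS cnt FROM t GROUP BY a, b");
counts.insertInto("EsSinkTable");  // hypothetical: a registered Elasticsearch upsert sink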

Attention: A JSON format defines how to encode documents for the external system, therefore it must be added as a dependency.

Table Formats

Flink provides a set of table formats that can be used with table connectors.

A format tag indicates the format type for matching with a connector.

CSV Format

The CSV format allows for reading and writing comma-separated rows.

Java:

.withFormat(
  new Csv()
    .field("field1", Types.STRING)    // required: ordered format fields
    .field("field2", Types.TIMESTAMP)
    .fieldDelimiter(",")              // optional: string delimiter "," by default
    .lineDelimiter("\n")              // optional: string delimiter "\n" by default
    .quoteCharacter('"')              // optional: single character for string values, empty by default
    .commentPrefix('#')               // optional: string to indicate comments, empty by default
    .ignoreFirstLine()                // optional: ignore the first line, by default it is not skipped
    .ignoreParseErrors()              // optional: skip records with parse error instead of failing by default
)

YAML:

format:
  type: csv
  fields:                    # required: ordered format fields
    - name: field1
      type: VARCHAR
    - name: field2
      type: TIMESTAMP
  field-delimiter: ","       # optional: string delimiter "," by default
  line-delimiter: "\n"       # optional: string delimiter "\n" by default
  quote-character: '"'       # optional: single character for string values, empty by default
  comment-prefix: '#'        # optional: string to indicate comments, empty by default
  ignore-first-line: false   # optional: boolean flag to ignore the first line, by default it is not skipped
  ignore-parse-errors: true  # optional: skip records with parse error instead of failing by default

The CSV format is included in Flink and does not require additional dependencies.

Attention: The CSV format for writing rows is limited at the moment. Only a custom field delimiter is supported as an optional parameter.
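
A sketch of registering a CSV sink via the file system connector, analogous to registerTableSource above (the path and fields are illustrative):

tableEnv
  .connect(
    new FileSystem()
      .path("file:///path/to/output")
  )
  .withFormat(
    new Csv()
      .field("field1", Types.STRING)
      .field("field2", Types.INT)
      .fieldDelimiter("|")    // writing currently supports only a custom field delimiter
  )
  .withSchema(
    new Schema()
      .field("field1", Types.STRING)
      .field("field2", Types.INT)
  )
  .inAppendMode()
  .registerTableSink("CsvSinkTable");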

JSON Format

Format: Serialization Schema · Format: Deserialization Schema

The JSON format allows for reading and writing JSON data that corresponds to a given format schema. The format schema can be defined either as a Flink type, as a JSON schema, or derived from the desired table schema. A Flink type enables a more SQL-like definition and mapping to the corresponding SQL data types. A JSON schema allows for more complex and nested structures.

If the format schema is equal to the table schema, the schema can also be automatically derived. This allows for defining schema information only once. The names, types, and order of fields of the format are determined by the table's schema. Time attributes are ignored if their origin is not a field. A from definition in the table schema is interpreted as field renaming in the format.

Java:

.withFormat(
  new Json()
    .failOnMissingField(true)   // optional: flag whether to fail if a field is missing or not, false by default

    // required: define the schema either by using type information which parses numbers to corresponding types
    .schema(Type.ROW(...))

    // or by using a JSON schema which parses to DECIMAL and TIMESTAMP
    .jsonSchema(
      "{" +
      "  type: 'object'," +
      "  properties: {" +
      "    lon: {" +
      "      type: 'number'" +
      "    }," +
      "    rideTime: {" +
      "      type: 'string'," +
      "      format: 'date-time'" +
      "    }" +
      "  }" +
      "}"
    )

    // or use the table's schema
    .deriveSchema()
)

YAML:

format:
  type: json
  fail-on-missing-field: true   # optional: flag whether to fail if a field is missing or not, false by default

  # required: define the schema either by using a type string which parses numbers to corresponding types
  schema: "ROW(lon  FLOAT,  rideTime  TIMESTAMP)"

  # or by using a JSON schema which parses to DECIMAL and TIMESTAMP
  json-schema: >
    {
      type: 'object',
      properties: {
        lon: {
          type: 'number'
        },
        rideTime: {
          type: 'string',
          format: 'date-time'
        }
      }
    }

  # or use the table's schema
  derive-schema: true
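
When deriving the schema, a from definition in the table schema renames fields of the format. A sketch, reusing the $$-user-name example from the Table Schema section:

.withFormat(
  new Json().deriveSchema()
)
.withSchema(
  new Schema()
    .field("user_name", Types.STRING)
      .from("$$-user-name")    // read the JSON field "$$-user-name" into column "user_name"
)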

The following table shows the mapping of JSON schema types to Flink SQL types:

| JSON schema | Flink SQL |
| --- | --- |
| object | ROW |
| boolean | BOOLEAN |
| array | ARRAY[_] |
| number | DECIMAL |
| integer | DECIMAL |
| string | VARCHAR |
| string with format: date-time | TIMESTAMP |
| string with format: date | DATE |
| string with format: time | TIME |
| string with encoding: base64 | ARRAY[TINYINT] |
| null | NULL (unsupported yet) |

Currently, Flink supports only a subset of the JSON schema specification draft-07. Union types (as well as allOf, anyOf, not) are not supported yet. oneOf and arrays of types are only supported for specifying nullability.

Simple references that link to a common definition in the document are supported, as shown in the following more complex example:

{  "definitions":  {  "address":  {  "type":  "object",  "properties":  {  "street_address":  {  "type":  "string"  },  "city":  {  "type":  "string"  },  "state":  {  "type":  "string"  }  },  "required":  [  "street_address",  "city",  "state"  ]  }  },  "type":  "object",  "properties":  {  "billing_address":  {  "$ref":  "#/definitions/address"  },  "shipping_address":  {  "$ref":  "#/definitions/address"  },  "optional_address":  {  "oneOf":  [  {  "type":  "null"  },  {  "$ref":  "#/definitions/address"  }  ]  }  }  }

Missing field handling: By default, a missing JSON field is set to null. You can enable strict JSON parsing that will cancel the source (and query) if a field is missing.

Make sure to add the JSON format as a dependency.

Avro Format

Format: Serialization Schema · Format: Deserialization Schema

The Apache Avro format allows for reading and writing Avro data that corresponds to a given format schema. The format schema can be defined either as the fully qualified class name of an Avro specific record or as an Avro schema string. If a class name is used, the class must be available in the classpath during runtime.

Java:

.withFormat(
  new Avro()

    // required: define the schema either by using an Avro specific record class
    .recordClass(User.class)

    // or by using an Avro schema
    .avroSchema(
      "{" +
      "  \"type\": \"record\"," +
      "  \"name\": \"test\"," +
      "  \"fields\" : [" +
      "    {\"name\": \"a\", \"type\": \"long\"}," +
      "    {\"name\": \"b\", \"type\": \"string\"}" +
      "  ]" +
      "}"
    )
)

YAML:

format:
  type: avro

  # required: define the schema either by using an Avro specific record class
  record-class: "org.organization.types.User"

  # or by using an Avro schema
  avro-schema: >
    {
      "type": "record",
      "name": "test",
      "fields" : [
        {"name": "a", "type": "long"},
        {"name": "b", "type": "string"}
      ]
    }

Avro types are mapped to the corresponding SQL data types. Union types are only supported for specifying nullability; otherwise they are converted to an ANY type. The following table shows the mapping:

| Avro schema | Flink SQL |
| --- | --- |
| record | ROW |
| enum | VARCHAR |
| array | ARRAY[_] |
| map | MAP[VARCHAR, _] |
| union | non-null type or ANY |
| fixed | ARRAY[TINYINT] |
| string | VARCHAR |
| bytes | ARRAY[TINYINT] |
| int | INT |
| long | BIGINT |
| float | FLOAT |
| double | DOUBLE |
| boolean | BOOLEAN |
| int with logicalType: date | DATE |
| int with logicalType: time-millis | TIME |
| int with logicalType: time-micros | INT |
| long with logicalType: timestamp-millis | TIMESTAMP |
| long with logicalType: timestamp-micros | BIGINT |
| bytes with logicalType: decimal | DECIMAL |
| fixed with logicalType: decimal | DECIMAL |
| null | NULL (unsupported yet) |

Avro uses Joda-Time for representing logical date and time types in specific record classes. The Joda-Time dependency is not part of Flink's distribution. Therefore, make sure that Joda-Time is in your classpath together with your specific record class during runtime. Avro formats specified via a schema string do not require Joda-Time to be present.

Make sure to add the Apache Avro dependency.

Further TableSources and TableSinks

The following table sources and sinks have not yet been migrated (or have not been migrated entirely) to the new unified interfaces.

These are the additional TableSources which are provided with Flink:

| Class name | Maven dependency | Batch? | Streaming? | Description |
| --- | --- | --- | --- | --- |
| OrcTableSource | flink-orc | Y | N | A TableSource for ORC files. |

These are the additional TableSinks which are provided with Flink:

| Class name | Maven dependency | Batch? | Streaming? | Description |
| --- | --- | --- | --- | --- |
| CsvTableSink | flink-table | Y | Append | A simple sink for CSV files. |
| JDBCAppendTableSink | flink-jdbc | Y | Append | Writes a Table to a JDBC table. |
| CassandraAppendTableSink | flink-connector-cassandra | N | Append | Writes a Table to a Cassandra table. |

OrcTableSource

The OrcTableSource reads ORC files. ORC is a file format for structured data and stores the data in a compressed, columnar representation. ORC is very storage efficient and supports projection and filter push-down.

An OrcTableSource is created as shown below:

Java:

// create Hadoop Configuration
Configuration config = new Configuration();

OrcTableSource orcTableSource = OrcTableSource.builder()
  // path to ORC file(s). NOTE: By default, directories are recursively scanned.
  .path("file:///path/to/data")
  // schema of ORC files
  .forOrcSchema("struct<name:string,addresses:array<struct<street:string,zip:smallint>>>")
  // Hadoop configuration
  .withConfiguration(config)
  // build OrcTableSource
  .build();

Scala:

// create Hadoop Configuration
val config = new Configuration()

val orcTableSource = OrcTableSource.builder()
  // path to ORC file(s). NOTE: By default, directories are recursively scanned.
  .path("file:///path/to/data")
  // schema of ORC files
  .forOrcSchema("struct<name:string,addresses:array<struct<street:string,zip:smallint>>>")
  // Hadoop configuration
  .withConfiguration(config)
  // build OrcTableSource
  .build()

Note: The OrcTableSource does not support ORC's Union type yet.
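
The built source can then be registered and queried. Projecting only the required columns lets the source benefit from projection push-down (a sketch; the table name is illustrative):

tableEnv.registerTableSource("OrcTable", orcTableSource);
Table result = tableEnv.sqlQuery("SELECT name FROM OrcTable");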

CsvTableSink

The CsvTableSink emits a Table to one or more CSV files.

The sink only supports append-only streaming tables. It cannot be used to emit a Table that is continuously updated. See the documentation on Table to Stream conversions for details. When emitting a streaming table, rows are written at least once (if checkpointing is enabled), and the CsvTableSink does not split output files into bucket files but continuously writes to the same files.

Java:

CsvTableSink sink = new CsvTableSink(
    path,                  // output path
    "|",                   // optional: delimit files by '|'
    1,                     // optional: write to a single file
    WriteMode.OVERWRITE);  // optional: override existing files

tableEnv.registerTableSink(
  "csvOutputTable",
  // specify table schema
  new String[]{"f0", "f1"},
  new TypeInformation[]{Types.STRING, Types.INT},
  sink);

Table table = ...
table.insertInto("csvOutputTable");

Scala:

val sink: CsvTableSink = new CsvTableSink(
    path,                             // output path
    fieldDelim = "|",                 // optional: delimit files by '|'
    numFiles = 1,                     // optional: write to a single file
    writeMode = WriteMode.OVERWRITE)  // optional: override existing files
tableEnv.registerTableSink(
  "csvOutputTable",
  // specify table schema
  Array[String]("f0", "f1"),
  Array[TypeInformation[_]](Types.STRING, Types.INT),
  sink)

val table: Table = ???
table.insertInto("csvOutputTable")

JDBCAppendTableSink

The JDBCAppendTableSink emits a Table to a JDBC connection. The sink only supports append-only streaming tables. It cannot be used to emit a Table that is continuously updated. See the documentation on Table to Stream conversions for details.

The JDBCAppendTableSink inserts each Table row at least once into the database table (if checkpointing is enabled). However, you can specify the insertion query using REPLACE or INSERT OVERWRITE to perform upsert writes to the database.

To use the JDBC sink, you have to add the JDBC connector dependency (flink-jdbc) to your project. Then you can create the sink using JDBCAppendSinkBuilder:

Java:

JDBCAppendTableSink sink = JDBCAppendTableSink.builder()
  .setDrivername("org.apache.derby.jdbc.EmbeddedDriver")
  .setDBUrl("jdbc:derby:memory:ebookshop")
  .setQuery("INSERT INTO books (id) VALUES (?)")
  .setParameterTypes(INT_TYPE_INFO)
  .build();

tableEnv.registerTableSink(
  "jdbcOutputTable",
  // specify table schema
  new String[]{"id"},
  new TypeInformation[]{Types.INT},
  sink);

Table table = ...
table.insertInto("jdbcOutputTable");

Scala:

val sink: JDBCAppendTableSink = JDBCAppendTableSink.builder()
  .setDrivername("org.apache.derby.jdbc.EmbeddedDriver")
  .setDBUrl("jdbc:derby:memory:ebookshop")
  .setQuery("INSERT INTO books (id) VALUES (?)")
  .setParameterTypes(INT_TYPE_INFO)
  .build()

tableEnv.registerTableSink(
  "jdbcOutputTable",
  // specify table schema
  Array[String]("id"),
  Array[TypeInformation[_]](Types.INT),
  sink)

val table: Table = ???
table.insertInto("jdbcOutputTable")

Similar to using JDBCOutputFormat, you have to explicitly specify the name of the JDBC driver, the JDBC URL, the query to be executed, and the field types of the JDBC table.
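
For example, an upsert-style write can be expressed with MySQL's REPLACE syntax (a sketch; the driver, URL, and table are illustrative assumptions):

JDBCAppendTableSink upsertSink = JDBCAppendTableSink.builder()
  .setDrivername("com.mysql.jdbc.Driver")        // assumes a MySQL driver on the classpath
  .setDBUrl("jdbc:mysql://localhost:3306/shop")  // illustrative URL
  .setQuery("REPLACE INTO books (id) VALUES (?)")
  .setParameterTypes(INT_TYPE_INFO)
  .build();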

CassandraAppendTableSink

The CassandraAppendTableSink emits a Table to a Cassandra table. The sink only supports append-only streaming tables. It cannot be used to emit a Table that is continuously updated. See the documentation on Table to Stream conversions for details.

The CassandraAppendTableSink inserts all rows at least once into the Cassandra table if checkpointing is enabled. However, you can specify the query as an upsert query.

To use the CassandraAppendTableSink, you have to add the Cassandra connector dependency (flink-connector-cassandra) to your project. The example below shows how to use the CassandraAppendTableSink:

Java:

ClusterBuilder builder = ... // configure Cassandra cluster connection

CassandraAppendTableSink sink = new CassandraAppendTableSink(
  builder,
  // the query must match the schema of the table
  "INSERT INTO flink.myTable (id, name, value) VALUES (?, ?, ?)");

tableEnv.registerTableSink(
  "cassandraOutputTable",
  // specify table schema
  new String[]{"id", "name", "value"},
  new TypeInformation[]{Types.INT, Types.STRING, Types.DOUBLE},
  sink);

Table table = ...
table.insertInto("cassandraOutputTable");

Scala:

val builder: ClusterBuilder = ... // configure Cassandra cluster connection
val sink: CassandraAppendTableSink = new CassandraAppendTableSink(
  builder,
  // the query must match the schema of the table
  "INSERT INTO flink.myTable (id, name, value) VALUES (?, ?, ?)")

tableEnv.registerTableSink(
  "cassandraOutputTable",
  // specify table schema
  Array[String]("id", "name", "value"),
  Array[TypeInformation[_]](Types.INT, Types.STRING, Types.DOUBLE),
  sink)

val table: Table = ???
table.insertInto("cassandraOutputTable")

