Skip to content

风暴兼容性Beta

译者:flink.sojb.cn

Flink流与Apache Storm接口兼容,因此允许重用为Storm实现的代码。

您可以:

  • Topology在Flink 执行整个Storm 。
  • 在Flink流处理节目中使用Storm Spout/ Bolt作为源/算子。

本文档介绍了如何在Flink中使用现有的Storm代码。

项目配置

译者:flink.sojb.cn

支持Storm包含在flink-stormMaven模块中。代码驻留在org.apache.flink.storm包中。

pom.xml如果要在Flink中执行Storm代码,请将以下依赖项添加到您的。

<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-storm_2.11</artifactId>
    <version>1.7-SNAPSHOT</version>
</dependency>

请注意:不要添加storm-core为依赖项。它已包含在内flink-storm

请注意flink-storm不是提供的二进制Flink发行版的一部分。因此,您需要flink-storm在提交给Flink的JobManager的程序jar(也称为uber-jar或fat-jar)中包含类(及其依赖项)。见_字计数风暴_中flink-storm-examples/pom.xml的一个例子,如何正确地打包罐。

如果你想避免大尤伯杯罐子,你可以手动复制storm-core-0.9.4.jarjson-simple-1.1.jarflink-storm-1.7-SNAPSHOT.jar进入Flink的lib/每个群集节点的文件夹(_之前_在启动群集)。对于这种情况,仅将您自己的Spout和Bolt类(及其内部依赖项)包含在程序jar中就足够了。

执行Storm拓扑

译者:flink.sojb.cn

Flink提供与Storm兼容的API(org.apache.flink.storm.api),它可以替代以下类:

  • StormSubmitter 取而代之 FlinkSubmitter
  • NimbusClientClient替换为FlinkClient
  • LocalCluster 取而代之 FlinkLocalCluster

为了向Flink提交Storm拓扑,只需使用_组装_拓扑的Storm _客户端代码_中的Flink替换来替换使用过的Storm类。实际的运行时代码,即Spouts和Bolts,可以不加_修改_地使用。如果拓扑在远程集群执行时,参数nimbus.hostnimbus.thrift.port被用作jobmanger.rpc.addressjobmanger.rpc.port分别。如果未指定参数,则取值flink-conf.yaml

TopologyBuilder builder = new TopologyBuilder(); // the Storm topology builder

// actual topology assembling code and used Spouts/Bolts can be used as-is
builder.setSpout("source", new FileSpout(inputFilePath));
builder.setBolt("tokenizer", new BoltTokenizer()).shuffleGrouping("source");
builder.setBolt("counter", new BoltCounter()).fieldsGrouping("tokenizer", new Fields("word"));
builder.setBolt("sink", new BoltFileSink(outputFilePath)).shuffleGrouping("counter");

Config conf = new Config();
if(runLocal) { // submit to test cluster
    // replaces: LocalCluster cluster = new LocalCluster();
    FlinkLocalCluster cluster = new FlinkLocalCluster();
    cluster.submitTopology("WordCount", conf, FlinkTopology.createTopology(builder));
} else { // submit to remote cluster
    // optional
    // conf.put(Config.NIMBUS_HOST, "remoteHost");
    // conf.put(Config.NIMBUS_THRIFT_PORT, 6123);
    // replaces: StormSubmitter.submitTopology(topologyId, conf, builder.createTopology());
    FlinkSubmitter.submitTopology("WordCount", conf, FlinkTopology.createTopology(builder));
}

在Flink流程序中嵌入Storm算子

译者:flink.sojb.cn

作为替代方案,Spouts和Bolts可以嵌入到常规流处理节目中。Storm兼容层为每个提供了一个打包类,即SpoutWrapperBoltWrapperorg.apache.flink.storm.wrappers)。

每默认情况下,打包转换风暴输出元组Flink的元组类型(即,Tuple0Tuple25根据风暴元组的字段数)。对于单场输出元组,也可以转换为字段的数据类型(例如,String代替Tuple1&lt;String&gt;)。

由于Flink无法推断Storm 算子的输出字段类型,因此需要手动指定输出类型。为了获得正确的TypeInformation对象,TypeExtractor可以使用Flink 。

嵌入Spouts

要将Spout用作Flink源,请使用StreamExecutionEnvironment.addSource(SourceFunction, TypeInformation)。Spout对象被传递给它的构造函数SpoutWrapper&lt;OUT&gt;,作为第一个参数addSource(...)。泛型类型声明OUT指定源输出流的类型。

StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

// stream has `raw` type (single field output streams only)
DataStream<String> rawInput = env.addSource(
    new SpoutWrapper<String>(new FileSpout(localFilePath), new String[] { Utils.DEFAULT_STREAM_ID }), // emit default output stream as raw type
    TypeExtractor.getForClass(String.class)); // output type

// process data stream
[...]

如果Spout发出有限数量的元组,SpoutWrapper可以通过numberOfInvocations在其构造函数中设置参数来配置为自动终止。这允许Flink程序在处理完所有数据后自动关闭。默认情况下,程序将一直运行,直到手动取消

嵌入螺栓

要使用Bolt作为Flink 算子,请使用DataStream.transform(String, TypeInformation, OneInputStreamOperator)。Bolt对象被传递给它的构造函数BoltWrapper&lt;IN,OUT&gt;,作为最后一个参数transform(...)。泛型类型声明IN并分别OUT指定 算子的输入和输出流的类型。

StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
DataStream<String> text = env.readTextFile(localFilePath);

DataStream<Tuple2<String, Integer>> counts = text.transform(
    "tokenizer", // operator name
    TypeExtractor.getForObject(new Tuple2<String, Integer>("", 0)), // output type
    new BoltWrapper<String, Tuple2<String, Integer>>(new BoltTokenizer())); // Bolt operator

// do further processing
[...]

嵌入式螺栓的命名属性访问

螺栓可以通过名称访问输入元组字段(另外通过索引访问)。要在嵌入式螺栓中使用此函数,您需要具有a

  1. POJO类型输入流或
  2. 元组类型输入流并指定输入模式(即名称到索引映射)

对于POJO输入类型,Flink通过反射访问字段。对于这种情况,Flink期望相应的公共成员变量或公共getter方法。例如,如果Bolt通过名称sentence(例如String s = input.getStringByField("sentence");)访问字段,则输入POJO类必须具有成员变量public String sentence;或方法public String getSentence() { ... };(注意驼峰式命名)。

对于Tuple输入类型,需要使用Storm的Fields类指定输入模式。对于这种情况,构造函数BoltWrapper需要另外一个参数:new BoltWrapper&lt;Tuple1&lt;String&gt;, ...&gt;(..., new Fields("sentence"))。输入类型是Tuple1&lt;String&gt;Fields("sentence")指定input.getStringByField("sentence")相当于input.getString(0)

有关示例,请参阅BoltTokenizerWordCountPojoBoltTokenizerWordCountWithNames

配置喷口和螺栓

在Storm中,Spouts和Bolts可以配置一个全局分布的Map对象,该对象被赋予submitTopology(...)方法LocalClusterStormSubmitter。这Map是由拓扑旁边的用户提供的,并作为参数转发给呼叫Spout.open(...)Bolt.prepare(...)。如果在Flink中使用FlinkTopologyBuilder等执行整个拓扑,则不需要特别注意 - 它与常规Storm一样。

对于嵌入式使用,必须使用Flink的配置机制。可以在StreamExecutionEnvironmentvia中设置全局配置.getConfig().setGlobalJobParameters(...)。Flink的常规Configuration课程可用于配置Spouts和Bolts。但是,Configuration不像Storm那样支持任意Keys数据类型(只String允许Keys)。因此,Flink还提供StormConfig了可以像raw一样使用的类,Map以提供与Storm的完全兼容性。

StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

StormConfig config = new StormConfig();
// set config values
[...]

// set global Storm configuration
env.getConfig().setGlobalJobParameters(config);

// assemble program with embedded Spouts and/or Bolts
[...]

多输出流

Flink还可以处理Spout和Bolts的多个输出流的声明。如果在Flink中使用FlinkTopologyBuilder等执行整个拓扑,则不需要特别注意 - 它与常规Storm一样。

对于嵌入式使用,输出流将是数据类型SplitStreamType&lt;T&gt;,必须使用DataStream.split(...)和拆分SplitStream.select(...)。Flink提供预定义输出选择StormStreamSelector&lt;T&gt;.split(...)已经。此外,SplitStreamTuple&lt;T&gt;可以使用除去打包类型SplitStreamMapper&lt;T&gt;

[...]

// get DataStream from Spout or Bolt which declares two output streams s1 and s2 with output type SomeType
DataStream<SplitStreamType<SomeType>> multiStream = ...

SplitStream<SplitStreamType<SomeType>> splitStream = multiStream.split(new StormStreamSelector<SomeType>());

// remove SplitStreamType using SplitStreamMapper to get data stream of type SomeType
DataStream<SomeType> s1 = splitStream.select("s1").map(new SplitStreamMapper<SomeType>()).returns(SomeType.class);
DataStream<SomeType> s2 = splitStream.select("s2").map(new SplitStreamMapper<SomeType>()).returns(SomeType.class);

// do further processing on s1 and s2
[...]

有关完整示例,请参阅SpoutSplitExample.java

Flink Extensions

译者:flink.sojb.cn

有限的喷口

在Flink中,流处理源可以是有限的,即发出有限数量的记录并在发出最后一条记录后停止。但是,Spouts通常会发出无限的流。两种方法之间的桥接是FiniteSpout除了IRichSpout包含reachedEnd()方法之外的接口,其中用户可以指定停止条件。用户可以通过实现此接口而不是(或另外)来创建有限Spout IRichSpout,并reachedEnd()另外实现该方法。与SpoutWrapper配置为发出有限数量的元组的FiniteSpout接口相比,接口允许实现更复杂的终止标准。

尽管有限的Spout不需要将Spouts嵌入到Flink流程序中或向Flink提交整个Storm拓扑,但有些情况下它们可能会派上用场:

  • 实现原生Spout的行为与有限Flink源相同,只需要很少的修改
  • 用户想要只处理一段时间; 之后,Spout可以自动停止
  • 将文件读入流中
  • 用于测试目的

有限Spout的示例,仅发出10秒的记录:

public class TimedFiniteSpout extends BaseRichSpout implements FiniteSpout {
    [...] // implement open(), nextTuple(), ...

    private long starttime = System.currentTimeMillis();

    public boolean reachedEnd() {
        return System.currentTimeMillis() - starttime > 10000l;
    }
}

Storm兼容性示例

译者:flink.sojb.cn

您可以在Maven模块中找到更多示例flink-storm-examples。有关不同版本的WordCount,请参阅README.md。要运行示例,您需要组装正确的jar文件。 flink-storm-examples-1.7-SNAPSHOT.jarno / not作业执行有效的jar文件(这仅仅是一个标准的Maven神器)。

有嵌入式喷口和螺栓,即例如罐WordCount-SpoutSource.jarWordCount-BoltTokenizer.jar分别。比较pom.xml看两个罐子是如何构建的。此外,整个Storm拓扑(WordCount-StormTopology.jar)有一个例子。

您可以通过运行这些示例中的每一个bin/flink run &lt;jarname&gt;.jar。每个jar的清单文件中都包含正确的入口点类。



回到顶部