Apache Cassandra连接器
此连接器提供将数据写入Apache Cassandra数据库的接收器。
要使用此连接器,请将以下依赖项添加到项目中:
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-connector-cassandra_2.11</artifactId>
<version>1.7-SNAPSHOT</version>
</dependency>
请注意,流连接器当前不是二进制分发的一部分。在此处了解如何与群集执行链接。
安装Apache Cassandra
有多种方法可以在本地计算机上启动Cassandra实例:
- 按照Cassandra入门页面上的说明进行 算子操作。
- 从官方Docker Repository启动一个运行Cassandra的容器
Cassandra Sinks
配置
Flink的Cassandra接收器是使用静态CassandraSink.addSink(DataStream)创建的<in>输入法。这个方法返回一个CassandraSinkBuilder,它提供了进一步配置接收器的方法,最后是build()
接收器实例。</in>
可以使用以下配置方法:
- setQuery(字符串查询)
- 设置为接收器接收的每个记录执行的upsert查询。
- 该查询在内部被视为CQL语句。
- DO设置upsert查询以处理Tuple数据类型。
- 请勿设置查询以处理POJO数据类型。
- setClusterBuilder()
- 将用于配置与cassandra的连接的集群构建器设置为更复杂的设置,例如一致性级别,重试策略等。
- setHost(String host [,int port])
- setClusterBuilder()的简单版本,包含连接到Cassandra实例的主机/端口信息
- setMapperOptions(MapperOptions选项)
- 设置用于配置DataStax ObjectMapper的映射器选项。
- 仅在处理POJO数据类型时适用。
- enableWriteAheadLog([CheckpointCommitter committer])
- 一个Optional设置
- 允许对非确定性算法进行一次性处理。
- 建立()
- 完成配置并构造CassandraSink实例。
预写日志
检查点提交者在某些资源中存储有关已完成检查点的其他信息。此信息用于防止在发生故障时完整重播最后完成的检查点。您可以使用a CassandraCommitter
将它们存储在cassandra的单独表中。请注意,Flink不会清理此表。
如果查询是幂等的(意味着可以多次应用而不更改结果)并且启用了检查点,则Flink可以提供一次性保证。如果发生故障,将完全重播失败的检查点。
此外,对于非确定性程序,必须启用预写日志。对于这样的程序,重放检查点可能与先前的尝试完全不同,这可能使数据库处于不一致状态,因为可能已经写入了第一次尝试的部分。预写日志保证重放的检查点与第一次尝试相同。请注意,启用此函数会对延迟产生负面影响。
注意:预写日志函数目前是实验性的。在许多情况下,使用连接器而不启用它就足够了。请将问题报告给开发邮件列表。
检查点和容错
启用检查点后,Cassandra Sink保证至少一次向C *实例传递 算子操作请求。
例子
Cassandra接收器当前支持Tuple和POJO数据类型,Flink自动检测使用哪种类型的输入。有关那些流数据类型的一般用例,请参阅支持的数据类型。我们分别针对Pojo和Tuple数据类型展示了基于SocketWindowWordCount的两个实现。
在所有这些示例中,我们假设已创建关联的Keyspace example
和表wordcount
。
CREATE KEYSPACE IF NOT EXISTS example
WITH replication = {'class': 'SimpleStrategy', 'replication_factor': '1'};
CREATE TABLE IF NOT EXISTS example.wordcount (
word text,
count bigint,
PRIMARY KEY(word)
);
用于流式元组数据类型的Cassandra Sink示例
在将结果与Java / Scala Tuple数据类型存储到Cassandra接收器时,需要设置CQL upsert语句(通过setQuery('stmt'))将每条记录保存回数据库。将upsert查询缓存为PreparedStatement
,每个Tuple数据元都转换为语句的参数。
有关细节PreparedStatement
和BoundStatement
信息,请访问DataStax Java驱动程序手册
// get the execution environment
final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
// get input data by connecting to the socket
DataStream<String> text = env.socketTextStream(hostname, port, "\n");
// parse the data, group it, window it, and aggregate the counts
DataStream<Tuple2<String, Long>> result = text
.flatMap(new FlatMapFunction<String, Tuple2<String, Long>>() {
@Override
public void flatMap(String value, Collector<Tuple2<String, Long>> out) {
// normalize and split the line
String[] words = value.toLowerCase().split("\\s");
// emit the pairs
for (String word : words) {
//Do not accept empty word, since word is defined as primary key in C* table
if (!word.isEmpty()) {
out.collect(new Tuple2<String, Long>(word, 1L));
}
}
}
})
.keyBy(0)
.timeWindow(Time.seconds(5))
.sum(1);
CassandraSink.addSink(result)
.setQuery("INSERT INTO example.wordcount(word, count) values (?, ?);")
.setHost("127.0.0.1")
.build();
val env: StreamExecutionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment
// get input data by connecting to the socket val text: DataStream[String] = env.socketTextStream(hostname, port, '\n')
// parse the data, group it, window it, and aggregate the counts val result: DataStream[(String, Long)] = text
// split up the lines in pairs (2-tuples) containing: (word,1)
.flatMap(_.toLowerCase.split("\\s"))
.filter(_.nonEmpty)
.map((_, 1L))
// group by the tuple field "0" and sum up tuple field "1"
.keyBy(0)
.timeWindow(Time.seconds(5))
.sum(1)
CassandraSink.addSink(result)
.setQuery("INSERT INTO example.wordcount(word, count) values (?, ?);")
.setHost("127.0.0.1")
.build()
result.print().setParallelism(1)
用于流式传输POJO数据类型的Cassandra Sink示例
流式传输POJO数据类型并将相同的POJO实体存储回Cassandra的示例。此外,此POJO实现需要遵循DataStax Java驱动程序手册来注释类,因为此实体的每个字段都使用DataStax Java Driver com.datastax.driver.mapping.Mapper
类映射到指定表的关联列。
可以通过放置在Pojo类中的字段声明上的注释来定义每个表列的映射。有关映射的详细信息,请参阅有关映射类和CQL数据类型定义的 CQL文档
// get the execution environment
final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
// get input data by connecting to the socket
DataStream<String> text = env.socketTextStream(hostname, port, "\n");
// parse the data, group it, window it, and aggregate the counts
DataStream<WordCount> result = text
.flatMap(new FlatMapFunction<String, WordCount>() {
public void flatMap(String value, Collector<WordCount> out) {
// normalize and split the line
String[] words = value.toLowerCase().split("\\s");
// emit the pairs
for (String word : words) {
if (!word.isEmpty()) {
//Do not accept empty word, since word is defined as primary key in C* table
out.collect(new WordCount(word, 1L));
}
}
}
})
.keyBy("word")
.timeWindow(Time.seconds(5))
.reduce(new ReduceFunction<WordCount>() {
@Override
public WordCount reduce(WordCount a, WordCount b) {
return new WordCount(a.getWord(), a.getCount() + b.getCount());
}
});
CassandraSink.addSink(result)
.setHost("127.0.0.1")
.setMapperOptions(() -> new Mapper.Option[]{Mapper.Option.saveNullFields(true)})
.build();
@Table(keyspace = "example", name = "wordcount")
public class WordCount {
@Column(name = "word")
private String word = "";
@Column(name = "count")
private long count = 0;
public WordCount() {}
public WordCount(String word, long count) {
this.setWord(word);
this.setCount(count);
}
public String getWord() {
return word;
}
public void setWord(String word) {
this.word = word;
}
public long getCount() {
return count;
}
public void setCount(long count) {
this.count = count;
}
@Override
public String toString() {
return getWord() + " : " + getCount();
}
}