Chapter 3: The DataStream API
DataStream Program Structure
// 1. Create the execution environment
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
// 2. Create a data source
DataStream<String> source = env.socketTextStream("localhost", 9999);
// 3. Transform the data
DataStream<Tuple2<String, Integer>> counts = source
    .flatMap(new Tokenizer())
    .keyBy(tuple -> tuple.f0)
    .sum(1);
// 4. Emit the results
counts.print();
// 5. Execute the job
env.execute("WordCount");
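The `Tokenizer` used above is not defined in this chapter; conceptually it splits each line into `(word, 1)` pairs, and `keyBy(...).sum(1)` then accumulates a running count per word. A plain-Java sketch of what the pipeline computes on a finite input (hypothetical helper class, not Flink API):

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

class WordCountSketch {
    // What Tokenizer does: split one line into (word, 1) pairs
    static List<Map.Entry<String, Integer>> tokenize(String line) {
        List<Map.Entry<String, Integer>> out = new ArrayList<>();
        for (String word : line.toLowerCase().split("\\s+")) {
            if (!word.isEmpty()) {
                out.add(Map.entry(word, 1));
            }
        }
        return out;
    }

    // What keyBy(f0).sum(1) does: accumulate the counts per key
    static Map<String, Integer> count(List<String> lines) {
        Map<String, Integer> counts = new LinkedHashMap<>();
        for (String line : lines) {
            for (Map.Entry<String, Integer> pair : tokenize(line)) {
                counts.merge(pair.getKey(), pair.getValue(), Integer::sum);
            }
        }
        return counts;
    }
}
```

The real job differs in one important way: it runs on an unbounded stream, so `sum(1)` emits an updated count for every incoming record rather than a final result.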
Data Sources
Built-in Sources
// Socket source
DataStream<String> socket = env.socketTextStream("localhost", 9999);
// File source
DataStream<String> file = env.readTextFile("/path/to/file");
// Collection source
DataStream<String> collection = env.fromElements("a", "b", "c");
// Parallel sequence source (env.fromSequence in newer Flink versions)
DataStream<Long> sequence = env.generateSequence(1, 100);
Kafka Source
Properties props = new Properties();
props.setProperty("bootstrap.servers", "localhost:9092");
props.setProperty("group.id", "flink-consumer");
DataStream<String> kafka = env
    .addSource(new FlinkKafkaConsumer<>(
        "topic",
        new SimpleStringSchema(),
        props
    ));
Custom Source
public class CustomSource implements SourceFunction<Event> {
    private volatile boolean running = true;

    @Override
    public void run(SourceContext<Event> ctx) throws Exception {
        while (running) {
            Event event = generateEvent();
            ctx.collect(event);
            Thread.sleep(100);
        }
    }

    @Override
    public void cancel() {
        running = false;
    }
}
DataStream<Event> source = env.addSource(new CustomSource());
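The `running` flag pattern matters because `cancel()` is invoked from a different thread than `run()`; declaring it `volatile` makes the write visible across threads so the emit loop actually stops. A plain-Java sketch of the same lifecycle (hypothetical `EventGenerator`, no Flink dependency):

```java
import java.util.ArrayList;
import java.util.List;

class EventGenerator {
    private volatile boolean running = true; // written by cancel(), read by run()
    private final List<String> collected = new ArrayList<>();

    // Mirrors SourceFunction.run(): emit until cancelled (capped here for the demo)
    public void run(int maxEvents) {
        int i = 0;
        while (running && i < maxEvents) {
            collected.add("event-" + i++); // stands in for ctx.collect(event)
        }
    }

    // Mirrors SourceFunction.cancel(): flips the flag, the loop exits on its next check
    public void cancel() {
        running = false;
    }

    public List<String> collected() {
        return collected;
    }
}
```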
Transformations
Basic Transformations
// Map
DataStream<Integer> mapped = source.map(s -> s.length());
// FlatMap (a lambda's generic output type is erased, so it must be declared explicitly)
DataStream<String> flatMapped = source
    .flatMap((String s, Collector<String> out) -> {
        for (String word : s.split(" ")) {
            out.collect(word);
        }
    })
    .returns(Types.STRING);
// Filter
DataStream<String> filtered = source.filter(s -> s.length() > 3);
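For intuition, the three basic transformations behave like the corresponding `java.util.stream` operations, applied record by record instead of to a finite collection. A runnable sketch (hypothetical helper class, not Flink API):

```java
import java.util.Arrays;
import java.util.List;
import java.util.stream.Collectors;

class BasicTransforms {
    // Map: one input element -> exactly one output element
    static List<Integer> lengths(List<String> in) {
        return in.stream().map(String::length).collect(Collectors.toList());
    }

    // FlatMap: one input element -> zero or more output elements
    static List<String> words(List<String> in) {
        return in.stream()
                 .flatMap(s -> Arrays.stream(s.split(" ")))
                 .collect(Collectors.toList());
    }

    // Filter: keep only elements matching the predicate
    static List<String> longerThan(List<String> in, int n) {
        return in.stream().filter(s -> s.length() > n).collect(Collectors.toList());
    }
}
```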
KeyBy Grouping
// Group by key selector
KeyedStream<Tuple2<String, Integer>, String> keyed = source
    .keyBy(tuple -> tuple.f0);
// Group by field position (deprecated; prefer key selectors)
KeyedStream<Tuple2<String, Integer>, Tuple> keyedByPosition = source.keyBy(0);
Aggregations
// Sum
DataStream<Tuple2<String, Integer>> sum = keyed.sum(1);
// Min/Max (these update only the aggregated field; use minBy/maxBy to keep the whole record)
DataStream<Tuple2<String, Integer>> min = keyed.min(1);
DataStream<Tuple2<String, Integer>> max = keyed.max(1);
// Reduce: rolling aggregation over each key
DataStream<Tuple2<String, Integer>> reduced = keyed.reduce(
    (a, b) -> new Tuple2<>(a.f0, a.f1 + b.f1)
);
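These aggregations are rolling: for each key, every incoming record is folded into the previous result and the updated value is emitted immediately. A plain-Java sketch of `keyBy(f0).reduce(...)` over a finite input (hypothetical helper, no Flink dependency):

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.function.BinaryOperator;

class RollingReduce {
    // Emits one updated aggregate per input record, keyed by the string field
    static List<Map.Entry<String, Integer>> reduceByKey(
            List<Map.Entry<String, Integer>> input,
            BinaryOperator<Integer> reducer) {
        Map<String, Integer> state = new HashMap<>(); // stands in for per-key keyed state
        List<Map.Entry<String, Integer>> emitted = new ArrayList<>();
        for (Map.Entry<String, Integer> record : input) {
            state.merge(record.getKey(), record.getValue(), reducer);
            emitted.add(Map.entry(record.getKey(), state.get(record.getKey())));
        }
        return emitted;
    }
}
```

Feeding ("a",1), ("b",2), ("a",3) with `Integer::sum` emits ("a",1), ("b",2), ("a",4): one update per record, which is the hallmark of streaming aggregation.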
Multi-Stream Transformations
// Union: merge streams of the same type
DataStream<String> union = stream1.union(stream2, stream3);
// Connect: pair two streams that may have different types
ConnectedStreams<String, Integer> connected = stream1.connect(stream2);
DataStream<String> result = connected
    .map(new CoMapFunction<String, Integer, String>() {
        @Override
        public String map1(String value) {
            return "String: " + value;
        }

        @Override
        public String map2(Integer value) {
            return "Integer: " + value;
        }
    });
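A `CoMapFunction` applies `map1` to elements of the first stream and `map2` to elements of the second, producing a single common output type. The same dispatch in plain Java (hypothetical helper, no Flink dependency):

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Function;

class CoMapSketch {
    // Applies map1 to the first input and map2 to the second,
    // collecting results into one output list of a common type
    static <A, B, R> List<R> coMap(List<A> first, List<B> second,
                                   Function<A, R> map1, Function<B, R> map2) {
        List<R> out = new ArrayList<>();
        for (A a : first) out.add(map1.apply(a));
        for (B b : second) out.add(map2.apply(b));
        return out;
    }
}
```

Note that in a real job the two inputs interleave nondeterministically; this sketch processes them in order only for illustration.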
Data Sinks
Standard Output
// Print to stdout
stream.print();
// Print to stderr
stream.printToErr();
File Output
// Simple text file sink (deprecated in newer Flink versions; suitable for debugging)
stream.writeAsText("/path/to/output");
Kafka Output
Properties props = new Properties();
props.setProperty("bootstrap.servers", "localhost:9092");
stream.addSink(new FlinkKafkaProducer<>(
    "topic",
    new SimpleStringSchema(),
    props
));
Custom Sink
public class CustomSink implements SinkFunction<Event> {
    @Override
    public void invoke(Event value, Context context) {
        // Write to the external system
        writeToDatabase(value);
    }
}
stream.addSink(new CustomSink());
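`invoke` is called once per record, so a sink is essentially a per-element callback at the end of the pipeline. A minimal plain-Java sketch of that contract (hypothetical interfaces, no Flink dependency):

```java
import java.util.ArrayList;
import java.util.List;

class SinkSketch {
    // Stand-in for SinkFunction: one callback per record
    interface Sink<T> {
        void invoke(T value);
    }

    // A sink that buffers records in memory, e.g. for tests
    static class CollectingSink<T> implements Sink<T> {
        final List<T> records = new ArrayList<>();

        @Override
        public void invoke(T value) {
            records.add(value);
        }
    }

    // Stand-in for stream.addSink(...): drive every element through the sink
    static <T> void drive(List<T> stream, Sink<T> sink) {
        for (T value : stream) sink.invoke(value);
    }
}
```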
Summary
Key points of the DataStream API:
- Program structure: environment, source, transformations, sink, execution
- Data sources: built-in, Kafka, custom
- Transformations: Map, FlatMap, Filter, KeyBy, aggregations
- Data sinks: standard output, file, Kafka, custom
In the next chapter we will cover windows and time.