Chapter 3: The DataStream API

DataStream Program Structure

// 1. Create the execution environment
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

// 2. Create a data source
DataStream<String> source = env.socketTextStream("localhost", 9999);

// 3. Transform the data
DataStream<Tuple2<String, Integer>> counts = source
    .flatMap(new Tokenizer())
    .keyBy(tuple -> tuple.f0)
    .sum(1);

// 4. Emit the results
counts.print();

// 5. Execute the job
env.execute("WordCount");
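
The Tokenizer used above is not defined in the snippet; a minimal implementation, assuming whitespace-delimited input, might look like this:

import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.util.Collector;

public class Tokenizer implements FlatMapFunction<String, Tuple2<String, Integer>> {
    @Override
    public void flatMap(String line, Collector<Tuple2<String, Integer>> out) {
        // Split each line into words and emit one (word, 1) pair per word
        for (String word : line.toLowerCase().split("\\s+")) {
            if (!word.isEmpty()) {
                out.collect(new Tuple2<>(word, 1));
            }
        }
    }
}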

Data Sources

Built-in Sources

// Socket source
DataStream<String> socket = env.socketTextStream("localhost", 9999);

// File source
DataStream<String> file = env.readTextFile("/path/to/file");

// Collection source
DataStream<String> collection = env.fromElements("a", "b", "c");

// Parallel sequence source (newer Flink versions deprecate this in favor of env.fromSequence)
DataStream<Long> sequence = env.generateSequence(1, 100);

Kafka Source

Properties props = new Properties();
props.setProperty("bootstrap.servers", "localhost:9092");
props.setProperty("group.id", "flink-consumer");

DataStream<String> kafka = env
    .addSource(new FlinkKafkaConsumer<>(
        "topic",
        new SimpleStringSchema(),
        props
    ));
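
FlinkKafkaConsumer belongs to the legacy connector API and is deprecated in recent Flink releases. On Flink 1.14+, the unified KafkaSource is the recommended replacement; a sketch, assuming the flink-connector-kafka dependency is available:

import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.connector.kafka.source.KafkaSource;
import org.apache.flink.connector.kafka.source.enumerator.initializer.OffsetsInitializer;

KafkaSource<String> kafkaSource = KafkaSource.<String>builder()
    .setBootstrapServers("localhost:9092")
    .setTopics("topic")
    .setGroupId("flink-consumer")
    .setStartingOffsets(OffsetsInitializer.earliest())
    .setValueOnlyDeserializer(new SimpleStringSchema())
    .build();

DataStream<String> kafka = env.fromSource(
    kafkaSource, WatermarkStrategy.noWatermarks(), "Kafka Source");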

Custom Sources

public class CustomSource implements SourceFunction<Event> {
    // volatile so a cancel() call from another thread is visible in run()
    private volatile boolean running = true;

    @Override
    public void run(SourceContext<Event> ctx) throws Exception {
        while (running) {
            Event event = generateEvent();
            ctx.collect(event);
            Thread.sleep(100);
        }
    }

    @Override
    public void cancel() {
        running = false;
    }
}

DataStream<Event> source = env.addSource(new CustomSource());
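
Event and generateEvent() above are placeholders. For efficient serialization, Event should follow Flink's POJO rules (public no-arg constructor plus public fields or getters/setters); a minimal sketch with assumed fields:

public class Event {
    public String userId;   // assumed field: who triggered the event
    public String action;   // assumed field: what happened
    public long timestamp;  // assumed field: event time in epoch millis

    // Flink's POJO serializer requires a public no-arg constructor
    public Event() {}

    public Event(String userId, String action, long timestamp) {
        this.userId = userId;
        this.action = action;
        this.timestamp = timestamp;
    }
}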

Transformations

Basic Transformations

// Map: transform each element into exactly one output element
DataStream<Integer> mapped = source.map(s -> s.length());

// FlatMap: transform each element into zero or more output elements.
// With a lambda, the generic output type is erased at compile time,
// so it must be declared explicitly via returns().
DataStream<String> flatMapped = source
    .flatMap((String s, Collector<String> out) -> {
        for (String word : s.split(" ")) {
            out.collect(word);
        }
    })
    .returns(Types.STRING);

// Filter: keep only elements for which the predicate is true
DataStream<String> filtered = source.filter(s -> s.length() > 3);

Grouping with KeyBy

// Key by a field, using a key selector
KeyedStream<Tuple2<String, Integer>, String> keyed = source
    .keyBy(tuple -> tuple.f0);

// Key by tuple position (deprecated; prefer a key selector).
// Note that the key type here is the generic Tuple, not String.
KeyedStream<Tuple2<String, Integer>, Tuple> keyedByPosition = source.keyBy(0);

Aggregations

// Sum
DataStream<Tuple2<String, Integer>> sum = keyed.sum(1);

// Min/Max
DataStream<Tuple2<String, Integer>> min = keyed.min(1);
DataStream<Tuple2<String, Integer>> max = keyed.max(1);

// Reduce
DataStream<Tuple2<String, Integer>> reduced = keyed.reduce(
    (a, b) -> new Tuple2<>(a.f0, a.f1 + b.f1)
);
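
Note that keyed aggregations are rolling: an updated result is emitted for every incoming element, not just once at the end. A small self-contained example (data made up for illustration):

StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

env.fromElements(Tuple2.of("a", 1), Tuple2.of("b", 2), Tuple2.of("a", 3))
    .keyBy(t -> t.f0)
    .sum(1)
    .print();

env.execute("RollingSum");

// Key "a" produces (a,1) and later (a,4), possibly prefixed with a subtask id:
// each element updates the running sum for its key.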

Multi-Stream Transformations

// Union: merge streams of the same type into one stream
DataStream<String> union = stream1.union(stream2, stream3);

// Connect: pair two streams that may carry different types;
// each input keeps its type and is handled by its own map method
ConnectedStreams<String, Integer> connected = stream1.connect(stream2);

DataStream<String> result = connected
    .map(new CoMapFunction<String, Integer, String>() {
        @Override
        public String map1(String value) {
            return "String: " + value;
        }

        @Override
        public String map2(Integer value) {
            return "Integer: " + value;
        }
    });

Data Sinks

Standard Output

// Print to stdout
stream.print();

// Print to stderr
stream.printToErr();

File Output

// Write as plain text (deprecated in newer Flink versions)
stream.writeAsText("/path/to/output");

// Write as CSV (only applicable to tuple streams; also deprecated)
stream.writeAsCsv("/path/to/output");
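
These write* methods give no fault-tolerance guarantees. On recent Flink versions, FileSink is the recommended alternative; a sketch, assuming the flink-connector-files dependency is available:

import org.apache.flink.api.common.serialization.SimpleStringEncoder;
import org.apache.flink.connector.file.sink.FileSink;
import org.apache.flink.core.fs.Path;

FileSink<String> sink = FileSink
    .forRowFormat(new Path("/path/to/output"), new SimpleStringEncoder<String>("UTF-8"))
    .build();

stream.sinkTo(sink);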

Kafka Sink

Properties props = new Properties();
props.setProperty("bootstrap.servers", "localhost:9092");

stream.addSink(new FlinkKafkaProducer<>(
    "topic",
    new SimpleStringSchema(),
    props
));

Custom Sinks

public class CustomSink implements SinkFunction<Event> {
    @Override
    public void invoke(Event value, Context context) {
        // Write the record to an external system
        writeToDatabase(value);
    }
}

stream.addSink(new CustomSink());
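
For sinks that talk to external systems, extending RichSinkFunction is usually preferable: open() and close() let you create the connection once per parallel instance instead of once per record. A sketch, where DatabaseClient is a hypothetical client class, not part of Flink:

import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.functions.sink.RichSinkFunction;

public class DatabaseSink extends RichSinkFunction<Event> {
    private transient DatabaseClient client;  // hypothetical external client

    @Override
    public void open(Configuration parameters) {
        // Called once per parallel instance before any records arrive
        client = DatabaseClient.connect("jdbc:example://localhost/db");
    }

    @Override
    public void invoke(Event value, Context context) {
        client.write(value);  // hypothetical write call
    }

    @Override
    public void close() {
        if (client != null) {
            client.close();
        }
    }
}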

Summary

Key points of the DataStream API:

  • Program structure: environment, source, transformations, sink, execution
  • Sources: built-in, Kafka, custom
  • Transformations: Map, FlatMap, Filter, KeyBy, aggregations
  • Sinks: standard output, files, Kafka, custom

In the next chapter we will look at windows and time.