跳转至

第五章:状态管理

状态类型

Keyed State

/**
 * Keyed process function that, for each incoming event, increments a counter
 * kept in keyed state and emits the updated count.
 */
public class CountFunction extends KeyedProcessFunction<String, Event, Long> {
    /** Handle to the "count" value state; {@code value()} may be null (handled below). */
    private ValueState<Long> countState;

    /** Registers the "count" descriptor and obtains the state handle from the runtime context. */
    @Override
    public void open(Configuration parameters) {
        countState = getRuntimeContext().getState(new ValueStateDescriptor<>("count", Long.class));
    }

    /**
     * Increments the stored count, writes it back, and emits it.
     *
     * @param value the incoming event (its content is not inspected)
     * @param ctx   the processing context (unused)
     * @param out   collector receiving the updated count
     */
    @Override
    public void processElement(Event value, Context ctx, Collector<Long> out) throws Exception {
        Long previous = countState.value();
        // A null state value is treated as a count of zero.
        long next = (previous == null ? 0L : previous) + 1;
        countState.update(next);
        out.collect(next);
    }
}

Operator State

/**
 * Sink that buffers events in memory and flushes them once the buffer reaches
 * {@link #FLUSH_THRESHOLD} elements. The pending buffer is stored in operator
 * list state on every snapshot and reloaded when the state is restored.
 */
public class BufferSink implements SinkFunction<Event>, CheckpointedFunction {
    /** Number of buffered events that triggers a flush. */
    private static final int FLUSH_THRESHOLD = 1000;

    /** Operator list state holding the snapshot of {@link #buffer}. */
    private ListState<Event> bufferState;

    /** Events received but not yet flushed. */
    private final List<Event> buffer = new ArrayList<>();

    /**
     * Obtains the "buffer" operator list state and, if the context reports a
     * restore, copies the stored events back into the in-memory buffer.
     */
    @Override
    public void initializeState(FunctionInitializationContext context) throws Exception {
        ListStateDescriptor<Event> descriptor = new ListStateDescriptor<>(
            "buffer",
            Event.class
        );
        bufferState = context.getOperatorStateStore().getListState(descriptor);

        if (context.isRestored()) {
            for (Event event : bufferState.get()) {
                buffer.add(event);
            }
        }
    }

    /** Replaces the contents of the list state with the current in-memory buffer. */
    @Override
    public void snapshotState(FunctionSnapshotContext context) throws Exception {
        bufferState.update(buffer);
    }

    /** Buffers the event and flushes once the threshold is reached. */
    @Override
    public void invoke(Event value, Context context) {
        buffer.add(value);
        if (buffer.size() >= FLUSH_THRESHOLD) {
            flush();
        }
    }

    /**
     * Hands the buffered events to the external system and empties the buffer.
     * Clearing is required: otherwise the buffer keeps growing and every
     * subsequent {@link #invoke} would trigger another flush.
     */
    private void flush() {
        // TODO: write the contents of `buffer` to the target system before clearing.
        buffer.clear();
    }
}

Keyed State 支持的状态类型

ValueState

// Single-value state: declare the descriptor, then obtain the handle
ValueStateDescriptor<String> valueDescriptor = new ValueStateDescriptor<>("state", String.class);
ValueState<String> valueState = getRuntimeContext().getState(valueDescriptor);

// Operations
valueState.update("value");  // update
String current = valueState.value();  // read
valueState.clear();  // clear

ListState

// List state: declare the descriptor, then obtain the handle
ListStateDescriptor<String> listDescriptor = new ListStateDescriptor<>("state", String.class);
ListState<String> listState = getRuntimeContext().getListState(listDescriptor);

// Operations
listState.add("value");  // append an element
Iterable<String> elements = listState.get();  // read all elements
listState.update(Arrays.asList("a", "b"));  // update with the given list
listState.clear();  // clear

MapState

// Map state: declare the descriptor with key and value types, then obtain the handle
MapStateDescriptor<String, Integer> mapDescriptor =
    new MapStateDescriptor<>("state", String.class, Integer.class);
MapState<String, Integer> mapState = getRuntimeContext().getMapState(mapDescriptor);

// Operations
mapState.put("key", 1);  // add an entry
Integer stored = mapState.get("key");  // look up an entry
mapState.remove("key");  // delete an entry
mapState.clear();  // clear

ReducingState

// Reducing state: added values are combined with the supplied ReduceFunction
ReducingStateDescriptor<Long> sumDescriptor =
    new ReducingStateDescriptor<>("state", Long::sum, Long.class);
ReducingState<Long> sumState = getRuntimeContext().getReducingState(sumDescriptor);

// Operations
sumState.add(1L);  // add and aggregate
Long total = sumState.get();  // read the aggregated result

AggregatingState

// Aggregating state: counts events using a Long accumulator
AggregateFunction<Event, Long, Long> eventCounter = new AggregateFunction<Event, Long, Long>() {
    @Override
    public Long createAccumulator() {
        // Counting starts at zero.
        return 0L;
    }

    @Override
    public Long add(Event event, Long countSoFar) {
        // Each event contributes one, regardless of its content.
        return countSoFar + 1;
    }

    @Override
    public Long getResult(Long countSoFar) {
        return countSoFar;
    }

    @Override
    public Long merge(Long left, Long right) {
        return left + right;
    }
};
AggregatingStateDescriptor<Event, Long, Long> countDescriptor =
    new AggregatingStateDescriptor<>("state", eventCounter, Long.class);
AggregatingState<Event, Long> countingState =
    getRuntimeContext().getAggregatingState(countDescriptor);

状态后端

MemoryStateBackend

// 内存状态后端(默认)
env.setStateBackend(new MemoryStateBackend());

FsStateBackend

// 文件系统状态后端
env.setStateBackend(new FsStateBackend("hdfs:///flink/checkpoints"));

RocksDBStateBackend

// RocksDB 状态后端(推荐用于大状态)
env.setStateBackend(new RocksDBStateBackend("hdfs:///flink/checkpoints"));

状态 TTL

// Configure state TTL: one hour, update type OnCreateAndWrite,
// visibility NeverReturnExpired
Time timeToLive = Time.hours(1);
StateTtlConfig expiryConfig = StateTtlConfig
    .newBuilder(timeToLive)
    .setUpdateType(StateTtlConfig.UpdateType.OnCreateAndWrite)
    .setStateVisibility(StateTtlConfig.StateVisibility.NeverReturnExpired)
    .build();

// Enable the TTL on the descriptor
ValueStateDescriptor<String> ttlDescriptor = new ValueStateDescriptor<>("state", String.class);
ttlDescriptor.enableTimeToLive(expiryConfig);

小结

状态管理要点:

  • 状态类型:Keyed State、Operator State
  • 状态存储:ValueState、ListState、MapState、ReducingState、AggregatingState
  • 状态后端:Memory、FileSystem、RocksDB
  • 状态 TTL:自动清理过期状态

下一章我们将学习容错机制。