Chapter 5: State Management
State Types
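Keyed State
Keyed state is available only on a KeyedStream (after keyBy()). Flink keeps a separate copy of the state for every key, and the function always sees the copy that belongs to the key of the element being processed.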
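```java
import org.apache.flink.api.common.state.ValueState;
import org.apache.flink.api.common.state.ValueStateDescriptor;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.functions.KeyedProcessFunction;
import org.apache.flink.util.Collector;

// Counts events per key; Event is the application's own event type (not shown)
public class CountFunction extends KeyedProcessFunction<String, Event, Long> {

    // ValueState: one Long per key, managed and checkpointed by Flink
    private transient ValueState<Long> countState;

    @Override
    public void open(Configuration parameters) {
        ValueStateDescriptor<Long> descriptor = new ValueStateDescriptor<>(
                "count",
                Long.class
        );
        countState = getRuntimeContext().getState(descriptor);
    }

    @Override
    public void processElement(Event value, Context ctx, Collector<Long> out) throws Exception {
        // Read the count for the current key; it is null on the first event for that key
        Long count = countState.value();
        if (count == null) {
            count = 0L;
        }
        count++;
        countState.update(count);
        out.collect(count);
    }
}
```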
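Because countState is keyed state, the same processElement code reads and updates a different counter depending on the key of the current event. The plain-Java model below imitates that scoping with a HashMap keyed by the stream key. It is not Flink code, and the class name PerKeyCounterModel is hypothetical.

```java
import java.util.HashMap;
import java.util.Map;

// Plain-Java model (not Flink API) of how CountFunction's ValueState is scoped per key.
class PerKeyCounterModel {
    // One slot per key, standing in for the ValueState<Long> that Flink keeps for each key
    private final Map<String, Long> countPerKey = new HashMap<>();

    // Mirrors processElement: read the current key's value, treat null as 0, increment, write back
    long process(String key) {
        Long count = countPerKey.get(key);
        if (count == null) {
            count = 0L;
        }
        count++;
        countPerKey.put(key, count);
        return count; // what CountFunction would emit via out.collect
    }

    public static void main(String[] args) {
        PerKeyCounterModel model = new PerKeyCounterModel();
        for (String key : new String[] {"alice", "bob", "alice", "alice", "bob"}) {
            System.out.println(key + " -> " + model.process(key));
        }
    }
}
```

Running main prints alice -> 1, bob -> 1, alice -> 2, alice -> 3, bob -> 2. The counters never mix, which is the behaviour CountFunction gets from Flink without any explicit map.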
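Operator State
Operator state belongs to a parallel instance of an operator rather than to a key. The sink below accesses it by implementing CheckpointedFunction. initializeState() creates the state or restores it after a failure, and snapshotState() is called on every checkpoint.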
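```java
import java.util.ArrayList;
import java.util.List;
import org.apache.flink.api.common.state.ListState;
import org.apache.flink.api.common.state.ListStateDescriptor;
import org.apache.flink.runtime.state.FunctionInitializationContext;
import org.apache.flink.runtime.state.FunctionSnapshotContext;
import org.apache.flink.streaming.api.checkpoint.CheckpointedFunction;
import org.apache.flink.streaming.api.functions.sink.SinkFunction;

// Buffers incoming events and writes them out in batches of 1000
public class BufferSink implements SinkFunction<Event>, CheckpointedFunction {
    private static final int BATCH_SIZE = 1000;
    // Checkpointed copy of the buffer (operator ListState)
    private transient ListState<Event> bufferState;
    // In-memory buffer filled between checkpoints
    private final List<Event> buffer = new ArrayList<>();
    @Override
    public void initializeState(FunctionInitializationContext context) throws Exception {
        ListStateDescriptor<Event> descriptor = new ListStateDescriptor<>(
                "buffer",
                Event.class
        );
        bufferState = context.getOperatorStateStore().getListState(descriptor);
        // After a failure, reload events that were buffered but not yet flushed
        if (context.isRestored()) {
            for (Event event : bufferState.get()) {
                buffer.add(event);
            }
        }
    }
    @Override
    public void snapshotState(FunctionSnapshotContext context) throws Exception {
        // Replace the checkpointed list with the current buffer contents
        bufferState.clear();
        bufferState.addAll(buffer);
    }
    @Override
    public void invoke(Event value, Context context) {
        buffer.add(value);
        if (buffer.size() >= BATCH_SIZE) {
            flush();
        }
    }
    // Writes the buffered events out, then empties the buffer so they are not sent twice
    private void flush() {
        // Writing to the external system is omitted here
        buffer.clear();
    }
}
```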
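When a job with operator state is restored at a different parallelism, the list entries must be reassigned to the new instances. With getListState (used by BufferSink), Flink uses even-split redistribution. The lists from all old instances are treated as one concatenated list and divided into roughly equal sublists, one per new instance. getUnionListState instead gives every new instance the complete list.

The plain-Java model below illustrates both schemes for intuition only. Flink's actual assignment of elements to instances may differ in order. RedistributionModel and its methods are hypothetical names.

```java
import java.util.ArrayList;
import java.util.List;

// Plain-Java model (not Flink API) of operator ListState redistribution on rescaling.
class RedistributionModel {

    private static <T> List<T> concat(List<List<T>> oldLists) {
        List<T> all = new ArrayList<>();
        for (List<T> part : oldLists) {
            all.addAll(part);
        }
        return all;
    }

    // Even-split: cut the concatenated list into newParallelism contiguous chunks
    // whose sizes differ by at most one
    static <T> List<List<T>> evenSplit(List<List<T>> oldLists, int newParallelism) {
        List<T> all = concat(oldLists);
        List<List<T>> result = new ArrayList<>();
        int base = all.size() / newParallelism;
        int extra = all.size() % newParallelism;
        int start = 0;
        for (int i = 0; i < newParallelism; i++) {
            int size = base + (i < extra ? 1 : 0); // the first `extra` chunks get one more element
            result.add(new ArrayList<>(all.subList(start, start + size)));
            start += size;
        }
        return result;
    }

    // Union: every new instance receives the complete concatenated list
    static <T> List<List<T>> union(List<List<T>> oldLists, int newParallelism) {
        List<T> all = concat(oldLists);
        List<List<T>> result = new ArrayList<>();
        for (int i = 0; i < newParallelism; i++) {
            result.add(new ArrayList<>(all));
        }
        return result;
    }

    public static void main(String[] args) {
        // Three old instances, each holding two buffered events (labelled by strings here)
        List<List<String>> old = List.of(List.of("e1", "e2"), List.of("e3", "e4"), List.of("e5", "e6"));
        System.out.println(evenSplit(old, 2));           // [[e1, e2, e3], [e4, e5, e6]]
        System.out.println(union(old, 2).get(0).size()); // 6
    }
}
```

Even-split suits a buffer like BufferSink's, because each event only needs to be flushed once by some instance. Union is for cases where every instance needs the full picture. Union also multiplies the restored data by the parallelism.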
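Keyed State Primitives
The snippets below are fragments. getRuntimeContext() is only available inside a rich function, and these handles are keyed state, so the function must run on a KeyedStream. Each snippet first creates the state handle, which is typically done in open(). It then lists the operations, which are called per element, for example in processElement().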
ValueState
```java
// Single value per key
ValueStateDescriptor<String> descriptor = new ValueStateDescriptor<>("state", String.class);
ValueState<String> state = getRuntimeContext().getState(descriptor);
// Operations
state.update("value");          // write
String value = state.value();   // read (null if nothing has been written for this key)
state.clear();                  // remove the value for the current key
```
ListState
```java
// List of values per key
ListStateDescriptor<String> descriptor = new ListStateDescriptor<>("state", String.class);
ListState<String> state = getRuntimeContext().getListState(descriptor);
// Operations
state.add("value");                      // append one element
Iterable<String> values = state.get();   // read all elements
state.update(Arrays.asList("a", "b"));   // replace the whole list
state.clear();                           // remove all elements
```
MapState
```java
// Key-value map per key
MapStateDescriptor<String, Integer> descriptor = new MapStateDescriptor<>("state", String.class, Integer.class);
MapState<String, Integer> state = getRuntimeContext().getMapState(descriptor);
// Operations
state.put("key", 1);               // insert or overwrite
Integer value = state.get("key");  // read (null if the entry is absent)
state.remove("key");               // delete one entry
state.clear();                     // remove all entries
```
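ReducingState
```java
// Reducing state: each added value is combined with the stored result by a ReduceFunction;
// input and result have the same type
ReducingStateDescriptor<Long> descriptor = new ReducingStateDescriptor<>(
        "state",
        (a, b) -> a + b, // ReduceFunction: running sum
        Long.class
);
ReducingState<Long> state = getRuntimeContext().getReducingState(descriptor);
// Operations
state.add(1L);             // add and combine with the stored result
Long value = state.get();  // read the combined result
```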
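ReducingState does not keep the individual values. Each add(...) immediately combines the new value with the stored result, so only one value per key is stored. Below is a plain-Java sketch of that behaviour for a single key. The class ReducingModel is hypothetical, and java.util.function.BinaryOperator stands in for Flink's ReduceFunction.

```java
import java.util.function.BinaryOperator;

// Plain-Java model (not Flink API) of ReducingState for a single key.
class ReducingModel<T> {
    private final BinaryOperator<T> reduceFunction; // stands in for Flink's ReduceFunction
    private T current;                              // the only thing stored: the running result

    ReducingModel(BinaryOperator<T> reduceFunction) {
        this.reduceFunction = reduceFunction;
    }

    // The first value is stored as-is; later values are combined with the stored result
    void add(T value) {
        current = (current == null) ? value : reduceFunction.apply(current, value);
    }

    // Returns null while nothing has been added
    T get() {
        return current;
    }

    public static void main(String[] args) {
        ReducingModel<Long> sum = new ReducingModel<>((a, b) -> a + b);
        sum.add(1L);
        sum.add(2L);
        sum.add(3L);
        System.out.println(sum.get()); // 6
    }
}
```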
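AggregatingState
```java
// Aggregating state: the input (Event), accumulator (Long) and result (Long) types may differ;
// this one counts events
AggregatingStateDescriptor<Event, Long, Long> descriptor = new AggregatingStateDescriptor<>(
        "state",
        new AggregateFunction<Event, Long, Long>() {
            @Override
            public Long createAccumulator() {
                return 0L;
            }
            @Override
            public Long add(Event value, Long accumulator) {
                return accumulator + 1;
            }
            @Override
            public Long getResult(Long accumulator) {
                return accumulator;
            }
            @Override
            public Long merge(Long a, Long b) {
                return a + b;
            }
        },
        Long.class // accumulator type
);
AggregatingState<Event, Long> state = getRuntimeContext().getAggregatingState(descriptor);
// Operations
state.add(event);          // fold one incoming Event into the accumulator
Long count = state.get();  // read getResult(accumulator)
```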
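The descriptor above uses Long for both the accumulator and the result. That hides the main difference from ReducingState: the input, accumulator and result types may all differ. As an illustration, the plain-Java sketch below computes an average from a long[] {sum, count} accumulator and a double result. The Agg interface only mirrors the createAccumulator/add/getResult shape of Flink's AggregateFunction, and merge is omitted. Agg and AverageModel are hypothetical names, not Flink API.

```java
// Plain-Java model (not Flink API) of AggregatingState with different IN/ACC/OUT types.
interface Agg<IN, ACC, OUT> {
    ACC createAccumulator();
    ACC add(IN value, ACC accumulator);
    OUT getResult(ACC accumulator);
}

class AverageModel {
    // IN = Long, ACC = long[] {sum, count}, OUT = Double
    static final Agg<Long, long[], Double> AVERAGE = new Agg<Long, long[], Double>() {
        @Override
        public long[] createAccumulator() {
            return new long[] {0L, 0L};
        }

        @Override
        public long[] add(Long value, long[] acc) {
            acc[0] += value; // running sum
            acc[1] += 1;     // running count
            return acc;
        }

        @Override
        public Double getResult(long[] acc) {
            return (double) acc[0] / acc[1];
        }
    };

    private long[] accumulator; // what the state actually stores for one key

    void add(Long value) {
        if (accumulator == null) {
            accumulator = AVERAGE.createAccumulator();
        }
        accumulator = AVERAGE.add(value, accumulator);
    }

    // Returns null while nothing has been added
    Double get() {
        return accumulator == null ? null : AVERAGE.getResult(accumulator);
    }

    public static void main(String[] args) {
        AverageModel avg = new AverageModel();
        avg.add(2L);
        avg.add(4L);
        avg.add(9L);
        System.out.println(avg.get()); // 5.0
    }
}
```

A ReducingState<Long> could not express this directly, because an average of averages is not the average of the inputs. The state has to carry the sum and count, which is exactly the accumulator.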
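State Backends
The state backend determines where working state is kept and how checkpoints are written.
MemoryStateBackend
Keeps working state as objects on the TaskManager JVM heap and stores checkpoints in JobManager memory; suited to local testing and small state.
FsStateBackend
Keeps working state on the TaskManager heap and writes checkpoints to a file system such as HDFS or S3.
RocksDBStateBackend
Keeps working state in an embedded RocksDB database on the TaskManager's local disk, writes checkpoints to a file system, and supports incremental checkpoints; suited to very large state.
Since Flink 1.13 these three classes are deprecated in favor of HashMapStateBackend and EmbeddedRocksDBStateBackend, with checkpoint storage configured separately.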
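State TTL
A time-to-live (TTL) can be attached to keyed state through its descriptor. Once the TTL has elapsed since the entry was last refreshed, Flink treats the entry as expired and eventually removes it. TTL is measured in processing time.
```java
// Configure a one-hour state TTL
StateTtlConfig ttlConfig = StateTtlConfig
        .newBuilder(Time.hours(1))
        // Refresh the expiration time only when the value is created or written; reads do not refresh it
        .setUpdateType(StateTtlConfig.UpdateType.OnCreateAndWrite)
        // Never return an expired value, even if it has not been physically removed yet
        .setStateVisibility(StateTtlConfig.StateVisibility.NeverReturnExpired)
        .build();
ValueStateDescriptor<String> descriptor = new ValueStateDescriptor<>("state", String.class);
// TTL is enabled per state descriptor
descriptor.enableTimeToLive(ttlConfig);
```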
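To make the two TTL settings concrete, here is a plain-Java model (not Flink code) of a single TTL-protected value, with the current time passed in explicitly.

- With OnCreateAndWrite, only writes reset the expiration time.
- NeverReturnExpired makes reads return null once the TTL has passed, even though the real backend may remove the entry later.
- For comparison, a flag mimics Flink's StateTtlConfig.UpdateType.OnReadAndWrite, where reads also refresh the TTL.

TtlValueModel is a hypothetical name. Treating "exactly TTL milliseconds later" as expired is a modeling choice.

```java
// Plain-Java model (not Flink API) of one state value under StateTtlConfig semantics.
class TtlValueModel<T> {
    private final long ttlMillis;
    private final boolean refreshOnRead; // false ~ OnCreateAndWrite, true ~ OnReadAndWrite
    private T value;
    private long lastRefresh;            // time (ms) of the last TTL refresh

    TtlValueModel(long ttlMillis, boolean refreshOnRead) {
        this.ttlMillis = ttlMillis;
        this.refreshOnRead = refreshOnRead;
    }

    // A write always refreshes the TTL
    void update(T newValue, long now) {
        value = newValue;
        lastRefresh = now;
    }

    // NeverReturnExpired: an expired value reads as null (the model also drops it)
    T value(long now) {
        if (value == null) {
            return null;
        }
        if (now - lastRefresh >= ttlMillis) {
            value = null;
            return null;
        }
        if (refreshOnRead) {
            lastRefresh = now;
        }
        return value;
    }

    public static void main(String[] args) {
        long hour = 60 * 60 * 1000L;
        TtlValueModel<String> state = new TtlValueModel<>(hour, false);
        state.update("v", 0L);
        System.out.println(state.value(hour / 2)); // v
        System.out.println(state.value(hour + 1)); // null: the earlier read did not refresh the TTL
    }
}
```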
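Summary
Key points of state management:
- State types: Keyed State, Operator State
- State primitives: ValueState, ListState, MapState, ReducingState, AggregatingState
- State backends: Memory, FileSystem, RocksDB
- State TTL: expired state is cleaned up automatically
The next chapter covers fault tolerance.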