Chapter 6: Fault Tolerance
Checkpoint
Enabling Checkpoints
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
// Enable checkpointing with a 1-second interval
env.enableCheckpointing(1000);
// Checkpoint configuration
CheckpointConfig config = env.getCheckpointConfig();
// Checkpointing mode
config.setCheckpointingMode(CheckpointingMode.EXACTLY_ONCE);
// Timeout: abort a checkpoint that takes longer than 60 seconds
config.setCheckpointTimeout(60000);
// Minimum pause between the end of one checkpoint and the start of the next
config.setMinPauseBetweenCheckpoints(500);
// At most one checkpoint in flight at a time
config.setMaxConcurrentCheckpoints(1);
// Tolerate up to 3 checkpoint failures before failing the job
config.setTolerableCheckpointFailureNumber(3);
Checkpoint Storage
// Set the checkpoint directory
env.getCheckpointConfig().setCheckpointStorage("hdfs:///flink/checkpoints");
// Or use the RocksDB state backend (state lives in RocksDB, snapshots go to the directory below)
env.setStateBackend(new EmbeddedRocksDBStateBackend());
env.getCheckpointConfig().setCheckpointStorage("hdfs:///flink/checkpoints");
Savepoint
Creating a Savepoint
# Trigger a savepoint
./bin/flink savepoint <job-id> [target-directory]
# Example
./bin/flink savepoint abc123 hdfs:///flink/savepoints
Restoring from a Savepoint
# Restore a job from a savepoint
./bin/flink run -s hdfs:///flink/savepoints/savepoint-abc123 -c com.example.Job job.jar
Creating a Savepoint on Cancel
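A job can also be stopped and a savepoint written in a single step. A sketch using the same savepoint directory as above (the job id is a placeholder):

```shell
# Cancel the job and write a savepoint in one step
./bin/flink cancel -s hdfs:///flink/savepoints <job-id>

# Newer Flink versions prefer "stop", which stops the job gracefully
# after the savepoint completes
./bin/flink stop --savepointPath hdfs:///flink/savepoints <job-id>
```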
Exactly-Once Semantics
Two-Phase Commit
public class ExactlyOnceSink extends TwoPhaseCommitSinkFunction<Event, Transaction, Void> {

    public ExactlyOnceSink() {
        // Serializers for the transaction handle and the (unused) context
        super(new KryoSerializer<>(Transaction.class, new ExecutionConfig()),
              VoidSerializer.INSTANCE);
    }

    @Override
    protected Transaction beginTransaction() {
        // Open a new transaction
        return new Transaction();
    }

    @Override
    protected void invoke(Transaction transaction, Event value, Context context) {
        // Write records into the open transaction (not yet visible to readers)
        transaction.write(value);
    }

    @Override
    protected void preCommit(Transaction transaction) {
        // Pre-commit: flush buffered data when the checkpoint barrier arrives
        transaction.flush();
    }

    @Override
    protected void commit(Transaction transaction) {
        // Commit the transaction once the checkpoint has completed
        transaction.commit();
    }

    @Override
    protected void abort(Transaction transaction) {
        // Roll back the transaction on failure
        transaction.rollback();
    }
}
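Stripped of the Flink API, the lifecycle above boils down to: buffer writes inside an open transaction, flush them on pre-commit, and make them visible only on commit. A minimal self-contained sketch of that protocol (the `MemoryTransaction` class is hypothetical, for illustration only):

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical in-memory transaction: records pass through a write buffer
// (invoke), a flushed/pre-committed stage (preCommit), and become visible
// to the outside world only on commit().
class MemoryTransaction {
    private final List<String> buffer = new ArrayList<>();
    private final List<String> flushed = new ArrayList<>();

    void write(String value) { buffer.add(value); }            // invoke()
    void flush() { flushed.addAll(buffer); buffer.clear(); }   // preCommit()
    void commit(List<String> sink) { sink.addAll(flushed); flushed.clear(); } // commit()
    void rollback() { buffer.clear(); flushed.clear(); }       // abort()
}

public class TwoPhaseCommitDemo {
    static List<String> run() {
        List<String> committed = new ArrayList<>();

        // Happy path: data becomes visible only after commit()
        MemoryTransaction tx = new MemoryTransaction();
        tx.write("a");
        tx.write("b");
        tx.flush();           // checkpoint barrier reached: pre-commit
        tx.commit(committed); // checkpoint completed: commit

        // Failure path: pre-committed data is discarded, nothing is published
        MemoryTransaction failed = new MemoryTransaction();
        failed.write("c");
        failed.flush();
        failed.rollback();    // job restarts from the checkpoint: abort

        return committed;
    }

    public static void main(String[] args) {
        System.out.println(run()); // prints [a, b]
    }
}
```

The failure path is the point of the exercise: the record "c" was flushed but never committed, so a downstream consumer reading only committed data never sees it.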
Kafka Exactly-Once
// Producer configuration
Properties props = new Properties();
props.setProperty("bootstrap.servers", "localhost:9092");
// Must not exceed the broker's transaction.max.timeout.ms (15 minutes by default)
props.setProperty("transaction.timeout.ms", "900000");
FlinkKafkaProducer<Event> producer = new FlinkKafkaProducer<>(
        "topic",
        new EventSerializer(),
        props,
        FlinkKafkaProducer.Semantic.EXACTLY_ONCE
);
stream.addSink(producer);
Failure Recovery
Automatic Recovery
# flink-conf.yaml
restart-strategy: fixed-delay
restart-strategy.fixed-delay.attempts: 3
restart-strategy.fixed-delay.delay: 10s
Restart Strategies
// Fixed-delay restart
env.setRestartStrategy(RestartStrategies.fixedDelayRestart(
        3,                 // number of restart attempts
        Time.seconds(10)   // delay between attempts
));
// Exponential-delay restart
env.setRestartStrategy(RestartStrategies.exponentialDelayRestart(
        Time.seconds(1),   // initial delay
        Time.seconds(30),  // maximum delay
        2.0,               // backoff multiplier
        Time.minutes(10),  // reset backoff after this long without failures
        0.1                // jitter factor
));
// Failure-rate restart
env.setRestartStrategy(RestartStrategies.failureRateRestart(
        3,                 // max failures per interval
        Time.minutes(5),   // measurement interval
        Time.seconds(10)   // delay between attempts
));
// No restart
env.setRestartStrategy(RestartStrategies.noRestart());
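The exponential-delay strategy grows the delay by the backoff multiplier on each attempt and caps it at the configured maximum. A small self-contained sketch of that arithmetic, ignoring jitter and the reset threshold (`delaySeconds` is a hypothetical helper, not a Flink API):

```java
public class BackoffDemo {
    // Hypothetical helper: delay before restart attempt n (0-based),
    // computed as initial * factor^n and capped at the maximum delay.
    static long delaySeconds(long initial, long max, double factor, int attempt) {
        double d = initial * Math.pow(factor, attempt);
        return (long) Math.min(d, max);
    }

    public static void main(String[] args) {
        // With initial=1s, max=30s, factor=2.0 the delays are 1, 2, 4, 8, 16, 30, 30, ...
        for (int n = 0; n < 7; n++) {
            System.out.println("attempt " + n + ": " + delaySeconds(1, 30, 2.0, n) + "s");
        }
    }
}
```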
Summary
Key points of fault tolerance:
- Checkpoint: periodic, automatic state snapshots
- Savepoint: manually triggered state snapshots
- Exactly-once: two-phase commit sinks
- Failure recovery: automatic restart strategies
In the next chapter we will look at Flink SQL.