跳转至

第六章:容错机制

Checkpoint

开启 Checkpoint

StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

// 开启 Checkpoint,间隔 1 秒
env.enableCheckpointing(1000);

// Checkpoint 配置
CheckpointConfig config = env.getCheckpointConfig();

// 设置模式
config.setCheckpointingMode(CheckpointingMode.EXACTLY_ONCE);

// 设置超时时间
config.setCheckpointTimeout(60000);

// 设置最小间隔
config.setMinPauseBetweenCheckpoints(500);

// 设置最大并发
config.setMaxConcurrentCheckpoints(1);

// 设置最小延迟
config.setTolerableCheckpointFailureNumber(3);

Checkpoint 存储

// 设置 Checkpoint 目录
env.getCheckpointConfig().setCheckpointStorage("hdfs:///flink/checkpoints");

// 或使用 RocksDB
env.setStateBackend(new EmbeddedRocksDBStateBackend());
env.getCheckpointConfig().setCheckpointStorage("hdfs:///flink/checkpoints");

Savepoint

创建 Savepoint

# 触发 Savepoint
./bin/flink savepoint <job-id> [target-directory]

# 示例
./bin/flink savepoint abc123 hdfs:///flink/savepoints

从 Savepoint 恢复

# 从 Savepoint 恢复作业
./bin/flink run -s hdfs:///flink/savepoints/savepoint-abc123 -c com.example.Job job.jar

取消时创建 Savepoint

# 取消作业并创建 Savepoint
./bin/flink cancel -s [target-directory] <job-id>

Exactly-Once 语义

两阶段提交

public class ExactlyOnceSink extends TwoPhaseCommitSinkFunction<Event, Transaction, Void> {

    public ExactlyOnceSink() {
        super(new KryoSerializer<>(Transaction.class, env.getConfig()), VoidSerializer.INSTANCE);
    }

    @Override
    protected Transaction beginTransaction() {
        // 开启事务
        return new Transaction();
    }

    @Override
    protected void invoke(Transaction transaction, Event value, Context context) {
        // 写入数据(预提交)
        transaction.write(value);
    }

    @Override
    protected void preCommit(Transaction transaction) {
        // 预提交
        transaction.flush();
    }

    @Override
    protected void commit(Transaction transaction) {
        // 提交事务
        transaction.commit();
    }

    @Override
    protected void abort(Transaction transaction) {
        // 回滚事务
        transaction.rollback();
    }
}

Kafka Exactly-Once

// 生产者配置
Properties props = new Properties();
props.setProperty("bootstrap.servers", "localhost:9092");
props.setProperty("transaction.timeout.ms", "900000");

FlinkKafkaProducer<Event> producer = new FlinkKafkaProducer<>(
    "topic",
    new EventSerializer(),
    props,
    FlinkKafkaProducer.Semantic.EXACTLY_ONCE
);

stream.addSink(producer);

故障恢复

自动恢复

# flink-conf.yaml
restart-strategy: fixed-delay
restart-strategy.fixed-delay.attempts: 3
restart-strategy.fixed-delay.delay: 10s

恢复策略

// 固定延迟重启
env.setRestartStrategy(RestartStrategies.fixedDelayRestart(
    3,  // 重启次数
    Time.seconds(10)  // 延迟时间
));

// 指数退避重启
env.setRestartStrategy(RestartStrategies.exponentialDelayRestart(
    Time.seconds(1),  // 初始延迟
    Time.seconds(30),  // 最大延迟
    2.0  // 退避因子
));

// 失败率重启
env.setRestartStrategy(RestartStrategies.failureRateRestart(
    3,  // 失败率
    Time.minutes(5),  // 时间窗口
    Time.seconds(10)  // 延迟
));

// 不重启
env.setRestartStrategy(RestartStrategies.noRestart());

小结

容错机制要点:

  • Checkpoint:定期保存状态快照
  • Savepoint:手动触发的状态快照
  • Exactly-Once:两阶段提交
  • 故障恢复:自动重启策略

下一章我们将学习 Flink SQL。