Flink之Task重启策略

发布时间:2024年01月09日

Task重启策略

1 策略API
  • noRestart

    无参数,task失败后不重启,整个job同时失败,默认策略.

    代码示例

    RestartStrategies.noRestart();
    
  • fixedDelayRestart

    参数注释
    restartAttempts最大重启次数
    delayBetweenAttempts重启时间间隔

    代码示例

    // 最多重启5次,每次任务失败后间隔1s重启
    RestartStrategies.fixedDelayRestart(5, 1000);
    
  • exponentialDelayRestart

    参数注释
    initialBackoff重启间隔惩罚时长初始值(重启延迟时间)
    maxBackoff重启间隔最大惩罚时长
    backoffMultiplier重启间隔时长的惩罚倍数
    resetBackoffThreshold重置惩罚时长的平稳运行时长(平稳运行时长达到这个阈值后,再次发生故障则重启延迟时间恢复到初始值)
    jitterFactor取一个随机数,来加在重启时间点上,已让每次重启的时间呈现一定随机性(避免某一时刻集群中有大量的task同时重启,如果task重启时间是规律性的就可能发生这种情况)

    代码示例

    // 第一次失败后间隔1s重启任务,如果稳定运行时长没有达到120s就发生task失败,则重启间隔时长=上一次重启间隔时长*1.2,如果稳定运行时长超过120秒则重启间隔时长恢复到1s
    RestartStrategies.exponentialDelayRestart(Time.seconds(1), Time.seconds(30), 1.2, Time.seconds(120), 0.56);
    
  • failureRateRestart

    参数注释
    failureRate指定时间范围内的最大Task任务失败率(次数)
    failureInterval指定时间范围
    delayInterval重启时间间隔

    代码示例

    // task失败重启间隔为1s,只要在30分钟内task失败重启次数没超过3次就可以一直执行这个策略,如果超过则job停止
    RestartStrategies.failureRateRestart(3, Time.minutes(30), Time.seconds(1));
    
  • fallBackRestart

    无参数,常用于自定义的RestartStrategy,即用户自定义了重启策略,且将其配置在了flink-conf.yaml文件中,也就是说调用这个方法时会读取集群的配置文件,根据配置文件的内容调整策略

    代码示例

    RestartStrategies.fallBackRestart();
    
2 代码详情
public class FlinkCheckpoint {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        // 开启并设置checkpoint的时间间隔
        env.enableCheckpointing(3000);
        // 设置checkpoint的存储位置
        env.getCheckpointConfig().setCheckpointStorage(new Path("hdfs://lx01:8020/flink-ck"));
        // 允许checkpoint失败的最大次数
        env.getCheckpointConfig().setTolerableCheckpointFailureNumber(2);
        // checkpoint的算法模式,是否需要对其(EXACTLY_ONCE)
        env.getCheckpointConfig().setCheckpointingMode(CheckpointingMode.EXACTLY_ONCE);
        // job取消是否保留checkpoint数据
        env.getCheckpointConfig().setExternalizedCheckpointCleanup(CheckpointConfig.ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION);
        // 设置checkpoint对齐的超时时间
        env.getCheckpointConfig().setAlignedCheckpointTimeout(Duration.ofMillis(10000));
        // 两次checkpoint的最小时间间隔
        env.getCheckpointConfig().setMinPauseBetweenCheckpoints(2000);
        // 并行最大的checkpoint数
        env.getCheckpointConfig().setMaxConcurrentCheckpoints(3);
        // 选择后端状态(默认HashMapStateBackend)
        env.setStateBackend(new EmbeddedRocksDBStateBackend());

        // TODO Task重启策略
        RestartStrategies.RestartStrategyConfiguration restartStrategy = null;
        // 第一次失败后间隔1s重启任务,如果稳定运行时长没有达到120s就发生task失败,则重启间隔时长=上一次重启间隔时长*1.2,如果稳定运行时长超过120秒则重启间隔时长恢复到1s
        restartStrategy = RestartStrategies.exponentialDelayRestart(Time.seconds(1), Time.seconds(30), 1.2, Time.seconds(120), 0.56);

        // 配置Task重启策略
        env.setRestartStrategy(restartStrategy);
      
        // ...业务代码
      
        env.execute();
    }
}
文章来源:https://blog.csdn.net/AnameJL/article/details/135484753
本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。