noRestart
无参数,task失败后不重启,整个job同时失败,默认策略.
代码示例
RestartStrategies.noRestart();
fixedDelayRestart
参数 | 注释 |
---|---|
restartAttempts | 最大重启次数 |
delayBetweenAttempts | 重启时间间隔 |
代码示例
// 最多重启5次,每次任务失败后间隔1s重启
RestartStrategies.fixedDelayRestart(5, 1000);
exponentialDelayRestart
参数 | 注释 |
---|---|
initialBackoff | 重启间隔惩罚时长初始值(重启延迟时间) |
maxBackoff | 重启间隔最大惩罚时长 |
backoffMultiplier | 重启间隔时长的惩罚倍数 |
resetBackoffThreshold | 重置惩罚时长的平稳运行时长(平稳运行时长达到这个阈值后,再次发生故障则重启延迟时间恢复到初始值) |
jitterFactor | 取一个随机数,来加在重启时间点上,已让每次重启的时间呈现一定随机性(避免某一时刻集群中有大量的task同时重启,如果task重启时间是规律性的就可能发生这种情况) |
代码示例
// 第一次失败后间隔1s重启任务,如果稳定运行时长没有达到120s就发生task失败,则重启间隔时长=上一次重启间隔时长*1.2,如果稳定运行时长超过120秒则重启间隔时长恢复到1s
RestartStrategies.exponentialDelayRestart(Time.seconds(1), Time.seconds(30), 1.2, Time.seconds(120), 0.56);
failureRateRestart
参数 | 注释 |
---|---|
failureRate | 指定时间范围内的最大Task任务失败率(次数) |
failureInterval | 指定时间范围 |
delayInterval | 重启时间间隔 |
代码示例
// task失败重启间隔为1s,只要在30分钟内task失败重启次数没超过3次就可以一直执行这个策略,如果超过则job停止
RestartStrategies.failureRateRestart(3, Time.minutes(30), Time.seconds(1));
fallBackRestart
无参数,常用于自定义的RestartStrategy,即用户自定义了重启策略,且将其配置在了flink-conf.yaml文件中,也就是说调用这个方法时会读取集群的配置文件,根据配置文件的内容调整策略
代码示例
RestartStrategies.fallBackRestart();
public class FlinkCheckpoint {
public static void main(String[] args) throws Exception {
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
// 开启并设置checkpoint的时间间隔
env.enableCheckpointing(3000);
// 设置checkpoint的存储位置
env.getCheckpointConfig().setCheckpointStorage(new Path("hdfs://lx01:8020/flink-ck"));
// 允许checkpoint失败的最大次数
env.getCheckpointConfig().setTolerableCheckpointFailureNumber(2);
// checkpoint的算法模式,是否需要对其(EXACTLY_ONCE)
env.getCheckpointConfig().setCheckpointingMode(CheckpointingMode.EXACTLY_ONCE);
// job取消是否保留checkpoint数据
env.getCheckpointConfig().setExternalizedCheckpointCleanup(CheckpointConfig.ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION);
// 设置checkpoint对齐的超时时间
env.getCheckpointConfig().setAlignedCheckpointTimeout(Duration.ofMillis(10000));
// 两次checkpoint的最小时间间隔
env.getCheckpointConfig().setMinPauseBetweenCheckpoints(2000);
// 并行最大的checkpoint数
env.getCheckpointConfig().setMaxConcurrentCheckpoints(3);
// 选择后端状态(默认HashMapStateBackend)
env.setStateBackend(new EmbeddedRocksDBStateBackend());
// TODO Task重启策略
RestartStrategies.RestartStrategyConfiguration restartStrategy = null;
// 第一次失败后间隔1s重启任务,如果稳定运行时长没有达到120s就发生task失败,则重启间隔时长=上一次重启间隔时长*1.2,如果稳定运行时长超过120秒则重启间隔时长恢复到1s
restartStrategy = RestartStrategies.exponentialDelayRestart(Time.seconds(1), Time.seconds(30), 1.2, Time.seconds(120), 0.56);
// 配置Task重启策略
env.setRestartStrategy(restartStrategy);
// ...业务代码
env.execute();
}
}