demo测试代码
public class Main {
static int inCount = 0;
static int runCount = 0;
public static void main(String[] args) {
CountDownLatch countDownLatch = new CountDownLatch(1000);
Timer timer = new Timer();
for(int i=1;i<=1000;i++){
TimerTask timerTask = new TimerTask(i,()->{
countDownLatch.countDown();
int index = addRun();
System.out.println(index+"----------执行");
});
timer.addTask(timerTask);
System.out.println(i+"++++++++++加入");
inCount++;
}
TimerTask timerTask = new TimerTask(5000,()->{
countDownLatch.countDown();
int index = addRun();
System.out.println(index+"----------执行");
});
timer.addTask(timerTask);
try {
countDownLatch.await();
System.out.println("inCount" + inCount);
System.out.println("runCount" + runCount);
} catch (Exception e){
e.printStackTrace();
}
}
public synchronized static int addRun(){
runCount++;
return runCount;
}
}
时间轮定时器
**
* 定时器
*/
public class Timer {
/**
* 底层时间轮
*/
private TimeWheel timeWheel;
/**
* 一个Timer只有一个delayQueue
*/
private DelayQueue<TimerTaskList> delayQueue = new DelayQueue<>();
/**
* 过期任务执行线程
*/
private ExecutorService workerThreadPool;
/**
* 轮询delayQueue获取过期任务线程
*/
private ExecutorService bossThreadPool;
/**
* 构造函数
*/
public Timer() {
timeWheel = new TimeWheel(1, 20, System.currentTimeMillis(), delayQueue);
workerThreadPool = Executors.newFixedThreadPool(100);
bossThreadPool = Executors.newFixedThreadPool(1);
//20ms获取一次过期任务
bossThreadPool.submit(() -> {
while (true) {
this.advanceClock(20);
}
});
}
/**
* 添加任务
*/
public void addTask(TimerTask timerTask) {
//添加失败任务直接执行
if (!timeWheel.addTask(timerTask)) {
workerThreadPool.submit(timerTask.getTask());
}
}
/**
* 获取过期任务
*/
private void advanceClock(long timeout) {
try {
TimerTaskList timerTaskList = delayQueue.poll(timeout, TimeUnit.MILLISECONDS);
if (timerTaskList != null) {
//推进时间
timeWheel.advanceClock(timerTaskList.getExpiration());
//执行过期任务(包含降级操作)
timerTaskList.flush(this::addTask);
}
} catch (Exception e) {
e.printStackTrace();
}
}
}
时间轮
/**
* 时间轮
*/
public class TimeWheel {
/**
* 一个时间槽的范围
*/
private long tickMs;
/**
* 时间轮大小
*/
private int wheelSize;
/**
* 时间跨度
*/
private long interval;
/**
* 时间槽
*/
private TimerTaskList[] timerTaskLists;
/**
* 当前时间
*/
private long currentTime;
/**
* 上层时间轮
*/
private volatile TimeWheel overflowWheel;
/**
* 一个Timer只有一个delayQueue
*/
private DelayQueue<TimerTaskList> delayQueue;
public TimeWheel(long tickMs, int wheelSize, long currentTime, DelayQueue<TimerTaskList> delayQueue) {
this.currentTime = currentTime;
this.tickMs = tickMs;
this.wheelSize = wheelSize;
this.interval = tickMs * wheelSize;
this.timerTaskLists = new TimerTaskList[wheelSize];
//currentTime为tickMs的整数倍 这里做取整操作
this.currentTime = currentTime - (currentTime % tickMs);
this.delayQueue = delayQueue;
for (int i = 0; i < wheelSize; i++) {
timerTaskLists[i] = new TimerTaskList();
}
}
/**
* 创建或者获取上层时间轮
*/
private TimeWheel getOverflowWheel() {
if (overflowWheel == null) {
synchronized (this) {
if (overflowWheel == null) {
overflowWheel = new TimeWheel(interval, wheelSize, currentTime, delayQueue);
}
}
}
return overflowWheel;
}
/**
* 添加任务到时间轮
*/
public boolean addTask(TimerTask timerTask) {
long expiration = timerTask.getDelayMs();
//过期任务直接执行
if (expiration < currentTime + tickMs) {
return false;
} else if (expiration < currentTime + interval) {
//当前时间轮可以容纳该任务 加入时间槽
Long virtualId = expiration / tickMs;
int index = (int) (virtualId % wheelSize);
System.out.println("tickMs:" + tickMs + "------index:" + index + "------expiration:" + expiration);
TimerTaskList timerTaskList = timerTaskLists[index];
timerTaskList.addTask(timerTask);
if (timerTaskList.setExpiration(virtualId * tickMs)) {
//添加到delayQueue中
delayQueue.offer(timerTaskList);
}
} else {
//放到上一层的时间轮
TimeWheel timeWheel = getOverflowWheel();
timeWheel.addTask(timerTask);
}
return true;
}
/**
* 推进时间
*/
public void advanceClock(long timestamp) {
if (timestamp >= currentTime + tickMs) {
currentTime = timestamp - (timestamp % tickMs);
if (overflowWheel != null) {
//推进上层时间轮时间
this.getOverflowWheel().advanceClock(timestamp);
}
}
}
}
任务对象
/**
* 任务
*/
public class TimerTask {
/**
* 延迟时间
*/
private long delayMs;
/**
* 任务
*/
private Runnable task;
/**
* 时间槽
*/
protected TimerTaskList timerTaskList;
/**
* 下一个节点
*/
protected TimerTask next;
/**
* 上一个节点
*/
protected TimerTask pre;
/**
* 描述
*/
public String desc;
public TimerTask(long delayMs, Runnable task) {
this.delayMs = System.currentTimeMillis() + delayMs;
this.task = task;
this.timerTaskList = null;
this.next = null;
this.pre = null;
}
public Runnable getTask() {
return task;
}
public long getDelayMs() {
return delayMs;
}
@Override
public String toString() {
return desc;
}
}
任务集合
/**
* 时间槽
*/
public class TimerTaskList implements Delayed {
/**
* 过期时间
*/
private AtomicLong expiration = new AtomicLong(-1L);
/**
* 根节点
*/
private TimerTask root = new TimerTask(-1L, null);
{
root.pre = root;
root.next = root;
}
/**
* 设置过期时间
*/
public boolean setExpiration(long expire) {
return expiration.getAndSet(expire) != expire;
}
/**
* 获取过期时间
*/
public long getExpiration() {
return expiration.get();
}
/**
* 新增任务
*/
public void addTask(TimerTask timerTask) {
synchronized (this) {
if (timerTask.timerTaskList == null) {
timerTask.timerTaskList = this;
TimerTask tail = root.pre;
timerTask.next = root;
timerTask.pre = tail;
tail.next = timerTask;
root.pre = timerTask;
}
}
}
/**
* 移除任务
*/
public void removeTask(TimerTask timerTask) {
synchronized (this) {
if (timerTask.timerTaskList.equals(this)) {
timerTask.next.pre = timerTask.pre;
timerTask.pre.next = timerTask.next;
timerTask.timerTaskList = null;
timerTask.next = null;
timerTask.pre = null;
}
}
}
/**
* 重新分配
*/
public synchronized void flush(Consumer<TimerTask> flush) {
TimerTask timerTask = root.next;
while (!timerTask.equals(root)) {
this.removeTask(timerTask);
flush.accept(timerTask);
timerTask = root.next;
}
expiration.set(-1L);
}
@Override
public long getDelay(TimeUnit unit) {
return Math.max(0, unit.convert(expiration.get() - System.currentTimeMillis(), TimeUnit.MILLISECONDS));
}
@Override
public int compareTo(Delayed o) {
if (o instanceof TimerTaskList) {
return Long.compare(expiration.get(), ((TimerTaskList) o).expiration.get());
}
return 0;
}
}