首页 > 自考资讯 > 自考知识

scheduler dispatch,schedule system

头条共创 2024-07-05

最近和一个人聊天,有人问我一个问题。 《ScheduledExecutorService 的原理是什么?》

我知道这一点,但说实话,我没有详细看实现原理,所以我就一脸困惑地走了。

让我们从代码开始今天的旅程。

ScheduledExecutorService selectedExecutorService=new ScheduledThreadPoolExecutor(1);

这里,我们将为线程池1创建一个调度任务。

//1秒后启动,每3秒运行一次service.scheduleAtFixedRate(new Runnable() { @Override public void run() { System.out.println(Thread.currentThread().getName() + 'Run start ') ; }}, 1, 3, TimeUnit.SECONDS);

这里Runnable 每三次执行一次。

这是一种简单实用的安排任务服务的方法。我们来仔细分析一下源码。

public ScheduledThreadPoolExecutor(int corePoolSize) { super(corePoolSize, Integer.MAX_VALUE, 0, NANOSECONDS, new DelayedWorkQueue());} 如果进入源码可以看到,这里的队列采用了我理解的延迟队列实现。

静态类DelayedWorkQueue 扩展AbstractQueueRunnable 实现BlockingQueueRunnable {

如果你按照源码分析,你会发现底层实际上是一个BlockingQueue(源码分析将在下一部分介绍)。

这是源代码:

/** * @throws RejectedExecutionException {@inheritDoc} * @throws NullPointerException {@inheritDoc} * @throws IllegalArgumentException {@inheritDoc} */public ScheduledFuture?scheduleAtFixedRate(可执行命令,长初始延迟,长持续时间,TimeUnit 单位) { if (命令)==null || 单位==null)抛出新的NullPointerException();抛出新的IllegalArgumentException();sft=新的ScheduledFutureTaskVoid(命令,null,triggerTime(initialDelay,单位),unit.toNanos(期间) )); 返回RunnableScheduledFutureVoid t=decorateTask(command, sft)=t;

从代码分析的角度来看,delayedExecute(t);是重要的代码。事实上,该方法的名称已经标准化,因此您一眼就能看出它的作用。

我不会解释这两个标准,因为它们并不重要。

首先,让我们看看创建一个sft 对象。首先我们来分析一下参数的含义

第一个对象:显然是一个可执行对象

第二个对象:结果。从Future中可以看到,任务执行返回了一个返回值,但是这里传递的值是null,所以我们不需要关注。

第三个对象:触发时间。这可以从triggerTime看出为下次执行时间。

第四个对象:时间单位,或间隔时间,以纳秒为单位计算。

sft对象创建后,执行过程就开始了。看到这里,可能会出现一个问题:任务是如何执行的?

在这个问题中,我们将继续查看laidExecute 方法。

私有无效延迟执行(RunnableScheduledFuture?task){ if(isShutdown())拒绝(任务); else { super.getQueue()。add(任务); if(isShutdown()!canRunInCurrentRunState(task.isPeriodic())))任务.取消(假);否则确保Prestart();

重要的部分是ensurePrestart,所以我们直接进入这个方法。

确保voidPrestart() { int wc=workerCountOf(ctl.get()); if (wc corePoolSize) addWorker(null, true) else if (wc==0) addWorker(null, false);}

继续输入键码addWorker

private boolean addWorker(Runnable firstTask, boolean core) { retry: for (;) { int c=ctl.get(); int rs=runStateOf(c); //仅在必要时检查队列是否为空Masu.(rs==SHUTDOWN firstTask==null !workQueue.isEmpty())) return false; for (;) { int wc=workCountOf(c); if (wc=CAPACITY || wc=(core ? corePoolSize :minimumPoolSize ) )) return false; if (compareAndIncrementWorkerCount(c)) Break retry; c=ctl.get(); //重新加载ctl if (runStateOf(c) !=rs) 然后重试。改变;重试内部循环}}布尔workerStarted=false;worker w=null; mainLock=this.mainLock.lock(); try { //持有锁时重新检查。 //如果ThreadFactory 失败或在获取锁之前关闭。 if (rs SHUTDOWN || (rs==SHUTDOWN firstTask==null)) { if (t.isAlive()) //预先检查t 是否可以启动throw new IllegalThreadStateException(); ); if (smax 池大小)=s; } } 最后{ mainLock.unlock() } if (workerAdded) { t.start() } } } (!workerStarted) addWorkerFailed(w) } return workerStarted ;}

问题:线程的状态是如何标记的?

f0f444cc-c08c-48ae-b406-f503c4dfb8b9~noop.image?_iz=58558&from=article.pc_detail&lk3s=953192f4&x-expires=1720775358&x-signature=avaTPBOFj3t%2BE%2BO9H%2BqO10dk1do%3D

基本上,从897和898我们可以推断线程状态是用AtomicInteger标记的。

1b2d982b-c0d9-41d0-a377-927867a32cc6~noop.image?_iz=58558&from=article.pc_detail&lk3s=953192f4&x-expires=1720775358&x-signature=7fcIBd5xPlMGEQwWuSM3OABGwJw%3D

事实上,这两个循环只有一个目的:获取当前线程是否可运行。

关键代码是:

if (compareAndIncrementWorkerCount(c)) Break retry; 这里可以看到循环直到CAS计算才中断。

这是我的第一个问题。如果上一个任务没有完成,CPU 是否总是会等待?您是否见过CPU 使用率激增的情况?

退出上面的循环并实例化Worker对象后,我们进入保证原子性的加锁控制部分。

if (rs SHUTDOWN || (rs==SHUTDOWN firstTask==null)) { if (t.isAlive()) //预先检查t 是否可以启动throw new IllegalThreadStateException(); ); 如果(smaxpoolsize)maxpoolsize=s;}

从这里我们可以看出,添加worker对象的前提是线程状态为:

rs SHUTDOWN ||(rs==SHUTDOWN firstTask==null)) 然后调用t.start();

此时,线程开始执行其任务。

版权声明:本文由今日头条转载,如有侵犯您的版权,请联系本站编辑删除。

猜你喜欢