Java 模拟实现 定时器 和 线程池
定时器
定时器是软件开发中的一个重要组件.类似于一个"闹钟".达到一个设定的时间之后,就执行某个指定好的代码.
标准库中的定时器
- 标准库中提供了一个 Timer 类(java.util.Timer).Timer 类的核心方法为 schedule
- schedule 包含两个参数.第一个参数指定即将要执行的任务代码, 第二个参数指定多长时间之后执行(单位为毫秒).
import java.util.Timer;
public class Test {
public static void main(String[] args) {
Timer timer = new Timer();
timer.schedule(new TimerTask() {
@Override
public void run() {
System.out.println("hello timer");
}
},3000);
System.out.println("main");
}
}
定时器的简单实现
- 一个带优先级的阻塞队列
- 为啥要带优先级呢?
- 因为阻塞队列中的任务都有各自的执行时刻 (delay). 最先执行的任务一定是 delay 最小的. 使用带优先级的队列就可以高效的把这个 delay 最小的任务找出来.
- 为啥要带优先级呢?
- 队列中的每个元素是一个 Task 对象.
- Task 中带有一个时间属性, 队首元素就是即将被执行的任务
- 同时有一个 worker 线程一直扫描队首元素, 看队首元素是否需要执行
package hello;
import java.util.concurrent.PriorityBlockingQueue;
//实现Comparable 接口 用于 插入时比较
class MyTask implements Comparable<MyTask>{
//任务干什么
private Runnable runnable;
//任务什么时候干
private long time;
//delay 是一个时间间隔
public MyTask(Runnable runnable, long delay) {
this.runnable = runnable;
this.time = System.currentTimeMillis()+delay;
}
public void run(){
runnable.run();
}
public long getTime(){
return time;
}
@Override
public int compareTo(MyTask o) {
return (int)(this.time - o.time);
}
}
class MyTimer{
//定时器内部能够存放多个任务
//优先级阻塞堆
private PriorityBlockingQueue<MyTask> queue = new PriorityBlockingQueue<>();
public void schedule(Runnable runnable,long delay){
MyTask myTask = new MyTask(runnable, delay);
queue.put(myTask);
//每次添加新的任务时,唤醒线程,再次计算时间
synchronized (locker){
locker.notify();
}
}
private Object locker = new Object();
public MyTimer() {
Thread t = new Thread(()->{
while(true){
try {
//取出队首元素
MyTask task = queue.take();
//比较时间到了没
long curTime = System.currentTimeMillis();
if(curTime < task.getTime()){
//时间没到,放回队列
queue.put(task);
//队首元素为最早执行的任务,阻塞等待队首元素 或者 等待新的任务添加进来后唤醒
//阻塞等待最早的任务执行,防止 一直while(true)循环,无效消耗CPU
synchronized (locker){
locker.wait(task.getTime()-curTime);
}
}else{
//时间到了,执行
task.run();
}
} catch (InterruptedException e) {
e.printStackTrace();
}
}
});
t.start();
}
}
public class Test {
public static void main(String[] args) {
MyTimer timer = new MyTimer();
timer.schedule(()->{
System.out.println("hello timer");
},3000);
System.out.println("main");
}
}
线程池
什么是线程池
- 在面向对象编程中,创建和销毁对象是很费时间的,因为创建一个对象要获取内存资源或者其它更多资源.在Java中也是如此,虚拟机将试图跟踪每一个对象,以便能够在对象销毁后进行垃圾回收.所以提高服务程序效率的一个手段就是尽可能减少创建和销毁对象的次数,特别是一些很耗资源的对象创建和销毁,这就是"池化资源"技术产生的原因.
- 线程池顾名思义就是事先创建若干个可执行的线程放入一个池(容器)中,需要的时候从池中获取线程不用自行创建,使用完毕不需要销毁线程而是放回池中,从而减少创建和销毁线程对象的开销.
标准库中的线程池
- 使用 Executors.newFixedThreadPool(10) 能创建出固定包含10个线程的线程池.
- 返回值类型为 **ExecutorService **
- 通过 ExecutorService.submit 可以注册一个任务到线程池中.
-
ExecutorService pool = Executors.newFixedThreadPool(10); pool.submit(new Runnable() { @Override public void run() { System.out.println("hello"); } });
-
- Executors 创建线程池的几种方式
- newFixedThreadPool: 创建固定线程数的线程池
- newCachedThreadPool: 创建线程数目动态增长的线程池
- newSingleThreadExecutor: 创建只包含单个线程的线程池
- newScheduledThreadPool: 设定延迟时间后执行命令.或者定期执行命令.是进阶版的 Timer.
- ThreadPoolExecutor 的参数说明
- int corePoolSize 核心线程数
- int maximumPoolSize 最大线程数
- long keepAliveTime 允许临时工摸鱼时间
- TimeUnit unit 时间的单位
- BlockingQueue workQueue 任务队列 提供一个 submit方法 让程序员可以把任务注册到线程池中
- RejectedExecutionHandler handler 拒绝策略 用于决定在线程池中任务满的时候,抛弃哪个任务的策略
线程池的简单实现
- 核心操作为 submit, 将任务加入线程池中
- 使用 Worker 类描述一个工作线程. 使用 Runnable 描述一个任务.
- 使用一个 BlockingQueue 组织所有的任务
- 每个 worker 线程要做的事情: 不停的从 BlockingQueue 中取任务并执行.
- 指定一下线程池中的最大线程数 maxWorkerCount; 当当前线程数超过这个最大值时, 就不再新增线程了.
package hello;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
/**
* 认为内核态效率低,不一定是真的低,而是随着代码进入内核态,就不可控了.内核可能还要做其他的事情
* 内核什么时候把活干完,把结果给你 (有时候快,有时候慢)
*/
class MyThreadPool{
//描述一个任务,直接使用Runnable ,不需要额外创建类
//使用一个数据结构来组织若干任务
private BlockingQueue<Runnable> queue = new LinkedBlockingQueue<>();
//描述一个线程,工作线程的功能就是从任务队列中取任务并执行
static class Worker extends Thread{
private BlockingQueue<Runnable> queue = null;
public Worker(BlockingQueue<Runnable> queue){
this.queue = queue;
}
@Override
public void run() {
while(true){
try {
Runnable runnable = queue.take();
runnable.run();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
}
private List<Thread> workers = new ArrayList<>();
public MyThreadPool(int n){
for (int i = 0; i < n; i++) {
Worker worker = new Worker(queue);
worker.start();
workers.add(worker);
}
}
public void submit(Runnable runnable){
try {
queue.put(runnable);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
public class Test {
public static void main(String[] args) {
MyThreadPool myThreadPool = new MyThreadPool(10);
for (int i = 0; i < 100; i++) {
myThreadPool.submit(new Runnable() {
@Override
public void run() {
System.out.println("hello threadpool");
}
});
}
}
}