多线程初阶(3)
在程序编程的时候总会遇到很多重复的问题,编程大佬们为我们提供了解决这些问题的方案,就叫做设计模式。
设计模式一共有23种,接下来我会先介绍一种:
1.单例模式
单例模式就是在某些情形下某个类只允许实例化一次。
介绍两个典型的例子:
1.饿汉模式
顾名思义,饿汉模式就是在加载这个类的时候就会创建一个实例,也就是尽早的创建实例。那根据我们的要求,不能在实例化对象了,所以我们用private来修饰构造方法,这样外部就无法调用构造函数,也就无法实例化对象。
创建实例的话需要申请内存空间并且初始化返回地址是需要时间的,如果在使用的时候再实例化的话就需要等待,所以如果在确定需要使用对象的话就尽早创建对象,这样会节约时间,提高效率。
class SingleHungry{private static SingleHungry p = new SingleHungry();private SingleHungry(){}public static SingleHungry getIntance(){return p;}
}
public class Test19 {public static void main(String[] args) {SingleHungry singleHungry = SingleHungry.getIntance();}
}
可以发现如果在main方法中实例化对象是会报错的。这也代表我们要的效果已经实现了。
2.懒汉模式
懒汉模式与饿汉模式相反,也就是尽可能晚的创建实例,因为创建实例的话需要申请内存空间并且初始化返回地址,也会有所开销。如果根本就不需要实例化对象的话这样的行为是浪费时间的行为,面对这种有概率不创建对象的情况,就可以采用懒汉模式提高效率。
class SingleLazy{private static SingleLazy intance = null;private SingleLazy(){}public SingleLazy getIntance(){if(intance == null){intance = new SingleLazy();}return intance;}
}
public class Test20 {public static void main(String[] args) {}
}
可以看见无法 实例化对象,效果已实现。
1.加锁:
我们仔细看这个代码,其实是能发现一些问题的。图片中这段代码是非原子的操作,赋值操作一般是原子的,但是这段代码是存在if语句的,所以该操作不是原子的。
我们画一个时间线:这种情况是会创建出两个实例的,针对这种情况我们需要加锁。
public SingleLazy getIntance(){synchronized (this){if(intance == null){intance = new SingleLazy();}}return intance;}
2.双重if:
在加锁之后还有一点,再画一个时间线并说明情况:
要想解决这个问题,可以在synchronized外面再加一个if语句用来判断intance是否为空。
public SingleLazy getIntance(){if (intance == null){synchronized (this){if(intance == null){intance = new SingleLazy();}}}return intance;}
3.指令重排序问题
当一个线程调用getIntance方法后,进行第一个if语句,获取锁,执行指令,但是实例化对象实际上是三个指令,申请获取到内存,初始化内存,调用构造函数。
由于存在指令重排序,申请到内存和调用构造函数的顺序不固定,有可能在申请到内存并且初始化内存后,另一个线程也调用getIntance方法,此时intance已经不为空,所以会直接返回intance,这时它所指向的内存还没有任何数据,所以这时候再执行一些该对象的方法,是会出现问题的。
解决方案:
将成员变量加上volatile修饰。
private volatile static SingleLazy intance = null;
2.阻塞队列
之前学的数据结构在多线程中是不安全的,Java标准库中提供了线程安全的数据结构,我们先从阻塞队列学起。
构成:
1.生产者
2.消费者
3.交易场所
阻塞队列是一个能保证线程安全的数据结构,它具有两个特点:
1.当队列为空的时候,一个线程如果要取元素就会产生阻塞,直到队列中有元素。
2.当阻塞队列满的时候,一个线程如果想放入元素就会产生阻塞,直到队列不为满的时候。
生产者消费者模型
阻塞队列最常见的用法就是实现生产者消费者模型,他的大概逻辑是这样的:
解耦合
如果只有生产者和消费者进行交互,那么生产者中就一定会有消费者的相关代码,消费者中一定会有生产者的代码,当需要对某段代码进行修改的时候就会很复杂,但是加上阻塞队列作为中间媒介后,由于阻塞队列实现的功能一般是固定的,所以很少需要进行代码修改,所以就在一定程度上降低了代码的耦合性。
削峰填谷
在开发过程中,会遇到某一时刻数据量飞增的情况,在这种情况下就会导致消费者的崩溃,导致程序挂掉,但是有了阻塞队列后,它会将数据进行缓冲和存储,当阻塞队列满后生产者就会进入阻塞状态,等待消费者取出元素后再放入数据,当这段时间过去后,消费者还可以有序的处理阻塞队列中的指令。这就实现了削峰填谷的作用。
有所得就会有所失,加入阻塞队列也会有一些缺陷:
1.结构变得更复杂了
原本只有生产者和消费者进行数据交互,加上阻塞队列就会使结构变得复杂。
2.性能降低
原本生产者产生的数据会直接交给消费者,但加上阻塞队列后,生产者将数据交给队列会消耗时间,队列内部执行任务也需要消耗时间,这就会使消费者需要执行的任务产生堆积,从而影响性能。而且队列里会有锁操作,当多个线程试图获取同一个数据时会产生锁竞争,从而降低性能。
实现阻塞队列的类:
LinkedBlockingQueue 链表实现的阻塞队列,如果不指定容量,默认是Integer.MAX_VALUE。
ArraysBlockingQueue 顺序表实现的阻塞队列,内部通过数组储存数据,容量是固定的。
PriorityBlockingQueue 优先级队列实现的阻塞队列,按照优先级进行出队列操作。
手写阻塞队列:
简单实现一个阻塞队列,需要注意的是要用synchronized来修饰put和take方法,保证线程安全。
class MyBlockingQueue{//存储数据的数组private String[] data;//有效数据的长度private int sz = 0;//有效元素的起始索引private int head = 0;//有效数据的结束索引private int tail = 0;//构造方法,指定阻塞队列容量public MyBlockingQueue(int cap){data = new String[cap];}//存数据public synchronized void put(String s) throws InterruptedException {while (sz >= data.length) {this.wait();}data[tail] = s;tail++;sz++;if (tail >= data.length) tail = 0;this.notify();}//取数据public synchronized String take() throws InterruptedException {while (sz == 0) {this.wait();}String tmp = data[head];head++;sz--;//如果索引超出范围就归零if (head >= data.length) head = 0;this.notify();return tmp;}
}
另一个需要额外注意的点:
这一段代码,为什么要用whil而不用if,原因就是如果wait是被非正常唤醒的话,那这时sz还是>=data.length,所以根本就不满足要唤醒的情况,这样代码运行下去的话会出现问题,所以需要在唤醒之后再次进行判断,以确保满足唤醒条件。
3.线程池
有时候我们需要很多线程,如果一个一个去创建的话会非常低效,所以Java中就为我们提供了线程池这个类(ThreadPoolExecutor)。
我们引入线程的原因就是进程的创建和销毁开销太大,所以就引入了轻量级进程(线程),那随着互联网的发展,我们开始觉得频繁的创建和销毁线程也是一个比较大的开销,所以就引入了线程池这一概念。当创建线程池对象时对其进行初始化,初始化的时候就会创建一定数量的线程,以供使用;当需要使用线程的时候就会拿出来一个线程用来执行任务,执行完后将线程放回线程池。
那我们为什么觉得从线程池中取线程比创建销毁线程效率高,这就涉及到了操作系统层面的操作,一个操作系统 = 内核 + 配套的应用程序,创建线程就需要操作系统内核配合完成,而从线程池中取线程,纯应用程序代码就可以完成。
ThreadPoolExecutor这个类在创建时的参数比较复杂,所以我们学习线程池这个类,重点在于搞清楚这些参数的作用及意义:
查询JDK手册,发现ThreadPoolExecutor类的构造方法有四种,我们选择最后一种最复杂也就是参数最多的方法进行学习。
int corePoolsize(核心线程数):
在创建线程池时会创建一些线程供其他线程使用,当任务数量超出线程池中的线程承载能力时线程池就会继续创建新的线程,但是线程的数量也是有所限制的,这个取决于另一个参数。
int maximumPoolsize (最大线程数):
最大线程数这个参数就决定着线程池中的线程最多有多少,那当除核心线程以外的线程执行完任务后,肯定不能就放在那,会在一定空闲时间后进行销毁,那最大空闲时间就取决于另一个参数。
long keepAliveTime(非核心线程允许空闲的最大时间):
当非核心线程长时间没有任务时就会销毁,而这个允许最大空闲时间就由这个参数决定。但是时间单位有比较多,所以为了方便,还提供了另一个参数 进行时间单位的定义。
TimeUnit unit (时间单位):
我们传入时间单位后就可以实现非核心线程的及时销毁。
BlockingQueue<Runnable> workQueue(工作队列):
我们前面学了阻塞队列的优点,而这些优点正是线程池所需要的,比如说缓冲作用, 当有大量任务传入线程池时,会先进入工作队列而不是直接进入线程池,防止系统因任务过多导致资源耗尽以及性能下降,所以我们可以传入一个工作队列来提高线程池的性能。
ThreadFactory factory(线程工厂):
线程工厂是工厂模式在多线程编程下的具体应用,而工厂模式也是设计模式的一种,工程模式可以弥补构造方法的缺陷,比如说:
class Point{double x;double y;//传入平面直角坐标参数public Point(double x, double y){this.x = x;this.y = y;}//传入极坐标参数public Point(double r, double a){//通过极坐标参数来初始化x,y的值}
}
这样的一段代码是不允许的,编译器会报错:
Java不提供这样的机制,这样的代码不满足方法重载的条件,所以这就对编程造成了一些限制。
通过这样的设计方式就可以解决问题,将类的创建与类剥离开来,单独使用其他组件进行封装。
这种包含其他类的创建的类就叫做工厂类。
class MyThreadFactory{public Point constructByXY(double x, double y){//定义属性Point point = new Point(x,y);return point;}public Point constructByRA(double r, double a){int x = 0,y = 0;//对x,y进行初始化Point point = new Point(x,y);return point;}
}
RejectedExecutionHandler handler(拒绝策略):
当线程池无法再接受其他任务时别的线程再调用submit时线程池就可以采用不同的拒绝策略,拒绝策略有以下几种:
1.ThreadPoolExecutor.AbortPolicy 线程池直接抛出异常。
2.ThreadPoolExecutor.CallerRunsPolicy 让调用submit的线程自行执行任务。
3.ThreadPoolExecutor.DiscardOldestPolicy 丢弃队列中最老的任务。
4.ThreadPoolExecutor.DiscardPolicy 丢弃最新的任务,当前submit的这个任务。
Executor类:
由于ThreadPoolExecutor的参数很复杂,在一些情况下,我们可能只是需要一个简单的线程池,所以Java就提供了Executor接口,使用起来就会非常方便。可以将其理解为一个简易版的线程池。
我们因为Executor是一个接口,所以无法创建对象,但他有一个子接口叫做ExecutorService子接口,并且ThreadPoolExecutor类实现了ExecutorService接口。
Executors接口中的静态方法如图:
在静态方法中,new了一个ThreadPoolExecutor对象。
public class Test25 {public static void main(String[] args) {ExecutorService executorServiceFix = Executors.newFixedThreadPool(10);
// ExecutorService executorServiceCache = Executors.newCachedThreadPool();for (int i = 0; i < 10; i++) {int id = i;executorServiceFix.submit(()->{System.out.println(Thread.currentThread().getName()+"执行任务" + id);});}}
}
运行结果:
进程并未结束,如果想要结束进程,可以使用shutdown和awaitTermination方法实现进程的关闭,
shutdown方法会更改线程池状态,使其无法再接收新的任务,但会继续执行已提交的任务,包括队列中等待的,而awaitTermination方法会阻塞当前线程,等待线程池中的任务全部执行完。
上面两种方法需要结合使用,先调用shutdown方法使线程池不再接受新的任务,在调用awaitTermination方法阻塞等待任务完成。
手写线程池:
class MyThreadPool{/*创建一个阻塞队列,作为缓冲区,当队列为空时,调用take的线程会进入阻塞,直到队列中传入任务。当队列满时,调用put的线程会进入阻塞,直到队列不为满。*/BlockingQueue<Runnable> q = null;public MyThreadPool(int capacity){//尽晚的创建q = new ArrayBlockingQueue<>(capacity);//创建固定数量的线程for (int i = 0; i < capacity; i++) {Thread thread = new Thread(()->{//每个线程都进行无限循环用来捕获队列中的任务while (true) {Runnable runnable = null;try {runnable = q.take();} catch (InterruptedException e) {throw new RuntimeException(e);}runnable.run();}});thread.start();}}public void submit(Runnable runnable) throws InterruptedException {//将任务放入队列q.put(runnable);}
}public class Test24 {public static void main(String[] args) throws InterruptedException {MyThreadPool myThreadPool = new MyThreadPool(10);for (int i = 0; i < 10; i++) {int id = i;myThreadPool.submit(()->{System.out.println(Thread.currentThread().getName()+"执行任务"+id);});}}
}
运行结果:
可以发现进程并未结束,这是因为我们自己创建的线程都是前台线程,前台线程不结束进程就不会结束。要想让进程结束可以在开启线程前调用setDaemon将线程改为后台线程。
4.定时器
定时器也是多线程中的一个类Timer。当我们需要过一段时间再执行一个任务时就可以使用这个类中的schedule,里面的参数为TimerTask类型的和long类型。分别表示要执行的任务和等待时间,单位默认是毫秒。在等待时当前线程处于阻塞状态,直到到达规定时间。
使用演示:
public class Test26 {public static void main(String[] args) {Timer t = new Timer();TimerTask timerTask = new TimerTask() {@Overridepublic void run() {System.out.println("hello");}};t.schedule(timerTask,1000);}
}
运行结果:
进程未结束,因为Timer内部使用的是前台线程。
手写定时器:
class MyTimerTask implements Comparable<MyTimerTask>{private Runnable runnable = null;private long time;public MyTimerTask(Runnable runnable,long time){this.runnable = runnable;this.time = time;}public Runnable getRunnable(){return this.runnable;}public long getTime(){return this.time;}@Overridepublic int compareTo(MyTimerTask o) {return (int)(this.time - o.time);}
}class MyTimer{private PriorityQueue<MyTimerTask> q = new PriorityQueue<>();private Object locker = new Object();public MyTimer(){Thread t = new Thread(()->{while (true) {synchronized (locker) {try {while (q.isEmpty()) {locker.wait();}} catch (InterruptedException e) {throw new RuntimeException(e);}MyTimerTask tmp = q.peek();if (System.currentTimeMillis() < tmp.getTime()){try {locker.wait(tmp.getTime()-System.currentTimeMillis());} catch (InterruptedException e) {throw new RuntimeException(e);}}else {tmp.getRunnable().run();q.poll();}}}});t.start();}public void schedule(Runnable runnable, int time){synchronized (locker) {MyTimerTask myTimerTask = new MyTimerTask(runnable,time + System.currentTimeMillis());q.add(myTimerTask);locker.notify();}}
}public class Test27 {public static void main(String[] args) {MyTimer timer = new MyTimer();timer.schedule(new Runnable() {@Overridepublic void run() {System.out.println("hello 3000");}}, 3000);timer.schedule(new Runnable() {@Overridepublic void run() {System.out.println("hello 2000");}}, 2000);timer.schedule(new Runnable() {@Overridepublic void run() {System.out.println("hello 1000");}}, 1000);}
}
运行结果: