多线程基础应用
多线程基础应用
参考学习资料
线程的创建
如何创建线程?
方式1:继承Thread类,重写run方法
方式2:实现Runnable接口,重写run方法
方式3:实现Callable接口(对比Runable实现方式,Callable可以有返回值,其返回值通过FutureTask进行封装)
- 实现Callable接口的call方法,并借助FutureTask包装Callable对象,然后通过Thread启动
方式4:使用线程池
(1)ExecutorService:借助Executors创建线程池(Fixed、Cached、Single、Schedule、)
(2)CompletableFuture(本质也是线程池,默认forkjoinpool)可实现任务编排
实现Runnable和Callable接口的类只能当做一个可以在线程中运行的任务,不是真正意义上的线程,其最终还是需要通过Thread来调用(可以说任务是通过线程驱动来执行的)
1.Thread
通过继承Thread类,重写run方法
/**
* 创建线程的方式
* 01-继承Thread类,重写run方法
*/
public class NewThreadDemo1{
public static void main(String[] args) {
MyThread myThread = new MyThread();
// 启动线程
myThread.start();
}
}
// 自定义线程类
class MyThread extends Thread {
@Override
public void run() {
System.out.println("MyThread run ......");
}
}
2.Runnable
通过实现Runnable接口,重写run方法。但启动线程需依赖于Thread类
/**
* 创建线程的方式
* 02-实现Runnable接口,重写run方法(其启动还是要依赖于Thread)
*/
public class NewThreadDemo2 {
public static void main(String[] args) {
MyRunnable myRunnable = new MyRunnable();
// 启动线程(需借助Thread启动线程)
new Thread(myRunnable).start();
}
}
// 自定义线程类实现Runnable接口
class MyRunnable implements Runnable {
@Override
public void run() {
System.out.println("MyRunnable run...");
}
}
3.Callable + FutureTask
和Runnable方式类似,通过实现Callable接口并重写call方法,此处还可搭配FutureTask使用(处理线程调用返回结果),但启动线程需依赖于Thread类
/**
* 创建线程的方式
* 03-使用Callable接口,结合FutureTask使用(可处理回调结果),需借助Thread启动线程
*/
public class NewThreadDemo3 {
public static void main(String[] args) throws ExecutionException, InterruptedException {
// 创建MyCallable对象
MyCallable myCallable = new MyCallable();
// 启动线程(搭配FutureTask处理回调结果,借助Thread启动线程)
FutureTask<Integer> futureTask = new FutureTask<Integer>(myCallable);
new Thread(futureTask).start();
System.out.println("调度处理返回结果:" + futureTask.get());
}
}
// 自定义类实现Callable
class MyCallable implements Callable {
@Override
public Object call() throws Exception {
System.out.println("MyCallable call .....");
return "hello world";
}
}
4.线程池
ExecutorService
(1)ExecutorService + Runable 组合
ExecutorService + Executors 构建线程池,然后自定义线程(Runable类型任务)进行执行
/**
* 创建线程的方式
* 04-线程池(借助ExecutorService、Executors 构建线程池,随后提交任务并执行)
*/
public class NewThreadDemo41 {
public static void main(String[] args) {
// 创建线程池(例如此处创建一个固定线程池大小的线程池,初始化值设定为10)
ExecutorService executorService = Executors.newFixedThreadPool(10);
for (int i = 0; i < 100; i++) {
executorService.execute(new MyTask());
}
// 使用完毕关闭线程池
executorService.shutdown();
}
}
// 自定义任务
class MyTask implements Runnable{
@Override
public void run() {
System.out.println("MyTask run:" + Thread.currentThread().getName());
}
}
(2)ExecutorService + Callable+ FutureTask 组合
class NewTask implements Callable {
// 定义任务相关参数
public String taskNum;
// 提供构造函数(此处用于初始化taskNum)
public NewTask(String taskNum) {
this.taskNum = taskNum;
}
@Override
public Object call() throws Exception {
System.out.println("NewTask run:" + Thread.currentThread().getName() + "执行任务" + taskNum);
return taskNum;
}
}
ExecutorService + Executors 构建线程池,然后自定义线程(Callable类型任务)进行执行。
方式1:收集任务执行结果(手动保存任务的返回)可按任务顺序返回结果
此处针对Callable类型任务,可以通过自定义结果集在执行过程中收集结果,当所有任务执行完成之后就可以处理结果(即定义一个List收集处理结果,便于统一处理)
// ExecutorService+Executors构建线程池,执行Callable类型任务,自定义保存任务返回结果
public static void test01() throws ExecutionException, InterruptedException {
// 创建线程池(例如此处创建一个固定线程池大小的线程池,初始化值设定为10)
ExecutorService es = Executors.newFixedThreadPool(10);
// 定义集合接收处理结果(此处FutureTask可以根据泛型定义指定相应的类型,默认是Object)
List<FutureTask> futureTaskList = new ArrayList<>();
for (int i = 0; i < 100; i++) {
// 启动线程并执行
FutureTask futureTask = new FutureTask(new NewTask(String.valueOf("task" + i)));
es.submit(futureTask);
// 添加处理结果
futureTaskList.add(futureTask);
}
// 执行结束关闭线程池
es.shutdown();
// 处理返回结果
for (int i = 0; i < futureTaskList.size(); i++) {
FutureTask futureTask = futureTaskList.get(i);
System.out.println("处理响应结果:" + futureTask.get());
}
}
方式2:ExecutorCompletionService(其通过使用阻塞队列保存各个任务的返回结果,返回是无序的)结果返回无序,谁先执行完就先入队
ExecutorCompletionService 通过使用阻塞队列保存各个任务的返回结果,返回是无序的。即谁先执行完成(异常、中断)谁先入队,与任务的定义和创建顺序无关
// ExecutorService+Executors构建线程池,执行Callable类型任务,使用ExecutorCompletionService保存返回结果(返回是无序的,谁先执行完成就先入队)
public static void test02() throws InterruptedException, ExecutionException {
// 创建线程池(例如此处创建一个固定线程池大小的线程池,初始化值设定为10)
ExecutorService es = Executors.newFixedThreadPool(10);
ExecutorCompletionService executorCompletionService = new ExecutorCompletionService(es);
for (int i = 0; i < 100; i++) {
executorCompletionService.submit(new NewTask(String.valueOf("task" + i)));
}
// 执行结束关闭线程池
es.shutdown();
// 处理返回结果
for (int i = 0; i < 100; i++) {
// 使用FutureTask接收返回结果(依次从executorCompletionService中取出结果)
FutureTask futureTask = (FutureTask) executorCompletionService.take();
System.out.println("处理响应结果:" + futureTask.get());
}
}
方式3:ExecutorService的invokeAll方法(可用于获取对应任务Task和返回结果的关系)
使用ExecutorService的invokeAll 方法,invokeAll 是用于处理一组任务的方法
invokeAll方法入参为一组任务,返回一组Future,这两个集合是有相同结构的,即它是按照入参的任务集合中迭代器的顺序将所有的Future添加到返回的集合中,从而任务和Future在它们各自的集合中有着同样的顺序。当需要任务和结果的对应关系时,使用invokeAll方法来代替第一种方法
public class InvokeAllDemo {
public static void main(String[] args) throws ExecutionException, InterruptedException {
new InvokeAllDemo().invokeAllTest();
}
/**
* 方法三:ExecutorService的invokeAll方法
* invokeAll方法入参为一组任务,返回一组Future,这两个集合是有相同结构的,
* 即它是按照入参的任务集合中迭代器的顺序将所有的Future添加到返回的集合中,从而任务和Future在它们各自的集合中有着同样的顺序。
* 当我们需要任务和结果的对应关系时,使用invokeAll方法来代替第一种方法
*/
public void invokeAllTest() throws InterruptedException, ExecutionException {
// 初始化线程池
ExecutorService es = Executors.newFixedThreadPool(10);
// invokeAll 接收的是一组任务,此处先初始化任务列表
List<TestTask> tasks = new ArrayList<>();
for (int i = 0; i < 10; i++) {
tasks.add(new TestTask(i));
}
// 调用invokeAll执行,并使用Future进行接收
List<Future<Integer>> futures = es.invokeAll(tasks); // 此处需注意TestTask的定义,需指定泛型类型,否则编译转化cue红线
//执行完成关闭线程池
es.shutdown();
// 遍历返回结果
for (int i = 0; i < futures.size(); i++) {
System.out.println("index:" + i + ",future:" + futures.get(i).get());
}
}
}
/**
* 测试任务,返回任务的序号
*/
class TestTask implements Callable<Integer> {
int index;
public TestTask(int index) {
this.index = index;
}
@Override
public Integer call() throws Exception {
return index;
}
}
CompletableFuture
CompletableFuture(Java8引入) 也是一种基于线程池的方式,它可以支任务并行执行、任务编排的场景。实际项目中,一个接口可能需要同时获取多种不同的数据,然后再汇总返回,这种场景还是挺常见的。举个例子:用户请求获取订单信息,可能需要同时获取用户信息、商品详情、物流信息、商品推荐等数据。
如果是串行(按顺序依次执行每个任务)执行的话,接口的响应速度会非常慢。考虑到这些任务之间有大部分都是 无前后顺序关联 的,可以 并行执行 ,就比如说调用获取商品详情的时候,可以同时调用获取物流信息。通过并行执行多个任务的方式,接口的响应速度会得到大幅优化

Future 概念
Future 类是异步思想的典型运用,主要用在一些需要执行耗时任务的场景,避免程序一直原地等待耗时任务执行完成,执行效率太低。具体来说是这样的:当执行某一耗时的任务时,可以将这个耗时任务交给一个子线程去异步执行,同时可以干点其他事情,不用傻傻等待耗时任务执行完成。等事情干完后,再通过 Future 类获取到耗时任务的执行结果,以此提升程序的执行效率这其实就是多线程中经典的 Future 模式,可以将其看作是一种设计模式,核心思想是异步调用,主要用在多线程领域,并非 Java 语言独有。
在 Java 中,Future 类只是一个泛型接口,位于 java.util.concurrent 包下,其中定义了 5 个方法,主要包括下面这 4 个功能:
- 取消任务
- 判断任务是否被取消
- 判断任务是否已经执行完成
- 获取任务执行结果
// V 代表了Future执行的任务返回值的类型
public interface Future<V> {
// 取消任务执行
// 成功取消返回 true,否则返回 false
boolean cancel(boolean mayInterruptIfRunning);
// 判断任务是否被取消
boolean isCancelled();
// 判断任务是否已经执行完成
boolean isDone();
// 获取任务执行结果
V get() throws InterruptedException, ExecutionException;
// 指定时间内没有返回计算结果就抛出 TimeOutException 异常
V get(long timeout, TimeUnit unit)throws InterruptedException, ExecutionException, TimeoutException;
}
结合CompletableFuture源码可以看到其实现了Future、CompletionStage接口,以此提供异步调度和任务编排机制
public class CompletableFuture<T> implements Future<T>, CompletionStage<T> {
...
}
对于原生的Future类,只能通过轮询或者阻塞的方式来获取其结果,而不断的轮询又会消耗CPU,同时只能在获得结果之后手动做逻辑处理。而CompletableFuture拓展了Future,简化异步编程的复杂性,提供了函数式编程的能力,可以通过回调的方式处理计算结果,并且提供了各种转换以及组合的相关API
简单CpmpletableFuture案例:
/**
* 创建线程方式
* 05-CompletableFuture方式
*/
public class CompletableFutureDemo {
public static void main(String[] args) throws ExecutionException, InterruptedException {
// 定义程序执行开始时间
long start = System.currentTimeMillis();
// 有返回值的任务(设定此处返回String,通过Lambda表达式简化任务定义)
CompletableFuture<String> cf1 = CompletableFuture.supplyAsync(()->{
// 模拟任务执行
executeTask("cf1",1);
return "hello world";
});
// 无返回值的任务
CompletableFuture cf2 = CompletableFuture.runAsync(()->{
executeTask("cf2",2);
});
// 任务编排:此处设定阻塞等待所有异步任务执行结束(即并行执行cf1、cf2,主线程阻塞等待所有任务结束)
CompletableFuture.allOf(cf1,cf2).get();
// 定义程序执行结束时间
long end = System.currentTimeMillis();
System.out.println("本次执行消耗时间为:" + (end - start) + "ms");
}
public static void executeTask(String taskType,int seconds) {
System.out.println(taskType + "模拟任务执行,沉睡指定秒数" + seconds + "s");
try{
Thread.sleep(seconds * 1000);
}catch (InterruptedException e){
e.printStackTrace();
}
}
}
// output
cf2模拟任务执行,沉睡指定秒数2s
cf1模拟任务执行,沉睡指定秒数1s
本次执行消耗时间为:2011ms
主线程 VS 守护线程
Java的线程分为两种:用户线程、守护线程。默认情况下创建出来的线程都是用户线程。守护线程的优先级比较低,用于为系统中的其它对象和线程提供服务,例如Java自带的GC功能,就是通过守护线程来执行的。
CompletableFuture在默认的情况下(不传入指定线程池),异步任务采用的线程是通过ForkJoinPool创建出来的子线程,是守护线程,参考验证代码如下
// CompletableFuture 主线程、守护线程 说明(验证默认情况下,如果不指定线程池,异步任务的线程采用的是通过ForkJoinPool创建的子线程,是守护线程)
public static void test02() throws ExecutionException, InterruptedException {
CompletableFuture cf = CompletableFuture.runAsync(()->{
// 模拟任务执行
executeTask("cf",2);
// 判断当前线程是否为守护线程
boolean isDaemon = Thread.currentThread().isDaemon();
System.out.println(Thread.currentThread().getName() + (isDaemon?"是":"不是") + "守护线程");
});
// 主线程等待任务执行
CompletableFuture.allOf(cf).get();
System.out.println("任务执行完成");
}
守护线程的生命周期案例理解:
- 守护线程会等到主线程执行完毕后也跟着结束(守护线程的生命周期)
- 对主线程来说, 运行完毕指的是主线程所在的进程内所有非守护线程统统运行完毕, 主线程才算运行完毕
// CompletableFuture 守护线程生命周期案例理解
public static void test03() throws ExecutionException, InterruptedException {
// 记录线程执行时间
long start = System.currentTimeMillis();
CompletableFuture cf = CompletableFuture.runAsync(()->{
// 模拟任务执行(执行5s)
executeTask("cf",5);
// 判断当前线程是否为守护线程
boolean isDaemon = Thread.currentThread().isDaemon();
System.out.println(Thread.currentThread().getName() + (isDaemon?"是":"不是") + "守护线程");
});
// 主线程等待任务执行
CompletableFuture.allOf(cf); // 此处没有调用get()方法进行阻塞
long end = System.currentTimeMillis();
System.out.println("任务执行完成,共计耗时:" + (end - start) + "ms");
}
// output
任务执行完成,共计耗时:2ms
从上述案例可以看到,异步任务并没有执行预期输出,也就是说在异步任务执行的过程中主线程关闭了,通过CompleableFuture默认方式创建的异步任务(守护线程)也会随之结束。也就是说主线程执行了2ms就结束,主线程关闭则不管守护线程是否执行完成都会随之结束。要想验证这个结论的正确性,可以试着让主线程沉睡更长的一段时间,看这段时间内守护线程是否能正常完成
// CompletableFuture 守护线程生命周期案例理解
public static void test03() throws ExecutionException, InterruptedException {
// 记录线程执行时间
long start = System.currentTimeMillis();
CompletableFuture cf = CompletableFuture.runAsync(()->{
// 模拟任务执行(执行5s)
executeTask("cf",5);
// 判断当前线程是否为守护线程
boolean isDaemon = Thread.currentThread().isDaemon();
System.out.println(Thread.currentThread().getName() + (isDaemon?"是":"不是") + "守护线程");
});
// 主线程等待任务执行
CompletableFuture.allOf(cf); // 此处没有调用get()方法进行阻塞
// 主线程模拟耗时(验证守护线程的生命周期)
// executeTask("主线程",10);
long end = System.currentTimeMillis();
System.out.println("任务执行完成,共计耗时:" + (end - start) + "ms");
}
// output
任务执行完成,共计耗时:2ms
从上述测试案例结果分析,当主线程执行时间较短时,可能主线程执行结束了,但是此时通过CompleableFuture默认方式创建的异步任务(守护线程)还没执行结束,则不管该守护线程是否执行完成都会被结束;当适当增加主线程的执行时间,可以看到在一定的时间设定下守护线程执行完成了,也就验证了**"守护线程的生命周期是随着主线程结束而结束的"**
可以进一步补充一个自定义创建的线程的案例测试(参考基础的线程执行案例),一般都是用户线程,主线程会等所有非守护线程运行结束才结束
// 自定义线程 属性验证
public static void test04() throws ExecutionException, InterruptedException {
// 记录线程执行时间
long start = System.currentTimeMillis();
// 自定义普通线程
new Thread(() -> {
boolean isDaemon = Thread.currentThread().isDaemon();
System.out.println(Thread.currentThread().getName() + (isDaemon ? "是" : "不是") + "守护线程");
executeTask("我是自定义线程", 5); // 模拟普通线程执行5s
}).start();
long end = System.currentTimeMillis();
System.out.println("任务执行完成,共计耗时:" + (end - start) + "ms");
}
// output
任务执行完成,共计耗时:0ms
Thread-0不是守护线程
我是自定义线程模拟任务执行,沉睡指定秒数5s
# 从结果分析可知,虽然主线程很快执行完毕,但是自定义的线程并不是守护线程,虽然后面的语句先打印了,但实际上异步任务(非守护线程)还没执行完毕的情况下主线程是不会结束的
为了解决这个问题,有两种方案:
- 【方案1】让主线程阻塞等待:基于CompletableFuture默认方式创建的守护线程这种方式,可以通过调用
get()方法让主线程进入阻塞状态,等待守护线程(可以参考基础案例) - 【方案2】:通过自定义线程池,替代CompletableFuture默认的ForkJoinPool
// CompletableFuture 自定义线程池替换CompletableFuture默认的ForkJoinPool
public static void test05() throws ExecutionException, InterruptedException {
// 通过ExecutorService创建线程池
ExecutorService es = Executors.newFixedThreadPool(10);
// 记录线程执行时间
long start = System.currentTimeMillis();
CompletableFuture cf = CompletableFuture.runAsync(() -> {
// 判断当前线程是否为守护线程
boolean isDaemon = Thread.currentThread().isDaemon();
System.out.println(Thread.currentThread().getName() + (isDaemon ? "是" : "不是") + "守护线程");
// 模拟任务执行(执行5s)
executeTask("cf", 5);
System.out.println("当前任务执行完成");
},es);
// 主线程等待任务执行
CompletableFuture.allOf(cf); // 此处没有调用get()方法进行阻塞,而是使用自定义线程池替换原有的默认的ForkJoinPool
// 任务执行完成,关闭线程池
es.shutdown();
long end = System.currentTimeMillis();
System.out.println("任务执行完成,共计耗时:" + (end - start) + "ms");
}
// output
任务执行完成,共计耗时:3ms
pool-1-thread-1不是守护线程
cf模拟任务执行,沉睡指定秒数5s
当前任务执行完成
