跳至主要內容

多线程基础应用

holic-x...大约 15 分钟碎片化碎片化

多线程基础应用

参考学习资料

线程的创建

如何创建线程?

  • 方式1:继承Thread类,重写run方法

  • 方式2:实现Runnable接口,重写run方法

  • 方式3:实现Callable接口(对比Runable实现方式,Callable可以有返回值,其返回值通过FutureTask进行封装)

    • 实现Callable接口的call方法,并借助FutureTask包装Callable对象,然后通过Thread启动
  • 方式4:使用线程池

    • (1)ExecutorService:借助Executors创建线程池(Fixed、Cached、Single、Schedule、)

    • (2)CompletableFuture(本质也是线程池,默认forkjoinpool)可实现任务编排

​ 实现Runnable和Callable接口的类只能当做一个可以在线程中运行的任务,不是真正意义上的线程,其最终还是需要通过Thread来调用(可以说任务是通过线程驱动来执行的)

1.Thread

​ 通过继承Thread类,重写run方法

/**
 * 创建线程的方式
 * 01-继承Thread类,重写run方法
 */
public class NewThreadDemo1{
    public static void main(String[] args) {
        MyThread myThread = new MyThread();
        // 启动线程
        myThread.start();
    }
}

// 自定义线程类
class MyThread extends Thread {
    @Override
    public void run() {
        System.out.println("MyThread run ......");
    }
}

2.Runnable

​ 通过实现Runnable接口,重写run方法。但启动线程需依赖于Thread类

/**
 * 创建线程的方式
 * 02-实现Runnable接口,重写run方法(其启动还是要依赖于Thread)
 */
public class NewThreadDemo2 {
    public static void main(String[] args) {
        MyRunnable myRunnable = new MyRunnable();
        // 启动线程(需借助Thread启动线程)
        new Thread(myRunnable).start();
    }
}

// 自定义线程类实现Runnable接口
class MyRunnable implements Runnable {
    @Override
    public void run() {
        System.out.println("MyRunnable run...");
    }
}

3.Callable + FutureTask

​ 和Runnable方式类似,通过实现Callable接口并重写call方法,此处还可搭配FutureTask使用(处理线程调用返回结果),但启动线程需依赖于Thread类

/**
 * 创建线程的方式
 * 03-使用Callable接口,结合FutureTask使用(可处理回调结果),需借助Thread启动线程
 */
public class NewThreadDemo3 {

    public static void main(String[] args) throws ExecutionException, InterruptedException {
        // 创建MyCallable对象
        MyCallable myCallable = new MyCallable();
        // 启动线程(搭配FutureTask处理回调结果,借助Thread启动线程)
        FutureTask<Integer> futureTask = new FutureTask<Integer>(myCallable);
        new Thread(futureTask).start();
        System.out.println("调度处理返回结果:" + futureTask.get());
    }
}

// 自定义类实现Callable
class MyCallable implements Callable {
    @Override
    public Object call() throws Exception {
        System.out.println("MyCallable call .....");
        return "hello world";
    }
}

4.线程池

ExecutorService

(1)ExecutorService + Runable 组合

ExecutorService + Executors 构建线程池,然后自定义线程(Runable类型任务)进行执行

/**
 * 创建线程的方式
 * 04-线程池(借助ExecutorService、Executors 构建线程池,随后提交任务并执行)
 */
public class NewThreadDemo41 {
    public static void main(String[] args) {
        // 创建线程池(例如此处创建一个固定线程池大小的线程池,初始化值设定为10)
        ExecutorService executorService = Executors.newFixedThreadPool(10);
        for (int i = 0; i < 100; i++) {
            executorService.execute(new MyTask());
        }
        // 使用完毕关闭线程池
        executorService.shutdown();
    }
}

// 自定义任务
class MyTask implements Runnable{
    @Override
    public void run() {
        System.out.println("MyTask run:" + Thread.currentThread().getName());
    }
}
(2)ExecutorService + Callable+ FutureTask 组合
class NewTask implements Callable {

    // 定义任务相关参数
    public String taskNum;

    // 提供构造函数(此处用于初始化taskNum)
    public NewTask(String taskNum) {
        this.taskNum = taskNum;
    }

    @Override
    public Object call() throws Exception {
        System.out.println("NewTask run:" + Thread.currentThread().getName() + "执行任务" + taskNum);
        return taskNum;
    }
}

​ ExecutorService + Executors 构建线程池,然后自定义线程(Callable类型任务)进行执行。

方式1:收集任务执行结果(手动保存任务的返回)可按任务顺序返回结果

​ 此处针对Callable类型任务,可以通过自定义结果集在执行过程中收集结果,当所有任务执行完成之后就可以处理结果(即定义一个List收集处理结果,便于统一处理

// ExecutorService+Executors构建线程池,执行Callable类型任务,自定义保存任务返回结果
public static void test01() throws ExecutionException, InterruptedException {
    // 创建线程池(例如此处创建一个固定线程池大小的线程池,初始化值设定为10)
    ExecutorService es = Executors.newFixedThreadPool(10);
    // 定义集合接收处理结果(此处FutureTask可以根据泛型定义指定相应的类型,默认是Object)
    List<FutureTask> futureTaskList = new ArrayList<>();
    for (int i = 0; i < 100; i++) {
        // 启动线程并执行
        FutureTask futureTask = new FutureTask(new NewTask(String.valueOf("task" + i)));
        es.submit(futureTask);
        // 添加处理结果
        futureTaskList.add(futureTask);
    }
    // 执行结束关闭线程池
    es.shutdown();
    // 处理返回结果
    for (int i = 0; i < futureTaskList.size(); i++) {
        FutureTask futureTask = futureTaskList.get(i);
        System.out.println("处理响应结果:" + futureTask.get());
    }
}

方式2:ExecutorCompletionService(其通过使用阻塞队列保存各个任务的返回结果,返回是无序的)结果返回无序,谁先执行完就先入队

​ ExecutorCompletionService 通过使用阻塞队列保存各个任务的返回结果,返回是无序的。即谁先执行完成(异常、中断)谁先入队,与任务的定义和创建顺序无关

// ExecutorService+Executors构建线程池,执行Callable类型任务,使用ExecutorCompletionService保存返回结果(返回是无序的,谁先执行完成就先入队)
public static void test02() throws InterruptedException, ExecutionException {
    // 创建线程池(例如此处创建一个固定线程池大小的线程池,初始化值设定为10)
    ExecutorService es = Executors.newFixedThreadPool(10);
    ExecutorCompletionService executorCompletionService = new ExecutorCompletionService(es);
    for (int i = 0; i < 100; i++) {
        executorCompletionService.submit(new NewTask(String.valueOf("task" + i)));
    }
    // 执行结束关闭线程池
    es.shutdown();
    // 处理返回结果
    for (int i = 0; i < 100; i++) {
        // 使用FutureTask接收返回结果(依次从executorCompletionService中取出结果)
        FutureTask futureTask = (FutureTask) executorCompletionService.take();
        System.out.println("处理响应结果:" + futureTask.get());
    }
}

方式3:ExecutorService的invokeAll方法(可用于获取对应任务Task和返回结果的关系

​ 使用ExecutorService的invokeAll 方法,invokeAll 是用于处理一组任务的方法

​ invokeAll方法入参为一组任务,返回一组Future,这两个集合是有相同结构的,即它是按照入参的任务集合中迭代器的顺序将所有的Future添加到返回的集合中,从而任务和Future在它们各自的集合中有着同样的顺序。当需要任务和结果的对应关系时,使用invokeAll方法来代替第一种方法

public class InvokeAllDemo {

    public static void main(String[] args) throws ExecutionException, InterruptedException {
        new InvokeAllDemo().invokeAllTest();
    }

    /**
     * 方法三:ExecutorService的invokeAll方法
     * invokeAll方法入参为一组任务,返回一组Future,这两个集合是有相同结构的,
     * 即它是按照入参的任务集合中迭代器的顺序将所有的Future添加到返回的集合中,从而任务和Future在它们各自的集合中有着同样的顺序。
     * 当我们需要任务和结果的对应关系时,使用invokeAll方法来代替第一种方法
     */
    public void invokeAllTest() throws InterruptedException, ExecutionException {
        // 初始化线程池
        ExecutorService es = Executors.newFixedThreadPool(10);

        // invokeAll 接收的是一组任务,此处先初始化任务列表
        List<TestTask> tasks = new ArrayList<>();
        for (int i = 0; i < 10; i++) {
            tasks.add(new TestTask(i));
        }

        // 调用invokeAll执行,并使用Future进行接收
        List<Future<Integer>> futures = es.invokeAll(tasks); // 此处需注意TestTask的定义,需指定泛型类型,否则编译转化cue红线

        //执行完成关闭线程池
        es.shutdown();

        // 遍历返回结果
        for (int i = 0; i < futures.size(); i++) {
            System.out.println("index:" + i + ",future:" + futures.get(i).get());
        }
    }

}

/**
 * 测试任务,返回任务的序号
 */
class TestTask implements Callable<Integer> {
    int index;

    public TestTask(int index) {
        this.index = index;
    }

    @Override
    public Integer call() throws Exception {
        return index;
    }
}

CompletableFuture

​ CompletableFuture(Java8引入) 也是一种基于线程池的方式,它可以支任务并行执行、任务编排的场景。实际项目中,一个接口可能需要同时获取多种不同的数据,然后再汇总返回,这种场景还是挺常见的。举个例子:用户请求获取订单信息,可能需要同时获取用户信息、商品详情、物流信息、商品推荐等数据。

​ 如果是串行(按顺序依次执行每个任务)执行的话,接口的响应速度会非常慢。考虑到这些任务之间有大部分都是 无前后顺序关联 的,可以 并行执行 ,就比如说调用获取商品详情的时候,可以同时调用获取物流信息。通过并行执行多个任务的方式,接口的响应速度会得到大幅优化

image-20241013211552525

Future 概念

Future 类是异步思想的典型运用,主要用在一些需要执行耗时任务的场景,避免程序一直原地等待耗时任务执行完成,执行效率太低。具体来说是这样的:当执行某一耗时的任务时,可以将这个耗时任务交给一个子线程去异步执行,同时可以干点其他事情,不用傻傻等待耗时任务执行完成。等事情干完后,再通过 Future 类获取到耗时任务的执行结果,以此提升程序的执行效率这其实就是多线程中经典的 Future 模式,可以将其看作是一种设计模式,核心思想是异步调用,主要用在多线程领域,并非 Java 语言独有。

在 Java 中,Future 类只是一个泛型接口,位于 java.util.concurrent 包下,其中定义了 5 个方法,主要包括下面这 4 个功能:

  • 取消任务
  • 判断任务是否被取消
  • 判断任务是否已经执行完成
  • 获取任务执行结果
// V 代表了Future执行的任务返回值的类型
public interface Future<V> {
    // 取消任务执行
    // 成功取消返回 true,否则返回 false
    boolean cancel(boolean mayInterruptIfRunning);
    // 判断任务是否被取消
    boolean isCancelled();
    // 判断任务是否已经执行完成
    boolean isDone();
    // 获取任务执行结果
    V get() throws InterruptedException, ExecutionException;
    // 指定时间内没有返回计算结果就抛出 TimeOutException 异常
    V get(long timeout, TimeUnit unit)throws InterruptedException, ExecutionException, TimeoutException;
}

​ 结合CompletableFuture源码可以看到其实现了Future、CompletionStage接口,以此提供异步调度和任务编排机制

public class CompletableFuture<T> implements Future<T>, CompletionStage<T> {
	...
}

​ 对于原生的Future类,只能通过轮询或者阻塞的方式来获取其结果,而不断的轮询又会消耗CPU,同时只能在获得结果之后手动做逻辑处理。而CompletableFuture拓展了Future,简化异步编程的复杂性,提供了函数式编程的能力,可以通过回调的方式处理计算结果,并且提供了各种转换以及组合的相关API

​ 简单CpmpletableFuture案例:

/**
 * 创建线程方式
 * 05-CompletableFuture方式
 */
public class CompletableFutureDemo {
    public static void main(String[] args) throws ExecutionException, InterruptedException {
        // 定义程序执行开始时间
        long start = System.currentTimeMillis();

        // 有返回值的任务(设定此处返回String,通过Lambda表达式简化任务定义)
        CompletableFuture<String> cf1 = CompletableFuture.supplyAsync(()->{
            // 模拟任务执行
            executeTask("cf1",1);
            return "hello world";
        });

        // 无返回值的任务
        CompletableFuture cf2 = CompletableFuture.runAsync(()->{
           executeTask("cf2",2);
        });

        // 任务编排:此处设定阻塞等待所有异步任务执行结束(即并行执行cf1、cf2,主线程阻塞等待所有任务结束)
        CompletableFuture.allOf(cf1,cf2).get();

        // 定义程序执行结束时间
        long end = System.currentTimeMillis();
        System.out.println("本次执行消耗时间为:" + (end - start) + "ms");
    }

    public static void executeTask(String taskType,int seconds) {
        System.out.println(taskType + "模拟任务执行,沉睡指定秒数" + seconds + "s");
        try{
            Thread.sleep(seconds * 1000);
        }catch (InterruptedException e){
            e.printStackTrace();
        }
    }
}

// output
cf2模拟任务执行,沉睡指定秒数2s
cf1模拟任务执行,沉睡指定秒数1s
本次执行消耗时间为:2011ms

主线程 VS 守护线程

​ Java的线程分为两种:用户线程、守护线程。默认情况下创建出来的线程都是用户线程。守护线程的优先级比较低,用于为系统中的其它对象和线程提供服务,例如Java自带的GC功能,就是通过守护线程来执行的。

CompletableFuture在默认的情况下(不传入指定线程池),异步任务采用的线程是通过ForkJoinPool创建出来的子线程,是守护线程,参考验证代码如下

// CompletableFuture 主线程、守护线程 说明(验证默认情况下,如果不指定线程池,异步任务的线程采用的是通过ForkJoinPool创建的子线程,是守护线程)
public static void test02() throws ExecutionException, InterruptedException {

    CompletableFuture cf = CompletableFuture.runAsync(()->{
        // 模拟任务执行
        executeTask("cf",2);
        // 判断当前线程是否为守护线程
        boolean isDaemon = Thread.currentThread().isDaemon();
        System.out.println(Thread.currentThread().getName() + (isDaemon?"是":"不是") + "守护线程");
    });

    // 主线程等待任务执行
    CompletableFuture.allOf(cf).get();
    System.out.println("任务执行完成");

}

​ 守护线程的生命周期案例理解:

  • 守护线程会等到主线程执行完毕后也跟着结束(守护线程的生命周期)
  • 对主线程来说, 运行完毕指的是主线程所在的进程内所有非守护线程统统运行完毕, 主线程才算运行完毕
// CompletableFuture 守护线程生命周期案例理解
public static void test03() throws ExecutionException, InterruptedException {

    // 记录线程执行时间
    long start = System.currentTimeMillis();

    CompletableFuture cf = CompletableFuture.runAsync(()->{
        // 模拟任务执行(执行5s)
        executeTask("cf",5);
        // 判断当前线程是否为守护线程
        boolean isDaemon = Thread.currentThread().isDaemon();
        System.out.println(Thread.currentThread().getName() + (isDaemon?"是":"不是") + "守护线程");
    });

    // 主线程等待任务执行
    CompletableFuture.allOf(cf); // 此处没有调用get()方法进行阻塞

    long end = System.currentTimeMillis();
    System.out.println("任务执行完成,共计耗时:" + (end - start) + "ms");
}

// output
任务执行完成,共计耗时:2ms

​ 从上述案例可以看到,异步任务并没有执行预期输出,也就是说在异步任务执行的过程中主线程关闭了,通过CompleableFuture默认方式创建的异步任务(守护线程)也会随之结束。也就是说主线程执行了2ms就结束,主线程关闭则不管守护线程是否执行完成都会随之结束。要想验证这个结论的正确性,可以试着让主线程沉睡更长的一段时间,看这段时间内守护线程是否能正常完成

// CompletableFuture 守护线程生命周期案例理解
public static void test03() throws ExecutionException, InterruptedException {

    // 记录线程执行时间
    long start = System.currentTimeMillis();

    CompletableFuture cf = CompletableFuture.runAsync(()->{
        // 模拟任务执行(执行5s)
        executeTask("cf",5);
        // 判断当前线程是否为守护线程
        boolean isDaemon = Thread.currentThread().isDaemon();
        System.out.println(Thread.currentThread().getName() + (isDaemon?"是":"不是") + "守护线程");
    });

    // 主线程等待任务执行
    CompletableFuture.allOf(cf); // 此处没有调用get()方法进行阻塞

    // 主线程模拟耗时(验证守护线程的生命周期)
    // executeTask("主线程",10);

    long end = System.currentTimeMillis();
    System.out.println("任务执行完成,共计耗时:" + (end - start) + "ms");
}

// output
任务执行完成,共计耗时:2ms

​ 从上述测试案例结果分析,当主线程执行时间较短时,可能主线程执行结束了,但是此时通过CompleableFuture默认方式创建的异步任务(守护线程)还没执行结束,则不管该守护线程是否执行完成都会被结束;当适当增加主线程的执行时间,可以看到在一定的时间设定下守护线程执行完成了,也就验证了**"守护线程的生命周期是随着主线程结束而结束的"**

​ 可以进一步补充一个自定义创建的线程的案例测试(参考基础的线程执行案例),一般都是用户线程,主线程会等所有非守护线程运行结束才结束

// 自定义线程 属性验证
public static void test04() throws ExecutionException, InterruptedException {

    // 记录线程执行时间
    long start = System.currentTimeMillis();

    // 自定义普通线程
    new Thread(() -> {
        boolean isDaemon = Thread.currentThread().isDaemon();
        System.out.println(Thread.currentThread().getName() + (isDaemon ? "是" : "不是") + "守护线程");
        executeTask("我是自定义线程", 5); // 模拟普通线程执行5s
    }).start();

    long end = System.currentTimeMillis();
    System.out.println("任务执行完成,共计耗时:" + (end - start) + "ms");
}

// output
任务执行完成,共计耗时:0ms
Thread-0不是守护线程
我是自定义线程模拟任务执行,沉睡指定秒数5s
    
# 从结果分析可知,虽然主线程很快执行完毕,但是自定义的线程并不是守护线程,虽然后面的语句先打印了,但实际上异步任务(非守护线程)还没执行完毕的情况下主线程是不会结束的

​ 为了解决这个问题,有两种方案:

  • 【方案1】让主线程阻塞等待:基于CompletableFuture默认方式创建的守护线程这种方式,可以通过调用get()方法让主线程进入阻塞状态,等待守护线程(可以参考基础案例)
  • 【方案2】:通过自定义线程池,替代CompletableFuture默认的ForkJoinPool
// CompletableFuture 自定义线程池替换CompletableFuture默认的ForkJoinPool
public static void test05() throws ExecutionException, InterruptedException {

    // 通过ExecutorService创建线程池
    ExecutorService es = Executors.newFixedThreadPool(10);

    // 记录线程执行时间
    long start = System.currentTimeMillis();

    CompletableFuture cf = CompletableFuture.runAsync(() -> {
        // 判断当前线程是否为守护线程
        boolean isDaemon = Thread.currentThread().isDaemon();
        System.out.println(Thread.currentThread().getName() + (isDaemon ? "是" : "不是") + "守护线程");
        // 模拟任务执行(执行5s)
        executeTask("cf", 5);
        System.out.println("当前任务执行完成");
    },es);

    // 主线程等待任务执行
    CompletableFuture.allOf(cf); // 此处没有调用get()方法进行阻塞,而是使用自定义线程池替换原有的默认的ForkJoinPool

    // 任务执行完成,关闭线程池
    es.shutdown();

    long end = System.currentTimeMillis();
    System.out.println("任务执行完成,共计耗时:" + (end - start) + "ms");
}

// output
任务执行完成,共计耗时:3ms
pool-1-thread-1不是守护线程
cf模拟任务执行,沉睡指定秒数5s
当前任务执行完成
评论
  • 按正序
  • 按倒序
  • 按热度
Powered by Waline v3.1.3