使用线程池 Java语言虽然内置了多线程支持,启动一个新线程非常方便,但是,创建线程需要操作系统资源(线程资源,栈空间等),频繁创建和销毁大量线程需要消耗大量时间。 所以引入了线程池 的概念。
简单地说,线程池内部维护了若干个线程,没有任务的时候,这些线程都处于等待状态。如果有新任务,就分配一个空闲线程执行。如果所有线程都处于忙碌状态,新任务要么放入队列等待,要么增加一个新线程进行处理
ExecutorService Java标准库提供了ExecutorService
接口表示线程池
1 2 3 4 5 6 7 8 9 ExecutorService executorService = Executors.newFixedThreadPool(3 );for (int i = 0 ; i < 10 ; i++) { executorService.submit(new Task (Integer.toString(i))); } executorService.shutdown();
这里首先会执行
前三个任务,其他的任务会入队列等待,然后等待任务执行完再从任务队列一个个取出来。
执行executorService.shutdown();
他会等待线程所有线程执行完才会关闭。但不回阻塞语句。executorService.shutdownNow();
执行这段他会立即停止正在执行的线程。awaitTermination(long timeout, TimeUnit unit)
它会根据传入的参数,检测超过设置的时长时线程池会不会结束。如果没结束返回false.
组合使用
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 ExecutorService executorService = Executors.newFixedThreadPool(3 ); for (int i = 0 ; i < 15 ; i++) { executorService.submit(new Task (Integer.toString(i))); } executorService.shutdown(); try { if (!executorService.awaitTermination(3 , TimeUnit.SECONDS)){ System.out.println("线程池还未关闭即将强制关闭线程池" ); executorService.shutdownNow(); } } catch (InterruptedException e) { throw new RuntimeException (e); } System.out.println("over" ); }
如果我们想把线程池的大小限制在4~10个之间动态调整怎么办? 使用CachedThreadPool
1 2 3 4 5 6 int min = 4 ;int max = 15 ;ExecutorService executorService = new ThreadPoolExecutor (min, max, 60L , TimeUnit.SECONDS, new SynchronousQueue <Runnable>() );
这样就能动态创建4-15个线程。 但是这里用的时 SynchronousQueue这是一个没有容量的队列。所以一但超过max就会抛出异常。Java并发基础:SynchronousQueue全面解析! - 知乎
CallerRunsPolicy 这里可以使用拒绝执行策略 CallerRunsPolicy
这样就可有把超出的数量放在之前的创建的线程中,不过这样会影响性能。 callerRunsPolicy他会在线程超出范围时执行把任务丢给哪个执行它的线程。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 public static void main (String[] args) { int min = 4 ; int max = 10 ; ExecutorService executorService = new ThreadPoolExecutor (min, max, 60L , TimeUnit.SECONDS, new SynchronousQueue <Runnable>(), new ThreadPoolExecutor .CallerRunsPolicy() ); for (int i = 0 ; i < 15 ; i++) { executorService.submit(new Task (Integer.toString(i))); } executorService.shutdown(); }
输出结果如下
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 start : 10 Threadmain start : 1 Threadpool-1-thread-2 start : 7 Threadpool-1-thread-8 start : 6 Threadpool-1-thread-7 start : 5 Threadpool-1-thread-6 start : 4 Threadpool-1-thread-5 start : 0 Threadpool-1-thread-1 start : 9 Threadpool-1-thread-10 start : 8 Threadpool-1-thread-9 start : 2 Threadpool-1-thread-3 start : 3 Threadpool-1-thread-4 end : 7 end : 10 end : 1 start : 11 Threadmain end : 0 end : 5 end : 2 end : 8 end : 9 end : 3 end : 4 end : 6 end : 11 start : 12 Threadpool-1-thread-7 start : 14 Threadpool-1-thread-4 start : 13 Threadpool-1-thread-5 end : 12 end : 13 end : 14
演示一下执行过程:
创建线程池设置参数
for循环从0-9 线程池执行任务此时线程池已经满了。
for循环到第10个时,SynchronousQueue无法存储任务。所以直接丢给main线程来执行,for循环阻塞。
当main线程执行完task时,发现还没有线程空闲,main线程继续执行第11个任务。
当main线程执行完第11个任务,发现有线程空闲,将后面的线程继续往空闲的线程里放。
LinkedBlockingQueue 或者可以使用有界队列 LinkedBlockingQueue<>(‘队列大小’) 可以指定队列存储任务的大小,也可以不指定(无限)
1 2 3 4 5 6 7 8 9 10 11 12 13 14 public static void main(String[] args) { int min = 4; int max = 5; ExecutorService executorService = new ThreadPoolExecutor(min, max, 60L, TimeUnit.SECONDS, new LinkedBlockingQueue<>(10), new ThreadPoolExecutor.CallerRunsPolicy() ); for (int i = 0; i < 15; i++) { executorService.submit(new Task(Integer.toString(i))); } executorService.shutdown(); }
ScheduledThreadPool 还有一种任务,需要定期反复执行,例如,每秒刷新证券价格。这种任务本身固定,需要反复执行的,可以使用ScheduledThreadPool
。放入ScheduledThreadPool
的任务可以定期反复执行
1 2 3 4 5 6 7 8 9 10 11 12 public static void main (String[] args) { ScheduledExecutorService scheduledExecutorService = Executors.newScheduledThreadPool(4 ); scheduledExecutorService.schedule(new Task ("one-time" ),1 ,TimeUnit.SECONDS); scheduledExecutorService.scheduleAtFixedRate(new Task ("fixed-rate" ),2 ,3 ,TimeUnit.SECONDS); scheduledExecutorService.scheduleWithFixedDelay(new Task ("fixed-delay" ),2 ,3 ,TimeUnit.SECONDS); }
注意FixedRate和FixedDelay的区别。FixedRate是指任务总是以固定时间间隔触发,不管任务执行多长时间:
1 2 3 │░░░░ │░░░░░░ │░░░ │░░░░░ │░░░ ├───────┼───────┼───────┼───────┼────▶ │◀─────▶│◀─────▶│◀─────▶│◀─────▶│
而FixedDelay是指,上一次任务执行完毕后,等待固定的时间间隔,再执行下一次任务:
1 2 3 │░░░│ │░░░░░│ │░░│ │░ └───┼───────┼─────┼───────┼──┼───────┼──▶ │◀─────▶│ │◀─────▶│ │◀─────▶│
因此,使用ScheduledThreadPool
时,我们要根据需要选择执行一次、FixedRate执行还是FixedDelay执行。
注意 :在使用FixedRate如果任务执行时间超过了固定触发时间。那么就不会执行新任务,会等待。指导任务完成后立马触发任务。 FixedDelay中超过时间,则会等待任务完成后再等待指定时间触发任务。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 public static void main (String[] args) { int min = 4 ; int max = 5 ; ExecutorService executorService = new ThreadPoolExecutor (min, max, 60L , TimeUnit.SECONDS, new LinkedBlockingQueue <>(10 ), new ThreadPoolExecutor .CallerRunsPolicy() ); ScheduledExecutorService scheduledExecutorService = Executors.newScheduledThreadPool(4 ); scheduledExecutorService.scheduleAtFixedRate(()->{ executorService.submit(new Task ("fixed-rate" )); },1 ,1 ,TimeUnit.SECONDS); }
这样就可以异步执行
一旦任务抛出未捕获的异常,该定时任务就会被永久停止,不会再继续执行后续的调度。
使用Future 在执行多个任务的时候,使用Java标准库提供的线程池是非常方便的。我们提交的任务只需要实现Runnable
接口,就可以让线程池去执行Runnable
接口有个问题,它的方法没有返回值。如果任务需要一个返回结果,那么只能保存到变量,还要提供额外的方法读取,非常不便。所以,Java标准库还提供了一个Callable
接口,和Runnable
接口比,它多了一个返回值:
1 2 3 4 5 6 7 8 public class Task1 implements Callable <String> { @Override public String call () throws InterruptedException { Thread.sleep(1000 ); return new Date ().toLocaleString(); } }
并且Callable
接口是一个泛型接口,可以返回指定类型的结果。
现在的问题是,如何获得异步执行的结果?
如果仔细看ExecutorService.submit()
方法,可以看到,它返回了一个Future
类型,一个Future
类型的实例代表一个未来能获取结果的对象:
1 2 3 4 5 6 7 8 9 10 11 public static void main (String[] args) throws ExecutionException, InterruptedException { ExecutorService es = Executors.newFixedThreadPool(4 ); Callable<String> task1 = new Task1 (); Future<String>future = es.submit(task1); String res = future.get(); System.out.println(res); }
当我们提交一个Callable
任务后,我们会同时获得一个Future
对象,然后,我们在主线程某个时刻调用Future
对象的get()
方法,就可以获得异步执行的结果。在调用get()
时,如果异步任务已经完成,我们就直接获得结果。如果异步任务还没有完成,那么get()
会阻塞,直到任务完成后才返回结果。
1 2 3 4 5 6 ExecutorService es = Executors.newFixedThreadPool(4 );for (int i = 0 ; i < 5 ; i++) { Callable<String>ca = new Task1 (); Future<String> fu=es.submit(ca); System.out.println(fu.get()); }
1 2 3 4 5 6 7 输出结果: 2025年8月17日 15:20:15 2025年8月17日 15:20:16 2025年8月17日 15:20:17 2025年8月17日 15:20:18 2025年8月17日 15:20:19
使用CompletableFuture 使用Future
获得异步执行结果时,要么调用阻塞方法get()
,要么轮询看isDone()
是否为true
,这两种方法都不是很好,因为主线程也会被迫等待。
从Java 8开始引入了CompletableFuture
,它针对Future
做了改进,可以传入回调对象,当异步任务完成或者发生异常时,自动调用回调对象的回调方法。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 public class CompFu { public static void main (String[] args) throws InterruptedException { CompletableFuture<Double> cf = CompletableFuture.supplyAsync(CompFu::getP); cf.thenAccept((result) -> { System.out.println("price: " + result); }); cf.exceptionally((e) -> { e.printStackTrace(); return null ; }); Thread.sleep(2000 ); } public static double getP () { try { Thread.sleep(1000 ); } catch (InterruptedException e) { throw new RuntimeException (e); } if (Math.random() < 0.3 ) { throw new RuntimeException ("fetch price failed!" ); } return 5 + Math.random() * 20 ; } }
创建一个CompletableFuture
是通过CompletableFuture.supplyAsync()
实现的,它需要一个实现了Supplier
接口的对象:
1 2 3 public interface Supplier <T> { T get () ; }
这里我们用lambda语法简化了一下,直接传入Main::fetchPrice
,因为Main.fetchPrice()
静态方法的签名符合Supplier
接口的定义(除了方法名外)。
紧接着,CompletableFuture
已经被提交给默认的线程池执行了,我们需要定义的是CompletableFuture
完成时和异常时需要回调的实例。完成时,CompletableFuture
会调用Consumer
对象:
1 2 3 public interface Consumer <T> { void accept (T t) ; }
异常时,CompletableFuture
会调用Function
对象:
1 2 3 public interface Function <T, R> { R apply (T t) ; }
这里我们都用lambda语法简化了代码。
可见CompletableFuture
的优点是:
异步任务结束时,会自动回调某个对象的方法;
异步任务出错时,会自动回调某个对象的方法;
主线程设置好回调后,不再关心异步任务的执行。
ComletableFuture 和Future 相比起来优势是ComletableFuture可以串行执行。可以根据流程一步一步往下
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 public static void main (String[] args) throws InterruptedException { CompletableFuture<Long> cfQueryId = CompletableFuture.supplyAsync(()->{ return 1l ; }); Map<Long,String> map = new HashMap <>(); map.put(1l ,"aaa" ); CompletableFuture<String>cfQueryById = cfQueryId.thenApplyAsync((id)->{ System.out.println("获取Id:" +id); return map.get(id); }); cfQueryById.thenAccept((res)->{ System.out.println(res); }); cfQueryById.exceptionally((e)->{ e.printStackTrace(); return null ; }); }
此外出了串行功能它还能提供并行。 比如从a,b方法并行获取某个代码,只要获取成功其中一个,那就调用c方法
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 public class CompFU1 { public static void main (String[] args) throws InterruptedException { CompletableFuture<Long> getIdByMethod1 = CompletableFuture.supplyAsync(()->getId("method1" )); CompletableFuture<Long> getIdByMethod2 = CompletableFuture.supplyAsync(()->getId("method2" )); CompletableFuture<Object> getId = CompletableFuture.anyOf(getIdByMethod1,getIdByMethod2); CompletableFuture<String>cfFetch=getId.thenApply((res)->{ return getById((Long) res); }); cfFetch.thenAccept((res)->{ System.out.println(res); }); Thread.sleep(2000 ); } public static Long getId (String param) { System.out.println("source from " + param); try { Thread.sleep(1000 ); } catch (InterruptedException e) { throw new RuntimeException (e); } return 1l ; } public static String getById (Long id) { Map<Long,String> map = new HashMap <>(); map.put(1l ,"aaa" ); return map.get(id); } }
除了anyOf()
可以实现“任意个CompletableFuture
只要一个成功”,allOf()
可以实现“所有CompletableFuture
都必须成功”,这些组合操作可以实现非常复杂的异步流程控制。
最后我们注意CompletableFuture
的命名规则:
xxx()
:表示该方法将继续在已有的线程中执行;
xxxAsync()
:表示将异步在线程池中执行。
使用ForkJoin Java 7开始引入了一种新的Fork/Join线程池,它可以执行一种特殊的任务:把一个大任务拆成多个小任务并行执行。
Fork/Join任务的原理 :判断一个任务是否足够小,如果是,直接计算,否则,就分拆成几个小任务分别计算。这个过程可以反复“裂变”成一系列小任务。 利用了分治的思想,实现起来和二分差不多。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 package com.example.demo.threadPoor;import java.util.Random;import java.util.concurrent.ForkJoinPool;import java.util.concurrent.ForkJoinTask;import java.util.concurrent.RecursiveTask;public class ForkJ { public static void main (String[] args) { long [] arr = new long [2000 ]; Random random = new Random (); long res1 = 0 ; for (int i = 0 ; i < arr.length; i++) { arr[i] = random.nextLong(100 ); res1 += arr[i]; } System.out.println("Exception res: " +res1); ForkJoinTask<Long> task = new SumTask (arr,0 ,arr.length); long startTime = System.currentTimeMillis(); Long result = ForkJoinPool.commonPool().invoke(task); long endTime = System.currentTimeMillis(); System.out.println("Fork/join sum: " + result + " in " + (endTime - startTime) + " ms." ); } } class SumTask extends RecursiveTask <Long> { static final int THRESHOLD = 500 ; long [] array; int start; int end; SumTask(long [] array, int start, int end) { this .array = array; this .start = start; this .end = end; } @Override protected Long compute () { if (end-start<THRESHOLD){ System.out.println("线程:" +Thread.currentThread().getName()); long sum = 0 ; for (int i=start;i<end;i++){ sum+=array[i]; try { Thread.sleep(1 ); } catch (InterruptedException e) { } } return sum; } int mid = start+end>>1 ; System.out.println(String.format("split %d~%d ==> %d~%d, %d~%d" , start, end, start, mid, mid, end)); SumTask sumTask1 = new SumTask (this .array,start,mid); SumTask sumTask2 = new SumTask (this .array,mid,end); invokeAll(sumTask1,sumTask2); Long stRes1 = sumTask1.join(); Long stRes2 = sumTask2.join(); Long res = stRes1+stRes2; System.out.println("result = " + stRes1 + " + " + stRes2 + " ==> " + res); return res; } }
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 Exception res: 98589 split 0~2000 ==> 0~1000, 1000~2000 split 0~1000 ==> 0~500, 500~1000 split 0~500 ==> 0~250, 250~500 split 1000~2000 ==> 1000~1500, 1500~2000 split 500~1000 ==> 500~750, 750~1000 线程:ForkJoinPool.commonPool-worker-1 split 1000~1500 ==> 1000~1250, 1250~1500 线程:ForkJoinPool.commonPool-worker-4 线程:ForkJoinPool.commonPool-worker-2 线程:ForkJoinPool.commonPool-worker-5 线程:ForkJoinPool.commonPool-worker-3 split 1500~2000 ==> 1500~1750, 1750~2000 线程:ForkJoinPool.commonPool-worker-7 线程:ForkJoinPool.commonPool-worker-6 线程:ForkJoinPool.commonPool-worker-8 result = 12016 + 12618 ==> 24634 result = 11902 + 12429 ==> 24331 result = 12089 + 13162 ==> 25251 result = 11928 + 12445 ==> 24373 result = 24331 + 24634 ==> 48965 result = 24373 + 25251 ==> 49624 result = 49624 + 48965 ==> 98589 Fork/join sum: 98589 in 367 ms.
由于ForkJoinPool 能够根据系统资源情况自动调整线程数量,所以用户不需要手动指定线程数量。如果手动指定线程数量,可能会导致线程数不足或过多,从而影响并行计算的效率.
使用ThreadLocal 多线程是Java实现多任务的基础,Thread
对象代表一个线程,我们可以在代码中调用Thread.currentThread()
获取当前线程。例如,打印日志时,可以同时打印出当前线程的名字。 使用ThreadLocal 相当于在这个线程中添加了一个全局的缓存变量。可以随时取用,不需要在传参的时候作为参数传递.
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 public class ThreadL { static ThreadLocal<String> threadLocalString = new ThreadLocal <>(); public static void main (String[] args) { ThreadL threadL = new ThreadL (); threadL.process("1111" ); } public void process (String s) { try { threadLocalString.set(s); doWork1(); } finally { threadLocalString.remove(); } doWork1(); } public void doWork1 () { String s = threadLocalString.get(); System.out.println("获得" +s); } }
最后,特别注意ThreadLocal
一定要在finally
中清除:
这是因为当前线程执行完相关代码后,很可能会被重新放入线程池中,如果ThreadLocal
没有被清除,该线程执行其他代码时,会把上一次的状态带进去。
为了保证能释放ThreadLocal
关联的实例,我们可以通过AutoCloseable
接口配合try (resource) {...}
结构,让编译器自动为我们关闭。例如,一个保存了当前用户名的ThreadLocal
可以封装为一个StringContext
对象:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 public class StringContext implements AutoCloseable { static final ThreadLocal<String> key = new ThreadLocal <>(); private static final StringContext INSTANCE = new StringContext (); private StringContext () {} public static StringContext getInstance () { return INSTANCE; } public void set (String s) { key.set(s); } public String get () { return key.get(); } @Override public void close () throws Exception { } }
执行方法:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 public class ThreadLContext { public static void main (String[] args) { Thread thread = new Thread (()->{ StringContext.getInstance().set("1" ); process(); }); Thread thread1 = new Thread (()->{ StringContext.getInstance().set("2" ); process(); }); thread.start(); thread1.start(); } public static void process () { try { Thread.sleep(2000 ); } catch (InterruptedException e) { throw new RuntimeException (e); } System.out.println(StringContext.getInstance().get()); }
使用虚拟线程 当传统线程遇到了IO密集型的任务,比如:HTTP请求,文件读写受网络,磁盘速度影响线程会等待。这样就会浪费线程资源,导致了CPU的利用率其实没有达到理想状态。
而虚拟线程则是在线程遇到I/O阻塞时,会放暂时放弃等待去做其他事,等I/O完成它才会回来继续执行任务,无疑这种方式提高了CPU的利用率,让它无法再偷懒。 虚拟线程是由JVM来管理的,不由系统管理,并且它十分轻量
,虚拟线程之间的切换开销十分的小,所以你甚至可以开启上百万个虚拟线程。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 package com.example.demo.threadPoor;import java.time.Duration;import java.time.Instant;import java.util.concurrent.ExecutorService;import java.util.concurrent.Executors;import java.util.concurrent.TimeUnit;import java.util.stream.IntStream;public class ThreadVirtual { private static final int TASK_COUNT = 1000 ; private static final int IO_DELAY_MS = 100 ; public static void main (String[] args) throws InterruptedException { System.out.println("=== 虚拟线程 vs 平台线程 IO密集型任务对比 ===\n" ); testVirtualThreads(); System.out.println(); testPlatformThreads(); } private static void testVirtualThreads () throws InterruptedException { System.out.println(" 虚拟线程测试 - " + TASK_COUNT + " 个任务" ); Instant start = Instant.now(); try (ExecutorService executor = Executors.newVirtualThreadPerTaskExecutor()) { IntStream.range(0 , TASK_COUNT).forEach(i -> { executor.submit(() -> { try { Thread.sleep(IO_DELAY_MS); int result = i * i; if (i < 10 ) { System.out.printf("虚拟线程任务 %d 完成,结果: %d, 线程: %s%n" , i, result, Thread.currentThread().getName()); } } catch (InterruptedException e) { Thread.currentThread().interrupt(); } }); }); } Duration duration = Duration.between(start, Instant.now()); System.out.printf("虚拟线程总耗时: %d 毫秒%n" , duration.toMillis()); } private static void testPlatformThreads () throws InterruptedException { System.out.println(" 平台线程测试 - " + TASK_COUNT + " 个任务" ); Instant start = Instant.now(); try (ExecutorService executor = Executors.newFixedThreadPool(200 )) { IntStream.range(0 , TASK_COUNT).forEach(i -> { executor.submit(() -> { try { Thread.sleep(IO_DELAY_MS); int result = i * i; if (i < 10 ) { System.out.printf("平台线程任务 %d 完成,结果: %d, 线程: %s%n" , i, result, Thread.currentThread().getName()); } } catch (InterruptedException e) { Thread.currentThread().interrupt(); } }); }); executor.shutdown(); executor.awaitTermination(30 , TimeUnit.SECONDS); } Duration duration = Duration.between(start, Instant.now()); System.out.printf(" 平台线程总耗时: %d 毫秒%n" , duration.toMillis()); } }
总而言之,在拥有大量线程IO导致线程堵塞,cpu利用率不高时,可以使用虚拟线程来解决。