Java Coroutines (Virtual Threads)
小轲

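What are Java virtual threads?

  • Platform threads: a traditional Java thread maps 1:1 to an operating-system thread and is scheduled by the OS. Platform threads are a heavyweight resource: the cost of creating and destroying them, their memory footprint, and context-switch overhead mean we cannot run a very large number of them on a single machine.
  • Virtual threads: virtual threads are implemented and scheduled by the Java runtime itself; to execute, a virtual thread is mounted on a platform thread (its carrier). Java 21 schedules and runs virtual threads on a ForkJoinPool whose parallelism defaults to the number of CPU cores. Virtual threads are a lightweight resource, so a single JVM can start a very large number of them.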

[Figure: virtual thread scheduling diagram]
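To make the "lightweight" claim concrete, here is a minimal sketch, assuming JDK 21 or later. The class name `VirtualThreadBasics` and its helper methods are made up for illustration; `Thread.ofVirtual()`, `Thread.startVirtualThread` and `Thread.isVirtual()` are the standard JDK 21 APIs. It starts many virtual threads that each block briefly, something that would be costly with one platform thread per task.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.atomic.AtomicInteger;

public class VirtualThreadBasics {

    /** Starts `count` virtual threads that each block briefly; returns how many finished. */
    static int runBlockingTasks(int count) throws InterruptedException {
        AtomicInteger finished = new AtomicInteger();
        List<Thread> threads = new ArrayList<>(count);
        for (int i = 0; i < count; i++) {
            // Thread.ofVirtual() is the JDK 21 builder for virtual threads.
            Thread t = Thread.ofVirtual().name("vt-" + i).start(() -> {
                try {
                    Thread.sleep(10); // while blocked, the virtual thread is unmounted from its carrier
                    finished.incrementAndGet();
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                }
            });
            threads.add(t);
        }
        for (Thread t : threads) {
            t.join();
        }
        return finished.get();
    }

    /** Reports whether a thread created by Thread.startVirtualThread is a virtual thread. */
    static boolean startsVirtual() throws InterruptedException {
        Thread t = Thread.startVirtualThread(() -> { });
        t.join();
        return t.isVirtual();
    }

    public static void main(String[] args) throws InterruptedException {
        System.out.println("finished: " + runBlockingTasks(10_000));
        System.out.println("virtual: " + startsVirtual());
    }
}
```

The same loop written with `new Thread(...)` would create one OS thread per task, which is the cost the bullet on platform threads describes.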

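Usage

Before you start

  1. Use virtual threads for I/O-bound work: when one blocks, the carrier can immediately run another virtual thread. For CPU-bound work they are not faster than platform threads.
  2. If you need to pass context across threads, Alibaba's TTL (transmittable-thread-local) library still works; wrap the executor with its decorator.
    1. Related issue: https://github.com/alibaba/transmittable-thread-local/issues/599
    2. TtlExecutors.getTtlExecutorService
  3. Do not use synchronized on virtual threads; use the Lock API (for example ReentrantLock), which the JDK has adapted so that a virtual thread blocked while acquiring the lock lets other threads be scheduled.
    1. In Java 21, synchronized pins the virtual thread to its platform thread, so the platform thread cannot switch to another virtual thread when I/O blocks inside the synchronized block. (JDK 24 lifts this limitation through JEP 491.)
  4. Virtual threads are very cheap to create, so there is no need to pool them. There is no pool size to query or tune, and pooling them is pointless; create one per task.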
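Points 3 and 4 can be sketched together. The example below is illustrative only and assumes JDK 21. The class `LockInsteadOfSynchronized` and its methods are hypothetical names; `ReentrantLock` and `Executors.newVirtualThreadPerTaskExecutor()` are standard JDK APIs. A blocking call is made while holding a `ReentrantLock` rather than inside a `synchronized` block, and every task gets its own new virtual thread instead of a pooled one.

```java
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.locks.ReentrantLock;

public class LockInsteadOfSynchronized {

    private final ReentrantLock lock = new ReentrantLock();
    private int counter; // guarded by lock

    /** Holds the lock across a blocking call, where a synchronized block might otherwise be used. */
    void increment() throws InterruptedException {
        lock.lock();
        try {
            Thread.sleep(1); // stand-in for blocking I/O performed while holding the lock
            counter++;
        } finally {
            lock.unlock();
        }
    }

    /** Reads the counter under the same lock. */
    int current() {
        lock.lock();
        try {
            return counter;
        } finally {
            lock.unlock();
        }
    }

    /** Runs `tasks` increments, one new virtual thread per task, and returns the final count. */
    static int runDemo(int tasks) {
        LockInsteadOfSynchronized demo = new LockInsteadOfSynchronized();
        // No pooling: the executor starts a fresh virtual thread for every task.
        try (ExecutorService executor = Executors.newVirtualThreadPerTaskExecutor()) {
            for (int i = 0; i < tasks; i++) {
                executor.execute(() -> {
                    try {
                        demo.increment();
                    } catch (InterruptedException e) {
                        Thread.currentThread().interrupt();
                    }
                });
            }
        } // close() waits for all submitted tasks to finish
        return demo.current();
    }

    public static void main(String[] args) {
        System.out.println(runDemo(100));
    }
}
```

If the number of concurrent calls to a downstream resource needs a cap, a `java.util.concurrent.Semaphore` around the call is the usual substitute for a bounded pool.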

Demo code

Create the virtual-thread executor (scheduled on a ForkJoinPool)

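/**
 * Virtual-thread executor (class-level singleton, wrapped so that TTL context is propagated).
 * The executor is designed to be reused; there is no need to create one per call.
 */
private static final ExecutorService VIRTUAL_THREAD_EXECUTOR =
        TtlExecutors.getTtlExecutorService(Executors.newVirtualThreadPerTaskExecutor());

Using it together with TTL is recommended so that ThreadLocal-based context keeps working; if you do not need that, drop the TtlExecutors wrapper and the dependency below.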

<dependency>
    <groupId>com.alibaba</groupId>
    <artifactId>transmittable-thread-local</artifactId>
    <version>2.2.0</version>
</dependency>

Tasks that return a value

  • Use the CompletableFuture API to orchestrate the tasks; the results are read through the Future-style methods (join() here)
public ExtLearningVO buildExtLearningData(Long studentId, Long parentId) {
    log.info("buildExtLearningData, studentId={}, parentId={}", studentId, parentId);

    try {
        // Run the three independent build methods in parallel on the class-level virtual-thread executor
        CompletableFuture<ExtLearningVO.CTDataVO> ctFuture = CompletableFuture.supplyAsync(
                () -> buildCTData(studentId), VIRTUAL_THREAD_EXECUTOR);

        CompletableFuture<ExtLearningVO.BookDataVO> bookFuture = CompletableFuture.supplyAsync(
                () -> buildBookData(studentId), VIRTUAL_THREAD_EXECUTOR);

        CompletableFuture<ExtLearningVO.LexileDataVO> lexileFuture = CompletableFuture.supplyAsync(
                () -> buildLexileData(studentId, parentId), VIRTUAL_THREAD_EXECUTOR);

        // Wait for all tasks to finish
        CompletableFuture<Void> allFutures = CompletableFuture.allOf(ctFuture, bookFuture, lexileFuture);
        allFutures.join();

        // Build the aggregated result; join() returns immediately because every future is already complete
        return ExtLearningVO.builder()
                .ct(ctFuture.join())
                .book(bookFuture.join())
                .lexile(lexileFuture.join())
                .build();

    } catch (Exception e) {
        // A failed subtask surfaces here as an unchecked CompletionException thrown by join()
        log.error("buildExtLearningData error, studentId={}", studentId, e);
        throw e;
    }
}

What about tasks with no return value?

  • Call the execute() API directly
public void fireAndForget(Long studentId, Long parentId) {
    VIRTUAL_THREAD_EXECUTOR.execute(() -> buildCTSideEffect(studentId));
    VIRTUAL_THREAD_EXECUTOR.execute(() -> buildBookSideEffect(studentId));
    VIRTUAL_THREAD_EXECUTOR.execute(() -> buildLexileSideEffect(studentId, parentId));
}

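Performance test

Test code

  1. The tests were run on an Apple M3 machine with 16 GB of memory
  2. The comparison is virtual threads vs. a thread pool (8 core threads, at most 100 threads, queue capacity 1000)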
package com.vipkid.international.web.hub.test;

import lombok.AllArgsConstructor;
import lombok.Data;
import lombok.extern.slf4j.Slf4j;
import org.junit.jupiter.api.AfterAll;
import org.junit.jupiter.api.BeforeAll;
import org.junit.jupiter.api.Test;

import java.util.concurrent.*;
import java.util.concurrent.atomic.AtomicInteger;

/**
 * Performance comparison: virtual threads vs. a regular thread pool.
 *
 * Scenarios:
 * 1. I/O-bound tasks (simulated network requests, database queries)
 * 2. CPU-bound tasks (compute-heavy operations)
 * 3. Mixed tasks (I/O + CPU)
 * 4. A large number of concurrent tasks
 */
@Slf4j
public class VirtualThreadVsThreadPoolPerformanceTest {

    // Virtual-thread executor
    private static ExecutorService virtualThreadExecutor;

    // Regular thread-pool executor (8 core threads)
    private static ExecutorService threadPoolExecutor;

    @BeforeAll
    public static void setup() {
        log.info("===============================================");
        log.info("Initializing test environment");
        log.info("===============================================");

        // Create the virtual-thread executor
        virtualThreadExecutor = Executors.newVirtualThreadPerTaskExecutor();

        // Create the regular thread pool (8 core threads, at most 100 threads, queue capacity 1000)
        threadPoolExecutor = new ThreadPoolExecutor(
                8, // core pool size
                100, // maximum pool size
                60L, // keep-alive time for idle threads
                TimeUnit.SECONDS, // time unit
                new LinkedBlockingQueue<>(1000), // blocking queue
                new ThreadFactory() {
                    private final AtomicInteger threadNumber = new AtomicInteger(1);
                    @Override
                    public Thread newThread(Runnable r) {
                        return new Thread(r, "pool-thread-" + threadNumber.getAndIncrement());
                    }
                },
                new ThreadPoolExecutor.CallerRunsPolicy() // rejection policy
        );

        log.info("Virtual-thread executor: created");
        log.info("Thread pool config: corePoolSize=8, maximumPoolSize=100, queueCapacity=1000");
    }

    @AfterAll
    public static void teardown() {
        log.info("===============================================");
        log.info("Cleaning up test environment");
        log.info("===============================================");

        if (virtualThreadExecutor != null) {
            virtualThreadExecutor.shutdown();
        }
        if (threadPoolExecutor != null) {
            threadPoolExecutor.shutdown();
        }
    }

    /**
     * Test 1: I/O-bound tasks.
     * Scenario: many network requests, database queries and other operations that wait.
     */
    @Test
    public void testIOIntensiveTasks() throws Exception {
        log.info("\n\n");
        log.info("═══════════════════════════════════════════════");
        log.info(" Test 1: I/O-bound tasks");
        log.info("═══════════════════════════════════════════════");

        int taskCount = 1000;
        int ioDelayMs = 100; // simulate 100 ms of I/O latency

        log.info("Parameters: taskCount={}, I/O delay per task={}ms", taskCount, ioDelayMs);

        // Virtual threads
        PerformanceResult virtualResult = executeIOTasks(
                virtualThreadExecutor,
                "virtual threads",
                taskCount,
                ioDelayMs
        );

        // Pause so the system can settle
        Thread.sleep(1000);

        // Regular thread pool
        PerformanceResult threadPoolResult = executeIOTasks(
                threadPoolExecutor,
                "thread pool",
                taskCount,
                ioDelayMs
        );

        // Compare
        printComparisonResult(virtualResult, threadPoolResult, "I/O-bound");
    }

    /**
     * Test 2: CPU-bound tasks.
     * Scenario: a large amount of computation.
     */
    @Test
    public void testCPUIntensiveTasks() throws Exception {
        log.info("\n\n");
        log.info("═══════════════════════════════════════════════");
        log.info(" Test 2: CPU-bound tasks");
        log.info("═══════════════════════════════════════════════");

        int taskCount = 100;
        int computeIterations = 1000000; // iterations per task

        log.info("Parameters: taskCount={}, iterations per task={}", taskCount, computeIterations);

        // Virtual threads
        PerformanceResult virtualResult = executeCPUTasks(
                virtualThreadExecutor,
                "virtual threads",
                taskCount,
                computeIterations
        );

        // Pause
        Thread.sleep(1000);

        // Regular thread pool
        PerformanceResult threadPoolResult = executeCPUTasks(
                threadPoolExecutor,
                "thread pool",
                taskCount,
                computeIterations
        );

        // Compare
        printComparisonResult(virtualResult, threadPoolResult, "CPU-bound");
    }

    /**
     * Test 3: mixed tasks.
     * Scenario: both I/O waits and CPU computation.
     */
    @Test
    public void testMixedTasks() throws Exception {
        log.info("\n\n");
        log.info("═══════════════════════════════════════════════");
        log.info(" Test 3: mixed tasks");
        log.info("═══════════════════════════════════════════════");

        int taskCount = 500;
        int ioDelayMs = 50;
        int computeIterations = 500000;

        log.info("Parameters: taskCount={}, I/O delay={}ms, iterations={}",
                taskCount, ioDelayMs, computeIterations);

        // Virtual threads
        PerformanceResult virtualResult = executeMixedTasks(
                virtualThreadExecutor,
                "virtual threads",
                taskCount,
                ioDelayMs,
                computeIterations
        );

        // Pause
        Thread.sleep(1000);

        // Regular thread pool
        PerformanceResult threadPoolResult = executeMixedTasks(
                threadPoolExecutor,
                "thread pool",
                taskCount,
                ioDelayMs,
                computeIterations
        );

        // Compare
        printComparisonResult(virtualResult, threadPoolResult, "Mixed");
    }

    /**
     * Test 4: a large number of concurrent tasks.
     * Measures behavior under high concurrency.
     */
    @Test
    public void testHighConcurrency() throws Exception {
        log.info("\n\n");
        log.info("═══════════════════════════════════════════════");
        log.info(" Test 4: large number of concurrent tasks");
        log.info("═══════════════════════════════════════════════");

        int taskCount = 10000;
        int ioDelayMs = 50;

        log.info("Parameters: taskCount={}, I/O delay={}ms", taskCount, ioDelayMs);

        // Virtual threads
        PerformanceResult virtualResult = executeIOTasks(
                virtualThreadExecutor,
                "virtual threads",
                taskCount,
                ioDelayMs
        );

        // Pause
        Thread.sleep(2000);

        // Regular thread pool
        PerformanceResult threadPoolResult = executeIOTasks(
                threadPoolExecutor,
                "thread pool",
                taskCount,
                ioDelayMs
        );

        // Compare
        printComparisonResult(virtualResult, threadPoolResult, "High concurrency");
    }

    /**
     * Test 5: simulated real-world scenario.
     * Each request calls 3 external APIs (taking 80 ms, 100 ms and 120 ms).
     * Called concurrently, the total is the slowest call, i.e. 120 ms.
     */
    @Test
    public void testRealWorldScenario() throws Exception {
        log.info("\n\n");
        log.info("═══════════════════════════════════════════════");
        log.info(" Test 5: simulated business scenario (3 API calls per request)");
        log.info("═══════════════════════════════════════════════");

        int requestCount = 500;

        log.info("Parameters: requestCount={}, each request calls 3 APIs concurrently (120ms)", requestCount);

        // Virtual threads
        PerformanceResult virtualResult = executeRealWorldScenario(
                virtualThreadExecutor,
                "virtual threads",
                requestCount
        );

        // Pause
        Thread.sleep(2000);

        // Regular thread pool
        PerformanceResult threadPoolResult = executeRealWorldScenario(
                threadPoolExecutor,
                "thread pool",
                requestCount
        );

        // Compare
        printComparisonResult(virtualResult, threadPoolResult, "Business scenario");
    }

    // ==================== Execution methods ====================

    /**
     * Runs I/O-bound tasks.
     */
    private PerformanceResult executeIOTasks(ExecutorService executor, String executorName,
                                             int taskCount, int ioDelayMs) throws Exception {
        log.info("\n--- {} started ---", executorName);

        CountDownLatch latch = new CountDownLatch(taskCount);
        AtomicInteger successCount = new AtomicInteger(0);
        AtomicInteger errorCount = new AtomicInteger(0);

        long startTime = System.currentTimeMillis();
        long startMemory = getUsedMemory();

        // Submit all tasks
        for (int i = 0; i < taskCount; i++) {
            final int taskId = i;
            executor.submit(() -> {
                try {
                    // Simulate an I/O operation (network request, database query, etc.)
                    Thread.sleep(ioDelayMs);

                    // Trivial business logic
                    String result = "Task-" + taskId + " completed";

                    successCount.incrementAndGet();
                } catch (Exception e) {
                    errorCount.incrementAndGet();
                    log.error("Task failed: {}", taskId, e);
                } finally {
                    latch.countDown();
                }
            });
        }

        // Wait for all tasks to finish
        boolean completed = latch.await(120, TimeUnit.SECONDS);

        long endTime = System.currentTimeMillis();
        long endMemory = getUsedMemory();
        long duration = endTime - startTime;
        long memoryUsed = endMemory - startMemory;

        log.info("--- {} finished ---", executorName);
        log.info("Total time: {}ms", duration);
        log.info("Succeeded tasks: {}", successCount.get());
        log.info("Failed tasks: {}", errorCount.get());
        log.info("Throughput: {} tasks/s", String.format("%.2f", (taskCount * 1000.0) / duration));
        log.info("Memory used: {} MB", String.format("%.2f", memoryUsed / 1024.0 / 1024.0));

        return new PerformanceResult(
                executorName,
                taskCount,
                duration,
                successCount.get(),
                errorCount.get(),
                memoryUsed
        );
    }

    /**
     * Runs CPU-bound tasks.
     */
    private PerformanceResult executeCPUTasks(ExecutorService executor, String executorName,
                                              int taskCount, int computeIterations) throws Exception {
        log.info("\n--- {} started ---", executorName);

        CountDownLatch latch = new CountDownLatch(taskCount);
        AtomicInteger successCount = new AtomicInteger(0);
        AtomicInteger errorCount = new AtomicInteger(0);

        long startTime = System.currentTimeMillis();
        long startMemory = getUsedMemory();

        // Submit all tasks
        for (int i = 0; i < taskCount; i++) {
            final int taskId = i;
            executor.submit(() -> {
                try {
                    // Simulate CPU-bound computation
                    double result = 0;
                    for (int j = 0; j < computeIterations; j++) {
                        result += Math.sqrt(j) * Math.sin(j) + Math.cos(j);
                    }

                    successCount.incrementAndGet();
                } catch (Exception e) {
                    errorCount.incrementAndGet();
                    log.error("Task failed: {}", taskId, e);
                } finally {
                    latch.countDown();
                }
            });
        }

        // Wait for all tasks to finish
        boolean completed = latch.await(120, TimeUnit.SECONDS);

        long endTime = System.currentTimeMillis();
        long endMemory = getUsedMemory();
        long duration = endTime - startTime;
        long memoryUsed = endMemory - startMemory;

        log.info("--- {} finished ---", executorName);
        log.info("Total time: {}ms", duration);
        log.info("Succeeded tasks: {}", successCount.get());
        log.info("Failed tasks: {}", errorCount.get());
        log.info("Throughput: {} tasks/s", String.format("%.2f", (taskCount * 1000.0) / duration));
        log.info("Memory used: {} MB", String.format("%.2f", memoryUsed / 1024.0 / 1024.0));

        return new PerformanceResult(
                executorName,
                taskCount,
                duration,
                successCount.get(),
                errorCount.get(),
                memoryUsed
        );
    }

    /**
     * Runs mixed tasks.
     */
    private PerformanceResult executeMixedTasks(ExecutorService executor, String executorName,
                                                int taskCount, int ioDelayMs,
                                                int computeIterations) throws Exception {
        log.info("\n--- {} started ---", executorName);

        CountDownLatch latch = new CountDownLatch(taskCount);
        AtomicInteger successCount = new AtomicInteger(0);
        AtomicInteger errorCount = new AtomicInteger(0);

        long startTime = System.currentTimeMillis();
        long startMemory = getUsedMemory();

        // Submit all tasks
        for (int i = 0; i < taskCount; i++) {
            final int taskId = i;
            executor.submit(() -> {
                try {
                    // 1. Simulate an I/O operation
                    Thread.sleep(ioDelayMs);

                    // 2. Simulate CPU computation
                    double result = 0;
                    for (int j = 0; j < computeIterations; j++) {
                        result += Math.sqrt(j) * Math.sin(j);
                    }

                    // 3. Another I/O operation
                    Thread.sleep(ioDelayMs / 2);

                    successCount.incrementAndGet();
                } catch (Exception e) {
                    errorCount.incrementAndGet();
                    log.error("Task failed: {}", taskId, e);
                } finally {
                    latch.countDown();
                }
            });
        }

        // Wait for all tasks to finish
        boolean completed = latch.await(120, TimeUnit.SECONDS);

        long endTime = System.currentTimeMillis();
        long endMemory = getUsedMemory();
        long duration = endTime - startTime;
        long memoryUsed = endMemory - startMemory;

        log.info("--- {} finished ---", executorName);
        log.info("Total time: {}ms", duration);
        log.info("Succeeded tasks: {}", successCount.get());
        log.info("Failed tasks: {}", errorCount.get());
        log.info("Throughput: {} tasks/s", String.format("%.2f", (taskCount * 1000.0) / duration));
        log.info("Memory used: {} MB", String.format("%.2f", memoryUsed / 1024.0 / 1024.0));

        return new PerformanceResult(
                executorName,
                taskCount,
                duration,
                successCount.get(),
                errorCount.get(),
                memoryUsed
        );
    }

    /**
     * Runs the simulated business scenario.
     * Each request is meant to call 3 external APIs concurrently.
     * The 3 calls take 80 ms, 100 ms and 120 ms; called concurrently the total is 120 ms.
     */
    private PerformanceResult executeRealWorldScenario(ExecutorService executor,
                                                       String executorName,
                                                       int requestCount) throws Exception {
        log.info("\n--- {} started ---", executorName);

        CountDownLatch latch = new CountDownLatch(requestCount);
        AtomicInteger successCount = new AtomicInteger(0);
        AtomicInteger errorCount = new AtomicInteger(0);

        long startTime = System.currentTimeMillis();
        long startMemory = getUsedMemory();

        // One task per request
        for (int i = 0; i < requestCount; i++) {
            final int requestId = i;

            executor.submit(() -> {
                try {
                    // Stand-in for the 3 concurrent API calls: a single sleep of
                    // max(80, 100, 120) = 120 ms, rather than three real parallel calls
                    Thread.sleep(120);

                    // Simulate the business logic that merges the results
                    String result = "API1-Result,API2-Result,API3-Result";

                    successCount.incrementAndGet();
                } catch (Exception e) {
                    errorCount.incrementAndGet();
                    log.error("Request failed: {}", requestId, e);
                } finally {
                    latch.countDown();
                }
            });
        }

        // Wait for all requests to finish
        boolean completed = latch.await(180, TimeUnit.SECONDS);

        long endTime = System.currentTimeMillis();
        long endMemory = getUsedMemory();
        long duration = endTime - startTime;
        long memoryUsed = endMemory - startMemory;

        log.info("--- {} finished ---", executorName);
        log.info("Total time: {}ms", duration);
        log.info("Succeeded requests: {}", successCount.get());
        log.info("Failed requests: {}", errorCount.get());
        log.info("Throughput: {} requests/s", String.format("%.2f", (requestCount * 1000.0) / duration));
        log.info("Memory used: {} MB", String.format("%.2f", memoryUsed / 1024.0 / 1024.0));

        return new PerformanceResult(
                executorName,
                requestCount,
                duration,
                successCount.get(),
                errorCount.get(),
                memoryUsed
        );
    }

    // ==================== Helper methods ====================

    /**
     * Returns the heap memory currently in use (bytes).
     */
    private long getUsedMemory() {
        Runtime runtime = Runtime.getRuntime();
        return runtime.totalMemory() - runtime.freeMemory();
    }

    /**
     * Prints the comparison.
     */
    private void printComparisonResult(PerformanceResult virtualResult,
                                       PerformanceResult threadPoolResult,
                                       String testType) {
        log.info("\n");
        log.info("╔═══════════════════════════════════════════════╗");
        log.info("║ {} comparison ", testType);
        log.info("╠═══════════════════════════════════════════════╣");
        log.info("║ Metric virtual threads thread pool");
        log.info("╟───────────────────────────────────────────────╢");
        log.info("║ Total time (ms) {} {}",
                String.format("%10d", virtualResult.duration),
                String.format("%10d", threadPoolResult.duration));
        log.info("║ Throughput (tasks/s) {} {}",
                String.format("%10.2f", virtualResult.getThroughput()),
                String.format("%10.2f", threadPoolResult.getThroughput()));
        log.info("║ Succeeded tasks {} {}",
                String.format("%10d", virtualResult.successCount),
                String.format("%10d", threadPoolResult.successCount));
        log.info("║ Failed tasks {} {}",
                String.format("%10d", virtualResult.errorCount),
                String.format("%10d", threadPoolResult.errorCount));
        log.info("║ Memory used (MB) {} {}",
                String.format("%10.2f", virtualResult.getMemoryUsedMB()),
                String.format("%10.2f", threadPoolResult.getMemoryUsedMB()));
        log.info("╠═══════════════════════════════════════════════╣");

        // Relative improvement of virtual threads over the thread pool, in percent
        double timeImprovement = ((threadPoolResult.duration - virtualResult.duration) * 100.0)
                / threadPoolResult.duration;
        double throughputImprovement = ((virtualResult.getThroughput() - threadPoolResult.getThroughput()) * 100.0)
                / threadPoolResult.getThroughput();

        if (timeImprovement > 0) {
            log.info("║ Virtual threads: time reduced by {}%", String.format("%.2f", timeImprovement));
        } else {
            log.info("║ Virtual threads: time increased by {}%", String.format("%.2f", Math.abs(timeImprovement)));
        }

        if (throughputImprovement > 0) {
            log.info("║ Virtual threads: throughput up by {}%", String.format("%.2f", throughputImprovement));
        } else {
            log.info("║ Virtual threads: throughput down by {}%", String.format("%.2f", Math.abs(throughputImprovement)));
        }

        log.info("╚═══════════════════════════════════════════════╝");
        log.info("\n");
    }

    // ==================== Data class ====================

    /**
     * Result of one performance run.
     */
    @Data
    @AllArgsConstructor
    private static class PerformanceResult {
        private String executorName;
        private int taskCount;
        private long duration; // total time (ms)
        private int successCount; // succeeded tasks
        private int errorCount; // failed tasks
        private long memoryUsed; // memory used (bytes)

        /**
         * Throughput (tasks/s).
         */
        public double getThroughput() {
            if (duration == 0) return 0;
            return (taskCount * 1000.0) / duration;
        }

        /**
         * Memory used (MB).
         */
        public double getMemoryUsedMB() {
            return memoryUsed / 1024.0 / 1024.0;
        }
    }
}

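Test report

| Scenario | Executor | Tasks | Total time (ms) | Throughput (tasks/s) | Succeeded | Memory (MB) | Virtual threads vs. thread pool | Recommended for |
| --- | --- | --- | --- | --- | --- | --- | --- | --- |
| I/O-bound (100 ms) | Virtual threads | 1000 | 105 | 9,523.81 | 1000 | 1.32 | time ↓89.91%, throughput ↑891.43% | ✅ database queries, HTTP requests |
| I/O-bound (100 ms) | Thread pool | 1000 | 1,041 | 960.61 | 1000 | 0.00 | | |
| CPU-bound (1M iterations) | Virtual threads | 100 | 209 | 478.47 | 100 | 0.36 | time ↑4.50%, throughput ↓4.31% | |
| CPU-bound (1M iterations) | Thread pool | 100 | 200 | 500.00 | 100 | 0.00 | | ✅ image processing, cryptographic computation |
| Mixed (I/O + CPU) | Virtual threads | 500 | 323 | 1,547.99 | 500 | 2.22 | time ↓94.15%, throughput ↑1,610.22% | ✅ post-query processing, paginated queries |
| Mixed (I/O + CPU) | Thread pool | 500 | 5,524 | 90.51 | 500 | 0.88 | | |
| High concurrency (10000 tasks) | Virtual threads | 10000 | 83 | 120,481.93 | 10000 | 14.24 | time ↓98.45%, throughput ↑6,345.78% | ✅ web services, message-queue consumers |
| High concurrency (10000 tasks) | Thread pool | 10000 | 5,350 | 1,869.16 | 10000 | 10.76 | | |
| Business scenario (3 APIs) | Virtual threads | 500 | 129 | 3,875.97 | 500 | 1.19 | time ↓79.36%, throughput ↑384.50% | ✅ microservice aggregation, parallel calls |
| Business scenario (3 APIs) | Thread pool | 500 | 625 | 800.00 | 500 | 0.00 | | |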
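One point worth keeping in mind when reading the thread-pool rows: a ThreadPoolExecutor only creates threads beyond its core size once its queue is full. The pool in the test (core 8, max 100, queue 1000) can therefore run with anywhere between 8 and 100 threads, depending on how many tasks are submitted and on what earlier tests left behind within the 60-second keep-alive. This is an observation about the executor's semantics, not a re-measurement of the results above. The sketch below uses a hypothetical class `PoolGrowthDemo` with the same pool sizes and the default rejection policy, and shows how the thread count depends on the number of pending tasks.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

public class PoolGrowthDemo {

    /**
     * Submits `tasks` tasks that all block until released, then returns the
     * largest number of threads the pool created (core 8, max 100, queue 1000).
     */
    static int largestPoolSize(int tasks) throws InterruptedException {
        if (tasks > 1100) {
            // 100 threads + 1000 queued is the capacity; more would be rejected
            throw new IllegalArgumentException("at most 1100 tasks fit in this pool");
        }
        ThreadPoolExecutor pool = new ThreadPoolExecutor(
                8, 100, 60L, TimeUnit.SECONDS, new LinkedBlockingQueue<>(1000));
        CountDownLatch release = new CountDownLatch(1);
        try {
            for (int i = 0; i < tasks; i++) {
                pool.execute(() -> {
                    try {
                        release.await(); // keep the worker busy so nothing drains the queue
                    } catch (InterruptedException e) {
                        Thread.currentThread().interrupt();
                    }
                });
            }
            return pool.getLargestPoolSize();
        } finally {
            release.countDown(); // let every task finish
            pool.shutdown();
            pool.awaitTermination(10, TimeUnit.SECONDS);
        }
    }

    public static void main(String[] args) throws InterruptedException {
        System.out.println("1000 tasks -> threads: " + largestPoolSize(1000));
        System.out.println("1100 tasks -> threads: " + largestPoolSize(1100));
    }
}
```

With 1000 pending tasks the first 8 start core threads and the rest fit in the queue, so the pool stays at 8 threads. Only tasks beyond the queue capacity trigger extra threads, up to 100. When comparing against virtual threads it may be worth fixing the pool size explicitly (core = max) so that each test sees the same number of platform threads.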

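Structured concurrency and scoped values

With virtual threads the cost of concurrency in Java drops sharply, but organizing several concurrent tasks safely and clearly, and passing context between them, remain hard problems in engineering practice.

To address this, JDK 21 also provides Structured Concurrency and Scoped Values as two preview features (enabled with --enable-preview) that simplify orchestrating multiple concurrent tasks. The material below is only meant to show where Java's concurrency model is heading.

  • Structured concurrency: groups a set of concurrent tasks inside an explicit scope and manages their lifecycle and exceptions together, so that concurrent behavior is as controllable as an ordinary code block.
  • Scoped values: pass read-only context data explicitly through a bounded scope, avoiding the implicit sharing and runaway lifetimes that come with ThreadLocal.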
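import java.util.concurrent.ExecutionException;
import java.util.concurrent.StructuredTaskScope;

// JDK 21 preview API: compile and run with --enable-preview (later JDK releases revise this API)
public class StructuredConcurrencyDemo {

    // A scope-bound context variable, used in place of a ThreadLocal
    static final ScopedValue<String> REQUEST_ID = ScopedValue.newInstance();

    public static void main(String[] args) {
        // Bind the value for the current scope; the binding only lives for the duration of run()
        ScopedValue.where(REQUEST_ID, "req-123").run(() -> {

            // Create a structured-concurrency scope: if any subtask fails, the whole scope fails
            try (var scope = new StructuredTaskScope.ShutdownOnFailure()) {

                // Fork subtask A; it inherits the ScopedValue bindings of the current scope
                var a = scope.fork(() -> {
                    Thread.sleep(100); // simulated I/O wait
                    return "A(" + REQUEST_ID.get() + ")";
                });

                // Fork subtask B; it can read the same context safely
                var b = scope.fork(() -> {
                    Thread.sleep(200); // simulated I/O wait
                    return "B(" + REQUEST_ID.get() + ")";
                });

                // Wait for all subtasks to finish (or for the first failure, which cancels the rest)
                scope.join();

                // If any subtask failed, rethrow its exception
                scope.throwIfFailed();

                // Read the results inside the scope; in JDK 21 fork() returns a Subtask, read with get()
                System.out.println(a.get() + ", " + b.get());
            } catch (InterruptedException | ExecutionException e) {
                // run() takes a Runnable, which cannot throw checked exceptions, so wrap them
                throw new RuntimeException(e);
            }
        });
    }
}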
