250929-Java-Semaphore信号量多线程并发控制

多线程并发信号量控制逻辑

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
/**/
private static final ConcurrentHashMap<String, Long> LOCK_DTO_MAP = new ConcurrentHashMap<>();

private static final ConcurrentHashMap<String, Semaphore> SEMAPHORE_MAP =
new ConcurrentHashMap<>();

private static final long TIMEOUT_DURATION = 60_000; // 60 seconds

public static boolean lock(String key) {
Semaphore semaphore = SEMAPHORE_MAP.computeIfAbsent(key, k -> new Semaphore(1, true));
boolean acquired = false;

try {
// 1. 拿到许可证
acquired = semaphore.tryAcquire(1, TimeUnit.SECONDS);
Preconditions.checkState(acquired, "获取锁超时: " + key);

// 2. 现在互斥了,再操作被保护资源
Long old = LOCK_DTO_MAP.putIfAbsent(key, System.currentTimeMillis());
Preconditions.checkState(old == null, "Lock already held by this thread for key: " + key);

return true;
} catch (Exception ignored) {
} finally {
/* 4. 关键:只有真正拿到过许可的线程才需要“善后” */
if (acquired) {
/* 4.1 判断是否超时,超时就当“没发生过”直接删记录 */
try {
long holdTime = System.currentTimeMillis() - LOCK_DTO_MAP.get(key);
if (holdTime > TIMEOUT_DURATION) {
LOCK_DTO_MAP.remove(key); // 删记录" 超时,记录+许可证已清理"
}
} finally {
semaphore.release(); // 还许可证
}
}
}
return false;
}

/** 清空StripedLockUtils的所有数据 */
public static void clearAppRestart() {
for (String key : LOCK_DTO_MAP.keySet()) {
StreamUtils.silentException(
() -> {
unlock(key);
SEMAPHORE_MAP.remove(key);
});
}
}

public static void unlock(String key) {
StreamUtils.silentException(() -> LOCK_DTO_MAP.remove(key));
}