线程同步

概念:

当多个线程同时运行时,先后顺序有系统去调动,程序无法决定。因此,在多线程环境下就可能造成一些问题。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
public class Main {
public static void main(String[] args) throws InterruptedException {
var a = new ThreadA();
var b = new ThreadB();
a.start();
b.start();
a.join();
b.join();
System.out.println(Counter.counter);
}

}
class Counter{
public static int counter;
}
class ThreadA extends Thread{
@Override
public void run() {
for (int i = 0; i < 10000; i++) {
Counter.counter++;
}
}
}
class ThreadB extends Thread{
@Override
public void run() {
for (int i = 0; i < 10000; i++) {
Counter.counter--;
}
}
}

在这个代码中如果按照正常人逻辑来看这个最后的结果为0,但是最后的结果每次都会不一样。因为这些操作顺序都不是固定的。比如一开始counter是0执行一次++的途中执行完毕了两次–,此时counter是-2但是我counter++是拿0去加最后变回了1赋值上去了。
对于一条语句n=n+1

执行了:

1
2
3
ILOAD
IADD
ISTORE

解决方案:

加锁,保证操作的原子性。synchronized

1
2
3
4
5
6
7
8
9
10
class Counter{
public static final Object lock = new Object();
public static int counter;
}
synchronized (Counter.lock){
Counter.counter++;
}
synchronized (Counter.lock){
Counter.counter--;
}

这样保证了原子性。缺点就是性能会下降

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
class SharedList {
volatile String str = " ";
AtomicBoolean state = new AtomicBoolean(false);

void write() {
while (true) {
if (state.compareAndSet(false, true)) {
str = "abc";
} else if (state.compareAndSet(true, false)) {
str = "123";
}

}
}

void read() {
while (true) {
synchronized (str) {
System.out.print(str.charAt(0) + " ");
System.out.print(str.charAt(1) + " ");
System.out.println(str.charAt(2));
}
}
}
}

public class Demo {
public static void main(String[] args) {
SharedList sharedList = new SharedList();
new Thread(sharedList::write).start();
new Thread(sharedList::read).start();
}
}

对于这段代码他还是会输出
a 2 c
a b c
a b 3
1 b 3
a 2 3
明明string 是线性安全的但是为什么会这样呢?

因为当前锁的是str的那个对象。由于string 里是这样的private final byte[] value;
所以在赋值操作时实际上是new 了一个string。锁住的对象不同那当然没办法正确的输出

死锁

Java的线程锁是可重入的锁。
意思就是当我这个方法获得这个锁时可以再利用这个锁进入另一个方法。这种能被同一个线程反复获取的锁,就叫做可重入锁
这种锁不仅要判断是否是第一次获取,还要记录是第几次获取。每获取一次记录+1.每退出一次记录-1.相当于入度。减到0是才会释放锁

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
public class Counter {
private int count = 0;

public synchronized void add(int n) {
if (n < 0) {
dec(-n);
} else {
count += n;
}
}

public synchronized void dec(int n) {
count += n;
}
}

作用:在同一个进程内防止自锁死。如果a方法上了锁,b方法也有个加锁。a调用b方法。如果没有这个可重入的锁的东西那么就会自己给自己卡死。有了这个东西,这样一来,无论是哪个线程调用b方法都不会给自己卡死。

死锁

一个线程可以获取一个锁后,再继续获取另一个锁。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
AtomicInteger k= new AtomicInteger();
Thread thread1 = new Thread(()->{
synchronized (lockA){
k.getAndIncrement();
try {
Thread.sleep(1000);
} catch (InterruptedException e) {
throw new RuntimeException(e);
}
synchronized (lockB){

k.getAndDecrement();
}
}
});


Thread thread2 = new Thread(()->{
synchronized (lockB){
k.getAndDecrement();
try {
Thread.sleep(1000);
} catch (InterruptedException e) {
throw new RuntimeException(e);
}
synchronized (lockA){
k.getAndIncrement();
}
}
});
thread1.start();
thread2.start();

如何解决?

保证逻辑不冲突就行,比如Thread2改成

1
2
3
4
5
6
7
8
9
10
11
12
13
Thread thread2 = new Thread(()->{
synchronized (lockA){
k.getAndDecrement();
try {
Thread.sleep(1000);
} catch (InterruptedException e) {
throw new RuntimeException(e);
}
synchronized (lockB){
k.getAndIncrement();
}
}
});

使用wait和notify

简单来说当线程使用**wait()的时候会释放锁。比如this.wait()就会释放this。然后等待其他线程去唤醒他,使用notify()notifyAll()**去唤醒锁。
使用notifyAll()将唤醒所有当前正在this锁等待的线程,而notify()只会唤醒其中一个(具体哪个依赖操作系统,有一定的随机性)。这是因为可能有多个线程正在getTask()方法内部的wait()中等待,使用notifyAll()将一次性全部唤醒。通常来说,notifyAll()更安全。有些时候,如果我们的代码逻辑考虑不周,用notify()会导致只唤醒了一个线程,而其他线程可能永远等待下去醒不过来了。

示例:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
class TaskQueue{
Queue<String> queue = new LinkedList<>();
public synchronized void addTask(String s){
queue.add(s);
this.notifyAll();
}

public synchronized String getTask() throws InterruptedException {
while (queue.isEmpty()){
this.wait();
}
return queue.remove();
}
}

创造一个任务队列,里面方法add和get有加锁。如果没有这个wait()那么执行getTask()方法时就会进入 while (queue.isEmpty())死循环一直不释放锁。这样程序就不会结束了。

当使用 this.wait();时这个方法会把this的锁给释放掉。此时addTask可以进入使用add方法,接着唤醒所有的锁。所有的getTask()被唤醒再去抢夺这个锁,继续执行。

ReentrantLock

从Java 5开始,引入了一个高级的处理并发的java.util.concurrent包,它提供了大量更高级的并发功能,能大大简化多线程程序的编写。

java中synchronized用于加锁,但是这个锁操作不太方便,获取时必须等待,没有额外的尝试机制。
java.util.concurrent.locks包提供的ReentrantLock用于替代synchronized加锁。

使用ReentrantLock

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
class Counter1{
private final Lock lock = new ReentrantLock();
private int count;

public void add(int n) throws InterruptedException {
lock.lock();
try {
System.out.println("+"+n);
count+=n;
}finally {
lock.unlock();
}

}
}

ReentrantLock是可重入锁,它和synchronized一样,一个线程可以多次获取同一个锁

ReentrantLock可以尝试获取锁

1
2
3
4
5
6
7
8
9
10
11
12
public void add(int n) throws InterruptedException {
if(lock.tryLock(2, TimeUnit.SECONDS)){
lock.lock();
try {
System.out.println("+"+n);
count+=n;
Thread.sleep(5000);
}finally {
lock.unlock();
}
}

此处为如果2s后仍未获得到锁,tryLock()会返回false程序就可以做一些额外处理,而不是无限等待下去。

所以,使用ReentrantLock比直接使用synchronized更安全,线程在tryLock()失败的时候不会导致死锁。

Condition

synchronized中我们使用wait()和notify()去实现不满足条件时等待,满足条件时唤醒。
ReentrantLock 中同样的也有实现方法:Condition

Condition提供的await()signal()signalAll()原理和synchronized锁对象的wait()notify()notifyAll()是一致的,并且其行为也是一样的:

  • await()会释放当前锁,进入等待状态;
  • signal()会唤醒某个等待线程;
  • signalAll()会唤醒所有等待线程;
  • 唤醒线程从await()返回后需要重新获得锁。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
class TaskQueue1{
Queue<String> queue = new LinkedList<>();
ReentrantLock lock = new ReentrantLock();
Condition condition = lock.newCondition();
public void addTask(String s){
lock.lock();
try{
queue.add(s);
condition.signalAll();
}finally {
lock.unlock();
}
}

public String getTask() throws InterruptedException {
lock.lock();
try {
while (queue.isEmpty()) {
condition.await();
}
return queue.remove();

}finally {
lock.unlock();
}

}
}

tryLock()类似,await()可以在等待指定时间后,如果还没有被其他线程通过signal()signalAll()唤醒,可以自己醒来

1
2
3
4
5
if (condition.await(1, TimeUnit.SECOND)) {
// 被其他线程唤醒
} else {
// 指定时间内没有被其他线程唤醒
}

ReadWriteLock(读写锁)

单纯的使用lock在某些情况会导致保护过头。比如读写操作。如果只是大量并发读操作根本没必要加锁卡住其他的读操作。只需要在有读操作时卡住其他线程的读写操作就行。
因此引入了读写锁ReadWriteLock
基本实现:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
class Counter2{
public int[] counts = new int[10];
private final ReadWriteLock lock = new ReentrantReadWriteLock();

private final Lock rLock = lock.readLock();
private final Lock wLock = lock.writeLock();

public void add(int id){
wLock.lock();
try {
counts[id]++;
}finally {
wLock.unlock();
}
}
public void get(int id){
rLock.lock();
try {
System.out.println(counts[id]);
}finally {
rLock.unlock();
}
}

}

可以理解为当写的操作在进行时其他相同类型的lock就会阻塞(包括他自己)。但是在读操作进行时可以进来相同的读操作(写不行)。

StampedLock

对比于ReadWriteLock(读写锁)来说性能更提升了一步。在ReadWriteLock中当你在读取的时候是无法进行写操作的,这是一种悲观的读锁。

要进一步提升并发执行效率,Java 8引入了新的读写锁:StampedLock

这样以来就可以在读的过程中插入写操作。不过这样的话需要额外判断并处理这次写操作。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
class Point{
private double x;
private double y;

private final StampedLock stampedLock = new StampedLock();

public void move(double xx, double yy){
long stamp = stampedLock.writeLock(); //获取写锁
try {
x+=xx;
y+=yy;
}finally {
stampedLock.unlockWrite(stamp);
}
}

public double distanceFromOrigin(){
long stamp = stampedLock.tryOptimisticRead(); //乐观读锁

double currentX = x;
double currentY = y;
try {
Thread.sleep(500);
} catch (InterruptedException e) {
throw new RuntimeException(e);
}
if(!stampedLock.validate(stamp)){
System.out.println("触发写");
stamp = stampedLock.readLock();//悲观读锁
try {
currentX = x;
currentY = y;
}finally {
stampedLock.unlockRead(stamp);
}

}
return Math.sqrt(currentX*currentX+currentY*currentY);
}
}

Semaphore

Semaphore是一种可以控制资源受限的锁。他能保证同一时刻最多只有N个线程能访问。像前面的(ReentrantLock)只能同一时间只能一个资源访问。读写锁(ReadWriteLock)只能一个线程写入,但是读不受控制。
如果用Lock数组来实现就太麻烦了
所以这里引入Semaphore。可以直接设置能有几个线程同时访问。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
class AccessLimit{
final Semaphore semaphore = new Semaphore(3);

public String access() throws InterruptedException {
semaphore.acquire();
try {
System.out.println(Thread.currentThread().getName());
Thread.sleep(3000);
return UUID.randomUUID().toString();
}finally {
semaphore.release();
}
}

public String tryAccess() throws InterruptedException {
if(semaphore.tryAcquire(5, TimeUnit.SECONDS)){
try {
System.out.println(Thread.currentThread().getName());
Thread.sleep(3000);
return UUID.randomUUID().toString();
}finally {
semaphore.release();
}
}
return null;
}
}

和ReentrantLock一样他是有try方法的

不同的是他并不是一个可重入的锁。每次semaphore.acquire();都会消耗一个。

Concurrent集合

之前使用了ReentrantLock + Condition 制作了一个BlockingQueue。
BlockingQueue的意思就是说,当一个线程调用这个TaskQueuegetTask()方法时,该方法内部可能会让线程变成等待状态,直到队列条件满足不为空,线程被唤醒后,getTask()方法才会返回。

因为BlockingQueue非常有用,所以我们不必自己编写,可以直接使用Java标准库的java.util.concurrent包提供的线程安全的集合:ArrayBlockingQueue

除了BlockingQueue外,针对ListMapSetDeque等,java.util.concurrent包也提供了对应的并发集合类。

例如java.util.concurrent.ConcurrentHashMap

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
Map<String,Integer> mapThreadSafe = new ConcurrentHashMap<>();
Map<String,Integer> mapThreadUnsafe = new HashMap<>();

// 增加操作复杂度和并发压力
for(int i = 0; i < 1000; i++) {
final int index = i;
new Thread(() -> {
// 大量操作
for(int j = 0; j < 1000; j++) {
mapThreadSafe.put("key" + (index * 1000 + j), j);
mapThreadUnsafe.put("key" + (index * 1000 + j), j);
}
}).start();
}

Thread.sleep(2000); // 等待所有线程完成
System.out.println("Safe size: " + mapThreadSafe.size());
System.out.println("Unsafe size: " + mapThreadUnsafe.size());

输出结果如下

1
2
Safe size: 1000000
Unsafe size: 694319

原因

HashMap<>()并不是线程安全的类,也就是说它的put不能保证安全性。

1
2
3
4
5
6
HashMap的put方法大致流程:
1. 计算hash值
2. 定位数组位置
3. 检查是否需要扩容
4. 插入或更新节点
5. 更新size

也就是说很有可能在两个线程同时将数据插入的时候,插入到一个数组导致数据覆盖。或者在更新size的值的时候,原本是size()==100,两个线程同时更新size()最后变成101

Atomic

Java的java.util.concurrent包除了提供底层锁、并发集合外,还提供了一组原子操作的封装类,它们位于java.util.concurrent.atomic

AtomicInteger为例,它提供的主要操作有:

  • 增加值并返回新值:int addAndGet(int delta)
  • 加1后返回新值:int incrementAndGet()
  • 获取当前值:int get()
  • 用CAS方式设置:int compareAndSet(int expect, int update)

Atomic类是通过无锁(lock-free)的方式实现的线程安全(thread-safe)访问。它的主要原理是利用了CAS:Compare and Set。

为什么可以通过无锁实现线程安全?

1
2
3
4
5
6
7
8
public int incrementAndGet(AtomicInteger var){
int pre , next;
do {
pre = var.get();
next = pre +1;
}while (!var.compareAndSet(pre,next));
return next;
}

这是AtomicInteger 的incrementAndGet的类似实现方式。
通过compareAndSet (CAS) 和do while()来实现无锁线程安全。
假设我的线程a在执行这个方法时已经获取了pre和next值。那么在dowhile的判断中根据和pre的比较,如果它没有被其他线程的操作改变数值那么就将这个AtomicInteger设置成next。

而CAS为什么能保证线性安全?
compareAndSet 确实没有加锁,但它是硬件层面的原子操作 .

-
CAS不是加锁,是硬件原子指令

  • 并发冲突时只有一个成功,其他重试
  • 最终结果正确:每次调用都会让值增加1
  • 无锁但线程安全:通过硬件原子性+重试机制实现