线程安全问题

两个基本概念

临界区

  • 多个线程读取共享资源的时候不会出现问题
  • 但是多个线程对共享资源进行写操作的时候可能会出现指令的交错的问题
  • 如果一段代码内存在对于共享资源的多线程读写操作,那么我们称这段代码为临界区

竞态条件

多个线程在临界区内执行,由于代码的执行序列不同导致结果无法预测,称之为发生了竞态条件

synchronized

对象锁,他采用互斥的方式让同一时刻最多只有一个线程能够持有对象锁,其他线程想要读写临界区中的资源,就会进入阻塞状态,不用担心因为线程上下文切换而引发的指令交错问题。

1
2
3
synchronized(对象) {
// 临界区
}

img

原理:用 对象锁 确保了临界区内代码的原子性

也可以在方法上加上 synchronized

1
2
3
4
5
6
7
8
9
10
11
12
13
class Test{
public synchronized void test() {

}
}
等价于
class Test{
public void test() {
synchronized(this) {

}
}
}

注意:

如果加的锁的对象是一致的话,才能够实现原子性,不同锁的对象相当于不加锁,例如:

1
2
3
4
5
6
7
synchronized(A.class) {
// 临界区
}

synchronized(B.class) {
// 临界区
}

无法实现线程安全,因为他们对于锁的对象的限制是不同的。

同样的,对于 static 方法,也需要注意,因为他们如果加在方法上,说明他们锁的是类对象,而不是实例对象。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
class Number{
public static synchronized void a() {
// 临界区
}
}

class main{
public static void main(String[] args) {
Number num1 = new Number();
new Thread(() -> {
synchronized(num1) {
// 临界区
}
});
new Thread(() -> {
Number.a();
})
}
}

这段代码也没有实现线程安全问题的解决,因为他们加锁的对象不是同一个,其中一个是实例对象类型,一个是 class 类型

变量的线程安全问题

成员变量和局部变量是否线程安全

  • 如果他们没有共享,则他们是线程安全的。

  • 如果他们被共享了:

    • 如果只有只读的操作,则他们线程安全
    • 如果有读写操作,则这段代码是临界区,需要考虑线程安全问题。

局部变量是否线程安全

  • 局部变量是线程安全的

  • 但局部变量引用的对象不一定

    • 如果该对象没有逃离方法的作用访问,那么他是线程安全的
    • 如果该对象逃离方法的作用范围,那么需要考虑线程安全问题(可能同时被其他类引用)

常见线程安全类

  • String
  • Integer
  • StringBuffer
  • Random
  • Vector
  • HashTable
  • java.util.concurrent 包下的类

他们的方法都是原子的,因为涉及到操作系统底层的内容

Monitor

mark Word对象标记,存储对于该对象的加锁情况:

1
2
3
4
5
6
7
8
9
10
11
12
13
|-------------------------------------------------------|--------------------|
| Mark Word (32 bits) | State |
|-------------------------------------------------------|--------------------|
| hashcode:25 | age:4 | biased_lock:0 | 01 | Normal |
|-------------------------------------------------------|--------------------|
| thread:23 | epoch:2 | age:4 | biased_lock:1 | 01 | Biased |
|-------------------------------------------------------|--------------------|
| ptr_to_lock_record:30 | 00 | Lightweight Locked |
|-------------------------------------------------------|--------------------|
| ptr_to_heavyweight_monitor:30 | 10 | Heavyweight Locked |
|-------------------------------------------------------|--------------------|
| | 11 | Marked for GC |
|-------------------------------------------------------|--------------------|

Monitor 监视器模型

img

上图是Monitor监视器模型,他有WaitSet, EntrySet, Owner三个模型

当调用的临界区加上了 synchronized 对象锁后,就会触发属于该对象的监视器,注意,他和 synchronized 一样,每个对象的监视器是独有的。

  • Thread1 调用了带有 synchronized 的临界区后,他会进入到 monitor 对象,检查其中的 Owner 是否有线程占用,没有的话则自己占用。
  • Thread2,3,4调用临界区后,发现 Owner 已经被 Thread1 占有了,那么他们会进入到 EntrySet 中进行等待,并且进入 Block 阻塞状态。
  • Thread0 因为某些原因在运用时候被打断了(比如被别的线程调用wait()指令),那么他就会进入到 WaitSet 区域等待唤醒,并且进入 Waiting 状态。

Synchronized

Synchronized 原理

字节码:

1
2
3
4
5
6
7
8
9
10
0: getstatic #2 // <- lock引用 (synchronized开始)
3: dup
4: astore_1 // lock引用 -> slot 1
5: monitorenter // 将 lock对象 MarkWord 置为 Monitor 指针
6: getstatic #3 // <- i
9: iconst_1 // 准备常数 1
10: iadd // +1
11: putstatic #3 // -> i
14: aload_1 // <- lock引用
15: monitorexit // 将 lock对象 MarkWord 重置, 唤醒 EntryList

我们可以看到,他实质上是运用了monitorentermonitorexit进行对象的监控,也就是说,Synchronized 是基于 Monitor 使用的。

轻量级锁

如果一个对象虽然有多线程要加锁,但加锁的时间是错开的(也就是没有竞争),那么可以使用轻量级锁来优化。

对于使用者,仍是调用Synchronized,但底层已经优化

Synchronized 加锁的过程

  • 创建锁记录(Lock Record)对象,每个线程都的栈帧都会包含一个锁记录的结构,内部可以存储锁定对象的 Mark Word

img

  • 让锁记录中 Object reference 指向锁对象,并尝试用 cas 替换 Object 的 Mark Word,将 Mark Word 的值存入锁记录

img

  • 如果 cas 替换成功,对象头中存储了 锁记录地址和状态 00 ,表示由该线程给对象加锁,这时图示如下

img

  • 如果 cas 失败,有两种情况

    • 如果是其它线程已经持有了该 Object 的轻量级锁,这时表明有竞争,进入锁膨胀过程
    • 如果是自己执行了 synchronized 锁重入,那么再添加一条 Lock Record 作为重入的计数

img

  • 当退出 synchronized 代码块(解锁时)如果有取值为 null 的锁记录,表示有重入,这时重置锁记录,表示重入计数减一

img

  • 当退出 synchronized 代码块(解锁时)锁记录的值不为 null,这时使用 cas 将 Mark Word 的值恢复给对象头

    • 成功,则解锁成功
    • 失败,说明轻量级锁进行了锁膨胀或已经升级为重量级锁,进入重量级锁解锁流程

锁膨胀

开始我们检测到 synchronized 时候,加上的是轻量级锁,那么当我们要用 cas 操作加上轻量级锁的时候发现操作失败,说明有一种情况是已经有轻量级锁的存在了,那么这时候需要将锁升级为重量级锁。这个过程就叫做锁膨胀

img

  • 这时 Thread-1 加轻量级锁失败,进入锁膨胀流程

    • 即为 Object 对象申请 Monitor 锁,让 Object 指向重量级锁地址
    • 然后自己进入 Monitor 的 EntryList BLOCKED

img

  • 当 Thread-0 退出同步块解锁时,使用 cas 将 Mark Word 的值恢复给对象头,失败。这时会进入重量级解锁流程,即按照 Monitor 地址找到 Monitor 对象,设置 Owner 为 null,唤醒 EntryList 中 BLOCKED 线程

自旋优化

重量级锁竞争的时候,还可以使用自旋来进行优化,如果当前线程自旋成功(即这时候持锁线程已经退出了同步块,释放了锁),这时当前线程就可以避免阻塞。即多试几次。

在 Java 6 之后自旋锁是自适应的,比如对象刚刚的一次自旋操作成功过,那么认为这次自旋成功的可能性会高,就多自旋几次;反之,就少自旋甚至不自旋。

偏向锁

轻量级锁在没有竞争时(就自己这个线程),每次重入仍然需要执行 CAS 操作。

Java 6 中引入了偏向锁来做进一步优化:只有第一次使用 CAS 将线程 ID 设置到对象的 Mark Word 头,之后发现这个线程 ID 是自己的就表示没有竞争,不用重新 CAS。以后只要不发生竞争,这个对象就归该线程所有

开启偏向锁后,对象的 markword 值最后三位是 101

没有开启偏向锁,对象创建后 markword 的值为 001

撤销偏向锁

  • 调用对象 hashCode
  • 其它线程使用对象
  • 调用 wait/notify

批量重偏向

当撤销偏向锁阈值超过 20 次后,jvm 给这些对象加锁时重新偏向至加锁线程

批量撤销

当撤销偏向锁阈值超过 40 次后,整个类的所有对象都会变为不可偏向的,新建的对象也是不可偏向的

锁消除

当只有一个临界区加上锁,且只有一个线程执行,那么系统会自动运用锁消除技术。减轻系统负担。

wait notify notifyAll

sleep() 和 wait() 的区别

  1. sleep() 是 Thread 类的方法,wait() 是 object 类的方法
  2. wait() 一定要在 synchronized 代码块中使用,而 sleep() 不一定
  3. sleep() 不会释放对象锁,wait() 会释放对象锁
  4. 它们的状态都是 TIMED_WAITING

notify 随机唤醒线程,并不固定,可以采用 notifyAll 进行全部唤醒

同步设计模式——保护式暂停

guardObject

两个线程同时监听着一个中间件,生产者将产品置为成员变量,消费者监听到成员变量里的值不为null,才取出产品。

img

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
@Slf4j(topic = "c.GuardedSuspension")
public class GuardedSuspension {

public static void main(String[] args) {
GuardObject go = new GuardObject();
// 线程1 等待 线程2 传递的资源
new Thread(() -> {
log.debug("开始等待获取结果");
Object o = go.get();
log.debug("获取到了结果: {}", o);
}, "t1").start();

new Thread(() -> {
try {
log.debug("start");
Thread.sleep(2000);
go.complete("result");
} catch (InterruptedException e) {
throw new RuntimeException(e);
}
}, "t2").start();
}

}

@Slf4j
class GuardObject {
/**
* 结果储存
*/
private Object response;

/**
* 锁
*/
private final Object lock = new Object();

public Object get() {
synchronized(lock) {
// 没有值的情况下,一直等待
while(response == null) {
try {
lock.wait();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
log.debug("获取到结果:{}", response);
return response;
}
}

public void complete(Object response) {
synchronized (lock) {
this.response = response;
lock.notifyAll();
}
}
}

带超时版

这个的设计主要就是 join()方法的设计,其主要运用了当前时间和运行时间的比较,再将他们进行对比。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
@Slf4j(topic = "c.GuardedSuspensionPassedTime")
public class GuardedSuspensionPassedTime {

public static void main(String[] args) {
GuardObjectPassedTime go = new GuardObjectPassedTime();
// 线程1 等待 线程2 传递的资源
new Thread(() -> {
log.debug("开始等待获取结果");
Object o = go.get(3000L);
log.debug("获取到了结果: {}", o);
}, "t1").start();

new Thread(() -> {
try {
log.debug("start");
Thread.sleep(2000);
go.complete("result");
log.debug("获取结果成功");
} catch (InterruptedException e) {
throw new RuntimeException(e);
}
}, "t2").start();
}

}

@Slf4j(topic = "c.GuardObjectPassedTime")
class GuardObjectPassedTime {
/**
* 结果储存
*/
private Object response;

/**
* 锁
*/
private final Object lock = new Object();

public Object get(long millis) {
long current = System.currentTimeMillis();
long passed = 0L;
synchronized(lock) {
// 没有值的情况下,一直等待
while(response == null) {
if(passed >= millis) {
break;
}
try {
lock.wait(millis - passed);
} catch (InterruptedException e) {
e.printStackTrace();
}
// 已经等待的时间
passed = System.currentTimeMillis() - current;
}
return response;
}
}

public void complete(Object response) {
synchronized (lock) {
this.response = response;
lock.notifyAll();
}
}
}

面向对象demo

主要运用了一个 Futures中间件,将对象们封装了起来

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
@Slf4j(topic = "c.GuardedSuspensionMutiple")
public class GuardedSuspensionMutiple {

public static void main(String[] args) throws InterruptedException {
for (int i = 0; i < 3; i++) {
new People().start();
}
Thread.sleep(1000);
for(Integer id : Futures.getKeySet()) {
new Postman(id, "message" + id).start();
}
}

}


@Slf4j(topic = "c.People")
class People extends Thread {
@Override
public void run() {
log.debug("开始等待收信");
GuardMutipleObject guardMutipleObject = Futures.createGuardMutipleObject();
Object o = guardMutipleObject.get();
log.debug("收到信了: id: {} response: {}",guardMutipleObject.getId(), o);
}
}

@Slf4j(topic = "c.Postman")
class Postman extends Thread {
private Integer postmanId;

private String message;

public Postman(Integer postmanId, String message) {
this.postmanId = postmanId;
this.message = message;
}


public Integer getPostmanId() {
return postmanId;
}

public String getMessage() {
return message;
}

@Override
public void run() {
GuardMutipleObject guardMutipleObject = Futures.getGuardMutipleObject(postmanId);
log.debug("送信 id:{}, 内容:{}", postmanId, message);
guardMutipleObject.complete(message);
}
}

class Futures {

private static Map<Integer, GuardMutipleObject> guardMutipleObjectMap = new Hashtable<>();

private static Integer id = 0;

private static synchronized Integer generateId() {
return id++;
}

/**
* 获取任务并且移除
*/
public static GuardMutipleObject getGuardMutipleObject(Integer id) {
return guardMutipleObjectMap.remove(id);
}

/**
* 创建任务
*/
public static GuardMutipleObject createGuardMutipleObject() {
GuardMutipleObject guardMutipleObject = new GuardMutipleObject(generateId());
guardMutipleObjectMap.put(guardMutipleObject.getId(), guardMutipleObject);
return guardMutipleObject;
}

/**
* 获取任务编号
*/
public static Set<Integer> getKeySet() {
return guardMutipleObjectMap.keySet();
}
}

@Slf4j
class GuardMutipleObject {

public Integer getId() {
return id;
}

private Integer id;

public GuardMutipleObject(Integer id) {
this.id = id;
}

/**
* 结果储存
*/
private Object response;

/**
* 锁
*/
private final Object lock = new Object();

public Object get() {
synchronized(lock) {
// 没有值的情况下,一直等待
while(response == null) {
try {
lock.wait();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
log.debug("获取到结果:{}", response);
return response;
}
}

public void complete(Object response) {
synchronized (lock) {
this.response = response;
lock.notifyAll();
}
}
}

异步设计模式——消费者和生产者

类比消息队列,但是消息队列中是进程之间的关系,而这里是线程之间的关系

img

这里主要相比较于前面的同步,无需再做到对象的一一对应,生产者仅负责产生结果数据,不关心数据该如何处理,而消费者专心处理结果数据。且他是有容量限制的,满时不会再加入数据,空时不会再消耗数据

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
package com.arong.JUC.async;

import lombok.extern.slf4j.Slf4j;

import java.util.Deque;
import java.util.LinkedList;

@Slf4j(topic = "c.demo")
public class demo {


public static void main(String[] args) {

MessageQueue mq = new MessageQueue(2);

new Thread(() -> {
while (true) {
StandardMessage message = mq.take();
String response = message.getMessage();
log.debug("take message({}): [{}] lines", message.getId(), response);
}
}, "消费者").start();

for (int i = 0; i < 4; i++) {
StandardMessage standardMessage = new StandardMessage(i, "往消息队列发送了消息:" + i);
new Thread(()-> {
log.debug("发送消息: {}", standardMessage);
mq.put(standardMessage);
},"生产者" + i).start();
}

}
}

@Slf4j(topic = "c.MessageQueue")
class MessageQueue {
/**
* 容量
*/
private final Integer Capacity;

/**
* 存储信息的队列
*/
private final Deque<StandardMessage> list = new LinkedList<>();

MessageQueue(Integer capacity) {
this.Capacity = capacity;
}

public void put(StandardMessage sm) {
synchronized (list) {
// 队列满了的时候
while(list.size() == Capacity) {
log.debug("队列已满");
try {
list.wait();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
list.addFirst(sm);
list.notifyAll();
}
}

public StandardMessage take() {
synchronized (list) {
while(list.isEmpty()) {
log.debug("队列已空");
try {
list.wait();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
StandardMessage standardMessage = list.removeLast();
list.notifyAll();
return standardMessage;
}
}

}


class StandardMessage {
private Integer id;

private String message;

public Integer getId() {
return id;
}

public String getMessage() {
return message;
}

@Override
public String toString() {
return "StandardMessage{" +
"id=" + id +
", message='" + message + '\'' +
'}';
}

public StandardMessage(Integer id, String message) {
this.id = id;
this.message = message;
}
}

Park & UnPark

与 Object 的 wait & notify 相比 :

  • wait,notify 和 notifyAll 必须配合 Object Monitor 一起使用,而 park,unpark 不必
  • park & unpark 是以线程为单位来【阻塞】和【唤醒】线程,而 notify 只能随机唤醒一个等待线程,notifyAll是唤醒所有等待线程,就不那么【精确】
  • park & unpark 可以先 unpark,而 wait & notify 不能先 notify

线程状态

假设有线程 Thread t

情况 1 NEW –> RUNNABLE

  • 当调用 t.start() 方法时,由 NEW --> RUNNABLE

情况 2 RUNNABLE <–> WAITING

t 线程synchronized(obj) 获取了对象锁后

  • 调用 obj.wait() 方法时,t 线程RUNNABLE --> WAITING

  • 调用 obj.notify()obj.notifyAll()t.interrupt()

    • 竞争锁成功,t 线程WAITING --> RUNNABLE
    • 竞争锁失败,t 线程WAITING --> BLOCKED

情况 3 RUNNABLE <–> WAITING

  • 当前线程调用 t.join() 方法时,当前线程RUNNABLE --> WAITING

注意是当前线程t 线程对象的监视器上等待

  • t 线程运行结束,或调用了当前线程的 interrupt() 时,当前线程WAITING --> RUNNABLE

情况 4 RUNNABLE <–> WAITING

  • 当前线程调用 LockSupport.park() 方法会让当前线程从 RUNNABLE --> WAITING
  • 调用 LockSupport.unpark(目标线程) 或调用了线程 的 interrupt() ,会让目标线程从 WAITING --> RUNNABLE

情况 5 RUNNABLE <–> TIMED_WAITING

  • 调用 obj.wait(long n) 方法时,t 线程RUNNABLE --> TIMED_WAITING

  • t 线程等待时间超过了 n 毫秒,或调用 obj.notify() , obj.notifyAll() , t.interrupt() 时

    • 竞争锁成功,t 线程TIMED_WAITING --> RUNNABLE
    • 竞争锁失败,t 线程TIMED_WAITING --> BLOCKED

情况 6 RUNNABLE <–> TIMED_WAITING

  • 当前线程调用 t.join(long n) 方法时,当前线程RUNNABLE --> TIMED_WAITING

注意是当前线程t 线程对象的监视器上等待

  • 当前线程等待时间超过了 n 毫秒,或t 线程运行结束,或调用了当前线程的 interrupt() 时,当前线程TIMED_WAITING --> RUNNABLE

情况 7 RUNNABLE <–> TIMED_WAITING

  • 当前线程调用 Thread.sleep(long n) ,当前线程从 RUNNABLE --> TIMED_WAITING
  • 当前线程等待时间超过了 n 毫秒,当前线程TIMED_WAITING --> RUNNABLE

情况 8 RUNNABLE <–> TIMED_WAITING

  • 当前线程调用 LockSupport.parkNanos(long nanos) 或 LockSupport.parkUntil(long millis) 时,当前线程RUNNABLE --> TIMED_WAITING
  • 调用 LockSupport.unpark(目标线程) 或调用了线程 的 interrupt() ,或是等待超时,会让目标线程从TIMED_WAITING--> RUNNABLE

情况 9 RUNNABLE <–> BLOCKED

  • t线程用synchronized(obj) 获取了对象锁时如果竞争失败,从RUNNABLE --> BLOCKED
  • 持 obj 锁线程的同步代码块执行完毕,会唤醒该对象上所有 BLOCKED 的线程重新竞争,如果其中 t 线程竞争成功,从 BLOCKED --> RUNNABLE ,其它失败的线程仍然BLOCKED

情况 10 RUNNABLE <–> TERMINATED

  • 当前线程所有代码运行完毕,进入 TERMINATED