synchronized实现原理

前言

作为并发编程不可或缺的一部分,线程的安全一直绕不开的一道坎,而Java语言则是使用了synchronized这一关键字来作为支持,既然能够作为关键字而不是单独的一个工具类,那就很有必要好好剖析一下它。

并发问题

首先需要明确,必须是在多线程环境下,多个线程同时需要修改相同的内容才会发生并发问题,如果大家都是读取数据那是不会有问题的,但是一旦有人写数据就会出现问题了,主要是这三种:

  • 可见性:线程A修改的数据,线程B无法马上获取到。
  • 原子性:最经典的i++问题,因为自增操作对应的指令其实是4个(其实4个也不对,最后应该落实到CPU上才对),所以不是原子操作。
  • 有序性:编译器会优化你的代码,导致实际的代码并不如你编写时的顺序(但是保证在单线程下无误)。

内存模型

首先需要声明,这是一套抽象的概念,是一套规范,目的就是用来解决并发问题。Java规范规定了,有一个主内存存放了各种共享数据,然后每个线程有各自的工作内存,线程的局部变量就存放与工作内存中,线程不允许在主内存中进行操作的,需要把变量复制一份到自己的工作内存中,然后进行操作,最后放回主内存。

上面的主内存工作内存,并不是真实物理机上面的内存,也有可能是CPU的寄存器和CPU的缓存。

对于原子性问题,Java内存模型提出了8种原子操作,用来完成同步:lock - read(从主内存读取) - load(主内存到工作内存) - use - assign - store - write - unlock

对于可见性问题,我们只需要设置共享内存的变量为volatile即可,这样只要修改了之后,就会通知所有的线程工作内存中该值失效,让它们读取共享内存中的值。或者用synchronized随便锁一个对象都可以,因为synchronized会执行lock这个原子操作,刷新工作内存中的值。PS:println方法也可以,因为内部也是用的synchronized。

对于有序性问题,emmmm因为只要你锁定了代码块,里面的顺序乱了就乱了,反正也没人干扰。volatile也可以保证变量不发生重排序。

用法

修饰方法

直接在方法的声明中加入synchronized,即可将该方法声明为同步方法。普通方法(非静态方法)在被声明为同步方法之后,当前线程的锁便是实例对象,即锁住了该方法所属的对象,所以你也将无法访问该对象的其它的synchronized方法,因为一个对象只有一把锁。切记这里是对象,如果你创建线程的时候就用了两个不同的对象,那么这个锁就失去了它的意义。

如果方法同时还有了static修饰,那么说明锁是加给了当前的类(更准确的应该是当前类的class对象)。

同步代码块

有的时候如果对整个方法进行synchronized声明,可能对效率的影响会比较大,所以需要我们自己手动来指定哪些代码需要进行同步,并给哪个对象加锁。一般我们都会使用this或者Xxx.class(其实就是上面修饰方法中锁住的那两位)

特性

  • 可重入性:一个线程可以多次获取相同的锁。一个计数器就能实现,很简单。可以避免死锁和提高编程效率。
  • 不可中断性:线程A获得了锁,那么线程B为了得到这把锁只能阻塞或者等待,且线程B不可中断,就算你终止线程B也不可以。PS:Lock的lock也是不可中断,但是tryLock是可以中断的。

底层

同步在JVM中是基于进入和退出管程(Monitor)对象实现。而在此之前,还需要一些别的知识来进行补充。

对象头和monitor

对象头和monitor是实现同步的重要组件。

Java的对象在内存的堆中,这点大家都知道,它可以被分成三部分,分别是对象头、实例数据和填充部分。实例数据存放的是类的一些属性信息,而填充部分则是为了让对象满足8字节的整数倍而存在的,所以对同步有用的部分,就在对象头里。

对象头是由Mark Word (标记字段)和 Class Metadata Address(类元数据地址,即这个对象是哪个类的对象) 组成的:在32位的虚拟机中,Mark Word存放的是25bit的对象hashcode(只有你用到hashcode的时候才会生成),4bit的对象分代年龄(用于垃圾回收),1bit是否是偏向锁和2bit的锁标志位。当然也可以灵活变通一下。

Synchronization is built around an internal entity known as the intrinsic lock or monitor lock. (The API specification often refers to this entity simply as a “monitor.”)

简单点说,就是每一个对象都会有一个叫monitor的对象(monitor是C++的对象,并不是一开始就有的,而是当JVM运行到同步代码块的时候才会去创建)。只要这个monitor被任何一个线程持有,那么就说这个monitor对应的对象被锁定了。这个锁对象是JVM创建的,其内部有两个比较重要的属性,owner用来标识是哪个线程拥有,而recursions则是用来表示获取锁的次数(为了可重入性)。

同步代码块底层实现

下面用一段代码来看看:

1
2
3
4
5
6
7
8
9
10
public class test {
public int i;

// 注意这里是同步代码块!
public void run() {
synchronized (this) {
i++;
}
}
}

然后编译、反编译这段代码,得到如下结果:

image-20200330205252584

可以明显发现就是通过monitorenter指令来进入同步块,通过monitorexit指令来退出同步块。

运行monitorenter指令会创建monitor对象,而monitorexit必须由拥有monitor的线程来执行,执行一次计数器减1,到0就会失去monitor的所有权,这时那些被monitor阻塞的线程就可以尝试获得这个monitor了。

观察上图你还会发现,为什么会有两个monitorexit指令呢?这是编译器为了保证能够释放所做的。

同步方法的底层实现

首先还是先写一个同步方法:

1
2
3
4
5
6
7
8
public class test {

public int i;

public synchronized void run(){
i++;
}
}

然后反编译查看指令:

image-20200330211234502

可以看到没有对应的monitorentermonitorexit的指令,而是在方法上加了一个flag来标识这是一个同步方法。JVM在读取到这个标识的时候就会执行相应的同步调用,即隐式的使用monitorentermonitorexit指令。

源码

可惜的是,synchronized是JVM支持的,所以必须查看JVM的源代码才可以。登录https://openjdk.java.net即可下载hotspot的源码。

CAS

Compare And Swap,是现代CPU对共享内存支持的一种指令,它可以将比较和交换变成原子操作,由CPU来保证。它依赖三个值,内存中的值V,旧的预估值X和要修改的新值B。如果X=V,那么就把B代替V。这是因为之前多线程有问题,都是因为线程A更新了某个值,线程B把A更新的值覆盖掉了,所以只需要对比下旧的预估值和内存中的值是否相同,就能知道自己是否会覆盖掉别人的成果。同时CAS也是乐观锁的关键所在。

monitor结构

可以在src/share/vm/runtime/objectMonitor.hpp中找到monitor的文件,可以看到以下的结构:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
ObjectMonitor() {
_header = NULL;
_count = 0;
_waiters = 0,
_recursions = 0; // 重入次数
_object = NULL; // 存储java对象,Java对象则是通过对象头指向monitor,相互引用
_owner = NULL; // 谁拥有monitor
_WaitSet = NULL; // 处于等待状态的线程集合
_WaitSetLock = 0 ;
_Responsible = NULL ;
_succ = NULL ;
_cxq = NULL ; // 第一次阻塞会先到这里
FreeNext = NULL ;
_EntryList = NULL ; // 第二次阻塞会到这里
_SpinFreq = 0 ;
_SpinClock = 0 ;
OwnerIsThread = 0 ;
_previous_owner_tid = 0;
}

竞争monitor的过程

src/share/vm/interpreter/interpreterRuntime.cpp文件中有竞争的过程,代码如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
#ifdef ASSERT
thread->last_frame().interpreter_frame_verify_monitor(elem);
#endif
if (PrintBiasedLockingStatistics) {
Atomic::inc(BiasedLocking::slow_path_entry_count_addr());
}
Handle h_obj(thread, elem->obj());
assert(Universe::heap()->is_in_reserved_or_null(h_obj()),
"must be NULL or an object");

// ↓重要的部分
if (UseBiasedLocking) { // 偏向锁,下面会讲
// Retry fast entry if bias is revoked to avoid unnecessary inflation
ObjectSynchronizer::fast_enter(h_obj, elem->lock(), true, CHECK);
} else {
ObjectSynchronizer::slow_enter(h_obj, elem->lock(), CHECK);
}
// ↑重要的部分
assert(Universe::heap()->is_in_reserved_or_null(elem->obj()),
"must be NULL or an object");
#ifdef ASSERT
thread->last_frame().interpreter_frame_verify_monitor(elem);
#endif
IRT_END

最终会来到下面的代码部分:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
void ATTR ObjectMonitor::enter(TRAPS) {
// The following code is ordered to check the most common cases first
// and to reduce RTS->RTO cache line upgrades on SPARC and IA32 processors.
Thread * const Self = THREAD ;
void * cur ;

// Atomic::cmpxchg_ptr是CPU保证的原子操作,就是设置_owner为当前线程
cur = Atomic::cmpxchg_ptr (Self, &_owner, NULL) ;
if (cur == NULL) {
// Either ASSERT _recursions == 0 or explicitly set _recursions = 0.
assert (_recursions == 0 , "invariant") ;
assert (_owner == Self, "invariant") ;
// CONSIDER: set or assert OwnerIsThread == 1
return ;
}

// 这里是重入的部分
if (cur == Self) {
// TODO-FIXME: check for integer overflow! BUGID 6557169.
_recursions ++ ;
return ;
}

// 第一次进入
if (Self->is_lock_owned ((address)cur)) {
assert (_recursions == 0, "internal state error");
_recursions = 1 ;
// Commute owner from a thread-specific on-stack BasicLockObject address to
// a full-fledged "Thread *".
_owner = Self ;
OwnerIsThread = 1 ;
return ;
}

// 省略一部分

// TODO-FIXME: change the following for(;;) loop to straight-line code.
for (;;) {
jt->set_suspend_equivalent();
// cleared by handle_special_suspend_equivalent_condition()
// or java_suspend_self()

// 没有抢到锁
EnterI (THREAD) ;

if (!ExitSuspendEquivalent(jt)) break ;

//
// We have acquired the contended monitor, but while we were
// waiting another thread suspended us. We don't want to enter
// the monitor while suspended because that would surprise the
// thread that suspended us.
//
_recursions = 0 ;
_succ = NULL ;
exit (false, Self) ;

jt->java_suspend_self();
}
Self->set_current_pending_monitor(NULL);
}

Atomic::dec_ptr(&_count);
assert (_count >= 0, "invariant") ;
Self->_Stalled = 0 ;

// 省略
}

具体流程分析如下:

  1. 通过CAS来把monitor设置为当前线程。
  2. 如果之前的owner就是自己,那么就把recursion++;如果是第一次进入的,就把owner设成自己,recursion设置成1。
  3. 获得锁失败就进入队列,等待其它线程释放锁,就是EnterI (THREAD),该函数位于同一个文件中。

进入等待队列

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
void ATTR ObjectMonitor::EnterI (TRAPS) {
Thread * Self = THREAD ;
assert (Self->is_Java_thread(), "invariant") ;
assert (((JavaThread *) Self)->thread_state() == _thread_blocked , "invariant") ;

// Try the lock - TATAS 虽然没抢到锁,但是还是最后尝试一次
if (TryLock (Self) > 0) {
assert (_succ != Self , "invariant") ;
assert (_owner == Self , "invariant") ;
assert (_Responsible != Self , "invariant") ;
return ;
}

DeferredInitialize () ;

// We try one round of spinning *before* enqueueing Self.
//
// If the _owner is ready but OFFPROC we could use a YieldTo()
// operation to donate the remainder of this thread's quantum
// to the owner. This has subtle but beneficial affinity
// effects.

// 自旋,再抢救一下...
if (TrySpin (Self) > 0) {
assert (_owner == Self , "invariant") ;
assert (_succ != Self , "invariant") ;
assert (_Responsible != Self , "invariant") ;
return ;
}

// The Spin failed -- Enqueue and park the thread ...
assert (_succ != Self , "invariant") ;
assert (_owner != Self , "invariant") ;
assert (_Responsible != Self , "invariant") ;

// 没戏了,将当前线程封装成一个ObjectWaiter,并将状态设置为TS_CXQ
ObjectWaiter node(Self) ;
Self->_ParkEvent->reset() ;
node._prev = (ObjectWaiter *) 0xBAD ;
node.TState = ObjectWaiter::TS_CXQ ;

// Push "Self" onto the front of the _cxq.
// Once on cxq/EntryList, Self stays on-queue until it acquires the lock.
// Note that spinning tends to reduce the rate at which threads
// enqueue and dequeue on EntryList|cxq.
// 把线程加入到_cxq队列中,同样也是CAS操作。
ObjectWaiter * nxt ;
for (;;) {
node._next = nxt = _cxq ;
if (Atomic::cmpxchg_ptr (&node, &_cxq, nxt) == nxt) break ;

// Interference - the CAS failed because _cxq changed. Just retry.
// As an optional optimization we retry the lock.
if (TryLock (Self) > 0) {
assert (_succ != Self , "invariant") ;
assert (_owner == Self , "invariant") ;
assert (_Responsible != Self , "invariant") ;
return ;
}
}

// 省略

for (;;) {
// 要被挂起之前再尝试下
if (TryLock (Self) > 0) break ;
assert (_owner != Self, "invariant") ;

if ((SyncFlags & 2) && _Responsible == NULL) {
Atomic::cmpxchg_ptr (Self, &_Responsible, NULL) ;
}

// park self 挂起,注意你if-else都会挂起的
if (_Responsible == Self || (SyncFlags & 1)) {
TEVENT (Inflated enter - park TIMED) ;
Self->_ParkEvent->park ((jlong) RecheckInterval) ;
// Increase the RecheckInterval, but clamp the value.
RecheckInterval *= 8 ;
if (RecheckInterval > 1000) RecheckInterval = 1000 ;
} else {
TEVENT (Inflated enter - park UNTIMED) ;
Self->_ParkEvent->park() ;
}

// 被人唤醒就想抢锁
if (TryLock(Self) > 0) break ;

// 省略
return ;
}

尝试获取锁

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
int ObjectMonitor::TryLock (Thread * Self) {
for (;;) {
void * own = _owner ;
if (own != NULL) return 0 ;
// 还是CAS操作哦
if (Atomic::cmpxchg_ptr (Self, &_owner, NULL) == NULL) {
// Either guarantee _recursions == 0 or set _recursions = 0.
assert (_recursions == 0, "invariant") ;
assert (_owner == Self, "invariant") ;
// CONSIDER: set or assert that OwnerIsThread == 1
return 1 ;
}
// The lock had been free momentarily, but we lost the race to the lock.
// Interference -- the CAS failed.
// We can either return -1 or retry.
// Retry doesn't make as much sense because the lock was just acquired.
if (true) return -1 ;
}
}

锁的释放

首先是recursion–,然后是根据不同的模式来把_cxq和_EntrySet进行组合,然后去把其余的线程唤醒。

四种锁状态

之前synchronized的效率不高,因为Java确保了每次进入同步代码块的时候都会检查锁;在实际使用中,人们发现又不少规律,通过这些规律,对其进行了优化。

偏向锁

实际中可以发现,大部分的情况下,锁并不存在多线程之间的竞争,并且老是被同一个线程获取,所以我们应该基于此进行优化。

偏向锁的核心思想是,如果一个线程获得了锁,那么锁就进入偏向模式(通过CAS操作),此时Mark Word 的结构也变为偏向锁结构,当这个线程再次请求锁时(通过对线程id的对比可知),无需再做任何同步操作,即获取锁的过程,这样就省去了大量有关锁申请的操作,从而也就提供程序的性能。

撤销偏向锁需要等待全局安全点。

但是当大量线程竞争的时候,偏向锁显然力不从心,需要轻量锁来帮忙了。

轻量锁

对绝大部分的锁,在整个同步周期内都不存在竞争,基于此才有了轻量锁。

如果当前对象处于无锁状态,JVM就会在当前线程的栈帧中创建锁记录(Lock Record)的空间,用于存储锁对象目前的Mark Word的拷贝,并且让owner指向当前的对象,之后JVM利用CAS操作尝试将对象的Mark Word更新为指向Lock Record的指针,如果成功表示竞争到锁,则将锁标志位变成“轻量锁”,如果失败,那就进入重量级锁了。

自旋锁

在大多数情况下,线程持有锁的时间都不会太长,基于此才有了自旋锁。我们都知道,操作系统实现线程之间的切换时需要从用户态转换到核心态,开销太大,而根据上面的理论,我们完全可以让当前线程等一会(比如空循环100次这样),大概率过一会就能拿到锁了,所以才叫“自旋锁”。值得注意的是,自旋锁需要多核处理器才有意义,单核情况下是没有意义的。但是万一拿不到,emmm就会很傻的在那里空转一会然后再乖乖挂起…所以设置等待的次数非常重要,好在JVM能够自适应调整这个次数。

重量级锁

前面都解决不掉,那只好派它登场了。毕竟,相比起效率,出错肯定是更加无法忍受的。

根据前面的代码也可以看到,为了保证原子性,需要有内核函数Atomic::cmpxchg_ptr等支持,同时线程的挂起park和唤醒unpark也需要操作系统的内核态和用户态之间转换。

锁消除

有的时候你明明不需要同步,但是你还是加上了synchronized,聪明的JIT可以发现这一点,然后自动帮你消除。

锁粗化

JVM探测到一连串细小的对象,频繁使用synchronized锁住同一个对象,会用一次大的synchronized覆盖所有的操作,这样就只需要加一次锁就可以了。

面试总结

Q:synchronized和Lock的区别?

A:有以下区别:

  • synchronized是一个关键字,是JVM来支持的;而Lock是一个接口,可以看到源码。
  • synchronized它会自动释放锁,而lock你需要手动上锁和解锁。
  • synchronized有不可中断性,但是lock可以自己调用不同的方法,既可以中断也可以不中断,同样的由于是手动编写代码,Lock可以知道线程有没有拿到锁,synchronized则不可以。
  • Lock中有个实现类是可以让多个线程同时读,写只允许一个写,这样可以让读的效率提升。
  • synchronized是非公平锁,而ReentrantLock可以自己控制,但是我个人觉得意义不大。

Q:乐观锁和悲观锁的区别?

A:悲观锁就是每次都要上锁,synchronized和ReentrantLock都是悲观锁,这样效率比较低。乐观锁典型就是CAS这种,如果改了再重试,效率较高。