Java多线程补坑

lambda

在正式开始Java的多线程之前,首先了解一下这个在jdk8中正式加入的为了简化操作而加入的lambda表达式。

为了便于理解,打算从最简单的开始:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
public class ThreadTest {
public static void main(String[] args) {
LambdaTest lambdaTest = new Lambda();
lambdaTest.test();
}
}

interface LambdaTest {
void test();
}

class Lambda implements LambdaTest {
@Override
public void test() {
System.out.println("in class lambda");
}
}

上面这段代码是比较简单的,自己定义了一个接口,且这个接口只有一个方法(这对lambda是必须的,更加精确的说法应该是仅包含一个抽象方法),然后有一个类去实现了这个接口。最后我们在主类中调用了一下接口。

然后稍微改动下,把这个接口定义移动到别的地方(内容完全不变),同时把实现类移动到内部,也就是成为了内部类,好处是这两个类可以共存亡。当然为了能够在main方法中调用,这个类必须被声明成静态类,所以代码成了下面这样:

1
2
3
4
5
6
7
8
9
10
11
12
13
public class ThreadTest {
static class Lambda implements LambdaTest {
@Override
public void test() {
System.out.println("in class lambda");
}
}

public static void main(String[] args) {
LambdaTest lambdaTest = new Lambda();
lambdaTest.test();
}
}

但是这么写还是比较麻烦,不如干脆把这个实现类删除(给实现类取名字还显得麻烦),然后做一个方法的内部类好了。于是有了下面的这版,这版其实是用的最广泛的,因为毕竟lambda是在jdk8之后才加入支持的嘛。

1
2
3
4
5
6
7
8
9
10
11
12
// 方法内部定义“类”
public class ThreadTest {
public static void main(String[] args) {
LambdaTest lambdaTest = new LambdaTest(){
@Override
public void test() {
System.out.println("in method");
}
};
lambdaTest.test();
}
}

最后,也就是在jdk8中正式加入的lambda,因为其实我们是可以自己推导出需要的类和需要重写的方法(因为之前提到过只能有一个方法,所以必然是它),所以最后可以简化操作成下面这样:

1
2
LambdaTest lambdaTest = () -> System.out.println("in method");
lambdaTest.test();

线程的概念

线程指的是进程中一个单一顺序的控制流,一个进程中可以并发多个线程,每条线程并行执行不同的任务。

那么第一个问题就是,明明已经有了进程了,而且各个进程之间都独立了,为什么还需要线程呢?第一个原因是如果只有进程而没有线程,那么程序一旦发现需要读取文件或者进行下载,会整个卡住无法动弹,如果是其中的一个线程在下载,那就没有问题。其次是进程之间并不共享数据,且进程切换的开销会比线程的开销大。随之而来的另外一个问题:为什么进程切换开销会比线程切换大呢?最主要的原因就在虚拟内存切换上。因为线程切换是不需要切换TLB,也就是TLB还是可以用;而如果切换了进程那么就完全失效了,所以就会变慢。当然还有其他的原因,只不过这是最最主要的原因。

线程之下还有更细粒度的纤程,目前还没接触过,以后有机会了试试。

JDK中线程类的翻译

Thread

Thread类就是java中用来表示线程的一个类。JVM允许一个程序并发运行多个线程。

每个线程都具有一个优先级([1,10])。优先级越高越容易被执行。每个线程都可以被标记为守护线程(即如果只剩下了守护线程,那么JVM会退出;GC线程就是守护线程,当所有工作线程都结束了,GC线程也没有存在的必要了)。如果没有显式指定,那么线程的优先级和是否是守护线程是从父线程里继承来的。

当JVM运行的时候,一般来说至少会有一个非守护进程,JVM会一直执行,直到:

  • 运行到Runtime.getRuntime().exit(0);这句代码并且security manager批准了退出这一操作。
  • 所有的非守护线程结束了。

两种方法来创建一个新的线程(注:参考的是JDK8的文档,Callable是在1.5的时候引入的,所以JDK认为Callable不算一种新的方法)。一种是创建Thread的子类并重写run方法,另外一种方法是实现Runnable接口。

每一个线程都有自己的名字,但是多个线程可以取相同的名字(可以用getId来获取线程的唯一标识符,ID是唯一的且在整个生命周期中都不会变)。如果创建者没有指定线程的名字,那么系统会为线程创建一个名字。

除非另有说明,否则传递null参数给Thread会抛出NPE异常。

内部类

其中一个是State的内部类,这个涉及到之后的生命周期;还有一个内部接口是UncaughtExceptionHandler,用来处理那些没有被捕获的异常的。

构造器

Thread的构造器多种多样,其实都可以归结到一个构造器上面,就是:

1
2
3
public Thread(ThreadGroup group, Runnable target, String name) {
init(group, target, name, 0);
}

一般情况下,我们都只会传递一个Runnable的对象给构造器,所以name默认是由系统自动生成的,也就是Thread-0这类的。ThreadGroup一般我们也不会指定,如果你的程序设置了SecurityManager(绝大部分的程序都不会有的,可以通过System.getSecurityManager()这个静态方法查询,如果是null那么说明你的程序没有设置),那么ThreadGroup就设置成当前的SecurityManager.getThreadGroup(),否则就设置成当前线程的线程组。

创建的新线程的优先级和创建者的优先级相同,只有创建者是守护者线程,创建的线程才会是守护进程。当然这俩可以在创建完成之后修改。

PS:其实还可以有第四个参数指定栈的大小,但是我没见过有用过,所以只需要有印象有这个参数就可以了。文档里也说了这个参数在某些平台上是无效的,而且JVM只会参考这个值,并不是真的把这个值当做认真的。

关键方法

  • yield:中文就是放弃的意思。用来告诉scheduler当前的线程愿意放弃CPU时间,当然CPU可以选择忽略掉。在实际中很少会使用到这个方法。只有当debug或者是错误复现的时候比较有用,其他情况下请无视它。
  • sleep:首先,这个函数的精度受制于system timers和schedulers;其次,实际中非常重要的一句话:The thread does not lose ownership of any monitors.通俗地讲就是sleep仍然保有对应的锁。
  • start:从你调用start方法开始,正式变成了两个线程并发执行。注意每个线程只能执行一次start方法,否则会抛异常。
  • run:只有是对应的Runnable的run方法才有效。否则这个方法就只是一个普通的方法。
  • stop:请不要使用。
  • interrupt:如果是自己中断自己,那没问题;如果是别的线程打断,那就需要检查权限。如果当前线程被wait、sleep、join阻塞,此时又调用了interrupt方法,那么就会抛出异常。
  • interrupted:判断当前的线程是否被中断。调用了这个方法之后会清除中断状态。也就是第二次必定返回false。
  • isInterrupted:如果你不希望改变中断状态,用这个方法。
  • join:这个方法底层是wait,等待一个线程执行完毕。比如我主线程希望能等待t1和t2这两个线程完成之后再退出,就可以用t1.join和t2.join来确保主线程等待这两个线程完成。谁调用这个方法,谁就进入到等待状态,直到这个线程死亡(执行完毕)。join底层的代码非常非常简单,就是用的synchronized修饰(所以本质上是锁住了thread对象)。推荐在线程之中,不要使用wait而使用join。对于这个函数还有一点,我自己之前一直记不住它,就觉得它很奇怪,为什么叫join呢?跟“加入”有什么关系?更神奇的是别的语言用的也是join,于是我去查了一下,也有人对这个有疑问。简单看了一眼,应该是因为最开始是德语,然后德语中用了verknüpfen这个词语。

wait() VS sleep()

首先我们知道,join、wait和sleep都有让某些线程暂时一段时间的功能,其中推荐如果使用thread对象,那么就使用join,而join底层就是wait,所以本质上其实就是wait和sleep之间的差异。

首先最大的区别自然就是wait了之后需要别的线程来进行notify,而sleep不需要。其次是wait必须在synchronized块中使用,因为必须要获得一个对象的监视器才可以。

如果用人类的话来说:

  • sleep相当于告诉调度器:“接下来的xx毫秒内不要调度我,我不需要任何时间片”。
  • yeild相当于告诉调度器:“我现在不需要时间片了,你看看别人有没有要的吧,但是我还没做完我的工作”。
  • wait相当于告诉调度器:“我现在不需要时间片了,直到有别的人来notify我”。

线程的状态

也称为线程的生命周期。

查看线程类,可以发现里面有一个叫State的枚举类,里面有线程的六种状态,在任何时候,任何线程必定属于其中之一

  • NEW
    尚未启动的线程处于此状态。即当你new完一个线程对象的时候,线程就处于这个状态。
  • RUNNABLE
    在Java虚拟机中执行的线程处于此状态。简单来说就是你调用了线程的start方法之后,线程处于这个状态。至于这个线程是不是在运行,这取决于操作系统,所以才是“可运行状态”,而不是RUNNING状态。
  • BLOCKED
    被阻塞等待监视器锁定的线程处于此状态。说的通俗点,如果线程需要等待别的线程释放锁,那么它就处于阻塞状态。
  • WAITING
    正在等待另一个线程执行特定动作的线程处于此状态。看上去和阻塞非常像,我在Thread类里的注释里,它说的是,线程等待另外一个线程执行特定的操作。可以通过Object.wait()或者Thread.join()来进入等待状态。
  • TIMED_WAITING
    正在等待另一个线程执行动作达到指定等待时间的线程处于此状态。简单来说就是上面的那个状态加了个时间。
  • TERMINATED
    已退出的线程处于此状态。

其实上面估计就阻塞等待这两个状态会有疑惑,下面详细讲述下这两个之间的区别。

上面也说了,可以通过Object.wait()或者Thread.join()(join底层就是wait)来进入WAITING状态,而在进入WAITING状态之前,线程必须要进入到synchronized方法块里面,而如果进不到synchronized方法块,就是说的BLOCKED状态。进入WAITING状态之后,线程会释放掉它所持有的监视器锁,这样别的线程就可以进入到synchronized块中,可以进行notify操作了。只有当别的线程通过notify()或者notifyAll()方法之后才会把线程从等待状态移动到阻塞状态。然后进入阻塞状态之后,线程就会去尝试获得锁,一旦获得,就进入了RUNNABLE状态。

同步

同步是线程中最核心的问题,也是最需要解决的问题。下面通过一个没有加入同步的demo来说明这一问题。

有一个银行,里面有各种账户,账户之间可以进行互相转账:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
public class Bank {
private final double[] accounts;

/**
* 构造方法
*
* @param n 账户数目
* @param initialBalance 初始化金额
*/
public Bank(int n, double initialBalance) {
accounts = new double[n];
Arrays.fill(accounts, initialBalance);
}

/**
* 账户之间的转账
*
* @param from 从第from个数组位置取钱
* @param to 存入第to个数组位置
* @param amount 存取的金额
*/
public void transfer(int from, int to, double amount) throws InterruptedException {
System.out.println(Thread.currentThread());
accounts[from] -= amount;
Thread.sleep(200);
accounts[to] += amount;
System.out.printf("%10.2f金额从%d到%d\n", amount, from, to);
System.out.printf("整个银行剩余的金额 %10.2f\n", getTotalBalance());
}

/**
* 返回银行所有的资金
*
* @return 银行所有的金额
*/
public double getTotalBalance() {
double sum = 0;
for (double eachAmount : accounts) {
sum += eachAmount;
}
return sum;
}

/**
* 返回账户数目
*
* @return 账户数目
*/
public int size() {
return accounts.length;
}

}

然后再这些账户之间不停进行转账,理论上银行的资金总和是不会发生变化的,但是由于线程同步的问题,就会发生总金额少了或者多了的情况。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
public class BankTest {
public static final int NACCOUNTS = 10;
public static final double INITIAL_BALANCE = 1000;
public static final double MAX_ACCOUNT = 1000;
public static final int DELAY = 10;

public static void main(String[] args) {
Bank bank = new Bank(NACCOUNTS, INITIAL_BALANCE);
for (int i = 0; i < NACCOUNTS; i++) {
int fromAccount = i;
new Thread(() -> {
while (true) {
int toAccount = (int) (Math.random() * bank.size());
double amount = MAX_ACCOUNT * Math.random();
try {
bank.transfer(fromAccount, toAccount, amount);
Thread.sleep((long) (DELAY * Math.random()));
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}).start();

}

}
}

原因也相当简单,相信稍微学过点操作系统的都明白,因为这个操作它不是原子性的,导致数据已经计算完毕,但是即将写回去的时候,线程时间片到了,这个时候不得不让别人来进行操作,此时就会有错误发生了。

这里我之前写的时候遇到两个比较严重的问题(上面的代码已经解决了):第一个问题就是我一直观察不到总数发生减少的原因,就算加到100个线程还是不行,后来无奈给转账加上了线程睡眠才解决。第二个问题很严重,就是不论你加不加锁,最后的数字都是100.000000000001这样的,我一开始以为是锁机制出了问题,分析了下没问题;后来想到是不是因为我是多核的,但是分析了下这个跟多核也没关系呀;最后没辙了,去网站上下了源码,发现它运行就没问题,最后发现了是double的精度问题,我输出的时候没指定好,导致输出了高精度的值,就出现了问题。PS:本身float和double就不能用在货币上。

通过操作系统中的知识我们知道,只要通过锁机制就可以完美解决这个问题。Java天然提供了一个叫synchronized关键字来达到这个目的,并且还有一个叫ReentrantLock的类来实现这个目的,由于关键字比较抽象,所以先从ReentrantLock开始理解。

ReentrantLock类

此类属于java.util.concurrent.locks,它内部的结构就是通过上锁、解锁的相关操作来完成的,所以很好理解。这里唯一需要注意的就是,这个类的锁是可重入的,也就是会有一个计数器来记录,也就是只要线程拿到了锁,那么如果它还想继续访问上锁的资源,是可以继续访问的,此时会让计数器加一,当释放的时候计数器减一,直到最后释放这把锁。修改之后的核心代码如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
public void transfer(int from, int to, double amount) throws InterruptedException {
lock.lock();
try{
System.out.println(Thread.currentThread());
accounts[from] -= amount;
Thread.sleep(200);
accounts[to] += amount;
System.out.printf("%10.2f金额从%d到%d\n", amount, from, to);
System.out.printf("整个银行剩余的金额 %10.2f\n", getTotalBalance());
}finally {
lock.unlock();
}

}

之所以要try和finally是为了确保锁能够释放。运行一下,就没有问题了。

但是!!如果当一个线程拿到了锁,并且进到了try代码块里,发现钱不够,那么就应该等待别人往这个账户里放入足够的钱,然后再进行操作。但是现在这个线程已经拿到了锁,别人是进不来的,这个时候当然还需要一个叫条件变量(conditional variable)的东西来协调。可以通过锁对象直接获得它,然后可以调用它的await()方法来释放锁并进入阻塞状态,直到有别的线程通过signalAll()方法唤醒它。由于完全有可能有很多线程都在等待这个条件变量,所以唤醒后他们仍然会等待调度,一旦有个线程获得了锁,它就会从await()之后继续运行,所以代码里一定要写while,如果写了if,那么就只会判定一次,但是就算存了钱,仍然有可能导致账户里的钱还是不够。修改后的代码如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
public void transfer(int from, int to, double amount) throws InterruptedException {
lock.lock();
try {
while (accounts[from] < amount) {
enoughMoney.await();
}
System.out.println(Thread.currentThread());
accounts[from] -= amount;
Thread.sleep(200);
accounts[to] += amount;
System.out.printf("%10.2f金额从%d到%d\n", amount, from, to);
System.out.printf("整个银行剩余的金额 %10.2f\n", getTotalBalance());
enoughMoney.signalAll();
} finally {
lock.unlock();
}

}

在取完钱并且还没释放锁的时候就通知了所有等待这一条件的线程,这么做并不会有问题,首先这个线程是有很大概率在现有时间片条件下完成解锁的动作的,就算真的非常非常不巧,立马时间片就给了其他线程,由于锁的存在,也是不会有事的。

synchronized关键字

这个关键字从Java1.0开始就已经内嵌于Java这门语言中,相反上面的ReentrantLock反而是Java5.0加入的。要理解这个关键字,需要知道其实每个对象内部都有一个内部所,然后如果你为一个方法声明了synchronized关键字,其实就相当于在进入方法的时候为这个对象上了锁,然后方法执行完了会释放这把锁。

而所有的对象又是都有wait、notifyAll这些方法,这些方法和上面的await、signalAll是一样的,主要是Object类中把这些方法都声明为了final,所以不得已只能重新取名字了。