java多线程补坑

lambda

在正式开始Java的多线程之前,首先了解一下这个在jdk8中正式加入的为了简化操作而加入的lambda表达式。

为了便于理解,打算从最简单的开始:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
public class ThreadTest {
public static void main(String[] args) {
LambdaTest lambdaTest = new Lambda();
lambdaTest.test();
}
}

interface LambdaTest {
void test();
}

class Lambda implements LambdaTest {
@Override
public void test() {
System.out.println("in class lambda");
}
}

上面这段代码是比较简单的,自己定义了一个接口,且这个接口只有一个方法(这对lambda是必须的,更加精确的说法应该是仅包含一个抽象方法),然后有一个类去实现了这个接口。最后我们在主类中调用了一下接口。

然后稍微改动下,把这个接口定义移动到别的地方(内容完全不变),同时把实现类移动到内部,也就是成为了内部类,好处是这两个类可以共存亡。当然为了能够在main方法中调用,这个类必须被声明成静态类,所以代码成了下面这样:

1
2
3
4
5
6
7
8
9
10
11
12
13
public class ThreadTest {
static class Lambda implements LambdaTest {
@Override
public void test() {
System.out.println("in class lambda");
}
}

public static void main(String[] args) {
LambdaTest lambdaTest = new Lambda();
lambdaTest.test();
}
}

但是这么写还是比较麻烦,不如干脆把这个实现类删除(给实现类取名字还显得麻烦),然后做一个方法的内部类好了。于是有了下面的这版,这版其实是用的最广泛的,因为毕竟lambda是在jdk8之后才加入支持的嘛。

1
2
3
4
5
6
7
8
9
10
11
12
// 方法内部定义“类”
public class ThreadTest {
public static void main(String[] args) {
LambdaTest lambdaTest = new LambdaTest(){
@Override
public void test() {
System.out.println("in method");
}
};
lambdaTest.test();
}
}

最后,也就是在jdk8中正式加入的lambda,因为其实我们是可以自己推导出需要的类和需要重写的方法(因为之前提到过只能有一个方法,所以必然是它),所以最后可以简化操作成下面这样:

1
2
LambdaTest lambdaTest = () -> System.out.println("in method");
lambdaTest.test();

线程的状态

本来是想先写什么是线程的,但是感觉太基础了,直接跳到这一块来。

查看线程类,可以发现里面有一个叫State的枚举类,里面有线程的六种状态,在任何时候,任何线程必定属于其中之一:

  • NEW
    尚未启动的线程处于此状态。即当你NEW完一个线程对象的时候,线程就处于这个状态。
  • RUNNABLE
    在Java虚拟机中执行的线程处于此状态。简单来说就是你调用了线程的start方法之后,线程处于这个状态。至于这个线程是不是在运行,这取决于操作系统,所以才是“可运行状态”
  • BLOCKED
    被阻塞等待监视器锁定的线程处于此状态。说的通俗点,如果线程需要等待别的线程释放锁,那么它就处于阻塞状态。
  • WAITING
    正在等待另一个线程执行特定动作的线程处于此状态。看上去和阻塞非常像,我在Thread类里的注释里,它说的是,线程等待另外一个线程执行特定的操作。可以通过Object.wait()或者Thread.join()来进入等待状态。
  • TIMED_WAITING
    正在等待另一个线程执行动作达到指定等待时间的线程处于此状态。简单来说就是上面的那个状态加了个时间。
  • TERMINATED
    已退出的线程处于此状态。

其实上面估计就阻塞等待这两个状态会有疑惑,下面详细讲述下这两个之间的区别。

上面也说了,可以通过Object.wait()或者Thread.join()来进入等待状态,而在进入等待状态之前,线程会释放掉它所持有的锁。只有当别的线程通过notify()或者notifyAll()方法之后才会把线程从等待状态移动到阻塞状态。然后进入阻塞状态之后,线程就会去尝试获得锁,一旦获得,就进入了RUNNABLE状态。

同步

同步是线程中最核心的问题,也是最需要解决的问题。下面通过一个没有加入同步的demo来说明这一问题。

有一个银行,里面有各种账户,账户之间可以进行互相转账:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
public class Bank {
private final double[] accounts;

/**
* 构造方法
*
* @param n 账户数目
* @param initialBalance 初始化金额
*/
public Bank(int n, double initialBalance) {
accounts = new double[n];
Arrays.fill(accounts, initialBalance);
}

/**
* 账户之间的转账
*
* @param from 从第from个数组位置取钱
* @param to 存入第to个数组位置
* @param amount 存取的金额
*/
public void transfer(int from, int to, double amount) throws InterruptedException {
System.out.println(Thread.currentThread());
accounts[from] -= amount;
Thread.sleep(200);
accounts[to] += amount;
System.out.printf("%10.2f金额从%d到%d\n", amount, from, to);
System.out.printf("整个银行剩余的金额 %10.2f\n", getTotalBalance());
}

/**
* 返回银行所有的资金
*
* @return 银行所有的金额
*/
public double getTotalBalance() {
double sum = 0;
for (double eachAmount : accounts) {
sum += eachAmount;
}
return sum;
}

/**
* 返回账户数目
*
* @return 账户数目
*/
public int size() {
return accounts.length;
}

}

然后再这些账户之间不停进行转账,理论上银行的资金总和是不会发生变化的,但是由于线程同步的问题,就会发生总金额少了或者多了的情况。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
public class BankTest {
public static final int NACCOUNTS = 10;
public static final double INITIAL_BALANCE = 1000;
public static final double MAX_ACCOUNT = 1000;
public static final int DELAY = 10;

public static void main(String[] args) {
Bank bank = new Bank(NACCOUNTS, INITIAL_BALANCE);
for (int i = 0; i < NACCOUNTS; i++) {
int fromAccount = i;
new Thread(() -> {
while (true) {
int toAccount = (int) (Math.random() * bank.size());
double amount = MAX_ACCOUNT * Math.random();
try {
bank.transfer(fromAccount, toAccount, amount);
Thread.sleep((long) (DELAY * Math.random()));
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}).start();

}

}
}

原因也相当简单,相信稍微学过点操作系统的都明白,因为这个操作它不是原子性的,导致数据已经计算完毕,但是即将写回去的时候,线程时间片到了,这个时候不得不让别人来进行操作,此时就会有错误发生了。

这里我之前写的时候遇到两个比较严重的问题(上面的代码已经解决了):第一个问题就是我一直观察不到总数发生减少的原因,就算加到100个线程还是不行,后来无奈给转账加上了线程睡眠才解决。第二个问题很严重,就是不论你加不加锁,最后的数字都是100.000000000001这样的,我一开始以为是锁机制出了问题,分析了下没问题;后来想到是不是因为我是多核的,但是分析了下这个跟多核也没关系呀;最后没辙了,去网站上下了源码,发现它运行就没问题,最后发现了是double的精度问题,我输出的时候没指定好,导致输出了高精度的值,就出现了问题。PS:本身float和double就不能用在货币上。

通过操作系统中的知识我们知道,只要通过锁机制就可以完美解决这个问题。Java天然提供了一个叫synchronized关键字来达到这个目的,并且还有一个叫ReentrantLock的类来实现这个目的,由于关键字比较抽象,所以先从ReentrantLock开始理解。

ReentrantLock类

此类属于java.util.concurrent.locks,它内部的结构就是通过上锁、解锁的相关操作来完成的,所以很好理解。这里唯一需要注意的就是,这个类的锁是可重入的,也就是会有一个计数器来记录,也就是只要线程拿到了锁,那么如果它还想继续访问上锁的资源,是可以继续访问的,此时会让计数器加一,当释放的时候计数器减一,直到最后释放这把锁。修改之后的核心代码如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
public void transfer(int from, int to, double amount) throws InterruptedException {
lock.lock();
try{
System.out.println(Thread.currentThread());
accounts[from] -= amount;
Thread.sleep(200);
accounts[to] += amount;
System.out.printf("%10.2f金额从%d到%d\n", amount, from, to);
System.out.printf("整个银行剩余的金额 %10.2f\n", getTotalBalance());
}finally {
lock.unlock();
}

}

之所以要try和finally是为了确保锁能够释放。运行一下,就没有问题了。

但是!!如果当一个线程拿到了锁,并且进到了try代码块里,发现钱不够,那么就应该等待别人往这个账户里放入足够的钱,然后再进行操作。但是现在这个线程已经拿到了锁,别人是进不来的,这个时候当然还需要一个叫条件变量(conditional variable)的东西来协调。可以通过锁对象直接获得它,然后可以调用它的await()方法来释放锁并进入阻塞状态,直到有别的线程通过signalAll()方法唤醒它。由于完全有可能有很多线程都在等待这个条件变量,所以唤醒后他们仍然会等待调度,一旦有个线程获得了锁,它就会从await()之后继续运行,所以代码里一定要写while,如果写了if,那么就只会判定一次,但是就算存了钱,仍然有可能导致账户里的钱还是不够。修改后的代码如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
public void transfer(int from, int to, double amount) throws InterruptedException {
lock.lock();
try {
while (accounts[from] < amount) {
enoughMoney.await();
}
System.out.println(Thread.currentThread());
accounts[from] -= amount;
Thread.sleep(200);
accounts[to] += amount;
System.out.printf("%10.2f金额从%d到%d\n", amount, from, to);
System.out.printf("整个银行剩余的金额 %10.2f\n", getTotalBalance());
enoughMoney.signalAll();
} finally {
lock.unlock();
}

}

在取完钱并且还没释放锁的时候就通知了所有等待这一条件的线程,这么做并不会有问题,首先这个线程是有很大概率在现有时间片条件下完成解锁的动作的,就算真的非常非常不巧,立马时间片就给了其他线程,由于锁的存在,也是不会有事的。

synchronized关键字

这个关键字从Java1.0开始就已经内嵌于Java这门语言中,相反上面的ReentrantLock反而是Java5.0加入的。要理解这个关键字,需要知道其实每个对象内部都有一个内部所,然后如果你为一个方法声明了synchronized关键字,其实就相当于在进入方法的时候为这个对象上了锁,然后方法执行完了会释放这把锁。

而所有的对象又是都有wait、notifyAll这些方法,这些方法和上面的await、signalAll是一样的,主要是Object类中把这些方法都声明为了final,所以不得已只能重新取名字了。