1 投币寄物柜的使用方法

介绍公钥密码之前,先说说投币寄物柜:将物品放入寄物柜中,然后投入硬币并拔出钥匙,就可以将寄物柜关闭了。关闭后的寄物柜,没有钥匙是无法打开的。
只要有硬币,任何人都可以关闭寄物柜,但寄物柜一旦被关闭,只有使用钥匙才能打开,而不是硬币。

因此我们可以说,硬币是关闭寄物柜的密钥,而钥匙则是打开寄物柜的密钥。

2 本章概要

在对称密码中,由于加密和解密的密钥是相同的,因此必须向接收者配送密钥。用于解密的密钥必须被配送给接收者,这一问题称为密钥配送问题。如果使用
公钥密码,则无需向接收者配送用于解密的密钥,这样就解决了密钥配送问题。

本章先探讨一下密钥配送问题,然后再讲解公钥密码是如何解决密钥配送问题的。最后,将介绍一种最常用的公钥密码——RSA。

3 密钥配送问题

3.1 什么是密钥配送问题

在现实世界中使用对称密码时,我们一定会遇到密钥配送问题。由于密码算法本来就应该是以公开为前提的,隐蔽式安全性(security by obscurity)是
非常危险的。

密钥必须要发送,但又不能发送,这就是对称密码的密钥配送问题,解决密钥配送问题的方法有以下几种:

  1. 通过事先共享密钥来解决
  2. 通过密钥分配中心来解决
  3. 通过 Diffie-Hellman 密钥交换来解决
  4. 通过公钥密码来解决

3.2 通过事先共享密钥来解决

事先用安全的方式将密钥交给对方,这称为密钥的事先共享。但是有一定的局限性,如果是网上认识的抑或需要邮寄的,都有可能被别人窃取。以及,如果一个
公司的 1000 名员工需要彼此进行加密通信,则需要 1000 * 999 / 2 = 499500 个密钥!

3.3 通过秘钥分配中心来解决

公司存在一台专门负责秘钥分配的计算机,它保存了所有员工的秘钥,当有新员工入职时,秘钥分配中心会为该员工生成一个新的秘钥,并保存。而新员工则会
从入职时从秘钥分配中心的计算机上领取自己的秘钥。
这样一来,秘钥分配中心就拥有所有员工的秘钥,而每个员工则拥有自己的秘钥。

那么 Alice 再向 Bob 发送加密邮件时,就需要进行以下步骤:

  1. Alice 向秘钥分配中心发出希望与 Bob 进行通信的请求
  2. 秘钥分配中心通过伪随机数生成器生成一个会话秘钥,这个秘钥是供 Alice 与 Bob 在本次通信中使用的临时秘钥
  3. 秘钥分配中心从数据库中取出 Alice 的秘钥和 Bob 的秘钥
  4. 秘钥分配中心用 Alice 的秘钥对会话秘钥进行加密,并发送给 Alice
  5. 秘钥分配中心用 Bob 的秘钥对会话秘钥进行加密,并发送给 Bob
  6. Alice 对来自秘钥分配中心的会话秘钥(已使用 Alice 的秘钥加密)进行解密,得到会话秘钥
  7. Alice 用会话秘钥对邮件进行加密,并将邮件发送给 Bob
  8. Bob 对来自秘钥分配中心的会话秘钥(已使用 Bob 的秘钥加密)进行解密,得到会话秘钥
  9. Bob 用会话秘钥对来自 Alice 的密文进行解密
  10. Alice 和 Bob 删除会话秘钥

以上就是通过秘钥分配中心完成 Alice 与 Bob 的通信过程,缺点显而易见:

  1. 随着员工增加,秘钥分配中心负荷增大
  2. 如果秘钥分配中心计算机发生故障,全公司的加密通信就会瘫痪
  3. 攻击者直接对秘钥分配中心下手,盗取秘钥数据库,后果十分严重

3.4 通过 Diffie-Hellman 秘钥交换来解决秘钥配送问题

解决秘钥配送问题的第三种方法,称为 Diffie-Hellman 秘钥交换。这里的交换,指的是发送者和接收者之间相互传递信息的意思。
根据交换的信息, Alice 和 Bob 可以生成相同的秘钥,而窃听者就算得到交换的信息,也无法生成相同的秘钥,将在第十一章详解。

3.5 通过公钥密码来解决秘钥配送问题

第四种方法,就是公钥密码。

在公钥密码中,加密秘钥和解密秘钥不同,只要拥有加密秘钥,任何人都可以进行加密,但没有解密秘钥是无法解密的。因此,公钥密码的重要特性是,只有拥有
解密秘钥的人才能够进行解密。

接受者事先将加密秘钥发送给发送者,这个加密秘钥即使被窃听获取也没问题。发送者使用加密秘钥对通信内容进行加密并发送给接收者,而只要拥有解密秘钥
的人(即发送者本人)才能够解密。这样,就不用讲解密秘钥配送给接收者了,也就是说,对称密码的秘钥配送问题,可以通过使用公钥密码来结局。

4 公钥密码

4.1 什么是公钥密码

公钥密码(public-key cryptography)中,秘钥分为加密秘钥和解密秘钥两种。加密秘钥是发送者加密时使用的,而解密秘钥则是接收者解密时使用的。
加密秘钥和解密秘钥的区别:

  1. 发送者只需要加密秘钥
  2. 接收者只需要解密秘钥
  3. 解密秘钥不可以被窃听者获取
  4. 加密秘钥被窃听者获取也没问题

也就是说,解密秘钥从一开始就是由接收者自己保管的,因此只要将加密秘钥发送给发送者就可以解决秘钥配送问题了,根本不需要配送解密秘钥。

公钥和私钥是一一对应的,一对公钥和私钥统称为密钥对(key pair)。由公钥进行加密的密文,必须使用与该公钥配对的私钥才能够解密。

4.2 公钥密码的历史

  1. 1976年发表了关于公钥密码的设计思想,即将加密秘钥和解密秘钥分开。
  2. 1977年设计了一种具体的公钥密码算法,但后来被发现并不安全。
  3. 1978年发表了一种公钥密码算法——RSA,可以说是现在公钥密码的事实标准。

4.3 公钥通信的流程

Alice 是发送者, Bob 是接收者, Eve 是窃听者。在公钥密码通信中,通信过程是由接收者 Bob 来启动的:

  1. Bob 生成一个包含公钥和私钥的密钥对。私钥由 Bob 自行妥善保管。
  2. Bob 将自己的公钥发送给 Alice 。公钥被 Eve 获得。
  3. Alice 用 Bob 的公钥对消息进行加密。
  4. Alice 将密文发送给 Bob 。密文被 Eve 获得。
  5. Bob 用自己的私钥对密文进行解密。

4.4 公钥密码无法解决的问题

我们需要判断所得到的公钥是否正确合法,这个问题被称为公钥认证问题。这个问题随后将通过对中间人攻击的讲解来探讨。
同时,公钥密码的处理速度只有对称密码的几百分之一,随后在下节详解。

5 时钟算法

在讲解公钥密码的代表 RSA 之前,我们需要做一些数学方面的准备工作。

5.1 加法


指针从 11 在转就回变成 0 。即,如果当前是 5 点, 11 个小时后为:x = 5 + 11%12 = 5 - 1 = 4 。

因此加法就变成了求余数运算,即 mod 运算。

5.2 减法

减法是加法的逆运算:如果当前是 5 点, 11 个小时之前是:x + 11%12 = 5 => x = 6。

5.3 乘法

乘法即多个加法:如果当前是 5 点, 乘以 3: 5 + 5%12 + 5%12 = -9 => -9 + 12 = 3。

5.4 除法

除法是乘法的逆运算:但是,由于时钟只能是整数,因此并不是所有的数都能当被除数,要保证被除后是一个整数。

5.5 乘方

7^4:7^4 mod 12 = 2401 mod 12 = 1

5.6 对数

即乘方的逆运算:7^X = Y ,已知 Y 求 X。
在时钟运算中的对数称为离散对数。例如:
7^X mod 12 = 8,

得到结果为 9 。当数字很大时,求离散对数非常困难,也非常耗时。能快速求出离散对数的算法到现在还没有被发现。

5.7 从时钟指针到 RSA

我们知道了 7^4 mod 12 代表的含义,那么就为理解 RSA 做好准备了,因为 RSA 的加密和解密过程中所进行的正是这样的运算。

6 RSA

6.1 什么是 RSA

RSA 是一种公钥密码算法,它的名字是由它的三位开发者,即 Ron Rivest、 Adi Shamir 和 Leonard Adleman 的姓氏首字母组成的。
RSA 可以被用于公钥密码和数字签名,数字签名将在第九章详解。
RSA 在 1983年取得了专利,但现在专利已经过期。

6.2 RSA 加密

在 RSA 中,明文、秘钥和密文都是数字, RSA 的加密过程可以用下列公式来表示:

密文 = 明文 ^ E mod N

也就是说, RSA 的密文是对代表明文的数字的 E 次方求 mod N 结果。换句话说,就是将明文和自己做 E 次乘法,然后将其结果除以 N 求余数,这个余数就是密文。
就是这么简单。
其中 E 和 N 时 RSA 加密的秘钥,也就是说, E 和 N 的组合就是公钥。注意:E 和 N 并不是密钥对,“公钥是(E,N)”这种写法。
现在大家应该已经知道, RSA 的加密就是“求 E 次方的 mod N”。

6.3 RSA 解密

明文 = 密文 ^ D mod N

也就是说,对表示密文的数字的 D 次方求 mod N 就可以得到明文。

RSA 的加密和解密整理如下:

6.4 生成密钥对

由于 E 和 N 是公钥,D 和 N 是私钥,因此求 E、D 和 N 这三个数就是生成秘钥对。步骤如下:

  1. 求 N
  2. 求 L(中间值)
  3. 求 E
  4. 求 D

注:由于博主并不深究生成过程,只需要知道使用到了最大公约数以及质数的特性即可,有需要深究具体生成的请自行 Google。

7 对 RSA 的攻击

RSA 的加密是求 “E 次方的 mod N”,解密时求 “D 次方的 mod N”,原理非常简单。
破译者知道的信息:密文、E、N。
破译者不知道的信息:明文、D、一些密码算法所使用的中间变量。

7.1 通过密文来求得明文

密文 = 明文 ^ E mod N
如果没有 mod N运算,只有 密文 = 明文 ^ E ,就很简单,即求对数的问题。但是如果加上 mod N运算,就变成求离散对数的问题,这是非常困难,
因为人类还没有发现求离散对数的高效算法。

7.2 通过暴力破解来找出 D

由于暴力破解的难度会随 D 的长度增加而变大,因此 D 到达 1024 比特以上,暴力破解就很难在现实的时间内通过暴力破解找出数 D。

7.3 通过 E 和 N 求出 D

既然 D 本身是通过 E 和 N 求出,因此破译者也可以尝试,但是由于涉及到质数分解的问题,这样的方法目前还没有出现,而且我们也不知道是否真的存在这个方法。

7.4 中间人攻击

下面介绍一种名为中间人攻击(man-in-the-middle attack)的攻击方法。这种方法虽然不能破译 RSA,但却是一种针对机密性的有效攻击。

假设发送者 Alice 准备向接收者 Bob 发送一封邮件,为了解决密钥配送问题,他们使用了公钥密码。

  1. Alice 向 Bob 发送邮件索要公钥
  2. Mallory 通过窃听发现 Alice 在向 Bob 索要公钥。
  3. Bob 看到 Alice 的邮件,并将自己的公钥发送给 Alice。
  4. Mallory 拦截 Bob 的邮件,使其无法发送给 Alice 。然后,他悄悄地将 Bob 的公钥保存起来。
  5. Mallory 拦伪装成 Bob,将自己的公钥发送给 Alice。
  6. Alice 将自己的消息用 Bob 的公钥(其实是 Mallory 的公钥)进行加密并发送给 Bob。
  7. Mallory 拦截 Alice 的加密邮件。用 Mallory 的私钥解密,得到明文。
  8. Mallory 伪装成 Alice 给 Bob 写一封假邮件并用保存起来的 Bob 的公钥加密发送给 Bob。

Mallory 不仅可以篡改 Alice 消息,还可以篡改 Bob 的消息。中间人攻击进不仅针对 RSA,而是针对在座的各位公钥密码!

我们用公钥密码解决了密钥配送问题,但是又出现新的问题,如何判断收到的公钥是否来自于正确的接收者,即是否来自于 Bob 。解决这个问题成为认证,
将在第十章详解。

8 其它公钥密码

RSA 是现在最为普及的一种公钥密码算法,但除了 RSA 之外,还有其它的公钥密码。下面介绍一下 EIGamal 方式、Rabin 方式以及椭圆曲线密码。
这些密码都可以被用于一般的加密和数字签名。

8.1 EIGamal 方式

RSA 利用了质因数分解的困难度,而 EIGamal 方式利用了 mod N 下求离散对数的困难度。它的缺点是,密文是明文的两倍。

8.2 Rabin 方式

Rabin 利用了 mod N 求平方根的困难度。

8.3 椭圆曲线密码

椭圆曲线密码(Elliptic Curve Cryptosystems,ECC)是最近备受关注的一种公钥密码算法。它的特点是所需的密钥长度比 RSA 短。
它通过将椭圆曲线上特定点进行特殊的乘法运算来实现的,利用了这种乘法运算的逆运算非常困难这一特性。

9 关于公钥密码的问答

主要选择一些容易被误解的点解答疑问。

9.1 公钥密码的机密性

公钥密码比对称密码的机密性更高吗?
这个问题无法回答,因为机密性的高低是根据密钥长度而变化的。

9.2 公钥密码与对称密码的密钥长度

采用 1024 比特的密钥的公钥密码,和采用 128 比特的密钥的对称密码中,是密钥更长的公钥密码更安全吗?
不是。公钥密码的密钥长度不能与对称密码的密钥长度进行直接比较,如下是一张密钥长度的比较表(摘自《应用密码学》),看出, 1024 比特的公钥密码与
128 比特的对称密码相反,反而是 128 比特的对称密码抵御暴力破解的能力更强。

9.3 对称密码的未来

因为已经有了公钥密码,今后对称密码会消失吗?
不会。一般来说,在采用具备同等机密性的密钥长度的情况下,公钥密码的处理速度只有对称密码的几百分之一。因此,公钥密码不适合用来对很长的消息内容进行加密。
根据目的的不同,可能会配合使用对称密码和公钥密码,将在第六章介绍的混合密码系统详解。

9.4 RSA 和质数

随着越来越多的人在不断地生成 RSA 的密钥对,质数会不会被用光?
512 比特能够容纳的质数的数量大概是10^150。假设世界上有 100 亿人,每人每秒生成 100 亿个密钥对,经过 100 亿年后:
100亿人100亿个31622400秒*100亿年 < 10^39。
另外,理论上质数组合偶然撞车的可能性,事实上也可以认为是没有的。

9.5 RSA 与质因数分解

RSA 的破译与大整数的质因数分解是等价的吗?
不清楚是否是等价的。但是,只要能够快速完成质因数分解,就能够破译 RSA。

9.6 RSA 的长度

要抵御质因数分解,N 的长度需要达到多少比特呢?
N 无论多长,总有一天能够被质因数分解。在 1999年 521比特的证书由 292台计算机话费 5.2 个月完成了质因数分解。

10 本章小结

本章学习了公钥密码以及其代表性的实现方法——RSA。
使用公钥密码能够解决秘钥配送问题。公钥密码是密码学界的一项革命性的发明!
对称密码通过将明文转换为复杂的形式来保证其机密性,相对的,公钥密码则是局域数学上困难的问题来保证机密性的。例如 RSA 就利用了大整数的质因数分解
问题的难度。因此,对称密码和公钥密码源于两个根本不同的思路。

尽管公钥密码解决了秘钥配送问题,但针对公钥密码能够进行中间人攻击。要防御这种攻击,就需要回答“这个公钥是否属于合法的通信对象”这一问题,这个问题
将在第九章和第十章详解。

即使已经有了公钥密码,对称密码也不会消失。公钥密码的运行速度远远低于对称密码,因此在一般的通信过程中,往往会配合使用这两种密码,即用对称密码
提高处理速度,用公钥密码解决秘钥配送问题,这种方式称为混合密码系统,将在下一章详解。

11 小测验

  1. 秘钥分配中心的处理:当 Alice 发出希望与 Bob 进行通信的请求时,秘钥分配中心会生成一个全新的会话秘钥,并将其加密后发送给 Alice 。
    为什么秘钥分配中心不直接将 Bob 的秘钥用 Alice 的秘钥加密之后发送给 Alice 呢?
  2. 要对用公钥密码加密的密文进行解密,需要公钥密码的私钥。
  3. 公钥密码的私钥需要和加密后的消息一起被发送给接收者。
  4. 一般来说,对称密码的速度比公钥密码要快。

看完这部电影,不得不说,最后被男主拿出的戒指感动到了~戒指没有不见,它一直在那里。
回想女主第一次拒绝结婚的理由让人熟悉而又无奈,这不是我们每个男生都害怕听到的理由吗?

我还没有准备好

幸好,最后结局有情人终成眷属了。回想起剧情,就像溪水,可能会流的慢,但是总会流到那里。

最后女主结婚的理由才像她自己:

我害怕世人眼光,但更害怕没有你的日子~

啊~我需要买一个桌子,带音响的那种。


男主默默的制作家具,给人一种非常沉稳优雅的气质,不得不说,这个职业非常非常适合这个角色,在捷克依然孤独的与家居为伴,
这不是真是我们吗?终日与代码为伴,但是我们也要沉稳、也要优雅

优雅的代码千篇一律,给人赏心悦目;肮脏代码各有不同,总会让你感叹,代码还能这么脏?!


code also can be elegant。

1 炒鸡蛋与对称密码

鸡蛋炒好之后就完全分不清原来的蛋黄和蛋白了,使用对称密码进行加密,和炒鸡蛋有着异曲同工之妙。炒鸡蛋搅拌的是鸡蛋,而密文打乱的则是比特序列。
然后,它们最大的不同是,炒鸡蛋无法还原成原来的鸡蛋,但密文却必须能够让接收者正确解密才行。
因此,如果只是随意地搅拌和混合,则不能称之为加密,而必须仔细设计出一种能够还原的混合方式。

2 本章概要

学习比特序列运算和 XOR 运算。以及介绍一种称为一次性密码本的密码系统。
具体介绍几种对称密码算法,包括 DES、三重DES、AES以及其它一些密码算法。
需要注意的是,密码算法有时候会设计开发者的专利和授权等问题,记得先调查一下该算法的专利和授权信息。

3 从文字密码到比特序列密码

3.1 编码

计算机操作对象并不是文字,而是由 0 和 1 排列而成的比特序列。执行加密操作的程序,就是将表示明文的比特序列转换为表示密文的比特序列。

3.2 XOR

XOR 的全称是 exclusive or ,中文叫作异或

1. 一个比特的 XOR

0 XOR 0 = 0
0 XOR 1 = 1
1 XOR 0 = 1
1 XOR 1 = 0
可以理解为硬币翻转:
不翻转 + 不翻转 = 不翻转
不翻转 + 翻转 = 翻转
翻转 + 不翻转 = 翻转
翻转 + 翻转 = 不翻转

2. 比特序列的 XOR

A XOR B = AB
AB XOR B = A
这是因为两个相同的数进行 XOR 运算的结果一定为 0 ,因此 A XOR B XOR B = A。这不就和加密、界面的步骤非常相似么,如下:

  • 将明文 A 用密钥 B 进行加密,得到密文 AB。
  • 将密文 AB 用密钥 B 进行解密,得到明文 A。

实际上,只要选择一个合适的 B,仅仅使用 XOR 就可以实现一个高强度的密码。
对同一个比特序列进行两次 XOR 之后就会回到最初的状态。我们不妨来看看一副由很多点组成的图像。如果将白色的点作为 0 ,黑色的点作为 1 ,
那么一副黑白图像就可以表示为 0 和 1 的比特序列。我们转呗两幅图像,一幅画是英文字母 D,另一幅是用 0 和 1 交替排列形成的图像(蒙版),
并进行如下操作:

如果所使用的蒙版是完全随机的比特序列,则使用 XOR 就可以将原来的图像掩盖起来。但如果蒙版中的比特序列是可以被推测出来的,那么实质上图像就
没有被真正的掩盖。对于密码技术来说,“是否可以预测”是非常重要的一点。能够产生不可预测的比特序列,对于密码技术的贡献是巨大的。这种不可预测的比特
序列就称为随机数。将在第十二章详解。

4 一次性密码本——绝对不会被破译的密码

4.1 什么是一次性密码本

只要通过暴力破解对密钥空间进行遍历,无论任何密文总有一天都能够被破译。然后,本节中将要介绍的一次性密码本(one-time pad)却是一个例外。
即便用暴力破解法遍历整个密钥空间,一次性密码本也绝对无法被破译。

4.2 一次性密码本的加密

它的原理是“将明文与一串随机的比特序列进行 XOR 运算”。如果将硬币的正面设为 0 ,反面设为 1 ,则通过不断掷硬币就能够产生这样一串随机的比特序列。
下面我们将明文 midnight 这个字符串通过 ASCII 进行编码并产生一串比特序列。

接着,我们掷 64 次硬币产生 64 比特的随机比特序列:

下面我们将明文与密钥的比特序列进行 XOR 运算,并得到一串新的比特序列,这次运算的结构也就是一次性密码本的密文。

这样产生的比特序列如果硬要显示在计算上,那么显示结果看上去就像是乱码一样(其实是加密),因此密文通常不会被还原为字符,而是被作为二进制数据来处理。

4.3 一次性密码本的解密

用密文和密钥进行 XOR 运算,就可以得到明文:

这样显示在计算上就会是正常的文本: midnight。

4.4 一次性密码本是无法破译的

为什么一次性密码本是绝对无法破译的呢?我们假设对一次性密码本的密文尝试进行暴力破解,那么总有一天我们会尝试到和加密时相同的密钥,但是,
即便我们能够解密出 midnight 这个字符串,我们也无法判断它是否是正确的明文

因为在解密过程中,所有的 64 比特的排列组合都会出现,包括像 aaaaaaaa、bbbbbbbb、ZZZZZZZZ 这样的规则字符串,也会包含 midnight、onenight、
mistress 等英文单词,还会包含乱码看不懂的组合。由于明文中又有的可能排列组合都会出现,因此我们无法判断其中哪一个才是正确的明文(也就是用哪个
密钥才能够正确解密)。

所谓暴力破解,就是按顺序将所有的密钥都尝试一遍,并判断所得到的是不是正确的明文的方法。然而,在一次性密码本中,由于我们无法判断得到的是不是正确的明文,
因此一次性密码本是无法破译的。

一次性密码本无法破译的这一特性是由香农于 1949 年通过数学方法加以证明的。一次性密码本是无条件安全(unconditionally secure)的,
在理论上是无法破译的(theoretically unbreakable)

4.5 一次性密码本为什么没有被使用

1. 秘钥的配送

最大的问题在于秘钥的配送。
接受者 Bob 收到了 Alice 发来的密文。 Bob 要想进行解密,就必须使用和 Alice 进行加密时相同的秘钥,因此 Alice 必须将秘钥也发送给 Bob ,
且该秘钥的长度和密文是相等的。但这样就产生了一个矛盾——如果能够有一种方法将秘钥安全地发送出去,那么岂不是也可以用同样的方法来安全地发送明文了吗?

2. 秘钥的保存

既然能保存和明文一样长度的秘钥,那么不也就有办法安全保存明文本身了吗?也就是说,从一开始我们根本就不需要密码。也就是说,我们只是将“保护明文”
这一命题替换成了“保护和明文一样的秘钥”而已,问题并没有得到实质性的解决。

3. 秘钥的重用

此外,在一次性密码本中是绝对不能重用过去用过的随机比特序列的,一次性密码本中的“一次性”也正是由此而来。这是因为作为秘钥的比特序列一旦泄露,
过去所有的机密通信内容将全部被解密(假设窃听者 Eve 保存了过去所有的通信内容)。

4. 秘钥的同步

如果明文是一个大小为 100MB 的文件,则秘钥的大小也一定是 100MB 。而且在通信过程中,发送者和接受者的秘钥的比特序列不允许有任何错位,否则错位
后的比特的所有信息都将无法解密。

5. 秘钥的生成

在一次性密码本中,需要生成大量的随机数。这里的随机数必须是无重现性的真正随机数。因此,能够使用一次性密码本的,只有那些机密性重过一切,且可以
话费大量财力和人力来生成并配送秘钥的场合。

综上所述,一次性密码本是一种几乎没有实用性的密码。但是,一次性密码本的思路却孕育出了流密码(stream cipher)。流密码使用的不是真正的随机比特序列,
而是伪随机数生成器产生的比特序列。流密码虽然不是无法破译的,但只要使用高性能的伪随机数生成器,就能够构建出强度较高的密码系统。

5 DES

5.1 什么是 DES

DES(Data Encryption Standard)是 1977 年美国联邦信息处理标准(FIPS)中所采用的一种对称密码。由于其在 1999 年只用了 22 小时就可破译,
因此除了用它来解密以前的密文以外,现在我们不应该再使用 DES 了。

5.2 加密和解密

DES 是一种将 64 比特的明文加密成 64 比特的密文的对称密码算法,它的密钥长度是 56 比特。尽管从规格上来说,DES 的密钥长度是 64 比特,但由于
每隔 7 比特 会设置一个用于错误检查的比特,因此实质上其密钥长度是 56 比特。

DES 是以 64 比特的明文(比特序列)为一个单位来进行加密的,这个 64 比特的单位称为分组。一般来说,以分组为单位进行处理的算法密码称为分组密码
(block cipher),DES 就是分组密码的一种。

DES 每次只能加密 64 比特的数据,如果要加密的明文比较长,就需要对 DES 加密进行迭代(反复),而迭代的具体方式就称为模式(mode),关于模式会在第四章详解。

5.3 DES 的结构(Feistel网络)

在 Feistel 网络中,加密的各个步骤称为轮(round),整个加密过程就是进行若干次轮的循环。如下是 Feistel 网络中一轮的计算流程。而 DES 是
一种 16 轮循环的 Feistel 网络。

1. Feistel 网络的加密

输入的数据被分为左右两半分别进行处理,中间的“子密钥”指的是本轮加密所使用的密钥。在 Feistel 网络中,每一轮都需要使用一个不同的子密钥。
由于子密钥只在一轮中使用,它只是一个局部密钥,因此才成为子密钥
轮函数的作用是根据“右侧”和子密钥一起生成中间密钥,这个中间密钥对“左侧”进行加密,轮函数是该密码系统的核心。一轮的步骤总结如下:

  1. 将输入的数据等分位左右两部分
  2. 将输入的右侧直接发送到输入的右侧
  3. 将输入的右侧发送到轮函数
  4. 轮函数根据右侧数据和子密钥,计算出一串看上去是随机的比特序列
  5. 将上一步得到的比特序列与左侧数据进行 XOR 运算,并将结果作为加密后的左侧

这样一来,“右侧”根本就没有被加密,因此我们需要用不同的子密钥对一轮的处理重复若干次,并在每两轮处理之间将左侧和右侧的数据对调

2. Feistel 网路的解密

我们尝试一次下将一轮加密的输出结果用相同的子密钥重新运行一次,结果可能非常令人意外,无论轮函数的具体算法是什么,通过上述操作都能够将密文
正确地还原为明文。关于这一点,可以从 XOR 的性质(两个相同的数进行 XOR 的结果一定为 0 )进行思考。

有多个轮的情况下也是一样的。也就是说, Feistel 网络的解密操作只要按照相反的熟悉怒来使用子密钥就可以完成了。下图是三轮加密图:

下图是解密示意图:

3. Feistel 网络的性质

Feistel 网络的轮数可以任意增加。无论运行多少轮的加密计算,都不会发生无法解密的情况。以及加密时无论使用任何函数作为轮函数都可以正确解密。
最后是加密和解密可以用完全相同的结构来实现。在 Feistel 网络的一轮中,右半部分实际上没有任何处理,这在加密算法中看起来是一种浪费,
但却保证了可解密性。

6 三重 DES

现在 DES 已经可以在限时的时间内被暴力破解,因此我们需要一种用来替代 DES 的分组密码,三重 DES 就是出于这个目的被开发出来的。

6.1 什么是三重 DES

三重 DES(triple-DES)是为了增加 DES 的强度,将 DES 重复 3 次所得到的一种密码算法。

6.2 三重 DES 的加密


经过三次 DES 处理CIA能变成最后的密文,由于 DES 秘钥的长度实质上是 56 比特,因此三重 DES 的秘钥长度就是 56 * 3 = 168 比特。
从上图中,我们发现,三重 DES 并不是进行三次 DES 加密,而是加密-解密-加密的过程,实际上这是 IBM 公司设计出来的,目的是为了让三重 DES
能够兼容普通的 DES。当三重 DES 所有秘钥都相同的时候就相当于普通的 DES 了。因此,以前用 DES 加密的密文,就可以通过这种方式用三重 DES
来进行解密。也就是说三重 DES 对 DES 具备向下兼容性。

6.3 三重 DES 的解密

解密过程与加密过程顺序相反。

6.4 三重 DES 的现状

其处理速度不高,而且在安全性方面也逐渐显现了一些问题,但还被银行等机构使用。

7 AES 的选定过程

7.1 什么是 AES

AES(Advanced Encryption Standard)是取代其前任标准(DES)而成为新标准的一种对称密码。全世界的企业和密码学家提交了多个对称密码算法
作为 AES 的候选,最终在 2000 年选出了一种名为 Rijndael 的对称密码算法,并将其确定为了 AES。

7.2 AES 的选定过程

组织 AES 公开竞选活动的,是美国的一个标准化机构——NIST,参加 AES 竞选的条件是:被选为 AES 的密码算法必须无条件地免费供全世界使用。
此外,参与者还必须提交密码算法的详细规格书、以 ANSIC 和 Java 编写的实现代码以及抗密码破译强度的评估等材料。因此,在提交了详细设计和程序代码
完全公开的情况下,就杜绝了隐蔽式安全性(security by obscurity)。

评审者也是参与者,通过发现别人的弱点,实现竞争,最终实现标准化,正是密码算法选定的正确方式。只有由世界最高水平的密码学家共同尝试破译,依然未能
找到弱点,这样才能够证明一种密码算法的强度。

7.3 AES 最终候选算法的确定与 AES 的最终确定

8 Rijndael

8.1 什么是 Rijndael

由比利时密码学家 Joan Daemon 和 Vincent Rijmen 设计的分组密码算法。

8.2 Rijndael 的加密和解密

和 DES 一样, Rijndael 算法也是由多个轮所构成,但是 Rijndael 使用了 SPN 结构,Rijndael 的输入分组为 128 比特,也就是 16字节。
首先,需要逐个字节地对 16 字节地输入数据进行 SubBytes 处理,即将一个 1 字节地值替换成另一个 1 字节地值,可以想象成简单替换密码。
SubBytes 之后进行 ShiftRows处理,即将 SubBytes 的输出以字节单位进行打乱处理,这种打乱是有规律的。然后进行 MixColumns 处理,即对
一个 4 字节地值进行比特运算,将其变成另一个 4 字节值,最后与轮秘钥进行 XOR ,进行 AddRoundkey处理。这就是一轮。
实际上,在 Rijndael 需要重复进行 10 ~ 14 次计算。
![][14]
它在一轮使用了 SubBytes、ShiftRows、MixColumns 分别存在反向运算 InvSubBytes、InvShiftRows、InvMixColumns,分别按字节、行、列进行并行计算。

8.3 Rijndael 的破译

对 Rijndael 来说,可能会出现以前并不存在的新的攻击方法。它的算法背后有着严谨的数学结构,也就是说从明文到密文的计算过程可以全部用公司来表达。

这只是一种假设而已,到现在为止,还没有出现针对 Rijndael 的有效攻击。

8.4 应该使用哪种对称密码呢

介绍了 DES、三重 DES、和 AES 等对称密码。那么我们到底用哪种呢?
一般来说不应该使用任何任何自制的密码算法,而是应该使用 AES。

9 本章小结

本章介绍了对称密码,以及 DES、三重 DES、AES密码算法。

使用一种秘钥空间巨大,且在算法上没有弱点的对称密码,就可以通过密文来确保明文的机密性。巨大的秘钥空间能够抵御暴力破解,算法上没有弱点可以抵御其他
类型的攻击。

然后用对称密码进行通信时,还会出现秘钥的配送问题。为了解决秘钥配送问题,我们需要公钥密码技术,将在第五章详解。
本章所介绍的几乎所有的密码算法,都只能讲一个固定长度的分组进行加密。当需要加密的明文长度超过分组长度时,就需要对密码算法进行迭代。下一章将
探讨对分组密码进行迭代的方法。

10 小测验

  1. 一次性密码本的秘钥可以进行压缩变短。
  2. 对称密码中,加密的秘钥和解密的秘钥是相等的。
  3. 如果秘钥长度为 56 比特,那么用暴力破解找到正确秘钥需要平均尝试约 2^28 次。
  4. 三重 DES 的秘钥空间是 DES 秘钥空间的三倍大。
  5. 现在 DES 可以在限时的时间内被破译。
  6. AES 标准选定的密码算法叫 Rijndael。

注:秘钥长度为 56 比特,则秘钥空间(秘钥总数)是 2^56,平均时间除以 2 ,为 2^55。
压缩算法在于:重复。

1 本章概要

本章将介绍历史上几种著名的密码:

  1. 凯撒密码
  2. 简单替换密码
  3. Enigma

此外,还介绍两种破译密码的方法:

  1. 暴力攻击
  2. 频率分析

最后,我们还将思考密码算法与密钥之间的关系。
本章所介绍的密码在现在都已经不再适用了,但在寻找密码弱点的方法、破译密码的思路以及密码算法与密钥的关系等方面,这些密码与现在密码技术依然是相通的。

2 凯撒密码

2.1 什么是凯撒密码

凯撒密码是通过将明文中所使用的字母表按照一定的字数“平移”来进行加密的。

2.2 凯撒密码的加密

例如,将 yoshiko 这个女性的名字,加密:
y -> B
o -> R
s -> V
h -> K
i -> L
k -> N
o -> R
这样,明文 yoshiko 就被加密为了密文BRVKLNR。凯撒密码中,将字母表中的字母平移这个操作就是密码的算法,而平移的字母数量则相当于密钥。
在本例中,密钥为3:

2.3 凯撒密码的解密

我们只需要反向平移3个字母就可以解密了:
B -> y
R -> o
V -> s
K -> h
L -> i
N -> k
R -> o
这样就得到了明文 yoshiko,但是密钥 3 必须由发送者和接受者事先约定好。

2.4 用暴力破解来破译密码

通过上面的讲解,我们知道对于发送者用凯撒密码加密过的密文,接受者是能够进行解密的,但是接受者以为的人在看到密文 BRVKLNR ,是否能够进行破译
得到明文呢?在凯撒密码中,密钥就是字母表平移的字数。因此可以按顺序将这 26 种密钥都尝试一遍:
BRVKLNR -> 用密钥 0 解密 -> brvklnr
BRVKLNR -> 用密钥 1 解密 -> aqujkmq
BRVKLNR -> 用密钥 2 解密 -> zptijlp
BRVKLNR -> 用密钥 3 解密 -> yoshiko
……
BRVKLNR -> 用密钥 25 解密 -> cswlmos
尝试一遍都,发现当密钥为 3 时,可以解密出有意义的字符串 yoshiko 。这就意味着我们仅仅根据密文就推测除了密钥和明文,这样的密码有什么用呢?
凯撒密码实在是太脆弱了,无法保护重要的密码。

上面介绍的这种密码破译方法,就是讲所有可能的密钥全部尝试一遍,这种方法称为暴力破解(brute-force attack)。由于这种方法的本质是从所有的密钥
中找出正确的密钥,因此又称为穷举搜索(exhausive search)。

3 简单替换密码

3.1 什么是简单替换密码

将明文中所使用的字母表替换为另一套字母表的密码,如下图就是一个简单替换密码的对应表:

3.2 简单替换密码的加密

按照上图的替换表,对明文 yoshiko 进行加密:
y -> K
o -> B
s -> L
h -> T
i -> J
k -> S
o -> B
就得到密文:KBLTJSB。

3.3 简单替换密码的解密

需要根据使用的替换表进行解密。

3.4 简单替换密码的密钥空间

yoshiko 用凯撒密码(密钥为 3 )加密后的密文是 BRVKLNR ,二用简单替换密码(密钥为上图)加密后的密文则是 KBLTJSB。单从密文上来看,我们无法
判断出凯撒密码和简单替换密码到底哪一种更难破解。

凯撒密码可以通过暴力破解来破译,但简单替换密码很难通过暴力破解来破译。这是因为简单替换密码中可以使用的密钥数量,比凯撒密码要多得多。

为了确定这一点,我们计算一下简单替换密码中可以使用的密钥总数。一种密码能够使用的“所有密钥的集合”称为密钥空间(keyspace),所有可用密钥的总数就是密钥空间的
大小。密钥空间越大,暴力破解就越困难。

简单替换密码中,明文字母表中的 a 可以对应A,B,C,…,Z 这 26 个字母中的任意一个,b 可以对应除了 a 所对应的字母以为的剩余 25 个字母中的任意一个。
以此类推,我们可以计算出简单替换密码的密钥总数为:
26 * 25 * 24 * 23 * … * 1 = 403291461126605635584000000
即使美妙能够遍历 10 亿个密钥,也要花费 120 亿年的时间。

3.5 用频率分析来破译密码

频率分析利用了明文中的字母的出现频率与密文中的字母的出现频率一致这一特性。例如:

首先,我们统一一下这段密文中每个字母出现的频率

为了找到破译的线索,我们再来看一下英语文章中所使用的字母的频率,一般的英语文章中出现频率最高的字母是 e 。而上图中出现频率最高的两个字母是 I 和 Y,
我们假设它们中的其中一个是 e 。然后将密文中的 Y 全部替换成 e ,替换后的密文如下:

英语中出现最多的单词是 the ,因此我们可以寻找一下以 e 结尾的 3 个字母的组合,结果我们发现 MEe 这3个字母的组合是最常出现的,而且 MEe 出现在
密文的开头,因此 MEe 很可能就是 the 。于是,我们再假设 M -> t,E -> h。得到如下图的表:

让我们动员自己所有的英文词汇,在上面的文字中继续寻找可能的组合。我们发现中间有一个词 thPee 比较可以,这个词不会就是 three 吧(P -> r)?
除了高频字母以外,密文中的低频字母 Q 也可以找到一些相关的组合。直到得到最后通顺的明文,有以下结论:

  • 除了高频字母以外,低频字母也能够成为线索。
  • 搞清开头和结尾能够称为线索,搞清单词之间的分隔也能够成为线索。
  • 密文越长越容易破译。
  • 同一个字母连续出现能够成为线索(这是因为在简单替换密码中,某个字母在替换表中所对应的另一个字母是固定的)。
  • 破译的速度会越来越快。

4 Enigma

4.1 什么是 Enigma

Enigma 是由德国人于 20 世纪初发明的一种能够进行加密与解密操作的机器。Enigma 这个名字在德语里是“谜”的意思。

4.2 用 Enigma 进行加密通信

Enigma 是一种由键盘、齿轮、电池和灯泡所组成的机器,通过这一台机器就可以完成加密和解密两种操作。

发送者和接收者各自拥有一台 Enigma。发送者用 Enigma 将明文加密,将生成的密文通过无线电发送给接收者。接收者将接收到的密文用自己的 Enigma 解密。

由于发送者和接收者必须使用相同的密钥才能完成密码通信,因此发送者和接收者会事先收到一份叫国防军密码本的册子。发送者和接收者按照册子指示来设置 Enigma。

4.3 Enigma 的构造

Enigma 的构造如下图。它能对字母表中的 26 个字母进行加密和解密操作,这里将字母的数量简化为 4 个:

每当按下 Enigma 上的一个键,就会点亮一个灯泡。而接线板和轮子在每次输入的时候都会变化,这样的组合让 Enigma 看起来像是一个能够动态变化的操作。

4.4 Enigma 的加密


在进行通信之前,发送者金和接收者双方都需要持有国防军密码本,国防军密码本中记载了发送者和接收者需要使用的每日密码。

1.设置 Enigma

发送者查阅国防军密码本,找到当天的每日密码,并按照该密码来设置 Enigma。具体来说,就是接线板和转子排列。

2. 加密通信密码

接下来,发送者需要想出 3 个字母,并将其加密。这 3 个字母称为通信密码。
通信密码的加密也是通过 Enigma 完成的。假设发送者选择的通信密码为 psv,则发送者需要在 Enigma 的键盘上输入两次该通信密码,也就是说需要输入
psvpsv 这 6 个字母。

发送者每输入一个字母,转子就会旋转,同时灯泡亮起,发送者记下亮起的灯泡所对应的字母。输入全部 6 个字母之后,发送者就记下了它们所对应的密文,
假设密文是 ATCDVT(密文用大写字母来表示)。

3. 重新设置 Enigma

接下来,发送者根据通信密码重新设置 Enigma。通信密码中的 3 个字母实际上代表了三个转子的初始位置。每一个转子的上面都印有字母,可以根据字母
来设置转子的初始位置。通信密码 psv 就表示需要将转子 1、2、3 分别转到 p、s、v 所对应的位置。

4. 加密消息

接下来,发送者对消息进行加密。发送者将明文逐字从键盘输入,然后从灯泡中读取所对应的字母并记录下来。例如输入 nacht ,记录下对应的 KXNWP 。

5. 拼接

接下来,发送者将“加密后的通信密码” ATCDVT 与 “加密后的消息” KXNWP 进行拼接,将 ATCDVTKXNWP 作为电文通过无线电发出。

上面就是用 Enigma 进行加密的操作步骤,看来还真是挺麻烦的。

4.5 每日密码与通信密码

每日密码在这里不是用来加密消息的,而是用来加密通信密码的。也就是说,每日密码是一种用来加密密钥的密钥,这种密钥,一般称为密钥加密密钥(Key Encrypting Key,KEY)。

4.6 避免通信错误

在通信密码的加密中,我们需要将通信密码 psv 连续输入两次,即 psvpsv。这是因为接收者可以对通信密码进行校验,避免错误。

4.7 Enigma 的解密

1.分解

接收者将接受到的电文分解为两部分,即开头的 6 个字母 ATCDVT 和剩下的字母 KXNWP。

2. 设置 Enigma

接收者查阅国防军密码本中的每日密码,并按照该密码设置 Enigma,这一步和发送者进行的操作是相同的。

3. 解密通信密码

接下来,接收者将加密后的通信密码 ATCDVT 进行解密。接收者在 Enigma 的键盘上输入 ATCDVT 这 6 个字母,然后将亮起的灯泡对应的字母 psvpsv 记下来。
因为是 psv 重复两次的形式,所以接收者可以判断在通信过程中没有发生错误。

4. 重新设置 Enigma

接下来,接收者根据通信密码 psv 重新设置 Enigma。

5. 解密消息

接下里,接收者将电文中的剩余部分 KXNWP 逐一输入,将灯泡的结果记下来,得到了 nacht。

4.8 Enigma 的弱点

1. 将通信密码连续输入两次并加密

这样密码破译者知道密文开头的 6 个字母被解密后的明文一定是 3 个字母重复两次的形式

2. 通信密码是人为选定的

有可能使用 aaa、bbb 或者生日,女朋友的名字当做密码。密码系统中使用的密钥不能是人为选定的,而应该使用无法预测的随机数来生成。

3. 必须派发国防军密码本

如果落到敌人手里,就会带来大麻烦。如果现在所使用的国防军密码本被敌人得到,哪怕只泄漏了一本,也必须重新制作新的密码本并发到全军。
必须配送密钥这个问题,将在第五章详解。

4.9 Enigma 的破译

即使得到了 Enigma 的构造,但是由于 Enigma 的设计并不依赖于“隐蔽式安全性”,只要不知道 Enigma的设置(密钥),就无法破译密码。但是,
每日密码在一天之内不会变,即一天内截获的所有通信,都是用同一个密码进行加密的;以及通信密码都会重复两次。以 ATCDVT 为例,即知道第一个字母和
第四个字母,都是由相同的明文加密得到的;另外,由于一个轮子只有26个可能。最终经过了许多密码专家的破译,包括现代计算机之父阿兰图灵,最终破译了这个密码。

5 思考

5.1 为什么吗要将密码算法和密钥分开呢

我们来列举一下本章介绍过的密码系统的“密码算法”和“密钥”。

密码系统 密码算法 密钥
凯撒密码 将明文中的各个字母按照指定的字母数平移 平移的字母数量
简单替换密码 按照替换表对字母表进行替换 替换表
Enigma(通信密码的加密) 使用Enigma密码机,通过接线板的接线方式、三个转子的顺序、每个转子的旋转位置对字母进行替换 接线板的接线方式、转子的顺序、转子的旋转位置
Enigma(通信电文的加密) 使用接线板的接线方式和三个转子的顺序固定的Enigma密码机,按照每个转子的旋转位置对字母进行替换 每个转子的旋转位置

仔细研究一下每一对密码算法和密钥的组合就会发现,在密码算法中必然存在可变部分,而这些可变部分就相当于密钥。当密码算法和密钥都确定时,加密的方法也就确定了。
如果每次加密都必须产生一个新的密码算法,那真是太诡异了。对于已经开发出的一种密码算法,我们总是希望能够重复使用。

将密码算法和密钥分开的意义正在于此。密码算法是需要重复使用的,但在重复使用同一种算法的过程中,该算法被破译的可能性也在逐渐增大。因此,
我们就在密码算法中准备了一些可变部分,并在每次通信时都对这部分进行改变,而这一可变部分就是密钥。

将密码算法和密钥分开考虑,就解决了希望重复使用,但重复使用会增加风险这个难题。

密钥才是秘密的精华。因此,在密码技术中,如何管理密钥是一个重要的课题。这将在第十一章详解。

每个人都可以拥有相同品牌的锁,但每个人都有不同的钥匙。锁的设计师公开的——锁匠都带有详细图的书,而且绝大多数好的设计方案都在公开专利
中进行了描述——但是钥匙是秘密的。

6 本章小结

密码系统:凯撒密码、简单替换密码以及 Enigma。
密码破译:暴力破解、字母频率分析。

7 小测验

  1. 凯撒密码中,如果存在如 c -> C, q -> Q这样,明文中的字母被替换成了相同字母的密文的情况。于是Alice就想:如果替换表中不出现这种被替换
    为相同字母的情况,那么密文应该会更难被破译吧?请问 Alice 的想法正确吗?

1 本章概要

从整体上了解密码世界的模样。

2 密码

2.1 Alice 与 Bob

要讲解密码,需要给参与信息交互的人和计算机起几个名字,如下表:

名称 说明
Alice 一般角色
Bob 一般角色
Eve 窃听者,可窃听通信内容
Mallory 主动攻击者,可妨碍正常通信、伪造消息等
Trent 可信的第三方
Victor 验证者

2.2 发送者、接受者和窃听者

Alice 向 Bob 发送消息,Alice 作为 Sender,Bob 作为 Receiver,消息作为 Message。在互联网中,消息会经过许多台计算机和通信设备做中转,
在这个过程中,就存在被恶意窃听者(eavesdropper)偷看的可能。作为监听者,我们给它起名为Eve,它可能是人类也有可能是某些程序软件。

2.3 加密与解密

Alice 不想让别人看到她传递消息的内容,因为它决定将消息加密(encrypt) 后再发送出去,加密之前的是明文(plaintext),加密之后的消息称为
密文(ciphertext)。

Bob 收到了 Alice 的加密消息后,他需要进行解密(decrypt)之后再查阅,即将密文恢复成明文的过程。这样,窃听者在网络上得到的只是密文。

2.4 破译

Eve 得到了密文,试图将密文还原为明文,被称为密码破译(cryptanalysis),简称破译,或者密码分析。

3 对称密码与公钥密码

3.1 密码算法

用于解决复杂问题的步骤,通常称为算法(algorithm),明文生成密文的步骤,被称为“加密算法”;而解密的步骤被称为“解密算法”,它们两统称为密码算法。

3.2 密钥

密码算法中需要密钥(key)。如下:

无论是在加密还是解密时,都需要知道密钥。

3.3 对称密钥与公钥密码

根据密钥的使用方法,可以将密码分为对称密码和公钥密码两种。

  1. 对称密码(symmetric cryptography)是指在加密和解密时使用同一密钥的方式。
  2. 公钥密码(public-key cryptography)则是在加密和解密时使用不同密钥的方式,也称非对称密码(asymmetric cryptography)。

3.4 混合密码系统

将对称密码和公钥密码结合起来的密码方式。

4 其它密码技术

4.1 单向散列函数

为了防止下载的软件被篡改,有安全意思的软件发布者会步的散列值。散列值就是用单向散列函数(one-way hash function)计算出来的数值。

单向散列函数所保证的并不是机密性,而是完整性(integrity),即“数据是正牌的而不是伪造的”。第七章,详细讲解。

4.2 消息认证码

为了确认消息是否来自所期望的通信对象,可以使用消息认证码(message authentication code)技术,不仅能保证完整性还能提供认证(authentication)机制。

4.3 数字签名

验证消息是否被篡改,能够确保完整性、提供认证并防止否认的密码机制。将第九章详解。

4.4 伪随机数生成器

伪随机数生成器(Pseudo Random Number Generator,PRNG)是一种能够模拟产生随机数列的算法。随机数和密码技术有关,这么说可能会感到意外,
但实际上随机数确实承担着密钥生成的重要职责。例如在 Web 中进行 SSL/TLS 通信时,会生成一个仅用于当前通信的临时密钥(会话密钥),这个密钥
就是基于伪随机数生成的。将在第十二章详解。

5 密码学家的工具箱

在以上内容中,已经出现了很多种类的密码技术,其中以下六种发挥尤其重要的作用:

  1. 对称密码
  2. 公钥密码(非对称密码)
  3. 单向散列函数
  4. 消息认证码
  5. 数字签名
  6. 伪随机数生成器

在本书中,将上述六种技术统称为密码学家的工具箱。

6 隐写术与数字水印

上面讲过,密码是一种能够让消息内容变得无法解读的技术,即 cryptography。除此之外,还有另外一种技术,它不是让消息内容变得无法解读,而是能够
隐藏消息本身,这种技术称为隐写术(steganography)。例如:
们先准备一段话,
容易看懂的就可以,
闻乐见的当然更好,
迎你尝试将另一句话嵌在这段话中,
会发现这其实就是一种隐写术。

这样,就发现这段话还隐藏着另一句话“我很喜欢你”。
隐写术在计算机中,用于数字水印技术,它是一种将著作权拥有者及购买者的信息嵌入文本中的技术。但是仅凭数字水印技术时无法对信息进行加密的,因此需要
和其它技术配合使用。

例如,将密码技术和隐写术相结合的方法就很常见。首先,我们将要嵌入的文章进行加密并生成密文,然后再通过隐写术将密文隐藏到图片中。这样一来,
即便有人发现了密文的存在,也无法读取出所嵌入的文章的内容。
密码隐藏的是内容,隐写术隐藏的是消息本身。

7 密码与信息安全常识

7.1 不要使用保密的密码算法

我们不应该制作或使用任何保密的密码算法,而是应该使用那些已经公开的、被公认为强度较高的密码算法,原因如下:

  1. 密码算法的秘密早晚会公诸于世。
  2. 开发高强度的密码算法是非常困难的。而试图通过对密码算法本身进行保密来确保安全性的行为,一般称为隐蔽式安全性(security by obscurity),这是危险且愚蠢的行为。

7.2 使用低强度的密码比不进行任何加密更危险

对于用户来说,安全感与密码的强度无关,而只是由“信息已经被加密了”这一事实产生的,这通常会导致用户在处理一些机密信息的时候麻痹大意。

7.3 任何密码总会有一天都会被破解

无论使用任何密码算法锁生成的密文,只要将所有可能的密钥全部尝试一遍,就总有一天可以破译出来。因此,破译密文所需要花费的时间,与要保密的明文的
价值之间的权衡就显得非常重要。严格来说,绝对不会被破解的密码其实是存在的,这种算法称为一次性密码本(one-time pad),将在3.4节详解。
此外,还有另一种技术被认为有可能早就完美的密码技术,那就是量子密码,将在15.3.1节详解。

7.4 密码只是信息安全的一部分

社会工程学(social engineering)。最脆弱的环节并不是密码,而是人类自己。

8 本章小结

主要浏览了密码世界中的一些主要技术。

9 小测验

下面说法是否正确?

  1. 将明文转换为密文的过程称为加密。
  2. 明文是供人类读取的数据,而密文则是供计算机读取的数据。
  3. 只要检查邮件发送者(From:)一栏的内容,就能够正确判断邮件是谁发出的。
  4. 在对称密码中,加密用的密钥和解密用的密钥是相同的。
  5. 公开的密码算法容易遭到坏人的攻击,因此使用自己公司开发的保密的密码算法更加安全。

1 Lock 接口

2. 队列同步器

队列同步器 AbstractQueueSynchronizer (简称同步器),是用来构建锁或者其它同步组件的基础框架,它使用了一个 int 成员变量表示同步状态,
通过内置的 FIFO 队列完成资源获取线程的排队工作。

2.1 队列同步器的接口与示例

同步器是实现锁(也可以是任意同步组件)的关键,在锁的实现中聚合同步器,利用同步器实现锁的语义。可以这样理解二者的关系:锁是面向使用者,
它定义了使用者与锁交互的接口,隐藏了实现细节;同步器面向的是锁的实现者,它简化了锁的实现方式,屏蔽了同步状态管理、线程的排队、等待与唤醒等底层操作。
锁和同步器很好地隔离了使用者和实现者所需关注的领域。

只有掌握了同步器的工作原理才能更加深入地理解并发包中其它的并发组件,所以下面通过一个独占锁的实例来深入了解一下同步器的工作原理。

独占锁就是在同一时刻只能由一个线程获取到锁,而其它获取锁的线程只能处于同步队列中等待,只有获取锁的线程释放了锁,后继的线程才能获取锁,代码5-2如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
public class Mutex implements Lock{
// 静态内部类,自定义同步器
private static class Sync extends AbstractQueuedSynchronizer {

// 是否处于占用状态
@Override
protected boolean isHeldExclusively() {
return getState() == 1;
}

@Override
public boolean tryAcquire(int acquires) {
if (compareAndSetState(0, 1)) {
setExclusiveOwnerThread(Thread.currentThread());
return true;
}
return false;
}

@Override
protected boolean tryRelease(int releases) {
if (getState() == 0) throw new IllegalMonitorStateException();
setExclusiveOwnerThread(null);
setState(0);
return true;
}

Condition newCondition() {
return new ConditionObject();
}
}

private final Sync sync = new Sync();

@Override
public void lock() {
sync.acquire(1);
}

@Override
public boolean tryLock() {
return sync.tryAcquire(1);
}

@Override
public boolean tryLock(long time, TimeUnit unit) throws InterruptedException {
return sync.tryAcquireNanos(1, unit.toNanos(time));
}

@Override
public void unlock() {
sync.release(1);
}

public boolean isLocked() {
return sync.isHeldExclusively();
}

public boolean hasQueuedThreads(){
return sync.hasQueuedThreads();
}

@Override
public void lockInterruptibly() throws InterruptedException {
sync.acquireInterruptibly(1);
}

@Override
public Condition newCondition() {
return sync.newCondition();
}
}

2.2 队列同步器的实现分析

分析同步器如何完成线程同步的,包括:同步队列、独占式同步状态获取与释放、共享式同步状态获取与释放以及超时获取同步状态等同步器的核心数据结构与模块方法。

2.2.1 同步队列

当前线程获取同步状态失败时,同步器会将当前线程以及等待状态等信息构造成一个节点 (Node) 并将其加入同步队列,同时会阻塞当前线程,当同步状态释放时,
会把首节点中的线程唤醒,使其再次尝试获取同步状态。

试想,当一个线程成功地获取了同步状态(锁),其它线程将无法获取到同步状态,转而被构造称为节点并加入到同步队列中,而这个加入队列的过程必须要
保证线程安全,因此同步器提供了一个基于 CAS 线程的设置尾节点的方法: compareAndSetTail(Node expect, Node update) ,它需要传递当前“认为”
的尾节点和当前节点,只有设置成功后,当前节点才正式与之前的尾节点建立关联。

首节点是获取同步状态成功的节点,首节点的线程在释放同步状态时,将会唤醒后继节点,而后继节点将会在获取同步状态成功时将自己设置为首节点:

设置首节点是通过获取同步状态成功的线程来完成的,由于只有一个线程能够成功获取到同步状态,因此设置头节点的方法并不需要使用 CAS 来保证。

2.2.2 独占式同步状态获取与释放

通过调用同步器的 acquire(int arg) 方法可以获取同步状态,该方法对中断不敏感,也就是由于线程获取同步状态失败后进入同步队列中,后续对线程
进行中断操作时,线程不会从同步队列中移出,源码如下:

1
2
3
4
public final void acquire(int arg) {
if(!tryAcquire(arg) && acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
selfInterrupt();
}

3 重入锁

ReenTrantLock,支持重入的锁。

3.1 实现重进入

这个特性需要解决两个问题:

  1. 线程再次获取锁。锁需要去识别获取锁额线程是否为当前占据锁的线程,如果是,则再次成功获取。
  2. 锁的最终释放。线程重复 n 次获取了锁,随后在第 n 次释放锁后,其它线程能够获取到锁。要求对获取进行计数自增,释放时计数自减。

获取锁源码:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
final boolean nonfairTryAcquire(int acquires) {
final Thread current = Thread.currentThread();
int c = getState();
if (c == 0) {
if (compareAndSetState(0, acquires)) {
setExclusiveOwnerThread(current);
return true;
}
}
else if (current == getExclusiveOwnerThread()) {
int nextc = c + acquires;
if (nextc < 0) // overflow
throw new Error("Maximum lock count exceeded");
setState(nextc);
return true;
}
return false;
}

释放锁源码:

1
2
3
4
5
6
7
8
9
10
11
12
protected final boolean tryRelease(int releases) {
int c = getState() - releases;
if (Thread.currentThread() != getExclusiveOwnerThread())
throw new IllegalMonitorStateException();
boolean free = false;
if (c == 0) {
free = true;
setExclusiveOwnerThread(null);
}
setState(c);
return free;
}

3.2 公平与非公平获取锁的区别

如果一个锁是公平的,那么锁的获取顺序就应该符合请求的绝对时间顺序,也就是FIFO。
源码如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
protected final boolean tryAcquire(int acquires) {
final Thread current = Thread.currentThread();
int c = getState();
if (c == 0) {
if (!hasQueuedPredecessors() &&
compareAndSetState(0, acquires)) {
setExclusiveOwnerThread(current);
return true;
}
}
else if (current == getExclusiveOwnerThread()) {
int nextc = c + acquires;
if (nextc < 0)
throw new Error("Maximum lock count exceeded");
setState(nextc);
return true;
}
return false;
}

其中相对于非公平锁的获取,多了一个 hasQueuedPredecessors() 判断,判断是否当前节点是否有前驱节点的判断。

如果有 n 个线程争夺锁。对于公平锁来说,保证了每个线程都可以获取锁,即有 n 次线程切换。而对于非公平锁来说,有可能有的线程能获取多次锁,
有的线程根本获取不到锁,线程切换次数是小于 n 的。因此减去上下文的切换时间,非公平锁的效率更高,所以 ReentrantLock 默认为非公平锁。

4 读写锁

在没有读写锁支持的时候(Java5之前),使用 Java 的等待通知机制,就是当写操作开始时,所有晚于写操作的读操作均会进入等待状态,只有写操作完成并行通知后,
所有等待的读操作才能继续执行(写操作之间依靠 synchronized 关键字进行同步)。

一般来说,读写锁的性能都会比排他锁好(ReentrantLock 也是一种排他锁),因为大多数场景读是多于写的。Java并发包提供读写锁的实现是 ReentrantReadWriteLock

4.1 读写锁的接口与示例

通过缓存示例说明读写锁的使用方式,代码5-16:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
public class Cache {

static Map<String, Object> map = new HashMap<>();

static ReentrantReadWriteLock rwl = new ReentrantReadWriteLock();

static Lock r = rwl.readLock();

static Lock w = rwl.writeLock();

public static final Object get(String key) {
r.lock();
try {
return map.get(key);
} finally {
r.unlock();
}
}

public static final Object put(String key, String value) {
w.lock();
try {
return map.put(key, value);
} finally {
w.unlock();
}
}

public static final void clear() {
w.lock();
try {
map.clear();
} finally {
w.unlock();
}
}
}

使用非线程安全的 HashMap 作为缓存的实现。使用读写锁提升读操作的并发性,并简化了编程方式。

4.2 读写锁的实现分析

4.2.1 读写状态的设计

读写锁依赖自定义同步器来实现同步功能,而读写状态就是其同步器的同步状态。同步状态表示锁被一个线程重复获取的次数,而读写锁的自定义同步器需要在同步状态
(一个整型变量)上维护多个读线程和一个写线程的状态,使得该状态的设计成为读写锁实现的关键。

如果在一个整型变量上维护多种状态,就一定需要“按位切割使用”这个变量,读写锁将变量切分成了两个部分,高16位表示读,低16位表示写。

当前图的同步状态表示一个线程已经获取了写锁,且重进入了两次,同时也连续获取了两次读锁。读写锁是如何迅速确定读和写各自的状态呢?答案是通过位运算。
假设当前同步状态值为 S ,写状态等于 S&0x0000FFFF(将高16位全部抹去),读状态等于 S>>>16(无符号补0右移16位)。当写状态增加1时,等于S+1,,
当读状态增加1时,等于S+(1<<16),也就是S+0x00010000。

根据状态的划分能得出一个推论:S不等于0时,当写状态(S&0x0000FFFF)等于0时,则读状态(S>>>16)大于0,即读锁已被获取。

4.2.2 写锁的获取与释放

写锁是一个支持重进入的排它锁。如果当前线程已经获取了写锁,则增加写状态。如果当前线程在获取写锁时,读锁已经被获取或者该线程不是已经获取写锁的线程,
则当前线程进入等待状态,获取锁的源码如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
protected final boolean tryAcquire(int acquires) {
Thread current = Thread.currentThread();
int c = getState();
int w = exclusiveCount(c);
if (c != 0) {
// 存在读锁或者当前获取线程不是已经获取写锁的线程
if (w == 0 || current != getExclusiveOwnerThread())
return false;
if (w + exclusiveCount(acquires) > MAX_COUNT)
throw new Error("Maximum lock count exceeded");
// Reentrant acquire
setState(c + acquires);
return true;
}
if (writerShouldBlock() ||
!compareAndSetState(c, c + acquires))
return false;
setExclusiveOwnerThread(current);
return true;
}

4.2.3 读锁的获取与释放

读锁是一个支持重进入的共享锁,能被多个线程同时获取,在没有其它写线程访问时,读锁总会被成功获取。如果当前线程已经获取了读锁,则增加读状态。
如果当前线程在获取读锁时,写锁已被其他线程获取,则进入等待状态。获取读锁的实现从 Java5 到 Java6 变得复杂许多,主要原因是新增了一些功能,例如
getReadHoldCount() 方法,作用是返回当前线程获取读锁的次数。读状态是所有线程获取读锁次数的综合,而每个线程各自获取读锁的次数只能选择
保存在 ThreadLocal 中,由线程自身维护,这使得获取读锁的实现变得复杂。因此,这里将获取读锁的代码做了删减,保留必要的部分:

1
2
3
4
5
6
7
8
9
10
11
12
protected final int tryAcquireShared(int unused) {
for (;;) {
int c = getState();
int nextc = c + (1 << 16);
if (nextc < c)
throw new Error("Maximum lock count exceeded");
if (exclusiveCount(c) != 0 && owner != Thread.currentThread())
return -1;
if (compareAndSetState(c, nextc))
return 1;
}
}

如果其它线程已经获取了写锁,则当前线程获取读锁失败,进入等待状态。如果当前线程获取了写锁或者写锁未被获取,则当前线程增加读状态,成功获取读锁。

4.2.4 锁降级

锁降级指的是写锁降级称为读锁。获取写锁-释放写锁-获取读锁,这种分段完成的过程不能称之为锁降级。锁降级是指:获取写锁-获取读锁-释放写锁。

1 等待多线程完成的 CountDownLatch

例如:解析一个 Excel 里多个 sheet 的数据,如果使用多线程,每个线程解析一个 sheet 里的数据,等到所有的 sheet 都解析完之后,程序提示解析完成。

即,需要主线程等待所有线程完成 sheet 的解析操作,最简单的做法是使用 join() 方法,代码8-1:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
public class JoinCountDownLatchTest {
public static void main(String[] args) throws Exception {
Thread parser1 = new Thread(new Runnable() {
@Override
public void run() {
try {
Thread.sleep(1000);
} catch (InterruptedException e) {
e.printStackTrace();
}
System.out.println("parser1 finish");
}
});
Thread parser2 = new Thread(new Runnable() {
@Override
public void run() {
try {
Thread.sleep(3000);
} catch (InterruptedException e) {
e.printStackTrace();
}
System.out.println("parser2 finish");
}
});
parser1.start();
parser2.start();
parser1.join();
parser2.join();
System.out.println("all parser finish");
}
}

join 用于让当前执行线程等待 join 线程执行结束。其实现原理是不听检查 join 线程是否存活,如果 join 线程存活则让当前线程永远等待。

CountDownLacth 内部维护一个 int 类型的参数作为计数器,每次执行 countDown() 都会让计数器减1,await()只有当计数器为0的时候,才不会阻塞当前线程:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
public class CountDownLatchTest {

static CountDownLatch c = new CountDownLatch(2);

public static void main(String[] args) throws InterruptedException {
new Thread(new Runnable() {
@Override
public void run() {
try {
Thread.sleep(1000);
} catch (InterruptedException e) {
e.printStackTrace();
}
System.out.println(1);
c.countDown();
}
}).start();
new Thread(new Runnable() {
@Override
public void run() {
try {
Thread.sleep(2000);
} catch (InterruptedException e) {
e.printStackTrace();
}
System.out.println(2);
c.countDown();
}
}).start();
c.await();
System.out.println("3");
}

2 同步屏障 CyclicBarrier

2.1 CyclicBarrier 简介

字面意思:可循环使用(cyclic)的屏障(Barrier)。它要做的事情是,让一组线程到达一个屏障(也可以叫同步点)时被阻塞,直到最后一个线程到达屏障时,
屏障才会开门,所有被屏障拦截的线程才会继续运行。代码如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
public class CyclicBarrierTest2 {
static CyclicBarrier c = new CyclicBarrier(2, new A());

public static void main(String[] args) {
Thread s = new Thread(new Runnable() {
@Override
public void run() {
try {
c.await();
Thread.sleep(1000); // 设置睡眠,输出:321,如果不设置,可能会是312
} catch (InterruptedException e) {
e.printStackTrace();
} catch (BrokenBarrierException e) {
e.printStackTrace();
}
System.out.println(1);
}
});

s.start();

try {
c.await();
} catch (InterruptedException e) {
e.printStackTrace();
} catch (BrokenBarrierException e) {
e.printStackTrace();
}
System.out.println(2);
}

static class A implements Runnable {

@Override
public void run() {
System.out.println(3);
}
}
}

2.2 CyclicBarrier 的应用场景

用于多线程计算数据,最后合并计算结果的场景。例如,用一个 Excel 保存了用户所有银行流水,每个 Sheet 保存一个账户近一年的每笔银行流水,
现在需要统计用户的日均银行流水,先多线程处理每个 Sheet 的银行流水,都执行完之后,再用 barrierAction 计算线程结果。代码8-5如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
package com.lwg.current_art;

import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.*;

public class BankWaterService implements Runnable{

/**
* 创建 4 个屏障,当运行了 4 个await()后,才会运行第二参数。
*/
private CyclicBarrier c = new CyclicBarrier(4, this);

/**
* 4 个sheet,创建 4 个线程的线程池
*/
private Executor executor = Executors.newFixedThreadPool(4);

/**
* 保存每个线程的结果
*/
private Map<String, Integer> sheetBankWaterCount = new ConcurrentHashMap<>();

private void count() {
for (int i = 0; i < 4; i++) {
executor.execute(new Runnable() {
@Override
public void run() {
sheetBankWaterCount.put(Thread.currentThread().getName(), 1);
System.out.println("size:" + sheetBankWaterCount.size());
try {
c.await();
} catch (InterruptedException | BrokenBarrierException e) {
e.printStackTrace();
}
}
});
}
}

@Override
public void run() {
int result = 0;
for (Map.Entry<String, Integer> sheet: sheetBankWaterCount.entrySet()){
result += sheet.getValue();
}
sheetBankWaterCount.put("result", result);
System.out.println(result);
}

public static void main(String[] agrs) {
BankWaterService bankWaterService = new BankWaterService();
bankWaterService.count();
}
}

2.3 CyclicBarrier 和 CountDownLatch 的区别

CountDownLatch 的计数器只能使用一次,而 CyclicBarrier 的计数器可以使用 reset() 方法重置。所以 CyclicBarrier 能处理更为复杂的业务。
一些 API 用法如下代码8-6:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
public class CyclicBarrierTest3 {

static CyclicBarrier c = new CyclicBarrier(2);

public static void main(String[] args) {
Thread thread = new Thread(new Runnable() {
@Override
public void run() {
try {
c.await();
} catch (InterruptedException e) {
e.printStackTrace();
} catch (BrokenBarrierException e) {
e.printStackTrace();
}
}
});
thread.start();
thread.interrupt();
try {
c.await();
} catch (InterruptedException e) {
e.printStackTrace();
} catch (BrokenBarrierException e) {
System.out.println(c.isBroken());
e.printStackTrace();
}
}
}

2.3 控制并发线程数的 Semaphore

Semaphore(信号量)是用来控制同时访问特定资源的线程数量,通过协调各个线程,以保证合理的使用公共资源。

把 Semaphore 比作是控制流量的红绿灯。比如xx马路要限制流量,只允许同时有一百辆车在这条路上行驶,其它的都必须在路口等待,所以前一百辆车会看到绿灯,
可以开进这条马路,后面的车会看到红灯,不能驶入xx马路,但是如果前一百辆中有5辆车已经离开了xx马路,那么后面就允许有5辆车驶入xx马路,
即车就是线程,驶入马路就是线程执行,离开马路就表示线程执行完成,看见红灯就表示线程被阻塞。

2.3.1 应用场景

Semaphore 可以用于做流量控制,特别是公用资源有限的应用场景,比如数据库连接。假设有一个需求,要读取几万个文件的数据,因为都是IO密集型人物,
我们可以启动几十个线程并发地读取,但是如果读到内存后,还需要存储到数据库中,而数据库的连接数只有10个,这时我们必须控制只有10个线程同时获取数据库连接保存数据,
否则会报错无法获取数据库连接。这个时候可以使用 Semaphore 做流量控制,如下代码8-7:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
public class SemaphoreTest {

private static final int THREAD_COUNT = 30;

private static ExecutorService threadPool = Executors.newFixedThreadPool(THREAD_COUNT);

private static Semaphore s = new Semaphore(10);

public static void main(String[] args) {
for (int i = 0; i < THREAD_COUNT; i++) {
threadPool.execute(new Runnable() {
@Override
public void run() {
try {
s.acquire();
System.out.println("save data");
s.release();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
});
}
threadPool.shutdown();
}
}

4 线程间交换数据的 Exchanger

Exchanger 提供一个同步点,在这个同步点,两个线程可以交换彼此的数据。如果第一个线程执行 exchange() 方法,它就会一直等待第二个线程也执行 exchange 方法,
当两个线程都到达同步点时,这时就可以交换数据。

4.1 应用场景

  1. 遗传算法:选出两个人作为交配对象,交换两人的数据,并使用交叉规则得出2个交配结果。
  2. 校对工作:对两个人工录入的文件进行校对。
    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    24
    25
    26
    27
    28
    29
    30
    31
    32
    33
    public class ExchangerTest {

    private static final Exchanger<String> exgr = new Exchanger<>();

    private static ExecutorService threadPool = Executors.newFixedThreadPool(2);

    public static void main(String[] args) {
    threadPool.execute(new Runnable() {
    @Override
    public void run() {
    try {
    String A = "银行流水A";
    exgr.exchange(A);
    } catch (InterruptedException e) {
    e.printStackTrace();
    }
    }
    });
    threadPool.execute(new Runnable() {
    @Override
    public void run() {
    try {
    String B = "银行流水B";
    String A = exgr.exchange(B);
    System.out.println("A和B数据是否一致:" + A.equals(B) + ".A录入的是:" + A + ".B录入的是:" + B);
    } catch (InterruptedException e) {
    e.printStackTrace();
    }
    }
    });
    threadPool.shutdown();
    }
    }

5 本章小结

  1. CountDownLatch->CyclicBarrier:都是等待某些运行到某个点后,才执行后面的方法,但是 CyclicBarrier提供的 API 更多适合更复杂的场景。
  2. Semaphore:控制并发数,即创建了30个线程,但是并发最多可以设置为10。
  3. Exchanger:线程间交换数据,在同步点处,A线程可以获得B线程的数据。

1 ConcurrentHashMap 的实现原理与使用

1.1 为什么要使用 ConcurrentHashMap

  1. 线程不安全的 HashMap ,在多线程下 HashMap 的 Entry 链表导致形成环形数据结构, Entry 的 next 节点永远不为空,就会产生死循环获取 Entry 。
  2. 效率低下的 HashTable ,使用 synchronized 保证线程安全。
  3. ConcurrentHashMap 的锁分段技术有效提升并发访问率。 HashTable 效率低下是因为所有访问 HashTable 的线程都必须竞争同一把锁,如果容器里
    有多把锁,每把锁用于锁容器其中一部分数据,那么当多线程访问容器里不同数据段的数据时,线程就不会存在锁竞争。

1.2 ConcurrentHashMap 的结构

ConcurrentHashMap 由 Segment 数组结构和 HashEntry 数组结构组成。 Segment 是一种可重入锁(ReentrantLock), Segment的结构和 HashMap
结构类似。

1.3 ConcurrentHashMap 的初始化

  1. 初始化 segments 数组
  2. 初始化 segmentShift 和 segmentMask。用于散列算法的一些值。
  3. 初始化每个 segment,包括容量以及负载因子。

1.4 ConcurrentHashMap 的操作

1.4.1 get 操作

get 操作的高效之处在于整个 get 过程不需要加锁,除非读到的值是空才会加锁重读。如何保证不加锁?原因在于它的 get 方法里将要使用的共享变量都
定义成 volatile 类型,如用于统计当前 Segment 大小的 count 字段和用于存储值的 HashEntry 的 value 。但只能被单线程写(有一种情况可以被
多线程写,就是写入的值不依赖于原值),在 get 操作里只需要读不需要写共享变量 count 和 value ,所以不需要加锁。即使两个线程同时修改和获取
volatile 变量, get 操作也能拿到最新的值,这是用 volatile 替换锁的经典应用场景。

1.4.2 put 操作

必须加锁。先定位到 Segment ,然后在 Segment 里进行插入操作。插入操作需要经历两个步骤,第一步判断是否需要对 Segment 里的 HashEntry 数组
进行扩容,第二步定位添加元素的位置,然后将其放在 HashEntry 数组里。

  1. 是否需要扩容:插入元素前会判断 Segment 里的 HashEntry 数组是否超过容量,如果超过阈值,则对数组进行扩容。
  2. 如何扩容:先创建一个容量是原来容量两倍的数组,然后将原数组里元素进行散列后插入到新的数组里。为了高效, 只会对某个 Segment 进行扩容。

1.4.3 size 操作

每个 Segment 的count 是 valatile 变量,但是累加过程中有可能 count 发生变化,最安全的做法是在统计 size 的时候把所有 Segment 的 put、
remove 和 clean 方法全部锁住,但是非常低效。所以 ConcurrentHashMap 的做法是尝试 2 次通过不锁住 Segment 的方式来统计累和,如果 count
发生变化,再采用加锁方式统计。如果判断count 发生变化呢?使用 modCount 变量。

2 ConcurrentLinkedQueue

实现一个线程安全的队列有两种方式:一种是使用阻塞算法,另一种是使用非阻塞算法。前者可以用一个锁(入队和出队用同一把锁)或两个锁(入队和出队
用不同的锁)等方式实现。非阻塞的实现则可以使用循环 CAS 的方式来实现。

2.1 ConcurrentLinkedQueue 的结构

由 head 节点和 tail 节点组成,每个节点(Node)由节点元素(item)和指向下一个节点(next)的引用组成。默认情况下 head 节点存储的元素为空,
tail 节点等于 head 节点。

2.2 入队列

2.2.1 入队列的过程

  1. 添加元素1。队列更新 head 节点的 next 节点为元素1节点,又因为 tail 节点默认情况下等于 head 节点,所以它们的 next 节点都指向元素1节点。
  2. 添加元素2。队列首先设置元素1节点的 next 节点为元素2节点,然后更新 tail 节点指向元素2节点。
  3. 添加元素3。设置 tail 节点的 next 节点为元素3节点。
  4. 添加元素4。设置元素3的 next 节点为元素4节点,然后将 tail 节点指向元素4节点。

    入队主要做两件事:一是将入队节点设置成当前队列尾节点的下一个节点;二是更新 tail 节点,如果 tail 节点的 next 节点不为空,则将入队节点设置成
    tail 节点,如果 tail 节点的 next 节点为空,则将入队节点设置成 tail 的 next 节点,所以 tail 节点不总是尾节点。

在代码中,入队主要做两件事:一是定位出尾节点(通过 tail 的 next 是否为空);二是使用 CAS 算法将入队队列设置成尾节点的 next 节点,如不成功则重试。
(进行 tail 的 next 判断是否为空时,如果循环两次都不为空,则重新进行队列,因为肯定有别的线程加了尾)。源码如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
public boolean offer(E e) {
if (e == null) throw new NullPointerException();
// 入队前,创建一个入队节点
Node<E> n = new Node<E>(e);
retry:
// 死循环,入队不成功反复入队。
for (;;) {
// 创建一个指向tail节点的引用
Node<E> t = tail;
// p用来表示队列的尾节点,默认情况下等于tail节点。
Node<E> p = t;
for (int hops = 0; ; hops++) {
// 获得p节点的下一个节点。
Node<E> next = succ(p);
// next节点不为空,说明p不是尾节点,需要更新p后在将它指向next节点
if (next != null) {
// 循环了两次及其以上,并且当前节点还是不等于尾节点
if (hops > HOPS && t != tail)
continue retry;
p = next;
}
// 如果p是尾节点,则设置p节点的next节点为入队节点。
else if (p.casNext(null, n)) {
/*如果tail节点有大于等于1个next节点,则将入队节点设置成tail节点,
更新失败了也没关系,因为失败了表示有其他线程成功更新了tail节点*/
if (hops >= HOPS)
casTail(t, n); // 更新tail节点,允许失败
return true;
}
// p有next节点,表示p的next节点是尾节点,则重新设置p节点
else {
p = succ(p);
}
}
}
}

2.2.2 定位尾节点

通过判断 tail 节点和 tail 节点的 next 节点。

2.2.3 HOPS 的设计意图

用如下实现是否可行:

1
2
3
4
5
6
7
8
9
10
11
public boolean offer(E e) {
if (e == null)
throw new NullPointerException();
Node<E> n = new Node<E>(e);
for (;;) {
Node<E> t = tail;
if (t.casNext(null, n) && casTail(t, n)) {
return true;
}
}
}

让 tail 节点永远作为队列的尾节点,这样实现代码量非常少。但是有个缺点是,每次都要更新 tail 节点为尾节点,而使用 HOPS 常量,进行判断,如果
当 tail 节点和尾节点的距离大于等于常量 HOPS 的值(默认等于1)时才更新 tail 节点,来通过增加对 volatile 变量的读操作来减少对 volatile 变量
的写操作,入队效率提升。

2.3 出队列

只有当 head 节点里没有元素时,出队操作才会更新 head 节点。这种做法也是通过 hops 变量来减少使用 CAS 更新 head 节点的消耗,从而提高出队效率。

3 Java 中的阻塞队列

3.1 什么是阻塞队列

阻塞队列(BlockingQueue)是一个支持两个附加操作的队列。这两个附加的操作支持阻塞的插入和移除方法。

  1. 支持阻塞的插入方法:意思当队列满时,队列会阻塞插入元素的线程,直到队列不满。
  2. 支持阻塞的移除方法:意思是在队列为空时,获取元素的线程会等待队列变为非空。

3.2 Java 里的阻塞队列

类名 描述
ArrayBlockingQueue 一个由数组结构组成的有界阻塞队列
LinkedBlockingQueue 一个由链表结构组成的有界阻塞队列
PriorityBlockingQueue 支持优先级排序的无界阻塞队列
DelayQueue 使用优先级队列实现的无界阻塞队列
SynchronousQueue 不存储元素的阻塞队列
LinkedTransferQueue 由链表结构组成的无界阻塞队列
LinkedBlockingDeque 由链表结构组成的双向阻塞队列

3.3 阻塞队列的实现原理

3.3.1 使用通知模式实现

即当生产者往满的队列里添加元素时会阻塞住消费者,当消费者消费了一个队列中的元素后,会通知生产者当前队列可用。

4 Fork/Join 框架

4.1 什么是 Fork/Join 框架

Fork 就是把一个大任务切分为若干子任务并行的执行, Join 就是合并这些子任务的执行结果。

4.2 工作窃取算法

某个线程先把自己队列里的任务干完,而其它线程对应的队列里还有任务等待处理,这时候,完成任务的线程去做还有任务的线程的任务。

  1. 优点:充分利用线程进行并行计算,减少线程间的竞争。
  2. 缺点:如果只有一个任务时,还是会有竞争。并且该算法会消耗更多的系统资源,比如创建多个线程和多个双端队列。

4.3 Fork/Join 框架的设计

  1. 分割任务:首先我们需要由一个 fork 类来把大任务分割成小任务,有可能子任务还是很大,所以还需要不停地分割,直到分割出的子任务足够小。
  2. 执行任务并合并结果:分割的子任务分别放在双端队列里,然后几个启动线程分别从双端队列里获取任务执行。子任务执行完的结果都统一放在一个队列里,
    启动一个线程从队列里里拿数据,然后合并这些数据。

Fork/Join 使用两个类来完成以上两件事。

  1. ForkJoinTask:我们要使用 ForkJoin框架,必须首先创建一个 Fork/Join 任务。它提供在任务中执行 fork() 和 join() 操作的机制。通常情况下,
    我们只需要继承它的子类,而 Fork/Join 框架提供了以下两个子类。 RecursiveAction:用于没有返回结果的任务; RecursiveTask:用于有返回结果的任务。
  2. ForkJoinPool:ForkJoinTask 需要通过 ForkJoinPool 来执行:任务分割出的子任务会添加到当前工作线程所维护的双端队列中,进入队列的头部。
    当一个工作线程的队列里暂时没有任务时,它会随机从其它工作线程的队列的尾部 获取一个任务。

4.4 使用 Fork/Join 框架

需求:计算1+2+3+4的结果。
如果希望每个子任务最多执行两个数的相加,我们设置分割的阈值是2,由于是4个数字相加,所以fork成两个子任务,一个计算1+2,一个计算3+4,然后再join两个子任务。
由于有结果的任务,因此继承 RecursiveTask。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
public class CountTask extends RecursiveTask<Integer>{

private static final int THRESHOLD = 2; // 阈值
private int start;
private int end;

public CountTask(int start, int end) {
this.start = start;
this.end = end;
}

@Override
protected Integer compute() {
int sum = 0;
boolean canCompute = (end - start) <= THRESHOLD;
// 如果任务足够小就计算任务
if (canCompute) {
for (int i = start; i <= end; i++) {
sum += i;
}
} else {
// 如果任务大于阈值,就分裂成两个子任务计算
int middle = (start + end) / 2;
CountTask leftTask = new CountTask(start, middle);
CountTask rightTask = new CountTask(middle + 1, end);
// 执行子任务
leftTask.fork();
rightTask.fork();
// 等待子任务执行完,并得到其结果
int leftResult = leftTask.join();
int rightResult = rightTask.join();
// 合并子任务
sum = leftResult + rightResult;
}
return sum;
}

public static void main(String[] args) {
ForkJoinPool forkJoinPool = new ForkJoinPool();
// 生成一个计算任务,负责计算1+2+3+4
CountTask task = new CountTask(1, 4);

Future<Integer> result = forkJoinPool.submit(task);
try {
System.out.println(result.get());
} catch (InterruptedException e) {
e.printStackTrace();
} catch (ExecutionException e) {
e.printStackTrace();
}
}
}

4.5 Fork/Join 框架的实现原理

ForkJoinPool 由 ForkJoinTask 数组和 ForkJoinWorkerThread 数组组成, ForkJoinTask 数组负责将存放程序提交给 ForkJoinPool 的任务,
而 ForkJoinWorkerThread 数组负责执行这些任务。

4.5.1 ForkJoinTask 的 fork 方法实现原理

使用 push 方法,把当前任务存放在 ForkJoinTask 数组队列中,再调用 ForkJoinPool 的 signalWork() 方法唤醒或创建一个工作线程来异步的执行这个任务,
然后立即返回结果。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
public final ForkJoinTask<V> fork() {
Thread t;
if ((t = Thread.currentThread()) instanceof ForkJoinWorkerThread)
((ForkJoinWorkerThread)t).workQueue.push(this);
else
ForkJoinPool.common.externalPush(this);
return this;
}

final void push(ForkJoinTask<?> task) {
ForkJoinTask<?>[] a; ForkJoinPool p;
int b = base, s = top, n;
if ((a = array) != null) { // ignore if queue removed
int m = a.length - 1; // fenced write for task visibility
U.putOrderedObject(a, ((m & s) << ASHIFT) + ABASE, task);
U.putOrderedInt(this, QTOP, s + 1);
if ((n = s - b) <= 1) {
if ((p = pool) != null)
p.signalWork(p.workQueues, this);
}
else if (n >= m)
growArray();
}
}

4.5.2 ForkJoinTask 的 join 方法实现原理

Join 方法主要用于阻塞当前线程并等待获取结果。调用 doJoin() 方法,通过查看任务状态,如果执行完则直接返回任务状态;如果没执行完,
则从任务数组里取出任务并执行。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
 public final V join() {
int s;
if ((s = doJoin() & DONE_MASK) != NORMAL)
reportException(s);
return getRawResult();
}

private void reportException(int s) {
if (s == CANCELLED)
throw new CancellationException();
if (s == EXCEPTIONAL)
rethrow(getThrowableException());
}
private int doJoin() {
int s; Thread t; ForkJoinWorkerThread wt; ForkJoinPool.WorkQueue w;
return (s = status) < 0 ? s :
((t = Thread.currentThread()) instanceof ForkJoinWorkerThread) ?
(w = (wt = (ForkJoinWorkerThread)t).workQueue).
tryUnpush(this) && (s = doExec()) < 0 ? s :
wt.pool.awaitJoin(w, this, 0L) :
externalAwaitDone();
}

1 线程简介

1.1 什么是线程

操作系统运行一个程序时,会为其创建一个进程。而操作系统调度的最小单元是线程,也叫轻量级进程( Light Weight Process),
在一个进程里可以创建多个线程,这些线程都拥有各自的计数器、堆栈和局部变量等属性,并且能够访问共享的内存变量。
处理器在这些线程上高速切换,让使用者感觉到这些线程在同时执行。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
public class MultiThread{
public static void main(String[] args) {
ThreadMXBean threadMXBean = ManagementFactory.getThreadMXBean();
ThreadInfo[] threadInfos = threadMXBean.dumpAllThreads(false, false);
for(ThreadInfo threadInfo : threadInfos) {
System.out.println(threadInfo.getThreadId() + "," + threadInfo.getThreadName());
}
/**
6,Monitor Ctrl-Break
5,Attach Listener
4,Signal Dispatcher
3,Finalizer
2,Reference Handler
1,main
*/
}
}

1.2 为什么要使用多线程

1.2.1 更多的处理器核心

程序运行过程中能够创建多个线程,而一个线程在一个时刻只能运行在一个处理器核心上。试想一下,一个单线程程序在运行时只能使用一个处理器
核心,那么再多的处理器核心加入也无法显著提升该程序的执行效率。相反,如果该程序使用多线程技术,将计算逻辑分配到多个处理器核心上,
就会显著减少程序的处理时间,并且随着更多处理器核心的加入而变得更有效率。

1.2.2 更快的响应时间

编写一些较为复杂的代码,例如,一笔订单的创建,它包括插入订单数据、生成订单快照、发送邮件通知卖家和记录货品销售数量等。这么多业务
操作,如何能够让其更快地完成呢?

1.2.3 更好的编程模型

Java 为多线程编程提供了良好、考究并且一致的编程模型,使开发人员更加专注于问题的解决。

1.3 线程优先级

现代操作系统基于采用时分的形式调度运行的线程,操作系统会分出一个个时间片,线程会分配到若干时间片,当线程的时间片用完了就会发生线程
调度,并等待下次分配。线程分配到的时间片多少也就决定了线程使用处理器资源的多少,而线程优先级就是决定线程需要多或者少分配一些处理器
资源的线程属性。

在 Java 线程中,通过一个整型成员变量 priority 来控制优先级,优先级的范围从 1~10 ,在线程构建的时候可以通过 setPriority(int) 方法
来修改优先级,默认优先级是 5 ,优先级高的线程分配时间片的数量要多余优先级低的线程。设置线程优先级时,针对频繁阻塞的线程需要设置较高优先级,
而偏重计算(需要较多 CPU 时间或者偏运算)的线程则设置较低的优先级。笔者在 JDK 1.8 的 WIN 10 环境:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
public class Priority {

private static volatile boolean notStart = true;

private static volatile boolean notEnd = true;

public static void main(String[] args) throws InterruptedException {
List<Job> jobs = new ArrayList();
for (int i = 0; i < 10; i++) {
int priority = i < 5 ? Thread.MIN_PRIORITY : Thread.MAX_PRIORITY;
Job job = new Job(priority);
jobs.add(job);
Thread thread = new Thread(job, "Thread:" + i);
thread.setPriority(priority);
thread.start();
}
notStart = false;
TimeUnit.SECONDS.sleep(2);
notEnd = false;
for (Job job : jobs) {
System.out.println("Job Priority:" + job.priority + ", Count:" + job.jobCount);
}
}

static class Job implements Runnable {

private int priority;
private long jobCount;

public Job(int priority) {
this.priority = priority;
}

@Override
public void run() {
while (notStart) {
Thread.yield();
}
while (notEnd) {
Thread.yield();
jobCount++;
}
}
}
}

输出:

1
2
3
4
5
6
7
8
9
10
Job Priority:1, Count:16772
Job Priority:1, Count:16761
Job Priority:1, Count:16761
Job Priority:1, Count:16758
Job Priority:1, Count:16757
Job Priority:10, Count:756747
Job Priority:10, Count:757594
Job Priority:10, Count:757263
Job Priority:10, Count:759519
Job Priority:10, Count:760287

1.4 线程的状态

1.5 Daemon 线程

Daemon 线程是一种支持型线程,因为它主要被用作程序中后台调度以及支持性工作。这意味着,当一个 Java 虚拟机中不存在非 Daemon 线程的时候,
Java 虚拟机将会退出。可以通过调用 Thread.setDaemon(true) 将线程设置为 Daemon 线程。

Daemon 属性需要在启动线程之前设置,不能在启动线程之后设置。

Daemon 线程被用作完成支持性工作,但是在 Java 虚拟机退出时 Daemon 线程中的 finally 块并不一定会执行,如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
public class Daemon {
public static void main(String[] args) {
Thread thread = new Thread(new DaemonRunner(), "DaemonRunner");
thread.setDaemon(true);
thread.start();
}

static class DaemonRunner implements Runnable {

@Override
public void run() {
try {
SleepUtils.second(10);
} finally {
System.out.println("DaemonThread finally run.");
}
}
}
}

最终没有任何的输出,mian 线程在启动了线程 DaemonRunner 之后随着 main 方法执行完毕而终止,而此时 Java 虚拟机中已经灭有非 Daemon 线程,
虚拟机需要退出。 Java 虚拟机中的所有 Daemon 线程都需要立即终止,因此 DaemonRunner 立即终止,但是 DaemonRunner 中的 finally 块并没有执行。

2 启动和终止线程

2.1 理解中断

中断可以理解为线程的一个标识位属性,它标识一个运行中的线程是否被其它线程进行了中断操作。线程通过方法 isInterrupted 来进行判断是否被中断,
也可以调用静态方法 Thread.interrupted() 对当前线程的中断标识位进行复位。下面的例子中,创建了两个线程, SleepThread 和 BusyThread ,
前者不停地睡眠,后者一直运行,然后对这两个线程分别进行中断操作,观察二者的中断标识位。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
public class Interrupted {
public static void main(String[] args) throws InterruptedException {
Thread sleepThread = new Thread(new SleepRunner(), "SleepThread");
sleepThread.setDaemon(true);

Thread busyThread = new Thread(new BusyRunner(), "BusyThread");
busyThread.setDaemon(true);

sleepThread.start();
busyThread.start();

Thread.sleep(5 * 1000);

sleepThread.interrupt();
busyThread.interrupt();

System.out.println("SleepThread interrupted is " + sleepThread.isInterrupted());
System.out.println("BusyThread interrupted is " + busyThread.isInterrupted());

Thread.sleep(2 * 1000);


System.out.println("SleepThread interrupted is " + sleepThread.isInterrupted());
System.out.println("BusyThread interrupted is " + busyThread.isInterrupted());
// SleepThread interrupted is false
// BusyThread interrupted is true

Thread.sleep(2 * 1000);
}

static class SleepRunner implements Runnable {

@Override
public void run() {
while (true) {
try {
Thread.sleep(10 * 1000);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
}

static class BusyRunner implements Runnable {

@Override
public void run() {
while (true) {
}
}
}
}

线程 SleepThread 其中断标识位被清除了,而一直忙碌运作的线程 BusyThread ,其中断标识位没有被清除。

2.2 安全地终止线程

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
public class Shutdown {

public static void main(String[] args) throws InterruptedException {
Runner one = new Runner();
Thread countThread = new Thread(one, "countThread");
countThread.start();

Thread.sleep(1 * 1000);

countThread.interrupt();

Runner two = new Runner();
countThread = new Thread(two, "CountThread");
countThread.start();

Thread.sleep(1 * 1000);

two.cancel();
}

private static class Runner implements Runnable {
private long i ;
private volatile boolean on = true;

@Override
public void run() {
while(on && !Thread.currentThread().isInterrupted()) {
i++;
}
}

private void cancel() {
on = false;
}
}
}

使用 Thread.interrupted() 或一个 boolean 来进行中断。

3. 线程间通信

3.1 volatile 和 synchronized 关键字

每个执行的线程拥有一份拷贝,这样做的目的是加速程序的执行,所以程序在执行过程中,一个线程看到的变量并不一定是最新的。

关键字 volatile 就是告知程序任何对该变量的访问均需要从共享内存中获取,而对它的改变必须同步刷新回共享内存,保证所有线程对变量访问的可见性。

举个例子:定义一个表示程序是否运行的成员变量 boolean on = true ,另一个线程可能执行了关闭动作 on = false ,这里涉及多个线程对变量的访问,
因此需要定义称为 volatile boolean on = true ,这样其他线程对它改变时,所以线程都会感知,因为所有对 on 变量的访问和修改都需要以共享内存
为准。

关键字 synchronized 主要确保多个线程在同一时刻,只能由一个线程处于方法或者同步块中,保证了线程对变量访问的可见性和排他性。

使用 javap 工具分析 synchronized 关键字的实现细节,实例4-10:

1
2
3
4
5
6
7
8
9
10
11
12
public class Synchronized {
public static void main(String[] args) {
synchronized (Synchronized.class) {

}
m();
}

public static synchronized void m() {

}
}

javap -v Synchronized.class :同步块的前面和后面分别有 monitorenter 和 monitorexit 指令,而同步方法依靠方法修饰符 ACC_SYNCHRONIZED 来完成。
其本质是对一个对象的监视器(monitor)进行获取,这个获取过程是排他的,即同一时刻只有一个线程获取到由 sychronized 所保护对象的监视器。

任意一个对象都拥有自己的监视器,执行方法的线程必须先获取到该对象的监视器,没有获取到的线程将会阻塞在入口处,进入 BLOCKED 状态。

3.2 等待/通知机制

一个线程修改了一个对象的值,另一个线程感知到了变化,进行相应的操作,整个过程开始于一个线程,最终执行又是另一个线程。前者是生产者,后者就是消费者。
在功能上进行了解耦,“做什么”和“怎么做”。在 Java 实现类似的功能:

简单的方法就是让消费者线程不断地循环检查变量是否符合预期,如下的消费者:

1
2
3
4
while (value != desire) {
Thread.sleep(1000); // 防止过快的“无效”尝试
}
doSomething();

存在的问题:

  1. 难以确保及时性。在睡眠时,基本不消耗处理器资源,但是如果睡得过久,就不能及时发现条件已经变化。
  2. 难以降低开销。如果降低睡眠时间,比如休眠 1 毫秒,这样消费者能更加迅速地发生条件变化,但是却可能消耗更多的处理器资源,造成了无端的浪费。

可以通过等待/通知的相关方法是任意 Java 对象都具备的,因为这些方法被定义在 Object ,方法和描述如下表所示:

方法名称 描述
notyfy() 通知一个在对象上等待的线程,使其从 wait() 方法返回,而返回的前提是该线程获取到了对象的锁
notifyAll() 通知所有等待在该对象上的线程
wait() 调用该方法的线程进入 WAITING 状态,只有等待另外线程的通知或被中断才会返回,需要注意,调用 wait() 方法后,会释放对象的锁
wait(long) 超时等待一段时间,也就是等待长达 n 毫秒,如果没有通知就超时返回
wait(long, int) 对于超时时间更细粒度的控制,可以达到纳秒

等待/通知机制,是指一个线程A 调用了对象O 的 wait() 方法进入等待状态,而另一个线程B 调用了对象O 的 notify() 或者 notifyAll() 方法,
线程A 收到通知后从对象O 的 wait() 方法返回,进而执行后续操作。上述两个线程通过对象O 来完成交互,而对象上的 wait() 和 notify()/notifyAll()
的关系就如同开关信号一样,用来完成等待方和通知方之间的交互工作。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
public class WaitNotify {
static boolean flag = true;
static Object lock = new Object();

public static void main(String[] args) throws InterruptedException {
Thread waitThread = new Thread(new Wait(), "WaitThread");
waitThread.start();
Thread.sleep(1 * 1000);
Thread notifyThread = new Thread(new Notify(), "NotifyThread");
notifyThread.start();
}

static class Wait implements Runnable {

@Override
public void run() {
// 加锁,拥有 lock 的 Monitor
synchronized (lock) {
// 当条件不满足时,继续 wait ,同时释放了 lock 的锁
while (flag) {
try {
System.out.println("1");
System.out.println(Thread.currentThread() + " flag is true. wait " + new SimpleDateFormat("HH:mm:ss").format(new Date()));
lock.wait();
System.out.println("2");
} catch (InterruptedException e) {
e.printStackTrace();
}
}
// 条件满足时,完成工作
System.out.println(Thread.currentThread() + " flag is false. running " + new SimpleDateFormat("HH:mm:ss").format(new Date()));
}
}
}

static class Notify implements Runnable {

@Override
public void run() {
// 加锁,拥有 lock 的 Monitor
synchronized (lock) {
// 获取 lock 的锁,然后进行通知,通知时不会释放 lock 的锁
// 直到当前线程释放了 lock 后, WaitThread 才能从 wait 方法中返回
System.out.println(Thread.currentThread() + " hold lock. notify " + new SimpleDateFormat("HH:mm:ss").format(new Date()));
lock.notifyAll();
flag = false;
try {
Thread.sleep(5 * 1000);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
// 再次加锁
synchronized (lock) {
System.out.println(Thread.currentThread() + " hold lock again. sleep " + new SimpleDateFormat("HH:mm:ss").format(new Date()));
try {
Thread.sleep(5 * 1000);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
}
}

3.3 Thread.join() 的使用

如果一个线程A 执行了 thread.join() 语句,即:当前线程A 等待 thread 线程终止之后才从 thread.join() 返回。代码4-13如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
public class Join {

public static void main(String[] args) {
Thread previous = Thread.currentThread();
for (int i = 0; i < 10; i++) {
// 每个线程拥有前一个线程的引用
Thread thread = new Thread(new Domino(previous), String.valueOf(i));
thread.start();
previous = thread;
}
try {
Thread.sleep(1000);
} catch (InterruptedException e) {
e.printStackTrace();
}
System.out.println(Thread.currentThread().getName() + " terminate. ");
}

static class Domino implements Runnable {

private Thread thread;

public Domino(Thread thread) {
this.thread = thread;
}
@Override
public void run() {
try {
thread.join();
} catch (InterruptedException e) {
e.printStackTrace();
}
System.out.println(Thread.currentThread().getName() + " terminate. ");
}
}
}

Thread.join() 源码大概是这样的结构:

1
2
3
4
5
6
7
8
// 加锁当前线程对象
public final synchronized void join() throws InterruptedException {
// 条件不满足,继续等待
while(isAlive()) {
wait(0);
}
// 条件符合,方法返回
}

逻辑结构和等待/通知经典范式一致,即加锁、循环和处理逻辑。

3.4 ThreadLocal 的使用

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
public class Profiler {
// 第一次调用 get() 方法会进行初始化,每个线程只会执行一次
private static final ThreadLocal<Long> TIME_threadLocal = new ThreadLocal() {
@Override
protected Object initialValue() {
return System.currentTimeMillis();
}
};

public static final void begin() {
TIME_threadLocal.set(System.currentTimeMillis());
}

public static final long end() {
return System.currentTimeMillis() - TIME_threadLocal.get();
}

public static void main(String[] args) throws InterruptedException {
Profiler.begin();
Thread.sleep(1000);
System.out.println("Cost: " + Profiler.end() + " mills");
}
}

4 线程应用实例

4.1 等待超时模式

调用一个方法时等待一段时间,如果该方法能够在给定的时间段之内得到结果,那么将结果立即返回,反之,超时返回默认结果。
假设超时时间为 T ,那么在 now + T 之后就会超时。

1
2
3
4
5
6
7
8
9
10
public synchronized Object get(long t) {
long now = System.currentTimeMillis();
long future = now + t;
long remaining = t;
while (result == null && remaining > 0) {
wait(remaining);
remaining = future - System.currentTimeMillis();
}
return result;
}

4.2 一个简单的数据库连接池示例

主干代码:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
public class ConnectionPool {
private LinkedList<Connection> pool = new LinkedList<>();

public ConnectionPool(int initialSize) {
if (initialSize > 0) {
for (int i = 0; i < initialSize; i++) {
pool.addLast(ConnectionDriver.createConnection());
}
}
}

public void releaseConnection(Connection connection) {
if (connection != null) {
synchronized (pool) {
pool.addLast(connection);
pool.notifyAll();
}
}
}

public Connection fetchConnection(long mills) throws InterruptedException {
synchronized (pool) {
if (mills <= 0) {
while (pool.isEmpty()) {
pool.wait();
}
return pool.removeFirst();
} else {
long future = System.currentTimeMillis() + mills;
long remaining = mills;
while (pool.isEmpty() && remaining > 0) {
pool.wait(remaining);
remaining = future - System.currentTimeMillis();
}
Connection result = null;
if (!pool.isEmpty()) {
result = pool.removeFirst();
}
return result;
}
}
}
}

4.3 线程池技术

主干代码:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
public class DefaultThreadPool<Job extends Runnable> implements ThreadPool<Job> {

private static final int MAX_WORKER_NUMBERS = 10;
private static final int DEFAULT_WORKER_NUMBERS = 5;
private static final int MIN_WORK_NUMBERS = 1;

// 任务列表
private final LinkedList<Job> jobs = new LinkedList<>();

// 工作者列表,即保存着所有的消费者列表
private final List<Worker> workers = Collections.synchronizedList(new ArrayList<Worker>());

// 工作线程的数量
private int workerNum = DEFAULT_WORKER_NUMBERS;

// 线程编号
private int threadNum = 0;

@Override
public void execute(Job job) {
if (job != null) {
synchronized (jobs) {
jobs.addLast(job);
jobs.notify();
}
}
}

@Override
public void shutdown() {
for (Worker worker : workers) {
worker.shutdown();
}
}

@Override
public void addWorkers(int num) {
synchronized (jobs) {
if (num + this.workerNum > MAX_WORKER_NUMBERS) {
num = MAX_WORKER_NUMBERS - this.workerNum;
}
initializeWorkers(num);
this.workerNum += num;
}
}

/**
* 移除工作线程,即移除消费者
*/
@Override
public void removeWorker(int num) {
synchronized (jobs) {
if (num >= this.workerNum) {
throw new IllegalArgumentException("beyond workNum");
}
int count = 0;
while (count < num) {
Worker worker = workers.get(count);
if (workers.remove(worker)) {
worker.shutdown();
count++;
}
}
this.workerNum -= count;
}
}

@Override
public int getJobSize() {
return jobs.size();
}

// 初始化工作者
private void initializeWorkers(int num) {
for (int i = 0; i < num; i++) {
Worker worker = new Worker();
workers.add(worker);
Thread thread = new Thread(worker, "ThreadPool-Worker-" + threadNum++);
thread.start();
}
}

// 工作者,即消费者,负责消费任务
class Worker implements Runnable {
// 允许外界控制是否停止
private volatile boolean running = true;

@Override
public void run() {
while (running) {
Job job = null;
synchronized (jobs) {
while (jobs.isEmpty()) {
try {
jobs.wait();
} catch (InterruptedException e) {
e.printStackTrace();
Thread.currentThread().interrupt();
return;
}
}
// 取出一个Job
job = jobs.removeFirst();
}
if (job != null) {
job.run();
}
}
}
public void shutdown() {
running = false;
}
}
}

4.4 小结

1 Java 内存模型的基础

1.1 并发编程模型的两个关键问题

线程之间如何通信
线程之间如何同步(这里的线程是指并发执行的活动实体)。
通信是指线程之间以何种机制来交换信息。在命令式编程中,线程之间的通信机制有两种:

共享内存。线程之间共享程序的公共状态,通过写读内存中的公共状态进行隐式通信。
消息传递。线程之间没有公共状态,线程之间必须通过发送消息来显式进行通信。
同步是指程序中用于控制不同线程间操作发生相对顺序的机制。
在共享内存并发模型里,同步是显式进行的。必须显式指定某个方法或某段代码需要在线程之间互斥执行。
在消息传递的并发模型里,由于消息的发送必须在消息的接收之前,因此同步是隐式进行的。

Java 的并发采用的是共享内存模型,线程之间的通信对程序员完全透明。如果编写多线程不理解隐式进行的线程之间通信的工作机制,可能会遇到奇怪的内存可见性问题。

1.2 Java 内存模型的抽象结构

在 Java 中,所有实例域、静态域、数组元素都存储在堆内存中,堆内存在线程之间共享。局部变量( Local Variables),方法定义参数和异常处理参数不会在线程之间共享, 它们不会有内存可见性问题,也不受内存模型的影响。

Java 线程之间的通信由 Java 内存模型(简称为 JMM )控制, JMM 决定一个线程对共享变量的写入何时对另一个线程可见。从抽象的角度来看, JMM 定义了线程和主内存之间的抽象关系:线程之间的共享变量存储在主内存中,每个线程都有一个私有的本地内存,本地内存中存储了该线程以读/写共享变量的副本。 本地内存是 JMM 的一个抽象概念,并不真实存在。

从上图看,如果线程A 与线程B 之间要通信的话,必须要经过下面 2 个步骤。

线程A 把本地内存A 更新过的共享变量刷新到主内存中去。
线程B 到主内存中去读取线程A 之前已更新过的共享变量。
如下图:

本地内存A 和本地内存B 由主内存中共享变量 x 副本。假设初始时,这 3 个内存中的 x 值都为 0 。线程 A 在执行时,把更新后的 x 值(假设值为 1 ) 临时存放在自己的本地内存 A 中。当线程A 和线程B 需要通信时,线程A 首先会把自己本地内存中修改后的 x 值刷新到主内存中的 x 值变为了 1 。随后, 线程B 到主内存中去读取线程A 更新后的 x 值,此时线程B 的本地内存的 x 值也变为了 1 。

从整体来看,这两个步骤实质上是线程A 在向线程B 发送消息,而且这个通信过程必须要经过主内存。 JMM 通过控制主内存与每个线程的本地内存之间的交互, 来为程序员提供内存可见性保证。

1.3 从源代码到指令序列的重排序

执行程序,为了提高性能,编译器和处理器常常会对指令做重排序。重排序分三种:

编译器优化的重排序。
指令级并行的重排序。
内存系统的重排序。
JMM 属于语言级的内存模型,它确保在不同的编译器和不同的处理器平台之上,通过禁止特定类型的编译器重排序和处理器重排序,为程序员提供一致的内存可见性保证。

1.4 并发编程模型的分类

现在的处理器使用写缓存区临时保存向内存写入的数据。同时,通过以批处理的方式刷新写缓存区,以及合并写缓冲区对统一内存的多次写,减少对内存总线的占用。

1.5 happens-before 简介

在 JMM 中,如果一个操作执行的结果需要对另一个操作可见,那么这两个操作之间必须要存在 happens-before 关系。这里提到的两个操作既可以是在一个线程之内, 也可以是在不同线程之间。

与程序员密切相关的 happens-before 规则如下:
程序顺序规则: 一个线程中的每个操作, happens-before 于该线程中的任意后续操作。
监视器锁规则: 对一个锁的解锁, happens-before 与随后对这个锁的加锁。
volatile 变量规则: 对一个 volatile 域的写, happens-before 与任意后续对这个 volatile 域的读。
传递性: 如果 A happens-before B ,且 B happens-before C , 那么 A happens-before C 。

一个 happens-before 规则对应一个或多个编译器和处理器重排序规则。它避免程序员为了理解 JMM 提供的内存可见性保证而去学习复杂的重排序规则。

2 重排序

重排序是指编译器和处理器为了优化程序性能而对指令序列进行重新排序的一种手段。

1
2
3
4
5
6
7
8
9
10
11
12
13
class ReorderExample {
int a = 0;
bolean flag = false;
public void writer() {
a = 1; // A
flag = true; // B
}
public void reader() {
if (flag) { // C
int i = a * a; // D
}
}
}

假设有两个线程分别执行这两个方法,由于 a 变量和 flag 变量没有 happens-before 关系,因此有以下可能:
B->C->D->A。且由于 C 和 D 存在控制依赖关系,因此执行 reader() 方法的县横可以提前读取并计算 a * a ,然后把计算结果临时保存到一个名为重排序缓冲(Reorder Buffer,ROB)的硬件缓存中。 到为真时,就把该计算结果写入变量 i 中。因此,重排序在这里破坏了多线程程序的语义!

在单线程程序中,重排序不会改变执行结果。但是在多线程程序中,对存在控制依赖的操作重排序,可能会改变程序的执行结果。

3 顺序一致性

顺序一致性内存模型是一个理论参考模型,在设计的时候,处理器的内存模型和编程语言的内存模型都会以顺序一致性内存模型作为参考。

3.1 数据竞争与顺序一致性

JMM 保证,如果程序时正确同步的,程序的执行将具有顺序一致性——即程序的执行结果与该程序的顺序一致性内存模型中的执行结果相同。 这里的同步是指广义上的同步,包括对常用同步原语(synchronized 、 volatile 、 final)的正确使用。

3.2 顺序一致性内存模型

顺序一致性内存模型是一个被理想化了的理论参考模型:

一个线程中的所有操作必须按照程序的顺序来执行。
不管程序是否同步,所有线程都只能看到一个单一的曾作执行顺序。在顺序一致性内存模型中,每个操作都必须原子执行且立刻对所有线程可见。如下:

这个内存通过一个左右摆动的开关可以连接任意一个线程,同时每一个线程必须按照程序的顺序来执行内存读/写操作。当多个线程并发执行时,图中的开关装置能把所有下城的所有内存读/写操作串行化。

3.3 同步程序的顺序一致性效果

1
2
3
4
5
6
7
8
9
10
11
12
13
class SynchronizedExample {
int a = 0;
boolean synchronized void writer() { // 获取锁
a = 1;
flag = true;
} // 释放锁
public synchronized void reader() { // 获取锁
if (flag) {
int i = a;
......
}
} // 释放锁
}

下面是该程序在两个内存模型中执行时序对比图:

可以得知, JMM 在具体实现上的基本方针为:在不改变(正确同步的)程序执行结果的前提下,尽可能地位编译器和处理器的优化打开方便之门。

3.4 总线的工作机制

总线保证了处理器之间的串行化

4 volatile 的内存语义

实例代码如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
class VolatileFeaturesExample {
volatile long i = 0L;

public void set(long i) {
this.i = i;
}

public void getAndIncrement() {
i++;
}

public long get() {
return i;
}
}

假设有多个线程分别调用上面程序的 3 个方法,这个程序在语义上和下面程序等价:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
class VolatileFeaturesExample {
volatile long i = 0L;

public synchronized void set(long i) {
this.i = i;
}

public void getAndIncrement() {
long temp = get(); // 调用已同步的读方法
temp += 1L;
set(temp); // 调用已同步的写方法
}

public long get() {
return i;
}
}

接着看第二个实例代码:

1
2
3
4
5
6
7
8
9
10
11
12
13
public class TestVolatile {
public volatile boolean flag = false;
int a = 0;
public void writer() {
a = 1;
flag = true;
}
public void reader() {
if (flag) {
System.out.println(a);
}
}
}

当写一个 volatile 变量时, JMM 会把该线程对应的本地内存中的共享变量值刷新到主内存。在读一个 volatile 变量时, JMM 会把该线程对应的本地内存置为无效。如下图:

在读 flag 变量后,本地内存B 包含的值已经被置为无效。此时,线程B 必须从主内存中读取共享变量。线程B 的读取操作将导致本地内存B 与主内存中的共享变量的值一致。

volatile 内存语义的总结:

线程A 写一个 volatile 变量,实质上是线程A 向接下来将要读这个 volatile 变量的某个线程发出了(其对共享变量所做修改的)消息。
线程B 读一个 volatile 变量,实质上是线程B 接收了之前某个线程发出的(在读这个 volatile 变量之前对共享变量所做修改的)消息。
线程A 写一个 volatile 变量,随后线程B 读这个 volatile 变量,这个过程实质上是线程A 通过主内存向线程B 发送消息。

4.1 volatile 内存语义的实现

下面看看 JMM 如何实现 volatile 写/读的内存语义:

当第一个操作是 volatile 读时,不管第二个操作是什么,都不能重排序。
当第一个操作是 volatile 写时,第二个操作是 volatile 读时,不能重排序。
当第二个操作是 volatile 写时,不管第一个操作是什么,都不能重排序。

4.2 volatile 实践必读:https://www.ibm.com/developerworks/cn/java/j-jtp06197.html