商城系统的实现-高级部分

开源好用的“搜索引擎”。

环境

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
docker pull elasticsearch:7.4.2
docker pull kibana:7.4.2

// 查看剩余内存,确保能够装上
free -m

// 创建所需文件夹
mkdir -p /mydata/elasticsearch/config
mkdir -p /mydata/elasticsearch/data
chmod -R 777 /mydata
echo "http.host: 0.0.0.0" >> /mydata/elasticsearch/config/elasticsearch.yml

// 启动容器,如果内存够大的话,建议开大一点内存,否则我这里经常崩溃
docker run --name elasticsearch -p 9200:9200 -p 9300:9300 \
-e "discovery.type=single-node" \
-e ES_JAVA_OPTS="-Xms128m -Xmx256m" \
-v /mydata/elasticsearch/config/elasticsearch.yml:/usr/share/elasticsearc/config/elasticsearch.yml \
-v /mydata/elasticsearch/data:/usr/share/elasticsearch/data \
-v /mydata/elasticsearch/plugins:/usr/share/elasticsearch/plugins \
-d elasticsearch:7.4.2

// 确认成功,在浏览器中输入ip:9200
// 如果看到了json数据,说明成功

docker run --name kibana -e ELASTICSEARCH_HOSTS=http://your_ip_address:9200 -p 5601:5601 -d kibana:7.4.2

// 确认成功,在浏览器中输入ip:5601
// 如果看到了可视化界面 说明成功了

初步使用

本质上elastic search其实用的是http来进行通信的,所以测试可以直接上postman来进行模拟即可。

_cat

类似于数据库查看各个表格和数据库的信息的“命令”。

  • /_cat/nodes:查看所有节点信息

  • /_cat/health:查看健康信息

  • /_cat/master:查看主节点信息

  • /_cat/indices:查看所有的索引,相当于查看所有的数据库。

保存数据

类似于mysql下面将数据保存到哪个数据库(索引)中的哪个表格(类型)里面。

只需要使用PUT/POST来进行的,只需要指定好索引和类型,再添加上数据的唯一标识即可。

比如我使用PUT方法,来访问了http://ip:9200/customer/external/1这条记录,并且在请求体里面加上了一个json数据,回显值如下所示:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
{
"_index": "customer",
"_type": "external",
"_id": "1",
"_version": 1,
"result": "created",
"_shards": {
"total": 2,
"successful": 1,
"failed": 0
},
"_seq_no": 0,
"_primary_term": 1
}

PUT和POST两者之间的区别:

  • PUT访问必须带有id,且如果之前elastic里面没有就新增,有的话就修改
  • POST可以不带id

查询

查询和保存是一样的,只不过方法变成了GET,而且不需要请求体了。

1
2
3
4
5
6
7
8
9
10
11
12
{
"_index": "customer",
"_type": "external",
"_id": "1",
"_version": 1,
"_seq_no": 0, //版本号
"_primary_term": 1, // 并发控制字段,用来实现乐观锁
"found": true,
"_source": {
"name": "test"
}
}

里面值得注意的是,我们真正需要的json数据是在_source里面的一个json,同时_seq_no_primary_term这两个字段是用来实现乐观锁的,我们在查询的时候只需要带上?if_seq_no=1&if_primary_term=1这种条件判断即可。

更新

当然上面也说了其实保存数据也是可以更新的,但是还是用update比较正规点。

POST /customer/external/1/_update

1
2
3
4
5
{
"doc":{
"name":"测试2"
}
}

如果这次更新和原来的数据一模一样,那么就不会进行任何操作。如果要新增额外的数据,直接加到里面就行了

POST /customer/external/1

1
2
3
{
"name":"测试2"
}

不论是否相同,都会把版本号增加,即永远都会去操作,而不管原始数据如何。而且无法新增额外的数据,除非写进doc里面。

删除

DELETE /customer/external/1 非常简单。注意点:无法单独删除类型。

批量操作

POST /customer/external/_bulk,请求体里面需要这么写:

1
2
{"index":{"_id":"1"}}
{"name":"测试"}

这个如果直接借助POSTMAN会有问题,需要去kibana里面用devtool来写。

基本上的格式就是上面一条用来说明需要修改哪条数据,然后紧接着数据,所以是两两一组。当然因为delete的特殊性,所以delete只有一条。

官方数据

由于接下来需要展示检索等功能,所以需要庞大的数据量,官方给我们提供了实例,可以去这个网站上下载:https://raw.githubusercontent.com/elastic/elasticsearch/master/docs/src/test/resources/accounts.json

从上面的url可以得到一些官方提供的测试数据,全部复制下来进行操作。

POST /bank/account/_bulk +上面的数据,然后执行即可。

进阶使用

es支持两种检索,分别是通过REST request URI来发送检索参数或者是通过REST request body来发送。下面以两个例子来说明。

  1. 直接在uri里面来带上参数:http://39.96.86.104:9200/bank/_search?q=*&sort=account_number:asc
  2. 在请求体里面:http://39.96.86.104:9200/bank/_search,请求体数据如下:
1
2
3
4
5
6
7
8
9
10
{
"query": {
"match_all": {}
},
"sort": [
{
"account_number": "asc"
}
]
}

接下来的例子,都用的kibana里面的代码编辑器写了,比较方便。

Query DSL

官方文档

首先是最简单的全部查询:

1
2
3
4
5
6
GET /_search
{
"query": {
"match_all": {}
}
}

然后是指定特殊的字段:

1
2
3
4
5
6
7
8
GET /bank/_search
{
"query": {
"match": {
"address" : "mill"
}
}
}

Aggregations

官方文档

非常简单,熟练掌握好套娃即可。

Mapping

官方文档

我们之前在查询的时候,你会发现其实根本没有携带上accounts这个type,但是查询结果不受影响,这是因为这个属性已经被废弃了,以后就只有index和数据了,这样子更简洁明了。

mapping的存在就是为了能够让数据和类型一一对应;但是如果你不写的话,默认情况下ES会帮助你来自动填写。

分词

就是一大段话变成一个一个的词,就能进行检索了。

简单测试一下ES自带的标准分词器:

1
2
3
4
5
POST _analyze
{
"analyzer": "standard",
"text": "The 2 QUICK Brown-Foxes jumped over the lazy dog's bone."
}

当然自带的分词器对于中文的支持特别不友好,所以我们需要安装开源的分词器——ik

我们只需要下载对应的版本,然后解压扔到plugin目录下面的ik文件夹,并且赋予ik文件夹权限,最后删掉这个压缩文件,并且重启容器即可。

测试数据:

1
2
3
4
5
POST _analyze
{
"analyzer": "ik_smart",
"text": "中文测试"
}

显然我们需要自己来扩展词库,然后进行补充。自带的词库可以选择远程获取,所以这里打算开一个Nginx服务器来充当这个角色,仍然是docker启动。

首先启动一个Nginx,主要目的是为了“偷取”它的配置文件,并且之后可以为我们所用:

1
2
3
4
5
6
7
8
9
10
11
12
13
docker run -p 80:80 --name nginx -d nginx

docker container cp nginx:/etc/nginx /mydata/nginx/
// 然后把/mydata/nginx/nginx 改成 /mydata/nginx/conf即可

docker stop nginx
docker rm nginx

docker run -p 80:80 --name nginx \
-v /mydata/nginx/html:/usr/share/nginx/html \
-v /mydata/nginx/logs:/var/log/nginx \
-v /mydata/nginx/conf:/etc/nginx \
-d nginx

默认是没有index.html的,所以会出现403,只要自己写一个index.html丢到下面即可。

接下来就是需要告知ik去哪里找到这个字典,只需要在/mydata/elasticsearch/plugins/ik/config下面找到IKAnalyzer.cfg.xml,并且根据提示输入远程字典的地址即可。

1
2
3
4
5
6
7
8
9
10
11
12
13
<?xml version="1.0" encoding="UTF-8"?>
<!DOCTYPE properties SYSTEM "http://java.sun.com/dtd/properties.dtd">
<properties>
<comment>IK Analyzer 扩展配置</comment>
<!--用户可以在这里配置自己的扩展字典 -->
<entry key="ext_dict"></entry>
<!--用户可以在这里配置自己的扩展停止词字典-->
<entry key="ext_stopwords"></entry>
<!--用户可以在这里配置远程扩展字典 -->
<entry key="remote_ext_dict">词典的地址</entry>
<!--用户可以在这里配置远程扩展停止词字典-->
<!-- <entry key="remote_ext_stopwords">words_location</entry> -->
</properties>

最后重启一下ES,就可以输入对应的词典看到效果了。

使用Java来操作

  1. 使用9300端口,使用TCP的长连接来使用。但是官方不建议使用,且之后要废弃。
  2. 使用9200端口,使用HTTP来使用。当然如果真的要我们写那么多的解析之类的,是比较烦的,因为理论上任何能发送http请求的第三方工具都可以,所以需要借助第三方工具——Elasticsearch-Rest-Client

为了能够使用,我们创建一个Module,只需要选中web服务即可.

  1. 导入依赖
1
2
3
4
5
<dependency>
<groupId>org.elasticsearch.client</groupId>
<artifactId>elasticsearch-rest-high-level-client</artifactId>
<version>7.4.2</version>
</dependency>
  1. 编写配置信息,给容器中注入RestHighLevelClient
1
2
3
4
5
6
7
8
9
10
11
@Configuration
public class ElasticConfig {

@Bean
public RestHighLevelClient esRestClient() {
RestHighLevelClient client = new RestHighLevelClient(
RestClient.builder(
new HttpHost("服务器地址", 9200, "http")));
return client;
}
}
  1. 编写测试类
1
2
3
4
5
6
7
@Autowired
RestHighLevelClient client;

@Test
void contextLoads() {
System.out.println(client);
}

我这里遇到一个比较坑爹的问题,就是spring boot的默认版本是6.X的,我用的是7.4.2的,会报Failed to load ApplicationContext,记得去改掉。

存储操作

1
2
3
4
5
6
7
8
IndexRequest request = new IndexRequest("bank"); 
request.id("1");
String jsonString = "{" +
"\"user\":\"kimchy\"," +
"\"postDate\":\"2013-01-30\"," +
"\"message\":\"trying out Elasticsearch\"" +
"}";
request.source(jsonString, XContentType.JSON);

就是自己拼接出一个json,然后就可以发送了。当然这个字符串的形式非常蠢,我们可以用提供好的map封装,清楚明了:

1
2
3
4
5
6
7
Map<String, Object> jsonMap = new HashMap<>();
jsonMap.put("user", "kimchy");
jsonMap.put("postDate", new Date());
jsonMap.put("message", "trying out Elasticsearch");
IndexRequest indexRequest = new IndexRequest("posts")
.id("1").source(jsonMap);
IndexResponse indexResponse = client.index(indexRequest, RequestOptions.DEFAULT);

搜索操作

搜索操作其实是比较推荐先写好搜索语句,然后到kibana里面试一下,然后参照这个文档,就可以写出代码了,因为elastic对这些东西的封装还是很可以的。

es整合

项目中的使用

首先,es为了速度,本身利用的是内存空间来进行数据的操作的。所以在数据结构方面,我们需要进行一些慎重考量(尤其是我用的云服务器本身那么内存那么吃紧)。

为了搜索,我们肯定需要确定搜索出来的数据格式,经过简单思考,不难得出下面的例子:

1
2
3
4
5
6
7
8
9
skuId:1
spuId:11
skuTitle:华为
price:5999
saleCount:3000
attr:[
size:5寸,
......一堆属性
]

之前也提到过,每个SKU大体上是相似的,只是落实到具体的上面略有差异,所以其实attr那一栏,是非常非常多余的,对于每一个SKU来说其实这就是冗余的一部分。

那么怎么改进呢?既然冗余的是attr这个属性,那么就可以让它成为一个索引,然后去查这个就可以了。那么这样真的有用吗?答案是否定的,因为这样做,在使用动态查询的时候,分布查询的查询量会非常非常大,因为对于每个sku你都需要查询一下它的属性,这样直接就炸了。所以其实还是使用第一种,表面上虽然浪费了空间,但是换来了极大的时间提升。

ES的设计

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
PUT product
{
"mappings": {
"properties": {
"skuId":{
"type":"long"
},
"spuId":{
"type":"keyword"
},
"skuTitle":{
"type":"text",
"analyzer": "ik_smart"
},
"skuPrice":{
"type": "keyword"
},
"skuImg":{
"type": "keyword"
},
"saleCount":{
"type":"long"
},
"hasStock":{
"type": "boolean"
},
"hotScore":{
"type": "long"
},
"brandId":{
"type":"long"
},
"catalogId":{
"type":"long"
},
"brandName":{
"type":"keyword"
},
"brandImg":{
"type":"keyword"
},
"catalogName":{
"type":"keyword"
},
"attrs":{
"type": "nested",
"properties": {
"attrId":{
"type":"long"
},
"attrName":{
"type":"keyword"
},
"attrValue":{
"type":"keyword"
}
}
}
}
}
}

接下来就是对我们所上架的商品加入对应的逻辑即可。

域名整合

鉴于目前手头并没有域名,就算是需要配置也需要去备案,麻烦的很,所以决定先使用Host文件来冒充一下域名好了。

修改/etc/hosts文件,让访问域名mall.com就指向我们的本机127.0.0.1,这样在浏览器里输入mall.com:7000就可以指向我们的商品服务了。

接下来是配置Nginx服务器,具体流程是这样的:在本地用docker搭建了Nginx服务器,然后每当我们在浏览器里输入了mall.com,由于host文件,就会访问本机的80端口,然后此时Nginx就进行判断,如果host的值是mall.com,就转发给网关,然后网关再分发给具体的微服务。详细的注意和配置如下:

  1. 配置Nginx:首先是我们搞了一个独立的配置文件,然后让其监听80端口,且host字段是mall.com的,且由于我们需要做负载均衡(虽然目前就一台…),所以提前在总配置文件里面配置了:
1
2
3
upstream mall{
server 192.168.2.135:12000; # 网关的地址,接下来用mall就可以代替了
}
1
2
3
4
5
6
7
8
9
10
11
12
13
server {
listen 80;
server_name mall.com;

#charset koi8-r;
#access_log /var/log/nginx/host.access.log main;

location / {
# proxy_pass http://127.0.0.1:7000;
proxy_set_header Host $host;
proxy_pass http://mall;
}
}

这里尤其需要注意配置了保留host这个,因为nginx在做转发的时候默认会丢掉它,而我们之后的网关配置则需要用到host这个属性,所以需要保留。

  1. 配置我们的网关微服务:
1
2
3
4
- id: mall_host_route
uri: lb://mall-product
predicates:
- Host=**.mall.com

网关也很简单,只要匹配到主机地址是mall.com结尾的,就转发给产品页。这里需要再次强调,网关的部分是按照顺序从上到下匹配的,所以上面的这个配置要放到我们之前的配置下面,否则那些api替换的就会失效了。

压测

我们肯定不能真的在写项目的时候就能召集百万人来同时测试,这个时候就需要软件来进行实现

JMeter

apache下的一个基于Java的压力测试软件,因为是Java写的,所以那个界面叫一个难看….

用起来还是比较简单的。我在我的Mac上面测试了一下:

结果还是挺不错的。同时也放上百度的测试结果:

可以看到百度的实际延时明显低于我的这个,而且我这个其实只是涉及到本机间的来回,性能方面来说应该是要远胜于百度的,但是实际却是被各方面吊打,所以需要进行优化。

内存监控

首先优化的我们需要确定,这个是CPU密集型(计算、排序等)还是IO密集型(网络、磁盘、数据库等)。

接下来就是通过使用java为我们提供的内存工具jvisualvm来监控下,以便能够获得更好的调优效果。这个在我的默认jdk下载下来是没有的,需要手动下载(官网说的是随着jdk下载的,实际上并没有,就很奇怪)。

然后打开这个软件,在插件中心里面找到visual gc,安装完成之后重启,就可以方便我们看到内存堆里面的变化了:

接下来我们就可以进行性能和压力测试了。

分部分测试

我们的应用流程是这样子的:浏览器->nginx->网关->我们的应用,所以我们其实需要对这些部分进行一一测试。

nginx

首先,根据我们的业务逻辑,我们大致可以推断出Nginx是计算密集型的,因为它的任务就是把请求转发给官网,不需要做什么复杂的逻辑。

使用docker stats来查看nginx的状态如下:

可以看到内存只用了4.5M,CPU几乎不用….

然后打开JMETER对其进行压力测试:

可以看到CPU使用率大幅度上升,而内存几乎没变化(同时我的Mac也变得相当烫手),可以证明,Nginx是计算密集型的。

经过测试,我这边吞吐量是500,然后三个响应时间分别是83/393/1467。

我是在自己的机器上,用的docker for mac版本,看了一眼吃了3G的内存,果断先放弃了,我还是老老实实自己装一个Nginx吧。

因为我这里是用的homebrew安装的nginx,所以稍微有点不同。

  • 配置目录:/usr/local/etc/nginx
  • 安装路径:/usr/local/Cellar/nginx/1.x.x

其它更加详细的信息,可以通过brew info nginx查看

网关

我本地的网关是http://localhost:12000/,打开会发现是404,不过没关系,因为就算是404,其实也是处理过了的。

然后进行了压测,发现吞吐量是2000,好了不少,同时发现响应时间是45/48/146。

其余结果

接下来由于我进行压力测试把自己的电脑搞炸了,所以就不敢接下来的测试了。

根据他人的测试结果,不难得出以下结论:

  • 中间件越多,效率越低。这个目前来说没什么办法….
  • 数据库非常影响性能。我这边对数据库没有任何的索引,所以非常非常慢,而且还是远程数据库,更加慢了。
  • 模板的渲染速度。这点可以通过开启缓存来提升一部分的性能,但是还算是比较有限。
  • 静态资源的分发会影响。目前微服务的静态资源是自己提供的,其实完全没必要,交给最前面的nginx让其提供就可以了。

优化

Nginx动静分离

我们把所有的静态资源都放到Nginx下面,一般我们约定,以/static开头的路径就放到Nginx中。

首先我们把微服务里面的那些静态资源统一放置到对应的Nginx中,然后修改一下首页的index.html代码,最后需要修改下配置即可。

1
2
3
location /static/ {
root /usr/share/nginx/html;
}

启用缓存

由于我们的三级页面其实长时间都不会发生变化,所以其实每次都去访问一下数据库去查资源,非常浪费时间,而且这个代码本身就有问题,理论上去查一次数据库就够了,它居然去查了n次,效率超级低。

首先哪些数据适合放入缓存?

  • 对即时性要求低。
  • 对数据一致性要求不高,即数据库和缓存内的数据一致性要求不高。
  • 访问量大且更新频率不高。也就是读的很多,但是写的很少的数据。

接下来就是使用大名鼎鼎的redis来作为缓存的中间件了。springboot中使用也非常简单

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-data-redis</artifactId>
<exclusions>
<exclusion>
<groupId>io.lettuce</groupId>
<artifactId>lettuce-core</artifactId>
</exclusion>
</exclusions>
</dependency>

<dependency>
<groupId>redis.clients</groupId>
<artifactId>jedis</artifactId>
</dependency>

然后在配置文件里指定好host和端口,就可以直接使用了。

存取的demo:

1
2
3
4
5
6
7
8
9
10
@Autowired
StringRedisTemplate stringRedisTemplate;

@Test
void testRedis(){
ValueOperations<String, String> ops = stringRedisTemplate.opsForValue();
ops.set("hello", "world_"+ UUID.randomUUID().toString());
String hello = ops.get("hello");
System.out.println(hello);
}

通过新增缓存,我发现效率提升了5倍。美滋滋。但是缓存的使用,有时也会带来一些问题。

  • 缓存穿透。想象一下这个问题,比如有极大的查询同时引入,按照逻辑都是先判断缓存里有没有,大家都是没有,于是就去蜂拥查询数据库,这样会导致数据库瓶颈。或者说,我们查询一个一定不存在的商品,那么也会导致缓存失效,导致数据库疯狂查询。所以解决办法就是:我们就算数据库查出来是空的,也给缓存写入,并加入短暂的过期时间。
  • 缓存雪崩。缓存里的数据的过期时间是一致的,也就是到了某个时间缓存数据都没了,就又全去数据库里查了,解决办法也很简单,设置这些缓存的过期时间为随机,不要让它们同时失效就可以了。
  • 缓存击穿。对于某个热门的key,可能会在某个失效时间后,极短时间内被疯狂查询。解决办法是加锁,先只让一个人去查,那个人查完之后redis里就有数据了,就不用去db查询了。

前两个操作都非常容易,只需要稍微改改就行了,比较难的是加锁那部分。

我们都知道spring默认就是单例的,其实只有一个对象,所以我们可以直接锁this来确保同步块的完成。虽然锁的开销稍微大了那么一点,但是确实是完美完成了任务。但是synchronized只是确保了,在本机上只查询一次数据库,而不能保证在分布式的环境下也可以做到,此时就需要分布式锁来完成这个任务了。

其实我个人觉得本机上的锁确实也绰绰有余了,因为虽然它只确保了一台机器只有一个线程可以访问到数据库,但是理论上的机器也不会很多,每个机器一个线程去访问数据库确实是绰绰有余的,数据库完全可以接受。

其实分布式锁和本地锁在机制上是一致的,大家都是使用相同的资源,看看自己能不能抢到。

分布式锁-Redis

Redis支持一些原子操作,比如SET lock 1 NX,只有返回OK,说明成功占到了锁。对应的Java代码:

1
Boolean hasLock = redisTemplate.opsForValue().setIfAbsent("lock", "1");

但是这样是有问题的,比如拿到锁了要是不释放怎么办,因为是在分布式的环境中,所以就算用了finally关键字,也会有断电等问题的发生,所以此时需要设置一个过期时间,这样能够确保就算拿到锁的那位突然跪了,至少一段时间后,锁会自动释放。Redis贴心为我们设置好了:SET lock anystring NX EX 300,只要执行这条命令,就可以轻松在拿到锁的同时设置好过期时间。

其次第二个问题:释放锁的时候,如果只是简单的使用del lock,就会带来一个问题,要是这个锁不是我上的怎么办?那简单,我只需要把锁设置成特殊值,然后删掉的时候对比一下就好了。但是由于网络等原因,可能会导致一些问题,所以我们也期待对比和删除是一个原子操作。Redis的解决方案就不够优雅了,是一个lua脚本:

1
2
3
4
5
6
if redis.call("get",KEYS[1]) == ARGV[1]
then
return redis.call("del",KEYS[1])
else
return 0
end

但是至少保证了功能的可用性。最后把上面那些写成代码就大概长成这样:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
public void getWithRedisLock() {
String token = UUID.randomUUID().toString();
Boolean hasLock = redisTemplate.opsForValue().setIfAbsent("lock", token, 300, TimeUnit.SECONDS);
if (hasLock) {
try {
// 这里执行去数据库获取内容的操作
} finally {
String script = "if redis.call('get',KEYS[1]) == ARGV[1] then return redis.call('del',KEYS[1]) else return 0 end";
Long result = redisTemplate.execute(new DefaultRedisScript<Long>(script, Long.class), Arrays.asList("lock"), token);
}
} else {
// 没有拿到分布式锁,自旋
getWithRedisLock();
try {
Thread.sleep(200);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}

分布式锁-Redisson

显然上面的解决方案不够优雅,我们需要更加简洁的方法来实现——redisson

  1. 导入依赖
1
2
3
4
5
<dependency>
<groupId>org.redisson</groupId>
<artifactId>redisson</artifactId>
<version>3.12.5</version>
</dependency>
  1. 使用配置类
1
2
3
4
5
6
7
8
9
10
11
@Configuration
public class MyRedissonConfig {

@Bean(destroyMethod = "shutdown")
public RedissonClient redisson() {
Config config = new Config();
config.useSingleServer()
.setAddress("redis://127.0.0.1:6379");
return Redisson.create(config);
}
}
  1. 使用
1
2
3
RLock lock = redissonClient.getLock("my-lock");
lock.lock(30, TimeUnit.SECONDS);
lock.unlock();

接下来主要对几种分布式锁进行简单介绍。更加详细的介绍请参考官方wiki.

读写锁

比较常用的锁,读与读之间是没有关系的,日常使用绝大部分用的就是这个场景。

1
2
3
4
5
RReadWriteLock rwlock = redisson.getReadWriteLock("anyRWLock");
// 最常见的使用方法
rwlock.readLock().lock();
// 或
rwlock.writeLock().lock();
信号量

信号量可以用来解决高并发情况下大家排队的问题。

1
2
RSemaphore semaphore = redisson.getSemaphore("semaphore");
semaphore.acquire();

如果获得了,就可以继续执行;否则就只能等待。

闭锁

比如可以实现读取10次,就写入一次,这样在多个线程之间就可以有操作了。

1
2
3
4
5
6
7
RCountDownLatch latch = redisson.getCountDownLatch("anyCountDownLatch");
latch.trySetCount(1);
latch.await();

// 在其他线程或其他JVM里
RCountDownLatch latch = redisson.getCountDownLatch("anyCountDownLatch");
latch.countDown();

缓存失效

缓存失效了,我们目前比较通用的有三种方法来解决:

  • 更新的时候,同时也更新缓存——双写模式
  • 更新的时候,把缓存删除(这样后面的人就会重新查数据库来更新缓存)——失效模式
  • 利用从数据库机制——canal。

其中的双写模式和失效模式,由于在分布式系统中,有延时等因素的存在,所以我们在使用的时候最好加上读写锁。当然如果你对数据库和缓存的一致性的要求不高,那么是可以不用加锁的。

总结一下就是:我们之前就提到过,使用缓存本身就是为了那些对数据一致性要求不高的数据,所以我们只需要在缓存的时候为数据加上过期时间,基本可以高枕无忧。而如果是对数据一致性要求高的数据,那么我们宁可每次都去数据库查,这样更划算。

Spring Cache

之前我们通过redis增加了缓存,然后通过自己实现锁来加深了理解,最后又通过redisson这个出色的解决方案来简化了我们的操作。但是,我们现在做缓存还是需要写不少代码,而且其实缓存的逻辑又是高度一致的:先查缓存,如果查到就返回,没有查到就去数据库找,从数据库里找完放入一份到缓存再返回。所以spring为我们做了抽象。

  1. 导入starter
1
2
3
4
5
6
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-cache</artifactId>
</dependency>

// 还要一个redis,但是上面引过了
  1. 在配置里指定type为redis即可,然后在@EnableCaching

接下来就可以直接使用注解来使用了。但是我们还是需要稍微进行一些自定义的使用方法。

  • 指定存放在redis中的key,直接用key来指定,而且支持SeEL表达式。
  • 指定缓存数据的过期时间,这个可以直接在配置文件里指定。
  • 指定数据的序列化格式,推荐用json,这样就和平台无关了,这个就比较麻烦了。需要自己实现一个配置然后加入到容器里。
1
2
3
4
5
6
7
@Bean
public RedisCacheConfiguration redisCacheConfiguration() {
RedisCacheConfiguration config = RedisCacheConfiguration.defaultCacheConfig();
config = config.serializeKeysWith(RedisSerializationContext.SerializationPair.fromSerializer(new StringRedisSerializer()));
config = config.serializeValuesWith(RedisSerializationContext.SerializationPair.fromSerializer(new GenericJackson2JsonRedisSerializer()));
return config;
}

最后稍微提一句,springcache没有针对写的时候加的锁,所以如果项目中真的需要,可以自己用Redisson来实现。

搜索实现

对于搜索功能的实现,我们首先需要确定,我们有哪些东西是需要搜索的,然后搜索出来的结果需要怎么封装。

搜索条件

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
@Data
public class SearchParam {


/**
* 关键字,从搜索栏过来,相当于skuTitle匹配
*/
private String keyword;

/**
* 点击分类进来的
*/
private Long catalog3Id;

/**
* 排序
*/
private String sort;

private Integer hasStock;
private String skuPrice;
private List<Long> brandId;
private List<String> attrs;
private Integer pageNum;
}
  • 搜索第一件事,那肯定是搜索的关键字嘛,根据关键字来进行搜索。比如我在京东搜索xiaomi,得到的url是这样子的:https://search.jd.com/Search?keyword=xiaomi&enc=utf-8&wq=xiaomi
  • 接下来的是对catalog3Id,也就是用户点击分类来进行筛选的。比如我在京东通过首页的笔记本这个点进去,可以看到url长这个样子:https://list.jd.com/list.html?cat=670,671,672,简单明了。
  • sort字段用来对搜索出来的商品进行排序。
  • 接下来的几个字段基本就是见名知意,这里就不多叙述了。

搜索返回的结果

搜索返回的结果倒还是蛮简单的:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
@Data
public class SearchResult {

private List<SkuEsModel> product;

private Integer pageNumber;
private Long total;
private Integer totalPages;

private List<BrandVo> brands;
private List<AttrVo> attrs;
private List<CatalogVo> catalogs;

@Data
public static class BrandVo {
private Long brandId;
private String brandName;
private String brandImg;
}

@Data
public static class AttrVo {
private Long attrId;
private String attrName;
private String attrValue;
}

@Data
public static class CatalogVo {
private Long catalogId;
private String catalogName;
private String catalogValue;
}
}

然后就是通过页面传过来的这些条件,构造一些es的条件,并且将数据传送给es进行处理,结果在前端页面展示出来。前端页面的渲染使用的是thymeleaf,但是我这里idea一编写这个模块就卡的飞起,实在受不了。

异步和线程池

首先回顾下创建线程的几种方式:

  1. 继承Thread,重写run方法。
  2. 实现Runnable接口,重写run方法
  3. 实现Callable接口,并且通过FutureTask来进行调用。
  4. 线程池技术。

其中方法1我们基本是不用的,方法2是方法1的取代,但是方法2并不能得到线程的返回值,所以有了方法3;但是它们都不能控制系统资源且效率低下,所以有了方法4。所以以后基本上一律使用线程池。

线程池

线程池的创建也有好几种方法,但是实际上都是下面的方法的封装,所以下面的方法参透就差不多了:

1
2
3
4
5
6
7
public ThreadPoolExecutor(int corePoolSize,
int maximumPoolSize,
long keepAliveTime,
TimeUnit unit,
BlockingQueue<Runnable> workQueue,
ThreadFactory threadFactory,
RejectedExecutionHandler handler)
  • corePoolSize:核心线程数。就算系统空闲,这些线程数也是保留的,不会被回收。
  • maximumPoolSize:最大线程数量。
  • keepAliveTime:存活时间。当前的正在运行的线程数量大于核心的数量,即线程空闲的时间大于指定的时间,就会被释放,注意核心线程是不会被释放的。
  • unit:存活时间的时间单位。
  • workQueue:阻塞队列。提交了很多任务,就先放到这里保存一下。所以如果有线程空闲,那就会到这里取出需要的东西。
  • threadFactory:创建线程的工厂。
  • handler:如果连workQueue都存满了,那就需要用到handler来拒绝这些参数。

流程大概是这样的:

  1. 线程池创建,准备好指定的核心线程数,准备接受任务。
  2. 新的任务进来,用核心线程数来执行。
    1. 如果core都在忙碌,那就先放到workQueue里面,等core空闲了自己会去执行的。
    2. 如果workQueue放满了,此时就会去新开线程去执行,但是最大不能超过预设的值。
    3. 多出来的那些线程会在keepAliveTime时间后自动销毁,相当于系统又回到了corePoolSize。
    4. 如果已经是最大线程数量,且已经在队列里放满了任务,那么就只能通过handler来拒绝这些任务了。
  3. 任何线程的创建,都是由threadFactory来指定的。

异步编排

了解了线程池之后,我们需要知道有些任务是互相之间没有关系的,而有些任务则是需要有先后关系的,这个时候就需要异步编排来解决了,这里用的是CompletableFuture这个类来完成相关操作。

1
2
3
4
5
6
7
8
CompletableFuture<Integer> future = CompletableFuture.supplyAsync(() -> {
System.out.println("thread start");
Integer i = 10 / 2;
return i;
}, service).whenComplete((result, exception) -> {
System.out.println("result=" + result);
System.out.println("exception=" + exception);
});

可以看到只需要后缀上一个whenComplete就可以解决这种先后的顺序问题;其实这也就是它的设计思路,通过这些后缀来解决这些先后的问题。之后的那些复杂的逻辑,就可以通过线程池的技术来加速,但是需要注意它们之间的顺序问题。

注册功能

发送短信

这个功能其实只需要到阿里云或者腾讯云购买即可。这边简单介绍下阿里云的发短信原理。

本质上只需要给阿里云指定的服务器发送一个get的数据包,请求里带上你要发的短信信息的模板,你要发送的手机号码和验证码,并且在请求头上添加好对应的APPCODE,服务器接收到了以后就可以根据这些信息给你指定的手机号码发送指定的短信内容了。

所以前端在点击发送验证码的时候,就发送一个ajax请求给服务器,然后服务器调用对应的方法来处理。

页面注册

其它的不多解释,就是现在的注册页面都有一个短信验证码功能,点击“发送验证码”,就会发送一条短信到你手机上,然后输入验证码即可注册成功。为了防止被滥用,我们需要做一些限制。

  • 接口防止被滥用。防止同一个手机号,通过页面刷新可以让验证码再次发送。解决办法就是发送验证码的时候,把发送验证码的时间一并加入到redis中,然后就可以通过时间比对来查看。
  • 验证码二次校验。验证码我们一般会存入到redis中,并且设置好过期的时间,这样就没问题了。

社交登录

一般做这种注册页面,其实用户并没有耐心去某一些小网站单独注册,而是普遍采用QQ、微信、微博等第三方社交登录的。

这个基本上跟着API文档,都是很简单的操作,就略过了。

数据共享

之前的单应用,我们可以通过session进行数据的共享。但是到了微服务这里,因为域名的变化,导致了我们使用session已经无法共享数据了。还有一种情况是,分布式,也就是第一次访问了服务器A,第二次由于负载均衡访问了服务器B,这样就会导致问题。

同域名下解决方案

  1. Tomcat原生支持在多台服务器之间进行信息的同步,缺点是要耗费大量的网络带宽。此方案并不可取。

  2. 不在服务器里放session,而是在浏览器端放session。浪费带宽,容易修改,cookie有上限。此方案不可取。

  3. hash一致性。在Nginx上进行操作,让同一个用户永远到同一台服务器。

  4. 统一存储。所有的session都存到Redis中,速度会略微慢一点。

不同域名的解决方案

其实不同域名的解决方案很简单,只需要设置好域就可以了。

上面的这些,其实通过整合spring session就可以完成了。

Spring session

spring session本质上是一个过滤器,过滤了每次浏览器带过来的session,替换成我们需要的session。

  1. 引入依赖
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
<dependencyManagement>
<dependencies>
<dependency>
<groupId>org.springframework.session</groupId>
<artifactId>spring-session-bom</artifactId>
<version>Dragonfruit-RELEASE</version>
<type>pom</type>
<scope>import</scope>
</dependency>
</dependencies>
</dependencyManagement>

<dependency>
<groupId>org.springframework.session</groupId>
<artifactId>spring-session-data-redis</artifactId>
</dependency>
  1. 配置文件里写入spring.session.store-type=redis,当然需要配置好Redis的连接信息,但是之前配置过了,所以拿来用就可以了。
  2. 在主类上面添加上@EnableRedisHttpSession就可以使用了。

经过使用,会发现存在一些痛点:session的作用域不能改,session的序列化反序列化不支持修改。当然解决办法也是非常简单:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
@Configuration
public class SessionConfig {

@Bean
public CookieSerializer cookieSerializer(){
DefaultCookieSerializer cookieSerializer = new DefaultCookieSerializer();
cookieSerializer.setDomainName(".mall.com");
cookieSerializer.setCookieName("MALLSESSION");
return cookieSerializer;
}

@Bean
public RedisSerializer<Object> springSessionDefaultRedisSerializer(){
return new GenericJackson2JsonRedisSerializer();
}
}

把它们俩加入到容器,就可以解决了。

核心就是通过包装request,session是通过request拿到的,所以就可以通过包装request来操纵session了。

跨域名数据共享

同一个域名之间我们可以通过将session的域放大来达成,而不同域之间,session肯定是没用了,这个时候需要我们自己来实现一个跨域的session,本质上其实就是利用cookie来达成的,网上有很多这样的例子,这里就不多说了。

购物车

购物车比较重要的一点是,就算不登录,而且把浏览器关掉了,下次打开还是能打开购物车。每个用户都有两个购物车,一个是临时购物车,一个是正式的购物车。

只要登录了,就会临时购物车的商品合并到登录状态的正式购物车。

参考下京东的实现方式:

本质上就是一个cookie,然后每次访问都带着它,而且可以看到有效期长达一个月。只要你把这个cookie一删,你购物车里面的东西就没了(当然jd马上就会发一个全新的给你)

购物车本身其实是读写高并发的,所以我们可以采用放到Redis里面。

设计的数据结构如下:

1
2
3
4
5
6
7
8
9
10
{
"skuId":"1",
"check":"true",
"title":"iphone",
"defaultImg":"xxx",
"price":"1099",
"count":"1",
"totalPrice":"1099",
"skuSaleVo":"attrs"
}

然后就是对应的java结构。基本上就是每一个条目抽象出一个类,然后整个购物车就是这些条目构成的。

消息队列

使用的是rabbit MQ

安装

docker run -d --name rabbitmq -p 5671:5671 -p 5672:5672 -p 4369:4369 -p 25672:25672 -p 15671:15671 -p 15672:15672 rabbitmq:management

具体的端口含义:

  • 4369和25672是erlang的发现和集群端口

  • 5671和5672是AMQP端口

  • 15672是Web管理界面

  1. 依赖:
1
2
3
4
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-amqp</artifactId>
</dependency>

会有一个RabbitAutoConfiguration类,然后这个类给容器里注入了

  • RabbitTemplate:收发消息。
  • AmqpAdmin:负责管理声明各种exchange,queue和bindings。
  • CachingConnectionFactory
  • RabbitMessagingTemplate。
  1. 配置文件里去配置一下:
1
2
spring.rabbitmq.host=xxx
spring.rabbitmq.virtual-host=/
  1. 开启功能:@EnableRabbit即可。如果需要监听,则使用@RabbitListener

消息队列抗灾

rabbitmq是支持事务级别的,但是效率会极大降低,所以并不推荐。可以通过消费者的回调和生产者的回调来完成相同的功能。

生产者回调

生产者到消息队列首先需要先发送到服务器(准确的说是Exchange)中,然后exchange在交给queue,一共这两个阶段,对于这两个阶段,可以通过设置RabbitTemplate来获取回调消息。

  1. 从生产者到exchange,需要在配置文件里设置spring.rabbitmq.publisher-confirms=true
1
2
3
4
5
6
rabbitTemplate.setConfirmCallback(new RabbitTemplate.ConfirmCallback() {
@Override
public void confirm(CorrelationData correlationData, boolean b, String s) {
// 从生产者到exchange的消息回调,不论成功失败都有
}
});
  1. 从exchange到queue,需要设置两个(第二个非必需),分别是spring.rabbitmq.publisher-returns=truespring.rabbitmq.template.mandatory=true
1
2
3
4
5
6
rabbitTemplate.setReturnCallback(new RabbitTemplate.ReturnCallback(){
@Override
public void returnedMessage(Message message, int i, String s, String s1, String s2) {
// 只有当message没有到queue才会触发
}
});

消费者Ack机制

每当消费者收到message,就会向服务器发送确认消息,而且是自动的。当然可以通过手动配置来让我们自己手动发送ack:spring.rabbitmq.listener.simple.acknowledge-mode=manual即可。

订单

订单里分别有,信息流,资金流和物流。

幂等性

很多时候我们由于网络原因,会多次发送同一个请求,那么我们是如何保证这多次请求只生效一次的呢?

  • token机制。验证码也可以理解为一个token,这样你每次发送的时候,服务器要求你的token就会发生变化,然而你如果多次重复发送,你的token并不会发生变化。且token的获取、比对和删除操作必须是原子性操作。这一点可以由Lua脚本来实现:if redis.call('get',KEYS[1])==ARGV[1] then return redis.call('del',KEYS[1]) else return 0 end
  • 锁机制。毫无疑问,锁机制都能让解决高并发的数据一致性问题,对于这个小小的重复请求肯定是不在话下的。
  • 数据表唯一性约束:依靠数据库的唯一性约束,当然可以完成这一点。
  • 防重表。听名字就知道是干什么的了。
  • 全局请求唯一ID。

分布式事务

在理论计算机科学中,CAP定理(CAP theorem),又被称作布鲁尔定理(Brewer’s theorem),它指出对于一个分布式计算系统来说,不可能同时满足以下三点:

  • 一致性(Consistency) (等同于所有节点访问同一份最新的数据副本)
  • 可用性(Availability)(每次请求都能获取到非错的响应——但是不保证获取的数据为最新数据)
  • 分区容错性(Partition tolerance)(以实际效果而言,分区相当于对通信的时限要求。系统如果不能在时限内达成数据一致性,就意味着发生了分区的情况,必须就当前操作在C和A之间做出选择。)

在实际使用中,P我们必须保证,而A也必须保证,所以意味着我们只能舍弃了一致性,但是一致性可以通过raft算法来支持,做不了强一致性,我们只能退而求其次,最终一致性。

实际中有几种实际解决方案:

  1. 2PC模式。由一个事务管理器(事务协调器)来统筹管理所有的事务,它会发通知告知所有事务管理器是否准备好。当收到它们准备好了消息的时候,就会命令它们去执行事务。实际产品——SEATA
  2. 3PC模式。比上面那个多了一层
  3. TCC。就是自己写好回调方法,这样当事务有问题了就可以回调对应的方法来进行补偿。
  4. 最大努力通知型。通过MQ来发送http请求,设置好最大通知次数。
  5. 可靠消息+最终一致性。可靠非常重要,就是靠可靠才能实现最后的

SEATA

  1. 为每一个微服务创建 UNDO_LOG 表:
1
2
3
4
5
6
7
8
9
10
11
12
13
CREATE TABLE `undo_log` (
`id` bigint(20) NOT NULL AUTO_INCREMENT,
`branch_id` bigint(20) NOT NULL,
`xid` varchar(100) NOT NULL,
`context` varchar(128) NOT NULL,
`rollback_info` longblob NOT NULL,
`log_status` int(11) NOT NULL,
`log_created` datetime NOT NULL,
`log_modified` datetime NOT NULL,
`ext` varchar(100) DEFAULT NULL,
PRIMARY KEY (`id`),
UNIQUE KEY `ux_undo_log` (`xid`,`branch_id`)
) ENGINE=InnoDB AUTO_INCREMENT=1 DEFAULT CHARSET=utf8;
  1. 创建seata服务器,注意需要把自己注册到nacos中。
  2. 导入依赖
1
2
3
4
<dependency>
<groupId>com.alibaba.cloud</groupId>
<artifactId>spring-cloud-starter-alibaba-seata</artifactId>
</dependency>
  1. 使用seata来包装自己原来的数据源。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
@Configuration
public class MySeataConfig {

@Autowired
DataSourceProperties dataSourceProperties;

@Bean
public DataSource dataSource(DataSourceProperties dataSourceProperties) {
HikariDataSource source = dataSourceProperties.initializeDataSourceBuilder().type(HikariDataSource.class).build();
if (StringUtils.hasText(dataSourceProperties.getName())) {
source.setPoolName(dataSourceProperties.getName());
}
return new DataSourceProxy(source);
}
}
  1. 每个微服务导入file.conf和registry.conf,并按照demo进行修改service.vgroup。

seata对于一般的分布式事务还可以,但是对于大并发量就不行。

延时队列

延时队列通过暂时存放,接下来空闲的时候慢慢处理,能够很好的削峰,比如秒杀的时候,超大流量引入,我们可以写入到队列中。然后订单服务慢慢悠悠地去取出来,达成效果。消息队列上面已经讲过了,它支持延时队列(在队列里逗留一会再走),可以有效达成我们的目标。

秒杀

秒杀有以下几个要求:

  1. 服务单一职责+独立部署。这意味着,当你的秒杀服务挂掉的时候,别的服务不受影响。
  2. 秒杀链接加密。只有当秒杀时间到了才会开放,才可以从服务器端获取到一个token,只有拥有了token才有资格来秒杀
  3. 库存预热+快速扣减。我们把商品加入到redis里面,通过信号量来控制秒杀的商品数量。
  4. 动静分离。一直以来都遵循这个原则。
  5. 恶意请求拦截。通过网关来做这一层。
  6. 流量错峰。最简单的就是加一个验证码,因为大家的验证码各不相同,输入速度也不同,可以稍微错开一点。
  7. 限流,熔断和降级。为了有效防止服务崩溃。
  8. 队列削峰。

Sentinel

  1. 引入依赖
1
2
3
4
<dependency>
<groupId>com.alibaba.cloud</groupId>
<artifactId>spring-cloud-starter-alibaba-sentinel</artifactId>
</dependency>
  1. 下载对应版本的dashboard(推荐),并启动
  2. 配置相关地址:
1
2
spring.cloud.sentinel.transport.dashboard=localhost:8333
spring.cloud.sentinel.transport.port=8719

就可以开心使用了。

  1. 导入依赖
1
2
3
4
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-actuator</artifactId>
</dependency>
  1. 配置文件
1
management.endpoints.web.exposure.include=*
  1. 自定义sentinel的返回数据。

链路追踪

  1. 导入依赖
1
2
3
4
<dependency>
<groupId>org.springframework.cloud</groupId>
<artifactId>spring-cloud-starter-sleuth</artifactId>
</dependency>
  1. 打开日志
1
2
logging.level.org.springframework.cloud.openfeign: debug
logging.level.org.springframework.cloud.sleuth: debug
  1. 可以通过zipkin来做可视化处理。