且构网

分享程序员开发的那些事...
且构网 - 分享程序员编程开发的那些事

利用PostgreSQL的xmax实现无锁的并发队列处理

更新时间:2022-06-05 02:02:07

PostgreSQL的表里有几个系统隐藏列,xmax是其中一个,某些场景下我们可以利用PostgreSQL的xmax实现无锁的并发更新。本文介绍的消息或者任务队列的应用场景就是一例。

1. 场景和问题

当前台请求很频繁时,我们可能会把由此产生一些耗时而不紧急的任务作为后台作业延后处理,这样前台请求可以得到快速响应。
这些后台作业临时存放在一个表里,我们暂时称其为消息表,然后由后台进程处理这些消息。由于这个后台任务的工作量比较重,有时需要多个进程同时工作。这时需要考虑两个问题。

1)正确性
  消息不能被遗漏也不能被重复处理。
2)并发性能
  多个进程应避免争抢同一个消息。

2. 方案

2.1 方案1

为确保“正确性”,可以在获取消息时,用select ... for update给获得的消息加个锁,处理完把消息它删掉。这样使用select ... for update取消息时,如果该消息已被其它进程锁住它会等待,直到锁住该消息的事务结束,如果这条消息已被删除,那么select ... for update会继续查找下一条消息。
难点是让“多个进程不争抢同一个消息”。为解决这个问题,可以用某种算法对消息划分子集,每个后台进程只处理特定的消息子集。
下面是个例子。

描述
有10个后台进程,每个后台进程分配一个0~9个编号。消息按照id对10取模,取模的值即其对应的后台进程的编号 。

数据定义

点击(此处)折叠或打开

  1. postgres=# create table msg(id int primary key,msg text);
  2. CREATE TABLE
  3. postgres=# insert into msg select id,id::text from generate_series(1,1000000) id;
  4. INSERT 0 1000000

消息处理
以编号为3的后台进程为例。

点击(此处)折叠或打开

  1. postgres=# begin;
  2. BEGIN
  3. postgres=# select id from msg where mod(id,10) = 3 order by id limit 1 for update;
  4.  id
  5. ----
  6.  3
  7. (1 row)

  8. postgres=# (后面的处理略)

2.2 方案2

方案1有很多缺陷。
1)后台进程数必须事先确定
2)每个进程必须提前知道自己的编号
3)不同后台进程的工作量可能不均匀
4)消息的顺序和处理顺序可能不一致

其实利用PostgreSQL特有的隐藏列xmax,可以有一种更好的解决方案。如下
  1. postgres=# begin;
  2. BEGIN
  3. postgres=# select id from msg where xmax = 0 or (xmax::text::bigint not in (select txid_snapshot_xip(txid_current_snapshot())) and xmax::text::bigint txid_snapshot_xmax(txid_current_snapshot())) or (xmax::text::bigint > txid_snapshot_xmax(txid_current_snapshot()) + 1000) or xmax::text::bigint = txid_current() order by id limit 1 for update;
  4.  id
  5. ----
  6.   2
  7. (1 row)

  8. postgres=# (后面的处理略)

下面解释一下。
xmax代表更新,删除或锁住(使用for update)了该元组的事务。所以当xmax对应的事务还活着,并且这个事务不是自己,那么表示别的事务正在处理这个元组,只要通过where条件跳过这样的元组就可以避免和其他事务发生竞争了。
具体到这条SQL,主要是下面4个用”or“连接起来查询条件。
1)
  1. xmax = 0 or
从逻辑上讲,这个条件也可以不要。2)已经包含了1)的情况,但从性能上考虑还是需要的。
  
2)

  1. (xmax::text::bigint not in (select txid_snapshot_xip(txid_current_snapshot())) and xmax::text::bigint txid_snapshot_xmax(txid_current_snapshot())) or
排除事务快照中的所有活动事务。并且由于txid_snapshot_xip()取到的事务快照可能会有滞后,所以对大于等于事务快照的xmax属性的未来事务也统统排除

3)

  1. (xmax::text::bigint > txid_snapshot_xmax(txid_current_snapshot()) + 1000) or
条件2)中有个事务比较的逻辑,不过那里简单的使用”事务快照的xmin到xmax+1000之间发生事务回卷的更极端情况就不考虑了,即使在这种情况下,也不会遗漏消息处理,顶多多一次消息争用而已)

4)

  1. xmax::text::bigint = txid_current()
原本以为,事务快照中的活动事务会包含自身事务,但实测发现居然不包含自己。但是进而又发现,自身事务的事务ID可能会比txid_snapshot_xip()取到的事务快照的xmax要大,所以必须要加上这个条件。

3. 验证

下面对方案2进行实测验证。

3.1 环境
测试环境为个人PC上的VMware虚拟机
PC
 CPU:Intel Core i5-3470 3.2G(4核)
 MEM:6GB
 SSD:OCZ-VERTEX4 128GB(VMware虚拟机所在磁盘,非系统盘)
 OS:Win7


VMware虚拟机
 CPU:4核
 MEM:1GB
 OS:CentOS 6.5
 PG:PostgreSQL 9.3.4(shared_buffers = 128MB,其他是默认值)

3.2 数据定义
postgres=# create table msg(id int primary key,msg text);
CREATE TABLE
postgres=# insert into msg select id,id::text from generate_series(1,1000000) id;
INSERT 0 1000000

3.3 消息处理
仅使用简单的消息删除进行测试,通过pgbench查看单并发和多并发时的消息处理性能。

3.3.1 不使用使用xmax

只通过for update加锁防止消息被重复处理。
  1. -bash-4.1$ cat test1.sql
  2. delete from msg where id = (select id from msg order by id limit 1 for update) returning *;

1并发时,tps为641。
  1. -bash-4.1$ pgbench -n -r -c 1 -j 1 -t 1000 -p 5433 -f test1.sql
  2. transaction type: Custom query
  3. scaling factor: 1
  4. query mode: simple
  5. number of clients: 1
  6. number of threads: 1
  7. number of transactions per client: 1000
  8. number of transactions actually processed: 1000/1000
  9. latency average: 0.000 ms
  10. tps = 640.605552 (including connections establishing)
  11. tps = 641.918154 (excluding connections establishing)
  12. statement latencies in milliseconds:
  13.     1.556288    delete from msg where id = (select id from msg order by id limit 1 for update) returning *;

10并发时,tps为1714,比单并发提升2.67倍。
  1. -bash-4.1$ pgbench -n -r -c 10 -j 10 -t 1000 -p 5433 -f test1.sql
  2. transaction type: Custom query
  3. scaling factor: 1
  4. query mode: simple
  5. number of clients: 10
  6. number of threads: 10
  7. number of transactions per client: 1000
  8. number of transactions actually processed: 10000/10000
  9. latency average: 0.000 ms
  10. tps = 1702.379757 (including connections establishing)
  11. tps = 1714.727506 (excluding connections establishing)
  12. statement latencies in milliseconds:
  13.     5.744313    delete from msg where id = (select id from msg order by id limit 1 for update) returning *;

3.3.2 使用xmax

结合使用for update和xmax,既防止消息被重复处理又避免消息争用。

  1. -bash-4.1$ cat test2.sql
  2. delete from msg where id = (select id from msg where xmax = 0 or (xmax::text::bigint not in (select txid_snapshot_xip(txid_current_snapshot())) and xmax::text::bigint txid_snapshot_xmax(txid_current_snapshot())) or (xmax::text::bigint > txid_snapshot_xmax(txid_current_snapshot()) + 1000) or xmax::text::bigint = txid_current() order by id limit 1 for update) returning *;

1并发时,tps为607。
  1. -bash-4.1$ pgbench -n -r -c 1 -j 1 -t 1000 -p 5433 -f test2.sql
  2. transaction type: Custom query
  3. scaling factor: 1
  4. query mode: simple
  5. number of clients: 1
  6. number of threads: 1
  7. number of transactions per client: 1000
  8. number of transactions actually processed: 1000/1000
  9. latency average: 0.000 ms
  10. tps = 607.107038 (including connections establishing)
  11. tps = 607.815535 (excluding connections establishing)
  12. statement latencies in milliseconds:
  13.     1.643752    delete from msg where id = (select id from msg where xmax = 0 or (xmax::text::bigint not in (select txid_snapshot_xip(txid_current_snapshot())) and xmax::text::bigint txid_snapshot_xmax(txid_current_snapshot())) or (xmax::text::bigint > txid_snapshot_xmax(txid_current_snapshot()) + 1000) or xmax::text::bigint = txid_current() order by id limit 1 for update) returning *;

10并发时,tps为2276,比单并发提升3.75倍。因为CPU是4核,所以这个提升幅度已经比较不错了。
虽然看似用了xmax后提升也不算太大(2276/1714=1.33倍),但是这个测试的消息处理只是简单的delete,如果处理复杂了,无锁方案的威力就更能显现出来了。
  1. -bash-4.1$ pgbench -n -r -c 10 -j 10 -t 1000 -p 5433 -f test2.sql
  2. transaction type: Custom query
  3. scaling factor: 1
  4. query mode: simple
  5. number of clients: 10
  6. number of threads: 10
  7. number of transactions per client: 1000
  8. number of transactions actually processed: 10000/10000
  9. latency average: 0.000 ms
  10. tps = 2268.052507 (including connections establishing)
  11. tps = 2276.312206 (excluding connections establishing)
  12. statement latencies in milliseconds:
  13.     4.350373    delete from msg where id = (select id from msg where xmax = 0 or (xmax::text::bigint not in (select txid_snapshot_xip(txid_current_snapshot())) and xmax::text::bigint txid_snapshot_xmax(txid_current_snapshot())) or (xmax::text::bigint > txid_snapshot_xmax(txid_current_snapshot()) + 1000) or xmax::text::bigint = txid_current() order by id limit 1 for update) returning *;

注1)以上测试的前后,通过执行"select count(*) from msg",验证被删除的记录数和执行的事务数相同,即没有发生同一个消息被2个进程重复处理的情况。
注2)如果要一次处理一批消息,可以修改limit值,并把delete语句中的"id = ..."改成"id in ..."
  1. delete from msg where id in (select id from msg where xmax = 0 or (xmax::text::bigint not in (select txid_snapshot_xip(txid_current_snapshot())) and xmax::text::bigint txid_snapshot_xmax(txid_current_snapshot())) or (xmax::text::bigint > txid_snapshot_xmax(txid_current_snapshot()) + 1000) or xmax::text::bigint = txid_current() order by id limit 10 for update) returning *;


4. 总结

虽然有观点认为PostgreSQL的MVCC实现机制不如那个谁谁谁的好,但是活用好PG的MVCC,比如由MVCC带来的隐藏列xmax,有时也会带来意想不到的惊喜。