更新时间:2021-12-05 02:16:52
谢桂起(花名:渊渱) 2020年毕业后加入阿里云,一直从事RDS PostgreSQL相关工作,善于解决线上各类RDS PostgreSQL运维管控相关问题。
前阵子某个客户反馈他的RDS PostgreSQL无法写入,报错信息如下:
postgres=# select * from test; id ---- (0 rows) postgres=# insert into test select 1; ERROR: database is not accepting commands to avoid wraparound data loss in database "xxxx" HINT: Stop the postmaster and vacuum that database in single-user mode. You might also need to commit or roll back old prepared transactions.
随后RDS工程师介入处理以后,该问题立马得到了解决。
XID(Transaction ID)是 PostgreSQL 内部的事务编号,每个事务都会分配一个XID,依次递增。PostgreSQL 数据中每个元组头部都会保存着 插入 或者 删除 这条元组的XID(Transaction ID),然后内核通过这个 XID 构造数据库的一致性读。在事务隔离级别是 可重复读 的情况下,假设如有两个事务,xid1=200,xid2=201,那么 xid1 中只能看到 t_xmin <= 200 的元组,看不到 t_xmin > 200 的元组。
typedef uint32 TransactionId; /* 事务号定义,32位无符号整数 */ typedef struct HeapTupleFields { TransactionId t_xmin; /* 插入该元组的事务号 */ TransactionId t_xmax; /* 删除或锁定该元组的事务号 */ /*** 其它属性省略 ***/ } HeapTupleFields; struct HeapTupleHeaderData { union { HeapTupleFields t_heap; DatumTupleFields t_datum; } t_choice; /*** 其它属性省略 ***/ };
从上面结构中我们可以看到,XID 是一个32位无符号整数,也就是 XID 的范围是 0到2^32-1;那么超过了 2^32-1的事务怎么办呢?其实 XID 是一个环,超过了 2^32-1 之后又会从头开始分配。通过源代码也证明了上述结论:
// 无效事务号 #define InvalidTransactionId ((TransactionId) 0) // 引导事务号,在数据库初始化过程(BKI执行)中使用 #define BootstrapTransactionId ((TransactionId) 1) // 冻结事务号用于表示非常陈旧的元组,它们比所有正常事务号都要早(也就是可见) #define FrozenTransactionId ((TransactionId) 2) // 第一个正常事务号 #define FirstNormalTransactionId ((TransactionId) 3) // 把 FullTransactionId 的低32位作为无符号整数生成 xid #define XidFromFullTransactionId(x) ((uint32) (x).value) static inline void FullTransactionIdAdvance(FullTransactionId *dest) { dest->value++; while (XidFromFullTransactionId(*dest) < FirstNormalTransactionId) dest->value++; } FullTransactionId GetNewTransactionId(bool isSubXact) { /*** 省略 ***/ full_xid = ShmemVariableCache->nextFullXid; xid = XidFromFullTransactionId(full_xid); /*** 省略 ***/ FullTransactionIdAdvance(&ShmemVariableCache->nextFullXid); /*** 省略 *** return full_xid; } static void AssignTransactionId(TransactionState s) { /*** 省略 ***/ s->fullTransactionId = GetNewTransactionId(isSubXact); if (!isSubXact) XactTopFullTransactionId = s->fullTransactionId; /*** 省略 ***/ } TransactionId GetTopTransactionId(void) { if (!FullTransactionIdIsValid(XactTopFullTransactionId)) AssignTransactionId(&TopTransactionStateData); return XidFromFullTransactionId(XactTopFullTransactionId); }
可以看到,新事务号保存在共享变量缓存中:ShmemVariableCache->nextFullXid,每发行一个事务号后,向上调整它的值,并跳过上述三个特殊值。三个特殊仠分别为0、1和2,作用可以看上面代码注释。
前面说到,XID 是一个环,分配到 2^32-1 之后又从 3 开始,那么内核是怎么比较两个事务的大小的呢?比如 xid 经历了这样一个过程 3-> 2^32-1 -> 5,那么内核怎么样知道 5 这个事务在 2^32-1 后面呢?我们再看一下代码:
/* * TransactionIdPrecedes --- is id1 logically < id2? */ bool TransactionIdPrecedes(TransactionId id1, TransactionId id2) { /* * If either ID is a permanent XID then we can just do unsigned * comparison. If both are normal, do a modulo-2^32 comparison. */ int32 diff; if (!TransactionIdIsNormal(id1) || !TransactionIdIsNormal(id2)) return (id1 < id2); diff = (int32) (id1 - id2); return (diff < 0); }
可以看到,内核使用了一个比较取巧的方法:(int32) (id1 - id2) < 0,32位有符号整数的取值范围是 -2^31 到 231-1,5-(232-1) 得到的值比 2^31-1 大,所以转换成 int32 会变成负数。但是这里面有一个问题,「最新事务号-最老事务号」 必须小于 2^31,一旦大于就会出现回卷,导致老事务产生的数据对新事务不可见。
前面讲到,「最新事务号-最老事务号」 必须小于 2^31,否则会发生回卷导致老事务产生的数据对新事务不可见,那内核是怎么避免这个问题的呢?内核是这样处理的:通过定期把老事务产生的元组的 XID 更新为 FrozenTransactionId,即更新为2,来回收 XID,而 XID 为2 的元组对所有的事务可见,这个过程称为 XID 冻结,通过这个方式可以回收 XID 来保证 |最新事务号-最老事务号| < 2^31。
除了内核自动冻结回收XID,我们也可以通过命令或者 sql 的方式手动进行 xid 冻结回收
# 查看每个库的年龄 SELECT datname, age(datfrozenxid) FROM pg_database; # 1个库每个表的年龄排序 SELECT c.oid::regclass as table_name, greatest(age(c.relfrozenxid),age(t.relfrozenxid)) as age FROM pg_class c LEFT JOIN pg_class t ON c.reltoastrelid = t.oid WHERE c.relkind IN ('r', 'm') order by age desc; # 查看1个表的年龄 select oid::regclass,age(relfrozenxid) from pg_class where oid='schema名称.表名称'::regclass::oid;
vacuum freeze 表名;
vacuumdb -d 库名 --freeze --jobs=30 -h 连接串 -p 端口号 -U 库Owner
冻结回收过程是一个重 IO 的操作,这个过程内核会描述表的所有页面,然后把符合要求的元组的 t_xmin 字段更新为 2,所以这个过程需要在业务低峰进行,避免影响业务。
与冻结回收相关的内核参数有三个:vacuum_freeze_min_age、vacuum_freeze_table_age和autovacuum_freeze_max_age,由于笔者对于这三个参数理解不深,就不在这里班门弄斧了,感兴趣的同学可以自行找资料了解一下。
基于上面的原理分析,我们知道,「最新事务号-最老事务号」 = 2^31-1000000,即当前可用的 xid 仅剩下一百万的时候,内核就会禁止实例写入并报错:database is not accepting commands to avoid wraparound data loss in database, 这个时候必须连到提示中的 "xxxx" 对表进行 freeze 回收更多的 XID。
void SetTransactionIdLimit(TransactionId oldest_datfrozenxid, Oid oldest_datoid) { TransactionId xidVacLimit; TransactionId xidWarnLimit; TransactionId xidStopLimit; TransactionId xidWrapLimit; TransactionId curXid; Assert(TransactionIdIsNormal(oldest_datfrozenxid)); /* * xidWrapLimit = 最老的事务号 + 0x7FFFFFFF,当前事务号一旦到达xidWrapLimit将发生回卷 */ xidWrapLimit = oldest_datfrozenxid + (MaxTransactionId >> 1); if (xidWrapLimit < FirstNormalTransactionId) xidWrapLimit += FirstNormalTransactionId; /* * 一旦当前事务号到达xidStopLimit,实例将不可写入,保留 1000000 的xid用于vacuum * 每 vacuum 一张表需要占用一个xid */ xidStopLimit = xidWrapLimit - 1000000; if (xidStopLimit < FirstNormalTransactionId) xidStopLimit -= FirstNormalTransactionId; /* * 一旦当前事务号到达xidWarnLimit,将不停地收到 * WARNING: database "xxxx" must be vacuumed within 2740112 transactions */ xidWarnLimit = xidStopLimit - 10000000; if (xidWarnLimit < FirstNormalTransactionId) xidWarnLimit -= FirstNormalTransactionId; /* * 一旦当前事务号到达xidVacLimit将触发force autovacuums */ xidVacLimit = oldest_datfrozenxid + autovacuum_freeze_max_age; if (xidVacLimit < FirstNormalTransactionId) xidVacLimit += FirstNormalTransactionId; /* Grab lock for just long enough to set the new limit values */ LWLockAcquire(XidGenLock, LW_EXCLUSIVE); ShmemVariableCache->oldestXid = oldest_datfrozenxid; ShmemVariableCache->xidVacLimit = xidVacLimit; ShmemVariableCache->xidWarnLimit = xidWarnLimit; ShmemVariableCache->xidStopLimit = xidStopLimit; ShmemVariableCache->xidWrapLimit = xidWrapLimit; ShmemVariableCache->oldestXidDB = oldest_datoid; curXid = XidFromFullTransactionId(ShmemVariableCache->nextFullXid); LWLockRelease(XidGenLock); /* Log the info */ ereport(DEBUG1, (errmsg("transaction ID wrap limit is %u, limited by database with OID %u", xidWrapLimit, oldest_datoid))); /* * 如果 当前事务号>=最老事务号+autovacuum_freeze_max_age * 触发 autovacuum 对年龄最老的数据库进行清理,如果有多个数据库达到要求,按年龄最老的顺序依次清理 * 通过设置标志位标记当前 autovacuum 结束之后再来一次 autovacuum */ if (TransactionIdFollowsOrEquals(curXid, xidVacLimit) && IsUnderPostmaster && !InRecovery) SendPostmasterSignal(PMSIGNAL_START_AUTOVAC_LAUNCHER); /* Give an immediate warning if past the wrap warn point */ if (TransactionIdFollowsOrEquals(curXid, xidWarnLimit) && !InRecovery) { char *oldest_datname; if (IsTransactionState()) oldest_datname = get_database_name(oldest_datoid); else oldest_datname = NULL; if (oldest_datname) ereport(WARNING, (errmsg("database \"%s\" must be vacuumed within %u transactions", oldest_datname, xidWrapLimit - curXid), errhint("To avoid a database shutdown, execute a database-wide VACUUM in that database.\n" "You might also need to commit or roll back old prepared transactions, or drop stale replication slots."))); else ereport(WARNING, (errmsg("database with OID %u must be vacuumed within %u transactions", oldest_datoid, xidWrapLimit - curXid), errhint("To avoid a database shutdown, execute a database-wide VACUUM in that database.\n" "You might also need to commit or roll back old prepared transactions, or drop stale replication slots."))); } } bool TransactionIdFollowsOrEquals(TransactionId id1, TransactionId id2) { int32 diff; if (!TransactionIdIsNormal(id1) || !TransactionIdIsNormal(id2)) return (id1 >= id2); diff = (int32) (id1 - id2); return (diff >= 0); } FullTransactionId GetNewTransactionId(bool isSubXact) { /*** 省略 ***/ full_xid = ShmemVariableCache->nextFullXid; xid = XidFromFullTransactionId(full_xid); if (TransactionIdFollowsOrEquals(xid, ShmemVariableCache->xidVacLimit)) { TransactionId xidWarnLimit = ShmemVariableCache->xidWarnLimit; TransactionId xidStopLimit = ShmemVariableCache->xidStopLimit; TransactionId xidWrapLimit = ShmemVariableCache->xidWrapLimit; Oid oldest_datoid = ShmemVariableCache->oldestXidDB; /*** 省略 ***/ if (IsUnderPostmaster && TransactionIdFollowsOrEquals(xid, xidStopLimit)) { char *oldest_datname = get_database_name(oldest_datoid); /* complain even if that DB has disappeared */ if (oldest_datname) ereport(ERROR, (errcode(ERRCODE_PROGRAM_LIMIT_EXCEEDED), errmsg("database is not accepting commands to avoid wraparound data loss in database \"%s\"", oldest_datname), errhint("Stop the postmaster and vacuum that database in single-user mode.\n" "You might also need to commit or roll back old prepared transactions, or drop stale replication slots."))); /*** 省略 ***/ } /*** 省略 ***/ } /*** 省略 ***/ }
# 查看每个库的年龄 SELECT datname, age(datfrozenxid) FROM pg_database; # 1个库每个表的年龄排序 SELECT c.oid::regclass as table_name, greatest(age(c.relfrozenxid),age(t.relfrozenxid)) as age FROM pg_class c LEFT JOIN pg_class t ON c.reltoastrelid = t.oid WHERE c.relkind IN ('r', 'm') order by age desc; # 查看1个表的年龄 select oid::regclass,age(relfrozenxid) from pg_class where oid='schema名称.表名称'::regclass::oid;
问题解决
# 对指定数据库中年龄最大的前 50 张表进行 vacuum freeze for cmd in `psql -U用户名 -p端口号 -h连接串 -d数据库名 -c "SELECT 'vacuum freeze '||c.oid::regclass||';' as vacuum_cmd FROM pg_class c LEFT JOIN pg_class t ON c.reltoastrelid = t.oid WHERE c.relkind IN ('r', 'm') order by greatest(age(c.relfrozenxid),age(t.relfrozenxid)) desc offset 50 limit 50;" | grep -v vacuum_cmd | grep -v row | grep vacuum`; do psql -U用户名 -p端口号 -h连接串 -d数据库名 -c "$cmd" done
from multiprocessing import Pool import psycopg2 args = dict(host='pgm-bp10xxxx.pg.rds.aliyuncs.com', port=5432, dbname='数据库名', user='用户名', password='密码') def vacuum_handler(sql): sql_str = "SELECT c.oid::regclass as table_name, greatest(age(c.relfrozenxid),age(t.relfrozenxid)) as age FROM pg_class c LEFT JOIN pg_class t ON c.reltoastrelid = t.oid WHERE c.relkind IN ('r', 'm') order by age desc limit 10; " try: conn = psycopg2.connect(**args) cur = conn.cursor() cur.execute(sql) conn.commit() cur = conn.cursor() cur.execute(sql_str) print cur.fetchall() conn.close() except Exception as e: print str(e) # 对指定数据库中年龄最大的前 1000 张表进行 vacuum freeze,32 个进程并发执行 def multi_vacuum(): pool = Pool(processes=32) sql_str = "SELECT 'vacuum freeze '||c.oid::regclass||';' as vacuum_cmd FROM pg_class c LEFT JOIN pg_class t ON c.reltoastrelid = t.oid WHERE c.relkind IN ('r', 'm') order by greatest(age(c.relfrozenxid),age(t.relfrozenxid)) desc limit 1000;"; try: conn = psycopg2.connect(**args) cur = conn.cursor() cur.execute(sql_str) rows = cur.fetchall() for row in rows: cmd = row['vacuum_cmd'] pool.apply_async(vacuum_handler, (cmd, )) conn.close() pool.close() pool.join() except Exception as e: print str(e) multi_vacuum()
vacuum freeze 会扫描表的所有页面并更新,是一个重 IO 的操作,操作过程中一定要控制好并发数,否则非常容易把实例打挂。
加入我们JOIN US
欢迎加入阿里云数据库RDS团队,这里有大规模高并发的数据库场景,更有丰富的业务场景;欢迎有志之士一起加入阿里云数据库RDS团队,简历投递地址:vogts.wangt@alibaba-inc.com