【MyDB】4-VersionManager 之 4-VM的实现
- VM 的实现
- VM(VersionManager)的基本定义与实现优化
- 具体功能实现
- begin()开启事务
- commit()提交事务
- abort 中止事务
- read 读取uid对应的数据记录所在的entry
- insert方法,插入数据
- delete方法
VM 的实现
本章代码位于:top/xianghua/mydb/server/vm/VersionManagerImpl.java
top/xianghua/mydb/server/vm/VersionManager.java
VM(VersionManager)的基本定义与实现优化
接下来,在介绍完了VM所依赖的记录版本,事务隔离以及死锁检测后,我们来到了最终VersionManager的实现。
VM 层通过 VersionManager
接口,向上层提供用于管理事务和数据操作的基本功能。包括事务的开始,提交,回滚以及数据的插入,读取和删除,VersionManager
的基本定义如下:
public interface VersionManager {byte[] read(long xid, long uid) throws Exception;long insert(long xid, byte[] data) throws Exception;boolean delete(long xid, long uid) throws Exception;long begin(int level);void commit(long xid) throws Exception;void abort(long xid);
}
同时,VM 的实现类还被设计为 Entry 的缓存,需要继承 AbstractCache<Entry>
。需要实现的获取到缓存和从缓存释放的方法很简单:
@Override
protected Entry getForCache(long uid) throws Exception {Entry entry = Entry.loadEntry(this, uid);if(entry == null) {throw Error.NullEntryException;}return entry;
}@Override
protected void releaseForCache(Entry entry) {entry.remove();
}
具体功能实现
begin()开启事务
begin()
开启一个事务,并初始化事务的结构,将其存放在 activeTransaction 中,用于检查和快照使用:
@Override/*** 根据传入的隔离级别开启一个事务* level:0表示读已提交,1表示可重复读*/public long begin(int level) {lock.lock();try {long xid = tm.begin(); // 事务管理器开启一个事务,返回一个xidTransaction t = Transaction.newTransaction(xid, level, activeTransaction); // 创建一个事务对象activeTransaction.put(xid, t); // 将事务对象放入活跃事务集合中return xid; // 返回事务id} finally {lock.unlock();}}
commit()提交事务
commit()
方法提交一个事务,主要就是 free 掉相关的结构,并且释放持有的锁,并修改 TM 状态:
-
获取事务对象
-
将事务对象从活跃事务集合中移除
-
从LockTable中释放该事务占用的资源,并提交事务
/*** 提交事务* @param xid* @throws Exception*/@Overridepublic void commit(long xid) throws Exception {// 1. 获取事务对象lock.lock();Transaction t = activeTransaction.get(xid); // 获取事务对象lock.unlock();try {if(t.err != null) {throw t.err;}} catch(NullPointerException n) {System.out.println(xid);System.out.println(activeTransaction.keySet());Panic.panic(n);}// 2. 将事务对象从活跃事务集合中移除lock.lock();activeTransaction.remove(xid); // 将事务对象从活跃事务集合中移除lock.unlock();// 3. 从LockTable中释放该事务占用的资源,并提交事务lt.remove(xid); // 将事务对象从LockTable中移除tm.commit(xid); // 事务管理器提交事务}
abort 中止事务
abort 事务的方法则有两种,手动和自动。手动指的是调用 abort() 方法,而自动,则是在事务被检测出出现死锁时,会自动撤销回滚事务;或者出现版本跳跃时,也会自动回滚:
- 从activeTransaction中根据xid获取事务对象
- 从LockTable中释放该事务占用的资源,并调用事务管理器终止事务
@Overridepublic void abort(long xid) {internAbort(xid, false);}private void internAbort(long xid, boolean autoAborted) {lock.lock();Transaction t = activeTransaction.get(xid);if(!autoAborted) {activeTransaction.remove(xid);}lock.unlock();if(t.autoAborted) return;lt.remove(xid);tm.abort(xid);
}
read 读取uid对应的数据记录所在的entry
read()
方法读取一个 entry,注意判断下可见性即可:
/*** 读取数据* @param xid* @param uid* @return* @throws Exception*/@Overridepublic byte[] read(long xid, long uid) throws Exception {// 1. 获取事务对象lock.lock();Transaction t = activeTransaction.get(xid);lock.unlock();if(t.err != null) {throw t.err;}// 2. 尝试从缓存中获取对应的entryEntry entry = null;try {entry = super.get(uid);} catch(Exception e) {if(e == Error.NullEntryException) {return null;} else {throw e;}}// 3. 判断事务是否可见try {if(Visibility.isVisible(tm, t, entry)) {return entry.data();} else {return null;}} finally {entry.release();}}
insert方法,插入数据
insert()
则是将数据包裹成 Entry,无脑交给 DM 插入即可:
@Override
public long insert(long xid, byte[] data) throws Exception {lock.lock();Transaction t = activeTransaction.get(xid);lock.unlock();if(t.err != null) {throw t.err;}byte[] raw = Entry.wrapEntryRaw(xid, data);return dm.insert(xid, raw);
}
delete方法
delete()
方法看起来略为复杂:
@Overridepublic boolean delete(long xid, long uid) throws Exception {// 1. 获取事务对象lock.lock();Transaction t = activeTransaction.get(xid);lock.unlock();// 2. 尝试从缓存中获取对应的entryif(t.err != null) {throw t.err;}Entry entry = null;try {entry = super.get(uid);} catch(Exception e) {if(e == Error.NullEntryException) {return false;} else {throw e;}}// 3. 判断事务是否可见try {// 3.1 如果事务不可见,则返回falseif(!Visibility.isVisible(tm, t, entry)) {return false;}// 3.2 如果事务可见,则尝试获取锁Lock l = null;try {// 尝试获取指定 xid 和 uid 的锁l = lt.add(xid, uid);} catch(Exception e) {// 如果加锁失败,则抛出异常,并回滚事务t.err = Error.ConcurrentUpdateException;internAbort(xid, true);t.autoAborted = true;throw t.err;}// 3.3 如果成功获取到锁,锁定并立即解锁if(l != null) {l.lock();l.unlock();}// 3.4 如果 entry 的 Xmax 等于当前事务的 xid,说明该 entry 已经被当前事务删除,返回 falseif(entry.getXmax() == xid) {return false;}// 3.5 检查是否发生版本跳过,如果发生版本跳过,则抛出异常if(Visibility.isVersionSkip(tm, t, entry)) {t.err = Error.ConcurrentUpdateException;internAbort(xid, true);t.autoAborted = true;throw t.err;}// 3.6 设置 entry 的 Xmax 为当前事务的 xid,并返回 trueentry.setXmax(xid);return true;} finally {// 4. 释放锁entry.release();}}t.autoAborted = true;throw t.err;}// 3.6 设置 entry 的 Xmax 为当前事务的 xid,并返回 trueentry.setXmax(xid);return true;} finally {// 4. 释放锁entry.release();}}