1、基于ZooKeeper基本API实现
pom.xml
<dependency><groupId>org.apache.zookeeper</groupId><artifactId>zookeeper</artifactId><version>3.5.7</version>
</dependency>
import org.apache.zookeeper.*;
import org.apache.zookeeper.data.Stat;import java.io.IOException;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.CountDownLatch;public class DistributeLock {private final String connectString = "127.0.0.1:2181";private final int sessionTimeout = 2000;private final ZooKeeper zk;private CountDownLatch connectLatch = new CountDownLatch(1);private CountDownLatch waitLatch = new CountDownLatch(1);private String waitPath;private String currentNode;public DistributeLock() throws IOException, InterruptedException, KeeperException {//获取连接zk = new ZooKeeper(connectString, sessionTimeout, new Watcher() {@Overridepublic void process(WatchedEvent event) {//connectLatch 如果连接上zk,释放if(event.getState() == Event.KeeperState.SyncConnected) {connectLatch.countDown();}// waitLatch 前一个节点释放锁删除后,释放if(event.getType() == Event.EventType.NodeDeleted && event.getPath().equals(waitPath)) {waitLatch.countDown();}}});//等待zk正常连接后,再往下执行程序connectLatch.await();// 判断根节点(/locks)是否存在Stat stat = zk.exists("/locks", false);if (null == stat) {//创建根节点zk.create("/locks", "locking".getBytes(), ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);}}//加锁public void zkLock() {//创建临时节点try {currentNode = zk.create("/locks/" + "seq-", null, ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL_SEQUENTIAL);//判断创建的节点是否是最小的序号节点,如果是则获取到锁,如果不是则监听它序号前一个节点List<String> children = zk.getChildren("/locks", false);//如果children只有一个值,那就直接获取锁;如果有多个节点,需要判断,谁最小if(children.size() == 1) {return;} else {Collections.sort(children);//获得节点名称 seq-00000000String thisNode = currentNode.substring("/locks/".length());int index = children.indexOf(thisNode);if(index == -1) {System.out.println("数据异常");} else if (index == 0) {//就一个节点,直接获取锁return;} else {//需要监听它前一个节点变化waitPath = "/locks/" + children.get(index - 1);zk.getData(waitPath, true, null);//等待监听waitLatch.await();}}} catch (KeeperException e) {e.printStackTrace();} catch (InterruptedException e) {e.printStackTrace();}}//解锁public void unZkLock() {try {zk.delete(currentNode, -1);} catch (InterruptedException e) {e.printStackTrace();} catch (KeeperException e) {e.printStackTrace();}}
}
测试:
import org.apache.zookeeper.KeeperException;import java.io.IOException;
import java.util.concurrent.TimeUnit;public class DistributeLockTest {public static void main(String[] args) throws InterruptedException, IOException, KeeperException {final DistributeLock lock1 = new DistributeLock();final DistributeLock lock2 = new DistributeLock();new Thread(new Runnable() {@Overridepublic void run() {try {lock1.zkLock();System.out.println(Thread.currentThread().getName()+"==>获得锁");TimeUnit.SECONDS.sleep(10);lock1.unZkLock();System.out.println(Thread.currentThread().getName()+"==>释放锁");} catch (Exception e) {e.printStackTrace();}}}, "线程1").start();new Thread(new Runnable() {@Overridepublic void run() {try {lock2.zkLock();System.out.println(Thread.currentThread().getName()+"==>获得锁");TimeUnit.SECONDS.sleep(10);lock2.unZkLock();System.out.println(Thread.currentThread().getName()+"==>释放锁");} catch (Exception e) {e.printStackTrace();}}}, "线程2").start();}
}
2、基于框架curator实现
pom.xml
<dependency><groupId>org.apache.curator</groupId><artifactId>curator-framework</artifactId><version>4.3.0</version>
</dependency>
<dependency><groupId>org.apache.curator</groupId><artifactId>curator-recipes</artifactId><version>4.3.0</version>
</dependency>
<dependency><groupId>org.apache.curator</groupId><artifactId>curator-client</artifactId><version>4.3.0</version>
</dependency>
import org.apache.curator.framework.CuratorFramework;
import org.apache.curator.framework.CuratorFrameworkFactory;
import org.apache.curator.framework.recipes.locks.InterProcessMutex;
import org.apache.curator.retry.ExponentialBackoffRetry;import java.util.concurrent.TimeUnit;public class CuratorLock {public static void main(String[] args) {//创建分布式锁1final InterProcessMutex lock1 = new InterProcessMutex(getCuratorFramework(), "/locks");//创建分布式锁2final InterProcessMutex lock2 = new InterProcessMutex(getCuratorFramework(), "/locks");new Thread(new Runnable() {@Overridepublic void run() {try {lock1.acquire();System.out.println(Thread.currentThread().getName()+"==>获得锁");lock1.acquire();System.out.println(Thread.currentThread().getName()+"==>再次获得锁");TimeUnit.SECONDS.sleep(10);lock1.release();System.out.println(Thread.currentThread().getName()+"==>释放锁");lock1.release();System.out.println(Thread.currentThread().getName()+"==>再次释放锁");} catch (Exception e) {e.printStackTrace();}}}, "线程1").start();new Thread(new Runnable() {@Overridepublic void run() {try {lock2.acquire();System.out.println(Thread.currentThread().getName()+"==>获得锁");lock2.acquire();System.out.println(Thread.currentThread().getName()+"==>再次获得锁");TimeUnit.SECONDS.sleep(8);lock2.release();System.out.println(Thread.currentThread().getName()+"==>释放锁");lock2.release();System.out.println(Thread.currentThread().getName()+"==>再次释放锁");} catch (Exception e) {e.printStackTrace();}}}, "线程2").start();}private static CuratorFramework getCuratorFramework() {ExponentialBackoffRetry retryPolicy = new ExponentialBackoffRetry(3000, 3);CuratorFramework client = CuratorFrameworkFactory.builder().connectString("127.0.0.1:2181").connectionTimeoutMs(2000).sessionTimeoutMs(2000).retryPolicy(retryPolicy).build();//启动客户端client.start();System.out.println("zookeeper启动成功");return client;}
}