上节我们完成了:
这里是三台公网云服务器,每台 2C4G,搭建一个Hadoop的学习环境,供我学习。
之前已经在 VM 虚拟机上搭建过一次,但是没留下笔记,这次趁着前几天薅羊毛的3台机器,赶紧尝试在公网上搭建体验一下。
假设有两个订单同时执行,分别有两个机器执行,那么这两个请求就是可以同时执行了,这样就依然出现了超卖的问题。
我们需要使用分布式锁来解决上面出现的问题。
分布式锁的作用就是在整个系统中提供一个全局的、唯一的锁,在分布式系统中每个系统进行相关的操作时都需要获取到该锁,才能够执行相应的操作。
package icu.wzk.zk.demo02; public class LockTest { public static void main(String[] args) { for (int i = 0; i < 10; i ++) { // 启动10个 new Thread(new LockRunnable()).start(); } } static class LockRunnable implements Runnable { @Override public void run() { final ClientTest clientTest = new ClientTest(); clientTest.getLock(); try { Thread.sleep(2000); } catch (InterruptedException e) { e.printStackTrace(); } clientTest.deleteLock(); } } }
package icu.wzk.zk.demo02; import org.I0Itec.zkclient.IZkDataListener; import org.I0Itec.zkclient.ZkClient; import java.util.Collections; import java.util.List; import java.util.concurrent.CountDownLatch; public class ClientTest { private ZkClient zkClient = new ZkClient("h121.wzk.icu:2181,h122.wzk.icu:2181,h123.wzk.icu:2181"); String beforeNodePath; String currentNodePath; CountDownLatch countDownLatch = null; public ClientTest() { synchronized (ClientTest.class) { if (!zkClient.exists("/lock")) { zkClient.createPersistent("/lock"); } } } public boolean tryGetLock() { if (null == currentNodePath || currentNodePath.isEmpty()) { currentNodePath = zkClient.createEphemeralSequential("/lock/", "lock"); } final List childs = zkClient.getChildren("/lock"); Collections.sort(childs); final String minNode = childs.get(0); if (currentNodePath.equals("/lock/" + minNode)) { return true; } else { final int i = Collections.binarySearch(childs, currentNodePath.substring("/lock/".length())); String lastNodeChild = childs.get(i - 1); beforeNodePath = "/lock/" + lastNodeChild; } return false; } public void waitForLock() { final IZkDataListener iZkDataListener = new IZkDataListener() { @Override public void handleDataChange(String dataPath, Object data) throws Exception { // } @Override public void handleDataDeleted(String dataPath) throws Exception { countDownLatch.countDown(); } }; zkClient.subscribeDataChanges(beforeNodePath, iZkDataListener); if (zkClient.exists(beforeNodePath)) { countDownLatch = new CountDownLatch(1); try { countDownLatch.await(); } catch (InterruptedException e) { e.printStackTrace(); } } zkClient.unsubscribeDataChanges(beforeNodePath, iZkDataListener); } public void deleteLock() { if (zkClient != null) { zkClient.delete(currentNodePath); zkClient.close(); } } public void getLock() { final String threadName = Thread.currentThread().getName(); if (tryGetLock()) { System.out.println(threadName + ": 获取到了锁!"); } else { System.out.println(threadName + ": 没有获取到锁!"); waitForLock(); // 自己调用自己 getLock(); } } }