使用ZooKeeper实现Java跨JVM的分布式锁(优化构思)

2023-04-28,,

一、使用ZooKeeper实现Java跨JVM的分布式

二、使用ZooKeeper实现Java跨JVM的分布式锁(优化构思)

三、使用ZooKeeper实现Java跨JVM的分布式锁(读写锁)

说明:这篇文章是基于 使用ZooKeeper实现Java跨JVM的分布式锁 的,没有阅读的朋友请先阅读前面的文章后在阅读本文。

上一篇文章中介绍了如何使用分布式锁,并且对原来的公平锁进行了扩展,实现了非公平锁,已经能够满足大部分跨进程(JVM)锁的需求了。

问题:我们都知道在单个JVM内部实现锁的机制很方便,Java也提供了很丰富的API可以实现,例如Synchronized关键字, ReentrantLock等等,但是在集群环境中,都是多个JVM协同工作,当需要一些全局锁时就要用到上面介绍的分布式锁了,但是这种锁的缺点在于每次客户端(这里说的客户端可以理解为不同JVM当中的线程)需要获取锁时都需要与zook服务端交互,创建znode,等待着自己获取锁,这种网络通信无疑会给服务器带来一定的压力,那么我们有没有什么办法来减少这种压力呢?

场景:有一种很常见的场景就是更新缓存,那么我们一般的处理逻辑如下。

1、 首选根据key获取资源,如果资源存在,用之。

2、如果不存在,则申请获取锁(使用共享锁)。

3、获取到锁以后,再次判断资源是否存在(防止重复更新),如果存在说明已经有人更新了,方法退出,否则更新缓存。

4、释放锁。

假设现在有10(1-10)个线程同时执行上诉逻辑,如果资源不存在,那么它们全部会执行第(2)步获取锁,在同一时刻,只会有1个线程获取锁,其它9个线程阻塞,等待获取锁。现在我们假设线程1获取到锁,开始执行(3-4)步动作,在第(3步)当中,再次判断资源是否存在,(肯定不存在因为它是第一个进去的),所以它负责加载资源放入缓存,然后释放锁, 再说其它线程(2-10)它们依次获取到锁,然后执行(3,4)动作,再次判断资源是否存在(已经存在了,因为1号线程已经放进去了),所以他们直接退出,释放锁。由此可见只有1号线程获取锁是有意义的,但是它们都需要与zook进行网络通讯,因此会给网络带来压力。

如果说我们有A,B 二台服务器进行集群,这10个线程获取锁的请求分别来自这2台服务器,例如(1-5)号线程来自A服务器, (6-10)号线程来自B服务器,那么它们与zk交互了10次,创建10个znode来申请锁,但是如果我们进行一定的优化,它们只要与zk交互2次就够了,我们来把上面的逻辑改造一下。

1、 首选根据key获取资源,如果资源存在,用之。

2、如果不存在,则申请获取锁(JVM进程内的锁)。

3、获取到锁(JVM进程内的锁),再次判断资源是否存在,如果资源存在就退出,没啥好所的。

4、如果资源不存在,则申请获取锁(分布式锁)。

5、获取到锁(分布式锁)再次判断资源是否存在(防止重复更新),如果存在说明已经有人更新了,方法退出,否则更新缓存。

6、释放分布式锁,释放JVM进程内的锁。

代码:我把实现逻辑都放在一起了,方便给大家演示,如果有逻辑错误欢迎大家留言指正。

    package com.framework.code.demo.zook;
    import java.util.HashMap;
    import java.util.Map;
    import java.util.concurrent.ExecutorService;
    import java.util.concurrent.Executors;
    import java.util.concurrent.TimeUnit;
    import java.util.concurrent.locks.ReentrantLock;
    import org.apache.curator.framework.CuratorFramework;
    import org.apache.curator.framework.CuratorFrameworkFactory;
    import org.apache.curator.framework.recipes.locks.InterProcessMutex;
    import org.apache.curator.retry.ExponentialBackoffRetry;
    import com.framework.code.demo.zook.lock.NoFairLockDriver;
    public class Main {
    //我们用一个static的map模拟一个第三方独立缓存
    public static Map<String, Object> redis = new HashMap<String, Object>();
    public static final String key = "redisKey";
    public static void main(String[] args) throws InterruptedException {
    //创建俩个对象分别模拟2个进程
    RedisProcess processA = new RedisProcess();
    RedisProcess processB = new RedisProcess();
    //每个进程别分用50个线程并发请求
    ExecutorService service = Executors.newFixedThreadPool(100);
    for (int i = 0; i < 50; i++) {
    service.execute(processA);
    service.execute(processB);
    }
    service.shutdown();
    service.awaitTermination(30, TimeUnit.SECONDS);
    }
    public static class RedisProcess implements Runnable {
    CuratorFramework client;
    //ZK分布式锁
    InterProcessMutex distributedLock;
    //JVM内部锁
    ReentrantLock jvmLock;
    public RedisProcess() {
    client = CuratorFrameworkFactory.newClient("192.168.1.18:2181",
    new ExponentialBackoffRetry(1000,3));
    client.start();
    distributedLock = new InterProcessMutex(client,"/mylock", new NoFairLockDriver());
    jvmLock = new ReentrantLock();
    }
    @Override
    public void run() {
    //(1)首先判断缓存内资源是否存在
    if(redis.get(key) == null) {
    try {
    //这里延时1000毫秒的目的是防止线程过快的更新资源,那么其它线程在步骤(1)处就返回true了.
    Thread.sleep(1000);
    //获取JVM锁(同一进程内有效)
    jvmLock.lock();
    //(2)再次判断资源是否已经存在
    if(redis.get(key) == null) {
    System.out.println("线程:" + Thread.currentThread() + "获取到JVM锁,redis.get(key)为空, 准备获取ZK锁");
    //这里延时500毫秒的目的是防止线程过快更新资源,其它线程在步骤(2)就返回true了。
    Thread.sleep(500);
    try {
    //获取zk分布式锁
    distributedLock.acquire();
    System.out.println("线程:" + Thread.currentThread() + "获取到JVM锁,redis.get(key)为空, 获取到了ZK锁");
    //再次判断,如果为空这时可以更新资源
    if(redis.get(key) == null) {
    redis.put(key, Thread.currentThread() + "更新了缓存");
    System.out.println("线程:" + Thread.currentThread() + "更新了缓存");
    } else {
    System.out.println("线程:" + Thread.currentThread() + "当前资源已经存在,不需要更新");
    }
    } catch (Exception e) {
    e.printStackTrace();
    } finally {
    //释放ZK锁
    try {
    distributedLock.release();
    } catch (Exception e) {
    e.printStackTrace();
    }
    }
    } else {
    System.out.println("线程:" + Thread.currentThread() + "获取到JVM锁,redis.get(key)不为空," + redis.get(key));
    }
    } catch (InterruptedException e) {
    e.printStackTrace();
    } finally {
    //释放JVM锁
    jvmLock.unlock();
    }
    } else {
    System.out.println(redis.get(key));
    }
    }
    }
    }

线程:Thread[pool-5-thread-2,5,main]获取到JVM锁,redis.get(key)为空, 准备获取ZK锁
线程:Thread[pool-5-thread-3,5,main]获取到JVM锁,redis.get(key)为空, 准备获取ZK锁
线程:Thread[pool-5-thread-3,5,main]获取到JVM锁,redis.get(key)为空, 获取到了ZK锁
线程:Thread[pool-5-thread-3,5,main]更新了缓存
线程:Thread[pool-5-thread-7,5,main]获取到JVM锁,redis.get(key)不为空,Thread[pool-5-thread-3,5,main]更新了缓存
线程:Thread[pool-5-thread-1,5,main]获取到JVM锁,redis.get(key)不为空,Thread[pool-5-thread-3,5,main]更新了缓存
线程:Thread[pool-5-thread-5,5,main]获取到JVM锁,redis.get(key)不为空,Thread[pool-5-thread-3,5,main]更新了缓存
线程:Thread[pool-5-thread-9,5,main]获取到JVM锁,redis.get(key)不为空,Thread[pool-5-thread-3,5,main]更新了缓存
线程:Thread[pool-5-thread-23,5,main]获取到JVM锁,redis.get(key)不为空,Thread[pool-5-thread-3,5,main]更新了缓存
线程:Thread[pool-5-thread-19,5,main]获取到JVM锁,redis.get(key)不为空,Thread[pool-5-thread-3,5,main]更新了缓存
线程:Thread[pool-5-thread-11,5,main]获取到JVM锁,redis.get(key)不为空,Thread[pool-5-thread-3,5,main]更新了缓存
线程:Thread[pool-5-thread-31,5,main]获取到JVM锁,redis.get(key)不为空,Thread[pool-5-thread-3,5,main]更新了缓存
线程:Thread[pool-5-thread-35,5,main]获取到JVM锁,redis.get(key)不为空,Thread[pool-5-thread-3,5,main]更新了缓存
线程:Thread[pool-5-thread-15,5,main]获取到JVM锁,redis.get(key)不为空,Thread[pool-5-thread-3,5,main]更新了缓存
线程:Thread[pool-5-thread-27,5,main]获取到JVM锁,redis.get(key)不为空,Thread[pool-5-thread-3,5,main]更新了缓存
线程:Thread[pool-5-thread-25,5,main]获取到JVM锁,redis.get(key)不为空,Thread[pool-5-thread-3,5,main]更新了缓存
线程:Thread[pool-5-thread-33,5,main]获取到JVM锁,redis.get(key)不为空,Thread[pool-5-thread-3,5,main]更新了缓存
线程:Thread[pool-5-thread-37,5,main]获取到JVM锁,redis.get(key)不为空,Thread[pool-5-thread-3,5,main]更新了缓存
线程:Thread[pool-5-thread-13,5,main]获取到JVM锁,redis.get(key)不为空,Thread[pool-5-thread-3,5,main]更新了缓存
线程:Thread[pool-5-thread-17,5,main]获取到JVM锁,redis.get(key)不为空,Thread[pool-5-thread-3,5,main]更新了缓存
线程:Thread[pool-5-thread-2,5,main]获取到JVM锁,redis.get(key)为空, 获取到了ZK锁
线程:Thread[pool-5-thread-2,5,main]当前资源已经存在,不需要更新
线程:Thread[pool-5-thread-21,5,main]获取到JVM锁,redis.get(key)不为空,Thread[pool-5-thread-3,5,main]更新了缓存
线程:Thread[pool-5-thread-29,5,main]获取到JVM锁,redis.get(key)不为空,Thread[pool-5-thread-3,5,main]更新了缓存
线程:Thread[pool-5-thread-55,5,main]获取到JVM锁,redis.get(key)不为空,Thread[pool-5-thread-3,5,main]更新了缓存
线程:Thread[pool-5-thread-59,5,main]获取到JVM锁,redis.get(key)不为空,Thread[pool-5-thread-3,5,main]更新了缓存
线程:Thread[pool-5-thread-41,5,main]获取到JVM锁,redis.get(key)不为空,Thread[pool-5-thread-3,5,main]更新了缓存
线程:Thread[pool-5-thread-67,5,main]获取到JVM锁,redis.get(key)不为空,Thread[pool-5-thread-3,5,main]更新了缓存
线程:Thread[pool-5-thread-39,5,main]获取到JVM锁,redis.get(key)不为空,Thread[pool-5-thread-3,5,main]更新了缓存
线程:Thread[pool-5-thread-43,5,main]获取到JVM锁,redis.get(key)不为空,Thread[pool-5-thread-3,5,main]更新了缓存
线程:Thread[pool-5-thread-57,5,main]获取到JVM锁,redis.get(key)不为空,Thread[pool-5-thread-3,5,main]更新了缓存
线程:Thread[pool-5-thread-47,5,main]获取到JVM锁,redis.get(key)不为空,Thread[pool-5-thread-3,5,main]更新了缓存
线程:Thread[pool-5-thread-51,5,main]获取到JVM锁,redis.get(key)不为空,Thread[pool-5-thread-3,5,main]更新了缓存
线程:Thread[pool-5-thread-63,5,main]获取到JVM锁,redis.get(key)不为空,Thread[pool-5-thread-3,5,main]更新了缓存
线程:Thread[pool-5-thread-8,5,main]获取到JVM锁,redis.get(key)不为空,Thread[pool-5-thread-3,5,main]更新了缓存
线程:Thread[pool-5-thread-69,5,main]获取到JVM锁,redis.get(key)不为空,Thread[pool-5-thread-3,5,main]更新了缓存
线程:Thread[pool-5-thread-4,5,main]获取到JVM锁,redis.get(key)不为空,Thread[pool-5-thread-3,5,main]更新了缓存
线程:Thread[pool-5-thread-6,5,main]获取到JVM锁,redis.get(key)不为空,Thread[pool-5-thread-3,5,main]更新了缓存
线程:Thread[pool-5-thread-10,5,main]获取到JVM锁,redis.get(key)不为空,Thread[pool-5-thread-3,5,main]更新了缓存
线程:Thread[pool-5-thread-22,5,main]获取到JVM锁,redis.get(key)不为空,Thread[pool-5-thread-3,5,main]更新了缓存
线程:Thread[pool-5-thread-16,5,main]获取到JVM锁,redis.get(key)不为空,Thread[pool-5-thread-3,5,main]更新了缓存
线程:Thread[pool-5-thread-20,5,main]获取到JVM锁,redis.get(key)不为空,Thread[pool-5-thread-3,5,main]更新了缓存
线程:Thread[pool-5-thread-45,5,main]获取到JVM锁,redis.get(key)不为空,Thread[pool-5-thread-3,5,main]更新了缓存
线程:Thread[pool-5-thread-24,5,main]获取到JVM锁,redis.get(key)不为空,Thread[pool-5-thread-3,5,main]更新了缓存
线程:Thread[pool-5-thread-32,5,main]获取到JVM锁,redis.get(key)不为空,Thread[pool-5-thread-3,5,main]更新了缓存
线程:Thread[pool-5-thread-36,5,main]获取到JVM锁,redis.get(key)不为空,Thread[pool-5-thread-3,5,main]更新了缓存
线程:Thread[pool-5-thread-49,5,main]获取到JVM锁,redis.get(key)不为空,Thread[pool-5-thread-3,5,main]更新了缓存
线程:Thread[pool-5-thread-28,5,main]获取到JVM锁,redis.get(key)不为空,Thread[pool-5-thread-3,5,main]更新了缓存
线程:Thread[pool-5-thread-12,5,main]获取到JVM锁,redis.get(key)不为空,Thread[pool-5-thread-3,5,main]更新了缓存
线程:Thread[pool-5-thread-14,5,main]获取到JVM锁,redis.get(key)不为空,Thread[pool-5-thread-3,5,main]更新了缓存
线程:Thread[pool-5-thread-26,5,main]获取到JVM锁,redis.get(key)不为空,Thread[pool-5-thread-3,5,main]更新了缓存
线程:Thread[pool-5-thread-53,5,main]获取到JVM锁,redis.get(key)不为空,Thread[pool-5-thread-3,5,main]更新了缓存
线程:Thread[pool-5-thread-18,5,main]获取到JVM锁,redis.get(key)不为空,Thread[pool-5-thread-3,5,main]更新了缓存
线程:Thread[pool-5-thread-61,5,main]获取到JVM锁,redis.get(key)不为空,Thread[pool-5-thread-3,5,main]更新了缓存
线程:Thread[pool-5-thread-30,5,main]获取到JVM锁,redis.get(key)不为空,Thread[pool-5-thread-3,5,main]更新了缓存
线程:Thread[pool-5-thread-65,5,main]获取到JVM锁,redis.get(key)不为空,Thread[pool-5-thread-3,5,main]更新了缓存
线程:Thread[pool-5-thread-34,5,main]获取到JVM锁,redis.get(key)不为空,Thread[pool-5-thread-3,5,main]更新了缓存
线程:Thread[pool-5-thread-97,5,main]获取到JVM锁,redis.get(key)不为空,Thread[pool-5-thread-3,5,main]更新了缓存
线程:Thread[pool-5-thread-40,5,main]获取到JVM锁,redis.get(key)不为空,Thread[pool-5-thread-3,5,main]更新了缓存
线程:Thread[pool-5-thread-91,5,main]获取到JVM锁,redis.get(key)不为空,Thread[pool-5-thread-3,5,main]更新了缓存
线程:Thread[pool-5-thread-64,5,main]获取到JVM锁,redis.get(key)不为空,Thread[pool-5-thread-3,5,main]更新了缓存
线程:Thread[pool-5-thread-42,5,main]获取到JVM锁,redis.get(key)不为空,Thread[pool-5-thread-3,5,main]更新了缓存
线程:Thread[pool-5-thread-46,5,main]获取到JVM锁,redis.get(key)不为空,Thread[pool-5-thread-3,5,main]更新了缓存
线程:Thread[pool-5-thread-50,5,main]获取到JVM锁,redis.get(key)不为空,Thread[pool-5-thread-3,5,main]更新了缓存
线程:Thread[pool-5-thread-87,5,main]获取到JVM锁,redis.get(key)不为空,Thread[pool-5-thread-3,5,main]更新了缓存
线程:Thread[pool-5-thread-85,5,main]获取到JVM锁,redis.get(key)不为空,Thread[pool-5-thread-3,5,main]更新了缓存
线程:Thread[pool-5-thread-44,5,main]获取到JVM锁,redis.get(key)不为空,Thread[pool-5-thread-3,5,main]更新了缓存
线程:Thread[pool-5-thread-75,5,main]获取到JVM锁,redis.get(key)不为空,Thread[pool-5-thread-3,5,main]更新了缓存
线程:Thread[pool-5-thread-48,5,main]获取到JVM锁,redis.get(key)不为空,Thread[pool-5-thread-3,5,main]更新了缓存
线程:Thread[pool-5-thread-71,5,main]获取到JVM锁,redis.get(key)不为空,Thread[pool-5-thread-3,5,main]更新了缓存
线程:Thread[pool-5-thread-77,5,main]获取到JVM锁,redis.get(key)不为空,Thread[pool-5-thread-3,5,main]更新了缓存
线程:Thread[pool-5-thread-52,5,main]获取到JVM锁,redis.get(key)不为空,Thread[pool-5-thread-3,5,main]更新了缓存
线程:Thread[pool-5-thread-99,5,main]获取到JVM锁,redis.get(key)不为空,Thread[pool-5-thread-3,5,main]更新了缓存
线程:Thread[pool-5-thread-93,5,main]获取到JVM锁,redis.get(key)不为空,Thread[pool-5-thread-3,5,main]更新了缓存
线程:Thread[pool-5-thread-56,5,main]获取到JVM锁,redis.get(key)不为空,Thread[pool-5-thread-3,5,main]更新了缓存
线程:Thread[pool-5-thread-60,5,main]获取到JVM锁,redis.get(key)不为空,Thread[pool-5-thread-3,5,main]更新了缓存
线程:Thread[pool-5-thread-95,5,main]获取到JVM锁,redis.get(key)不为空,Thread[pool-5-thread-3,5,main]更新了缓存
线程:Thread[pool-5-thread-89,5,main]获取到JVM锁,redis.get(key)不为空,Thread[pool-5-thread-3,5,main]更新了缓存
线程:Thread[pool-5-thread-81,5,main]获取到JVM锁,redis.get(key)不为空,Thread[pool-5-thread-3,5,main]更新了缓存
线程:Thread[pool-5-thread-73,5,main]获取到JVM锁,redis.get(key)不为空,Thread[pool-5-thread-3,5,main]更新了缓存
线程:Thread[pool-5-thread-68,5,main]获取到JVM锁,redis.get(key)不为空,Thread[pool-5-thread-3,5,main]更新了缓存
线程:Thread[pool-5-thread-58,5,main]获取到JVM锁,redis.get(key)不为空,Thread[pool-5-thread-3,5,main]更新了缓存
线程:Thread[pool-5-thread-62,5,main]获取到JVM锁,redis.get(key)不为空,Thread[pool-5-thread-3,5,main]更新了缓存
线程:Thread[pool-5-thread-66,5,main]获取到JVM锁,redis.get(key)不为空,Thread[pool-5-thread-3,5,main]更新了缓存
线程:Thread[pool-5-thread-38,5,main]获取到JVM锁,redis.get(key)不为空,Thread[pool-5-thread-3,5,main]更新了缓存
线程:Thread[pool-5-thread-54,5,main]获取到JVM锁,redis.get(key)不为空,Thread[pool-5-thread-3,5,main]更新了缓存
线程:Thread[pool-5-thread-94,5,main]获取到JVM锁,redis.get(key)不为空,Thread[pool-5-thread-3,5,main]更新了缓存
线程:Thread[pool-5-thread-83,5,main]获取到JVM锁,redis.get(key)不为空,Thread[pool-5-thread-3,5,main]更新了缓存
线程:Thread[pool-5-thread-96,5,main]获取到JVM锁,redis.get(key)不为空,Thread[pool-5-thread-3,5,main]更新了缓存
线程:Thread[pool-5-thread-79,5,main]获取到JVM锁,redis.get(key)不为空,Thread[pool-5-thread-3,5,main]更新了缓存
线程:Thread[pool-5-thread-92,5,main]获取到JVM锁,redis.get(key)不为空,Thread[pool-5-thread-3,5,main]更新了缓存
线程:Thread[pool-5-thread-90,5,main]获取到JVM锁,redis.get(key)不为空,Thread[pool-5-thread-3,5,main]更新了缓存
线程:Thread[pool-5-thread-80,5,main]获取到JVM锁,redis.get(key)不为空,Thread[pool-5-thread-3,5,main]更新了缓存
线程:Thread[pool-5-thread-82,5,main]获取到JVM锁,redis.get(key)不为空,Thread[pool-5-thread-3,5,main]更新了缓存
线程:Thread[pool-5-thread-72,5,main]获取到JVM锁,redis.get(key)不为空,Thread[pool-5-thread-3,5,main]更新了缓存
线程:Thread[pool-5-thread-78,5,main]获取到JVM锁,redis.get(key)不为空,Thread[pool-5-thread-3,5,main]更新了缓存
线程:Thread[pool-5-thread-100,5,main]获取到JVM锁,redis.get(key)不为空,Thread[pool-5-thread-3,5,main]更新了缓存
线程:Thread[pool-5-thread-70,5,main]获取到JVM锁,redis.get(key)不为空,Thread[pool-5-thread-3,5,main]更新了缓存
线程:Thread[pool-5-thread-88,5,main]获取到JVM锁,redis.get(key)不为空,Thread[pool-5-thread-3,5,main]更新了缓存
线程:Thread[pool-5-thread-84,5,main]获取到JVM锁,redis.get(key)不为空,Thread[pool-5-thread-3,5,main]更新了缓存
线程:Thread[pool-5-thread-98,5,main]获取到JVM锁,redis.get(key)不为空,Thread[pool-5-thread-3,5,main]更新了缓存
线程:Thread[pool-5-thread-86,5,main]获取到JVM锁,redis.get(key)不为空,Thread[pool-5-thread-3,5,main]更新了缓存
线程:Thread[pool-5-thread-76,5,main]获取到JVM锁,redis.get(key)不为空,Thread[pool-5-thread-3,5,main]更新了缓存
线程:Thread[pool-5-thread-74,5,main]获取到JVM锁,redis.get(key)不为空,Thread[pool-5-thread-3,5,main]更新了缓存

我们通过观察日志,发现只有2个次需要获取分布式锁,其它的都被JVM锁给阻挡在外面了,在这种情况下可以大大的提高锁的性能。

使用ZooKeeper实现Java跨JVM的分布式锁(优化构思)的相关教程结束。

《使用ZooKeeper实现Java跨JVM的分布式锁(优化构思).doc》

下载本文的Word格式文档,以方便收藏与打印。