Curator-Locks(六)

摘要

前面单独分享给分布式锁的几种实现方式分布式锁,里面也提到了如何使用Zookeeper来现实分布式锁。

这里Curator提供了几种:

  • 可重入锁(Shared Reentrant Lock)
  • 不可重入锁(Shared Lock)
  • 可重入读写锁(Shared Reentrant Read Write Lock)
  • 信号量(Shared Semaphore)
  • 多锁(Muti Shared Lock)

####可重入锁(Shared Reentrant Lock)

可重入语义跟JDK的ReentrantLock相同,全局同步意味着任何两个客户端不可能同时持有同一把锁。

InterProcessMutex

用法

创建InterProcessMutex

1
2
3
4
5
public InterProcessMutex(CuratorFramework client,
String path)
Parameters:
client - client
path - the path to lock

获取锁

1
2
3
public void acquire()
Acquire the mutex - blocking until it's available. Note: the same thread can call acquire
re-entrantly. Each call to acquire must be balanced by a call to release()
1
2
3
4
5
6
7
8
9
public boolean acquire(long time,
TimeUnit unit)
Acquire the mutex - blocks until it's available or the given time expires. Note: the same thread can
call acquire re-entrantly. Each call to acquire that returns true must be balanced by a call to release()

Parameters:
time - time to wait
unit - time unit
Returns:true if the mutex was acquired, false if not

调用 release()方法释放锁

1
2
public void release()
Perform one release of the mutex if the calling thread is the same thread that acquired it. If the thread had made multiple calls to acquire, the mutex will still be held when this method returns.

注意:InterProcessMutex实例是可以重用的,不要每次都新建一个实例,用同一个就行。

Revoking

InterProcessMutex支持Zookeeper recipes wiki上描述的可协商撤销机制

用下面方法可以将锁设置成可撤销的:

1
2
3
4
public void makeRevocable(RevocationListener<T> listener)
Make the lock revocable. Your listener will get called when another process/thread wants you to release the lock. Revocation is cooperative.
Parameters:
listener - the listener

当其他线程想让你是释放锁的时候,listener会被调用。

调用下面方法撤销:

1
2
3
4
5
6
7
8
9
public static void attemptRevoke(CuratorFramework client,
String path)
throws Exception
Utility to mark a lock for revocation. Assuming that the lock has been registered
with a RevocationListener, it will get called and the lock should be released. Note,
however, that revocation is cooperative.
Parameters:
client - the client
path - the path of the lock - usually from something like InterProcessMutex.getParticipantNodes()
错误处理

强烈推荐注册ConnectionStateListener,监听SUSPENDED和LOST状态变化,如果SUSPEDNED状态被报告,你不能确信无疑的认为你还持有锁,除非后面收到RECONNECTED状态。如果是LOST状态被报告,可以确切的说,你不再持有锁了。

不可重入锁(Share Lock)

与上面的锁相似,区别是不可重入。

InterProcessSemaphoreMutex

用法

创建InterProcessSemaphoreMutex

1
2
3
4
5
public InterProcessSemaphoreMutex(CuratorFramework client,
String path)
Parameters:
client - client
path - the path to lock

获取锁

1
2
3
4
5
6
7
8
9
10
11
public void acquire()
Acquire the mutex - blocking until it's available. Must be balanced by a call to release().

public boolean acquire(long time,TimeUnit unit)
Acquire the mutex - blocks until it's available or the given time expires. Must be balanced by a call to release().

Parameters:
time - time to wait
unit - time unit
Returns:
true if the mutex was acquired, false if not

释放锁

1
2
public void release()
Perform one release of the mutex if the calling thread is the same thread that acquired it.
错误处理

强烈推荐注册ConnectionStateListener,监听SUSPENDED和LOST状态变化,如果SUSPEDNED状态被报告,你不能确信无疑的认为你还持有锁,除非后面收到RECONNECTED状态。如果是LOST状态被报告,可以确切的说,你不再持有锁了。

可重入读写锁

这是中跨JVM使用的可重入读写锁,通过zookeeper来保持锁。所有在jvm中,用相同锁路径的进程都将获取进程临界区。而且这个锁是公平的,zk会让每个用户将按请求顺序获得锁。

读写锁保持一对相关锁,一个读锁,一个写锁,只要没有写锁,读锁可以有多次获取。写锁是独占的。

重入的语义是可以允许读取锁或者写锁,可以重入的获取读或者写锁。

在写锁没有释放前,不允许非重入读,另外,获取写锁可以再获取读锁(锁的降级),但是获取了读锁,再获取写锁是不会成功的(锁的升级)。

  • InterProcessReadWriteLock
  • InterProcessLock
用法

创建InterProcessReadWriteLock

1
2
3
4
public InterProcessReadWriteLock(CuratorFramework client, String basePath)
Parameters:
client - the client
basePath - path to use for locking

获取锁跟前面一样调用acquire()方法

1
2
public InterProcessLock readLock()
public InterProcessLock writeLock()
错误处理

强烈推荐注册ConnectionStateListener,监听SUSPENDED和LOST状态变化,如果SUSPEDNED状态被报告,你不能确信无疑的认为你还持有锁,除非后面收到RECONNECTED状态。如果是LOST状态被报告,可以确切的说,你不再持有锁了。

信号量(Shared Semaphore)

计数信号量也是跨JVM执行的。使用相同锁定路径的JVM中的所有进程,都将实现一个进程间的租约集合。而且,信号量也是公平的,用户也是根据请求顺序获取一个租约。也就是这里的lease。

有两种模式来确定信号量的最大租约数量,第一种是由给定路径的用户维护,第二种是使用SharedCountReader来确定信号量大小。如果不用SharedcountReader,不会进行内部检查。所以要保证所有进程的实例使用相同numberOfLeases值。

或许租约对象后,记得要关闭释放,否则租约会丢失泄露,但是如果客户端因为会话丢失,客户端持有的租约会自动关闭。

  • InterProcessSemaphoreV2
  • Lease
  • SharedCountReader
用法

创建InterProcessSemaphoreV2

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
public InterProcessSemaphoreV2(CuratorFramework client,
String path,
int numberOfLeases)
Parameters:
client - client
path - the path to lock
numberOfLeases - the number of leases allowed by this semaphore

public InterProcessSemaphoreV2(CuratorFramework client,
String path,
SharedCountReader count)
Parameters:
client - the client
path - path for the semaphore
count - the shared count to use for the max leases

获取信号量

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
public Lease acquire()
Acquire a lease. If no leases are available, this method blocks until either the maximum number of
leases is increased or another client/process closes a lease.
The client must close the lease when it is done with it. You should do this in a finally block.
Returns:
the new lease

public Collection<Lease> acquire(int qty)
Acquire qty leases. If there are not enough leases available, this method blocks until either the
maximum number of leases is increased enough or other clients/processes close enough leases.
The client must close the leases when it is done with them. You should do this in a finally block.
NOTE: You can use returnAll(Collection) for this.
Parameters:
qty - number of leases to acquire
Returns:
the new leases

public Lease acquire(long time,TimeUnit unit)
Acquire a lease. If no leases are available, this method blocks until either the maximum number of
leases is increased or another client/process closes a lease. However, this method will only block
to a maximum of the time parameters given.
The client must close the lease when it is done with it. You should do this in a finally block.
Parameters:
time - time to wait
unit - time unit
Returns:
the new lease or null if time ran out


public Collection<Lease> acquire(int qty, long time, TimeUnit unit)
Acquire qty leases. If there are not enough leases available, this method blocks until either the
maximum number of leases is increased enough or other clients/processes close enough leases. However,
this method will only block to a maximum of the time parameters given. If time expires before all
leases are acquired, the subset of acquired leases are automatically closed.
The client must close the leases when it is done with them. You should do this in a finally block.
NOTE: You can use returnAll(Collection) for this.
Parameters:
qty - number of leases to acquire
time - time to wait
unit - time unit

当然你可以直接调用Lease的close的方法释放,也可以调用下面方法:

1
2
public void returnAll(Collection<Lease> leases)
public void returnLease(Lease lease)
错误处理

强烈推荐注册ConnectionStateListener,监听SUSPENDED和LOST状态变化,如果SUSPEDNED状态被报告,你不能确信无疑的认为你还持有锁,除非后面收到RECONNECTED状态。如果是LOST状态被报告,可以确切的说,你不再持有锁了。

多锁对象(Multi Shared Lock)

这是一个管理多个锁的容器,看起来像一个实体。当调用 acquire() ,所有的锁都被 acquired ,如果失败,所有的锁都会被是否。相同的,如果 调用 release(),所有的锁将被释放(忽略失败)。看起来像一个事务,所有的锁在一个事务里面。

  • InterProcessMultiLock
  • InterProcessLock
用法

创建InterProcessMultiLock

1
2
3
4
5
6
7
8
9
10
11
public InterProcessMultiLock(List<InterProcessLock> locks)
Creates a multi lock of any type of inter process lock
Parameters:
locks - the locks

public InterProcessMultiLock(CuratorFramework client,
List<String> paths)
Creates a multi lock of InterProcessMutexes
Parameters:
client - client
paths - list of paths to manage in the order that they are to be locked
错误处理

强烈推荐注册ConnectionStateListener,监听SUSPENDED和LOST状态变化,如果SUSPEDNED状态被报告,你不能确信无疑的认为你还持有锁,除非后面收到RECONNECTED状态。如果是LOST状态被报告,可以确切的说,你不再持有锁了。

坚持技术分享,您的支持将鼓励我继续创作!