Curator Framework使用(二)

进一步了解Framework

Curator Framework 提供了一套高级的API, 简化了ZooKeeper的操作,在原来基础上添加了许多新特性,更好的处理集群中连接管理,重试操作。下面列出一些Features:

  • 自动连接管理
    • 存在一些潜在的错误,需要zk客户端重新创建connection或者重试。Curator可以自动的,透明的(大部分情况)处理这些情况。
    • 监听NodeDataChanged事件并根据需要调用updateServerList方法。
    • 监听可以被Curator recipes自动移除。
  • 简洁的API
    • 简化原生zk的方法,事件等
    • 提供了Fluent接口
  • 食谱实现
    • Leader election
    • Shared lock
    • Path cache and watcher
    • Distributed Queue
    • Distributed Priority Queue

初始化

在上一篇中,简单提到了,通过CuratorFrameworkFactory可以获取一个CuratorFramework,有两种方式生成:工厂方法(newClient())和Builder方式。

注意:CuratorFramework实例是线程安全的,客户在应用中共享。

前面也说了,获得CuratorFramework实例后调用start。在应用关闭或者结束时调用close。(shutdown hook 钩子)

1
2
3
4
client.create().forPath("/head", new byte[0]);
client.delete().inBackground().forPath("/head");
client.create().withMode(CreateMode.EPHEMERAL_SEQUENTIAL).forPath("/head/child", new byte[0]);
client.getData().watched().inBackground().forPath("/test");

Framework方法

方法名 描述
create() 创建操作,调用额外的方法(mode或background),结尾调用forPath()
delete() 删除操作,调用额外的方法(version或background),结尾调用forPath()
checkExists() 判断节点是否存在,调用额外的方法(watch或background),结尾调用forPath()
getData() 获取节点数据,调用额外的方法(watch,background或get stat),结尾调用forPath()
setData() 设置节点,调用额外的方法(version或background),结尾调用forPath()
getChildren() 获取节点下子节点的List,调用额外的方法(watch,background或get stat),结尾调用forPath()
transactionOp() 用于分配与transaction()一起使用的操作。
transaction() 把一堆操作作为一个事务提交
getACL() 返回一个节点的ACL设置,结尾调用forPath()
setACL() 设置一个节点的ACL,结尾调用forPath()

Framework事件通知

通知和监听都是通过接口CuratorListener发布出去的,可以调用addListener()方法,注册一个listener。

实现一个listener需要实现方法eventReceived,当一个后台操作完成或者一个监听被触发,就检查给定的event事件内容。

1
public void eventReceived(CuratorFramework client, CuratorEvent event) throws Exception;
CuratorEvent

CuratorEvent是个大集合,包含所有的后台通知事件和触发的监听,可以通过getType()获取Event类型CuratorEventType,枚举项对应这CuratorFramework中的各种操作,具体用什么属性,就是根据这个类型决定的。下面列举一下,事件类型和相关属性关系。

Event Type Event Methods
CREATE getResultCode() and getPath()
DELETE getResultCode() and getPath()
EXISTS getResultCode(), getPath() and getStat()
GET_DATA getResultCode(), getPath(), getStat() and getData()
SET_DATA getResultCode(), getPath() and getStat()
CHILDREN getResultCode(), getPath(), getStat(), getChildren()
SYNC getResultCode(), getStat()
GET_ACL getResultCode(), getACLList()
SET_ACL getResultCode()
TRANSACTION getResultCode(), getOpResults()
WATCHED getWatchedEvent()
GET_CONFIG getResultCode(), getData()
RECONFIG getResultCode(), getData()

命名空间

应为zookeeper集群是一个共享的环境,所以使用命名空间,可以让各个应用避免ZK的路径冲突。

在初始化CuratorFramework实例的时候,通过Builder设置namespace。

临时连接

临时的CuratorFramework实例,是指在网络不好的时候,通过单次请求操作Zookeeper。而且CuratorTempFramework提供的APIs是有限的,并且,这个临时连接,将在一段时间不活跃后关闭。有个人提出的idea http://www.elidedbranches.com/2012/12/building-global-highly-available.html

实例化一个CuratorTempFramework,跟普通的实例一样,也是通过CuratorFrameworkFactory创建,但是调用的不是build(),是buildTemp()buildTemp() 创建了一个CuratorFrameworkFactory实例,但是它如果不活跃超过3分钟,就会被关闭。还有一个buildTemp可以指定不活跃的时间。

1
2
3
4
5
6
7
8
9
10
//默认是3分钟
private static final long DEFAULT_INACTIVE_THRESHOLD_MS = (int)TimeUnit.MINUTES.toMillis(3);
public CuratorTempFramework buildTemp()
{
return buildTemp(DEFAULT_INACTIVE_THRESHOLD_MS, TimeUnit.MILLISECONDS);
}
public CuratorTempFramework buildTemp(long inactiveThreshold, TimeUnit unit)
{
return new CuratorTempFrameworkImpl(this, unit.toMillis(inactiveThreshold));
}

看一下都有那些API,就三个方法。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
public interface CuratorTempFramework extends Closeable
{
/**
* Stop the client
*/
public void close();

/**
* Start a transaction builder
*
* @return builder object
* @throws Exception errors
*/
public CuratorTransaction inTransaction() throws Exception;

/**
* Start a get data builder
*
* @return builder object
* @throws Exception errors
*/
public TempGetDataBuilder getData() throws Exception;
}
坚持技术分享,您的支持将鼓励我继续创作!