Redis-Client-Lettuce

lettuce-green-text@2x

Why Lettuce?

Lettuce是一个高性能基于Java编写的Redis驱动框架,底层集成了Project Reactor提供天然的反应式编程,通信框架集成了Netty使用了非阻塞IO5.x版本之后融合了JDK1.8的异步编程特性,在保证高性能的同时提供了十分丰富易用的API,Lettuce 的确有很多优秀的特性,例如:

  • 基于 netty,支持事件模型
  • 支持 同步、异步、响应式 的方式
  • 可以方便的连接 Redis Sentinel
  • 完全支持 Redis Cluster
  • SSL 连接
  • Streaming API
  • CDI 和 Spring 的集成
  • 兼容 Java 8 和 9

重要特性

多线程共享

Jedis 是直连模式,在多个线程间共享一个 Jedis 实例时是线程不安全的,如果想要在多线程环境下使用 Jedis,需要使用连接池,每个线程都去拿自己的 Jedis 实例,当连接数量增多时,物理连接成本就较高了。

Lettuce 是基于 netty 的,连接实例可以在多个线程间共享,所以,一个多线程的应用可以使用一个连接实例,而不用担心并发线程的数量,不会有并发问题。

异步

异步的方式可以让我们更好的利用系统资源,而不用浪费线程等待网络或磁盘I/O。

Lettuce 是基于 netty 的,netty 是一个多线程、事件驱动的 I/O 框架,所以 Lettuce 可以帮助我们充分利用异步的优势。

代码示例

连接

img

使用阻塞的方式读取

img

设置阻塞读取时的超时时间

img

异步方式,当 RedisFuture是完成状态时自动触发后面的动作

img

很好的支持 Redis Cluster

对 Cluster 的支持包括:

  • 支持所有的 Cluster 命令
  • 基于哈希槽的命令路由
  • 对 cluster 命令的高层抽象
  • 在多节点上执行命令
  • 根据槽和地址端口直接连接cluster中的节点
  • SSL和认证
  • cluster 拓扑的更新
  • 发布/订阅

Streaming API

Redis 中可能会有海量的数据,当你获取一个大的数据集合时,有可能会被撑爆,Lettuce 可以让我们使用流的方式来处理。

示例1

img

示例2

img

序列化类

spring-data-redis中序列化类有以下几个:

  • GenericToStringSerializer:可以将任何对象泛化为字符创并序列化
  • Jackson2JsonRedisSerializer:序列化Object对象为json字符创(与JacksonJsonRedisSerializer相同)
  • JdkSerializationRedisSerializer:序列化java对象
  • StringRedisSerializer:简单的字符串序列化

只需要引入单个依赖就可以开始愉快地使用Lettuce

  • Maven
1
2
3
4
5
<dependency>
<groupId>io.lettuce</groupId>
<artifactId>lettuce-core</artifactId>
<version>5.1.8.RELEASE</version>
</dependency>
  • Gradle
1
2
3
dependencies {
compile 'io.lettuce:lettuce-core:5.1.8.RELEASE'
}

连接Redis

单机、哨兵、集群模式下连接Redis需要一个统一的标准去表示连接的细节信息,在Lettuce中这个统一的标准是RedisURI。可以通过三种方式构造一个RedisURI实例:

  • 定制的字符串URI语法:
1
RedisURI uri = RedisURI.create("redis://localhost/");
  • 使用建造器(RedisURI.Builder):
1
RedisURI uri = RedisURI.builder().withHost("localhost").withPort(6379).build();
  • 直接通过构造函数实例化:
1
RedisURI uri = new RedisURI("localhost", 6379, 60, TimeUnit.SECONDS);

定制的连接URI语法

  • 单机(前缀为redis://
1
2
3
格式:redis://[[email protected]]host[:port][/databaseNumber][?[timeout=timeout[d|h|m|s|ms|us|ns]]
完整:redis://[email protected]:6379/0?timeout=10s
简单:redis://localhost
  • 单机并且使用SSL(前缀为rediss://) <== 注意后面多了个s
1
2
3
格式:rediss://[[email protected]]host[:port][/databaseNumber][?[timeout=timeout[d|h|m|s|ms|us|ns]]
完整:rediss://[email protected]:6379/0?timeout=10s
简单:rediss://localhost
  • 单机Unix Domain Sockets模式(前缀为redis-socket://
1
2
格式:redis-socket://path[?[timeout=timeout[d|h|m|s|ms|us|ns]][&_database=database_]]
完整:redis-socket:///tmp/redis?timeout=10s&_database=0
  • 哨兵(前缀为redis-sentinel://
1
2
格式:redis-sentinel://[[email protected]]host[:port][,host2[:port2]][/databaseNumber][?[timeout=timeout[d|h|m|s|ms|us|ns]]#sentinelMasterId
完整:redis-sentinel://[email protected]:6379,127.0.0.1:6380/0?timeout=10s#mymaster

超时时间单位:

  • d 天
  • h 小时
  • m 分钟
  • s 秒钟
  • ms 毫秒
  • us 微秒
  • ns 纳秒

个人建议使用RedisURI提供的建造器,毕竟定制的URI虽然简洁,但是比较容易出现人为错误。鉴于笔者没有SSLUnix Domain Socket的使用场景,下面不对这两种连接方式进行列举。

基本使用

Lettuce使用的时候依赖于四个主要组件:

  • RedisURI:连接信息。
  • RedisClientRedis客户端,特殊地,集群连接有一个定制的RedisClusterClient
  • ConnectionRedis连接,主要是StatefulConnection或者StatefulRedisConnection的子类,连接的类型主要由连接的具体方式(单机、哨兵、集群、订阅发布等等)选定,比较重要。
  • RedisCommandsRedis命令API接口,基本上覆盖了Redis发行版本的所有命令,提供了同步(sync)、异步(async)、反应式(reative)的调用方式,对于使用者而言,会经常跟RedisCommands系列接口打交道。

一个基本使用例子如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
@Test
public void testSetGet() throws Exception {
RedisURI redisUri = RedisURI.builder() // <1> 创建单机连接的连接信息
.withHost("localhost")
.withPort(6379)
.withTimeout(Duration.of(10, ChronoUnit.SECONDS))
.build();
RedisClient redisClient = RedisClient.create(redisUri); // <2> 创建客户端
StatefulRedisConnection<String, String> connection = redisClient.connect(); // <3> 创建线程安全的连接
RedisCommands<String, String> redisCommands = connection.sync(); // <4> 创建同步命令
SetArgs setArgs = SetArgs.Builder.nx().ex(5);
String result = redisCommands.set("name", "throwable", setArgs);
Assertions.assertThat(result).isEqualToIgnoringCase("OK");
result = redisCommands.get("name");
Assertions.assertThat(result).isEqualTo("throwable");
// ... 其他操作
connection.close(); // <5> 关闭连接
redisClient.shutdown(); // <6> 关闭客户端
}

注意:

  • <5>:关闭连接一般在应用程序停止之前操作,一个应用程序中的一个Redis驱动实例不需要太多的连接(一般情况下只需要一个连接实例就可以,如果有多个连接的需要可以考虑使用连接池,其实Redis目前处理命令的模块是单线程,在客户端多个连接多线程调用理论上没有效果)。
  • <6>:关闭客户端一般应用程序停止之前操作,如果条件允许的话,基于后开先闭原则,客户端关闭应该在连接关闭之后操作。

API

Lettuce主要提供三种API

  • 同步(sync):RedisCommands
  • 异步(async):RedisAsyncCommands
  • 反应式(reactive):RedisReactiveCommands

先准备好一个单机Redis连接备用:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
private static StatefulRedisConnection<String, String> CONNECTION;
private static RedisClient CLIENT;

@BeforeClass
public static void beforeClass() {
RedisURI redisUri = RedisURI.builder()
.withHost("localhost")
.withPort(6379)
.withTimeout(Duration.of(10, ChronoUnit.SECONDS))
.build();
CLIENT = RedisClient.create(redisUri);
CONNECTION = CLIENT.connect();
}

@AfterClass
public static void afterClass() throws Exception {
CONNECTION.close();
CLIENT.shutdown();
}

Redis命令API的具体实现可以直接从StatefulRedisConnection实例获取,见其接口定义:

1
2
3
4
5
6
7
8
9
10
public interface StatefulRedisConnection<K, V> extends StatefulConnection<K, V> {

boolean isMulti();

RedisCommands<K, V> sync();

RedisAsyncCommands<K, V> async();

RedisReactiveCommands<K, V> reactive();
}

值得注意的是,在不指定编码解码器RedisCodec的前提下,RedisClient创建的StatefulRedisConnection实例一般是泛型实例StatefulRedisConnection,也就是所有命令APIKEYVALUE都是String类型,这种使用方式能满足大部分的使用场景。当然,必要的时候可以定制编码解码器RedisCodec

同步API

先构建RedisCommands实例:

1
2
3
4
5
6
private static RedisCommands<String, String> COMMAND;

@BeforeClass
public static void beforeClass() {
COMMAND = CONNECTION.sync();
}

基本使用:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
@Test
public void testSyncPing() throws Exception {
String pong = COMMAND.ping();
Assertions.assertThat(pong).isEqualToIgnoringCase("PONG");
}


@Test
public void testSyncSetAndGet() throws Exception {
SetArgs setArgs = SetArgs.Builder.nx().ex(5);
COMMAND.set("name", "throwable", setArgs);
String value = COMMAND.get("name");
log.info("Get value: {}", value);
}

// Get value: throwable

同步API在所有命令调用之后会立即返回结果。如果熟悉Jedis的话,RedisCommands的用法其实和它相差不大。

异步API

先构建RedisAsyncCommands实例:

1
2
3
4
5
6
private static RedisAsyncCommands<String, String> ASYNC_COMMAND;

@BeforeClass
public static void beforeClass() {
ASYNC_COMMAND = CONNECTION.async();
}

基本使用:

1
2
3
4
5
6
@Test
public void testAsyncPing() throws Exception {
RedisFuture<String> redisFuture = ASYNC_COMMAND.ping();
log.info("Ping result:{}", redisFuture.get());
}
// Ping result:PONG

RedisAsyncCommands所有方法执行返回结果都是RedisFuture实例,而RedisFuture接口的定义如下:

1
2
3
4
5
6
public interface RedisFuture<V> extends CompletionStage<V>, Future<V> {

String getError();

boolean await(long timeout, TimeUnit unit) throws InterruptedException;
}

也就是,RedisFuture可以无缝使用Future或者JDK1.8中引入的CompletableFuture提供的方法。举个例子:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
@Test
public void testAsyncSetAndGet1() throws Exception {
SetArgs setArgs = SetArgs.Builder.nx().ex(5);
RedisFuture<String> future = ASYNC_COMMAND.set("name", "throwable", setArgs);
// CompletableFuture#thenAccept()
future.thenAccept(value -> log.info("Set命令返回:{}", value));
// Future#get()
future.get();
}
// Set命令返回:OK

@Test
public void testAsyncSetAndGet2() throws Exception {
SetArgs setArgs = SetArgs.Builder.nx().ex(5);
CompletableFuture<Void> result =
(CompletableFuture<Void>) ASYNC_COMMAND.set("name", "throwable", setArgs)
.thenAcceptBoth(ASYNC_COMMAND.get("name"),
(s, g) -> {
log.info("Set命令返回:{}", s);
log.info("Get命令返回:{}", g);
});
result.get();
}
// Set命令返回:OK
// Get命令返回:throwable

如果能熟练使用CompletableFuture和函数式编程技巧,可以组合多个RedisFuture完成一些列复杂的操作。

反应式API

Lettuce引入的反应式编程框架是Project Reactor,如果没有反应式编程经验可以先自行了解一下Project Reactor

构建RedisReactiveCommands实例:

1
2
3
4
5
6
private static RedisReactiveCommands<String, String> REACTIVE_COMMAND;

@BeforeClass
public static void beforeClass() {
REACTIVE_COMMAND = CONNECTION.reactive();
}

根据Project ReactorRedisReactiveCommands的方法如果返回的结果只包含0或1个元素,那么返回值类型是Mono,如果返回的结果包含0到N(N大于0)个元素,那么返回值是Flux。举个例子:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
@Test
public void testReactivePing() throws Exception {
Mono<String> ping = REACTIVE_COMMAND.ping();
ping.subscribe(v -> log.info("Ping result:{}", v));
Thread.sleep(1000);
}
// Ping result:PONG

@Test
public void testReactiveSetAndGet() throws Exception {
SetArgs setArgs = SetArgs.Builder.nx().ex(5);
REACTIVE_COMMAND.set("name", "throwable", setArgs).block();
REACTIVE_COMMAND.get("name").subscribe(value -> log.info("Get命令返回:{}", value));
Thread.sleep(1000);
}
// Get命令返回:throwable

@Test
public void testReactiveSet() throws Exception {
REACTIVE_COMMAND.sadd("food", "bread", "meat", "fish").block();
Flux<String> flux = REACTIVE_COMMAND.smembers("food");
flux.subscribe(log::info);
REACTIVE_COMMAND.srem("food", "bread", "meat", "fish").block();
Thread.sleep(1000);
}
// meat
// bread
// fish

举个更加复杂的例子,包含了事务、函数转换等:

1
2
3
4
5
6
7
8
9
10
11
12
13
@Test
public void testReactiveFunctional() throws Exception {
REACTIVE_COMMAND.multi().doOnSuccess(r -> {
REACTIVE_COMMAND.set("counter", "1").doOnNext(log::info).subscribe();
REACTIVE_COMMAND.incr("counter").doOnNext(c -> log.info(String.valueOf(c))).subscribe();
}).flatMap(s -> REACTIVE_COMMAND.exec())
.doOnNext(transactionResult -> log.info("Discarded:{}", transactionResult.wasDiscarded()))
.subscribe();
Thread.sleep(1000);
}
// OK
// 2
// Discarded:false

这个方法开启一个事务,先把counter设置为1,再将counter自增1。

发布和订阅

非集群模式下的发布订阅依赖于定制的连接StatefulRedisPubSubConnection,集群模式下的发布订阅依赖于定制的连接StatefulRedisClusterPubSubConnection,两者分别来源于RedisClient#connectPubSub()系列方法和RedisClusterClient#connectPubSub()

  • 非集群模式:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
// 可能是单机、普通主从、哨兵等非集群模式的客户端
RedisClient client = ...
StatefulRedisPubSubConnection<String, String> connection = client.connectPubSub();
connection.addListener(new RedisPubSubListener<String, String>() { ... });

// 同步命令
RedisPubSubCommands<String, String> sync = connection.sync();
sync.subscribe("channel");

// 异步命令
RedisPubSubAsyncCommands<String, String> async = connection.async();
RedisFuture<Void> future = async.subscribe("channel");

// 反应式命令
RedisPubSubReactiveCommands<String, String> reactive = connection.reactive();
reactive.subscribe("channel").subscribe();

reactive.observeChannels().doOnNext(patternMessage -> {...}).subscribe()
  • 集群模式:
1
2
3
4
5
6
7
// 使用方式其实和非集群模式基本一致
RedisClusterClient clusterClient = ...
StatefulRedisClusterPubSubConnection<String, String> connection = clusterClient.connectPubSub();
connection.addListener(new RedisPubSubListener<String, String>() { ... });
RedisPubSubCommands<String, String> sync = connection.sync();
sync.subscribe("channel");
// ...

这里用单机同步命令的模式举一个Redis键空间通知(Redis Keyspace Notifications)的例子:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
@Test
public void testSyncKeyspaceNotification() throws Exception {
RedisURI redisUri = RedisURI.builder()
.withHost("localhost")
.withPort(6379)
// 注意这里只能是0号库
.withDatabase(0)
.withTimeout(Duration.of(10, ChronoUnit.SECONDS))
.build();
RedisClient redisClient = RedisClient.create(redisUri);
StatefulRedisConnection<String, String> redisConnection = redisClient.connect();
RedisCommands<String, String> redisCommands = redisConnection.sync();
// 只接收键过期的事件
redisCommands.configSet("notify-keyspace-events", "Ex");
StatefulRedisPubSubConnection<String, String> connection = redisClient.connectPubSub();
connection.addListener(new RedisPubSubAdapter<>() {

@Override
public void psubscribed(String pattern, long count) {
log.info("pattern:{},count:{}", pattern, count);
}

@Override
public void message(String pattern, String channel, String message) {
log.info("pattern:{},channel:{},message:{}", pattern, channel, message);
}
});
RedisPubSubCommands<String, String> commands = connection.sync();
commands.psubscribe("[email protected]__:expired");
redisCommands.setex("name", 2, "throwable");
Thread.sleep(10000);
redisConnection.close();
connection.close();
redisClient.shutdown();
}
// pattern:[email protected]__:expired,count:1
// pattern:[email protected]__:expired,channel:[email protected]__:expired,message:name

实际上,在实现RedisPubSubListener的时候可以单独抽离,尽量不要设计成匿名内部类的形式。

事务和批量命令执行

事务相关的命令就是WATCHUNWATCHEXECMULTIDISCARD,在RedisCommands系列接口中有对应的方法。举个例子:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
// 同步模式
@Test
public void testSyncMulti() throws Exception {
COMMAND.multi();
COMMAND.setex("name-1", 2, "throwable");
COMMAND.setex("name-2", 2, "doge");
TransactionResult result = COMMAND.exec();
int index = 0;
for (Object r : result) {
log.info("Result-{}:{}", index, r);
index++;
}
}
// Result-0:OK
// Result-1:OK

RedisPipeline也就是管道机制可以理解为把多个命令打包在一次请求发送到Redis服务端,然后Redis服务端把所有的响应结果打包好一次性返回,从而节省不必要的网络资源(最主要是减少网络请求次数)。Redis对于Pipeline机制如何实现并没有明确的规定,也没有提供特殊的命令支持Pipeline机制。Jedis中底层采用BIO(阻塞IO)通讯,所以它的做法是客户端缓存将要发送的命令,最后需要触发然后同步发送一个巨大的命令列表包,再接收和解析一个巨大的响应列表包。PipelineLettuce中对使用者是透明的,由于底层的通讯框架是Netty,所以网络通讯层面的优化Lettuce不需要过多干预,换言之可以这样理解:NettyLettuce从底层实现了RedisPipeline机制。但是,Lettuce的异步API也提供了手动Flush的方法:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
@Test
public void testAsyncManualFlush() {
// 取消自动flush
ASYNC_COMMAND.setAutoFlushCommands(false);
List<RedisFuture<?>> redisFutures = Lists.newArrayList();
int count = 5000;
for (int i = 0; i < count; i++) {
String key = "key-" + (i + 1);
String value = "value-" + (i + 1);
redisFutures.add(ASYNC_COMMAND.set(key, value));
redisFutures.add(ASYNC_COMMAND.expire(key, 2));
}
long start = System.currentTimeMillis();
ASYNC_COMMAND.flushCommands();
boolean result = LettuceFutures.awaitAll(10, TimeUnit.SECONDS, redisFutures.toArray(new RedisFuture[0]));
Assertions.assertThat(result).isTrue();
log.info("Lettuce cost:{} ms", System.currentTimeMillis() - start);
}
// Lettuce cost:1302 ms

上面只是从文档看到的一些理论术语,但是现实是骨感的,对比了下JedisPipeline提供的方法,发现了JedisPipeline执行耗时比较低:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
@Test
public void testJedisPipeline() throws Exception {
Jedis jedis = new Jedis();
Pipeline pipeline = jedis.pipelined();
int count = 5000;
for (int i = 0; i < count; i++) {
String key = "key-" + (i + 1);
String value = "value-" + (i + 1);
pipeline.set(key, value);
pipeline.expire(key, 2);
}
long start = System.currentTimeMillis();
pipeline.syncAndReturnAll();
log.info("Jedis cost:{} ms", System.currentTimeMillis() - start);
}
// Jedis cost:9 ms

个人猜测Lettuce可能底层并非合并所有命令一次发送(甚至可能是单条发送),具体可能需要抓包才能定位。依此来看,如果真的有大量执行Redis命令的场景,不妨可以使用JedisPipeline

注意:由上面的测试推断RedisTemplateexecutePipelined()方法是假的Pipeline执行方法,使用RedisTemplate的时候请务必注意这一点。

Lua脚本执行

Lettuce中执行RedisLua命令的同步接口如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
public interface RedisScriptingCommands<K, V> {

<T> T eval(String var1, ScriptOutputType var2, K... var3);

<T> T eval(String var1, ScriptOutputType var2, K[] var3, V... var4);

<T> T evalsha(String var1, ScriptOutputType var2, K... var3);

<T> T evalsha(String var1, ScriptOutputType var2, K[] var3, V... var4);

List<Boolean> scriptExists(String... var1);

String scriptFlush();

String scriptKill();

String scriptLoad(V var1);

String digest(V var1);
}

异步和反应式的接口方法定义差不多,不同的地方就是返回值类型,一般我们常用的是eval()evalsha()scriptLoad()方法。举个简单的例子:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
private static RedisCommands<String, String> COMMANDS;
private static String RAW_LUA = "local key = KEYS[1]\n" +
"local value = ARGV[1]\n" +
"local timeout = ARGV[2]\n" +
"redis.call('SETEX', key, tonumber(timeout), value)\n" +
"local result = redis.call('GET', key)\n" +
"return result;";
private static AtomicReference<String> LUA_SHA = new AtomicReference<>();

@Test
public void testLua() throws Exception {
LUA_SHA.compareAndSet(null, COMMANDS.scriptLoad(RAW_LUA));
String[] keys = new String[]{"name"};
String[] args = new String[]{"throwable", "5000"};
String result = COMMANDS.evalsha(LUA_SHA.get(), ScriptOutputType.VALUE, keys, args);
log.info("Get value:{}", result);
}
// Get value:throwable

高可用和分片

为了Redis的高可用,一般会采用普通主从(Master/Replica,这里笔者称为普通主从模式,也就是仅仅做了主从复制,故障需要手动切换)、哨兵和集群。普通主从模式可以独立运行,也可以配合哨兵运行,只是哨兵提供自动故障转移和主节点提升功能。普通主从和哨兵都可以使用MasterSlave,通过入参包括RedisClient、编码解码器以及一个或者多个RedisURI获取对应的Connection实例。

这里注意一点MasterSlave中提供的方法如果只要求传入一个RedisURI实例,那么Lettuce会进行拓扑发现机制,自动获取Redis主从节点信息;如果要求传入一个RedisURI集合,那么对于普通主从模式来说所有节点信息是静态的,不会进行发现和更新。

拓扑发现的规则如下:

  • 对于普通主从(Master/Replica)模式,不需要感知RedisURI指向从节点还是主节点,只会进行一次性的拓扑查找所有节点信息,此后节点信息会保存在静态缓存中,不会更新。
  • 对于哨兵模式,会订阅所有哨兵实例并侦听订阅/发布消息以触发拓扑刷新机制,更新缓存的节点信息,也就是哨兵天然就是动态发现节点信息,不支持静态配置。

拓扑发现机制的提供APITopologyProvider,需要了解其原理的可以参考具体的实现。

对于集群(Cluster)模式,Lettuce提供了一套独立的API

另外,如果Lettuce连接面向的是非单个Redis节点,连接实例提供了数据读取节点偏好ReadFrom)设置,可选值有:

  • MASTER:只从Master节点中读取。
  • MASTER_PREFERRED:优先从Master节点中读取。
  • SLAVE_PREFERRED:优先从Slavor节点中读取。
  • SLAVE:只从Slavor节点中读取。
  • NEAREST:使用最近一次连接的Redis实例读取。

普通主从模式

假设现在有三个Redis服务形成树状主从关系如下:

  • 节点一:localhost:6379,角色为Master。
  • 节点二:localhost:6380,角色为Slavor,节点一的从节点。
  • 节点三:localhost:6381,角色为Slavor,节点二的从节点。

首次动态节点发现主从模式的节点信息需要如下构建连接:

1
2
3
4
5
6
7
8
9
10
11
12
@Test
public void testDynamicReplica() throws Exception {
// 这里只需要配置一个节点的连接信息,不一定需要是主节点的信息,从节点也可以
RedisURI uri = RedisURI.builder().withHost("localhost").withPort(6379).build();
RedisClient redisClient = RedisClient.create(uri);
StatefulRedisMasterSlaveConnection<String, String> connection = MasterSlave.connect(redisClient, new Utf8StringCodec(), uri);
// 只从从节点读取数据
connection.setReadFrom(ReadFrom.SLAVE);
// 执行其他Redis命令
connection.close();
redisClient.shutdown();
}

如果需要指定静态的Redis主从节点连接属性,那么可以这样构建连接:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
@Test
public void testStaticReplica() throws Exception {
List<RedisURI> uris = new ArrayList<>();
RedisURI uri1 = RedisURI.builder().withHost("localhost").withPort(6379).build();
RedisURI uri2 = RedisURI.builder().withHost("localhost").withPort(6380).build();
RedisURI uri3 = RedisURI.builder().withHost("localhost").withPort(6381).build();
uris.add(uri1);
uris.add(uri2);
uris.add(uri3);
RedisClient redisClient = RedisClient.create();
StatefulRedisMasterSlaveConnection<String, String> connection = MasterSlave.connect(redisClient,
new Utf8StringCodec(), uris);
// 只从主节点读取数据
connection.setReadFrom(ReadFrom.MASTER);
// 执行其他Redis命令
connection.close();
redisClient.shutdown();
}

哨兵模式

由于Lettuce自身提供了哨兵的拓扑发现机制,所以只需要随便配置一个哨兵节点的RedisURI实例即可:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
@Test
public void testDynamicSentinel() throws Exception {
RedisURI redisUri = RedisURI.builder()
.withPassword("你的密码")
.withSentinel("localhost", 26379)
.withSentinelMasterId("哨兵Master的ID")
.build();
RedisClient redisClient = RedisClient.create();
StatefulRedisMasterSlaveConnection<String, String> connection = MasterSlave.connect(redisClient, new Utf8StringCodec(), redisUri);
// 只允许从从节点读取数据
connection.setReadFrom(ReadFrom.SLAVE);
RedisCommands<String, String> command = connection.sync();
SetArgs setArgs = SetArgs.Builder.nx().ex(5);
command.set("name", "throwable", setArgs);
String value = command.get("name");
log.info("Get value:{}", value);
}
// Get value:throwable

集群模式

鉴于笔者对Redis集群模式并不熟悉,Cluster模式下的API使用本身就有比较多的限制,所以这里只简单介绍一下怎么用。先说几个特性:

下面的API提供跨槽位(Slot)调用的功能

  • RedisAdvancedClusterCommands
  • RedisAdvancedClusterAsyncCommands
  • RedisAdvancedClusterReactiveCommands

静态节点选择功能:

  • masters:选择所有主节点执行命令。
  • slaves:选择所有从节点执行命令,其实就是只读模式。
  • all nodes:命令可以在所有节点执行。

集群拓扑视图动态更新功能:

  • 手动更新,主动调用RedisClusterClient#reloadPartitions()
  • 后台定时更新。
  • 自适应更新,基于连接断开和MOVED/ASK命令重定向自动更新。

Redis集群搭建详细过程可以参考官方文档,假设已经搭建好集群如下(192.168.56.200是笔者的虚拟机Host):

  • 192.168.56.200:7001 => 主节点,槽位0-5460。
  • 192.168.56.200:7002 => 主节点,槽位5461-10922。
  • 192.168.56.200:7003 => 主节点,槽位10923-16383。
  • 192.168.56.200:7004 => 7001的从节点。
  • 192.168.56.200:7005 => 7002的从节点。
  • 192.168.56.200:7006 => 7003的从节点。

简单的集群连接和使用方式如下:

1
2
3
4
5
6
7
8
9
10
11
@Test
public void testSyncCluster(){
RedisURI uri = RedisURI.builder().withHost("192.168.56.200").build();
RedisClusterClient redisClusterClient = RedisClusterClient.create(uri);
StatefulRedisClusterConnection<String, String> connection = redisClusterClient.connect();
RedisAdvancedClusterCommands<String, String> commands = connection.sync();
commands.setex("name",10, "throwable");
String value = commands.get("name");
log.info("Get value:{}", value);
}
// Get value:throwable

节点选择:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
@Test
public void testSyncNodeSelection() {
RedisURI uri = RedisURI.builder().withHost("192.168.56.200").withPort(7001).build();
RedisClusterClient redisClusterClient = RedisClusterClient.create(uri);
StatefulRedisClusterConnection<String, String> connection = redisClusterClient.connect();
RedisAdvancedClusterCommands<String, String> commands = connection.sync();
// commands.all(); // 所有节点
// commands.masters(); // 主节点
// 从节点只读
NodeSelection<String, String> replicas = commands.slaves();
NodeSelectionCommands<String, String> nodeSelectionCommands = replicas.commands();
// 这里只是演示,一般应该禁用keys *命令
Executions<List<String>> keys = nodeSelectionCommands.keys("*");
keys.forEach(key -> log.info("key: {}", key));
connection.close();
redisClusterClient.shutdown();
}

定时更新集群拓扑视图(每隔十分钟更新一次,这个时间自行考量,不能太频繁):

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
@Test
public void testPeriodicClusterTopology() throws Exception {
RedisURI uri = RedisURI.builder().withHost("192.168.56.200").withPort(7001).build();
RedisClusterClient redisClusterClient = RedisClusterClient.create(uri);
ClusterTopologyRefreshOptions options = ClusterTopologyRefreshOptions
.builder()
.enablePeriodicRefresh(Duration.of(10, ChronoUnit.MINUTES))
.build();
redisClusterClient.setOptions(ClusterClientOptions.builder().topologyRefreshOptions(options).build());
StatefulRedisClusterConnection<String, String> connection = redisClusterClient.connect();
RedisAdvancedClusterCommands<String, String> commands = connection.sync();
commands.setex("name", 10, "throwable");
String value = commands.get("name");
log.info("Get value:{}", value);
Thread.sleep(Integer.MAX_VALUE);
connection.close();
redisClusterClient.shutdown();
}

自适应更新集群拓扑视图:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
@Test
public void testAdaptiveClusterTopology() throws Exception {
RedisURI uri = RedisURI.builder().withHost("192.168.56.200").withPort(7001).build();
RedisClusterClient redisClusterClient = RedisClusterClient.create(uri);
ClusterTopologyRefreshOptions options = ClusterTopologyRefreshOptions.builder()
.enableAdaptiveRefreshTrigger(
ClusterTopologyRefreshOptions.RefreshTrigger.MOVED_REDIRECT,
ClusterTopologyRefreshOptions.RefreshTrigger.PERSISTENT_RECONNECTS
)
.adaptiveRefreshTriggersTimeout(Duration.of(30, ChronoUnit.SECONDS))
.build();
redisClusterClient.setOptions(ClusterClientOptions.builder().topologyRefreshOptions(options).build());
StatefulRedisClusterConnection<String, String> connection = redisClusterClient.connect();
RedisAdvancedClusterCommands<String, String> commands = connection.sync();
commands.setex("name", 10, "throwable");
String value = commands.get("name");
log.info("Get value:{}", value);
Thread.sleep(Integer.MAX_VALUE);
connection.close();
redisClusterClient.shutdown();
}

动态命令和自定义命令

自定义命令是Redis命令有限集,不过可以更细粒度指定KEYARGV、命令类型、编码解码器和返回值类型,依赖于dispatch()方法:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
// 自定义实现PING方法
@Test
public void testCustomPing() throws Exception {
RedisURI redisUri = RedisURI.builder()
.withHost("localhost")
.withPort(6379)
.withTimeout(Duration.of(10, ChronoUnit.SECONDS))
.build();
RedisClient redisClient = RedisClient.create(redisUri);
StatefulRedisConnection<String, String> connect = redisClient.connect();
RedisCommands<String, String> sync = connect.sync();
RedisCodec<String, String> codec = StringCodec.UTF8;
String result = sync.dispatch(CommandType.PING, new StatusOutput<>(codec));
log.info("PING:{}", result);
connect.close();
redisClient.shutdown();
}
// PING:PONG

// 自定义实现Set方法
@Test
public void testCustomSet() throws Exception {
RedisURI redisUri = RedisURI.builder()
.withHost("localhost")
.withPort(6379)
.withTimeout(Duration.of(10, ChronoUnit.SECONDS))
.build();
RedisClient redisClient = RedisClient.create(redisUri);
StatefulRedisConnection<String, String> connect = redisClient.connect();
RedisCommands<String, String> sync = connect.sync();
RedisCodec<String, String> codec = StringCodec.UTF8;
sync.dispatch(CommandType.SETEX, new StatusOutput<>(codec),
new CommandArgs<>(codec).addKey("name").add(5).addValue("throwable"));
String result = sync.get("name");
log.info("Get value:{}", result);
connect.close();
redisClient.shutdown();
}
// Get value:throwable

动态命令是基于Redis命令有限集,并且通过注解和动态代理完成一些复杂命令组合的实现。主要注解在io.lettuce.core.dynamic.annotation包路径下。简单举个例子:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
public interface CustomCommand extends Commands {

// SET [key] [value]
@Command("SET ?0 ?1")
String setKey(String key, String value);

// SET [key] [value]
@Command("SET :key :value")
String setKeyNamed(@Param("key") String key, @Param("value") String value);

// MGET [key1] [key2]
@Command("MGET ?0 ?1")
List<String> mGet(String key1, String key2);
/**
* 方法名作为命令
*/
@CommandNaming(strategy = CommandNaming.Strategy.METHOD_NAME)
String mSet(String key1, String value1, String key2, String value2);
}


@Test
public void testCustomDynamicSet() throws Exception {
RedisURI redisUri = RedisURI.builder()
.withHost("localhost")
.withPort(6379)
.withTimeout(Duration.of(10, ChronoUnit.SECONDS))
.build();
RedisClient redisClient = RedisClient.create(redisUri);
StatefulRedisConnection<String, String> connect = redisClient.connect();
RedisCommandFactory commandFactory = new RedisCommandFactory(connect);
CustomCommand commands = commandFactory.getCommands(CustomCommand.class);
commands.setKey("name", "throwable");
commands.setKeyNamed("throwable", "doge");
log.info("MGET ===> " + commands.mGet("name", "throwable"));
commands.mSet("key1", "value1","key2", "value2");
log.info("MGET ===> " + commands.mGet("key1", "key2"));
connect.close();
redisClient.shutdown();
}
// MGET ===> [throwable, doge]
// MGET ===> [value1, value2]

高阶特性

Lettuce有很多高阶使用特性,这里只列举个人认为常用的两点:

  • 配置客户端资源。
  • 使用连接池。

更多其他特性可以自行参看官方文档。

配置客户端资源

客户端资源的设置与Lettuce的性能、并发和事件处理相关。线程池或者线程组相关配置占据客户端资源配置的大部分(EventLoopGroupsEventExecutorGroup),这些线程池或者线程组是连接程序的基础组件。一般情况下,客户端资源应该在多个Redis客户端之间共享,并且在不再使用的时候需要自行关闭。笔者认为,客户端资源是面向Netty的。注意:除非特别熟悉或者花长时间去测试调整下面提到的参数,否则在没有经验的前提下凭直觉修改默认值,有可能会踩坑。

客户端资源接口是ClientResources,实现类是DefaultClientResources

构建DefaultClientResources实例:

1
2
3
4
5
6
7
8
// 默认
ClientResources resources = DefaultClientResources.create();

// 建造器
ClientResources resources = DefaultClientResources.builder()
.ioThreadPoolSize(4)
.computationThreadPoolSize(4)
.build()

使用:

1
2
3
4
5
6
7
8
9
10
ClientResources resources = DefaultClientResources.create();
// 非集群
RedisClient client = RedisClient.create(resources, uri);
// 集群
RedisClusterClient clusterClient = RedisClusterClient.create(resources, uris);
// ......
client.shutdown();
clusterClient.shutdown();
// 关闭资源
resources.shutdown();

客户端资源基本配置:

属性 描述 默认值
ioThreadPoolSize I/O线程数 Runtime.getRuntime().availableProcessors()
computationThreadPoolSize 任务线程数 Runtime.getRuntime().availableProcessors()

客户端资源高级配置:

属性 描述 默认值
eventLoopGroupProvider EventLoopGroup提供商 -
eventExecutorGroupProvider EventExecutorGroup提供商 -
eventBus 事件总线 DefaultEventBus
commandLatencyCollectorOptions 命令延时收集器配置 DefaultCommandLatencyCollectorOptions
commandLatencyCollector 命令延时收集器 DefaultCommandLatencyCollector
commandLatencyPublisherOptions 命令延时发布器配置 DefaultEventPublisherOptions
dnsResolver DNS处理器 JDK或者Netty提供
reconnectDelay 重连延时配置 Delay.exponential()
nettyCustomizer Netty自定义配置器 -
tracing 轨迹记录器 -

非集群客户端RedisClient的属性配置:

Redis非集群客户端RedisClient本身提供了配置属性方法:

1
2
3
4
5
RedisClient client = RedisClient.create(uri);
client.setOptions(ClientOptions.builder()
.autoReconnect(false)
.pingBeforeActivateConnection(true)
.build());

非集群客户端的配置属性列表:

属性 描述 默认值
pingBeforeActivateConnection 连接激活之前是否执行PING命令 false
autoReconnect 是否自动重连 true
cancelCommandsOnReconnectFailure 重连失败是否拒绝命令执行 false
suspendReconnectOnProtocolFailure 底层协议失败是否挂起重连操作 false
requestQueueSize 请求队列容量 2147483647(Integer#MAX_VALUE)
disconnectedBehavior 失去连接时候的行为 DEFAULT
sslOptions SSL配置 -
socketOptions Socket配置 10 seconds Connection-Timeout, no keep-alive, no TCP noDelay
timeoutOptions 超时配置 -
publishOnScheduler 发布反应式信号数据的调度器 使用I/O线程

集群客户端属性配置:

Redis集群客户端RedisClusterClient本身提供了配置属性方法:

1
2
3
4
5
6
7
8
9
RedisClusterClient client = RedisClusterClient.create(uri);
ClusterTopologyRefreshOptions topologyRefreshOptions = ClusterTopologyRefreshOptions.builder()
.enablePeriodicRefresh(refreshPeriod(10, TimeUnit.MINUTES))
.enableAllAdaptiveRefreshTriggers()
.build();

client.setOptions(ClusterClientOptions.builder()
.topologyRefreshOptions(topologyRefreshOptions)
.build());

集群客户端的配置属性列表:

属性 描述 默认值
enablePeriodicRefresh 是否允许周期性更新集群拓扑视图 false
refreshPeriod 更新集群拓扑视图周期 60秒
enableAdaptiveRefreshTrigger 设置自适应更新集群拓扑视图触发器RefreshTrigger -
adaptiveRefreshTriggersTimeout 自适应更新集群拓扑视图触发器超时设置 30秒
refreshTriggersReconnectAttempts 自适应更新集群拓扑视图触发重连次数 5
dynamicRefreshSources 是否允许动态刷新拓扑资源 true
closeStaleConnections 是否允许关闭陈旧的连接 true
maxRedirects 集群重定向次数上限 5
validateClusterNodeMembership 是否校验集群节点的成员关系 true

使用连接池

Lettuce 和 Jedis 的都是连接Redis Server的客户端程序。Jedis在实现上是直连redis server,多线程环境下非线程安全,除非使用连接池,为每个Jedis实例增加物理连接,当连接数量增多时,物理连接成本就较高了。Lettuce基于Netty的连接实例(StatefulRedisConnection),可以在多个线程间并发访问,且线程安全,满足多线程环境下的并发访问,同时它是可伸缩的设计,一个连接实例不够的情况也可以按需增加连接实例,所以 Lettuce 可以帮助我们充分利用异步的优势。

Lettuce 和 Jedis 的都是连接Redis Server的客户端程序。Jedis在实现上是直连redis server,多线程环境下非线程安全,除非使用连接池,为每个Jedis实例增加物理连接,当连接数量增多时,物理连接成本就较高了。Lettuce基于Netty的连接实例(StatefulRedisConnection),可以在多个线程间并发访问,且线程安全,满足多线程环境下的并发访问,同时它是可伸缩的设计,一个连接实例不够的情况也可以按需增加连接实例,所以 Lettuce 可以帮助我们充分利用异步的优势。

同步连接池

使用命令式编程,同步连接池是正确的选择,因为它在用于执行执行Redis命令的线程上执行所有操作.

前提条件
Lettuce需要依赖 Apache的 common-pool2(至少是2.2)提供连接池. 确认在你的classpath下包含这个依赖.否则你就不能使用连接池.
如果使用Maven,向你的pom.xml添加如下依赖

1
2
3
4
5
<dependency>
<groupId>org.apache.commons</groupId>
<artifactId>commons-pool2</artifactId>
<version>2.4.3</version>
</dependency>

基本使用如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
@Test
public void testUseConnectionPool() throws Exception {
RedisURI redisUri = RedisURI.builder()
.withHost("localhost")
.withPort(6379)
.withTimeout(Duration.of(10, ChronoUnit.SECONDS))
.build();
RedisClient redisClient = RedisClient.create(redisUri);
GenericObjectPoolConfig poolConfig = new GenericObjectPoolConfig();
GenericObjectPool<StatefulRedisConnection<String, String>> pool
= ConnectionPoolSupport.createGenericObjectPool(redisClient::connect, poolConfig);
try (StatefulRedisConnection<String, String> connection = pool.borrowObject()) {
RedisCommands<String, String> command = connection.sync();
SetArgs setArgs = SetArgs.Builder.nx().ex(5);
command.set("name", "throwable", setArgs);
String n = command.get("name");
log.info("Get value:{}", n);
}
pool.close();
redisClient.shutdown();
}

Lettuce提供通用连接池支持,它需要一个用于创建任何支持类型连接(单个,发布订阅,哨兵,主从,集群)的提供者. 其中,同步连接的池化支持需要用ConnectionPoolSupport,异步连接的池化支持需要用AsyncConnectionPoolSupportLettuce5.1之后才支持)。ConnectionPoolSupport 将根据你的需求创建一个 GenericObjectPool或SoftReferenceObjectPool. 连接池可以分配包装类型或直接连接

  • 包装实例在调用StatefulConnection.close()时,会将连接归还到连接池
  • 直接连接需要调用GenericObjectPool.returnObject(…)归还到连接池

基本用法:

包装连接

1
2
3
4
5
6
7
8
9
10
11
GenericObjectPoolConfig poolConfig = new GenericObjectPoolConfig();
poolConfig.setMaxIdle(2);

GenericObjectPool<StatefulRedisConnection<String, String>> pool = ConnectionPoolSupport.createGenericObjectPool(
() -> client.connect(), poolConfig);
for (int i = 0; i < 10; i++) {
StatefulRedisConnection<String, String> connection = pool.borrowObject();
RedisCommands<String, String> sync = connection.sync();
sync.ping();
connection.close();
}

直接连接

1
2
3
4
5
6
7
8
9
10
11
GenericObjectPoolConfig poolConfig = new GenericObjectPoolConfig();
poolConfig.setMaxIdle(2);

GenericObjectPool<StatefulRedisConnection<String, String>> pool = ConnectionPoolSupport.createGenericObjectPool(
() -> client.connect(), poolConfig);
for (int i = 0; i < 10; i++) {
StatefulRedisConnection<String, String> connection = pool.borrowObject();
RedisCommands<String, String> sync = connection.sync();
sync.ping();
connection.close();
}

小结

Lettuce 值得我们深入学习一下,官方文档

1
https://github.com/lettuce-io/lettuce-core/wiki
坚持技术分享,您的支持将鼓励我继续创作!