Lettuce是一个高性能基于Java编写的Redis驱动框架,底层集成了Project?Reactor提供天然的反应式编程,通信框架集成了Netty使用了非阻塞IO,5.x版本之后融合了JDK1.8的异步编程特性,在保证高性能的同时提供了十分丰富易用的API。本文主要介绍使用lettuce操作redis,使用到的软件版本:Java?1.8.0_191、Redis?5.0.8、lettuce?5.3.1.RELEASE。
<dependency>
<groupId>io.lettuce</groupId>
<artifactId>lettuce-core</artifactId>
<version>5.3.1.RELEASE</version>
</dependency>
<dependency>
<groupId>org.apache.commons</groupId>
<artifactId>commons-pool2</artifactId>
<version>2.8.0</version>
</dependency>
package com.shangjack.redis;
import io.lettuce.core.RedisClient;
import io.lettuce.core.RedisURI;
import io.lettuce.core.SetArgs;
import io.lettuce.core.api.StatefulRedisConnection;
import io.lettuce.core.api.async.RedisAsyncCommands;
import io.lettuce.core.api.reactive.RedisReactiveCommands;
import io.lettuce.core.api.sync.RedisCommands;
import org.junit.After;
import org.junit.Before;
import org.junit.Test;
import reactor.core.publisher.Mono;
import java.time.Duration;
import java.time.temporal.ChronoUnit;
import java.util.concurrent.Future;
/**
*?使用Lettuce操作redis
*/
public class LettuceBaseCase {
private static RedisClient client;
private StatefulRedisConnection<String, String> connection;
????@Before
public void before() {
RedisURI redisUri = RedisURI.builder()
.withHost("10.49.196.10").withPort(6379).withPassword("123456")
.withTimeout(Duration.of(10, ChronoUnit.SECONDS))
.build();
client = RedisClient.create(redisUri);
connection = client.connect();
}
????@After
public void after() {
connection.close();
client.shutdown();
}
/**
*?同步操作
* api和jedis很类型
*/
????@Test
public void sync() {
RedisCommands<String, String> commands = connection.sync();
String result = commands.set("name", "mayun");
System.out.println(result);
SetArgs args = SetArgs.Builder.nx().ex(10);
result = commands.set("age", "30", args);
System.out.println(result);
}
/**
*?异步操作
*/
????@Test
public void async() throws Exception {
RedisAsyncCommands<String, String> commands = connection.async();
Future<String> future = commands.set("name", "mayun");
System.out.println(future.get());
SetArgs args = SetArgs.Builder.nx().ex(10);
future = commands.set("age", "30", args);
System.out.println(future.get());
}
/**
*?响应式API
*/
????@Test
public void reactive() throws Exception {
RedisReactiveCommands<String, String> commands = connection.reactive();
Mono<String> result = commands.set("name", "mayun");
System.out.println(result.block());
SetArgs args = SetArgs.Builder.nx().ex(10);
result = commands.set("age", "30", args);
result.subscribe(value -> System.out.println(value));
//开启一个事务,先把counter设置为1,再将counter自增1
commands.multi().doOnSuccess(r -> {
commands.set("count", "1").doOnNext(System.out::println).subscribe();
commands.incr("count").doOnNext(c -> System.out.println(c)).subscribe();
}).flatMap(s -> commands.exec())
.doOnNext(transactionResult -> System.out.println(transactionResult.wasDiscarded())).subscribe();
Thread.sleep(1000 * 5);
}
}
package com.shangjack.redis;
import io.lettuce.core.ClientOptions;
import io.lettuce.core.ReadFrom;
import io.lettuce.core.RedisClient;
import io.lettuce.core.RedisURI;
import io.lettuce.core.api.StatefulRedisConnection;
import io.lettuce.core.api.sync.RedisCommands;
import io.lettuce.core.cluster.ClusterClientOptions;
import io.lettuce.core.cluster.RedisClusterClient;
import io.lettuce.core.cluster.api.StatefulRedisClusterConnection;
import io.lettuce.core.cluster.api.sync.Executions;
import io.lettuce.core.cluster.api.sync.NodeSelection;
import io.lettuce.core.cluster.api.sync.NodeSelectionCommands;
import io.lettuce.core.cluster.api.sync.RedisAdvancedClusterCommands;
import io.lettuce.core.codec.StringCodec;
import io.lettuce.core.masterreplica.MasterReplica;
import io.lettuce.core.masterreplica.StatefulRedisMasterReplicaConnection;
import io.lettuce.core.resource.ClientResources;
import io.lettuce.core.resource.DefaultClientResources;
import io.lettuce.core.support.ConnectionPoolSupport;
import org.apache.commons.pool2.impl.GenericObjectPool;
import org.apache.commons.pool2.impl.GenericObjectPoolConfig;
import org.junit.Test;
import java.time.Duration;
import java.time.temporal.ChronoUnit;
import java.util.*;
/**
*?高级操作
*/
public class LettuceAdvanceCase {
/**
*?主从模式操作
*/
????@Test
public void masterSlave() {
//这里只需要配置一个节点的连接信息,不一定需要是主节点的信息,从节点也可以;可以自动发现主从节点
RedisURI uri = RedisURI.builder().withHost("10.16.60.42").withPort(6379).withPassword("123456").build();
RedisClient client = RedisClient.create(uri);
StatefulRedisMasterReplicaConnection<String, String> connection = MasterReplica.connect(client, StringCodec.UTF8, uri);
//从节点读书数据
connection.setReadFrom(ReadFrom.REPLICA);
RedisCommands<String, String> commands = connection.sync();
commands.set("name", "刘备");
System.out.println(commands.get("name"));
connection.close();
client.shutdown();
}
/**
*?主从模式操作2
*/
????@Test
public void masterSlave2() {
List<RedisURI> uris = new ArrayList();
uris.add(RedisURI.builder().withHost("10.16.60.42").withPort(6379).withPassword("123456").build());
uris.add(RedisURI.builder().withHost("10.16.60.43").withPort(6379).withPassword("123456").build());
uris.add(RedisURI.builder().withHost("10.16.60.44").withPort(6379).withPassword("123456").build());
RedisClient client = RedisClient.create();
StatefulRedisMasterReplicaConnection<String, String> connection = MasterReplica.connect(client, StringCodec.UTF8, uris);
//从节点读书数据
connection.setReadFrom(ReadFrom.REPLICA);
RedisCommands<String, String> commands = connection.sync();
commands.set("name", "张飞");
System.out.println(commands.get("name"));
connection.close();
client.shutdown();
}
/**
*?哨兵模式操作
*/
????@Test
public void sentinel() {
List<RedisURI> uris = new ArrayList();
uris.add(RedisURI.builder().withSentinel("10.16.60.42", 26379).withSentinelMasterId("mymaster").withPassword("123456").build());
uris.add(RedisURI.builder().withSentinel("10.16.60.43", 26379).withSentinelMasterId("mymaster").withPassword("123456").build());
uris.add(RedisURI.builder().withSentinel("10.16.60.44", 26379).withSentinelMasterId("mymaster").withPassword("123456").build());
RedisClient client = RedisClient.create();
StatefulRedisMasterReplicaConnection<String, String> connection = MasterReplica.connect(client, StringCodec.UTF8, uris);
//从节点读书数据
connection.setReadFrom(ReadFrom.REPLICA);
RedisCommands<String, String> commands = connection.sync();
commands.set("name", "赵云");
System.out.println(commands.get("name"));
connection.close();
client.shutdown();
}
/**
*?集群操作
*/
????@Test
public void cluster() {
Set<RedisURI> uris = new HashSet<>();
uris.add(RedisURI.builder().withHost("10.16.60.42").withPort(7000).withPassword("123456").build());
uris.add(RedisURI.builder().withHost("10.16.60.42").withPort(7001).withPassword("123456").build());
uris.add(RedisURI.builder().withHost("10.16.60.43").withPort(7000).withPassword("123456").build());
uris.add(RedisURI.builder().withHost("10.16.60.43").withPort(7001).withPassword("123456").build());
uris.add(RedisURI.builder().withHost("10.16.60.44").withPort(7000).withPassword("123456").build());
uris.add(RedisURI.builder().withHost("10.16.60.44").withPort(7001).withPassword("123456").build());
RedisClusterClient client = RedisClusterClient.create(uris);
StatefulRedisClusterConnection<String, String> connection = client.connect();
RedisAdvancedClusterCommands<String, String> commands = connection.sync();
commands.set("name", "关羽");
System.out.println(commands.get("name"));
//选择从节点,只读
NodeSelection<String, String> replicas = commands.replicas();
NodeSelectionCommands<String, String> nodeSelectionCommands = replicas.commands();
Executions<List<String>> keys = nodeSelectionCommands.keys("*");
keys.forEach(key -> System.out.println(key));
connection.close();
client.shutdown();
}
/**
*?配置客户端资源(ClientResources)及客户端参数(ClientOptions)
*/
????@Test
public void resourceAndOption() {
ClientResources resources = DefaultClientResources.builder()
.ioThreadPoolSize(4) //I/O线程数
.computationThreadPoolSize(4) //任务线程数
.build();
RedisURI redisUri = RedisURI.builder()
.withHost("10.49.196.10").withPort(6379).withPassword("123456")
.withTimeout(Duration.of(10, ChronoUnit.SECONDS))
.build();
ClientOptions options = ClientOptions.builder()
.autoReconnect(true)//是否自动重连
.pingBeforeActivateConnection(true)//连接激活之前是否执行PING命令
.build();
RedisClient client = RedisClient.create(resources, redisUri);
client.setOptions(options);
StatefulRedisConnection<String, String> connection = client.connect();
RedisCommands<String, String> commands = connection.sync();
System.out.println(commands.get("name"));
connection.close();
client.shutdown();
resources.shutdown();
}
/**
*?配置客户端资源(ClientResources)及客户端参数(ClientOptions)
*?集群
*/
????@Test
public void resourceAndOption2() {
ClientResources resources = DefaultClientResources.builder()
.ioThreadPoolSize(4) //I/O线程数
.computationThreadPoolSize(4) //任务线程数
.build();
//集群地址,配置其中一个即可,不需要配置全
RedisURI redisUri = RedisURI.builder()
.withHost("10.16.60.42").withPort(7000).withPassword("123456")
.withTimeout(Duration.of(10, ChronoUnit.SECONDS))
.build();
ClusterClientOptions options = ClusterClientOptions.builder()
.autoReconnect(true)//是否自动重连
.pingBeforeActivateConnection(true)//连接激活之前是否执行PING命令
.validateClusterNodeMembership(true)//是否校验集群节点的成员关系
.build();
RedisClusterClient client = RedisClusterClient.create(resources, redisUri);
client.setOptions(options);
StatefulRedisClusterConnection<String, String> connection = client.connect();
RedisAdvancedClusterCommands<String, String> commands = connection.sync();
System.out.println(commands.get("name"));
connection.close();
client.shutdown();
resources.shutdown();
}
/**
*?连接池
*/
????@Test
public void pool() throws Exception {
RedisURI redisUri = RedisURI.builder()
.withHost("10.49.196.10").withPort(6379).withPassword("123456")
.withTimeout(Duration.of(10, ChronoUnit.SECONDS))
.build();
RedisClient client = RedisClient.create(redisUri);
GenericObjectPoolConfig poolConfig = new GenericObjectPoolConfig();
GenericObjectPool<StatefulRedisConnection<String, String>> pool = ConnectionPoolSupport.createGenericObjectPool(client::connect, poolConfig);
StatefulRedisConnection<String, String> connection = pool.borrowObject();
RedisCommands<String, String> commands = connection.sync();
System.out.println(commands.get("name"));
connection.close();
pool.close();
client.shutdown();
}
}