Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
35 changes: 21 additions & 14 deletions src/main/java/io/lettuce/core/AbstractRedisClient.java
Original file line number Diff line number Diff line change
Expand Up @@ -33,10 +33,10 @@
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Supplier;

import io.lettuce.core.MaintNotificationsConfig.EndpointTypeSource;
import io.lettuce.core.api.BaseRedisClient;
import reactor.core.publisher.Mono;
import io.lettuce.core.event.command.CommandListener;
import io.lettuce.core.event.connection.ConnectEvent;
import io.lettuce.core.event.connection.ConnectionCreatedEvent;
Expand Down Expand Up @@ -233,8 +233,8 @@ protected List<CommandListener> getCommandListeners() {
* @param connectionBuilder connection builder to configure the connection
* @param redisURI URI of the Redis instance
*/
protected void connectionBuilder(Mono<SocketAddress> socketAddressSupplier, ConnectionBuilder connectionBuilder,
RedisURI redisURI) {
protected void connectionBuilder(Supplier<CompletionStage<SocketAddress>> socketAddressSupplier,
ConnectionBuilder connectionBuilder, RedisURI redisURI) {
connectionBuilder(socketAddressSupplier, connectionBuilder, connectionEvents, redisURI);
}

Expand All @@ -247,8 +247,8 @@ protected void connectionBuilder(Mono<SocketAddress> socketAddressSupplier, Conn
* @param redisURI URI of the Redis instance
* @since 6.2
*/
protected void connectionBuilder(Mono<SocketAddress> socketAddressSupplier, ConnectionBuilder connectionBuilder,
ConnectionEvents connectionEvents, RedisURI redisURI) {
protected void connectionBuilder(Supplier<CompletionStage<SocketAddress>> socketAddressSupplier,
ConnectionBuilder connectionBuilder, ConnectionEvents connectionEvents, RedisURI redisURI) {

Bootstrap redisBootstrap = new Bootstrap();
redisBootstrap.option(ChannelOption.ALLOCATOR, ByteBufAllocator.DEFAULT);
Expand Down Expand Up @@ -349,7 +349,7 @@ protected <T> T getConnection(CompletableFuture<T> connectionFuture) {
protected <K, V, T extends RedisChannelHandler<K, V>> ConnectionFuture<T> initializeChannelAsync(
ConnectionBuilder connectionBuilder) {

Mono<SocketAddress> socketAddressSupplier = connectionBuilder.socketAddress();
Supplier<CompletionStage<SocketAddress>> socketAddressSupplier = connectionBuilder.socketAddress();

if (clientResources.eventExecutorGroup().isShuttingDown()) {
throw new IllegalStateException("Cannot connect, Event executor group is terminated.");
Expand All @@ -367,15 +367,22 @@ protected <K, V, T extends RedisChannelHandler<K, V>> ConnectionFuture<T> initia
channelReadyFuture.whenComplete((channel, throwable) -> {
event.record();
});

socketAddressSupplier.doOnError(socketAddressFuture::completeExceptionally).doOnNext(socketAddressFuture::complete)
.subscribe(redisAddress -> {

if (channelReadyFuture.isCancelled()) {
return;
}
// handle synchronous exceptions during get(), before obtaining the CompletionStage
try {
socketAddressSupplier.get().thenAccept(redisAddress -> {
socketAddressFuture.complete(redisAddress);
if (!channelReadyFuture.isCancelled()) {
initializeChannelAsync0(connectionBuilder, channelReadyFuture, redisAddress);
}, channelReadyFuture::completeExceptionally);
}
}).exceptionally(error -> {
socketAddressFuture.completeExceptionally(error);
channelReadyFuture.completeExceptionally(error);
return null;
});
} catch (Exception e) {
socketAddressFuture.completeExceptionally(e);
channelReadyFuture.completeExceptionally(e);
}

return new DefaultConnectionFuture<>(socketAddressFuture,
channelReadyFuture.thenApply(channel -> (T) connectionBuilder.connection()));
Expand Down
39 changes: 37 additions & 2 deletions src/main/java/io/lettuce/core/AbstractRedisReactiveCommands.java
Original file line number Diff line number Diff line change
Expand Up @@ -79,9 +79,11 @@
import io.netty.util.concurrent.ImmediateEventExecutor;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
import reactor.util.context.ContextView;

import java.time.Duration;
import java.time.Instant;
import java.util.AbstractMap;
import java.util.Date;
import java.util.List;
import java.util.Map;
Expand Down Expand Up @@ -134,6 +136,38 @@ public abstract class AbstractRedisReactiveCommands<K, V>

private volatile EventExecutorGroup scheduler;

/**
* A thin adapter — no copying, delegates directly to ContextView
* <p>
* Adapts a {@link ContextView} to the {@link Map} interface. This allows reusing existing code that expects a
* <code>Map</code> to represent a context.
*/
static class ContextViewMapAdapter extends AbstractMap<Object, Object> {

private final ContextView ctx;

public ContextViewMapAdapter(ContextView ctx) {
this.ctx = ctx;
}

@Override
public Object get(Object key) {
return ctx.hasKey(key) ? ctx.get(key) : null;
}

@Override
public boolean containsKey(Object key) {
return ctx.hasKey(key);
}

@Override
public Set<Entry<Object, Object>> entrySet() {
// no need to implement, just for the sake of AbstractMap contract
throw new UnsupportedOperationException();
}

}

/**
* Initialize a new instance.
*
Expand Down Expand Up @@ -747,10 +781,11 @@ private <T> Flux<T> createFlux(Supplier<RedisCommand<K, V, T>> commandSupplier,
}

private Mono<TraceContext> withTraceContext() {

return Tracing.getContext()
.switchIfEmpty(Mono.fromSupplier(() -> clientResources.tracing().initialTraceContextProvider()))
.flatMap(TraceContextProvider::getTraceContextLater).defaultIfEmpty(TraceContext.EMPTY);
.flatMap(p -> Mono
.deferContextual(ctx -> Mono.justOrEmpty(p.getTraceContextAsync(new ContextViewMapAdapter(ctx)).get())))
.defaultIfEmpty(TraceContext.EMPTY);
}

protected <T> Mono<T> createMono(CommandType type, CommandOutput<K, V, T> output, CommandArgs<K, V> args) {
Expand Down
3 changes: 1 addition & 2 deletions src/main/java/io/lettuce/core/ClientOptions.java
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,6 @@
import io.lettuce.core.protocol.ReadOnlyCommands;
import io.lettuce.core.protocol.RedisCommand;
import io.lettuce.core.resource.ClientResources;
import reactor.core.publisher.Mono;

/**
* Client Options to control the behavior of {@link RedisClient}.
Expand Down Expand Up @@ -432,7 +431,7 @@ public Builder scriptCharset(Charset scriptCharset) {
/**
* Set a custom implementation for the {@link JsonParser} to use.
*
* @param parser a {@link Mono} that emits the {@link JsonParser} to use.
* @param parser a {@link Supplier} that emits the {@link JsonParser} to use.
* @return {@code this}
* @see JsonParser
* @since 6.5
Expand Down
16 changes: 6 additions & 10 deletions src/main/java/io/lettuce/core/ConnectionBuilder.java
Original file line number Diff line number Diff line change
Expand Up @@ -23,12 +23,12 @@
import java.time.Duration;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CompletionStage;
import java.util.function.Function;
import java.util.function.Supplier;

import io.lettuce.core.protocol.MaintenanceAwareComponent;
import io.lettuce.core.protocol.MaintenanceAwareConnectionWatchdog;
import reactor.core.publisher.Mono;
import io.lettuce.core.internal.LettuceAssert;
import io.lettuce.core.protocol.CommandEncoder;
import io.lettuce.core.protocol.CommandHandler;
Expand All @@ -51,8 +51,6 @@
import io.netty.channel.EventLoopGroup;
import io.netty.channel.group.ChannelGroup;
import io.netty.util.AttributeKey;
import io.netty.util.internal.logging.InternalLogger;
import io.netty.util.internal.logging.InternalLoggerFactory;

/**
* Connection builder for connections. This class is part of the internal API.
Expand All @@ -62,13 +60,11 @@
*/
public class ConnectionBuilder {

private static final InternalLogger logger = InternalLoggerFactory.getInstance(ConnectionBuilder.class);

public static final AttributeKey<String> REDIS_URI = AttributeKey.valueOf("RedisURI");

public static final AttributeKey<Throwable> INIT_FAILURE = AttributeKey.valueOf("ConnectionBuilder.INIT_FAILURE");

private Mono<SocketAddress> socketAddressSupplier;
private Supplier<CompletionStage<SocketAddress>> socketAddressSupplier;

private ConnectionEvents connectionEvents;

Expand Down Expand Up @@ -156,7 +152,7 @@ protected ConnectionWatchdog createConnectionWatchdog() {
if (clientOptions.getMaintNotificationsConfig().maintNotificationsEnabled()) {
MaintenanceAwareConnectionWatchdog maintenanceAwareWatchdog = new MaintenanceAwareConnectionWatchdog(
clientResources.reconnectDelay(), clientOptions, bootstrap, clientResources.timer(),
clientResources.eventExecutorGroup(), socketAddressSupplier, reconnectionListener, connection,
clientResources.eventExecutorGroup(), socketAddressSupplier, reconnectionListener,
clientResources.eventBus(), endpoint);
if (connection.getChannelWriter() instanceof MaintenanceAwareComponent) {
maintenanceAwareWatchdog.setMaintenanceEventListener((MaintenanceAwareComponent) connection.getChannelWriter());
Expand All @@ -165,7 +161,7 @@ protected ConnectionWatchdog createConnectionWatchdog() {
} else {
watchdog = new ConnectionWatchdog(clientResources.reconnectDelay(), clientOptions, bootstrap,
clientResources.timer(), clientResources.eventExecutorGroup(), socketAddressSupplier, reconnectionListener,
connection, clientResources.eventBus(), endpoint);
clientResources.eventBus(), endpoint);
}

endpoint.registerConnectionWatchdog(watchdog);
Expand All @@ -178,12 +174,12 @@ public ChannelInitializer<Channel> build(SocketAddress socketAddress) {
return new PlainChannelInitializer(this::buildHandlers, clientResources);
}

public ConnectionBuilder socketAddressSupplier(Mono<SocketAddress> socketAddressSupplier) {
public ConnectionBuilder socketAddressSupplier(Supplier<CompletionStage<SocketAddress>> socketAddressSupplier) {
this.socketAddressSupplier = socketAddressSupplier;
return this;
}

public Mono<SocketAddress> socketAddress() {
public Supplier<CompletionStage<SocketAddress>> socketAddress() {
LettuceAssert.assertState(socketAddressSupplier != null, "SocketAddressSupplier must be set");
return socketAddressSupplier;
}
Expand Down
26 changes: 26 additions & 0 deletions src/main/java/io/lettuce/core/Pair.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,26 @@
package io.lettuce.core;

public class Pair<T1, T2> {

final T1 t1;

final T2 t2;

Pair(T1 t1, T2 t2) {
this.t1 = t1;
this.t2 = t2;
}

public T1 getT1() {
return t1;
}

public T2 getT2() {
return t2;
}

public static <T1, T2> Pair<T1, T2> of(T1 t1, T2 t2) {
return new Pair<>(t1, t2);
}

}
Loading
Loading