Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -90,7 +90,7 @@
<dependency>
<groupId>org.apache.commons</groupId>
<artifactId>commons-pool2</artifactId>
<version>2.12.1</version>
<version>2.13.1</version>
</dependency>
<dependency>
<groupId>org.json</groupId>
Expand Down
43 changes: 43 additions & 0 deletions src/main/java/redis/clients/jedis/util/Pool.java
Original file line number Diff line number Diff line change
Expand Up @@ -3,10 +3,29 @@
import org.apache.commons.pool2.PooledObjectFactory;
import org.apache.commons.pool2.impl.GenericObjectPool;
import org.apache.commons.pool2.impl.GenericObjectPoolConfig;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import redis.clients.jedis.exceptions.JedisException;

public class Pool<T> extends GenericObjectPool<T> {

private static final Logger log = LoggerFactory.getLogger(Pool.class);

/**
* Set while the current thread is executing inside {@link #returnBrokenResource(Object)}.
* <p>
* commons-pool2 2.13.0 changed {@link GenericObjectPool#invalidateObject(Object)} to call
* {@link GenericObjectPool#addObject()} unconditionally after destroying the invalidated
* instance, in order to keep the pool warm. When the upstream is unreachable that implicit
* replacement attempt now fails with the factory's exception, which prior to 2.13 did not
* surface from {@code returnBrokenResource}. This flag lets the {@link #addObject()} override
* recognise the implicit call and swallow its failure, restoring the pre-2.13 contract for
* the broken-return path while preserving full error semantics for every other caller of
* {@code addObject()} / {@link #addObjects(int)}.
*/
private final ThreadLocal<Boolean> inReturnBrokenResource =
ThreadLocal.withInitial(() -> Boolean.FALSE);

// Legacy
public Pool(GenericObjectPoolConfig<T> poolConfig, PooledObjectFactory<T> factory) {
this(factory, poolConfig);
Expand Down Expand Up @@ -58,10 +77,34 @@ public void returnBrokenResource(final T resource) {
if (resource == null) {
return;
}
inReturnBrokenResource.set(Boolean.TRUE);
try {
super.invalidateObject(resource);
} catch (Exception e) {
throw new JedisException("Could not return the broken resource to the pool", e);
} finally {
inReturnBrokenResource.remove();
}
}

/**
* When invoked transitively from {@link #returnBrokenResource(Object)}, swallows failures
* originating from the implicit "ensure-liveness" replacement that commons-pool2 2.13.x
* performs at the end of {@link GenericObjectPool#invalidateObject(Object)}. Direct callers
* (user code, {@link #addObjects(int)}) keep full error semantics.
*/
@Override
public void addObject() throws Exception {
if (Boolean.TRUE.equals(inReturnBrokenResource.get())) {
try {
super.addObject();
} catch (Exception e) {
if (log.isTraceEnabled()) {
log.trace("Suppressed addObject() failure during returnBrokenResource", e);
}
}
} else {
super.addObject();
}
}

Expand Down
50 changes: 42 additions & 8 deletions src/test/java/redis/clients/jedis/UnavailableConnectionTest.java
Original file line number Diff line number Diff line change
@@ -1,42 +1,77 @@
package redis.clients.jedis;

import eu.rekawek.toxiproxy.Proxy;
import eu.rekawek.toxiproxy.ToxiproxyClient;
import io.redis.test.annotations.ConditionalOnEnv;
import org.apache.commons.pool2.impl.GenericObjectPoolConfig;

import org.junit.jupiter.api.AfterAll;
import org.junit.jupiter.api.BeforeAll;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.Timeout;
import org.junit.jupiter.api.Tag;
import org.junit.jupiter.api.extension.RegisterExtension;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import redis.clients.jedis.exceptions.JedisConnectionException;
import redis.clients.jedis.util.EnvCondition;

import redis.clients.jedis.util.TestEnvUtil;

import java.io.IOException;

import static org.junit.jupiter.api.Assertions.assertFalse;
import static org.junit.jupiter.api.Assertions.assertSame;
import static org.junit.jupiter.api.Assertions.assertTrue;
import static org.junit.jupiter.api.Assertions.fail;

/**
* Verifies that broken connections are not returned to the pool and that {@link Jedis#close()} on a
* broken connection does not attempt to send {@code QUIT}.
* <p>
* The unavailable Redis instance is simulated through a Toxiproxy proxy fronting the
* {@code redis-unavailable-1} container. {@code redisProxy.disable()} closes any in-flight
* connection and refuses new ones, taking the place of issuing {@code SHUTDOWN} against a real
* Redis (the previous oss-source approach).
*/
@Tag("integration")
@ConditionalOnEnv(value = TestEnvUtil.ENV_OSS_SOURCE, enabled = true)
@ConditionalOnEnv(value = TestEnvUtil.ENV_OSS_DOCKER, enabled = true)
public class UnavailableConnectionTest {

@RegisterExtension
public static EnvCondition envCondition = new EnvCondition();

private static final HostAndPort unavailableNode = new HostAndPort("localhost", 6400);
private static final String PROXY_NAME = "redis-unavailable";
private static final String PROXY_LISTEN = "0.0.0.0:26400";
private static final String PROXY_UPSTREAM = "redis-unavailable-1:6400";

private static final ToxiproxyClient tp = new ToxiproxyClient("localhost", 8474);
private static Proxy redisProxy;
private static HostAndPort unavailableNode;

private static Logger log = LoggerFactory.getLogger(UnavailableConnectionTest.class);

@BeforeAll
public static void setup() {
setupAvoidQuitInDestroyObject();
public static void setup() throws IOException {
unavailableNode = Endpoints.getRedisEndpoint(PROXY_NAME).getHostAndPort();

try (Jedis j = new Jedis(unavailableNode)) {
j.shutdown();
if (tp.getProxyOrNull(PROXY_NAME) != null) {
tp.getProxy(PROXY_NAME).delete();
}
redisProxy = tp.createProxy(PROXY_NAME, PROXY_LISTEN, PROXY_UPSTREAM);

setupAvoidQuitInDestroyObject();

// Simulate the server going away: close existing connections and refuse new ones.
redisProxy.disable();
}

public static void cleanup() {
@AfterAll
public static void cleanup() throws IOException {
cleanupAvoidQuitInDestroyObject();
if (redisProxy != null) {
redisProxy.delete();
}
}

private static JedisPool poolForBrokenJedis1;
Expand Down Expand Up @@ -65,7 +100,6 @@ public void testAvoidQuitInDestroyObjectForBrokenConnection() throws Interrupted
assertFalse(threadForBrokenJedis1.isAlive());
assertTrue(brokenJedis1.isBroken());
brokenJedis1.close(); // we need capture/mock to test this properly

try {
poolForBrokenJedis1.getResource();
fail("Should not get connection from pool");
Expand Down
166 changes: 166 additions & 0 deletions src/test/java/redis/clients/jedis/util/PoolUnitTest.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,166 @@
package redis.clients.jedis.util;

import java.util.concurrent.atomic.AtomicInteger;

import org.apache.commons.pool2.PooledObject;
import org.apache.commons.pool2.PooledObjectFactory;
import org.apache.commons.pool2.impl.DefaultPooledObject;
import org.apache.commons.pool2.impl.GenericObjectPoolConfig;
import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;

import redis.clients.jedis.exceptions.JedisConnectionException;
import redis.clients.jedis.exceptions.JedisException;

import static org.junit.jupiter.api.Assertions.assertDoesNotThrow;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertThrows;
import static org.junit.jupiter.api.Assertions.assertTrue;

/**
* Verifies the auto-replace suppression added to {@link Pool} for commons-pool2 2.13.x, which
* unconditionally invokes {@code addObject()} from {@code GenericObjectPool#invalidateObject} after
* destroying a broken resource. {@link Pool#returnBrokenResource(Object)} sets a thread-local flag
* across that call so {@link Pool#addObject()} can swallow the auto-replace failure when the
* upstream is down, while still propagating failures from direct user calls.
*/
public class PoolUnitTest {

/**
* Programmable factory used by the tests below.
* <ul>
* <li>{@code makeFailFromCallNumber} - {@code makeObject()} starts throwing from that call.</li>
* <li>{@code destroyThrows} - {@code destroyObject(...)} raises a RuntimeException.</li>
* </ul>
*/
private static final class TestFactory implements PooledObjectFactory<String> {
final AtomicInteger makeCount = new AtomicInteger();
final AtomicInteger destroyCount = new AtomicInteger();
volatile int makeFailFromCallNumber = Integer.MAX_VALUE;
volatile boolean destroyThrows = false;

@Override
public PooledObject<String> makeObject() {
int n = makeCount.incrementAndGet();
if (n >= makeFailFromCallNumber) {
throw new JedisConnectionException("upstream is down (call #" + n + ")");
}
return new DefaultPooledObject<>("resource-" + n);
}

@Override
public void destroyObject(PooledObject<String> p) {
destroyCount.incrementAndGet();
if (destroyThrows) {
throw new RuntimeException("destroy failed");
}
}

@Override
public boolean validateObject(PooledObject<String> p) {
return true;
}

@Override
public void activateObject(PooledObject<String> p) {
}

@Override
public void passivateObject(PooledObject<String> p) {
}
}

private TestFactory factory;
private Pool<String> pool;

@BeforeEach
public void setUp() {
factory = new TestFactory();
GenericObjectPoolConfig<String> config = new GenericObjectPoolConfig<>();
config.setMaxTotal(2);
config.setBlockWhenExhausted(false);
pool = new Pool<>(factory, config);
}

@AfterEach
public void tearDown() {
if (pool != null && !pool.isClosed()) {
pool.close();
}
}

@Test
public void returnBrokenResource_swallowsAutoReplaceMakeObjectFailure() {
String resource = pool.getResource();
factory.makeFailFromCallNumber = 2;

assertDoesNotThrow(() -> pool.returnBrokenResource(resource));
assertEquals(1, factory.destroyCount.get());
assertEquals(2, factory.makeCount.get(),
"commons-pool2 2.13.x must trigger an auto-replace makeObject() after destroy");
}

@Test
public void returnBrokenResource_propagatesDestroyFailureAsJedisException() {
String resource = pool.getResource();
factory.destroyThrows = true;

JedisException ex = assertThrows(JedisException.class,
() -> pool.returnBrokenResource(resource));
assertTrue(ex.getCause().getMessage().contains("destroy failed"));
}

@Test
public void returnBrokenResource_propagatesIllegalStateForUnknownResource() {
JedisException ex = assertThrows(JedisException.class,
() -> pool.returnBrokenResource("never-borrowed"));
assertTrue(ex.getCause() instanceof IllegalStateException);
}

@Test
public void addObjectDirect_propagatesMakeObjectFailure() {
factory.makeFailFromCallNumber = 1;

assertThrows(JedisConnectionException.class, () -> pool.addObject());
}

@Test
public void addObjectsDirect_wrapsMakeObjectFailureAsJedisException() {
factory.makeFailFromCallNumber = 1;

JedisException ex = assertThrows(JedisException.class, () -> pool.addObjects(1));
assertTrue(ex.getCause() instanceof JedisConnectionException);
}

@Test
public void invalidateObjectDirect_doesNotSuppressAutoReplaceFailure() {
String resource = pool.getResource();
factory.makeFailFromCallNumber = 2;

assertThrows(JedisConnectionException.class, () -> pool.invalidateObject(resource),
"direct invalidateObject must propagate the auto-replace failure (flag is not set)");
}

@Test
public void suppressionFlagIsClearedAfterReturnBrokenResourceSuccess() {
String resource = pool.getResource();
pool.returnBrokenResource(resource);

factory.makeFailFromCallNumber = factory.makeCount.get() + 1;
assertThrows(JedisConnectionException.class, () -> pool.addObject(),
"subsequent addObject on the same thread must propagate; the flag must not leak");
}

@Test
public void suppressionFlagIsClearedAfterReturnBrokenResourceFailure() {
String resource = pool.getResource();
factory.destroyThrows = true;
assertThrows(JedisException.class, () -> pool.returnBrokenResource(resource));

factory.destroyThrows = false;
factory.makeFailFromCallNumber = factory.makeCount.get() + 1;
assertThrows(JedisConnectionException.class, () -> pool.addObject(),
"subsequent addObject on the same thread must propagate; the flag must not leak");
}
}
6 changes: 6 additions & 0 deletions src/test/resources/endpoints.json
Original file line number Diff line number Diff line change
Expand Up @@ -218,5 +218,11 @@
"rediss://localhost:8580",
"rediss://localhost:8581"
]
},
"redis-unavailable": {
"tls": false,
"endpoints": [
"redis://localhost:26400"
]
}
}
1 change: 1 addition & 0 deletions src/test/resources/env/docker-compose.yml
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ services:
- "8474:8474" # Admin API
- "29379:29379" # redis-failover-1 proxy
- "29380:29380" # redis-failover-2 proxy
- "26400:26400" # unavailable-1 proxy - proxy endpoint that can be used to simulate connection issues, backed by redis-unavailable-1
redis-failover-1:
<<: *client-libs-image
container_name: redis-failover-1
Expand Down
Loading