diff --git a/Cargo.toml b/Cargo.toml index a6aed05..721b5f9 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -24,7 +24,7 @@ tokio-comp = ["redis/tokio-rustls-comp"] default = ["async-std-comp"] [dependencies] -redis = { version = "0.32.2" } +redis = { version = "0.32.2", features = ["cluster-async"]} tokio = { version = "1.45.1", features = ["rt", "time"] } rand = "0.9.1" futures = "0.3.31" diff --git a/README.md b/README.md index b8ffcc7..5619baf 100644 --- a/README.md +++ b/README.md @@ -10,6 +10,7 @@ This is an implementation of Redlock, the [distributed locking mechanism](http:/ - Lock extending - Async runtime support (async-std and tokio) - Async redis +- Support for both standalone Redis and Redis Cluster ## Install @@ -45,9 +46,16 @@ async fn main() { "redis://127.0.0.1:6382/", ]; - // Initialize the LockManager using `new` + // Initialize the LockManager using `new` for standalone Redis let rl = LockManager::new(uris); + // For Redis Cluster, use: + // let cluster_uris = vec![ + // vec!["redis://127.0.0.1:7000/", "redis://127.0.0.1:7001/"], + // vec!["redis://127.0.0.1:7002/", "redis://127.0.0.1:7003/"], + // ]; + // let rl = LockManager::new_cluster(cluster_uris)?; + // Acquire a lock let lock = loop { if let Ok(lock) = rl @@ -73,6 +81,11 @@ async fn main() { } ``` +## Locking Behavior + +- **Single cluster**: Simple Redis lock (non-distributed, no quorum) +- **Multiple clusters**: Distributed Redlock (quorum-based, N≥3) + ## Extending Locks Extending a lock effectively renews its duration instead of adding extra time to it. For instance, if a 1000ms lock is extended by 1000ms after 500ms pass, it will only last for a total of 1500ms, not 2000ms. This approach is consistent with the [Node.js Redlock implementation](https://www.npmjs.com/package/redlock). See the [extend script](https://github.com/hexcowboy/rslock/blob/main/src/lock.rs#L22-L30). @@ -87,7 +100,9 @@ cargo test --all-features ## Examples -Start the redis servers mentioned in the example code: +### Basic Examples + +Start the redis servers: ```bash docker compose -f examples/docker-compose.yml up -d @@ -107,6 +122,15 @@ Stop the redis servers: docker compose -f examples/docker-compose.yml down ``` +### Cluster Examples + +Test single-cluster (simple lock) and multi-cluster (Redlock) behavior: + +```bash +# Executes both single and multi-cluster examples +docker compose -f examples/docker-compose-cluster.yml up --build +``` + ## Contribute If you find bugs or want to help otherwise, please [open an issue](https://github.com/hexcowboy/rslock/issues). diff --git a/examples/Dockerfile.test b/examples/Dockerfile.test new file mode 100644 index 0000000..11d481c --- /dev/null +++ b/examples/Dockerfile.test @@ -0,0 +1,3 @@ +FROM rust:alpine +RUN apk add --no-cache musl-dev +WORKDIR /app \ No newline at end of file diff --git a/examples/docker-compose-cluster.yml b/examples/docker-compose-cluster.yml new file mode 100644 index 0000000..d926659 --- /dev/null +++ b/examples/docker-compose-cluster.yml @@ -0,0 +1,99 @@ +version: '3.8' + +networks: + cluster-net: + +services: + # CLUSTER 1 + c1-n1: + image: redis:7-alpine + container_name: c1-n1 + ports: ["7000:6379"] + command: redis-server --port 6379 --cluster-enabled yes --cluster-config-file nodes.conf --appendonly yes + networks: [cluster-net] + + c1-n2: + image: redis:7-alpine + container_name: c1-n2 + ports: ["7001:6379"] + command: redis-server --port 6379 --cluster-enabled yes --cluster-config-file nodes.conf --appendonly yes + networks: [cluster-net] + + c1-n3: + image: redis:7-alpine + container_name: c1-n3 + ports: ["7002:6379"] + command: redis-server --port 6379 --cluster-enabled yes --cluster-config-file nodes.conf --appendonly yes + networks: [cluster-net] + + # CLUSTER 2 + c2-n1: + image: redis:7-alpine + container_name: c2-n1 + ports: ["7100:6379"] + command: redis-server --port 6379 --cluster-enabled yes --cluster-config-file nodes.conf --appendonly yes + networks: [cluster-net] + + c2-n2: + image: redis:7-alpine + container_name: c2-n2 + ports: ["7101:6379"] + command: redis-server --port 6379 --cluster-enabled yes --cluster-config-file nodes.conf --appendonly yes + networks: [cluster-net] + + c2-n3: + image: redis:7-alpine + container_name: c2-n3 + ports: ["7102:6379"] + command: redis-server --port 6379 --cluster-enabled yes --cluster-config-file nodes.conf --appendonly yes + networks: [cluster-net] + + # CLUSTER 3 + c3-n1: + image: redis:7-alpine + container_name: c3-n1 + ports: ["7200:6379"] + command: redis-server --port 6379 --cluster-enabled yes --cluster-config-file nodes.conf --appendonly yes + networks: [cluster-net] + + c3-n2: + image: redis:7-alpine + container_name: c3-n2 + ports: ["7201:6379"] + command: redis-server --port 6379 --cluster-enabled yes --cluster-config-file nodes.conf --appendonly yes + networks: [cluster-net] + + c3-n3: + image: redis:7-alpine + container_name: c3-n3 + ports: ["7202:6379"] + command: redis-server --port 6379 --cluster-enabled yes --cluster-config-file nodes.conf --appendonly yes + networks: [cluster-net] + + # INIT + cluster-init: + image: redis:7-alpine + depends_on: [c1-n1, c1-n2, c1-n3, c2-n1, c2-n2, c2-n3, c3-n1, c3-n2, c3-n3] + entrypoint: ["/bin/sh", "-c"] + command: + - | + sleep 5 + redis-cli --cluster create c1-n1:6379 c1-n2:6379 c1-n3:6379 --cluster-replicas 0 --cluster-yes + redis-cli --cluster create c2-n1:6379 c2-n2:6379 c2-n3:6379 --cluster-replicas 0 --cluster-yes + redis-cli --cluster create c3-n1:6379 c3-n2:6379 c3-n3:6379 --cluster-replicas 0 --cluster-yes + networks: [cluster-net] + restart: "no" + + # TEST RUNNER + test-runner: + build: + context: .. + dockerfile: examples/Dockerfile.test + depends_on: + cluster-init: + condition: service_completed_successfully + volumes: ["..:/app"] + working_dir: /app + networks: [cluster-net] + command: sh -c "sleep 2 && cargo run --example single_cluster && cargo run --example multi_cluster" + restart: "no" diff --git a/examples/multi_cluster.rs b/examples/multi_cluster.rs new file mode 100644 index 0000000..1fb3632 --- /dev/null +++ b/examples/multi_cluster.rs @@ -0,0 +1,93 @@ +use rslock::LockManager; +use std::time::Duration; +use tokio::time::sleep; + +#[tokio::main] +async fn main() -> Result<(), Box> { + println!("REDLOCK TEST (Three Clusters)\n"); + + let lm = LockManager::new_cluster(vec![ + vec!["redis://c1-n1:6379"], + vec!["redis://c2-n1:6379"], + vec!["redis://c3-n1:6379"], + ])?; + + // Test 1: Basic Redlock + println!("Test 1: Basic Redlock (quorum=2/3)"); + let lock = lm.lock(b"resource:{1}", Duration::from_secs(5)).await?; + println!("Lock acquired on quorum: validity={}ms", lock.validity_time); + lm.unlock(&lock).await; + println!("Lock released from all clusters\n"); + + // Test 2: Concurrent lock attempt (Redlock should prevent) + println!("Test 2: Concurrent Redlock attempt"); + let lock1 = lm.lock(b"resource:{2}", Duration::from_secs(5)).await?; + println!("Lock1 acquired on quorum"); + + let lm2 = LockManager::new_cluster(vec![ + vec!["redis://c1-n1:6379"], + vec!["redis://c2-n1:6379"], + vec!["redis://c3-n1:6379"], + ])?; + + match lm2.lock(b"resource:{2}", Duration::from_secs(5)).await { + Ok(_) => println!("Lock2 should NOT get quorum!"), + Err(_) => println!("Lock2 correctly failed (no quorum)"), + } + + lm.unlock(&lock1).await; + println!("Lock1 released from all clusters\n"); + + // Test 3: Lock expiration across clusters + println!("Test 3: Lock expiration (all clusters)"); + let lock = lm.lock(b"resource:{3}", Duration::from_millis(500)).await?; + println!("Lock acquired on quorum: validity={}ms", lock.validity_time); + sleep(Duration::from_millis(600)).await; + println!("Lock expired across all clusters\n"); + + // Test 4: Re-acquire after expiration + println!("Test 4: Re-acquire after expiration"); + let lock = lm.lock(b"resource:{3}", Duration::from_secs(5)).await?; + println!("Lock re-acquired successfully on quorum"); + lm.unlock(&lock).await; + println!("Lock released\n"); + + // Test 5: Multiple resources (Redlock for each) + println!("Test 5: Multiple independent Redlocks"); + let lock_a = lm.lock(b"resource:{5a}", Duration::from_secs(5)).await?; + let lock_b = lm.lock(b"resource:{5b}", Duration::from_secs(5)).await?; + let lock_c = lm.lock(b"resource:{5c}", Duration::from_secs(5)).await?; + println!("Three Redlocks acquired (each has quorum)"); + lm.unlock(&lock_a).await; + lm.unlock(&lock_b).await; + lm.unlock(&lock_c).await; + println!("All Redlocks released\n"); + + // Test 6: Validity time check + println!("Test 6: Validity time validation"); + let lock = lm.lock(b"resource:{6}", Duration::from_secs(10)).await?; + let validity = lock.validity_time; + println!("Lock acquired: validity={}ms", validity); + + if validity > 9000 && validity < 10000 { + println!("Validity time is reasonable (9-10s)"); + } else { + println!("Validity time unexpected: {}ms", validity); + } + + lm.unlock(&lock).await; + println!("Lock released\n"); + + // Test 7: Quick successive locks + println!("Test 7: Quick successive locks"); + for i in 0..5 { + let resource = format!("resource:{{7:{}}}", i); + let lock = lm.lock(resource.as_bytes(), Duration::from_secs(2)).await?; + println!(" Lock {} acquired", i); + lm.unlock(&lock).await; + } + println!("All successive locks worked\n"); + + println!("REDLOCK: All tests passed!\n"); + Ok(()) +} \ No newline at end of file diff --git a/examples/single_cluster.rs b/examples/single_cluster.rs new file mode 100644 index 0000000..4581274 --- /dev/null +++ b/examples/single_cluster.rs @@ -0,0 +1,69 @@ +use rslock::LockManager; +use std::time::Duration; +use tokio::time::sleep; + +#[tokio::main] +async fn main() -> Result<(), Box> { + println!("SIMPLE REDIS LOCK TEST (Single Cluster)\n"); + + let lm = LockManager::new_cluster(vec![ + vec!["redis://c1-n1:6379"], + ])?; + + // Test 1: Basic lock/unlock + println!("Test 1: Basic lock/unlock"); + let lock = lm.lock(b"resource:{1}", Duration::from_secs(5)).await?; + println!("Lock acquired: validity={}ms", lock.validity_time); + lm.unlock(&lock).await; + println!("Lock released\n"); + + // Test 2: Lock expiration + println!("Test 2: Lock expiration"); + let lock = lm.lock(b"resource:{2}", Duration::from_millis(500)).await?; + println!("Lock acquired: validity={}ms", lock.validity_time); + sleep(Duration::from_millis(600)).await; + println!("⏰ Lock expired (no unlock needed)\n"); + + // Test 3: Concurrent lock attempt + println!("Test 3: Concurrent lock (should fail)"); + let lock1 = lm.lock(b"resource:{3}", Duration::from_secs(5)).await?; + println!("Lock1 acquired by first client"); + + let lm2 = LockManager::new_cluster(vec![ + vec!["redis://c1-n1:6379"], + ])?; + + match lm2.lock(b"resource:{3}", Duration::from_secs(5)).await { + Ok(_) => println!("Lock2 should NOT succeed!"), + Err(_) => println!("Lock2 correctly rejected"), + } + + lm.unlock(&lock1).await; + println!("Lock1 released\n"); + + // Test 4: Re-acquire after unlock + println!("Test 4: Re-acquire after unlock"); + let lock = lm.lock(b"resource:{4}", Duration::from_secs(5)).await?; + println!("Lock acquired"); + lm.unlock(&lock).await; + println!("Lock released"); + + let lock2 = lm.lock(b"resource:{4}", Duration::from_secs(5)).await?; + println!("Lock re-acquired successfully"); + lm.unlock(&lock2).await; + println!("Lock released\n"); + + // Test 5: Multiple resources + println!("Test 5: Multiple independent resources"); + let lock_a = lm.lock(b"resource:{5a}", Duration::from_secs(5)).await?; + let lock_b = lm.lock(b"resource:{5b}", Duration::from_secs(5)).await?; + let lock_c = lm.lock(b"resource:{5c}", Duration::from_secs(5)).await?; + println!("Three locks acquired simultaneously"); + lm.unlock(&lock_a).await; + lm.unlock(&lock_b).await; + lm.unlock(&lock_c).await; + println!("All locks released\n"); + + println!("SIMPLE REDIS LOCK: All tests passed!\n"); + Ok(()) +} \ No newline at end of file diff --git a/src/lock.rs b/src/lock.rs index 57d9f02..4f37896 100644 --- a/src/lock.rs +++ b/src/lock.rs @@ -1,13 +1,15 @@ +use std::fmt::Debug; use std::io; use std::sync::Arc; use std::time::{Duration, Instant}; use futures::future::join_all; use rand::{rng, Rng, RngCore}; -use redis::aio::MultiplexedConnection; +use redis::aio::{ConnectionLike, MultiplexedConnection}; use redis::Value::Okay; -use redis::{Client, IntoConnectionInfo, RedisError, RedisResult, Value}; - +use redis::{Client, Cmd, IntoConnectionInfo, Pipeline, RedisError, RedisFuture, RedisResult, Value}; +use redis::cluster::ClusterClient; +use redis::cluster_async::ClusterConnection; use crate::resource::{LockResource, ToLockResource}; const DEFAULT_RETRY_COUNT: u32 = 3; @@ -68,6 +70,95 @@ pub enum LockError { type Mutex = tokio::sync::Mutex; type MutexGuard<'a, K> = tokio::sync::MutexGuard<'a, K>; +/// Connection type supporting both standalone and cluster modes +#[derive(Clone)] +enum Connection { + Standalone(MultiplexedConnection), + Cluster(ClusterConnection), +} + +impl Debug for Connection { + fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + match self { + Connection::Standalone(_) => write!(f, "Connection::Standalone(..)"), + Connection::Cluster(_) => write!(f, "Connection::Cluster(..)"), + } + } +} + +impl ConnectionLike for Connection { + fn req_packed_command<'a>(&'a mut self, cmd: &'a Cmd) -> RedisFuture<'a, Value> { + match self { + Connection::Standalone(conn) => conn.req_packed_command(cmd), + Connection::Cluster(conn) => conn.req_packed_command(cmd), + } + } + + fn req_packed_commands<'a>( + &'a mut self, + cmd: &'a Pipeline, + offset: usize, + count: usize, + ) -> RedisFuture<'a, Vec> { + match self { + Connection::Standalone(conn) => conn.req_packed_commands(cmd, offset, count), + Connection::Cluster(conn) => conn.req_packed_commands(cmd, offset, count), + } + } + + fn get_db(&self) -> i64 { + match self { + Connection::Standalone(conn) => conn.get_db(), + Connection::Cluster(conn) => conn.get_db(), + } + } +} + +/// Client information supporting both standalone and cluster modes +enum ClientInfo { + Standalone(Client), + Cluster(ClusterClient), +} + +impl ClientInfo { + async fn get_connection(&self) -> Result { + match self { + ClientInfo::Standalone(client) => { + let conn = client + .get_multiplexed_async_connection() + .await + .map_err(LockError::Redis)?; + Ok(Connection::Standalone(conn)) + } + ClientInfo::Cluster(client) => { + let conn = client + .get_async_connection() + .await + .map_err(LockError::Redis)?; + Ok(Connection::Cluster(conn)) + } + } + } +} + +impl Clone for ClientInfo { + fn clone(&self) -> Self { + match self { + ClientInfo::Standalone(client) => ClientInfo::Standalone(client.clone()), + ClientInfo::Cluster(client) => ClientInfo::Cluster(client.clone()), + } + } +} + +impl Debug for ClientInfo { + fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + match self { + ClientInfo::Standalone(_) => write!(f, "ClientInfo::Standalone(..)"), + ClientInfo::Cluster(_) => write!(f, "ClientInfo::Cluster(..)"), + } + } +} + /// The lock manager. /// /// Implements the necessary functionality to acquire and release locks @@ -93,27 +184,29 @@ impl LockManagerInner { #[derive(Debug, Clone)] struct RestorableConnection { - client: Client, - con: Arc>>, + client_info: ClientInfo, + con: Arc>>, } impl RestorableConnection { - pub fn new(client: Client) -> Self { + pub fn new_standalone(client: Client) -> Self { + Self { + client_info: ClientInfo::Standalone(client), + con: Arc::new(Mutex::new(None)), + } + } + + pub fn new_cluster(client: ClusterClient) -> Self { Self { - client, - con: Arc::new(tokio::sync::Mutex::new(None)), + client_info: ClientInfo::Cluster(client), + con: Arc::new(Mutex::new(None)), } } - pub async fn get_connection(&mut self) -> Result { + pub async fn get_connection(&mut self) -> Result { let mut lock = self.con.lock().await; if lock.is_none() { - *lock = Some( - self.client - .get_multiplexed_async_connection() - .await - .map_err(LockError::Redis)?, - ); + *lock = Some(self.client_info.get_connection().await?); } match (*lock).clone() { Some(conn) => Ok(conn), @@ -127,12 +220,7 @@ impl RestorableConnection { Ok(()) } else { let mut lock = self.con.lock().await; - *lock = Some( - self.client - .get_multiplexed_async_connection() - .await - .map_err(LockError::Redis)?, - ); + *lock = Some(self.client_info.get_connection().await?); Ok(()) } } @@ -265,9 +353,7 @@ impl Drop for LockGuard { } impl LockManager { - /// Create a new lock manager instance, defined by the given Redis connection uris. - /// - /// Sample URI: `"redis://127.0.0.1:6379"` + /// Create a new lock manager for standalone Redis instances pub fn new(uris: Vec) -> LockManager { let servers: Vec = uris .into_iter() @@ -277,11 +363,64 @@ impl LockManager { Self::from_clients(servers) } - /// Create a new lock manager instance, defined by the given Redis clients. - /// Quorum is defined to be N/2+1, with N being the number of given Redis instances. + /// Create a new lock manager for Redis Cluster + /// + /// `uris`: list of clusters; each inner list contains the startup node URLs of **one** cluster. + /// + /// **Important** + /// - Passing **one cluster** -> this is **not Redlock**; it's a **single-store simple Redis lock**. + /// - Passing **multiple independent clusters** (`uris.len() > 1`) -> **Redlock quorum** (≥ N/2+1). + /// + /// Example: + /// ```rust + /// // Single cluster (simple Redis lock, NOT Redlock) + /// use rslock::LockManager; + /// let lm = LockManager::new_cluster(vec![vec![ + /// "redis://node-a1:6379", "redis://node-a2:6379" + /// ]])?; + /// + /// // Multiple independent clusters (Redlock with quorum) + /// let lm = LockManager::new_cluster(vec![ + /// vec!["redis://a1:6379","redis://a2:6379"], + /// vec!["redis://b1:6379","redis://b2:6379"], + /// vec!["redis://c1:6379","redis://c2:6379"], + /// ])?; + /// ``` + pub fn new_cluster(uris: Vec>) -> Result { + let clients: Result, _> = uris + .into_iter() + .map(|cluster_uris| { + ClusterClient::builder(cluster_uris) + .retries(3) + .build() + .map_err(LockError::Redis) + }) + .collect(); + + Ok(Self::from_cluster_clients(clients?)) + } + + /// Create from standalone clients pub fn from_clients(clients: Vec) -> LockManager { - let clients: Vec = - clients.into_iter().map(RestorableConnection::new).collect(); + let clients: Vec = clients + .into_iter() + .map(RestorableConnection::new_standalone) + .collect(); + + LockManager { + lock_manager_inner: Arc::new(Mutex::new(LockManagerInner { servers: clients })), + retry_count: DEFAULT_RETRY_COUNT, + retry_delay: DEFAULT_RETRY_DELAY, + } + } + + /// Create from cluster clients + pub fn from_cluster_clients(clients: Vec) -> LockManager { + let clients: Vec = clients + .into_iter() + .map(RestorableConnection::new_cluster) + .collect(); + LockManager { lock_manager_inner: Arc::new(Mutex::new(LockManagerInner { servers: clients })), retry_count: DEFAULT_RETRY_COUNT,