diff --git a/Cargo.lock b/Cargo.lock index 2e91011c7..b955ff73c 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -1482,9 +1482,9 @@ dependencies = [ [[package]] name = "digest" -version = "0.11.2" +version = "0.11.3" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "4850db49bf08e663084f7fb5c87d202ef91a3907271aff24a94eb97ff039153c" +checksum = "f1dd6dbb5841937940781866fa1281a1ff7bd3bf827091440879f9994983d5c2" dependencies = [ "block-buffer 0.12.0", "const-oid 0.10.2", @@ -2070,7 +2070,7 @@ version = "0.13.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "6303bc9732ae41b04cb554b844a762b4115a61bfaa81e3e83050991eeb56863f" dependencies = [ - "digest 0.11.2", + "digest 0.11.3", ] [[package]] @@ -2720,7 +2720,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "69b6441f590336821bb897fb28fc622898ccceb1d6cea3fde5ea86b090c4de98" dependencies = [ "cfg-if", - "digest 0.11.2", + "digest 0.11.3", ] [[package]] @@ -2974,15 +2974,14 @@ checksum = "384b8ab6d37215f3c5301a95a4accb5d64aa607f1fcb26a11b5303878451b4fe" [[package]] name = "openssl" -version = "0.10.78" +version = "0.10.79" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "f38c4372413cdaaf3cc79dd92d29d7d9f5ab09b51b10dded508fb90bb70b9222" +checksum = "bf0b434746ee2832f4f0baf10137e1cabb18cbe6912c69e2e33263c45250f542" dependencies = [ "bitflags 2.11.1", "cfg-if", "foreign-types", "libc", - "once_cell", "openssl-macros", "openssl-sys", ] @@ -3006,9 +3005,9 @@ checksum = "7c87def4c32ab89d880effc9e097653c8da5d6ef28e6b539d313baaacfbafcbe" [[package]] name = "openssl-sys" -version = "0.9.114" +version = "0.9.115" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "13ce1245cd07fcc4cfdb438f7507b0c7e4f3849a69fd84d52374c66d83741bb6" +checksum = "158fe5b292746440aa6e7a7e690e55aeb72d41505e2804c23c6973ad0e9c9781" dependencies = [ "cc", "libc", @@ -4513,7 +4512,7 @@ checksum = "446ba717509524cb3f22f17ecc096f10f4822d76ab5c0b9822c5f9c284e825f4" dependencies = [ "cfg-if", "cpufeatures 0.3.0", - "digest 0.11.2", + "digest 0.11.3", ] [[package]] @@ -4596,9 +4595,9 @@ checksum = "e3a9fe34e3e7a50316060351f37187a3f546bce95496156754b601a5fa71b76e" [[package]] name = "siphasher" -version = "1.0.2" +version = "1.0.3" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "b2aa850e253778c88a04c3d7323b043aeda9d3e30d5971937c1855769763678e" +checksum = "8ee5873ec9cce0195efcb7a4e9507a04cd49aec9c83d0389df45b1ef7ba2e649" [[package]] name = "slab" diff --git a/integration/vault/docker-compose.yml b/integration/vault/docker-compose.yml new file mode 100644 index 000000000..5668cd1b1 --- /dev/null +++ b/integration/vault/docker-compose.yml @@ -0,0 +1,32 @@ +services: + postgres: + image: postgres:18 + environment: + POSTGRES_DB: pgdog + POSTGRES_USER: postgres + POSTGRES_PASSWORD: postgres + ports: + - "15432:5432" + volumes: + - ./setup.sql:/docker-entrypoint-initdb.d/setup.sql + healthcheck: + test: ["CMD-SHELL", "pg_isready -U postgres"] + interval: 2s + timeout: 2s + retries: 10 + + vault: + image: hashicorp/vault:1.17 + environment: + VAULT_DEV_ROOT_TOKEN_ID: "root-token" + VAULT_DEV_LISTEN_ADDRESS: "0.0.0.0:8200" + VAULT_ADDR: "http://0.0.0.0:8200" + ports: + - "18200:8200" + cap_add: + - IPC_LOCK + healthcheck: + test: ["CMD", "vault", "status"] + interval: 2s + timeout: 2s + retries: 10 diff --git a/integration/vault/pgdog.toml b/integration/vault/pgdog.toml new file mode 100644 index 000000000..82c09a947 --- /dev/null +++ b/integration/vault/pgdog.toml @@ -0,0 +1,34 @@ +#:schema ./.schema/pgdog.schema.json +# +# PgDog configuration. +[general] +host = "0.0.0.0" +port = 6432 +workers = 2 +default_pool_size = 10 +min_pool_size = 1 +pooler_mode = "transaction" +tls_verify = "disabled" +shutdown_timeout = 60_000 +shutdown_termination_timeout = 60_000 +openmetrics_port = 9090 +openmetrics_namespace = "pgdog_" +log_format = "json" +log_level = "info" +passthrough_auth = "disabled" +connect_attempt_delay = 1_000 +auth_type = "scram" + +[[databases]] +name = "pgdog" +host = "127.0.0.1" +port = 15432 +role = "primary" + +[vault] +address = "http://127.0.0.1:18200" +auth_method = "app_role" +tls_verify = "disable" +# Run integration/vault/setup-vault.sh and paste the output here. +role_id = "" +secret_id = "" diff --git a/integration/vault/setup-vault.sh b/integration/vault/setup-vault.sh new file mode 100755 index 000000000..6a9fa9e18 --- /dev/null +++ b/integration/vault/setup-vault.sh @@ -0,0 +1,94 @@ +#!/usr/bin/env bash +set -euo pipefail + +## Configure Vault's database secrets engine for the pgdog vault integration. +## Run after docker compose up. + +export VAULT_ADDR="http://127.0.0.1:18200" +export VAULT_TOKEN="root-token" + +echo "==> Enabling database secrets engine..." +vault secrets enable database 2>/dev/null || echo " (already enabled)" + +echo "==> Configuring PostgreSQL connection..." +vault write database/config/pgdog \ + plugin_name=postgresql-database-plugin \ + allowed_roles="pgdog-admin,dml-role,ddl-role,readonly-role" \ + connection_url="postgresql://{{username}}:{{password}}@host.docker.internal:15432/pgdog?sslmode=disable" \ + username="postgres" \ + password="postgres" + +echo "==> Creating DML role (30m TTL)..." +vault write database/roles/dml-role \ + db_name=pgdog \ + creation_statements="CREATE ROLE \"{{name}}\" WITH LOGIN PASSWORD '{{password}}' VALID UNTIL '{{expiration}}' IN ROLE \"dml_role\";" \ + revocation_statements="DROP ROLE IF EXISTS \"{{name}}\";" \ + default_ttl="30m" \ + max_ttl="1h" + +echo "==> Creating DDL role (30m TTL)..." +vault write database/roles/ddl-role \ + db_name=pgdog \ + creation_statements="CREATE ROLE \"{{name}}\" WITH LOGIN PASSWORD '{{password}}' VALID UNTIL '{{expiration}}' IN ROLE \"ddl_role\";" \ + revocation_statements="DROP ROLE IF EXISTS \"{{name}}\";" \ + default_ttl="30m" \ + max_ttl="1h" + +echo "==> Creating readonly role (30m TTL)..." +vault write database/roles/readonly-role \ + db_name=pgdog \ + creation_statements="CREATE ROLE \"{{name}}\" WITH LOGIN PASSWORD '{{password}}' VALID UNTIL '{{expiration}}' IN ROLE \"readonly_role\";" \ + revocation_statements="DROP ROLE IF EXISTS \"{{name}}\";" \ + default_ttl="30m" \ + max_ttl="1h" + +echo "==> Creating pgdog-admin role (for pg_authid access, 30m TTL)..." +vault write database/roles/pgdog-admin \ + db_name=pgdog \ + creation_statements="CREATE ROLE \"{{name}}\" WITH LOGIN PASSWORD '{{password}}' VALID UNTIL '{{expiration}}' IN ROLE \"pgdog_admin_role\"; GRANT pg_read_all_data TO \"{{name}}\";" \ + revocation_statements="DROP ROLE IF EXISTS \"{{name}}\";" \ + default_ttl="30m" \ + max_ttl="1h" + +echo "==> Enabling AppRole auth..." +vault auth enable approle 2>/dev/null || echo " (already enabled)" + +echo "==> Creating pgdog AppRole..." +vault write auth/approle/role/pgdog \ + token_ttl=1h \ + token_max_ttl=4h \ + token_policies="pgdog-policy" + +echo "==> Creating pgdog policy..." +vault policy write pgdog-policy - <<'POLICY' +path "database/creds/*" { + capabilities = ["read"] +} + +# Renew leases +path "sys/leases/renew" { + capabilities = ["update"] +} +POLICY + +echo "==> Fetching AppRole credentials..." +ROLE_ID=$(vault read -field=role_id auth/approle/role/pgdog/role-id) +SECRET_ID=$(vault write -field=secret_id -f auth/approle/role/pgdog/secret-id) + +echo "" +echo "============================================" +echo " Vault setup complete!" +echo "============================================" +echo "" +echo " AppRole role_id: $ROLE_ID" +echo " AppRole secret_id: $SECRET_ID" +echo "" +echo " Test credential generation:" +echo " vault read database/creds/dml-role" +echo " vault read database/creds/ddl-role" +echo " vault read database/creds/readonly-role" +echo "" +echo " To use with pgdog, update integration/vault/pgdog.toml with:" +echo " role_id = \"$ROLE_ID\"" +echo " secret_id = \"$SECRET_ID\"" +echo "" diff --git a/integration/vault/setup.sql b/integration/vault/setup.sql new file mode 100644 index 000000000..883adabf5 --- /dev/null +++ b/integration/vault/setup.sql @@ -0,0 +1,32 @@ +-- Creates the parent roles that Vault's IN ROLE clause references. + +-- DML role: read/write on application tables +CREATE ROLE dml_role NOLOGIN; +GRANT SELECT,INSERT,UPDATE,DELETE ON ALL TABLES IN SCHEMA public TO dml_role; +ALTER DEFAULT PRIVILEGES IN SCHEMA public GRANT SELECT,INSERT,UPDATE,DELETE ON TABLES TO dml_role; +GRANT USAGE,SELECT,UPDATE ON ALL SEQUENCES IN SCHEMA public TO dml_role; +ALTER DEFAULT PRIVILEGES IN SCHEMA public GRANT USAGE,SELECT,UPDATE ON SEQUENCES TO dml_role; + +-- DDL role: schema migrations +CREATE ROLE ddl_role NOLOGIN; +GRANT ALL ON SCHEMA public TO ddl_role; +GRANT ALL ON ALL TABLES IN SCHEMA public TO ddl_role; +ALTER DEFAULT PRIVILEGES IN SCHEMA public GRANT ALL ON TABLES TO ddl_role; + +-- Readonly role: analytics/reporting +CREATE ROLE readonly_role NOLOGIN; +GRANT SELECT ON ALL TABLES IN SCHEMA public TO readonly_role; +ALTER DEFAULT PRIVILEGES IN SCHEMA public GRANT SELECT ON TABLES TO readonly_role; + +-- Admin role: referenced by the pgdog-admin Vault role in setup-vault.sh +CREATE ROLE pgdog_admin_role NOLOGIN; +GRANT pg_read_all_data TO pgdog_admin_role; + +-- Create a sample table for testing +CREATE TABLE IF NOT EXISTS demo_items ( + id SERIAL PRIMARY KEY, + name TEXT NOT NULL, + created_at TIMESTAMPTZ DEFAULT NOW() +); + +INSERT INTO demo_items (name) VALUES ('item-1'), ('item-2'), ('item-3'); diff --git a/integration/vault/users.toml b/integration/vault/users.toml new file mode 100644 index 000000000..c0709e99e --- /dev/null +++ b/integration/vault/users.toml @@ -0,0 +1,17 @@ +#:schema ./.schema/users.schema.json +# +# Basic users configuration. +# +[[users]] +name = "dml_role" +password = "pgdog" +database = "pgdog" +server_auth = "vault" +vault_path = "database/creds/dml-role" + +[[users]] +name = "ddl_role" +password = "pgdog" +database = "pgdog" +server_auth = "vault" +vault_path = "database/creds/ddl-role" diff --git a/pgdog-config/src/core.rs b/pgdog-config/src/core.rs index ecc1bb4ec..d49075b10 100644 --- a/pgdog-config/src/core.rs +++ b/pgdog-config/src/core.rs @@ -23,6 +23,7 @@ use super::replication::{MirrorConfig, Mirroring, MirroringLevel, ReplicaLag, Re use super::rewrite::Rewrite; use super::sharding::{ManualQuery, OmnishardedTables, ShardedMapping, ShardedTable}; use super::users::{Admin, Plugin, Users}; +use super::vault::VaultConfig; #[derive(Debug, Clone, PartialEq)] pub struct ConfigAndUsers { @@ -222,6 +223,13 @@ pub struct Config { #[schemars(default = "crate::users::Admin::schemars_default_stub")] pub admin: Admin, + /// Vault integration for dynamic credential rotation. + /// + /// When set, pgdog fetches PostgreSQL credentials from Vault for any pool + /// that has `vault_path` configured in `users.toml`. + #[serde(default)] + pub vault: Option, + /// To detect and route queries with sharding keys, PgDog expects the sharded column to be specified in the configuration. /// /// https://docs.pgdog.dev/configuration/pgdog.toml/sharded_tables/ diff --git a/pgdog-config/src/lib.rs b/pgdog-config/src/lib.rs index 1f5af5964..ee7c5e27b 100644 --- a/pgdog-config/src/lib.rs +++ b/pgdog-config/src/lib.rs @@ -17,6 +17,7 @@ pub mod system_catalogs; pub mod url; pub mod users; pub mod util; +pub mod vault; pub use auth::{AuthType, PassthroughAuth}; pub use core::{Config, ConfigAndUsers}; @@ -36,6 +37,7 @@ pub use rewrite::{Rewrite, RewriteMode}; pub use sharding::*; pub use system_catalogs::system_catalogs; pub use users::{Admin, Plugin, ServerAuth, User, Users}; +pub use vault::{VaultAuthMethod, VaultConfig, VaultTlsVerify}; use std::time::Duration; diff --git a/pgdog-config/src/users.rs b/pgdog-config/src/users.rs index f855d5535..bd4850644 100644 --- a/pgdog-config/src/users.rs +++ b/pgdog-config/src/users.rs @@ -150,6 +150,10 @@ pub enum ServerAuth { RdsIam, /// Generate an Azure Workload Identity auth token per connection attempt. AzureWorkloadIdentity, + /// Credentials are managed by the Vault integration. pgdog fetches + /// `server_user` and `server_password` from Vault at runtime; never + /// falls back to static client passwords for backend connections. + Vault, } impl ServerAuth { @@ -284,6 +288,13 @@ pub struct User { pub two_phase_commit_auto: Option, /// Server connections older than this (in milliseconds) will be closed when returned to the pool. pub server_lifetime: Option, + /// Vault database credential path for this pool, e.g. `database/creds/dml-role`. + /// + /// When set, pgdog manages backend credentials via Vault: it fetches a fresh + /// username/password from this path and rotates them before the lease expires. + /// The `[vault]` block in `pgdog.toml` must also be configured. + #[serde(default)] + pub vault_path: Option, } impl User { diff --git a/pgdog-config/src/vault.rs b/pgdog-config/src/vault.rs new file mode 100644 index 000000000..9b43638a1 --- /dev/null +++ b/pgdog-config/src/vault.rs @@ -0,0 +1,316 @@ +use std::fs::read_to_string; +use std::io::{self, Error, ErrorKind}; +use std::path::{Path, PathBuf}; + +use schemars::JsonSchema; +use serde::{Deserialize, Serialize}; + +/// How pgdog verifies the Vault server's TLS certificate. +#[derive(Serialize, Deserialize, Debug, Clone, PartialEq, Default, JsonSchema)] +#[serde(rename_all = "kebab-case")] +pub enum VaultTlsVerify { + /// Skip all TLS verification. Accepts any certificate or hostname. + /// Use only in development; never in production. + Disable, + /// Verify the server certificate is signed by a trusted CA, but do not + /// check that the hostname matches the certificate's CN / SANs. + VerifyCa, + /// Full TLS verification: trusted CA **and** hostname match (default). + #[default] + VerifyFull, +} + +/// Vault authentication method used by pgdog to obtain a Vault token. +#[derive(Serialize, Deserialize, Debug, Clone, PartialEq, Default, JsonSchema)] +#[serde(rename_all = "snake_case")] +pub enum VaultAuthMethod { + /// AppRole authentication — provide `role_id` and `secret_id` / `secret_id_file`. + #[default] + AppRole, + /// Kubernetes authentication — pgdog presents the pod's service account JWT + /// to Vault, which validates it against the Kubernetes API server. + Kubernetes, +} + +/// Settings for Vault integration. +/// +/// When configured, pgdog fetches dynamic PostgreSQL credentials from Vault +/// for pools that have `vault_path` set in `users.toml`, rotating them +/// automatically before expiry. +/// +/// # AppRole example +/// ```toml +/// [vault] +/// address = "https://vault.example.com:8200" +/// auth_method = "app_role" +/// role_id = "pgdog-role-id" +/// secret_id = "..." +/// ``` +/// +/// # Kubernetes example +/// ```toml +/// [vault] +/// address = "https://vault.example.com:8200" +/// auth_method = "kubernetes" +/// kubernetes_role = "pgdog" +/// ``` +#[derive(Serialize, Deserialize, Debug, Clone, PartialEq, JsonSchema)] +#[serde(deny_unknown_fields)] +pub struct VaultConfig { + /// Vault server address, e.g. `https://vault.example.com:8200`. + pub address: String, + + /// Authentication method used to obtain a Vault token (default: `approle`). + #[serde(default)] + pub auth_method: VaultAuthMethod, + + /// Fetch fresh credentials when this percentage of the lease TTL has elapsed. + /// Must be between 1 and 99. Default: 75. + #[serde(default = "VaultConfig::default_pre_rotation_pct")] + pub pre_rotation_pct: u8, + + // ── AppRole fields ──────────────────────────────────────────────────────── + /// AppRole `role_id`. Required when `auth_method = "approle"`. + pub role_id: Option, + + /// AppRole `secret_id` provided inline. Mutually exclusive with `secret_id_file`. + pub secret_id: Option, + + /// Path to a file containing the AppRole `secret_id` (e.g. a Kubernetes secret + /// mounted at `/etc/pgdog/vault-secret-id`). Mutually exclusive with `secret_id`. + pub secret_id_file: Option, + + // ── Kubernetes fields ───────────────────────────────────────────────────── + /// Vault role name to authenticate as. Required when `auth_method = "kubernetes"`. + pub kubernetes_role: Option, + + /// Path to the Kubernetes service account JWT token file. + /// Default: `/var/run/secrets/kubernetes.io/serviceaccount/token`. + #[serde(default = "VaultConfig::default_kubernetes_jwt_path")] + pub kubernetes_jwt_path: PathBuf, + + /// Vault mount path for the Kubernetes auth method. Default: `kubernetes`. + #[serde(default = "VaultConfig::default_kubernetes_mount_path")] + pub kubernetes_mount_path: String, + + // ── TLS fields ──────────────────────────────────────────────────────────── + /// TLS verification mode for connections to the Vault server. + /// Default: `verify-full`. + #[serde(default)] + pub tls_verify: VaultTlsVerify, + + /// Path to a PEM-encoded CA certificate bundle used to verify the Vault + /// server's TLS certificate. When provided, the bundle is added on top of + /// the system-wide certificate store. When omitted, only system certs are used. + pub tls_server_ca_certificate: Option, +} + +impl VaultConfig { + fn default_pre_rotation_pct() -> u8 { + 75 + } + + pub fn default_kubernetes_jwt_path() -> PathBuf { + PathBuf::from("/var/run/secrets/kubernetes.io/serviceaccount/token") + } + + pub fn default_kubernetes_mount_path() -> String { + "kubernetes".into() + } + + /// Returns the AppRole `secret_id`, reading from `secret_id_file` if necessary. + /// Trims whitespace so files written with a trailing newline work out of the box. + pub fn secret_id(&self) -> io::Result { + if let Some(ref s) = self.secret_id { + return Ok(s.clone()); + } + if let Some(ref path) = self.secret_id_file { + return read_to_string(path).map(|s| s.trim().to_string()); + } + Err(Error::new( + ErrorKind::NotFound, + "vault: neither secret_id nor secret_id_file is configured", + )) + } + + /// Returns the path to the Kubernetes service account JWT. + /// Callers should read the file asynchronously via `tokio::fs::read_to_string`. + pub fn jwt_path(&self) -> &Path { + &self.kubernetes_jwt_path + } +} + +#[cfg(test)] +mod tests { + use std::path::Path; + + use super::*; + + fn approle_base() -> VaultConfig { + VaultConfig { + address: "http://127.0.0.1:8200".into(), + auth_method: VaultAuthMethod::AppRole, + pre_rotation_pct: 75, + role_id: Some("test-role".into()), + secret_id: Some("inline-secret".into()), + secret_id_file: None, + kubernetes_role: None, + kubernetes_jwt_path: VaultConfig::default_kubernetes_jwt_path(), + kubernetes_mount_path: VaultConfig::default_kubernetes_mount_path(), + tls_verify: VaultTlsVerify::VerifyFull, + tls_server_ca_certificate: None, + } + } + + fn kubernetes_base() -> VaultConfig { + VaultConfig { + address: "http://127.0.0.1:8200".into(), + auth_method: VaultAuthMethod::Kubernetes, + pre_rotation_pct: 75, + role_id: None, + secret_id: None, + secret_id_file: None, + kubernetes_role: Some("pgdog".into()), + kubernetes_jwt_path: VaultConfig::default_kubernetes_jwt_path(), + kubernetes_mount_path: VaultConfig::default_kubernetes_mount_path(), + tls_verify: VaultTlsVerify::VerifyFull, + tls_server_ca_certificate: None, + } + } + + // ── AppRole ─────────────────────────────────────────────────────────────── + + #[test] + fn test_secret_id_inline() { + assert_eq!(approle_base().secret_id().unwrap(), "inline-secret"); + } + + #[test] + fn test_secret_id_missing_returns_error() { + let cfg = VaultConfig { + secret_id: None, + secret_id_file: None, + ..approle_base() + }; + assert!(cfg.secret_id().is_err()); + } + + #[test] + fn test_secret_id_file() { + use std::io::Write; + let mut f = tempfile::NamedTempFile::new().unwrap(); + writeln!(f, " file-secret ").unwrap(); + let cfg = VaultConfig { + secret_id: None, + secret_id_file: Some(f.path().into()), + ..approle_base() + }; + assert_eq!(cfg.secret_id().unwrap(), "file-secret"); + } + + #[test] + fn test_secret_id_inline_wins_over_file() { + use std::io::Write; + let mut f = tempfile::NamedTempFile::new().unwrap(); + writeln!(f, "file-secret").unwrap(); + let cfg = VaultConfig { + secret_id: Some("inline".into()), + secret_id_file: Some(f.path().into()), + ..approle_base() + }; + assert_eq!(cfg.secret_id().unwrap(), "inline"); + } + + // ── Kubernetes ──────────────────────────────────────────────────────────── + + #[test] + fn test_kubernetes_mount_path_default() { + assert_eq!(kubernetes_base().kubernetes_mount_path, "kubernetes"); + } + + #[test] + fn test_kubernetes_jwt_path_default() { + assert_eq!( + kubernetes_base().kubernetes_jwt_path, + Path::new("/var/run/secrets/kubernetes.io/serviceaccount/token") + ); + } + + #[test] + fn test_jwt_path_returns_configured_path() { + use std::io::Write; + let mut f = tempfile::NamedTempFile::new().unwrap(); + writeln!(f, " eyJhbGciOiJSUzI1NiJ9.stub ").unwrap(); + let cfg = VaultConfig { + kubernetes_jwt_path: f.path().into(), + ..kubernetes_base() + }; + assert_eq!(cfg.jwt_path(), f.path()); + let content = read_to_string(cfg.jwt_path()).unwrap(); + assert_eq!(content.trim(), "eyJhbGciOiJSUzI1NiJ9.stub"); + } + + #[test] + fn test_jwt_path_default() { + let cfg = kubernetes_base(); + assert_eq!( + cfg.jwt_path(), + Path::new("/var/run/secrets/kubernetes.io/serviceaccount/token") + ); + } + + // ── serde ───────────────────────────────────────────────────────────────── + + #[test] + fn test_default_pre_rotation_pct() { + let toml = r#" + address = "https://vault.example.com:8200" + role_id = "r" + secret_id = "s" + "#; + let cfg: VaultConfig = toml::from_str(toml).unwrap(); + assert_eq!(cfg.pre_rotation_pct, 75); + } + + #[test] + fn test_kubernetes_defaults_applied_when_not_set() { + let toml = r#" + address = "https://vault.example.com:8200" + auth_method = "kubernetes" + kubernetes_role = "pgdog" + "#; + let cfg: VaultConfig = toml::from_str(toml).unwrap(); + assert_eq!(cfg.auth_method, VaultAuthMethod::Kubernetes); + assert_eq!(cfg.kubernetes_role.as_deref(), Some("pgdog")); + assert_eq!( + cfg.kubernetes_jwt_path, + Path::new("/var/run/secrets/kubernetes.io/serviceaccount/token") + ); + assert_eq!(cfg.kubernetes_mount_path, "kubernetes"); + } + + #[test] + fn test_kubernetes_custom_mount_and_jwt_path() { + let toml = r#" + address = "https://vault.example.com:8200" + auth_method = "kubernetes" + kubernetes_role = "pgdog" + kubernetes_mount_path = "k8s-cluster-a" + kubernetes_jwt_path = "/custom/token" + "#; + let cfg: VaultConfig = toml::from_str(toml).unwrap(); + assert_eq!(cfg.kubernetes_mount_path, "k8s-cluster-a"); + assert_eq!(cfg.kubernetes_jwt_path, Path::new("/custom/token")); + } + + #[test] + fn test_unknown_field_rejected() { + let toml = r#" + address = "https://vault.example.com:8200" + role_id = "r" + secret_id = "s" + typo_field = true + "#; + assert!(toml::from_str::(toml).is_err()); + } +} diff --git a/pgdog/Cargo.toml b/pgdog/Cargo.toml index 332287350..8e00fd13e 100644 --- a/pgdog/Cargo.toml +++ b/pgdog/Cargo.toml @@ -77,7 +77,7 @@ azure_identity = "0.34.0" azure_core = "0.34.0" crc32c = "0.6.8" bit-vec = "0.8" -reqwest = { version = "0.12", default-features = false, features = ["rustls-tls-webpki-roots-no-provider"] } +reqwest = { version = "0.12", default-features = false, features = ["json", "rustls-tls-webpki-roots-no-provider"] } hex = "0.4" [target.'cfg(unix)'.dependencies] diff --git a/pgdog/src/auth/mod.rs b/pgdog/src/auth/mod.rs index 98d6160a8..b270c6a2e 100644 --- a/pgdog/src/auth/mod.rs +++ b/pgdog/src/auth/mod.rs @@ -3,6 +3,5 @@ pub mod error; pub mod md5; pub mod scram; - pub use error::Error; pub use md5::Client; diff --git a/pgdog/src/backend/auth/mod.rs b/pgdog/src/backend/auth/mod.rs index cda4083ee..c89d0c9c9 100644 --- a/pgdog/src/backend/auth/mod.rs +++ b/pgdog/src/backend/auth/mod.rs @@ -1,2 +1,3 @@ pub mod azure_workload_identity; pub mod rds_iam; +pub mod vault; diff --git a/pgdog/src/backend/auth/vault/api.rs b/pgdog/src/backend/auth/vault/api.rs new file mode 100644 index 000000000..a7b366541 --- /dev/null +++ b/pgdog/src/backend/auth/vault/api.rs @@ -0,0 +1,446 @@ +//! Vault HTTP API client built on `reqwest`. + +use std::fs::read; + +use serde::Deserialize; +use serde_json::json; + +use pgdog_config::{VaultConfig, VaultTlsVerify}; + +use super::Error; + +// ── public types ────────────────────────────────────────────────────────────── + +#[derive(Debug, Clone)] +pub struct VaultToken { + pub client_token: String, + pub lease_duration: u64, + pub renewable: bool, +} + +#[derive(Debug, Clone)] +pub struct VaultCredential { + pub username: String, + pub password: String, + pub lease_duration: u64, +} + +// ── Vault JSON shapes ───────────────────────────────────────────────────────── + +/// Both AppRole and Kubernetes login responses share the same `auth` wrapper. +#[derive(Deserialize)] +struct LoginResponse { + auth: AuthData, +} + +#[derive(Deserialize)] +struct AuthData { + client_token: String, + lease_duration: u64, + renewable: bool, +} + +#[derive(Deserialize)] +struct CredentialResponse { + data: CredentialData, + lease_duration: u64, +} + +#[derive(Deserialize)] +struct CredentialData { + username: String, + password: String, +} + +// ── client construction ─────────────────────────────────────────────────────── + +/// Build a `reqwest::Client` configured with the TLS settings from `cfg`. +/// Call once per vault config (at startup and in the renewal task) and reuse +/// the resulting client for all API calls within that lifecycle. +pub fn build_client(cfg: &VaultConfig) -> Result { + let mut builder = reqwest::Client::builder(); + + builder = match cfg.tls_verify { + VaultTlsVerify::Disable => builder.danger_accept_invalid_certs(true), + VaultTlsVerify::VerifyCa => builder.danger_accept_invalid_hostnames(true), + VaultTlsVerify::VerifyFull => builder, + }; + + if let Some(ref ca_path) = cfg.tls_server_ca_certificate { + let pem = read(ca_path).map_err(|e| { + Error::Http(format!( + "[vault] failed to read tls_server_ca_certificate {}: {e}", + ca_path.display() + )) + })?; + let cert = reqwest::Certificate::from_pem(&pem) + .map_err(|e| Error::Http(format!("[vault] invalid tls_server_ca_certificate: {e}")))?; + builder = builder.add_root_certificate(cert); + } + + builder + .build() + .map_err(|e| Error::Http(format!("[vault] failed to build HTTP client: {e}"))) +} + +// ── public API ──────────────────────────────────────────────────────────────── + +pub struct AppRoleLogin<'a> { + pub client: &'a reqwest::Client, + pub addr: &'a str, + pub role_id: &'a str, + pub secret_id: &'a str, +} + +impl AppRoleLogin<'_> { + pub async fn call(self) -> Result { + #[cfg(test)] + if let Some(result) = test_support::login_override() { + return result; + } + + let url = format!("{}/v1/auth/approle/login", self.addr.trim_end_matches('/')); + let body = json!({ "role_id": self.role_id, "secret_id": self.secret_id }); + post_login(self.client, &url, &body).await + } +} + +pub struct KubernetesLogin<'a> { + pub client: &'a reqwest::Client, + pub addr: &'a str, + /// Vault auth mount path (default: `"kubernetes"`). + pub mount_path: &'a str, + pub role: &'a str, + /// Contents of the pod's service account token file. + pub jwt: &'a str, +} + +impl KubernetesLogin<'_> { + pub async fn call(self) -> Result { + #[cfg(test)] + if let Some(result) = test_support::login_override() { + return result; + } + + let url = format!( + "{}/v1/auth/{}/login", + self.addr.trim_end_matches('/'), + self.mount_path.trim_matches('/') + ); + let body = json!({ "role": self.role, "jwt": self.jwt }); + post_login(self.client, &url, &body).await + } +} + +pub struct FetchCredential<'a> { + pub client: &'a reqwest::Client, + pub addr: &'a str, + pub token: &'a str, + /// Vault path, e.g. `database/creds/dml-role`. + pub path: &'a str, +} + +impl FetchCredential<'_> { + pub async fn call(self) -> Result { + #[cfg(test)] + if let Some(result) = test_support::credential_override() { + return result; + } + + let url = format!( + "{}/v1/{}", + self.addr.trim_end_matches('/'), + self.path.trim_start_matches('/') + ); + + let response = self + .client + .get(&url) + .header("X-Vault-Token", self.token) + .send() + .await + .map_err(|e| Error::Http(e.to_string()))?; + + let response = check_status(response).await?; + + let parsed: CredentialResponse = response + .json() + .await + .map_err(|e| Error::Parse(e.to_string()))?; + + Ok(VaultCredential { + username: parsed.data.username, + password: parsed.data.password, + lease_duration: parsed.lease_duration, + }) + } +} + +async fn post_login( + client: &reqwest::Client, + url: &str, + body: &serde_json::Value, +) -> Result { + let response = client + .post(url) + .json(body) + .send() + .await + .map_err(|e| Error::Http(e.to_string()))?; + + let response = check_status(response).await?; + + let parsed: LoginResponse = response + .json() + .await + .map_err(|e| Error::Parse(e.to_string()))?; + + Ok(VaultToken { + client_token: parsed.auth.client_token, + lease_duration: parsed.auth.lease_duration, + renewable: parsed.auth.renewable, + }) +} + +async fn check_status(response: reqwest::Response) -> Result { + let status = response.status().as_u16(); + if !(200u16..300).contains(&status) { + let body = response + .text() + .await + .unwrap_or_else(|_| "".into()); + return Err(Error::VaultStatus { status, body }); + } + Ok(response) +} + +// ── test support (override hooks) ───────────────────────────────────────────── +// +// Thread-local storage so parallel `#[tokio::test]` runs don't interfere. +// Each test gets a current-thread Tokio runtime on its own thread, so +// thread_local! gives perfect isolation without any locking. + +#[cfg(test)] +pub mod test_support { + use super::{Error, VaultCredential, VaultToken}; + use std::cell::RefCell; + + thread_local! { + static LOGIN: RefCell>> = const { RefCell::new(None) }; + static CREDENTIAL: RefCell>> = const { RefCell::new(None) }; + } + + pub fn set_login(result: Option>) { + LOGIN.with(|c| *c.borrow_mut() = result); + } + + pub fn set_credential(result: Option>) { + CREDENTIAL.with(|c| *c.borrow_mut() = result); + } + + pub(super) fn login_override() -> Option> { + LOGIN.with(|c| c.borrow_mut().take()) + } + + pub(super) fn credential_override() -> Option> { + CREDENTIAL.with(|c| c.borrow_mut().take()) + } +} + +// ── tests ───────────────────────────────────────────────────────────────────── + +#[cfg(test)] +mod tests { + use std::path::PathBuf; + + use once_cell::sync::Lazy; + use tokio_rustls::rustls::crypto::aws_lc_rs; + + use pgdog_config::VaultAuthMethod; + + use super::*; + + // reqwest with rustls-tls needs a process-level CryptoProvider. + static RING: Lazy<()> = Lazy::new(|| { + let _ = aws_lc_rs::default_provider().install_default(); + }); + + fn test_client() -> reqwest::Client { + let _ = *RING; + reqwest::Client::new() + } + + #[test] + fn test_parse_login_response() { + let json = r#"{ + "auth": { + "client_token": "s.abc123", + "lease_duration": 86400, + "renewable": true, + "accessor": "ignored", + "policies": [] + } + }"#; + let parsed: LoginResponse = serde_json::from_str(json).unwrap(); + assert_eq!(parsed.auth.client_token, "s.abc123"); + assert_eq!(parsed.auth.lease_duration, 86400); + assert!(parsed.auth.renewable); + } + + #[tokio::test] + async fn test_kubernetes_login_uses_override() { + test_support::set_login(Some(Ok(VaultToken { + client_token: "k8s-token".into(), + lease_duration: 7200, + renewable: true, + }))); + let token = KubernetesLogin { + client: &test_client(), + addr: "http://irrelevant", + mount_path: "kubernetes", + role: "pgdog", + jwt: "eyJhbGciOiJSUzI1NiJ9.stub", + } + .call() + .await + .unwrap(); + assert_eq!(token.client_token, "k8s-token"); + assert_eq!(token.lease_duration, 7200); + } + + #[tokio::test] + async fn test_kubernetes_login_propagates_error() { + test_support::set_login(Some(Err(Error::VaultStatus { + status: 403, + body: "".into(), + }))); + let err = KubernetesLogin { + client: &test_client(), + addr: "http://irrelevant", + mount_path: "kubernetes", + role: "pgdog", + jwt: "jwt", + } + .call() + .await + .unwrap_err(); + assert!(matches!(err, Error::VaultStatus { status: 403, .. })); + } + + #[test] + fn test_parse_credential_response() { + let json = r#"{ + "data": { + "username": "v-approle-dml-AbCdEf", + "password": "s3cr3t-pass" + }, + "lease_duration": 3600, + "lease_id": "database/creds/dml-role/xyz", + "renewable": true + }"#; + let parsed: CredentialResponse = serde_json::from_str(json).unwrap(); + assert_eq!(parsed.data.username, "v-approle-dml-AbCdEf"); + assert_eq!(parsed.data.password, "s3cr3t-pass"); + assert_eq!(parsed.lease_duration, 3600); + } + + #[tokio::test] + async fn test_approle_login_uses_override() { + test_support::set_login(Some(Ok(VaultToken { + client_token: "test-token".into(), + lease_duration: 3600, + renewable: true, + }))); + let token = AppRoleLogin { + client: &test_client(), + addr: "http://irrelevant", + role_id: "role", + secret_id: "secret", + } + .call() + .await + .unwrap(); + assert_eq!(token.client_token, "test-token"); + assert_eq!(token.lease_duration, 3600); + } + + #[tokio::test] + async fn test_fetch_credential_uses_override() { + test_support::set_credential(Some(Ok(VaultCredential { + username: "v-approle-dml-XyZ".into(), + password: "pw".into(), + lease_duration: 86400, + }))); + let cred = FetchCredential { + client: &test_client(), + addr: "http://irrelevant", + token: "tok", + path: "database/creds/dml-role", + } + .call() + .await + .unwrap(); + assert_eq!(cred.username, "v-approle-dml-XyZ"); + assert_eq!(cred.lease_duration, 86400); + } + + #[tokio::test] + async fn test_approle_login_propagates_error_override() { + test_support::set_login(Some(Err(Error::VaultStatus { + status: 403, + body: "".into(), + }))); + let err = AppRoleLogin { + client: &test_client(), + addr: "http://irrelevant", + role_id: "role", + secret_id: "bad", + } + .call() + .await + .unwrap_err(); + assert!(matches!(err, Error::VaultStatus { status: 403, .. })); + } + + // ── build_client TLS ───────────────────────────────────────────────────── + + fn tls_cfg(tls_verify: VaultTlsVerify, ca: Option<&str>) -> VaultConfig { + VaultConfig { + address: "https://vault.example.com".into(), + auth_method: VaultAuthMethod::AppRole, + pre_rotation_pct: 75, + role_id: None, + secret_id: None, + secret_id_file: None, + kubernetes_role: None, + kubernetes_jwt_path: VaultConfig::default_kubernetes_jwt_path(), + kubernetes_mount_path: VaultConfig::default_kubernetes_mount_path(), + tls_verify, + tls_server_ca_certificate: ca.map(PathBuf::from), + } + } + + #[test] + fn test_build_client_verify_full_default() { + assert!(build_client(&tls_cfg(VaultTlsVerify::VerifyFull, None)).is_ok()); + } + + #[test] + fn test_build_client_disable() { + assert!(build_client(&tls_cfg(VaultTlsVerify::Disable, None)).is_ok()); + } + + #[test] + fn test_build_client_verify_ca() { + assert!(build_client(&tls_cfg(VaultTlsVerify::VerifyCa, None)).is_ok()); + } + + #[test] + fn test_build_client_invalid_ca_path_errors() { + assert!(build_client(&tls_cfg( + VaultTlsVerify::VerifyFull, + Some("/nonexistent/ca.pem") + )) + .is_err()); + } +} diff --git a/pgdog/src/backend/auth/vault/error.rs b/pgdog/src/backend/auth/vault/error.rs new file mode 100644 index 000000000..d8a35867e --- /dev/null +++ b/pgdog/src/backend/auth/vault/error.rs @@ -0,0 +1,19 @@ +use thiserror::Error; + +#[derive(Debug, Error)] +pub enum Error { + #[error("[vault] HTTP error: {0}")] + Http(String), + + #[error("[vault] unexpected status {status}: {body}")] + VaultStatus { status: u16, body: String }, + + #[error("[vault] response parse error: {0}")] + Parse(String), + + #[error("[vault] secret_id not available: {0}")] + SecretId(String), + + #[error("[vault] config update failed: {0}")] + ConfigUpdate(String), +} diff --git a/pgdog/src/backend/auth/vault/mod.rs b/pgdog/src/backend/auth/vault/mod.rs new file mode 100644 index 000000000..e70a4c202 --- /dev/null +++ b/pgdog/src/backend/auth/vault/mod.rs @@ -0,0 +1,590 @@ +//! Vault dynamic credential lifecycle management for pgdog backend pools. +//! +//! ## Startup + background renewal +//! +//! 1. Authenticates to Vault, fetches credentials for all pools, writes them to the +//! in-memory [`STORE`] cache so pools can connect with the correct dynamic credentials. +//! 2. Sleeps until `pre_rotation_pct`% of the shortest lease TTL has elapsed, +//! then repeats and calls reload_from_existing to re-create connection with the new set of credentials. +//! +//! If any step fails the task retries with exponential backoff (capped at 60 s). + +pub mod api; +pub mod error; + +use std::collections::HashMap; +use std::sync::RwLock; +use std::time::Duration; + +use once_cell::sync::Lazy; +use tokio::fs::read_to_string; +use tokio::task::JoinHandle; +use tracing::{error, info, warn}; + +use crate::backend::databases::reload_from_existing; + +use api::{build_client, AppRoleLogin, FetchCredential, KubernetesLogin}; +pub use api::{VaultCredential, VaultToken}; +pub use error::Error; + +use pgdog_config::{User, VaultAuthMethod, VaultConfig}; + +// ── credential cache ─────────────────────────────────────────────────────────── +// +// Credentials are stored here rather than in the parsed config so they survive +// config reloads from disk. `Address::new()` reads from here for Vault users. + +#[derive(Clone)] +pub struct CachedCredential { + pub username: String, + pub password: String, +} + +static STORE: Lazy>> = + Lazy::new(|| RwLock::new(HashMap::new())); + +pub fn cache_set(pool_name: &str, username: &str, password: &str) { + let mut w = STORE.write().unwrap_or_else(|e| e.into_inner()); + w.insert( + pool_name.to_string(), + CachedCredential { + username: username.to_string(), + password: password.to_string(), + }, + ); +} + +pub fn cache_get(pool_name: &str) -> Option { + STORE + .read() + .unwrap_or_else(|e| e.into_inner()) + .get(pool_name) + .cloned() +} + +// ── Vault HTTP client ───────────────────────────────────────────────────────── + +/// Holds the `reqwest` client and config for a single Vault connection. +/// Built once so TLS handshakes and connection pool state are reused across all API calls. +struct VaultClient { + client: reqwest::Client, + cfg: VaultConfig, +} + +impl VaultClient { + fn new(cfg: VaultConfig) -> Result { + let client = build_client(&cfg)?; + Ok(Self { client, cfg }) + } + + /// Authenticate to Vault and return a short-lived token. + async fn login(&self) -> Result { + match self.cfg.auth_method { + VaultAuthMethod::AppRole => { + let role_id = self.cfg.role_id.as_deref().ok_or_else(|| { + Error::SecretId("[vault] role_id is required for AppRole auth".into()) + })?; + let secret_id = self + .cfg + .secret_id() + .map_err(|e| Error::SecretId(e.to_string()))?; + AppRoleLogin { + client: &self.client, + addr: &self.cfg.address, + role_id, + secret_id: &secret_id, + } + .call() + .await + } + VaultAuthMethod::Kubernetes => { + let role = self.cfg.kubernetes_role.as_deref().ok_or_else(|| { + Error::SecretId( + "[vault] kubernetes_role is required for Kubernetes auth".into(), + ) + })?; + let jwt = read_to_string(self.cfg.jwt_path()) + .await + .map(|s| s.trim().to_string()) + .map_err(|e| { + Error::SecretId(format!( + "[vault] failed to read JWT from {}: {e}", + self.cfg.jwt_path().display() + )) + })?; + KubernetesLogin { + client: &self.client, + addr: &self.cfg.address, + mount_path: &self.cfg.kubernetes_mount_path, + role, + jwt: &jwt, + } + .call() + .await + } + } + } + + /// Authenticate once and fetch credentials for every pool. + /// Returns all credentials and the minimum lease duration seen. + async fn fetch_credentials( + &self, + pools: &[(String, String)], + ) -> Result<(Vec<(String, VaultCredential)>, u64), Error> { + let token = self.login().await?; + let mut credentials = Vec::with_capacity(pools.len()); + let mut min_lease = u64::MAX; + + for (pool_name, vault_path) in pools { + let cred = match (FetchCredential { + client: &self.client, + addr: &self.cfg.address, + token: &token.client_token, + path: vault_path, + }) + .call() + .await + { + Ok(c) => c, + Err(e) => { + error!(pool = %pool_name, "[vault] failed to fetch credential: {e}"); + continue; + } + }; + if cred.lease_duration == 0 { + warn!( + pool = %pool_name, + "[vault] lease_duration is 0 — credentials may not be renewable; check Vault backend config" + ); + } + min_lease = min_lease.min(cred.lease_duration); + credentials.push((pool_name.clone(), cred)); + } + + Ok(( + credentials, + if min_lease == u64::MAX { 0 } else { min_lease }, + )) + } +} + +// ── public interface ────────────────────────────────────────────────────────── + +/// Owns the single background renewal task. +pub struct VaultManager { + handle: JoinHandle<()>, +} + +impl VaultManager { + /// Fetch initial Vault credentials (awaitable), then spawn the background + /// renewal task. Returns `None` when no users have `vault_path` set. + pub async fn start(vault_config: &VaultConfig, users: &[User]) -> Option { + let pools = vault_pools(users); + if pools.is_empty() { + return None; + } + + let client = match VaultClient::new(vault_config.clone()) { + Ok(c) => c, + Err(err) => { + error!("[vault] failed to build HTTP client: {err}"); + return None; + } + }; + + let initial_sleep = match client.fetch_credentials(&pools).await { + Ok((creds, min_lease)) => { + for (pool_name, cred) in &creds { + cache_set(pool_name, &cred.username, &cred.password); + } + info!(pools = pools.len(), "[vault] initial credentials fetched"); + rotation_interval(min_lease, client.cfg.pre_rotation_pct) + } + Err(err) => { + error!("[vault] initial credential fetch failed: {err}"); + Duration::from_secs(1) + } + }; + + let handle = tokio::spawn(renewal_task(client, pools, initial_sleep)); + + Some(Self { handle }) + } +} + +impl Drop for VaultManager { + fn drop(&mut self) { + self.handle.abort(); + } +} + +// ── background task ─────────────────────────────────────────────────────────── + +/// Sleeps for `initial_sleep` (the rotation interval from the initial fetch), +/// then loops: fetch → sleep(rotation_interval) → repeat. +async fn renewal_task(client: VaultClient, pools: Vec<(String, String)>, initial_sleep: Duration) { + tokio::time::sleep(initial_sleep).await; + let mut backoff = Duration::from_secs(1); + const MAX_BACKOFF: Duration = Duration::from_secs(60); + + loop { + match client.fetch_credentials(&pools).await { + Ok((creds, min_lease)) => { + backoff = Duration::from_secs(1); + for (pool_name, cred) in &creds { + cache_set(pool_name, &cred.username, &cred.password); + } + if let Err(e) = reload_from_existing() { + error!("[vault] pool reload failed after credential rotation: {e}"); + } + let next = rotation_interval(min_lease, client.cfg.pre_rotation_pct); + info!( + pools = pools.len(), + min_lease_secs = min_lease, + next_refresh_secs = next.as_secs(), + "[vault] credentials rotated" + ); + tokio::time::sleep(next).await; + } + Err(err) => { + error!( + backoff_secs = backoff.as_secs(), + "[vault] rotation failed: {err}" + ); + tokio::time::sleep(backoff).await; + backoff = (backoff * 2).min(MAX_BACKOFF); + } + } + } +} + +// ── helpers ─────────────────────────────────────────────────────────────────── + +/// Composite cache key so credentials are scoped per (user, database) pair. +fn pool_key(user_name: &str, database: &str) -> String { + format!("{user_name}/{database}") +} + +fn vault_pools(users: &[User]) -> Vec<(String, String)> { + users + .iter() + .filter_map(|u| { + u.vault_path + .as_ref() + .map(|path| (pool_key(&u.name, &u.database), path.clone())) + }) + .collect() +} + +/// Minimum rotation interval regardless of TTL, to prevent a busy-loop when +/// Vault returns `lease_duration = 0` (non-renewable credentials, misconfigured backend). +const MIN_ROTATION_INTERVAL: Duration = Duration::from_secs(10); + +/// How long to wait before fetching the next credential generation. +/// Clamps `pre_rotation_pct` to [1, 99] and enforces a 10-second minimum +/// so a zero-TTL response never causes a tight loop hammering Vault. +pub fn rotation_interval(lease_duration_secs: u64, pre_rotation_pct: u8) -> Duration { + let pct = pre_rotation_pct.clamp(1, 99) as f64 / 100.0; + let computed = Duration::from_secs_f64(lease_duration_secs as f64 * pct); + computed.max(MIN_ROTATION_INTERVAL) +} + +// ── tests ───────────────────────────────────────────────────────────────────── + +#[cfg(test)] +mod tests { + use super::api::test_support; + use super::*; + use crate::config::load_test; + use once_cell::sync::Lazy; + use pgdog_config::VaultConfig; + + // reqwest with rustls-tls needs a process-level CryptoProvider. + static RING: Lazy<()> = Lazy::new(|| { + let _ = tokio_rustls::rustls::crypto::aws_lc_rs::default_provider().install_default(); + }); + + fn vault_cfg() -> VaultConfig { + let _ = *RING; + VaultConfig { + address: "http://127.0.0.1:8200".into(), + auth_method: pgdog_config::VaultAuthMethod::AppRole, + pre_rotation_pct: 75, + role_id: Some("test-role-id".into()), + secret_id: Some("test-secret-id".into()), + secret_id_file: None, + kubernetes_role: None, + kubernetes_jwt_path: VaultConfig::default_kubernetes_jwt_path(), + kubernetes_mount_path: VaultConfig::default_kubernetes_mount_path(), + tls_verify: pgdog_config::VaultTlsVerify::VerifyFull, + tls_server_ca_certificate: None, + } + } + + fn k8s_cfg() -> VaultConfig { + VaultConfig { + address: "http://127.0.0.1:8200".into(), + auth_method: pgdog_config::VaultAuthMethod::Kubernetes, + pre_rotation_pct: 75, + role_id: None, + secret_id: None, + secret_id_file: None, + kubernetes_role: Some("pgdog".into()), + kubernetes_jwt_path: VaultConfig::default_kubernetes_jwt_path(), + kubernetes_mount_path: VaultConfig::default_kubernetes_mount_path(), + tls_verify: pgdog_config::VaultTlsVerify::VerifyFull, + tls_server_ca_certificate: None, + } + } + + fn vault_client() -> VaultClient { + VaultClient::new(vault_cfg()).expect("failed to build VaultClient in test") + } + + fn test_cred(username: &str) -> VaultCredential { + test_cred_with_lease(username, 3600) + } + + fn test_cred_with_lease(username: &str, lease_duration: u64) -> VaultCredential { + VaultCredential { + username: username.into(), + password: "s3cr3t".into(), + lease_duration, + } + } + + // ── rotation_interval ──────────────────────────────────────────────────── + + #[test] + fn test_rotation_interval_75pct_of_one_hour() { + assert_eq!(rotation_interval(3600, 75), Duration::from_secs(2700)); + } + + #[test] + fn test_rotation_interval_50pct_of_one_day() { + assert_eq!(rotation_interval(86400, 50), Duration::from_secs(43200)); + } + + #[test] + fn test_rotation_interval_clamps_100_to_99() { + assert_eq!(rotation_interval(100, 100), rotation_interval(100, 99)); + assert_eq!(rotation_interval(100, 99), Duration::from_secs(99)); + } + + #[test] + fn test_rotation_interval_clamps_0_to_1() { + assert_eq!(rotation_interval(1000, 0), Duration::from_secs(10)); + } + + #[test] + fn test_rotation_interval_zero_lease_uses_minimum() { + assert_eq!(rotation_interval(0, 75), MIN_ROTATION_INTERVAL); + assert_eq!(rotation_interval(0, 0), MIN_ROTATION_INTERVAL); + } + + #[test] + fn test_rotation_interval_short_lease_uses_minimum() { + assert_eq!(rotation_interval(1, 75), MIN_ROTATION_INTERVAL); + } + + // ── fetch_credentials (mocked) ─────────────────────────────────────────── + + #[tokio::test] + async fn test_fetch_credentials_returns_credentials_and_lease() { + load_test(); + test_support::set_login(Some(Ok(VaultToken { + client_token: "tok".into(), + lease_duration: 3600, + renewable: true, + }))); + test_support::set_credential(Some(Ok(test_cred("v-approle-dml-mock")))); + + let pools = vec![("pgdog".into(), "database/creds/dml-role".into())]; + let (creds, lease) = vault_client() + .fetch_credentials(&pools) + .await + .expect("fetch_credentials should succeed with mocked API"); + assert_eq!(lease, 3600); + assert_eq!(creds.len(), 1); + assert_eq!(creds[0].0, "pgdog"); + assert_eq!(creds[0].1.username, "v-approle-dml-mock"); + } + + #[tokio::test] + async fn test_fetch_credentials_propagates_login_error() { + test_support::set_login(Some(Err(Error::VaultStatus { + status: 403, + body: "".into(), + }))); + let pools = vec![("pool".into(), "database/creds/role".into())]; + let err = vault_client().fetch_credentials(&pools).await.unwrap_err(); + assert!(matches!(err, Error::VaultStatus { status: 403, .. })); + } + + #[tokio::test] + async fn test_fetch_credentials_skips_failed_credential() { + test_support::set_login(Some(Ok(VaultToken { + client_token: "tok".into(), + lease_duration: 3600, + renewable: true, + }))); + test_support::set_credential(Some(Err(Error::VaultStatus { + status: 500, + body: "".into(), + }))); + let pools = vec![("pool".into(), "database/creds/role".into())]; + let (creds, lease) = vault_client() + .fetch_credentials(&pools) + .await + .expect("should succeed with failed pool skipped"); + assert!(creds.is_empty(), "failed pool should be skipped"); + assert_eq!(lease, 0); + } + + // ── VaultManager ───────────────────────────────────────────────────────── + + #[tokio::test] + async fn test_manager_returns_none_when_no_vault_users() { + let users: Vec = vec![pgdog_config::User { + name: "plain_user".into(), + vault_path: None, + ..Default::default() + }]; + assert!(VaultManager::start(&vault_cfg(), &users).await.is_none()); + } + + #[tokio::test] + async fn test_manager_spawns_tasks_for_vault_users() { + load_test(); + test_support::set_login(Some(Ok(VaultToken { + client_token: "tok".into(), + lease_duration: 3600, + renewable: true, + }))); + test_support::set_credential(Some(Ok(test_cred("v-approle-manager-AbCdEf")))); + + let users = vec![pgdog_config::User { + name: "pgdog".into(), + vault_path: Some("database/creds/dml-role".into()), + ..Default::default() + }]; + assert!(VaultManager::start(&vault_cfg(), &users).await.is_some()); + } + + // ── cache ───────────────────────────────────────────────────────────────── + + #[test] + fn test_cache_set_and_get() { + cache_set("cache_test_pool", "v-approle-abc", "pw123"); + let cred = cache_get("cache_test_pool").expect("should find cached credential"); + assert_eq!(cred.username, "v-approle-abc"); + assert_eq!(cred.password, "pw123"); + } + + #[test] + fn test_cache_get_missing_returns_none() { + assert!(cache_get("pool_that_was_never_cached_xyz").is_none()); + } + + #[test] + fn test_cache_overwrite() { + cache_set("overwrite_pool", "old-user", "old-pw"); + cache_set("overwrite_pool", "new-user", "new-pw"); + let cred = cache_get("overwrite_pool").unwrap(); + assert_eq!(cred.username, "new-user"); + assert_eq!(cred.password, "new-pw"); + } + + // ── VaultClient::login ──────────────────────────────────────────────────── + + #[tokio::test] + async fn test_login_approle() { + test_support::set_login(Some(Ok(VaultToken { + client_token: "approle-tok".into(), + lease_duration: 3600, + renewable: true, + }))); + let tok = vault_client().login().await.unwrap(); + assert_eq!(tok.client_token, "approle-tok"); + } + + #[tokio::test] + async fn test_login_approle_missing_role_id_errors() { + let client = VaultClient::new(VaultConfig { + role_id: None, + ..vault_cfg() + }) + .unwrap(); + let err = client.login().await.unwrap_err(); + assert!(matches!(err, Error::SecretId(_))); + } + + #[tokio::test] + async fn test_login_kubernetes() { + use std::io::Write; + let mut f = tempfile::NamedTempFile::new().unwrap(); + writeln!(f, "eyJhbGciOiJSUzI1NiJ9.stub").unwrap(); + + test_support::set_login(Some(Ok(VaultToken { + client_token: "k8s-tok".into(), + lease_duration: 7200, + renewable: true, + }))); + + let client = VaultClient::new(VaultConfig { + kubernetes_jwt_path: f.path().into(), + ..k8s_cfg() + }) + .unwrap(); + let tok = client.login().await.unwrap(); + assert_eq!(tok.client_token, "k8s-tok"); + } + + #[tokio::test] + async fn test_login_kubernetes_missing_role_errors() { + let client = VaultClient::new(VaultConfig { + kubernetes_role: None, + ..k8s_cfg() + }) + .unwrap(); + let err = client.login().await.unwrap_err(); + assert!(matches!(err, Error::SecretId(_))); + } + + #[tokio::test] + async fn test_login_kubernetes_missing_jwt_file_errors() { + let client = VaultClient::new(VaultConfig { + kubernetes_jwt_path: std::path::PathBuf::from("/nonexistent/token"), + ..k8s_cfg() + }) + .unwrap(); + let err = client.login().await.unwrap_err(); + assert!(matches!(err, Error::SecretId(_))); + } + + #[tokio::test] + async fn test_fetch_credentials_kubernetes() { + use std::io::Write; + load_test(); + + let mut f = tempfile::NamedTempFile::new().unwrap(); + writeln!(f, "eyJhbGciOiJSUzI1NiJ9.stub").unwrap(); + + test_support::set_login(Some(Ok(VaultToken { + client_token: "k8s-tok".into(), + lease_duration: 7200, + renewable: true, + }))); + test_support::set_credential(Some(Ok(test_cred_with_lease("v-k8s-dml-AbCdEf", 7200)))); + + let client = VaultClient::new(VaultConfig { + kubernetes_jwt_path: f.path().into(), + ..k8s_cfg() + }) + .unwrap(); + let pools = vec![("pgdog".into(), "database/creds/dml-role".into())]; + let (creds, lease) = client.fetch_credentials(&pools).await.unwrap(); + assert_eq!(lease, 7200); + assert_eq!(creds.len(), 1); + } +} diff --git a/pgdog/src/backend/pool/address.rs b/pgdog/src/backend/pool/address.rs index 37a28b3bc..1bb17cab3 100644 --- a/pgdog/src/backend/pool/address.rs +++ b/pgdog/src/backend/pool/address.rs @@ -8,6 +8,7 @@ use serde::{Deserialize, Serialize}; use url::Url; use super::Password; +use crate::backend::auth::vault::cache_get; use crate::backend::{pool::dns_cache::DnsCache, Error}; use crate::config::{config, Database, ServerAuth, User}; @@ -56,6 +57,12 @@ impl Address { pub fn new(database: &Database, user: &User, database_number: usize) -> Self { let server_auth = user.server_auth; + let vault_cred = if server_auth == ServerAuth::Vault { + cache_get(&format!("{}/{}", user.name, user.database)) + } else { + None + }; + Address { host: database.host.clone(), port: database.port, @@ -66,6 +73,8 @@ impl Address { }, user: if let Some(user) = database.user.clone() { user + } else if let Some(ref cred) = vault_cred { + cred.username.clone() } else if let Some(user) = user.server_user.clone() { user } else { @@ -73,6 +82,11 @@ impl Address { }, passwords: if server_auth.is_external_identity() { vec![] + } else if server_auth == ServerAuth::Vault { + // On a cache miss (manager not yet run) return empty so connections fail cleanly rather than sending the client-side pgdog password to PostgreSQL. + vault_cred + .map(|cred| vec![cred.password.into()]) + .unwrap_or_default() } else if let Some(password) = database.password.clone() { vec![password.into()] } else if let Some(password) = user.server_password.clone() { @@ -94,7 +108,7 @@ impl Address { /// Get address passwords, in valid order. pub async fn auth_secrets(&self) -> Result, Error> { let mut secrets = match self.server_auth { - ServerAuth::Password => self.passwords.clone(), + ServerAuth::Vault | ServerAuth::Password => self.passwords.clone(), ServerAuth::RdsIam => vec![crate::backend::auth::rds_iam::token(self).await?.into()], ServerAuth::AzureWorkloadIdentity => { vec![crate::backend::auth::azure_workload_identity::token(self) @@ -217,6 +231,66 @@ mod test { assert_eq!(address.passwords.first().unwrap(), "hunter3"); } + #[test] + fn test_vault_reads_from_cache() { + // Simulate vault::init() having run: cache has dynamic credentials. + // Key must be composite (name/database) matching Address::new lookup. + crate::backend::auth::vault::cache_set( + "dml_role_addr_test/myapp", + "v-approle-dml-XyZ", + "vault-pass", + ); + + let database = Database { + name: "myapp".into(), + host: "127.0.0.1".into(), + port: 5432, + ..Default::default() + }; + let user = User { + name: "dml_role_addr_test".into(), + password: Some("client-pass".into()), + server_auth: ServerAuth::Vault, + vault_path: Some("database/creds/dml-role".into()), + database: "myapp".into(), + ..Default::default() + }; + + let address = Address::new(&database, &user, 0); + assert_eq!( + address.user, "v-approle-dml-XyZ", + "must use vault-generated username from cache" + ); + assert_eq!( + address.passwords, + vec!["vault-pass"], + "must use vault-generated password from cache" + ); + assert_eq!(address.server_auth, ServerAuth::Vault); + } + + #[test] + fn test_vault_empty_before_init() { + // vault::init() not yet run: cache has no entry for this pool name. + let database = Database { + name: "myapp".into(), + host: "127.0.0.1".into(), + port: 5432, + ..Default::default() + }; + let user = User { + name: "dml_role_not_in_cache_xyz".into(), + server_auth: ServerAuth::Vault, + vault_path: Some("database/creds/dml-role".into()), + database: "myapp".into(), + ..Default::default() + }; + + let address = Address::new(&database, &user, 0); + assert_eq!(address.user, "dml_role_not_in_cache_xyz"); + assert!(address.passwords.is_empty()); + } + #[test] fn test_rds_iam_does_not_use_static_password() { let database = Database { diff --git a/pgdog/src/main.rs b/pgdog/src/main.rs index fd1179fd0..f4d8d3649 100644 --- a/pgdog/src/main.rs +++ b/pgdog/src/main.rs @@ -115,10 +115,18 @@ async fn pgdog(command: Option) -> Result<(), Box