diff --git a/.vscode/launch.json b/.vscode/launch.json index 59012d8fe..4c7b40b11 100644 --- a/.vscode/launch.json +++ b/.vscode/launch.json @@ -107,6 +107,23 @@ "APIP_GW_ANALYTICS_ACCESS__LOGS__SERVICE_MODE": "tcp", }, }, + { + "name": "Python Executor", + "type": "debugpy", + "request": "launch", + "module": "main", + "console": "internalConsole", + "cwd": "${workspaceFolder}/gateway/gateway-runtime/python-executor", + "python": "${workspaceFolder}/gateway/gateway-runtime/python-executor/.venv/bin/python3", + "args": [ + "--listen", "localhost:9010", + "--log-level", "debug" + ], + "env": { + "PYTHONPATH": "${workspaceFolder}/gateway/gateway-runtime/python-executor" + }, + "justMyCode": false + }, { "name": "Integration Tests", "type": "go", diff --git a/gateway/DEBUG_GUIDE.md b/gateway/DEBUG_GUIDE.md index ab41c51f8..84fef3b5d 100644 --- a/gateway/DEBUG_GUIDE.md +++ b/gateway/DEBUG_GUIDE.md @@ -1,6 +1,15 @@ # Gateway Debug Guide -Two debug options are available. **Option 1 (Remote Debug)** is the recommended approach — everything runs in Docker and VS Code attaches via dlv. **Option 2 (Local Process)** runs the controller and policy engine as local VS Code processes with only the router in Docker. +Three debug options are available: + +| Option | What runs locally | Best for | +|--------|------------------|----------| +| **[Option 1 — Remote Debug](#option-1-recommended-remote-debug--all-components-in-docker)** *(recommended)* | Nothing — everything runs in Docker, VS Code attaches via dlv | Production-like debugging, Go policies only | +| **[Option 2A — Local Process (Go only)](#option-2a-local-process-debug--controller--policy-engine-in-vs-code)** | Controller + Policy Engine | Go policy development and iteration | +| **[Option 2B — Local Process (Go + Python)](#option-2b-local-process-debug--controller--policy-engine--python-executor-in-vs-code)** | Controller + Policy Engine + Python Executor | Python policy development and debugging | + +> [!TIP] +> **Choose Option 2B** if you are developing or debugging a Python policy and need breakpoints, print statements, or rapid iteration without rebuilding Docker images. It extends Option 2A with the Python Executor running on the host. --- @@ -107,11 +116,12 @@ curl http://localhost:8080/petstore/v1/pets --- -## Option 2 (Alternative): Local Process Debug — Controller + Policy Engine in VS Code +## Option 2A: Local Process Debug — Controller + Policy Engine in VS Code Gateway Controller and Policy Engine run as local VS Code processes. Only the Envoy Router runs in Docker Compose. -> **Warning:** Processes run directly on the host, so Go resolves modules via `go.work`. Local versions of `sdk` and other workspace modules are used instead of the published Go module versions — including any uncommitted or untagged changes. Behavior may differ from a production build. +> [!WARNING] +> Processes run directly on the host, so Go resolves modules via `go.work`. Local versions of `sdk` and other workspace modules are used instead of the published Go module versions — including any uncommitted or untagged changes. Behavior may differ from a production build. ### Architecture @@ -225,3 +235,287 @@ Send a request to the deployed API: ```bash curl http://localhost:8080/petstore/v1/pets ``` + +--- + +## Option 2B: Local Process Debug — Controller + Policy Engine + Python Executor in VS Code + +This extends **Option 2A** by also running the Python Executor on the host, giving you full debugger access to the Python policy runtime. + +> [!NOTE] +> **When to use this:** You are developing or debugging a Python policy and need to set breakpoints, add print statements, or iterate rapidly without rebuilding Docker images. + +> [!WARNING] +> Processes run directly on the host, so Go resolves modules via `go.work`. Local versions of `sdk` and other workspace modules are used instead of the published Go module versions — including any uncommitted or untagged changes. Behavior may differ from a production build. + +### Architecture + +```mermaid +graph TB + subgraph "VS Code Debugger (Local)" + GC["Gateway Controller
REST API: :9090
xDS: :18000 / :18001"] + PE["Policy Engine
ext_proc: :9001 (TCP)
Admin: :9002"] + PYE["Python Executor
gRPC: localhost:9010 (TCP)"] + end + + subgraph "Docker Compose" + Router["Gateway Runtime
Envoy Router
HTTP: :8080
HTTPS: :8443
Admin: :9901"] + end + + Router -->|"host.docker.internal:9001"| PE + Router -->|"host.docker.internal:18000"| GC + GC -->|"localhost:18001"| PE + PE -->|"localhost:9010"| PYE +``` + +### Prerequisites + +- Python 3.10+ with `venv` +- VS Code with Go and Python extensions installed +- Docker and Docker Compose +- Control plane host and registration token (optional, for gateway registration) + +### Step 1: Enable TCP Mode in config.toml + +Add the following block to `configs/config.toml`: + +```toml +[policy_engine.python_executor.server] +mode = "tcp" +port = 9010 +host = "localhost" +``` + +This tells the Policy Engine to connect to the Python Executor over TCP instead of the default Unix domain socket. + +> [!WARNING] +> **Remove this block when you are done debugging.** The `config.toml` is also mounted into the Docker container (`docker-compose.yaml`), where the Python Executor runs in UDS mode. If this TCP block is left in, the containerized Policy Engine will try to dial `localhost:9010` while the embedded Python Executor is listening on a UDS socket — causing silent connection failures. + +### Step 2: Run Gateway Builder + +Run the **Gateway Builder** debug configuration from VS Code. This compiles all policies (Go + Python) and generates: + +- The Policy Engine binary (compiled with the Python bridge code) +- `python_policy_registry.py` (maps policy names to Python modules) +- Merged `requirements.txt` (all Python policy dependencies) + +> **Note:** Wait for the builder to complete successfully before starting the other components. + +### Step 3: Prepare the Python Environment + +```bash +cd gateway + +# Create or activate the venv +python3 -m venv gateway-runtime/python-executor/.venv +source gateway-runtime/python-executor/.venv/bin/activate + +# Install dependencies (includes policy packages from the build) +pip install -r gateway-builder/target/output/python-executor/requirements.txt + +# Copy the generated registry into the executor source +cp gateway-builder/target/output/python-executor/python_policy_registry.py \ + gateway-runtime/python-executor/python_policy_registry.py +``` + +> [!IMPORTANT] +> Re-run the `pip install` and `cp` steps after every builder run if policies change. + +### Step 4: Update Docker Compose Configuration + +In `gateway/docker-compose.yaml`, make two changes to the `gateway-runtime` service: + +1. Set `GATEWAY_CONTROLLER_HOST` to `host.docker.internal` so the runtime reaches the locally-running controller: + +```yaml +services: + gateway-runtime: + environment: + - GATEWAY_CONTROLLER_HOST=host.docker.internal +``` + +2. Comment out the **Policy Engine** port block: + +```yaml +services: + gateway-runtime: + ports: + # Router (Envoy) - keep these + - "8080:8080" # HTTP ingress + - "8443:8443" # HTTPS ingress + - "8081:8081" # xDS-managed API listener + - "8082:8082" # WebSub Hub dynamic forward proxy + - "8083:8083" # WebSub Hub internal listener + - "9901:9901" # Envoy admin + # Policy Engine - comment these out + # - "9002:9002" # Admin API + # - "9003:9003" # Metrics +``` + + +### Step 5: Start Gateway Controller + +Run the **Gateway Controller** debug configuration from VS Code. + +> **Note:** Leave `APIP_GW_CONTROLPLANE_HOST` and `APIP_GW_GATEWAY_REGISTRATION_TOKEN` empty (`""`) in `.vscode/launch.json` if you want to run in standalone mode without control plane connection. + +### Step 6: Start the Python Executor + +Run the **Python Executor** configuration from VS Code (see [Python debugging tips](#python-debugging-tips) below for breakpoint locations). + +Alternatively, start it from the terminal: + +```bash +gateway-runtime/python-executor/.venv/bin/python3 \ + gateway-runtime/python-executor/main.py \ + --listen localhost:9010 \ + --log-level debug +``` + +You should see: + +```text +Python Executor starting (listen=localhost:9010, workers=4, ...) +Starting Python Executor on localhost:9010 (mode=tcp) +Loaded policy registry with 1 entries +Loaded policy factory: prompt-compressor:v0 from prompt_compressor_v0.policy +Python Executor ready on localhost:9010 +``` + +### Step 7: Start the Policy Engine + +Run the **Policy Engine - xDS** debug configuration from VS Code. + +The Policy Engine will connect to the Python Executor over TCP when the first Python policy is triggered. You should see: + +```text +Python executor bridge initialized address=localhost:9010 mode=tcp timeout=30s +``` + +### Step 8: Start the Gateway Runtime (Router) + +Run the router in Docker Compose: + +```bash +cd gateway +docker compose up gateway-runtime sample-backend -d +docker compose logs -ft gateway-runtime sample-backend +``` + +### Step 9: Deploy and Test + +```bash +# Deploy an API with a Python policy (e.g., prompt-compressor) +curl -X POST http://localhost:9090/api/management/v0.9/rest-apis \ + -u ":" \ + -H "Content-Type: application/yaml" \ + --data-binary @path/to/api.yaml + +# Send a request that triggers the policy +curl -X POST http://localhost:8080/your-api/chat \ + -H "Content-Type: application/json" \ + -d '{"messages": [{"role": "user", "content": "Your test prompt here"}]}' +``` + +### Step 10: Clean Up + +When you are done debugging: + +1. **Remove the TCP block** from `configs/config.toml`: + +```diff +-[policy_engine.python_executor.server] +-mode = "tcp" +-port = 9010 +-host = "localhost" +``` + +2. **Revert the Docker Compose changes** from Step 4 (restore `GATEWAY_CONTROLLER_HOST` and uncomment Policy Engine ports). + +This ensures `docker compose up` continues to work correctly with UDS mode. + +--- + +## Python Debugging Tips + +### Setting Breakpoints in VS Code + +When using the **Python Executor** launch config, set breakpoints in: + +- `executor/server.py` — gRPC servicer logic (`InitPolicy`, `ExecuteStream`) +- `executor/translator.py` — protobuf ↔ SDK type translation +- Any installed policy module (e.g., `.venv/lib/python3.*/site-packages/prompt_compressor_v0/policy.py`) + +### Debugging with pdb + +For quick terminal-based debugging, add breakpoints directly in policy code: + +```python +# In your policy's on_request_body(): +import pdb; pdb.set_trace() +``` + +--- + +## Quick Reference + +### Port Map + +| Port | Component | Protocol | +|------|-----------|----------| +| 9090 | Gateway Controller REST API | HTTP | +| 18000 | Gateway Controller xDS (Router) | gRPC | +| 18001 | Gateway Controller xDS (Policy Engine) | gRPC | +| 9001 | Policy Engine ext_proc | gRPC (TCP) | +| 9010 | Python Executor | gRPC (TCP) | +| 8080 | Router HTTP ingress | HTTP | +| 8443 | Router HTTPS ingress | HTTPS | +| 9901 | Router (Envoy) Admin | HTTP | +| 15000 | Sample Backend | HTTP | + +### Python Executor Environment Variables + +| Variable | Default | Description | +|----------|---------|-------------| +| `PYTHON_EXECUTOR_LISTEN` | UDS socket path | Listen address — UDS path or `host:port` for TCP | +| `PYTHON_POLICY_WORKERS` | 4 | gRPC worker thread count | +| `PYTHON_POLICY_MAX_CONCURRENT` | 100 | Max concurrent policy executions | +| `PYTHON_POLICY_TIMEOUT` | 30 | Execution timeout in seconds | +| `LOG_LEVEL` | info | Log level (debug, info, warn, error) | + +### VS Code Debug Configurations + +All launch configurations live in `.vscode/launch.json`: + +| Configuration | Type | Component | +|---|---|---| +| Gateway Controller | Go (launch) | Controller process | +| Gateway Builder | Go (launch) | Build-time compilation | +| Policy Engine - xDS | Go (launch) | Policy Engine with xDS discovery | +| Policy Engine - File | Go (launch) | Policy Engine with file-based policy chains | +| Python Executor | Python (debugpy) | Python policy runtime | +| Gateway Controller (Remote) | Go (attach) | Remote attach — Option 1 | +| Policy Engine (Remote) | Go (attach) | Remote attach — Option 1 | + +--- + +## Common Issues + +**"Policy factory not found: prompt-compressor:v0.9.0"** +→ The Python Executor uses **major-version keys** (e.g., `prompt-compressor:v0`). This error means the `python_policy_registry.py` file was not regenerated after the builder ran. Re-copy it: +```bash +cp gateway-builder/target/output/python-executor/python_policy_registry.py \ + gateway-runtime/python-executor/python_policy_registry.py +``` + +**"context deadline exceeded" when calling a Python policy** +→ The Policy Engine is trying to connect to the Python Executor but failing. Check: +1. Is the Python Executor actually running? (`ps aux | grep main.py`) +2. Is it listening on the right address? (should show `localhost:9010`) +3. Does your `config.toml` have the `[policy_engine.python_executor.server]` block with `mode = "tcp"`? + +**"bind: address already in use" on port 9010** +→ Kill stale Python Executor processes: `pkill -f "python.*main.py"` + +**Container mode broken after debugging** +→ You likely left `[policy_engine.python_executor.server] mode = "tcp"` in `configs/config.toml`. Remove it — see [Step 10](#step-10-clean-up). diff --git a/gateway/configs/config-template.toml b/gateway/configs/config-template.toml index b3d83ef72..0c9b347c5 100644 --- a/gateway/configs/config-template.toml +++ b/gateway/configs/config-template.toml @@ -243,6 +243,22 @@ format = "text" enabled = true port = 9003 +# ============================================================================= +# PYTHON EXECUTOR CONFIGURATION +# ============================================================================= + +[policy_engine.python_executor] +# Timeout for policy execution +timeout = "30s" + +[policy_engine.python_executor.server] +# Mode: "uds" (Unix Domain Socket) or "tcp" +mode = "uds" +# Port (only used when mode = "tcp") +port = 9010 +# Host (only used when mode = "tcp") +host = "localhost" + # ============================================================================= # ANALYTICS CONFIGURATION # ============================================================================= diff --git a/gateway/gateway-builder/internal/policyengine/generator.go b/gateway/gateway-builder/internal/policyengine/generator.go index ca2fa0c6b..8d63e7631 100644 --- a/gateway/gateway-builder/internal/policyengine/generator.go +++ b/gateway/gateway-builder/internal/policyengine/generator.go @@ -365,6 +365,14 @@ func copyDir(src, dst string) error { return err } + // Skip common virtual environment and cache directories + if info.IsDir() && path != src { + name := info.Name() + if name == ".venv" || name == "venv" || name == "env" || name == "__pycache__" || name == ".git" { + return filepath.SkipDir + } + } + dstPath := filepath.Join(dst, relPath) if info.IsDir() { diff --git a/gateway/gateway-builder/templates/plugin_registry.go.tmpl b/gateway/gateway-builder/templates/plugin_registry.go.tmpl index 8c395cd62..3b37311d0 100644 --- a/gateway/gateway-builder/templates/plugin_registry.go.tmpl +++ b/gateway/gateway-builder/templates/plugin_registry.go.tmpl @@ -73,7 +73,6 @@ func init() { }{{ else }}nil{{ end }}, } bridgeFactory_{{ .ImportAlias }} := &pythonbridge.BridgeFactory{ - StreamManager: pythonbridge.GetStreamManager(), PolicyName: "{{ .Name }}", PolicyVersion: "{{ .Version }}", } diff --git a/gateway/gateway-runtime/docker-entrypoint.sh b/gateway/gateway-runtime/docker-entrypoint.sh index 19c50aa0f..565aa4c81 100644 --- a/gateway/gateway-runtime/docker-entrypoint.sh +++ b/gateway/gateway-runtime/docker-entrypoint.sh @@ -74,6 +74,10 @@ while [[ $# -gt 0 ]]; do log "ERROR: --py.socket override is not supported; socket path is fixed" exit 1 fi + if [[ "$py_arg" == "--py.listen" || "$py_arg" == --py.listen=* ]]; then + log "ERROR: --py.listen override is not supported; listen address is fixed in container mode" + exit 1 + fi PY_ARGS+=("--${py_arg#--py.}") shift if [[ $# -gt 0 && "$1" != --* ]]; then @@ -119,7 +123,7 @@ export XDS_SERVER_PORT="${ROUTER_XDS_PORT}" PE_XDS_SERVER="${GATEWAY_CONTROLLER_HOST}:${POLICY_ENGINE_XDS_PORT}" POLICY_ENGINE_SOCKET="/var/run/api-platform/policy-engine.sock" -PYTHON_EXECUTOR_SOCKET="/var/run/api-platform/python-executor.sock" +export PYTHON_EXECUTOR_SOCKET="/var/run/api-platform/python-executor.sock" log "Starting Gateway Runtime" log " Gateway Controller: ${GATEWAY_CONTROLLER_HOST}" @@ -181,7 +185,8 @@ trap shutdown SIGTERM SIGINT SIGQUIT # python_policy_registry.py is only generated when Python policies exist; if [ -f /app/python-executor/python_policy_registry.py ]; then log "Starting Python Executor..." - python3 /app/python-executor/main.py "${PY_ARGS[@]}" \ + unset PYTHON_EXECUTOR_LISTEN + python3 /app/python-executor/main.py --listen "${PYTHON_EXECUTOR_SOCKET}" "${PY_ARGS[@]}" \ > >(while IFS= read -r line; do echo "[pye] $line"; done) \ 2> >(while IFS= read -r line; do echo "[pye] $line" >&2; done) & PY_PID=$! diff --git a/gateway/gateway-runtime/policy-engine/cmd/policy-engine/main.go b/gateway/gateway-runtime/policy-engine/cmd/policy-engine/main.go index 4473797dc..031f52774 100644 --- a/gateway/gateway-runtime/policy-engine/cmd/policy-engine/main.go +++ b/gateway/gateway-runtime/policy-engine/cmd/policy-engine/main.go @@ -165,6 +165,9 @@ func main() { // Policy registration happens automatically via Builder-generated plugin_registry.go slog.InfoContext(ctx, "Policies registered via Builder-generated code") + // Initialize Python executor bridge from configuration + pythonbridge.Init(cfg.PolicyEngine.PythonExecutor) + // Initialize configuration source based on mode var xdsClient *xdsclient.Client var xdsSyncStatusProvider admin.XDSSyncStatusProvider = noOpXDSSyncStatusProvider{} @@ -239,9 +242,8 @@ func main() { // Start admin HTTP server if enabled var adminServer *admin.Server if cfg.PolicyEngine.Admin.Enabled { - // Check if Python executor is available (socket configured) var pythonHealthChecker admin.PythonHealthChecker - if _, err := os.Stat("/var/run/api-platform/python-executor.sock"); err == nil { + if pythonbridge.IsAvailable(cfg.PolicyEngine.PythonExecutor) { sm := pythonbridge.GetStreamManager() pythonHealthChecker = pythonbridge.NewPythonHealthAdapter(sm) } diff --git a/gateway/gateway-runtime/policy-engine/internal/config/config.go b/gateway/gateway-runtime/policy-engine/internal/config/config.go index 67ca15b35..3fc8cea9b 100644 --- a/gateway/gateway-runtime/policy-engine/internal/config/config.go +++ b/gateway/gateway-runtime/policy-engine/internal/config/config.go @@ -81,6 +81,7 @@ type PolicyEngine struct { XDS XDSConfig `koanf:"xds"` FileConfig FileConfigConfig `koanf:"file_config"` Logging LoggingConfig `koanf:"logging"` + PythonExecutor PythonExecutorConfig `koanf:"python_executor"` // Tracing holds OpenTelemetry exporter configuration TracingServiceName string `koanf:"tracing_service_name"` @@ -135,6 +136,28 @@ type ServerConfig struct { ExtProcPort int `koanf:"extproc_port"` } +// PythonExecutorConfig holds configuration for the Python executor bridge. +// The Policy Engine uses this to connect to the Python executor process. +type PythonExecutorConfig struct { + Server PythonExecutorServerConfig `koanf:"server"` + Timeout time.Duration `koanf:"timeout"` +} + +// PythonExecutorServerConfig holds Python executor connection configuration +type PythonExecutorServerConfig struct { + // Mode is the connection mode: "uds" (default) or "tcp" + Mode string `koanf:"mode"` + + // Port is the TCP port for the Python executor gRPC server (TCP mode only) + Port int `koanf:"port"` + + // Host is the TCP host for the Python executor (TCP mode only, default: "localhost") + Host string `koanf:"host"` + + // Path is the Unix Domain Socket path (UDS mode only) + Path string `koanf:"path"` +} + // AdminConfig holds admin HTTP server configuration type AdminConfig struct { // Enabled indicates whether the admin server should be started @@ -278,7 +301,7 @@ func defaultConfig() *Config { return &Config{ PolicyEngine: PolicyEngine{ Server: ServerConfig{ - Mode: "", // Empty defaults to "uds" + Mode: "", ExtProcPort: 9001, }, Admin: AdminConfig{ @@ -309,6 +332,14 @@ func defaultConfig() *Config { Level: "info", Format: "text", }, + PythonExecutor: PythonExecutorConfig{ + Server: PythonExecutorServerConfig{ + Mode: "", + Port: 9010, + Host: "localhost", + }, + Timeout: 30 * time.Second, + }, TracingServiceName: "policy-engine", }, Analytics: AnalyticsConfig{ @@ -331,7 +362,7 @@ func defaultConfig() *Config { "grpc_request_timeout": 20000000000, }, AccessLogsServiceCfg: AccessLogsServiceConfig{ - Mode: "", // Empty defaults to "uds" + Mode: "", ServerPort: 18090, ShutdownTimeout: 600 * time.Second, PublicKeyPath: "", @@ -356,12 +387,10 @@ func defaultConfig() *Config { // Validate validates the configuration func (c *Config) Validate() error { - // Validate connection mode (same pattern as gateway-controller) + // Validate policy engine connection mode switch c.PolicyEngine.Server.Mode { case "uds", "": - // UDS mode (default) - socket path is a constant, no additional validation needed case "tcp": - // TCP mode - validate port if c.PolicyEngine.Server.ExtProcPort <= 0 || c.PolicyEngine.Server.ExtProcPort > 65535 { return fmt.Errorf("invalid extproc_port: %d (must be 1-65535)", c.PolicyEngine.Server.ExtProcPort) } @@ -369,6 +398,23 @@ func (c *Config) Validate() error { return fmt.Errorf("server.mode must be 'uds' or 'tcp', got: %s", c.PolicyEngine.Server.Mode) } + // Validate python executor config + switch c.PolicyEngine.PythonExecutor.Server.Mode { + case "uds", "": + case "tcp": + if c.PolicyEngine.PythonExecutor.Server.Host == "" { + return fmt.Errorf("invalid policy_engine.python_executor.server.host: must be non-empty when mode = 'tcp'") + } + if c.PolicyEngine.PythonExecutor.Server.Port <= 0 || c.PolicyEngine.PythonExecutor.Server.Port > 65535 { + return fmt.Errorf("invalid policy_engine.python_executor.server.port: %d (must be 1-65535)", c.PolicyEngine.PythonExecutor.Server.Port) + } + default: + return fmt.Errorf("policy_engine.python_executor.server.mode must be 'uds' or 'tcp', got: %s", c.PolicyEngine.PythonExecutor.Server.Mode) + } + if c.PolicyEngine.PythonExecutor.Timeout <= 0 { + return fmt.Errorf("policy_engine.python_executor.timeout must be positive") + } + // Validate admin config if c.PolicyEngine.Admin.Enabled { if c.PolicyEngine.Admin.Port <= 0 || c.PolicyEngine.Admin.Port > 65535 { diff --git a/gateway/gateway-runtime/policy-engine/internal/config/config_test.go b/gateway/gateway-runtime/policy-engine/internal/config/config_test.go index 3860232bf..ecefe3d36 100644 --- a/gateway/gateway-runtime/policy-engine/internal/config/config_test.go +++ b/gateway/gateway-runtime/policy-engine/internal/config/config_test.go @@ -61,6 +61,13 @@ func validConfig() *Config { Level: "info", Format: "json", }, + PythonExecutor: PythonExecutorConfig{ + Server: PythonExecutorServerConfig{ + Port: 9010, + Host: "localhost", + }, + Timeout: 30 * time.Second, + }, }, Analytics: AnalyticsConfig{ Enabled: false, @@ -205,6 +212,106 @@ func TestValidate_ServerMode(t *testing.T) { } } +// TestValidate_PythonExecutorConfig tests validation rules for PythonExecutorConfig +func TestValidate_PythonExecutorConfig(t *testing.T) { + tests := []struct { + name string + mode string + host string + port int + timeout time.Duration + expectErr bool + errMsg string + }{ + { + name: "UDS mode default (empty string)", + mode: "", + timeout: 30 * time.Second, + expectErr: false, + }, + { + name: "UDS mode explicit", + mode: "uds", + timeout: 30 * time.Second, + expectErr: false, + }, + { + name: "TCP mode with valid port and host", + mode: "tcp", + host: "localhost", + port: 9010, + timeout: 30 * time.Second, + expectErr: false, + }, + { + name: "TCP mode with empty host", + mode: "tcp", + host: "", + port: 9010, + timeout: 30 * time.Second, + expectErr: true, + errMsg: "invalid policy_engine.python_executor.server.host: must be non-empty when mode = 'tcp'", + }, + { + name: "TCP mode with invalid port - zero", + mode: "tcp", + host: "localhost", + port: 0, + timeout: 30 * time.Second, + expectErr: true, + errMsg: "invalid policy_engine.python_executor.server.port", + }, + { + name: "TCP mode with invalid port - too high", + mode: "tcp", + host: "localhost", + port: 70000, + timeout: 30 * time.Second, + expectErr: true, + errMsg: "invalid policy_engine.python_executor.server.port", + }, + { + name: "invalid mode", + mode: "invalid", + timeout: 30 * time.Second, + expectErr: true, + errMsg: "policy_engine.python_executor.server.mode must be 'uds' or 'tcp'", + }, + { + name: "invalid timeout - zero", + mode: "uds", + timeout: 0, + expectErr: true, + errMsg: "policy_engine.python_executor.timeout must be positive", + }, + { + name: "invalid timeout - negative", + mode: "uds", + timeout: -1 * time.Second, + expectErr: true, + errMsg: "policy_engine.python_executor.timeout must be positive", + }, + } + + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + cfg := validConfig() + cfg.PolicyEngine.PythonExecutor.Server.Mode = tt.mode + cfg.PolicyEngine.PythonExecutor.Server.Host = tt.host + cfg.PolicyEngine.PythonExecutor.Server.Port = tt.port + cfg.PolicyEngine.PythonExecutor.Timeout = tt.timeout + + err := cfg.Validate() + if tt.expectErr { + assert.Error(t, err) + assert.Contains(t, err.Error(), tt.errMsg) + } else { + assert.NoError(t, err) + } + }) + } +} + // TestValidate_UDS_PortConflict tests that UDS mode skips port conflict checks func TestValidate_UDS_PortConflict(t *testing.T) { t.Run("UDS mode - admin port conflict with extproc port ignored", func(t *testing.T) { diff --git a/gateway/gateway-runtime/policy-engine/internal/pythonbridge/bridge_test.go b/gateway/gateway-runtime/policy-engine/internal/pythonbridge/bridge_test.go index 3ed17d4ae..40d380d1d 100644 --- a/gateway/gateway-runtime/policy-engine/internal/pythonbridge/bridge_test.go +++ b/gateway/gateway-runtime/policy-engine/internal/pythonbridge/bridge_test.go @@ -175,7 +175,7 @@ func (f *fakePythonExecutorClient) DestroyPolicy(context.Context, *proto.Destroy func TestBridgeCloseReturnsExecutorDestroyFailure(t *testing.T) { harness := startTestPythonExecutorServer(t, nil) - sm := NewStreamManager("bufconn") + sm := NewStreamManager("bufconn", false) sm.dialContext = func(context.Context, string) (net.Conn, error) { return harness.listener.Dial() } diff --git a/gateway/gateway-runtime/policy-engine/internal/pythonbridge/client.go b/gateway/gateway-runtime/policy-engine/internal/pythonbridge/client.go index c59773e1f..8eaa22233 100644 --- a/gateway/gateway-runtime/policy-engine/internal/pythonbridge/client.go +++ b/gateway/gateway-runtime/policy-engine/internal/pythonbridge/client.go @@ -23,6 +23,7 @@ import ( "log/slog" "net" "os" + "strconv" "sync" "sync/atomic" "time" @@ -31,12 +32,14 @@ import ( "google.golang.org/grpc/connectivity" "google.golang.org/grpc/credentials/insecure" + "github.com/wso2/api-platform/gateway/gateway-runtime/policy-engine/internal/config" "github.com/wso2/api-platform/gateway/gateway-runtime/policy-engine/internal/pythonbridge/proto" ) // StreamManager manages the persistent bidirectional stream to the Python executor. type StreamManager struct { - socketPath string + address string + isTCP bool dialContext func(context.Context, string) (net.Conn, error) conn *grpc.ClientConn client proto.PythonExecutorServiceClient @@ -51,10 +54,13 @@ type StreamManager struct { connID atomic.Uint64 } -// NewStreamManager creates a StreamManager for the given Unix-domain socket. -func NewStreamManager(socketPath string) *StreamManager { +// NewStreamManager creates a StreamManager for the given address. +// If isTCP is true, the address is dialled as TCP (host:port); +// otherwise it is treated as a Unix domain socket path. +func NewStreamManager(address string, isTCP bool) *StreamManager { return &StreamManager{ - socketPath: socketPath, + address: address, + isTCP: isTCP, pendingReqs: make(map[string]chan *proto.StreamResponse), suppressedReqs: make(map[string]struct{}), } @@ -69,7 +75,7 @@ func (sm *StreamManager) Connect(ctx context.Context) error { return nil } - slogger := slog.With("component", "pythonbridge", "socket", sm.socketPath) + slogger := slog.With("component", "pythonbridge", "address", sm.address, "tcp", sm.isTCP) slogger.InfoContext(ctx, "Connecting to Python Executor") if sm.streamCancel != nil { @@ -84,14 +90,21 @@ func (sm *StreamManager) Connect(ctx context.Context) error { dialContext := sm.dialContext if dialContext == nil { - dialContext = func(ctx context.Context, addr string) (net.Conn, error) { - var d net.Dialer - return d.DialContext(ctx, "unix", addr) + if sm.isTCP { + dialContext = func(ctx context.Context, addr string) (net.Conn, error) { + var d net.Dialer + return d.DialContext(ctx, "tcp", addr) + } + } else { + dialContext = func(ctx context.Context, addr string) (net.Conn, error) { + var d net.Dialer + return d.DialContext(ctx, "unix", addr) + } } } conn, err := grpc.DialContext( ctx, - sm.socketPath, + sm.address, grpc.WithContextDialer(dialContext), grpc.WithTransportCredentials(insecure.NewCredentials()), grpc.WithBlock(), @@ -456,31 +469,81 @@ func errorReason(err error) string { return err.Error() } -// pythonPolicyTimeout is resolved once at package init. -var pythonPolicyTimeout = func() time.Duration { - if s := os.Getenv("PYTHON_POLICY_TIMEOUT"); s != "" { - if d, err := time.ParseDuration(s); err == nil { - return d - } - } - return 30 * time.Second -}() - func getTimeout() time.Duration { - return pythonPolicyTimeout + return pythonExecutorTimeout } +const DefaultSocketPath = "/var/run/api-platform/python-executor.sock" + var ( - globalStreamManager *StreamManager - streamManagerOnce sync.Once + pythonExecutorTimeout = 30 * time.Second + globalStreamManager *StreamManager + streamManagerOnce sync.Once + configuredAddress = DefaultSocketPath + configuredIsTCP = false ) -const pythonExecutorSocketPath = "/var/run/api-platform/python-executor.sock" +// Init configures the Python executor bridge from the loaded configuration. +// Must be called once from main before GetStreamManager is used. +func Init(cfg config.PythonExecutorConfig) { + pythonExecutorTimeout = cfg.Timeout + + address := cfg.Server.Path + if address == "" { + address = DefaultSocketPath + } + isTCP := false + + mode := cfg.Server.Mode + if mode == "" { + mode = "uds" + } + if mode == "tcp" { + host := cfg.Server.Host + if host == "" { + host = "localhost" + } + address = net.JoinHostPort(host, strconv.Itoa(cfg.Server.Port)) + isTCP = true + } + + configuredAddress = address + configuredIsTCP = isTCP + + streamManagerOnce.Do(func() { + globalStreamManager = NewStreamManager(configuredAddress, configuredIsTCP) + }) + + slog.Info("Python executor bridge initialized", + "address", address, + "mode", mode, + "timeout", cfg.Timeout, + ) +} // GetStreamManager returns the singleton StreamManager instance. func GetStreamManager() *StreamManager { streamManagerOnce.Do(func() { - globalStreamManager = NewStreamManager(pythonExecutorSocketPath) + globalStreamManager = NewStreamManager(configuredAddress, configuredIsTCP) }) return globalStreamManager } + +// IsAvailable reports whether a Python executor is configured. +// Returns true if TCP mode is set (always considered available) or the +// configured UDS socket exists on disk. +func IsAvailable(cfg config.PythonExecutorConfig) bool { + mode := cfg.Server.Mode + if mode == "" { + mode = "uds" + } + if mode == "tcp" { + return true + } + address := cfg.Server.Path + if address == "" { + address = DefaultSocketPath + } + _, err := os.Stat(address) + return err == nil +} diff --git a/gateway/gateway-runtime/policy-engine/internal/pythonbridge/client_test.go b/gateway/gateway-runtime/policy-engine/internal/pythonbridge/client_test.go index 3a10d2874..ce97809a7 100644 --- a/gateway/gateway-runtime/policy-engine/internal/pythonbridge/client_test.go +++ b/gateway/gateway-runtime/policy-engine/internal/pythonbridge/client_test.go @@ -174,7 +174,7 @@ func TestStreamManagerExecuteCancellationSuppressesLateResponses(t *testing.T) { }(req.GetRequestId()) }) - sm := NewStreamManager("bufconn") + sm := NewStreamManager("bufconn", false) sm.dialContext = func(context.Context, string) (net.Conn, error) { return harness.listener.Dial() } @@ -214,7 +214,7 @@ func TestStreamManagerReconnectsAfterDisconnect(t *testing.T) { } }) - sm := NewStreamManager("bufconn") + sm := NewStreamManager("bufconn", false) sm.dialContext = func(context.Context, string) (net.Conn, error) { return harness1.listener.Dial() } @@ -264,7 +264,7 @@ func BenchmarkStreamManagerNeedsMoreRoundTrip(b *testing.B) { } }) - sm := NewStreamManager("bufconn") + sm := NewStreamManager("bufconn", false) sm.dialContext = func(context.Context, string) (net.Conn, error) { return harness.listener.Dial() } diff --git a/gateway/gateway-runtime/policy-engine/internal/pythonbridge/factory.go b/gateway/gateway-runtime/policy-engine/internal/pythonbridge/factory.go index 0529a8bd3..88d3441dd 100644 --- a/gateway/gateway-runtime/policy-engine/internal/pythonbridge/factory.go +++ b/gateway/gateway-runtime/policy-engine/internal/pythonbridge/factory.go @@ -37,8 +37,9 @@ type policyCapabilities struct { } // BridgeFactory creates Python bridge instances and validates the executor contract. +// The factory resolves the global StreamManager singleton at GetPolicy call time, +// which is guaranteed to be configured after pythonbridge.Init(cfg) runs in main(). type BridgeFactory struct { - StreamManager *StreamManager PolicyName string PolicyVersion string } @@ -73,7 +74,8 @@ func (f *BridgeFactory) GetPolicy(metadata policy.PolicyMetadata, params map[str ctx, cancel := context.WithTimeout(context.Background(), getTimeout()) defer cancel() - resp, err := f.StreamManager.InitPolicy(ctx, req) + sm := GetStreamManager() + resp, err := sm.InitPolicy(ctx, req) if err != nil { return nil, fmt.Errorf("InitPolicy RPC failed for %s:%s: %w", f.PolicyName, f.PolicyVersion, err) } @@ -102,7 +104,7 @@ func (f *BridgeFactory) GetPolicy(metadata policy.PolicyMetadata, params map[str policyVersion: f.PolicyVersion, mode: mode, metadata: metadata, - streamManager: f.StreamManager, + streamManager: sm, translator: NewTranslator(), slogger: slogger, instanceID: resp.GetInstanceId(), diff --git a/gateway/gateway-runtime/python-executor/executor/server.py b/gateway/gateway-runtime/python-executor/executor/server.py index ed1d4c7d6..385a39312 100644 --- a/gateway/gateway-runtime/python-executor/executor/server.py +++ b/gateway/gateway-runtime/python-executor/executor/server.py @@ -762,12 +762,13 @@ class PythonExecutorServer: def __init__( self, - socket_path: str, + listen_address: str, worker_count: int = 4, max_concurrent: int = 100, timeout: int = 30, ): - self.socket_path = socket_path + self.listen_address = listen_address + self._is_tcp = ":" in listen_address self.worker_count = worker_count self.max_concurrent = max_concurrent self.timeout = timeout @@ -780,7 +781,8 @@ def __init__( async def start(self) -> None: """Start the gRPC server.""" - logger.info("Starting Python Executor on %s", self.socket_path) + mode = "tcp" if self._is_tcp else "uds" + logger.info("Starting Python Executor on %s (mode=%s)", self.listen_address, mode) loaded_count = self._loader.load_policies() logger.info("Loaded %s policy factories", loaded_count) @@ -807,26 +809,33 @@ async def start(self) -> None: ) proto_grpc.add_PythonExecutorServiceServicer_to_server(self._servicer, self.server) - if os.path.exists(self.socket_path): - try: - socket_stat = os.stat(self.socket_path) - if stat.S_ISSOCK(socket_stat.st_mode): - os.remove(self.socket_path) - else: - raise RuntimeError( - f"path exists but is not a socket: {self.socket_path}" - ) - except OSError as exc: - logger.error("Error preparing socket path %s: %s", self.socket_path, exc) - raise + if self._is_tcp: + bind_address = self.listen_address + else: + self._cleanup_stale_socket() + bind_address = f"unix:{self.listen_address}" - if self.server.add_insecure_port(f"unix:{self.socket_path}") == 0: - error_message = f"failed to bind to UNIX domain socket at {self.socket_path}" - logger.error(error_message) - raise RuntimeError(error_message) + if self.server.add_insecure_port(bind_address) == 0: + raise RuntimeError(f"failed to bind to {bind_address}") await self.server.start() - logger.info("Python Executor ready on %s", self.socket_path) + logger.info("Python Executor ready on %s", bind_address) + + def _cleanup_stale_socket(self) -> None: + """Remove a leftover UDS socket file from a previous run.""" + if not os.path.exists(self.listen_address): + return + try: + socket_stat = os.stat(self.listen_address) + if stat.S_ISSOCK(socket_stat.st_mode): + os.remove(self.listen_address) + else: + raise RuntimeError( + f"path exists but is not a socket: {self.listen_address}" + ) + except OSError as exc: + logger.error("Error preparing socket path %s: %s", self.listen_address, exc) + raise async def wait_for_termination(self) -> None: if self.server: diff --git a/gateway/gateway-runtime/python-executor/main.py b/gateway/gateway-runtime/python-executor/main.py index 1467593b9..36c230a2b 100644 --- a/gateway/gateway-runtime/python-executor/main.py +++ b/gateway/gateway-runtime/python-executor/main.py @@ -15,8 +15,9 @@ """Python Executor entry point. -Starts the gRPC server on UDS, loads all registered Python policies, -and serves ExecuteStream RPCs from the Go Policy Engine. +Starts the gRPC server on a Unix domain socket (default) or TCP port, +loads all registered Python policies, and serves ExecuteStream RPCs +from the Go Policy Engine. """ import argparse @@ -29,7 +30,7 @@ from executor.server import PythonExecutorServer -PYTHON_EXECUTOR_SOCKET = "/var/run/api-platform/python-executor.sock" +DEFAULT_LISTEN_ADDRESS = "/var/run/api-platform/python-executor.sock" def positive_int(value): @@ -69,6 +70,11 @@ def _resolve_positive_int( return fallback parser = argparse.ArgumentParser(description="Python Executor gRPC server") + parser.add_argument( + "--listen", + default=os.environ.get("PYTHON_EXECUTOR_LISTEN", DEFAULT_LISTEN_ADDRESS), + help="Listen address — UDS path or host:port for TCP (env: PYTHON_EXECUTOR_LISTEN, default: %(default)s)", + ) parser.add_argument( "--workers", default=os.environ.get("PYTHON_POLICY_WORKERS"), @@ -141,14 +147,18 @@ async def main(): setup_logging() logger = logging.getLogger(__name__) - socket_path = PYTHON_EXECUTOR_SOCKET + listen_address = args.listen worker_count = args.workers max_concurrent = args.max_concurrent timeout = args.timeout - logger.info(f"Python Executor starting (workers={worker_count}, max_concurrent={max_concurrent}, timeout={timeout}s, log_level={LOG_LEVEL})") + logger.info( + "Python Executor starting (listen=%s, workers=%s, " + "max_concurrent=%s, timeout=%ss, log_level=%s)", + listen_address, worker_count, max_concurrent, timeout, LOG_LEVEL, + ) - server = PythonExecutorServer(socket_path, worker_count, max_concurrent, timeout) + server = PythonExecutorServer(listen_address, worker_count, max_concurrent, timeout) # Graceful shutdown on SIGTERM/SIGINT loop = asyncio.get_event_loop()