diff --git a/.github/workflows/e2e-master.yaml b/.github/workflows/e2e-master.yaml index 5819a0d796..1cf412742a 100644 --- a/.github/workflows/e2e-master.yaml +++ b/.github/workflows/e2e-master.yaml @@ -24,7 +24,7 @@ jobs: cluster_name: kubernetes-python-e2e-master-${{ matrix.python-version }} # The kind version to be used to spin the cluster up # this needs to be updated whenever a new Kind version is released - version: v0.17.0 + version: v0.31.0 # Update the config here whenever a new client snapshot is performed # This would eventually point to cluster with the latest Kubernetes version # as we sync with Kubernetes upstream diff --git a/.github/workflows/e2e-release-35.0.yaml b/.github/workflows/e2e-release-35.0.yaml index d85313c776..44c0fc5862 100644 --- a/.github/workflows/e2e-release-35.0.yaml +++ b/.github/workflows/e2e-release-35.0.yaml @@ -24,7 +24,7 @@ jobs: cluster_name: kubernetes-python-e2e-release-35.0-${{ matrix.python-version }} # The kind version to be used to spin the cluster up # this needs to be updated whenever a new Kind version is released - version: v0.17.0 + version: v0.31.0 # Update the config here whenever a new client snapshot is performed # This would eventually point to cluster with the latest Kubernetes version # as we sync with Kubernetes upstream diff --git a/kubernetes/base/stream/ws_client.py b/kubernetes/base/stream/ws_client.py index 44b6123325..27191fa93b 100644 --- a/kubernetes/base/stream/ws_client.py +++ b/kubernetes/base/stream/ws_client.py @@ -39,6 +39,10 @@ STDERR_CHANNEL = 2 ERROR_CHANNEL = 3 RESIZE_CHANNEL = 4 +CLOSE_CHANNEL = 255 + +V4_CHANNEL_PROTOCOL = "v4.channel.k8s.io" +V5_CHANNEL_PROTOCOL = "v5.channel.k8s.io" class _IgnoredIO: def write(self, _x): @@ -59,6 +63,8 @@ def __init__(self, configuration, url, headers, capture_all, binary=False): """ self._connected = False self._channels = {} + self._closed_channels = set() + self.subprotocol = None self.binary = binary self.newline = '\n' if not self.binary else b'\n' if capture_all: @@ -66,19 +72,31 @@ def __init__(self, configuration, url, headers, capture_all, binary=False): else: self._all = _IgnoredIO() self.sock = create_websocket(configuration, url, headers) + self.subprotocol = getattr(self.sock, 'subprotocol', None) + if not self.subprotocol and self.sock: + headers_dict = self.sock.getheaders() + if headers_dict: + for k, v in headers_dict.items(): + if k.lower() == 'sec-websocket-protocol': + self.subprotocol = v + break self._connected = True self._returncode = None def peek_channel(self, channel, timeout=0): """Peek a channel and return part of the input, empty string otherwise.""" + if channel in self._closed_channels and channel not in self._channels: + return b"" if self.binary else "" self.update(timeout=timeout) if channel in self._channels: return self._channels[channel] - return "" + return b"" if self.binary else "" def read_channel(self, channel, timeout=0): """Read data from a channel.""" + if channel in self._closed_channels and channel not in self._channels: + return b"" if self.binary else "" if channel not in self._channels: ret = self.peek_channel(channel, timeout) else: @@ -93,6 +111,7 @@ def readline_channel(self, channel, timeout=None): timeout = float("inf") start = time.time() while self.is_open() and time.time() - start < timeout: + # Always try to drain the channel first if channel in self._channels: data = self._channels[channel] if self.newline in data: @@ -104,6 +123,14 @@ def readline_channel(self, channel, timeout=None): else: del self._channels[channel] return ret + + if channel in self._closed_channels: + if channel in self._channels: + ret = self._channels[channel] + del self._channels[channel] + return ret + return b"" if self.binary else "" + self.update(timeout=(timeout - time.time() + start)) def write_channel(self, channel, data): @@ -119,6 +146,14 @@ def write_channel(self, channel, data): payload = channel_prefix + data self.sock.send(payload, opcode=opcode) + def close_channel(self, channel): + """Close a channel (v5 protocol only).""" + if self.subprotocol != V5_CHANNEL_PROTOCOL: + return + data = bytes([CLOSE_CHANNEL, channel]) + self.sock.send(data, opcode=ABNF.OPCODE_BINARY) + self._closed_channels.add(channel) + def peek_stdout(self, timeout=0): """Same as peek_channel with channel=1.""" return self.peek_channel(STDOUT_CHANNEL, timeout=timeout) @@ -200,13 +235,24 @@ def update(self, timeout=0): return elif op_code == ABNF.OPCODE_BINARY or op_code == ABNF.OPCODE_TEXT: data = frame.data - if six.PY3 and not self.binary: - data = data.decode("utf-8", "replace") - if len(data) > 1: + if len(data) > 0: + # Parse channel from raw bytes to support v5 CLOSE signal AND avoid charset issues channel = data[0] - if six.PY3 and not self.binary: - channel = ord(channel) + # In Py3, iterating bytes gives int, but indexing bytes gives int. + # websocket-client frame.data might be bytes. + + if channel == CLOSE_CHANNEL and self.subprotocol == V5_CHANNEL_PROTOCOL: # v5 CLOSE + if len(data) > 1: + # data[1] is already int in Py3 bytes + close_chan = data[1] + self._closed_channels.add(close_chan) + return + data = data[1:] + # Decode data if expected text + if not self.binary: + data = data.decode("utf-8", "replace") + if data: if channel in [STDOUT_CHANNEL, STDERR_CHANNEL]: # keeping all messages in the order they received @@ -476,7 +522,7 @@ def create_websocket(configuration, url, headers=None): header.append("sec-websocket-protocol: %s" % headers['sec-websocket-protocol']) else: - header.append("sec-websocket-protocol: v4.channel.k8s.io") + header.append("sec-websocket-protocol: %s,%s" % (V5_CHANNEL_PROTOCOL, V4_CHANNEL_PROTOCOL)) if url.startswith('wss://') and configuration.verify_ssl: ssl_opts = { diff --git a/kubernetes/base/stream/ws_client_test.py b/kubernetes/base/stream/ws_client_test.py index 3f8c022874..2785a831fb 100644 --- a/kubernetes/base/stream/ws_client_test.py +++ b/kubernetes/base/stream/ws_client_test.py @@ -13,8 +13,10 @@ # limitations under the License. import unittest +from unittest.mock import MagicMock, patch -from .ws_client import get_websocket_url +from . import ws_client as ws_client_module +from .ws_client import get_websocket_url, WSClient, V5_CHANNEL_PROTOCOL, V4_CHANNEL_PROTOCOL, CLOSE_CHANNEL, STDIN_CHANNEL from .ws_client import websocket_proxycare from kubernetes.client.configuration import Configuration import os @@ -22,6 +24,7 @@ import threading import pytest from kubernetes import stream, client, config +import websocket try: import urllib3 @@ -123,6 +126,224 @@ def test_websocket_proxycare(self): assert dictval(connect_opts, 'http_proxy_auth') == expect_auth assert dictval(connect_opts, 'http_no_proxy') == expect_noproxy + +class WSClientProtocolTest(unittest.TestCase): + """Tests for WSClient V5 protocol handling""" + + def setUp(self): + # Mock configuration to avoid real connections in WSClient.__init__ + self.config_mock = MagicMock() + self.config_mock.assert_hostname = False + self.config_mock.api_key = {} + self.config_mock.proxy = None + self.config_mock.ssl_ca_cert = None + self.config_mock.cert_file = None + self.config_mock.key_file = None + self.config_mock.verify_ssl = True + + def test_create_websocket_header(self): + """Verify sec-websocket-protocol header requests v5 first""" + # Patch WebSocket class in the module + with patch.object(ws_client_module, 'WebSocket', autospec=True) as mock_ws_cls: + mock_ws = mock_ws_cls.return_value + + WSClient(self.config_mock, "ws://test", headers=None, capture_all=True) + + mock_ws.connect.assert_called_once() + call_args = mock_ws.connect.call_args + # connect(url, **options) + # check kwargs for 'header' + kwargs = call_args[1] + self.assertIn('header', kwargs) + expected_header = f"sec-websocket-protocol: {V5_CHANNEL_PROTOCOL},{V4_CHANNEL_PROTOCOL}" + self.assertIn(expected_header, kwargs['header']) + + def test_close_channel_v5(self): + """Verify close_channel sends correct frame when v5 is negotiated""" + with patch.object(ws_client_module, 'create_websocket') as mock_create: + mock_ws = MagicMock() + mock_ws.subprotocol = V5_CHANNEL_PROTOCOL + mock_ws.connected = True + mock_create.return_value = mock_ws + + client = WSClient(self.config_mock, "ws://test", headers=None, capture_all=True) + client.close_channel(0) + + mock_ws.send.assert_called_with(bytes([CLOSE_CHANNEL, STDIN_CHANNEL]), opcode=websocket.ABNF.OPCODE_BINARY) + + def test_close_channel_v4(self): + """Verify close_channel does nothing when v4 is negotiated""" + with patch.object(ws_client_module, 'create_websocket') as mock_create: + mock_ws = MagicMock() + mock_ws.subprotocol = V4_CHANNEL_PROTOCOL + mock_ws.connected = True + mock_create.return_value = mock_ws + + client = WSClient(self.config_mock, "ws://test", headers=None, capture_all=True) + client.close_channel(0) + + mock_ws.send.assert_not_called() + + def test_update_receives_close_v5(self): + """Verify update processes close signal when v5 is negotiated""" + with patch.object(ws_client_module, 'create_websocket') as mock_create, \ + patch('select.select') as mock_select: + + mock_ws = MagicMock() + mock_ws.subprotocol = V5_CHANNEL_PROTOCOL + mock_ws.connected = True + mock_ws.sock.fileno.return_value = 10 + + # Setup frame with close signal for channel 0 + frame = MagicMock() + frame.data = bytes([CLOSE_CHANNEL, STDIN_CHANNEL]) + mock_ws.recv_data_frame.return_value = (websocket.ABNF.OPCODE_BINARY, frame) + + mock_create.return_value = mock_ws + # Make select return ready + mock_select.return_value = ([mock_ws.sock], [], []) + + client = WSClient(self.config_mock, "ws://test", headers=None, capture_all=True) + client.update(timeout=0) + + self.assertIn(0, client._closed_channels) + + def test_update_ignores_close_signal_v4(self): + """Verify update treats 0xFF as regular data (or ignores signal interpretation) when v4""" + with patch.object(ws_client_module, 'create_websocket') as mock_create, \ + patch('select.select') as mock_select: + + mock_ws = MagicMock() + mock_ws.subprotocol = V4_CHANNEL_PROTOCOL + mock_ws.connected = True + mock_ws.sock.fileno.return_value = 10 + + # Setup frame that looks like close signal but should be treated as data + frame = MagicMock() + frame.data = bytes([CLOSE_CHANNEL, STDIN_CHANNEL]) + mock_ws.recv_data_frame.return_value = (websocket.ABNF.OPCODE_BINARY, frame) + + mock_create.return_value = mock_ws + mock_select.return_value = ([mock_ws.sock], [], []) + + client = WSClient(self.config_mock, "ws://test", headers=None, capture_all=True, binary=True) # binary=True to avoid decode errors + client.update(timeout=0) + + # Should NOT be in closed channels + self.assertNotIn(0, client._closed_channels) + # Should be in data channels (channel 255 with data \x00) + # Code: channel = data[0] (255), data = data[1:] (\x00) + # if channel (255) not in _channels... + self.assertIn(255, client._channels) + self.assertEqual(client._channels[255], b'\x00') + + def test_readline_channel_closed_with_leftover_data(self): + """Verify readline_channel flushes remaining buffer when channel is closed""" + with patch.object(ws_client_module, 'create_websocket') as mock_create: + mock_ws = MagicMock() + mock_ws.subprotocol = V5_CHANNEL_PROTOCOL + mock_ws.connected = True + mock_create.return_value = mock_ws + + client = WSClient(self.config_mock, "ws://test", headers=None, capture_all=True, binary=False) + + # Simulate some data in the channel buffer, and then close it + client._channels[1] = "hello" + client._closed_channels.add(1) + + # First call to readline should flush "hello" even though there is no newline + line1 = client.readline_channel(1) + self.assertEqual(line1, "hello") + + # Subsequent call should return empty string + line2 = client.readline_channel(1) + self.assertEqual(line2, "") + + def test_readline_channel_closed_with_leftover_data_binary(self): + """Verify readline_channel flushes remaining buffer when channel is closed in binary mode""" + with patch.object(ws_client_module, 'create_websocket') as mock_create: + mock_ws = MagicMock() + mock_ws.subprotocol = V5_CHANNEL_PROTOCOL + mock_ws.connected = True + mock_create.return_value = mock_ws + + client = WSClient(self.config_mock, "ws://test", headers=None, capture_all=True, binary=True) + + # Simulate some bytes in the channel buffer, and then close it + client._channels[1] = b"hello-binary" + client._closed_channels.add(1) + + # First call to readline should flush leftover bytes + line1 = client.readline_channel(1) + self.assertEqual(line1, b"hello-binary") + + # Subsequent call should return empty bytes + line2 = client.readline_channel(1) + self.assertEqual(line2, b"") + + def test_read_channel_closed_with_leftover_data(self): + """Verify read_channel drains leftover data and then short-circuits on closed channel""" + with patch.object(ws_client_module, 'create_websocket') as mock_create: + mock_ws = MagicMock() + mock_ws.subprotocol = V5_CHANNEL_PROTOCOL + mock_ws.connected = True + mock_ws.sock.fileno.return_value = 10 + mock_create.return_value = mock_ws + + client = WSClient(self.config_mock, "ws://test", headers=None, capture_all=True, binary=False) + + # Simulate leftover data and closed channel + client._channels[1] = "hello" + client._closed_channels.add(1) + + # First call should drain data + data1 = client.read_channel(1) + self.assertEqual(data1, "hello") + + # Subsequent call should short-circuit and return empty string + # Patch `update` to assert it is NOT called (short-circuit) + with patch.object(client, 'update') as mock_update: + data2 = client.read_channel(1) + self.assertEqual(data2, "") + mock_update.assert_not_called() + + def test_peek_channel_closed_with_leftover_data(self): + """Verify peek_channel allows peeking leftover data and then short-circuits after draining""" + with patch.object(ws_client_module, 'create_websocket') as mock_create, \ + patch('select.poll') as mock_poll: + mock_poll.return_value.poll.return_value = [] + mock_ws = MagicMock() + mock_ws.subprotocol = V5_CHANNEL_PROTOCOL + mock_ws.connected = True + mock_ws.sock.fileno.return_value = 10 + mock_create.return_value = mock_ws + + client = WSClient(self.config_mock, "ws://test", headers=None, capture_all=True, binary=False) + + # Simulate leftover data and closed channel + client._channels[1] = "hello" + client._closed_channels.add(1) + + # First peek should return data without draining + data1 = client.peek_channel(1) + self.assertEqual(data1, "hello") + + # Second peek should still return data + data2 = client.peek_channel(1) + self.assertEqual(data2, "hello") + + # Drain it + client.read_channel(1) + + # Now peek should short-circuit and return empty string + # Patch `update` to assert it is NOT called (short-circuit) + with patch.object(client, 'update') as mock_update: + data3 = client.peek_channel(1) + self.assertEqual(data3, "") + mock_update.assert_not_called() + + + @pytest.fixture(scope="module") def dummy_proxy(): #Dummy Proxy diff --git a/kubernetes/base/watch/watch.py b/kubernetes/base/watch/watch.py index 44fbe49141..71926295cc 100644 --- a/kubernetes/base/watch/watch.py +++ b/kubernetes/base/watch/watch.py @@ -93,6 +93,21 @@ def __init__(self, return_type=None): def stop(self): self._stop = True + if hasattr(self, '_resp') and self._resp: + import socket + try: + # Python SSL/socket GIL Workaround: Force-shutdown the raw socket under HTTP/1.1 + # to immediately unblock the background thread blocked in CPython's ssl.read() recv_into + # call. This avoids deadlock where close() hangs waiting for SSL socket locks held by + # the blocked read call. The actual response/connection closing is handled in the finally + # block when the stream loop exits. + conn = getattr(self._resp, 'connection', None) + sock = getattr(conn, 'sock', None) if conn else None + if sock: + sock.shutdown(socket.SHUT_RDWR) + except Exception: + pass + def get_return_type(self, func): if self._raw_return_type: @@ -189,6 +204,7 @@ def stream(self, func, *args, **kwargs): deserialize = kwargs.pop('deserialize', True) while True: resp = func(*args, **kwargs) + self._resp = resp try: for line in iter_resp_lines(resp): # unmarshal when we are receiving events from watch, @@ -226,6 +242,7 @@ def stream(self, func, *args, **kwargs): finally: resp.close() resp.release_conn() + self._resp = None if self.resource_version is not None: kwargs['resource_version'] = self.resource_version else: diff --git a/kubernetes/e2e_test/test_client.py b/kubernetes/e2e_test/test_client.py index 15689291e5..6d2972f2bc 100644 --- a/kubernetes/e2e_test/test_client.py +++ b/kubernetes/e2e_test/test_client.py @@ -201,6 +201,60 @@ def test_pod_apis(self): resp = api.delete_namespaced_pod(name=name, body={}, namespace='default') + def test_pod_exec_close_channel(self): + """Test sending CLOSE signal for a channel (v5 protocol).""" + client = api_client.ApiClient(configuration=self.config) + api = core_v1_api.CoreV1Api(client) + + name = 'busybox-test-' + short_uuid() + pod_manifest = manifest_with_command( + name, "while true;do date;sleep 5; done") + + resp = api.create_namespaced_pod(body=pod_manifest, namespace='default') + self.assertEqual(name, resp.metadata.name) + + # Wait for pod to be running + timeout = time.time() + 60 + while True: + resp = api.read_namespaced_pod(name=name, namespace='default') + if resp.status.phase == 'Running': + break + if time.time() > timeout: + self.fail("Timeout waiting for pod to be running") + time.sleep(1) + + # Use cat to echo stdin to stdout. + # When stdin is closed, cat should exit, terminating the command. + resp = stream(api.connect_post_namespaced_pod_exec, name, 'default', + command=['/bin/sh', '-c', 'cat'], + stderr=True, stdin=True, + stdout=True, tty=False, + _preload_content=False) + + if resp.subprotocol != "v5.channel.k8s.io": + resp.close() + api.delete_namespaced_pod(name=name, body={}, namespace='default') + self.skipTest("Skipping test: v5.channel.k8s.io subprotocol not negotiated") + + try: + resp.write_stdin("test-close\n") + line = resp.readline_stdout(timeout=5) + self.assertEqual("test-close", line) + + # Close stdin (channel 0) + # This should send EOF to cat, causing it to exit. + resp.close_channel(0) + + # Wait for process to exit + resp.run_forever(timeout=15) + + self.assertFalse(resp.is_open(), "Connection should close after cat exits") + self.assertEqual(resp.returncode, 0) + finally: + if resp.is_open(): + resp.close() + api.delete_namespaced_pod(name=name, body={}, namespace='default') + def test_exit_code(self): client = api_client.ApiClient(configuration=self.config) api = core_v1_api.CoreV1Api(client)