Skip to content
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
165 changes: 165 additions & 0 deletions jupyter_server_documents/tests/test_yroom_sync.py
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@
3. Divergent client handshake resolves content duplication.
4. Timeout fires if client never sends SS2.
5. Update buffer pauses/resumes correctly during divergent handshake.
6. No data loss when mutations occur during the sync handshake.
"""

from __future__ import annotations
Expand Down Expand Up @@ -312,3 +313,167 @@ async def test_fresh_then_divergent_client(self, make_yroom: MakeYRoom):

assert jupyter_ydoc.source == "hello world"
assert ws2.source == "hello world"


async def _complete_handshake(yroom: YRoom, ws: FakeWebSocket) -> str:
"""Helper: add a FakeWebSocket client and complete the full sync handshake.
Returns the client_id."""
client_id = yroom.clients.add(ws)
yroom.add_message(client_id, ws.build_ss1())
await asyncio.sleep(0.1)
ss2_reply = ws.process_server_messages()
assert ss2_reply is not None, "Server did not send SS1 (no SS2 reply generated)"
yroom.add_message(client_id, ss2_reply)
await asyncio.sleep(0.1)
return client_id


class TestSyncHandshakeStress:
"""
Stress tests for data integrity when mutations occur during the sync
handshake.

These reproduce the scenario from jupyter-ai-contrib/jupyter-server-documents#197
where an AI agent rapidly adds content via MCP tool calls while a second
browser tab connects. Mutations that occur while a client is completing
the handshake must not be lost.
"""

@pytest.mark.asyncio
async def test_mutations_before_handshake_not_lost(self, make_yroom: MakeYRoom):
"""Mutations between client connect and handshake must be received.

Simulates: AI agent adds 20 lines while a second tab is connecting.
"""
yroom = await make_yroom()
jupyter_ydoc = await yroom.get_jupyter_ydoc()

# Sync client A (first browser tab)
ws_a = FakeWebSocket()
await _complete_handshake(yroom, ws_a)

# Client B connects (second browser tab) — starts as desynced
ws_b = FakeWebSocket()
cid_b = yroom.clients.add(ws_b)

# While B is desynced, AI agent rapidly mutates the doc
expected = ""
for i in range(20):
expected += f"AI added line {i}\n"
jupyter_ydoc.source = expected

# Complete B's handshake
yroom.add_message(cid_b, ws_b.build_ss1())
await asyncio.sleep(0.1)
ss2_reply = ws_b.process_server_messages()
assert ss2_reply is not None
yroom.add_message(cid_b, ss2_reply)
await asyncio.sleep(0.1)

# B must have the full content — no data loss
assert ws_b.source == expected

@pytest.mark.asyncio
async def test_mutations_during_handshake_await(self, make_yroom: MakeYRoom):
"""Mutations during the SS2 reply await must be received.

Simulates: AI agent adds content while the server is waiting for the
client's SS2 reply (the async gap in handle_sync).
"""
yroom = await make_yroom()
jupyter_ydoc = await yroom.get_jupyter_ydoc()
jupyter_ydoc.source = "initial"

# Sync client A
ws_a = FakeWebSocket()
await _complete_handshake(yroom, ws_a)

# Client B starts handshake
ws_b = FakeWebSocket()
cid_b = yroom.clients.add(ws_b)
yroom.add_message(cid_b, ws_b.build_ss1())
await asyncio.sleep(0.1)
# handle_sync is now awaiting B's SS2 reply

# Mutate doc while handle_sync is awaiting
jupyter_ydoc.source = "initial\nmutated during handshake"

# Complete B's handshake
ss2_reply = ws_b.process_server_messages()
assert ss2_reply is not None
yroom.add_message(cid_b, ss2_reply)
await asyncio.sleep(0.1)

assert ws_b.source == "initial\nmutated during handshake"

@pytest.mark.asyncio
async def test_no_exception_during_concurrent_handshakes(self, make_yroom: MakeYRoom):
"""Multiple clients handshaking while doc is mutated must not crash."""
yroom = await make_yroom()
jupyter_ydoc = await yroom.get_jupyter_ydoc()
jupyter_ydoc.source = "initial"

# Sync client A
ws_a = FakeWebSocket()
await _complete_handshake(yroom, ws_a)

# Connect 5 desynced clients
desynced = []
for _ in range(5):
ws = FakeWebSocket()
cid = yroom.clients.add(ws)
desynced.append((ws, cid))

# Rapid mutations while all 5 are desynced
for i in range(50):
jupyter_ydoc.source = f"mutation {i}"

# Sync all clients sequentially — no exceptions should be raised
for ws, cid in desynced:
yroom.add_message(cid, ws.build_ss1())
await asyncio.sleep(0.1)
ss2_reply = ws.process_server_messages()
assert ss2_reply is not None
yroom.add_message(cid, ss2_reply)
await asyncio.sleep(0.1)

# All must have the final state
for ws, _ in desynced:
assert ws.source == "mutation 49"

@pytest.mark.asyncio
@pytest.mark.parametrize("num_mutations", [10, 50, 100])
@pytest.mark.parametrize("num_clients", [2, 5])
async def test_concurrent_mutations_stress(
self, make_yroom: MakeYRoom, num_mutations: int, num_clients: int
):
"""N clients connect while the doc undergoes M mutations.
All clients must converge to the same final state."""
yroom = await make_yroom()
jupyter_ydoc = await yroom.get_jupyter_ydoc()

# Connect N desynced clients
clients = []
for _ in range(num_clients):
ws = FakeWebSocket()
cid = yroom.clients.add(ws)
clients.append((ws, cid))

# M mutations while all clients are desynced
expected = ""
for i in range(num_mutations):
expected += f"line {i}\n"
jupyter_ydoc.source = expected

# Sync all clients
for ws, cid in clients:
yroom.add_message(cid, ws.build_ss1())
await asyncio.sleep(0.1)
ss2_reply = ws.process_server_messages()
assert ss2_reply is not None
yroom.add_message(cid, ss2_reply)
await asyncio.sleep(0.1)

# All must have the final content
for ws, _ in clients:
assert ws.source == expected
Loading