Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
19 changes: 16 additions & 3 deletions jupyter_server_documents/rooms/yroom.py
Original file line number Diff line number Diff line change
Expand Up @@ -677,6 +677,14 @@ async def handle_sync(self, client_id: str, ss1_message: bytes) -> None:
# Mark client as desynced
self.clients.mark_desynced(client_id)

# Save the server's state vector before the handshake. After the
# handshake completes, get_update(pre_sync_sv) produces a single
# batched diff covering any mutations that occurred during the
# handshake gap. This replaces replaying individual queued messages,
# which triggers a pycrdt offset encoding bug (#197) where
# incremental Text updates after multi-byte characters crash JS yjs.
pre_sync_sv = self._ydoc.get_state()

# Check if client has divergent history
ss1_payload = ss1_message[1:]
divergent = self._has_divergent_history(ss1_payload)
Comment on lines +680 to 690
Copy link
Copy Markdown
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We are calling self._ydoc.get_state() twice - once here and another in self._has_divergent_history().

To avoid this duplicate computation, could we have _has_divergent_history() accept the server state vector (in bytes) as an additional argument? e.g. client_ss1 and server_sv_bytes.

Expand Down Expand Up @@ -728,13 +736,18 @@ async def handle_sync(self, client_id: str, ss1_message: bytes) -> None:
self.log.exception("Exception raised during sync handshake with client '%s' in room '%s':", client_id, self.room_id)
handshake_failed = True

# Restore the YDoc source, flush all document updates queued in the
# update_buffer, and resume saving, regardless of whether the handshake
# Restore the YDoc source, flush the update_buffer with a batched
# catchup diff, and resume saving, regardless of whether the handshake
# succeeded.
if saved_source is not None and self._jupyter_ydoc is not None:
self._jupyter_ydoc.source = saved_source
self.log.info("Restored YDoc source.")
self.update_buffer.resume()
catchup = self._ydoc.get_update(pre_sync_sv)
# An empty yjs update is 2 bytes (b"\x00\x00").
catchup_message = None
if catchup and len(catchup) > 2:
catchup_message = pycrdt.create_update_message(catchup)
self.update_buffer.resume(catchup_message=catchup_message)
Comment on lines +745 to +750
Copy link
Copy Markdown
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

With the above suggestion, we can delegate this to self.update_buffer (which we can rename to self.update_channel), e.g.

Suggested change
catchup = self._ydoc.get_update(pre_sync_sv)
# An empty yjs update is 2 bytes (b"\x00\x00").
catchup_message = None
if catchup and len(catchup) > 2:
catchup_message = pycrdt.create_update_message(catchup)
self.update_buffer.resume(catchup_message=catchup_message)
self.update_channel.resume(pre_sync_sv=pre_sync_sv)

if self.file_api is not None:
self.file_api._reloading_content = False

Expand Down
17 changes: 12 additions & 5 deletions jupyter_server_documents/rooms/yroom_update_buffer.py
Original file line number Diff line number Diff line change
Expand Up @@ -41,13 +41,20 @@ def pause(self) -> None:
"""Start queuing updates instead of broadcasting them."""
self._paused = True

def resume(self) -> None:
"""Broadcast all queued updates and unpause."""
queued = self._queue
def resume(self, catchup_message: bytes | None = None) -> None:
"""Discard queued updates and unpause. If catchup_message is provided,
broadcast it as a single batched update instead of replaying individual
queued messages.

Batching avoids a pycrdt offset encoding bug
(jupyter-ai-contrib/jupyter-server-documents#197) where individual
incremental Text updates after multi-byte characters crash JS yjs
clients with findIndexSS "Unexpected case".
"""
self._queue = []
self._paused = False
for message in queued:
self._broadcast(message)
if catchup_message is not None:
self._broadcast(catchup_message)
Comment on lines +44 to +57
Copy link
Copy Markdown
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It seems like we no longer need to store the update messages at all, since we always apply incoming YDoc updates to the YDoc so they will always be included with the catchup message. Also this method does nothing if catchup_message is None, which is expected. Perhaps we can require the pre-sync server state vector as an argument here, and compute the catch up message in the implementation?

The new interface would look like:

def resume(self, pre_sync_sv: bytes) -> None:

Then, we should rename this class to YRoomUpdateChannel or something similar that doesn't imply it's a buffer, but instead a broadcast channel that can be turned on or off. Not in scope for this PR, can be done in the future.


def _broadcast(self, message: bytes) -> None:
"""Send a message to all synced clients."""
Expand Down
Loading