fix: batched catchup in YRoomUpdateBuffer.resume()#218
Conversation
d4f25f9 to
8e0f98f
Compare
|
@xrl Thank you for opening this PR! Really appreciate your interest in this package. 🤗 We just opened a PR that does something similar in #215. Although it was motivated by an issue different from the one you're reporting, I believe the implementation is similar. In #215 we are introducing a mechanism to make clients sync one at a time, queuing updates while a client is syncing, and then flushing the queue of updates after the client completed syncing. However it does not implement the "batched catchup diff" approach where you merge all of the updates into a single message, which you discovered. Great find! Currently we plan to merge #215 ASAP to unblock the JSD v0.2.0 release, which will deploy the memory management feature that admins really need in prod deployments. Could you help port over your changes to the new |
Port the batched catchup approach onto the YRoomUpdateBuffer class introduced in jupyter-ai-contrib#215. Instead of replaying individual queued messages in resume(), compute a single batched diff from the pre-handshake state vector via ydoc.get_update(pre_sync_sv). This fixes two issues: 1. Silent data loss (jupyter-ai-contrib#197): updates queued during a divergent handshake are now delivered as one clean diff that covers the full gap. 2. findIndexSS crash (jupyter-ai-contrib#197): individual incremental Text updates from pycrdt after multi-byte characters use offset encoding that JS yjs cannot resolve. A single batched update keeps all CRDT struct references resolvable within the same message. Changes: - yroom.py: save pre_sync_sv before handle_sync handshake, compute catchup diff after handshake, pass to resume() - yroom_update_buffer.py: resume() accepts optional catchup_message and broadcasts it instead of replaying individual queued messages Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
a84356d to
c9d9587
Compare
|
@dlqqq Thanks for the review and for merging #215! I've rebased this PR onto main (absorbing #215) and reworked the implementation to use
Stress tests are in a separate PR: #219 The diff is now just 2 files changed, +28/-8 lines. |
|
Thanks for working on this & for rebasing the PR! We will definitely give this a review and merge following JSD v0.2.0 release. |
There was a problem hiding this comment.
Thank you for opening this PR! The catchup now happens instantly all-at-once, instead of in rapid sequential updates. This is an innovative approach. Tested it locally and verified it works well, even better than our previous experience. 🎉
This change allows for broader simplifications in our existing code, which I documented as suggestions below. You can work on them in the same PR, or do it later in a new one. Totally up to you, let me know your thoughts before I merge (just to make sure you see them 😄).
| # Save the server's state vector before the handshake. After the | ||
| # handshake completes, get_update(pre_sync_sv) produces a single | ||
| # batched diff covering any mutations that occurred during the | ||
| # handshake gap. This replaces replaying individual queued messages, | ||
| # which triggers a pycrdt offset encoding bug (#197) where | ||
| # incremental Text updates after multi-byte characters crash JS yjs. | ||
| pre_sync_sv = self._ydoc.get_state() | ||
|
|
||
| # Check if client has divergent history | ||
| ss1_payload = ss1_message[1:] | ||
| divergent = self._has_divergent_history(ss1_payload) |
There was a problem hiding this comment.
We are calling self._ydoc.get_state() twice - once here and another in self._has_divergent_history().
To avoid this duplicate computation, could we have _has_divergent_history() accept the server state vector (in bytes) as an additional argument? e.g. client_ss1 and server_sv_bytes.
| def resume(self, catchup_message: bytes | None = None) -> None: | ||
| """Discard queued updates and unpause. If catchup_message is provided, | ||
| broadcast it as a single batched update instead of replaying individual | ||
| queued messages. | ||
|
|
||
| Batching avoids a pycrdt offset encoding bug | ||
| (jupyter-ai-contrib/jupyter-server-documents#197) where individual | ||
| incremental Text updates after multi-byte characters crash JS yjs | ||
| clients with findIndexSS "Unexpected case". | ||
| """ | ||
| self._queue = [] | ||
| self._paused = False | ||
| for message in queued: | ||
| self._broadcast(message) | ||
| if catchup_message is not None: | ||
| self._broadcast(catchup_message) |
There was a problem hiding this comment.
It seems like we no longer need to store the update messages at all, since we always apply incoming YDoc updates to the YDoc so they will always be included with the catchup message. Also this method does nothing if catchup_message is None, which is expected. Perhaps we can require the pre-sync server state vector as an argument here, and compute the catch up message in the implementation?
The new interface would look like:
def resume(self, pre_sync_sv: bytes) -> None:
Then, we should rename this class to YRoomUpdateChannel or something similar that doesn't imply it's a buffer, but instead a broadcast channel that can be turned on or off. Not in scope for this PR, can be done in the future.
| catchup = self._ydoc.get_update(pre_sync_sv) | ||
| # An empty yjs update is 2 bytes (b"\x00\x00"). | ||
| catchup_message = None | ||
| if catchup and len(catchup) > 2: | ||
| catchup_message = pycrdt.create_update_message(catchup) | ||
| self.update_buffer.resume(catchup_message=catchup_message) |
There was a problem hiding this comment.
With the above suggestion, we can delegate this to self.update_buffer (which we can rename to self.update_channel), e.g.
| catchup = self._ydoc.get_update(pre_sync_sv) | |
| # An empty yjs update is 2 bytes (b"\x00\x00"). | |
| catchup_message = None | |
| if catchup and len(catchup) > 2: | |
| catchup_message = pycrdt.create_update_message(catchup) | |
| self.update_buffer.resume(catchup_message=catchup_message) | |
| self.update_channel.resume(pre_sync_sv=pre_sync_sv) |
|
Merging this now as I haven't gotten a reply from you in a bit. Will work on the follow up comments in a separate PR. Thanks for working on this @xrl! |
Summary
YRoomUpdateBufferclass introduced in Fix content duplication on reconnection #215handle_sync()ydoc.get_update(pre_sync_sv)and passes it toresume()resume()broadcasts the batched diff instead of replaying individual queued messagesThis fixes two issues for divergent client handshakes:
Changes
rooms/yroom.py—handle_sync():pre_sync_sv = self._ydoc.get_state()before the handshakecatchup = self._ydoc.get_update(pre_sync_sv)self.update_buffer.resume(catchup_message=...)rooms/yroom_update_buffer.py—resume():catchup_messageparameterTest plan
pytest jupyter_server_documents/tests/test_yroom_sync.py jupyter_server_documents/tests/test_yroom.py)Related