-
-
Notifications
You must be signed in to change notification settings - Fork 27
feat: add resilient SSE session with reconnection/backoff #43
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
base: main
Are you sure you want to change the base?
Changes from 3 commits
97e320b
38d9edb
0a21302
9b134e8
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
| Original file line number | Diff line number | Diff line change | ||||||||||
|---|---|---|---|---|---|---|---|---|---|---|---|---|
|
|
@@ -43,9 +43,12 @@ public struct EventSource: Sendable { | |||||||||||
| private let eventParser: @Sendable () -> EventParser | ||||||||||||
|
|
||||||||||||
| public var timeoutIntervalForRequest: TimeInterval | ||||||||||||
|
|
||||||||||||
| public var timeoutIntervalForResource: TimeInterval | ||||||||||||
|
|
||||||||||||
| public var maxReconnectAttempts: Int | ||||||||||||
| public var reconnectInitialDelay: TimeInterval | ||||||||||||
| public var reconnectBackoffFactor: Double | ||||||||||||
|
|
||||||||||||
| public init( | ||||||||||||
| mode: Mode = .default, | ||||||||||||
| timeoutIntervalForRequest: TimeInterval = 60, | ||||||||||||
|
|
@@ -63,20 +66,29 @@ public struct EventSource: Sendable { | |||||||||||
| mode: Mode = .default, | ||||||||||||
| eventParser: @autoclosure @escaping @Sendable () -> EventParser, | ||||||||||||
| timeoutIntervalForRequest: TimeInterval = 60, | ||||||||||||
| timeoutIntervalForResource: TimeInterval = 300 | ||||||||||||
| timeoutIntervalForResource: TimeInterval = 300, | ||||||||||||
| maxReconnectAttempts: Int = 5, | ||||||||||||
| reconnectInitialDelay: TimeInterval = 1.0, | ||||||||||||
| reconnectBackoffFactor: Double = 2.0 | ||||||||||||
| ) { | ||||||||||||
| self.mode = mode | ||||||||||||
| self.eventParser = eventParser | ||||||||||||
| self.timeoutIntervalForRequest = timeoutIntervalForRequest | ||||||||||||
| self.timeoutIntervalForResource = timeoutIntervalForResource | ||||||||||||
| self.maxReconnectAttempts = maxReconnectAttempts | ||||||||||||
| self.reconnectInitialDelay = reconnectInitialDelay | ||||||||||||
| self.reconnectBackoffFactor = reconnectBackoffFactor | ||||||||||||
| } | ||||||||||||
|
|
||||||||||||
| public func dataTask(for urlRequest: URLRequest) -> DataTask { | ||||||||||||
| DataTask( | ||||||||||||
| urlRequest: urlRequest, | ||||||||||||
| eventParser: eventParser(), | ||||||||||||
| timeoutIntervalForRequest: timeoutIntervalForRequest, | ||||||||||||
| timeoutIntervalForResource: timeoutIntervalForResource | ||||||||||||
| timeoutIntervalForResource: timeoutIntervalForResource, | ||||||||||||
| maxReconnectAttempts: maxReconnectAttempts, | ||||||||||||
| reconnectInitialDelay: reconnectInitialDelay, | ||||||||||||
| reconnectBackoffFactor: reconnectBackoffFactor | ||||||||||||
| ) | ||||||||||||
| } | ||||||||||||
| } | ||||||||||||
|
|
@@ -88,8 +100,68 @@ public extension EventSource { | |||||||||||
| /// ``EventSource/EventSource/dataTask(for:)`` method on the EventSource instance. After creating a task, | ||||||||||||
| /// it can be started by iterating event stream returned by ``DataTask/events()``. | ||||||||||||
| final class DataTask: Sendable { | ||||||||||||
| /// Initializes or reinitializes the SSE session | ||||||||||||
| private func startSession(stream continuation: AsyncStream<EventType>.Continuation) { | ||||||||||||
| let sessionDelegate = SessionDelegate() | ||||||||||||
| let urlSession = URLSession( | ||||||||||||
| configuration: urlSessionConfiguration, | ||||||||||||
| delegate: sessionDelegate, | ||||||||||||
| delegateQueue: nil | ||||||||||||
| ) | ||||||||||||
| let urlSessionDataTask = urlSession.dataTask(with: urlRequest) | ||||||||||||
| let sessionDelegateTask = Task { [weak self] in | ||||||||||||
| for await event in sessionDelegate.eventStream { | ||||||||||||
| guard let self else { return } | ||||||||||||
| switch event { | ||||||||||||
| case let .didCompleteWithError(error): | ||||||||||||
| self.handleSessionError(error, stream: continuation, urlSession: urlSession) | ||||||||||||
| case let .didReceiveResponse(response, completionHandler): | ||||||||||||
| self.handleSessionResponse( | ||||||||||||
| response, | ||||||||||||
| stream: continuation, | ||||||||||||
| urlSession: urlSession, | ||||||||||||
| completionHandler: completionHandler | ||||||||||||
| ) | ||||||||||||
| case let .didReceiveData(data): | ||||||||||||
| self.parseMessages(from: data, stream: continuation, urlSession: urlSession) | ||||||||||||
| } | ||||||||||||
| } | ||||||||||||
| } | ||||||||||||
|
|
||||||||||||
| continuation.onTermination = { @Sendable [weak self] _ in | ||||||||||||
| sessionDelegateTask.cancel() | ||||||||||||
| Task { self?.close(stream: continuation, urlSession: urlSession) } | ||||||||||||
| } | ||||||||||||
|
|
||||||||||||
| urlSessionDataTask.resume() | ||||||||||||
| } | ||||||||||||
|
|
||||||||||||
| /// Helper method for reconnection | ||||||||||||
| private func attemptReconnect(stream continuation: AsyncStream<EventType>.Continuation) { | ||||||||||||
| let delay = reconnectInitialDelay * pow(reconnectBackoffFactor, Double(reconnectAttempts - 1)) | ||||||||||||
| DispatchQueue.global().asyncAfter(deadline: .now() + delay) { [weak self] in | ||||||||||||
|
Owner
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Would it possible to implement this with
Author
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Sure. Done |
||||||||||||
| self?.startSession(stream: continuation) | ||||||||||||
| self?.readyState = .connecting | ||||||||||||
| self?.consumed = true | ||||||||||||
| } | ||||||||||||
| } | ||||||||||||
| private let _readyState: Mutex<ReadyState> = Mutex(.none) | ||||||||||||
|
|
||||||||||||
| // Reconnection properties | ||||||||||||
| private let maxReconnectAttempts: Int | ||||||||||||
| private let reconnectInitialDelay: TimeInterval | ||||||||||||
| private let reconnectBackoffFactor: Double | ||||||||||||
|
|
||||||||||||
| private let _reconnectAttempts: Mutex<Int> = Mutex(0) | ||||||||||||
| private var reconnectAttempts: Int { | ||||||||||||
| get { | ||||||||||||
| _reconnectAttempts.withLock { $0 } | ||||||||||||
| } | ||||||||||||
| set { | ||||||||||||
| _reconnectAttempts.withLock { $0 = newValue } | ||||||||||||
| } | ||||||||||||
| } | ||||||||||||
|
|
||||||||||||
| /// A value representing the state of the connection. | ||||||||||||
| public var readyState: ReadyState { | ||||||||||||
| get { | ||||||||||||
|
|
@@ -170,12 +242,18 @@ public extension EventSource { | |||||||||||
| urlRequest: URLRequest, | ||||||||||||
| eventParser: EventParser, | ||||||||||||
| timeoutIntervalForRequest: TimeInterval, | ||||||||||||
| timeoutIntervalForResource: TimeInterval | ||||||||||||
| timeoutIntervalForResource: TimeInterval, | ||||||||||||
| maxReconnectAttempts: Int, | ||||||||||||
| reconnectInitialDelay: TimeInterval, | ||||||||||||
| reconnectBackoffFactor: Double | ||||||||||||
| ) { | ||||||||||||
| self.urlRequest = urlRequest | ||||||||||||
| self._eventParser = Mutex(eventParser) | ||||||||||||
| self.timeoutIntervalForRequest = timeoutIntervalForRequest | ||||||||||||
| self.timeoutIntervalForResource = timeoutIntervalForResource | ||||||||||||
| self.maxReconnectAttempts = maxReconnectAttempts | ||||||||||||
| self.reconnectInitialDelay = reconnectInitialDelay | ||||||||||||
| self.reconnectBackoffFactor = reconnectBackoffFactor | ||||||||||||
| } | ||||||||||||
|
|
||||||||||||
| /// Creates and returns event stream. | ||||||||||||
|
|
@@ -188,40 +266,7 @@ public extension EventSource { | |||||||||||
| } | ||||||||||||
|
|
||||||||||||
| return AsyncStream { continuation in | ||||||||||||
| let sessionDelegate = SessionDelegate() | ||||||||||||
| let urlSession = URLSession( | ||||||||||||
| configuration: urlSessionConfiguration, | ||||||||||||
| delegate: sessionDelegate, | ||||||||||||
| delegateQueue: nil | ||||||||||||
| ) | ||||||||||||
| let urlSessionDataTask = urlSession.dataTask(with: urlRequest) | ||||||||||||
|
|
||||||||||||
| let sessionDelegateTask = Task { [weak self] in | ||||||||||||
| for await event in sessionDelegate.eventStream { | ||||||||||||
| guard let self else { return } | ||||||||||||
|
|
||||||||||||
| switch event { | ||||||||||||
| case let .didCompleteWithError(error): | ||||||||||||
| handleSessionError(error, stream: continuation, urlSession: urlSession) | ||||||||||||
| case let .didReceiveResponse(response, completionHandler): | ||||||||||||
| handleSessionResponse( | ||||||||||||
| response, | ||||||||||||
| stream: continuation, | ||||||||||||
| urlSession: urlSession, | ||||||||||||
| completionHandler: completionHandler | ||||||||||||
| ) | ||||||||||||
| case let .didReceiveData(data): | ||||||||||||
| parseMessages(from: data, stream: continuation, urlSession: urlSession) | ||||||||||||
| } | ||||||||||||
| } | ||||||||||||
| } | ||||||||||||
|
|
||||||||||||
| continuation.onTermination = { @Sendable [weak self] _ in | ||||||||||||
| sessionDelegateTask.cancel() | ||||||||||||
| Task { self?.close(stream: continuation, urlSession: urlSession) } | ||||||||||||
| } | ||||||||||||
|
|
||||||||||||
| urlSessionDataTask.resume() | ||||||||||||
| startSession(stream: continuation) | ||||||||||||
| readyState = .connecting | ||||||||||||
| consumed = true | ||||||||||||
| } | ||||||||||||
|
|
@@ -241,9 +286,15 @@ public extension EventSource { | |||||||||||
| if let error { | ||||||||||||
| sendErrorEvent(with: error, stream: continuation) | ||||||||||||
| } | ||||||||||||
|
|
||||||||||||
| // Close connection | ||||||||||||
| close(stream: continuation, urlSession: urlSession) | ||||||||||||
|
|
||||||||||||
| // Attempts to reconnect if the limit has not been exceeded | ||||||||||||
| if reconnectAttempts < maxReconnectAttempts { | ||||||||||||
|
Comment on lines
+293
to
+294
Owner
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more.
Suggested change
|
||||||||||||
| reconnectAttempts += 1 | ||||||||||||
| attemptReconnect(stream: continuation) | ||||||||||||
| } else { | ||||||||||||
| // Close connection if attempts exceeded | ||||||||||||
| close(stream: continuation, urlSession: urlSession) | ||||||||||||
| } | ||||||||||||
| } | ||||||||||||
|
|
||||||||||||
| private func handleSessionResponse( | ||||||||||||
|
|
@@ -321,6 +372,7 @@ public extension EventSource { | |||||||||||
|
|
||||||||||||
| private func setOpen(stream continuation: AsyncStream<EventType>.Continuation) { | ||||||||||||
| readyState = .open | ||||||||||||
| reconnectAttempts = 0 // reset attempts when opening | ||||||||||||
| continuation.yield(.open) | ||||||||||||
| } | ||||||||||||
|
|
||||||||||||
|
|
||||||||||||
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
It's more of a stylistic request to declare methods after init()
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Done.