Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
24 changes: 19 additions & 5 deletions ocppj/dispatcher.go
Original file line number Diff line number Diff line change
Expand Up @@ -396,7 +396,7 @@ func NewDefaultServerDispatcher(queueMap ServerQueueMap) *DefaultServerDispatche
readyForDispatch: make(chan string, 1),
timeout: defaultMessageTimeout,
}
d.pendingRequestState = NewServerState(&d.mutex)
d.pendingRequestState = NewServerState(&sync.RWMutex{})
return d
}

Expand Down Expand Up @@ -544,7 +544,16 @@ func (d *DefaultServerDispatcher) messagePump() {
continue
}
bundle, _ := el.(RequestBundle)
d.CompleteRequest(clientID, bundle.Call.UniqueId)
// Complete the request inline instead of calling CompleteRequest,
// which sends to readyForDispatch. Since messagePump is the sole
// reader of that channel, sending to it here would self-deadlock
// if the buffer is already full from a previous iteration.
q.Pop()
d.pendingRequestState.DeletePendingRequest(clientID, bundle.Call.UniqueId)
log.Debugf("completed request %s for %s", bundle.Call.UniqueId, clientID)
// Mark this client as ready for its next queued request
clientQueue = q
rdy = true
log.Infof("request %v for %v timed out", bundle.Call.UniqueId, clientID)
if d.onRequestCancel != nil {
d.onRequestCancel(clientID, bundle.Call.UniqueId, bundle.Call.Payload,
Expand Down Expand Up @@ -621,10 +630,15 @@ func (d *DefaultServerDispatcher) waitForTimeout(clientID string, clientCtx clie
case <-clientCtx.ctx.Done():
err := clientCtx.ctx.Err()
if err == context.DeadlineExceeded {
// Timeout triggered, notifying messagePump
// Timeout triggered, notifying messagePump.
// Check running state under lock, but release before the channel
// send. Holding RLock during a potentially blocking send can cause
// a deadlock: if timerC is full, this goroutine blocks while
// holding RLock, preventing any Lock() caller from proceeding.
d.mutex.RLock()
defer d.mutex.RUnlock()
if d.running {
running := d.running
d.mutex.RUnlock()
if running {
d.timerC <- clientID
}
} else {
Expand Down
85 changes: 85 additions & 0 deletions ocppj/dispatcher_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ package ocppj_test
import (
"fmt"
"sync"
"testing"
"time"

"github.com/stretchr/testify/assert"
Expand Down Expand Up @@ -457,3 +458,87 @@ func (c *ClientDispatcherTestSuite) TestClientSendPausedDispatcher() {
assert.Equal(t, requestNumber, c.queue.Size())
assert.False(t, c.state.HasPendingRequest())
}

// TestServerDispatcherConcurrentTimeoutDeadlock reproduces a deadlock in
// DefaultServerDispatcher that appears when many client request timeouts
// expire at the same time.
//
// Root cause: NewDefaultServerDispatcher wires the dispatcher's own RWMutex
// into its ServerState (NewServerState(&d.mutex)), so the two share a lock.
// With more than 10 simultaneous timeouts:
//
//  1. each waitForTimeout goroutine takes RLock and sends to timerC (buffer=10)
//  2. goroutines past the buffer capacity block on the send while holding RLock
//  3. messagePump drains one entry, then calls HasPendingRequest, which needs
//     the write lock on the very same mutex
//  4. the write lock cannot be granted while step-2 goroutines hold RLock
//  5. those goroutines cannot release RLock until timerC is drained
//
// That is a circular wait: no timeouts or messages are processed for ANY
// client afterwards.
//
// Kept outside ServerDispatcherTestSuite on purpose: the suite's SetupTest
// swaps in a ServerState backed by a separate mutex, which hides the
// shared-mutex code path under test here.
func TestServerDispatcherConcurrentTimeoutDeadlock(t *testing.T) {
	const (
		clientCount = 15 // must be larger than timerC's buffer size of 10
		reqTimeout  = 50 * time.Millisecond
	)

	dispatcher := ocppj.NewDefaultServerDispatcher(ocppj.NewFIFOQueueMap(clientCount))
	// Deliberately do NOT call SetPendingRequestState: the constructor's
	// default shares the dispatcher's own mutex with the ServerState, and
	// that production wiring is exactly the root cause being exercised.

	wsServer := &MockWebsocketServer{}
	wsServer.On("Write", mock.AnythingOfType("string"), mock.Anything).Return(nil)
	dispatcher.SetNetworkServer(wsServer)
	dispatcher.SetTimeout(reqTimeout)

	endpoint := ocppj.Server{}
	endpoint.AddProfile(ocpp.NewProfile("mock", &MockFeature{}))

	cancellations := make(chan string, clientCount)
	dispatcher.SetOnRequestCanceled(func(clientID string, requestID string, request ocpp.Request, err *ocpp.Error) {
		cancellations <- clientID
	})

	dispatcher.Start()
	// No deferred Stop() here: if the deadlock fires, Stop() itself blocks
	// on the same mutex and would hang the test binary.

	// One client and one request each. messagePump dispatches them nearly
	// back-to-back, so every timeout expires within a narrow window
	// (~reqTimeout after dispatch).
	for i := 0; i < clientCount; i++ {
		id := fmt.Sprintf("client%d", i)
		dispatcher.CreateClient(id)

		req := newMockRequest("somevalue")
		call, err := endpoint.CreateCall(req)
		require.NoError(t, err)
		data, err := call.MarshalJSON()
		require.NoError(t, err)
		err = dispatcher.SendRequest(id, ocppj.RequestBundle{Call: call, Data: data})
		require.NoError(t, err)
	}

	// Every timeout callback should arrive within 2 seconds. Under the
	// deadlock the pump freezes after at most one timeout and the remaining
	// callbacks never fire.
	expire := time.After(2 * time.Second)
	for fired := 0; fired < clientCount; {
		select {
		case <-cancellations:
			fired++
		case <-expire:
			t.Fatalf("deadlock detected: only %d/%d timeout callbacks fired within 2s", fired, clientCount)
		}
	}

	// Reached only when no deadlock occurred, so Stop() is safe now.
	dispatcher.Stop()
}
29 changes: 19 additions & 10 deletions ocppj/state.go
Original file line number Diff line number Diff line change
Expand Up @@ -57,8 +57,8 @@ func (s *clientState) AddPendingRequest(requestID string, req ocpp.Request) {
}

func (s *clientState) GetPendingRequest(requestID string) (ocpp.Request, bool) {
s.mutex.Lock()
defer s.mutex.Unlock()
s.mutex.RLock()
defer s.mutex.RUnlock()
if s.requestID != requestID {
return nil, false
}
Expand All @@ -81,8 +81,8 @@ func (s *clientState) ClearPendingRequests() {
}

// HasPendingRequest reports whether this client currently has a request
// in flight (i.e. a request was sent and its response has not arrived yet).
//
// As pasted, this span acquired both the write lock and the read lock in
// sequence on the same RWMutex; sync.RWMutex is not reentrant, so that
// double acquisition would self-deadlock. Keep only the read lock: the
// method never mutates state, so RLock is sufficient and allows concurrent
// readers.
func (s *clientState) HasPendingRequest() bool {
	s.mutex.RLock()
	defer s.mutex.RUnlock()
	return s.requestID != ""
}

Expand Down Expand Up @@ -153,8 +153,8 @@ func (d *serverState) AddPendingRequest(clientID string, requestID string, req o

func (d *serverState) DeletePendingRequest(clientID string, requestID string) {
if d.mutex != nil {
d.mutex.Lock()
defer d.mutex.Unlock()
d.mutex.RLock()
defer d.mutex.RUnlock()
}
state, exists := d.pendingRequestState[clientID]
if !exists {
Expand All @@ -165,25 +165,34 @@ func (d *serverState) DeletePendingRequest(clientID string, requestID string) {

// GetClientState returns the state object tracking the given client's
// pending request, lazily creating one on first access.
func (d *serverState) GetClientState(clientID string) ClientState {
	if d.mutex == nil {
		// Unsynchronized mode: delegate straight to the helper.
		return d.getOrCreateState(clientID)
	}
	// Fast path: under the cheaper read lock, return existing state if any.
	d.mutex.RLock()
	existing, found := d.pendingRequestState[clientID]
	d.mutex.RUnlock()
	if found {
		return existing
	}
	// Slow path: creation needs the write lock. getOrCreateState presumably
	// re-checks the map under the lock, so a concurrent creator between the
	// RUnlock above and this Lock is handled — confirm against its body.
	d.mutex.Lock()
	defer d.mutex.Unlock()
	return d.getOrCreateState(clientID)
}

// HasPendingRequest reports whether the given client currently has a
// request awaiting a response.
//
// As pasted, this span acquired both the write lock and the read lock in
// sequence on the same RWMutex; sync.RWMutex is not reentrant, so that
// double acquisition would self-deadlock. Keep only the read lock: the
// method only reads the map, so RLock is sufficient and allows concurrent
// readers.
func (d *serverState) HasPendingRequest(clientID string) bool {
	if d.mutex != nil {
		d.mutex.RLock()
		defer d.mutex.RUnlock()
	}
	state, exists := d.pendingRequestState[clientID]
	return exists && state.HasPendingRequest()
}

func (d *serverState) HasPendingRequests() bool {
if d.mutex != nil {
d.mutex.Lock()
defer d.mutex.Unlock()
d.mutex.RLock()
defer d.mutex.RUnlock()
}
for _, s := range d.pendingRequestState {
if s.HasPendingRequest() {
Expand Down