Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
52 changes: 35 additions & 17 deletions internal/callbackqueue/callbackqueue.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,57 +6,75 @@ import (
"github.com/lorenzodonini/ocpp-go/ocpp"
)

type RequestType string
type CallbackQueue struct {
callbacksMutex sync.RWMutex
callbacks map[string][]func(confirmation ocpp.Response, err error)
callbacks map[string]map[RequestType][]func(confirmation ocpp.Response, err error)
}

func New() CallbackQueue {
return CallbackQueue{
callbacks: make(map[string][]func(confirmation ocpp.Response, err error)),
callbacks: make(map[string]map[RequestType][]func(confirmation ocpp.Response, err error)),
}
}

func (cq *CallbackQueue) TryQueue(id string, try func() error, callback func(confirmation ocpp.Response, err error)) error {
func (cq *CallbackQueue) TryQueue(id string, requestType RequestType, try func() error, callback func(confirmation ocpp.Response, err error)) error {
cq.callbacksMutex.Lock()
defer cq.callbacksMutex.Unlock()

cq.callbacks[id] = append(cq.callbacks[id], callback)
if _, ok := cq.callbacks[id]; !ok {
cq.callbacks[id] = make(map[RequestType][]func(confirmation ocpp.Response, err error))
}
cq.callbacks[id][requestType] = append(cq.callbacks[id][requestType], callback)

if err := try(); err != nil {
// pop off last element
callbacks := cq.callbacks[id]
cq.callbacks[id] = callbacks[:len(callbacks)-1]
// pop off our element
if callbacks, ok := cq.callbacks[id]; ok {
delete(callbacks, requestType)
}
if len(cq.callbacks[id]) == 0 {
delete(cq.callbacks, id)
}

return err
}

return nil
}

func (cq *CallbackQueue) Dequeue(id string) (func(confirmation ocpp.Response, err error), bool) {
func (cq *CallbackQueue) Dequeue(id string, requestType RequestType) (func(confirmation ocpp.Response, err error), bool) {
cq.callbacksMutex.Lock()
defer cq.callbacksMutex.Unlock()

callbacks, ok := cq.callbacks[id]
clientCallbacks, ok := cq.callbacks[id]
if !ok {
return nil, false
}

if len(callbacks) == 0 {
panic("Internal CallbackQueue inconsistency")
if len(clientCallbacks) == 0 {
//panic("Internal CallbackQueue inconsistency")
return nil, false
}

requestTypeCallbacks, ok := clientCallbacks[requestType]
if !ok {
if requestType != "" { /* requestType known and not available... */
return nil, false
}
/* requestType any, take first one... */
for reqType, cb := range clientCallbacks {
requestType = reqType
requestTypeCallbacks = append(requestTypeCallbacks, cb...)
break // only first one
}
}

callback := callbacks[0]
callback := requestTypeCallbacks[0]

if len(callbacks) == 1 {
delete(cq.callbacks, id)
if len(requestTypeCallbacks) == 1 {
delete(cq.callbacks[id], requestType)
} else {
cq.callbacks[id] = callbacks[1:]
cq.callbacks[id][requestType] = requestTypeCallbacks[1:]
}

return callback, ok
return callback, true
}
10 changes: 5 additions & 5 deletions ocpp1.6/central_system.go
Original file line number Diff line number Diff line change
Expand Up @@ -500,7 +500,7 @@ func (cs *centralSystem) SetNewChargePointHandler(handler ChargePointConnectionH

func (cs *centralSystem) SetChargePointDisconnectedHandler(handler ChargePointConnectionHandler) {
cs.server.SetDisconnectedClientHandler(func(chargePoint ws.Channel) {
for cb, ok := cs.callbackQueue.Dequeue(chargePoint.ID()); ok; cb, ok = cs.callbackQueue.Dequeue(chargePoint.ID()) {
for cb, ok := cs.callbackQueue.Dequeue(chargePoint.ID(), ""); ok; cb, ok = cs.callbackQueue.Dequeue(chargePoint.ID(), "") {
err := ocpp.NewError(ocppj.GenericError, "client disconnected, no response received from client", "")
cb(nil, err)
}
Expand Down Expand Up @@ -533,7 +533,7 @@ func (cs *centralSystem) SendRequestAsync(clientId string, request ocpp.Request,
send := func() error {
return cs.server.SendRequest(clientId, request)
}
return cs.callbackQueue.TryQueue(clientId, send, callback)
return cs.callbackQueue.TryQueue(clientId, callbackqueue.RequestType(featureName), send, callback)
}

func (cs *centralSystem) Start(listenPort int, listenPath string) {
Expand Down Expand Up @@ -695,7 +695,7 @@ func (cs *centralSystem) handleIncomingRequest(chargePoint ChargePointConnection
}

func (cs *centralSystem) handleIncomingConfirmation(chargePoint ChargePointConnection, confirmation ocpp.Response, requestId string) {
if callback, ok := cs.callbackQueue.Dequeue(chargePoint.ID()); ok {
if callback, ok := cs.callbackQueue.Dequeue(chargePoint.ID(), callbackqueue.RequestType(confirmation.GetFeatureName())); ok {
// Execute in separate goroutine, so the caller goroutine is available
go callback(confirmation, nil)
} else {
Expand All @@ -705,7 +705,7 @@ func (cs *centralSystem) handleIncomingConfirmation(chargePoint ChargePointConne
}

func (cs *centralSystem) handleIncomingError(chargePoint ChargePointConnection, err *ocpp.Error, details interface{}) {
if callback, ok := cs.callbackQueue.Dequeue(chargePoint.ID()); ok {
if callback, ok := cs.callbackQueue.Dequeue(chargePoint.ID(), ""); ok {
// Execute in separate goroutine, so the caller goroutine is available
go callback(nil, err)
} else {
Expand All @@ -715,7 +715,7 @@ func (cs *centralSystem) handleIncomingError(chargePoint ChargePointConnection,
}

func (cs *centralSystem) handleCanceledRequest(chargePointID string, request ocpp.Request, err *ocpp.Error) {
if callback, ok := cs.callbackQueue.Dequeue(chargePointID); ok {
if callback, ok := cs.callbackQueue.Dequeue(chargePointID, callbackqueue.RequestType(request.GetFeatureName())); ok {
// Execute in separate goroutine, so the caller goroutine is available
go callback(nil, err)
} else {
Expand Down
10 changes: 5 additions & 5 deletions ocpp1.6/charge_point.go
Original file line number Diff line number Diff line change
Expand Up @@ -299,7 +299,7 @@ func (cp *chargePoint) SendRequest(request ocpp.Request) (ocpp.Response, error)
send := func() error {
return cp.client.SendRequest(request)
}
err := cp.callbacks.TryQueue("main", send, func(confirmation ocpp.Response, err error) {
err := cp.callbacks.TryQueue("main", callbackqueue.RequestType(request.GetFeatureName()), send, func(confirmation ocpp.Response, err error) {
asyncResponseC <- asyncResponse{r: confirmation, e: err}
})
if err != nil {
Expand Down Expand Up @@ -335,7 +335,7 @@ func (cp *chargePoint) SendRequestAsync(request ocpp.Request, callback func(conf
send := func() error {
return cp.client.SendRequest(request)
}
err := cp.callbacks.TryQueue("main", send, callback)
err := cp.callbacks.TryQueue("main", callbackqueue.RequestType(request.GetFeatureName()), send, callback)
return err
}

Expand All @@ -344,15 +344,15 @@ func (cp *chargePoint) asyncCallbackHandler() {
select {
case confirmation := <-cp.confirmationHandler:
// Get and invoke callback
if callback, ok := cp.callbacks.Dequeue("main"); ok {
if callback, ok := cp.callbacks.Dequeue("main", callbackqueue.RequestType(confirmation.GetFeatureName())); ok {
callback(confirmation, nil)
} else {
err := fmt.Errorf("no handler available for incoming response %v", confirmation.GetFeatureName())
cp.error(err)
}
case protoError := <-cp.errorHandler:
// Get and invoke callback
if callback, ok := cp.callbacks.Dequeue("main"); ok {
if callback, ok := cp.callbacks.Dequeue("main", ""); ok {
callback(nil, protoError)
} else {
err := fmt.Errorf("no handler available for error %v", protoError.Error())
Expand All @@ -368,7 +368,7 @@ func (cp *chargePoint) asyncCallbackHandler() {
}

func (cp *chargePoint) clearCallbacks(invokeCallback bool) {
for cb, ok := cp.callbacks.Dequeue("main"); ok; cb, ok = cp.callbacks.Dequeue("main") {
for cb, ok := cp.callbacks.Dequeue("main", ""); ok; cb, ok = cp.callbacks.Dequeue("main", "") {
if invokeCallback {
err := ocpp.NewError(ocppj.GenericError, "client stopped, no response received from server", "")
cb(nil, err)
Expand Down
7 changes: 4 additions & 3 deletions ocpp1.6/smartcharging/clear_charging_profile.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,14 +15,15 @@ const ClearChargingProfileFeatureName = "ClearChargingProfile"
type ClearChargingProfileStatus string

const (
ClearChargingProfileStatusAccepted ClearChargingProfileStatus = "Accepted"
ClearChargingProfileStatusUnknown ClearChargingProfileStatus = "Unknown"
ClearChargingProfileStatusAccepted ClearChargingProfileStatus = "Accepted"
ClearChargingProfileStatusUnknown ClearChargingProfileStatus = "Unknown"
ClearChargingProfileStatusNotSupported ClearChargingProfileStatus = "NotSupported"
)

func isValidClearChargingProfileStatus(fl validator.FieldLevel) bool {
status := ClearChargingProfileStatus(fl.Field().String())
switch status {
case ClearChargingProfileStatusAccepted, ClearChargingProfileStatusUnknown:
case ClearChargingProfileStatusAccepted, ClearChargingProfileStatusUnknown, ClearChargingProfileStatusNotSupported:
return true
default:
return false
Expand Down
4 changes: 2 additions & 2 deletions ocpp1.6/types/types.go
Original file line number Diff line number Diff line change
Expand Up @@ -299,7 +299,7 @@ func isValidUnitOfMeasure(fl validator.FieldLevel) bool {
}

type SampledValue struct {
Value string `json:"value" validate:"required"`
Value string `json:"value" validate:"omitempty,required"`
Context ReadingContext `json:"context,omitempty" validate:"omitempty,readingContext16"`
Format ValueFormat `json:"format,omitempty" validate:"omitempty,valueFormat"`
Measurand Measurand `json:"measurand,omitempty" validate:"omitempty,measurand16"`
Expand All @@ -310,7 +310,7 @@ type SampledValue struct {

type MeterValue struct {
Timestamp *DateTime `json:"timestamp" validate:"required"`
SampledValue []SampledValue `json:"sampledValue" validate:"required,min=1,dive"`
SampledValue []SampledValue `json:"sampledValue" validate:"required,min=0,dive"`
}

// Initialize validator
Expand Down
8 changes: 4 additions & 4 deletions ocpp2.0.1/charging_station.go
Original file line number Diff line number Diff line change
Expand Up @@ -476,7 +476,7 @@ func (cs *chargingStation) SendRequest(request ocpp.Request) (ocpp.Response, err
send := func() error {
return cs.client.SendRequest(request)
}
err := cs.callbacks.TryQueue("main", send, func(confirmation ocpp.Response, err error) {
err := cs.callbacks.TryQueue("main", callbackqueue.RequestType(request.GetFeatureName()), send, func(confirmation ocpp.Response, err error) {
asyncResponseC <- asyncResponse{r: confirmation, e: err}
})
if err != nil {
Expand Down Expand Up @@ -528,7 +528,7 @@ func (cs *chargingStation) SendRequestAsync(request ocpp.Request, callback func(
send := func() error {
return cs.client.SendRequest(request)
}
err := cs.callbacks.TryQueue("main", send, callback)
err := cs.callbacks.TryQueue("main", callbackqueue.RequestType(request.GetFeatureName()), send, callback)
return err
}

Expand All @@ -537,14 +537,14 @@ func (cs *chargingStation) asyncCallbackHandler() {
select {
case confirmation := <-cs.responseHandler:
// Get and invoke callback
if callback, ok := cs.callbacks.Dequeue("main"); ok {
if callback, ok := cs.callbacks.Dequeue("main", callbackqueue.RequestType(confirmation.GetFeatureName())); ok {
callback(confirmation, nil)
} else {
cs.error(fmt.Errorf("no callback available for incoming response %v", confirmation.GetFeatureName()))
}
case protoError := <-cs.errorHandler:
// Get and invoke callback
if callback, ok := cs.callbacks.Dequeue("main"); ok {
if callback, ok := cs.callbacks.Dequeue("main", ""); ok {
callback(nil, protoError)
} else {
cs.error(fmt.Errorf("no callback available for incoming error %w", protoError))
Expand Down
10 changes: 5 additions & 5 deletions ocpp2.0.1/csms.go
Original file line number Diff line number Diff line change
Expand Up @@ -749,7 +749,7 @@ func (cs *csms) SetNewChargingStationHandler(handler ChargingStationConnectionHa

func (cs *csms) SetChargingStationDisconnectedHandler(handler ChargingStationConnectionHandler) {
cs.server.SetDisconnectedClientHandler(func(chargingStation ws.Channel) {
for cb, ok := cs.callbackQueue.Dequeue(chargingStation.ID()); ok; cb, ok = cs.callbackQueue.Dequeue(chargingStation.ID()) {
for cb, ok := cs.callbackQueue.Dequeue(chargingStation.ID(), ""); ok; cb, ok = cs.callbackQueue.Dequeue(chargingStation.ID(), "") {
err := ocpp.NewError(ocppj.GenericError, "client disconnected, no response received from client", "")
cb(nil, err)
}
Expand Down Expand Up @@ -811,7 +811,7 @@ func (cs *csms) SendRequestAsync(clientId string, request ocpp.Request, callback
send := func() error {
return cs.server.SendRequest(clientId, request)
}
return cs.callbackQueue.TryQueue(clientId, send, callback)
return cs.callbackQueue.TryQueue(clientId, callbackqueue.RequestType(request.GetFeatureName()), send, callback)
}

func (cs *csms) Start(listenPort int, listenPath string) {
Expand Down Expand Up @@ -1019,7 +1019,7 @@ func (cs *csms) handleIncomingRequest(chargingStation ChargingStationConnection,
}

func (cs *csms) handleIncomingResponse(chargingStation ChargingStationConnection, response ocpp.Response, requestId string) {
if callback, ok := cs.callbackQueue.Dequeue(chargingStation.ID()); ok {
if callback, ok := cs.callbackQueue.Dequeue(chargingStation.ID(), callbackqueue.RequestType(response.GetFeatureName())); ok {
// Execute in separate goroutine, so the caller goroutine is available
go callback(response, nil)
} else {
Expand All @@ -1029,7 +1029,7 @@ func (cs *csms) handleIncomingResponse(chargingStation ChargingStationConnection
}

func (cs *csms) handleIncomingError(chargingStation ChargingStationConnection, err *ocpp.Error, details interface{}) {
if callback, ok := cs.callbackQueue.Dequeue(chargingStation.ID()); ok {
if callback, ok := cs.callbackQueue.Dequeue(chargingStation.ID(), ""); ok {
// Execute in separate goroutine, so the caller goroutine is available
go callback(nil, err)
} else {
Expand All @@ -1038,7 +1038,7 @@ func (cs *csms) handleIncomingError(chargingStation ChargingStationConnection, e
}

func (cs *csms) handleCanceledRequest(chargePointID string, request ocpp.Request, err *ocpp.Error) {
if callback, ok := cs.callbackQueue.Dequeue(chargePointID); ok {
if callback, ok := cs.callbackQueue.Dequeue(chargePointID, callbackqueue.RequestType(request.GetFeatureName())); ok {
// Execute in separate goroutine, so the caller goroutine is available
go callback(nil, err)
} else {
Expand Down
12 changes: 10 additions & 2 deletions ocppj/dispatcher.go
Original file line number Diff line number Diff line change
Expand Up @@ -99,7 +99,7 @@ type DefaultClientDispatcher struct {
}

const (
defaultTimeoutTick = 24 * time.Hour
defaultTimeoutTick = 2 * time.Minute
defaultMessageTimeout = 30 * time.Second
)

Expand Down Expand Up @@ -195,6 +195,9 @@ func (d *DefaultClientDispatcher) messagePump() {
if !ok {
continue
}
// if d == nil {
// continue // Dispatcher was stopped
// }
if d.pendingRequestState.HasPendingRequest() {
// Current request timed out. Removing request and triggering cancel callback
el := d.requestQueue.Peek()
Expand Down Expand Up @@ -235,16 +238,21 @@ func (d *DefaultClientDispatcher) dispatchNextRequest() {
el := d.requestQueue.Peek()
bundle, _ := el.(RequestBundle)
jsonMessage := bundle.Data
d.pendingRequestState.AddPendingRequest(bundle.Call.UniqueId, bundle.Call.Payload)
if !d.pendingRequestState.AddPendingRequest(bundle.Call.UniqueId, bundle.Call.Payload) {
log.Errorf("NOT dispatched request %s to server", bundle.Call.UniqueId)
return
}
// Attempt to send over network
err := d.network.Write(jsonMessage)
if err != nil {
log.Errorf("error sending JSON message to server: %s", string(jsonMessage))
// TODO: handle retransmission instead of skipping request altogether
d.CompleteRequest(bundle.Call.GetUniqueId())
if d.onRequestCancel != nil {
d.onRequestCancel(bundle.Call.UniqueId, bundle.Call.Payload,
ocpp.NewError(InternalError, err.Error(), bundle.Call.UniqueId))
}
return
}
log.Infof("dispatched request %s to server", bundle.Call.UniqueId)
log.Debugf("sent JSON message to server: %s", string(jsonMessage))
Expand Down
3 changes: 3 additions & 0 deletions ocppj/ocppj.go
Original file line number Diff line number Diff line change
Expand Up @@ -527,6 +527,9 @@ func (endpoint *Endpoint) CreateCall(request ocpp.Request) (*Call, error) {
}
// TODO: handle collisions?
uniqueId := messageIdGenerator()
if uniqueId == "" {
panic("message ID generator returned an empty string")
}
call := Call{
MessageTypeId: CALL,
UniqueId: uniqueId,
Expand Down
6 changes: 4 additions & 2 deletions ocppj/state.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@ type ClientState interface {
// Sets a Request as pending on the endpoint. Requests are considered pending until a response was received.
// The function expects a unique message ID and the Request.
// If an element with the same requestID exists, the new one will be ignored.
AddPendingRequest(requestID string, req ocpp.Request)
AddPendingRequest(requestID string, req ocpp.Request) bool
// Retrieves a pending Request, using the message ID.
// If no request for the passed message ID is found, a false flag is returned.
GetPendingRequest(requestID string) (ocpp.Request, bool)
Expand Down Expand Up @@ -45,15 +45,17 @@ func NewClientState() ClientState {
return &clientState{}
}

func (s *clientState) AddPendingRequest(requestID string, req ocpp.Request) {
func (s *clientState) AddPendingRequest(requestID string, req ocpp.Request) bool {
s.mutex.Lock()
defer s.mutex.Unlock()
if requestID != "" && s.requestID == "" {
s.requestID = requestID
s.pendingRequest = pendingRequest{
request: req,
}
return true
}
return false
}

func (s *clientState) GetPendingRequest(requestID string) (ocpp.Request, bool) {
Expand Down