diff --git a/event-gateway/gateway-runtime/internal/connectors/receiver/websub/consumer_manager.go b/event-gateway/gateway-runtime/internal/connectors/receiver/websub/consumer_manager.go index f909444aa..45e159863 100644 --- a/event-gateway/gateway-runtime/internal/connectors/receiver/websub/consumer_manager.go +++ b/event-gateway/gateway-runtime/internal/connectors/receiver/websub/consumer_manager.go @@ -218,8 +218,8 @@ func (cm *ConsumerManager) createConsumer(groupID string, topics []string, callb } // consumerGroupID generates a unique, safe consumer group ID for a callback URL. -// Format: {prefix}-websub-{sha256(callbackURL)[:16]} +// Format: {prefix}-websub-{sha256(callbackURL)[:32]} func (cm *ConsumerManager) consumerGroupID(callbackURL string) string { h := sha256.Sum256([]byte(callbackURL)) - return cm.groupPrefix + "-websub-" + hex.EncodeToString(h[:])[:16] + return cm.groupPrefix + "-websub-" + hex.EncodeToString(h[:])[:32] } diff --git a/event-gateway/gateway-runtime/internal/connectors/receiver/websub/handler.go b/event-gateway/gateway-runtime/internal/connectors/receiver/websub/handler.go index 02ac6a981..8ee3691b6 100644 --- a/event-gateway/gateway-runtime/internal/connectors/receiver/websub/handler.go +++ b/event-gateway/gateway-runtime/internal/connectors/receiver/websub/handler.go @@ -99,14 +99,14 @@ func (h *HubHandler) ServeHTTP(w http.ResponseWriter, r *http.Request) { func (h *HubHandler) handleSubscribe(w http.ResponseWriter, r *http.Request) { // Enforce subscribe policies before processing. subMsg := httpRequestToMessage(r) - message, shortCircuited, err := h.processor.ProcessSubscribe(r.Context(), h.bindingName, subMsg) + _, shortCircuited, err := h.processor.ProcessSubscribe(r.Context(), h.bindingName, subMsg) if err != nil { slog.Error("Subscribe policy execution failed", "error", err) http.Error(w, "policy execution failed", http.StatusInternalServerError) return } if shortCircuited { - writePolicyResponse(w, message, http.StatusForbidden, "forbidden by policy") + writePolicyResponse(w, nil, http.StatusUnauthorized, http.StatusText(http.StatusUnauthorized)) return } @@ -203,14 +203,14 @@ func (h *HubHandler) handleSubscribe(w http.ResponseWriter, r *http.Request) { func (h *HubHandler) handleUnsubscribe(w http.ResponseWriter, r *http.Request) { // Enforce subscribe policies before processing. subMsg := httpRequestToMessage(r) - message, shortCircuited, err := h.processor.ProcessSubscribe(r.Context(), h.bindingName, subMsg) + _, shortCircuited, err := h.processor.ProcessSubscribe(r.Context(), h.bindingName, subMsg) if err != nil { slog.Error("Unsubscribe policy execution failed", "error", err) http.Error(w, "policy execution failed", http.StatusInternalServerError) return } if shortCircuited { - writePolicyResponse(w, message, http.StatusForbidden, "forbidden by policy") + writePolicyResponse(w, nil, http.StatusUnauthorized, http.StatusText(http.StatusUnauthorized)) return } @@ -342,7 +342,7 @@ func (h *WebhookReceiverHandler) ServeHTTP(w http.ResponseWriter, r *http.Reques } if shortCircuited { slog.Info("Inbound request rejected by policy", "channel", channelName, "binding", h.bindingName) - writePolicyResponse(w, processed, http.StatusForbidden, "forbidden by policy") + writePolicyResponse(w, processed, http.StatusUnauthorized, http.StatusText(http.StatusUnauthorized)) return } @@ -379,6 +379,9 @@ func writePolicyResponse(w http.ResponseWriter, msg *connectors.Message, fallbac w.WriteHeader(statusCode) return } + if fallbackStatus == http.StatusUnauthorized { + w.Header().Set("WWW-Authenticate", `Bearer realm="event-gateway"`) + } http.Error(w, fallbackBody, fallbackStatus) }