diff --git a/internal/integration/integration.go b/internal/integration/integration.go index 5606aab6455..498d08173f4 100644 --- a/internal/integration/integration.go +++ b/internal/integration/integration.go @@ -507,6 +507,45 @@ func SendEntry(entry *model.Entry, userIntegrations *model.Integration) { } } +// PushUpdatedEntries notifies integrations about entries that were already known but whose +// content has changed during a feed refresh (title, body, author, etc.). +// +// Unlike PushEntries, notification channels (Telegram, Pushover, Ntfy, etc.) are intentionally +// skipped to avoid duplicate or noisy alerts for entries the user has already seen. +// Only integrations that benefit from receiving fresh content — currently the webhook — are invoked. +func PushUpdatedEntries(feed *model.Feed, entries model.Entries, userIntegrations *model.Integration) { + if len(entries) == 0 { + return + } + + if userIntegrations.WebhookEnabled { + var webhookURL string + if feed.WebhookURL != "" { + webhookURL = feed.WebhookURL + } else { + webhookURL = userIntegrations.WebhookURL + } + + slog.Debug("Sending updated entries to Webhook", + slog.Int64("user_id", userIntegrations.UserID), + slog.Int("nb_entries", len(entries)), + slog.Int64("feed_id", feed.ID), + slog.String("webhook_url", webhookURL), + ) + + webhookClient := webhook.NewClient(webhookURL, userIntegrations.WebhookSecret) + if err := webhookClient.SendUpdatedEntriesWebhookEvent(feed, entries); err != nil { + slog.Warn("Unable to send updated entries to Webhook", + slog.Int64("user_id", userIntegrations.UserID), + slog.Int("nb_entries", len(entries)), + slog.Int64("feed_id", feed.ID), + slog.String("webhook_url", webhookURL), + slog.Any("error", err), + ) + } + } +} + // PushEntries pushes a list of entries to activated third-party providers during feed refreshes. func PushEntries(feed *model.Feed, entries model.Entries, userIntegrations *model.Integration) { if userIntegrations.MatrixBotEnabled { diff --git a/internal/integration/integration_test.go b/internal/integration/integration_test.go index a059ef4848e..839c6fa95a8 100644 --- a/internal/integration/integration_test.go +++ b/internal/integration/integration_test.go @@ -8,6 +8,7 @@ import ( "log/slog" "strings" "testing" + "time" "miniflux.app/v2/internal/model" ) @@ -61,3 +62,106 @@ func TestSendEntryLogsLinkwardenWithoutCollectionID(t *testing.T) { t.Fatalf("did not expect collection_id in logs; got: %s", out) } } + +func testFeedAndEntries() (*model.Feed, model.Entries) { + feed := &model.Feed{ + ID: 1, + UserID: 1, + Category: &model.Category{ + ID: 1, + Title: "Test", + }, + FeedURL: "https://example.org/feed.xml", + SiteURL: "https://example.org", + Title: "Test Feed", + CheckedAt: time.Now(), + } + entries := model.Entries{ + {ID: 10, UserID: 1, FeedID: 1, URL: "https://example.org/post-1", Title: "Post 1"}, + } + return feed, entries +} + +// TestPushUpdatedEntriesLogsWebhookAttempt verifies that PushUpdatedEntries +// attempts to call the webhook integration when it is enabled. +// The webhook call will fail (empty URL) and produce a Warn log that we capture. +func TestPushUpdatedEntriesLogsWebhookAttempt(t *testing.T) { + var buf bytes.Buffer + handler := slog.NewJSONHandler(&buf, nil) + logger := slog.New(handler) + prev := slog.Default() + slog.SetDefault(logger) + defer slog.SetDefault(prev) + + feed, entries := testFeedAndEntries() + userIntegrations := &model.Integration{ + UserID: 1, + WebhookEnabled: true, + WebhookURL: "", // empty → HTTP call fails → Warn log fires + } + + PushUpdatedEntries(feed, entries, userIntegrations) + + out := buf.String() + if !strings.Contains(out, "updated") { + t.Fatalf("expected webhook warn log for updated entries; got: %s", out) + } +} + +// TestPushUpdatedEntriesSkipsNotificationIntegrations verifies that +// PushUpdatedEntries does not invoke notification channels (Telegram, Ntfy, +// Pushover, etc.) — only the webhook integration is triggered. +func TestPushUpdatedEntriesSkipsNotificationIntegrations(t *testing.T) { + var buf bytes.Buffer + handler := slog.NewJSONHandler(&buf, nil) + logger := slog.New(handler) + prev := slog.Default() + slog.SetDefault(logger) + defer slog.SetDefault(prev) + + feed, entries := testFeedAndEntries() + // All notification integrations enabled, webhook disabled. + userIntegrations := &model.Integration{ + UserID: 1, + WebhookEnabled: false, + TelegramBotEnabled: true, + NtfyEnabled: true, + PushoverEnabled: true, + DiscordEnabled: true, + SlackEnabled: true, + MatrixBotEnabled: true, + AppriseEnabled: true, + } + + PushUpdatedEntries(feed, entries, userIntegrations) + + out := buf.String() + if out != "" { + t.Fatalf("expected no log output when webhook is disabled; got: %s", out) + } +} + +// TestPushUpdatedEntriesNoEntriesIsNoop verifies that PushUpdatedEntries +// exits immediately and calls no integrations when the entries slice is empty. +func TestPushUpdatedEntriesNoEntriesIsNoop(t *testing.T) { + var buf bytes.Buffer + handler := slog.NewJSONHandler(&buf, nil) + logger := slog.New(handler) + prev := slog.Default() + slog.SetDefault(logger) + defer slog.SetDefault(prev) + + feed, _ := testFeedAndEntries() + userIntegrations := &model.Integration{ + UserID: 1, + WebhookEnabled: true, + WebhookURL: "https://example.org/hook", + } + + PushUpdatedEntries(feed, model.Entries{}, userIntegrations) + + out := buf.String() + if out != "" { + t.Fatalf("expected no log output for empty entries; got: %s", out) + } +} diff --git a/internal/integration/webhook/webhook.go b/internal/integration/webhook/webhook.go index 87afc2e9df7..1a7dd63a560 100644 --- a/internal/integration/webhook/webhook.go +++ b/internal/integration/webhook/webhook.go @@ -21,8 +21,9 @@ import ( const ( defaultClientTimeout = 10 * time.Second - NewEntriesEventType = "new_entries" - SaveEntryEventType = "save_entry" + NewEntriesEventType = "new_entries" + UpdatedEntriesEventType = "updated_entries" + SaveEntryEventType = "save_entry" ) type Client struct { @@ -114,6 +115,50 @@ func (c *Client) SendNewEntriesWebhookEvent(feed *model.Feed, entries model.Entr }) } +func (c *Client) SendUpdatedEntriesWebhookEvent(feed *model.Feed, entries model.Entries) error { + if len(entries) == 0 { + return nil + } + + webhookEntries := make([]*WebhookEntry, 0, len(entries)) + for _, entry := range entries { + webhookEntries = append(webhookEntries, &WebhookEntry{ + ID: entry.ID, + UserID: entry.UserID, + FeedID: entry.FeedID, + Status: entry.Status, + Hash: entry.Hash, + Title: entry.Title, + URL: entry.URL, + CommentsURL: entry.CommentsURL, + Date: entry.Date, + CreatedAt: entry.CreatedAt, + ChangedAt: entry.ChangedAt, + Content: entry.Content, + Author: entry.Author, + ShareCode: entry.ShareCode, + Starred: entry.Starred, + ReadingTime: entry.ReadingTime, + Enclosures: entry.Enclosures, + Tags: entry.Tags, + }) + } + return c.makeRequest(UpdatedEntriesEventType, &WebhookUpdatedEntriesEvent{ + EventType: UpdatedEntriesEventType, + Feed: &WebhookFeed{ + ID: feed.ID, + UserID: feed.UserID, + CategoryID: feed.Category.ID, + Category: &WebhookCategory{ID: feed.Category.ID, Title: feed.Category.Title}, + FeedURL: feed.FeedURL, + SiteURL: feed.SiteURL, + Title: feed.Title, + CheckedAt: feed.CheckedAt, + }, + Entries: webhookEntries, + }) +} + func (c *Client) makeRequest(eventType string, payload any) error { if c.webhookURL == "" { return errors.New(`webhook: missing webhook URL`) @@ -192,6 +237,12 @@ type WebhookNewEntriesEvent struct { Entries []*WebhookEntry `json:"entries"` } +type WebhookUpdatedEntriesEvent struct { + EventType string `json:"event_type"` + Feed *WebhookFeed `json:"feed"` + Entries []*WebhookEntry `json:"entries"` +} + type WebhookSaveEntryEvent struct { EventType string `json:"event_type"` Entry *WebhookEntry `json:"entry"` diff --git a/internal/integration/webhook/webhook_test.go b/internal/integration/webhook/webhook_test.go new file mode 100644 index 00000000000..92f3dfb2e99 --- /dev/null +++ b/internal/integration/webhook/webhook_test.go @@ -0,0 +1,155 @@ +// SPDX-FileCopyrightText: Copyright The Miniflux Authors. All rights reserved. +// SPDX-License-Identifier: Apache-2.0 + +package webhook + +import ( + "encoding/json" + "io" + "net/http" + "net/http/httptest" + "testing" + "time" + + "miniflux.app/v2/internal/config" + "miniflux.app/v2/internal/model" +) + +// configureIntegrationAllowPrivateNetworksOption sets the global config option +// required to allow the webhook HTTP client to reach the httptest server on +// localhost (a private address). It restores the previous config on test cleanup. +func configureIntegrationAllowPrivateNetworksOption(t *testing.T) { + t.Helper() + + t.Setenv("INTEGRATION_ALLOW_PRIVATE_NETWORKS", "1") + + configParser := config.NewConfigParser() + parsedOptions, err := configParser.ParseEnvironmentVariables() + if err != nil { + t.Fatalf("Unable to configure test options: %v", err) + } + + previousOptions := config.Opts + config.Opts = parsedOptions + t.Cleanup(func() { + config.Opts = previousOptions + }) +} + +func testFeed() *model.Feed { + return &model.Feed{ + ID: 1, + UserID: 1, + Category: &model.Category{ + ID: 1, + Title: "Test", + }, + FeedURL: "https://example.org/feed.xml", + SiteURL: "https://example.org", + Title: "Test Feed", + CheckedAt: time.Now(), + } +} + +func testEntries() model.Entries { + return model.Entries{ + {ID: 10, UserID: 1, FeedID: 1, URL: "https://example.org/post-1", Title: "Post 1"}, + } +} + +// TestSendNewEntriesWebhookEventType verifies that SendNewEntriesWebhookEvent +// sends a request whose JSON body has event_type = "new_entries". +func TestSendNewEntriesWebhookEventType(t *testing.T) { + configureIntegrationAllowPrivateNetworksOption(t) + + var gotBody []byte + srv := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { + gotBody, _ = io.ReadAll(r.Body) + w.WriteHeader(http.StatusOK) + })) + defer srv.Close() + + client := NewClient(srv.URL, "") + if err := client.SendNewEntriesWebhookEvent(testFeed(), testEntries()); err != nil { + t.Fatalf("unexpected error: %v", err) + } + + var payload map[string]any + if err := json.Unmarshal(gotBody, &payload); err != nil { + t.Fatalf("unable to unmarshal payload: %v", err) + } + + if got := payload["event_type"]; got != NewEntriesEventType { + t.Errorf("expected event_type %q, got %q", NewEntriesEventType, got) + } +} + +// TestSendUpdatedEntriesWebhookEventType verifies that SendUpdatedEntriesWebhookEvent +// sends a request whose JSON body has event_type = "updated_entries". +func TestSendUpdatedEntriesWebhookEventType(t *testing.T) { + configureIntegrationAllowPrivateNetworksOption(t) + + var gotBody []byte + srv := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { + gotBody, _ = io.ReadAll(r.Body) + w.WriteHeader(http.StatusOK) + })) + defer srv.Close() + + client := NewClient(srv.URL, "") + if err := client.SendUpdatedEntriesWebhookEvent(testFeed(), testEntries()); err != nil { + t.Fatalf("unexpected error: %v", err) + } + + var payload map[string]any + if err := json.Unmarshal(gotBody, &payload); err != nil { + t.Fatalf("unable to unmarshal payload: %v", err) + } + + if got := payload["event_type"]; got != UpdatedEntriesEventType { + t.Errorf("expected event_type %q, got %q", UpdatedEntriesEventType, got) + } +} + +// TestSendUpdatedEntriesWebhookEventTypeHeader verifies that the +// X-Miniflux-Event-Type header is set to "updated_entries". +func TestSendUpdatedEntriesWebhookEventTypeHeader(t *testing.T) { + configureIntegrationAllowPrivateNetworksOption(t) + + var gotHeader string + srv := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { + gotHeader = r.Header.Get("X-Miniflux-Event-Type") + w.WriteHeader(http.StatusOK) + })) + defer srv.Close() + + client := NewClient(srv.URL, "") + if err := client.SendUpdatedEntriesWebhookEvent(testFeed(), testEntries()); err != nil { + t.Fatalf("unexpected error: %v", err) + } + + if gotHeader != UpdatedEntriesEventType { + t.Errorf("expected X-Miniflux-Event-Type header %q, got %q", UpdatedEntriesEventType, gotHeader) + } +} + +// TestSendUpdatedEntriesWebhookNoEntriesIsNoop verifies that +// SendUpdatedEntriesWebhookEvent returns nil and makes no HTTP call when +// the entries slice is empty. +func TestSendUpdatedEntriesWebhookNoEntriesIsNoop(t *testing.T) { + called := false + srv := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { + called = true + w.WriteHeader(http.StatusOK) + })) + defer srv.Close() + + client := NewClient(srv.URL, "") + if err := client.SendUpdatedEntriesWebhookEvent(testFeed(), model.Entries{}); err != nil { + t.Fatalf("unexpected error: %v", err) + } + + if called { + t.Error("expected no HTTP call for empty entries, but server was called") + } +} diff --git a/internal/reader/handler/handler.go b/internal/reader/handler/handler.go index e3ee7db4a16..7ca80cf645e 100644 --- a/internal/reader/handler/handler.go +++ b/internal/reader/handler/handler.go @@ -332,7 +332,7 @@ func RefreshFeed(store *storage.Storage, userID, feedID int64, forceRefresh bool // We also skip updating existing entries if the feed has ignore_entry_updates enabled. // Unless it is forced to refresh. updateExistingEntries := forceRefresh || (!originalFeed.Crawler && !originalFeed.IgnoreEntryUpdates) - newEntries, storeErr := store.RefreshFeedEntries(originalFeed.UserID, originalFeed.ID, originalFeed.Entries, updateExistingEntries) + newEntries, updatedEntries, storeErr := store.RefreshFeedEntries(originalFeed.UserID, originalFeed.ID, originalFeed.Entries, updateExistingEntries) if storeErr != nil { localizedError := locale.NewLocalizedErrorWrapper(storeErr, "error.database_error", storeErr) return getTranslatedLocalizedError(store, userID, originalFeed, localizedError) @@ -345,8 +345,13 @@ func RefreshFeed(store *storage.Storage, userID, feedID int64, forceRefresh bool slog.Int64("feed_id", feedID), slog.Any("error", intErr), ) - } else if userIntegrations != nil && len(newEntries) > 0 { - go integration.PushEntries(originalFeed, newEntries, userIntegrations) + } else if userIntegrations != nil { + if len(newEntries) > 0 { + go integration.PushEntries(originalFeed, newEntries, userIntegrations) + } + if len(updatedEntries) > 0 { + go integration.PushUpdatedEntries(originalFeed, updatedEntries, userIntegrations) + } } originalFeed.EtagHeader = responseHandler.ETag() diff --git a/internal/storage/entry.go b/internal/storage/entry.go index 5d9d80351e8..5150a159880 100644 --- a/internal/storage/entry.go +++ b/internal/storage/entry.go @@ -377,7 +377,9 @@ func (s *Storage) ClearRemovedEntriesContent(limit int) (int64, error) { } // RefreshFeedEntries updates feed entries while refreshing a feed. -func (s *Storage) RefreshFeedEntries(userID, feedID int64, entries model.Entries, updateExistingEntries bool) (newEntries model.Entries, err error) { +// It returns two slices: newEntries (entries created for the first time) and +// updatedEntries (pre-existing entries whose content changed in the feed). +func (s *Storage) RefreshFeedEntries(userID, feedID int64, entries model.Entries, updateExistingEntries bool) (newEntries model.Entries, updatedEntries model.Entries, err error) { entryHashes := make([]string, 0, len(entries)) for _, entry := range entries { @@ -386,20 +388,23 @@ func (s *Storage) RefreshFeedEntries(userID, feedID int64, entries model.Entries tx, err := s.db.Begin() if err != nil { - return nil, fmt.Errorf(`store: unable to start transaction: %v`, err) + return nil, nil, fmt.Errorf(`store: unable to start transaction: %v`, err) } entryExists, err := s.entryExists(tx, entry) if err != nil { if rollbackErr := tx.Rollback(); rollbackErr != nil { - return nil, fmt.Errorf(`store: unable to rollback transaction: %v (rolled back due to: %v)`, rollbackErr, err) + return nil, nil, fmt.Errorf(`store: unable to rollback transaction: %v (rolled back due to: %v)`, rollbackErr, err) } - return nil, err + return nil, nil, err } if entryExists { if updateExistingEntries { err = s.updateEntry(tx, entry) + if err == nil { + updatedEntries = append(updatedEntries, entry) + } } } else { err = s.createEntry(tx, entry) @@ -410,13 +415,13 @@ func (s *Storage) RefreshFeedEntries(userID, feedID int64, entries model.Entries if err != nil { if rollbackErr := tx.Rollback(); rollbackErr != nil { - return nil, fmt.Errorf(`store: unable to rollback transaction: %v (rolled back due to: %v)`, rollbackErr, err) + return nil, nil, fmt.Errorf(`store: unable to rollback transaction: %v (rolled back due to: %v)`, rollbackErr, err) } - return nil, err + return nil, nil, err } if err := tx.Commit(); err != nil { - return nil, fmt.Errorf(`store: unable to commit transaction: %v`, err) + return nil, nil, fmt.Errorf(`store: unable to commit transaction: %v`, err) } entryHashes = append(entryHashes, entry.Hash) @@ -432,7 +437,7 @@ func (s *Storage) RefreshFeedEntries(userID, feedID int64, entries model.Entries } }() - return newEntries, nil + return newEntries, updatedEntries, nil } // ArchiveEntries changes the status of entries to "removed" after the interval (24h minimum).