From a5c8ae86a830acc3d607cdbc47cc51438b0fbfc6 Mon Sep 17 00:00:00 2001 From: Brandur Date: Mon, 22 Dec 2025 13:48:10 -0700 Subject: [PATCH] Add leadership "domains" so multiple Rivers can operate in one schema We've gotten a couple requests so far (see #342 and #1105) to be able to start multiple River clients targeting different queues within the same database/schema, and to give them the capacity to operate independently enough to be functional. This is currently not possible because a single leader is elected given a single schema and it handles all maintenance operations including non-queue ones like periodic job enqueuing. Here, add the idea of a `LeaderDomain`. This lets a user set the "domain" on which a client will elect its leader and allows multiple leaders to be elected in a single schema. Each leader will run its own maintenance services. Setting `LeaderDomain` causes the additional effect of having maintenance services start to operate only on the queues that their client is configured for. The idea here is to give us backwards compatibility in that the default behavior (in case of an unset `LeaderDomain`) is the same, while providing a path for multiple leaders to be interoperable with each other. There are still a few edges: for example, reindexing is not queue specific, so multiple leaders could be running a reindexer. I've provided guidance in the config documentation that ideally, all clients but one should have their reindexer disabled. 
--- CHANGELOG.md | 4 + client.go | 61 +++++++- client_test.go | 148 ++++++++++++++++++ internal/leadership/elector.go | 37 +++-- internal/leadership/elector_test.go | 2 + internal/maintenance/job_cleaner.go | 12 ++ internal/maintenance/job_cleaner_test.go | 53 +++++-- internal/maintenance/job_rescuer.go | 16 +- internal/maintenance/job_rescuer_test.go | 38 +++++ internal/maintenance/job_scheduler.go | 43 +++-- internal/maintenance/job_scheduler_test.go | 63 ++++++-- internal/maintenance/queue_cleaner.go | 9 ++ internal/maintenance/queue_cleaner_test.go | 37 ++++- periodic_job.go | 4 + riverdriver/river_driver_interface.go | 19 ++- .../internal/dbsqlc/river_job.sql.go | 30 ++-- .../internal/dbsqlc/river_leader.sql.go | 25 ++- .../internal/dbsqlc/river_queue.sql.go | 30 ++-- ...007_river_leader_non_default_name.down.sql | 3 + .../007_river_leader_non_default_name.up.sql | 3 + .../river_database_sql_driver.go | 16 +- riverdriver/riverdrivertest/job_delete.go | 25 +++ riverdriver/riverdrivertest/job_read.go | 76 ++++++--- riverdriver/riverdrivertest/job_update.go | 24 +++ riverdriver/riverdrivertest/leader.go | 10 ++ riverdriver/riverdrivertest/queue.go | 56 +++++-- .../riverpgxv5/internal/dbsqlc/river_job.sql | 8 + .../internal/dbsqlc/river_job.sql.go | 30 ++-- .../internal/dbsqlc/river_leader.sql | 20 ++- .../internal/dbsqlc/river_leader.sql.go | 25 ++- .../internal/dbsqlc/river_queue.sql | 27 +++- .../internal/dbsqlc/river_queue.sql.go | 30 ++-- ...007_river_leader_non_default_name.down.sql | 3 + .../007_river_leader_non_default_name.up.sql | 3 + riverdriver/riverpgxv5/river_pgx_v5_driver.go | 17 +- .../riversqlite/internal/dbsqlc/river_job.sql | 37 ++--- .../internal/dbsqlc/river_job.sql.go | 61 +++----- .../internal/dbsqlc/river_leader.sql | 13 +- .../internal/dbsqlc/river_leader.sql.go | 25 ++- .../internal/dbsqlc/river_queue.sql | 1 + .../internal/dbsqlc/river_queue.sql.go | 1 + ...007_river_leader_non_default_name.down.sql | 15 ++ 
.../007_river_leader_non_default_name.up.sql | 16 ++ .../riversqlite/river_sqlite_driver.go | 119 +++++++++++++- .../riversqlite/river_sqlite_driver_test.go | 52 ++++++ rivershared/sqlctemplate/sqlc_template.go | 27 ++++ rivershared/testfactory/test_factory.go | 2 + rivershared/util/sliceutil/slice_util.go | 11 ++ 48 files changed, 1118 insertions(+), 269 deletions(-) create mode 100644 riverdriver/riverdatabasesql/migration/main/007_river_leader_non_default_name.down.sql create mode 100644 riverdriver/riverdatabasesql/migration/main/007_river_leader_non_default_name.up.sql create mode 100644 riverdriver/riverpgxv5/migration/main/007_river_leader_non_default_name.down.sql create mode 100644 riverdriver/riverpgxv5/migration/main/007_river_leader_non_default_name.up.sql create mode 100644 riverdriver/riversqlite/migration/main/007_river_leader_non_default_name.down.sql create mode 100644 riverdriver/riversqlite/migration/main/007_river_leader_non_default_name.up.sql diff --git a/CHANGELOG.md b/CHANGELOG.md index 048c1aa5..172ff3b2 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -7,6 +7,10 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0 ## [Unreleased] +### Added + +- Added `Config.LeaderDomain` to allow multiple River clients to be elected leader within a single schema/database and run maintenance services on only their configured queues. [PR #1113](https://github.com/riverqueue/river/pull/1113). + ### Fixed - Fix unsafe concurrent producer map access in client. [PR #1236](https://github.com/riverqueue/river/pull/1236). diff --git a/client.go b/client.go index d4e0ec40..933c07c9 100644 --- a/client.go +++ b/client.go @@ -9,6 +9,7 @@ import ( "log/slog" "os" "regexp" + "slices" "strings" "sync" "time" @@ -220,6 +221,48 @@ type Config struct { // Jobs may have their own specific hooks by implementing JobArgsWithHooks. Hooks []rivertype.Hook + // LeaderDomain is an optional "domain" string to use for leader election. 
+ // Different clients sharing the same River schema can elect multiple + // leaders as long as they're using different domains, with one leader + // elected per domain. This is an advanced feature that will almost never be + // needed. Don't set this unless you know what you're doing. + // + // Setting this value also triggers the related behavior that maintenance + // services start to only operate on the queues they're configured on. So + // for example, given client1 handling queue_a and queue_b and client2 + // handling queue_c and queue_d, whichever client is elected leader will end + // up running all maintenance services for all queues (queue_a, queue_b, + // queue_c, and queue_d). But if client1 is using domain "domain1" and + // client2 is using domain "domain2", then client1 (elected in domain1) will + // only run maintenance services on queue_a and queue_b, while client2 + // (elected in domain2) will run maintenance services on queue_c and + // queue_d. + // + // A warning though that River *does not protect against configuration + // mistakes*. If client1 on domain1 is configured for queue_a and queue_b, + // and client2 on domain2 is *also* configured for queue_a and queue_b, then + // both clients may end up running maintenance services on the same queues + // at the same time. It's the caller's responsibility to ensure that doesn't + // happen. + // + // Left empty or use of the special value "default" causes the client to + // operate on all queues. When setting this value to non-empty + // non-"default", no other clients should be left empty or use "default" + // because the default client(s) will infringe on the domains of the + // non-default one(s). + // + // Certain maintenance services that aren't queue-related like the reindexer + // will continue to run on all leaders regardless of domain. If using this + // feature, it's a good idea to configure ReindexerSchedule on all but a + // single leader domain to river.NeverSchedule(). 
+ // + // In general, most River users should not need LeaderDomain, and when + // running multiple Rivers may want to consider using multiple databases and + // multiple schemas instead. + // + // Defaults to "default". + LeaderDomain string + // Logger is the structured logger to use for logging purposes. If none is // specified, logs will be emitted to STDOUT with messages at warn level // or higher. @@ -445,6 +488,7 @@ func (c *Config) WithDefaults() *Config { Hooks: c.Hooks, JobInsertMiddleware: c.JobInsertMiddleware, JobTimeout: cmp.Or(c.JobTimeout, JobTimeoutDefault), + LeaderDomain: c.LeaderDomain, Logger: logger, MaxAttempts: cmp.Or(c.MaxAttempts, MaxAttemptsDefault), Middleware: c.Middleware, @@ -873,6 +917,7 @@ func NewClient[TTx any](driver riverdriver.Driver[TTx], config *Config) (*Client client.elector = leadership.NewElector(archetype, driver.GetExecutor(), client.notifier, &leadership.Config{ ClientID: config.ID, + Domain: config.LeaderDomain, Schema: config.Schema, }) client.services = append(client.services, client.elector) @@ -890,6 +935,12 @@ func NewClient[TTx any](driver riverdriver.Driver[TTx], config *Config) (*Client client.services = append(client.services, pluginPilot.PluginServices()...) 
} + var queuesIncluded []string + if config.LeaderDomain != "" && config.LeaderDomain != leadership.DomainDefault && len(config.Queues) > 0 { + queuesIncluded = maputil.Keys(config.Queues) + slices.Sort(queuesIncluded) + } + // // Maintenance services // @@ -902,6 +953,7 @@ func NewClient[TTx any](driver riverdriver.Driver[TTx], config *Config) (*Client CompletedJobRetentionPeriod: config.CompletedJobRetentionPeriod, DiscardedJobRetentionPeriod: config.DiscardedJobRetentionPeriod, QueuesExcluded: client.pilot.JobCleanerQueuesExcluded(), + QueuesIncluded: queuesIncluded, Schema: config.Schema, Timeout: config.JobCleanerTimeout, }, driver.GetExecutor()) @@ -912,6 +964,7 @@ func NewClient[TTx any](driver riverdriver.Driver[TTx], config *Config) (*Client { jobRescuer := maintenance.NewRescuer(archetype, &maintenance.JobRescuerConfig{ ClientRetryPolicy: config.RetryPolicy, + QueuesIncluded: queuesIncluded, RescueAfter: config.RescueStuckJobsAfter, Schema: config.Schema, WorkUnitFactoryFunc: func(kind string) workunit.WorkUnitFactory { @@ -927,9 +980,10 @@ func NewClient[TTx any](driver riverdriver.Driver[TTx], config *Config) (*Client { jobScheduler := maintenance.NewJobScheduler(archetype, &maintenance.JobSchedulerConfig{ - Interval: config.schedulerInterval, - NotifyInsert: client.maybeNotifyInsertForQueues, - Schema: config.Schema, + Interval: config.schedulerInterval, + NotifyInsert: client.maybeNotifyInsertForQueues, + QueuesIncluded: queuesIncluded, + Schema: config.Schema, }, driver.GetExecutor()) maintenanceServices = append(maintenanceServices, jobScheduler) client.testSignals.jobScheduler = &jobScheduler.TestSignals @@ -955,6 +1009,7 @@ func NewClient[TTx any](driver riverdriver.Driver[TTx], config *Config) (*Client { queueCleaner := maintenance.NewQueueCleaner(archetype, &maintenance.QueueCleanerConfig{ + QueuesIncluded: queuesIncluded, RetentionPeriod: maintenance.QueueRetentionPeriodDefault, Schema: config.Schema, }, driver.GetExecutor()) diff --git 
a/client_test.go b/client_test.go index f8d035c4..b372eec5 100644 --- a/client_test.go +++ b/client_test.go @@ -1491,6 +1491,154 @@ func Test_Client_Common(t *testing.T) { startstoptest.Stress(ctx, t, clientWithStop) }) + + t.Run("LeaderDomain_Alternate", func(t *testing.T) { + t.Parallel() + + var client1 *Client[pgx.Tx] + { + config, bundle := setupConfig(t) + config.LeaderDomain = "domain1" + config.ReindexerSchedule = &neverSchedule{} + config.Queues = map[string]QueueConfig{ + "queue_a": {MaxWorkers: 50}, + "queue_b": {MaxWorkers: 50}, + } + + var err error + client1, err = NewClient(bundle.driver, config) + require.NoError(t, err) + client1.testSignals.Init(t) + } + + var client2 *Client[pgx.Tx] + { + config, bundle := setupConfig(t) + config.LeaderDomain = "domain2" + config.Queues = map[string]QueueConfig{ + "queue_c": {MaxWorkers: 50}, + "queue_d": {MaxWorkers: 50}, + } + config.Schema = client1.config.Schema + config.ReindexerSchedule = &neverSchedule{} + + var err error + client2, err = NewClient(bundle.driver, config) + require.NoError(t, err) + client2.testSignals.Init(t) + } + + startClient(ctx, t, client1) + startClient(ctx, t, client2) + + // Both elected + client1.testSignals.queueMaintainerLeader.ElectedLeader.WaitOrTimeout() + client2.testSignals.queueMaintainerLeader.ElectedLeader.WaitOrTimeout() + }) + + t.Run("LeaderDomain_MaintenanceServiceConfigEmpty", func(t *testing.T) { + t.Parallel() + + config, bundle := setupConfig(t) + config.Queues = map[string]QueueConfig{ + "queue_a": {MaxWorkers: 50}, + "queue_b": {MaxWorkers: 50}, + } + + client, err := NewClient(bundle.driver, config) + require.NoError(t, err) + client.testSignals.Init(t) + + jobCleaner := maintenance.GetService[*maintenance.JobCleaner](client.queueMaintainer) + require.Nil(t, jobCleaner.Config.QueuesIncluded) + jobRescuer := maintenance.GetService[*maintenance.JobRescuer](client.queueMaintainer) + require.Nil(t, jobRescuer.Config.QueuesIncluded) + jobScheduler := 
maintenance.GetService[*maintenance.JobScheduler](client.queueMaintainer) + require.Nil(t, jobScheduler.Config.QueuesIncluded) + queueCleaner := maintenance.GetService[*maintenance.QueueCleaner](client.queueMaintainer) + require.Nil(t, queueCleaner.Config.QueuesIncluded) + }) + + // The domain "default" is special in that it behaves like if LeaderDomain + // was not set. + t.Run("LeaderDomain_MaintenanceServiceConfigDefault", func(t *testing.T) { + t.Parallel() + + config, bundle := setupConfig(t) + config.LeaderDomain = "default" + config.Queues = map[string]QueueConfig{ + "queue_a": {MaxWorkers: 50}, + "queue_b": {MaxWorkers: 50}, + } + + client, err := NewClient(bundle.driver, config) + require.NoError(t, err) + client.testSignals.Init(t) + + jobCleaner := maintenance.GetService[*maintenance.JobCleaner](client.queueMaintainer) + require.Nil(t, jobCleaner.Config.QueuesIncluded) + jobRescuer := maintenance.GetService[*maintenance.JobRescuer](client.queueMaintainer) + require.Nil(t, jobRescuer.Config.QueuesIncluded) + jobScheduler := maintenance.GetService[*maintenance.JobScheduler](client.queueMaintainer) + require.Nil(t, jobScheduler.Config.QueuesIncluded) + queueCleaner := maintenance.GetService[*maintenance.QueueCleaner](client.queueMaintainer) + require.Nil(t, queueCleaner.Config.QueuesIncluded) + }) + + // When non-default leader domains are configured, each client's maintenance + // services are limited to only their client's queues. 
+ t.Run("LeaderDomain_MaintenanceServiceConfigAlternate", func(t *testing.T) { + t.Parallel() + + var client1 *Client[pgx.Tx] + { + config, bundle := setupConfig(t) + config.LeaderDomain = "domain1" + config.ReindexerSchedule = &neverSchedule{} + config.Queues = map[string]QueueConfig{ + "queue_a": {MaxWorkers: 50}, + "queue_b": {MaxWorkers: 50}, + } + + var err error + client1, err = NewClient(bundle.driver, config) + require.NoError(t, err) + client1.testSignals.Init(t) + + jobCleaner := maintenance.GetService[*maintenance.JobCleaner](client1.queueMaintainer) + require.Equal(t, []string{"queue_a", "queue_b"}, jobCleaner.Config.QueuesIncluded) + jobRescuer := maintenance.GetService[*maintenance.JobRescuer](client1.queueMaintainer) + require.Equal(t, []string{"queue_a", "queue_b"}, jobRescuer.Config.QueuesIncluded) + jobScheduler := maintenance.GetService[*maintenance.JobScheduler](client1.queueMaintainer) + require.Equal(t, []string{"queue_a", "queue_b"}, jobScheduler.Config.QueuesIncluded) + queueCleaner := maintenance.GetService[*maintenance.QueueCleaner](client1.queueMaintainer) + require.Equal(t, []string{"queue_a", "queue_b"}, queueCleaner.Config.QueuesIncluded) + } + + { + config, bundle := setupConfig(t) + config.LeaderDomain = "domain2" + config.Queues = map[string]QueueConfig{ + "queue_c": {MaxWorkers: 50}, + "queue_d": {MaxWorkers: 50}, + } + config.Schema = client1.config.Schema + config.ReindexerSchedule = &neverSchedule{} + + client2, err := NewClient(bundle.driver, config) + require.NoError(t, err) + client2.testSignals.Init(t) + + jobCleaner := maintenance.GetService[*maintenance.JobCleaner](client2.queueMaintainer) + require.Equal(t, []string{"queue_c", "queue_d"}, jobCleaner.Config.QueuesIncluded) + jobRescuer := maintenance.GetService[*maintenance.JobRescuer](client2.queueMaintainer) + require.Equal(t, []string{"queue_c", "queue_d"}, jobRescuer.Config.QueuesIncluded) + jobScheduler := 
maintenance.GetService[*maintenance.JobScheduler](client2.queueMaintainer) + require.Equal(t, []string{"queue_c", "queue_d"}, jobScheduler.Config.QueuesIncluded) + queueCleaner := maintenance.GetService[*maintenance.QueueCleaner](client2.queueMaintainer) + require.Equal(t, []string{"queue_c", "queue_d"}, queueCleaner.Config.QueuesIncluded) + } + }) } type workerWithMiddleware[T JobArgs] struct { diff --git a/internal/leadership/elector.go b/internal/leadership/elector.go index c9008582..8e74631c 100644 --- a/internal/leadership/elector.go +++ b/internal/leadership/elector.go @@ -23,6 +23,8 @@ import ( "github.com/riverqueue/river/rivertype" ) +const DomainDefault = "default" + const ( electIntervalDefault = 5 * time.Second electIntervalJitterDefault = 1 * time.Second @@ -202,6 +204,7 @@ func (ts *electorTestSignals) Init(tb testutil.TestingTB) { type Config struct { ClientID string + Domain string ElectInterval time.Duration // period on which each elector attempts elect even without having received a resignation notification ElectIntervalJitter time.Duration Schema string @@ -283,6 +286,7 @@ func NewElector(archetype *baseservice.Archetype, exec riverdriver.Executor, not return baseservice.Init(archetype, &Elector{ config: (&Config{ ClientID: config.ClientID, + Domain: cmp.Or(config.Domain, string(DomainDefault)), ElectInterval: cmp.Or(config.ElectInterval, electIntervalDefault), ElectIntervalJitter: cmp.Or(config.ElectIntervalJitter, electIntervalJitterDefault), Schema: config.Schema, @@ -315,9 +319,9 @@ func (e *Elector) Start(ctx context.Context) error { var sub *notifier.Subscription if e.notifier == nil { - e.Logger.DebugContext(ctx, e.Name+": No notifier configured; starting in poll mode", "client_id", e.config.ClientID) + e.Logger.DebugContext(ctx, e.Name+": No notifier configured; starting in poll mode", "client_id", e.config.ClientID, "domain", e.config.Domain) } else { - e.Logger.DebugContext(ctx, e.Name+": Listening for leadership changes", "client_id", 
e.config.ClientID, "topic", notifier.NotificationTopicLeadership) + e.Logger.DebugContext(ctx, e.Name+": Listening for leadership changes", "client_id", e.config.ClientID, "domain", e.config.Domain, "topic", notifier.NotificationTopicLeadership) var err error sub, err = e.notifier.Listen(ctx, notifier.NotificationTopicLeadership, func(topic notifier.NotificationTopic, payload string) { e.handleLeadershipNotification(ctx, topic, payload) @@ -354,7 +358,7 @@ func (e *Elector) Start(ctx context.Context) error { } e.publishLeadershipState(true) - e.Logger.DebugContext(ctx, e.Name+": Gained leadership", "client_id", e.config.ClientID) + e.Logger.DebugContext(ctx, e.Name+": Gained leadership", "client_id", e.config.ClientID, "domain", e.config.Domain) e.testSignals.GainedLeadership.Signal(struct{}{}) err = e.runLeaderState(ctx, term) @@ -363,7 +367,7 @@ func (e *Elector) Start(ctx context.Context) error { return } - e.Logger.ErrorContext(ctx, e.Name+": Error keeping leadership", "client_id", e.config.ClientID, "err", err) + e.Logger.ErrorContext(ctx, e.Name+": Error keeping leadership", "client_id", e.config.ClientID, "domain", e.config.Domain, "err", err) } } }() @@ -377,13 +381,14 @@ func (e *Elector) runFollowerState(ctx context.Context) (leadershipTerm, error) var attempt int for { attempt++ - e.Logger.DebugContext(ctx, e.Name+": Attempting to gain leadership", "client_id", e.config.ClientID) + e.Logger.DebugContext(ctx, e.Name+": Attempting to gain leadership", "client_id", e.config.ClientID, "domain", e.config.Domain) // Use the local monotonic-bearing clock for the trust window. The - // DB-facing timestamp path stays on NowUTCOrNil below. + // DB-facing timestamp path stays on NowOrNil below. 
attemptStarted := e.Time.Now() leader, err := attemptElect(ctx, e.exec, &riverdriver.LeaderElectParams{ LeaderID: e.config.ClientID, + Name: e.config.Domain, Now: e.Time.NowOrNil(), Schema: e.config.Schema, TTL: e.leaderTTL(), @@ -406,7 +411,7 @@ func (e *Elector) runFollowerState(ctx context.Context) (leadershipTerm, error) attempt = 0 - e.Logger.DebugContext(ctx, e.Name+": Leadership bid was unsuccessful (not an error)", "client_id", e.config.ClientID) + e.Logger.DebugContext(ctx, e.Name+": Leadership bid was unsuccessful (not an error)", "client_id", e.config.ClientID, "domain", e.config.Domain) e.testSignals.DeniedLeadership.Signal(struct{}{}) select { @@ -427,17 +432,17 @@ func (e *Elector) runFollowerState(ctx context.Context) (leadershipTerm, error) func (e *Elector) handleLeadershipNotification(ctx context.Context, topic notifier.NotificationTopic, payload string) { if topic != notifier.NotificationTopicLeadership { // This should not happen unless the notifier is broken. - e.Logger.ErrorContext(ctx, e.Name+": Received unexpected notification", "client_id", e.config.ClientID, "topic", topic, "payload", payload) + e.Logger.ErrorContext(ctx, e.Name+": Received unexpected notification", "client_id", e.config.ClientID, "domain", e.config.Domain, "topic", topic, "payload", payload) return } notification := DBNotification{} if err := json.Unmarshal([]byte(payload), &notification); err != nil { - e.Logger.ErrorContext(ctx, e.Name+": Unable to unmarshal leadership notification", "client_id", e.config.ClientID, "err", err) + e.Logger.ErrorContext(ctx, e.Name+": Unable to unmarshal leadership notification", "client_id", e.config.ClientID, "domain", e.config.Domain, "err", err) return } - e.Logger.DebugContext(ctx, e.Name+": Received notification from notifier", "action", notification.Action, "client_id", e.config.ClientID) + e.Logger.DebugContext(ctx, e.Name+": Received notification from notifier", "action", notification.Action, "client_id", e.config.ClientID, "domain", 
e.config.Domain) // Do an initial context check so in case context is done, it always takes // precedence over sending a leadership notification. @@ -500,7 +505,7 @@ func (e *Elector) runLeaderState(ctx context.Context, term leadershipTerm) error continue } - e.Logger.InfoContext(ctx, e.Name+": Current leader received forced resignation", "client_id", e.config.ClientID) + e.Logger.InfoContext(ctx, e.Name+": Current leader received forced resignation", "client_id", e.config.ClientID, "domain", e.config.Domain) // This client may win leadership again, but drop out of this // function and make it start all over. @@ -510,14 +515,14 @@ func (e *Elector) runLeaderState(ctx context.Context, term leadershipTerm) error // Reelect timer expired; attempt reelection below. } - e.Logger.DebugContext(ctx, e.Name+": Current leader attempting reelect", "client_id", e.config.ClientID) + e.Logger.DebugContext(ctx, e.Name+": Current leader attempting reelect", "client_id", e.config.ClientID, "domain", e.config.Domain) // Use the local monotonic-bearing clock for the trust window. The // DB-facing timestamp path stays on NowOrNil below. 
attemptStarted := e.Time.Now() attemptTimeout := term.reelectAttemptTimeout(attemptStarted) if attemptTimeout <= 0 { - e.Logger.WarnContext(ctx, e.Name+": Current leader stepping down because the reelection deadline elapsed", "client_id", e.config.ClientID) + e.Logger.WarnContext(ctx, e.Name+": Current leader stepping down because the reelection deadline elapsed", "client_id", e.config.ClientID, "domain", e.config.Domain) e.testSignals.LostLeadership.Signal(struct{}{}) return nil } @@ -525,6 +530,7 @@ func (e *Elector) runLeaderState(ctx context.Context, term leadershipTerm) error leader, err := attemptReelectWithTimeout(ctx, e.exec, &riverdriver.LeaderReelectParams{ ElectedAt: term.electedAt, LeaderID: term.clientID, + Name: e.config.Domain, Now: e.Time.NowOrNil(), Schema: e.config.Schema, TTL: e.leaderTTL(), @@ -577,7 +583,7 @@ func (e *Elector) runLeaderState(ctx context.Context, term leadershipTerm) error // always surrendered in a timely manner so it can be picked up quickly by // another client, even in the event of a cancellation. func (e *Elector) attemptResignLoop(ctx context.Context, term leadershipTerm) { - e.Logger.DebugContext(ctx, e.Name+": Attempting to resign leadership", "client_id", e.config.ClientID) + e.Logger.DebugContext(ctx, e.Name+": Attempting to resign leadership", "client_id", e.config.ClientID, "domain", e.config.Domain) // Make a good faith attempt to resign, even in the presence of errors, but // don't keep hammering if it doesn't work. 
In case a resignation failure, @@ -623,7 +629,7 @@ func (e *Elector) attemptResign(ctx context.Context, attempt int, term leadershi } if resigned { - e.Logger.DebugContext(ctx, e.Name+": Resigned leadership successfully", "client_id", e.config.ClientID) + e.Logger.DebugContext(ctx, e.Name+": Resigned leadership successfully", "client_id", e.config.ClientID, "domain", e.config.Domain) e.testSignals.ResignedLeadership.Signal(struct{}{}) } @@ -638,6 +644,7 @@ func (e *Elector) errorSlogArgs(err error, attempt int, sleepDuration time.Durat return []any{ slog.Int("attempt", attempt), slog.String("client_id", e.config.ClientID), + slog.String("domain", e.config.Domain), slog.String("err", err.Error()), slog.String("sleep_duration", sleepDuration.String()), } diff --git a/internal/leadership/elector_test.go b/internal/leadership/elector_test.go index f0255063..c15fb73d 100644 --- a/internal/leadership/elector_test.go +++ b/internal/leadership/elector_test.go @@ -586,6 +586,7 @@ func testElector[TElectorBundle any]( case 3: return execTx.LeaderInsert(ctx, &riverdriver.LeaderInsertParams{ LeaderID: params.LeaderID, + Name: params.Name, Now: params.Now, Schema: params.Schema, TTL: params.TTL, @@ -736,6 +737,7 @@ func testElector[TElectorBundle any]( ElectedAt: &newElectedAt, ExpiresAt: &newExpiresAt, LeaderID: leader.LeaderID, + Name: "default", Schema: elector.config.Schema, TTL: elector.leaderTTL(), }) diff --git a/internal/maintenance/job_cleaner.go b/internal/maintenance/job_cleaner.go index f3216561..7a4672c0 100644 --- a/internal/maintenance/job_cleaner.go +++ b/internal/maintenance/job_cleaner.go @@ -56,6 +56,10 @@ type JobCleanerConfig struct { // QueuesExcluded are queues that'll be excluded from cleaning. QueuesExcluded []string + // QueuesIncluded are queues that'll be included in cleaning. If set, only + // these queues will be cleaned. If nil, all queues are cleaned. + QueuesIncluded []string + // Schema where River tables are located. 
Empty string omits schema, causing // Postgres to default to `search_path`. Schema string @@ -79,6 +83,12 @@ func (c *JobCleanerConfig) mustValidate() *JobCleanerConfig { if c.Interval <= 0 { panic("JobCleanerConfig.Interval must be above zero") } + if c.QueuesExcluded != nil && len(c.QueuesExcluded) == 0 { + panic("JobCleanerConfig.QueuesExcluded should be either nil or a non-empty slice") + } + if c.QueuesIncluded != nil && len(c.QueuesIncluded) == 0 { + panic("JobCleanerConfig.QueuesIncluded should be either nil or a non-empty slice") + } if c.Timeout <= 0 { panic("JobCleanerConfig.Timeout must be above zero") } @@ -117,6 +127,7 @@ func NewJobCleaner(archetype *baseservice.Archetype, config *JobCleanerConfig, e CompletedJobRetentionPeriod: cmp.Or(config.CompletedJobRetentionPeriod, riversharedmaintenance.CompletedJobRetentionPeriodDefault), DiscardedJobRetentionPeriod: cmp.Or(config.DiscardedJobRetentionPeriod, riversharedmaintenance.DiscardedJobRetentionPeriodDefault), QueuesExcluded: config.QueuesExcluded, + QueuesIncluded: config.QueuesIncluded, Interval: cmp.Or(config.Interval, riversharedmaintenance.JobCleanerIntervalDefault), Schema: config.Schema, Timeout: cmp.Or(config.Timeout, riversharedmaintenance.JobCleanerTimeoutDefault), @@ -205,6 +216,7 @@ func (s *JobCleaner) runOnce(ctx context.Context) (*jobCleanerRunOnceResult, err DiscardedFinalizedAtHorizon: time.Now().Add(-s.Config.DiscardedJobRetentionPeriod), Max: s.batchSize(), QueuesExcluded: s.Config.QueuesExcluded, + QueuesIncluded: s.Config.QueuesIncluded, Schema: s.Config.Schema, }) if err != nil { diff --git a/internal/maintenance/job_cleaner_test.go b/internal/maintenance/job_cleaner_test.go index 96ba0e1b..d55db8fd 100644 --- a/internal/maintenance/job_cleaner_test.go +++ b/internal/maintenance/job_cleaner_test.go @@ -328,7 +328,7 @@ func TestJobCleaner(t *testing.T) { require.ErrorIs(t, err, rivertype.ErrNotFound) }) - t.Run("OmmittedQueues", func(t *testing.T) { + t.Run("QueuesExcluded", func(t 
*testing.T) { t.Parallel() cleaner, bundle := setup(t) @@ -338,24 +338,24 @@ func TestJobCleaner(t *testing.T) { completedJob = testfactory.Job(ctx, t, bundle.exec, &testfactory.JobOpts{State: ptrutil.Ptr(rivertype.JobStateCompleted), FinalizedAt: ptrutil.Ptr(bundle.completedDeleteHorizon.Add(-1 * time.Hour))}) discardedJob = testfactory.Job(ctx, t, bundle.exec, &testfactory.JobOpts{State: ptrutil.Ptr(rivertype.JobStateDiscarded), FinalizedAt: ptrutil.Ptr(bundle.discardedDeleteHorizon.Add(-1 * time.Hour))}) - omittedQueue1 = "omitted1" - omittedQueue2 = "omitted1" + excludedQueue1 = "queue1" + excludedQueue2 = "queue2" - // Not deleted because in an omitted queue. - omittedQueueJob1 = testfactory.Job(ctx, t, bundle.exec, &testfactory.JobOpts{FinalizedAt: ptrutil.Ptr(bundle.completedDeleteHorizon.Add(-1 * time.Hour)), Queue: &omittedQueue1, State: ptrutil.Ptr(rivertype.JobStateCompleted)}) - omittedQueueJob2 = testfactory.Job(ctx, t, bundle.exec, &testfactory.JobOpts{FinalizedAt: ptrutil.Ptr(bundle.completedDeleteHorizon.Add(-1 * time.Hour)), Queue: &omittedQueue2, State: ptrutil.Ptr(rivertype.JobStateCompleted)}) + // Not deleted because in an excluded queue. 
+ excludedQueueJob1 = testfactory.Job(ctx, t, bundle.exec, &testfactory.JobOpts{FinalizedAt: ptrutil.Ptr(bundle.completedDeleteHorizon.Add(-1 * time.Hour)), Queue: &excludedQueue1, State: ptrutil.Ptr(rivertype.JobStateCompleted)}) + excludedQueueJob2 = testfactory.Job(ctx, t, bundle.exec, &testfactory.JobOpts{FinalizedAt: ptrutil.Ptr(bundle.completedDeleteHorizon.Add(-1 * time.Hour)), Queue: &excludedQueue2, State: ptrutil.Ptr(rivertype.JobStateCompleted)}) ) - cleaner.Config.QueuesExcluded = []string{omittedQueue1, omittedQueue2} + cleaner.Config.QueuesExcluded = []string{excludedQueue1, excludedQueue2} require.NoError(t, cleaner.Start(ctx)) cleaner.TestSignals.DeletedBatch.WaitOrTimeout() var err error - _, err = bundle.exec.JobGetByID(ctx, &riverdriver.JobGetByIDParams{ID: omittedQueueJob1.ID, Schema: cleaner.Config.Schema}) + _, err = bundle.exec.JobGetByID(ctx, &riverdriver.JobGetByIDParams{ID: excludedQueueJob1.ID, Schema: cleaner.Config.Schema}) require.NoError(t, err) - _, err = bundle.exec.JobGetByID(ctx, &riverdriver.JobGetByIDParams{ID: omittedQueueJob2.ID, Schema: cleaner.Config.Schema}) + _, err = bundle.exec.JobGetByID(ctx, &riverdriver.JobGetByIDParams{ID: excludedQueueJob2.ID, Schema: cleaner.Config.Schema}) require.NoError(t, err) _, err = bundle.exec.JobGetByID(ctx, &riverdriver.JobGetByIDParams{ID: cancelledJob.ID, Schema: cleaner.Config.Schema}) @@ -366,6 +366,41 @@ func TestJobCleaner(t *testing.T) { require.ErrorIs(t, err, rivertype.ErrNotFound) }) + t.Run("QueuesIncluded", func(t *testing.T) { + t.Parallel() + + cleaner, bundle := setup(t) + + var ( + // Not deleted because not in an included queue. 
+ notIncludedJob1 = testfactory.Job(ctx, t, bundle.exec, &testfactory.JobOpts{State: ptrutil.Ptr(rivertype.JobStateCancelled), FinalizedAt: ptrutil.Ptr(bundle.cancelledDeleteHorizon.Add(-1 * time.Hour))}) + notIncludedJob2 = testfactory.Job(ctx, t, bundle.exec, &testfactory.JobOpts{State: ptrutil.Ptr(rivertype.JobStateCompleted), FinalizedAt: ptrutil.Ptr(bundle.completedDeleteHorizon.Add(-1 * time.Hour))}) + + includedQueue1 = "queue1" + includedQueue2 = "queue2" + + includedQueueJob1 = testfactory.Job(ctx, t, bundle.exec, &testfactory.JobOpts{FinalizedAt: ptrutil.Ptr(bundle.completedDeleteHorizon.Add(-1 * time.Hour)), Queue: &includedQueue1, State: ptrutil.Ptr(rivertype.JobStateCompleted)}) + includedQueueJob2 = testfactory.Job(ctx, t, bundle.exec, &testfactory.JobOpts{FinalizedAt: ptrutil.Ptr(bundle.completedDeleteHorizon.Add(-1 * time.Hour)), Queue: &includedQueue2, State: ptrutil.Ptr(rivertype.JobStateCompleted)}) + ) + + cleaner.Config.QueuesIncluded = []string{includedQueue1, includedQueue2} + + require.NoError(t, cleaner.Start(ctx)) + + cleaner.TestSignals.DeletedBatch.WaitOrTimeout() + + var err error + _, err = bundle.exec.JobGetByID(ctx, &riverdriver.JobGetByIDParams{ID: includedQueueJob1.ID, Schema: cleaner.Config.Schema}) + require.ErrorIs(t, err, rivertype.ErrNotFound) + _, err = bundle.exec.JobGetByID(ctx, &riverdriver.JobGetByIDParams{ID: includedQueueJob2.ID, Schema: cleaner.Config.Schema}) + require.ErrorIs(t, err, rivertype.ErrNotFound) + + _, err = bundle.exec.JobGetByID(ctx, &riverdriver.JobGetByIDParams{ID: notIncludedJob1.ID, Schema: cleaner.Config.Schema}) + require.NoError(t, err) + _, err = bundle.exec.JobGetByID(ctx, &riverdriver.JobGetByIDParams{ID: notIncludedJob2.ID, Schema: cleaner.Config.Schema}) + require.NoError(t, err) + }) + t.Run("ReducedBatchSizeBreakerTrips", func(t *testing.T) { t.Parallel() diff --git a/internal/maintenance/job_rescuer.go b/internal/maintenance/job_rescuer.go index c7353b36..0547ea0a 100644 --- 
a/internal/maintenance/job_rescuer.go +++ b/internal/maintenance/job_rescuer.go @@ -50,6 +50,11 @@ type JobRescuerConfig struct { // Interval is the amount of time to wait between runs of the rescuer. Interval time.Duration + // QueuesIncluded are queues that'll be included when considering jobs to + // rescue. If set, only these queues will be rescued. If nil, jobs in all + // queues are rescued. + QueuesIncluded []string + // RescueAfter is the amount of time for a job to be active before it is // considered stuck and should be rescued. RescueAfter time.Duration @@ -70,6 +75,9 @@ func (c *JobRescuerConfig) mustValidate() *JobRescuerConfig { if c.Interval <= 0 { panic("RescuerConfig.Interval must be above zero") } + if c.QueuesIncluded != nil && len(c.QueuesIncluded) == 0 { + panic("JobRescuerConfig.QueuesIncluded should be either nil or a non-empty slice") + } if c.RescueAfter <= 0 { panic("RescuerConfig.JobDuration must be above zero") } @@ -109,6 +117,7 @@ func NewRescuer(archetype *baseservice.Archetype, config *JobRescuerConfig, exec BatchSizes: batchSizes, ClientRetryPolicy: config.ClientRetryPolicy, Interval: cmp.Or(config.Interval, JobRescuerIntervalDefault), + QueuesIncluded: config.QueuesIncluded, RescueAfter: cmp.Or(config.RescueAfter, JobRescuerRescueAfterDefault), Schema: config.Schema, WorkUnitFactoryFunc: config.WorkUnitFactoryFunc, @@ -280,9 +289,10 @@ func (s *JobRescuer) getStuckJobs(ctx context.Context) ([]*rivertype.JobRow, err stuckHorizon := time.Now().Add(-s.Config.RescueAfter) return s.exec.JobGetStuck(ctx, &riverdriver.JobGetStuckParams{ - Max: s.batchSize(), - Schema: s.Config.Schema, - StuckHorizon: stuckHorizon, + Max: s.batchSize(), + QueuesIncluded: s.Config.QueuesIncluded, + Schema: s.Config.Schema, + StuckHorizon: stuckHorizon, }) } diff --git a/internal/maintenance/job_rescuer_test.go b/internal/maintenance/job_rescuer_test.go index d1f44ffd..71300515 100644 --- a/internal/maintenance/job_rescuer_test.go +++
b/internal/maintenance/job_rescuer_test.go @@ -405,4 +405,42 @@ func TestJobRescuer(t *testing.T) { } } }) + + t.Run("QueuesIncluded", func(t *testing.T) { + t.Parallel() + + rescuer, bundle := setup(t) + + var ( + notIncludedJob1 = testfactory.Job(ctx, t, bundle.exec, &testfactory.JobOpts{Kind: ptrutil.Ptr(rescuerJobKind), State: ptrutil.Ptr(rivertype.JobStateRunning), AttemptedAt: ptrutil.Ptr(bundle.rescueHorizon.Add(-1 * time.Hour)), MaxAttempts: ptrutil.Ptr(5)}) + notIncludedJob2 = testfactory.Job(ctx, t, bundle.exec, &testfactory.JobOpts{Kind: ptrutil.Ptr(rescuerJobKind), State: ptrutil.Ptr(rivertype.JobStateRunning), AttemptedAt: ptrutil.Ptr(bundle.rescueHorizon.Add(-1 * time.Minute)), MaxAttempts: ptrutil.Ptr(5)}) + + includedQueue1 = "queue1" + includedQueue2 = "queue2" + + includedJob1 = testfactory.Job(ctx, t, bundle.exec, &testfactory.JobOpts{Kind: ptrutil.Ptr(rescuerJobKind), State: ptrutil.Ptr(rivertype.JobStateRunning), AttemptedAt: ptrutil.Ptr(bundle.rescueHorizon.Add(-1 * time.Hour)), MaxAttempts: ptrutil.Ptr(5), Queue: &includedQueue1}) + includedJob2 = testfactory.Job(ctx, t, bundle.exec, &testfactory.JobOpts{Kind: ptrutil.Ptr(rescuerJobKind), State: ptrutil.Ptr(rivertype.JobStateRunning), AttemptedAt: ptrutil.Ptr(bundle.rescueHorizon.Add(-1 * time.Minute)), MaxAttempts: ptrutil.Ptr(5), Queue: &includedQueue2}) + ) + + rescuer.Config.QueuesIncluded = []string{includedQueue1, includedQueue2} + + require.NoError(t, rescuer.Start(ctx)) + + rescuer.TestSignals.FetchedBatch.WaitOrTimeout() + rescuer.TestSignals.UpdatedBatch.WaitOrTimeout() + + includedJob1After, err := bundle.exec.JobGetByID(ctx, &riverdriver.JobGetByIDParams{ID: includedJob1.ID, Schema: rescuer.Config.Schema}) + require.NoError(t, err) + require.Equal(t, rivertype.JobStateRetryable, includedJob1After.State) + includedJob2After, err := bundle.exec.JobGetByID(ctx, &riverdriver.JobGetByIDParams{ID: includedJob2.ID, Schema: rescuer.Config.Schema}) + require.NoError(t, err) + 
require.Equal(t, rivertype.JobStateRetryable, includedJob2After.State) + + notIncludedJob1After, err := bundle.exec.JobGetByID(ctx, &riverdriver.JobGetByIDParams{ID: notIncludedJob1.ID, Schema: rescuer.Config.Schema}) + require.NoError(t, err) + require.Equal(t, notIncludedJob1.State, notIncludedJob1After.State) // not rescued + notIncludedJob2After, err := bundle.exec.JobGetByID(ctx, &riverdriver.JobGetByIDParams{ID: notIncludedJob2.ID, Schema: rescuer.Config.Schema}) + require.NoError(t, err) + require.Equal(t, notIncludedJob2.State, notIncludedJob2After.State) // not rescued + }) } diff --git a/internal/maintenance/job_scheduler.go b/internal/maintenance/job_scheduler.go index f593ae35..16d304b1 100644 --- a/internal/maintenance/job_scheduler.go +++ b/internal/maintenance/job_scheduler.go @@ -51,6 +51,10 @@ type JobSchedulerConfig struct { // where jobs were scheduled. NotifyInsert NotifyInsertFunc + // QueuesIncluded are queues that'll be included in scheduling. If set, + // only these queues will be scheduled. If nil, all queues are scheduled. + QueuesIncluded []string + // Schema where River tables are located. Empty string omits schema, causing // Postgres to default to `search_path`. 
Schema string @@ -59,11 +63,14 @@ type JobSchedulerConfig struct { func (c *JobSchedulerConfig) mustValidate() *JobSchedulerConfig { c.MustValidate() + if c.Default <= 0 { + panic("SchedulerConfig.Limit must be above zero") + } if c.Interval <= 0 { panic("SchedulerConfig.Interval must be above zero") } - if c.Default <= 0 { - panic("SchedulerConfig.Limit must be above zero") + if c.QueuesIncluded != nil && len(c.QueuesIncluded) == 0 { + panic("JobSchedulerConfig.QueuesIncluded should be either nil or a non-empty slice") } return c @@ -77,10 +84,10 @@ type JobScheduler struct { startstop.BaseStartStop // exported for test purposes + Config *JobSchedulerConfig TestSignals JobSchedulerTestSignals - config *JobSchedulerConfig - exec riverdriver.Executor + exec riverdriver.Executor // Circuit breaker that tracks consecutive timeout failures from the central // query. The query starts by using the full/default batch size, but after @@ -95,11 +102,12 @@ func NewJobScheduler(archetype *baseservice.Archetype, config *JobSchedulerConfi batchSizes := config.WithDefaults() return baseservice.Init(archetype, &JobScheduler{ - config: (&JobSchedulerConfig{ - BatchSizes: batchSizes, - Interval: cmp.Or(config.Interval, JobSchedulerIntervalDefault), - NotifyInsert: config.NotifyInsert, - Schema: config.Schema, + Config: (&JobSchedulerConfig{ + BatchSizes: batchSizes, + Interval: cmp.Or(config.Interval, JobSchedulerIntervalDefault), + NotifyInsert: config.NotifyInsert, + QueuesIncluded: config.QueuesIncluded, + Schema: config.Schema, }).mustValidate(), exec: exec, reducedBatchSizeBreaker: riversharedmaintenance.ReducedBatchSizeBreaker(batchSizes), @@ -121,7 +129,7 @@ func (s *JobScheduler) Start(ctx context.Context) error { //nolint:dupl s.Logger.DebugContext(ctx, s.Name+riversharedmaintenance.LogPrefixRunLoopStarted) defer s.Logger.DebugContext(ctx, s.Name+riversharedmaintenance.LogPrefixRunLoopStopped) - ticker := timeutil.NewTickerWithInitialTick(ctx, s.config.Interval) + ticker 
:= timeutil.NewTickerWithInitialTick(ctx, s.Config.Interval) for { select { case <-ctx.Done(): @@ -150,9 +158,9 @@ func (s *JobScheduler) Start(ctx context.Context) error { //nolint:dupl func (s *JobScheduler) batchSize() int { if s.reducedBatchSizeBreaker.Open() { - return s.config.Reduced + return s.Config.Reduced } - return s.config.Default + return s.Config.Default } type schedulerRunOnceResult struct { @@ -175,12 +183,13 @@ func (s *JobScheduler) runOnce(ctx context.Context) (*schedulerRunOnceResult, er defer dbutil.RollbackWithoutCancel(ctx, execTx) now := s.Time.Now() - nowWithLookAhead := now.Add(s.config.Interval) + nowWithLookAhead := now.Add(s.Config.Interval) scheduledJobResults, err := execTx.JobSchedule(ctx, &riverdriver.JobScheduleParams{ - Max: s.batchSize(), - Now: &nowWithLookAhead, - Schema: s.config.Schema, + Max: s.batchSize(), + Now: &nowWithLookAhead, + QueuesIncluded: s.Config.QueuesIncluded, + Schema: s.Config.Schema, }) if err != nil { return 0, fmt.Errorf("error scheduling jobs: %w", err) @@ -205,7 +214,7 @@ func (s *JobScheduler) runOnce(ctx context.Context) (*schedulerRunOnceResult, er } if len(queues) > 0 { - if err := s.config.NotifyInsert(ctx, execTx, queues); err != nil { + if err := s.Config.NotifyInsert(ctx, execTx, queues); err != nil { return 0, fmt.Errorf("error notifying insert: %w", err) } s.TestSignals.NotifiedQueues.Signal(queues) diff --git a/internal/maintenance/job_scheduler_test.go b/internal/maintenance/job_scheduler_test.go index 25388042..0ff0713a 100644 --- a/internal/maintenance/job_scheduler_test.go +++ b/internal/maintenance/job_scheduler_test.go @@ -73,21 +73,21 @@ func TestJobScheduler(t *testing.T) { requireJobStateUnchanged := func(t *testing.T, scheduler *JobScheduler, exec riverdriver.Executor, job *rivertype.JobRow) *rivertype.JobRow { t.Helper() - newJob, err := exec.JobGetByID(ctx, &riverdriver.JobGetByIDParams{ID: job.ID, Schema: scheduler.config.Schema}) + newJob, err := exec.JobGetByID(ctx, 
&riverdriver.JobGetByIDParams{ID: job.ID, Schema: scheduler.Config.Schema}) require.NoError(t, err) require.Equal(t, job.State, newJob.State) return newJob } requireJobStateAvailable := func(t *testing.T, scheduler *JobScheduler, exec riverdriver.Executor, job *rivertype.JobRow) *rivertype.JobRow { t.Helper() - newJob, err := exec.JobGetByID(ctx, &riverdriver.JobGetByIDParams{ID: job.ID, Schema: scheduler.config.Schema}) + newJob, err := exec.JobGetByID(ctx, &riverdriver.JobGetByIDParams{ID: job.ID, Schema: scheduler.Config.Schema}) require.NoError(t, err) require.Equal(t, rivertype.JobStateAvailable, newJob.State) return newJob } requireJobStateDiscardedWithMeta := func(t *testing.T, scheduler *JobScheduler, exec riverdriver.Executor, job *rivertype.JobRow) *rivertype.JobRow { t.Helper() - newJob, err := exec.JobGetByID(ctx, &riverdriver.JobGetByIDParams{ID: job.ID, Schema: scheduler.config.Schema}) + newJob, err := exec.JobGetByID(ctx, &riverdriver.JobGetByIDParams{ID: job.ID, Schema: scheduler.Config.Schema}) require.NoError(t, err) require.Equal(t, rivertype.JobStateDiscarded, newJob.State) require.NotNil(t, newJob.FinalizedAt) @@ -100,9 +100,9 @@ func TestJobScheduler(t *testing.T) { scheduler := NewJobScheduler(riversharedtest.BaseServiceArchetype(t), &JobSchedulerConfig{}, nil) - require.Equal(t, JobSchedulerIntervalDefault, scheduler.config.Interval) - require.Equal(t, riversharedmaintenance.BatchSizeDefault, scheduler.config.Default) - require.Equal(t, riversharedmaintenance.BatchSizeReduced, scheduler.config.Reduced) + require.Equal(t, JobSchedulerIntervalDefault, scheduler.Config.Interval) + require.Equal(t, riversharedmaintenance.BatchSizeDefault, scheduler.Config.Default) + require.Equal(t, riversharedmaintenance.BatchSizeReduced, scheduler.Config.Reduced) }) t.Run("StartStopStress", func(t *testing.T) { @@ -130,7 +130,7 @@ func TestJobScheduler(t *testing.T) { scheduledJob1 := testfactory.Job(ctx, t, bundle.exec, &testfactory.JobOpts{State: 
ptrutil.Ptr(rivertype.JobStateScheduled), ScheduledAt: ptrutil.Ptr(now.Add(-1 * time.Hour))}) scheduledJob2 := testfactory.Job(ctx, t, bundle.exec, &testfactory.JobOpts{State: ptrutil.Ptr(rivertype.JobStateScheduled), ScheduledAt: ptrutil.Ptr(now.Add(-5 * time.Second))}) - scheduledJob3 := testfactory.Job(ctx, t, bundle.exec, &testfactory.JobOpts{State: ptrutil.Ptr(rivertype.JobStateScheduled), ScheduledAt: ptrutil.Ptr(now.Add(scheduler.config.Interval - time.Millisecond))}) // won't be scheduled + scheduledJob3 := testfactory.Job(ctx, t, bundle.exec, &testfactory.JobOpts{State: ptrutil.Ptr(rivertype.JobStateScheduled), ScheduledAt: ptrutil.Ptr(now.Add(scheduler.Config.Interval - time.Millisecond))}) // won't be scheduled scheduledJob4 := testfactory.Job(ctx, t, bundle.exec, &testfactory.JobOpts{State: ptrutil.Ptr(rivertype.JobStateScheduled), ScheduledAt: ptrutil.Ptr(now.Add(30 * time.Second))}) // won't be scheduled retryableJob1 := testfactory.Job(ctx, t, bundle.exec, &testfactory.JobOpts{State: ptrutil.Ptr(rivertype.JobStateRetryable), ScheduledAt: ptrutil.Ptr(now.Add(-1 * time.Hour))}) @@ -205,13 +205,13 @@ func TestJobScheduler(t *testing.T) { t.Parallel() scheduler, bundle := setupTx(t) - scheduler.config.Default = 10 // reduced size for test speed + scheduler.Config.Default = 10 // reduced size for test speed now := time.Now().UTC() // Add one to our chosen batch size to get one extra job and therefore // one extra batch, ensuring that we've tested working multiple. 
- numJobs := scheduler.config.Default + 1 + numJobs := scheduler.Config.Default + 1 jobs := make([]*rivertype.JobRow, numJobs) @@ -243,7 +243,7 @@ func TestJobScheduler(t *testing.T) { t.Parallel() scheduler, _ := setupTx(t) - scheduler.config.Interval = 1 * time.Microsecond + scheduler.Config.Interval = 1 * time.Microsecond require.NoError(t, scheduler.Start(ctx)) @@ -258,7 +258,7 @@ func TestJobScheduler(t *testing.T) { t.Parallel() scheduler, _ := setupTx(t) - scheduler.config.Interval = time.Minute // should only trigger once for the initial run + scheduler.Config.Interval = time.Minute // should only trigger once for the initial run require.NoError(t, scheduler.Start(ctx)) scheduler.Stop() @@ -268,7 +268,7 @@ func TestJobScheduler(t *testing.T) { t.Parallel() scheduler, _ := setupTx(t) - scheduler.config.Interval = time.Minute // should only trigger once for the initial run + scheduler.Config.Interval = time.Minute // should only trigger once for the initial run ctx, cancelFunc := context.WithCancel(ctx) @@ -287,7 +287,7 @@ func TestJobScheduler(t *testing.T) { t.Parallel() scheduler, bundle := setupTx(t) - scheduler.config.Interval = time.Minute // should only trigger once for the initial run + scheduler.Config.Interval = time.Minute // should only trigger once for the initial run now := time.Now().UTC() job1 := testfactory.Job(ctx, t, bundle.exec, &testfactory.JobOpts{State: ptrutil.Ptr(rivertype.JobStateScheduled), ScheduledAt: ptrutil.Ptr(now.Add(-1 * time.Hour))}) @@ -320,8 +320,8 @@ func TestJobScheduler(t *testing.T) { notifyCh := make(chan []string, 10) scheduler, _ := setup(t, &testOpts{exec: exec, schema: schema}) - scheduler.config.Interval = time.Minute // should only trigger once for the initial run - scheduler.config.NotifyInsert = func(ctx context.Context, tx riverdriver.ExecutorTx, queues []string) error { + scheduler.Config.Interval = time.Minute // should only trigger once for the initial run + scheduler.Config.NotifyInsert = func(ctx 
context.Context, tx riverdriver.ExecutorTx, queues []string) error { notifyCh <- queues return nil } @@ -357,7 +357,7 @@ func TestJobScheduler(t *testing.T) { addJob("other_status_queue", time.Minute, rivertype.JobStateCancelled) // it's cancelled // This one is scheduled in the future, just barely before the next run, so it should // be scheduled but shouldn't trigger a notification: - addJob("queue5", scheduler.config.Interval-time.Millisecond, rivertype.JobStateRetryable) + addJob("queue5", scheduler.Config.Interval-time.Millisecond, rivertype.JobStateRetryable) // Run the scheduler and wait for it to execute once: require.NoError(t, scheduler.Start(ctx)) @@ -439,4 +439,35 @@ func TestJobScheduler(t *testing.T) { } } }) + + t.Run("QueuesIncluded", func(t *testing.T) { + t.Parallel() + + scheduler, bundle := setupTx(t) + + var ( + now = time.Now().UTC() + + notIncludedJob1 = testfactory.Job(ctx, t, bundle.exec, &testfactory.JobOpts{State: ptrutil.Ptr(rivertype.JobStateScheduled), ScheduledAt: ptrutil.Ptr(now.Add(-1 * time.Hour))}) + notIncludedJob2 = testfactory.Job(ctx, t, bundle.exec, &testfactory.JobOpts{State: ptrutil.Ptr(rivertype.JobStateScheduled), ScheduledAt: ptrutil.Ptr(now.Add(-5 * time.Second))}) + + includedQueue1 = "queue1" + includedQueue2 = "queue2" + + includedJob1 = testfactory.Job(ctx, t, bundle.exec, &testfactory.JobOpts{Queue: &includedQueue1, State: ptrutil.Ptr(rivertype.JobStateScheduled), ScheduledAt: ptrutil.Ptr(now.Add(-1 * time.Hour))}) + includedJob2 = testfactory.Job(ctx, t, bundle.exec, &testfactory.JobOpts{Queue: &includedQueue2, State: ptrutil.Ptr(rivertype.JobStateScheduled), ScheduledAt: ptrutil.Ptr(now.Add(-5 * time.Second))}) + ) + + scheduler.Config.QueuesIncluded = []string{includedQueue1, includedQueue2} + + require.NoError(t, scheduler.Start(ctx)) + + scheduler.TestSignals.ScheduledBatch.WaitOrTimeout() + + requireJobStateUnchanged(t, scheduler, bundle.exec, notIncludedJob1) + requireJobStateUnchanged(t, scheduler, 
bundle.exec, notIncludedJob2) + + requireJobStateAvailable(t, scheduler, bundle.exec, includedJob1) + requireJobStateAvailable(t, scheduler, bundle.exec, includedJob2) + }) } diff --git a/internal/maintenance/queue_cleaner.go b/internal/maintenance/queue_cleaner.go index f64e97db..2abb328c 100644 --- a/internal/maintenance/queue_cleaner.go +++ b/internal/maintenance/queue_cleaner.go @@ -41,6 +41,10 @@ type QueueCleanerConfig struct { // Interval is the amount of time to wait between runs of the cleaner. Interval time.Duration + // QueuesIncluded are queues that'll be included in cleaning. If set, only + // these queues will be cleaned. If nil, all queues are cleaned. + QueuesIncluded []string + // RetentionPeriod is the amount of time to keep queues around before they're // removed. RetentionPeriod time.Duration @@ -56,6 +60,9 @@ func (c *QueueCleanerConfig) mustValidate() *QueueCleanerConfig { if c.Interval <= 0 { panic("QueueCleanerConfig.Interval must be above zero") } + if c.QueuesIncluded != nil && len(c.QueuesIncluded) == 0 { + panic("QueueCleanerConfig.QueuesIncluded should be either nil or a non-empty slice") + } if c.RetentionPeriod <= 0 { panic("QueueCleanerConfig.RetentionPeriod must be above zero") } @@ -91,6 +98,7 @@ func NewQueueCleaner(archetype *baseservice.Archetype, config *QueueCleanerConfi Config: (&QueueCleanerConfig{ BatchSizes: batchSizes, Interval: cmp.Or(config.Interval, queueCleanerIntervalDefault), + QueuesIncluded: config.QueuesIncluded, RetentionPeriod: cmp.Or(config.RetentionPeriod, QueueRetentionPeriodDefault), Schema: config.Schema, }).mustValidate(), @@ -163,6 +171,7 @@ func (s *QueueCleaner) runOnce(ctx context.Context) (*queueCleanerRunOnceResult, queuesDeleted, err := s.exec.QueueDeleteExpired(ctx, &riverdriver.QueueDeleteExpiredParams{ Max: s.batchSize(), + QueuesIncluded: s.Config.QueuesIncluded, Schema: s.Config.Schema, UpdatedAtHorizon: time.Now().Add(-s.Config.RetentionPeriod), }) diff --git 
a/internal/maintenance/queue_cleaner_test.go b/internal/maintenance/queue_cleaner_test.go index 06058e95..6e2b9f0e 100644 --- a/internal/maintenance/queue_cleaner_test.go +++ b/internal/maintenance/queue_cleaner_test.go @@ -106,12 +106,12 @@ func TestQueueCleaner(t *testing.T) { Name: queue3.Name, Schema: cleaner.Config.Schema, }) - require.ErrorIs(t, err, rivertype.ErrNotFound) // still there + require.ErrorIs(t, err, rivertype.ErrNotFound) _, err = bundle.exec.QueueGet(ctx, &riverdriver.QueueGetParams{ Name: queue4.Name, Schema: cleaner.Config.Schema, }) - require.ErrorIs(t, err, rivertype.ErrNotFound) // still there + require.ErrorIs(t, err, rivertype.ErrNotFound) _, err = bundle.exec.QueueGet(ctx, &riverdriver.QueueGetParams{ Name: queue5.Name, Schema: cleaner.Config.Schema, @@ -302,4 +302,37 @@ func TestQueueCleaner(t *testing.T) { } } }) + + t.Run("QueuesIncluded", func(t *testing.T) { + t.Parallel() + + cleaner, bundle := setup(t) + + var ( + now = time.Now() + + notIncludedQueue1 = testfactory.Queue(ctx, t, bundle.exec, &testfactory.QueueOpts{UpdatedAt: ptrutil.Ptr(now.Add(-25 * time.Hour))}) + notIncludedQueue2 = testfactory.Queue(ctx, t, bundle.exec, &testfactory.QueueOpts{UpdatedAt: ptrutil.Ptr(now.Add(-26 * time.Hour))}) + + includedQueue1 = testfactory.Queue(ctx, t, bundle.exec, &testfactory.QueueOpts{Name: ptrutil.Ptr("included1"), UpdatedAt: ptrutil.Ptr(now.Add(-25 * time.Hour))}) + includedQueue2 = testfactory.Queue(ctx, t, bundle.exec, &testfactory.QueueOpts{Name: ptrutil.Ptr("included2"), UpdatedAt: ptrutil.Ptr(now.Add(-26 * time.Hour))}) + ) + + cleaner.Config.QueuesIncluded = []string{includedQueue1.Name, includedQueue2.Name} + + require.NoError(t, cleaner.Start(ctx)) + + cleaner.TestSignals.DeletedBatch.WaitOrTimeout() + + var err error + _, err = bundle.exec.QueueGet(ctx, &riverdriver.QueueGetParams{Name: notIncludedQueue1.Name, Schema: cleaner.Config.Schema}) + require.NoError(t, err) // still there + _, err = bundle.exec.QueueGet(ctx, 
&riverdriver.QueueGetParams{Name: notIncludedQueue2.Name, Schema: cleaner.Config.Schema}) + require.NoError(t, err) // still there + + _, err = bundle.exec.QueueGet(ctx, &riverdriver.QueueGetParams{Name: includedQueue1.Name, Schema: cleaner.Config.Schema}) + require.ErrorIs(t, err, rivertype.ErrNotFound) + _, err = bundle.exec.QueueGet(ctx, &riverdriver.QueueGetParams{Name: includedQueue2.Name, Schema: cleaner.Config.Schema}) + require.ErrorIs(t, err, rivertype.ErrNotFound) + }) } diff --git a/periodic_job.go b/periodic_job.go index a6fd8795..ddfc89c9 100644 --- a/periodic_job.go +++ b/periodic_job.go @@ -37,6 +37,10 @@ type PeriodicJobOpts struct { // ID is an optional identifier for the job. Identifiers must be unique // between all periodic jobs and adding a periodic job will error if they're // not. + // + // If using Config.LeaderDomain, ID should be unique across all domains. If + // it isn't, clients running in different domains may enqueue conflicting + // periodic jobs with the same ID. 
ID string // RunOnStart can be used to indicate that a periodic job should insert an diff --git a/riverdriver/river_driver_interface.go b/riverdriver/river_driver_interface.go index 8727a6e3..44c15bcf 100644 --- a/riverdriver/river_driver_interface.go +++ b/riverdriver/river_driver_interface.go @@ -383,6 +383,7 @@ type JobDeleteBeforeParams struct { DiscardedDoDelete bool DiscardedFinalizedAtHorizon time.Time Max int + Queues []string QueuesExcluded []string QueuesIncluded []string Schema string @@ -422,9 +423,10 @@ type JobGetByKindManyParams struct { } type JobGetStuckParams struct { - Max int - Schema string - StuckHorizon time.Time + Max int + QueuesIncluded []string + Schema string + StuckHorizon time.Time } type JobInsertFastParams struct { @@ -515,9 +517,10 @@ type JobRetryParams struct { } type JobScheduleParams struct { - Max int - Now *time.Time - Schema string + Max int + Now *time.Time + QueuesIncluded []string + Schema string } type JobScheduleResult struct { @@ -689,6 +692,7 @@ type LeaderInsertParams struct { ElectedAt *time.Time ExpiresAt *time.Time LeaderID string + Name string Now *time.Time Schema string TTL time.Duration @@ -696,6 +700,7 @@ type LeaderInsertParams struct { type LeaderElectParams struct { LeaderID string + Name string Now *time.Time Schema string TTL time.Duration @@ -704,6 +709,7 @@ type LeaderElectParams struct { type LeaderReelectParams struct { ElectedAt time.Time LeaderID string + Name string Now *time.Time Schema string TTL time.Duration @@ -793,6 +799,7 @@ type QueueCreateOrSetUpdatedAtParams struct { type QueueDeleteExpiredParams struct { Max int + QueuesIncluded []string Schema string UpdatedAtHorizon time.Time } diff --git a/riverdriver/riverdatabasesql/internal/dbsqlc/river_job.sql.go b/riverdriver/riverdatabasesql/internal/dbsqlc/river_job.sql.go index ac01941a..17f90617 100644 --- a/riverdriver/riverdatabasesql/internal/dbsqlc/river_job.sql.go +++ b/riverdriver/riverdatabasesql/internal/dbsqlc/river_job.sql.go @@ 
-610,17 +610,22 @@ SELECT id, args, attempt, attempted_at, attempted_by, created_at, errors, finali FROM /* TEMPLATE: schema */river_job WHERE state = 'running' AND attempted_at < $1::timestamptz + AND ( + $2::text[] IS NULL + OR queue = any($2) + ) ORDER BY id -LIMIT $2 +LIMIT $3 ` type JobGetStuckParams struct { - StuckHorizon time.Time - Max int32 + StuckHorizon time.Time + QueuesIncluded []string + Max int32 } func (q *Queries) JobGetStuck(ctx context.Context, db DBTX, arg *JobGetStuckParams) ([]*RiverJob, error) { - rows, err := db.QueryContext(ctx, jobGetStuck, arg.StuckHorizon, arg.Max) + rows, err := db.QueryContext(ctx, jobGetStuck, arg.StuckHorizon, pq.Array(arg.QueuesIncluded), arg.Max) if err != nil { return nil, err } @@ -1336,12 +1341,16 @@ WITH jobs_to_schedule AS ( state IN ('retryable', 'scheduled') AND priority >= 0 AND queue IS NOT NULL - AND scheduled_at <= coalesce($1::timestamptz, now()) + AND ( + $1::text[] IS NULL + OR queue = any($1) + ) + AND scheduled_at <= coalesce($2::timestamptz, now()) ORDER BY priority, scheduled_at, id - LIMIT $2::bigint + LIMIT $3::bigint FOR UPDATE ), jobs_with_rownum AS ( @@ -1388,7 +1397,7 @@ updated_jobs AS ( UPDATE /* TEMPLATE: schema */river_job SET state = job_updates.new_state, - finalized_at = CASE WHEN job_updates.finalized_at_do_update THEN coalesce($1::timestamptz, now()) + finalized_at = CASE WHEN job_updates.finalized_at_do_update THEN coalesce($2::timestamptz, now()) ELSE river_job.finalized_at END, metadata = CASE WHEN job_updates.metadata_do_update THEN river_job.metadata || '{"unique_key_conflict": "scheduler_discarded"}'::jsonb ELSE river_job.metadata END @@ -1406,8 +1415,9 @@ JOIN updated_jobs ON river_job.id = updated_jobs.id ` type JobScheduleParams struct { - Now *time.Time - Max int64 + QueuesIncluded []string + Now *time.Time + Max int64 } type JobScheduleRow struct { @@ -1416,7 +1426,7 @@ type JobScheduleRow struct { } func (q *Queries) JobSchedule(ctx context.Context, db DBTX, arg 
*JobScheduleParams) ([]*JobScheduleRow, error) { - rows, err := db.QueryContext(ctx, jobSchedule, arg.Now, arg.Max) + rows, err := db.QueryContext(ctx, jobSchedule, pq.Array(arg.QueuesIncluded), arg.Now, arg.Max) if err != nil { return nil, err } diff --git a/riverdriver/riverdatabasesql/internal/dbsqlc/river_leader.sql.go b/riverdriver/riverdatabasesql/internal/dbsqlc/river_leader.sql.go index a54d2bd7..16b51927 100644 --- a/riverdriver/riverdatabasesql/internal/dbsqlc/river_leader.sql.go +++ b/riverdriver/riverdatabasesql/internal/dbsqlc/river_leader.sql.go @@ -15,12 +15,14 @@ const leaderAttemptElect = `-- name: LeaderAttemptElect :one INSERT INTO /* TEMPLATE: schema */river_leader ( leader_id, elected_at, - expires_at + expires_at, + name ) VALUES ( $1, coalesce($2::timestamptz, now()), -- @ttl is inserted as as seconds rather than a duration because ` + "`" + `lib/pq` + "`" + ` doesn't support the latter - coalesce($2::timestamptz, now()) + make_interval(secs => $3) + coalesce($2::timestamptz, now()) + make_interval(secs => $3), + $4 ) ON CONFLICT (name) DO NOTHING @@ -31,10 +33,16 @@ type LeaderAttemptElectParams struct { LeaderID string Now *time.Time TTL float64 + Name string } func (q *Queries) LeaderAttemptElect(ctx context.Context, db DBTX, arg *LeaderAttemptElectParams) (*RiverLeader, error) { - row := db.QueryRowContext(ctx, leaderAttemptElect, arg.LeaderID, arg.Now, arg.TTL) + row := db.QueryRowContext(ctx, leaderAttemptElect, + arg.LeaderID, + arg.Now, + arg.TTL, + arg.Name, + ) var i RiverLeader err := row.Scan( &i.ElectedAt, @@ -52,6 +60,7 @@ WHERE elected_at = $3::timestamptz AND expires_at >= coalesce($1::timestamptz, now()) AND leader_id = $4 + AND name = $5 RETURNING elected_at, expires_at, leader_id, name ` @@ -60,6 +69,7 @@ type LeaderAttemptReelectParams struct { TTL float64 ElectedAt time.Time LeaderID string + Name string } func (q *Queries) LeaderAttemptReelect(ctx context.Context, db DBTX, arg *LeaderAttemptReelectParams) (*RiverLeader, 
error) { @@ -68,6 +78,7 @@ func (q *Queries) LeaderAttemptReelect(ctx context.Context, db DBTX, arg *Leader arg.TTL, arg.ElectedAt, arg.LeaderID, + arg.Name, ) var i RiverLeader err := row.Scan( @@ -113,11 +124,13 @@ const leaderInsert = `-- name: LeaderInsert :one INSERT INTO /* TEMPLATE: schema */river_leader( elected_at, expires_at, - leader_id + leader_id, + name ) VALUES ( coalesce($1::timestamptz, coalesce($2::timestamptz, now())), coalesce($3::timestamptz, coalesce($2::timestamptz, now()) + make_interval(secs => $4)), - $5 + $5, + $6 ) RETURNING elected_at, expires_at, leader_id, name ` @@ -127,6 +140,7 @@ type LeaderInsertParams struct { ExpiresAt *time.Time TTL float64 LeaderID string + Name string } func (q *Queries) LeaderInsert(ctx context.Context, db DBTX, arg *LeaderInsertParams) (*RiverLeader, error) { @@ -136,6 +150,7 @@ func (q *Queries) LeaderInsert(ctx context.Context, db DBTX, arg *LeaderInsertPa arg.ExpiresAt, arg.TTL, arg.LeaderID, + arg.Name, ) var i RiverLeader err := row.Scan( diff --git a/riverdriver/riverdatabasesql/internal/dbsqlc/river_queue.sql.go b/riverdriver/riverdatabasesql/internal/dbsqlc/river_queue.sql.go index b33808c8..6533c31f 100644 --- a/riverdriver/riverdatabasesql/internal/dbsqlc/river_queue.sql.go +++ b/riverdriver/riverdatabasesql/internal/dbsqlc/river_queue.sql.go @@ -59,24 +59,36 @@ func (q *Queries) QueueCreateOrSetUpdatedAt(ctx context.Context, db DBTX, arg *Q } const queueDeleteExpired = `-- name: QueueDeleteExpired :many -DELETE FROM /* TEMPLATE: schema */river_queue -WHERE name IN ( - SELECT name - FROM /* TEMPLATE: schema */river_queue - WHERE river_queue.updated_at < $1 - ORDER BY name ASC - LIMIT $2::bigint +WITH deleted_queues AS ( + DELETE FROM /* TEMPLATE: schema */river_queue + WHERE name IN ( + SELECT name + FROM /* TEMPLATE: schema */river_queue + WHERE river_queue.updated_at < $1 + AND ( + $2::text[] IS NULL + OR name = any($2) + ) + ORDER BY name ASC + LIMIT $3::bigint + ) + RETURNING name, created_at, 
metadata, paused_at, updated_at ) -RETURNING name, created_at, metadata, paused_at, updated_at +SELECT name, created_at, metadata, paused_at, updated_at +FROM /* TEMPLATE: schema */river_queue +WHERE name IN (SELECT name FROM deleted_queues) +ORDER BY name ASC ` type QueueDeleteExpiredParams struct { UpdatedAtHorizon time.Time + QueuesIncluded []string Max int64 } +// Uses a CTE only to guarantee return order. func (q *Queries) QueueDeleteExpired(ctx context.Context, db DBTX, arg *QueueDeleteExpiredParams) ([]*RiverQueue, error) { - rows, err := db.QueryContext(ctx, queueDeleteExpired, arg.UpdatedAtHorizon, arg.Max) + rows, err := db.QueryContext(ctx, queueDeleteExpired, arg.UpdatedAtHorizon, pq.Array(arg.QueuesIncluded), arg.Max) if err != nil { return nil, err } diff --git a/riverdriver/riverdatabasesql/migration/main/007_river_leader_non_default_name.down.sql b/riverdriver/riverdatabasesql/migration/main/007_river_leader_non_default_name.down.sql new file mode 100644 index 00000000..70d21044 --- /dev/null +++ b/riverdriver/riverdatabasesql/migration/main/007_river_leader_non_default_name.down.sql @@ -0,0 +1,3 @@ +ALTER TABLE /* TEMPLATE: schema */river_leader + DROP CONSTRAINT name_length, + ADD CONSTRAINT name_length CHECK (name = 'default'); \ No newline at end of file diff --git a/riverdriver/riverdatabasesql/migration/main/007_river_leader_non_default_name.up.sql b/riverdriver/riverdatabasesql/migration/main/007_river_leader_non_default_name.up.sql new file mode 100644 index 00000000..b1721078 --- /dev/null +++ b/riverdriver/riverdatabasesql/migration/main/007_river_leader_non_default_name.up.sql @@ -0,0 +1,3 @@ +ALTER TABLE /* TEMPLATE: schema */river_leader + DROP CONSTRAINT name_length, + ADD CONSTRAINT name_length CHECK (char_length(name) > 0 AND char_length(name) < 128); \ No newline at end of file diff --git a/riverdriver/riverdatabasesql/river_database_sql_driver.go b/riverdriver/riverdatabasesql/river_database_sql_driver.go index d8a2d445..aed728b8 
100644 --- a/riverdriver/riverdatabasesql/river_database_sql_driver.go +++ b/riverdriver/riverdatabasesql/river_database_sql_driver.go @@ -267,7 +267,7 @@ func (e *Executor) JobDeleteBefore(ctx context.Context, params *riverdriver.JobD DiscardedFinalizedAtHorizon: params.DiscardedFinalizedAtHorizon, Max: int64(params.Max), QueuesExcluded: params.QueuesExcluded, - QueuesIncluded: params.QueuesIncluded, + QueuesIncluded: sliceutil.NilIfEmpty(params.QueuesIncluded), }) if err != nil { return 0, interpretError(err) @@ -332,8 +332,9 @@ func (e *Executor) JobGetByKindMany(ctx context.Context, params *riverdriver.Job func (e *Executor) JobGetStuck(ctx context.Context, params *riverdriver.JobGetStuckParams) ([]*rivertype.JobRow, error) { jobs, err := dbsqlc.New().JobGetStuck(schemaTemplateParam(ctx, params.Schema), e.dbtx, &dbsqlc.JobGetStuckParams{ - Max: int32(min(params.Max, math.MaxInt32)), //nolint:gosec - StuckHorizon: params.StuckHorizon, + Max: int32(min(params.Max, math.MaxInt32)), //nolint:gosec + QueuesIncluded: sliceutil.NilIfEmpty(params.QueuesIncluded), + StuckHorizon: params.StuckHorizon, }) if err != nil { return nil, interpretError(err) @@ -594,8 +595,9 @@ func (e *Executor) JobRetry(ctx context.Context, params *riverdriver.JobRetryPar func (e *Executor) JobSchedule(ctx context.Context, params *riverdriver.JobScheduleParams) ([]*riverdriver.JobScheduleResult, error) { scheduleResults, err := dbsqlc.New().JobSchedule(schemaTemplateParam(ctx, params.Schema), e.dbtx, &dbsqlc.JobScheduleParams{ - Max: int64(params.Max), - Now: params.Now, + Max: int64(params.Max), + Now: params.Now, + QueuesIncluded: sliceutil.NilIfEmpty(params.QueuesIncluded), }) if err != nil { return nil, interpretError(err) @@ -719,6 +721,7 @@ func (e *Executor) JobUpdateFull(ctx context.Context, params *riverdriver.JobUpd func (e *Executor) LeaderAttemptElect(ctx context.Context, params *riverdriver.LeaderElectParams) (*riverdriver.Leader, error) { leader, err := 
dbsqlc.New().LeaderAttemptElect(schemaTemplateParam(ctx, params.Schema), e.dbtx, &dbsqlc.LeaderAttemptElectParams{ LeaderID: params.LeaderID, + Name: params.Name, Now: params.Now, TTL: params.TTL.Seconds(), }) @@ -732,6 +735,7 @@ func (e *Executor) LeaderAttemptReelect(ctx context.Context, params *riverdriver leader, err := dbsqlc.New().LeaderAttemptReelect(schemaTemplateParam(ctx, params.Schema), e.dbtx, &dbsqlc.LeaderAttemptReelectParams{ ElectedAt: params.ElectedAt, LeaderID: params.LeaderID, + Name: params.Name, Now: params.Now, TTL: params.TTL.Seconds(), }) @@ -762,6 +766,7 @@ func (e *Executor) LeaderInsert(ctx context.Context, params *riverdriver.LeaderI ElectedAt: params.ElectedAt, ExpiresAt: params.ExpiresAt, LeaderID: params.LeaderID, + Name: params.Name, Now: params.Now, TTL: params.TTL.Seconds(), }) @@ -889,6 +894,7 @@ func (e *Executor) QueueCreateOrSetUpdatedAt(ctx context.Context, params *riverd func (e *Executor) QueueDeleteExpired(ctx context.Context, params *riverdriver.QueueDeleteExpiredParams) ([]string, error) { queues, err := dbsqlc.New().QueueDeleteExpired(schemaTemplateParam(ctx, params.Schema), e.dbtx, &dbsqlc.QueueDeleteExpiredParams{ Max: int64(params.Max), + QueuesIncluded: sliceutil.NilIfEmpty(params.QueuesIncluded), UpdatedAtHorizon: params.UpdatedAtHorizon, }) if err != nil { diff --git a/riverdriver/riverdrivertest/job_delete.go b/riverdriver/riverdrivertest/job_delete.go index 787a5d09..c0bcd3ce 100644 --- a/riverdriver/riverdrivertest/job_delete.go +++ b/riverdriver/riverdrivertest/job_delete.go @@ -306,6 +306,31 @@ func exerciseJobDelete[TTx any](ctx context.Context, t *testing.T, executorWithT _, err = exec.JobGetByID(ctx, &riverdriver.JobGetByIDParams{ID: deletedJob2.ID}) require.ErrorIs(t, err, rivertype.ErrNotFound) }) + + // Verifies that an empty QueuesIncluded slice is treated as nil (no + // filter) rather than as "match no queues". 
+ t.Run("QueuesIncludedEmptySlice", func(t *testing.T) { + t.Parallel() + + exec, _ := setup(ctx, t) + + _ = testfactory.Job(ctx, t, exec, &testfactory.JobOpts{FinalizedAt: &beforeHorizon, State: ptrutil.Ptr(rivertype.JobStateCancelled)}) + _ = testfactory.Job(ctx, t, exec, &testfactory.JobOpts{FinalizedAt: &beforeHorizon, State: ptrutil.Ptr(rivertype.JobStateCompleted)}) + _ = testfactory.Job(ctx, t, exec, &testfactory.JobOpts{FinalizedAt: &beforeHorizon, State: ptrutil.Ptr(rivertype.JobStateDiscarded)}) + + numDeleted, err := exec.JobDeleteBefore(ctx, &riverdriver.JobDeleteBeforeParams{ + CancelledDoDelete: true, + CancelledFinalizedAtHorizon: horizon, + CompletedDoDelete: true, + CompletedFinalizedAtHorizon: horizon, + DiscardedDoDelete: true, + DiscardedFinalizedAtHorizon: horizon, + Max: 1_000, + QueuesIncluded: []string{}, + }) + require.NoError(t, err) + require.Equal(t, 3, numDeleted) + }) }) t.Run("JobDeleteMany", func(t *testing.T) { diff --git a/riverdriver/riverdrivertest/job_read.go b/riverdriver/riverdrivertest/job_read.go index 5649d910..6d1e5c07 100644 --- a/riverdriver/riverdrivertest/job_read.go +++ b/riverdriver/riverdrivertest/job_read.go @@ -511,40 +511,66 @@ func exerciseJobRead[TTx any](ctx context.Context, t *testing.T, executorWithTx t.Run("JobGetStuck", func(t *testing.T) { t.Parallel() - exec, _ := setup(ctx, t) + t.Run("Success", func(t *testing.T) { + t.Parallel() - var ( - horizon = time.Now().UTC() - beforeHorizon = horizon.Add(-1 * time.Minute) - afterHorizon = horizon.Add(1 * time.Minute) - ) + exec, _ := setup(ctx, t) - stuckJob1 := testfactory.Job(ctx, t, exec, &testfactory.JobOpts{AttemptedAt: &beforeHorizon, State: ptrutil.Ptr(rivertype.JobStateRunning)}) - stuckJob2 := testfactory.Job(ctx, t, exec, &testfactory.JobOpts{AttemptedAt: &beforeHorizon, State: ptrutil.Ptr(rivertype.JobStateRunning)}) + var ( + horizon = time.Now().UTC() + beforeHorizon = horizon.Add(-1 * time.Minute) + afterHorizon = horizon.Add(1 * time.Minute) + ) 
- t.Logf("horizon = %s", horizon) - t.Logf("stuckJob1 = %s", stuckJob1.AttemptedAt) - t.Logf("stuckJob2 = %s", stuckJob2.AttemptedAt) + stuckJob1 := testfactory.Job(ctx, t, exec, &testfactory.JobOpts{AttemptedAt: &beforeHorizon, State: ptrutil.Ptr(rivertype.JobStateRunning)}) + stuckJob2 := testfactory.Job(ctx, t, exec, &testfactory.JobOpts{AttemptedAt: &beforeHorizon, State: ptrutil.Ptr(rivertype.JobStateRunning)}) - t.Logf("stuckJob1 full = %s", spew.Sdump(stuckJob1)) + t.Logf("horizon = %s", horizon) + t.Logf("stuckJob1 = %s", stuckJob1.AttemptedAt) + t.Logf("stuckJob2 = %s", stuckJob2.AttemptedAt) - // Not returned because we put a maximum of two. - _ = testfactory.Job(ctx, t, exec, &testfactory.JobOpts{AttemptedAt: &beforeHorizon, State: ptrutil.Ptr(rivertype.JobStateRunning)}) + t.Logf("stuckJob1 full = %s", spew.Sdump(stuckJob1)) - // Not stuck because not in running state. - _ = testfactory.Job(ctx, t, exec, &testfactory.JobOpts{State: ptrutil.Ptr(rivertype.JobStateAvailable)}) + // Not returned because we put a maximum of two. + _ = testfactory.Job(ctx, t, exec, &testfactory.JobOpts{AttemptedAt: &beforeHorizon, State: ptrutil.Ptr(rivertype.JobStateRunning)}) - // Not stuck because after queried horizon. - _ = testfactory.Job(ctx, t, exec, &testfactory.JobOpts{AttemptedAt: &afterHorizon, State: ptrutil.Ptr(rivertype.JobStateRunning)}) + // Not stuck because not in running state. + _ = testfactory.Job(ctx, t, exec, &testfactory.JobOpts{State: ptrutil.Ptr(rivertype.JobStateAvailable)}) + + // Not stuck because after queried horizon. 
+ _ = testfactory.Job(ctx, t, exec, &testfactory.JobOpts{AttemptedAt: &afterHorizon, State: ptrutil.Ptr(rivertype.JobStateRunning)}) - // Max two stuck - stuckJobs, err := exec.JobGetStuck(ctx, &riverdriver.JobGetStuckParams{ - Max: 2, - StuckHorizon: horizon, + // Max two stuck + stuckJobs, err := exec.JobGetStuck(ctx, &riverdriver.JobGetStuckParams{ + Max: 2, + StuckHorizon: horizon, + }) + require.NoError(t, err) + require.Equal(t, []int64{stuckJob1.ID, stuckJob2.ID}, + sliceutil.Map(stuckJobs, func(j *rivertype.JobRow) int64 { return j.ID })) + }) + + // Verifies that an empty QueuesIncluded slice is treated as nil (no + // filter) rather than as "match no queues". + t.Run("QueuesIncludedEmptySlice", func(t *testing.T) { + t.Parallel() + + exec, _ := setup(ctx, t) + + horizon := time.Now().UTC() + beforeHorizon := horizon.Add(-1 * time.Minute) + + stuckJob := testfactory.Job(ctx, t, exec, &testfactory.JobOpts{AttemptedAt: &beforeHorizon, State: ptrutil.Ptr(rivertype.JobStateRunning)}) + + stuckJobs, err := exec.JobGetStuck(ctx, &riverdriver.JobGetStuckParams{ + Max: 100, + QueuesIncluded: []string{}, + StuckHorizon: horizon, + }) + require.NoError(t, err) + require.Equal(t, []int64{stuckJob.ID}, + sliceutil.Map(stuckJobs, func(j *rivertype.JobRow) int64 { return j.ID })) }) - require.NoError(t, err) - require.Equal(t, []int64{stuckJob1.ID, stuckJob2.ID}, - sliceutil.Map(stuckJobs, func(j *rivertype.JobRow) int64 { return j.ID })) }) t.Run("JobKindList", func(t *testing.T) { diff --git a/riverdriver/riverdrivertest/job_update.go b/riverdriver/riverdrivertest/job_update.go index e2d22210..6e81d16d 100644 --- a/riverdriver/riverdrivertest/job_update.go +++ b/riverdriver/riverdrivertest/job_update.go @@ -532,6 +532,30 @@ func exerciseJobUpdate[TTx any](ctx context.Context, t *testing.T, executorWithT require.Equal(t, rivertype.JobStateDiscarded, updatedJob2.State) require.Equal(t, "scheduler_discarded", gjson.GetBytes(updatedJob2.Metadata, 
"unique_key_conflict").String()) }) + + // Verifies that an empty QueuesIncluded slice is treated as nil (no + // filter) rather than as "match no queues". + t.Run("QueuesIncludedEmptySlice", func(t *testing.T) { + t.Parallel() + + exec, _ := setup(ctx, t) + + var ( + horizon = time.Now() + beforeHorizon = horizon.Add(-1 * time.Minute) + ) + + job := testfactory.Job(ctx, t, exec, &testfactory.JobOpts{ScheduledAt: &beforeHorizon, State: ptrutil.Ptr(rivertype.JobStateScheduled)}) + + result, err := exec.JobSchedule(ctx, &riverdriver.JobScheduleParams{ + Max: 100, + Now: &horizon, + QueuesIncluded: []string{}, + }) + require.NoError(t, err) + require.Len(t, result, 1) + require.Equal(t, job.ID, result[0].Job.ID) + }) }) makeErrPayload := func(t *testing.T, now time.Time) []byte { diff --git a/riverdriver/riverdrivertest/leader.go b/riverdriver/riverdrivertest/leader.go index f81904f8..0879ded1 100644 --- a/riverdriver/riverdrivertest/leader.go +++ b/riverdriver/riverdrivertest/leader.go @@ -51,6 +51,7 @@ func exerciseLeader[TTx any](ctx context.Context, t *testing.T, executorWithTx f leader, err := exec.LeaderAttemptElect(ctx, &riverdriver.LeaderElectParams{ LeaderID: testClientID, + Name: "default", Now: &now, TTL: leaderTTL, }) @@ -77,6 +78,7 @@ func exerciseLeader[TTx any](ctx context.Context, t *testing.T, executorWithTx f leaderAttempt, err := exec.LeaderAttemptElect(ctx, &riverdriver.LeaderElectParams{ LeaderID: "different-client-id", + Name: "default", TTL: leaderTTL, }) require.ErrorIs(t, err, rivertype.ErrNotFound) @@ -97,6 +99,7 @@ func exerciseLeader[TTx any](ctx context.Context, t *testing.T, executorWithTx f leader, err := exec.LeaderAttemptElect(ctx, &riverdriver.LeaderElectParams{ LeaderID: testClientID, + Name: "default", TTL: leaderTTL, }) require.NoError(t, err) @@ -124,6 +127,7 @@ func exerciseLeader[TTx any](ctx context.Context, t *testing.T, executorWithTx f updatedLeader, err := exec.LeaderAttemptReelect(ctx, &riverdriver.LeaderReelectParams{ 
ElectedAt: leader.ElectedAt, LeaderID: testClientID, + Name: "default", TTL: leaderTTL, }) require.ErrorIs(t, err, rivertype.ErrNotFound) @@ -150,6 +154,7 @@ func exerciseLeader[TTx any](ctx context.Context, t *testing.T, executorWithTx f updatedLeader, err := exec.LeaderAttemptReelect(ctx, &riverdriver.LeaderReelectParams{ ElectedAt: leader.ElectedAt, LeaderID: testClientID, + Name: "default", TTL: 30 * time.Second, }) require.NoError(t, err) @@ -179,6 +184,7 @@ func exerciseLeader[TTx any](ctx context.Context, t *testing.T, executorWithTx f updatedLeader, err := exec.LeaderAttemptReelect(ctx, &riverdriver.LeaderReelectParams{ ElectedAt: leader.ElectedAt, LeaderID: testClientID, + Name: "default", TTL: 30 * time.Second, }) require.ErrorIs(t, err, rivertype.ErrNotFound) @@ -202,6 +208,7 @@ func exerciseLeader[TTx any](ctx context.Context, t *testing.T, executorWithTx f updatedLeader, err := exec.LeaderAttemptReelect(ctx, &riverdriver.LeaderReelectParams{ ElectedAt: leader.ElectedAt.Add(-time.Second), LeaderID: testClientID, + Name: "default", TTL: 30 * time.Second, }) require.ErrorIs(t, err, rivertype.ErrNotFound) @@ -224,6 +231,7 @@ func exerciseLeader[TTx any](ctx context.Context, t *testing.T, executorWithTx f updatedLeader, err := exec.LeaderAttemptReelect(ctx, &riverdriver.LeaderReelectParams{ ElectedAt: leader.ElectedAt, LeaderID: testClientID, + Name: "default", TTL: leaderTTL, }) require.NoError(t, err) @@ -304,6 +312,7 @@ func exerciseLeader[TTx any](ctx context.Context, t *testing.T, executorWithTx f ElectedAt: &electedAt, ExpiresAt: &expiresAt, LeaderID: testClientID, + Name: "default", TTL: leaderTTL, }) require.NoError(t, err) @@ -319,6 +328,7 @@ func exerciseLeader[TTx any](ctx context.Context, t *testing.T, executorWithTx f leader, err := exec.LeaderInsert(ctx, &riverdriver.LeaderInsertParams{ LeaderID: testClientID, + Name: "default", Now: &now, TTL: leaderTTL, }) diff --git a/riverdriver/riverdrivertest/queue.go 
b/riverdriver/riverdrivertest/queue.go index 5c1bad68..eeafd9ef 100644 --- a/riverdriver/riverdrivertest/queue.go +++ b/riverdriver/riverdrivertest/queue.go @@ -115,27 +115,51 @@ func exerciseQueue[TTx any](ctx context.Context, t *testing.T, executorWithTx fu t.Run("QueueDeleteExpired", func(t *testing.T) { t.Parallel() - exec, _ := setup(ctx, t) + t.Run("Success", func(t *testing.T) { + t.Parallel() - now := time.Now() - _ = testfactory.Queue(ctx, t, exec, &testfactory.QueueOpts{UpdatedAt: ptrutil.Ptr(now)}) - queue2 := testfactory.Queue(ctx, t, exec, &testfactory.QueueOpts{UpdatedAt: ptrutil.Ptr(now.Add(-25 * time.Hour))}) - queue3 := testfactory.Queue(ctx, t, exec, &testfactory.QueueOpts{UpdatedAt: ptrutil.Ptr(now.Add(-26 * time.Hour))}) - queue4 := testfactory.Queue(ctx, t, exec, &testfactory.QueueOpts{UpdatedAt: ptrutil.Ptr(now.Add(-48 * time.Hour))}) - _ = testfactory.Queue(ctx, t, exec, &testfactory.QueueOpts{UpdatedAt: ptrutil.Ptr(now.Add(-23 * time.Hour))}) + exec, _ := setup(ctx, t) - horizon := now.Add(-24 * time.Hour) - deletedQueueNames, err := exec.QueueDeleteExpired(ctx, &riverdriver.QueueDeleteExpiredParams{Max: 2, UpdatedAtHorizon: horizon}) - require.NoError(t, err) + now := time.Now() + _ = testfactory.Queue(ctx, t, exec, &testfactory.QueueOpts{UpdatedAt: ptrutil.Ptr(now)}) + queue2 := testfactory.Queue(ctx, t, exec, &testfactory.QueueOpts{UpdatedAt: ptrutil.Ptr(now.Add(-25 * time.Hour))}) + queue3 := testfactory.Queue(ctx, t, exec, &testfactory.QueueOpts{UpdatedAt: ptrutil.Ptr(now.Add(-26 * time.Hour))}) + queue4 := testfactory.Queue(ctx, t, exec, &testfactory.QueueOpts{UpdatedAt: ptrutil.Ptr(now.Add(-48 * time.Hour))}) + _ = testfactory.Queue(ctx, t, exec, &testfactory.QueueOpts{UpdatedAt: ptrutil.Ptr(now.Add(-23 * time.Hour))}) + + horizon := now.Add(-24 * time.Hour) + deletedQueueNames, err := exec.QueueDeleteExpired(ctx, &riverdriver.QueueDeleteExpiredParams{Max: 2, UpdatedAtHorizon: horizon}) + require.NoError(t, err) - // queue2 and queue3 
should be deleted, with queue4 being skipped due to max of 2: - require.Equal(t, []string{queue2.Name, queue3.Name}, deletedQueueNames) + // queue2 and queue3 should be deleted, with queue4 being skipped due to max of 2: + require.Equal(t, []string{queue2.Name, queue3.Name}, deletedQueueNames) - // Try again, make sure queue4 gets deleted this time: - deletedQueueNames, err = exec.QueueDeleteExpired(ctx, &riverdriver.QueueDeleteExpiredParams{Max: 2, UpdatedAtHorizon: horizon}) - require.NoError(t, err) + // Try again, make sure queue4 gets deleted this time: + deletedQueueNames, err = exec.QueueDeleteExpired(ctx, &riverdriver.QueueDeleteExpiredParams{Max: 2, UpdatedAtHorizon: horizon}) + require.NoError(t, err) + + require.Equal(t, []string{queue4.Name}, deletedQueueNames) + }) - require.Equal(t, []string{queue4.Name}, deletedQueueNames) + // Verifies that an empty QueuesIncluded slice is treated as nil (no + // filter) rather than as "match no queues". + t.Run("QueuesIncludedEmptySlice", func(t *testing.T) { + t.Parallel() + + exec, _ := setup(ctx, t) + + now := time.Now() + queue := testfactory.Queue(ctx, t, exec, &testfactory.QueueOpts{UpdatedAt: ptrutil.Ptr(now.Add(-48 * time.Hour))}) + + horizon := now.Add(-24 * time.Hour) + deletedQueueNames, err := exec.QueueDeleteExpired(ctx, &riverdriver.QueueDeleteExpiredParams{ + Max: 100, + QueuesIncluded: []string{}, + UpdatedAtHorizon: horizon, + }) + require.NoError(t, err) + require.Equal(t, []string{queue.Name}, deletedQueueNames) + }) }) t.Run("QueueGet", func(t *testing.T) { diff --git a/riverdriver/riverpgxv5/internal/dbsqlc/river_job.sql b/riverdriver/riverpgxv5/internal/dbsqlc/river_job.sql index dde85de4..89ad3f6e 100644 --- a/riverdriver/riverpgxv5/internal/dbsqlc/river_job.sql +++ b/riverdriver/riverpgxv5/internal/dbsqlc/river_job.sql @@ -259,6 +259,10 @@ SELECT * FROM /* TEMPLATE: schema */river_job WHERE state = 'running' AND attempted_at < @stuck_horizon::timestamptz + AND ( + @queues_included::text[] IS 
NULL + OR queue = any(@queues_included) + ) ORDER BY id LIMIT @max; @@ -548,6 +552,10 @@ WITH jobs_to_schedule AS ( state IN ('retryable', 'scheduled') AND priority >= 0 AND queue IS NOT NULL + AND ( + @queues_included::text[] IS NULL + OR queue = any(@queues_included) + ) AND scheduled_at <= coalesce(sqlc.narg('now')::timestamptz, now()) ORDER BY priority, diff --git a/riverdriver/riverpgxv5/internal/dbsqlc/river_job.sql.go b/riverdriver/riverpgxv5/internal/dbsqlc/river_job.sql.go index 0be29561..8cf3b288 100644 --- a/riverdriver/riverpgxv5/internal/dbsqlc/river_job.sql.go +++ b/riverdriver/riverpgxv5/internal/dbsqlc/river_job.sql.go @@ -592,17 +592,22 @@ SELECT id, args, attempt, attempted_at, attempted_by, created_at, errors, finali FROM /* TEMPLATE: schema */river_job WHERE state = 'running' AND attempted_at < $1::timestamptz + AND ( + $2::text[] IS NULL + OR queue = any($2) + ) ORDER BY id -LIMIT $2 +LIMIT $3 ` type JobGetStuckParams struct { - StuckHorizon time.Time - Max int32 + StuckHorizon time.Time + QueuesIncluded []string + Max int32 } func (q *Queries) JobGetStuck(ctx context.Context, db DBTX, arg *JobGetStuckParams) ([]*RiverJob, error) { - rows, err := db.Query(ctx, jobGetStuck, arg.StuckHorizon, arg.Max) + rows, err := db.Query(ctx, jobGetStuck, arg.StuckHorizon, arg.QueuesIncluded, arg.Max) if err != nil { return nil, err } @@ -1303,12 +1308,16 @@ WITH jobs_to_schedule AS ( state IN ('retryable', 'scheduled') AND priority >= 0 AND queue IS NOT NULL - AND scheduled_at <= coalesce($1::timestamptz, now()) + AND ( + $1::text[] IS NULL + OR queue = any($1) + ) + AND scheduled_at <= coalesce($2::timestamptz, now()) ORDER BY priority, scheduled_at, id - LIMIT $2::bigint + LIMIT $3::bigint FOR UPDATE ), jobs_with_rownum AS ( @@ -1355,7 +1364,7 @@ updated_jobs AS ( UPDATE /* TEMPLATE: schema */river_job SET state = job_updates.new_state, - finalized_at = CASE WHEN job_updates.finalized_at_do_update THEN coalesce($1::timestamptz, now()) + finalized_at = CASE 
WHEN job_updates.finalized_at_do_update THEN coalesce($2::timestamptz, now()) ELSE river_job.finalized_at END, metadata = CASE WHEN job_updates.metadata_do_update THEN river_job.metadata || '{"unique_key_conflict": "scheduler_discarded"}'::jsonb ELSE river_job.metadata END @@ -1373,8 +1382,9 @@ JOIN updated_jobs ON river_job.id = updated_jobs.id ` type JobScheduleParams struct { - Now *time.Time - Max int64 + QueuesIncluded []string + Now *time.Time + Max int64 } type JobScheduleRow struct { @@ -1383,7 +1393,7 @@ type JobScheduleRow struct { } func (q *Queries) JobSchedule(ctx context.Context, db DBTX, arg *JobScheduleParams) ([]*JobScheduleRow, error) { - rows, err := db.Query(ctx, jobSchedule, arg.Now, arg.Max) + rows, err := db.Query(ctx, jobSchedule, arg.QueuesIncluded, arg.Now, arg.Max) if err != nil { return nil, err } diff --git a/riverdriver/riverpgxv5/internal/dbsqlc/river_leader.sql b/riverdriver/riverpgxv5/internal/dbsqlc/river_leader.sql index cea2195f..05e0d638 100644 --- a/riverdriver/riverpgxv5/internal/dbsqlc/river_leader.sql +++ b/riverdriver/riverpgxv5/internal/dbsqlc/river_leader.sql @@ -2,8 +2,11 @@ CREATE UNLOGGED TABLE river_leader( elected_at timestamptz NOT NULL, expires_at timestamptz NOT NULL, leader_id text NOT NULL, - name text PRIMARY KEY DEFAULT 'default' CHECK (name = 'default'), - CONSTRAINT name_length CHECK (name = 'default'), + + -- this would be more aptly called "domain", but left as is for a less + -- invasive migration change + name text PRIMARY KEY DEFAULT 'default' CHECK (char_length(name) > 0 AND char_length(name) < 128), + CONSTRAINT leader_id_length CHECK (char_length(leader_id) > 0 AND char_length(leader_id) < 128) ); @@ -11,12 +14,14 @@ CREATE UNLOGGED TABLE river_leader( INSERT INTO /* TEMPLATE: schema */river_leader ( leader_id, elected_at, - expires_at + expires_at, + name ) VALUES ( @leader_id, coalesce(sqlc.narg('now')::timestamptz, now()), -- @ttl is inserted as as seconds rather than a duration because `lib/pq` 
doesn't support the latter - coalesce(sqlc.narg('now')::timestamptz, now()) + make_interval(secs => @ttl) + coalesce(sqlc.narg('now')::timestamptz, now()) + make_interval(secs => @ttl), + @name ) ON CONFLICT (name) DO NOTHING @@ -29,6 +34,7 @@ WHERE elected_at = @elected_at::timestamptz AND expires_at >= coalesce(sqlc.narg('now')::timestamptz, now()) AND leader_id = @leader_id + AND name = @name RETURNING *; -- name: LeaderDeleteExpired :execrows @@ -43,11 +49,13 @@ FROM /* TEMPLATE: schema */river_leader; INSERT INTO /* TEMPLATE: schema */river_leader( elected_at, expires_at, - leader_id + leader_id, + name ) VALUES ( coalesce(sqlc.narg('elected_at')::timestamptz, coalesce(sqlc.narg('now')::timestamptz, now())), coalesce(sqlc.narg('expires_at')::timestamptz, coalesce(sqlc.narg('now')::timestamptz, now()) + make_interval(secs => @ttl)), - @leader_id + @leader_id, + @name ) RETURNING *; -- name: LeaderResign :execrows diff --git a/riverdriver/riverpgxv5/internal/dbsqlc/river_leader.sql.go b/riverdriver/riverpgxv5/internal/dbsqlc/river_leader.sql.go index 1976cf4d..8e787d91 100644 --- a/riverdriver/riverpgxv5/internal/dbsqlc/river_leader.sql.go +++ b/riverdriver/riverpgxv5/internal/dbsqlc/river_leader.sql.go @@ -16,12 +16,14 @@ const leaderAttemptElect = `-- name: LeaderAttemptElect :one INSERT INTO /* TEMPLATE: schema */river_leader ( leader_id, elected_at, - expires_at + expires_at, + name ) VALUES ( $1, coalesce($2::timestamptz, now()), -- @ttl is inserted as as seconds rather than a duration because ` + "`" + `lib/pq` + "`" + ` doesn't support the latter - coalesce($2::timestamptz, now()) + make_interval(secs => $3) + coalesce($2::timestamptz, now()) + make_interval(secs => $3), + $4 ) ON CONFLICT (name) DO NOTHING @@ -32,10 +34,16 @@ type LeaderAttemptElectParams struct { LeaderID string Now *time.Time TTL float64 + Name string } func (q *Queries) LeaderAttemptElect(ctx context.Context, db DBTX, arg *LeaderAttemptElectParams) (*RiverLeader, error) { - row := 
db.QueryRow(ctx, leaderAttemptElect, arg.LeaderID, arg.Now, arg.TTL) + row := db.QueryRow(ctx, leaderAttemptElect, + arg.LeaderID, + arg.Now, + arg.TTL, + arg.Name, + ) var i RiverLeader err := row.Scan( &i.ElectedAt, @@ -53,6 +61,7 @@ WHERE elected_at = $3::timestamptz AND expires_at >= coalesce($1::timestamptz, now()) AND leader_id = $4 + AND name = $5 RETURNING elected_at, expires_at, leader_id, name ` @@ -61,6 +70,7 @@ type LeaderAttemptReelectParams struct { TTL float64 ElectedAt time.Time LeaderID string + Name string } func (q *Queries) LeaderAttemptReelect(ctx context.Context, db DBTX, arg *LeaderAttemptReelectParams) (*RiverLeader, error) { @@ -69,6 +79,7 @@ func (q *Queries) LeaderAttemptReelect(ctx context.Context, db DBTX, arg *Leader arg.TTL, arg.ElectedAt, arg.LeaderID, + arg.Name, ) var i RiverLeader err := row.Scan( @@ -114,11 +125,13 @@ const leaderInsert = `-- name: LeaderInsert :one INSERT INTO /* TEMPLATE: schema */river_leader( elected_at, expires_at, - leader_id + leader_id, + name ) VALUES ( coalesce($1::timestamptz, coalesce($2::timestamptz, now())), coalesce($3::timestamptz, coalesce($2::timestamptz, now()) + make_interval(secs => $4)), - $5 + $5, + $6 ) RETURNING elected_at, expires_at, leader_id, name ` @@ -128,6 +141,7 @@ type LeaderInsertParams struct { ExpiresAt *time.Time TTL float64 LeaderID string + Name string } func (q *Queries) LeaderInsert(ctx context.Context, db DBTX, arg *LeaderInsertParams) (*RiverLeader, error) { @@ -137,6 +151,7 @@ func (q *Queries) LeaderInsert(ctx context.Context, db DBTX, arg *LeaderInsertPa arg.ExpiresAt, arg.TTL, arg.LeaderID, + arg.Name, ) var i RiverLeader err := row.Scan( diff --git a/riverdriver/riverpgxv5/internal/dbsqlc/river_queue.sql b/riverdriver/riverpgxv5/internal/dbsqlc/river_queue.sql index 6a114720..f810a676 100644 --- a/riverdriver/riverpgxv5/internal/dbsqlc/river_queue.sql +++ b/riverdriver/riverpgxv5/internal/dbsqlc/river_queue.sql @@ -25,15 +25,26 @@ SET RETURNING *; -- name: 
QueueDeleteExpired :many -DELETE FROM /* TEMPLATE: schema */river_queue -WHERE name IN ( - SELECT name - FROM /* TEMPLATE: schema */river_queue - WHERE river_queue.updated_at < @updated_at_horizon - ORDER BY name ASC - LIMIT @max::bigint +WITH deleted_queues AS ( + DELETE FROM /* TEMPLATE: schema */river_queue + WHERE name IN ( + SELECT name + FROM /* TEMPLATE: schema */river_queue + WHERE river_queue.updated_at < @updated_at_horizon + AND ( + @queues_included::text[] IS NULL + OR name = any(@queues_included) + ) + ORDER BY name ASC + LIMIT @max::bigint + ) + RETURNING * ) -RETURNING *; +-- Uses a CTE only to guarantee return order. +SELECT * +FROM /* TEMPLATE: schema */river_queue +WHERE name IN (SELECT name FROM deleted_queues) +ORDER BY name ASC; -- name: QueueGet :one SELECT * diff --git a/riverdriver/riverpgxv5/internal/dbsqlc/river_queue.sql.go b/riverdriver/riverpgxv5/internal/dbsqlc/river_queue.sql.go index 0988828b..8df24f5d 100644 --- a/riverdriver/riverpgxv5/internal/dbsqlc/river_queue.sql.go +++ b/riverdriver/riverpgxv5/internal/dbsqlc/river_queue.sql.go @@ -57,24 +57,36 @@ func (q *Queries) QueueCreateOrSetUpdatedAt(ctx context.Context, db DBTX, arg *Q } const queueDeleteExpired = `-- name: QueueDeleteExpired :many -DELETE FROM /* TEMPLATE: schema */river_queue -WHERE name IN ( - SELECT name - FROM /* TEMPLATE: schema */river_queue - WHERE river_queue.updated_at < $1 - ORDER BY name ASC - LIMIT $2::bigint +WITH deleted_queues AS ( + DELETE FROM /* TEMPLATE: schema */river_queue + WHERE name IN ( + SELECT name + FROM /* TEMPLATE: schema */river_queue + WHERE river_queue.updated_at < $1 + AND ( + $2::text[] IS NULL + OR name = any($2) + ) + ORDER BY name ASC + LIMIT $3::bigint + ) + RETURNING name, created_at, metadata, paused_at, updated_at ) -RETURNING name, created_at, metadata, paused_at, updated_at +SELECT name, created_at, metadata, paused_at, updated_at +FROM /* TEMPLATE: schema */river_queue +WHERE name IN (SELECT name FROM deleted_queues) +ORDER 
BY name ASC ` type QueueDeleteExpiredParams struct { UpdatedAtHorizon time.Time + QueuesIncluded []string Max int64 } +// Uses a CTE only to guarantee return order. func (q *Queries) QueueDeleteExpired(ctx context.Context, db DBTX, arg *QueueDeleteExpiredParams) ([]*RiverQueue, error) { - rows, err := db.Query(ctx, queueDeleteExpired, arg.UpdatedAtHorizon, arg.Max) + rows, err := db.Query(ctx, queueDeleteExpired, arg.UpdatedAtHorizon, arg.QueuesIncluded, arg.Max) if err != nil { return nil, err } diff --git a/riverdriver/riverpgxv5/migration/main/007_river_leader_non_default_name.down.sql b/riverdriver/riverpgxv5/migration/main/007_river_leader_non_default_name.down.sql new file mode 100644 index 00000000..70d21044 --- /dev/null +++ b/riverdriver/riverpgxv5/migration/main/007_river_leader_non_default_name.down.sql @@ -0,0 +1,3 @@ +ALTER TABLE /* TEMPLATE: schema */river_leader + DROP CONSTRAINT name_length, + ADD CONSTRAINT name_length CHECK (name = 'default'); \ No newline at end of file diff --git a/riverdriver/riverpgxv5/migration/main/007_river_leader_non_default_name.up.sql b/riverdriver/riverpgxv5/migration/main/007_river_leader_non_default_name.up.sql new file mode 100644 index 00000000..b1721078 --- /dev/null +++ b/riverdriver/riverpgxv5/migration/main/007_river_leader_non_default_name.up.sql @@ -0,0 +1,3 @@ +ALTER TABLE /* TEMPLATE: schema */river_leader + DROP CONSTRAINT name_length, + ADD CONSTRAINT name_length CHECK (char_length(name) > 0 AND char_length(name) < 128); \ No newline at end of file diff --git a/riverdriver/riverpgxv5/river_pgx_v5_driver.go b/riverdriver/riverpgxv5/river_pgx_v5_driver.go index e0201a0e..d71a036e 100644 --- a/riverdriver/riverpgxv5/river_pgx_v5_driver.go +++ b/riverdriver/riverpgxv5/river_pgx_v5_driver.go @@ -275,7 +275,7 @@ func (e *Executor) JobDeleteBefore(ctx context.Context, params *riverdriver.JobD DiscardedFinalizedAtHorizon: params.DiscardedFinalizedAtHorizon, Max: int64(params.Max), QueuesExcluded: 
params.QueuesExcluded, - QueuesIncluded: params.QueuesIncluded, + QueuesIncluded: sliceutil.NilIfEmpty(params.QueuesIncluded), }) if err != nil { return 0, interpretError(err) @@ -336,8 +336,9 @@ func (e *Executor) JobGetByKindMany(ctx context.Context, params *riverdriver.Job func (e *Executor) JobGetStuck(ctx context.Context, params *riverdriver.JobGetStuckParams) ([]*rivertype.JobRow, error) { jobs, err := dbsqlc.New().JobGetStuck(schemaTemplateParam(ctx, params.Schema), e.dbtx, &dbsqlc.JobGetStuckParams{ - Max: int32(min(params.Max, math.MaxInt32)), //nolint:gosec - StuckHorizon: params.StuckHorizon, + Max: int32(min(params.Max, math.MaxInt32)), //nolint:gosec + QueuesIncluded: sliceutil.NilIfEmpty(params.QueuesIncluded), + StuckHorizon: params.StuckHorizon, }) if err != nil { return nil, interpretError(err) @@ -589,8 +590,9 @@ func (e *Executor) JobRetry(ctx context.Context, params *riverdriver.JobRetryPar func (e *Executor) JobSchedule(ctx context.Context, params *riverdriver.JobScheduleParams) ([]*riverdriver.JobScheduleResult, error) { scheduleResults, err := dbsqlc.New().JobSchedule(schemaTemplateParam(ctx, params.Schema), e.dbtx, &dbsqlc.JobScheduleParams{ - Max: int64(params.Max), - Now: params.Now, + Max: int64(params.Max), + Now: params.Now, + QueuesIncluded: sliceutil.NilIfEmpty(params.QueuesIncluded), }) if err != nil { return nil, interpretError(err) @@ -704,6 +706,7 @@ func (e *Executor) JobUpdateFull(ctx context.Context, params *riverdriver.JobUpd func (e *Executor) LeaderAttemptElect(ctx context.Context, params *riverdriver.LeaderElectParams) (*riverdriver.Leader, error) { leader, err := dbsqlc.New().LeaderAttemptElect(schemaTemplateParam(ctx, params.Schema), e.dbtx, &dbsqlc.LeaderAttemptElectParams{ LeaderID: params.LeaderID, + Name: params.Name, Now: params.Now, TTL: params.TTL.Seconds(), }) @@ -717,6 +720,7 @@ func (e *Executor) LeaderAttemptReelect(ctx context.Context, params *riverdriver leader, err := 
dbsqlc.New().LeaderAttemptReelect(schemaTemplateParam(ctx, params.Schema), e.dbtx, &dbsqlc.LeaderAttemptReelectParams{ ElectedAt: params.ElectedAt, LeaderID: params.LeaderID, + Name: params.Name, Now: params.Now, TTL: params.TTL.Seconds(), }) @@ -747,6 +751,7 @@ func (e *Executor) LeaderInsert(ctx context.Context, params *riverdriver.LeaderI ElectedAt: params.ElectedAt, ExpiresAt: params.ExpiresAt, LeaderID: params.LeaderID, + Name: params.Name, Now: params.Now, TTL: params.TTL.Seconds(), }) @@ -874,11 +879,13 @@ func (e *Executor) QueueCreateOrSetUpdatedAt(ctx context.Context, params *riverd func (e *Executor) QueueDeleteExpired(ctx context.Context, params *riverdriver.QueueDeleteExpiredParams) ([]string, error) { queues, err := dbsqlc.New().QueueDeleteExpired(schemaTemplateParam(ctx, params.Schema), e.dbtx, &dbsqlc.QueueDeleteExpiredParams{ Max: int64(params.Max), + QueuesIncluded: sliceutil.NilIfEmpty(params.QueuesIncluded), UpdatedAtHorizon: params.UpdatedAtHorizon, }) if err != nil { return nil, interpretError(err) } + queueNames := make([]string, len(queues)) for i, q := range queues { queueNames[i] = q.Name diff --git a/riverdriver/riversqlite/internal/dbsqlc/river_job.sql b/riverdriver/riversqlite/internal/dbsqlc/river_job.sql index dfb72dc3..a3091867 100644 --- a/riverdriver/riversqlite/internal/dbsqlc/river_job.sql +++ b/riverdriver/riversqlite/internal/dbsqlc/river_job.sql @@ -92,34 +92,19 @@ RETURNING *; -- name: JobDeleteBefore :execresult DELETE FROM /* TEMPLATE: schema */river_job -WHERE - id IN ( - SELECT id - FROM /* TEMPLATE: schema */river_job - WHERE +WHERE id IN ( + SELECT id + FROM /* TEMPLATE: schema */river_job + WHERE ( (state = 'cancelled' AND finalized_at < cast(@cancelled_finalized_at_horizon AS text)) OR (state = 'completed' AND finalized_at < cast(@completed_finalized_at_horizon AS text)) OR (state = 'discarded' AND finalized_at < cast(@discarded_finalized_at_horizon AS text)) - ORDER BY id - LIMIT @max - ) - -- This is really awful, 
but unless the `sqlc.slice` appears as the very - -- last parameter in the query things will fail if it includes more than one - -- element. The sqlc SQLite driver uses position-based placeholders (?1) for - -- most parameters, but unnamed ones with `sqlc.slice` (?), and when - -- positional parameters follow unnamed parameters great confusion is the - -- result. Making sure `sqlc.slice` is last is the only workaround I could - -- find, but it stops working if there are multiple clauses that need a - -- positional placeholder plus `sqlc.slice` like this one (the Postgres - -- driver supports a `queues_included` parameter that I couldn't support - -- here). The non-workaround version is (unfortunately) to never, ever use - -- the sqlc driver for SQLite -- it's not a little buggy, it's off the - -- charts buggy, and there's little interest from the maintainers in fixing - -- any of it. We already started using it though, so plough on. - AND ( - cast(@queues_excluded_empty AS boolean) - OR river_job.queue NOT IN (sqlc.slice('queues_excluded')) - ); + ) + AND (/* TEMPLATE_BEGIN: queues_excluded_clause */ true /* TEMPLATE_END */) + AND (/* TEMPLATE_BEGIN: queues_included_clause */ true /* TEMPLATE_END */) + ORDER BY id + LIMIT @max +); -- name: JobDeleteMany :many DELETE FROM /* TEMPLATE: schema */river_job @@ -186,6 +171,7 @@ SELECT * FROM /* TEMPLATE: schema */river_job WHERE state = 'running' AND attempted_at < cast(@stuck_horizon AS text) + AND (/* TEMPLATE_BEGIN: queues_included_clause */ true /* TEMPLATE_END */) ORDER BY id LIMIT @max; @@ -411,6 +397,7 @@ FROM /* TEMPLATE: schema */river_job WHERE state IN ('retryable', 'scheduled') AND scheduled_at <= coalesce(cast(sqlc.narg('now') AS text), datetime('now', 'subsec')) + AND (/* TEMPLATE_BEGIN: queues_included_clause */ true /* TEMPLATE_END */) ORDER BY priority, scheduled_at, diff --git a/riverdriver/riversqlite/internal/dbsqlc/river_job.sql.go b/riverdriver/riversqlite/internal/dbsqlc/river_job.sql.go index 
1963523d..938f9a94 100644 --- a/riverdriver/riversqlite/internal/dbsqlc/river_job.sql.go +++ b/riverdriver/riversqlite/internal/dbsqlc/river_job.sql.go @@ -214,34 +214,19 @@ func (q *Queries) JobDelete(ctx context.Context, db DBTX, id int64) (*RiverJob, const jobDeleteBefore = `-- name: JobDeleteBefore :execresult DELETE FROM /* TEMPLATE: schema */river_job -WHERE - id IN ( - SELECT id - FROM /* TEMPLATE: schema */river_job - WHERE +WHERE id IN ( + SELECT id + FROM /* TEMPLATE: schema */river_job + WHERE ( (state = 'cancelled' AND finalized_at < cast(?1 AS text)) OR (state = 'completed' AND finalized_at < cast(?2 AS text)) OR (state = 'discarded' AND finalized_at < cast(?3 AS text)) - ORDER BY id - LIMIT ?4 - ) - -- This is really awful, but unless the ` + "`" + `sqlc.slice` + "`" + ` appears as the very - -- last parameter in the query things will fail if it includes more than one - -- element. The sqlc SQLite driver uses position-based placeholders (?1) for - -- most parameters, but unnamed ones with ` + "`" + `sqlc.slice` + "`" + ` (?), and when - -- positional parameters follow unnamed parameters great confusion is the - -- result. Making sure ` + "`" + `sqlc.slice` + "`" + ` is last is the only workaround I could - -- find, but it stops working if there are multiple clauses that need a - -- positional placeholder plus ` + "`" + `sqlc.slice` + "`" + ` like this one (the Postgres - -- driver supports a ` + "`" + `queues_included` + "`" + ` parameter that I couldn't support - -- here). The non-workaround version is (unfortunately) to never, ever use - -- the sqlc driver for SQLite -- it's not a little buggy, it's off the - -- charts buggy, and there's little interest from the maintainers in fixing - -- any of it. We already started using it though, so plough on. - AND ( - cast(?5 AS boolean) - OR river_job.queue NOT IN (/*SLICE:queues_excluded*/?) 
- ) + ) + AND (/* TEMPLATE_BEGIN: queues_excluded_clause */ true /* TEMPLATE_END */) + AND (/* TEMPLATE_BEGIN: queues_included_clause */ true /* TEMPLATE_END */) + ORDER BY id + LIMIT ?4 +) ` type JobDeleteBeforeParams struct { @@ -249,27 +234,15 @@ type JobDeleteBeforeParams struct { CompletedFinalizedAtHorizon string DiscardedFinalizedAtHorizon string Max int64 - QueuesExcludedEmpty bool - QueuesExcluded []string } func (q *Queries) JobDeleteBefore(ctx context.Context, db DBTX, arg *JobDeleteBeforeParams) (sql.Result, error) { - query := jobDeleteBefore - var queryParams []interface{} - queryParams = append(queryParams, arg.CancelledFinalizedAtHorizon) - queryParams = append(queryParams, arg.CompletedFinalizedAtHorizon) - queryParams = append(queryParams, arg.DiscardedFinalizedAtHorizon) - queryParams = append(queryParams, arg.Max) - queryParams = append(queryParams, arg.QueuesExcludedEmpty) - if len(arg.QueuesExcluded) > 0 { - for _, v := range arg.QueuesExcluded { - queryParams = append(queryParams, v) - } - query = strings.Replace(query, "/*SLICE:queues_excluded*/?", strings.Repeat(",?", len(arg.QueuesExcluded))[1:], 1) - } else { - query = strings.Replace(query, "/*SLICE:queues_excluded*/?", "NULL", 1) - } - return db.ExecContext(ctx, query, queryParams...) 
+ return db.ExecContext(ctx, jobDeleteBefore, + arg.CancelledFinalizedAtHorizon, + arg.CompletedFinalizedAtHorizon, + arg.DiscardedFinalizedAtHorizon, + arg.Max, + ) } const jobDeleteMany = `-- name: JobDeleteMany :many @@ -562,6 +535,7 @@ SELECT id, args, attempt, attempted_at, attempted_by, created_at, errors, finali FROM /* TEMPLATE: schema */river_job WHERE state = 'running' AND attempted_at < cast(?1 AS text) + AND (/* TEMPLATE_BEGIN: queues_included_clause */ true /* TEMPLATE_END */) ORDER BY id LIMIT ?2 ` @@ -1183,6 +1157,7 @@ FROM /* TEMPLATE: schema */river_job WHERE state IN ('retryable', 'scheduled') AND scheduled_at <= coalesce(cast(?1 AS text), datetime('now', 'subsec')) + AND (/* TEMPLATE_BEGIN: queues_included_clause */ true /* TEMPLATE_END */) ORDER BY priority, scheduled_at, diff --git a/riverdriver/riversqlite/internal/dbsqlc/river_leader.sql b/riverdriver/riversqlite/internal/dbsqlc/river_leader.sql index 398eadf9..a61552dd 100644 --- a/riverdriver/riversqlite/internal/dbsqlc/river_leader.sql +++ b/riverdriver/riversqlite/internal/dbsqlc/river_leader.sql @@ -11,11 +11,13 @@ CREATE TABLE river_leader ( INSERT INTO /* TEMPLATE: schema */river_leader ( leader_id, elected_at, - expires_at + expires_at, + name ) VALUES ( @leader_id, coalesce(cast(sqlc.narg('now') AS text), datetime('now', 'subsec')), - datetime(coalesce(cast(sqlc.narg('now') AS text), datetime('now', 'subsec')), 'subsec', cast(@ttl as text)) + datetime(coalesce(cast(sqlc.narg('now') AS text), datetime('now', 'subsec')), 'subsec', cast(@ttl as text)), + @name ) ON CONFLICT (name) DO NOTHING @@ -28,6 +30,7 @@ WHERE unixepoch(elected_at, 'subsec') = unixepoch(cast(@elected_at AS text), 'subsec') AND expires_at >= coalesce(cast(sqlc.narg('now') AS text), datetime('now', 'subsec')) AND leader_id = @leader_id + AND name = @name RETURNING *; -- name: LeaderDeleteExpired :execrows @@ -42,11 +45,13 @@ FROM /* TEMPLATE: schema */river_leader; INSERT INTO /* TEMPLATE: schema */river_leader( 
elected_at, expires_at, - leader_id + leader_id, + name ) VALUES ( coalesce(cast(sqlc.narg('elected_at') AS text), cast(sqlc.narg('now') AS text), datetime('now', 'subsec')), coalesce(cast(sqlc.narg('expires_at') AS text), datetime(coalesce(cast(sqlc.narg('now') AS text), datetime('now', 'subsec')), 'subsec', cast(@ttl as text))), - @leader_id + @leader_id, + @name ) RETURNING *; -- name: LeaderResign :execrows diff --git a/riverdriver/riversqlite/internal/dbsqlc/river_leader.sql.go b/riverdriver/riversqlite/internal/dbsqlc/river_leader.sql.go index 780b3ac6..7122ae7e 100644 --- a/riverdriver/riversqlite/internal/dbsqlc/river_leader.sql.go +++ b/riverdriver/riversqlite/internal/dbsqlc/river_leader.sql.go @@ -13,11 +13,13 @@ const leaderAttemptElect = `-- name: LeaderAttemptElect :one INSERT INTO /* TEMPLATE: schema */river_leader ( leader_id, elected_at, - expires_at + expires_at, + name ) VALUES ( ?1, coalesce(cast(?2 AS text), datetime('now', 'subsec')), - datetime(coalesce(cast(?2 AS text), datetime('now', 'subsec')), 'subsec', cast(?3 as text)) + datetime(coalesce(cast(?2 AS text), datetime('now', 'subsec')), 'subsec', cast(?3 as text)), + ?4 ) ON CONFLICT (name) DO NOTHING @@ -28,10 +30,16 @@ type LeaderAttemptElectParams struct { LeaderID string Now *string TTL string + Name string } func (q *Queries) LeaderAttemptElect(ctx context.Context, db DBTX, arg *LeaderAttemptElectParams) (*RiverLeader, error) { - row := db.QueryRowContext(ctx, leaderAttemptElect, arg.LeaderID, arg.Now, arg.TTL) + row := db.QueryRowContext(ctx, leaderAttemptElect, + arg.LeaderID, + arg.Now, + arg.TTL, + arg.Name, + ) var i RiverLeader err := row.Scan( &i.ElectedAt, @@ -49,6 +57,7 @@ WHERE unixepoch(elected_at, 'subsec') = unixepoch(cast(?3 AS text), 'subsec') AND expires_at >= coalesce(cast(?1 AS text), datetime('now', 'subsec')) AND leader_id = ?4 + AND name = ?5 RETURNING elected_at, expires_at, leader_id, name ` @@ -57,6 +66,7 @@ type LeaderAttemptReelectParams struct { TTL string 
ElectedAt string LeaderID string + Name string } func (q *Queries) LeaderAttemptReelect(ctx context.Context, db DBTX, arg *LeaderAttemptReelectParams) (*RiverLeader, error) { @@ -65,6 +75,7 @@ func (q *Queries) LeaderAttemptReelect(ctx context.Context, db DBTX, arg *Leader arg.TTL, arg.ElectedAt, arg.LeaderID, + arg.Name, ) var i RiverLeader err := row.Scan( @@ -110,11 +121,13 @@ const leaderInsert = `-- name: LeaderInsert :one INSERT INTO /* TEMPLATE: schema */river_leader( elected_at, expires_at, - leader_id + leader_id, + name ) VALUES ( coalesce(cast(?1 AS text), cast(?2 AS text), datetime('now', 'subsec')), coalesce(cast(?3 AS text), datetime(coalesce(cast(?2 AS text), datetime('now', 'subsec')), 'subsec', cast(?4 as text))), - ?5 + ?5, + ?6 ) RETURNING elected_at, expires_at, leader_id, name ` @@ -124,6 +137,7 @@ type LeaderInsertParams struct { ExpiresAt *string TTL string LeaderID string + Name string } func (q *Queries) LeaderInsert(ctx context.Context, db DBTX, arg *LeaderInsertParams) (*RiverLeader, error) { @@ -133,6 +147,7 @@ func (q *Queries) LeaderInsert(ctx context.Context, db DBTX, arg *LeaderInsertPa arg.ExpiresAt, arg.TTL, arg.LeaderID, + arg.Name, ) var i RiverLeader err := row.Scan( diff --git a/riverdriver/riversqlite/internal/dbsqlc/river_queue.sql b/riverdriver/riversqlite/internal/dbsqlc/river_queue.sql index 1aa7c5fe..f9ceb3ab 100644 --- a/riverdriver/riversqlite/internal/dbsqlc/river_queue.sql +++ b/riverdriver/riversqlite/internal/dbsqlc/river_queue.sql @@ -30,6 +30,7 @@ WHERE name IN ( SELECT name FROM /* TEMPLATE: schema */river_queue WHERE river_queue.updated_at < @updated_at_horizon + AND (/* TEMPLATE_BEGIN: queues_included_clause */ true /* TEMPLATE_END */) ORDER BY name ASC LIMIT @max ) diff --git a/riverdriver/riversqlite/internal/dbsqlc/river_queue.sql.go b/riverdriver/riversqlite/internal/dbsqlc/river_queue.sql.go index d9edc714..fed5f0e9 100644 --- a/riverdriver/riversqlite/internal/dbsqlc/river_queue.sql.go +++ 
b/riverdriver/riversqlite/internal/dbsqlc/river_queue.sql.go @@ -63,6 +63,7 @@ WHERE name IN ( SELECT name FROM /* TEMPLATE: schema */river_queue WHERE river_queue.updated_at < ?1 + AND (/* TEMPLATE_BEGIN: queues_included_clause */ true /* TEMPLATE_END */) ORDER BY name ASC LIMIT ?2 ) diff --git a/riverdriver/riversqlite/migration/main/007_river_leader_non_default_name.down.sql b/riverdriver/riversqlite/migration/main/007_river_leader_non_default_name.down.sql new file mode 100644 index 00000000..4d99b4fa --- /dev/null +++ b/riverdriver/riversqlite/migration/main/007_river_leader_non_default_name.down.sql @@ -0,0 +1,15 @@ +-- +-- Alter `river_leader` to restore the check constraint that `name` must be `default`. SQLite +-- can't alter an existing table's constraints, so this redefines the table entirely. +-- + +DROP TABLE /* TEMPLATE: schema */river_leader; + +CREATE TABLE /* TEMPLATE: schema */river_leader ( + elected_at timestamp NOT NULL, + expires_at timestamp NOT NULL, + leader_id text NOT NULL, + name text PRIMARY KEY NOT NULL DEFAULT 'default' CHECK (name = 'default'), + CONSTRAINT name_length CHECK (length(name) > 0 AND length(name) < 128), + CONSTRAINT leader_id_length CHECK (length(leader_id) > 0 AND length(leader_id) < 128) +); \ No newline at end of file diff --git a/riverdriver/riversqlite/migration/main/007_river_leader_non_default_name.up.sql b/riverdriver/riversqlite/migration/main/007_river_leader_non_default_name.up.sql new file mode 100644 index 00000000..a38caa24 --- /dev/null +++ b/riverdriver/riversqlite/migration/main/007_river_leader_non_default_name.up.sql @@ -0,0 +1,16 @@ +-- +-- Alter `river_leader` to remove the check constraint that `name` must be +-- `default`. SQLite can't alter an existing table's constraints, so this redefines the +-- table entirely.
+-- + +DROP TABLE /* TEMPLATE: schema */river_leader; + +CREATE TABLE /* TEMPLATE: schema */river_leader ( + elected_at timestamp NOT NULL, + expires_at timestamp NOT NULL, + leader_id text NOT NULL, + name text PRIMARY KEY NOT NULL DEFAULT 'default', + CONSTRAINT name_length CHECK (length(name) > 0 AND length(name) < 128), + CONSTRAINT leader_id_length CHECK (length(leader_id) > 0 AND length(leader_id) < 128) +); \ No newline at end of file diff --git a/riverdriver/riversqlite/river_sqlite_driver.go b/riverdriver/riversqlite/river_sqlite_driver.go index 132d0e46..7dcd03fc 100644 --- a/riverdriver/riversqlite/river_sqlite_driver.go +++ b/riverdriver/riversqlite/river_sqlite_driver.go @@ -375,17 +375,25 @@ func (e *Executor) JobDelete(ctx context.Context, params *riverdriver.JobDeleteP } func (e *Executor) JobDeleteBefore(ctx context.Context, params *riverdriver.JobDeleteBeforeParams) (int, error) { - if len(params.QueuesIncluded) > 0 { - return 0, riverdriver.ErrNotImplemented + var ( + replacements = make(map[string]sqlctemplate.Replacement) + namedArgs = make(map[string]any) + ) + + if err := addQueuesClauseSQL(replacements, namedArgs, "queues_excluded_clause", "queue", params.QueuesExcluded, true); err != nil { + return 0, err } + if err := addQueuesClauseSQL(replacements, namedArgs, "queues_included_clause", "queue", sliceutil.NilIfEmpty(params.QueuesIncluded), false); err != nil { + return 0, err + } + + ctx = sqlctemplate.WithReplacements(ctx, replacements, namedArgs) res, err := dbsqlc.New().JobDeleteBefore(schemaTemplateParam(ctx, params.Schema), e.dbtx, &dbsqlc.JobDeleteBeforeParams{ CancelledFinalizedAtHorizon: timeString(params.CancelledFinalizedAtHorizon), CompletedFinalizedAtHorizon: timeString(params.CompletedFinalizedAtHorizon), DiscardedFinalizedAtHorizon: timeString(params.DiscardedFinalizedAtHorizon), Max: int64(params.Max), - QueuesExcluded: params.QueuesExcluded, - QueuesExcludedEmpty: len(params.QueuesExcluded) < 1, // not in the Postgres 
version, but I couldn't find a way around it }) if err != nil { return 0, interpretError(err) @@ -505,6 +513,17 @@ func (e *Executor) JobGetByKindMany(ctx context.Context, params *riverdriver.Job } func (e *Executor) JobGetStuck(ctx context.Context, params *riverdriver.JobGetStuckParams) ([]*rivertype.JobRow, error) { + var ( + replacements = make(map[string]sqlctemplate.Replacement) + namedArgs = make(map[string]any) + ) + + if err := addQueuesClauseSQL(replacements, namedArgs, "queues_included_clause", "queue", sliceutil.NilIfEmpty(params.QueuesIncluded), false); err != nil { + return nil, err + } + + ctx = sqlctemplate.WithReplacements(ctx, replacements, namedArgs) + jobs, err := dbsqlc.New().JobGetStuck(schemaTemplateParam(ctx, params.Schema), e.dbtx, &dbsqlc.JobGetStuckParams{ Max: int64(params.Max), StuckHorizon: timeString(params.StuckHorizon), @@ -869,10 +888,24 @@ func (e *Executor) JobSchedule(ctx context.Context, params *riverdriver.JobSched ctx = schemaTemplateParam(ctx, params.Schema) dbtx := templateReplaceWrapper{dbtx: e.driver.UnwrapTx(execTx), replacer: &e.driver.replacer} - eligibleJobs, err := dbsqlc.New().JobScheduleGetEligible(schemaTemplateParam(ctx, params.Schema), dbtx, &dbsqlc.JobScheduleGetEligibleParams{ - Max: int64(params.Max), - Now: timeStringNullable(params.Now), - }) + eligibleJobs, err := func() ([]*dbsqlc.RiverJob, error) { + var ( + replacements = make(map[string]sqlctemplate.Replacement) + namedArgs = make(map[string]any) + ) + + if err := addQueuesClauseSQL(replacements, namedArgs, "queues_included_clause", "queue", sliceutil.NilIfEmpty(params.QueuesIncluded), false); err != nil { + return nil, err + } + + ctx := sqlctemplate.WithReplacementsDup(ctx) // dupe so these new replacements don't leak into queries below (WithReplacements mutates an existing context container rather than copying it on write) + ctx = sqlctemplate.WithReplacements(ctx, replacements, namedArgs) + + return
dbsqlc.New().JobScheduleGetEligible(schemaTemplateParam(ctx, params.Schema), dbtx, &dbsqlc.JobScheduleGetEligibleParams{ + Max: int64(params.Max), + Now: timeStringNullable(params.Now), + }) + }() if err != nil { return nil, interpretError(err) } @@ -1102,6 +1135,7 @@ func (e *Executor) JobUpdateFull(ctx context.Context, params *riverdriver.JobUpd func (e *Executor) LeaderAttemptElect(ctx context.Context, params *riverdriver.LeaderElectParams) (*riverdriver.Leader, error) { leader, err := dbsqlc.New().LeaderAttemptElect(schemaTemplateParam(ctx, params.Schema), e.dbtx, &dbsqlc.LeaderAttemptElectParams{ LeaderID: params.LeaderID, + Name: params.Name, Now: timeStringNullable(params.Now), TTL: durationAsString(params.TTL), }) @@ -1115,6 +1149,7 @@ func (e *Executor) LeaderAttemptReelect(ctx context.Context, params *riverdriver leader, err := dbsqlc.New().LeaderAttemptReelect(schemaTemplateParam(ctx, params.Schema), e.dbtx, &dbsqlc.LeaderAttemptReelectParams{ ElectedAt: timeString(params.ElectedAt), LeaderID: params.LeaderID, + Name: params.Name, Now: timeStringNullable(params.Now), TTL: durationAsString(params.TTL), }) @@ -1144,6 +1179,7 @@ func (e *Executor) LeaderInsert(ctx context.Context, params *riverdriver.LeaderI leader, err := dbsqlc.New().LeaderInsert(schemaTemplateParam(ctx, params.Schema), e.dbtx, &dbsqlc.LeaderInsertParams{ ElectedAt: timeStringNullable(params.ElectedAt), ExpiresAt: timeStringNullable(params.ExpiresAt), + Name: params.Name, Now: timeStringNullable(params.Now), LeaderID: params.LeaderID, TTL: durationAsString(params.TTL), @@ -1292,6 +1328,17 @@ func (e *Executor) QueueCreateOrSetUpdatedAt(ctx context.Context, params *riverd } func (e *Executor) QueueDeleteExpired(ctx context.Context, params *riverdriver.QueueDeleteExpiredParams) ([]string, error) { + var ( + replacements = make(map[string]sqlctemplate.Replacement) + namedArgs = make(map[string]any) + ) + + if err := addQueuesClauseSQL(replacements, namedArgs, "queues_included_clause",
"name", sliceutil.NilIfEmpty(params.QueuesIncluded), false); err != nil { + return nil, err + } + + ctx = sqlctemplate.WithReplacements(ctx, replacements, namedArgs) + queues, err := dbsqlc.New().QueueDeleteExpired(schemaTemplateParam(ctx, params.Schema), e.dbtx, &dbsqlc.QueueDeleteExpiredParams{ Max: int64(params.Max), UpdatedAtHorizon: params.UpdatedAtHorizon.UTC(), @@ -1560,6 +1607,62 @@ func (t *ExecutorSubTx) Rollback(ctx context.Context) error { return nil } +// addQueuesClauseSQL generates a partial SQL fragment used to check whether +// a query's queue is included in or excluded from a given set of injected queues. +// +// This is really, really awful, and I wish there was a way I could find to +// avoid doing this, but it's the only option I could come up with to work +// around the fact that sqlc is incredibly buggy, and near non-functionally +// buggy when it comes to SQLite. +// +// I tried to use a slice-like injection using `sqlc.slice(...)`, but because that +// generates `?` placeholders without numbers, it's completely incompatible with +// other parameters if it shows up anywhere in the query except for the very +// end. It's sometimes possible to rearrange the query so it appears at the end, +// but even that causes issues in other cases like if you want to use multiple +// `sqlc.slice(...)` calls (e.g. using both a `queues_included` and +// `queues_excluded`). +// +// Next up, you could work around the problem using `json_each(@json_array)`, +// but sqlc can't find variables used in a function like this. Not for a good +// reason, but again, just because it's extremely buggy. +// +// So instead, we use a `json_each` approach, but we have to manually inject via +// our home-grown templating system (see the `sqlctemplate` package). It's not +// great, and possibly bad even, but it works.
+func addQueuesClauseSQL(replacements map[string]sqlctemplate.Replacement, namedArgs map[string]any, clauseName, columnName string, queues []string, isExcluded bool) error { + if queues == nil { + replacements[clauseName] = sqlctemplate.Replacement{Value: "true"} + return nil + } + + var maybeNot string + if isExcluded { + maybeNot = "NOT " + } + + var ( + paramName = clauseName + "_arg" + clauseSQL = ` + ` + maybeNot + `EXISTS ( + SELECT 1 + FROM json_each(@` + paramName + `) + WHERE json_each.value = ` + columnName + ` + ) + ` + ) + + data, err := json.Marshal(queues) + if err != nil { + return fmt.Errorf("error marshaling queues: %w", err) + } + + replacements[clauseName] = sqlctemplate.Replacement{Value: clauseSQL} + namedArgs[paramName] = data + + return nil +} + func interpretError(err error) error { if errors.Is(err, sql.ErrNoRows) { return rivertype.ErrNotFound diff --git a/riverdriver/riversqlite/river_sqlite_driver_test.go b/riverdriver/riversqlite/river_sqlite_driver_test.go index fcba8f36..612de556 100644 --- a/riverdriver/riversqlite/river_sqlite_driver_test.go +++ b/riverdriver/riversqlite/river_sqlite_driver_test.go @@ -25,6 +25,58 @@ func TestDurationAsString(t *testing.T) { require.Equal(t, "3.255 seconds", durationAsString(3*time.Second+255*time.Millisecond)) } +func TestAddQueuesClauseSQL(t *testing.T) { + t.Parallel() + + t.Run("IsExcludedFalse", func(t *testing.T) { + t.Parallel() + + var ( + replacements = make(map[string]sqlctemplate.Replacement) + namedArgs = make(map[string]any) + ) + + require.NoError(t, addQueuesClauseSQL(replacements, namedArgs, "is_excluded_clause", "queue", []string{"queue_a", "queue_b"}, false)) + + require.Equal(t, map[string]sqlctemplate.Replacement{ + "is_excluded_clause": { + Value: ` + EXISTS ( + SELECT 1 + FROM json_each(@is_excluded_clause_arg) + WHERE json_each.value = queue + ) + `, + }, + }, replacements) + require.Equal(t, `["queue_a","queue_b"]`, string(namedArgs["is_excluded_clause_arg"].([]byte))) 
//nolint:forcetypeassert + }) + + t.Run("IsExcludedTrue", func(t *testing.T) { + t.Parallel() + + var ( + replacements = make(map[string]sqlctemplate.Replacement) + namedArgs = make(map[string]any) + ) + + require.NoError(t, addQueuesClauseSQL(replacements, namedArgs, "is_excluded_clause", "queue", []string{"queue_a", "queue_b"}, true)) + + require.Equal(t, map[string]sqlctemplate.Replacement{ + "is_excluded_clause": { + Value: ` + NOT EXISTS ( + SELECT 1 + FROM json_each(@is_excluded_clause_arg) + WHERE json_each.value = queue + ) + `, + }, + }, replacements) + require.Equal(t, `["queue_a","queue_b"]`, string(namedArgs["is_excluded_clause_arg"].([]byte))) //nolint:forcetypeassert + }) +} + func TestInterpretError(t *testing.T) { t.Parallel() diff --git a/rivershared/sqlctemplate/sqlc_template.go b/rivershared/sqlctemplate/sqlc_template.go index 88c02edb..cee220e7 100644 --- a/rivershared/sqlctemplate/sqlc_template.go +++ b/rivershared/sqlctemplate/sqlc_template.go @@ -265,6 +265,33 @@ func WithReplacements(ctx context.Context, replacements map[string]Replacement, }) } +// WithReplacementsDup duplicates the template replacements container in context. +// +// Normally, adding replacements modifies an existing container in context. Call +// this function before WithReplacements to create a dup instead. +// +// TODO(brandur): This API isn't great. Reconsider it. Maybe WithReplacements +// should always dup? 
+func WithReplacementsDup(ctx context.Context) context.Context { + var ( + namedArgs map[string]any + replacements map[string]Replacement + ) + + if container, ok := ctx.Value(contextKey{}).(*contextContainer); ok { + namedArgs = maps.Clone(container.NamedArgs) + replacements = maps.Clone(container.Replacements) + } else { + namedArgs = make(map[string]any) + replacements = make(map[string]Replacement) + } + + return context.WithValue(ctx, contextKey{}, &contextContainer{ + NamedArgs: namedArgs, + Replacements: replacements, + }) +} + // Comparable struct that's used as a key for template caching. type replacerCacheKey struct { namedArgs string // all arg names concatenated together diff --git a/rivershared/testfactory/test_factory.go b/rivershared/testfactory/test_factory.go index 26cf4b88..84f51d85 100644 --- a/rivershared/testfactory/test_factory.go +++ b/rivershared/testfactory/test_factory.go @@ -111,6 +111,7 @@ type LeaderOpts struct { ElectedAt *time.Time ExpiresAt *time.Time LeaderID *string + Name *string Now *time.Time Schema string } @@ -122,6 +123,7 @@ func Leader(ctx context.Context, tb testing.TB, exec riverdriver.Executor, opts ElectedAt: opts.ElectedAt, ExpiresAt: opts.ExpiresAt, LeaderID: ptrutil.ValOrDefault(opts.LeaderID, "test-client-id"), + Name: ptrutil.ValOrDefault(opts.Name, "default"), Now: opts.Now, Schema: opts.Schema, TTL: 10 * time.Second, diff --git a/rivershared/util/sliceutil/slice_util.go b/rivershared/util/sliceutil/slice_util.go index c6fac024..34847483 100644 --- a/rivershared/util/sliceutil/slice_util.go +++ b/rivershared/util/sliceutil/slice_util.go @@ -44,6 +44,17 @@ func KeyBy[T any, K comparable, V any](collection []T, tupleFunc func(item T) (K return result } +// NilIfEmpty returns nil if the given slice is empty (zero length) or nil, +// and the original slice otherwise. 
This is useful when a nil slice has +// different semantics from an empty one, such as SQL array parameters where nil +// means "no filter" but an empty array would match nothing. +func NilIfEmpty[T any](s []T) []T { + if len(s) == 0 { + return nil + } + return s +} + // Map manipulates a slice and transforms it to a slice of another type. func Map[T any, R any](collection []T, mapFunc func(T) R) []R { result := make([]R, len(collection))