Skip to content

Commit 7f711c1

Browse files
committed
feat: add leader election for Counter component
1 parent ba16591 commit 7f711c1

2 files changed

Lines changed: 101 additions & 8 deletions

File tree

pkg/console/counter/component.go

Lines changed: 96 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -18,10 +18,14 @@
1818
package counter
1919

2020
import (
21+
"context"
2122
"fmt"
2223
"math"
24+
"sync/atomic"
2325

26+
storecfg "github.com/apache/dubbo-admin/pkg/config/store"
2427
"github.com/apache/dubbo-admin/pkg/core/events"
28+
"github.com/apache/dubbo-admin/pkg/core/leader"
2529
"github.com/apache/dubbo-admin/pkg/core/logger"
2630
meshresource "github.com/apache/dubbo-admin/pkg/core/resource/apis/mesh/v1alpha1"
2731
resmodel "github.com/apache/dubbo-admin/pkg/core/resource/model"
@@ -50,7 +54,13 @@ func (c *managerComponent) RequiredDependencies() []runtime.ComponentType {
5054
}
5155

5256
type managerComponent struct {
53-
manager CounterManager
57+
manager CounterManager
58+
leaderElection *leader.LeaderElection
59+
needsLeaderElection bool
60+
isLeader atomic.Bool
61+
bound atomic.Bool
62+
storeRouter store.Router
63+
bus events.EventBus
5464
}
5565

5666
func (c *managerComponent) Type() runtime.ComponentType {
@@ -61,13 +71,44 @@ func (c *managerComponent) Order() int {
6171
return math.MaxInt - 1
6272
}
6373

64-
func (c *managerComponent) Init(runtime.BuilderContext) error {
74+
func (c *managerComponent) Init(ctx runtime.BuilderContext) error {
6575
mgr := NewCounterManager()
6676
c.manager = mgr
77+
78+
// Memory store runs single-replica; leader election is not needed.
79+
if ctx.Config().Store.Type == storecfg.Memory {
80+
return nil
81+
}
82+
83+
storeComponent, err := ctx.GetActivatedComponent(runtime.ResourceStore)
84+
if err != nil {
85+
return nil
86+
}
87+
dbSrc, ok := storeComponent.(leader.DBSource)
88+
if !ok {
89+
return nil
90+
}
91+
db, hasDB := dbSrc.GetDB()
92+
if !hasDB {
93+
return nil
94+
}
95+
holderID, err := leader.GenerateHolderID()
96+
if err != nil {
97+
logger.Warnf("counter: failed to generate holder ID, skipping leader election: %v", err)
98+
return nil
99+
}
100+
le := leader.NewLeaderElection(db, string(ComponentType), holderID)
101+
if err := le.EnsureTable(); err != nil {
102+
logger.Warnf("counter: failed to ensure leader lease table: %v", err)
103+
return nil
104+
}
105+
c.leaderElection = le
106+
c.needsLeaderElection = true
107+
logger.Infof("counter: leader election initialized (holder: %s)", holderID)
67108
return nil
68109
}
69110

70-
func (c *managerComponent) Start(rt runtime.Runtime, _ <-chan struct{}) error {
111+
func (c *managerComponent) Start(rt runtime.Runtime, ch <-chan struct{}) error {
71112
storeComponent, err := rt.GetComponent(runtime.ResourceStore)
72113
if err != nil {
73114
return err
@@ -76,10 +117,7 @@ func (c *managerComponent) Start(rt runtime.Runtime, _ <-chan struct{}) error {
76117
if !ok {
77118
return fmt.Errorf("component %s does not implement store.Router", runtime.ResourceStore)
78119
}
79-
80-
if err := c.initializeCountsFromStore(storeRouter); err != nil {
81-
logger.Warnf("Failed to initialize counter manager from store: %v", err)
82-
}
120+
c.storeRouter = storeRouter
83121

84122
component, err := rt.GetComponent(runtime.EventBus)
85123
if err != nil {
@@ -89,7 +127,57 @@ func (c *managerComponent) Start(rt runtime.Runtime, _ <-chan struct{}) error {
89127
if !ok {
90128
return fmt.Errorf("component %s does not implement events.EventBus", runtime.EventBus)
91129
}
92-
return c.manager.Bind(bus)
130+
c.bus = bus
131+
132+
if !c.needsLeaderElection {
133+
return c.startBusinessLogic()
134+
}
135+
136+
ctx, cancel := context.WithCancel(context.Background())
137+
defer cancel()
138+
139+
go func() {
140+
<-ch
141+
cancel()
142+
}()
143+
144+
c.leaderElection.RunLeaderElection(ctx, ch,
145+
func() { // onStartLeading
146+
logger.Infof("counter: became leader, starting business logic")
147+
c.isLeader.Store(true)
148+
if err := c.startBusinessLogic(); err != nil {
149+
logger.Errorf("counter: failed to start business logic: %v", err)
150+
}
151+
},
152+
func() { // onStopLeading
153+
logger.Warnf("counter: lost leadership, resetting counters")
154+
c.isLeader.Store(false)
155+
c.manager.Reset()
156+
},
157+
)
158+
159+
return nil
160+
}
161+
162+
// startBusinessLogic initializes counts from store and binds to EventBus.
163+
// When re-elected, it resets and re-initializes counts; Bind is called only once.
164+
func (c *managerComponent) startBusinessLogic() error {
165+
c.manager.Reset()
166+
// Wire up leader guard so event handler skips processing when not leader.
167+
if c.needsLeaderElection {
168+
cm := c.manager.(*counterManager)
169+
cm.isLeader = &c.isLeader
170+
}
171+
if err := c.initializeCountsFromStore(c.storeRouter); err != nil {
172+
logger.Warnf("Failed to initialize counter manager from store: %v", err)
173+
}
174+
if !c.bound.Load() {
175+
if err := c.manager.Bind(c.bus); err != nil {
176+
return err
177+
}
178+
c.bound.Store(true)
179+
}
180+
return nil
93181
}
94182

95183
func (c *managerComponent) initializeCountsFromStore(storeRouter store.Router) error {

pkg/console/counter/manager.go

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -19,6 +19,7 @@ package counter
1919

2020
import (
2121
"fmt"
22+
"sync/atomic"
2223

2324
"k8s.io/client-go/tools/cache"
2425

@@ -59,6 +60,7 @@ type counterManager struct {
5960
simpleCounters map[resmodel.ResourceKind]*Counter
6061
distributionConfigs map[resmodel.ResourceKind][]*distributionCounterConfig
6162
distributionByType map[CounterType]*DistributionCounter
63+
isLeader *atomic.Bool
6264
}
6365

6466
func NewCounterManager() CounterManager {
@@ -193,6 +195,9 @@ func (cm *counterManager) Bind(bus events.EventBus) error {
193195
}
194196

195197
func (cm *counterManager) handleEvent(kind resmodel.ResourceKind, event events.Event) error {
198+
if cm.isLeader != nil && !cm.isLeader.Load() {
199+
return nil
200+
}
196201
logger.Debugf("CounterManager handling %s event, type: %s", kind, event.Type())
197202
if counter := cm.simpleCounters[kind]; counter != nil {
198203
processSimpleCounter(counter, event)

0 commit comments

Comments
 (0)