@@ -42,6 +42,7 @@ type PartitionDetector interface {
4242
4343type PartitionDetectorConfig struct {
4444 MetaServers []string
45+ RetentionPeriod time.Duration
4546 RpcTimeout time.Duration
4647 DetectInterval time.Duration
4748 PullMetricsTimeout time.Duration
@@ -54,6 +55,7 @@ type PartitionDetectorConfig struct {
5455func LoadPartitionDetectorConfig () * PartitionDetectorConfig {
5556 return & PartitionDetectorConfig {
5657 MetaServers : viper .GetStringSlice ("meta_servers" ),
58+ RetentionPeriod : viper .GetDuration ("hotspot.retention_period" ),
5759 RpcTimeout : viper .GetDuration ("hotspot.rpc_timeout" ),
5860 DetectInterval : viper .GetDuration ("hotspot.partition_detect_interval" ),
5961 PullMetricsTimeout : viper .GetDuration ("hotspot.pull_metrics_timeout" ),
@@ -69,6 +71,10 @@ func NewPartitionDetector(cfg *PartitionDetectorConfig) (PartitionDetector, erro
6971 return nil , fmt .Errorf ("MetaServers should not be empty" )
7072 }
7173
74+ if cfg .RetentionPeriod <= 0 {
75+ return nil , fmt .Errorf ("RetentionPeriod(%d) must be > 0" , cfg .RetentionPeriod )
76+ }
77+
7278 if cfg .DetectInterval <= 0 {
7379 return nil , fmt .Errorf ("DetectInterval(%d) must be > 0" , cfg .DetectInterval )
7480 }
@@ -111,6 +117,12 @@ type partitionDetectorImpl struct {
111117}
112118
113119func (d * partitionDetectorImpl ) Run (tom * tomb.Tomb ) error {
120+ ctx , cancel := context .WithCancel (context .Background ())
121+
122+ var wg sync.WaitGroup
123+ wg .Add (1 )
124+ go d .checkExpiration (ctx , & wg )
125+
114126 ticker := time .NewTicker (d .cfg .DetectInterval )
115127 defer ticker .Stop ()
116128
@@ -119,12 +131,50 @@ func (d *partitionDetectorImpl) Run(tom *tomb.Tomb) error {
119131 case <- ticker .C :
120132 d .detect ()
121133 case <- tom .Dying ():
134+ cancel ()
135+ wg .Wait ()
136+
122137 log .Info ("Hotspot partition detector exited." )
123138 return nil
124139 }
125140 }
126141}
127142
143+ func (d * partitionDetectorImpl ) checkExpiration (ctx context.Context , wg * sync.WaitGroup ) {
144+ defer wg .Done ()
145+
146+ ticker := time .NewTicker (d .cfg .RetentionPeriod )
147+ defer ticker .Stop ()
148+
149+ for {
150+ select {
151+ case <- ticker .C :
152+ d .retireExpiredTables ()
153+
154+ case <- ctx .Done ():
155+ log .Info ("Expiration checker for hotspot exited." )
156+ return
157+ }
158+ }
159+ }
160+
161+ func (d * partitionDetectorImpl ) retireExpiredTables () {
162+ currentTimestampSeconds := time .Now ().Unix ()
163+
164+ d .mtx .Lock ()
165+ defer d .mtx .Unlock ()
166+
167+ log .Info ("check expired tables" )
168+
169+ for key , analyzer := range d .analyzers {
170+ if ! analyzer .isExpired (currentTimestampSeconds ) {
171+ continue
172+ }
173+
174+ delete (d .analyzers , key )
175+ }
176+ }
177+
128178func (d * partitionDetectorImpl ) detect () {
129179 appMap , err := d .aggregate ()
130180 if err != nil {
@@ -369,10 +419,8 @@ func calculateStats(
369419 }
370420
371421 // Only primary replica of a partition will be counted.
372- // TODO(wangdan): support Equal() for base.HostPort.
373422 primary := stats .partitionConfigs [partitionID ].HpPrimary
374- if primary .GetHost () != node .HpNode .GetHost () ||
375- primary .GetPort () != node .HpNode .GetPort () {
423+ if ! node .HpNode .Equal (primary ) {
376424 continue
377425 }
378426
@@ -439,6 +487,10 @@ func calculateHotspotStats(appMap appStatsMap) map[partitionAnalyzerKey][]hotspo
439487func (d * partitionDetectorImpl ) analyse (appMap appStatsMap ) {
440488 hotspotMap := calculateHotspotStats (appMap )
441489
490+ nowTime := time .Now ()
491+ expireTime := nowTime .Add (d .cfg .RetentionPeriod )
492+ expireTimestampSeconds := expireTime .Unix ()
493+
442494 d .mtx .Lock ()
443495 defer d .mtx .Unlock ()
444496
@@ -455,7 +507,7 @@ func (d *partitionDetectorImpl) analyse(appMap appStatsMap) {
455507 d .analyzers [key ] = analyzer
456508 }
457509
458- analyzer .add (value )
510+ analyzer .add (value , expireTimestampSeconds )
459511
460512 // Perform the analysis asynchronously.
461513 go analyzer .analyse ()
@@ -489,13 +541,26 @@ type partitionAnalyzer struct {
489541 appID int32
490542 partitionCount int32
491543 mtx sync.RWMutex
544+ expireTimestampSeconds int64
492545 samples deque.Deque [[]hotspotPartitionStats ] // Each element is a sample of all partitions of the table
493546}
494547
495- func (a * partitionAnalyzer ) add (sample []hotspotPartitionStats ) {
548+ func (a * partitionAnalyzer ) isExpired (currentTimestampSeconds int64 ) bool {
549+ a .mtx .RLock ()
550+ defer a .mtx .RUnlock ()
551+
552+ return currentTimestampSeconds >= a .expireTimestampSeconds
553+ }
554+
555+ func (a * partitionAnalyzer ) add (
556+ sample []hotspotPartitionStats ,
557+ expireTimestampSeconds int64 ,
558+ ) {
496559 a .mtx .Lock ()
497560 defer a .mtx .Unlock ()
498561
562+ a .expireTimestampSeconds = expireTimestampSeconds
563+
499564 for a .samples .Len () >= a .maxSampleSize {
500565 a .samples .PopFront ()
501566 }
0 commit comments