Skip to content

Commit e9494f1

Browse files
committed
refactor
1 parent cacec6a commit e9494f1

1 file changed

Lines changed: 25 additions & 7 deletions

File tree

collector/hotspot/partition_detector.go

Lines changed: 25 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -136,7 +136,7 @@ func (d *partitionDetectorImpl) aggregate() error {
136136
return err
137137
}
138138

139-
d.addHotspotStats(appMap)
139+
d.addHotspotSamples(appMap)
140140

141141
return nil
142142
}
@@ -363,6 +363,8 @@ func (d *partitionDetectorImpl) calculateStats(
363363
}
364364
}
365365

366+
// Since the partition number of a table might be changed, use (appID, partitionCount)
367+
// pair as the key for each table.
366368
type partitionAnalyzerKey struct {
367369
appID int32
368370
partitionCount int32
@@ -374,34 +376,46 @@ const (
374376
operateHotspotDataNumber
375377
)
376378

379+
// hotspotPartitionStats holds all the statistical values of each partition, used for analysis
380+
// of hotspot partitions.
377381
type hotspotPartitionStats struct {
378382
totalQPS [operateHotspotDataNumber]float64
379383
}
380384

385+
// Receive statistical values of all kinds of reads and writes, and by aggregating them we
386+
// can obtain the overall statistics of reads and writes.
381387
func calculateHotspotStats(appMap appStatsMap) map[partitionAnalyzerKey][]hotspotPartitionStats {
382388
results := make(map[partitionAnalyzerKey][]hotspotPartitionStats)
383389
for appID, stats := range appMap {
384-
value := make([]hotspotPartitionStats, 0, len(stats.partitionStats))
390+
partitionCount := len(stats.partitionStats)
391+
value := make([]hotspotPartitionStats, 0, partitionCount)
385392

386393
for _, partitionStats := range stats.partitionStats {
387394
var hotspotStats hotspotPartitionStats
395+
396+
// Calculate total QPS over all kinds of reads.
388397
for _, metricName := range readMetricNames {
389398
hotspotStats.totalQPS[readHotspotData] += partitionStats[metricName]
390399
}
400+
401+
// Calculate total QPS over all kinds of writes.
391402
for _, metricName := range writeMetricNames {
392403
hotspotStats.totalQPS[writeHotspotData] += partitionStats[metricName]
393404
}
405+
394406
value = append(value, hotspotStats)
395407
}
396408

397-
key := partitionAnalyzerKey{appID: appID, partitionCount: int32(len(stats.partitionStats))}
409+
key := partitionAnalyzerKey{appID: appID, partitionCount: int32(partitionCount)}
398410
results[key] = value
399411
}
400412

401413
return results
402414
}
403415

404-
func (d *partitionDetectorImpl) addHotspotStats(appMap appStatsMap) {
416+
// Calculate statistical values over multiples tables with all partitions of each table as
417+
// a sample used for future analysis of hotspot partitions.
418+
func (d *partitionDetectorImpl) addHotspotSamples(appMap appStatsMap) {
405419
hotspotMap := calculateHotspotStats(appMap)
406420

407421
d.mtx.Lock()
@@ -414,20 +428,24 @@ func (d *partitionDetectorImpl) addHotspotStats(appMap appStatsMap) {
414428
d.analyzers[key] = analyzer
415429
}
416430

417-
analyzer.add(value)
431+
analyzer.addSample(value)
418432
}
419433
}
420434

421435
func newPartitionAnalyzer(maxSampleSize int) *partitionAnalyzer {
422436
return &partitionAnalyzer{maxSampleSize: maxSampleSize}
423437
}
424438

439+
// partitionAnalyzer holds the samples for all partitions of a table and analyses hotspot
440+
// partitions based on them.
425441
type partitionAnalyzer struct {
442+
// TODO(wangdan): bump gammazero/deque to the lastest version after upgrading Go to 1.23+,
443+
// since older Go versions do not support the `Deque.Iter()` iterator interface.
426444
maxSampleSize int
427-
samples deque.Deque[[]hotspotPartitionStats]
445+
samples deque.Deque[[]hotspotPartitionStats] // Each element is a sample of all partitions of the table
428446
}
429447

430-
func (a *partitionAnalyzer) add(sample []hotspotPartitionStats) {
448+
func (a *partitionAnalyzer) addSample(sample []hotspotPartitionStats) {
431449
for a.samples.Len() >= a.maxSampleSize {
432450
a.samples.PopFront()
433451
}

0 commit comments

Comments
 (0)