@@ -21,12 +21,14 @@ import (
 	"context"
 	"fmt"
 	"strconv"
+	"sync"
 	"time"
 
 	"github.com/apache/incubator-pegasus/collector/metrics"
 	client "github.com/apache/incubator-pegasus/go-client/admin"
 	"github.com/apache/incubator-pegasus/go-client/idl/admin"
 	"github.com/apache/incubator-pegasus/go-client/idl/replication"
+	"github.com/gammazero/deque"
 	log "github.com/sirupsen/logrus"
 	"github.com/spf13/viper"
 	"golang.org/x/sync/errgroup"
@@ -43,6 +45,7 @@ type PartitionDetectorConfig struct {
 	DetectInterval        time.Duration
 	PullMetricsTimeout    time.Duration
 	SampleMetricsInterval time.Duration
+	MaxSampleSize         int
 }
 
 func LoadPartitionDetectorConfig() *PartitionDetectorConfig {
@@ -52,6 +55,7 @@ func LoadPartitionDetectorConfig() *PartitionDetectorConfig {
 		DetectInterval:        viper.GetDuration("hotspot.partition_detect_interval"),
 		PullMetricsTimeout:    viper.GetDuration("hotspot.pull_metrics_timeout"),
 		SampleMetricsInterval: viper.GetDuration("hotspot.sample_metrics_interval"),
+		MaxSampleSize:         viper.GetInt("hotspot.max_sample_size"),
 	}
 }
 
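A note on the new `hotspot.max_sample_size` knob: `viper.GetInt` returns 0 when the key is absent. A minimal sketch of how a caller could guard against an unset value, assuming this file's package (the helper name and default are hypothetical, not part of this patch):

```go
package hotspot // assumed name of the surrounding package

import "github.com/spf13/viper"

// loadWithDefaults is a hypothetical helper: it registers a fallback for the new
// key so that MaxSampleSize stays positive even when the config file omits it.
func loadWithDefaults() *PartitionDetectorConfig {
	viper.SetDefault("hotspot.max_sample_size", 10)
	return LoadPartitionDetectorConfig()
}
```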
@@ -78,12 +82,15 @@ func NewPartitionDetector(cfg *PartitionDetectorConfig) (PartitionDetector, erro
 	}
 
 	return &partitionDetectorImpl{
-		cfg: cfg,
+		cfg:       cfg,
+		analyzers: make(map[partitionAnalyzerKey]*partitionAnalyzer),
 	}, nil
 }
 
 type partitionDetectorImpl struct {
-	cfg *PartitionDetectorConfig
+	cfg       *PartitionDetectorConfig
+	mtx       sync.RWMutex
+	analyzers map[partitionAnalyzerKey]*partitionAnalyzer
 }
 
 func (d *partitionDetectorImpl) Run(tom *tomb.Tomb) error {
@@ -115,44 +122,50 @@ type appStats struct {
 	appName          string
 	partitionCount   int32
 	partitionConfigs []*replication.PartitionConfiguration
-	partitionStats   []map[string]float64 // {metric_name -> metric_value} for each partition.
+	partitionStats   []map[string]float64 // {metricName -> metricValue} for each partition.
 }
 
 func (d *partitionDetectorImpl) aggregate() error {
-	adminClient := client.NewClient(client.Config{
-		MetaServers: d.cfg.MetaServers,
-		Timeout:     d.cfg.RpcTimeout,
-	})
-	defer adminClient.Close()
-
-	// appMap is the final structure that includes all the statistical values.
-	appMap, err := pullTablePartitions(adminClient)
+	// appMap includes the structures that hold all the final statistical values.
+	appMap, nodes, err := d.fetchMetadata()
 	if err != nil {
 		return err
 	}
 
-	err = d.aggregateMetrics(adminClient, appMap)
+	err = d.aggregateMetrics(appMap, nodes)
 	if err != nil {
 		return err
 	}
 
+	d.addHotspotSamples(appMap)
+
 	return nil
 }
 
-// Pull metadata of all available tables with all their partitions and form the final structure
-// that includes all the statistical values.
-func pullTablePartitions(adminClient client.Client) (appStatsMap, error) {
+// Fetch the necessary metadata from the meta server for aggregating metrics, including:
+// - the metadata of all available tables with all their partitions, and
+// - the node information of all replica servers.
+// Also, the returned appStatsMap includes the structures that hold all the final
+// statistical values.
+func (d *partitionDetectorImpl) fetchMetadata() (appStatsMap, []*admin.NodeInfo, error) {
+	adminClient := client.NewClient(client.Config{
+		MetaServers: d.cfg.MetaServers,
+		Timeout:     d.cfg.RpcTimeout,
+	})
+	defer adminClient.Close()
+
+	// Fetch the information of all available tables.
 	tables, err := adminClient.ListTables()
 	if err != nil {
-		return nil, err
+		return nil, nil, err
 	}
 
 	appMap := make(appStatsMap)
 	for _, table := range tables {
 		// Query metadata for each partition of each table.
 		appID, partitionCount, partitionConfigs, err := adminClient.QueryConfig(table.AppName)
 		if err != nil {
-			return nil, err
+			return nil, nil, err
 		}
 
 		// Initialize statistical value for each partition.
@@ -170,20 +183,22 @@ func pullTablePartitions(adminClient client.Client) (appStatsMap, error) {
 		}
 	}
 
-	return appMap, nil
-}
-
-type aggregator func(map[string]float64, string, float64)
-
-// Pull metrics from all nodes and aggregate them to produce the statistics.
-func (d *partitionDetectorImpl) aggregateMetrics(adminClient client.Client, appMap appStatsMap) error {
+	// Fetch the node information of all replica servers.
 	nodes, err := adminClient.ListNodes()
 	if err != nil {
-		return err
+		return nil, nil, err
 	}
 
-	// Pull multiple results of metrics to perform cumulative calculation to produce the
-	// statistics such as QPS.
+	return appMap, nodes, nil
+}
+
+type aggregator func(map[string]float64, string, float64)
+
+// Pull metric samples from the nodes and aggregate them into appMap to produce the
+// final statistical results.
+func (d *partitionDetectorImpl) aggregateMetrics(appMap appStatsMap, nodes []*admin.NodeInfo) error {
+	// Pull multiple samples of metrics to perform cumulative calculation to produce the
+	// statistical results such as QPS.
 	startSnapshots, err := d.pullMetrics(nodes)
 	if err != nil {
 		return err
@@ -204,7 +219,7 @@ func (d *partitionDetectorImpl) aggregateMetrics(adminClient client.Client, appM
 
 		d.calculateStats(snapshot, nodes[i],
 			func(stats map[string]float64, key string, operand float64) {
-				// Just set the ending number of requests.
+				// Just set the number of requests from the ending snapshot.
 				stats[key] = operand
 			},
 			appMap)
@@ -220,7 +235,7 @@ func (d *partitionDetectorImpl) aggregateMetrics(adminClient client.Client, appM
 					return
 				}
 
-				// Calculate QPS based on the ending number of requests that have been
+				// Calculate QPS based on the ending snapshot that has been
 				// set previously.
 				stats[key] = (value - operand) / duration.Seconds()
 			}
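To make the two-pass aggregation above concrete, here is a minimal standalone sketch (the metric key, counts, and interval are invented for illustration): the first pass stores the request count from the ending snapshot, and the second pass receives the starting count as `operand` and converts the delta into QPS.

```go
package main

import (
	"fmt"
	"time"
)

func main() {
	stats := map[string]float64{}
	duration := 10 * time.Second

	// Pass 1: record the request count from the ending snapshot.
	setEnd := func(stats map[string]float64, key string, operand float64) {
		stats[key] = operand
	}
	setEnd(stats, "rdb_get", 5000)

	// Pass 2: operand is the starting count; the previously stored value is the
	// ending count, so the delta divided by the interval yields QPS.
	toQPS := func(stats map[string]float64, key string, operand float64) {
		value, ok := stats[key]
		if !ok {
			return
		}
		stats[key] = (value - operand) / duration.Seconds()
	}
	toQPS(stats, "rdb_get", 2000)

	fmt.Println(stats["rdb_get"]) // (5000 - 2000) / 10 = 300 QPS
}
```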
@@ -348,3 +363,93 @@ func (d *partitionDetectorImpl) calculateStats(
 		}
 	}
 }
+
+// Since the partition count of a table might change, use the (appID, partitionCount)
+// pair as the key for each table.
+type partitionAnalyzerKey struct {
+	appID          int32
+	partitionCount int32
+}
+
+const (
+	readHotspotData = iota
+	writeHotspotData
+	operateHotspotDataNumber
+)
+
+// hotspotPartitionStats holds all the statistical values of each partition, used for the
+// analysis of hotspot partitions.
+type hotspotPartitionStats struct {
+	totalQPS [operateHotspotDataNumber]float64
+}
+
+// Receive statistical values for all kinds of reads and writes; aggregating them yields
+// the overall read and write statistics of each partition.
+func calculateHotspotStats(appMap appStatsMap) map[partitionAnalyzerKey][]hotspotPartitionStats {
+	results := make(map[partitionAnalyzerKey][]hotspotPartitionStats)
+	for appID, stats := range appMap {
+		partitionCount := len(stats.partitionStats)
+		value := make([]hotspotPartitionStats, 0, partitionCount)
+
+		for _, partitionStats := range stats.partitionStats {
+			var hotspotStats hotspotPartitionStats
+
+			// Calculate total QPS over all kinds of reads.
+			for _, metricName := range readMetricNames {
+				hotspotStats.totalQPS[readHotspotData] += partitionStats[metricName]
+			}
+
+			// Calculate total QPS over all kinds of writes.
+			for _, metricName := range writeMetricNames {
+				hotspotStats.totalQPS[writeHotspotData] += partitionStats[metricName]
+			}
+
+			value = append(value, hotspotStats)
+		}
+
+		key := partitionAnalyzerKey{appID: appID, partitionCount: int32(partitionCount)}
+		results[key] = value
+	}
+
+	return results
+}
+
+// Calculate statistical values over multiple tables, with all partitions of each table
+// forming one sample used for future analysis of hotspot partitions.
+func (d *partitionDetectorImpl) addHotspotSamples(appMap appStatsMap) {
+	hotspotMap := calculateHotspotStats(appMap)
+
+	d.mtx.Lock()
+	defer d.mtx.Unlock()
+
+	for key, value := range hotspotMap {
+		analyzer, ok := d.analyzers[key]
+		if !ok {
+			analyzer = newPartitionAnalyzer(d.cfg.MaxSampleSize)
+			d.analyzers[key] = analyzer
+		}
+
+		analyzer.addSample(value)
+	}
+}
+
+func newPartitionAnalyzer(maxSampleSize int) *partitionAnalyzer {
+	return &partitionAnalyzer{maxSampleSize: maxSampleSize}
+}
+
+// partitionAnalyzer holds the samples for all partitions of a table and analyzes hotspot
+// partitions based on them.
+type partitionAnalyzer struct {
+	// TODO(wangdan): bump gammazero/deque to the latest version after upgrading Go to 1.23+,
+	// since older Go versions do not support the `Deque.Iter()` iterator interface.
+	maxSampleSize int
+	samples       deque.Deque[[]hotspotPartitionStats] // Each element is one sample covering all partitions of the table.
+}
+
+func (a *partitionAnalyzer) addSample(sample []hotspotPartitionStats) {
+	for a.samples.Len() >= a.maxSampleSize {
+		a.samples.PopFront()
+	}
+
+	a.samples.PushBack(sample)
+}
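Once samples accumulate, an analysis pass can walk the sliding window. A hypothetical sketch of such a consumer (not in this patch; it assumes the surrounding package and iterates with `Deque.At` since `Iter()` needs the newer deque, per the TODO above): average read QPS per partition across all buffered samples. Because the analyzer is keyed by (appID, partitionCount), every sample in the window has the same length.

```go
// avgReadQPS is a hypothetical example: it averages the read QPS of each partition
// over every sample currently held in the sliding window. partitionCount must match
// the analyzer's key.
func (a *partitionAnalyzer) avgReadQPS(partitionCount int) []float64 {
	avg := make([]float64, partitionCount)
	if a.samples.Len() == 0 {
		return avg
	}
	for i := 0; i < a.samples.Len(); i++ {
		// Each sample holds one hotspotPartitionStats per partition.
		for p, stats := range a.samples.At(i) {
			avg[p] += stats.totalQPS[readHotspotData]
		}
	}
	for p := range avg {
		avg[p] /= float64(a.samples.Len())
	}
	return avg
}
```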