
Commit cacec6a

fix and refactor
1 parent a9c8c0d commit cacec6a

1 file changed: collector/hotspot/partition_detector.go
42 additions & 35 deletions
@@ -89,7 +89,7 @@ func NewPartitionDetector(cfg *PartitionDetectorConfig) (PartitionDetector, erro
 type partitionDetectorImpl struct {
 	cfg       *PartitionDetectorConfig
 	mtx       sync.RWMutex
-	analyzers map[partitionAnalyzerKey]*partitionAnalyzer // {-> partitionAnalyzer}
+	analyzers map[partitionAnalyzerKey]*partitionAnalyzer
 }

 func (d *partitionDetectorImpl) Run(tom *tomb.Tomb) error {
@@ -121,17 +121,17 @@ type appStats struct {
 	appName          string
 	partitionCount   int32
 	partitionConfigs []*replication.PartitionConfiguration
-	partitionStats   []map[string]float64 // {metric_name -> metric_value} for each partition.
+	partitionStats   []map[string]float64 // {metricName -> metricValue} for each partition.
 }

 func (d *partitionDetectorImpl) aggregate() error {
-	adminClient := client.NewClient(client.Config{
-		MetaServers: d.cfg.MetaServers,
-		Timeout:     d.cfg.RpcTimeout,
-	})
-	defer adminClient.Close()
+	// appMap includes the structures that hold all the final statistical values.
+	appMap, nodes, err := d.fetchMetadata()
+	if err != nil {
+		return err
+	}

-	appMap, err := d.aggregateMetrics(adminClient)
+	err = d.aggregateMetrics(appMap, nodes)
 	if err != nil {
 		return err
 	}
@@ -141,20 +141,30 @@ func (d *partitionDetectorImpl) aggregate() error {
 	return nil
 }

-// Pull metadata of all available tables with all their partitions and form the final structure
-// that includes all the statistical values.
-func pullTablePartitions(adminClient client.Client) (appStatsMap, error) {
+// Fetch the metadata needed to aggregate metrics from the meta server, including:
+// - the metadata of all available tables with all their partitions, and
+// - the node information of all replica servers.
+// The returned appStatsMap also includes the structures that hold all the final
+// statistical values.
+func (d *partitionDetectorImpl) fetchMetadata() (appStatsMap, []*admin.NodeInfo, error) {
+	adminClient := client.NewClient(client.Config{
+		MetaServers: d.cfg.MetaServers,
+		Timeout:     d.cfg.RpcTimeout,
+	})
+	defer adminClient.Close()
+
+	// Fetch the information of all available tables.
 	tables, err := adminClient.ListTables()
 	if err != nil {
-		return nil, err
+		return nil, nil, err
 	}

 	appMap := make(appStatsMap)
 	for _, table := range tables {
 		// Query metadata for each partition of each table.
 		appID, partitionCount, partitionConfigs, err := adminClient.QueryConfig(table.AppName)
 		if err != nil {
-			return nil, err
+			return nil, nil, err
 		}

 		// Initialize statistical value for each partition.
@@ -172,47 +182,43 @@ func pullTablePartitions(adminClient client.Client) (appStatsMap, error) {
 		}
 	}

-	return appMap, nil
-}
-
-type aggregator func(map[string]float64, string, float64)
-
-// Pull metrics from all nodes and aggregate them to produce the statistics.
-func (d *partitionDetectorImpl) aggregateMetrics(adminClient client.Client) (appStatsMap, error) {
-	// appMap is the final structure that includes all the statistical values.
-	appMap, err := pullTablePartitions(adminClient)
-	if err != nil {
-		return nil, err
-	}
-
+	// Fetch the node information of all replica servers.
 	nodes, err := adminClient.ListNodes()
 	if err != nil {
-		return nil, err
+		return nil, nil, err
 	}

-	// Pull multiple results of metrics to perform cumulative calculation to produce the
-	// statistics such as QPS.
+	return appMap, nodes, nil
+}
+
+type aggregator func(map[string]float64, string, float64)
+
+// Pull metric samples from the nodes and aggregate them into appMap to produce the
+// final statistical results.
+func (d *partitionDetectorImpl) aggregateMetrics(appMap appStatsMap, nodes []*admin.NodeInfo) error {
+	// Pull multiple samples of metrics to perform a cumulative calculation that produces
+	// statistical results such as QPS.
 	startSnapshots, err := d.pullMetrics(nodes)
 	if err != nil {
-		return nil, err
+		return err
 	}

 	time.Sleep(d.cfg.SampleMetricsInterval)

 	endSnapshots, err := d.pullMetrics(nodes)
 	if err != nil {
-		return nil, err
+		return err
 	}

 	for i, snapshot := range endSnapshots {
 		if snapshot.TimestampNS <= startSnapshots[i].TimestampNS {
-			return nil, fmt.Errorf("end timestamp (%d) must be greater than start timestamp (%d)",
+			return fmt.Errorf("end timestamp (%d) must be greater than start timestamp (%d)",
 				snapshot.TimestampNS, startSnapshots[i].TimestampNS)
 		}

 		d.calculateStats(snapshot, nodes[i],
 			func(stats map[string]float64, key string, operand float64) {
-				// Just set the ending number of requests.
+				// Just set the number of requests from the ending snapshot.
 				stats[key] = operand
 			},
 			appMap)
@@ -228,15 +234,15 @@ func (d *partitionDetectorImpl) aggregateMetrics(adminClient client.Client) (app
 					return
 				}

-				// Calculate QPS based on the ending number of requests that have been
+				// Calculate QPS based on the ending snapshot that has been
 				// set previously.
 				stats[key] = (value - operand) / duration.Seconds()
 			}
 		}(time.Duration(endSnapshots[i].TimestampNS-snapshot.TimestampNS)),
 			appMap)
 	}

-	return appMap, nil
+	return nil
 }

 var (
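Note on the sampling scheme in the two hunks above: the refactored aggregateMetrics derives QPS from two cumulative counter snapshots taken d.cfg.SampleMetricsInterval apart, dividing the counter delta by the elapsed wall-clock time, and rejecting any pair whose timestamps are not strictly increasing. A minimal, self-contained sketch of that calculation follows; the metricSnapshot type and its field names are illustrative stand-ins, not the repository's actual API.

package main

import (
	"fmt"
	"time"
)

// metricSnapshot is a hypothetical stand-in for the collector's per-node
// snapshot: a monotonically increasing request counter plus its sample time.
type metricSnapshot struct {
	TimestampNS int64
	Requests    float64
}

// qps derives queries-per-second from two cumulative counter samples.
// It mirrors the diff's guard: the end timestamp must be strictly greater
// than the start timestamp, otherwise the rate is undefined.
func qps(start, end metricSnapshot) (float64, error) {
	if end.TimestampNS <= start.TimestampNS {
		return 0, fmt.Errorf("end timestamp (%d) must be greater than start timestamp (%d)",
			end.TimestampNS, start.TimestampNS)
	}
	elapsed := time.Duration(end.TimestampNS - start.TimestampNS)
	return (end.Requests - start.Requests) / elapsed.Seconds(), nil
}

func main() {
	start := metricSnapshot{TimestampNS: 0, Requests: 1000}
	end := metricSnapshot{TimestampNS: int64(10 * time.Second), Requests: 6000}
	rate, err := qps(start, end)
	if err != nil {
		panic(err)
	}
	fmt.Printf("QPS: %.1f\n", rate) // prints: QPS: 500.0
}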
@@ -405,6 +411,7 @@ func (d *partitionDetectorImpl) addHotspotStats(appMap appStatsMap) {
 	analyzer, ok := d.analyzers[key]
 	if !ok {
 		analyzer = newPartitionAnalyzer(d.cfg.MaxSampleSize)
+		d.analyzers[key] = analyzer
 	}

 	analyzer.add(value)
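The last hunk appears to be the "fix" half of the commit message: without the added store (d.analyzers[key] = analyzer), every cache miss created a throwaway analyzer whose accumulated samples were lost as soon as the function returned. Below is a small sketch of the get-or-create pattern the fix restores, using simplified, assumed types rather than the repository's own.

package main

import "fmt"

// analyzer is a simplified stand-in for the repository's partitionAnalyzer:
// it just accumulates samples up to a maximum size.
type analyzer struct {
	samples []float64
	maxSize int
}

func (a *analyzer) add(v float64) {
	if len(a.samples) >= a.maxSize {
		a.samples = a.samples[1:] // drop the oldest sample
	}
	a.samples = append(a.samples, v)
}

// getOrCreate looks up the analyzer for key, creating and *storing* it on a
// miss. Omitting the store (the pre-fix behavior) would hand back a fresh
// analyzer each time, so no samples would ever accumulate across calls.
func getOrCreate(m map[string]*analyzer, key string, maxSize int) *analyzer {
	a, ok := m[key]
	if !ok {
		a = &analyzer{maxSize: maxSize}
		m[key] = a // the line the commit adds
	}
	return a
}

func main() {
	analyzers := make(map[string]*analyzer)
	for _, v := range []float64{1, 2, 3} {
		getOrCreate(analyzers, "table1.partition0", 100).add(v)
	}
	// With the store in place, all three samples survive across calls.
	fmt.Println(len(analyzers["table1.partition0"].samples)) // prints: 3
}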
