 package hotspot
 
 import (
+	"context"
+	"fmt"
+	"strconv"
 	"time"
 
+	"github.com/apache/incubator-pegasus/collector/metrics"
+	client "github.com/apache/incubator-pegasus/go-client/admin"
+	"github.com/apache/incubator-pegasus/go-client/idl/admin"
+	"github.com/apache/incubator-pegasus/go-client/idl/replication"
 	log "github.com/sirupsen/logrus"
+	"github.com/spf13/viper"
+	"golang.org/x/sync/errgroup"
 	"gopkg.in/tomb.v2"
 )
 
@@ -29,23 +38,61 @@ type PartitionDetector interface {
 }
 
 type PartitionDetectorConfig struct {
-	DetectInterval time.Duration
+	MetaServers           []string
+	RpcTimeout            time.Duration
+	DetectInterval        time.Duration
+	PullMetricsTimeout    time.Duration
+	SampleMetricsInterval time.Duration
 }
 
-func NewPartitionDetector(conf PartitionDetectorConfig) PartitionDetector {
-	return &partitionDetector{
-		detectInterval: conf.DetectInterval,
+func LoadPartitionDetectorConfig() *PartitionDetectorConfig {
+	return &PartitionDetectorConfig{
+		MetaServers:           viper.GetStringSlice("meta_servers"),
+		RpcTimeout:            viper.GetDuration("hotspot.rpc_timeout"),
+		DetectInterval:        viper.GetDuration("hotspot.partition_detect_interval"),
+		PullMetricsTimeout:    viper.GetDuration("hotspot.pull_metrics_timeout"),
+		SampleMetricsInterval: viper.GetDuration("hotspot.sample_metrics_interval"),
 	}
 }
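LoadPartitionDetectorConfig expects these keys to already be present in viper. A minimal sketch of how they might be populated, for example in a test, assuming only the key names read above; the helper name, the addresses, and the durations are illustrative, not part of this change:

```go
package hotspot

import (
	"time"

	"github.com/spf13/viper"
)

// exampleConfig is a hypothetical helper that fills in the viper keys read by
// LoadPartitionDetectorConfig. In the real collector these values would
// normally come from its configuration file.
func exampleConfig() *PartitionDetectorConfig {
	viper.Set("meta_servers", []string{"127.0.0.1:34601", "127.0.0.1:34602", "127.0.0.1:34603"})
	viper.Set("hotspot.rpc_timeout", 10*time.Second)
	viper.Set("hotspot.partition_detect_interval", 2*time.Minute)
	viper.Set("hotspot.pull_metrics_timeout", 30*time.Second)
	viper.Set("hotspot.sample_metrics_interval", 10*time.Second)
	return LoadPartitionDetectorConfig()
}
```

Note that NewPartitionDetector below requires DetectInterval to be strictly greater than SampleMetricsInterval, which these example values satisfy.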
 
-type partitionDetector struct {
-	detectInterval time.Duration
+func NewPartitionDetector(cfg *PartitionDetectorConfig) (PartitionDetector, error) {
+	if len(cfg.MetaServers) == 0 {
+		return nil, fmt.Errorf("MetaServers should not be empty")
+	}
+
+	if cfg.DetectInterval <= 0 {
+		return nil, fmt.Errorf("DetectInterval(%d) must be > 0", cfg.DetectInterval)
+	}
+
+	if cfg.PullMetricsTimeout <= 0 {
+		return nil, fmt.Errorf("PullMetricsTimeout(%d) must be > 0", cfg.PullMetricsTimeout)
+	}
+
+	if cfg.SampleMetricsInterval <= 0 {
+		return nil, fmt.Errorf("SampleMetricsInterval(%d) must be > 0", cfg.SampleMetricsInterval)
+	}
+
+	if cfg.DetectInterval <= cfg.SampleMetricsInterval {
+		return nil, fmt.Errorf("DetectInterval(%d) must be > SampleMetricsInterval(%d)",
+			cfg.DetectInterval, cfg.SampleMetricsInterval)
+	}
+
+	return &partitionDetectorImpl{
+		cfg: cfg,
+	}, nil
+}
+
+type partitionDetectorImpl struct {
+	cfg *PartitionDetectorConfig
 }
 
-func (d *partitionDetector) Run(tom *tomb.Tomb) error {
+func (d *partitionDetectorImpl) Run(tom *tomb.Tomb) error {
+	ticker := time.NewTicker(d.cfg.DetectInterval)
+	defer ticker.Stop()
+
 	for {
 		select {
-		case <-time.After(d.detectInterval):
+		case <-ticker.C:
 			d.detect()
 		case <-tom.Dying():
 			log.Info("Hotspot partition detector exited.")
@@ -54,5 +101,250 @@ func (d *partitionDetector) Run(tom *tomb.Tomb) error {
 	}
 }
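The detector is driven by a tomb.Tomb like the other collector components. A hedged sketch of how a caller might wire the new constructor and Run together; startHotspotDetector is a hypothetical name and the error handling is reduced to the minimum:

```go
package hotspot

import (
	"gopkg.in/tomb.v2"
)

// startHotspotDetector is a hypothetical wiring example: it builds the detector
// from the viper-backed config and hands its Run loop to the tomb.
func startHotspotDetector(tom *tomb.Tomb) error {
	detector, err := NewPartitionDetector(LoadPartitionDetectorConfig())
	if err != nil {
		return err
	}

	// Run blocks until the tomb starts dying, so execute it in a goroutine
	// managed by the tomb itself.
	tom.Go(func() error {
		return detector.Run(tom)
	})
	return nil
}
```

The caller can later stop the loop with tom.Kill(nil) and wait for it to exit with tom.Wait().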
 
-func (d *partitionDetector) detect() {
+func (d *partitionDetectorImpl) detect() {
+	err := d.aggregate()
+	if err != nil {
+		log.Error("failed to aggregate metrics for hotspot: ", err)
+	}
+}
+
+// {appID -> appStats}.
+type appStatsMap map[int32]appStats
+
+type appStats struct {
+	appName          string
+	partitionCount   int32
+	partitionConfigs []*replication.PartitionConfiguration
+	partitionStats   []map[string]float64 // {metric_name -> metric_value} for each partition.
+}
+
+func (d *partitionDetectorImpl) aggregate() error {
+	adminClient := client.NewClient(client.Config{
+		MetaServers: d.cfg.MetaServers,
+		Timeout:     d.cfg.RpcTimeout,
+	})
+	defer adminClient.Close()
+
+	// appMap is the final structure that includes all the statistical values.
+	appMap, err := pullTablePartitions(adminClient)
+	if err != nil {
+		return err
+	}
+
+	err = d.aggregateMetrics(adminClient, appMap)
+	if err != nil {
+		return err
+	}
+
+	return nil
+}
+
+// pullTablePartitions pulls the metadata of all available tables and their partitions,
+// and builds the structure that will hold all the statistical values.
+func pullTablePartitions(adminClient client.Client) (appStatsMap, error) {
+	tables, err := adminClient.ListTables()
+	if err != nil {
+		return nil, err
+	}
+
+	appMap := make(appStatsMap)
+	for _, table := range tables {
+		// Query the metadata of every partition of the table.
+		appID, partitionCount, partitionConfigs, err := adminClient.QueryConfig(table.AppName)
+		if err != nil {
+			return nil, err
+		}
+
+		// Initialize the statistics map of each partition.
+		partitionStats := make([]map[string]float64, 0, len(partitionConfigs))
+		for range partitionConfigs {
+			m := make(map[string]float64)
+			partitionStats = append(partitionStats, m)
+		}
+
+		appMap[appID] = appStats{
+			appName:          table.AppName,
+			partitionCount:   partitionCount,
+			partitionConfigs: partitionConfigs,
+			partitionStats:   partitionStats,
+		}
+	}
+
+	return appMap, nil
+}
+
+type aggregator func(map[string]float64, string, float64)
+
+// aggregateMetrics pulls metrics from all nodes and aggregates them to produce the statistics.
+func (d *partitionDetectorImpl) aggregateMetrics(adminClient client.Client, appMap appStatsMap) error {
+	nodes, err := adminClient.ListNodes()
+	if err != nil {
+		return err
+	}
+
+	// Pull two snapshots of the metrics, separated by SampleMetricsInterval, so that
+	// rate statistics such as QPS can be computed from the difference between them.
+	startSnapshots, err := d.pullMetrics(nodes)
+	if err != nil {
+		return err
+	}
+
+	time.Sleep(d.cfg.SampleMetricsInterval)
+
+	endSnapshots, err := d.pullMetrics(nodes)
+	if err != nil {
+		return err
+	}
+
+	for i, snapshot := range endSnapshots {
+		if snapshot.TimestampNS <= startSnapshots[i].TimestampNS {
+			return fmt.Errorf("end timestamp (%d) must be greater than start timestamp (%d)",
+				snapshot.TimestampNS, startSnapshots[i].TimestampNS)
+		}
+
+		d.calculateStats(snapshot, nodes[i],
+			func(stats map[string]float64, key string, operand float64) {
+				// First pass: just record the ending number of requests.
+				stats[key] = operand
+			},
+			appMap)
+	}
+
+	for i, snapshot := range startSnapshots {
+		d.calculateStats(snapshot, nodes[i],
+			func(duration time.Duration) aggregator {
+				return func(stats map[string]float64, key string, operand float64) {
+					value, ok := stats[key]
+					if !ok || value < operand {
+						stats[key] = 0
+						return
+					}
+
+					// Second pass: calculate QPS based on the ending number of requests
+					// that has been recorded previously.
+					stats[key] = (value - operand) / duration.Seconds()
+				}
+			}(time.Duration(endSnapshots[i].TimestampNS-snapshot.TimestampNS)),
+			appMap)
+	}
+
+	return nil
+}
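To make the two-pass aggregation above concrete, here is a small self-contained sketch of the arithmetic it performs for a single counter; the metric name and the numbers are made up purely for illustration:

```go
package hotspot

import (
	"fmt"
	"time"
)

// exampleQPS is a hypothetical illustration of the two-pass calculation:
// the end snapshot stores the final counter value, and the start snapshot
// then turns it into a rate over the sampling interval.
func exampleQPS() {
	stats := map[string]float64{}

	// Pass 1 (end snapshot): record the ending value of the counter.
	stats["get_requests"] = 150000

	// Pass 2 (start snapshot): subtract the starting value and divide by the
	// time elapsed between the two snapshots.
	start := 120000.0
	elapsed := 10 * time.Second
	stats["get_requests"] = (stats["get_requests"] - start) / elapsed.Seconds()

	fmt.Println(stats["get_requests"]) // (150000 - 120000) / 10 = 3000 QPS
}
```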
+
+var (
+	readMetricNames = []string{
+		metrics.MetricReplicaGetRequests,
+		metrics.MetricReplicaMultiGetRequests,
+		metrics.MetricReplicaBatchGetRequests,
+		metrics.MetricReplicaScanRequests,
+	}
+
+	writeMetricNames = []string{
+		metrics.MetricReplicaPutRequests,
+		metrics.MetricReplicaMultiPutRequests,
+		metrics.MetricReplicaRemoveRequests,
+		metrics.MetricReplicaMultiRemoveRequests,
+		metrics.MetricReplicaIncrRequests,
+		metrics.MetricReplicaCheckAndSetRequests,
+		metrics.MetricReplicaCheckAndMutateRequests,
+		metrics.MetricReplicaDupRequests,
+	}
+
+	metricFilter = metrics.NewMetricBriefValueFilter(
+		[]string{metrics.MetricEntityTypeReplica},
+		[]string{},
+		map[string]string{},
+		append(append([]string(nil), readMetricNames...), writeMetricNames...),
+	)
+)
+
+func (d *partitionDetectorImpl) pullMetrics(nodes []*admin.NodeInfo) ([]*metrics.MetricQueryBriefValueSnapshot, error) {
+	results := make([]*metrics.MetricQueryBriefValueSnapshot, len(nodes))
+
+	ctx, cancel := context.WithTimeout(context.Background(), d.cfg.PullMetricsTimeout)
+	defer cancel()
+
+	// Pull the metrics from all nodes simultaneously.
+	eg, ctx := errgroup.WithContext(ctx)
+	puller := func(index int, node *admin.NodeInfo) func() error {
+		return func() error {
+			// Create a client for each target node.
+			metricClient := metrics.NewMetricClient(&metrics.MetricClientConfig{
+				Host: node.HpNode.GetHost(),
+				Port: node.HpNode.GetPort(),
+			})
+
+			// Pull the metrics from the target node.
+			snapshot, err := metricClient.GetBriefValueSnapshot(ctx, metricFilter)
+			if err != nil {
+				return err
+			}
+
+			// Place the pulled result into the position in the slice that corresponds
+			// to the target node.
+			results[index] = snapshot
+			return nil
+		}
+	}
+
+	for i, node := range nodes {
+		// Launch one goroutine for each target node to pull metrics from it.
+		eg.Go(puller(i, node))
+	}
+
+	// Wait for all requests to finish.
+	if err := eg.Wait(); err != nil {
+		return nil, err
+	}
+
+	return results, nil
+}
+
+func (d *partitionDetectorImpl) calculateStats(
+	snapshot *metrics.MetricQueryBriefValueSnapshot,
+	node *admin.NodeInfo,
+	adder aggregator,
+	appMap appStatsMap) {
+	for _, entity := range snapshot.Entities {
+		// The metric must belong to a "replica" entity.
+		if entity.Type != metrics.MetricEntityTypeReplica {
+			continue
+		}
+
+		// The metric must have a valid table ID.
+		appID, err := strconv.Atoi(entity.Attributes[metrics.MetricEntityTableID])
+		if err != nil {
+			continue
+		}
+
+		// The table must exist in the returned metadata, which means it is available.
+		stats, ok := appMap[int32(appID)]
+		if !ok {
+			continue
+		}
+
+		// The metric must have a valid partition ID.
+		partitionID, err := strconv.Atoi(entity.Attributes[metrics.MetricEntityPartitionID])
+		if err != nil {
+			continue
+		}
+
+		// The partition ID must be less than the number of partitions.
+		if partitionID >= len(stats.partitionConfigs) {
+			continue
+		}
+
+		// Only the primary replica of a partition is counted.
+		// TODO(wangdan): support Equal() for base.HostPort.
+		primary := stats.partitionConfigs[partitionID].HpPrimary
+		if primary.GetHost() != node.HpNode.GetHost() ||
+			primary.GetPort() != node.HpNode.GetPort() {
+			continue
+		}
+
+		for _, metric := range entity.Metrics {
+			// Accumulate each statistical value via the aggregator.
+			adder(stats.partitionStats[partitionID], metric.Name, metric.Value)
+		}
+	}
 }