1 change: 1 addition & 0 deletions collector/config.yml
@@ -52,3 +52,4 @@ hotspot:
partition_detect_interval : 30s
pull_metrics_timeout : 5s
sample_metrics_interval : 10s
max_sample_size : 128
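(With the values above, each analyzer retains at most 128 samples; assuming one sample is added per detection round, a 30s partition_detect_interval gives each table a sliding window of roughly 128 × 30s ≈ 64 minutes of history.)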
1 change: 1 addition & 0 deletions collector/go.mod
@@ -21,6 +21,7 @@ go 1.18

require (
github.com/apache/incubator-pegasus/go-client v0.0.0-20260121121155-96868ed93b2a
github.com/gammazero/deque v1.0.0
github.com/kataras/iris/v12 v12.2.0
github.com/prometheus/client_golang v1.18.0
github.com/sirupsen/logrus v1.8.1
2 changes: 2 additions & 0 deletions collector/go.sum
@@ -85,6 +85,8 @@ github.com/fortytw2/leaktest v1.3.0 h1:u8491cBMTQ8ft8aeV+adlcytMZylmA5nnwwkRZjI8
github.com/fsnotify/fsnotify v1.4.7/go.mod h1:jwhsz4b93w/PPRr/qN1Yymfu8t87LnFCMoQvtojpjFo=
github.com/fsnotify/fsnotify v1.5.4 h1:jRbGcIw6P2Meqdwuo0H1p6JVLbL5DHKAKlYndzMwVZI=
github.com/fsnotify/fsnotify v1.5.4/go.mod h1:OVB6XrOHzAwXMpEM7uPOzcehqUV2UqJxmVXmkdnm1bU=
github.com/gammazero/deque v1.0.0 h1:LTmimT8H7bXkkCy6gZX7zNLtkbz4NdS2z8LZuor3j34=
github.com/gammazero/deque v1.0.0/go.mod h1:iflpYvtGfM3U8S8j+sZEKIak3SAKYpA5/SQewgfXDKo=
github.com/ghodss/yaml v0.0.0-20150909031657-73d445a93680/go.mod h1:4dBDuWmgqj2HViK6kFavaiC9ZROes6MMH2rRYeMEF04=
github.com/ghodss/yaml v1.0.0/go.mod h1:4dBDuWmgqj2HViK6kFavaiC9ZROes6MMH2rRYeMEF04=
github.com/go-gl/glfw v0.0.0-20190409004039-e6da0acd62b1/go.mod h1:vR7hzQXu2zJy9AVAgeJqvqgH9Q5CA+iKCZ2gyEVpxRU=
163 changes: 134 additions & 29 deletions collector/hotspot/partition_detector.go
@@ -21,12 +21,14 @@ import (
"context"
"fmt"
"strconv"
"sync"
"time"

"github.com/apache/incubator-pegasus/collector/metrics"
client "github.com/apache/incubator-pegasus/go-client/admin"
"github.com/apache/incubator-pegasus/go-client/idl/admin"
"github.com/apache/incubator-pegasus/go-client/idl/replication"
"github.com/gammazero/deque"
log "github.com/sirupsen/logrus"
"github.com/spf13/viper"
"golang.org/x/sync/errgroup"
@@ -43,6 +45,7 @@ type PartitionDetectorConfig struct {
DetectInterval time.Duration
PullMetricsTimeout time.Duration
SampleMetricsInterval time.Duration
MaxSampleSize int
}

func LoadPartitionDetectorConfig() *PartitionDetectorConfig {
@@ -52,6 +55,7 @@ func LoadPartitionDetectorConfig() *PartitionDetectorConfig {
DetectInterval: viper.GetDuration("hotspot.partition_detect_interval"),
PullMetricsTimeout: viper.GetDuration("hotspot.pull_metrics_timeout"),
SampleMetricsInterval: viper.GetDuration("hotspot.sample_metrics_interval"),
MaxSampleSize: viper.GetInt("hotspot.max_sample_size"),
}
}

@@ -78,12 +82,15 @@ func NewPartitionDetector(cfg *PartitionDetectorConfig) (PartitionDetector, erro
}

return &partitionDetectorImpl{
cfg: cfg,
cfg: cfg,
analyzers: make(map[partitionAnalyzerKey]*partitionAnalyzer),
}, nil
}

type partitionDetectorImpl struct {
cfg *PartitionDetectorConfig
cfg *PartitionDetectorConfig
mtx sync.RWMutex
analyzers map[partitionAnalyzerKey]*partitionAnalyzer
}

func (d *partitionDetectorImpl) Run(tom *tomb.Tomb) error {
@@ -115,44 +122,50 @@ type appStats struct {
appName string
partitionCount int32
partitionConfigs []*replication.PartitionConfiguration
partitionStats []map[string]float64 // {metric_name -> metric_value} for each partition.
partitionStats []map[string]float64 // {metricName -> metricValue} for each partition.
}

func (d *partitionDetectorImpl) aggregate() error {
adminClient := client.NewClient(client.Config{
MetaServers: d.cfg.MetaServers,
Timeout: d.cfg.RpcTimeout,
})
defer adminClient.Close()

// appMap is the final structure that includes all the statistical values.
appMap, err := pullTablePartitions(adminClient)
// appMap includes the structures that hold all the final statistical values.
appMap, nodes, err := d.fetchMetadata()
if err != nil {
return err
}

err = d.aggregateMetrics(adminClient, appMap)
err = d.aggregateMetrics(appMap, nodes)
if err != nil {
return err
}

d.addHotspotSamples(appMap)

return nil
}

// Pull metadata of all available tables with all their partitions and form the final structure
// that includes all the statistical values.
func pullTablePartitions(adminClient client.Client) (appStatsMap, error) {
// Fetch necessary metadata from the meta server for the aggregation of metrics, including:
// - the metadata of all available tables with all their partitions, and
// - the node information of all replica servers.
// Also, the returned appStatsMap includes the structures that hold all the final
// statistical values.
func (d *partitionDetectorImpl) fetchMetadata() (appStatsMap, []*admin.NodeInfo, error) {
adminClient := client.NewClient(client.Config{
MetaServers: d.cfg.MetaServers,
Timeout: d.cfg.RpcTimeout,
})
defer adminClient.Close()

// Fetch the information of all available tables.
tables, err := adminClient.ListTables()
if err != nil {
return nil, err
return nil, nil, err
}

appMap := make(appStatsMap)
for _, table := range tables {
// Query metadata for each partition of each table.
appID, partitionCount, partitionConfigs, err := adminClient.QueryConfig(table.AppName)
if err != nil {
return nil, err
return nil, nil, err
}

// Initialize the statistical values for each partition.
@@ -170,20 +183,22 @@ func pullTablePartitions(adminClient client.Client) (appStatsMap, error) {
}
}

return appMap, nil
}

type aggregator func(map[string]float64, string, float64)

// Pull metrics from all nodes and aggregate them to produce the statistics.
func (d *partitionDetectorImpl) aggregateMetrics(adminClient client.Client, appMap appStatsMap) error {
// Fetch the node information of all replica servers.
nodes, err := adminClient.ListNodes()
if err != nil {
return err
return nil, nil, err
}

// Pull multiple results of metrics to perform cumulative calculation to produce the
// statistics such as QPS.
return appMap, nodes, nil
}

type aggregator func(map[string]float64, string, float64)

// Pull metric samples from nodes and aggregate them into appMap to produce the final
// statistical results.
func (d *partitionDetectorImpl) aggregateMetrics(appMap appStatsMap, nodes []*admin.NodeInfo) error {
// Pull multiple samples of metrics and perform cumulative calculations to produce
// statistical results such as QPS.
startSnapshots, err := d.pullMetrics(nodes)
if err != nil {
return err
@@ -204,7 +219,7 @@

d.calculateStats(snapshot, nodes[i],
func(stats map[string]float64, key string, operand float64) {
// Just set the ending number of requests.
// Just set the number of requests from the ending snapshot.
stats[key] = operand
},
appMap)
@@ -220,7 +235,7 @@ func (d *partitionDetectorImpl) aggregateMetrics(adminClient client.Client, appM
return
}

// Calculate QPS based on the ending number of requests that have been
// Calculate QPS based on the ending snapshot that has been
// set previously.
stats[key] = (value - operand) / duration.Seconds()
}
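The two hunks above implement a two-snapshot rate calculation: the starting snapshot seeds each counter, and the ending snapshot converts it into a rate. A minimal standalone sketch of the same arithmetic (hypothetical helper, not part of this PR; assumes "time" is imported):

// qpsFromSnapshots mirrors the aggregator closures above: the start snapshot
// supplies the initial request count, and the ending snapshot turns it into a
// rate via stats[key] = (value - operand) / duration.Seconds().
func qpsFromSnapshots(startRequests, endRequests float64, window time.Duration) float64 {
	return (endRequests - startRequests) / window.Seconds()
}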
Expand Down Expand Up @@ -348,3 +363,93 @@ func (d *partitionDetectorImpl) calculateStats(
}
}
}

// Since the partition count of a table might change, use the (appID, partitionCount)
// pair as the key for each table.
type partitionAnalyzerKey struct {
appID int32
partitionCount int32
}

const (
readHotspotData = iota
writeHotspotData
operateHotspotDataNumber
)

// hotspotPartitionStats holds all the statistical values of each partition, used for analysis
// of hotspot partitions.
type hotspotPartitionStats struct {
totalQPS [operateHotspotDataNumber]float64
}

// Receive statistical values for all kinds of reads and writes, and aggregate them to
// obtain the overall read and write statistics for each partition.
func calculateHotspotStats(appMap appStatsMap) map[partitionAnalyzerKey][]hotspotPartitionStats {
results := make(map[partitionAnalyzerKey][]hotspotPartitionStats)
for appID, stats := range appMap {
partitionCount := len(stats.partitionStats)
value := make([]hotspotPartitionStats, 0, partitionCount)

for _, partitionStats := range stats.partitionStats {
var hotspotStats hotspotPartitionStats

// Calculate total QPS over all kinds of reads.
for _, metricName := range readMetricNames {
hotspotStats.totalQPS[readHotspotData] += partitionStats[metricName]
}

// Calculate total QPS over all kinds of writes.
for _, metricName := range writeMetricNames {
hotspotStats.totalQPS[writeHotspotData] += partitionStats[metricName]
}

value = append(value, hotspotStats)
}

key := partitionAnalyzerKey{appID: appID, partitionCount: int32(partitionCount)}
results[key] = value
}

return results
}
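For illustration, a hedged sketch of a call (the metric names and values here are hypothetical; readMetricNames and writeMetricNames are defined elsewhere in this file, and appStatsMap is assumed to be keyed by appID as the loop above suggests):

// Hypothetical input: one table (appID 1) with two partitions, where
// readMetricNames contains "get_qps" and writeMetricNames contains "put_qps".
appMap := appStatsMap{
	1: {partitionStats: []map[string]float64{
		{"get_qps": 100, "put_qps": 5},
		{"get_qps": 900, "put_qps": 5}, // partition 1 looks like a read hotspot
	}},
}
results := calculateHotspotStats(appMap)
// results[partitionAnalyzerKey{appID: 1, partitionCount: 2}][1].totalQPS[readHotspotData] == 900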

// Calculate statistical values over multiple tables, with all partitions of each table
// forming one sample used for later analysis of hotspot partitions.
func (d *partitionDetectorImpl) addHotspotSamples(appMap appStatsMap) {
hotspotMap := calculateHotspotStats(appMap)

d.mtx.Lock()
defer d.mtx.Unlock()

for key, value := range hotspotMap {
analyzer, ok := d.analyzers[key]
if !ok {
analyzer = newPartitionAnalyzer(d.cfg.MaxSampleSize)
d.analyzers[key] = analyzer
}

analyzer.addSample(value)
}
}
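Note the design choice here: because analyzers are keyed by (appID, partitionCount), repartitioning a table changes the key, so a fresh analyzer is created lazily rather than mixing samples taken against different partition layouts.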

func newPartitionAnalyzer(maxSampleSize int) *partitionAnalyzer {
return &partitionAnalyzer{maxSampleSize: maxSampleSize}
}

// partitionAnalyzer holds the samples for all partitions of a table and analyzes hotspot
// partitions based on them.
type partitionAnalyzer struct {
// TODO(wangdan): bump gammazero/deque to the latest version after upgrading Go to 1.23+,
// since older Go versions do not support the `Deque.Iter()` iterator interface.
maxSampleSize int
samples deque.Deque[[]hotspotPartitionStats] // Each element is a sample of all partitions of the table
}

func (a *partitionAnalyzer) addSample(sample []hotspotPartitionStats) {
for a.samples.Len() >= a.maxSampleSize {
a.samples.PopFront()
}

a.samples.PushBack(sample)
}
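A brief usage sketch of the bounded window (hypothetical sizes; the zero-value deque is ready to use):

a := newPartitionAnalyzer(3)
for i := 0; i < 5; i++ {
	// Each call appends one sample; once three are held, the oldest is evicted first.
	a.addSample([]hotspotPartitionStats{{}})
}
// a.samples.Len() is now 3: the deque acts as a sliding window over the newest samples.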