Skip to content

Commit e4555f0

Browse files
authored
build(go-collector): bump go-client dependency in collector to latest version including ListNodes interface (#2322)
Upgrade to go-client version to [5eb1665](5eb1665) to introduce [ListNodes](#1939) for hotsport detection.
1 parent 68310cb commit e4555f0

11 files changed

Lines changed: 74 additions & 119 deletions

File tree

collector/aggregate/aggregator.go

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -21,7 +21,7 @@ import (
2121
"fmt"
2222
"time"
2323

24-
"github.com/apache/incubator-pegasus/go-client/idl/admin"
24+
"github.com/apache/incubator-pegasus/go-client/idl/replication"
2525
log "github.com/sirupsen/logrus"
2626
"github.com/spf13/viper"
2727
"gopkg.in/tomb.v2"
@@ -133,7 +133,7 @@ func (ag *tableStatsAggregator) updateTableMap() error {
133133
return nil
134134
}
135135

136-
func (ag *tableStatsAggregator) doUpdateTableMap(tables []*admin.AppInfo) {
136+
func (ag *tableStatsAggregator) doUpdateTableMap(tables []*replication.AppInfo) {
137137
currentTableSet := make(map[int32]*struct{})
138138
for _, tb := range tables {
139139
currentTableSet[tb.AppID] = nil

collector/aggregate/aggregator_test.go

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -20,8 +20,8 @@ package aggregate
2020
import (
2121
"testing"
2222

23-
"github.com/apache/incubator-pegasus/go-client/idl/admin"
2423
"github.com/apache/incubator-pegasus/go-client/idl/base"
24+
"github.com/apache/incubator-pegasus/go-client/idl/replication"
2525
"github.com/stretchr/testify/assert"
2626
)
2727

@@ -36,7 +36,7 @@ func TestUpdateLocalTableMap(t *testing.T) {
3636
assert.Equal(t, len(ag.tables[1].Partitions), 4) // test
3737
assert.Equal(t, len(ag.tables[2].Partitions), 8) // stat
3838

39-
tables := []*admin.AppInfo{
39+
tables := []*replication.AppInfo{
4040
{AppID: 1, AppName: "stat", PartitionCount: 4},
4141
{AppID: 2, AppName: "test", PartitionCount: 8},
4242
{AppID: 3, AppName: "new_table", PartitionCount: 16},
@@ -45,7 +45,7 @@ func TestUpdateLocalTableMap(t *testing.T) {
4545
assert.Equal(t, len(ag.tables), 3)
4646
assert.Equal(t, len(ag.tables[3].Partitions), 16)
4747

48-
tables = []*admin.AppInfo{
48+
tables = []*replication.AppInfo{
4949
{AppID: 1, AppName: "stat", PartitionCount: 4},
5050
}
5151
ag.doUpdateTableMap(tables)
@@ -57,7 +57,7 @@ func TestUpdatePartitionStats(t *testing.T) {
5757
ag := &tableStatsAggregator{
5858
tables: make(map[int32]*TableStats),
5959
}
60-
tables := []*admin.AppInfo{
60+
tables := []*replication.AppInfo{
6161
{AppID: 1, AppName: "stat", PartitionCount: 4},
6262
}
6363
ag.doUpdateTableMap(tables)

collector/aggregate/perf_client.go

Lines changed: 6 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -25,6 +25,7 @@ import (
2525

2626
"github.com/apache/incubator-pegasus/go-client/idl/admin"
2727
"github.com/apache/incubator-pegasus/go-client/idl/base"
28+
"github.com/apache/incubator-pegasus/go-client/idl/replication"
2829
"github.com/apache/incubator-pegasus/go-client/session"
2930
log "github.com/sirupsen/logrus"
3031
batchErr "k8s.io/apimachinery/pkg/util/errors"
@@ -171,7 +172,7 @@ func (m *PerfClient) GetNodeStats(filter string) ([]*NodeStat, error) {
171172
func (m *PerfClient) listNodes() ([]*admin.NodeInfo, error) {
172173
ctx, cancel := context.WithTimeout(context.Background(), time.Second*5)
173174
defer cancel()
174-
resp, err := m.meta.ListNodes(ctx, &admin.ListNodesRequest{
175+
resp, err := m.meta.ListNodes(ctx, &admin.ConfigurationListNodesRequest{
175176
Status: admin.NodeStatus_NS_ALIVE,
176177
})
177178
if err != nil {
@@ -180,11 +181,11 @@ func (m *PerfClient) listNodes() ([]*admin.NodeInfo, error) {
180181
return resp.Infos, nil
181182
}
182183

183-
func (m *PerfClient) listTables() ([]*admin.AppInfo, error) {
184+
func (m *PerfClient) listTables() ([]*replication.AppInfo, error) {
184185
ctx, cancel := context.WithTimeout(context.Background(), time.Second*5)
185186
defer cancel()
186-
resp, err := m.meta.ListApps(ctx, &admin.ListAppsRequest{
187-
Status: admin.AppStatus_AS_AVAILABLE,
187+
resp, err := m.meta.ListApps(ctx, &admin.ConfigurationListAppsRequest{
188+
Status: replication.AppStatus_AS_AVAILABLE,
188189
})
189190
if err != nil {
190191
return nil, err
@@ -202,7 +203,7 @@ func (m *PerfClient) updateNodes() {
202203

203204
newNodes := make(map[string]*PerfSession)
204205
for _, n := range nodeInfos {
205-
addr := n.Address.GetAddress()
206+
addr := n.Node.GetAddress()
206207
node, found := m.nodes[addr]
207208
if !found {
208209
newNodes[addr] = NewPerfSession(addr)

collector/aggregate/perf_session.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -48,7 +48,7 @@ func (p *PerfCounter) String() string {
4848
func NewPerfSession(addr string) *PerfSession {
4949
return &PerfSession{
5050
Address: addr,
51-
NodeSession: session.NewNodeSession(addr, session.NodeTypeReplica),
51+
NodeSession: session.NewNodeSession(addr, session.NodeTypeReplica, false),
5252
}
5353
}
5454

collector/aggregate/table_stats.go

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -20,8 +20,8 @@ package aggregate
2020
import (
2121
"time"
2222

23-
"github.com/apache/incubator-pegasus/go-client/idl/admin"
2423
"github.com/apache/incubator-pegasus/go-client/idl/base"
24+
"github.com/apache/incubator-pegasus/go-client/idl/replication"
2525
)
2626

2727
// PartitionStats is a set of metrics retrieved from this partition.
@@ -59,7 +59,7 @@ type ClusterStats struct {
5959
Stats map[string]float64
6060
}
6161

62-
func newTableStats(info *admin.AppInfo) *TableStats {
62+
func newTableStats(info *replication.AppInfo) *TableStats {
6363
tb := &TableStats{
6464
TableName: info.AppName,
6565
AppID: int(info.AppID),

collector/avail/detector.go

Lines changed: 28 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -23,6 +23,7 @@ import (
2323
"time"
2424

2525
"github.com/apache/incubator-pegasus/go-client/admin"
26+
"github.com/apache/incubator-pegasus/go-client/config"
2627
"github.com/apache/incubator-pegasus/go-client/pegasus"
2728
"github.com/prometheus/client_golang/prometheus"
2829
"github.com/prometheus/client_golang/prometheus/promauto"
@@ -39,16 +40,35 @@ type Detector interface {
3940

4041
// NewDetector returns a service-availability detector.
4142
func NewDetector(detectInterval time.Duration,
42-
detectTimeout time.Duration, partitionCount int) Detector {
43+
detectTimeout time.Duration) Detector {
4344
metaServers := viper.GetStringSlice("meta_servers")
44-
tableName := viper.GetStringMapString("availablity_detect")["table_name"]
45+
if len(metaServers) == 0 {
46+
log.Fatal("meta_servers is empty")
47+
}
48+
49+
tableName := viper.GetString("availability_detect.table_name")
50+
if len(tableName) == 0 {
51+
log.Fatal("availability_detect.table_name is empty")
52+
}
53+
54+
partitionCount := viper.GetInt32("availability_detect.partition_count")
55+
if partitionCount <= 0 || (partitionCount&(partitionCount-1)) != 0 {
56+
log.Fatalf("availability_detect.partition_count(%d) must be power of 2", partitionCount)
57+
}
58+
59+
maxReplicaCount := viper.GetInt32("availability_detect.max_replica_count")
60+
if maxReplicaCount <= 0 {
61+
log.Fatalf("availability_detect.max_replica_count(%d) must be > 0", partitionCount)
62+
}
63+
4564
// Create detect table.
46-
adminClient := admin.NewClient(admin.Config{MetaServers: metaServers})
47-
err := adminClient.CreateTable(context.Background(), tableName, partitionCount)
65+
adminClient := admin.NewClient(admin.Config{MetaServers: metaServers, Timeout: 10 * time.Second})
66+
_, err := adminClient.CreateTable(tableName, partitionCount, maxReplicaCount, make(map[string]string), 600, true)
4867
if err != nil {
49-
log.Errorf("Create detect table %s failed, error: %s", tableName, err)
68+
log.Fatalf("Create detect table %s failed, error: %s", tableName, err)
5069
}
51-
pegasusClient := pegasus.NewClient(pegasus.Config{MetaServers: metaServers})
70+
71+
pegasusClient := pegasus.NewClient(*config.NewConfig(metaServers))
5272
return &pegasusDetector{
5373
client: pegasusClient,
5474
detectTableName: tableName,
@@ -93,7 +113,7 @@ type pegasusDetector struct {
93113
// timeout of a single detect.
94114
detectTimeout time.Duration
95115
// partition count.
96-
partitionCount int
116+
partitionCount int32
97117
}
98118

99119
func (d *pegasusDetector) Run(tom *tomb.Tomb) error {
@@ -143,7 +163,7 @@ func (d *pegasusDetector) detectPartition() {
143163
const letterBytes = "abcdefghijklmnopqrstuvwxyzABCDEFGHIJKLMNOPQRSTUVWXYZ"
144164

145165
// Generate a random string.
146-
func RandStringBytes(n int) string {
166+
func RandStringBytes(n int32) string {
147167
b := make([]byte, n)
148168
for i := range b {
149169
b[i] = letterBytes[rand.Intn(len(letterBytes))]

collector/config.yml

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -42,8 +42,10 @@ falcon_agent:
4242
port : 1988
4343
http_path : "/v1/push"
4444

45-
availablity_detect:
45+
availability_detect:
4646
table_name : test
47+
partition_count : 16
48+
max_replica_count : 3
4749

4850
hotspot:
4951
partition_detect_interval : 10s

collector/go.mod

Lines changed: 7 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -20,9 +20,9 @@ module github.com/apache/incubator-pegasus/collector
2020
go 1.18
2121

2222
require (
23-
github.com/apache/incubator-pegasus/go-client v0.0.0-20220526071020-be5634371701
23+
github.com/apache/incubator-pegasus/go-client v0.0.0-20251112031012-5eb1665e0630
2424
github.com/kataras/iris/v12 v12.2.0
25-
github.com/prometheus/client_golang v1.11.1
25+
github.com/prometheus/client_golang v1.18.0
2626
github.com/sirupsen/logrus v1.8.1
2727
github.com/spf13/viper v1.7.1
2828
github.com/stretchr/testify v1.8.2
@@ -43,13 +43,12 @@ require (
4343
github.com/aymerick/douceur v0.2.0 // indirect
4444
github.com/beorn7/perks v1.0.1 // indirect
4545
github.com/cenkalti/backoff/v4 v4.1.0 // indirect
46-
github.com/cespare/xxhash/v2 v2.1.2 // indirect
46+
github.com/cespare/xxhash/v2 v2.2.0 // indirect
4747
github.com/davecgh/go-spew v1.1.1 // indirect
4848
github.com/eknkc/amber v0.0.0-20171010120322-cdade1c07385 // indirect
4949
github.com/fatih/structs v1.1.0 // indirect
5050
github.com/flosch/pongo2/v4 v4.0.2 // indirect
5151
github.com/fsnotify/fsnotify v1.5.4 // indirect
52-
github.com/golang/protobuf v1.5.2 // indirect
5352
github.com/golang/snappy v0.0.4 // indirect
5453
github.com/google/uuid v1.3.0 // indirect
5554
github.com/gorilla/css v1.0.0 // indirect
@@ -65,14 +64,14 @@ require (
6564
github.com/magiconair/properties v1.8.1 // indirect
6665
github.com/mailgun/raymond/v2 v2.0.48 // indirect
6766
github.com/mailru/easyjson v0.7.7 // indirect
68-
github.com/matttproud/golang_protobuf_extensions v1.0.1 // indirect
67+
github.com/matttproud/golang_protobuf_extensions/v2 v2.0.0 // indirect
6968
github.com/microcosm-cc/bluemonday v1.0.23 // indirect
7069
github.com/mitchellh/mapstructure v1.1.2 // indirect
7170
github.com/pelletier/go-toml v1.2.0 // indirect
7271
github.com/pmezard/go-difflib v1.0.0 // indirect
73-
github.com/prometheus/client_model v0.2.0 // indirect
74-
github.com/prometheus/common v0.26.0 // indirect
75-
github.com/prometheus/procfs v0.6.0 // indirect
72+
github.com/prometheus/client_model v0.5.0 // indirect
73+
github.com/prometheus/common v0.45.0 // indirect
74+
github.com/prometheus/procfs v0.12.0 // indirect
7675
github.com/russross/blackfriday/v2 v2.1.0 // indirect
7776
github.com/schollz/closestmatch v2.1.0+incompatible // indirect
7877
github.com/sergi/go-diff v1.1.0 // indirect

0 commit comments

Comments
 (0)