Skip to content
Merged
94 changes: 73 additions & 21 deletions src/shell/command_helper.h
Original file line number Diff line number Diff line change
Expand Up @@ -1116,8 +1116,8 @@ class aggregate_stats_calcs
class total_aggregate_stats : public aggregate_stats
{
public:
total_aggregate_stats(const std::string &entity_type, stat_var_map &&stat_vars)
: _my_entity_type(entity_type), _my_stat_vars(std::move(stat_vars))
total_aggregate_stats(std::string entity_type, stat_var_map &&stat_vars)
: _my_entity_type(std::move(entity_type)), _my_stat_vars(std::move(stat_vars))
{
}

Expand All @@ -1134,13 +1134,14 @@ class total_aggregate_stats : public aggregate_stats

void calc_rates(double duration_s) override
{
for (auto &stat_var : _my_stat_vars) {
*stat_var.second /= duration_s;
for (auto &[_, stat_var] : _my_stat_vars) {
*stat_var /= duration_s;
}
}

private:
DISALLOW_COPY_AND_ASSIGN(total_aggregate_stats);
DISALLOW_MOVE_AND_ASSIGN(total_aggregate_stats);

const std::string _my_entity_type;
stat_var_map _my_stat_vars;
Expand All @@ -1160,10 +1161,10 @@ using table_stat_map = std::unordered_map<int32_t, stat_var_map>;
class table_aggregate_stats : public aggregate_stats
{
public:
table_aggregate_stats(const std::string &entity_type,
table_aggregate_stats(std::string entity_type,
table_stat_map &&table_stats,
const std::unordered_set<dsn::gpid> &partitions)
: _my_entity_type(entity_type),
std::unordered_set<dsn::gpid> partitions)
: _my_entity_type(std::move(entity_type)),
_my_table_stats(std::move(table_stats)),
_my_partitions(std::move(partitions))
{
Expand All @@ -1178,21 +1179,21 @@ class table_aggregate_stats : public aggregate_stats
{
RETURN_NULL_STAT_VARS_IF(entity_type != _my_entity_type);

int32_t metric_table_id;
int32_t metric_table_id{-1};
RETURN_NULL_STAT_VARS_IF_NOT_OK(dsn::parse_metric_table_id(entity_attrs, metric_table_id));

// Empty `_my_partitions` means there is no restriction; otherwise, the partition id
// should be found in `_my_partitions`.
if (!_my_partitions.empty()) {
int32_t metric_partition_id;
int32_t metric_partition_id{-1};
RETURN_NULL_STAT_VARS_IF_NOT_OK(
dsn::parse_metric_partition_id(entity_attrs, metric_partition_id));

dsn::gpid metric_pid(metric_table_id, metric_partition_id);
RETURN_NULL_STAT_VARS_IF(_my_partitions.find(metric_pid) == _my_partitions.end());
}

const auto &table_stat = _my_table_stats.find(metric_table_id);
const auto table_stat = _my_table_stats.find(metric_table_id);
CHECK_TRUE(table_stat != _my_table_stats.end());

*stat_vars = &table_stat->second;
Expand All @@ -1201,15 +1202,16 @@ class table_aggregate_stats : public aggregate_stats

void calc_rates(double duration_s) override
{
for (auto &table_stats : _my_table_stats) {
for (auto &stat_var : table_stats.second) {
*stat_var.second /= duration_s;
for (auto &[_, table_stat] : _my_table_stats) {
for (auto &[_, stat_var] : table_stat) {
*stat_var /= duration_s;
}
}
}

private:
DISALLOW_COPY_AND_ASSIGN(table_aggregate_stats);
DISALLOW_MOVE_AND_ASSIGN(table_aggregate_stats);

const std::string _my_entity_type;
table_stat_map _my_table_stats;
Expand All @@ -1225,8 +1227,8 @@ using partition_stat_map = std::unordered_map<dsn::gpid, stat_var_map>;
class partition_aggregate_stats : public aggregate_stats
{
public:
partition_aggregate_stats(const std::string &entity_type, partition_stat_map &&partition_stats)
: _my_entity_type(entity_type), _my_partition_stats(std::move(partition_stats))
partition_aggregate_stats(std::string entity_type, partition_stat_map &&partition_stats)
: _my_entity_type(std::move(entity_type)), _my_partition_stats(std::move(partition_stats))
{
}

Expand All @@ -1239,15 +1241,15 @@ class partition_aggregate_stats : public aggregate_stats
{
RETURN_NULL_STAT_VARS_IF(entity_type != _my_entity_type);

int32_t metric_table_id;
int32_t metric_table_id{-1};
RETURN_NULL_STAT_VARS_IF_NOT_OK(dsn::parse_metric_table_id(entity_attrs, metric_table_id));

int32_t metric_partition_id;
int32_t metric_partition_id{-1};
RETURN_NULL_STAT_VARS_IF_NOT_OK(
dsn::parse_metric_partition_id(entity_attrs, metric_partition_id));

dsn::gpid metric_pid(metric_table_id, metric_partition_id);
const auto &partition_stat = _my_partition_stats.find(metric_pid);
const auto partition_stat = _my_partition_stats.find(metric_pid);
RETURN_NULL_STAT_VARS_IF(partition_stat == _my_partition_stats.end());

*stat_vars = &partition_stat->second;
Expand All @@ -1256,20 +1258,70 @@ class partition_aggregate_stats : public aggregate_stats

void calc_rates(double duration_s) override
{
for (auto &partition_stats : _my_partition_stats) {
for (auto &stat_var : partition_stats.second) {
*stat_var.second /= duration_s;
for (auto &[_, partition_stat] : _my_partition_stats) {
for (auto &[_, stat_var] : partition_stat) {
*stat_var /= duration_s;
}
}
}

private:
DISALLOW_COPY_AND_ASSIGN(partition_aggregate_stats);
DISALLOW_MOVE_AND_ASSIGN(partition_aggregate_stats);

const std::string _my_entity_type;
partition_stat_map _my_partition_stats;
};

using profiler_stat_map = std::unordered_map<std::string, stat_var_map>;

// Profiler-level aggregation over the fetched metrics. There are 2 dimensions for the aggregation:
// * the task name, the name of the RPC task, from the attributes of the metric entity;
// * the metric name, which is also the key of `stat_var_map`.
class profiler_aggregate_stats : public aggregate_stats
{
public:
profiler_aggregate_stats(std::string entity_type, profiler_stat_map &&profiler_stats)
: _my_entity_type(std::move(entity_type)), _my_profiler_stats(std::move(profiler_stats))
{
}

~profiler_aggregate_stats() override = default;

protected:
dsn::error_s get_stat_vars(const std::string &entity_type,
const dsn::metric_entity::attr_map &entity_attrs,
stat_var_map **stat_vars) override
{
RETURN_NULL_STAT_VARS_IF(entity_type != _my_entity_type);

const auto attr = std::as_const(entity_attrs).find("task_name");
RETURN_NULL_STAT_VARS_IF(attr == entity_attrs.end());

const auto profiler_stat = _my_profiler_stats.find(attr->second);
RETURN_NULL_STAT_VARS_IF(profiler_stat == _my_profiler_stats.end());

*stat_vars = &profiler_stat->second;
return dsn::error_s::ok();
}

void calc_rates(double duration_s) override
{
for (auto &[_, profiler_stat] : _my_profiler_stats) {
for (auto &[_, stat_var] : profiler_stat) {
*stat_var /= duration_s;
}
}
}

private:
DISALLOW_COPY_AND_ASSIGN(profiler_aggregate_stats);
DISALLOW_MOVE_AND_ASSIGN(profiler_aggregate_stats);

const std::string _my_entity_type;
profiler_stat_map _my_profiler_stats;
};

inline std::vector<std::pair<bool, std::string>>
call_remote_command(shell_context *sc,
const std::vector<node_desc> &nodes,
Expand Down
Loading
Loading