Skip to content

Commit 02eef41

Browse files
committed
refactor(FQDN): feather refactor on idl/dsn.layer2.thrift 2
1 parent 76cd798 commit 02eef41

10 files changed

Lines changed: 62 additions & 23 deletions

File tree

src/meta/backup_engine.cpp

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -182,7 +182,7 @@ void backup_engine::backup_app_partition(const gpid &pid)
182182
_is_backup_failed = true;
183183
return;
184184
}
185-
partition_primary = app->pcs[pid.get_partition_index()].hp_primary;
185+
GET_HOST_PORT(app->pcs[pid.get_partition_index()], primary, partition_primary);
186186
}
187187

188188
if (!partition_primary) {

src/meta/duplication/meta_duplication_service.cpp

Lines changed: 7 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -764,21 +764,24 @@ void meta_duplication_service::check_follower_app_if_create_completed(
764764
query_err = ERR_INCONSISTENT_STATE;
765765
} else {
766766
for (const auto &pc : resp.partitions) {
767-
if (!pc.hp_primary) {
767+
host_port primary;
768+
std::vector<host_port> secondaries;
769+
GET_HOST_PORT(pc, primary, primary);
770+
GET_HOST_PORTS(pc, secondaries, secondaries);
771+
if (!primary) {
768772
// Fail once the primary replica is unavailable.
769773
query_err = ERR_INACTIVE_STATE;
770774
break;
771775
}
772776

773777
// Once replica count is more than 1, at least one secondary replica
774778
// is required.
775-
if (1 + pc.hp_secondaries.size() < pc.max_replica_count &&
776-
pc.hp_secondaries.empty()) {
779+
if (1 + secondaries.size() < pc.max_replica_count && secondaries.empty()) {
777780
query_err = ERR_NOT_ENOUGH_MEMBER;
778781
break;
779782
}
780783

781-
for (const auto &secondary : pc.hp_secondaries) {
784+
for (const auto &secondary : secondaries) {
782785
if (!secondary) {
783786
// Fail once any secondary replica is unavailable.
784787
query_err = ERR_INACTIVE_STATE;

src/meta/meta_backup_service.cpp

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -532,7 +532,7 @@ void policy_context::start_backup_partition_unlocked(gpid pid)
532532
pid, cold_backup_constant::PROGRESS_FINISHED, dsn::host_port());
533533
return;
534534
}
535-
partition_primary = app->pcs[pid.get_partition_index()].hp_primary;
535+
GET_HOST_PORT(app->pcs[pid.get_partition_index()], primary, partition_primary);
536536
}
537537
if (!partition_primary) {
538538
LOG_WARNING("{}: partition {} doesn't have a primary now, retry to backup it later",

src/meta/meta_bulk_load_service.cpp

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -475,7 +475,8 @@ void bulk_load_service::on_partition_bulk_load_reply(error_code err,
475475
const std::string &app_name = request.app_name;
476476
const gpid &pid = request.pid;
477477
const auto &primary_addr = request.primary;
478-
const auto &primary_hp = request.hp_primary;
478+
host_port primary_hp;
479+
GET_HOST_PORT(request, primary, primary_hp);
479480

480481
if (err != ERR_OK) {
481482
LOG_ERROR("app({}), partition({}) failed to receive bulk load response from node({}), "

src/meta/server_state.cpp

Lines changed: 13 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -2053,14 +2053,20 @@ void server_state::drop_partition(std::shared_ptr<app_state> &app, int pidx)
20532053
SET_OBJ_IP_AND_HOST_PORT(request, node, pc, primary);
20542054

20552055
request.config = pc;
2056-
for (const auto &secondary : pc.hp_secondaries) {
2057-
maintain_drops(request.config.hp_last_drops, secondary, request.type);
2056+
host_port primary;
2057+
std::vector<host_port> secondaries;
2058+
std::vector<host_port> last_drops;
2059+
GET_HOST_PORT(pc, primary, primary);
2060+
GET_HOST_PORTS(pc, secondaries, secondaries);
2061+
GET_HOST_PORTS(pc, last_drops, last_drops);
2062+
for (const auto &secondary : secondaries) {
2063+
maintain_drops(last_drops, secondary, request.type);
20582064
}
20592065
for (const auto &secondary : pc.secondaries) {
20602066
maintain_drops(request.config.last_drops, secondary, request.type);
20612067
}
2062-
if (pc.hp_primary) {
2063-
maintain_drops(request.config.hp_last_drops, pc.hp_primary, request.type);
2068+
if (primary) {
2069+
maintain_drops(last_drops, primary, request.type);
20642070
}
20652071
if (pc.primary) {
20662072
maintain_drops(request.config.last_drops, pc.primary, request.type);
@@ -2125,7 +2131,9 @@ void server_state::downgrade_primary_to_inactive(std::shared_ptr<app_state> &app
21252131
SET_OBJ_IP_AND_HOST_PORT(request, node, pc, primary);
21262132
request.config.ballot++;
21272133
RESET_IP_AND_HOST_PORT(request.config, primary);
2128-
maintain_drops(request.config.hp_last_drops, pc.hp_primary, request.type);
2134+
host_port primary;
2135+
GET_HOST_PORT(pc, primary, primary);
2136+
maintain_drops(request.config.hp_last_drops, primary, request.type);
21292137
maintain_drops(request.config.last_drops, pc.primary, request.type);
21302138

21312139
cc.stage = config_status::pending_remote_sync;

src/meta/server_state_restore.cpp

Lines changed: 5 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -251,7 +251,11 @@ void server_state::on_query_restore_status(configuration_query_restore_rpc rpc)
251251
for (int32_t i = 0; i < app->partition_count; i++) {
252252
const auto &r_state = app->helpers->restore_states[i];
253253
const auto &pc = app->pcs[i];
254-
if (pc.hp_primary || !pc.hp_secondaries.empty()) {
254+
host_port primary;
255+
std::vector<host_port> secondaries;
256+
GET_HOST_PORT(pc, primary, primary);
257+
GET_HOST_PORTS(pc, secondaries, secondaries);
258+
if (primary || !secondaries.empty()) {
255259
// already have primary, restore succeed
256260
continue;
257261
}

src/replica/bulk_load/replica_bulk_loader.cpp

Lines changed: 12 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -936,10 +936,12 @@ void replica_bulk_loader::report_group_download_progress(/*out*/ bulk_load_respo
936936
primary_state.__set_download_progress(_download_progress.load());
937937
primary_state.__set_download_status(_download_status.load());
938938
}
939+
host_port primary;
940+
GET_HOST_PORT(_replica->_primary_states.pc, primary, primary);
939941
SET_VALUE_FROM_IP_AND_HOST_PORT(response,
940942
group_bulk_load_state,
941943
_replica->_primary_states.pc.primary,
942-
_replica->_primary_states.pc.hp_primary,
944+
primary,
943945
primary_state);
944946
LOG_INFO_PREFIX("primary = {}, download progress = {}%, status = {}",
945947
FMT_HOST_PORT_AND_IP(_replica->_primary_states.pc, primary),
@@ -978,10 +980,12 @@ void replica_bulk_loader::report_group_ingestion_status(/*out*/ bulk_load_respon
978980

979981
partition_bulk_load_state primary_state;
980982
primary_state.__set_ingest_status(_replica->_app->get_ingestion_status());
983+
host_port primary;
984+
GET_HOST_PORT(_replica->_primary_states.pc, primary, primary);
981985
SET_VALUE_FROM_IP_AND_HOST_PORT(response,
982986
group_bulk_load_state,
983987
_replica->_primary_states.pc.primary,
984-
_replica->_primary_states.pc.hp_primary,
988+
primary,
985989
primary_state);
986990
LOG_INFO_PREFIX("primary = {}, ingestion status = {}",
987991
FMT_HOST_PORT_AND_IP(_replica->_primary_states.pc, primary),
@@ -1025,10 +1029,12 @@ void replica_bulk_loader::report_group_cleaned_up(bulk_load_response &response)
10251029

10261030
partition_bulk_load_state primary_state;
10271031
primary_state.__set_is_cleaned_up(is_cleaned_up());
1032+
host_port primary;
1033+
GET_HOST_PORT(_replica->_primary_states.pc, primary, primary);
10281034
SET_VALUE_FROM_IP_AND_HOST_PORT(response,
10291035
group_bulk_load_state,
10301036
_replica->_primary_states.pc.primary,
1031-
_replica->_primary_states.pc.hp_primary,
1037+
primary,
10321038
primary_state);
10331039
LOG_INFO_PREFIX("primary = {}, bulk load states cleaned_up = {}",
10341040
FMT_HOST_PORT_AND_IP(_replica->_primary_states.pc, primary),
@@ -1064,10 +1070,12 @@ void replica_bulk_loader::report_group_is_paused(bulk_load_response &response)
10641070

10651071
partition_bulk_load_state primary_state;
10661072
primary_state.__set_is_paused(_status == bulk_load_status::BLS_PAUSED);
1073+
host_port primary;
1074+
GET_HOST_PORT(_replica->_primary_states.pc, primary, primary);
10671075
SET_VALUE_FROM_IP_AND_HOST_PORT(response,
10681076
group_bulk_load_state,
10691077
_replica->_primary_states.pc.primary,
1070-
_replica->_primary_states.pc.hp_primary,
1078+
primary,
10711079
primary_state);
10721080
LOG_INFO_PREFIX("primary = {}, bulk_load is_paused = {}",
10731081
FMT_HOST_PORT_AND_IP(_replica->_primary_states.pc, primary),

src/replica/replica_context.cpp

Lines changed: 8 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -133,11 +133,17 @@ void primary_context::get_replica_config(partition_status::type st,
133133

134134
bool primary_context::check_exist(const ::dsn::host_port &node, partition_status::type st)
135135
{
136+
host_port primary;
137+
std::vector<host_port> secondaries;
138+
GET_HOST_PORT(pc, primary, primary);
139+
GET_HOST_PORTS(pc, secondaries, secondaries);
136140
switch (st) {
137141
case partition_status::PS_PRIMARY:
138-
return pc.hp_primary == node;
142+
DCHECK(pc.__isset.hp_primary, "");
143+
return primary == node;
139144
case partition_status::PS_SECONDARY:
140-
return utils::contains(pc.hp_secondaries, node);
145+
DCHECK(pc.__isset.hp_secondaries, "");
146+
return utils::contains(secondaries, node);
141147
case partition_status::PS_POTENTIAL_SECONDARY:
142148
return learners.find(node) != learners.end();
143149
default:

src/replica/replica_stub.cpp

Lines changed: 6 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1699,7 +1699,9 @@ void replica_stub::on_node_query_reply_scatter(replica_stub_ptr this_,
16991699
req.__isset.meta_split_status ? req.meta_split_status
17001700
: split_status::NOT_SPLIT);
17011701
} else {
1702-
if (req.config.hp_primary == _primary_host_port) {
1702+
host_port primary;
1703+
GET_HOST_PORT(req.config, primary, primary);
1704+
if (primary == _primary_host_port) {
17031705
LOG_INFO("{}@{}: replica not exists on replica server, which is primary, remove it "
17041706
"from meta server",
17051707
req.config.pid,
@@ -1750,7 +1752,9 @@ void replica_stub::remove_replica_on_meta_server(const app_info &info,
17501752
SET_IP_AND_HOST_PORT(*request, node, primary_address(), _primary_host_port);
17511753
request->type = config_type::CT_DOWNGRADE_TO_INACTIVE;
17521754

1753-
if (_primary_host_port == pc.hp_primary) {
1755+
host_port primary;
1756+
GET_HOST_PORT(pc, primary, primary);
1757+
if (_primary_host_port == primary) {
17541758
RESET_IP_AND_HOST_PORT(request->config, primary);
17551759
} else if (REMOVE_IP_AND_HOST_PORT(
17561760
primary_address(), _primary_host_port, request->config, secondaries)) {

src/replica/storage/simple_kv/test/common.cpp

Lines changed: 7 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -31,6 +31,7 @@
3131
// IWYU pragma: no_include <ext/alloc_traits.h>
3232
#include <stddef.h>
3333
#include <sstream>
34+
#include <vector>
3435

3536
#include "checker.h"
3637
#include "common/replication_enums.h"
@@ -319,8 +320,12 @@ void parti_config::convert_from(const partition_configuration &pc)
319320
{
320321
pid = pc.pid;
321322
ballot = pc.ballot;
322-
primary = address_to_node(pc.hp_primary);
323-
for (const auto &secondary : pc.hp_secondaries) {
323+
host_port hp_primary;
324+
GET_HOST_PORT(pc, primary, hp_primary);
325+
std::vector<host_port> hp_secondaries;
326+
GET_HOST_PORTS(pc, secondaries, hp_secondaries);
327+
primary = address_to_node(hp_primary);
328+
for (const auto &secondary : hp_secondaries) {
324329
secondaries.push_back(address_to_node(secondary));
325330
}
326331
std::sort(secondaries.begin(), secondaries.end());

0 commit comments

Comments
 (0)