Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion src/meta/backup_engine.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -182,7 +182,7 @@ void backup_engine::backup_app_partition(const gpid &pid)
_is_backup_failed = true;
return;
}
partition_primary = app->pcs[pid.get_partition_index()].hp_primary;
GET_HOST_PORT(app->pcs[pid.get_partition_index()], primary, partition_primary);
}

if (!partition_primary) {
Expand Down
11 changes: 7 additions & 4 deletions src/meta/duplication/meta_duplication_service.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -764,21 +764,24 @@ void meta_duplication_service::check_follower_app_if_create_completed(
query_err = ERR_INCONSISTENT_STATE;
} else {
for (const auto &pc : resp.partitions) {
if (!pc.hp_primary) {
host_port primary;
GET_HOST_PORT(pc, primary, primary);
if (!primary) {
// Fail once the primary replica is unavailable.
query_err = ERR_INACTIVE_STATE;
break;
}

// Once replica count is more than 1, at least one secondary replica
// is required.
if (1 + pc.hp_secondaries.size() < pc.max_replica_count &&
pc.hp_secondaries.empty()) {
std::vector<host_port> secondaries;
GET_HOST_PORTS(pc, secondaries, secondaries);
if (1 + secondaries.size() < pc.max_replica_count && secondaries.empty()) {
Comment thread
empiredan marked this conversation as resolved.
query_err = ERR_NOT_ENOUGH_MEMBER;
break;
}

for (const auto &secondary : pc.hp_secondaries) {
for (const auto &secondary : secondaries) {
if (!secondary) {
// Fail once any secondary replica is unavailable.
query_err = ERR_INACTIVE_STATE;
Expand Down
9 changes: 7 additions & 2 deletions src/meta/greedy_load_balancer.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -147,8 +147,13 @@ bool greedy_load_balancer::all_replica_infos_collected(const node_state &ns)
{
const auto &n = ns.host_port();
return ns.for_each_partition([this, n](const dsn::gpid &pid) {
config_context &cc = *get_config_context(*(t_global_view->apps), pid);
if (cc.find_from_serving(n) == cc.serving.end()) {
config_context *ctx = get_config_context(*(t_global_view->apps), pid);
if (ctx == nullptr) {
LOG_INFO("get_config_context return nullptr for gpid({})", pid);
return false;
}

if (ctx->find_from_serving(n) == ctx->serving.end()) {
LOG_INFO("meta server hasn't collected gpid({})'s info of {}", pid, n);
return false;
}
Expand Down
2 changes: 1 addition & 1 deletion src/meta/meta_backup_service.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -532,7 +532,7 @@ void policy_context::start_backup_partition_unlocked(gpid pid)
pid, cold_backup_constant::PROGRESS_FINISHED, dsn::host_port());
return;
}
partition_primary = app->pcs[pid.get_partition_index()].hp_primary;
GET_HOST_PORT(app->pcs[pid.get_partition_index()], primary, partition_primary);
}
if (!partition_primary) {
LOG_WARNING("{}: partition {} doesn't have a primary now, retry to backup it later",
Expand Down
3 changes: 2 additions & 1 deletion src/meta/meta_bulk_load_service.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -475,7 +475,8 @@ void bulk_load_service::on_partition_bulk_load_reply(error_code err,
const std::string &app_name = request.app_name;
const gpid &pid = request.pid;
const auto &primary_addr = request.primary;
const auto &primary_hp = request.hp_primary;
host_port primary_hp;
GET_HOST_PORT(request, primary, primary_hp);

if (err != ERR_OK) {
LOG_ERROR("app({}), partition({}) failed to receive bulk load response from node({}), "
Expand Down
10 changes: 9 additions & 1 deletion src/meta/server_state_restore.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -251,10 +251,18 @@ void server_state::on_query_restore_status(configuration_query_restore_rpc rpc)
for (int32_t i = 0; i < app->partition_count; i++) {
const auto &r_state = app->helpers->restore_states[i];
const auto &pc = app->pcs[i];
if (pc.hp_primary || !pc.hp_secondaries.empty()) {
host_port primary;
GET_HOST_PORT(pc, primary, primary);
if (primary) {
// already have primary, restore succeed
continue;
}

std::vector<host_port> secondaries;
GET_HOST_PORTS(pc, secondaries, secondaries);
if (!secondaries.empty()) {
continue;
}
if (r_state.progress < response.restore_progress[i]) {
response.restore_progress[i] = r_state.progress;
}
Expand Down
16 changes: 12 additions & 4 deletions src/replica/bulk_load/replica_bulk_loader.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -936,10 +936,12 @@ void replica_bulk_loader::report_group_download_progress(/*out*/ bulk_load_respo
primary_state.__set_download_progress(_download_progress.load());
primary_state.__set_download_status(_download_status.load());
}
host_port primary;
GET_HOST_PORT(_replica->_primary_states.pc, primary, primary);
SET_VALUE_FROM_IP_AND_HOST_PORT(response,
group_bulk_load_state,
_replica->_primary_states.pc.primary,
_replica->_primary_states.pc.hp_primary,
primary,
primary_state);
LOG_INFO_PREFIX("primary = {}, download progress = {}%, status = {}",
FMT_HOST_PORT_AND_IP(_replica->_primary_states.pc, primary),
Expand Down Expand Up @@ -978,10 +980,12 @@ void replica_bulk_loader::report_group_ingestion_status(/*out*/ bulk_load_respon

partition_bulk_load_state primary_state;
primary_state.__set_ingest_status(_replica->_app->get_ingestion_status());
host_port primary;
GET_HOST_PORT(_replica->_primary_states.pc, primary, primary);
SET_VALUE_FROM_IP_AND_HOST_PORT(response,
group_bulk_load_state,
_replica->_primary_states.pc.primary,
_replica->_primary_states.pc.hp_primary,
primary,
primary_state);
LOG_INFO_PREFIX("primary = {}, ingestion status = {}",
FMT_HOST_PORT_AND_IP(_replica->_primary_states.pc, primary),
Expand Down Expand Up @@ -1025,10 +1029,12 @@ void replica_bulk_loader::report_group_cleaned_up(bulk_load_response &response)

partition_bulk_load_state primary_state;
primary_state.__set_is_cleaned_up(is_cleaned_up());
host_port primary;
GET_HOST_PORT(_replica->_primary_states.pc, primary, primary);
SET_VALUE_FROM_IP_AND_HOST_PORT(response,
group_bulk_load_state,
_replica->_primary_states.pc.primary,
_replica->_primary_states.pc.hp_primary,
primary,
primary_state);
LOG_INFO_PREFIX("primary = {}, bulk load states cleaned_up = {}",
FMT_HOST_PORT_AND_IP(_replica->_primary_states.pc, primary),
Expand Down Expand Up @@ -1064,10 +1070,12 @@ void replica_bulk_loader::report_group_is_paused(bulk_load_response &response)

partition_bulk_load_state primary_state;
primary_state.__set_is_paused(_status == bulk_load_status::BLS_PAUSED);
host_port primary;
GET_HOST_PORT(_replica->_primary_states.pc, primary, primary);
SET_VALUE_FROM_IP_AND_HOST_PORT(response,
group_bulk_load_state,
_replica->_primary_states.pc.primary,
_replica->_primary_states.pc.hp_primary,
primary,
primary_state);
LOG_INFO_PREFIX("primary = {}, bulk_load is_paused = {}",
FMT_HOST_PORT_AND_IP(_replica->_primary_states.pc, primary),
Expand Down
16 changes: 12 additions & 4 deletions src/replica/replica_context.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -134,10 +134,18 @@ void primary_context::get_replica_config(partition_status::type st,
bool primary_context::check_exist(const ::dsn::host_port &node, partition_status::type st)
{
switch (st) {
case partition_status::PS_PRIMARY:
return pc.hp_primary == node;
case partition_status::PS_SECONDARY:
return utils::contains(pc.hp_secondaries, node);
case partition_status::PS_PRIMARY: {
DCHECK(pc.__isset.hp_primary, "");
host_port primary;
GET_HOST_PORT(pc, primary, primary);
return primary == node;
}
case partition_status::PS_SECONDARY: {
DCHECK(pc.__isset.hp_secondaries, "");
std::vector<host_port> secondaries;
GET_HOST_PORTS(pc, secondaries, secondaries);
return utils::contains(secondaries, node);
}
case partition_status::PS_POTENTIAL_SECONDARY:
return learners.find(node) != learners.end();
default:
Expand Down
8 changes: 6 additions & 2 deletions src/replica/replica_stub.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -1700,7 +1700,9 @@ void replica_stub::on_node_query_reply_scatter(replica_stub_ptr this_,
req.__isset.meta_split_status ? req.meta_split_status
: split_status::NOT_SPLIT);
} else {
if (req.config.hp_primary == _primary_host_port) {
host_port primary;
GET_HOST_PORT(req.config, primary, primary);
if (primary == _primary_host_port) {
LOG_INFO("{}@{}: replica not exists on replica server, which is primary, remove it "
"from meta server",
req.config.pid,
Expand Down Expand Up @@ -1751,7 +1753,9 @@ void replica_stub::remove_replica_on_meta_server(const app_info &info,
SET_IP_AND_HOST_PORT(*request, node, primary_address(), _primary_host_port);
request->type = config_type::CT_DOWNGRADE_TO_INACTIVE;

if (_primary_host_port == pc.hp_primary) {
host_port primary;
GET_HOST_PORT(pc, primary, primary);
if (_primary_host_port == primary) {
RESET_IP_AND_HOST_PORT(request->config, primary);
} else if (REMOVE_IP_AND_HOST_PORT(
primary_address(), _primary_host_port, request->config, secondaries)) {
Expand Down
Loading