Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
26 changes: 22 additions & 4 deletions src/replica/duplication/load_from_private_log.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -156,17 +156,35 @@ void load_from_private_log::find_log_file_to_start()
// Reopen the files. Because the internal file handle of `file_map`
// is cleared once WAL replay finished. They are unable to read.
mutation_log::log_file_map_by_index new_file_map;
for (const auto &pr : file_map) {

const decree cleanable_decree = _private_log->get_cleanable_decree();
const decree max_decree_gpid = _private_log->max_decree(get_gpid());

if (max_decree_gpid <= cleanable_decree) {
LOG_ERROR_PREFIX("max_decree_gpid({}) should be > cleanable_decree({}) for plog",
max_decree_gpid,
cleanable_decree);
return;
}

for (auto it = file_map.rbegin(); it != file_map.rend(); ++it) {
log_file_ptr file;
error_s es = log_utils::open_read(pr.second->path(), file);
error_s es = log_utils::open_read(it->second->path(), file);
if (!es.is_ok()) {
LOG_ERROR_PREFIX("{}", es);
return;
}
new_file_map.emplace(pr.first, file);

new_file_map.emplace(it->first, file);

// If the max decree of a log file falls within `cleanable_decree`, the file may be deleted
// during the GC of plog files. Therefore, these files should be skipped here.
if (cleanable_decree >= file->previous_log_max_decree(get_gpid())) {
break;
}
}

find_log_file_to_start(std::move(new_file_map));
find_log_file_to_start(new_file_map);
}

void load_from_private_log::find_log_file_to_start(
Expand Down
3 changes: 1 addition & 2 deletions src/replica/duplication/load_from_private_log.h
Original file line number Diff line number Diff line change
Expand Up @@ -82,9 +82,8 @@ class load_from_private_log final : public replica_base,
static constexpr int MAX_ALLOWED_FILE_REPEATS{10};

private:
void find_log_file_to_start(const mutation_log::log_file_map_by_index &log_files);
void find_log_file_to_start(const mutation_log::log_file_map_by_index &log_file_map);

private:
friend class load_from_private_log_test;
friend class load_fail_mode_test;
FRIEND_TEST(load_fail_mode_test, fail_skip);
Expand Down
13 changes: 13 additions & 0 deletions src/replica/mutation_log.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -361,6 +361,7 @@ void mutation_log::init_states()
_private_log_info = {0, 0};
_plog_max_decree_on_disk = 0;
_plog_max_commit_on_disk = 0;
_cleanable_decree = 0;
}

error_code mutation_log::open(replay_callback read_callback,
Expand Down Expand Up @@ -898,6 +899,18 @@ void mutation_log::update_max_commit_on_disk_no_lock(decree d)
}
}

decree mutation_log::get_cleanable_decree() const
{
zauto_lock l(_lock);
return _cleanable_decree;
}

void mutation_log::set_cleanable_decree(decree d)
{
zauto_lock l(_lock);
_cleanable_decree = d;
}

bool mutation_log::get_learn_state(gpid gpid, decree start, /*out*/ learn_state &state) const
{
CHECK(_is_private, "this method is only valid for private logs");
Expand Down
17 changes: 14 additions & 3 deletions src/replica/mutation_log.h
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,7 @@
#include "task/task_code.h"
#include "task/task_tracker.h"
#include "utils/autoref_ptr.h"
#include "utils/ports.h"
#include "utils/error_code.h"
#include "utils/errors.h"
#include "utils/zlocks.h"
Expand Down Expand Up @@ -301,6 +302,9 @@ class mutation_log : public ref_counter

task_tracker *tracker() { return &_tracker; }

decree get_cleanable_decree() const;
void set_cleanable_decree(decree d);

protected:
// 'size' is data size to write; the '_global_end_offset' will be updated by 'size'.
// can switch file only when create_new_log_if_needed = true;
Expand Down Expand Up @@ -400,9 +404,14 @@ class mutation_log : public ref_counter
// for plog. Since it is set with mutation.data.header.last_committed_decree, it must
// be less than _plog_max_decree_on_disk.
decree _plog_max_commit_on_disk;

// The decree threshold for private log garbage collection. Mutations with decree <= this
// value are considered cleanable and their log files may be deleted by GC.
// Protected by _lock in get/set.
decree _cleanable_decree;
};

typedef dsn::ref_ptr<mutation_log> mutation_log_ptr;
using mutation_log_ptr = dsn::ref_ptr<mutation_log>;

class mutation_log_private : public mutation_log, private replica_base
{
Expand Down Expand Up @@ -438,6 +447,9 @@ class mutation_log_private : public mutation_log, private replica_base
void flush_once() override;

private:
DISALLOW_COPY_AND_ASSIGN(mutation_log_private);
DISALLOW_MOVE_AND_ASSIGN(mutation_log_private);

// async write pending mutations into log file
// Preconditions:
// - _pending_write != nullptr
Expand All @@ -457,9 +469,8 @@ class mutation_log_private : public mutation_log, private replica_base
// if count <= 0, means flush until all data is on disk
void flush_internal(int max_count);

private:
// bufferring - only one concurrent write is allowed
typedef std::vector<mutation_ptr> mutations;
using mutations = std::vector<mutation_ptr>;
std::atomic_bool _is_writing;
// Writes that are emitted to `commit_log_block` but are not completely written.
// The weak_ptr used here is a trick. Once the pointer freed, ie.
Expand Down
1 change: 1 addition & 0 deletions src/replica/replica_chkpt.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -177,6 +177,7 @@ void replica::on_checkpoint_timer()
}
}

_private_log->set_cleanable_decree(cleanable_decree);
tasking::enqueue(LPC_GARBAGE_COLLECT_LOGS_AND_REPLICAS,
&_tracker,
[this, plog, cleanable_decree, valid_start_offset] {
Expand Down
Loading