Skip to content
Open
Show file tree
Hide file tree
Changes from 6 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
12 changes: 7 additions & 5 deletions dbms/src/Common/BackgroundTask.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@
#include <Common/BackgroundTask.h>
#include <Common/MemoryAllocTrace.h>
#include <Common/MemoryTracker.h>
#include <ProcessMetrics/ProcessMetrics.h>

#include <fstream>

Expand Down Expand Up @@ -64,11 +65,12 @@ void CollectProcInfoBackgroundTask::memCheckJob()
{
while (!end_syn)
{
// Update the memory usage of the current process. Defined in Common/MemoryTracker.cpp
auto res = get_process_mem_usage();
real_rss = res.resident_bytes;
proc_num_threads = res.cur_proc_num_threads;
proc_virt_size = res.cur_virt_bytes;
auto mem_usage = get_process_mem_usage();
auto metrics = get_process_metrics();
real_rss = static_cast<Int64>(mem_usage.resident_bytes);
real_rss_file = static_cast<Int64>(metrics.rss_file);
proc_num_threads = mem_usage.cur_proc_num_threads;
proc_virt_size = mem_usage.cur_virt_bytes;
baseline_of_query_mem_tracker = root_of_query_mem_trackers->get();
usleep(100000); // sleep 100ms
}
Expand Down
122 changes: 94 additions & 28 deletions dbms/src/Common/MemoryTracker.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,7 @@ extern const Metric MemoryTrackingSharedColumnData;
extern const Metric MemoryTrackingKVStore;
} // namespace CurrentMetrics

std::atomic<Int64> real_rss{0}, proc_num_threads{1}, baseline_of_query_mem_tracker{0};
std::atomic<Int64> real_rss{0}, real_rss_file{0}, proc_num_threads{1}, baseline_of_query_mem_tracker{0};
std::atomic<UInt64> proc_virt_size{0};
MemoryTracker::~MemoryTracker()
{
Expand Down Expand Up @@ -133,9 +133,15 @@ void MemoryTracker::alloc(Int64 size, bool check_memory_limit)
{
Int64 current_limit = limit.load(std::memory_order_relaxed);
Int64 current_accuracy_diff_for_test = accuracy_diff_for_test.load(std::memory_order_relaxed);
Int64 current_real_rss = real_rss.load(std::memory_order_relaxed);
Int64 current_real_rss_file = real_rss_file.load(std::memory_order_relaxed);
// Exclude file-backed RSS (mmap page cache) because it can be reclaimed by OS automatically.
Int64 effective_rss = current_real_rss - current_real_rss_file;
if (unlikely(effective_rss < 0))
effective_rss = 0;
if (unlikely(
!next.load(std::memory_order_relaxed) && current_accuracy_diff_for_test && current_limit
&& real_rss > current_accuracy_diff_for_test + current_limit))
&& effective_rss > current_accuracy_diff_for_test + current_limit))
{
DB::FmtBuffer fmt_buf;
fmt_buf.append("Memory tracker accuracy ");
Expand All @@ -144,9 +150,12 @@ void MemoryTracker::alloc(Int64 size, bool check_memory_limit)
fmt_buf.fmtAppend(" {}", tmp_decr);

fmt_buf.fmtAppend(
": fault injected. real_rss ({}) is much larger than limit ({}). Debug info, threads of process: {}, "
": fault injected. effective_rss ({}, excluding rss_file {}, rss_total {}) is much larger than limit "
"({}). Debug info, threads of process: {}, "
"memory usage tracked by ProcessList: peak {}, current {}. Virtual memory size: {}.",
formatReadableSizeWithBinarySuffix(real_rss),
formatReadableSizeWithBinarySuffix(effective_rss),
formatReadableSizeWithBinarySuffix(current_real_rss_file),
formatReadableSizeWithBinarySuffix(current_real_rss),
formatReadableSizeWithBinarySuffix(current_limit),
proc_num_threads.load(),
(root_of_query_mem_trackers ? formatReadableSizeWithBinarySuffix(root_of_query_mem_trackers->peak)
Expand Down Expand Up @@ -178,12 +187,8 @@ void MemoryTracker::alloc(Int64 size, bool check_memory_limit)
fmt_buf.fmtAppend(" Memory Usage of Storage: {}", storageMemoryUsageDetail());
throw DB::TiFlashException(fmt_buf.toString(), DB::Errors::Coprocessor::MemoryLimitExceeded);
}
Int64 current_bytes_rss_larger_than_limit = bytes_rss_larger_than_limit.load(std::memory_order_relaxed);
bool is_rss_too_large
= (!next.load(std::memory_order_relaxed) && current_limit
&& real_rss > current_limit + current_bytes_rss_larger_than_limit
&& will_be > baseline_of_query_mem_tracker);
if (is_rss_too_large || unlikely(current_limit && will_be > current_limit))
checkRssLimitImpl(/* require_tracked_growth */ true, will_be, size);
Comment thread
coderabbitai[bot] marked this conversation as resolved.
Outdated
if (unlikely(current_limit && will_be > current_limit))
{
DB::GET_METRIC(tiflash_memory_exceed_quota_count).Increment();
amount.fetch_sub(size, std::memory_order_relaxed);
Expand All @@ -195,24 +200,12 @@ void MemoryTracker::alloc(Int64 size, bool check_memory_limit)
if (tmp_decr)
fmt_buf.fmtAppend(" {}", tmp_decr);

if (!is_rss_too_large)
{ // out of memory quota
fmt_buf.fmtAppend(
" exceeded caused by 'out of memory quota for data computing' : would use {} for data computing "
"(attempt to allocate chunk of {} bytes), limit of memory for data computing: {}.",
formatReadableSizeWithBinarySuffix(will_be),
size,
formatReadableSizeWithBinarySuffix(current_limit));
}
else
{ // RSS too large
fmt_buf.fmtAppend(
" exceeded caused by 'RSS(Resident Set Size) much larger than limit' : process memory size would "
"be {} for (attempt to allocate chunk of {} bytes), limit of memory for data computing : {}.",
formatReadableSizeWithBinarySuffix(real_rss),
size,
formatReadableSizeWithBinarySuffix(current_limit));
}
fmt_buf.fmtAppend(
" exceeded caused by 'out of memory quota for data computing' : would use {} for data computing "
"(attempt to allocate chunk of {} bytes), limit of memory for data computing: {}.",
formatReadableSizeWithBinarySuffix(will_be),
size,
formatReadableSizeWithBinarySuffix(current_limit));

fmt_buf.fmtAppend(" Memory Usage of Storage: {}", storageMemoryUsageDetail());

Expand All @@ -239,6 +232,73 @@ void MemoryTracker::alloc(Int64 size, bool check_memory_limit)
}


void MemoryTracker::checkRssLimitImpl(bool require_tracked_growth, Int64 will_be, Int64 size) const
{
Int64 current_limit = limit.load(std::memory_order_relaxed);
if (next.load(std::memory_order_relaxed) || !current_limit)
return;

Int64 current_real_rss = real_rss.load(std::memory_order_relaxed);
Int64 current_real_rss_file = real_rss_file.load(std::memory_order_relaxed);
// Exclude file-backed RSS (mmap page cache) because it can be reclaimed by OS automatically.
Int64 effective_rss = current_real_rss - current_real_rss_file;
if (unlikely(effective_rss < 0))
effective_rss = 0;

Int64 current_bytes_rss_larger_than_limit = bytes_rss_larger_than_limit.load(std::memory_order_relaxed);
bool is_rss_too_large = effective_rss > current_limit + current_bytes_rss_larger_than_limit;
if (require_tracked_growth)
is_rss_too_large = is_rss_too_large && will_be > baseline_of_query_mem_tracker;

if (!is_rss_too_large)
return;

DB::GET_METRIC(tiflash_memory_exceed_quota_count).Increment();

DB::FmtBuffer fmt_buf;
fmt_buf.append("Memory limit");
const char * tmp_decr = description.load();
if (tmp_decr)
fmt_buf.fmtAppend(" {}", tmp_decr);

if (require_tracked_growth)
{
fmt_buf.fmtAppend(
" exceeded caused by 'RSS(Resident Set Size) much larger than limit' : process memory size would "
"be {} (excluding {} page cache, rss_total {}) for (attempt to allocate chunk of {} bytes), "
"limit of memory for data computing : {}.",
formatReadableSizeWithBinarySuffix(effective_rss),
formatReadableSizeWithBinarySuffix(current_real_rss_file),
formatReadableSizeWithBinarySuffix(current_real_rss),
size,
formatReadableSizeWithBinarySuffix(current_limit));
}
else
{
fmt_buf.fmtAppend(
" exceeded caused by 'RSS(Resident Set Size) much larger than limit' : process memory size is {} "
"(excluding {} page cache, rss_total {}), limit of memory for data computing : {}. "
"Detected by explicit RSS limit check.",
formatReadableSizeWithBinarySuffix(effective_rss),
formatReadableSizeWithBinarySuffix(current_real_rss_file),
formatReadableSizeWithBinarySuffix(current_real_rss),
formatReadableSizeWithBinarySuffix(current_limit));
}

fmt_buf.fmtAppend(" Memory Usage of Storage: {}", storageMemoryUsageDetail());
throw DB::TiFlashException(fmt_buf.toString(), DB::Errors::Coprocessor::MemoryLimitExceeded);
}


void MemoryTracker::checkRssLimit() const
{
if (auto * loaded_next = next.load(std::memory_order_relaxed))
loaded_next->checkRssLimit();
else
checkRssLimitImpl(/* require_tracked_growth */ false, /* will_be */ 0, /* size */ 0);
}


void MemoryTracker::free(Int64 size)
{
Int64 new_amount = amount.fetch_sub(size, std::memory_order_relaxed) - size;
Expand Down Expand Up @@ -396,6 +456,12 @@ Int64 getLocalDeltaMemory()
return local_delta;
}

void checkRssLimit()
{
if (current_memory_tracker)
current_memory_tracker->checkRssLimit();
}

void alloc(Int64 size)
{
checkSubmitAndUpdateLocalDelta(local_delta + size);
Expand Down
8 changes: 7 additions & 1 deletion dbms/src/Common/MemoryTracker.h
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@
#include <atomic>
#include <boost/noncopyable.hpp>

extern std::atomic<Int64> real_rss, proc_num_threads, baseline_of_query_mem_tracker;
extern std::atomic<Int64> real_rss, real_rss_file, proc_num_threads, baseline_of_query_mem_tracker;
extern std::atomic<UInt64> proc_virt_size;
namespace CurrentMetrics
{
Expand Down Expand Up @@ -79,6 +79,7 @@ class MemoryTracker : public std::enable_shared_from_this<MemoryTracker>
{}

void reportAmount();
void checkRssLimitImpl(bool require_tracked_growth, Int64 will_be, Int64 size) const;

public:
/// Using `std::shared_ptr` and `new` instread of `std::make_shared` is because `std::make_shared` cannot call private constructors.
Expand Down Expand Up @@ -107,6 +108,10 @@ class MemoryTracker : public std::enable_shared_from_this<MemoryTracker>
*/
void free(Int64 size);

/// Explicitly checks whether process RSS is already much larger than the configured limit.
/// Unlike alloc(0), this probe does not require tracked memory growth in the current tracker.
void checkRssLimit() const;

Int64 get() const { return amount.load(std::memory_order_relaxed); }

Int64 getPeak() const { return peak.load(std::memory_order_relaxed); }
Expand Down Expand Up @@ -184,6 +189,7 @@ namespace CurrentMemoryTracker
void disableThreshold();
void submitLocalDeltaMemory();
Int64 getLocalDeltaMemory();
void checkRssLimit();
void alloc(Int64 size);
void realloc(Int64 old_size, Int64 new_size);
void free(Int64 size);
Expand Down
3 changes: 3 additions & 0 deletions dbms/src/Interpreters/AsynchronousMetrics.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -378,6 +378,9 @@ void AsynchronousMetrics::update()
set("jemalloc." NAME, value); \
} while (0);

uint64_t epoch = 1;
size_t sz = sizeof(epoch);
je_mallctl("epoch", &epoch, &sz, &epoch, sz);
FOR_EACH_METRIC(GET_JEMALLOC_METRIC);

#undef GET_JEMALLOC_METRIC
Expand Down
2 changes: 1 addition & 1 deletion dbms/src/Interpreters/Settings.h
Original file line number Diff line number Diff line change
Expand Up @@ -112,7 +112,7 @@ struct Settings
\
M(SettingFloat, memory_tracker_fault_probability, 0., "For testing of `exception safety` - throw an exception every time you allocate memory with the specified probability.") \
\
M(SettingInt64, memory_tracker_accuracy_diff_for_test, 0, "For testing of the accuracy of the memory tracker - throw an exception when real_rss is much larger than tracked amount.") \
M(SettingInt64, memory_tracker_accuracy_diff_for_test, 0, "For testing of the accuracy of the memory tracker - throw an exception when effective RSS (rss - rss_file) is much larger than tracked amount.") \
\
M(SettingString, count_distinct_implementation, "uniqExact", "What aggregate function to use for implementation of count(DISTINCT ...)") \
\
Expand Down
4 changes: 4 additions & 0 deletions dbms/src/Storages/StorageDisaggregatedColumnar.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@
#include <Common/config.h> // for ENABLE_NEXT_GEN_COLUMNAR
#if ENABLE_NEXT_GEN_COLUMNAR
#include <Common/Exception.h>
#include <Common/MemoryTracker.h>
#include <Common/MyTime.h>
#include <Common/Stopwatch.h>
#include <Common/ThreadManager.h>
Expand Down Expand Up @@ -845,6 +846,9 @@ Block RNProxyInputStream::readImpl([[maybe_unused]] FilterPtr & res_filter, [[ma
if (rows == 0)
return {};

// Add a memory tracker hook after reading a block from proxy.
CurrentMemoryTracker::checkRssLimit();

TableID physical_table_id = -1;
Block header = getHeader();
const ColumnsWithTypeAndName col_type_and_name = header.getColumnsWithTypeAndName();
Expand Down