diff --git a/dbms/src/Common/BackgroundTask.cpp b/dbms/src/Common/BackgroundTask.cpp index a3fc69bbeee..4ea4139170f 100644 --- a/dbms/src/Common/BackgroundTask.cpp +++ b/dbms/src/Common/BackgroundTask.cpp @@ -15,6 +15,7 @@ #include #include #include +#include #include @@ -64,11 +65,12 @@ void CollectProcInfoBackgroundTask::memCheckJob() { while (!end_syn) { - // Update the memory usage of the current process. Defined in Common/MemoryTracker.cpp - auto res = get_process_mem_usage(); - real_rss = res.resident_bytes; - proc_num_threads = res.cur_proc_num_threads; - proc_virt_size = res.cur_virt_bytes; + auto mem_usage = get_process_mem_usage(); + auto metrics = get_process_metrics(); + real_rss = static_cast(mem_usage.resident_bytes); + real_rss_file = static_cast(metrics.rss_file); + proc_num_threads = mem_usage.cur_proc_num_threads; + proc_virt_size = mem_usage.cur_virt_bytes; baseline_of_query_mem_tracker = root_of_query_mem_trackers->get(); usleep(100000); // sleep 100ms } diff --git a/dbms/src/Common/MemoryTracker.cpp b/dbms/src/Common/MemoryTracker.cpp index 7e875babc57..d84f6c157bf 100644 --- a/dbms/src/Common/MemoryTracker.cpp +++ b/dbms/src/Common/MemoryTracker.cpp @@ -31,7 +31,7 @@ extern const Metric MemoryTrackingSharedColumnData; extern const Metric MemoryTrackingKVStore; } // namespace CurrentMetrics -std::atomic real_rss{0}, proc_num_threads{1}, baseline_of_query_mem_tracker{0}; +std::atomic real_rss{0}, real_rss_file{0}, proc_num_threads{1}, baseline_of_query_mem_tracker{0}; std::atomic proc_virt_size{0}; MemoryTracker::~MemoryTracker() { @@ -120,6 +120,12 @@ void MemoryTracker::alloc(Int64 size, bool check_memory_limit) */ Int64 will_be = size + amount.fetch_add(size, std::memory_order_relaxed); reportAmount(); + auto rollback_current_alloc = [&] { + amount.fetch_sub(size, std::memory_order_relaxed); + reportAmount(); + if (!next.load(std::memory_order_relaxed)) + CurrentMetrics::sub(metric, size); + }; if (!next.load(std::memory_order_relaxed)) { @@ -133,10 +139,18 @@ void MemoryTracker::alloc(Int64 size, bool check_memory_limit) { Int64 current_limit = limit.load(std::memory_order_relaxed); Int64 current_accuracy_diff_for_test = accuracy_diff_for_test.load(std::memory_order_relaxed); + Int64 current_real_rss = real_rss.load(std::memory_order_relaxed); + Int64 current_real_rss_file = real_rss_file.load(std::memory_order_relaxed); + // Exclude file-backed RSS (mmap page cache) because it can be reclaimed by OS automatically. + Int64 effective_rss = current_real_rss - current_real_rss_file; + if (unlikely(effective_rss < 0)) + effective_rss = 0; if (unlikely( !next.load(std::memory_order_relaxed) && current_accuracy_diff_for_test && current_limit - && real_rss > current_accuracy_diff_for_test + current_limit)) + && effective_rss > current_accuracy_diff_for_test + current_limit)) { + rollback_current_alloc(); + DB::FmtBuffer fmt_buf; fmt_buf.append("Memory tracker accuracy "); const char * tmp_decr = description.load(); @@ -144,9 +158,12 @@ void MemoryTracker::alloc(Int64 size, bool check_memory_limit) fmt_buf.fmtAppend(" {}", tmp_decr); fmt_buf.fmtAppend( - ": fault injected. real_rss ({}) is much larger than limit ({}). Debug info, threads of process: {}, " + ": fault injected. effective_rss ({}, excluding rss_file {}, rss_total {}) is much larger than limit " + "({}). Debug info, threads of process: {}, " "memory usage tracked by ProcessList: peak {}, current {}. Virtual memory size: {}.", - formatReadableSizeWithBinarySuffix(real_rss), + formatReadableSizeWithBinarySuffix(effective_rss), + formatReadableSizeWithBinarySuffix(current_real_rss_file), + formatReadableSizeWithBinarySuffix(current_real_rss), formatReadableSizeWithBinarySuffix(current_limit), proc_num_threads.load(), (root_of_query_mem_trackers ? formatReadableSizeWithBinarySuffix(root_of_query_mem_trackers->peak) @@ -162,8 +179,7 @@ void MemoryTracker::alloc(Int64 size, bool check_memory_limit) /// In this case, it doesn't matter. if (unlikely(fault_probability && drand48() < fault_probability)) { - amount.fetch_sub(size, std::memory_order_relaxed); - reportAmount(); + rollback_current_alloc(); DB::FmtBuffer fmt_buf; fmt_buf.append("Memory tracker"); @@ -178,16 +194,19 @@ void MemoryTracker::alloc(Int64 size, bool check_memory_limit) fmt_buf.fmtAppend(" Memory Usage of Storage: {}", storageMemoryUsageDetail()); throw DB::TiFlashException(fmt_buf.toString(), DB::Errors::Coprocessor::MemoryLimitExceeded); } - Int64 current_bytes_rss_larger_than_limit = bytes_rss_larger_than_limit.load(std::memory_order_relaxed); - bool is_rss_too_large - = (!next.load(std::memory_order_relaxed) && current_limit - && real_rss > current_limit + current_bytes_rss_larger_than_limit - && will_be > baseline_of_query_mem_tracker); - if (is_rss_too_large || unlikely(current_limit && will_be > current_limit)) + try + { + checkRssLimitImpl(/* require_tracked_growth */ true, will_be, size); + } + catch (...) + { + rollback_current_alloc(); + throw; + } + if (unlikely(current_limit && will_be > current_limit)) { DB::GET_METRIC(tiflash_memory_exceed_quota_count).Increment(); - amount.fetch_sub(size, std::memory_order_relaxed); - reportAmount(); + rollback_current_alloc(); DB::FmtBuffer fmt_buf; fmt_buf.append("Memory limit"); @@ -195,24 +214,12 @@ void MemoryTracker::alloc(Int64 size, bool check_memory_limit) if (tmp_decr) fmt_buf.fmtAppend(" {}", tmp_decr); - if (!is_rss_too_large) - { // out of memory quota - fmt_buf.fmtAppend( - " exceeded caused by 'out of memory quota for data computing' : would use {} for data computing " - "(attempt to allocate chunk of {} bytes), limit of memory for data computing: {}.", - formatReadableSizeWithBinarySuffix(will_be), - size, - formatReadableSizeWithBinarySuffix(current_limit)); - } - else - { // RSS too large - fmt_buf.fmtAppend( - " exceeded caused by 'RSS(Resident Set Size) much larger than limit' : process memory size would " - "be {} for (attempt to allocate chunk of {} bytes), limit of memory for data computing : {}.", - formatReadableSizeWithBinarySuffix(real_rss), - size, - formatReadableSizeWithBinarySuffix(current_limit)); - } + fmt_buf.fmtAppend( + " exceeded caused by 'out of memory quota for data computing' : would use {} for data computing " + "(attempt to allocate chunk of {} bytes), limit of memory for data computing: {}.", + formatReadableSizeWithBinarySuffix(will_be), + size, + formatReadableSizeWithBinarySuffix(current_limit)); fmt_buf.fmtAppend(" Memory Usage of Storage: {}", storageMemoryUsageDetail()); @@ -239,6 +246,73 @@ void MemoryTracker::alloc(Int64 size, bool check_memory_limit) } +void MemoryTracker::checkRssLimitImpl(bool require_tracked_growth, Int64 will_be, Int64 size) const +{ + Int64 current_limit = limit.load(std::memory_order_relaxed); + if (next.load(std::memory_order_relaxed) || !current_limit) + return; + + Int64 current_real_rss = real_rss.load(std::memory_order_relaxed); + Int64 current_real_rss_file = real_rss_file.load(std::memory_order_relaxed); + // Exclude file-backed RSS (mmap page cache) because it can be reclaimed by OS automatically. + Int64 effective_rss = current_real_rss - current_real_rss_file; + if (unlikely(effective_rss < 0)) + effective_rss = 0; + + Int64 current_bytes_rss_larger_than_limit = bytes_rss_larger_than_limit.load(std::memory_order_relaxed); + bool is_rss_too_large = effective_rss > current_limit + current_bytes_rss_larger_than_limit; + if (require_tracked_growth) + is_rss_too_large = is_rss_too_large && will_be > baseline_of_query_mem_tracker; + + if (!is_rss_too_large) + return; + + DB::GET_METRIC(tiflash_memory_exceed_quota_count).Increment(); + + DB::FmtBuffer fmt_buf; + fmt_buf.append("Memory limit"); + const char * tmp_decr = description.load(); + if (tmp_decr) + fmt_buf.fmtAppend(" {}", tmp_decr); + + if (require_tracked_growth) + { + fmt_buf.fmtAppend( + " exceeded caused by 'RSS(Resident Set Size) much larger than limit' : process memory size would " + "be {} (excluding {} page cache, rss_total {}) for (attempt to allocate chunk of {} bytes), " + "limit of memory for data computing : {}.", + formatReadableSizeWithBinarySuffix(effective_rss), + formatReadableSizeWithBinarySuffix(current_real_rss_file), + formatReadableSizeWithBinarySuffix(current_real_rss), + size, + formatReadableSizeWithBinarySuffix(current_limit)); + } + else + { + fmt_buf.fmtAppend( + " exceeded caused by 'RSS(Resident Set Size) much larger than limit' : process memory size is {} " + "(excluding {} page cache, rss_total {}), limit of memory for data computing : {}. " + "Detected by explicit RSS limit check.", + formatReadableSizeWithBinarySuffix(effective_rss), + formatReadableSizeWithBinarySuffix(current_real_rss_file), + formatReadableSizeWithBinarySuffix(current_real_rss), + formatReadableSizeWithBinarySuffix(current_limit)); + } + + fmt_buf.fmtAppend(" Memory Usage of Storage: {}", storageMemoryUsageDetail()); + throw DB::TiFlashException(fmt_buf.toString(), DB::Errors::Coprocessor::MemoryLimitExceeded); +} + + +void MemoryTracker::checkRssLimit() const +{ + if (auto * loaded_next = next.load(std::memory_order_relaxed)) + loaded_next->checkRssLimit(); + else + checkRssLimitImpl(/* require_tracked_growth */ false, /* will_be */ 0, /* size */ 0); +} + + void MemoryTracker::free(Int64 size) { Int64 new_amount = amount.fetch_sub(size, std::memory_order_relaxed) - size; @@ -396,6 +470,12 @@ Int64 getLocalDeltaMemory() return local_delta; } +void checkRssLimit() +{ + if (current_memory_tracker) + current_memory_tracker->checkRssLimit(); +} + void alloc(Int64 size) { checkSubmitAndUpdateLocalDelta(local_delta + size); diff --git a/dbms/src/Common/MemoryTracker.h b/dbms/src/Common/MemoryTracker.h index 98775a7a13b..52668b87444 100644 --- a/dbms/src/Common/MemoryTracker.h +++ b/dbms/src/Common/MemoryTracker.h @@ -22,7 +22,7 @@ #include #include -extern std::atomic real_rss, proc_num_threads, baseline_of_query_mem_tracker; +extern std::atomic real_rss, real_rss_file, proc_num_threads, baseline_of_query_mem_tracker; extern std::atomic proc_virt_size; namespace CurrentMetrics { @@ -79,6 +79,7 @@ class MemoryTracker : public std::enable_shared_from_this {} void reportAmount(); + void checkRssLimitImpl(bool require_tracked_growth, Int64 will_be, Int64 size) const; public: /// Using `std::shared_ptr` and `new` instread of `std::make_shared` is because `std::make_shared` cannot call private constructors. @@ -107,6 +108,10 @@ class MemoryTracker : public std::enable_shared_from_this */ void free(Int64 size); + /// Explicitly checks whether process RSS is already much larger than the configured limit. + /// Unlike alloc(0), this probe does not require tracked memory growth in the current tracker. + void checkRssLimit() const; + Int64 get() const { return amount.load(std::memory_order_relaxed); } Int64 getPeak() const { return peak.load(std::memory_order_relaxed); } @@ -184,6 +189,7 @@ namespace CurrentMemoryTracker void disableThreshold(); void submitLocalDeltaMemory(); Int64 getLocalDeltaMemory(); +void checkRssLimit(); void alloc(Int64 size); void realloc(Int64 old_size, Int64 new_size); void free(Int64 size); diff --git a/dbms/src/Interpreters/AsynchronousMetrics.cpp b/dbms/src/Interpreters/AsynchronousMetrics.cpp index 9d09febb740..bb683fb1fba 100644 --- a/dbms/src/Interpreters/AsynchronousMetrics.cpp +++ b/dbms/src/Interpreters/AsynchronousMetrics.cpp @@ -378,6 +378,9 @@ void AsynchronousMetrics::update() set("jemalloc." NAME, value); \ } while (0); + uint64_t epoch = 1; + size_t sz = sizeof(epoch); + je_mallctl("epoch", &epoch, &sz, &epoch, sz); FOR_EACH_METRIC(GET_JEMALLOC_METRIC); #undef GET_JEMALLOC_METRIC diff --git a/dbms/src/Interpreters/Settings.h b/dbms/src/Interpreters/Settings.h index 443b0691000..4388a78d5d5 100644 --- a/dbms/src/Interpreters/Settings.h +++ b/dbms/src/Interpreters/Settings.h @@ -112,7 +112,7 @@ struct Settings \ M(SettingFloat, memory_tracker_fault_probability, 0., "For testing of `exception safety` - throw an exception every time you allocate memory with the specified probability.") \ \ - M(SettingInt64, memory_tracker_accuracy_diff_for_test, 0, "For testing of the accuracy of the memory tracker - throw an exception when real_rss is much larger than tracked amount.") \ + M(SettingInt64, memory_tracker_accuracy_diff_for_test, 0, "For testing of the accuracy of the memory tracker - throw an exception when effective RSS (rss - rss_file) is much larger than tracked amount.") \ \ M(SettingString, count_distinct_implementation, "uniqExact", "What aggregate function to use for implementation of count(DISTINCT ...)") \ \ diff --git a/dbms/src/Storages/StorageDisaggregatedColumnar.cpp b/dbms/src/Storages/StorageDisaggregatedColumnar.cpp index 0bd82bf41eb..bb3043128b1 100644 --- a/dbms/src/Storages/StorageDisaggregatedColumnar.cpp +++ b/dbms/src/Storages/StorageDisaggregatedColumnar.cpp @@ -15,6 +15,7 @@ #include // for ENABLE_NEXT_GEN_COLUMNAR #if ENABLE_NEXT_GEN_COLUMNAR #include +#include #include #include #include @@ -815,6 +816,9 @@ Block RNProxyInputStream::readImpl([[maybe_unused]] FilterPtr & res_filter, [[ma if (rows == 0) return {}; + // Add a memory tracker hook after reading a block from proxy. + CurrentMemoryTracker::checkRssLimit(); + TableID physical_table_id = -1; Block header = getHeader(); const ColumnsWithTypeAndName & col_type_and_name = header.getColumnsWithTypeAndName();