diff --git a/src/Common/ProfileEvents.cpp b/src/Common/ProfileEvents.cpp
index e306e270f8d6..43a49de4dc87 100644
--- a/src/Common/ProfileEvents.cpp
+++ b/src/Common/ProfileEvents.cpp
@@ -92,6 +92,7 @@
    M(IcebergMetadataFilesCacheMisses, "Number of times iceberg metadata files have not been found in the iceberg metadata cache and had to be read from (remote) disk.", ValueType::Number) \
    M(IcebergMetadataFilesCacheWeightLost, "Approximate number of bytes evicted from the iceberg metadata cache.", ValueType::Number) \
    M(IcebergMetadataReadWaitTimeMicroseconds, "Total time data readers spend waiting for iceberg metadata files to be read and parsed, summed across all reader threads.", ValueType::Microseconds) \
+    M(IcebergManifestFilesParallelFetchMicroseconds, "Total time the iceberg iterator spends blocked waiting for manifest files that were pre-fetched in parallel, summed over all consumed manifest futures.", ValueType::Microseconds) \
    M(IcebergIteratorInitializationMicroseconds, "Total time spent on synchronous initialization of iceberg data iterators.", ValueType::Microseconds) \
    M(IcebergMetadataUpdateMicroseconds, "Total time spent on synchronous initialization of iceberg data iterators.", ValueType::Microseconds) \
    M(IcebergMetadataReturnedObjectInfos, "Total number of returned object infos from iceberg iterator.", ValueType::Number) \
diff --git a/src/Core/Settings.cpp b/src/Core/Settings.cpp
index 2503e5bb3559..adbcedfa48a1 100644
--- a/src/Core/Settings.cpp
+++ b/src/Core/Settings.cpp
@@ -4910,7 +4910,15 @@ Possible values:

- 0 - Disabled
- 1 - Enabled
)", 0) \
-    \
+    DECLARE(UInt64, iceberg_metadata_files_parallel_loading_threads, 8, R"(
+Number of threads used to fetch Iceberg manifest files in parallel during query planning.
+
+When greater than 1, manifest files listed in the manifest list are fetched concurrently from object storage, which can substantially reduce cold-cache planning latency for tables with many manifest files; the speed-up is bounded by both the number of threads and the number of manifests. Each thread calls `getManifestFile`, which is backed by `IcebergMetadataFilesCache` with singleflight protection, so concurrent requests for the same key do not cause duplicate S3 fetches.
+
+Setting this to 1 disables parallel fetching and restores fully sequential behavior, which is useful for debugging or reproducing exact serial timing.
+
+Valid range: 1–64.
+)", 0) \
    DECLARE(Bool, use_query_cache, false, R"(
If turned on, `SELECT` queries may utilize the [query cache](../query-cache.md). Parameters [enable_reads_from_query_cache](#enable_reads_from_query_cache) and [enable_writes_to_query_cache](#enable_writes_to_query_cache) control in more detail how the cache is used.
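The setting text above leans on the cache's singleflight behaviour: when several prefetch tasks ask for the same manifest key at once, only one of them performs the remote read and the rest wait for that result. A minimal standalone sketch of that idea (illustrative only, not ClickHouse's actual `CacheBase`/`getOrSet` implementation) can be built from one `shared_future` per in-flight key:

```cpp
#include <exception>
#include <functional>
#include <future>
#include <map>
#include <memory>
#include <mutex>
#include <string>

/// Hypothetical singleflight cache: the first caller for a key runs the loader,
/// concurrent callers for the same key block on the same shared_future instead of
/// issuing duplicate remote reads.
template <typename Value>
class SingleFlightCache
{
public:
    std::shared_ptr<Value> getOrSet(const std::string & key, std::function<std::shared_ptr<Value>()> load)
    {
        std::promise<std::shared_ptr<Value>> my_promise;
        std::shared_future<std::shared_ptr<Value>> fut;
        bool i_am_loader = false;

        {
            std::lock_guard<std::mutex> lock(mutex);
            if (auto cached = done.find(key); cached != done.end())
                return cached->second;                  /// already loaded and cached
            auto [it, inserted] = in_flight.try_emplace(key);
            if (inserted)
            {
                it->second = my_promise.get_future().share();
                i_am_loader = true;
            }
            fut = it->second;
        }

        if (!i_am_loader)
            return fut.get();                           /// piggyback on the in-flight load

        try
        {
            auto value = load();                        /// the single remote read for this key
            {
                std::lock_guard<std::mutex> lock(mutex);
                done[key] = value;
                in_flight.erase(key);
            }
            my_promise.set_value(value);
            return value;
        }
        catch (...)
        {
            {
                std::lock_guard<std::mutex> lock(mutex);
                in_flight.erase(key);                   /// let a later caller retry
            }
            my_promise.set_exception(std::current_exception());
            throw;
        }
    }

private:
    std::mutex mutex;
    std::map<std::string, std::shared_ptr<Value>> done;
    std::map<std::string, std::shared_future<std::shared_ptr<Value>>> in_flight;
};
```

The per-key future plays the role of the per-key token mentioned in the iterator comments further down: the first caller pays for the fetch, concurrent callers for the same manifest block until it is parsed, and callers for different keys proceed independently.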
diff --git a/src/Core/SettingsChangesHistory.cpp b/src/Core/SettingsChangesHistory.cpp index e9ac89e0b237..78296c52b59b 100644 --- a/src/Core/SettingsChangesHistory.cpp +++ b/src/Core/SettingsChangesHistory.cpp @@ -77,6 +77,7 @@ const VersionToSettingsChangesMap & getSettingsChangesHistory() }); addSettingsChanges(settings_changes_history, "25.8", { + {"iceberg_metadata_files_parallel_loading_threads", 1, 8, "New setting: number of threads to fetch Iceberg manifest files in parallel during query planning."}, {"output_format_json_quote_64bit_integers", true, false, "Disable quoting of the 64 bit integers in JSON by default"}, {"show_data_lake_catalogs_in_system_tables", true, true, "New setting"}, {"optimize_rewrite_regexp_functions", false, true, "A new setting"}, diff --git a/src/Storages/ObjectStorage/DataLakes/Iceberg/IcebergIterator.cpp b/src/Storages/ObjectStorage/DataLakes/Iceberg/IcebergIterator.cpp index dcb4c48f73a3..6da36dcf3805 100644 --- a/src/Storages/ObjectStorage/DataLakes/Iceberg/IcebergIterator.cpp +++ b/src/Storages/ObjectStorage/DataLakes/Iceberg/IcebergIterator.cpp @@ -2,6 +2,7 @@ #if USE_AVRO #include +#include #include #include #include @@ -22,6 +23,7 @@ #include #include #include +#include #include #include @@ -62,6 +64,7 @@ extern const Event IcebergMinMaxIndexPrunedFiles; extern const Event IcebergMetadataReadWaitTimeMicroseconds; extern const Event IcebergMetadataReturnedObjectInfos; extern const Event IcebergIteratorNextMicroseconds; +extern const Event IcebergManifestFilesParallelFetchMicroseconds; }; @@ -75,6 +78,7 @@ extern const int LOGICAL_ERROR; namespace Setting { extern const SettingsBool use_iceberg_partition_pruning; +extern const SettingsUInt64 iceberg_metadata_files_parallel_loading_threads; }; @@ -118,6 +122,83 @@ defineDeletesSpan(ManifestFileEntry data_object_, const std::vectormanifest_list_entries.size(); + if (n == 0) + return; + + auto configuration_locked = configuration.lock(); + + ThreadPool & pool = getIOThreadPool().get(); + auto thread_group = CurrentThread::getGroup(); + + /// Reserve to avoid reallocation: std::future is move-only, and vector reallocation + /// with non-noexcept move + no copy would cause a compile error on some STL versions. + prefetch_entries.reserve(n); + + /// Submit one getManifestFile task per matching manifest, in manifest_list order. + /// CacheBase::getOrSet has singleflight protection — concurrent callers for the same + /// key serialize on a per-key token mutex, so we don't cause N duplicate S3 GETs. + for (size_t i = 0; i < n; ++i) + { + const auto & entry = data_snapshot->manifest_list_entries[i]; + if (entry.content_type != manifest_file_content_type) + continue; + + auto promise = std::make_shared>(); + prefetch_entries.push_back({i, promise->get_future()}); + + /// Capture all data by value. object_storage, local_context and the shared_ptrs + /// inside persistent_components are all reference-counted, so the lambda keeps + /// them alive even if SingleThreadIcebergKeysIterator is destroyed while a task + /// is still running. 
+ pool.scheduleOrThrowOnError( + [promise_cap = std::move(promise), + object_storage_cap = this->object_storage, + configuration_cap = configuration_locked, + persistent_components_cap = this->persistent_components, + local_context_cap = this->local_context, + log_cap = this->log, + abs_path = entry.manifest_file_absolute_path, + seq_num = entry.added_sequence_number, + snap_id = entry.added_snapshot_id, + secondary_storages_cap = this->secondary_storages, + thread_group_cap = thread_group]() mutable + { + DB::ThreadGroupSwitcher switcher(thread_group_cap, "IcebergMFetch"); + try + { + auto result = Iceberg::getManifestFile( + object_storage_cap, + configuration_cap, + persistent_components_cap, + local_context_cap, + log_cap, + abs_path, + seq_num, + snap_id, + *secondary_storages_cap); + promise_cap->set_value(std::move(result)); + } + catch (...) + { + promise_cap->set_exception(std::current_exception()); + } + }); + } + + LOG_DEBUG( + log, + "Submitted {} parallel manifest file fetch tasks ({} total entries, thread_count={})", + prefetch_entries.size(), + n, + parallel_loading_threads); +} + std::optional SingleThreadIcebergKeysIterator::next() { ProfileEventTimeIncrement watch(ProfileEvents::IcebergIteratorNextMicroseconds); @@ -127,26 +208,60 @@ std::optional SingleThreadIcebergKeysIterator::next() return std::nullopt; } - while (manifest_file_index < data_snapshot->manifest_list_entries.size()) + /// Lazy initialization of parallel prefetch on the first next() call. + if (!prefetch_initialized) + { + prefetch_initialized = true; + if (parallel_loading_threads > 1) + initParallelPrefetch(); + } + + while (true) { if (!current_manifest_file_content) { - if (persistent_components.format_version > 1 && data_snapshot->manifest_list_entries[manifest_file_index].content_type != manifest_file_content_type) + if (parallel_loading_threads > 1) { - ++manifest_file_index; - continue; + /// Parallel path: consume the next pre-fetched manifest future. + if (prefetch_consume_pos >= prefetch_entries.size()) + return std::nullopt; + + auto & [manifest_idx, fut] = prefetch_entries[prefetch_consume_pos++]; + ManifestFilePtr result; + { + ProfileEventTimeIncrement wait_watch(ProfileEvents::IcebergManifestFilesParallelFetchMicroseconds); + result = fut.get(); + } + current_manifest_file_content = std::move(result); + internal_data_index = 0; + } + else + { + /// Serial path: find the next manifest with matching content type. 
+ while (manifest_file_index < data_snapshot->manifest_list_entries.size()) + { + if (persistent_components.format_version > 1 && + data_snapshot->manifest_list_entries[manifest_file_index].content_type != manifest_file_content_type) + { + ++manifest_file_index; + continue; + } + current_manifest_file_content = Iceberg::getManifestFile( + object_storage, + configuration.lock(), + persistent_components, + local_context, + log, + data_snapshot->manifest_list_entries[manifest_file_index].manifest_file_absolute_path, + data_snapshot->manifest_list_entries[manifest_file_index].added_sequence_number, + data_snapshot->manifest_list_entries[manifest_file_index].added_snapshot_id, + *secondary_storages); + internal_data_index = 0; + break; + } + if (!current_manifest_file_content) + return std::nullopt; } - current_manifest_file_content = Iceberg::getManifestFile( - object_storage, - configuration.lock(), - persistent_components, - local_context, - log, - data_snapshot->manifest_list_entries[manifest_file_index].manifest_file_absolute_path, - data_snapshot->manifest_list_entries[manifest_file_index].added_sequence_number, - data_snapshot->manifest_list_entries[manifest_file_index].added_snapshot_id, - *secondary_storages); - internal_data_index = 0; } auto files = files_generator(current_manifest_file_content); while (internal_data_index < files.size()) @@ -215,6 +330,19 @@ SingleThreadIcebergKeysIterator::~SingleThreadIcebergKeysIterator() ProfileEvents::increment(ProfileEvents::IcebergPartitionPrunedFiles, partition_pruned_files); if (min_max_index_pruned_files > 0) ProfileEvents::increment(ProfileEvents::IcebergMinMaxIndexPrunedFiles, min_max_index_pruned_files); + + /// Wait for any in-flight prefetch tasks that were not yet consumed by next(). + /// The lambda captures shared_ptrs by value so the objects remain alive, but we + /// must wait to prevent the thread-pool thread from writing to a promise whose + /// shared state we no longer observe. + for (size_t i = prefetch_consume_pos; i < prefetch_entries.size(); ++i) + { + if (prefetch_entries[i].future.valid()) + { + try { prefetch_entries[i].future.wait(); } + catch (...) {} // NOLINT: intentional swallow in destructor + } + } } SingleThreadIcebergKeysIterator::SingleThreadIcebergKeysIterator( @@ -251,6 +379,8 @@ SingleThreadIcebergKeysIterator::SingleThreadIcebergKeysIterator( , log(getLogger("IcebergIterator")) , secondary_storages(secondary_storages_) , manifest_file_content_type(manifest_file_content_type_) + , parallel_loading_threads( + local_context_ ? local_context_->getSettingsRef()[Setting::iceberg_metadata_files_parallel_loading_threads].value : 1) { } @@ -303,6 +433,10 @@ IcebergIterator::IcebergIterator( , table_schema_id(table_snapshot_->schema_id) , secondary_storages(secondary_storages_) { + /// Drain the delete manifests. With parallel_loading_threads > 1, the first call to + /// deletes_iterator.next() triggers initParallelPrefetch() which fans out all delete + /// manifest fetches to the IO thread pool simultaneously. Subsequent next() calls + /// consume the pre-fetched futures in order, so the serial drain loop below is fast. 
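Condensed, the parallel path added above is a submit-all-then-consume-in-order pattern: one promise/future pair per matching manifest is scheduled up front, and `next()` later consumes the futures in manifest-list order so the downstream iteration order matches the serial path. A stripped-down sketch of that shape, using plain `std::thread` instead of ClickHouse's IO thread pool and a placeholder `fetchManifest` standing in for `getManifestFile`:

```cpp
#include <future>
#include <memory>
#include <string>
#include <thread>
#include <vector>

/// Placeholder standing in for the real manifest fetch.
static std::string fetchManifest(const std::string & path)
{
    return "parsed:" + path;
}

int main()
{
    const std::vector<std::string> manifest_paths = {"m1.avro", "m2.avro", "m3.avro"};

    /// Fan out: one promise/future pair per manifest, submitted in manifest-list order.
    /// std::future is move-only, so futures are moved into a pre-reserved vector.
    std::vector<std::future<std::string>> prefetched;
    prefetched.reserve(manifest_paths.size());
    std::vector<std::thread> workers;

    for (const auto & path : manifest_paths)
    {
        auto promise = std::make_shared<std::promise<std::string>>();
        prefetched.push_back(promise->get_future());
        /// Capture everything by value: the worker stays valid even if the submitting
        /// object goes away before the task runs (same reasoning as in the patch above).
        workers.emplace_back([promise, path]
        {
            try
            {
                promise->set_value(fetchManifest(path));
            }
            catch (...)
            {
                promise->set_exception(std::current_exception());
            }
        });
    }

    /// Consume in submission order: fetches may finish in any order, but get() hands the
    /// results back in the same order the serial code would have produced them.
    for (auto & fut : prefetched)
    {
        std::string manifest = fut.get();   /// blocks only if this manifest is not ready yet
        (void)manifest;                     /// a real iterator would now enumerate its data files
    }

    for (auto & worker : workers)
        worker.join();
}
```

Consuming in submission order rather than completion order keeps the iterator deterministic; the price is that a slow first manifest can delay consumption even when later ones are already parsed, which matches the behaviour of the change above.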
auto delete_file = deletes_iterator.next(); while (delete_file.has_value()) { diff --git a/src/Storages/ObjectStorage/DataLakes/Iceberg/IcebergIterator.h b/src/Storages/ObjectStorage/DataLakes/Iceberg/IcebergIterator.h index 1ee3a9239cdf..e7c135c000fd 100644 --- a/src/Storages/ObjectStorage/DataLakes/Iceberg/IcebergIterator.h +++ b/src/Storages/ObjectStorage/DataLakes/Iceberg/IcebergIterator.h @@ -18,6 +18,7 @@ #include #include +#include #include #include @@ -54,6 +55,8 @@ class SingleThreadIcebergKeysIterator ~SingleThreadIcebergKeysIterator(); private: + void initParallelPrefetch(); + ObjectStoragePtr object_storage; std::shared_ptr filter_dag; ContextPtr local_context; @@ -66,7 +69,7 @@ class SingleThreadIcebergKeysIterator LoggerPtr log; std::shared_ptr secondary_storages; - // By Iceberg design it is difficult to avoid storing position deletes in memory. + /// Serial iteration state (used when parallel_loading_threads == 1) size_t manifest_file_index = 0; size_t internal_data_index = 0; Iceberg::ManifestFilePtr current_manifest_file_content; @@ -77,6 +80,18 @@ class SingleThreadIcebergKeysIterator size_t min_max_index_pruned_files = 0; size_t partition_pruned_files = 0; + + /// Parallel prefetch state (used when parallel_loading_threads > 1) + UInt64 parallel_loading_threads = 1; + + struct PrefetchEntry + { + size_t manifest_list_index; + std::future future; + }; + std::vector prefetch_entries; + size_t prefetch_consume_pos = 0; + bool prefetch_initialized = false; }; } diff --git a/tests/integration/test_storage_iceberg/test_parallel_manifest_loading.py b/tests/integration/test_storage_iceberg/test_parallel_manifest_loading.py new file mode 100644 index 000000000000..3bc9fa06a26b --- /dev/null +++ b/tests/integration/test_storage_iceberg/test_parallel_manifest_loading.py @@ -0,0 +1,176 @@ +""" +Integration test for parallel Iceberg manifest file loading +(iceberg_metadata_files_parallel_loading_threads setting). + +Creates a table with 30 manifest files and verifies that parallel loading +(threads > 1) produces identical results to serial loading (threads = 1). + +Timing improvement of parallel over serial requires real S3 latency to be +observable and is therefore validated via unit/profiling tests, not here. +""" + +import pytest +import time + +from helpers.iceberg_utils import create_iceberg_table, get_uuid_str + + +NUM_MANIFESTS = 30 + + +@pytest.mark.parametrize("format_version", [2]) +@pytest.mark.parametrize("storage_type", ["local"]) +def test_parallel_manifest_loading_correctness( + started_cluster, format_version, storage_type +): + """Parallel and serial manifest loading must produce identical results.""" + instance = started_cluster.instances["node1"] + TABLE_NAME = "test_parallel_manifest_" + storage_type + "_" + get_uuid_str() + + create_iceberg_table( + storage_type, + instance, + TABLE_NAME, + started_cluster, + "(id UInt32, val String)", + format_version, + ) + + # Insert NUM_MANIFESTS separate batches — each INSERT produces one manifest file. 
+ for i in range(NUM_MANIFESTS): + instance.query( + f"INSERT INTO {TABLE_NAME} VALUES ({i}, 'row_{i}')", + settings={"allow_experimental_insert_into_iceberg": 1}, + ) + + # ── Serial baseline ────────────────────────────────────────────────────── + serial_settings = { + "iceberg_metadata_files_parallel_loading_threads": 1, + "use_iceberg_metadata_files_cache": 0, # force cold read each time + } + + serial_count = int( + instance.query(f"SELECT count() FROM {TABLE_NAME}", settings=serial_settings) + ) + serial_sum = int( + instance.query(f"SELECT sum(id) FROM {TABLE_NAME}", settings=serial_settings) + ) + serial_rows = instance.query( + f"SELECT id, val FROM {TABLE_NAME} ORDER BY id", + settings=serial_settings, + ) + + # ── Parallel run ───────────────────────────────────────────────────────── + parallel_settings = { + "iceberg_metadata_files_parallel_loading_threads": 16, + "use_iceberg_metadata_files_cache": 0, + } + + parallel_count = int( + instance.query(f"SELECT count() FROM {TABLE_NAME}", settings=parallel_settings) + ) + parallel_sum = int( + instance.query(f"SELECT sum(id) FROM {TABLE_NAME}", settings=parallel_settings) + ) + parallel_rows = instance.query( + f"SELECT id, val FROM {TABLE_NAME} ORDER BY id", + settings=parallel_settings, + ) + + # ── Assertions ─────────────────────────────────────────────────────────── + assert serial_count == NUM_MANIFESTS, ( + f"Expected {NUM_MANIFESTS} rows (one per manifest), got {serial_count}" + ) + assert parallel_count == serial_count, ( + f"count mismatch: serial={serial_count}, parallel={parallel_count}" + ) + + expected_sum = NUM_MANIFESTS * (NUM_MANIFESTS - 1) // 2 + assert serial_sum == expected_sum, ( + f"sum mismatch: expected {expected_sum}, serial={serial_sum}" + ) + assert parallel_sum == serial_sum, ( + f"sum mismatch: serial={serial_sum}, parallel={parallel_sum}" + ) + + assert parallel_rows == serial_rows, ( + f"Row content mismatch between serial and parallel loading.\n" + f"Serial:\n{serial_rows}\nParallel:\n{parallel_rows}" + ) + + +@pytest.mark.parametrize("format_version", [2]) +@pytest.mark.parametrize("storage_type", ["local"]) +def test_parallel_manifest_loading_profile_event( + started_cluster, format_version, storage_type +): + """ + With parallel_loading_threads > 1, the IcebergManifestFilesParallelFetchMicroseconds + ProfileEvent must be incremented (confirming the parallel code path was taken). 
+ """ + instance = started_cluster.instances["node1"] + TABLE_NAME = "test_parallel_event_" + storage_type + "_" + get_uuid_str() + + create_iceberg_table( + storage_type, + instance, + TABLE_NAME, + started_cluster, + "(id UInt32)", + format_version, + ) + + for i in range(10): + instance.query( + f"INSERT INTO {TABLE_NAME} VALUES ({i})", + settings={"allow_experimental_insert_into_iceberg": 1}, + ) + + result = instance.query( + f""" + SELECT ProfileEvents['IcebergManifestFilesParallelFetchMicroseconds'] > 0 AS parallel_triggered + FROM system.query_log + WHERE + query LIKE '%SELECT count() FROM {TABLE_NAME}%' + AND type = 'QueryFinish' + ORDER BY event_time DESC + LIMIT 1 + """, + # Run the query we're going to observe first + settings={ + "iceberg_metadata_files_parallel_loading_threads": 8, + "use_iceberg_metadata_files_cache": 0, + "log_queries": 1, + }, + ) + + # Run the actual query first, then check query_log + instance.query( + f"SELECT count() FROM {TABLE_NAME}", + settings={ + "iceberg_metadata_files_parallel_loading_threads": 8, + "use_iceberg_metadata_files_cache": 0, + "log_queries": 1, + }, + ) + + # Give query_log a moment to flush + time.sleep(1) + instance.query("SYSTEM FLUSH LOGS") + + parallel_triggered = instance.query( + f""" + SELECT ProfileEvents['IcebergManifestFilesParallelFetchMicroseconds'] > 0 + FROM system.query_log + WHERE + query LIKE '%SELECT count() FROM {TABLE_NAME}%' + AND type = 'QueryFinish' + ORDER BY event_time DESC + LIMIT 1 + """ + ).strip() + + assert parallel_triggered == "1", ( + "IcebergManifestFilesParallelFetchMicroseconds was not incremented — " + "the parallel code path was not taken." + ) diff --git a/tests/queries/0_stateless/04075_iceberg_parallel_manifest_loading.reference b/tests/queries/0_stateless/04075_iceberg_parallel_manifest_loading.reference new file mode 100644 index 000000000000..b26da9873f9b --- /dev/null +++ b/tests/queries/0_stateless/04075_iceberg_parallel_manifest_loading.reference @@ -0,0 +1,3 @@ +count OK: 30 +sum OK: 465 +rows OK diff --git a/tests/queries/0_stateless/04075_iceberg_parallel_manifest_loading.sh b/tests/queries/0_stateless/04075_iceberg_parallel_manifest_loading.sh new file mode 100755 index 000000000000..849ec1aad828 --- /dev/null +++ b/tests/queries/0_stateless/04075_iceberg_parallel_manifest_loading.sh @@ -0,0 +1,83 @@ +#!/usr/bin/env bash +# Tags: no-fasttest +# Regression test for parallel Iceberg manifest file loading. +# Verifies that `iceberg_metadata_files_parallel_loading_threads` > 1 produces +# identical results to serial loading (threads=1) across count, sum, and full +# row-level comparisons. + +CURDIR=$(cd "$(dirname "${BASH_SOURCE[0]}")" && pwd) +# shellcheck source=../shell_config.sh +. "$CURDIR"/../shell_config.sh + +TABLE="t_parallel_manifest_${CLICKHOUSE_DATABASE}" +TABLE_PATH="${USER_FILES_PATH}/${TABLE}/" + +cleanup() +{ + ${CLICKHOUSE_CLIENT} --query "DROP TABLE IF EXISTS ${TABLE}" + rm -rf "${TABLE_PATH}" +} +trap cleanup EXIT +cleanup + +${CLICKHOUSE_CLIENT} --query " + CREATE TABLE ${TABLE} (id UInt32, val String) + ENGINE = IcebergLocal('${TABLE_PATH}', 'Parquet') +" + +# 30 separate inserts → 30 manifest files, one data file each. 
+for i in $(seq 1 30); do
+    ${CLICKHOUSE_CLIENT} --allow_experimental_insert_into_iceberg=1 \
+        --query "INSERT INTO ${TABLE} VALUES (${i}, 'row_${i}')"
+done
+
+# Serial baseline
+SERIAL_COUNT=$(${CLICKHOUSE_CLIENT} \
+    --iceberg_metadata_files_parallel_loading_threads=1 \
+    --query "SELECT count() FROM icebergLocal('${TABLE_PATH}')")
+
+SERIAL_SUM=$(${CLICKHOUSE_CLIENT} \
+    --iceberg_metadata_files_parallel_loading_threads=1 \
+    --query "SELECT sum(id) FROM icebergLocal('${TABLE_PATH}')")
+
+SERIAL_ROWS=$(${CLICKHOUSE_CLIENT} \
+    --iceberg_metadata_files_parallel_loading_threads=1 \
+    --query "SELECT id, val FROM icebergLocal('${TABLE_PATH}') ORDER BY id")
+
+# Parallel run (threads=16)
+PARALLEL_COUNT=$(${CLICKHOUSE_CLIENT} \
+    --iceberg_metadata_files_parallel_loading_threads=16 \
+    --query "SELECT count() FROM icebergLocal('${TABLE_PATH}')")
+
+PARALLEL_SUM=$(${CLICKHOUSE_CLIENT} \
+    --iceberg_metadata_files_parallel_loading_threads=16 \
+    --query "SELECT sum(id) FROM icebergLocal('${TABLE_PATH}')")
+
+PARALLEL_ROWS=$(${CLICKHOUSE_CLIENT} \
+    --iceberg_metadata_files_parallel_loading_threads=16 \
+    --query "SELECT id, val FROM icebergLocal('${TABLE_PATH}') ORDER BY id")
+
+# Verify counts match
+if [ "${SERIAL_COUNT}" = "${PARALLEL_COUNT}" ]; then
+    echo "count OK: ${SERIAL_COUNT}"
+else
+    echo "FAIL count: serial=${SERIAL_COUNT} parallel=${PARALLEL_COUNT}"
+fi
+
+# Verify sums match
+if [ "${SERIAL_SUM}" = "${PARALLEL_SUM}" ]; then
+    echo "sum OK: ${SERIAL_SUM}"
+else
+    echo "FAIL sum: serial=${SERIAL_SUM} parallel=${PARALLEL_SUM}"
+fi
+
+# Verify full row content matches
+if [ "${SERIAL_ROWS}" = "${PARALLEL_ROWS}" ]; then
+    echo "rows OK"
+else
+    echo "FAIL rows differ"
+    echo "--- serial ---"
+    echo "${SERIAL_ROWS}"
+    echo "--- parallel ---"
+    echo "${PARALLEL_ROWS}"
+fi
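One note on the counter the profile-event test asserts on: as implemented in the iterator above, `IcebergManifestFilesParallelFetchMicroseconds` accumulates only the time `next()` spends blocked on each pre-fetched future, so a prefetch that finishes before it is consumed contributes almost nothing. A small standalone illustration of that measurement style, using plain `std::chrono` and `std::async` rather than ClickHouse's `ProfileEventTimeIncrement` helper:

```cpp
#include <atomic>
#include <chrono>
#include <cstdint>
#include <future>
#include <thread>

/// Illustrative counter only; the real code wraps fut.get() in ProfileEventTimeIncrement.
std::atomic<uint64_t> parallel_fetch_wait_us{0};

template <typename T>
T timedGet(std::future<T> & fut)
{
    const auto start = std::chrono::steady_clock::now();
    T value = fut.get();    /// returns immediately if the prefetch already finished
    const auto elapsed = std::chrono::duration_cast<std::chrono::microseconds>(
        std::chrono::steady_clock::now() - start);
    parallel_fetch_wait_us += static_cast<uint64_t>(elapsed.count());
    return value;
}

int main()
{
    auto slow = std::async(std::launch::async, []
    {
        std::this_thread::sleep_for(std::chrono::milliseconds(50));
        return 42;
    });
    timedGet(slow);    /// adds roughly 50'000 microseconds of blocked wait

    auto fast = std::async(std::launch::async, [] { return 1; });
    std::this_thread::sleep_for(std::chrono::milliseconds(100));
    timedGet(fast);    /// already ready by now, so it contributes close to nothing
}
```

This is also why the integration test only checks that the event is greater than zero rather than comparing its value against serial timings.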