diff --git a/src/Common/ErrorCodes.cpp b/src/Common/ErrorCodes.cpp index b83852ad9a54..79eb5a888bf0 100644 --- a/src/Common/ErrorCodes.cpp +++ b/src/Common/ErrorCodes.cpp @@ -665,6 +665,8 @@ M(1003, SSH_EXCEPTION) \ M(1004, STARTUP_SCRIPTS_ERROR) \ M(1005, PENDING_MUTATIONS_NOT_ALLOWED) \ + M(1006, EXPORT_PARTITION_ALREADY_EXPORTED) \ + M(1007, PARTITION_EXPORT_FAILED) \ /* See END */ #ifdef APPLY_FOR_EXTERNAL_ERROR_CODES @@ -681,7 +683,7 @@ namespace ErrorCodes APPLY_FOR_ERROR_CODES(M) #undef M - constexpr ErrorCode END = 1005; + constexpr ErrorCode END = 1007; ErrorPairHolder values[END + 1]{}; struct ErrorCodesNames diff --git a/src/Core/Settings.cpp b/src/Core/Settings.cpp index c9994afb205e..028fbd334d12 100644 --- a/src/Core/Settings.cpp +++ b/src/Core/Settings.cpp @@ -7575,6 +7575,14 @@ On the other hand, there is a chance once the task executes that part has alread DECLARE(Bool, export_merge_tree_partition_system_table_prefer_remote_information, false, R"( Controls whether the system.replicated_partition_exports will prefer to query ZooKeeper to get the most up to date information or use the local information. Querying ZooKeeper is expensive, and only available if the ZooKeeper feature flag MULTI_READ is enabled. +)", 0) \ + DECLARE(ExportPartitionAllOnError, export_merge_tree_partition_all_on_error, ExportPartitionAllOnError::throw_first, R"( +Failure handling for `ALTER TABLE ... EXPORT PARTITION ALL ...`. +Possible values: +- `throw_first` (default) - stop at the first failed partition; partitions already scheduled remain scheduled. +- `collect` - try every partition and throw a single aggregated exception at the end if any failed; partitions that succeeded remain scheduled. +- `skip_conflicts` - silently skip partitions that are already exported / being exported (errors with code EXPORT_PARTITION_ALREADY_EXPORTED); fail-fast on every other error. +Has no effect on `EXPORT PARTITION <partition_expr>` (single-partition export). 
)", 0) \ DECLARE(String, export_merge_tree_part_filename_pattern, "{part_name}_{checksum}", R"( Pattern for the filename of the exported merge tree part. The `part_name` and `checksum` are calculated and replaced on the fly. Additional macros are supported. diff --git a/src/Core/Settings.h b/src/Core/Settings.h index 7b9f64f33e67..312d7a1f706f 100644 --- a/src/Core/Settings.h +++ b/src/Core/Settings.h @@ -118,6 +118,7 @@ class WriteBuffer; M(CLASS_NAME, JoinOrderAlgorithm) \ M(CLASS_NAME, DeduplicateInsertSelectMode) \ M(CLASS_NAME, DeduplicateInsertMode) \ + M(CLASS_NAME, ExportPartitionAllOnError) \ COMMON_SETTINGS_SUPPORTED_TYPES(Settings, DECLARE_SETTING_TRAIT) diff --git a/src/Core/SettingsChangesHistory.cpp b/src/Core/SettingsChangesHistory.cpp index d785ceaf7f68..f9f3b3ab836c 100644 --- a/src/Core/SettingsChangesHistory.cpp +++ b/src/Core/SettingsChangesHistory.cpp @@ -333,6 +333,7 @@ const VersionToSettingsChangesMap & getSettingsChangesHistory() {"export_merge_tree_part_max_rows_per_file", 0, 0, "New setting."}, {"export_merge_tree_partition_lock_inside_the_task", false, false, "New setting."}, {"export_merge_tree_partition_system_table_prefer_remote_information", true, true, "New setting."}, + {"export_merge_tree_partition_all_on_error", "throw_first", "throw_first", "New setting."}, {"export_merge_tree_part_throw_on_pending_mutations", true, true, "New setting."}, {"export_merge_tree_part_throw_on_pending_patch_parts", true, true, "New setting."}, {"object_storage_cluster", "", "", "Antalya: New setting"}, diff --git a/src/Core/SettingsEnums.cpp b/src/Core/SettingsEnums.cpp index c2b1a37f5c5a..bae824f4b32e 100644 --- a/src/Core/SettingsEnums.cpp +++ b/src/Core/SettingsEnums.cpp @@ -482,4 +482,6 @@ IMPLEMENT_SETTING_ENUM(JemallocProfileFormat, ErrorCodes::BAD_ARGUMENTS, IMPLEMENT_SETTING_AUTO_ENUM(MergeTreePartExportFileAlreadyExistsPolicy, ErrorCodes::BAD_ARGUMENTS); +IMPLEMENT_SETTING_AUTO_ENUM(ExportPartitionAllOnError, ErrorCodes::BAD_ARGUMENTS); + } 
diff --git a/src/Core/SettingsEnums.h b/src/Core/SettingsEnums.h index 240d294188f9..9585540354a4 100644 --- a/src/Core/SettingsEnums.h +++ b/src/Core/SettingsEnums.h @@ -573,5 +573,13 @@ enum class MergeTreePartExportFileAlreadyExistsPolicy : uint8_t DECLARE_SETTING_ENUM(MergeTreePartExportFileAlreadyExistsPolicy) +enum class ExportPartitionAllOnError : uint8_t +{ + throw_first, + collect, + skip_conflicts, +}; + +DECLARE_SETTING_ENUM(ExportPartitionAllOnError) } diff --git a/src/Storages/MergeTree/MergeTreeData.cpp b/src/Storages/MergeTree/MergeTreeData.cpp index 2edf653902f9..32edb7e2162b 100644 --- a/src/Storages/MergeTree/MergeTreeData.cpp +++ b/src/Storages/MergeTree/MergeTreeData.cpp @@ -6360,8 +6360,11 @@ void MergeTreeData::checkAlterPartitionIsPossible( const auto * partition_ast = command.partition->as<ASTPartition>(); if (partition_ast && partition_ast->all) { - if (command.type != PartitionCommand::DROP_PARTITION && command.type != PartitionCommand::ATTACH_PARTITION && !(command.type == PartitionCommand::REPLACE_PARTITION && !command.replace)) - throw DB::Exception(ErrorCodes::SUPPORT_IS_DISABLED, "Only support DROP/DETACH/ATTACH PARTITION ALL currently"); + if (command.type != PartitionCommand::DROP_PARTITION + && command.type != PartitionCommand::ATTACH_PARTITION + && command.type != PartitionCommand::EXPORT_PARTITION + && !(command.type == PartitionCommand::REPLACE_PARTITION && !command.replace)) + throw DB::Exception(ErrorCodes::SUPPORT_IS_DISABLED, "Only support DROP/DETACH/ATTACH/EXPORT PARTITION ALL currently"); } else { diff --git a/src/Storages/StorageReplicatedMergeTree.cpp b/src/Storages/StorageReplicatedMergeTree.cpp index 340a2076ba3b..56315fb1f9c1 100644 --- a/src/Storages/StorageReplicatedMergeTree.cpp +++ b/src/Storages/StorageReplicatedMergeTree.cpp @@ -225,6 +225,7 @@ namespace Setting extern const SettingsBool export_merge_tree_part_throw_on_pending_mutations; extern const SettingsBool export_merge_tree_part_throw_on_pending_patch_parts; extern 
const SettingsBool export_merge_tree_partition_lock_inside_the_task; + extern const SettingsExportPartitionAllOnError export_merge_tree_partition_all_on_error; extern const SettingsString export_merge_tree_part_filename_pattern; extern const SettingsBool write_full_path_in_iceberg_metadata; extern const SettingsBool allow_insert_into_iceberg; @@ -341,6 +342,8 @@ namespace ErrorCodes extern const int TIMEOUT_EXCEEDED; extern const int INVALID_SETTING_VALUE; extern const int PENDING_MUTATIONS_NOT_ALLOWED; + extern const int EXPORT_PARTITION_ALREADY_EXPORTED; + extern const int PARTITION_EXPORT_FAILED; } namespace ServerSetting @@ -8296,6 +8299,82 @@ void StorageReplicatedMergeTree::exportPartitionToTable(const PartitionCommand & "If you are exporting to an Apache Iceberg table, you also need to enable the setting `allow_experimental_insert_into_iceberg` on all replicas. The same goes for `allow_experimental_export_merge_tree_part`"); } + /// EXPORT PARTITION ALL: expand into one sub-call per active partition id. + /// Failure handling is controlled by `export_merge_tree_partition_all_on_error`. + if (const auto * partition_ast = command.partition->as<ASTPartition>(); partition_ast && partition_ast->all) + { + auto partition_id_set = getAllPartitionIds(); + if (partition_id_set.empty()) + throw Exception(ErrorCodes::BAD_ARGUMENTS, + "Table {} has no active partitions to export", + getStorageID().getNameForLogs()); + + /// Sort for deterministic ordering (so failure messages and tests are stable). 
+ std::vector<String> partition_ids(partition_id_set.begin(), partition_id_set.end()); + std::sort(partition_ids.begin(), partition_ids.end()); + + const auto & on_error_setting = query_context->getSettingsRef()[Setting::export_merge_tree_partition_all_on_error]; + const ExportPartitionAllOnError on_error = on_error_setting.value; + + LOG_INFO(log, "EXPORT PARTITION ALL: scheduling export for {} partitions, on_error={}", + partition_ids.size(), on_error_setting.toString()); + + std::vector<std::pair<String, String>> failures; /// (partition_id, message) + size_t skipped_conflicts = 0; + + for (const auto & partition_id : partition_ids) + { + PartitionCommand sub = command; + auto synthetic = make_intrusive<ASTPartition>(); + synthetic->setPartitionID(make_intrusive<ASTLiteral>(partition_id)); + sub.partition = synthetic; + + try + { + exportPartitionToTable(sub, query_context); + } + catch (const Exception & e) + { + switch (on_error) + { + case ExportPartitionAllOnError::throw_first: + throw; + case ExportPartitionAllOnError::skip_conflicts: + if (e.code() == ErrorCodes::EXPORT_PARTITION_ALREADY_EXPORTED) + { + ++skipped_conflicts; + LOG_INFO(log, + "EXPORT PARTITION ALL: skipping partition {} (already exported / concurrent): {}", + partition_id, e.message()); + break; + } + throw; + case ExportPartitionAllOnError::collect: + LOG_WARNING(log, "EXPORT PARTITION ALL: partition {} failed: {}", + partition_id, e.message()); + failures.emplace_back(partition_id, e.message()); + break; + } + } + } + + if (!failures.empty()) + { + String aggregated = fmt::format( + "EXPORT PARTITION ALL: {}/{} partitions failed to schedule. 
Per-partition errors:", + failures.size(), partition_ids.size()); + for (const auto & [pid, msg] : failures) + aggregated += fmt::format("\n {}: {}", pid, msg); + throw Exception(ErrorCodes::PARTITION_EXPORT_FAILED, "{}", aggregated); + } + + if (skipped_conflicts > 0) + LOG_INFO(log, "EXPORT PARTITION ALL: skipped {} partitions due to existing exports", + skipped_conflicts); + + return; + } + const auto dest_database = query_context->resolveDatabase(command.to_database); const auto dest_table = command.to_table; const auto dest_storage_id = StorageID(dest_database, dest_table); @@ -8375,7 +8454,7 @@ void StorageReplicatedMergeTree::exportPartitionToTable(const PartitionCommand & if (!has_expired && !query_context->getSettingsRef()[Setting::export_merge_tree_partition_force_export]) { - throw Exception(ErrorCodes::BAD_ARGUMENTS, "Export with key {} already exported or it is being exported, and it has not expired. Set `export_merge_tree_partition_force_export` to overwrite it.", export_key); + throw Exception(ErrorCodes::EXPORT_PARTITION_ALREADY_EXPORTED, "Export with key {} already exported or it is being exported, and it has not expired. Set `export_merge_tree_partition_force_export` to overwrite it.", export_key); } LOG_INFO(log, "Overwriting export with key {}", export_key); @@ -8573,7 +8652,7 @@ void StorageReplicatedMergeTree::exportPartitionToTable(const PartitionCommand & { /// Lost the race on the root export node. Current code already /// validated (exists / expired / force) — so this is *always* a race. - throw Exception(ErrorCodes::BAD_ARGUMENTS, + throw Exception(ErrorCodes::EXPORT_PARTITION_ALREADY_EXPORTED, "Export with key {} was created concurrently by another replica. 
Retry if needed", export_key); } diff --git a/tests/integration/test_export_replicated_mt_partition_to_iceberg/test.py b/tests/integration/test_export_replicated_mt_partition_to_iceberg/test.py index af6ca75bafd7..574296f90c49 100644 --- a/tests/integration/test_export_replicated_mt_partition_to_iceberg/test.py +++ b/tests/integration/test_export_replicated_mt_partition_to_iceberg/test.py @@ -196,6 +196,32 @@ def test_export_two_partitions_to_iceberg(cluster): assert count_2021 == 1, f"Expected 1 row for year=2021, got {count_2021}" +def test_export_partition_all_to_iceberg(cluster): + """ + `ALTER TABLE ... EXPORT PARTITION ALL TO TABLE ...` schedules every active partition + in one statement and exercises the Iceberg-specific destination compatibility checks + (which are repeated per sub-call inside the loop). + """ + node = cluster.instances["replica1"] + + uid = unique_suffix() + mt_table = f"mt_{uid}" + iceberg_table = f"iceberg_{uid}" + + setup_tables(cluster, mt_table, iceberg_table, nodes=["replica1"]) + + node.query(f"ALTER TABLE {mt_table} EXPORT PARTITION ALL TO TABLE {iceberg_table}") + + wait_for_export_status(node, mt_table, iceberg_table, "2020", "COMPLETED") + wait_for_export_status(node, mt_table, iceberg_table, "2021", "COMPLETED") + + count_2020 = int(node.query(f"SELECT count() FROM {iceberg_table} WHERE year = 2020").strip()) + count_2021 = int(node.query(f"SELECT count() FROM {iceberg_table} WHERE year = 2021").strip()) + + assert count_2020 == 3, f"Expected 3 rows for year=2020, got {count_2020}" + assert count_2021 == 1, f"Expected 1 row for year=2021, got {count_2021}" + + def test_failure_is_logged_in_system_table(cluster): """ When S3 is unreachable the export must be marked FAILED in diff --git a/tests/integration/test_export_replicated_mt_partition_to_object_storage/test.py b/tests/integration/test_export_replicated_mt_partition_to_object_storage/test.py index a5e60f81194e..852e8cf78c72 100644 --- 
a/tests/integration/test_export_replicated_mt_partition_to_object_storage/test.py +++ b/tests/integration/test_export_replicated_mt_partition_to_object_storage/test.py @@ -1506,3 +1506,100 @@ def test_export_partition_resumes_after_stop_moves_during_export(cluster): row_count = int(node.query(f"SELECT count() FROM {s3_table} WHERE year = 2020").strip()) assert row_count == 3, f"Expected 3 rows in S3 after export completed, got {row_count}" + + +def test_export_partition_all(cluster): + """Happy path for `ALTER TABLE ... EXPORT PARTITION ALL TO TABLE ...`. + + Schedules one export task per active partition in a single ALTER, then + verifies every partition lands in the destination S3 table. + """ + node = cluster.instances["replica1"] + + uid = str(uuid.uuid4()).replace("-", "_") + mt_table = f"export_all_mt_{uid}" + s3_table = f"export_all_s3_{uid}" + + node.query( + f"CREATE TABLE {mt_table} (id UInt64, year UInt16)" + f" ENGINE = ReplicatedMergeTree('/clickhouse/tables/{mt_table}', 'replica1')" + f" PARTITION BY year ORDER BY tuple()" + ) + node.query(f"INSERT INTO {mt_table} VALUES (1, 2020), (2, 2021), (3, 2022)") + create_s3_table(node, s3_table) + + node.query(f"ALTER TABLE {mt_table} EXPORT PARTITION ALL TO TABLE {s3_table}") + + for partition_id in ("2020", "2021", "2022"): + wait_for_export_status(node, mt_table, s3_table, partition_id, "COMPLETED", timeout=60) + + row_count = int(node.query(f"SELECT count() FROM {s3_table}").strip()) + assert row_count == 3, f"Expected 3 rows in S3 after EXPORT PARTITION ALL, got {row_count}" + + +def test_export_partition_all_failure_modes(cluster): + """Cover the three values of `export_merge_tree_partition_all_on_error`. + + Set up an already-fully-exported source table, then re-run EXPORT PARTITION ALL + with each failure mode and assert the documented behavior. 
+ """ + node = cluster.instances["replica1"] + + uid = str(uuid.uuid4()).replace("-", "_") + mt_table = f"export_all_modes_mt_{uid}" + s3_table = f"export_all_modes_s3_{uid}" + empty_mt = f"export_all_empty_mt_{uid}" + + node.query( + f"CREATE TABLE {mt_table} (id UInt64, year UInt16)" + f" ENGINE = ReplicatedMergeTree('/clickhouse/tables/{mt_table}', 'replica1')" + f" PARTITION BY year ORDER BY tuple()" + ) + node.query(f"INSERT INTO {mt_table} VALUES (1, 2020), (2, 2021), (3, 2022)") + create_s3_table(node, s3_table) + + # First run: schedule + wait for all partitions to complete. + node.query(f"ALTER TABLE {mt_table} EXPORT PARTITION ALL TO TABLE {s3_table}") + for partition_id in ("2020", "2021", "2022"): + wait_for_export_status(node, mt_table, s3_table, partition_id, "COMPLETED", timeout=60) + + # Empty table: throws BAD_ARGUMENTS (no active partitions). + node.query( + f"CREATE TABLE {empty_mt} (id UInt64, year UInt16)" + f" ENGINE = ReplicatedMergeTree('/clickhouse/tables/{empty_mt}', 'replica1')" + f" PARTITION BY year ORDER BY tuple()" + ) + error = node.query_and_get_error( + f"ALTER TABLE {empty_mt} EXPORT PARTITION ALL TO TABLE {s3_table}" + ) + assert "no active partitions to export" in error, ( + f"Expected 'no active partitions' error, got: {error}" + ) + + # throw_first (default): re-run aborts on the first conflicting partition. + error = node.query_and_get_error( + f"ALTER TABLE {mt_table} EXPORT PARTITION ALL TO TABLE {s3_table}" + f" SETTINGS export_merge_tree_partition_all_on_error = 'throw_first'" + ) + assert "EXPORT_PARTITION_ALREADY_EXPORTED" in error, ( + f"Expected EXPORT_PARTITION_ALREADY_EXPORTED in error, got: {error}" + ) + + # collect: aggregated PARTITION_EXPORT_FAILED message lists every conflicting partition. 
+ error = node.query_and_get_error( + f"ALTER TABLE {mt_table} EXPORT PARTITION ALL TO TABLE {s3_table}" + f" SETTINGS export_merge_tree_partition_all_on_error = 'collect'" + ) + assert "PARTITION_EXPORT_FAILED" in error, ( + f"Expected PARTITION_EXPORT_FAILED in error, got: {error}" + ) + for partition_id in ("2020", "2021", "2022"): + assert partition_id in error, ( + f"Expected aggregated error to mention partition {partition_id}, got: {error}" + ) + + # skip_conflicts: succeeds silently because every partition conflicts and is skipped. + node.query( + f"ALTER TABLE {mt_table} EXPORT PARTITION ALL TO TABLE {s3_table}" + f" SETTINGS export_merge_tree_partition_all_on_error = 'skip_conflicts'" + )