diff --git a/src/iceberg/test/CMakeLists.txt b/src/iceberg/test/CMakeLists.txt index 92fd0c1c2..b0f03b527 100644 --- a/src/iceberg/test/CMakeLists.txt +++ b/src/iceberg/test/CMakeLists.txt @@ -119,6 +119,7 @@ add_iceberg_test(util_test location_util_test.cc roaring_position_bitmap_test.cc position_delete_index_test.cc + retry_util_test.cc string_util_test.cc struct_like_set_test.cc transform_util_test.cc diff --git a/src/iceberg/test/meson.build b/src/iceberg/test/meson.build index f3eeb7da5..ed9a5866a 100644 --- a/src/iceberg/test/meson.build +++ b/src/iceberg/test/meson.build @@ -91,6 +91,7 @@ iceberg_tests = { 'formatter_test.cc', 'location_util_test.cc', 'position_delete_index_test.cc', + 'retry_util_test.cc', 'roaring_position_bitmap_test.cc', 'string_util_test.cc', 'struct_like_set_test.cc', diff --git a/src/iceberg/test/retry_util_test.cc b/src/iceberg/test/retry_util_test.cc new file mode 100644 index 000000000..05c7cb0e1 --- /dev/null +++ b/src/iceberg/test/retry_util_test.cc @@ -0,0 +1,304 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +#include "iceberg/util/retry_util.h" + +#include +#include + +#include + +#include "iceberg/result.h" +#include "iceberg/test/matchers.h" + +namespace iceberg { + +TEST(RetryRunnerTest, SuccessOnFirstAttempt) { + int call_count = 0; + int32_t attempts = 0; + + auto result = RetryRunner(RetryConfig{.num_retries = 3, + .min_wait_ms = 1, + .max_wait_ms = 10, + .total_timeout_ms = 5000}) + .Run( + [&]() -> Result { + ++call_count; + return 42; + }, + &attempts); + + EXPECT_THAT(result, IsOk()); + EXPECT_EQ(*result, 42); + EXPECT_EQ(call_count, 1); + EXPECT_EQ(attempts, 1); +} + +TEST(RetryRunnerTest, RetryOnceThenSucceed) { + int call_count = 0; + int32_t attempts = 0; + + auto result = RetryRunner(RetryConfig{.num_retries = 3, + .min_wait_ms = 1, + .max_wait_ms = 10, + .total_timeout_ms = 5000}) + .Run( + [&]() -> Result { + ++call_count; + if (call_count == 1) { + return CommitFailed("transient failure"); + } + return 42; + }, + &attempts); + + EXPECT_THAT(result, IsOk()); + EXPECT_EQ(*result, 42); + EXPECT_EQ(call_count, 2); + EXPECT_EQ(attempts, 2); +} + +TEST(RetryRunnerTest, MaxAttemptsExhausted) { + int call_count = 0; + int32_t attempts = 0; + + auto result = RetryRunner(RetryConfig{.num_retries = 2, + .min_wait_ms = 1, + .max_wait_ms = 10, + .total_timeout_ms = 5000}) + .Run( + [&]() -> Result { + ++call_count; + return CommitFailed("always fails"); + }, + &attempts); + + EXPECT_THAT(result, IsError(ErrorKind::kCommitFailed)); + EXPECT_EQ(call_count, 3); + EXPECT_EQ(attempts, 3); +} + +TEST(RetryRunnerTest, OnlyRetryOnFilter) { + int call_count = 0; + int32_t attempts = 0; + + auto result = RetryRunner(RetryConfig{.num_retries = 3, + .min_wait_ms = 1, + .max_wait_ms = 10, + .total_timeout_ms = 5000}) + .OnlyRetryOn(ErrorKind::kCommitFailed) + .Run( + [&]() -> Result { + ++call_count; + return ValidationFailed("schema conflict"); + }, + &attempts); + + EXPECT_THAT(result, IsError(ErrorKind::kValidationFailed)); + EXPECT_EQ(call_count, 1); + EXPECT_EQ(attempts, 1); +} + +TEST(RetryRunnerTest, OnlyRetryOnMatchingError) { + int call_count = 0; + int32_t attempts = 0; + + auto result = RetryRunner(RetryConfig{.num_retries = 2, + .min_wait_ms = 1, + .max_wait_ms = 10, + .total_timeout_ms = 5000}) + .OnlyRetryOn(ErrorKind::kCommitFailed) + .Run( + [&]() -> Result { + ++call_count; + if (call_count <= 2) { + return CommitFailed("transient"); + } + return 100; + }, + &attempts); + + EXPECT_THAT(result, IsOk()); + EXPECT_EQ(*result, 100); + EXPECT_EQ(call_count, 3); + EXPECT_EQ(attempts, 3); +} + +TEST(RetryRunnerTest, StopRetryOnMatchingError) { + int call_count = 0; + int32_t attempts = 0; + + auto result = RetryRunner(RetryConfig{.num_retries = 5, + .min_wait_ms = 1, + .max_wait_ms = 10, + .total_timeout_ms = 5000}) + .StopRetryOn({ErrorKind::kCommitStateUnknown}) + .Run( + [&]() -> Result { + ++call_count; + return CommitStateUnknown("datacenter on fire"); + }, + &attempts); + + EXPECT_THAT(result, IsError(ErrorKind::kCommitStateUnknown)); + EXPECT_EQ(call_count, 1); + EXPECT_EQ(attempts, 1); +} + +TEST(RetryRunnerTest, ZeroRetries) { + int call_count = 0; + int32_t attempts = 0; + + auto result = RetryRunner(RetryConfig{.num_retries = 0, + .min_wait_ms = 1, + .max_wait_ms = 10, + .total_timeout_ms = 5000}) + .Run( + [&]() -> Result { + ++call_count; + return CommitFailed("fail"); + }, + &attempts); + + EXPECT_THAT(result, IsError(ErrorKind::kCommitFailed)); + EXPECT_EQ(call_count, 1); + EXPECT_EQ(attempts, 1); +} + +TEST(RetryRunnerTest, TotalTimeoutStopsBeforeStartingAnotherAttempt) { + int call_count = 0; + int32_t attempts = 0; + + auto result = RetryRunner(RetryConfig{.num_retries = 3, + .min_wait_ms = 20, + .max_wait_ms = 20, + .total_timeout_ms = 15}) + .Run( + [&]() -> Result { + ++call_count; + // The first failure consumes most of the 15 ms budget, so the + // next 20 ms backoff should prevent another attempt from + // starting. + if (call_count == 1) { + std::this_thread::sleep_for(std::chrono::milliseconds(10)); + } + return CommitFailed("retry budget exhausted"); + }, + &attempts); + + EXPECT_THAT(result, IsError(ErrorKind::kCommitFailed)); + EXPECT_EQ(call_count, 1); + EXPECT_EQ(attempts, 1); +} + +TEST(RetryRunnerTest, MakeCommitRetryRunnerConfig) { + int call_count = 0; + int32_t attempts = 0; + + auto result = MakeCommitRetryRunner(2, 1, 10, 5000) + .Run( + [&]() -> Result { + ++call_count; + return ValidationFailed("not retryable"); + }, + &attempts); + + EXPECT_THAT(result, IsError(ErrorKind::kValidationFailed)); + EXPECT_EQ(call_count, 1); + EXPECT_EQ(attempts, 1); +} + +TEST(RetryRunnerTest, MakeCommitRetryRunnerRetriesCommitFailed) { + int call_count = 0; + int32_t attempts = 0; + + auto result = MakeCommitRetryRunner(3, 1, 10, 5000) + .Run( + [&]() -> Result { + ++call_count; + if (call_count <= 2) { + return CommitFailed("transient"); + } + return 99; + }, + &attempts); + + EXPECT_THAT(result, IsOk()); + EXPECT_EQ(*result, 99); + EXPECT_EQ(call_count, 3); + EXPECT_EQ(attempts, 3); +} + +TEST(RetryRunnerTest, OnlyRetryOnMultipleErrorKinds) { + int call_count = 0; + int32_t attempts = 0; + + auto result = + RetryRunner(RetryConfig{.num_retries = 5, + .min_wait_ms = 1, + .max_wait_ms = 10, + .total_timeout_ms = 5000}) + .OnlyRetryOn({ErrorKind::kCommitFailed, ErrorKind::kServiceUnavailable}) + .Run( + [&]() -> Result { + ++call_count; + if (call_count == 1) { + return CommitFailed("conflict"); + } + if (call_count == 2) { + return ServiceUnavailable("server busy"); + } + return 77; + }, + &attempts); + + EXPECT_THAT(result, IsOk()); + EXPECT_EQ(*result, 77); + EXPECT_EQ(call_count, 3); + EXPECT_EQ(attempts, 3); +} + +TEST(RetryRunnerTest, DefaultRetryAllErrors) { + int call_count = 0; + int32_t attempts = 0; + + auto result = RetryRunner(RetryConfig{.num_retries = 3, + .min_wait_ms = 1, + .max_wait_ms = 10, + .total_timeout_ms = 5000}) + .Run( + [&]() -> Result { + ++call_count; + if (call_count == 1) { + return IOError("disk full"); + } + if (call_count == 2) { + return ValidationFailed("bad schema"); + } + return 55; + }, + &attempts); + + EXPECT_THAT(result, IsOk()); + EXPECT_EQ(*result, 55); + EXPECT_EQ(call_count, 3); + EXPECT_EQ(attempts, 3); +} + +} // namespace iceberg diff --git a/src/iceberg/test/table_test.cc b/src/iceberg/test/table_test.cc index e445d9011..0ad8e8ced 100644 --- a/src/iceberg/test/table_test.cc +++ b/src/iceberg/test/table_test.cc @@ -128,6 +128,9 @@ TYPED_TEST(TypedTableTest, Refresh) { .WillOnce(::testing::Return(refreshed)); } EXPECT_THAT(table->Refresh(), IsOk()); + if constexpr (std::is_same_v) { + EXPECT_EQ(table->metadata_file_location(), "s3://bucket/meta2.json"); + } } else { EXPECT_THAT(table->Refresh(), IsError(ErrorKind::kNotSupported)); } diff --git a/src/iceberg/test/transaction_test.cc b/src/iceberg/test/transaction_test.cc index 232febc1f..3a13b7bc5 100644 --- a/src/iceberg/test/transaction_test.cc +++ b/src/iceberg/test/transaction_test.cc @@ -23,9 +23,12 @@ #include "iceberg/expression/term.h" #include "iceberg/sort_order.h" #include "iceberg/test/matchers.h" +#include "iceberg/test/mock_catalog.h" #include "iceberg/test/update_test_base.h" #include "iceberg/transform.h" +#include "iceberg/type.h" #include "iceberg/update/update_properties.h" +#include "iceberg/update/update_schema.h" #include "iceberg/update/update_sort_order.h" namespace iceberg { @@ -94,4 +97,147 @@ TEST_F(TransactionTest, MultipleUpdatesInTransaction) { EXPECT_EQ(*sort_order, *expected_sort_order); } +class TransactionRetryTest : public UpdateTestBase { + protected: + void SetUp() override { + UpdateTestBase::SetUp(); + + // Create a MockCatalog and wire it to the existing table + mock_catalog_ = std::make_shared<::testing::NiceMock>(); + + ON_CALL(*mock_catalog_, LoadTable(::testing::_)) + .WillByDefault([this](const TableIdentifier&) -> Result> { + return Table::Make(table_->name(), table_->metadata(), + std::string(table_->metadata_file_location()), table_->io(), + mock_catalog_); + }); + + // Create a table instance bound to the mock catalog + auto result = Table::Make(table_->name(), table_->metadata(), + std::string(table_->metadata_file_location()), table_->io(), + mock_catalog_); + ASSERT_THAT(result, IsOk()); + mock_table_ = std::move(result.value()); + } + + std::shared_ptr<::testing::NiceMock> mock_catalog_; + std::shared_ptr mock_table_; +}; + +TEST_F(TransactionRetryTest, CommitRetrySucceedsAfterConflict) { + int update_call_count = 0; + ON_CALL(*mock_catalog_, UpdateTable(::testing::_, ::testing::_, ::testing::_)) + .WillByDefault([this, &update_call_count]( + const TableIdentifier&, + const std::vector>&, + const std::vector>&) + -> Result> { + ++update_call_count; + if (update_call_count == 1) { + return CommitFailed("conflict on first attempt"); + } + return Table::Make(mock_table_->name(), mock_table_->metadata(), + std::string(mock_table_->metadata_file_location()), + mock_table_->io(), mock_catalog_); + }); + + ICEBERG_UNWRAP_OR_FAIL(auto txn, mock_table_->NewTransaction()); + ICEBERG_UNWRAP_OR_FAIL(auto update, txn->NewUpdateProperties()); + update->Set("retry.test", "value"); + EXPECT_THAT(update->Commit(), IsOk()); + + auto result = txn->Commit(); + EXPECT_THAT(result, IsOk()); + EXPECT_EQ(update_call_count, 2); +} + +TEST_F(TransactionRetryTest, CommitRetryExhausted) { + int update_call_count = 0; + ON_CALL(*mock_catalog_, UpdateTable(::testing::_, ::testing::_, ::testing::_)) + .WillByDefault( + [&update_call_count](const TableIdentifier&, + const std::vector>&, + const std::vector>&) + -> Result> { + ++update_call_count; + return CommitFailed("always conflicts"); + }); + + ICEBERG_UNWRAP_OR_FAIL(auto txn, mock_table_->NewTransaction()); + ICEBERG_UNWRAP_OR_FAIL(auto update, txn->NewUpdateProperties()); + update->Set("retry.test", "value"); + EXPECT_THAT(update->Commit(), IsOk()); + + auto result = txn->Commit(); + EXPECT_THAT(result, IsError(ErrorKind::kCommitFailed)); + EXPECT_EQ(update_call_count, 5); +} + +TEST_F(TransactionRetryTest, CommitNonRetryableErrorStopsImmediately) { + int update_call_count = 0; + ON_CALL(*mock_catalog_, UpdateTable(::testing::_, ::testing::_, ::testing::_)) + .WillByDefault( + [&update_call_count](const TableIdentifier&, + const std::vector>&, + const std::vector>&) + -> Result> { + ++update_call_count; + return CommitStateUnknown("unknown state"); + }); + + ICEBERG_UNWRAP_OR_FAIL(auto txn, mock_table_->NewTransaction()); + ICEBERG_UNWRAP_OR_FAIL(auto update, txn->NewUpdateProperties()); + update->Set("retry.test", "value"); + EXPECT_THAT(update->Commit(), IsOk()); + + auto result = txn->Commit(); + EXPECT_THAT(result, IsError(ErrorKind::kCommitStateUnknown)); + EXPECT_EQ(update_call_count, 1); // Should not retry +} + +TEST_F(TransactionRetryTest, CreateTransactionDoesNotRetry) { + int update_call_count = 0; + ON_CALL(*mock_catalog_, UpdateTable(::testing::_, ::testing::_, ::testing::_)) + .WillByDefault( + [&update_call_count](const TableIdentifier&, + const std::vector>&, + const std::vector>&) + -> Result> { + ++update_call_count; + return CommitFailed("conflict"); + }); + + ICEBERG_UNWRAP_OR_FAIL(auto txn, + Transaction::Make(mock_table_, TransactionKind::kCreate)); + ICEBERG_UNWRAP_OR_FAIL(auto update, txn->NewUpdateProperties()); + update->Set("create.test", "value"); + EXPECT_THAT(update->Commit(), IsOk()); + + auto result = txn->Commit(); + EXPECT_THAT(result, IsError(ErrorKind::kCommitFailed)); + EXPECT_EQ(update_call_count, 1); // No retry for kCreate +} + +TEST_F(TransactionRetryTest, NonRetryableUpdatePreventsRetry) { + int update_call_count = 0; + ON_CALL(*mock_catalog_, UpdateTable(::testing::_, ::testing::_, ::testing::_)) + .WillByDefault( + [&update_call_count](const TableIdentifier&, + const std::vector>&, + const std::vector>&) + -> Result> { + ++update_call_count; + return CommitFailed("conflict"); + }); + + ICEBERG_UNWRAP_OR_FAIL(auto txn, mock_table_->NewTransaction()); + ICEBERG_UNWRAP_OR_FAIL(auto schema_update, txn->NewUpdateSchema()); + schema_update->AddColumn("new_col", int64()); + EXPECT_THAT(schema_update->Commit(), IsOk()); + + auto result = txn->Commit(); + EXPECT_THAT(result, IsError(ErrorKind::kCommitFailed)); + EXPECT_EQ(update_call_count, 1); +} + } // namespace iceberg diff --git a/src/iceberg/test/update_statistics_test.cc b/src/iceberg/test/update_statistics_test.cc index 7b9d4f582..d6721e5bd 100644 --- a/src/iceberg/test/update_statistics_test.cc +++ b/src/iceberg/test/update_statistics_test.cc @@ -21,6 +21,7 @@ #include #include +#include #include #include @@ -28,6 +29,7 @@ #include "iceberg/result.h" #include "iceberg/statistics_file.h" #include "iceberg/test/matchers.h" +#include "iceberg/test/mock_catalog.h" #include "iceberg/test/update_test_base.h" namespace iceberg { @@ -66,6 +68,35 @@ class UpdateStatisticsTest : public UpdateTestBase { } }; +class UpdateStatisticsRetryTest : public UpdateStatisticsTest { + protected: + void SetUp() override { + UpdateStatisticsTest::SetUp(); + + mock_catalog_ = std::make_shared<::testing::NiceMock>(); + + ON_CALL(*mock_catalog_, LoadTable(::testing::_)) + .WillByDefault([this](const TableIdentifier&) -> Result> { + ++load_table_count_; + auto refreshed_metadata = std::make_shared(*table_->metadata()); + auto refreshed_location = table_location_ + "/metadata/reload-" + + std::to_string(load_table_count_) + ".metadata.json"; + return Table::Make(table_->name(), std::move(refreshed_metadata), + std::move(refreshed_location), table_->io(), mock_catalog_); + }); + + auto result = Table::Make(table_->name(), table_->metadata(), + std::string(table_->metadata_file_location()), table_->io(), + mock_catalog_); + ASSERT_THAT(result, IsOk()); + mock_table_ = std::move(result.value()); + } + + int load_table_count_ = 0; + std::shared_ptr<::testing::NiceMock> mock_catalog_; + std::shared_ptr
mock_table_; +}; + TEST_F(UpdateStatisticsTest, EmptyUpdate) { ICEBERG_UNWRAP_OR_FAIL(auto update, table_->NewUpdateStatistics()); ICEBERG_UNWRAP_OR_FAIL(auto result, update->Apply()); @@ -218,4 +249,36 @@ TEST_F(UpdateStatisticsTest, CommitSuccess) { EXPECT_EQ(*statistics[0], *stats_file); } +TEST_F(UpdateStatisticsRetryTest, StandaloneCommitRetriesAfterConflict) { + ICEBERG_UNWRAP_OR_FAIL(auto current_snapshot, mock_table_->current_snapshot()); + auto stats_file = MakeStatisticsFile(current_snapshot->snapshot_id, + "/warehouse/test_table/metadata/stats-1.puffin"); + + int update_call_count = 0; + ON_CALL(*mock_catalog_, UpdateTable(::testing::_, ::testing::_, ::testing::_)) + .WillByDefault([this, &update_call_count, stats_file]( + const TableIdentifier&, + const std::vector>&, + const std::vector>&) + -> Result> { + ++update_call_count; + if (update_call_count == 1) { + return CommitFailed("conflict on first attempt"); + } + auto committed_metadata = + std::make_shared(*mock_table_->metadata()); + committed_metadata->statistics = {stats_file}; + return Table::Make(mock_table_->name(), std::move(committed_metadata), + table_location_ + "/metadata/committed.metadata.json", + mock_table_->io(), mock_catalog_); + }); + + ICEBERG_UNWRAP_OR_FAIL(auto update, mock_table_->NewUpdateStatistics()); + update->SetStatistics(stats_file); + + EXPECT_THAT(update->Commit(), IsOk()); + EXPECT_EQ(update_call_count, 2); + EXPECT_EQ(load_table_count_, 1); +} + } // namespace iceberg diff --git a/src/iceberg/transaction.cc b/src/iceberg/transaction.cc index c552c8fd9..049b0f49d 100644 --- a/src/iceberg/transaction.cc +++ b/src/iceberg/transaction.cc @@ -1,4 +1,3 @@ - /* * Licensed to the Apache Software Foundation (ASF) under one * or more contributor license agreements. See the NOTICE file @@ -50,6 +49,7 @@ #include "iceberg/util/checked_cast.h" #include "iceberg/util/location_util.h" #include "iceberg/util/macros.h" +#include "iceberg/util/retry_util.h" namespace iceberg { @@ -346,22 +346,22 @@ Result> Transaction::Commit() { return ctx_->table; } - std::vector> requirements; - switch (ctx_->kind) { - case TransactionKind::kCreate: { - ICEBERG_ASSIGN_OR_RAISE(requirements, TableRequirements::ForCreateTable(updates)); - } break; - case TransactionKind::kUpdate: { - ICEBERG_ASSIGN_OR_RAISE( - requirements, - TableRequirements::ForUpdateTable(*ctx_->metadata_builder->base(), updates)); + const auto& props = ctx_->table->properties(); + int32_t num_retries = + CanRetry() ? static_cast(props.Get(TableProperties::kCommitNumRetries)) + : 0; + int32_t min_wait_ms = props.Get(TableProperties::kCommitMinRetryWaitMs); + int32_t max_wait_ms = props.Get(TableProperties::kCommitMaxRetryWaitMs); + int32_t total_timeout_ms = props.Get(TableProperties::kCommitTotalRetryTimeMs); - } break; - } - - // XXX: we should handle commit failure and retry here. + bool is_first_attempt = true; auto commit_result = - ctx_->table->catalog()->UpdateTable(ctx_->table->name(), requirements, updates); + MakeCommitRetryRunner(num_retries, min_wait_ms, max_wait_ms, total_timeout_ms) + .Run([this, &is_first_attempt]() -> Result> { + auto result = CommitOnce(is_first_attempt); + is_first_attempt = false; + return result; + }); Result finalize_result = commit_result.has_value() @@ -381,6 +381,47 @@ Result> Transaction::Commit() { return ctx_->table; } +Result> Transaction::CommitOnce(bool is_first_attempt) { + std::vector> requirements; + + switch (ctx_->kind) { + case TransactionKind::kCreate: { + ICEBERG_ASSIGN_OR_RAISE(requirements, TableRequirements::ForCreateTable( + ctx_->metadata_builder->changes())); + } break; + case TransactionKind::kUpdate: { + if (!is_first_attempt) { + ICEBERG_RETURN_UNEXPECTED(ctx_->table->Refresh()); + } + if (ctx_->metadata_builder->base() != ctx_->table->metadata().get()) { + ctx_->metadata_builder = + TableMetadataBuilder::BuildFrom(ctx_->table->metadata().get()); + for (const auto& update : pending_updates_) { + ICEBERG_RETURN_UNEXPECTED(Apply(*update)); + } + } + ICEBERG_ASSIGN_OR_RAISE(requirements, TableRequirements::ForUpdateTable( + *ctx_->metadata_builder->base(), + ctx_->metadata_builder->changes())); + } break; + } + + return ctx_->table->catalog()->UpdateTable(ctx_->table->name(), requirements, + ctx_->metadata_builder->changes()); +} + +bool Transaction::CanRetry() const { + if (ctx_->kind == TransactionKind::kCreate) { + return false; + } + for (const auto& update : pending_updates_) { + if (!update->IsRetryable()) { + return false; + } + } + return true; +} + Result> Transaction::NewUpdatePartitionSpec() { ICEBERG_ASSIGN_OR_RAISE(std::shared_ptr update_spec, UpdatePartitionSpec::Make(ctx_)); diff --git a/src/iceberg/transaction.h b/src/iceberg/transaction.h index ec8c4db0a..60fe935f3 100644 --- a/src/iceberg/transaction.h +++ b/src/iceberg/transaction.h @@ -138,6 +138,12 @@ class ICEBERG_EXPORT Transaction : public std::enable_shared_from_this> CommitOnce(bool is_first_attempt); + + /// \brief Whether this transaction can retry after a commit conflict. + bool CanRetry() const; + private: friend class PendingUpdate; diff --git a/src/iceberg/update/expire_snapshots.h b/src/iceberg/update/expire_snapshots.h index 23b8743b9..7c1588aa5 100644 --- a/src/iceberg/update/expire_snapshots.h +++ b/src/iceberg/update/expire_snapshots.h @@ -140,6 +140,7 @@ class ICEBERG_EXPORT ExpireSnapshots : public PendingUpdate { ExpireSnapshots& CleanExpiredMetadata(bool clean); Kind kind() const final { return Kind::kExpireSnapshots; } + bool IsRetryable() const override { return true; } /// \brief Apply the pending changes and return the results /// \return The results of changes diff --git a/src/iceberg/update/pending_update.h b/src/iceberg/update/pending_update.h index ad6b91e2f..19998ddb3 100644 --- a/src/iceberg/update/pending_update.h +++ b/src/iceberg/update/pending_update.h @@ -57,6 +57,9 @@ class ICEBERG_EXPORT PendingUpdate : public ErrorCollector { /// \brief Return the kind of this pending update. virtual Kind kind() const = 0; + /// \brief Whether this update can be retried after a commit conflict. + virtual bool IsRetryable() const = 0; + /// \brief Apply the pending changes and commit. /// /// \return An OK status if the commit was successful, or an error: diff --git a/src/iceberg/update/set_snapshot.h b/src/iceberg/update/set_snapshot.h index 6aeb92652..431e636b2 100644 --- a/src/iceberg/update/set_snapshot.h +++ b/src/iceberg/update/set_snapshot.h @@ -51,6 +51,7 @@ class ICEBERG_EXPORT SetSnapshot : public PendingUpdate { SetSnapshot& RollbackTo(int64_t snapshot_id); Kind kind() const final { return Kind::kSetSnapshot; } + bool IsRetryable() const override { return true; } /// \brief Apply the pending changes and return the target snapshot ID. Result Apply(); diff --git a/src/iceberg/update/snapshot_update.cc b/src/iceberg/update/snapshot_update.cc index fd1c21f55..a59ebdc72 100644 --- a/src/iceberg/update/snapshot_update.cc +++ b/src/iceberg/update/snapshot_update.cc @@ -226,6 +226,18 @@ int64_t SnapshotUpdate::SnapshotId() { Result SnapshotUpdate::Apply() { ICEBERG_RETURN_UNEXPECTED(CheckErrors()); + + if (staged_snapshot_ != nullptr) { + for (const auto& manifest_list : manifest_lists_) { + std::ignore = DeleteFile(manifest_list); + } + manifest_lists_.clear(); + CleanUncommitted(std::unordered_set{}); + + staged_snapshot_ = nullptr; + summary_.Clear(); + } + ICEBERG_ASSIGN_OR_RAISE(auto parent_snapshot, SnapshotUtil::OptionalLatestSnapshot(base(), target_branch_)); diff --git a/src/iceberg/update/snapshot_update.h b/src/iceberg/update/snapshot_update.h index 97ec29b28..f48e5f44d 100644 --- a/src/iceberg/update/snapshot_update.h +++ b/src/iceberg/update/snapshot_update.h @@ -52,6 +52,7 @@ class ICEBERG_EXPORT SnapshotUpdate : public PendingUpdate { ~SnapshotUpdate() override; Kind kind() const override { return Kind::kUpdateSnapshot; } + bool IsRetryable() const override { return true; } /// \brief Set a callback to delete files instead of the table's default. /// diff --git a/src/iceberg/update/update_location.h b/src/iceberg/update/update_location.h index 33864380e..48fb84c21 100644 --- a/src/iceberg/update/update_location.h +++ b/src/iceberg/update/update_location.h @@ -45,6 +45,7 @@ class ICEBERG_EXPORT UpdateLocation : public PendingUpdate { UpdateLocation& SetLocation(std::string_view location); Kind kind() const final { return Kind::kUpdateLocation; } + bool IsRetryable() const override { return true; } /// \brief Apply the pending changes and return the new location. Result Apply(); diff --git a/src/iceberg/update/update_partition_spec.h b/src/iceberg/update/update_partition_spec.h index 6b3dd40ee..67dcb1413 100644 --- a/src/iceberg/update/update_partition_spec.h +++ b/src/iceberg/update/update_partition_spec.h @@ -100,6 +100,13 @@ class ICEBERG_EXPORT UpdatePartitionSpec : public PendingUpdate { Kind kind() const final { return Kind::kUpdatePartitionSpec; } + /// \brief Partition spec updates are not retryable. + /// + /// The update caches the current schema/spec state and may allocate or recycle + /// partition field IDs from that base. Replaying after a refresh can change the + /// intended transform bindings or field ID assignment semantics. + bool IsRetryable() const override { return false; } + struct ApplyResult { std::shared_ptr spec; bool set_as_default; diff --git a/src/iceberg/update/update_partition_statistics.h b/src/iceberg/update/update_partition_statistics.h index bdaf5a703..982b1bd39 100644 --- a/src/iceberg/update/update_partition_statistics.h +++ b/src/iceberg/update/update_partition_statistics.h @@ -63,6 +63,13 @@ class ICEBERG_EXPORT UpdatePartitionStatistics : public PendingUpdate { Kind kind() const final { return Kind::kUpdatePartitionStatistics; } + /// \brief Partition statistics updates are intentionally not retried today. + /// + /// This matches the current Java `SetPartitionStatistics` behavior, which commits + /// directly without a retry loop. Keep this conservative until we add explicit replay + /// coverage for this update type. + bool IsRetryable() const override { return false; } + struct ApplyResult { std::vector>> to_set; std::vector to_remove; diff --git a/src/iceberg/update/update_properties.h b/src/iceberg/update/update_properties.h index 491a55678..dee923410 100644 --- a/src/iceberg/update/update_properties.h +++ b/src/iceberg/update/update_properties.h @@ -66,6 +66,7 @@ class ICEBERG_EXPORT UpdateProperties : public PendingUpdate { UpdateProperties& Remove(const std::string& key); Kind kind() const final { return Kind::kUpdateProperties; } + bool IsRetryable() const override { return true; } /// \brief Apply the pending changes and return the updates and removals. Result Apply(); diff --git a/src/iceberg/update/update_schema.h b/src/iceberg/update/update_schema.h index 564a03df1..2be3732a0 100644 --- a/src/iceberg/update/update_schema.h +++ b/src/iceberg/update/update_schema.h @@ -334,6 +334,13 @@ class ICEBERG_EXPORT UpdateSchema : public PendingUpdate { Kind kind() const final { return Kind::kUpdateSchema; } + /// \brief Schema updates are not retryable. + /// + /// The update records field IDs, move targets, and last-column-id-derived state from + /// the schema that was current when the builder was created. Replaying after a refresh + /// can apply a different schema evolution than the caller originally authored. + bool IsRetryable() const override { return false; } + struct ApplyResult { std::shared_ptr schema; int32_t new_last_column_id; diff --git a/src/iceberg/update/update_snapshot_reference.h b/src/iceberg/update/update_snapshot_reference.h index 7d061ea3b..9ff0a5083 100644 --- a/src/iceberg/update/update_snapshot_reference.h +++ b/src/iceberg/update/update_snapshot_reference.h @@ -134,6 +134,13 @@ class ICEBERG_EXPORT UpdateSnapshotReference : public PendingUpdate { Kind kind() const final { return Kind::kUpdateSnapshotReference; } + /// \brief Snapshot reference updates are not retryable. + /// + /// The update snapshots the ref map at construction time and validates rename or + /// fast-forward operations against that captured state. Replaying after a refresh can + /// clobber or reinterpret concurrent ref changes. + bool IsRetryable() const override { return false; } + struct ApplyResult { /// References to set or update (name, ref pairs) std::vector>> to_set; diff --git a/src/iceberg/update/update_sort_order.h b/src/iceberg/update/update_sort_order.h index 53fe927ef..4696ae72d 100644 --- a/src/iceberg/update/update_sort_order.h +++ b/src/iceberg/update/update_sort_order.h @@ -66,6 +66,7 @@ class ICEBERG_EXPORT UpdateSortOrder : public PendingUpdate { UpdateSortOrder& CaseSensitive(bool case_sensitive); Kind kind() const final { return Kind::kUpdateSortOrder; } + bool IsRetryable() const override { return true; } /// \brief Apply the pending changes and return the new SortOrder. Result> Apply(); diff --git a/src/iceberg/update/update_statistics.h b/src/iceberg/update/update_statistics.h index 6441c02a3..4a6c12fc4 100644 --- a/src/iceberg/update/update_statistics.h +++ b/src/iceberg/update/update_statistics.h @@ -62,6 +62,12 @@ class ICEBERG_EXPORT UpdateStatistics : public PendingUpdate { Kind kind() const final { return Kind::kUpdateStatistics; } + /// \brief Statistics updates are retryable. + /// + /// The operation is keyed by snapshot ID and only replaces or removes statistics file + /// references, so replaying it after a refresh preserves the caller's intent. + bool IsRetryable() const override { return true; } + struct ApplyResult { std::vector>> to_set; std::vector to_remove; diff --git a/src/iceberg/util/meson.build b/src/iceberg/util/meson.build index 496a75758..a38dc886c 100644 --- a/src/iceberg/util/meson.build +++ b/src/iceberg/util/meson.build @@ -34,6 +34,7 @@ install_headers( 'macros.h', 'partition_value_util.h', 'property_util.h', + 'retry_util.h', 'string_util.h', 'temporal_util.h', 'timepoint.h', diff --git a/src/iceberg/util/retry_util.h b/src/iceberg/util/retry_util.h new file mode 100644 index 000000000..7041a40ef --- /dev/null +++ b/src/iceberg/util/retry_util.h @@ -0,0 +1,221 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +#pragma once + +#include +#include +#include +#include +#include +#include +#include + +#include "iceberg/iceberg_export.h" +#include "iceberg/result.h" + +namespace iceberg { + +/// \brief Configuration for retry behavior +struct ICEBERG_EXPORT RetryConfig { + /// Maximum number of retry attempts (not including the first attempt) + int32_t num_retries = 4; + /// Minimum wait time between retries in milliseconds + int32_t min_wait_ms = 100; + /// Maximum wait time between retries in milliseconds + int32_t max_wait_ms = 60 * 1000; // 1 minute + /// Total wall-clock time budget for retries, including backoff sleeps. + int32_t total_timeout_ms = 30 * 60 * 1000; // 30 minutes + /// Exponential backoff scale factor + double scale_factor = 2.0; +}; + +/// \brief Utility class for running tasks with retry logic +class ICEBERG_EXPORT RetryRunner { + public: + /// \brief Construct a RetryRunner with the given configuration + explicit RetryRunner(RetryConfig config = {}) : config_(std::move(config)) {} + + /// \brief Specify error types that should trigger a retry. + /// + /// When set, only errors matching one of these kinds will be retried. + /// All other errors will stop retries immediately. + /// + /// \note OnlyRetryOn takes priority over StopRetryOn. If OnlyRetryOn is set, + /// StopRetryOn is ignored. + RetryRunner& OnlyRetryOn(std::initializer_list error_kinds) { + only_retry_on_ = std::vector(error_kinds); + return *this; + } + + /// \brief Specify a single error type that should trigger a retry. + /// + /// \note OnlyRetryOn takes priority over StopRetryOn. If OnlyRetryOn is set, + /// StopRetryOn is ignored. + RetryRunner& OnlyRetryOn(ErrorKind error_kind) { + only_retry_on_ = std::vector{error_kind}; + return *this; + } + + /// \brief Specify error types that should stop retries immediately. + /// + /// When set, errors matching one of these kinds will not be retried. + /// All other errors will be retried. + /// + /// \note OnlyRetryOn takes priority over StopRetryOn. If OnlyRetryOn is set, + /// StopRetryOn is ignored. + RetryRunner& StopRetryOn(std::initializer_list error_kinds) { + stop_retry_on_ = std::vector(error_kinds); + return *this; + } + + /// \brief Run a task that returns a Result + /// + /// TODO: Replace attempt_counter with a metrics reporter once it is available. + template ::value_type> + Result Run(F&& task, int32_t* attempt_counter = nullptr) { + if (config_.num_retries < 0) { + return InvalidArgument("num_retries must be non-negative, got {}", + config_.num_retries); + } + + const auto deadline = ComputeDeadline(); + int32_t attempt = 0; + const int32_t max_attempts = config_.num_retries + 1; + + while (true) { + ++attempt; + if (attempt_counter != nullptr) { + *attempt_counter = attempt; + } + + auto result = task(); + if (result.has_value()) { + return result; + } + + if (!CanRetry(result.error().kind, attempt, max_attempts, deadline)) { + return result; + } + + if (!WaitForNextAttempt(attempt, deadline)) { + return result; + } + } + } + + private: + using Clock = std::chrono::steady_clock; + using Duration = std::chrono::milliseconds; + using TimePoint = Clock::time_point; + + std::optional ComputeDeadline() const { + if (config_.total_timeout_ms <= 0) { + return std::nullopt; + } + return Clock::now() + Duration(config_.total_timeout_ms); + } + + bool HasTimedOut(const std::optional& deadline) const { + return deadline.has_value() && Clock::now() >= *deadline; + } + + /// \brief Check if the given error kind should trigger a retry. + bool ShouldRetry(ErrorKind kind) const { + if (!only_retry_on_.empty()) { + return std::ranges::any_of(only_retry_on_, + [kind](ErrorKind k) { return kind == k; }); + } + + if (!stop_retry_on_.empty()) { + return !std::ranges::any_of(stop_retry_on_, + [kind](ErrorKind k) { return kind == k; }); + } + + return true; + } + + bool CanRetry(ErrorKind kind, int32_t attempt, int32_t max_attempts, + const std::optional& deadline) const { + return attempt < max_attempts && !HasTimedOut(deadline) && ShouldRetry(kind); + } + + std::optional RetryDelayWithinBudget( + int32_t attempt, const std::optional& deadline) const { + const auto delay = Duration(CalculateDelay(attempt)); + if (!deadline.has_value()) { + return delay; + } + + const auto now = Clock::now(); + if (now >= *deadline) { + return std::nullopt; + } + + const auto remaining = std::chrono::duration_cast(*deadline - now); + if (remaining <= Duration::zero() || delay >= remaining) { + return std::nullopt; + } + + return delay; + } + + bool WaitForNextAttempt(int32_t attempt, + const std::optional& deadline) const { + const auto delay = RetryDelayWithinBudget(attempt, deadline); + if (!delay.has_value()) { + return false; + } + + std::this_thread::sleep_for(*delay); + return !HasTimedOut(deadline); + } + + /// \brief Calculate delay with exponential backoff and jitter + int32_t CalculateDelay(int32_t attempt) const { + // Calculate base delay with exponential backoff + double base_delay = config_.min_wait_ms * std::pow(config_.scale_factor, attempt - 1); + int32_t delay_ms = static_cast( + std::min(base_delay, static_cast(config_.max_wait_ms))); + + static thread_local std::mt19937 gen(std::random_device{}()); + int32_t jitter_range = std::max(1, delay_ms / 10); + std::uniform_int_distribution<> dis(0, jitter_range - 1); + delay_ms += dis(gen); + return std::max(1, delay_ms); + } + + RetryConfig config_; + std::vector only_retry_on_; + std::vector stop_retry_on_; +}; + +/// \brief Helper function to create a RetryRunner with table commit configuration +ICEBERG_EXPORT inline RetryRunner MakeCommitRetryRunner(int32_t num_retries, + int32_t min_wait_ms, + int32_t max_wait_ms, + int32_t total_timeout_ms) { + return RetryRunner(RetryConfig{.num_retries = num_retries, + .min_wait_ms = min_wait_ms, + .max_wait_ms = max_wait_ms, + .total_timeout_ms = total_timeout_ms}) + .OnlyRetryOn(ErrorKind::kCommitFailed); +} + +} // namespace iceberg