-
Notifications
You must be signed in to change notification settings - Fork 103
feat(rest): add scan plan endpoints to REST catalog client #614
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
base: main
Are you sure you want to change the base?
Changes from all commits
4c3ce8d
6aa73bf
ab6c8d4
0368952
1f2bf61
66b1c88
8112bfe
efcaab3
17f12b3
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
| Original file line number | Diff line number | Diff line change | ||||
|---|---|---|---|---|---|---|
|
|
@@ -26,8 +26,10 @@ | |||||
|
|
||||||
| #include "iceberg/catalog/rest/json_serde_internal.h" | ||||||
| #include "iceberg/catalog/rest/types.h" | ||||||
| #include "iceberg/expression/json_serde_internal.h" | ||||||
| #include "iceberg/json_serde_internal.h" | ||||||
| #include "iceberg/partition_spec.h" | ||||||
| #include "iceberg/schema.h" | ||||||
| #include "iceberg/sort_order.h" | ||||||
| #include "iceberg/table_identifier.h" | ||||||
| #include "iceberg/table_requirement.h" | ||||||
|
|
@@ -78,6 +80,21 @@ constexpr std::string_view kExpiresIn = "expires_in"; | |||||
| constexpr std::string_view kIssuedTokenType = "issued_token_type"; | ||||||
| constexpr std::string_view kRefreshToken = "refresh_token"; | ||||||
| constexpr std::string_view kOAuthScope = "scope"; | ||||||
| constexpr std::string_view kPlanStatus = "status"; | ||||||
| constexpr std::string_view kPlanId = "plan-id"; | ||||||
| constexpr std::string_view kPlanTasks = "plan-tasks"; | ||||||
| constexpr std::string_view kFileScanTasks = "file-scan-tasks"; | ||||||
| constexpr std::string_view kDeleteFiles = "delete-files"; | ||||||
| constexpr std::string_view kSnapshotId = "snapshot-id"; | ||||||
| constexpr std::string_view kSelect = "select"; | ||||||
| constexpr std::string_view kFilter = "filter"; | ||||||
| constexpr std::string_view kCaseSensitive = "case-sensitive"; | ||||||
| constexpr std::string_view kUseSnapshotSchema = "use-snapshot-schema"; | ||||||
| constexpr std::string_view kStartSnapshotId = "start-snapshot-id"; | ||||||
| constexpr std::string_view kEndSnapshotId = "end-snapshot-id"; | ||||||
| constexpr std::string_view kStatsFields = "stats-fields"; | ||||||
| constexpr std::string_view kMinRowsRequired = "min-rows-required"; | ||||||
|
Collaborator
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more.
Suggested change
|
||||||
| constexpr std::string_view kPlanTask = "plan-task"; | ||||||
|
|
||||||
| } // namespace | ||||||
|
|
||||||
|
|
@@ -506,6 +523,114 @@ Result<OAuthTokenResponse> OAuthTokenResponseFromJson(const nlohmann::json& json | |||||
| return response; | ||||||
| } | ||||||
|
|
||||||
| Result<nlohmann::json> ToJson(const PlanTableScanRequest& request) { | ||||||
| nlohmann::json json; | ||||||
| if (request.snapshot_id.has_value()) { | ||||||
| json[kSnapshotId] = request.snapshot_id.value(); | ||||||
| } | ||||||
| if (!request.select.empty()) { | ||||||
| json[kSelect] = request.select; | ||||||
| } | ||||||
| if (request.filter) { | ||||||
| ICEBERG_ASSIGN_OR_RAISE(auto filter_json, iceberg::ToJson(*request.filter)); | ||||||
| json[kFilter] = std::move(filter_json); | ||||||
| } | ||||||
| json[kCaseSensitive] = request.case_sensitive; | ||||||
| json[kUseSnapshotSchema] = request.use_snapshot_schema; | ||||||
| if (request.start_snapshot_id.has_value()) { | ||||||
| json[kStartSnapshotId] = request.start_snapshot_id.value(); | ||||||
| } | ||||||
| if (request.end_snapshot_id.has_value()) { | ||||||
| json[kEndSnapshotId] = request.end_snapshot_id.value(); | ||||||
| } | ||||||
| if (!request.statsFields.empty()) { | ||||||
| json[kStatsFields] = request.statsFields; | ||||||
| } | ||||||
| if (request.min_rows_required.has_value()) { | ||||||
| json[kMinRowsRequired] = request.min_rows_required.value(); | ||||||
| } | ||||||
| return json; | ||||||
| } | ||||||
|
|
||||||
| nlohmann::json ToJson(const FetchScanTasksRequest& request) { | ||||||
| nlohmann::json json; | ||||||
| json[kPlanTask] = request.planTask; | ||||||
| return json; | ||||||
| } | ||||||
|
|
||||||
| Status BaseScanTaskResponseFromJson( | ||||||
|
Collaborator
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. This looks like a helper function that isn's exposed in the header; perhaps it should be placed in an anonymous namespace? |
||||||
| const nlohmann::json& json, BaseScanTaskResponse* response, | ||||||
| const std::unordered_map<int32_t, std::shared_ptr<PartitionSpec>>& | ||||||
| partition_specs_by_id, | ||||||
| const Schema& schema) { | ||||||
| // 1. plan_tasks | ||||||
| ICEBERG_ASSIGN_OR_RAISE( | ||||||
| response->plan_tasks, | ||||||
| GetJsonValueOrDefault<std::vector<std::string>>(json, kPlanTasks)); | ||||||
|
|
||||||
| // 2. delete_files | ||||||
| ICEBERG_ASSIGN_OR_RAISE( | ||||||
| auto delete_files_json, | ||||||
| GetJsonValueOrDefault<nlohmann::json>(json, kDeleteFiles, nlohmann::json::array())); | ||||||
| for (const auto& entry_json : delete_files_json) { | ||||||
| ICEBERG_ASSIGN_OR_RAISE(auto delete_file, | ||||||
| DataFileFromJson(entry_json, partition_specs_by_id, schema)); | ||||||
| response->delete_files.push_back(std::move(delete_file)); | ||||||
| } | ||||||
|
|
||||||
| // 3. file_scan_tasks | ||||||
| ICEBERG_ASSIGN_OR_RAISE(auto file_scan_tasks_json, | ||||||
| GetJsonValueOrDefault<nlohmann::json>(json, kFileScanTasks, | ||||||
| nlohmann::json::array())); | ||||||
| ICEBERG_ASSIGN_OR_RAISE( | ||||||
| response->file_scan_tasks, | ||||||
| FileScanTasksFromJson(file_scan_tasks_json, response->delete_files, | ||||||
| partition_specs_by_id, schema)); | ||||||
| return {}; | ||||||
| } | ||||||
|
|
||||||
| Result<PlanTableScanResponse> PlanTableScanResponseFromJson( | ||||||
| const nlohmann::json& json, | ||||||
| const std::unordered_map<int32_t, std::shared_ptr<PartitionSpec>>& | ||||||
| partition_specs_by_id, | ||||||
| const Schema& schema) { | ||||||
| PlanTableScanResponse response; | ||||||
| ICEBERG_ASSIGN_OR_RAISE(response.plan_status, | ||||||
| GetJsonValue<std::string>(json, kPlanStatus)); | ||||||
| ICEBERG_ASSIGN_OR_RAISE(response.plan_id, | ||||||
| GetJsonValueOrDefault<std::string>(json, kPlanId)); | ||||||
| ICEBERG_RETURN_UNEXPECTED( | ||||||
| BaseScanTaskResponseFromJson(json, &response, partition_specs_by_id, schema)); | ||||||
| ICEBERG_RETURN_UNEXPECTED(response.Validate()); | ||||||
| return response; | ||||||
| } | ||||||
|
|
||||||
| Result<FetchPlanningResultResponse> FetchPlanningResultResponseFromJson( | ||||||
| const nlohmann::json& json, | ||||||
| const std::unordered_map<int32_t, std::shared_ptr<PartitionSpec>>& | ||||||
| partition_specs_by_id, | ||||||
| const Schema& schema) { | ||||||
| FetchPlanningResultResponse response; | ||||||
| ICEBERG_ASSIGN_OR_RAISE(response.plan_status, | ||||||
| GetJsonValue<std::string>(json, kPlanStatus)); | ||||||
| ICEBERG_RETURN_UNEXPECTED( | ||||||
| BaseScanTaskResponseFromJson(json, &response, partition_specs_by_id, schema)); | ||||||
| ICEBERG_RETURN_UNEXPECTED(response.Validate()); | ||||||
| return response; | ||||||
| } | ||||||
|
|
||||||
| Result<FetchScanTasksResponse> FetchScanTasksResponseFromJson( | ||||||
| const nlohmann::json& json, | ||||||
| const std::unordered_map<int32_t, std::shared_ptr<PartitionSpec>>& | ||||||
| partition_specs_by_id, | ||||||
| const Schema& schema) { | ||||||
| FetchScanTasksResponse response; | ||||||
| ICEBERG_RETURN_UNEXPECTED( | ||||||
| BaseScanTaskResponseFromJson(json, &response, partition_specs_by_id, schema)); | ||||||
| ICEBERG_RETURN_UNEXPECTED(response.Validate()); | ||||||
| return response; | ||||||
| } | ||||||
|
|
||||||
| #define ICEBERG_DEFINE_FROM_JSON(Model) \ | ||||||
| template <> \ | ||||||
| Result<Model> FromJson<Model>(const nlohmann::json& json) { \ | ||||||
|
|
||||||
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -102,4 +102,26 @@ Result<std::string> ResourcePaths::CommitTransaction() const { | |
| return std::format("{}/v1/{}transactions/commit", base_uri_, prefix_); | ||
| } | ||
|
|
||
| Result<std::string> ResourcePaths::ScanPlan(const TableIdentifier& ident) const { | ||
| ICEBERG_ASSIGN_OR_RAISE(std::string encoded_namespace, EncodeNamespace(ident.ns)); | ||
| ICEBERG_ASSIGN_OR_RAISE(std::string encoded_table_name, EncodeString(ident.name)); | ||
| return std::format("{}/v1/{}namespaces/{}/tables/{}/plan", base_uri_, prefix_, | ||
| encoded_namespace, encoded_table_name); | ||
| } | ||
|
|
||
| Result<std::string> ResourcePaths::ScanPlan(const TableIdentifier& ident, | ||
| const std::string& plan_id) const { | ||
| ICEBERG_ASSIGN_OR_RAISE(std::string encoded_namespace, EncodeNamespace(ident.ns)); | ||
| ICEBERG_ASSIGN_OR_RAISE(std::string encoded_table_name, EncodeString(ident.name)); | ||
| return std::format("{}/v1/{}namespaces/{}/tables/{}/plan/{}", base_uri_, prefix_, | ||
| encoded_namespace, encoded_table_name, plan_id); | ||
|
Collaborator
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Should we also encode plan_id? |
||
| } | ||
|
|
||
| Result<std::string> ResourcePaths::ScanTask(const TableIdentifier& ident) const { | ||
| ICEBERG_ASSIGN_OR_RAISE(std::string encoded_namespace, EncodeNamespace(ident.ns)); | ||
| ICEBERG_ASSIGN_OR_RAISE(std::string encoded_table_name, EncodeString(ident.name)); | ||
| return std::format("{}/v1/{}namespaces/{}/tables/{}/tasks", base_uri_, prefix_, | ||
| encoded_namespace, encoded_table_name); | ||
| } | ||
|
|
||
| } // namespace iceberg::rest | ||
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Should we add a PlanTask level error handler like java implementation? see https://github.com/apache/iceberg/blob/main/core/src/main/java/org/apache/iceberg/rest/ErrorHandlers.java#L197