Subject: [PATCH] Fix attempt to create from table function
clusterAllReplicas('cluster_all', system.query_log) + WHERE type='QueryFinish' AND initial_query_id='{query_id}' + FORMAT TSV + """ + ).splitlines() + + assert queries == ["5"] diff --git a/tests/integration/test_storage_iceberg/configs/config.d/cluster.xml b/tests/integration/test_storage_iceberg/configs/config.d/cluster.xml index 54c08b27abe8..1f77ebbb7c0b 100644 --- a/tests/integration/test_storage_iceberg/configs/config.d/cluster.xml +++ b/tests/integration/test_storage_iceberg/configs/config.d/cluster.xml @@ -16,5 +16,17 @@ + + + + node2 + 9000 + + + node3 + 9000 + + + diff --git a/tests/integration/test_storage_iceberg/test.py b/tests/integration/test_storage_iceberg/test.py index b1fba2124adc..f61f4d621e72 100644 --- a/tests/integration/test_storage_iceberg/test.py +++ b/tests/integration/test_storage_iceberg/test.py @@ -91,6 +91,7 @@ def started_cluster(): "configs/users.d/users.xml", "configs/users.d/allow_local_data_lakes.xml", ], + with_zookeeper=True, with_minio=True, with_azurite=True, stay_alive=True, @@ -108,6 +109,7 @@ def started_cluster(): "configs/users.d/users.xml", "configs/users.d/allow_local_data_lakes.xml", ], + with_zookeeper=True, stay_alive=True, ) cluster.add_instance( @@ -123,6 +125,7 @@ def started_cluster(): "configs/users.d/users.xml", "configs/users.d/allow_local_data_lakes.xml", ], + with_zookeeper=True, stay_alive=True, ) @@ -4243,3 +4246,276 @@ def test_deeply_nested_struct_with_dotted_names(started_cluster, storage_type): f"SELECT `my.struct.some_dot.separated_parent.weird.field` FROM {TABLE_NAME} ORDER BY id" ).strip() assert result == expected, f"Expected:\n{expected}\nGot:\n{result}" + + +@pytest.mark.parametrize("format_version", ["1", "2"]) +@pytest.mark.parametrize("storage_type", ["s3", "azure"]) +def test_object_storage_remote_initiator(started_cluster, format_version, storage_type): + instance = started_cluster.instances["node1"] + spark = started_cluster.spark_session + TABLE_NAME = ( + "test_single_iceberg_file_" + + 
format_version + + "_" + + storage_type + + "_" + + get_uuid_str() + ) + + write_iceberg_from_df(spark, generate_data(spark, 0, 100), TABLE_NAME) + + default_upload_directory( + started_cluster, + storage_type, + f"/iceberg_data/default/{TABLE_NAME}/", + f"/iceberg_data/default/{TABLE_NAME}/", + ) + + create_iceberg_table(storage_type, instance, TABLE_NAME, started_cluster) + + assert instance.query(f"SELECT * FROM {TABLE_NAME}") == instance.query( + "SELECT number, toString(number + 1) FROM numbers(100)" + ) + + expected_result = instance.query( + f""" + SELECT * from {TABLE_NAME} + ORDER BY ALL + """ + ).strip() + + # Simple cluster + query_id = uuid.uuid4().hex + result = instance.query( + f""" + SELECT * from {TABLE_NAME} + ORDER BY ALL + SETTINGS object_storage_remote_initiator=1 + """, + query_id = query_id, + ).strip() + + assert result == expected_result + + instance.query("SYSTEM FLUSH LOGS ON CLUSTER 'cluster_simple'") + queries = instance.query( + f""" + SELECT count() + FROM clusterAllReplicas('cluster_simple', system.query_log) + WHERE type='QueryFinish' AND initial_query_id='{query_id}' + FORMAT TSV + """ + ).splitlines() + + # non cluster request - only on initial node + assert queries == ["1"] + + # Remote initiator. 
Use 'cluster_second' to avoid choosing node1 as initiator, it can reduce number of subqueries + query_id = uuid.uuid4().hex + + result = instance.query( + f""" + SELECT * from {TABLE_NAME} + ORDER BY ALL + SETTINGS + object_storage_remote_initiator=1, + object_storage_cluster='cluster_second' + """, + query_id = query_id, + ).strip() + + assert result == expected_result + + instance.query("SYSTEM FLUSH LOGS ON CLUSTER 'cluster_simple'") + queries = instance.query( + f""" + SELECT count() + FROM clusterAllReplicas('cluster_simple', system.query_log) + WHERE type='QueryFinish' AND initial_query_id='{query_id}' + FORMAT TSV + """ + ).splitlines() + + # initial node + describe table + remote initiator + 2 subqueries on replicas + assert queries == ["5"] + + + query_id = uuid.uuid4().hex + + result = instance.query( + f""" + SELECT * from {TABLE_NAME} + ORDER BY ALL + SETTINGS + object_storage_remote_initiator=1, + object_storage_cluster='cluster_second', + object_storage_remote_initiator_cluster='cluster_second' + """, + query_id = query_id, + ).strip() + + assert result == expected_result + + instance.query("SYSTEM FLUSH LOGS ON CLUSTER 'cluster_simple'") + queries = instance.query( + f""" + SELECT count() + FROM clusterAllReplicas('cluster_simple', system.query_log) + WHERE type='QueryFinish' AND initial_query_id='{query_id}' + FORMAT TSV + """ + ).splitlines() + + # initial node + describe table + remote initiator + 2 subqueries on replicas + assert queries == ["5"] + + if storage_type == 's3': + table_function = f""" + icebergS3( + s3, + filename = 'iceberg_data/default/{TABLE_NAME}/', + format = Parquet, + url = 'http://minio1:9001/root/') + """ + table_function_with_structure = f""" + icebergS3( + s3, + filename = 'iceberg_data/default/{TABLE_NAME}/', + format = Parquet, + url = 'http://minio1:9001/root/', + structure = 'a Int64, b Int64') + """ + else: # azure + azure_storage_account_url = started_cluster.env_variables["AZURITE_STORAGE_ACCOUNT_URL"] + 
azure_account_name = "devstoreaccount1" + azure_account_key = "Eby8vdM02xNOcqFlqUwJPLlmEtlCDXJ1OUzFT50uSRZ6IFsuFq2UVErCz4I6tq/K1SZFPTOtr/KBHBeksoGMGw==" + table_function = f""" + icebergAzure( + azure, + storage_account_url = '{azure_storage_account_url}', + container = mycontainer, + blob_path = '/iceberg_data/default/{TABLE_NAME}/', + account_name = '{azure_account_name}', + account_key = '{azure_account_key}', + format = Parquet) + """ + table_function_with_structure = f""" + icebergAzure( + azure, + storage_account_url = '{azure_storage_account_url}', + container = mycontainer, + blob_path = '/iceberg_data/default/{TABLE_NAME}/', + account_name = '{azure_account_name}', + account_key = '{azure_account_key}', + format = Parquet, + structure = 'a Int64, b Int64') + """ + + + query_id = uuid.uuid4().hex + result = instance.query(f"SELECT * FROM {table_function} ORDER BY ALL", query_id=query_id).strip() + + assert result == expected_result + + instance.query("SYSTEM FLUSH LOGS ON CLUSTER 'cluster_simple'") + queries = instance.query( + f""" + SELECT count() + FROM clusterAllReplicas('cluster_simple', system.query_log) + WHERE type='QueryFinish' AND initial_query_id='{query_id}' + FORMAT TSV + """ + ).splitlines() + + # initial node + assert queries == ["1"] + + query_id = uuid.uuid4().hex + result = instance.query(f"SELECT * FROM {table_function} ORDER BY ALL SETTINGS object_storage_remote_initiator=1", query_id=query_id).strip() + + assert result == expected_result + + instance.query("SYSTEM FLUSH LOGS ON CLUSTER 'cluster_simple'") + queries = instance.query( + f""" + SELECT count() + FROM clusterAllReplicas('cluster_simple', system.query_log) + WHERE type='QueryFinish' AND initial_query_id='{query_id}' + FORMAT TSV + """ + ).splitlines() + + # initial node + assert queries == ["1"] + + query_id = uuid.uuid4().hex + result = instance.query(f"SELECT * FROM {table_function} ORDER BY ALL SETTINGS object_storage_remote_initiator=1, 
object_storage_cluster='cluster_second'", query_id=query_id).strip() + + assert result == expected_result + + instance.query("SYSTEM FLUSH LOGS ON CLUSTER 'cluster_simple'") + queries = instance.query( + f""" + SELECT count() + FROM clusterAllReplicas('cluster_simple', system.query_log) + WHERE type='QueryFinish' AND initial_query_id='{query_id}' + FORMAT TSV + """ + ).splitlines() + + # initial node + describe table + remote initiator + 2 subqueries on replicas + assert queries == ["5"] + + query_id = uuid.uuid4().hex + result = instance.query(f"SELECT * FROM {table_function_with_structure} ORDER BY ALL", query_id=query_id).strip() + + assert result == expected_result + + instance.query("SYSTEM FLUSH LOGS ON CLUSTER 'cluster_simple'") + queries = instance.query( + f""" + SELECT count() + FROM clusterAllReplicas('cluster_simple', system.query_log) + WHERE type='QueryFinish' AND initial_query_id='{query_id}' + FORMAT TSV + """ + ).splitlines() + + # initial node + assert queries == ["1"] + + query_id = uuid.uuid4().hex + result = instance.query(f"SELECT * FROM {table_function_with_structure} ORDER BY ALL SETTINGS object_storage_remote_initiator=1", query_id=query_id).strip() + + assert result == expected_result + + instance.query("SYSTEM FLUSH LOGS ON CLUSTER 'cluster_simple'") + queries = instance.query( + f""" + SELECT count() + FROM clusterAllReplicas('cluster_simple', system.query_log) + WHERE type='QueryFinish' AND initial_query_id='{query_id}' + FORMAT TSV + """ + ).splitlines() + + # initial node + assert queries == ["1"] + + query_id = uuid.uuid4().hex + result = instance.query(f"SELECT * FROM {table_function_with_structure} ORDER BY ALL SETTINGS object_storage_remote_initiator=1, object_storage_cluster='cluster_second'", query_id=query_id).strip() + + assert result == expected_result + + instance.query("SYSTEM FLUSH LOGS ON CLUSTER 'cluster_simple'") + queries = instance.query( + f""" + SELECT count() + FROM clusterAllReplicas('cluster_simple', 
system.query_log) + WHERE type='QueryFinish' AND initial_query_id='{query_id}' + FORMAT TSV + """ + ).splitlines() + + # initial node + describe table + remote initiator + 2 subqueries on replicas + assert queries == ["5"]