Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 2 additions & 1 deletion src/Storages/ObjectStorage/StorageObjectStorageCluster.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -109,7 +109,8 @@ StorageObjectStorageCluster::StorageObjectStorageCluster(

auto log_ = getLogger("StorageObjectStorageCluster");

if (!columns_in_table_or_function_definition.empty()
if (!is_table_function
&& !columns_in_table_or_function_definition.empty()
&& !is_datalake_query
&& mode_ == LoadingStrictnessLevel::CREATE)
{
Expand Down
28 changes: 28 additions & 0 deletions tests/integration/test_s3_cluster/test.py
Original file line number Diff line number Diff line change
Expand Up @@ -1432,3 +1432,31 @@ def test_object_storage_remote_initiator(started_cluster):
assert users[1:] == ["s0_0_0\tdefault",
"s0_0_1\tfoo",
"s0_1_0\tfoo"]

# Random host from 'cluster_with_dots' for remote query, without type and structure
query_id = uuid.uuid4().hex

result = node.query(
f"""
SELECT * from s3(
'http://minio1:9001/root/data/clickhouse/*', 'minio', '{minio_secret_key}')
SETTINGS
object_storage_remote_initiator=1,
object_storage_cluster='cluster_with_dots'
""",
query_id = query_id,
)

assert result is not None

node.query("SYSTEM FLUSH LOGS ON CLUSTER 'cluster_all'")
queries = node.query(
f"""
SELECT count()
FROM clusterAllReplicas('cluster_all', system.query_log)
WHERE type='QueryFinish' AND initial_query_id='{query_id}'
FORMAT TSV
"""
).splitlines()

assert queries == ["5"]
Original file line number Diff line number Diff line change
Expand Up @@ -16,5 +16,17 @@
</replica>
</shard>
</cluster_simple>
<cluster_second>
<shard>
<replica>
<host>node2</host>
<port>9000</port>
</replica>
<replica>
<host>node3</host>
<port>9000</port>
</replica>
</shard>
</cluster_second>
</remote_servers>
</clickhouse>
276 changes: 276 additions & 0 deletions tests/integration/test_storage_iceberg/test.py
Original file line number Diff line number Diff line change
Expand Up @@ -91,6 +91,7 @@ def started_cluster():
"configs/users.d/users.xml",
"configs/users.d/allow_local_data_lakes.xml",
],
with_zookeeper=True,
with_minio=True,
with_azurite=True,
stay_alive=True,
Expand All @@ -108,6 +109,7 @@ def started_cluster():
"configs/users.d/users.xml",
"configs/users.d/allow_local_data_lakes.xml",
],
with_zookeeper=True,
stay_alive=True,
)
cluster.add_instance(
Expand All @@ -123,6 +125,7 @@ def started_cluster():
"configs/users.d/users.xml",
"configs/users.d/allow_local_data_lakes.xml",
],
with_zookeeper=True,
stay_alive=True,
)

Expand Down Expand Up @@ -4243,3 +4246,276 @@ def test_deeply_nested_struct_with_dotted_names(started_cluster, storage_type):
f"SELECT `my.struct.some_dot.separated_parent.weird.field` FROM {TABLE_NAME} ORDER BY id"
).strip()
assert result == expected, f"Expected:\n{expected}\nGot:\n{result}"


@pytest.mark.parametrize("format_version", ["1", "2"])
@pytest.mark.parametrize("storage_type", ["s3", "azure"])
def test_object_storage_remote_initiator(started_cluster, format_version, storage_type):
    """Check query fan-out for Iceberg reads with ``object_storage_remote_initiator``.

    For both a named Iceberg table and the ``icebergS3`` / ``icebergAzure``
    table functions, this runs the same SELECT under different combinations
    of the ``object_storage_remote_initiator`` / ``object_storage_cluster``
    settings and then counts the ``QueryFinish`` rows sharing the initial
    query id in ``system.query_log`` across all replicas, to verify how many
    server-side queries each mode actually produced.

    NOTE(review): the expected counts ("1" vs "5") encode the intended
    execution plan — initiator + DESCRIBE + remote initiator + one subquery
    per replica of 'cluster_second' — confirm against the server-side
    implementation if these assertions start flaking.
    """
    instance = started_cluster.instances["node1"]
    spark = started_cluster.spark_session
    # Unique per-parametrization table name so concurrent runs don't clash.
    TABLE_NAME = (
        "test_single_iceberg_file_"
        + format_version
        + "_"
        + storage_type
        + "_"
        + get_uuid_str()
    )

    # Generate 100 rows through Spark and upload the Iceberg data to the
    # storage backend under test.
    write_iceberg_from_df(spark, generate_data(spark, 0, 100), TABLE_NAME)

    default_upload_directory(
        started_cluster,
        storage_type,
        f"/iceberg_data/default/{TABLE_NAME}/",
        f"/iceberg_data/default/{TABLE_NAME}/",
    )

    create_iceberg_table(storage_type, instance, TABLE_NAME, started_cluster)

    # Sanity check: table contents match the generated data.
    assert instance.query(f"SELECT * FROM {TABLE_NAME}") == instance.query(
        "SELECT number, toString(number + 1) FROM numbers(100)"
    )

    # Reference result that every subsequent variant must reproduce.
    expected_result = instance.query(
        f"""
        SELECT * from {TABLE_NAME}
        ORDER BY ALL
        """
    ).strip()

    # Simple cluster
    query_id = uuid.uuid4().hex
    result = instance.query(
        f"""
        SELECT * from {TABLE_NAME}
        ORDER BY ALL
        SETTINGS object_storage_remote_initiator=1
        """,
        query_id = query_id,
    ).strip()

    assert result == expected_result

    # Flush query logs everywhere before counting entries for this query id.
    instance.query("SYSTEM FLUSH LOGS ON CLUSTER 'cluster_simple'")
    queries = instance.query(
        f"""
        SELECT count()
        FROM clusterAllReplicas('cluster_simple', system.query_log)
        WHERE type='QueryFinish' AND initial_query_id='{query_id}'
        FORMAT TSV
        """
    ).splitlines()

    # non cluster request - only on initial node
    assert queries == ["1"]

    # Remote initiator. Use 'cluster_second' to avoid choosing node1 as initiator, it can reduce number of subqueries
    query_id = uuid.uuid4().hex

    result = instance.query(
        f"""
        SELECT * from {TABLE_NAME}
        ORDER BY ALL
        SETTINGS
        object_storage_remote_initiator=1,
        object_storage_cluster='cluster_second'
        """,
        query_id = query_id,
    ).strip()

    assert result == expected_result

    instance.query("SYSTEM FLUSH LOGS ON CLUSTER 'cluster_simple'")
    queries = instance.query(
        f"""
        SELECT count()
        FROM clusterAllReplicas('cluster_simple', system.query_log)
        WHERE type='QueryFinish' AND initial_query_id='{query_id}'
        FORMAT TSV
        """
    ).splitlines()

    # initial node + describe table + remote initiator + 2 subqueries on replicas
    assert queries == ["5"]

    # Same as above, but with the initiator cluster also spelled out
    # explicitly via object_storage_remote_initiator_cluster.
    query_id = uuid.uuid4().hex

    result = instance.query(
        f"""
        SELECT * from {TABLE_NAME}
        ORDER BY ALL
        SETTINGS
        object_storage_remote_initiator=1,
        object_storage_cluster='cluster_second',
        object_storage_remote_initiator_cluster='cluster_second'
        """,
        query_id = query_id,
    ).strip()

    assert result == expected_result

    instance.query("SYSTEM FLUSH LOGS ON CLUSTER 'cluster_simple'")
    queries = instance.query(
        f"""
        SELECT count()
        FROM clusterAllReplicas('cluster_simple', system.query_log)
        WHERE type='QueryFinish' AND initial_query_id='{query_id}'
        FORMAT TSV
        """
    ).splitlines()

    # initial node + describe table + remote initiator + 2 subqueries on replicas
    assert queries == ["5"]

    # Build the storage-specific table-function expressions — one without and
    # one with an explicit 'structure' — to repeat the same checks through
    # table functions instead of a named table.
    if storage_type == 's3':
        table_function = f"""
            icebergS3(
                s3,
                filename = 'iceberg_data/default/{TABLE_NAME}/',
                format = Parquet,
                url = 'http://minio1:9001/root/')
        """
        table_function_with_structure = f"""
            icebergS3(
                s3,
                filename = 'iceberg_data/default/{TABLE_NAME}/',
                format = Parquet,
                url = 'http://minio1:9001/root/',
                structure = 'a Int64, b Int64')
        """
    else:  # azure
        azure_storage_account_url = started_cluster.env_variables["AZURITE_STORAGE_ACCOUNT_URL"]
        # Well-known Azurite development-storage credentials (not a secret).
        azure_account_name = "devstoreaccount1"
        azure_account_key = "Eby8vdM02xNOcqFlqUwJPLlmEtlCDXJ1OUzFT50uSRZ6IFsuFq2UVErCz4I6tq/K1SZFPTOtr/KBHBeksoGMGw=="
        table_function = f"""
            icebergAzure(
                azure,
                storage_account_url = '{azure_storage_account_url}',
                container = mycontainer,
                blob_path = '/iceberg_data/default/{TABLE_NAME}/',
                account_name = '{azure_account_name}',
                account_key = '{azure_account_key}',
                format = Parquet)
        """
        table_function_with_structure = f"""
            icebergAzure(
                azure,
                storage_account_url = '{azure_storage_account_url}',
                container = mycontainer,
                blob_path = '/iceberg_data/default/{TABLE_NAME}/',
                account_name = '{azure_account_name}',
                account_key = '{azure_account_key}',
                format = Parquet,
                structure = 'a Int64, b Int64')
        """

    # Table function, no special settings: runs only on the initial node.
    query_id = uuid.uuid4().hex
    result = instance.query(f"SELECT * FROM {table_function} ORDER BY ALL", query_id=query_id).strip()

    assert result == expected_result

    instance.query("SYSTEM FLUSH LOGS ON CLUSTER 'cluster_simple'")
    queries = instance.query(
        f"""
        SELECT count()
        FROM clusterAllReplicas('cluster_simple', system.query_log)
        WHERE type='QueryFinish' AND initial_query_id='{query_id}'
        FORMAT TSV
        """
    ).splitlines()

    # initial node
    assert queries == ["1"]

    # Remote initiator alone (no object_storage_cluster): still local-only.
    query_id = uuid.uuid4().hex
    result = instance.query(f"SELECT * FROM {table_function} ORDER BY ALL SETTINGS object_storage_remote_initiator=1", query_id=query_id).strip()

    assert result == expected_result

    instance.query("SYSTEM FLUSH LOGS ON CLUSTER 'cluster_simple'")
    queries = instance.query(
        f"""
        SELECT count()
        FROM clusterAllReplicas('cluster_simple', system.query_log)
        WHERE type='QueryFinish' AND initial_query_id='{query_id}'
        FORMAT TSV
        """
    ).splitlines()

    # initial node
    assert queries == ["1"]

    # Remote initiator + cluster: fans out across 'cluster_second'.
    query_id = uuid.uuid4().hex
    result = instance.query(f"SELECT * FROM {table_function} ORDER BY ALL SETTINGS object_storage_remote_initiator=1, object_storage_cluster='cluster_second'", query_id=query_id).strip()

    assert result == expected_result

    instance.query("SYSTEM FLUSH LOGS ON CLUSTER 'cluster_simple'")
    queries = instance.query(
        f"""
        SELECT count()
        FROM clusterAllReplicas('cluster_simple', system.query_log)
        WHERE type='QueryFinish' AND initial_query_id='{query_id}'
        FORMAT TSV
        """
    ).splitlines()

    # initial node + describe table + remote initiator + 2 subqueries on replicas
    assert queries == ["5"]

    # Repeat the three table-function cases with an explicit structure, which
    # lets the server skip schema inference; the query-count expectations are
    # the same as without structure.
    query_id = uuid.uuid4().hex
    result = instance.query(f"SELECT * FROM {table_function_with_structure} ORDER BY ALL", query_id=query_id).strip()

    assert result == expected_result

    instance.query("SYSTEM FLUSH LOGS ON CLUSTER 'cluster_simple'")
    queries = instance.query(
        f"""
        SELECT count()
        FROM clusterAllReplicas('cluster_simple', system.query_log)
        WHERE type='QueryFinish' AND initial_query_id='{query_id}'
        FORMAT TSV
        """
    ).splitlines()

    # initial node
    assert queries == ["1"]

    query_id = uuid.uuid4().hex
    result = instance.query(f"SELECT * FROM {table_function_with_structure} ORDER BY ALL SETTINGS object_storage_remote_initiator=1", query_id=query_id).strip()

    assert result == expected_result

    instance.query("SYSTEM FLUSH LOGS ON CLUSTER 'cluster_simple'")
    queries = instance.query(
        f"""
        SELECT count()
        FROM clusterAllReplicas('cluster_simple', system.query_log)
        WHERE type='QueryFinish' AND initial_query_id='{query_id}'
        FORMAT TSV
        """
    ).splitlines()

    # initial node
    assert queries == ["1"]

    query_id = uuid.uuid4().hex
    result = instance.query(f"SELECT * FROM {table_function_with_structure} ORDER BY ALL SETTINGS object_storage_remote_initiator=1, object_storage_cluster='cluster_second'", query_id=query_id).strip()

    assert result == expected_result

    instance.query("SYSTEM FLUSH LOGS ON CLUSTER 'cluster_simple'")
    queries = instance.query(
        f"""
        SELECT count()
        FROM clusterAllReplicas('cluster_simple', system.query_log)
        WHERE type='QueryFinish' AND initial_query_id='{query_id}'
        FORMAT TSV
        """
    ).splitlines()

    # initial node + describe table + remote initiator + 2 subqueries on replicas
    assert queries == ["5"]
Loading