From d3be441772faad3f3d3ea2c1a638a44fa4d0e213 Mon Sep 17 00:00:00 2001 From: Weihao Li <18110526956@163.com> Date: Tue, 21 Apr 2026 19:05:21 +0800 Subject: [PATCH 1/4] fix Signed-off-by: Weihao Li <18110526956@163.com> --- .../TCompressedElasticFramedTransport.java | 3 ++- .../iotdb/rpc/TElasticFramedTransport.java | 23 +++++++++++++++---- .../org/apache/iotdb/rpc/TSStatusCode.java | 1 + .../FragmentInstanceDispatcherImpl.java | 11 +++++++++ 4 files changed, 33 insertions(+), 5 deletions(-) diff --git a/iotdb-client/service-rpc/src/main/java/org/apache/iotdb/rpc/TCompressedElasticFramedTransport.java b/iotdb-client/service-rpc/src/main/java/org/apache/iotdb/rpc/TCompressedElasticFramedTransport.java index 5b9c81ec58b91..62abc28e47023 100644 --- a/iotdb-client/service-rpc/src/main/java/org/apache/iotdb/rpc/TCompressedElasticFramedTransport.java +++ b/iotdb-client/service-rpc/src/main/java/org/apache/iotdb/rpc/TCompressedElasticFramedTransport.java @@ -44,7 +44,7 @@ protected TCompressedElasticFramedTransport( protected void readFrame() throws TTransportException { underlying.readAll(i32buf, 0, 4); int size = TFramedTransport.decodeFrameSize(i32buf); - checkFrameSize(size); + validateFrame(size); readBuffer.fill(underlying, size); RpcStat.readCompressedBytes.addAndGet(size); try { @@ -69,6 +69,7 @@ public void flush() throws TTransportException { writeCompressBuffer.resizeIfNecessary(maxCompressedLength); int compressedLength = compress(writeBuffer.getBuffer(), 0, length, writeCompressBuffer.getBuffer(), 0); + checkWriteFrameSize(compressedLength); RpcStat.writeCompressedBytes.addAndGet(compressedLength); TFramedTransport.encodeFrameSize(compressedLength, i32buf); underlying.write(i32buf, 0, 4); diff --git a/iotdb-client/service-rpc/src/main/java/org/apache/iotdb/rpc/TElasticFramedTransport.java b/iotdb-client/service-rpc/src/main/java/org/apache/iotdb/rpc/TElasticFramedTransport.java index 31e0f0b696067..f1d46dbb4344e 100644 --- a/iotdb-client/service-rpc/src/main/java/org/apache/iotdb/rpc/TElasticFramedTransport.java +++ b/iotdb-client/service-rpc/src/main/java/org/apache/iotdb/rpc/TElasticFramedTransport.java @@ -174,11 +174,11 @@ public int read(byte[] buf, int off, int len) throws TTransportException { protected void readFrame() throws TTransportException { underlying.readAll(i32buf, 0, 4); int size = TFramedTransport.decodeFrameSize(i32buf); - checkFrameSize(size); + validateFrame(size); readBuffer.fill(underlying, size); } - protected void checkFrameSize(int size) throws TTransportException { + protected void validateFrame(int size) throws TTransportException { final int HTTP_GET_SIGNATURE = 0x47455420; // "GET " final int HTTP_POST_SIGNATURE = 0x504F5354; // "POST" final int TLS_MIN_VERSION = 0x160300; @@ -196,8 +196,6 @@ protected void checkFrameSize(int size) throws TTransportException { error = FrameError.TLS_REQUEST; } else if (size < 0) { error = FrameError.NEGATIVE_FRAME_SIZE; - } else if (size > thriftMaxFrameSize) { - error = FrameError.FRAME_SIZE_EXCEEDED; } } @@ -241,9 +239,26 @@ void throwException(int size, String remoteInfo, int maxSize) throws TTransportE } } + protected void checkWriteFrameSize(int size) throws TTransportException { + if (size <= thriftMaxFrameSize) { + return; + } + SocketAddress remoteAddress = null; + if (underlying instanceof TSocket) { + remoteAddress = ((TSocket) underlying).getSocket().getRemoteSocketAddress(); + } + String remoteInfo = (remoteAddress == null) ? "" : " to " + remoteAddress; + String message = + String.format( + FrameError.FRAME_SIZE_EXCEEDED.messageFormat, size, thriftMaxFrameSize, remoteInfo); + close(); + throw new TTransportException(TTransportException.CORRUPTED_DATA, message); + } + @Override public void flush() throws TTransportException { int length = writeBuffer.getPos(); + checkWriteFrameSize(length); TFramedTransport.encodeFrameSize(length, i32buf); underlying.write(i32buf, 0, 4); underlying.write(writeBuffer.getBuffer(), 0, length); diff --git a/iotdb-client/service-rpc/src/main/java/org/apache/iotdb/rpc/TSStatusCode.java b/iotdb-client/service-rpc/src/main/java/org/apache/iotdb/rpc/TSStatusCode.java index f9e750f4012ba..9fb0bb2f46493 100644 --- a/iotdb-client/service-rpc/src/main/java/org/apache/iotdb/rpc/TSStatusCode.java +++ b/iotdb-client/service-rpc/src/main/java/org/apache/iotdb/rpc/TSStatusCode.java @@ -51,6 +51,7 @@ public enum TSStatusCode { INTERNAL_SERVER_ERROR(305), DISPATCH_ERROR(306), LICENSE_ERROR(307), + THRIFT_FRAME_OVERSIZE(308), // Client, REDIRECTION_RECOMMEND(400), diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/scheduler/FragmentInstanceDispatcherImpl.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/scheduler/FragmentInstanceDispatcherImpl.java index c52f8f94eb2a7..dba36cc299330 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/scheduler/FragmentInstanceDispatcherImpl.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/scheduler/FragmentInstanceDispatcherImpl.java @@ -59,6 +59,7 @@ import org.apache.iotdb.rpc.TSStatusCode; import org.apache.thrift.TException; +import org.apache.thrift.transport.TTransportException; import org.apache.tsfile.external.commons.lang3.exception.ExceptionUtils; import org.apache.tsfile.utils.Pair; import org.apache.tsfile.utils.Preconditions; @@ -549,6 +550,16 @@ private void dispatchRemoteHelper(final FragmentInstance instance, final TEndPoi TSStatusCode.EXECUTE_STATEMENT_ERROR, String.format("unknown read type [%s]", instance.getType()))); } + } catch (TException e) { + Throwable rootCause = ExceptionUtils.getRootCause(e); + if (rootCause instanceof TTransportException + && ((TTransportException) rootCause).getType() == TTransportException.CORRUPTED_DATA) { + queryContext.addFailedEndPoint(endPoint); + throw new FragmentInstanceDispatchException( + new TSStatus(TSStatusCode.THRIFT_FRAME_OVERSIZE.getStatusCode()) + .setMessage(rootCause.getMessage())); + } + throw e; } } From e8ff4f71dde52db561c57292d7d10409a1bc2595 Mon Sep 17 00:00:00 2001 From: Weihao Li <18110526956@163.com> Date: Tue, 21 Apr 2026 20:02:31 +0800 Subject: [PATCH 2/4] fix Signed-off-by: Weihao Li <18110526956@163.com> --- .../iotdb/db/protocol/thrift/OperationType.java | 3 ++- .../FragmentInstanceDispatcherImpl.java | 8 ++++---- .../iotdb/db/utils/ErrorHandlingUtils.java | 16 ++++++++++++++-- 3 files changed, 20 insertions(+), 7 deletions(-) diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/protocol/thrift/OperationType.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/protocol/thrift/OperationType.java index 881e823ef2d67..94b5dcadecbcd 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/protocol/thrift/OperationType.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/protocol/thrift/OperationType.java @@ -58,7 +58,8 @@ public enum OperationType { DEALLOCATE_PREPARED_STATEMENT("deallocatePreparedStatement"), GET_EARLIEST_TIMESLOTS("getEarliestTimeslots"), GENERATE_DATA_PARTITION_TABLE("generateDataPartitionTable"), - CHECK_DATA_PARTITION_TABLE_STATUS("checkDataPartitionTableStatus"); + CHECK_DATA_PARTITION_TABLE_STATUS("checkDataPartitionTableStatus"), + DISPATCH_FRAGMENT_INSTANCE("dispatchFragmentInstance"); private final String name; OperationType(String name) { diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/scheduler/FragmentInstanceDispatcherImpl.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/scheduler/FragmentInstanceDispatcherImpl.java index dba36cc299330..ac51413b794de 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/scheduler/FragmentInstanceDispatcherImpl.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/scheduler/FragmentInstanceDispatcherImpl.java @@ -79,6 +79,7 @@ import static com.google.common.util.concurrent.Futures.immediateFuture; import static org.apache.iotdb.db.queryengine.metric.QueryExecutionMetricSet.DISPATCH_READ; +import static org.apache.iotdb.db.utils.ErrorHandlingUtils.onThriftFrameOversizeException; public class FragmentInstanceDispatcherImpl implements IFragInstanceDispatcher { @@ -554,10 +555,9 @@ private void dispatchRemoteHelper(final FragmentInstance instance, final TEndPoi Throwable rootCause = ExceptionUtils.getRootCause(e); if (rootCause instanceof TTransportException && ((TTransportException) rootCause).getType() == TTransportException.CORRUPTED_DATA) { - queryContext.addFailedEndPoint(endPoint); - throw new FragmentInstanceDispatchException( - new TSStatus(TSStatusCode.THRIFT_FRAME_OVERSIZE.getStatusCode()) - .setMessage(rootCause.getMessage())); + // Don't set DISPATCH_ERROR status to avoid retry if dispatch failed because of thrift frame + // is oversize + throw new FragmentInstanceDispatchException(onThriftFrameOversizeException(rootCause)); } throw e; } diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/utils/ErrorHandlingUtils.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/utils/ErrorHandlingUtils.java index 34d2e9bf82cbf..094de83d3f272 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/utils/ErrorHandlingUtils.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/utils/ErrorHandlingUtils.java @@ -47,6 +47,7 @@ import java.util.concurrent.ExecutionException; import static org.apache.iotdb.commons.utils.StatusUtils.needRetry; +import static org.apache.iotdb.db.protocol.thrift.OperationType.DISPATCH_FRAGMENT_INSTANCE; public class ErrorHandlingUtils { @@ -61,10 +62,11 @@ private ErrorHandlingUtils() {} "The read statement is not allowed in batch: "; private static final String ERROR_OPERATION_LOG = "Status code: {}, operation: {} failed"; + private static final String EXCEPTION_PATTERN = "[%s] Exception occurred: %s failed. "; public static TSStatus onNpeOrUnexpectedException( Exception e, String operation, TSStatusCode statusCode) { - String message = String.format("[%s] Exception occurred: %s failed. ", statusCode, operation); + String message = String.format(EXCEPTION_PATTERN, statusCode, operation); if (e instanceof IOException || e instanceof NullPointerException) { LOGGER.error(ERROR_OPERATION_LOG, statusCode, operation, e); } else { @@ -88,6 +90,16 @@ public static TSStatus onNpeOrUnexpectedException( return onNpeOrUnexpectedException(e, operation.getName(), statusCode); } + public static TSStatus onThriftFrameOversizeException(Throwable t) { + TSStatus status = + new TSStatus(TSStatusCode.THRIFT_FRAME_OVERSIZE.getStatusCode()).setNeedRetry(false); + String message = + String.format(EXCEPTION_PATTERN, status, DISPATCH_FRAGMENT_INSTANCE) + + ErrorHandlingCommonUtils.getRootCause(t).getMessage(); + LOGGER.warn(message); + return status.setMessage(message); + } + public static TSStatus onQueryException(Exception e, String operation, TSStatusCode statusCode) { TSStatus status = tryCatchQueryException(e); if (status != null) { @@ -139,7 +151,7 @@ public static TSStatus onQueryException(Exception e, OperationType operation) { return onQueryException(e, operation.getName()); } - private static TSStatus tryCatchQueryException(Exception e) { + private static TSStatus tryCatchQueryException(Throwable e) { Throwable rootCause = ErrorHandlingCommonUtils.getRootCause(e); // ignore logging sg not ready exception if (rootCause instanceof StorageGroupNotReadyException) { From f2ef9da7fc1ac47d8ad42a2bc1afe108e0a3e4af Mon Sep 17 00:00:00 2001 From: Weihao Li <18110526956@163.com> Date: Fri, 24 Apr 2026 11:35:00 +0800 Subject: [PATCH 3/4] restore Signed-off-by: Weihao Li <18110526956@163.com> --- .../main/java/org/apache/iotdb/rpc/TElasticFramedTransport.java | 2 ++ 1 file changed, 2 insertions(+) diff --git a/iotdb-client/service-rpc/src/main/java/org/apache/iotdb/rpc/TElasticFramedTransport.java b/iotdb-client/service-rpc/src/main/java/org/apache/iotdb/rpc/TElasticFramedTransport.java index f1d46dbb4344e..955fdd01a9ea1 100644 --- a/iotdb-client/service-rpc/src/main/java/org/apache/iotdb/rpc/TElasticFramedTransport.java +++ b/iotdb-client/service-rpc/src/main/java/org/apache/iotdb/rpc/TElasticFramedTransport.java @@ -196,6 +196,8 @@ protected void validateFrame(int size) throws TTransportException { error = FrameError.TLS_REQUEST; } else if (size < 0) { error = FrameError.NEGATIVE_FRAME_SIZE; + } else if (size > thriftMaxFrameSize) { + error = FrameError.FRAME_SIZE_EXCEEDED; } } From ddb5e5bc74e9f933ad4bff0f0c6dc0fc044e30b7 Mon Sep 17 00:00:00 2001 From: Weihao Li <18110526956@163.com> Date: Fri, 24 Apr 2026 11:36:51 +0800 Subject: [PATCH 4/4] restore some Signed-off-by: Weihao Li <18110526956@163.com> --- .../main/java/org/apache/iotdb/db/utils/ErrorHandlingUtils.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/utils/ErrorHandlingUtils.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/utils/ErrorHandlingUtils.java index 094de83d3f272..95e7417821cb0 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/utils/ErrorHandlingUtils.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/utils/ErrorHandlingUtils.java @@ -151,7 +151,7 @@ public static TSStatus onQueryException(Exception e, OperationType operation) { return onQueryException(e, operation.getName()); } - private static TSStatus tryCatchQueryException(Throwable e) { + private static TSStatus tryCatchQueryException(Exception e) { Throwable rootCause = ErrorHandlingCommonUtils.getRootCause(e); // ignore logging sg not ready exception if (rootCause instanceof StorageGroupNotReadyException) {