diff --git a/iotdb-client/service-rpc/src/main/java/org/apache/iotdb/rpc/TCompressedElasticFramedTransport.java b/iotdb-client/service-rpc/src/main/java/org/apache/iotdb/rpc/TCompressedElasticFramedTransport.java index 5b9c81ec58b9..62abc28e4702 100644 --- a/iotdb-client/service-rpc/src/main/java/org/apache/iotdb/rpc/TCompressedElasticFramedTransport.java +++ b/iotdb-client/service-rpc/src/main/java/org/apache/iotdb/rpc/TCompressedElasticFramedTransport.java @@ -44,7 +44,7 @@ protected TCompressedElasticFramedTransport( protected void readFrame() throws TTransportException { underlying.readAll(i32buf, 0, 4); int size = TFramedTransport.decodeFrameSize(i32buf); - checkFrameSize(size); + validateFrame(size); readBuffer.fill(underlying, size); RpcStat.readCompressedBytes.addAndGet(size); try { @@ -69,6 +69,7 @@ public void flush() throws TTransportException { writeCompressBuffer.resizeIfNecessary(maxCompressedLength); int compressedLength = compress(writeBuffer.getBuffer(), 0, length, writeCompressBuffer.getBuffer(), 0); + checkWriteFrameSize(compressedLength); RpcStat.writeCompressedBytes.addAndGet(compressedLength); TFramedTransport.encodeFrameSize(compressedLength, i32buf); underlying.write(i32buf, 0, 4); diff --git a/iotdb-client/service-rpc/src/main/java/org/apache/iotdb/rpc/TElasticFramedTransport.java b/iotdb-client/service-rpc/src/main/java/org/apache/iotdb/rpc/TElasticFramedTransport.java index 31e0f0b69606..955fdd01a9ea 100644 --- a/iotdb-client/service-rpc/src/main/java/org/apache/iotdb/rpc/TElasticFramedTransport.java +++ b/iotdb-client/service-rpc/src/main/java/org/apache/iotdb/rpc/TElasticFramedTransport.java @@ -174,11 +174,11 @@ public int read(byte[] buf, int off, int len) throws TTransportException { protected void readFrame() throws TTransportException { underlying.readAll(i32buf, 0, 4); int size = TFramedTransport.decodeFrameSize(i32buf); - checkFrameSize(size); + validateFrame(size); readBuffer.fill(underlying, size); } - protected void checkFrameSize(int size) throws TTransportException { + protected void validateFrame(int size) throws TTransportException { final int HTTP_GET_SIGNATURE = 0x47455420; // "GET " final int HTTP_POST_SIGNATURE = 0x504F5354; // "POST" final int TLS_MIN_VERSION = 0x160300; @@ -241,9 +241,26 @@ void throwException(int size, String remoteInfo, int maxSize) throws TTransportE } } + protected void checkWriteFrameSize(int size) throws TTransportException { + if (size <= thriftMaxFrameSize) { + return; + } + SocketAddress remoteAddress = null; + if (underlying instanceof TSocket) { + remoteAddress = ((TSocket) underlying).getSocket().getRemoteSocketAddress(); + } + String remoteInfo = (remoteAddress == null) ? "" : " to " + remoteAddress; + String message = + String.format( + FrameError.FRAME_SIZE_EXCEEDED.messageFormat, size, thriftMaxFrameSize, remoteInfo); + close(); + throw new TTransportException(TTransportException.CORRUPTED_DATA, message); + } + @Override public void flush() throws TTransportException { int length = writeBuffer.getPos(); + checkWriteFrameSize(length); TFramedTransport.encodeFrameSize(length, i32buf); underlying.write(i32buf, 0, 4); underlying.write(writeBuffer.getBuffer(), 0, length); diff --git a/iotdb-client/service-rpc/src/main/java/org/apache/iotdb/rpc/TSStatusCode.java b/iotdb-client/service-rpc/src/main/java/org/apache/iotdb/rpc/TSStatusCode.java index f9e750f4012b..9fb0bb2f4649 100644 --- a/iotdb-client/service-rpc/src/main/java/org/apache/iotdb/rpc/TSStatusCode.java +++ b/iotdb-client/service-rpc/src/main/java/org/apache/iotdb/rpc/TSStatusCode.java @@ -51,6 +51,7 @@ public enum TSStatusCode { INTERNAL_SERVER_ERROR(305), DISPATCH_ERROR(306), LICENSE_ERROR(307), + THRIFT_FRAME_OVERSIZE(308), // Client, REDIRECTION_RECOMMEND(400), diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/protocol/thrift/OperationType.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/protocol/thrift/OperationType.java index 881e823ef2d6..94b5dcadecbc 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/protocol/thrift/OperationType.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/protocol/thrift/OperationType.java @@ -58,7 +58,8 @@ public enum OperationType { DEALLOCATE_PREPARED_STATEMENT("deallocatePreparedStatement"), GET_EARLIEST_TIMESLOTS("getEarliestTimeslots"), GENERATE_DATA_PARTITION_TABLE("generateDataPartitionTable"), - CHECK_DATA_PARTITION_TABLE_STATUS("checkDataPartitionTableStatus"); + CHECK_DATA_PARTITION_TABLE_STATUS("checkDataPartitionTableStatus"), + DISPATCH_FRAGMENT_INSTANCE("dispatchFragmentInstance"); private final String name; OperationType(String name) { diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/scheduler/FragmentInstanceDispatcherImpl.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/scheduler/FragmentInstanceDispatcherImpl.java index c52f8f94eb2a..ac51413b794d 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/scheduler/FragmentInstanceDispatcherImpl.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/scheduler/FragmentInstanceDispatcherImpl.java @@ -59,6 +59,7 @@ import org.apache.iotdb.rpc.TSStatusCode; import org.apache.thrift.TException; +import org.apache.thrift.transport.TTransportException; import org.apache.tsfile.external.commons.lang3.exception.ExceptionUtils; import org.apache.tsfile.utils.Pair; import org.apache.tsfile.utils.Preconditions; @@ -78,6 +79,7 @@ import static com.google.common.util.concurrent.Futures.immediateFuture; import static org.apache.iotdb.db.queryengine.metric.QueryExecutionMetricSet.DISPATCH_READ; +import static org.apache.iotdb.db.utils.ErrorHandlingUtils.onThriftFrameOversizeException; public class FragmentInstanceDispatcherImpl implements IFragInstanceDispatcher { @@ -549,6 +551,15 @@ private void dispatchRemoteHelper(final FragmentInstance instance, final TEndPoi TSStatusCode.EXECUTE_STATEMENT_ERROR, String.format("unknown read type [%s]", instance.getType()))); } + } catch (TException e) { + Throwable rootCause = ExceptionUtils.getRootCause(e); + if (rootCause instanceof TTransportException + && ((TTransportException) rootCause).getType() == TTransportException.CORRUPTED_DATA) { + // Don't set DISPATCH_ERROR status to avoid retry if dispatch failed because of thrift frame + // is oversize + throw new FragmentInstanceDispatchException(onThriftFrameOversizeException(rootCause)); + } + throw e; } } diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/utils/ErrorHandlingUtils.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/utils/ErrorHandlingUtils.java index 34d2e9bf82cb..95e7417821cb 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/utils/ErrorHandlingUtils.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/utils/ErrorHandlingUtils.java @@ -47,6 +47,7 @@ import java.util.concurrent.ExecutionException; import static org.apache.iotdb.commons.utils.StatusUtils.needRetry; +import static org.apache.iotdb.db.protocol.thrift.OperationType.DISPATCH_FRAGMENT_INSTANCE; public class ErrorHandlingUtils { @@ -61,10 +62,11 @@ private ErrorHandlingUtils() {} "The read statement is not allowed in batch: "; private static final String ERROR_OPERATION_LOG = "Status code: {}, operation: {} failed"; + private static final String EXCEPTION_PATTERN = "[%s] Exception occurred: %s failed. "; public static TSStatus onNpeOrUnexpectedException( Exception e, String operation, TSStatusCode statusCode) { - String message = String.format("[%s] Exception occurred: %s failed. ", statusCode, operation); + String message = String.format(EXCEPTION_PATTERN, statusCode, operation); if (e instanceof IOException || e instanceof NullPointerException) { LOGGER.error(ERROR_OPERATION_LOG, statusCode, operation, e); } else { @@ -88,6 +90,16 @@ public static TSStatus onNpeOrUnexpectedException( return onNpeOrUnexpectedException(e, operation.getName(), statusCode); } + public static TSStatus onThriftFrameOversizeException(Throwable t) { + TSStatus status = + new TSStatus(TSStatusCode.THRIFT_FRAME_OVERSIZE.getStatusCode()).setNeedRetry(false); + String message = + String.format(EXCEPTION_PATTERN, status, DISPATCH_FRAGMENT_INSTANCE) + + ErrorHandlingCommonUtils.getRootCause(t).getMessage(); + LOGGER.warn(message); + return status.setMessage(message); + } + public static TSStatus onQueryException(Exception e, String operation, TSStatusCode statusCode) { TSStatus status = tryCatchQueryException(e); if (status != null) {