Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -44,7 +44,7 @@ protected TCompressedElasticFramedTransport(
protected void readFrame() throws TTransportException {
underlying.readAll(i32buf, 0, 4);
int size = TFramedTransport.decodeFrameSize(i32buf);
checkFrameSize(size);
validateFrame(size);
readBuffer.fill(underlying, size);
RpcStat.readCompressedBytes.addAndGet(size);
try {
Expand All @@ -69,6 +69,7 @@ public void flush() throws TTransportException {
writeCompressBuffer.resizeIfNecessary(maxCompressedLength);
int compressedLength =
compress(writeBuffer.getBuffer(), 0, length, writeCompressBuffer.getBuffer(), 0);
checkWriteFrameSize(compressedLength);
RpcStat.writeCompressedBytes.addAndGet(compressedLength);
TFramedTransport.encodeFrameSize(compressedLength, i32buf);
underlying.write(i32buf, 0, 4);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -174,11 +174,11 @@ public int read(byte[] buf, int off, int len) throws TTransportException {
protected void readFrame() throws TTransportException {
underlying.readAll(i32buf, 0, 4);
int size = TFramedTransport.decodeFrameSize(i32buf);
checkFrameSize(size);
validateFrame(size);
readBuffer.fill(underlying, size);
}

protected void checkFrameSize(int size) throws TTransportException {
protected void validateFrame(int size) throws TTransportException {
final int HTTP_GET_SIGNATURE = 0x47455420; // "GET "
final int HTTP_POST_SIGNATURE = 0x504F5354; // "POST"
final int TLS_MIN_VERSION = 0x160300;
Expand Down Expand Up @@ -241,9 +241,26 @@ void throwException(int size, String remoteInfo, int maxSize) throws TTransportE
}
}

protected void checkWriteFrameSize(int size) throws TTransportException {
if (size <= thriftMaxFrameSize) {
return;
}
SocketAddress remoteAddress = null;
if (underlying instanceof TSocket) {
remoteAddress = ((TSocket) underlying).getSocket().getRemoteSocketAddress();
}
String remoteInfo = (remoteAddress == null) ? "" : " to " + remoteAddress;
String message =
String.format(
FrameError.FRAME_SIZE_EXCEEDED.messageFormat, size, thriftMaxFrameSize, remoteInfo);
close();
throw new TTransportException(TTransportException.CORRUPTED_DATA, message);
}

@Override
public void flush() throws TTransportException {
int length = writeBuffer.getPos();
checkWriteFrameSize(length);
TFramedTransport.encodeFrameSize(length, i32buf);
underlying.write(i32buf, 0, 4);
underlying.write(writeBuffer.getBuffer(), 0, length);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -51,6 +51,7 @@ public enum TSStatusCode {
INTERNAL_SERVER_ERROR(305),
DISPATCH_ERROR(306),
LICENSE_ERROR(307),
THRIFT_FRAME_OVERSIZE(308),

// Client,
REDIRECTION_RECOMMEND(400),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -58,7 +58,8 @@ public enum OperationType {
DEALLOCATE_PREPARED_STATEMENT("deallocatePreparedStatement"),
GET_EARLIEST_TIMESLOTS("getEarliestTimeslots"),
GENERATE_DATA_PARTITION_TABLE("generateDataPartitionTable"),
CHECK_DATA_PARTITION_TABLE_STATUS("checkDataPartitionTableStatus");
CHECK_DATA_PARTITION_TABLE_STATUS("checkDataPartitionTableStatus"),
DISPATCH_FRAGMENT_INSTANCE("dispatchFragmentInstance");
private final String name;

OperationType(String name) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -59,6 +59,7 @@
import org.apache.iotdb.rpc.TSStatusCode;

import org.apache.thrift.TException;
import org.apache.thrift.transport.TTransportException;
import org.apache.tsfile.external.commons.lang3.exception.ExceptionUtils;
import org.apache.tsfile.utils.Pair;
import org.apache.tsfile.utils.Preconditions;
Expand All @@ -78,6 +79,7 @@

import static com.google.common.util.concurrent.Futures.immediateFuture;
import static org.apache.iotdb.db.queryengine.metric.QueryExecutionMetricSet.DISPATCH_READ;
import static org.apache.iotdb.db.utils.ErrorHandlingUtils.onThriftFrameOversizeException;

public class FragmentInstanceDispatcherImpl implements IFragInstanceDispatcher {

Expand Down Expand Up @@ -459,7 +461,7 @@
return this.localhostIpAddr.equals(endPoint.getIp()) && localhostInternalPort == endPoint.port;
}

private void dispatchRemoteHelper(final FragmentInstance instance, final TEndPoint endPoint)

Check warning on line 464 in iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/scheduler/FragmentInstanceDispatcherImpl.java

View check run for this annotation

SonarQubeCloud / SonarCloud Code Analysis

A "Brain Method" was detected. Refactor it to reduce at least one of the following metrics: LOC from 91 to 64, Complexity from 20 to 14, Nesting Level from 5 to 2, Number of Variables from 10 to 6.

See more on https://sonarcloud.io/project/issues?id=apache_iotdb&issues=AZ2vxNiaqg995FU64HAN&open=AZ2vxNiaqg995FU64HAN&pullRequest=17536
throws FragmentInstanceDispatchException,
TException,
ClientManagerException,
Expand Down Expand Up @@ -549,6 +551,15 @@
TSStatusCode.EXECUTE_STATEMENT_ERROR,
String.format("unknown read type [%s]", instance.getType())));
}
} catch (TException e) {
Throwable rootCause = ExceptionUtils.getRootCause(e);
if (rootCause instanceof TTransportException
&& ((TTransportException) rootCause).getType() == TTransportException.CORRUPTED_DATA) {
// Don't set DISPATCH_ERROR status to avoid retry if dispatch failed because of thrift frame
// is oversize
throw new FragmentInstanceDispatchException(onThriftFrameOversizeException(rootCause));
}
throw e;
}
}

Expand Down Expand Up @@ -591,7 +602,7 @@
}
}

private void dispatchLocally(final FragmentInstance instance)

Check warning on line 605 in iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/scheduler/FragmentInstanceDispatcherImpl.java

View check run for this annotation

SonarQubeCloud / SonarCloud Code Analysis

A "Brain Method" was detected. Refactor it to reduce at least one of the following metrics: LOC from 80 to 64, Complexity from 18 to 14, Nesting Level from 4 to 2, Number of Variables from 9 to 6.

See more on https://sonarcloud.io/project/issues?id=apache_iotdb&issues=AZ2vxNiaqg995FU64HAO&open=AZ2vxNiaqg995FU64HAO&pullRequest=17536
throws FragmentInstanceDispatchException {
// deserialize ConsensusGroupId
ConsensusGroupId groupId = null;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,7 @@
import java.util.concurrent.ExecutionException;

import static org.apache.iotdb.commons.utils.StatusUtils.needRetry;
import static org.apache.iotdb.db.protocol.thrift.OperationType.DISPATCH_FRAGMENT_INSTANCE;

public class ErrorHandlingUtils {

Expand All @@ -61,10 +62,11 @@ private ErrorHandlingUtils() {}
"The read statement is not allowed in batch: ";

private static final String ERROR_OPERATION_LOG = "Status code: {}, operation: {} failed";
private static final String EXCEPTION_PATTERN = "[%s] Exception occurred: %s failed. ";

public static TSStatus onNpeOrUnexpectedException(
Exception e, String operation, TSStatusCode statusCode) {
String message = String.format("[%s] Exception occurred: %s failed. ", statusCode, operation);
String message = String.format(EXCEPTION_PATTERN, statusCode, operation);
if (e instanceof IOException || e instanceof NullPointerException) {
LOGGER.error(ERROR_OPERATION_LOG, statusCode, operation, e);
} else {
Expand All @@ -88,6 +90,16 @@ public static TSStatus onNpeOrUnexpectedException(
return onNpeOrUnexpectedException(e, operation.getName(), statusCode);
}

public static TSStatus onThriftFrameOversizeException(Throwable t) {
TSStatus status =
new TSStatus(TSStatusCode.THRIFT_FRAME_OVERSIZE.getStatusCode()).setNeedRetry(false);
String message =
String.format(EXCEPTION_PATTERN, status, DISPATCH_FRAGMENT_INSTANCE)
+ ErrorHandlingCommonUtils.getRootCause(t).getMessage();
LOGGER.warn(message);
return status.setMessage(message);
}

public static TSStatus onQueryException(Exception e, String operation, TSStatusCode statusCode) {
TSStatus status = tryCatchQueryException(e);
if (status != null) {
Expand Down
Loading