diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/tablet/TabletInsertionDataContainer.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/tablet/TabletInsertionDataContainer.java index d8c2bccaa97c9..12e81e15aa633 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/tablet/TabletInsertionDataContainer.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/tablet/TabletInsertionDataContainer.java @@ -380,8 +380,11 @@ private void generateColumnIndexMapper( // case 1: for example, pattern is root.a.b or pattern is null and device is root.a.b.c // in this case, all data can be matched without checking the measurements if (Objects.isNull(pattern) || pattern.isRoot() || pattern.coversDevice(deviceId)) { + int filteredCount = 0; for (int i = 0; i < originColumnSize; i++) { - originColumnIndex2FilteredColumnIndexMapperList[i] = i; + if (originMeasurementList[i] != null) { + originColumnIndex2FilteredColumnIndexMapperList[i] = filteredCount++; + } } } diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/receiver/transform/statement/PipeConvertedInsertRowStatement.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/receiver/transform/statement/PipeConvertedInsertRowStatement.java index 48c2f65a46f84..c5a697975076f 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/receiver/transform/statement/PipeConvertedInsertRowStatement.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/receiver/transform/statement/PipeConvertedInsertRowStatement.java @@ -100,7 +100,13 @@ protected boolean checkAndCastDataType(int columnIndex, TSDataType dataType) { @Override public void transferType(ZoneId zoneId) throws QueryProcessException { + if (measurementSchemas == null) { + return; + } for (int i = 0; i < measurementSchemas.length; i++) { + if (!isColumnPresent(i) || dataTypes == null || i >= dataTypes.length) { + continue; + } // null when time series doesn't exist if (measurementSchemas[i] == null) { if (!IoTDBDescriptor.getInstance().getConfig().isEnablePartialInsert()) { @@ -119,6 +125,9 @@ public void transferType(ZoneId zoneId) throws QueryProcessException { // parse string value to specific type dataTypes[i] = measurementSchemas[i].getType(); + if (values == null || i >= values.length || values[i] == null) { + continue; + } try { values[i] = ValueConverter.parse(values[i].toString(), dataTypes[i]); } catch (Exception e) { diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/resource/memory/InsertNodeMemoryEstimator.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/resource/memory/InsertNodeMemoryEstimator.java index 9616777b021f2..06039752ff02b 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/resource/memory/InsertNodeMemoryEstimator.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/resource/memory/InsertNodeMemoryEstimator.java @@ -488,6 +488,9 @@ private static long sizeOfBitMap(final BitMap bitMaps) { public static long sizeOfColumns( final Object[] columns, final MeasurementSchema[] measurementSchemas) { + if (Objects.isNull(columns)) { + return 0L; + } // Directly calculate if measurementSchemas are absent if (Objects.isNull(measurementSchemas)) { return RamUsageEstimator.shallowSizeOf(columns) @@ -499,7 +502,10 @@ public static long sizeOfColumns( RamUsageEstimator.alignObjectSize( NUM_BYTES_ARRAY_HEADER + NUM_BYTES_OBJECT_REF * columns.length); for (int i = 0; i < columns.length; i++) { - if (measurementSchemas[i] == null || measurementSchemas[i].getType() == null) { + if (columns[i] == null + || i >= measurementSchemas.length + || measurementSchemas[i] == null + || measurementSchemas[i].getType() == null) { continue; } switch (measurementSchemas[i].getType()) { @@ -563,6 +569,9 @@ private static long getBinarySize(final Binary[] binaries) { public static long sizeOfValues( final Object[] values, final MeasurementSchema[] measurementSchemas) { + if (Objects.isNull(values)) { + return 0L; + } // Directly calculate if measurementSchemas are absent if (Objects.isNull(measurementSchemas)) { return RamUsageEstimator.shallowSizeOf(values) @@ -574,7 +583,9 @@ public static long sizeOfValues( RamUsageEstimator.alignObjectSize( NUM_BYTES_ARRAY_HEADER + NUM_BYTES_OBJECT_REF * values.length); for (int i = 0; i < values.length; i++) { - if (measurementSchemas[i] == null || measurementSchemas[i].getType() == null) { + if (i >= measurementSchemas.length + || measurementSchemas[i] == null + || measurementSchemas[i].getType() == null) { size += NUM_BYTES_OBJECT_HEADER; continue; } diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/analyze/cache/schema/DataNodeSchemaCache.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/analyze/cache/schema/DataNodeSchemaCache.java index af39cef95e695..fed150210d7eb 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/analyze/cache/schema/DataNodeSchemaCache.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/analyze/cache/schema/DataNodeSchemaCache.java @@ -166,17 +166,24 @@ public ClusterSchemaTree getMatchedNormalSchema(final PartialPath fullPath) { public List computeWithoutTemplate(final ISchemaComputation schemaComputation) { final List indexOfMissingMeasurements = new ArrayList<>(); final String[] measurements = schemaComputation.getMeasurements(); + if (measurements == null) { + return indexOfMissingMeasurements; + } final IDeviceSchema schema = deviceSchemaCache.getDeviceSchema(schemaComputation.getDevicePath()); if (!(schema instanceof DeviceNormalSchema)) { - return IntStream.range(0, schemaComputation.getMeasurements().length) + return IntStream.range(0, measurements.length) + .filter(i -> measurements[i] != null) .boxed() .collect(Collectors.toList()); } final DeviceNormalSchema treeSchema = (DeviceNormalSchema) schema; - for (int i = 0; i < schemaComputation.getMeasurements().length; i++) { + for (int i = 0; i < measurements.length; i++) { + if (measurements[i] == null) { + continue; + } final SchemaCacheEntry value = treeSchema.getSchemaCacheEntry(measurements[i]); if (value == null) { indexOfMissingMeasurements.add(i); diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/analyze/cache/schema/DeviceNormalSchema.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/analyze/cache/schema/DeviceNormalSchema.java index 6d50905bbcac3..a46d784530eea 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/analyze/cache/schema/DeviceNormalSchema.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/analyze/cache/schema/DeviceNormalSchema.java @@ -67,7 +67,9 @@ public int update(final String[] measurements, final IMeasurementSchema[] schema for (int i = 0; i < length; ++i) { // Skip this to avoid instance creation/gc for writing performance - if (measurements[i] == null || measurementMap.containsKey(measurements[i])) { + if (measurements[i] == null + || schemas[i] == null + || measurementMap.containsKey(measurements[i])) { continue; } diff += putEntry(measurements[i], schemas[i], null); diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/analyze/schema/NormalSchemaFetcher.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/analyze/schema/NormalSchemaFetcher.java index 7607b22dd47de..e81abb85769e4 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/analyze/schema/NormalSchemaFetcher.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/analyze/schema/NormalSchemaFetcher.java @@ -337,7 +337,9 @@ void processNormalTimeSeries( schemaComputationWithAutoCreationList.stream() .map( o -> { - TSDataType[] dataTypes = new TSDataType[o.getMeasurements().length]; + final String[] measurements = o.getMeasurements(); + TSDataType[] dataTypes = + new TSDataType[measurements == null ? 0 : measurements.length]; for (int i = 0, length = dataTypes.length; i < length; i++) { dataTypes[i] = o.getDataType(i); } diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/write/InsertNode.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/write/InsertNode.java index aee5cc1ec9384..f81797399b9cb 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/write/InsertNode.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/write/InsertNode.java @@ -168,7 +168,7 @@ public TSDataType[] getDataTypes() { } public TSDataType getDataType(int index) { - return dataTypes[index]; + return dataTypes == null || index < 0 || index >= dataTypes.length ? null : dataTypes[index]; } public void setDataTypes(TSDataType[] dataTypes) { @@ -318,9 +318,12 @@ private static Map deserializeProps(ByteBuffer buffer) { /** Serialized size of measurement schemas, ignoring failed time series */ protected int serializeMeasurementSchemasSize() { int byteLen = 0; - for (int i = 0; i < measurements.length; i++) { + for (int i = 0; measurements != null && i < measurements.length; i++) { // ignore failed partial insert - if (measurements[i] == null) { + if (measurements[i] == null + || measurementSchemas == null + || i >= measurementSchemas.length + || measurementSchemas[i] == null) { continue; } byteLen += WALWriteUtils.sizeToWrite(measurementSchemas[i]); @@ -330,9 +333,12 @@ protected int serializeMeasurementSchemasSize() { /** Serialize measurement schemas, ignoring failed time series */ protected void serializeMeasurementSchemasToWAL(IWALByteBufferView buffer) { - for (int i = 0; i < measurements.length; i++) { + for (int i = 0; measurements != null && i < measurements.length; i++) { // ignore failed partial insert - if (measurements[i] == null) { + if (measurements[i] == null + || measurementSchemas == null + || i >= measurementSchemas.length + || measurementSchemas[i] == null) { continue; } WALWriteUtils.write(measurementSchemas[i], buffer); @@ -373,8 +379,8 @@ public void markFailedMeasurement(int index) { } public boolean hasValidMeasurements() { - for (Object o : measurements) { - if (o != null) { + for (int i = 0; measurements != null && i < measurements.length; i++) { + if (!isMeasurementFailed(i)) { return true; } } @@ -389,11 +395,25 @@ public int getFailedMeasurementNumber() { return failedMeasurementNumber; } - public boolean allMeasurementFailed() { - if (measurements != null) { - return failedMeasurementNumber >= measurements.length; + protected int getValidMeasurementNumber() { + int validMeasurementNumber = 0; + for (int i = 0; measurements != null && i < measurements.length; i++) { + if (!isMeasurementFailed(i)) { + validMeasurementNumber++; + } } - return true; + return validMeasurementNumber; + } + + public boolean isMeasurementFailed(int index) { + return measurements == null + || index < 0 + || index >= measurements.length + || measurements[index] == null; + } + + public boolean allMeasurementFailed() { + return measurements == null || !hasValidMeasurements(); } public String[] getRawMeasurements() { @@ -401,7 +421,9 @@ public String[] getRawMeasurements() { MeasurementSchema[] measurementSchemas = getMeasurementSchemas(); String[] rawMeasurements = new String[measurements.length]; for (int i = 0; i < measurements.length; i++) { - if (measurementSchemas[i] != null) { + if (measurementSchemas != null + && i < measurementSchemas.length + && measurementSchemas[i] != null) { // get raw measurement rather than alias rawMeasurements[i] = measurementSchemas[i].getMeasurementId(); } else { diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/write/InsertRowNode.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/write/InsertRowNode.java index 23c17808d493c..866f39f895b9c 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/write/InsertRowNode.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/write/InsertRowNode.java @@ -61,7 +61,8 @@ public class InsertRowNode extends InsertNode implements WALEntryValue { private static final byte TYPE_RAW_STRING = -1; - private static final byte TYPE_NULL = -2; + private static final byte TYPE_NULL_WITHOUT_TYPE = -2; + private static final byte TYPE_NULL_WITH_TYPE = -3; private static final String UNSUPPORTED_DATA_TYPE = "Unsupported data type: "; @@ -167,9 +168,13 @@ public List getOutputColumnNames() { @Override public TSDataType[] getDataTypes() { if (isNeedInferType) { - TSDataType[] predictedDataTypes = new TSDataType[dataTypes.length]; - for (int i = 0; i < dataTypes.length; i++) { - predictedDataTypes[i] = TypeInferenceUtils.getPredictedDataType(values[i], true); + TSDataType[] predictedDataTypes = + new TSDataType + [dataTypes == null ? (values == null ? 0 : values.length) : dataTypes.length]; + for (int i = 0; i < predictedDataTypes.length; i++) { + predictedDataTypes[i] = + TypeInferenceUtils.getPredictedDataType( + values != null && i < values.length ? values[i] : null, true); } return predictedDataTypes; } @@ -180,9 +185,10 @@ public TSDataType[] getDataTypes() { @Override public TSDataType getDataType(int index) { if (isNeedInferType) { - return TypeInferenceUtils.getPredictedDataType(values[index], true); + return TypeInferenceUtils.getPredictedDataType( + values != null && index >= 0 && index < values.length ? values[index] : null, true); } else { - return dataTypes[index]; + return getDataTypeIfPresent(index); } } @@ -217,12 +223,84 @@ public List getTimePartitionSlots() { @Override public void markFailedMeasurement(int index) { - if (measurements[index] == null) { + if (measurements == null + || index < 0 + || index >= measurements.length + || measurements[index] == null) { return; } measurements[index] = null; - dataTypes[index] = null; - values[index] = null; + if (dataTypes != null && index < dataTypes.length) { + dataTypes[index] = null; + } + if (values != null && index < values.length) { + values[index] = null; + } + } + + @Override + protected int getValidMeasurementNumber() { + int validMeasurementNumber = 0; + for (int i = 0; measurements != null && i < measurements.length; i++) { + if (shouldSerializeMeasurement(i)) { + validMeasurementNumber++; + } + } + return validMeasurementNumber; + } + + protected int getValidMeasurementNumberForWAL() { + int validMeasurementNumber = 0; + for (int i = 0; measurements != null && i < measurements.length; i++) { + if (shouldSerializeMeasurementToWAL(i)) { + validMeasurementNumber++; + } + } + return validMeasurementNumber; + } + + @Override + protected int serializeMeasurementSchemasSize() { + int byteLen = 0; + for (int i = 0; measurements != null && i < measurements.length; i++) { + if (shouldSerializeMeasurementToWAL(i)) { + byteLen += WALWriteUtils.sizeToWrite(measurementSchemas[i]); + } + } + return byteLen; + } + + @Override + protected void serializeMeasurementSchemasToWAL(IWALByteBufferView buffer) { + for (int i = 0; measurements != null && i < measurements.length; i++) { + if (shouldSerializeMeasurementToWAL(i)) { + WALWriteUtils.write(measurementSchemas[i], buffer); + } + } + } + + private boolean shouldSerializeMeasurement(final int index) { + return measurements != null + && index >= 0 + && index < measurements.length + && measurements[index] != null + && values != null + && index < values.length + && (measurementSchemas == null + || index < measurementSchemas.length && measurementSchemas[index] != null) + && (values[index] == null || isNeedInferType || getDataTypeIfPresent(index) != null); + } + + private boolean shouldSerializeMeasurementToWAL(final int index) { + return shouldSerializeMeasurement(index) + && measurementSchemas != null + && index < measurementSchemas.length + && measurementSchemas[index] != null + && (values[index] == null || !isNeedInferType && getDataTypeIfPresent(index) != null); + } + + private TSDataType getDataTypeIfPresent(final int index) { + return dataTypes != null && index >= 0 && index < dataTypes.length ? dataTypes[index] : null; } @Override @@ -251,7 +329,7 @@ void subSerialize(DataOutputStream stream) throws IOException { /** Serialize measurements and values, ignoring failed time series. */ void serializeMeasurementsAndValues(ByteBuffer buffer) { - ReadWriteIOUtils.write(measurements.length - getFailedMeasurementNumber(), buffer); + ReadWriteIOUtils.write(getValidMeasurementNumber(), buffer); serializeMeasurementsOrSchemas(buffer); putDataTypesAndValues(buffer); ReadWriteIOUtils.write((byte) (isNeedInferType ? 1 : 0), buffer); @@ -265,7 +343,7 @@ void serializeMeasurementsAndValues(ByteBuffer buffer) { * @throws IOException - If an I/O error occurs. */ void serializeMeasurementsAndValues(DataOutputStream stream) throws IOException { - ReadWriteIOUtils.write(measurements.length - getFailedMeasurementNumber(), stream); + ReadWriteIOUtils.write(getValidMeasurementNumber(), stream); serializeMeasurementsOrSchemas(stream); putDataTypesAndValues(stream); ReadWriteIOUtils.write((byte) (isNeedInferType ? 1 : 0), stream); @@ -275,9 +353,9 @@ void serializeMeasurementsAndValues(DataOutputStream stream) throws IOException /** Serialize measurements or measurement schemas, ignoring failed time series. */ private void serializeMeasurementsOrSchemas(ByteBuffer buffer) { ReadWriteIOUtils.write((byte) (measurementSchemas != null ? 1 : 0), buffer); - for (int i = 0; i < measurements.length; i++) { - // ignore failed partial insert - if (measurements[i] == null) { + for (int i = 0; measurements != null && i < measurements.length; i++) { + // ignore failed partial insert and incomplete columns + if (!shouldSerializeMeasurement(i)) { continue; } // serialize measurement schemas when exist @@ -297,9 +375,9 @@ private void serializeMeasurementsOrSchemas(ByteBuffer buffer) { */ private void serializeMeasurementsOrSchemas(DataOutputStream stream) throws IOException { ReadWriteIOUtils.write((byte) (measurementSchemas != null ? 1 : 0), stream); - for (int i = 0; i < measurements.length; i++) { - // ignore failed partial insert - if (measurements[i] == null) { + for (int i = 0; measurements != null && i < measurements.length; i++) { + // ignore failed partial insert and incomplete columns + if (!shouldSerializeMeasurement(i)) { continue; } // serialize measurement schemas when exist @@ -318,14 +396,19 @@ private void serializeMeasurementsOrSchemas(DataOutputStream stream) throws IOEx * @throws UnSupportedDataTypeException - If meets unsupported data type. */ private void putDataTypesAndValues(ByteBuffer buffer) { - for (int i = 0; i < values.length; i++) { - // ignore failed partial insert - if (measurements[i] == null) { + for (int i = 0; values != null && i < values.length; i++) { + // ignore failed partial insert and incomplete columns + if (!shouldSerializeMeasurement(i)) { continue; } + final TSDataType dataType = getDataTypeIfPresent(i); // serialize null value if (values[i] == null) { - ReadWriteIOUtils.write(TYPE_NULL, buffer); + ReadWriteIOUtils.write( + dataType == null ? TYPE_NULL_WITHOUT_TYPE : TYPE_NULL_WITH_TYPE, buffer); + if (dataType != null) { + ReadWriteIOUtils.write(dataType, buffer); + } continue; } // types are not determined, the situation mainly occurs when the plan uses string values @@ -334,8 +417,8 @@ private void putDataTypesAndValues(ByteBuffer buffer) { ReadWriteIOUtils.write(TYPE_RAW_STRING, buffer); serializeString(values[i].toString(), buffer); } else { - ReadWriteIOUtils.write(dataTypes[i], buffer); - switch (dataTypes[i]) { + ReadWriteIOUtils.write(dataType, buffer); + switch (dataType) { case BOOLEAN: ReadWriteIOUtils.write((Boolean) values[i], buffer); break; @@ -359,7 +442,7 @@ private void putDataTypesAndValues(ByteBuffer buffer) { ReadWriteIOUtils.write((Binary) values[i], buffer); break; default: - throw new UnSupportedDataTypeException(UNSUPPORTED_DATA_TYPE + dataTypes[i]); + throw new UnSupportedDataTypeException(UNSUPPORTED_DATA_TYPE + dataType); } } } @@ -373,14 +456,19 @@ private void putDataTypesAndValues(ByteBuffer buffer) { * @throws UnSupportedDataTypeException - If meets unsupported data type. */ private void putDataTypesAndValues(DataOutputStream stream) throws IOException { - for (int i = 0; i < values.length; i++) { - // ignore failed partial insert - if (measurements[i] == null) { + for (int i = 0; values != null && i < values.length; i++) { + // ignore failed partial insert and incomplete columns + if (!shouldSerializeMeasurement(i)) { continue; } + final TSDataType dataType = getDataTypeIfPresent(i); // serialize null value if (values[i] == null) { - ReadWriteIOUtils.write(TYPE_NULL, stream); + ReadWriteIOUtils.write( + dataType == null ? TYPE_NULL_WITHOUT_TYPE : TYPE_NULL_WITH_TYPE, stream); + if (dataType != null) { + ReadWriteIOUtils.write(dataType, stream); + } continue; } // types are not determined, the situation mainly occurs when the plan uses string values @@ -389,8 +477,8 @@ private void putDataTypesAndValues(DataOutputStream stream) throws IOException { ReadWriteIOUtils.write(TYPE_RAW_STRING, stream); serializeString(values[i].toString(), stream); } else { - ReadWriteIOUtils.write(dataTypes[i], stream); - switch (dataTypes[i]) { + ReadWriteIOUtils.write(dataType, stream); + switch (dataType) { case BOOLEAN: ReadWriteIOUtils.write((Boolean) values[i], stream); break; @@ -414,7 +502,7 @@ private void putDataTypesAndValues(DataOutputStream stream) throws IOException { ReadWriteIOUtils.write((Binary) values[i], stream); break; default: - throw new UnSupportedDataTypeException(UNSUPPORTED_DATA_TYPE + dataTypes[i]); + throw new UnSupportedDataTypeException(UNSUPPORTED_DATA_TYPE + dataType); } } } @@ -475,8 +563,13 @@ private void fillDataTypesAndValues(ByteBuffer buffer) { // types are not determined, the situation mainly occurs when the node uses string values // and is forwarded to other nodes byte typeNum = (byte) ReadWriteIOUtils.read(buffer); - if (typeNum == TYPE_RAW_STRING || typeNum == TYPE_NULL) { + if (typeNum == TYPE_RAW_STRING + || typeNum == TYPE_NULL_WITHOUT_TYPE + || typeNum == TYPE_NULL_WITH_TYPE) { values[i] = typeNum == TYPE_RAW_STRING ? deserializeString(buffer) : null; + if (typeNum == TYPE_NULL_WITH_TYPE) { + dataTypes[i] = ReadWriteIOUtils.readDataType(buffer); + } continue; } dataTypes[i] = TSDataType.values()[typeNum]; @@ -535,18 +628,22 @@ private int serializeMeasurementsAndValuesSize() { size += serializeMeasurementSchemasSize(); // putValues - for (int i = 0; i < values.length; i++) { - // ignore failed partial insert - if (measurements[i] == null) { + for (int i = 0; values != null && i < values.length; i++) { + // ignore failed partial insert and incomplete columns + if (!shouldSerializeMeasurementToWAL(i)) { continue; } + final TSDataType dataType = getDataTypeIfPresent(i); // serialize null value if (values[i] == null) { size += Byte.BYTES; + if (dataType != null) { + size += Byte.BYTES; + } continue; } size += Byte.BYTES; - switch (dataTypes[i]) { + switch (dataType) { case BOOLEAN: size += Byte.BYTES; break; @@ -570,7 +667,7 @@ private int serializeMeasurementsAndValuesSize() { size += ReadWriteIOUtils.sizeToWrite((Binary) values[i]); break; default: - throw new UnSupportedDataTypeException(UNSUPPORTED_DATA_TYPE + dataTypes[i]); + throw new UnSupportedDataTypeException(UNSUPPORTED_DATA_TYPE + dataType); } } @@ -597,7 +694,7 @@ protected void subSerialize(IWALByteBufferView buffer) { /** Serialize measurements and values, ignoring failed time series. */ private void serializeMeasurementsAndValues(IWALByteBufferView buffer) { - buffer.putInt(measurements.length - getFailedMeasurementNumber()); + buffer.putInt(getValidMeasurementNumberForWAL()); serializeMeasurementSchemasToWAL(buffer); putDataTypesAndValues(buffer); buffer.put((byte) (isAligned ? 1 : 0)); @@ -610,18 +707,23 @@ private void serializeMeasurementsAndValues(IWALByteBufferView buffer) { * @throws UnSupportedDataTypeException - If meets unsupported data type. */ private void putDataTypesAndValues(IWALByteBufferView buffer) { - for (int i = 0; i < values.length; i++) { - // ignore failed partial insert - if (measurements[i] == null) { + for (int i = 0; values != null && i < values.length; i++) { + // ignore failed partial insert and incomplete columns + if (!shouldSerializeMeasurementToWAL(i)) { continue; } + final TSDataType dataType = getDataTypeIfPresent(i); // serialize null value if (values[i] == null) { - WALWriteUtils.write(TYPE_NULL, buffer); + WALWriteUtils.write( + dataType == null ? TYPE_NULL_WITHOUT_TYPE : TYPE_NULL_WITH_TYPE, buffer); + if (dataType != null) { + WALWriteUtils.write(dataType, buffer); + } continue; } - WALWriteUtils.write(dataTypes[i], buffer); - switch (dataTypes[i]) { + WALWriteUtils.write(dataType, buffer); + switch (dataType) { case BOOLEAN: WALWriteUtils.write((Boolean) values[i], buffer); break; @@ -645,7 +747,7 @@ private void putDataTypesAndValues(IWALByteBufferView buffer) { WALWriteUtils.write((Binary) values[i], buffer); break; default: - throw new UnSupportedDataTypeException(UNSUPPORTED_DATA_TYPE + dataTypes[i]); + throw new UnSupportedDataTypeException(UNSUPPORTED_DATA_TYPE + dataType); } } } @@ -703,7 +805,13 @@ void deserializeMeasurementsAndValuesFromWAL(DataInputStream stream) throws IOEx public void fillDataTypesAndValuesFromWAL(DataInputStream stream) throws IOException { for (int i = 0; i < dataTypes.length; i++) { byte typeNum = stream.readByte(); - if (typeNum == TYPE_NULL) { + if (typeNum == TYPE_RAW_STRING + || typeNum == TYPE_NULL_WITHOUT_TYPE + || typeNum == TYPE_NULL_WITH_TYPE) { + values[i] = typeNum == TYPE_RAW_STRING ? ReadWriteIOUtils.readString(stream) : null; + if (typeNum == TYPE_NULL_WITH_TYPE) { + dataTypes[i] = ReadWriteIOUtils.readDataType(stream); + } continue; } dataTypes[i] = TSDataType.values()[typeNum]; @@ -788,7 +896,13 @@ void deserializeMeasurementsAndValuesFromWAL(ByteBuffer buffer) { public void fillDataTypesAndValuesFromWAL(ByteBuffer buffer) { for (int i = 0; i < dataTypes.length; i++) { byte typeNum = buffer.get(); - if (typeNum == TYPE_NULL) { + if (typeNum == TYPE_RAW_STRING + || typeNum == TYPE_NULL_WITHOUT_TYPE + || typeNum == TYPE_NULL_WITH_TYPE) { + values[i] = typeNum == TYPE_RAW_STRING ? ReadWriteIOUtils.readString(buffer) : null; + if (typeNum == TYPE_NULL_WITH_TYPE) { + dataTypes[i] = ReadWriteIOUtils.readDataType(buffer); + } continue; } dataTypes[i] = TSDataType.values()[typeNum]; @@ -853,13 +967,19 @@ public R accept(PlanVisitor visitor, C context) { } public TimeValuePair composeTimeValuePair(int columnIndex) { - if (columnIndex >= values.length || Objects.isNull(dataTypes[columnIndex])) { + if (measurements == null + || columnIndex < 0 + || columnIndex >= measurements.length + || values == null + || columnIndex >= values.length + || values[columnIndex] == null + || dataTypes == null + || columnIndex >= dataTypes.length + || Objects.isNull(dataTypes[columnIndex])) { return null; } Object value = values[columnIndex]; - return Objects.nonNull(value) - ? new TimeValuePair(time, TsPrimitiveType.getByType(dataTypes[columnIndex], value)) - : null; + return new TimeValuePair(time, TsPrimitiveType.getByType(dataTypes[columnIndex], value)); } public void updateLastCache(String databaseName) { diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/write/InsertRowsOfOneDeviceNode.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/write/InsertRowsOfOneDeviceNode.java index d7a6e8b6f5f59..786dbbc246de6 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/write/InsertRowsOfOneDeviceNode.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/write/InsertRowsOfOneDeviceNode.java @@ -216,7 +216,10 @@ private void storeMeasurementsAndDataType() { for (InsertRowNode insertRowNode : insertRowNodeList) { String[] measurements = insertRowNode.getMeasurements(); TSDataType[] dataTypes = insertRowNode.getDataTypes(); - for (int i = 0; i < measurements.length; i++) { + for (int i = 0; measurements != null && i < measurements.length; i++) { + if (measurements[i] == null || dataTypes == null || i >= dataTypes.length) { + continue; + } if (!measurementSet.contains(measurements[i])) { measurementList.add(measurements[i]); dataTypeList.add(dataTypes[i]); diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/write/InsertTabletNode.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/write/InsertTabletNode.java index 472c1e79d7e47..a903340077d5a 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/write/InsertTabletNode.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/write/InsertTabletNode.java @@ -278,15 +278,17 @@ public List splitByPartition(IAnalysis analysis) { int count = end - start; long[] subTimes = new long[count]; int destLoc = 0; - Object[] values = initTabletValues(dataTypes.length, count, dataTypes); - BitMap[] bitMaps = initBitmapsForSplit(dataTypes.length, count); + final int columnSize = getColumnArrayLength(); + Object[] values = initTabletValues(columnSize, count, dataTypes); + BitMap[] bitMaps = initBitmapsForSplit(columnSize, count); System.arraycopy(times, start, subTimes, destLoc, end - start); for (int k = 0; k < values.length; k++) { - if (dataTypes[k] != null) { + if (hasColumnForSplit(k)) { System.arraycopy(columns[k], start, values[k], destLoc, end - start); } - if (bitMaps != null && this.bitMaps[k] != null) { - BitMap.copyOfRange(this.bitMaps[k], start, bitMaps[k], destLoc, end - start); + final BitMap sourceBitMap = getBitMapIfPresent(k); + if (bitMaps != null && bitMaps[k] != null && sourceBitMap != null) { + BitMap.copyOfRange(sourceBitMap, start, bitMaps[k], destLoc, end - start); } } InsertTabletNode subNode = @@ -331,7 +333,7 @@ public List getTimePartitionSlots() { private Object[] initTabletValues(int columnSize, int rowSize, TSDataType[] dataTypes) { Object[] values = new Object[columnSize]; for (int i = 0; i < values.length; i++) { - if (dataTypes[i] != null) { + if (dataTypes != null && i < dataTypes.length && dataTypes[i] != null) { switch (dataTypes[i]) { case TEXT: case BLOB: @@ -378,7 +380,7 @@ protected BitMap[] initBitmapsForSplit(int columnSize, int rowSize) { final BitMap[] splitBitMaps = new BitMap[columnSize]; boolean hasBitMap = false; for (int i = 0; i < columnSize && i < this.bitMaps.length; ++i) { - if (this.bitMaps[i] != null && !this.bitMaps[i].isAllUnmarked()) { + if (hasColumnForSplit(i) && this.bitMaps[i] != null && !this.bitMaps[i].isAllUnmarked()) { splitBitMaps[i] = new BitMap(rowSize); hasBitMap = true; } @@ -386,14 +388,58 @@ protected BitMap[] initBitmapsForSplit(int columnSize, int rowSize) { return hasBitMap ? splitBitMaps : null; } + private int getColumnArrayLength() { + int length = 0; + if (measurements != null) { + length = Math.max(length, measurements.length); + } + if (measurementSchemas != null) { + length = Math.max(length, measurementSchemas.length); + } + if (dataTypes != null) { + length = Math.max(length, dataTypes.length); + } + if (columns != null) { + length = Math.max(length, columns.length); + } + if (bitMaps != null) { + length = Math.max(length, bitMaps.length); + } + return length; + } + + private boolean hasColumnForSplit(int index) { + return dataTypes != null + && index >= 0 + && index < dataTypes.length + && dataTypes[index] != null + && columns != null + && index < columns.length + && columns[index] != null + && (measurements == null || index < measurements.length && measurements[index] != null) + && (measurementSchemas == null + || index < measurementSchemas.length && measurementSchemas[index] != null); + } + + private BitMap getBitMapIfPresent(final int index) { + return bitMaps != null && index >= 0 && index < bitMaps.length ? bitMaps[index] : null; + } + @Override public void markFailedMeasurement(int index) { - if (measurements[index] == null) { + if (measurements == null + || index < 0 + || index >= measurements.length + || measurements[index] == null) { return; } measurements[index] = null; - dataTypes[index] = null; - columns[index] = null; + if (dataTypes != null && index < dataTypes.length) { + dataTypes[index] = null; + } + if (columns != null && index < columns.length) { + columns[index] = null; + } } @Override @@ -435,12 +481,12 @@ void subSerialize(DataOutputStream stream) throws IOException { /** Serialize measurements or measurement schemas, ignoring failed time series */ private void writeMeasurementsOrSchemas(ByteBuffer buffer) { - ReadWriteIOUtils.write(measurements.length - getFailedMeasurementNumber(), buffer); + ReadWriteIOUtils.write(getValidMeasurementNumber(), buffer); ReadWriteIOUtils.write((byte) (measurementSchemas != null ? 1 : 0), buffer); - for (int i = 0; i < measurements.length; i++) { - // ignore failed partial insert - if (measurements[i] == null) { + for (int i = 0; measurements != null && i < measurements.length; i++) { + // ignore failed partial insert and incomplete columns + if (!shouldSerializeMeasurement(i)) { continue; } // serialize measurement schemas when exist @@ -454,12 +500,12 @@ private void writeMeasurementsOrSchemas(ByteBuffer buffer) { /** Serialize measurements or measurement schemas, ignoring failed time series */ private void writeMeasurementsOrSchemas(DataOutputStream stream) throws IOException { - ReadWriteIOUtils.write(measurements.length - getFailedMeasurementNumber(), stream); + ReadWriteIOUtils.write(getValidMeasurementNumber(), stream); ReadWriteIOUtils.write((byte) (measurementSchemas != null ? 1 : 0), stream); - for (int i = 0; i < measurements.length; i++) { - // ignore failed partial insert - if (measurements[i] == null) { + for (int i = 0; measurements != null && i < measurements.length; i++) { + // ignore failed partial insert and incomplete columns + if (!shouldSerializeMeasurement(i)) { continue; } // serialize measurement schemas when exist @@ -473,9 +519,9 @@ private void writeMeasurementsOrSchemas(DataOutputStream stream) throws IOExcept /** Serialize data types, ignoring failed time series */ private void writeDataTypes(ByteBuffer buffer) { - for (int i = 0; i < dataTypes.length; i++) { - // ignore failed partial insert - if (measurements[i] == null) { + for (int i = 0; dataTypes != null && i < dataTypes.length; i++) { + // ignore failed partial insert and incomplete columns + if (!shouldSerializeMeasurement(i)) { continue; } dataTypes[i].serializeTo(buffer); @@ -484,9 +530,9 @@ private void writeDataTypes(ByteBuffer buffer) { /** Serialize data types, ignoring failed time series */ private void writeDataTypes(DataOutputStream stream) throws IOException { - for (int i = 0; i < dataTypes.length; i++) { - // ignore failed partial insert - if (measurements[i] == null) { + for (int i = 0; dataTypes != null && i < dataTypes.length; i++) { + // ignore failed partial insert and incomplete columns + if (!shouldSerializeMeasurement(i)) { continue; } dataTypes[i].serializeTo(stream); @@ -511,17 +557,18 @@ private void writeTimes(final DataOutputStream stream) throws IOException { private void writeBitMaps(ByteBuffer buffer) { ReadWriteIOUtils.write(BytesUtils.boolToByte(bitMaps != null), buffer); if (bitMaps != null) { - for (int i = 0; i < bitMaps.length; i++) { - // ignore failed partial insert - if (measurements[i] == null) { + for (int i = 0; measurements != null && i < measurements.length; i++) { + // ignore failed partial insert and incomplete columns + if (!shouldSerializeMeasurement(i)) { continue; } - if (bitMaps[i] == null) { + final BitMap bitMap = getBitMapIfPresent(i); + if (bitMap == null) { ReadWriteIOUtils.write(BytesUtils.boolToByte(false), buffer); } else { ReadWriteIOUtils.write(BytesUtils.boolToByte(true), buffer); - buffer.put(bitMaps[i].getByteArray(), 0, rowCount / 8 + 1); + buffer.put(bitMap.getByteArray(), 0, rowCount / 8 + 1); } } } @@ -531,17 +578,18 @@ private void writeBitMaps(ByteBuffer buffer) { private void writeBitMaps(DataOutputStream stream) throws IOException { ReadWriteIOUtils.write(BytesUtils.boolToByte(bitMaps != null), stream); if (bitMaps != null) { - for (int i = 0; i < bitMaps.length; i++) { - // ignore failed partial insert - if (measurements[i] == null) { + for (int i = 0; measurements != null && i < measurements.length; i++) { + // ignore failed partial insert and incomplete columns + if (!shouldSerializeMeasurement(i)) { continue; } - if (bitMaps[i] == null) { + final BitMap bitMap = getBitMapIfPresent(i); + if (bitMap == null) { ReadWriteIOUtils.write(BytesUtils.boolToByte(false), stream); } else { ReadWriteIOUtils.write(BytesUtils.boolToByte(true), stream); - stream.write(bitMaps[i].getByteArray(), 0, rowCount / 8 + 1); + stream.write(bitMap.getByteArray(), 0, rowCount / 8 + 1); } } } @@ -549,9 +597,9 @@ private void writeBitMaps(DataOutputStream stream) throws IOException { /** Serialize values, ignoring failed time series */ private void writeValues(ByteBuffer buffer) { - for (int i = 0; i < columns.length; i++) { - // ignore failed partial insert - if (measurements[i] == null) { + for (int i = 0; columns != null && i < columns.length; i++) { + // ignore failed partial insert and incomplete columns + if (!shouldSerializeMeasurement(i)) { continue; } serializeColumn(dataTypes[i], columns[i], buffer); @@ -560,9 +608,9 @@ private void writeValues(ByteBuffer buffer) { /** Serialize values, ignoring failed time series */ private void writeValues(DataOutputStream stream) throws IOException { - for (int i = 0; i < columns.length; i++) { - // ignore failed partial insert - if (measurements[i] == null) { + for (int i = 0; columns != null && i < columns.length; i++) { + // ignore failed partial insert and incomplete columns + if (!shouldSerializeMeasurement(i)) { continue; } serializeColumn(dataTypes[i], columns[i], stream); @@ -748,24 +796,25 @@ int subSerializeSize(int start, int end) { // bitmaps size size += Byte.BYTES; if (bitMaps != null) { - for (int i = 0; i < bitMaps.length; i++) { - // ignore failed partial insert - if (measurements[i] == null) { + for (int i = 0; measurements != null && i < measurements.length; i++) { + // ignore failed partial insert and incomplete columns + if (!shouldSerializeMeasurementToWAL(i)) { continue; } size += Byte.BYTES; - if (bitMaps[i] != null) { + final BitMap bitMap = getBitMapIfPresent(i); + if (bitMap != null) { int len = end - start; BitMap partBitMap = new BitMap(len); - BitMap.copyOfRange(bitMaps[i], start, partBitMap, 0, len); + BitMap.copyOfRange(bitMap, start, partBitMap, 0, len); size += partBitMap.getByteArray().length; } } } // values size - for (int i = 0; i < dataTypes.length; i++) { - if (columns[i] != null) { + for (int i = 0; measurements != null && i < measurements.length; i++) { + if (shouldSerializeMeasurementToWAL(i)) { size += getColumnSize(dataTypes[i], columns[i], start, end); } } @@ -833,7 +882,7 @@ void subSerialize(IWALByteBufferView buffer, int start, int end) { /** Serialize measurement schemas, ignoring failed time series */ private void writeMeasurementSchemas(IWALByteBufferView buffer) { - buffer.putInt(measurements.length - getFailedMeasurementNumber()); + buffer.putInt(getValidMeasurementNumberForWAL()); serializeMeasurementSchemasToWAL(buffer); } @@ -848,19 +897,20 @@ private void writeTimes(IWALByteBufferView buffer, int start, int end) { private void writeBitMaps(IWALByteBufferView buffer, int start, int end) { buffer.put(BytesUtils.boolToByte(bitMaps != null)); if (bitMaps != null) { - for (int i = 0; i < bitMaps.length; i++) { - // ignore failed partial insert - if (measurements[i] == null) { + for (int i = 0; measurements != null && i < measurements.length; i++) { + // ignore failed partial insert and incomplete columns + if (!shouldSerializeMeasurementToWAL(i)) { continue; } - if (bitMaps[i] == null) { + final BitMap bitMap = getBitMapIfPresent(i); + if (bitMap == null) { buffer.put(BytesUtils.boolToByte(false)); } else { buffer.put(BytesUtils.boolToByte(true)); int len = end - start; BitMap partBitMap = new BitMap(len); - BitMap.copyOfRange(bitMaps[i], start, partBitMap, 0, len); + BitMap.copyOfRange(bitMap, start, partBitMap, 0, len); buffer.put(partBitMap.getByteArray()); } } @@ -869,15 +919,78 @@ private void writeBitMaps(IWALByteBufferView buffer, int start, int end) { /** Serialize values, ignoring failed time series */ private void writeValues(IWALByteBufferView buffer, int start, int end) { - for (int i = 0; i < columns.length; i++) { - // ignore failed partial insert - if (measurements[i] == null) { + for (int i = 0; measurements != null && i < measurements.length; i++) { + // ignore failed partial insert and incomplete columns + if (!shouldSerializeMeasurementToWAL(i)) { continue; } serializeColumn(dataTypes[i], columns[i], buffer, start, end); } } + @Override + protected int getValidMeasurementNumber() { + int validMeasurementNumber = 0; + for (int i = 0; measurements != null && i < measurements.length; i++) { + if (shouldSerializeMeasurement(i)) { + validMeasurementNumber++; + } + } + return validMeasurementNumber; + } + + protected int getValidMeasurementNumberForWAL() { + int validMeasurementNumber = 0; + for (int i = 0; measurements != null && i < measurements.length; i++) { + if (shouldSerializeMeasurementToWAL(i)) { + validMeasurementNumber++; + } + } + return validMeasurementNumber; + } + + @Override + protected int serializeMeasurementSchemasSize() { + int byteLen = 0; + for (int i = 0; measurements != null && i < measurements.length; i++) { + if (shouldSerializeMeasurementToWAL(i)) { + byteLen += WALWriteUtils.sizeToWrite(measurementSchemas[i]); + } + } + return byteLen; + } + + @Override + protected void serializeMeasurementSchemasToWAL(IWALByteBufferView buffer) { + for (int i = 0; measurements != null && i < measurements.length; i++) { + if (shouldSerializeMeasurementToWAL(i)) { + WALWriteUtils.write(measurementSchemas[i], buffer); + } + } + } + + private boolean shouldSerializeMeasurement(int index) { + return measurements != null + && index >= 0 + && index < measurements.length + && measurements[index] != null + && (measurementSchemas == null + || index < measurementSchemas.length && measurementSchemas[index] != null) + && dataTypes != null + && index < dataTypes.length + && dataTypes[index] != null + && columns != null + && index < columns.length + && columns[index] != null; + } + + private boolean shouldSerializeMeasurementToWAL(int index) { + return shouldSerializeMeasurement(index) + && measurementSchemas != null + && index < measurementSchemas.length + && measurementSchemas[index] != null; + } + private void serializeColumn( TSDataType dataType, Object column, IWALByteBufferView buffer, int start, int end) { switch (dataType) { @@ -1043,8 +1156,9 @@ private boolean equals(Object[] columns) { } for (int i = 0; i < columns.length; i++) { - if (dataTypes[i] != null) { - switch (dataTypes[i]) { + final TSDataType dataType = getDataType(i); + if (dataType != null) { + switch (dataType) { case INT32: case DATE: if (!Arrays.equals((int[]) this.columns[i], (int[]) columns[i])) { @@ -1080,10 +1194,9 @@ private boolean equals(Object[] columns) { } break; default: - throw new UnSupportedDataTypeException( - String.format(DATATYPE_UNSUPPORTED, dataTypes[i])); + throw new UnSupportedDataTypeException(String.format(DATATYPE_UNSUPPORTED, dataType)); } - } else if (!columns[i].equals(columns)) { + } else if (!Objects.equals(this.columns[i], columns[i])) { return false; } } @@ -1098,14 +1211,14 @@ public R accept(PlanVisitor visitor, C context) { public TimeValuePair composeLastTimeValuePair( int measurementIndex, int startOffset, int endOffset) { - if (measurementIndex >= columns.length || Objects.isNull(dataTypes[measurementIndex])) { + if (!canComposeLastTimeValuePair(measurementIndex)) { return null; } // get non-null value int lastIdx = Math.min(endOffset - 1, rowCount - 1); - if (bitMaps != null && bitMaps[measurementIndex] != null) { - BitMap bitMap = bitMaps[measurementIndex]; + final BitMap bitMap = getBitMapIfPresent(measurementIndex); + if (bitMap != null) { while (lastIdx >= startOffset) { if (!bitMap.isMarked(lastIdx)) { break; @@ -1127,11 +1240,11 @@ protected TimeValuePair composeLastTimeValuePair( if (results == null) { return composeLastTimeValuePair(measurementIndex, startOffset, endOffset); } - if (measurementIndex >= columns.length || Objects.isNull(dataTypes[measurementIndex])) { + if (!canComposeLastTimeValuePair(measurementIndex)) { return null; } - final BitMap bitMap = bitMaps == null ? null : bitMaps[measurementIndex]; + final BitMap bitMap = getBitMapIfPresent(measurementIndex); int lastIdx = Math.min(endOffset - 1, rowCount - 1); while (lastIdx >= startOffset) { if (results[lastIdx] != null @@ -1148,6 +1261,19 @@ protected TimeValuePair composeLastTimeValuePair( return lastIdx < startOffset ? null : composeTimeValuePair(measurementIndex, lastIdx); } + private boolean canComposeLastTimeValuePair(final int measurementIndex) { + return measurements != null + && measurementIndex >= 0 + && measurementIndex < measurements.length + && measurements[measurementIndex] != null + && columns != null + && measurementIndex < columns.length + && columns[measurementIndex] != null + && dataTypes != null + && measurementIndex < dataTypes.length + && dataTypes[measurementIndex] != null; + } + private TimeValuePair composeTimeValuePair(final int measurementIndex, final int rowIndex) { TsPrimitiveType value; switch (dataTypes[measurementIndex]) { diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/statement/crud/InsertBaseStatement.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/statement/crud/InsertBaseStatement.java index b6c4d6a4382fd..1563c3b56cff1 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/statement/crud/InsertBaseStatement.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/statement/crud/InsertBaseStatement.java @@ -119,6 +119,16 @@ public void setMeasurementSchemas(MeasurementSchema[] measurementSchemas) { this.measurementSchemas = measurementSchemas; } + public void setMeasurementSchema(MeasurementSchema measurementSchema, int index) { + if (measurementSchemas == null || index >= measurementSchemas.length) { + measurementSchemas = + measurementSchemas == null + ? new MeasurementSchema[getRequiredColumnArrayLength(index)] + : Arrays.copyOf(measurementSchemas, getRequiredColumnArrayLength(index)); + } + measurementSchemas[index] = measurementSchema; + } + public boolean isAligned() { return isAligned; } @@ -131,10 +141,24 @@ public TSDataType[] getDataTypes() { return dataTypes; } + public TSDataType getDataType(int index) { + return dataTypes == null || index < 0 || index >= dataTypes.length ? null : dataTypes[index]; + } + public void setDataTypes(TSDataType[] dataTypes) { this.dataTypes = dataTypes; } + public void setDataType(TSDataType dataType, int index) { + if (dataTypes == null || index >= dataTypes.length) { + dataTypes = + dataTypes == null + ? new TSDataType[getRequiredColumnArrayLength(index)] + : Arrays.copyOf(dataTypes, getRequiredColumnArrayLength(index)); + } + dataTypes[index] = dataType; + } + /** Returns true when this statement is empty and no need to write into the server */ public abstract boolean isEmpty(); @@ -165,35 +189,46 @@ public void updateAfterSchemaValidation(MPPQueryContext context) throws QueryPro /** Check whether data types are matched with measurement schemas */ protected void selfCheckDataTypes(int index) throws DataTypeMismatchException, PathNotExistException { + final MeasurementSchema measurementSchema = + measurementSchemas != null && index >= 0 && index < measurementSchemas.length + ? measurementSchemas[index] + : null; + final TSDataType dataType = getDataType(index); + final String measurement = + measurements != null && index >= 0 && index < measurements.length + ? measurements[index] + : null; + final String fullPath = + measurement == null + ? devicePath.getFullPath() + : devicePath.concatNode(measurement).getFullPath(); if (IoTDBDescriptor.getInstance().getConfig().isEnablePartialInsert()) { // if enable partial insert, mark failed measurements with exception - if (measurementSchemas[index] == null) { - markFailedMeasurement( - index, - new PathNotExistException(devicePath.concatNode(measurements[index]).getFullPath())); - } else if ((dataTypes[index] != measurementSchemas[index].getType() - && !checkAndCastDataType(index, measurementSchemas[index].getType()))) { + if (measurementSchema == null) { + markFailedMeasurement(index, new PathNotExistException(fullPath)); + } else if ((dataType != measurementSchema.getType() + && !checkAndCastDataType(index, measurementSchema.getType()))) { markFailedMeasurement( index, new DataTypeMismatchException( devicePath.getFullPath(), - measurements[index], - dataTypes[index], - measurementSchemas[index].getType(), + measurement, + dataType, + measurementSchema.getType(), getMinTime(), getFirstValueOfIndex(index))); } } else { // if not enable partial insert, throw the exception directly - if (measurementSchemas[index] == null) { - throw new PathNotExistException(devicePath.concatNode(measurements[index]).getFullPath()); - } else if ((dataTypes[index] != measurementSchemas[index].getType() - && !checkAndCastDataType(index, measurementSchemas[index].getType()))) { + if (measurementSchema == null) { + throw new PathNotExistException(fullPath); + } else if ((dataType != measurementSchema.getType() + && !checkAndCastDataType(index, measurementSchema.getType()))) { throw new DataTypeMismatchException( devicePath.getFullPath(), - measurements[index], - dataTypes[index], - measurementSchemas[index].getType(), + measurement, + dataType, + measurementSchema.getType(), getMinTime(), getFirstValueOfIndex(index)); } @@ -208,8 +243,12 @@ protected void selfCheckDataTypes(int index) public void semanticCheck() { Set deduplicatedMeasurements = new HashSet<>(); - for (String measurement : measurements) { + for (int i = 0; measurements != null && i < measurements.length; i++) { + final String measurement = measurements[i]; if (measurement == null || measurement.isEmpty()) { + if (failedMeasurementIndex2Info != null && failedMeasurementIndex2Info.containsKey(i)) { + continue; + } throw new SemanticException( "Measurement contains null or empty string: " + Arrays.toString(measurements)); } @@ -241,14 +280,21 @@ public void removeAllFailedMeasurementMarks() { } public boolean hasValidMeasurements() { - for (Object o : measurements) { - if (o != null) { + for (int i = 0; measurements != null && i < measurements.length; i++) { + if (isColumnPresent(i)) { return true; } } return false; } + public boolean isColumnPresent(final int index) { + return measurements != null + && index >= 0 + && index < measurements.length + && measurements[index] != null; + } + public boolean hasFailedMeasurements() { return failedMeasurementIndex2Info != null && !failedMeasurementIndex2Info.isEmpty(); } @@ -357,6 +403,9 @@ protected Map>> getMapFromDeviceToMeasur Map>> mapFromDeviceToMeasurementAndIndex = new HashMap<>(); for (int i = 0; i < this.measurements.length; i++) { + if (!isColumnPresent(i)) { + continue; + } PartialPath targetDevicePath; String measurementName; if (isLogicalView[i]) { @@ -442,5 +491,9 @@ private long shallowSizeOfList(List list) { : 0L; } + private int getRequiredColumnArrayLength(final int index) { + return Math.max(measurements == null ? 0 : measurements.length, index + 1); + } + protected abstract long calculateBytesUsed(); } diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/statement/crud/InsertRowStatement.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/statement/crud/InsertRowStatement.java index faa46cadd9dd6..fe78b48fa2252 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/statement/crud/InsertRowStatement.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/statement/crud/InsertRowStatement.java @@ -88,8 +88,11 @@ public InsertRowStatement() { @Override public List getPaths() { List ret = new ArrayList<>(); - for (String m : measurements) { - PartialPath fullPath = devicePath.concatNode(m); + for (int i = 0; measurements != null && i < measurements.length; i++) { + if (!isColumnPresent(i)) { + continue; + } + PartialPath fullPath = devicePath.concatNode(measurements[i]); ret.add(fullPath); } return ret; @@ -121,7 +124,7 @@ public void setNeedInferType(boolean needInferType) { @Override public boolean isEmpty() { - return values.length == 0; + return values == null || values.length == 0; } public void fillValues(ByteBuffer buffer) throws QueryProcessException { @@ -181,11 +184,14 @@ public long getMinTime() { @Override public Object getFirstValueOfIndex(int index) { - return values[index]; + return values == null || index < 0 || index >= values.length ? null : values[index]; } @Override protected boolean checkAndCastDataType(int columnIndex, TSDataType dataType) { + if (dataTypes == null || columnIndex < 0 || columnIndex >= dataTypes.length) { + return false; + } if (CommonUtils.checkCanCastType(dataTypes[columnIndex], dataType)) { values[columnIndex] = CommonUtils.castValue(dataTypes[columnIndex], dataType, values[columnIndex]); @@ -202,7 +208,13 @@ protected boolean checkAndCastDataType(int columnIndex, TSDataType dataType) { @SuppressWarnings("squid:S3776") // Suppress high Cognitive Complexity warning public void transferType(ZoneId zoneId) throws QueryProcessException { + if (measurementSchemas == null) { + return; + } for (int i = 0; i < measurementSchemas.length; i++) { + if (!isColumnPresent(i) || dataTypes == null || i >= dataTypes.length) { + continue; + } // null when time series doesn't exist if (measurementSchemas[i] == null) { if (!IoTDBDescriptor.getInstance().getConfig().isEnablePartialInsert()) { @@ -220,6 +232,9 @@ public void transferType(ZoneId zoneId) throws QueryProcessException { } // parse string value to specific type dataTypes[i] = measurementSchemas[i].getType(); + if (values == null || i >= values.length || values[i] == null) { + continue; + } try { values[i] = CommonUtils.parseValue(dataTypes[i], values[i].toString(), zoneId); } catch (Exception e) { @@ -243,7 +258,10 @@ public void transferType(ZoneId zoneId) throws QueryProcessException { @Override public void markFailedMeasurement(int index, Exception cause) { - if (measurements[index] == null) { + if (measurements == null + || index < 0 + || index >= measurements.length + || measurements[index] == null) { return; } @@ -253,12 +271,19 @@ public void markFailedMeasurement(int index, Exception cause) { InsertBaseStatement.FailedMeasurementInfo failedMeasurementInfo = new InsertBaseStatement.FailedMeasurementInfo( - measurements[index], dataTypes[index], values[index], cause); + measurements[index], + getDataType(index), + values != null && index < values.length ? values[index] : null, + cause); failedMeasurementIndex2Info.putIfAbsent(index, failedMeasurementInfo); measurements[index] = null; - dataTypes[index] = null; - values[index] = null; + if (dataTypes != null && index < dataTypes.length) { + dataTypes[index] = null; + } + if (values != null && index < values.length) { + values[index] = null; + } } @Override @@ -268,15 +293,15 @@ public void removeAllFailedMeasurementMarks() { } failedMeasurementIndex2Info.forEach( (index, info) -> { - if (measurements != null) { + if (measurements != null && index < measurements.length) { measurements[index] = info.getMeasurement(); } - if (dataTypes != null) { + if (dataTypes != null && index < dataTypes.length) { dataTypes[index] = info.getDataType(); } - if (values != null) { + if (values != null && index < values.length) { values[index] = info.getValue(); } }); @@ -290,7 +315,7 @@ public Map getFailedMeasurementInfoMap() { @Override public void semanticCheck() { super.semanticCheck(); - if (measurements.length != values.length) { + if (measurements != null && values != null && measurements.length != values.length) { throw new SemanticException( String.format( "the measurementList's size %d is not consistent with the valueList's size %d", @@ -376,12 +401,18 @@ public void updateAfterSchemaValidation(MPPQueryContext context) throws QueryPro @Override public TSDataType getDataType(int index) { if (isNeedInferType) { - return TypeInferenceUtils.getPredictedDataType(values[index], true); + return TypeInferenceUtils.getPredictedDataType( + values != null && index >= 0 && index < values.length ? values[index] : null, true); } else { - return dataTypes[index]; + return super.getDataType(index); } } + @Override + public boolean isColumnPresent(final int index) { + return super.isColumnPresent(index) && values != null && index < values.length; + } + @Override public TSEncoding getEncoding(int index) { return null; diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/statement/crud/InsertRowsOfOneDeviceStatement.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/statement/crud/InsertRowsOfOneDeviceStatement.java index 5013a13915c17..ef42489e87530 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/statement/crud/InsertRowsOfOneDeviceStatement.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/statement/crud/InsertRowsOfOneDeviceStatement.java @@ -75,7 +75,7 @@ public void setInsertRowStatementList(List insertRowStatemen for (InsertRowStatement insertRowStatement : insertRowStatementList) { String[] measurements = insertRowStatement.getMeasurements(); for (String measurement : measurements) { - if (!measurementSet.contains(measurement)) { + if (measurement != null && !measurementSet.contains(measurement)) { measurementList.add(measurement); measurementSet.add(measurement); } @@ -102,6 +102,9 @@ public R accept(StatementVisitor visitor, C context) { public List getPaths() { List ret = new ArrayList<>(); for (String m : measurements) { + if (m == null) { + continue; + } PartialPath fullPath = devicePath.concatNode(m); ret.add(fullPath); } diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/statement/crud/InsertTabletStatement.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/statement/crud/InsertTabletStatement.java index b5672cbbb4fc5..354856e2a2401 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/statement/crud/InsertTabletStatement.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/statement/crud/InsertTabletStatement.java @@ -181,9 +181,13 @@ public void setTimes(long[] times) { @Override public boolean isEmpty() { return rowCount == 0 + || times == null || times.length == 0 + || measurements == null || measurements.length == 0 + || dataTypes == null || dataTypes.length == 0 + || columns == null || columns.length == 0; } @@ -211,8 +215,11 @@ public R accept(StatementVisitor visitor, C context) { @Override public List getPaths() { List ret = new ArrayList<>(); - for (String m : measurements) { - PartialPath fullPath = devicePath.concatNode(m); + for (int i = 0; measurements != null && i < measurements.length; i++) { + if (!isColumnPresent(i)) { + continue; + } + PartialPath fullPath = devicePath.concatNode(measurements[i]); ret.add(fullPath); } return ret; @@ -230,6 +237,9 @@ public List getSchemaValidationList() { @Override protected boolean checkAndCastDataType(int columnIndex, TSDataType dataType) { + if (dataTypes == null || columnIndex < 0 || columnIndex >= dataTypes.length) { + return false; + } if (CommonUtils.checkCanCastType(dataTypes[columnIndex], dataType)) { columns[columnIndex] = CommonUtils.castArray(dataTypes[columnIndex], dataType, columns[columnIndex]); @@ -241,7 +251,10 @@ protected boolean checkAndCastDataType(int columnIndex, TSDataType dataType) { @Override public void markFailedMeasurement(int index, Exception cause) { - if (measurements[index] == null) { + if (measurements == null + || index < 0 + || index >= measurements.length + || measurements[index] == null) { return; } @@ -251,12 +264,19 @@ public void markFailedMeasurement(int index, Exception cause) { InsertBaseStatement.FailedMeasurementInfo failedMeasurementInfo = new InsertBaseStatement.FailedMeasurementInfo( - measurements[index], dataTypes[index], columns[index], cause); + measurements[index], + getDataType(index), + columns != null && index < columns.length ? columns[index] : null, + cause); failedMeasurementIndex2Info.putIfAbsent(index, failedMeasurementInfo); measurements[index] = null; - dataTypes[index] = null; - columns[index] = null; + if (dataTypes != null && index < dataTypes.length) { + dataTypes[index] = null; + } + if (columns != null && index < columns.length) { + columns[index] = null; + } } @Override @@ -266,9 +286,15 @@ public void removeAllFailedMeasurementMarks() { } failedMeasurementIndex2Info.forEach( (index, info) -> { - measurements[index] = info.getMeasurement(); - dataTypes[index] = info.getDataType(); - columns[index] = info.getValue(); + if (measurements != null && index < measurements.length) { + measurements[index] = info.getMeasurement(); + } + if (dataTypes != null && index < dataTypes.length) { + dataTypes[index] = info.getDataType(); + } + if (columns != null && index < columns.length) { + columns[index] = info.getValue(); + } }); failedMeasurementIndex2Info.clear(); } @@ -276,7 +302,7 @@ public void removeAllFailedMeasurementMarks() { @Override public void semanticCheck() { super.semanticCheck(); - if (measurements.length != columns.length) { + if (measurements != null && columns != null && measurements.length != columns.length) { throw new SemanticException( String.format( "the measurementList's size %d is not consistent with the columnList's size %d", @@ -356,6 +382,15 @@ public long getMinTime() { @Override public Object getFirstValueOfIndex(int index) { + if (dataTypes == null + || columns == null + || index < 0 + || index >= dataTypes.length + || index >= columns.length + || dataTypes[index] == null + || columns[index] == null) { + return null; + } Object value; switch (dataTypes[index]) { case INT32: @@ -395,7 +430,12 @@ public Object getFirstValueOfIndex(int index) { @Override public TSDataType getDataType(int index) { - return dataTypes[index]; + return super.getDataType(index); + } + + @Override + public boolean isColumnPresent(final int index) { + return super.isColumnPresent(index) && columns != null && index < columns.length; } @Override diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/memtable/AbstractMemTable.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/memtable/AbstractMemTable.java index ab95fc57662aa..1b374ae40c5ec 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/memtable/AbstractMemTable.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/memtable/AbstractMemTable.java @@ -172,7 +172,7 @@ private IWritableMemChunkGroup createAlignedMemChunkGroupIfNotExistAndGet( memTableMap.computeIfAbsent( deviceId, k -> { - seriesNumber += schemaList.size(); + seriesNumber += schemaList.stream().filter(Objects::nonNull).count(); return new AlignedWritableMemChunkGroup( schemaList.stream().filter(Objects::nonNull).collect(Collectors.toList())); }); @@ -192,28 +192,27 @@ public int insert(InsertRowNode insertRowNode) { List schemaList = new ArrayList<>(); List dataTypes = new ArrayList<>(); - int nullPointsNumber = 0; - for (int i = 0; i < insertRowNode.getMeasurements().length; i++) { + int pointsInserted = 0; + for (int i = 0; measurements != null && i < measurements.length; i++) { // Use measurements[i] to ignore failed partial insert - if (measurements[i] == null || values[i] == null) { - if (values[i] == null) { - nullPointsNumber++; - } + if (measurements[i] == null + || values == null + || i >= values.length + || values[i] == null + || insertRowNode.getMeasurementSchemas() == null + || i >= insertRowNode.getMeasurementSchemas().length + || insertRowNode.getMeasurementSchemas()[i] == null) { schemaList.add(null); } else { IMeasurementSchema schema = insertRowNode.getMeasurementSchemas()[i]; schemaList.add(schema); dataTypes.add(schema.getType()); + pointsInserted++; } } memSize += MemUtils.getRowRecordSize(dataTypes, values); write(insertRowNode.getDeviceID(), schemaList, insertRowNode.getTime(), values); - int pointsInserted = - insertRowNode.getMeasurements().length - - insertRowNode.getFailedMeasurementNumber() - - nullPointsNumber; - totalPointsNum += pointsInserted; return pointsInserted; } @@ -225,23 +224,29 @@ public int insertAlignedRow(InsertRowNode insertRowNode) { Object[] values = insertRowNode.getValues(); List schemaList = new ArrayList<>(); List dataTypes = new ArrayList<>(); - for (int i = 0; i < insertRowNode.getMeasurements().length; i++) { + int pointsInserted = 0; + for (int i = 0; measurements != null && i < measurements.length; i++) { // Use measurements[i] to ignore failed partial insert - if (measurements[i] == null || values[i] == null) { + if (measurements[i] == null + || values == null + || i >= values.length + || values[i] == null + || insertRowNode.getMeasurementSchemas() == null + || i >= insertRowNode.getMeasurementSchemas().length + || insertRowNode.getMeasurementSchemas()[i] == null) { schemaList.add(null); continue; } IMeasurementSchema schema = insertRowNode.getMeasurementSchemas()[i]; schemaList.add(schema); dataTypes.add(schema.getType()); + pointsInserted++; } if (schemaList.isEmpty()) { return 0; } memSize += MemUtils.getAlignedRowRecordSize(dataTypes, values); writeAlignedRow(insertRowNode.getDeviceID(), schemaList, insertRowNode.getTime(), values); - int pointsInserted = - insertRowNode.getMeasurements().length - insertRowNode.getFailedMeasurementNumber(); totalPointsNum += pointsInserted; return pointsInserted; } @@ -252,9 +257,7 @@ public int insertTablet(InsertTabletNode insertTabletNode, int start, int end) try { writeTabletNode(insertTabletNode, start, end); memSize += MemUtils.getTabletSize(insertTabletNode, start, end); - int pointsInserted = - (insertTabletNode.getDataTypes().length - insertTabletNode.getFailedMeasurementNumber()) - * (end - start); + int pointsInserted = countWritableMeasurements(insertTabletNode) * (end - start); totalPointsNum += pointsInserted; return pointsInserted; } catch (RuntimeException e) { @@ -268,9 +271,7 @@ public int insertAlignedTablet(InsertTabletNode insertTabletNode, int start, int try { writeAlignedTablet(insertTabletNode, start, end); memSize += MemUtils.getAlignedTabletSize(insertTabletNode, start, end); - int pointsInserted = - (insertTabletNode.getDataTypes().length - insertTabletNode.getFailedMeasurementNumber()) - * (end - start); + int pointsInserted = countWritableMeasurements(insertTabletNode) * (end - start); totalPointsNum += pointsInserted; return pointsInserted; } catch (RuntimeException e) { @@ -333,8 +334,10 @@ public void writeAlignedRow( public void writeTabletNode(InsertTabletNode insertTabletNode, int start, int end) { List schemaList = new ArrayList<>(); - for (int i = 0; i < insertTabletNode.getMeasurementSchemas().length; i++) { - if (insertTabletNode.getColumns()[i] == null) { + for (int i = 0; + insertTabletNode.getMeasurements() != null && i < insertTabletNode.getMeasurements().length; + i++) { + if (!isWritableMeasurement(insertTabletNode, i)) { schemaList.add(null); } else { schemaList.add(insertTabletNode.getMeasurementSchemas()[i]); @@ -354,8 +357,10 @@ public void writeTabletNode(InsertTabletNode insertTabletNode, int start, int en public void writeAlignedTablet(InsertTabletNode insertTabletNode, int start, int end) { List schemaList = new ArrayList<>(); - for (int i = 0; i < insertTabletNode.getMeasurementSchemas().length; i++) { - if (insertTabletNode.getColumns()[i] == null) { + for (int i = 0; + insertTabletNode.getMeasurements() != null && i < insertTabletNode.getMeasurements().length; + i++) { + if (!isWritableMeasurement(insertTabletNode, i)) { schemaList.add(null); } else { schemaList.add(insertTabletNode.getMeasurementSchemas()[i]); @@ -375,6 +380,31 @@ public void writeAlignedTablet(InsertTabletNode insertTabletNode, int start, int end); } + private int countWritableMeasurements(InsertTabletNode insertTabletNode) { + int count = 0; + for (int i = 0; + insertTabletNode.getMeasurements() != null && i < insertTabletNode.getMeasurements().length; + i++) { + if (isWritableMeasurement(insertTabletNode, i)) { + count++; + } + } + return count; + } + + private boolean isWritableMeasurement(InsertTabletNode insertTabletNode, int index) { + return insertTabletNode.getMeasurements()[index] != null + && insertTabletNode.getMeasurementSchemas() != null + && index < insertTabletNode.getMeasurementSchemas().length + && insertTabletNode.getMeasurementSchemas()[index] != null + && insertTabletNode.getDataTypes() != null + && index < insertTabletNode.getDataTypes().length + && insertTabletNode.getDataTypes()[index] != null + && insertTabletNode.getColumns() != null + && index < insertTabletNode.getColumns().length + && insertTabletNode.getColumns()[index] != null; + } + @Override public boolean checkIfChunkDoesNotExist(IDeviceID deviceId, String measurement) { IWritableMemChunkGroup memChunkGroup = memTableMap.get(deviceId); diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/memtable/AbstractWritableMemChunk.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/memtable/AbstractWritableMemChunk.java index 6c773942fb72b..d91945f993f8f 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/memtable/AbstractWritableMemChunk.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/memtable/AbstractWritableMemChunk.java @@ -35,6 +35,7 @@ import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import java.nio.ByteBuffer; import java.util.Iterator; import java.util.List; import java.util.concurrent.BlockingQueue; @@ -272,6 +273,16 @@ public abstract void encode( @Override public abstract int serializedSize(); + protected static byte[] serializeSchemaToWALBytes(IMeasurementSchema schema) { + byte[] bytes = new byte[schema.serializedSize()]; + schema.serializeTo(ByteBuffer.wrap(bytes)); + return bytes; + } + + protected static int getSerializedSchemaSize(IMeasurementSchema schema) { + return schema.serializedSize(); + } + @Override public synchronized TVList initWorkingListForFlushIfNecessary( TVList workingList, boolean needCloneTimesAndIndicesInWorkingTVList) { diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/memtable/AlignedWritableMemChunk.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/memtable/AlignedWritableMemChunk.java index 06bd5404be0f1..b0c04d08e842d 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/memtable/AlignedWritableMemChunk.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/memtable/AlignedWritableMemChunk.java @@ -43,7 +43,6 @@ import java.io.DataInputStream; import java.io.IOException; -import java.nio.ByteBuffer; import java.util.ArrayList; import java.util.Arrays; import java.util.LinkedHashMap; @@ -642,7 +641,7 @@ public int serializedSize() { int size = 0; size += Integer.BYTES; for (IMeasurementSchema schema : schemaList) { - size += schema.serializedSize(); + size += getSerializedSchemaSize(schema); } size += Integer.BYTES; for (AlignedTVList alignedTvList : sortedList) { @@ -656,9 +655,7 @@ public int serializedSize() { public void serializeToWAL(IWALByteBufferView buffer) { WALWriteUtils.write(schemaList.size(), buffer); for (IMeasurementSchema schema : schemaList) { - byte[] bytes = new byte[schema.serializedSize()]; - schema.serializeTo(ByteBuffer.wrap(bytes)); - buffer.put(bytes); + buffer.put(serializeSchemaToWALBytes(schema)); } buffer.putInt(sortedList.size()); for (AlignedTVList alignedTvList : sortedList) { diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/memtable/TsFileProcessor.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/memtable/TsFileProcessor.java index 6bbcb427f8798..7a268860ef30c 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/memtable/TsFileProcessor.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/memtable/TsFileProcessor.java @@ -587,9 +587,9 @@ private long[] checkMemCostAndAddToTspInfoForRow( long textDataIncrement = 0L; long chunkMetadataIncrement = 0L; - for (int i = 0; i < dataTypes.length; i++) { + for (int i = 0; dataTypes != null && i < dataTypes.length; i++) { // Skip failed Measurements - if (dataTypes[i] == null || measurements[i] == null) { + if (!isWritableMeasurement(measurements, dataTypes, values, i)) { continue; } IWritableMemChunk memChunk = workMemTable.getWritableMemChunk(deviceId, measurements[i]); @@ -605,7 +605,7 @@ private long[] checkMemCostAndAddToTspInfoForRow( } } // TEXT data mem size - if (dataTypes[i].isBinary() && values[i] != null) { + if (dataTypes[i].isBinary()) { textDataIncrement += MemUtils.getBinarySize((Binary) values[i]); } } @@ -627,9 +627,9 @@ private long[] checkMemCostAndAddToTspInfoForRows(List insertRowN TSDataType[] dataTypes = insertRowNode.getDataTypes(); Object[] values = insertRowNode.getValues(); String[] measurements = insertRowNode.getMeasurements(); - for (int i = 0; i < dataTypes.length; i++) { + for (int i = 0; dataTypes != null && i < dataTypes.length; i++) { // Skip failed Measurements - if (dataTypes[i] == null || measurements[i] == null) { + if (!isWritableMeasurement(measurements, dataTypes, values, i)) { continue; } IWritableMemChunk memChunk = workMemTable.getWritableMemChunk(deviceId, measurements[i]); @@ -658,7 +658,7 @@ private long[] checkMemCostAndAddToTspInfoForRows(List insertRowN increasingMemTableInfo.get(deviceId).computeIfPresent(measurements[i], (k, v) -> v + 1); } // TEXT data mem size - if (dataTypes[i].isBinary() && values[i] != null) { + if (dataTypes[i].isBinary()) { textDataIncrement += MemUtils.getBinarySize((Binary) values[i]); } } @@ -683,15 +683,17 @@ private long[] checkAlignedMemCostAndAddToTspInfoForRow( // ChunkMetadataIncrement chunkMetadataIncrement += ChunkMetadata.calculateRamSize(AlignedPath.VECTOR_PLACEHOLDER, TSDataType.VECTOR) - * dataTypes.length; - memTableIncrement += AlignedTVList.alignedTvListArrayMemCost(dataTypes); - for (int i = 0; i < dataTypes.length; i++) { + * countWritableMeasurements(measurements, dataTypes, values); + memTableIncrement += + AlignedTVList.alignedTvListArrayMemCost( + getWritableDataTypes(measurements, dataTypes, values)); + for (int i = 0; dataTypes != null && i < dataTypes.length; i++) { // Skip failed Measurements - if (dataTypes[i] == null || measurements[i] == null) { + if (!isWritableMeasurement(measurements, dataTypes, values, i)) { continue; } // TEXT data mem size - if (dataTypes[i].isBinary() && values[i] != null) { + if (dataTypes[i].isBinary()) { textDataIncrement += MemUtils.getBinarySize((Binary) values[i]); } } @@ -699,9 +701,9 @@ private long[] checkAlignedMemCostAndAddToTspInfoForRow( // For existed device of this mem table AlignedWritableMemChunk alignedMemChunk = (AlignedWritableMemChunk) memChunk; List dataTypesInTVList = new ArrayList<>(); - for (int i = 0; i < dataTypes.length; i++) { + for (int i = 0; dataTypes != null && i < dataTypes.length; i++) { // Skip failed Measurements - if (dataTypes[i] == null || measurements[i] == null) { + if (!isWritableMeasurement(measurements, dataTypes, values, i)) { continue; } @@ -713,7 +715,7 @@ private long[] checkAlignedMemCostAndAddToTspInfoForRow( dataTypesInTVList.add(dataTypes[i]); } // TEXT data mem size - if (dataTypes[i].isBinary() && values[i] != null) { + if (dataTypes[i].isBinary()) { textDataIncrement += MemUtils.getBinarySize((Binary) values[i]); } } @@ -749,11 +751,13 @@ private long[] checkAlignedMemCostAndAddToTspInfoForRows(List ins // ChunkMetadataIncrement chunkMetadataIncrement += ChunkMetadata.calculateRamSize(AlignedPath.VECTOR_PLACEHOLDER, TSDataType.VECTOR) - * dataTypes.length; - memTableIncrement += AlignedTVList.alignedTvListArrayMemCost(dataTypes); - for (int i = 0; i < dataTypes.length; i++) { + * countWritableMeasurements(measurements, dataTypes, values); + memTableIncrement += + AlignedTVList.alignedTvListArrayMemCost( + getWritableDataTypes(measurements, dataTypes, values)); + for (int i = 0; dataTypes != null && i < dataTypes.length; i++) { // Skip failed Measurements - if (dataTypes[i] == null || measurements[i] == null) { + if (!isWritableMeasurement(measurements, dataTypes, values, i)) { continue; } increasingMemTableInfo @@ -761,7 +765,7 @@ private long[] checkAlignedMemCostAndAddToTspInfoForRows(List ins .left .put(measurements[i], dataTypes[i]); // TEXT data mem size - if (dataTypes[i].isBinary() && values[i] != null) { + if (dataTypes[i].isBinary()) { textDataIncrement += MemUtils.getBinarySize((Binary) values[i]); } } @@ -773,9 +777,9 @@ private long[] checkAlignedMemCostAndAddToTspInfoForRows(List ins List dataTypesInTVList = new ArrayList<>(); Pair, Integer> addingPointNumInfo = increasingMemTableInfo.computeIfAbsent(deviceId, k -> new Pair<>(new HashMap<>(), 0)); - for (int i = 0; i < dataTypes.length; i++) { + for (int i = 0; dataTypes != null && i < dataTypes.length; i++) { // Skip failed Measurements - if (dataTypes[i] == null || measurements[i] == null) { + if (!isWritableMeasurement(measurements, dataTypes, values, i)) { continue; } @@ -797,7 +801,7 @@ private long[] checkAlignedMemCostAndAddToTspInfoForRows(List ins addingPointNumInfo.left.put(measurements[i], dataTypes[i]); } // TEXT data mem size - if (dataTypes[i].isBinary() && values[i] != null) { + if (dataTypes[i].isBinary()) { textDataIncrement += MemUtils.getBinarySize((Binary) values[i]); } } @@ -834,9 +838,9 @@ private long[] checkMemCostAndAddToTspInfoForTablet( } long[] memIncrements = new long[3]; // memTable, text, chunk metadata - for (int i = 0; i < dataTypes.length; i++) { + for (int i = 0; dataTypes != null && i < dataTypes.length; i++) { // Skip failed Measurements - if (dataTypes[i] == null || columns[i] == null || measurements[i] == null) { + if (!isWritableMeasurement(measurements, dataTypes, columns, i)) { continue; } updateMemCost(dataTypes[i], measurements[i], deviceId, start, end, memIncrements, columns[i]); @@ -922,16 +926,15 @@ private void updateAlignedMemCost( if (memChunk == null) { // ChunkMetadataIncrement memIncrements[2] += - dataTypes.length + countWritableMeasurements(measurementIds, dataTypes, columns) * ChunkMetadata.calculateRamSize(AlignedPath.VECTOR_PLACEHOLDER, TSDataType.VECTOR); memIncrements[0] += ((end - start) / PrimitiveArrayManager.ARRAY_SIZE + 1) - * AlignedTVList.alignedTvListArrayMemCost(dataTypes); - for (int i = 0; i < dataTypes.length; i++) { + * AlignedTVList.alignedTvListArrayMemCost( + getWritableDataTypes(measurementIds, dataTypes, columns)); + for (int i = 0; dataTypes != null && i < dataTypes.length; i++) { TSDataType dataType = dataTypes[i]; - String measurement = measurementIds[i]; - Object column = columns[i]; - if (dataType == null || column == null || measurement == null) { + if (!isWritableMeasurement(measurementIds, dataTypes, columns, i)) { continue; } // TEXT data size @@ -944,15 +947,14 @@ private void updateAlignedMemCost( } else { AlignedWritableMemChunk alignedMemChunk = (AlignedWritableMemChunk) memChunk; List dataTypesInTVList = new ArrayList<>(); - for (int i = 0; i < dataTypes.length; i++) { + for (int i = 0; dataTypes != null && i < dataTypes.length; i++) { TSDataType dataType = dataTypes[i]; - String measurement = measurementIds[i]; - Object column = columns[i]; - if (dataType == null || column == null || measurement == null) { + if (!isWritableMeasurement(measurementIds, dataTypes, columns, i)) { continue; } + String measurement = measurementIds[i]; // Extending the column of aligned mem chunk - if (!alignedMemChunk.containsMeasurement(measurementIds[i])) { + if (!alignedMemChunk.containsMeasurement(measurement)) { memIncrements[0] += (alignedMemChunk.alignedListSize() / PrimitiveArrayManager.ARRAY_SIZE + 1) * AlignedTVList.valueListArrayMemCost(dataType); @@ -984,6 +986,42 @@ private void updateAlignedMemCost( } } + private static boolean isWritableMeasurement( + String[] measurements, TSDataType[] dataTypes, Object[] valuesOrColumns, int index) { + return measurements != null + && index >= 0 + && index < measurements.length + && measurements[index] != null + && dataTypes != null + && index < dataTypes.length + && dataTypes[index] != null + && valuesOrColumns != null + && index < valuesOrColumns.length + && valuesOrColumns[index] != null; + } + + private static int countWritableMeasurements( + String[] measurements, TSDataType[] dataTypes, Object[] columns) { + int count = 0; + for (int i = 0; dataTypes != null && i < dataTypes.length; i++) { + if (isWritableMeasurement(measurements, dataTypes, columns, i)) { + count++; + } + } + return count; + } + + private static TSDataType[] getWritableDataTypes( + String[] measurements, TSDataType[] dataTypes, Object[] columns) { + List writableDataTypes = new ArrayList<>(); + for (int i = 0; dataTypes != null && i < dataTypes.length; i++) { + if (isWritableMeasurement(measurements, dataTypes, columns, i)) { + writableDataTypes.add(dataTypes[i]); + } + } + return writableDataTypes.toArray(new TSDataType[0]); + } + private void updateMemoryInfo( long memTableIncrement, long chunkMetadataIncrement, long textDataIncrement) throws WriteProcessRejectException { diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/memtable/WritableMemChunk.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/memtable/WritableMemChunk.java index 90e96597b0cd2..544819c03915c 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/memtable/WritableMemChunk.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/memtable/WritableMemChunk.java @@ -43,7 +43,6 @@ import java.io.DataInputStream; import java.io.IOException; -import java.nio.ByteBuffer; import java.util.ArrayList; import java.util.List; import java.util.Optional; @@ -505,7 +504,7 @@ public void release() { @Override public int serializedSize() { - int serializedSize = schema.serializedSize() + list.serializedSize(); + int serializedSize = getSerializedSchemaSize(schema) + list.serializedSize(); serializedSize += Integer.BYTES; for (TVList tvList : sortedList) { serializedSize += tvList.serializedSize(); @@ -515,9 +514,7 @@ public int serializedSize() { @Override public void serializeToWAL(IWALByteBufferView buffer) { - byte[] bytes = new byte[schema.serializedSize()]; - schema.serializeTo(ByteBuffer.wrap(bytes)); - buffer.put(bytes); + buffer.put(serializeSchemaToWALBytes(schema)); buffer.putInt(sortedList.size()); for (TVList tvList : sortedList) { tvList.serializeToWAL(buffer); diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/trigger/executor/TriggerFireVisitor.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/trigger/executor/TriggerFireVisitor.java index 18c016bb3190a..46844c160afaf 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/trigger/executor/TriggerFireVisitor.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/trigger/executor/TriggerFireVisitor.java @@ -153,7 +153,8 @@ public TriggerFireResult visitInsertTablet(InsertTabletNode node, TriggerEvent c boolean hasFailedTrigger = false; for (Map.Entry> entry : triggerNameToMeasurementList.entrySet()) { Tablet tablet; - if (entry.getValue().size() == measurementSchemas.length) { + if (entry.getValue().size() == measurementSchemas.length + && measurementToSchemaIndexMap.size() == measurementSchemas.length) { // all measurements are included tablet = new Tablet( @@ -261,8 +262,8 @@ private Map constructMeasurementToSchemaIndexMap( // The index of measurement and schema is the same now. // However, in case one day the order changes, we need to construct an index map. Map indexMap = new HashMap<>(); - for (int i = 0, n = measurements.length; i < n; i++) { - if (measurements[i] == null) { + for (int i = 0, n = measurements == null ? 0 : measurements.length; i < n; i++) { + if (measurements[i] == null || schemas == null || i >= schemas.length) { continue; } // It is the same now diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/utils/MemUtils.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/utils/MemUtils.java index 961a027e10372..74498eab0911a 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/utils/MemUtils.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/utils/MemUtils.java @@ -63,14 +63,16 @@ public static long getRecordSize(TSDataType dataType, Object value, boolean addi * memory before insertion */ public static long getRowRecordSize(List dataTypes, Object[] value) { - int emptyRecordCount = 0; + if (dataTypes == null) { + return 0L; + } + int dataTypeIndex = 0; long memSize = 0L; - for (int i = 0; i < value.length; i++) { + for (int i = 0; value != null && i < value.length && dataTypeIndex < dataTypes.size(); i++) { if (value[i] == null) { - emptyRecordCount++; continue; } - memSize += getRecordSize(dataTypes.get(i - emptyRecordCount), value[i], false); + memSize += getRecordSize(dataTypes.get(dataTypeIndex++), value[i], false); } return memSize; } @@ -80,13 +82,20 @@ public static long getRowRecordSize(List dataTypes, Object[] value) * memory before insertion */ public static long getAlignedRowRecordSize(List dataTypes, Object[] value) { + if (dataTypes == null) { + return 8L + 4L; + } // time and index size long memSize = 8L + 4L; - for (int i = 0; i < dataTypes.size(); i++) { - if (value[i] == null || dataTypes.get(i).isBinary()) { + int dataTypeIndex = 0; + for (int i = 0; value != null && i < value.length && dataTypeIndex < dataTypes.size(); i++) { + if (value[i] == null) { continue; } - memSize += dataTypes.get(i).getDataTypeSize(); + TSDataType dataType = dataTypes.get(dataTypeIndex++); + if (!dataType.isBinary()) { + memSize += dataType.getDataTypeSize(); + } } return memSize; } @@ -99,7 +108,9 @@ public static long getBinaryColumnSize(Binary[] column, int start, int end) { long memSize = 0; memSize += (long) (end - start) * RamUsageEstimator.NUM_BYTES_OBJECT_HEADER; for (int i = start; i < end; i++) { - memSize += RamUsageEstimator.sizeOf(column[i].getValues()); + if (column[i] != null) { + memSize += RamUsageEstimator.sizeOf(column[i].getValues()); + } } return memSize; } @@ -113,13 +124,22 @@ public static long getTabletSize(InsertTabletNode insertTabletNode, int start, i return 0L; } long memSize = 0; - for (int i = 0; i < insertTabletNode.getMeasurements().length; i++) { - if (insertTabletNode.getMeasurements()[i] == null) { + String[] measurements = insertTabletNode.getMeasurements(); + TSDataType[] dataTypes = insertTabletNode.getDataTypes(); + Object[] columns = insertTabletNode.getColumns(); + for (int i = 0; measurements != null && i < measurements.length; i++) { + if (measurements[i] == null + || dataTypes == null + || i >= dataTypes.length + || dataTypes[i] == null + || columns == null + || i >= columns.length + || columns[i] == null) { continue; } // Time column memSize memSize += (end - start) * 8L; - memSize += (long) (end - start) * insertTabletNode.getDataTypes()[i].getDataTypeSize(); + memSize += (long) (end - start) * dataTypes[i].getDataTypeSize(); } return memSize; } @@ -129,11 +149,20 @@ public static long getAlignedTabletSize(InsertTabletNode insertTabletNode, int s return 0L; } long memSize = 0; - for (int i = 0; i < insertTabletNode.getMeasurements().length; i++) { - if (insertTabletNode.getMeasurements()[i] == null) { + String[] measurements = insertTabletNode.getMeasurements(); + TSDataType[] dataTypes = insertTabletNode.getDataTypes(); + Object[] columns = insertTabletNode.getColumns(); + for (int i = 0; measurements != null && i < measurements.length; i++) { + if (measurements[i] == null + || dataTypes == null + || i >= dataTypes.length + || dataTypes[i] == null + || columns == null + || i >= columns.length + || columns[i] == null) { continue; } - memSize += (long) (end - start) * insertTabletNode.getDataTypes()[i].getDataTypeSize(); + memSize += (long) (end - start) * dataTypes[i].getDataTypeSize(); } // time and index column memSize for vector memSize += (end - start) * (8L + 4L); diff --git a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/pipe/receiver/transform/statement/PipeConvertedInsertRowStatementTest.java b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/pipe/receiver/transform/statement/PipeConvertedInsertRowStatementTest.java new file mode 100644 index 0000000000000..8c563a84b1db4 --- /dev/null +++ b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/pipe/receiver/transform/statement/PipeConvertedInsertRowStatementTest.java @@ -0,0 +1,55 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.iotdb.db.pipe.receiver.transform.statement; + +import org.apache.iotdb.commons.path.PartialPath; +import org.apache.iotdb.db.queryengine.plan.statement.crud.InsertRowStatement; + +import org.apache.tsfile.enums.TSDataType; +import org.apache.tsfile.write.schema.MeasurementSchema; +import org.junit.Assert; +import org.junit.Test; + +import java.time.ZoneId; + +public class PipeConvertedInsertRowStatementTest { + + @Test + public void testTransferTypeKeepsNullValue() throws Exception { + final InsertRowStatement statement = new InsertRowStatement(); + statement.setDevicePath(new PartialPath("root.sg.d1")); + statement.setMeasurements(new String[] {"s0"}); + statement.setMeasurementSchemas( + new MeasurementSchema[] {new MeasurementSchema("s0", TSDataType.INT32)}); + statement.setDataTypes(new TSDataType[] {null}); + statement.setTime(1L); + statement.setValues(new Object[] {null}); + statement.setNeedInferType(true); + + final PipeConvertedInsertRowStatement convertedStatement = + new PipeConvertedInsertRowStatement(statement); + convertedStatement.transferType(ZoneId.systemDefault()); + + Assert.assertEquals(TSDataType.INT32, convertedStatement.getDataTypes()[0]); + Assert.assertEquals("s0", convertedStatement.getMeasurements()[0]); + Assert.assertNull(convertedStatement.getValues()[0]); + Assert.assertFalse(convertedStatement.isNeedInferType()); + } +} diff --git a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/pipe/resource/memory/InsertNodeMemoryEstimatorTest.java b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/pipe/resource/memory/InsertNodeMemoryEstimatorTest.java index 58b2756b16f6b..22f2f95fd65ae 100644 --- a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/pipe/resource/memory/InsertNodeMemoryEstimatorTest.java +++ b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/pipe/resource/memory/InsertNodeMemoryEstimatorTest.java @@ -134,6 +134,18 @@ public void testInsertMultiTabletsNodeLaterTabletSizeIsEstimated() throws Illega Assert.assertTrue(largerNodeSize > baselineSize); } + @Test + public void testInsertTabletNodeWithNullColumnIsEstimated() throws IllegalPathException { + InsertTabletNode tablet = createTextInsertTabletNode("tablet", "root.sg.d1", 2, 4, 8); + + long fullSize = InsertNodeMemoryEstimator.sizeOf(tablet); + tablet.getColumns()[1] = null; + long sizeWithNullColumn = InsertNodeMemoryEstimator.sizeOf(tablet); + + Assert.assertTrue(sizeWithNullColumn > 0); + Assert.assertTrue(sizeWithNullColumn < fullSize); + } + @Test public void testPlanNodeIdIsEstimated() throws IllegalPathException { InsertRowNode shortPlanNodeIdRow = diff --git a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/queryengine/plan/planner/node/write/InsertRowNodeSerdeTest.java b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/queryengine/plan/planner/node/write/InsertRowNodeSerdeTest.java index a4e19d9717629..269d3c514f7f9 100644 --- a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/queryengine/plan/planner/node/write/InsertRowNodeSerdeTest.java +++ b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/queryengine/plan/planner/node/write/InsertRowNodeSerdeTest.java @@ -98,6 +98,111 @@ public void TestSerializeAndDeserializeForWAL() throws IllegalPathException, IOE Assert.assertEquals(insertRowNode, tmpNode); } + @Test + public void testSerializeSkipsRetainedMeasurementWithMissingValue() throws IllegalPathException { + InsertRowNode insertRowNode = + new InsertRowNode( + new PlanNodeId("plannode missing value"), + new PartialPath("root.sg.d1"), + false, + new String[] {"s1", "s2", "s3"}, + new TSDataType[] {TSDataType.INT32, TSDataType.INT32, TSDataType.INT32}, + 1L, + new Object[] {1, 2}, + false); + + ByteBuffer byteBuffer = ByteBuffer.allocate(10000); + insertRowNode.serialize(byteBuffer); + byteBuffer.flip(); + + Assert.assertEquals(PlanNodeType.INSERT_ROW.getNodeType(), byteBuffer.getShort()); + + InsertRowNode tmpNode = InsertRowNode.deserialize(byteBuffer); + Assert.assertArrayEquals(new String[] {"s1", "s2"}, tmpNode.getMeasurements()); + } + + @Test + public void testSerializeKeepsNullRowValueWithoutType() throws IllegalPathException { + InsertRowNode insertRowNode = + new InsertRowNode( + new PlanNodeId("plannode null value"), + new PartialPath("root.sg.d1"), + false, + new String[] {"s1", "s2"}, + new TSDataType[] {TSDataType.INT32, null}, + 1L, + new Object[] {1, null}, + false); + + ByteBuffer byteBuffer = ByteBuffer.allocate(10000); + insertRowNode.serialize(byteBuffer); + byteBuffer.flip(); + + Assert.assertEquals(PlanNodeType.INSERT_ROW.getNodeType(), byteBuffer.getShort()); + + InsertRowNode tmpNode = InsertRowNode.deserialize(byteBuffer); + Assert.assertArrayEquals(new String[] {"s1", "s2"}, tmpNode.getMeasurements()); + Assert.assertArrayEquals(new TSDataType[] {TSDataType.INT32, null}, tmpNode.getDataTypes()); + Assert.assertArrayEquals(new Object[] {1, null}, tmpNode.getValues()); + } + + @Test + public void testDeserializeFromWALSkipsRetainedMeasurementWithNullSchema() + throws IllegalPathException, IOException { + InsertRowNode insertRowNode = getInsertRowNodeWithMeasurementSchemas(); + insertRowNode.getMeasurementSchemas()[1] = null; + + byte[] bytes = new byte[insertRowNode.serializedSize()]; + WALByteBufferForTest walBuffer = new WALByteBufferForTest(ByteBuffer.wrap(bytes)); + insertRowNode.serializeToWAL(walBuffer); + Assert.assertFalse(walBuffer.getBuffer().hasRemaining()); + + DataInputStream dataInputStream = new DataInputStream(new ByteArrayInputStream(bytes)); + Assert.assertEquals(PlanNodeType.INSERT_ROW.getNodeType(), dataInputStream.readShort()); + + InsertRowNode tmpNode = InsertRowNode.deserializeFromWAL(dataInputStream); + Assert.assertArrayEquals( + new String[] {"\u6e29\u5ea6", "s3", "s4", "s5"}, tmpNode.getMeasurements()); + } + + @Test + public void testDeserializeFromWALSkipsRetainedMeasurementWithMissingValue() + throws IllegalPathException, IOException { + InsertRowNode insertRowNode = getInsertRowNodeWithMeasurementSchemas(); + insertRowNode.setValues(new Object[] {5.0, 6.0f}); + + byte[] bytes = new byte[insertRowNode.serializedSize()]; + WALByteBufferForTest walBuffer = new WALByteBufferForTest(ByteBuffer.wrap(bytes)); + insertRowNode.serializeToWAL(walBuffer); + Assert.assertFalse(walBuffer.getBuffer().hasRemaining()); + + DataInputStream dataInputStream = new DataInputStream(new ByteArrayInputStream(bytes)); + Assert.assertEquals(PlanNodeType.INSERT_ROW.getNodeType(), dataInputStream.readShort()); + + InsertRowNode tmpNode = InsertRowNode.deserializeFromWAL(dataInputStream); + Assert.assertArrayEquals( + new String[] {"\u6e29\u5ea6", "\u6e7f\u5ea6"}, tmpNode.getMeasurements()); + } + + @Test + public void testDeserializeFromWALWithMarkedFailedMeasurementOnly() + throws IllegalPathException, IOException { + InsertRowNode insertRowNode = getInsertRowNodeWithMeasurementSchemas(); + insertRowNode.markFailedMeasurement(1); + + byte[] bytes = new byte[insertRowNode.serializedSize()]; + WALByteBufferForTest walBuffer = new WALByteBufferForTest(ByteBuffer.wrap(bytes)); + insertRowNode.serializeToWAL(walBuffer); + Assert.assertFalse(walBuffer.getBuffer().hasRemaining()); + + DataInputStream dataInputStream = new DataInputStream(new ByteArrayInputStream(bytes)); + Assert.assertEquals(PlanNodeType.INSERT_ROW.getNodeType(), dataInputStream.readShort()); + + InsertRowNode tmpNode = InsertRowNode.deserializeFromWAL(dataInputStream); + Assert.assertArrayEquals( + new String[] {"\u6e29\u5ea6", "s3", "s4", "s5"}, tmpNode.getMeasurements()); + } + private InsertRowNode getInsertRowNode() throws IllegalPathException { long time = 110L; TSDataType[] dataTypes = diff --git a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/queryengine/plan/planner/node/write/InsertRowsOfOneDeviceNodeSerdeTest.java b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/queryengine/plan/planner/node/write/InsertRowsOfOneDeviceNodeSerdeTest.java index d992090ba0868..dc22033ff2854 100644 --- a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/queryengine/plan/planner/node/write/InsertRowsOfOneDeviceNodeSerdeTest.java +++ b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/queryengine/plan/planner/node/write/InsertRowsOfOneDeviceNodeSerdeTest.java @@ -79,4 +79,38 @@ public void TestSerializeAndDeserialize() throws IllegalPathException { Assert.assertEquals(node, InsertRowsOfOneDeviceNode.deserialize(byteBuffer)); } + + @Test + public void testStoreMeasurementsSkipsFailedMeasurements() throws IllegalPathException { + PartialPath device = new PartialPath("root.sg.d"); + InsertRowsOfOneDeviceNode node = new InsertRowsOfOneDeviceNode(new PlanNodeId("plan node 1")); + + List insertRowNodeList = new ArrayList<>(); + InsertRowNode firstRow = + new InsertRowNode( + new PlanNodeId("plan node 1"), + device, + false, + new String[] {"s1", "failed"}, + new TSDataType[] {TSDataType.DOUBLE, TSDataType.FLOAT}, + 1000L, + new Object[] {1.0, 2f}, + false); + firstRow.markFailedMeasurement(1); + insertRowNodeList.add(firstRow); + insertRowNodeList.add( + new InsertRowNode( + new PlanNodeId("plan node 1"), + device, + false, + new String[] {"s2"}, + new TSDataType[] {TSDataType.INT64}, + 2000L, + new Object[] {300L}, + false)); + + node.setInsertRowNodeList(insertRowNodeList); + + Assert.assertArrayEquals(new String[] {"s1", "s2"}, node.getMeasurements()); + } } diff --git a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/queryengine/plan/planner/node/write/InsertTabletNodeSerdeTest.java b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/queryengine/plan/planner/node/write/InsertTabletNodeSerdeTest.java index a883b7d24a87e..309bd46d2c227 100644 --- a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/queryengine/plan/planner/node/write/InsertTabletNodeSerdeTest.java +++ b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/queryengine/plan/planner/node/write/InsertTabletNodeSerdeTest.java @@ -28,6 +28,7 @@ import org.apache.tsfile.enums.TSDataType; import org.apache.tsfile.utils.Binary; +import org.apache.tsfile.utils.BitMap; import org.apache.tsfile.write.schema.MeasurementSchema; import org.junit.Assert; import org.junit.Test; @@ -93,6 +94,96 @@ public void testSerializeAndDeserializeForWAL() throws IllegalPathException, IOE Assert.assertEquals(insertTabletNode, tmpNode); } + @Test + public void testSerializedSizeWithClearedMeasurementAndRetainedColumn() + throws IllegalPathException { + InsertTabletNode insertTabletNode = getInsertTabletNodeWithSchema(); + insertTabletNode.getMeasurements()[1] = null; + insertTabletNode.getMeasurementSchemas()[1] = null; + insertTabletNode.getDataTypes()[1] = null; + + ByteBuffer byteBuffer = ByteBuffer.allocate(insertTabletNode.serializedSize()); + insertTabletNode.serializeToWAL(new WALByteBufferForTest(byteBuffer)); + + Assert.assertEquals(insertTabletNode.serializedSize(), byteBuffer.position()); + } + + @Test + public void testSerializedSizeWithRetainedMeasurementAndNullColumn() + throws IllegalPathException, IOException { + InsertTabletNode insertTabletNode = getInsertTabletNodeWithSchema(); + insertTabletNode.getColumns()[1] = null; + + byte[] bytes = new byte[insertTabletNode.serializedSize()]; + WALByteBufferForTest walBuffer = new WALByteBufferForTest(ByteBuffer.wrap(bytes)); + insertTabletNode.serializeToWAL(walBuffer); + Assert.assertFalse(walBuffer.getBuffer().hasRemaining()); + + DataInputStream dataInputStream = new DataInputStream(new ByteArrayInputStream(bytes)); + Assert.assertEquals(PlanNodeType.INSERT_TABLET.getNodeType(), dataInputStream.readShort()); + + InsertTabletNode tmpNode = InsertTabletNode.deserializeFromWAL(dataInputStream); + Assert.assertArrayEquals( + new String[] {"\u6e29\u5ea6", "s3", "s4", "s5"}, tmpNode.getMeasurements()); + } + + @Test + public void testSerializeToWALWithoutMeasurementSchemas() throws Exception { + InsertTabletNode insertTabletNode = getInsertTabletNode(); + + byte[] bytes = new byte[insertTabletNode.serializedSize()]; + WALByteBufferForTest walBuffer = new WALByteBufferForTest(ByteBuffer.wrap(bytes)); + insertTabletNode.serializeToWAL(walBuffer); + Assert.assertFalse(walBuffer.getBuffer().hasRemaining()); + + DataInputStream dataInputStream = new DataInputStream(new ByteArrayInputStream(bytes)); + Assert.assertEquals(PlanNodeType.INSERT_TABLET.getNodeType(), dataInputStream.readShort()); + + InsertTabletNode tmpNode = InsertTabletNode.deserializeFromWAL(dataInputStream); + Assert.assertArrayEquals(new String[0], tmpNode.getMeasurements()); + } + + @Test + public void testSerializeToWALWithShortBitMaps() throws Exception { + InsertTabletNode insertTabletNode = getInsertTabletNodeWithSchema(); + BitMap bitMap = new BitMap(insertTabletNode.getRowCount()); + bitMap.mark(0); + insertTabletNode.setBitMaps(new BitMap[] {bitMap}); + + byte[] bytes = new byte[insertTabletNode.serializedSize()]; + WALByteBufferForTest walBuffer = new WALByteBufferForTest(ByteBuffer.wrap(bytes)); + insertTabletNode.serializeToWAL(walBuffer); + Assert.assertFalse(walBuffer.getBuffer().hasRemaining()); + + DataInputStream dataInputStream = new DataInputStream(new ByteArrayInputStream(bytes)); + Assert.assertEquals(PlanNodeType.INSERT_TABLET.getNodeType(), dataInputStream.readShort()); + + InsertTabletNode tmpNode = InsertTabletNode.deserializeFromWAL(dataInputStream); + Assert.assertArrayEquals( + new String[] {"\u6e29\u5ea6", "\u6e7f\u5ea6", "s3", "s4", "s5"}, tmpNode.getMeasurements()); + Assert.assertNotNull(tmpNode.getBitMaps()); + Assert.assertTrue(tmpNode.getBitMaps()[0].isMarked(0)); + } + + @Test + public void testDeserializeFromWALWithMarkedFailedMeasurementOnly() + throws IllegalPathException, IOException { + InsertTabletNode insertTabletNode = getInsertTabletNodeWithSchema(); + insertTabletNode.markFailedMeasurement(1); + + byte[] bytes = new byte[insertTabletNode.serializedSize()]; + WALByteBufferForTest walBuffer = new WALByteBufferForTest(ByteBuffer.wrap(bytes)); + insertTabletNode.serializeToWAL(walBuffer); + Assert.assertFalse(walBuffer.getBuffer().hasRemaining()); + + DataInputStream dataInputStream = new DataInputStream(new ByteArrayInputStream(bytes)); + Assert.assertEquals(PlanNodeType.INSERT_TABLET.getNodeType(), dataInputStream.readShort()); + + InsertTabletNode tmpNode = InsertTabletNode.deserializeFromWAL(dataInputStream); + Assert.assertArrayEquals( + new String[] {"\u6e29\u5ea6", "s3", "s4", "s5"}, tmpNode.getMeasurements()); + } + @Test public void testInitTabletValuesWithAllTypes() throws NoSuchMethodException, InvocationTargetException, IllegalAccessException { diff --git a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/queryengine/plan/statement/crud/InsertStatementPartialInsertTest.java b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/queryengine/plan/statement/crud/InsertStatementPartialInsertTest.java new file mode 100644 index 0000000000000..d696745edf35f --- /dev/null +++ b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/queryengine/plan/statement/crud/InsertStatementPartialInsertTest.java @@ -0,0 +1,190 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.iotdb.db.queryengine.plan.statement.crud; + +import org.apache.iotdb.commons.path.PartialPath; + +import org.apache.tsfile.enums.TSDataType; +import org.apache.tsfile.utils.Pair; +import org.apache.tsfile.write.schema.MeasurementSchema; +import org.junit.Assert; +import org.junit.Test; + +import java.util.Arrays; +import java.util.List; +import java.util.Map; + +public class InsertStatementPartialInsertTest { + + @Test + public void testInsertRowStatementGetPathsSkipsFailedMeasurements() throws Exception { + final InsertRowStatement statement = createInsertRowStatement(); + statement.markFailedMeasurement(0, new RuntimeException("failed")); + + Assert.assertEquals(1, statement.getPaths().size()); + Assert.assertEquals("root.sg.d1.s2", statement.getPaths().get(0).getFullPath()); + } + + @Test + public void testInsertTabletStatementGetPathsSkipsFailedMeasurements() throws Exception { + final InsertTabletStatement statement = createInsertTabletStatement(); + statement.markFailedMeasurement(1, new RuntimeException("failed")); + + Assert.assertEquals(1, statement.getPaths().size()); + Assert.assertEquals("root.sg.d1.s1", statement.getPaths().get(0).getFullPath()); + } + + @Test + public void testInsertRowsOfOneDeviceStatementSkipsFailedMeasurements() throws Exception { + final InsertRowStatement statement = createInsertRowStatement(); + statement.markFailedMeasurement(0, new RuntimeException("failed")); + final InsertRowsOfOneDeviceStatement rowsOfOneDeviceStatement = + new InsertRowsOfOneDeviceStatement(); + rowsOfOneDeviceStatement.setInsertRowStatementList(Arrays.asList(statement)); + + Assert.assertArrayEquals(new String[] {"s2"}, rowsOfOneDeviceStatement.getMeasurements()); + Assert.assertEquals(1, rowsOfOneDeviceStatement.getPaths().size()); + Assert.assertEquals("root.sg.d1.s2", rowsOfOneDeviceStatement.getPaths().get(0).getFullPath()); + } + + @Test + public void testInsertRowStatementMarkFailedMeasurementHandlesMissingValue() throws Exception { + final InsertRowStatement statement = createInsertRowStatement(); + statement.setValues(new Object[] {1}); + + statement.markFailedMeasurement(1, new RuntimeException("failed")); + + Assert.assertNull(statement.getMeasurements()[1]); + Assert.assertNull(statement.getDataTypes()[1]); + + statement.removeAllFailedMeasurementMarks(); + + Assert.assertEquals("s2", statement.getMeasurements()[1]); + Assert.assertEquals(TSDataType.INT64, statement.getDataTypes()[1]); + } + + @Test + public void testInsertTabletStatementMarkFailedMeasurementHandlesMissingColumn() + throws Exception { + final InsertTabletStatement statement = createInsertTabletStatement(); + statement.setColumns(new Object[] {new int[] {1, 2}}); + + statement.markFailedMeasurement(1, new RuntimeException("failed")); + + Assert.assertNull(statement.getMeasurements()[1]); + Assert.assertNull(statement.getDataTypes()[1]); + + statement.removeAllFailedMeasurementMarks(); + + Assert.assertEquals("s2", statement.getMeasurements()[1]); + Assert.assertEquals(TSDataType.INT64, statement.getDataTypes()[1]); + } + + @Test + public void testInsertTabletStatementGetFirstValueOfIndexReturnsNullForNullColumn() + throws Exception { + final InsertTabletStatement statement = createInsertTabletStatement(); + statement.getColumns()[0] = null; + + Assert.assertNull(statement.getFirstValueOfIndex(0)); + } + + @Test + public void testGetDataTypeReturnsNullForShortTypeArray() throws Exception { + final InsertRowStatement rowStatement = createInsertRowStatement(); + rowStatement.setDataTypes(new TSDataType[] {TSDataType.INT32}); + Assert.assertEquals(TSDataType.INT32, rowStatement.getDataType(0)); + Assert.assertNull(rowStatement.getDataType(1)); + + final InsertTabletStatement tabletStatement = createInsertTabletStatement(); + tabletStatement.setDataTypes(new TSDataType[] {TSDataType.INT32}); + Assert.assertEquals(TSDataType.INT32, tabletStatement.getDataType(0)); + Assert.assertNull(tabletStatement.getDataType(1)); + } + + @Test + public void testBaseSettersExpandShortArrays() throws Exception { + final InsertRowStatement statement = createInsertRowStatement(); + statement.setDataTypes(new TSDataType[] {TSDataType.INT32}); + statement.setMeasurementSchemas( + new MeasurementSchema[] {new MeasurementSchema("s1", TSDataType.INT32)}); + + statement.setDataType(TSDataType.INT64, 1); + statement.setMeasurementSchema(new MeasurementSchema("s2", TSDataType.INT64), 1); + + Assert.assertEquals(TSDataType.INT64, statement.getDataType(1)); + Assert.assertEquals("s2", statement.getMeasurementSchemas()[1].getMeasurementId()); + } + + @Test + public void testDeviceMeasurementMapSkipsMissingRowValues() throws Exception { + final InsertRowStatement statement = createInsertRowStatement(); + statement.setValues(new Object[] {1}); + + final Map>> result = + statement.getMapFromDeviceToMeasurementAndIndex(); + + Assert.assertEquals(1, result.get(statement.getDevicePath()).size()); + Assert.assertEquals("s1", result.get(statement.getDevicePath()).get(0).left); + } + + @Test + public void testDeviceMeasurementMapSkipsMissingTabletColumns() throws Exception { + final InsertTabletStatement statement = createInsertTabletStatement(); + statement.setColumns(new Object[] {new int[] {1, 2}}); + + final Map>> result = + statement.getMapFromDeviceToMeasurementAndIndex(); + + Assert.assertEquals(1, result.get(statement.getDevicePath()).size()); + Assert.assertEquals("s1", result.get(statement.getDevicePath()).get(0).left); + } + + private static InsertRowStatement createInsertRowStatement() throws Exception { + final InsertRowStatement statement = new InsertRowStatement(); + statement.setDevicePath(new PartialPath("root.sg.d1")); + statement.setMeasurements(new String[] {"s1", "s2"}); + statement.setDataTypes(new TSDataType[] {TSDataType.INT32, TSDataType.INT64}); + statement.setMeasurementSchemas( + new MeasurementSchema[] { + new MeasurementSchema("s1", TSDataType.INT32), + new MeasurementSchema("s2", TSDataType.INT64) + }); + statement.setValues(new Object[] {1, 2L}); + statement.setTime(1L); + return statement; + } + + private static InsertTabletStatement createInsertTabletStatement() throws Exception { + final InsertTabletStatement statement = new InsertTabletStatement(); + statement.setDevicePath(new PartialPath("root.sg.d1")); + statement.setMeasurements(new String[] {"s1", "s2"}); + statement.setDataTypes(new TSDataType[] {TSDataType.INT32, TSDataType.INT64}); + statement.setMeasurementSchemas( + new MeasurementSchema[] { + new MeasurementSchema("s1", TSDataType.INT32), + new MeasurementSchema("s2", TSDataType.INT64) + }); + statement.setColumns(new Object[] {new int[] {1, 2}, new long[] {3L, 4L}}); + statement.setTimes(new long[] {1L, 2L}); + statement.setRowCount(2); + return statement; + } +} diff --git a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/storageengine/dataregion/DataRegionTest.java b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/storageengine/dataregion/DataRegionTest.java index f7a79eb8bf8e5..c4c9c14ab89aa 100644 --- a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/storageengine/dataregion/DataRegionTest.java +++ b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/storageengine/dataregion/DataRegionTest.java @@ -322,12 +322,12 @@ public void testIoTDBTabletWriteAndSyncClose() new QueryId("test_write").genPlanNodeId(), new PartialPath("root.vehicle.d0"), false, - measurements, - dataTypes, - measurementSchemas, + measurements.clone(), + dataTypes.clone(), + measurementSchemas.clone(), times, null, - columns, + columns.clone(), times.length); dataRegion.insertTablet(insertTabletNode2); @@ -538,13 +538,15 @@ public void testAllMeasurementsFailedTabletWriteAndSyncClose() new QueryId("test_write").genPlanNodeId(), new PartialPath("root.vehicle.d0"), false, - measurements, - dataTypes, - measurementSchemas, + measurements.clone(), + dataTypes.clone(), + measurementSchemas.clone(), times, null, - columns, + columns.clone(), times.length); + insertTabletNode1.markFailedMeasurement(0); + insertTabletNode1.markFailedMeasurement(1); insertTabletNode1.setFailedMeasurementNumber(2); dataRegion.insertTablet(insertTabletNode1); @@ -561,13 +563,15 @@ public void testAllMeasurementsFailedTabletWriteAndSyncClose() new QueryId("test_write").genPlanNodeId(), new PartialPath("root.vehicle.d0"), false, - measurements, - dataTypes, - measurementSchemas, + measurements.clone(), + dataTypes.clone(), + measurementSchemas.clone(), times, null, - columns, + columns.clone(), times.length); + insertTabletNode2.markFailedMeasurement(0); + insertTabletNode2.markFailedMeasurement(1); insertTabletNode2.setFailedMeasurementNumber(2); dataRegion.insertTablet(insertTabletNode2); @@ -629,6 +633,7 @@ public void testAllMeasurementsFailedRecordSeqAndUnSeqSyncClose() TSRecord record = new TSRecord(j, deviceId); record.addTuple(DataPoint.getDataPoint(TSDataType.INT32, measurementId, String.valueOf(j))); InsertRowNode rowNode = buildInsertRowNodeByTSRecord(record); + rowNode.markFailedMeasurement(0); rowNode.setFailedMeasurementNumber(1); dataRegion.insert(rowNode); dataRegion.syncCloseAllWorkingTsFileProcessors(); @@ -638,6 +643,7 @@ public void testAllMeasurementsFailedRecordSeqAndUnSeqSyncClose() TSRecord record = new TSRecord(j, deviceId); record.addTuple(DataPoint.getDataPoint(TSDataType.INT32, measurementId, String.valueOf(j))); InsertRowNode rowNode = buildInsertRowNodeByTSRecord(record); + rowNode.markFailedMeasurement(0); rowNode.setFailedMeasurementNumber(1); dataRegion.insert(rowNode); dataRegion.syncCloseAllWorkingTsFileProcessors(); diff --git a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/storageengine/dataregion/memtable/AbstractMemTablePartialInsertTest.java b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/storageengine/dataregion/memtable/AbstractMemTablePartialInsertTest.java new file mode 100644 index 0000000000000..6e98093141691 --- /dev/null +++ b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/storageengine/dataregion/memtable/AbstractMemTablePartialInsertTest.java @@ -0,0 +1,307 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.iotdb.db.storageengine.dataregion.memtable; + +import org.apache.iotdb.commons.exception.IllegalPathException; +import org.apache.iotdb.commons.path.PartialPath; +import org.apache.iotdb.db.exception.WriteProcessException; +import org.apache.iotdb.db.queryengine.plan.planner.plan.node.PlanNodeId; +import org.apache.iotdb.db.queryengine.plan.planner.plan.node.write.InsertRowNode; +import org.apache.iotdb.db.queryengine.plan.planner.plan.node.write.InsertTabletNode; + +import org.apache.tsfile.enums.TSDataType; +import org.apache.tsfile.utils.BitMap; +import org.apache.tsfile.write.schema.MeasurementSchema; +import org.junit.Before; +import org.junit.Test; + +import static org.junit.Assert.assertEquals; + +/** + * Tests that verify the fix for the negative {@code pointsInserted} bug when a partial insert + * contains failed measurements (i.e. {@code measurements[i] == null}). + * + *

Before the fix: + * + *

    + *
  • {@code insertAlignedRow}: a failed measurement whose value slot is also {@code null} was + * incorrectly counted as a {@code nullPoint}, making {@code pointsInserted} go negative. + *
  • {@code computeTabletNullPointsNumber}: failed measurements were not skipped, so their + * bitmap marks were counted, again producing a negative {@code pointsInserted}. + *
+ * + *

After the fix both paths skip failed measurements, so {@code pointsInserted >= 0} always. + */ +public class AbstractMemTablePartialInsertTest { + + private PrimitiveMemTable memTable; + + @Before + public void setUp() { + memTable = new PrimitiveMemTable("root.sg", "0"); + } + + // ========================================================================= + // insertAlignedRow – failed measurement must not be counted as nullPoint + // ========================================================================= + + /** All measurements succeed, no null values → pointsInserted == total measurements (3). */ + @Test + public void testInsertAlignedRow_noFailure_noNullValue_pointsInsertedEqualsTotal() + throws IllegalPathException { + // 3 measurements, all valid, all have values + // formula: getMeasurementColumnCnt(3) - failedNum(0) - nullPoints(0) = 3 + InsertRowNode node = + buildAlignedInsertRowNode( + new String[] {"s0", "s1", "s2"}, new Object[] {1, 2, 3}, -1 /* no failure */); + + int points = memTable.insertAlignedRow(node); + + assertEquals(3, points); + assertEquals(3, memTable.getTotalPointsNum()); + } + + /** + * One measurement fails (partial insert). The failed slot has measurements[i]==null and + * values[i]==null. Before the fix this null value was counted as a nullPoint, making + * pointsInserted negative. After the fix it is skipped. + * + *

formula: getMeasurementColumnCnt(2) - failedNum(1) - nullPoints(0) = 1 + */ + @Test + public void testInsertAlignedRow_oneFailedMeasurement_pointsInsertedNotNegative() + throws IllegalPathException { + // 2 measurements, first one fails + InsertRowNode node = + buildAlignedInsertRowNode( + new String[] {"s0", "s1"}, new Object[] {1, 2}, 0 /* mark index 0 as failed */); + + int points = memTable.insertAlignedRow(node); + + assertEquals(1, points); + assertEquals(1, memTable.getTotalPointsNum()); + } + + @Test + public void testInsertAlignedRow_markedFailedMeasurementOnly_pointsInsertedMatchesWrittenPoints() + throws IllegalPathException { + InsertRowNode node = + buildAlignedInsertRowNode( + new String[] {"s0", "s1"}, new Object[] {1, 2}, -1 /* no failure */); + node.markFailedMeasurement(0); + + int points = memTable.insertAlignedRow(node); + + assertEquals(1, points); + assertEquals(1, memTable.getTotalPointsNum()); + } + + /** All measurements fail → insertAlignedRow returns 0 early (schemaList is empty). */ + @Test + public void testInsertAlignedRow_allMeasurementsFailed_pointsInsertedIsZero() + throws IllegalPathException { + InsertRowNode node = + buildAlignedInsertRowNode( + new String[] {"s0", "s1"}, + new Object[] {1, 2L}, + -1 /* mark all failed manually below */); + // mark both as failed + node.markFailedMeasurement(0); + node.markFailedMeasurement(1); + node.setFailedMeasurementNumber(2); + + int points = memTable.insertAlignedRow(node); + + assertEquals(0, points); + assertEquals(0, memTable.getTotalPointsNum()); + } + + // ========================================================================= + // insertTablet – failed measurement must be skipped in null-point counting + // ========================================================================= + + /** Normal tablet insert with no failures and no null values → pointsInserted == cols * rows. */ + @Test + public void testInsertTablet_noFailure_noNullBits_pointsInsertedEqualsColsTimesRows() + throws IllegalPathException, WriteProcessException { + // formula: (dataTypes.length(2) - failedNum(0)) * rows(3) - nullPoints(0) = 6 + InsertTabletNode node = + buildInsertTabletNode( + new String[] {"s0", "s1"}, 3, null /* no bitmaps */, -1 /* no failure */); + + int points = memTable.insertTablet(node, 0, 3); + + assertEquals(6, points); + assertEquals(6, memTable.getTotalPointsNum()); + } + + /** + * One measurement fails. Before the fix, if the failed column's bitmap had marks, those marks + * were counted as null points, making pointsInserted negative. After the fix the failed column is + * skipped entirely. + * + *

formula: (dataTypes.length(2) - failedNum(1)) * rows(3) - nullPoints(0, col-0 skipped) = 3 + */ + @Test + public void testInsertTablet_oneFailedMeasurement_withBitmap_pointsInsertedNotNegative() + throws IllegalPathException, WriteProcessException { + int rowCount = 3; + // bitmap for column 0: all rows marked as null + BitMap[] bitMaps = new BitMap[2]; + bitMaps[0] = new BitMap(rowCount); + bitMaps[0].markAll(); + bitMaps[1] = null; + + // mark column 0 as failed (partial insert) + InsertTabletNode node = + buildInsertTabletNode( + new String[] {"s0", "s1"}, rowCount, bitMaps, 0 /* mark index 0 as failed */); + + int points = memTable.insertTablet(node, 0, rowCount); + + assertEquals(3, points); + assertEquals(3, memTable.getTotalPointsNum()); + } + + @Test + public void testInsertTablet_markedFailedMeasurementOnly_pointsInsertedMatchesWrittenPoints() + throws IllegalPathException, WriteProcessException { + int rowCount = 3; + InsertTabletNode node = + buildInsertTabletNode(new String[] {"s0", "s1"}, rowCount, null, -1 /* no failure */); + node.markFailedMeasurement(0); + + int points = memTable.insertTablet(node, 0, rowCount); + + assertEquals(3, points); + assertEquals(3, memTable.getTotalPointsNum()); + } + + /** All measurements fail → pointsInserted == 0. formula: (2-2)*3 - 0 = 0 */ + @Test + public void testInsertTablet_allMeasurementsFailed_pointsInsertedIsZero() + throws IllegalPathException, WriteProcessException { + int rowCount = 3; + InsertTabletNode node = buildInsertTabletNode(new String[] {"s0", "s1"}, rowCount, null, -1); + node.markFailedMeasurement(0); + node.markFailedMeasurement(1); + node.setFailedMeasurementNumber(2); + + int points = memTable.insertTablet(node, 0, rowCount); + + assertEquals(0, points); + assertEquals(0, memTable.getTotalPointsNum()); + } + + /** Tablet with no failures and some null values in bitmap still counts all writable cells. */ + @Test + public void testInsertTablet_noFailure_withNullBits_pointsInsertedNotNegative() + throws IllegalPathException, WriteProcessException { + int rowCount = 4; + // column 1: rows 0 and 2 are null + BitMap[] bitMaps = new BitMap[2]; + bitMaps[0] = null; + bitMaps[1] = new BitMap(rowCount); + bitMaps[1].mark(0); + bitMaps[1].mark(2); + + InsertTabletNode node = + buildInsertTabletNode(new String[] {"s0", "s1"}, rowCount, bitMaps, -1 /* no failure */); + + int points = memTable.insertTablet(node, 0, rowCount); + + assertEquals(8, points); + assertEquals(8, memTable.getTotalPointsNum()); + } + + // ========================================================================= + // Helpers + // ========================================================================= + + /** + * Builds an aligned InsertRowNode. If {@code failedIndex >= 0} that measurement is marked failed + * via {@link InsertRowNode#markFailedMeasurement(int)}. + */ + private static InsertRowNode buildAlignedInsertRowNode( + String[] measurementNames, Object[] values, int failedIndex) throws IllegalPathException { + int n = measurementNames.length; + TSDataType[] dataTypes = new TSDataType[n]; + MeasurementSchema[] schemas = new MeasurementSchema[n]; + for (int i = 0; i < n; i++) { + dataTypes[i] = TSDataType.INT32; + schemas[i] = new MeasurementSchema(measurementNames[i], TSDataType.INT32); + } + InsertRowNode node = + new InsertRowNode( + new PlanNodeId("test"), + new PartialPath("root.sg.d1"), + true /* isAligned */, + measurementNames, + dataTypes, + schemas, + 1L, + values, + false); + if (failedIndex >= 0) { + node.markFailedMeasurement(failedIndex); + node.setFailedMeasurementNumber(1); + } + return node; + } + + /** + * Builds a non-aligned InsertTabletNode. If {@code failedIndex >= 0} that measurement is marked + * failed via {@link InsertTabletNode#markFailedMeasurement(int)}. + */ + private static InsertTabletNode buildInsertTabletNode( + String[] measurementNames, int rowCount, BitMap[] bitMaps, int failedIndex) + throws IllegalPathException { + int n = measurementNames.length; + TSDataType[] dataTypes = new TSDataType[n]; + Object[] columns = new Object[n]; + MeasurementSchema[] schemas = new MeasurementSchema[n]; + for (int i = 0; i < n; i++) { + dataTypes[i] = TSDataType.INT32; + columns[i] = new int[rowCount]; + schemas[i] = new MeasurementSchema(measurementNames[i], TSDataType.INT32); + } + long[] times = new long[rowCount]; + for (int i = 0; i < rowCount; i++) { + times[i] = i + 1L; + } + InsertTabletNode node = + new InsertTabletNode( + new PlanNodeId("test"), + new PartialPath("root.sg.d1"), + false /* isAligned */, + measurementNames, + dataTypes, + schemas, + times, + bitMaps, + columns, + rowCount); + if (failedIndex >= 0) { + node.markFailedMeasurement(failedIndex); + node.setFailedMeasurementNumber(1); + } + return node; + } +} diff --git a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/storageengine/dataregion/memtable/MemChunkDeserializeTest.java b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/storageengine/dataregion/memtable/MemChunkDeserializeTest.java index 04a81ee1866bc..cd5e6cc3b4a86 100644 --- a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/storageengine/dataregion/memtable/MemChunkDeserializeTest.java +++ b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/storageengine/dataregion/memtable/MemChunkDeserializeTest.java @@ -284,6 +284,29 @@ public void testAlignedSeries() throws IOException, QueryProcessException, Metad } } + @Test + public void testNonAlignedMemChunkGroupSerializedSizeWithNonAsciiMeasurement() + throws IOException { + String measurement = "\u6e29\u5ea6"; + WritableMemChunk series = + new WritableMemChunk( + new MeasurementSchema(measurement, TSDataType.INT32, TSEncoding.PLAIN)); + series.writeNonAlignedPoint(1, 1); + + WritableMemChunkGroup group = new WritableMemChunkGroup(); + group.getMemChunkMap().put(measurement, series); + + WALByteBufferForTest walBuffer = + new WALByteBufferForTest(ByteBuffer.allocate(group.serializedSize())); + group.serializeToWAL(walBuffer); + Assert.assertEquals(group.serializedSize(), walBuffer.getBuffer().position()); + + DataInputStream inputStream = + new DataInputStream(new ByteArrayInputStream(walBuffer.getBuffer().array())); + WritableMemChunkGroup deserialized = WritableMemChunkGroup.deserialize(inputStream); + Assert.assertTrue(deserialized.getMemChunkMap().containsKey(measurement)); + } + private WritableMemChunk createWritableMemChunkFromBytes(WritableMemChunk series) throws IOException { int serializedSize = series.serializedSize();