diff --git a/aliyun/src/main/java/org/apache/iceberg/aliyun/oss/OSSInputStream.java b/aliyun/src/main/java/org/apache/iceberg/aliyun/oss/OSSInputStream.java index b530f24783d2..4d9c76c917ae 100644 --- a/aliyun/src/main/java/org/apache/iceberg/aliyun/oss/OSSInputStream.java +++ b/aliyun/src/main/java/org/apache/iceberg/aliyun/oss/OSSInputStream.java @@ -20,29 +20,44 @@ import com.aliyun.oss.OSS; import com.aliyun.oss.model.GetObjectRequest; +import com.aliyun.oss.model.OSSObject; +import dev.failsafe.Failsafe; +import dev.failsafe.FailsafeException; +import dev.failsafe.RetryPolicy; import java.io.IOException; import java.io.InputStream; +import java.net.SocketException; +import java.net.SocketTimeoutException; import java.util.Arrays; +import java.util.List; +import javax.net.ssl.SSLException; import org.apache.iceberg.io.FileIOMetricsContext; +import org.apache.iceberg.io.IOUtil; +import org.apache.iceberg.io.RangeReadable; import org.apache.iceberg.io.SeekableInputStream; import org.apache.iceberg.metrics.Counter; import org.apache.iceberg.metrics.MetricsContext; import org.apache.iceberg.metrics.MetricsContext.Unit; +import org.apache.iceberg.relocated.com.google.common.annotations.VisibleForTesting; import org.apache.iceberg.relocated.com.google.common.base.Joiner; import org.apache.iceberg.relocated.com.google.common.base.Preconditions; +import org.apache.iceberg.relocated.com.google.common.collect.ImmutableList; import org.apache.iceberg.relocated.com.google.common.io.ByteStreams; import org.slf4j.Logger; import org.slf4j.LoggerFactory; -class OSSInputStream extends SeekableInputStream { +class OSSInputStream extends SeekableInputStream implements RangeReadable { private static final Logger LOG = LoggerFactory.getLogger(OSSInputStream.class); - private static final int SKIP_SIZE = 1024 * 1024; + + private static final List> RETRYABLE_EXCEPTIONS = + ImmutableList.of(SSLException.class, SocketTimeoutException.class, SocketException.class); private final StackTraceElement[] createStack; private final OSS client; private final OSSURI uri; - private InputStream stream = null; + private OSSObject currentObject; + private InputStream stream; private long pos = 0; private long next = 0; private boolean closed = false; @@ -50,6 +65,24 @@ class OSSInputStream extends SeekableInputStream { private final Counter readBytes; private final Counter readOperations; + private int skipSize = 1024 * 1024; + private RetryPolicy retryPolicy = + RetryPolicy.builder() + .handle(RETRYABLE_EXCEPTIONS) + .onRetry( + e -> { + LOG.warn( + "Retrying read from OSS, reopening stream (attempt {})", e.getAttemptCount()); + resetForRetry(); + }) + .onFailure( + e -> + LOG.error( + "Failed to read from OSS input stream after exhausting all retries", + e.getException())) + .withMaxRetries(3) + .build(); + OSSInputStream(OSS client, OSSURI uri) { this(client, uri, MetricsContext.nullMetrics()); } @@ -82,12 +115,23 @@ public int read() throws IOException { Preconditions.checkState(!closed, "Cannot read: already closed"); positionStream(); - pos += 1; - next += 1; - readBytes.increment(); - readOperations.increment(); + try { + int bytesRead = Failsafe.with(retryPolicy).get(() -> stream.read()); + if (bytesRead != -1) { + pos += 1; + next += 1; + readBytes.increment(); + readOperations.increment(); + } + + return bytesRead; + } catch (FailsafeException ex) { + if (ex.getCause() instanceof IOException) { + throw (IOException) ex.getCause(); + } - return stream.read(); + throw ex; + } } @Override @@ -95,68 +139,150 @@ public int read(byte[] b, int off, int len) throws IOException { Preconditions.checkState(!closed, "Cannot read: already closed"); positionStream(); - int bytesRead = stream.read(b, off, len); - pos += bytesRead; - next += bytesRead; - readBytes.increment(bytesRead); - readOperations.increment(); + try { + int bytesRead = Failsafe.with(retryPolicy).get(() -> stream.read(b, off, len)); + if (bytesRead > 0) { + pos += bytesRead; + next += bytesRead; + readBytes.increment(bytesRead); + readOperations.increment(); + } + + return bytesRead; + } catch (FailsafeException ex) { + if (ex.getCause() instanceof IOException) { + throw (IOException) ex.getCause(); + } - return bytesRead; + throw ex; + } } @Override - public void close() throws IOException { - if (closed) { - return; + public void readFully(long position, byte[] buffer, int offset, int length) throws IOException { + Preconditions.checkPositionIndexes(offset, offset + length, buffer.length); + + GetObjectRequest request = + new GetObjectRequest(uri.bucket(), uri.key()).withRange(position, position + length - 1); + OSSObject object = client.getObject(request); + try { + IOUtil.readFully(object.getObjectContent(), buffer, offset, length); + readBytes.increment(length); + readOperations.increment(); + } finally { + // Use close() instead of forcedClose() here because the range response data has been + // fully consumed. close() goes through InputStream -> EofSensorInputStream path which + // detects EOF and returns the connection to Apache HttpClient's connection pool. + // forcedClose() calls httpResponse.close() -> ConnectionHolder.close() which hardcodes + // releaseConnection(false), always destroying the connection even if data was consumed. + object.close(); } + } + + @Override + public int readTail(byte[] buffer, int offset, int length) throws IOException { + Preconditions.checkPositionIndexes(offset, offset + length, buffer.length); + GetObjectRequest request = new GetObjectRequest(uri.bucket(), uri.key()).withRange(-1, length); + OSSObject object = client.getObject(request); + try { + int bytesRead = IOUtil.readRemaining(object.getObjectContent(), buffer, offset, length); + readBytes.increment(bytesRead); + readOperations.increment(); + return bytesRead; + } finally { + // Same as readFully: data is fully consumed, use close() to allow connection reuse. + object.close(); + } + } + + @Override + public void close() throws IOException { super.close(); - closeStream(); closed = true; + closeStream(false); } private void positionStream() throws IOException { if ((stream != null) && (next == pos)) { - // already at specified position. + // already at specified position return; } if ((stream != null) && (next > pos)) { // seeking forwards long skip = next - pos; - if (skip <= Math.max(stream.available(), SKIP_SIZE)) { + if (skip <= Math.max(stream.available(), skipSize)) { // already buffered or seek is small enough - LOG.debug("Read-through seek for {} from {} to offset {}", uri, pos, next); + LOG.debug("Read-through seek for {} to offset {}", uri, next); try { ByteStreams.skipFully(stream, skip); pos = next; return; } catch (IOException ignored) { - // will retry by re-opening the stream. + // will retry by re-opening the stream } } } - // close the stream and open at desired position. + // close the stream and open at desired position LOG.debug("Seek with new stream for {} to offset {}", uri, next); pos = next; openStream(); } private void openStream() throws IOException { - closeStream(); + openStream(false); + } + private void openStream(boolean closeQuietly) throws IOException { GetObjectRequest request = new GetObjectRequest(uri.bucket(), uri.key()).withRange(pos, -1); - stream = client.getObject(request).getObjectContent(); + + closeStream(closeQuietly); + + currentObject = client.getObject(request); + stream = currentObject.getObjectContent(); + } + + @VisibleForTesting + void resetForRetry() throws IOException { + openStream(true); } - private void closeStream() throws IOException { - if (stream != null) { - stream.close(); - stream = null; + private void closeStream(boolean closeQuietly) throws IOException { + if (currentObject != null) { + // forcedClose() calls httpResponse.close() -> ConnectionHolder.close() which sets + // released=true and closes the socket without consuming remaining TCP data. + // After that, objectContent.close() goes through EofSensorInputStream -> + // ResponseEntityProxy.streamClosed() which detects the connection is already released + // (open=false) and swallows any IOException internally. So close() after forcedClose() + // should not throw. The catch below is a safety net for the edge case where + // forcedClose() itself fails and the connection is still open. + try { + currentObject.forcedClose(); + } catch (Exception e) { + LOG.warn("An error occurred while aborting the stream", e); + } + + try { + currentObject.close(); + } catch (IOException e) { + if (!closeQuietly) { + throw e; + } + + LOG.warn("An error occurred while closing the stream", e); + } finally { + currentObject = null; + stream = null; + } } } + public void setSkipSize(int skipSize) { + this.skipSize = skipSize; + } + @SuppressWarnings({"checkstyle:NoFinalizer", "Finalize", "deprecation"}) @Override protected void finalize() throws Throwable { @@ -164,7 +290,7 @@ protected void finalize() throws Throwable { if (!closed) { close(); // releasing resources is more important than printing the warning String trace = Joiner.on("\n\t").join(Arrays.copyOfRange(createStack, 1, createStack.length)); - LOG.warn("Unclosed input stream created by: \n\t{}", trace); + LOG.warn("Unclosed input stream created by:\n\t{}", trace); } } } diff --git a/aliyun/src/test/java/org/apache/iceberg/aliyun/oss/TestOSSInputStream.java b/aliyun/src/test/java/org/apache/iceberg/aliyun/oss/TestOSSInputStream.java index 053610983c10..805c5fa83151 100644 --- a/aliyun/src/test/java/org/apache/iceberg/aliyun/oss/TestOSSInputStream.java +++ b/aliyun/src/test/java/org/apache/iceberg/aliyun/oss/TestOSSInputStream.java @@ -20,12 +20,26 @@ import static org.assertj.core.api.Assertions.assertThat; import static org.assertj.core.api.Assertions.assertThatThrownBy; +import static org.mockito.ArgumentMatchers.any; +import static org.mockito.Mockito.doThrow; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.times; +import static org.mockito.Mockito.verify; +import static org.mockito.Mockito.when; +import com.aliyun.oss.OSS; +import com.aliyun.oss.model.GetObjectRequest; +import com.aliyun.oss.model.OSSObject; import java.io.ByteArrayInputStream; +import java.io.FilterInputStream; import java.io.IOException; +import java.io.InputStream; +import java.net.SocketTimeoutException; import java.util.Arrays; import java.util.Random; +import java.util.UUID; import java.util.concurrent.ThreadLocalRandom; +import org.apache.iceberg.io.RangeReadable; import org.apache.iceberg.io.SeekableInputStream; import org.apache.iceberg.relocated.com.google.common.io.ByteStreams; import org.junit.jupiter.api.Test; @@ -119,6 +133,335 @@ public void testSeek() throws Exception { } } + @Test + public void testReadFully() throws Exception { + int dataSize = 1024 * 1024; + byte[] data = randomData(dataSize); + OSSInputStream in = newInputStream(data); + + int offset = 100; + int length = 256; + byte[] buffer = new byte[length]; + try (in) { + ((RangeReadable) in).readFully(offset, buffer, 0, length); + } + + assertThat(buffer).isEqualTo(Arrays.copyOfRange(data, offset, offset + length)); + } + + @Test + public void testReadTail() throws Exception { + int dataSize = 1024 * 1024; + byte[] data = randomData(dataSize); + OSSInputStream in = newInputStream(data); + + int length = 256; + byte[] buffer = new byte[length]; + int bytesRead; + try (in) { + bytesRead = ((RangeReadable) in).readTail(buffer, 0, length); + } + + assertThat(bytesRead).isEqualTo(length); + assertThat(buffer).isEqualTo(Arrays.copyOfRange(data, dataSize - length, dataSize)); + } + + @Test + public void testReadTailSmallFile() throws Exception { + int dataSize = 100; + byte[] data = randomData(dataSize); + OSSInputStream in = newInputStream(data); + + int length = 256; + byte[] buffer = new byte[length]; + int bytesRead; + try (in) { + bytesRead = ((RangeReadable) in).readTail(buffer, 0, length); + } + + assertThat(bytesRead).isEqualTo(dataSize); + assertThat(Arrays.copyOfRange(buffer, 0, bytesRead)).isEqualTo(data); + } + + @Test + public void testReadFullyDoesNotAffectStreamPosition() throws Exception { + int dataSize = 1024 * 1024; + byte[] data = randomData(dataSize); + OSSInputStream in = newInputStream(data); + + try (in) { + // read some bytes to advance position + int readSize = 128; + byte[] buf = new byte[readSize]; + ByteStreams.readFully(in, buf); + long posBeforeRangeRead = in.getPos(); + assertThat(posBeforeRangeRead).isEqualTo(readSize); + + // readFully at a different position + byte[] rangeBuffer = new byte[64]; + ((RangeReadable) in).readFully(5000, rangeBuffer, 0, 64); + + // stream position unchanged + assertThat(in.getPos()).isEqualTo(posBeforeRangeRead); + + // subsequent sequential read continues from original position + byte[] nextBuf = new byte[readSize]; + ByteStreams.readFully(in, nextBuf); + assertThat(nextBuf) + .isEqualTo(Arrays.copyOfRange(data, (int) posBeforeRangeRead, readSize * 2)); + } + } + + @Test + public void testSingleByteReadAtEOF() throws Exception { + int dataSize = 128; + byte[] data = randomData(dataSize); + OSSInputStream in = newInputStream(data); + + try (in) { + in.seek(dataSize); + long posBefore = in.getPos(); + assertThat(in.read()).isEqualTo(-1); + assertThat(in.getPos()).isEqualTo(posBefore); + } + } + + @Test + public void testBulkReadAtEOF() throws Exception { + int dataSize = 128; + byte[] data = randomData(dataSize); + OSSInputStream in = newInputStream(data); + + try (in) { + in.seek(dataSize); + long posBefore = in.getPos(); + byte[] buf = new byte[64]; + assertThat(in.read(buf, 0, buf.length)).isEqualTo(-1); + assertThat(in.getPos()).isEqualTo(posBefore); + } + } + + @Test + public void testReadAfterClose() throws Exception { + OSSInputStream in = newInputStream(randomData(128)); + in.close(); + + assertThatThrownBy(() -> in.read()) + .isInstanceOf(IllegalStateException.class) + .hasMessageContaining("Cannot read: already closed"); + } + + @Test + public void testSeekNegativePosition() throws Exception { + try (OSSInputStream in = newInputStream(randomData(128))) { + assertThatThrownBy(() -> in.seek(-1)) + .isInstanceOf(IllegalArgumentException.class) + .hasMessageContaining("Position is negative"); + } + } + + @Test + public void testRetryOnReadFailure() throws Exception { + byte[] data = randomData(256); + OSSURI uri = randomURI(); + + OSSObject failObject = mockOSSObject(socketTimeoutStream()); + OSSObject goodObject = mockOSSObject(new ByteArrayInputStream(data)); + + OSS mockClient = mock(OSS.class); + when(mockClient.getObject(any(GetObjectRequest.class))) + .thenReturn(failObject) + .thenReturn(goodObject); + + try (OSSInputStream in = new OSSInputStream(mockClient, uri)) { + byte[] buf = new byte[data.length]; + ByteStreams.readFully(in, buf); + assertThat(buf).isEqualTo(data); + } + + verify(mockClient, times(2)).getObject(any(GetObjectRequest.class)); + } + + @Test + public void testSkipFailureFallback() throws Exception { + byte[] data = randomData(1024); + int readSize = 64; + OSSURI uri = randomURI(); + + InputStream failOnSkipStream = + new FilterInputStream(new ByteArrayInputStream(data)) { + @Override + public long skip(long n) throws IOException { + throw new IOException("skip failed"); + } + }; + + OSSObject firstObject = mockOSSObject(failOnSkipStream); + OSSObject secondObject = + mockOSSObject( + new ByteArrayInputStream(Arrays.copyOfRange(data, readSize + 1, data.length))); + + OSS mockClient = mock(OSS.class); + when(mockClient.getObject(any(GetObjectRequest.class))) + .thenReturn(firstObject) + .thenReturn(secondObject); + + try (OSSInputStream in = new OSSInputStream(mockClient, uri)) { + byte[] buf = new byte[readSize]; + ByteStreams.readFully(in, buf); + assertThat(buf).isEqualTo(Arrays.copyOfRange(data, 0, readSize)); + + // small forward seek triggers skip, which fails and falls back to new stream + in.seek(readSize + 1); + assertThat(in.read()).isEqualTo(Byte.toUnsignedInt(data[readSize + 1])); + } + } + + @Test + public void testCloseStreamQuietlyOnRetry() throws Exception { + byte[] data = randomData(256); + OSSURI uri = randomURI(); + + OSSObject failObject = mockOSSObject(socketTimeoutStream()); + doThrow(new IOException("close failed")).when(failObject).close(); + + OSSObject goodObject = mockOSSObject(new ByteArrayInputStream(data)); + + OSS mockClient = mock(OSS.class); + when(mockClient.getObject(any(GetObjectRequest.class))) + .thenReturn(failObject) + .thenReturn(goodObject); + + // close() throws during retry but closeQuietly=true swallows it + try (OSSInputStream in = new OSSInputStream(mockClient, uri)) { + byte[] buf = new byte[data.length]; + ByteStreams.readFully(in, buf); + assertThat(buf).isEqualTo(data); + } + } + + @Test + public void testSetSkipSize() throws Exception { + byte[] data = randomData(1024); + int readSize = 64; + OSSURI uri = randomURI(); + + InputStream noAvailableStream = + new FilterInputStream(new ByteArrayInputStream(data)) { + @Override + public int available() { + return 0; + } + }; + + OSSObject firstObject = mockOSSObject(noAvailableStream); + OSSObject secondObject = + mockOSSObject( + new ByteArrayInputStream(Arrays.copyOfRange(data, readSize + 1, data.length))); + + OSS mockClient = mock(OSS.class); + when(mockClient.getObject(any(GetObjectRequest.class))) + .thenReturn(firstObject) + .thenReturn(secondObject); + + try (OSSInputStream in = new OSSInputStream(mockClient, uri)) { + in.setSkipSize(0); + byte[] buf = new byte[readSize]; + ByteStreams.readFully(in, buf); + assertThat(buf).isEqualTo(Arrays.copyOfRange(data, 0, readSize)); + + // skipSize=0 and available()=0 force new stream open for any forward seek + in.seek(readSize + 1); + assertThat(in.read()).isEqualTo(Byte.toUnsignedInt(data[readSize + 1])); + verify(mockClient, times(2)).getObject(any(GetObjectRequest.class)); + } + } + + @Test + public void testRetryExhausted() throws Exception { + OSSURI uri = randomURI(); + OSSObject failObject = mockOSSObject(socketTimeoutStream()); + + OSS mockClient = mock(OSS.class); + when(mockClient.getObject(any(GetObjectRequest.class))).thenReturn(failObject); + + try (OSSInputStream in = new OSSInputStream(mockClient, uri)) { + // single-byte read exhausts all retries + assertThatThrownBy(() -> in.read()) + .isInstanceOf(SocketTimeoutException.class) + .hasMessage("timeout"); + + // bulk read also exhausts all retries + byte[] buf = new byte[64]; + assertThatThrownBy(() -> in.read(buf, 0, buf.length)) + .isInstanceOf(SocketTimeoutException.class) + .hasMessage("timeout"); + } + } + + @Test + public void testForcedCloseException() throws Exception { + byte[] data = randomData(128); + OSSURI uri = randomURI(); + + OSSObject object = mockOSSObject(new ByteArrayInputStream(data)); + doThrow(new IOException("forcedClose failed")).when(object).forcedClose(); + + OSS mockClient = mock(OSS.class); + when(mockClient.getObject(any(GetObjectRequest.class))).thenReturn(object); + + // forcedClose failure is swallowed, close succeeds + try (OSSInputStream in = new OSSInputStream(mockClient, uri)) { + assertThat(in.read()).isEqualTo(Byte.toUnsignedInt(data[0])); + } + } + + @Test + public void testCloseThrowsInNonQuietMode() throws Exception { + byte[] data = randomData(128); + OSSURI uri = randomURI(); + + OSSObject object = mockOSSObject(new ByteArrayInputStream(data)); + doThrow(new IOException("close failed")).when(object).close(); + + OSS mockClient = mock(OSS.class); + when(mockClient.getObject(any(GetObjectRequest.class))).thenReturn(object); + + OSSInputStream in = new OSSInputStream(mockClient, uri); + in.read(); + assertThatThrownBy(in::close).isInstanceOf(IOException.class).hasMessage("close failed"); + } + + private OSSURI randomURI() { + return new OSSURI(location(UUID.randomUUID() + ".dat")); + } + + private OSSInputStream newInputStream(byte[] data) { + OSSURI uri = randomURI(); + writeOSSData(uri, data); + return new OSSInputStream(ossClient().get(), uri); + } + + private static OSSObject mockOSSObject(InputStream stream) { + OSSObject object = mock(OSSObject.class); + when(object.getObjectContent()).thenReturn(stream); + return object; + } + + private static InputStream socketTimeoutStream() { + return new InputStream() { + @Override + public int read() throws IOException { + throw new SocketTimeoutException("timeout"); + } + + @Override + public int read(byte[] b, int off, int len) throws IOException { + throw new SocketTimeoutException("timeout"); + } + }; + } + private byte[] randomData(int size) { byte[] data = new byte[size]; random.nextBytes(data); diff --git a/aliyun/src/test/java/org/apache/iceberg/aliyun/oss/mock/AliyunOSSMock.java b/aliyun/src/test/java/org/apache/iceberg/aliyun/oss/mock/AliyunOSSMock.java index 7894c1857d55..85152b2b88ca 100644 --- a/aliyun/src/test/java/org/apache/iceberg/aliyun/oss/mock/AliyunOSSMock.java +++ b/aliyun/src/test/java/org/apache/iceberg/aliyun/oss/mock/AliyunOSSMock.java @@ -224,8 +224,8 @@ private void getObject(String bucketName, String objectName, HttpExchange httpEx long bytesToRead = Math.min(fileSize - 1, rangeEnd) - rangeStart + 1; long skipSize = rangeStart; if (rangeStart == -1) { - bytesToRead = Math.min(fileSize - 1, rangeEnd); - skipSize = fileSize - rangeEnd; + bytesToRead = Math.min(fileSize, rangeEnd); + skipSize = Math.max(0, fileSize - rangeEnd); } if (rangeEnd == -1) { bytesToRead = fileSize - rangeStart; diff --git a/build.gradle b/build.gradle index 4aeb1eb93a67..3deebb3bf7a3 100644 --- a/build.gradle +++ b/build.gradle @@ -488,6 +488,7 @@ project(':iceberg-aliyun') { compileOnly libs.aliyun.sdk.oss compileOnly libs.aliyun.credentials.java compileOnly libs.aliyun.tea + implementation libs.failsafe compileOnly libs.jaxb.api compileOnly libs.activation compileOnly libs.jaxb.runtime