Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
186 changes: 156 additions & 30 deletions aliyun/src/main/java/org/apache/iceberg/aliyun/oss/OSSInputStream.java
Original file line number Diff line number Diff line change
Expand Up @@ -20,36 +20,69 @@

import com.aliyun.oss.OSS;
import com.aliyun.oss.model.GetObjectRequest;
import com.aliyun.oss.model.OSSObject;
import dev.failsafe.Failsafe;
import dev.failsafe.FailsafeException;
import dev.failsafe.RetryPolicy;
import java.io.IOException;
import java.io.InputStream;
import java.net.SocketException;
import java.net.SocketTimeoutException;
import java.util.Arrays;
import java.util.List;
import javax.net.ssl.SSLException;
import org.apache.iceberg.io.FileIOMetricsContext;
import org.apache.iceberg.io.IOUtil;
import org.apache.iceberg.io.RangeReadable;
import org.apache.iceberg.io.SeekableInputStream;
import org.apache.iceberg.metrics.Counter;
import org.apache.iceberg.metrics.MetricsContext;
import org.apache.iceberg.metrics.MetricsContext.Unit;
import org.apache.iceberg.relocated.com.google.common.annotations.VisibleForTesting;
import org.apache.iceberg.relocated.com.google.common.base.Joiner;
import org.apache.iceberg.relocated.com.google.common.base.Preconditions;
import org.apache.iceberg.relocated.com.google.common.collect.ImmutableList;
import org.apache.iceberg.relocated.com.google.common.io.ByteStreams;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

class OSSInputStream extends SeekableInputStream {
class OSSInputStream extends SeekableInputStream implements RangeReadable {
private static final Logger LOG = LoggerFactory.getLogger(OSSInputStream.class);
private static final int SKIP_SIZE = 1024 * 1024;

private static final List<Class<? extends Throwable>> RETRYABLE_EXCEPTIONS =
ImmutableList.of(SSLException.class, SocketTimeoutException.class, SocketException.class);

private final StackTraceElement[] createStack;
private final OSS client;
private final OSSURI uri;

private InputStream stream = null;
private OSSObject currentObject;
private InputStream stream;
private long pos = 0;
private long next = 0;
private boolean closed = false;

private final Counter readBytes;
private final Counter readOperations;

private int skipSize = 1024 * 1024;
// Retry policy applied to stream reads via Failsafe.with(retryPolicy).
// Only the exception types listed in RETRYABLE_EXCEPTIONS are handled, with at most 3 retries.
private RetryPolicy<Object> retryPolicy =
RetryPolicy.builder()
.handle(RETRYABLE_EXCEPTIONS)
.onRetry(
e -> {
// before each retry, log the attempt and reopen the stream through resetForRetry()
LOG.warn(
"Retrying read from OSS, reopening stream (attempt {})", e.getAttemptCount());
resetForRetry();
})
.onFailure(
e ->
// log the exception carried by the failure event
LOG.error(
"Failed to read from OSS input stream after exhausting all retries",
e.getException()))
.withMaxRetries(3)
.build();

/**
 * Creates an input stream for the given OSS location, delegating to the three-argument
 * constructor with {@link MetricsContext#nullMetrics()} as the metrics context.
 *
 * @param client OSS client passed through to the three-argument constructor
 * @param uri OSS location passed through to the three-argument constructor
 */
OSSInputStream(OSS client, OSSURI uri) {
this(client, uri, MetricsContext.nullMetrics());
}
Expand Down Expand Up @@ -82,89 +115,182 @@ public int read() throws IOException {
Preconditions.checkState(!closed, "Cannot read: already closed");
positionStream();

pos += 1;
next += 1;
readBytes.increment();
readOperations.increment();
try {
int bytesRead = Failsafe.with(retryPolicy).get(() -> stream.read());
if (bytesRead != -1) {
pos += 1;
next += 1;
readBytes.increment();
readOperations.increment();
}

return bytesRead;
} catch (FailsafeException ex) {
if (ex.getCause() instanceof IOException) {
throw (IOException) ex.getCause();
}

return stream.read();
throw ex;
}
}

@Override
public int read(byte[] b, int off, int len) throws IOException {
Preconditions.checkState(!closed, "Cannot read: already closed");
positionStream();

int bytesRead = stream.read(b, off, len);
pos += bytesRead;
next += bytesRead;
readBytes.increment(bytesRead);
readOperations.increment();
try {
int bytesRead = Failsafe.with(retryPolicy).get(() -> stream.read(b, off, len));
if (bytesRead > 0) {
pos += bytesRead;
next += bytesRead;
readBytes.increment(bytesRead);
readOperations.increment();
}

return bytesRead;
} catch (FailsafeException ex) {
if (ex.getCause() instanceof IOException) {
throw (IOException) ex.getCause();
}

return bytesRead;
throw ex;
}
}

@Override
public void close() throws IOException {
if (closed) {
return;
public void readFully(long position, byte[] buffer, int offset, int length) throws IOException {
  // Reads exactly `length` bytes, starting at absolute object offset `position`, into
  // buffer[offset, offset + length) using a dedicated ranged GET. The sequential stream state
  // of this instance (pos / next / stream) is not read or modified here.
  //
  // Throws IndexOutOfBoundsException if [offset, offset + length) does not fit in `buffer`,
  // IllegalArgumentException if `position` is negative, and IOException if the read fails.
  Preconditions.checkPositionIndexes(offset, offset + length, buffer.length);
  Preconditions.checkArgument(position >= 0, "Invalid position: %s (must be >= 0)", position);

  if (length == 0) {
    // Nothing to read. Without this guard the requested range would be
    // (position, position - 1); for position 0 that is (0, -1), the same open-ended form
    // openStream uses to fetch everything from an offset to the end of the object.
    return;
  }

  GetObjectRequest request =
      new GetObjectRequest(uri.bucket(), uri.key()).withRange(position, position + length - 1);
  OSSObject object = client.getObject(request);
  try {
    IOUtil.readFully(object.getObjectContent(), buffer, offset, length);
    readBytes.increment(length);
    readOperations.increment();
  } finally {
    // close() rather than forcedClose(): the ranged response has been fully consumed, so a
    // regular close lets the HTTP connection go back to the pool, whereas forcedClose()
    // always tears the connection down.
    object.close();
  }
}

/**
 * Reads up to {@code length} bytes from the end of the object into
 * {@code buffer[offset, offset + length)} using a dedicated ranged GET. The sequential stream
 * state of this instance is not read or modified.
 *
 * @param buffer destination array
 * @param offset index in {@code buffer} at which the first byte is stored
 * @param length maximum number of trailing bytes to read
 * @return the number of bytes actually read; 0 when {@code length} is 0
 * @throws IndexOutOfBoundsException if the destination range does not fit in {@code buffer}
 * @throws IOException if reading from or closing the response fails
 */
@Override
public int readTail(byte[] buffer, int offset, int length) throws IOException {
  Preconditions.checkPositionIndexes(offset, offset + length, buffer.length);

  if (length == 0) {
    // nothing was asked for: skip the (-1, 0) range request and leave the counters untouched,
    // the same way read(byte[], int, int) does when no bytes are read
    return 0;
  }

  // a start of -1 asks for the trailing `length` bytes of the object
  GetObjectRequest request = new GetObjectRequest(uri.bucket(), uri.key()).withRange(-1, length);
  OSSObject object = client.getObject(request);
  try {
    int bytesRead = IOUtil.readRemaining(object.getObjectContent(), buffer, offset, length);
    readBytes.increment(bytesRead);
    readOperations.increment();
    return bytesRead;
  } finally {
    // the response has been consumed, so a regular close() allows the connection to be reused
    object.close();
  }
}

@Override
public void close() throws IOException {
super.close();
closeStream();
closed = true;
closeStream(false);
}

private void positionStream() throws IOException {
if ((stream != null) && (next == pos)) {
// already at specified position.
// already at specified position
return;
}

if ((stream != null) && (next > pos)) {
// seeking forwards
long skip = next - pos;
if (skip <= Math.max(stream.available(), SKIP_SIZE)) {
if (skip <= Math.max(stream.available(), skipSize)) {
// already buffered or seek is small enough
LOG.debug("Read-through seek for {} from {} to offset {}", uri, pos, next);
LOG.debug("Read-through seek for {} to offset {}", uri, next);
try {
ByteStreams.skipFully(stream, skip);
pos = next;
return;
} catch (IOException ignored) {
// will retry by re-opening the stream.
// will retry by re-opening the stream
}
}
}

// close the stream and open at desired position.
// close the stream and open at desired position
LOG.debug("Seek with new stream for {} to offset {}", uri, next);
pos = next;
openStream();
}

private void openStream() throws IOException {
closeStream();
openStream(false);
}

private void openStream(boolean closeQuietly) throws IOException {
GetObjectRequest request = new GetObjectRequest(uri.bucket(), uri.key()).withRange(pos, -1);
stream = client.getObject(request).getObjectContent();

closeStream(closeQuietly);

currentObject = client.getObject(request);
stream = currentObject.getObjectContent();
}

/**
 * Reopens the underlying stream by calling {@code openStream(true)}, i.e. with quiet closing
 * of any currently open object. Invoked from the retry policy's onRetry listener before a
 * read is re-attempted.
 *
 * @throws IOException if reopening the stream fails
 */
@VisibleForTesting
void resetForRetry() throws IOException {
openStream(true);
}

private void closeStream() throws IOException {
if (stream != null) {
stream.close();
stream = null;
private void closeStream(boolean closeQuietly) throws IOException {
  // Releases the currently open OSS object, if any, and clears both stream references.
  // With closeQuietly set, an IOException from close() is logged instead of rethrown.
  if (currentObject == null) {
    return;
  }

  // Step 1: forcedClose() first, so the connection is dropped without consuming whatever is
  // left of the response. A failure here is only logged.
  try {
    currentObject.forcedClose();
  } catch (Exception abortFailure) {
    LOG.warn("An error occurred while aborting the stream", abortFailure);
  }

  // Step 2: regular close(). After a successful forcedClose() this is not expected to throw;
  // the catch covers the case where forcedClose() failed and the connection is still open.
  try {
    currentObject.close();
  } catch (IOException closeFailure) {
    if (closeQuietly) {
      LOG.warn("An error occurred while closing the stream", closeFailure);
    } else {
      throw closeFailure;
    }
  } finally {
    // references are dropped whether or not close() succeeded
    currentObject = null;
    stream = null;
  }
}

/**
 * Sets the skip threshold stored in {@code skipSize}. When seeking forward, positionStream
 * skips within the open stream if the distance is at most
 * {@code Math.max(stream.available(), skipSize)}.
 *
 * @param skipSize new threshold, in bytes
 */
public void setSkipSize(int skipSize) {
this.skipSize = skipSize;
}

@SuppressWarnings({"checkstyle:NoFinalizer", "Finalize", "deprecation"})
@Override
protected void finalize() throws Throwable {
super.finalize();
if (!closed) {
close(); // releasing resources is more important than printing the warning
String trace = Joiner.on("\n\t").join(Arrays.copyOfRange(createStack, 1, createStack.length));
LOG.warn("Unclosed input stream created by: \n\t{}", trace);
LOG.warn("Unclosed input stream created by:\n\t{}", trace);
}
}
}
Loading