/*
 * Decompiled with CFR 0.152.
 */
package org.opensearch.repositories.s3.async;

import com.jcraft.jzlib.JZlib;
import java.io.IOException;
import java.io.InputStream;
import java.util.Arrays;
import java.util.Base64;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.atomic.AtomicReferenceArray;
import java.util.function.BiFunction;
import java.util.function.Supplier;
import java.util.stream.IntStream;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.apache.logging.log4j.message.ParameterizedMessage;
import org.opensearch.ExceptionsHelper;
import org.opensearch.common.StreamContext;
import org.opensearch.common.blobstore.exception.CorruptFileException;
import org.opensearch.common.blobstore.stream.write.WritePriority;
import org.opensearch.common.io.InputStreamContainer;
import org.opensearch.common.util.ByteUtils;
import org.opensearch.core.common.unit.ByteSizeUnit;
import org.opensearch.repositories.s3.SocketAccess;
import org.opensearch.repositories.s3.async.AsyncPartsHandler;
import org.opensearch.repositories.s3.async.UploadRequest;
import org.opensearch.repositories.s3.io.CheckedContainer;
import software.amazon.awssdk.core.async.AsyncRequestBody;
import software.amazon.awssdk.core.exception.SdkClientException;
import software.amazon.awssdk.services.s3.S3AsyncClient;
import software.amazon.awssdk.services.s3.model.ChecksumAlgorithm;
import software.amazon.awssdk.services.s3.model.CompleteMultipartUploadRequest;
import software.amazon.awssdk.services.s3.model.CompleteMultipartUploadResponse;
import software.amazon.awssdk.services.s3.model.CompletedMultipartUpload;
import software.amazon.awssdk.services.s3.model.CompletedPart;
import software.amazon.awssdk.services.s3.model.CreateMultipartUploadRequest;
import software.amazon.awssdk.services.s3.model.CreateMultipartUploadResponse;
import software.amazon.awssdk.services.s3.model.DeleteObjectRequest;
import software.amazon.awssdk.services.s3.model.PutObjectRequest;
import software.amazon.awssdk.services.s3.model.S3Exception;
import software.amazon.awssdk.utils.CompletableFutureUtils;

public final class AsyncTransferManager {
    private static final Logger log = LogManager.getLogger(AsyncTransferManager.class);
    private final ExecutorService executorService;
    private final ExecutorService priorityExecutorService;
    private final long minimumPartSize;
    private static final long MAX_UPLOAD_PARTS = 10000L;

    public AsyncTransferManager(long minimumPartSize, ExecutorService executorService, ExecutorService priorityExecutorService) {
        this.executorService = executorService;
        this.priorityExecutorService = priorityExecutorService;
        this.minimumPartSize = minimumPartSize;
    }

    public CompletableFuture<Void> uploadObject(S3AsyncClient s3AsyncClient, UploadRequest uploadRequest, StreamContext streamContext) {
        CompletableFuture<Void> returnFuture = new CompletableFuture<Void>();
        try {
            if (streamContext.getNumberOfParts() == 1) {
                log.debug(() -> "Starting the upload as a single upload part request");
                this.uploadInOneChunk(s3AsyncClient, uploadRequest, streamContext.provideStream(0), returnFuture);
            } else {
                log.debug(() -> "Starting the upload as multipart upload request");
                this.uploadInParts(s3AsyncClient, uploadRequest, streamContext, returnFuture);
            }
        }
        catch (Throwable throwable) {
            returnFuture.completeExceptionally(throwable);
        }
        return returnFuture;
    }

    private void uploadInParts(S3AsyncClient s3AsyncClient, UploadRequest uploadRequest, StreamContext streamContext, CompletableFuture<Void> returnFuture) {
        CreateMultipartUploadRequest.Builder createMultipartUploadRequestBuilder = CreateMultipartUploadRequest.builder().bucket(uploadRequest.getBucket()).key(uploadRequest.getKey());
        if (uploadRequest.doRemoteDataIntegrityCheck()) {
            createMultipartUploadRequestBuilder.checksumAlgorithm(ChecksumAlgorithm.CRC32);
        }
        CompletableFuture createMultipartUploadFuture = SocketAccess.doPrivileged(() -> s3AsyncClient.createMultipartUpload((CreateMultipartUploadRequest)createMultipartUploadRequestBuilder.build()));
        CompletableFutureUtils.forwardExceptionTo(returnFuture, (CompletableFuture)createMultipartUploadFuture);
        createMultipartUploadFuture.whenComplete((createMultipartUploadResponse, throwable) -> {
            if (throwable != null) {
                AsyncTransferManager.handleException(returnFuture, () -> "Failed to initiate multipart upload", throwable);
            } else {
                log.debug(() -> "Initiated new multipart upload, uploadId: " + createMultipartUploadResponse.uploadId());
                this.doUploadInParts(s3AsyncClient, uploadRequest, streamContext, returnFuture, createMultipartUploadResponse.uploadId());
            }
        });
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    private void doUploadInParts(S3AsyncClient s3AsyncClient, UploadRequest uploadRequest, StreamContext streamContext, CompletableFuture<Void> returnFuture, String uploadId) {
        List<CompletableFuture<CompletedPart>> futures;
        AtomicReferenceArray<CompletedPart> completedParts = new AtomicReferenceArray<CompletedPart>(streamContext.getNumberOfParts());
        AtomicReferenceArray<CheckedContainer> inputStreamContainers = new AtomicReferenceArray<CheckedContainer>(streamContext.getNumberOfParts());
        try {
            futures = AsyncPartsHandler.uploadParts(s3AsyncClient, this.executorService, this.priorityExecutorService, uploadRequest, streamContext, uploadId, completedParts, inputStreamContainers);
        }
        catch (Exception ex) {
            try {
                AsyncPartsHandler.cleanUpParts(s3AsyncClient, uploadRequest, uploadId);
            }
            finally {
                returnFuture.completeExceptionally(ex);
            }
            return;
        }
        ((CompletableFuture)((CompletableFuture)((CompletableFuture)((CompletableFuture)CompletableFutureUtils.allOfExceptionForwarded((CompletableFuture[])((CompletableFuture[])futures.toArray(CompletableFuture[]::new))).thenApply(resp -> {
            try {
                uploadRequest.getUploadFinalizer().accept((Object)true);
            }
            catch (IOException e) {
                throw new RuntimeException(e);
            }
            return resp;
        })).thenApply(ignore -> {
            if (uploadRequest.doRemoteDataIntegrityCheck()) {
                this.mergeAndVerifyChecksum(inputStreamContainers, uploadRequest.getKey(), uploadRequest.getExpectedChecksum());
            }
            return null;
        })).thenCompose(ignore -> this.completeMultipartUpload(s3AsyncClient, uploadRequest, uploadId, completedParts))).handle(this.handleExceptionOrResponse(s3AsyncClient, uploadRequest, returnFuture, uploadId))).exceptionally(throwable -> {
            AsyncTransferManager.handleException(returnFuture, () -> "Unexpected exception occurred", throwable);
            return null;
        });
    }

    private void mergeAndVerifyChecksum(AtomicReferenceArray<CheckedContainer> inputStreamContainers, String fileName, long expectedChecksum) {
        long resultantChecksum = AsyncTransferManager.fromBase64String(inputStreamContainers.get(0).getChecksum());
        for (int index = 1; index < inputStreamContainers.length(); ++index) {
            long curChecksum = AsyncTransferManager.fromBase64String(inputStreamContainers.get(index).getChecksum());
            resultantChecksum = JZlib.crc32_combine((long)resultantChecksum, (long)curChecksum, (long)inputStreamContainers.get(index).getContentLength());
        }
        if (resultantChecksum != expectedChecksum) {
            throw new RuntimeException((Throwable)new CorruptFileException("File level checksums didn't match combined part checksums", fileName));
        }
    }

    private BiFunction<CompleteMultipartUploadResponse, Throwable, Void> handleExceptionOrResponse(S3AsyncClient s3AsyncClient, UploadRequest uploadRequest, CompletableFuture<Void> returnFuture, String uploadId) {
        return (response, throwable) -> {
            if (throwable != null) {
                AsyncPartsHandler.cleanUpParts(s3AsyncClient, uploadRequest, uploadId);
                AsyncTransferManager.handleException(returnFuture, () -> "Failed to send multipart upload requests.", throwable);
            } else {
                returnFuture.complete(null);
            }
            return null;
        };
    }

    private CompletableFuture<CompleteMultipartUploadResponse> completeMultipartUpload(S3AsyncClient s3AsyncClient, UploadRequest uploadRequest, String uploadId, AtomicReferenceArray<CompletedPart> completedParts) {
        log.debug(() -> new ParameterizedMessage("Sending completeMultipartUploadRequest, uploadId: {}", (Object)uploadId));
        CompletedPart[] parts = (CompletedPart[])IntStream.range(0, completedParts.length()).mapToObj(completedParts::get).toArray(CompletedPart[]::new);
        CompleteMultipartUploadRequest completeMultipartUploadRequest = (CompleteMultipartUploadRequest)CompleteMultipartUploadRequest.builder().bucket(uploadRequest.getBucket()).key(uploadRequest.getKey()).uploadId(uploadId).multipartUpload((CompletedMultipartUpload)CompletedMultipartUpload.builder().parts(parts).build()).build();
        return SocketAccess.doPrivileged(() -> s3AsyncClient.completeMultipartUpload(completeMultipartUploadRequest));
    }

    private static String base64StringFromLong(Long val) {
        return Base64.getEncoder().encodeToString(Arrays.copyOfRange(ByteUtils.toByteArrayBE((long)val), 4, 8));
    }

    private static long fromBase64String(String base64String) {
        byte[] decodedBytes = Base64.getDecoder().decode(base64String);
        if (decodedBytes.length != 4) {
            throw new IllegalArgumentException("Invalid Base64 encoded CRC32 checksum");
        }
        long result = 0L;
        for (int i = 0; i < 4; ++i) {
            result <<= 8;
            result |= (long)(decodedBytes[i] & 0xFF);
        }
        return result;
    }

    private static void handleException(CompletableFuture<Void> returnFuture, Supplier<String> message, Throwable throwable) {
        Throwable cause;
        Throwable throwable2 = cause = throwable instanceof CompletionException ? throwable.getCause() : throwable;
        if (cause instanceof Error) {
            returnFuture.completeExceptionally(cause);
        } else {
            SdkClientException exception = SdkClientException.create((String)message.get(), (Throwable)cause);
            returnFuture.completeExceptionally((Throwable)exception);
        }
    }

    public long calculateOptimalPartSize(long contentLengthOfSource) {
        if (contentLengthOfSource < ByteSizeUnit.MB.toBytes(5L)) {
            return contentLengthOfSource;
        }
        double optimalPartSize = (double)contentLengthOfSource / 10000.0;
        optimalPartSize = Math.ceil(optimalPartSize);
        return (long)Math.max(optimalPartSize, (double)this.minimumPartSize);
    }

    private void uploadInOneChunk(S3AsyncClient s3AsyncClient, UploadRequest uploadRequest, InputStreamContainer inputStreamContainer, CompletableFuture<Void> returnFuture) {
        PutObjectRequest.Builder putObjectRequestBuilder = PutObjectRequest.builder().bucket(uploadRequest.getBucket()).key(uploadRequest.getKey()).contentLength(Long.valueOf(uploadRequest.getContentLength()));
        if (uploadRequest.doRemoteDataIntegrityCheck()) {
            putObjectRequestBuilder.checksumAlgorithm(ChecksumAlgorithm.CRC32);
            putObjectRequestBuilder.checksumCRC32(AsyncTransferManager.base64StringFromLong(uploadRequest.getExpectedChecksum()));
        }
        ExecutorService streamReadExecutor = uploadRequest.getWritePriority() == WritePriority.HIGH ? this.priorityExecutorService : this.executorService;
        CompletableFuture putObjectFuture = SocketAccess.doPrivileged(() -> ((CompletableFuture)s3AsyncClient.putObject((PutObjectRequest)putObjectRequestBuilder.build(), AsyncRequestBody.fromInputStream((InputStream)inputStreamContainer.getInputStream(), (Long)inputStreamContainer.getContentLength(), (ExecutorService)streamReadExecutor)).handle((resp, throwable) -> {
            if (throwable != null) {
                S3Exception s3Exception;
                Throwable unwrappedThrowable = ExceptionsHelper.unwrap((Throwable)throwable, (Class[])new Class[]{S3Exception.class});
                if (unwrappedThrowable != null && (s3Exception = (S3Exception)unwrappedThrowable).statusCode() == 400 && "BadDigest".equals(s3Exception.awsErrorDetails().errorCode())) {
                    throw new RuntimeException((Throwable)new CorruptFileException((Throwable)s3Exception, uploadRequest.getKey()));
                }
                returnFuture.completeExceptionally((Throwable)throwable);
            } else {
                try {
                    uploadRequest.getUploadFinalizer().accept((Object)true);
                }
                catch (IOException e) {
                    throw new RuntimeException(e);
                }
                returnFuture.complete(null);
            }
            return null;
        })).handle((resp, throwable) -> {
            if (throwable != null) {
                this.deleteUploadedObject(s3AsyncClient, uploadRequest);
                returnFuture.completeExceptionally((Throwable)throwable);
            }
            return null;
        }));
        CompletableFutureUtils.forwardExceptionTo(returnFuture, (CompletableFuture)putObjectFuture);
        CompletableFutureUtils.forwardResultTo((CompletableFuture)putObjectFuture, returnFuture);
    }

    private void deleteUploadedObject(S3AsyncClient s3AsyncClient, UploadRequest uploadRequest) {
        DeleteObjectRequest deleteObjectRequest = (DeleteObjectRequest)DeleteObjectRequest.builder().bucket(uploadRequest.getBucket()).key(uploadRequest.getKey()).build();
        SocketAccess.doPrivileged(() -> s3AsyncClient.deleteObject(deleteObjectRequest)).exceptionally(throwable -> {
            log.error(() -> new ParameterizedMessage("Failed to delete uploaded object of key {}", (Object)uploadRequest.getKey()), throwable);
            return null;
        });
    }
}

