/*
 * Decompiled with CFR 0.152.
 */
package com.hazelcast.jet.impl.util;

import com.hazelcast.core.HazelcastInstanceNotActiveException;
import com.hazelcast.internal.nio.Bits;
import com.hazelcast.internal.partition.IPartitionService;
import com.hazelcast.internal.serialization.Data;
import com.hazelcast.internal.serialization.InternalSerializationService;
import com.hazelcast.internal.serialization.impl.HeapData;
import com.hazelcast.jet.impl.JetServiceBackend;
import com.hazelcast.jet.impl.JobRepository;
import com.hazelcast.jet.impl.execution.SnapshotContext;
import com.hazelcast.jet.impl.execution.init.JetInitDataSerializerHook;
import com.hazelcast.jet.impl.util.AsyncSnapshotWriter;
import com.hazelcast.jet.impl.util.Util;
import com.hazelcast.logging.ILogger;
import com.hazelcast.map.IMap;
import com.hazelcast.nio.ObjectDataInput;
import com.hazelcast.nio.ObjectDataOutput;
import com.hazelcast.nio.serialization.IdentifiedDataSerializable;
import com.hazelcast.partition.PartitionAware;
import com.hazelcast.spi.impl.NodeEngine;
import java.io.IOException;
import java.io.OutputStream;
import java.nio.ByteOrder;
import java.util.Arrays;
import java.util.Map;
import java.util.Objects;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.BiConsumer;
import java.util.function.Supplier;
import javax.annotation.CheckReturnValue;
import javax.annotation.Nonnull;

public class AsyncSnapshotWriterImpl
implements AsyncSnapshotWriter {
    public static final int DEFAULT_CHUNK_SIZE = 131072;
    final int usableChunkCapacity;
    final byte[] serializedByteArrayHeader = new byte[12];
    final byte[] valueTerminator;
    final AtomicInteger numConcurrentAsyncOps;
    private final IPartitionService partitionService;
    private final CustomByteArrayOutputStream[] buffers;
    private final int[] partitionKeys;
    private int partitionSequence;
    private final ILogger logger;
    private final NodeEngine nodeEngine;
    private final boolean useBigEndian;
    private final SnapshotContext snapshotContext;
    private final String vertexName;
    private final int memberCount;
    private IMap<SnapshotDataKey, Object> currentMap;
    private long currentSnapshotId;
    private final AtomicReference<Throwable> firstError = new AtomicReference();
    private final AtomicInteger numActiveFlushes = new AtomicInteger();
    private long totalKeys;
    private long totalChunks;
    private long totalPayloadBytes;
    private final BiConsumer<Object, Throwable> putResponseConsumer = this::consumePutResponse;

    public AsyncSnapshotWriterImpl(NodeEngine nodeEngine, SnapshotContext snapshotContext, String vertexName, int memberIndex, int memberCount, InternalSerializationService serializationService) {
        this(131072, nodeEngine, snapshotContext, vertexName, memberIndex, memberCount, serializationService);
    }

    AsyncSnapshotWriterImpl(int chunkSize, NodeEngine nodeEngine, SnapshotContext snapshotContext, String vertexName, int memberIndex, int memberCount, InternalSerializationService serializationService) {
        if (Integer.bitCount(chunkSize) != 1) {
            throw new IllegalArgumentException("chunkSize must be a power of two, but is " + chunkSize);
        }
        this.nodeEngine = nodeEngine;
        this.partitionService = nodeEngine.getPartitionService();
        this.logger = nodeEngine.getLogger(this.getClass());
        this.snapshotContext = snapshotContext;
        this.vertexName = vertexName;
        this.memberCount = memberCount;
        this.currentSnapshotId = snapshotContext.currentSnapshotId();
        this.useBigEndian = serializationService.getByteOrder().equals(ByteOrder.BIG_ENDIAN);
        Bits.writeInt(this.serializedByteArrayHeader, 4, -12, true);
        this.buffers = AsyncSnapshotWriterImpl.createAndInitBuffers(chunkSize, this.partitionService.getPartitionCount(), this.serializedByteArrayHeader);
        JetServiceBackend jetServiceBackend = (JetServiceBackend)nodeEngine.getService("hz:impl:jetService");
        this.partitionKeys = jetServiceBackend.getSharedPartitionKeys();
        this.partitionSequence = memberIndex;
        this.numConcurrentAsyncOps = jetServiceBackend.numConcurrentAsyncOps();
        byte[] valueTerminatorWithHeader = serializationService.toData(SnapshotDataValueTerminator.INSTANCE).toByteArray();
        this.valueTerminator = Arrays.copyOfRange(valueTerminatorWithHeader, 4, valueTerminatorWithHeader.length);
        this.usableChunkCapacity = chunkSize - this.valueTerminator.length - this.serializedByteArrayHeader.length;
        if (this.usableChunkCapacity <= 0) {
            throw new IllegalArgumentException("too small chunk size: " + chunkSize);
        }
    }

    private static CustomByteArrayOutputStream[] createAndInitBuffers(int chunkSize, int partitionCount, byte[] serializedByteArrayHeader) {
        CustomByteArrayOutputStream[] buffers = new CustomByteArrayOutputStream[partitionCount];
        for (int i = 0; i < buffers.length; ++i) {
            buffers[i] = new CustomByteArrayOutputStream(chunkSize);
            buffers[i].write(serializedByteArrayHeader, 0, serializedByteArrayHeader.length);
        }
        return buffers;
    }

    private void consumePutResponse(Object response, Throwable throwable) {
        try {
            assert (response == null) : "put operation overwrote a previous value: " + response;
        }
        catch (AssertionError e) {
            throwable = e;
        }
        if (throwable != null) {
            this.logger.severe("Error writing to snapshot map", (Throwable)throwable);
            this.firstError.compareAndSet(null, (Throwable)throwable);
        }
        this.numActiveFlushes.decrementAndGet();
        this.numConcurrentAsyncOps.decrementAndGet();
    }

    @Override
    @CheckReturnValue
    public boolean offer(Map.Entry<? extends Data, ? extends Data> entry) {
        int partitionId = this.partitionService.getPartitionId(entry.getKey());
        int length = entry.getKey().totalSize() + entry.getValue().totalSize() - 8;
        if (length > this.usableChunkCapacity) {
            return this.putAsyncToMap(partitionId, () -> {
                byte[] data = new byte[this.serializedByteArrayHeader.length + length + this.valueTerminator.length];
                ++this.totalKeys;
                int offset = 0;
                System.arraycopy(this.serializedByteArrayHeader, 0, data, offset, this.serializedByteArrayHeader.length);
                Bits.writeInt(data, offset += this.serializedByteArrayHeader.length - 4, length + this.valueTerminator.length, this.useBigEndian);
                this.copyWithoutHeader((Data)entry.getKey(), data, offset += 4);
                this.copyWithoutHeader((Data)entry.getValue(), data, offset += ((Data)entry.getKey()).totalSize() - 4);
                System.arraycopy(this.valueTerminator, 0, data, offset += ((Data)entry.getValue()).totalSize() - 4, this.valueTerminator.length);
                return new HeapData(data);
            });
        }
        CustomByteArrayOutputStream buffer = this.buffers[partitionId];
        if (buffer.size() + length + this.valueTerminator.length > buffer.capacityLimit && !this.flushPartition(partitionId)) {
            return false;
        }
        this.writeWithoutHeader(entry.getKey(), buffer);
        this.writeWithoutHeader(entry.getValue(), buffer);
        ++this.totalKeys;
        return true;
    }

    private void copyWithoutHeader(Data src, byte[] dst, int dstOffset) {
        byte[] bytes = src.toByteArray();
        System.arraycopy(bytes, 4, dst, dstOffset, bytes.length - 4);
    }

    private void writeWithoutHeader(Data src, OutputStream dst) {
        byte[] bytes = src.toByteArray();
        try {
            dst.write(bytes, 4, bytes.length - 4);
        }
        catch (IOException e) {
            throw new RuntimeException(e);
        }
    }

    @CheckReturnValue
    private boolean flushPartition(int partitionId) {
        return this.containsOnlyHeader(this.buffers[partitionId]) || this.putAsyncToMap(partitionId, () -> this.getBufferContentsAndClear(this.buffers[partitionId]));
    }

    private boolean containsOnlyHeader(CustomByteArrayOutputStream buffer) {
        return buffer.size() == this.serializedByteArrayHeader.length;
    }

    private Data getBufferContentsAndClear(CustomByteArrayOutputStream buffer) {
        buffer.write(this.valueTerminator, 0, this.valueTerminator.length);
        byte[] data = buffer.toByteArray();
        this.updateSerializedBytesLength(data);
        buffer.reset();
        buffer.write(this.serializedByteArrayHeader, 0, this.serializedByteArrayHeader.length);
        return new HeapData(data);
    }

    private void updateSerializedBytesLength(byte[] data) {
        Bits.writeInt(data, 8, data.length - this.serializedByteArrayHeader.length, this.useBigEndian);
    }

    @CheckReturnValue
    private boolean putAsyncToMap(int partitionId, Supplier<Data> dataSupplier) {
        if (!this.initCurrentMap()) {
            return false;
        }
        if (!Util.tryIncrement(this.numConcurrentAsyncOps, 1, 1000)) {
            return false;
        }
        try {
            Data data = dataSupplier.get();
            this.totalPayloadBytes += (long)data.dataSize();
            ++this.totalChunks;
            CompletableFuture<Object> future = this.currentMap.putAsync(new SnapshotDataKey(this.partitionKeys[partitionId], this.currentSnapshotId, this.vertexName, this.partitionSequence), data).toCompletableFuture();
            this.partitionSequence += this.memberCount;
            future.whenComplete((BiConsumer)this.putResponseConsumer);
            this.numActiveFlushes.incrementAndGet();
        }
        catch (HazelcastInstanceNotActiveException ignored) {
            return false;
        }
        return true;
    }

    private boolean initCurrentMap() {
        if (this.currentMap == null) {
            String mapName = this.snapshotContext.currentMapName();
            if (mapName == null) {
                return false;
            }
            this.currentMap = JobRepository.safeImap(this.nodeEngine.getHazelcastInstance().getMap(mapName));
            this.currentSnapshotId = this.snapshotContext.currentSnapshotId();
        }
        return true;
    }

    @Override
    @CheckReturnValue
    public boolean flushAndResetMap() {
        if (!this.initCurrentMap()) {
            return false;
        }
        for (int i = 0; i < this.buffers.length; ++i) {
            if (this.flushPartition(i)) continue;
            return false;
        }
        this.currentMap = null;
        if (this.logger.isFineEnabled()) {
            this.logger.fine(String.format("Stats for %s: keys=%,d, chunks=%,d, bytes=%,d", this.vertexName, this.totalKeys, this.totalChunks, this.totalPayloadBytes));
        }
        return true;
    }

    @Override
    public void resetStats() {
        this.totalPayloadBytes = 0L;
        this.totalChunks = 0L;
        this.totalKeys = 0L;
    }

    @Override
    public boolean hasPendingAsyncOps() {
        return this.numActiveFlushes.get() > 0;
    }

    @Override
    public Throwable getError() {
        return this.firstError.getAndSet(null);
    }

    @Override
    public boolean isEmpty() {
        return this.numActiveFlushes.get() == 0 && Arrays.stream(this.buffers).allMatch(this::containsOnlyHeader);
    }

    int partitionKey(int partitionId) {
        return this.partitionKeys[partitionId];
    }

    @Override
    public long getTotalPayloadBytes() {
        return this.totalPayloadBytes;
    }

    @Override
    public long getTotalKeys() {
        return this.totalKeys;
    }

    @Override
    public long getTotalChunks() {
        return this.totalChunks;
    }

    static class CustomByteArrayOutputStream
    extends OutputStream {
        private static final byte[] EMPTY_BYTE_ARRAY = new byte[0];
        private byte[] data;
        private int size;
        private int capacityLimit;

        CustomByteArrayOutputStream(int capacityLimit) {
            this.capacityLimit = capacityLimit;
            this.data = EMPTY_BYTE_ARRAY;
        }

        @Override
        public void write(int b) {
            this.ensureCapacity(this.size + 1);
            this.data[this.size] = (byte)b;
            ++this.size;
        }

        @Override
        public void write(@Nonnull byte[] b, int off, int len) {
            if (off < 0 || off > b.length || len < 0 || off + len - b.length > 0) {
                throw new IndexOutOfBoundsException("off=" + off + ", len=" + len);
            }
            this.ensureCapacity(this.size + len);
            System.arraycopy(b, off, this.data, this.size, len);
            this.size += len;
        }

        private void ensureCapacity(int minCapacity) {
            if (minCapacity - this.data.length > 0) {
                int newCapacity = this.data.length;
                while ((newCapacity = Math.max(1, newCapacity << 1)) - minCapacity < 0) {
                }
                if (newCapacity - this.capacityLimit > 0) {
                    throw new IllegalStateException("buffer full");
                }
                this.data = Arrays.copyOf(this.data, newCapacity);
            }
        }

        void reset() {
            this.size = 0;
        }

        @Nonnull
        byte[] toByteArray() {
            return Arrays.copyOf(this.data, this.size);
        }

        int size() {
            return this.size;
        }
    }

    public static final class SnapshotDataValueTerminator
    implements IdentifiedDataSerializable {
        public static final IdentifiedDataSerializable INSTANCE = new SnapshotDataValueTerminator();

        private SnapshotDataValueTerminator() {
        }

        @Override
        public int getFactoryId() {
            return JetInitDataSerializerHook.FACTORY_ID;
        }

        @Override
        public int getClassId() {
            return 24;
        }

        @Override
        public void writeData(ObjectDataOutput out) {
        }

        @Override
        public void readData(ObjectDataInput in) {
        }
    }

    public static final class SnapshotDataKey
    implements IdentifiedDataSerializable,
    PartitionAware {
        private int partitionKey;
        private long snapshotId;
        private String vertexName;
        private int sequence;

        public SnapshotDataKey() {
        }

        public SnapshotDataKey(int partitionKey, long snapshotId, String vertexName, int sequence) {
            this.partitionKey = partitionKey;
            this.snapshotId = snapshotId;
            this.vertexName = vertexName;
            this.sequence = sequence;
        }

        public Object getPartitionKey() {
            return this.partitionKey;
        }

        public long snapshotId() {
            return this.snapshotId;
        }

        public String vertexName() {
            return this.vertexName;
        }

        public String toString() {
            return "SnapshotDataKey{partitionKey=" + this.partitionKey + ", snapshotId=" + this.snapshotId + ", vertexName='" + this.vertexName + '\'' + ", sequence=" + this.sequence + '}';
        }

        @Override
        public int getFactoryId() {
            return JetInitDataSerializerHook.FACTORY_ID;
        }

        @Override
        public int getClassId() {
            return 23;
        }

        @Override
        public void writeData(ObjectDataOutput out) throws IOException {
            out.writeInt(this.partitionKey);
            out.writeLong(this.snapshotId);
            out.writeString(this.vertexName);
            out.writeInt(this.sequence);
        }

        @Override
        public void readData(ObjectDataInput in) throws IOException {
            this.partitionKey = in.readInt();
            this.snapshotId = in.readLong();
            this.vertexName = in.readString();
            this.sequence = in.readInt();
        }

        public boolean equals(Object o) {
            if (this == o) {
                return true;
            }
            if (o == null || this.getClass() != o.getClass()) {
                return false;
            }
            SnapshotDataKey that = (SnapshotDataKey)o;
            return this.partitionKey == that.partitionKey && this.snapshotId == that.snapshotId && this.sequence == that.sequence && Objects.equals(this.vertexName, that.vertexName);
        }

        public int hashCode() {
            return Objects.hash(this.partitionKey, this.snapshotId, this.vertexName, this.sequence);
        }
    }
}

