/*
 * Decompiled with CFR 0.152.
 */
package com.hazelcast.jet.impl.execution.init;

import com.hazelcast.cluster.Address;
import com.hazelcast.internal.cluster.MemberInfo;
import com.hazelcast.internal.partition.IPartitionService;
import com.hazelcast.internal.util.ConcurrencyUtil;
import com.hazelcast.internal.util.executor.ManagedExecutorService;
import com.hazelcast.jet.config.EdgeConfig;
import com.hazelcast.jet.config.JobConfig;
import com.hazelcast.jet.core.DAG;
import com.hazelcast.jet.core.Edge;
import com.hazelcast.jet.core.ProcessorMetaSupplier;
import com.hazelcast.jet.core.ProcessorSupplier;
import com.hazelcast.jet.core.Vertex;
import com.hazelcast.jet.function.RunnableEx;
import com.hazelcast.jet.impl.JetServiceBackend;
import com.hazelcast.jet.impl.JobClassLoaderService;
import com.hazelcast.jet.impl.deployment.JetDelegatingClassLoader;
import com.hazelcast.jet.impl.execution.init.Contexts;
import com.hazelcast.jet.impl.execution.init.EdgeDef;
import com.hazelcast.jet.impl.execution.init.ExecutionPlan;
import com.hazelcast.jet.impl.execution.init.VertexDef;
import com.hazelcast.jet.impl.util.ExceptionUtil;
import com.hazelcast.jet.impl.util.PrefixedLogger;
import com.hazelcast.jet.impl.util.Util;
import com.hazelcast.logging.ILogger;
import com.hazelcast.spi.impl.NodeEngine;
import com.hazelcast.spi.impl.NodeEngineImpl;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashMap;
import java.util.Iterator;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Executor;
import java.util.function.Function;
import java.util.stream.Collectors;
import javax.annotation.Nonnull;
import javax.security.auth.Subject;

public final class ExecutionPlanBuilder {
    // Not meant to be instantiated: the constructor is private and the
    // class only exposes static methods.
    private ExecutionPlanBuilder() {
    }

    /**
     * Creates one {@link ExecutionPlan} per member returned by
     * {@link #getPartitionAssignment} and fills every plan with a
     * {@link VertexDef} for each vertex of the DAG.
     * <p>
     * For each vertex a task is scheduled that, with the job's class loader
     * installed via {@code Util.doWithClassLoader}, initialises the vertex's
     * {@link ProcessorMetaSupplier}, asks it for the per-address
     * {@link ProcessorSupplier} function and stores the resulting vertex
     * definition in every plan at the vertex's required position. The task is
     * submitted to {@code ConcurrencyUtil.CALLER_RUNS} when
     * {@code metaSupplier.initIsCooperative()} returns {@code true} and to the
     * {@code "hz:jet-job-offloadable"} executor otherwise.
     *
     * @param nodeEngine     source of configuration, services, loggers and executors
     * @param memberInfos    candidate members, forwarded to {@link #getPartitionAssignment}
     * @param dag            the DAG whose vertices and edges are translated
     * @param jobId          used for the logger prefix, the class-loader lookup and the meta-supplier context
     * @param executionId    forwarded to the meta-supplier context
     * @param jobConfig      forwarded to every plan and meta-supplier context; its name is used in the logger prefix
     * @param lastSnapshotId forwarded to every plan
     * @param isLightJob     forwarded to plans and contexts; when {@code true} the
     *                       serializability check of processor suppliers is skipped
     * @param subject        forwarded to plans and meta-supplier contexts
     * @return a future that yields the member-to-plan map once the tasks of all
     *         vertices have finished; per {@link CompletableFuture#allOf} it
     *         completes exceptionally if any of those tasks does
     */
    public static CompletableFuture<Map<MemberInfo, ExecutionPlan>> createExecutionPlans(
            NodeEngineImpl nodeEngine,
            List<MemberInfo> memberInfos,
            DAG dag,
            long jobId,
            long executionId,
            JobConfig jobConfig,
            long lastSnapshotId,
            boolean isLightJob,
            Subject subject
    ) {
        VerticesIdAndOrder verticesIdAndOrder = VerticesIdAndOrder.assignVertexIds(dag);
        int defaultParallelism = nodeEngine.getConfig().getJetConfig().getCooperativeThreadCount();
        Map<MemberInfo, int[]> partitionsByMember = getPartitionAssignment(nodeEngine, memberInfos);
        Map<Address, int[]> partitionsByAddress = partitionsByMember.entrySet().stream()
                .collect(Collectors.toMap(en -> en.getKey().getAddress(), Map.Entry::getValue));
        List<Address> addresses = Util.toList(partitionsByMember.keySet(), MemberInfo::getAddress);
        int clusterSize = partitionsByMember.size();
        boolean isJobDistributed = clusterSize > 1;
        EdgeConfig defaultEdgeConfig = nodeEngine.getConfig().getJetConfig().getDefaultEdgeConfig();

        // One plan per member that appears in the partition assignment; the member
        // index follows the iteration order of the assignment map.
        Map<MemberInfo, ExecutionPlan> plans = new HashMap<>();
        int memberIndex = 0;
        for (MemberInfo member : partitionsByMember.keySet()) {
            plans.put(member, new ExecutionPlan(partitionsByAddress, jobConfig, lastSnapshotId, memberIndex++,
                    clusterSize, isLightJob, subject, verticesIdAndOrder.count()));
        }

        ManagedExecutorService initOffloadExecutor =
                nodeEngine.getExecutionService().getExecutor("hz:jet-job-offloadable");
        // Wildcard-typed array instead of a raw one: allOf() accepts CompletableFuture<?>...
        CompletableFuture<?>[] futures = new CompletableFuture<?>[verticesIdAndOrder.count()];
        for (VertexIdPos entry : verticesIdAndOrder) {
            Vertex vertex = dag.getVertex(entry.vertexName);
            assert (vertex != null);
            ProcessorMetaSupplier metaSupplier = vertex.getMetaSupplier();
            int vertexId = entry.vertexId;
            int localParallelism = vertex.determineLocalParallelism(defaultParallelism);
            int totalParallelism = localParallelism * clusterSize;
            // Inbound edges reference their source vertex, outbound edges their destination.
            List<EdgeDef> inbound = toEdgeDefs(dag.getInboundEdges(vertex.getName()), defaultEdgeConfig,
                    e -> verticesIdAndOrder.idByName(e.getSourceName()), isJobDistributed);
            List<EdgeDef> outbound = toEdgeDefs(dag.getOutboundEdges(vertex.getName()), defaultEdgeConfig,
                    e -> verticesIdAndOrder.idByName(e.getDestName()), isJobDistributed);
            String prefix = PrefixedLogger.prefix(jobConfig.getName(), jobId, vertex.getName(), "#PMS");
            ILogger logger = PrefixedLogger.prefixedLogger(nodeEngine.getLogger(metaSupplier.getClass()), prefix);
            RunnableEx action = () -> {
                JetServiceBackend jetBackend = (JetServiceBackend) nodeEngine.getService("hz:impl:jetService");
                JobClassLoaderService jobClassLoaderService = jetBackend.getJobClassLoaderService();
                JetDelegatingClassLoader processorClassLoader = jobClassLoaderService.getClassLoader(jobId);
                try {
                    Util.doWithClassLoader(processorClassLoader, () -> metaSupplier.init(
                            new Contexts.MetaSupplierCtx(nodeEngine, jobId, executionId, jobConfig, logger,
                                    vertex.getName(), localParallelism, totalParallelism, clusterSize, isLightJob,
                                    partitionsByAddress, subject, processorClassLoader)));
                } catch (Exception e) {
                    // Rethrow the peeled exception rather than the wrapper.
                    throw ExceptionUtil.sneakyThrow(ExceptionUtil.peel(e));
                }
                Function<? super Address, ? extends ProcessorSupplier> procSupplierFn =
                        Util.doWithClassLoader(processorClassLoader, () -> metaSupplier.get(addresses));
                for (Map.Entry<MemberInfo, ExecutionPlan> e : plans.entrySet()) {
                    ProcessorSupplier processorSupplier = Util.doWithClassLoader(processorClassLoader,
                            () -> procSupplierFn.apply(e.getKey().getAddress()));
                    if (!isLightJob) {
                        Util.checkSerializable(processorSupplier, "ProcessorSupplier in vertex '" + vertex.getName() + '\'');
                    }
                    VertexDef vertexDef = new VertexDef(vertexId, vertex.getName(), processorSupplier, localParallelism);
                    vertexDef.addInboundEdges(inbound);
                    vertexDef.addOutboundEdges(outbound);
                    e.getValue().setVertex(entry.requiredPosition, vertexDef);
                }
            };
            Executor executor = metaSupplier.initIsCooperative() ? ConcurrencyUtil.CALLER_RUNS : initOffloadExecutor;
            futures[entry.requiredPosition] = CompletableFuture.runAsync(action, executor);
        }
        // The plans map is already built, so a plain mapping step is enough.
        return CompletableFuture.allOf(futures).thenApply(ignored -> plans);
    }

    /**
     * Converts the given DAG edges to {@link EdgeDef}s, preserving their order.
     *
     * @param edges             edges to convert
     * @param defaultEdgeConfig configuration used for an edge whose own {@code getConfig()} is {@code null}
     * @param oppositeVtxId     yields, for an edge, the vertex id handed to the {@link EdgeDef} constructor
     * @param isJobDistributed  forwarded unchanged to every {@link EdgeDef}
     * @return a new list with one {@link EdgeDef} per input edge
     */
    private static List<EdgeDef> toEdgeDefs(List<Edge> edges, EdgeConfig defaultEdgeConfig, Function<Edge, Integer> oppositeVtxId, boolean isJobDistributed) {
        List<EdgeDef> edgeDefs = new ArrayList<>(edges.size());
        for (Edge current : edges) {
            EdgeConfig effectiveConfig;
            if (current.getConfig() != null) {
                effectiveConfig = current.getConfig();
            } else {
                // The edge carries no configuration of its own.
                effectiveConfig = defaultEdgeConfig;
            }
            edgeDefs.add(new EdgeDef(current, effectiveConfig, oppositeVtxId.apply(current), isJobDistributed));
        }
        return edgeDefs;
    }

    /**
     * Distributes all partition ids among the given members.
     * <p>
     * A partition whose owner address, as returned by
     * {@code getPartitionOwnerOrWait}, belongs to a member in
     * {@code memberList} is assigned to that member. Any other partition is
     * handed to the members of {@code memberList} in round-robin order.
     * Members that end up with no partition do not appear in the result.
     *
     * @param nodeEngine provides the partition service
     * @param memberList members eligible to receive partitions
     * @return a new map from member to the ids of the partitions assigned to
     *         it, in ascending order
     */
    public static Map<MemberInfo, int[]> getPartitionAssignment(NodeEngine nodeEngine, List<MemberInfo> memberList) {
        IPartitionService partitions = nodeEngine.getPartitionService();

        Map<Address, MemberInfo> knownMembers = new HashMap<>();
        for (MemberInfo candidate : memberList) {
            knownMembers.put(candidate.getAddress(), candidate);
        }

        Map<MemberInfo, FixedCapacityIntArrayList> collected = new HashMap<>();
        final int total = partitions.getPartitionCount();
        int fallbackCursor = 0;
        int pid = 0;
        while (pid < total) {
            MemberInfo owner = knownMembers.get(partitions.getPartitionOwnerOrWait(pid));
            if (owner == null) {
                // The owner is not in memberList: pick the next member in turn.
                owner = memberList.get(fallbackCursor++ % memberList.size());
            }
            // Any single member can receive at most `total` partitions.
            collected.computeIfAbsent(owner, unused -> new FixedCapacityIntArrayList(total)).add(pid);
            pid++;
        }

        Map<MemberInfo, int[]> assignment = new HashMap<>();
        collected.forEach((member, ids) -> assignment.put(member, ids.asArray()));
        return assignment;
    }

    /**
     * Append-only list of {@code int}s backed by an array that is allocated
     * once, with the capacity given at construction, and never grown.
     */
    static class FixedCapacityIntArrayList {
        private int[] elements;
        private int size;

        /**
         * @param capacity length of the backing array
         */
        FixedCapacityIntArrayList(int capacity) {
            elements = new int[capacity];
        }

        /**
         * Stores {@code element} at the next free slot of the backing array.
         */
        void add(int element) {
            elements[size++] = element;
        }

        /**
         * Returns the added elements as an array and drops the reference to
         * the backing array. When the backing array is exactly full it is
         * returned as is; otherwise a copy holding {@code size} entries is
         * returned.
         */
        int[] asArray() {
            int[] snapshot = elements;
            elements = null;
            if (size != snapshot.length) {
                snapshot = Arrays.copyOf(snapshot, size);
            }
            return snapshot;
        }
    }

    /**
     * Immutable holder for a vertex's id, its name and the position at which
     * its definition is stored in an execution plan.
     */
    private static final class VertexIdPos {
        private final int vertexId;
        private final String vertexName;
        private final int requiredPosition;

        private VertexIdPos(int id, String name, int requiredPosition) {
            vertexId = id;
            vertexName = name;
            this.requiredPosition = requiredPosition;
        }
    }

    /**
     * Holds the id assigned to every vertex name and, for every id, its
     * position in the insertion order of the name-to-id map. Iterating yields
     * one {@link VertexIdPos} per vertex in that insertion order.
     */
    private static final class VerticesIdAndOrder
    implements Iterable<VertexIdPos> {
        private final LinkedHashMap<String, Integer> vertexIdMap;
        private final HashMap<Integer, Integer> vertexPosById;

        /**
         * @param vertexIdMap vertex name to vertex id; its iteration order
         *                    determines the position recorded for each id
         */
        private VerticesIdAndOrder(LinkedHashMap<String, Integer> vertexIdMap) {
            this.vertexIdMap = vertexIdMap;
            this.vertexPosById = new LinkedHashMap<>(vertexIdMap.size());
            int position = 0;
            for (Integer id : vertexIdMap.values()) {
                vertexPosById.put(id, position);
                position++;
            }
        }

        /**
         * Returns the id stored for {@code vertexName}, or {@code null} when
         * the name has no entry.
         */
        private Integer idByName(String vertexName) {
            return vertexIdMap.get(vertexName);
        }

        /**
         * Numbers the vertices of {@code dag} from 0 upwards in the order in
         * which {@code dag.forEach} visits them.
         */
        private static VerticesIdAndOrder assignVertexIds(DAG dag) {
            LinkedHashMap<String, Integer> idsByName = new LinkedHashMap<>();
            // Single-element array: a counter the lambda is allowed to mutate.
            int[] nextId = {0};
            dag.forEach(vertex -> idsByName.put(vertex.getName(), nextId[0]++));
            return new VerticesIdAndOrder(idsByName);
        }

        /** Returns the number of vertices. */
        private int count() {
            return vertexIdMap.size();
        }

        @Override
        @Nonnull
        public Iterator<VertexIdPos> iterator() {
            Iterator<Map.Entry<String, Integer>> entries = vertexIdMap.entrySet().iterator();
            return new Iterator<VertexIdPos>() {
                @Override
                public boolean hasNext() {
                    return entries.hasNext();
                }

                @Override
                public VertexIdPos next() {
                    Map.Entry<String, Integer> entry = entries.next();
                    Integer id = entry.getValue();
                    return new VertexIdPos(id, entry.getKey(), vertexPosById.get(id));
                }
            };
        }
    }
}

