/*
 * Decompiled with CFR 0.152.
 */
package com.hazelcast.internal.tpc;

import com.hazelcast.cluster.Address;
import com.hazelcast.config.AdvancedNetworkConfig;
import com.hazelcast.config.Config;
import com.hazelcast.config.EndpointConfig;
import com.hazelcast.config.InvalidConfigurationException;
import com.hazelcast.config.ServerSocketEndpointConfig;
import com.hazelcast.config.tpc.TpcSocketConfig;
import com.hazelcast.core.HazelcastException;
import com.hazelcast.instance.EndpointQualifier;
import com.hazelcast.internal.tpc.ClientAsyncSocketReader;
import com.hazelcast.internal.tpcengine.Reactor;
import com.hazelcast.internal.tpcengine.TpcEngine;
import com.hazelcast.internal.tpcengine.TpcEngineBuilder;
import com.hazelcast.internal.tpcengine.net.AcceptRequest;
import com.hazelcast.internal.tpcengine.net.AsyncServerSocket;
import com.hazelcast.internal.tpcengine.net.AsyncSocketOptions;
import com.hazelcast.internal.tpcengine.net.AsyncSocketReader;
import com.hazelcast.internal.tpcengine.nio.NioReactorBuilder;
import com.hazelcast.logging.ILogger;
import com.hazelcast.spi.impl.NodeEngineImpl;
import com.hazelcast.spi.impl.operationexecutor.impl.OperationExecutorImpl;
import com.hazelcast.spi.impl.operationexecutor.impl.TpcOperationScheduler;
import com.hazelcast.spi.impl.operationexecutor.impl.TpcPartitionOperationThread;
import com.hazelcast.spi.properties.HazelcastProperty;
import java.io.UncheckedIOException;
import java.net.BindException;
import java.net.InetSocketAddress;
import java.net.UnknownHostException;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.TimeUnit;
import java.util.function.Supplier;
import java.util.stream.Collectors;

public class TpcServerBootstrap {
    public static final HazelcastProperty TPC_ENABLED = new HazelcastProperty("hazelcast.internal.tpc.enabled");
    public static final HazelcastProperty TPC_EVENTLOOP_COUNT = new HazelcastProperty("hazelcast.internal.tpc.eventloop.count");
    private static final int TERMINATE_TIMEOUT_SECONDS = 5;
    private final NodeEngineImpl nodeEngine;
    private final ILogger logger;
    private final Address thisAddress;
    private TpcEngine tpcEngine;
    private final boolean tcpNoDelay = true;
    private final boolean enabled;
    private final Map<Reactor, Supplier<? extends AsyncSocketReader>> readHandlerSuppliers = new HashMap<Reactor, Supplier<? extends AsyncSocketReader>>();
    private final List<AsyncServerSocket> serverSockets = new ArrayList<AsyncServerSocket>();
    private final Config config;
    private volatile List<Integer> clientPorts;

    public TpcServerBootstrap(NodeEngineImpl nodeEngine) {
        this.nodeEngine = nodeEngine;
        this.logger = nodeEngine.getLogger(TpcServerBootstrap.class);
        this.config = nodeEngine.getConfig();
        this.enabled = this.loadTpcEnabled();
        this.thisAddress = nodeEngine.getThisAddress();
    }

    private boolean loadTpcEnabled() {
        String enabledString = this.nodeEngine.getProperties().getString(TPC_ENABLED);
        boolean enabled0 = enabledString != null ? Boolean.parseBoolean(enabledString) : this.config.getTpcConfig().isEnabled();
        this.logger.info("TPC: " + (enabled0 ? "enabled" : "disabled"));
        return enabled0;
    }

    public boolean isEnabled() {
        return this.enabled;
    }

    public TpcEngine getTpcEngine() {
        return this.tpcEngine;
    }

    public List<Integer> getClientPorts() {
        return this.clientPorts;
    }

    private TpcEngine newTpcEngine() {
        TpcEngineBuilder tpcEngineBuilder = new TpcEngineBuilder();
        NioReactorBuilder reactorBuilder = new NioReactorBuilder();
        reactorBuilder.setThreadFactory(new ThreadFactory(){
            int index;

            @Override
            public Thread newThread(Runnable eventloopRunnable) {
                OperationExecutorImpl operationExecutor = (OperationExecutorImpl)TpcServerBootstrap.this.nodeEngine.getOperationService().getOperationExecutor();
                TpcPartitionOperationThread operationThread = (TpcPartitionOperationThread)operationExecutor.getPartitionThreads()[this.index++];
                operationThread.setEventloopTask(eventloopRunnable);
                return operationThread;
            }
        });
        reactorBuilder.setSchedulerSupplier(() -> new TpcOperationScheduler(1));
        tpcEngineBuilder.setReactorBuilder(reactorBuilder);
        tpcEngineBuilder.setReactorCount(this.loadEventloopCount());
        return tpcEngineBuilder.build();
    }

    public int eventloopCount() {
        return this.loadEventloopCount();
    }

    private int loadEventloopCount() {
        String eventloopCountString = this.nodeEngine.getProperties().getString(TPC_EVENTLOOP_COUNT);
        if (eventloopCountString == null) {
            return this.config.getTpcConfig().getEventloopCount();
        }
        return Integer.parseInt(eventloopCountString);
    }

    public void start() {
        if (!this.enabled) {
            return;
        }
        this.logger.info("Starting TpcServerBootstrap");
        this.tpcEngine = this.newTpcEngine();
        OperationExecutorImpl operationExecutor = (OperationExecutorImpl)this.nodeEngine.getOperationService().getOperationExecutor();
        for (int k = 0; k < operationExecutor.getPartitionThreadCount(); ++k) {
            Reactor reactor = this.tpcEngine.reactor(k);
            TpcPartitionOperationThread partitionThread = (TpcPartitionOperationThread)operationExecutor.getPartitionThreads()[k];
            partitionThread.getQueue().setReactor(reactor);
        }
        this.tpcEngine.start();
        this.openServerSockets();
        this.clientPorts = this.serverSockets.stream().map(AsyncServerSocket::getLocalPort).collect(Collectors.toList());
    }

    private void openServerSockets() {
        TpcSocketConfig clientSocketConfig = this.getClientSocketConfig();
        String[] range = clientSocketConfig.getPortRange().split("-");
        int port = Integer.parseInt(range[0]);
        int limit = Integer.parseInt(range[1]);
        for (int k = 0; k < this.tpcEngine.reactorCount(); ++k) {
            Reactor reactor = this.tpcEngine.reactor(k);
            Supplier<AsyncSocketReader> readHandlerSupplier = () -> new ClientAsyncSocketReader(this.nodeEngine.getNode().clientEngine, this.nodeEngine.getProperties());
            this.readHandlerSuppliers.put(reactor, readHandlerSupplier);
            AsyncServerSocket serverSocket = reactor.newAsyncServerSocketBuilder().set(AsyncSocketOptions.SO_RCVBUF, clientSocketConfig.getReceiveBufferSizeKB() * 1024).setAcceptConsumer(acceptRequest -> reactor.newAsyncSocketBuilder((AcceptRequest)acceptRequest).setReader(this.readHandlerSuppliers.get(reactor).get()).set(AsyncSocketOptions.SO_SNDBUF, clientSocketConfig.getSendBufferSizeKB() * 1024).set(AsyncSocketOptions.SO_RCVBUF, clientSocketConfig.getReceiveBufferSizeKB() * 1024).set(AsyncSocketOptions.TCP_NODELAY, true).set(AsyncSocketOptions.SO_KEEPALIVE, true).build().start()).build();
            this.serverSockets.add(serverSocket);
            port = this.bind(serverSocket, port, limit);
            serverSocket.start();
        }
    }

    public TpcSocketConfig getClientSocketConfig() {
        this.validateSocketConfig();
        if (this.config.getAdvancedNetworkConfig().isEnabled()) {
            ServerSocketEndpointConfig endpointConfig = (ServerSocketEndpointConfig)this.config.getAdvancedNetworkConfig().getEndpointConfigs().get(EndpointQualifier.CLIENT);
            return endpointConfig.getTpcSocketConfig();
        }
        return this.config.getNetworkConfig().getTpcSocketConfig();
    }

    private void validateSocketConfig() {
        AdvancedNetworkConfig advancedNetworkConfig = this.config.getAdvancedNetworkConfig();
        if (advancedNetworkConfig.isEnabled()) {
            TpcSocketConfig defaultTpcSocketConfig = new TpcSocketConfig();
            Map<EndpointQualifier, EndpointConfig> endpointConfigs = advancedNetworkConfig.getEndpointConfigs();
            endpointConfigs.forEach((endpointQualifier, endpointConfig) -> {
                if (endpointQualifier != EndpointQualifier.CLIENT && !endpointConfig.getTpcSocketConfig().equals(defaultTpcSocketConfig)) {
                    throw new InvalidConfigurationException("TPC socket configuration is only available for clients ports for now.");
                }
            });
            if (endpointConfigs.get(EndpointQualifier.CLIENT) == null) {
                throw new InvalidConfigurationException("Missing client server socket configuration. If you have enabled TPC and advanced networking, please configure a client server socket.");
            }
        }
    }

    private int bind(AsyncServerSocket serverSocket, int port, int limit) {
        while (port < limit) {
            try {
                serverSocket.bind(new InetSocketAddress(this.thisAddress.getInetAddress(), port));
                return port + 1;
            }
            catch (UncheckedIOException e) {
                if (e.getCause() instanceof BindException) {
                    port += this.tpcEngine.reactorCount();
                    continue;
                }
                throw e;
            }
            catch (UnknownHostException e) {
                throw new UncheckedIOException(e);
            }
        }
        throw new HazelcastException("Could not find a free port in the TPC socket port range.");
    }

    public void shutdown() {
        if (!this.enabled) {
            return;
        }
        this.logger.info("TpcServerBootstrap shutdown");
        this.tpcEngine.shutdown();
        try {
            this.tpcEngine.awaitTermination(5L, TimeUnit.SECONDS);
        }
        catch (InterruptedException e) {
            this.logger.warning("TpcEngine failed to terminate.");
            Thread.currentThread().interrupt();
        }
        this.logger.info("TpcServerBootstrap terminated");
    }
}

