/*
 * Decompiled with CFR 0.152.
 */
package com.hazelcast.jet.impl.pipeline.transform;

import com.hazelcast.function.FunctionEx;
import com.hazelcast.internal.util.Preconditions;
import com.hazelcast.jet.core.Edge;
import com.hazelcast.jet.core.EventTimePolicy;
import com.hazelcast.jet.core.ProcessorMetaSupplier;
import com.hazelcast.jet.core.Vertex;
import com.hazelcast.jet.core.processor.Processors;
import com.hazelcast.jet.impl.ProcessorClassLoaderTLHolder;
import com.hazelcast.jet.impl.pipeline.PipelineImpl;
import com.hazelcast.jet.impl.pipeline.Planner;
import com.hazelcast.jet.impl.pipeline.transform.AbstractTransform;
import com.hazelcast.jet.impl.pipeline.transform.Transform;
import com.hazelcast.jet.impl.util.Util;
import com.hazelcast.jet.pipeline.StreamSource;
import java.io.IOException;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.util.Collections;
import javax.annotation.Nonnull;
import javax.annotation.Nullable;

public class StreamSourceTransform<T>
extends AbstractTransform
implements StreamSource<T> {
    private static final long serialVersionUID = 1L;
    public FunctionEx<? super EventTimePolicy<? super T>, ? extends ProcessorMetaSupplier> metaSupplierFn;
    private boolean isAssignedToStage;
    private boolean emitsWatermarks;
    @Nullable
    private EventTimePolicy<? super T> eventTimePolicy;
    private boolean supportsNativeTimestamps;
    private long partitionIdleTimeout = 60000L;

    public StreamSourceTransform(@Nonnull String name, @Nonnull FunctionEx<? super EventTimePolicy<? super T>, ? extends ProcessorMetaSupplier> metaSupplierFn, boolean emitsWatermarks, boolean supportsNativeTimestamps) {
        super(name, Collections.emptyList());
        this.metaSupplierFn = metaSupplierFn;
        this.emitsWatermarks = emitsWatermarks;
        this.supportsNativeTimestamps = supportsNativeTimestamps;
    }

    public void onAssignToStage() {
        if (this.isAssignedToStage) {
            throw new IllegalStateException("Source " + this.name() + " was already assigned to a source stage");
        }
        this.isAssignedToStage = true;
    }

    @Override
    public void addToDag(Planner p, PipelineImpl.Context context) {
        if (this.emitsWatermarks || this.eventTimePolicy == null) {
            ProcessorMetaSupplier metaSupplier = this.metaSupplierFn.apply(this.eventTimePolicy != null ? this.eventTimePolicy : EventTimePolicy.noEventTime());
            this.determineLocalParallelism(metaSupplier.preferredLocalParallelism(), context, false);
            p.addVertex((Transform)this, this.name(), this.determinedLocalParallelism(), metaSupplier);
        } else {
            String v1name = this.name();
            ProcessorMetaSupplier metaSupplier = this.metaSupplierFn.apply(this.eventTimePolicy);
            this.determineLocalParallelism(metaSupplier.preferredLocalParallelism(), context, false);
            Vertex v1 = p.dag.newVertex(v1name, metaSupplier).localParallelism(this.determinedLocalParallelism());
            Planner.PlannerVertex pv2 = p.addVertex((Transform)this, v1name + "-add-timestamps", this.determinedLocalParallelism(), Processors.insertWatermarksP(this.eventTimePolicy));
            p.dag.edge(Edge.between(v1, pv2.v).isolated());
        }
    }

    @Nullable
    public EventTimePolicy<? super T> getEventTimePolicy() {
        return this.eventTimePolicy;
    }

    public void setEventTimePolicy(@Nonnull EventTimePolicy<? super T> eventTimePolicy) {
        this.eventTimePolicy = eventTimePolicy;
    }

    public boolean emitsJetEvents() {
        return this.eventTimePolicy != null;
    }

    @Override
    public boolean supportsNativeTimestamps() {
        return this.supportsNativeTimestamps;
    }

    @Override
    public StreamSource<T> setPartitionIdleTimeout(long timeoutMillis) {
        Preconditions.checkNotNegative(timeoutMillis, "timeout must be >= 0 (0 means disabled)");
        this.partitionIdleTimeout = timeoutMillis;
        return this;
    }

    @Override
    public long partitionIdleTimeout() {
        return this.partitionIdleTimeout;
    }

    private void writeObject(ObjectOutputStream out) throws IOException {
        out.writeObject(this.metaSupplierFn);
        out.writeBoolean(this.isAssignedToStage);
        out.writeBoolean(this.emitsWatermarks);
        out.writeObject(this.eventTimePolicy);
        out.writeBoolean(this.supportsNativeTimestamps);
        out.writeLong(this.partitionIdleTimeout);
    }

    private void readObject(ObjectInputStream in) throws IOException, ClassNotFoundException {
        this.metaSupplierFn = Util.doWithClassLoader(ProcessorClassLoaderTLHolder.get(this.name()), () -> (FunctionEx)in.readObject());
        this.isAssignedToStage = in.readBoolean();
        this.emitsWatermarks = in.readBoolean();
        this.eventTimePolicy = (EventTimePolicy)in.readObject();
        this.supportsNativeTimestamps = in.readBoolean();
        this.partitionIdleTimeout = in.readLong();
    }
}

