/*
 * Decompiled with CFR 0.152.
 */
package org.apache.kafka.streams.kstream.internals;

import java.util.Arrays;
import java.util.Collections;
import java.util.HashSet;
import java.util.Objects;
import java.util.Optional;
import java.util.Set;
import org.apache.kafka.common.serialization.Serde;
import org.apache.kafka.common.utils.Bytes;
import org.apache.kafka.streams.KeyValue;
import org.apache.kafka.streams.internals.ApiUtils;
import org.apache.kafka.streams.kstream.BranchedKStream;
import org.apache.kafka.streams.kstream.ForeachAction;
import org.apache.kafka.streams.kstream.GlobalKTable;
import org.apache.kafka.streams.kstream.Grouped;
import org.apache.kafka.streams.kstream.JoinWindows;
import org.apache.kafka.streams.kstream.Joined;
import org.apache.kafka.streams.kstream.KGroupedStream;
import org.apache.kafka.streams.kstream.KStream;
import org.apache.kafka.streams.kstream.KTable;
import org.apache.kafka.streams.kstream.KeyValueMapper;
import org.apache.kafka.streams.kstream.Materialized;
import org.apache.kafka.streams.kstream.Named;
import org.apache.kafka.streams.kstream.Predicate;
import org.apache.kafka.streams.kstream.Printed;
import org.apache.kafka.streams.kstream.Produced;
import org.apache.kafka.streams.kstream.Repartitioned;
import org.apache.kafka.streams.kstream.StreamJoined;
import org.apache.kafka.streams.kstream.ValueJoiner;
import org.apache.kafka.streams.kstream.ValueJoinerWithKey;
import org.apache.kafka.streams.kstream.ValueMapper;
import org.apache.kafka.streams.kstream.ValueMapperWithKey;
import org.apache.kafka.streams.kstream.internals.AbstractStream;
import org.apache.kafka.streams.kstream.internals.BranchedKStreamImpl;
import org.apache.kafka.streams.kstream.internals.ForeachProcessor;
import org.apache.kafka.streams.kstream.internals.GlobalKTableImpl;
import org.apache.kafka.streams.kstream.internals.GroupedInternal;
import org.apache.kafka.streams.kstream.internals.InternalStreamsBuilder;
import org.apache.kafka.streams.kstream.internals.JoinedInternal;
import org.apache.kafka.streams.kstream.internals.KGroupedStreamImpl;
import org.apache.kafka.streams.kstream.internals.KStreamFilter;
import org.apache.kafka.streams.kstream.internals.KStreamFlatMap;
import org.apache.kafka.streams.kstream.internals.KStreamFlatMapValues;
import org.apache.kafka.streams.kstream.internals.KStreamGlobalKTableJoin;
import org.apache.kafka.streams.kstream.internals.KStreamImplJoin;
import org.apache.kafka.streams.kstream.internals.KStreamKTableJoin;
import org.apache.kafka.streams.kstream.internals.KStreamMap;
import org.apache.kafka.streams.kstream.internals.KStreamMapValues;
import org.apache.kafka.streams.kstream.internals.KStreamPeek;
import org.apache.kafka.streams.kstream.internals.KTableImpl;
import org.apache.kafka.streams.kstream.internals.KTableSource;
import org.apache.kafka.streams.kstream.internals.KTableValueGetterSupplier;
import org.apache.kafka.streams.kstream.internals.MaterializedInternal;
import org.apache.kafka.streams.kstream.internals.NamedInternal;
import org.apache.kafka.streams.kstream.internals.PassThrough;
import org.apache.kafka.streams.kstream.internals.PrintedInternal;
import org.apache.kafka.streams.kstream.internals.ProducedInternal;
import org.apache.kafka.streams.kstream.internals.RepartitionedInternal;
import org.apache.kafka.streams.kstream.internals.StreamJoinedInternal;
import org.apache.kafka.streams.kstream.internals.graph.BaseRepartitionNode;
import org.apache.kafka.streams.kstream.internals.graph.GraphNode;
import org.apache.kafka.streams.kstream.internals.graph.OptimizableRepartitionNode;
import org.apache.kafka.streams.kstream.internals.graph.ProcessorGraphNode;
import org.apache.kafka.streams.kstream.internals.graph.ProcessorParameters;
import org.apache.kafka.streams.kstream.internals.graph.ProcessorToStateConnectorNode;
import org.apache.kafka.streams.kstream.internals.graph.StreamSinkNode;
import org.apache.kafka.streams.kstream.internals.graph.StreamTableJoinNode;
import org.apache.kafka.streams.kstream.internals.graph.StreamToTableNode;
import org.apache.kafka.streams.kstream.internals.graph.UnoptimizableRepartitionNode;
import org.apache.kafka.streams.processor.StreamPartitioner;
import org.apache.kafka.streams.processor.TopicNameExtractor;
import org.apache.kafka.streams.processor.api.FixedKeyProcessorSupplier;
import org.apache.kafka.streams.processor.api.ProcessorSupplier;
import org.apache.kafka.streams.processor.internals.InternalResourcesNaming;
import org.apache.kafka.streams.processor.internals.InternalTopicProperties;
import org.apache.kafka.streams.processor.internals.StaticTopicNameExtractor;
import org.apache.kafka.streams.state.KeyValueStore;
import org.apache.kafka.streams.state.StoreBuilder;
import org.apache.kafka.streams.state.VersionedBytesStoreSupplier;
import org.apache.kafka.streams.state.internals.RocksDBTimeOrderedKeyValueBuffer;

public class KStreamImpl<K, V>
extends AbstractStream<K, V>
implements KStream<K, V> {
    // Name prefixes shared with other classes in this package. Their use is not visible in this
    // part of the file; presumably they name join/merge/source/sink nodes -- verify at call sites.
    static final String JOINTHIS_NAME = "KSTREAM-JOINTHIS-";
    static final String JOINOTHER_NAME = "KSTREAM-JOINOTHER-";
    static final String JOIN_NAME = "KSTREAM-JOIN-";
    static final String LEFTJOIN_NAME = "KSTREAM-LEFTJOIN-";
    // Prefix used by doMerge() when no node name was supplied.
    static final String MERGE_NAME = "KSTREAM-MERGE-";
    static final String OUTERTHIS_NAME = "KSTREAM-OUTERTHIS-";
    static final String OUTEROTHER_NAME = "KSTREAM-OUTEROTHER-";
    static final String WINDOWED_NAME = "KSTREAM-WINDOWED-";
    static final String OUTERSHARED_NAME = "KSTREAM-OUTERSHARED-";
    static final String SOURCE_NAME = "KSTREAM-SOURCE-";
    // Prefix used by to(...) when the Produced config carries no name.
    static final String SINK_NAME = "KSTREAM-SINK-";
    static final String REPARTITION_TOPIC_SUFFIX = "-repartition";
    // Prefixes passed to orElseGenerateWithPrefix(...) / newProcessorName(...) by the operators
    // of this class when the caller did not provide a node name.
    private static final String FILTER_NAME = "KSTREAM-FILTER-";
    private static final String PEEK_NAME = "KSTREAM-PEEK-";
    private static final String FLATMAP_NAME = "KSTREAM-FLATMAP-";
    private static final String FLATMAPVALUES_NAME = "KSTREAM-FLATMAPVALUES-";
    private static final String MAP_NAME = "KSTREAM-MAP-";
    private static final String MAPVALUES_NAME = "KSTREAM-MAPVALUES-";
    private static final String PROCESSOR_NAME = "KSTREAM-PROCESSOR-";
    private static final String PROCESSVALUES_NAME = "KSTREAM-PROCESSVALUES-";
    private static final String PRINTING_NAME = "KSTREAM-PRINTER-";
    private static final String KEY_SELECT_NAME = "KSTREAM-KEY-SELECT-";
    private static final String FOREACH_NAME = "KSTREAM-FOREACH-";
    private static final String TO_KTABLE_NAME = "KSTREAM-TOTABLE-";
    private static final String REPARTITION_NAME = "KSTREAM-REPARTITION-";
    // Set to true by the key-changing operators (selectKey, map, flatMap, groupBy) and reset to
    // false by repartition(); toTable() and the stream-stream joins consult it to decide whether
    // a repartition node has to be inserted first.
    private final boolean repartitionRequired;
    // NOTE(review): not read or written in the visible part of this class -- confirm its use.
    private OptimizableRepartitionNode<K, V> repartitionNode;

    /**
     * Creates a stream handle backed by {@code graphNode}.
     *
     * @param name                   name of this stream's node
     * @param keySerde               key serde, may be {@code null} (operators in this class pass
     *                               {@code null} after changing the key type)
     * @param valueSerde             value serde, may be {@code null}
     * @param subTopologySourceNodes names of the source nodes this stream originates from
     * @param repartitionRequired    whether an upstream operator may have changed the key
     * @param graphNode              graph node that subsequent operators attach to
     * @param builder                builder that owns the graph and generates node names
     */
    KStreamImpl(String name, Serde<K> keySerde, Serde<V> valueSerde, Set<String> subTopologySourceNodes, boolean repartitionRequired, GraphNode graphNode, InternalStreamsBuilder builder) {
        // name, serdes, source nodes, graph node and builder are all handed to AbstractStream
        super(name, keySerde, valueSerde, subTopologySourceNodes, graphNode, builder);
        this.repartitionRequired = repartitionRequired;
    }

    /**
     * Overload of {@link #filter(Predicate, Named)} that supplies an empty
     * {@link NamedInternal}, i.e. no user-provided node name.
     */
    @Override
    public KStream<K, V> filter(Predicate<? super K, ? super V> predicate) {
        Named unnamed = NamedInternal.empty();
        return filter(predicate, unnamed);
    }

    /**
     * Appends a {@link KStreamFilter} node (negation flag {@code false}) below this stream's
     * graph node and returns a stream that keeps this stream's serdes, source nodes and
     * repartition flag.
     *
     * @param predicate predicate handed to the filter processor; must not be {@code null}
     * @param named     node name configuration; must not be {@code null}
     * @throws NullPointerException if either argument is {@code null}
     */
    @Override
    public KStream<K, V> filter(Predicate<? super K, ? super V> predicate, Named named) {
        Objects.requireNonNull(predicate, "predicate cannot be null");
        Objects.requireNonNull(named, "named cannot be null");
        NamedInternal namedInternal = new NamedInternal(named);
        String nodeName = namedInternal.orElseGenerateWithPrefix(builder, FILTER_NAME);
        // false: plain filter; filterNot passes true for the same flag
        KStreamFilter<K, V> filterSupplier = new KStreamFilter<K, V>(predicate, false);
        ProcessorGraphNode filterNode = new ProcessorGraphNode(nodeName, new ProcessorParameters(filterSupplier, nodeName));
        builder.addGraphNode(graphNode, filterNode);
        return new KStreamImpl<K, V>(nodeName, keySerde, valueSerde, subTopologySourceNodes, repartitionRequired, filterNode, builder);
    }

    /**
     * Overload of {@link #filterNot(Predicate, Named)} that supplies an empty
     * {@link NamedInternal}, i.e. no user-provided node name.
     */
    @Override
    public KStream<K, V> filterNot(Predicate<? super K, ? super V> predicate) {
        Named unnamed = NamedInternal.empty();
        return filterNot(predicate, unnamed);
    }

    /**
     * Appends a {@link KStreamFilter} node with the negation flag set to {@code true} below
     * this stream's graph node. The resulting stream keeps this stream's serdes, source nodes
     * and repartition flag.
     *
     * @param predicate predicate handed to the filter processor; must not be {@code null}
     * @param named     node name configuration; must not be {@code null}
     * @throws NullPointerException if either argument is {@code null}
     */
    @Override
    public KStream<K, V> filterNot(Predicate<? super K, ? super V> predicate, Named named) {
        Objects.requireNonNull(predicate, "predicate cannot be null");
        Objects.requireNonNull(named, "named cannot be null");
        NamedInternal namedInternal = new NamedInternal(named);
        // shares the FILTER_NAME prefix with filter(...)
        String nodeName = namedInternal.orElseGenerateWithPrefix(builder, FILTER_NAME);
        KStreamFilter<K, V> negatedFilterSupplier = new KStreamFilter<K, V>(predicate, true);
        ProcessorGraphNode filterNotNode = new ProcessorGraphNode(nodeName, new ProcessorParameters(negatedFilterSupplier, nodeName));
        builder.addGraphNode(graphNode, filterNotNode);
        return new KStreamImpl<K, V>(nodeName, keySerde, valueSerde, subTopologySourceNodes, repartitionRequired, filterNotNode, builder);
    }

    /**
     * Overload of {@link #selectKey(KeyValueMapper, Named)} that supplies an empty
     * {@link NamedInternal}, i.e. no user-provided node name.
     */
    @Override
    public <KOut> KStream<KOut, V> selectKey(KeyValueMapper<? super K, ? super V, ? extends KOut> mapper) {
        Named unnamed = NamedInternal.empty();
        return selectKey(mapper, unnamed);
    }

    /**
     * Appends a key-selecting node built by {@code internalSelectKey} and marks it as a
     * key-changing operation. The returned stream has no key serde (the key type changed),
     * keeps the value serde, and is flagged as requiring repartitioning.
     *
     * @param mapper computes the new key from the old key and value; must not be {@code null}
     * @param named  node name configuration; must not be {@code null}
     * @throws NullPointerException if either argument is {@code null}
     */
    @Override
    public <KOut> KStream<KOut, V> selectKey(KeyValueMapper<? super K, ? super V, ? extends KOut> mapper, Named named) {
        Objects.requireNonNull(mapper, "mapper cannot be null");
        Objects.requireNonNull(named, "named cannot be null");
        ProcessorGraphNode<K, V> selectKeyProcessorNode = this.internalSelectKey(mapper, new NamedInternal(named));
        selectKeyProcessorNode.setKeyChangingOperation(true);
        this.builder.addGraphNode(this.graphNode, selectKeyProcessorNode);
        // Diamond lets the compiler infer KStreamImpl<KOut, V>; a KStreamImpl<K, V> is not
        // assignable to the declared return type KStream<KOut, V>.
        return new KStreamImpl<>(selectKeyProcessorNode.nodeName(), null, this.valueSerde, this.subTopologySourceNodes, true, selectKeyProcessorNode, this.builder);
    }

    /**
     * Builds (but does not attach) the graph node used for key selection: a {@link KStreamMap}
     * whose mapper pairs the key computed by {@code mapper} with the unchanged value.
     *
     * @param mapper computes the new key from the old key and value
     * @param named  supplies the node name, or a generated one with the key-select prefix
     * @return the new, not yet attached, processor graph node
     */
    private <KR> ProcessorGraphNode<K, V> internalSelectKey(KeyValueMapper<? super K, ? super V, ? extends KR> mapper, NamedInternal named) {
        String name = named.orElseGenerateWithPrefix(this.builder, KEY_SELECT_NAME);
        // Fully parameterized: with a raw KStreamMap the lambda parameters would be Object and
        // could not be passed to mapper.apply(...).
        KStreamMap<K, V, KR, V> kStreamMap = new KStreamMap<>((key, value) -> new KeyValue<>(mapper.apply(key, value), value));
        ProcessorParameters<K, V, KR, V> processorParameters = new ProcessorParameters<>(kStreamMap, name);
        return new ProcessorGraphNode<>(name, processorParameters);
    }

    /**
     * Adapts {@code mapper} with {@code withKey(...)} and delegates to
     * {@link #mapValues(ValueMapperWithKey, Named)} with an empty {@link NamedInternal}.
     */
    @Override
    public <VOut> KStream<K, VOut> mapValues(ValueMapper<? super V, ? extends VOut> mapper) {
        ValueMapperWithKey<? super K, ? super V, ? extends VOut> keyedMapper = withKey(mapper);
        return mapValues(keyedMapper, NamedInternal.empty());
    }

    /**
     * Adapts {@code mapper} with {@code withKey(...)} and delegates to
     * {@link #mapValues(ValueMapperWithKey, Named)}.
     */
    @Override
    public <VOut> KStream<K, VOut> mapValues(ValueMapper<? super V, ? extends VOut> mapper, Named named) {
        ValueMapperWithKey<? super K, ? super V, ? extends VOut> keyedMapper = withKey(mapper);
        return mapValues(keyedMapper, named);
    }

    /**
     * Overload of {@link #mapValues(ValueMapperWithKey, Named)} that supplies an empty
     * {@link NamedInternal}, i.e. no user-provided node name.
     */
    @Override
    public <VOut> KStream<K, VOut> mapValues(ValueMapperWithKey<? super K, ? super V, ? extends VOut> mapper) {
        Named unnamed = NamedInternal.empty();
        return mapValues(mapper, unnamed);
    }

    /**
     * Appends a {@link KStreamMapValues} node and marks it as a value-changing operation.
     * The returned stream keeps the key serde and repartition flag but has no value serde,
     * because the value type changed.
     *
     * @param mapper computes the new value; must not be {@code null}
     * @param named  node name configuration; must not be {@code null}
     * @throws NullPointerException if either argument is {@code null}
     */
    @Override
    public <VOut> KStream<K, VOut> mapValues(ValueMapperWithKey<? super K, ? super V, ? extends VOut> mapper, Named named) {
        Objects.requireNonNull(mapper, "mapper cannot be null");
        Objects.requireNonNull(named, "named cannot be null");
        String name = new NamedInternal(named).orElseGenerateWithPrefix(this.builder, MAPVALUES_NAME);
        ProcessorParameters processorParameters = new ProcessorParameters(new KStreamMapValues<K, V, VOut>(mapper), name);
        ProcessorGraphNode<K, V> mapValuesProcessorNode = new ProcessorGraphNode<K, V>(name, processorParameters);
        mapValuesProcessorNode.setValueChangingOperation(true);
        this.builder.addGraphNode(this.graphNode, mapValuesProcessorNode);
        // Diamond infers KStreamImpl<K, VOut>; KStreamImpl<K, V> would not match the declared
        // return type KStream<K, VOut>.
        return new KStreamImpl<>(name, this.keySerde, null, this.subTopologySourceNodes, this.repartitionRequired, mapValuesProcessorNode, this.builder);
    }

    /**
     * Overload of {@link #map(KeyValueMapper, Named)} that supplies an empty
     * {@link NamedInternal}, i.e. no user-provided node name.
     */
    @Override
    public <KOut, VOut> KStream<KOut, VOut> map(KeyValueMapper<? super K, ? super V, ? extends KeyValue<? extends KOut, ? extends VOut>> mapper) {
        Named unnamed = NamedInternal.empty();
        return map(mapper, unnamed);
    }

    /**
     * Appends a {@link KStreamMap} node and marks it as a key-changing operation. The returned
     * stream carries no serdes (both key and value types may have changed) and is flagged as
     * requiring repartitioning.
     *
     * @param mapper produces the new key/value pair; must not be {@code null}
     * @param named  node name configuration; must not be {@code null}
     * @throws NullPointerException if either argument is {@code null}
     */
    @Override
    public <KOut, VOut> KStream<KOut, VOut> map(KeyValueMapper<? super K, ? super V, ? extends KeyValue<? extends KOut, ? extends VOut>> mapper, Named named) {
        Objects.requireNonNull(mapper, "mapper cannot be null");
        Objects.requireNonNull(named, "named cannot be null");
        String name = new NamedInternal(named).orElseGenerateWithPrefix(this.builder, MAP_NAME);
        ProcessorParameters processorParameters = new ProcessorParameters(new KStreamMap(mapper), name);
        ProcessorGraphNode<K, V> mapProcessorNode = new ProcessorGraphNode<K, V>(name, processorParameters);
        mapProcessorNode.setKeyChangingOperation(true);
        this.builder.addGraphNode(this.graphNode, mapProcessorNode);
        // Diamond infers KStreamImpl<KOut, VOut>; KStreamImpl<K, V> would not match the
        // declared return type.
        return new KStreamImpl<>(name, null, null, this.subTopologySourceNodes, true, mapProcessorNode, this.builder);
    }

    /**
     * Overload of {@link #flatMap(KeyValueMapper, Named)} that supplies an empty
     * {@link NamedInternal}, i.e. no user-provided node name.
     */
    @Override
    public <KOut, VOut> KStream<KOut, VOut> flatMap(KeyValueMapper<? super K, ? super V, ? extends Iterable<? extends KeyValue<? extends KOut, ? extends VOut>>> mapper) {
        Named unnamed = NamedInternal.empty();
        return flatMap(mapper, unnamed);
    }

    /**
     * Appends a {@link KStreamFlatMap} node and marks it as a key-changing operation. The
     * returned stream carries no serdes and is flagged as requiring repartitioning.
     *
     * @param mapper produces the output key/value pairs for a record; must not be {@code null}
     * @param named  node name configuration; must not be {@code null}
     * @throws NullPointerException if either argument is {@code null}
     */
    @Override
    public <KOut, VOut> KStream<KOut, VOut> flatMap(KeyValueMapper<? super K, ? super V, ? extends Iterable<? extends KeyValue<? extends KOut, ? extends VOut>>> mapper, Named named) {
        Objects.requireNonNull(mapper, "mapper cannot be null");
        Objects.requireNonNull(named, "named cannot be null");
        String name = new NamedInternal(named).orElseGenerateWithPrefix(this.builder, FLATMAP_NAME);
        ProcessorParameters processorParameters = new ProcessorParameters(new KStreamFlatMap(mapper), name);
        ProcessorGraphNode<K, V> flatMapNode = new ProcessorGraphNode<K, V>(name, processorParameters);
        flatMapNode.setKeyChangingOperation(true);
        this.builder.addGraphNode(this.graphNode, flatMapNode);
        // Diamond infers KStreamImpl<KOut, VOut>; KStreamImpl<K, V> would not match the
        // declared return type.
        return new KStreamImpl<>(name, null, null, this.subTopologySourceNodes, true, flatMapNode, this.builder);
    }

    /**
     * Adapts {@code mapper} with {@code withKey(...)} and delegates to
     * {@link #flatMapValues(ValueMapperWithKey, Named)} with an empty {@link NamedInternal}.
     */
    @Override
    public <VOut> KStream<K, VOut> flatMapValues(ValueMapper<? super V, ? extends Iterable<? extends VOut>> mapper) {
        ValueMapperWithKey<? super K, ? super V, ? extends Iterable<? extends VOut>> keyedMapper = withKey(mapper);
        return flatMapValues(keyedMapper, NamedInternal.empty());
    }

    /**
     * Adapts {@code mapper} with {@code withKey(...)} and delegates to
     * {@link #flatMapValues(ValueMapperWithKey, Named)}.
     */
    @Override
    public <VOut> KStream<K, VOut> flatMapValues(ValueMapper<? super V, ? extends Iterable<? extends VOut>> mapper, Named named) {
        ValueMapperWithKey<? super K, ? super V, ? extends Iterable<? extends VOut>> keyedMapper = withKey(mapper);
        return flatMapValues(keyedMapper, named);
    }

    /**
     * Overload of {@link #flatMapValues(ValueMapperWithKey, Named)} that supplies an empty
     * {@link NamedInternal}, i.e. no user-provided node name.
     */
    @Override
    public <VOut> KStream<K, VOut> flatMapValues(ValueMapperWithKey<? super K, ? super V, ? extends Iterable<? extends VOut>> mapper) {
        Named unnamed = NamedInternal.empty();
        return flatMapValues(mapper, unnamed);
    }

    /**
     * Appends a {@link KStreamFlatMapValues} node and marks it as a value-changing operation.
     * The returned stream keeps the key serde and repartition flag but has no value serde.
     *
     * @param mapper produces the output values for a record; must not be {@code null}
     * @param named  node name configuration; must not be {@code null}
     * @throws NullPointerException if either argument is {@code null}
     */
    @Override
    public <VOut> KStream<K, VOut> flatMapValues(ValueMapperWithKey<? super K, ? super V, ? extends Iterable<? extends VOut>> mapper, Named named) {
        Objects.requireNonNull(mapper, "mapper cannot be null");
        Objects.requireNonNull(named, "named cannot be null");
        String name = new NamedInternal(named).orElseGenerateWithPrefix(this.builder, FLATMAPVALUES_NAME);
        ProcessorParameters processorParameters = new ProcessorParameters(new KStreamFlatMapValues(mapper), name);
        ProcessorGraphNode<K, V> flatMapValuesNode = new ProcessorGraphNode<K, V>(name, processorParameters);
        flatMapValuesNode.setValueChangingOperation(true);
        this.builder.addGraphNode(this.graphNode, flatMapValuesNode);
        // Diamond infers KStreamImpl<K, VOut>; KStreamImpl<K, V> would not match the declared
        // return type KStream<K, VOut>.
        return new KStreamImpl<>(name, this.keySerde, null, this.subTopologySourceNodes, this.repartitionRequired, flatMapValuesNode, this.builder);
    }

    /**
     * Appends a terminal node whose processor supplier is built by {@code printed} for this
     * stream's name. The node name comes from {@code printed} or is generated with the
     * printer prefix.
     *
     * @param printed print configuration; must not be {@code null}
     * @throws NullPointerException if {@code printed} is {@code null}
     */
    @Override
    public void print(Printed<K, V> printed) {
        Objects.requireNonNull(printed, "printed cannot be null");
        PrintedInternal<K, V> internal = new PrintedInternal<K, V>(printed);
        NamedInternal configuredName = new NamedInternal(internal.name());
        String printerName = configuredName.orElseGenerateWithPrefix(builder, PRINTING_NAME);
        ProcessorParameters<K, V, Void, Void> printerParameters = new ProcessorParameters<K, V, Void, Void>(internal.build(this.name), printerName);
        builder.addGraphNode(graphNode, new ProcessorGraphNode<K, V>(printerName, printerParameters));
    }

    /**
     * Overload of {@link #foreach(ForeachAction, Named)} that supplies an empty
     * {@link NamedInternal}, i.e. no user-provided node name.
     */
    @Override
    public void foreach(ForeachAction<? super K, ? super V> action) {
        Named unnamed = NamedInternal.empty();
        foreach(action, unnamed);
    }

    /**
     * Appends a terminal node whose supplier creates a {@link ForeachProcessor} wrapping
     * {@code action}. Nothing is returned, so no further operator can be chained.
     *
     * @param action action handed to the processor; must not be {@code null}
     * @param named  node name configuration; must not be {@code null}
     * @throws NullPointerException if either argument is {@code null}
     */
    @Override
    public void foreach(ForeachAction<? super K, ? super V> action, Named named) {
        Objects.requireNonNull(action, "action cannot be null");
        Objects.requireNonNull(named, "named cannot be null");
        NamedInternal namedInternal = new NamedInternal(named);
        String nodeName = namedInternal.orElseGenerateWithPrefix(builder, FOREACH_NAME);
        // the supplier lambda creates a new ForeachProcessor on every call
        ProcessorParameters foreachParameters = new ProcessorParameters(() -> new ForeachProcessor(action), nodeName);
        builder.addGraphNode(graphNode, new ProcessorGraphNode(nodeName, foreachParameters));
    }

    /**
     * Overload of {@link #peek(ForeachAction, Named)} that supplies an empty
     * {@link NamedInternal}, i.e. no user-provided node name.
     */
    @Override
    public KStream<K, V> peek(ForeachAction<? super K, ? super V> action) {
        Named unnamed = NamedInternal.empty();
        return peek(action, unnamed);
    }

    /**
     * Appends a {@link KStreamPeek} node for {@code action} and returns a stream positioned
     * after it, with unchanged serdes, source nodes and repartition flag.
     *
     * @param action action handed to the peek processor; must not be {@code null}
     * @param named  node name configuration; must not be {@code null}
     * @throws NullPointerException if either argument is {@code null}
     */
    @Override
    public KStream<K, V> peek(ForeachAction<? super K, ? super V> action, Named named) {
        Objects.requireNonNull(action, "action cannot be null");
        Objects.requireNonNull(named, "named cannot be null");
        NamedInternal namedInternal = new NamedInternal(named);
        String nodeName = namedInternal.orElseGenerateWithPrefix(builder, PEEK_NAME);
        KStreamPeek<K, V> peekSupplier = new KStreamPeek<K, V>(action);
        ProcessorGraphNode peekGraphNode = new ProcessorGraphNode(nodeName, new ProcessorParameters(peekSupplier, nodeName));
        builder.addGraphNode(graphNode, peekGraphNode);
        return new KStreamImpl<K, V>(nodeName, keySerde, valueSerde, subTopologySourceNodes, repartitionRequired, peekGraphNode, builder);
    }

    /**
     * Starts a branching definition on this stream without a user-provided name; this
     * stream's repartition flag is handed to the {@link BranchedKStreamImpl}.
     */
    @Override
    public BranchedKStream<K, V> split() {
        NamedInternal unnamed = NamedInternal.empty();
        return new BranchedKStreamImpl<>(this, repartitionRequired, unnamed);
    }

    /**
     * Starts a branching definition on this stream using {@code named}; this stream's
     * repartition flag is handed to the {@link BranchedKStreamImpl}.
     *
     * @param named name configuration; must not be {@code null}
     * @throws NullPointerException if {@code named} is {@code null}
     */
    @Override
    public BranchedKStream<K, V> split(Named named) {
        Objects.requireNonNull(named, "named cannot be null");
        NamedInternal namedInternal = new NamedInternal(named);
        return new BranchedKStreamImpl<>(this, repartitionRequired, namedInternal);
    }

    /**
     * Merges this stream with {@code otherStream} via {@code doMerge}, using an empty
     * {@link NamedInternal} (no user-provided node name).
     */
    @Override
    public KStream<K, V> merge(KStream<K, V> otherStream) {
        NamedInternal unnamed = NamedInternal.empty();
        return doMerge(builder, otherStream, unnamed);
    }

    /**
     * Merges this stream with {@code otherStream} via {@code doMerge}, using {@code named}
     * for the merge node.
     *
     * <p>Both arguments are validated here, before {@code named} is wrapped in a
     * {@link NamedInternal}, so that a {@code null} argument is reported with the same
     * "... cannot be null" message as in the other operators of this class.
     *
     * @param otherStream stream to merge with; must not be {@code null}
     * @param named       node name configuration; must not be {@code null}
     * @throws NullPointerException if either argument is {@code null}
     */
    @Override
    public KStream<K, V> merge(KStream<K, V> otherStream, Named named) {
        Objects.requireNonNull(otherStream, "otherStream cannot be null");
        Objects.requireNonNull(named, "named cannot be null");
        return this.doMerge(this.builder, otherStream, new NamedInternal(named));
    }

    /**
     * Adds a pass-through node, flagged as a merge node, with both streams' graph nodes as
     * parents. The merged stream carries no serdes, the union of both streams' source nodes,
     * and requires repartitioning if either input does.
     *
     * @param builder     builder used for name generation and graph wiring
     * @param otherStream stream to merge with; must be a {@code KStreamImpl}
     * @param named       supplies the node name, or a generated one with the merge prefix
     * @throws NullPointerException if {@code otherStream} or {@code named} is {@code null}
     */
    private KStream<K, V> doMerge(InternalStreamsBuilder builder, KStream<K, V> otherStream, NamedInternal named) {
        Objects.requireNonNull(otherStream, "otherStream cannot be null");
        Objects.requireNonNull(named, "named cannot be null");
        KStreamImpl<K, V> other = (KStreamImpl<K, V>) otherStream;
        boolean eitherNeedsRepartition = other.repartitionRequired || this.repartitionRequired;
        String mergeName = named.orElseGenerateWithPrefix(builder, MERGE_NAME);
        Set<String> mergedSourceNodes = new HashSet<String>();
        mergedSourceNodes.addAll(this.subTopologySourceNodes);
        mergedSourceNodes.addAll(other.subTopologySourceNodes);
        ProcessorGraphNode mergeGraphNode = new ProcessorGraphNode(mergeName, new ProcessorParameters(new PassThrough(), mergeName));
        mergeGraphNode.setMergeNode(true);
        // the merge node gets two parents: this stream's node and the other stream's node
        builder.addGraphNode(Arrays.asList(this.graphNode, other.graphNode), mergeGraphNode);
        return new KStreamImpl<K, V>(mergeName, null, null, mergedSourceNodes, eitherNeedsRepartition, mergeGraphNode, builder);
    }

    /**
     * Repartitions this stream using a {@link Repartitioned} created with a {@code null}
     * name and no other settings.
     */
    @Override
    public KStream<K, V> repartition() {
        Repartitioned<K, V> unnamed = Repartitioned.as(null);
        return doRepartition(unnamed);
    }

    /**
     * Repartitions this stream as described by {@code repartitioned}; validation and graph
     * construction happen in {@code doRepartition}.
     */
    @Override
    public KStream<K, V> repartition(Repartitioned<K, V> repartitioned) {
        KStream<K, V> repartitionedStream = doRepartition(repartitioned);
        return repartitionedStream;
    }

    /**
     * Inserts an unoptimizable repartition node below this stream and returns a stream that
     * reads from it, with the repartition flag cleared.
     *
     * <p>Serdes set on {@code repartitioned} take precedence; otherwise this stream's serdes
     * are used. The same resolved serdes are used both for the repartition source and for the
     * returned stream, so the two cannot disagree.
     *
     * @param repartitioned repartition configuration; must not be {@code null}
     * @return the stream downstream of the repartition node
     * @throws NullPointerException if {@code repartitioned} is {@code null}
     */
    private KStream<K, V> doRepartition(Repartitioned<K, V> repartitioned) {
        Objects.requireNonNull(repartitioned, "repartitioned cannot be null");
        RepartitionedInternal<K, V> repartitionedInternal = new RepartitionedInternal<K, V>(repartitioned);
        String name = repartitionedInternal.name() != null ? repartitionedInternal.name() : this.builder.newProcessorName(REPARTITION_NAME);
        Serde<V> valueSerde = repartitionedInternal.valueSerde() == null ? this.valueSerde : repartitionedInternal.valueSerde();
        Serde<K> keySerde = repartitionedInternal.keySerde() == null ? this.keySerde : repartitionedInternal.keySerde();
        UnoptimizableRepartitionNode.UnoptimizableRepartitionNodeBuilder unoptimizableRepartitionNodeBuilder = UnoptimizableRepartitionNode.unoptimizableRepartitionNodeBuilder();
        InternalTopicProperties internalTopicProperties = repartitionedInternal.toInternalTopicProperties();
        // Pass the resolved key serde (with fallback to this stream's serde), mirroring the
        // value serde; previously the unresolved repartitionedInternal.keySerde() was passed,
        // dropping this stream's key serde whenever Repartitioned did not set one.
        String repartitionSourceName = KStreamImpl.createRepartitionedSource(this.builder, keySerde, valueSerde, name, repartitionedInternal.streamPartitioner(), unoptimizableRepartitionNodeBuilder.withInternalTopicProperties(internalTopicProperties), repartitionedInternal.name() != null);
        BaseRepartitionNode unoptimizableRepartitionNode = unoptimizableRepartitionNodeBuilder.build();
        this.builder.addGraphNode(this.graphNode, (GraphNode)unoptimizableRepartitionNode);
        // the repartition node becomes the only source node of the returned stream
        HashSet<String> sourceNodes = new HashSet<String>();
        sourceNodes.add(unoptimizableRepartitionNode.nodeName());
        return new KStreamImpl<K, V>(repartitionSourceName, keySerde, valueSerde, Collections.unmodifiableSet(sourceNodes), false, unoptimizableRepartitionNode, this.builder);
    }

    /**
     * Writes this stream to {@code topic}: wraps the name in a
     * {@link StaticTopicNameExtractor} and uses this stream's serdes with a {@code null}
     * partitioner.
     */
    @Override
    public void to(String topic) {
        TopicNameExtractor<K, V> fixedTopic = new StaticTopicNameExtractor<>(topic);
        Produced<K, V> streamSerdes = Produced.with(keySerde, valueSerde, null);
        to(fixedTopic, streamSerdes);
    }

    /**
     * Writes this stream to {@code topic} with the given {@code produced} settings by
     * wrapping the name in a {@link StaticTopicNameExtractor}.
     */
    @Override
    public void to(String topic, Produced<K, V> produced) {
        TopicNameExtractor<K, V> fixedTopic = new StaticTopicNameExtractor<>(topic);
        to(fixedTopic, produced);
    }

    /**
     * Writes this stream to the topics chosen by {@code topicExtractor}, using this stream's
     * serdes with a {@code null} partitioner.
     */
    @Override
    public void to(TopicNameExtractor<K, V> topicExtractor) {
        Produced<K, V> streamSerdes = Produced.with(keySerde, valueSerde, null);
        to(topicExtractor, streamSerdes);
    }

    /**
     * Appends a {@link StreamSinkNode} below this stream's graph node. Serdes missing from
     * {@code produced} are filled in from this stream; the node name comes from
     * {@code produced} or is generated with the sink prefix.
     *
     * @param topicExtractor chooses the destination topic; must not be {@code null}
     * @param produced       sink configuration; must not be {@code null}
     * @throws NullPointerException if either argument is {@code null}
     */
    @Override
    public void to(TopicNameExtractor<K, V> topicExtractor, Produced<K, V> produced) {
        Objects.requireNonNull(topicExtractor, "topicExtractor cannot be null");
        Objects.requireNonNull(produced, "produced cannot be null");
        ProducedInternal<K, V> sinkConfig = new ProducedInternal<K, V>(produced);
        // fall back to this stream's serdes for whatever the caller left unset
        if (sinkConfig.keySerde() == null) {
            sinkConfig.withKeySerde(keySerde);
        }
        if (sinkConfig.valueSerde() == null) {
            sinkConfig.withValueSerde(valueSerde);
        }
        NamedInternal configuredName = new NamedInternal(sinkConfig.name());
        String sinkName = configuredName.orElseGenerateWithPrefix(builder, SINK_NAME);
        builder.addGraphNode(graphNode, new StreamSinkNode<K, V>(sinkName, topicExtractor, sinkConfig));
    }

    /**
     * Overload of {@link #toTable(Named, Materialized)} with an empty {@link NamedInternal}
     * and a {@link Materialized} carrying only this stream's serdes.
     */
    @Override
    public KTable<K, V> toTable() {
        Named unnamed = NamedInternal.empty();
        Materialized<K, V, KeyValueStore<Bytes, byte[]>> streamSerdes = Materialized.with(keySerde, valueSerde);
        return toTable(unnamed, streamSerdes);
    }

    /**
     * Overload of {@link #toTable(Named, Materialized)} with a {@link Materialized} carrying
     * only this stream's serdes.
     */
    @Override
    public KTable<K, V> toTable(Named named) {
        Materialized<K, V, KeyValueStore<Bytes, byte[]>> streamSerdes = Materialized.with(keySerde, valueSerde);
        return toTable(named, streamSerdes);
    }

    /**
     * Overload of {@link #toTable(Named, Materialized)} that supplies an empty
     * {@link NamedInternal}, i.e. no user-provided node name.
     */
    @Override
    public KTable<K, V> toTable(Materialized<K, V, KeyValueStore<Bytes, byte[]>> materialized) {
        Named unnamed = NamedInternal.empty();
        return toTable(unnamed, materialized);
    }

    /**
     * Converts this stream into a table by appending a {@link StreamToTableNode} backed by a
     * {@link KTableSource}. If this stream is flagged as requiring repartitioning, an
     * optimizable repartition node is inserted first and becomes the table node's parent.
     *
     * <p>Serdes set on {@code materialized} take precedence over this stream's serdes.
     *
     * @param named        node name configuration; must not be {@code null}
     * @param materialized store configuration; must not be {@code null}
     * @throws NullPointerException if either argument is {@code null}
     */
    @Override
    public KTable<K, V> toTable(Named named, Materialized<K, V, KeyValueStore<Bytes, byte[]>> materialized) {
        Objects.requireNonNull(named, "named cannot be null");
        Objects.requireNonNull(materialized, "materialized cannot be null");
        NamedInternal namedInternal = new NamedInternal(named);
        String tableNodeName = namedInternal.orElseGenerateWithPrefix(builder, TO_KTABLE_NAME);
        MaterializedInternal<K, V, KeyValueStore<Bytes, byte[]>> storeConfig = new MaterializedInternal<K, V, KeyValueStore<Bytes, byte[]>>(materialized, builder, TO_KTABLE_NAME);
        Serde<K> keySerdeOverride = storeConfig.keySerde() != null ? storeConfig.keySerde() : this.keySerde;
        Serde<V> valueSerdeOverride = storeConfig.valueSerde() != null ? storeConfig.valueSerde() : this.valueSerde;

        GraphNode tableParentNode;
        Set<String> tableSourceNodes;
        if (!repartitionRequired) {
            // key unchanged upstream: attach the table node directly to this stream
            tableParentNode = this.graphNode;
            tableSourceNodes = this.subTopologySourceNodes;
        } else {
            OptimizableRepartitionNode.OptimizableRepartitionNodeBuilder repartitionNodeBuilder = OptimizableRepartitionNode.optimizableRepartitionNodeBuilder();
            String repartitionSourceName = createRepartitionedSource(builder, keySerdeOverride, valueSerdeOverride, tableNodeName, null, repartitionNodeBuilder, namedInternal.name() != null);
            tableParentNode = repartitionNodeBuilder.build();
            builder.addGraphNode(this.graphNode, tableParentNode);
            tableSourceNodes = Collections.singleton(repartitionSourceName);
        }

        KTableSource<K, V> tableSource = new KTableSource<K, V>(storeConfig);
        StreamToTableNode tableNode = new StreamToTableNode(tableNodeName, new ProcessorParameters(tableSource, tableNodeName));
        tableNode.setOutputVersioned(storeConfig.storeSupplier() instanceof VersionedBytesStoreSupplier);
        builder.addGraphNode(tableParentNode, tableNode);
        return new KTableImpl(tableNodeName, keySerdeOverride, valueSerdeOverride, tableSourceNodes, storeConfig.queryableStoreName(), tableSource, tableNode, builder);
    }

    /**
     * Overload of {@link #groupByKey(Grouped)} with a {@link Grouped} carrying only this
     * stream's serdes.
     */
    @Override
    public KGroupedStream<K, V> groupByKey() {
        Grouped<K, V> streamSerdes = Grouped.with(keySerde, valueSerde);
        return groupByKey(streamSerdes);
    }

    /**
     * Wraps this stream in a {@link KGroupedStreamImpl} without adding a graph node; the
     * grouped stream reuses this stream's name, source nodes, graph node and repartition flag.
     *
     * @param grouped grouping configuration; must not be {@code null}
     * @throws NullPointerException if {@code grouped} is {@code null}
     */
    @Override
    public KGroupedStream<K, V> groupByKey(Grouped<K, V> grouped) {
        Objects.requireNonNull(grouped, "grouped cannot be null");
        GroupedInternal<K, V> groupingConfig = new GroupedInternal<K, V>(grouped);
        return new KGroupedStreamImpl<K, V>(name, subTopologySourceNodes, groupingConfig, repartitionRequired, graphNode, builder);
    }

    /**
     * Overload of {@link #groupBy(KeyValueMapper, Grouped)} with a {@link Grouped} that has
     * a {@code null} key serde (the key type changes) and this stream's value serde.
     */
    @Override
    public <KOut> KGroupedStream<KOut, V> groupBy(KeyValueMapper<? super K, ? super V, KOut> keySelector) {
        Grouped<KOut, V> valueSerdeOnly = Grouped.with(null, valueSerde);
        return groupBy(keySelector, valueSerdeOnly);
    }

    /**
     * Appends a key-selecting node (marked as key-changing) and wraps the result in a
     * {@link KGroupedStreamImpl} that is flagged as requiring repartitioning.
     *
     * @param keySelector computes the grouping key; must not be {@code null}
     * @param grouped     grouping configuration, also the source of the node name; must not
     *                    be {@code null}
     * @throws NullPointerException if either argument is {@code null}
     */
    @Override
    public <KOut> KGroupedStream<KOut, V> groupBy(KeyValueMapper<? super K, ? super V, KOut> keySelector, Grouped<KOut, V> grouped) {
        Objects.requireNonNull(keySelector, "keySelector cannot be null");
        Objects.requireNonNull(grouped, "grouped cannot be null");
        GroupedInternal<KOut, V> groupingConfig = new GroupedInternal<KOut, V>(grouped);
        NamedInternal selectorName = new NamedInternal(groupingConfig.name());
        ProcessorGraphNode<K, V> keySelectNode = internalSelectKey(keySelector, selectorName);
        keySelectNode.setKeyChangingOperation(true);
        builder.addGraphNode(graphNode, keySelectNode);
        return new KGroupedStreamImpl<KOut, V>(keySelectNode.nodeName(), subTopologySourceNodes, groupingConfig, true, keySelectNode, builder);
    }

    /**
     * Windowed join with a key-less joiner and no {@link StreamJoined} settings: adapts the
     * joiner with {@code toValueJoinerWithKey} and delegates to {@code doJoin} with a
     * {@link KStreamImplJoin} created with flags {@code (false, false)}.
     */
    @Override
    public <VRight, VOut> KStream<K, VOut> join(KStream<K, VRight> otherStream, ValueJoiner<? super V, ? super VRight, ? extends VOut> joiner, JoinWindows windows) {
        ValueJoinerWithKey<? super K, ? super V, ? super VRight, ? extends VOut> keyedJoiner = toValueJoinerWithKey(joiner);
        StreamJoined<K, V, VRight> noSettings = StreamJoined.with(null, null, null);
        return doJoin(otherStream, keyedJoiner, windows, noSettings, new KStreamImplJoin(builder, false, false));
    }

    /**
     * Windowed join with a key-aware joiner and no {@link StreamJoined} settings; delegates
     * to {@code doJoin} with a {@link KStreamImplJoin} created with flags
     * {@code (false, false)}.
     */
    @Override
    public <VRight, VOut> KStream<K, VOut> join(KStream<K, VRight> otherStream, ValueJoinerWithKey<? super K, ? super V, ? super VRight, ? extends VOut> joiner, JoinWindows windows) {
        StreamJoined<K, V, VRight> noSettings = StreamJoined.with(null, null, null);
        return doJoin(otherStream, joiner, windows, noSettings, new KStreamImplJoin(builder, false, false));
    }

    /**
     * Windowed join with a key-less joiner and explicit {@link StreamJoined} settings: adapts
     * the joiner with {@code toValueJoinerWithKey} and delegates to {@code doJoin} with a
     * {@link KStreamImplJoin} created with flags {@code (false, false)}.
     */
    @Override
    public <VRight, VOut> KStream<K, VOut> join(KStream<K, VRight> otherStream, ValueJoiner<? super V, ? super VRight, ? extends VOut> joiner, JoinWindows windows, StreamJoined<K, V, VRight> streamJoined) {
        ValueJoinerWithKey<? super K, ? super V, ? super VRight, ? extends VOut> keyedJoiner = toValueJoinerWithKey(joiner);
        return doJoin(otherStream, keyedJoiner, windows, streamJoined, new KStreamImplJoin(builder, false, false));
    }

    /**
     * Windowed join with a key-aware joiner and explicit {@link StreamJoined} settings;
     * delegates to {@code doJoin} with a {@link KStreamImplJoin} created with flags
     * {@code (false, false)}.
     */
    @Override
    public <VRight, VOut> KStream<K, VOut> join(KStream<K, VRight> otherStream, ValueJoinerWithKey<? super K, ? super V, ? super VRight, ? extends VOut> joiner, JoinWindows windows, StreamJoined<K, V, VRight> streamJoined) {
        KStreamImplJoin joinBuilder = new KStreamImplJoin(builder, false, false);
        return doJoin(otherStream, joiner, windows, streamJoined, joinBuilder);
    }

    /**
     * Windowed left join with a key-less joiner and no {@link StreamJoined} settings: adapts
     * the joiner with {@code toValueJoinerWithKey} and delegates to {@code doJoin} with a
     * {@link KStreamImplJoin} created with flags {@code (true, false)}.
     */
    @Override
    public <VRight, VOut> KStream<K, VOut> leftJoin(KStream<K, VRight> otherStream, ValueJoiner<? super V, ? super VRight, ? extends VOut> joiner, JoinWindows windows) {
        ValueJoinerWithKey<? super K, ? super V, ? super VRight, ? extends VOut> keyedJoiner = toValueJoinerWithKey(joiner);
        StreamJoined<K, V, VRight> noSettings = StreamJoined.with(null, null, null);
        return doJoin(otherStream, keyedJoiner, windows, noSettings, new KStreamImplJoin(builder, true, false));
    }

    /**
     * Windowed left join with a key-aware joiner and no {@link StreamJoined} settings;
     * delegates to {@code doJoin} with a {@link KStreamImplJoin} created with flags
     * {@code (true, false)}.
     */
    @Override
    public <VRight, VOut> KStream<K, VOut> leftJoin(KStream<K, VRight> otherStream, ValueJoinerWithKey<? super K, ? super V, ? super VRight, ? extends VOut> joiner, JoinWindows windows) {
        StreamJoined<K, V, VRight> noSettings = StreamJoined.with(null, null, null);
        return doJoin(otherStream, joiner, windows, noSettings, new KStreamImplJoin(builder, true, false));
    }

    /**
     * Windowed left join with a key-less joiner and explicit {@link StreamJoined} settings:
     * adapts the joiner with {@code toValueJoinerWithKey} and delegates to {@code doJoin}
     * with a {@link KStreamImplJoin} created with flags {@code (true, false)}.
     */
    @Override
    public <VRight, VOut> KStream<K, VOut> leftJoin(KStream<K, VRight> otherStream, ValueJoiner<? super V, ? super VRight, ? extends VOut> joiner, JoinWindows windows, StreamJoined<K, V, VRight> streamJoined) {
        ValueJoinerWithKey<? super K, ? super V, ? super VRight, ? extends VOut> keyedJoiner = toValueJoinerWithKey(joiner);
        return doJoin(otherStream, keyedJoiner, windows, streamJoined, new KStreamImplJoin(builder, true, false));
    }

    /**
     * Windowed left join with a key-aware joiner and explicit {@link StreamJoined} settings;
     * delegates to {@code doJoin} with a {@link KStreamImplJoin} created with flags
     * {@code (true, false)}.
     */
    @Override
    public <VRight, VOut> KStream<K, VOut> leftJoin(KStream<K, VRight> otherStream, ValueJoinerWithKey<? super K, ? super V, ? super VRight, ? extends VOut> joiner, JoinWindows windows, StreamJoined<K, V, VRight> streamJoined) {
        KStreamImplJoin joinBuilder = new KStreamImplJoin(builder, true, false);
        return doJoin(otherStream, joiner, windows, streamJoined, joinBuilder);
    }

    /**
     * Windowed outer join with a key-less joiner and no {@link StreamJoined} settings: adapts
     * the joiner with {@code toValueJoinerWithKey} and delegates to {@code doJoin} with a
     * {@link KStreamImplJoin} created with flags {@code (true, true)}.
     */
    @Override
    public <VRight, VOut> KStream<K, VOut> outerJoin(KStream<K, VRight> otherStream, ValueJoiner<? super V, ? super VRight, ? extends VOut> joiner, JoinWindows windows) {
        ValueJoinerWithKey<? super K, ? super V, ? super VRight, ? extends VOut> keyedJoiner = toValueJoinerWithKey(joiner);
        StreamJoined<K, V, VRight> noSettings = StreamJoined.with(null, null, null);
        return doJoin(otherStream, keyedJoiner, windows, noSettings, new KStreamImplJoin(builder, true, true));
    }

    /**
     * Windowed outer join with a key-aware joiner and no {@link StreamJoined} settings;
     * delegates to {@code doJoin} with a {@link KStreamImplJoin} created with flags
     * {@code (true, true)}.
     */
    @Override
    public <VRight, VOut> KStream<K, VOut> outerJoin(KStream<K, VRight> otherStream, ValueJoinerWithKey<? super K, ? super V, ? super VRight, ? extends VOut> joiner, JoinWindows windows) {
        StreamJoined<K, V, VRight> noSettings = StreamJoined.with(null, null, null);
        return doJoin(otherStream, joiner, windows, noSettings, new KStreamImplJoin(builder, true, true));
    }

    /**
     * Windowed outer join with a key-less joiner and explicit {@link StreamJoined} settings:
     * adapts the joiner with {@code toValueJoinerWithKey} and delegates to {@code doJoin}
     * with a {@link KStreamImplJoin} created with flags {@code (true, true)}.
     */
    @Override
    public <VRight, VOut> KStream<K, VOut> outerJoin(KStream<K, VRight> otherStream, ValueJoiner<? super V, ? super VRight, ? extends VOut> joiner, JoinWindows windows, StreamJoined<K, V, VRight> streamJoined) {
        ValueJoinerWithKey<? super K, ? super V, ? super VRight, ? extends VOut> keyedJoiner = toValueJoinerWithKey(joiner);
        return doJoin(otherStream, keyedJoiner, windows, streamJoined, new KStreamImplJoin(builder, true, true));
    }

    /**
     * Windowed outer join with a key-aware joiner and explicit {@link StreamJoined} settings;
     * delegates to {@code doJoin} with a {@link KStreamImplJoin} created with flags
     * {@code (true, true)}.
     */
    @Override
    public <VRight, VOut> KStream<K, VOut> outerJoin(KStream<K, VRight> otherStream, ValueJoinerWithKey<? super K, ? super V, ? super VRight, ? extends VOut> joiner, JoinWindows windows, StreamJoined<K, V, VRight> streamJoined) {
        KStreamImplJoin joinBuilder = new KStreamImplJoin(builder, true, true);
        return doJoin(otherStream, joiner, windows, streamJoined, joinBuilder);
    }

    /**
     * Shared implementation behind the windowed stream-stream join overloads.
     *
     * <p>Steps, in order: null-check the arguments; repartition either side whose
     * {@code repartitionRequired} flag is set; register the co-partitioning requirement
     * between the two (possibly repartitioned) sides; delegate to {@code join}.
     *
     * @param otherStream  right-hand stream; cast to {@code KStreamImpl} without an
     *                     {@code instanceof} check
     * @param joiner       key-aware joiner
     * @param windows      the join windows
     * @param streamJoined join configuration; its name, when present, is used to derive the
     *                     repartition topic names ({@code "-left"} / {@code "-right"} suffix)
     * @param join         the join builder that produces the resulting stream
     * @return whatever {@code join.join(...)} returns
     * @throws NullPointerException if {@code otherStream}, {@code joiner}, {@code windows}
     *                              or {@code streamJoined} is {@code null}
     */
    private <VRight, VOut> KStream<K, VOut> doJoin(KStream<K, VRight> otherStream, ValueJoinerWithKey<? super K, ? super V, ? super VRight, ? extends VOut> joiner, JoinWindows windows, StreamJoined<K, V, VRight> streamJoined, KStreamImplJoin join) {
        Objects.requireNonNull(otherStream, "otherStream cannot be null");
        Objects.requireNonNull(joiner, "joiner cannot be null");
        Objects.requireNonNull(windows, "windows cannot be null");
        Objects.requireNonNull(streamJoined, "streamJoined cannot be null");

        KStreamImpl<K, V> left = this;
        KStreamImpl<K, VRight> right = (KStreamImpl<K, VRight>) otherStream;
        StreamJoinedInternal<K, V, VRight> joinConfig = new StreamJoinedInternal<>(streamJoined, this.builder);
        NamedInternal joinName = new NamedInternal(joinConfig.name());

        if (left.repartitionRequired) {
            // Falls back to the stream's own name when the join was not named by the caller.
            String leftTopicName = joinName.suffixWithOrElseGet("-left", left.name);
            left = left.repartitionForJoin(leftTopicName, joinConfig.keySerde(), joinConfig.valueSerde(), joinName.name() != null);
        }

        if (right.repartitionRequired) {
            String rightTopicName = joinName.suffixWithOrElseGet("-right", right.name);
            right = right.repartitionForJoin(rightTopicName, joinConfig.keySerde(), joinConfig.otherValueSerde(), joinName.name() != null);
        }

        left.ensureCopartitionWith(Collections.singleton(right));
        // Note: the original StreamJoined (not the internal wrapper) is what gets forwarded.
        return join.join(left, right, joiner, windows, streamJoined);
    }

    /**
     * Builds a repartitioned view of this stream to be used as one side of a join.
     *
     * @param repartitionName                      name (prefix) for the repartition topic
     * @param keySerdeOverride                     key serde to use; this stream's key serde
     *                                             is used when {@code null}
     * @param valueSerdeOverride                   value serde to use; this stream's value
     *                                             serde is used when {@code null}
     * @param isRepartitionTopicNameProvidedByUser forwarded to
     *                                             {@code createRepartitionedSource}
     * @return a new {@code KStreamImpl} named after the repartitioned source, with
     *         {@code repartitionRequired} set to {@code false}
     */
    private KStreamImpl<K, V> repartitionForJoin(String repartitionName, Serde<K> keySerdeOverride, Serde<V> valueSerdeOverride, boolean isRepartitionTopicNameProvidedByUser) {
        // Parameterized (not raw) types so the compiler checks the serdes against K and V.
        Serde<K> repartitionKeySerde = keySerdeOverride != null ? keySerdeOverride : this.keySerde;
        Serde<V> repartitionValueSerde = valueSerdeOverride != null ? valueSerdeOverride : this.valueSerde;
        OptimizableRepartitionNode.OptimizableRepartitionNodeBuilder<K, V> repartitionNodeBuilder = OptimizableRepartitionNode.optimizableRepartitionNodeBuilder();

        // Deliberately called on every invocation, even when the node built from it is not
        // used below: createRepartitionedSource requests new processor names from the builder
        // each time it runs.
        String repartitionedSourceName = KStreamImpl.createRepartitionedSource(this.builder, repartitionKeySerde, repartitionValueSerde, repartitionName, null, repartitionNodeBuilder, isRepartitionTopicNameProvidedByUser);

        // Only build and attach a repartition node when none is cached yet, or when the
        // requested name differs from this stream's name.
        if (this.repartitionNode == null || !this.name.equals(repartitionName)) {
            this.repartitionNode = repartitionNodeBuilder.build();
            this.builder.addGraphNode(this.graphNode, this.repartitionNode);
        }

        return new KStreamImpl<>(repartitionedSourceName, repartitionKeySerde, repartitionValueSerde, Collections.singleton(repartitionedSourceName), false, this.repartitionNode, this.builder);
    }

    /**
     * Configures {@code baseRepartitionNodeBuilder} for a repartition step and returns the
     * name of the source node that reads from the repartition topic.
     *
     * <p>The topic name is {@code repartitionTopicNamePrefix} with
     * {@code REPARTITION_TOPIC_SUFFIX} appended unless it already ends with it. When the
     * name was not provided by the user, it is registered as an implicit internal name on
     * the internal topology builder.
     *
     * <p>If the prefix matches {@code KSTREAM.*-[0-9]{10}} the sink, source and filter
     * processors use builder-generated names; otherwise their names are derived from the
     * topic name with {@code -sink}, {@code -source} and {@code -filter} suffixes.
     *
     * @param builder                              the internal streams builder
     * @param keySerde                             key serde for the repartition topic
     * @param valueSerde                           value serde for the repartition topic
     * @param repartitionTopicNamePrefix           base name of the repartition topic; must
     *                                             not be {@code null}
     * @param streamPartitioner                    partitioner handed to the node builder
     *                                             (callers in this file pass {@code null})
     * @param baseRepartitionNodeBuilder           node builder to populate; it is not built
     *                                             here
     * @param isRepartitionTopicNameProvidedByUser whether the caller named the topic
     * @return the source node name configured on the builder
     */
    static <Key, Value, RepartitionNode extends BaseRepartitionNode<Key, Value>> String createRepartitionedSource(InternalStreamsBuilder builder, Serde<Key> keySerde, Serde<Value> valueSerde, String repartitionTopicNamePrefix, StreamPartitioner<Key, Value> streamPartitioner, BaseRepartitionNode.BaseRepartitionNodeBuilder<Key, Value, RepartitionNode> baseRepartitionNodeBuilder, boolean isRepartitionTopicNameProvidedByUser) {
        String repartitionTopicName = repartitionTopicNamePrefix.endsWith(REPARTITION_TOPIC_SUFFIX) ? repartitionTopicNamePrefix : repartitionTopicNamePrefix + REPARTITION_TOPIC_SUFFIX;
        if (!isRepartitionTopicNameProvidedByUser) {
            builder.internalTopologyBuilder().addImplicitInternalNames(InternalResourcesNaming.builder().withRepartitionTopic(repartitionTopicName).build());
        }

        // The three names are always requested from the builder, even when they end up
        // unused below; keep these calls unconditional and in this order
        // (presumably this keeps the builder's generated-name sequence stable; confirm).
        String genSinkName = builder.newProcessorName(SINK_NAME);
        String genNullKeyFilterProcessorName = builder.newProcessorName(FILTER_NAME);
        String genSourceName = builder.newProcessorName(SOURCE_NAME);

        String sinkName;
        String sourceName;
        String nullKeyFilterProcessorName;
        if (repartitionTopicNamePrefix.matches("KSTREAM.*-[0-9]{10}")) {
            sinkName = genSinkName;
            sourceName = genSourceName;
            nullKeyFilterProcessorName = genNullKeyFilterProcessorName;
        } else {
            sinkName = repartitionTopicName + "-sink";
            sourceName = repartitionTopicName + "-source";
            nullKeyFilterProcessorName = repartitionTopicName + "-filter";
        }

        // Despite the processor's name, the predicate accepts every record.
        ProcessorParameters<Key, Value, ?, ?> processorParameters = new ProcessorParameters<>(new KStreamFilter<Key, Value>((k, v) -> true, false), nullKeyFilterProcessorName);
        baseRepartitionNodeBuilder.withKeySerde(keySerde).withValueSerde(valueSerde).withSourceName(sourceName).withRepartitionTopic(repartitionTopicName).withSinkName(sinkName).withProcessorParameters(processorParameters).withStreamPartitioner(streamPartitioner).withNodeName(sourceName);
        return sourceName;
    }

    /**
     * Joins this stream with {@code table} using a key-less joiner and a default
     * {@code Joined} built from three {@code null} arguments.
     *
     * <p>The joiner is adapted with {@code toValueJoinerWithKey} and the call is forwarded
     * to the {@code ValueJoinerWithKey}/{@code Joined} overload.
     *
     * @param table  the table to join with
     * @param joiner joiner producing the output value
     * @return the joined stream
     */
    @Override
    public <TableValue, VOut> KStream<K, VOut> join(KTable<K, TableValue> table, ValueJoiner<? super V, ? super TableValue, ? extends VOut> joiner) {
        ValueJoinerWithKey<? super K, ? super V, ? super TableValue, ? extends VOut> keyedJoiner = KStreamImpl.toValueJoinerWithKey(joiner);
        Joined<K, V, TableValue> defaultJoined = Joined.with(null, null, null);
        return this.join(table, keyedJoiner, defaultJoined);
    }

    /**
     * Joins this stream with {@code table} using a key-aware joiner and a default
     * {@code Joined} built from three {@code null} arguments.
     *
     * @param table  the table to join with
     * @param joiner key-aware joiner producing the output value
     * @return the joined stream
     */
    @Override
    public <TableValue, VOut> KStream<K, VOut> join(KTable<K, TableValue> table, ValueJoinerWithKey<? super K, ? super V, ? super TableValue, ? extends VOut> joiner) {
        Joined<K, V, TableValue> defaultJoined = Joined.with(null, null, null);
        return this.join(table, joiner, defaultJoined);
    }

    /**
     * Joins this stream with {@code table} using a key-less joiner and the supplied
     * {@code Joined} configuration.
     *
     * <p>The joiner is adapted with {@code toValueJoinerWithKey} and the call is forwarded
     * to the {@code ValueJoinerWithKey} overload.
     *
     * @param table  the table to join with
     * @param joiner joiner producing the output value
     * @param joined join configuration
     * @return the joined stream
     */
    @Override
    public <TableValue, VOut> KStream<K, VOut> join(KTable<K, TableValue> table, ValueJoiner<? super V, ? super TableValue, ? extends VOut> joiner, Joined<K, V, TableValue> joined) {
        ValueJoinerWithKey<? super K, ? super V, ? super TableValue, ? extends VOut> keyedJoiner = KStreamImpl.toValueJoinerWithKey(joiner);
        return this.join(table, keyedJoiner, joined);
    }

    /**
     * Joins this stream with {@code table} using a key-aware joiner and the supplied
     * {@code Joined} configuration.
     *
     * <p>When this stream has {@code repartitionRequired} set, it is first repartitioned via
     * {@code repartitionForJoin}, using the name from {@code joined} when present and this
     * stream's own name otherwise. The join itself is built by {@code doStreamTableJoin}
     * with {@code leftJoin = false}.
     *
     * @param table  the table to join with
     * @param joiner key-aware joiner producing the output value
     * @param joined join configuration (name and serdes)
     * @return the joined stream
     * @throws NullPointerException if {@code table}, {@code joiner} or {@code joined} is
     *                              {@code null}
     */
    @Override
    public <TableValue, VOut> KStream<K, VOut> join(KTable<K, TableValue> table, ValueJoinerWithKey<? super K, ? super V, ? super TableValue, ? extends VOut> joiner, Joined<K, V, TableValue> joined) {
        Objects.requireNonNull(table, "table cannot be null");
        Objects.requireNonNull(joiner, "joiner cannot be null");
        Objects.requireNonNull(joined, "joined cannot be null");
        JoinedInternal<K, V, TableValue> joinedInternal = new JoinedInternal<>(joined);
        String joinName = joinedInternal.name();

        // repartitionForJoin returns KStreamImpl<K, V>; keeping that exact type (rather than a
        // wildcard type) lets the doStreamTableJoin call below type-check against KTable<K, ...>.
        KStreamImpl<K, V> streamSide = this;
        if (this.repartitionRequired) {
            boolean nameProvidedByUser = joinName != null;
            streamSide = this.repartitionForJoin(nameProvidedByUser ? joinName : this.name, joinedInternal.keySerde(), joinedInternal.leftValueSerde(), nameProvidedByUser);
        }
        return streamSide.doStreamTableJoin(table, joiner, joinedInternal, false);
    }

    /**
     * Left-joins this stream with {@code table} using a key-less joiner and a default
     * {@code Joined} built from three {@code null} arguments.
     *
     * <p>The joiner is adapted with {@code toValueJoinerWithKey} and the call is forwarded
     * to the {@code ValueJoinerWithKey}/{@code Joined} overload.
     *
     * @param table  the table to join with
     * @param joiner joiner producing the output value
     * @return the joined stream
     */
    @Override
    public <VTable, VOut> KStream<K, VOut> leftJoin(KTable<K, VTable> table, ValueJoiner<? super V, ? super VTable, ? extends VOut> joiner) {
        ValueJoinerWithKey<? super K, ? super V, ? super VTable, ? extends VOut> keyedJoiner = KStreamImpl.toValueJoinerWithKey(joiner);
        Joined<K, V, VTable> defaultJoined = Joined.with(null, null, null);
        return this.leftJoin(table, keyedJoiner, defaultJoined);
    }

    /**
     * Left-joins this stream with {@code table} using a key-aware joiner and a default
     * {@code Joined} built from three {@code null} arguments.
     *
     * @param table  the table to join with
     * @param joiner key-aware joiner producing the output value
     * @return the joined stream
     */
    @Override
    public <VTable, VOut> KStream<K, VOut> leftJoin(KTable<K, VTable> table, ValueJoinerWithKey<? super K, ? super V, ? super VTable, ? extends VOut> joiner) {
        Joined<K, V, VTable> defaultJoined = Joined.with(null, null, null);
        return this.leftJoin(table, joiner, defaultJoined);
    }

    /**
     * Left-joins this stream with {@code table} using a key-less joiner and the supplied
     * {@code Joined} configuration.
     *
     * <p>The joiner is adapted with {@code toValueJoinerWithKey} and the call is forwarded
     * to the {@code ValueJoinerWithKey} overload.
     *
     * @param table  the table to join with
     * @param joiner joiner producing the output value
     * @param joined join configuration
     * @return the joined stream
     */
    @Override
    public <VTable, VOut> KStream<K, VOut> leftJoin(KTable<K, VTable> table, ValueJoiner<? super V, ? super VTable, ? extends VOut> joiner, Joined<K, V, VTable> joined) {
        ValueJoinerWithKey<? super K, ? super V, ? super VTable, ? extends VOut> keyedJoiner = KStreamImpl.toValueJoinerWithKey(joiner);
        return this.leftJoin(table, keyedJoiner, joined);
    }

    /**
     * Left-joins this stream with {@code table} using a key-aware joiner and the supplied
     * {@code Joined} configuration.
     *
     * <p>When this stream has {@code repartitionRequired} set, it is first repartitioned via
     * {@code repartitionForJoin}, using the name from {@code joined} when present and this
     * stream's own name otherwise. The join itself is built by {@code doStreamTableJoin}
     * with {@code leftJoin = true}.
     *
     * @param table  the table to join with
     * @param joiner key-aware joiner producing the output value
     * @param joined join configuration (name and serdes)
     * @return the joined stream
     * @throws NullPointerException if {@code table}, {@code joiner} or {@code joined} is
     *                              {@code null}
     */
    @Override
    public <VTable, VOut> KStream<K, VOut> leftJoin(KTable<K, VTable> table, ValueJoinerWithKey<? super K, ? super V, ? super VTable, ? extends VOut> joiner, Joined<K, V, VTable> joined) {
        Objects.requireNonNull(table, "table cannot be null");
        Objects.requireNonNull(joiner, "joiner cannot be null");
        Objects.requireNonNull(joined, "joined cannot be null");
        JoinedInternal<K, V, VTable> joinedInternal = new JoinedInternal<>(joined);
        String joinName = joinedInternal.name();

        // repartitionForJoin returns KStreamImpl<K, V>; keeping that exact type (rather than a
        // wildcard type) lets the doStreamTableJoin call below type-check against KTable<K, ...>.
        KStreamImpl<K, V> streamSide = this;
        if (this.repartitionRequired) {
            boolean nameProvidedByUser = joinName != null;
            streamSide = this.repartitionForJoin(nameProvidedByUser ? joinName : this.name, joinedInternal.keySerde(), joinedInternal.leftValueSerde(), nameProvidedByUser);
        }
        return streamSide.doStreamTableJoin(table, joiner, joinedInternal, true);
    }

    /**
     * Builds the graph node for a stream-table join (inner or left) and returns the
     * resulting stream.
     *
     * <p>Callers in this file null-check the arguments before invoking this method; no
     * checks are repeated here. {@code table} is cast to {@code KTableImpl} without an
     * {@code instanceof} check.
     *
     * @param table          the table side of the join
     * @param joiner         key-aware joiner
     * @param joinedInternal join configuration (name, serdes, optional grace period)
     * @param leftJoin       {@code true} for a left join, {@code false} for an inner join
     * @return a new stream named after the join processor, with a {@code null} value serde
     *         and {@code repartitionRequired} set to {@code false}
     * @throws IllegalArgumentException if a grace period is configured and the table's
     *                                  graph node explicitly reports a non-versioned output
     */
    private <VTable, VOut> KStream<K, VOut> doStreamTableJoin(KTable<K, VTable> table, ValueJoinerWithKey<? super K, ? super V, ? super VTable, ? extends VOut> joiner, JoinedInternal<K, V, VTable> joinedInternal, boolean leftJoin) {
        // Registers the co-partitioning requirement with the table; the returned source-node
        // names become the source nodes of the resulting stream.
        Set<String> allSourceNodes = this.ensureCopartitionWith(Collections.singleton((AbstractStream)((Object)table)));
        NamedInternal renamed = new NamedInternal(joinedInternal.name());
        // Processor name: taken from joinedInternal when set, otherwise generated with the
        // LEFTJOIN_NAME or JOIN_NAME prefix depending on the join type.
        String name = renamed.orElseGenerateWithPrefix(this.builder, leftJoin ? LEFTJOIN_NAME : JOIN_NAME);
        Optional<StoreBuilder<?>> bufferStoreBuilder = Optional.empty();
        if (joinedInternal.gracePeriod() != null) {
            // orElse(true): a table whose versioned status is absent is accepted; only an
            // explicit "false" is rejected.
            if (!((KTableImpl)table).graphNode.isOutputVersioned().orElse(true).booleanValue()) {
                throw new IllegalArgumentException("KTable must be versioned to use a grace period in a stream table join.");
            }
            String bufferName = name + "-Buffer";
            // Buffer serdes come from joinedInternal when set, else from this stream.
            bufferStoreBuilder = Optional.of(new RocksDBTimeOrderedKeyValueBuffer.Builder(bufferName, joinedInternal.keySerde() != null ? joinedInternal.keySerde() : this.keySerde, joinedInternal.leftValueSerde() != null ? joinedInternal.leftValueSerde() : this.valueSerde, joinedInternal.gracePeriod(), name));
            if (joinedInternal.name() == null) {
                // No user-supplied name: record the buffer store and its "-changelog" name as
                // implicit internal names.
                InternalResourcesNaming internalResourcesNaming = InternalResourcesNaming.builder().withStateStore(bufferName).withChangelogTopic(bufferName + "-changelog").build();
                this.internalTopologyBuilder().addImplicitInternalNames(internalResourcesNaming);
            }
        }
        // NOTE(review): the generic type arguments on the next three declarations look like
        // decompiler output and may not match the real class signatures; verify before editing.
        KStreamKTableJoin<? super K, ? super V, ? super VTable, ? extends VOut> processorSupplier = new KStreamKTableJoin<K, V, VTable, VOut>(((KTableImpl)table).valueGetterSupplier(), joiner, leftJoin, Optional.ofNullable(joinedInternal.gracePeriod()), bufferStoreBuilder);
        ProcessorParameters<? super K, ? super V, ? super VTable, ? extends VOut> processorParameters = new ProcessorParameters<K, V, VTable, VOut>(processorSupplier, name);
        StreamTableJoinNode<VTable, V, VOut> streamTableJoinNode = new StreamTableJoinNode<VTable, V, VOut>(name, processorParameters, ((KTableImpl)table).valueGetterSupplier().storeNames(), this.name, joinedInternal.gracePeriod());
        this.builder.addGraphNode(this.graphNode, streamTableJoinNode);
        if (leftJoin) {
            // Left joins are tagged with the NULL_KEY_RELAXED_JOIN label.
            streamTableJoinNode.labels().add(GraphNode.Label.NULL_KEY_RELAXED_JOIN);
        }
        // Key serde: joinedInternal's when set, else this stream's; the value serde is null.
        return new KStreamImpl<K, V>(name, joinedInternal.keySerde() != null ? joinedInternal.keySerde() : this.keySerde, null, allSourceNodes, false, streamTableJoinNode, this.builder);
    }

    /**
     * Joins this stream with {@code globalTable} using a key-less joiner and no explicit
     * processor name.
     *
     * <p>The joiner is adapted with {@code toValueJoinerWithKey}; the call is forwarded to
     * {@code doGlobalTableJoin} with {@code leftJoin = false} and
     * {@code NamedInternal.empty()}.
     *
     * @param globalTable the global table to join with
     * @param keySelector maps a stream record to the global table key
     * @param joiner      joiner producing the output value
     * @return the joined stream
     */
    @Override
    public <GlobalKey, GlobalValue, VOut> KStream<K, VOut> join(GlobalKTable<GlobalKey, GlobalValue> globalTable, KeyValueMapper<? super K, ? super V, ? extends GlobalKey> keySelector, ValueJoiner<? super V, ? super GlobalValue, ? extends VOut> joiner) {
        ValueJoinerWithKey<? super K, ? super V, ? super GlobalValue, ? extends VOut> keyedJoiner = KStreamImpl.toValueJoinerWithKey(joiner);
        Named unnamed = NamedInternal.empty();
        return this.doGlobalTableJoin(globalTable, keySelector, keyedJoiner, false, unnamed);
    }

    /**
     * Joins this stream with {@code globalTable} using a key-aware joiner and no explicit
     * processor name.
     *
     * <p>Forwards to {@code doGlobalTableJoin} with {@code leftJoin = false} and
     * {@code NamedInternal.empty()}.
     *
     * @param globalTable the global table to join with
     * @param keySelector maps a stream record to the global table key
     * @param joiner      key-aware joiner producing the output value
     * @return the joined stream
     */
    @Override
    public <GlobalKey, GlobalValue, VOut> KStream<K, VOut> join(GlobalKTable<GlobalKey, GlobalValue> globalTable, KeyValueMapper<? super K, ? super V, ? extends GlobalKey> keySelector, ValueJoinerWithKey<? super K, ? super V, ? super GlobalValue, ? extends VOut> joiner) {
        Named unnamed = NamedInternal.empty();
        return this.doGlobalTableJoin(globalTable, keySelector, joiner, false, unnamed);
    }

    /**
     * Joins this stream with {@code globalTable} using a key-less joiner and the supplied
     * processor name.
     *
     * <p>The joiner is adapted with {@code toValueJoinerWithKey}; the call is forwarded to
     * {@code doGlobalTableJoin} with {@code leftJoin = false}.
     *
     * @param globalTable the global table to join with
     * @param keySelector maps a stream record to the global table key
     * @param joiner      joiner producing the output value
     * @param named       name configuration for the join processor
     * @return the joined stream
     */
    @Override
    public <GlobalKey, GlobalValue, VOut> KStream<K, VOut> join(GlobalKTable<GlobalKey, GlobalValue> globalTable, KeyValueMapper<? super K, ? super V, ? extends GlobalKey> keySelector, ValueJoiner<? super V, ? super GlobalValue, ? extends VOut> joiner, Named named) {
        ValueJoinerWithKey<? super K, ? super V, ? super GlobalValue, ? extends VOut> keyedJoiner = KStreamImpl.toValueJoinerWithKey(joiner);
        return this.doGlobalTableJoin(globalTable, keySelector, keyedJoiner, false, named);
    }

    /**
     * Joins this stream with {@code globalTable} using a key-aware joiner and the supplied
     * processor name.
     *
     * <p>Pure pass-through to {@code doGlobalTableJoin} with the left-join flag cleared.
     *
     * @param globalTable the global table to join with
     * @param keySelector maps a stream record to the global table key
     * @param joiner      key-aware joiner producing the output value
     * @param named       name configuration for the join processor
     * @return the joined stream
     */
    @Override
    public <GlobalKey, GlobalValue, VOut> KStream<K, VOut> join(GlobalKTable<GlobalKey, GlobalValue> globalTable, KeyValueMapper<? super K, ? super V, ? extends GlobalKey> keySelector, ValueJoinerWithKey<? super K, ? super V, ? super GlobalValue, ? extends VOut> joiner, Named named) {
        boolean leftJoin = false;
        return this.doGlobalTableJoin(globalTable, keySelector, joiner, leftJoin, named);
    }

    /**
     * Left-joins this stream with {@code globalTable} using a key-less joiner and no
     * explicit processor name.
     *
     * <p>The joiner is adapted with {@code toValueJoinerWithKey}; the call is forwarded to
     * {@code doGlobalTableJoin} with {@code leftJoin = true} and
     * {@code NamedInternal.empty()}.
     *
     * @param globalTable the global table to join with
     * @param keySelector maps a stream record to the global table key
     * @param joiner      joiner producing the output value
     * @return the joined stream
     */
    @Override
    public <GlobalKey, GlobalValue, VOut> KStream<K, VOut> leftJoin(GlobalKTable<GlobalKey, GlobalValue> globalTable, KeyValueMapper<? super K, ? super V, ? extends GlobalKey> keySelector, ValueJoiner<? super V, ? super GlobalValue, ? extends VOut> joiner) {
        ValueJoinerWithKey<? super K, ? super V, ? super GlobalValue, ? extends VOut> keyedJoiner = KStreamImpl.toValueJoinerWithKey(joiner);
        Named unnamed = NamedInternal.empty();
        return this.doGlobalTableJoin(globalTable, keySelector, keyedJoiner, true, unnamed);
    }

    /**
     * Left-joins this stream with {@code globalTable} using a key-aware joiner and no
     * explicit processor name.
     *
     * <p>Forwards to {@code doGlobalTableJoin} with {@code leftJoin = true} and
     * {@code NamedInternal.empty()}.
     *
     * @param globalTable the global table to join with
     * @param keySelector maps a stream record to the global table key
     * @param joiner      key-aware joiner producing the output value
     * @return the joined stream
     */
    @Override
    public <GlobalKey, GlobalValue, VOut> KStream<K, VOut> leftJoin(GlobalKTable<GlobalKey, GlobalValue> globalTable, KeyValueMapper<? super K, ? super V, ? extends GlobalKey> keySelector, ValueJoinerWithKey<? super K, ? super V, ? super GlobalValue, ? extends VOut> joiner) {
        Named unnamed = NamedInternal.empty();
        return this.doGlobalTableJoin(globalTable, keySelector, joiner, true, unnamed);
    }

    /**
     * Left-joins this stream with {@code globalTable} using a key-less joiner and the
     * supplied processor name.
     *
     * <p>The joiner is adapted with {@code toValueJoinerWithKey}; the call is forwarded to
     * {@code doGlobalTableJoin} with {@code leftJoin = true}.
     *
     * @param globalTable the global table to join with
     * @param keySelector maps a stream record to the global table key
     * @param joiner      joiner producing the output value
     * @param named       name configuration for the join processor
     * @return the joined stream
     */
    @Override
    public <GlobalKey, GlobalValue, VOut> KStream<K, VOut> leftJoin(GlobalKTable<GlobalKey, GlobalValue> globalTable, KeyValueMapper<? super K, ? super V, ? extends GlobalKey> keySelector, ValueJoiner<? super V, ? super GlobalValue, ? extends VOut> joiner, Named named) {
        ValueJoinerWithKey<? super K, ? super V, ? super GlobalValue, ? extends VOut> keyedJoiner = KStreamImpl.toValueJoinerWithKey(joiner);
        return this.doGlobalTableJoin(globalTable, keySelector, keyedJoiner, true, named);
    }

    /**
     * Left-joins this stream with {@code globalTable} using a key-aware joiner and the
     * supplied processor name.
     *
     * <p>Pure pass-through to {@code doGlobalTableJoin} with the left-join flag set.
     *
     * @param globalTable the global table to join with
     * @param keySelector maps a stream record to the global table key
     * @param joiner      key-aware joiner producing the output value
     * @param named       name configuration for the join processor
     * @return the joined stream
     */
    @Override
    public <GlobalKey, GlobalValue, VOut> KStream<K, VOut> leftJoin(GlobalKTable<GlobalKey, GlobalValue> globalTable, KeyValueMapper<? super K, ? super V, ? extends GlobalKey> keySelector, ValueJoinerWithKey<? super K, ? super V, ? super GlobalValue, ? extends VOut> joiner, Named named) {
        boolean leftJoin = true;
        return this.doGlobalTableJoin(globalTable, keySelector, joiner, leftJoin, named);
    }

    /**
     * Shared implementation behind all {@code join}/{@code leftJoin} overloads that take a
     * {@code GlobalKTable}.
     *
     * <p>{@code globalTable} is cast to {@code GlobalKTableImpl} without an
     * {@code instanceof} check.
     *
     * @param globalTable the global table to join with
     * @param keySelector maps a stream record to the global table key
     * @param joiner      key-aware joiner
     * @param leftJoin    {@code true} for a left join, {@code false} for an inner join
     * @param named       name configuration for the join processor
     * @return a new stream that keeps this stream's key serde, source nodes and
     *         {@code repartitionRequired} flag, with a {@code null} value serde
     * @throws NullPointerException if {@code globalTable}, {@code keySelector},
     *                              {@code joiner} or {@code named} is {@code null}
     */
    private <GlobalKey, GlobalValue, VOut> KStream<K, VOut> doGlobalTableJoin(GlobalKTable<GlobalKey, GlobalValue> globalTable, KeyValueMapper<? super K, ? super V, ? extends GlobalKey> keySelector, ValueJoinerWithKey<? super K, ? super V, ? super GlobalValue, ? extends VOut> joiner, boolean leftJoin, Named named) {
        Objects.requireNonNull(globalTable, "globalTable cannot be null");
        Objects.requireNonNull(keySelector, "keySelector cannot be null");
        Objects.requireNonNull(joiner, "joiner cannot be null");
        Objects.requireNonNull(named, "named cannot be null");
        KTableValueGetterSupplier valueGetterSupplier = ((GlobalKTableImpl)globalTable).valueGetterSupplier();
        // NOTE(review): the LEFTJOIN_NAME prefix is used for inner joins too. Changing it
        // would change generated processor names, so confirm the intent before touching this.
        String name = new NamedInternal(named).orElseGenerateWithPrefix(this.builder, LEFTJOIN_NAME);
        KStreamGlobalKTableJoin<? super K, ? super V, ? extends GlobalKey, ? super GlobalValue, ? extends VOut> processorSupplier = new KStreamGlobalKTableJoin<K, V, GlobalKey, GlobalValue, VOut>(valueGetterSupplier, joiner, keySelector, leftJoin);
        ProcessorParameters processorParameters = new ProcessorParameters(processorSupplier, name);
        // Created with an empty store-name array and null for the last two arguments.
        StreamTableJoinNode streamTableJoinNode = new StreamTableJoinNode(name, processorParameters, new String[0], null, null);
        if (leftJoin) {
            // Left joins are tagged with the NULL_KEY_RELAXED_JOIN label.
            streamTableJoinNode.labels().add(GraphNode.Label.NULL_KEY_RELAXED_JOIN);
        }
        this.builder.addGraphNode(this.graphNode, streamTableJoinNode);
        // Unlike the KTable join, no co-partitioning or repartitioning is set up here; the
        // result inherits this stream's source nodes and repartitionRequired flag.
        return new KStreamImpl<K, V>(name, this.keySerde, null, this.subTopologySourceNodes, this.repartitionRequired, streamTableJoinNode, this.builder);
    }

    /**
     * Attaches a processor to this stream under a builder-generated name.
     *
     * <p>The name is obtained from {@code builder.newProcessorName(PROCESSOR_NAME)} before
     * any argument validation, which is performed by the {@code Named} overload this method
     * delegates to.
     *
     * @param processorSupplier supplier of the processor to attach
     * @param stateStoreNames   names of the state stores the processor should have access to
     * @return the resulting stream
     */
    @Override
    public <KOut, VOut> KStream<KOut, VOut> process(ProcessorSupplier<? super K, ? super V, ? extends KOut, ? extends VOut> processorSupplier, String ... stateStoreNames) {
        String generatedName = this.builder.newProcessorName(PROCESSOR_NAME);
        Named generated = Named.as(generatedName);
        return this.process(processorSupplier, generated, stateStoreNames);
    }

    /**
     * Attaches a named processor to this stream.
     *
     * <p>The supplier is validated via {@code ApiUtils.checkSupplier}, a
     * {@code ProcessorToStateConnectorNode} is added to the graph under this stream's node,
     * and a new stream is returned with {@code null} key and value serdes and
     * {@code repartitionRequired} set to {@code true}.
     *
     * @param processorSupplier supplier of the processor to attach
     * @param named             name configuration for the processor
     * @param stateStoreNames   names of the state stores the processor should have access
     *                          to; neither the array nor any element may be {@code null}
     * @return the resulting stream
     * @throws NullPointerException if {@code named}, {@code stateStoreNames} or any state
     *                              store name is {@code null}
     */
    @Override
    public <KOut, VOut> KStream<KOut, VOut> process(ProcessorSupplier<? super K, ? super V, ? extends KOut, ? extends VOut> processorSupplier, Named named, String ... stateStoreNames) {
        ApiUtils.checkSupplier(processorSupplier);
        Objects.requireNonNull(named, "named cannot be null");
        Objects.requireNonNull(stateStoreNames, "stateStoreNames cannot be a null array");
        for (String stateStoreName : stateStoreNames) {
            Objects.requireNonNull(stateStoreName, "state store name cannot be null");
        }
        String processorName = new NamedInternal(named).name();
        // Diamond inference: the supplier is wildcard-typed, so spelling out <K, V, KOut, VOut>
        // explicitly does not type-check against it.
        ProcessorToStateConnectorNode<? super K, ? super V> processNode = new ProcessorToStateConnectorNode<>(processorName, new ProcessorParameters<>(processorSupplier, processorName), stateStoreNames);
        this.builder.addGraphNode(this.graphNode, processNode);
        // Type arguments are inferred from the KStream<KOut, VOut> return type; the serdes are
        // passed as null and repartitionRequired is set to true.
        return new KStreamImpl<>(processorName, null, null, this.subTopologySourceNodes, true, processNode, this.builder);
    }

    /**
     * Attaches a fixed-key processor to this stream under a builder-generated name.
     *
     * <p>The name is obtained from {@code builder.newProcessorName(PROCESSVALUES_NAME)}
     * before any argument validation, which is performed by the {@code Named} overload this
     * method delegates to.
     *
     * @param processorSupplier supplier of the fixed-key processor to attach
     * @param stateStoreNames   names of the state stores the processor should have access to
     * @return the resulting stream
     */
    @Override
    public <VOut> KStream<K, VOut> processValues(FixedKeyProcessorSupplier<? super K, ? super V, ? extends VOut> processorSupplier, String ... stateStoreNames) {
        String generatedName = this.builder.newProcessorName(PROCESSVALUES_NAME);
        Named generated = Named.as(generatedName);
        return this.processValues(processorSupplier, generated, stateStoreNames);
    }

    /**
     * Attaches a named fixed-key processor to this stream.
     *
     * <p>The supplier is validated via {@code ApiUtils.checkSupplier}, a
     * {@code ProcessorToStateConnectorNode} is added to the graph under this stream's node,
     * and a new stream is returned that keeps this stream's key serde and
     * {@code repartitionRequired} flag, with a {@code null} value serde.
     *
     * @param processorSupplier supplier of the fixed-key processor to attach
     * @param named             name configuration for the processor
     * @param stateStoreNames   names of the state stores the processor should have access
     *                          to; neither the array nor any element may be {@code null}
     * @return the resulting stream
     * @throws NullPointerException if {@code named}, {@code stateStoreNames} or any state
     *                              store name is {@code null}
     */
    @Override
    public <VOut> KStream<K, VOut> processValues(FixedKeyProcessorSupplier<? super K, ? super V, ? extends VOut> processorSupplier, Named named, String ... stateStoreNames) {
        ApiUtils.checkSupplier(processorSupplier);
        Objects.requireNonNull(named, "named cannot be null");
        Objects.requireNonNull(stateStoreNames, "stateStoreNames cannot be a null array");
        for (String stateStoreName : stateStoreNames) {
            Objects.requireNonNull(stateStoreName, "state store name cannot be null");
        }
        String processorName = new NamedInternal(named).name();
        // Diamond instead of a raw ProcessorParameters, so the compiler checks the supplier type.
        ProcessorToStateConnectorNode<? super K, ? super V> processNode = new ProcessorToStateConnectorNode<>(processorName, new ProcessorParameters<>(processorSupplier, processorName), stateStoreNames);
        this.builder.addGraphNode(this.graphNode, processNode);
        // Type arguments are inferred from the KStream<K, VOut> return type; the value serde is
        // passed as null.
        return new KStreamImpl<>(processorName, this.keySerde, null, this.subTopologySourceNodes, this.repartitionRequired, processNode, this.builder);
    }
}

