/*
 * Decompiled with CFR 0.152.
 */
package org.opensearch.cluster.coordination;

import java.io.IOException;
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.Consumer;
import java.util.function.Function;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.apache.logging.log4j.message.ParameterizedMessage;
import org.opensearch.OpenSearchException;
import org.opensearch.Version;
import org.opensearch.cluster.ClusterChangedEvent;
import org.opensearch.cluster.ClusterState;
import org.opensearch.cluster.Diff;
import org.opensearch.cluster.IncompatibleClusterStateVersionException;
import org.opensearch.cluster.coordination.ApplyCommitRequest;
import org.opensearch.cluster.coordination.CompressedStreamUtils;
import org.opensearch.cluster.coordination.PersistedStateRegistry;
import org.opensearch.cluster.coordination.PublishClusterStateStats;
import org.opensearch.cluster.coordination.PublishRequest;
import org.opensearch.cluster.coordination.PublishWithJoinResponse;
import org.opensearch.cluster.coordination.RemotePublishRequest;
import org.opensearch.cluster.coordination.RemoteStatePublishRequest;
import org.opensearch.cluster.node.DiscoveryNode;
import org.opensearch.cluster.node.DiscoveryNodes;
import org.opensearch.common.CheckedConsumer;
import org.opensearch.common.TriConsumer;
import org.opensearch.core.action.ActionListener;
import org.opensearch.core.common.bytes.BytesReference;
import org.opensearch.core.common.io.stream.NamedWriteableRegistry;
import org.opensearch.core.common.io.stream.StreamInput;
import org.opensearch.core.common.io.stream.StreamOutput;
import org.opensearch.core.transport.TransportResponse;
import org.opensearch.gateway.GatewayMetaState;
import org.opensearch.gateway.remote.ClusterMetadataManifest;
import org.opensearch.gateway.remote.RemoteClusterStateService;
import org.opensearch.transport.BytesTransportRequest;
import org.opensearch.transport.TransportChannel;
import org.opensearch.transport.TransportException;
import org.opensearch.transport.TransportRequest;
import org.opensearch.transport.TransportRequestOptions;
import org.opensearch.transport.TransportResponseHandler;
import org.opensearch.transport.TransportService;

public class PublicationTransportHandler {
    private static final Logger logger = LogManager.getLogger(PublicationTransportHandler.class);

    /** Transport action name used when publishing a serialized (full or diff) cluster state. */
    public static final String PUBLISH_STATE_ACTION_NAME = "internal:cluster/coordination/publish_state";
    /** Transport action name used when publishing a {@link RemotePublishRequest}. */
    public static final String PUBLISH_REMOTE_STATE_ACTION_NAME = "internal:cluster/coordination/publish_remote_state";
    /** Transport action name used when sending an {@link ApplyCommitRequest}. */
    public static final String COMMIT_STATE_ACTION_NAME = "internal:cluster/coordination/commit_state";

    private final TransportService transportService;
    private final NamedWriteableRegistry namedWriteableRegistry;
    // Callback that receives every publish request accepted by this handler.
    private final Function<PublishRequest, PublishWithJoinResponse> handlePublishRequest;

    // Cluster state most recently recorded by this handler; incoming diffs are applied on top of it.
    private final AtomicReference<ClusterState> lastSeenClusterState = new AtomicReference<>();
    // Publish request this node is currently sending to itself, or null when there is none.
    private final AtomicReference<PublishRequest> currentPublishRequestToSelf = new AtomicReference<>();

    // Counters reported through stats().
    private final AtomicLong fullClusterStateReceivedCount = new AtomicLong();
    private final AtomicLong incompatibleClusterStateDiffReceivedCount = new AtomicLong();
    private final AtomicLong compatibleClusterStateDiffReceivedCount = new AtomicLong();

    // Set to true once every node of a published state was found to have remote publication enabled.
    private final AtomicBoolean allNodesRemotePublicationEnabled = new AtomicBoolean();

    // Options used for every request sent by this handler (transport type STATE).
    private final TransportRequestOptions stateRequestOptions = TransportRequestOptions.builder()
        .withType(TransportRequestOptions.Type.STATE)
        .build();
    private final RemoteClusterStateService remoteClusterStateService;

    /**
     * Creates the handler and registers request handlers for the publish-state, publish-remote-state and
     * commit-state actions on the given transport service. All three are registered with executor name
     * {@code "generic"}.
     *
     * @param transportService          service on which the request handlers are registered and through which
     *                                  requests are later sent
     * @param namedWriteableRegistry    registry handed to {@link CompressedStreamUtils#decompressBytes} when
     *                                  reading an incoming published state
     * @param handlePublishRequest      callback applied to every accepted {@link PublishRequest}
     * @param handleApplyCommit         callback invoked for every incoming {@link ApplyCommitRequest}; it receives
     *                                  the request, a consumer that records a cluster state as the last seen one,
     *                                  and a listener that answers on the transport channel
     * @param remoteClusterStateService service used to look up manifests and cluster states for remote publication
     */
    public PublicationTransportHandler(
        TransportService transportService,
        NamedWriteableRegistry namedWriteableRegistry,
        Function<PublishRequest, PublishWithJoinResponse> handlePublishRequest,
        TriConsumer<ApplyCommitRequest, Consumer<ClusterState>, ActionListener<Void>> handleApplyCommit,
        RemoteClusterStateService remoteClusterStateService
    ) {
        this.transportService = transportService;
        this.namedWriteableRegistry = namedWriteableRegistry;
        this.handlePublishRequest = handlePublishRequest;
        this.remoteClusterStateService = remoteClusterStateService;

        transportService.registerRequestHandler(
            PUBLISH_STATE_ACTION_NAME,
            "generic",
            false,
            false,
            BytesTransportRequest::new,
            (request, channel, task) -> channel.sendResponse(handleIncomingPublishRequest(request))
        );
        transportService.registerRequestHandler(
            PUBLISH_REMOTE_STATE_ACTION_NAME,
            "generic",
            false,
            false,
            RemotePublishRequest::new,
            (request, channel, task) -> channel.sendResponse(handleIncomingRemotePublishRequest(request))
        );
        // The request is passed with its inferred type; casting it to Object would not match the
        // ApplyCommitRequest parameter of the TriConsumer.
        transportService.registerRequestHandler(
            COMMIT_STATE_ACTION_NAME,
            "generic",
            false,
            false,
            ApplyCommitRequest::new,
            (request, channel, task) -> handleApplyCommit.apply(request, this::updateLastSeen, transportCommitCallback(channel))
        );
    }

    /**
     * Builds a listener that relays the outcome of an apply-commit onto the given transport channel.
     * Success sends {@code TransportResponse.Empty.INSTANCE}; failure sends the exception itself.
     * An {@link IOException} raised while sending is only logged at debug level.
     *
     * @param channel channel on which the outcome is reported
     * @return listener bound to {@code channel}
     */
    private ActionListener<Void> transportCommitCallback(final TransportChannel channel) {
        return new ActionListener<Void>() {
            @Override
            public void onResponse(Void ignored) {
                try {
                    channel.sendResponse(TransportResponse.Empty.INSTANCE);
                } catch (IOException sendFailure) {
                    logger.debug("failed to send response on commit", sendFailure);
                }
            }

            @Override
            public void onFailure(Exception cause) {
                try {
                    channel.sendResponse(cause);
                } catch (IOException sendFailure) {
                    // Keep the original failure as the primary exception and attach the send failure to it.
                    cause.addSuppressed(sendFailure);
                    logger.debug("failed to send response on commit", cause);
                }
            }
        };
    }

    /**
     * Snapshots the three receive counters into a {@link PublishClusterStateStats}.
     *
     * @return stats built from the full-state, incompatible-diff and compatible-diff counts, in that order
     */
    public PublishClusterStateStats stats() {
        final long fullStates = fullClusterStateReceivedCount.get();
        final long incompatibleDiffs = incompatibleClusterStateDiffReceivedCount.get();
        final long compatibleDiffs = compatibleClusterStateDiffReceivedCount.get();
        return new PublishClusterStateStats(fullStates, incompatibleDiffs, compatibleDiffs);
    }

    /**
     * Handles a published cluster state received as compressed bytes.
     * <p>
     * The payload starts with a boolean: {@code true} means a full cluster state follows, {@code false} means
     * a diff follows that is applied to the last seen cluster state. The resulting state is passed to
     * {@link #acceptState} and then recorded as the last seen state.
     *
     * @param request the raw publish request
     * @return the response produced by accepting the state
     * @throws IOException if reading the payload fails
     * @throws IncompatibleClusterStateVersionException if a diff arrives while no last seen state exists, or if
     *         reading/applying the diff throws it
     */
    private PublishWithJoinResponse handleIncomingPublishRequest(BytesTransportRequest request) throws IOException {
        try (StreamInput in = CompressedStreamUtils.decompressBytes(request, namedWriteableRegistry)) {
            final boolean isFullState = in.readBoolean();
            if (isFullState) {
                ClusterState fullState;
                // The nested try-with-resources closes the stream as soon as the state has been read.
                try (StreamInput input = in) {
                    fullState = ClusterState.readFrom(input, transportService.getLocalNode());
                } catch (Exception e) {
                    logger.warn("unexpected error while deserializing an incoming cluster state", e);
                    throw e;
                }
                fullClusterStateReceivedCount.incrementAndGet();
                logger.debug(
                    "received full cluster state version [{}] with size [{}]",
                    fullState.version(),
                    request.bytes().length()
                );
                final PublishWithJoinResponse fullStateResponse = acceptState(fullState, null);
                lastSeenClusterState.set(fullState);
                return fullStateResponse;
            }

            final ClusterState lastSeen = lastSeenClusterState.get();
            if (lastSeen == null) {
                // Without a base state the diff cannot be applied.
                logger.debug("received diff for but don't have any local cluster state - requesting full state");
                incompatibleClusterStateDiffReceivedCount.incrementAndGet();
                throw new IncompatibleClusterStateVersionException("have no local cluster state");
            }

            ClusterState patchedState;
            try {
                final Diff<ClusterState> diff;
                // As above: close the stream once the diff itself has been read.
                try (StreamInput input = in) {
                    diff = ClusterState.readDiffFrom(input, lastSeen.nodes().getLocalNode());
                }
                patchedState = diff.apply(lastSeen);
            } catch (IncompatibleClusterStateVersionException e) {
                incompatibleClusterStateDiffReceivedCount.incrementAndGet();
                throw e;
            } catch (Exception e) {
                logger.warn("unexpected error while deserializing an incoming cluster state", e);
                throw e;
            }
            compatibleClusterStateDiffReceivedCount.incrementAndGet();
            logger.debug(
                "received diff cluster state version [{}] with uuid [{}], diff size [{}]",
                patchedState.version(),
                patchedState.stateUUID(),
                request.bytes().length()
            );
            final PublishWithJoinResponse diffResponse = acceptState(patchedState, null);
            // Only advance the last seen state if nobody replaced it while the diff was being processed.
            lastSeenClusterState.compareAndSet(lastSeen, patchedState);
            return diffResponse;
        }
    }

    /**
     * Handles a remote publish request by loading the referenced manifest and obtaining the cluster state
     * through {@code remoteClusterStateService}, either in full or as a diff on top of the last seen state.
     * <p>
     * A request whose source node is the local node is delegated to {@link #acceptRemoteStateOnLocalNode}.
     * The full state is used when there is no last seen state, when the manifest has no diff manifest, or when
     * the diff's from-state UUID differs from the last seen state's UUID.
     * <p>
     * On any exception, {@code fullDownloadFailed()} or {@code diffDownloadFailed()} is invoked on the service,
     * depending on whether the full-state path had been selected by then, and the exception is rethrown.
     *
     * @param request the incoming remote publish request
     * @return the response produced by accepting the state
     * @throws IOException propagated from the operations in the body
     * @throws IllegalStateException if no manifest is found for the request
     */
    PublishWithJoinResponse handleIncomingRemotePublishRequest(RemotePublishRequest request) throws IOException, IllegalStateException {
        // Read by the catch block to decide which failure hook to call.
        boolean applyFullState = false;
        try {
            if (transportService.getLocalNode().equals(request.getSourceNode())) {
                return acceptRemoteStateOnLocalNode(request);
            }

            final ClusterMetadataManifest manifest = remoteClusterStateService.getClusterMetadataManifestByFileName(
                request.getClusterUUID(),
                request.getManifestFile()
            );
            if (manifest == null) {
                throw new IllegalStateException("Publication failed as manifest was not found for " + request);
            }

            final ClusterState lastSeen = lastSeenClusterState.get();
            if (lastSeen == null) {
                logger.debug(() -> "Diff cannot be applied as there is no last cluster state");
                applyFullState = true;
            } else if (manifest.getDiffManifest() == null) {
                logger.debug(() -> "There is no diff in the manifest");
                applyFullState = true;
            } else if (!manifest.getDiffManifest().getFromStateUUID().equals(lastSeen.stateUUID())) {
                logger.debug(() -> "Last cluster state not compatible with the diff");
                applyFullState = true;
            }

            if (!applyFullState) {
                logger.debug(
                    () -> new ParameterizedMessage(
                        "Downloading diff cluster state for term {}, version {}, previousUUID {}, current UUID {}",
                        manifest.getClusterTerm(),
                        manifest.getStateVersion(),
                        manifest.getDiffManifest().getFromStateUUID(),
                        manifest.getStateUUID()
                    )
                );
                final ClusterState diffedState = remoteClusterStateService.getClusterStateUsingDiff(
                    manifest,
                    lastSeen,
                    transportService.getLocalNode().getId()
                );
                compatibleClusterStateDiffReceivedCount.incrementAndGet();
                final PublishWithJoinResponse diffResponse = acceptState(diffedState, manifest);
                // Only advance the last seen state if it is still the one the diff was based on.
                lastSeenClusterState.compareAndSet(lastSeen, diffedState);
                return diffResponse;
            }

            logger.debug(
                () -> new ParameterizedMessage(
                    "Downloading full cluster state for term {}, version {}, stateUUID {}",
                    manifest.getClusterTerm(),
                    manifest.getStateVersion(),
                    manifest.getStateUUID()
                )
            );
            final ClusterState fullState = remoteClusterStateService.getClusterStateForManifest(
                request.getClusterName(),
                manifest,
                transportService.getLocalNode().getId(),
                true
            );
            fullClusterStateReceivedCount.incrementAndGet();
            final PublishWithJoinResponse fullResponse = acceptState(fullState, manifest);
            lastSeenClusterState.set(fullState);
            return fullResponse;
        } catch (Exception e) {
            if (applyFullState) {
                remoteClusterStateService.fullDownloadFailed();
            } else {
                remoteClusterStateService.diffDownloadFailed();
            }
            throw e;
        }
    }

    /**
     * Passes an incoming cluster state to the publish-request callback.
     * <p>
     * When the local node is the cluster-manager node of the incoming state, the state must match (by state
     * UUID) the publish request this node is currently sending to itself, and that original request is passed
     * on. Otherwise a new request is created: a {@link RemoteStatePublishRequest} when a manifest is given, a
     * plain {@link PublishRequest} when it is not.
     *
     * @param incomingState the state to accept
     * @param manifest      manifest belonging to the state, or {@code null}
     * @return the response returned by the callback
     * @throws IllegalStateException if the local node is the cluster-manager but there is no matching
     *         publish request to self
     */
    private PublishWithJoinResponse acceptState(ClusterState incomingState, ClusterMetadataManifest manifest) {
        final boolean localNodeIsClusterManager = transportService.getLocalNode().equals(incomingState.nodes().getClusterManagerNode());
        if (!localNodeIsClusterManager) {
            final PublishRequest incomingRequest = manifest != null
                ? new RemoteStatePublishRequest(incomingState, manifest)
                : new PublishRequest(incomingState);
            return handlePublishRequest.apply(incomingRequest);
        }

        final PublishRequest requestToSelf = currentPublishRequestToSelf.get();
        final boolean matchesIncomingState = requestToSelf != null
            && requestToSelf.getAcceptedState().stateUUID().equals(incomingState.stateUUID());
        if (!matchesIncomingState) {
            throw new IllegalStateException("publication to self failed for " + requestToSelf);
        }
        return handlePublishRequest.apply(requestToSelf);
    }

    /**
     * Accepts a remote publish request that originated from the local node. The request must agree in term
     * and version with the publish request this node is currently sending to itself; that request is then
     * passed to the publish-request callback and its accepted state is recorded as the last seen state.
     *
     * @param remotePublishRequest the remote publish request received from the local node
     * @return the response returned by the callback
     * @throws IllegalStateException if there is no current publish request to self, or term or version differ
     */
    private PublishWithJoinResponse acceptRemoteStateOnLocalNode(RemotePublishRequest remotePublishRequest) {
        final PublishRequest requestToSelf = currentPublishRequestToSelf.get();
        final boolean matchesRemoteRequest = requestToSelf != null
            && requestToSelf.getAcceptedState().coordinationMetadata().term() == remotePublishRequest.term
            && requestToSelf.getAcceptedState().version() == remotePublishRequest.version;
        if (!matchesRemoteRequest) {
            logger.debug(
                () -> new ParameterizedMessage(
                    "Publication failure for current publish request : {} and remote publish request: {}",
                    requestToSelf,
                    remotePublishRequest
                )
            );
            throw new IllegalStateException("publication to self failed for " + remotePublishRequest);
        }
        final PublishWithJoinResponse response = handlePublishRequest.apply(requestToSelf);
        lastSeenClusterState.set(requestToSelf.getAcceptedState());
        return response;
    }

    /**
     * Creates the publication context for a cluster state change.
     * <p>
     * When remote publication is enabled and all nodes of the new state have it enabled, a
     * {@link RemotePublicationContext} is returned. The all-nodes check is remembered: once it has succeeded,
     * this method does not run it again. In every other case a plain {@link PublicationContext} is returned
     * after its states and diffs have been serialized.
     *
     * @param clusterChangedEvent        event carrying the new and previous cluster states
     * @param isRemotePublicationEnabled whether remote publication may be used at all
     * @param persistedStateRegistry     registry handed to the created context
     * @return the context to publish with
     */
    public PublicationContext newPublicationContext(
        ClusterChangedEvent clusterChangedEvent,
        boolean isRemotePublicationEnabled,
        PersistedStateRegistry persistedStateRegistry
    ) {
        if (isRemotePublicationEnabled) {
            final boolean alreadyValidated = allNodesRemotePublicationEnabled.get();
            if (!alreadyValidated && validateRemotePublicationConfiguredOnAllNodes(clusterChangedEvent.state().nodes())) {
                allNodesRemotePublicationEnabled.set(true);
            }
            if (allNodesRemotePublicationEnabled.get()) {
                return new RemotePublicationContext(clusterChangedEvent, persistedStateRegistry);
            }
        }

        final PublicationContext context = new PublicationContext(clusterChangedEvent, persistedStateRegistry);
        context.buildDiffAndSerializeStates();
        return context;
    }

    /**
     * Checks whether every node in the given set has remote state publication enabled.
     *
     * @param discoveryNodes nodes to inspect
     * @return {@code true} if no node reports remote state publication as disabled
     */
    private boolean validateRemotePublicationConfiguredOnAllNodes(DiscoveryNodes discoveryNodes) {
        assert ClusterMetadataManifest.getCodecForVersion(discoveryNodes.getMinNodeVersion()) >= 0;
        for (DiscoveryNode candidate : discoveryNodes.getNodes().values()) {
            if (!candidate.isRemoteStatePublicationEnabled()) {
                return false;
            }
        }
        return true;
    }

    /**
     * Records the given cluster state as the last seen one. Passed to the apply-commit callback as a consumer.
     *
     * @param clusterState the state to remember
     */
    private void updateLastSeen(ClusterState clusterState) {
        lastSeenClusterState.set(clusterState);
    }

    /**
     * Replaces the publish request this node is currently sending to itself.
     *
     * @param publishRequest the new current request
     */
    void setCurrentPublishRequestToSelf(PublishRequest publishRequest) {
        currentPublishRequestToSelf.set(publishRequest);
    }

    /**
     * Replaces the last seen cluster state, i.e. the state incoming diffs are applied to.
     *
     * @param clusterState the new last seen state
     */
    void setLastSeenClusterState(ClusterState clusterState) {
        lastSeenClusterState.set(clusterState);
    }

    /**
     * Serializes a full cluster state into a compressed stream for the given node version. The payload starts
     * with the boolean {@code true}, which the receiving side reads as "full state follows".
     *
     * @param clusterState the state to serialize
     * @param nodeVersion  version passed to {@link CompressedStreamUtils#createCompressedStream}
     * @return the compressed bytes
     * @throws IOException if writing the state fails
     */
    private static BytesReference serializeFullClusterState(ClusterState clusterState, Version nodeVersion) throws IOException {
        // A typed consumer keeps the lambda parameter a StreamOutput; no raw CheckedConsumer cast is needed.
        final CheckedConsumer<StreamOutput, IOException> writer = stream -> {
            stream.writeBoolean(true);
            clusterState.writeTo(stream);
        };
        final BytesReference serializedState = CompressedStreamUtils.createCompressedStream(nodeVersion, writer);
        logger.trace(
            "serialized full cluster state version [{}] for node version [{}] with size [{}]",
            clusterState.version(),
            nodeVersion,
            serializedState.length()
        );
        return serializedState;
    }

    /**
     * Serializes a cluster state diff into a compressed stream for the given node version. The payload starts
     * with the boolean {@code false}, which the receiving side reads as "diff follows".
     *
     * @param diff        the diff to serialize
     * @param nodeVersion version passed to {@link CompressedStreamUtils#createCompressedStream}
     * @return the compressed bytes
     * @throws IOException if writing the diff fails
     */
    private static BytesReference serializeDiffClusterState(Diff<ClusterState> diff, Version nodeVersion) throws IOException {
        // A typed consumer keeps the lambda parameter a StreamOutput; no raw CheckedConsumer cast is needed.
        final CheckedConsumer<StreamOutput, IOException> writer = stream -> {
            stream.writeBoolean(false);
            diff.writeTo(stream);
        };
        return CompressedStreamUtils.createCompressedStream(nodeVersion, writer);
    }

    public class RemotePublicationContext
    extends PublicationContext {
        RemotePublicationContext(ClusterChangedEvent clusterChangedEvent, PersistedStateRegistry persistedStateRegistry) {
            super(clusterChangedEvent, persistedStateRegistry);
        }

        @Override
        public void sendClusterState(DiscoveryNode destination, final ActionListener<PublishWithJoinResponse> listener) {
            try {
                logger.debug("sending remote cluster state to node: {}", (Object)destination.getName());
                String manifestFileName = ((GatewayMetaState.RemotePersistedState)this.persistedStateRegistry.getPersistedState(PersistedStateRegistry.PersistedStateType.REMOTE)).getLastUploadedManifestFile();
                RemotePublishRequest remotePublishRequest = new RemotePublishRequest(this.discoveryNodes.getLocalNode(), this.newState.term(), this.newState.getVersion(), this.newState.getClusterName().value(), this.newState.metadata().clusterUUID(), manifestFileName);
                final Consumer<TransportException> transportExceptionHandler = exp -> {
                    logger.debug(() -> new ParameterizedMessage("failed to send remote cluster state to {}", (Object)destination), (Throwable)((Object)exp));
                    listener.onFailure((Exception)((Object)exp));
                };
                TransportResponseHandler<PublishWithJoinResponse> responseHandler = new TransportResponseHandler<PublishWithJoinResponse>(){

                    public PublishWithJoinResponse read(StreamInput in) throws IOException {
                        return new PublishWithJoinResponse(in);
                    }

                    @Override
                    public void handleResponse(PublishWithJoinResponse response) {
                        listener.onResponse((Object)response);
                    }

                    @Override
                    public void handleException(TransportException exp) {
                        transportExceptionHandler.accept(exp);
                    }

                    @Override
                    public String executor() {
                        return "generic";
                    }
                };
                PublicationTransportHandler.this.transportService.sendRequest(destination, PublicationTransportHandler.PUBLISH_REMOTE_STATE_ACTION_NAME, (TransportRequest)remotePublishRequest, PublicationTransportHandler.this.stateRequestOptions, responseHandler);
            }
            catch (Exception e) {
                logger.warn(() -> new ParameterizedMessage("error sending remote cluster state to {}", (Object)destination), (Throwable)e);
                listener.onFailure(e);
            }
        }
    }

    public class PublicationContext {
        // Nodes of the new cluster state.
        protected final DiscoveryNodes discoveryNodes;
        // State being published.
        protected final ClusterState newState;
        // State the publication starts from; diffs are computed against it.
        protected final ClusterState previousState;
        // When true, the full state is serialized and sent to every node instead of a diff.
        protected final boolean sendFullVersion;
        // Serialized full states, keyed by the version of the node they were serialized for.
        private final Map<Version, BytesReference> serializedStates = new HashMap<>();
        // Serialized diffs, keyed by the version of the node they were serialized for.
        private final Map<Version, BytesReference> serializedDiffs = new HashMap<>();
        protected final PersistedStateRegistry persistedStateRegistry;

        /**
         * Captures the new and previous cluster states from the event. A full state will be sent to every node
         * when the previous state's blocks report {@code disableStatePersistence()}.
         *
         * @param clusterChangedEvent    event describing the state change being published
         * @param persistedStateRegistry registry kept for use by this context and its subclasses
         */
        PublicationContext(ClusterChangedEvent clusterChangedEvent, PersistedStateRegistry persistedStateRegistry) {
            discoveryNodes = clusterChangedEvent.state().nodes();
            newState = clusterChangedEvent.state();
            previousState = clusterChangedEvent.previousState();
            sendFullVersion = previousState.getBlocks().disableStatePersistence();
            this.persistedStateRegistry = persistedStateRegistry;
        }

        /**
         * Serializes, once per node version, what each node of the new state will receive. That is the full
         * state when full versions are forced or the node is not in the previous state, and the diff against
         * the previous state otherwise. The diff is computed the first time a node needs it.
         *
         * @throws OpenSearchException if serialization fails with an {@link IOException}
         */
        void buildDiffAndSerializeStates() {
            Diff<ClusterState> diff = null;
            for (DiscoveryNode node : discoveryNodes) {
                try {
                    final Version nodeVersion = node.getVersion();
                    final boolean needsFullState = sendFullVersion || !previousState.nodes().nodeExists(node);
                    if (needsFullState) {
                        if (!serializedStates.containsKey(nodeVersion)) {
                            serializedStates.put(nodeVersion, serializeFullClusterState(newState, nodeVersion));
                        }
                    } else {
                        if (diff == null) {
                            diff = newState.diff(previousState);
                        }
                        if (!serializedDiffs.containsKey(nodeVersion)) {
                            final BytesReference serializedDiff = serializeDiffClusterState(diff, nodeVersion);
                            serializedDiffs.put(nodeVersion, serializedDiff);
                            logger.trace(
                                "serialized cluster state diff for version [{}] in for node version [{}] with size [{}]",
                                newState.version(),
                                nodeVersion,
                                serializedDiff.length()
                            );
                        }
                    }
                } catch (IOException e) {
                    throw new OpenSearchException("failed to serialize cluster state for publishing to node {}", e, node);
                }
            }
        }

        /**
         * Publishes the new state to {@code destination}.
         * <p>
         * When the destination is the local node, {@code publishRequest} is first stored as the current publish
         * request to self, and the listener is wrapped so that the stored request is cleared again (only if it
         * is still this request) once the publication completes or fails.
         *
         * @param destination    node to publish to
         * @param publishRequest request whose accepted state must be this context's new state
         * @param listener       notified with the outcome
         */
        public void sendPublishRequest(
            DiscoveryNode destination,
            final PublishRequest publishRequest,
            final ActionListener<PublishWithJoinResponse> listener
        ) {
            assert publishRequest.getAcceptedState() == newState : "state got switched on us";
            assert transportService.getThreadPool().getThreadContext().isSystemContext();
            final ActionListener<PublishWithJoinResponse> responseActionListener;
            if (destination.equals(discoveryNodes.getLocalNode())) {
                final PublishRequest previousRequest = currentPublishRequestToSelf.getAndSet(publishRequest);
                assert previousRequest == null || previousRequest.getAcceptedState().term() < publishRequest.getAcceptedState().term();
                responseActionListener = new ActionListener<PublishWithJoinResponse>() {
                    @Override
                    public void onResponse(PublishWithJoinResponse publishWithJoinResponse) {
                        currentPublishRequestToSelf.compareAndSet(publishRequest, null);
                        // Passed with its declared type; an Object argument would not match the listener.
                        listener.onResponse(publishWithJoinResponse);
                    }

                    @Override
                    public void onFailure(Exception e) {
                        currentPublishRequestToSelf.compareAndSet(publishRequest, null);
                        listener.onFailure(e);
                    }
                };
            } else {
                responseActionListener = listener;
            }
            sendClusterState(destination, responseActionListener);
        }

        /**
         * Sends an apply-commit request to {@code destination} under the commit-state action.
         *
         * @param destination        node to send the commit to
         * @param applyCommitRequest the commit request
         * @param listener           notified with {@code TransportResponse.Empty.INSTANCE} on success or with
         *                           the transport exception on failure
         */
        public void sendApplyCommit(
            DiscoveryNode destination,
            ApplyCommitRequest applyCommitRequest,
            final ActionListener<TransportResponse.Empty> listener
        ) {
            assert transportService.getThreadPool().getThreadContext().isSystemContext();
            final TransportResponseHandler<TransportResponse.Empty> commitResponseHandler =
                new TransportResponseHandler<TransportResponse.Empty>() {

                    @Override
                    public TransportResponse.Empty read(StreamInput in) {
                        // The stream is not read; the shared empty instance is returned.
                        return TransportResponse.Empty.INSTANCE;
                    }

                    @Override
                    public void handleResponse(TransportResponse.Empty response) {
                        // Passed with its declared type; an Object argument would not match the listener.
                        listener.onResponse(response);
                    }

                    @Override
                    public void handleException(TransportException exp) {
                        listener.onFailure(exp);
                    }

                    @Override
                    public String executor() {
                        return "generic";
                    }
                };
            transportService.sendRequest(
                destination,
                COMMIT_STATE_ACTION_NAME,
                applyCommitRequest,
                stateRequestOptions,
                commitResponseHandler
            );
        }

        /**
         * Sends the new cluster state to {@code destination}. The full state is sent when full versions are
         * forced or the destination is not in the previous state; otherwise the diff is sent.
         *
         * @param destination node to send to
         * @param listener    notified with the outcome
         */
        public void sendClusterState(DiscoveryNode destination, ActionListener<PublishWithJoinResponse> listener) {
            logger.trace("sending cluster state over transport to node: {}", destination.getName());
            final boolean destinationNeedsFullState = sendFullVersion || !previousState.nodes().nodeExists(destination);
            if (!destinationNeedsFullState) {
                logger.trace("sending cluster state diff for version [{}] to [{}]", newState.version(), destination);
                sendClusterStateDiff(destination, listener);
                return;
            }
            logger.trace("sending full cluster state version [{}] to [{}]", newState.version(), destination);
            sendFullClusterState(destination, listener);
        }

        /**
         * Sends the full cluster state to {@code destination}. The serialized bytes cached for the
         * destination's version are reused; if there are none, the state is serialized now and cached. A
         * serialization failure is reported to the listener and nothing is sent.
         *
         * @param destination node to send to
         * @param listener    notified with the outcome
         */
        private void sendFullClusterState(DiscoveryNode destination, ActionListener<PublishWithJoinResponse> listener) {
            final BytesReference cached = serializedStates.get(destination.getVersion());
            if (cached != null) {
                sendClusterState(destination, cached, false, listener);
                return;
            }

            BytesReference serialized;
            try {
                serialized = serializeFullClusterState(newState, destination.getVersion());
                serializedStates.put(destination.getVersion(), serialized);
            } catch (Exception e) {
                logger.warn(
                    () -> new ParameterizedMessage("failed to serialize cluster state before publishing it to node {}", destination),
                    e
                );
                listener.onFailure(e);
                return;
            }
            sendClusterState(destination, serialized, false, listener);
        }

        /**
         * Sends the pre-serialized diff for the destination's version, asking for a retry with the full state
         * if the send fails.
         *
         * @param destination node to send to
         * @param listener    notified with the outcome
         */
        private void sendClusterStateDiff(DiscoveryNode destination, ActionListener<PublishWithJoinResponse> listener) {
            final Version destinationVersion = destination.getVersion();
            final BytesReference serializedDiff = serializedDiffs.get(destinationVersion);
            assert serializedDiff != null : "failed to find serialized diff for node "
                + destination
                + " of version ["
                + destinationVersion
                + "]";
            sendClusterState(destination, serializedDiff, true, listener);
        }

        /**
         * Sends already-serialized cluster state bytes to {@code destination} under the publish-state action.
         *
         * @param destination                         node to send to
         * @param bytes                               serialized full state or diff
         * @param retryWithFullClusterStateOnFailure  when {@code true} and the failure's unwrapped cause is an
         *                                            {@link IncompatibleClusterStateVersionException}, the full
         *                                            state is sent instead of failing the listener
         * @param listener                            notified with the node's response, or with the failure;
         *                                            exceptions thrown while building or sending the request
         *                                            are also routed to {@code onFailure}
         */
        private void sendClusterState(
            DiscoveryNode destination,
            BytesReference bytes,
            boolean retryWithFullClusterStateOnFailure,
            final ActionListener<PublishWithJoinResponse> listener
        ) {
            try {
                final BytesTransportRequest request = new BytesTransportRequest(bytes, destination.getVersion());
                final Consumer<TransportException> transportExceptionHandler = exp -> {
                    if (retryWithFullClusterStateOnFailure && exp.unwrapCause() instanceof IncompatibleClusterStateVersionException) {
                        logger.debug("resending full cluster state to node {} reason {}", destination, exp.getDetailedMessage());
                        sendFullClusterState(destination, listener);
                    } else {
                        logger.debug(() -> new ParameterizedMessage("failed to send cluster state to {}", destination), exp);
                        listener.onFailure(exp);
                    }
                };
                final TransportResponseHandler<PublishWithJoinResponse> responseHandler =
                    new TransportResponseHandler<PublishWithJoinResponse>() {

                        @Override
                        public PublishWithJoinResponse read(StreamInput in) throws IOException {
                            return new PublishWithJoinResponse(in);
                        }

                        @Override
                        public void handleResponse(PublishWithJoinResponse response) {
                            // Passed with its declared type; an Object argument would not match the listener.
                            listener.onResponse(response);
                        }

                        @Override
                        public void handleException(TransportException exp) {
                            transportExceptionHandler.accept(exp);
                        }

                        @Override
                        public String executor() {
                            return "generic";
                        }
                    };
                transportService.sendRequest(destination, PUBLISH_STATE_ACTION_NAME, request, stateRequestOptions, responseHandler);
            } catch (Exception e) {
                logger.warn(() -> new ParameterizedMessage("error sending cluster state to {}", destination), e);
                listener.onFailure(e);
            }
        }
    }
}

