package org.eclipse.hono.client.impl;

import io.opentracing.Tracer;
import io.opentracing.noop.NoopTracerFactory;
import io.vertx.core.AsyncResult;
import io.vertx.core.Context;
import io.vertx.core.Future;
import io.vertx.core.Handler;
import io.vertx.core.Vertx;
import io.vertx.proton.ProtonClientOptions;
import io.vertx.proton.ProtonConnection;
import io.vertx.proton.ProtonHelper;
import io.vertx.proton.ProtonLink;
import io.vertx.proton.ProtonMessageHandler;
import io.vertx.proton.ProtonQoS;
import io.vertx.proton.ProtonReceiver;
import io.vertx.proton.ProtonSender;
import io.vertx.proton.sasl.MechanismMismatchException;
import io.vertx.proton.sasl.SaslSystemException;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.Iterator;
import java.util.List;
import java.util.Objects;
import java.util.Optional;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import javax.security.sasl.AuthenticationException;
import org.apache.qpid.proton.amqp.Symbol;
import org.apache.qpid.proton.amqp.transport.ErrorCondition;
import org.eclipse.hono.client.ClientErrorException;
import org.eclipse.hono.client.DisconnectListener;
import org.eclipse.hono.client.HonoConnection;
import org.eclipse.hono.client.ReconnectListener;
import org.eclipse.hono.client.ServerErrorException;
import org.eclipse.hono.client.StatusCodeMapper;
import org.eclipse.hono.config.ClientConfigProperties;
import org.eclipse.hono.connection.ConnectionFactory;
import org.eclipse.hono.util.HonoProtonHelper;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;

/* loaded from: input_file:BOOT-INF/lib/hono-client-1.0-M5.jar:org/eclipse/hono/client/impl/HonoConnectionImpl.class */
public class HonoConnectionImpl implements HonoConnection {
    /** Logger used for all diagnostic output of this connection. */
    protected final Logger log;
    /** Configuration supplying reconnect attempts, timeouts, flow latency and initial credits. */
    protected final ClientConfigProperties clientConfigProperties;
    /** The vert.x instance used for obtaining a context and for scheduling timers. */
    protected final Vertx vertx;
    /** The current AMQP connection, {@code null} while not connected. Written via {@code setConnection}. */
    protected ProtonConnection connection;
    /** The vert.x context that connection related code is run on, reset to {@code null} when the connection is cleared. */
    protected volatile Context context;
    /** Listeners invoked from {@code clearState()} whenever the connection state is discarded. */
    private final List<DisconnectListener<HonoConnection>> disconnectListeners;
    /** Listeners invoked after an automatic re-connect attempt has succeeded. */
    private final List<ReconnectListener<HonoConnection>> reconnectListeners;
    /** {@code true} while a connection attempt is in progress. */
    private final AtomicBoolean connecting;
    /** Set to {@code true} (and never reset in this class) once {@code shutdown} has been invoked. */
    private final AtomicBoolean shuttingDown;
    /** {@code true} while a {@code disconnect} request is being processed. */
    private final AtomicBoolean disconnecting;
    /** Factory used for establishing the underlying AMQP connection. */
    private final ConnectionFactory connectionFactory;
    /** Monitor held while reading or writing {@code connection} and {@code offeredCapabilities} in the accessor methods. */
    private final Object connectionLock;
    /** Options of the most recent connection attempt, re-used for re-connect attempts. May be {@code null}. */
    private ProtonClientOptions clientOptions;
    /** Number of connection attempts made so far, reset whenever the state is cleared. */
    private AtomicInteger connectAttempts;
    /** Capabilities offered by the remote peer, an empty list while not connected. */
    private List<Symbol> offeredCapabilities;
    /** The OpenTracing tracer, a no-op implementation unless replaced via {@code setTracer}. */
    private Tracer tracer;

    /**
     * Creates a connection that uses a connection factory derived from the given configuration.
     *
     * @param vertx The vert.x instance to use. If {@code null}, a new instance is created.
     * @param clientConfigProperties The configuration properties to use.
     * @throws NullPointerException if clientConfigProperties is {@code null}.
     */
    public HonoConnectionImpl(Vertx vertx, ClientConfigProperties clientConfigProperties) {
        // no explicit factory: the three-argument constructor creates a default one
        this(vertx, (ConnectionFactory) null, clientConfigProperties);
    }

    /**
     * Creates a connection for the given vert.x instance, connection factory and configuration.
     *
     * @param vertx The vert.x instance to use. If {@code null}, a new instance is created.
     * @param connectionFactory The factory to use for establishing the AMQP connection.
     *                          If {@code null}, a factory is created from the given configuration.
     * @param clientConfigProperties The configuration properties to use.
     * @throws NullPointerException if clientConfigProperties is {@code null}.
     */
    public HonoConnectionImpl(Vertx vertx, ConnectionFactory connectionFactory, ClientConfigProperties clientConfigProperties) {
        this.log = LoggerFactory.getLogger(HonoConnectionImpl.class);
        this.disconnectListeners = new ArrayList<>();
        this.reconnectListeners = new ArrayList<>();
        this.connecting = new AtomicBoolean(false);
        this.shuttingDown = new AtomicBoolean(false);
        this.disconnecting = new AtomicBoolean(false);
        this.connectionLock = new Object();
        this.offeredCapabilities = Collections.emptyList();
        this.tracer = NoopTracerFactory.create();
        Objects.requireNonNull(clientConfigProperties);
        this.vertx = vertx != null ? vertx : Vertx.vertx();
        // the default factory needs the (possibly freshly created) vert.x instance assigned above
        this.connectionFactory = connectionFactory != null
                ? connectionFactory
                : ConnectionFactory.newConnectionFactory(this.vertx, clientConfigProperties);
        this.clientConfigProperties = clientConfigProperties;
        this.connectAttempts = new AtomicInteger(0);
    }

    /**
     * Gets the vert.x instance used by this connection.
     *
     * @return The vert.x instance.
     */
    @Override // org.eclipse.hono.client.HonoConnection
    public final Vertx getVertx() {
        return vertx;
    }

    /**
     * Sets the OpenTracing tracer to use instead of the default no-op implementation.
     *
     * @param tracer The tracer to use.
     * @throws NullPointerException if tracer is {@code null}.
     */
    @Autowired(required = false)
    public final void setTracer(Tracer tracer) {
        // validate before the argument is dereferenced for logging
        this.tracer = Objects.requireNonNull(tracer);
        this.log.info("using OpenTracing implementation [{}]", tracer.getClass().getName());
    }

    /**
     * Gets the OpenTracing tracer used by this connection.
     *
     * @return The tracer, a no-op implementation unless another one has been set.
     */
    @Override // org.eclipse.hono.client.HonoConnection
    public final Tracer getTracer() {
        return tracer;
    }

    /**
     * Gets the configuration properties this connection has been created with.
     *
     * @return The configuration properties.
     */
    @Override // org.eclipse.hono.client.HonoConnection
    public final ClientConfigProperties getConfig() {
        return clientConfigProperties;
    }

    /**
     * Registers a listener to be notified whenever the state of this connection is cleared.
     *
     * @param disconnectListener The listener to add.
     */
    @Override // org.eclipse.hono.client.ConnectionLifecycle
    public final void addDisconnectListener(DisconnectListener<HonoConnection> disconnectListener) {
        disconnectListeners.add(disconnectListener);
    }

    /**
     * Registers a listener to be notified after an automatic re-connect has succeeded.
     *
     * @param reconnectListener The listener to add.
     */
    @Override // org.eclipse.hono.client.ConnectionLifecycle
    public final void addReconnectListener(ReconnectListener<HonoConnection> reconnectListener) {
        reconnectListeners.add(reconnectListener);
    }

    /**
     * Runs the given code on the vert.x context of this connection.
     * <p>
     * The actual execution is delegated to {@code HonoProtonHelper.executeOrRunOnContext}.
     *
     * @param handler The code to run. It is passed the future to complete with the outcome.
     * @param <T> The type of the result produced by the code.
     * @return The future passed to the code, or a future failed with a
     *         {@code ServerErrorException} (503) if this connection has no context.
     */
    @Override // org.eclipse.hono.client.HonoConnection
    public final <T> Future<T> executeOrRunOnContext(Handler<Future<T>> handler) {
        // read the volatile field once: it may be reset to null between a check and a later use
        final Context currentContext = this.context;
        if (currentContext == null) {
            return Future.failedFuture(new ServerErrorException(503, "not connected"));
        }
        return HonoProtonHelper.executeOrRunOnContext(currentContext, handler);
    }

    /**
     * Checks on the connection's vert.x context whether this connection is established.
     *
     * @return A future that succeeds if connected and fails with a
     *         {@code ServerErrorException} (503) otherwise.
     */
    @Override // org.eclipse.hono.client.ConnectionLifecycle
    public final Future<Void> isConnected() {
        return executeOrRunOnContext(result -> checkConnected(result));
    }

    /**
     * Checks on the calling thread whether this connection is established.
     *
     * @return A future that succeeds if connected and fails with a
     *         {@code ServerErrorException} (503) otherwise.
     */
    protected final Future<Void> checkConnected() {
        final Future<Void> result = Future.future();
        checkConnected(result);
        return result;
    }

    /**
     * Reports the connection status to the given handler.
     *
     * @param handler The handler to notify: with a succeeded result if connected,
     *                with a {@code ServerErrorException} (503) failure otherwise.
     */
    private void checkConnected(Handler<AsyncResult<Void>> handler) {
        if (!isConnectedInternal()) {
            handler.handle(Future.failedFuture(new ServerErrorException(503, "not connected")));
            return;
        }
        handler.handle(Future.succeededFuture());
    }

    /**
     * Checks whether a connection object exists that is not disconnected.
     *
     * @return {@code true} if the connection is set and does not report itself as disconnected.
     */
    protected boolean isConnectedInternal() {
        return this.connection != null && !this.connection.isDisconnected();
    }

    /**
     * Checks whether {@code shutdown} has been invoked on this connection.
     *
     * @return {@code true} if shut down has been requested.
     */
    @Override // org.eclipse.hono.client.HonoConnection
    public final boolean isShutdown() {
        return shuttingDown.get();
    }

    /**
     * Sets the connection and derives the dependent state from it.
     * <p>
     * For a non-null connection the capabilities offered by the peer are captured
     * as an unmodifiable list. For {@code null} the capabilities are emptied and
     * the vert.x context is reset.
     *
     * @param protonConnection The new connection or {@code null} to clear it.
     */
    void setConnection(ProtonConnection protonConnection) {
        synchronized (connectionLock) {
            this.connection = protonConnection;
            if (protonConnection != null) {
                final Symbol[] remoteCapabilities = protonConnection.getRemoteOfferedCapabilities();
                if (remoteCapabilities == null) {
                    this.offeredCapabilities = Collections.emptyList();
                } else {
                    this.offeredCapabilities = Collections.unmodifiableList(Arrays.asList(remoteCapabilities));
                }
            } else {
                this.offeredCapabilities = Collections.emptyList();
                this.context = null;
            }
        }
    }

    /**
     * Gets the current connection while holding the connection lock.
     *
     * @return The connection or {@code null} if none is set.
     */
    protected final ProtonConnection getConnection() {
        synchronized (connectionLock) {
            return this.connection;
        }
    }

    /**
     * Checks whether the peer has offered a particular capability on the current connection.
     *
     * @param symbol The capability to check for.
     * @return {@code true} if the capability is contained in the offered capabilities,
     *         {@code false} if it is not or if symbol is {@code null}.
     */
    @Override // org.eclipse.hono.client.HonoConnection
    public final boolean supportsCapability(Symbol symbol) {
        if (symbol == null) {
            return false;
        }
        synchronized (connectionLock) {
            return offeredCapabilities.contains(symbol);
        }
    }

    /**
     * Connects to the server without specific client options.
     *
     * @return A future indicating the outcome of the connection attempt.
     */
    @Override // org.eclipse.hono.client.HonoConnection, org.eclipse.hono.client.ConnectionLifecycle
    public final Future<HonoConnection> connect() {
        return connect((ProtonClientOptions) null);
    }

    /**
     * Connects to the server using the given client options.
     *
     * @param protonClientOptions The options to use, may be {@code null}.
     * @return A future indicating the outcome of the connection attempt. It is failed
     *         with a {@code ClientErrorException} (409) if this client has been shut down.
     */
    @Override // org.eclipse.hono.client.HonoConnection
    public final Future<HonoConnection> connect(ProtonClientOptions protonClientOptions) {
        if (shuttingDown.get()) {
            return Future.failedFuture(new ClientErrorException(409, "client is already shut down"));
        }
        final Future<HonoConnection> result = Future.future();
        connect(protonClientOptions, result, null);
        return result;
    }

    /**
     * Tries to establish the connection to the server on a vert.x context.
     *
     * @param protonClientOptions The options to pass to the connection factory, may be {@code null}.
     * @param handler The handler to notify about the outcome of the attempt.
     * @param handler2 The handler to invoke when the established connection is lost.
     *                 If {@code null}, a re-connect is triggered on connection loss instead.
     */
    private void connect(ProtonClientOptions protonClientOptions, Handler<AsyncResult<HonoConnection>> handler, Handler<ProtonConnection> handler2) {
        // (re-)assign the context that all further connection related code is run on
        this.context = this.vertx.getOrCreateContext();
        this.log.trace("running on vert.x context [event-loop context: {}]", Boolean.valueOf(this.context.isEventLoopContext()));
        executeOrRunOnContext(future -> {
            if (isConnectedInternal()) {
                this.log.debug("already connected to server [{}:{}]", this.connectionFactory.getHost(), Integer.valueOf(this.connectionFactory.getPort()));
                handler.handle(Future.succeededFuture(this));
            } else if (!this.connecting.compareAndSet(false, true)) {
                // another attempt is already in progress
                this.log.debug("already trying to connect to server ...");
                handler.handle(Future.failedFuture(new ClientErrorException(409, "already connecting to server")));
            } else {
                this.log.debug("starting attempt [#{}] to connect to server [{}:{}]", Integer.valueOf(this.connectAttempts.get() + 1), this.connectionFactory.getHost(), Integer.valueOf(this.connectionFactory.getPort()));
                // remember the options so that re-connect attempts can re-use them
                this.clientOptions = protonClientOptions;
                this.connectionFactory.connect(this.clientOptions, asyncResult -> {
                    onRemoteClose(asyncResult, handler2);
                }, protonConnection -> {
                    onRemoteDisconnect(protonConnection, handler2);
                }, asyncResult2 -> {
                    // the attempt is over, regardless of its outcome
                    this.connecting.compareAndSet(true, false);
                    if (asyncResult2.failed()) {
                        if (isTerminalConnectionError(asyncResult2.cause())) {
                            // trying again will not help, give up right away
                            failConnectionAttempt(asyncResult2.cause(), handler);
                            return;
                        } else {
                            reconnect(asyncResult2.cause(), handler, handler2);
                            return;
                        }
                    }
                    ProtonConnection protonConnection2 = (ProtonConnection) asyncResult2.result();
                    if (!this.shuttingDown.get()) {
                        setConnection(protonConnection2);
                        handler.handle(Future.succeededFuture(this));
                        return;
                    }
                    // shut down has been requested while connecting: discard the new connection
                    protonConnection2.closeHandler(null);
                    protonConnection2.disconnectHandler(null);
                    protonConnection2.close();
                    this.connectAttempts = new AtomicInteger(0);
                    handler.handle(Future.failedFuture(new ClientErrorException(409, "client is already shut down")));
                });
            }
        });
    }

    /**
     * Handles the peer closing the connection: logs the event, closes the local end
     * and then processes the loss of the connection.
     *
     * @param asyncResult The outcome reported with the remote close, failed if the peer
     *                    indicated an error condition.
     * @param handler The handler to notify about the lost connection, may be {@code null}.
     */
    private void onRemoteClose(AsyncResult<ProtonConnection> asyncResult, Handler<ProtonConnection> handler) {
        final String host = connectionFactory.getHost();
        final int port = connectionFactory.getPort();
        if (asyncResult.failed()) {
            log.info("remote server [{}:{}] closed connection with error condition: {}", host, port, asyncResult.cause().getMessage());
        } else {
            log.info("remote server [{}:{}] closed connection", host, port);
        }
        // the disconnect handler is removed before closing, connection loss is handled explicitly below
        this.connection.disconnectHandler(null);
        this.connection.close();
        handleConnectionLoss(handler);
    }

    /**
     * Handles the loss of the underlying connection.
     * <p>
     * Events for a connection other than the current one are logged and ignored.
     *
     * @param protonConnection The connection that has been lost.
     * @param handler The handler to notify about the lost connection, may be {@code null}.
     */
    private void onRemoteDisconnect(ProtonConnection protonConnection, Handler<ProtonConnection> handler) {
        if (protonConnection == this.connection) {
            log.debug("lost connection to server [{}:{}]", connectionFactory.getHost(), connectionFactory.getPort());
            handleConnectionLoss(handler);
            return;
        }
        log.warn("cannot handle failure of unknown connection");
    }

    /**
     * Disconnects a still connected connection, clears the state and then either
     * notifies the given handler or starts re-connecting.
     *
     * @param handler The handler to pass the lost connection to. If {@code null},
     *                a re-connect is triggered and reconnect listeners are notified on success.
     */
    private void handleConnectionLoss(Handler<ProtonConnection> handler) {
        if (isConnectedInternal()) {
            this.connection.disconnect();
        }
        // keep a reference, clearState() resets the field
        final ProtonConnection lostConnection = this.connection;
        clearState();
        if (handler == null) {
            reconnect(this::notifyReconnectHandlers, null);
            return;
        }
        handler.handle(lostConnection);
    }

    /**
     * Notifies all registered reconnect listeners if the given attempt has succeeded.
     *
     * @param asyncResult The outcome of the re-connect attempt.
     */
    private void notifyReconnectHandlers(AsyncResult<HonoConnection> asyncResult) {
        if (!asyncResult.succeeded()) {
            return;
        }
        for (ReconnectListener<HonoConnection> listener : reconnectListeners) {
            listener.onReconnect(this);
        }
    }

    /**
     * Discards the current connection, notifies the disconnect listeners and
     * resets the counter of connection attempts.
     */
    protected void clearState() {
        setConnection(null);
        notifyDisconnectHandlers();
        connectAttempts = new AtomicInteger(0);
    }

    /**
     * Notifies all registered disconnect listeners, in the order of their registration.
     */
    private void notifyDisconnectHandlers() {
        for (DisconnectListener<HonoConnection> listener : disconnectListeners) {
            listener.onDisconnect(this);
        }
    }

    /**
     * Triggers a re-connect attempt that is not caused by a failed connection attempt.
     *
     * @param handler The handler to notify about the outcome.
     * @param handler2 The handler to invoke on loss of the re-established connection, may be {@code null}.
     */
    private void reconnect(Handler<AsyncResult<HonoConnection>> handler, Handler<ProtonConnection> handler2) {
        reconnect((Throwable) null, handler, handler2);
    }

    /**
     * Schedules a new connection attempt unless the client is shutting down or
     * the configured number of attempts has been used up.
     *
     * @param th The cause of the preceding failed attempt, may be {@code null}.
     * @param handler The handler to notify about the outcome.
     * @param handler2 The handler to invoke on loss of the re-established connection, may be {@code null}.
     */
    private void reconnect(Throwable th, Handler<AsyncResult<HonoConnection>> handler, Handler<ProtonConnection> handler2) {
        if (shuttingDown.get()) {
            log.info("client is shutting down, giving up attempt to connect");
            handler.handle(Future.failedFuture(new IllegalStateException("client is shut down")));
            return;
        }

        final int maxAttempts = clientConfigProperties.getReconnectAttempts();
        final int attemptsMade = connectAttempts.getAndIncrement();
        if (maxAttempts - attemptsMade == 0) {
            log.info("max number of attempts [{}] to re-connect to peer [{}:{}] have been made, giving up", maxAttempts, connectionFactory.getHost(), connectionFactory.getPort());
            clearState();
            failConnectionAttempt(th, handler);
            return;
        }

        if (th != null) {
            log.debug("connection attempt failed", th);
        }
        // fall back to 500ms if no client options have been provided
        final ProtonClientOptions options = this.clientOptions;
        final long delayMillis = options != null ? options.getReconnectInterval() : 500L;
        log.trace("scheduling new connection attempt in {}ms ...", delayMillis);
        vertx.setTimer(delayMillis, timerId -> connect(this.clientOptions, handler, handler2));
    }

    /**
     * Checks whether a connection failure rules out any further connection attempts.
     *
     * @param th The cause of the failed connection attempt.
     * @return {@code true} for authentication failures, SASL mechanism mismatches
     *         and SASL system errors flagged as permanent.
     */
    private boolean isTerminalConnectionError(Throwable th) {
        if (th instanceof AuthenticationException || th instanceof MechanismMismatchException) {
            return true;
        }
        return th instanceof SaslSystemException && ((SaslSystemException) th).isPermanent();
    }

    /**
     * Fails a connection attempt, mapping the cause to an appropriate service invocation error.
     *
     * @param th The cause of the failure, may be {@code null}.
     * @param handler The handler to notify with the failure: a {@code ClientErrorException} (401)
     *                for authentication problems, a {@code ServerErrorException} (503) otherwise.
     */
    private void failConnectionAttempt(Throwable th, Handler<AsyncResult<HonoConnection>> handler) {
        log.info("stopping connection attempt to server [host: {}, port: {}] due to terminal error", connectionFactory.getHost(), connectionFactory.getPort(), th);
        final Throwable failure;
        if (th == null) {
            failure = new ServerErrorException(503, "failed to connect");
        } else if (th instanceof AuthenticationException) {
            failure = new ClientErrorException(401, "failed to authenticate with server");
        } else if (th instanceof MechanismMismatchException) {
            failure = new ClientErrorException(401, "no suitable SASL mechanism found for authentication with server");
        } else {
            failure = new ServerErrorException(503, "failed to connect", th);
        }
        handler.handle(Future.failedFuture(failure));
    }

    /**
     * Closes and frees a link by delegating to {@code HonoProtonHelper.closeAndFree}
     * with this connection's vert.x context.
     *
     * @param protonLink The link to close.
     * @param handler The handler passed on to the helper.
     */
    @Override // org.eclipse.hono.client.HonoConnection
    public void closeAndFree(ProtonLink<?> protonLink, Handler<Void> handler) {
        final Context currentContext = this.context;
        HonoProtonHelper.closeAndFree(currentContext, protonLink, handler);
    }

    /**
     * Closes and frees a link by delegating to {@code HonoProtonHelper.closeAndFree}
     * with this connection's vert.x context and the given numeric argument.
     *
     * @param protonLink The link to close.
     * @param j The value passed on to the helper (presumably a timeout - verify against the helper).
     * @param handler The handler passed on to the helper.
     */
    @Override // org.eclipse.hono.client.HonoConnection
    public void closeAndFree(ProtonLink<?> protonLink, long j, Handler<Void> handler) {
        final Context currentContext = this.context;
        HonoProtonHelper.closeAndFree(currentContext, protonLink, j, handler);
    }

    /**
     * Creates a sender link for the given target address.
     * <p>
     * The returned future is completed once the link has been established and the
     * sender either has credit or the configured flow latency has elapsed. It is
     * failed if the connection is not established, the peer rejects the link or the
     * link cannot be established within the configured link establishment timeout.
     *
     * @param str The target address of the link.
     * @param protonQoS The quality of service to use for the link.
     * @param handler The handler to notify with the link's address when the peer detaches
     *                or closes an established link, may be {@code null}.
     * @return A future indicating the outcome of the link creation.
     * @throws NullPointerException if address or quality of service are {@code null}.
     */
    @Override // org.eclipse.hono.client.HonoConnection
    public final Future<ProtonSender> createSender(String str, ProtonQoS protonQoS, Handler<String> handler) {
        Objects.requireNonNull(str);
        Objects.requireNonNull(protonQoS);
        return executeOrRunOnContext(result -> {
            checkConnected().compose(connected -> {
                final Future<ProtonSender> senderFuture = Future.future();
                final ProtonSender sender = this.connection.createSender(str);
                sender.setQoS(protonQoS);
                sender.setAutoSettle(true);
                sender.openHandler(openAttempt -> {
                    if (openAttempt.failed()) {
                        final ErrorCondition remoteCondition = sender.getRemoteCondition();
                        if (remoteCondition == null) {
                            this.log.debug("opening sender [{}] failed", str, openAttempt.cause());
                            senderFuture.tryFail(new ClientErrorException(404, "cannot open sender", openAttempt.cause()));
                        } else {
                            this.log.debug("opening sender [{}] failed: {} - {}", str, remoteCondition.getCondition(), remoteCondition.getDescription());
                            senderFuture.tryFail(StatusCodeMapper.from(remoteCondition));
                        }
                        return;
                    }
                    if (!HonoProtonHelper.isLinkEstablished(sender)) {
                        this.log.debug("peer did not create terminus for target [{}] and will detach the link", str);
                        senderFuture.tryFail(new ServerErrorException(503));
                        return;
                    }
                    this.log.debug("sender open [target: {}, sendQueueFull: {}]", str, sender.sendQueueFull());
                    if (sender.getCredit() > 0) {
                        senderFuture.tryComplete(sender);
                    } else {
                        // wait for the peer to grant credit, but no longer than the configured flow latency
                        final long timerId = this.vertx.setTimer(this.clientConfigProperties.getFlowLatency(), expired -> {
                            this.log.debug("sender [target: {}] has {} credits after grace period of {}ms", str, sender.getCredit(), this.clientConfigProperties.getFlowLatency());
                            sender.sendQueueDrainHandler(null);
                            senderFuture.tryComplete(sender);
                        });
                        sender.sendQueueDrainHandler(replenishedSender -> {
                            this.log.debug("sender [target: {}] has received {} initial credits", str, replenishedSender.getCredit());
                            // only complete here if the grace period timer has not fired yet
                            if (this.vertx.cancelTimer(timerId)) {
                                senderFuture.tryComplete(replenishedSender);
                                replenishedSender.sendQueueDrainHandler(null);
                            }
                        });
                    }
                });
                HonoProtonHelper.setDetachHandler(sender, remoteDetach -> {
                    onRemoteDetach(sender, this.connection.getRemoteContainer(), false, handler);
                });
                HonoProtonHelper.setCloseHandler(sender, remoteClose -> {
                    onRemoteDetach(sender, this.connection.getRemoteContainer(), true, handler);
                });
                sender.open();
                this.vertx.setTimer(this.clientConfigProperties.getLinkEstablishmentTimeout(), expired -> {
                    onTimeOut(sender, this.clientConfigProperties, senderFuture);
                });
                return senderFuture;
            }).setHandler(result);
        });
    }

    /**
     * Creates a receiver link using the configured number of initial credits as pre-fetch size.
     *
     * @param str The source address of the link.
     * @param protonQoS The quality of service to use for the link.
     * @param protonMessageHandler The handler to invoke for received messages.
     * @param handler The handler to notify when the peer detaches or closes the link, may be {@code null}.
     * @return A future indicating the outcome of the link creation.
     */
    @Override // org.eclipse.hono.client.HonoConnection
    public Future<ProtonReceiver> createReceiver(String str, ProtonQoS protonQoS, ProtonMessageHandler protonMessageHandler, Handler<String> handler) {
        final int initialCredits = clientConfigProperties.getInitialCredits();
        return createReceiver(str, protonQoS, protonMessageHandler, initialCredits, handler);
    }

    /**
     * Creates a receiver link with the given pre-fetch size and auto-accept enabled.
     *
     * @param str The source address of the link.
     * @param protonQoS The quality of service to use for the link.
     * @param protonMessageHandler The handler to invoke for received messages.
     * @param i The pre-fetch size to set on the link.
     * @param handler The handler to notify when the peer detaches or closes the link, may be {@code null}.
     * @return A future indicating the outcome of the link creation.
     */
    @Override // org.eclipse.hono.client.HonoConnection
    public Future<ProtonReceiver> createReceiver(String str, ProtonQoS protonQoS, ProtonMessageHandler protonMessageHandler, int i, Handler<String> handler) {
        final boolean autoAccept = true;
        return createReceiver(str, protonQoS, protonMessageHandler, i, autoAccept, handler);
    }

    /**
     * Creates a receiver link for the given source address.
     * <p>
     * The returned future is completed once the link has been established. It is
     * failed if the connection is not established, the peer rejects the link or the
     * link cannot be established within the configured link establishment timeout.
     *
     * @param str The source address of the link.
     * @param protonQoS The quality of service to use for the link.
     * @param protonMessageHandler The handler to invoke for received messages. A delivery is
     *                             released if the handler throws an exception.
     * @param i The pre-fetch size to set on the link.
     * @param z The auto-accept setting to apply to the link.
     * @param handler The handler to notify with the link's address when the peer detaches
     *                or closes an established link, may be {@code null}.
     * @return A future indicating the outcome of the link creation.
     * @throws NullPointerException if address, quality of service or message handler are {@code null}.
     * @throws IllegalArgumentException if the pre-fetch size is negative.
     */
    @Override // org.eclipse.hono.client.HonoConnection
    public Future<ProtonReceiver> createReceiver(String str, ProtonQoS protonQoS, ProtonMessageHandler protonMessageHandler, int i, boolean z, Handler<String> handler) {
        Objects.requireNonNull(str);
        Objects.requireNonNull(protonQoS);
        Objects.requireNonNull(protonMessageHandler);
        if (i < 0) {
            throw new IllegalArgumentException("pre-fetch size must be >= 0");
        }
        return executeOrRunOnContext(result -> {
            checkConnected().compose(connected -> {
                final Future<ProtonReceiver> receiverFuture = Future.future();
                final ProtonReceiver receiver = this.connection.createReceiver(str);
                receiver.setAutoAccept(z);
                receiver.setQoS(protonQoS);
                receiver.setPrefetch(i);
                receiver.handler((delivery, message) -> {
                    try {
                        protonMessageHandler.handle(delivery, message);
                        if (this.log.isTraceEnabled()) {
                            this.log.trace("handling message [remotely settled: {}, queued messages: {}, remaining credit: {}]", delivery.remotelySettled(), receiver.getQueued(), receiver.getCredit() - receiver.getQueued());
                        }
                    } catch (Exception e) {
                        // a failing message handler must not break the link: release the message
                        this.log.warn("error handling message", e);
                        ProtonHelper.released(delivery, true);
                    }
                });
                receiver.openHandler(openAttempt -> {
                    if (openAttempt.failed()) {
                        final ErrorCondition remoteCondition = receiver.getRemoteCondition();
                        if (remoteCondition == null) {
                            this.log.debug("opening receiver [{}] failed", str, openAttempt.cause());
                            receiverFuture.tryFail(new ClientErrorException(404, "cannot open receiver", openAttempt.cause()));
                        } else {
                            this.log.debug("opening receiver [{}] failed: {} - {}", str, remoteCondition.getCondition(), remoteCondition.getDescription());
                            receiverFuture.tryFail(StatusCodeMapper.from(remoteCondition));
                        }
                        return;
                    }
                    if (HonoProtonHelper.isLinkEstablished(receiver)) {
                        this.log.debug("receiver open [source: {}]", str);
                        receiverFuture.tryComplete(openAttempt.result());
                    } else {
                        this.log.debug("peer did not create terminus for source [{}] and will detach the link", str);
                        receiverFuture.tryFail(new ServerErrorException(503));
                    }
                });
                HonoProtonHelper.setDetachHandler(receiver, remoteDetach -> {
                    onRemoteDetach(receiver, this.connection.getRemoteContainer(), false, handler);
                });
                HonoProtonHelper.setCloseHandler(receiver, remoteClose -> {
                    onRemoteDetach(receiver, this.connection.getRemoteContainer(), true, handler);
                });
                receiver.open();
                this.vertx.setTimer(this.clientConfigProperties.getLinkEstablishmentTimeout(), expired -> {
                    onTimeOut(receiver, this.clientConfigProperties, receiverFuture);
                });
                return receiverFuture;
            }).setHandler(result);
        });
    }

    /**
     * Closes and frees a link that is locally open but has not been established
     * when the link establishment timer fires, and fails the given future.
     *
     * @param protonLink The link to check.
     * @param clientConfigProperties The configuration providing the values used in the log message.
     * @param future The future to fail with a {@code ServerErrorException} (503) on time-out.
     */
    private void onTimeOut(ProtonLink<?> protonLink, ClientConfigProperties clientConfigProperties, Future<?> future) {
        final boolean establishmentPending = protonLink.isOpen() && !HonoProtonHelper.isLinkEstablished(protonLink);
        if (establishmentPending) {
            log.info("link establishment [peer: {}] timed out after {}ms", clientConfigProperties.getHost(), clientConfigProperties.getLinkEstablishmentTimeout());
            protonLink.close();
            protonLink.free();
            future.tryFail(new ServerErrorException(503));
        }
    }

    /**
     * Handles the peer detaching or closing a link: logs the event, closes the link
     * locally and notifies the given handler if the link had been established.
     *
     * @param protonLink The link that has been detached.
     * @param str The name of the remote container, used for logging only.
     * @param z {@code true} if the peer has closed the link, used for logging only.
     * @param handler The handler to notify with the link's address, may be {@code null}.
     */
    private void onRemoteDetach(ProtonLink<?> protonLink, String str, boolean z, Handler<String> handler) {
        final ErrorCondition error = protonLink.getRemoteCondition();
        final boolean isSender = protonLink instanceof ProtonSender;
        final String linkType = isSender ? "sender" : "receiver";
        // a sender is identified by its target, a receiver by its source
        final String address = isSender ? protonLink.getTarget().getAddress() : protonLink.getSource().getAddress();
        if (error != null) {
            log.debug("{} [{}] detached (with closed={}) by peer [{}]: {} - {}", linkType, address, z, str, error.getCondition(), error.getDescription());
        } else {
            log.debug("{} [{}] detached (with closed={}) by peer [{}]", linkType, address, z, str);
        }
        protonLink.close();
        if (HonoProtonHelper.isLinkEstablished(protonLink) && handler != null) {
            handler.handle(address);
        }
    }

    /**
     * Shuts down this client and blocks the calling thread until the shut down
     * has completed (successfully or not), but no longer than 5 seconds.
     */
    @Override // org.eclipse.hono.client.HonoConnection
    public final void shutdown() {
        final CountDownLatch countDownLatch = new CountDownLatch(1);
        shutdown(asyncResult -> {
            if (asyncResult.failed()) {
                this.log.error("could not close connection to server", asyncResult.cause());
            }
            // release the waiting thread on failure as well, otherwise it would block for the
            // full timeout and a misleading time-out error would be logged in addition
            countDownLatch.countDown();
        });
        try {
            if (!countDownLatch.await(5L, TimeUnit.SECONDS)) {
                this.log.error("shutdown of client timed out after 5 seconds");
            }
        } catch (InterruptedException e) {
            // preserve the interruption status for the caller
            Thread.currentThread().interrupt();
        }
    }

    /**
     * Shuts down this client by closing the connection to the server.
     *
     * @param handler The handler to notify about the outcome. It is failed with a
     *                {@code ClientErrorException} (409) if shut down has already been requested.
     * @throws NullPointerException if handler is {@code null}.
     */
    @Override // org.eclipse.hono.client.HonoConnection
    public final void shutdown(Handler<AsyncResult<Void>> handler) {
        Objects.requireNonNull(handler);
        final boolean firstRequest = shuttingDown.compareAndSet(false, true);
        if (!firstRequest) {
            handler.handle(Future.failedFuture(new ClientErrorException(409, "already shutting down")));
            return;
        }
        closeConnection(handler);
    }

    /**
     * Disconnects from the server and blocks the calling thread until the attempt
     * has completed (successfully or not), but no longer than 5 seconds.
     */
    @Override // org.eclipse.hono.client.HonoConnection, org.eclipse.hono.client.ConnectionLifecycle
    public final void disconnect() {
        final CountDownLatch countDownLatch = new CountDownLatch(1);
        disconnect(asyncResult -> {
            if (asyncResult.succeeded()) {
                this.log.info("successfully disconnected from the server [{}:{}]", this.connectionFactory.getHost(), Integer.valueOf(this.connectionFactory.getPort()));
            } else {
                this.log.error("could not disconnect from the server [{}:{}]", this.connectionFactory.getHost(), Integer.valueOf(this.connectionFactory.getPort()), asyncResult.cause());
            }
            // release the waiting thread on failure as well, otherwise it would block for the
            // full timeout and a misleading time-out error would be logged in addition
            countDownLatch.countDown();
        });
        try {
            if (!countDownLatch.await(5L, TimeUnit.SECONDS)) {
                this.log.error("Disconnecting from the server [{}:{}] timed out after 5 seconds", this.connectionFactory.getHost(), Integer.valueOf(this.connectionFactory.getPort()));
            }
        } catch (InterruptedException e) {
            // preserve the interruption status for the caller
            Thread.currentThread().interrupt();
        }
    }

    /**
     * Disconnects from the server by closing the connection.
     *
     * @param handler The handler to notify about the outcome. It is failed with a
     *                {@code ClientErrorException} (409) if a disconnect is already in progress.
     * @throws NullPointerException if handler is {@code null}.
     */
    @Override // org.eclipse.hono.client.ConnectionLifecycle
    public final void disconnect(Handler<AsyncResult<Void>> handler) {
        Objects.requireNonNull(handler);
        final boolean firstRequest = disconnecting.compareAndSet(false, true);
        if (!firstRequest) {
            handler.handle(Future.failedFuture(new ClientErrorException(409, "already disconnecting")));
            return;
        }
        closeConnection(handler);
    }

    /**
     * Closes the connection to the server, if it is established.
     * <p>
     * After sending the close frame, the peer's close frame is awaited for at most the
     * configured connect timeout. In either case the state of this connection is cleared
     * afterwards and the attempt is reported as succeeded.
     *
     * @param handler The handler to notify about the outcome. It is failed if the code for
     *                closing the connection could not be run on the connection's context.
     */
    private void closeConnection(Handler<AsyncResult<Void>> handler) {
        final Handler<AsyncResult<Void>> completionHandler = attempt -> {
            // allow subsequent disconnect requests again
            this.disconnecting.compareAndSet(true, false);
            if (attempt.succeeded()) {
                handler.handle(Future.succeededFuture());
            } else {
                handler.handle(Future.failedFuture(attempt.cause()));
            }
        };
        synchronized (this.connectionLock) {
            if (isConnectedInternal()) {
                this.<Void>executeOrRunOnContext(result -> {
                    final ProtonConnection connectionToClose = this.connection;
                    // the connection is closed on purpose, loss of connection must not trigger a re-connect
                    connectionToClose.disconnectHandler(null);
                    final Handler<AsyncResult<ProtonConnection>> closedHandler = closeAttempt -> {
                        if (closeAttempt.succeeded()) {
                            this.log.info("closed connection to container [{}] at [{}:{}]", connectionToClose.getRemoteContainer(), this.connectionFactory.getHost(), Integer.valueOf(this.connectionFactory.getPort()));
                        } else {
                            this.log.info("closed connection to container [{}] at [{}:{}]", connectionToClose.getRemoteContainer(), this.connectionFactory.getHost(), Integer.valueOf(this.connectionFactory.getPort()), closeAttempt.cause());
                        }
                        clearState();
                        result.complete();
                    };
                    final long timerId = this.vertx.setTimer(this.clientConfigProperties.getConnectTimeout(), expired -> {
                        this.log.info("did not receive remote peer's close frame after {}ms", Integer.valueOf(this.clientConfigProperties.getConnectTimeout()));
                        closedHandler.handle(Future.succeededFuture());
                    });
                    connectionToClose.closeHandler(remoteClose -> {
                        // only handle the peer's close frame if the timer has not fired already
                        if (this.vertx.cancelTimer(timerId)) {
                            closedHandler.handle(remoteClose);
                        }
                    });
                    this.log.info("closing connection to container [{}] at [{}:{}] ...", connectionToClose.getRemoteContainer(), this.connectionFactory.getHost(), Integer.valueOf(this.connectionFactory.getPort()));
                    connectionToClose.close();
                }).setHandler(completionHandler);
            } else {
                this.log.info("connection to server [{}:{}] already closed", this.connectionFactory.getHost(), Integer.valueOf(this.connectionFactory.getPort()));
                completionHandler.handle(Future.succeededFuture());
            }
        }
    }
}
