package org.eclipse.hono.client.impl;

import io.opentracing.Tracer;
import io.opentracing.noop.NoopTracerFactory;
import io.vertx.core.AsyncResult;
import io.vertx.core.Context;
import io.vertx.core.Future;
import io.vertx.core.Handler;
import io.vertx.core.Vertx;
import io.vertx.proton.ProtonClientOptions;
import io.vertx.proton.ProtonConnection;
import io.vertx.proton.ProtonDelivery;
import io.vertx.proton.sasl.SaslSystemException;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.Optional;
import java.util.UUID;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.BiConsumer;
import java.util.function.Consumer;
import java.util.function.Supplier;
import javax.security.sasl.AuthenticationException;
import org.apache.qpid.proton.amqp.Symbol;
import org.apache.qpid.proton.message.Message;
import org.eclipse.hono.cache.CacheProvider;
import org.eclipse.hono.client.AsyncCommandClient;
import org.eclipse.hono.client.ClientErrorException;
import org.eclipse.hono.client.CommandClient;
import org.eclipse.hono.client.CredentialsClient;
import org.eclipse.hono.client.DisconnectListener;
import org.eclipse.hono.client.HonoClient;
import org.eclipse.hono.client.MessageConsumer;
import org.eclipse.hono.client.MessageSender;
import org.eclipse.hono.client.ReconnectListener;
import org.eclipse.hono.client.RegistrationClient;
import org.eclipse.hono.client.RequestResponseClient;
import org.eclipse.hono.client.ServerErrorException;
import org.eclipse.hono.client.TenantClient;
import org.eclipse.hono.config.ClientConfigProperties;
import org.eclipse.hono.connection.ConnectionFactory;
import org.eclipse.hono.util.CommandConstants;
import org.eclipse.hono.util.HonoProtonHelper;
import org.eclipse.hono.util.ResourceIdentifier;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;

/* loaded from: input_file:BOOT-INF/lib/hono-client-1.0-M1.jar:org/eclipse/hono/client/impl/HonoClientImpl.class */
public class HonoClientImpl implements HonoClient {
    // Logger used by this class (created for HonoClientImpl.class in the constructor).
    protected final Logger log;
    // Client configuration passed to the constructor; supplies the reconnect attempt limit
    // and is handed to every sender/consumer/client that gets created.
    protected final ClientConfigProperties clientConfigProperties;
    // Senders that have been created successfully, keyed by the value produced by
    // getResourcesKeyForSender(...). Cleared by clearState().
    protected final Map<String, MessageSender> activeSenders;
    // The vert.x instance used for obtaining a context and for scheduling reconnect timers.
    protected final Vertx vertx;
    // The current connection to the server, assigned in setConnection(...) while holding
    // connectionLock; null while not connected.
    protected ProtonConnection connection;
    // The vert.x context obtained at the start of a connection attempt; reset to null
    // by setConnection(null).
    protected volatile Context context;
    // Request/response clients that have been created successfully, keyed by target address.
    private final Map<String, RequestResponseClient> activeRequestResponseClients;
    // Per-key flag that is TRUE while a sender/client for that key is being created.
    private final Map<String, Boolean> creationLocks;
    // Handlers of pending creation requests; failAllCreationRequests() invokes each of them
    // with null and removes it.
    private final List<Handler<Void>> creationRequests;
    // Listeners notified by notifyDisconnectHandlers().
    private final List<DisconnectListener> disconnectListeners;
    // Listeners notified by notifyReconnectHandlers(...) after a successful re-connect.
    private final List<ReconnectListener> reconnectListeners;
    // Set to true while a connection attempt is in progress.
    private final AtomicBoolean connecting;
    // Read by isShutdown() and by the connect/reconnect logic; the code setting it to true
    // is not part of the visible section of this file.
    private final AtomicBoolean shuttingDown;
    // NOTE(review): not used in the visible section of this file - presumably guards disconnect().
    private final AtomicBoolean disconnecting;
    // Factory used for establishing the connection to the server.
    private final ConnectionFactory connectionFactory;
    // Monitor guarding access to connection and offeredCapabilities in setConnection(...),
    // getConnection() and supportsCapability(...).
    private final Object connectionLock;
    // Options of the most recent connection attempt; re-used for re-connect attempts.
    private ProtonClientOptions clientOptions;
    // Cache provider passed to newly created registration and tenant clients; may be null.
    private CacheProvider cacheProvider;
    // Number of (re-)connect attempts made so far; replaced with a fresh counter by clearState().
    private AtomicInteger connectAttempts;
    // Capabilities offered by the peer of the current connection; empty while not connected.
    private List<Symbol> offeredCapabilities;
    // OpenTracing tracer passed to newly created senders/clients; defaults to a no-op tracer.
    private Tracer tracer;

    /**
     * Creates a client that derives its connection factory from the given configuration.
     *
     * @param vertx The vert.x instance to use; a new instance is created if {@code null}.
     * @param clientConfigProperties The client configuration.
     * @throws NullPointerException if the configuration is {@code null}.
     */
    public HonoClientImpl(Vertx vertx, ClientConfigProperties clientConfigProperties) {
        this(vertx, (ConnectionFactory) null, clientConfigProperties);
    }

    /**
     * Creates a client for the given vert.x instance, connection factory and configuration.
     *
     * @param vertx The vert.x instance to use; a new instance is created if {@code null}.
     * @param connectionFactory The factory for creating connections; if {@code null}, a factory
     *                          is created from the vert.x instance and the configuration.
     * @param clientConfigProperties The client configuration.
     * @throws NullPointerException if the configuration is {@code null}.
     */
    public HonoClientImpl(Vertx vertx, ConnectionFactory connectionFactory, ClientConfigProperties clientConfigProperties) {
        // defaults of the fields that do not depend on the constructor arguments
        log = LoggerFactory.getLogger(HonoClientImpl.class);
        activeSenders = new HashMap<>();
        activeRequestResponseClients = new HashMap<>();
        creationLocks = new HashMap<>();
        creationRequests = new ArrayList<>();
        disconnectListeners = new ArrayList<>();
        reconnectListeners = new ArrayList<>();
        connecting = new AtomicBoolean(false);
        shuttingDown = new AtomicBoolean(false);
        disconnecting = new AtomicBoolean(false);
        connectionLock = new Object();
        offeredCapabilities = Collections.emptyList();
        tracer = NoopTracerFactory.create();

        Objects.requireNonNull(clientConfigProperties);
        this.vertx = vertx != null ? vertx : Vertx.vertx();
        this.connectionFactory = connectionFactory != null
                ? connectionFactory
                : ConnectionFactory.newConnectionFactory(this.vertx, clientConfigProperties);
        this.clientConfigProperties = clientConfigProperties;
        connectAttempts = new AtomicInteger(0);
    }

    /**
     * Sets the cache provider that is passed to newly created registration and tenant clients.
     *
     * @param cacheProvider The provider to use.
     * @throws NullPointerException if the provider is {@code null}.
     */
    public final void setCacheProvider(CacheProvider cacheProvider) {
        Objects.requireNonNull(cacheProvider);
        this.cacheProvider = cacheProvider;
    }

    /**
     * Sets the OpenTracing tracer that is passed to newly created senders and clients.
     * <p>
     * The argument is validated before anything else happens so that a {@code null}
     * value is rejected explicitly and before any log output is produced.
     *
     * @param tracer The tracer to use.
     * @throws NullPointerException if the tracer is {@code null}.
     */
    @Autowired(required = false)
    public final void setTracer(Tracer tracer) {
        // validate first: the log statement below dereferences the tracer
        Objects.requireNonNull(tracer, "tracer must not be null");
        this.log.info("using OpenTracing implementation [{}]", tracer.getClass().getName());
        this.tracer = tracer;
    }

    /**
     * Gets the OpenTracing tracer currently used by this client.
     *
     * @return The tracer (a no-op tracer unless another one has been set).
     */
    public final Tracer getTracer() {
        return tracer;
    }

    /**
     * Registers a listener to be notified when this client's state is cleared
     * after the connection has been lost.
     *
     * @param disconnectListener The listener to add.
     */
    @Override // org.eclipse.hono.client.ConnectionLifecycle
    public final void addDisconnectListener(DisconnectListener disconnectListener) {
        disconnectListeners.add(disconnectListener);
    }

    /**
     * Registers a listener to be notified after this client has successfully
     * re-connected to the server.
     *
     * @param reconnectListener The listener to add.
     */
    @Override // org.eclipse.hono.client.ConnectionLifecycle
    public final void addReconnectListener(ReconnectListener reconnectListener) {
        reconnectListeners.add(reconnectListener);
    }

    /* JADX INFO: Access modifiers changed from: protected */
    /**
     * Runs the given code on the vert.x context of this client's connection.
     * <p>
     * The volatile {@code context} field is read exactly once into a local variable so that
     * a concurrent reset of the field (see {@code setConnection(null)}) between the
     * {@code null} check and the actual use cannot lead to a {@code null} context being
     * passed to the helper.
     *
     * @param handler The code to run; it is passed the future that will be returned.
     * @param <T> The type of the result.
     * @return The future produced by {@code HonoProtonHelper.executeOrRunOnContext}, or a future
     *         failed with a 503 {@code ServerErrorException} if no context is available.
     */
    public final <T> Future<T> executeOrRunOnContext(Handler<Future<T>> handler) {
        final Context currentContext = this.context;
        if (currentContext == null) {
            return Future.failedFuture(new ServerErrorException(503, "not connected"));
        }
        return HonoProtonHelper.executeOrRunOnContext(currentContext, handler);
    }

    /**
     * Checks on the client's vert.x context whether this client is connected.
     *
     * @return A future that succeeds if the client is connected and fails with a
     *         503 {@code ServerErrorException} otherwise.
     */
    @Override // org.eclipse.hono.client.ConnectionLifecycle
    public final Future<Void> isConnected() {
        final Handler<Future<Void>> connectionCheck = result -> checkConnected(result);
        return executeOrRunOnContext(connectionCheck);
    }

    /* JADX INFO: Access modifiers changed from: protected */
    /**
     * Checks (on the calling thread) whether this client is connected.
     *
     * @return A succeeded future if {@link #isConnectedInternal()} returns {@code true},
     *         otherwise a future failed with a 503 {@code ServerErrorException}.
     */
    public final Future<Void> checkConnected() {
        if (isConnectedInternal()) {
            return Future.succeededFuture();
        }
        return Future.failedFuture(new ServerErrorException(503, "not connected"));
    }

    /**
     * Reports the connection status to the given handler.
     *
     * @param handler The handler to notify with a succeeded result if the client is connected
     *                and with a 503 {@code ServerErrorException} failure otherwise.
     */
    private void checkConnected(Handler<AsyncResult<Void>> handler) {
        final AsyncResult<Void> status;
        if (isConnectedInternal()) {
            status = Future.succeededFuture();
        } else {
            status = Future.failedFuture(new ServerErrorException(503, "not connected"));
        }
        handler.handle(status);
    }

    /* JADX INFO: Access modifiers changed from: protected */
    /**
     * Checks whether a connection exists that is not disconnected.
     *
     * @return {@code true} if the connection field is set and the connection
     *         does not report being disconnected.
     */
    public boolean isConnectedInternal() {
        final ProtonConnection current = this.connection;
        return current != null && !current.isDisconnected();
    }

    /* JADX INFO: Access modifiers changed from: protected */
    /**
     * Checks whether the shut-down flag of this client has been set.
     *
     * @return The current value of the shut-down flag.
     */
    public final boolean isShutdown() {
        return shuttingDown.get();
    }

    /**
     * Sets the connection used by this client while holding the connection lock.
     * <p>
     * For a non-null connection the capabilities offered by the peer are captured as an
     * unmodifiable list (empty if the peer offers none). For {@code null} the offered
     * capabilities are emptied and the vert.x context is reset.
     *
     * @param protonConnection The new connection or {@code null} to clear the current one.
     */
    void setConnection(ProtonConnection protonConnection) {
        synchronized (connectionLock) {
            connection = protonConnection;
            if (protonConnection != null) {
                final Symbol[] remoteCapabilities = protonConnection.getRemoteOfferedCapabilities();
                if (remoteCapabilities == null) {
                    offeredCapabilities = Collections.emptyList();
                } else {
                    offeredCapabilities = Collections.unmodifiableList(Arrays.asList(remoteCapabilities));
                }
                return;
            }
            offeredCapabilities = Collections.emptyList();
            context = null;
        }
    }

    /**
     * Gets the current connection while holding the connection lock.
     *
     * @return The connection or {@code null} if none is set.
     */
    protected final ProtonConnection getConnection() {
        synchronized (connectionLock) {
            return connection;
        }
    }

    /**
     * Checks whether the peer of the current connection has offered the given capability.
     *
     * @param symbol The capability to check for.
     * @return {@code true} if the capability is contained in the captured offered capabilities;
     *         {@code false} otherwise, in particular if the capability is {@code null}.
     */
    @Override // org.eclipse.hono.client.HonoClient
    public final boolean supportsCapability(Symbol symbol) {
        if (symbol != null) {
            synchronized (connectionLock) {
                return offeredCapabilities.contains(symbol);
            }
        }
        return false;
    }

    /**
     * Connects to the server using neither explicit client options nor a
     * connection loss handler.
     *
     * @return A future indicating the outcome of the connection attempt.
     */
    @Override // org.eclipse.hono.client.HonoClient
    public final Future<HonoClient> connect() {
        return connect((ProtonClientOptions) null, (Handler<ProtonConnection>) null);
    }

    /**
     * Connects to the server using the given client options and no connection loss handler.
     *
     * @param protonClientOptions The options to use for the connection.
     * @return A future indicating the outcome of the connection attempt.
     * @throws NullPointerException if the options are {@code null}.
     */
    @Override // org.eclipse.hono.client.HonoClient
    public final Future<HonoClient> connect(ProtonClientOptions protonClientOptions) {
        Objects.requireNonNull(protonClientOptions);
        return connect(protonClientOptions, (Handler<ProtonConnection>) null);
    }

    /**
     * Connects to the server without explicit client options, using the given handler
     * for getting notified about a loss of the connection.
     *
     * @param handler The handler to invoke when the connection is lost.
     * @return A future indicating the outcome of the connection attempt.
     * @throws NullPointerException if the handler is {@code null}.
     */
    @Override // org.eclipse.hono.client.HonoClient
    public final Future<HonoClient> connect(Handler<ProtonConnection> handler) {
        Objects.requireNonNull(handler);
        return connect((ProtonClientOptions) null, handler);
    }

    /**
     * Connects to the server unless this client's shut-down flag is set.
     *
     * @param protonClientOptions The options to use for the connection; may be {@code null}.
     * @param handler The handler to invoke when the connection is lost; if {@code null},
     *                the client tries to re-connect on its own.
     * @return A future indicating the outcome of the connection attempt. The future fails
     *         with a 409 {@code ClientErrorException} if the shut-down flag is set.
     */
    @Override // org.eclipse.hono.client.HonoClient
    public final Future<HonoClient> connect(ProtonClientOptions protonClientOptions, Handler<ProtonConnection> handler) {
        if (shuttingDown.get()) {
            return Future.failedFuture(new ClientErrorException(409, "client is already shut down"));
        }
        final Future<HonoClient> outcome = Future.future();
        connect(protonClientOptions, outcome.completer(), handler);
        return outcome;
    }

    /**
     * Performs a single attempt to connect to the server and reports the outcome.
     * <p>
     * A fresh vert.x context is obtained first and all further work is executed on that
     * context. If the attempt fails with a non-terminal error, a re-connect is scheduled
     * (see {@code reconnect(Throwable, Handler, Handler)}) using the same handlers.
     *
     * @param protonClientOptions The options for the connection; may be {@code null}.
     * @param handler The handler to notify about the outcome of the connection attempt.
     * @param handler2 The handler passed on to the remote close / disconnect callbacks,
     *                 where it gets notified about the lost connection; may be {@code null}.
     */
    private void connect(ProtonClientOptions protonClientOptions, Handler<AsyncResult<HonoClient>> handler, Handler<ProtonConnection> handler2) {
        // every connection attempt (re-)assigns the context that subsequent operations run on
        this.context = this.vertx.getOrCreateContext();
        this.log.trace("running on vert.x context [event-loop context: {}]", Boolean.valueOf(this.context.isEventLoopContext()));
        // the future returned by executeOrRunOnContext is ignored, the outcome is reported via handler
        executeOrRunOnContext(future -> {
            if (isConnectedInternal()) {
                this.log.debug("already connected to server [{}:{}]", this.connectionFactory.getHost(), Integer.valueOf(this.connectionFactory.getPort()));
                handler.handle(Future.succeededFuture(this));
            } else if (!this.connecting.compareAndSet(false, true)) {
                // another attempt is already in progress, only one attempt at a time is allowed
                this.log.debug("already trying to connect to server ...");
                handler.handle(Future.failedFuture(new ClientErrorException(409, "already connecting to server")));
            } else {
                this.log.debug("starting attempt [#{}] to connect to server [{}:{}]", Integer.valueOf(this.connectAttempts.get() + 1), this.connectionFactory.getHost(), Integer.valueOf(this.connectionFactory.getPort()));
                // remember the options so that re-connect attempts can re-use them
                this.clientOptions = protonClientOptions;
                this.connectionFactory.connect(this.clientOptions, asyncResult -> {
                    onRemoteClose(asyncResult, handler2);
                }, protonConnection -> {
                    onRemoteDisconnect(protonConnection, handler2);
                }, asyncResult2 -> {
                    // the attempt is over (successfully or not), allow new attempts
                    this.connecting.compareAndSet(true, false);
                    if (asyncResult2.failed()) {
                        if (isTerminalConnectionError(asyncResult2.cause())) {
                            // retrying is pointless for this kind of error
                            failConnectionAttempt(asyncResult2.cause(), handler);
                            return;
                        } else {
                            reconnect(asyncResult2.cause(), handler, handler2);
                            return;
                        }
                    }
                    ProtonConnection protonConnection2 = (ProtonConnection) asyncResult2.result();
                    if (!this.shuttingDown.get()) {
                        setConnection(protonConnection2);
                        handler.handle(Future.succeededFuture(this));
                        return;
                    }
                    // the shut-down flag has been set while connecting:
                    // detach the handlers and close the new connection again
                    protonConnection2.closeHandler(null);
                    protonConnection2.disconnectHandler(null);
                    protonConnection2.close();
                    this.connectAttempts = new AtomicInteger(0);
                    handler.handle(Future.failedFuture(new ClientErrorException(409, "client is already shut down")));
                });
            }
        });
    }

    /**
     * Handles the peer closing the connection: logs the event, closes the local end
     * of the connection and triggers the connection loss handling.
     *
     * @param asyncResult The outcome reported with the remote close; a failure carries
     *                    the error condition.
     * @param handler The handler to notify about the lost connection; may be {@code null}.
     */
    private void onRemoteClose(AsyncResult<ProtonConnection> asyncResult, Handler<ProtonConnection> handler) {
        final String host = connectionFactory.getHost();
        final int port = connectionFactory.getPort();
        if (asyncResult.failed()) {
            log.info("remote server [{}:{}] closed connection with error condition: {}", host, port, asyncResult.cause().getMessage());
        } else {
            log.info("remote server [{}:{}] closed connection", host, port);
        }
        final ProtonConnection closedConnection = connection;
        // the disconnect handler must not fire for the close initiated here
        closedConnection.disconnectHandler(null);
        closedConnection.close();
        handleConnectionLoss(handler);
    }

    /**
     * Handles the loss of the underlying connection.
     * <p>
     * Events for a connection other than the one currently held by this client are
     * logged and otherwise ignored.
     *
     * @param protonConnection The connection that has been lost.
     * @param handler The handler to notify about the lost connection; may be {@code null}.
     */
    private void onRemoteDisconnect(ProtonConnection protonConnection, Handler<ProtonConnection> handler) {
        if (protonConnection == connection) {
            log.debug("lost connection to server [{}:{}]", connectionFactory.getHost(), connectionFactory.getPort());
            handleConnectionLoss(handler);
            return;
        }
        log.warn("cannot handle failure of unknown connection");
    }

    /**
     * Cleans up after the connection has been lost and either notifies the given handler
     * or, if no handler is given, starts re-connecting.
     *
     * @param handler The handler to pass the lost connection to; if {@code null}, a re-connect
     *                is attempted and the reconnect listeners are notified on success.
     */
    private void handleConnectionLoss(Handler<ProtonConnection> handler) {
        if (isConnectedInternal()) {
            connection.disconnect();
        }
        // keep a reference, clearState() resets the field
        final ProtonConnection lostConnection = connection;
        clearState();
        if (handler == null) {
            reconnect(this::notifyReconnectHandlers, null);
        } else {
            handler.handle(lostConnection);
        }
    }

    /**
     * Notifies all registered reconnect listeners if the re-connect attempt succeeded.
     *
     * @param asyncResult The outcome of the re-connect attempt.
     */
    private void notifyReconnectHandlers(AsyncResult<HonoClient> asyncResult) {
        if (!asyncResult.succeeded()) {
            return;
        }
        for (final ReconnectListener listener : reconnectListeners) {
            listener.onReconnect(this);
        }
    }

    /* JADX INFO: Access modifiers changed from: protected */
    /**
     * Resets all connection related state of this client.
     * <p>
     * Clears the connection, forgets all active senders and request/response clients,
     * fails all pending creation requests, notifies the disconnect listeners and
     * resets the counter of connection attempts.
     */
    public void clearState() {
        setConnection(null);
        activeSenders.clear();
        activeRequestResponseClients.clear();
        failAllCreationRequests();
        notifyDisconnectHandlers();
        connectAttempts = new AtomicInteger(0);
    }

    /**
     * Notifies all registered disconnect listeners.
     */
    private void notifyDisconnectHandlers() {
        for (final DisconnectListener listener : disconnectListeners) {
            listener.onDisconnect(this);
        }
    }

    /**
     * Invokes the handler of every pending creation request (passing {@code null})
     * and removes it from the list of pending requests.
     */
    private void failAllCreationRequests() {
        for (final Iterator<Handler<Void>> pending = creationRequests.iterator(); pending.hasNext();) {
            final Handler<Void> failureHandler = pending.next();
            failureHandler.handle(null);
            pending.remove();
        }
    }

    /**
     * Schedules a re-connect attempt that has not been caused by a connection failure.
     *
     * @param handler The handler to notify about the outcome of the connection attempt.
     * @param handler2 The connection loss handler to use for the new connection; may be {@code null}.
     */
    private void reconnect(Handler<AsyncResult<HonoClient>> handler, Handler<ProtonConnection> handler2) {
        reconnect((Throwable) null, handler, handler2);
    }

    /**
     * Schedules another connection attempt unless the client is shutting down or the
     * configured number of attempts has been used up.
     *
     * @param th The cause of the preceding failed attempt; may be {@code null}.
     * @param handler The handler to notify about the outcome of the connection attempt.
     * @param handler2 The connection loss handler to use for the new connection; may be {@code null}.
     */
    private void reconnect(Throwable th, Handler<AsyncResult<HonoClient>> handler, Handler<ProtonConnection> handler2) {
        if (shuttingDown.get()) {
            log.info("client is shutting down, giving up attempt to connect");
            handler.handle(Future.failedFuture(new IllegalStateException("client is shut down")));
            return;
        }

        final int maxAttempts = clientConfigProperties.getReconnectAttempts();
        // the counter is incremented with every check, regardless of the outcome
        final int attemptsSoFar = connectAttempts.getAndIncrement();
        if (maxAttempts == attemptsSoFar) {
            log.info("max number of attempts [{}] to re-connect to peer [{}:{}] have been made, giving up", maxAttempts, connectionFactory.getHost(), connectionFactory.getPort());
            clearState();
            failConnectionAttempt(th, handler);
            return;
        }

        if (th != null) {
            log.debug("connection attempt failed", th);
        }
        // fall back to 500ms if no client options have been set
        final ProtonClientOptions options = clientOptions;
        final long delayMillis = options == null ? 500L : options.getReconnectInterval();
        log.trace("scheduling new connection attempt in {}ms ...", delayMillis);
        vertx.setTimer(delayMillis, timerId -> connect(this.clientOptions, handler, handler2));
    }

    /* JADX WARN: Multi-variable type inference failed */
    /**
     * Checks whether a connection error rules out further connection attempts.
     *
     * @param th The error to check.
     * @return {@code true} if the error is an {@code AuthenticationException} or a
     *         {@code SaslSystemException} that is flagged as permanent.
     */
    private boolean isTerminalConnectionError(Throwable th) {
        if (th instanceof AuthenticationException) {
            return true;
        }
        if (th instanceof SaslSystemException) {
            return ((SaslSystemException) th).isPermanent();
        }
        return false;
    }

    /**
     * Reports a failed connection attempt to the given handler, mapping the cause to
     * a client or server error.
     * <ul>
     * <li>no cause: 503 "failed to connect"</li>
     * <li>{@code AuthenticationException}: 401 "failed to authenticate with server"</li>
     * <li>{@code SaslSystemException} indicating that no suitable SASL mechanism could be
     * found: 401</li>
     * <li>anything else: 503 "failed to connect" with the original cause attached</li>
     * </ul>
     *
     * @param th The cause of the failure; may be {@code null}.
     * @param handler The handler to notify.
     */
    private void failConnectionAttempt(Throwable th, Handler<AsyncResult<HonoClient>> handler) {
        this.log.info("stopping connection attempt to server [host: {}, port: {}] due to terminal error", this.connectionFactory.getHost(), Integer.valueOf(this.connectionFactory.getPort()), th);
        if (th == null) {
            handler.handle(Future.failedFuture(new ServerErrorException(503, "failed to connect")));
            return;
        }
        // an exception's message may be null, guard against it so that the handler
        // is always notified instead of this method failing with a NullPointerException
        final String errorMessage = th.getMessage();
        if (th instanceof AuthenticationException) {
            handler.handle(Future.failedFuture(new ClientErrorException(401, "failed to authenticate with server")));
        } else if ((th instanceof SaslSystemException) && errorMessage != null
                && errorMessage.contains("Could not find a suitable SASL mechanism")) {
            handler.handle(Future.failedFuture(new ClientErrorException(401, "no suitable SASL mechanism found for authentication with server")));
        } else {
            handler.handle(Future.failedFuture(new ServerErrorException(503, "failed to connect", th)));
        }
    }

    /**
     * Gets an existing open telemetry sender or creates a new one.
     *
     * @param str The identifier the sender's target address is derived from
     *            (presumably the tenant identifier - confirm against the interface).
     * @return A future holding the sender.
     * @throws NullPointerException if the identifier is {@code null}.
     */
    @Override // org.eclipse.hono.client.DownstreamSenderFactory
    public final Future<MessageSender> getOrCreateTelemetrySender(String str) {
        Objects.requireNonNull(str);
        final String targetAddress = TelemetrySenderImpl.getTargetAddress(str, null);
        final String senderKey = getResourcesKeyForSender(TelemetrySenderImpl.class, targetAddress);
        return getOrCreateSender(senderKey, () -> createTelemetrySender(str));
    }

    /**
     * Creates a new telemetry sender on the current connection.
     * <p>
     * The close hook passed to the sender removes the sender from the active senders.
     *
     * @param str The identifier the sender's target address is derived from.
     * @return A future holding the sender; the future fails if the client is not connected.
     */
    private Future<MessageSender> createTelemetrySender(String str) {
        return checkConnected().compose(connected -> {
            final Future<MessageSender> creation = Future.future();
            TelemetrySenderImpl.create(
                    context,
                    clientConfigProperties,
                    connection,
                    str,
                    null,
                    closedSender -> activeSenders.remove(
                            getResourcesKeyForSender(TelemetrySenderImpl.class, TelemetrySenderImpl.getTargetAddress(str, null))),
                    creation.completer(),
                    tracer);
            return creation;
        });
    }

    /**
     * Gets an existing open event sender or creates a new one.
     *
     * @param str The identifier the sender's target address is derived from
     *            (presumably the tenant identifier - confirm against the interface).
     * @return A future holding the sender.
     * @throws NullPointerException if the identifier is {@code null}.
     */
    @Override // org.eclipse.hono.client.DownstreamSenderFactory
    public final Future<MessageSender> getOrCreateEventSender(String str) {
        Objects.requireNonNull(str);
        final String targetAddress = EventSenderImpl.getTargetAddress(str, null);
        final String senderKey = getResourcesKeyForSender(EventSenderImpl.class, targetAddress);
        return getOrCreateSender(senderKey, () -> createEventSender(str));
    }

    /**
     * Creates a new event sender on the current connection.
     * <p>
     * The close hook passed to the sender removes the sender from the active senders.
     *
     * @param str The identifier the sender's target address is derived from.
     * @return A future holding the sender; the future fails if the client is not connected.
     */
    private Future<MessageSender> createEventSender(String str) {
        return checkConnected().compose(connected -> {
            final Future<MessageSender> creation = Future.future();
            EventSenderImpl.create(
                    context,
                    clientConfigProperties,
                    connection,
                    str,
                    null,
                    closedSender -> activeSenders.remove(
                            getResourcesKeyForSender(EventSenderImpl.class, EventSenderImpl.getTargetAddress(str, null))),
                    creation.completer(),
                    tracer);
            return creation;
        });
    }

    /**
     * Gets an existing open sender for the given key or creates a new one, running
     * on the client's vert.x context.
     *
     * @param str The key under which the sender is tracked.
     * @param supplier The factory to use for creating a new sender.
     * @return A future holding the sender.
     */
    protected Future<MessageSender> getOrCreateSender(String str, Supplier<Future<MessageSender>> supplier) {
        return executeOrRunOnContext(result -> getOrCreateSender(str, supplier, result));
    }

    /**
     * Builds the key under which a sender is tracked.
     *
     * @param cls The sender's implementation class.
     * @param str The sender's target address.
     * @return The simple name of the class and the address, separated by a {@code #}.
     */
    private static String getResourcesKeyForSender(Class<?> cls, String str) {
        return String.join("#", cls.getSimpleName(), str);
    }

    /**
     * Completes the given future with an existing open sender or with a newly created one.
     * <p>
     * Only one creation attempt per key may be in progress at a time; concurrent requests for
     * the same key are failed with a 503 {@code ServerErrorException}. While the attempt is
     * in progress, a handler is registered with the pending creation requests so that the
     * result future is failed if the client's state is cleared in the meantime.
     * <p>
     * The result future is always completed using the {@code try*} variants because it may
     * already have been failed by that handler by the time the creation attempt finishes;
     * an unconditional {@code complete} would then throw an {@code IllegalStateException}.
     *
     * @param str The key under which the sender is tracked.
     * @param supplier The factory to use for creating a new sender.
     * @param future The future to complete with the outcome.
     */
    private void getOrCreateSender(String str, Supplier<Future<MessageSender>> supplier, Future<MessageSender> future) {
        final MessageSender existingSender = this.activeSenders.get(str);
        if (existingSender != null && existingSender.isOpen()) {
            this.log.debug("reusing existing message sender [target: {}, credit: {}]", str, Integer.valueOf(existingSender.getCredit()));
            future.tryComplete(existingSender);
            return;
        }
        if (this.creationLocks.computeIfAbsent(str, key -> Boolean.FALSE).booleanValue()) {
            this.log.debug("already trying to create a message sender for {}", str);
            future.tryFail(new ServerErrorException(503, "no connection to service"));
            return;
        }
        // invoked by failAllCreationRequests() if the state is cleared during creation
        final Handler<Void> connectionFailureHandler = ignored -> {
            this.creationLocks.remove(str);
            future.tryFail(new ServerErrorException(503, "no connection to service"));
        };
        this.creationRequests.add(connectionFailureHandler);
        this.creationLocks.put(str, Boolean.TRUE);
        this.log.debug("creating new message sender for {}", str);
        // NOTE(review): "setHandler2" looks like a decompiler-generated name - confirm against the Future API in use
        supplier.get().setHandler2(creationAttempt -> {
            this.creationLocks.remove(str);
            this.creationRequests.remove(connectionFailureHandler);
            if (!creationAttempt.succeeded()) {
                this.log.debug("failed to create new message sender for {}", str, creationAttempt.cause());
                this.activeSenders.remove(str);
                future.tryFail(creationAttempt.cause());
            } else {
                final MessageSender newSender = (MessageSender) creationAttempt.result();
                this.log.debug("successfully created new message sender for {}", str);
                this.activeSenders.put(str, newSender);
                // the future may already have been failed by the connection failure handler
                future.tryComplete(newSender);
            }
        });
    }

    /**
     * Creates a consumer for telemetry messages.
     *
     * @param str The identifier of the messages' origin (presumably the tenant identifier - confirm).
     * @param consumer The handler to invoke with every message received.
     * @param handler The handler to invoke (with {@code null}) when the consumer's close hook fires.
     * @return A future holding the consumer.
     */
    @Override // org.eclipse.hono.client.ApplicationClientFactory
    public final Future<MessageConsumer> createTelemetryConsumer(String str, Consumer<Message> consumer, Handler<Void> handler) {
        final Supplier<Future<MessageConsumer>> factory = () -> newTelemetryConsumer(str, consumer, handler);
        return createConsumer(str, factory);
    }

    /**
     * Creates a new telemetry consumer on the current connection.
     *
     * @param str The identifier passed on to the consumer implementation.
     * @param consumer The handler to invoke with every message received.
     * @param handler The handler to invoke (with {@code null}) when the consumer's close hook fires.
     * @return A future holding the consumer; the future fails if the client is not connected.
     */
    private Future<MessageConsumer> newTelemetryConsumer(String str, Consumer<Message> consumer, Handler<Void> handler) {
        return checkConnected().compose(connected -> {
            final Future<MessageConsumer> creation = Future.future();
            TelemetryConsumerImpl.create(
                    context,
                    clientConfigProperties,
                    connection,
                    str,
                    connectionFactory.getPathSeparator(),
                    consumer,
                    creation.completer(),
                    closedConsumer -> handler.handle(null));
            return creation;
        });
    }

    /**
     * Creates a consumer for events whose message handler has no interest in the delivery.
     *
     * @param str The identifier of the events' origin (presumably the tenant identifier - confirm).
     * @param consumer The handler to invoke with every event received.
     * @param handler The handler to invoke (with {@code null}) when the consumer's close hook fires.
     * @return A future holding the consumer.
     */
    @Override // org.eclipse.hono.client.ApplicationClientFactory
    public final Future<MessageConsumer> createEventConsumer(String str, Consumer<Message> consumer, Handler<Void> handler) {
        // adapt the message-only handler, dropping the delivery
        final BiConsumer<ProtonDelivery, Message> deliveryAgnosticConsumer = (delivery, msg) -> consumer.accept(msg);
        return createEventConsumer(str, deliveryAgnosticConsumer, handler);
    }

    /**
     * Creates a consumer for events.
     *
     * @param str The identifier of the events' origin (presumably the tenant identifier - confirm).
     * @param biConsumer The handler to invoke with the delivery and message of every event received.
     * @param handler The handler to invoke (with {@code null}) when the consumer's close hook fires.
     * @return A future holding the consumer.
     */
    @Override // org.eclipse.hono.client.ApplicationClientFactory
    public final Future<MessageConsumer> createEventConsumer(String str, BiConsumer<ProtonDelivery, Message> biConsumer, Handler<Void> handler) {
        final Supplier<Future<MessageConsumer>> factory = () -> newEventConsumer(str, biConsumer, handler);
        return createConsumer(str, factory);
    }

    /**
     * Creates a new event consumer on the current connection.
     *
     * @param str The identifier passed on to the consumer implementation.
     * @param biConsumer The handler to invoke with the delivery and message of every event received.
     * @param handler The handler to invoke (with {@code null}) when the consumer's close hook fires.
     * @return A future holding the consumer; the future fails if the client is not connected.
     */
    private Future<MessageConsumer> newEventConsumer(String str, BiConsumer<ProtonDelivery, Message> biConsumer, Handler<Void> handler) {
        return checkConnected().compose(connected -> {
            final Future<MessageConsumer> creation = Future.future();
            EventConsumerImpl.create(
                    context,
                    clientConfigProperties,
                    connection,
                    str,
                    connectionFactory.getPathSeparator(),
                    biConsumer,
                    creation.completer(),
                    closedConsumer -> handler.handle(null));
            return creation;
        });
    }

    /**
     * Creates a consumer for asynchronous command responses whose message handler has
     * no interest in the delivery.
     *
     * @param str The first identifier passed on to the consumer implementation
     *            (presumably the tenant identifier - confirm).
     * @param str2 The second identifier passed on to the consumer implementation
     *             (presumably the reply identifier - confirm).
     * @param consumer The handler to invoke with every response received.
     * @param handler The handler to invoke (with {@code null}) when the consumer's close hook fires.
     * @return A future holding the consumer.
     */
    @Override // org.eclipse.hono.client.ApplicationClientFactory
    public final Future<MessageConsumer> createAsyncCommandResponseConsumer(String str, String str2, Consumer<Message> consumer, Handler<Void> handler) {
        // adapt the message-only handler, dropping the delivery
        final BiConsumer<ProtonDelivery, Message> deliveryAgnosticConsumer = (delivery, msg) -> consumer.accept(msg);
        return createAsyncCommandResponseConsumer(str, str2, deliveryAgnosticConsumer, handler);
    }

    /**
     * Creates a consumer for asynchronous command responses.
     *
     * @param str The first identifier passed on to the consumer implementation
     *            (presumably the tenant identifier - confirm).
     * @param str2 The second identifier passed on to the consumer implementation
     *             (presumably the reply identifier - confirm).
     * @param biConsumer The handler to invoke with the delivery and message of every response received.
     * @param handler The handler to invoke (with {@code null}) when the consumer's close hook fires.
     * @return A future holding the consumer.
     */
    @Override // org.eclipse.hono.client.ApplicationClientFactory
    public final Future<MessageConsumer> createAsyncCommandResponseConsumer(String str, String str2, BiConsumer<ProtonDelivery, Message> biConsumer, Handler<Void> handler) {
        final Supplier<Future<MessageConsumer>> factory = () -> newAsyncCommandResponseConsumer(str, str2, biConsumer, handler);
        return createConsumer(str, factory);
    }

    /**
     * Creates a new consumer for asynchronous command responses on the current connection.
     *
     * @param str The first identifier passed on to the consumer implementation.
     * @param str2 The second identifier passed on to the consumer implementation.
     * @param biConsumer The handler to invoke with the delivery and message of every response received.
     * @param handler The handler to invoke (with {@code null}) when the consumer's close hook fires.
     * @return A future holding the consumer; the future fails if the client is not connected.
     */
    private Future<MessageConsumer> newAsyncCommandResponseConsumer(String str, String str2, BiConsumer<ProtonDelivery, Message> biConsumer, Handler<Void> handler) {
        return checkConnected().compose(connected -> {
            final Future<MessageConsumer> creation = Future.future();
            AsyncCommandResponseConsumerImpl.create(
                    context,
                    clientConfigProperties,
                    connection,
                    str,
                    str2,
                    connectionFactory.getPathSeparator(),
                    biConsumer,
                    creation.completer(),
                    closedConsumer -> handler.handle(null));
            return creation;
        });
    }

    /* JADX INFO: Access modifiers changed from: protected */
    /**
     * Creates a new message consumer using the given factory, running on the
     * client's vert.x context.
     *
     * @param str The identifier the consumer is created for.
     * @param supplier The factory to use for creating the consumer.
     * @return A future holding the consumer.
     */
    public Future<MessageConsumer> createConsumer(String str, Supplier<Future<MessageConsumer>> supplier) {
        return executeOrRunOnContext(result -> createConsumer(str, supplier, result));
    }

    /**
     * Creates a new message consumer and completes the given future with the outcome.
     * <p>
     * While the creation is in progress, a handler is registered with the pending creation
     * requests which fails the future if the client's state is cleared in the meantime.
     *
     * @param str The identifier the consumer is created for (not used by this method).
     * @param supplier The factory to use for creating the consumer.
     * @param future The future to complete with the outcome.
     */
    private void createConsumer(String str, Supplier<Future<MessageConsumer>> supplier, Future<MessageConsumer> future) {
        final Handler<Void> connectionFailureHandler =
                ignored -> future.tryFail(new ServerErrorException(503, "connection to server lost"));
        creationRequests.add(connectionFailureHandler);
        supplier.get().setHandler2(creationAttempt -> {
            creationRequests.remove(connectionFailureHandler);
            if (!creationAttempt.succeeded()) {
                future.tryFail(creationAttempt.cause());
                return;
            }
            future.tryComplete((MessageConsumer) creationAttempt.result());
        });
    }

    /**
     * Gets an existing open credentials client or creates a new one.
     *
     * @param str The identifier the client's target address is derived from
     *            (presumably the tenant identifier - confirm against the interface).
     * @return A future holding the client.
     * @throws NullPointerException if the identifier is {@code null}.
     */
    @Override // org.eclipse.hono.client.CredentialsClientFactory
    public final Future<CredentialsClient> getOrCreateCredentialsClient(String str) {
        Objects.requireNonNull(str);
        final String targetAddress = CredentialsClientImpl.getTargetAddress(str);
        final Future<RequestResponseClient> client =
                getOrCreateRequestResponseClient(targetAddress, () -> newCredentialsClient(str));
        return client.map(CredentialsClient.class::cast);
    }

    /**
     * Creates a new credentials client on the current connection.
     * <p>
     * {@link #removeCredentialsClient(String)} is registered for both hooks accepted
     * by the client implementation.
     *
     * @param str The identifier passed on to the client implementation.
     * @return A future holding the client; the future fails if the client is not connected.
     */
    protected Future<RequestResponseClient> newCredentialsClient(String str) {
        return checkConnected().compose(connected -> {
            final Future<CredentialsClient> creation = Future.future();
            CredentialsClientImpl.create(
                    context,
                    clientConfigProperties,
                    tracer,
                    connection,
                    str,
                    this::removeCredentialsClient,
                    this::removeCredentialsClient,
                    creation.completer());
            return creation.map(client -> (RequestResponseClient) client);
        });
    }

    /**
     * Closes and removes the credentials client tracked for the given identifier.
     *
     * @param str The identifier the client's target address is derived from.
     */
    protected final void removeCredentialsClient(String str) {
        final String targetAddress = CredentialsClientImpl.getTargetAddress(str);
        removeActiveRequestResponseClient(targetAddress);
    }

    /**
     * Removes the request/response client tracked under the given key and closes it.
     * Does nothing if no client is tracked under the key.
     *
     * @param str The key (target address) of the client.
     */
    private void removeActiveRequestResponseClient(String str) {
        final RequestResponseClient removedClient = activeRequestResponseClients.remove(str);
        if (removedClient == null) {
            return;
        }
        // the outcome of closing the client is of no interest
        removedClient.close(closeAttempt -> { });
        log.debug("closed and removed client for [{}]", str);
    }

    /**
     * Gets an existing open registration client or creates a new one.
     *
     * @param str The identifier the client's target address is derived from
     *            (presumably the tenant identifier - confirm against the interface).
     * @return A future holding the client.
     * @throws NullPointerException if the identifier is {@code null}.
     */
    @Override // org.eclipse.hono.client.RegistrationClientFactory
    public final Future<RegistrationClient> getOrCreateRegistrationClient(String str) {
        Objects.requireNonNull(str);
        final String targetAddress = RegistrationClientImpl.getTargetAddress(str);
        final Future<RequestResponseClient> client =
                getOrCreateRequestResponseClient(targetAddress, () -> newRegistrationClient(str));
        return client.map(RegistrationClient.class::cast);
    }

    /**
     * Creates a new registration client on the current connection.
     * <p>
     * {@link #removeRegistrationClient(String)} is registered for both hooks accepted
     * by the client implementation.
     *
     * @param str The identifier passed on to the client implementation.
     * @return A future holding the client; the future fails if the client is not connected.
     * @throws NullPointerException if the identifier is {@code null}.
     */
    protected Future<RequestResponseClient> newRegistrationClient(String str) {
        Objects.requireNonNull(str);
        return checkConnected().compose(connected -> {
            final Future<RegistrationClient> creation = Future.future();
            RegistrationClientImpl.create(
                    context,
                    clientConfigProperties,
                    cacheProvider,
                    tracer,
                    connection,
                    str,
                    this::removeRegistrationClient,
                    this::removeRegistrationClient,
                    creation.completer());
            return creation.map(client -> (RequestResponseClient) client);
        });
    }

    /**
     * Closes and removes the registration client tracked for the given identifier.
     *
     * @param str The identifier the client's target address is derived from.
     */
    protected final void removeRegistrationClient(String str) {
        final String targetAddress = RegistrationClientImpl.getTargetAddress(str);
        removeActiveRequestResponseClient(targetAddress);
    }

    /**
     * Gets the existing open tenant client or creates a new one.
     *
     * @return A future holding the client.
     */
    @Override // org.eclipse.hono.client.TenantClientFactory
    public Future<TenantClient> getOrCreateTenantClient() {
        final Future<RequestResponseClient> client =
                getOrCreateRequestResponseClient(TenantClientImpl.getTargetAddress(), this::newTenantClient);
        return client.map(TenantClient.class::cast);
    }

    /**
     * Creates a new tenant client on the current connection.
     * <p>
     * The removal of the tenant client is registered for both hooks accepted
     * by the client implementation.
     *
     * @return A future holding the client; the future fails if the client is not connected.
     */
    protected Future<RequestResponseClient> newTenantClient() {
        return checkConnected().compose(connected -> {
            final Future<TenantClient> creation = Future.future();
            TenantClientImpl.create(
                    context,
                    clientConfigProperties,
                    cacheProvider,
                    tracer,
                    connection,
                    this::removeTenantClient,
                    this::removeTenantClient,
                    creation.completer());
            return creation.map(client -> (RequestResponseClient) client);
        });
    }

    /**
     * Adapter that allows the removal of the tenant client to be used as a hook
     * accepting a (here irrelevant) string argument.
     *
     * @param str Ignored.
     */
    private void removeTenantClient(String str) {
        this.removeTenantClient();
    }

    /**
     * Closes and removes the tenant client, if one is tracked.
     */
    protected void removeTenantClient() {
        final String targetAddress = TenantClientImpl.getTargetAddress();
        removeActiveRequestResponseClient(targetAddress);
    }

    /**
     * Gets an existing open command client or creates a new one, using a random
     * UUID as the third identifier.
     *
     * @param str The first identifier (logged as tenantId by the three-argument variant).
     * @param str2 The second identifier (logged as deviceId by the three-argument variant).
     * @return A future holding the client.
     */
    @Override // org.eclipse.hono.client.ApplicationClientFactory
    public Future<CommandClient> getOrCreateCommandClient(String str, String str2) {
        final String randomReplyId = UUID.randomUUID().toString();
        return getOrCreateCommandClient(str, str2, randomReplyId);
    }

    /**
     * Gets an existing open command client or creates a new one.
     * <p>
     * Clients are tracked under a key built from the command endpoint and the
     * first two identifiers; the third identifier is not part of the key.
     *
     * @param str The tenant identifier.
     * @param str2 The device identifier.
     * @param str3 The reply identifier.
     * @return A future holding the client.
     * @throws NullPointerException if any of the parameters is {@code null}.
     */
    @Override // org.eclipse.hono.client.ApplicationClientFactory
    public Future<CommandClient> getOrCreateCommandClient(String str, String str2, String str3) {
        Objects.requireNonNull(str);
        Objects.requireNonNull(str2);
        Objects.requireNonNull(str3);
        log.debug("get or create command client for [tenantId: {}, deviceId: {}, replyId: {}]", str, str2, str3);
        final String clientKey = ResourceIdentifier.from(CommandConstants.COMMAND_ENDPOINT, str, str2).toString();
        final Future<RequestResponseClient> client =
                getOrCreateRequestResponseClient(clientKey, () -> newCommandClient(str, str2, str3));
        return client.map(CommandClient.class::cast);
    }

    /**
     * Creates a new command client on the current connection.
     * <p>
     * The removal of the active request/response client is registered for both hooks
     * accepted by the client implementation.
     *
     * @param str The first identifier passed on to the client implementation.
     * @param str2 The second identifier passed on to the client implementation.
     * @param str3 The third identifier passed on to the client implementation.
     * @return A future holding the client; the future fails if the client is not connected.
     */
    private Future<RequestResponseClient> newCommandClient(String str, String str2, String str3) {
        return checkConnected().compose(connected -> {
            final Future<CommandClient> creation = Future.future();
            CommandClientImpl.create(
                    context,
                    clientConfigProperties,
                    connection,
                    str,
                    str2,
                    str3,
                    this::removeActiveRequestResponseClient,
                    this::removeActiveRequestResponseClient,
                    creation.completer());
            return creation.map(client -> (RequestResponseClient) client);
        });
    }

    /**
     * Gets an existing open asynchronous command client or creates a new one.
     * <p>
     * The client is tracked together with the message senders.
     *
     * @param str The first identifier the client's target address is derived from.
     * @param str2 The second identifier the client's target address is derived from.
     * @return A future holding the client.
     * @throws NullPointerException if any of the parameters is {@code null}.
     */
    @Override // org.eclipse.hono.client.ApplicationClientFactory
    public Future<AsyncCommandClient> getOrCreateAsyncCommandClient(String str, String str2) {
        Objects.requireNonNull(str);
        Objects.requireNonNull(str2);
        final String targetAddress = AsyncCommandClientImpl.getTargetAddress(str, str2);
        final String senderKey = getResourcesKeyForSender(AsyncCommandClientImpl.class, targetAddress);
        final Future<MessageSender> sender = getOrCreateSender(senderKey, () -> newAsyncCommandClient(str, str2));
        return sender.map(AsyncCommandClient.class::cast);
    }

    /**
     * Creates a new asynchronous command client on the current connection.
     * <p>
     * The close hook passed to the client removes the client from the active senders.
     *
     * @param str The first identifier passed on to the client implementation.
     * @param str2 The second identifier passed on to the client implementation.
     * @return A future holding the client; the future fails if the client is not connected.
     */
    private Future<MessageSender> newAsyncCommandClient(String str, String str2) {
        return checkConnected().compose(connected -> {
            final Future<AsyncCommandClient> creation = Future.future();
            AsyncCommandClientImpl.create(
                    context,
                    clientConfigProperties,
                    connection,
                    str,
                    str2,
                    closedClient -> activeSenders.remove(
                            getResourcesKeyForSender(AsyncCommandClientImpl.class, AsyncCommandClientImpl.getTargetAddress(str, str2))),
                    creation.completer());
            return creation.map(client -> (MessageSender) client);
        });
    }

    /**
     * Gets an existing request-response client or creates a new one, running the lookup
     * through {@code executeOrRunOnContext}.
     *
     * @param str The key under which the client is registered.
     * @param supplier The factory used to create a new client if none can be reused.
     * @return A future that is completed with the outcome of the lookup or creation.
     */
    protected Future<RequestResponseClient> getOrCreateRequestResponseClient(String str, Supplier<Future<RequestResponseClient>> supplier) {
        return executeOrRunOnContext(result -> getOrCreateRequestResponseClient(str, supplier, result));
    }

    /**
     * Completes the given future with an open client registered under the key or, if there is
     * none, with a newly created one.
     * <p>
     * Only one creation attempt per key is allowed at a time: while an attempt is pending, any
     * further request for the same key is failed with a 503 {@code ServerErrorException}.
     *
     * @param str The key under which the client is registered.
     * @param supplier The factory used to create a new client.
     * @param future The future to complete with the client or to fail with the cause.
     */
    private void getOrCreateRequestResponseClient(String str, Supplier<Future<RequestResponseClient>> supplier, Future<RequestResponseClient> future) {
        final RequestResponseClient existingClient = this.activeRequestResponseClients.get(str);
        if (existingClient != null && existingClient.isOpen()) {
            this.log.debug("reusing existing client [target: {}]", str);
            future.complete(existingClient);
            return;
        }

        // registers FALSE for an unknown key, so a TRUE value marks a creation attempt in progress
        final boolean creationInProgress = this.creationLocks.computeIfAbsent(str, key -> Boolean.FALSE).booleanValue();
        if (creationInProgress) {
            this.log.debug("already trying to create a client [target: {}]", str);
            future.fail(new ServerErrorException(503, "no connection to service"));
            return;
        }

        // handler kept in creationRequests while the attempt is pending; when invoked it
        // releases the lock and fails the (still open) result
        final Handler<Void> abortHandler = ignored -> {
            this.creationLocks.remove(str);
            future.tryFail(new ServerErrorException(503, "no connection to service"));
        };
        this.creationRequests.add(abortHandler);
        this.creationLocks.put(str, Boolean.TRUE);
        this.log.debug("creating new client [target: {}]", str);

        supplier.get().setHandler2(attempt -> {
            if (!attempt.succeeded()) {
                this.log.debug("failed to create new client [target: {}]", str, attempt.cause());
                this.activeRequestResponseClients.remove(str);
                future.tryFail(attempt.cause());
            } else {
                final RequestResponseClient newClient = (RequestResponseClient) attempt.result();
                this.log.debug("successfully created new client [target: {}]", str);
                this.activeRequestResponseClients.put(str, newClient);
                future.tryComplete(newClient);
            }
            // the attempt is over either way: release the lock and drop the abort handler
            this.creationLocks.remove(str);
            this.creationRequests.remove(abortHandler);
        });
    }

    /**
     * Shuts down this client, blocking the calling thread until the attempt has finished
     * or 5 seconds have elapsed, whichever comes first.
     * <p>
     * A failed attempt is logged and releases the calling thread right away instead of
     * letting it run into the timeout. If the calling thread is interrupted while waiting,
     * its interrupt flag is restored.
     */
    @Override // org.eclipse.hono.client.HonoClient
    public final void shutdown() {
        final CountDownLatch completed = new CountDownLatch(1);
        shutdown(asyncResult -> {
            if (!asyncResult.succeeded()) {
                this.log.error("could not close connection to server", asyncResult.cause());
            }
            // release the waiting thread regardless of the outcome; otherwise a failure
            // (e.g. "already shutting down") would block the caller for the full timeout
            // and additionally be reported as a timeout
            completed.countDown();
        });
        try {
            if (!completed.await(5L, TimeUnit.SECONDS)) {
                this.log.error("shutdown of client timed out after 5 seconds");
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
    }

    /**
     * Shuts down this client by closing its connection.
     * <p>
     * Only the first invocation triggers the close; once the {@code shuttingDown} flag has been
     * set, the handler is failed with a 409 {@code ClientErrorException}.
     *
     * @param handler The handler to notify about the outcome.
     * @throws NullPointerException if the handler is {@code null}.
     */
    @Override // org.eclipse.hono.client.HonoClient
    public final void shutdown(Handler<AsyncResult<Void>> handler) {
        Objects.requireNonNull(handler);
        if (!this.shuttingDown.compareAndSet(false, true)) {
            handler.handle(Future.failedFuture(new ClientErrorException(409, "already shutting down")));
            return;
        }
        closeConnection(handler);
    }

    /**
     * Disconnects from the server, blocking the calling thread until the attempt has finished
     * or 5 seconds have elapsed, whichever comes first.
     * <p>
     * A failed attempt is logged together with its cause and releases the calling thread right
     * away instead of letting it run into the timeout. If the calling thread is interrupted
     * while waiting, its interrupt flag is restored.
     */
    @Override // org.eclipse.hono.client.HonoClient, org.eclipse.hono.client.ConnectionLifecycle
    public final void disconnect() {
        final CountDownLatch completed = new CountDownLatch(1);
        disconnect(asyncResult -> {
            if (asyncResult.succeeded()) {
                this.log.info("successfully disconnected from the server [{}:{}]", this.connectionFactory.getHost(), Integer.valueOf(this.connectionFactory.getPort()));
            } else {
                // pass the cause as trailing argument so that the reason for the failure is not lost
                this.log.error("could not disconnect from the server [{}:{}]", this.connectionFactory.getHost(), Integer.valueOf(this.connectionFactory.getPort()), asyncResult.cause());
            }
            // release the waiting thread regardless of the outcome; otherwise a failure
            // (e.g. "already disconnecting") would block the caller for the full timeout
            // and additionally be reported as a timeout
            completed.countDown();
        });
        try {
            if (!completed.await(5L, TimeUnit.SECONDS)) {
                this.log.error("Disconnecting from the server [{}:{}] timed out after 5 seconds", this.connectionFactory.getHost(), Integer.valueOf(this.connectionFactory.getPort()));
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
    }

    /**
     * Disconnects from the server by closing the connection.
     * <p>
     * While the {@code disconnecting} flag is set, any further invocation is failed with a
     * 409 {@code ClientErrorException}.
     *
     * @param handler The handler to notify about the outcome.
     * @throws NullPointerException if the handler is {@code null}.
     */
    @Override // org.eclipse.hono.client.ConnectionLifecycle
    public final void disconnect(Handler<AsyncResult<Void>> handler) {
        Objects.requireNonNull(handler);
        if (!this.disconnecting.compareAndSet(false, true)) {
            handler.handle(Future.failedFuture(new ClientErrorException(409, "already disconnecting")));
            return;
        }
        closeConnection(handler);
    }

    /**
     * Closes the connection to the server (if connected) and notifies the given handler.
     * <p>
     * When connected, the connection's disconnect handler is cleared, a close handler is
     * installed and the connection is closed. A timer using the configured connect timeout
     * serves as a fallback that completes the close if the peer's close frame does not arrive
     * in time. In either case the client's state is cleared before the handler is notified.
     * <p>
     * The {@code disconnecting} flag is reset before the handler is invoked; this method does
     * not touch the {@code shuttingDown} flag.
     *
     * @param handler The handler to notify about the outcome of closing the connection.
     */
    private void closeConnection(Handler<AsyncResult<Void>> handler) {
        // typed (instead of raw) handlers: a lambda assigned to a raw Handler gets an Object
        // parameter, on which succeeded()/cause() cannot be resolved
        final Handler<AsyncResult<Object>> completionHandler = asyncResult -> {
            this.disconnecting.compareAndSet(true, false);
            if (asyncResult.succeeded()) {
                handler.handle(Future.succeededFuture());
            } else {
                handler.handle(Future.failedFuture(asyncResult.cause()));
            }
        };
        synchronized (this.connectionLock) {
            if (isConnectedInternal()) {
                executeOrRunOnContext(future -> {
                    final ProtonConnection protonConnection = this.connection;
                    // we are closing deliberately, so no disconnect notification is wanted
                    protonConnection.disconnectHandler(null);
                    final Handler<AsyncResult<ProtonConnection>> closedHandler = remoteClose -> {
                        if (remoteClose.succeeded()) {
                            this.log.info("closed connection to container [{}] at [{}:{}]", protonConnection.getRemoteContainer(), this.connectionFactory.getHost(), Integer.valueOf(this.connectionFactory.getPort()));
                        } else {
                            this.log.info("closed connection to container [{}] at [{}:{}]", protonConnection.getRemoteContainer(), this.connectionFactory.getHost(), Integer.valueOf(this.connectionFactory.getPort()), remoteClose.cause());
                        }
                        clearState();
                        future.complete();
                    };
                    // fallback: treat the connection as closed if the peer does not answer in time
                    final long timer = this.vertx.setTimer(this.clientConfigProperties.getConnectTimeout(), timerId -> {
                        this.log.info("did not receive remote peer's close frame after {}ms", Integer.valueOf(this.clientConfigProperties.getConnectTimeout()));
                        closedHandler.handle(Future.succeededFuture());
                    });
                    protonConnection.closeHandler(remoteClose -> {
                        // only forward the peer's close if the fallback timer could still be
                        // cancelled, so that closedHandler is not run a second time
                        if (this.vertx.cancelTimer(timer)) {
                            closedHandler.handle(remoteClose);
                        }
                    });
                    this.log.info("closing connection to container [{}] at [{}:{}] ...", protonConnection.getRemoteContainer(), this.connectionFactory.getHost(), Integer.valueOf(this.connectionFactory.getPort()));
                    protonConnection.close();
                }).setHandler2(completionHandler);
            } else {
                this.log.info("connection to server [{}:{}] already closed", this.connectionFactory.getHost(), Integer.valueOf(this.connectionFactory.getPort()));
                completionHandler.handle(Future.succeededFuture());
            }
        }
    }
}
