package org.eclipse.hono.client.impl;

import io.vertx.core.Future;
import io.vertx.core.Handler;
import io.vertx.core.Vertx;
import java.util.HashMap;
import java.util.Map;
import java.util.Objects;
import java.util.Optional;
import java.util.concurrent.atomic.AtomicBoolean;
import org.eclipse.hono.auth.Device;
import org.eclipse.hono.client.CommandConnection;
import org.eclipse.hono.client.CommandConsumer;
import org.eclipse.hono.client.CommandContext;
import org.eclipse.hono.client.CommandResponseSender;
import org.eclipse.hono.client.MessageConsumer;
import org.eclipse.hono.client.ResourceConflictException;
import org.eclipse.hono.config.ClientConfigProperties;
import org.eclipse.hono.connection.ConnectionFactory;

/* loaded from: input_file:BOOT-INF/lib/hono-client-0.9-M1.jar:org/eclipse/hono/client/impl/CommandConnectionImpl.class */
public class CommandConnectionImpl extends HonoClientImpl implements CommandConnection {
    public static final long MIN_LIVENESS_CHECK_INTERVAL_MILLIS = 2000;
    private final Map<String, MessageConsumer> commandConsumers;
    private final Map<String, Long> livenessChecks;

    public CommandConnectionImpl(Vertx vertx, ClientConfigProperties clientConfigProperties) {
        super(vertx, clientConfigProperties);
        this.commandConsumers = new HashMap();
        this.livenessChecks = new HashMap();
    }

    public CommandConnectionImpl(Vertx vertx, ConnectionFactory connectionFactory, ClientConfigProperties clientConfigProperties) {
        super(vertx, connectionFactory, clientConfigProperties);
        this.commandConsumers = new HashMap();
        this.livenessChecks = new HashMap();
    }

    /* JADX INFO: Access modifiers changed from: protected */
    @Override // org.eclipse.hono.client.impl.HonoClientImpl
    public void clearState() {
        super.clearState();
        this.commandConsumers.clear();
    }

    @Override // org.eclipse.hono.client.CommandConnection
    public final Future<MessageConsumer> createCommandConsumer(String str, String str2, Handler<CommandContext> handler, Handler<Void> handler2) {
        Objects.requireNonNull(str);
        Objects.requireNonNull(str2);
        Objects.requireNonNull(handler);
        return executeOrRunOnContext(future -> {
            String asAddress = Device.asAddress(str, str2);
            if (this.commandConsumers.get(asAddress) == null) {
                createConsumer(str, () -> {
                    return newCommandConsumer(str, str2, handler, handler2);
                }).map(messageConsumer -> {
                    this.commandConsumers.put(asAddress, messageConsumer);
                    return messageConsumer;
                }).setHandler2(future);
            } else {
                this.log.debug("cannot create concurrent command consumer [tenant: {}, device-id: {}]", str, str2);
                future.fail(new ResourceConflictException("message consumer already in use"));
            }
        });
    }

    @Override // org.eclipse.hono.client.CommandConnection
    public final Future<MessageConsumer> createCommandConsumer(String str, String str2, Handler<CommandContext> handler, Handler<Void> handler2, long j) {
        Objects.requireNonNull(str);
        Objects.requireNonNull(str2);
        Objects.requireNonNull(handler);
        if (j < 0) {
            throw new IllegalArgumentException("liveness check interval must be > 0");
        }
        return createCommandConsumer(str, str2, handler, handler2).map(messageConsumer -> {
            String asAddress = Device.asAddress(str, str2);
            this.livenessChecks.put(asAddress, Long.valueOf(this.vertx.setPeriodic(Math.max(2000L, j), newLivenessCheck(str, str2, asAddress, handler, handler2))));
            return messageConsumer;
        });
    }

    Handler<Long> newLivenessCheck(String str, String str2, String str3, Handler<CommandContext> handler, Handler<Void> handler2) {
        AtomicBoolean atomicBoolean = new AtomicBoolean(false);
        return l -> {
            if (isShutdown()) {
                this.vertx.cancelTimer(l.longValue());
                return;
            }
            if (!isConnectedInternal() || this.commandConsumers.containsKey(str3)) {
                return;
            }
            if (!atomicBoolean.compareAndSet(false, true)) {
                this.log.debug("already trying to re-create command consumer [tenant: {}, device-id: {}], yielding ...", str, str2);
            } else {
                this.log.debug("trying to re-create command consumer [tenant: {}, device-id: {}]", str, str2);
                createCommandConsumer(str, str2, handler, handler2).map(messageConsumer -> {
                    this.log.debug("successfully re-created command consumer [tenant: {}, device-id: {}]", str, str2);
                    return messageConsumer;
                }).otherwise(th -> {
                    this.log.info("failed to re-create command consumer [tenant: {}, device-id: {}]: {}", str, str2, th.getMessage());
                    return null;
                }).setHandler2(asyncResult -> {
                    atomicBoolean.compareAndSet(true, false);
                });
            }
        };
    }

    private Future<MessageConsumer> newCommandConsumer(String str, String str2, Handler<CommandContext> handler, Handler<Void> handler2) {
        return checkConnected().compose(r16 -> {
            Future future = Future.future();
            String asAddress = Device.asAddress(str, str2);
            CommandConsumer.create(this.context, this.clientConfigProperties, this.connection, str, str2, handler, str3 -> {
                Optional ofNullable = Optional.ofNullable(this.livenessChecks.remove(asAddress));
                Vertx vertx = this.vertx;
                Objects.requireNonNull(vertx);
                ofNullable.ifPresent((v1) -> {
                    r1.cancelTimer(v1);
                });
                this.commandConsumers.remove(asAddress);
            }, str4 -> {
                this.commandConsumers.remove(asAddress);
                handler2.handle(null);
            }, future, getTracer());
            return future;
        });
    }

    @Override // org.eclipse.hono.client.CommandConnection
    public Future<Void> closeCommandConsumer(String str, String str2) {
        Objects.requireNonNull(str);
        Objects.requireNonNull(str2);
        return executeOrRunOnContext(future -> {
            String asAddress = Device.asAddress(str, str2);
            Optional ofNullable = Optional.ofNullable(this.livenessChecks.remove(asAddress));
            Vertx vertx = this.vertx;
            Objects.requireNonNull(vertx);
            ofNullable.ifPresent((v1) -> {
                r1.cancelTimer(v1);
            });
            Optional.ofNullable(this.commandConsumers.remove(asAddress)).ifPresent(messageConsumer -> {
                messageConsumer.close(future);
            });
        });
    }

    @Override // org.eclipse.hono.client.CommandConnection
    public Future<CommandResponseSender> getCommandResponseSender(String str, String str2) {
        Objects.requireNonNull(str);
        Objects.requireNonNull(str2);
        return executeOrRunOnContext(future -> {
            checkConnected().setHandler2(asyncResult -> {
                if (asyncResult.succeeded()) {
                    CommandResponseSenderImpl.create(this.context, this.clientConfigProperties, this.connection, str, str2, str3 -> {
                    }, future, getTracer());
                } else {
                    future.fail(asyncResult.cause());
                }
            });
        });
    }
}
