package org.eclipse.hono.client.impl;

import io.vertx.core.Future;
import io.vertx.core.Handler;
import io.vertx.core.Vertx;
import java.util.HashMap;
import java.util.Map;
import java.util.Objects;
import java.util.Optional;
import java.util.concurrent.atomic.AtomicBoolean;
import org.eclipse.hono.auth.Device;
import org.eclipse.hono.client.CommandConsumerFactory;
import org.eclipse.hono.client.CommandContext;
import org.eclipse.hono.client.CommandResponseSender;
import org.eclipse.hono.client.DisconnectListener;
import org.eclipse.hono.client.HonoConnection;
import org.eclipse.hono.client.MessageConsumer;
import org.eclipse.hono.client.ReconnectListener;
import org.eclipse.hono.client.ResourceConflictException;

/* loaded from: input_file:BOOT-INF/lib/hono-client-1.0-M2.jar:org/eclipse/hono/client/impl/CommandConsumerFactoryImpl.class */
public class CommandConsumerFactoryImpl extends AbstractHonoClientFactory implements CommandConsumerFactory {
    public static final long MIN_LIVENESS_CHECK_INTERVAL_MILLIS = 2000;
    private final CachingClientFactory<MessageConsumer> commandConsumerFactory;
    private final Map<String, Long> livenessChecks;

    public CommandConsumerFactoryImpl(HonoConnection honoConnection) {
        super(honoConnection);
        this.livenessChecks = new HashMap();
        this.commandConsumerFactory = new CachingClientFactory<>(messageConsumer -> {
            return true;
        });
    }

    @Override // org.eclipse.hono.client.impl.AbstractHonoClientFactory
    protected void onDisconnect() {
        this.commandConsumerFactory.clearState();
    }

    private String getKey(String str, String str2) {
        return Device.asAddress(str, str2);
    }

    @Override // org.eclipse.hono.client.CommandConsumerFactory
    public final Future<MessageConsumer> createCommandConsumer(String str, String str2, Handler<CommandContext> handler, Handler<Void> handler2) {
        Objects.requireNonNull(str);
        Objects.requireNonNull(str2);
        Objects.requireNonNull(handler);
        return this.connection.executeOrRunOnContext(future -> {
            String key = getKey(str, str2);
            if (this.commandConsumerFactory.getClient(key) == null) {
                this.commandConsumerFactory.getOrCreateClient(key, () -> {
                    return newCommandConsumer(str, str2, handler, handler2);
                }, future);
            } else {
                this.log.debug("cannot create concurrent command consumer [tenant: {}, device-id: {}]", str, str2);
                future.fail(new ResourceConflictException("message consumer already in use"));
            }
        });
    }

    @Override // org.eclipse.hono.client.CommandConsumerFactory
    public final Future<MessageConsumer> createCommandConsumer(String str, String str2, Handler<CommandContext> handler, Handler<Void> handler2, long j) {
        Objects.requireNonNull(str);
        Objects.requireNonNull(str2);
        Objects.requireNonNull(handler);
        if (j < 0) {
            throw new IllegalArgumentException("liveness check interval must be > 0");
        }
        return createCommandConsumer(str, str2, handler, handler2).map(messageConsumer -> {
            String key = getKey(str, str2);
            this.livenessChecks.put(key, Long.valueOf(this.connection.getVertx().setPeriodic(Math.max(2000L, j), newLivenessCheck(str, str2, key, handler, handler2))));
            return messageConsumer;
        });
    }

    Handler<Long> newLivenessCheck(String str, String str2, String str3, Handler<CommandContext> handler, Handler<Void> handler2) {
        AtomicBoolean atomicBoolean = new AtomicBoolean(false);
        return l -> {
            if (this.connection.isShutdown()) {
                this.connection.getVertx().cancelTimer(l.longValue());
            } else {
                this.connection.isConnected().map(r13 -> {
                    if (this.commandConsumerFactory.getClient(str3) != null) {
                        return null;
                    }
                    if (!atomicBoolean.compareAndSet(false, true)) {
                        this.log.debug("already trying to re-create command consumer [tenant: {}, device-id: {}], yielding ...", str, str2);
                        return null;
                    }
                    this.log.debug("trying to re-create command consumer [tenant: {}, device-id: {}]", str, str2);
                    createCommandConsumer(str, str2, handler, handler2).map(messageConsumer -> {
                        this.log.debug("successfully re-created command consumer [tenant: {}, device-id: {}]", str, str2);
                        return messageConsumer;
                    }).otherwise(th -> {
                        this.log.info("failed to re-create command consumer [tenant: {}, device-id: {}]: {}", str, str2, th.getMessage());
                        return null;
                    }).setHandler2(asyncResult -> {
                        atomicBoolean.compareAndSet(true, false);
                    });
                    return null;
                });
            }
        };
    }

    private Future<MessageConsumer> newCommandConsumer(String str, String str2, Handler<CommandContext> handler, Handler<Void> handler2) {
        String asAddress = Device.asAddress(str, str2);
        return CommandConsumer.create(this.connection, str, str2, handler, str3 -> {
            Optional ofNullable = Optional.ofNullable(this.livenessChecks.remove(asAddress));
            Vertx vertx = this.connection.getVertx();
            Objects.requireNonNull(vertx);
            ofNullable.ifPresent((v1) -> {
                r1.cancelTimer(v1);
            });
            this.commandConsumerFactory.removeClient(asAddress);
        }, str4 -> {
            this.commandConsumerFactory.removeClient(asAddress);
            handler2.handle(null);
        }).map(commandConsumer -> {
            return commandConsumer;
        });
    }

    @Override // org.eclipse.hono.client.CommandConsumerFactory
    public Future<CommandResponseSender> getCommandResponseSender(String str, String str2) {
        Objects.requireNonNull(str);
        Objects.requireNonNull(str2);
        return this.connection.executeOrRunOnContext(future -> {
            CommandResponseSenderImpl.create(this.connection, str, str2, str3 -> {
            }).setHandler2(future);
        });
    }

    @Override // org.eclipse.hono.client.CommandConsumerFactory
    @Deprecated
    public Future<Void> closeCommandConsumer(String str, String str2) {
        Objects.requireNonNull(str);
        Objects.requireNonNull(str2);
        return this.connection.executeOrRunOnContext(future -> {
            String key = getKey(str, str2);
            Optional ofNullable = Optional.ofNullable(this.livenessChecks.remove(key));
            Vertx vertx = this.connection.getVertx();
            Objects.requireNonNull(vertx);
            ofNullable.ifPresent((v1) -> {
                r1.cancelTimer(v1);
            });
            this.commandConsumerFactory.removeClient(key, messageConsumer -> {
                messageConsumer.close(future);
            });
        });
    }

    @Override // org.eclipse.hono.client.impl.AbstractHonoClientFactory, org.eclipse.hono.client.ConnectionLifecycle
    public /* bridge */ /* synthetic */ void disconnect(Handler handler) {
        super.disconnect(handler);
    }

    @Override // org.eclipse.hono.client.impl.AbstractHonoClientFactory, org.eclipse.hono.client.ConnectionLifecycle
    public /* bridge */ /* synthetic */ void disconnect() {
        super.disconnect();
    }

    @Override // org.eclipse.hono.client.impl.AbstractHonoClientFactory, org.eclipse.hono.client.ConnectionLifecycle
    public /* bridge */ /* synthetic */ Future connect() {
        return super.connect();
    }

    @Override // org.eclipse.hono.client.impl.AbstractHonoClientFactory, org.eclipse.hono.client.ConnectionLifecycle
    public /* bridge */ /* synthetic */ void addReconnectListener(ReconnectListener reconnectListener) {
        super.addReconnectListener(reconnectListener);
    }

    @Override // org.eclipse.hono.client.impl.AbstractHonoClientFactory, org.eclipse.hono.client.ConnectionLifecycle
    public /* bridge */ /* synthetic */ void addDisconnectListener(DisconnectListener disconnectListener) {
        super.addDisconnectListener(disconnectListener);
    }
}
