package org.eclipse.hono.client.impl;

import io.opentracing.Span;
import io.opentracing.SpanContext;
import io.vertx.core.AsyncResult;
import io.vertx.core.CompositeFuture;
import io.vertx.core.Future;
import io.vertx.core.Handler;
import io.vertx.core.Promise;
import io.vertx.proton.ProtonReceiver;
import java.util.Collection;
import java.util.HashMap;
import java.util.Map;
import java.util.Objects;
import java.util.Optional;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.Supplier;
import org.apache.qpid.proton.amqp.messaging.Rejected;
import org.apache.qpid.proton.amqp.transport.ErrorCondition;
import org.eclipse.hono.auth.Device;
import org.eclipse.hono.client.Command;
import org.eclipse.hono.client.CommandConsumerFactory;
import org.eclipse.hono.client.CommandContext;
import org.eclipse.hono.client.CommandResponseSender;
import org.eclipse.hono.client.DelegatedCommandSender;
import org.eclipse.hono.client.DisconnectListener;
import org.eclipse.hono.client.GatewayMapper;
import org.eclipse.hono.client.HonoConnection;
import org.eclipse.hono.client.MessageConsumer;
import org.eclipse.hono.client.ReconnectListener;
import org.eclipse.hono.client.ResourceConflictException;
import org.eclipse.hono.tracing.TracingHelper;
import org.eclipse.hono.util.Constants;
import org.eclipse.hono.util.MessageHelper;
import org.eclipse.hono.util.ResourceIdentifier;

/* loaded from: input_file:BOOT-INF/lib/hono-client-1.1.0-M1.jar:org/eclipse/hono/client/impl/CommandConsumerFactoryImpl.class */
public class CommandConsumerFactoryImpl extends AbstractHonoClientFactory implements CommandConsumerFactory {
    public static final long MIN_LIVENESS_CHECK_INTERVAL_MILLIS = 2000;
    private static final Boolean FORCED_COMMAND_REROUTING_ENABLED = Boolean.valueOf(System.getProperty("enableForcedCommandRerouting", "false"));
    private static final String FORCE_COMMAND_REROUTING_APPLICATION_PROPERTY = "force-command-rerouting";
    private final CachingClientFactory<DestinationCommandConsumer> destinationCommandConsumerFactory;
    private final CachingClientFactory<MessageConsumer> mappingAndDelegatingCommandConsumerFactory;
    private final CachingClientFactory<DelegatedCommandSender> delegatedCommandSenderFactory;
    private final Map<String, LivenessCheckData> destinationCommandConsumerLivenessChecks;
    private final GatewayMapper gatewayMapper;

    /* JADX INFO: Access modifiers changed from: package-private */
    /* loaded from: input_file:BOOT-INF/lib/hono-client-1.1.0-M1.jar:org/eclipse/hono/client/impl/CommandConsumerFactoryImpl$LivenessCheckData.class */
    public static class LivenessCheckData {
        private final long timerId;
        private Supplier<Collection<CommandHandlerWrapper>> commandHandlersSupplier;

        LivenessCheckData(long j, Supplier<Collection<CommandHandlerWrapper>> supplier) {
            this.timerId = ((Long) Objects.requireNonNull(Long.valueOf(j))).longValue();
            this.commandHandlersSupplier = (Supplier) Objects.requireNonNull(supplier);
        }

        public long getTimerId() {
            return this.timerId;
        }

        public Collection<CommandHandlerWrapper> getCommandHandlers() {
            return this.commandHandlersSupplier.get();
        }

        public void setCommandHandlersSupplier(Supplier<Collection<CommandHandlerWrapper>> supplier) {
            this.commandHandlersSupplier = (Supplier) Objects.requireNonNull(supplier);
        }
    }

    public CommandConsumerFactoryImpl(HonoConnection honoConnection, GatewayMapper gatewayMapper) {
        super(honoConnection);
        this.destinationCommandConsumerLivenessChecks = new HashMap();
        this.gatewayMapper = (GatewayMapper) Objects.requireNonNull(gatewayMapper);
        this.destinationCommandConsumerFactory = new CachingClientFactory<>(honoConnection.getVertx(), destinationCommandConsumer -> {
            return destinationCommandConsumer.isAlive();
        });
        this.mappingAndDelegatingCommandConsumerFactory = new CachingClientFactory<>(honoConnection.getVertx(), messageConsumer -> {
            return true;
        });
        this.delegatedCommandSenderFactory = new CachingClientFactory<>(honoConnection.getVertx(), delegatedCommandSender -> {
            return delegatedCommandSender.isOpen();
        });
    }

    @Override // org.eclipse.hono.client.impl.AbstractHonoClientFactory
    protected void onDisconnect() {
        this.destinationCommandConsumerFactory.clearState();
        this.mappingAndDelegatingCommandConsumerFactory.clearState();
    }

    private String getGatewayOrDeviceKey(String str, String str2, String str3) {
        return Device.asAddress(str, str3 != null ? str3 : str2);
    }

    private String getDeviceKey(String str, String str2) {
        return Device.asAddress(str, str2);
    }

    @Override // org.eclipse.hono.client.CommandConsumerFactory
    public final Future<MessageConsumer> createCommandConsumer(String str, String str2, Handler<CommandContext> handler, Handler<Void> handler2) {
        Objects.requireNonNull(str);
        Objects.requireNonNull(str2);
        Objects.requireNonNull(handler);
        return doCreateCommandConsumer(str, str2, null, handler, handler2, null);
    }

    @Override // org.eclipse.hono.client.CommandConsumerFactory
    public final Future<MessageConsumer> createCommandConsumer(String str, String str2, String str3, Handler<CommandContext> handler, Handler<Void> handler2) {
        Objects.requireNonNull(str);
        Objects.requireNonNull(str2);
        Objects.requireNonNull(str3);
        Objects.requireNonNull(handler);
        return doCreateCommandConsumer(str, str2, str3, handler, handler2, null);
    }

    @Override // org.eclipse.hono.client.CommandConsumerFactory
    public final Future<MessageConsumer> createCommandConsumer(String str, String str2, Handler<CommandContext> handler, Handler<Void> handler2, long j) {
        Objects.requireNonNull(str);
        Objects.requireNonNull(str2);
        Objects.requireNonNull(handler);
        if (j < 0) {
            throw new IllegalArgumentException("liveness check interval must be > 0");
        }
        return doCreateCommandConsumer(str, str2, null, handler, handler2, Long.valueOf(j));
    }

    @Override // org.eclipse.hono.client.CommandConsumerFactory
    public final Future<MessageConsumer> createCommandConsumer(String str, String str2, String str3, Handler<CommandContext> handler, Handler<Void> handler2, long j) {
        Objects.requireNonNull(str);
        Objects.requireNonNull(str2);
        Objects.requireNonNull(str3);
        Objects.requireNonNull(handler);
        if (j < 0) {
            throw new IllegalArgumentException("liveness check interval must be > 0");
        }
        return doCreateCommandConsumer(str, str2, str3, handler, handler2, Long.valueOf(j));
    }

    private Future<MessageConsumer> doCreateCommandConsumer(String str, String str2, String str3, Handler<CommandContext> handler, Handler<Void> handler2, Long l) {
        this.log.trace("create command consumer [tenant-id: {}, device-id: {}, gateway-id: {}]", str, str2, str3);
        return this.connection.executeOnContext(promise -> {
            String str4 = str3 != null ? str3 : str2;
            String gatewayOrDeviceKey = getGatewayOrDeviceKey(str, str2, str3);
            ensureNoConflictingConsumerExists(str, str2, str3, gatewayOrDeviceKey).compose(r17 -> {
                Promise promise = Promise.promise();
                this.destinationCommandConsumerFactory.getOrCreateClient(gatewayOrDeviceKey, () -> {
                    return newDestinationCommandConsumer(str, str4);
                }, promise);
                Future map = promise.future().compose(destinationCommandConsumer -> {
                    return destinationCommandConsumer.addDeviceSpecificCommandHandler(str2, str3, handler, handler2);
                }).map(r8 -> {
                    return new DeviceSpecificCommandConsumer(() -> {
                        return this.destinationCommandConsumerFactory.getClient(gatewayOrDeviceKey);
                    }, str2);
                });
                return CompositeFuture.all(map, getOrCreateMappingAndDelegatingCommandConsumer(str)).map(compositeFuture -> {
                    if (l != null) {
                        DestinationCommandConsumer destinationCommandConsumer2 = (DestinationCommandConsumer) promise.future().result();
                        registerLivenessCheck(str, str4, () -> {
                            return destinationCommandConsumer2.getCommandHandlers();
                        }, l.longValue());
                    }
                    return (MessageConsumer) map.result();
                });
            }).setHandler2(promise);
        });
    }

    private Future<Void> ensureNoConflictingConsumerExists(String str, String str2, String str3, String str4) {
        Promise promise = Promise.promise();
        DestinationCommandConsumer client = this.destinationCommandConsumerFactory.getClient(str4);
        if (client == null) {
            promise.complete();
        } else if (!client.isAlive()) {
            this.log.debug("cannot create command consumer, existing consumer not properly closed yet [tenant: {}, device-id: {}]", str, str2);
            promise.fail(new ResourceConflictException("message consumer already in use"));
        } else if (client.containsCommandHandler(str2)) {
            this.log.debug("cannot create concurrent command consumer [tenant: {}, device-id: {}]", str, str2);
            promise.fail(new ResourceConflictException("message consumer already in use"));
        } else if (str3 != null) {
            this.log.trace("gateway command consumer already exists, will add device handler to that [tenant: {}, gateway-id: {}, device-id: {}]", str, str3, str2);
            promise.complete();
        } else {
            this.log.trace("gateway command consumer with a device specific handler already exists, will add handler for all gateway devices [tenant: {}, gateway-id: {}]", str, str2);
            promise.complete();
        }
        return promise.future();
    }

    private Future<DestinationCommandConsumer> newDestinationCommandConsumer(String str, String str2) {
        String deviceKey = getDeviceKey(str, str2);
        return DestinationCommandConsumer.create(this.connection, str, str2, str3 -> {
            Optional.ofNullable(this.destinationCommandConsumerLivenessChecks.remove(deviceKey)).ifPresent(livenessCheckData -> {
                this.connection.getVertx().cancelTimer(livenessCheckData.getTimerId());
            });
            this.destinationCommandConsumerFactory.removeClient(deviceKey);
        }, str4 -> {
            this.destinationCommandConsumerFactory.removeClient(deviceKey);
        });
    }

    private Future<MessageConsumer> getOrCreateMappingAndDelegatingCommandConsumer(String str) {
        Objects.requireNonNull(str);
        return this.connection.executeOnContext(promise -> {
            MessageConsumer client = this.mappingAndDelegatingCommandConsumerFactory.getClient(str);
            if (client != null) {
                promise.complete(client);
            } else {
                this.mappingAndDelegatingCommandConsumerFactory.getOrCreateClient(str, () -> {
                    return newMappingAndDelegatingCommandConsumer(str);
                }, promise);
            }
        });
    }

    private Future<MessageConsumer> newMappingAndDelegatingCommandConsumer(String str) {
        AtomicReference atomicReference = new AtomicReference();
        DelegateViaDownstreamPeerCommandHandler delegateViaDownstreamPeerCommandHandler = new DelegateViaDownstreamPeerCommandHandler((str2, str3) -> {
            return createDelegatedCommandSender(str2, str3);
        });
        GatewayMappingCommandHandler gatewayMappingCommandHandler = new GatewayMappingCommandHandler(this.gatewayMapper, commandContext -> {
            CommandHandlerWrapper commandHandlerWrapper = null;
            DestinationCommandConsumer client = this.destinationCommandConsumerFactory.getClient(getDeviceKey(str, commandContext.getCommand().getDeviceId()));
            if (client != null) {
                commandHandlerWrapper = client.getCommandHandlerOrDefault(commandContext.getCommand().getOriginalDeviceId());
            }
            if (commandHandlerWrapper != null && isForcedCommandReroutingSet(commandContext)) {
                this.log.debug("forced command rerouting is set, skip usage of local {} for {}", commandHandlerWrapper, commandContext.getCommand());
                commandHandlerWrapper = null;
            }
            if (commandHandlerWrapper == null) {
                delegateViaDownstreamPeerCommandHandler.handle(commandContext);
            } else {
                this.log.trace("use local {} for {}", commandHandlerWrapper, commandContext.getCommand());
                commandHandlerWrapper.handleCommand(commandContext);
            }
        });
        return MappingAndDelegatingCommandConsumer.create(this.connection, str, (protonDelivery, message) -> {
            String resourceId = message.getAddress() != null ? ResourceIdentifier.fromString(message.getAddress()).getResourceId() : null;
            if (resourceId == null) {
                this.log.debug("address of command message is invalid: {}", message.getAddress());
                Rejected rejected = new Rejected();
                rejected.setError(new ErrorCondition(Constants.AMQP_BAD_REQUEST, "malformed command message"));
                protonDelivery.disposition(rejected, true);
                return;
            }
            Command from = Command.from(message, str, resourceId);
            SpanContext extractSpanContext = TracingHelper.extractSpanContext(this.connection.getTracer(), message);
            Span createSpan = CommandConsumer.createSpan("delegate and send command", str, resourceId, null, this.connection.getTracer(), extractSpanContext);
            CommandConsumer.logReceivedCommandToSpan(from, createSpan);
            CommandContext from2 = CommandContext.from(from, protonDelivery, (ProtonReceiver) atomicReference.get(), createSpan);
            if (from.isValid()) {
                gatewayMappingCommandHandler.handle(from2);
            } else {
                this.gatewayMapper.getMappedGatewayDevice(str, resourceId, extractSpanContext).setHandler2(asyncResult -> {
                    DestinationCommandConsumer client = this.destinationCommandConsumerFactory.getClient(getDeviceKey(str, asyncResult.succeeded() ? (String) asyncResult.result() : resourceId));
                    CommandHandlerWrapper commandHandlerWrapper = null;
                    if (client != null) {
                        commandHandlerWrapper = client.getCommandHandlerOrDefault(resourceId);
                    }
                    if (commandHandlerWrapper != null) {
                        commandHandlerWrapper.handleCommand(from2);
                    } else {
                        this.log.debug("command message is invalid: {}", from);
                        from2.reject(new ErrorCondition(Constants.AMQP_BAD_REQUEST, "malformed command message"));
                    }
                });
            }
        }, str4 -> {
            this.mappingAndDelegatingCommandConsumerFactory.removeClient(str);
        }, str5 -> {
            this.mappingAndDelegatingCommandConsumerFactory.removeClient(str);
        }, atomicReference).map(mappingAndDelegatingCommandConsumer -> {
            return mappingAndDelegatingCommandConsumer;
        });
    }

    private boolean isForcedCommandReroutingSet(CommandContext commandContext) {
        if (!FORCED_COMMAND_REROUTING_ENABLED.booleanValue() || !commandContext.getCommand().isValid()) {
            return false;
        }
        return Boolean.TRUE.equals(MessageHelper.getApplicationProperty(commandContext.getCommand().getCommandMessage().getApplicationProperties(), FORCE_COMMAND_REROUTING_APPLICATION_PROPERTY, Boolean.class));
    }

    private Future<DelegatedCommandSender> createDelegatedCommandSender(String str, String str2) {
        Objects.requireNonNull(str);
        return this.connection.executeOnContext(promise -> {
            this.delegatedCommandSenderFactory.createClient(() -> {
                return DelegatedCommandSenderImpl.create(this.connection, str, str2, null);
            }, promise);
        });
    }

    private void registerLivenessCheck(String str, String str2, Supplier<Collection<CommandHandlerWrapper>> supplier, long j) {
        this.destinationCommandConsumerLivenessChecks.compute(getDeviceKey(str, str2), (str3, livenessCheckData) -> {
            if (livenessCheckData != null) {
                livenessCheckData.setCommandHandlersSupplier(supplier);
                return livenessCheckData;
            }
            return new LivenessCheckData(this.connection.getVertx().setPeriodic(Math.max(2000L, j), newLivenessCheck(str, str2)), supplier);
        });
    }

    Handler<Long> newLivenessCheck(String str, String str2) {
        String deviceKey = getDeviceKey(str, str2);
        AtomicBoolean atomicBoolean = new AtomicBoolean(false);
        AtomicBoolean atomicBoolean2 = new AtomicBoolean(false);
        return l -> {
            LivenessCheckData livenessCheckData = this.destinationCommandConsumerLivenessChecks.get(deviceKey);
            if (this.connection.isShutdown() || livenessCheckData == null) {
                this.connection.getVertx().cancelTimer(l.longValue());
            } else {
                this.connection.isConnected().map(r14 -> {
                    if (this.destinationCommandConsumerFactory.getClient(deviceKey) == null) {
                        if (atomicBoolean.compareAndSet(false, true)) {
                            this.log.debug("trying to re-create destination command consumer [tenant: {}, device-id: {}]", str, str2);
                            Promise promise = Promise.promise();
                            this.destinationCommandConsumerFactory.getOrCreateClient(deviceKey, () -> {
                                return newDestinationCommandConsumer(str, str2);
                            }, promise);
                            promise.future().map(destinationCommandConsumer -> {
                                livenessCheckData.getCommandHandlers().forEach(commandHandlerWrapper -> {
                                    this.log.debug("adding {} to created destination command consumer [tenant: {}, device-id: {}]", commandHandlerWrapper, str, str2);
                                    destinationCommandConsumer.addDeviceSpecificCommandHandler(commandHandlerWrapper);
                                });
                                livenessCheckData.setCommandHandlersSupplier(() -> {
                                    return destinationCommandConsumer.getCommandHandlers();
                                });
                                atomicBoolean.compareAndSet(true, false);
                                return destinationCommandConsumer;
                            }).otherwise(th -> {
                                this.log.info("failed to re-create destination command consumer [tenant: {}, device-id: {}]: {}", str, str2, th.getMessage());
                                return null;
                            }).setHandler2(asyncResult -> {
                                atomicBoolean.compareAndSet(true, false);
                            });
                        } else {
                            this.log.debug("already trying to re-create destination command consumer [tenant: {}, device-id: {}], yielding ...", str, str2);
                        }
                    }
                    if (this.mappingAndDelegatingCommandConsumerFactory.getClient(str) != null) {
                        return null;
                    }
                    if (!atomicBoolean2.compareAndSet(false, true)) {
                        this.log.debug("already trying to re-create MappingAndDelegatingCommandConsumer [tenant: {}], yielding ...", str);
                        return null;
                    }
                    this.log.debug("trying to re-create MappingAndDelegatingCommandConsumer [tenant: {}]", str);
                    getOrCreateMappingAndDelegatingCommandConsumer(str).map(messageConsumer -> {
                        this.log.debug("successfully re-created MappingAndDelegatingCommandConsumer [tenant: {}]", str);
                        return messageConsumer;
                    }).otherwise(th2 -> {
                        this.log.info("failed to re-create MappingAndDelegatingCommandConsumer [tenant: {}]: {}", str, th2.getMessage());
                        return null;
                    }).setHandler2(asyncResult2 -> {
                        atomicBoolean2.compareAndSet(true, false);
                    });
                    return null;
                });
            }
        };
    }

    @Override // org.eclipse.hono.client.CommandConsumerFactory
    public Future<CommandResponseSender> getCommandResponseSender(String str, String str2) {
        Objects.requireNonNull(str);
        Objects.requireNonNull(str2);
        return this.connection.executeOnContext(promise -> {
            CommandResponseSenderImpl.create(this.connection, str, str2, str3 -> {
            }).setHandler2(promise);
        });
    }

    @Override // org.eclipse.hono.client.CommandConsumerFactory
    @Deprecated
    public Future<CommandResponseSender> getLegacyCommandResponseSender(String str, String str2) {
        Objects.requireNonNull(str);
        Objects.requireNonNull(str2);
        return this.connection.executeOnContext(promise -> {
            LegacyCommandResponseSenderImpl.create(this.connection, str, str2, str3 -> {
            }).setHandler2(promise);
        });
    }

    Map<String, LivenessCheckData> getDestinationCommandConsumerLivenessChecks() {
        return this.destinationCommandConsumerLivenessChecks;
    }

    @Override // org.eclipse.hono.client.impl.AbstractHonoClientFactory, org.eclipse.hono.client.ConnectionLifecycle
    public Future<HonoConnection> connect() {
        Future<HonoConnection> connect = super.connect();
        return CompositeFuture.all(connect, this.gatewayMapper.connect()).map(compositeFuture -> {
            return (HonoConnection) connect.result();
        });
    }

    @Override // org.eclipse.hono.client.impl.AbstractHonoClientFactory, org.eclipse.hono.client.ConnectionLifecycle
    public void disconnect() {
        super.disconnect();
        this.gatewayMapper.disconnect();
    }

    @Override // org.eclipse.hono.client.impl.AbstractHonoClientFactory, org.eclipse.hono.client.ConnectionLifecycle
    public void disconnect(Handler<AsyncResult<Void>> handler) {
        Promise promise = Promise.promise();
        super.disconnect(promise);
        Promise promise2 = Promise.promise();
        this.gatewayMapper.disconnect(promise2);
        CompositeFuture.all(promise.future(), promise2.future()).map(compositeFuture -> {
            return (Void) promise.future().result();
        }).setHandler2(handler);
    }

    @Override // org.eclipse.hono.client.impl.AbstractHonoClientFactory, org.eclipse.hono.client.ConnectionLifecycle
    public /* bridge */ /* synthetic */ void addReconnectListener(ReconnectListener<HonoConnection> reconnectListener) {
        super.addReconnectListener(reconnectListener);
    }

    @Override // org.eclipse.hono.client.impl.AbstractHonoClientFactory, org.eclipse.hono.client.ConnectionLifecycle
    public /* bridge */ /* synthetic */ void addDisconnectListener(DisconnectListener<HonoConnection> disconnectListener) {
        super.addDisconnectListener(disconnectListener);
    }
}
