package org.eclipse.hono.client.impl;

import io.vertx.core.Future;
import io.vertx.core.Handler;
import io.vertx.core.eventbus.Message;
import java.util.Objects;
import org.eclipse.hono.client.DisconnectListener;
import org.eclipse.hono.client.DownstreamSender;
import org.eclipse.hono.client.DownstreamSenderFactory;
import org.eclipse.hono.client.HonoConnection;
import org.eclipse.hono.client.ReconnectListener;
import org.eclipse.hono.util.Constants;

/* loaded from: input_file:BOOT-INF/lib/hono-client-1.3.0-M3.jar:org/eclipse/hono/client/impl/DownstreamSenderFactoryImpl.class */
public class DownstreamSenderFactoryImpl extends AbstractHonoClientFactory implements DownstreamSenderFactory {
    private final CachingClientFactory<DownstreamSender> clientFactory;

    public DownstreamSenderFactoryImpl(HonoConnection honoConnection) {
        super(honoConnection);
        this.clientFactory = new CachingClientFactory<>(honoConnection.getVertx(), downstreamSender -> {
            return downstreamSender.isOpen();
        });
        honoConnection.getVertx().eventBus().consumer(Constants.EVENT_BUS_ADDRESS_TENANT_TIMED_OUT, this::handleTenantTimeout);
    }

    @Override // org.eclipse.hono.client.impl.AbstractHonoClientFactory
    protected void onDisconnect() {
        this.clientFactory.clearState();
    }

    @Override // org.eclipse.hono.client.DownstreamSenderFactory
    public final Future<DownstreamSender> getOrCreateTelemetrySender(String str) {
        Objects.requireNonNull(str);
        return this.connection.isConnected(getDefaultConnectionCheckTimeout()).compose(r6 -> {
            return this.connection.executeOnContext(promise -> {
                this.clientFactory.getOrCreateClient(TelemetrySenderImpl.getTargetAddress(str, null), () -> {
                    return TelemetrySenderImpl.create(this.connection, str, str2 -> {
                        this.clientFactory.removeClient(TelemetrySenderImpl.getTargetAddress(str, null));
                    });
                }, promise);
            });
        });
    }

    @Override // org.eclipse.hono.client.DownstreamSenderFactory
    public final Future<DownstreamSender> getOrCreateEventSender(String str) {
        Objects.requireNonNull(str);
        return this.connection.isConnected(getDefaultConnectionCheckTimeout()).compose(r6 -> {
            return this.connection.executeOnContext(promise -> {
                this.clientFactory.getOrCreateClient(EventSenderImpl.getTargetAddress(str, null), () -> {
                    return EventSenderImpl.create(this.connection, str, str2 -> {
                        this.clientFactory.removeClient(EventSenderImpl.getTargetAddress(str, null));
                    });
                }, promise);
            });
        });
    }

    private void handleTenantTimeout(Message<String> message) {
        String targetAddress = TelemetrySenderImpl.getTargetAddress(message.body(), null);
        DownstreamSender client = this.clientFactory.getClient(targetAddress);
        if (client != null) {
            client.close(asyncResult -> {
                this.clientFactory.removeClient(targetAddress);
            });
        }
        String targetAddress2 = EventSenderImpl.getTargetAddress(message.body(), null);
        DownstreamSender client2 = this.clientFactory.getClient(targetAddress2);
        if (client2 != null) {
            client2.close(asyncResult2 -> {
                this.clientFactory.removeClient(targetAddress2);
            });
        }
    }

    @Override // org.eclipse.hono.client.impl.AbstractHonoClientFactory, org.eclipse.hono.client.ConnectionLifecycle
    public /* bridge */ /* synthetic */ void disconnect(Handler handler) {
        super.disconnect(handler);
    }

    @Override // org.eclipse.hono.client.impl.AbstractHonoClientFactory, org.eclipse.hono.client.ConnectionLifecycle
    public /* bridge */ /* synthetic */ void disconnect() {
        super.disconnect();
    }

    @Override // org.eclipse.hono.client.impl.AbstractHonoClientFactory, org.eclipse.hono.client.ConnectionLifecycle
    public /* bridge */ /* synthetic */ Future<HonoConnection> connect() {
        return super.connect();
    }

    @Override // org.eclipse.hono.client.impl.AbstractHonoClientFactory, org.eclipse.hono.client.ConnectionLifecycle
    public /* bridge */ /* synthetic */ void addReconnectListener(ReconnectListener<HonoConnection> reconnectListener) {
        super.addReconnectListener(reconnectListener);
    }

    @Override // org.eclipse.hono.client.impl.AbstractHonoClientFactory, org.eclipse.hono.client.ConnectionLifecycle
    public /* bridge */ /* synthetic */ void addDisconnectListener(DisconnectListener<HonoConnection> disconnectListener) {
        super.addDisconnectListener(disconnectListener);
    }
}
