package org.eclipse.hono.cli.app;

import io.vertx.core.AsyncResult;
import io.vertx.core.CompositeFuture;
import io.vertx.core.Future;
import io.vertx.core.Handler;
import java.util.ArrayList;
import java.util.List;
import java.util.Objects;
import java.util.function.BiConsumer;
import javax.annotation.PostConstruct;
import org.apache.qpid.proton.message.Message;
import org.eclipse.hono.client.HonoConnection;
import org.eclipse.hono.util.MessageHelper;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Profile;
import org.springframework.stereotype.Component;

@Profile({"receiver"})
@Component
/* loaded from: input_file:BOOT-INF/classes/org/eclipse/hono/cli/app/Receiver.class */
public class Receiver extends AbstractApplicationClient {
    private static final String TYPE_TELEMETRY = "telemetry";
    private static final String TYPE_EVENT = "event";
    private static final String TYPE_ALL = "all";

    @Value("${message.type}")
    protected String messageType;
    private BiConsumer<String, Message> messageHandler = (str, message) -> {
        handleMessage(str, message);
    };

    /* JADX INFO: Access modifiers changed from: package-private */
    public void setMessageHandler(BiConsumer<String, Message> biConsumer) {
        this.messageHandler = (BiConsumer) Objects.requireNonNull(biConsumer);
    }

    @PostConstruct
    Future<CompositeFuture> start() {
        return this.clientFactory.connect().compose(honoConnection -> {
            this.clientFactory.addReconnectListener(this::createConsumer);
            return createConsumer(honoConnection);
        }).setHandler2(this::handleCreateConsumerStatus);
    }

    private CompositeFuture createConsumer(HonoConnection honoConnection) {
        Handler<Void> handler = r8 -> {
            this.log.info("close handler of consumer is called");
            this.vertx.setTimer(this.connectionRetryInterval, l -> {
                this.log.info("attempting to re-open the consumer link ...");
                createConsumer(honoConnection);
            });
        };
        ArrayList arrayList = new ArrayList();
        if (this.messageType.equals("event") || this.messageType.equals(TYPE_ALL)) {
            arrayList.add(this.clientFactory.createEventConsumer(this.tenantId, message -> {
                this.messageHandler.accept("event", message);
            }, handler));
        }
        if (this.messageType.equals("telemetry") || this.messageType.equals(TYPE_ALL)) {
            arrayList.add(this.clientFactory.createTelemetryConsumer(this.tenantId, message2 -> {
                this.messageHandler.accept("telemetry", message2);
            }, handler));
        }
        if (arrayList.isEmpty()) {
            arrayList.add(Future.failedFuture(String.format("Invalid message type [\"%s\"]. Valid types are \"telemetry\", \"event\" or \"all\"", this.messageType)));
        }
        return CompositeFuture.all(arrayList);
    }

    private void handleMessage(String str, Message message) {
        this.log.info("received {} message [device: {}, content-type: {}]: {}", str, MessageHelper.getDeviceId(message), message.getContentType(), MessageHelper.getPayload(message));
        if (message.getApplicationProperties() != null) {
            this.log.info("... with application properties: {}", message.getApplicationProperties().getValue());
        }
    }

    private void handleCreateConsumerStatus(AsyncResult<CompositeFuture> asyncResult) {
        if (asyncResult.succeeded()) {
            this.log.info("Receiver [tenant: {}, mode: {}] created successfully, hit ctrl-c to exit", this.tenantId, this.messageType);
        } else {
            this.log.error("Error occurred during initialization of receiver: {}", asyncResult.cause().getMessage());
            this.vertx.close();
        }
    }
}
