package org.eclipse.hono.client.impl;

import com.fasterxml.jackson.core.JsonLocation;
import io.opentracing.Span;
import io.opentracing.SpanContext;
import io.opentracing.Tracer;
import io.opentracing.log.Fields;
import io.opentracing.propagation.Format;
import io.opentracing.tag.Tags;
import io.vertx.core.AsyncResult;
import io.vertx.core.Context;
import io.vertx.core.Future;
import io.vertx.core.Handler;
import io.vertx.proton.ProtonConnection;
import io.vertx.proton.ProtonDelivery;
import io.vertx.proton.ProtonQoS;
import io.vertx.proton.ProtonSender;
import java.util.HashMap;
import java.util.Objects;
import java.util.concurrent.atomic.AtomicBoolean;
import org.apache.qpid.proton.amqp.messaging.Accepted;
import org.apache.qpid.proton.amqp.messaging.Modified;
import org.apache.qpid.proton.amqp.messaging.Rejected;
import org.apache.qpid.proton.amqp.messaging.Released;
import org.apache.qpid.proton.amqp.transport.DeliveryState;
import org.apache.qpid.proton.message.Message;
import org.eclipse.hono.client.MessageSender;
import org.eclipse.hono.client.ServerErrorException;
import org.eclipse.hono.config.ClientConfigProperties;
import org.eclipse.hono.tracing.MessageAnnotationsInjectAdapter;
import org.eclipse.hono.tracing.TracingHelper;
import org.eclipse.hono.util.MessageHelper;
import org.eclipse.hono.util.TelemetryConstants;

/* loaded from: input_file:BOOT-INF/lib/hono-client-0.9-M2.jar:org/eclipse/hono/client/impl/TelemetrySenderImpl.class */
public final class TelemetrySenderImpl extends AbstractSender {
    TelemetrySenderImpl(ClientConfigProperties clientConfigProperties, ProtonSender protonSender, String str, String str2, Context context) {
        this(clientConfigProperties, protonSender, str, str2, context, null);
    }

    TelemetrySenderImpl(ClientConfigProperties clientConfigProperties, ProtonSender protonSender, String str, String str2, Context context, Tracer tracer) {
        super(clientConfigProperties, protonSender, str, str2, context, tracer);
    }

    public static String getTargetAddress(String str, String str2) {
        StringBuilder append = new StringBuilder(TelemetryConstants.TELEMETRY_ENDPOINT).append("/").append((String) Objects.requireNonNull(str));
        if (str2 != null && str2.length() > 0) {
            append.append("/").append(str2);
        }
        return append.toString();
    }

    @Override // org.eclipse.hono.client.MessageSender
    public String getEndpoint() {
        return TelemetryConstants.TELEMETRY_ENDPOINT;
    }

    @Override // org.eclipse.hono.client.impl.AbstractSender
    protected String getTo(String str) {
        return getTargetAddress(this.tenantId, str);
    }

    public static void create(Context context, ClientConfigProperties clientConfigProperties, ProtonConnection protonConnection, String str, String str2, Handler<String> handler, Handler<AsyncResult<MessageSender>> handler2, Tracer tracer) {
        Objects.requireNonNull(context);
        Objects.requireNonNull(protonConnection);
        Objects.requireNonNull(str);
        Objects.requireNonNull(handler2);
        String targetAddress = getTargetAddress(str, str2);
        createSender(context, clientConfigProperties, protonConnection, targetAddress, ProtonQoS.AT_LEAST_ONCE, handler).compose(protonSender -> {
            return Future.succeededFuture(new TelemetrySenderImpl(clientConfigProperties, protonSender, str, targetAddress, context, tracer));
        }).setHandler2(handler2);
    }

    @Override // org.eclipse.hono.client.MessageSender
    public Future<ProtonDelivery> sendAndWaitForOutcome(Message message) {
        return sendAndWaitForOutcome(message, null);
    }

    @Override // org.eclipse.hono.client.MessageSender
    public Future<ProtonDelivery> sendAndWaitForOutcome(Message message, SpanContext spanContext) {
        Objects.requireNonNull(message);
        Span startChildSpan = startChildSpan(spanContext, message);
        Tags.MESSAGE_BUS_DESTINATION.set(startChildSpan, this.targetAddress);
        startChildSpan.setTag(MessageHelper.APP_PROPERTY_TENANT_ID, this.tenantId);
        startChildSpan.setTag(MessageHelper.APP_PROPERTY_DEVICE_ID, MessageHelper.getDeviceId(message));
        this.tracer.inject(startChildSpan.context(), Format.Builtin.TEXT_MAP, new MessageAnnotationsInjectAdapter(message));
        if (!isRegistrationAssertionRequired()) {
            MessageHelper.getAndRemoveRegistrationAssertion(message);
        }
        return executeOrRunOnContext(future -> {
            if (!this.sender.sendQueueFull()) {
                sendMessageAndWaitForOutcome(message, startChildSpan).setHandler2(future.completer());
                return;
            }
            ServerErrorException serverErrorException = new ServerErrorException(503, "no credit available");
            logError(startChildSpan, serverErrorException);
            startChildSpan.finish();
            future.fail(serverErrorException);
        });
    }

    @Override // org.eclipse.hono.client.impl.AbstractSender
    protected Future<ProtonDelivery> sendMessage(Message message, Span span) {
        Objects.requireNonNull(message);
        Objects.requireNonNull(span);
        String format = String.format("%s-%d", getClass().getSimpleName(), Long.valueOf(MESSAGE_COUNTER.getAndIncrement()));
        message.setMessageId(format);
        HashMap hashMap = new HashMap(3);
        hashMap.put(TracingHelper.TAG_MESSAGE_ID.getKey(), format);
        hashMap.put(TracingHelper.TAG_CREDIT.getKey(), Integer.valueOf(this.sender.getCredit()));
        hashMap.put(TracingHelper.TAG_QOS.getKey(), this.sender.getQoS().toString());
        span.log(hashMap);
        AtomicBoolean atomicBoolean = new AtomicBoolean(false);
        Long valueOf = this.config.getSendMessageTimeout() > 0 ? Long.valueOf(this.context.owner().setTimer(this.config.getSendMessageTimeout(), l -> {
            if (atomicBoolean.compareAndSet(false, true)) {
                ServerErrorException serverErrorException = new ServerErrorException(503, "waiting for delivery update timed out after " + this.config.getSendMessageTimeout() + "ms");
                this.LOG.debug("waiting for delivery update timed out for message [message ID: {}] after {}ms", format, Long.valueOf(this.config.getSendMessageTimeout()));
                TracingHelper.logError(span, serverErrorException.getMessage());
                Tags.HTTP_STATUS.set(span, (Integer) 503);
                span.finish();
            }
        })) : null;
        ProtonDelivery send = this.sender.send(message, protonDelivery -> {
            if (valueOf != null) {
                this.context.owner().cancelTimer(valueOf.longValue());
            }
            DeliveryState remoteState = protonDelivery.getRemoteState();
            if (atomicBoolean.get()) {
                this.LOG.debug("ignoring received delivery update for message [message ID: {}]: waiting for the update has already timed out", format);
            } else if (!protonDelivery.remotelySettled()) {
                this.LOG.warn("peer did not settle message [message ID: {}, remote state: {}]", format, remoteState.getClass().getSimpleName());
                TracingHelper.logError(span, new ServerErrorException(JsonLocation.MAX_CONTENT_SNIPPET, "peer did not settle message, failing delivery"));
            } else if (Accepted.class.isInstance(remoteState)) {
                this.LOG.trace("message [message ID: {}] accepted by peer", format);
                span.log("message accepted by peer");
                Tags.HTTP_STATUS.set(span, (Integer) 202);
            } else {
                HashMap hashMap2 = new HashMap();
                if (Rejected.class.isInstance(remoteState)) {
                    Rejected rejected = (Rejected) protonDelivery.getRemoteState();
                    Tags.HTTP_STATUS.set(span, (Integer) 400);
                    if (rejected.getError() == null) {
                        this.LOG.debug("message [message ID: {}] rejected by peer", format);
                        hashMap2.put(Fields.MESSAGE, "message rejected by peer");
                    } else {
                        this.LOG.debug("message [message ID: {}] rejected by peer: {}, {}", format, rejected.getError().getCondition(), rejected.getError().getDescription());
                        hashMap2.put(Fields.MESSAGE, String.format("message rejected by peer: %s, %s", rejected.getError().getCondition(), rejected.getError().getDescription()));
                    }
                } else if (Released.class.isInstance(remoteState)) {
                    this.LOG.debug("message [message ID: {}] not accepted by peer, remote state: {}", format, remoteState.getClass().getSimpleName());
                    Tags.HTTP_STATUS.set(span, (Integer) 503);
                    hashMap2.put(Fields.MESSAGE, "message not accepted by peer, remote state: " + remoteState);
                } else if (Modified.class.isInstance(remoteState)) {
                    Modified modified = (Modified) protonDelivery.getRemoteState();
                    this.LOG.debug("message [message ID: {}] not accepted by peer, remote state: {}", format, modified);
                    Tags.HTTP_STATUS.set(span, Integer.valueOf(modified.getUndeliverableHere().booleanValue() ? 404 : 503));
                    hashMap2.put(Fields.MESSAGE, "message not accepted by peer, remote state: " + remoteState);
                }
                TracingHelper.logError(span, hashMap2);
            }
            span.finish();
        });
        this.LOG.trace("sent message [ID: {}], remaining credit: {}, queued messages: {}", format, Integer.valueOf(this.sender.getCredit()), Integer.valueOf(this.sender.getQueued()));
        return Future.succeededFuture(send);
    }

    @Override // org.eclipse.hono.client.impl.AbstractSender
    protected Span startSpan(SpanContext spanContext, Message message) {
        if (this.tracer == null) {
            throw new IllegalStateException("no tracer configured");
        }
        Span newFollowingSpan = newFollowingSpan(spanContext, "forward Telemetry data");
        Tags.SPAN_KIND.set(newFollowingSpan, Tags.SPAN_KIND_PRODUCER);
        return newFollowingSpan;
    }

    private Span startChildSpan(SpanContext spanContext, Message message) {
        if (this.tracer == null) {
            throw new IllegalStateException("no tracer configured");
        }
        Span newChildSpan = newChildSpan(spanContext, "forward Telemetry data");
        Tags.SPAN_KIND.set(newChildSpan, Tags.SPAN_KIND_PRODUCER);
        return newChildSpan;
    }
}
