package org.eclipse.hono.client.impl;

import com.fasterxml.jackson.core.JsonLocation;
import io.opentracing.Span;
import io.opentracing.SpanContext;
import io.opentracing.tag.Tags;
import io.vertx.core.AsyncResult;
import io.vertx.core.Future;
import io.vertx.core.Handler;
import io.vertx.core.Promise;
import io.vertx.core.buffer.Buffer;
import io.vertx.proton.ProtonDelivery;
import io.vertx.proton.ProtonHelper;
import io.vertx.proton.ProtonQoS;
import io.vertx.proton.ProtonReceiver;
import io.vertx.proton.ProtonSender;
import java.time.Duration;
import java.util.Arrays;
import java.util.HashMap;
import java.util.Map;
import java.util.Objects;
import java.util.Optional;
import java.util.UUID;
import java.util.function.Function;
import org.apache.qpid.proton.amqp.messaging.Accepted;
import org.apache.qpid.proton.amqp.messaging.ApplicationProperties;
import org.apache.qpid.proton.amqp.messaging.Modified;
import org.apache.qpid.proton.amqp.messaging.Rejected;
import org.apache.qpid.proton.amqp.messaging.Released;
import org.apache.qpid.proton.amqp.transport.DeliveryState;
import org.apache.qpid.proton.message.Message;
import org.eclipse.hono.cache.ExpiringValueCache;
import org.eclipse.hono.client.ClientErrorException;
import org.eclipse.hono.client.HonoConnection;
import org.eclipse.hono.client.RequestResponseClient;
import org.eclipse.hono.client.RequestResponseClientConfigProperties;
import org.eclipse.hono.client.ServerErrorException;
import org.eclipse.hono.client.ServiceInvocationException;
import org.eclipse.hono.client.StatusCodeMapper;
import org.eclipse.hono.config.ClientConfigProperties;
import org.eclipse.hono.tracing.TracingHelper;
import org.eclipse.hono.util.CacheDirective;
import org.eclipse.hono.util.MessageHelper;
import org.eclipse.hono.util.RequestResponseResult;
import org.eclipse.hono.util.TriTuple;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.PropertyAccessor;

/* loaded from: input_file:BOOT-INF/lib/hono-client-1.1.0-M1.jar:org/eclipse/hono/client/impl/AbstractRequestResponseClient.class */
public abstract class AbstractRequestResponseClient<R extends RequestResponseResult<?>> extends AbstractHonoClient implements RequestResponseClient {
    private static final Logger LOG = LoggerFactory.getLogger((Class<?>) AbstractRequestResponseClient.class);
    private static final int[] CACHEABLE_STATUS_CODES = {ClientConfigProperties.DEFAULT_INITIAL_CREDITS, 203, 206, 300, 301, 410};
    protected final String linkTargetAddress;
    private final Map<Object, TriTuple<Handler<AsyncResult<R>>, Object, Span>> replyMap;
    private Handler<Void> drainHandler;
    private final String replyToAddress;
    private final String tenantId;
    private ExpiringValueCache<Object, R> responseCache;
    private long requestTimeoutMillis;

    /* JADX INFO: Access modifiers changed from: protected */
    public AbstractRequestResponseClient(HonoConnection honoConnection, String str) {
        this(honoConnection, str, UUID.randomUUID().toString());
    }

    /* JADX INFO: Access modifiers changed from: protected */
    public AbstractRequestResponseClient(HonoConnection honoConnection, String str, String str2) {
        super(honoConnection);
        this.replyMap = new HashMap();
        this.requestTimeoutMillis = honoConnection.getConfig().getRequestTimeout();
        if (str == null) {
            this.linkTargetAddress = getName();
            this.replyToAddress = String.format("%s/%s", getReplyToEndpointName(), str2);
        } else {
            this.linkTargetAddress = String.format("%s/%s", getName(), str);
            this.replyToAddress = String.format("%s/%s/%s", getReplyToEndpointName(), str, str2);
        }
        this.tenantId = str;
    }

    protected AbstractRequestResponseClient(HonoConnection honoConnection, String str, String str2, String str3) {
        super(honoConnection);
        this.replyMap = new HashMap();
        Objects.requireNonNull(str);
        Objects.requireNonNull(str2);
        Objects.requireNonNull(str3);
        this.requestTimeoutMillis = honoConnection.getConfig().getRequestTimeout();
        this.linkTargetAddress = String.format("%s/%s/%s", getName(), str, str2);
        this.replyToAddress = String.format("%s/%s/%s/%s", getReplyToEndpointName(), str, str2, str3);
        this.tenantId = str;
    }

    /* JADX INFO: Access modifiers changed from: protected */
    public AbstractRequestResponseClient(HonoConnection honoConnection, String str, ProtonSender protonSender, ProtonReceiver protonReceiver) {
        this(honoConnection, str);
        this.sender = (ProtonSender) Objects.requireNonNull(protonSender);
        this.receiver = (ProtonReceiver) Objects.requireNonNull(protonReceiver);
    }

    public final void setResponseCache(ExpiringValueCache<Object, R> expiringValueCache) {
        this.responseCache = expiringValueCache;
        LOG.info("enabling caching of responses from {}", getName());
    }

    protected final long getResponseCacheDefaultTimeout() {
        if (this.connection.getConfig() instanceof RequestResponseClientConfigProperties) {
            return ((RequestResponseClientConfigProperties) this.connection.getConfig()).getResponseCacheDefaultTimeout();
        }
        return 600L;
    }

    @Override // org.eclipse.hono.client.RequestResponseClient
    public final void setRequestTimeout(long j) {
        if (j < 0) {
            throw new IllegalArgumentException("request timeout must be >= 0");
        }
        this.requestTimeoutMillis = j;
    }

    @Override // org.eclipse.hono.client.CreditBasedSender
    public final int getCredit() {
        if (this.sender == null) {
            return 0;
        }
        return this.sender.getCredit();
    }

    @Override // org.eclipse.hono.client.CreditBasedSender
    public final void sendQueueDrainHandler(Handler<Void> handler) {
        if (this.drainHandler != null) {
            throw new IllegalStateException("already waiting for replenishment with credit");
        }
        this.drainHandler = (Handler) Objects.requireNonNull(handler);
        this.sender.sendQueueDrainHandler(protonSender -> {
            LOG.trace("command client has received FLOW [credits: {}, queued:{}]", Integer.valueOf(protonSender.getCredit()), Integer.valueOf(protonSender.getQueued()));
            Handler<Void> handler2 = this.drainHandler;
            this.drainHandler = null;
            if (handler2 != null) {
                handler2.handle(null);
            }
        });
    }

    protected abstract String getName();

    protected String getReplyToEndpointName() {
        return getName();
    }

    protected abstract String createMessageId();

    protected abstract R getResult(int i, String str, Buffer buffer, CacheDirective cacheDirective, ApplicationProperties applicationProperties);

    protected String getDefaultMessageTargetAddress() {
        return this.linkTargetAddress;
    }

    protected final Future<Void> createLinks() {
        return createLinks(null, null);
    }

    /* JADX INFO: Access modifiers changed from: protected */
    public final Future<Void> createLinks(Handler<String> handler, Handler<String> handler2) {
        return createReceiver(this.replyToAddress, handler2).compose(protonReceiver -> {
            this.receiver = protonReceiver;
            return createSender(this.linkTargetAddress, handler);
        }).compose(protonSender -> {
            LOG.debug("request-response client for peer [{}] created", this.connection.getConfig().getHost());
            this.sender = protonSender;
            return Future.succeededFuture();
        });
    }

    private Future<ProtonSender> createSender(String str, Handler<String> handler) {
        return this.connection.createSender(str, ProtonQoS.AT_LEAST_ONCE, handler);
    }

    private Future<ProtonReceiver> createReceiver(String str, Handler<String> handler) {
        return this.connection.createReceiver(str, ProtonQoS.AT_LEAST_ONCE, this::handleResponse, handler);
    }

    protected final void handleResponse(ProtonDelivery protonDelivery, Message message) {
        TriTuple<Handler<AsyncResult<R>>, Object, Span> remove = this.replyMap.remove(message.getCorrelationId());
        if (remove == null) {
            LOG.debug("discarding unexpected response [reply-to: {}, correlation ID: {}]", this.replyToAddress, message.getCorrelationId());
            ProtonHelper.rejected(protonDelivery, true);
            return;
        }
        R requestResponseResult = getRequestResponseResult(message);
        Span three = remove.three();
        if (requestResponseResult == null) {
            LOG.debug("discarding malformed response [reply-to: {}, correlation ID: {}]", this.replyToAddress, message.getCorrelationId());
            remove.one().handle(Future.failedFuture(new ServerErrorException(JsonLocation.MAX_CONTENT_SNIPPET, "cannot process response from service [" + getName() + PropertyAccessor.PROPERTY_KEY_SUFFIX)));
            ProtonHelper.released(protonDelivery, true);
        } else {
            LOG.debug("received response [reply-to: {}, subject: {}, correlation ID: {}, status: {}]", this.replyToAddress, message.getSubject(), message.getCorrelationId(), Integer.valueOf(requestResponseResult.getStatus()));
            addToCache(remove.two(), requestResponseResult);
            if (three != null) {
                three.log("response from peer accepted");
            }
            remove.one().handle(Future.succeededFuture(requestResponseResult));
            ProtonHelper.accepted(protonDelivery, true);
        }
    }

    protected final void cancelRequest(Object obj, AsyncResult<R> asyncResult) {
        Objects.requireNonNull(obj);
        Objects.requireNonNull(asyncResult);
        if (asyncResult.succeeded()) {
            throw new IllegalArgumentException("result must be failed");
        }
        TriTuple<Handler<AsyncResult<R>>, Object, Span> remove = this.replyMap.remove(obj);
        if (remove == null) {
            return;
        }
        LOG.debug("canceling request [target: {}, correlation ID: {}]: {}", this.linkTargetAddress, obj, asyncResult.cause().getMessage());
        remove.one().handle(asyncResult);
    }

    private R getRequestResponseResult(Message message) {
        Integer status = MessageHelper.getStatus(message);
        if (status == null) {
            LOG.debug("response message has no status code application property [reply-to: {}, correlation ID: {}]", this.replyToAddress, message.getCorrelationId());
            return null;
        }
        return getResult(status.intValue(), message.getContentType(), MessageHelper.getPayload(message), CacheDirective.from(MessageHelper.getCacheDirective(message)), message.getApplicationProperties());
    }

    private Message createMessage(String str, String str2, Map<String, Object> map) {
        Objects.requireNonNull(str);
        Message message = ProtonHelper.message();
        String createMessageId = createMessageId();
        AbstractHonoClient.setApplicationProperties(message, map);
        message.setAddress(str2);
        message.setReplyTo(this.replyToAddress);
        message.setMessageId(createMessageId);
        message.setSubject(str);
        return message;
    }

    protected final void createAndSendRequest(String str, Buffer buffer, Handler<AsyncResult<R>> handler) {
        createAndSendRequest(str, (Map<String, Object>) null, buffer, handler);
    }

    protected final void createAndSendRequest(String str, Buffer buffer, Handler<AsyncResult<R>> handler, Span span) {
        createAndSendRequest(str, (Map<String, Object>) null, buffer, (String) null, handler, (Object) null, span);
    }

    protected final void createAndSendRequest(String str, Buffer buffer, Handler<AsyncResult<R>> handler, Object obj) {
        createAndSendRequest(str, null, buffer, handler, obj, null);
    }

    protected final void createAndSendRequest(String str, Map<String, Object> map, Buffer buffer, Handler<AsyncResult<R>> handler) {
        createAndSendRequest(str, map, buffer, handler, null, null);
    }

    /* JADX INFO: Access modifiers changed from: protected */
    public final void createAndSendRequest(String str, Map<String, Object> map, Buffer buffer, Handler<AsyncResult<R>> handler, Object obj, SpanContext spanContext) {
        createAndSendRequest(str, map, buffer, "application/json", handler, obj, spanContext);
    }

    protected final void createAndSendRequest(String str, Map<String, Object> map, Buffer buffer, String str2, Handler<AsyncResult<R>> handler, Object obj, SpanContext spanContext) {
        Span newChildSpan = newChildSpan(spanContext, "invoke '" + str + "' on " + getName() + " endpoint");
        createAndSendRequest(str, map, buffer, str2, asyncResult -> {
            if (asyncResult.failed()) {
                Tags.HTTP_STATUS.set(newChildSpan, Integer.valueOf(ServiceInvocationException.extractStatusCode(asyncResult.cause())));
                TracingHelper.logError(newChildSpan, asyncResult.cause());
            } else if (asyncResult.result() != null) {
                Tags.HTTP_STATUS.set(newChildSpan, Integer.valueOf(((RequestResponseResult) asyncResult.result()).getStatus()));
                if (((RequestResponseResult) asyncResult.result()).isError()) {
                    Tags.ERROR.set(newChildSpan, Boolean.TRUE);
                }
            } else {
                Tags.HTTP_STATUS.set(newChildSpan, (Integer) 202);
            }
            newChildSpan.finish();
            handler.handle(asyncResult);
        }, obj, newChildSpan);
    }

    /* JADX INFO: Access modifiers changed from: protected */
    public final void createAndSendRequest(String str, Map<String, Object> map, Buffer buffer, String str2, Handler<AsyncResult<R>> handler, Object obj, Span span) {
        createAndSendRequest(str, getDefaultMessageTargetAddress(), map, buffer, str2, handler, obj, span);
    }

    /* JADX INFO: Access modifiers changed from: protected */
    public final void createAndSendRequest(String str, String str2, Map<String, Object> map, Buffer buffer, String str3, Handler<AsyncResult<R>> handler, Object obj, Span span) {
        Objects.requireNonNull(str);
        Objects.requireNonNull(handler);
        Objects.requireNonNull(span);
        if (!isOpen()) {
            handler.handle(Future.failedFuture(new ServerErrorException(503, "sender and/or receiver link is not open")));
            return;
        }
        Message createMessage = createMessage(str, str2, map);
        MessageHelper.setPayload(createMessage, str3, buffer);
        sendRequest(createMessage, handler, obj, span);
    }

    /* JADX INFO: Access modifiers changed from: protected */
    public final void sendRequest(Message message, Handler<AsyncResult<R>> handler, Object obj, Span span) {
        String address = message.getAddress() != null ? message.getAddress() : getDefaultMessageTargetAddress();
        Tags.MESSAGE_BUS_DESTINATION.set(span, address);
        Tags.SPAN_KIND.set(span, Tags.SPAN_KIND_CLIENT);
        Tags.HTTP_METHOD.set(span, message.getSubject());
        if (this.tenantId != null) {
            span.setTag(MessageHelper.APP_PROPERTY_TENANT_ID, this.tenantId);
        }
        this.connection.executeOnContext(promise -> {
            if (this.sender.sendQueueFull()) {
                LOG.debug("cannot send request to peer, no credit left for link [link target: {}]", this.linkTargetAddress);
                handler.handle(Future.failedFuture(new ServerErrorException(503, "no credit available for sending request")));
                return;
            }
            HashMap hashMap = new HashMap(3);
            Object orElse = Optional.ofNullable(message.getCorrelationId()).orElse(message.getMessageId());
            if (orElse instanceof String) {
                hashMap.put(TracingHelper.TAG_CORRELATION_ID.getKey(), orElse);
            }
            hashMap.put(TracingHelper.TAG_CREDIT.getKey(), Integer.valueOf(this.sender.getCredit()));
            hashMap.put(TracingHelper.TAG_QOS.getKey(), this.sender.getQoS().toString());
            span.log(hashMap);
            TriTuple<Handler<AsyncResult<R>>, Object, Span> of = TriTuple.of(handler, obj, span);
            TracingHelper.injectSpanContext(this.connection.getTracer(), span.context(), message);
            this.replyMap.put(orElse, of);
            this.sender.send(message, protonDelivery -> {
                Promise promise = Promise.promise();
                DeliveryState remoteState = protonDelivery.getRemoteState();
                if (Rejected.class.isInstance(remoteState)) {
                    Rejected rejected = (Rejected) remoteState;
                    if (rejected.getError() != null) {
                        LOG.debug("service did not accept request [target address: {}, subject: {}, correlation ID: {}]: {}", address, message.getSubject(), orElse, rejected.getError());
                        promise.fail(StatusCodeMapper.from(rejected.getError()));
                        cancelRequest(orElse, promise.future());
                        return;
                    } else {
                        LOG.debug("service did not accept request [target address: {}, subject: {}, correlation ID: {}]", address, message.getSubject(), orElse);
                        promise.fail(new ClientErrorException(400));
                        cancelRequest(orElse, promise.future());
                        return;
                    }
                }
                if (Accepted.class.isInstance(remoteState)) {
                    LOG.trace("service has accepted request [target address: {}, subject: {}, correlation ID: {}]", address, message.getSubject(), orElse);
                    span.log("request accepted by peer");
                    if (message.getReplyTo() == null) {
                        this.replyMap.remove(orElse);
                        handler.handle(Future.succeededFuture());
                        return;
                    }
                    return;
                }
                if (Released.class.isInstance(remoteState)) {
                    LOG.debug("service did not accept request [target address: {}, subject: {}, correlation ID: {}], remote state: {}", address, message.getSubject(), orElse, remoteState);
                    promise.fail(new ServerErrorException(503));
                    cancelRequest(orElse, promise.future());
                } else if (Modified.class.isInstance(remoteState)) {
                    LOG.debug("service did not accept request [target address: {}, subject: {}, correlation ID: {}], remote state: {}", address, message.getSubject(), orElse, remoteState);
                    promise.fail(((Modified) protonDelivery.getRemoteState()).getUndeliverableHere().booleanValue() ? new ClientErrorException(404) : new ServerErrorException(503));
                    cancelRequest(orElse, promise.future());
                }
            });
            if (this.requestTimeoutMillis > 0) {
                this.connection.getVertx().setTimer(this.requestTimeoutMillis, l -> {
                    cancelRequest(orElse, Future.failedFuture(new ServerErrorException(503, "request timed out after " + this.requestTimeoutMillis + "ms")));
                });
            }
            if (LOG.isDebugEnabled()) {
                String deviceId = MessageHelper.getDeviceId(message);
                if (deviceId == null) {
                    LOG.debug("sent request [target address: {}, subject: {}, correlation ID: {}] to service", address, message.getSubject(), orElse);
                } else {
                    LOG.debug("sent request [target address: {}, subject: {}, correlation ID: {}, device ID: {}] to service", address, message.getSubject(), orElse, deviceId);
                }
            }
        }).otherwise(th -> {
            TracingHelper.logError(span, "not connected");
            handler.handle(Future.failedFuture(new ServerErrorException(503, "not connected")));
            return null;
        });
    }

    @Override // org.eclipse.hono.client.RequestResponseClient
    public final boolean isOpen() {
        return this.sender != null && this.sender.isOpen() && this.receiver != null && this.receiver.isOpen();
    }

    @Override // org.eclipse.hono.client.RequestResponseClient
    public final void close(Handler<AsyncResult<Void>> handler) {
        LOG.debug("closing request-response client ...");
        closeLinks(r4 -> {
            if (handler != null) {
                handler.handle(Future.succeededFuture());
            }
        });
    }

    protected final boolean isCachingEnabled() {
        return this.responseCache != null;
    }

    protected Future<R> getResponseFromCache(Object obj) {
        return getResponseFromCache(obj, null);
    }

    /* JADX INFO: Access modifiers changed from: protected */
    public Future<R> getResponseFromCache(Object obj, Span span) {
        if (this.responseCache == null) {
            return Future.failedFuture(new IllegalStateException("no cache configured"));
        }
        R r = this.responseCache.get(obj);
        if (span != null) {
            TracingHelper.TAG_CACHE_HIT.set(span, Boolean.valueOf(r != null));
        }
        return r == null ? Future.failedFuture("cache miss") : Future.succeededFuture(r);
    }

    protected final void addToCache(Object obj, R r) {
        Objects.requireNonNull(r);
        if (this.responseCache == null || obj == null) {
            return;
        }
        CacheDirective cacheDirective = (CacheDirective) Optional.ofNullable(r.getCacheDirective()).orElseGet(() -> {
            return isCacheableStatusCode(r.getStatus()) ? CacheDirective.maxAgeDirective(getResponseCacheDefaultTimeout()) : CacheDirective.noCacheDirective();
        });
        if (!cacheDirective.isCachingAllowed() || cacheDirective.getMaxAge() <= 0) {
            return;
        }
        this.responseCache.put((ExpiringValueCache<Object, R>) obj, r, Duration.ofSeconds(cacheDirective.getMaxAge()));
    }

    private boolean isCacheableStatusCode(int i) {
        return Arrays.binarySearch(CACHEABLE_STATUS_CODES, i) >= 0;
    }

    /* JADX INFO: Access modifiers changed from: protected */
    public final String getTenantId() {
        return this.tenantId;
    }

    /* JADX INFO: Access modifiers changed from: protected */
    public final boolean isSuccessResponse(int i, String str, Buffer buffer) {
        return StatusCodeMapper.isSuccessful(Integer.valueOf(i)) && buffer != null && "application/json".equalsIgnoreCase(str);
    }

    /* JADX INFO: Access modifiers changed from: protected */
    public final <T> Future<T> mapResultAndFinishSpan(Future<R> future, Function<R, T> function, Span span) {
        return (Future<T>) future.recover(th -> {
            Tags.HTTP_STATUS.set(span, Integer.valueOf(ServiceInvocationException.extractStatusCode(th)));
            TracingHelper.logError(span, th);
            span.finish();
            return Future.failedFuture(th);
        }).map(requestResponseResult -> {
            if (requestResponseResult != null) {
                Tags.HTTP_STATUS.set(span, Integer.valueOf(requestResponseResult.getStatus()));
                if (requestResponseResult.isError()) {
                    Tags.ERROR.set(span, Boolean.TRUE);
                }
            } else {
                Tags.HTTP_STATUS.set(span, (Integer) 202);
            }
            span.finish();
            return function.apply(requestResponseResult);
        });
    }
}
