package cz.integsoft.mule.ilm.internal.component.persistence;

import cz.integsoft.mule.ilm.api.exception.GenericLoggingException;
import cz.integsoft.mule.ilm.api.exception.InitializationException;
import cz.integsoft.mule.ilm.api.persistence.LogPersistenceParameter;
import cz.integsoft.mule.ilm.api.persistence.LogPersistentStrategy;
import java.util.List;
import java.util.Optional;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.locks.ReentrantLock;
import java.util.function.Consumer;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:cz/integsoft/mule/ilm/internal/component/persistence/InMemoryLogPersistenceStrategy.class */
/**
 * {@link LogPersistentStrategy} that buffers log messages in memory, one
 * {@link LinkedBlockingQueue} per connection ID.
 *
 * <p>Messages are added with {@link #persist(String, String, String)} and handed to a
 * caller-supplied {@link Consumer} by {@link #flush(String, String, Consumer)}. Only one flush
 * per (configuration name, connection ID) pair runs at a time; a concurrent attempt returns
 * {@code false} immediately.
 *
 * <p>When asynchronous mode is enabled and a worker pool exists (created by
 * {@link #initialize(String, List)}), each message is delivered on a pool thread while the
 * flushing thread waits for it to complete. Otherwise messages are delivered on the flushing
 * thread itself.
 */
public class InMemoryLogPersistenceStrategy implements LogPersistentStrategy<String> {
    /** Worker-pool size used when the max-workers parameter is not supplied. */
    private static final int DEFAULT_MAX_WORKERS = 5;

    private static final Logger LOG = LoggerFactory.getLogger(InMemoryLogPersistenceStrategy.class);

    /** Capacity of every per-connection queue; {@code -1} means unbounded. */
    private final int maxQueueSize;

    /** Buffered messages, keyed by connection ID. */
    private final ConcurrentHashMap<String, LinkedBlockingQueue<String>> queues = new ConcurrentHashMap<>();

    /**
     * Markers of flushes in progress, keyed by {@code configName + "_" + connectionId}. The value
     * is the number of messages the running flush still has to process.
     */
    private final ConcurrentHashMap<String, AtomicLong> flushCounters = new ConcurrentHashMap<>();

    /** Serializes registration of a new flush marker. */
    private final ReentrantLock flushLock = new ReentrantLock(true);

    private boolean disposed;

    /** Worker pool for asynchronous delivery; {@code null} until initialized and after disposal. */
    private volatile ExecutorService executor;

    private boolean async = true;

    /** Number of consecutive delivery failures after which a flush stops sending. */
    private int sendErrorThreshold = 10;

    /**
     * Creates the strategy.
     *
     * @param maxQueueSize capacity of each per-connection queue, or {@code -1} for unbounded
     * @throws IllegalArgumentException if {@code maxQueueSize} is {@code 0} or less than {@code -1}
     */
    public InMemoryLogPersistenceStrategy(int maxQueueSize) {
        if (maxQueueSize == 0 || maxQueueSize < -1) {
            throw new IllegalArgumentException("Invalid max-queue-size value! " + maxQueueSize);
        }
        this.maxQueueSize = maxQueueSize;
    }

    /**
     * Appends a message to the queue of the given connection.
     *
     * @param configName configuration name (used for logging only)
     * @param connectionId connection whose queue receives the message
     * @param message the message; {@code null} is rejected
     * @return {@code true} if the message was queued, {@code false} if it was {@code null} or the
     *     bounded queue is full
     */
    @Override // cz.integsoft.mule.ilm.api.persistence.LogPersistentStrategy
    public boolean persist(String configName, String connectionId, String message) {
        if (message == null) {
            return false;
        }
        LOG.debug("Persisting log message for config {}, connection ID: {}", configName, connectionId);
        return queueFor(connectionId).offer(message);
    }

    /**
     * Delivers the messages queued for the given connection to {@code consumer}.
     *
     * <p>At most as many messages as were queued when the flush started are processed. A message
     * whose delivery throws is put back at the tail of the queue. Once the number of consecutive
     * failures reaches the configured threshold, the remaining messages are put back without
     * being sent. If the flushing thread is interrupted while waiting for an asynchronous
     * delivery, the interrupt flag is restored and the flush stops early.
     *
     * @param configName configuration name; part of the flush-marker key
     * @param connectionId connection whose queue is flushed
     * @param consumer receives each message; an exception it throws counts as a failed delivery
     * @return {@code false} if another flush for the same configuration and connection is in
     *     progress, {@code true} otherwise (including when there was nothing to flush)
     * @throws GenericLoggingException declared by the interface; not thrown by this implementation
     */
    @Override // cz.integsoft.mule.ilm.api.persistence.LogPersistentStrategy
    public boolean flush(String configName, String connectionId, Consumer<String> consumer) throws GenericLoggingException {
        final LinkedBlockingQueue<String> queue = queueFor(connectionId);
        if (queue.isEmpty()) {
            return true;
        }
        final String key = counterKey(configName, connectionId);
        final AtomicLong remaining = tryAcquireFlush(key, queue.size());
        if (remaining == null) {
            LOG.warn("Flushing is locked by another thread for conenction ID {} and configuration name {}", connectionId, configName);
            return false;
        }
        final AtomicInteger consecutiveErrors = new AtomicInteger(0);
        try {
            LOG.debug("Flushing the log persistence queue with {} records. Connection ID: {}", Integer.valueOf(queue.size()), connectionId);
            boolean keepGoing = true;
            while (keepGoing && !queue.isEmpty() && remaining.get() > 0) {
                final String message = queue.poll();
                try {
                    // A null poll means the queue was emptied concurrently; there is nothing to send.
                    if (message != null) {
                        if (consecutiveErrors.get() >= this.sendErrorThreshold) {
                            LOG.warn("Sending threshold of {} errors has been exceeded!", Integer.valueOf(this.sendErrorThreshold));
                            requeue(queue, message, connectionId);
                        } else {
                            keepGoing = dispatch(message, queue, connectionId, consumer, consecutiveErrors);
                        }
                    }
                } finally {
                    // Exactly one decrement per polled message, whatever happened to it.
                    final long left = remaining.decrementAndGet();
                    if (left > 0) {
                        LOG.trace("Still flushing, messages left for connection ID {}: {}", connectionId, Long.valueOf(left));
                    }
                }
            }
        } finally {
            // Always release the marker, and only our own one, so that an early exit (drained
            // queue, interruption, error) can never leave the key locked forever.
            this.flushCounters.remove(key, remaining);
        }
        return true;
    }

    @Override // cz.integsoft.mule.ilm.api.persistence.LogPersistentStrategy
    public int getMaximumQueueSize() {
        return this.maxQueueSize;
    }

    /**
     * Returns the total number of messages buffered across all connections.
     */
    @Override // cz.integsoft.mule.ilm.api.persistence.LogPersistentStrategy
    public int size() {
        return this.queues.values().stream().mapToInt(LinkedBlockingQueue::size).sum();
    }

    /**
     * Discards all buffered messages and (re)configures the strategy from {@code parameters}.
     *
     * <p>Recognized keys (case-insensitive): {@code ASYNC_PARAM_NAME},
     * {@code MAX_WORKERS_PARAM_NAME} (only read when asynchronous mode is on; defaults to
     * {@value #DEFAULT_MAX_WORKERS}) and {@code SEND_ERROR_THRESHOLD_PARAM_NAME}. Parameters that
     * are absent leave the current setting unchanged. All values are parsed before any setting is
     * applied. A worker pool left over from a previous initialization is shut down.
     *
     * @param name identifier supplied by the caller; not used by this implementation
     * @param parameters configuration parameters; {@code null} is treated as empty
     * @throws InitializationException declared by the interface; not thrown by this implementation
     * @throws NumberFormatException if a numeric parameter value cannot be parsed
     * @throws IllegalArgumentException if the configured worker count is not positive
     */
    @Override // cz.integsoft.mule.ilm.api.persistence.LogPersistentStrategy
    public void initialize(String name, List<LogPersistenceParameter> parameters) throws InitializationException {
        LOG.debug("Initializing strategy {} with parameters {}", this, parameters);
        clearQueues();
        this.disposed = false;

        final boolean useAsync = findParameter(parameters, LogPersistentStrategy.ASYNC_PARAM_NAME)
                .map(parameter -> Boolean.parseBoolean(parameter.getValue()))
                .orElse(this.async);
        final int threshold = findParameter(parameters, LogPersistentStrategy.SEND_ERROR_THRESHOLD_PARAM_NAME)
                .map(parameter -> Integer.parseInt(parameter.getValue()))
                .orElse(this.sendErrorThreshold);
        ExecutorService replacement = null;
        if (useAsync) {
            final int workers = findParameter(parameters, LogPersistentStrategy.MAX_WORKERS_PARAM_NAME)
                    .map(parameter -> Integer.parseInt(parameter.getValue()))
                    .orElse(DEFAULT_MAX_WORKERS);
            replacement = Executors.newFixedThreadPool(workers);
        }

        final ExecutorService previous = this.executor;
        this.async = useAsync;
        this.sendErrorThreshold = threshold;
        this.executor = replacement;
        if (previous != null) {
            // Do not leak the threads of a pool created by an earlier initialize() call.
            previous.shutdown();
        }
    }

    /**
     * Discards all buffered messages, shuts down the worker pool and marks the strategy disposed.
     */
    @Override // cz.integsoft.mule.ilm.api.persistence.LogPersistentStrategy
    public void dispose() {
        LOG.debug("Disposing strategy: {}", this);
        clearQueues();
        this.disposed = true;
        final ExecutorService pool = this.executor;
        this.executor = null;
        if (pool != null) {
            pool.shutdown();
        }
    }

    @Override // cz.integsoft.mule.ilm.api.persistence.LogPersistentStrategy
    public boolean isDisposed() {
        return this.disposed;
    }

    /** Builds the flush-marker key for a configuration/connection pair. */
    private String counterKey(String configName, String connectionId) {
        return configName + "_" + connectionId;
    }

    /** Returns the queue of the given connection, creating it on first use. */
    private LinkedBlockingQueue<String> queueFor(String connectionId) {
        return this.queues.computeIfAbsent(connectionId, ignored -> newQueue());
    }

    /** Creates a queue honoring the configured capacity ({@code -1} = unbounded). */
    private LinkedBlockingQueue<String> newQueue() {
        return this.maxQueueSize == -1 ? new LinkedBlockingQueue<>() : new LinkedBlockingQueue<>(this.maxQueueSize);
    }

    /** Empties and removes every per-connection queue. */
    private void clearQueues() {
        this.queues.values().forEach(LinkedBlockingQueue::clear);
        this.queues.clear();
    }

    /**
     * Registers a flush marker for {@code key}.
     *
     * @return the newly registered counter, or {@code null} if a flush for the key is already running
     */
    private AtomicLong tryAcquireFlush(String key, int pending) {
        this.flushLock.lock();
        try {
            final AtomicLong fresh = new AtomicLong(pending);
            final AtomicLong existing = this.flushCounters.putIfAbsent(key, fresh);
            if (existing != null) {
                LOG.debug("Flush semaphore is present with count: {}", Integer.valueOf(existing.intValue()));
                return null;
            }
            return fresh;
        } finally {
            this.flushLock.unlock();
        }
    }

    /**
     * Delivers one message, on the worker pool when asynchronous mode is usable, otherwise on the
     * calling thread.
     *
     * @return {@code false} if the calling thread was interrupted and flushing should stop
     */
    private boolean dispatch(String message, LinkedBlockingQueue<String> queue, String connectionId,
            Consumer<String> consumer, AtomicInteger consecutiveErrors) {
        final ExecutorService pool = this.executor;
        if (!this.async || pool == null) {
            deliver(message, queue, connectionId, consumer, consecutiveErrors);
            return true;
        }
        CompletableFuture<Void> pending;
        try {
            pending = CompletableFuture.runAsync(
                    () -> deliver(message, queue, connectionId, consumer, consecutiveErrors), pool);
        } catch (RejectedExecutionException e) {
            // The pool was shut down concurrently and the task never started; deliver inline
            // instead of losing the already-polled message.
            deliver(message, queue, connectionId, consumer, consecutiveErrors);
            return true;
        }
        try {
            pending.get();
            return true;
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            LOG.warn("Completing send function call has failed: " + e.getLocalizedMessage(), e);
            return false;
        } catch (ExecutionException e) {
            LOG.warn("Completing send function call has failed: " + e.getLocalizedMessage(), e);
            return true;
        }
    }

    /**
     * Hands one message to the consumer. Success resets the consecutive-error counter; a failure
     * puts the message back on the queue and increments the counter.
     */
    private void deliver(String message, LinkedBlockingQueue<String> queue, String connectionId,
            Consumer<String> consumer, AtomicInteger consecutiveErrors) {
        try {
            consumer.accept(message);
            consecutiveErrors.set(0);
        } catch (Exception e) {
            requeue(queue, message, connectionId);
            consecutiveErrors.incrementAndGet();
            LOG.error("Failed to log from persistence queue! The log message will not be sent again. Connection ID: " + connectionId + " Error: " + e.getLocalizedMessage(), e);
        }
    }

    /** Puts a message back at the tail of the queue, warning if the bounded queue rejects it. */
    private void requeue(LinkedBlockingQueue<String> queue, String message, String connectionId) {
        if (!queue.offer(message)) {
            LOG.warn("Failed to re-offer log message to the queueu for connection ID {}: {}", connectionId, "queue is full");
        }
    }

    /** Finds the first parameter whose key equals {@code name}, ignoring case. */
    private static Optional<LogPersistenceParameter> findParameter(List<LogPersistenceParameter> parameters, String name) {
        if (parameters == null) {
            return Optional.empty();
        }
        for (LogPersistenceParameter parameter : parameters) {
            if (name.equalsIgnoreCase(parameter.getKey())) {
                return Optional.of(parameter);
            }
        }
        return Optional.empty();
    }
}
