/*
 * Decompiled with CFR 0.152.
 */
package com.github.rjeschke.neetutils.concurrent;

import com.github.rjeschke.neetutils.SysUtils;
import com.github.rjeschke.neetutils.collections.Colls;
import com.github.rjeschke.neetutils.concurrent.MapWorker;
import com.github.rjeschke.neetutils.concurrent.MapWorkerCallback;
import com.github.rjeschke.neetutils.concurrent.RequeueWatcher;
import com.github.rjeschke.neetutils.concurrent.RequeueWatcherCallback;
import com.github.rjeschke.neetutils.concurrent.ThreadPool;
import com.github.rjeschke.neetutils.concurrent.WorkerStatus;
import java.util.List;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.Semaphore;

public class MapWorkerPool<A, B>
implements Runnable,
RequeueWatcherCallback<Job<A, B>, ThreadWorker<A, B>> {
    private final int numThreads;
    private final int queueLimit;
    private final boolean serialCallbacks;
    private final MapWorkerCallback<A, B> callback;
    private final ConcurrentLinkedQueue<ThreadWorker<A, B>> workers = new ConcurrentLinkedQueue();
    private final ConcurrentLinkedQueue<Job<A, B>> jobs = new ConcurrentLinkedQueue();
    private final ConcurrentLinkedQueue<WorkerResult<A, B>> results = new ConcurrentLinkedQueue();
    private final Semaphore resultSync = new Semaphore(1);
    private final Thread[] threads;
    private Thread callbackThread = null;
    private RequeueWatcher<Job<A, B>, ThreadWorker<A, B>> watcher;

    private MapWorkerPool(MapWorkerCallback<A, B> callback, int threads, int queueLimit, boolean serialCallbacks) {
        this.callback = callback;
        this.numThreads = threads;
        this.serialCallbacks = serialCallbacks;
        this.queueLimit = Math.max(0, queueLimit);
        this.threads = new Thread[threads];
    }

    public static <A, B> MapWorkerPool<A, B> start(MapWorkerCallback<A, B> callback, int threads, int queueLimit, boolean serialCallbacks) {
        MapWorkerPool<A, B> jobber = new MapWorkerPool<A, B>(callback, ThreadPool.defaultThreadcount(threads), queueLimit, serialCallbacks);
        for (int i = 0; i < jobber.threads.length; ++i) {
            ThreadWorker<A, B> w = new ThreadWorker<A, B>(jobber);
            Thread t = new Thread(w);
            t.setDaemon(true);
            jobber.workers.offer(w);
            t.start();
            jobber.threads[i] = t;
        }
        if (serialCallbacks) {
            jobber.resultSync.acquireUninterruptibly();
            Thread t = new Thread(jobber);
            t.setDaemon(true);
            t.start();
            jobber.callbackThread = t;
        }
        jobber.watcher = RequeueWatcher.start(jobber, jobber.jobs, jobber.workers);
        return jobber;
    }

    public static <A, B> List<B> processCollection(MapWorker<A, B> worker, int threads, Iterable<A> input) {
        int usedThreads = ThreadPool.defaultThreadcount(threads);
        ProcessCollectionMapWorkerCallback callback = new ProcessCollectionMapWorkerCallback();
        MapWorkerPool<A, B> pool = MapWorkerPool.start(callback, usedThreads, usedThreads * 4, true);
        for (A a : input) {
            pool.enqueue(worker, a);
        }
        pool.stop();
        return callback.outputList;
    }

    public static final int availableProcessors() {
        return Runtime.getRuntime().availableProcessors();
    }

    public int threadCount() {
        return this.numThreads;
    }

    public void enqueue(MapWorker<A, B> worker, A object) {
        if (worker == null) {
            throw new NullPointerException("A null Worker is not permitted");
        }
        ThreadWorker<A, B> w = this.workers.poll();
        Job<A, B> job = new Job<A, B>(worker, object);
        if (w != null) {
            w.setWorkLoad(job);
        } else {
            if (this.queueLimit != 0 && this.jobs.size() >= this.queueLimit) {
                int ql = Math.max(this.queueLimit >> 1, 1);
                while (this.jobs.size() > ql) {
                    SysUtils.fineSleep(5L);
                }
            }
            this.jobs.offer(job);
        }
    }

    private void reuseOrEnqueue(ThreadWorker<A, B> w) {
        Job<A, B> job = this.jobs.poll();
        if (job != null) {
            w.setWorkLoad(job);
        } else {
            this.workers.offer(w);
        }
    }

    void doCallback(ThreadWorker<A, B> threadWorker, MapWorker<A, B> worker, WorkerStatus status, A input, B output) {
        if (this.serialCallbacks) {
            this.results.offer(new WorkerResult<A, B>(worker, status, input, output));
            this.resultSync.release();
        } else {
            try {
                this.callback.workerCallback(this, worker, status, input, output);
            }
            catch (Throwable t) {
                // empty catch block
            }
        }
        this.reuseOrEnqueue(threadWorker);
    }

    public boolean hasWork() {
        return !this.jobs.isEmpty();
    }

    public void join() {
        while (!this.jobs.isEmpty() || this.serialCallbacks && !this.results.isEmpty()) {
            SysUtils.sleep(10L);
        }
    }

    public void stop() {
        int i;
        StopWorker stop = new StopWorker();
        this.join();
        for (i = 0; i < this.numThreads; ++i) {
            this.enqueue(stop, null);
        }
        for (i = 0; i < this.numThreads; ++i) {
            SysUtils.threadJoin(this.threads[i]);
        }
        if (this.callbackThread != null) {
            SysUtils.sleep(50L);
            this.results.offer(new WorkerResult<Object, Object>(null, null, null, null));
            this.resultSync.release();
            SysUtils.threadJoin(this.callbackThread);
        }
        this.watcher.stop();
    }

    @Override
    public void run() {
        block2: while (true) {
            try {
                while (true) {
                    this.resultSync.acquireUninterruptibly();
                    WorkerResult<A, B> r = this.results.poll();
                    if (r.status == null) break block2;
                    this.callback.workerCallback(this, r.worker, r.status, r.input, r.output);
                }
            }
            catch (Throwable throwable) {
                continue;
            }
            break;
        }
    }

    @Override
    public void requeue(ThreadWorker<A, B> worker, Job<A, B> job) {
        worker.setWorkLoad(job);
    }

    static class ProcessCollectionMapWorkerCallback<A, B>
    implements MapWorkerCallback<A, B> {
        final List<B> outputList = Colls.list();

        @Override
        public void workerCallback(MapWorkerPool<A, B> pool, MapWorker<A, B> worker, WorkerStatus status, A input, B output) {
            this.outputList.add(output);
        }
    }

    static class StopWorker<A, B>
    implements MapWorker<A, B> {
        @Override
        public B run(A object) {
            return null;
        }
    }

    static class Job<A, B> {
        public final MapWorker<A, B> worker;
        public final A input;

        public Job(MapWorker<A, B> worker, A input) {
            this.worker = worker;
            this.input = input;
        }
    }

    static class ThreadWorker<A, B>
    implements Runnable {
        private final Semaphore sync = new Semaphore(1);
        private final MapWorkerPool<A, B> pool;
        private volatile Job<A, B> workload = null;

        public ThreadWorker(MapWorkerPool<A, B> pool) {
            this.sync.acquireUninterruptibly();
            this.pool = pool;
        }

        protected void setWorkLoad(Job<A, B> job) {
            this.workload = job;
            this.sync.release();
        }

        @Override
        public void run() {
            while (true) {
                boolean ok = true;
                Throwable ta = null;
                Object output = null;
                try {
                    this.sync.acquireUninterruptibly();
                    if (this.workload.worker instanceof StopWorker) break;
                    output = this.workload.worker.run(this.workload.input);
                }
                catch (Throwable t) {
                    ta = t;
                    ok = false;
                }
                this.pool.doCallback(this, this.workload.worker, ok ? WorkerStatus.OK : new WorkerStatus(ta), this.workload.input, output);
            }
        }
    }

    static class WorkerResult<A, B> {
        final MapWorker<A, B> worker;
        final WorkerStatus status;
        final A input;
        final B output;

        public WorkerResult(MapWorker<A, B> worker, WorkerStatus status, A input, B output) {
            this.worker = worker;
            this.status = status;
            this.input = input;
            this.output = output;
        }
    }
}

