/*
 * Decompiled with CFR 0.152.
 */
package com.github.rjeschke.neetutils.concurrent;

import com.github.rjeschke.neetutils.SysUtils;
import com.github.rjeschke.neetutils.concurrent.RequeueWatcher;
import com.github.rjeschke.neetutils.concurrent.RequeueWatcherCallback;
import com.github.rjeschke.neetutils.concurrent.ThreadPool;
import com.github.rjeschke.neetutils.concurrent.Worker;
import com.github.rjeschke.neetutils.concurrent.WorkerCallback;
import com.github.rjeschke.neetutils.concurrent.WorkerStatus;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.Semaphore;

public class WorkerPool<T>
implements Runnable,
RequeueWatcherCallback<Job<T>, ThreadWorker<T>> {
    private final int numThreads;
    private final int queueLimit;
    private final boolean serialCallbacks;
    private final WorkerCallback<T> callback;
    private final ConcurrentLinkedQueue<ThreadWorker<T>> workers = new ConcurrentLinkedQueue();
    private final ConcurrentLinkedQueue<Job<T>> jobs = new ConcurrentLinkedQueue();
    private final ConcurrentLinkedQueue<WorkerResult<T>> results = new ConcurrentLinkedQueue();
    private final Semaphore resultSync = new Semaphore(1);
    private final Thread[] threads;
    private Thread callbackThread = null;
    private RequeueWatcher<Job<T>, ThreadWorker<T>> watcher;

    private WorkerPool(WorkerCallback<T> callback, int threads, int queueLimit, boolean serialCallbacks) {
        this.callback = callback;
        this.numThreads = threads;
        this.serialCallbacks = serialCallbacks;
        this.queueLimit = Math.max(0, queueLimit);
        this.threads = new Thread[threads];
    }

    public static <T> WorkerPool<T> start(WorkerCallback<T> callback, int threads, int queueLimit, boolean serialCallbacks) {
        WorkerPool<T> jobber = new WorkerPool<T>(callback, ThreadPool.defaultThreadcount(threads), queueLimit, serialCallbacks);
        for (int i = 0; i < jobber.threads.length; ++i) {
            ThreadWorker<T> w = new ThreadWorker<T>(jobber);
            Thread t = new Thread(w);
            t.setDaemon(true);
            jobber.workers.offer(w);
            t.start();
            jobber.threads[i] = t;
        }
        if (serialCallbacks) {
            jobber.resultSync.acquireUninterruptibly();
            Thread t = new Thread(jobber);
            t.setDaemon(true);
            t.start();
            jobber.callbackThread = t;
        }
        jobber.watcher = RequeueWatcher.start(jobber, jobber.jobs, jobber.workers);
        return jobber;
    }

    public static final int availableProcessors() {
        return Runtime.getRuntime().availableProcessors();
    }

    public int threadCount() {
        return this.numThreads;
    }

    public void enqueue(Worker<T> worker, T object) {
        if (worker == null) {
            throw new NullPointerException("A null Worker is not permitted");
        }
        ThreadWorker<T> w = this.workers.poll();
        Job<T> job = new Job<T>(worker, object);
        if (w != null) {
            w.setWorkLoad(job);
        } else {
            if (this.queueLimit != 0 && this.jobs.size() >= this.queueLimit) {
                int ql = Math.max(this.queueLimit >> 1, 1);
                while (this.jobs.size() > ql) {
                    SysUtils.fineSleep(5L);
                }
            }
            this.jobs.offer(job);
        }
    }

    private void reuseOrEnqueue(ThreadWorker<T> w) {
        Job<T> job = this.jobs.poll();
        if (job != null) {
            w.setWorkLoad(job);
        } else {
            this.workers.offer(w);
        }
    }

    void doCallback(ThreadWorker<T> threadWorker, WorkerStatus status, Worker<T> worker, T object) {
        if (this.serialCallbacks) {
            this.results.offer(new WorkerResult<T>(worker, object, status));
            this.resultSync.release();
        } else {
            try {
                this.callback.workerCallback(this, status, worker, object);
            }
            catch (Throwable t) {
                // empty catch block
            }
        }
        this.reuseOrEnqueue(threadWorker);
    }

    public boolean hasWork() {
        return !this.jobs.isEmpty();
    }

    public void join() {
        while (!this.jobs.isEmpty() || this.serialCallbacks && !this.results.isEmpty()) {
            SysUtils.sleep(10L);
        }
    }

    public void stop() {
        int i;
        StopWorker stop = new StopWorker();
        this.join();
        for (i = 0; i < this.numThreads; ++i) {
            this.enqueue(stop, null);
        }
        for (i = 0; i < this.numThreads; ++i) {
            SysUtils.threadJoin(this.threads[i]);
        }
        if (this.callbackThread != null) {
            this.results.offer(new WorkerResult<Object>(null, null, WorkerStatus.OK));
            this.resultSync.release();
            SysUtils.threadJoin(this.callbackThread);
        }
        this.watcher.stop();
    }

    @Override
    public void run() {
        block2: while (true) {
            try {
                while (true) {
                    this.resultSync.acquireUninterruptibly();
                    WorkerResult<T> r = this.results.poll();
                    if (r.worker == null) break block2;
                    this.callback.workerCallback(this, r.status, r.worker, r.object);
                }
            }
            catch (Throwable throwable) {
                continue;
            }
            break;
        }
    }

    @Override
    public void requeue(ThreadWorker<T> worker, Job<T> job) {
        worker.setWorkLoad(job);
    }

    static class StopWorker<T>
    implements Worker<T> {
        @Override
        public void run(T object) {
        }
    }

    static class Job<T> {
        public final Worker<T> worker;
        public final T object;

        public Job(Worker<T> worker, T object) {
            this.worker = worker;
            this.object = object;
        }
    }

    static class ThreadWorker<T>
    implements Runnable {
        private final Semaphore sync = new Semaphore(1);
        private final WorkerPool<T> pool;
        private volatile Job<T> workload = null;

        public ThreadWorker(WorkerPool<T> pool) {
            this.sync.acquireUninterruptibly();
            this.pool = pool;
        }

        protected void setWorkLoad(Job<T> job) {
            this.workload = job;
            this.sync.release();
        }

        @Override
        public void run() {
            while (true) {
                boolean ok = true;
                Throwable ta = null;
                try {
                    this.sync.acquireUninterruptibly();
                    if (this.workload.worker instanceof StopWorker) break;
                    this.workload.worker.run(this.workload.object);
                }
                catch (Throwable t) {
                    ta = t;
                    ok = false;
                }
                this.pool.doCallback(this, ok ? WorkerStatus.OK : new WorkerStatus(ta), this.workload.worker, this.workload.object);
            }
        }
    }

    static class WorkerResult<T> {
        final Worker<T> worker;
        final WorkerStatus status;
        final T object;

        public WorkerResult(Worker<T> worker, T object, WorkerStatus status) {
            this.worker = worker;
            this.object = object;
            this.status = status;
        }
    }
}

