New directory tree, with preprocessor/ inside interpretor/.
[Faustine.git] / interpretor / preprocessor / faust-0.9.47mr3 / architecture / scheduler.h
diff --git a/interpretor/preprocessor/faust-0.9.47mr3/architecture/scheduler.h b/interpretor/preprocessor/faust-0.9.47mr3/architecture/scheduler.h
new file mode 100644 (file)
index 0000000..b04370f
--- /dev/null
@@ -0,0 +1,941 @@
+
#include <assert.h>
#include <errno.h>
#include <errno.h>
#include <fcntl.h>
#include <math.h>
#include <pthread.h>
#include <sched.h>
#include <semaphore.h>
#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include <sys/stat.h>
#include <sys/types.h>
#include <unistd.h>
+
+using namespace std;
+
+// Globals
+
+#define THREAD_SIZE 64
+#define QUEUE_SIZE 4096
+
+#define WORK_STEALING_INDEX 0
+#define LAST_TASK_INDEX 1
+
+
+#ifdef __ICC
+#define INLINE __forceinline
+#else
+#define INLINE inline
+#endif
+
+
+// On Intel set FZ (Flush to Zero) and DAZ (Denormals Are Zero)
+// flags to avoid costly denormals
+#ifdef __SSE__
+#include <xmmintrin.h>
+#ifdef __SSE2__
+#define AVOIDDENORMALS _mm_setcsr(_mm_getcsr() | 0x8040)
+#else
+#define AVOIDDENORMALS _mm_setcsr(_mm_getcsr() | 0x8000)
+#endif
+#else
+#define AVOIDDENORMALS 
+#endif
+
+#ifdef __linux__
+
+// handle 32/64 bits int size issues
+#ifdef __x86_64__
+#define UInt32 unsigned int
+#define UInt64 unsigned long int
+#else
+#define UInt32 unsigned int
+#define UInt64 unsigned long long int
+#endif
+
+#endif
+
+#ifdef __APPLE__
+#include <CoreServices/../Frameworks/CarbonCore.framework/Headers/MacTypes.h>
+#endif
+
+class TaskQueue;
+struct DSPThreadPool;
+
+extern TaskQueue* gTaskQueueList[THREAD_SIZE];
+extern DSPThreadPool* gThreadPool;
+extern int gClientCount;
+extern UInt64 gMaxStealing;
+    
+void Yield();
+
/**
 * Returns the number of clock cycles elapsed since the last reset
 * of the processor, read with the x86 RDTSC instruction.
 * RDTSC places the low 32 bits in EAX and the high 32 bits in EDX;
 * the union reassembles them into one 64-bit value (little-endian layout).
 */
static INLINE UInt64 DSP_rdtsc(void)
{
    union {
        UInt32 i32[2];
        UInt64 i64;
    } count;
    
    __asm__ __volatile__("rdtsc" : "=a" (count.i32[0]), "=d" (count.i32[1]));
    return count.i64;
}
+
+#if defined(__i386__) || defined(__x86_64__)
+
+#define LOCK "lock ; "
+
// Single no-op instruction, used as a tiny back-off inside spin loops.
static INLINE void NOP(void)
{
    __asm__ __volatile__("nop \n\t");
}
+
// Atomic compare-and-swap: if *addr == value, stores newvalue and returns 1,
// otherwise returns 0. Implemented with a LOCK-prefixed CMPXCHG; the
// "memory" clobber orders it against surrounding loads/stores.
// NOTE(review): 'register' is deprecated in modern C++ but kept as written.
static INLINE char CAS1(volatile void* addr, volatile int value, int newvalue)
{
    register char ret;
    __asm__ __volatile__ (
                          "# CAS \n\t"
                          LOCK "cmpxchg %2, (%1) \n\t"
                          "sete %0               \n\t"
                          : "=a" (ret)
                          : "c" (addr), "d" (newvalue), "a" (value)
                          : "memory"
                          );
    return ret;
}
+
// Atomic fetch-and-add: adds 'val' to *atomic and returns the value that
// was stored *before* the addition (LOCK-prefixed XADD).
static INLINE int atomic_xadd(volatile int* atomic, int val) 
{ 
    register int result;
    __asm__ __volatile__ ("# atomic_xadd \n\t"
                          LOCK "xaddl %0,%1 \n\t"
                          : "=r" (result), "=m" (*atomic) 
                          : "0" (val), "m" (*atomic));
    return result;
} 
+
+#endif
+
+
+/*
+static INLINE int INC_ATOMIC(volatile int* val)
+{
+    int actual;
+    do {
+        actual = *val;
+    } while (!CAS1(val, actual, actual + 1));
+    return actual;
+}
+
+static INLINE int DEC_ATOMIC(volatile int* val)
+{
+    int actual;
+    do {
+        actual = *val;
+    } while (!CAS1(val, actual, actual - 1));
+    return actual;
+}
+*/
+
// Atomically increments *val; returns the value *before* the increment.
static INLINE int INC_ATOMIC(volatile int* val)
{
    return atomic_xadd(val, 1);
}
// Atomically decrements *val; returns the value *before* the decrement
// (so a return of 1 means the counter just reached 0).
static INLINE int DEC_ATOMIC(volatile int* val)
{
    return atomic_xadd(val, -1);
}
// To be used in lock-free queue: packs the queue's head and tail indices
// into a single 32-bit word so both can be checked and updated together
// with one CAS on fValue.
struct AtomicCounter
{
    union {
        struct {
            short fHead;    // index of the next free slot at the head
            short fTail;    // index of the next item to steal at the tail
        }
        scounter;
        int fValue;         // both 16-bit fields viewed as one word for CAS
    }info;
    
    INLINE AtomicCounter()
    {
        info.fValue = 0;
    }
     
    // Plain 32-bit copy of the packed word (not an atomic RMW by itself).
    INLINE  AtomicCounter& operator=(AtomicCounter& obj)
    {
        info.fValue = obj.info.fValue;
        return *this;
    }
    
    // Snapshot from a volatile counter: a single 32-bit read keeps the
    // head and tail halves mutually consistent.
    INLINE  AtomicCounter& operator=(volatile AtomicCounter& obj)
    {
        info.fValue = obj.info.fValue;
        return *this;
    }
    
};
+
/**
 * Returns the number of CPUs currently online.
 * Fix: sysconf() returns -1 on error; clamp the result so callers always
 * receive at least 1 instead of a negative thread count.
 */
int get_max_cpu()
{
    long count = sysconf(_SC_NPROCESSORS_ONLN);
    return (count < 1) ? 1 : int(count);
}
+
// Returns the current process id (used to build unique semaphore names).
static int GetPID()
{
#ifdef WIN32
    return  _getpid();
#else
    return getpid();
#endif
}
+
+#define Value(e) (e).info.fValue
+
+#define Head(e) (e).info.scounter.fHead
+#define IncHead(e) (e).info.scounter.fHead++
+#define DecHead(e) (e).info.scounter.fHead--
+
+#define Tail(e) (e).info.scounter.fTail
+#define IncTail(e) (e).info.scounter.fTail++
+#define DecTail(e) (e).info.scounter.fTail--
+
+#define MASTER_THREAD 0
+
+#define MAX_STEAL_DUR 50                    // in usec
+#define DEFAULT_CLOCKSPERSEC 2500000000     // in cycles (2,5 Ghz)
+
/**
 * Lock-free work-stealing queue of task indices.
 * The owner thread pushes and pops at the head; other threads steal from
 * the tail. Head and tail are packed into one 32-bit AtomicCounter so the
 * empty check and the index update commit together with a single CAS.
 */
class TaskQueue 
{
    private:
    
        int fTaskList[QUEUE_SIZE];          // stored task indices (-1 = empty slot)
        volatile AtomicCounter fCounter;    // packed head/tail indices
        UInt64 fStealingStart;              // cycle timestamp of first unsuccessful steal
     
    public:
  
        // Builds an empty queue and registers it in the global per-thread
        // queue table under index 'cur_thread'.
        INLINE TaskQueue(int cur_thread)
        {
            for (int i = 0; i < QUEUE_SIZE; i++) {
                fTaskList[i] = -1;
            }
            gTaskQueueList[cur_thread] = this; 
            fStealingStart = 0;
        }
         
        // Appends a task at the head. No CAS here: presumably only the
        // owner thread ever pushes — confirm against callers.
        INLINE void PushHead(int item)
        {
            fTaskList[Head(fCounter)] = item;
            IncHead(fCounter);
        }
        
        // Pops the most recently pushed task (owner side, LIFO).
        // Returns WORK_STEALING_INDEX when the queue is empty; otherwise
        // retries the CAS until the head/tail snapshot commits unchanged.
        INLINE int PopHead()
        {
            AtomicCounter old_val;
            AtomicCounter new_val;
            
            do {
                old_val = fCounter;
                new_val = old_val;
                if (Head(old_val) == Tail(old_val)) {
                    return WORK_STEALING_INDEX;
                } else {
                    DecHead(new_val);
                }
            } while (!CAS1(&fCounter, Value(old_val), Value(new_val)));
            
            return fTaskList[Head(old_val) - 1];
        }
        
        // Steals the oldest task from the tail (thief side, FIFO).
        // Returns WORK_STEALING_INDEX when the queue is empty.
        INLINE int PopTail()
        {
            AtomicCounter old_val;
            AtomicCounter new_val;
            
            do {
                old_val = fCounter;
                new_val = old_val;
                if (Head(old_val) == Tail(old_val)) {
                   return WORK_STEALING_INDEX;
                } else {
                    IncTail(new_val);
                }
            } while (!CAS1(&fCounter, Value(old_val), Value(new_val)));
            
            return fTaskList[Tail(old_val)];
        }

        // Called after an unsuccessful stealing pass: records the first
        // failure timestamp, then yields once more than gMaxStealing
        // cycles have elapsed without finding work.
        INLINE void MeasureStealingDur()
        {
            // Takes first timestamp
            if (fStealingStart == 0) {
                fStealingStart = DSP_rdtsc();
            } else if ((DSP_rdtsc() - fStealingStart) > gMaxStealing) {
                Yield();
            }
        }

        // Called when work was found: restarts the stealing-duration clock.
        INLINE void ResetStealingDur()
        {
            fStealingStart = 0;
        }
        
        // Tries to steal one task from any other thread's queue.
        // Returns the stolen task index, or WORK_STEALING_INDEX when every
        // queue was empty (caller retries "workstealing" next cycle).
        static INLINE int GetNextTask(int thread, int num_threads)
        {
            int tasknum;
            for (int i = 0; i < num_threads; i++) {
                if ((i != thread) && gTaskQueueList[i] && (tasknum = gTaskQueueList[i]->PopTail()) != WORK_STEALING_INDEX) {
                #ifdef __linux__
                    //if (thread != MASTER_THREAD)
                        gTaskQueueList[thread]->ResetStealingDur();
                #endif
                    return tasknum;    // Task is found
                }
            }
            NOP();
        #ifdef __linux__
            //if (thread != MASTER_THREAD)
                gTaskQueueList[thread]->MeasureStealingDur();
        #endif
            return WORK_STEALING_INDEX;    // Otherwise will try "workstealing" again next cycle...
        }
        
        // Distributes the initially ready tasks of 'task_list' across
        // 'thread_num' threads: this thread queues its slice, directly
        // executes one task (returned via 'tasknum'), and thread 0 also
        // takes the remainder of the division.
        INLINE void InitTaskList(int task_list_size, int* task_list, int thread_num, int cur_thread, int& tasknum)
        {
            int task_slice = task_list_size / thread_num;
            int task_slice_rest = task_list_size % thread_num;

            if (task_slice == 0) {
                // Each thread directly executes one task
                tasknum = task_list[cur_thread];
                // Thread 0 takes remaining ready tasks 
                if (cur_thread == 0) { 
                    // NOTE(review): the bound 'task_slice_rest - thread_num'
                    // is <= 0 whenever task_slice == 0, so this loop never
                    // runs — confirm the intended bound.
                    for (int index = 0; index < task_slice_rest - thread_num; index++) {
                        PushHead(task_list[task_slice_rest + index]);
                    }
                }
            } else {
                // Each thread takes a part of ready tasks
                int index;
                for (index = 0; index < task_slice - 1; index++) {
                    PushHead(task_list[cur_thread * task_slice + index]);
                }
                // Each thread directly executes one task 
                tasknum = task_list[cur_thread * task_slice + index];
                // Thread 0 takes remaining ready tasks 
                if (cur_thread == 0) {
                    for (index = 0; index < task_slice_rest; index++) {
                        PushHead(task_list[thread_num * task_slice + index]);
                    }
                }
            }
        }
        
        // Clears the global per-thread queue table.
        static INLINE void Init()
        {
            for (int i = 0; i < THREAD_SIZE; i++) {
                gTaskQueueList[i] = 0;
            }
        }
     
};
+
+struct TaskGraph 
+{
+    volatile int gTaskList[QUEUE_SIZE];
+    
+    TaskGraph()
+    {
+        for (int i = 0; i < QUEUE_SIZE; i++) {
+            gTaskList[i] = 0;
+        } 
+    }
+
+    INLINE void InitTask(int task, int val)
+    {
+        gTaskList[task] = val;
+    }
+    
+    void Display()
+    {
+        for (int i = 0; i < QUEUE_SIZE; i++) {
+            printf("Task = %d activation = %d\n", i, gTaskList[i]);
+        } 
+    }
+      
+    INLINE void ActivateOutputTask(TaskQueue& queue, int task, int& tasknum)
+    {
+        if (DEC_ATOMIC(&gTaskList[task]) == 1) {
+            if (tasknum == WORK_STEALING_INDEX) {
+                tasknum = task;
+            } else {
+                queue.PushHead(task);
+            }
+        }    
+    }
+      
+    INLINE void ActivateOutputTask(TaskQueue& queue, int task)
+    {
+        if (DEC_ATOMIC(&gTaskList[task]) == 1) {
+            queue.PushHead(task);
+        }
+    }
+    
+    INLINE void ActivateOneOutputTask(TaskQueue& queue, int task, int& tasknum)
+    {
+        if (DEC_ATOMIC(&gTaskList[task]) == 1) {
+            tasknum = task;
+        } else {
+            tasknum = queue.PopHead(); 
+        }
+    }
+    
+    INLINE void GetReadyTask(TaskQueue& queue, int& tasknum)
+    {
+        if (tasknum == WORK_STEALING_INDEX) {
+            tasknum = queue.PopHead();
+        }
+    }
+};
+
+
+#define THREAD_POOL_SIZE 16
+#define JACK_SCHED_POLICY SCHED_FIFO
+
+/* use 512KB stack per thread - the default is way too high to be feasible
+ * with mlockall() on many systems */
+#define THREAD_STACK 524288
+
+
+#ifdef __APPLE__
+
+#include <CoreServices/../Frameworks/CarbonCore.framework/Headers/MacTypes.h>
+#include <mach/thread_policy.h>
+#include <mach/thread_act.h>
+
+#define THREAD_SET_PRIORITY         0
+#define THREAD_SCHEDULED_PRIORITY   1
+
+static UInt32 GetThreadPriority(pthread_t thread, int inWhichPriority);
+
// Returns the thread's priority as it was last set by the API
// (the base priority, not the kernel-adjusted one).
static UInt32 GetThreadSetPriority(pthread_t thread)
{
    return GetThreadPriority(thread, THREAD_SET_PRIORITY);
}
+
// Returns the thread's priority as it was last scheduled by the kernel
// (may differ from the set priority, e.g. when depressed).
static UInt32 GetThreadScheduledPriority(pthread_t thread)
{
    return GetThreadPriority(thread, THREAD_SCHEDULED_PRIORITY);
}
+
// Applies a Mach scheduling policy to 'thread'. Priority 96 selects the
// real-time TIME_CONSTRAINT policy using the given period/computation/
// constraint (in cycles); any other priority sets fixed/timeshare mode
// plus a precedence relative to the calling thread.
// Returns 0 on success, -1 on failure.
static int SetThreadToPriority(pthread_t thread, UInt32 inPriority, Boolean inIsFixed, UInt64 period, UInt64 computation, UInt64 constraint)
{
    if (inPriority == 96) {
        // REAL-TIME / TIME-CONSTRAINT THREAD
        thread_time_constraint_policy_data_t theTCPolicy;
        theTCPolicy.period = period;
        theTCPolicy.computation = computation;
        theTCPolicy.constraint = constraint;
        theTCPolicy.preemptible = true;
        kern_return_t res = thread_policy_set(pthread_mach_thread_np(thread), THREAD_TIME_CONSTRAINT_POLICY, (thread_policy_t)&theTCPolicy, THREAD_TIME_CONSTRAINT_POLICY_COUNT);
        return (res == KERN_SUCCESS) ? 0 : -1;
    } else {
        // OTHER THREADS
        thread_extended_policy_data_t theFixedPolicy;
        thread_precedence_policy_data_t thePrecedencePolicy;
        SInt32 relativePriority;
        
        // [1] SET FIXED / NOT FIXED
        theFixedPolicy.timeshare = !inIsFixed;
        thread_policy_set(pthread_mach_thread_np(thread), THREAD_EXTENDED_POLICY, (thread_policy_t)&theFixedPolicy, THREAD_EXTENDED_POLICY_COUNT);
        
        // [2] SET PRECEDENCE
        // N.B.: We expect that if thread A created thread B, and the program wishes to change
        // the priority of thread B, then the call to change the priority of thread B must be
        // made by thread A.
        // This assumption allows us to use pthread_self() to correctly calculate the priority
        // of the feeder thread (since precedency policy's importance is relative to the
        // spawning thread's priority.)
        relativePriority = inPriority - GetThreadSetPriority(pthread_self());
        
        thePrecedencePolicy.importance = relativePriority;
        kern_return_t res = thread_policy_set(pthread_mach_thread_np(thread), THREAD_PRECEDENCE_POLICY, (thread_policy_t)&thePrecedencePolicy, THREAD_PRECEDENCE_POLICY_COUNT);
        return (res == KERN_SUCCESS) ? 0 : -1;
    }
}
+
// Reads a thread's priority through the Mach thread_info interface.
// 'inWhichPriority' selects THREAD_SET_PRIORITY (base) or
// THREAD_SCHEDULED_PRIORITY (current/depressed). Returns 0 for
// unrecognized policies.
static UInt32 GetThreadPriority(pthread_t thread, int inWhichPriority)
{
    thread_basic_info_data_t threadInfo;
    policy_info_data_t thePolicyInfo;
    unsigned int count;
    
    // get basic info to learn which scheduling policy the thread uses
    count = THREAD_BASIC_INFO_COUNT;
    thread_info(pthread_mach_thread_np(thread), THREAD_BASIC_INFO, (thread_info_t)&threadInfo, &count);
    
    switch (threadInfo.policy) {
        case POLICY_TIMESHARE:
            count = POLICY_TIMESHARE_INFO_COUNT;
            thread_info(pthread_mach_thread_np(thread), THREAD_SCHED_TIMESHARE_INFO, (thread_info_t)&(thePolicyInfo.ts), &count);
            if (inWhichPriority == THREAD_SCHEDULED_PRIORITY) {
                return thePolicyInfo.ts.cur_priority;
            } else {
                return thePolicyInfo.ts.base_priority;
            }
            break;
            
        case POLICY_FIFO:
            count = POLICY_FIFO_INFO_COUNT;
            thread_info(pthread_mach_thread_np(thread), THREAD_SCHED_FIFO_INFO, (thread_info_t)&(thePolicyInfo.fifo), &count);
            // a depressed thread reports its temporarily lowered priority
            if ((thePolicyInfo.fifo.depressed) && (inWhichPriority == THREAD_SCHEDULED_PRIORITY)) {
                return thePolicyInfo.fifo.depress_priority;
            }
            return thePolicyInfo.fifo.base_priority;
            break;
            
        case POLICY_RR:
            count = POLICY_RR_INFO_COUNT;
            thread_info(pthread_mach_thread_np(thread), THREAD_SCHED_RR_INFO, (thread_info_t)&(thePolicyInfo.rr), &count);
            if ((thePolicyInfo.rr.depressed) && (inWhichPriority == THREAD_SCHEDULED_PRIORITY)) {
                return thePolicyInfo.rr.depress_priority;
            }
            return thePolicyInfo.rr.base_priority;
            break;
    }
    
    return 0;
}
+
// Reads the thread's current time-constraint policy parameters (in cycles)
// into *period / *computation / *constraint.
// Returns 0 on success, -1 if the policy could not be read.
static int GetParams(pthread_t thread, UInt64* period, UInt64* computation, UInt64* constraint)
{
    thread_time_constraint_policy_data_t theTCPolicy;
    mach_msg_type_number_t count = THREAD_TIME_CONSTRAINT_POLICY_COUNT;
    boolean_t get_default = false;
    
    kern_return_t res = thread_policy_get(pthread_mach_thread_np(thread),
                                          THREAD_TIME_CONSTRAINT_POLICY,
                                          (thread_policy_t)&theTCPolicy,
                                          &count,
                                          &get_default);
    if (res == KERN_SUCCESS) {
        *period = theTCPolicy.period;
        *computation = theTCPolicy.computation;
        *constraint = theTCPolicy.constraint;
        return 0;
    } else {
        return -1;
    }
}
+
+static UInt64 period = 0;
+static UInt64 computation = 0;
+static UInt64 constraint = 0;
+
// Lazily captures the calling thread's time-constraint parameters into the
// file-level statics (only on the first call, when period is still 0).
INLINE void GetRealTime()
{
    if (period == 0) {
        GetParams(pthread_self(), &period, &computation, &constraint);
    }
}
+
// Promotes the calling thread to the real-time time-constraint policy
// (priority 96) using the parameters captured by GetRealTime().
INLINE void SetRealTime()
{
    SetThreadToPriority(pthread_self(), 96, true, period, computation, constraint);
}
+
// Forcibly terminates the given thread via the Mach kernel interface.
void CancelThread(pthread_t fThread)
{
    mach_port_t machThread = pthread_mach_thread_np(fThread);
    thread_terminate(machThread);
}
+
// Intentionally a no-op on macOS: the sched_yield() call is disabled
// (time-constraint threads are preempted by the kernel instead).
INLINE void Yield()
{
    //sched_yield();
}
+
+#endif
+
+#ifdef __linux__
+
+static int faust_sched_policy = -1;
+static struct sched_param faust_rt_param; 
+
// Lazily captures the calling thread's scheduling policy and parameters
// into the file-level statics (only on the first call).
INLINE void GetRealTime()
{
    if (faust_sched_policy == -1) {
        memset(&faust_rt_param, 0, sizeof(faust_rt_param));
        pthread_getschedparam(pthread_self(), &faust_sched_policy, &faust_rt_param);
    }
}
+
// Switches the calling thread to the policy captured by GetRealTime(),
// one priority step below the capturing thread.
// NOTE(review): faust_rt_param is a shared static, so every call
// decrements the same value — repeated calls drift the priority further
// down and unsynchronized concurrent calls race; confirm this is invoked
// exactly once per worker thread.
INLINE void SetRealTime()
{
    faust_rt_param.sched_priority--;
    pthread_setschedparam(pthread_self(), faust_sched_policy, &faust_rt_param);
}
+
// Cancels the given thread and waits for it to finish.
void CancelThread(pthread_t fThread)
{
    pthread_cancel(fThread);
    pthread_join(fThread, NULL);
}
+
+INLINE void Yield()
+{
+    pthread_yield();
+}
+
+
+#endif
+
+#define KDSPMESURE 50
+
+static INLINE int Range(int min, int max, int val)
+{
+    if (val < min) {
+        return min;
+    } else if (val > max) {
+        return max;
+    } else {
+        return val;
+    }
+}
+
+struct Runnable {
+    
+    UInt64 fTiming[KDSPMESURE];
+    UInt64 fStart;
+    UInt64 fStop;
+    int fCounter;
+    float fOldMean;
+    int fOldfDynamicNumThreads;
+    bool fDynAdapt;
+    
+    virtual void computeThread(int cur_thread) = 0;
+    
+    Runnable():fCounter(0), fOldMean(1000000000.f), fOldfDynamicNumThreads(1)
+    {
+       memset(fTiming, 0, sizeof(long long int ) * KDSPMESURE);
+        fDynAdapt = getenv("OMP_DYN_THREAD") ? strtol(getenv("OMP_DYN_THREAD"), NULL, 10) : false;
+    }
+    
+    INLINE float ComputeMean()
+    {
+        float mean = 0;
+        for (int i = 0; i < KDSPMESURE; i++) {
+            mean += float(fTiming[i]);
+        }
+        mean /= float(KDSPMESURE);
+        return mean;
+    }
+    
+    INLINE void StartMeasure()
+    {
+        if (!fDynAdapt)
+            return;
+        
+        fStart = DSP_rdtsc();
+    }
+     
+    INLINE void StopMeasure(int staticthreadnum, int& dynthreadnum)
+    {
+        if (!fDynAdapt)
+            return;
+        
+        fStop = DSP_rdtsc();
+        fCounter = (fCounter + 1) % KDSPMESURE;
+        if (fCounter == 0) {
+            float mean = ComputeMean();
+            if (fabs(mean - fOldMean) > 5000) {
+                if (mean > fOldMean) { // Worse...
+                    //printf("Worse %f %f\n", mean, fOldMean);
+                    if (fOldfDynamicNumThreads > dynthreadnum) {
+                        fOldfDynamicNumThreads = dynthreadnum;
+                        dynthreadnum += 1;
+                    } else {
+                        fOldfDynamicNumThreads = dynthreadnum;
+                        dynthreadnum -= 1;
+                    }
+                 } else { // Better...
+                    //printf("Better %f %f\n", mean, fOldMean);
+                    if (fOldfDynamicNumThreads > dynthreadnum) {
+                        fOldfDynamicNumThreads = dynthreadnum;
+                        dynthreadnum -= 1;
+                    } else {
+                        fOldfDynamicNumThreads = dynthreadnum;
+                        dynthreadnum += 1;
+                    }
+                }
+                fOldMean = mean;
+                dynthreadnum = Range(1, staticthreadnum, dynthreadnum);
+                //printf("dynthreadnum %d\n", dynthreadnum);
+            }
+        }
+        fTiming[fCounter] = fStop - fStart; 
+    }
+};
+
+struct DSPThread;
+
// Pool of DSPThread workers shared by all clients in the process.
// Access is reference-counted through Init()/Destroy() via gClientCount.
struct DSPThreadPool {
    
    DSPThread* fThreadPool[THREAD_POOL_SIZE];   // worker slots (NULL when unused)
    int fThreadCount;                           // number of started workers
    volatile int fCurThreadCount;               // workers still running the current cycle
      
    DSPThreadPool();
    ~DSPThreadPool();
    
    // Creates and starts 'num' workers (only on the first call).
    void StartAll(int num, bool realtime);
    // Cancels all started workers.
    void StopAll();
    // Wakes 'num' workers to run 'runnable' for one cycle.
    void SignalAll(int num, Runnable* runnable);
    
    // Called by a worker when it finishes its cycle.
    void SignalOne();
    // True when every signaled worker has finished the current cycle.
    bool IsFinished();
    
    // Reference-counted access to the process-wide pool singleton.
    static DSPThreadPool* Init();
    static void Destroy();
    
};
+
+struct DSPThread {
+
+    pthread_t fThread;
+    DSPThreadPool* fThreadPool;
+    Runnable* fRunnable;
+    sem_t* fSemaphore;
+    char fName[128];
+    bool fRealTime;
+    int fNum;
+    
+    DSPThread(int num, DSPThreadPool* pool)
+    {
+        fNum = num;
+        fThreadPool = pool;
+        fRunnable = NULL;
+        fRealTime = false;
+        
+        sprintf(fName, "faust_sem_%d_%p", GetPID(), this);
+        
+        if ((fSemaphore = sem_open(fName, O_CREAT, 0777, 0)) == (sem_t*)SEM_FAILED) {
+            printf("Allocate: can't check in named semaphore name = %s err = %s", fName, strerror(errno));
+        }
+    }
+
+    virtual ~DSPThread()
+    {
+        sem_unlink(fName);
+        sem_close(fSemaphore);
+    }
+    
+    void Run()
+    {
+        while (sem_wait(fSemaphore) != 0) {}
+        fRunnable->computeThread(fNum + 1);
+        fThreadPool->SignalOne();
+    }
+    
+    static void* ThreadHandler(void* arg)
+    {
+        DSPThread* thread = static_cast<DSPThread*>(arg);
+        
+        AVOIDDENORMALS;
+        
+        // One "dummy" cycle to setup thread
+        if (thread->fRealTime) {
+            thread->Run();
+            SetRealTime();
+        }
+                  
+        while (true) {
+            thread->Run();
+        }
+        
+        return NULL;
+    }
+    
+    int Start(bool realtime)
+    {
+        pthread_attr_t attributes;
+        struct sched_param rt_param;
+        pthread_attr_init(&attributes);
+        
+        int priority = 60; // TODO
+        int res;
+        
+        if (realtime) {
+            fRealTime = true;
+        }else {
+            fRealTime = getenv("OMP_REALTIME") ? strtol(getenv("OMP_REALTIME"), NULL, 10) : true;
+        }
+                               
+        if ((res = pthread_attr_setdetachstate(&attributes, PTHREAD_CREATE_JOINABLE))) {
+            printf("Cannot request joinable thread creation for real-time thread res = %d err = %s\n", res, strerror(errno));
+            return -1;
+        }
+
+        if ((res = pthread_attr_setscope(&attributes, PTHREAD_SCOPE_SYSTEM))) {
+            printf("Cannot set scheduling scope for real-time thread res = %d err = %s\n", res, strerror(errno));
+            return -1;
+        }
+
+        if (realtime) {
+            
+            if ((res = pthread_attr_setinheritsched(&attributes, PTHREAD_EXPLICIT_SCHED))) {
+                printf("Cannot request explicit scheduling for RT thread res = %d err = %s\n", res, strerror(errno));
+                return -1;
+            }
+        
+            if ((res = pthread_attr_setschedpolicy(&attributes, JACK_SCHED_POLICY))) {
+                printf("Cannot set RR scheduling class for RT thread res = %d err = %s\n", res, strerror(errno));
+                return -1;
+            }
+            
+            memset(&rt_param, 0, sizeof(rt_param));
+            rt_param.sched_priority = priority;
+
+            if ((res = pthread_attr_setschedparam(&attributes, &rt_param))) {
+                printf("Cannot set scheduling priority for RT thread res = %d err = %s\n", res, strerror(errno));
+                return -1;
+            }
+
+        } else {
+            
+            if ((res = pthread_attr_setinheritsched(&attributes, PTHREAD_INHERIT_SCHED))) {
+                printf("Cannot request explicit scheduling for RT thread res = %d err = %s\n", res, strerror(errno));
+                return -1;
+            }
+        }
+     
+        if ((res = pthread_attr_setstacksize(&attributes, THREAD_STACK))) {
+            printf("Cannot set thread stack size res = %d err = %s\n", res, strerror(errno));
+            return -1;
+        }
+        
+        if ((res = pthread_create(&fThread, &attributes, ThreadHandler, this))) {
+            printf("Cannot create thread res = %d err = %s\n", res, strerror(errno));
+            return -1;
+        }
+
+        pthread_attr_destroy(&attributes);
+        return 0;
+    }
+    
+    void Signal(bool stop, Runnable* runnable)
+    {
+        fRunnable = runnable;
+        sem_post(fSemaphore);
+    }
+    
+    void Stop()
+    {
+        CancelThread(fThread);
+    }
+
+};
+
+DSPThreadPool::DSPThreadPool()
+{
+    for (int i = 0; i < THREAD_POOL_SIZE; i++) {
+        fThreadPool[i] = NULL;
+    }
+    fThreadCount = 0;
+    fCurThreadCount = 0;
+}
+
// Cancels every worker, then frees the DSPThread objects.
DSPThreadPool::~DSPThreadPool()
{
    StopAll();
    
    for (int i = 0; i < fThreadCount; i++) {
        delete(fThreadPool[i]);
        fThreadPool[i] = NULL;
    }
    
    fThreadCount = 0;
 }
+
+void DSPThreadPool::StartAll(int num, bool realtime)
+{
+    if (fThreadCount == 0) {  // Protection for multiple call...  (like LADSPA plug-ins in Ardour)
+        for (int i = 0; i < num; i++) {
+            fThreadPool[i] = new DSPThread(i, this);
+            fThreadPool[i]->Start(realtime);
+            fThreadCount++;
+        }
+    }
+}
+
// Cancels every started worker (they loop forever otherwise).
void DSPThreadPool::StopAll()
{
    for (int i = 0; i < fThreadCount; i++) {
        fThreadPool[i]->Stop();
    }
}
+
// Wakes 'num' workers to run 'runnable' for one cycle; fCurThreadCount
// tracks how many are still busy (decremented by SignalOne()).
void DSPThreadPool::SignalAll(int num, Runnable* runnable)
{
    fCurThreadCount = num;
        
    for (int i = 0; i < num; i++) {  // Important : use local num here...
        fThreadPool[i]->Signal(false, runnable);
    }
}
+
// Called by a worker when its cycle is done: atomically decrements the
// count of still-busy workers.
void DSPThreadPool::SignalOne()
{
    DEC_ATOMIC(&fCurThreadCount);
}
+
// True once every worker signaled by SignalAll() has finished its cycle.
bool DSPThreadPool::IsFinished()
{
    return (fCurThreadCount == 0);
}
+
// Reference-counted access to the process-wide pool: creates it for the
// first client and returns the shared instance to every client.
DSPThreadPool* DSPThreadPool::Init()
{
    if (gClientCount++ == 0 && !gThreadPool) {
        gThreadPool = new DSPThreadPool();
    }
    return gThreadPool;
}
+
// Releases one client reference; the pool is deleted when the last
// client goes away.
void DSPThreadPool::Destroy()
{
    if (--gClientCount == 0 && gThreadPool) {
        delete gThreadPool;
        gThreadPool = NULL;
    }
}
+
#ifndef PLUG_IN

// Globals: single definitions, compiled out when built as a plug-in.
TaskQueue* gTaskQueueList[THREAD_SIZE] = {0};   // per-thread work-stealing queues

DSPThreadPool* gThreadPool = 0;                 // shared pool, ref-counted by gClientCount
int gClientCount = 0;

// Clock rate in cycles per microsecond; CLOCKSPERSEC (in Hz) overrides
// the 2.5 GHz default.
int clock_per_microsec = (getenv("CLOCKSPERSEC") 
                ? strtoll(getenv("CLOCKSPERSEC"), NULL, 10) 
                : DEFAULT_CLOCKSPERSEC) / 1000000;
                
// Maximum time (in cycles) a thread spends work-stealing before yielding;
// OMP_STEALING_DUR (in microseconds) overrides the MAX_STEAL_DUR default.
UInt64  gMaxStealing = getenv("OMP_STEALING_DUR") 
                ? strtoll(getenv("OMP_STEALING_DUR"), NULL, 10) * clock_per_microsec 
                : MAX_STEAL_DUR * clock_per_microsec;

#endif
+