#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include <errno.h>
#include <math.h>
#include <fcntl.h>
#include <unistd.h>
#include <sched.h>
#include <pthread.h>
#include <semaphore.h>

#define THREAD_SIZE 64
#define QUEUE_SIZE 4096

#define WORK_STEALING_INDEX 0
#define LAST_TASK_INDEX 1

#ifdef __ICC
#define INLINE __forceinline
#else
#define INLINE inline
#endif

// On Intel set FZ (Flush to Zero) and DAZ (Denormals Are Zero)
// flags to avoid costly denormals
#ifdef __SSE__
    #include <xmmintrin.h>
    #ifdef __SSE2__
        #define AVOIDDENORMALS _mm_setcsr(_mm_getcsr() | 0x8040)
    #else
        #define AVOIDDENORMALS _mm_setcsr(_mm_getcsr() | 0x8000)
    #endif
#else
    #define AVOIDDENORMALS
#endif

#ifdef __APPLE__
#include <CoreServices/../Frameworks/CarbonCore.framework/Headers/MacTypes.h>
#else
// handle 32/64 bits int size issues
#ifdef __x86_64__
    #define UInt32 unsigned int
    #define UInt64 unsigned long int
#else
    #define UInt32 unsigned int
    #define UInt64 unsigned long long int
#endif
#endif

class TaskQueue;
struct DSPThreadPool;

extern TaskQueue* gTaskQueueList[THREAD_SIZE];
extern DSPThreadPool* gThreadPool;
extern int gClientCount;
extern UInt64 gMaxStealing;
/**
 * Returns the number of clock cycles elapsed since the last reset
 * of the processor
 */
static INLINE UInt64 DSP_rdtsc(void)
{
    union {
        UInt32 i32[2];
        UInt64 i64;
    } count;

    __asm__ __volatile__("rdtsc" : "=a" (count.i32[0]), "=d" (count.i32[1]));
    return count.i64;
}
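// Usage sketch (illustrative, not part of the runtime): converting a cycle
// delta to microseconds, using the clock_per_microsec value computed from
// the CLOCKSPERSEC environment variable at the end of this file:
//
//   UInt64 start = DSP_rdtsc();
//   // ... some work ...
//   UInt64 duration_usec = (DSP_rdtsc() - start) / clock_per_microsec;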
#if defined(__i386__) || defined(__x86_64__)

#define LOCK "lock ; "

static INLINE void NOP(void)
{
    __asm__ __volatile__("nop \n\t");
}
static INLINE char CAS1(volatile void* addr, volatile int value, int newvalue)
{
    char ret;
    __asm__ __volatile__ (
        "# CAS \n\t"
        LOCK "cmpxchg %2, (%1) \n\t"
        "sete %0 \n\t"
        : "=a" (ret)
        : "c" (addr), "d" (newvalue), "a" (value)
    );
    return ret;
}
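// Usage note (illustrative): CAS1 returns non-zero when *addr was equal to
// 'value' and has been atomically replaced by 'newvalue', so the canonical
// retry loop reads the current value and retries until the swap succeeds:
//
//   int actual;
//   do {
//       actual = *counter_ptr;
//   } while (!CAS1(counter_ptr, actual, actual + 1));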
static INLINE int atomic_xadd(volatile int* atomic, int val)
{
    int result;
    __asm__ __volatile__ ("# atomic_xadd \n\t"
                          LOCK "xaddl %0,%1 \n\t"
                          : "=r" (result), "=m" (*atomic)
                          : "0" (val), "m" (*atomic));
    return result;
}

#endif
/*
// Alternative CAS1-based versions, kept for reference:

static INLINE int INC_ATOMIC(volatile int* val)
{
    int actual;
    do {
        actual = *val;
    } while (!CAS1(val, actual, actual + 1));
    return actual;
}

static INLINE int DEC_ATOMIC(volatile int* val)
{
    int actual;
    do {
        actual = *val;
    } while (!CAS1(val, actual, actual - 1));
    return actual;
}
*/
static INLINE int INC_ATOMIC(volatile int* val)
{
    return atomic_xadd(val, 1);
}

static INLINE int DEC_ATOMIC(volatile int* val)
{
    return atomic_xadd(val, -1);
}
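// Usage note: atomic_xadd() returns the *previous* value, so
// DEC_ATOMIC(&n) == 1 means "this call decremented n from 1 to 0", i.e.
// the caller performed the final decrement. The task activation logic
// below (see TaskGraph) relies on exactly this test.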
// To be used in lock-free queue
struct AtomicCounter
{
    // Head and tail are packed as two 16-bit halves of one 32-bit word,
    // so both can be compared and swapped together with a single CAS1
    union {
        struct {
            short fHead;
            short fTail;
        } scounter;
        int fValue;
    } info;

    INLINE AtomicCounter()
    {
        info.fValue = 0;
    }

    INLINE AtomicCounter& operator=(AtomicCounter& obj)
    {
        info.fValue = obj.info.fValue;
        return *this;
    }

    INLINE AtomicCounter& operator=(volatile AtomicCounter& obj)
    {
        info.fValue = obj.info.fValue;
        return *this;
    }
};
int get_max_cpu()
{
    return sysconf(_SC_NPROCESSORS_ONLN);
}

static int GetPID()
{
    return getpid();
}
#define Value(e) (e).info.fValue

#define Head(e) (e).info.scounter.fHead
#define IncHead(e) (e).info.scounter.fHead++
#define DecHead(e) (e).info.scounter.fHead--

#define Tail(e) (e).info.scounter.fTail
#define IncTail(e) (e).info.scounter.fTail++
#define DecTail(e) (e).info.scounter.fTail--
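// Sketch of the packed layout (illustrative): a head/tail pair is observed
// and updated atomically through the single 32-bit fValue:
//
//   AtomicCounter c;
//   Head(c) = 2;             // c.info.scounter.fHead
//   Tail(c) = 1;             // c.info.scounter.fTail
//   int packed = Value(c);   // both fields read in one atomic word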
#define MASTER_THREAD 0

#define MAX_STEAL_DUR 50                   // in usec
#define DEFAULT_CLOCKSPERSEC 2500000000    // in cycles (2.5 GHz)
class TaskQueue
{
    private:

        int fTaskList[QUEUE_SIZE];
        volatile AtomicCounter fCounter;
        UInt64 fStealingStart;

    public:

        INLINE TaskQueue(int cur_thread)
        {
            for (int i = 0; i < QUEUE_SIZE; i++) {
                fTaskList[i] = -1;  // mark all slots as empty
            }
            gTaskQueueList[cur_thread] = this;
            fStealingStart = 0;
        }
        INLINE void PushHead(int item)
        {
            fTaskList[Head(fCounter)] = item;
            IncHead(fCounter);
        }
        INLINE int PopHead()
        {
            AtomicCounter old_val;
            AtomicCounter new_val;

            do {
                old_val = fCounter;
                new_val = old_val;
                if (Head(old_val) == Tail(old_val)) {
                    return WORK_STEALING_INDEX;  // queue is empty
                } else {
                    DecHead(new_val);
                }
            } while (!CAS1(&fCounter, Value(old_val), Value(new_val)));

            return fTaskList[Head(old_val) - 1];
        }
        INLINE int PopTail()
        {
            AtomicCounter old_val;
            AtomicCounter new_val;

            do {
                old_val = fCounter;
                new_val = old_val;
                if (Head(old_val) == Tail(old_val)) {
                    return WORK_STEALING_INDEX;  // queue is empty
                } else {
                    IncTail(new_val);
                }
            } while (!CAS1(&fCounter, Value(old_val), Value(new_val)));

            return fTaskList[Tail(old_val)];
        }
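        // Queue discipline (illustrative): the owner thread pushes and pops
        // at the head, while idle threads steal from the tail (see
        // GetNextTask() below), which keeps the two ends from contending:
        //
        //   TaskQueue queue(0);       // registers itself as thread 0's queue
        //   queue.PushHead(5);
        //   int mine = queue.PopHead();                 // owner side: 5
        //   int stolen = gTaskQueueList[0]->PopTail();  // thief side: empty now,
        //                                               // returns WORK_STEALING_INDEX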
        INLINE void MeasureStealingDur()
        {
            // Takes first timestamp
            if (fStealingStart == 0) {
                fStealingStart = DSP_rdtsc();
            } else if ((DSP_rdtsc() - fStealingStart) > gMaxStealing) {
                // Has been stealing for longer than gMaxStealing: back off
                // (assumption: the original yields the CPU at this point)
                sched_yield();
            }
        }

        INLINE void ResetStealingDur()
        {
            fStealingStart = 0;
        }
        static INLINE int GetNextTask(int thread, int num_threads)
        {
            int tasknum;
            for (int i = 0; i < num_threads; i++) {
                if ((i != thread) && gTaskQueueList[i] && (tasknum = gTaskQueueList[i]->PopTail()) != WORK_STEALING_INDEX) {
                    //if (thread != MASTER_THREAD)
                    gTaskQueueList[thread]->ResetStealingDur();
                    return tasknum;  // Task is found
                }
            }
            NOP();
            //if (thread != MASTER_THREAD)
            gTaskQueueList[thread]->MeasureStealingDur();
            return WORK_STEALING_INDEX;  // Otherwise will try "workstealing" again next cycle...
        }
        INLINE void InitTaskList(int task_list_size, int* task_list, int thread_num, int cur_thread, int& tasknum)
        {
            int task_slice = task_list_size / thread_num;
            int task_slice_rest = task_list_size % thread_num;

            if (task_slice == 0) {
                // Each thread directly executes one task
                tasknum = task_list[cur_thread];
                // Thread 0 takes remaining ready tasks
                if (cur_thread == 0) {
                    for (int index = 0; index < task_slice_rest - thread_num; index++) {
                        PushHead(task_list[task_slice_rest + index]);
                    }
                }
            } else {
                // Each thread takes a part of ready tasks
                int index;
                for (index = 0; index < task_slice - 1; index++) {
                    PushHead(task_list[cur_thread * task_slice + index]);
                }
                // Each thread directly executes one task
                tasknum = task_list[cur_thread * task_slice + index];
                // Thread 0 takes remaining ready tasks
                if (cur_thread == 0) {
                    for (index = 0; index < task_slice_rest; index++) {
                        PushHead(task_list[thread_num * task_slice + index]);
                    }
                }
            }
        }
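        // Worked example (illustrative): task_list_size = 10 and
        // thread_num = 4 give task_slice = 2 and task_slice_rest = 2.
        // Each thread pushes task_slice - 1 = 1 task, keeps one task in
        // 'tasknum' for direct execution, and thread 0 additionally pushes
        // the two leftover tasks task_list[8] and task_list[9].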
        static INLINE void Init()
        {
            for (int i = 0; i < THREAD_SIZE; i++) {
                gTaskQueueList[i] = 0;
            }
        }

};
struct TaskGraph
{
    volatile int gTaskList[QUEUE_SIZE];

    TaskGraph()
    {
        for (int i = 0; i < QUEUE_SIZE; i++) {
            gTaskList[i] = 0;
        }
    }

    INLINE void InitTask(int task, int val)
    {
        gTaskList[task] = val;
    }

    void Display()
    {
        for (int i = 0; i < QUEUE_SIZE; i++) {
            printf("Task = %d activation = %d\n", i, gTaskList[i]);
        }
    }
    INLINE void ActivateOutputTask(TaskQueue& queue, int task, int& tasknum)
    {
        if (DEC_ATOMIC(&gTaskList[task]) == 1) {
            if (tasknum == WORK_STEALING_INDEX) {
                tasknum = task;
            } else {
                queue.PushHead(task);
            }
        }
    }

    INLINE void ActivateOutputTask(TaskQueue& queue, int task)
    {
        if (DEC_ATOMIC(&gTaskList[task]) == 1) {
            queue.PushHead(task);
        }
    }
    INLINE void ActivateOneOutputTask(TaskQueue& queue, int task, int& tasknum)
    {
        if (DEC_ATOMIC(&gTaskList[task]) == 1) {
            tasknum = task;
        } else {
            tasknum = queue.PopHead();
        }
    }

    INLINE void GetReadyTask(TaskQueue& queue, int& tasknum)
    {
        if (tasknum == WORK_STEALING_INDEX) {
            tasknum = queue.PopHead();
        }
    }

};
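// Activation sketch (illustrative): a task with two inputs starts with an
// activation count of 2; each completed input decrements it, and only the
// thread that performs the 1 -> 0 transition schedules the task:
//
//   TaskGraph graph;
//   graph.InitTask(7, 2);                // task 7 waits on 2 inputs
//   graph.ActivateOutputTask(queue, 7);  // first input done: count 2 -> 1
//   graph.ActivateOutputTask(queue, 7);  // second input done: 1 -> 0, task 7 pushed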
#define THREAD_POOL_SIZE 16
#define JACK_SCHED_POLICY SCHED_FIFO

/* use 512KB stack per thread - the default is way too high to be feasible
 * with mlockall() on many systems */
#define THREAD_STACK 524288
#ifdef __APPLE__

#include <CoreServices/../Frameworks/CarbonCore.framework/Headers/MacTypes.h>
#include <mach/thread_policy.h>
#include <mach/thread_act.h>

#define THREAD_SET_PRIORITY        0
#define THREAD_SCHEDULED_PRIORITY  1
static UInt32 GetThreadPriority(pthread_t thread, int inWhichPriority);

// returns the thread's priority as it was last set by the API
static UInt32 GetThreadSetPriority(pthread_t thread)
{
    return GetThreadPriority(thread, THREAD_SET_PRIORITY);
}

// returns the thread's priority as it was last scheduled by the Kernel
static UInt32 GetThreadScheduledPriority(pthread_t thread)
{
    return GetThreadPriority(thread, THREAD_SCHEDULED_PRIORITY);
}
static int SetThreadToPriority(pthread_t thread, UInt32 inPriority, Boolean inIsFixed, UInt64 period, UInt64 computation, UInt64 constraint)
{
    if (inPriority == 96) {
        // REAL-TIME / TIME-CONSTRAINT THREAD
        thread_time_constraint_policy_data_t theTCPolicy;
        theTCPolicy.period = period;
        theTCPolicy.computation = computation;
        theTCPolicy.constraint = constraint;
        theTCPolicy.preemptible = true;
        kern_return_t res = thread_policy_set(pthread_mach_thread_np(thread), THREAD_TIME_CONSTRAINT_POLICY, (thread_policy_t)&theTCPolicy, THREAD_TIME_CONSTRAINT_POLICY_COUNT);
        return (res == KERN_SUCCESS) ? 0 : -1;
    } else {
        // OTHER THREADS
        thread_extended_policy_data_t theFixedPolicy;
        thread_precedence_policy_data_t thePrecedencePolicy;
        SInt32 relativePriority;

        // [1] SET FIXED / NOT FIXED
        theFixedPolicy.timeshare = !inIsFixed;
        thread_policy_set(pthread_mach_thread_np(thread), THREAD_EXTENDED_POLICY, (thread_policy_t)&theFixedPolicy, THREAD_EXTENDED_POLICY_COUNT);

        // [2] SET PRECEDENCE
        // N.B.: We expect that if thread A created thread B, and the program wishes to change
        // the priority of thread B, then the call to change the priority of thread B must be
        // made by thread A.
        // This assumption allows us to use pthread_self() to correctly calculate the priority
        // of the feeder thread (since precedency policy's importance is relative to the
        // spawning thread's priority.)
        relativePriority = inPriority - GetThreadSetPriority(pthread_self());

        thePrecedencePolicy.importance = relativePriority;
        kern_return_t res = thread_policy_set(pthread_mach_thread_np(thread), THREAD_PRECEDENCE_POLICY, (thread_policy_t)&thePrecedencePolicy, THREAD_PRECEDENCE_POLICY_COUNT);
        return (res == KERN_SUCCESS) ? 0 : -1;
    }
}
static UInt32 GetThreadPriority(pthread_t thread, int inWhichPriority)
{
    thread_basic_info_data_t threadInfo;
    policy_info_data_t thePolicyInfo;
    unsigned int count;

    // get basic info
    count = THREAD_BASIC_INFO_COUNT;
    thread_info(pthread_mach_thread_np(thread), THREAD_BASIC_INFO, (thread_info_t)&threadInfo, &count);

    switch (threadInfo.policy) {

        case POLICY_TIMESHARE:
            count = POLICY_TIMESHARE_INFO_COUNT;
            thread_info(pthread_mach_thread_np(thread), THREAD_SCHED_TIMESHARE_INFO, (thread_info_t)&(thePolicyInfo.ts), &count);
            if (inWhichPriority == THREAD_SCHEDULED_PRIORITY) {
                return thePolicyInfo.ts.cur_priority;
            } else {
                return thePolicyInfo.ts.base_priority;
            }

        case POLICY_FIFO:
            count = POLICY_FIFO_INFO_COUNT;
            thread_info(pthread_mach_thread_np(thread), THREAD_SCHED_FIFO_INFO, (thread_info_t)&(thePolicyInfo.fifo), &count);
            if ((thePolicyInfo.fifo.depressed) && (inWhichPriority == THREAD_SCHEDULED_PRIORITY)) {
                return thePolicyInfo.fifo.depress_priority;
            }
            return thePolicyInfo.fifo.base_priority;

        case POLICY_RR:
            count = POLICY_RR_INFO_COUNT;
            thread_info(pthread_mach_thread_np(thread), THREAD_SCHED_RR_INFO, (thread_info_t)&(thePolicyInfo.rr), &count);
            if ((thePolicyInfo.rr.depressed) && (inWhichPriority == THREAD_SCHEDULED_PRIORITY)) {
                return thePolicyInfo.rr.depress_priority;
            }
            return thePolicyInfo.rr.base_priority;
    }

    return 0;
}
static int GetParams(pthread_t thread, UInt64* period, UInt64* computation, UInt64* constraint)
{
    thread_time_constraint_policy_data_t theTCPolicy;
    mach_msg_type_number_t count = THREAD_TIME_CONSTRAINT_POLICY_COUNT;
    boolean_t get_default = false;

    kern_return_t res = thread_policy_get(pthread_mach_thread_np(thread),
                                          THREAD_TIME_CONSTRAINT_POLICY,
                                          (thread_policy_t)&theTCPolicy,
                                          &count,
                                          &get_default);
    if (res == KERN_SUCCESS) {
        *period = theTCPolicy.period;
        *computation = theTCPolicy.computation;
        *constraint = theTCPolicy.constraint;
        return 0;
    } else {
        return -1;
    }
}
static UInt64 period = 0;
static UInt64 computation = 0;
static UInt64 constraint = 0;

INLINE void GetRealTime()
{
    if (period == 0) {
        GetParams(pthread_self(), &period, &computation, &constraint);
    }
}

INLINE void SetRealTime()
{
    SetThreadToPriority(pthread_self(), 96, true, period, computation, constraint);
}

void CancelThread(pthread_t fThread)
{
    mach_port_t machThread = pthread_mach_thread_np(fThread);
    thread_terminate(machThread);
}

#endif // __APPLE__
#ifdef __linux__

static int faust_sched_policy = -1;
static struct sched_param faust_rt_param;

INLINE void GetRealTime()
{
    if (faust_sched_policy == -1) {
        memset(&faust_rt_param, 0, sizeof(faust_rt_param));
        pthread_getschedparam(pthread_self(), &faust_sched_policy, &faust_rt_param);
    }
}

INLINE void SetRealTime()
{
    // Workers run just below the priority of the calling (RT) thread
    faust_rt_param.sched_priority--;
    pthread_setschedparam(pthread_self(), faust_sched_policy, &faust_rt_param);
}

void CancelThread(pthread_t fThread)
{
    pthread_cancel(fThread);
    pthread_join(fThread, NULL);
}

#endif // __linux__
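// Intended usage (a sketch, based on how ThreadHandler() below calls these
// helpers): the calling real-time thread runs GetRealTime() once to capture
// its scheduling parameters; each worker then calls SetRealTime() after its
// first "dummy" cycle to adopt them:
//
//   GetRealTime();                          // in the master/RT thread
//   pool->StartAll(num_threads - 1, true);  // workers call SetRealTime()
//                                           // from ThreadHandler()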
#define KDSPMESURE 50

static INLINE int Range(int min, int max, int val)
{
    if (val < min) {
        return min;
    } else if (val > max) {
        return max;
    } else {
        return val;
    }
}
struct Runnable {

    UInt64 fTiming[KDSPMESURE];
    UInt64 fStart;
    UInt64 fStop;
    int fCounter;
    float fOldMean;
    int fOldfDynamicNumThreads;
    bool fDynAdapt;

    virtual void computeThread(int cur_thread) = 0;

    Runnable():fCounter(0), fOldMean(1000000000.f), fOldfDynamicNumThreads(1)
    {
        memset(fTiming, 0, sizeof(long long int) * KDSPMESURE);
        fDynAdapt = getenv("OMP_DYN_THREAD") ? strtol(getenv("OMP_DYN_THREAD"), NULL, 10) : false;
    }
    INLINE float ComputeMean()
    {
        float mean = 0.f;
        for (int i = 0; i < KDSPMESURE; i++) {
            mean += float(fTiming[i]);
        }
        mean /= float(KDSPMESURE);
        return mean;
    }
    INLINE void StartMeasure()
    {
        if (!fDynAdapt) {
            return;
        }
        fStart = DSP_rdtsc();
    }
    INLINE void StopMeasure(int staticthreadnum, int& dynthreadnum)
    {
        if (!fDynAdapt) {
            return;
        }

        fStop = DSP_rdtsc();
        fCounter = (fCounter + 1) % KDSPMESURE;
        if (fCounter == 0) {
            float mean = ComputeMean();
            if (fabs(mean - fOldMean) > 5000) {
                if (mean > fOldMean) { // Worse...
                    //printf("Worse %f %f\n", mean, fOldMean);
                    if (fOldfDynamicNumThreads > dynthreadnum) {
                        fOldfDynamicNumThreads = dynthreadnum;
                        dynthreadnum++;
                    } else {
                        fOldfDynamicNumThreads = dynthreadnum;
                        dynthreadnum--;
                    }
                } else { // Better...
                    //printf("Better %f %f\n", mean, fOldMean);
                    if (fOldfDynamicNumThreads > dynthreadnum) {
                        fOldfDynamicNumThreads = dynthreadnum;
                        dynthreadnum--;
                    } else {
                        fOldfDynamicNumThreads = dynthreadnum;
                        dynthreadnum++;
                    }
                }
                fOldMean = mean;
                dynthreadnum = Range(1, staticthreadnum, dynthreadnum);
                //printf("dynthreadnum %d\n", dynthreadnum);
            }
        }
        fTiming[fCounter] = fStop - fStart;
    }

};
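// Measurement sketch (illustrative): the caller wraps each compute cycle
// with StartMeasure()/StopMeasure(); every KDSPMESURE cycles the mean cycle
// duration is compared with the previous mean and dynthreadnum is nudged up
// or down (clamped to [1, staticthreadnum]) in the direction that helped:
//
//   runnable->StartMeasure();
//   /* ... one parallel compute cycle ... */
//   runnable->StopMeasure(static_threads, dynamic_threads);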
struct DSPThread;

struct DSPThreadPool {

    DSPThread* fThreadPool[THREAD_POOL_SIZE];
    int fThreadCount;
    volatile int fCurThreadCount;

    DSPThreadPool();
    ~DSPThreadPool();

    void StartAll(int num, bool realtime);
    void StopAll();
    void SignalAll(int num, Runnable* runnable);

    void SignalOne();
    bool IsFinished();

    static DSPThreadPool* Init();
    static void Destroy();

};
struct DSPThread {

    pthread_t fThread;
    DSPThreadPool* fThreadPool;
    Runnable* fRunnable;
    sem_t* fSemaphore;
    char fName[128];
    bool fRealTime;
    int fNum;

    DSPThread(int num, DSPThreadPool* pool)
    {
        fNum = num;
        fThreadPool = pool;
        fRunnable = NULL;
        fRealTime = false;

        sprintf(fName, "faust_sem_%d_%p", GetPID(), this);

        if ((fSemaphore = sem_open(fName, O_CREAT, 0777, 0)) == (sem_t*)SEM_FAILED) {
            printf("Allocate: can't check in named semaphore name = %s err = %s", fName, strerror(errno));
        }
    }

    virtual ~DSPThread()
    {
        sem_unlink(fName);
        sem_close(fSemaphore);
    }
    void Run()
    {
        while (sem_wait(fSemaphore) != 0) {}
        fRunnable->computeThread(fNum + 1);
        fThreadPool->SignalOne();
    }
    static void* ThreadHandler(void* arg)
    {
        DSPThread* thread = static_cast<DSPThread*>(arg);

        AVOIDDENORMALS;

        // One "dummy" cycle to setup thread
        if (thread->fRealTime) {
            thread->Run();
            SetRealTime();
        }

        while (true) {
            thread->Run();
        }

        return NULL;
    }
    int Start(bool realtime)
    {
        pthread_attr_t attributes;
        struct sched_param rt_param;
        pthread_attr_init(&attributes);

        int priority = 60; // TODO
        int res;

        if (realtime) {
            fRealTime = true;
        } else {
            fRealTime = getenv("OMP_REALTIME") ? strtol(getenv("OMP_REALTIME"), NULL, 10) : true;
        }

        if ((res = pthread_attr_setdetachstate(&attributes, PTHREAD_CREATE_JOINABLE))) {
            printf("Cannot request joinable thread creation for real-time thread res = %d err = %s\n", res, strerror(errno));
            return -1;
        }

        if ((res = pthread_attr_setscope(&attributes, PTHREAD_SCOPE_SYSTEM))) {
            printf("Cannot set scheduling scope for real-time thread res = %d err = %s\n", res, strerror(errno));
            return -1;
        }

        if (fRealTime) {

            if ((res = pthread_attr_setinheritsched(&attributes, PTHREAD_EXPLICIT_SCHED))) {
                printf("Cannot request explicit scheduling for RT thread res = %d err = %s\n", res, strerror(errno));
                return -1;
            }

            if ((res = pthread_attr_setschedpolicy(&attributes, JACK_SCHED_POLICY))) {
                printf("Cannot set RR scheduling class for RT thread res = %d err = %s\n", res, strerror(errno));
                return -1;
            }

            memset(&rt_param, 0, sizeof(rt_param));
            rt_param.sched_priority = priority;

            if ((res = pthread_attr_setschedparam(&attributes, &rt_param))) {
                printf("Cannot set scheduling priority for RT thread res = %d err = %s\n", res, strerror(errno));
                return -1;
            }

        } else {

            if ((res = pthread_attr_setinheritsched(&attributes, PTHREAD_INHERIT_SCHED))) {
                printf("Cannot request inherited scheduling for non-RT thread res = %d err = %s\n", res, strerror(errno));
                return -1;
            }
        }

        if ((res = pthread_attr_setstacksize(&attributes, THREAD_STACK))) {
            printf("Cannot set thread stack size res = %d err = %s\n", res, strerror(errno));
            return -1;
        }

        if ((res = pthread_create(&fThread, &attributes, ThreadHandler, this))) {
            printf("Cannot create thread res = %d err = %s\n", res, strerror(errno));
            return -1;
        }

        pthread_attr_destroy(&attributes);
        return 0;
    }
    void Signal(bool stop, Runnable* runnable)
    {
        fRunnable = runnable;
        sem_post(fSemaphore);
    }

    void Stop()
    {
        CancelThread(fThread);
    }

};
DSPThreadPool::DSPThreadPool()
{
    for (int i = 0; i < THREAD_POOL_SIZE; i++) {
        fThreadPool[i] = NULL;
    }
    fThreadCount = 0;
    fCurThreadCount = 0;
}
DSPThreadPool::~DSPThreadPool()
{
    StopAll();

    for (int i = 0; i < fThreadCount; i++) {
        delete fThreadPool[i];
        fThreadPool[i] = NULL;
    }

    fThreadCount = 0;
}
void DSPThreadPool::StartAll(int num, bool realtime)
{
    if (fThreadCount == 0) { // Protection for multiple calls... (like LADSPA plug-ins in Ardour)
        for (int i = 0; i < num; i++) {
            fThreadPool[i] = new DSPThread(i, this);
            fThreadPool[i]->Start(realtime);
            fThreadCount++;
        }
    }
}
void DSPThreadPool::StopAll()
{
    for (int i = 0; i < fThreadCount; i++) {
        fThreadPool[i]->Stop();
    }
}
void DSPThreadPool::SignalAll(int num, Runnable* runnable)
{
    fCurThreadCount = num;

    for (int i = 0; i < num; i++) { // Important: use the local num here...
        fThreadPool[i]->Signal(false, runnable);
    }
}
void DSPThreadPool::SignalOne()
{
    DEC_ATOMIC(&fCurThreadCount);
}

bool DSPThreadPool::IsFinished()
{
    return (fCurThreadCount == 0);
}
DSPThreadPool* DSPThreadPool::Init()
{
    if (gClientCount++ == 0 && !gThreadPool) {
        gThreadPool = new DSPThreadPool();
    }
    return gThreadPool;
}

void DSPThreadPool::Destroy()
{
    if (--gClientCount == 0 && gThreadPool) {
        delete gThreadPool;
        gThreadPool = NULL;
    }
}
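// Lifecycle sketch (illustrative): clients share one global pool through
// reference counting, so paired Init()/Destroy() calls from several clients
// are safe:
//
//   DSPThreadPool* pool = DSPThreadPool::Init();  // first client creates it
//   pool->StartAll(3, true);                      // 3 worker threads
//   pool->SignalAll(3, runnable);                 // launch one compute cycle
//   while (!pool->IsFinished()) {}                // wait for workers to check in
//   DSPThreadPool::Destroy();                     // last client deletes it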
TaskQueue* gTaskQueueList[THREAD_SIZE] = {0};
DSPThreadPool* gThreadPool = 0;
int gClientCount = 0;

int clock_per_microsec = (getenv("CLOCKSPERSEC")
                          ? strtoll(getenv("CLOCKSPERSEC"), NULL, 10)
                          : DEFAULT_CLOCKSPERSEC) / 1000000;

UInt64 gMaxStealing = getenv("OMP_STEALING_DUR")
                      ? strtoll(getenv("OMP_STEALING_DUR"), NULL, 10) * clock_per_microsec
                      : MAX_STEAL_DUR * clock_per_microsec;
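// Environment variables recognized by this runtime (summary of the getenv()
// calls above): CLOCKSPERSEC (CPU frequency in cycles/sec, default
// DEFAULT_CLOCKSPERSEC), OMP_STEALING_DUR (maximum stealing duration in
// usec, default MAX_STEAL_DUR), OMP_REALTIME (workers request RT
// scheduling, default true) and OMP_DYN_THREAD (enable dynamic thread
// count adaptation, default false).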