Introduction
Introduction Statistics Contact Development Disclaimer Help
thread.c - plan9port - [fork] Plan 9 from user space
git clone git://src.adamsgaard.dk/plan9port
Log
Files
Refs
README
LICENSE
---
thread.c (16472B)
---
1 #include "threadimpl.h"
2
/* Debug switch: _threaddebug() prints nothing while this is 0. */
3 int _threaddebuglevel = 0;
4
/*
 * threadnproc counts procs created by procalloc(); threadnsysproc counts
 * procs marked by _threadsetsysproc().  Both are modified only while
 * holding threadnproclock.
 */
5 static uint threadnproc;
6 static uint threadnsysproc;
7 static Lock threadnproclock;
/* Source of thread ids: _threadcreate() assigns incref(&threadidref). */
8 static Ref threadidref;
/* Set by threadmainstart() to the proc that runs threadmain(). */
9 static Proc *threadmainproc;
10
/* Forward declarations for the static helpers defined later in this file. */
11 static void addproc(Proc*);
12 static void delproc(Proc*);
13 static void addthread(_Threadlist*, _Thread*);
14 static void delthread(_Threadlist*, _Thread*);
15 static int onlist(_Threadlist*, _Thread*);
16 static void addthreadinproc(Proc*, _Thread*);
17 static void delthreadinproc(Proc*, _Thread*);
18 static void procmain(Proc*);
19 static int threadinfo(void*, char*);
20 static void pthreadscheduler(Proc *p);
21 static void pthreadsleepschedlocked(Proc *p, _Thre…
22 static void pthreadwakeupschedlocked(Proc *p, _Thr…
23 static _Thread* procnext(Proc*, _Thread*);
24
/*
 * Write one formatted debugging line, tagged with the current proc
 * pointer, the pid and (when known) a thread id.
 * Returns immediately while _threaddebuglevel is 0.
 * If t is nil, the calling proc's current thread is used for the tag;
 * if that is nil too, the "_" form of the tag is printed.
 */
25 static void
26 _threaddebug(_Thread *t, char *fmt, ...)
27 {
28 va_list arg;
29 char buf[128];
30 char *p;
/* log descriptor; stays -1 until the first call that actually logs */
31 static int fd = -1;
32
33 if(_threaddebuglevel == 0)
34 return;
35
36 if(fd < 0){
/* log file name is /tmp/<last path element of argv0>.tlog */
37 p = strrchr(argv0, '/');
38 if(p)
39 p++;
40 else
41 p = argv0;
42 snprint(buf, sizeof buf, "/tmp/%s.tlog", p);
/* if the log file cannot be created, fall back to /dev/null */
43 if((fd = create(buf, OWRITE, 0666)) < 0)
44 fd = open("/dev/null", OWRITE);
/* move the log onto descriptor 2 and write there from now on */
45 if(fd >= 0 && fd != 2){
46 dup(fd, 2);
47 close(fd);
48 fd = 2;
49 }
50 }
51
/* buf is reused here: the path built above is no longer needed */
52 va_start(arg, fmt);
53 vsnprint(buf, sizeof buf, fmt, arg);
54 va_end(arg);
55 if(t == nil)
56 t = proc()->thread;
57 if(t)
58 fprint(fd, "%p %d.%d: %s\n", proc(), getpid(), t->id, bu…
59 else
60 fprint(fd, "%p %d._: %s\n", proc(), getpid(), buf);
61 }
62
63 static _Thread*
64 getthreadnow(void)
65 {
66 return proc()->thread;
67 }
68 _Thread *(*threadnow)(void) = getthreadnow;
69
70 static Proc*
71 procalloc(void)
72 {
73 Proc *p;
74
75 p = malloc(sizeof *p);
76 if(p == nil)
77 sysfatal("procalloc malloc: %r");
78 memset(p, 0, sizeof *p);
79 addproc(p);
80 lock(&threadnproclock);
81 threadnproc++;
82 unlock(&threadnproclock);
83 return p;
84 }
85
86 _Thread*
87 _threadcreate(Proc *p, void (*fn)(void*), void *arg, uint stack)
88 {
89 _Thread *t;
90
91 USED(stack);
92 t = malloc(sizeof *t);
93 if(t == nil)
94 sysfatal("threadcreate malloc: %r");
95 memset(t, 0, sizeof *t);
96 t->id = incref(&threadidref);
97 t->startfn = fn;
98 t->startarg = arg;
99 t->proc = p;
100 if(p->nthread != 0)
101 _threadpthreadstart(p, t);
102 else
103 t->mainthread = 1;
104 p->nthread++;
105 addthreadinproc(p, t);
106 _threadready(t);
107 return t;
108 }
109
110 int
111 threadcreate(void (*fn)(void*), void *arg, uint stack)
112 {
113 _Thread *t;
114
115 t = _threadcreate(proc(), fn, arg, stack);
116 _threaddebug(nil, "threadcreate %d", t->id);
117 return t->id;
118 }
119
120 int
121 proccreate(void (*fn)(void*), void *arg, uint stack)
122 {
123 int id;
124 _Thread *t;
125 Proc *p;
126
127 p = procalloc();
128 t = _threadcreate(p, fn, arg, stack);
129 id = t->id; /* t might be freed after _procstart */
130 _threaddebug(t, "proccreate %p", p);
131 _procstart(p, procmain);
132 return id;
133 }
134
/*
 * Give up the proc: check the stack with needstack(0), then enter
 * the scheduler for the calling proc.
 */
void
_threadswitch(void)
{
	needstack(0);
	pthreadscheduler(proc());
}
145
146 void
147 _threadready(_Thread *t)
148 {
149 Proc *p;
150
151 p = t->proc;
152 lock(&p->lock);
153 p->runrend.l = &p->lock;
154 addthread(&p->runqueue, t);
155 /*print("%d wake for job %d->%d\n", time(0), getpid(), p->osprocid); */
156 if(p != proc())
157 _procwakeupandunlock(&p->runrend);
158 else
159 unlock(&p->lock);
160 }
161
162 int
163 threadidle(void)
164 {
165 int n;
166 Proc *p;
167
168 p = proc();
169 n = p->nswitch;
170 lock(&p->lock);
171 p->runrend.l = &p->lock;
172 addthread(&p->idlequeue, p->thread);
173 unlock(&p->lock);
174 _threadswitch();
175 return p->nswitch - n;
176 }
177
178 int
179 threadyield(void)
180 {
181 int n;
182 Proc *p;
183
184 p = proc();
185 n = p->nswitch;
186 _threadready(p->thread);
187 _threadswitch();
188 return p->nswitch - n;
189 }
190
191 void
192 threadexits(char *msg)
193 {
194 Proc *p;
195
196 p = proc();
197 if(msg == nil)
198 msg = "";
199 utfecpy(p->msg, p->msg+sizeof p->msg, msg);
200 proc()->thread->exiting = 1;
201 _threadswitch();
202 }
203
/*
 * Pin the calling thread in its proc by recording it in p->pinthread.
 * procnext() then schedules only that thread while the pin is set.
 * If a thread is already pinned, a diagnostic is printed and assert(0) fires.
 */
204 void
205 threadpin(void)
206 {
207 Proc *p;
208
209 p = proc();
210 if(p->pinthread){
211 fprint(2, "already pinning a thread - %p %p\n", p->pinth…
212 assert(0);
213 }
214 p->pinthread = p->thread;
215 }
216
/*
 * Clear the pin set by threadpin().
 * The caller must be the pinned thread; otherwise a diagnostic is
 * printed and assert(0) fires.
 */
217 void
218 threadunpin(void)
219 {
220 Proc *p;
221
222 p = proc();
223 if(p->pinthread != p->thread){
224 fprint(2, "wrong pinthread - %p %p\n", p->pinthread, p->…
225 assert(0);
226 }
227 p->pinthread = nil;
228 }
229
230 void
231 threadsysfatal(char *fmt, va_list arg)
232 {
233 char buf[256];
234
235 vseprint(buf, buf+sizeof(buf), fmt, arg);
236 __fixargv0();
237 fprint(2, "%s: %s\n", argv0 ? argv0 : "<prog>", buf);
238 threadexitsall(buf);
239 }
240
241 static void
242 procmain(Proc *p)
243 {
244 _Thread *t;
245
246 _threadsetproc(p);
247
248 /* take out first thread to run on system stack */
249 t = p->runqueue.head;
250 delthread(&p->runqueue, t);
251
252 /* run it */
253 p->thread = t;
254 t->startfn(t->startarg);
255 if(p->nthread != 0)
256 threadexits(nil);
257 }
258
259 void
260 _threadpthreadmain(Proc *p, _Thread *t)
261 {
262 _threadsetproc(p);
263 lock(&p->lock);
264 pthreadsleepschedlocked(p, t);
265 unlock(&p->lock);
266 _threaddebug(nil, "startfn");
267 t->startfn(t->startarg);
268 threadexits(nil);
269 }
270
271 static void
272 pthreadsleepschedlocked(Proc *p, _Thread *t)
273 {
274 _threaddebug(t, "pthreadsleepsched %p %d", p, t->id);;
275 t->schedrend.l = &p->lock;
276 while(p->schedthread != t)
277 _procsleep(&t->schedrend);
278 }
279
280 static void
281 pthreadwakeupschedlocked(Proc *p, _Thread *self, _Thread *t)
282 {
283 _threaddebug(self, "pthreadwakeupschedlocked %p %d", p, t->id);;
284 t->schedrend.l = &p->lock;
285 p->schedthread = t;
286 _procwakeup(&t->schedrend);
287 }
288
/*
 * Switch away from the thread currently running in p.
 *
 * Takes p->lock, picks the next thread with procnext() and hands it the
 * proc through pthreadwakeupschedlocked().  A thread that is not exiting
 * then sleeps until it is scheduled again and returns to its caller.
 * A thread marked exiting is removed from the proc and torn down here;
 * that path ends in _threadpexit().  When procnext() returns nil the
 * proc's own bookkeeping is torn down as well.
 */
289 static void
290 pthreadscheduler(Proc *p)
291 {
292 _Thread *self, *t;
293
294 _threaddebug(nil, "scheduler");
295 lock(&p->lock);
296 self = p->thread;
297 p->thread = nil;
298 _threaddebug(self, "pausing");
299
/* an exiting thread leaves the proc's thread list before the next pick */
300 if(self->exiting) {
301 _threaddebug(self, "exiting");
302 delthreadinproc(p, self);
303 p->nthread--;
304 }
305
306 t = procnext(p, self);
307 if(t != nil) {
308 pthreadwakeupschedlocked(p, self, t);
309 if(!self->exiting) {
/* returns at once if t == self, since p->schedthread was just set to t */
310 pthreadsleepschedlocked(p, self);
311 _threaddebug(nil, "resume %d", self->id);
312 unlock(&p->lock);
313 return;
314 }
315 }
316
317 if(t == nil) {
318 /* Tear down proc bookkeeping. Wait to free p. */
319 delproc(p);
320 lock(&threadnproclock);
321 if(p->sysproc)
322 --threadnsysproc;
/* the remaining proc count equals the sysproc count: end the program */
323 if(--threadnproc == threadnsysproc)
324 threadexitsall(p->msg);
325 unlock(&threadnproclock);
326 }
327
328 /* Tear down pthread. */
329 if(self->mainthread && p->mainproc) {
330 _threaddaemonize();
331 _threaddebug(self, "sleeper");
332 unlock(&p->lock);
333 /*
334 * Avoid bugs with main pthread exiting.
335 * When all procs are gone, threadexitsall above will ha…
336 */
337 for(;;)
338 sleep(60*60*1000);
339 }
/* drop the proc binding and free the thread; p is freed only after its lock is released */
340 _threadsetproc(nil);
341 free(self);
342 unlock(&p->lock);
343 if(t == nil)
344 free(p);
345 _threadpexit();
346 }
347
/*
 * Choose the next thread to run in p, sleeping on p->runrend while
 * nothing suitable is runnable.  The visible caller, pthreadscheduler(),
 * holds p->lock across the call.
 *
 * With a pinned thread set, waits until that thread is on the run queue.
 * Otherwise takes the head of the run queue; when the run queue is empty
 * the idle queue is moved onto it first.
 * Returns nil only when the run queue is empty and p->nthread is 0.
 * On success the thread is removed from the run queue, stored in
 * p->thread, and p->nswitch is incremented.
 * self is used only to tag debug lines.
 */
348 static _Thread*
349 procnext(Proc *p, _Thread *self)
350 {
351 _Thread *t;
352
353 if((t = p->pinthread) != nil){
/* pinned: only the pinned thread may run, so wait for it to become ready */
354 while(!onlist(&p->runqueue, t)){
355 p->runrend.l = &p->lock;
356 _threaddebug(self, "scheduler sleep (pin)");
357 _procsleep(&p->runrend);
358 _threaddebug(self, "scheduler wake (pin)");
359 }
360 } else
361 while((t = p->runqueue.head) == nil){
/* no threads left in the proc: tell the caller to tear it down */
362 if(p->nthread == 0)
363 return nil;
364 if((t = p->idlequeue.head) != nil){
365 /*
366 * Run all the idling threads once.
367 */
368 while((t = p->idlequeue.head) != nil){
369 delthread(&p->idlequeue, t);
370 addthread(&p->runqueue, t);
371 }
/* re-test the run queue, which is now non-empty */
372 continue;
373 }
374 p->runrend.l = &p->lock;
375 _threaddebug(self, "scheduler sleep");
376 _procsleep(&p->runrend);
377 _threaddebug(self, "scheduler wake");
378 }
379
380 if(p->pinthread && p->pinthread != t)
381 fprint(2, "p->pinthread %p t %p\n", p->pinthread, t);
382 assert(p->pinthread == nil || p->pinthread == t);
383 delthread(&p->runqueue, t);
384
385 p->thread = t;
386 p->nswitch++;
387 return t;
388 }
389
390 void
391 _threadsetsysproc(void)
392 {
393 lock(&threadnproclock);
394 if(++threadnsysproc == threadnproc)
395 threadexitsall(nil);
396 unlock(&threadnproclock);
397 proc()->sysproc = 1;
398 }
399
400 void**
401 procdata(void)
402 {
403 return &proc()->udata;
404 }
405
406 void**
407 threaddata(void)
408 {
409 return &proc()->thread->udata;
410 }
411
412 extern Jmp *(*_notejmpbuf)(void);
413 static Jmp*
414 threadnotejmp(void)
415 {
416 return &proc()->sigjmp;
417 }
418
419 /*
420 * debugging
421 */
422 void
423 threadsetname(char *fmt, ...)
424 {
425 va_list arg;
426 _Thread *t;
427
428 t = proc()->thread;
429 va_start(arg, fmt);
430 vsnprint(t->name, sizeof t->name, fmt, arg);
431 va_end(arg);
432 }
433
434 char*
435 threadgetname(void)
436 {
437 return proc()->thread->name;
438 }
439
440 void
441 threadsetstate(char *fmt, ...)
442 {
443 va_list arg;
444 _Thread *t;
445
446 t = proc()->thread;
447 va_start(arg, fmt);
448 vsnprint(t->state, sizeof t->name, fmt, arg);
449 va_end(arg);
450 }
451
452 int
453 threadid(void)
454 {
455 _Thread *t;
456
457 t = proc()->thread;
458 return t->id;
459 }
460
/*
 * Abort if the calling thread has less than 256+n bytes of stack left.
 * Threads whose stk field is nil are not checked.
 * The address of the local t stands in for the current stack position,
 * and t->stk is treated as the lowest usable address.
 */
461 void
462 needstack(int n)
463 {
464 _Thread *t;
465
466 t = proc()->thread;
467 if(t->stk == nil)
468 return;
469
/* at or below the base, or closer to it than the required margin */
470 if((char*)&t <= (char*)t->stk
471 || (char*)&t - (char*)t->stk < 256+n){
472 fprint(2, "thread stack overflow: &t=%p tstk=%p n=%d\n",…
473 abort();
474 }
475 }
476
477 static int
478 singlethreaded(void)
479 {
480 return threadnproc == 1 && _threadprocs->nthread == 1;
481 }
482
483 /*
484 * locking
485 */
/*
 * QLock acquire, installed into _qlock by main().
 * Returns 1 once the lock is held by the caller.  If the lock is already
 * owned and block is 0, returns 0 without waiting; otherwise the caller
 * queues on l->waiting and switches away until threadqunlock() hands it
 * ownership.  Aborts on a certain deadlock (contended lock while
 * singlethreaded()) and on inconsistent owner state.
 * pc is used only in diagnostics.
 */
486 static int
487 threadqlock(QLock *l, int block, ulong pc)
488 {
489 /*print("threadqlock %p\n", l); */
490 lock(&l->l);
/* uncontended: take ownership immediately */
491 if(l->owner == nil){
492 l->owner = (*threadnow)();
493 /*print("qlock %p @%#x by %p\n", l, pc, l->owner); */
494 if(l->owner == nil) {
495 fprint(2, "%s: qlock uncontended owner=nil oops\…
496 abort();
497 }
498 unlock(&l->l);
499 return 1;
500 }
501 if(!block){
502 unlock(&l->l);
503 return 0;
504 }
505
/* nobody else exists who could release the lock */
506 if(singlethreaded()){
507 fprint(2, "qlock deadlock\n");
508 abort();
509 }
510
511 /*print("qsleep %p @%#x by %p\n", l, pc, (*threadnow)()); */
512 addthread(&l->waiting, (*threadnow)());
513 unlock(&l->l);
514
515 _threadswitch();
516
/* threadqunlock() sets l->owner to the waiter before readying it */
517 if(l->owner != (*threadnow)()){
518 fprint(2, "%s: qlock pc=0x%lux owner=%p self=%p oops\n",
519 argv0, pc, l->owner, (*threadnow)());
520 abort();
521 }
522 if(l->owner == nil) {
523 fprint(2, "%s: qlock threadswitch owner=nil oops\n", arg…
524 abort();
525 }
526
527 /*print("qlock wakeup %p @%#x by %p\n", l, pc, (*threadnow)()); */
528 return 1;
529 }
530
/*
 * QLock release, installed into _qunlock by main().
 * Ownership passes directly to the first thread on l->waiting (or the
 * lock becomes free when none waits); the new owner is readied only
 * after l->l has been released.  Aborts if the lock has no owner.
 * pc is used only in diagnostics.
 *
 * NOTE(review): the final _threadready() re-reads l->owner after l->l
 * has been released; the local `ready` holds the same pointer and could
 * be passed instead, so that l is not touched after the unlock.
 */
531 static void
532 threadqunlock(QLock *l, ulong pc)
533 {
534 _Thread *ready;
535
536 lock(&l->l);
537 /*print("qlock unlock %p @%#x by %p (owner %p)\n", l, pc, (*threadnow)()…
538 if(l->owner == 0){
539 fprint(2, "%s: qunlock pc=0x%lux owner=%p self=%p oops\n…
540 argv0, pc, l->owner, (*threadnow)());
541 abort();
542 }
543 if((l->owner = ready = l->waiting.head) != nil)
544 delthread(&l->waiting, l->owner);
545 /*
546 * N.B. Cannot call _threadready() before unlocking l->l,
547 * because the thread we are readying might:
548 * - be in another proc
549 * - start running immediately
550 * - and free l before we get a chance to run again
551 */
552 unlock(&l->l);
553 if(ready)
554 _threadready(l->owner);
555 }
556
557 static int
558 threadrlock(RWLock *l, int block, ulong pc)
559 {
560 USED(pc);
561
562 lock(&l->l);
563 if(l->writer == nil && l->wwaiting.head == nil){
564 l->readers++;
565 unlock(&l->l);
566 return 1;
567 }
568 if(!block){
569 unlock(&l->l);
570 return 0;
571 }
572 if(singlethreaded()){
573 fprint(2, "rlock deadlock\n");
574 abort();
575 }
576 addthread(&l->rwaiting, (*threadnow)());
577 unlock(&l->l);
578 _threadswitch();
579 return 1;
580 }
581
582 static int
583 threadwlock(RWLock *l, int block, ulong pc)
584 {
585 USED(pc);
586
587 lock(&l->l);
588 if(l->writer == nil && l->readers == 0){
589 l->writer = (*threadnow)();
590 unlock(&l->l);
591 return 1;
592 }
593 if(!block){
594 unlock(&l->l);
595 return 0;
596 }
597 if(singlethreaded()){
598 fprint(2, "wlock deadlock\n");
599 abort();
600 }
601 addthread(&l->wwaiting, (*threadnow)());
602 unlock(&l->l);
603 _threadswitch();
604 return 1;
605 }
606
607 static void
608 threadrunlock(RWLock *l, ulong pc)
609 {
610 _Thread *t;
611
612 USED(pc);
613 t = nil;
614 lock(&l->l);
615 --l->readers;
616 if(l->readers == 0 && (t = l->wwaiting.head) != nil){
617 delthread(&l->wwaiting, t);
618 l->writer = t;
619 }
620 unlock(&l->l);
621 if(t)
622 _threadready(t);
623
624 }
625
626 static void
627 threadwunlock(RWLock *l, ulong pc)
628 {
629 _Thread *t;
630
631 USED(pc);
632 lock(&l->l);
633 l->writer = nil;
634 assert(l->readers == 0);
635 while((t = l->rwaiting.head) != nil){
636 delthread(&l->rwaiting, t);
637 l->readers++;
638 _threadready(t);
639 }
640 t = nil;
641 if(l->readers == 0 && (t = l->wwaiting.head) != nil){
642 delthread(&l->wwaiting, t);
643 l->writer = t;
644 }
645 unlock(&l->l);
646 if(t)
647 _threadready(t);
648 }
649
650 /*
651 * sleep and wakeup
652 */
653 static void
654 threadrsleep(Rendez *r, ulong pc)
655 {
656 if(singlethreaded()){
657 fprint(2, "rsleep deadlock\n");
658 abort();
659 }
660 addthread(&r->waiting, proc()->thread);
661 qunlock(r->l);
662 _threadswitch();
663 qlock(r->l);
664 }
665
/*
 * Rendez wakeup, installed into _rwakeup by main().
 * Readies the first thread waiting on r, or every waiting thread when
 * all is non-zero.  Returns the number of threads readied.
 * pc is unused.
 */
666 static int
667 threadrwakeup(Rendez *r, int all, ulong pc)
668 {
669 int i;
670 _Thread *t;
671
672 _threaddebug(nil, "rwakeup %p %d", r, all);
/* i counts the threads readied so far */
673 for(i=0;; i++){
/* without all, stop after the first thread */
674 if(i==1 && !all)
675 break;
676 if((t = r->waiting.head) == nil)
677 break;
678 _threaddebug(nil, "rwakeup %p %d -> wake %d", r, all, t-…
679 delthread(&r->waiting, t);
680 _threadready(t);
681 _threaddebug(nil, "rwakeup %p %d -> loop", r, all);
682 }
683 _threaddebug(nil, "rwakeup %p %d -> total %d", r, all, i);
684 return i;
685 }
686
687 /*
688 * startup
689 */
690
/*
 * threadargc/threadargv hold main()'s arguments for threadmainstart().
 * mainstacksize is defaulted to 256*1024 by main() when left at 0.
 *
 * threadmainstart is the start function of the first thread: it clears
 * _p9usepwlibrary, records the current proc in threadmainproc and calls
 * threadmain() with the saved arguments.  Its argument is unused.
 */
691 static int threadargc;
692 static char **threadargv;
693 int mainstacksize;
694 extern int _p9usepwlibrary; /* getgrgid etc. smash the stack - te…
695 static void
696 threadmainstart(void *v)
697 {
698 USED(v);
699
700 /*
701 * N.B. This call to proc() is a program's first call (indirectl…
702 * pthreads function while executing on a non-pthreads-allocated
703 * stack. If the pthreads implementation is using the stack poi…
704 * to locate the per-thread data, then this call will blow up.
705 * This means the pthread implementation is not suitable for
706 * running under libthread. Time to write your own. Sorry.
707 */
708 _p9usepwlibrary = 0;
709 threadmainproc = proc();
710 threadmain(threadargc, threadargv);
711 }
712
713 extern void (*_sysfatal)(char*, va_list);
714
/*
 * Program entry point supplied by the thread library.
 * Saves argc/argv, installs the thread-aware lock, rendezvous, note,
 * pin and sysfatal hooks, creates the main proc with a first thread
 * that runs threadmainstart(), and enters procmain().
 * procmain() is not expected to return; if it does, sysfatal is called.
 */
715 int
716 main(int argc, char **argv)
717 {
718 Proc *p;
719 _Thread *t;
720 char *opts;
721
722 argv0 = argv[0];
723
/* $LIBTHREAD carries options; a missing variable is treated as empty */
724 opts = getenv("LIBTHREAD");
725 if(opts == nil)
726 opts = "";
727
/* "nodaemon" in $LIBTHREAD is one of the conditions that suppress daemonize setup */
728 if(threadmaybackground() && strstr(opts, "nodaemon") == nil && g…
729 _threadsetupdaemonize();
730
731 threadargc = argc;
732 threadargv = argv;
733
734 /*
735 * Install locking routines into C library.
736 */
737 _lock = _threadlock;
738 _unlock = _threadunlock;
739 _qlock = threadqlock;
740 _qunlock = threadqunlock;
741 _rlock = threadrlock;
742 _runlock = threadrunlock;
743 _wlock = threadwlock;
744 _wunlock = threadwunlock;
745 _rsleep = threadrsleep;
746 _rwakeup = threadrwakeup;
747 _notejmpbuf = threadnotejmp;
748 _pin = threadpin;
749 _unpin = threadunpin;
750 _sysfatal = threadsysfatal;
751
752 _pthreadinit();
/* the main proc: pthreadscheduler() checks mainproc when tearing down its main thread */
753 p = procalloc();
754 p->mainproc = 1;
755 _threadsetproc(p);
756 if(mainstacksize == 0)
757 mainstacksize = 256*1024;
/* threadinfo prints proc/thread state when a matching note arrives */
758 atnotify(threadinfo, 1);
759 t = _threadcreate(p, threadmainstart, nil, mainstacksize);
/* _threadcreate() already sets this for the first thread of a proc */
760 t->mainthread = 1;
761 procmain(p);
762 sysfatal("procmain returned in libthread");
763 /* does not return */
764 return 0;
765 }
766
767 /*
768 * hooray for linked lists
769 */
770 static void
771 addthread(_Threadlist *l, _Thread *t)
772 {
773 if(l->tail){
774 l->tail->next = t;
775 t->prev = l->tail;
776 }else{
777 l->head = t;
778 t->prev = nil;
779 }
780 l->tail = t;
781 t->next = nil;
782 }
783
784 static void
785 delthread(_Threadlist *l, _Thread *t)
786 {
787 if(t->prev)
788 t->prev->next = t->next;
789 else
790 l->head = t->next;
791 if(t->next)
792 t->next->prev = t->prev;
793 else
794 l->tail = t->prev;
795 }
796
797 /* inefficient but rarely used */
798 static int
799 onlist(_Threadlist *l, _Thread *t)
800 {
801 _Thread *tt;
802
803 for(tt = l->head; tt; tt=tt->next)
804 if(tt == t)
805 return 1;
806 return 0;
807 }
808
809 static void
810 addthreadinproc(Proc *p, _Thread *t)
811 {
812 _Threadlist *l;
813
814 l = &p->allthreads;
815 if(l->tail){
816 l->tail->allnext = t;
817 t->allprev = l->tail;
818 }else{
819 l->head = t;
820 t->allprev = nil;
821 }
822 l->tail = t;
823 t->allnext = nil;
824 }
825
826 static void
827 delthreadinproc(Proc *p, _Thread *t)
828 {
829 _Threadlist *l;
830
831 l = &p->allthreads;
832 if(t->allprev)
833 t->allprev->allnext = t->allnext;
834 else
835 l->head = t->allnext;
836 if(t->allnext)
837 t->allnext->allprev = t->allprev;
838 else
839 l->tail = t->allprev;
840 }
841
842 Proc *_threadprocs;
843 Lock _threadprocslock;
844 static Proc *_threadprocstail;
845
846 static void
847 addproc(Proc *p)
848 {
849 lock(&_threadprocslock);
850 if(_threadprocstail){
851 _threadprocstail->next = p;
852 p->prev = _threadprocstail;
853 }else{
854 _threadprocs = p;
855 p->prev = nil;
856 }
857 _threadprocstail = p;
858 p->next = nil;
859 unlock(&_threadprocslock);
860 }
861
862 static void
863 delproc(Proc *p)
864 {
865 lock(&_threadprocslock);
866 if(p->prev)
867 p->prev->next = p->next;
868 else
869 _threadprocs = p->next;
870 if(p->next)
871 p->next->prev = p->prev;
872 else
873 _threadprocstail = p->prev;
874 unlock(&_threadprocslock);
875 }
876
877 /*
878 * notify - for now just use the usual mechanisms
879 */
/*
 * Thin wrapper: passes f and in straight through to atnotify().
 */
void
threadnotify(int (*f)(void*, char*), int in)
{
	atnotify(f, in);
}
885
886 static int
887 onrunqueue(Proc *p, _Thread *t)
888 {
889 _Thread *tt;
890
891 for(tt=p->runqueue.head; tt; tt=tt->next)
892 if(tt == t)
893 return 1;
894 return 0;
895 }
896
897 /*
898 * print state - called from SIGINFO
899 */
/*
 * Note handler registered by main() with atnotify().
 * For a matching note (the comparison includes "quit" and
 * "sys: status request") prints, on descriptor 2, one line per proc and
 * one line per thread with its id, scheduling status, state and name,
 * then returns 1.  Returns 0 for any other note.  v is unused.
 */
900 static int
901 threadinfo(void *v, char *s)
902 {
903 Proc *p;
904 _Thread *t;
905
906 if(strcmp(s, "quit") != 0 && strcmp(s, "sys: status request") !=…
907 return 0;
908
/* walks the proc and thread lists without taking their locks */
909 for(p=_threadprocs; p; p=p->next){
910 fprint(2, "proc %p %s%s\n", (void*)p->osprocid, p->msg,
911 p->sysproc ? " (sysproc)": "");
912 for(t=p->allthreads.head; t; t=t->allnext){
/* Running: current in its proc; Ready: on the run queue; otherwise Sleeping */
913 fprint(2, "\tthread %d %s: %s %s\n",
914 t->id,
915 t == p->thread ? "Running" :
916 onrunqueue(p, t) ? "Ready" : "Sleeping",
917 t->state, t->name);
918 }
919 }
920 return 1;
921 }
You are viewing proxied material from mx1.adamsgaard.dk. The copyright of proxied material belongs to its original authors. Any comments or complaints in relation to proxied material should be directed to the original authors of the content concerned. Please see the disclaimer for more details.