tmux.c - plan9port - [fork] Plan 9 from user space | |
git clone git://src.adamsgaard.dk/plan9port | |
Log | |
Files | |
Refs | |
README | |
LICENSE | |
--- | |
tmux.c (5119B) | |
--- | |
1 /* Copyright (C) 2003-2006 Russ Cox, Massachusetts Institute of Technology */ | |
2 /* See COPYRIGHT */ | |
3 | |
4 /* | |
5 * Generic RPC packet multiplexor. Inspired by but not derived from | |
6 * Plan 9 kernel. Originally developed as part of Tra, later used in | |
7 * libnventi, and then finally split out into a generic library. | |
8 */ | |
9 | |
10 #include <u.h> | |
11 #include <libc.h> | |
12 #include <mux.h> | |
13 | |
14 static int gettag(Mux*, Muxrpc*); | |
15 static void puttag(Mux*, Muxrpc*); | |
16 static void enqueue(Mux*, Muxrpc*); | |
17 static void dequeue(Mux*, Muxrpc*); | |
18 | |
19 void | |
20 muxinit(Mux *mux) | |
21 { | |
22 memset(&mux->lk, 0, sizeof(Mux)-offsetof(Mux, lk)); | |
23 mux->tagrend.l = &mux->lk; | |
24 mux->rpcfork.l = &mux->lk; | |
25 mux->sleep.next = &mux->sleep; | |
26 mux->sleep.prev = &mux->sleep; | |
27 } | |
28 | |
29 static Muxrpc* | |
30 allocmuxrpc(Mux *mux) | |
31 { | |
32 Muxrpc *r; | |
33 | |
34 /* must malloc because stack could be private */ | |
35 r = mallocz(sizeof(Muxrpc), 1); | |
36 if(r == nil){ | |
37 werrstr("mallocz: %r"); | |
38 return nil; | |
39 } | |
40 r->mux = mux; | |
41 r->r.l = &mux->lk; | |
42 r->waiting = 1; | |
43 | |
44 return r; | |
45 } | |
46 | |
47 static int | |
48 tagmuxrpc(Muxrpc *r, void *tx) | |
49 { | |
50 int tag; | |
51 Mux *mux; | |
52 | |
53 mux = r->mux; | |
54 /* assign the tag, add selves to response queue */ | |
55 qlock(&mux->lk); | |
56 tag = gettag(mux, r); | |
57 /*print("gettag %p %d\n", r, tag); */ | |
58 enqueue(mux, r); | |
59 qunlock(&mux->lk); | |
60 | |
61 /* actually send the packet */ | |
62 if(tag < 0 || mux->settag(mux, tx, tag) < 0 || _muxsend(mux, tx)… | |
63 werrstr("settag/send tag %d: %r", tag); | |
64 fprint(2, "%r\n"); | |
65 qlock(&mux->lk); | |
66 dequeue(mux, r); | |
67 puttag(mux, r); | |
68 qunlock(&mux->lk); | |
69 return -1; | |
70 } | |
71 return 0; | |
72 } | |
73 | |
/*
 * Hand the received packet p to the rpc that is waiting on p's tag.
 *
 * Takes mux->lk and returns with it still held on every path, including
 * the two error returns; the caller is responsible for the qunlock.
 * On success the rpc's p field is set, the rpc is removed from the
 * sleep queue, and its rendezvous is woken.
 */
74 void | |
75 muxmsgandqlock(Mux *mux, void *p) | |
76 { | |
77 int tag; | |
78 Muxrpc *r2; | |
79 | |
/* from here on tag is an index into mux->wait, not the tag carried in the packet */
80 tag = mux->gettag(mux, p) - mux->mintag; | |
81 /*print("mux tag %d\n", tag); */ | |
82 qlock(&mux->lk); | |
83 /* hand packet to correct sleeper */ | |
/* index outside the current wait table: report and return (lock still held) */
84 if(tag < 0 || tag >= mux->mwait){ | |
85 fprint(2, "%s: bad rpc tag %ux\n", argv0, tag); | |
86 /* must leak packet! don't know how to free it! */ | |
87 return; | |
88 } | |
89 r2 = mux->wait[tag]; | |
/* dequeue() clears prev, so prev == nil means this rpc is no longer on the sleep queue */
90 if(r2 == nil || r2->prev == nil){ | |
91 fprint(2, "%s: bad rpc tag %ux (no one waiting on that t… | |
92 /* must leak packet! don't know how to free it! */ | |
93 return; | |
94 } | |
/* deliver: record the reply, take the rpc off the sleep queue, wake its owner */
95 r2->p = p; | |
96 dequeue(mux, r2); | |
97 rwakeup(&r2->r); | |
98 } | |
99 | |
100 void | |
101 electmuxer(Mux *mux) | |
102 { | |
103 Muxrpc *rpc; | |
104 | |
105 /* if there is anyone else sleeping, wake them to mux */ | |
106 for(rpc=mux->sleep.next; rpc != &mux->sleep; rpc = rpc->next){ | |
107 if(!rpc->async){ | |
108 mux->muxer = rpc; | |
109 rwakeup(&rpc->r); | |
110 return; | |
111 } | |
112 } | |
113 mux->muxer = nil; | |
114 } | |
115 | |
116 void* | |
117 muxrpc(Mux *mux, void *tx) | |
118 { | |
119 int tag; | |
120 Muxrpc *r; | |
121 void *p; | |
122 | |
123 if((r = allocmuxrpc(mux)) == nil) | |
124 return nil; | |
125 | |
126 if((tag = tagmuxrpc(r, tx)) < 0) | |
127 return nil; | |
128 | |
129 qlock(&mux->lk); | |
130 /* wait for our packet */ | |
131 while(mux->muxer && mux->muxer != r && !r->p) | |
132 rsleep(&r->r); | |
133 | |
134 /* if not done, there's no muxer: start muxing */ | |
135 if(!r->p){ | |
136 if(mux->muxer != nil && mux->muxer != r) | |
137 abort(); | |
138 mux->muxer = r; | |
139 while(!r->p){ | |
140 qunlock(&mux->lk); | |
141 _muxrecv(mux, 1, &p); | |
142 if(p == nil){ | |
143 /* eof -- just give up and pass the buck… | |
144 qlock(&mux->lk); | |
145 dequeue(mux, r); | |
146 break; | |
147 } | |
148 muxmsgandqlock(mux, p); | |
149 } | |
150 electmuxer(mux); | |
151 } | |
152 p = r->p; | |
153 puttag(mux, r); | |
154 qunlock(&mux->lk); | |
155 if(p == nil) | |
156 werrstr("unexpected eof"); | |
157 return p; | |
158 } | |
159 | |
160 Muxrpc* | |
161 muxrpcstart(Mux *mux, void *tx) | |
162 { | |
163 int tag; | |
164 Muxrpc *r; | |
165 | |
166 if((r = allocmuxrpc(mux)) == nil) | |
167 return nil; | |
168 r->async = 1; | |
169 if((tag = tagmuxrpc(r, tx)) < 0) | |
170 return nil; | |
171 return r; | |
172 } | |
173 | |
174 int | |
175 muxrpccanfinish(Muxrpc *r, void **vp) | |
176 { | |
177 void *p; | |
178 Mux *mux; | |
179 int ret; | |
180 | |
181 mux = r->mux; | |
182 qlock(&mux->lk); | |
183 ret = 1; | |
184 if(!r->p && !mux->muxer){ | |
185 mux->muxer = r; | |
186 while(!r->p){ | |
187 qunlock(&mux->lk); | |
188 p = nil; | |
189 if(!_muxrecv(mux, 0, &p)) | |
190 ret = 0; | |
191 if(p == nil){ | |
192 qlock(&mux->lk); | |
193 break; | |
194 } | |
195 muxmsgandqlock(mux, p); | |
196 } | |
197 electmuxer(mux); | |
198 } | |
199 p = r->p; | |
200 if(p) | |
201 puttag(mux, r); | |
202 qunlock(&mux->lk); | |
203 *vp = p; | |
204 return ret; | |
205 } | |
206 | |
207 static void | |
208 enqueue(Mux *mux, Muxrpc *r) | |
209 { | |
210 r->next = mux->sleep.next; | |
211 r->prev = &mux->sleep; | |
212 r->next->prev = r; | |
213 r->prev->next = r; | |
214 } | |
215 | |
216 static void | |
217 dequeue(Mux *mux, Muxrpc *r) | |
218 { | |
219 r->next->prev = r->prev; | |
220 r->prev->next = r->next; | |
221 r->prev = nil; | |
222 r->next = nil; | |
223 } | |
224 | |
225 static int | |
226 gettag(Mux *mux, Muxrpc *r) | |
227 { | |
228 int i, mw; | |
229 Muxrpc **w; | |
230 | |
231 for(;;){ | |
232 /* wait for a free tag */ | |
233 while(mux->nwait == mux->mwait){ | |
234 if(mux->mwait < mux->maxtag-mux->mintag){ | |
235 mw = mux->mwait; | |
236 if(mw == 0) | |
237 mw = 1; | |
238 else | |
239 mw <<= 1; | |
240 w = realloc(mux->wait, mw*sizeof(w[0])); | |
241 if(w == nil) | |
242 return -1; | |
243 memset(w+mux->mwait, 0, (mw-mux->mwait)*… | |
244 mux->wait = w; | |
245 mux->freetag = mux->mwait; | |
246 mux->mwait = mw; | |
247 break; | |
248 } | |
249 rsleep(&mux->tagrend); | |
250 } | |
251 | |
252 i=mux->freetag; | |
253 if(mux->wait[i] == 0) | |
254 goto Found; | |
255 for(; i<mux->mwait; i++) | |
256 if(mux->wait[i] == 0) | |
257 goto Found; | |
258 for(i=0; i<mux->freetag; i++) | |
259 if(mux->wait[i] == 0) | |
260 goto Found; | |
261 /* should not fall out of while without free tag */ | |
262 fprint(2, "libfs: nwait botch\n"); | |
263 abort(); | |
264 } | |
265 | |
266 Found: | |
267 mux->nwait++; | |
268 mux->wait[i] = r; | |
269 r->tag = i+mux->mintag; | |
270 return r->tag; | |
271 } | |
272 | |
273 static void | |
274 puttag(Mux *mux, Muxrpc *r) | |
275 { | |
276 int i; | |
277 | |
278 i = r->tag - mux->mintag; | |
279 assert(mux->wait[i] == r); | |
280 mux->wait[i] = nil; | |
281 mux->nwait--; | |
282 mux->freetag = i; | |
283 rwakeup(&mux->tagrend); | |
284 free(r); | |
285 } |