int len; /* bytes allocated to queue */
int dlen; /* data bytes in queue */
int limit; /* max bytes in queue */
int inilim; /* initial limit */
int state;
int noblock; /* true if writes return immediately when q full */
int eof; /* number of eofs read by user */
/*
 * Ensure the first block of a list holds at least n bytes of data.
 *
 * If bp already holds n bytes it is returned untouched.  Otherwise,
 * when the space between bp->rp and bp->lim is smaller than n, a new
 * n-byte block is allocated and linked in front of the list.  Data is
 * then moved from the following blocks into the head; every block
 * that is drained completely is unlinked and freed.
 *
 * Returns the (possibly new) head block, or 0 when the list runs out
 * of data before n bytes have been gathered; in that case the head
 * block has been freed too.
 */
Block*
pullupblock(Block *bp, int n)
{
	Block *head, *src;
	int avail;

	/* usual case: nothing to do, saves every caller from checking */
	if(BLEN(bp) >= n)
		return bp;

	/* head cannot hold n bytes: put a large enough block in front */
	head = bp;
	if(head->lim - head->rp < n){
		src = allocb(n);
		src->next = head;
		head = src;
	}

	/* n now counts the bytes still missing from the head block */
	n -= BLEN(head);
	for(src = head->next; src != 0; src = head->next){
		avail = BLEN(src);
		if(avail > n){
			/* this block has more than we need: take n, keep the rest */
			memmove(head->wp, src->rp, n);
			pullupblockcnt++;
			head->wp += n;
			src->rp += n;
			QDEBUG checkb(head, "pullupblock 1");
			return head;
		}

		/* swallow the whole block, then unlink and free it */
		memmove(head->wp, src->rp, avail);
		pullupblockcnt++;
		head->wp += avail;
		head->next = src->next;
		src->next = 0;
		freeb(src);
		n -= avail;
		if(n == 0){
			QDEBUG checkb(head, "pullupblock 2");
			return head;
		}
	}

	/* ran out of blocks before collecting n bytes */
	freeb(head);
	return 0;
}
/*
* make sure the first block has at least n bytes
*/
Block*
pullupqueue(Queue *q, int n)
{
Block *b;
/*
* throw away up to count bytes from a
* list of blocks. Return count of bytes
* thrown away.
*/
int
pullblock(Block **bph, int count)
{
Block *bp;
int n, bytes;
/*
* Interrupt level copy out of a queue, return # bytes copied.
*/
int
qconsume(Queue *q, void *vp, int len)
{
Block *b;
int n, dowakeup;
uchar *p = vp;
Block *tofree = nil;
/*
* if the allocated space is way out of line with the used
* space, reallocate to a smaller block
*/
Block*
packblock(Block *bp)
{
Block **l, *nbp;
int n;
/*
* wait for the queue to be non-empty or closed.
* called with q ilocked.
*/
static int
qwait(Queue *q)
{
/* wait for data */
for(;;){
if(q->bfirst != nil)
break;
/*
 * Copy up to n bytes of data from the block list b into memory at p.
 * Each block that is copied out completely is freed.  Returns the
 * first block that still holds uncopied data (its rp has been moved
 * past the bytes taken), or nil if the whole list was consumed.
 */
Block*
bl2mem(uchar *p, Block *b, int n)
{
	Block *cur, *following;
	int avail;

	cur = b;
	while(cur != nil){
		avail = BLEN(cur);
		if(avail > n){
			/* block holds more than is wanted: take n and stop */
			memmove(p, cur->rp, n);
			cur->rp += n;
			return cur;
		}

		/* remember the successor before cur is released */
		following = cur->next;
		memmove(p, cur->rp, avail);
		cur->rp += avail;
		p += avail;
		n -= avail;
		freeb(cur);
		cur = following;
	}
	return nil;
}
/*
 * Copy len bytes of memory starting at p into a newly allocated list
 * of blocks, at most Maxatomic bytes per block.  At least one block
 * is always allocated.  Returns the head of the list.
 *
 * If waserror() reports an error while the list is being built, the
 * blocks linked so far are released with freeblist() and the error is
 * handed on with nexterror().
 */
Block*
mem2bl(uchar *p, int len)
{
	Block *head, *blk, **tail;
	int chunk;

	head = nil;
	tail = &head;
	if(waserror()){
		freeblist(head);
		nexterror();
	}

	for(;;){
		/* never put more than Maxatomic bytes in one block */
		if(len > Maxatomic)
			chunk = Maxatomic;
		else
			chunk = len;

		blk = allocb(chunk);
		*tail = blk;
		/* tag the allocation with the first four bytes of up->text */
		setmalloctag(blk, (up->text[0]<<24)|(up->text[1]<<16)|(up->text[2]<<8)|up->text[3]);
		memmove(blk->wp, p, chunk);
		blk->wp += chunk;
		tail = &blk->next;

		p += chunk;
		len -= chunk;
		if(len <= 0)
			break;
	}
	poperror();

	return head;
}
/*
 * Push block b back onto the front of queue q and add its allocated
 * size and data length to the queue's counters.
 * Called with q ilocked.
 */
void
qputback(Queue *q, Block *b)
{
	Block *oldhead;

	oldhead = q->bfirst;
	if(oldhead == nil)
		q->blast = b;	/* queue was empty, so b is the tail as well */
	b->next = oldhead;
	q->bfirst = b;

	q->len += BALLOC(b);
	q->dlen += BLEN(b);
}
/*
 * flow control, get producer going again
 * called with q ilocked
 *
 * NOTE(review): the header below declares a static void function, but
 * the body does not match it: it takes the ilock itself, uses b, nb, n
 * and len without declaring them, returns nil and nb, and ends by
 * calling qwakeup_iunlock(q) again.  dowakeup is initialised and never
 * read.  The statements read like the tail of a routine that returns
 * one Block of at most len bytes from the queue -- confirm against the
 * complete file before relying on the description above.
 */
static void
qwakeup_iunlock(Queue *q)
{
int dowakeup = 0;
ilock(q);
/* qwait() result: 0 and -1 are handled here, anything else falls through */
switch(qwait(q)){
case 0:
/* queue closed */
iunlock(q);
qunlock(&q->rlock);
poperror();
return nil;
case -1:
/* multiple reads on a closed queue */
iunlock(q);
error(q->err);
}
/* if we get here, there's at least one block in the queue */
b = qremove(q);
n = BLEN(b);
/* split block if it's too big and this is not a message queue */
nb = b;
if(n > len){
if((q->state&Qmsg) == 0){
/* copy the bytes beyond len into a new block and put it back on the queue */
n -= len;
b = allocb(n);
memmove(b->wp, nb->rp+len, n);
b->wp += n;
qputback(q, b);
}
/* trim nb to len bytes; with Qmsg set the excess is not put back */
nb->wp = nb->rp + len;
}
/* restart producer */
qwakeup_iunlock(q);
poperror();
qunlock(&q->rlock);
return nb;
}
/*
* read a queue. if no data is queued, post a Block
* and wait on its Rendez.
*/
long
qread(Queue *q, void *vp, int len)
{
Block *b, *first, **l;
int m, n;
ilock(q);
again:
switch(qwait(q)){
case 0:
/* queue closed */
iunlock(q);
qunlock(&q->rlock);
poperror();
return 0;
case -1:
/* multiple reads on a closed queue */
iunlock(q);
error(q->err);
}
/* if we get here, there's at least one block in the queue */
if(q->state & Qcoalesce){
/* when coalescing, 0 length blocks just go away */
b = q->bfirst;
if(BLEN(b) <= 0){
freeb(qremove(q));
goto again;
}
/* grab the first block plus as many
* following blocks as will completely
* fit in the read.
*/
n = 0;
l = &first;
m = BLEN(b);
for(;;) {
*l = qremove(q);
l = &b->next;
n += m;
b = q->bfirst;
if(b == nil)
break;
m = BLEN(b);
if(n+m > len)
break;
}
} else {
first = qremove(q);
n = BLEN(first);
}
/* copy to user space outside of the ilock */
iunlock(q);
b = bl2mem(vp, first, len);
ilock(q);
/* take care of any left over partial block */
if(b != nil){
n -= BLEN(b);
if(q->state & Qmsg)
freeb(b);
else
qputback(q, b);
}
/* make sure other end gets awakened */
if(q->state & Qstarve){
q->state &= ~Qstarve;
dowakeup = 1;
}
iunlock(q);
if(dowakeup){
if(q->kick)
q->kick(q->arg);
p = wakeup(&q->rr);
/* if we just woke up a higher priority process, let it run */
if(p != nil && p->priority > up->priority)
sched();
}
/*
* flow control, wait for queue to get below the limit
* before allowing the process to continue and queue
* more. We do this here so that postnote can only
* interrupt us after the data has been queued. This
* means that things like 9p flushes and ssl messages
* will not be disrupted by software interrupts.
*
* Note - this is moderately dangerous since a process
* that keeps getting interrupted and rewriting will
* queue infinite crud.
*/
for(;;){
if(q->noblock || qnotfull(q))
break;
/*
* used by print() to write to a queue. Since we may be splhi or not in
* a process, don't qlock.
*/
int
qiwrite(Queue *q, void *vp, int len)
{
int n, sofar, dowakeup;
Block *b;
uchar *p = vp;
dowakeup = 0;
sofar = 0;
do {
n = len-sofar;
if(n > Maxatomic)
n = Maxatomic;