--- /n/sources/plan9/sys/src/mkfile Fri May 9 22:24:50 2008 +++ /sys/src/mkfile Wed Nov 9 00:00:00 2011 @@ -24,6 +25,7 @@ libmemdraw\ libmemlayer\ libmp\ + libmux\ libndb\ liboventi\ libplumb\ diff -Nru /sys/src/libmux/COPYRIGHT /sys/src/libmux/COPYRIGHT --- /sys/src/libmux/COPYRIGHT Thu Jan 1 00:00:00 1970 +++ /sys/src/libmux/COPYRIGHT Wed Nov 9 00:00:00 2011 @@ -0,0 +1,27 @@ + +This software was developed as part of a project at MIT: + /sys/src/libmux/* + /sys/include/mux.h + +Copyright (c) 2003 Russ Cox, + Massachusetts Institute of Technology + +Permission is hereby granted, free of charge, to any person obtaining +a copy of this software and associated documentation files (the +"Software"), to deal in the Software without restriction, including +without limitation the rights to use, copy, modify, merge, publish, +distribute, sublicense, and/or sell copies of the Software, and to +permit persons to whom the Software is furnished to do so, subject to +the following conditions: + +The above copyright notice and this permission notice shall be +included in all copies or substantial portions of the Software. + +THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, +EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF +MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND +NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT HOLDERS BE +LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER IN AN ACTION +OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN CONNECTION +WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE. + diff -Nru /sys/src/libmux/io.c /sys/src/libmux/io.c --- /sys/src/libmux/io.c Thu Jan 1 00:00:00 1970 +++ /sys/src/libmux/io.c Wed Nov 9 00:00:00 2011 @@ -0,0 +1,145 @@ +/* Copyright (C) 2003 Russ Cox, Massachusetts Institute of Technology */ +/* See COPYRIGHT */ + +#include +#include +#include + +/* + * If you fork off two procs running muxrecvproc and muxsendproc, + * then muxrecv/muxsend (and thus muxrpc) will never block except on + * rendevouses, which is nice when it's running in one thread of many. + */ +void +_muxrecvproc(void *v) +{ + void *p; + Mux *mux; + Muxqueue *q; + + mux = v; + q = _muxqalloc(); + + qlock(&mux->lk); + mux->readq = q; + qlock(&mux->inlk); + rwakeup(&mux->rpcfork); + qunlock(&mux->lk); + + while((p = mux->recv(mux)) != nil) + if(_muxqsend(q, p) < 0){ + free(p); + break; + } + qunlock(&mux->inlk); + qlock(&mux->lk); + _muxqhangup(q); + p = nil; + while(_muxnbqrecv(q, &p) && p != nil){ + free(p); + p = nil; + } + free(q); + mux->readq = nil; + rwakeup(&mux->rpcfork); + qunlock(&mux->lk); +} + +void +_muxsendproc(void *v) +{ + Muxqueue *q; + void *p; + Mux *mux; + + mux = v; + q = _muxqalloc(); + + qlock(&mux->lk); + mux->writeq = q; + qlock(&mux->outlk); + rwakeup(&mux->rpcfork); + qunlock(&mux->lk); + + while((p = _muxqrecv(q)) != nil) + if(mux->send(mux, p) < 0) + break; + qunlock(&mux->outlk); + qlock(&mux->lk); + _muxqhangup(q); + while(_muxnbqrecv(q, &p)) + free(p); + free(q); + mux->writeq = nil; + rwakeup(&mux->rpcfork); + qunlock(&mux->lk); + return; +} + +int +_muxrecv(Mux *mux, int canblock, void **vp) +{ + void *p; + int ret; + + qlock(&mux->lk); + if(mux->readq){ + qunlock(&mux->lk); + if(canblock){ + *vp = _muxqrecv(mux->readq); + return 1; + } + return _muxnbqrecv(mux->readq, vp); + } + + qlock(&mux->inlk); + qunlock(&mux->lk); + if(canblock){ + p = mux->recv(mux); + ret = 1; + }else{ + if(mux->nbrecv) + ret = mux->nbrecv(mux, &p); + else{ + /* send eof, not "no packet ready" */ + p = nil; + ret = 1; + } + } + qunlock(&mux->inlk); + *vp = p; + return ret; +} + +int +_muxsend(Mux *mux, void *p) +{ + qlock(&mux->lk); +/* + if(mux->state != VtStateConnected){ + packetfree(p); + werrstr("not connected"); + qunlock(&mux->lk); + return -1; + } +*/ + if(mux->writeq){ + qunlock(&mux->lk); + if(_muxqsend(mux->writeq, p) < 0){ + free(p); + return -1; + } + return 0; + } + + qlock(&mux->outlk); + qunlock(&mux->lk); + if(mux->send(mux, p) < 0){ + qunlock(&mux->outlk); + /* vthangup(mux); */ + return -1; + } + qunlock(&mux->outlk); + return 0; +} + diff -Nru /sys/src/libmux/mkfile /sys/src/libmux/mkfile --- /sys/src/libmux/mkfile Thu Jan 1 00:00:00 1970 +++ /sys/src/libmux/mkfile Wed Nov 9 00:00:00 2011 @@ -0,0 +1,15 @@ + +#include +#include + +static int gettag(Mux*, Muxrpc*); +static void puttag(Mux*, Muxrpc*); +static void enqueue(Mux*, Muxrpc*); +static void dequeue(Mux*, Muxrpc*); + +void +muxinit(Mux *mux) +{ + memset(&mux->lk, 0, sizeof(Mux)-offsetof(Mux, lk)); + mux->tagrend.l = &mux->lk; + mux->rpcfork.l = &mux->lk; + mux->sleep.next = &mux->sleep; + mux->sleep.prev = &mux->sleep; +} + +static Muxrpc* +allocmuxrpc(Mux *mux) +{ + Muxrpc *r; + + /* must malloc because stack could be private */ + r = mallocz(sizeof(Muxrpc), 1); + if(r == nil){ + werrstr("mallocz: %r"); + return nil; + } + r->mux = mux; + r->r.l = &mux->lk; + r->waiting = 1; + + return r; +} + +static int +tagmuxrpc(Muxrpc *r, void *tx) +{ + int tag; + Mux *mux; + + mux = r->mux; + /* assign the tag, add selves to response queue */ + qlock(&mux->lk); + tag = gettag(mux, r); +/*print("gettag %p %d\n", r, tag); */ + enqueue(mux, r); + qunlock(&mux->lk); + + /* actually send the packet */ + if(tag < 0 || mux->settag(mux, tx, tag) < 0 || _muxsend(mux, tx) < 0){ + werrstr("settag/send tag %d: %r", tag); + fprint(2, "%r\n"); + qlock(&mux->lk); + dequeue(mux, r); + puttag(mux, r); + qunlock(&mux->lk); + return -1; + } + return 0; +} + +void +muxmsgandqlock(Mux *mux, void *p) +{ + int tag; + Muxrpc *r2; + + tag = mux->gettag(mux, p) - mux->mintag; +/*print("mux tag %d\n", tag); */ + qlock(&mux->lk); + /* hand packet to correct sleeper */ + if(tag < 0 || tag >= mux->mwait){ + fprint(2, "%s: bad rpc tag %ux\n", argv0, tag); + /* must leak packet! don't know how to free it! */ + return; + } + r2 = mux->wait[tag]; + if(r2 == nil || r2->prev == nil){ + fprint(2, "%s: bad rpc tag %ux (no one waiting on that tag)\n", argv0, tag); + /* must leak packet! don't know how to free it! */ + return; + } + r2->p = p; + dequeue(mux, r2); + rwakeup(&r2->r); +} + +void +electmuxer(Mux *mux) +{ + Muxrpc *rpc; + + /* if there is anyone else sleeping, wake them to mux */ + for(rpc=mux->sleep.next; rpc != &mux->sleep; rpc = rpc->next){ + if(!rpc->async){ + mux->muxer = rpc; + rwakeup(&rpc->r); + return; + } + } + mux->muxer = nil; +} + +void* +muxrpc(Mux *mux, void *tx) +{ + Muxrpc *r; + void *p; + + if((r = allocmuxrpc(mux)) == nil) + return nil; + + if(tagmuxrpc(r, tx) < 0) + return nil; + + qlock(&mux->lk); + /* wait for our packet */ + while(mux->muxer && mux->muxer != r && !r->p) + rsleep(&r->r); + + /* if not done, there's no muxer: start muxing */ + if(!r->p){ + if(mux->muxer != nil && mux->muxer != r) + abort(); + mux->muxer = r; + while(!r->p){ + qunlock(&mux->lk); + _muxrecv(mux, 1, &p); + if(p == nil){ + /* eof -- just give up and pass the buck */ + qlock(&mux->lk); + dequeue(mux, r); + break; + } + muxmsgandqlock(mux, p); + } + electmuxer(mux); + } + p = r->p; + puttag(mux, r); + qunlock(&mux->lk); + if(p == nil) + werrstr("unexpected eof"); + return p; +} + +Muxrpc* +muxrpcstart(Mux *mux, void *tx) +{ + Muxrpc *r; + + if((r = allocmuxrpc(mux)) == nil) + return nil; + r->async = 1; + if(tagmuxrpc(r, tx) < 0) + return nil; + return r; +} + +int +muxrpccanfinish(Muxrpc *r, void **vp) +{ + void *p; + Mux *mux; + int ret; + + mux = r->mux; + qlock(&mux->lk); + ret = 1; + if(!r->p && !mux->muxer){ + mux->muxer = r; + while(!r->p){ + qunlock(&mux->lk); + p = nil; + if(!_muxrecv(mux, 0, &p)) + ret = 0; + if(p == nil){ + qlock(&mux->lk); + break; + } + muxmsgandqlock(mux, p); + } + electmuxer(mux); + } + p = r->p; + if(p) + puttag(mux, r); + qunlock(&mux->lk); + *vp = p; + return ret; +} + +static void +enqueue(Mux *mux, Muxrpc *r) +{ + r->next = mux->sleep.next; + r->prev = &mux->sleep; + r->next->prev = r; + r->prev->next = r; +} + +static void +dequeue(Mux *mux, Muxrpc *r) +{ + USED(mux); + r->next->prev = r->prev; + r->prev->next = r->next; + r->prev = nil; + r->next = nil; +} + +static int +gettag(Mux *mux, Muxrpc *r) +{ + int i, mw; + Muxrpc **w; + + for(;;){ + /* wait for a free tag */ + while(mux->nwait == mux->mwait){ + if(mux->mwait < mux->maxtag-mux->mintag){ + mw = mux->mwait; + if(mw == 0) + mw = 1; + else + mw <<= 1; + w = realloc(mux->wait, mw*sizeof(w[0])); + if(w == nil) + return -1; + memset(w+mux->mwait, 0, (mw-mux->mwait)*sizeof(w[0])); + mux->wait = w; + mux->freetag = mux->mwait; + mux->mwait = mw; + break; + } + rsleep(&mux->tagrend); + } + + i=mux->freetag; + if(mux->wait[i] == 0) + goto Found; + for(; imwait; i++) + if(mux->wait[i] == 0) + goto Found; + for(i=0; ifreetag; i++) + if(mux->wait[i] == 0) + goto Found; + /* should not fall out of while without free tag */ + fprint(2, "libfs: nwait botch\n"); + abort(); + } + +Found: + mux->nwait++; + mux->wait[i] = r; + r->tag = i+mux->mintag; + return r->tag; +} + +static void +puttag(Mux *mux, Muxrpc *r) +{ + int i; + + i = r->tag - mux->mintag; + assert(mux->wait[i] == r); + mux->wait[i] = nil; + mux->nwait--; + mux->freetag = i; + rwakeup(&mux->tagrend); + free(r); +} diff -Nru /sys/src/libmux/queue.c /sys/src/libmux/queue.c --- /sys/src/libmux/queue.c Thu Jan 1 00:00:00 1970 +++ /sys/src/libmux/queue.c Wed Nov 9 00:00:00 2011 @@ -0,0 +1,112 @@ +/* Copyright (C) 2003 Russ Cox, Massachusetts Institute of Technology */ +/* See COPYRIGHT */ + +#include +#include +#include + +typedef struct Qel Qel; +struct Qel +{ + Qel *next; + void *p; +}; + +struct Muxqueue +{ + int hungup; + QLock lk; + Rendez r; + Qel *head; + Qel *tail; +}; + +Muxqueue* +_muxqalloc(void) +{ + Muxqueue *q; + + q = mallocz(sizeof(Muxqueue), 1); + if(q == nil) + return nil; + q->r.l = &q->lk; + return q; +} + +int +_muxqsend(Muxqueue *q, void *p) +{ + Qel *e; + + e = malloc(sizeof(Qel)); + if(e == nil) + return -1; + qlock(&q->lk); + if(q->hungup){ + werrstr("hungup queue"); + qunlock(&q->lk); + free(e); + return -1; + } + e->p = p; + e->next = nil; + if(q->head == nil) + q->head = e; + else + q->tail->next = e; + q->tail = e; + rwakeup(&q->r); + qunlock(&q->lk); + return 0; +} + +void* +_muxqrecv(Muxqueue *q) +{ + void *p; + Qel *e; + + qlock(&q->lk); + while(q->head == nil && !q->hungup) + rsleep(&q->r); + if(q->hungup){ + qunlock(&q->lk); + return nil; + } + e = q->head; + q->head = e->next; + qunlock(&q->lk); + p = e->p; + free(e); + return p; +} + +int +_muxnbqrecv(Muxqueue *q, void **vp) +{ + void *p; + Qel *e; + + qlock(&q->lk); + if(q->head == nil){ + qunlock(&q->lk); + *vp = nil; + return q->hungup; + } + e = q->head; + q->head = e->next; + qunlock(&q->lk); + p = e->p; + free(e); + *vp = p; + return 1; +} + +void +_muxqhangup(Muxqueue *q) +{ + qlock(&q->lk); + q->hungup = 1; + rwakeupall(&q->r); + qunlock(&q->lk); +} diff -Nru /sys/src/libmux/thread.c /sys/src/libmux/thread.c --- /sys/src/libmux/thread.c Thu Jan 1 00:00:00 1970 +++ /sys/src/libmux/thread.c Wed Nov 9 00:00:00 2011 @@ -0,0 +1,27 @@ +/* Copyright (C) 2003 Russ Cox, Massachusetts Institute of Technology */ +/* See COPYRIGHT */ + +#include +#include +#include +#include + +enum +{ + STACK = 32768 +}; + +void +muxprocs(Mux *mux) +{ + proccreate(_muxrecvproc, mux, STACK); + qlock(&mux->lk); + while(!mux->readq) + rsleep(&mux->rpcfork); + qunlock(&mux->lk); + proccreate(_muxsendproc, mux, STACK); + qlock(&mux->lk); + while(!mux->writeq) + rsleep(&mux->rpcfork); + qunlock(&mux->lk); +} --- /sys/include/mux.h Thu Jan 1 00:00:00 1970 +++ /sys/include/mux.h Wed Nov 9 00:00:00 2011 @@ -0,0 +1,59 @@ +typedef struct Mux Mux; +typedef struct Muxrpc Muxrpc; +typedef struct Muxqueue Muxqueue; + +struct Muxrpc +{ + Mux *mux; + Muxrpc *next; + Muxrpc *prev; + Rendez r; + uint tag; + void *p; + int waiting; + int async; +}; + +struct Mux +{ + uint mintag; /* to be filled by client */ + uint maxtag; + int (*send)(Mux*, void*); + void *(*recv)(Mux*); + int (*nbrecv)(Mux*, void**); + int (*gettag)(Mux*, void*); + int (*settag)(Mux*, void*, uint); + void *aux; /* for private use by client */ + +/* private */ + QLock lk; /* must be first for muxinit */ + QLock inlk; + QLock outlk; + Rendez tagrend; + Rendez rpcfork; + Muxqueue *readq; + Muxqueue *writeq; + uint nwait; + uint mwait; + uint freetag; + Muxrpc **wait; + Muxrpc *muxer; + Muxrpc sleep; +}; + +void muxinit(Mux*); +void* muxrpc(Mux*, void*); +void muxprocs(Mux*); +Muxrpc* muxrpcstart(Mux*, void*); +int muxrpccanfinish(Muxrpc*, void**); + +/* private */ +int _muxsend(Mux*, void*); +int _muxrecv(Mux*, int, void**); +void _muxsendproc(void*); +void _muxrecvproc(void*); +Muxqueue *_muxqalloc(void); +int _muxqsend(Muxqueue*, void*); +void *_muxqrecv(Muxqueue*); +void _muxqhangup(Muxqueue*); +int _muxnbqrecv(Muxqueue*, void**); --- /sys/man/2/mux Thu Jan 1 00:00:00 1970 +++ /sys/man/2/mux Wed Nov 9 00:00:00 2011 @@ -0,0 +1,185 @@ +.TH MUX 2 +.SH NAME +Mux, muxinit, muxrpc, muxthreads \- protocol multiplexor +.SH SYNOPSIS +.B #include +.PP +.nf +.B +.ta +4n +.ft B +struct Mux +{ + uint mintag; + uint maxtag; + int (*settag)(Mux *mux, void *msg, uint tag); + int (*gettag)(Mux *mux, void *msg); + int (*send)(Mux *mux, void *msg); + void *(*recv)(Mux *mux); + void *(*nbrecv)(Mux *mux); + void *aux; + + \&... /* private fields follow */ +}; +.ta +\w'\fLvoid* 'u +.PP +.B +void muxinit(Mux *mux); +.PP +.B +void* muxrpc(Mux *mux, void *request); +.PP +.B +void muxprocs(Mux *mux); +.PP +.B +Muxrpc* muxrpcstart(Mux *mux, void *request); +.PP +.B +void* muxrpccanfinish(Muxrpc *rpc); +.SH DESCRIPTION +.I Libmux +is a generic protocol multiplexor. +A client program initializes a +.B Mux +structure with information about the protocol +(mainly in the form of helper functions) +and can then use +.I muxrpc +to execute individual RPCs without worrying +about details of multiplexing requests +and demultiplexing responses. +.PP +.I Libmux +assumes that the protocol messages contain a +.I tag +(or message ID) field that exists for the sole purpose of demultiplexing messages. +.I Libmux +chooses the tags and then calls a helper function +to put them in the outgoing messages. +.I Libmux +calls another helper function to retrieve tags +from incoming messages. +It also calls helper functions to send and receive packets. +.PP +A client should allocate a +.B Mux +structure and then call +.I muxinit +to initialize the library's private elements. +The client must initialize the following elements: +.TP +.I mintag\fR, \fPmaxtag +The range of valid tags; +.I maxtag +is the maximum valid tag plus one, so that +.IR maxtag \- mintag +is equal to the number of valid tags. +If +.I libmux +runs out of tags +(all tags are being used for RPCs currently in progress), +a new call to +.IR muxrpc +will block until an executing call finishes. +.TP +.I settag\fR, \fPgettag +Set or get the tag value in a message. +.TP +.I send\fR, \fPrecv\fR, \fPnbrecv +Send or receive protocol messages on the connection. +.I Recv +should block until a message is available and +should return nil if the connection is closed. +.I Nbrecv +should not block; it returns nil if there is no +message available to be read. +.I Libmux +will arrange that only one call to +.I recv +or +.I nbrecv +is active at a time. +.TP +.I aux +An auxiliary pointer for use by the client. +.PD +Once a client has initialized the +.B Mux +structure, it can call +.I muxrpc +to execute RPCs. +The +.I request +is the message passed to +.I settag +and +.IR send . +The return value is the response packet, +as provided by +.IR recv , +or +nil if an error occurred. +.I Muxprocs +allocates new procs +(see +.IR thread (2)) +in which to run +.I send +and +.IR recv . +After a call to +.IR muxprocs , +.I muxrpc +will run +.I send +and +.I recv +in these procs instead of in the calling proc. +This is useful if the implementation of +either (particularly +.IR recv ) +blocks an entire proc +and there are other threads in the calling proc +that need to remain active. +.PP +.I Libmux +also provides a non-blocking interface, useful for programs forced +to use a +.IR select (2)-based +main loop. +.I Muxrpcstart +runs the first half of +.IR muxrpc : +it assigns a tag and sends the request, +but does not wait for the reply. +Instead it returns a pointer to a +.B Muxrpc +structure that represents the in-progress call. +.I Muxrpccanfinish +checks whether the given call +can finish. +If no mux procs have been started, +.I muxrpccanfinish +may call +.I nbrecv +to poll for newly arrived responses. +.SH EXAMPLE +See +.B /sys/src/lib9pclient/fs.c +for an example of using +.I libmux +with +9P +(see +.IR intro (5)). +.SH SOURCE +.B /sys/src/libmux +.SH SEE ALSO +.IR thread (2), +.IR intro (5) +.SH BUGS +.I Libmux +does not know how to free protocol messages, +so message arriving with unexpected or invalid +tags are leaked.