/*
* Copyright (c) 2007-2011 Mindaugas Rasiukevicius <rmind at NetBSD org>
* All rights reserved.
*
* Redistribution and use in source and binary forms, with or without
* modification, are permitted provided that the following conditions
* are met:
* 1. Redistributions of source code must retain the above copyright
* notice, this list of conditions and the following disclaimer.
* 2. Redistributions in binary form must reproduce the above copyright
* notice, this list of conditions and the following disclaimer in the
* documentation and/or other materials provided with the distribution.
*
* THIS SOFTWARE IS PROVIDED BY THE AUTHOR AND CONTRIBUTORS ``AS IS'' AND
* ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
* IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
* ARE DISCLAIMED. IN NO EVENT SHALL THE AUTHOR OR CONTRIBUTORS BE LIABLE
* FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL
* DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS
* OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION)
* HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT
* LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY
* OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF
* SUCH DAMAGE.
*/
/*
* Implementation of POSIX message queues.
* Defined in the Base Definitions volume of IEEE Std 1003.1-2001.
*
* Locking
*
* Global list of message queues (mqueue_head) is protected by mqlist_lock.
* Each message queue and its members are protected by mqueue::mq_mtx.
* Note that proc_t::p_mqueue_cnt is updated atomically.
*
* Lock order:
*
* mqlist_lock ->
* mqueue::mq_mtx
*/
/* Forward declarations of file-local (static) functions. */
static int mqueue_sysinit(void);
static int mqueue_sysfini(bool);
static int mq_poll_fop(file_t *, int);
static int mq_stat_fop(file_t *, struct stat *);
static int mq_close_fop(file_t *);
/* Note MQ_PQSIZE + 1. */
for (i = 0; i <= MQ_PQSIZE; i++) {
while ((msg = TAILQ_FIRST(&mq->mq_head[i])) != NULL) {
TAILQ_REMOVE(&mq->mq_head[i], msg, msg_queue);
msz = sizeof(struct mq_msg) + msg->msg_len;
mqueue_freemsg(msg, msz);
}
}
if (mq->mq_name) {
kmem_free(mq->mq_name, MQ_NAMELEN);
}
seldestroy(&mq->mq_rsel);
seldestroy(&mq->mq_wsel);
cv_destroy(&mq->mq_send_cv);
cv_destroy(&mq->mq_recv_cv);
mutex_destroy(&mq->mq_mtx);
kmem_free(mq, sizeof(struct mqueue));
}
/*
* mqueue_lookup: look up a message queue by name in the global list.
*
* => locks the message queue on success
*/
static mqueue_t *
mqueue_lookup(const char *name)
{
mqueue_t *mq;
/*
* mqueue_get: get the mqueue from the descriptor.
*
* => locks the message queue, if found.
* => holds a reference on the file descriptor.
*/
int
mqueue_get(mqd_t mqd, int fflag, mqueue_t **mqret)
{
const int fd = (int)mqd;
mqueue_t *mq;
file_t *fp;
/*
* mqueue_linear_insert: perform linear insert according to the message
* priority into the reserved queue (MQ_PQRESQ). Reserved queue is a
* sorted list used only when mq_prio_max is increased via sysctl.
*/
static inline void
mqueue_linear_insert(struct mqueue *mq, struct mq_msg *msg)
{
struct mq_msg *mit;
static int
mq_poll_fop(file_t *fp, int events)
{
struct mqueue *mq = fp->f_mqueue;
struct mq_attr *mqattr;
int revents = 0;
mutex_enter(&mq->mq_mtx);
mqattr = &mq->mq_attrib;
if (events & (POLLIN | POLLRDNORM)) {
/* Ready for receiving, if there are messages in the queue. */
if (mqattr->mq_curmsgs)
revents |= events & (POLLIN | POLLRDNORM);
else
selrecord(curlwp, &mq->mq_rsel);
}
if (events & (POLLOUT | POLLWRNORM)) {
/* Ready for sending, if the message queue is not full. */
if (mqattr->mq_curmsgs < mqattr->mq_maxmsg)
revents |= events & (POLLOUT | POLLWRNORM);
else
selrecord(curlwp, &mq->mq_wsel);
}
mutex_exit(&mq->mq_mtx);
/*
* Allocate new message queue, initialize data structures, copy the
* name attributes. Note that the initial reference is set here.
*/
mq = kmem_zalloc(sizeof(mqueue_t), KM_SLEEP);
mutex_init(&mq->mq_mtx, MUTEX_DEFAULT, IPL_NONE);
cv_init(&mq->mq_send_cv, "mqsendcv");
cv_init(&mq->mq_recv_cv, "mqrecvcv");
for (i = 0; i < (MQ_PQSIZE + 1); i++) {
TAILQ_INIT(&mq->mq_head[i]);
}
selinit(&mq->mq_rsel);
selinit(&mq->mq_wsel);
mq->mq_name = name;
mq->mq_refcnt = 1;
/* Store mode and effective UID with GID. */
mq->mq_mode = ((mode & ~cwdi->cwdi_cmask) & ALLPERMS) & ~S_ISTXT;
mq->mq_euid = kauth_cred_geteuid(l->l_cred);
mq->mq_egid = kauth_cred_getegid(l->l_cred);
*mqret = mq;
return 0;
}
/*
 * mq_handle_open: helper function for mq_open().
 *
 * Note that "u_name" is a userland pointer, while "attr" is a kernel
 * pointer!
 *
 * => l: the calling LWP; its process is charged for the descriptor.
 * => u_name: userland pointer to the queue name (copied in here).
 * => oflag: open flags; O_CREAT and O_EXCL are examined here.
 * => mode, attr: passed through to mqueue_create() when O_CREAT is set.
 * => retval: on success, receives the new descriptor number.
 *
 * Returns 0 on success.  On failure returns the error from copyinstr(),
 * fd_allocfile() or mqueue_create(), or:
 *	EMFILE	the per-process limit (mq_open_max) would be exceeded
 *	EEXIST	the queue already exists and O_EXCL was given
 *	ENOENT	the queue does not exist and O_CREAT was not given
 */
int
mq_handle_open(struct lwp *l, const char *u_name, int oflag, mode_t mode,
    struct mq_attr *attr, register_t *retval)
{
	struct proc *p = l->l_proc;
	struct mqueue *mq, *mq_new = NULL;
	int mqd, error;
	file_t *fp;
	char *name;

	/* Copy the queue name in from user-space. */
	name = kmem_alloc(MQ_NAMELEN, KM_SLEEP);
	error = copyinstr(u_name, name, MQ_NAMELEN - 1, NULL);
	if (error) {
		kmem_free(name, MQ_NAMELEN);
		return error;
	}

	/* Allocate file structure and descriptor. */
	error = fd_allocfile(&fp, &mqd);
	if (error) {
		kmem_free(name, MQ_NAMELEN);
		return error;
	}

	/*
	 * Account and check for the limit.
	 *
	 * NOTE(review): the later error paths (mqueue_create() failure,
	 * EEXIST, ENOENT) do not undo this increment within this function;
	 * confirm whether that is intended or a counter leak.
	 */
	if (atomic_inc_uint_nv(&p->p_mqueue_cnt) > mq_open_max) {
		atomic_dec_uint(&p->p_mqueue_cnt);
		error = EMFILE;
		goto err;
	}

	if (oflag & O_CREAT) {
		/*
		 * Create a new message queue up front, before taking
		 * mqlist_lock.  It is used only if the lookup below fails;
		 * otherwise it is destroyed at the end.
		 */
		error = mqueue_create(l, name, attr, mode, oflag, &mq_new);
		if (error) {
			goto err;
		}
		KASSERT(mq_new != NULL);
	}

	/* Look up a message queue with this name. */
	mutex_enter(&mqlist_lock);
	mq = mqueue_lookup(name);
	if (mq) {
		/* Found: mqueue_lookup() returned the queue locked. */
		KASSERT(mutex_owned(&mq->mq_mtx));
		mutex_exit(&mqlist_lock);

		/* Check for exclusive create. */
		if (oflag & O_EXCL) {
			mutex_exit(&mq->mq_mtx);
			error = EEXIST;
			goto err;
		}

		/* Add a new reference for this descriptor. */
		mq->mq_refcnt++;
		mutex_exit(&mq->mq_mtx);
	} else {
		/* Fail if not found and not creating. */
		if ((oflag & O_CREAT) == 0) {
			mutex_exit(&mqlist_lock);
			KASSERT(mq_new == NULL);
			error = ENOENT;
			goto err;
		}

		/*
		 * Not found: publish the queue created above.  The lookup
		 * left 'mq' NULL, so it must be pointed at the new queue
		 * before the list insertion and the f_mqueue assignment.
		 */
		mq = mq_new;

		/*
		 * Finally, insert message queue into the list.
		 * Note: it already has the initial reference.
		 */
		LIST_INSERT_HEAD(&mqueue_head, mq, mq_list);
		mutex_exit(&mqlist_lock);

		/*
		 * Ownership of the queue (and of 'name', which the queue
		 * now references) has passed to the list; make sure the
		 * cleanup code below frees neither.
		 */
		mq_new = NULL;
		name = NULL;
	}
	KASSERT(mq != NULL);
	fp->f_mqueue = mq;
	fd_affix(p, fp, mqd);
	*retval = mqd;
err:
	if (error) {
		/* Release the not-yet-affixed file and descriptor. */
		fd_abort(p, fp, mqd);
	}
	if (mq_new) {
		/* Unused pre-created queue.  Note: will free the 'name'. */
		mqueue_destroy(mq_new);
	} else if (name) {
		kmem_free(name, MQ_NAMELEN);
	}
	return error;
}
/* Check the message size limits */
if (msg_len < mqattr->mq_msgsize) {
error = EMSGSIZE;
goto error;
}
/* Check if queue is empty */
while (mqattr->mq_curmsgs == 0) {
int t;
if (mqattr->mq_flags & O_NONBLOCK) {
error = EAGAIN;
goto error;
}
if (ts) {
error = ts2timo(CLOCK_REALTIME, TIMER_ABSTIME, ts, &t,
NULL);
if (error)
goto error;
} else
t = 0;
/*
* Block until someone sends the message.
* While doing this, notification should not be sent.
*/
mqattr->mq_flags |= MQ_RECEIVE;
error = cv_timedwait_sig(&mq->mq_send_cv, &mq->mq_mtx, t);
mqattr->mq_flags &= ~MQ_RECEIVE;
if (error || (mqattr->mq_flags & MQ_UNLINKED)) {
error = (error == EWOULDBLOCK) ? ETIMEDOUT : EINTR;
goto error;
}
}
/*
* Find the highest priority message, and remove it from the queue.
* At first, reserved queue is checked, bitmap is next.
*/
msg = TAILQ_FIRST(&mq->mq_head[MQ_PQRESQ]);
if (__predict_true(msg == NULL)) {
idx = ffs(mq->mq_bitmap);
msg = TAILQ_FIRST(&mq->mq_head[idx]);
KASSERT(msg != NULL);
} else {
idx = MQ_PQRESQ;
}
TAILQ_REMOVE(&mq->mq_head[idx], msg, msg_queue);
/* Unmark the bit, if last message. */
if (__predict_true(idx) && TAILQ_EMPTY(&mq->mq_head[idx])) {
KASSERT((MQ_PQSIZE - idx) == msg->msg_prio);
mq->mq_bitmap &= ~(1U << --idx);
}
/* Decrement the counter and signal waiter, if any */
mqattr->mq_curmsgs--;
cv_signal(&mq->mq_recv_cv);
/* Ready for sending now */
selnotify(&mq->mq_wsel, POLLOUT | POLLWRNORM, 0);
error:
mutex_exit(&mq->mq_mtx);
fd_putfile((int)mqdes);
if (error)
return error;
/*
* Copy the data to the user-space.
* Note: According to POSIX, no message should be removed from the
* queue on failure; this code violates that, since the message has
* already been dequeued by this point.
*/
*mlen = msg->msg_len;
error = copyout(msg->msg_ptr, msg_ptr, msg->msg_len);
if (error == 0 && msg_prio)
error = copyout(&msg->msg_prio, msg_prio, sizeof(unsigned));
mqueue_freemsg(msg, sizeof(struct mq_msg) + msg->msg_len);
/* Check if queue is full */
while (mqattr->mq_curmsgs >= mqattr->mq_maxmsg) {
int t;
if (mqattr->mq_flags & O_NONBLOCK) {
error = EAGAIN;
goto error;
}
if (ts) {
error = ts2timo(CLOCK_REALTIME, TIMER_ABSTIME, ts, &t,
NULL);
if (error)
goto error;
} else
t = 0;
/* Block until queue becomes available */
error = cv_timedwait_sig(&mq->mq_recv_cv, &mq->mq_mtx, t);
if (error || (mqattr->mq_flags & MQ_UNLINKED)) {
error = (error == EWOULDBLOCK) ? ETIMEDOUT : error;
goto error;
}
}
KASSERT(mqattr->mq_curmsgs < mqattr->mq_maxmsg);
/*
* Insert message into the queue, according to the priority.
* Note the difference between index and priority.
*/
if (__predict_true(msg_prio < MQ_PQSIZE)) {
u_int idx = MQ_PQSIZE - msg_prio;
/*
* Copy the data to the user-space.
* Note: According to POSIX, the new attributes should not be set on
* failure - this would be violated.
*/
if (SCARG(uap, omqstat))
error = copyout(&attr, SCARG(uap, omqstat),
sizeof(struct mq_attr));
/* Get the name from the user-space */
name = kmem_alloc(MQ_NAMELEN, KM_SLEEP);
error = copyinstr(SCARG(uap, name), name, MQ_NAMELEN - 1, NULL);
if (error) {
kmem_free(name, MQ_NAMELEN);
return error;
}
/* Remove and destroy if no references. */
LIST_REMOVE(mq, mq_list);
refcnt = mq->mq_refcnt;
if (refcnt) {
/* Mark as unlinked, if there are references. */
mq->mq_attrib.mq_flags |= MQ_UNLINKED;
}
/* Wake up waiters, if there are any. */
cv_broadcast(&mq->mq_send_cv);
cv_broadcast(&mq->mq_recv_cv);
mutex_exit(&mq->mq_mtx);
err:
mutex_exit(&mqlist_lock);
/*
* If last reference - destroy the message queue. Otherwise,
* the last mq_close() call will do that.
*/
if (!error && refcnt == 0) {
mqueue_destroy(mq);
}
kmem_free(name, MQ_NAMELEN);
return error;
}
/*
* System control nodes.
*/
SYSCTL_SETUP(mqueue_sysctl_init, "mqueue systl")
{
const struct sysctlnode *node = NULL;
sysctl_createv(clog, 0, NULL, NULL,
CTLFLAG_PERMANENT|CTLFLAG_IMMEDIATE,
CTLTYPE_INT, "posix_msg",
SYSCTL_DESCR("Version of IEEE Std 1003.1 and its "
"Message Passing option to which the "
"system attempts to conform"),
NULL, _POSIX_MESSAGE_PASSING, NULL, 0,
CTL_KERN, CTL_CREATE, CTL_EOL);
sysctl_createv(clog, 0, NULL, &node,
CTLFLAG_PERMANENT,
CTLTYPE_NODE, "mqueue",
SYSCTL_DESCR("Message queue options"),
NULL, 0, NULL, 0,
CTL_KERN, CTL_CREATE, CTL_EOL);