/*
* Copyright (c) 2007 Mindaugas Rasiukevicius <rmind at NetBSD org>
* All rights reserved.
*
* Redistribution and use in source and binary forms, with or without
* modification, are permitted provided that the following conditions
* are met:
* 1. Redistributions of source code must retain the above copyright
* notice, this list of conditions and the following disclaimer.
* 2. Redistributions in binary form must reproduce the above copyright
* notice, this list of conditions and the following disclaimer in the
* documentation and/or other materials provided with the distribution.
*
* THIS SOFTWARE IS PROVIDED BY THE AUTHOR AND CONTRIBUTORS ``AS IS'' AND
* ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
* IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
* ARE DISCLAIMED. IN NO EVENT SHALL THE AUTHOR OR CONTRIBUTORS BE LIABLE
* FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL
* DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS
* OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION)
* HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT
* LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY
* OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF
* SUCH DAMAGE.
*/
/*
* Implementation of POSIX asynchronous I/O.
* Defined in the Base Definitions volume of IEEE Std 1003.1-2001.
*/
/* Initialize the job queue and its synchronization structures */
mutex_init(&aio->aio_mtx, MUTEX_DEFAULT, IPL_NONE);
cv_init(&aio->aio_worker_cv, "aiowork");
cv_init(&aio->done_cv, "aiodone");
TAILQ_INIT(&aio->jobs_queue);
/*
* Create an AIO worker thread.
* XXX: Currently, AIO thread is not protected against user's actions.
*/
uaddr = uvm_uarea_alloc();
if (uaddr == 0) {
aio_exit(p, aio);
return SET_ERROR(EAGAIN);
}
error = lwp_create(curlwp, p, uaddr, 0, NULL, 0, aio_worker,
NULL, &l, curlwp->l_class, &curlwp->l_sigmask, &curlwp->l_sigstk);
if (error != 0) {
uvm_uarea_free(uaddr);
aio_exit(p, aio);
return error;
}
/* Recheck if we are really first */
mutex_enter(p->p_lock);
if (p->p_aio) {
mutex_exit(p->p_lock);
aio_exit(p, aio);
lwp_exit(l);
return 0;
}
p->p_aio = aio;
/* Complete the initialization of thread, and run it */
aio->aio_worker = l;
lwp_lock(l);
lwp_changepri(l, MAXPRI_USER);
setrunnable(l);
/* LWP now unlocked */
mutex_exit(p->p_lock);
/*
* Fill the signal mask (block everything), so that the
* thread handles only SIGKILL and SIGSTOP.
*/
sigfillset(&nss);
mutex_enter(p->p_lock);
error = sigprocmask1(curlwp, SIG_SETMASK, &nss, &oss);
mutex_exit(p->p_lock);
KASSERT(error == 0);
for (;;) {
/*
* Loop for each job in the queue. If there
* are no jobs then sleep.
*/
mutex_enter(&aio->aio_mtx);
while ((a_job = TAILQ_FIRST(&aio->jobs_queue)) == NULL) {
if (cv_wait_sig(&aio->aio_worker_cv, &aio->aio_mtx)) {
/*
* Thread was interrupted - check for
* pending exit or suspend.
*/
mutex_exit(&aio->aio_mtx);
lwp_userret(curlwp);
mutex_enter(&aio->aio_mtx);
}
}
/* Take the job from the queue */
aio->curjob = a_job;
TAILQ_REMOVE(&aio->jobs_queue, a_job, list);
/* Non-accurate check for the limit */
if (aio_jobs_count + 1 > aio_max)
return SET_ERROR(EAGAIN);
/* Get the data structure from user-space */
error = copyin(aiocb_uptr, &aiocbp, sizeof(struct aiocb));
if (error)
return error;
/* Check if signal is set, and validate it */
sig = &aiocbp.aio_sigevent;
if (sig->sigev_signo < 0 || sig->sigev_signo >= NSIG ||
sig->sigev_notify < SIGEV_NONE || sig->sigev_notify > SIGEV_SA)
return SET_ERROR(EINVAL);
/* Buffer and byte count */
if (((AIO_SYNC | AIO_DSYNC) & op) == 0)
if (aiocbp.aio_buf == NULL || aiocbp.aio_nbytes > SSIZE_MAX)
return SET_ERROR(EINVAL);
/* Check the opcode, if LIO_NOP - simply ignore */
if (op == AIO_LIO) {
KASSERT(lio != NULL);
if (aiocbp.aio_lio_opcode == LIO_WRITE)
op = AIO_WRITE;
else if (aiocbp.aio_lio_opcode == LIO_READ)
op = AIO_READ;
else
return (aiocbp.aio_lio_opcode == LIO_NOP) ? 0 :
SET_ERROR(EINVAL);
} else {
KASSERT(lio == NULL);
}
/*
* Look for already existing job. If found - the job is in-progress.
* According to POSIX this is invalid, so return the error.
*/
aio = p->p_aio;
if (aio) {
mutex_enter(&aio->aio_mtx);
TAILQ_FOREACH(a_job, &aio->jobs_queue, list) {
if (a_job->aiocb_uptr != aiocb_uptr)
continue;
mutex_exit(&aio->aio_mtx);
return SET_ERROR(EINVAL);
}
mutex_exit(&aio->aio_mtx);
}
/*
* Check if AIO structure is initialized, if not - initialize it.
* In LIO case, we did that already. We will recheck this with
* the lock in aio_procinit().
*/
if (lio == NULL && p->p_aio == NULL)
if (aio_procinit(p))
return SET_ERROR(EAGAIN);
aio = p->p_aio;
/*
* Set the state with errno, and copy data
* structure back to the user-space.
*/
aiocbp._state = JOB_WIP;
aiocbp._errno = SET_ERROR(EINPROGRESS);
aiocbp._retval = -1;
error = copyout(&aiocbp, aiocb_uptr, sizeof(struct aiocb));
if (error)
return error;
/* Allocate and initialize a new AIO job */
a_job = pool_get(&aio_job_pool, PR_WAITOK | PR_ZERO);
/*
* Set the data.
* Store the user-space pointer for searching. Since we
* are storing only per proc pointers - it is safe.
*/
memcpy(&a_job->aiocbp, &aiocbp, sizeof(struct aiocb));
a_job->aiocb_uptr = aiocb_uptr;
a_job->aio_op |= op;
a_job->lio = lio;
/*
* Add the job to the queue, update the counters, and
* notify the AIO worker thread to handle the job.
*/
mutex_enter(&aio->aio_mtx);
/* Fail, if the limit was reached */
if (atomic_inc_uint_nv(&aio_jobs_count) > aio_max ||
aio->jobs_count >= aio_listio_max) {
atomic_dec_uint(&aio_jobs_count);
mutex_exit(&aio->aio_mtx);
pool_put(&aio_job_pool, a_job);
return SET_ERROR(EAGAIN);
}
TAILQ_INSERT_TAIL(&aio->jobs_queue, a_job, list);
aio->jobs_count++;
if (lio)
lio->refcnt++;
cv_signal(&aio->aio_worker_cv);
mutex_exit(&aio->aio_mtx);
/*
* One would handle the errors only with aio_error() function.
* This way is appropriate according to POSIX.
*/
return 0;
}
/* Cancel the jobs, and remove them from the queue */
cn = 0;
TAILQ_INIT(&tmp_jobs_list);
TAILQ_FOREACH(a_job, &aio->jobs_queue, list) {
if (aiocbp_ptr) {
if (aiocbp_ptr != a_job->aiocb_uptr)
continue;
if (fildes != a_job->aiocbp.aio_fildes) {
mutex_exit(&aio->aio_mtx);
return SET_ERROR(EBADF);
}
} else if (a_job->aiocbp.aio_fildes != fildes)
continue;
/* There are canceled jobs */
if (cn)
*retval = AIO_CANCELED;
/* We cannot cancel current job */
a_job = aio->curjob;
if (a_job && ((a_job->aiocbp.aio_fildes == fildes) ||
(a_job->aiocb_uptr == aiocbp_ptr)))
*retval = AIO_NOTCANCELED;
mutex_exit(&aio->aio_mtx);
/* Free the jobs after the lock */
errcnt = 0;
while (!TAILQ_EMPTY(&tmp_jobs_list)) {
a_job = TAILQ_FIRST(&tmp_jobs_list);
TAILQ_REMOVE(&tmp_jobs_list, a_job, list);
/* Set the errno and copy structures back to the user-space */
a_job->aiocbp._errno = SET_ERROR(ECANCELED);
a_job->aiocbp._state = JOB_DONE;
if (copyout(&a_job->aiocbp, a_job->aiocb_uptr,
sizeof(struct aiocb)))
errcnt++;
/* Send a signal if any */
aio_sendsig(p, &a_job->aiocbp.aio_sigevent);
if (a_job->lio) {
lio = a_job->lio;
aio_sendsig(p, &lio->sig);
pool_put(&aio_lio_pool, lio);
}
pool_put(&aio_job_pool, a_job);
}
if (errcnt)
return SET_ERROR(EFAULT);
/* Set a correct return value */
if (*retval == 0)
*retval = AIO_ALLDONE;
int
aio_suspend1(struct lwp *l, struct aiocb **aiocbp_list, int nent,
struct timespec *ts)
{
struct proc *p = l->l_proc;
struct aioproc *aio;
struct aio_job *a_job;
int i, error, timo;
if (p->p_aio == NULL)
return SET_ERROR(EAGAIN);
aio = p->p_aio;
if (ts) {
timo = mstohz((ts->tv_sec * 1000) + (ts->tv_nsec / 1000000));
if (timo == 0 && ts->tv_sec == 0 && ts->tv_nsec > 0)
timo = 1;
if (timo <= 0)
return SET_ERROR(EAGAIN);
} else
timo = 0;
mutex_enter(&aio->aio_mtx);
for (;;) {
for (i = 0; i < nent; i++) {
/* Skip NULL entries */
if (aiocbp_list[i] == NULL)
continue;
/* Skip current job */
if (aio->curjob) {
a_job = aio->curjob;
if (a_job->aiocb_uptr == aiocbp_list[i])
continue;
}
/* Look for a job in the queue */
TAILQ_FOREACH(a_job, &aio->jobs_queue, list)
if (a_job->aiocb_uptr == aiocbp_list[i])
break;
if (a_job == NULL) {
struct aiocb aiocbp;
mutex_exit(&aio->aio_mtx);
/* Check if the job is done. */
error = copyin(aiocbp_list[i], &aiocbp,
sizeof(struct aiocb));
if (error == 0 && aiocbp._state != JOB_DONE) {
mutex_enter(&aio->aio_mtx);
continue;
}
return error;
}
}
/* Wait for a signal or when timeout occurs */
error = cv_timedwait_sig(&aio->done_cv, &aio->aio_mtx, timo);
if (error) {
if (error == EWOULDBLOCK)
error = SET_ERROR(EAGAIN);
break;
}
}
mutex_exit(&aio->aio_mtx);
return error;
}
/* Non-accurate checks for the limit and invalid values */
if (nent < 1 || nent > aio_listio_max)
return SET_ERROR(EINVAL);
if (aio_jobs_count + nent > aio_max)
return SET_ERROR(EAGAIN);
/* Check if AIO structure is initialized, if not - initialize it */
if (p->p_aio == NULL)
if (aio_procinit(p))
return SET_ERROR(EAGAIN);
aio = p->p_aio;
if (error != 0) {
pool_put(&aio_lio_pool, lio);
return error;
}
/* Get the list from user-space */
aiocbp_list = kmem_alloc(nent * sizeof(*aiocbp_list), KM_SLEEP);
error = copyin(SCARG(uap, list), aiocbp_list,
nent * sizeof(*aiocbp_list));
if (error) {
mutex_enter(&aio->aio_mtx);
goto err;
}
/* Enqueue all jobs */
errcnt = 0;
for (i = 0; i < nent; i++) {
error = aio_enqueue_job(AIO_LIO, aiocbp_list[i], lio);
/*
* According to POSIX, in such error case it may
* fail with other I/O operations initiated.
*/
if (error)
errcnt++;
}
mutex_enter(&aio->aio_mtx);
/* Return an error, if any */
if (errcnt) {
error = SET_ERROR(EIO);
goto err;
}
if (mode == LIO_WAIT) {
/*
* Wait for AIO completion. In such case,
* the LIO structure will be freed here.
*/
while (lio->refcnt > 1 && error == 0)
error = cv_wait_sig(&aio->done_cv, &aio->aio_mtx);
if (error)
error = SET_ERROR(EINTR);
}
if (p == NULL) {
(*pr)("AIO: We are not in the processes right now.\n");
return;
}
aio = p->p_aio;
if (aio == NULL) {
(*pr)("AIO data is not initialized (PID = %d).\n", p->p_pid);
return;
}
(*pr)("AIO: PID = %d\n", p->p_pid);
(*pr)("AIO: Global count of the jobs = %u\n", aio_jobs_count);
(*pr)("AIO: Count of the jobs = %u\n", aio->jobs_count);