| /* |
| * Copyright (c) 2000-2001 Silicon Graphics, Inc. |
| * All Rights Reserved. |
| * |
| * This program is free software; you can redistribute it and/or |
| * modify it under the terms of the GNU General Public License as |
| * published by the Free Software Foundation. |
| * |
| * This program is distributed in the hope that it would be useful, |
| * but WITHOUT ANY WARRANTY; without even the implied warranty of |
| * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the |
| * GNU General Public License for more details. |
| * |
| * You should have received a copy of the GNU General Public License |
| * along with this program; if not, write the Free Software Foundation, |
| * Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA |
| */ |
| |
| #include <sys/types.h> |
| #include <sys/mman.h> |
| #include <errno.h> |
| #include <signal.h> |
| #include <time.h> |
| #include <stdlib.h> |
| #include <unistd.h> |
| #include <malloc.h> |
| #include <assert.h> |
| |
| #include "config.h" |
| |
| #include "types.h" |
| #include "qlock.h" |
| #include "cldmgr.h" |
| #include "ring.h" |
| |
| static int ring_worker_entry(void *ringctxp); |
| |
| ring_t * |
| ring_create(size_t ringlen, |
| size_t bufsz, |
| bool_t pinpr, |
| ix_t drive_index, |
| int (*readfunc)(void *clientctxp, char *bufp), |
| int (*writefunc)(void *clientctxp, char *bufp), |
| void *clientctxp, |
| int *rvalp) |
| { |
| bool_t ok; |
| ring_t *ringp; |
| size_t mix; |
| |
| /* pre-initialize return value |
| */ |
| *rvalp = 0; |
| |
| /* allocate a ring descriptor |
| */ |
| ringp = (ring_t *)calloc(1, sizeof(ring_t)); |
| assert(ringp); |
| ringp->r_len = ringlen; |
| ringp->r_clientctxp = clientctxp; |
| ringp->r_readfunc = readfunc; |
| ringp->r_writefunc = writefunc; |
| |
| /* allocate counting semaphores for the ready and active queues, |
| * and initialize the queue input and output indices. |
| */ |
| ringp->r_ready_qsemh = qsem_alloc(ringlen); |
| ringp->r_active_qsemh = qsem_alloc(0); |
| ringp->r_ready_in_ix = 0; |
| ringp->r_ready_out_ix = 0; |
| ringp->r_active_in_ix = 0; |
| ringp->r_active_out_ix = 0; |
| ringp->r_client_cnt = 0; |
| ringp->r_worker_cnt = 0; |
| |
| /* initialize the meters |
| */ |
| ringp->r_client_msgcnt = 0; |
| ringp->r_worker_msgcnt = 0; |
| ringp->r_client_blkcnt = 0; |
| ringp->r_worker_blkcnt = 0; |
| ringp->r_first_io_time = 0; |
| ringp->r_all_io_cnt = 0; |
| |
| /* allocate the ring messages |
| */ |
| ringp->r_msgp = (ring_msg_t *)calloc(ringlen, sizeof(ring_msg_t)); |
| assert(ringp->r_msgp); |
| |
| /* allocate the buffers and initialize the messages |
| */ |
| for (mix = 0; mix < ringlen; mix++) { |
| ring_msg_t *msgp = &ringp->r_msgp[mix]; |
| msgp->rm_mix = mix; |
| msgp->rm_op = RING_OP_NONE; |
| msgp->rm_stat = RING_STAT_INIT; |
| msgp->rm_user = 0; |
| msgp->rm_loc = RING_LOC_READY; |
| |
| msgp->rm_bufp = (char *)memalign(PGSZ, bufsz); |
| if (!msgp->rm_bufp) { |
| *rvalp = ENOMEM; |
| return 0; |
| } |
| if (pinpr) { |
| int rval; |
| rval = mlock((void *)msgp->rm_bufp, bufsz); |
| if (rval) { |
| if (errno == ENOMEM) { |
| *rvalp = E2BIG; |
| return 0; |
| } |
| if (errno == EPERM) { |
| *rvalp = EPERM; |
| return 0; |
| } |
| assert(0); |
| } |
| } |
| } |
| |
| /* kick off the worker thread |
| */ |
| ok = cldmgr_create(ring_worker_entry, |
| drive_index, |
| _("worker"), |
| ringp); |
| assert(ok); |
| |
| return ringp; |
| } |
| |
| ring_msg_t * |
| ring_get(ring_t *ringp) |
| { |
| ring_msg_t *msgp; |
| |
| /* assert client currently holds no messages |
| */ |
| assert(ringp->r_client_cnt == 0); |
| |
| /* bump client message count and note if client needs to block |
| */ |
| ringp->r_client_msgcnt++; |
| if (qsemPwouldblock(ringp->r_ready_qsemh)) { |
| ringp->r_client_blkcnt++; |
| } |
| |
| /* block until msg available on ready queue ("P") |
| */ |
| qsemP(ringp->r_ready_qsemh); |
| |
| /* get a pointer to the next msg on the queue |
| */ |
| msgp = &ringp->r_msgp[ringp->r_ready_out_ix]; |
| |
| /* assert the message is where it belongs |
| */ |
| assert(msgp->rm_loc == RING_LOC_READY); |
| |
| /* verify the message index has not become corrupted |
| */ |
| assert(msgp->rm_mix == ringp->r_ready_out_ix); |
| |
| /* bump the output index |
| */ |
| ringp->r_ready_out_ix = (ringp->r_ready_out_ix + 1) |
| % |
| ringp->r_len; |
| |
| /* update the message location |
| */ |
| msgp->rm_loc = RING_LOC_CLIENT; |
| |
| /* bump the count of messages held by the client |
| */ |
| ringp->r_client_cnt++; |
| |
| /* return the msg to the client |
| */ |
| return msgp; |
| } |
| |
| void |
| ring_put(ring_t *ringp, ring_msg_t *msgp) |
| { |
| /* assert the client holds exactly one message |
| */ |
| assert(ringp->r_client_cnt == 1); |
| |
| /* assert the client is returning the right message |
| */ |
| assert(msgp->rm_mix == ringp->r_active_in_ix); |
| |
| /* assert the message is where it belongs |
| */ |
| assert(msgp->rm_loc == RING_LOC_CLIENT); |
| |
| /* decrement the count of messages held by the client |
| */ |
| ringp->r_client_cnt--; |
| |
| /* update the message location |
| */ |
| msgp->rm_loc = RING_LOC_ACTIVE; |
| |
| /* bump the active queue input ix |
| */ |
| ringp->r_active_in_ix = (ringp->r_active_in_ix + 1) |
| % |
| ringp->r_len; |
| |
| /* bump the semaphore for the active queue ("V") |
| */ |
| qsemV(ringp->r_active_qsemh); |
| } |
| |
| void |
| ring_reset(ring_t *ringp, ring_msg_t *msgp) |
| { |
| size_t mix; |
| |
| /* if the client is not holding a message, get the next message |
| */ |
| if (ringp->r_client_cnt == 0) { |
| assert(!msgp); |
| msgp = ring_get(ringp); |
| assert(msgp); |
| assert(ringp->r_client_cnt == 1); |
| } else { |
| assert(msgp); |
| assert(ringp->r_client_cnt == 1); |
| } |
| |
| /* tell the worker to abort |
| */ |
| msgp->rm_op = RING_OP_RESET; |
| ring_put(ringp, msgp); |
| |
| /* wait for the reset to be acknowledged |
| */ |
| assert(ringp->r_client_cnt == 0); |
| do { |
| /* pull a message from the ready queue |
| */ |
| qsemP(ringp->r_ready_qsemh); |
| msgp = &ringp->r_msgp[ringp->r_ready_out_ix]; |
| assert(msgp->rm_loc == RING_LOC_READY); |
| ringp->r_ready_out_ix = (ringp->r_ready_out_ix + 1) |
| % |
| ringp->r_len; |
| ringp->r_client_cnt++; |
| } while (msgp->rm_stat != RING_STAT_RESETACK); |
| assert(ringp->r_client_cnt == ringp->r_len); |
| |
| /* re-initialize the ring |
| */ |
| assert(qsemPavail(ringp->r_ready_qsemh) == 0); |
| assert(qsemPavail(ringp->r_active_qsemh) == 0); |
| ringp->r_ready_in_ix = 0; |
| ringp->r_ready_out_ix = 0; |
| ringp->r_active_in_ix = 0; |
| ringp->r_active_out_ix = 0; |
| ringp->r_client_cnt = 0; |
| ringp->r_worker_cnt = 0; |
| for (mix = 0; mix < ringp->r_len; mix++) { |
| ring_msg_t *msgp = &ringp->r_msgp[mix]; |
| msgp->rm_mix = mix; |
| msgp->rm_op = RING_OP_NONE; |
| msgp->rm_stat = RING_STAT_INIT; |
| msgp->rm_user = 0; |
| msgp->rm_loc = RING_LOC_READY; |
| qsemV(ringp->r_ready_qsemh); |
| } |
| assert(qsemPavail(ringp->r_ready_qsemh) == ringp->r_len); |
| assert(qsemPavail(ringp->r_active_qsemh) == 0); |
| } |
| |
| void |
| ring_destroy(ring_t *ringp) |
| { |
| ring_msg_t *msgp; |
| |
| /* the client must not be holding a message |
| */ |
| assert(ringp->r_client_cnt == 0); |
| |
| /* get a message |
| */ |
| msgp = ring_get(ringp); |
| |
| /* tell the worker to exit |
| */ |
| msgp->rm_op = RING_OP_DIE; |
| ring_put(ringp, msgp); |
| |
| /* wait for the die to be acknowledged |
| */ |
| do { |
| /* pull a message from the ready queue |
| */ |
| qsemP(ringp->r_ready_qsemh); |
| msgp = &ringp->r_msgp[ringp->r_ready_out_ix]; |
| assert(msgp->rm_loc == RING_LOC_READY); |
| ringp->r_ready_out_ix = (ringp->r_ready_out_ix + 1) |
| % |
| ringp->r_len; |
| } while (msgp->rm_stat != RING_STAT_DIEACK); |
| |
| /* the worker is dead. |
| */ |
| qsem_free(ringp->r_ready_qsemh); |
| qsem_free(ringp->r_active_qsemh); |
| free((void *)ringp); |
| } |
| |
| |
| static ring_msg_t * |
| ring_worker_get(ring_t *ringp) |
| { |
| ring_msg_t *msgp; |
| |
| /* assert worker currently holds no messages |
| */ |
| assert(ringp->r_worker_cnt == 0); |
| |
| /* bump worker message count and note if worker needs to block |
| */ |
| ringp->r_worker_msgcnt++; |
| if (qsemPwouldblock(ringp->r_active_qsemh)) { |
| ringp->r_worker_blkcnt++; |
| } |
| |
| /* block until msg available on active queue ("P") |
| */ |
| qsemP(ringp->r_active_qsemh); |
| |
| /* get a pointer to the next msg on the queue |
| */ |
| msgp = &ringp->r_msgp[ringp->r_active_out_ix]; |
| |
| /* assert the message is where it belongs |
| */ |
| assert(msgp->rm_loc == RING_LOC_ACTIVE); |
| |
| /* verify the message index has not become corrupted |
| */ |
| assert(msgp->rm_mix == ringp->r_active_out_ix); |
| |
| /* bump the output index |
| */ |
| ringp->r_active_out_ix = (ringp->r_active_out_ix + 1) |
| % |
| ringp->r_len; |
| |
| /* update the message location |
| */ |
| msgp->rm_loc = RING_LOC_SLAVE; |
| |
| /* bump the count of messages held by the worker |
| */ |
| ringp->r_worker_cnt++; |
| |
| /* return the msg to the worker |
| */ |
| return msgp; |
| } |
| |
| static void |
| ring_worker_put(ring_t *ringp, ring_msg_t *msgp) |
| { |
| /* assert the worker holds exactly one message |
| */ |
| assert(ringp->r_worker_cnt == 1); |
| |
| /* assert the worker is returning the right message |
| */ |
| assert(msgp->rm_mix == ringp->r_ready_in_ix); |
| |
| /* assert the message is where it belongs |
| */ |
| assert(msgp->rm_loc == RING_LOC_SLAVE); |
| |
| /* decrement the count of messages held by the worker |
| */ |
| ringp->r_worker_cnt--; |
| |
| /* update the message location |
| */ |
| msgp->rm_loc = RING_LOC_READY; |
| |
| /* bump the ready queue input ix |
| */ |
| ringp->r_ready_in_ix = (ringp->r_ready_in_ix + 1) |
| % |
| ringp->r_len; |
| |
| /* bump the semaphore for the ready queue ("V") |
| */ |
| qsemV(ringp->r_ready_qsemh); |
| } |
| |
| static int |
| ring_worker_entry(void *ringctxp) |
| { |
| sigset_t blocked_set; |
| ring_t *ringp = (ring_t *)ringctxp; |
| enum { LOOPMODE_NORMAL, LOOPMODE_IGNORE, LOOPMODE_DIE } loopmode; |
| |
| /* block signals, let the main thread handle them |
| */ |
| sigemptyset(&blocked_set); |
| sigaddset(&blocked_set, SIGINT); |
| sigaddset(&blocked_set, SIGHUP); |
| sigaddset(&blocked_set, SIGTERM); |
| sigaddset(&blocked_set, SIGQUIT); |
| sigaddset(&blocked_set, SIGALRM); |
| pthread_sigmask(SIG_SETMASK, &blocked_set, NULL); |
| |
| /* loop reading and precessing messages until told to die |
| */ |
| for (loopmode = LOOPMODE_NORMAL; loopmode != LOOPMODE_DIE;) { |
| ring_msg_t *msgp; |
| int rval; |
| |
| msgp = ring_worker_get(ringp); |
| msgp->rm_rval = 0; |
| |
| switch(msgp->rm_op) { |
| case RING_OP_READ: |
| if (loopmode == LOOPMODE_IGNORE) { |
| msgp->rm_stat = RING_STAT_IGNORE; |
| break; |
| } |
| if (!ringp->r_first_io_time) { |
| ringp->r_first_io_time = time(0); |
| assert(ringp->r_first_io_time); |
| } |
| rval = (ringp->r_readfunc)(ringp->r_clientctxp, |
| msgp->rm_bufp); |
| msgp->rm_rval = rval; |
| ringp->r_all_io_cnt++; |
| if (msgp->rm_rval == 0) { |
| msgp->rm_stat = RING_STAT_OK; |
| } else { |
| msgp->rm_stat = RING_STAT_ERROR; |
| loopmode = LOOPMODE_IGNORE; |
| } |
| break; |
| case RING_OP_WRITE: |
| if (loopmode == LOOPMODE_IGNORE) { |
| msgp->rm_stat = RING_STAT_IGNORE; |
| break; |
| } |
| if (!ringp->r_first_io_time) { |
| ringp->r_first_io_time = time(0); |
| assert(ringp->r_first_io_time); |
| } |
| rval = (ringp->r_writefunc)(ringp->r_clientctxp, |
| msgp->rm_bufp); |
| msgp->rm_rval = rval; |
| ringp->r_all_io_cnt++; |
| if (msgp->rm_rval == 0) { |
| msgp->rm_stat = RING_STAT_OK; |
| } else { |
| msgp->rm_stat = RING_STAT_ERROR; |
| loopmode = LOOPMODE_IGNORE; |
| } |
| break; |
| case RING_OP_NOP: |
| msgp->rm_stat = RING_STAT_NOPACK; |
| break; |
| case RING_OP_TRACE: |
| msgp->rm_stat = RING_STAT_IGNORE; |
| break; |
| case RING_OP_RESET: |
| loopmode = LOOPMODE_NORMAL; |
| msgp->rm_stat = RING_STAT_RESETACK; |
| break; |
| case RING_OP_DIE: |
| msgp->rm_stat = RING_STAT_DIEACK; |
| loopmode = LOOPMODE_DIE; |
| break; |
| default: |
| msgp->rm_stat = RING_STAT_IGNORE; |
| break; |
| } |
| ring_worker_put(ringp, msgp); |
| } |
| |
| return 0; |
| } |