blob: 7c6b499562dc83086fb74b427716cadc61d724dc [file] [log] [blame]
/*
* Copyright (c) 2000-2001 Silicon Graphics, Inc.
* All Rights Reserved.
*
* This program is free software; you can redistribute it and/or
* modify it under the terms of the GNU General Public License as
* published by the Free Software Foundation.
*
* This program is distributed in the hope that it would be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU General Public License for more details.
*
* You should have received a copy of the GNU General Public License
* along with this program; if not, write the Free Software Foundation,
* Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA
*/
#include <sys/types.h>
#include <sys/mman.h>
#include <errno.h>
#include <signal.h>
#include <time.h>
#include <stdlib.h>
#include <unistd.h>
#include <malloc.h>
#include <assert.h>
#include "config.h"
#include "types.h"
#include "qlock.h"
#include "cldmgr.h"
#include "ring.h"
static int ring_worker_entry(void *ringctxp);
ring_t *
ring_create(size_t ringlen,
size_t bufsz,
bool_t pinpr,
ix_t drive_index,
int (*readfunc)(void *clientctxp, char *bufp),
int (*writefunc)(void *clientctxp, char *bufp),
void *clientctxp,
int *rvalp)
{
bool_t ok;
ring_t *ringp;
size_t mix;
/* pre-initialize return value
*/
*rvalp = 0;
/* allocate a ring descriptor
*/
ringp = (ring_t *)calloc(1, sizeof(ring_t));
assert(ringp);
ringp->r_len = ringlen;
ringp->r_clientctxp = clientctxp;
ringp->r_readfunc = readfunc;
ringp->r_writefunc = writefunc;
/* allocate counting semaphores for the ready and active queues,
* and initialize the queue input and output indices.
*/
ringp->r_ready_qsemh = qsem_alloc(ringlen);
ringp->r_active_qsemh = qsem_alloc(0);
ringp->r_ready_in_ix = 0;
ringp->r_ready_out_ix = 0;
ringp->r_active_in_ix = 0;
ringp->r_active_out_ix = 0;
ringp->r_client_cnt = 0;
ringp->r_worker_cnt = 0;
/* initialize the meters
*/
ringp->r_client_msgcnt = 0;
ringp->r_worker_msgcnt = 0;
ringp->r_client_blkcnt = 0;
ringp->r_worker_blkcnt = 0;
ringp->r_first_io_time = 0;
ringp->r_all_io_cnt = 0;
/* allocate the ring messages
*/
ringp->r_msgp = (ring_msg_t *)calloc(ringlen, sizeof(ring_msg_t));
assert(ringp->r_msgp);
/* allocate the buffers and initialize the messages
*/
for (mix = 0; mix < ringlen; mix++) {
ring_msg_t *msgp = &ringp->r_msgp[mix];
msgp->rm_mix = mix;
msgp->rm_op = RING_OP_NONE;
msgp->rm_stat = RING_STAT_INIT;
msgp->rm_user = 0;
msgp->rm_loc = RING_LOC_READY;
msgp->rm_bufp = (char *)memalign(PGSZ, bufsz);
if (!msgp->rm_bufp) {
*rvalp = ENOMEM;
return 0;
}
if (pinpr) {
int rval;
rval = mlock((void *)msgp->rm_bufp, bufsz);
if (rval) {
if (errno == ENOMEM) {
*rvalp = E2BIG;
return 0;
}
if (errno == EPERM) {
*rvalp = EPERM;
return 0;
}
assert(0);
}
}
}
/* kick off the worker thread
*/
ok = cldmgr_create(ring_worker_entry,
drive_index,
_("worker"),
ringp);
assert(ok);
return ringp;
}
ring_msg_t *
ring_get(ring_t *ringp)
{
ring_msg_t *msgp;
/* assert client currently holds no messages
*/
assert(ringp->r_client_cnt == 0);
/* bump client message count and note if client needs to block
*/
ringp->r_client_msgcnt++;
if (qsemPwouldblock(ringp->r_ready_qsemh)) {
ringp->r_client_blkcnt++;
}
/* block until msg available on ready queue ("P")
*/
qsemP(ringp->r_ready_qsemh);
/* get a pointer to the next msg on the queue
*/
msgp = &ringp->r_msgp[ringp->r_ready_out_ix];
/* assert the message is where it belongs
*/
assert(msgp->rm_loc == RING_LOC_READY);
/* verify the message index has not become corrupted
*/
assert(msgp->rm_mix == ringp->r_ready_out_ix);
/* bump the output index
*/
ringp->r_ready_out_ix = (ringp->r_ready_out_ix + 1)
%
ringp->r_len;
/* update the message location
*/
msgp->rm_loc = RING_LOC_CLIENT;
/* bump the count of messages held by the client
*/
ringp->r_client_cnt++;
/* return the msg to the client
*/
return msgp;
}
void
ring_put(ring_t *ringp, ring_msg_t *msgp)
{
/* assert the client holds exactly one message
*/
assert(ringp->r_client_cnt == 1);
/* assert the client is returning the right message
*/
assert(msgp->rm_mix == ringp->r_active_in_ix);
/* assert the message is where it belongs
*/
assert(msgp->rm_loc == RING_LOC_CLIENT);
/* decrement the count of messages held by the client
*/
ringp->r_client_cnt--;
/* update the message location
*/
msgp->rm_loc = RING_LOC_ACTIVE;
/* bump the active queue input ix
*/
ringp->r_active_in_ix = (ringp->r_active_in_ix + 1)
%
ringp->r_len;
/* bump the semaphore for the active queue ("V")
*/
qsemV(ringp->r_active_qsemh);
}
void
ring_reset(ring_t *ringp, ring_msg_t *msgp)
{
size_t mix;
/* if the client is not holding a message, get the next message
*/
if (ringp->r_client_cnt == 0) {
assert(!msgp);
msgp = ring_get(ringp);
assert(msgp);
assert(ringp->r_client_cnt == 1);
} else {
assert(msgp);
assert(ringp->r_client_cnt == 1);
}
/* tell the worker to abort
*/
msgp->rm_op = RING_OP_RESET;
ring_put(ringp, msgp);
/* wait for the reset to be acknowledged
*/
assert(ringp->r_client_cnt == 0);
do {
/* pull a message from the ready queue
*/
qsemP(ringp->r_ready_qsemh);
msgp = &ringp->r_msgp[ringp->r_ready_out_ix];
assert(msgp->rm_loc == RING_LOC_READY);
ringp->r_ready_out_ix = (ringp->r_ready_out_ix + 1)
%
ringp->r_len;
ringp->r_client_cnt++;
} while (msgp->rm_stat != RING_STAT_RESETACK);
assert(ringp->r_client_cnt == ringp->r_len);
/* re-initialize the ring
*/
assert(qsemPavail(ringp->r_ready_qsemh) == 0);
assert(qsemPavail(ringp->r_active_qsemh) == 0);
ringp->r_ready_in_ix = 0;
ringp->r_ready_out_ix = 0;
ringp->r_active_in_ix = 0;
ringp->r_active_out_ix = 0;
ringp->r_client_cnt = 0;
ringp->r_worker_cnt = 0;
for (mix = 0; mix < ringp->r_len; mix++) {
ring_msg_t *msgp = &ringp->r_msgp[mix];
msgp->rm_mix = mix;
msgp->rm_op = RING_OP_NONE;
msgp->rm_stat = RING_STAT_INIT;
msgp->rm_user = 0;
msgp->rm_loc = RING_LOC_READY;
qsemV(ringp->r_ready_qsemh);
}
assert(qsemPavail(ringp->r_ready_qsemh) == ringp->r_len);
assert(qsemPavail(ringp->r_active_qsemh) == 0);
}
void
ring_destroy(ring_t *ringp)
{
ring_msg_t *msgp;
/* the client must not be holding a message
*/
assert(ringp->r_client_cnt == 0);
/* get a message
*/
msgp = ring_get(ringp);
/* tell the worker to exit
*/
msgp->rm_op = RING_OP_DIE;
ring_put(ringp, msgp);
/* wait for the die to be acknowledged
*/
do {
/* pull a message from the ready queue
*/
qsemP(ringp->r_ready_qsemh);
msgp = &ringp->r_msgp[ringp->r_ready_out_ix];
assert(msgp->rm_loc == RING_LOC_READY);
ringp->r_ready_out_ix = (ringp->r_ready_out_ix + 1)
%
ringp->r_len;
} while (msgp->rm_stat != RING_STAT_DIEACK);
/* the worker is dead.
*/
qsem_free(ringp->r_ready_qsemh);
qsem_free(ringp->r_active_qsemh);
free((void *)ringp);
}
static ring_msg_t *
ring_worker_get(ring_t *ringp)
{
ring_msg_t *msgp;
/* assert worker currently holds no messages
*/
assert(ringp->r_worker_cnt == 0);
/* bump worker message count and note if worker needs to block
*/
ringp->r_worker_msgcnt++;
if (qsemPwouldblock(ringp->r_active_qsemh)) {
ringp->r_worker_blkcnt++;
}
/* block until msg available on active queue ("P")
*/
qsemP(ringp->r_active_qsemh);
/* get a pointer to the next msg on the queue
*/
msgp = &ringp->r_msgp[ringp->r_active_out_ix];
/* assert the message is where it belongs
*/
assert(msgp->rm_loc == RING_LOC_ACTIVE);
/* verify the message index has not become corrupted
*/
assert(msgp->rm_mix == ringp->r_active_out_ix);
/* bump the output index
*/
ringp->r_active_out_ix = (ringp->r_active_out_ix + 1)
%
ringp->r_len;
/* update the message location
*/
msgp->rm_loc = RING_LOC_SLAVE;
/* bump the count of messages held by the worker
*/
ringp->r_worker_cnt++;
/* return the msg to the worker
*/
return msgp;
}
static void
ring_worker_put(ring_t *ringp, ring_msg_t *msgp)
{
/* assert the worker holds exactly one message
*/
assert(ringp->r_worker_cnt == 1);
/* assert the worker is returning the right message
*/
assert(msgp->rm_mix == ringp->r_ready_in_ix);
/* assert the message is where it belongs
*/
assert(msgp->rm_loc == RING_LOC_SLAVE);
/* decrement the count of messages held by the worker
*/
ringp->r_worker_cnt--;
/* update the message location
*/
msgp->rm_loc = RING_LOC_READY;
/* bump the ready queue input ix
*/
ringp->r_ready_in_ix = (ringp->r_ready_in_ix + 1)
%
ringp->r_len;
/* bump the semaphore for the ready queue ("V")
*/
qsemV(ringp->r_ready_qsemh);
}
static int
ring_worker_entry(void *ringctxp)
{
sigset_t blocked_set;
ring_t *ringp = (ring_t *)ringctxp;
enum { LOOPMODE_NORMAL, LOOPMODE_IGNORE, LOOPMODE_DIE } loopmode;
/* block signals, let the main thread handle them
*/
sigemptyset(&blocked_set);
sigaddset(&blocked_set, SIGINT);
sigaddset(&blocked_set, SIGHUP);
sigaddset(&blocked_set, SIGTERM);
sigaddset(&blocked_set, SIGQUIT);
sigaddset(&blocked_set, SIGALRM);
pthread_sigmask(SIG_SETMASK, &blocked_set, NULL);
/* loop reading and precessing messages until told to die
*/
for (loopmode = LOOPMODE_NORMAL; loopmode != LOOPMODE_DIE;) {
ring_msg_t *msgp;
int rval;
msgp = ring_worker_get(ringp);
msgp->rm_rval = 0;
switch(msgp->rm_op) {
case RING_OP_READ:
if (loopmode == LOOPMODE_IGNORE) {
msgp->rm_stat = RING_STAT_IGNORE;
break;
}
if (!ringp->r_first_io_time) {
ringp->r_first_io_time = time(0);
assert(ringp->r_first_io_time);
}
rval = (ringp->r_readfunc)(ringp->r_clientctxp,
msgp->rm_bufp);
msgp->rm_rval = rval;
ringp->r_all_io_cnt++;
if (msgp->rm_rval == 0) {
msgp->rm_stat = RING_STAT_OK;
} else {
msgp->rm_stat = RING_STAT_ERROR;
loopmode = LOOPMODE_IGNORE;
}
break;
case RING_OP_WRITE:
if (loopmode == LOOPMODE_IGNORE) {
msgp->rm_stat = RING_STAT_IGNORE;
break;
}
if (!ringp->r_first_io_time) {
ringp->r_first_io_time = time(0);
assert(ringp->r_first_io_time);
}
rval = (ringp->r_writefunc)(ringp->r_clientctxp,
msgp->rm_bufp);
msgp->rm_rval = rval;
ringp->r_all_io_cnt++;
if (msgp->rm_rval == 0) {
msgp->rm_stat = RING_STAT_OK;
} else {
msgp->rm_stat = RING_STAT_ERROR;
loopmode = LOOPMODE_IGNORE;
}
break;
case RING_OP_NOP:
msgp->rm_stat = RING_STAT_NOPACK;
break;
case RING_OP_TRACE:
msgp->rm_stat = RING_STAT_IGNORE;
break;
case RING_OP_RESET:
loopmode = LOOPMODE_NORMAL;
msgp->rm_stat = RING_STAT_RESETACK;
break;
case RING_OP_DIE:
msgp->rm_stat = RING_STAT_DIEACK;
loopmode = LOOPMODE_DIE;
break;
default:
msgp->rm_stat = RING_STAT_IGNORE;
break;
}
ring_worker_put(ringp, msgp);
}
return 0;
}