blob: f5055eeefed0c66f512dac9182c8050783ceedf9 [file] [log] [blame]
/*
* Copyright (c) 2000-2001 Silicon Graphics, Inc.
* All Rights Reserved.
*
* This program is free software; you can redistribute it and/or
* modify it under the terms of the GNU General Public License as
* published by the Free Software Foundation.
*
* This program is distributed in the hope that it would be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU General Public License for more details.
*
* You should have received a copy of the GNU General Public License
* along with this program; if not, write the Free Software Foundation,
* Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA
*/
#include <sys/types.h>
#include <sys/mman.h>
#include <errno.h>
#include <signal.h>
#include <time.h>
#include <stdlib.h>
#include <unistd.h>
#include <malloc.h>
#include <assert.h>
#include "config.h"
#include "types.h"
#include "qlock.h"
#include "cldmgr.h"
#include "ring.h"
/* forward declaration: entry point of the slave thread, handed to
 * cldmgr_create() by ring_create() and defined at the end of this file.
 */
static int ring_slave_entry( void *ringctxp );
/* ring_create - allocate a ring of messages and start its slave thread.
 *
 * ringlen     - number of messages (and buffers) in the ring
 * bufsz       - size in bytes of each message buffer
 * pinpr       - if set, each buffer is locked into memory with mlock()
 * drive_index - handed to cldmgr_create() when starting the slave
 * readfunc    - invoked by the slave for RING_OP_READ messages
 * writefunc   - invoked by the slave for RING_OP_WRITE messages
 * clientctxp  - opaque context passed to readfunc and writefunc
 * rvalp       - out: 0 on success; ENOMEM if a buffer could not be
 *               allocated; E2BIG if mlock() failed with ENOMEM; EPERM
 *               if mlock() failed with EPERM
 *
 * Returns the new ring, or 0 with *rvalp set. On failure everything
 * allocated up to that point (buffers, message array, semaphores and
 * the ring descriptor) is released again; the slave has not been
 * started yet at that point, so nothing else references the ring.
 */
ring_t *
ring_create( size_t ringlen,
	     size_t bufsz,
	     bool_t pinpr,
	     ix_t drive_index,
	     int ( *readfunc )( void *clientctxp, char *bufp ),
	     int ( *writefunc )( void *clientctxp, char *bufp ),
	     void *clientctxp,
	     int *rvalp )
{
	bool_t ok;
	ring_t *ringp;
	size_t mix;
	size_t failix;

	/* pre-initialize return value
	 */
	*rvalp = 0;

	/* allocate a ring descriptor
	 */
	ringp = ( ring_t * )calloc( 1, sizeof( ring_t ));
	assert( ringp );
	ringp->r_len = ringlen;
	ringp->r_clientctxp = clientctxp;
	ringp->r_readfunc = readfunc;
	ringp->r_writefunc = writefunc;

	/* allocate counting semaphores for the ready and active queues,
	 * and initialize the queue input and output indices. all messages
	 * start out on the ready queue, hence the initial count of ringlen.
	 */
	ringp->r_ready_qsemh = qsem_alloc( ringlen );
	ringp->r_active_qsemh = qsem_alloc( 0 );
	ringp->r_ready_in_ix = 0;
	ringp->r_ready_out_ix = 0;
	ringp->r_active_in_ix = 0;
	ringp->r_active_out_ix = 0;
	ringp->r_client_cnt = 0;
	ringp->r_slave_cnt = 0;

	/* initialize the meters
	 */
	ringp->r_client_msgcnt = 0;
	ringp->r_slave_msgcnt = 0;
	ringp->r_client_blkcnt = 0;
	ringp->r_slave_blkcnt = 0;
	ringp->r_first_io_time = 0;
	ringp->r_all_io_cnt = 0;

	/* allocate the ring messages
	 */
	ringp->r_msgp = ( ring_msg_t * )calloc( ringlen, sizeof( ring_msg_t ));
	assert( ringp->r_msgp );

	/* allocate the buffers and initialize the messages
	 */
	for ( mix = 0 ; mix < ringlen ; mix++ ) {
		ring_msg_t *msgp = &ringp->r_msgp[ mix ];
		msgp->rm_mix = mix;
		msgp->rm_op = RING_OP_NONE;
		msgp->rm_stat = RING_STAT_INIT;
		msgp->rm_user = 0;
		msgp->rm_loc = RING_LOC_READY;
		msgp->rm_bufp = ( char * )memalign( PGSZ, bufsz );
		if ( ! msgp->rm_bufp ) {
			*rvalp = ENOMEM;
			goto fail;
		}
		if ( pinpr ) {
			int rval;
			rval = mlock( ( void * )msgp->rm_bufp, bufsz );
			if ( rval ) {
				if ( errno == ENOMEM ) {
					*rvalp = E2BIG;
					goto fail;
				}
				if ( errno == EPERM ) {
					*rvalp = EPERM;
					goto fail;
				}
				assert( 0 );
			}
		}
	}

	/* kick off the slave thread
	 */
	ok = cldmgr_create( ring_slave_entry,
			    drive_index,
			    _("slave"),
			    ringp );
	assert( ok );

	return ringp;

fail:
	/* mix indexes the message whose setup failed. messages before it
	 * own a buffer; the failing one owns a buffer only if the failure
	 * was in mlock(). undo the pinning before handing memory back.
	 */
	failix = mix;
	for ( mix = 0 ; mix <= failix ; mix++ ) {
		char *bufp = ringp->r_msgp[ mix ].rm_bufp;
		if ( bufp && pinpr ) {
			( void )munlock( ( void * )bufp, bufsz );
		}
		free( ( void * )bufp );
	}
	free( ( void * )ringp->r_msgp );
	qsem_free( ringp->r_ready_qsemh );
	qsem_free( ringp->r_active_qsemh );
	free( ( void * )ringp );
	return 0;
}
/* ring_get - client side: take the next message off the ready queue,
 * blocking until one is available. The client may hold only one
 * message at a time. Returns the message, now marked RING_LOC_CLIENT.
 */
ring_msg_t *
ring_get( ring_t *ringp )
{
	ring_msg_t *readyp;

	/* the client must have given back its previous message
	 */
	assert( ringp->r_client_cnt == 0 );

	/* meter this request, and separately count the requests which
	 * find the ready queue empty
	 */
	ringp->r_client_msgcnt += 1;
	if ( qsemPwouldblock( ringp->r_ready_qsemh )) {
		ringp->r_client_blkcnt += 1;
	}

	/* "P" the ready queue: wait here until a message is available
	 */
	qsemP( ringp->r_ready_qsemh );

	/* the head of the ready queue is ours. check that it is really
	 * on the ready queue and sits in the slot it claims to be in.
	 */
	readyp = ringp->r_msgp + ringp->r_ready_out_ix;
	assert( readyp->rm_loc == RING_LOC_READY );
	assert( readyp->rm_mix == ringp->r_ready_out_ix );

	/* advance the ready queue head, wrapping at the end of the ring
	 */
	ringp->r_ready_out_ix = ( 1 + ringp->r_ready_out_ix ) % ringp->r_len;

	/* hand the message over to the client
	 */
	readyp->rm_loc = RING_LOC_CLIENT;
	ringp->r_client_cnt += 1;

	return readyp;
}
/* ring_put - client side: place the message obtained from ring_get()
 * on the active queue, where the slave will pick it up. Messages must
 * be returned in ring order.
 */
void
ring_put( ring_t *ringp, ring_msg_t *msgp )
{
	/* sanity: the client holds exactly one message, it is the one
	 * expected next on the active queue, and it is marked as being
	 * with the client.
	 */
	assert( ringp->r_client_cnt == 1 );
	assert( msgp->rm_mix == ringp->r_active_in_ix );
	assert( msgp->rm_loc == RING_LOC_CLIENT );

	/* the client gives up the message ...
	 */
	ringp->r_client_cnt -= 1;

	/* ... which now belongs to the active queue
	 */
	msgp->rm_loc = RING_LOC_ACTIVE;

	/* advance the active queue tail, wrapping at the end of the ring
	 */
	ringp->r_active_in_ix = ( 1 + ringp->r_active_in_ix ) % ringp->r_len;

	/* "V" the active queue last, once the message is fully set up
	 */
	qsemV( ringp->r_active_qsemh );
}
/* ring_reset - client side: send a RING_OP_RESET to the slave, drain
 * the ready queue until the message carrying RING_STAT_RESETACK comes
 * back, then put the ring back into its freshly-created state.
 *
 * msgp is the message currently held by the client, or 0 if the client
 * holds none (in which case one is fetched with ring_get()).
 */
void
ring_reset( ring_t *ringp, ring_msg_t *msgp )
{
	size_t slot;

	/* make sure we have exactly one message in hand: either the one
	 * passed in, or a fresh one if the client was holding nothing.
	 */
	if ( ringp->r_client_cnt == 0 ) {
		assert( ! msgp );
		msgp = ring_get( ringp );
	}
	assert( msgp );
	assert( ringp->r_client_cnt == 1 );

	/* use that message to tell the slave to abort
	 */
	msgp->rm_op = RING_OP_RESET;
	ring_put( ringp, msgp );
	assert( ringp->r_client_cnt == 0 );

	/* pull messages off the ready queue, counting each one, until
	 * the one acknowledging the reset shows up
	 */
	for ( ; ; ) {
		qsemP( ringp->r_ready_qsemh );
		msgp = &ringp->r_msgp[ ringp->r_ready_out_ix ];
		assert( msgp->rm_loc == RING_LOC_READY );
		ringp->r_ready_out_ix = ( 1 + ringp->r_ready_out_ix )
					%
					ringp->r_len;
		ringp->r_client_cnt++;
		if ( msgp->rm_stat == RING_STAT_RESETACK ) {
			break;
		}
	}

	/* at this point every message is expected to have been pulled,
	 * leaving both queues empty
	 */
	assert( ringp->r_client_cnt == ringp->r_len );
	assert( qsemPavail( ringp->r_ready_qsemh ) == 0 );
	assert( qsemPavail( ringp->r_active_qsemh ) == 0 );

	/* rewind the queue indices and holder counts
	 */
	ringp->r_ready_in_ix = 0;
	ringp->r_ready_out_ix = 0;
	ringp->r_active_in_ix = 0;
	ringp->r_active_out_ix = 0;
	ringp->r_client_cnt = 0;
	ringp->r_slave_cnt = 0;

	/* re-initialize each message and post it to the ready queue
	 */
	for ( slot = 0 ; slot < ringp->r_len ; slot++ ) {
		ring_msg_t *slotp = ringp->r_msgp + slot;

		slotp->rm_mix = slot;
		slotp->rm_op = RING_OP_NONE;
		slotp->rm_stat = RING_STAT_INIT;
		slotp->rm_user = 0;
		slotp->rm_loc = RING_LOC_READY;
		qsemV( ringp->r_ready_qsemh );
	}

	/* the ready queue is full again, the active queue still empty
	 */
	assert( qsemPavail( ringp->r_ready_qsemh ) == ringp->r_len );
	assert( qsemPavail( ringp->r_active_qsemh ) == 0 );
}
/* ring_destroy - client side: tell the slave to exit, wait for its
 * acknowledgement, then release everything ring_create() allocated:
 * the message buffers, the message array, both semaphores and the
 * ring descriptor. ringp and all of its messages and buffers are
 * invalid once this returns.
 *
 * The client must not be holding a message when calling this.
 */
void
ring_destroy( ring_t *ringp )
{
	ring_msg_t *msgp;
	size_t mix;

	/* the client must not be holding a message
	 */
	assert( ringp->r_client_cnt == 0 );

	/* get a message
	 */
	msgp = ring_get( ringp );

	/* tell the slave to exit
	 */
	msgp->rm_op = RING_OP_DIE;
	ring_put( ringp, msgp );

	/* wait for the die to be acknowledged
	 */
	do {
		/* pull a message from the ready queue
		 */
		qsemP( ringp->r_ready_qsemh );
		msgp = &ringp->r_msgp[ ringp->r_ready_out_ix ];
		assert( msgp->rm_loc == RING_LOC_READY );
		ringp->r_ready_out_ix = ( ringp->r_ready_out_ix + 1 )
					%
					ringp->r_len;
	} while ( msgp->rm_stat != RING_STAT_DIEACK );

	/* the slave has acknowledged the die request and leaves its loop
	 * without fetching another message, so the buffers and the message
	 * array can be released.
	 * NOTE(review): buffers pinned by ring_create() are not munlock()ed
	 * here because neither the pin flag nor the buffer size appears to
	 * be recorded in the ring - confirm against ring_t.
	 */
	for ( mix = 0 ; mix < ringp->r_len ; mix++ ) {
		free( ( void * )ringp->r_msgp[ mix ].rm_bufp );
	}
	free( ( void * )ringp->r_msgp );

	/* finally the semaphores and the descriptor itself
	 */
	qsem_free( ringp->r_ready_qsemh );
	qsem_free( ringp->r_active_qsemh );
	free( ( void * )ringp );
}
/* ring_slave_get - slave side: take the next message off the active
 * queue, blocking until the client has posted one. The slave may hold
 * only one message at a time. Returns the message, now marked
 * RING_LOC_SLAVE.
 */
static ring_msg_t *
ring_slave_get( ring_t *ringp )
{
	ring_msg_t *activep;

	/* the slave must have given back its previous message
	 */
	assert( ringp->r_slave_cnt == 0 );

	/* meter this request, and separately count the requests which
	 * find the active queue empty
	 */
	ringp->r_slave_msgcnt += 1;
	if ( qsemPwouldblock( ringp->r_active_qsemh )) {
		ringp->r_slave_blkcnt += 1;
	}

	/* "P" the active queue: wait here until a message is available
	 */
	qsemP( ringp->r_active_qsemh );

	/* the head of the active queue is ours. check that it is really
	 * on the active queue and sits in the slot it claims to be in.
	 */
	activep = ringp->r_msgp + ringp->r_active_out_ix;
	assert( activep->rm_loc == RING_LOC_ACTIVE );
	assert( activep->rm_mix == ringp->r_active_out_ix );

	/* advance the active queue head, wrapping at the end of the ring
	 */
	ringp->r_active_out_ix = ( 1 + ringp->r_active_out_ix ) % ringp->r_len;

	/* hand the message over to the slave
	 */
	activep->rm_loc = RING_LOC_SLAVE;
	ringp->r_slave_cnt += 1;

	return activep;
}
/* ring_slave_put - slave side: return a processed message to the ready
 * queue, where the client will pick it up. Messages must be returned
 * in ring order.
 */
static void
ring_slave_put( ring_t *ringp, ring_msg_t *msgp )
{
	/* sanity: the slave holds exactly one message, it is the one
	 * expected next on the ready queue, and it is marked as being
	 * with the slave.
	 */
	assert( ringp->r_slave_cnt == 1 );
	assert( msgp->rm_mix == ringp->r_ready_in_ix );
	assert( msgp->rm_loc == RING_LOC_SLAVE );

	/* the slave gives up the message ...
	 */
	ringp->r_slave_cnt -= 1;

	/* ... which now belongs to the ready queue
	 */
	msgp->rm_loc = RING_LOC_READY;

	/* advance the ready queue tail, wrapping at the end of the ring
	 */
	ringp->r_ready_in_ix = ( 1 + ringp->r_ready_in_ix ) % ringp->r_len;

	/* "V" the ready queue last, once the message is fully set up
	 */
	qsemV( ringp->r_ready_qsemh );
}
static int
ring_slave_entry( void *ringctxp )
{
sigset_t blocked_set;
ring_t *ringp = ( ring_t * )ringctxp;
enum { LOOPMODE_NORMAL, LOOPMODE_IGNORE, LOOPMODE_DIE } loopmode;
/* block signals, let the main thread handle them
*/
sigemptyset( &blocked_set );
sigaddset( &blocked_set, SIGINT );
sigaddset( &blocked_set, SIGHUP );
sigaddset( &blocked_set, SIGTERM );
sigaddset( &blocked_set, SIGQUIT );
sigaddset( &blocked_set, SIGALRM );
pthread_sigmask( SIG_SETMASK, &blocked_set, NULL );
/* loop reading and precessing messages until told to die
*/
for ( loopmode = LOOPMODE_NORMAL ; loopmode != LOOPMODE_DIE ; ) {
ring_msg_t *msgp;
int rval;
msgp = ring_slave_get( ringp );
msgp->rm_rval = 0;
switch( msgp->rm_op ) {
case RING_OP_READ:
if ( loopmode == LOOPMODE_IGNORE ) {
msgp->rm_stat = RING_STAT_IGNORE;
break;
}
if ( ! ringp->r_first_io_time ) {
ringp->r_first_io_time = time( 0 );
assert( ringp->r_first_io_time );
}
rval = ( ringp->r_readfunc )( ringp->r_clientctxp,
msgp->rm_bufp );
msgp->rm_rval = rval;
ringp->r_all_io_cnt++;
if ( msgp->rm_rval == 0 ) {
msgp->rm_stat = RING_STAT_OK;
} else {
msgp->rm_stat = RING_STAT_ERROR;
loopmode = LOOPMODE_IGNORE;
}
break;
case RING_OP_WRITE:
if ( loopmode == LOOPMODE_IGNORE ) {
msgp->rm_stat = RING_STAT_IGNORE;
break;
}
if ( ! ringp->r_first_io_time ) {
ringp->r_first_io_time = time( 0 );
assert( ringp->r_first_io_time );
}
rval = ( ringp->r_writefunc )( ringp->r_clientctxp,
msgp->rm_bufp );
msgp->rm_rval = rval;
ringp->r_all_io_cnt++;
if ( msgp->rm_rval == 0 ) {
msgp->rm_stat = RING_STAT_OK;
} else {
msgp->rm_stat = RING_STAT_ERROR;
loopmode = LOOPMODE_IGNORE;
}
break;
case RING_OP_NOP:
msgp->rm_stat = RING_STAT_NOPACK;
break;
case RING_OP_TRACE:
msgp->rm_stat = RING_STAT_IGNORE;
break;
case RING_OP_RESET:
loopmode = LOOPMODE_NORMAL;
msgp->rm_stat = RING_STAT_RESETACK;
break;
case RING_OP_DIE:
msgp->rm_stat = RING_STAT_DIEACK;
loopmode = LOOPMODE_DIE;
break;
default:
msgp->rm_stat = RING_STAT_IGNORE;
break;
}
ring_slave_put( ringp, msgp );
}
return 0;
}