| /* |
| * Copyright (C) 2009 Red Hat Inc. |
| * |
| * This application is free software; you can redistribute it and/or modify it |
| * under the terms of the GNU General Public License as published by the Free |
| * Software Foundation; version 2. |
| * |
| * This application is distributed in the hope that it will be useful, |
| * but WITHOUT ANY WARRANTY; without even the implied warranty of |
| * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU |
| * General Public License for more details. |
| */ |
| |
| /** |
| * @file rteval-parserd.c |
| * @author David Sommerseth <davids@redhat.com> |
| * @date Thu Oct 15 11:59:27 2009 |
| * |
| * @brief Polls the rteval.submissionqueue table for notifications |
| * from new inserts and sends the file to a processing thread |
| * |
| * |
| * |
| */ |
| |
| #include <stdio.h> |
| #include <string.h> |
| #include <unistd.h> |
| #include <signal.h> |
| #include <pthread.h> |
| #include <sys/types.h> |
| #include <sys/stat.h> |
| #include <errno.h> |
| #include <assert.h> |
| |
| #include <eurephia_nullsafe.h> |
| #include <eurephia_values.h> |
| #include <configparser.h> |
| #include <pgsql.h> |
| #include <threadinfo.h> |
| #include <parsethread.h> |
| #include <argparser.h> |
| |
| #define DEFAULT_MSG_MAX 5 /**< Default size of the message queue */ |
| #define XMLPARSER_XSL "xmlparser.xsl" /**< rteval report parser XSLT, parses XML into database friendly data*/ |
| |
| static int shutdown = 0; /**< Variable indicating if the program should shutdown */ |
| static LogContext *logctx = NULL; /**< Initialsed log context, to be used by sigcatch() */ |
| |
| |
| /** |
| * Simple signal catcher. Used for SIGINT and SIGTERM signals, and will set the global shutdown |
| * shutdown flag. It's expected that all threads behaves properly and exits as soon as their current |
| * work is completed |
| * |
| * @param sig Recieved signal (not used) |
| */ |
| void sigcatch(int sig) { |
| switch( sig ) { |
| case SIGINT: |
| case SIGTERM: |
| if( shutdown == 0 ) { |
| shutdown = 1; |
| writelog(logctx, LOG_INFO, "[SIGNAL] Shutting down"); |
| } else { |
| writelog(logctx, LOG_INFO, "[SIGNAL] Shutdown in progress ... please be patient ..."); |
| } |
| break; |
| |
| case SIGUSR1: |
| writelog(logctx, LOG_EMERG, "[SIGNAL] Shutdown alarm from a worker thread"); |
| shutdown = 1; |
| break; |
| |
| default: |
| break; |
| } |
| |
| // re-enable signals, to avoid brute force exits. |
| // If brute force is needed, SIGKILL is available. |
| signal(sig, sigcatch); |
| } |
| |
| |
| /** |
| * Opens and reads /proc/sys/fs/mqueue/msg_max, to get the maximum number of allowed messages |
| * on POSIX MQ queues. rteval-parserd will use as much of this as possible when needed. |
| * |
| * @return Returns the system msg_max value, or DEFAULT_MSG_MAX on failure to read the setting. |
| */ |
| unsigned int get_mqueue_msg_max(LogContext *log) { |
| FILE *fp = NULL; |
| char buf[130]; |
| unsigned int msg_max = DEFAULT_MSG_MAX; |
| |
| fp = fopen("/proc/sys/fs/mqueue/msg_max", "r"); |
| if( !fp ) { |
| writelog(log, LOG_WARNING, |
| "Could not open /proc/sys/fs/mqueue/msg_max, defaulting to %i", |
| msg_max); |
| writelog(log, LOG_INFO, "%s", strerror(errno)); |
| return msg_max; |
| } |
| |
| memset(&buf, 0, 130); |
| if( fread(&buf, 1, 128, fp) < 1 ) { |
| writelog(log, LOG_WARNING, |
| "Could not read /proc/sys/fs/mqueue/msg_max, defaulting to %i", |
| msg_max); |
| writelog(log, LOG_INFO, "%s", strerror(errno)); |
| } else { |
| msg_max = atoi_nullsafe(buf); |
| if( msg_max < 1 ) { |
| msg_max = DEFAULT_MSG_MAX; |
| writelog(log, LOG_WARNING, |
| "Failed to parse /proc/sys/fs/mqueue/msg_max," |
| "defaulting to %i", msg_max); |
| } |
| } |
| fclose(fp); |
| return msg_max; |
| } |
| |
| |
| /** |
| * Main loop, which polls the submissionqueue table and puts jobs found here into a POSIX MQ queue |
| * which the worker threads will pick up. |
| * |
| * @param dbc Database connection, where to query the submission queue |
| * @param msgq file descriptor for the message queue |
| * @param activethreads Pointer to an int value containing active worker threads. Each thread updates |
| * this value directly, and this function should only read it. |
| * |
| * @return Returns 0 on successful run, otherwise > 0 on errors. |
| */ |
| int process_submission_queue(dbconn *dbc, mqd_t msgq, int *activethreads) { |
| pthread_mutex_t mtx_submq = PTHREAD_MUTEX_INITIALIZER; |
| parseJob_t *job = NULL; |
| int rc = 0, i, actthr_cp = 0; |
| |
| while( shutdown == 0 ) { |
| // Check status if the worker threads |
| // If we don't have any worker threads, shut down immediately |
| writelog(dbc->log, LOG_DEBUG, "Active worker threads: %i", *activethreads); |
| if( *activethreads < 1 ) { |
| writelog(dbc->log, LOG_EMERG, |
| "All worker threads ceased to exist. Shutting down!"); |
| shutdown = 1; |
| rc = 1; |
| goto exit; |
| } |
| |
| if( db_ping(dbc) != 1 ) { |
| writelog(dbc->log, LOG_EMERG, "Lost connection to database. Shutting down!"); |
| shutdown = 1; |
| rc = 1; |
| goto exit; |
| } |
| |
| // Fetch an available job |
| job = db_get_submissionqueue_job(dbc, &mtx_submq); |
| if( !job ) { |
| writelog(dbc->log, LOG_EMERG, |
| "Failed to get submission queue job. Shutting down!"); |
| shutdown = 1; |
| rc = 1; |
| goto exit; |
| } |
| if( job->status == jbNONE ) { |
| free_nullsafe(job); |
| if( db_wait_notification(dbc, &shutdown, "rteval_submq") < 1 ) { |
| writelog(dbc->log, LOG_EMERG, |
| "Failed to wait for DB notification. Shutting down!"); |
| shutdown = 1; |
| rc = 1; |
| goto exit; |
| } |
| continue; |
| } |
| |
| // Send the job to the queue |
| writelog(dbc->log, LOG_DEBUG, "** New job queued: submid %i, %s", job->submid, job->filename); |
| do { |
| int res; |
| |
| errno = 0; |
| res = mq_send(msgq, (char *) job, sizeof(parseJob_t), 1); |
| if( (res < 0) && (errno != EAGAIN) ) { |
| writelog(dbc->log, LOG_EMERG, |
| "Could not send parse job to the queue. " |
| "Shutting down!"); |
| shutdown = 1; |
| rc = 2; |
| goto exit; |
| } else if( errno == EAGAIN ) { |
| writelog(dbc->log, LOG_WARNING, |
| "Message queue filled up. " |
| "Will not add new messages to queue for the next 60 seconds"); |
| sleep(60); |
| } |
| } while( (errno == EAGAIN) ); |
| free_nullsafe(job); |
| } |
| |
| exit: |
| // Send empty messages to the threads, to make them have a look at the shutdown flag |
| job = (parseJob_t *) malloc_nullsafe(dbc->log, sizeof(parseJob_t)); |
| errno = 0; |
| // Need to make a copy, as *activethreads will change when threads completes shutdown |
| actthr_cp = *activethreads; |
| for( i = 0; i < actthr_cp; i++ ) { |
| do { |
| int res; |
| |
| writelog(dbc->log, LOG_DEBUG, "%s shutdown message %i of %i", |
| (errno == EAGAIN ? "Resending" : "Sending"), i+1, *activethreads); |
| errno = 0; |
| res = mq_send(msgq, (char *) job, sizeof(parseJob_t), 1); |
| if( (res < 0) && (errno != EAGAIN) ) { |
| writelog(dbc->log, LOG_EMERG, |
| "Could not send shutdown notification to the queue."); |
| free_nullsafe(job); |
| return rc; |
| } else if( errno == EAGAIN ) { |
| writelog(dbc->log, LOG_WARNING, |
| "Message queue filled up. " |
| "Will not add new messages to queue for the next 10 seconds"); |
| sleep(10); |
| } |
| } while( (errno == EAGAIN) ); |
| } |
| free_nullsafe(job); |
| return rc; |
| } |
| |
| |
| /** |
| * Prepares the program to be daemonised |
| * |
| * @param log Initialised log context, where log info of the process is reported |
| * |
| * @return Returns 1 on success, otherwise -1 |
| */ |
| int daemonise(LogContext *log) { |
| pid_t pid, sid; |
| int i = 0; |
| |
| if( (log->logtype == ltCONSOLE) ) { |
| writelog(log, LOG_EMERG, |
| "Cannot daemonise when logging to a console (stdout: or stderr:)"); |
| return -1; |
| } |
| |
| pid = fork(); |
| if (pid < 0) { |
| writelog(log, LOG_EMERG, "Failed to daemonise the process (fork)"); |
| return -1; |
| } else if (pid > 0) { |
| writelog(log, LOG_INFO, "Daemon pid: %ld", pid); |
| exit(EXIT_SUCCESS); |
| } |
| |
| umask(0); |
| |
| sid = setsid(); |
| if (sid < 0) { |
| writelog(log, LOG_EMERG, "Failed to daemonise the process (setsid)"); |
| return -1; |
| } |
| |
| if ((chdir("/")) < 0) { |
| writelog(log, LOG_EMERG, "Failed to daemonise the process (fork)"); |
| return -1; |
| } |
| |
| // Prepare stdin, stdout and stderr for daemon mode |
| close(2); |
| close(1); |
| close(0); |
| i = open("/dev/null", O_RDWR); /* open stdin */ |
| dup(i); /* stdout */ |
| dup(i); /* stderr */ |
| |
| writelog(log, LOG_INFO, "Daemonised successfully"); |
| return 1; |
| } |
| |
| |
| /** |
| * rtevald_parser main function. |
| * |
| * @param argc |
| * @param argv |
| * |
| * @return Returns the result of the process_submission_queue() function. |
| */ |
| int main(int argc, char **argv) { |
| eurephiaVALUES *config = NULL, *prgargs = NULL; |
| char xsltfile[2050], *reportdir = NULL; |
| xsltStylesheet *xslt = NULL; |
| dbconn *dbc = NULL; |
| pthread_t **threads = NULL; |
| pthread_attr_t **thread_attrs = NULL; |
| pthread_mutex_t mtx_sysreg = PTHREAD_MUTEX_INITIALIZER; |
| pthread_mutex_t mtx_thrcnt = PTHREAD_MUTEX_INITIALIZER; |
| threadData_t **thrdata = NULL; |
| struct mq_attr msgq_attr; |
| mqd_t msgq = 0; |
| int i,rc, mq_init = 0, max_threads = 0, started_threads = 0, activethreads = 0; |
| unsigned int max_report_size = 0; |
| |
| // Initialise XML and XSLT libraries |
| xsltInit(); |
| xmlInitParser(); |
| |
| prgargs = parse_arguments(argc, argv); |
| if( prgargs == NULL ) { |
| fprintf(stderr, "** ERROR ** Failed to parse program arguments\n"); |
| rc = 2; |
| goto exit; |
| } |
| |
| // Setup a log context |
| logctx = init_log(eGet_value(prgargs, "log"), eGet_value(prgargs, "loglevel")); |
| if( !logctx ) { |
| fprintf(stderr, "** ERROR ** Could not setup a log context\n"); |
| eFree_values(prgargs); |
| rc = 2; |
| goto exit; |
| } |
| |
| // Fetch configuration |
| config = read_config(logctx, prgargs, "xmlrpc_parser"); |
| eFree_values(prgargs); // read_config() copies prgargs into config, we don't need prgargs anymore |
| |
| // Daemonise process if requested |
| if( atoi_nullsafe(eGet_value(config, "daemon")) == 1 ) { |
| if( daemonise(logctx) < 1 ) { |
| rc = 3; |
| goto exit; |
| } |
| } |
| |
| |
| // Parse XSLT template |
| snprintf(xsltfile, 512, "%s/%s", eGet_value(config, "xsltpath"), XMLPARSER_XSL); |
| writelog(logctx, LOG_DEBUG, "Parsing XSLT file: %s", xsltfile); |
| xslt = xsltParseStylesheetFile((xmlChar *) xsltfile); |
| if( !xslt ) { |
| writelog(logctx, LOG_EMERG, "Could not parse XSLT template: %s", xsltfile); |
| rc = 2; |
| goto exit; |
| } |
| |
| // Open a POSIX MQ |
| writelog(logctx, LOG_DEBUG, "Preparing POSIX MQ queue: /rteval_parsequeue"); |
| memset(&msgq, 0, sizeof(mqd_t)); |
| msgq_attr.mq_maxmsg = get_mqueue_msg_max(logctx); |
| msgq_attr.mq_msgsize = sizeof(parseJob_t); |
| msgq_attr.mq_flags = O_NONBLOCK; |
| msgq = mq_open("/rteval_parsequeue", O_RDWR | O_CREAT, 0600, &msgq_attr); |
| if( msgq < 0 ) { |
| writelog(logctx, LOG_EMERG, |
| "Could not open message queue: %s", strerror(errno)); |
| rc = 2; |
| goto exit; |
| } |
| mq_init = 1; |
| |
| // Get the number of worker threads |
| max_threads = atoi_nullsafe(eGet_value(config, "threads")); |
| if( max_threads == 0 ) { |
| max_threads = 4; |
| } |
| |
| // Get a database connection for the main thread |
| dbc = db_connect(config, max_threads, logctx); |
| if( !dbc ) { |
| rc = 4; |
| goto exit; |
| } |
| |
| // Prepare all threads |
| threads = calloc(max_threads + 1, sizeof(pthread_t *)); |
| thread_attrs = calloc(max_threads + 1, sizeof(pthread_attr_t *)); |
| thrdata = calloc(max_threads + 1, sizeof(threadData_t *)); |
| assert( (threads != NULL) && (thread_attrs != NULL) && (thrdata != NULL) ); |
| |
| reportdir = eGet_value(config, "reportdir"); |
| writelog(logctx, LOG_INFO, "Starting %i worker threads", max_threads); |
| max_report_size = defaultIntValue(atoi_nullsafe(eGet_value(config, "max_report_size")), 1024*1024); |
| for( i = 0; i < max_threads; i++ ) { |
| // Prepare thread specific data |
| thrdata[i] = malloc_nullsafe(logctx, sizeof(threadData_t)); |
| if( !thrdata[i] ) { |
| writelog(logctx, LOG_EMERG, |
| "Could not allocate memory for thread data"); |
| rc = 2; |
| goto exit; |
| } |
| |
| // Get a database connection for the thread |
| thrdata[i]->dbc = db_connect(config, i, logctx); |
| if( !thrdata[i]->dbc ) { |
| writelog(logctx, LOG_EMERG, |
| "Could not connect to the database for thread %i", i); |
| rc = 2; |
| shutdown = 1; |
| goto exit; |
| } |
| |
| // Parse the measurement_tables config variable, split it up into an array |
| thrdata[i]->dbc->measurement_tbls = strSplit(eGet_value(config, "measurement_tables"), ", "); |
| if( !thrdata[i]->dbc->measurement_tbls ) { |
| writelog(dbc->log, LOG_CRIT, "Failed to parse measurement_tables configuration"); |
| rc = 2; |
| shutdown = 1; |
| goto exit; |
| } |
| |
| thrdata[i]->shutdown = &shutdown; |
| thrdata[i]->threadcount = &activethreads; |
| thrdata[i]->mtx_thrcnt = &mtx_thrcnt; |
| thrdata[i]->id = i; |
| thrdata[i]->msgq = msgq; |
| thrdata[i]->mtx_sysreg = &mtx_sysreg; |
| thrdata[i]->xslt = xslt; |
| thrdata[i]->destdir = reportdir; |
| thrdata[i]->max_report_size = max_report_size; |
| |
| thread_attrs[i] = malloc_nullsafe(logctx, sizeof(pthread_attr_t)); |
| if( !thread_attrs[i] ) { |
| writelog(logctx, LOG_EMERG, |
| "Could not allocate memory for thread attributes"); |
| rc = 2; |
| goto exit; |
| } |
| pthread_attr_init(thread_attrs[i]); |
| pthread_attr_setdetachstate(thread_attrs[i], PTHREAD_CREATE_JOINABLE); |
| |
| threads[i] = malloc_nullsafe(logctx, sizeof(pthread_t)); |
| if( !threads[i] ) { |
| writelog(logctx, LOG_EMERG, |
| "Could not allocate memory for pthread_t"); |
| rc = 2; |
| goto exit; |
| } |
| } |
| |
| // Setup signal catching |
| signal(SIGINT, sigcatch); |
| signal(SIGTERM, sigcatch); |
| signal(SIGHUP, SIG_IGN); |
| signal(SIGUSR1, sigcatch); |
| signal(SIGUSR2, SIG_IGN); |
| |
| // Start the threads |
| for( i = 0; i < max_threads; i++ ) { |
| int thr_rc = pthread_create(threads[i], thread_attrs[i], parsethread, thrdata[i]); |
| if( thr_rc < 0 ) { |
| writelog(logctx, LOG_EMERG, |
| "** ERROR ** Failed to start thread %i: %s", |
| i, strerror(thr_rc)); |
| rc = 3; |
| goto exit; |
| } |
| started_threads++; |
| } |
| |
| // Main routine |
| // |
| // checks the submission queue and puts unprocessed records on the POSIX MQ |
| // to be parsed by one of the threads |
| // |
| sleep(3); // Allow at least a few parser threads to settle down first before really starting |
| writelog(logctx, LOG_DEBUG, "Starting submission queue checker"); |
| rc = process_submission_queue(dbc, msgq, &activethreads); |
| writelog(logctx, LOG_DEBUG, "Submission queue checker shut down"); |
| |
| exit: |
| // Clean up all threads |
| for( i = 0; i < max_threads; i++ ) { |
| // Wait for all threads to exit |
| if( (i < started_threads) && threads && threads[i] ) { |
| void *thread_rc; |
| int j_rc; |
| |
| if( (j_rc = pthread_join(*threads[i], &thread_rc)) != 0 ) { |
| writelog(logctx, LOG_CRIT, |
| "Failed to join thread %i: %s", |
| i, strerror(j_rc)); |
| } |
| pthread_attr_destroy(thread_attrs[i]); |
| } |
| if( threads ) { |
| free_nullsafe(threads[i]); |
| } |
| if( thread_attrs ) { |
| free_nullsafe(thread_attrs[i]); |
| } |
| |
| // Disconnect threads database connection |
| if( thrdata && thrdata[i] ) { |
| strFree(thrdata[i]->dbc->measurement_tbls); |
| db_disconnect(thrdata[i]->dbc); |
| free_nullsafe(thrdata[i]); |
| } |
| } |
| free_nullsafe(thrdata); |
| free_nullsafe(threads); |
| free_nullsafe(thread_attrs); |
| |
| // Close message queue |
| if( mq_init == 1 ) { |
| errno = 0; |
| if( mq_close(msgq) < 0 ) { |
| writelog(logctx, LOG_CRIT, "Failed to close message queue: %s", |
| strerror(errno)); |
| } |
| errno = 0; |
| if( mq_unlink("/rteval_parsequeue") < 0 ) { |
| writelog(logctx, LOG_ALERT, "Failed to remove the message queue: %s", |
| strerror(errno)); |
| } |
| } |
| |
| // Disconnect from database, main thread connection |
| db_disconnect(dbc); |
| |
| // Free up the rest |
| eFree_values(config); |
| xsltFreeStylesheet(xslt); |
| xmlCleanupParser(); |
| xsltCleanupGlobals(); |
| |
| writelog(logctx, LOG_EMERG, "rteval-parserd is stopped"); |
| close_log(logctx); |
| return rc; |
| } |
| |