blob: 17ff9d8c160cf0b46e319c94bdac808abfc1e50c [file] [log] [blame]
/*
* Copyright (C) 2009, 2010 Red Hat Inc, Steven Rostedt <srostedt@redhat.com>
*
* ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
*
* This program is free software; you can redistribute it and/or modify
* it under the terms of the GNU General Public License as published by
* the Free Software Foundation; version 2 of the License (not later!)
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU General Public License for more details.
*
* You should have received a copy of the GNU General Public License
* along with this program; if not, see <http://www.gnu.org/licenses>
*
* ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
*/
#define _LARGEFILE64_SOURCE
#include <dirent.h>
#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include <getopt.h>
#include <sys/types.h>
#include <sys/socket.h>
#include <sys/wait.h>
#include <netdb.h>
#include <unistd.h>
#include <fcntl.h>
#include <signal.h>
#include <errno.h>
#include "trace-local.h"
#include "trace-msg.h"
#define MAX_OPTION_SIZE 4096
#define _VAR_DIR_Q(dir) #dir
#define VAR_DIR_Q(dir) _VAR_DIR_Q(dir)
#define VAR_RUN_DIR VAR_DIR_Q(VAR_DIR) "/run"
static char *default_output_dir = ".";
static char *output_dir;
static char *default_output_file = "trace";
static char *output_file;
static FILE *logfp;
static int backlog = 5;
static int proto_ver;
static int do_daemon;
#define TEMP_FILE_STR "%s.%s:%s.cpu%d", output_file, host, port, cpu
static char *get_temp_file(const char *host, const char *port, int cpu)
{
char *file = NULL;
int size;
size = snprintf(file, 0, TEMP_FILE_STR);
file = malloc(size + 1);
if (!file)
return NULL;
sprintf(file, TEMP_FILE_STR);
return file;
}
static void put_temp_file(char *file)
{
free(file);
}
#define MAX_PATH 1024
static void signal_setup(int sig, sighandler_t handle)
{
struct sigaction action;
sigaction(sig, NULL, &action);
/* Make accept return EINTR */
action.sa_flags &= ~SA_RESTART;
action.sa_handler = handle;
sigaction(sig, &action, NULL);
}
static void delete_temp_file(const char *host, const char *port, int cpu)
{
char file[MAX_PATH];
snprintf(file, MAX_PATH, TEMP_FILE_STR);
unlink(file);
}
static int read_string(int fd, char *buf, size_t size)
{
size_t i;
int n;
for (i = 0; i < size; i++) {
n = read(fd, buf+i, 1);
if (!buf[i] || n <= 0)
break;
}
return i;
}
static int process_option(char *option)
{
/* currently the only option we have is to us TCP */
if (strcmp(option, "TCP") == 0) {
use_tcp = 1;
return 1;
}
return 0;
}
static void finish(int sig)
{
done = true;
}
#define LOG_BUF_SIZE 1024
static void __plog(const char *prefix, const char *fmt, va_list ap,
FILE *fp)
{
static int newline = 1;
char buf[LOG_BUF_SIZE];
int r;
r = vsnprintf(buf, LOG_BUF_SIZE, fmt, ap);
if (r > LOG_BUF_SIZE)
r = LOG_BUF_SIZE;
if (logfp) {
if (newline)
fprintf(logfp, "[%d]%s%.*s", getpid(), prefix, r, buf);
else
fprintf(logfp, "[%d]%s%.*s", getpid(), prefix, r, buf);
newline = buf[r - 1] == '\n';
fflush(logfp);
return;
}
fprintf(fp, "%.*s", r, buf);
}
void plog(const char *fmt, ...)
{
va_list ap;
va_start(ap, fmt);
__plog("", fmt, ap, stdout);
va_end(ap);
/* Make sure it gets to the screen, in case we crash afterward */
fflush(stdout);
}
static void make_pid_name(int mode, char *buf)
{
snprintf(buf, PATH_MAX, VAR_RUN_DIR "/trace-cmd-net.pid");
}
static void remove_pid_file(void)
{
char buf[PATH_MAX];
int mode = do_daemon;
if (!do_daemon)
return;
make_pid_name(mode, buf);
unlink(buf);
}
void pdie(const char *fmt, ...)
{
va_list ap;
char *str = "";
va_start(ap, fmt);
__plog("Error: ", fmt, ap, stderr);
va_end(ap);
if (errno)
str = strerror(errno);
if (logfp)
fprintf(logfp, "\n%s\n", str);
else
fprintf(stderr, "\n%s\n", str);
remove_pid_file();
exit(-1);
}
static int process_udp_child(int sfd, const char *host, const char *port,
int cpu, int page_size)
{
struct sockaddr_storage peer_addr;
socklen_t peer_addr_len;
char buf[page_size];
char *tempfile;
int cfd;
int fd;
int n;
int once = 0;
signal_setup(SIGUSR1, finish);
tempfile = get_temp_file(host, port, cpu);
if (!tempfile)
return -ENOMEM;
fd = open(tempfile, O_WRONLY | O_TRUNC | O_CREAT, 0644);
if (fd < 0)
pdie("creating %s", tempfile);
if (use_tcp) {
if (listen(sfd, backlog) < 0)
pdie("listen");
peer_addr_len = sizeof(peer_addr);
cfd = accept(sfd, (struct sockaddr *)&peer_addr, &peer_addr_len);
if (cfd < 0 && errno == EINTR)
goto done;
if (cfd < 0)
pdie("accept");
close(sfd);
sfd = cfd;
}
do {
/* TODO, make this copyless! */
n = read(sfd, buf, page_size);
if (n < 0) {
if (errno == EINTR)
continue;
pdie("reading client");
}
if (!n)
break;
/* UDP requires that we get the full size in one go */
if (!use_tcp && n < page_size && !once) {
once = 1;
warning("read %d bytes, expected %d", n, page_size);
}
write(fd, buf, n);
} while (!done);
done:
put_temp_file(tempfile);
exit(0);
}
#define START_PORT_SEARCH 1500
#define MAX_PORT_SEARCH 6000
static int udp_bind_a_port(int start_port, int *sfd)
{
struct addrinfo hints;
struct addrinfo *result, *rp;
char buf[BUFSIZ];
int s;
int num_port = start_port;
again:
snprintf(buf, BUFSIZ, "%d", num_port);
memset(&hints, 0, sizeof(hints));
hints.ai_family = AF_UNSPEC;
hints.ai_socktype = use_tcp ? SOCK_STREAM : SOCK_DGRAM;
hints.ai_flags = AI_PASSIVE;
s = getaddrinfo(NULL, buf, &hints, &result);
if (s != 0)
pdie("getaddrinfo: error opening udp socket");
for (rp = result; rp != NULL; rp = rp->ai_next) {
*sfd = socket(rp->ai_family, rp->ai_socktype,
rp->ai_protocol);
if (*sfd < 0)
continue;
if (bind(*sfd, rp->ai_addr, rp->ai_addrlen) == 0)
break;
close(*sfd);
}
if (rp == NULL) {
freeaddrinfo(result);
if (++num_port > MAX_PORT_SEARCH)
pdie("No available ports to bind");
goto again;
}
freeaddrinfo(result);
return num_port;
}
static void fork_udp_reader(int sfd, const char *node, const char *port,
int *pid, int cpu, int pagesize)
{
int ret;
*pid = fork();
if (*pid < 0)
pdie("creating udp reader");
if (!*pid) {
ret = process_udp_child(sfd, node, port, cpu, pagesize);
if (ret < 0)
pdie("Problem with udp reader %d", ret);
}
close(sfd);
}
static int open_udp(const char *node, const char *port, int *pid,
int cpu, int pagesize, int start_port)
{
int sfd;
int num_port;
/*
* udp_bind_a_port() currently does not return an error, but if that
* changes in the future, we have a check for it now.
*/
num_port = udp_bind_a_port(start_port, &sfd);
if (num_port < 0)
return num_port;
fork_udp_reader(sfd, node, port, pid, cpu, pagesize);
return num_port;
}
static int communicate_with_client(int fd, int *cpus, int *pagesize)
{
char *last_proto = NULL;
char buf[BUFSIZ];
char *option;
int options;
int size;
int n, s, t, i;
int ret = -EINVAL;
/* Let the client know what we are */
write(fd, "tracecmd", 8);
try_again:
/* read back the CPU count */
n = read_string(fd, buf, BUFSIZ);
if (n == BUFSIZ)
/** ERROR **/
return -EINVAL;
*cpus = atoi(buf);
/* Is the client using the new protocol? */
if (*cpus == -1) {
if (memcmp(buf, V2_CPU, n) != 0) {
/* If it did not send a version, then bail */
if (memcmp(buf, "-1V", 3)) {
plog("Unknown string %s\n", buf);
goto out;
}
/* Skip "-1" */
plog("Cannot handle the protocol %s\n", buf+2);
/* If it returned the same command as last time, bail! */
if (last_proto && strncmp(last_proto, buf, n) == 0) {
plog("Repeat of version %s sent\n", last_proto);
goto out;
}
free(last_proto);
last_proto = malloc(n + 1);
if (last_proto) {
memcpy(last_proto, buf, n);
last_proto[n] = 0;
}
/* Return the highest protocol we can use */
write(fd, "V2", 3);
goto try_again;
}
/* Let the client know we use v2 protocol */
write(fd, "V2", 3);
/* read the rest of dummy data */
n = read(fd, buf, sizeof(V2_MAGIC));
if (memcmp(buf, V2_MAGIC, n) != 0)
goto out;
/* We're off! */
write(fd, "OK", 2);
proto_ver = V2_PROTOCOL;
/* read the CPU count, the page size, and options */
if (tracecmd_msg_initial_setting(fd, cpus, pagesize) < 0)
goto out;
} else {
/* The client is using the v1 protocol */
plog("cpus=%d\n", *cpus);
if (*cpus < 0)
goto out;
/* next read the page size */
n = read_string(fd, buf, BUFSIZ);
if (n == BUFSIZ)
/** ERROR **/
goto out;
*pagesize = atoi(buf);
plog("pagesize=%d\n", *pagesize);
if (*pagesize <= 0)
goto out;
/* Now the number of options */
n = read_string(fd, buf, BUFSIZ);
if (n == BUFSIZ)
/** ERROR **/
return -EINVAL;
options = atoi(buf);
for (i = 0; i < options; i++) {
/* next is the size of the options */
n = read_string(fd, buf, BUFSIZ);
if (n == BUFSIZ)
/** ERROR **/
goto out;
size = atoi(buf);
/* prevent a client from killing us */
if (size > MAX_OPTION_SIZE)
goto out;
ret = -ENOMEM;
option = malloc(size);
if (!option)
goto out;
ret = -EIO;
do {
t = size;
s = 0;
s = read(fd, option+s, t);
if (s <= 0)
goto out;
t -= s;
s = size - t;
} while (t);
s = process_option(option);
free(option);
/* do we understand this option? */
ret = -EINVAL;
if (!s)
goto out;
}
}
if (use_tcp)
plog("Using TCP for live connection\n");
ret = 0;
out:
free(last_proto);
return ret;
}
static int create_client_file(const char *node, const char *port)
{
char buf[BUFSIZ];
int ofd;
snprintf(buf, BUFSIZ, "%s.%s:%s.dat", output_file, node, port);
ofd = open(buf, O_RDWR | O_CREAT | O_TRUNC, 0644);
if (ofd < 0)
pdie("Can not create file %s", buf);
return ofd;
}
static void destroy_all_readers(int cpus, int *pid_array, const char *node,
const char *port)
{
int cpu;
for (cpu = 0; cpu < cpus; cpu++) {
if (pid_array[cpu] > 0) {
kill(pid_array[cpu], SIGKILL);
waitpid(pid_array[cpu], NULL, 0);
delete_temp_file(node, port, cpu);
pid_array[cpu] = 0;
}
}
free(pid_array);
}
static int *create_all_readers(int cpus, const char *node, const char *port,
int pagesize, int fd)
{
char buf[BUFSIZ];
int *port_array;
int *pid_array;
int start_port;
int udp_port;
int cpu;
int pid;
port_array = malloc(sizeof(int) * cpus);
if (!port_array)
return NULL;
pid_array = malloc(sizeof(int) * cpus);
if (!pid_array) {
free(port_array);
return NULL;
}
memset(pid_array, 0, sizeof(int) * cpus);
start_port = START_PORT_SEARCH;
/* Now create a UDP port for each CPU */
for (cpu = 0; cpu < cpus; cpu++) {
udp_port = open_udp(node, port, &pid, cpu,
pagesize, start_port);
if (udp_port < 0)
goto out_free;
port_array[cpu] = udp_port;
pid_array[cpu] = pid;
/*
* Due to some bugging finding ports,
* force search after last port
*/
start_port = udp_port + 1;
}
if (proto_ver == V2_PROTOCOL) {
/* send set of port numbers to the client */
if (tracecmd_msg_send_port_array(fd, cpus, port_array) < 0)
goto out_free;
} else {
/* send the client a comma deliminated set of port numbers */
for (cpu = 0; cpu < cpus; cpu++) {
snprintf(buf, BUFSIZ, "%s%d",
cpu ? "," : "", port_array[cpu]);
write(fd, buf, strlen(buf));
}
/* end with null terminator */
write(fd, "\0", 1);
}
return pid_array;
out_free:
destroy_all_readers(cpus, pid_array, node, port);
return NULL;
}
static void collect_metadata_from_client(int ifd, int ofd)
{
char buf[BUFSIZ];
int n, s, t;
do {
n = read(ifd, buf, BUFSIZ);
if (n < 0) {
if (errno == EINTR)
continue;
pdie("reading client");
}
t = n;
s = 0;
do {
s = write(ofd, buf+s, t);
if (s < 0) {
if (errno == EINTR)
break;
pdie("writing to file");
}
t -= s;
s = n - t;
} while (t);
} while (n > 0 && !done);
}
static void stop_all_readers(int cpus, int *pid_array)
{
int cpu;
for (cpu = 0; cpu < cpus; cpu++) {
if (pid_array[cpu] > 0)
kill(pid_array[cpu], SIGUSR1);
}
}
static int put_together_file(int cpus, int ofd, const char *node,
const char *port)
{
char **temp_files;
int cpu;
int ret = -ENOMEM;
/* Now put together the file */
temp_files = malloc(sizeof(*temp_files) * cpus);
if (!temp_files)
return -ENOMEM;
for (cpu = 0; cpu < cpus; cpu++) {
temp_files[cpu] = get_temp_file(node, port, cpu);
if (!temp_files[cpu])
goto out;
}
tracecmd_attach_cpu_data_fd(ofd, cpus, temp_files);
ret = 0;
out:
for (cpu--; cpu >= 0; cpu--) {
put_temp_file(temp_files[cpu]);
}
free(temp_files);
return ret;
}
static int process_client(const char *node, const char *port, int fd)
{
int *pid_array;
int pagesize;
int cpus;
int ofd;
int ret;
ret = communicate_with_client(fd, &cpus, &pagesize);
if (ret < 0)
return ret;
ofd = create_client_file(node, port);
pid_array = create_all_readers(cpus, node, port, pagesize, fd);
if (!pid_array)
return -ENOMEM;
/* Now we are ready to start reading data from the client */
if (proto_ver == V2_PROTOCOL)
tracecmd_msg_collect_metadata(fd, ofd);
else
collect_metadata_from_client(fd, ofd);
/* wait a little to let our readers finish reading */
sleep(1);
/* stop our readers */
stop_all_readers(cpus, pid_array);
/* wait a little to have the readers clean up */
sleep(1);
ret = put_together_file(cpus, ofd, node, port);
destroy_all_readers(cpus, pid_array, node, port);
return ret;
}
static int do_fork(int cfd)
{
pid_t pid;
/* in debug mode, we do not fork off children */
if (debug)
return 0;
pid = fork();
if (pid < 0) {
warning("failed to create child");
return -1;
}
if (pid > 0) {
close(cfd);
return pid;
}
signal_setup(SIGINT, finish);
return 0;
}
static int do_connection(int cfd, struct sockaddr_storage *peer_addr,
socklen_t peer_addr_len)
{
char host[NI_MAXHOST], service[NI_MAXSERV];
int s;
int ret;
ret = do_fork(cfd);
if (ret)
return ret;
s = getnameinfo((struct sockaddr *)peer_addr, peer_addr_len,
host, NI_MAXHOST,
service, NI_MAXSERV, NI_NUMERICSERV);
if (s == 0)
plog("Connected with %s:%s\n",
host, service);
else {
plog("Error with getnameinfo: %s\n",
gai_strerror(s));
close(cfd);
return -1;
}
process_client(host, service, cfd);
close(cfd);
if (!debug)
exit(0);
return 0;
}
static int *client_pids;
static int free_pids;
static int saved_pids;
static void add_process(int pid)
{
int *client = NULL;
int i;
if (free_pids) {
for (i = 0; i < saved_pids; i++) {
if (!client_pids[i]) {
client = &client_pids[i];
break;
}
}
free_pids--;
if (!client)
warning("Could not find free pid");
}
if (!client) {
client_pids = realloc(client_pids,
sizeof(*client_pids) * (saved_pids + 1));
if (!client_pids)
pdie("allocating pids");
client = &client_pids[saved_pids++];
}
*client = pid;
}
static void remove_process(int pid)
{
int i;
for (i = 0; i < saved_pids; i++) {
if (client_pids[i] == pid)
break;
}
if (i == saved_pids)
return;
client_pids[i] = 0;
free_pids++;
}
static void kill_clients(void)
{
int status;
int i;
for (i = 0; i < saved_pids; i++) {
if (!client_pids[i])
continue;
kill(client_pids[i], SIGINT);
waitpid(client_pids[i], &status, 0);
}
saved_pids = 0;
}
static void clean_up(void)
{
int status;
int ret;
/* Clean up any children that has started before */
do {
ret = waitpid(0, &status, WNOHANG);
if (ret > 0)
remove_process(ret);
} while (ret > 0);
}
static void do_accept_loop(int sfd)
{
struct sockaddr_storage peer_addr;
socklen_t peer_addr_len;
int cfd, pid;
peer_addr_len = sizeof(peer_addr);
do {
cfd = accept(sfd, (struct sockaddr *)&peer_addr,
&peer_addr_len);
if (cfd < 0 && errno == EINTR) {
clean_up();
continue;
}
if (cfd < 0)
pdie("connecting");
pid = do_connection(cfd, &peer_addr, peer_addr_len);
if (pid > 0)
add_process(pid);
} while (!done);
/* Get any final stragglers */
clean_up();
}
static void make_pid_file(void)
{
char buf[PATH_MAX];
int mode = do_daemon;
int fd;
if (!do_daemon)
return;
make_pid_name(mode, buf);
fd = open(buf, O_WRONLY | O_CREAT | O_TRUNC, 0644);
if (fd < 0) {
perror(buf);
return;
}
sprintf(buf, "%d\n", getpid());
write(fd, buf, strlen(buf));
close(fd);
}
static void sigstub(int sig)
{
}
static void do_listen(char *port)
{
struct addrinfo hints;
struct addrinfo *result, *rp;
int sfd, s;
if (!debug)
signal_setup(SIGCHLD, sigstub);
make_pid_file();
memset(&hints, 0, sizeof(hints));
hints.ai_family = AF_UNSPEC;
hints.ai_socktype = SOCK_STREAM;
hints.ai_flags = AI_PASSIVE;
s = getaddrinfo(NULL, port, &hints, &result);
if (s != 0)
pdie("getaddrinfo: error opening %s", port);
for (rp = result; rp != NULL; rp = rp->ai_next) {
sfd = socket(rp->ai_family, rp->ai_socktype,
rp->ai_protocol);
if (sfd < 0)
continue;
if (bind(sfd, rp->ai_addr, rp->ai_addrlen) == 0)
break;
close(sfd);
}
if (rp == NULL)
pdie("Could not bind");
freeaddrinfo(result);
if (listen(sfd, backlog) < 0)
pdie("listen");
do_accept_loop(sfd);
kill_clients();
remove_pid_file();
}
static void start_daemon(void)
{
do_daemon = 1;
if (daemon(1, 0) < 0)
die("starting daemon");
}
enum {
OPT_debug = 255,
};
void trace_listen(int argc, char **argv)
{
char *logfile = NULL;
char *port = NULL;
int daemon = 0;
int c;
if (argc < 2)
usage(argv);
if (strcmp(argv[1], "listen") != 0)
usage(argv);
for (;;) {
int option_index = 0;
static struct option long_options[] = {
{"port", required_argument, NULL, 'p'},
{"help", no_argument, NULL, '?'},
{"debug", no_argument, NULL, OPT_debug},
{NULL, 0, NULL, 0}
};
c = getopt_long (argc-1, argv+1, "+hp:o:d:l:D",
long_options, &option_index);
if (c == -1)
break;
switch (c) {
case 'h':
usage(argv);
break;
case 'p':
port = optarg;
break;
case 'd':
output_dir = optarg;
break;
case 'o':
output_file = optarg;
break;
case 'l':
logfile = optarg;
break;
case 'D':
daemon = 1;
break;
case OPT_debug:
debug = 1;
break;
default:
usage(argv);
}
}
if (!port)
usage(argv);
if ((argc - optind) >= 2)
usage(argv);
if (!output_file)
output_file = default_output_file;
if (!output_dir)
output_dir = default_output_dir;
if (logfile) {
/* set the writes to a logfile instead */
logfp = fopen(logfile, "w");
if (!logfp)
die("creating log file %s", logfile);
}
if (chdir(output_dir) < 0)
die("Can't access directory %s", output_dir);
if (daemon)
start_daemon();
signal_setup(SIGINT, finish);
signal_setup(SIGTERM, finish);
do_listen(port);
return;
}