blob: c39e38ddd3f0d7dd3fcbc1bc60c7ef3640753573 [file] [log] [blame]
/*
* teamd_zmq.c - Teamd ZeroMQ socket api
* Copyright (C) 2013 Jiri Zupka <jzupka@redhat.com>
*
* This library is free software; you can redistribute it and/or
* modify it under the terms of the GNU Lesser General Public
* License as published by the Free Software Foundation; either
* version 2.1 of the License, or (at your option) any later version.
*
* This library is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
* Lesser General Public License for more details.
*
* You should have received a copy of the GNU Lesser General Public
* License along with this library; if not, write to the Free Software
* Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA
*/
#include "config.h"
#ifdef ENABLE_ZMQ
#include <stdbool.h>
#include <stdlib.h>
#include <string.h>
#include <unistd.h>
#include <sys/types.h>
#include <sys/socket.h>
#include <sys/un.h>
#include <errno.h>
#include <ctype.h>
#include <private/misc.h>
#include <private/list.h>
#include <team.h>
#include "teamd.h"
#include "teamd_zmq.h"
#include "teamd_zmq_common.h"
#include "teamd_ctl.h"
#include "teamd_config.h"
struct zmq_ops_priv {
char *rcv_msg_args;
void *sock;
};
struct zmq_acc_conn {
struct list_item list;
int sock;
};
static int zmq_op_get_args(void *ops_priv, const char *fmt, ...)
{
va_list ap;
struct zmq_ops_priv *zmq_ops_priv = ops_priv;
char **pstr;
char *str;
char *rest = zmq_ops_priv->rcv_msg_args;
int err = 0;
va_start(ap, fmt);
while (*fmt) {
switch (*fmt++) {
case 's': /* string */
pstr = va_arg(ap, char **);
str = teamd_zmq_msg_getline(&rest);
if (!str) {
teamd_log_err("Insufficient number of arguments in message.");
err = -EINVAL;
goto out;
}
*pstr = str;
break;
default:
teamd_log_err("Unknown argument type requested");
err = -EINVAL;
goto out;
}
}
out:
va_end(ap);
return err;
}
static void zmq_custom_send(struct zmq_ops_priv *zmq_ops_priv,
char *buf, size_t buflen)
{
int ret;
ret = zmq_send(zmq_ops_priv->sock, buf, buflen, 0);
if (ret == -1)
teamd_log_warn("zmq: send failed: %s", strerror(errno));
}
static int zmq_op_reply_err(void *ops_priv, const char *err_code,
const char *err_msg)
{
struct zmq_ops_priv *zmq_ops_priv = ops_priv;
char *strbuf;
int err;
err = asprintf(&strbuf, "%s\n%s\n%s\n", TEAMD_ZMQ_REPLY_ERR_PREFIX,
err_code, err_msg);
if (err == -1)
return -ENOMEM;
zmq_custom_send(zmq_ops_priv, strbuf, strlen(strbuf));
free(strbuf);
return 0;
}
static int zmq_op_reply_succ(void *ops_priv, const char *msg)
{
struct zmq_ops_priv *zmq_ops_priv = ops_priv;
char *strbuf;
int err;
err = asprintf(&strbuf, "%s\n%s", TEAMD_ZMQ_REPLY_SUCC_PREFIX,
msg ? msg : "");
if (err == -1)
return -ENOMEM;
zmq_custom_send(zmq_ops_priv, strbuf, strlen(strbuf));
free(strbuf);
return 0;
}
static const struct teamd_ctl_method_ops teamd_zmq_ctl_method_ops = {
.get_args = zmq_op_get_args,
.reply_err = zmq_op_reply_err,
.reply_succ = zmq_op_reply_succ,
};
static int process_rcv_msg(struct teamd_context *ctx, char *rcv_msg)
{
struct zmq_ops_priv zmq_ops_priv;
char *str;
char *rest = rcv_msg;
str = teamd_zmq_msg_getline(&rest);
if (!str) {
teamd_log_dbg("zmq: Incomplete message.");
return 0;
}
if (strcmp(TEAMD_ZMQ_REQUEST_PREFIX, str)) {
teamd_log_dbg("zmq: Unsupported message type.");
return 0;
}
str = teamd_zmq_msg_getline(&rest);
if (!str) {
teamd_log_dbg("zmq: Incomplete message.");
return 0;
}
if (!teamd_ctl_method_exists(str)) {
teamd_log_dbg("zmq: Unknown method \"%s\".", str);
return 0;
}
zmq_ops_priv.sock = ctx->zmq.sock;
zmq_ops_priv.rcv_msg_args = rest;
teamd_log_dbg("zmq: calling method \"%s\"", str);
return teamd_ctl_method_call(ctx, str, &teamd_zmq_ctl_method_ops,
&zmq_ops_priv);
}
static int callback_zmq(struct teamd_context *ctx, int events, void *priv)
{
int err = 0;
int poolmask;
size_t poolmask_size = sizeof(poolmask);
err = zmq_getsockopt(ctx->zmq.sock, ZMQ_EVENTS, &poolmask,
&poolmask_size);
if (err == -1)
return -errno;
while (poolmask & ZMQ_POLLIN) {
zmq_msg_t msg;
zmq_msg_init(&msg);
if (zmq_msg_recv(&msg, ctx->zmq.sock, 0) == -1) {
zmq_msg_close(&msg);
return -errno;
}
err = process_rcv_msg(ctx, zmq_msg_data(&msg));
zmq_msg_close(&msg);
if (err == -1)
break;
err = zmq_getsockopt(ctx->zmq.sock, ZMQ_EVENTS, &poolmask,
&poolmask_size);
if (err == -1)
return -errno;
}
return err;
}
#define ZMQ_MAX_CLIENT_COUNT 10
static int teamd_zmq_sock_open(struct teamd_context *ctx)
{
int err;
void *context, *sock;
int rc;
const char *addr;
context = zmq_ctx_new();
if (!context) {
teamd_log_err("zmq: Failed to create context.");
return -errno;
}
sock = zmq_socket(context, ZMQ_REP);
if (!sock) {
teamd_log_err("zmq: Failed to create socket.");
return -errno;
}
if (ctx->zmq.addr) {
addr = ctx->zmq.addr;
} else {
err = teamd_config_string_get(ctx, &addr, "$.runner.addr");
if (err) {
teamd_log_err("zmq: Failed to get address from config.");
return err;
}
}
rc = zmq_bind(sock, addr);
if (rc != 0) {
teamd_log_err("zmq: Failed to bind socket.");
err = -errno;
goto close_sock;
}
ctx->zmq.context = context;
ctx->zmq.sock = sock;
return 0;
close_sock:
zmq_close(sock);
zmq_ctx_destroy(context);
return err;
}
static void teamd_zmq_sock_close(struct teamd_context *ctx)
{
zmq_close(ctx->zmq.sock);
zmq_ctx_destroy(ctx->zmq.context);
}
#define ZMQ_CB_NAME "zmq"
int teamd_zmq_init(struct teamd_context *ctx)
{
int err;
int fd;
size_t fd_size;
if (!ctx->zmq.enabled)
return 0;
err = teamd_zmq_sock_open(ctx);
if (err)
return err;
fd_size = sizeof(fd);
zmq_getsockopt(ctx->zmq.sock, ZMQ_FD, &fd, &fd_size);
err = teamd_loop_callback_fd_add(ctx, ZMQ_CB_NAME, ctx, callback_zmq,
fd, TEAMD_LOOP_FD_EVENT_READ);
if (err)
goto sock_close;
teamd_loop_callback_enable(ctx, ZMQ_CB_NAME, ctx);
return 0;
sock_close:
teamd_zmq_sock_close(ctx);
return err;
}
void teamd_zmq_fini(struct teamd_context *ctx)
{
if (!ctx->zmq.enabled)
return;
teamd_loop_callback_del(ctx, ZMQ_CB_NAME, ctx);
teamd_zmq_sock_close(ctx);
}
#endif /* ENABLE_ZMQ */