| /* |
| * teamd_zmq.c - Teamd ZeroMQ socket api |
| * Copyright (C) 2013 Jiri Zupka <jzupka@redhat.com> |
| * |
| * This library is free software; you can redistribute it and/or |
| * modify it under the terms of the GNU Lesser General Public |
| * License as published by the Free Software Foundation; either |
| * version 2.1 of the License, or (at your option) any later version. |
| * |
| * This library is distributed in the hope that it will be useful, |
| * but WITHOUT ANY WARRANTY; without even the implied warranty of |
| * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU |
| * Lesser General Public License for more details. |
| * |
| * You should have received a copy of the GNU Lesser General Public |
| * License along with this library; if not, write to the Free Software |
| * Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA |
| */ |
| |
| #include "config.h" |
| |
| #ifdef ENABLE_ZMQ |
| |
| #include <stdbool.h> |
| #include <stdlib.h> |
| #include <string.h> |
| #include <unistd.h> |
| #include <sys/types.h> |
| #include <sys/socket.h> |
| #include <sys/un.h> |
| #include <errno.h> |
| #include <ctype.h> |
| #include <private/misc.h> |
| #include <private/list.h> |
| #include <team.h> |
| |
| #include "teamd.h" |
| #include "teamd_zmq.h" |
| #include "teamd_zmq_common.h" |
| #include "teamd_ctl.h" |
| #include "teamd_config.h" |
| |
| struct zmq_ops_priv { |
| char *rcv_msg_args; |
| void *sock; |
| }; |
| |
| struct zmq_acc_conn { |
| struct list_item list; |
| int sock; |
| }; |
| |
| static int zmq_op_get_args(void *ops_priv, const char *fmt, ...) |
| { |
| va_list ap; |
| struct zmq_ops_priv *zmq_ops_priv = ops_priv; |
| char **pstr; |
| char *str; |
| char *rest = zmq_ops_priv->rcv_msg_args; |
| int err = 0; |
| |
| va_start(ap, fmt); |
| while (*fmt) { |
| switch (*fmt++) { |
| case 's': /* string */ |
| pstr = va_arg(ap, char **); |
| str = teamd_zmq_msg_getline(&rest); |
| if (!str) { |
| teamd_log_err("Insufficient number of arguments in message."); |
| err = -EINVAL; |
| goto out; |
| } |
| *pstr = str; |
| break; |
| default: |
| teamd_log_err("Unknown argument type requested"); |
| err = -EINVAL; |
| goto out; |
| } |
| } |
| out: |
| va_end(ap); |
| return err; |
| } |
| |
| static void zmq_custom_send(struct zmq_ops_priv *zmq_ops_priv, |
| char *buf, size_t buflen) |
| { |
| int ret; |
| |
| ret = zmq_send(zmq_ops_priv->sock, buf, buflen, 0); |
| if (ret == -1) |
| teamd_log_warn("zmq: send failed: %s", strerror(errno)); |
| } |
| |
| static int zmq_op_reply_err(void *ops_priv, const char *err_code, |
| const char *err_msg) |
| { |
| struct zmq_ops_priv *zmq_ops_priv = ops_priv; |
| char *strbuf; |
| int err; |
| |
| err = asprintf(&strbuf, "%s\n%s\n%s\n", TEAMD_ZMQ_REPLY_ERR_PREFIX, |
| err_code, err_msg); |
| if (err == -1) |
| return -ENOMEM; |
| zmq_custom_send(zmq_ops_priv, strbuf, strlen(strbuf)); |
| free(strbuf); |
| return 0; |
| } |
| |
| static int zmq_op_reply_succ(void *ops_priv, const char *msg) |
| { |
| struct zmq_ops_priv *zmq_ops_priv = ops_priv; |
| char *strbuf; |
| int err; |
| |
| err = asprintf(&strbuf, "%s\n%s", TEAMD_ZMQ_REPLY_SUCC_PREFIX, |
| msg ? msg : ""); |
| if (err == -1) |
| return -ENOMEM; |
| zmq_custom_send(zmq_ops_priv, strbuf, strlen(strbuf)); |
| free(strbuf); |
| return 0; |
| } |
| |
| static const struct teamd_ctl_method_ops teamd_zmq_ctl_method_ops = { |
| .get_args = zmq_op_get_args, |
| .reply_err = zmq_op_reply_err, |
| .reply_succ = zmq_op_reply_succ, |
| }; |
| |
| static int process_rcv_msg(struct teamd_context *ctx, char *rcv_msg) |
| { |
| struct zmq_ops_priv zmq_ops_priv; |
| char *str; |
| char *rest = rcv_msg; |
| |
| str = teamd_zmq_msg_getline(&rest); |
| if (!str) { |
| teamd_log_dbg("zmq: Incomplete message."); |
| return 0; |
| } |
| if (strcmp(TEAMD_ZMQ_REQUEST_PREFIX, str)) { |
| teamd_log_dbg("zmq: Unsupported message type."); |
| return 0; |
| } |
| |
| str = teamd_zmq_msg_getline(&rest); |
| if (!str) { |
| teamd_log_dbg("zmq: Incomplete message."); |
| return 0; |
| } |
| if (!teamd_ctl_method_exists(str)) { |
| teamd_log_dbg("zmq: Unknown method \"%s\".", str); |
| return 0; |
| } |
| |
| zmq_ops_priv.sock = ctx->zmq.sock; |
| zmq_ops_priv.rcv_msg_args = rest; |
| |
| teamd_log_dbg("zmq: calling method \"%s\"", str); |
| |
| return teamd_ctl_method_call(ctx, str, &teamd_zmq_ctl_method_ops, |
| &zmq_ops_priv); |
| } |
| |
| static int callback_zmq(struct teamd_context *ctx, int events, void *priv) |
| { |
| int err = 0; |
| int poolmask; |
| size_t poolmask_size = sizeof(poolmask); |
| |
| err = zmq_getsockopt(ctx->zmq.sock, ZMQ_EVENTS, &poolmask, |
| &poolmask_size); |
| if (err == -1) |
| return -errno; |
| |
| while (poolmask & ZMQ_POLLIN) { |
| zmq_msg_t msg; |
| |
| zmq_msg_init(&msg); |
| if (zmq_msg_recv(&msg, ctx->zmq.sock, 0) == -1) { |
| zmq_msg_close(&msg); |
| return -errno; |
| } |
| |
| err = process_rcv_msg(ctx, zmq_msg_data(&msg)); |
| |
| zmq_msg_close(&msg); |
| |
| if (err == -1) |
| break; |
| |
| err = zmq_getsockopt(ctx->zmq.sock, ZMQ_EVENTS, &poolmask, |
| &poolmask_size); |
| if (err == -1) |
| return -errno; |
| } |
| return err; |
| } |
| |
| #define ZMQ_MAX_CLIENT_COUNT 10 |
| |
| static int teamd_zmq_sock_open(struct teamd_context *ctx) |
| { |
| int err; |
| void *context, *sock; |
| int rc; |
| const char *addr; |
| |
| context = zmq_ctx_new(); |
| if (!context) { |
| teamd_log_err("zmq: Failed to create context."); |
| return -errno; |
| } |
| |
| sock = zmq_socket(context, ZMQ_REP); |
| if (!sock) { |
| teamd_log_err("zmq: Failed to create socket."); |
| return -errno; |
| } |
| |
| if (ctx->zmq.addr) { |
| addr = ctx->zmq.addr; |
| } else { |
| err = teamd_config_string_get(ctx, &addr, "$.runner.addr"); |
| if (err) { |
| teamd_log_err("zmq: Failed to get address from config."); |
| return err; |
| } |
| } |
| |
| rc = zmq_bind(sock, addr); |
| if (rc != 0) { |
| teamd_log_err("zmq: Failed to bind socket."); |
| err = -errno; |
| goto close_sock; |
| } |
| |
| ctx->zmq.context = context; |
| ctx->zmq.sock = sock; |
| return 0; |
| |
| close_sock: |
| zmq_close(sock); |
| zmq_ctx_destroy(context); |
| return err; |
| } |
| |
| static void teamd_zmq_sock_close(struct teamd_context *ctx) |
| { |
| zmq_close(ctx->zmq.sock); |
| zmq_ctx_destroy(ctx->zmq.context); |
| } |
| |
| #define ZMQ_CB_NAME "zmq" |
| |
| int teamd_zmq_init(struct teamd_context *ctx) |
| { |
| int err; |
| int fd; |
| size_t fd_size; |
| |
| if (!ctx->zmq.enabled) |
| return 0; |
| err = teamd_zmq_sock_open(ctx); |
| if (err) |
| return err; |
| |
| |
| fd_size = sizeof(fd); |
| zmq_getsockopt(ctx->zmq.sock, ZMQ_FD, &fd, &fd_size); |
| |
| err = teamd_loop_callback_fd_add(ctx, ZMQ_CB_NAME, ctx, callback_zmq, |
| fd, TEAMD_LOOP_FD_EVENT_READ); |
| if (err) |
| goto sock_close; |
| teamd_loop_callback_enable(ctx, ZMQ_CB_NAME, ctx); |
| return 0; |
| sock_close: |
| teamd_zmq_sock_close(ctx); |
| return err; |
| } |
| |
| void teamd_zmq_fini(struct teamd_context *ctx) |
| { |
| if (!ctx->zmq.enabled) |
| return; |
| teamd_loop_callback_del(ctx, ZMQ_CB_NAME, ctx); |
| teamd_zmq_sock_close(ctx); |
| } |
| |
| #endif /* ENABLE_ZMQ */ |