| From 2317e7e4742c5c6292663873b30b4db306c84b58 Mon Sep 17 00:00:00 2001 |
| From: Sage Weil <sage@inktank.com> |
| Date: Fri, 20 Jul 2012 17:24:40 -0700 |
| Subject: libceph: replace connection state bits with states |
| |
| From: Sage Weil <sage@inktank.com> |
| |
| (cherry picked from commit 8dacc7da69a491c515851e68de6036f21b5663ce) |
| |
| Use a simple set of 6 enumerated values for the connection states (CON_STATE_*) |
| and use those instead of the state bits. All of the con->state checks are |
| now under the protection of the con mutex, so this is safe. It also |
| simplifies many of the state checks because we can check for anything other |
| than the expected state instead of various bits for races we can think of. |
| |
| This appears to hold up well to stress testing both with and without socket |
| failure injection on the server side. |
| |
| Signed-off-by: Sage Weil <sage@inktank.com> |
| Signed-off-by: Greg Kroah-Hartman <gregkh@linuxfoundation.org> |
| --- |
| include/linux/ceph/messenger.h | 12 --- |
| net/ceph/messenger.c | 130 +++++++++++++++++++++-------------------- |
| 2 files changed, 68 insertions(+), 74 deletions(-) |
| |
| --- a/include/linux/ceph/messenger.h |
| +++ b/include/linux/ceph/messenger.h |
| @@ -117,18 +117,6 @@ struct ceph_msg_pos { |
| #define BACKOFF 15 |
| |
| /* |
| - * ceph_connection states |
| - */ |
| -#define CONNECTING 1 |
| -#define NEGOTIATING 2 |
| -#define CONNECTED 5 |
| -#define STANDBY 8 /* no outgoing messages, socket closed. we keep |
| - * the ceph_connection around to maintain shared |
| - * state with the peer. */ |
| -#define CLOSED 10 /* we've closed the connection */ |
| -#define OPENING 13 /* open connection w/ (possibly new) peer */ |
| - |
| -/* |
| * A single connection with another host. |
| * |
| * We maintain a queue of outgoing messages, and some session state to |
| --- a/net/ceph/messenger.c |
| +++ b/net/ceph/messenger.c |
| @@ -77,6 +77,17 @@ |
| #define CON_SOCK_STATE_CONNECTED 3 /* -> CLOSING or -> CLOSED */ |
| #define CON_SOCK_STATE_CLOSING 4 /* -> CLOSED */ |
| |
| +/* |
| + * connection states |
| + */ |
| +#define CON_STATE_CLOSED 1 /* -> PREOPEN */ |
| +#define CON_STATE_PREOPEN 2 /* -> CONNECTING, CLOSED */ |
| +#define CON_STATE_CONNECTING 3 /* -> NEGOTIATING, CLOSED */ |
| +#define CON_STATE_NEGOTIATING 4 /* -> OPEN, CLOSED */ |
| +#define CON_STATE_OPEN 5 /* -> STANDBY, CLOSED */ |
| +#define CON_STATE_STANDBY 6 /* -> PREOPEN, CLOSED */ |
| + |
| + |
| /* static tag bytes (protocol control messages) */ |
| static char tag_msg = CEPH_MSGR_TAG_MSG; |
| static char tag_ack = CEPH_MSGR_TAG_ACK; |
| @@ -503,11 +514,7 @@ void ceph_con_close(struct ceph_connecti |
| mutex_lock(&con->mutex); |
| dout("con_close %p peer %s\n", con, |
| ceph_pr_addr(&con->peer_addr.in_addr)); |
| - clear_bit(NEGOTIATING, &con->state); |
| - clear_bit(CONNECTING, &con->state); |
| - clear_bit(CONNECTED, &con->state); |
| - clear_bit(STANDBY, &con->state); /* avoid connect_seq bump */ |
| - set_bit(CLOSED, &con->state); |
| + con->state = CON_STATE_CLOSED; |
| |
| clear_bit(LOSSYTX, &con->flags); /* so we retry next connect */ |
| clear_bit(KEEPALIVE_PENDING, &con->flags); |
| @@ -530,8 +537,9 @@ void ceph_con_open(struct ceph_connectio |
| { |
| mutex_lock(&con->mutex); |
| dout("con_open %p %s\n", con, ceph_pr_addr(&addr->in_addr)); |
| - set_bit(OPENING, &con->state); |
| - WARN_ON(!test_and_clear_bit(CLOSED, &con->state)); |
| + |
| + BUG_ON(con->state != CON_STATE_CLOSED); |
| + con->state = CON_STATE_PREOPEN; |
| |
| con->peer_name.type = (__u8) entity_type; |
| con->peer_name.num = cpu_to_le64(entity_num); |
| @@ -571,7 +579,7 @@ void ceph_con_init(struct ceph_connectio |
| INIT_LIST_HEAD(&con->out_sent); |
| INIT_DELAYED_WORK(&con->work, con_work); |
| |
| - set_bit(CLOSED, &con->state); |
| + con->state = CON_STATE_CLOSED; |
| } |
| EXPORT_SYMBOL(ceph_con_init); |
| |
| @@ -809,27 +817,21 @@ static struct ceph_auth_handshake *get_c |
| if (!con->ops->get_authorizer) { |
| con->out_connect.authorizer_protocol = CEPH_AUTH_UNKNOWN; |
| con->out_connect.authorizer_len = 0; |
| - |
| return NULL; |
| } |
| |
| /* Can't hold the mutex while getting authorizer */ |
| - |
| mutex_unlock(&con->mutex); |
| - |
| auth = con->ops->get_authorizer(con, auth_proto, con->auth_retry); |
| - |
| mutex_lock(&con->mutex); |
| |
| if (IS_ERR(auth)) |
| return auth; |
| - if (test_bit(CLOSED, &con->state) || test_bit(OPENING, &con->flags)) |
| + if (con->state != CON_STATE_NEGOTIATING) |
| return ERR_PTR(-EAGAIN); |
| |
| con->auth_reply_buf = auth->authorizer_reply_buf; |
| con->auth_reply_buf_len = auth->authorizer_reply_buf_len; |
| - |
| - |
| return auth; |
| } |
| |
| @@ -1484,7 +1486,8 @@ static int process_banner(struct ceph_co |
| static void fail_protocol(struct ceph_connection *con) |
| { |
| reset_connection(con); |
| - set_bit(CLOSED, &con->state); /* in case there's queued work */ |
| + BUG_ON(con->state != CON_STATE_NEGOTIATING); |
| + con->state = CON_STATE_CLOSED; |
| } |
| |
| static int process_connect(struct ceph_connection *con) |
| @@ -1558,8 +1561,7 @@ static int process_connect(struct ceph_c |
| if (con->ops->peer_reset) |
| con->ops->peer_reset(con); |
| mutex_lock(&con->mutex); |
| - if (test_bit(CLOSED, &con->state) || |
| - test_bit(OPENING, &con->state)) |
| + if (con->state != CON_STATE_NEGOTIATING) |
| return -EAGAIN; |
| break; |
| |
| @@ -1605,8 +1607,10 @@ static int process_connect(struct ceph_c |
| fail_protocol(con); |
| return -1; |
| } |
| - clear_bit(NEGOTIATING, &con->state); |
| - set_bit(CONNECTED, &con->state); |
| + |
| + BUG_ON(con->state != CON_STATE_NEGOTIATING); |
| + con->state = CON_STATE_OPEN; |
| + |
| con->peer_global_seq = le32_to_cpu(con->in_reply.global_seq); |
| con->connect_seq++; |
| con->peer_features = server_feat; |
| @@ -1994,8 +1998,9 @@ more: |
| dout("try_write out_kvec_bytes %d\n", con->out_kvec_bytes); |
| |
| /* open the socket first? */ |
| - if (con->sock == NULL) { |
| - set_bit(CONNECTING, &con->state); |
| + if (con->state == CON_STATE_PREOPEN) { |
| + BUG_ON(con->sock); |
| + con->state = CON_STATE_CONNECTING; |
| |
| con_out_kvec_reset(con); |
| prepare_write_banner(con); |
| @@ -2046,8 +2051,7 @@ more_kvec: |
| } |
| |
| do_next: |
| - if (!test_bit(CONNECTING, &con->state) && |
| - !test_bit(NEGOTIATING, &con->state)) { |
| + if (con->state == CON_STATE_OPEN) { |
| /* is anything else pending? */ |
| if (!list_empty(&con->out_queue)) { |
| prepare_write_message(con); |
| @@ -2081,29 +2085,19 @@ static int try_read(struct ceph_connecti |
| { |
| int ret = -1; |
| |
| - if (!con->sock) |
| - return 0; |
| - |
| - if (test_bit(STANDBY, &con->state)) |
| +more: |
| + dout("try_read start on %p state %lu\n", con, con->state); |
| + if (con->state != CON_STATE_CONNECTING && |
| + con->state != CON_STATE_NEGOTIATING && |
| + con->state != CON_STATE_OPEN) |
| return 0; |
| |
| - dout("try_read start on %p\n", con); |
| + BUG_ON(!con->sock); |
| |
| -more: |
| dout("try_read tag %d in_base_pos %d\n", (int)con->in_tag, |
| con->in_base_pos); |
| |
| - /* |
| - * process_connect and process_message drop and re-take |
| - * con->mutex. make sure we handle a racing close or reopen. |
| - */ |
| - if (test_bit(CLOSED, &con->state) || |
| - test_bit(OPENING, &con->state)) { |
| - ret = -EAGAIN; |
| - goto out; |
| - } |
| - |
| - if (test_bit(CONNECTING, &con->state)) { |
| + if (con->state == CON_STATE_CONNECTING) { |
| dout("try_read connecting\n"); |
| ret = read_partial_banner(con); |
| if (ret <= 0) |
| @@ -2112,8 +2106,8 @@ more: |
| if (ret < 0) |
| goto out; |
| |
| - clear_bit(CONNECTING, &con->state); |
| - set_bit(NEGOTIATING, &con->state); |
| + BUG_ON(con->state != CON_STATE_CONNECTING); |
| + con->state = CON_STATE_NEGOTIATING; |
| |
| /* Banner is good, exchange connection info */ |
| ret = prepare_write_connect(con); |
| @@ -2125,7 +2119,7 @@ more: |
| goto out; |
| } |
| |
| - if (test_bit(NEGOTIATING, &con->state)) { |
| + if (con->state == CON_STATE_NEGOTIATING) { |
| dout("try_read negotiating\n"); |
| ret = read_partial_connect(con); |
| if (ret <= 0) |
| @@ -2136,6 +2130,8 @@ more: |
| goto more; |
| } |
| |
| + BUG_ON(con->state != CON_STATE_OPEN); |
| + |
| if (con->in_base_pos < 0) { |
| /* |
| * skipping + discarding content. |
| @@ -2169,8 +2165,8 @@ more: |
| prepare_read_ack(con); |
| break; |
| case CEPH_MSGR_TAG_CLOSE: |
| - clear_bit(CONNECTED, &con->state); |
| - set_bit(CLOSED, &con->state); /* fixme */ |
| + con_close_socket(con); |
| + con->state = CON_STATE_CLOSED; |
| goto out; |
| default: |
| goto bad_tag; |
| @@ -2246,14 +2242,21 @@ static void con_work(struct work_struct |
| mutex_lock(&con->mutex); |
| restart: |
| if (test_and_clear_bit(SOCK_CLOSED, &con->flags)) { |
| - if (test_and_clear_bit(CONNECTED, &con->state)) |
| - con->error_msg = "socket closed"; |
| - else if (test_and_clear_bit(NEGOTIATING, &con->state)) |
| - con->error_msg = "negotiation failed"; |
| - else if (test_and_clear_bit(CONNECTING, &con->state)) |
| + switch (con->state) { |
| + case CON_STATE_CONNECTING: |
| con->error_msg = "connection failed"; |
| - else |
| + break; |
| + case CON_STATE_NEGOTIATING: |
| + con->error_msg = "negotiation failed"; |
| + break; |
| + case CON_STATE_OPEN: |
| + con->error_msg = "socket closed"; |
| + break; |
| + default: |
| + dout("unrecognized con state %d\n", (int)con->state); |
| con->error_msg = "unrecognized con state"; |
| + BUG(); |
| + } |
| goto fault; |
| } |
| |
| @@ -2271,17 +2274,16 @@ restart: |
| } |
| } |
| |
| - if (test_bit(STANDBY, &con->state)) { |
| + if (con->state == CON_STATE_STANDBY) { |
| dout("con_work %p STANDBY\n", con); |
| goto done; |
| } |
| - if (test_bit(CLOSED, &con->state)) { |
| + if (con->state == CON_STATE_CLOSED) { |
| dout("con_work %p CLOSED\n", con); |
| BUG_ON(con->sock); |
| goto done; |
| } |
| - if (test_and_clear_bit(OPENING, &con->state)) { |
| - /* reopen w/ new peer */ |
| + if (con->state == CON_STATE_PREOPEN) { |
| dout("con_work OPENING\n"); |
| BUG_ON(con->sock); |
| } |
| @@ -2328,13 +2330,15 @@ static void ceph_fault(struct ceph_conne |
| dout("fault %p state %lu to peer %s\n", |
| con, con->state, ceph_pr_addr(&con->peer_addr.in_addr)); |
| |
| - if (test_bit(CLOSED, &con->state)) |
| - goto out_unlock; |
| + BUG_ON(con->state != CON_STATE_CONNECTING && |
| + con->state != CON_STATE_NEGOTIATING && |
| + con->state != CON_STATE_OPEN); |
| |
| con_close_socket(con); |
| |
| if (test_bit(LOSSYTX, &con->flags)) { |
| - dout("fault on LOSSYTX channel\n"); |
| + dout("fault on LOSSYTX channel, marking CLOSED\n"); |
| + con->state = CON_STATE_CLOSED; |
| goto out_unlock; |
| } |
| |
| @@ -2355,9 +2359,10 @@ static void ceph_fault(struct ceph_conne |
| !test_bit(KEEPALIVE_PENDING, &con->flags)) { |
| dout("fault %p setting STANDBY clearing WRITE_PENDING\n", con); |
| clear_bit(WRITE_PENDING, &con->flags); |
| - set_bit(STANDBY, &con->state); |
| + con->state = CON_STATE_STANDBY; |
| } else { |
| /* retry after a delay. */ |
| + con->state = CON_STATE_PREOPEN; |
| if (con->delay == 0) |
| con->delay = BASE_DELAY_INTERVAL; |
| else if (con->delay < MAX_DELAY_INTERVAL) |
| @@ -2431,8 +2436,9 @@ EXPORT_SYMBOL(ceph_messenger_init); |
| static void clear_standby(struct ceph_connection *con) |
| { |
| /* come back from STANDBY? */ |
| - if (test_and_clear_bit(STANDBY, &con->state)) { |
| + if (con->state == CON_STATE_STANDBY) { |
| dout("clear_standby %p and ++connect_seq\n", con); |
| + con->state = CON_STATE_PREOPEN; |
| con->connect_seq++; |
| WARN_ON(test_bit(WRITE_PENDING, &con->flags)); |
| WARN_ON(test_bit(KEEPALIVE_PENDING, &con->flags)); |
| @@ -2451,7 +2457,7 @@ void ceph_con_send(struct ceph_connectio |
| |
| mutex_lock(&con->mutex); |
| |
| - if (test_bit(CLOSED, &con->state)) { |
| + if (con->state == CON_STATE_CLOSED) { |
| dout("con_send %p closed, dropping %p\n", con, msg); |
| ceph_msg_put(msg); |
| mutex_unlock(&con->mutex); |