root/plugin/ipc/socket/connectionrewirer.cpp

/* [<][>][^][v][top][bottom][index][help] */

DEFINITIONS

This source file includes the following definitions.
  1. markSocketNonBlocking
  2. markSocketBlocking
  3. instance
  4. destroy
  5. checkForPendingIncoming
  6. doReconnect
  7. openRestoreSocket
  8. registerIncoming
  9. registerOutgoing
  10. registerNSData
  11. registerNSData
  12. sendQueries
  13. debugPrint

   1 /****************************************************************************
   2  *   Copyright (C) 2006-2010 by Jason Ansel, Kapil Arya, and Gene Cooperman *
   3  *   jansel@csail.mit.edu, kapil@ccs.neu.edu, gene@ccs.neu.edu              *
   4  *                                                                          *
   5  *   This file is part of the dmtcp/src module of DMTCP (DMTCP:dmtcp/src).  *
   6  *                                                                          *
   7  *  DMTCP:dmtcp/src is free software: you can redistribute it and/or        *
   8  *  modify it under the terms of the GNU Lesser General Public License as   *
   9  *  published by the Free Software Foundation, either version 3 of the      *
  10  *  License, or (at your option) any later version.                         *
  11  *                                                                          *
  12  *  DMTCP:dmtcp/src is distributed in the hope that it will be useful,      *
  13  *  but WITHOUT ANY WARRANTY; without even the implied warranty of          *
  14  *  MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the           *
  15  *  GNU Lesser General Public License for more details.                     *
  16  *                                                                          *
  17  *  You should have received a copy of the GNU Lesser General Public        *
  18  *  License along with DMTCP:dmtcp/src.  If not, see                        *
  19  *  <http://www.gnu.org/licenses/>.                                         *
  20  ****************************************************************************/
  21 
  22 #include <fcntl.h>
  23 #include <unistd.h>
  24 #include <sys/socket.h>
  25 #include <sys/socket.h>
  26 #include <netinet/in.h>
  27 #include <arpa/inet.h>
  28 
  29 #include "dmtcp.h"
  30 #include "protectedfds.h"
  31 #include "util.h"
  32 #include "jsocket.h"
  33 
  34 #include "connectionrewirer.h"
  35 #include "socketconnection.h"
  36 #include "socketwrappers.h"
  37 
  38 using namespace dmtcp;
  39 
  40 // FIXME: IP6 Support disabled for now. However, we do go through the exercise
  41 // of creating the restore socket and all.
  42 // #define ENABLE_IP6_SUPPORT
  43 static void markSocketNonBlocking(int sockfd)
  44 {
  45     // Remove O_NONBLOCK flag from listener socket
  46     int flags = _real_fcntl(sockfd, F_GETFL, NULL);
  47     JASSERT(flags != -1);
  48     JASSERT(_real_fcntl(sockfd, F_SETFL,
  49                         (void*) (long) (flags | O_NONBLOCK)) != -1);
  50 }
  51 
  52 static void markSocketBlocking(int sockfd)
  53 {
  54     // Remove O_NONBLOCK flag from listener socket
  55     int flags = _real_fcntl(sockfd, F_GETFL, NULL);
  56     JASSERT(flags != -1);
  57     JASSERT(_real_fcntl(sockfd, F_SETFL,
  58                         (void*) (long) (flags & ~O_NONBLOCK)) != -1);
  59 }
  60 
  61 static ConnectionRewirer *theRewirer = NULL;
  62 ConnectionRewirer& ConnectionRewirer::instance()
  63 {
  64   if (theRewirer == NULL) {
  65     theRewirer = new ConnectionRewirer();
  66   }
  67   return *theRewirer;
  68 }
  69 
  70 void ConnectionRewirer::destroy()
  71 {
  72   dmtcp_close_protected_fd(PROTECTED_RESTORE_IP4_SOCK_FD);
  73   dmtcp_close_protected_fd(PROTECTED_RESTORE_IP6_SOCK_FD);
  74   dmtcp_close_protected_fd(PROTECTED_RESTORE_UDS_SOCK_FD);
  75 
  76   // Free up the object.
  77   delete theRewirer;
  78   theRewirer = NULL;
  79 }
  80 
  81 void ConnectionRewirer::checkForPendingIncoming(int restoreSockFd,
  82                                                 ConnectionListT *conList)
  83 {
  84   while (conList->size() > 0) {
  85     int fd = _real_accept(restoreSockFd, NULL, NULL);
  86     if (fd == -1 && (errno == EAGAIN || errno == EWOULDBLOCK)) {
  87       return;
  88     }
  89     JASSERT(fd != -1) (JASSERT_ERRNO) .Text("Accept failed.");
  90     ConnectionIdentifier id;
  91     JASSERT(Util::readAll(fd, &id, sizeof id) == sizeof id);
  92 
  93     iterator i = conList->find(id);
  94     JASSERT(i != conList->end()) (id)
  95       .Text("got unexpected incoming restore request");
  96 
  97     Util::dupFds(fd, (i->second)->getFds());
  98 
  99     JTRACE("restoring incoming connection") (id);
 100     conList->erase(i);
 101   }
 102 }
 103 
 104 void ConnectionRewirer::doReconnect()
 105 {
 106   iterator i;
 107   for (i = _pendingOutgoing.begin(); i != _pendingOutgoing.end(); i++) {
 108     const ConnectionIdentifier& id = i->first;
 109     Connection *con = i->second;
 110     struct RemoteAddr& remoteAddr = _remoteInfo[id];
 111     int fd = con->getFds()[0];
 112     errno = 0;
 113     JASSERT(_real_connect(fd, (sockaddr*) &remoteAddr.addr, remoteAddr.len)
 114             == 0)
 115       (id) (JASSERT_ERRNO) .Text("failed to restore connection");
 116 
 117     Util::writeAll(fd, &id, sizeof id);
 118 
 119     checkForPendingIncoming(PROTECTED_RESTORE_IP4_SOCK_FD,
 120                             &_pendingIP4Incoming);
 121     checkForPendingIncoming(PROTECTED_RESTORE_IP6_SOCK_FD,
 122                             &_pendingIP6Incoming);
 123     checkForPendingIncoming(PROTECTED_RESTORE_UDS_SOCK_FD,
 124                             &_pendingUDSIncoming);
 125   }
 126   _pendingOutgoing.clear();
 127   _remoteInfo.clear();
 128 
 129   if (_pendingIP4Incoming.size() > 0) {
 130     // Add O_NONBLOCK flag to the listener sockets.
 131     markSocketBlocking(PROTECTED_RESTORE_IP4_SOCK_FD);
 132     checkForPendingIncoming(PROTECTED_RESTORE_IP4_SOCK_FD,
 133                             &_pendingIP4Incoming);
 134     _real_close(PROTECTED_RESTORE_IP4_SOCK_FD);
 135   }
 136   if (_pendingIP6Incoming.size() > 0) {
 137     // Add O_NONBLOCK flag to the listener sockets.
 138     markSocketBlocking(PROTECTED_RESTORE_IP6_SOCK_FD);
 139     checkForPendingIncoming(PROTECTED_RESTORE_IP6_SOCK_FD,
 140                             &_pendingIP6Incoming);
 141     _real_close(PROTECTED_RESTORE_IP6_SOCK_FD);
 142   }
 143   if (_pendingUDSIncoming.size() > 0) {
 144     // Add O_NONBLOCK flag to the listener sockets.
 145     markSocketBlocking(PROTECTED_RESTORE_UDS_SOCK_FD);
 146     checkForPendingIncoming(PROTECTED_RESTORE_UDS_SOCK_FD,
 147                             &_pendingUDSIncoming);
 148     _real_close(PROTECTED_RESTORE_UDS_SOCK_FD);
 149   }
 150   JTRACE("Closed restore sockets");
 151 }
 152 
 153 void ConnectionRewirer::openRestoreSocket(bool hasIPv4Sock,
 154                                           bool hasIPv6Sock,
 155                                           bool hasUNIXSock)
 156 {
 157   memset(&_ip4RestoreAddr, 0, sizeof(_ip4RestoreAddr));
 158   memset(&_ip6RestoreAddr, 0, sizeof(_ip6RestoreAddr));
 159   memset(&_udsRestoreAddr, 0, sizeof(_udsRestoreAddr));
 160 
 161   // Open IP4 Restore Socket
 162   if (hasIPv4Sock) {
 163     jalib::JServerSocket restoreSocket(jalib::JSockAddr::ANY, 0);
 164     JASSERT(restoreSocket.isValid());
 165     restoreSocket.changeFd(PROTECTED_RESTORE_IP4_SOCK_FD);
 166 
 167     // Setup restore socket for name service
 168     _ip4RestoreAddr.sin_family = AF_INET;
 169     dmtcp_get_local_ip_addr(&_ip4RestoreAddr.sin_addr);
 170     _ip4RestoreAddr.sin_port = htons(restoreSocket.port());
 171     _ip4RestoreAddrlen = sizeof(_ip4RestoreAddr);
 172 
 173     JTRACE("opened listen socket") (restoreSocket.sockfd())
 174       (inet_ntoa(_ip4RestoreAddr.sin_addr)) (ntohs(_ip4RestoreAddr.sin_port));
 175     markSocketNonBlocking(PROTECTED_RESTORE_IP4_SOCK_FD);
 176   }
 177 
 178   // Open IP6 Restore Socket
 179   if (hasIPv6Sock) {
 180     int ip6fd = _real_socket(AF_INET6, SOCK_STREAM, 0);
 181     JASSERT(ip6fd != -1) (JASSERT_ERRNO);
 182 
 183     _ip6RestoreAddr.sin6_family = AF_INET6;
 184     _ip6RestoreAddr.sin6_port = 0;
 185     _ip6RestoreAddr.sin6_addr = in6addr_any;
 186     _ip6RestoreAddrlen = sizeof(_ip6RestoreAddr);
 187     JASSERT(_real_bind(ip6fd, (struct sockaddr*) &_ip6RestoreAddr,
 188                        _ip6RestoreAddrlen) == 0)
 189       (JASSERT_ERRNO);
 190     JASSERT(getsockname(ip6fd, (struct sockaddr*)&_ip6RestoreAddr,
 191                         &_ip6RestoreAddrlen) == 0)
 192       (JASSERT_ERRNO);
 193     JASSERT(_real_listen(ip6fd, 32) == 0) (JASSERT_ERRNO);
 194     Util::changeFd(ip6fd, PROTECTED_RESTORE_IP6_SOCK_FD);
 195 
 196     JTRACE("opened ip6 listen socket") (PROTECTED_RESTORE_IP6_SOCK_FD);
 197     markSocketNonBlocking(PROTECTED_RESTORE_IP6_SOCK_FD);
 198   }
 199 
 200   // Open UDS Restore Socket
 201   if (hasUNIXSock) {
 202     ostringstream o;
 203     o << dmtcp_get_uniquepid_str() << "_" << dmtcp_get_coordinator_timestamp();
 204     string str = o.str();
 205     int udsfd = _real_socket(AF_UNIX, SOCK_STREAM, 0);
 206     JASSERT(udsfd != -1);
 207     memset(&_udsRestoreAddr, 0, sizeof(struct sockaddr_un));
 208     _udsRestoreAddr.sun_family = AF_UNIX;
 209     strncpy(&_udsRestoreAddr.sun_path[1], str.c_str(), str.length());
 210     _udsRestoreAddrlen = sizeof(sa_family_t) + str.length() + 1;
 211     JASSERT(_real_bind(udsfd, (struct sockaddr*) &_udsRestoreAddr,
 212                        _udsRestoreAddrlen) == 0)
 213       (JASSERT_ERRNO);
 214     JASSERT(_real_listen(udsfd, 32) == 0) (JASSERT_ERRNO);
 215     Util::changeFd(udsfd, PROTECTED_RESTORE_UDS_SOCK_FD);
 216 
 217     JTRACE("opened UDS listen socket")
 218       (PROTECTED_RESTORE_UDS_SOCK_FD) (&_udsRestoreAddr.sun_path[1]);
 219     markSocketNonBlocking(PROTECTED_RESTORE_UDS_SOCK_FD);
 220   }
 221 }
 222 
 223 void
 224 ConnectionRewirer::registerIncoming(const ConnectionIdentifier& local,
 225                                     Connection* con,
 226                                     int domain)
 227 {
 228   JASSERT(domain == AF_INET || domain == AF_INET6 || domain == AF_UNIX)
 229     (domain) .Text("Unsupported domain.");
 230 
 231   if (domain == AF_INET) {
 232     _pendingIP4Incoming[local] = con;
 233   } else if (domain == AF_INET6) {
 234 #ifdef ENABLE_IP6_SUPPORT
 235     _pendingIP6Incoming[local] = con;
 236 #else
 237     _pendingIP4Incoming[local] = con;
 238 #endif
 239   } else if (domain == AF_UNIX) {
 240     _pendingUDSIncoming[local] = con;
 241   } else {
 242     JASSERT(false) .Text("Not implemented");
 243   }
 244 
 245   JTRACE("announcing pending incoming") (local);
 246 }
 247 
 248 void
 249 ConnectionRewirer::registerOutgoing(const ConnectionIdentifier& remote,
 250                                     Connection* con)
 251 {
 252   _pendingOutgoing[remote] = con;
 253   JTRACE("announcing pending outgoing") (remote);
 254 }
 255 
 256 void ConnectionRewirer::registerNSData()
 257 {
 258   registerNSData((void*)&_ip4RestoreAddr, _ip4RestoreAddrlen,
 259                  &_pendingIP4Incoming);
 260   registerNSData((void*)&_ip6RestoreAddr, _ip6RestoreAddrlen,
 261                  &_pendingIP6Incoming);
 262   registerNSData((void*)&_udsRestoreAddr, _udsRestoreAddrlen,
 263                  &_pendingUDSIncoming);
 264 }
 265 
 266 void ConnectionRewirer::registerNSData(void *addr,
 267                                        socklen_t addrLen,
 268                                        ConnectionListT *conList)
 269 {
 270   iterator i;
 271   JASSERT(theRewirer != NULL);
 272   for (i = conList->begin(); i != conList->end(); ++i) {
 273     const ConnectionIdentifier& id = i->first;
 274     dmtcp_send_key_val_pair_to_coordinator("Socket",
 275                                            (const void *)&id,
 276                                            (uint32_t) sizeof(id),
 277                                            addr,
 278                                            (uint32_t) addrLen);
 279     /*
 280     sockaddr_in *sn = (sockaddr_in*) &_restoreAddr;
 281     unsigned short port = htons(sn->sin_port);
 282     char *ip = inet_ntoa(sn->sin_addr);
 283     JTRACE("Send NS information:")(id)(sn->sin_family)(port)(ip);
 284     */
 285   }
 286   //debugPrint();
 287 }
 288 
 289 void ConnectionRewirer::sendQueries()
 290 {
 291   iterator i;
 292   for (i = _pendingOutgoing.begin(); i != _pendingOutgoing.end(); ++i) {
 293     const ConnectionIdentifier& id = i->first;
 294     struct RemoteAddr remote;
 295     uint32_t len = sizeof(remote.addr);
 296     dmtcp_send_query_to_coordinator("Socket",
 297                                     (const void *)&id, (uint32_t) sizeof(id),
 298                                     &remote.addr, &len);
 299     remote.len = len;
 300     /*
 301     sockaddr_in *sn = (sockaddr_in*) &remote.addr;
 302     unsigned short port = htons(sn->sin_port);
 303     char *ip = inet_ntoa(sn->sin_addr);
 304     JTRACE("Send Queries. Get remote from coordinator:")(id)(sn->sin_family)(port)(ip);
 305     */
 306     _remoteInfo[id] = remote;
 307   }
 308 }
 309 
 310 #if 0
 311 void ConnectionRewirer::debugPrint() const
 312 {
 313 #ifdef DEBUG
 314   ostringstream o;
 315   o << "Pending Incoming:\n";
 316   const_iterator i;
 317   for (i = _pendingIncoming.begin(); i!=_pendingIncoming.end(); ++i) {
 318     Connection *con = i->second;
 319     o << i->first << " numFds=" << con->getFds().size()
 320       << " firstFd=" << con->getFds()[0] << '\n';
 321   }
 322   o << "Pending Outgoing:\n";
 323   for (i = _pendingOutgoing.begin(); i!=_pendingOutgoing.end(); ++i) {
 324     Connection *con = i->second;
 325     o << i->first << " numFds=" << con->getFds().size()
 326       << " firstFd=" << con->getFds()[0] << '\n';
 327   }
 328   JNOTE("Pending connections") (o.str());
 329 #endif
 330 }
 331 #endif

/* [<][>][^][v][top][bottom][index][help] */