root/plugin/ipc/socket/socketconnection.cpp

/* [<][>][^][v][top][bottom][index][help] */

DEFINITIONS

This source file includes following definitions.
  1. _makeDeadSocket
  2. _peerType
  3. addSetsockopt
  4. restoreSocketOptions
  5. serialize
  6. _remotePeerId
  7. asTcp
  8. getMPISpawnPortNum
  9. isBlacklistedTcp
  10. onBind
  11. onListen
  12. onConnect
  13. _remotePeerId
  14. onError
  15. drain
  16. doSendHandshakes
  17. doRecvHandshakes
  18. refill
  19. postRestart
  20. sendHandshake
  21. recvHandshake
  22. serializeSubClass
  23. SocketConnection
  24. drain
  25. refill
  26. postRestart
  27. serializeSubClass

   1 /****************************************************************************
   2  *   Copyright (C) 2006-2010 by Jason Ansel, Kapil Arya, and Gene Cooperman *
   3  *   jansel@csail.mit.edu, kapil@ccs.neu.edu, gene@ccs.neu.edu              *
   4  *                                                                          *
   5  *   This file is part of the dmtcp/src module of DMTCP (DMTCP:dmtcp/src).  *
   6  *                                                                          *
   7  *  DMTCP:dmtcp/src is free software: you can redistribute it and/or        *
   8  *  modify it under the terms of the GNU Lesser General Public License as   *
   9  *  published by the Free Software Foundation, either version 3 of the      *
  10  *  License, or (at your option) any later version.                         *
  11  *                                                                          *
  12  *  DMTCP:dmtcp/src is distributed in the hope that it will be useful,      *
  13  *  but WITHOUT ANY WARRANTY; without even the implied warranty of          *
  14  *  MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the           *
  15  *  GNU Lesser General Public License for more details.                     *
  16  *                                                                          *
  17  *  You should have received a copy of the GNU Lesser General Public        *
  18  *  License along with DMTCP:dmtcp/src.  If not, see                        *
  19  *  <http://www.gnu.org/licenses/>.                                         *
  20  ****************************************************************************/
  21 
  22 #include <sys/ioctl.h>
  23 #include <sys/select.h>
  24 #include <sys/un.h>
  25 #include <unistd.h>
  26 #include <fcntl.h>
  27 #include <linux/limits.h>
  28 #include <arpa/inet.h>
  29 
  30 #include "dmtcp.h"
  31 #include "shareddata.h"
  32 #include "util.h"
  33 #include "jsocket.h"
  34 #include "jassert.h"
  35 #include "jfilesystem.h"
  36 #include "jconvert.h"
  37 
  38 #include "socketconnection.h"
  39 #include "socketwrappers.h"
  40 #include "kernelbufferdrainer.h"
  41 #include "connectionrewirer.h"
  42 
  43 #ifdef REALLY_VERBOSE_CONNECTION_CPP
  44 static bool really_verbose = true;
  45 #else
  46 static bool really_verbose = false;
  47 #endif
  48 
  49 using namespace dmtcp;
  50 
  51 //this function creates a socket that is in an error state
  52 static int _makeDeadSocket(const char *refillData = NULL, ssize_t len = -1)
  53 {
  54   //it does it by creating a socket pair and closing one side
  55   int sp[2] = {-1,-1};
  56   JASSERT(_real_socketpair(AF_UNIX, SOCK_STREAM, 0, sp) == 0) (JASSERT_ERRNO)
  57     .Text("socketpair() failed");
  58   JASSERT(sp[0]>=0 && sp[1]>=0) (sp[0]) (sp[1])
  59     .Text("socketpair() failed");
  60   if (refillData != NULL) {
  61     JASSERT(Util::writeAll(sp[1], refillData, len) == len);
  62   }
  63   _real_close(sp[1]);
  64   if (really_verbose) {
  65     JTRACE("Created dead socket.") (sp[0]);
  66   }
  67   return sp[0];
  68 }
  69 
  70 SocketConnection::SocketConnection(int domain, int type, int protocol)
  71   : _sockDomain(domain)
  72   , _sockType(type)
  73   , _sockProtocol(protocol)
  74   , _peerType(PEER_UNKNOWN)
  75 { }
  76 
  77 void SocketConnection::addSetsockopt(int level, int option,
  78                                      const char* value, int len)
  79 {
  80   _sockOptions[level][option] = jalib::JBuffer(value, len);
  81 }
  82 
  83 void SocketConnection::restoreSocketOptions(vector<int>& fds)
  84 {
  85   typedef map<int64_t, map< int64_t, jalib::JBuffer> >::iterator levelIterator;
  86   typedef map<int64_t, jalib::JBuffer>::iterator optionIterator;
  87 
  88   for (levelIterator lvl = _sockOptions.begin();
  89        lvl!=_sockOptions.end(); ++lvl) {
  90     for (optionIterator opt = lvl->second.begin();
  91          opt!=lvl->second.end(); ++opt) {
  92       JTRACE("Restoring socket option.")
  93         (fds[0]) (opt->first) (opt->second.size());
  94       int ret = _real_setsockopt(fds[0], lvl->first, opt->first,
  95                                  opt->second.buffer(),
  96                                  opt->second.size());
  97       JASSERT(ret == 0) (JASSERT_ERRNO) (fds[0])
  98         (lvl->first) (opt->first) (opt->second.size())
  99         .Text("Restoring setsockopt failed.");
 100     }
 101   }
 102 }
 103 
 104 void SocketConnection::serialize(jalib::JBinarySerializer& o)
 105 {
 106   JSERIALIZE_ASSERT_POINT("SocketConnection");
 107   o & _sockDomain  & _sockType & _sockProtocol & _peerType;
 108 
 109   JSERIALIZE_ASSERT_POINT("SocketOptions:");
 110   uint64_t numSockOpts = _sockOptions.size();
 111   o & numSockOpts;
 112   if (o.isWriter()) {
 113     //JTRACE("TCP Serialize ") (_type) (_id.conId());
 114     typedef map< int64_t, map< int64_t, jalib::JBuffer > >::iterator levelIterator;
 115     typedef map< int64_t, jalib::JBuffer >::iterator optionIterator;
 116 
 117     uint64_t numLvl = _sockOptions.size();
 118     o & numLvl;
 119 
 120     for (levelIterator lvl = _sockOptions.begin();
 121          lvl!=_sockOptions.end(); ++lvl) {
 122       int64_t lvlVal = lvl->first;
 123       uint64_t numOpts = lvl->second.size();
 124 
 125       JSERIALIZE_ASSERT_POINT("Lvl");
 126 
 127       o & lvlVal & numOpts;
 128 
 129       for (optionIterator opt = lvl->second.begin();
 130            opt!=lvl->second.end(); ++opt) {
 131         int64_t optType = opt->first;
 132         jalib::JBuffer& buffer = opt->second;
 133         int64_t bufLen = buffer.size();
 134 
 135         JSERIALIZE_ASSERT_POINT("Opt");
 136 
 137         o & optType & bufLen;
 138         o.readOrWrite(buffer.buffer(), bufLen);
 139       }
 140     }
 141   } else {
 142     uint64_t numLvl = 0;
 143     o & numLvl;
 144 
 145     while (numLvl-- > 0) {
 146       int64_t lvlVal = -1;
 147       int64_t numOpts = 0;
 148 
 149       JSERIALIZE_ASSERT_POINT("Lvl");
 150 
 151       o & lvlVal & numOpts;
 152 
 153       while (numOpts-- > 0) {
 154         int64_t optType = -1;
 155         int64_t bufLen = -1;
 156 
 157         JSERIALIZE_ASSERT_POINT("Opt");
 158 
 159         o & optType & bufLen;
 160 
 161         jalib::JBuffer buffer(bufLen);
 162         o.readOrWrite(buffer.buffer(), bufLen);
 163 
 164         _sockOptions[lvlVal][optType]=buffer;
 165       }
 166     }
 167   }
 168 
 169   JSERIALIZE_ASSERT_POINT("EndSockOpts");
 170 }
 171 
 172 /*****************************************************************************
 173  * TCP Connection
 174  *****************************************************************************/
 175 
 176 /*onSocket*/
 177   TcpConnection::TcpConnection(int domain, int type, int protocol)
 178   : Connection(TCP_CREATED)
 179   , SocketConnection(domain, type, protocol)
 180   , _listenBacklog(-1)
 181   , _bindAddrlen(0)
 182   , _remotePeerId(ConnectionIdentifier::null())
 183 {
 184   if (domain != -1) {
 185     // Sometimes _sockType contains SOCK_CLOEXEC/SOCK_NONBLOCK flags.
 186     if ((type & 077) == SOCK_DGRAM) {
 187       JWARNING(false) (type)
 188         .Text("Datagram Sockets not supported. "
 189               "Hopefully, this is a short lived connection!");
 190     } else {
 191       JWARNING((domain == AF_INET || domain == AF_UNIX || domain == AF_INET6)
 192                && (type & 077) == SOCK_STREAM)
 193         (domain) (type) (protocol);
 194     }
 195     JTRACE("Creating TcpConnection.") (id()) (domain) (type) (protocol);
 196   }
 197   memset(&_bindAddr, 0, sizeof _bindAddr);
 198 }
 199 
 200 TcpConnection& TcpConnection::asTcp()
 201 {
 202   return *this;
 203 }
 204 
 205 #ifdef STAMPEDE_MPISPAWN_FIX
 206 static int
 207 getMPISpawnPortNum(const char* envVar)
 208 {
 209   /* PMI_PORT is of the form: "hostname:port" */
 210   char *temp = getenv(envVar);
 211   if (temp) {
 212     while (*temp && *temp != ':') *temp++;
 213     if (*temp == ':') return atoi(temp+1);
 214   }
 215   return 0;
 216 }
 217 #endif
 218 
 219 bool TcpConnection::isBlacklistedTcp(const sockaddr* saddr, socklen_t len)
 220 {
 221   JASSERT(saddr != NULL);
 222   if (len <= sizeof(saddr->sa_family)) {
 223     return false;
 224   }
 225 
 226   if (saddr->sa_family == AF_INET) {
 227     struct sockaddr_in* addr =(sockaddr_in*)saddr;
 228     // Ports 389 and 636 are the well-known ports in /etc/services that
 229     // are reserved for LDAP.  Bash continues to maintain a connection to
 230     // LDAP, leading to problems at restart time.  So, we discover the LDAP
 231     // remote addresses, and turn them into dead sockets at restart time.
 232     //However, libc.so:getpwuid() can call libnss_ldap.so which calls
 233     // libldap-2.4.so to create the LDAP socket while evading our connect
 234     // wrapper.
 235     int blacklistedRemotePorts[] = {53,                 // DNS Server
 236                                     389, 636,           // LDAP
 237                                     -1};
 238 #ifdef STAMPEDE_MPISPAWN_FIX
 239     int mpispawnPort = getMPISpawnPortNum("PMI_PORT");
 240     JTRACE("PMI_PORT port") (mpispawnPort) (ntohs(addr->sin_port));
 241     JASSERT(mpispawnPort != 0).Text("PMI_PORT not found");
 242     if (ntohs(addr->sin_port) == mpispawnPort) {
 243       JTRACE("PMI_PORT port found") (mpispawnPort);
 244       return true;
 245     }
 246 #endif
 247     for (size_t i = 0; blacklistedRemotePorts[i] != -1; i++) {
 248       if (ntohs(addr->sin_port) == blacklistedRemotePorts[i]) {
 249         JTRACE("LDAP port found") (ntohs(addr->sin_port))
 250           (blacklistedRemotePorts[0]) (blacklistedRemotePorts[1]);
 251         return true;
 252       }
 253     }
 254   } else if (saddr->sa_family == AF_UNIX) {
 255     struct sockaddr_un *uaddr = (struct sockaddr_un *) saddr;
 256     static string blacklist[] = {""};
 257     for (size_t i = 0; blacklist[i] != ""; i++) {
 258       if (Util::strStartsWith(uaddr->sun_path, blacklist[i].c_str()) ||
 259           Util::strStartsWith(&uaddr->sun_path[1], blacklist[i].c_str())) {
 260         JTRACE("Blacklisted socket address") (uaddr->sun_path);
 261         return true;
 262       }
 263     }
 264   }
 265 
 266   // FIXME:  Consider adding configure or dmtcp_launch option to disable
 267   //   all remote connections.  Handy as quick test for special cases.
 268   return false;
 269 }
 270 
 271 void TcpConnection::onBind(const struct sockaddr* addr, socklen_t len)
 272 {
 273   if (really_verbose) {
 274     JTRACE("Binding.") (id()) (len);
 275   }
 276 
 277   // If the bind succeeded, we do not need any additional assert.
 278   //JASSERT(_type == TCP_CREATED) (_type) (id())
 279   //  .Text("Binding a socket in use????");
 280 
 281   if (_sockDomain == AF_UNIX && addr != NULL) {
 282     JASSERT(len <= sizeof _bindAddr) (len) (sizeof _bindAddr)
 283       .Text("That is one huge sockaddr buddy.");
 284     _bindAddrlen = len;
 285     memcpy(&_bindAddr, addr, len);
 286   } else {
 287     _bindAddrlen = sizeof(_bindAddr);
 288     // Do not rely on the address passed on to bind as it may contain port 0
 289     // which allows the OS to give any unused port. Thus we look ourselves up
 290     // using getsockname.
 291     JASSERT(getsockname(_fds[0], (struct sockaddr *)&_bindAddr, &_bindAddrlen) == 0)
 292       (JASSERT_ERRNO);
 293   }
 294   _type = TCP_BIND;
 295 }
 296 
 297 void TcpConnection::onListen(int backlog)
 298 {
 299   /* The application didn't issue a bind() call; the kernel will assign
 300    * a random address in this case. Call the regular onBind() post-
 301    * processing function which will save the address returned by the kernel
 302    * and change the state of the socket connection to TCP_BIND.
 303    */
 304   if (_type == TCP_CREATED) {
 305     onBind(NULL, 0);
 306   }
 307 
 308   if (really_verbose) {
 309     JTRACE("Listening.") (id()) (backlog);
 310   }
 311   JASSERT(_type == TCP_BIND) (_type) (id())
 312     .Text("Listening on a non-bind()ed socket????");
 313   // A -1 backlog is not an error.
 314   //JASSERT(backlog > 0) (backlog)
 315   //.Text("That is an odd backlog????");
 316 
 317   _type = TCP_LISTEN;
 318   _listenBacklog = backlog;
 319 }
 320 
 321 void TcpConnection::onConnect(const struct sockaddr *addr, socklen_t len)
 322 {
 323   if (really_verbose) {
 324     JTRACE("Connecting.") (id());
 325   }
 326   JWARNING(_type == TCP_CREATED || _type == TCP_BIND) (_type) (id())
 327     .Text("Connecting with an in-use socket????");
 328 
 329   if (addr != NULL && isBlacklistedTcp(addr, len)) {
 330     _type = TCP_EXTERNAL_CONNECT;
 331     _connectAddrlen = len;
 332     memcpy(&_connectAddr, addr, len);
 333   } else {
 334     _type = TCP_CONNECT;
 335   }
 336 }
 337 
 338 /*onAccept*/
 339 TcpConnection::TcpConnection(const TcpConnection& parent,
 340                              const ConnectionIdentifier& remote)
 341   : Connection(TCP_ACCEPT)
 342   , SocketConnection(parent._sockDomain, parent._sockType, parent._sockProtocol)
 343   , _listenBacklog(-1)
 344   , _bindAddrlen(0)
 345   , _remotePeerId(remote)
 346 {
 347   if (really_verbose) {
 348     JTRACE("Accepting.") (id()) (parent.id()) (remote);
 349   }
 350 
 351   //     JASSERT(parent._type == TCP_LISTEN) (parent._type) (parent.id())
 352   //             .Text("Accepting from a non listening socket????");
 353   memset(&_bindAddr, 0, sizeof _bindAddr);
 354 }
 355 
 356 void TcpConnection::onError()
 357 {
 358   JTRACE("Error.") (id());
 359   _type = TCP_ERROR;
 360   JTRACE("Creating dead socket.") (_fds[0]) (_fds.size());
 361   const vector<char>& buffer =
 362     KernelBufferDrainer::instance().getDrainedData(_id);
 363   Util::dupFds(_makeDeadSocket(&buffer[0], buffer.size()), _fds);
 364 }
 365 
 366 void TcpConnection::drain()
 367 {
 368   JASSERT(_fds.size() > 0) (id());
 369 
 370   if ((_fcntlFlags & O_ASYNC) != 0) {
 371     if (really_verbose) {
 372       JTRACE("Removing O_ASYNC flag during checkpoint.") (_fds[0]) (id());
 373     }
 374     errno = 0;
 375     JASSERT(fcntl(_fds[0],F_SETFL,_fcntlFlags & ~O_ASYNC) == 0)
 376       (JASSERT_ERRNO) (_fds[0]) (id());
 377   }
 378 
 379   if (dmtcp_no_coordinator()) {
 380     markExternalConnect();
 381   }
 382 
 383   switch (_type) {
 384     case TCP_ERROR:
 385       // Treat TCP_ERROR as a regular socket for draining purposes. There still
 386       // might be some stale data on it.
 387     case TCP_CONNECT:
 388     case TCP_ACCEPT:
 389       JTRACE("Will drain socket") (_hasLock) (_fds[0]) (_id) (_remotePeerId);
 390       KernelBufferDrainer::instance().beginDrainOf(_fds[0], _id);
 391       break;
 392     case TCP_LISTEN:
 393       KernelBufferDrainer::instance().addListenSocket(_fds[0]);
 394       break;
 395     case TCP_BIND:
 396       JWARNING(_type != TCP_BIND) (_fds[0])
 397         .Text("If there are pending connections on this socket,\n"
 398               " they won't be checkpointed because"
 399               " it is not yet in a listen state.");
 400       break;
 401     case TCP_EXTERNAL_CONNECT:
 402       JTRACE("Socket to External Process, won't be drained") (_fds[0]);
 403       break;
 404   }
 405 }
 406 
 407 void TcpConnection::doSendHandshakes(const ConnectionIdentifier& coordId)
 408 {
 409   switch (_type) {
 410     case TCP_CONNECT:
 411     case TCP_ACCEPT:
 412       JTRACE("Sending handshake ...") (id()) (_fds[0]);
 413       sendHandshake(_fds[0], coordId);
 414       break;
 415     case TCP_EXTERNAL_CONNECT:
 416       JTRACE("Socket to External Process, skipping handshake send") (_fds[0]);
 417       break;
 418   }
 419 }
 420 
 421 void TcpConnection::doRecvHandshakes(const ConnectionIdentifier& coordId)
 422 {
 423   switch (_type) {
 424     case TCP_CONNECT:
 425     case TCP_ACCEPT:
 426       recvHandshake(_fds[0], coordId);
 427       JTRACE("Received handshake.") (id()) (_remotePeerId) (_fds[0]);
 428       break;
 429     case TCP_EXTERNAL_CONNECT:
 430       JTRACE("Socket to External Process, skipping handshake recv") (_fds[0]);
 431       break;
 432   }
 433 }
 434 
 435 void TcpConnection::refill(bool isRestart)
 436 {
 437   if ((_fcntlFlags & O_ASYNC) != 0) {
 438     JTRACE("Re-adding O_ASYNC flag.") (_fds[0]) (id());
 439     restoreSocketOptions(_fds);
 440   } else if (isRestart && _sockDomain != AF_INET6 &&
 441              _type != TCP_EXTERNAL_CONNECT) {
 442     restoreSocketOptions(_fds);
 443   }
 444 }
 445 
 446 void TcpConnection::postRestart()
 447 {
 448   int fd;
 449   JASSERT(_fds.size() > 0);
 450   switch (_type) {
 451     case TCP_PREEXISTING:
 452     case TCP_INVALID:
 453     case TCP_EXTERNAL_CONNECT:
 454       JTRACE("Creating dead socket.") (_fds[0]) (_fds.size());
 455       Util::dupFds(_makeDeadSocket(), _fds);
 456       break;
 457 
 458     case TCP_ERROR:
 459       // Disconnected socket. Need to refill the drained data
 460       {
 461         const vector<char>& buffer =
 462           KernelBufferDrainer::instance().getDrainedData(_id);
 463         Util::dupFds(_makeDeadSocket(&buffer[0], buffer.size()), _fds);
 464       }
 465       break;
 466 
 467     case TCP_CREATED:
 468     case TCP_BIND:
 469     case TCP_LISTEN:
 470       // Sometimes _sockType contains SOCK_CLOEXEC/SOCK_NONBLOCK flags.
 471       JWARNING((_sockDomain == AF_INET || _sockDomain == AF_UNIX ||
 472                 _sockDomain == AF_INET6) && (_sockType & 077) == SOCK_STREAM)
 473         (id()) (_sockDomain) (_sockType) (_sockProtocol)
 474         .Text("Socket type not yet [fully] supported.");
 475 
 476       if (really_verbose) {
 477         JTRACE("Restoring socket.") (id()) (_fds[0]);
 478       }
 479 
 480       fd = _real_socket(_sockDomain, _sockType, _sockProtocol);
 481       JASSERT(fd != -1) (JASSERT_ERRNO);
 482       Util::dupFds(fd, _fds);
 483 
 484       if (_type == TCP_CREATED) break;
 485 
 486       if (_sockDomain == AF_UNIX &&
 487           _bindAddrlen > sizeof(_bindAddr.ss_family)) {
 488         struct sockaddr_un *uaddr = (sockaddr_un*) &_bindAddr;
 489         if (uaddr->sun_path[0] != '\0') {
 490           JTRACE("Unlinking stale unix domain socket.") (uaddr->sun_path);
 491           JWARNING(unlink(uaddr->sun_path) == 0) (uaddr->sun_path);
 492         }
 493       }
 494 
 495       /*
 496        * During restart, some socket options must be restored(using
 497        * setsockopt) before the socket is used(bind etc.), otherwise we might
 498        * not be able to restore them at all. One such option is set in the
 499        * following way for IPV6 family:
 500        * setsockopt(sd, IPPROTO_IPV6, IPV6_V6ONLY,...)
 501        * This fix works for now. A better approach would be to restore the
 502        * socket options in the order in which they are set by the user
 503        * program.  This fix solves a bug that caused Open MPI to fail to
 504        * restart under DMTCP.
 505        *                               --Kapil
 506        */
 507 
 508       if (_sockDomain == AF_INET6) {
 509         JTRACE("Restoring some socket options before binding.");
 510         typedef map< int64_t,
 511                      map< int64_t, jalib::JBuffer > >::iterator levelIterator;
 512         typedef map< int64_t, jalib::JBuffer >::iterator optionIterator;
 513 
 514         for (levelIterator lvl = _sockOptions.begin();
 515              lvl!=_sockOptions.end(); ++lvl) {
 516           if (lvl->first == IPPROTO_IPV6) {
 517             for (optionIterator opt = lvl->second.begin();
 518                  opt!=lvl->second.end(); ++opt) {
 519               if (opt->first == IPV6_V6ONLY) {
 520                 if (really_verbose) {
 521                   JTRACE("Restoring socket option.")
 522                     (_fds[0]) (opt->first) (opt->second.size());
 523                 }
 524                 int ret = _real_setsockopt(_fds[0], lvl->first, opt->first,
 525                                            opt->second.buffer(),
 526                                            opt->second.size());
 527                 JASSERT(ret == 0) (JASSERT_ERRNO) (_fds[0]) (lvl->first)
 528                   (opt->first) (opt->second.buffer()) (opt->second.size())
 529                   .Text("Restoring setsockopt failed.");
 530               }
 531             }
 532           }
 533         }
 534       }
 535 
 536       if (really_verbose) {
 537         JTRACE("Binding socket.") (id());
 538       }
 539       errno = 0;
 540       JWARNING(_real_bind(_fds[0], (sockaddr*) &_bindAddr,_bindAddrlen) == 0)
 541         (JASSERT_ERRNO) (id()) .Text("Bind failed.");
 542       if (_type == TCP_BIND) break;
 543 
 544       if (really_verbose) {
 545         JTRACE("Listening socket.") (id());
 546       }
 547       errno = 0;
 548       JWARNING(_real_listen(_fds[0], _listenBacklog) == 0)
 549         (JASSERT_ERRNO) (id()) (_listenBacklog) .Text("listen failed.");
 550       if (_type == TCP_LISTEN) break;
 551 
 552       break;
 553 
 554     case TCP_ACCEPT:
 555       JASSERT(!_remotePeerId.isNull()) (id()) (_remotePeerId) (_fds[0])
 556         .Text("Can't restore a TCP_ACCEPT socket with null acceptRemoteId.\n"
 557               "  Perhaps handshake went wrong?");
 558 
 559       JTRACE("registerIncoming") (id()) (_remotePeerId) (_fds[0]);
 560       ConnectionRewirer::instance().registerIncoming(id(), this, _sockDomain);
 561       break;
 562 
 563     case TCP_CONNECT:
 564 #ifdef ENABLE_IP6_SUPPORT
 565       fd = _real_socket(_sockDomain, _sockType, _sockProtocol);
 566 #else
 567       fd = _real_socket(_sockDomain == AF_INET6 ? AF_INET : _sockDomain,
 568                         _sockType, _sockProtocol);
 569 #endif
 570       JASSERT(fd != -1) (JASSERT_ERRNO);
 571       Util::dupFds(fd, _fds);
 572       if (_bindAddrlen != 0) {
 573         JWARNING(_real_bind(_fds[0], (sockaddr*)&_bindAddr, _bindAddrlen) != -1)
 574           (JASSERT_ERRNO);
 575       }
 576       JTRACE("registerOutgoing") (id()) (_remotePeerId) (_fds[0]);
 577       ConnectionRewirer::instance().registerOutgoing(_remotePeerId, this);
 578       break;
 579       //    case TCP_EXTERNAL_CONNECT:
 580       //      int sockFd = _real_socket(_sockDomain, _sockType, _sockProtocol);
 581       //      JASSERT(sockFd >= 0);
 582       //      JASSERT(_real_dup2(sockFd, _fds[0]) == _fds[0]);
 583       //      JWARNING(0 == _real_connect(sockFd,(sockaddr*) &_connectAddr,
 584       //                                  _connectAddrlen))
 585       //        (_fds[0]) (JASSERT_ERRNO)
 586       //        .Text("Unable to connect to external process");
 587       //      break;
 588   }
 589 }
 590 
 591 void TcpConnection::sendHandshake(int remotefd,
 592                                   const ConnectionIdentifier& coordId)
 593 {
 594   jalib::JSocket remote(remotefd);
 595   ConnMsg msg(ConnMsg::HANDSHAKE);
 596   msg.from = id();
 597   msg.coordId = coordId;
 598   remote << msg;
 599 }
 600 
 601 void TcpConnection::recvHandshake(int remotefd,
 602                                   const ConnectionIdentifier& coordId)
 603 {
 604   jalib::JSocket remote(remotefd);
 605   ConnMsg msg;
 606   msg.poison();
 607   remote >> msg;
 608 
 609   msg.assertValid(ConnMsg::HANDSHAKE);
 610   JASSERT(msg.coordId == coordId) (msg.coordId) (coordId)
 611     .Text("Peer has a different dmtcp_coordinator than us!\n"
 612           "  It must be the same.");
 613 
 614   if (_remotePeerId.isNull()) {
 615     //first time
 616     _remotePeerId = msg.from;
 617     JASSERT(!_remotePeerId.isNull())
 618       .Text("Read handshake with invalid 'from' field.");
 619   } else {
 620     //next time
 621     JASSERT(_remotePeerId == msg.from)
 622       (_remotePeerId) (msg.from)
 623       .Text("Read handshake with a different 'from' field"
 624             " than a previous handshake.");
 625   }
 626 }
 627 
 628 void TcpConnection::serializeSubClass(jalib::JBinarySerializer& o)
 629 {
 630   JSERIALIZE_ASSERT_POINT("TcpConnection");
 631   o & _listenBacklog & _bindAddrlen & _bindAddr & _remotePeerId;
 632   SocketConnection::serialize(o);
 633 }
 634 
 635 /*****************************************************************************
 636  * RawSocket Connection
 637  *****************************************************************************/
 638 /*onSocket*/
 639 RawSocketConnection::RawSocketConnection(int domain, int type, int protocol)
 640   : Connection(RAW)
 641     , SocketConnection(domain, type, protocol)
 642 {
 643   JASSERT(type == -1 ||(type & SOCK_RAW));
 644   JASSERT(domain == -1 || domain == AF_NETLINK) (domain)
 645     .Text("Only Netlink raw socket supported");
 646   JTRACE("Creating Raw socket.") (id()) (domain) (type) (protocol);
 647 }
 648 
 649 void RawSocketConnection::drain()
 650 {
 651   JASSERT(_fds.size() > 0) (id());
 652 
 653   if ((_fcntlFlags & O_ASYNC) != 0) {
 654     if (really_verbose) {
 655       JTRACE("Removing O_ASYNC flag during checkpoint.") (_fds[0]) (id());
 656     }
 657     errno = 0;
 658     JASSERT(fcntl(_fds[0], F_SETFL, _fcntlFlags & ~O_ASYNC) == 0)
 659       (JASSERT_ERRNO) (_fds[0]) (id());
 660   }
 661 }
 662 
 663 void RawSocketConnection::refill(bool isRestart)
 664 {
 665   if ((_fcntlFlags & O_ASYNC) != 0) {
 666     JTRACE("Re-adding O_ASYNC flag.") (_fds[0]) (id());
 667     restoreSocketOptions(_fds);
 668   } else if (isRestart) {
 669     restoreSocketOptions(_fds);
 670   }
 671 }
 672 
 673 void RawSocketConnection::postRestart()
 674 {
 675   JASSERT(_fds.size() > 0);
 676   if (really_verbose) {
 677     JTRACE("Restoring socket.") (id()) (_fds[0]);
 678   }
 679 
 680   int sockfd = _real_socket(_sockDomain,_sockType,_sockProtocol);
 681   JASSERT(sockfd != -1);
 682   Util::dupFds(sockfd, _fds);
 683 }
 684 
 685 void RawSocketConnection::serializeSubClass(jalib::JBinarySerializer& o)
 686 {
 687   JSERIALIZE_ASSERT_POINT("RawSocketConnection");
 688   SocketConnection::serialize(o);
 689 }

/* [<][>][^][v][top][bottom][index][help] */