root/plugin/ipc/socket/socketconnlist.cpp

/* [<][>][^][v][top][bottom][index][help] */

DEFINITIONS

This source file includes the following definitions.
  1. dmtcp_SocketConnList_EventHook
  2. dmtcp_SocketConn_ProcessFdEvent
  3. instance
  4. drain
  5. preCkpt
  6. postRestart
  7. registerNSData
  8. sendQueries
  9. refill
  10. scanForPreExisting
  11. createDummyConnection

   1 
   2 #include <unistd.h>
   3 #include <sys/syscall.h>
   4 
   5 #include "util.h"
   6 #include "protectedfds.h"
   7 #include "jfilesystem.h"
   8 #include "socketconnection.h"
   9 #include "socketconnlist.h"
  10 #include "kernelbufferdrainer.h"
  11 #include "connectionrewirer.h"
  12 
  13 using namespace dmtcp;
  14 static bool _hasIPv4Sock = false;
  15 static bool _hasIPv6Sock = false;
  16 static bool _hasUNIXSock = false;
  17 
  18 void dmtcp_SocketConnList_EventHook(DmtcpEvent_t event, DmtcpEventData_t *data)
  19 {
  20   SocketConnList::instance().eventHook(event, data);
  21 }
  22 
  23 void dmtcp_SocketConn_ProcessFdEvent(int event, int arg1, int arg2)
  24 {
  25   if (event == SYS_close) {
  26     SocketConnList::instance().processClose(arg1);
  27   } else if (event == SYS_dup) {
  28     SocketConnList::instance().processDup(arg1, arg2);
  29   } else {
  30     JASSERT(false);
  31   }
  32 }
  33 
  34 static SocketConnList *socketConnList = NULL;
  35 SocketConnList& SocketConnList::instance()
  36 {
  37   if (socketConnList == NULL) {
  38     socketConnList = new SocketConnList();
  39   }
  40   return *socketConnList;
  41 }
  42 
  43 void SocketConnList::drain()
  44 {
  45   // First, let all the Connection prepare for drain
  46   ConnectionList::drain();
  47 
  48   //this will block until draining is complete
  49   KernelBufferDrainer::instance().monitorSockets(DRAINER_CHECK_FREQ);
  50   //handle disconnected sockets
  51   const map<ConnectionIdentifier, vector<char> >& discn =
  52     KernelBufferDrainer::instance().getDisconnectedSockets();
  53   map<ConnectionIdentifier, vector<char> >::const_iterator it;
  54   for (it = discn.begin(); it != discn.end(); it++) {
  55     const ConnectionIdentifier& id = it->first;
  56     TcpConnection *con =
  57       (TcpConnection*) SocketConnList::instance().getConnection(id);
  58     JTRACE("recreating disconnected socket") (id);
  59 
  60     //reading from the socket, and taking the error, resulted in an
  61     //implicit close().
  62     //we will create a new, broken socket that is not closed
  63     con->onError();
  64   }
  65 }
  66 
  67 void SocketConnList::preCkpt()
  68 {
  69 #if HANDSHAKE_ON_CHECKPOINT == 1
  70   //handshake is done after one barrier after drain
  71   JTRACE("beginning handshakes");
  72   DmtcpUniqueProcessId coordId = dmtcp_get_coord_id();
  73   //must send first to avoid deadlock
  74   //we are relying on OS buffers holding our message without blocking
  75   for (iterator i = begin(); i != end(); ++i) {
  76     Connection *con = i->second;
  77     if (con->hasLock() && con->conType() == Connection::TCP) {
  78       ((TcpConnection*)con)->doSendHandshakes(coordId);
  79     }
  80   }
  81 
  82   //now receive
  83   for (iterator i = begin(); i != end(); ++i) {
  84     Connection *con = i->second;
  85     if (con->hasLock() && con->conType() == Connection::TCP) {
  86       ((TcpConnection*)con)->doRecvHandshakes(coordId);
  87     }
  88   }
  89   JTRACE("handshaking done");
  90 #endif
  91   _hasIPv4Sock = _hasIPv6Sock = _hasUNIXSock = false;
  92   // Now check if we have IPv4, IPv6, or UNIX domain sockets to restore.
  93   for (iterator i = begin(); i != end(); ++i) {
  94     Connection *con = i->second;
  95     if (con->hasLock() && con->conType() == Connection::TCP) {
  96       int domain = ((TcpConnection*)con)->sockDomain();
  97       if (domain == AF_INET) {
  98         _hasIPv4Sock = true;
  99       } else if (domain == AF_INET6) {
 100         _hasIPv6Sock = true;
 101       } else if (domain == AF_UNIX) {
 102         _hasUNIXSock = true;
 103       }
 104     }
 105   }
 106 }
 107 
 108 void SocketConnList::postRestart()
 109 {
 110   ConnectionRewirer::instance().openRestoreSocket(_hasIPv4Sock, _hasIPv6Sock,
 111                                                   _hasUNIXSock);
 112   ConnectionList::postRestart();
 113 }
 114 
 115 void SocketConnList::registerNSData(bool isRestart)
 116 {
 117   if (isRestart) {
 118     ConnectionRewirer::instance().registerNSData();
 119   }
 120   ConnectionList::registerNSData(isRestart);
 121 }
 122 
 123 void SocketConnList::sendQueries(bool isRestart)
 124 {
 125   if (isRestart) {
 126     ConnectionRewirer::instance().sendQueries();
 127     ConnectionRewirer::instance().doReconnect();
 128     ConnectionRewirer::destroy();
 129   }
 130   ConnectionList::sendQueries(isRestart);
 131 }
 132 
 133 void SocketConnList::refill(bool isRestart)
 134 {
 135   KernelBufferDrainer::instance().refillAllSockets();
 136   ConnectionList::refill(isRestart);
 137 }
 138 
 139 void SocketConnList::scanForPreExisting()
 140 {
 141   // TODO: This is a hack when SLURM + MPI are used:
 142   // when we use command
 143   // srun/ibrun dmtcp_launch a.out
 144   // inside the SLURM submission script, the MPI launching
 145   // process will not run under the control of DMTCP. Instead,
 146   // only the computing processes are. The launching process
 147   // will create some sockets, and then create the computing
 148   // processes. Hence the sockets are shared among the created
 149   // processes at the time when dmtcp_launch is launched. DMTCP
 150   // will treat these sockets as pre-existing sockets instead of
 151   // shared sockets.
 152   //
 153   // In the future, we should generalize the processing of
 154   // pre-existing fds. For example, at checkpoint time, determine
 155   // which sockets are shared, regardless of whether they are
 156   // pre-existing or not. This can be done by adding an extra round
 157   // of leader election.
 158 
 159   if (getenv("SLURM_JOBID") || (getenv("SLURM_JOB_ID"))) {
 160     return;
 161   }
 162 
 163   // FIXME: Detect stdin/out/err fds to detect duplicates.
 164   vector<int> fds = jalib::Filesystem::ListOpenFds();
 165   for (size_t i = 0; i < fds.size(); ++i) {
 166     int fd = fds[i];
 167     if (!Util::isValidFd(fd)) continue;
 168     if (dmtcp_is_protected_fd(fd)) continue;
 169 
 170     string device = jalib::Filesystem::GetDeviceName(fd);
 171 
 172     JTRACE("scanning pre-existing device") (fd) (device);
 173     if (device == jalib::Filesystem::GetControllingTerm()) {
 174     } else if(dmtcp_is_bq_file && dmtcp_is_bq_file(device.c_str())) {
 175     } else if( fd <= 2 ){
 176     } else if (Util::strStartsWith(device, "/")) {
 177     } else {
 178       JNOTE("found pre-existing socket... will not be restored")
 179         (fd) (device);
 180       TcpConnection* con = new TcpConnection(0, 0, 0);
 181       con->markPreExisting();
 182       add(fd, con);
 183     }
 184   }
 185 }
 186 
 187 Connection *SocketConnList::createDummyConnection(int type)
 188 {
 189   if (type == Connection::TCP) {
 190     return new TcpConnection();
 191   } else if (type == Connection::RAW){
 192     return new RawSocketConnection();
 193   }
 194   return NULL;
 195 }

/* [<][>][^][v][top][bottom][index][help] */