root/plugin/ipc/connectionlist.cpp

/* [<][>][^][v][top][bottom][index][help] */

DEFINITIONS

This source file includes the following definitions.
  1. eventHook
  2. _isBadFd
  3. resetOnFork
  4. deleteStaleConnections
  5. serialize
  6. list
  7. getConnection
  8. getConnection
  9. add
  10. processCloseWork
  11. processClose
  12. processDup
  13. preLockSaveOptions
  14. preCkptFdLeaderElection
  15. drain
  16. preCkpt
  17. refill
  18. resume
  19. postRestart
  20. registerIncomingCons
  21. sendReceiveMissingFds

   1 /****************************************************************************
   2  *   Copyright (C) 2006-2013 by Jason Ansel, Kapil Arya, and Gene Cooperman *
   3  *   jansel@csail.mit.edu, kapil@ccs.neu.edu, gene@ccs.neu.edu              *
   4  *                                                                          *
   5  *   This file is part of the dmtcp/src module of DMTCP (DMTCP:dmtcp/src).  *
   6  *                                                                          *
   7  *  DMTCP:dmtcp/src is free software: you can redistribute it and/or        *
   8  *  modify it under the terms of the GNU Lesser General Public License as   *
   9  *  published by the Free Software Foundation, either version 3 of the      *
  10  *  License, or (at your option) any later version.                         *
  11  *                                                                          *
  12  *  DMTCP:dmtcp/src is distributed in the hope that it will be useful,      *
  13  *  but WITHOUT ANY WARRANTY; without even the implied warranty of          *
  14  *  MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the           *
  15  *  GNU Lesser General Public License for more details.                     *
  16  *                                                                          *
  17  *  You should have received a copy of the GNU Lesser General Public        *
  18  *  License along with DMTCP:dmtcp/src.  If not, see                        *
  19  *  <http://www.gnu.org/licenses/>.                                         *
  20  ****************************************************************************/
  21 
  22 #include <fcntl.h>
  23 #include <unistd.h>
  24 #include <errno.h>
  25 #include <sys/stat.h>
  26 #include <sys/types.h>
  27 #include <sys/wait.h>
  28 
  29 #include "util.h"
  30 #include "dmtcp.h"
  31 #include "shareddata.h"
  32 #include "jfilesystem.h"
  33 #include "jconvert.h"
  34 #include "jassert.h"
  35 #include "jsocket.h"
  36 
  37 #include "util_ipc.h"
  38 #include "connection.h"
  39 #include "connectionlist.h"
  40 
  41 // Each fd may be shared or private.  If a fd is shared, this situation
  42 //  must be restored at restart time, and only one process should set
  43 //  the properties of that shared fd.  The sequence of events follows.
  44 
  45 // At the time of checkpoint, a leader election algorithm is used to decide
  46 //  on a unique process as leader, who will be responsible for restoring the
  47 //  state of a file descriptor (whether shared or not) at the time of restart.
  48 //  It is implemented through doLocking and checkLocking, in which each
  49 //  process uses fcntl() with SETOWN, and after a barrier, they check
  50 //  if they are still the owner using GETOWN.
  51 // A list of connections (fd's) is found through /proc/*/fd and saved
  52 //  in the ConnectionList object.
   53 // At the time of restart, con->hasLock() will tell a process if it is
   54 //  the leader for that connection (con).  However, the process must still
  55 //  discover if the file descriptor is private or shared.
   56 // We define a "private fd" as an fd that is held by exactly one process.
  57 //  This criterion is used in the implementation.
  58 // There is a list of outgoing connections (outgoingCons) and
  59 //  incoming connections (incomingCons).  These will refer to shared fd's.
  60 //  For the various processes on the same host, any shared fd corresponding
  61 //  to a connection has a 'con->hasLock()' method.  If it's true, this
  62 //  process owns the shared fd, and must send it as an outgoing connection.
  63 //  If it's false, this process does not own the shared fd, and it must
  64 //  receive it from another process.
  65 // The connections of the ConnectionList object are initially entered into
  66 //  a list of outgoing connections.  If a process sees a fd and it does not
  67 //  have the lock (if con->hasLock() == false), then the process knows that
  68 //  the fd must be shared.  So, the process declares this fd to be "incoming".
  69 //  Then registerIncomingCons() can separate the fd's into incoming and outgoing
  70 //  connections.  This is done by looking up the shared-area to see which
  71 //  connections have been declared as "incoming" by other processes.
  72 // A UNIX domain socket (called 'restoreFd' or 'protected_fd', depending
  73 //  on the function, but always deduced from protectedFd()) is used to
  74 //  send the outgoing connections, and to receive the incoming connections
  75 //  at restart time, so that the corresponding fd's can again be shared.
  76 // The owner of each outgoing connection sends the fd out, and for each
  77 //  process, if that connection is on its list of incoming connections
  78 //  (missing connections), then the fd is kept as a shared fd.
  79 
  80 using namespace dmtcp;
  81 
   82 // True for the first program after dmtcp_launch; eventHook() clears it on DMTCP_EVENT_POST_EXEC and only calls scanForPreExisting() (on DMTCP_EVENT_INIT) while it is set.
   83 static bool freshProcess = true;
  84 
  85 ConnectionList::~ConnectionList()
  86 {
  87 }
  88 
  89 void ConnectionList::eventHook(DmtcpEvent_t event,
  90                                          DmtcpEventData_t *data)
  91 {
  92   switch (event) {
  93     case DMTCP_EVENT_INIT:
  94       // Delete stale connections if any.
  95       deleteStaleConnections();
  96       if (freshProcess) {
  97         scanForPreExisting();
  98       }
  99       break;
 100 
 101     case DMTCP_EVENT_PRE_EXEC:
 102       {
 103         jalib::JBinarySerializeWriterRaw wr("", data->serializerInfo.fd);
 104         serialize(wr);
 105       }
 106       break;
 107 
 108     case DMTCP_EVENT_POST_EXEC:
 109       {
 110         freshProcess = false;
 111         jalib::JBinarySerializeReaderRaw rd("", data->serializerInfo.fd);
 112         serialize(rd);
 113         deleteStaleConnections();
 114       }
 115       break;
 116 
 117     case DMTCP_EVENT_RESTART:
 118       postRestart();
 119 
 120       break;
 121 
 122     case DMTCP_EVENT_THREADS_SUSPEND:
 123       preLockSaveOptions();
 124       break;
 125 
 126     case DMTCP_EVENT_LEADER_ELECTION:
 127       JTRACE("locking...");
 128       preCkptFdLeaderElection();
 129       JTRACE("locked");
 130       break;
 131 
 132     case DMTCP_EVENT_DRAIN:
 133       JTRACE("draining...");
 134       drain();
 135       JTRACE("drained");
 136       break;
 137 
 138     case DMTCP_EVENT_WRITE_CKPT:
 139       JTRACE("preCkpt...");
 140       preCkpt();
 141       JTRACE("done preCkpt");
 142       break;
 143 
 144     case DMTCP_EVENT_REFILL:
 145       refill(data->refillInfo.isRestart);
 146       break;
 147 
 148     case DMTCP_EVENT_THREADS_RESUME:
 149       resume(data->resumeInfo.isRestart);
 150       break;
 151 
 152     case DMTCP_EVENT_REGISTER_NAME_SERVICE_DATA:
 153       registerNSData(data->nameserviceInfo.isRestart);
 154       break;
 155 
 156     case DMTCP_EVENT_SEND_QUERIES:
 157       sendQueries(data->nameserviceInfo.isRestart);
 158       break;
 159 
 160     default:
 161       break;
 162   }
 163 
 164   return;
 165 }
 166 
 167 static bool _isBadFd(int fd)
 168 {
 169   errno = 0;
 170   return _real_fcntl(fd, F_GETFL, 0) == -1 && errno == EBADF;
 171 }
 172 
 173 //static ConnectionList *connectionList = NULL;
 174 //ConnectionList& ConnectionList::instance()
 175 //{
 176 //  if (connectionList == NULL) {
 177 //    connectionList = new ConnectionList();
 178 //  }
 179 //  return *connectionList;
 180 //}
 181 
 182 void ConnectionList::resetOnFork()
 183 {
 184   JASSERT(pthread_mutex_destroy(&_lock) == 0) (JASSERT_ERRNO);
 185   JASSERT(pthread_mutex_init(&_lock, NULL) == 0) (JASSERT_ERRNO);
 186 }
 187 
 188 
 189 void ConnectionList::deleteStaleConnections()
 190 {
 191   //build list of stale connections
 192   vector<int> staleFds;
 193   for (FdToConMapT::iterator i = _fdToCon.begin(); i != _fdToCon.end(); ++i) {
 194     if (_isBadFd(i->first)) {
 195       staleFds.push_back(i->first);
 196     }
 197   }
 198 
 199 #ifdef DEBUG
 200   if (staleFds.size() > 0) {
 201     ostringstream out;
 202     out << "\tDevice \t\t->\t File Descriptor -> ConnectionId\n";
 203     out << "==================================================\n";
 204     for (size_t i = 0; i < staleFds.size(); ++i) {
 205       Connection *c = getConnection(staleFds[i]);
 206 
 207       out << "\t[" << jalib::XToString(staleFds[i]) << "]"
 208           << c->str()
 209           << "\t->\t" << staleFds[i]
 210           << "\t->\t" << c->id() << "\n";
 211     }
 212     out << "==================================================\n";
 213     JTRACE("Deleting Stale Connections") (out.str());
 214   }
 215 #endif
 216 
 217   //delete all the stale connections
 218   for (size_t i = 0; i < staleFds.size(); ++i) {
 219     processClose(staleFds[i]);
 220   }
 221 }
 222 
 223 void ConnectionList::serialize(jalib::JBinarySerializer& o)
 224 {
 225   JSERIALIZE_ASSERT_POINT("dmtcp-serialized-connection-table!v0.07");
 226 
 227   JSERIALIZE_ASSERT_POINT("ConnectionIdentifier:");
 228   ConnectionIdentifier::serialize(o);
 229 
 230   JSERIALIZE_ASSERT_POINT("ConnectionList:");
 231 
 232   uint32_t numCons = _connections.size();
 233   o & numCons;
 234 
 235   if (o.isWriter()) {
 236     for (iterator i=_connections.begin(); i!=_connections.end(); ++i) {
 237       ConnectionIdentifier key = i->first;
 238       Connection& con = *i->second;
 239       uint32_t type = con.conType();
 240 
 241       JSERIALIZE_ASSERT_POINT("[StartConnection]");
 242       o & key & type;
 243       con.serialize(o);
 244       JSERIALIZE_ASSERT_POINT("[EndConnection]");
 245     }
 246   } else {
 247     while (numCons-- > 0) {
 248       ConnectionIdentifier key;
 249       int type = -1;
 250       Connection* con = NULL;
 251 
 252       JSERIALIZE_ASSERT_POINT("[StartConnection]");
 253       o & key & type;
 254       con = createDummyConnection(type);
 255       JASSERT(con != NULL) (key);
 256       con->serialize(o);
 257       _connections[key] = con;
 258       const vector<int32_t>& fds = con->getFds();
 259       for (size_t i = 0; i < fds.size(); i++) {
 260         _fdToCon[fds[i]] = con;
 261       }
 262       JSERIALIZE_ASSERT_POINT("[EndConnection]");
 263     }
 264   }
 265   JSERIALIZE_ASSERT_POINT("EOF");
 266 }
 267 
 268 void ConnectionList::list()
 269 {
 270   ostringstream o;
 271   o << "\n";
 272   for (iterator i = begin(); i != end(); i++) {
 273     Connection *c = i->second;
 274     vector<int> fds = c->getFds();
 275     for (size_t j = 0; j<fds.size(); j++) {
 276       o << fds[j];
 277       if (j < fds.size() - 1)
 278         o << "," ;
 279     }
 280     o << "\t" << i->first << "\t" << c->str();
 281     o << "\n";
 282   }
 283   JTRACE("ConnectionList") (dmtcp_get_uniquepid_str()) (o.str());
 284 }
 285 
 286 Connection*
 287 ConnectionList::getConnection(const ConnectionIdentifier& id)
 288 {
 289   if (_connections.find(id) == _connections.end()) {
 290     return NULL;
 291   }
 292   return _connections[id];
 293 }
 294 
 295 Connection *ConnectionList::getConnection(int fd)
 296 {
 297   if (_fdToCon.find(fd) == _fdToCon.end()) {
 298     return NULL;
 299   }
 300   return _fdToCon[fd];
 301 }
 302 
 303 void ConnectionList::add(int fd, Connection* c)
 304 {
 305   _lock_tbl();
 306 
 307   if (_fdToCon.find(fd) != _fdToCon.end()) {
 308     /* In ordinary situations, we never exercise this path since we already
 309      * capture close() and remove the connection. However, there is one
 310      * particular case where this assumption fails -- when gblic opens a socket
 311      * using socket() but closes it using the internal close_not_cancel() thus
 312      * bypassing our close wrapper. This behavior is observed when dealing with
 313      * getaddrinfo().
 314      */
 315     processCloseWork(fd);
 316   }
 317 
 318   if( _connections.find(c->id()) == _connections.end() )
 319     _connections[c->id()] = c;
 320   c->addFd(fd);
 321   _fdToCon[fd] = c;
 322   _unlock_tbl();
 323 }
 324 
 325 void ConnectionList::processCloseWork(int fd)
 326 {
 327   Connection *con = _fdToCon[fd];
 328   _fdToCon.erase(fd);
 329   con->removeFd(fd);
 330   if (con->numFds() == 0) {
 331     _connections.erase(con->id());
 332     delete con;
 333   }
 334 }
 335 
 336 void ConnectionList::processClose(int fd)
 337 {
 338   if (_fdToCon.find(fd) != _fdToCon.end()) {
 339     _lock_tbl();
 340     processCloseWork(fd);
 341     _unlock_tbl();
 342   }
 343 }
 344 
 345 void ConnectionList::processDup(int oldfd, int newfd)
 346 {
 347   if (oldfd == newfd) return;
 348   if (_fdToCon.find(newfd) != _fdToCon.end()) {
 349     processClose(newfd);
 350   }
 351 
 352   // Add only if the oldfd was already in the _fdToCon table.
 353   if (_fdToCon.find(oldfd) != _fdToCon.end()) {
 354     _lock_tbl();
 355     Connection *con = _fdToCon[oldfd];
 356     _fdToCon[newfd] = con;
 357     con->addFd(newfd);
 358     _unlock_tbl();
 359   }
 360 }
 361 
 362 /*****************************************************/
 363 /*****************************************************/
 364 /*****************************************************/
 365 /*****************************************************/
 366 /*****************************************************/
 367 /*****************************************************/
 368 /*****************************************************/
 369 
 370 void ConnectionList::preLockSaveOptions()
 371 {
 372   deleteStaleConnections();
 373   list();
 374   // Save Options for each Fd (We need to do it here instead of in
 375   // preCkptFdLeaderElection because we want to restore the correct owner
 376   // in refill).
 377   for (iterator i = begin(); i != end(); ++i) {
 378     Connection *con = i->second;
 379     con->saveOptions();
 380   }
 381 }
 382 
 383 void ConnectionList::preCkptFdLeaderElection()
 384 {
 385   deleteStaleConnections();
 386   for (iterator i = begin(); i != end(); ++i) {
 387     Connection *con = i->second;
 388     JASSERT(con->numFds() > 0);
 389     con->doLocking();
 390   }
 391 }
 392 
 393 void ConnectionList::drain()
 394 {
 395   for (iterator i = begin(); i != end(); ++i) {
 396     Connection* con =  i->second;
 397     con->checkLocking();
 398     if (con->hasLock()) {
 399       con->drain();
 400     }
 401   }
 402 }
 403 
 404 void ConnectionList::preCkpt()
 405 {
 406   for (iterator i = begin(); i != end(); ++i) {
 407     Connection* con =  i->second;
 408     if (con->hasLock()) {
 409       con->preCkpt();
 410     }
 411   }
 412 }
 413 
 414 void ConnectionList::refill(bool isRestart)
 415 {
 416   for (iterator i = begin(); i != end(); ++i) {
 417     Connection *con = i->second;
 418     if (con->hasLock()) {
 419       con->refill(isRestart);
 420       con->restoreOptions();
 421     }
 422   }
 423   if (isRestart) {
 424     JTRACE("Waiting for Missing Cons");
 425     sendReceiveMissingFds();
 426     JTRACE("Done waiting for Missing Cons");
 427   }
 428 }
 429 
 430 void ConnectionList::resume(bool isRestart)
 431 {
 432   for (iterator i = begin(); i != end(); ++i) {
 433     Connection *con = i->second;
 434     if (con->hasLock()) {
 435       con->resume(isRestart);
 436     }
 437   }
 438 }
 439 
 440 void ConnectionList::postRestart()
 441 {
 442   // Here we modify the restore algorithm by splitting it into two parts. In the
 443   // first part we restore all the connections except the PTY_SLAVE types and
 444   // in the second part we restore only PTY_SLAVE _connections. This is done to
 445   // make sure that by the time we are trying to restore a PTY_SLAVE
 446   // connection, its corresponding PTY_MASTER connection has already been
 447   // restored.
 448   // UPDATE: We also restore the files for which the we didn't have the lock in
 449   //         second iteration along with PTY_SLAVEs
 450   // Part 1: Restore all but Pseudo-terminal slaves and file connection which
 451   //         were not checkpointed
 452   for (iterator i = begin(); i != end(); ++i) {
 453     Connection *con = i->second;
 454     if (!con->hasLock()) continue;
 455 
 456 // TODO: FIXME: Add support for Socketpairs.
 457 //    if (con->conType() == Connection::TCP) {
 458 //      TcpConnection *tcpCon =(TcpConnection *) con;
 459 //      if (tcpCon->peerType() == TcpConnection::PEER_SOCKETPAIR) {
 460 //        ConnectionIdentifier peerId = tcpCon->getSocketpairPeerId();
 461 //        TcpConnection *peerCon = (TcpConnection*) getConnection(peerId);
 462 //        if (peerCon != NULL) {
 463 //          tcpCon->restoreSocketPair(peerCon);
 464 //          continue;
 465 //        }
 466 //      }
 467 //    }
 468     con->postRestart();
 469   }
 470 
 471   registerIncomingCons();
 472 }
 473 
 474 
 475 void ConnectionList::registerIncomingCons()
 476 {
 477   int protected_fd = protectedFd();
 478   // Add receive-fd data socket.
 479   static struct sockaddr_un fdReceiveAddr;
 480   static socklen_t         fdReceiveAddrLen;
 481 
 482   memset(&fdReceiveAddr, 0, sizeof(fdReceiveAddr));
 483   jalib::JSocket sock(_real_socket(AF_UNIX, SOCK_DGRAM, 0));
 484   JASSERT(sock.isValid());
 485   sock.changeFd(protected_fd);
 486   fdReceiveAddr.sun_family = AF_UNIX;
 487   JASSERT(_real_bind(protected_fd,
 488                      (struct sockaddr*) &fdReceiveAddr,
 489                      sizeof(fdReceiveAddr.sun_family)) == 0) (JASSERT_ERRNO);
 490 
 491   fdReceiveAddrLen = sizeof(fdReceiveAddr);
 492   JASSERT(getsockname(protected_fd,
 493                       (struct sockaddr *)&fdReceiveAddr,
 494                       &fdReceiveAddrLen) == 0);
 495 
 496 
 497   vector<const char *> incomingCons;
 498   ostringstream in, out;
 499   for (iterator i = begin(); i != end(); ++i) {
 500     Connection *con = i->second;
 501     // Check comments in FileConnList::postRestart() for the explanation
 502     // about isPreExistingCTTY.
 503     if (!con->hasLock() && !con->isStdio() && !con->isPreExistingCTTY()) {
 504       incomingCons.push_back((const char*)&i->first);
 505       in << "\n\t" << con->str() << i->first;
 506     } else {
 507       out << "\n\t" << con->str() << i->first;
 508     }
 509   }
 510   JTRACE("Incoming/Outgoing Cons") (in.str()) (out.str());
 511   numIncomingCons = incomingCons.size();
 512   if (numIncomingCons > 0) {
 513     SharedData::registerIncomingCons(incomingCons, fdReceiveAddr,
 514                                     fdReceiveAddrLen);
 515   }
 516 }
 517 
 518 void ConnectionList::sendReceiveMissingFds()
 519 {
 520   size_t i;
 521   vector<int> outgoingCons;
 522   SharedData::IncomingConMap *maps;
 523   uint32_t nmaps;
 524   SharedData::getMissingConMaps(&maps, &nmaps);
 525   for (i = 0; i < nmaps; i++) {
 526     ConnectionIdentifier *id = (ConnectionIdentifier*) maps[i].id;
 527     Connection *con = getConnection(*id);
 528     if (con != NULL && con->hasLock()) {
 529       outgoingCons.push_back(i);
 530     }
 531   }
 532 
 533   fd_set rfds;
 534   fd_set wfds;
 535   int restoreFd = protectedFd();
 536   size_t numOutgoingCons = outgoingCons.size();
 537   while (numOutgoingCons > 0 || numIncomingCons > 0) {
 538     FD_ZERO(&wfds);
 539     if (outgoingCons.size() > 0) {
 540       FD_SET(restoreFd, &wfds);
 541     }
 542     FD_ZERO(&rfds);
 543     if (numIncomingCons > 0) {
 544       FD_SET(restoreFd, &rfds);
 545     }
 546 
 547     int ret = _real_select(restoreFd+1, &rfds, &wfds, NULL, NULL);
 548     JASSERT(ret != -1) (JASSERT_ERRNO);
 549 
 550     if (numOutgoingCons > 0 && FD_ISSET(restoreFd, &wfds)) {
 551       size_t idx = outgoingCons.back();
 552       outgoingCons.pop_back();
 553       ConnectionIdentifier *id = (ConnectionIdentifier*) maps[idx].id;
 554       Connection *con = getConnection(*id);
 555       JTRACE("Sending Missing Con") (*id);
 556       JASSERT(Util::sendFd(restoreFd, con->getFds()[0], id, sizeof(*id),
 557                            maps[idx].addr, maps[idx].len) != -1);
 558       numOutgoingCons--;
 559     }
 560 
 561     if (numIncomingCons > 0 && FD_ISSET(restoreFd, &rfds)) {
 562       ConnectionIdentifier id;
 563       int fd = Util::receiveFd(restoreFd, &id, sizeof(id));
 564       JASSERT(fd != -1);
 565       Connection *con = getConnection(id);
 566       JTRACE("Received Missing Con") (id);
 567       JASSERT(con != NULL);
 568       Util::dupFds(fd, con->getFds());
 569       numIncomingCons--;
 570     }
 571   }
 572   dmtcp_close_protected_fd(restoreFd);
 573 }

/* [<][>][^][v][top][bottom][index][help] */