root/coordinatorapi.cpp

/* [<][>][^][v][top][bottom][index][help] */

DEFINITIONS

This source file includes the following definitions.
  1. dmtcp_CoordinatorAPI_EventHook
  2. restart
  3. getCkptInterval
  4. createNewSocketToCoordinator
  5. instance
  6. init
  7. resetOnFork
  8. setupVirtualCoordinator
  9. waitForCheckpointCommand
  10. noCoordinator
  11. connectAndSendUserCommand
  12. getCoordCkptDir
  13. updateCoordCkptDir
  14. sendMsgToCoordinator
  15. recvMsgFromCoordinator
  16. startNewCoordinator
  17. createNewConnToCoord
  18. sendRecvHandshake
  19. connectToCoordOnStartup
  20. createNewConnectionBeforeFork
  21. connectToCoordOnRestart
  22. sendCkptFilename
  23. sendKeyValPairToCoordinator
  24. sendQueryToCoordinator

   1 /****************************************************************************
   2  *   Copyright (C) 2006-2013 by Jason Ansel, Kapil Arya, and Gene Cooperman *
   3  *   jansel@csail.mit.edu, kapil@ccs.neu.edu, gene@ccs.neu.edu              *
   4  *                                                                          *
   5  *  This file is part of DMTCP.                                             *
   6  *                                                                          *
   7  *  DMTCP is free software: you can redistribute it and/or                  *
   8  *  modify it under the terms of the GNU Lesser General Public License as   *
   9  *  published by the Free Software Foundation, either version 3 of the      *
  10  *  License, or (at your option) any later version.                         *
  11  *                                                                          *
  12  *  DMTCP is distributed in the hope that it will be useful,                *
  13  *  but WITHOUT ANY WARRANTY; without even the implied warranty of          *
  14  *  MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the           *
  15  *  GNU Lesser General Public License for more details.                     *
  16  *                                                                          *
  17  *  You should have received a copy of the GNU Lesser General Public        *
  18  *  License along with DMTCP:dmtcp/src.  If not, see                        *
  19  *  <http://www.gnu.org/licenses/>.                                         *
  20  ****************************************************************************/
  21 
  22 // CAN REMOVE BOOL enableCheckpointing ARG OF DmtcpWorker WHEN WE'RE DONE.
  23 // DmtcpWorker CAN INHERIT THIS CLASS, CoordinatorAPI
  24 
  25 #include <netdb.h>
  26 #include <arpa/inet.h>
  27 #include "coordinatorapi.h"
  28 #include "dmtcp.h"
  29 #include "util.h"
  30 #include "syscallwrappers.h"
  31 #include "util.h"
  32 #include "shareddata.h"
  33 #include "processinfo.h"
  34 #include  "../jalib/jconvert.h"
  35 #include  "../jalib/jfilesystem.h"
  36 #include <fcntl.h>
  37 
  38 using namespace dmtcp;
  39 
  40 void dmtcp_CoordinatorAPI_EventHook(DmtcpEvent_t event, DmtcpEventData_t *data)
  41 {
  42   if (CoordinatorAPI::noCoordinator()) return;
  43   switch (event) {
  44     case DMTCP_EVENT_INIT:
  45       CoordinatorAPI::instance().init();
  46       break;
  47 
  48     case DMTCP_EVENT_THREADS_SUSPEND:
  49       JASSERT(CoordinatorAPI::instance().isValid());
  50       break;
  51 
  52     case DMTCP_EVENT_RESTART:
  53       CoordinatorAPI::restart();
  54       break;
  55 
  56     case DMTCP_EVENT_RESUME:
  57       CoordinatorAPI::instance().sendCkptFilename();
  58       break;
  59 
  60     case DMTCP_EVENT_EXIT:
  61       JTRACE("exit() in progress, disconnecting from dmtcp coordinator");
  62       CoordinatorAPI::instance().closeConnection();
  63       break;
  64 
  65     default:
  66       break;
  67   }
  68 }
  69 
  70 void CoordinatorAPI::restart()
  71 {
  72   instance()._nsSock.close();
  73 }
  74 
  75 static uint32_t getCkptInterval()
  76 {
  77   uint32_t ret = DMTCPMESSAGE_SAME_CKPT_INTERVAL;
  78   const char* interval = getenv (ENV_VAR_CKPT_INTR);
  79   /* DmtcpMessage constructor default:
  80    *   hello_local.theCheckpointInterval: DMTCPMESSAGE_SAME_CKPT_INTERVAL
  81    */
  82   if (interval != NULL) {
  83     ret = jalib::StringToInt (interval);
  84   }
  85   // Tell the coordinator the ckpt interval only once.  It can change later.
  86   _dmtcp_unsetenv (ENV_VAR_CKPT_INTR);
  87   return ret;
  88 }
  89 
  90 static jalib::JSocket createNewSocketToCoordinator(CoordinatorMode mode)
  91 {
  92   const char*host = NULL;
  93   int port = UNINITIALIZED_PORT;
  94 
  95   Util::getCoordHostAndPort(COORD_ANY, &host, &port);
  96   return jalib::JClientSocket(host, port);
  97 }
  98 
  99 //CoordinatorAPI::CoordinatorAPI (int sockfd)
 100   //: _coordinatorSocket(sockfd)
 101 //{ }
 102 
 103 static CoordinatorAPI *coordAPIInst = NULL;
 104 CoordinatorAPI& CoordinatorAPI::instance()
 105 {
 106   //static SysVIPC *inst = new SysVIPC(); return *inst;
 107   if (coordAPIInst == NULL) {
 108     coordAPIInst = new CoordinatorAPI();
 109     if (noCoordinator()) {
 110       coordAPIInst->_coordinatorSocket = jalib::JSocket(PROTECTED_COORD_FD);
 111     }
 112   }
 113   return *coordAPIInst;
 114 }
 115 
 116 void CoordinatorAPI::init()
 117 {
 118   JTRACE("Informing coordinator of new process") (UniquePid::ThisProcess());
 119 
 120   DmtcpMessage msg (DMT_UPDATE_PROCESS_INFO_AFTER_INIT_OR_EXEC);
 121   string progname = jalib::Filesystem::GetProgramName();
 122   msg.extraBytes = progname.length() + 1;
 123 
 124   JASSERT(Util::isValidFd(PROTECTED_COORD_FD));
 125   instance()._coordinatorSocket = jalib::JSocket(PROTECTED_COORD_FD);
 126   instance()._coordinatorSocket << msg;
 127   instance()._coordinatorSocket.writeAll(progname.c_str(),
 128                                          progname.length() + 1);
 129   // The coordinator won't send any msg in response to DMT_UPDATE... so no need
 130   // to call recvCoordinatorHandshake().
 131 }
 132 
 133 void CoordinatorAPI::resetOnFork(CoordinatorAPI& coordAPI)
 134 {
 135   JASSERT(coordAPI._coordinatorSocket.isValid());
 136   JASSERT(coordAPI._coordinatorSocket.sockfd() != PROTECTED_COORD_FD);
 137   instance() = coordAPI;
 138   instance()._coordinatorSocket.changeFd(PROTECTED_COORD_FD);
 139 
 140   JTRACE("Informing coordinator of new process") (UniquePid::ThisProcess());
 141 
 142   DmtcpMessage msg (DMT_UPDATE_PROCESS_INFO_AFTER_FORK);
 143   if (dmtcp_virtual_to_real_pid) {
 144     msg.realPid = dmtcp_virtual_to_real_pid(getpid());
 145   } else {
 146     msg.realPid = getpid();
 147   }
 148   instance()._coordinatorSocket << msg;
 149   // The coordinator won't send any msg in response to DMT_UPDATE... so no need
 150   // to call recvCoordinatorHandshake().
 151   instance()._nsSock.close();
 152 }
 153 
 154 // FIXME:  Does "virtual coordinator" mean coordinator built into the
 155 //         the current process (no separate process?)
 156 void CoordinatorAPI::setupVirtualCoordinator(CoordinatorInfo *coordInfo,
 157                                              struct in_addr  *localIP)
 158 {
 159   const char *host = NULL;
 160   int port;
 161   Util::getCoordHostAndPort(COORD_NONE, &host, &port);
 162   _coordinatorSocket = jalib::JServerSocket(jalib::JSockAddr::ANY, port);
 163   JASSERT(_coordinatorSocket.isValid()) (port) (JASSERT_ERRNO)
 164     .Text("Failed to create listen socket.");
 165   _coordinatorSocket.changeFd(PROTECTED_COORD_FD);
 166   Util::setCoordPort(_coordinatorSocket.port());
 167 
 168   pid_t ppid = getppid();
 169   Util::setVirtualPidEnvVar(INITIAL_VIRTUAL_PID, ppid, ppid);
 170 
 171   UniquePid coordId = UniquePid(INITIAL_VIRTUAL_PID,
 172                                 UniquePid::ThisProcess().hostid(),
 173                                 UniquePid::ThisProcess().time());
 174 
 175   coordInfo->id = coordId.upid();
 176   coordInfo->timeStamp = coordId.time();
 177   coordInfo->addrLen = 0;
 178   if (getenv(ENV_VAR_CKPT_INTR) != NULL) {
 179     coordInfo->interval = (uint32_t) strtol(getenv(ENV_VAR_CKPT_INTR), NULL, 0);
 180   } else {
 181     coordInfo->interval = 0;
 182   }
 183   memset(&coordInfo->addr, 0, sizeof(coordInfo->addr));
 184   memset(localIP, 0, sizeof(*localIP));
 185 }
 186 
 187 void CoordinatorAPI::waitForCheckpointCommand()
 188 {
 189   uint32_t ckptInterval = SharedData::getCkptInterval();
 190   struct timeval tmptime={0,0};
 191   long remaining = ckptInterval;
 192   do {
 193     fd_set rfds;
 194     struct timeval *timeout = NULL;
 195     struct timeval start;
 196     if (ckptInterval > 0) {
 197       timeout = &tmptime;
 198       timeout->tv_sec = remaining;
 199       JASSERT(gettimeofday(&start, NULL) == 0) (JASSERT_ERRNO);
 200     }
 201     FD_ZERO(&rfds);
 202     FD_SET(PROTECTED_COORD_FD, &rfds );
 203     int retval = select(PROTECTED_COORD_FD+1, &rfds, NULL, NULL, timeout);
 204     if (retval == 0) { // timeout expired, time for checkpoint
 205       JTRACE("Timeout expired, checkpointing now.");
 206       return;
 207     } else if (retval > 0) {
 208       JASSERT(FD_ISSET(PROTECTED_COORD_FD, &rfds));
 209       JTRACE("Connect request on virtual coordinator socket.");
 210       break;
 211     }
 212     JASSERT(errno == EINTR) (JASSERT_ERRNO); /* EINTR: a signal was caught */
 213     if (ckptInterval > 0) {
 214       struct timeval end;
 215       JASSERT(gettimeofday(&end, NULL) == 0) (JASSERT_ERRNO);
 216       remaining -= end.tv_sec - start.tv_sec;
 217       // If the remaining time is negative, we can checkpoint now
 218       if (remaining < 0) {
 219         return;
 220       }
 221     }
 222   } while (remaining > 0);
 223 
 224   jalib::JSocket cmdSock(-1);
 225   DmtcpMessage msg;
 226   DmtcpMessage reply(DMT_USER_CMD_RESULT);
 227   do {
 228     cmdSock.close();
 229     cmdSock = _coordinatorSocket.accept();
 230     msg.poison();
 231     JTRACE("Reading from incoming connection...");
 232     cmdSock >> msg;
 233   } while (!cmdSock.isValid());
 234 
 235   JASSERT(msg.type == DMT_USER_CMD) (msg.type)
 236     .Text("Unexpected connection.");
 237 
 238   reply.coordCmdStatus = CoordCmdStatus::NOERROR;
 239 
 240   bool exitWhenDone = false;
 241   switch (msg.coordCmd) {
 242 //    case 'b': case 'B':  // prefix blocking command, prior to checkpoint command
 243 //      JTRACE("blocking checkpoint beginning...");
 244 //      blockUntilDone = true;
 245 //      break;
 246     case 's': case 'S':
 247       JTRACE("Received status command");
 248       reply.numPeers = 1;
 249       reply.isRunning = 1;
 250       break;
 251     case 'c': case 'C':
 252       JTRACE("checkpointing...");
 253       break;
 254     case 'k': case 'K':
 255     case 'q': case 'Q':
 256       JTRACE("Received KILL command from user, exiting");
 257       exitWhenDone = true;
 258       break;
 259     default:
 260       JTRACE("unhandled user command") (msg.coordCmd);
 261       reply.coordCmdStatus = CoordCmdStatus::ERROR_INVALID_COMMAND;
 262   }
 263   cmdSock << reply;
 264   cmdSock.close();
 265   if (exitWhenDone) {
 266     _real_exit(0);
 267   }
 268   return;
 269 }
 270 
 271 bool CoordinatorAPI::noCoordinator()
 272 {
 273   static int virtualCoordinator = -1;
 274   if (virtualCoordinator == -1) {
 275     int optVal = -1;
 276     socklen_t optLen = sizeof(optVal);
 277     int ret = _real_getsockopt(PROTECTED_COORD_FD, SOL_SOCKET,
 278                                SO_ACCEPTCONN, &optVal, &optLen);
 279     if (ret == 0 && optVal == 1) {
 280       virtualCoordinator = 1;
 281     } else {
 282       virtualCoordinator = 0;
 283     }
 284   }
 285   return virtualCoordinator;
 286 }
 287 
 288 void CoordinatorAPI::connectAndSendUserCommand(char c,
 289                                                int *coordCmdStatus,
 290                                                int *numPeers,
 291                                                int *isRunning,
 292                                                int *ckptInterval)
 293 {
 294   _coordinatorSocket = createNewSocketToCoordinator(COORD_ANY);
 295   if (!_coordinatorSocket.isValid()) {
 296     *coordCmdStatus = CoordCmdStatus::ERROR_COORDINATOR_NOT_FOUND;
 297     return;
 298   }
 299 
 300   //tell the coordinator to run given user command
 301   DmtcpMessage msg, reply;
 302 
 303   //send
 304   msg.type = DMT_USER_CMD;
 305   msg.coordCmd = c;
 306 
 307   if (c == 'i') {
 308     const char* interval = getenv (ENV_VAR_CKPT_INTR);
 309     if (interval != NULL){
 310       msg.theCheckpointInterval = jalib::StringToInt (interval);
 311     }
 312   }
 313 
 314   _coordinatorSocket << msg;
 315 
 316   //the coordinator will violently close our socket...
 317   if (c=='q' || c=='Q') {
 318     *coordCmdStatus = CoordCmdStatus::NOERROR;
 319     return;
 320   }
 321 
 322   //receive REPLY
 323   reply.poison();
 324   _coordinatorSocket >> reply;
 325   reply.assertValid();
 326   JASSERT(reply.type == DMT_USER_CMD_RESULT);
 327 
 328   if (coordCmdStatus != NULL) {
 329     *coordCmdStatus =  reply.coordCmdStatus;
 330   }
 331   if (numPeers != NULL) {
 332     *numPeers =  reply.numPeers;
 333   }
 334   if (isRunning != NULL) {
 335     *isRunning = reply.isRunning;
 336   }
 337   if (ckptInterval != NULL) {
 338     *ckptInterval = reply.theCheckpointInterval;
 339   }
 340 
 341   _coordinatorSocket.close();
 342 }
 343 
 344 string CoordinatorAPI::getCoordCkptDir(void)
 345 {
 346   // FIXME: Add a test for make-check.
 347   char buf[PATH_MAX];
 348   if (noCoordinator()) return "";
 349   DmtcpMessage msg(DMT_GET_CKPT_DIR);
 350   _coordinatorSocket << msg;
 351 
 352   msg.poison();
 353   _coordinatorSocket >> msg;
 354   msg.assertValid();
 355   JASSERT(msg.type == DMT_GET_CKPT_DIR_RESULT) (msg.type);
 356 
 357   JASSERT(msg.extraBytes > 0);
 358   _coordinatorSocket.readAll(buf, msg.extraBytes);
 359   return buf;
 360 }
 361 
 362 void CoordinatorAPI::updateCoordCkptDir(const char *dir)
 363 {
 364   if (noCoordinator()) return;
 365   JASSERT(dir != NULL);
 366   DmtcpMessage msg(DMT_UPDATE_CKPT_DIR);
 367   msg.extraBytes = strlen(dir) + 1;
 368   _coordinatorSocket << msg;
 369   _coordinatorSocket.writeAll(dir, strlen(dir) + 1);
 370 }
 371 
 372 void CoordinatorAPI::sendMsgToCoordinator(const DmtcpMessage &msg,
 373                                           const void *extraData,
 374                                           size_t len)
 375 {
 376   if (noCoordinator()) return;
 377   _coordinatorSocket << msg;
 378   if (msg.extraBytes > 0) {
 379     JASSERT(extraData != NULL);
 380     JASSERT(len == msg.extraBytes);
 381     _coordinatorSocket.writeAll((const char *)extraData, msg.extraBytes);
 382   }
 383 }
 384 
 385 void CoordinatorAPI::recvMsgFromCoordinator(DmtcpMessage *msg, void **extraData)
 386 {
 387   JASSERT(!noCoordinator());
 388   msg->poison();
 389   _coordinatorSocket >> (*msg);
 390 
 391   if (extraData != NULL) {
 392     msg->assertValid();
 393     JASSERT(msg->extraBytes > 0);
 394     // Caller must free this buffer
 395     void *buf = JALLOC_HELPER_MALLOC(msg->extraBytes);
 396     _coordinatorSocket.readAll((char*)buf, msg->extraBytes);
 397     JASSERT(extraData != NULL);
 398     *extraData = buf;
 399   }
 400 }
 401 
 402 void CoordinatorAPI::startNewCoordinator(CoordinatorMode mode)
 403 {
 404   const char *host;
 405   int port;
 406   Util::getCoordHostAndPort(mode, &host, &port);
 407 
 408   JASSERT(strcmp(host, "localhost") == 0 ||
 409           strcmp(host, "127.0.0.1") == 0 ||
 410           jalib::Filesystem::GetCurrentHostname() == host)
 411     (host) (jalib::Filesystem::GetCurrentHostname())
 412     .Text("Won't automatically start coordinator because DMTCP_HOST"
 413           " is set to a remote host.");
 414   // Create a socket and bind it to an unused port.
 415   errno = 0;
 416   jalib::JServerSocket coordinatorListenerSocket(jalib::JSockAddr::ANY,
 417                                                  port, 128);
 418   JASSERT(coordinatorListenerSocket.isValid())
 419     (coordinatorListenerSocket.port()) (JASSERT_ERRNO)
 420     .Text("Failed to create listen socket."
 421           "\nIf msg is \"Address already in use\", this may be an old coordinator."
 422           "\nKill other coordinators and try again in a minute or so.");
 423   // Now dup the sockfd to
 424   coordinatorListenerSocket.changeFd(PROTECTED_COORD_FD);
 425   Util::setCoordPort(coordinatorListenerSocket.port());
 426 
 427   JTRACE("Starting a new coordinator automatically.")
 428         (coordinatorListenerSocket.port());
 429 
 430   if (fork() == 0) {
 431     // We can't use Util::getPath() here since the SharedData has not been
 432     // initialized yet.
 433     string coordinator =
 434       jalib::Filesystem::GetProgramDir() + "/dmtcp_coordinator";
 435 
 436     char *modeStr = (char *)"--daemon";
 437     char * args[] = {
 438       (char*)coordinator.c_str(),
 439       (char*)"--exit-on-last",
 440       modeStr,
 441       NULL
 442     };
 443     execv(args[0], args);
 444     JASSERT(false)(coordinator)(JASSERT_ERRNO) .Text("exec(dmtcp_coordinator) failed");
 445   } else {
 446     _real_close (PROTECTED_COORD_FD);
 447   }
 448 
 449   int status;
 450   JASSERT(wait(&status) > 0) (JASSERT_ERRNO);
 451 }
 452 
 453 void CoordinatorAPI::createNewConnToCoord(CoordinatorMode mode)
 454 {
 455   if (mode & COORD_JOIN) {
 456     _coordinatorSocket = createNewSocketToCoordinator(mode);
 457     JASSERT(_coordinatorSocket.isValid()) (JASSERT_ERRNO)
 458       .Text("Coordinator not found, but --join was specified. Exiting.");
 459   } else if (mode & COORD_NEW) {
 460     startNewCoordinator(mode);
 461     _coordinatorSocket = createNewSocketToCoordinator(mode);
 462     JASSERT(_coordinatorSocket.isValid()) (JASSERT_ERRNO)
 463       .Text("Error connecting to newly started coordinator.");
 464   } else if (mode & COORD_ANY) {
 465     _coordinatorSocket = createNewSocketToCoordinator(mode);
 466     if (!_coordinatorSocket.isValid()) {
 467       JTRACE("Coordinator not found, trying to start a new one.");
 468       startNewCoordinator(mode);
 469       _coordinatorSocket = createNewSocketToCoordinator(mode);
 470       JASSERT(_coordinatorSocket.isValid()) (JASSERT_ERRNO)
 471         .Text("Error connecting to newly started coordinator.");
 472     }
 473   } else {
 474     JASSERT(false) .Text("Not Reached");
 475   }
 476   _coordinatorSocket.changeFd(PROTECTED_COORD_FD);
 477 }
 478 
 479 DmtcpMessage CoordinatorAPI::sendRecvHandshake(DmtcpMessage msg,
 480                                                string progname,
 481                                                UniquePid *compId)
 482 {
 483   if (dmtcp_virtual_to_real_pid) {
 484     msg.realPid = dmtcp_virtual_to_real_pid(getpid());
 485   } else {
 486     msg.realPid = getpid();
 487   }
 488 
 489   msg.theCheckpointInterval = getCkptInterval();
 490   string hostname = jalib::Filesystem::GetCurrentHostname();
 491   msg.extraBytes = hostname.length() + 1 + progname.length() + 1;
 492 
 493   _coordinatorSocket << msg;
 494   _coordinatorSocket.writeAll(hostname.c_str(), hostname.length() + 1);
 495   _coordinatorSocket.writeAll(progname.c_str(), progname.length() + 1);
 496 
 497   msg.poison();
 498   _coordinatorSocket >> msg;
 499   msg.assertValid();
 500   if (msg.type == DMT_KILL_PEER) {
 501     JTRACE("Received KILL message from coordinator, exiting");
 502     _real_exit (0);
 503   }
 504   if (msg.type == DMT_REJECT_NOT_RUNNING) {
 505     JASSERT(false)
 506       .Text("Connection rejected by the coordinator.\n"
 507             "Reason: Current computation not in RUNNING state.\n"
 508             "         Is a checkpoint/restart in progress?");
 509   } else if (msg.type == DMT_REJECT_WRONG_COMP) {
 510     JASSERT(compId != NULL);
 511     JASSERT(false) (*compId)
 512       .Text("Connection rejected by the coordinator.\n"
 513             " Reason: This process has a different computation group.");
 514   }
 515   JASSERT(msg.type == DMT_ACCEPT);
 516   return msg;
 517 }
 518 
 519 void CoordinatorAPI::connectToCoordOnStartup(CoordinatorMode mode,
 520                                              string progname,
 521                                              DmtcpUniqueProcessId *compId,
 522                                              CoordinatorInfo *coordInfo,
 523                                              struct in_addr  *localIP)
 524 {
 525   JASSERT(compId != NULL && localIP != NULL && coordInfo != NULL);
 526 
 527   if (mode & COORD_NONE) {
 528     setupVirtualCoordinator(coordInfo, localIP);
 529     *compId = coordInfo->id;
 530     return;
 531   }
 532 
 533   createNewConnToCoord(mode);
 534   JTRACE("sending coordinator handshake")(UniquePid::ThisProcess());
 535   DmtcpMessage hello_local(DMT_NEW_WORKER);
 536   hello_local.virtualPid = -1;
 537 
 538   DmtcpMessage hello_remote = sendRecvHandshake(hello_local, progname);
 539 
 540   JASSERT(hello_remote.virtualPid != -1);
 541   JTRACE("Got virtual pid from coordinator") (hello_remote.virtualPid);
 542 
 543   pid_t ppid = getppid();
 544   Util::setVirtualPidEnvVar(hello_remote.virtualPid, ppid, ppid);
 545 
 546   JASSERT(compId != NULL && localIP != NULL && coordInfo != NULL);
 547   *compId = hello_remote.compGroup.upid();
 548   coordInfo->id = hello_remote.from.upid();
 549   coordInfo->timeStamp = hello_remote.coordTimeStamp;
 550   coordInfo->addrLen = sizeof (coordInfo->addr);
 551   JASSERT(getpeername(_coordinatorSocket.sockfd(),
 552                       (struct sockaddr*) &coordInfo->addr,
 553                       &coordInfo->addrLen) == 0)
 554     (JASSERT_ERRNO);
 555   memcpy(localIP, &hello_remote.ipAddr, sizeof hello_remote.ipAddr);
 556 }
 557 
 558 void CoordinatorAPI::createNewConnectionBeforeFork(string& progname)
 559 {
 560   JASSERT(!noCoordinator());
 561   struct sockaddr_storage addr;
 562   uint32_t len;
 563   SharedData::getCoordAddr((struct sockaddr *)&addr, &len);
 564   socklen_t addrlen = len;
 565   _coordinatorSocket = jalib::JClientSocket((struct sockaddr *)&addr, addrlen);
 566   JASSERT(_coordinatorSocket.isValid());
 567 
 568   DmtcpMessage hello_local(DMT_NEW_WORKER);
 569   DmtcpMessage hello_remote = sendRecvHandshake(hello_local, progname);
 570   JASSERT(hello_remote.virtualPid != -1);
 571 
 572   if (dmtcp_virtual_to_real_pid) {
 573     JTRACE("Got virtual pid from coordinator") (hello_remote.virtualPid);
 574     pid_t pid = getpid();
 575     pid_t realPid = dmtcp_virtual_to_real_pid(pid);
 576     Util::setVirtualPidEnvVar(hello_remote.virtualPid, pid, realPid);
 577   }
 578 }
 579 
 580 void CoordinatorAPI::connectToCoordOnRestart(CoordinatorMode  mode,
 581                                              string progname,
 582                                              UniquePid compGroup,
 583                                              int np,
 584                                              CoordinatorInfo *coordInfo,
 585                                              const char *host,
 586                                              int port,
 587                                              struct in_addr  *localIP)
 588 {
 589   if (mode & COORD_NONE) {
 590     setupVirtualCoordinator(coordInfo, localIP);
 591     return;
 592   }
 593 
 594   createNewConnToCoord(mode);
 595   JTRACE("sending coordinator handshake")(UniquePid::ThisProcess());
 596   DmtcpMessage hello_local(DMT_RESTART_WORKER);
 597   hello_local.virtualPid = -1;
 598   hello_local.numPeers = np;
 599   hello_local.compGroup = compGroup;
 600 
 601   DmtcpMessage hello_remote = sendRecvHandshake(hello_local, progname,
 602                                                 &compGroup);
 603 
 604   if (coordInfo != NULL) {
 605     coordInfo->id = hello_remote.from.upid();
 606     coordInfo->timeStamp = hello_remote.coordTimeStamp;
 607     coordInfo->addrLen = sizeof (coordInfo->addr);
 608     JASSERT(getpeername(_coordinatorSocket.sockfd(),
 609                         (struct sockaddr*) &coordInfo->addr,
 610                         &coordInfo->addrLen) == 0)
 611       (JASSERT_ERRNO);
 612   }
 613   if (localIP != NULL) {
 614     memcpy(localIP, &hello_remote.ipAddr, sizeof hello_remote.ipAddr);
 615   }
 616 
 617   JTRACE("Coordinator handshake RECEIVED!!!!!");
 618 }
 619 
 620 void CoordinatorAPI::sendCkptFilename()
 621 {
 622   if (noCoordinator()) return;
 623   // Tell coordinator to record our filename in the restart script
 624   string ckptFilename = ProcessInfo::instance().getCkptFilename();
 625   string hostname = jalib::Filesystem::GetCurrentHostname();
 626   JTRACE("recording filenames") (ckptFilename) (hostname);
 627   DmtcpMessage msg;
 628   if (dmtcp_unique_ckpt_enabled && dmtcp_unique_ckpt_enabled()) {
 629     msg.type = DMT_UNIQUE_CKPT_FILENAME;
 630   } else {
 631     msg.type = DMT_CKPT_FILENAME;
 632   }
 633   msg.extraBytes = ckptFilename.length() + 1 + hostname.length() + 1;
 634   _coordinatorSocket << msg;
 635   _coordinatorSocket.writeAll(ckptFilename.c_str(), ckptFilename.length() + 1);
 636   _coordinatorSocket.writeAll(hostname.c_str(), hostname.length() + 1);
 637 }
 638 
 639 
 640 int CoordinatorAPI::sendKeyValPairToCoordinator(const char *id,
 641                                                 const void *key,
 642                                                 uint32_t key_len,
 643                                                 const void *val,
 644                                                 uint32_t val_len,
 645                                                 int sync)
 646 {
 647   DmtcpMessage msg (DMT_REGISTER_NAME_SERVICE_DATA);
 648   if (sync) {
 649     msg.type = DMT_REGISTER_NAME_SERVICE_DATA_SYNC;
 650   }
 651   JWARNING(strlen(id) < sizeof(msg.nsid));
 652   strncpy(msg.nsid, id, 8);
 653   msg.keyLen = key_len;
 654   msg.valLen = val_len;
 655   msg.extraBytes = key_len + val_len;
 656   jalib::JSocket sock = _coordinatorSocket;
 657   if (dmtcp_is_running_state()) {
 658     if (_nsSock.sockfd() == -1) {
 659       _nsSock = createNewSocketToCoordinator(COORD_ANY);
 660       JASSERT(_nsSock.isValid());
 661       _nsSock.changeFd(PROTECTED_NS_FD);
 662       DmtcpMessage m(DMT_NAME_SERVICE_WORKER);
 663       _nsSock << m;
 664     }
 665     sock = _nsSock;
 666     JASSERT(sock.isValid());
 667   }
 668 
 669   sock << msg;
 670   sock.writeAll((const char *)key, key_len);
 671   sock.writeAll((const char *)val, val_len);
 672   if (sync) {
 673     msg.poison();
 674     sock >> msg;
 675     JASSERT(msg.type == DMT_REGISTER_NAME_SERVICE_DATA_SYNC_RESPONSE)(msg.type);
 676 
 677   }
 678   return 1;
 679 }
 680 
 681 // On input, val points to a buffer in user memory and *val_len is the maximum
 682 //   size of that buffer (the memory allocated by user).
 683 // On output, we copy data to val, and set *val_len to the actual buffer size
 684 //   (to the size of the data that we copied to the user buffer).
 685 int CoordinatorAPI::sendQueryToCoordinator(const char *id,
 686                                            const void *key,
 687                                            uint32_t key_len,
 688                                            void *val,
 689                                            uint32_t *val_len)
 690 {
 691   DmtcpMessage msg (DMT_NAME_SERVICE_QUERY);
 692   JWARNING(strlen(id) < sizeof(msg.nsid));
 693   strncpy(msg.nsid, id, 8);
 694   msg.keyLen = key_len;
 695   msg.valLen = 0;
 696   msg.extraBytes = key_len;
 697   jalib::JSocket sock = _coordinatorSocket;
 698 
 699   if (key == NULL || key_len == 0 || val == NULL || val_len == 0) {
 700     return 0;
 701   }
 702 
 703   if (dmtcp_is_running_state()) {
 704     if (!_nsSock.isValid()) {
 705       _nsSock = createNewSocketToCoordinator(COORD_ANY);
 706       JASSERT(_nsSock.isValid());
 707       _nsSock.changeFd(PROTECTED_NS_FD);
 708       DmtcpMessage m(DMT_NAME_SERVICE_WORKER);
 709       _nsSock << m;
 710     }
 711     sock = _nsSock;
 712     JASSERT(sock.isValid());
 713   }
 714 
 715   sock << msg;
 716   sock.writeAll((const char *)key, key_len);
 717 
 718   msg.poison();
 719   sock >> msg;
 720   msg.assertValid();
 721   JASSERT(msg.type == DMT_NAME_SERVICE_QUERY_RESPONSE &&
 722           msg.extraBytes == msg.valLen);
 723 
 724   JASSERT (*val_len >= msg.valLen);
 725   *val_len = msg.valLen;
 726   if (*val_len > 0) {
 727     sock.readAll((char*)val, *val_len);
 728   }
 729 
 730   return *val_len;
 731 }

/* [<][>][^][v][top][bottom][index][help] */