root/coordinatorapi.cpp
/* [<][>][^][v][top][bottom][index][help] */
DEFINITIONS
This source file includes following definitions.
- dmtcp_CoordinatorAPI_EventHook
- restart
- getCkptInterval
- createNewSocketToCoordinator
- instance
- init
- resetOnFork
- setupVirtualCoordinator
- waitForCheckpointCommand
- noCoordinator
- connectAndSendUserCommand
- getCoordCkptDir
- updateCoordCkptDir
- sendMsgToCoordinator
- recvMsgFromCoordinator
- startNewCoordinator
- createNewConnToCoord
- sendRecvHandshake
- connectToCoordOnStartup
- createNewConnectionBeforeFork
- connectToCoordOnRestart
- sendCkptFilename
- sendKeyValPairToCoordinator
- sendQueryToCoordinator
1 /****************************************************************************
2 * Copyright (C) 2006-2013 by Jason Ansel, Kapil Arya, and Gene Cooperman *
3 * jansel@csail.mit.edu, kapil@ccs.neu.edu, gene@ccs.neu.edu *
4 * *
5 * This file is part of DMTCP. *
6 * *
7 * DMTCP is free software: you can redistribute it and/or *
8 * modify it under the terms of the GNU Lesser General Public License as *
9 * published by the Free Software Foundation, either version 3 of the *
10 * License, or (at your option) any later version. *
11 * *
12 * DMTCP is distributed in the hope that it will be useful, *
13 * but WITHOUT ANY WARRANTY; without even the implied warranty of *
14 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the *
15 * GNU Lesser General Public License for more details. *
16 * *
17 * You should have received a copy of the GNU Lesser General Public *
18 * License along with DMTCP:dmtcp/src. If not, see *
19 * <http://www.gnu.org/licenses/>. *
20 ****************************************************************************/
21
22 // CAN REMOVE BOOL enableCheckpointing ARG OF DmtcpWorker WHEN WE'RE DONE.
23 // DmtcpWorker CAN INHERIT THIS CLASS, CoordinatorAPI
24
25 #include <netdb.h>
26 #include <arpa/inet.h>
27 #include "coordinatorapi.h"
28 #include "dmtcp.h"
29 #include "util.h"
30 #include "syscallwrappers.h"
31 #include "util.h"
32 #include "shareddata.h"
33 #include "processinfo.h"
34 #include "../jalib/jconvert.h"
35 #include "../jalib/jfilesystem.h"
36 #include <fcntl.h>
37
38 using namespace dmtcp;
39
40 void dmtcp_CoordinatorAPI_EventHook(DmtcpEvent_t event, DmtcpEventData_t *data)
41 {
42 if (CoordinatorAPI::noCoordinator()) return;
43 switch (event) {
44 case DMTCP_EVENT_INIT:
45 CoordinatorAPI::instance().init();
46 break;
47
48 case DMTCP_EVENT_THREADS_SUSPEND:
49 JASSERT(CoordinatorAPI::instance().isValid());
50 break;
51
52 case DMTCP_EVENT_RESTART:
53 CoordinatorAPI::restart();
54 break;
55
56 case DMTCP_EVENT_RESUME:
57 CoordinatorAPI::instance().sendCkptFilename();
58 break;
59
60 case DMTCP_EVENT_EXIT:
61 JTRACE("exit() in progress, disconnecting from dmtcp coordinator");
62 CoordinatorAPI::instance().closeConnection();
63 break;
64
65 default:
66 break;
67 }
68 }
69
70 void CoordinatorAPI::restart()
71 {
72 instance()._nsSock.close();
73 }
74
75 static uint32_t getCkptInterval()
76 {
77 uint32_t ret = DMTCPMESSAGE_SAME_CKPT_INTERVAL;
78 const char* interval = getenv (ENV_VAR_CKPT_INTR);
79 /* DmtcpMessage constructor default:
80 * hello_local.theCheckpointInterval: DMTCPMESSAGE_SAME_CKPT_INTERVAL
81 */
82 if (interval != NULL) {
83 ret = jalib::StringToInt (interval);
84 }
85 // Tell the coordinator the ckpt interval only once. It can change later.
86 _dmtcp_unsetenv (ENV_VAR_CKPT_INTR);
87 return ret;
88 }
89
90 static jalib::JSocket createNewSocketToCoordinator(CoordinatorMode mode)
91 {
92 const char*host = NULL;
93 int port = UNINITIALIZED_PORT;
94
95 Util::getCoordHostAndPort(COORD_ANY, &host, &port);
96 return jalib::JClientSocket(host, port);
97 }
98
99 //CoordinatorAPI::CoordinatorAPI (int sockfd)
100 //: _coordinatorSocket(sockfd)
101 //{ }
102
103 static CoordinatorAPI *coordAPIInst = NULL;
104 CoordinatorAPI& CoordinatorAPI::instance()
105 {
106 //static SysVIPC *inst = new SysVIPC(); return *inst;
107 if (coordAPIInst == NULL) {
108 coordAPIInst = new CoordinatorAPI();
109 if (noCoordinator()) {
110 coordAPIInst->_coordinatorSocket = jalib::JSocket(PROTECTED_COORD_FD);
111 }
112 }
113 return *coordAPIInst;
114 }
115
116 void CoordinatorAPI::init()
117 {
118 JTRACE("Informing coordinator of new process") (UniquePid::ThisProcess());
119
120 DmtcpMessage msg (DMT_UPDATE_PROCESS_INFO_AFTER_INIT_OR_EXEC);
121 string progname = jalib::Filesystem::GetProgramName();
122 msg.extraBytes = progname.length() + 1;
123
124 JASSERT(Util::isValidFd(PROTECTED_COORD_FD));
125 instance()._coordinatorSocket = jalib::JSocket(PROTECTED_COORD_FD);
126 instance()._coordinatorSocket << msg;
127 instance()._coordinatorSocket.writeAll(progname.c_str(),
128 progname.length() + 1);
129 // The coordinator won't send any msg in response to DMT_UPDATE... so no need
130 // to call recvCoordinatorHandshake().
131 }
132
133 void CoordinatorAPI::resetOnFork(CoordinatorAPI& coordAPI)
134 {
135 JASSERT(coordAPI._coordinatorSocket.isValid());
136 JASSERT(coordAPI._coordinatorSocket.sockfd() != PROTECTED_COORD_FD);
137 instance() = coordAPI;
138 instance()._coordinatorSocket.changeFd(PROTECTED_COORD_FD);
139
140 JTRACE("Informing coordinator of new process") (UniquePid::ThisProcess());
141
142 DmtcpMessage msg (DMT_UPDATE_PROCESS_INFO_AFTER_FORK);
143 if (dmtcp_virtual_to_real_pid) {
144 msg.realPid = dmtcp_virtual_to_real_pid(getpid());
145 } else {
146 msg.realPid = getpid();
147 }
148 instance()._coordinatorSocket << msg;
149 // The coordinator won't send any msg in response to DMT_UPDATE... so no need
150 // to call recvCoordinatorHandshake().
151 instance()._nsSock.close();
152 }
153
154 // FIXME: Does "virtual coordinator" mean coordinator built into the
155 // the current process (no separate process?)
156 void CoordinatorAPI::setupVirtualCoordinator(CoordinatorInfo *coordInfo,
157 struct in_addr *localIP)
158 {
159 const char *host = NULL;
160 int port;
161 Util::getCoordHostAndPort(COORD_NONE, &host, &port);
162 _coordinatorSocket = jalib::JServerSocket(jalib::JSockAddr::ANY, port);
163 JASSERT(_coordinatorSocket.isValid()) (port) (JASSERT_ERRNO)
164 .Text("Failed to create listen socket.");
165 _coordinatorSocket.changeFd(PROTECTED_COORD_FD);
166 Util::setCoordPort(_coordinatorSocket.port());
167
168 pid_t ppid = getppid();
169 Util::setVirtualPidEnvVar(INITIAL_VIRTUAL_PID, ppid, ppid);
170
171 UniquePid coordId = UniquePid(INITIAL_VIRTUAL_PID,
172 UniquePid::ThisProcess().hostid(),
173 UniquePid::ThisProcess().time());
174
175 coordInfo->id = coordId.upid();
176 coordInfo->timeStamp = coordId.time();
177 coordInfo->addrLen = 0;
178 if (getenv(ENV_VAR_CKPT_INTR) != NULL) {
179 coordInfo->interval = (uint32_t) strtol(getenv(ENV_VAR_CKPT_INTR), NULL, 0);
180 } else {
181 coordInfo->interval = 0;
182 }
183 memset(&coordInfo->addr, 0, sizeof(coordInfo->addr));
184 memset(localIP, 0, sizeof(*localIP));
185 }
186
187 void CoordinatorAPI::waitForCheckpointCommand()
188 {
189 uint32_t ckptInterval = SharedData::getCkptInterval();
190 struct timeval tmptime={0,0};
191 long remaining = ckptInterval;
192 do {
193 fd_set rfds;
194 struct timeval *timeout = NULL;
195 struct timeval start;
196 if (ckptInterval > 0) {
197 timeout = &tmptime;
198 timeout->tv_sec = remaining;
199 JASSERT(gettimeofday(&start, NULL) == 0) (JASSERT_ERRNO);
200 }
201 FD_ZERO(&rfds);
202 FD_SET(PROTECTED_COORD_FD, &rfds );
203 int retval = select(PROTECTED_COORD_FD+1, &rfds, NULL, NULL, timeout);
204 if (retval == 0) { // timeout expired, time for checkpoint
205 JTRACE("Timeout expired, checkpointing now.");
206 return;
207 } else if (retval > 0) {
208 JASSERT(FD_ISSET(PROTECTED_COORD_FD, &rfds));
209 JTRACE("Connect request on virtual coordinator socket.");
210 break;
211 }
212 JASSERT(errno == EINTR) (JASSERT_ERRNO); /* EINTR: a signal was caught */
213 if (ckptInterval > 0) {
214 struct timeval end;
215 JASSERT(gettimeofday(&end, NULL) == 0) (JASSERT_ERRNO);
216 remaining -= end.tv_sec - start.tv_sec;
217 // If the remaining time is negative, we can checkpoint now
218 if (remaining < 0) {
219 return;
220 }
221 }
222 } while (remaining > 0);
223
224 jalib::JSocket cmdSock(-1);
225 DmtcpMessage msg;
226 DmtcpMessage reply(DMT_USER_CMD_RESULT);
227 do {
228 cmdSock.close();
229 cmdSock = _coordinatorSocket.accept();
230 msg.poison();
231 JTRACE("Reading from incoming connection...");
232 cmdSock >> msg;
233 } while (!cmdSock.isValid());
234
235 JASSERT(msg.type == DMT_USER_CMD) (msg.type)
236 .Text("Unexpected connection.");
237
238 reply.coordCmdStatus = CoordCmdStatus::NOERROR;
239
240 bool exitWhenDone = false;
241 switch (msg.coordCmd) {
242 // case 'b': case 'B': // prefix blocking command, prior to checkpoint command
243 // JTRACE("blocking checkpoint beginning...");
244 // blockUntilDone = true;
245 // break;
246 case 's': case 'S':
247 JTRACE("Received status command");
248 reply.numPeers = 1;
249 reply.isRunning = 1;
250 break;
251 case 'c': case 'C':
252 JTRACE("checkpointing...");
253 break;
254 case 'k': case 'K':
255 case 'q': case 'Q':
256 JTRACE("Received KILL command from user, exiting");
257 exitWhenDone = true;
258 break;
259 default:
260 JTRACE("unhandled user command") (msg.coordCmd);
261 reply.coordCmdStatus = CoordCmdStatus::ERROR_INVALID_COMMAND;
262 }
263 cmdSock << reply;
264 cmdSock.close();
265 if (exitWhenDone) {
266 _real_exit(0);
267 }
268 return;
269 }
270
271 bool CoordinatorAPI::noCoordinator()
272 {
273 static int virtualCoordinator = -1;
274 if (virtualCoordinator == -1) {
275 int optVal = -1;
276 socklen_t optLen = sizeof(optVal);
277 int ret = _real_getsockopt(PROTECTED_COORD_FD, SOL_SOCKET,
278 SO_ACCEPTCONN, &optVal, &optLen);
279 if (ret == 0 && optVal == 1) {
280 virtualCoordinator = 1;
281 } else {
282 virtualCoordinator = 0;
283 }
284 }
285 return virtualCoordinator;
286 }
287
288 void CoordinatorAPI::connectAndSendUserCommand(char c,
289 int *coordCmdStatus,
290 int *numPeers,
291 int *isRunning,
292 int *ckptInterval)
293 {
294 _coordinatorSocket = createNewSocketToCoordinator(COORD_ANY);
295 if (!_coordinatorSocket.isValid()) {
296 *coordCmdStatus = CoordCmdStatus::ERROR_COORDINATOR_NOT_FOUND;
297 return;
298 }
299
300 //tell the coordinator to run given user command
301 DmtcpMessage msg, reply;
302
303 //send
304 msg.type = DMT_USER_CMD;
305 msg.coordCmd = c;
306
307 if (c == 'i') {
308 const char* interval = getenv (ENV_VAR_CKPT_INTR);
309 if (interval != NULL){
310 msg.theCheckpointInterval = jalib::StringToInt (interval);
311 }
312 }
313
314 _coordinatorSocket << msg;
315
316 //the coordinator will violently close our socket...
317 if (c=='q' || c=='Q') {
318 *coordCmdStatus = CoordCmdStatus::NOERROR;
319 return;
320 }
321
322 //receive REPLY
323 reply.poison();
324 _coordinatorSocket >> reply;
325 reply.assertValid();
326 JASSERT(reply.type == DMT_USER_CMD_RESULT);
327
328 if (coordCmdStatus != NULL) {
329 *coordCmdStatus = reply.coordCmdStatus;
330 }
331 if (numPeers != NULL) {
332 *numPeers = reply.numPeers;
333 }
334 if (isRunning != NULL) {
335 *isRunning = reply.isRunning;
336 }
337 if (ckptInterval != NULL) {
338 *ckptInterval = reply.theCheckpointInterval;
339 }
340
341 _coordinatorSocket.close();
342 }
343
344 string CoordinatorAPI::getCoordCkptDir(void)
345 {
346 // FIXME: Add a test for make-check.
347 char buf[PATH_MAX];
348 if (noCoordinator()) return "";
349 DmtcpMessage msg(DMT_GET_CKPT_DIR);
350 _coordinatorSocket << msg;
351
352 msg.poison();
353 _coordinatorSocket >> msg;
354 msg.assertValid();
355 JASSERT(msg.type == DMT_GET_CKPT_DIR_RESULT) (msg.type);
356
357 JASSERT(msg.extraBytes > 0);
358 _coordinatorSocket.readAll(buf, msg.extraBytes);
359 return buf;
360 }
361
362 void CoordinatorAPI::updateCoordCkptDir(const char *dir)
363 {
364 if (noCoordinator()) return;
365 JASSERT(dir != NULL);
366 DmtcpMessage msg(DMT_UPDATE_CKPT_DIR);
367 msg.extraBytes = strlen(dir) + 1;
368 _coordinatorSocket << msg;
369 _coordinatorSocket.writeAll(dir, strlen(dir) + 1);
370 }
371
372 void CoordinatorAPI::sendMsgToCoordinator(const DmtcpMessage &msg,
373 const void *extraData,
374 size_t len)
375 {
376 if (noCoordinator()) return;
377 _coordinatorSocket << msg;
378 if (msg.extraBytes > 0) {
379 JASSERT(extraData != NULL);
380 JASSERT(len == msg.extraBytes);
381 _coordinatorSocket.writeAll((const char *)extraData, msg.extraBytes);
382 }
383 }
384
385 void CoordinatorAPI::recvMsgFromCoordinator(DmtcpMessage *msg, void **extraData)
386 {
387 JASSERT(!noCoordinator());
388 msg->poison();
389 _coordinatorSocket >> (*msg);
390
391 if (extraData != NULL) {
392 msg->assertValid();
393 JASSERT(msg->extraBytes > 0);
394 // Caller must free this buffer
395 void *buf = JALLOC_HELPER_MALLOC(msg->extraBytes);
396 _coordinatorSocket.readAll((char*)buf, msg->extraBytes);
397 JASSERT(extraData != NULL);
398 *extraData = buf;
399 }
400 }
401
402 void CoordinatorAPI::startNewCoordinator(CoordinatorMode mode)
403 {
404 const char *host;
405 int port;
406 Util::getCoordHostAndPort(mode, &host, &port);
407
408 JASSERT(strcmp(host, "localhost") == 0 ||
409 strcmp(host, "127.0.0.1") == 0 ||
410 jalib::Filesystem::GetCurrentHostname() == host)
411 (host) (jalib::Filesystem::GetCurrentHostname())
412 .Text("Won't automatically start coordinator because DMTCP_HOST"
413 " is set to a remote host.");
414 // Create a socket and bind it to an unused port.
415 errno = 0;
416 jalib::JServerSocket coordinatorListenerSocket(jalib::JSockAddr::ANY,
417 port, 128);
418 JASSERT(coordinatorListenerSocket.isValid())
419 (coordinatorListenerSocket.port()) (JASSERT_ERRNO)
420 .Text("Failed to create listen socket."
421 "\nIf msg is \"Address already in use\", this may be an old coordinator."
422 "\nKill other coordinators and try again in a minute or so.");
423 // Now dup the sockfd to
424 coordinatorListenerSocket.changeFd(PROTECTED_COORD_FD);
425 Util::setCoordPort(coordinatorListenerSocket.port());
426
427 JTRACE("Starting a new coordinator automatically.")
428 (coordinatorListenerSocket.port());
429
430 if (fork() == 0) {
431 // We can't use Util::getPath() here since the SharedData has not been
432 // initialized yet.
433 string coordinator =
434 jalib::Filesystem::GetProgramDir() + "/dmtcp_coordinator";
435
436 char *modeStr = (char *)"--daemon";
437 char * args[] = {
438 (char*)coordinator.c_str(),
439 (char*)"--exit-on-last",
440 modeStr,
441 NULL
442 };
443 execv(args[0], args);
444 JASSERT(false)(coordinator)(JASSERT_ERRNO) .Text("exec(dmtcp_coordinator) failed");
445 } else {
446 _real_close (PROTECTED_COORD_FD);
447 }
448
449 int status;
450 JASSERT(wait(&status) > 0) (JASSERT_ERRNO);
451 }
452
453 void CoordinatorAPI::createNewConnToCoord(CoordinatorMode mode)
454 {
455 if (mode & COORD_JOIN) {
456 _coordinatorSocket = createNewSocketToCoordinator(mode);
457 JASSERT(_coordinatorSocket.isValid()) (JASSERT_ERRNO)
458 .Text("Coordinator not found, but --join was specified. Exiting.");
459 } else if (mode & COORD_NEW) {
460 startNewCoordinator(mode);
461 _coordinatorSocket = createNewSocketToCoordinator(mode);
462 JASSERT(_coordinatorSocket.isValid()) (JASSERT_ERRNO)
463 .Text("Error connecting to newly started coordinator.");
464 } else if (mode & COORD_ANY) {
465 _coordinatorSocket = createNewSocketToCoordinator(mode);
466 if (!_coordinatorSocket.isValid()) {
467 JTRACE("Coordinator not found, trying to start a new one.");
468 startNewCoordinator(mode);
469 _coordinatorSocket = createNewSocketToCoordinator(mode);
470 JASSERT(_coordinatorSocket.isValid()) (JASSERT_ERRNO)
471 .Text("Error connecting to newly started coordinator.");
472 }
473 } else {
474 JASSERT(false) .Text("Not Reached");
475 }
476 _coordinatorSocket.changeFd(PROTECTED_COORD_FD);
477 }
478
479 DmtcpMessage CoordinatorAPI::sendRecvHandshake(DmtcpMessage msg,
480 string progname,
481 UniquePid *compId)
482 {
483 if (dmtcp_virtual_to_real_pid) {
484 msg.realPid = dmtcp_virtual_to_real_pid(getpid());
485 } else {
486 msg.realPid = getpid();
487 }
488
489 msg.theCheckpointInterval = getCkptInterval();
490 string hostname = jalib::Filesystem::GetCurrentHostname();
491 msg.extraBytes = hostname.length() + 1 + progname.length() + 1;
492
493 _coordinatorSocket << msg;
494 _coordinatorSocket.writeAll(hostname.c_str(), hostname.length() + 1);
495 _coordinatorSocket.writeAll(progname.c_str(), progname.length() + 1);
496
497 msg.poison();
498 _coordinatorSocket >> msg;
499 msg.assertValid();
500 if (msg.type == DMT_KILL_PEER) {
501 JTRACE("Received KILL message from coordinator, exiting");
502 _real_exit (0);
503 }
504 if (msg.type == DMT_REJECT_NOT_RUNNING) {
505 JASSERT(false)
506 .Text("Connection rejected by the coordinator.\n"
507 "Reason: Current computation not in RUNNING state.\n"
508 " Is a checkpoint/restart in progress?");
509 } else if (msg.type == DMT_REJECT_WRONG_COMP) {
510 JASSERT(compId != NULL);
511 JASSERT(false) (*compId)
512 .Text("Connection rejected by the coordinator.\n"
513 " Reason: This process has a different computation group.");
514 }
515 JASSERT(msg.type == DMT_ACCEPT);
516 return msg;
517 }
518
519 void CoordinatorAPI::connectToCoordOnStartup(CoordinatorMode mode,
520 string progname,
521 DmtcpUniqueProcessId *compId,
522 CoordinatorInfo *coordInfo,
523 struct in_addr *localIP)
524 {
525 JASSERT(compId != NULL && localIP != NULL && coordInfo != NULL);
526
527 if (mode & COORD_NONE) {
528 setupVirtualCoordinator(coordInfo, localIP);
529 *compId = coordInfo->id;
530 return;
531 }
532
533 createNewConnToCoord(mode);
534 JTRACE("sending coordinator handshake")(UniquePid::ThisProcess());
535 DmtcpMessage hello_local(DMT_NEW_WORKER);
536 hello_local.virtualPid = -1;
537
538 DmtcpMessage hello_remote = sendRecvHandshake(hello_local, progname);
539
540 JASSERT(hello_remote.virtualPid != -1);
541 JTRACE("Got virtual pid from coordinator") (hello_remote.virtualPid);
542
543 pid_t ppid = getppid();
544 Util::setVirtualPidEnvVar(hello_remote.virtualPid, ppid, ppid);
545
546 JASSERT(compId != NULL && localIP != NULL && coordInfo != NULL);
547 *compId = hello_remote.compGroup.upid();
548 coordInfo->id = hello_remote.from.upid();
549 coordInfo->timeStamp = hello_remote.coordTimeStamp;
550 coordInfo->addrLen = sizeof (coordInfo->addr);
551 JASSERT(getpeername(_coordinatorSocket.sockfd(),
552 (struct sockaddr*) &coordInfo->addr,
553 &coordInfo->addrLen) == 0)
554 (JASSERT_ERRNO);
555 memcpy(localIP, &hello_remote.ipAddr, sizeof hello_remote.ipAddr);
556 }
557
558 void CoordinatorAPI::createNewConnectionBeforeFork(string& progname)
559 {
560 JASSERT(!noCoordinator());
561 struct sockaddr_storage addr;
562 uint32_t len;
563 SharedData::getCoordAddr((struct sockaddr *)&addr, &len);
564 socklen_t addrlen = len;
565 _coordinatorSocket = jalib::JClientSocket((struct sockaddr *)&addr, addrlen);
566 JASSERT(_coordinatorSocket.isValid());
567
568 DmtcpMessage hello_local(DMT_NEW_WORKER);
569 DmtcpMessage hello_remote = sendRecvHandshake(hello_local, progname);
570 JASSERT(hello_remote.virtualPid != -1);
571
572 if (dmtcp_virtual_to_real_pid) {
573 JTRACE("Got virtual pid from coordinator") (hello_remote.virtualPid);
574 pid_t pid = getpid();
575 pid_t realPid = dmtcp_virtual_to_real_pid(pid);
576 Util::setVirtualPidEnvVar(hello_remote.virtualPid, pid, realPid);
577 }
578 }
579
580 void CoordinatorAPI::connectToCoordOnRestart(CoordinatorMode mode,
581 string progname,
582 UniquePid compGroup,
583 int np,
584 CoordinatorInfo *coordInfo,
585 const char *host,
586 int port,
587 struct in_addr *localIP)
588 {
589 if (mode & COORD_NONE) {
590 setupVirtualCoordinator(coordInfo, localIP);
591 return;
592 }
593
594 createNewConnToCoord(mode);
595 JTRACE("sending coordinator handshake")(UniquePid::ThisProcess());
596 DmtcpMessage hello_local(DMT_RESTART_WORKER);
597 hello_local.virtualPid = -1;
598 hello_local.numPeers = np;
599 hello_local.compGroup = compGroup;
600
601 DmtcpMessage hello_remote = sendRecvHandshake(hello_local, progname,
602 &compGroup);
603
604 if (coordInfo != NULL) {
605 coordInfo->id = hello_remote.from.upid();
606 coordInfo->timeStamp = hello_remote.coordTimeStamp;
607 coordInfo->addrLen = sizeof (coordInfo->addr);
608 JASSERT(getpeername(_coordinatorSocket.sockfd(),
609 (struct sockaddr*) &coordInfo->addr,
610 &coordInfo->addrLen) == 0)
611 (JASSERT_ERRNO);
612 }
613 if (localIP != NULL) {
614 memcpy(localIP, &hello_remote.ipAddr, sizeof hello_remote.ipAddr);
615 }
616
617 JTRACE("Coordinator handshake RECEIVED!!!!!");
618 }
619
620 void CoordinatorAPI::sendCkptFilename()
621 {
622 if (noCoordinator()) return;
623 // Tell coordinator to record our filename in the restart script
624 string ckptFilename = ProcessInfo::instance().getCkptFilename();
625 string hostname = jalib::Filesystem::GetCurrentHostname();
626 JTRACE("recording filenames") (ckptFilename) (hostname);
627 DmtcpMessage msg;
628 if (dmtcp_unique_ckpt_enabled && dmtcp_unique_ckpt_enabled()) {
629 msg.type = DMT_UNIQUE_CKPT_FILENAME;
630 } else {
631 msg.type = DMT_CKPT_FILENAME;
632 }
633 msg.extraBytes = ckptFilename.length() + 1 + hostname.length() + 1;
634 _coordinatorSocket << msg;
635 _coordinatorSocket.writeAll(ckptFilename.c_str(), ckptFilename.length() + 1);
636 _coordinatorSocket.writeAll(hostname.c_str(), hostname.length() + 1);
637 }
638
639
640 int CoordinatorAPI::sendKeyValPairToCoordinator(const char *id,
641 const void *key,
642 uint32_t key_len,
643 const void *val,
644 uint32_t val_len,
645 int sync)
646 {
647 DmtcpMessage msg (DMT_REGISTER_NAME_SERVICE_DATA);
648 if (sync) {
649 msg.type = DMT_REGISTER_NAME_SERVICE_DATA_SYNC;
650 }
651 JWARNING(strlen(id) < sizeof(msg.nsid));
652 strncpy(msg.nsid, id, 8);
653 msg.keyLen = key_len;
654 msg.valLen = val_len;
655 msg.extraBytes = key_len + val_len;
656 jalib::JSocket sock = _coordinatorSocket;
657 if (dmtcp_is_running_state()) {
658 if (_nsSock.sockfd() == -1) {
659 _nsSock = createNewSocketToCoordinator(COORD_ANY);
660 JASSERT(_nsSock.isValid());
661 _nsSock.changeFd(PROTECTED_NS_FD);
662 DmtcpMessage m(DMT_NAME_SERVICE_WORKER);
663 _nsSock << m;
664 }
665 sock = _nsSock;
666 JASSERT(sock.isValid());
667 }
668
669 sock << msg;
670 sock.writeAll((const char *)key, key_len);
671 sock.writeAll((const char *)val, val_len);
672 if (sync) {
673 msg.poison();
674 sock >> msg;
675 JASSERT(msg.type == DMT_REGISTER_NAME_SERVICE_DATA_SYNC_RESPONSE)(msg.type);
676
677 }
678 return 1;
679 }
680
681 // On input, val points to a buffer in user memory and *val_len is the maximum
682 // size of that buffer (the memory allocated by user).
683 // On output, we copy data to val, and set *val_len to the actual buffer size
684 // (to the size of the data that we copied to the user buffer).
685 int CoordinatorAPI::sendQueryToCoordinator(const char *id,
686 const void *key,
687 uint32_t key_len,
688 void *val,
689 uint32_t *val_len)
690 {
691 DmtcpMessage msg (DMT_NAME_SERVICE_QUERY);
692 JWARNING(strlen(id) < sizeof(msg.nsid));
693 strncpy(msg.nsid, id, 8);
694 msg.keyLen = key_len;
695 msg.valLen = 0;
696 msg.extraBytes = key_len;
697 jalib::JSocket sock = _coordinatorSocket;
698
699 if (key == NULL || key_len == 0 || val == NULL || val_len == 0) {
700 return 0;
701 }
702
703 if (dmtcp_is_running_state()) {
704 if (!_nsSock.isValid()) {
705 _nsSock = createNewSocketToCoordinator(COORD_ANY);
706 JASSERT(_nsSock.isValid());
707 _nsSock.changeFd(PROTECTED_NS_FD);
708 DmtcpMessage m(DMT_NAME_SERVICE_WORKER);
709 _nsSock << m;
710 }
711 sock = _nsSock;
712 JASSERT(sock.isValid());
713 }
714
715 sock << msg;
716 sock.writeAll((const char *)key, key_len);
717
718 msg.poison();
719 sock >> msg;
720 msg.assertValid();
721 JASSERT(msg.type == DMT_NAME_SERVICE_QUERY_RESPONSE &&
722 msg.extraBytes == msg.valLen);
723
724 JASSERT (*val_len >= msg.valLen);
725 *val_len = msg.valLen;
726 if (*val_len > 0) {
727 sock.readAll((char*)val, *val_len);
728 }
729
730 return *val_len;
731 }