/* [<][>][^][v][top][bottom][index][help] */
DEFINITIONS
This source file includes the following definitions.
- eventHook
- _isBadFd
- resetOnFork
- deleteStaleConnections
- serialize
- list
- getConnection
- getConnection
- add
- processCloseWork
- processClose
- processDup
- preLockSaveOptions
- preCkptFdLeaderElection
- drain
- preCkpt
- refill
- resume
- postRestart
- registerIncomingCons
- sendReceiveMissingFds
1 /****************************************************************************
2 * Copyright (C) 2006-2013 by Jason Ansel, Kapil Arya, and Gene Cooperman *
3 * jansel@csail.mit.edu, kapil@ccs.neu.edu, gene@ccs.neu.edu *
4 * *
5 * This file is part of the dmtcp/src module of DMTCP (DMTCP:dmtcp/src). *
6 * *
7 * DMTCP:dmtcp/src is free software: you can redistribute it and/or *
8 * modify it under the terms of the GNU Lesser General Public License as *
9 * published by the Free Software Foundation, either version 3 of the *
10 * License, or (at your option) any later version. *
11 * *
12 * DMTCP:dmtcp/src is distributed in the hope that it will be useful, *
13 * but WITHOUT ANY WARRANTY; without even the implied warranty of *
14 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the *
15 * GNU Lesser General Public License for more details. *
16 * *
17 * You should have received a copy of the GNU Lesser General Public *
18 * License along with DMTCP:dmtcp/src. If not, see *
19 * <http://www.gnu.org/licenses/>. *
20 ****************************************************************************/
21
22 #include <fcntl.h>
23 #include <unistd.h>
24 #include <errno.h>
25 #include <sys/stat.h>
26 #include <sys/types.h>
27 #include <sys/wait.h>
28
29 #include "util.h"
30 #include "dmtcp.h"
31 #include "shareddata.h"
32 #include "jfilesystem.h"
33 #include "jconvert.h"
34 #include "jassert.h"
35 #include "jsocket.h"
36
37 #include "util_ipc.h"
38 #include "connection.h"
39 #include "connectionlist.h"
40
41 // Each fd may be shared or private. If a fd is shared, this situation
42 // must be restored at restart time, and only one process should set
43 // the properties of that shared fd. The sequence of events follows.
44
45 // At the time of checkpoint, a leader election algorithm is used to decide
46 // on a unique process as leader, who will be responsible for restoring the
47 // state of a file descriptor (whether shared or not) at the time of restart.
48 // It is implemented through doLocking and checkLocking, in which each
49 // process uses fcntl() with SETOWN, and after a barrier, they check
50 // if they are still the owner using GETOWN.
51 // A list of connections (fd's) is found through /proc/*/fd and saved
52 // in the ConnectionList object.
53 // At the time of restart, con->hasLock() will tell a process if it is the
54 // leader for that connection (con). However, the process must still
55 // discover if the file descriptor is private or shared.
56 // We define a "private fd" as an fd that is shared by exactly one process.
57 // This criterion is used in the implementation.
58 // There is a list of outgoing connections (outgoingCons) and
59 // incoming connections (incomingCons). These will refer to shared fd's.
60 // For the various processes on the same host, any shared fd corresponding
61 // to a connection has a 'con->hasLock()' method. If it's true, this
62 // process owns the shared fd, and must send it as an outgoing connection.
63 // If it's false, this process does not own the shared fd, and it must
64 // receive it from another process.
65 // The connections of the ConnectionList object are initially entered into
66 // a list of outgoing connections. If a process sees a fd and it does not
67 // have the lock (if con->hasLock() == false), then the process knows that
68 // the fd must be shared. So, the process declares this fd to be "incoming".
69 // Then registerIncomingCons() can separate the fd's into incoming and outgoing
70 // connections. This is done by looking up the shared-area to see which
71 // connections have been declared as "incoming" by other processes.
72 // A UNIX domain socket (called 'restoreFd' or 'protected_fd', depending
73 // on the function, but always deduced from protectedFd()) is used to
74 // send the outgoing connections, and to receive the incoming connections
75 // at restart time, so that the corresponding fd's can again be shared.
76 // The owner of each outgoing connection sends the fd out, and for each
77 // process, if that connection is on its list of incoming connections
78 // (missing connections), then the fd is kept as a shared fd.
79
80 using namespace dmtcp;
81
// True while this is the first program started after dmtcp_launch.
// eventHook() clears it on DMTCP_EVENT_POST_EXEC and consults it on
// DMTCP_EVENT_INIT to decide whether to scan for pre-existing fds.
static bool freshProcess = true;
84
85 ConnectionList::~ConnectionList()
86 {
87 }
88
89 void ConnectionList::eventHook(DmtcpEvent_t event,
90 DmtcpEventData_t *data)
91 {
92 switch (event) {
93 case DMTCP_EVENT_INIT:
94 // Delete stale connections if any.
95 deleteStaleConnections();
96 if (freshProcess) {
97 scanForPreExisting();
98 }
99 break;
100
101 case DMTCP_EVENT_PRE_EXEC:
102 {
103 jalib::JBinarySerializeWriterRaw wr("", data->serializerInfo.fd);
104 serialize(wr);
105 }
106 break;
107
108 case DMTCP_EVENT_POST_EXEC:
109 {
110 freshProcess = false;
111 jalib::JBinarySerializeReaderRaw rd("", data->serializerInfo.fd);
112 serialize(rd);
113 deleteStaleConnections();
114 }
115 break;
116
117 case DMTCP_EVENT_RESTART:
118 postRestart();
119
120 break;
121
122 case DMTCP_EVENT_THREADS_SUSPEND:
123 preLockSaveOptions();
124 break;
125
126 case DMTCP_EVENT_LEADER_ELECTION:
127 JTRACE("locking...");
128 preCkptFdLeaderElection();
129 JTRACE("locked");
130 break;
131
132 case DMTCP_EVENT_DRAIN:
133 JTRACE("draining...");
134 drain();
135 JTRACE("drained");
136 break;
137
138 case DMTCP_EVENT_WRITE_CKPT:
139 JTRACE("preCkpt...");
140 preCkpt();
141 JTRACE("done preCkpt");
142 break;
143
144 case DMTCP_EVENT_REFILL:
145 refill(data->refillInfo.isRestart);
146 break;
147
148 case DMTCP_EVENT_THREADS_RESUME:
149 resume(data->resumeInfo.isRestart);
150 break;
151
152 case DMTCP_EVENT_REGISTER_NAME_SERVICE_DATA:
153 registerNSData(data->nameserviceInfo.isRestart);
154 break;
155
156 case DMTCP_EVENT_SEND_QUERIES:
157 sendQueries(data->nameserviceInfo.isRestart);
158 break;
159
160 default:
161 break;
162 }
163
164 return;
165 }
166
167 static bool _isBadFd(int fd)
168 {
169 errno = 0;
170 return _real_fcntl(fd, F_GETFL, 0) == -1 && errno == EBADF;
171 }
172
173 //static ConnectionList *connectionList = NULL;
174 //ConnectionList& ConnectionList::instance()
175 //{
176 // if (connectionList == NULL) {
177 // connectionList = new ConnectionList();
178 // }
179 // return *connectionList;
180 //}
181
182 void ConnectionList::resetOnFork()
183 {
184 JASSERT(pthread_mutex_destroy(&_lock) == 0) (JASSERT_ERRNO);
185 JASSERT(pthread_mutex_init(&_lock, NULL) == 0) (JASSERT_ERRNO);
186 }
187
188
189 void ConnectionList::deleteStaleConnections()
190 {
191 //build list of stale connections
192 vector<int> staleFds;
193 for (FdToConMapT::iterator i = _fdToCon.begin(); i != _fdToCon.end(); ++i) {
194 if (_isBadFd(i->first)) {
195 staleFds.push_back(i->first);
196 }
197 }
198
199 #ifdef DEBUG
200 if (staleFds.size() > 0) {
201 ostringstream out;
202 out << "\tDevice \t\t->\t File Descriptor -> ConnectionId\n";
203 out << "==================================================\n";
204 for (size_t i = 0; i < staleFds.size(); ++i) {
205 Connection *c = getConnection(staleFds[i]);
206
207 out << "\t[" << jalib::XToString(staleFds[i]) << "]"
208 << c->str()
209 << "\t->\t" << staleFds[i]
210 << "\t->\t" << c->id() << "\n";
211 }
212 out << "==================================================\n";
213 JTRACE("Deleting Stale Connections") (out.str());
214 }
215 #endif
216
217 //delete all the stale connections
218 for (size_t i = 0; i < staleFds.size(); ++i) {
219 processClose(staleFds[i]);
220 }
221 }
222
223 void ConnectionList::serialize(jalib::JBinarySerializer& o)
224 {
225 JSERIALIZE_ASSERT_POINT("dmtcp-serialized-connection-table!v0.07");
226
227 JSERIALIZE_ASSERT_POINT("ConnectionIdentifier:");
228 ConnectionIdentifier::serialize(o);
229
230 JSERIALIZE_ASSERT_POINT("ConnectionList:");
231
232 uint32_t numCons = _connections.size();
233 o & numCons;
234
235 if (o.isWriter()) {
236 for (iterator i=_connections.begin(); i!=_connections.end(); ++i) {
237 ConnectionIdentifier key = i->first;
238 Connection& con = *i->second;
239 uint32_t type = con.conType();
240
241 JSERIALIZE_ASSERT_POINT("[StartConnection]");
242 o & key & type;
243 con.serialize(o);
244 JSERIALIZE_ASSERT_POINT("[EndConnection]");
245 }
246 } else {
247 while (numCons-- > 0) {
248 ConnectionIdentifier key;
249 int type = -1;
250 Connection* con = NULL;
251
252 JSERIALIZE_ASSERT_POINT("[StartConnection]");
253 o & key & type;
254 con = createDummyConnection(type);
255 JASSERT(con != NULL) (key);
256 con->serialize(o);
257 _connections[key] = con;
258 const vector<int32_t>& fds = con->getFds();
259 for (size_t i = 0; i < fds.size(); i++) {
260 _fdToCon[fds[i]] = con;
261 }
262 JSERIALIZE_ASSERT_POINT("[EndConnection]");
263 }
264 }
265 JSERIALIZE_ASSERT_POINT("EOF");
266 }
267
268 void ConnectionList::list()
269 {
270 ostringstream o;
271 o << "\n";
272 for (iterator i = begin(); i != end(); i++) {
273 Connection *c = i->second;
274 vector<int> fds = c->getFds();
275 for (size_t j = 0; j<fds.size(); j++) {
276 o << fds[j];
277 if (j < fds.size() - 1)
278 o << "," ;
279 }
280 o << "\t" << i->first << "\t" << c->str();
281 o << "\n";
282 }
283 JTRACE("ConnectionList") (dmtcp_get_uniquepid_str()) (o.str());
284 }
285
286 Connection*
287 ConnectionList::getConnection(const ConnectionIdentifier& id)
288 {
289 if (_connections.find(id) == _connections.end()) {
290 return NULL;
291 }
292 return _connections[id];
293 }
294
295 Connection *ConnectionList::getConnection(int fd)
296 {
297 if (_fdToCon.find(fd) == _fdToCon.end()) {
298 return NULL;
299 }
300 return _fdToCon[fd];
301 }
302
303 void ConnectionList::add(int fd, Connection* c)
304 {
305 _lock_tbl();
306
307 if (_fdToCon.find(fd) != _fdToCon.end()) {
308 /* In ordinary situations, we never exercise this path since we already
309 * capture close() and remove the connection. However, there is one
310 * particular case where this assumption fails -- when gblic opens a socket
311 * using socket() but closes it using the internal close_not_cancel() thus
312 * bypassing our close wrapper. This behavior is observed when dealing with
313 * getaddrinfo().
314 */
315 processCloseWork(fd);
316 }
317
318 if( _connections.find(c->id()) == _connections.end() )
319 _connections[c->id()] = c;
320 c->addFd(fd);
321 _fdToCon[fd] = c;
322 _unlock_tbl();
323 }
324
325 void ConnectionList::processCloseWork(int fd)
326 {
327 Connection *con = _fdToCon[fd];
328 _fdToCon.erase(fd);
329 con->removeFd(fd);
330 if (con->numFds() == 0) {
331 _connections.erase(con->id());
332 delete con;
333 }
334 }
335
336 void ConnectionList::processClose(int fd)
337 {
338 if (_fdToCon.find(fd) != _fdToCon.end()) {
339 _lock_tbl();
340 processCloseWork(fd);
341 _unlock_tbl();
342 }
343 }
344
345 void ConnectionList::processDup(int oldfd, int newfd)
346 {
347 if (oldfd == newfd) return;
348 if (_fdToCon.find(newfd) != _fdToCon.end()) {
349 processClose(newfd);
350 }
351
352 // Add only if the oldfd was already in the _fdToCon table.
353 if (_fdToCon.find(oldfd) != _fdToCon.end()) {
354 _lock_tbl();
355 Connection *con = _fdToCon[oldfd];
356 _fdToCon[newfd] = con;
357 con->addFd(newfd);
358 _unlock_tbl();
359 }
360 }
361
362 /*****************************************************/
363 /*****************************************************/
364 /*****************************************************/
365 /*****************************************************/
366 /*****************************************************/
367 /*****************************************************/
368 /*****************************************************/
369
370 void ConnectionList::preLockSaveOptions()
371 {
372 deleteStaleConnections();
373 list();
374 // Save Options for each Fd (We need to do it here instead of in
375 // preCkptFdLeaderElection because we want to restore the correct owner
376 // in refill).
377 for (iterator i = begin(); i != end(); ++i) {
378 Connection *con = i->second;
379 con->saveOptions();
380 }
381 }
382
383 void ConnectionList::preCkptFdLeaderElection()
384 {
385 deleteStaleConnections();
386 for (iterator i = begin(); i != end(); ++i) {
387 Connection *con = i->second;
388 JASSERT(con->numFds() > 0);
389 con->doLocking();
390 }
391 }
392
393 void ConnectionList::drain()
394 {
395 for (iterator i = begin(); i != end(); ++i) {
396 Connection* con = i->second;
397 con->checkLocking();
398 if (con->hasLock()) {
399 con->drain();
400 }
401 }
402 }
403
404 void ConnectionList::preCkpt()
405 {
406 for (iterator i = begin(); i != end(); ++i) {
407 Connection* con = i->second;
408 if (con->hasLock()) {
409 con->preCkpt();
410 }
411 }
412 }
413
414 void ConnectionList::refill(bool isRestart)
415 {
416 for (iterator i = begin(); i != end(); ++i) {
417 Connection *con = i->second;
418 if (con->hasLock()) {
419 con->refill(isRestart);
420 con->restoreOptions();
421 }
422 }
423 if (isRestart) {
424 JTRACE("Waiting for Missing Cons");
425 sendReceiveMissingFds();
426 JTRACE("Done waiting for Missing Cons");
427 }
428 }
429
430 void ConnectionList::resume(bool isRestart)
431 {
432 for (iterator i = begin(); i != end(); ++i) {
433 Connection *con = i->second;
434 if (con->hasLock()) {
435 con->resume(isRestart);
436 }
437 }
438 }
439
440 void ConnectionList::postRestart()
441 {
442 // Here we modify the restore algorithm by splitting it into two parts. In the
443 // first part we restore all the connections except the PTY_SLAVE types and
444 // in the second part we restore only PTY_SLAVE _connections. This is done to
445 // make sure that by the time we are trying to restore a PTY_SLAVE
446 // connection, its corresponding PTY_MASTER connection has already been
447 // restored.
448 // UPDATE: We also restore the files for which the we didn't have the lock in
449 // second iteration along with PTY_SLAVEs
450 // Part 1: Restore all but Pseudo-terminal slaves and file connection which
451 // were not checkpointed
452 for (iterator i = begin(); i != end(); ++i) {
453 Connection *con = i->second;
454 if (!con->hasLock()) continue;
455
456 // TODO: FIXME: Add support for Socketpairs.
457 // if (con->conType() == Connection::TCP) {
458 // TcpConnection *tcpCon =(TcpConnection *) con;
459 // if (tcpCon->peerType() == TcpConnection::PEER_SOCKETPAIR) {
460 // ConnectionIdentifier peerId = tcpCon->getSocketpairPeerId();
461 // TcpConnection *peerCon = (TcpConnection*) getConnection(peerId);
462 // if (peerCon != NULL) {
463 // tcpCon->restoreSocketPair(peerCon);
464 // continue;
465 // }
466 // }
467 // }
468 con->postRestart();
469 }
470
471 registerIncomingCons();
472 }
473
474
475 void ConnectionList::registerIncomingCons()
476 {
477 int protected_fd = protectedFd();
478 // Add receive-fd data socket.
479 static struct sockaddr_un fdReceiveAddr;
480 static socklen_t fdReceiveAddrLen;
481
482 memset(&fdReceiveAddr, 0, sizeof(fdReceiveAddr));
483 jalib::JSocket sock(_real_socket(AF_UNIX, SOCK_DGRAM, 0));
484 JASSERT(sock.isValid());
485 sock.changeFd(protected_fd);
486 fdReceiveAddr.sun_family = AF_UNIX;
487 JASSERT(_real_bind(protected_fd,
488 (struct sockaddr*) &fdReceiveAddr,
489 sizeof(fdReceiveAddr.sun_family)) == 0) (JASSERT_ERRNO);
490
491 fdReceiveAddrLen = sizeof(fdReceiveAddr);
492 JASSERT(getsockname(protected_fd,
493 (struct sockaddr *)&fdReceiveAddr,
494 &fdReceiveAddrLen) == 0);
495
496
497 vector<const char *> incomingCons;
498 ostringstream in, out;
499 for (iterator i = begin(); i != end(); ++i) {
500 Connection *con = i->second;
501 // Check comments in FileConnList::postRestart() for the explanation
502 // about isPreExistingCTTY.
503 if (!con->hasLock() && !con->isStdio() && !con->isPreExistingCTTY()) {
504 incomingCons.push_back((const char*)&i->first);
505 in << "\n\t" << con->str() << i->first;
506 } else {
507 out << "\n\t" << con->str() << i->first;
508 }
509 }
510 JTRACE("Incoming/Outgoing Cons") (in.str()) (out.str());
511 numIncomingCons = incomingCons.size();
512 if (numIncomingCons > 0) {
513 SharedData::registerIncomingCons(incomingCons, fdReceiveAddr,
514 fdReceiveAddrLen);
515 }
516 }
517
518 void ConnectionList::sendReceiveMissingFds()
519 {
520 size_t i;
521 vector<int> outgoingCons;
522 SharedData::IncomingConMap *maps;
523 uint32_t nmaps;
524 SharedData::getMissingConMaps(&maps, &nmaps);
525 for (i = 0; i < nmaps; i++) {
526 ConnectionIdentifier *id = (ConnectionIdentifier*) maps[i].id;
527 Connection *con = getConnection(*id);
528 if (con != NULL && con->hasLock()) {
529 outgoingCons.push_back(i);
530 }
531 }
532
533 fd_set rfds;
534 fd_set wfds;
535 int restoreFd = protectedFd();
536 size_t numOutgoingCons = outgoingCons.size();
537 while (numOutgoingCons > 0 || numIncomingCons > 0) {
538 FD_ZERO(&wfds);
539 if (outgoingCons.size() > 0) {
540 FD_SET(restoreFd, &wfds);
541 }
542 FD_ZERO(&rfds);
543 if (numIncomingCons > 0) {
544 FD_SET(restoreFd, &rfds);
545 }
546
547 int ret = _real_select(restoreFd+1, &rfds, &wfds, NULL, NULL);
548 JASSERT(ret != -1) (JASSERT_ERRNO);
549
550 if (numOutgoingCons > 0 && FD_ISSET(restoreFd, &wfds)) {
551 size_t idx = outgoingCons.back();
552 outgoingCons.pop_back();
553 ConnectionIdentifier *id = (ConnectionIdentifier*) maps[idx].id;
554 Connection *con = getConnection(*id);
555 JTRACE("Sending Missing Con") (*id);
556 JASSERT(Util::sendFd(restoreFd, con->getFds()[0], id, sizeof(*id),
557 maps[idx].addr, maps[idx].len) != -1);
558 numOutgoingCons--;
559 }
560
561 if (numIncomingCons > 0 && FD_ISSET(restoreFd, &rfds)) {
562 ConnectionIdentifier id;
563 int fd = Util::receiveFd(restoreFd, &id, sizeof(id));
564 JASSERT(fd != -1);
565 Connection *con = getConnection(id);
566 JTRACE("Received Missing Con") (id);
567 JASSERT(con != NULL);
568 Util::dupFds(fd, con->getFds());
569 numIncomingCons--;
570 }
571 }
572 dmtcp_close_protected_fd(restoreFd);
573 }