/* [<][>][^][v][top][bottom][index][help] */
DEFINITIONS
This source file includes the following definitions.
- _isVimApp
- _isBlacklistedFile
- ptmxTestPacketMode
- readyToRead
- readOnePacket
- ptmxReadAll
- writeOnePacket
- ptmxWriteAll
- _preExistingCTTY
- drain
- preRefill
- refill
- postRestart
- serializeSubClass
- doLocking
- calculateRelativePath
- drain
- preCkpt
- refill
- resume
- refreshPath
- postRestart
- checkDup
- openFile
- areFilesEqual
- writeFileFromFd
- getSavedFilePath
- serializeSubClass
- drain
- refill
- refreshPath
- postRestart
- openFile
- serializeSubClass
- postRestart
- on_mq_close
- on_mq_notify
- drain
- refill
- postRestart
- serializeSubClass
1 /****************************************************************************
2 * Copyright (C) 2006-2010 by Jason Ansel, Kapil Arya, and Gene Cooperman *
3 * jansel@csail.mit.edu, kapil@ccs.neu.edu, gene@ccs.neu.edu *
4 * *
5 * This file is part of the dmtcp/src module of DMTCP (DMTCP:dmtcp/src). *
6 * *
7 * DMTCP:dmtcp/src is free software: you can redistribute it and/or *
8 * modify it under the terms of the GNU Lesser General Public License as *
9 * published by the Free Software Foundation, either version 3 of the *
10 * License, or (at your option) any later version. *
11 * *
12 * DMTCP:dmtcp/src is distributed in the hope that it will be useful, *
13 * but WITHOUT ANY WARRANTY; without even the implied warranty of *
14 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the *
15 * GNU Lesser General Public License for more details. *
16 * *
17 * You should have received a copy of the GNU Lesser General Public *
18 * License along with DMTCP:dmtcp/src. If not, see *
19 * <http://www.gnu.org/licenses/>. *
20 ****************************************************************************/
21
22 #include <sys/ioctl.h>
23 #include <sys/select.h>
24 #include <sys/un.h>
25 #include <unistd.h>
26 #include <fcntl.h>
27 #include <sys/file.h>
28 #include <termios.h>
29 #include <iostream>
30 #include <ios>
31 #include <fstream>
32 #include <linux/limits.h>
33 #include <arpa/inet.h>
34
35 #include "dmtcp.h"
36 #include "shareddata.h"
37 #include "util.h"
38 #include "jsocket.h"
39 #include "jassert.h"
40 #include "jfilesystem.h"
41 #include "jconvert.h"
42
43 #include "fileconnection.h"
44 #include "filewrappers.h"
45
46 using namespace dmtcp;
47
48 static bool ptmxTestPacketMode(int masterFd);
49 static ssize_t ptmxReadAll(int fd, const void *origBuf, size_t maxCount);
50 static ssize_t ptmxWriteAll(int fd, const void *buf, bool isPacketMode);
51 static void writeFileFromFd(int fd, int destFd);
52 static bool areFilesEqual(int fd, int destFd, size_t size);
53
54 static bool _isVimApp()
55 {
56 static int isVimApp = -1;
57
58 if (isVimApp == -1) {
59 string progName = jalib::Filesystem::GetProgramName();
60
61 if (progName == "vi" || progName == "vim" || progName == "vim-normal" ||
62 progName == "vim.basic" || progName == "vim.tiny" ||
63 progName == "vim.gtk" || progName == "vim.gnome") {
64 isVimApp = 1;
65 } else {
66 isVimApp = 0;
67 }
68 }
69 return isVimApp;
70 }
71
72 static bool _isBlacklistedFile(string& path)
73 {
74 if ((Util::strStartsWith(path, "/dev/") &&
75 !Util::strStartsWith(path, "/dev/shm/")) ||
76 Util::strStartsWith(path, "/proc/") ||
77 Util::strStartsWith(path, dmtcp_get_tmpdir())) {
78 return true;
79 }
80 return false;
81 }
82
83 /*****************************************************************************
84 * Pseudo-TTY Connection
85 *****************************************************************************/
86
87 static bool ptmxTestPacketMode(int masterFd)
88 {
89 char tmp_buf[100];
90 int slave_fd, ioctlArg, rc;
91 fd_set read_fds;
92 struct timeval zeroTimeout = {0, 0}; /* Zero: will use to poll, not wait.*/
93
94 _real_ptsname_r(masterFd, tmp_buf, 100);
95 /* permissions not used, but _real_open requires third arg */
96 slave_fd = _real_open(tmp_buf, O_RDWR, 0666);
97
98 /* A. Drain master before testing.
99 Ideally, DMTCP has already drained it and preserved any information
100 about exceptional conditions in command byte, but maybe we accidentally
101 caused a new command byte in packet mode. */
102 /* Note: if terminal was in packet mode, and the next read would be
103 a non-data command byte, then there's no easy way for now to detect and
104 restore this. ?? */
105 /* Note: if there was no data to flush, there might be no command byte,
106 even in packet mode. */
107 tcflush(slave_fd, TCIOFLUSH);
108 /* If character already transmitted(usual case for pty), then this flush
109 will tell master to flush it. */
110 tcflush(masterFd, TCIFLUSH);
111
112 /* B. Now verify that read_fds has no more characters to read. */
113 ioctlArg = 1;
114 ioctl(masterFd, TIOCINQ, &ioctlArg);
115 /* Now check if there's a command byte still to read. */
116 FD_ZERO(&read_fds);
117 FD_SET(masterFd, &read_fds);
118 select(masterFd + 1, &read_fds, NULL, NULL, &zeroTimeout);
119 if (FD_ISSET(masterFd, &read_fds)) {
120 // Clean up someone else's command byte from packet mode.
121 // FIXME: We should restore this on resume/restart.
122 rc = read(masterFd, tmp_buf, 100);
123 JASSERT(rc == 1) (rc) (masterFd);
124 }
125
126 /* C. Now we're ready to do the real test. If in packet mode, we should
127 see command byte of TIOCPKT_DATA(0) with data. */
128 tmp_buf[0] = 'x'; /* Don't set '\n'. Could be converted to "\r\n". */
129 /* Give the masterFd something to read. */
130 JWARNING((rc = write(slave_fd, tmp_buf, 1)) == 1) (rc) .Text("write failed");
131 //tcdrain(slave_fd);
132 _real_close(slave_fd);
133
134 /* Read the 'x': If we also see a command byte, it's packet mode */
135 rc = read(masterFd, tmp_buf, 100);
136
137 /* D. Check if command byte packet exists, and chars rec'd is longer by 1. */
138 return(rc == 2 && tmp_buf[0] == TIOCPKT_DATA && tmp_buf[1] == 'x');
139 }
140
// Poll, without blocking, whether `fd` has data available to read.
// select() is retried if interrupted by a signal.  On any other select()
// failure POSIX leaves the fd set unmodified, so this reports "ready" and
// the caller's read() surfaces the error.
static bool readyToRead(int fd)
{
  fd_set read_fds;
  int rc;
  do {
    struct timeval zeroTimeout = {0, 0}; /* Zero: will use to poll, not wait.*/
    FD_ZERO(&read_fds);
    FD_SET(fd, &read_fds);
    rc = select(fd + 1, &read_fds, NULL, NULL, &zeroTimeout);
  } while (rc == -1 && errno == EINTR);
  return FD_ISSET(fd, &read_fds);
}

// Read a single packet from `fd` into `buf`, recording the count read in a
// leading header in case the pty is in packet mode.
// Layout written to buf: [hdr, data], where hdr is an int holding the number
// of data bytes.  At most maxCount bytes of buf are written.
// Returns 0 if not ready to read (or at end-of-file); -1 on error with errno
// set (E2BIG if the data may not have fit in maxCount - sizeof(hdr) bytes);
// else the size written, header included.
static ssize_t readOnePacket(int fd, const void *buf, size_t maxCount)
{
  typedef int hdr;
  char *out = (char *)buf;
  ssize_t rc = 0;

  if (maxCount <= sizeof(hdr)) {
    // No room for any data after the header.  This check also avoids the
    // size_t underflow of maxCount - sizeof(hdr).
    if (!readyToRead(fd)) {
      return 0;
    }
    errno = E2BIG;
    return -1;
  }
  const size_t dataMax = maxCount - sizeof(hdr);

  // Read single packet: rc > 0 will be true for at most one iteration.
  while (rc <= 0 && readyToRead(fd)) {
    rc = read(fd, out + sizeof(hdr), dataMax);
    // Record the number read in the header.  memcpy is used because buf may
    // start at any byte offset (ptmxReadAll packs records back to back).
    hdr count = (hdr)rc;
    memcpy(out, &count, sizeof(count));
    if (rc == 0) {
      break; // End-of-file: readyToRead() would keep reporting "ready".
    }
    if (rc >= (ssize_t)dataMax) {
      rc = -1; errno = E2BIG; // Invoke new errno for buf size not large enough
    }
    if (rc == -1 && errno != EAGAIN && errno != EINTR)
      break; /* Give up; bad error */
  }
  return (rc <= 0 ? rc : rc + (ssize_t)sizeof(hdr));
}
169
170 // rc < 0 => error; rc == sizeof(hdr) => no data to read;
171 // rc > 0 => saved w/ count hdr
172 static ssize_t ptmxReadAll(int fd, const void *origBuf, size_t maxCount)
173 {
174 typedef int hdr;
175 char *buf =(char *)origBuf;
176 int rc;
177 while ((rc = readOnePacket(fd, buf, maxCount)) > 0) {
178 buf += rc;
179 }
180 *(hdr *)buf = 0; /* Header count of zero means we're done */
181 buf += sizeof(hdr);
182 JASSERT(rc < 0 || buf -(char *)origBuf > 0) (rc) (origBuf) ((void *)buf);
183 return(rc < 0 ? rc : buf -(char *)origBuf);
184 }
185
186 // The hdr contains the size of the full buffer([hdr, data]).
187 // Return size of origBuf written: includes packets of form: [hdr, data]
188 // with hdr holding size of data. Last hdr has value zero.
189 // Also record the count written on each iteration, in case it's packet mode.
190 static ssize_t writeOnePacket(int fd, const void *origBuf, bool isPacketMode)
191 {
192 typedef int hdr;
193 int count = *(hdr *)origBuf;
194 int cum_count = 0;
195 int rc = 0; // Trigger JASSERT if not modified below.
196 if (count == 0)
197 return sizeof(hdr); // count of zero means we're done, hdr consumed
198 // FIXME: It would be nice to restore packet mode(flow control, etc.)
199 // For now, we ignore it.
200 if (count == 1 && isPacketMode)
201 return sizeof(hdr) + 1;
202 while (cum_count < count) {
203 rc = write(fd,(char *)origBuf+sizeof(hdr)+cum_count, count-cum_count);
204 if (rc == -1 && errno != EAGAIN && errno != EINTR)
205 break; /* Give up; bad error */
206 if (rc >= 0)
207 cum_count += rc;
208 }
209 JASSERT(rc != 0 && cum_count == count)
210 (JASSERT_ERRNO) (rc) (count) (cum_count);
211 return(rc < 0 ? rc : cum_count+sizeof(hdr));
212 }
213
214 static ssize_t ptmxWriteAll(int fd, const void *buf, bool isPacketMode)
215 {
216 typedef int hdr;
217 ssize_t cum_count = 0;
218 ssize_t rc;
219 while ((rc = writeOnePacket(fd,(char *)buf+cum_count, isPacketMode))
220 >(ssize_t)sizeof(hdr)) {
221 cum_count += rc;
222 }
223 JASSERT(rc < 0 || rc == sizeof(hdr)) (rc) (cum_count);
224 cum_count += sizeof(hdr); /* Account for last packet: 'done' hdr w/ 0 data */
225 return(rc <= 0 ? rc : cum_count);
226 }
227
228 PtyConnection::PtyConnection(int fd, const char *path,
229 int flags, mode_t mode, int type)
230 : Connection (PTY)
231 , _flags(flags)
232 , _mode(mode)
233 , _preExistingCTTY(false)
234 {
235 char buf[PTS_PATH_MAX];
236 _type = type;
237 switch (_type) {
238
239 case PTY_DEV_TTY:
240 _ptsName = path;
241 break;
242
243 case PTY_PARENT_CTTY:
244 case PTY_CTTY:
245 _ptsName = path;
246 SharedData::getVirtPtyName(path, buf, sizeof(buf));
247 if (strlen(buf) == 0) {
248 SharedData::createVirtualPtyName(path, buf, sizeof(buf));
249 }
250 _virtPtsName = buf;
251 JTRACE("creating CTTY connection") (_ptsName) (_virtPtsName);
252
253 break;
254
255 case PTY_MASTER:
256 _masterName = path;
257 JASSERT(_real_ptsname_r(fd, buf, sizeof(buf)) == 0) (JASSERT_ERRNO);
258 _ptsName = buf;
259
260 // glibc allows only 20 char long buf
261 // Check if there is enough room to insert the string "dmtcp_" before the
262 // terminal number, if not then we ASSERT here.
263 JASSERT((strlen(buf) + strlen("v")) <= 20)
264 .Text("string /dev/pts/<n> too long, can not be virtualized."
265 "Once possible workarong here is to replace the string"
266 "\"dmtcp_\" with something short like \"d_\" or even "
267 "\"d\" and recompile DMTCP");
268
269 // Generate new Unique buf
270 SharedData::createVirtualPtyName(_ptsName.c_str(), buf, sizeof(buf));
271 _virtPtsName = buf;
272 JTRACE("creating ptmx connection") (_ptsName) (_virtPtsName);
273 break;
274
275 case PTY_SLAVE:
276 _ptsName = path;
277 SharedData::getVirtPtyName(path, buf, sizeof(buf));
278 _virtPtsName = buf;
279 JASSERT(strlen(buf) != 0) (path);
280 JTRACE("creating pts connection") (_ptsName) (_virtPtsName);
281 break;
282
283 case PTY_BSD_MASTER:
284 _masterName = path;
285 break;
286
287 case PTY_BSD_SLAVE:
288 _ptsName = path;
289 break;
290
291 default:
292 break;
293 }
294 }
295
296 void PtyConnection::drain()
297 {
298 if (_type == PTY_MASTER) {
299 const int maxCount = 10000;
300 char buf[maxCount];
301 int numRead, numWritten;
302 // _fds[0] is master fd
303 numRead = ptmxReadAll(_fds[0], buf, maxCount);
304 _ptmxIsPacketMode = ptmxTestPacketMode(_fds[0]);
305 JTRACE("_fds[0] is master(/dev/ptmx)") (_fds[0]) (_ptmxIsPacketMode);
306 numWritten = ptmxWriteAll(_fds[0], buf, _ptmxIsPacketMode);
307 JASSERT(numRead == numWritten) (numRead) (numWritten);
308 }
309 JASSERT((_type == PTY_CTTY || _type == PTY_PARENT_CTTY) || _flags != -1);
310 if (tcgetpgrp(_fds[0]) != -1) {
311 _isControllingTTY = true;
312 } else {
313 _isControllingTTY = false;
314 }
315 }
316
317 void PtyConnection::preRefill(bool isRestart)
318 {
319 if (!isRestart) {
320 return;
321 }
322
323 if (_type == PTY_SLAVE || _type == PTY_BSD_SLAVE) {
324 JASSERT(_ptsName.compare("?") != 0);
325 JTRACE("Restoring PTY slave") (_fds[0]) (_ptsName) (_virtPtsName);
326 if (_type == PTY_SLAVE) {
327 char buf[32];
328 SharedData::getRealPtyName(_virtPtsName.c_str(), buf, sizeof(buf));
329 JASSERT(strlen(buf) > 0) (_virtPtsName) (_ptsName);
330 _ptsName = buf;
331 }
332
333 /* If the process has called setsid(), and has no controlling terminal, a
334 * call to open() will set this terminal as its controlling. To avoid this,
335 * we pass O_NOCTTY flag if this terminal was not the controlling terminal
336 * during checkpoint phase.
337 */
338 int extraFlags = 0;//_isControllingTTY ? 0 : O_NOCTTY;
339 int tempfd = _real_open(_ptsName.c_str(), _flags | extraFlags);
340 JASSERT(tempfd >= 0) (_virtPtsName) (_ptsName) (JASSERT_ERRNO)
341 .Text("Error Opening PTS");
342
343 JTRACE("Restoring PTS real") (_ptsName) (_virtPtsName) (_fds[0]);
344 Util::dupFds(tempfd, _fds);
345 }
346 }
347
348 void PtyConnection::refill(bool isRestart)
349 {
350 if (!isRestart) {
351 return;
352 }
353 if (_type == PTY_DEV_TTY) {
354 /* If a process has called setsid(), it is possible that there is no
355 * controlling terminal while in the postRestart phase as processes are
356 * still creating the pseudo-tty master-slave pairs. By the time we get
357 * into the refill mode, we should have all the pseudo-ttys present.
358 */
359 int tempfd = _real_open("/dev/tty", O_RDWR, 0);
360 JASSERT(tempfd >= 0) (tempfd) (JASSERT_ERRNO)
361 .Text("Error opening controlling terminal /dev/tty");
362
363 JTRACE("Restoring /dev/tty for the process") (_fds[0]);
364 _ptsName = _virtPtsName = "/dev/tty";
365 Util::dupFds(tempfd, _fds);
366 }
367 }
368
369 void PtyConnection::postRestart()
370 {
371 JASSERT(_fds.size() > 0);
372 if (_type == PTY_SLAVE || _type == PTY_BSD_SLAVE || _type == PTY_DEV_TTY) {
373 return;
374 }
375
376 int tempfd = -1;
377 int extraFlags = _isControllingTTY ? 0 : O_NOCTTY;
378
379 switch (_type) {
380 case PTY_INVALID:
381 //tempfd = _real_open("/dev/null", O_RDWR);
382 JTRACE("Restoring invalid PTY.") (id());
383 return;
384
385 case PTY_CTTY:
386 case PTY_PARENT_CTTY:
387 {
388 string controllingTty;
389 string stdinDeviceName;
390 if (_type == PTY_CTTY) {
391 controllingTty = jalib::Filesystem::GetControllingTerm();
392 } else {
393 controllingTty = jalib::Filesystem::GetControllingTerm(getppid());
394 }
395 stdinDeviceName = (jalib::Filesystem::GetDeviceName(STDIN_FILENO));
396 if (controllingTty.length() > 0 &&
397 _real_access(controllingTty.c_str(), R_OK | W_OK) == 0) {
398 tempfd = _real_open(controllingTty.c_str(), _fcntlFlags);
399 JASSERT(tempfd >= 0) (tempfd) (controllingTty) (JASSERT_ERRNO)
400 .Text("Error Opening the terminal attached with the process");
401 } else {
402 if (_type == PTY_CTTY) {
403 JTRACE("Unable to restore controlling terminal attached with the "
404 "parent process.\n"
405 "Replacing it with current STDIN")
406 (stdinDeviceName);
407 } else {
408 JWARNING(false) (stdinDeviceName)
409 .Text("Unable to restore controlling terminal attached with the "
410 "parent process.\n"
411 "Replacing it with current STDIN");
412 }
413 JWARNING(Util::strStartsWith(stdinDeviceName, "/dev/pts/") ||
414 stdinDeviceName == "/dev/tty") (stdinDeviceName)
415 .Text("Controlling terminal not bound to a terminal device.");
416
417 if (Util::isValidFd(STDIN_FILENO)) {
418 tempfd = _real_dup(STDIN_FILENO);
419 } else if (Util::isValidFd(STDOUT_FILENO)) {
420 tempfd = _real_dup(STDOUT_FILENO);
421 } else {
422 JASSERT("Controlling terminal and STDIN/OUT not found.");
423 }
424 }
425
426 JTRACE("Restoring parent CTTY for the process")
427 (controllingTty) (_fds[0]);
428
429 _ptsName = controllingTty;
430 SharedData::insertPtyNameMap(_virtPtsName.c_str(), _ptsName.c_str());
431 break;
432 }
433
434 case PTY_MASTER:
435 {
436 char pts_name[80];
437
438 tempfd = _real_open("/dev/ptmx", _flags | extraFlags);
439 JASSERT(tempfd >= 0) (tempfd) (JASSERT_ERRNO)
440 .Text("Error Opening /dev/ptmx");
441
442 JASSERT(grantpt(tempfd) >= 0) (tempfd) (JASSERT_ERRNO);
443 JASSERT(unlockpt(tempfd) >= 0) (tempfd) (JASSERT_ERRNO);
444 JASSERT(_real_ptsname_r(tempfd, pts_name, 80) == 0)
445 (tempfd) (JASSERT_ERRNO);
446
447 _ptsName = pts_name;
448 SharedData::insertPtyNameMap(_virtPtsName.c_str(), _ptsName.c_str());
449
450 if (_type == PTY_MASTER) {
451 int packetMode = _ptmxIsPacketMode;
452 ioctl(_fds[0], TIOCPKT, &packetMode); /* Restore old packet mode */
453 }
454
455 JTRACE("Restoring /dev/ptmx") (_fds[0]) (_ptsName) (_virtPtsName);
456 break;
457 }
458 case PTY_BSD_MASTER:
459 {
460 JTRACE("Restoring BSD Master Pty") (_masterName) (_fds[0]);
461 //string slaveDeviceName =
462 //_masterName.replace(0, strlen("/dev/pty"), "/dev/tty");
463
464 tempfd = _real_open(_masterName.c_str(), _flags | extraFlags);
465
466 // FIXME: If unable to open the original BSD Master Pty, we should try to
467 // open another one until we succeed and then open slave device
468 // accordingly.
469 // This can be done by creating a function openBSDMaster, which will try
470 // to open the original master device, but if unable to do so, it would
471 // keep on trying all the possible BSD Master devices until one is
472 // opened. It should then create a mapping between original Master/Slave
473 // device name and current Master/Slave device name.
474 JASSERT(tempfd >= 0) (tempfd) (JASSERT_ERRNO)
475 .Text("Error Opening BSD Master Pty.(Already in use?)");
476 break;
477 }
478 default:
479 {
480 // should never reach here
481 JASSERT(false) .Text("Should never reach here.");
482 }
483 }
484 Util::dupFds(tempfd, _fds);
485 }
486
487 void PtyConnection::serializeSubClass(jalib::JBinarySerializer& o)
488 {
489 JSERIALIZE_ASSERT_POINT("PtyConnection");
490 o & _ptsName & _virtPtsName & _masterName & _type;
491 o & _flags & _mode & _preExistingCTTY;
492 JTRACE("Serializing PtyConn.") (_ptsName) (_virtPtsName);
493 }
494
495 /*****************************************************************************
496 * File Connection
497 *****************************************************************************/
498
499 // Upper limit on filesize for files that are automatically chosen for ckpt.
500 // Default 100MB
501 #define MAX_FILESIZE_TO_AUTOCKPT (100 * 1024 * 1024)
502
503 void FileConnection::doLocking()
504 {
505 if (Util::strStartsWith(_path, "/proc/")) {
506 int index = 6;
507 char *rest;
508 pid_t proc_pid = strtol(&_path[index], &rest, 0);
509 if (proc_pid > 0 && *rest == '/') {
510 _type = FILE_PROCFS;
511 if (proc_pid != getpid()) {
512 return;
513 }
514 }
515 }
516 Connection::doLocking();
517 _ckpted_file = false;
518 }
519
520 void FileConnection::calculateRelativePath()
521 {
522 string cwd = jalib::Filesystem::GetCWD();
523 if (_path.compare(0, cwd.length(), cwd) == 0) {
524 /* CWD = "/A/B", FileName = "/A/B/C/D" ==> relPath = "C/D" */
525 _rel_path = _path.substr(cwd.length() + 1);
526 } else {
527 _rel_path = "*";
528 }
529 }
530
531 void FileConnection::drain()
532 {
533 struct stat statbuf;
534 JASSERT(_fds.size() > 0);
535
536 _ckpted_file = false;
537
538 // Read the current file descriptor offset
539 _offset = lseek(_fds[0], 0, SEEK_CUR);
540 fstat(_fds[0], &statbuf);
541 _st_dev = statbuf.st_dev;
542 _st_ino = statbuf.st_ino;
543 _st_size = statbuf.st_size;
544
545 if (_type == FILE_PROCFS) {
546 return;
547 }
548
549 if (statbuf.st_nlink == 0) {
550 _type = FILE_DELETED;
551 } else if (Util::strStartsWith(jalib::Filesystem::BaseName(_path), ".nfs")) {
552 // Files deleted on NFS have the .nfsXXXX format.
553 _type = FILE_DELETED;
554 } else {
555 // Update _path to reflect the current state. The file path might be a new
556 // one after restart and if the current process wasn't the leader, it never
557 // had a chance to update the _path. Update it now.
558 _path = jalib::Filesystem::GetDeviceName(_fds[0]);
559 }
560
561 calculateRelativePath();
562
563 // If this file is related to supported Resource Management system
564 // handle it specially
565 if (_type == FILE_BATCH_QUEUE &&
566 dmtcp_bq_should_ckpt_file &&
567 dmtcp_bq_should_ckpt_file(_path.c_str(), &_rmtype)) {
568 JTRACE("Pre-checkpoint Torque files") (_fds.size());
569 for (unsigned int i=0; i< _fds.size(); i++)
570 JTRACE("_fds[i]=") (i) (_fds[i]);
571 _ckpted_file = true;
572 return;
573 }
574
575 if (dmtcp_must_ckpt_file && dmtcp_must_ckpt_file(_path.c_str())) {
576 _ckpted_file = true;
577 return;
578 }
579
580 if (_type == FILE_DELETED && (_flags & O_WRONLY)) {
581 return;
582 }
583 if (_isBlacklistedFile(_path)) {
584 return;
585 }
586 if (dmtcp_should_ckpt_open_files() && statbuf.st_uid == getuid()) {
587 _ckpted_file = true;
588 } else if (_type == FILE_DELETED || _type == FILE_SHM) {
589 _ckpted_file = true;
590 } else if (_isVimApp() &&
591 (Util::strEndsWith(_path, ".swp") == 0 ||
592 Util::strEndsWith(_path, ".swo") == 0)) {
593 _ckpted_file = true;
594 } else if (Util::strStartsWith(jalib::Filesystem::GetProgramName(),
595 "emacs")) {
596 _ckpted_file = true;
597 #if 0
598 } else if ((_fcntlFlags &(O_WRONLY|O_RDWR)) != 0 &&
599 _offset < _st_size &&
600 _st_size < MAX_FILESIZE_TO_AUTOCKPT &&
601 statbuf.st_uid == getuid()) {
602 // FIXME: Disable the following heuristic until we can come up with
603 // a better one
604 _ckpted_file = true;
605 #endif
606 } else {
607 _ckpted_file = false;
608 }
609 }
610
611 void FileConnection::preCkpt()
612 {
613 if (_ckpted_file) {
614 ConnectionIdentifier id;
615 JASSERT(_type != FILE_PROCFS && _type != FILE_INVALID);
616 JASSERT(SharedData::getCkptLeaderForFile(_st_dev, _st_ino, &id));
617 if (id == _id) {
618 string savedFilePath = getSavedFilePath(_path);
619 JASSERT(Util::createDirectoryTree(savedFilePath)) (savedFilePath)
620 .Text("Unable to create directory in File Path");
621
622 int destFd = _real_open(savedFilePath.c_str(), O_CREAT | O_WRONLY | O_TRUNC,
623 S_IRWXU | S_IRWXG | S_IROTH | S_IXOTH);
624 JASSERT(destFd != -1) (JASSERT_ERRNO) (_path) (savedFilePath);
625
626 JTRACE("Saving checkpointed copy of the file") (_path) (savedFilePath);
627 if (_flags & O_WRONLY) {
628 // If the file is opened() in write-only mode. Open it in readonly mode
629 // to create the ckpt copy.
630 int tmpfd = _real_open(_path.c_str(), O_RDONLY, 0);
631 JASSERT(tmpfd != -1);
632 writeFileFromFd(tmpfd, destFd);
633 _real_close(tmpfd);
634 } else {
635 writeFileFromFd(_fds[0], destFd);
636 }
637 _real_close(destFd);
638 } else {
639 JTRACE("Not checkpointing this file") (_path);
640 _ckpted_file = false;
641 }
642 }
643 }
644
645 void FileConnection::refill(bool isRestart)
646 {
647 struct stat statbuf;
648 if (!isRestart) return;
649 if (strstr(_path.c_str(), "infiniband/uverbs") ||
650 strstr(_path.c_str(), "uverbs-event")) return;
651
652 if (_ckpted_file && _fileAlreadyExists) {
653 string savedFilePath = getSavedFilePath(_path);
654 int savedFd = _real_open(savedFilePath.c_str(), O_RDONLY, 0);
655 JASSERT(savedFd != -1) (JASSERT_ERRNO) (savedFilePath);
656
657 if (!areFilesEqual(_fds[0], savedFd, _st_size)) {
658 if (_type == FILE_SHM) {
659 JWARNING(false) (_path) (savedFilePath)
660 .Text("\n"
661 "***Mapping current version of file into memory;\n"
662 " _not_ file as it existed at time of checkpoint.\n"
663 " Change this function and re-compile, if you want "
664 "different behavior.");
665 } else {
666 const char *errMsg =
667 "\n**** File already exist! Checkpointed copy can't be restored.\n"
668 " The Contents of checkpointed copy differ from the "
669 "contents of the existing copy.\n"
670 "****Delete the existing file and try again!";
671 JASSERT(false) (_path) (savedFilePath) (errMsg);
672 }
673 }
674 _real_close(savedFd);
675 }
676
677 if (!_ckpted_file) {
678 int tempfd;
679 if (_type == FILE_DELETED && (_flags & (O_WRONLY | O_RDWR))) {
680 tempfd = _real_open(_path.c_str(), _fcntlFlags | O_CREAT, 0600);
681 JASSERT(tempfd != -1) (_path) (JASSERT_ERRNO) .Text("open() failed");
682 JASSERT(truncate(_path.c_str(), _st_size) == 0)
683 (_path.c_str()) (_st_size) (JASSERT_ERRNO);
684 } else {
685
686 JASSERT(jalib::Filesystem::FileExists(_path)) (_path)
687 .Text("File not found.");
688
689 if (stat(_path.c_str() ,&statbuf) == 0 && S_ISREG(statbuf.st_mode)) {
690 if (statbuf.st_size > _st_size &&
691 ((_fcntlFlags & O_WRONLY) || (_fcntlFlags & O_RDWR))) {
692 errno = 0;
693 JASSERT(truncate(_path.c_str(), _st_size) == 0)
694 (_path.c_str()) (_st_size) (JASSERT_ERRNO);
695 } else if (statbuf.st_size < _st_size) {
696 JWARNING(false) .Text("Size of file smaller than what we expected");
697 }
698 }
699 tempfd = openFile();
700 }
701 Util::dupFds(tempfd, _fds);
702 }
703
704 errno = 0;
705 if (jalib::Filesystem::FileExists(_path) &&
706 stat(_path.c_str() ,&statbuf) == 0 && S_ISREG(statbuf.st_mode)) {
707 if (_offset <= statbuf.st_size && _offset <= _st_size) {
708 JASSERT(lseek(_fds[0], _offset, SEEK_SET) == _offset)
709 (_path) (_offset) (JASSERT_ERRNO);
710 //JTRACE("lseek(_fds[0], _offset, SEEK_SET)") (_fds[0]) (_offset);
711 } else if (_offset > statbuf.st_size || _offset > _st_size) {
712 JWARNING(false) (_path) (_offset) (_st_size) (statbuf.st_size)
713 .Text("No lseek done: offset is larger than min of old and new size.");
714 }
715 }
716 refreshPath();
717 }
718
719 void FileConnection::resume(bool isRestart)
720 {
721 if (_ckpted_file && isRestart && _type == FILE_DELETED) {
722 /* Here we want to unlink the file. We want to do it only at the time of
723 * restart, but there is no way of finding out if we are restarting or not.
724 * That is why we look for the file on disk and if it is present(it was
725 * deleted at ckpt time), then we assume that we are restarting and hence
726 * we unlink the file.
727 */
728 if (jalib::Filesystem::FileExists(_path)) {
729 JWARNING(unlink(_path.c_str()) != -1) (_path)
730 .Text("The file was unlinked at the time of checkpoint. "
731 "Unlinking it after restart failed");
732 }
733 }
734 }
735
736 void FileConnection::refreshPath()
737 {
738 string cwd = jalib::Filesystem::GetCWD();
739
740 if (_type == FILE_BATCH_QUEUE) {
741 // get new file name
742 string newpath = jalib::Filesystem::GetDeviceName(_fds[0]);
743 JTRACE("This is Resource Manager file!") (_fds[0]) (newpath) (_path) (this);
744 if (newpath != _path) {
745 JTRACE("File Manager connection _path is changed => _path = newpath!")
746 (_path) (newpath);
747 _path = newpath;
748 }
749 return;
750 }
751
752 if (dmtcp_get_new_file_path) {
753 char newpath[PATH_MAX];
754 newpath[0] = '\0';
755 dmtcp_get_new_file_path(_path.c_str(), cwd.c_str(), newpath);
756 if (newpath[0] != '\0') {
757 JASSERT(jalib::Filesystem::FileExists(newpath)) (_path) (newpath)
758 .Text("Path returned by plugin does not exist.");
759 _path = newpath;
760 return;
761 }
762 }
763 if (_rel_path != "*" && !jalib::Filesystem::FileExists(_path)) {
764 // If file at absolute path doesn't exist and file path is relative to
765 // executable current dir
766 string oldPath = _path;
767 string fullPath = cwd + "/" + _rel_path;
768 if (jalib::Filesystem::FileExists(fullPath)) {
769 _path = fullPath;
770 JTRACE("Change _path based on relative path")
771 (oldPath) (_path) (_rel_path);
772 }
773 } else if (_type == FILE_PROCFS) {
774 int index = 6;
775 char *rest;
776 char buf[64];
777 pid_t proc_pid = strtol(&_path[index], &rest, 0);
778 if (proc_pid > 0 && *rest == '/') {
779 sprintf(buf, "/proc/%d/%s", getpid(), rest);
780 _path = buf;
781 }
782 }
783 }
784
785 void FileConnection::postRestart()
786 {
787 int tempfd;
788
789 JASSERT(_fds.size() > 0);
790 if (!_ckpted_file) return;
791 _fileAlreadyExists = false;
792
793 JTRACE("Restoring File Connection") (id()) (_path);
794 string savedFilePath = getSavedFilePath(_path);
795 JASSERT(jalib::Filesystem::FileExists(savedFilePath))
796 (savedFilePath) (_path) .Text("Unable to find checkpointed copy of file");
797
798 if (_type == FILE_BATCH_QUEUE) {
799 JASSERT(dmtcp_bq_restore_file);
800 tempfd = dmtcp_bq_restore_file(_path.c_str(), savedFilePath.c_str(),
801 _fcntlFlags, _rmtype);
802 JTRACE("Restore Resource Manager File") (_path);
803 } else {
804 refreshPath();
805 JASSERT(Util::createDirectoryTree(_path)) (_path)
806 .Text("Unable to create directory in File Path");
807 /* Now try to create the file with O_EXCL. If we fail with EEXIST, there
808 * are two possible scenarios:
809 * - The file was created by a different restarting process with data from
810 * checkpointed copy. It is possible that the data is in flight, so we
811 * should wait until the next barrier to compare the data from our copy.
812 * - The file existed before restart. After the next barrier, abort if the
813 * contents differ from our checkpointed copy.
814 */
815 int fd = _real_open(_path.c_str(), O_CREAT | O_EXCL | O_RDWR,
816 S_IRWXU | S_IRWXG | S_IROTH | S_IXOTH);
817 JASSERT(fd != -1 || errno == EEXIST) (_path) (JASSERT_ERRNO);
818
819 if (fd == -1) {
820 _fileAlreadyExists = true;
821 } else {
822 int srcFd = _real_open(savedFilePath.c_str(), O_RDONLY, 0);
823 JASSERT(srcFd != -1) (_path) (savedFilePath) (JASSERT_ERRNO)
824 .Text("Failed to open checkpointed copy of the file.");
825 JTRACE("Copying saved checkpointed file to original location")
826 (savedFilePath) (_path);
827 writeFileFromFd(srcFd, fd);
828 _real_close(srcFd);
829 _real_close(fd);
830 }
831 tempfd = openFile();
832 }
833 Util::dupFds(tempfd, _fds);
834 }
835
836 bool FileConnection::checkDup(int fd)
837 {
838 bool retVal = false;
839
840 int myfd = _fds[0];
841 if ( lseek(myfd, 0, SEEK_CUR) == lseek(fd, 0, SEEK_CUR) ) {
842 off_t newOffset = lseek (myfd, 1, SEEK_CUR);
843 JASSERT (newOffset != -1) (JASSERT_ERRNO) .Text("lseek failed");
844
845 if ( newOffset == lseek (fd, 0, SEEK_CUR) ) {
846 retVal = true;
847 }
848 // Now restore the old offset
849 JASSERT (-1 != lseek (myfd, -1, SEEK_CUR)) .Text("lseek failed");
850 }
851 return retVal;
852 }
853
854 int FileConnection::openFile()
855 {
856 JASSERT(jalib::Filesystem::FileExists(_path)) (_path)
857 .Text("File not present");
858
859 int fd = _real_open(_path.c_str(), _fcntlFlags);
860 JASSERT(fd != -1) (_path) (JASSERT_ERRNO) .Text("open() failed");
861
862 JTRACE("open(_path.c_str(), _fcntlFlags)") (fd) (_path.c_str()) (_fcntlFlags);
863 return fd;
864 }
865
866 static bool areFilesEqual(int fd, int savedFd, size_t size)
867 {
868 long page_size = sysconf(_SC_PAGESIZE);
869 const size_t bufSize = 1024 * page_size;
870 char *buf1 =(char*)JALLOC_HELPER_MALLOC(bufSize);
871 char *buf2 =(char*)JALLOC_HELPER_MALLOC(bufSize);
872
873 off_t offset1 = _real_lseek(fd, 0, SEEK_CUR);
874 off_t offset2 = _real_lseek(savedFd, 0, SEEK_CUR);
875 JASSERT(_real_lseek(fd, 0, SEEK_SET) == 0) (fd) (JASSERT_ERRNO);
876 JASSERT(_real_lseek(savedFd, 0, SEEK_SET) == 0) (savedFd) (JASSERT_ERRNO);
877
878 int readBytes;
879 while (size > 0) {
880 readBytes = Util::readAll(savedFd, buf1, MIN(bufSize, size));
881 JASSERT(readBytes != -1) (JASSERT_ERRNO) .Text("Read Failed");
882 if (readBytes == 0) break;
883 JASSERT(Util::readAll(fd, buf2, readBytes) == readBytes);
884 if (memcmp(buf1, buf2, readBytes) != 0) {
885 break;
886 }
887 size -= readBytes;
888 }
889 JALLOC_HELPER_FREE(buf1);
890 JALLOC_HELPER_FREE(buf2);
891 JASSERT(_real_lseek(fd, offset1, SEEK_SET) != -1);
892 JASSERT(_real_lseek(savedFd, offset2, SEEK_SET) != -1);
893 return size == 0;
894 }
895
896 static void writeFileFromFd(int fd, int destFd)
897 {
898 long page_size = sysconf(_SC_PAGESIZE);
899 const size_t bufSize = 1024 * page_size;
900 char *buf =(char*)JALLOC_HELPER_MALLOC(bufSize);
901
902 // Synchronize memory buffer with data in filesystem
903 // On some Linux kernels, the shared-memory test will fail without this.
904 fsync(fd);
905
906 off_t offset = _real_lseek(fd, 0, SEEK_CUR);
907 JASSERT(_real_lseek(fd, 0, SEEK_SET) == 0) (fd) (JASSERT_ERRNO);
908 JASSERT(_real_lseek(destFd, 0, SEEK_SET) == 0) (destFd) (JASSERT_ERRNO);
909
910 int readBytes, writtenBytes;
911 while (1) {
912 readBytes = Util::readAll(fd, buf, bufSize);
913 JASSERT(readBytes != -1) (JASSERT_ERRNO) .Text("Read Failed");
914 if (readBytes == 0) break;
915 writtenBytes = Util::writeAll(destFd, buf, readBytes);
916 JASSERT(writtenBytes != -1) (JASSERT_ERRNO) .Text("Write failed.");
917 }
918 JALLOC_HELPER_FREE(buf);
919 JASSERT(_real_lseek(fd, offset, SEEK_SET) != -1);
920 }
921
922 string FileConnection::getSavedFilePath(const string& path)
923 {
924 ostringstream os;
925 os << dmtcp_get_ckpt_files_subdir()
926 << "/" << jalib::Filesystem::BaseName(_path) << "_" << _id.conId();
927
928 return os.str();
929 }
930
931 void FileConnection::serializeSubClass(jalib::JBinarySerializer& o)
932 {
933 JSERIALIZE_ASSERT_POINT("FileConnection");
934 o & _path & _rel_path;
935 o & _offset & _st_dev & _st_ino & _st_size & _ckpted_file & _rmtype;
936 JTRACE("Serializing FileConn.") (_path) (_rel_path)
937 (dmtcp_get_ckpt_files_subdir()) (_ckpted_file) (_fcntlFlags);
938 }
939
940 /*****************************************************************************
941 * FIFO Connection
942 *****************************************************************************/
943
944 void FifoConnection::drain()
945 {
946 struct stat st;
947 JASSERT(_fds.size() > 0);
948
949 stat(_path.c_str(),&st);
950 JTRACE("Checkpoint fifo.") (_fds[0]);
951 _mode = st.st_mode;
952
953 int new_flags =(_fcntlFlags & (~(O_RDONLY|O_WRONLY))) | O_RDWR | O_NONBLOCK;
954 ckptfd = _real_open(_path.c_str(),new_flags);
955 JASSERT(ckptfd >= 0) (ckptfd) (JASSERT_ERRNO);
956
957 _in_data.clear();
958 size_t bufsize = 256;
959 char buf[bufsize];
960 int size;
961
962 while (1) { // flush fifo
963 size = read(ckptfd,buf,bufsize);
964 if (size < 0) {
965 break; // nothing to flush
966 }
967 for (int i=0;i<size;i++) {
968 _in_data.push_back(buf[i]);
969 }
970 }
971 close(ckptfd);
972 JTRACE("Checkpointing fifo: end.") (_fds[0]) (_in_data.size());
973 }
974
975 void FifoConnection::refill(bool isRestart)
976 {
977 int new_flags =(_fcntlFlags &(~(O_RDONLY|O_WRONLY))) | O_RDWR | O_NONBLOCK;
978 ckptfd = _real_open(_path.c_str(),new_flags);
979 JASSERT(ckptfd >= 0) (ckptfd) (JASSERT_ERRNO);
980
981 size_t bufsize = 256;
982 char buf[bufsize];
983 size_t j;
984 ssize_t ret;
985 for (size_t i=0;i<(_in_data.size()/bufsize);i++) { // refill fifo
986 for (j=0; j<bufsize; j++) {
987 buf[j] = _in_data[j+i*bufsize];
988 }
989 ret = Util::writeAll(ckptfd,buf,j);
990 JASSERT(ret ==(ssize_t)j) (JASSERT_ERRNO) (ret) (j) (_fds[0]) (i);
991 }
992 int start =(_in_data.size()/bufsize)*bufsize;
993 for (j=0; j<_in_data.size()%bufsize; j++) {
994 buf[j] = _in_data[start+j];
995 }
996 errno=0;
997 buf[j] ='\0';
998 JTRACE("Buf internals.") ((const char*)buf);
999 ret = Util::writeAll(ckptfd,buf,j);
1000 JASSERT(ret ==(ssize_t)j) (JASSERT_ERRNO) (ret) (j) (_fds[0]);
1001
1002 close(ckptfd);
1003 // unlock fifo
1004 flock(_fds[0],LOCK_UN);
1005 JTRACE("End checkpointing fifo.") (_fds[0]);
1006 }
1007
1008 void FifoConnection::refreshPath()
1009 {
1010 string cwd = jalib::Filesystem::GetCWD();
1011 if (_rel_path != "*") { // file path is relative to executable current dir
1012 string oldPath = _path;
1013 ostringstream fullPath;
1014 fullPath << cwd << "/" << _rel_path;
1015 if (jalib::Filesystem::FileExists(fullPath.str())) {
1016 _path = fullPath.str();
1017 JTRACE("Change _path based on relative path") (oldPath) (_path);
1018 }
1019 }
1020 }
1021
1022 void FifoConnection::postRestart()
1023 {
1024 JASSERT(_fds.size() > 0);
1025 JTRACE("Restoring Fifo Connection") (id()) (_path);
1026 refreshPath();
1027 int tempfd = openFile();
1028 Util::dupFds(tempfd, _fds);
1029 refreshPath();
1030 }
1031
1032 int FifoConnection::openFile()
1033 {
1034 int fd;
1035
1036 if (!jalib::Filesystem::FileExists(_path)) {
1037 JTRACE("Fifo file not present, creating new one") (_path);
1038 jalib::string dir = jalib::Filesystem::DirName(_path);
1039 JTRACE("fifo dir:")(dir);
1040 jalib::Filesystem::mkdir_r(dir, 0755);
1041 mkfifo(_path.c_str(), _mode);
1042 JTRACE("mkfifo") (_path.c_str()) (errno);
1043 }
1044
1045 fd = _real_open(_path.c_str(), O_RDWR | O_NONBLOCK);
1046 JTRACE("Is opened") (_path.c_str()) (fd);
1047
1048 JASSERT(fd != -1) (_path) (JASSERT_ERRNO);
1049 return fd;
1050 }
1051
1052 void FifoConnection::serializeSubClass(jalib::JBinarySerializer& o)
1053 {
1054 JSERIALIZE_ASSERT_POINT("FifoConnection");
1055 o & _path & _rel_path & _savedRelativePath & _mode & _in_data;
1056 JTRACE("Serializing FifoConn.") (_path) (_rel_path) (_savedRelativePath);
1057 }
1058
1059 /*****************************************************************************
1060 * Stdio Connection
1061 *****************************************************************************/
1062
1063 void StdioConnection::postRestart()
1064 {
1065 for (size_t i=0; i<_fds.size(); ++i) {
1066 int fd = _fds[i];
1067 if (fd <= 2) {
1068 JTRACE("Skipping restore of STDIO, just inherit from parent") (fd);
1069 continue;
1070 }
1071 int oldFd = -1;
1072 switch (_type) {
1073 case STDIO_IN:
1074 JTRACE("Restoring STDIN") (fd);
1075 oldFd=0;
1076 break;
1077 case STDIO_OUT:
1078 JTRACE("Restoring STDOUT") (fd);
1079 oldFd=1;
1080 break;
1081 case STDIO_ERR:
1082 JTRACE("Restoring STDERR") (fd);
1083 oldFd=2;
1084 break;
1085 default:
1086 JASSERT(false);
1087 }
1088 errno = 0;
1089 JWARNING(_real_dup2(oldFd, fd) == fd) (oldFd) (fd) (JASSERT_ERRNO);
1090 }
1091 }
1092
1093 /*****************************************************************************
1094 * POSIX Message Queue Connection
1095 *****************************************************************************/
1096
1097 void PosixMQConnection::on_mq_close()
1098 {
1099 }
1100
1101 void PosixMQConnection::on_mq_notify(const struct sigevent *sevp)
1102 {
1103 if (sevp == NULL && _notifyReg) {
1104 _notifyReg = false;
1105 } else {
1106 _notifyReg = true;
1107 _sevp = *sevp;
1108 }
1109 }
1110
1111 void PosixMQConnection::drain()
1112 {
1113 JASSERT(_fds.size() > 0);
1114
1115 JTRACE("Checkpoint Posix Message Queue.") (_fds[0]);
1116
1117 struct stat statbuf;
1118 JASSERT(fstat(_fds[0], &statbuf) != -1) (JASSERT_ERRNO);
1119 if (_mode == 0) {
1120 _mode = statbuf.st_mode;
1121 }
1122
1123 struct mq_attr attr;
1124 JASSERT(mq_getattr(_fds[0], &attr) != -1) (JASSERT_ERRNO);
1125 _attr = attr;
1126 if (attr.mq_curmsgs < 0) {
1127 return;
1128 }
1129
1130 int fd = _real_mq_open(_name.c_str(), O_RDWR, 0, NULL);
1131 JASSERT(fd != -1) (_name) (JASSERT_ERRNO);
1132
1133 _qnum = attr.mq_curmsgs;
1134 char *buf =(char*) JALLOC_HELPER_MALLOC(attr.mq_msgsize);
1135 for (long i = 0; i < _qnum; i++) {
1136 unsigned prio;
1137 ssize_t numBytes = _real_mq_receive(_fds[0], buf, attr.mq_msgsize, &prio);
1138 JASSERT(numBytes != -1) (JASSERT_ERRNO);
1139 _msgInQueue.push_back(jalib::JBuffer((const char*)buf, numBytes));
1140 _msgInQueuePrio.push_back(prio);
1141 }
1142 JALLOC_HELPER_FREE(buf);
1143 _real_mq_close(fd);
1144 }
1145
1146 void PosixMQConnection::refill(bool isRestart)
1147 {
1148 for (long i = 0; i < _qnum; i++) {
1149 JASSERT(_real_mq_send(_fds[0], _msgInQueue[i].buffer(),
1150 _msgInQueue[i].size(), _msgInQueuePrio[i]) != -1);
1151 }
1152 _msgInQueue.clear();
1153 _msgInQueuePrio.clear();
1154 }
1155
1156 void PosixMQConnection::postRestart()
1157 {
1158 JASSERT(_fds.size() > 0);
1159
1160 errno = 0;
1161 if (_oflag & O_EXCL) {
1162 mq_unlink(_name.c_str());
1163 }
1164
1165 int tempfd = _real_mq_open(_name.c_str(), _oflag, _mode, &_attr);
1166 JASSERT(tempfd != -1) (JASSERT_ERRNO);
1167 Util::dupFds(tempfd, _fds);
1168 }
1169
1170 void PosixMQConnection::serializeSubClass(jalib::JBinarySerializer& o)
1171 {
1172 JSERIALIZE_ASSERT_POINT("PosixMQConnection");
1173 o & _name & _oflag & _mode & _attr;
1174 }