root/plugin/ipc/file/fileconnection.cpp

/* [<][>][^][v][top][bottom][index][help] */

DEFINITIONS

This source file includes following definitions.
  1. _isVimApp
  2. _isBlacklistedFile
  3. ptmxTestPacketMode
  4. readyToRead
  5. readOnePacket
  6. ptmxReadAll
  7. writeOnePacket
  8. ptmxWriteAll
  9. _preExistingCTTY
  10. drain
  11. preRefill
  12. refill
  13. postRestart
  14. serializeSubClass
  15. doLocking
  16. calculateRelativePath
  17. drain
  18. preCkpt
  19. refill
  20. resume
  21. refreshPath
  22. postRestart
  23. checkDup
  24. openFile
  25. areFilesEqual
  26. writeFileFromFd
  27. getSavedFilePath
  28. serializeSubClass
  29. drain
  30. refill
  31. refreshPath
  32. postRestart
  33. openFile
  34. serializeSubClass
  35. postRestart
  36. on_mq_close
  37. on_mq_notify
  38. drain
  39. refill
  40. postRestart
  41. serializeSubClass

   1 /****************************************************************************
   2  *   Copyright (C) 2006-2010 by Jason Ansel, Kapil Arya, and Gene Cooperman *
   3  *   jansel@csail.mit.edu, kapil@ccs.neu.edu, gene@ccs.neu.edu              *
   4  *                                                                          *
   5  *   This file is part of the dmtcp/src module of DMTCP (DMTCP:dmtcp/src).  *
   6  *                                                                          *
   7  *  DMTCP:dmtcp/src is free software: you can redistribute it and/or        *
   8  *  modify it under the terms of the GNU Lesser General Public License as   *
   9  *  published by the Free Software Foundation, either version 3 of the      *
  10  *  License, or (at your option) any later version.                         *
  11  *                                                                          *
  12  *  DMTCP:dmtcp/src is distributed in the hope that it will be useful,      *
  13  *  but WITHOUT ANY WARRANTY; without even the implied warranty of          *
  14  *  MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the           *
  15  *  GNU Lesser General Public License for more details.                     *
  16  *                                                                          *
  17  *  You should have received a copy of the GNU Lesser General Public        *
  18  *  License along with DMTCP:dmtcp/src.  If not, see                        *
  19  *  <http://www.gnu.org/licenses/>.                                         *
  20  ****************************************************************************/
  21 
  22 #include <sys/ioctl.h>
  23 #include <sys/select.h>
  24 #include <sys/un.h>
  25 #include <unistd.h>
  26 #include <fcntl.h>
  27 #include <sys/file.h>
  28 #include <termios.h>
  29 #include <iostream>
  30 #include <ios>
  31 #include <fstream>
  32 #include <linux/limits.h>
  33 #include <arpa/inet.h>
  34 
  35 #include "dmtcp.h"
  36 #include "shareddata.h"
  37 #include "util.h"
  38 #include "jsocket.h"
  39 #include "jassert.h"
  40 #include "jfilesystem.h"
  41 #include "jconvert.h"
  42 
  43 #include "fileconnection.h"
  44 #include "filewrappers.h"
  45 
  46 using namespace dmtcp;
  47 
  48 static bool ptmxTestPacketMode(int masterFd);
  49 static ssize_t ptmxReadAll(int fd, const void *origBuf, size_t maxCount);
  50 static ssize_t ptmxWriteAll(int fd, const void *buf, bool isPacketMode);
  51 static void writeFileFromFd(int fd, int destFd);
  52 static bool areFilesEqual(int fd, int destFd, size_t size);
  53 
  54 static bool _isVimApp()
  55 {
  56   static int isVimApp = -1;
  57 
  58   if (isVimApp == -1) {
  59     string progName = jalib::Filesystem::GetProgramName();
  60 
  61     if (progName == "vi" || progName == "vim" || progName == "vim-normal" ||
  62         progName == "vim.basic"  || progName == "vim.tiny" ||
  63         progName == "vim.gtk" || progName == "vim.gnome") {
  64       isVimApp = 1;
  65     } else {
  66       isVimApp = 0;
  67     }
  68   }
  69   return isVimApp;
  70 }
  71 
  72 static bool _isBlacklistedFile(string& path)
  73 {
  74   if ((Util::strStartsWith(path, "/dev/") &&
  75        !Util::strStartsWith(path, "/dev/shm/")) ||
  76       Util::strStartsWith(path, "/proc/") ||
  77       Util::strStartsWith(path, dmtcp_get_tmpdir())) {
  78     return true;
  79   }
  80   return false;
  81 }
  82 
  83 /*****************************************************************************
  84  * Pseudo-TTY Connection
  85  *****************************************************************************/
  86 
  87 static bool ptmxTestPacketMode(int masterFd)
  88 {
  89   char tmp_buf[100];
  90   int slave_fd, ioctlArg, rc;
  91   fd_set read_fds;
  92   struct timeval zeroTimeout = {0, 0}; /* Zero: will use to poll, not wait.*/
  93 
  94   _real_ptsname_r(masterFd, tmp_buf, 100);
  95   /* permissions not used, but _real_open requires third arg */
  96   slave_fd = _real_open(tmp_buf, O_RDWR, 0666);
  97 
  98   /* A. Drain master before testing.
  99      Ideally, DMTCP has already drained it and preserved any information
 100      about exceptional conditions in command byte, but maybe we accidentally
 101      caused a new command byte in packet mode. */
 102   /* Note:  if terminal was in packet mode, and the next read would be
 103      a non-data command byte, then there's no easy way for now to detect and
 104      restore this. ?? */
 105   /* Note:  if there was no data to flush, there might be no command byte,
 106      even in packet mode. */
 107   tcflush(slave_fd, TCIOFLUSH);
 108   /* If character already transmitted(usual case for pty), then this flush
 109      will tell master to flush it. */
 110   tcflush(masterFd, TCIFLUSH);
 111 
 112   /* B. Now verify that read_fds has no more characters to read. */
 113   ioctlArg = 1;
 114   ioctl(masterFd, TIOCINQ, &ioctlArg);
 115   /* Now check if there's a command byte still to read. */
 116   FD_ZERO(&read_fds);
 117   FD_SET(masterFd, &read_fds);
 118   select(masterFd + 1, &read_fds, NULL, NULL, &zeroTimeout);
 119   if (FD_ISSET(masterFd, &read_fds)) {
 120     // Clean up someone else's command byte from packet mode.
 121     // FIXME:  We should restore this on resume/restart.
 122     rc = read(masterFd, tmp_buf, 100);
 123     JASSERT(rc == 1) (rc) (masterFd);
 124   }
 125 
 126   /* C. Now we're ready to do the real test.  If in packet mode, we should
 127      see command byte of TIOCPKT_DATA(0) with data. */
 128   tmp_buf[0] = 'x'; /* Don't set '\n'.  Could be converted to "\r\n". */
 129   /* Give the masterFd something to read. */
 130   JWARNING((rc = write(slave_fd, tmp_buf, 1)) == 1) (rc) .Text("write failed");
 131   //tcdrain(slave_fd);
 132   _real_close(slave_fd);
 133 
 134   /* Read the 'x':  If we also see a command byte, it's packet mode */
 135   rc = read(masterFd, tmp_buf, 100);
 136 
 137   /* D. Check if command byte packet exists, and chars rec'd is longer by 1. */
 138   return(rc == 2 && tmp_buf[0] == TIOCPKT_DATA && tmp_buf[1] == 'x');
 139 }
 140 
 141 // Also record the count read on each iteration, in case it's packet mode
 142 static bool readyToRead(int fd)
 143 {
 144   fd_set read_fds;
 145   struct timeval zeroTimeout = {0, 0}; /* Zero: will use to poll, not wait.*/
 146   FD_ZERO(&read_fds);
 147   FD_SET(fd, &read_fds);
 148   select(fd + 1, &read_fds, NULL, NULL, &zeroTimeout);
 149   return FD_ISSET(fd, &read_fds);
 150 }
 151 
 152 // returns 0 if not ready to read; else returns -1, or size read incl. header
 153 static ssize_t readOnePacket(int fd, const void *buf, size_t maxCount)
 154 {
 155   typedef int hdr;
 156   ssize_t rc = 0;
 157   // Read single packet:  rc > 0 will be true for at most one iteration.
 158   while (readyToRead(fd) && rc <= 0) {
 159     rc = read(fd,(char *)buf+sizeof(hdr), maxCount-sizeof(hdr));
 160     *(hdr *)buf = rc; // Record the number read in header
 161     if (rc >=(ssize_t) (maxCount-sizeof(hdr))) {
 162       rc = -1; errno = E2BIG; // Invoke new errno for buf size not large enough
 163     }
 164     if (rc == -1 && errno != EAGAIN && errno != EINTR)
 165       break;  /* Give up; bad error */
 166   }
 167   return(rc <= 0 ? rc : rc+sizeof(hdr));
 168 }
 169 
 170 // rc < 0 => error; rc == sizeof(hdr) => no data to read;
 171 // rc > 0 => saved w/ count hdr
 172 static ssize_t ptmxReadAll(int fd, const void *origBuf, size_t maxCount)
 173 {
 174   typedef int hdr;
 175   char *buf =(char *)origBuf;
 176   int rc;
 177   while ((rc = readOnePacket(fd, buf, maxCount)) > 0) {
 178     buf += rc;
 179   }
 180   *(hdr *)buf = 0; /* Header count of zero means we're done */
 181   buf += sizeof(hdr);
 182   JASSERT(rc < 0 || buf -(char *)origBuf > 0) (rc) (origBuf) ((void *)buf);
 183   return(rc < 0 ? rc : buf -(char *)origBuf);
 184 }
 185 
 186 // The hdr contains the size of the full buffer([hdr, data]).
 187 // Return size of origBuf written:  includes packets of form:  [hdr, data]
 188 //   with hdr holding size of data.  Last hdr has value zero.
 189 // Also record the count written on each iteration, in case it's packet mode.
 190 static ssize_t writeOnePacket(int fd, const void *origBuf, bool isPacketMode)
 191 {
 192   typedef int hdr;
 193   int count = *(hdr *)origBuf;
 194   int cum_count = 0;
 195   int rc = 0; // Trigger JASSERT if not modified below.
 196   if (count == 0)
 197     return sizeof(hdr);  // count of zero means we're done, hdr consumed
 198   // FIXME:  It would be nice to restore packet mode(flow control, etc.)
 199   //         For now, we ignore it.
 200   if (count == 1 && isPacketMode)
 201     return sizeof(hdr) + 1;
 202   while (cum_count < count) {
 203     rc = write(fd,(char *)origBuf+sizeof(hdr)+cum_count, count-cum_count);
 204     if (rc == -1 && errno != EAGAIN && errno != EINTR)
 205       break;  /* Give up; bad error */
 206     if (rc >= 0)
 207       cum_count += rc;
 208   }
 209   JASSERT(rc != 0 && cum_count == count)
 210     (JASSERT_ERRNO) (rc) (count) (cum_count);
 211   return(rc < 0 ? rc : cum_count+sizeof(hdr));
 212 }
 213 
 214 static ssize_t ptmxWriteAll(int fd, const void *buf, bool isPacketMode)
 215 {
 216   typedef int hdr;
 217   ssize_t cum_count = 0;
 218   ssize_t rc;
 219   while ((rc = writeOnePacket(fd,(char *)buf+cum_count, isPacketMode))
 220          >(ssize_t)sizeof(hdr)) {
 221     cum_count += rc;
 222   }
 223   JASSERT(rc < 0 || rc == sizeof(hdr)) (rc) (cum_count);
 224   cum_count += sizeof(hdr);  /* Account for last packet: 'done' hdr w/ 0 data */
 225   return(rc <= 0 ? rc : cum_count);
 226 }
 227 
 228 PtyConnection::PtyConnection(int fd, const char *path,
 229                                     int flags, mode_t mode, int type)
 230   : Connection (PTY)
 231   , _flags(flags)
 232   , _mode(mode)
 233   , _preExistingCTTY(false)
 234 {
 235   char buf[PTS_PATH_MAX];
 236   _type = type;
 237   switch (_type) {
 238 
 239     case PTY_DEV_TTY:
 240       _ptsName = path;
 241       break;
 242 
 243     case PTY_PARENT_CTTY:
 244     case PTY_CTTY:
 245       _ptsName = path;
 246       SharedData::getVirtPtyName(path, buf, sizeof(buf));
 247       if (strlen(buf) == 0) {
 248         SharedData::createVirtualPtyName(path, buf, sizeof(buf));
 249       }
 250       _virtPtsName = buf;
 251       JTRACE("creating CTTY connection") (_ptsName) (_virtPtsName);
 252 
 253       break;
 254 
 255     case PTY_MASTER:
 256       _masterName = path;
 257       JASSERT(_real_ptsname_r(fd, buf, sizeof(buf)) == 0) (JASSERT_ERRNO);
 258       _ptsName = buf;
 259 
 260       // glibc allows only 20 char long buf
 261       // Check if there is enough room to insert the string "dmtcp_" before the
 262       //   terminal number, if not then we ASSERT here.
 263       JASSERT((strlen(buf) + strlen("v")) <= 20)
 264         .Text("string /dev/pts/<n> too long, can not be virtualized."
 265               "Once possible workarong here is to replace the string"
 266               "\"dmtcp_\" with something short like \"d_\" or even "
 267               "\"d\" and recompile DMTCP");
 268 
 269       // Generate new Unique buf
 270       SharedData::createVirtualPtyName(_ptsName.c_str(), buf, sizeof(buf));
 271       _virtPtsName = buf;
 272       JTRACE("creating ptmx connection") (_ptsName) (_virtPtsName);
 273       break;
 274 
 275     case PTY_SLAVE:
 276       _ptsName = path;
 277       SharedData::getVirtPtyName(path, buf, sizeof(buf));
 278       _virtPtsName = buf;
 279       JASSERT(strlen(buf) != 0) (path);
 280       JTRACE("creating pts connection") (_ptsName) (_virtPtsName);
 281       break;
 282 
 283     case PTY_BSD_MASTER:
 284       _masterName = path;
 285       break;
 286 
 287     case PTY_BSD_SLAVE:
 288       _ptsName = path;
 289       break;
 290 
 291     default:
 292       break;
 293   }
 294 }
 295 
 296 void PtyConnection::drain()
 297 {
 298   if (_type == PTY_MASTER) {
 299     const int maxCount = 10000;
 300     char buf[maxCount];
 301     int numRead, numWritten;
 302     // _fds[0] is master fd
 303     numRead = ptmxReadAll(_fds[0], buf, maxCount);
 304     _ptmxIsPacketMode = ptmxTestPacketMode(_fds[0]);
 305     JTRACE("_fds[0] is master(/dev/ptmx)") (_fds[0]) (_ptmxIsPacketMode);
 306     numWritten = ptmxWriteAll(_fds[0], buf, _ptmxIsPacketMode);
 307     JASSERT(numRead == numWritten) (numRead) (numWritten);
 308   }
 309   JASSERT((_type == PTY_CTTY || _type == PTY_PARENT_CTTY) || _flags != -1);
 310   if (tcgetpgrp(_fds[0]) != -1) {
 311     _isControllingTTY = true;
 312   } else {
 313     _isControllingTTY = false;
 314   }
 315 }
 316 
 317 void PtyConnection::preRefill(bool isRestart)
 318 {
 319   if (!isRestart) {
 320     return;
 321   }
 322 
 323   if (_type == PTY_SLAVE || _type == PTY_BSD_SLAVE) {
 324     JASSERT(_ptsName.compare("?") != 0);
 325     JTRACE("Restoring PTY slave") (_fds[0]) (_ptsName) (_virtPtsName);
 326     if (_type == PTY_SLAVE) {
 327       char buf[32];
 328       SharedData::getRealPtyName(_virtPtsName.c_str(), buf, sizeof(buf));
 329       JASSERT(strlen(buf) > 0) (_virtPtsName) (_ptsName);
 330       _ptsName = buf;
 331     }
 332 
 333     /* If the process has called setsid(), and has no controlling terminal, a
 334      * call to open() will set this terminal as its controlling. To avoid this,
 335      * we pass O_NOCTTY flag if this terminal was not the controlling terminal
 336      * during checkpoint phase.
 337      */
 338     int extraFlags = 0;//_isControllingTTY ? 0 : O_NOCTTY;
 339     int tempfd = _real_open(_ptsName.c_str(), _flags | extraFlags);
 340     JASSERT(tempfd >= 0) (_virtPtsName) (_ptsName) (JASSERT_ERRNO)
 341       .Text("Error Opening PTS");
 342 
 343     JTRACE("Restoring PTS real") (_ptsName) (_virtPtsName) (_fds[0]);
 344     Util::dupFds(tempfd, _fds);
 345   }
 346 }
 347 
 348 void PtyConnection::refill(bool isRestart)
 349 {
 350   if (!isRestart) {
 351     return;
 352   }
 353   if (_type ==  PTY_DEV_TTY) {
 354     /* If a process has called setsid(), it is possible that there is no
 355      * controlling terminal while in the postRestart phase as processes are
 356      * still creating the pseudo-tty master-slave pairs. By the time we get
 357      * into the refill mode, we should have all the pseudo-ttys present.
 358      */
 359     int tempfd = _real_open("/dev/tty", O_RDWR, 0);
 360     JASSERT(tempfd >= 0) (tempfd) (JASSERT_ERRNO)
 361       .Text("Error opening controlling terminal /dev/tty");
 362 
 363     JTRACE("Restoring /dev/tty for the process") (_fds[0]);
 364     _ptsName = _virtPtsName = "/dev/tty";
 365     Util::dupFds(tempfd, _fds);
 366   }
 367 }
 368 
 369 void PtyConnection::postRestart()
 370 {
 371   JASSERT(_fds.size() > 0);
 372   if (_type == PTY_SLAVE || _type == PTY_BSD_SLAVE || _type == PTY_DEV_TTY) {
 373     return;
 374   }
 375 
 376   int tempfd = -1;
 377   int extraFlags = _isControllingTTY ? 0 : O_NOCTTY;
 378 
 379   switch (_type) {
 380     case PTY_INVALID:
 381       //tempfd = _real_open("/dev/null", O_RDWR);
 382       JTRACE("Restoring invalid PTY.") (id());
 383       return;
 384 
 385     case PTY_CTTY:
 386     case PTY_PARENT_CTTY:
 387       {
 388         string controllingTty;
 389         string stdinDeviceName;
 390         if (_type == PTY_CTTY) {
 391           controllingTty = jalib::Filesystem::GetControllingTerm();
 392         } else {
 393           controllingTty = jalib::Filesystem::GetControllingTerm(getppid());
 394         }
 395         stdinDeviceName = (jalib::Filesystem::GetDeviceName(STDIN_FILENO));
 396         if (controllingTty.length() > 0 &&
 397             _real_access(controllingTty.c_str(), R_OK | W_OK) == 0) {
 398           tempfd = _real_open(controllingTty.c_str(), _fcntlFlags);
 399           JASSERT(tempfd >= 0) (tempfd) (controllingTty) (JASSERT_ERRNO)
 400             .Text("Error Opening the terminal attached with the process");
 401         } else {
 402           if (_type == PTY_CTTY) {
 403             JTRACE("Unable to restore controlling terminal attached with the "
 404                    "parent process.\n"
 405                    "Replacing it with current STDIN")
 406               (stdinDeviceName);
 407           } else {
 408             JWARNING(false) (stdinDeviceName)
 409               .Text("Unable to restore controlling terminal attached with the "
 410                     "parent process.\n"
 411                     "Replacing it with current STDIN");
 412           }
 413           JWARNING(Util::strStartsWith(stdinDeviceName, "/dev/pts/") ||
 414                    stdinDeviceName == "/dev/tty") (stdinDeviceName)
 415             .Text("Controlling terminal not bound to a terminal device.");
 416 
 417           if (Util::isValidFd(STDIN_FILENO)) {
 418             tempfd = _real_dup(STDIN_FILENO);
 419           } else if (Util::isValidFd(STDOUT_FILENO)) {
 420             tempfd = _real_dup(STDOUT_FILENO);
 421           } else {
 422             JASSERT("Controlling terminal and STDIN/OUT not found.");
 423           }
 424         }
 425 
 426         JTRACE("Restoring parent CTTY for the process")
 427           (controllingTty) (_fds[0]);
 428 
 429         _ptsName = controllingTty;
 430         SharedData::insertPtyNameMap(_virtPtsName.c_str(), _ptsName.c_str());
 431         break;
 432       }
 433 
 434     case PTY_MASTER:
 435       {
 436         char pts_name[80];
 437 
 438         tempfd = _real_open("/dev/ptmx", _flags | extraFlags);
 439         JASSERT(tempfd >= 0) (tempfd) (JASSERT_ERRNO)
 440           .Text("Error Opening /dev/ptmx");
 441 
 442         JASSERT(grantpt(tempfd) >= 0) (tempfd) (JASSERT_ERRNO);
 443         JASSERT(unlockpt(tempfd) >= 0) (tempfd) (JASSERT_ERRNO);
 444         JASSERT(_real_ptsname_r(tempfd, pts_name, 80) == 0)
 445           (tempfd) (JASSERT_ERRNO);
 446 
 447         _ptsName = pts_name;
 448         SharedData::insertPtyNameMap(_virtPtsName.c_str(), _ptsName.c_str());
 449 
 450         if (_type == PTY_MASTER) {
 451           int packetMode = _ptmxIsPacketMode;
 452           ioctl(_fds[0], TIOCPKT, &packetMode); /* Restore old packet mode */
 453         }
 454 
 455         JTRACE("Restoring /dev/ptmx") (_fds[0]) (_ptsName) (_virtPtsName);
 456         break;
 457       }
 458     case PTY_BSD_MASTER:
 459       {
 460         JTRACE("Restoring BSD Master Pty") (_masterName) (_fds[0]);
 461         //string slaveDeviceName =
 462           //_masterName.replace(0, strlen("/dev/pty"), "/dev/tty");
 463 
 464         tempfd = _real_open(_masterName.c_str(), _flags | extraFlags);
 465 
 466         // FIXME: If unable to open the original BSD Master Pty, we should try to
 467         // open another one until we succeed and then open slave device
 468         // accordingly.
 469         // This can be done by creating a function openBSDMaster, which will try
 470         // to open the original master device, but if unable to do so, it would
 471         // keep on trying all the possible BSD Master devices until one is
 472         // opened. It should then create a mapping between original Master/Slave
 473         // device name and current Master/Slave device name.
 474         JASSERT(tempfd >= 0) (tempfd) (JASSERT_ERRNO)
 475           .Text("Error Opening BSD Master Pty.(Already in use?)");
 476         break;
 477       }
 478     default:
 479       {
 480         // should never reach here
 481         JASSERT(false) .Text("Should never reach here.");
 482       }
 483   }
 484   Util::dupFds(tempfd, _fds);
 485 }
 486 
 487 void PtyConnection::serializeSubClass(jalib::JBinarySerializer& o)
 488 {
 489   JSERIALIZE_ASSERT_POINT("PtyConnection");
 490   o & _ptsName & _virtPtsName & _masterName & _type;
 491   o & _flags & _mode & _preExistingCTTY;
 492   JTRACE("Serializing PtyConn.") (_ptsName) (_virtPtsName);
 493 }
 494 
 495 /*****************************************************************************
 496  * File Connection
 497  *****************************************************************************/
 498 
 499 // Upper limit on filesize for files that are automatically chosen for ckpt.
 500 // Default 100MB
 501 #define MAX_FILESIZE_TO_AUTOCKPT (100 * 1024 * 1024)
 502 
 503 void FileConnection::doLocking()
 504 {
 505   if (Util::strStartsWith(_path, "/proc/")) {
 506     int index = 6;
 507     char *rest;
 508     pid_t proc_pid = strtol(&_path[index], &rest, 0);
 509     if (proc_pid > 0 && *rest == '/') {
 510       _type = FILE_PROCFS;
 511       if (proc_pid != getpid()) {
 512         return;
 513       }
 514     }
 515   }
 516   Connection::doLocking();
 517   _ckpted_file = false;
 518 }
 519 
 520 void FileConnection::calculateRelativePath()
 521 {
 522   string cwd = jalib::Filesystem::GetCWD();
 523   if (_path.compare(0, cwd.length(), cwd) == 0) {
 524     /* CWD = "/A/B", FileName = "/A/B/C/D" ==> relPath = "C/D" */
 525     _rel_path = _path.substr(cwd.length() + 1);
 526   } else {
 527     _rel_path = "*";
 528   }
 529 }
 530 
 531 void FileConnection::drain()
 532 {
 533   struct stat statbuf;
 534   JASSERT(_fds.size() > 0);
 535 
 536   _ckpted_file = false;
 537 
 538   // Read the current file descriptor offset
 539   _offset = lseek(_fds[0], 0, SEEK_CUR);
 540   fstat(_fds[0], &statbuf);
 541   _st_dev = statbuf.st_dev;
 542   _st_ino = statbuf.st_ino;
 543   _st_size = statbuf.st_size;
 544 
 545   if (_type == FILE_PROCFS) {
 546     return;
 547   }
 548 
 549   if (statbuf.st_nlink == 0) {
 550     _type = FILE_DELETED;
 551   } else if (Util::strStartsWith(jalib::Filesystem::BaseName(_path), ".nfs")) {
 552     // Files deleted on NFS have the .nfsXXXX format.
 553     _type = FILE_DELETED;
 554   } else {
 555     // Update _path to reflect the current state. The file path might be a new
 556     // one after restart and if the current process wasn't the leader, it never
 557     // had a chance to update the _path. Update it now.
 558     _path = jalib::Filesystem::GetDeviceName(_fds[0]);
 559   }
 560 
 561   calculateRelativePath();
 562 
 563   // If this file is related to supported Resource Management system
 564   // handle it specially
 565   if (_type == FILE_BATCH_QUEUE &&
 566       dmtcp_bq_should_ckpt_file &&
 567       dmtcp_bq_should_ckpt_file(_path.c_str(), &_rmtype)) {
 568     JTRACE("Pre-checkpoint Torque files") (_fds.size());
 569     for (unsigned int i=0; i< _fds.size(); i++)
 570       JTRACE("_fds[i]=") (i) (_fds[i]);
 571     _ckpted_file = true;
 572     return;
 573   }
 574 
 575   if (dmtcp_must_ckpt_file && dmtcp_must_ckpt_file(_path.c_str())) {
 576     _ckpted_file = true;
 577     return;
 578   }
 579 
 580   if (_type == FILE_DELETED && (_flags & O_WRONLY)) {
 581     return;
 582   }
 583   if (_isBlacklistedFile(_path)) {
 584     return;
 585   }
 586   if (dmtcp_should_ckpt_open_files() && statbuf.st_uid == getuid()) {
 587     _ckpted_file = true;
 588   } else if (_type == FILE_DELETED || _type == FILE_SHM) {
 589     _ckpted_file = true;
 590   } else if (_isVimApp() &&
 591              (Util::strEndsWith(_path, ".swp") == 0 ||
 592               Util::strEndsWith(_path, ".swo") == 0)) {
 593     _ckpted_file = true;
 594   } else if (Util::strStartsWith(jalib::Filesystem::GetProgramName(),
 595                                  "emacs")) {
 596     _ckpted_file = true;
 597 #if 0
 598   } else if ((_fcntlFlags &(O_WRONLY|O_RDWR)) != 0 &&
 599              _offset < _st_size &&
 600              _st_size < MAX_FILESIZE_TO_AUTOCKPT &&
 601              statbuf.st_uid == getuid()) {
 602     // FIXME: Disable the following heuristic until we can come up with
 603     //        a better one
 604     _ckpted_file = true;
 605 #endif
 606   } else {
 607     _ckpted_file = false;
 608   }
 609 }
 610 
 611 void FileConnection::preCkpt()
 612 {
 613   if (_ckpted_file) {
 614     ConnectionIdentifier id;
 615     JASSERT(_type != FILE_PROCFS && _type != FILE_INVALID);
 616     JASSERT(SharedData::getCkptLeaderForFile(_st_dev, _st_ino, &id));
 617     if (id == _id) {
 618       string savedFilePath = getSavedFilePath(_path);
 619       JASSERT(Util::createDirectoryTree(savedFilePath)) (savedFilePath)
 620         .Text("Unable to create directory in File Path");
 621 
 622       int destFd = _real_open(savedFilePath.c_str(), O_CREAT | O_WRONLY | O_TRUNC,
 623                               S_IRWXU | S_IRWXG | S_IROTH | S_IXOTH);
 624       JASSERT(destFd != -1) (JASSERT_ERRNO) (_path) (savedFilePath);
 625 
 626       JTRACE("Saving checkpointed copy of the file") (_path) (savedFilePath);
 627       if (_flags & O_WRONLY) {
 628         // If the file is opened() in write-only mode. Open it in readonly mode
 629         // to create the ckpt copy.
 630         int tmpfd = _real_open(_path.c_str(), O_RDONLY, 0);
 631         JASSERT(tmpfd != -1);
 632         writeFileFromFd(tmpfd, destFd);
 633         _real_close(tmpfd);
 634       } else {
 635         writeFileFromFd(_fds[0], destFd);
 636       }
 637       _real_close(destFd);
 638     } else {
 639       JTRACE("Not checkpointing this file") (_path);
 640       _ckpted_file = false;
 641     }
 642   }
 643 }
 644 
 645 void FileConnection::refill(bool isRestart)
 646 {
 647   struct stat statbuf;
 648   if (!isRestart) return;
 649   if (strstr(_path.c_str(), "infiniband/uverbs") ||
 650       strstr(_path.c_str(), "uverbs-event")) return;
 651 
 652   if (_ckpted_file && _fileAlreadyExists) {
 653     string savedFilePath = getSavedFilePath(_path);
 654     int savedFd = _real_open(savedFilePath.c_str(), O_RDONLY, 0);
 655     JASSERT(savedFd != -1) (JASSERT_ERRNO) (savedFilePath);
 656 
 657     if (!areFilesEqual(_fds[0], savedFd, _st_size)) {
 658       if (_type == FILE_SHM) {
 659         JWARNING(false) (_path) (savedFilePath)
 660           .Text("\n"
 661                 "***Mapping current version of file into memory;\n"
 662                 "   _not_ file as it existed at time of checkpoint.\n"
 663                 "   Change this function and re-compile, if you want "
 664                 "different behavior.");
 665       } else {
 666         const char *errMsg =
 667           "\n**** File already exist! Checkpointed copy can't be restored.\n"
 668           "       The Contents of checkpointed copy differ from the "
 669           "contents of the existing copy.\n"
 670           "****Delete the existing file and try again!";
 671         JASSERT(false) (_path) (savedFilePath) (errMsg);
 672       }
 673     }
 674     _real_close(savedFd);
 675   }
 676 
 677   if (!_ckpted_file) {
 678     int tempfd;
 679     if (_type == FILE_DELETED && (_flags & (O_WRONLY | O_RDWR))) {
 680       tempfd = _real_open(_path.c_str(), _fcntlFlags | O_CREAT, 0600);
 681       JASSERT(tempfd != -1) (_path) (JASSERT_ERRNO) .Text("open() failed");
 682       JASSERT(truncate(_path.c_str(), _st_size) ==  0)
 683         (_path.c_str()) (_st_size) (JASSERT_ERRNO);
 684     } else {
 685 
 686       JASSERT(jalib::Filesystem::FileExists(_path)) (_path)
 687         .Text("File not found.");
 688 
 689       if (stat(_path.c_str() ,&statbuf) == 0 && S_ISREG(statbuf.st_mode)) {
 690         if (statbuf.st_size > _st_size &&
 691             ((_fcntlFlags & O_WRONLY) || (_fcntlFlags & O_RDWR))) {
 692           errno = 0;
 693           JASSERT(truncate(_path.c_str(), _st_size) ==  0)
 694             (_path.c_str()) (_st_size) (JASSERT_ERRNO);
 695         } else if (statbuf.st_size < _st_size) {
 696           JWARNING(false) .Text("Size of file smaller than what we expected");
 697         }
 698       }
 699       tempfd = openFile();
 700     }
 701     Util::dupFds(tempfd, _fds);
 702   }
 703 
 704   errno = 0;
 705   if (jalib::Filesystem::FileExists(_path) &&
 706       stat(_path.c_str() ,&statbuf) == 0 && S_ISREG(statbuf.st_mode)) {
 707     if (_offset <= statbuf.st_size && _offset <= _st_size) {
 708       JASSERT(lseek(_fds[0], _offset, SEEK_SET) == _offset)
 709         (_path) (_offset) (JASSERT_ERRNO);
 710       //JTRACE("lseek(_fds[0], _offset, SEEK_SET)") (_fds[0]) (_offset);
 711     } else if (_offset > statbuf.st_size || _offset > _st_size) {
 712       JWARNING(false) (_path) (_offset) (_st_size) (statbuf.st_size)
 713         .Text("No lseek done:  offset is larger than min of old and new size.");
 714     }
 715   }
 716   refreshPath();
 717 }
 718 
 719 void FileConnection::resume(bool isRestart)
 720 {
 721   if (_ckpted_file && isRestart && _type == FILE_DELETED) {
 722     /* Here we want to unlink the file. We want to do it only at the time of
 723      * restart, but there is no way of finding out if we are restarting or not.
 724      * That is why we look for the file on disk and if it is present(it was
 725      * deleted at ckpt time), then we assume that we are restarting and hence
 726      * we unlink the file.
 727      */
 728     if (jalib::Filesystem::FileExists(_path)) {
 729       JWARNING(unlink(_path.c_str()) != -1) (_path)
 730         .Text("The file was unlinked at the time of checkpoint. "
 731               "Unlinking it after restart failed");
 732     }
 733   }
 734 }
 735 
 736 void FileConnection::refreshPath()
 737 {
 738   string cwd = jalib::Filesystem::GetCWD();
 739 
 740   if (_type == FILE_BATCH_QUEUE) {
 741     // get new file name
 742     string newpath = jalib::Filesystem::GetDeviceName(_fds[0]);
 743     JTRACE("This is Resource Manager file!") (_fds[0]) (newpath) (_path) (this);
 744     if (newpath != _path) {
 745       JTRACE("File Manager connection _path is changed => _path = newpath!")
 746         (_path) (newpath);
 747       _path = newpath;
 748     }
 749     return;
 750   }
 751 
 752   if (dmtcp_get_new_file_path) {
 753     char newpath[PATH_MAX];
 754     newpath[0] = '\0';
 755     dmtcp_get_new_file_path(_path.c_str(), cwd.c_str(), newpath);
 756     if (newpath[0] != '\0') {
 757       JASSERT(jalib::Filesystem::FileExists(newpath)) (_path) (newpath)
 758         .Text("Path returned by plugin does not exist.");
 759       _path = newpath;
 760       return;
 761     }
 762   }
 763   if (_rel_path != "*" && !jalib::Filesystem::FileExists(_path)) {
 764     // If file at absolute path doesn't exist and file path is relative to
 765     // executable current dir
 766     string oldPath = _path;
 767     string fullPath = cwd + "/" + _rel_path;
 768     if (jalib::Filesystem::FileExists(fullPath)) {
 769       _path = fullPath;
 770       JTRACE("Change _path based on relative path")
 771         (oldPath) (_path) (_rel_path);
 772     }
 773   } else if (_type == FILE_PROCFS) {
 774     int index = 6;
 775     char *rest;
 776     char buf[64];
 777     pid_t proc_pid = strtol(&_path[index], &rest, 0);
 778     if (proc_pid > 0 && *rest == '/') {
 779       sprintf(buf, "/proc/%d/%s", getpid(), rest);
 780       _path = buf;
 781     }
 782   }
 783 }
 784 
 785 void FileConnection::postRestart()
 786 {
 787   int tempfd;
 788 
 789   JASSERT(_fds.size() > 0);
 790   if (!_ckpted_file) return;
 791   _fileAlreadyExists = false;
 792 
 793   JTRACE("Restoring File Connection") (id()) (_path);
 794   string savedFilePath = getSavedFilePath(_path);
 795   JASSERT(jalib::Filesystem::FileExists(savedFilePath))
 796     (savedFilePath) (_path) .Text("Unable to find checkpointed copy of file");
 797 
 798   if (_type == FILE_BATCH_QUEUE) {
 799     JASSERT(dmtcp_bq_restore_file);
 800     tempfd = dmtcp_bq_restore_file(_path.c_str(), savedFilePath.c_str(),
 801                                    _fcntlFlags, _rmtype);
 802     JTRACE("Restore Resource Manager File") (_path);
 803   } else {
 804     refreshPath();
 805     JASSERT(Util::createDirectoryTree(_path)) (_path)
 806       .Text("Unable to create directory in File Path");
 807     /* Now try to create the file with O_EXCL. If we fail with EEXIST, there
 808      * are two possible scenarios:
 809      * - The file was created by a different restarting process with data from
 810      *   checkpointed copy. It is possible that the data is in flight, so we
 811      *   should wait until the next barrier to compare the data from our copy.
 812      * - The file existed before restart. After the next barrier, abort if the
 813      *   contents differ from our checkpointed copy.
 814      */
 815     int fd = _real_open(_path.c_str(), O_CREAT | O_EXCL | O_RDWR,
 816                         S_IRWXU | S_IRWXG | S_IROTH | S_IXOTH);
 817     JASSERT(fd != -1 || errno == EEXIST) (_path) (JASSERT_ERRNO);
 818 
 819     if (fd == -1) {
 820       _fileAlreadyExists = true;
 821     } else {
 822       int srcFd = _real_open(savedFilePath.c_str(), O_RDONLY, 0);
 823       JASSERT(srcFd != -1) (_path) (savedFilePath) (JASSERT_ERRNO)
 824         .Text("Failed to open checkpointed copy of the file.");
 825       JTRACE("Copying saved checkpointed file to original location")
 826         (savedFilePath) (_path);
 827       writeFileFromFd(srcFd, fd);
 828       _real_close(srcFd);
 829       _real_close(fd);
 830     }
 831     tempfd = openFile();
 832   }
 833   Util::dupFds(tempfd, _fds);
 834 }
 835 
 836 bool FileConnection::checkDup(int fd)
 837 {
 838   bool retVal = false;
 839 
 840   int myfd = _fds[0];
 841   if ( lseek(myfd, 0, SEEK_CUR) == lseek(fd, 0, SEEK_CUR) ) {
 842     off_t newOffset = lseek (myfd, 1, SEEK_CUR);
 843     JASSERT (newOffset != -1) (JASSERT_ERRNO) .Text("lseek failed");
 844 
 845     if ( newOffset == lseek (fd, 0, SEEK_CUR) ) {
 846       retVal = true;
 847     }
 848     // Now restore the old offset
 849     JASSERT (-1 != lseek (myfd, -1, SEEK_CUR)) .Text("lseek failed");
 850   }
 851   return retVal;
 852 }
 853 
 854 int FileConnection::openFile()
 855 {
 856   JASSERT(jalib::Filesystem::FileExists(_path)) (_path)
 857     .Text("File not present");
 858 
 859   int fd = _real_open(_path.c_str(), _fcntlFlags);
 860   JASSERT(fd != -1) (_path) (JASSERT_ERRNO) .Text("open() failed");
 861 
 862   JTRACE("open(_path.c_str(), _fcntlFlags)") (fd) (_path.c_str()) (_fcntlFlags);
 863   return fd;
 864 }
 865 
 866 static bool areFilesEqual(int fd, int savedFd, size_t size)
 867 {
 868   long page_size = sysconf(_SC_PAGESIZE);
 869   const size_t bufSize = 1024 * page_size;
 870   char *buf1 =(char*)JALLOC_HELPER_MALLOC(bufSize);
 871   char *buf2 =(char*)JALLOC_HELPER_MALLOC(bufSize);
 872 
 873   off_t offset1 = _real_lseek(fd, 0, SEEK_CUR);
 874   off_t offset2 = _real_lseek(savedFd, 0, SEEK_CUR);
 875   JASSERT(_real_lseek(fd, 0, SEEK_SET) == 0) (fd) (JASSERT_ERRNO);
 876   JASSERT(_real_lseek(savedFd, 0, SEEK_SET) == 0) (savedFd) (JASSERT_ERRNO);
 877 
 878   int readBytes;
 879   while (size > 0) {
 880     readBytes = Util::readAll(savedFd, buf1, MIN(bufSize, size));
 881     JASSERT(readBytes != -1) (JASSERT_ERRNO) .Text("Read Failed");
 882     if (readBytes == 0) break;
 883     JASSERT(Util::readAll(fd, buf2, readBytes) == readBytes);
 884     if (memcmp(buf1, buf2, readBytes) != 0) {
 885       break;
 886     }
 887     size -= readBytes;
 888   }
 889   JALLOC_HELPER_FREE(buf1);
 890   JALLOC_HELPER_FREE(buf2);
 891   JASSERT(_real_lseek(fd, offset1, SEEK_SET) != -1);
 892   JASSERT(_real_lseek(savedFd, offset2, SEEK_SET) != -1);
 893   return size == 0;
 894 }
 895 
 896 static void writeFileFromFd(int fd, int destFd)
 897 {
 898   long page_size = sysconf(_SC_PAGESIZE);
 899   const size_t bufSize = 1024 * page_size;
 900   char *buf =(char*)JALLOC_HELPER_MALLOC(bufSize);
 901 
 902   // Synchronize memory buffer with data in filesystem
 903   // On some Linux kernels, the shared-memory test will fail without this.
 904   fsync(fd);
 905 
 906   off_t offset = _real_lseek(fd, 0, SEEK_CUR);
 907   JASSERT(_real_lseek(fd, 0, SEEK_SET) == 0) (fd) (JASSERT_ERRNO);
 908   JASSERT(_real_lseek(destFd, 0, SEEK_SET) == 0) (destFd) (JASSERT_ERRNO);
 909 
 910   int readBytes, writtenBytes;
 911   while (1) {
 912     readBytes = Util::readAll(fd, buf, bufSize);
 913     JASSERT(readBytes != -1) (JASSERT_ERRNO) .Text("Read Failed");
 914     if (readBytes == 0) break;
 915     writtenBytes = Util::writeAll(destFd, buf, readBytes);
 916     JASSERT(writtenBytes != -1) (JASSERT_ERRNO) .Text("Write failed.");
 917   }
 918   JALLOC_HELPER_FREE(buf);
 919   JASSERT(_real_lseek(fd, offset, SEEK_SET) != -1);
 920 }
 921 
 922 string FileConnection::getSavedFilePath(const string& path)
 923 {
 924   ostringstream os;
 925   os << dmtcp_get_ckpt_files_subdir()
 926     << "/" << jalib::Filesystem::BaseName(_path) << "_" << _id.conId();
 927 
 928   return os.str();
 929 }
 930 
 931 void FileConnection::serializeSubClass(jalib::JBinarySerializer& o)
 932 {
 933   JSERIALIZE_ASSERT_POINT("FileConnection");
 934   o & _path & _rel_path;
 935   o & _offset & _st_dev & _st_ino & _st_size & _ckpted_file & _rmtype;
 936   JTRACE("Serializing FileConn.") (_path) (_rel_path)
 937     (dmtcp_get_ckpt_files_subdir()) (_ckpted_file) (_fcntlFlags);
 938 }
 939 
 940 /*****************************************************************************
 941  * FIFO Connection
 942  *****************************************************************************/
 943 
 944 void FifoConnection::drain()
 945 {
 946   struct stat st;
 947   JASSERT(_fds.size() > 0);
 948 
 949   stat(_path.c_str(),&st);
 950   JTRACE("Checkpoint fifo.") (_fds[0]);
 951   _mode = st.st_mode;
 952 
 953   int new_flags =(_fcntlFlags & (~(O_RDONLY|O_WRONLY))) | O_RDWR | O_NONBLOCK;
 954   ckptfd = _real_open(_path.c_str(),new_flags);
 955   JASSERT(ckptfd >= 0) (ckptfd) (JASSERT_ERRNO);
 956 
 957   _in_data.clear();
 958   size_t bufsize = 256;
 959   char buf[bufsize];
 960   int size;
 961 
 962   while (1) { // flush fifo
 963     size = read(ckptfd,buf,bufsize);
 964     if (size < 0) {
 965       break; // nothing to flush
 966     }
 967     for (int i=0;i<size;i++) {
 968       _in_data.push_back(buf[i]);
 969     }
 970   }
 971   close(ckptfd);
 972   JTRACE("Checkpointing fifo:  end.") (_fds[0]) (_in_data.size());
 973 }
 974 
 975 void FifoConnection::refill(bool isRestart)
 976 {
 977   int new_flags =(_fcntlFlags &(~(O_RDONLY|O_WRONLY))) | O_RDWR | O_NONBLOCK;
 978   ckptfd = _real_open(_path.c_str(),new_flags);
 979   JASSERT(ckptfd >= 0) (ckptfd) (JASSERT_ERRNO);
 980 
 981   size_t bufsize = 256;
 982   char buf[bufsize];
 983   size_t j;
 984   ssize_t ret;
 985   for (size_t i=0;i<(_in_data.size()/bufsize);i++) { // refill fifo
 986     for (j=0; j<bufsize; j++) {
 987       buf[j] = _in_data[j+i*bufsize];
 988     }
 989     ret = Util::writeAll(ckptfd,buf,j);
 990     JASSERT(ret ==(ssize_t)j) (JASSERT_ERRNO) (ret) (j) (_fds[0]) (i);
 991   }
 992   int start =(_in_data.size()/bufsize)*bufsize;
 993   for (j=0; j<_in_data.size()%bufsize; j++) {
 994     buf[j] = _in_data[start+j];
 995   }
 996   errno=0;
 997   buf[j] ='\0';
 998   JTRACE("Buf internals.") ((const char*)buf);
 999   ret = Util::writeAll(ckptfd,buf,j);
1000   JASSERT(ret ==(ssize_t)j) (JASSERT_ERRNO) (ret) (j) (_fds[0]);
1001 
1002   close(ckptfd);
1003   // unlock fifo
1004   flock(_fds[0],LOCK_UN);
1005   JTRACE("End checkpointing fifo.") (_fds[0]);
1006 }
1007 
1008 void FifoConnection::refreshPath()
1009 {
1010   string cwd = jalib::Filesystem::GetCWD();
1011   if (_rel_path != "*") { // file path is relative to executable current dir
1012     string oldPath = _path;
1013     ostringstream fullPath;
1014     fullPath << cwd << "/" << _rel_path;
1015     if (jalib::Filesystem::FileExists(fullPath.str())) {
1016       _path = fullPath.str();
1017       JTRACE("Change _path based on relative path") (oldPath) (_path);
1018     }
1019   }
1020 }
1021 
1022 void FifoConnection::postRestart()
1023 {
1024   JASSERT(_fds.size() > 0);
1025   JTRACE("Restoring Fifo Connection") (id()) (_path);
1026   refreshPath();
1027   int tempfd = openFile();
1028   Util::dupFds(tempfd, _fds);
1029   refreshPath();
1030 }
1031 
1032 int FifoConnection::openFile()
1033 {
1034   int fd;
1035 
1036   if (!jalib::Filesystem::FileExists(_path)) {
1037     JTRACE("Fifo file not present, creating new one") (_path);
1038     jalib::string dir = jalib::Filesystem::DirName(_path);
1039     JTRACE("fifo dir:")(dir);
1040     jalib::Filesystem::mkdir_r(dir, 0755);
1041     mkfifo(_path.c_str(), _mode);
1042     JTRACE("mkfifo") (_path.c_str()) (errno);
1043   }
1044 
1045   fd = _real_open(_path.c_str(), O_RDWR | O_NONBLOCK);
1046   JTRACE("Is opened") (_path.c_str()) (fd);
1047 
1048   JASSERT(fd != -1) (_path) (JASSERT_ERRNO);
1049   return fd;
1050 }
1051 
1052 void FifoConnection::serializeSubClass(jalib::JBinarySerializer& o)
1053 {
1054   JSERIALIZE_ASSERT_POINT("FifoConnection");
1055   o & _path & _rel_path & _savedRelativePath & _mode & _in_data;
1056   JTRACE("Serializing FifoConn.") (_path) (_rel_path) (_savedRelativePath);
1057 }
1058 
1059 /*****************************************************************************
1060  * Stdio Connection
1061  *****************************************************************************/
1062 
1063 void StdioConnection::postRestart()
1064 {
1065   for (size_t i=0; i<_fds.size(); ++i) {
1066     int fd = _fds[i];
1067     if (fd <= 2) {
1068       JTRACE("Skipping restore of STDIO, just inherit from parent") (fd);
1069       continue;
1070     }
1071     int oldFd = -1;
1072     switch (_type) {
1073       case STDIO_IN:
1074         JTRACE("Restoring STDIN") (fd);
1075         oldFd=0;
1076         break;
1077       case STDIO_OUT:
1078         JTRACE("Restoring STDOUT") (fd);
1079         oldFd=1;
1080         break;
1081       case STDIO_ERR:
1082         JTRACE("Restoring STDERR") (fd);
1083         oldFd=2;
1084         break;
1085       default:
1086         JASSERT(false);
1087     }
1088     errno = 0;
1089     JWARNING(_real_dup2(oldFd, fd) == fd) (oldFd) (fd) (JASSERT_ERRNO);
1090   }
1091 }
1092 
1093 /*****************************************************************************
1094  * POSIX Message Queue Connection
1095  *****************************************************************************/
1096 
1097 void PosixMQConnection::on_mq_close()
1098 {
1099 }
1100 
1101 void PosixMQConnection::on_mq_notify(const struct sigevent *sevp)
1102 {
1103   if (sevp == NULL && _notifyReg) {
1104     _notifyReg = false;
1105   } else {
1106     _notifyReg = true;
1107     _sevp = *sevp;
1108   }
1109 }
1110 
1111 void PosixMQConnection::drain()
1112 {
1113   JASSERT(_fds.size() > 0);
1114 
1115   JTRACE("Checkpoint Posix Message Queue.") (_fds[0]);
1116 
1117   struct stat statbuf;
1118   JASSERT(fstat(_fds[0], &statbuf) != -1) (JASSERT_ERRNO);
1119   if (_mode == 0) {
1120     _mode = statbuf.st_mode;
1121   }
1122 
1123   struct mq_attr attr;
1124   JASSERT(mq_getattr(_fds[0], &attr) != -1) (JASSERT_ERRNO);
1125   _attr = attr;
1126   if (attr.mq_curmsgs < 0) {
1127     return;
1128   }
1129 
1130   int fd = _real_mq_open(_name.c_str(), O_RDWR, 0, NULL);
1131   JASSERT(fd != -1) (_name) (JASSERT_ERRNO);
1132 
1133   _qnum = attr.mq_curmsgs;
1134   char *buf =(char*) JALLOC_HELPER_MALLOC(attr.mq_msgsize);
1135   for (long i = 0; i < _qnum; i++) {
1136     unsigned prio;
1137     ssize_t numBytes = _real_mq_receive(_fds[0], buf, attr.mq_msgsize, &prio);
1138     JASSERT(numBytes != -1) (JASSERT_ERRNO);
1139     _msgInQueue.push_back(jalib::JBuffer((const char*)buf, numBytes));
1140     _msgInQueuePrio.push_back(prio);
1141   }
1142   JALLOC_HELPER_FREE(buf);
1143   _real_mq_close(fd);
1144 }
1145 
1146 void PosixMQConnection::refill(bool isRestart)
1147 {
1148   for (long i = 0; i < _qnum; i++) {
1149     JASSERT(_real_mq_send(_fds[0], _msgInQueue[i].buffer(),
1150                           _msgInQueue[i].size(), _msgInQueuePrio[i]) != -1);
1151   }
1152   _msgInQueue.clear();
1153   _msgInQueuePrio.clear();
1154 }
1155 
1156 void PosixMQConnection::postRestart()
1157 {
1158   JASSERT(_fds.size() > 0);
1159 
1160   errno = 0;
1161   if (_oflag & O_EXCL) {
1162     mq_unlink(_name.c_str());
1163   }
1164 
1165   int tempfd = _real_mq_open(_name.c_str(), _oflag, _mode, &_attr);
1166   JASSERT(tempfd != -1) (JASSERT_ERRNO);
1167   Util::dupFds(tempfd, _fds);
1168 }
1169 
1170 void PosixMQConnection::serializeSubClass(jalib::JBinarySerializer& o)
1171 {
1172   JSERIALIZE_ASSERT_POINT("PosixMQConnection");
1173   o & _name & _oflag & _mode & _attr;
1174 }

/* [<][>][^][v][top][bottom][index][help] */