root/ckptserializer.cpp

/* [<][>][^][v][top][bottom][index][help] */

DEFINITIONS

This source file includes the following definitions.
  1. default_sigchld_handler
  2. prepare_sigchld_handler
  3. restore_sigchld_handler_and_wait_for_zombie
  4. test_use_compression
  5. open_ckpt_to_write_hbict
  6. open_ckpt_to_write_gz
  7. perform_open_ckpt_image_fd
  8. test_and_prepare_for_forked_ckpt
  9. open_ckpt_to_write
  10. createCkptDir
  11. writeCkptImage
  12. writeDmtcpHeader

   1 /****************************************************************************
   2  *   Copyright (C) 2006-2013 by Jason Ansel, Kapil Arya, and Gene Cooperman *
   3  *   jansel@csail.mit.edu, kapil@ccs.neu.edu, gene@ccs.neu.edu              *
   4  *                                                                          *
   5  *  This file is part of DMTCP.                                             *
   6  *                                                                          *
   7  *  DMTCP is free software: you can redistribute it and/or                  *
   8  *  modify it under the terms of the GNU Lesser General Public License as   *
   9  *  published by the Free Software Foundation, either version 3 of the      *
  10  *  License, or (at your option) any later version.                         *
  11  *                                                                          *
  12  *  DMTCP is distributed in the hope that it will be useful,                *
  13  *  but WITHOUT ANY WARRANTY; without even the implied warranty of          *
  14  *  MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the           *
  15  *  GNU Lesser General Public License for more details.                     *
  16  *                                                                          *
  17  *  You should have received a copy of the GNU Lesser General Public        *
  18  *  License along with DMTCP.  If not, see <http://www.gnu.org/licenses/>.  *
  19  ****************************************************************************/
  20 
  21 #include <stdlib.h>
  22 #include <limits.h> /* for LONG_MIN and LONG_MAX */
  23 #include <sys/types.h>
  24 #include <sys/wait.h>
  25 #include <sys/syscall.h>
  26 #ifdef __aarch64__
  27 /* On aarch64, fork() is not implemented, in favor of clone().
  28  *   A true fork call would include CLONE_CHILD_SETTID and set the thread id
  29  * in the thread area of the child (using set_thread_area).  We don't do that.
  30  */
  31 # define _real_sys_fork() _real_syscall(SYS_clone, \
  32     CLONE_CHILD_CLEARTID|CLONE_CHILD_SETTID|SIGCHLD, NULL, NULL, NULL)
  33 #else
  34 # define _real_sys_fork() _real_syscall(SYS_fork)
  35 #endif
  36 #include <signal.h>
  37 #include <unistd.h>
  38 #include <fcntl.h>
  39 #include "constants.h"
  40 #include "util.h"
  41 #include "syscallwrappers.h"
  42 #include "dmtcp.h"
  43 #include "protectedfds.h"
  44 #include "ckptserializer.h"
  45 
  46 // aarch64 doesn't define SYS_pipe kernel call by default.
  47 #if defined(__aarch64__)
  48 # define _real_pipe(a) _real_syscall(SYS_pipe2, a, 0)
  49 #else
  50 # define _real_pipe(a) _real_syscall(SYS_pipe, a)
  51 #endif
  52 #define _real_waitpid(a,b,c) _real_syscall(SYS_wait4,a,b,c,NULL)
  53 
  54 using namespace dmtcp;
  55 
  56 #define FORKED_CKPT_FAILED 0
  57 #define FORKED_CKPT_PARENT 1
  58 #define FORKED_CKPT_CHILD 2
  59 
  60 static int forked_ckpt_status = -1;
  61 static pid_t ckpt_extcomp_child_pid = -1;
  62 static struct sigaction saved_sigchld_action;
  63 static int open_ckpt_to_write(int fd, int pipe_fds[2], char **extcomp_args);
  64 void mtcp_writememoryareas(int fd) __attribute__((weak));
  65 
  66 /* We handle SIGCHLD while checkpointing. */
  67 static void default_sigchld_handler(int sig) {
  68   JASSERT(sig == SIGCHLD);
  69 }
  70 
  71 static void prepare_sigchld_handler()
  72 {
  73   /* 3a. Set SIGCHLD to our own handler;
  74    *     User handling is restored after gzip finishes.
  75    * SEE: sigaction(2), NOTES: only portable method of avoiding zombies
  76    *      is to catch SIGCHLD signal and perform a wait(2) or similar.
  77    *
  78    * NOTE: Although the default action for SIGCHLD is supposedly SIG_IGN,
  79    * for historical reasons there are differences between SIG_DFL and SIG_IGN
  80    * for SIGCHLD.  See sigaction(2), NOTES section for more details. */
  81   struct sigaction default_sigchld_action;
  82   memset(&default_sigchld_action, 0, sizeof (default_sigchld_action));
  83   default_sigchld_action.sa_handler = default_sigchld_handler;
  84   sigaction(SIGCHLD, &default_sigchld_action, &saved_sigchld_action);
  85 }
  86 
  87 static void restore_sigchld_handler_and_wait_for_zombie(pid_t pid)
  88 {
  89     /* This is done to avoid calling the user SIGCHLD handler when gzip
  90      * or another compression utility exits.
  91      * NOTE: We must wait in case user did rapid ckpt-kill in succession.
  92      *  Otherwise, kernel could have optimization allowing us to close fd and
  93      *  rename tmp ckpt file to permanent even while gzip is still writing.
  94      */
  95     /* All signals, incl. SIGCHLD, were blocked in mtcp.c:save_sig_state()
  96      * when beginning the ckpt.  A SIGCHLD handler was declared before
  97      * the gzip child process was forked.  The child may have already
  98      * terminated and returned, creating a blocked SIGCHLD.
  99      *   So, we use sigsuspend so that we can clean up the child zombie
 100      * with wait4 whether it already terminated or not yet.
 101      */
 102     sigset_t suspend_sigset;
 103     sigfillset(&suspend_sigset);
 104     sigdelset(&suspend_sigset, SIGCHLD);
 105     _real_sigsuspend(&suspend_sigset);
 106     JWARNING(_real_waitpid(pid, NULL, 0) != -1) (pid) (JASSERT_ERRNO);
 107     pid = -1;
 108     sigaction(SIGCHLD, &saved_sigchld_action, NULL);
 109 }
 110 
 111 /*
 112  * This function returns the fd to which the checkpoint file should be written.
 113  * The purpose of using this function over open() is that this
 114  * function will handle compression and gzipping.
 115  */
 116 static int test_use_compression(char *compressor, char *command, char *path,
 117                                 int def)
 118 {
 119   char *default_val;
 120   char evar[256] = "DMTCP_";
 121   char *do_we_compress;
 122 
 123   if (def)
 124     default_val = const_cast<char*> ("1");
 125   else
 126     default_val = const_cast<char*> ("0");
 127 
 128   JASSERT( strlen(strncat(evar, compressor, sizeof(evar) - strlen(evar) - 1))
 129            < sizeof(evar) - 1 )
 130          (compressor) .Text("compressor is too long.");
 131   do_we_compress = getenv(evar);
 132   // env var is unset, let's default to enabled
 133   // to disable compression, run with MTCP_GZIP=0
 134   if (do_we_compress == NULL)
 135     do_we_compress = default_val;
 136 
 137   char *endptr;
 138   errno = 0;
 139   long int rc = strtol(do_we_compress, &endptr, 0);
 140   JASSERT(rc != LONG_MIN && rc != LONG_MAX) (do_we_compress) (JASSERT_ERRNO);
 141   if (*do_we_compress == '\0' || *endptr != '\0') {
 142     JWARNING(false) (evar) (do_we_compress)
 143       .Text("Compression env var not defined as a number."
 144             "  Checkpoint image will not be compressed.");
 145     do_we_compress = const_cast<char*> ("0");
 146   }
 147 
 148   if ( 0 == strcmp(do_we_compress, "0") )
 149     return 0;
 150 
 151   /* Check if the executable exists. */
 152   if (Util::findExecutable(command, getenv("PATH"), path) == NULL) {
 153     JWARNING(false) (command)
 154       .Text("Command cannot be executed. Compression will not be used.");
 155     return 0;
 156   }
 157 
 158   /* If we arrive down here, it's safe to compress. */
 159   return 1;
 160 }
 161 
 162 #ifdef HBICT_DELTACOMP
 163 static int open_ckpt_to_write_hbict(int fd, int pipe_fds[2], char *hbict_path,
 164                                     char *gzip_path)
 165 {
 166   char *hbict_args[] = {
 167     const_cast<char*> ("hbict"),
 168     const_cast<char*> ("-a"),
 169     NULL,
 170     NULL
 171   };
 172   hbict_args[0] = hbict_path;
 173   JTRACE("open_ckpt_to_write_hbict\n");
 174 
 175   if (gzip_path != NULL){
 176     hbict_args[2] = const_cast<char*> ("-z100");
 177   }
 178   return open_ckpt_to_write(fd,pipe_fds,hbict_args);
 179 }
 180 #endif
 181 
 182 static int open_ckpt_to_write_gz(int fd, int pipe_fds[2], char *gzip_path)
 183 {
 184   char *gzip_args[] = {
 185     const_cast<char*> ("gzip"),
 186     const_cast<char*> ("-1"),
 187     const_cast<char*> ("-"),
 188     NULL
 189   };
 190   gzip_args[0] = gzip_path;
 191   JTRACE("open_ckpt_to_write_gz\n");
 192 
 193   return open_ckpt_to_write(fd,pipe_fds,gzip_args);
 194 }
 195 
 196 static int perform_open_ckpt_image_fd(const char *tempCkptFilename,
 197                                       int *use_compression,
 198                                       int *fdCkptFileOnDisk)
 199 {
 200   *use_compression = 0;  /* default value */
 201 
 202   /* 1. Open fd to checkpoint image on disk */
 203   /* Create temp checkpoint file and write magic number to it */
 204   int flags = O_CREAT | O_TRUNC | O_WRONLY;
 205   int fd = _real_open(tempCkptFilename, flags, 0600);
 206   *fdCkptFileOnDisk = fd; /* if use_compression, fd will be reset to pipe */
 207   JASSERT(fd != -1) (tempCkptFilename) (JASSERT_ERRNO)
 208     .Text ("Error creating file.");
 209 
 210 #ifdef FAST_RST_VIA_MMAP
 211   return fd;
 212 #endif
 213 
 214   /* 2. Test if using GZIP/HBICT compression */
 215   /* 2a. Test if using GZIP compression */
 216   int use_gzip_compression = 0;
 217   int use_deltacompression = 0;
 218   char *gzip_cmd = const_cast<char*> ("gzip");
 219   char gzip_path[PATH_MAX];
 220   use_gzip_compression = test_use_compression(const_cast<char*> ("GZIP"),
 221                                               gzip_cmd, gzip_path, 1);
 222 
 223   /* 2b. Test if using HBICT compression */
 224 # ifdef HBICT_DELTACOMP
 225   char *hbict_cmd = const_cast<char*> ("hbict");
 226   char hbict_path[PATH_MAX];
 227   JTRACE("NOTICE: hbict compression is enabled\n");
 228 
 229   use_deltacompression = test_use_compression(const_cast<char*> ("HBICT"),
 230                                               hbict_cmd, hbict_path, 1);
 231 # endif
 232 
 233   /* 3. We now have the information to pipe to gzip, or directly to fd.
 234   *     We do it this way, so that gzip will be direct child of forked process
 235   *       when using forked checkpointing.
 236   */
 237 
 238   if (use_deltacompression || use_gzip_compression) { /* fork compr. process */
 239     /* 3a. Set SIGCHLD to our own handler;
 240      *     User handling is restored after gzip finishes.
 241      */
 242     prepare_sigchld_handler();
 243 
 244     /* 3b. Open pipe */
 245     int pipe_fds[2];
 246     if (_real_pipe(pipe_fds) == -1) {
 247       JWARNING(false) .Text("Error creating pipe. Compression won't be used.");
 248       use_gzip_compression = use_deltacompression = 0;
 249     }
 250 
 251     /* 3c. Fork compressor child */
 252     if (use_deltacompression) { /* fork a hbict process */
 253 # ifdef HBICT_DELTACOMP
 254       *use_compression = 1;
 255       if ( use_gzip_compression ) // We may want hbict compression only
 256         fd = open_ckpt_to_write_hbict(fd, pipe_fds, hbict_path, gzip_path);
 257       else
 258         fd = open_ckpt_to_write_hbict(fd, pipe_fds, hbict_path, NULL);
 259 # endif
 260     } else if (use_gzip_compression) {/* fork a gzip process */
 261       *use_compression = 1;
 262       fd = open_ckpt_to_write_gz(fd, pipe_fds, gzip_path);
 263       if (pipe_fds[0] == -1) {
 264         /* If open_ckpt_to_write_gz() failed to fork the gzip process */
 265         *use_compression = 0;
 266       }
 267     } else {
 268       JASSERT(false) .Text("Not Reached!\n");
 269     }
 270   }
 271 
 272   return fd;
 273 }
 274 
 275 static int test_and_prepare_for_forked_ckpt()
 276 {
 277 #ifdef TEST_FORKED_CHECKPOINTING
 278   return 1;
 279 #endif
 280 
 281   if (getenv(ENV_VAR_FORKED_CKPT) == NULL) {
 282     return 0;
 283   }
 284 
 285   /* Set SIGCHLD to our own handler;
 286    *     User handling is restored after forking child process.
 287    */
 288   prepare_sigchld_handler();
 289 
 290   pid_t forked_cpid = _real_sys_fork();
 291   if (forked_cpid == -1) {
 292     JWARNING(false)
 293       .Text("Failed to do forked checkpointing, trying normal checkpoint");
 294     return FORKED_CKPT_FAILED;
 295   } else if (forked_cpid > 0) {
 296     restore_sigchld_handler_and_wait_for_zombie(forked_cpid);
 297     JTRACE("checkpoint complete\n");
 298     return FORKED_CKPT_PARENT;
 299   } else {
 300     pid_t grandchild_pid = _real_sys_fork();
 301     JWARNING(grandchild_pid != -1)
 302       .Text("WARNING: Forked checkpoint failed, no checkpoint available");
 303     if (grandchild_pid > 0) {
 304       // Use _exit() instead of exit() to avoid popping atexit() handlers
 305       // registered by the parent process.
 306       _exit(0); /* child exits */
 307     }
 308     /* grandchild continues; no need now to waitpid() on grandchild */
 309     JTRACE("inside grandchild process");
 310   }
 311   return FORKED_CKPT_CHILD;
 312 }
 313 
 314 int
 315 open_ckpt_to_write(int fd, int pipe_fds[2], char **extcomp_args)
 316 {
 317   pid_t cpid;
 318 
 319   cpid = _real_sys_fork();
 320   if (cpid == -1) {
 321     JWARNING(false) (extcomp_args[0]) (JASSERT_ERRNO)
 322       .Text("WARNING: error forking child process. Compression won't be used");
 323     _real_close(pipe_fds[0]);
 324     _real_close(pipe_fds[1]);
 325     pipe_fds[0] = pipe_fds[1] = -1;
 326     //fall through to return fd
 327   } else if (cpid > 0) { /* parent process */
 328     //Before running gzip in child process, we must not use LD_PRELOAD.
 329     // See revision log 342 for details concerning bash.
 330     ckpt_extcomp_child_pid = cpid;
 331     JWARNING(_real_close(pipe_fds[0]) == 0) (JASSERT_ERRNO)
 332       .Text("WARNING: close failed");
 333     fd = pipe_fds[1]; //change return value
 334   } else { /* child process */
 335     //static int (*libc_unsetenv) (const char *name);
 336     //static int (*libc_execvp) (const char *path, char *const argv[]);
 337 
 338     _real_close(pipe_fds[1]);
 339     int infd = _real_dup(pipe_fds[0]);
 340     int outfd = _real_dup(fd);
 341     _real_dup2(infd, STDIN_FILENO);
 342     _real_dup2(outfd, STDOUT_FILENO);
 343 
 344     // No need to close the other FDS
 345     if (pipe_fds[0] > STDERR_FILENO) {
 346       _real_close(pipe_fds[0]);
 347     }
 348     if (infd > STDERR_FILENO) {
 349       _real_close(infd);
 350     }
 351     if (outfd > STDERR_FILENO) {
 352       _real_close(outfd);
 353     }
 354     if (fd > STDERR_FILENO) {
 355       _real_close(fd);
 356     }
 357 
 358     // Don't load libdmtcp.so, etc. in exec.
 359     unsetenv("LD_PRELOAD"); // If in bash, this is bash env. var. version
 360     char *ld_preload_str = (char*) getenv("LD_PRELOAD");
 361     if (ld_preload_str != NULL) {
 362       ld_preload_str[0] = '\0';
 363     }
 364 
 365     _real_execve(extcomp_args[0], extcomp_args, NULL);
 366 
 367     /* should not arrive here */
 368     JASSERT(false)
 369       .Text("Compression failed! No checkpointing will be performed!"
 370             " Cancel now!");
 371   }
 372 
 373   return fd;
 374 }
 375 
 376 
 377 void CkptSerializer::createCkptDir()
 378 {
 379   string ckptDir = ProcessInfo::instance().getCkptDir();
 380   JASSERT(!ckptDir.empty());
 381   JASSERT(mkdir(ckptDir.c_str(), S_IRWXU) == 0 || errno == EEXIST)
 382     (JASSERT_ERRNO) (ckptDir)
 383     .Text("Error creating checkpoint directory");
 384 
 385   JASSERT(0 == access(ckptDir.c_str(), X_OK|W_OK)) (ckptDir)
 386     .Text("ERROR: Missing execute- or write-access to checkpoint dir");
 387 }
 388 
 389 // See comments above for open_ckpt_to_read()
 390 void CkptSerializer::writeCkptImage(void *mtcpHdr, size_t mtcpHdrLen)
 391 {
 392   string ckptFilename = ProcessInfo::instance().getCkptFilename();
 393   string tempCkptFilename = ckptFilename;
 394   tempCkptFilename += ".temp";
 395 
 396   JTRACE("Thread performing checkpoint.") (dmtcp_gettid());
 397   createCkptDir();
 398   forked_ckpt_status = test_and_prepare_for_forked_ckpt();
 399   if (forked_ckpt_status == FORKED_CKPT_PARENT) {
 400     JTRACE("*** Using forked checkpointing.\n");
 401     return;
 402   }
 403 
 404   /* fd will either point to the ckpt file to write, or else the write end
 405    * of a pipe leading to a compression child process.
 406    */
 407   int use_compression = 0;
 408   int fdCkptFileOnDisk = -1;
 409   int fd = -1;
 410 
 411   fd = perform_open_ckpt_image_fd(tempCkptFilename.c_str(), &use_compression,
 412                                   &fdCkptFileOnDisk);
 413   JASSERT(fdCkptFileOnDisk >= 0 );
 414   JASSERT(use_compression || fd == fdCkptFileOnDisk);
 415 
 416   // The rest of this function is for compatibility with original definition.
 417   writeDmtcpHeader(fd);
 418 
 419   // Write MTCP header
 420   JASSERT(Util::writeAll(fd, mtcpHdr, mtcpHdrLen) == (ssize_t) mtcpHdrLen);
 421 
 422   JTRACE ( "MTCP is about to write checkpoint image." )(ckptFilename);
 423   mtcp_writememoryareas(fd);
 424 
 425   if (use_compression) {
 426     /* In perform_open_ckpt_image_fd(), we set SIGCHLD to our own handler.
 427      * Restore it now.
 428      */
 429     restore_sigchld_handler_and_wait_for_zombie(ckpt_extcomp_child_pid);
 430 
 431     /* IF OUT OF DISK SPACE, REPORT IT HERE. */
 432     JASSERT(fsync(fdCkptFileOnDisk) != -1) (JASSERT_ERRNO)
 433       .Text("(compression): fsync error on checkpoint file");
 434     JASSERT(_real_close(fdCkptFileOnDisk) == 0) (JASSERT_ERRNO)
 435       .Text("(compression): error closing checkpoint file.");
 436   }
 437 
 438   /* Now that temp checkpoint file is complete, rename it over old permanent
 439    * checkpoint file.  Uses rename() syscall, which doesn't change i-nodes.
 440    * So, gzip process can continue to write to file even after renaming.
 441    */
 442   JASSERT(rename(tempCkptFilename.c_str(), ckptFilename.c_str()) == 0);
 443 
 444   if (forked_ckpt_status == FORKED_CKPT_CHILD) {
 445     // Use _exit() instead of exit() to avoid popping atexit() handlers
 446     // registered by the parent process.
 447     _exit(0); /* grandchild exits */
 448   }
 449 
 450   JTRACE("checkpoint complete");
 451 }
 452 
 453 void CkptSerializer::writeDmtcpHeader(int fd)
 454 {
 455   const ssize_t len = strlen(DMTCP_FILE_HEADER);
 456   JASSERT(write(fd, DMTCP_FILE_HEADER, len) == len);
 457 
 458   jalib::JBinarySerializeWriterRaw wr("", fd);
 459   ProcessInfo::instance().serialize(wr);
 460   ssize_t written = len + wr.bytes();
 461 
 462   // We must write in multiple of PAGE_SIZE
 463   const ssize_t pagesize = Util::pageSize();
 464   ssize_t remaining = pagesize - (written % pagesize);
 465   char buf[remaining];
 466   JASSERT(Util::writeAll(fd, buf, remaining) == remaining);
 467 }

/* [<][>][^][v][top][bottom][index][help] */