root/ckptserializer.cpp
/* [<][>][^][v][top][bottom][index][help] */
DEFINITIONS
This source file includes the following definitions.
- default_sigchld_handler
- prepare_sigchld_handler
- restore_sigchld_handler_and_wait_for_zombie
- test_use_compression
- open_ckpt_to_write_hbict
- open_ckpt_to_write_gz
- perform_open_ckpt_image_fd
- test_and_prepare_for_forked_ckpt
- open_ckpt_to_write
- createCkptDir
- writeCkptImage
- writeDmtcpHeader
1 /****************************************************************************
2 * Copyright (C) 2006-2013 by Jason Ansel, Kapil Arya, and Gene Cooperman *
3 * jansel@csail.mit.edu, kapil@ccs.neu.edu, gene@ccs.neu.edu *
4 * *
5 * This file is part of DMTCP. *
6 * *
7 * DMTCP is free software: you can redistribute it and/or *
8 * modify it under the terms of the GNU Lesser General Public License as *
9 * published by the Free Software Foundation, either version 3 of the *
10 * License, or (at your option) any later version. *
11 * *
12 * DMTCP is distributed in the hope that it will be useful, *
13 * but WITHOUT ANY WARRANTY; without even the implied warranty of *
14 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the *
15 * GNU Lesser General Public License for more details. *
16 * *
17 * You should have received a copy of the GNU Lesser General Public *
18 * License along with DMTCP. If not, see <http://www.gnu.org/licenses/>. *
19 ****************************************************************************/
20
#include <errno.h>
#include <limits.h> /* for LONG_MIN and LONG_MAX */
#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include <sys/types.h>
#include <sys/stat.h>
#include <sys/syscall.h>
#include <sys/wait.h>
#ifdef __aarch64__
/* On aarch64 there is no fork system call; fork() is built on clone().
 * A true fork() would also pass CLONE_CHILD_SETTID|CLONE_CHILD_CLEARTID
 * together with a child_tid pointer into the thread area, so that the
 * child's thread id gets updated. We don't do that. We pass only SIGCHLD,
 * which is the same clone flags the kernel uses for SYS_fork elsewhere. All
 * five clone() argument slots (flags, stack, parent_tid, tls, child_tid) are
 * given explicitly. Requesting the CHILD_*TID flags without supplying
 * child_tid would let the kernel write the child's tid through whatever
 * value happened to be in the fifth argument register.
 */
# define _real_sys_fork() \
  _real_syscall(SYS_clone, SIGCHLD, NULL, NULL, NULL, NULL)
#else
# define _real_sys_fork() _real_syscall(SYS_fork)
#endif
36 #include <signal.h>
37 #include <unistd.h>
38 #include <fcntl.h>
39 #include "constants.h"
40 #include "util.h"
41 #include "syscallwrappers.h"
42 #include "dmtcp.h"
43 #include "protectedfds.h"
44 #include "ckptserializer.h"
45
// aarch64 doesn't define SYS_pipe kernel call by default.
// pipe2() with a flags argument of 0 behaves like pipe().
#if defined(__aarch64__)
# define _real_pipe(a) _real_syscall(SYS_pipe2, a, 0)
#else
# define _real_pipe(a) _real_syscall(SYS_pipe, a)
#endif
// waitpid(pid, status, options) expressed as wait4() with no rusage buffer.
#define _real_waitpid(a,b,c) _real_syscall(SYS_wait4,a,b,c,NULL)

using namespace dmtcp;

// Results of test_and_prepare_for_forked_ckpt().
// FAILED is also what it returns when forked checkpointing was not
// requested; in both cases the current process writes the image itself.
#define FORKED_CKPT_FAILED 0
// Original process: writeCkptImage() returns without writing an image.
#define FORKED_CKPT_PARENT 1
// Forked process that writes the image and then _exit()s.
#define FORKED_CKPT_CHILD 2

// Outcome of test_and_prepare_for_forked_ckpt() for the checkpoint in
// progress; set by writeCkptImage().
static int forked_ckpt_status = -1;
// pid of the compressor child forked by open_ckpt_to_write(); waited for in
// writeCkptImage() after the image has been written.
static pid_t ckpt_extcomp_child_pid = -1;
// SIGCHLD disposition that prepare_sigchld_handler() replaced; reinstated by
// restore_sigchld_handler_and_wait_for_zombie().
static struct sigaction saved_sigchld_action;
static int open_ckpt_to_write(int fd, int pipe_fds[2], char **extcomp_args);
// Writes the memory areas of the image to fd. Declared weak, so its address
// is NULL if no definition is linked in.
// NOTE(review): presumably provided by the MTCP part of the library; confirm
// which binaries omit it.
void mtcp_writememoryareas(int fd) __attribute__((weak));
65
66 /* We handle SIGCHLD while checkpointing. */
67 static void default_sigchld_handler(int sig) {
68 JASSERT(sig == SIGCHLD);
69 }
70
71 static void prepare_sigchld_handler()
72 {
73 /* 3a. Set SIGCHLD to our own handler;
74 * User handling is restored after gzip finishes.
75 * SEE: sigaction(2), NOTES: only portable method of avoiding zombies
76 * is to catch SIGCHLD signal and perform a wait(2) or similar.
77 *
78 * NOTE: Although the default action for SIGCHLD is supposedly SIG_IGN,
79 * for historical reasons there are differences between SIG_DFL and SIG_IGN
80 * for SIGCHLD. See sigaction(2), NOTES section for more details. */
81 struct sigaction default_sigchld_action;
82 memset(&default_sigchld_action, 0, sizeof (default_sigchld_action));
83 default_sigchld_action.sa_handler = default_sigchld_handler;
84 sigaction(SIGCHLD, &default_sigchld_action, &saved_sigchld_action);
85 }
86
87 static void restore_sigchld_handler_and_wait_for_zombie(pid_t pid)
88 {
89 /* This is done to avoid calling the user SIGCHLD handler when gzip
90 * or another compression utility exits.
91 * NOTE: We must wait in case user did rapid ckpt-kill in succession.
92 * Otherwise, kernel could have optimization allowing us to close fd and
93 * rename tmp ckpt file to permanent even while gzip is still writing.
94 */
95 /* All signals, incl. SIGCHLD, were blocked in mtcp.c:save_sig_state()
96 * when beginning the ckpt. A SIGCHLD handler was declared before
97 * the gzip child process was forked. The child may have already
98 * terminated and returned, creating a blocked SIGCHLD.
99 * So, we use sigsuspend so that we can clean up the child zombie
100 * with wait4 whether it already terminated or not yet.
101 */
102 sigset_t suspend_sigset;
103 sigfillset(&suspend_sigset);
104 sigdelset(&suspend_sigset, SIGCHLD);
105 _real_sigsuspend(&suspend_sigset);
106 JWARNING(_real_waitpid(pid, NULL, 0) != -1) (pid) (JASSERT_ERRNO);
107 pid = -1;
108 sigaction(SIGCHLD, &saved_sigchld_action, NULL);
109 }
110
111 /*
112 * This function returns the fd to which the checkpoint file should be written.
113 * The purpose of using this function over open() is that this
114 * function will handle compression and gzipping.
115 */
116 static int test_use_compression(char *compressor, char *command, char *path,
117 int def)
118 {
119 char *default_val;
120 char evar[256] = "DMTCP_";
121 char *do_we_compress;
122
123 if (def)
124 default_val = const_cast<char*> ("1");
125 else
126 default_val = const_cast<char*> ("0");
127
128 JASSERT( strlen(strncat(evar, compressor, sizeof(evar) - strlen(evar) - 1))
129 < sizeof(evar) - 1 )
130 (compressor) .Text("compressor is too long.");
131 do_we_compress = getenv(evar);
132 // env var is unset, let's default to enabled
133 // to disable compression, run with MTCP_GZIP=0
134 if (do_we_compress == NULL)
135 do_we_compress = default_val;
136
137 char *endptr;
138 errno = 0;
139 long int rc = strtol(do_we_compress, &endptr, 0);
140 JASSERT(rc != LONG_MIN && rc != LONG_MAX) (do_we_compress) (JASSERT_ERRNO);
141 if (*do_we_compress == '\0' || *endptr != '\0') {
142 JWARNING(false) (evar) (do_we_compress)
143 .Text("Compression env var not defined as a number."
144 " Checkpoint image will not be compressed.");
145 do_we_compress = const_cast<char*> ("0");
146 }
147
148 if ( 0 == strcmp(do_we_compress, "0") )
149 return 0;
150
151 /* Check if the executable exists. */
152 if (Util::findExecutable(command, getenv("PATH"), path) == NULL) {
153 JWARNING(false) (command)
154 .Text("Command cannot be executed. Compression will not be used.");
155 return 0;
156 }
157
158 /* If we arrive down here, it's safe to compress. */
159 return 1;
160 }
161
#ifdef HBICT_DELTACOMP
/*
 * Start an hbict (delta-compression) child that reads the image from the
 * pipe and writes it to fd. Its argv is "<hbict_path> -a", plus "-z100"
 * when gzip_path is non-NULL (gzip compression is also enabled). See
 * open_ckpt_to_write() for the return value.
 */
static int open_ckpt_to_write_hbict(int fd, int pipe_fds[2], char *hbict_path,
                                    char *gzip_path)
{
  char *argv_hbict[4];
  argv_hbict[0] = hbict_path;
  argv_hbict[1] = const_cast<char*>("-a");
  argv_hbict[2] = (gzip_path == NULL) ? NULL : const_cast<char*>("-z100");
  argv_hbict[3] = NULL;

  JTRACE("open_ckpt_to_write_hbict\n");
  return open_ckpt_to_write(fd, pipe_fds, argv_hbict);
}
#endif
181
182 static int open_ckpt_to_write_gz(int fd, int pipe_fds[2], char *gzip_path)
183 {
184 char *gzip_args[] = {
185 const_cast<char*> ("gzip"),
186 const_cast<char*> ("-1"),
187 const_cast<char*> ("-"),
188 NULL
189 };
190 gzip_args[0] = gzip_path;
191 JTRACE("open_ckpt_to_write_gz\n");
192
193 return open_ckpt_to_write(fd,pipe_fds,gzip_args);
194 }
195
196 static int perform_open_ckpt_image_fd(const char *tempCkptFilename,
197 int *use_compression,
198 int *fdCkptFileOnDisk)
199 {
200 *use_compression = 0; /* default value */
201
202 /* 1. Open fd to checkpoint image on disk */
203 /* Create temp checkpoint file and write magic number to it */
204 int flags = O_CREAT | O_TRUNC | O_WRONLY;
205 int fd = _real_open(tempCkptFilename, flags, 0600);
206 *fdCkptFileOnDisk = fd; /* if use_compression, fd will be reset to pipe */
207 JASSERT(fd != -1) (tempCkptFilename) (JASSERT_ERRNO)
208 .Text ("Error creating file.");
209
210 #ifdef FAST_RST_VIA_MMAP
211 return fd;
212 #endif
213
214 /* 2. Test if using GZIP/HBICT compression */
215 /* 2a. Test if using GZIP compression */
216 int use_gzip_compression = 0;
217 int use_deltacompression = 0;
218 char *gzip_cmd = const_cast<char*> ("gzip");
219 char gzip_path[PATH_MAX];
220 use_gzip_compression = test_use_compression(const_cast<char*> ("GZIP"),
221 gzip_cmd, gzip_path, 1);
222
223 /* 2b. Test if using HBICT compression */
224 # ifdef HBICT_DELTACOMP
225 char *hbict_cmd = const_cast<char*> ("hbict");
226 char hbict_path[PATH_MAX];
227 JTRACE("NOTICE: hbict compression is enabled\n");
228
229 use_deltacompression = test_use_compression(const_cast<char*> ("HBICT"),
230 hbict_cmd, hbict_path, 1);
231 # endif
232
233 /* 3. We now have the information to pipe to gzip, or directly to fd.
234 * We do it this way, so that gzip will be direct child of forked process
235 * when using forked checkpointing.
236 */
237
238 if (use_deltacompression || use_gzip_compression) { /* fork compr. process */
239 /* 3a. Set SIGCHLD to our own handler;
240 * User handling is restored after gzip finishes.
241 */
242 prepare_sigchld_handler();
243
244 /* 3b. Open pipe */
245 int pipe_fds[2];
246 if (_real_pipe(pipe_fds) == -1) {
247 JWARNING(false) .Text("Error creating pipe. Compression won't be used.");
248 use_gzip_compression = use_deltacompression = 0;
249 }
250
251 /* 3c. Fork compressor child */
252 if (use_deltacompression) { /* fork a hbict process */
253 # ifdef HBICT_DELTACOMP
254 *use_compression = 1;
255 if ( use_gzip_compression ) // We may want hbict compression only
256 fd = open_ckpt_to_write_hbict(fd, pipe_fds, hbict_path, gzip_path);
257 else
258 fd = open_ckpt_to_write_hbict(fd, pipe_fds, hbict_path, NULL);
259 # endif
260 } else if (use_gzip_compression) {/* fork a gzip process */
261 *use_compression = 1;
262 fd = open_ckpt_to_write_gz(fd, pipe_fds, gzip_path);
263 if (pipe_fds[0] == -1) {
264 /* If open_ckpt_to_write_gz() failed to fork the gzip process */
265 *use_compression = 0;
266 }
267 } else {
268 JASSERT(false) .Text("Not Reached!\n");
269 }
270 }
271
272 return fd;
273 }
274
275 static int test_and_prepare_for_forked_ckpt()
276 {
277 #ifdef TEST_FORKED_CHECKPOINTING
278 return 1;
279 #endif
280
281 if (getenv(ENV_VAR_FORKED_CKPT) == NULL) {
282 return 0;
283 }
284
285 /* Set SIGCHLD to our own handler;
286 * User handling is restored after forking child process.
287 */
288 prepare_sigchld_handler();
289
290 pid_t forked_cpid = _real_sys_fork();
291 if (forked_cpid == -1) {
292 JWARNING(false)
293 .Text("Failed to do forked checkpointing, trying normal checkpoint");
294 return FORKED_CKPT_FAILED;
295 } else if (forked_cpid > 0) {
296 restore_sigchld_handler_and_wait_for_zombie(forked_cpid);
297 JTRACE("checkpoint complete\n");
298 return FORKED_CKPT_PARENT;
299 } else {
300 pid_t grandchild_pid = _real_sys_fork();
301 JWARNING(grandchild_pid != -1)
302 .Text("WARNING: Forked checkpoint failed, no checkpoint available");
303 if (grandchild_pid > 0) {
304 // Use _exit() instead of exit() to avoid popping atexit() handlers
305 // registered by the parent process.
306 _exit(0); /* child exits */
307 }
308 /* grandchild continues; no need now to waitpid() on grandchild */
309 JTRACE("inside grandchild process");
310 }
311 return FORKED_CKPT_CHILD;
312 }
313
/*
 * Fork an external compressor that reads the checkpoint image from a pipe
 * and writes it to the checkpoint file.
 *
 *   fd            the on-disk checkpoint file.
 *   pipe_fds      a pipe freshly created by the caller. If fork fails, both
 *                 ends are closed and set to -1; callers test pipe_fds[0].
 *   extcomp_args  NULL-terminated argv. extcomp_args[0] is the path that is
 *                 passed to execve().
 *
 * Returns, in the parent:
 *   - on success, the pipe's write end. The child's pid is recorded in
 *     ckpt_extcomp_child_pid, and the read end is closed in the parent.
 *   - if fork fails, fd unchanged, so the image is written uncompressed.
 * The child never returns. It execs the compressor with stdin = pipe read
 * end and stdout = fd, and aborts via JASSERT if execve() fails.
 */
int
open_ckpt_to_write(int fd, int pipe_fds[2], char **extcomp_args)
{
  pid_t cpid;

  cpid = _real_sys_fork();
  if (cpid == -1) {
    JWARNING(false) (extcomp_args[0]) (JASSERT_ERRNO)
      .Text("WARNING: error forking child process. Compression won't be used");
    _real_close(pipe_fds[0]);
    _real_close(pipe_fds[1]);
    pipe_fds[0] = pipe_fds[1] = -1;
    //fall through to return fd
  } else if (cpid > 0) { /* parent process */
    //Before running gzip in child process, we must not use LD_PRELOAD.
    // See revision log 342 for details concerning bash.
    ckpt_extcomp_child_pid = cpid;
    // The parent only writes to the pipe; drop its copy of the read end.
    JWARNING(_real_close(pipe_fds[0]) == 0) (JASSERT_ERRNO)
      .Text("WARNING: close failed");
    fd = pipe_fds[1]; //change return value
  } else { /* child process */
    //static int (*libc_unsetenv) (const char *name);
    //static int (*libc_execvp) (const char *path, char *const argv[]);

    _real_close(pipe_fds[1]);
    // Take fresh copies of both descriptors before any dup2(). If fd or
    // pipe_fds[0] is itself 0 or 1 (e.g. a standard stream was closed when
    // it was opened), dup2() into that slot would otherwise destroy the
    // descriptor we still need for the other stream.
    int infd = _real_dup(pipe_fds[0]);
    int outfd = _real_dup(fd);
    _real_dup2(infd, STDIN_FILENO);
    _real_dup2(outfd, STDOUT_FILENO);

    // No need to close the other FDS
    // Close the now-redundant copies. A descriptor numbered 0..2 is left
    // alone, because after the dup2() calls above it may now *be* one of the
    // standard descriptors.
    if (pipe_fds[0] > STDERR_FILENO) {
      _real_close(pipe_fds[0]);
    }
    if (infd > STDERR_FILENO) {
      _real_close(infd);
    }
    if (outfd > STDERR_FILENO) {
      _real_close(outfd);
    }
    if (fd > STDERR_FILENO) {
      _real_close(fd);
    }

    // Don't load libdmtcp.so, etc. in exec.
    // NOTE(review): execve() below is passed a NULL envp, which Linux treats
    // as an empty environment, so LD_PRELOAD is not inherited either way.
    unsetenv("LD_PRELOAD"); // If in bash, this is bash env. var. version
    char *ld_preload_str = (char*) getenv("LD_PRELOAD");
    if (ld_preload_str != NULL) {
      ld_preload_str[0] = '\0';
    }

    _real_execve(extcomp_args[0], extcomp_args, NULL);

    /* should not arrive here */
    JASSERT(false)
      .Text("Compression failed! No checkpointing will be performed!"
            " Cancel now!");
  }

  return fd;
}
375
376
377 void CkptSerializer::createCkptDir()
378 {
379 string ckptDir = ProcessInfo::instance().getCkptDir();
380 JASSERT(!ckptDir.empty());
381 JASSERT(mkdir(ckptDir.c_str(), S_IRWXU) == 0 || errno == EEXIST)
382 (JASSERT_ERRNO) (ckptDir)
383 .Text("Error creating checkpoint directory");
384
385 JASSERT(0 == access(ckptDir.c_str(), X_OK|W_OK)) (ckptDir)
386 .Text("ERROR: Missing execute- or write-access to checkpoint dir");
387 }
388
389 // See comments above for open_ckpt_to_read()
390 void CkptSerializer::writeCkptImage(void *mtcpHdr, size_t mtcpHdrLen)
391 {
392 string ckptFilename = ProcessInfo::instance().getCkptFilename();
393 string tempCkptFilename = ckptFilename;
394 tempCkptFilename += ".temp";
395
396 JTRACE("Thread performing checkpoint.") (dmtcp_gettid());
397 createCkptDir();
398 forked_ckpt_status = test_and_prepare_for_forked_ckpt();
399 if (forked_ckpt_status == FORKED_CKPT_PARENT) {
400 JTRACE("*** Using forked checkpointing.\n");
401 return;
402 }
403
404 /* fd will either point to the ckpt file to write, or else the write end
405 * of a pipe leading to a compression child process.
406 */
407 int use_compression = 0;
408 int fdCkptFileOnDisk = -1;
409 int fd = -1;
410
411 fd = perform_open_ckpt_image_fd(tempCkptFilename.c_str(), &use_compression,
412 &fdCkptFileOnDisk);
413 JASSERT(fdCkptFileOnDisk >= 0 );
414 JASSERT(use_compression || fd == fdCkptFileOnDisk);
415
416 // The rest of this function is for compatibility with original definition.
417 writeDmtcpHeader(fd);
418
419 // Write MTCP header
420 JASSERT(Util::writeAll(fd, mtcpHdr, mtcpHdrLen) == (ssize_t) mtcpHdrLen);
421
422 JTRACE ( "MTCP is about to write checkpoint image." )(ckptFilename);
423 mtcp_writememoryareas(fd);
424
425 if (use_compression) {
426 /* In perform_open_ckpt_image_fd(), we set SIGCHLD to our own handler.
427 * Restore it now.
428 */
429 restore_sigchld_handler_and_wait_for_zombie(ckpt_extcomp_child_pid);
430
431 /* IF OUT OF DISK SPACE, REPORT IT HERE. */
432 JASSERT(fsync(fdCkptFileOnDisk) != -1) (JASSERT_ERRNO)
433 .Text("(compression): fsync error on checkpoint file");
434 JASSERT(_real_close(fdCkptFileOnDisk) == 0) (JASSERT_ERRNO)
435 .Text("(compression): error closing checkpoint file.");
436 }
437
438 /* Now that temp checkpoint file is complete, rename it over old permanent
439 * checkpoint file. Uses rename() syscall, which doesn't change i-nodes.
440 * So, gzip process can continue to write to file even after renaming.
441 */
442 JASSERT(rename(tempCkptFilename.c_str(), ckptFilename.c_str()) == 0);
443
444 if (forked_ckpt_status == FORKED_CKPT_CHILD) {
445 // Use _exit() instead of exit() to avoid popping atexit() handlers
446 // registered by the parent process.
447 _exit(0); /* grandchild exits */
448 }
449
450 JTRACE("checkpoint complete");
451 }
452
453 void CkptSerializer::writeDmtcpHeader(int fd)
454 {
455 const ssize_t len = strlen(DMTCP_FILE_HEADER);
456 JASSERT(write(fd, DMTCP_FILE_HEADER, len) == len);
457
458 jalib::JBinarySerializeWriterRaw wr("", fd);
459 ProcessInfo::instance().serialize(wr);
460 ssize_t written = len + wr.bytes();
461
462 // We must write in multiple of PAGE_SIZE
463 const ssize_t pagesize = Util::pageSize();
464 ssize_t remaining = pagesize - (written % pagesize);
465 char buf[remaining];
466 JASSERT(Util::writeAll(fd, buf, remaining) == remaining);
467 }