/* [<][>][^][v][top][bottom][index][help] */
DEFINITIONS
This source file includes following definitions.
- dmtcp_event_hook
- _do_lock_tbl
- _do_unlock_tbl
- huge_memcpy
- instance
- instance
- instance
- _type
- removeStaleObjects
- resetOnFork
- leaderElection
- preCkptDrain
- preCheckpoint
- preResume
- refill
- postRestart
- virtualToRealId
- realToVirtualId
- updateMapping
- getNewVirtualId
- serialize
- on_shmget
- on_shmat
- on_shmdt
- shmaddrToShmid
- on_semget
- on_semctl
- on_semop
- on_msgget
- on_msgctl
- on_msgsnd
- on_msgrcv
- on_shmat
- on_shmdt
- isValidShmaddr
- isStale
- leaderElection
- preCkptDrain
- preCheckpoint
- postRestart
- refill
- preResume
- on_semop
- isStale
- resetOnFork
- leaderElection
- preCkptDrain
- preCheckpoint
- postRestart
- refill
- isStale
- leaderElection
- preCkptDrain
- preCheckpoint
- postRestart
- refill
1 /****************************************************************************
2 * Copyright (C) 2006-2010 by Jason Ansel, Kapil Arya, and Gene Cooperman *
3 * jansel@csail.mit.edu, kapil@ccs.neu.edu, gene@ccs.neu.edu *
4 * *
5 * This file is part of the dmtcp/src module of DMTCP (DMTCP:dmtcp/src). *
6 * *
7 * DMTCP:dmtcp/src is free software: you can redistribute it and/or *
8 * modify it under the terms of the GNU Lesser General Public License as *
9 * published by the Free Software Foundation, either version 3 of the *
10 * License, or (at your option) any later version. *
11 * *
12 * DMTCP:dmtcp/src is distributed in the hope that it will be useful, *
13 * but WITHOUT ANY WARRANTY; without even the implied warranty of *
14 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the *
15 * GNU Lesser General Public License for more details. *
16 * *
17 * You should have received a copy of the GNU Lesser General Public *
18 * License along with DMTCP:dmtcp/src. If not, see *
19 * <http://www.gnu.org/licenses/>. *
20 ****************************************************************************/
21
22 #include <sys/ioctl.h>
23 #include <unistd.h>
24 #include <sys/file.h>
25 #include <sys/mman.h>
26 #include <sys/sem.h>
27 #include <iostream>
28 #include <iostream>
29 #include <ios>
30 #include <fstream>
31
32 #include "util.h"
33 #include "dmtcp.h"
34 #include "shareddata.h"
35 #include "jassert.h"
36 #include "jfilesystem.h"
37 #include "jconvert.h"
38 #include "jserialize.h"
39
40 #include "sysvipc.h"
41 #include "sysvipcwrappers.h"
42
43 using namespace dmtcp;
44
45 // FIXME: Check and verify the correctness of SEM_UNDO logic for Semaphores.
46
47 /*
48 * Algorithm for properly checkpointing shared memory segments.
49 * 1. BARRIER -- SUSPENDED
50 * 2. Call shmat() and shmdt() for each shm-object. This way the last process
51 * to call shmdt() is elected as the ckptLeader.
52 * 3. BARRIER -- LOCKED
53 * 4. Each process marks itself as ckptLeader if it was elected ckptLeader.
54 * If the ckptLeader doesn't have the shm object mapped, map it now.
55 * 6. BARRIER -- DRAINED
56 * 7. For each shm-object, the ckptLeader unmaps all-but-first shmat() address.
57 * 8. Non ckptLeader processes unmap all shmat() addresses corresponding to
58 * the shm-object.
59 * 9. BARRIER -- CHECKPOINTED
60 * 10. At this point, the contents of the memory-segment have been saved.
61 * 11. BARRIER -- REFILLED
62 * 12. Re-map the memory-segment into each process's memory as it existed prior
63 * to checkpoint.
64 * 13. TODO: Unmap the memory that was mapped in step 4.
65 * 14. BARRIER -- RESUME
66 *
67 * Steps involved in Restart
68 * 0. BARRIER -- RESTARTING
69 * 1. Restore process memory
70 * 2. Insert original-shmids into a node-wide shared file so that other
71 * processes can know about all the existing shmids in order to avoid
72 * future conflicts.
73 * 3. BARRIER -- CHECKPOINTED
74 * 4. Read all original-shmids from the file
75 * 5. Re-create shared-memory segments which were checkpointed by this process.
76 * 6. Remap the shm-segment to a temp addr and copy the checkpointed contents
77 * to this address. Now unmap the area where the checkpointed contents were
78 * stored and map the shm-segment on that address. Unmap the temp addr now.
79 * Remap the shm-segment to the original location.
80 * 7. Write original->current mappings for all shmids which we got from
81 * shmget() in previous step.
82 * 8. BARRIER -- REFILLED
83 * 9. Re-map the memory-segment into each process's memory as it existed prior
84 * to checkpoint.
85 * 10. BARRIER -- RESUME
86 */
87
88 /* TODO: Handle the case when the segment is marked for removal at ckpt time.
89 */
90
91 static pthread_mutex_t tblLock = PTHREAD_MUTEX_INITIALIZER;
92
/* Plugin event hook: dispatches each DMTCP lifecycle event to the three
 * SysV IPC managers (shared memory, semaphores, message queues), always in
 * Shm/Sem/Msq order.  See the algorithm comment above for how the
 * checkpoint barriers sequence these phases.
 */
extern "C" void dmtcp_event_hook(DmtcpEvent_t event, DmtcpEventData_t *data)
{
  switch (event) {
    case DMTCP_EVENT_ATFORK_CHILD:
      // Child of fork(): reset per-process state (virtual-id table owner,
      // per-process semaphore adjustments) for the new pid.
      SysVShm::instance().resetOnFork();
      SysVSem::instance().resetOnFork();
      SysVMsq::instance().resetOnFork();
      break;
    case DMTCP_EVENT_WRITE_CKPT:
      // About to write the checkpoint image.
      SysVShm::instance().preCheckpoint();
      SysVSem::instance().preCheckpoint();
      SysVMsq::instance().preCheckpoint();
      break;

    case DMTCP_EVENT_LEADER_ELECTION:
      // Elect a per-object checkpoint leader among the attached processes.
      SysVShm::instance().leaderElection();
      SysVSem::instance().leaderElection();
      SysVMsq::instance().leaderElection();
      break;

    case DMTCP_EVENT_DRAIN:
      SysVShm::instance().preCkptDrain();
      SysVSem::instance().preCkptDrain();
      SysVMsq::instance().preCkptDrain();
      break;

    case DMTCP_EVENT_REFILL:
      // Runs after the image is written (resume) or restored (restart).
      SysVShm::instance().refill(data->refillInfo.isRestart);
      SysVSem::instance().refill(data->refillInfo.isRestart);
      SysVMsq::instance().refill(data->refillInfo.isRestart);
      break;

    case DMTCP_EVENT_THREADS_RESUME:
      SysVShm::instance().preResume();
      SysVSem::instance().preResume();
      SysVMsq::instance().preResume();
      break;

    case DMTCP_EVENT_PRE_EXEC:
      // Persist the virtual-id tables across exec() via the supplied fd.
      {
        jalib::JBinarySerializeWriterRaw wr("", data->serializerInfo.fd);
        SysVShm::instance().serialize(wr);
        SysVSem::instance().serialize(wr);
        SysVMsq::instance().serialize(wr);
      }
      break;

    case DMTCP_EVENT_POST_EXEC:
      // Restore the tables written by the PRE_EXEC handler above.
      {
        jalib::JBinarySerializeReaderRaw rd("", data->serializerInfo.fd);
        SysVShm::instance().serialize(rd);
        SysVSem::instance().serialize(rd);
        SysVMsq::instance().serialize(rd);
      }
      break;

    case DMTCP_EVENT_RESTART:
      SysVShm::instance().postRestart();
      SysVSem::instance().postRestart();
      SysVMsq::instance().postRestart();
      break;

    default:
      break;
  }
  DMTCP_NEXT_EVENT_HOOK(event, data);
}
160
161 static void _do_lock_tbl()
162 {
163 JASSERT(_real_pthread_mutex_lock(&tblLock) == 0) (JASSERT_ERRNO);
164 }
165
166 static void _do_unlock_tbl()
167 {
168 JASSERT(_real_pthread_mutex_unlock(&tblLock) == 0) (JASSERT_ERRNO);
169 }
170
/* Copy 'size' bytes from src to dest, in 100MB chunks for large copies.
 * A chunk whose source pages are all zero is not copied -- this assumes the
 * destination is a fresh, zero-filled mapping (NOTE(review): confirm at
 * call sites).  Each processed source chunk is released back to the kernel
 * with madvise(MADV_DONTNEED) to bound resident memory while restoring
 * large shared-memory segments.  madvise() expects a page-aligned address;
 * callers presumably pass shmat()/mmap() results -- verify.
 */
static void huge_memcpy(char *dest, char *src, size_t size)
{
  if (size < 100 * 1024 * 1024) {
    // Small copy: no chunking needed.
    memcpy(dest, src, size);
    return;
  }
  const size_t hundredMB = (100 * 1024 * 1024);
  //const size_t oneGB = (1024 * 1024 * 1024);
  size_t chunkSize = hundredMB;
  static long page_size = sysconf(_SC_PAGESIZE);
  static long pagesPerChunk = chunkSize / page_size;
  size_t n = size / chunkSize;
  for (size_t i = 0; i < n; i++) {
    if (!Util::areZeroPages(src, pagesPerChunk)) {
      memcpy(dest, src, chunkSize);
    }
    madvise(src, chunkSize, MADV_DONTNEED);
    dest += chunkSize;
    src += chunkSize;
    size -= chunkSize;
  }
  // Tail: whatever remains after the last full chunk.
  memcpy(dest, src, size);
}
194
195 static SysVShm *sysvShmInst = NULL;
196 static SysVSem *sysvSemInst = NULL;
197 static SysVMsq *sysvMsqInst = NULL;
198 SysVShm& SysVShm::instance()
199 {
200 if (sysvShmInst == NULL) {
201 sysvShmInst = new SysVShm();
202 }
203 return *sysvShmInst;
204 }
205
206 SysVSem& SysVSem::instance()
207 {
208 if (sysvSemInst == NULL) {
209 sysvSemInst = new SysVSem();
210 }
211 return *sysvSemInst;
212 }
213
214 SysVMsq& SysVMsq::instance()
215 {
216 if (sysvMsqInst == NULL) {
217 sysvMsqInst = new SysVMsq();
218 }
219 return *sysvMsqInst;
220 }
221
222 /******************************************************************************
223 *
224 * SysVIPC Parent Class
225 *
226 *****************************************************************************/
227
/* Base-class constructor: sets up the virtual-id table (named 'str', owned
 * by id 'id') and records the IPC type tag (shm/sem/msq) used when talking
 * to SharedData.  The object map starts out empty.
 */
SysVIPC::SysVIPC(const char *str, int32_t id, int type)
  : _virtIdTable(str, id),
  _type(type)

{
  _do_lock_tbl();
  _map.clear();
  _do_unlock_tbl();
}
237
238 void SysVIPC::removeStaleObjects()
239 {
240 _do_lock_tbl();
241 vector<int> staleIds;
242 for (Iterator i = _map.begin(); i != _map.end(); ++i) {
243 SysVObj* obj = i->second;
244 if (obj->isStale()) {
245 staleIds.push_back(i->first);
246 }
247 }
248 for (size_t j = 0; j < staleIds.size(); ++j) {
249 delete _map[staleIds[j]];
250 _map.erase(staleIds[j]);
251 _virtIdTable.erase(staleIds[j]);
252 }
253 _do_unlock_tbl();
254 }
255
256 void SysVIPC::resetOnFork()
257 {
258 _virtIdTable.resetOnFork(getpid());
259 for (Iterator i = _map.begin(); i != _map.end(); ++i) {
260 i->second->resetOnFork();
261 }
262 }
263
264 void SysVIPC::leaderElection()
265 {
266 /* Remove all invalid/removed shm segments*/
267 removeStaleObjects();
268
269 for (Iterator i = _map.begin(); i != _map.end(); ++i) {
270 i->second->leaderElection();
271 }
272 }
273
274 void SysVIPC::preCkptDrain()
275 {
276 for (Iterator i = _map.begin(); i != _map.end(); ++i) {
277 i->second->preCkptDrain();
278 }
279 }
280
281 void SysVIPC::preCheckpoint()
282 {
283 for (Iterator i = _map.begin(); i != _map.end(); ++i) {
284 i->second->preCheckpoint();
285 }
286 }
287
288 void SysVIPC::preResume()
289 {
290 for (Iterator i = _map.begin(); i != _map.end(); ++i) {
291 i->second->preResume();
292 }
293 }
294
295 void SysVIPC::refill(bool isRestart)
296 {
297 if (!isRestart) return;
298
299 for (Iterator i = _map.begin(); i != _map.end(); ++i) {
300 i->second->refill(isRestart);
301 }
302 }
303
304 void SysVIPC::postRestart()
305 {
306 _virtIdTable.clear();
307
308 for (Iterator i = _map.begin(); i != _map.end(); ++i) {
309 i->second->postRestart();
310 }
311 }
312
313 int SysVIPC::virtualToRealId(int virtId)
314 {
315 if (_virtIdTable.virtualIdExists(virtId)) {
316 return _virtIdTable.virtualToReal(virtId);
317 } else {
318 int realId = SharedData::getRealIPCId(_type, virtId);
319 _virtIdTable.updateMapping(virtId, realId);
320 return realId;
321 }
322 }
323 int SysVIPC::realToVirtualId(int realId)
324 {
325 if (_virtIdTable.realIdExists(realId)) {
326 return _virtIdTable.realToVirtual(realId);
327 } else {
328 return -1;
329 }
330 }
/* Record the virtId<->realId pair both in this process's table and in the
 * node-wide shared area, so other processes can translate the same id.
 */
void SysVIPC::updateMapping(int virtId, int realId)
{
  _virtIdTable.updateMapping(virtId, realId);
  SharedData::setIPCIdMap(_type, virtId, realId);
}
336
/* Allocate a fresh virtual id from the table; aborts if the table is full. */
int SysVIPC::getNewVirtualId()
{
  int32_t id;
  JASSERT(_virtIdTable.getNewVirtualId(&id)) (_virtIdTable.size())
    .Text("Exceeded maximum number of Sys V objects allowed");

  return id;
}
345
/* (De)serialize the virtual-id table.  Used to carry the table across
 * exec() -- see the PRE_EXEC/POST_EXEC cases in dmtcp_event_hook().
 */
void SysVIPC::serialize(jalib::JBinarySerializer& o)
{
  _virtIdTable.serialize(o);
}
350 /******************************************************************************
351 *
352 * SysVIPC Subclasses
353 *
354 *****************************************************************************/
355
356 /*
357 * Shared Memory
358 */
/* Wrapper callback for a successful shmget(): on first sighting of this
 * real shmid, allocate a virtual id, publish the mapping, and start
 * tracking the segment; otherwise the segment must already be in the map.
 */
void SysVShm::on_shmget(int shmid, key_t key, size_t size, int shmflg)
{
  _do_lock_tbl();
  if (!_virtIdTable.realIdExists(shmid)) {
    JASSERT(_map.find(shmid) == _map.end());
    int virtId = getNewVirtualId();
    JTRACE ("Shmid not found in table. Creating new entry")
      (shmid) (virtId);
    updateMapping(virtId, shmid);
    _map[virtId] = new ShmSegment(virtId, shmid, key, size, shmflg);
  } else {
    JASSERT(_map.find(shmid) != _map.end());
  }
  _do_unlock_tbl();
}
374
/* Wrapper callback for a successful shmat().  'shmid' is the virtual id and
 * 'newaddr' the address the kernel actually mapped.  A segment created by
 * another process may be seen here first: the real id then comes from the
 * shared area and a tracking entry is created with unknown key/size/flags
 * (-1 sentinels; ShmSegment's ctor recovers them via IPC_STAT).
 */
void SysVShm::on_shmat(int shmid, const void *shmaddr, int shmflg,
                       void* newaddr)
{
  _do_lock_tbl();
  if (!_virtIdTable.virtualIdExists(shmid)) {
    int realId = SharedData::getRealIPCId(_type, shmid);
    updateMapping(shmid, realId);
  }
  if (_map.find(shmid) == _map.end()) {
    int realId = VIRTUAL_TO_REAL_SHM_ID(shmid);
    _map[shmid] = new ShmSegment(shmid, realId, -1, -1, -1);
  }

  // If the caller requested a fixed address, the kernel must have honored it.
  JASSERT(shmaddr == NULL || shmaddr == newaddr);
  ((ShmSegment*)_map[shmid])->on_shmat(newaddr, shmflg);
  _do_unlock_tbl();
}
392
393 void SysVShm::on_shmdt(const void *shmaddr)
394 {
395 int shmid = shmaddrToShmid(shmaddr);
396 JASSERT(shmid != -1) (shmaddr)
397 .Text("No corresponding shmid found for given shmaddr");
398 _do_lock_tbl();
399 ((ShmSegment*)_map[shmid])->on_shmdt(shmaddr);
400 if (_map[shmid]->isStale()) {
401 _map.erase(shmid);
402 }
403 _do_unlock_tbl();
404 }
405
406 int SysVShm::shmaddrToShmid(const void* shmaddr)
407 {
408 DMTCP_PLUGIN_DISABLE_CKPT();
409 int shmid = -1;
410 _do_lock_tbl();
411 for (Iterator i = _map.begin(); i != _map.end(); ++i) {
412 ShmSegment* shmObj = (ShmSegment*)i->second;
413 if (shmObj->isValidShmaddr(shmaddr)) {
414 shmid = i->first;
415 break;
416 }
417 }
418 _do_unlock_tbl();
419 DMTCP_PLUGIN_ENABLE_CKPT();
420 return shmid;
421 }
422
423 /*
424 * Semaphore
425 */
/* Wrapper callback for a successful semget(): mirror of
 * SysVShm::on_shmget() for semaphore sets -- first sighting of a real semid
 * allocates a virtual id and a tracking Semaphore object.
 */
void SysVSem::on_semget(int semid, key_t key, int nsems, int semflg)
{
  _do_lock_tbl();
  if (!_virtIdTable.realIdExists(semid)) {
    //JASSERT(key == IPC_PRIVATE || (semflg & IPC_CREAT) != 0) (key) (semid);
    JASSERT(_map.find(semid) == _map.end());
    JTRACE ("Semid not found in table. Creating new entry") (semid);
    int virtId = getNewVirtualId();
    updateMapping(virtId, semid);
    _map[virtId] = new Semaphore (virtId, semid, key, nsems, semflg);
  } else {
    JASSERT(_map.find(semid) != _map.end());
  }
  _do_unlock_tbl();
}
441
442 void SysVSem::on_semctl(int semid, int semnum, int cmd, union semun arg)
443 {
444 _do_lock_tbl();
445 if (cmd == IPC_RMID && _virtIdTable.virtualIdExists(semid)) {
446 JASSERT(_map[semid]->isStale()) (semid);
447 _map.erase(semid);
448 }
449 _do_unlock_tbl();
450 return;
451 }
452
/* Wrapper callback for a successful semop().  Lazily creates a tracking
 * entry for sets created by other processes (-1 sentinels; Semaphore's ctor
 * recovers key/nsems/flags via IPC_STAT), then records the ops so SEM_UNDO
 * adjustments can be rebuilt after restart.
 */
void SysVSem::on_semop(int semid, struct sembuf *sops, unsigned nsops)
{
  _do_lock_tbl();
  if (!_virtIdTable.virtualIdExists(semid)) {
    int realId = SharedData::getRealIPCId(_type, semid);
    updateMapping(semid, realId);
  }
  if (_map.find(semid) == _map.end()) {
    int realId = VIRTUAL_TO_REAL_SEM_ID(semid);
    _map[semid] = new Semaphore(semid, realId, -1, -1, -1);
  }
  ((Semaphore*)_map[semid])->on_semop(sops, nsops);
  _do_unlock_tbl();
}
467
468 /*
469 * Message Queue
470 */
/* Wrapper callback for a successful msgget(): mirror of
 * SysVShm::on_shmget() for message queues.
 */
void SysVMsq::on_msgget(int msqid, key_t key, int msgflg)
{
  _do_lock_tbl();
  if (!_virtIdTable.realIdExists(msqid)) {
    JASSERT(_map.find(msqid) == _map.end());
    JTRACE ("Msqid not found in table. Creating new entry") (msqid);
    int virtId = getNewVirtualId();
    updateMapping(virtId, msqid);
    _map[virtId] = new MsgQueue (virtId, msqid, key, msgflg);
  } else {
    JASSERT(_map.find(msqid) != _map.end());
  }
  _do_unlock_tbl();
}
485
486 void SysVMsq::on_msgctl(int msqid, int cmd, struct msqid_ds *buf)
487 {
488 _do_lock_tbl();
489 if (cmd == IPC_RMID && _virtIdTable.virtualIdExists(msqid)) {
490 JASSERT(_map[msqid]->isStale()) (msqid);
491 _map.erase(msqid);
492 }
493 _do_unlock_tbl();
494 return;
495 }
496
/* Wrapper callback for a successful msgsnd(): only ensures the queue is
 * tracked (lazily creating an entry for queues created by other processes);
 * the message itself needs no bookkeeping here.
 */
void SysVMsq::on_msgsnd(int msqid, const void *msgp, size_t msgsz, int msgflg)
{
  _do_lock_tbl();
  if (!_virtIdTable.virtualIdExists(msqid)) {
    int realId = SharedData::getRealIPCId(_type, msqid);
    updateMapping(msqid, realId);
  }
  if (_map.find(msqid) == _map.end()) {
    int realId = VIRTUAL_TO_REAL_MSQ_ID(msqid);
    _map[msqid] = new MsgQueue(msqid, realId, -1, -1);
  }
  _do_unlock_tbl();
}
510
/* Wrapper callback for a successful msgrcv(): identical bookkeeping to
 * on_msgsnd() -- just make sure the queue is tracked.
 */
void SysVMsq::on_msgrcv(int msqid, const void *msgp, size_t msgsz,
                        int msgtyp, int msgflg)
{
  _do_lock_tbl();
  if (!_virtIdTable.virtualIdExists(msqid)) {
    int realId = SharedData::getRealIPCId(_type, msqid);
    updateMapping(msqid, realId);
  }
  if (_map.find(msqid) == _map.end()) {
    int realId = VIRTUAL_TO_REAL_MSQ_ID(msqid);
    _map[msqid] = new MsgQueue(msqid, realId, -1, -1);
  }
  _do_unlock_tbl();
}
525
526 /******************************************************************************
527 *
528 * ShmSegment Methods
529 *
530 *****************************************************************************/
531
/* Track one shared-memory segment.  When attaching to a segment created
 * elsewhere (key == -1 or size == 0 sentinels), the real key/size/mode are
 * recovered from the kernel via IPC_STAT on the real id.
 */
ShmSegment::ShmSegment(int shmid, int realShmid, key_t key, size_t size,
                       int shmflg)
  : SysVObj(shmid, realShmid, key, shmflg)
{
  _size = size;
  if (key == -1 || size == 0) {
    struct shmid_ds shminfo;
    JASSERT(_real_shmctl(_realId, IPC_STAT, &shminfo) != -1);
    _key = shminfo.shm_perm.__key;
    _size = shminfo.shm_segsz;
    _flags = shminfo.shm_perm.mode;
  }
  JTRACE("New Shm Segment") (_key) (_size) (_flags) (_id) (_isCkptLeader);
}
546
/* Remember an attach address and its shmat() flags; this map drives the
 * re-attachment of identical mappings in preResume()/postRestart().
 */
void ShmSegment::on_shmat(const void *shmaddr, int shmflg)
{
  _shmaddrToFlag[shmaddr] = shmflg;
}
551
/* Forget a previously recorded attach address after a successful shmdt(). */
void ShmSegment::on_shmdt(const void *shmaddr)
{
  JASSERT(isValidShmaddr(shmaddr));
  _shmaddrToFlag.erase((void*)shmaddr);

  // TODO: If num-attached == 0; and marked for deletion, remove this segment
}
559
560 bool ShmSegment::isValidShmaddr(const void* shmaddr)
561 {
562 return _shmaddrToFlag.find((void*)shmaddr) != _shmaddrToFlag.end();
563 }
564
/* True if the kernel object is gone (shmctl fails with EIDRM/EINVAL).  As a
 * side effect of the liveness probe, caches the segment's current attach
 * count and permission mode.
 */
bool ShmSegment::isStale()
{
  struct shmid_ds shminfo;
  int ret = _real_shmctl(_realId, IPC_STAT, &shminfo);
  if (ret == -1) {
    JASSERT (errno == EIDRM || errno == EINVAL);
    // A removed segment must have no attach addresses left on our books.
    JASSERT(_shmaddrToFlag.empty());
    return true;
  }
  _nattch = shminfo.shm_nattch;
  _mode = shminfo.shm_perm.mode;
  return false;
}
578
/* Leader election, phase 1 (see algorithm comment at top of file). */
void ShmSegment::leaderElection()
{
  /* We attach and detach to the shmid object to set the shm_lpid to our pid.
   * The process who calls the last shmdt() is declared the leader.
   */
  void *addr = _real_shmat(_realId, NULL, 0);
  JASSERT(addr != (void*) -1) (_id) (JASSERT_ERRNO)
    .Text("_real_shmat() failed");

  JASSERT(_real_shmdt(addr) == 0) (_id) (addr) (JASSERT_ERRNO);
}
590
/* Leader election, phase 2: the kernel's shm_lpid records the pid of the
 * last shmat/shmdt caller, and leaderElection() had every process
 * attach+detach, so the process that ran last sees its own pid here and
 * becomes the ckptLeader.  The leader must have the segment mapped so its
 * contents end up in the checkpoint image; if it has no mapping, create one
 * now and remember that DMTCP (not the application) created it.
 */
void ShmSegment::preCkptDrain()
{
  struct shmid_ds info;
  JASSERT(_real_shmctl(_realId, IPC_STAT, &info) != -1);

  /* If we are the ckptLeader for this object, map it now, if not mapped already.
   */
  _dmtcpMappedAddr = false;
  _isCkptLeader = false;

  if (info.shm_lpid == getpid()) {
    _isCkptLeader = true;
    if (_shmaddrToFlag.size() == 0) {
      void *addr = _real_shmat(_realId, NULL, 0);
      JASSERT(addr != (void*) -1);
      _shmaddrToFlag[addr] = 0;
      _dmtcpMappedAddr = true;
    }
  }
}
611
/* Detach duplicate mappings before the image is written: the ckptLeader
 * keeps exactly its first mapping (so the contents are saved once in the
 * image); every other process detaches all of its mappings.  Each detached
 * address is replaced by a PROT_NONE anonymous placeholder so the kernel
 * cannot hand the range to a later mmap() before we re-attach in
 * preResume().
 */
void ShmSegment::preCheckpoint()
{
  ShmaddrToFlagIter i = _shmaddrToFlag.begin();
  /* If this process won the leader election, unmap all but the first memory
   * segment, otherwise, unmap all the mappings of this memory-segment.
   */
  if (_isCkptLeader) {
    ++i;
  }
  for (; i != _shmaddrToFlag.end(); ++i) {
    JTRACE("Unmapping shared memory segment") (_id)(i->first);
    JASSERT(_real_shmdt(i->first) == 0);

    // We need to unmap the duplicate shared memory segments to optimize ckpt
    // image size. But we will remap it with zero pages that have no rwx
    // permission, to stop the kernel from assigning these memory addresses for
    // future mmap calls, since we will be re-mapping it during post-ckpt.
    JASSERT(mmap((void*) i->first, _size,
                 PROT_NONE, MAP_ANONYMOUS | MAP_PRIVATE | MAP_FIXED,
                 0, 0) == i->first);
  }
}
634
/* Leader only: re-create the segment with the saved key/size/flags and
 * publish the new real id.  The checkpointed bytes were restored at the
 * first attach address as ordinary private memory, so: attach the new
 * segment at a temporary address, copy the bytes into it, detach the temp
 * mapping, unmap the private copy, and -- unless the original mapping was
 * created by DMTCP itself in preCkptDrain() -- attach the segment back at
 * the original address with the original flags.
 */
void ShmSegment::postRestart()
{
  if (!_isCkptLeader) return;

  _realId = _real_shmget(_key, _size, _flags);
  JASSERT(_realId != -1);
  SysVShm::instance().updateMapping(_id, _realId);

  // Re-map first address for owner on restart
  JASSERT(_isCkptLeader);
  ShmaddrToFlagIter i = _shmaddrToFlag.begin();
  void *tmpaddr = _real_shmat(_realId, NULL, 0);
  JASSERT(tmpaddr != (void*) -1) (_realId)(JASSERT_ERRNO);
  huge_memcpy((char*) tmpaddr, (char*) i->first, _size);
  JASSERT(_real_shmdt(tmpaddr) == 0);
  munmap((void*)i->first, _size);

  if (!_dmtcpMappedAddr) {
    JASSERT (_real_shmat(_realId, i->first, i->second) != (void *) -1)
      (JASSERT_ERRNO) (_realId) (_id) (_isCkptLeader)
      (i->first) (i->second) (getpid())
      .Text ("Error remapping shared memory segment on restart");
  }
  JTRACE("Remapping shared memory segment to original address") (_id) (_realId);
}
660
661 void ShmSegment::refill(bool isRestart)
662 {
663 if (!isRestart || _isCkptLeader) return;
664
665 // Update _realId;
666 _realId = VIRTUAL_TO_REAL_SHM_ID(_id);
667 }
668
/* Re-attach the segment at every address recorded in _shmaddrToFlag.  The
 * leader's first address is still mapped (kept through preCheckpoint() /
 * restored by postRestart()) and is skipped.  Every remaining address holds
 * the PROT_NONE placeholder installed in preCheckpoint(): unmap it, then
 * shmat() the segment back with the originally recorded flags.
 */
void ShmSegment::preResume()
{
  // Re-map all remaining addresses
  ShmaddrToFlagIter i = _shmaddrToFlag.begin();

  if (_isCkptLeader && i != _shmaddrToFlag.end()) {
    i++;
  }

  for (; i != _shmaddrToFlag.end(); ++i) {
    // Unmap the reserved area.
    JASSERT(munmap((void*) i->first, _size) == 0);

    JTRACE("Remapping shared memory segment")(_realId);
    JASSERT (_real_shmat(_realId, i->first, i->second) != (void *) -1)
      (JASSERT_ERRNO) (_realId) (_id) (_isCkptLeader)
      (i->first) (i->second) (getpid())
      .Text ("Error remapping shared memory segment");
  }
  // TODO: During Ckpt-resume, if the shm object was mapped by dmtcp
  // (_dmtcpMappedAddr == true), then we should call shmdt() on it.
}
691
692 /******************************************************************************
693 *
694 * Semaphore Methods
695 *
696 *****************************************************************************/
697
/* Track one semaphore set.  key == -1 means "created elsewhere": recover
 * key/nsems/mode from the kernel via IPC_STAT.  _semval holds the leader's
 * snapshot of the semaphore values (filled in preCkptDrain()); _semadj
 * accumulates this process's net semop() deltas for rebuilding SEM_UNDO
 * state after restart.  Both start zeroed.
 * NOTE(review): no matching free for these buffers is visible in this file;
 * presumably the object lives for the process lifetime or a dtor elsewhere
 * releases them -- confirm in sysvipc.h.
 */
Semaphore::Semaphore(int semid, int realSemid, key_t key, int nsems, int semflg)
  : SysVObj(semid, realSemid, key, semflg)
{
  _nsems = nsems;
  if (key == -1) {
    struct semid_ds buf;
    union semun se;
    se.buf = &buf;
    JASSERT(_real_semctl(realSemid, 0, IPC_STAT, se) != -1) (JASSERT_ERRNO);
    _key = se.buf->sem_perm.__key;
    _nsems = se.buf->sem_nsems;
    _flags = se.buf->sem_perm.mode;
  }
  _semval = (unsigned short*) JALLOC_HELPER_MALLOC(_nsems * sizeof(unsigned short));
  _semadj = (int*) JALLOC_HELPER_MALLOC(_nsems * sizeof(int));
  for (int i = 0; i < _nsems; i++) {
    _semval[i] = 0;
    _semadj[i] = 0;
  }
  JTRACE("New Semaphore Segment")
    (_key) (_nsems) (_flags) (_id) (_isCkptLeader);
}
720
721 void Semaphore::on_semop(struct sembuf *sops, unsigned nsops)
722 {
723 for (unsigned i = 0; i < nsops; i++) {
724 int sem_num = sops[i].sem_num;
725 _semadj[sem_num] -= sops[i].sem_op;
726 }
727 }
728
729 bool Semaphore::isStale()
730 {
731 int ret = _real_semctl(_realId, 0, GETPID);
732 if (ret == -1) {
733 JASSERT (errno == EIDRM || errno == EINVAL);
734 return true;
735 }
736 return false;
737 }
738
739 void Semaphore::resetOnFork()
740 {
741 for (int i = 0; i < _nsems; i++) {
742 _semadj[i] = 0;
743 }
744 }
745
/* Leader election for semaphores: bump and restore semaphore 0 so the
 * kernel's sempid (pid of the last semop caller) becomes ours;
 * preCkptDrain() then declares the process whose pid matches sempid the
 * ckptLeader.  A failed first op is tolerated (we simply skip the
 * decrement and won't win the election).
 */
void Semaphore::leaderElection()
{
  JASSERT(_realId != -1);
  /* Every process increments and decrements the semaphore value by 1 in order
   * to update the sempid value. The process who performs the last semop is
   * elected the leader.
   */
  struct sembuf sops;
  sops.sem_num = 0;
  sops.sem_op = 1;
  sops.sem_flg = 0;
  int ret = _real_semtimedop(_realId, &sops, 1, NULL);
  if (ret == 0) {
    sops.sem_num = 0;
    sops.sem_op = -1;
    sops.sem_flg = 0;
    JASSERT(_real_semtimedop(_realId, &sops, 1, NULL) == 0) (JASSERT_ERRNO) (_id);
  }
}
765
/* The process whose pid matches the kernel's sempid (the last semop caller,
 * arranged by leaderElection()) becomes the ckptLeader and snapshots all
 * semaphore values into _semval for restoration in postRestart().
 */
void Semaphore::preCkptDrain()
{
  _isCkptLeader = false;
  if (getpid() == _real_semctl(_realId, 0, GETPID)) {
    union semun info;
    info.array = _semval;
    JASSERT(_real_semctl(_realId, 0, GETALL, info) != -1);
    _isCkptLeader = true;
  }
}
776
/* Nothing to do: the leader already snapshotted the semaphore values into
 * _semval in preCkptDrain(), and _semval is ordinary process memory.
 */
void Semaphore::preCheckpoint()
{
}
780
/* Leader only: re-create the semaphore set with the saved key/nsems/flags,
 * publish the new real id, and restore the saved semaphore values (SETALL).
 */
void Semaphore::postRestart()
{
  if (_isCkptLeader) {
    _realId = _real_semget(_key, _nsems, _flags);
    JASSERT(_realId != -1) (JASSERT_ERRNO);
    SysVSem::instance().updateMapping(_id, _realId);

    union semun info;
    info.array = _semval;
    JASSERT(_real_semctl(_realId, 0, SETALL, info) != -1);
  }
}
793
/* On restart only: refresh the cached real id and re-establish this
 * process's SEM_UNDO (semadj) state on the re-created set.
 */
void Semaphore::refill(bool isRestart)
{
  if (!isRestart) return;
  /* Update the semadj value for this process.
   * The way we do it is by calling semop twice as follows:
   * semop(id, {semid, abs(semadj-value), flag1}*, nsems)
   * semop(id, {semid, -abs(semadj-value), flag2}*, nsems)
   *
   * where: flag1 = semadj-value > 0 ? 0 : SEM_UNDO
   * flag2 = semadj-value < 0 ? SEM_UNDO : 0
   */
  struct sembuf sops;
  _realId = VIRTUAL_TO_REAL_SEM_ID(_id);
  JASSERT(_realId != -1);
  for (int i = 0; i < _nsems; i++) {
    if (_semadj[i] == 0) continue;
    // The pair of ops leaves the semaphore value unchanged; only the
    // SEM_UNDO-flagged half sticks, recreating the kernel's semadj entry.
    sops.sem_num = i;
    sops.sem_op = abs(_semadj[i]);
    sops.sem_flg = _semadj[i] > 0 ? 0 : SEM_UNDO;
    JASSERT(_real_semop(_realId, &sops, 1) == 0);

    sops.sem_op = - abs(_semadj[i]);
    sops.sem_flg = _semadj[i] < 0 ? SEM_UNDO : 0;
    JASSERT(_real_semop(_realId, &sops, 1) == 0);
  }
}
820 /******************************************************************************
821 *
822 * MsgQueue Methods
823 *
824 *****************************************************************************/
825
/* Track one message queue.  key == -1 means "created elsewhere": recover
 * key/mode from the kernel via IPC_STAT.
 */
MsgQueue::MsgQueue(int msqid, int realMsqid, key_t key, int msgflg)
  : SysVObj(msqid, realMsqid, key, msgflg)
{
  if (key == -1) {
    struct msqid_ds buf;
    JASSERT(_real_msgctl(realMsqid, IPC_STAT, &buf) == 0) (_id) (JASSERT_ERRNO);
    _key = buf.msg_perm.__key;
    _flags = buf.msg_perm.mode;
  }
  JTRACE("New MsgQueue Created") (_key) (_flags) (_id);
}
837
838 bool MsgQueue::isStale()
839 {
840 struct msqid_ds buf;
841 int ret = _real_msgctl(_realId, IPC_STAT, &buf);
842 if (ret == -1) {
843 JASSERT (errno == EIDRM || errno == EINVAL);
844 return true;
845 }
846 return false;
847 }
848
/* For queues, leader election happens in preCkptDrain(); here we only
 * record how many real messages the queue holds (before any election
 * messages are sent), so preCheckpoint() knows how many to save.
 */
void MsgQueue::leaderElection()
{
  // Leader election is done in preCkptDrain(), here we just fetch the number
  // of messages in the queue.
  struct msqid_ds buf;
  JASSERT(_real_msgctl(_realId, IPC_STAT, &buf) == 0) (_id) (JASSERT_ERRNO);

  _qnum = buf.msg_qnum;
}
858
/* Leader election for queues: each process sends a zero-length message
 * tagged with its pid; the kernel's msg_lspid then identifies the last
 * sender, which preCheckpoint() treats as the ckptLeader.  The extra
 * messages are removed again by the leader before threads resume.
 */
void MsgQueue::preCkptDrain()
{
  // This is where we elect the leader
  /* Every process send a message to the queue. Later on, these excess messages
   * will be removed by the ckptLeader before the user threads are allowed to
   * resume.
   * The process whose pid matches the msg_lspid is the leader.
   */
  struct msgbuf msg;
  msg.mtype = getpid();
  JASSERT(_real_msgsnd(_realId, &msg, 0, IPC_NOWAIT) == 0) (_id) (JASSERT_ERRNO);
  _isCkptLeader = false;
}
872
873 void MsgQueue::preCheckpoint()
874 {
875 struct msqid_ds buf;
876 memset(&buf, 0, sizeof buf);
877 JASSERT(_real_msgctl(_realId, IPC_STAT, &buf) == 0) (_id) (JASSERT_ERRNO);
878
879 if (buf.msg_lspid == getpid()) {
880 size_t size = buf.__msg_cbytes;
881 void *msgBuf = JALLOC_HELPER_MALLOC(size);
882 _isCkptLeader = true;
883 _msgInQueue.clear();
884 for (size_t i = 0; i < _qnum; i++) {
885 ssize_t numBytes = _real_msgrcv(_realId, msgBuf, size, 0, 0);
886 JASSERT(numBytes != -1) (_id) (JASSERT_ERRNO);
887 _msgInQueue.push_back(jalib::JBuffer((const char*)msgBuf,
888 numBytes + sizeof (long)));
889 }
890 JASSERT(_msgInQueue.size() == _qnum) (_qnum);
891 // Now remove all the messages that were sent during preCkptDrain phase.
892 while (_real_msgrcv(_realId, msgBuf, size, 0, IPC_NOWAIT) != -1);
893 JALLOC_HELPER_FREE(msgBuf);
894 } else {
895 }
896 }
897
/* Leader only: re-create the queue with the saved key/flags and publish the
 * new real id.  The saved messages are replayed later, in refill().
 */
void MsgQueue::postRestart()
{
  if (_isCkptLeader) {
    _realId = _real_msgget(_key, _flags);
    JASSERT(_realId != -1) (JASSERT_ERRNO);
    SysVMsq::instance().updateMapping(_id, _realId);
    JASSERT(_msgInQueue.size() == _qnum) (_msgInQueue.size()) (_qnum);
  }
}
907
/* Leader: restore the queue contents.  On restart the re-created queue
 * still holds the election messages from preCkptDrain(), which are drained
 * first; on plain resume the queue must already be empty (the leader
 * drained it in preCheckpoint()).  Then every message saved in
 * preCheckpoint() is re-sent.  All processes clear their local copy.
 */
void MsgQueue::refill(bool isRestart)
{
  if (_isCkptLeader) {
    struct msqid_ds buf;
    JASSERT(_real_msgctl(_realId, IPC_STAT, &buf) == 0) (_id) (JASSERT_ERRNO);
    if (isRestart) {
      // Now remove all the messages that were sent during preCkptDrain phase.
      size_t size = buf.__msg_cbytes;
      void *msgBuf = JALLOC_HELPER_MALLOC(size);
      while (_real_msgrcv(_realId, msgBuf, size, 0, IPC_NOWAIT) != -1);
      JALLOC_HELPER_FREE(msgBuf);

    } else {
      JASSERT(buf.msg_qnum == 0);
    }

    for (size_t i = 0; i < _qnum; i++) {
      JASSERT(_real_msgsnd(_realId, _msgInQueue[i].buffer(),
                           _msgInQueue[i].size(), IPC_NOWAIT) == 0);
    }
  }
  _msgInQueue.clear();
  _qnum = 0;
}