root/plugin/svipc/sysvipc.cpp

/* [<][>][^][v][top][bottom][index][help] */

DEFINITIONS

This source file includes the following definitions.
  1. dmtcp_event_hook
  2. _do_lock_tbl
  3. _do_unlock_tbl
  4. huge_memcpy
  5. instance
  6. instance
  7. instance
  8. _type
  9. removeStaleObjects
  10. resetOnFork
  11. leaderElection
  12. preCkptDrain
  13. preCheckpoint
  14. preResume
  15. refill
  16. postRestart
  17. virtualToRealId
  18. realToVirtualId
  19. updateMapping
  20. getNewVirtualId
  21. serialize
  22. on_shmget
  23. on_shmat
  24. on_shmdt
  25. shmaddrToShmid
  26. on_semget
  27. on_semctl
  28. on_semop
  29. on_msgget
  30. on_msgctl
  31. on_msgsnd
  32. on_msgrcv
  33. on_shmat
  34. on_shmdt
  35. isValidShmaddr
  36. isStale
  37. leaderElection
  38. preCkptDrain
  39. preCheckpoint
  40. postRestart
  41. refill
  42. preResume
  43. on_semop
  44. isStale
  45. resetOnFork
  46. leaderElection
  47. preCkptDrain
  48. preCheckpoint
  49. postRestart
  50. refill
  51. isStale
  52. leaderElection
  53. preCkptDrain
  54. preCheckpoint
  55. postRestart
  56. refill

   1 /****************************************************************************
   2  *   Copyright (C) 2006-2010 by Jason Ansel, Kapil Arya, and Gene Cooperman *
   3  *   jansel@csail.mit.edu, kapil@ccs.neu.edu, gene@ccs.neu.edu              *
   4  *                                                                          *
   5  *   This file is part of the dmtcp/src module of DMTCP (DMTCP:dmtcp/src).  *
   6  *                                                                          *
   7  *  DMTCP:dmtcp/src is free software: you can redistribute it and/or        *
   8  *  modify it under the terms of the GNU Lesser General Public License as   *
   9  *  published by the Free Software Foundation, either version 3 of the      *
  10  *  License, or (at your option) any later version.                         *
  11  *                                                                          *
  12  *  DMTCP:dmtcp/src is distributed in the hope that it will be useful,      *
  13  *  but WITHOUT ANY WARRANTY; without even the implied warranty of          *
  14  *  MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the           *
  15  *  GNU Lesser General Public License for more details.                     *
  16  *                                                                          *
  17  *  You should have received a copy of the GNU Lesser General Public        *
  18  *  License along with DMTCP:dmtcp/src.  If not, see                        *
  19  *  <http://www.gnu.org/licenses/>.                                         *
  20  ****************************************************************************/
  21 
  22 #include <sys/ioctl.h>
  23 #include <unistd.h>
  24 #include <sys/file.h>
  25 #include <sys/mman.h>
  26 #include <sys/sem.h>
  27 #include <iostream>
  28 #include <iostream>
  29 #include <ios>
  30 #include <fstream>
  31 
  32 #include "util.h"
  33 #include "dmtcp.h"
  34 #include "shareddata.h"
  35 #include "jassert.h"
  36 #include "jfilesystem.h"
  37 #include "jconvert.h"
  38 #include "jserialize.h"
  39 
  40 #include "sysvipc.h"
  41 #include "sysvipcwrappers.h"
  42 
  43 using namespace dmtcp;
  44 
  45 // FIXME: Check and verify the correctness of SEM_UNDO logic for Semaphores.
  46 
  47 /*
  48  * Algorithm for properly checkpointing shared memory segments.
  49  *  1. BARRIER -- SUSPENDED
  50  *  2. Call shmat() and shmdt() for each shm-object. This way the last process
  51  *     to call shmdt() is elected as the ckptLeader.
  52  *  3. BARRIER -- LOCKED
  53  *  4. Each process marks itself as ckptLeader if it was elected ckptLeader.
  54  *     If the ckptLeader doesn't have the shm object mapped, map it now.
  55  *  6. BARRIER -- DRAINED
  56  *  7. For each shm-object, the ckptLeader unmaps all-but-first shmat() address.
  57  *  8. Non ckptLeader processes unmap all shmat() addresses corresponding to
  58  *     the shm-object.
  59  *  9. BARRIER -- CHECKPOINTED
  60  * 10. At this point, the contents of the memory-segment have been saved.
  61  * 11. BARRIER -- REFILLED
  62  * 12. Re-map the memory-segment into each process's memory as it existed prior
  63  *     to checkpoint.
  64  * 13. TODO: Unmap the memory that was mapped in step 4.
  65  * 14. BARRIER -- RESUME
  66  *
  67  * Steps involved in Restart
  68  *  0. BARRIER -- RESTARTING
  69  *  1. Restore process memory
  70  *  2. Insert original-shmids into a node-wide shared file so that other
  71  *     processes can know about all the existing shmids in order to avoid
  72  *     future conflicts.
  73  *  3. BARRIER -- CHECKPOINTED
  74  *  4. Read all original-shmids from the file
  75  *  5. Re-create shared-memory segments which were checkpointed by this process.
  76  *  6. Remap the shm-segment to a temp addr and copy the checkpointed contents
  77  *     to this address. Now unmap the area where the checkpointed contents were
  78  *     stored and map the shm-segment on that address. Unmap the temp addr now.
  79  *     Remap the shm-segment to the original location.
  80  *  7. Write original->current mappings for all shmids which we got from
  81  *     shmget() in previous step.
  82  *  8. BARRIER -- REFILLED
  83  *  9. Re-map the memory-segment into each process's memory as it existed prior
  84  *     to checkpoint.
  85  * 10. BARRIER -- RESUME
  86  */
  87 
  88 /* TODO: Handle the case when the segment is marked for removal at ckpt time.
  89  */
  90 
   // File-local mutex acquired by _do_lock_tbl() and released by _do_unlock_tbl();
   // the SysVShm/SysVSem/SysVMsq methods below all go through that one pair.
   91 static pthread_mutex_t tblLock = PTHREAD_MUTEX_INITIALIZER;
  92 
  93 extern "C" void dmtcp_event_hook(DmtcpEvent_t event, DmtcpEventData_t *data)
  94 {
  95   switch (event) {
  96     case DMTCP_EVENT_ATFORK_CHILD:
  97       SysVShm::instance().resetOnFork();
  98       SysVSem::instance().resetOnFork();
  99       SysVMsq::instance().resetOnFork();
 100       break;
 101     case DMTCP_EVENT_WRITE_CKPT:
 102       SysVShm::instance().preCheckpoint();
 103       SysVSem::instance().preCheckpoint();
 104       SysVMsq::instance().preCheckpoint();
 105       break;
 106 
 107     case DMTCP_EVENT_LEADER_ELECTION:
 108       SysVShm::instance().leaderElection();
 109       SysVSem::instance().leaderElection();
 110       SysVMsq::instance().leaderElection();
 111       break;
 112 
 113     case DMTCP_EVENT_DRAIN:
 114       SysVShm::instance().preCkptDrain();
 115       SysVSem::instance().preCkptDrain();
 116       SysVMsq::instance().preCkptDrain();
 117       break;
 118 
 119     case DMTCP_EVENT_REFILL:
 120       SysVShm::instance().refill(data->refillInfo.isRestart);
 121       SysVSem::instance().refill(data->refillInfo.isRestart);
 122       SysVMsq::instance().refill(data->refillInfo.isRestart);
 123       break;
 124 
 125     case DMTCP_EVENT_THREADS_RESUME:
 126       SysVShm::instance().preResume();
 127       SysVSem::instance().preResume();
 128       SysVMsq::instance().preResume();
 129       break;
 130 
 131     case DMTCP_EVENT_PRE_EXEC:
 132       {
 133         jalib::JBinarySerializeWriterRaw wr("", data->serializerInfo.fd);
 134         SysVShm::instance().serialize(wr);
 135         SysVSem::instance().serialize(wr);
 136         SysVMsq::instance().serialize(wr);
 137       }
 138       break;
 139 
 140     case DMTCP_EVENT_POST_EXEC:
 141       {
 142         jalib::JBinarySerializeReaderRaw rd("", data->serializerInfo.fd);
 143         SysVShm::instance().serialize(rd);
 144         SysVSem::instance().serialize(rd);
 145         SysVMsq::instance().serialize(rd);
 146       }
 147       break;
 148 
 149     case DMTCP_EVENT_RESTART:
 150       SysVShm::instance().postRestart();
 151       SysVSem::instance().postRestart();
 152       SysVMsq::instance().postRestart();
 153       break;
 154 
 155     default:
 156       break;
 157   }
 158   DMTCP_NEXT_EVENT_HOOK(event, data);
 159 }
 160 
 161 static void _do_lock_tbl()
 162 {
 163   JASSERT(_real_pthread_mutex_lock(&tblLock) == 0) (JASSERT_ERRNO);
 164 }
 165 
 166 static void _do_unlock_tbl()
 167 {
 168   JASSERT(_real_pthread_mutex_unlock(&tblLock) == 0) (JASSERT_ERRNO);
 169 }
 170 
 171 static void huge_memcpy(char *dest, char *src, size_t size)
 172 {
 173   if (size < 100 * 1024 * 1024) {
 174     memcpy(dest, src, size);
 175     return;
 176   }
 177   const size_t hundredMB = (100 * 1024 * 1024);
 178   //const size_t oneGB = (1024 * 1024 * 1024);
 179   size_t chunkSize = hundredMB;
 180   static long page_size = sysconf(_SC_PAGESIZE);
 181   static long pagesPerChunk = chunkSize / page_size;
 182   size_t n = size / chunkSize;
 183   for (size_t i = 0; i < n; i++) {
 184     if (!Util::areZeroPages(src, pagesPerChunk)) {
 185       memcpy(dest, src, chunkSize);
 186     }
 187     madvise(src, chunkSize, MADV_DONTNEED);
 188     dest += chunkSize;
 189     src += chunkSize;
 190     size -= chunkSize;
 191   }
 192   memcpy(dest, src, size);
 193 }
 194 
 195 static SysVShm *sysvShmInst = NULL;
 196 static SysVSem *sysvSemInst = NULL;
 197 static SysVMsq *sysvMsqInst = NULL;
 198 SysVShm& SysVShm::instance()
 199 {
 200   if (sysvShmInst == NULL) {
 201     sysvShmInst = new SysVShm();
 202   }
 203   return *sysvShmInst;
 204 }
 205 
 206 SysVSem& SysVSem::instance()
 207 {
 208   if (sysvSemInst == NULL) {
 209     sysvSemInst = new SysVSem();
 210   }
 211   return *sysvSemInst;
 212 }
 213 
 214 SysVMsq& SysVMsq::instance()
 215 {
 216   if (sysvMsqInst == NULL) {
 217     sysvMsqInst = new SysVMsq();
 218   }
 219   return *sysvMsqInst;
 220 }
 221 
 222 /******************************************************************************
 223  *
 224  * SysVIPC Parent Class
 225  *
 226  *****************************************************************************/
 227 
 228 SysVIPC::SysVIPC(const char *str, int32_t id, int type)
 229   : _virtIdTable(str, id),
 230     _type(type)
 231 
 232 {
 233   _do_lock_tbl();
 234   _map.clear();
 235   _do_unlock_tbl();
 236 }
 237 
 238 void SysVIPC::removeStaleObjects()
 239 {
 240   _do_lock_tbl();
 241   vector<int> staleIds;
 242   for (Iterator i = _map.begin(); i != _map.end(); ++i) {
 243     SysVObj* obj = i->second;
 244     if (obj->isStale()) {
 245       staleIds.push_back(i->first);
 246     }
 247   }
 248   for (size_t j = 0; j < staleIds.size(); ++j) {
 249     delete _map[staleIds[j]];
 250     _map.erase(staleIds[j]);
 251     _virtIdTable.erase(staleIds[j]);
 252   }
 253   _do_unlock_tbl();
 254 }
 255 
 256 void SysVIPC::resetOnFork()
 257 {
 258   _virtIdTable.resetOnFork(getpid());
 259   for (Iterator i = _map.begin(); i != _map.end(); ++i) {
 260     i->second->resetOnFork();
 261   }
 262 }
 263 
 264 void SysVIPC::leaderElection()
 265 {
 266   /* Remove all invalid/removed shm segments*/
 267   removeStaleObjects();
 268 
 269   for (Iterator i = _map.begin(); i != _map.end(); ++i) {
 270     i->second->leaderElection();
 271   }
 272 }
 273 
 274 void SysVIPC::preCkptDrain()
 275 {
 276   for (Iterator i = _map.begin(); i != _map.end(); ++i) {
 277     i->second->preCkptDrain();
 278   }
 279 }
 280 
 281 void SysVIPC::preCheckpoint()
 282 {
 283   for (Iterator i = _map.begin(); i != _map.end(); ++i) {
 284     i->second->preCheckpoint();
 285   }
 286 }
 287 
 288 void SysVIPC::preResume()
 289 {
 290   for (Iterator i = _map.begin(); i != _map.end(); ++i) {
 291     i->second->preResume();
 292   }
 293 }
 294 
 295 void SysVIPC::refill(bool isRestart)
 296 {
 297   if (!isRestart) return;
 298 
 299   for (Iterator i = _map.begin(); i != _map.end(); ++i) {
 300     i->second->refill(isRestart);
 301   }
 302 }
 303 
 304 void SysVIPC::postRestart()
 305 {
 306   _virtIdTable.clear();
 307 
 308   for (Iterator i = _map.begin(); i != _map.end(); ++i) {
 309     i->second->postRestart();
 310   }
 311 }
 312 
 313 int SysVIPC::virtualToRealId(int virtId)
 314 {
 315   if (_virtIdTable.virtualIdExists(virtId)) {
 316     return _virtIdTable.virtualToReal(virtId);
 317   } else {
 318     int realId = SharedData::getRealIPCId(_type, virtId);
 319     _virtIdTable.updateMapping(virtId, realId);
 320     return realId;
 321   }
 322 }
 323 int SysVIPC::realToVirtualId(int realId)
 324 {
 325   if (_virtIdTable.realIdExists(realId)) {
 326     return _virtIdTable.realToVirtual(realId);
 327   } else {
 328     return -1;
 329   }
 330 }
 331 void SysVIPC::updateMapping(int virtId, int realId)
 332 {
 333   _virtIdTable.updateMapping(virtId, realId);
 334   SharedData::setIPCIdMap(_type, virtId, realId);
 335 }
 336 
 337 int SysVIPC::getNewVirtualId()
 338 {
 339   int32_t id;
 340   JASSERT(_virtIdTable.getNewVirtualId(&id)) (_virtIdTable.size())
 341     .Text("Exceeded maximum number of Sys V objects allowed");
 342 
 343   return id;
 344 }
 345 
 346 void SysVIPC::serialize(jalib::JBinarySerializer& o)
 347 {
 348   _virtIdTable.serialize(o);
 349 }
 350 /******************************************************************************
 351  *
 352  * SysVIPC Subclasses
 353  *
 354  *****************************************************************************/
 355 
 356 /*
 357  * Shared Memory
 358  */
 359 void SysVShm::on_shmget(int shmid, key_t key, size_t size, int shmflg)
 360 {
 361   _do_lock_tbl();
 362   if (!_virtIdTable.realIdExists(shmid)) {
 363     JASSERT(_map.find(shmid) == _map.end());
 364     int virtId = getNewVirtualId();
 365     JTRACE ("Shmid not found in table. Creating new entry")
 366       (shmid) (virtId);
 367     updateMapping(virtId, shmid);
 368     _map[virtId] = new ShmSegment(virtId, shmid, key, size, shmflg);
 369   } else {
 370     JASSERT(_map.find(shmid) != _map.end());
 371   }
 372   _do_unlock_tbl();
 373 }
 374 
 375 void SysVShm::on_shmat(int shmid, const void *shmaddr, int shmflg,
 376                        void* newaddr)
 377 {
 378   _do_lock_tbl();
 379   if (!_virtIdTable.virtualIdExists(shmid)) {
 380     int realId = SharedData::getRealIPCId(_type, shmid);
 381     updateMapping(shmid, realId);
 382   }
 383   if (_map.find(shmid) == _map.end()) {
 384     int realId = VIRTUAL_TO_REAL_SHM_ID(shmid);
 385     _map[shmid] = new ShmSegment(shmid, realId, -1, -1, -1);
 386   }
 387 
 388   JASSERT(shmaddr == NULL || shmaddr == newaddr);
 389   ((ShmSegment*)_map[shmid])->on_shmat(newaddr, shmflg);
 390   _do_unlock_tbl();
 391 }
 392 
 393 void SysVShm::on_shmdt(const void *shmaddr)
 394 {
 395   int shmid = shmaddrToShmid(shmaddr);
 396   JASSERT(shmid != -1) (shmaddr)
 397     .Text("No corresponding shmid found for given shmaddr");
 398   _do_lock_tbl();
 399   ((ShmSegment*)_map[shmid])->on_shmdt(shmaddr);
 400   if (_map[shmid]->isStale()) {
 401     _map.erase(shmid);
 402   }
 403   _do_unlock_tbl();
 404 }
 405 
 406 int SysVShm::shmaddrToShmid(const void* shmaddr)
 407 {
 408   DMTCP_PLUGIN_DISABLE_CKPT();
 409   int shmid = -1;
 410   _do_lock_tbl();
 411   for (Iterator i = _map.begin(); i != _map.end(); ++i) {
 412     ShmSegment* shmObj = (ShmSegment*)i->second;
 413     if (shmObj->isValidShmaddr(shmaddr)) {
 414       shmid = i->first;
 415       break;
 416     }
 417   }
 418   _do_unlock_tbl();
 419   DMTCP_PLUGIN_ENABLE_CKPT();
 420   return shmid;
 421 }
 422 
 423 /*
 424  * Semaphore
 425  */
 426 void SysVSem::on_semget(int semid, key_t key, int nsems, int semflg)
 427 {
 428   _do_lock_tbl();
 429   if (!_virtIdTable.realIdExists(semid)) {
 430     //JASSERT(key == IPC_PRIVATE || (semflg & IPC_CREAT) != 0) (key) (semid);
 431     JASSERT(_map.find(semid) == _map.end());
 432     JTRACE ("Semid not found in table. Creating new entry") (semid);
 433     int virtId = getNewVirtualId();
 434     updateMapping(virtId, semid);
 435     _map[virtId] = new Semaphore (virtId, semid, key, nsems, semflg);
 436   } else {
 437     JASSERT(_map.find(semid) != _map.end());
 438   }
 439   _do_unlock_tbl();
 440 }
 441 
 442 void SysVSem::on_semctl(int semid, int semnum, int cmd, union semun arg)
 443 {
 444   _do_lock_tbl();
 445   if (cmd == IPC_RMID && _virtIdTable.virtualIdExists(semid)) {
 446     JASSERT(_map[semid]->isStale()) (semid);
 447     _map.erase(semid);
 448   }
 449   _do_unlock_tbl();
 450   return;
 451 }
 452 
 453 void SysVSem::on_semop(int semid, struct sembuf *sops, unsigned nsops)
 454 {
 455   _do_lock_tbl();
 456   if (!_virtIdTable.virtualIdExists(semid)) {
 457     int realId = SharedData::getRealIPCId(_type, semid);
 458     updateMapping(semid, realId);
 459   }
 460   if (_map.find(semid) == _map.end()) {
 461     int realId = VIRTUAL_TO_REAL_SEM_ID(semid);
 462     _map[semid] = new Semaphore(semid, realId, -1, -1, -1);
 463   }
 464   ((Semaphore*)_map[semid])->on_semop(sops, nsops);
 465   _do_unlock_tbl();
 466 }
 467 
 468 /*
 469  * Message Queue
 470  */
 471 void SysVMsq::on_msgget(int msqid, key_t key, int msgflg)
 472 {
 473   _do_lock_tbl();
 474   if (!_virtIdTable.realIdExists(msqid)) {
 475     JASSERT(_map.find(msqid) == _map.end());
 476     JTRACE ("Msqid not found in table. Creating new entry") (msqid);
 477     int virtId = getNewVirtualId();
 478     updateMapping(virtId, msqid);
 479     _map[virtId] = new MsgQueue (virtId, msqid, key, msgflg);
 480   } else {
 481     JASSERT(_map.find(msqid) != _map.end());
 482   }
 483   _do_unlock_tbl();
 484 }
 485 
 486 void SysVMsq::on_msgctl(int msqid, int cmd, struct msqid_ds *buf)
 487 {
 488   _do_lock_tbl();
 489   if (cmd == IPC_RMID && _virtIdTable.virtualIdExists(msqid)) {
 490     JASSERT(_map[msqid]->isStale()) (msqid);
 491     _map.erase(msqid);
 492   }
 493   _do_unlock_tbl();
 494   return;
 495 }
 496 
 497 void SysVMsq::on_msgsnd(int msqid, const void *msgp, size_t msgsz, int msgflg)
 498 {
 499   _do_lock_tbl();
 500   if (!_virtIdTable.virtualIdExists(msqid)) {
 501     int realId = SharedData::getRealIPCId(_type, msqid);
 502     updateMapping(msqid, realId);
 503   }
 504   if (_map.find(msqid) == _map.end()) {
 505     int realId = VIRTUAL_TO_REAL_MSQ_ID(msqid);
 506     _map[msqid] = new MsgQueue(msqid, realId, -1, -1);
 507   }
 508   _do_unlock_tbl();
 509 }
 510 
 511 void SysVMsq::on_msgrcv(int msqid, const void *msgp, size_t msgsz,
 512                         int msgtyp, int msgflg)
 513 {
 514   _do_lock_tbl();
 515   if (!_virtIdTable.virtualIdExists(msqid)) {
 516     int realId = SharedData::getRealIPCId(_type, msqid);
 517     updateMapping(msqid, realId);
 518   }
 519   if (_map.find(msqid) == _map.end()) {
 520     int realId = VIRTUAL_TO_REAL_MSQ_ID(msqid);
 521     _map[msqid] = new MsgQueue(msqid, realId, -1, -1);
 522   }
 523   _do_unlock_tbl();
 524 }
 525 
 526 /******************************************************************************
 527  *
 528  * ShmSegment Methods
 529  *
 530  *****************************************************************************/
 531 
 532 ShmSegment::ShmSegment(int shmid, int realShmid, key_t key, size_t size,
 533                        int shmflg)
 534   : SysVObj(shmid, realShmid, key, shmflg)
 535 {
 536   _size = size;
 537   if (key == -1 || size == 0) {
 538     struct shmid_ds shminfo;
 539     JASSERT(_real_shmctl(_realId, IPC_STAT, &shminfo) != -1);
 540     _key = shminfo.shm_perm.__key;
 541     _size = shminfo.shm_segsz;
 542     _flags = shminfo.shm_perm.mode;
 543   }
 544   JTRACE("New Shm Segment") (_key) (_size) (_flags) (_id) (_isCkptLeader);
 545 }
 546 
 547 void ShmSegment::on_shmat(const void *shmaddr, int shmflg)
 548 {
 549   _shmaddrToFlag[shmaddr] = shmflg;
 550 }
 551 
 552 void ShmSegment::on_shmdt(const void *shmaddr)
 553 {
 554   JASSERT(isValidShmaddr(shmaddr));
 555   _shmaddrToFlag.erase((void*)shmaddr);
 556 
 557   // TODO: If num-attached == 0; and marked for deletion, remove this segment
 558 }
 559 
 560 bool ShmSegment::isValidShmaddr(const void* shmaddr)
 561 {
 562   return _shmaddrToFlag.find((void*)shmaddr) != _shmaddrToFlag.end();
 563 }
 564 
 565 bool ShmSegment::isStale()
 566 {
 567   struct shmid_ds shminfo;
 568   int ret = _real_shmctl(_realId, IPC_STAT, &shminfo);
 569   if (ret == -1) {
 570     JASSERT (errno == EIDRM || errno == EINVAL);
 571     JASSERT(_shmaddrToFlag.empty());
 572     return true;
 573   }
 574   _nattch = shminfo.shm_nattch;
 575   _mode = shminfo.shm_perm.mode;
 576   return false;
 577 }
 578 
 579 void ShmSegment::leaderElection()
 580 {
 581   /* We attach and detach to the shmid object to set the shm_lpid to our pid.
 582    * The process who calls the last shmdt() is declared the leader.
 583    */
 584   void *addr = _real_shmat(_realId, NULL, 0);
 585   JASSERT(addr != (void*) -1) (_id) (JASSERT_ERRNO)
 586     .Text("_real_shmat() failed");
 587 
 588   JASSERT(_real_shmdt(addr) == 0) (_id) (addr) (JASSERT_ERRNO);
 589 }
 590 
 591 void ShmSegment::preCkptDrain()
 592 {
 593   struct shmid_ds info;
 594   JASSERT(_real_shmctl(_realId, IPC_STAT, &info) != -1);
 595 
 596   /* If we are the ckptLeader for this object, map it now, if not mapped already.
 597    */
 598   _dmtcpMappedAddr = false;
 599   _isCkptLeader = false;
 600 
 601   if (info.shm_lpid == getpid()) {
 602     _isCkptLeader = true;
 603     if (_shmaddrToFlag.size() == 0) {
 604       void *addr = _real_shmat(_realId, NULL, 0);
 605       JASSERT(addr != (void*) -1);
 606       _shmaddrToFlag[addr] = 0;
 607       _dmtcpMappedAddr = true;
 608     }
 609   }
 610 }
 611 
 612 void ShmSegment::preCheckpoint()
 613 {
 614   ShmaddrToFlagIter i = _shmaddrToFlag.begin();
 615   /* If this process won the leader election, unmap all but the first memory
 616    * segment, otherwise, unmap all the mappings of this memory-segment.
 617    */
 618   if (_isCkptLeader) {
 619     ++i;
 620   }
 621   for (; i != _shmaddrToFlag.end(); ++i) {
 622     JTRACE("Unmapping shared memory segment") (_id)(i->first);
 623     JASSERT(_real_shmdt(i->first) == 0);
 624 
 625     // We need to unmap the duplicate shared memory segments to optimize ckpt
 626     // image size. But we will remap it with zero pages that have no rwx
 627     // permission, to stop the kernel from assigning these memory addresses for
 628     // future mmap calls, since we will be re-mapping it during post-ckpt.
 629     JASSERT(mmap((void*) i->first, _size,
 630                  PROT_NONE, MAP_ANONYMOUS | MAP_PRIVATE | MAP_FIXED,
 631                  0, 0) == i->first);
 632   }
 633 }
 634 
 635 void ShmSegment::postRestart()
 636 {
 637   if (!_isCkptLeader) return;
 638 
 639   _realId = _real_shmget(_key, _size, _flags);
 640   JASSERT(_realId != -1);
 641   SysVShm::instance().updateMapping(_id, _realId);
 642 
 643   // Re-map first address for owner on restart
 644   JASSERT(_isCkptLeader);
 645   ShmaddrToFlagIter i = _shmaddrToFlag.begin();
 646   void *tmpaddr = _real_shmat(_realId, NULL, 0);
 647   JASSERT(tmpaddr != (void*) -1) (_realId)(JASSERT_ERRNO);
 648   huge_memcpy((char*) tmpaddr, (char*) i->first, _size);
 649   JASSERT(_real_shmdt(tmpaddr) == 0);
 650   munmap((void*)i->first, _size);
 651 
 652   if (!_dmtcpMappedAddr) {
 653     JASSERT (_real_shmat(_realId, i->first, i->second) != (void *) -1)
 654       (JASSERT_ERRNO) (_realId) (_id) (_isCkptLeader)
 655       (i->first) (i->second) (getpid())
 656       .Text ("Error remapping shared memory segment on restart");
 657   }
 658   JTRACE("Remapping shared memory segment to original address") (_id) (_realId);
 659 }
 660 
 661 void ShmSegment::refill(bool isRestart)
 662 {
 663   if (!isRestart || _isCkptLeader) return;
 664 
 665   // Update _realId;
 666   _realId = VIRTUAL_TO_REAL_SHM_ID(_id);
 667 }
 668 
 669 void ShmSegment::preResume()
 670 {
 671   // Re-map all remaining addresses
 672   ShmaddrToFlagIter i = _shmaddrToFlag.begin();
 673 
 674   if (_isCkptLeader && i != _shmaddrToFlag.end()) {
 675     i++;
 676   }
 677 
 678   for (; i != _shmaddrToFlag.end(); ++i) {
 679     // Unmap the reserved area.
 680     JASSERT(munmap((void*) i->first, _size) == 0);
 681 
 682     JTRACE("Remapping shared memory segment")(_realId);
 683     JASSERT (_real_shmat(_realId, i->first, i->second) != (void *) -1)
 684       (JASSERT_ERRNO) (_realId) (_id) (_isCkptLeader)
 685       (i->first) (i->second) (getpid())
 686       .Text ("Error remapping shared memory segment");
 687   }
 688   // TODO: During Ckpt-resume, if the shm object was mapped by dmtcp
 689   //       (_dmtcpMappedAddr == true), then we should call shmdt() on it.
 690 }
 691 
 692 /******************************************************************************
 693  *
 694  * Semaphore Methods
 695  *
 696  *****************************************************************************/
 697 
 698 Semaphore::Semaphore(int semid, int realSemid, key_t key, int nsems, int semflg)
 699   : SysVObj(semid, realSemid, key, semflg)
 700 {
 701   _nsems = nsems;
 702   if (key == -1) {
 703     struct semid_ds buf;
 704     union semun se;
 705     se.buf = &buf;
 706     JASSERT(_real_semctl(realSemid, 0, IPC_STAT, se) != -1) (JASSERT_ERRNO);
 707     _key = se.buf->sem_perm.__key;
 708     _nsems = se.buf->sem_nsems;
 709     _flags = se.buf->sem_perm.mode;
 710   }
 711   _semval = (unsigned short*) JALLOC_HELPER_MALLOC(_nsems * sizeof(unsigned short));
 712   _semadj = (int*) JALLOC_HELPER_MALLOC(_nsems * sizeof(int));
 713   for (int i = 0; i < _nsems; i++) {
 714     _semval[i] = 0;
 715     _semadj[i] = 0;
 716   }
 717   JTRACE("New Semaphore Segment")
 718     (_key) (_nsems) (_flags) (_id) (_isCkptLeader);
 719 }
 720 
 721 void Semaphore::on_semop(struct sembuf *sops, unsigned nsops)
 722 {
 723   for (unsigned i = 0; i < nsops; i++) {
 724     int sem_num = sops[i].sem_num;
 725     _semadj[sem_num] -= sops[i].sem_op;
 726   }
 727 }
 728 
 729 bool Semaphore::isStale()
 730 {
 731   int ret = _real_semctl(_realId, 0, GETPID);
 732   if (ret == -1) {
 733     JASSERT (errno == EIDRM || errno == EINVAL);
 734     return true;
 735   }
 736   return false;
 737 }
 738 
 739 void Semaphore::resetOnFork()
 740 {
 741   for (int i = 0; i < _nsems; i++) {
 742     _semadj[i] = 0;
 743   }
 744 }
 745 
 746 void Semaphore::leaderElection()
 747 {
 748   JASSERT(_realId != -1);
 749   /* Every process increments and decrements the semaphore value by 1 in order
 750    * to update the sempid value. The process who performs the last semop is
 751    * elected the leader.
 752    */
 753   struct sembuf sops;
 754   sops.sem_num = 0;
 755   sops.sem_op = 1;
 756   sops.sem_flg = 0;
 757   int ret = _real_semtimedop(_realId, &sops, 1, NULL);
 758   if (ret == 0) {
 759     sops.sem_num = 0;
 760     sops.sem_op = -1;
 761     sops.sem_flg = 0;
 762     JASSERT(_real_semtimedop(_realId, &sops, 1, NULL) == 0) (JASSERT_ERRNO) (_id);
 763   }
 764 }
 765 
 766 void Semaphore::preCkptDrain()
 767 {
 768   _isCkptLeader = false;
 769   if (getpid() == _real_semctl(_realId, 0, GETPID)) {
 770     union semun info;
 771     info.array = _semval;
 772     JASSERT(_real_semctl(_realId, 0, GETALL, info) != -1);
 773     _isCkptLeader = true;
 774   }
 775 }
 776 
 777 void Semaphore::preCheckpoint()
 778 {
 779 }
 780 
 781 void Semaphore::postRestart()
 782 {
 783   if (_isCkptLeader) {
 784     _realId = _real_semget(_key, _nsems, _flags);
 785     JASSERT(_realId != -1) (JASSERT_ERRNO);
 786     SysVSem::instance().updateMapping(_id, _realId);
 787 
 788     union semun info;
 789     info.array = _semval;
 790     JASSERT(_real_semctl(_realId, 0, SETALL, info) != -1);
 791   }
 792 }
 793 
 794 void Semaphore::refill(bool isRestart)
 795 {
 796   if (!isRestart) return;
 797   /* Update the semadj value for this process.
 798    * The way we do it is by calling semop twice as follows:
 799    * semop(id, {semid, abs(semadj-value), flag1}*, nsems)
 800    * semop(id, {semid, -abs(semadj-value), flag2}*, nsems)
 801    *
 802    * where: flag1 = semadj-value > 0 ? 0 : SEM_UNDO
 803    *        flag2 = semadj-value < 0 ? SEM_UNDO : 0
 804    */
 805   struct sembuf sops;
 806   _realId = VIRTUAL_TO_REAL_SEM_ID(_id);
 807   JASSERT(_realId != -1);
 808   for (int i = 0; i < _nsems; i++) {
 809     if (_semadj[i] == 0) continue;
 810     sops.sem_num = i;
 811     sops.sem_op = abs(_semadj[i]);
 812     sops.sem_flg = _semadj[i] > 0 ? 0 : SEM_UNDO;
 813     JASSERT(_real_semop(_realId, &sops, 1) == 0);
 814 
 815     sops.sem_op = - abs(_semadj[i]);
 816     sops.sem_flg = _semadj[i] < 0 ? SEM_UNDO : 0;
 817     JASSERT(_real_semop(_realId, &sops, 1) == 0);
 818   }
 819 }
 820 /******************************************************************************
 821  *
 822  * MsgQueue Methods
 823  *
 824  *****************************************************************************/
 825 
 826 MsgQueue::MsgQueue(int msqid, int realMsqid, key_t key, int msgflg)
 827   : SysVObj(msqid, realMsqid, key, msgflg)
 828 {
 829   if (key == -1) {
 830     struct msqid_ds buf;
 831     JASSERT(_real_msgctl(realMsqid, IPC_STAT, &buf) == 0) (_id) (JASSERT_ERRNO);
 832     _key = buf.msg_perm.__key;
 833     _flags = buf.msg_perm.mode;
 834   }
 835   JTRACE("New MsgQueue Created") (_key) (_flags) (_id);
 836 }
 837 
 838 bool MsgQueue::isStale()
 839 {
 840   struct msqid_ds buf;
 841   int ret = _real_msgctl(_realId, IPC_STAT, &buf);
 842   if (ret == -1) {
 843     JASSERT (errno == EIDRM || errno == EINVAL);
 844     return true;
 845   }
 846   return false;
 847 }
 848 
 849 void MsgQueue::leaderElection()
 850 {
 851   // Leader election is done in preCkptDrain(), here we just fetch the number
 852   // of messages in the queue.
 853   struct msqid_ds buf;
 854   JASSERT(_real_msgctl(_realId, IPC_STAT, &buf) == 0) (_id) (JASSERT_ERRNO);
 855 
 856   _qnum = buf.msg_qnum;
 857 }
 858 
 859 void MsgQueue::preCkptDrain()
 860 {
 861   // This is where we elect the leader
 862   /* Every process send a message to the queue. Later on, these excess messages
 863    * will be removed by the ckptLeader before the user threads are allowed to
 864    * resume.
 865    * The process whose pid matches the msg_lspid is the leader.
 866    */
 867   struct msgbuf msg;
 868   msg.mtype = getpid();
 869   JASSERT(_real_msgsnd(_realId, &msg, 0, IPC_NOWAIT) == 0) (_id) (JASSERT_ERRNO);
 870   _isCkptLeader = false;
 871 }
 872 
 873 void MsgQueue::preCheckpoint()
 874 {
 875   struct msqid_ds buf;
 876   memset(&buf, 0, sizeof buf);
 877   JASSERT(_real_msgctl(_realId, IPC_STAT, &buf) == 0) (_id) (JASSERT_ERRNO);
 878 
 879   if (buf.msg_lspid == getpid()) {
 880     size_t size = buf.__msg_cbytes;
 881     void *msgBuf = JALLOC_HELPER_MALLOC(size);
 882     _isCkptLeader = true;
 883     _msgInQueue.clear();
 884     for (size_t i = 0; i < _qnum; i++) {
 885       ssize_t numBytes = _real_msgrcv(_realId, msgBuf, size, 0, 0);
 886       JASSERT(numBytes != -1) (_id) (JASSERT_ERRNO);
 887       _msgInQueue.push_back(jalib::JBuffer((const char*)msgBuf,
 888                                            numBytes + sizeof (long)));
 889     }
 890     JASSERT(_msgInQueue.size() == _qnum) (_qnum);
 891     // Now remove all the messages that were sent during preCkptDrain phase.
 892     while (_real_msgrcv(_realId, msgBuf, size, 0, IPC_NOWAIT) != -1);
 893     JALLOC_HELPER_FREE(msgBuf);
 894   } else {
 895   }
 896 }
 897 
 898 void MsgQueue::postRestart()
 899 {
 900   if (_isCkptLeader) {
 901     _realId = _real_msgget(_key, _flags);
 902     JASSERT(_realId != -1) (JASSERT_ERRNO);
 903     SysVMsq::instance().updateMapping(_id, _realId);
 904     JASSERT(_msgInQueue.size() == _qnum) (_msgInQueue.size()) (_qnum);
 905   }
 906 }
 907 
 908 void MsgQueue::refill(bool isRestart)
 909 {
 910   if (_isCkptLeader) {
 911     struct msqid_ds buf;
 912     JASSERT(_real_msgctl(_realId, IPC_STAT, &buf) == 0) (_id) (JASSERT_ERRNO);
 913     if (isRestart) {
 914       // Now remove all the messages that were sent during preCkptDrain phase.
 915       size_t size = buf.__msg_cbytes;
 916       void *msgBuf = JALLOC_HELPER_MALLOC(size);
 917       while (_real_msgrcv(_realId, msgBuf, size, 0, IPC_NOWAIT) != -1);
 918       JALLOC_HELPER_FREE(msgBuf);
 919 
 920     } else {
 921       JASSERT(buf.msg_qnum == 0);
 922     }
 923 
 924     for (size_t i = 0; i < _qnum; i++) {
 925       JASSERT(_real_msgsnd(_realId, _msgInQueue[i].buffer(),
 926                            _msgInQueue[i].size(), IPC_NOWAIT) == 0);
 927     }
 928   }
 929   _msgInQueue.clear();
 930   _qnum = 0;
 931 }

/* [<][>][^][v][top][bottom][index][help] */