root/plugin/ipc/file/posixipcwrappers.cpp

/* [<][>][^][v][top][bottom][index][help] */

DEFINITIONS

This source file includes the following definitions:
  1. mq_open
  2. mq_close
  3. mq_notify_thread_start
  4. mq_notify
  5. mq_send
  6. mq_receive
  7. mq_timedsend
  8. mq_timedreceive

   1 /****************************************************************************
   2  *   Copyright (C) 2012 by Kapil Arya(kapil@ccs.neu.edu)                   *
   3  *                                                                          *
   4  *   This file is part of the dmtcp/src module of DMTCP (DMTCP:dmtcp/src).  *
   5  *                                                                          *
   6  *  DMTCP:dmtcp/src is free software: you can redistribute it and/or        *
   7  *  modify it under the terms of the GNU Lesser General Public License as   *
   8  *  published by the Free Software Foundation, either version 3 of the      *
   9  *  License, or (at your option) any later version.                         *
  10  *                                                                          *
  11  *  DMTCP:dmtcp/src is distributed in the hope that it will be useful,      *
  12  *  but WITHOUT ANY WARRANTY; without even the implied warranty of          *
  13  *  MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the           *
  14  *  GNU Lesser General Public License for more details.                     *
  15  *                                                                          *
  16  *  You should have received a copy of the GNU Lesser General Public        *
  17  *  License along with DMTCP:dmtcp/src.  If not, see                        *
  18  *  <http://www.gnu.org/licenses/>.                                         *
  19  ****************************************************************************/
  20 
  21 #include <mqueue.h>
  22 #include <time.h>
  23 #include <stdarg.h>
  24 #include "util.h"
  25 #include "dmtcp.h"
  26 
  27 #include "fileconnection.h"
  28 #include "filewrappers.h"
  29 #include "fileconnlist.h"
  30 
  31 using namespace dmtcp;
  32 
  33 extern "C"
  34 mqd_t mq_open(const char *name, int oflag, ...)
  35 {
  36   mode_t mode = 0;
  37   struct mq_attr *attr = NULL;
  38 
  39   // Handling the variable number of arguments
  40   if (oflag & O_CREAT) {
  41     va_list arg;
  42     va_start(arg, oflag);
  43     mode = va_arg(arg, mode_t);
  44     attr = va_arg(arg, struct mq_attr*);
  45     va_end(arg);
  46   }
  47 
  48   DMTCP_PLUGIN_DISABLE_CKPT();
  49   int res = _real_mq_open(name, oflag, mode, attr);
  50   if (res != -1) {
  51     PosixMQConnection *pcon = new PosixMQConnection(name, oflag,
  52                                                                   mode, attr);
  53     FileConnList::instance().add(res, pcon);
  54   }
  55   DMTCP_PLUGIN_ENABLE_CKPT();
  56   return res;
  57 }
  58 
  59 extern "C"
  60 int mq_close(mqd_t mqdes)
  61 {
  62   DMTCP_PLUGIN_DISABLE_CKPT();
  63   int res = _real_mq_close(mqdes);
  64   if (res != -1) {
  65     PosixMQConnection *con =(PosixMQConnection*)
  66       FileConnList::instance().getConnection(mqdes);
  67     con->on_mq_close();
  68   }
  69   DMTCP_PLUGIN_ENABLE_CKPT();
  70   return res;
  71 }
  72 
  73 struct mqNotifyData {
  74   void(*start_routine) (union sigval);
  75   union sigval sv;
  76   mqd_t mqdes;
  77 };
  78 
  79 static void mq_notify_thread_start(union sigval sv)
  80 {
  81   // FIXME: Possible race:
  82   //        If some other thread in this process calls mq_notify() before we
  83   //        make a call to con->on_mq_notify(NULL), the notify request won't be
  84   //        restored on restart. However, this case is rather unusual.
  85   struct mqNotifyData *m =(struct mqNotifyData*) sv.sival_ptr;
  86   void(*start_routine) (union sigval) = m->start_routine;
  87   union sigval s = m->sv;
  88   mqd_t mqdes = m->mqdes;
  89 
  90   JALLOC_HELPER_FREE(m);
  91 
  92   DMTCP_PLUGIN_DISABLE_CKPT();
  93   PosixMQConnection *con =(PosixMQConnection*)
  94     FileConnList::instance().getConnection(mqdes);
  95   con->on_mq_notify(NULL);
  96   DMTCP_PLUGIN_ENABLE_CKPT();
  97 
  98   start_routine(s);
  99 }
 100 
 101 extern "C"
 102 int mq_notify(mqd_t mqdes, const struct sigevent *sevp)
 103 {
 104   int res;
 105   DMTCP_PLUGIN_DISABLE_CKPT();
 106   if (sevp != NULL && sevp->sigev_notify == SIGEV_THREAD) {
 107     struct sigevent se;
 108     struct mqNotifyData *mdata;
 109     mdata =(struct mqNotifyData*)
 110       JALLOC_HELPER_MALLOC(sizeof(struct mqNotifyData));
 111     se = *sevp;
 112     mdata->start_routine = sevp->sigev_notify_function;
 113     mdata->sv = sevp->sigev_value;
 114     mdata->mqdes = mqdes;
 115     se.sigev_notify_function = mq_notify_thread_start;
 116     se.sigev_value.sival_ptr = mdata;
 117     res = _real_mq_notify(mqdes, &se);
 118   } else {
 119     res = _real_mq_notify(mqdes, sevp);
 120   }
 121 
 122   if (res != -1) {
 123     PosixMQConnection *con =(PosixMQConnection*)
 124       FileConnList::instance().getConnection(mqdes);
 125     con->on_mq_notify(sevp);
 126   }
 127   DMTCP_PLUGIN_ENABLE_CKPT();
 128   return res;
 129 }
 130 
 131 extern "C"
 132 int mq_send(mqd_t mqdes, const char *msg_ptr, size_t msg_len, unsigned msg_prio)
 133 {
 134   int res;
 135   struct timespec ts;
 136   do {
 137     JASSERT(clock_gettime(CLOCK_REALTIME, &ts) != -1);
 138     ts.tv_sec += 1000;
 139     res = mq_timedsend(mqdes, msg_ptr, msg_len, msg_prio, &ts);
 140   } while (res == -1 && errno == ETIMEDOUT);
 141   return res;
 142 }
 143 
 144 extern "C"
 145 ssize_t mq_receive(mqd_t mqdes, char *msg_ptr, size_t msg_len,
 146                    unsigned *msg_prio)
 147 {
 148   ssize_t res;
 149   struct timespec ts;
 150   do {
 151     JASSERT(clock_gettime(CLOCK_REALTIME, &ts) != -1);
 152     ts.tv_sec += 1000;
 153     res = mq_timedreceive(mqdes, msg_ptr, msg_len, msg_prio, &ts);
 154   } while (res == -1 && errno == ETIMEDOUT);
 155   return res;
 156 }
 157 
 158 static struct timespec ts_100ms = {0, 100 * 1000 * 1000};
 159 extern "C"
 160 int mq_timedsend(mqd_t mqdes, const char *msg_ptr, size_t msg_len,
 161                  unsigned msg_prio, const struct timespec *abs_timeout)
 162 {
 163   struct timespec ts;
 164   int ret = -1;
 165   do {
 166     DMTCP_PLUGIN_DISABLE_CKPT();
 167     JASSERT(clock_gettime(CLOCK_REALTIME, &ts) != -1);
 168     if (TIMESPEC_CMP(&ts, abs_timeout, <=)) {
 169       TIMESPEC_ADD(&ts, &ts_100ms, &ts);
 170     }
 171     ret = _real_mq_timedsend(mqdes, msg_ptr, msg_len, msg_prio, &ts);
 172     DMTCP_PLUGIN_ENABLE_CKPT();
 173 
 174     if (ret != -1 ||(ret == -1 && errno != ETIMEDOUT)) {
 175       return ret;
 176     }
 177   } while (TIMESPEC_CMP(&ts, abs_timeout, <));
 178 
 179   errno = ETIMEDOUT;
 180   return ret;
 181 }
 182 
 183 extern "C"
 184 ssize_t mq_timedreceive(mqd_t mqdes, char *msg_ptr, size_t msg_len,
 185                         unsigned *msg_prio,
 186                         const struct timespec *abs_timeout)
 187 {
 188   struct timespec ts;
 189   int ret = -1;
 190   do {
 191     DMTCP_PLUGIN_DISABLE_CKPT();
 192     JASSERT(clock_gettime(CLOCK_REALTIME, &ts) != -1);
 193     if (TIMESPEC_CMP(&ts, abs_timeout, <=)) {
 194       TIMESPEC_ADD(&ts, &ts_100ms, &ts);
 195     }
 196     ret = _real_mq_timedreceive(mqdes, msg_ptr, msg_len, msg_prio, &ts);
 197     DMTCP_PLUGIN_ENABLE_CKPT();
 198 
 199     if (ret != -1 ||(ret == -1 && errno != ETIMEDOUT)) {
 200       return ret;
 201     }
 202   } while (TIMESPEC_CMP(&ts, abs_timeout, <));
 203 
 204   errno = ETIMEDOUT;
 205   return ret;
 206 }

/* [<][>][^][v][top][bottom][index][help] */