/* [<][>][^][v][top][bottom][index][help] */
DEFINITIONS
This source file includes the following definitions.
- mq_open
- mq_close
- mq_notify_thread_start
- mq_notify
- mq_send
- mq_receive
- mq_timedsend
- mq_timedreceive
1 /****************************************************************************
2 * Copyright (C) 2012 by Kapil Arya(kapil@ccs.neu.edu) *
3 * *
4 * This file is part of the dmtcp/src module of DMTCP (DMTCP:dmtcp/src). *
5 * *
6 * DMTCP:dmtcp/src is free software: you can redistribute it and/or *
7 * modify it under the terms of the GNU Lesser General Public License as *
8 * published by the Free Software Foundation, either version 3 of the *
9 * License, or (at your option) any later version. *
10 * *
11 * DMTCP:dmtcp/src is distributed in the hope that it will be useful, *
12 * but WITHOUT ANY WARRANTY; without even the implied warranty of *
13 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the *
14 * GNU Lesser General Public License for more details. *
15 * *
16 * You should have received a copy of the GNU Lesser General Public *
17 * License along with DMTCP:dmtcp/src. If not, see *
18 * <http://www.gnu.org/licenses/>. *
19 ****************************************************************************/
20
21 #include <mqueue.h>
22 #include <time.h>
23 #include <stdarg.h>
24 #include "util.h"
25 #include "dmtcp.h"
26
27 #include "fileconnection.h"
28 #include "filewrappers.h"
29 #include "fileconnlist.h"
30
31 using namespace dmtcp;
32
33 extern "C"
34 mqd_t mq_open(const char *name, int oflag, ...)
35 {
36 mode_t mode = 0;
37 struct mq_attr *attr = NULL;
38
39 // Handling the variable number of arguments
40 if (oflag & O_CREAT) {
41 va_list arg;
42 va_start(arg, oflag);
43 mode = va_arg(arg, mode_t);
44 attr = va_arg(arg, struct mq_attr*);
45 va_end(arg);
46 }
47
48 DMTCP_PLUGIN_DISABLE_CKPT();
49 int res = _real_mq_open(name, oflag, mode, attr);
50 if (res != -1) {
51 PosixMQConnection *pcon = new PosixMQConnection(name, oflag,
52 mode, attr);
53 FileConnList::instance().add(res, pcon);
54 }
55 DMTCP_PLUGIN_ENABLE_CKPT();
56 return res;
57 }
58
59 extern "C"
60 int mq_close(mqd_t mqdes)
61 {
62 DMTCP_PLUGIN_DISABLE_CKPT();
63 int res = _real_mq_close(mqdes);
64 if (res != -1) {
65 PosixMQConnection *con =(PosixMQConnection*)
66 FileConnList::instance().getConnection(mqdes);
67 con->on_mq_close();
68 }
69 DMTCP_PLUGIN_ENABLE_CKPT();
70 return res;
71 }
72
/*
 * Data that mq_notify() hands to mq_notify_thread_start() through
 * sigev_value.sival_ptr when the caller requests SIGEV_THREAD notification.
 */
struct mqNotifyData {
  void (*start_routine)(union sigval);  // caller's sigev_notify_function
  union sigval sv;                      // caller's sigev_value
  mqd_t mqdes;                          // descriptor passed to mq_notify()
};
78
79 static void mq_notify_thread_start(union sigval sv)
80 {
81 // FIXME: Possible race:
82 // If some other thread in this process calls mq_notify() before we
83 // make a call to con->on_mq_notify(NULL), the notify request won't be
84 // restored on restart. However, this case is rather unusual.
85 struct mqNotifyData *m =(struct mqNotifyData*) sv.sival_ptr;
86 void(*start_routine) (union sigval) = m->start_routine;
87 union sigval s = m->sv;
88 mqd_t mqdes = m->mqdes;
89
90 JALLOC_HELPER_FREE(m);
91
92 DMTCP_PLUGIN_DISABLE_CKPT();
93 PosixMQConnection *con =(PosixMQConnection*)
94 FileConnList::instance().getConnection(mqdes);
95 con->on_mq_notify(NULL);
96 DMTCP_PLUGIN_ENABLE_CKPT();
97
98 start_routine(s);
99 }
100
101 extern "C"
102 int mq_notify(mqd_t mqdes, const struct sigevent *sevp)
103 {
104 int res;
105 DMTCP_PLUGIN_DISABLE_CKPT();
106 if (sevp != NULL && sevp->sigev_notify == SIGEV_THREAD) {
107 struct sigevent se;
108 struct mqNotifyData *mdata;
109 mdata =(struct mqNotifyData*)
110 JALLOC_HELPER_MALLOC(sizeof(struct mqNotifyData));
111 se = *sevp;
112 mdata->start_routine = sevp->sigev_notify_function;
113 mdata->sv = sevp->sigev_value;
114 mdata->mqdes = mqdes;
115 se.sigev_notify_function = mq_notify_thread_start;
116 se.sigev_value.sival_ptr = mdata;
117 res = _real_mq_notify(mqdes, &se);
118 } else {
119 res = _real_mq_notify(mqdes, sevp);
120 }
121
122 if (res != -1) {
123 PosixMQConnection *con =(PosixMQConnection*)
124 FileConnList::instance().getConnection(mqdes);
125 con->on_mq_notify(sevp);
126 }
127 DMTCP_PLUGIN_ENABLE_CKPT();
128 return res;
129 }
130
131 extern "C"
132 int mq_send(mqd_t mqdes, const char *msg_ptr, size_t msg_len, unsigned msg_prio)
133 {
134 int res;
135 struct timespec ts;
136 do {
137 JASSERT(clock_gettime(CLOCK_REALTIME, &ts) != -1);
138 ts.tv_sec += 1000;
139 res = mq_timedsend(mqdes, msg_ptr, msg_len, msg_prio, &ts);
140 } while (res == -1 && errno == ETIMEDOUT);
141 return res;
142 }
143
144 extern "C"
145 ssize_t mq_receive(mqd_t mqdes, char *msg_ptr, size_t msg_len,
146 unsigned *msg_prio)
147 {
148 ssize_t res;
149 struct timespec ts;
150 do {
151 JASSERT(clock_gettime(CLOCK_REALTIME, &ts) != -1);
152 ts.tv_sec += 1000;
153 res = mq_timedreceive(mqdes, msg_ptr, msg_len, msg_prio, &ts);
154 } while (res == -1 && errno == ETIMEDOUT);
155 return res;
156 }
157
// Wait slice added to the current time by the timed send/receive wrappers:
// 0 seconds and 100,000,000 nanoseconds, i.e. 100 ms.
static struct timespec ts_100ms = { 0, 100 * 1000 * 1000 };
159 extern "C"
160 int mq_timedsend(mqd_t mqdes, const char *msg_ptr, size_t msg_len,
161 unsigned msg_prio, const struct timespec *abs_timeout)
162 {
163 struct timespec ts;
164 int ret = -1;
165 do {
166 DMTCP_PLUGIN_DISABLE_CKPT();
167 JASSERT(clock_gettime(CLOCK_REALTIME, &ts) != -1);
168 if (TIMESPEC_CMP(&ts, abs_timeout, <=)) {
169 TIMESPEC_ADD(&ts, &ts_100ms, &ts);
170 }
171 ret = _real_mq_timedsend(mqdes, msg_ptr, msg_len, msg_prio, &ts);
172 DMTCP_PLUGIN_ENABLE_CKPT();
173
174 if (ret != -1 ||(ret == -1 && errno != ETIMEDOUT)) {
175 return ret;
176 }
177 } while (TIMESPEC_CMP(&ts, abs_timeout, <));
178
179 errno = ETIMEDOUT;
180 return ret;
181 }
182
183 extern "C"
184 ssize_t mq_timedreceive(mqd_t mqdes, char *msg_ptr, size_t msg_len,
185 unsigned *msg_prio,
186 const struct timespec *abs_timeout)
187 {
188 struct timespec ts;
189 int ret = -1;
190 do {
191 DMTCP_PLUGIN_DISABLE_CKPT();
192 JASSERT(clock_gettime(CLOCK_REALTIME, &ts) != -1);
193 if (TIMESPEC_CMP(&ts, abs_timeout, <=)) {
194 TIMESPEC_ADD(&ts, &ts_100ms, &ts);
195 }
196 ret = _real_mq_timedreceive(mqdes, msg_ptr, msg_len, msg_prio, &ts);
197 DMTCP_PLUGIN_ENABLE_CKPT();
198
199 if (ret != -1 ||(ret == -1 && errno != ETIMEDOUT)) {
200 return ret;
201 }
202 } while (TIMESPEC_CMP(&ts, abs_timeout, <));
203
204 errno = ETIMEDOUT;
205 return ret;
206 }