/* [<][>][^][v][top][bottom][index][help] */
DEFINITIONS
This source file includes following definitions.
- markSocketNonBlocking
- markSocketBlocking
- instance
- destroy
- checkForPendingIncoming
- doReconnect
- openRestoreSocket
- registerIncoming
- registerOutgoing
- registerNSData
- registerNSData
- sendQueries
- debugPrint
1 /****************************************************************************
2 * Copyright (C) 2006-2010 by Jason Ansel, Kapil Arya, and Gene Cooperman *
3 * jansel@csail.mit.edu, kapil@ccs.neu.edu, gene@ccs.neu.edu *
4 * *
5 * This file is part of the dmtcp/src module of DMTCP (DMTCP:dmtcp/src). *
6 * *
7 * DMTCP:dmtcp/src is free software: you can redistribute it and/or *
8 * modify it under the terms of the GNU Lesser General Public License as *
9 * published by the Free Software Foundation, either version 3 of the *
10 * License, or (at your option) any later version. *
11 * *
12 * DMTCP:dmtcp/src is distributed in the hope that it will be useful, *
13 * but WITHOUT ANY WARRANTY; without even the implied warranty of *
14 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the *
15 * GNU Lesser General Public License for more details. *
16 * *
17 * You should have received a copy of the GNU Lesser General Public *
18 * License along with DMTCP:dmtcp/src. If not, see *
19 * <http://www.gnu.org/licenses/>. *
20 ****************************************************************************/
21
22 #include <fcntl.h>
23 #include <unistd.h>
24 #include <sys/socket.h>
25 #include <sys/socket.h>
26 #include <netinet/in.h>
27 #include <arpa/inet.h>
28
29 #include "dmtcp.h"
30 #include "protectedfds.h"
31 #include "util.h"
32 #include "jsocket.h"
33
34 #include "connectionrewirer.h"
35 #include "socketconnection.h"
36 #include "socketwrappers.h"
37
38 using namespace dmtcp;
39
40 // FIXME: IP6 Support disabled for now. However, we do go through the exercise
41 // of creating the restore socket and all.
42 // #define ENABLE_IP6_SUPPORT
43 static void markSocketNonBlocking(int sockfd)
44 {
45 // Remove O_NONBLOCK flag from listener socket
46 int flags = _real_fcntl(sockfd, F_GETFL, NULL);
47 JASSERT(flags != -1);
48 JASSERT(_real_fcntl(sockfd, F_SETFL,
49 (void*) (long) (flags | O_NONBLOCK)) != -1);
50 }
51
52 static void markSocketBlocking(int sockfd)
53 {
54 // Remove O_NONBLOCK flag from listener socket
55 int flags = _real_fcntl(sockfd, F_GETFL, NULL);
56 JASSERT(flags != -1);
57 JASSERT(_real_fcntl(sockfd, F_SETFL,
58 (void*) (long) (flags & ~O_NONBLOCK)) != -1);
59 }
60
61 static ConnectionRewirer *theRewirer = NULL;
62 ConnectionRewirer& ConnectionRewirer::instance()
63 {
64 if (theRewirer == NULL) {
65 theRewirer = new ConnectionRewirer();
66 }
67 return *theRewirer;
68 }
69
70 void ConnectionRewirer::destroy()
71 {
72 dmtcp_close_protected_fd(PROTECTED_RESTORE_IP4_SOCK_FD);
73 dmtcp_close_protected_fd(PROTECTED_RESTORE_IP6_SOCK_FD);
74 dmtcp_close_protected_fd(PROTECTED_RESTORE_UDS_SOCK_FD);
75
76 // Free up the object.
77 delete theRewirer;
78 theRewirer = NULL;
79 }
80
81 void ConnectionRewirer::checkForPendingIncoming(int restoreSockFd,
82 ConnectionListT *conList)
83 {
84 while (conList->size() > 0) {
85 int fd = _real_accept(restoreSockFd, NULL, NULL);
86 if (fd == -1 && (errno == EAGAIN || errno == EWOULDBLOCK)) {
87 return;
88 }
89 JASSERT(fd != -1) (JASSERT_ERRNO) .Text("Accept failed.");
90 ConnectionIdentifier id;
91 JASSERT(Util::readAll(fd, &id, sizeof id) == sizeof id);
92
93 iterator i = conList->find(id);
94 JASSERT(i != conList->end()) (id)
95 .Text("got unexpected incoming restore request");
96
97 Util::dupFds(fd, (i->second)->getFds());
98
99 JTRACE("restoring incoming connection") (id);
100 conList->erase(i);
101 }
102 }
103
104 void ConnectionRewirer::doReconnect()
105 {
106 iterator i;
107 for (i = _pendingOutgoing.begin(); i != _pendingOutgoing.end(); i++) {
108 const ConnectionIdentifier& id = i->first;
109 Connection *con = i->second;
110 struct RemoteAddr& remoteAddr = _remoteInfo[id];
111 int fd = con->getFds()[0];
112 errno = 0;
113 JASSERT(_real_connect(fd, (sockaddr*) &remoteAddr.addr, remoteAddr.len)
114 == 0)
115 (id) (JASSERT_ERRNO) .Text("failed to restore connection");
116
117 Util::writeAll(fd, &id, sizeof id);
118
119 checkForPendingIncoming(PROTECTED_RESTORE_IP4_SOCK_FD,
120 &_pendingIP4Incoming);
121 checkForPendingIncoming(PROTECTED_RESTORE_IP6_SOCK_FD,
122 &_pendingIP6Incoming);
123 checkForPendingIncoming(PROTECTED_RESTORE_UDS_SOCK_FD,
124 &_pendingUDSIncoming);
125 }
126 _pendingOutgoing.clear();
127 _remoteInfo.clear();
128
129 if (_pendingIP4Incoming.size() > 0) {
130 // Add O_NONBLOCK flag to the listener sockets.
131 markSocketBlocking(PROTECTED_RESTORE_IP4_SOCK_FD);
132 checkForPendingIncoming(PROTECTED_RESTORE_IP4_SOCK_FD,
133 &_pendingIP4Incoming);
134 _real_close(PROTECTED_RESTORE_IP4_SOCK_FD);
135 }
136 if (_pendingIP6Incoming.size() > 0) {
137 // Add O_NONBLOCK flag to the listener sockets.
138 markSocketBlocking(PROTECTED_RESTORE_IP6_SOCK_FD);
139 checkForPendingIncoming(PROTECTED_RESTORE_IP6_SOCK_FD,
140 &_pendingIP6Incoming);
141 _real_close(PROTECTED_RESTORE_IP6_SOCK_FD);
142 }
143 if (_pendingUDSIncoming.size() > 0) {
144 // Add O_NONBLOCK flag to the listener sockets.
145 markSocketBlocking(PROTECTED_RESTORE_UDS_SOCK_FD);
146 checkForPendingIncoming(PROTECTED_RESTORE_UDS_SOCK_FD,
147 &_pendingUDSIncoming);
148 _real_close(PROTECTED_RESTORE_UDS_SOCK_FD);
149 }
150 JTRACE("Closed restore sockets");
151 }
152
153 void ConnectionRewirer::openRestoreSocket(bool hasIPv4Sock,
154 bool hasIPv6Sock,
155 bool hasUNIXSock)
156 {
157 memset(&_ip4RestoreAddr, 0, sizeof(_ip4RestoreAddr));
158 memset(&_ip6RestoreAddr, 0, sizeof(_ip6RestoreAddr));
159 memset(&_udsRestoreAddr, 0, sizeof(_udsRestoreAddr));
160
161 // Open IP4 Restore Socket
162 if (hasIPv4Sock) {
163 jalib::JServerSocket restoreSocket(jalib::JSockAddr::ANY, 0);
164 JASSERT(restoreSocket.isValid());
165 restoreSocket.changeFd(PROTECTED_RESTORE_IP4_SOCK_FD);
166
167 // Setup restore socket for name service
168 _ip4RestoreAddr.sin_family = AF_INET;
169 dmtcp_get_local_ip_addr(&_ip4RestoreAddr.sin_addr);
170 _ip4RestoreAddr.sin_port = htons(restoreSocket.port());
171 _ip4RestoreAddrlen = sizeof(_ip4RestoreAddr);
172
173 JTRACE("opened listen socket") (restoreSocket.sockfd())
174 (inet_ntoa(_ip4RestoreAddr.sin_addr)) (ntohs(_ip4RestoreAddr.sin_port));
175 markSocketNonBlocking(PROTECTED_RESTORE_IP4_SOCK_FD);
176 }
177
178 // Open IP6 Restore Socket
179 if (hasIPv6Sock) {
180 int ip6fd = _real_socket(AF_INET6, SOCK_STREAM, 0);
181 JASSERT(ip6fd != -1) (JASSERT_ERRNO);
182
183 _ip6RestoreAddr.sin6_family = AF_INET6;
184 _ip6RestoreAddr.sin6_port = 0;
185 _ip6RestoreAddr.sin6_addr = in6addr_any;
186 _ip6RestoreAddrlen = sizeof(_ip6RestoreAddr);
187 JASSERT(_real_bind(ip6fd, (struct sockaddr*) &_ip6RestoreAddr,
188 _ip6RestoreAddrlen) == 0)
189 (JASSERT_ERRNO);
190 JASSERT(getsockname(ip6fd, (struct sockaddr*)&_ip6RestoreAddr,
191 &_ip6RestoreAddrlen) == 0)
192 (JASSERT_ERRNO);
193 JASSERT(_real_listen(ip6fd, 32) == 0) (JASSERT_ERRNO);
194 Util::changeFd(ip6fd, PROTECTED_RESTORE_IP6_SOCK_FD);
195
196 JTRACE("opened ip6 listen socket") (PROTECTED_RESTORE_IP6_SOCK_FD);
197 markSocketNonBlocking(PROTECTED_RESTORE_IP6_SOCK_FD);
198 }
199
200 // Open UDS Restore Socket
201 if (hasUNIXSock) {
202 ostringstream o;
203 o << dmtcp_get_uniquepid_str() << "_" << dmtcp_get_coordinator_timestamp();
204 string str = o.str();
205 int udsfd = _real_socket(AF_UNIX, SOCK_STREAM, 0);
206 JASSERT(udsfd != -1);
207 memset(&_udsRestoreAddr, 0, sizeof(struct sockaddr_un));
208 _udsRestoreAddr.sun_family = AF_UNIX;
209 strncpy(&_udsRestoreAddr.sun_path[1], str.c_str(), str.length());
210 _udsRestoreAddrlen = sizeof(sa_family_t) + str.length() + 1;
211 JASSERT(_real_bind(udsfd, (struct sockaddr*) &_udsRestoreAddr,
212 _udsRestoreAddrlen) == 0)
213 (JASSERT_ERRNO);
214 JASSERT(_real_listen(udsfd, 32) == 0) (JASSERT_ERRNO);
215 Util::changeFd(udsfd, PROTECTED_RESTORE_UDS_SOCK_FD);
216
217 JTRACE("opened UDS listen socket")
218 (PROTECTED_RESTORE_UDS_SOCK_FD) (&_udsRestoreAddr.sun_path[1]);
219 markSocketNonBlocking(PROTECTED_RESTORE_UDS_SOCK_FD);
220 }
221 }
222
223 void
224 ConnectionRewirer::registerIncoming(const ConnectionIdentifier& local,
225 Connection* con,
226 int domain)
227 {
228 JASSERT(domain == AF_INET || domain == AF_INET6 || domain == AF_UNIX)
229 (domain) .Text("Unsupported domain.");
230
231 if (domain == AF_INET) {
232 _pendingIP4Incoming[local] = con;
233 } else if (domain == AF_INET6) {
234 #ifdef ENABLE_IP6_SUPPORT
235 _pendingIP6Incoming[local] = con;
236 #else
237 _pendingIP4Incoming[local] = con;
238 #endif
239 } else if (domain == AF_UNIX) {
240 _pendingUDSIncoming[local] = con;
241 } else {
242 JASSERT(false) .Text("Not implemented");
243 }
244
245 JTRACE("announcing pending incoming") (local);
246 }
247
248 void
249 ConnectionRewirer::registerOutgoing(const ConnectionIdentifier& remote,
250 Connection* con)
251 {
252 _pendingOutgoing[remote] = con;
253 JTRACE("announcing pending outgoing") (remote);
254 }
255
256 void ConnectionRewirer::registerNSData()
257 {
258 registerNSData((void*)&_ip4RestoreAddr, _ip4RestoreAddrlen,
259 &_pendingIP4Incoming);
260 registerNSData((void*)&_ip6RestoreAddr, _ip6RestoreAddrlen,
261 &_pendingIP6Incoming);
262 registerNSData((void*)&_udsRestoreAddr, _udsRestoreAddrlen,
263 &_pendingUDSIncoming);
264 }
265
266 void ConnectionRewirer::registerNSData(void *addr,
267 socklen_t addrLen,
268 ConnectionListT *conList)
269 {
270 iterator i;
271 JASSERT(theRewirer != NULL);
272 for (i = conList->begin(); i != conList->end(); ++i) {
273 const ConnectionIdentifier& id = i->first;
274 dmtcp_send_key_val_pair_to_coordinator("Socket",
275 (const void *)&id,
276 (uint32_t) sizeof(id),
277 addr,
278 (uint32_t) addrLen);
279 /*
280 sockaddr_in *sn = (sockaddr_in*) &_restoreAddr;
281 unsigned short port = htons(sn->sin_port);
282 char *ip = inet_ntoa(sn->sin_addr);
283 JTRACE("Send NS information:")(id)(sn->sin_family)(port)(ip);
284 */
285 }
286 //debugPrint();
287 }
288
289 void ConnectionRewirer::sendQueries()
290 {
291 iterator i;
292 for (i = _pendingOutgoing.begin(); i != _pendingOutgoing.end(); ++i) {
293 const ConnectionIdentifier& id = i->first;
294 struct RemoteAddr remote;
295 uint32_t len = sizeof(remote.addr);
296 dmtcp_send_query_to_coordinator("Socket",
297 (const void *)&id, (uint32_t) sizeof(id),
298 &remote.addr, &len);
299 remote.len = len;
300 /*
301 sockaddr_in *sn = (sockaddr_in*) &remote.addr;
302 unsigned short port = htons(sn->sin_port);
303 char *ip = inet_ntoa(sn->sin_addr);
304 JTRACE("Send Queries. Get remote from coordinator:")(id)(sn->sin_family)(port)(ip);
305 */
306 _remoteInfo[id] = remote;
307 }
308 }
309
310 #if 0
311 void ConnectionRewirer::debugPrint() const
312 {
313 #ifdef DEBUG
314 ostringstream o;
315 o << "Pending Incoming:\n";
316 const_iterator i;
317 for (i = _pendingIncoming.begin(); i!=_pendingIncoming.end(); ++i) {
318 Connection *con = i->second;
319 o << i->first << " numFds=" << con->getFds().size()
320 << " firstFd=" << con->getFds()[0] << '\n';
321 }
322 o << "Pending Outgoing:\n";
323 for (i = _pendingOutgoing.begin(); i!=_pendingOutgoing.end(); ++i) {
324 Connection *con = i->second;
325 o << i->first << " numFds=" << con->getFds().size()
326 << " firstFd=" << con->getFds()[0] << '\n';
327 }
328 JNOTE("Pending connections") (o.str());
329 #endif
330 }
331 #endif