/* [<][>][^][v][top][bottom][index][help] */
DEFINITIONS
This source file includes following definitions.
- _makeDeadSocket
- _peerType
- addSetsockopt
- restoreSocketOptions
- serialize
- _remotePeerId
- asTcp
- getMPISpawnPortNum
- isBlacklistedTcp
- onBind
- onListen
- onConnect
- _remotePeerId
- onError
- drain
- doSendHandshakes
- doRecvHandshakes
- refill
- postRestart
- sendHandshake
- recvHandshake
- serializeSubClass
- SocketConnection
- drain
- refill
- postRestart
- serializeSubClass
1 /****************************************************************************
2 * Copyright (C) 2006-2010 by Jason Ansel, Kapil Arya, and Gene Cooperman *
3 * jansel@csail.mit.edu, kapil@ccs.neu.edu, gene@ccs.neu.edu *
4 * *
5 * This file is part of the dmtcp/src module of DMTCP (DMTCP:dmtcp/src). *
6 * *
7 * DMTCP:dmtcp/src is free software: you can redistribute it and/or *
8 * modify it under the terms of the GNU Lesser General Public License as *
9 * published by the Free Software Foundation, either version 3 of the *
10 * License, or (at your option) any later version. *
11 * *
12 * DMTCP:dmtcp/src is distributed in the hope that it will be useful, *
13 * but WITHOUT ANY WARRANTY; without even the implied warranty of *
14 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the *
15 * GNU Lesser General Public License for more details. *
16 * *
17 * You should have received a copy of the GNU Lesser General Public *
18 * License along with DMTCP:dmtcp/src. If not, see *
19 * <http://www.gnu.org/licenses/>. *
20 ****************************************************************************/
21
22 #include <sys/ioctl.h>
23 #include <sys/select.h>
24 #include <sys/un.h>
25 #include <unistd.h>
26 #include <fcntl.h>
27 #include <linux/limits.h>
28 #include <arpa/inet.h>
29
30 #include "dmtcp.h"
31 #include "shareddata.h"
32 #include "util.h"
33 #include "jsocket.h"
34 #include "jassert.h"
35 #include "jfilesystem.h"
36 #include "jconvert.h"
37
38 #include "socketconnection.h"
39 #include "socketwrappers.h"
40 #include "kernelbufferdrainer.h"
41 #include "connectionrewirer.h"
42
43 #ifdef REALLY_VERBOSE_CONNECTION_CPP
44 static bool really_verbose = true;
45 #else
46 static bool really_verbose = false;
47 #endif
48
49 using namespace dmtcp;
50
51 //this function creates a socket that is in an error state
52 static int _makeDeadSocket(const char *refillData = NULL, ssize_t len = -1)
53 {
54 //it does it by creating a socket pair and closing one side
55 int sp[2] = {-1,-1};
56 JASSERT(_real_socketpair(AF_UNIX, SOCK_STREAM, 0, sp) == 0) (JASSERT_ERRNO)
57 .Text("socketpair() failed");
58 JASSERT(sp[0]>=0 && sp[1]>=0) (sp[0]) (sp[1])
59 .Text("socketpair() failed");
60 if (refillData != NULL) {
61 JASSERT(Util::writeAll(sp[1], refillData, len) == len);
62 }
63 _real_close(sp[1]);
64 if (really_verbose) {
65 JTRACE("Created dead socket.") (sp[0]);
66 }
67 return sp[0];
68 }
69
/**
 * Records the three arguments of the socket() call that created this
 * connection and marks the peer type as PEER_UNKNOWN.
 *
 * @param domain    Socket domain as passed to socket().
 * @param type      Socket type as passed to socket().
 * @param protocol  Socket protocol as passed to socket().
 */
SocketConnection::SocketConnection(int domain, int type, int protocol)
  : _sockDomain(domain)
  , _sockType(type)
  , _sockProtocol(protocol)
  , _peerType(PEER_UNKNOWN)
{ }
76
77 void SocketConnection::addSetsockopt(int level, int option,
78 const char* value, int len)
79 {
80 _sockOptions[level][option] = jalib::JBuffer(value, len);
81 }
82
83 void SocketConnection::restoreSocketOptions(vector<int>& fds)
84 {
85 typedef map<int64_t, map< int64_t, jalib::JBuffer> >::iterator levelIterator;
86 typedef map<int64_t, jalib::JBuffer>::iterator optionIterator;
87
88 for (levelIterator lvl = _sockOptions.begin();
89 lvl!=_sockOptions.end(); ++lvl) {
90 for (optionIterator opt = lvl->second.begin();
91 opt!=lvl->second.end(); ++opt) {
92 JTRACE("Restoring socket option.")
93 (fds[0]) (opt->first) (opt->second.size());
94 int ret = _real_setsockopt(fds[0], lvl->first, opt->first,
95 opt->second.buffer(),
96 opt->second.size());
97 JASSERT(ret == 0) (JASSERT_ERRNO) (fds[0])
98 (lvl->first) (opt->first) (opt->second.size())
99 .Text("Restoring setsockopt failed.");
100 }
101 }
102 }
103
/**
 * Writes or reads the socket parameters and the recorded socket options
 * through the given serializer.
 *
 * Stream layout, in order: _sockDomain, _sockType, _sockProtocol, _peerType,
 * the number of option levels (emitted twice: once as numSockOpts and once
 * as numLvl), then for each level its value and option count, and for each
 * option its type, payload length and payload bytes.
 *
 * @param o  Serializer; o.isWriter() selects the write or the read path.
 */
void SocketConnection::serialize(jalib::JBinarySerializer& o)
{
  JSERIALIZE_ASSERT_POINT("SocketConnection");
  o & _sockDomain & _sockType & _sockProtocol & _peerType;

  JSERIALIZE_ASSERT_POINT("SocketOptions:");
  // This count is part of the stream layout; on the read path its value is
  // not used after being read.
  uint64_t numSockOpts = _sockOptions.size();
  o & numSockOpts;
  if (o.isWriter()) {
    //JTRACE("TCP Serialize ") (_type) (_id.conId());
    typedef map< int64_t, map< int64_t, jalib::JBuffer > >::iterator levelIterator;
    typedef map< int64_t, jalib::JBuffer >::iterator optionIterator;

    // Number of levels that follow; drives the outer loop of the reader.
    uint64_t numLvl = _sockOptions.size();
    o & numLvl;

    for (levelIterator lvl = _sockOptions.begin();
         lvl!=_sockOptions.end(); ++lvl) {
      int64_t lvlVal = lvl->first;
      uint64_t numOpts = lvl->second.size();

      JSERIALIZE_ASSERT_POINT("Lvl");

      o & lvlVal & numOpts;

      for (optionIterator opt = lvl->second.begin();
           opt!=lvl->second.end(); ++opt) {
        int64_t optType = opt->first;
        jalib::JBuffer& buffer = opt->second;
        int64_t bufLen = buffer.size();

        JSERIALIZE_ASSERT_POINT("Opt");

        // Length first, then the raw payload, so the reader can size its
        // buffer before reading the bytes.
        o & optType & bufLen;
        o.readOrWrite(buffer.buffer(), bufLen);
      }
    }
  } else {
    uint64_t numLvl = 0;
    o & numLvl;

    // Entries read here are stored into _sockOptions by key; the map is not
    // cleared beforehand.
    while (numLvl-- > 0) {
      int64_t lvlVal = -1;
      int64_t numOpts = 0;

      JSERIALIZE_ASSERT_POINT("Lvl");

      o & lvlVal & numOpts;

      while (numOpts-- > 0) {
        int64_t optType = -1;
        int64_t bufLen = -1;

        JSERIALIZE_ASSERT_POINT("Opt");

        o & optType & bufLen;

        // Allocate a buffer of the recorded length and fill it from the
        // stream.
        jalib::JBuffer buffer(bufLen);
        o.readOrWrite(buffer.buffer(), bufLen);

        _sockOptions[lvlVal][optType]=buffer;
      }
    }
  }

  JSERIALIZE_ASSERT_POINT("EndSockOpts");
}
171
172 /*****************************************************************************
173 * TCP Connection
174 *****************************************************************************/
175
176 /*onSocket*/
177 TcpConnection::TcpConnection(int domain, int type, int protocol)
178 : Connection(TCP_CREATED)
179 , SocketConnection(domain, type, protocol)
180 , _listenBacklog(-1)
181 , _bindAddrlen(0)
182 , _remotePeerId(ConnectionIdentifier::null())
183 {
184 if (domain != -1) {
185 // Sometimes _sockType contains SOCK_CLOEXEC/SOCK_NONBLOCK flags.
186 if ((type & 077) == SOCK_DGRAM) {
187 JWARNING(false) (type)
188 .Text("Datagram Sockets not supported. "
189 "Hopefully, this is a short lived connection!");
190 } else {
191 JWARNING((domain == AF_INET || domain == AF_UNIX || domain == AF_INET6)
192 && (type & 077) == SOCK_STREAM)
193 (domain) (type) (protocol);
194 }
195 JTRACE("Creating TcpConnection.") (id()) (domain) (type) (protocol);
196 }
197 memset(&_bindAddr, 0, sizeof _bindAddr);
198 }
199
/** Returns this object as a TcpConnection reference. */
TcpConnection& TcpConnection::asTcp()
{
  return *this;
}
204
205 #ifdef STAMPEDE_MPISPAWN_FIX
/**
 * Extracts the port number from an environment variable whose value has the
 * form "hostname:port" (PMI_PORT uses this form).
 *
 * @param envVar  Name of the environment variable to read.
 * @return The result of atoi() on the text following the first ':'; 0 when
 *         the variable is not set or its value contains no ':'.
 */
static int
getMPISpawnPortNum(const char* envVar)
{
  // The string returned by getenv() belongs to the environment, so it is
  // only ever read here, through a const pointer.
  const char *cursor = getenv(envVar);
  if (cursor == NULL) {
    return 0;
  }

  // Advance to the first ':' (or to the terminating NUL if there is none).
  while (*cursor != '\0' && *cursor != ':') {
    ++cursor;
  }

  return (*cursor == ':') ? atoi(cursor + 1) : 0;
}
217 #endif
218
219 bool TcpConnection::isBlacklistedTcp(const sockaddr* saddr, socklen_t len)
220 {
221 JASSERT(saddr != NULL);
222 if (len <= sizeof(saddr->sa_family)) {
223 return false;
224 }
225
226 if (saddr->sa_family == AF_INET) {
227 struct sockaddr_in* addr =(sockaddr_in*)saddr;
228 // Ports 389 and 636 are the well-known ports in /etc/services that
229 // are reserved for LDAP. Bash continues to maintain a connection to
230 // LDAP, leading to problems at restart time. So, we discover the LDAP
231 // remote addresses, and turn them into dead sockets at restart time.
232 //However, libc.so:getpwuid() can call libnss_ldap.so which calls
233 // libldap-2.4.so to create the LDAP socket while evading our connect
234 // wrapper.
235 int blacklistedRemotePorts[] = {53, // DNS Server
236 389, 636, // LDAP
237 -1};
238 #ifdef STAMPEDE_MPISPAWN_FIX
239 int mpispawnPort = getMPISpawnPortNum("PMI_PORT");
240 JTRACE("PMI_PORT port") (mpispawnPort) (ntohs(addr->sin_port));
241 JASSERT(mpispawnPort != 0).Text("PMI_PORT not found");
242 if (ntohs(addr->sin_port) == mpispawnPort) {
243 JTRACE("PMI_PORT port found") (mpispawnPort);
244 return true;
245 }
246 #endif
247 for (size_t i = 0; blacklistedRemotePorts[i] != -1; i++) {
248 if (ntohs(addr->sin_port) == blacklistedRemotePorts[i]) {
249 JTRACE("LDAP port found") (ntohs(addr->sin_port))
250 (blacklistedRemotePorts[0]) (blacklistedRemotePorts[1]);
251 return true;
252 }
253 }
254 } else if (saddr->sa_family == AF_UNIX) {
255 struct sockaddr_un *uaddr = (struct sockaddr_un *) saddr;
256 static string blacklist[] = {""};
257 for (size_t i = 0; blacklist[i] != ""; i++) {
258 if (Util::strStartsWith(uaddr->sun_path, blacklist[i].c_str()) ||
259 Util::strStartsWith(&uaddr->sun_path[1], blacklist[i].c_str())) {
260 JTRACE("Blacklisted socket address") (uaddr->sun_path);
261 return true;
262 }
263 }
264 }
265
266 // FIXME: Consider adding configure or dmtcp_launch option to disable
267 // all remote connections. Handy as quick test for special cases.
268 return false;
269 }
270
271 void TcpConnection::onBind(const struct sockaddr* addr, socklen_t len)
272 {
273 if (really_verbose) {
274 JTRACE("Binding.") (id()) (len);
275 }
276
277 // If the bind succeeded, we do not need any additional assert.
278 //JASSERT(_type == TCP_CREATED) (_type) (id())
279 // .Text("Binding a socket in use????");
280
281 if (_sockDomain == AF_UNIX && addr != NULL) {
282 JASSERT(len <= sizeof _bindAddr) (len) (sizeof _bindAddr)
283 .Text("That is one huge sockaddr buddy.");
284 _bindAddrlen = len;
285 memcpy(&_bindAddr, addr, len);
286 } else {
287 _bindAddrlen = sizeof(_bindAddr);
288 // Do not rely on the address passed on to bind as it may contain port 0
289 // which allows the OS to give any unused port. Thus we look ourselves up
290 // using getsockname.
291 JASSERT(getsockname(_fds[0], (struct sockaddr *)&_bindAddr, &_bindAddrlen) == 0)
292 (JASSERT_ERRNO);
293 }
294 _type = TCP_BIND;
295 }
296
297 void TcpConnection::onListen(int backlog)
298 {
299 /* The application didn't issue a bind() call; the kernel will assign
300 * a random address in this case. Call the regular onBind() post-
301 * processing function which will save the address returned by the kernel
302 * and change the state of the socket connection to TCP_BIND.
303 */
304 if (_type == TCP_CREATED) {
305 onBind(NULL, 0);
306 }
307
308 if (really_verbose) {
309 JTRACE("Listening.") (id()) (backlog);
310 }
311 JASSERT(_type == TCP_BIND) (_type) (id())
312 .Text("Listening on a non-bind()ed socket????");
313 // A -1 backlog is not an error.
314 //JASSERT(backlog > 0) (backlog)
315 //.Text("That is an odd backlog????");
316
317 _type = TCP_LISTEN;
318 _listenBacklog = backlog;
319 }
320
321 void TcpConnection::onConnect(const struct sockaddr *addr, socklen_t len)
322 {
323 if (really_verbose) {
324 JTRACE("Connecting.") (id());
325 }
326 JWARNING(_type == TCP_CREATED || _type == TCP_BIND) (_type) (id())
327 .Text("Connecting with an in-use socket????");
328
329 if (addr != NULL && isBlacklistedTcp(addr, len)) {
330 _type = TCP_EXTERNAL_CONNECT;
331 _connectAddrlen = len;
332 memcpy(&_connectAddr, addr, len);
333 } else {
334 _type = TCP_CONNECT;
335 }
336 }
337
/*onAccept*/
/**
 * Builds a connection object in state TCP_ACCEPT for a socket returned by
 * accept().
 *
 * The socket domain, type and protocol are taken from the parent
 * connection; the backlog starts at -1, the bind address is zeroed and the
 * remote peer id is set to the given value.
 *
 * @param parent  Connection whose socket parameters are copied.
 * @param remote  Identifier stored as this connection's remote peer id.
 */
TcpConnection::TcpConnection(const TcpConnection& parent,
                             const ConnectionIdentifier& remote)
  : Connection(TCP_ACCEPT)
  , SocketConnection(parent._sockDomain, parent._sockType, parent._sockProtocol)
  , _listenBacklog(-1)
  , _bindAddrlen(0)
  , _remotePeerId(remote)
{
  if (really_verbose) {
    JTRACE("Accepting.") (id()) (parent.id()) (remote);
  }

  // The parent's state is deliberately not asserted to be TCP_LISTEN.
  // JASSERT(parent._type == TCP_LISTEN) (parent._type) (parent.id())
  //         .Text("Accepting from a non listening socket????");
  memset(&_bindAddr, 0, sizeof _bindAddr);
}
355
356 void TcpConnection::onError()
357 {
358 JTRACE("Error.") (id());
359 _type = TCP_ERROR;
360 JTRACE("Creating dead socket.") (_fds[0]) (_fds.size());
361 const vector<char>& buffer =
362 KernelBufferDrainer::instance().getDrainedData(_id);
363 Util::dupFds(_makeDeadSocket(&buffer[0], buffer.size()), _fds);
364 }
365
366 void TcpConnection::drain()
367 {
368 JASSERT(_fds.size() > 0) (id());
369
370 if ((_fcntlFlags & O_ASYNC) != 0) {
371 if (really_verbose) {
372 JTRACE("Removing O_ASYNC flag during checkpoint.") (_fds[0]) (id());
373 }
374 errno = 0;
375 JASSERT(fcntl(_fds[0],F_SETFL,_fcntlFlags & ~O_ASYNC) == 0)
376 (JASSERT_ERRNO) (_fds[0]) (id());
377 }
378
379 if (dmtcp_no_coordinator()) {
380 markExternalConnect();
381 }
382
383 switch (_type) {
384 case TCP_ERROR:
385 // Treat TCP_ERROR as a regular socket for draining purposes. There still
386 // might be some stale data on it.
387 case TCP_CONNECT:
388 case TCP_ACCEPT:
389 JTRACE("Will drain socket") (_hasLock) (_fds[0]) (_id) (_remotePeerId);
390 KernelBufferDrainer::instance().beginDrainOf(_fds[0], _id);
391 break;
392 case TCP_LISTEN:
393 KernelBufferDrainer::instance().addListenSocket(_fds[0]);
394 break;
395 case TCP_BIND:
396 JWARNING(_type != TCP_BIND) (_fds[0])
397 .Text("If there are pending connections on this socket,\n"
398 " they won't be checkpointed because"
399 " it is not yet in a listen state.");
400 break;
401 case TCP_EXTERNAL_CONNECT:
402 JTRACE("Socket to External Process, won't be drained") (_fds[0]);
403 break;
404 }
405 }
406
407 void TcpConnection::doSendHandshakes(const ConnectionIdentifier& coordId)
408 {
409 switch (_type) {
410 case TCP_CONNECT:
411 case TCP_ACCEPT:
412 JTRACE("Sending handshake ...") (id()) (_fds[0]);
413 sendHandshake(_fds[0], coordId);
414 break;
415 case TCP_EXTERNAL_CONNECT:
416 JTRACE("Socket to External Process, skipping handshake send") (_fds[0]);
417 break;
418 }
419 }
420
421 void TcpConnection::doRecvHandshakes(const ConnectionIdentifier& coordId)
422 {
423 switch (_type) {
424 case TCP_CONNECT:
425 case TCP_ACCEPT:
426 recvHandshake(_fds[0], coordId);
427 JTRACE("Received handshake.") (id()) (_remotePeerId) (_fds[0]);
428 break;
429 case TCP_EXTERNAL_CONNECT:
430 JTRACE("Socket to External Process, skipping handshake recv") (_fds[0]);
431 break;
432 }
433 }
434
435 void TcpConnection::refill(bool isRestart)
436 {
437 if ((_fcntlFlags & O_ASYNC) != 0) {
438 JTRACE("Re-adding O_ASYNC flag.") (_fds[0]) (id());
439 restoreSocketOptions(_fds);
440 } else if (isRestart && _sockDomain != AF_INET6 &&
441 _type != TCP_EXTERNAL_CONNECT) {
442 restoreSocketOptions(_fds);
443 }
444 }
445
446 void TcpConnection::postRestart()
447 {
448 int fd;
449 JASSERT(_fds.size() > 0);
450 switch (_type) {
451 case TCP_PREEXISTING:
452 case TCP_INVALID:
453 case TCP_EXTERNAL_CONNECT:
454 JTRACE("Creating dead socket.") (_fds[0]) (_fds.size());
455 Util::dupFds(_makeDeadSocket(), _fds);
456 break;
457
458 case TCP_ERROR:
459 // Disconnected socket. Need to refill the drained data
460 {
461 const vector<char>& buffer =
462 KernelBufferDrainer::instance().getDrainedData(_id);
463 Util::dupFds(_makeDeadSocket(&buffer[0], buffer.size()), _fds);
464 }
465 break;
466
467 case TCP_CREATED:
468 case TCP_BIND:
469 case TCP_LISTEN:
470 // Sometimes _sockType contains SOCK_CLOEXEC/SOCK_NONBLOCK flags.
471 JWARNING((_sockDomain == AF_INET || _sockDomain == AF_UNIX ||
472 _sockDomain == AF_INET6) && (_sockType & 077) == SOCK_STREAM)
473 (id()) (_sockDomain) (_sockType) (_sockProtocol)
474 .Text("Socket type not yet [fully] supported.");
475
476 if (really_verbose) {
477 JTRACE("Restoring socket.") (id()) (_fds[0]);
478 }
479
480 fd = _real_socket(_sockDomain, _sockType, _sockProtocol);
481 JASSERT(fd != -1) (JASSERT_ERRNO);
482 Util::dupFds(fd, _fds);
483
484 if (_type == TCP_CREATED) break;
485
486 if (_sockDomain == AF_UNIX &&
487 _bindAddrlen > sizeof(_bindAddr.ss_family)) {
488 struct sockaddr_un *uaddr = (sockaddr_un*) &_bindAddr;
489 if (uaddr->sun_path[0] != '\0') {
490 JTRACE("Unlinking stale unix domain socket.") (uaddr->sun_path);
491 JWARNING(unlink(uaddr->sun_path) == 0) (uaddr->sun_path);
492 }
493 }
494
495 /*
496 * During restart, some socket options must be restored(using
497 * setsockopt) before the socket is used(bind etc.), otherwise we might
498 * not be able to restore them at all. One such option is set in the
499 * following way for IPV6 family:
500 * setsockopt(sd, IPPROTO_IPV6, IPV6_V6ONLY,...)
501 * This fix works for now. A better approach would be to restore the
502 * socket options in the order in which they are set by the user
503 * program. This fix solves a bug that caused Open MPI to fail to
504 * restart under DMTCP.
505 * --Kapil
506 */
507
508 if (_sockDomain == AF_INET6) {
509 JTRACE("Restoring some socket options before binding.");
510 typedef map< int64_t,
511 map< int64_t, jalib::JBuffer > >::iterator levelIterator;
512 typedef map< int64_t, jalib::JBuffer >::iterator optionIterator;
513
514 for (levelIterator lvl = _sockOptions.begin();
515 lvl!=_sockOptions.end(); ++lvl) {
516 if (lvl->first == IPPROTO_IPV6) {
517 for (optionIterator opt = lvl->second.begin();
518 opt!=lvl->second.end(); ++opt) {
519 if (opt->first == IPV6_V6ONLY) {
520 if (really_verbose) {
521 JTRACE("Restoring socket option.")
522 (_fds[0]) (opt->first) (opt->second.size());
523 }
524 int ret = _real_setsockopt(_fds[0], lvl->first, opt->first,
525 opt->second.buffer(),
526 opt->second.size());
527 JASSERT(ret == 0) (JASSERT_ERRNO) (_fds[0]) (lvl->first)
528 (opt->first) (opt->second.buffer()) (opt->second.size())
529 .Text("Restoring setsockopt failed.");
530 }
531 }
532 }
533 }
534 }
535
536 if (really_verbose) {
537 JTRACE("Binding socket.") (id());
538 }
539 errno = 0;
540 JWARNING(_real_bind(_fds[0], (sockaddr*) &_bindAddr,_bindAddrlen) == 0)
541 (JASSERT_ERRNO) (id()) .Text("Bind failed.");
542 if (_type == TCP_BIND) break;
543
544 if (really_verbose) {
545 JTRACE("Listening socket.") (id());
546 }
547 errno = 0;
548 JWARNING(_real_listen(_fds[0], _listenBacklog) == 0)
549 (JASSERT_ERRNO) (id()) (_listenBacklog) .Text("listen failed.");
550 if (_type == TCP_LISTEN) break;
551
552 break;
553
554 case TCP_ACCEPT:
555 JASSERT(!_remotePeerId.isNull()) (id()) (_remotePeerId) (_fds[0])
556 .Text("Can't restore a TCP_ACCEPT socket with null acceptRemoteId.\n"
557 " Perhaps handshake went wrong?");
558
559 JTRACE("registerIncoming") (id()) (_remotePeerId) (_fds[0]);
560 ConnectionRewirer::instance().registerIncoming(id(), this, _sockDomain);
561 break;
562
563 case TCP_CONNECT:
564 #ifdef ENABLE_IP6_SUPPORT
565 fd = _real_socket(_sockDomain, _sockType, _sockProtocol);
566 #else
567 fd = _real_socket(_sockDomain == AF_INET6 ? AF_INET : _sockDomain,
568 _sockType, _sockProtocol);
569 #endif
570 JASSERT(fd != -1) (JASSERT_ERRNO);
571 Util::dupFds(fd, _fds);
572 if (_bindAddrlen != 0) {
573 JWARNING(_real_bind(_fds[0], (sockaddr*)&_bindAddr, _bindAddrlen) != -1)
574 (JASSERT_ERRNO);
575 }
576 JTRACE("registerOutgoing") (id()) (_remotePeerId) (_fds[0]);
577 ConnectionRewirer::instance().registerOutgoing(_remotePeerId, this);
578 break;
579 // case TCP_EXTERNAL_CONNECT:
580 // int sockFd = _real_socket(_sockDomain, _sockType, _sockProtocol);
581 // JASSERT(sockFd >= 0);
582 // JASSERT(_real_dup2(sockFd, _fds[0]) == _fds[0]);
583 // JWARNING(0 == _real_connect(sockFd,(sockaddr*) &_connectAddr,
584 // _connectAddrlen))
585 // (_fds[0]) (JASSERT_ERRNO)
586 // .Text("Unable to connect to external process");
587 // break;
588 }
589 }
590
/**
 * Sends a HANDSHAKE message over the given descriptor.
 *
 * @param remotefd  Descriptor of the socket to write the message to.
 * @param coordId   Coordinator identifier placed in the message.
 */
void TcpConnection::sendHandshake(int remotefd,
                                  const ConnectionIdentifier& coordId)
{
  jalib::JSocket remote(remotefd);
  ConnMsg msg(ConnMsg::HANDSHAKE);
  // The message identifies this connection and the coordinator.
  msg.from = id();
  msg.coordId = coordId;
  remote << msg;
}
600
601 void TcpConnection::recvHandshake(int remotefd,
602 const ConnectionIdentifier& coordId)
603 {
604 jalib::JSocket remote(remotefd);
605 ConnMsg msg;
606 msg.poison();
607 remote >> msg;
608
609 msg.assertValid(ConnMsg::HANDSHAKE);
610 JASSERT(msg.coordId == coordId) (msg.coordId) (coordId)
611 .Text("Peer has a different dmtcp_coordinator than us!\n"
612 " It must be the same.");
613
614 if (_remotePeerId.isNull()) {
615 //first time
616 _remotePeerId = msg.from;
617 JASSERT(!_remotePeerId.isNull())
618 .Text("Read handshake with invalid 'from' field.");
619 } else {
620 //next time
621 JASSERT(_remotePeerId == msg.from)
622 (_remotePeerId) (msg.from)
623 .Text("Read handshake with a different 'from' field"
624 " than a previous handshake.");
625 }
626 }
627
/**
 * Serializes the TCP-specific fields (_listenBacklog, _bindAddrlen,
 * _bindAddr, _remotePeerId) and then the SocketConnection part.
 *
 * @param o  Serializer used for reading or writing.
 */
void TcpConnection::serializeSubClass(jalib::JBinarySerializer& o)
{
  JSERIALIZE_ASSERT_POINT("TcpConnection");
  o & _listenBacklog & _bindAddrlen & _bindAddr & _remotePeerId;
  SocketConnection::serialize(o);
}
634
635 /*****************************************************************************
636 * RawSocket Connection
637 *****************************************************************************/
/*onSocket*/
/**
 * Builds a connection object of kind RAW.
 *
 * @param domain    Socket domain; must be -1 or AF_NETLINK.
 * @param type      Socket type; must be -1 or share bits with SOCK_RAW.
 * @param protocol  Socket protocol as passed to socket().
 */
RawSocketConnection::RawSocketConnection(int domain, int type, int protocol)
  : Connection(RAW)
  , SocketConnection(domain, type, protocol)
{
  // NOTE(review): this is a bitwise test, not an equality test on the base
  // type. If SOCK_RAW has more than one bit set (presumably 3 on Linux),
  // other socket types pass too -- confirm whether that is intended.
  JASSERT(type == -1 ||(type & SOCK_RAW));
  JASSERT(domain == -1 || domain == AF_NETLINK) (domain)
    .Text("Only Netlink raw socket supported");
  JTRACE("Creating Raw socket.") (id()) (domain) (type) (protocol);
}
648
649 void RawSocketConnection::drain()
650 {
651 JASSERT(_fds.size() > 0) (id());
652
653 if ((_fcntlFlags & O_ASYNC) != 0) {
654 if (really_verbose) {
655 JTRACE("Removing O_ASYNC flag during checkpoint.") (_fds[0]) (id());
656 }
657 errno = 0;
658 JASSERT(fcntl(_fds[0], F_SETFL, _fcntlFlags & ~O_ASYNC) == 0)
659 (JASSERT_ERRNO) (_fds[0]) (id());
660 }
661 }
662
663 void RawSocketConnection::refill(bool isRestart)
664 {
665 if ((_fcntlFlags & O_ASYNC) != 0) {
666 JTRACE("Re-adding O_ASYNC flag.") (_fds[0]) (id());
667 restoreSocketOptions(_fds);
668 } else if (isRestart) {
669 restoreSocketOptions(_fds);
670 }
671 }
672
673 void RawSocketConnection::postRestart()
674 {
675 JASSERT(_fds.size() > 0);
676 if (really_verbose) {
677 JTRACE("Restoring socket.") (id()) (_fds[0]);
678 }
679
680 int sockfd = _real_socket(_sockDomain,_sockType,_sockProtocol);
681 JASSERT(sockfd != -1);
682 Util::dupFds(sockfd, _fds);
683 }
684
/**
 * Serializes a raw socket connection; it has no fields of its own beyond
 * the SocketConnection part.
 *
 * @param o  Serializer used for reading or writing.
 */
void RawSocketConnection::serializeSubClass(jalib::JBinarySerializer& o)
{
  JSERIALIZE_ASSERT_POINT("RawSocketConnection");
  SocketConnection::serialize(o);
}