/* [<][>][^][v][top][bottom][index][help] */
DEFINITIONS
This source file includes the following definitions.
- dmtcp_SocketConnList_EventHook
- dmtcp_SocketConn_ProcessFdEvent
- instance
- drain
- preCkpt
- postRestart
- registerNSData
- sendQueries
- refill
- scanForPreExisting
- createDummyConnection
1
2 #include <unistd.h>
3 #include <sys/syscall.h>
4
5 #include "util.h"
6 #include "protectedfds.h"
7 #include "jfilesystem.h"
8 #include "socketconnection.h"
9 #include "socketconnlist.h"
10 #include "kernelbufferdrainer.h"
11 #include "connectionrewirer.h"
12
13 using namespace dmtcp;
14 static bool _hasIPv4Sock = false;
15 static bool _hasIPv6Sock = false;
16 static bool _hasUNIXSock = false;
17
18 void dmtcp_SocketConnList_EventHook(DmtcpEvent_t event, DmtcpEventData_t *data)
19 {
20 SocketConnList::instance().eventHook(event, data);
21 }
22
23 void dmtcp_SocketConn_ProcessFdEvent(int event, int arg1, int arg2)
24 {
25 if (event == SYS_close) {
26 SocketConnList::instance().processClose(arg1);
27 } else if (event == SYS_dup) {
28 SocketConnList::instance().processDup(arg1, arg2);
29 } else {
30 JASSERT(false);
31 }
32 }
33
34 static SocketConnList *socketConnList = NULL;
35 SocketConnList& SocketConnList::instance()
36 {
37 if (socketConnList == NULL) {
38 socketConnList = new SocketConnList();
39 }
40 return *socketConnList;
41 }
42
43 void SocketConnList::drain()
44 {
45 // First, let all the Connection prepare for drain
46 ConnectionList::drain();
47
48 //this will block until draining is complete
49 KernelBufferDrainer::instance().monitorSockets(DRAINER_CHECK_FREQ);
50 //handle disconnected sockets
51 const map<ConnectionIdentifier, vector<char> >& discn =
52 KernelBufferDrainer::instance().getDisconnectedSockets();
53 map<ConnectionIdentifier, vector<char> >::const_iterator it;
54 for (it = discn.begin(); it != discn.end(); it++) {
55 const ConnectionIdentifier& id = it->first;
56 TcpConnection *con =
57 (TcpConnection*) SocketConnList::instance().getConnection(id);
58 JTRACE("recreating disconnected socket") (id);
59
60 //reading from the socket, and taking the error, resulted in an
61 //implicit close().
62 //we will create a new, broken socket that is not closed
63 con->onError();
64 }
65 }
66
67 void SocketConnList::preCkpt()
68 {
69 #if HANDSHAKE_ON_CHECKPOINT == 1
70 //handshake is done after one barrier after drain
71 JTRACE("beginning handshakes");
72 DmtcpUniqueProcessId coordId = dmtcp_get_coord_id();
73 //must send first to avoid deadlock
74 //we are relying on OS buffers holding our message without blocking
75 for (iterator i = begin(); i != end(); ++i) {
76 Connection *con = i->second;
77 if (con->hasLock() && con->conType() == Connection::TCP) {
78 ((TcpConnection*)con)->doSendHandshakes(coordId);
79 }
80 }
81
82 //now receive
83 for (iterator i = begin(); i != end(); ++i) {
84 Connection *con = i->second;
85 if (con->hasLock() && con->conType() == Connection::TCP) {
86 ((TcpConnection*)con)->doRecvHandshakes(coordId);
87 }
88 }
89 JTRACE("handshaking done");
90 #endif
91 _hasIPv4Sock = _hasIPv6Sock = _hasUNIXSock = false;
92 // Now check if we have IPv4, IPv6, or UNIX domain sockets to restore.
93 for (iterator i = begin(); i != end(); ++i) {
94 Connection *con = i->second;
95 if (con->hasLock() && con->conType() == Connection::TCP) {
96 int domain = ((TcpConnection*)con)->sockDomain();
97 if (domain == AF_INET) {
98 _hasIPv4Sock = true;
99 } else if (domain == AF_INET6) {
100 _hasIPv6Sock = true;
101 } else if (domain == AF_UNIX) {
102 _hasUNIXSock = true;
103 }
104 }
105 }
106 }
107
108 void SocketConnList::postRestart()
109 {
110 ConnectionRewirer::instance().openRestoreSocket(_hasIPv4Sock, _hasIPv6Sock,
111 _hasUNIXSock);
112 ConnectionList::postRestart();
113 }
114
115 void SocketConnList::registerNSData(bool isRestart)
116 {
117 if (isRestart) {
118 ConnectionRewirer::instance().registerNSData();
119 }
120 ConnectionList::registerNSData(isRestart);
121 }
122
123 void SocketConnList::sendQueries(bool isRestart)
124 {
125 if (isRestart) {
126 ConnectionRewirer::instance().sendQueries();
127 ConnectionRewirer::instance().doReconnect();
128 ConnectionRewirer::destroy();
129 }
130 ConnectionList::sendQueries(isRestart);
131 }
132
133 void SocketConnList::refill(bool isRestart)
134 {
135 KernelBufferDrainer::instance().refillAllSockets();
136 ConnectionList::refill(isRestart);
137 }
138
139 void SocketConnList::scanForPreExisting()
140 {
141 // TODO: This is a hack when SLURM + MPI are used:
142 // when we use command
143 // srun/ibrun dmtcp_launch a.out
144 // inside the SLURM submission script, the MPI launching
145 // process will not run under the control of DMTCP. Instead,
146 // only the computing processes are. The launching process
147 // will create some sockets, and then create the computing
148 // processes. Hence the sockets are shared among the created
149 // processes at the time when dmtcp_launch is launched. DMTCP
150 // will treat these sockets as pre-existing sockets instead of
151 // shared sockets.
152 //
153 // In the future, we should generalize the processing of
154 // pre-existing fds. For example, at checkpoint time, determine
155 // which sockets are shared, regardless of whether they are
156 // pre-existing or not. This can be done by adding an extra round
157 // of leader election.
158
159 if (getenv("SLURM_JOBID") || (getenv("SLURM_JOB_ID"))) {
160 return;
161 }
162
163 // FIXME: Detect stdin/out/err fds to detect duplicates.
164 vector<int> fds = jalib::Filesystem::ListOpenFds();
165 for (size_t i = 0; i < fds.size(); ++i) {
166 int fd = fds[i];
167 if (!Util::isValidFd(fd)) continue;
168 if (dmtcp_is_protected_fd(fd)) continue;
169
170 string device = jalib::Filesystem::GetDeviceName(fd);
171
172 JTRACE("scanning pre-existing device") (fd) (device);
173 if (device == jalib::Filesystem::GetControllingTerm()) {
174 } else if(dmtcp_is_bq_file && dmtcp_is_bq_file(device.c_str())) {
175 } else if( fd <= 2 ){
176 } else if (Util::strStartsWith(device, "/")) {
177 } else {
178 JNOTE("found pre-existing socket... will not be restored")
179 (fd) (device);
180 TcpConnection* con = new TcpConnection(0, 0, 0);
181 con->markPreExisting();
182 add(fd, con);
183 }
184 }
185 }
186
187 Connection *SocketConnList::createDummyConnection(int type)
188 {
189 if (type == Connection::TCP) {
190 return new TcpConnection();
191 } else if (type == Connection::RAW){
192 return new RawSocketConnection();
193 }
194 return NULL;
195 }