/* [<][>][^][v][top][bottom][index][help] */
DEFINITIONS
This source file includes the following definitions.
- scaleSendBuffers
- instance
- onConnect
- onData
- onDisconnect
- onTimeoutInterval
- beginDrainOf
- refillAllSockets
- getDrainedData
1 /****************************************************************************
2 * Copyright (C) 2006-2010 by Jason Ansel, Kapil Arya, and Gene Cooperman *
3 * jansel@csail.mit.edu, kapil@ccs.neu.edu, gene@ccs.neu.edu *
4 * *
5 * This file is part of the dmtcp/src module of DMTCP (DMTCP:dmtcp/src). *
6 * *
7 * DMTCP:dmtcp/src is free software: you can redistribute it and/or *
8 * modify it under the terms of the GNU Lesser General Public License as *
9 * published by the Free Software Foundation, either version 3 of the *
10 * License, or (at your option) any later version. *
11 * *
12 * DMTCP:dmtcp/src is distributed in the hope that it will be useful, *
13 * but WITHOUT ANY WARRANTY; without even the implied warranty of *
14 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the *
15 * GNU Lesser General Public License for more details. *
16 * *
17 * You should have received a copy of the GNU Lesser General Public *
18 * License along with DMTCP:dmtcp/src. If not, see *
19 * <http://www.gnu.org/licenses/>. *
20 ****************************************************************************/
21
22 #include "kernelbufferdrainer.h"
23 #include "connectionlist.h"
24 #include "socketwrappers.h"
25 #include "../jalib/jassert.h"
26 #include "../jalib/jbuffer.h"
27 #include "util.h"
28
// Sentinel that beginDrainOf() queues for writing on every socket being
// drained; onTimeoutInterval() treats a drained stream that ends with this
// sentinel as complete.
#define SOCKET_DRAIN_MAGIC_COOKIE_STR "[dmtcp{v0<DRAIN!"

using namespace dmtcp;

// The cookie bytes actually written and matched. Both the writer and the
// matcher use sizeof(theMagicDrainCookie), which includes the terminating NUL,
// so the NUL byte is part of the on-the-wire cookie.
const char theMagicDrainCookie[] = SOCKET_DRAIN_MAGIC_COOKIE_STR;
34
35 void scaleSendBuffers(int fd, double factor)
36 {
37 int size;
38 unsigned len = sizeof(size);
39 JASSERT(getsockopt(fd, SOL_SOCKET, SO_SNDBUF, (void *)&size, &len) == 0);
40
41 // getsockopt returns doubled size. So, if we pass the same value to
42 // setsockopt, it would double the buffer size.
43 int newSize = static_cast<int>(size * factor / 2);
44 len = sizeof(newSize);
45 JASSERT(_real_setsockopt(fd, SOL_SOCKET, SO_SNDBUF, (void *)&newSize, len) == 0);
46 }
47
48 static KernelBufferDrainer *theDrainer = NULL;
49 KernelBufferDrainer& KernelBufferDrainer::instance()
50 {
51 if (theDrainer == NULL) {
52 theDrainer = new KernelBufferDrainer();
53 }
54 return *theDrainer;
55 }
56
57 void KernelBufferDrainer::onConnect(const jalib::JSocket& sock,
58 const struct sockaddr* remoteAddr,
59 socklen_t remoteLen)
60 {
61 JWARNING(false) (sock.sockfd())
62 .Text("we don't yet support checkpointing non-accepted connections..."
63 " restore will likely fail.. closing connection");
64 jalib::JSocket(sock).close();
65 }
66
67 void KernelBufferDrainer::onData(jalib::JReaderInterface* sock)
68 {
69 vector<char>& buffer = _drainedData[sock->socket().sockfd() ];
70 buffer.resize(buffer.size() + sock->bytesRead());
71 int startIdx = buffer.size() - sock->bytesRead();
72 memcpy(&buffer[startIdx],sock->buffer(),sock->bytesRead());
73 // JTRACE("got buffer chunk") (sock->bytesRead());
74 sock->reset();
75 }
76
77 void KernelBufferDrainer::onDisconnect(jalib::JReaderInterface* sock)
78 {
79 int fd;
80 errno = 0;
81 fd = sock->socket().sockfd();
82 //check if this was on purpose
83 if (fd < 0) return;
84 JTRACE("found disconnected socket... marking it dead")
85 (fd) (_reverseLookup[fd]) (JASSERT_ERRNO);
86 _disconnectedSockets[_reverseLookup[fd]] = _drainedData[fd];
87 // _drainedData is used to refill socket buffers. Remove the disconnected
88 // socket from this list. Disconnected sockets are refilled when they are
89 // recreated by _makeDeadSocket().
90 _drainedData.erase(fd);
91 }
92
93 void KernelBufferDrainer::onTimeoutInterval()
94 {
95 int count = 0;
96 for (size_t i = 0; i < _dataSockets.size();++i)
97 {
98 if (_dataSockets[i]->bytesRead() > 0) onData(_dataSockets[i]);
99 vector<char>& buffer = _drainedData[_dataSockets[i]->socket().sockfd() ];
100 if (buffer.size() >= sizeof(theMagicDrainCookie)
101 && memcmp(&buffer[buffer.size() - sizeof(theMagicDrainCookie)],
102 theMagicDrainCookie,
103 sizeof(theMagicDrainCookie)) == 0) {
104 buffer.resize(buffer.size() - sizeof(theMagicDrainCookie));
105 JTRACE("buffer drain complete") (_dataSockets[i]->socket().sockfd())
106 (buffer.size()) ((_dataSockets.size()));
107 _dataSockets[i]->socket() = -1; //poison socket
108 } else {
109 ++count;
110 }
111 }
112
113 if (count == 0) {
114 _listenSockets.clear();
115 } else {
116 const static int WARN_INTERVAL_TICKS =
117 (int) (DRAINER_WARNING_FREQ / DRAINER_CHECK_FREQ + 0.5);
118 const static float WARN_INTERVAL_SEC =
119 WARN_INTERVAL_TICKS * DRAINER_CHECK_FREQ;
120 if (_timeoutCount++ > WARN_INTERVAL_TICKS){
121 _timeoutCount=0;
122 for (size_t i = 0; i < _dataSockets.size();++i){
123 vector<char>& buffer = _drainedData[_dataSockets[i]->socket().sockfd() ];
124 JWARNING(false) (_dataSockets[i]->socket().sockfd())
125 (buffer.size()) (WARN_INTERVAL_SEC)
126 .Text("Still draining socket... "
127 "perhaps remote host is not running under DMTCP?");
128 #ifdef CERN_CMS
129 JNOTE("\n*** Closing this socket (to database?). Please use dmtcp \n"
130 "*** plugins to gracefully handle such sockets, and re-run.\n"
131 "*** Trying a workaround for now, and hoping it doesn't fail.\n");
132 _real_close(_dataSockets[i]->socket().sockfd());
133 //it does it by creating a socket pair and closing one side
134 int sp[2] = {-1,-1};
135 JASSERT(_real_socketpair(AF_UNIX, SOCK_STREAM, 0, sp) == 0)
136 (JASSERT_ERRNO) .Text("socketpair() failed");
137 JASSERT(sp[0] >= 0 && sp[1] >= 0) (sp[0]) (sp[1])
138 .Text("socketpair() failed");
139 _real_close(sp[1]);
140 JTRACE("created dead socket") (sp[0]);
141 _real_dup2(sp[0], _dataSockets[i]->socket().sockfd());
142 #endif
143
144 }
145 }
146 }
147 }
148
149 void KernelBufferDrainer::beginDrainOf(int fd, const ConnectionIdentifier& id)
150 {
151 // JTRACE("will drain socket") (fd);
152 _drainedData[fd]; // create buffer
153 // this is the simple way: jalib::JSocket(fd) << theMagicDrainCookie;
154 //instead used delayed write in case kernel buffer is full:
155 addWrite(new jalib::JChunkWriter(fd, theMagicDrainCookie,
156 sizeof theMagicDrainCookie));
157 //now setup a reader:
158 addDataSocket(new jalib::JChunkReader(fd,512));
159
160 //insert it in reverse lookup
161 _reverseLookup[fd]=id;
162 }
163
164
165 void KernelBufferDrainer::refillAllSockets()
166 {
167 JTRACE("refilling socket buffers") (_drainedData.size());
168
169 //write all buffers out
170 map<int, vector<char> >::iterator i;
171 for (i = _drainedData.begin(); i != _drainedData.end(); ++i) {
172 int size = i->second.size();
173 JWARNING(size>=0) (size).Text("a failed drain is in our table???");
174 if (size<0) size=0;
175 // Double the send buffer
176 scaleSendBuffers(i->first, 2);
177 ConnMsg msg(ConnMsg::REFILL);
178 msg.extraBytes = size;
179 jalib::JSocket sock(i->first);
180 if (size > 0) {
181 JTRACE("requesting repeat buffer...") (sock.sockfd()) (size);
182 }
183 sock << msg;
184 if (size>0) sock.writeAll(&i->second[0],size);
185 i->second.clear();
186 }
187
188 // JTRACE("repeating our friends buffers...");
189
190 //read all buffers in
191 for (i = _drainedData.begin(); i != _drainedData.end(); ++i) {
192 ConnMsg msg;
193 msg.poison();
194 jalib::JSocket sock(i->first);
195 sock >> msg;
196
197 msg.assertValid(ConnMsg::REFILL);
198 int size = msg.extraBytes;
199 JTRACE("repeating buffer back to peer") (size);
200 if (size > 0) {
201 //echo it back...
202 jalib::JBuffer tmp(size);
203 sock.readAll(tmp,size);
204 sock.writeAll(tmp,size);
205 }
206 // Reset the send buffer
207 scaleSendBuffers(i->first, 0.5);
208 }
209
210 JTRACE("buffers refilled");
211
212 // Free up the object
213 delete theDrainer;
214 theDrainer = NULL;
215 }
216
217 const vector<char>& KernelBufferDrainer::getDrainedData(ConnectionIdentifier id)
218 {
219 JASSERT(_disconnectedSockets.find(id) != _disconnectedSockets.end()) (id);
220 return _disconnectedSockets[id];
221 }