/* [<][>][^][v][top][bottom][index][help] */
DEFINITIONS
This source file includes following definitions.
- onConnect
- onData
- onDisconnect
- onTimeoutInterval
- beginDrainOf
- refill
1 #include "ipc.h"
2 #include "sshdrainer.h"
3 #include "util.h"
4 #include "../jalib/jassert.h"
5 #include "../jalib/jbuffer.h"
6
7 #define SOCKET_DRAIN_MAGIC_COOKIE_STR "[dmtcp{v0<DRAIN!"
8
9 using namespace dmtcp;
10
11 const char theMagicDrainCookie[] = SOCKET_DRAIN_MAGIC_COOKIE_STR;
12
13 void SSHDrainer::onConnect(const jalib::JSocket& sock,
14 const struct sockaddr*
15 remoteAddr,socklen_t remoteLen)
16 {
17 JASSERT(false) .Text("Not Implemented!");
18 }
19
20 void SSHDrainer::onData(jalib::JReaderInterface* sock)
21 {
22 vector<char>& buffer = _drainedData[sock->socket().sockfd() ];
23 buffer.resize(buffer.size() + sock->bytesRead());
24 int startIdx = buffer.size() - sock->bytesRead();
25 memcpy(&buffer[startIdx],sock->buffer(),sock->bytesRead());
26 // JTRACE("got buffer chunk") (sock->bytesRead());
27 sock->reset();
28 }
29
30 void SSHDrainer::onDisconnect(jalib::JReaderInterface* sock)
31 {
32 int fd;
33 errno = 0;
34 fd = sock->socket().sockfd();
35 //check if this was on purpose
36 if (fd < 0) return;
37 JNOTE("found disconnected socket... marking it dead")
38 (fd) (JASSERT_ERRNO);
39 _drainedData.erase(fd);
40 JASSERT(false) .Text("Not Implemented!");
41 }
42
43 void SSHDrainer::onTimeoutInterval()
44 {
45 int count = 0;
46 for (size_t i = 0; i < _dataSockets.size();++i)
47 {
48 if (_dataSockets[i]->bytesRead() > 0) onData(_dataSockets[i]);
49 vector<char>& buffer = _drainedData[_dataSockets[i]->socket().sockfd()];
50 if (buffer.size() >= sizeof(theMagicDrainCookie)
51 && memcmp(&buffer[buffer.size() - sizeof(theMagicDrainCookie)],
52 theMagicDrainCookie,
53 sizeof(theMagicDrainCookie)) == 0) {
54 buffer.resize(buffer.size() - sizeof(theMagicDrainCookie));
55 JTRACE("buffer drain complete") (_dataSockets[i]->socket().sockfd())
56 (buffer.size()) ((_dataSockets.size()));
57 _dataSockets[i]->socket() = -1; //poison socket
58 } else {
59 ++count;
60 }
61 }
62
63 if (count == 0) {
64 _listenSockets.clear();
65 } else {
66 const static int WARN_INTERVAL_TICKS =
67 (int) (DRAINER_WARNING_FREQ / DRAINER_CHECK_FREQ + 0.5);
68 const static float WARN_INTERVAL_SEC =
69 WARN_INTERVAL_TICKS * DRAINER_CHECK_FREQ;
70 if (_timeoutCount++ > WARN_INTERVAL_TICKS){
71 _timeoutCount=0;
72 for (size_t i = 0; i < _dataSockets.size();++i){
73 vector<char>& buffer = _drainedData[_dataSockets[i]->socket().sockfd()];
74 JWARNING(false) (_dataSockets[i]->socket().sockfd())
75 (buffer.size()) (WARN_INTERVAL_SEC)
76 .Text("Still draining socket... "
77 "perhaps remote host is not running under DMTCP?");
78 }
79 }
80 }
81 }
82
83 void SSHDrainer::beginDrainOf(int fd, int refillFd)
84 {
85 if (refillFd == -1) {
86 // Just write to the socket
87 addWrite(new jalib::JChunkWriter(fd, theMagicDrainCookie,
88 sizeof theMagicDrainCookie));
89 } else {
90 // Need to relay the read data to the refillFd.
91 _drainedData[fd]; // create buffer
92 _refillFd[fd] = refillFd;
93 addDataSocket(new jalib::JChunkReader(fd,512));
94 }
95 }
96
97 void SSHDrainer::refill()
98 {
99 JTRACE("refilling socket buffers") (_drainedData.size());
100
101 //write all buffers out
102 map<int, vector<char> >::iterator i;
103 for (i = _drainedData.begin(); i != _drainedData.end(); ++i) {
104 int fd = i->first;
105 int refillFd = _refillFd[fd];
106
107 int size = i->second.size();
108 JWARNING(size>=0) (size).Text("a failed drain is in our table???");
109 if (size<0) size=0;
110
111 Util::writeAll(refillFd, &i->second[0], size);
112 i->second.clear();
113 }
114 }