root/plugin/ipc/ssh/sshdrainer.cpp

/* [<][>][^][v][top][bottom][index][help] */

DEFINITIONS

This source file includes following definitions.
  1. onConnect
  2. onData
  3. onDisconnect
  4. onTimeoutInterval
  5. beginDrainOf
  6. refill

   1 #include "ipc.h"
   2 #include "sshdrainer.h"
   3 #include "util.h"
   4 #include "../jalib/jassert.h"
   5 #include "../jalib/jbuffer.h"
   6 
   7 #define SOCKET_DRAIN_MAGIC_COOKIE_STR "[dmtcp{v0<DRAIN!"
   8 
   9 using namespace dmtcp;
  10 
  11 const char theMagicDrainCookie[] = SOCKET_DRAIN_MAGIC_COOKIE_STR;
  12 
  13 void SSHDrainer::onConnect(const jalib::JSocket& sock,
  14                            const struct sockaddr*
  15                            remoteAddr,socklen_t remoteLen)
  16 {
  17   JASSERT(false) .Text("Not Implemented!");
  18 }
  19 
  20 void SSHDrainer::onData(jalib::JReaderInterface* sock)
  21 {
  22   vector<char>& buffer = _drainedData[sock->socket().sockfd() ];
  23   buffer.resize(buffer.size() + sock->bytesRead());
  24   int startIdx = buffer.size() - sock->bytesRead();
  25   memcpy(&buffer[startIdx],sock->buffer(),sock->bytesRead());
  26   //     JTRACE("got buffer chunk") (sock->bytesRead());
  27   sock->reset();
  28 }
  29 
  30 void SSHDrainer::onDisconnect(jalib::JReaderInterface* sock)
  31 {
  32   int fd;
  33   errno = 0;
  34   fd = sock->socket().sockfd();
  35   //check if this was on purpose
  36   if (fd < 0) return;
  37   JNOTE("found disconnected socket... marking it dead")
  38     (fd) (JASSERT_ERRNO);
  39   _drainedData.erase(fd);
  40   JASSERT(false) .Text("Not Implemented!");
  41 }
  42 
  43 void SSHDrainer::onTimeoutInterval()
  44 {
  45   int count = 0;
  46   for (size_t i = 0; i < _dataSockets.size();++i)
  47   {
  48     if (_dataSockets[i]->bytesRead() > 0) onData(_dataSockets[i]);
  49     vector<char>& buffer = _drainedData[_dataSockets[i]->socket().sockfd()];
  50     if (buffer.size() >= sizeof(theMagicDrainCookie)
  51         && memcmp(&buffer[buffer.size() - sizeof(theMagicDrainCookie)],
  52                   theMagicDrainCookie,
  53                   sizeof(theMagicDrainCookie)) == 0) {
  54       buffer.resize(buffer.size() - sizeof(theMagicDrainCookie));
  55       JTRACE("buffer drain complete") (_dataSockets[i]->socket().sockfd())
  56         (buffer.size()) ((_dataSockets.size()));
  57       _dataSockets[i]->socket() = -1; //poison socket
  58     } else {
  59       ++count;
  60     }
  61   }
  62 
  63   if (count == 0) {
  64     _listenSockets.clear();
  65   } else {
  66     const static int WARN_INTERVAL_TICKS =
  67       (int) (DRAINER_WARNING_FREQ / DRAINER_CHECK_FREQ + 0.5);
  68     const static float WARN_INTERVAL_SEC =
  69       WARN_INTERVAL_TICKS * DRAINER_CHECK_FREQ;
  70     if (_timeoutCount++ > WARN_INTERVAL_TICKS){
  71       _timeoutCount=0;
  72       for (size_t i = 0; i < _dataSockets.size();++i){
  73         vector<char>& buffer = _drainedData[_dataSockets[i]->socket().sockfd()];
  74         JWARNING(false) (_dataSockets[i]->socket().sockfd())
  75           (buffer.size()) (WARN_INTERVAL_SEC)
  76           .Text("Still draining socket... "
  77                 "perhaps remote host is not running under DMTCP?");
  78       }
  79     }
  80   }
  81 }
  82 
  83 void SSHDrainer::beginDrainOf(int fd, int refillFd)
  84 {
  85   if (refillFd == -1) {
  86     // Just write to the socket
  87     addWrite(new jalib::JChunkWriter(fd, theMagicDrainCookie,
  88                                      sizeof theMagicDrainCookie));
  89   } else {
  90     // Need to relay the read data to the refillFd.
  91     _drainedData[fd]; // create buffer
  92     _refillFd[fd] = refillFd;
  93     addDataSocket(new jalib::JChunkReader(fd,512));
  94   }
  95 }
  96 
  97 void SSHDrainer::refill()
  98 {
  99   JTRACE("refilling socket buffers") (_drainedData.size());
 100 
 101   //write all buffers out
 102   map<int, vector<char> >::iterator i;
 103   for (i = _drainedData.begin(); i != _drainedData.end(); ++i) {
 104     int fd = i->first;
 105     int refillFd = _refillFd[fd];
 106 
 107     int size = i->second.size();
 108     JWARNING(size>=0) (size).Text("a failed drain is in our table???");
 109     if (size<0) size=0;
 110 
 111     Util::writeAll(refillFd, &i->second[0], size);
 112     i->second.clear();
 113   }
 114 }

/* [<][>][^][v][top][bottom][index][help] */