root/plugin/ipc/socket/kernelbufferdrainer.cpp

/* [<][>][^][v][top][bottom][index][help] */

DEFINITIONS

This source file includes the following definitions.
  1. scaleSendBuffers
  2. instance
  3. onConnect
  4. onData
  5. onDisconnect
  6. onTimeoutInterval
  7. beginDrainOf
  8. refillAllSockets
  9. getDrainedData

   1 /****************************************************************************
   2  *   Copyright (C) 2006-2010 by Jason Ansel, Kapil Arya, and Gene Cooperman *
   3  *   jansel@csail.mit.edu, kapil@ccs.neu.edu, gene@ccs.neu.edu              *
   4  *                                                                          *
   5  *   This file is part of the dmtcp/src module of DMTCP (DMTCP:dmtcp/src).  *
   6  *                                                                          *
   7  *  DMTCP:dmtcp/src is free software: you can redistribute it and/or        *
   8  *  modify it under the terms of the GNU Lesser General Public License as   *
   9  *  published by the Free Software Foundation, either version 3 of the      *
  10  *  License, or (at your option) any later version.                         *
  11  *                                                                          *
  12  *  DMTCP:dmtcp/src is distributed in the hope that it will be useful,      *
  13  *  but WITHOUT ANY WARRANTY; without even the implied warranty of          *
  14  *  MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the           *
  15  *  GNU Lesser General Public License for more details.                     *
  16  *                                                                          *
  17  *  You should have received a copy of the GNU Lesser General Public        *
  18  *  License along with DMTCP:dmtcp/src.  If not, see                        *
  19  *  <http://www.gnu.org/licenses/>.                                         *
  20  ****************************************************************************/
  21 
  22 #include "kernelbufferdrainer.h"
  23 #include "connectionlist.h"
  24 #include "socketwrappers.h"
  25 #include "../jalib/jassert.h"
  26 #include "../jalib/jbuffer.h"
  27 #include "util.h"
  28 
// Marker string used to detect the end of a drained socket stream.
// beginDrainOf() queues it for writing on every socket being drained, and
// onTimeoutInterval() treats a received buffer that ends with it as fully
// drained (the marker is then stripped from the saved data).
#define SOCKET_DRAIN_MAGIC_COOKIE_STR "[dmtcp{v0<DRAIN!"

using namespace dmtcp;

// Array form of the cookie. All users in this file size it with sizeof, so
// the string literal's terminating NUL byte is part of the marker that is
// written and compared.
const char theMagicDrainCookie[] = SOCKET_DRAIN_MAGIC_COOKIE_STR;
  34 
  35 void scaleSendBuffers(int fd, double factor)
  36 {
  37   int size;
  38   unsigned len = sizeof(size);
  39   JASSERT(getsockopt(fd, SOL_SOCKET, SO_SNDBUF, (void *)&size, &len) == 0);
  40 
  41   // getsockopt returns doubled size. So, if we pass the same value to
  42   // setsockopt, it would double the buffer size.
  43   int newSize = static_cast<int>(size * factor / 2);
  44   len = sizeof(newSize);
  45   JASSERT(_real_setsockopt(fd, SOL_SOCKET, SO_SNDBUF, (void *)&newSize, len) == 0);
  46 }
  47 
  48 static KernelBufferDrainer *theDrainer = NULL;
  49 KernelBufferDrainer& KernelBufferDrainer::instance()
  50 {
  51   if (theDrainer == NULL) {
  52     theDrainer = new KernelBufferDrainer();
  53   }
  54   return *theDrainer;
  55 }
  56 
  57 void KernelBufferDrainer::onConnect(const jalib::JSocket& sock,
  58                                     const struct sockaddr* remoteAddr,
  59                                     socklen_t remoteLen)
  60 {
  61   JWARNING(false) (sock.sockfd())
  62     .Text("we don't yet support checkpointing non-accepted connections..."
  63           " restore will likely fail.. closing connection");
  64   jalib::JSocket(sock).close();
  65 }
  66 
  67 void KernelBufferDrainer::onData(jalib::JReaderInterface* sock)
  68 {
  69   vector<char>& buffer = _drainedData[sock->socket().sockfd() ];
  70   buffer.resize(buffer.size() + sock->bytesRead());
  71   int startIdx = buffer.size() - sock->bytesRead();
  72   memcpy(&buffer[startIdx],sock->buffer(),sock->bytesRead());
  73 //     JTRACE("got buffer chunk") (sock->bytesRead());
  74   sock->reset();
  75 }
  76 
  77 void KernelBufferDrainer::onDisconnect(jalib::JReaderInterface* sock)
  78 {
  79   int fd;
  80   errno = 0;
  81   fd = sock->socket().sockfd();
  82   //check if this was on purpose
  83   if (fd < 0) return;
  84   JTRACE("found disconnected socket... marking it dead")
  85       (fd) (_reverseLookup[fd]) (JASSERT_ERRNO);
  86   _disconnectedSockets[_reverseLookup[fd]] = _drainedData[fd];
  87   // _drainedData is used to refill socket buffers. Remove the disconnected
  88   // socket from this list. Disconnected sockets are refilled when they are
  89   // recreated by _makeDeadSocket().
  90   _drainedData.erase(fd);
  91 }
  92 
  93 void KernelBufferDrainer::onTimeoutInterval()
  94 {
  95   int count = 0;
  96   for (size_t i = 0; i < _dataSockets.size();++i)
  97   {
  98     if (_dataSockets[i]->bytesRead() > 0) onData(_dataSockets[i]);
  99     vector<char>& buffer = _drainedData[_dataSockets[i]->socket().sockfd() ];
 100     if (buffer.size() >= sizeof(theMagicDrainCookie)
 101         && memcmp(&buffer[buffer.size() - sizeof(theMagicDrainCookie)],
 102                   theMagicDrainCookie,
 103                   sizeof(theMagicDrainCookie)) == 0) {
 104       buffer.resize(buffer.size() - sizeof(theMagicDrainCookie));
 105       JTRACE("buffer drain complete") (_dataSockets[i]->socket().sockfd())
 106         (buffer.size()) ((_dataSockets.size()));
 107       _dataSockets[i]->socket() = -1; //poison socket
 108     } else {
 109       ++count;
 110     }
 111   }
 112 
 113   if (count == 0) {
 114     _listenSockets.clear();
 115   } else {
 116     const static int WARN_INTERVAL_TICKS =
 117       (int) (DRAINER_WARNING_FREQ / DRAINER_CHECK_FREQ + 0.5);
 118     const static float WARN_INTERVAL_SEC =
 119       WARN_INTERVAL_TICKS * DRAINER_CHECK_FREQ;
 120     if (_timeoutCount++ > WARN_INTERVAL_TICKS){
 121       _timeoutCount=0;
 122       for (size_t i = 0; i < _dataSockets.size();++i){
 123         vector<char>& buffer = _drainedData[_dataSockets[i]->socket().sockfd() ];
 124         JWARNING(false) (_dataSockets[i]->socket().sockfd())
 125           (buffer.size()) (WARN_INTERVAL_SEC)
 126           .Text("Still draining socket... "
 127                 "perhaps remote host is not running under DMTCP?");
 128 #ifdef CERN_CMS
 129         JNOTE("\n*** Closing this socket (to database?).  Please use dmtcp \n"
 130               "***  plugins to gracefully handle such sockets, and re-run.\n"
 131               "***  Trying a workaround for now, and hoping it doesn't fail.\n");
 132         _real_close(_dataSockets[i]->socket().sockfd());
 133         //it does it by creating a socket pair and closing one side
 134         int sp[2] = {-1,-1};
 135         JASSERT(_real_socketpair(AF_UNIX, SOCK_STREAM, 0, sp) == 0)
 136           (JASSERT_ERRNO) .Text("socketpair() failed");
 137         JASSERT(sp[0] >= 0 && sp[1] >= 0) (sp[0]) (sp[1])
 138           .Text("socketpair() failed");
 139         _real_close(sp[1]);
 140         JTRACE("created dead socket") (sp[0]);
 141         _real_dup2(sp[0], _dataSockets[i]->socket().sockfd());
 142 #endif
 143 
 144       }
 145     }
 146   }
 147 }
 148 
 149 void KernelBufferDrainer::beginDrainOf(int fd, const ConnectionIdentifier& id)
 150 {
 151 //     JTRACE("will drain socket") (fd);
 152   _drainedData[fd]; // create buffer
 153 // this is the simple way:  jalib::JSocket(fd) << theMagicDrainCookie;
 154   //instead used delayed write in case kernel buffer is full:
 155   addWrite(new jalib::JChunkWriter(fd, theMagicDrainCookie,
 156                                    sizeof theMagicDrainCookie));
 157   //now setup a reader:
 158   addDataSocket(new jalib::JChunkReader(fd,512));
 159 
 160   //insert it in reverse lookup
 161   _reverseLookup[fd]=id;
 162 }
 163 
 164 
 165 void KernelBufferDrainer::refillAllSockets()
 166 {
 167   JTRACE("refilling socket buffers") (_drainedData.size());
 168 
 169   //write all buffers out
 170   map<int, vector<char> >::iterator i;
 171   for (i = _drainedData.begin(); i != _drainedData.end(); ++i) {
 172     int size = i->second.size();
 173     JWARNING(size>=0) (size).Text("a failed drain is in our table???");
 174     if (size<0) size=0;
 175     // Double the send buffer
 176     scaleSendBuffers(i->first, 2);
 177     ConnMsg msg(ConnMsg::REFILL);
 178     msg.extraBytes = size;
 179     jalib::JSocket sock(i->first);
 180     if (size > 0) {
 181       JTRACE("requesting repeat buffer...") (sock.sockfd()) (size);
 182     }
 183     sock << msg;
 184     if (size>0) sock.writeAll(&i->second[0],size);
 185     i->second.clear();
 186   }
 187 
 188 //     JTRACE("repeating our friends buffers...");
 189 
 190   //read all buffers in
 191   for (i = _drainedData.begin(); i != _drainedData.end(); ++i) {
 192     ConnMsg msg;
 193     msg.poison();
 194     jalib::JSocket sock(i->first);
 195     sock >> msg;
 196 
 197     msg.assertValid(ConnMsg::REFILL);
 198     int size = msg.extraBytes;
 199     JTRACE("repeating buffer back to peer") (size);
 200     if (size > 0) {
 201       //echo it back...
 202       jalib::JBuffer tmp(size);
 203       sock.readAll(tmp,size);
 204       sock.writeAll(tmp,size);
 205     }
 206     // Reset the send buffer
 207     scaleSendBuffers(i->first, 0.5);
 208   }
 209 
 210   JTRACE("buffers refilled");
 211 
 212   // Free up the object
 213   delete theDrainer;
 214   theDrainer = NULL;
 215 }
 216 
 217 const vector<char>& KernelBufferDrainer::getDrainedData(ConnectionIdentifier id)
 218 {
 219   JASSERT(_disconnectedSockets.find(id) != _disconnectedSockets.end()) (id);
 220   return _disconnectedSockets[id];
 221 }

/* [<][>][^][v][top][bottom][index][help] */