KDECore
k3bufferedsocket.cpp
Go to the documentation of this file.00001
00002
00003
00004
00005
00006
00007
00008
00009
00010
00011
00012
00013
00014
00015
00016
00017
00018
00019
00020
00021
00022
00023
00024
00025 #include "k3bufferedsocket.h"
00026
00027 #include <config.h>
00028 #include <config-network.h>
00029
00030 #include <QMutex>
00031 #include <QTimer>
00032
00033 #include "k3socketdevice.h"
00034 #include "k3socketaddress.h"
00035 #include "k3socketbuffer_p.h"
00036
00037 using namespace KNetwork;
00038 using namespace KNetwork::Internal;
00039
00040 class KNetwork::KBufferedSocketPrivate
00041 {
00042 public:
00043 mutable KSocketBuffer *input, *output;
00044
00045 KBufferedSocketPrivate()
00046 {
00047 input = 0L;
00048 output = 0L;
00049 }
00050 };
00051
00052 KBufferedSocket::KBufferedSocket(const QString& host, const QString& service,
00053 QObject *parent)
00054 : KStreamSocket(host, service, parent),
00055 d(new KBufferedSocketPrivate)
00056 {
00057 setInputBuffering(true);
00058 setOutputBuffering(true);
00059 }
00060
00061 KBufferedSocket::~KBufferedSocket()
00062 {
00063 closeNow();
00064 delete d->input;
00065 delete d->output;
00066 delete d;
00067 }
00068
00069 void KBufferedSocket::setSocketDevice(KSocketDevice* device)
00070 {
00071 KStreamSocket::setSocketDevice(device);
00072 device->setBlocking(false);
00073 KActiveSocketBase::open(openMode() & ~Unbuffered);
00074 }
00075
00076 bool KBufferedSocket::setSocketOptions(int opts)
00077 {
00078 if (opts == Blocking)
00079 return false;
00080
00081 opts &= ~Blocking;
00082 return KStreamSocket::setSocketOptions(opts);
00083 }
00084
00085 void KBufferedSocket::close()
00086 {
00087 if (!d->output || d->output->isEmpty())
00088 closeNow();
00089 else
00090 {
00091 setState(Closing);
00092 QSocketNotifier *n = socketDevice()->readNotifier();
00093 if (n)
00094 n->setEnabled(false);
00095 emit stateChanged(Closing);
00096 }
00097 }
00098
00099 qint64 KBufferedSocket::bytesAvailable() const
00100 {
00101 if (!d->input)
00102 return KStreamSocket::bytesAvailable();
00103
00104 return d->input->length();
00105 }
00106
00107 qint64 KBufferedSocket::waitForMore(int msecs, bool *timeout)
00108 {
00109 qint64 retval = KStreamSocket::waitForMore(msecs, timeout);
00110 if (d->input)
00111 {
00112 resetError();
00113 slotReadActivity();
00114 return bytesAvailable();
00115 }
00116 return retval;
00117 }
00118
00119 qint64 KBufferedSocket::readData(char *data, qint64 maxlen, KSocketAddress* from)
00120 {
00121 if (from)
00122 *from = peerAddress();
00123 if (d->input)
00124 {
00125 if (d->input->isEmpty())
00126 {
00127 return 0;
00128 }
00129 resetError();
00130 return d->input->consumeBuffer(data, maxlen);
00131 }
00132 return KStreamSocket::readData(data, maxlen, 0L);
00133 }
00134
00135 qint64 KBufferedSocket::peekData(char *data, qint64 maxlen, KSocketAddress* from)
00136 {
00137 if (from)
00138 *from = peerAddress();
00139 if (d->input)
00140 {
00141 if (d->input->isEmpty())
00142 {
00143 return 0;
00144 }
00145 resetError();
00146 return d->input->consumeBuffer(data, maxlen, false);
00147 }
00148 return KStreamSocket::peekData(data, maxlen, 0L);
00149 }
00150
00151 qint64 KBufferedSocket::writeData(const char *data, qint64 len,
00152 const KSocketAddress*)
00153 {
00154
00155 if (state() != Connected)
00156 {
00157
00158 setError(NotConnected);
00159 return -1;
00160 }
00161
00162 if (d->output)
00163 {
00164 if (d->output->isFull())
00165 {
00166 setError(WouldBlock);
00167 emit gotError(WouldBlock);
00168 return -1;
00169 }
00170 resetError();
00171
00172
00173 QSocketNotifier *n = socketDevice()->writeNotifier();
00174 if (n)
00175 n->setEnabled(true);
00176
00177 return d->output->feedBuffer(data, len);
00178 }
00179
00180 return KStreamSocket::writeData(data, len, 0L);
00181 }
00182
00183 void KBufferedSocket::enableRead(bool enable)
00184 {
00185 KStreamSocket::enableRead(enable);
00186 if (!enable && d->input)
00187 {
00188
00189 QSocketNotifier *n = socketDevice()->readNotifier();
00190 if (n)
00191 n->setEnabled(true);
00192 }
00193
00194 if (enable && state() != Connected && d->input && !d->input->isEmpty())
00195
00196
00197 QTimer::singleShot(0, this, SLOT(slotReadActivity()));
00198 }
00199
00200 void KBufferedSocket::enableWrite(bool enable)
00201 {
00202 KStreamSocket::enableWrite(enable);
00203 if (!enable && d->output && !d->output->isEmpty())
00204 {
00205
00206 QSocketNotifier *n = socketDevice()->writeNotifier();
00207 if (n)
00208 n->setEnabled(true);
00209 }
00210 }
00211
00212 void KBufferedSocket::stateChanging(SocketState newState)
00213 {
00214 if (newState == Connecting || newState == Connected)
00215 {
00216
00217
00218 if (d->input)
00219 d->input->clear();
00220 if (d->output)
00221 d->output->clear();
00222
00223
00224 enableRead(emitsReadyRead());
00225 enableWrite(emitsReadyWrite());
00226 }
00227 KStreamSocket::stateChanging(newState);
00228 }
00229
00230 void KBufferedSocket::setInputBuffering(bool enable)
00231 {
00232 QMutexLocker locker(mutex());
00233 if (!enable)
00234 {
00235 delete d->input;
00236 d->input = 0L;
00237 }
00238 else if (d->input == 0L)
00239 {
00240 d->input = new KSocketBuffer;
00241 }
00242 }
00243
00244 void KBufferedSocket::setOutputBuffering(bool enable)
00245 {
00246 QMutexLocker locker(mutex());
00247 if (!enable)
00248 {
00249 delete d->output;
00250 d->output = 0L;
00251 }
00252 else if (d->output == 0L)
00253 {
00254 d->output = new KSocketBuffer;
00255 }
00256 }
00257
00258 qint64 KBufferedSocket::bytesToWrite() const
00259 {
00260 if (!d->output)
00261 return 0;
00262
00263 return d->output->length();
00264 }
00265
00266 void KBufferedSocket::closeNow()
00267 {
00268 KStreamSocket::close();
00269 if (d->output)
00270 d->output->clear();
00271 }
00272
00273 bool KBufferedSocket::canReadLine() const
00274 {
00275 if (!d->input)
00276 return false;
00277
00278 return d->input->canReadLine();
00279 }
00280
00281 qint64 KBufferedSocket::readLineData(char* data, qint64 maxSize)
00282 {
00283 return d->input->readLine(data, maxSize);
00284 }
00285
00286 void KBufferedSocket::waitForConnect()
00287 {
00288 if (state() != Connecting)
00289 return;
00290
00291 KStreamSocket::setSocketOptions(socketOptions() | Blocking);
00292 connectionEvent();
00293 KStreamSocket::setSocketOptions(socketOptions() & ~Blocking);
00294 }
00295
00296 void KBufferedSocket::slotReadActivity()
00297 {
00298 if (d->input && state() == Connected)
00299 {
00300 mutex()->lock();
00301 qint64 len = d->input->receiveFrom(socketDevice());
00302
00303 if (len == -1)
00304 {
00305 if (socketDevice()->error() != WouldBlock)
00306 {
00307
00308 copyError();
00309 mutex()->unlock();
00310 emit gotError(error());
00311 closeNow();
00312 return;
00313 }
00314 }
00315 else if (len == 0)
00316 {
00317
00318 setError(RemotelyDisconnected);
00319 mutex()->unlock();
00320 emit gotError(error());
00321 closeNow();
00322 return;
00323 }
00324
00325
00326 mutex()->unlock();
00327 }
00328
00329 if (state() == Connected)
00330 KStreamSocket::slotReadActivity();
00331 else if (emitsReadyRead())
00332 {
00333 if (d->input && !d->input->isEmpty())
00334 {
00335
00336
00337 QTimer::singleShot(0, this, SLOT(slotReadActivity()));
00338 emit readyRead();
00339 }
00340 }
00341 }
00342
00343 void KBufferedSocket::slotWriteActivity()
00344 {
00345 if (d->output && !d->output->isEmpty() &&
00346 (state() == Connected || state() == Closing))
00347 {
00348 mutex()->lock();
00349 qint64 len = d->output->sendTo(socketDevice());
00350
00351 if (len == -1)
00352 {
00353 if (socketDevice()->error() != WouldBlock)
00354 {
00355
00356 copyError();
00357 mutex()->unlock();
00358 emit gotError(error());
00359 closeNow();
00360 return;
00361 }
00362 }
00363 else if (len == 0)
00364 {
00365
00366 setError(RemotelyDisconnected);
00367 mutex()->unlock();
00368 emit gotError(error());
00369 closeNow();
00370 return;
00371 }
00372
00373 if (d->output->isEmpty())
00374
00375
00376 socketDevice()->writeNotifier()->setEnabled(false);
00377
00378 mutex()->unlock();
00379 emit bytesWritten(len);
00380 }
00381
00382 if (state() != Closing)
00383 KStreamSocket::slotWriteActivity();
00384 else if (d->output && d->output->isEmpty() && state() == Closing)
00385 {
00386 KStreamSocket::close();
00387 }
00388 }
00389
00390 #include "k3bufferedsocket.moc"