Implement IOCPTransport.close and isClosing

This commit is contained in:
Eugen Wissner 2017-02-18 14:10:54 +01:00
parent e86ff63f91
commit d210a39249

View File

@ -39,6 +39,8 @@ final class StreamTransport : SocketWatcher, DuplexTransport, SocketTransport
private Protocol protocol_; private Protocol protocol_;
private bool closing;
/** /**
* Creates new completion port transport. * Creates new completion port transport.
* *
@ -70,6 +72,24 @@ final class StreamTransport : SocketWatcher, DuplexTransport, SocketTransport
return cast(OverlappedConnectedSocket) socket_; return cast(OverlappedConnectedSocket) socket_;
} }
/**
* Returns $(D_PARAM true) if the transport is closing or closed.
*/
bool isClosing() const pure nothrow @safe @nogc
{
return closing;
}
/**
* Close the transport.
*
* Buffered data will be flushed. No more data will be received.
*/
void close() pure nothrow @safe @nogc
{
closing = true;
}
/** /**
* Write some data to the transport. * Write some data to the transport.
* *
@ -78,22 +98,7 @@ final class StreamTransport : SocketWatcher, DuplexTransport, SocketTransport
*/ */
void write(ubyte[] data) @nogc void write(ubyte[] data) @nogc
{ {
immutable empty = input.length == 0;
input ~= data; input ~= data;
if (empty)
{
SocketState overlapped;
try
{
overlapped = MmapPool.instance.make!SocketState;
socket.beginSend(input[], overlapped);
}
catch (SocketException e)
{
MmapPool.instance.dispose(overlapped);
MmapPool.instance.dispose(e);
}
}
} }
/** /**
@ -132,8 +137,23 @@ final class StreamTransport : SocketWatcher, DuplexTransport, SocketTransport
{ {
if (output.length) if (output.length)
{ {
immutable empty = input.length == 0;
protocol.received(output[0 .. $]); protocol.received(output[0 .. $]);
output.clear(); output.clear();
if (empty)
{
SocketState overlapped;
try
{
overlapped = MmapPool.instance.make!SocketState;
socket.beginSend(input[], overlapped);
}
catch (SocketException e)
{
MmapPool.instance.dispose(overlapped);
MmapPool.instance.dispose(e);
}
}
} }
else else
{ {
@ -239,6 +259,20 @@ final class IOCPLoop : Loop
return true; return true;
} }
private void kill(StreamTransport transport,
SocketException exception = null) @nogc
in
{
assert(transport !is null);
}
body
{
transport.socket.shutdown();
defaultAllocator.dispose(transport.socket);
transport.exception = exception;
pendings.enqueue(transport);
}
/** /**
* Does the actual polling. * Does the actual polling.
*/ */
@ -308,19 +342,16 @@ final class IOCPLoop : Loop
} }
if (transport.socket.disconnected) if (transport.socket.disconnected)
{ {
// We want to get one last notification to destroy the watcher // We want to get one last notification to destroy the watcher.
transport.socket.beginReceive(transport.output[], overlapped); transport.socket.beginReceive(transport.output[], overlapped);
transport.socket.shutdown(); kill(transport, exception);
defaultAllocator.dispose(transport.socket);
transport.exception = exception;
pendings.enqueue(transport);
} }
else if (received > 0) else if (received > 0)
{ {
immutable full = transport.output.free == received; immutable full = transport.output.free == received;
transport.output += received; transport.output += received;
// Receive was interrupted because the buffer is full. We have to continue // Receive was interrupted because the buffer is full. We have to continue.
if (full) if (full)
{ {
transport.socket.beginReceive(transport.output[], overlapped); transport.socket.beginReceive(transport.output[], overlapped);
@ -333,17 +364,21 @@ final class IOCPLoop : Loop
assert(transport !is null); assert(transport !is null);
transport.input += transport.socket.endSend(overlapped); transport.input += transport.socket.endSend(overlapped);
if (transport.input.length) if (transport.input.length > 0)
{ {
transport.socket.beginSend(transport.input[], overlapped); transport.socket.beginSend(transport.input[], overlapped);
} }
else else
{ {
transport.socket.beginReceive(transport.output[], overlapped); transport.socket.beginReceive(transport.output[], overlapped);
if (transport.isClosing())
{
kill(transport);
}
} }
break; break;
default: default:
assert(false, "Unknown event"); assert(false, "Unknown event");
} }
} }
} }