From d210a39249a6f5e172d0f5c119f2a90b959f735a Mon Sep 17 00:00:00 2001 From: Eugen Wissner Date: Sat, 18 Feb 2017 14:10:54 +0100 Subject: [PATCH] Implement IOCPTransport.close and isClosing --- source/tanya/async/event/iocp.d | 81 +++++++++++++++++++++++---------- 1 file changed, 58 insertions(+), 23 deletions(-) diff --git a/source/tanya/async/event/iocp.d b/source/tanya/async/event/iocp.d index 8dc6f2d..26678e1 100644 --- a/source/tanya/async/event/iocp.d +++ b/source/tanya/async/event/iocp.d @@ -39,6 +39,8 @@ final class StreamTransport : SocketWatcher, DuplexTransport, SocketTransport private Protocol protocol_; + private bool closing; + /** * Creates new completion port transport. * @@ -70,6 +72,24 @@ final class StreamTransport : SocketWatcher, DuplexTransport, SocketTransport return cast(OverlappedConnectedSocket) socket_; } + /** + * Returns $(D_PARAM true) if the transport is closing or closed. + */ + bool isClosing() const pure nothrow @safe @nogc + { + return closing; + } + + /** + * Close the transport. + * + * Buffered data will be flushed. No more data will be received. + */ + void close() pure nothrow @safe @nogc + { + closing = true; + } + /** * Write some data to the transport. * @@ -78,22 +98,7 @@ final class StreamTransport : SocketWatcher, DuplexTransport, SocketTransport */ void write(ubyte[] data) @nogc { - immutable empty = input.length == 0; input ~= data; - if (empty) - { - SocketState overlapped; - try - { - overlapped = MmapPool.instance.make!SocketState; - socket.beginSend(input[], overlapped); - } - catch (SocketException e) - { - MmapPool.instance.dispose(overlapped); - MmapPool.instance.dispose(e); - } - } } /** @@ -132,8 +137,23 @@ final class StreamTransport : SocketWatcher, DuplexTransport, SocketTransport { if (output.length) { + immutable empty = input.length == 0; protocol.received(output[0 .. $]); output.clear(); + if (empty) + { + SocketState overlapped; + try + { + overlapped = MmapPool.instance.make!SocketState; + socket.beginSend(input[], overlapped); + } + catch (SocketException e) + { + MmapPool.instance.dispose(overlapped); + MmapPool.instance.dispose(e); + } + } } else { @@ -239,6 +259,20 @@ final class IOCPLoop : Loop return true; } + private void kill(StreamTransport transport, + SocketException exception = null) @nogc + in + { + assert(transport !is null); + } + body + { + transport.socket.shutdown(); + defaultAllocator.dispose(transport.socket); + transport.exception = exception; + pendings.enqueue(transport); + } + /** * Does the actual polling. */ @@ -308,19 +342,16 @@ final class IOCPLoop : Loop } if (transport.socket.disconnected) { - // We want to get one last notification to destroy the watcher + // We want to get one last notification to destroy the watcher. transport.socket.beginReceive(transport.output[], overlapped); - transport.socket.shutdown(); - defaultAllocator.dispose(transport.socket); - transport.exception = exception; - pendings.enqueue(transport); + kill(transport, exception); } else if (received > 0) { immutable full = transport.output.free == received; transport.output += received; - // Receive was interrupted because the buffer is full. We have to continue + // Receive was interrupted because the buffer is full. We have to continue. if (full) { transport.socket.beginReceive(transport.output[], overlapped); @@ -333,17 +364,21 @@ final class IOCPLoop : Loop assert(transport !is null); transport.input += transport.socket.endSend(overlapped); - if (transport.input.length) + if (transport.input.length > 0) { transport.socket.beginSend(transport.input[], overlapped); } else { transport.socket.beginReceive(transport.output[], overlapped); + if (transport.isClosing()) + { + kill(transport); + } } break; default: assert(false, "Unknown event"); } } -} +} \ No newline at end of file