summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--source/tanya/async/event/iocp.d81
1 files changed, 58 insertions, 23 deletions
diff --git a/source/tanya/async/event/iocp.d b/source/tanya/async/event/iocp.d
index 8dc6f2d..26678e1 100644
--- a/source/tanya/async/event/iocp.d
+++ b/source/tanya/async/event/iocp.d
@@ -39,6 +39,8 @@ final class StreamTransport : SocketWatcher, DuplexTransport, SocketTransport
private Protocol protocol_;
+ private bool closing;
+
/**
* Creates new completion port transport.
*
@@ -71,6 +73,24 @@ final class StreamTransport : SocketWatcher, DuplexTransport, SocketTransport
}
/**
+ * Returns $(D_PARAM true) if the transport is closing or closed.
+ */
+ bool isClosing() const pure nothrow @safe @nogc
+ {
+ return closing;
+ }
+
+ /**
+ * Close the transport.
+ *
+ * Buffered data will be flushed. No more data will be received.
+ */
+ void close() pure nothrow @safe @nogc
+ {
+ closing = true;
+ }
+
+ /**
* Write some data to the transport.
*
* Params:
@@ -78,22 +98,7 @@ final class StreamTransport : SocketWatcher, DuplexTransport, SocketTransport
*/
void write(ubyte[] data) @nogc
{
- immutable empty = input.length == 0;
input ~= data;
- if (empty)
- {
- SocketState overlapped;
- try
- {
- overlapped = MmapPool.instance.make!SocketState;
- socket.beginSend(input[], overlapped);
- }
- catch (SocketException e)
- {
- MmapPool.instance.dispose(overlapped);
- MmapPool.instance.dispose(e);
- }
- }
}
/**
@@ -132,8 +137,23 @@ final class StreamTransport : SocketWatcher, DuplexTransport, SocketTransport
{
if (output.length)
{
+ immutable empty = input.length == 0;
protocol.received(output[0 .. $]);
output.clear();
+ if (empty)
+ {
+ SocketState overlapped;
+ try
+ {
+ overlapped = MmapPool.instance.make!SocketState;
+ socket.beginSend(input[], overlapped);
+ }
+ catch (SocketException e)
+ {
+ MmapPool.instance.dispose(overlapped);
+ MmapPool.instance.dispose(e);
+ }
+ }
}
else
{
@@ -239,6 +259,20 @@ final class IOCPLoop : Loop
return true;
}
+ private void kill(StreamTransport transport,
+ SocketException exception = null) @nogc
+ in
+ {
+ assert(transport !is null);
+ }
+ body
+ {
+ transport.socket.shutdown();
+ defaultAllocator.dispose(transport.socket);
+ transport.exception = exception;
+ pendings.enqueue(transport);
+ }
+
/**
* Does the actual polling.
*/
@@ -308,19 +342,16 @@ final class IOCPLoop : Loop
}
if (transport.socket.disconnected)
{
- // We want to get one last notification to destroy the watcher
+ // We want to get one last notification to destroy the watcher.
transport.socket.beginReceive(transport.output[], overlapped);
- transport.socket.shutdown();
- defaultAllocator.dispose(transport.socket);
- transport.exception = exception;
- pendings.enqueue(transport);
+ kill(transport, exception);
}
else if (received > 0)
{
immutable full = transport.output.free == received;
transport.output += received;
- // Receive was interrupted because the buffer is full. We have to continue
+ // Receive was interrupted because the buffer is full. We have to continue.
if (full)
{
transport.socket.beginReceive(transport.output[], overlapped);
@@ -333,17 +364,21 @@ final class IOCPLoop : Loop
assert(transport !is null);
transport.input += transport.socket.endSend(overlapped);
- if (transport.input.length)
+ if (transport.input.length > 0)
{
transport.socket.beginSend(transport.input[], overlapped);
}
else
{
transport.socket.beginReceive(transport.output[], overlapped);
+ if (transport.isClosing())
+ {
+ kill(transport);
+ }
}
break;
default:
assert(false, "Unknown event");
}
}
-}
+} \ No newline at end of file