diff --git a/source/tanya/async/event/iocp.d b/source/tanya/async/event/iocp.d index 84296a8..8dc6f2d 100644 --- a/source/tanya/async/event/iocp.d +++ b/source/tanya/async/event/iocp.d @@ -29,8 +29,16 @@ import core.sys.windows.winsock2; /** * Transport for stream sockets. */ -final class StreamTransport : IOWatcher, SocketTransport +final class StreamTransport : SocketWatcher, DuplexTransport, SocketTransport { + private SocketException exception; + + private ReadBuffer!ubyte output; + + private WriteBuffer!ubyte input; + + private Protocol protocol_; + /** * Creates new completion port transport. * @@ -42,6 +50,9 @@ final class StreamTransport : IOWatcher, SocketTransport this(OverlappedConnectedSocket socket) @nogc { super(socket); + output = ReadBuffer!ubyte(8192, 1024, MmapPool.instance); + input = WriteBuffer!ubyte(8192, MmapPool.instance); + active = true; } /** @@ -84,6 +95,54 @@ final class StreamTransport : IOWatcher, SocketTransport } } } + + /** + * Returns: Application protocol. + */ + @property Protocol protocol() pure nothrow @safe @nogc + { + return protocol_; + } + + /** + * Switches the protocol. + * + * The protocol is deallocated by the event loop, it should currently be + * allocated with $(D_PSYMBOL MmapPool). + * + * Params: + * protocol = Application protocol. + * + * Precondition: $(D_INLINECODE protocol !is null) + */ + @property void protocol(Protocol protocol) pure nothrow @safe @nogc + in + { + assert(protocol !is null); + } + body + { + protocol_ = protocol; + } + + /** + * Invokes the watcher callback. + */ + override void invoke() @nogc + { + if (output.length) + { + protocol.received(output[0 .. $]); + output.clear(); + } + else + { + protocol.disconnected(exception); + MmapPool.instance.dispose(protocol_); + defaultAllocator.dispose(exception); + active = false; + } + } } final class IOCPLoop : Loop @@ -251,7 +310,10 @@ final class IOCPLoop : Loop { // We want to get one last notification to destroy the watcher transport.socket.beginReceive(transport.output[], overlapped); - kill(transport, exception); + transport.socket.shutdown(); + defaultAllocator.dispose(transport.socket); + transport.exception = exception; + pendings.enqueue(transport); } else if (received > 0) { diff --git a/source/tanya/async/event/selector.d b/source/tanya/async/event/selector.d index 67efd7f..9251b31 100644 --- a/source/tanya/async/event/selector.d +++ b/source/tanya/async/event/selector.d @@ -25,10 +25,18 @@ import tanya.network.socket; /** * Transport for stream sockets. */ -final class StreamTransport : IOWatcher, SocketTransport +package class StreamTransport : SocketWatcher, DuplexTransport, SocketTransport { private SelectorLoop loop; + private SocketException exception; + + package ReadBuffer!ubyte output; + + package WriteBuffer!ubyte input; + + private Protocol protocol_; + /// Received notification that the underlying socket is write-ready. package bool writeReady; @@ -48,6 +56,9 @@ final class StreamTransport : IOWatcher, SocketTransport { super(socket); this.loop = loop; + output = ReadBuffer!ubyte(8192, 1024, MmapPool.instance); + input = WriteBuffer!ubyte(8192, MmapPool.instance); + active = true; } /** @@ -75,6 +86,54 @@ final class StreamTransport : IOWatcher, SocketTransport socket_ = socket; } + /** + * Returns: Application protocol. + */ + @property Protocol protocol() pure nothrow @safe @nogc + { + return protocol_; + } + + /** + * Switches the protocol. + * + * The protocol is deallocated by the event loop, it should currently be + * allocated with $(D_PSYMBOL MmapPool). + * + * Params: + * protocol = Application protocol. + * + * Precondition: $(D_INLINECODE protocol !is null) + */ + @property void protocol(Protocol protocol) pure nothrow @safe @nogc + in + { + assert(protocol !is null); + } + body + { + protocol_ = protocol; + } + + /** + * Invokes the watcher callback. + */ + override void invoke() @nogc + { + if (output.length) + { + protocol.received(output[0 .. $]); + output.clear(); + } + else + { + protocol.disconnected(exception); + MmapPool.instance.dispose(protocol_); + defaultAllocator.dispose(exception); + active = false; + } + } + /** * Write some data to the transport. * @@ -133,15 +192,50 @@ abstract class SelectorLoop : Loop { foreach (ref connection; connections) { - // We want to free only IOWatchers. ConnectionWatcher are created by the + // We want to free only the transports. ConnectionWatcher are created by the // user and should be freed by himself. - if (cast(IOWatcher) connection !is null) + if (cast(StreamTransport) connection !is null) { MmapPool.instance.dispose(connection); } } } + /** + * Should be called if the backend configuration changes. + * + * Params: + * watcher = Watcher. + * oldEvents = The events were already set. + * events = The events should be set. + * + * Returns: $(D_KEYWORD true) if the operation was successful. + */ + override abstract protected bool reify(SocketWatcher watcher, + EventMask oldEvents, + EventMask events) @nogc; + + /** + * Kills the watcher and closes the connection. + * + * Params: + * transport = Transport. + * exception = Occurred exception. + */ + protected void kill(StreamTransport transport, + SocketException exception = null) @nogc + in + { + assert(transport !is null); + } + body + { + transport.socket.shutdown(); + defaultAllocator.dispose(transport.socket); + transport.exception = exception; + pendings.enqueue(transport); + } + /** * If the transport couldn't send the data, the further sending should * be handled by the event loop. @@ -184,10 +278,7 @@ abstract class SelectorLoop : Loop } if (exception !is null) { - auto watcher = cast(IOWatcher) connections[transport.socket.handle]; - assert(watcher !is null); - - kill(watcher, exception); + kill(transport, exception); return false; } return true; @@ -199,7 +290,7 @@ abstract class SelectorLoop : Loop * Params: * watcher = Watcher. */ - override void start(SocketWatcher watcher) @nogc + override void start(ConnectionWatcher watcher) @nogc { if (watcher.active) { diff --git a/source/tanya/async/loop.d b/source/tanya/async/loop.d index 70f1f66..54b5238 100644 --- a/source/tanya/async/loop.d +++ b/source/tanya/async/loop.d @@ -196,7 +196,7 @@ abstract class Loop * Params: * watcher = Watcher. */ - void start(SocketWatcher watcher) @nogc + void start(ConnectionWatcher watcher) @nogc { if (watcher.active) { @@ -266,26 +266,6 @@ abstract class Loop blockTime_ = blockTime; } - /** - * Kills the watcher and closes the connection. - * - * Params: - * watcher = Watcher. - * exception = Occurred exception. - */ - protected void kill(IOWatcher watcher, SocketException exception = null) @nogc - in - { - assert(watcher !is null); - } - body - { - watcher.socket.shutdown(); - defaultAllocator.dispose(watcher.socket); - watcher.exception = exception; - pendings.enqueue(watcher); - } - /** * Does the actual polling. */ diff --git a/source/tanya/async/watcher.d b/source/tanya/async/watcher.d index 997e033..9756d99 100644 --- a/source/tanya/async/watcher.d +++ b/source/tanya/async/watcher.d @@ -115,77 +115,3 @@ class ConnectionWatcher : SocketWatcher } } } - -package abstract class IOWatcher : SocketWatcher, DuplexTransport -{ - package SocketException exception; - - package ReadBuffer!ubyte output; - - package WriteBuffer!ubyte input; - - /// Application protocol. - protected Protocol protocol_; - - /** - * Params: - * socket = Socket. - * - * Precondition: $(D_INLINECODE socket !is null) - */ - this(ConnectedSocket socket) @nogc - { - super(socket); - output = ReadBuffer!ubyte(8192, 1024, MmapPool.instance); - input = WriteBuffer!ubyte(8192, MmapPool.instance); - active = true; - } - - /** - * Returns: Application protocol. - */ - @property Protocol protocol() pure nothrow @safe @nogc - { - return protocol_; - } - - /** - * Switches the protocol. - * - * The protocol is deallocated by the event loop, it should currently be - * allocated with $(D_PSYMBOL MmapPool). - * - * Params: - * protocol = Application protocol. - * - * Precondition: $(D_INLINECODE protocol !is null) - */ - @property void protocol(Protocol protocol) pure nothrow @safe @nogc - in - { - assert(protocol !is null); - } - body - { - protocol_ = protocol; - } - - /** - * Invokes the watcher callback. - */ - override void invoke() @nogc - { - if (output.length) - { - protocol.received(output[0 .. $]); - output.clear(); - } - else - { - protocol.disconnected(exception); - MmapPool.instance.dispose(protocol_); - defaultAllocator.dispose(exception); - active = false; - } - } -}