diff options
| author | Eugen Wissner <belka@caraus.de> | 2017-02-11 19:47:57 +0100 |
|---|---|---|
| committer | Eugen Wissner <belka@caraus.de> | 2017-02-11 19:47:57 +0100 |
| commit | 3454a1965a8760deb9acfa44d5c2cf311710daec (patch) | |
| tree | 578783ac6967f760d7d19c3dd014833c8ef905d3 | |
| parent | c41fa2e98fdab61e4806869dc49576a105176eb8 (diff) | |
| download | tanya-3454a1965a8760deb9acfa44d5c2cf311710daec.tar.gz | |
Move all IOWatcher members to the transports
| -rw-r--r-- | source/tanya/async/event/iocp.d | 66 | ||||
| -rw-r--r-- | source/tanya/async/event/selector.d | 107 | ||||
| -rw-r--r-- | source/tanya/async/loop.d | 22 | ||||
| -rw-r--r-- | source/tanya/async/watcher.d | 74 |
4 files changed, 164 insertions, 105 deletions
diff --git a/source/tanya/async/event/iocp.d b/source/tanya/async/event/iocp.d index 84296a8..8dc6f2d 100644 --- a/source/tanya/async/event/iocp.d +++ b/source/tanya/async/event/iocp.d @@ -29,8 +29,16 @@ import core.sys.windows.winsock2; /** * Transport for stream sockets. */ -final class StreamTransport : IOWatcher, SocketTransport +final class StreamTransport : SocketWatcher, DuplexTransport, SocketTransport { + private SocketException exception; + + private ReadBuffer!ubyte output; + + private WriteBuffer!ubyte input; + + private Protocol protocol_; + /** * Creates new completion port transport. * @@ -42,6 +50,9 @@ final class StreamTransport : IOWatcher, SocketTransport this(OverlappedConnectedSocket socket) @nogc { super(socket); + output = ReadBuffer!ubyte(8192, 1024, MmapPool.instance); + input = WriteBuffer!ubyte(8192, MmapPool.instance); + active = true; } /** @@ -84,6 +95,54 @@ final class StreamTransport : IOWatcher, SocketTransport } } } + + /** + * Returns: Application protocol. + */ + @property Protocol protocol() pure nothrow @safe @nogc + { + return protocol_; + } + + /** + * Switches the protocol. + * + * The protocol is deallocated by the event loop, it should currently be + * allocated with $(D_PSYMBOL MmapPool). + * + * Params: + * protocol = Application protocol. + * + * Precondition: $(D_INLINECODE protocol !is null) + */ + @property void protocol(Protocol protocol) pure nothrow @safe @nogc + in + { + assert(protocol !is null); + } + body + { + protocol_ = protocol; + } + + /** + * Invokes the watcher callback. + */ + override void invoke() @nogc + { + if (output.length) + { + protocol.received(output[0 .. $]); + output.clear(); + } + else + { + protocol.disconnected(exception); + MmapPool.instance.dispose(protocol_); + defaultAllocator.dispose(exception); + active = false; + } + } } final class IOCPLoop : Loop @@ -251,7 +310,10 @@ final class IOCPLoop : Loop { // We want to get one last notification to destroy the watcher transport.socket.beginReceive(transport.output[], overlapped); - kill(transport, exception); + transport.socket.shutdown(); + defaultAllocator.dispose(transport.socket); + transport.exception = exception; + pendings.enqueue(transport); } else if (received > 0) { diff --git a/source/tanya/async/event/selector.d b/source/tanya/async/event/selector.d index 67efd7f..9251b31 100644 --- a/source/tanya/async/event/selector.d +++ b/source/tanya/async/event/selector.d @@ -25,10 +25,18 @@ import tanya.network.socket; /** * Transport for stream sockets. */ -final class StreamTransport : IOWatcher, SocketTransport +package class StreamTransport : SocketWatcher, DuplexTransport, SocketTransport { private SelectorLoop loop; + private SocketException exception; + + package ReadBuffer!ubyte output; + + package WriteBuffer!ubyte input; + + private Protocol protocol_; + /// Received notification that the underlying socket is write-ready. package bool writeReady; @@ -48,6 +56,9 @@ final class StreamTransport : IOWatcher, SocketTransport { super(socket); this.loop = loop; + output = ReadBuffer!ubyte(8192, 1024, MmapPool.instance); + input = WriteBuffer!ubyte(8192, MmapPool.instance); + active = true; } /** @@ -76,6 +87,54 @@ final class StreamTransport : IOWatcher, SocketTransport } /** + * Returns: Application protocol. + */ + @property Protocol protocol() pure nothrow @safe @nogc + { + return protocol_; + } + + /** + * Switches the protocol. + * + * The protocol is deallocated by the event loop, it should currently be + * allocated with $(D_PSYMBOL MmapPool). + * + * Params: + * protocol = Application protocol. + * + * Precondition: $(D_INLINECODE protocol !is null) + */ + @property void protocol(Protocol protocol) pure nothrow @safe @nogc + in + { + assert(protocol !is null); + } + body + { + protocol_ = protocol; + } + + /** + * Invokes the watcher callback. + */ + override void invoke() @nogc + { + if (output.length) + { + protocol.received(output[0 .. $]); + output.clear(); + } + else + { + protocol.disconnected(exception); + MmapPool.instance.dispose(protocol_); + defaultAllocator.dispose(exception); + active = false; + } + } + + /** * Write some data to the transport. * * Params: @@ -133,9 +192,9 @@ abstract class SelectorLoop : Loop { foreach (ref connection; connections) { - // We want to free only IOWatchers. ConnectionWatcher are created by the + // We want to free only the transports. ConnectionWatcher are created by the // user and should be freed by himself. - if (cast(IOWatcher) connection !is null) + if (cast(StreamTransport) connection !is null) { MmapPool.instance.dispose(connection); } @@ -143,6 +202,41 @@ abstract class SelectorLoop : Loop } /** + * Should be called if the backend configuration changes. + * + * Params: + * watcher = Watcher. + * oldEvents = The events were already set. + * events = The events should be set. + * + * Returns: $(D_KEYWORD true) if the operation was successful. + */ + override abstract protected bool reify(SocketWatcher watcher, + EventMask oldEvents, + EventMask events) @nogc; + + /** + * Kills the watcher and closes the connection. + * + * Params: + * transport = Transport. + * exception = Occurred exception. + */ + protected void kill(StreamTransport transport, + SocketException exception = null) @nogc + in + { + assert(transport !is null); + } + body + { + transport.socket.shutdown(); + defaultAllocator.dispose(transport.socket); + transport.exception = exception; + pendings.enqueue(transport); + } + + /** * If the transport couldn't send the data, the further sending should * be handled by the event loop. * @@ -184,10 +278,7 @@ abstract class SelectorLoop : Loop } if (exception !is null) { - auto watcher = cast(IOWatcher) connections[transport.socket.handle]; - assert(watcher !is null); - - kill(watcher, exception); + kill(transport, exception); return false; } return true; @@ -199,7 +290,7 @@ abstract class SelectorLoop : Loop * Params: * watcher = Watcher. */ - override void start(SocketWatcher watcher) @nogc + override void start(ConnectionWatcher watcher) @nogc { if (watcher.active) { diff --git a/source/tanya/async/loop.d b/source/tanya/async/loop.d index 70f1f66..54b5238 100644 --- a/source/tanya/async/loop.d +++ b/source/tanya/async/loop.d @@ -196,7 +196,7 @@ abstract class Loop * Params:
* watcher = Watcher.
*/
- void start(SocketWatcher watcher) @nogc
+ void start(ConnectionWatcher watcher) @nogc
{
if (watcher.active)
{
@@ -267,26 +267,6 @@ abstract class Loop }
/**
- * Kills the watcher and closes the connection.
- *
- * Params:
- * watcher = Watcher.
- * exception = Occurred exception.
- */
- protected void kill(IOWatcher watcher, SocketException exception = null) @nogc
- in
- {
- assert(watcher !is null);
- }
- body
- {
- watcher.socket.shutdown();
- defaultAllocator.dispose(watcher.socket);
- watcher.exception = exception;
- pendings.enqueue(watcher);
- }
-
- /**
* Does the actual polling.
*/
abstract protected void poll() @nogc;
diff --git a/source/tanya/async/watcher.d b/source/tanya/async/watcher.d index 997e033..9756d99 100644 --- a/source/tanya/async/watcher.d +++ b/source/tanya/async/watcher.d @@ -115,77 +115,3 @@ class ConnectionWatcher : SocketWatcher } } } - -package abstract class IOWatcher : SocketWatcher, DuplexTransport -{ - package SocketException exception; - - package ReadBuffer!ubyte output; - - package WriteBuffer!ubyte input; - - /// Application protocol. - protected Protocol protocol_; - - /** - * Params: - * socket = Socket. - * - * Precondition: $(D_INLINECODE socket !is null) - */ - this(ConnectedSocket socket) @nogc - { - super(socket); - output = ReadBuffer!ubyte(8192, 1024, MmapPool.instance); - input = WriteBuffer!ubyte(8192, MmapPool.instance); - active = true; - } - - /** - * Returns: Application protocol. - */ - @property Protocol protocol() pure nothrow @safe @nogc - { - return protocol_; - } - - /** - * Switches the protocol. - * - * The protocol is deallocated by the event loop, it should currently be - * allocated with $(D_PSYMBOL MmapPool). - * - * Params: - * protocol = Application protocol. - * - * Precondition: $(D_INLINECODE protocol !is null) - */ - @property void protocol(Protocol protocol) pure nothrow @safe @nogc - in - { - assert(protocol !is null); - } - body - { - protocol_ = protocol; - } - - /** - * Invokes the watcher callback. - */ - override void invoke() @nogc - { - if (output.length) - { - protocol.received(output[0 .. $]); - output.clear(); - } - else - { - protocol.disconnected(exception); - MmapPool.instance.dispose(protocol_); - defaultAllocator.dispose(exception); - active = false; - } - } -} |
