From a012ca40037566b223dbb9d40480d801282f449a Mon Sep 17 00:00:00 2001 From: Eugen Wissner Date: Fri, 10 Feb 2017 22:30:12 +0100 Subject: [PATCH 1/6] Remove StreamTransport interface Implement DuplexTransport and SocketTransport separately. --- source/tanya/async/event/epoll.d | 16 ++-- source/tanya/async/event/iocp.d | 62 ++++++--------- source/tanya/async/event/kqueue.d | 15 ++-- source/tanya/async/event/selector.d | 70 ++++++----------- source/tanya/async/loop.d | 10 +-- source/tanya/async/transport.d | 7 -- source/tanya/async/watcher.d | 112 ++++++++++++++-------------- 7 files changed, 122 insertions(+), 170 deletions(-) diff --git a/source/tanya/async/event/epoll.d b/source/tanya/async/event/epoll.d index 998ad18..c10b241 100644 --- a/source/tanya/async/event/epoll.d +++ b/source/tanya/async/event/epoll.d @@ -70,14 +70,9 @@ final class EpollLoop : SelectorLoop * * Returns: $(D_KEYWORD true) if the operation was successful. */ - protected override bool reify(ConnectionWatcher watcher, + protected override bool reify(SocketWatcher watcher, EventMask oldEvents, EventMask events) @nogc - in - { - assert(watcher !is null); - } - body { int op = EPOLL_CTL_DEL; epoll_event ev; @@ -123,15 +118,18 @@ final class EpollLoop : SelectorLoop for (auto i = 0; i < eventCount; ++i) { - auto transport = cast(SelectorStreamTransport) connections[events[i].data.fd]; + auto transport = cast(StreamTransport) connections[events[i].data.fd]; if (transport is null) { - acceptConnections(connections[events[i].data.fd]); + auto connection = cast(ConnectionWatcher) connections[events[i].data.fd]; + assert(connection !is null); + + acceptConnections(connection); } else if (events[i].events & EPOLLERR) { - kill(transport, null); + kill(transport); continue; } else if (events[i].events & (EPOLLIN | EPOLLPRI | EPOLLHUP)) diff --git a/source/tanya/async/event/iocp.d b/source/tanya/async/event/iocp.d index 31e57c8..84296a8 100644 --- a/source/tanya/async/event/iocp.d +++ b/source/tanya/async/event/iocp.d @@ -26,59 +26,39 @@ import core.sys.windows.winbase; import core.sys.windows.windef; import core.sys.windows.winsock2; -final class IOCPStreamTransport : StreamTransport +/** + * Transport for stream sockets. + */ +final class StreamTransport : IOWatcher, SocketTransport { - private WriteBuffer!ubyte input; - /** * Creates new completion port transport. * * Params: * socket = Socket. * - * Precondition: $(D_INLINECODE socket) + * Precondition: $(D_INLINECODE socket !is null) */ this(OverlappedConnectedSocket socket) @nogc - in + { + super(socket); + } + + /** + * Returns: Socket. + * + * Postcondition: $(D_INLINECODE socket !is null) + */ + override @property OverlappedConnectedSocket socket() pure nothrow @safe @nogc + out (socket) { assert(socket !is null); } body - { - super(socket); - input = WriteBuffer!ubyte(8192, MmapPool.instance); - } - - /** - * Returns: Socket. - */ - override @property OverlappedConnectedSocket socket() pure nothrow @safe @nogc { return cast(OverlappedConnectedSocket) socket_; } - /** - * Switches the protocol. - * - * The protocol is deallocated by the event loop, it should currently be - * allocated with $(D_PSYMBOL MmapPool). - * - * Params: - * protocol = Application protocol. - * - * Precondition: $(D_INLINECODE protocol !is null) - */ - @property void protocol(Protocol protocol) pure nothrow @safe @nogc - in - { - assert(protocol !is null); - } - body - { - protocol_ = protocol; - } - - /** * Write some data to the transport. * @@ -137,7 +117,7 @@ final class IOCPLoop : Loop * * Returns: $(D_KEYWORD true) if the operation was successful. */ - override protected bool reify(ConnectionWatcher watcher, + override protected bool reify(SocketWatcher watcher, EventMask oldEvents, EventMask events) @nogc { @@ -170,7 +150,7 @@ final class IOCPLoop : Loop if (!(oldEvents & Event.read) && (events & Event.read) || !(oldEvents & Event.write) && (events & Event.write)) { - auto transport = cast(IOCPStreamTransport) watcher; + auto transport = cast(StreamTransport) watcher; assert(transport !is null); if (CreateIoCompletionPort(cast(HANDLE) transport.socket.handle, @@ -237,7 +217,7 @@ final class IOCPLoop : Loop assert(listener !is null); auto socket = listener.endAccept(overlapped); - auto transport = MmapPool.instance.make!IOCPStreamTransport(socket); + auto transport = MmapPool.instance.make!StreamTransport(socket); connection.incoming.enqueue(transport); @@ -247,7 +227,7 @@ final class IOCPLoop : Loop listener.beginAccept(overlapped); break; case OverlappedSocketEvent.read: - auto transport = cast(IOCPStreamTransport) (cast(void*) key); + auto transport = cast(StreamTransport) (cast(void*) key); assert(transport !is null); if (!transport.active) @@ -287,7 +267,7 @@ final class IOCPLoop : Loop } break; case OverlappedSocketEvent.write: - auto transport = cast(IOCPStreamTransport) (cast(void*) key); + auto transport = cast(StreamTransport) (cast(void*) key); assert(transport !is null); transport.input += transport.socket.endSend(overlapped); diff --git a/source/tanya/async/event/kqueue.d b/source/tanya/async/event/kqueue.d index e52a737..721a792 100644 --- a/source/tanya/async/event/kqueue.d +++ b/source/tanya/async/event/kqueue.d @@ -177,7 +177,7 @@ final class KqueueLoop : SelectorLoop * * Returns: $(D_KEYWORD true) if the operation was successful. */ - override protected bool reify(ConnectionWatcher watcher, + override protected bool reify(SocketWatcher watcher, EventMask oldEvents, EventMask events) @nogc { @@ -237,15 +237,18 @@ final class KqueueLoop : SelectorLoop { assert(connections.length > events[i].ident); - auto transport = cast(SelectorStreamTransport) connections[events[i].ident]; + auto transport = cast(StreamTransport) connections[events[i].ident]; // If it is a ConnectionWatcher. Accept connections. if (transport is null) { + auto connection = cast(ConnectionWatcher) connections[events[i].data.fd]; + assert(connection !is null); + acceptConnections(connections[events[i].ident]); } else if (events[i].flags & EV_ERROR) { - kill(transport, null); + kill(transport); } else if (events[i].filter == EVFILT_READ) { @@ -300,12 +303,8 @@ final class KqueueLoop : SelectorLoop * Params: * transport = Transport. * exception = Exception thrown on sending. - * - * Returns: $(D_KEYWORD true) if the operation could be successfully - * completed or scheduled, $(D_KEYWORD false) otherwise (the - * transport is be destroyed then). */ - protected override bool feed(SelectorStreamTransport transport, + protected override void feed(StreamTransport transport, SocketException exception = null) @nogc { if (!super.feed(transport, exception)) diff --git a/source/tanya/async/event/selector.d b/source/tanya/async/event/selector.d index 83a9fce..06662b5 100644 --- a/source/tanya/async/event/selector.d +++ b/source/tanya/async/event/selector.d @@ -25,11 +25,8 @@ import tanya.network.socket; /** * Transport for stream sockets. */ -final class SelectorStreamTransport : IOWatcher, StreamTransport +final class StreamTransport : IOWatcher, SocketTransport { - /// Input buffer. - package WriteBuffer!ubyte input; - private SelectorLoop loop; /// Received notification that the underlying socket is write-ready. @@ -37,8 +34,8 @@ final class SelectorStreamTransport : IOWatcher, StreamTransport /** * Params: - * loop = Event loop. - * socket = Socket. + * loop = Event loop. + * socket = Socket. * * Precondition: $(D_INLINECODE loop !is null && socket !is null) */ @@ -46,19 +43,24 @@ final class SelectorStreamTransport : IOWatcher, StreamTransport in { assert(loop !is null); - assert(socket !is null); } body { super(socket); this.loop = loop; - input = WriteBuffer!ubyte(8192, MmapPool.instance); } /** * Returns: Socket. + * + * Postcondition: $(D_INLINECODE socket !is null) */ override @property ConnectedSocket socket() pure nothrow @safe @nogc + out (socket) + { + assert(socket !is null); + } + body { return cast(ConnectedSocket) socket_; } @@ -73,27 +75,6 @@ final class SelectorStreamTransport : IOWatcher, StreamTransport socket_ = socket; } - /** - * Switches the protocol. - * - * The protocol is deallocated by the event loop, it should currently be - * allocated with $(D_PSYMBOL MmapPool). - * - * Params: - * protocol = Application protocol. - * - * Precondition: $(D_INLINECODE protocol !is null) - */ - @property void protocol(Protocol protocol) pure nothrow @safe @nogc - in - { - assert(protocol !is null); - } - body - { - protocol_ = protocol; - } - /** * Write some data to the transport. * @@ -140,12 +121,12 @@ final class SelectorStreamTransport : IOWatcher, StreamTransport abstract class SelectorLoop : Loop { /// Pending connections. - protected Vector!ConnectionWatcher connections; + protected Vector!SocketWatcher connections; this() @nogc { super(); - connections = Vector!ConnectionWatcher(maxEvents, MmapPool.instance); + connections = Vector!SocketWatcher(maxEvents, MmapPool.instance); } ~this() @nogc @@ -154,11 +135,9 @@ abstract class SelectorLoop : Loop { // We want to free only IOWatchers. ConnectionWatcher are created by the // user and should be freed by himself. - auto io = cast(IOWatcher) connection; - if (io !is null) + if (cast(IOWatcher) connection !is null) { - MmapPool.instance.dispose(io); - connection = null; + MmapPool.instance.dispose(connection); } } } @@ -170,13 +149,14 @@ abstract class SelectorLoop : Loop * Params: * transport = Transport. * exception = Exception thrown on sending. - * - * Returns: $(D_KEYWORD true) if the operation could be successfully - * completed or scheduled, $(D_KEYWORD false) otherwise (the - * transport will be destroyed then). */ - protected bool feed(SelectorStreamTransport transport, + protected void feed(StreamTransport transport, SocketException exception = null) @nogc + in + { + assert(transport !is null); + } + body { while (transport.input.length && transport.writeReady) { @@ -204,9 +184,7 @@ abstract class SelectorLoop : Loop assert(watcher !is null); kill(watcher, exception); - return false; } - return true; } /** @@ -215,7 +193,7 @@ abstract class SelectorLoop : Loop * Params: * watcher = Watcher. */ - override void start(ConnectionWatcher watcher) @nogc + override void start(SocketWatcher watcher) @nogc { if (watcher.active) { @@ -261,11 +239,11 @@ abstract class SelectorLoop : Loop break; } - SelectorStreamTransport transport; + StreamTransport transport; if (connections.length > client.handle) { - transport = cast(SelectorStreamTransport) connections[client.handle]; + transport = cast(StreamTransport) connections[client.handle]; } else { @@ -273,7 +251,7 @@ abstract class SelectorLoop : Loop } if (transport is null) { - transport = MmapPool.instance.make!SelectorStreamTransport(this, client); + transport = MmapPool.instance.make!StreamTransport(this, client); connections[client.handle] = transport; } else diff --git a/source/tanya/async/loop.d b/source/tanya/async/loop.d index b7854a6..70f1f66 100644 --- a/source/tanya/async/loop.d +++ b/source/tanya/async/loop.d @@ -196,7 +196,7 @@ abstract class Loop * Params: * watcher = Watcher. */ - void start(ConnectionWatcher watcher) @nogc + void start(SocketWatcher watcher) @nogc { if (watcher.active) { @@ -234,9 +234,9 @@ abstract class Loop * * Returns: $(D_KEYWORD true) if the operation was successful. */ - abstract protected bool reify(ConnectionWatcher watcher, - EventMask oldEvents, - EventMask events) @nogc; + abstract protected bool reify(SocketWatcher watcher, + EventMask oldEvents, + EventMask events) @nogc; /** * Returns: The blocking time. @@ -273,7 +273,7 @@ abstract class Loop * watcher = Watcher. * exception = Occurred exception. */ - protected void kill(IOWatcher watcher, SocketException exception) @nogc + protected void kill(IOWatcher watcher, SocketException exception = null) @nogc in { assert(watcher !is null); diff --git a/source/tanya/async/transport.d b/source/tanya/async/transport.d index b06bec7..f18fbee 100644 --- a/source/tanya/async/transport.d +++ b/source/tanya/async/transport.d @@ -85,10 +85,3 @@ interface SocketTransport : Transport */ @property Socket socket() pure nothrow @safe @nogc; } - -/** - * Represents a connection-oriented socket transport. - */ -package interface StreamTransport : DuplexTransport, SocketTransport -{ -} diff --git a/source/tanya/async/watcher.d b/source/tanya/async/watcher.d index 37742e2..997e033 100644 --- a/source/tanya/async/watcher.d +++ b/source/tanya/async/watcher.d @@ -36,15 +36,48 @@ abstract class Watcher void invoke() @nogc; } -class ConnectionWatcher : Watcher +/** + * Socket watcher. + */ +abstract class SocketWatcher : Watcher { /// Watched socket. protected Socket socket_; - /// Protocol factory. - protected Protocol delegate() @nogc protocolFactory; + /** + * Params: + * socket = Socket. + * + * Precondition: $(D_INLINECODE socket !is null) + */ + this(Socket socket) pure nothrow @safe @nogc + in + { + assert(socket !is null); + } + body + { + socket_ = socket; + } - package Queue!DuplexTransport incoming; + /** + * Returns: Socket. + */ + @property Socket socket() pure nothrow @safe @nogc + { + return socket_; + } +} + +/** + * Connection watcher. + */ +class ConnectionWatcher : SocketWatcher +{ + /// Incoming connection queue. + Queue!DuplexTransport incoming; + + private Protocol delegate() @nogc protocolFactory; /** * Params: @@ -52,19 +85,8 @@ class ConnectionWatcher : Watcher */ this(Socket socket) @nogc { + super(socket); incoming = Queue!DuplexTransport(MmapPool.instance); - socket_ = socket; - } - - /** - * Destroys the watcher. - */ - ~this() @nogc - { - foreach (w; incoming) - { - MmapPool.instance.dispose(w); - } } /** @@ -76,14 +98,6 @@ class ConnectionWatcher : Watcher this.protocolFactory = () @nogc => cast(Protocol) MmapPool.instance.make!P; } - /** - * Returns: Socket. - */ - @property Socket socket() pure nothrow @safe @nogc - { - return socket_; - } - /** * Invokes new connection callback. */ @@ -102,21 +116,17 @@ class ConnectionWatcher : Watcher } } -/** - * Contains a pending watcher with the invoked events or a transport can be - * read from. - */ -class IOWatcher : ConnectionWatcher +package abstract class IOWatcher : SocketWatcher, DuplexTransport { package SocketException exception; - protected Protocol protocol_; - - /** - * Returns: Underlying output buffer. - */ package ReadBuffer!ubyte output; + package WriteBuffer!ubyte input; + + /// Application protocol. + protected Protocol protocol_; + /** * Params: * socket = Socket. @@ -124,25 +134,13 @@ class IOWatcher : ConnectionWatcher * Precondition: $(D_INLINECODE socket !is null) */ this(ConnectedSocket socket) @nogc - in - { - assert(socket !is null); - } - body { super(socket); output = ReadBuffer!ubyte(8192, 1024, MmapPool.instance); + input = WriteBuffer!ubyte(8192, MmapPool.instance); active = true; } - /** - * Destroys the watcher. - */ - ~this() @nogc - { - MmapPool.instance.dispose(protocol_); - } - /** * Returns: Application protocol. */ @@ -152,18 +150,24 @@ class IOWatcher : ConnectionWatcher } /** - * Returns: Socket. + * Switches the protocol. * - * Precondition: $(D_INLINECODE socket !is null) + * The protocol is deallocated by the event loop, it should currently be + * allocated with $(D_PSYMBOL MmapPool). + * + * Params: + * protocol = Application protocol. + * + * Precondition: $(D_INLINECODE protocol !is null) */ - override @property ConnectedSocket socket() pure nothrow @safe @nogc - out (socket) + @property void protocol(Protocol protocol) pure nothrow @safe @nogc + in { - assert(socket !is null); + assert(protocol !is null); } body { - return cast(ConnectedSocket) socket_; + protocol_ = protocol; } /** @@ -179,7 +183,7 @@ class IOWatcher : ConnectionWatcher else { protocol.disconnected(exception); - MmapPool.instance.dispose(protocol); + MmapPool.instance.dispose(protocol_); defaultAllocator.dispose(exception); active = false; } From c41fa2e98fdab61e4806869dc49576a105176eb8 Mon Sep 17 00:00:00 2001 From: Eugen Wissner Date: Fri, 10 Feb 2017 23:01:33 +0100 Subject: [PATCH 2/6] Adjust kqueue build --- source/tanya/async/event/kqueue.d | 10 +++++++--- source/tanya/async/event/selector.d | 8 +++++++- 2 files changed, 14 insertions(+), 4 deletions(-) diff --git a/source/tanya/async/event/kqueue.d b/source/tanya/async/event/kqueue.d index 721a792..8673ff7 100644 --- a/source/tanya/async/event/kqueue.d +++ b/source/tanya/async/event/kqueue.d @@ -241,10 +241,10 @@ final class KqueueLoop : SelectorLoop // If it is a ConnectionWatcher. Accept connections. if (transport is null) { - auto connection = cast(ConnectionWatcher) connections[events[i].data.fd]; + auto connection = cast(ConnectionWatcher) connections[events[i].ident]; assert(connection !is null); - acceptConnections(connections[events[i].ident]); + acceptConnections(connection); } else if (events[i].flags & EV_ERROR) { @@ -303,8 +303,12 @@ final class KqueueLoop : SelectorLoop * Params: * transport = Transport. * exception = Exception thrown on sending. + * + * Returns: $(D_KEYWORD true) if the operation could be successfully + * completed or scheduled, $(D_KEYWORD false) otherwise (the + * transport will be destroyed then). */ - protected override void feed(StreamTransport transport, + protected override bool feed(StreamTransport transport, SocketException exception = null) @nogc { if (!super.feed(transport, exception)) diff --git a/source/tanya/async/event/selector.d b/source/tanya/async/event/selector.d index 06662b5..67efd7f 100644 --- a/source/tanya/async/event/selector.d +++ b/source/tanya/async/event/selector.d @@ -149,8 +149,12 @@ abstract class SelectorLoop : Loop * Params: * transport = Transport. * exception = Exception thrown on sending. + * + * Returns: $(D_KEYWORD true) if the operation could be successfully + * completed or scheduled, $(D_KEYWORD false) otherwise (the + * transport will be destroyed then). */ - protected void feed(StreamTransport transport, + protected bool feed(StreamTransport transport, SocketException exception = null) @nogc in { @@ -184,7 +188,9 @@ abstract class SelectorLoop : Loop assert(watcher !is null); kill(watcher, exception); + return false; } + return true; } /** From 3454a1965a8760deb9acfa44d5c2cf311710daec Mon Sep 17 00:00:00 2001 From: Eugen Wissner Date: Sat, 11 Feb 2017 19:47:57 +0100 Subject: [PATCH 3/6] Move all IOWatcher members to the transports --- source/tanya/async/event/iocp.d | 66 ++++++++++++++++- source/tanya/async/event/selector.d | 107 +++++++++++++++++++++++++--- source/tanya/async/loop.d | 22 +----- source/tanya/async/watcher.d | 74 ------------------- 4 files changed, 164 insertions(+), 105 deletions(-) diff --git a/source/tanya/async/event/iocp.d b/source/tanya/async/event/iocp.d index 84296a8..8dc6f2d 100644 --- a/source/tanya/async/event/iocp.d +++ b/source/tanya/async/event/iocp.d @@ -29,8 +29,16 @@ import core.sys.windows.winsock2; /** * Transport for stream sockets. */ -final class StreamTransport : IOWatcher, SocketTransport +final class StreamTransport : SocketWatcher, DuplexTransport, SocketTransport { + private SocketException exception; + + private ReadBuffer!ubyte output; + + private WriteBuffer!ubyte input; + + private Protocol protocol_; + /** * Creates new completion port transport. * @@ -42,6 +50,9 @@ final class StreamTransport : IOWatcher, SocketTransport this(OverlappedConnectedSocket socket) @nogc { super(socket); + output = ReadBuffer!ubyte(8192, 1024, MmapPool.instance); + input = WriteBuffer!ubyte(8192, MmapPool.instance); + active = true; } /** @@ -84,6 +95,54 @@ final class StreamTransport : IOWatcher, SocketTransport } } } + + /** + * Returns: Application protocol. + */ + @property Protocol protocol() pure nothrow @safe @nogc + { + return protocol_; + } + + /** + * Switches the protocol. + * + * The protocol is deallocated by the event loop, it should currently be + * allocated with $(D_PSYMBOL MmapPool). + * + * Params: + * protocol = Application protocol. + * + * Precondition: $(D_INLINECODE protocol !is null) + */ + @property void protocol(Protocol protocol) pure nothrow @safe @nogc + in + { + assert(protocol !is null); + } + body + { + protocol_ = protocol; + } + + /** + * Invokes the watcher callback. + */ + override void invoke() @nogc + { + if (output.length) + { + protocol.received(output[0 .. $]); + output.clear(); + } + else + { + protocol.disconnected(exception); + MmapPool.instance.dispose(protocol_); + defaultAllocator.dispose(exception); + active = false; + } + } } final class IOCPLoop : Loop @@ -251,7 +310,10 @@ final class IOCPLoop : Loop { // We want to get one last notification to destroy the watcher transport.socket.beginReceive(transport.output[], overlapped); - kill(transport, exception); + transport.socket.shutdown(); + defaultAllocator.dispose(transport.socket); + transport.exception = exception; + pendings.enqueue(transport); } else if (received > 0) { diff --git a/source/tanya/async/event/selector.d b/source/tanya/async/event/selector.d index 67efd7f..9251b31 100644 --- a/source/tanya/async/event/selector.d +++ b/source/tanya/async/event/selector.d @@ -25,10 +25,18 @@ import tanya.network.socket; /** * Transport for stream sockets. */ -final class StreamTransport : IOWatcher, SocketTransport +package class StreamTransport : SocketWatcher, DuplexTransport, SocketTransport { private SelectorLoop loop; + private SocketException exception; + + package ReadBuffer!ubyte output; + + package WriteBuffer!ubyte input; + + private Protocol protocol_; + /// Received notification that the underlying socket is write-ready. package bool writeReady; @@ -48,6 +56,9 @@ final class StreamTransport : IOWatcher, SocketTransport { super(socket); this.loop = loop; + output = ReadBuffer!ubyte(8192, 1024, MmapPool.instance); + input = WriteBuffer!ubyte(8192, MmapPool.instance); + active = true; } /** @@ -75,6 +86,54 @@ final class StreamTransport : IOWatcher, SocketTransport socket_ = socket; } + /** + * Returns: Application protocol. + */ + @property Protocol protocol() pure nothrow @safe @nogc + { + return protocol_; + } + + /** + * Switches the protocol. + * + * The protocol is deallocated by the event loop, it should currently be + * allocated with $(D_PSYMBOL MmapPool). + * + * Params: + * protocol = Application protocol. + * + * Precondition: $(D_INLINECODE protocol !is null) + */ + @property void protocol(Protocol protocol) pure nothrow @safe @nogc + in + { + assert(protocol !is null); + } + body + { + protocol_ = protocol; + } + + /** + * Invokes the watcher callback. + */ + override void invoke() @nogc + { + if (output.length) + { + protocol.received(output[0 .. $]); + output.clear(); + } + else + { + protocol.disconnected(exception); + MmapPool.instance.dispose(protocol_); + defaultAllocator.dispose(exception); + active = false; + } + } + /** * Write some data to the transport. * @@ -133,15 +192,50 @@ abstract class SelectorLoop : Loop { foreach (ref connection; connections) { - // We want to free only IOWatchers. ConnectionWatcher are created by the + // We want to free only the transports. ConnectionWatcher are created by the // user and should be freed by himself. - if (cast(IOWatcher) connection !is null) + if (cast(StreamTransport) connection !is null) { MmapPool.instance.dispose(connection); } } } + /** + * Should be called if the backend configuration changes. + * + * Params: + * watcher = Watcher. + * oldEvents = The events were already set. + * events = The events should be set. + * + * Returns: $(D_KEYWORD true) if the operation was successful. + */ + override abstract protected bool reify(SocketWatcher watcher, + EventMask oldEvents, + EventMask events) @nogc; + + /** + * Kills the watcher and closes the connection. + * + * Params: + * transport = Transport. + * exception = Occurred exception. + */ + protected void kill(StreamTransport transport, + SocketException exception = null) @nogc + in + { + assert(transport !is null); + } + body + { + transport.socket.shutdown(); + defaultAllocator.dispose(transport.socket); + transport.exception = exception; + pendings.enqueue(transport); + } + /** * If the transport couldn't send the data, the further sending should * be handled by the event loop. @@ -184,10 +278,7 @@ abstract class SelectorLoop : Loop } if (exception !is null) { - auto watcher = cast(IOWatcher) connections[transport.socket.handle]; - assert(watcher !is null); - - kill(watcher, exception); + kill(transport, exception); return false; } return true; @@ -199,7 +290,7 @@ abstract class SelectorLoop : Loop * Params: * watcher = Watcher. */ - override void start(SocketWatcher watcher) @nogc + override void start(ConnectionWatcher watcher) @nogc { if (watcher.active) { diff --git a/source/tanya/async/loop.d b/source/tanya/async/loop.d index 70f1f66..54b5238 100644 --- a/source/tanya/async/loop.d +++ b/source/tanya/async/loop.d @@ -196,7 +196,7 @@ abstract class Loop * Params: * watcher = Watcher. */ - void start(SocketWatcher watcher) @nogc + void start(ConnectionWatcher watcher) @nogc { if (watcher.active) { @@ -266,26 +266,6 @@ abstract class Loop blockTime_ = blockTime; } - /** - * Kills the watcher and closes the connection. - * - * Params: - * watcher = Watcher. - * exception = Occurred exception. - */ - protected void kill(IOWatcher watcher, SocketException exception = null) @nogc - in - { - assert(watcher !is null); - } - body - { - watcher.socket.shutdown(); - defaultAllocator.dispose(watcher.socket); - watcher.exception = exception; - pendings.enqueue(watcher); - } - /** * Does the actual polling. */ diff --git a/source/tanya/async/watcher.d b/source/tanya/async/watcher.d index 997e033..9756d99 100644 --- a/source/tanya/async/watcher.d +++ b/source/tanya/async/watcher.d @@ -115,77 +115,3 @@ class ConnectionWatcher : SocketWatcher } } } - -package abstract class IOWatcher : SocketWatcher, DuplexTransport -{ - package SocketException exception; - - package ReadBuffer!ubyte output; - - package WriteBuffer!ubyte input; - - /// Application protocol. - protected Protocol protocol_; - - /** - * Params: - * socket = Socket. - * - * Precondition: $(D_INLINECODE socket !is null) - */ - this(ConnectedSocket socket) @nogc - { - super(socket); - output = ReadBuffer!ubyte(8192, 1024, MmapPool.instance); - input = WriteBuffer!ubyte(8192, MmapPool.instance); - active = true; - } - - /** - * Returns: Application protocol. - */ - @property Protocol protocol() pure nothrow @safe @nogc - { - return protocol_; - } - - /** - * Switches the protocol. - * - * The protocol is deallocated by the event loop, it should currently be - * allocated with $(D_PSYMBOL MmapPool). - * - * Params: - * protocol = Application protocol. - * - * Precondition: $(D_INLINECODE protocol !is null) - */ - @property void protocol(Protocol protocol) pure nothrow @safe @nogc - in - { - assert(protocol !is null); - } - body - { - protocol_ = protocol; - } - - /** - * Invokes the watcher callback. - */ - override void invoke() @nogc - { - if (output.length) - { - protocol.received(output[0 .. $]); - output.clear(); - } - else - { - protocol.disconnected(exception); - MmapPool.instance.dispose(protocol_); - defaultAllocator.dispose(exception); - active = false; - } - } -} From e86ff63f91506c83370534d7d10746a92d1b2224 Mon Sep 17 00:00:00 2001 From: Eugen Wissner Date: Sun, 12 Feb 2017 18:51:00 +0100 Subject: [PATCH 4/6] Add DuplexTransport.close for the selector transport --- source/tanya/async/event/selector.d | 29 +++++++++++++++++++++++++++++ source/tanya/async/transport.d | 13 +++++++++++++ 2 files changed, 42 insertions(+) diff --git a/source/tanya/async/event/selector.d b/source/tanya/async/event/selector.d index 9251b31..13254ce 100644 --- a/source/tanya/async/event/selector.d +++ b/source/tanya/async/event/selector.d @@ -37,6 +37,8 @@ package class StreamTransport : SocketWatcher, DuplexTransport, SocketTransport private Protocol protocol_; + private bool closing; + /// Received notification that the underlying socket is write-ready. package bool writeReady; @@ -115,6 +117,25 @@ package class StreamTransport : SocketWatcher, DuplexTransport, SocketTransport protocol_ = protocol; } + /** + * Returns $(D_PARAM true) if the transport is closing or closed. + */ + bool isClosing() const pure nothrow @safe @nogc + { + return closing; + } + + /** + * Close the transport. + * + * Buffered data will be flushed. No more data will be received. + */ + void close() @nogc + { + closing = true; + loop.reify(this, EventMask(Event.read, Event.write), EventMask(Event.write)); + } + /** * Invokes the watcher callback. */ @@ -124,6 +145,10 @@ package class StreamTransport : SocketWatcher, DuplexTransport, SocketTransport { protocol.received(output[0 .. $]); output.clear(); + if (isClosing() && input.length == 0) + { + loop.kill(this); + } } else { @@ -281,6 +306,10 @@ abstract class SelectorLoop : Loop kill(transport, exception); return false; } + if (transport.input.length == 0 && transport.isClosing()) + { + kill(transport); + } return true; } diff --git a/source/tanya/async/transport.d b/source/tanya/async/transport.d index f18fbee..4550522 100644 --- a/source/tanya/async/transport.d +++ b/source/tanya/async/transport.d @@ -73,6 +73,19 @@ interface DuplexTransport : ReadTransport, WriteTransport { assert(protocol !is null); } + + + /** + * Returns $(D_PARAM true) if the transport is closing or closed. + */ + bool isClosing() const pure nothrow @safe @nogc; + + /** + * Close the transport. + * + * Buffered data will be flushed. No more data will be received. + */ + void close() @nogc; } /** From d210a39249a6f5e172d0f5c119f2a90b959f735a Mon Sep 17 00:00:00 2001 From: Eugen Wissner Date: Sat, 18 Feb 2017 14:10:54 +0100 Subject: [PATCH 5/6] Implement IOCPTransport.close and isClosing --- source/tanya/async/event/iocp.d | 81 +++++++++++++++++++++++---------- 1 file changed, 58 insertions(+), 23 deletions(-) diff --git a/source/tanya/async/event/iocp.d b/source/tanya/async/event/iocp.d index 8dc6f2d..26678e1 100644 --- a/source/tanya/async/event/iocp.d +++ b/source/tanya/async/event/iocp.d @@ -39,6 +39,8 @@ final class StreamTransport : SocketWatcher, DuplexTransport, SocketTransport private Protocol protocol_; + private bool closing; + /** * Creates new completion port transport. * @@ -70,6 +72,24 @@ final class StreamTransport : SocketWatcher, DuplexTransport, SocketTransport return cast(OverlappedConnectedSocket) socket_; } + /** + * Returns $(D_PARAM true) if the transport is closing or closed. + */ + bool isClosing() const pure nothrow @safe @nogc + { + return closing; + } + + /** + * Close the transport. + * + * Buffered data will be flushed. No more data will be received. + */ + void close() pure nothrow @safe @nogc + { + closing = true; + } + /** * Write some data to the transport. * @@ -78,22 +98,7 @@ final class StreamTransport : SocketWatcher, DuplexTransport, SocketTransport */ void write(ubyte[] data) @nogc { - immutable empty = input.length == 0; input ~= data; - if (empty) - { - SocketState overlapped; - try - { - overlapped = MmapPool.instance.make!SocketState; - socket.beginSend(input[], overlapped); - } - catch (SocketException e) - { - MmapPool.instance.dispose(overlapped); - MmapPool.instance.dispose(e); - } - } } /** @@ -132,8 +137,23 @@ final class StreamTransport : SocketWatcher, DuplexTransport, SocketTransport { if (output.length) { + immutable empty = input.length == 0; protocol.received(output[0 .. $]); output.clear(); + if (empty) + { + SocketState overlapped; + try + { + overlapped = MmapPool.instance.make!SocketState; + socket.beginSend(input[], overlapped); + } + catch (SocketException e) + { + MmapPool.instance.dispose(overlapped); + MmapPool.instance.dispose(e); + } + } } else { @@ -239,6 +259,20 @@ final class IOCPLoop : Loop return true; } + private void kill(StreamTransport transport, + SocketException exception = null) @nogc + in + { + assert(transport !is null); + } + body + { + transport.socket.shutdown(); + defaultAllocator.dispose(transport.socket); + transport.exception = exception; + pendings.enqueue(transport); + } + /** * Does the actual polling. */ @@ -308,19 +342,16 @@ final class IOCPLoop : Loop } if (transport.socket.disconnected) { - // We want to get one last notification to destroy the watcher + // We want to get one last notification to destroy the watcher. transport.socket.beginReceive(transport.output[], overlapped); - transport.socket.shutdown(); - defaultAllocator.dispose(transport.socket); - transport.exception = exception; - pendings.enqueue(transport); + kill(transport, exception); } else if (received > 0) { immutable full = transport.output.free == received; transport.output += received; - // Receive was interrupted because the buffer is full. We have to continue + // Receive was interrupted because the buffer is full. We have to continue. if (full) { transport.socket.beginReceive(transport.output[], overlapped); @@ -333,17 +364,21 @@ final class IOCPLoop : Loop assert(transport !is null); transport.input += transport.socket.endSend(overlapped); - if (transport.input.length) + if (transport.input.length > 0) { transport.socket.beginSend(transport.input[], overlapped); } else { transport.socket.beginReceive(transport.output[], overlapped); + if (transport.isClosing()) + { + kill(transport); + } } break; default: assert(false, "Unknown event"); } } -} +} \ No newline at end of file From 70632d975da09471d0fac5df51a96bd3d9bbeb6f Mon Sep 17 00:00:00 2001 From: Eugen Wissner Date: Sat, 18 Feb 2017 16:35:06 +0100 Subject: [PATCH 6/6] Add documentation link --- README.md | 4 +--- 1 file changed, 1 insertion(+), 3 deletions(-) diff --git a/README.md b/README.md index d7da98a..ffb554c 100644 --- a/README.md +++ b/README.md @@ -14,6 +14,7 @@ Tanya extends Phobos functionality and provides alternative implementations for data structures and utilities that depend on the Garbage Collector in Phobos. * [Bug tracker](https://issues.caraus.io/projects/tanya) +* [Documentation](https://docs.caraus.io/tanya) ## Overview @@ -51,9 +52,6 @@ needs more test cases, better performance and some additional features * Tanya is a native D library. -* Documentation and usage examples can be found in the source code. -Online documentation will be published soon. - * Tanya is cross-platform. The development happens on a 64-bit Linux, but it is being tested on Windows and FreeBSD as well.