diff options
| author | Eugen Wissner <belka@caraus.de> | 2017-02-10 22:30:12 +0100 |
|---|---|---|
| committer | Eugen Wissner <belka@caraus.de> | 2017-02-10 22:30:12 +0100 |
| commit | a012ca40037566b223dbb9d40480d801282f449a (patch) | |
| tree | a6e420d99bfcbb32af43848c9220f3d38a276064 /source | |
| parent | b74e5aa4ee57af17450d7ef63547002d99dffdbe (diff) | |
| download | tanya-a012ca40037566b223dbb9d40480d801282f449a.tar.gz | |
Remove StreamTransport interface
Implement DuplexTransport and SocketTransport separately.
Diffstat (limited to 'source')
| -rw-r--r-- | source/tanya/async/event/epoll.d | 16 | ||||
| -rw-r--r-- | source/tanya/async/event/iocp.d | 50 | ||||
| -rw-r--r-- | source/tanya/async/event/kqueue.d | 15 | ||||
| -rw-r--r-- | source/tanya/async/event/selector.d | 70 | ||||
| -rw-r--r-- | source/tanya/async/loop.d | 10 | ||||
| -rw-r--r-- | source/tanya/async/transport.d | 7 | ||||
| -rw-r--r-- | source/tanya/async/watcher.d | 104 |
7 files changed, 112 insertions, 160 deletions
diff --git a/source/tanya/async/event/epoll.d b/source/tanya/async/event/epoll.d index 998ad18..c10b241 100644 --- a/source/tanya/async/event/epoll.d +++ b/source/tanya/async/event/epoll.d @@ -70,14 +70,9 @@ final class EpollLoop : SelectorLoop *
* Returns: $(D_KEYWORD true) if the operation was successful.
*/
- protected override bool reify(ConnectionWatcher watcher,
+ protected override bool reify(SocketWatcher watcher,
EventMask oldEvents,
EventMask events) @nogc
- in
- {
- assert(watcher !is null);
- }
- body
{
int op = EPOLL_CTL_DEL;
epoll_event ev;
@@ -123,15 +118,18 @@ final class EpollLoop : SelectorLoop for (auto i = 0; i < eventCount; ++i)
{
- auto transport = cast(SelectorStreamTransport) connections[events[i].data.fd];
+ auto transport = cast(StreamTransport) connections[events[i].data.fd];
if (transport is null)
{
- acceptConnections(connections[events[i].data.fd]);
+ auto connection = cast(ConnectionWatcher) connections[events[i].data.fd];
+ assert(connection !is null);
+
+ acceptConnections(connection);
}
else if (events[i].events & EPOLLERR)
{
- kill(transport, null);
+ kill(transport);
continue;
}
else if (events[i].events & (EPOLLIN | EPOLLPRI | EPOLLHUP))
diff --git a/source/tanya/async/event/iocp.d b/source/tanya/async/event/iocp.d index 31e57c8..84296a8 100644 --- a/source/tanya/async/event/iocp.d +++ b/source/tanya/async/event/iocp.d @@ -26,59 +26,39 @@ import core.sys.windows.winbase; import core.sys.windows.windef; import core.sys.windows.winsock2; -final class IOCPStreamTransport : StreamTransport +/** + * Transport for stream sockets. + */ +final class StreamTransport : IOWatcher, SocketTransport { - private WriteBuffer!ubyte input; - /** * Creates new completion port transport. * * Params: * socket = Socket. * - * Precondition: $(D_INLINECODE socket) + * Precondition: $(D_INLINECODE socket !is null) */ this(OverlappedConnectedSocket socket) @nogc - in - { - assert(socket !is null); - } - body { super(socket); - input = WriteBuffer!ubyte(8192, MmapPool.instance); } /** * Returns: Socket. - */ - override @property OverlappedConnectedSocket socket() pure nothrow @safe @nogc - { - return cast(OverlappedConnectedSocket) socket_; - } - - /** - * Switches the protocol. - * - * The protocol is deallocated by the event loop, it should currently be - * allocated with $(D_PSYMBOL MmapPool). - * - * Params: - * protocol = Application protocol. * - * Precondition: $(D_INLINECODE protocol !is null) + * Postcondition: $(D_INLINECODE socket !is null) */ - @property void protocol(Protocol protocol) pure nothrow @safe @nogc - in + override @property OverlappedConnectedSocket socket() pure nothrow @safe @nogc + out (socket) { - assert(protocol !is null); + assert(socket !is null); } body { - protocol_ = protocol; + return cast(OverlappedConnectedSocket) socket_; } - /** * Write some data to the transport. * @@ -137,7 +117,7 @@ final class IOCPLoop : Loop * * Returns: $(D_KEYWORD true) if the operation was successful. */ - override protected bool reify(ConnectionWatcher watcher, + override protected bool reify(SocketWatcher watcher, EventMask oldEvents, EventMask events) @nogc { @@ -170,7 +150,7 @@ final class IOCPLoop : Loop if (!(oldEvents & Event.read) && (events & Event.read) || !(oldEvents & Event.write) && (events & Event.write)) { - auto transport = cast(IOCPStreamTransport) watcher; + auto transport = cast(StreamTransport) watcher; assert(transport !is null); if (CreateIoCompletionPort(cast(HANDLE) transport.socket.handle, @@ -237,7 +217,7 @@ final class IOCPLoop : Loop assert(listener !is null); auto socket = listener.endAccept(overlapped); - auto transport = MmapPool.instance.make!IOCPStreamTransport(socket); + auto transport = MmapPool.instance.make!StreamTransport(socket); connection.incoming.enqueue(transport); @@ -247,7 +227,7 @@ final class IOCPLoop : Loop listener.beginAccept(overlapped); break; case OverlappedSocketEvent.read: - auto transport = cast(IOCPStreamTransport) (cast(void*) key); + auto transport = cast(StreamTransport) (cast(void*) key); assert(transport !is null); if (!transport.active) @@ -287,7 +267,7 @@ final class IOCPLoop : Loop } break; case OverlappedSocketEvent.write: - auto transport = cast(IOCPStreamTransport) (cast(void*) key); + auto transport = cast(StreamTransport) (cast(void*) key); assert(transport !is null); transport.input += transport.socket.endSend(overlapped); diff --git a/source/tanya/async/event/kqueue.d b/source/tanya/async/event/kqueue.d index e52a737..721a792 100644 --- a/source/tanya/async/event/kqueue.d +++ b/source/tanya/async/event/kqueue.d @@ -177,7 +177,7 @@ final class KqueueLoop : SelectorLoop * * Returns: $(D_KEYWORD true) if the operation was successful. */ - override protected bool reify(ConnectionWatcher watcher, + override protected bool reify(SocketWatcher watcher, EventMask oldEvents, EventMask events) @nogc { @@ -237,15 +237,18 @@ final class KqueueLoop : SelectorLoop { assert(connections.length > events[i].ident); - auto transport = cast(SelectorStreamTransport) connections[events[i].ident]; + auto transport = cast(StreamTransport) connections[events[i].ident]; // If it is a ConnectionWatcher. Accept connections. if (transport is null) { + auto connection = cast(ConnectionWatcher) connections[events[i].data.fd]; + assert(connection !is null); + acceptConnections(connections[events[i].ident]); } else if (events[i].flags & EV_ERROR) { - kill(transport, null); + kill(transport); } else if (events[i].filter == EVFILT_READ) { @@ -300,12 +303,8 @@ final class KqueueLoop : SelectorLoop * Params: * transport = Transport. * exception = Exception thrown on sending. - * - * Returns: $(D_KEYWORD true) if the operation could be successfully - * completed or scheduled, $(D_KEYWORD false) otherwise (the - * transport is be destroyed then). */ - protected override bool feed(SelectorStreamTransport transport, + protected override void feed(StreamTransport transport, SocketException exception = null) @nogc { if (!super.feed(transport, exception)) diff --git a/source/tanya/async/event/selector.d b/source/tanya/async/event/selector.d index 83a9fce..06662b5 100644 --- a/source/tanya/async/event/selector.d +++ b/source/tanya/async/event/selector.d @@ -25,11 +25,8 @@ import tanya.network.socket; /** * Transport for stream sockets. */ -final class SelectorStreamTransport : IOWatcher, StreamTransport +final class StreamTransport : IOWatcher, SocketTransport { - /// Input buffer. - package WriteBuffer!ubyte input; - private SelectorLoop loop; /// Received notification that the underlying socket is write-ready. @@ -37,8 +34,8 @@ final class SelectorStreamTransport : IOWatcher, StreamTransport /** * Params: - * loop = Event loop. - * socket = Socket. + * loop = Event loop. + * socket = Socket. * * Precondition: $(D_INLINECODE loop !is null && socket !is null) */ @@ -46,52 +43,36 @@ final class SelectorStreamTransport : IOWatcher, StreamTransport in { assert(loop !is null); - assert(socket !is null); } body { super(socket); this.loop = loop; - input = WriteBuffer!ubyte(8192, MmapPool.instance); } /** * Returns: Socket. + * + * Postcondition: $(D_INLINECODE socket !is null) */ override @property ConnectedSocket socket() pure nothrow @safe @nogc - { - return cast(ConnectedSocket) socket_; - } - - private @property void socket(ConnectedSocket socket) pure nothrow @safe @nogc - in + out (socket) { assert(socket !is null); } body { - socket_ = socket; + return cast(ConnectedSocket) socket_; } - /** - * Switches the protocol. - * - * The protocol is deallocated by the event loop, it should currently be - * allocated with $(D_PSYMBOL MmapPool). - * - * Params: - * protocol = Application protocol. - * - * Precondition: $(D_INLINECODE protocol !is null) - */ - @property void protocol(Protocol protocol) pure nothrow @safe @nogc + private @property void socket(ConnectedSocket socket) pure nothrow @safe @nogc in { - assert(protocol !is null); + assert(socket !is null); } body { - protocol_ = protocol; + socket_ = socket; } /** @@ -140,12 +121,12 @@ final class SelectorStreamTransport : IOWatcher, StreamTransport abstract class SelectorLoop : Loop { /// Pending connections. - protected Vector!ConnectionWatcher connections; + protected Vector!SocketWatcher connections; this() @nogc { super(); - connections = Vector!ConnectionWatcher(maxEvents, MmapPool.instance); + connections = Vector!SocketWatcher(maxEvents, MmapPool.instance); } ~this() @nogc @@ -154,11 +135,9 @@ abstract class SelectorLoop : Loop { // We want to free only IOWatchers. ConnectionWatcher are created by the // user and should be freed by himself. - auto io = cast(IOWatcher) connection; - if (io !is null) + if (cast(IOWatcher) connection !is null) { - MmapPool.instance.dispose(io); - connection = null; + MmapPool.instance.dispose(connection); } } } @@ -170,13 +149,14 @@ abstract class SelectorLoop : Loop * Params: * transport = Transport. * exception = Exception thrown on sending. - * - * Returns: $(D_KEYWORD true) if the operation could be successfully - * completed or scheduled, $(D_KEYWORD false) otherwise (the - * transport will be destroyed then). */ - protected bool feed(SelectorStreamTransport transport, + protected void feed(StreamTransport transport, SocketException exception = null) @nogc + in + { + assert(transport !is null); + } + body { while (transport.input.length && transport.writeReady) { @@ -204,9 +184,7 @@ abstract class SelectorLoop : Loop assert(watcher !is null); kill(watcher, exception); - return false; } - return true; } /** @@ -215,7 +193,7 @@ abstract class SelectorLoop : Loop * Params: * watcher = Watcher. */ - override void start(ConnectionWatcher watcher) @nogc + override void start(SocketWatcher watcher) @nogc { if (watcher.active) { @@ -261,11 +239,11 @@ abstract class SelectorLoop : Loop break; } - SelectorStreamTransport transport; + StreamTransport transport; if (connections.length > client.handle) { - transport = cast(SelectorStreamTransport) connections[client.handle]; + transport = cast(StreamTransport) connections[client.handle]; } else { @@ -273,7 +251,7 @@ abstract class SelectorLoop : Loop } if (transport is null) { - transport = MmapPool.instance.make!SelectorStreamTransport(this, client); + transport = MmapPool.instance.make!StreamTransport(this, client); connections[client.handle] = transport; } else diff --git a/source/tanya/async/loop.d b/source/tanya/async/loop.d index b7854a6..70f1f66 100644 --- a/source/tanya/async/loop.d +++ b/source/tanya/async/loop.d @@ -196,7 +196,7 @@ abstract class Loop * Params:
* watcher = Watcher.
*/
- void start(ConnectionWatcher watcher) @nogc
+ void start(SocketWatcher watcher) @nogc
{
if (watcher.active)
{
@@ -234,9 +234,9 @@ abstract class Loop *
* Returns: $(D_KEYWORD true) if the operation was successful.
*/
- abstract protected bool reify(ConnectionWatcher watcher,
- EventMask oldEvents,
- EventMask events) @nogc;
+ abstract protected bool reify(SocketWatcher watcher,
+ EventMask oldEvents,
+ EventMask events) @nogc;
/**
* Returns: The blocking time.
@@ -273,7 +273,7 @@ abstract class Loop * watcher = Watcher.
* exception = Occurred exception.
*/
- protected void kill(IOWatcher watcher, SocketException exception) @nogc
+ protected void kill(IOWatcher watcher, SocketException exception = null) @nogc
in
{
assert(watcher !is null);
diff --git a/source/tanya/async/transport.d b/source/tanya/async/transport.d index b06bec7..f18fbee 100644 --- a/source/tanya/async/transport.d +++ b/source/tanya/async/transport.d @@ -85,10 +85,3 @@ interface SocketTransport : Transport */
@property Socket socket() pure nothrow @safe @nogc;
}
-
-/**
- * Represents a connection-oriented socket transport.
- */
-package interface StreamTransport : DuplexTransport, SocketTransport
-{
-}
diff --git a/source/tanya/async/watcher.d b/source/tanya/async/watcher.d index 37742e2..997e033 100644 --- a/source/tanya/async/watcher.d +++ b/source/tanya/async/watcher.d @@ -36,52 +36,66 @@ abstract class Watcher void invoke() @nogc; } -class ConnectionWatcher : Watcher +/** + * Socket watcher. + */ +abstract class SocketWatcher : Watcher { /// Watched socket. protected Socket socket_; - /// Protocol factory. - protected Protocol delegate() @nogc protocolFactory; - - package Queue!DuplexTransport incoming; - /** * Params: * socket = Socket. + * + * Precondition: $(D_INLINECODE socket !is null) */ - this(Socket socket) @nogc + this(Socket socket) pure nothrow @safe @nogc + in + { + assert(socket !is null); + } + body { - incoming = Queue!DuplexTransport(MmapPool.instance); socket_ = socket; } /** - * Destroys the watcher. + * Returns: Socket. */ - ~this() @nogc + @property Socket socket() pure nothrow @safe @nogc { - foreach (w; incoming) - { - MmapPool.instance.dispose(w); - } + return socket_; } +} + +/** + * Connection watcher. + */ +class ConnectionWatcher : SocketWatcher +{ + /// Incoming connection queue. + Queue!DuplexTransport incoming; + + private Protocol delegate() @nogc protocolFactory; /** * Params: - * P = Protocol should be used. + * socket = Socket. */ - void setProtocol(P : Protocol)() @nogc + this(Socket socket) @nogc { - this.protocolFactory = () @nogc => cast(Protocol) MmapPool.instance.make!P; + super(socket); + incoming = Queue!DuplexTransport(MmapPool.instance); } /** - * Returns: Socket. + * Params: + * P = Protocol should be used. */ - @property Socket socket() pure nothrow @safe @nogc + void setProtocol(P : Protocol)() @nogc { - return socket_; + this.protocolFactory = () @nogc => cast(Protocol) MmapPool.instance.make!P; } /** @@ -102,21 +116,17 @@ class ConnectionWatcher : Watcher } } -/** - * Contains a pending watcher with the invoked events or a transport can be - * read from. - */ -class IOWatcher : ConnectionWatcher +package abstract class IOWatcher : SocketWatcher, DuplexTransport { package SocketException exception; - protected Protocol protocol_; - - /** - * Returns: Underlying output buffer. - */ package ReadBuffer!ubyte output; + package WriteBuffer!ubyte input; + + /// Application protocol. + protected Protocol protocol_; + /** * Params: * socket = Socket. @@ -124,26 +134,14 @@ class IOWatcher : ConnectionWatcher * Precondition: $(D_INLINECODE socket !is null) */ this(ConnectedSocket socket) @nogc - in - { - assert(socket !is null); - } - body { super(socket); output = ReadBuffer!ubyte(8192, 1024, MmapPool.instance); + input = WriteBuffer!ubyte(8192, MmapPool.instance); active = true; } /** - * Destroys the watcher. - */ - ~this() @nogc - { - MmapPool.instance.dispose(protocol_); - } - - /** * Returns: Application protocol. */ @property Protocol protocol() pure nothrow @safe @nogc @@ -152,18 +150,24 @@ class IOWatcher : ConnectionWatcher } /** - * Returns: Socket. + * Switches the protocol. * - * Precondition: $(D_INLINECODE socket !is null) + * The protocol is deallocated by the event loop, it should currently be + * allocated with $(D_PSYMBOL MmapPool). + * + * Params: + * protocol = Application protocol. + * + * Precondition: $(D_INLINECODE protocol !is null) */ - override @property ConnectedSocket socket() pure nothrow @safe @nogc - out (socket) + @property void protocol(Protocol protocol) pure nothrow @safe @nogc + in { - assert(socket !is null); + assert(protocol !is null); } body { - return cast(ConnectedSocket) socket_; + protocol_ = protocol; } /** @@ -179,7 +183,7 @@ class IOWatcher : ConnectionWatcher else { protocol.disconnected(exception); - MmapPool.instance.dispose(protocol); + MmapPool.instance.dispose(protocol_); defaultAllocator.dispose(exception); active = false; } |
