diff --git a/source/tanya/async/event/epoll.d b/source/tanya/async/event/epoll.d index 48eac8d..bcc375e 100644 --- a/source/tanya/async/event/epoll.d +++ b/source/tanya/async/event/epoll.d @@ -123,30 +123,27 @@ class EpollLoop : SelectorLoop for (auto i = 0; i < eventCount; ++i) { - auto io = cast(IOWatcher) connections[events[i].data.fd]; + auto transport = cast(SelectorStreamTransport) connections[events[i].data.fd]; - if (io is null) + if (transport is null) { acceptConnections(connections[events[i].data.fd]); } else if (events[i].events & EPOLLERR) { - kill(io, null); + kill(transport, null); continue; } else if (events[i].events & (EPOLLIN | EPOLLPRI | EPOLLHUP)) { - auto transport = cast(SelectorStreamTransport) io.transport; - assert(transport !is null); - SocketException exception; try { ptrdiff_t received; do { - received = transport.socket.receive(io.output[]); - io.output += received; + received = transport.socket.receive(transport.output[]); + transport.output += received; } while (received); } @@ -156,19 +153,16 @@ class EpollLoop : SelectorLoop } if (transport.socket.disconnected) { - kill(io, exception); + kill(transport, exception); continue; } - else if (io.output.length) + else if (transport.output.length) { - pendings.enqueue(io); + pendings.enqueue(transport); } } if (events[i].events & EPOLLOUT) { - auto transport = cast(SelectorStreamTransport) io.transport; - assert(transport !is null); - transport.writeReady = true; if (transport.input.length) { diff --git a/source/tanya/async/event/iocp.d b/source/tanya/async/event/iocp.d index 9464ecf..ac84fa3 100644 --- a/source/tanya/async/event/iocp.d +++ b/source/tanya/async/event/iocp.d @@ -259,7 +259,7 @@ class IOCPLoop : Loop auto transport = MmapPool.instance.make!IOCPStreamTransport(socket, protocol); auto io = MmapPool.instance.make!IOWatcher(transport, socket, protocol); - connection.incoming.enqueue(io); + connection.incoming.enqueue(transport); reify(io, EventMask(Event.none), EventMask(Event.read, Event.write)); diff --git a/source/tanya/async/event/selector.d b/source/tanya/async/event/selector.d index db79c78..3aa105c 100644 --- a/source/tanya/async/event/selector.d +++ b/source/tanya/async/event/selector.d @@ -25,12 +25,8 @@ import tanya.network.socket; /** * Transport for stream sockets. */ -class SelectorStreamTransport : StreamTransport +class SelectorStreamTransport : IOWatcher, StreamTransport { - private ConnectedSocket socket_; - - private Protocol protocol_; - /// Input buffer. package WriteBuffer!ubyte input; @@ -43,41 +39,38 @@ class SelectorStreamTransport : StreamTransport * Params: * loop = Event loop. * socket = Socket. - * protocol = Application protocol. * - * Precondition: $(D_INLINECODE loop !is null - * && socket !is null - * && protocol !is null) + * Precondition: $(D_INLINECODE loop !is null && socket !is null) */ - this(SelectorLoop loop, ConnectedSocket socket, Protocol protocol) @nogc + this(SelectorLoop loop, ConnectedSocket socket) @nogc in { assert(loop !is null); assert(socket !is null); - assert(protocol !is null); } body { - socket_ = socket; + super(socket); this.loop = loop; - protocol_ = protocol; input = WriteBuffer!ubyte(8192, MmapPool.instance); } /** * Returns: Socket. */ - ConnectedSocket socket() pure nothrow @safe @nogc + override @property ConnectedSocket socket() pure nothrow @safe @nogc { - return socket_; + return cast(ConnectedSocket) socket_; } - /** - * Returns: Application protocol. - */ - @property Protocol protocol() pure nothrow @safe @nogc + private @property void socket(ConnectedSocket socket) pure nothrow @safe @nogc + in { - return protocol_; + assert(socket !is null); + } + body + { + socket_ = socket; } /** @@ -268,30 +261,28 @@ abstract class SelectorLoop : Loop break; } - IOWatcher io; - auto protocol = connection.protocol; - auto transport = MmapPool.instance.make!SelectorStreamTransport(this, client, protocol); + SelectorStreamTransport transport; if (connections.length > client.handle) { - io = cast(IOWatcher) connections[client.handle]; + transport = cast(SelectorStreamTransport) connections[client.handle]; } else { connections.length = client.handle + maxEvents / 2; } - if (io is null) + if (transport is null) { - io = MmapPool.instance.make!IOWatcher(transport, client, protocol); - connections[client.handle] = io; + transport = MmapPool.instance.make!SelectorStreamTransport(this, client); + connections[client.handle] = transport; } else { - io(transport, client, protocol); + transport.socket = client; } - reify(io, EventMask(Event.none), EventMask(Event.read, Event.write)); - connection.incoming.enqueue(io); + reify(transport, EventMask(Event.none), EventMask(Event.read, Event.write)); + connection.incoming.enqueue(transport); } if (!connection.incoming.empty) diff --git a/source/tanya/async/loop.d b/source/tanya/async/loop.d index a9ed586..b7854a6 100644 --- a/source/tanya/async/loop.d +++ b/source/tanya/async/loop.d @@ -68,7 +68,6 @@ import core.time; import std.algorithm.iteration; import std.algorithm.mutation; import std.typecons; -import tanya.async.protocol; import tanya.async.transport; import tanya.async.watcher; import tanya.container.buffer; @@ -269,12 +268,20 @@ abstract class Loop /** * Kills the watcher and closes the connection. + * + * Params: + * watcher = Watcher. + * exception = Occurred exception. */ protected void kill(IOWatcher watcher, SocketException exception) @nogc + in + { + assert(watcher !is null); + } + body { watcher.socket.shutdown(); defaultAllocator.dispose(watcher.socket); - MmapPool.instance.dispose(watcher.transport); watcher.exception = exception; pendings.enqueue(watcher); } diff --git a/source/tanya/async/watcher.d b/source/tanya/async/watcher.d index 1748434..60972e0 100644 --- a/source/tanya/async/watcher.d +++ b/source/tanya/async/watcher.d @@ -53,7 +53,7 @@ class ConnectionWatcher : Watcher /// Protocol factory. protected Protocol delegate() @nogc protocolFactory; - package Queue!IOWatcher incoming; + package Queue!DuplexTransport incoming; /** * Params: @@ -61,7 +61,7 @@ class ConnectionWatcher : Watcher */ this(Socket socket) @nogc { - incoming = Queue!IOWatcher(MmapPool.instance); + incoming = Queue!DuplexTransport(MmapPool.instance); socket_ = socket; } @@ -94,26 +94,19 @@ class ConnectionWatcher : Watcher } /** - * Returns: New protocol instance. + * Invokes new connection callback. */ - @property Protocol protocol() @nogc + override void invoke() @nogc in { assert(protocolFactory !is null, "Protocol isn't set."); } body { - return protocolFactory(); - } - - /** - * Invokes new connection callback. - */ - override void invoke() @nogc - { - foreach (io; incoming) + foreach (transport; incoming) { - io.protocol.connected(cast(DuplexTransport) io.transport); + transport.protocol = protocolFactory(); + transport.protocol.connected(transport); } } } @@ -124,10 +117,9 @@ class ConnectionWatcher : Watcher */ class IOWatcher : ConnectionWatcher { - package StreamTransport transport; package SocketException exception; - private Protocol protocol_; + protected Protocol protocol_; /** * Returns: Underlying output buffer. @@ -136,27 +128,18 @@ class IOWatcher : ConnectionWatcher /** * Params: - * transport = Transport. - * socket = Socket. - * protocol = New instance of the application protocol. + * socket = Socket. * - * Precondition: $(D_INLINECODE transport !is null - * && socket !is null - * && protocol !is null) + * Precondition: $(D_INLINECODE socket !is null) */ - this(StreamTransport transport, ConnectedSocket socket, Protocol protocol) - @nogc + this(ConnectedSocket socket) @nogc in { - assert(transport !is null); assert(socket !is null); - assert(protocol !is null); } body { super(socket); - this.transport = transport; - protocol_ = protocol; output = ReadBuffer!ubyte(8192, 1024, MmapPool.instance); active = true; } @@ -169,38 +152,10 @@ class IOWatcher : ConnectionWatcher MmapPool.instance.dispose(protocol_); } - /** - * Reinitializes the watcher. - * - * Params: - * transport = Transport. - * socket = Socket. - * protocol = New instance of the application protocol. - * - * Precondition: $(D_INLINECODE transport !is null - * && socket !is null - * && protocol !is null) - */ - void opCall(StreamTransport transport, - ConnectedSocket socket, - Protocol protocol) pure nothrow @nogc - in - { - assert(transport !is null); - assert(socket !is null); - assert(protocol !is null); - } - body - { - this.transport = transport; - protocol_ = protocol; - active = true; - } - /** * Returns: Application protocol. */ - override @property Protocol protocol() pure nothrow @safe @nogc + @property Protocol protocol() pure nothrow @safe @nogc { return protocol_; } @@ -233,6 +188,8 @@ class IOWatcher : ConnectionWatcher else { protocol.disconnected(exception); + MmapPool.instance.dispose(protocol); + defaultAllocator.dispose(exception); active = false; } }