diff options
| -rw-r--r-- | source/tanya/async/event/epoll.d | 22 | ||||
| -rw-r--r-- | source/tanya/async/event/iocp.d | 2 | ||||
| -rw-r--r-- | source/tanya/async/event/selector.d | 51 | ||||
| -rw-r--r-- | source/tanya/async/loop.d | 11 | ||||
| -rw-r--r-- | source/tanya/async/watcher.d | 71 |
5 files changed, 53 insertions, 104 deletions
diff --git a/source/tanya/async/event/epoll.d b/source/tanya/async/event/epoll.d index 48eac8d..bcc375e 100644 --- a/source/tanya/async/event/epoll.d +++ b/source/tanya/async/event/epoll.d @@ -123,30 +123,27 @@ class EpollLoop : SelectorLoop for (auto i = 0; i < eventCount; ++i)
{
- auto io = cast(IOWatcher) connections[events[i].data.fd];
+ auto transport = cast(SelectorStreamTransport) connections[events[i].data.fd];
- if (io is null)
+ if (transport is null)
{
acceptConnections(connections[events[i].data.fd]);
}
else if (events[i].events & EPOLLERR)
{
- kill(io, null);
+ kill(transport, null);
continue;
}
else if (events[i].events & (EPOLLIN | EPOLLPRI | EPOLLHUP))
{
- auto transport = cast(SelectorStreamTransport) io.transport;
- assert(transport !is null);
-
SocketException exception;
try
{
ptrdiff_t received;
do
{
- received = transport.socket.receive(io.output[]);
- io.output += received;
+ received = transport.socket.receive(transport.output[]);
+ transport.output += received;
}
while (received);
}
@@ -156,19 +153,16 @@ class EpollLoop : SelectorLoop }
if (transport.socket.disconnected)
{
- kill(io, exception);
+ kill(transport, exception);
continue;
}
- else if (io.output.length)
+ else if (transport.output.length)
{
- pendings.enqueue(io);
+ pendings.enqueue(transport);
}
}
if (events[i].events & EPOLLOUT)
{
- auto transport = cast(SelectorStreamTransport) io.transport;
- assert(transport !is null);
-
transport.writeReady = true;
if (transport.input.length)
{
diff --git a/source/tanya/async/event/iocp.d b/source/tanya/async/event/iocp.d index 9464ecf..ac84fa3 100644 --- a/source/tanya/async/event/iocp.d +++ b/source/tanya/async/event/iocp.d @@ -259,7 +259,7 @@ class IOCPLoop : Loop auto transport = MmapPool.instance.make!IOCPStreamTransport(socket, protocol); auto io = MmapPool.instance.make!IOWatcher(transport, socket, protocol); - connection.incoming.enqueue(io); + connection.incoming.enqueue(transport); reify(io, EventMask(Event.none), EventMask(Event.read, Event.write)); diff --git a/source/tanya/async/event/selector.d b/source/tanya/async/event/selector.d index db79c78..3aa105c 100644 --- a/source/tanya/async/event/selector.d +++ b/source/tanya/async/event/selector.d @@ -25,12 +25,8 @@ import tanya.network.socket; /** * Transport for stream sockets. */ -class SelectorStreamTransport : StreamTransport +class SelectorStreamTransport : IOWatcher, StreamTransport { - private ConnectedSocket socket_; - - private Protocol protocol_; - /// Input buffer. package WriteBuffer!ubyte input; @@ -43,41 +39,38 @@ class SelectorStreamTransport : StreamTransport * Params: * loop = Event loop. * socket = Socket. - * protocol = Application protocol. * - * Precondition: $(D_INLINECODE loop !is null - * && socket !is null - * && protocol !is null) + * Precondition: $(D_INLINECODE loop !is null && socket !is null) */ - this(SelectorLoop loop, ConnectedSocket socket, Protocol protocol) @nogc + this(SelectorLoop loop, ConnectedSocket socket) @nogc in { assert(loop !is null); assert(socket !is null); - assert(protocol !is null); } body { - socket_ = socket; + super(socket); this.loop = loop; - protocol_ = protocol; input = WriteBuffer!ubyte(8192, MmapPool.instance); } /** * Returns: Socket. */ - ConnectedSocket socket() pure nothrow @safe @nogc + override @property ConnectedSocket socket() pure nothrow @safe @nogc { - return socket_; + return cast(ConnectedSocket) socket_; } - /** - * Returns: Application protocol. - */ - @property Protocol protocol() pure nothrow @safe @nogc + private @property void socket(ConnectedSocket socket) pure nothrow @safe @nogc + in { - return protocol_; + assert(socket !is null); + } + body + { + socket_ = socket; } /** @@ -268,30 +261,28 @@ abstract class SelectorLoop : Loop break; } - IOWatcher io; - auto protocol = connection.protocol; - auto transport = MmapPool.instance.make!SelectorStreamTransport(this, client, protocol); + SelectorStreamTransport transport; if (connections.length > client.handle) { - io = cast(IOWatcher) connections[client.handle]; + transport = cast(SelectorStreamTransport) connections[client.handle]; } else { connections.length = client.handle + maxEvents / 2; } - if (io is null) + if (transport is null) { - io = MmapPool.instance.make!IOWatcher(transport, client, protocol); - connections[client.handle] = io; + transport = MmapPool.instance.make!SelectorStreamTransport(this, client); + connections[client.handle] = transport; } else { - io(transport, client, protocol); + transport.socket = client; } - reify(io, EventMask(Event.none), EventMask(Event.read, Event.write)); - connection.incoming.enqueue(io); + reify(transport, EventMask(Event.none), EventMask(Event.read, Event.write)); + connection.incoming.enqueue(transport); } if (!connection.incoming.empty) diff --git a/source/tanya/async/loop.d b/source/tanya/async/loop.d index a9ed586..b7854a6 100644 --- a/source/tanya/async/loop.d +++ b/source/tanya/async/loop.d @@ -68,7 +68,6 @@ import core.time; import std.algorithm.iteration;
import std.algorithm.mutation;
import std.typecons;
-import tanya.async.protocol;
import tanya.async.transport;
import tanya.async.watcher;
import tanya.container.buffer;
@@ -269,12 +268,20 @@ abstract class Loop /**
* Kills the watcher and closes the connection.
+ *
+ * Params:
+ * watcher = Watcher.
+ * exception = Occurred exception.
*/
protected void kill(IOWatcher watcher, SocketException exception) @nogc
+ in
+ {
+ assert(watcher !is null);
+ }
+ body
{
watcher.socket.shutdown();
defaultAllocator.dispose(watcher.socket);
- MmapPool.instance.dispose(watcher.transport);
watcher.exception = exception;
pendings.enqueue(watcher);
}
diff --git a/source/tanya/async/watcher.d b/source/tanya/async/watcher.d index 1748434..60972e0 100644 --- a/source/tanya/async/watcher.d +++ b/source/tanya/async/watcher.d @@ -53,7 +53,7 @@ class ConnectionWatcher : Watcher /// Protocol factory. protected Protocol delegate() @nogc protocolFactory; - package Queue!IOWatcher incoming; + package Queue!DuplexTransport incoming; /** * Params: @@ -61,7 +61,7 @@ class ConnectionWatcher : Watcher */ this(Socket socket) @nogc { - incoming = Queue!IOWatcher(MmapPool.instance); + incoming = Queue!DuplexTransport(MmapPool.instance); socket_ = socket; } @@ -94,26 +94,19 @@ class ConnectionWatcher : Watcher } /** - * Returns: New protocol instance. + * Invokes new connection callback. */ - @property Protocol protocol() @nogc + override void invoke() @nogc in { assert(protocolFactory !is null, "Protocol isn't set."); } body { - return protocolFactory(); - } - - /** - * Invokes new connection callback. - */ - override void invoke() @nogc - { - foreach (io; incoming) + foreach (transport; incoming) { - io.protocol.connected(cast(DuplexTransport) io.transport); + transport.protocol = protocolFactory(); + transport.protocol.connected(transport); } } } @@ -124,10 +117,9 @@ class ConnectionWatcher : Watcher */ class IOWatcher : ConnectionWatcher { - package StreamTransport transport; package SocketException exception; - private Protocol protocol_; + protected Protocol protocol_; /** * Returns: Underlying output buffer. @@ -136,27 +128,18 @@ class IOWatcher : ConnectionWatcher /** * Params: - * transport = Transport. - * socket = Socket. - * protocol = New instance of the application protocol. + * socket = Socket. * - * Precondition: $(D_INLINECODE transport !is null - * && socket !is null - * && protocol !is null) + * Precondition: $(D_INLINECODE socket !is null) */ - this(StreamTransport transport, ConnectedSocket socket, Protocol protocol) - @nogc + this(ConnectedSocket socket) @nogc in { - assert(transport !is null); assert(socket !is null); - assert(protocol !is null); } body { super(socket); - this.transport = transport; - protocol_ = protocol; output = ReadBuffer!ubyte(8192, 1024, MmapPool.instance); active = true; } @@ -170,37 +153,9 @@ class IOWatcher : ConnectionWatcher } /** - * Reinitializes the watcher. - * - * Params: - * transport = Transport. - * socket = Socket. - * protocol = New instance of the application protocol. - * - * Precondition: $(D_INLINECODE transport !is null - * && socket !is null - * && protocol !is null) - */ - void opCall(StreamTransport transport, - ConnectedSocket socket, - Protocol protocol) pure nothrow @nogc - in - { - assert(transport !is null); - assert(socket !is null); - assert(protocol !is null); - } - body - { - this.transport = transport; - protocol_ = protocol; - active = true; - } - - /** * Returns: Application protocol. */ - override @property Protocol protocol() pure nothrow @safe @nogc + @property Protocol protocol() pure nothrow @safe @nogc { return protocol_; } @@ -233,6 +188,8 @@ class IOWatcher : ConnectionWatcher else { protocol.disconnected(exception); + MmapPool.instance.dispose(protocol); + defaultAllocator.dispose(exception); active = false; } } |
