summaryrefslogtreecommitdiff
path: root/source
diff options
context:
space:
mode:
Diffstat (limited to 'source')
-rw-r--r--source/tanya/async/event/epoll.d22
-rw-r--r--source/tanya/async/event/iocp.d2
-rw-r--r--source/tanya/async/event/selector.d51
-rw-r--r--source/tanya/async/loop.d11
-rw-r--r--source/tanya/async/watcher.d71
5 files changed, 53 insertions, 104 deletions
diff --git a/source/tanya/async/event/epoll.d b/source/tanya/async/event/epoll.d
index 48eac8d..bcc375e 100644
--- a/source/tanya/async/event/epoll.d
+++ b/source/tanya/async/event/epoll.d
@@ -123,30 +123,27 @@ class EpollLoop : SelectorLoop
for (auto i = 0; i < eventCount; ++i)
{
- auto io = cast(IOWatcher) connections[events[i].data.fd];
+ auto transport = cast(SelectorStreamTransport) connections[events[i].data.fd];
- if (io is null)
+ if (transport is null)
{
acceptConnections(connections[events[i].data.fd]);
}
else if (events[i].events & EPOLLERR)
{
- kill(io, null);
+ kill(transport, null);
continue;
}
else if (events[i].events & (EPOLLIN | EPOLLPRI | EPOLLHUP))
{
- auto transport = cast(SelectorStreamTransport) io.transport;
- assert(transport !is null);
-
SocketException exception;
try
{
ptrdiff_t received;
do
{
- received = transport.socket.receive(io.output[]);
- io.output += received;
+ received = transport.socket.receive(transport.output[]);
+ transport.output += received;
}
while (received);
}
@@ -156,19 +153,16 @@ class EpollLoop : SelectorLoop
}
if (transport.socket.disconnected)
{
- kill(io, exception);
+ kill(transport, exception);
continue;
}
- else if (io.output.length)
+ else if (transport.output.length)
{
- pendings.enqueue(io);
+ pendings.enqueue(transport);
}
}
if (events[i].events & EPOLLOUT)
{
- auto transport = cast(SelectorStreamTransport) io.transport;
- assert(transport !is null);
-
transport.writeReady = true;
if (transport.input.length)
{
diff --git a/source/tanya/async/event/iocp.d b/source/tanya/async/event/iocp.d
index 9464ecf..ac84fa3 100644
--- a/source/tanya/async/event/iocp.d
+++ b/source/tanya/async/event/iocp.d
@@ -259,7 +259,7 @@ class IOCPLoop : Loop
auto transport = MmapPool.instance.make!IOCPStreamTransport(socket, protocol);
auto io = MmapPool.instance.make!IOWatcher(transport, socket, protocol);
- connection.incoming.enqueue(io);
+ connection.incoming.enqueue(transport);
reify(io, EventMask(Event.none), EventMask(Event.read, Event.write));
diff --git a/source/tanya/async/event/selector.d b/source/tanya/async/event/selector.d
index db79c78..3aa105c 100644
--- a/source/tanya/async/event/selector.d
+++ b/source/tanya/async/event/selector.d
@@ -25,12 +25,8 @@ import tanya.network.socket;
/**
* Transport for stream sockets.
*/
-class SelectorStreamTransport : StreamTransport
+class SelectorStreamTransport : IOWatcher, StreamTransport
{
- private ConnectedSocket socket_;
-
- private Protocol protocol_;
-
/// Input buffer.
package WriteBuffer!ubyte input;
@@ -43,41 +39,38 @@ class SelectorStreamTransport : StreamTransport
* Params:
* loop = Event loop.
* socket = Socket.
- * protocol = Application protocol.
*
- * Precondition: $(D_INLINECODE loop !is null
- * && socket !is null
- * && protocol !is null)
+ * Precondition: $(D_INLINECODE loop !is null && socket !is null)
*/
- this(SelectorLoop loop, ConnectedSocket socket, Protocol protocol) @nogc
+ this(SelectorLoop loop, ConnectedSocket socket) @nogc
in
{
assert(loop !is null);
assert(socket !is null);
- assert(protocol !is null);
}
body
{
- socket_ = socket;
+ super(socket);
this.loop = loop;
- protocol_ = protocol;
input = WriteBuffer!ubyte(8192, MmapPool.instance);
}
/**
* Returns: Socket.
*/
- ConnectedSocket socket() pure nothrow @safe @nogc
+ override @property ConnectedSocket socket() pure nothrow @safe @nogc
{
- return socket_;
+ return cast(ConnectedSocket) socket_;
}
- /**
- * Returns: Application protocol.
- */
- @property Protocol protocol() pure nothrow @safe @nogc
+ private @property void socket(ConnectedSocket socket) pure nothrow @safe @nogc
+ in
{
- return protocol_;
+ assert(socket !is null);
+ }
+ body
+ {
+ socket_ = socket;
}
/**
@@ -268,30 +261,28 @@ abstract class SelectorLoop : Loop
break;
}
- IOWatcher io;
- auto protocol = connection.protocol;
- auto transport = MmapPool.instance.make!SelectorStreamTransport(this, client, protocol);
+ SelectorStreamTransport transport;
if (connections.length > client.handle)
{
- io = cast(IOWatcher) connections[client.handle];
+ transport = cast(SelectorStreamTransport) connections[client.handle];
}
else
{
connections.length = client.handle + maxEvents / 2;
}
- if (io is null)
+ if (transport is null)
{
- io = MmapPool.instance.make!IOWatcher(transport, client, protocol);
- connections[client.handle] = io;
+ transport = MmapPool.instance.make!SelectorStreamTransport(this, client);
+ connections[client.handle] = transport;
}
else
{
- io(transport, client, protocol);
+ transport.socket = client;
}
- reify(io, EventMask(Event.none), EventMask(Event.read, Event.write));
- connection.incoming.enqueue(io);
+ reify(transport, EventMask(Event.none), EventMask(Event.read, Event.write));
+ connection.incoming.enqueue(transport);
}
if (!connection.incoming.empty)
diff --git a/source/tanya/async/loop.d b/source/tanya/async/loop.d
index a9ed586..b7854a6 100644
--- a/source/tanya/async/loop.d
+++ b/source/tanya/async/loop.d
@@ -68,7 +68,6 @@ import core.time;
import std.algorithm.iteration;
import std.algorithm.mutation;
import std.typecons;
-import tanya.async.protocol;
import tanya.async.transport;
import tanya.async.watcher;
import tanya.container.buffer;
@@ -269,12 +268,20 @@ abstract class Loop
/**
* Kills the watcher and closes the connection.
+ *
+ * Params:
+ * watcher = Watcher.
+ * exception = Occurred exception.
*/
protected void kill(IOWatcher watcher, SocketException exception) @nogc
+ in
+ {
+ assert(watcher !is null);
+ }
+ body
{
watcher.socket.shutdown();
defaultAllocator.dispose(watcher.socket);
- MmapPool.instance.dispose(watcher.transport);
watcher.exception = exception;
pendings.enqueue(watcher);
}
diff --git a/source/tanya/async/watcher.d b/source/tanya/async/watcher.d
index 1748434..60972e0 100644
--- a/source/tanya/async/watcher.d
+++ b/source/tanya/async/watcher.d
@@ -53,7 +53,7 @@ class ConnectionWatcher : Watcher
/// Protocol factory.
protected Protocol delegate() @nogc protocolFactory;
- package Queue!IOWatcher incoming;
+ package Queue!DuplexTransport incoming;
/**
* Params:
@@ -61,7 +61,7 @@ class ConnectionWatcher : Watcher
*/
this(Socket socket) @nogc
{
- incoming = Queue!IOWatcher(MmapPool.instance);
+ incoming = Queue!DuplexTransport(MmapPool.instance);
socket_ = socket;
}
@@ -94,26 +94,19 @@ class ConnectionWatcher : Watcher
}
/**
- * Returns: New protocol instance.
+ * Invokes new connection callback.
*/
- @property Protocol protocol() @nogc
+ override void invoke() @nogc
in
{
assert(protocolFactory !is null, "Protocol isn't set.");
}
body
{
- return protocolFactory();
- }
-
- /**
- * Invokes new connection callback.
- */
- override void invoke() @nogc
- {
- foreach (io; incoming)
+ foreach (transport; incoming)
{
- io.protocol.connected(cast(DuplexTransport) io.transport);
+ transport.protocol = protocolFactory();
+ transport.protocol.connected(transport);
}
}
}
@@ -124,10 +117,9 @@ class ConnectionWatcher : Watcher
*/
class IOWatcher : ConnectionWatcher
{
- package StreamTransport transport;
package SocketException exception;
- private Protocol protocol_;
+ protected Protocol protocol_;
/**
* Returns: Underlying output buffer.
@@ -136,27 +128,18 @@ class IOWatcher : ConnectionWatcher
/**
* Params:
- * transport = Transport.
- * socket = Socket.
- * protocol = New instance of the application protocol.
+ * socket = Socket.
*
- * Precondition: $(D_INLINECODE transport !is null
- * && socket !is null
- * && protocol !is null)
+ * Precondition: $(D_INLINECODE socket !is null)
*/
- this(StreamTransport transport, ConnectedSocket socket, Protocol protocol)
- @nogc
+ this(ConnectedSocket socket) @nogc
in
{
- assert(transport !is null);
assert(socket !is null);
- assert(protocol !is null);
}
body
{
super(socket);
- this.transport = transport;
- protocol_ = protocol;
output = ReadBuffer!ubyte(8192, 1024, MmapPool.instance);
active = true;
}
@@ -170,37 +153,9 @@ class IOWatcher : ConnectionWatcher
}
/**
- * Reinitializes the watcher.
- *
- * Params:
- * transport = Transport.
- * socket = Socket.
- * protocol = New instance of the application protocol.
- *
- * Precondition: $(D_INLINECODE transport !is null
- * && socket !is null
- * && protocol !is null)
- */
- void opCall(StreamTransport transport,
- ConnectedSocket socket,
- Protocol protocol) pure nothrow @nogc
- in
- {
- assert(transport !is null);
- assert(socket !is null);
- assert(protocol !is null);
- }
- body
- {
- this.transport = transport;
- protocol_ = protocol;
- active = true;
- }
-
- /**
* Returns: Application protocol.
*/
- override @property Protocol protocol() pure nothrow @safe @nogc
+ @property Protocol protocol() pure nothrow @safe @nogc
{
return protocol_;
}
@@ -233,6 +188,8 @@ class IOWatcher : ConnectionWatcher
else
{
protocol.disconnected(exception);
+ MmapPool.instance.dispose(protocol);
+ defaultAllocator.dispose(exception);
active = false;
}
}