summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorEugen Wissner <belka@caraus.de>2017-02-09 02:58:59 +0100
committerEugene Wissner <belka@caraus.de>2017-02-09 21:40:52 +0100
commit0e91ea6786ea20fa84691d8e7835055bdf7e9784 (patch)
treeffbe30a74bf25291eb8a55b3bff9d96108104fd7
parent63c6226a2ae758d68429667a07ec723d09007c4e (diff)
downloadtanya-0e91ea6786ea20fa84691d8e7835055bdf7e9784.tar.gz
Pass client socket directly to the IOWatcher
-rw-r--r--source/tanya/async/event/iocp.d2
-rw-r--r--source/tanya/async/event/selector.d4
-rw-r--r--source/tanya/async/watcher.d83
3 files changed, 41 insertions, 48 deletions
diff --git a/source/tanya/async/event/iocp.d b/source/tanya/async/event/iocp.d
index 08d60ac..9464ecf 100644
--- a/source/tanya/async/event/iocp.d
+++ b/source/tanya/async/event/iocp.d
@@ -257,7 +257,7 @@ class IOCPLoop : Loop
auto socket = listener.endAccept(overlapped);
auto protocol = connection.protocol;
auto transport = MmapPool.instance.make!IOCPStreamTransport(socket, protocol);
- auto io = MmapPool.instance.make!IOWatcher(transport, protocol);
+ auto io = MmapPool.instance.make!IOWatcher(transport, socket, protocol);
connection.incoming.enqueue(io);
diff --git a/source/tanya/async/event/selector.d b/source/tanya/async/event/selector.d
index 76b0530..db79c78 100644
--- a/source/tanya/async/event/selector.d
+++ b/source/tanya/async/event/selector.d
@@ -282,12 +282,12 @@ abstract class SelectorLoop : Loop
}
if (io is null)
{
- io = MmapPool.instance.make!IOWatcher(transport, protocol);
+ io = MmapPool.instance.make!IOWatcher(transport, client, protocol);
connections[client.handle] = io;
}
else
{
- io(transport, protocol);
+ io(transport, client, protocol);
}
reify(io, EventMask(Event.none), EventMask(Event.read, Event.write));
diff --git a/source/tanya/async/watcher.d b/source/tanya/async/watcher.d
index de23aa1..1748434 100644
--- a/source/tanya/async/watcher.d
+++ b/source/tanya/async/watcher.d
@@ -48,7 +48,7 @@ abstract class Watcher
class ConnectionWatcher : Watcher
{
/// Watched socket.
- private Socket socket_;
+ protected Socket socket_;
/// Protocol factory.
protected Protocol delegate() @nogc protocolFactory;
@@ -65,11 +65,9 @@ class ConnectionWatcher : Watcher
socket_ = socket;
}
- /// Ditto.
- protected this() pure nothrow @safe @nogc
- {
- }
-
+ /**
+ * Destroys the watcher.
+ */
~this() @nogc
{
foreach (w; incoming)
@@ -90,7 +88,7 @@ class ConnectionWatcher : Watcher
/**
* Returns: Socket.
*/
- @property Socket socket() pure nothrow @nogc
+ @property Socket socket() pure nothrow @safe @nogc
{
return socket_;
}
@@ -126,12 +124,8 @@ class ConnectionWatcher : Watcher
*/
class IOWatcher : ConnectionWatcher
{
- /// If an exception was thrown the transport should be already invalid.
- private union
- {
- StreamTransport transport_;
- SocketException exception_;
- }
+ package StreamTransport transport;
+ package SocketException exception;
private Protocol protocol_;
@@ -143,18 +137,25 @@ class IOWatcher : ConnectionWatcher
/**
* Params:
* transport = Transport.
+ * socket = Socket.
* protocol = New instance of the application protocol.
+ *
+ * Precondition: $(D_INLINECODE transport !is null
+ * && socket !is null
+ * && protocol !is null)
*/
- this(StreamTransport transport, Protocol protocol) @nogc
+ this(StreamTransport transport, ConnectedSocket socket, Protocol protocol)
+ @nogc
in
{
assert(transport !is null);
+ assert(socket !is null);
assert(protocol !is null);
}
body
{
- super();
- transport_ = transport;
+ super(socket);
+ this.transport = transport;
protocol_ = protocol;
output = ReadBuffer!ubyte(8192, 1024, MmapPool.instance);
active = true;
@@ -169,46 +170,31 @@ class IOWatcher : ConnectionWatcher
}
/**
- * Assigns a transport.
+ * Reinitializes the watcher.
*
* Params:
* transport = Transport.
- * protocol = Application protocol.
+ * socket = Socket.
+ * protocol = New instance of the application protocol.
*
- * Returns: $(D_KEYWORD this).
+ * Precondition: $(D_INLINECODE transport !is null
+ * && socket !is null
+ * && protocol !is null)
*/
- IOWatcher opCall(StreamTransport transport, Protocol protocol)
- pure nothrow @nogc
+ void opCall(StreamTransport transport,
+ ConnectedSocket socket,
+ Protocol protocol) pure nothrow @nogc
in
{
assert(transport !is null);
+ assert(socket !is null);
assert(protocol !is null);
}
body
{
- transport_ = transport;
+ this.transport = transport;
protocol_ = protocol;
active = true;
- return this;
- }
-
- /**
- * Returns: Transport used by this watcher.
- */
- @property inout(StreamTransport) transport() inout pure nothrow @nogc
- {
- return transport_;
- }
-
- /**
- * Sets an exception occurred during a read/write operation.
- *
- * Params:
- * exception = Thrown exception.
- */
- @property void exception(SocketException exception) pure nothrow @nogc
- {
- exception_ = exception;
}
/**
@@ -221,10 +207,17 @@ class IOWatcher : ConnectionWatcher
/**
* Returns: Socket.
+ *
+ * Precondition: $(D_INLINECODE socket !is null)
*/
- override @property Socket socket() pure nothrow @nogc
+ override @property ConnectedSocket socket() pure nothrow @safe @nogc
+ out (socket)
+ {
+ assert(socket !is null);
+ }
+ body
{
- return transport.socket;
+ return cast(ConnectedSocket) socket_;
}
/**
@@ -239,7 +232,7 @@ class IOWatcher : ConnectionWatcher
}
else
{
- protocol.disconnected(exception_);
+ protocol.disconnected(exception);
active = false;
}
}