aboutsummaryrefslogtreecommitdiff
path: root/source/tanya/async/watcher.d
diff options
context:
space:
mode:
Diffstat (limited to 'source/tanya/async/watcher.d')
-rw-r--r--source/tanya/async/watcher.d104
1 files changed, 54 insertions, 50 deletions
diff --git a/source/tanya/async/watcher.d b/source/tanya/async/watcher.d
index 37742e2..997e033 100644
--- a/source/tanya/async/watcher.d
+++ b/source/tanya/async/watcher.d
@@ -36,52 +36,66 @@ abstract class Watcher
void invoke() @nogc;
}
-class ConnectionWatcher : Watcher
+/**
+ * Socket watcher.
+ */
+abstract class SocketWatcher : Watcher
{
/// Watched socket.
protected Socket socket_;
- /// Protocol factory.
- protected Protocol delegate() @nogc protocolFactory;
-
- package Queue!DuplexTransport incoming;
-
/**
* Params:
* socket = Socket.
+ *
+ * Precondition: $(D_INLINECODE socket !is null)
*/
- this(Socket socket) @nogc
+ this(Socket socket) pure nothrow @safe @nogc
+ in
+ {
+ assert(socket !is null);
+ }
+ body
{
- incoming = Queue!DuplexTransport(MmapPool.instance);
socket_ = socket;
}
/**
- * Destroys the watcher.
+ * Returns: Socket.
*/
- ~this() @nogc
+ @property Socket socket() pure nothrow @safe @nogc
{
- foreach (w; incoming)
- {
- MmapPool.instance.dispose(w);
- }
+ return socket_;
}
+}
+
+/**
+ * Connection watcher.
+ */
+class ConnectionWatcher : SocketWatcher
+{
+ /// Incoming connection queue.
+ Queue!DuplexTransport incoming;
+
+ private Protocol delegate() @nogc protocolFactory;
/**
* Params:
- * P = Protocol should be used.
+ * socket = Socket.
*/
- void setProtocol(P : Protocol)() @nogc
+ this(Socket socket) @nogc
{
- this.protocolFactory = () @nogc => cast(Protocol) MmapPool.instance.make!P;
+ super(socket);
+ incoming = Queue!DuplexTransport(MmapPool.instance);
}
/**
- * Returns: Socket.
+ * Params:
+ * P = Protocol should be used.
*/
- @property Socket socket() pure nothrow @safe @nogc
+ void setProtocol(P : Protocol)() @nogc
{
- return socket_;
+ this.protocolFactory = () @nogc => cast(Protocol) MmapPool.instance.make!P;
}
/**
@@ -102,21 +116,17 @@ class ConnectionWatcher : Watcher
}
}
-/**
- * Contains a pending watcher with the invoked events or a transport can be
- * read from.
- */
-class IOWatcher : ConnectionWatcher
+package abstract class IOWatcher : SocketWatcher, DuplexTransport
{
package SocketException exception;
- protected Protocol protocol_;
-
- /**
- * Returns: Underlying output buffer.
- */
package ReadBuffer!ubyte output;
+ package WriteBuffer!ubyte input;
+
+ /// Application protocol.
+ protected Protocol protocol_;
+
/**
* Params:
* socket = Socket.
@@ -124,26 +134,14 @@ class IOWatcher : ConnectionWatcher
* Precondition: $(D_INLINECODE socket !is null)
*/
this(ConnectedSocket socket) @nogc
- in
- {
- assert(socket !is null);
- }
- body
{
super(socket);
output = ReadBuffer!ubyte(8192, 1024, MmapPool.instance);
+ input = WriteBuffer!ubyte(8192, MmapPool.instance);
active = true;
}
/**
- * Destroys the watcher.
- */
- ~this() @nogc
- {
- MmapPool.instance.dispose(protocol_);
- }
-
- /**
* Returns: Application protocol.
*/
@property Protocol protocol() pure nothrow @safe @nogc
@@ -152,18 +150,24 @@ class IOWatcher : ConnectionWatcher
}
/**
- * Returns: Socket.
+ * Switches the protocol.
*
- * Precondition: $(D_INLINECODE socket !is null)
+ * The protocol is deallocated by the event loop, it should currently be
+ * allocated with $(D_PSYMBOL MmapPool).
+ *
+ * Params:
+ * protocol = Application protocol.
+ *
+ * Precondition: $(D_INLINECODE protocol !is null)
*/
- override @property ConnectedSocket socket() pure nothrow @safe @nogc
- out (socket)
+ @property void protocol(Protocol protocol) pure nothrow @safe @nogc
+ in
{
- assert(socket !is null);
+ assert(protocol !is null);
}
body
{
- return cast(ConnectedSocket) socket_;
+ protocol_ = protocol;
}
/**
@@ -179,7 +183,7 @@ class IOWatcher : ConnectionWatcher
else
{
protocol.disconnected(exception);
- MmapPool.instance.dispose(protocol);
+ MmapPool.instance.dispose(protocol_);
defaultAllocator.dispose(exception);
active = false;
}