aboutsummaryrefslogtreecommitdiff
path: root/source/tanya/async/watcher.d
diff options
context:
space:
mode:
Diffstat (limited to 'source/tanya/async/watcher.d')
-rw-r--r--source/tanya/async/watcher.d128
1 files changed, 29 insertions, 99 deletions
diff --git a/source/tanya/async/watcher.d b/source/tanya/async/watcher.d
index 37742e2..9756d99 100644
--- a/source/tanya/async/watcher.d
+++ b/source/tanya/async/watcher.d
@@ -36,44 +36,28 @@ abstract class Watcher
void invoke() @nogc;
}
-class ConnectionWatcher : Watcher
+/**
+ * Socket watcher.
+ */
+abstract class SocketWatcher : Watcher
{
/// Watched socket.
protected Socket socket_;
- /// Protocol factory.
- protected Protocol delegate() @nogc protocolFactory;
-
- package Queue!DuplexTransport incoming;
-
/**
* Params:
* socket = Socket.
+ *
+ * Precondition: $(D_INLINECODE socket !is null)
*/
- this(Socket socket) @nogc
- {
- incoming = Queue!DuplexTransport(MmapPool.instance);
- socket_ = socket;
- }
-
- /**
- * Destroys the watcher.
- */
- ~this() @nogc
+ this(Socket socket) pure nothrow @safe @nogc
+ in
{
- foreach (w; incoming)
- {
- MmapPool.instance.dispose(w);
- }
+ assert(socket !is null);
}
-
- /**
- * Params:
- * P = Protocol should be used.
- */
- void setProtocol(P : Protocol)() @nogc
+ body
{
- this.protocolFactory = () @nogc => cast(Protocol) MmapPool.instance.make!P;
+ socket_ = socket;
}
/**
@@ -83,105 +67,51 @@ class ConnectionWatcher : Watcher
{
return socket_;
}
-
- /**
- * Invokes new connection callback.
- */
- override void invoke() @nogc
- in
- {
- assert(protocolFactory !is null, "Protocol isn't set.");
- }
- body
- {
- foreach (transport; incoming)
- {
- transport.protocol = protocolFactory();
- transport.protocol.connected(transport);
- }
- }
}
/**
- * Contains a pending watcher with the invoked events or a transport can be
- * read from.
+ * Connection watcher.
*/
-class IOWatcher : ConnectionWatcher
+class ConnectionWatcher : SocketWatcher
{
- package SocketException exception;
-
- protected Protocol protocol_;
+ /// Incoming connection queue.
+ Queue!DuplexTransport incoming;
- /**
- * Returns: Underlying output buffer.
- */
- package ReadBuffer!ubyte output;
+ private Protocol delegate() @nogc protocolFactory;
/**
* Params:
* socket = Socket.
- *
- * Precondition: $(D_INLINECODE socket !is null)
*/
- this(ConnectedSocket socket) @nogc
- in
- {
- assert(socket !is null);
- }
- body
+ this(Socket socket) @nogc
{
super(socket);
- output = ReadBuffer!ubyte(8192, 1024, MmapPool.instance);
- active = true;
- }
-
- /**
- * Destroys the watcher.
- */
- ~this() @nogc
- {
- MmapPool.instance.dispose(protocol_);
+ incoming = Queue!DuplexTransport(MmapPool.instance);
}
/**
- * Returns: Application protocol.
+ * Params:
+ * P = Protocol should be used.
*/
- @property Protocol protocol() pure nothrow @safe @nogc
+ void setProtocol(P : Protocol)() @nogc
{
- return protocol_;
+ this.protocolFactory = () @nogc => cast(Protocol) MmapPool.instance.make!P;
}
/**
- * Returns: Socket.
- *
- * Precondition: $(D_INLINECODE socket !is null)
+ * Invokes new connection callback.
*/
- override @property ConnectedSocket socket() pure nothrow @safe @nogc
- out (socket)
+ override void invoke() @nogc
+ in
{
- assert(socket !is null);
+ assert(protocolFactory !is null, "Protocol isn't set.");
}
body
{
- return cast(ConnectedSocket) socket_;
- }
-
- /**
- * Invokes the watcher callback.
- */
- override void invoke() @nogc
- {
- if (output.length)
- {
- protocol.received(output[0 .. $]);
- output.clear();
- }
- else
+ foreach (transport; incoming)
{
- protocol.disconnected(exception);
- MmapPool.instance.dispose(protocol);
- defaultAllocator.dispose(exception);
- active = false;
+ transport.protocol = protocolFactory();
+ transport.protocol.connected(transport);
}
}
}