aboutsummaryrefslogtreecommitdiff
path: root/source/tanya/async/watcher.d
diff options
context:
space:
mode:
Diffstat (limited to 'source/tanya/async/watcher.d')
-rw-r--r--source/tanya/async/watcher.d71
1 files changed, 14 insertions, 57 deletions
diff --git a/source/tanya/async/watcher.d b/source/tanya/async/watcher.d
index 1748434..60972e0 100644
--- a/source/tanya/async/watcher.d
+++ b/source/tanya/async/watcher.d
@@ -53,7 +53,7 @@ class ConnectionWatcher : Watcher
/// Protocol factory.
protected Protocol delegate() @nogc protocolFactory;
- package Queue!IOWatcher incoming;
+ package Queue!DuplexTransport incoming;
/**
* Params:
@@ -61,7 +61,7 @@ class ConnectionWatcher : Watcher
*/
this(Socket socket) @nogc
{
- incoming = Queue!IOWatcher(MmapPool.instance);
+ incoming = Queue!DuplexTransport(MmapPool.instance);
socket_ = socket;
}
@@ -94,26 +94,19 @@ class ConnectionWatcher : Watcher
}
/**
- * Returns: New protocol instance.
+ * Invokes new connection callback.
*/
- @property Protocol protocol() @nogc
+ override void invoke() @nogc
in
{
assert(protocolFactory !is null, "Protocol isn't set.");
}
body
{
- return protocolFactory();
- }
-
- /**
- * Invokes new connection callback.
- */
- override void invoke() @nogc
- {
- foreach (io; incoming)
+ foreach (transport; incoming)
{
- io.protocol.connected(cast(DuplexTransport) io.transport);
+ transport.protocol = protocolFactory();
+ transport.protocol.connected(transport);
}
}
}
@@ -124,10 +117,9 @@ class ConnectionWatcher : Watcher
*/
class IOWatcher : ConnectionWatcher
{
- package StreamTransport transport;
package SocketException exception;
- private Protocol protocol_;
+ protected Protocol protocol_;
/**
* Returns: Underlying output buffer.
@@ -136,27 +128,18 @@ class IOWatcher : ConnectionWatcher
/**
* Params:
- * transport = Transport.
- * socket = Socket.
- * protocol = New instance of the application protocol.
+ * socket = Socket.
*
- * Precondition: $(D_INLINECODE transport !is null
- * && socket !is null
- * && protocol !is null)
+ * Precondition: $(D_INLINECODE socket !is null)
*/
- this(StreamTransport transport, ConnectedSocket socket, Protocol protocol)
- @nogc
+ this(ConnectedSocket socket) @nogc
in
{
- assert(transport !is null);
assert(socket !is null);
- assert(protocol !is null);
}
body
{
super(socket);
- this.transport = transport;
- protocol_ = protocol;
output = ReadBuffer!ubyte(8192, 1024, MmapPool.instance);
active = true;
}
@@ -170,37 +153,9 @@ class IOWatcher : ConnectionWatcher
}
/**
- * Reinitializes the watcher.
- *
- * Params:
- * transport = Transport.
- * socket = Socket.
- * protocol = New instance of the application protocol.
- *
- * Precondition: $(D_INLINECODE transport !is null
- * && socket !is null
- * && protocol !is null)
- */
- void opCall(StreamTransport transport,
- ConnectedSocket socket,
- Protocol protocol) pure nothrow @nogc
- in
- {
- assert(transport !is null);
- assert(socket !is null);
- assert(protocol !is null);
- }
- body
- {
- this.transport = transport;
- protocol_ = protocol;
- active = true;
- }
-
- /**
* Returns: Application protocol.
*/
- override @property Protocol protocol() pure nothrow @safe @nogc
+ @property Protocol protocol() pure nothrow @safe @nogc
{
return protocol_;
}
@@ -233,6 +188,8 @@ class IOWatcher : ConnectionWatcher
else
{
protocol.disconnected(exception);
+ MmapPool.instance.dispose(protocol);
+ defaultAllocator.dispose(exception);
active = false;
}
}