diff options
Diffstat (limited to 'source/tanya/async/watcher.d')
| -rw-r--r-- | source/tanya/async/watcher.d | 71 |
1 files changed, 14 insertions, 57 deletions
diff --git a/source/tanya/async/watcher.d b/source/tanya/async/watcher.d index 1748434..60972e0 100644 --- a/source/tanya/async/watcher.d +++ b/source/tanya/async/watcher.d @@ -53,7 +53,7 @@ class ConnectionWatcher : Watcher /// Protocol factory. protected Protocol delegate() @nogc protocolFactory; - package Queue!IOWatcher incoming; + package Queue!DuplexTransport incoming; /** * Params: @@ -61,7 +61,7 @@ class ConnectionWatcher : Watcher */ this(Socket socket) @nogc { - incoming = Queue!IOWatcher(MmapPool.instance); + incoming = Queue!DuplexTransport(MmapPool.instance); socket_ = socket; } @@ -94,26 +94,19 @@ class ConnectionWatcher : Watcher } /** - * Returns: New protocol instance. + * Invokes new connection callback. */ - @property Protocol protocol() @nogc + override void invoke() @nogc in { assert(protocolFactory !is null, "Protocol isn't set."); } body { - return protocolFactory(); - } - - /** - * Invokes new connection callback. - */ - override void invoke() @nogc - { - foreach (io; incoming) + foreach (transport; incoming) { - io.protocol.connected(cast(DuplexTransport) io.transport); + transport.protocol = protocolFactory(); + transport.protocol.connected(transport); } } } @@ -124,10 +117,9 @@ class ConnectionWatcher : Watcher */ class IOWatcher : ConnectionWatcher { - package StreamTransport transport; package SocketException exception; - private Protocol protocol_; + protected Protocol protocol_; /** * Returns: Underlying output buffer. @@ -136,27 +128,18 @@ class IOWatcher : ConnectionWatcher /** * Params: - * transport = Transport. - * socket = Socket. - * protocol = New instance of the application protocol. + * socket = Socket. * - * Precondition: $(D_INLINECODE transport !is null - * && socket !is null - * && protocol !is null) + * Precondition: $(D_INLINECODE socket !is null) */ - this(StreamTransport transport, ConnectedSocket socket, Protocol protocol) - @nogc + this(ConnectedSocket socket) @nogc in { - assert(transport !is null); assert(socket !is null); - assert(protocol !is null); } body { super(socket); - this.transport = transport; - protocol_ = protocol; output = ReadBuffer!ubyte(8192, 1024, MmapPool.instance); active = true; } @@ -170,37 +153,9 @@ class IOWatcher : ConnectionWatcher } /** - * Reinitializes the watcher. - * - * Params: - * transport = Transport. - * socket = Socket. - * protocol = New instance of the application protocol. - * - * Precondition: $(D_INLINECODE transport !is null - * && socket !is null - * && protocol !is null) - */ - void opCall(StreamTransport transport, - ConnectedSocket socket, - Protocol protocol) pure nothrow @nogc - in - { - assert(transport !is null); - assert(socket !is null); - assert(protocol !is null); - } - body - { - this.transport = transport; - protocol_ = protocol; - active = true; - } - - /** * Returns: Application protocol. */ - override @property Protocol protocol() pure nothrow @safe @nogc + @property Protocol protocol() pure nothrow @safe @nogc { return protocol_; } @@ -233,6 +188,8 @@ class IOWatcher : ConnectionWatcher else { protocol.disconnected(exception); + MmapPool.instance.dispose(protocol); + defaultAllocator.dispose(exception); active = false; } } |
