diff options
Diffstat (limited to 'source/tanya/async/watcher.d')
| -rw-r--r-- | source/tanya/async/watcher.d | 128 |
1 files changed, 29 insertions, 99 deletions
diff --git a/source/tanya/async/watcher.d b/source/tanya/async/watcher.d index 37742e2..9756d99 100644 --- a/source/tanya/async/watcher.d +++ b/source/tanya/async/watcher.d @@ -36,44 +36,28 @@ abstract class Watcher void invoke() @nogc; } -class ConnectionWatcher : Watcher +/** + * Socket watcher. + */ +abstract class SocketWatcher : Watcher { /// Watched socket. protected Socket socket_; - /// Protocol factory. - protected Protocol delegate() @nogc protocolFactory; - - package Queue!DuplexTransport incoming; - /** * Params: * socket = Socket. + * + * Precondition: $(D_INLINECODE socket !is null) */ - this(Socket socket) @nogc - { - incoming = Queue!DuplexTransport(MmapPool.instance); - socket_ = socket; - } - - /** - * Destroys the watcher. - */ - ~this() @nogc + this(Socket socket) pure nothrow @safe @nogc + in { - foreach (w; incoming) - { - MmapPool.instance.dispose(w); - } + assert(socket !is null); } - - /** - * Params: - * P = Protocol should be used. - */ - void setProtocol(P : Protocol)() @nogc + body { - this.protocolFactory = () @nogc => cast(Protocol) MmapPool.instance.make!P; + socket_ = socket; } /** @@ -83,105 +67,51 @@ class ConnectionWatcher : Watcher { return socket_; } - - /** - * Invokes new connection callback. - */ - override void invoke() @nogc - in - { - assert(protocolFactory !is null, "Protocol isn't set."); - } - body - { - foreach (transport; incoming) - { - transport.protocol = protocolFactory(); - transport.protocol.connected(transport); - } - } } /** - * Contains a pending watcher with the invoked events or a transport can be - * read from. + * Connection watcher. */ -class IOWatcher : ConnectionWatcher +class ConnectionWatcher : SocketWatcher { - package SocketException exception; - - protected Protocol protocol_; + /// Incoming connection queue. + Queue!DuplexTransport incoming; - /** - * Returns: Underlying output buffer. - */ - package ReadBuffer!ubyte output; + private Protocol delegate() @nogc protocolFactory; /** * Params: * socket = Socket. - * - * Precondition: $(D_INLINECODE socket !is null) */ - this(ConnectedSocket socket) @nogc - in - { - assert(socket !is null); - } - body + this(Socket socket) @nogc { super(socket); - output = ReadBuffer!ubyte(8192, 1024, MmapPool.instance); - active = true; - } - - /** - * Destroys the watcher. - */ - ~this() @nogc - { - MmapPool.instance.dispose(protocol_); + incoming = Queue!DuplexTransport(MmapPool.instance); } /** - * Returns: Application protocol. + * Params: + * P = Protocol should be used. */ - @property Protocol protocol() pure nothrow @safe @nogc + void setProtocol(P : Protocol)() @nogc { - return protocol_; + this.protocolFactory = () @nogc => cast(Protocol) MmapPool.instance.make!P; } /** - * Returns: Socket. - * - * Precondition: $(D_INLINECODE socket !is null) + * Invokes new connection callback. */ - override @property ConnectedSocket socket() pure nothrow @safe @nogc - out (socket) + override void invoke() @nogc + in { - assert(socket !is null); + assert(protocolFactory !is null, "Protocol isn't set."); } body { - return cast(ConnectedSocket) socket_; - } - - /** - * Invokes the watcher callback. - */ - override void invoke() @nogc - { - if (output.length) - { - protocol.received(output[0 .. $]); - output.clear(); - } - else + foreach (transport; incoming) { - protocol.disconnected(exception); - MmapPool.instance.dispose(protocol); - defaultAllocator.dispose(exception); - active = false; + transport.protocol = protocolFactory(); + transport.protocol.connected(transport); } } } |
