diff options
Diffstat (limited to 'source/tanya/async/watcher.d')
| -rw-r--r-- | source/tanya/async/watcher.d | 401 |
1 files changed, 203 insertions, 198 deletions
diff --git a/source/tanya/async/watcher.d b/source/tanya/async/watcher.d index 260c6a3..9a9e441 100644 --- a/source/tanya/async/watcher.d +++ b/source/tanya/async/watcher.d @@ -10,23 +10,24 @@ */ module tanya.async.watcher; +import std.functional; +import std.exception; import tanya.async.loop; import tanya.async.protocol; import tanya.async.transport; import tanya.container.buffer; +import tanya.container.queue; import tanya.memory; import tanya.memory.mmappool; import tanya.network.socket; -import std.functional; -import std.exception; version (Windows) { - import core.sys.windows.basetyps; - import core.sys.windows.mswsock; - import core.sys.windows.winbase; - import core.sys.windows.windef; - import core.sys.windows.winsock2; + import core.sys.windows.basetyps; + import core.sys.windows.mswsock; + import core.sys.windows.winbase; + import core.sys.windows.windef; + import core.sys.windows.winsock2; } /** @@ -35,85 +36,89 @@ version (Windows) */ abstract class Watcher { - /// Whether the watcher is active. - bool active; + /// Whether the watcher is active. + bool active; - /** - * Invoke some action on event. - */ - void invoke(); + /** + * Invoke some action on event. + */ + void invoke(); } class ConnectionWatcher : Watcher { - /// Watched socket. - private Socket socket_; - - /// Protocol factory. - protected Protocol delegate() protocolFactory; - - package PendingQueue!IOWatcher incoming; - - /** - * Params: - * socket = Socket. - */ - this(Socket socket) - { - socket_ = socket; - incoming = MmapPool.instance.make!(PendingQueue!IOWatcher); - } - - /// Ditto. - protected this() - { - } - - ~this() - { - MmapPool.instance.dispose(incoming); - } - - /* - * Params: - * P = Protocol should be used. - */ - void setProtocol(P : Protocol)() - { - this.protocolFactory = () => cast(Protocol) MmapPool.instance.make!P; - } - - /** - * Returns: Socket. - */ - @property inout(Socket) socket() inout pure nothrow @nogc - { - return socket_; - } - - /** - * Returns: New protocol instance. - */ - @property Protocol protocol() - in - { - assert(protocolFactory !is null, "Protocol isn't set."); - } - body - { - return protocolFactory(); - } - - /** - * Invokes new connection callback. - */ - override void invoke() - { - foreach (io; incoming) - { - io.protocol.connected(cast(DuplexTransport) io.transport); - } - } + /// Watched socket. + private Socket socket_; + + /// Protocol factory. + protected Protocol delegate() protocolFactory; + + package Queue!IOWatcher incoming; + + /** + * Params: + * socket = Socket. + */ + this(Socket socket) + { + socket_ = socket; + incoming = MmapPool.instance.make!(Queue!IOWatcher); + } + + /// Ditto. + protected this() + { + } + + ~this() + { + foreach (w; incoming) + { + MmapPool.instance.dispose(w); + } + MmapPool.instance.dispose(incoming); + } + + /* + * Params: + * P = Protocol should be used. + */ + void setProtocol(P : Protocol)() + { + this.protocolFactory = () => cast(Protocol) MmapPool.instance.make!P; + } + + /** + * Returns: Socket. + */ + @property inout(Socket) socket() inout pure nothrow @nogc + { + return socket_; + } + + /** + * Returns: New protocol instance. + */ + @property Protocol protocol() + in + { + assert(protocolFactory !is null, "Protocol isn't set."); + } + body + { + return protocolFactory(); + } + + /** + * Invokes new connection callback. + */ + override void invoke() + { + foreach (io; incoming) + { + io.protocol.connected(cast(DuplexTransport) io.transport); + } + } } /** @@ -122,121 +127,121 @@ class ConnectionWatcher : Watcher */ class IOWatcher : ConnectionWatcher { - /// If an exception was thrown the transport should be already invalid. - private union - { - StreamTransport transport_; - SocketException exception_; - } - - private Protocol protocol_; - - /** - * Returns: Underlying output buffer. - */ - package ReadBuffer output; - - /** - * Params: - * transport = Transport. - * protocol = New instance of the application protocol. - */ - this(StreamTransport transport, Protocol protocol) - in - { - assert(transport !is null); - assert(protocol !is null); - } - body - { - super(); - transport_ = transport; - protocol_ = protocol; - output = MmapPool.instance.make!ReadBuffer(); - active = true; - } - - /** - * Destroys the watcher. - */ - protected ~this() - { - MmapPool.instance.dispose(output); - MmapPool.instance.dispose(protocol_); - } - - /** - * Assigns a transport. - * - * Params: - * transport = Transport. - * protocol = Application protocol. - * - * Returns: $(D_KEYWORD this). - */ - IOWatcher opCall(StreamTransport transport, Protocol protocol) pure nothrow @nogc - in - { - assert(transport !is null); - assert(protocol !is null); - } - body - { - transport_ = transport; - protocol_ = protocol; - active = true; - return this; - } - - /** - * Returns: Transport used by this watcher. - */ - @property inout(StreamTransport) transport() inout pure nothrow @nogc - { - return transport_; - } - - /** - * Sets an exception occurred during a read/write operation. - * - * Params: - * exception = Thrown exception. - */ - @property void exception(SocketException exception) pure nothrow @nogc - { - exception_ = exception; - } - - /** - * Returns: Application protocol. - */ - override @property Protocol protocol() pure nothrow @safe @nogc - { - return protocol_; - } - - /** - * Returns: Socket. - */ - override @property inout(Socket) socket() inout pure nothrow @nogc - { - return transport.socket; - } - - /** - * Invokes the watcher callback. - */ - override void invoke() - { - if (output.length) - { - protocol.received(output[0..$]); - output.clear(); - } - else - { - protocol.disconnected(exception_); - active = false; - } - } + /// If an exception was thrown the transport should be already invalid. + private union + { + StreamTransport transport_; + SocketException exception_; + } + + private Protocol protocol_; + + /** + * Returns: Underlying output buffer. + */ + package ReadBuffer output; + + /** + * Params: + * transport = Transport. + * protocol = New instance of the application protocol. + */ + this(StreamTransport transport, Protocol protocol) + in + { + assert(transport !is null); + assert(protocol !is null); + } + body + { + super(); + transport_ = transport; + protocol_ = protocol; + output = MmapPool.instance.make!ReadBuffer(); + active = true; + } + + /** + * Destroys the watcher. + */ + protected ~this() + { + MmapPool.instance.dispose(output); + MmapPool.instance.dispose(protocol_); + } + + /** + * Assigns a transport. + * + * Params: + * transport = Transport. + * protocol = Application protocol. + * + * Returns: $(D_KEYWORD this). + */ + IOWatcher opCall(StreamTransport transport, Protocol protocol) pure nothrow @nogc + in + { + assert(transport !is null); + assert(protocol !is null); + } + body + { + transport_ = transport; + protocol_ = protocol; + active = true; + return this; + } + + /** + * Returns: Transport used by this watcher. + */ + @property inout(StreamTransport) transport() inout pure nothrow @nogc + { + return transport_; + } + + /** + * Sets an exception occurred during a read/write operation. + * + * Params: + * exception = Thrown exception. + */ + @property void exception(SocketException exception) pure nothrow @nogc + { + exception_ = exception; + } + + /** + * Returns: Application protocol. + */ + override @property Protocol protocol() pure nothrow @safe @nogc + { + return protocol_; + } + + /** + * Returns: Socket. + */ + override @property inout(Socket) socket() inout pure nothrow @nogc + { + return transport.socket; + } + + /** + * Invokes the watcher callback. + */ + override void invoke() + { + if (output.length) + { + protocol.received(output[0..$]); + output.clear(); + } + else + { + protocol.disconnected(exception_); + active = false; + } + } } |
