aboutsummaryrefslogtreecommitdiff
path: root/source/tanya/event/internal/selector.d
diff options
context:
space:
mode:
Diffstat (limited to 'source/tanya/event/internal/selector.d')
-rw-r--r--source/tanya/event/internal/selector.d217
1 files changed, 217 insertions, 0 deletions
diff --git a/source/tanya/event/internal/selector.d b/source/tanya/event/internal/selector.d
new file mode 100644
index 0000000..9682a5d
--- /dev/null
+++ b/source/tanya/event/internal/selector.d
@@ -0,0 +1,217 @@
+/* This Source Code Form is subject to the terms of the Mozilla Public
+ * License, v. 2.0. If a copy of the MPL was not distributed with this
+ * file, You can obtain one at http://mozilla.org/MPL/2.0/. */
+
+/**
+ * Copyright: Eugene Wissner 2016.
+ * License: $(LINK2 https://www.mozilla.org/en-US/MPL/2.0/,
+ * Mozilla Public License, v. 2.0).
+ * Authors: $(LINK2 mailto:belka@caraus.de, Eugene Wissner)
+ */
+module tanya.event.internal.selector;
+
+import tanya.memory;
+import tanya.container.buffer;
+import tanya.event.loop;
+import tanya.event.protocol;
+import tanya.event.transport;
+import core.stdc.errno;
+import core.sys.posix.netinet.in_;
+import core.sys.posix.unistd;
+
+/**
+ * Transport for stream sockets.
+ */
+class SocketTransport : DuplexTransport
+{
+@nogc:
+ private int socket_ = -1;
+
+ private Protocol protocol_;
+
+ /// Input buffer.
+ private WriteBuffer input_;
+
+ /// Output buffer.
+ private ReadBuffer output_;
+
+ private Loop loop;
+
+ private bool disconnected_;
+
+ package bool writeReady;
+
+ /**
+ * Params:
+ * loop = Event loop.
+ * socket = Socket.
+ * protocol = Protocol.
+ */
+ this(Loop loop, int socket, Protocol protocol = null)
+ {
+ socket_ = socket;
+ protocol_ = protocol;
+ this.loop = loop;
+ input_ = make!WriteBuffer(defaultAllocator);
+ output_ = make!ReadBuffer(defaultAllocator);
+ }
+
+ /**
+ * Close the transport and deallocate the data buffers.
+ */
+ ~this()
+ {
+ close(socket);
+ finalize(defaultAllocator, input_);
+ finalize(defaultAllocator, output_);
+ finalize(defaultAllocator, protocol_);
+ }
+
+ /**
+ * Returns: Transport socket.
+ */
+ int socket() const @safe pure nothrow
+ {
+ return socket_;
+ }
+
+ /**
+ * Returns: Protocol.
+ */
+ @property Protocol protocol() @safe pure nothrow
+ {
+ return protocol_;
+ }
+
+ /**
+ * Returns: $(D_KEYWORD true) if the remote peer closed the connection,
+ * $(D_KEYWORD false) otherwise.
+ */
+ @property immutable(bool) disconnected() const @safe pure nothrow
+ {
+ return disconnected_;
+ }
+
+ /**
+ * Params:
+ * protocol = Application protocol.
+ */
+ @property void protocol(Protocol protocol) @safe pure nothrow
+ {
+ protocol_ = protocol;
+ }
+
+ /**
+ * Returns: Application protocol.
+ */
+ @property inout(Protocol) protocol() inout @safe pure nothrow
+ {
+ return protocol_;
+ }
+
+ /**
+ * Write some data to the transport.
+ *
+ * Params:
+ * data = Data to send.
+ */
+ void write(ubyte[] data)
+ {
+ // If the buffer wasn't empty the transport should be already there.
+ if (!input.length && data.length)
+ {
+ loop.feed(this);
+ }
+ input ~= data;
+ }
+
+ /**
+ * Returns: Input buffer.
+ */
+ @property WriteBuffer input() @safe pure nothrow
+ {
+ return input_;
+ }
+
+ /**
+ * Returns: Output buffer.
+ */
+ @property ReadBuffer output() @safe pure nothrow
+ {
+ return output_;
+ }
+
+ /**
+ * Read data from the socket. Returns $(D_KEYWORD true) if the reading
+ * is completed. In the case that the peer closed the connection, returns
+ * $(D_KEYWORD true) aswell.
+ *
+ * Returns: Whether the reading is completed.
+ *
+ * Throws: $(D_PSYMBOL TransportException) if a read error is occured.
+ */
+ bool receive()
+ {
+ auto readCount = recv(socket, output.buffer, output.free, 0);
+
+ if (readCount > 0)
+ {
+ output_ ~= output.buffer[0..readCount];
+ return false;
+ }
+ else if (readCount == 0)
+ {
+ disconnected_ = true;
+ return true;
+ }
+ else if (errno == EAGAIN || errno == EWOULDBLOCK)
+ {
+ return true;
+ }
+ else
+ {
+ disconnected_ = true;
+ throw make!TransportException(defaultAllocator,
+ "Read from the socket failed.");
+ }
+ }
+
+ /**
+ * Returns: Whether the writing is completed.
+ *
+ * Throws: $(D_PSYMBOL TransportException) if a read error is occured.
+ */
+ bool send()
+ {
+ auto sentCount = core.sys.posix.netinet.in_.send(socket,
+ input.buffer,
+ input.length,
+ 0);
+
+ input.written = sentCount;
+ if (input.length == 0)
+ {
+ return true;
+ }
+ else if (sentCount >= 0)
+ {
+ loop.feed(this);
+
+ return false;
+ }
+ else if (errno == EAGAIN || errno == EWOULDBLOCK)
+ {
+ writeReady = false;
+ loop.feed(this);
+
+ return false;
+ }
+ else
+ {
+ disconnected_ = true;
+ loop.feed(this);
+ throw make!TransportException(defaultAllocator,
+ "Write to the socket failed.");
+ }
+ }
+}