aboutsummaryrefslogtreecommitdiff
path: root/source/tanya/event/internal
diff options
context:
space:
mode:
Diffstat (limited to 'source/tanya/event/internal')
-rw-r--r--source/tanya/event/internal/epoll.d226
-rw-r--r--source/tanya/event/internal/selector.d217
2 files changed, 443 insertions, 0 deletions
diff --git a/source/tanya/event/internal/epoll.d b/source/tanya/event/internal/epoll.d
new file mode 100644
index 0000000..f4e99d9
--- /dev/null
+++ b/source/tanya/event/internal/epoll.d
@@ -0,0 +1,226 @@
+/* This Source Code Form is subject to the terms of the Mozilla Public
+ * License, v. 2.0. If a copy of the MPL was not distributed with this
+ * file, You can obtain one at http://mozilla.org/MPL/2.0/. */
+
+/**
+ * Copyright: Eugene Wissner 2016.
+ * License: $(LINK2 https://www.mozilla.org/en-US/MPL/2.0/,
+ * Mozilla Public License, v. 2.0).
+ * Authors: $(LINK2 mailto:belka@caraus.de, Eugene Wissner)
+ */
+module tanya.event.internal.epoll;
+
+import tanya.event.config;
+
+static if (UseEpoll):
+
+public import core.sys.linux.epoll;
+import tanya.event.internal.selector;
+import tanya.event.protocol;
+import tanya.event.transport;
+import tanya.event.watcher;
+import tanya.event.loop;
+import tanya.container.list;
+import tanya.memory;
+import core.stdc.errno;
+import core.sys.posix.fcntl;
+import core.sys.posix.netinet.in_;
+import core.time;
+import std.algorithm.comparison;
+
+@nogc:
+
+extern (C) nothrow
+{ // TODO: Make a pull request for Phobos to mark this extern functions as @nogc.
+ int epoll_create1(int __flags);
+ int epoll_ctl(int __epfd, int __op, int __fd, epoll_event *__event);
+ int epoll_wait(int __epfd, epoll_event *__events, int __maxevents, int __timeout);
+ int accept4(int, sockaddr*, socklen_t*, int flags);
+}
+
+private enum maxEvents = 128;
+
+class EpollLoop : Loop
+{
+@nogc:
+ /**
+ * Initializes the loop.
+ */
+ this()
+ {
+ super();
+
+ if ((fd = epoll_create1(EPOLL_CLOEXEC)) < 0)
+ {
+ return;
+ }
+ epollEvents = makeArray!epoll_event(defaultAllocator, maxEvents).ptr;
+ }
+
+ /**
+ * Frees loop internals.
+ */
+ ~this()
+ {
+ finalize(defaultAllocator, epollEvents);
+ }
+
+ /**
+ * Should be called if the backend configuration changes.
+ *
+ * Params:
+ * socket = Socket.
+ * oldEvents = The events were already set.
+ * events = The events should be set.
+ *
+ * Returns: $(D_KEYWORD true) if the operation was successful.
+ */
+ protected override bool modify(int socket, EventMask oldEvents, EventMask events)
+ {
+ int op = EPOLL_CTL_DEL;
+ epoll_event ev;
+
+ if (events == oldEvents)
+ {
+ return true;
+ }
+ if (events && oldEvents)
+ {
+ op = EPOLL_CTL_MOD;
+ }
+ else if (events && !oldEvents)
+ {
+ op = EPOLL_CTL_ADD;
+ }
+
+ ev.data.fd = socket;
+ ev.events = (events & (Event.read | Event.accept) ? EPOLLIN | EPOLLPRI : 0)
+ | (events & Event.write ? EPOLLOUT : 0)
+ | EPOLLET;
+
+ return epoll_ctl(fd, op, socket, &ev) == 0;
+ }
+
+ /**
+ * Accept incoming connections.
+ *
+ * Params:
+ * protocolFactory = Protocol factory.
+ * socket = Socket.
+ */
+ protected override void acceptConnection(Protocol delegate() @nogc protocolFactory,
+ int socket)
+ {
+ sockaddr_in client_addr;
+ socklen_t client_len = client_addr.sizeof;
+ int client = accept4(socket,
+ cast(sockaddr *)&client_addr,
+ &client_len,
+ O_NONBLOCK);
+ while (client >= 0)
+ {
+ auto transport = make!SocketTransport(defaultAllocator, this, client);
+ IOWatcher connection;
+
+ if (connections.length > client)
+ {
+ connection = cast(IOWatcher) connections[client];
+ // If it is a ConnectionWatcher
+ if (connection is null && connections[client] !is null)
+ {
+ finalize(defaultAllocator, connections[client]);
+ connections[client] = null;
+ }
+ }
+ if (connection !is null)
+ {
+ connection(protocolFactory, transport);
+ }
+ else
+ {
+ connections[client] = make!IOWatcher(defaultAllocator,
+ protocolFactory,
+ transport);
+ }
+
+ modify(client, EventMask(Event.none), EventMask(Event.read, Event.write));
+
+ swapPendings.insertBack(connections[client]);
+
+ client = accept4(socket,
+ cast(sockaddr *)&client_addr,
+ &client_len,
+ O_NONBLOCK);
+ }
+ }
+
+ /**
+ * Does the actual polling.
+ */
+ protected override void poll()
+ {
+ // Don't block
+ immutable timeout = cast(immutable int) blockTime.total!"msecs";
+ auto eventCount = epoll_wait(fd, epollEvents, maxEvents, timeout);
+
+ if (eventCount < 0)
+ {
+ if (errno != EINTR)
+ {
+ throw make!BadLoopException(defaultAllocator);
+ }
+
+ return;
+ }
+
+ for (auto i = 0; i < eventCount; ++i)
+ {
+ epoll_event *ev = epollEvents + i;
+ auto connection = cast(IOWatcher) connections[ev.data.fd];
+
+ if (connection is null)
+ {
+ swapPendings.insertBack(connections[ev.data.fd]);
+// acceptConnection(connections[ev.data.fd].protocol,
+// connections[ev.data.fd].socket);
+ }
+ else
+ {
+ auto transport = cast(SocketTransport) connection.transport;
+ assert(transport !is null);
+
+ if (ev.events & (EPOLLIN | EPOLLPRI | EPOLLERR | EPOLLHUP))
+ {
+ try
+ {
+ while (!transport.receive())
+ {
+ }
+ swapPendings.insertBack(connection);
+ }
+ catch (TransportException e)
+ {
+ swapPendings.insertBack(connection);
+ finalize(defaultAllocator, e);
+ }
+ }
+ else if (ev.events & (EPOLLOUT | EPOLLERR | EPOLLHUP))
+ {
+ transport.writeReady = true;
+ }
+ }
+ }
+ }
+
+ /**
+ * Returns: The blocking time.
+ */
+ override protected @property inout(Duration) blockTime()
+ inout @safe pure nothrow
+ {
+ return min(super.blockTime, 1.dur!"seconds");
+ }
+
+ private int fd;
+ private epoll_event* epollEvents;
+}
diff --git a/source/tanya/event/internal/selector.d b/source/tanya/event/internal/selector.d
new file mode 100644
index 0000000..9682a5d
--- /dev/null
+++ b/source/tanya/event/internal/selector.d
@@ -0,0 +1,217 @@
+/* This Source Code Form is subject to the terms of the Mozilla Public
+ * License, v. 2.0. If a copy of the MPL was not distributed with this
+ * file, You can obtain one at http://mozilla.org/MPL/2.0/. */
+
+/**
+ * Copyright: Eugene Wissner 2016.
+ * License: $(LINK2 https://www.mozilla.org/en-US/MPL/2.0/,
+ * Mozilla Public License, v. 2.0).
+ * Authors: $(LINK2 mailto:belka@caraus.de, Eugene Wissner)
+ */
+module tanya.event.internal.selector;
+
+import tanya.memory;
+import tanya.container.buffer;
+import tanya.event.loop;
+import tanya.event.protocol;
+import tanya.event.transport;
+import core.stdc.errno;
+import core.sys.posix.netinet.in_;
+import core.sys.posix.unistd;
+
+/**
+ * Transport for stream sockets.
+ */
+class SocketTransport : DuplexTransport
+{
+@nogc:
+ private int socket_ = -1;
+
+ private Protocol protocol_;
+
+ /// Input buffer.
+ private WriteBuffer input_;
+
+ /// Output buffer.
+ private ReadBuffer output_;
+
+ private Loop loop;
+
+ private bool disconnected_;
+
+ package bool writeReady;
+
+ /**
+ * Params:
+ * loop = Event loop.
+ * socket = Socket.
+ * protocol = Protocol.
+ */
+ this(Loop loop, int socket, Protocol protocol = null)
+ {
+ socket_ = socket;
+ protocol_ = protocol;
+ this.loop = loop;
+ input_ = make!WriteBuffer(defaultAllocator);
+ output_ = make!ReadBuffer(defaultAllocator);
+ }
+
+ /**
+ * Close the transport and deallocate the data buffers.
+ */
+ ~this()
+ {
+ close(socket);
+ finalize(defaultAllocator, input_);
+ finalize(defaultAllocator, output_);
+ finalize(defaultAllocator, protocol_);
+ }
+
+ /**
+ * Returns: Transport socket.
+ */
+ int socket() const @safe pure nothrow
+ {
+ return socket_;
+ }
+
+ /**
+ * Returns: Protocol.
+ */
+ @property Protocol protocol() @safe pure nothrow
+ {
+ return protocol_;
+ }
+
+ /**
+ * Returns: $(D_KEYWORD true) if the remote peer closed the connection,
+ * $(D_KEYWORD false) otherwise.
+ */
+ @property immutable(bool) disconnected() const @safe pure nothrow
+ {
+ return disconnected_;
+ }
+
+ /**
+ * Params:
+ * protocol = Application protocol.
+ */
+ @property void protocol(Protocol protocol) @safe pure nothrow
+ {
+ protocol_ = protocol;
+ }
+
+ /**
+ * Returns: Application protocol.
+ */
+ @property inout(Protocol) protocol() inout @safe pure nothrow
+ {
+ return protocol_;
+ }
+
+ /**
+ * Write some data to the transport.
+ *
+ * Params:
+ * data = Data to send.
+ */
+ void write(ubyte[] data)
+ {
+ // If the buffer wasn't empty the transport should be already there.
+ if (!input.length && data.length)
+ {
+ loop.feed(this);
+ }
+ input ~= data;
+ }
+
+ /**
+ * Returns: Input buffer.
+ */
+ @property WriteBuffer input() @safe pure nothrow
+ {
+ return input_;
+ }
+
+ /**
+ * Returns: Output buffer.
+ */
+ @property ReadBuffer output() @safe pure nothrow
+ {
+ return output_;
+ }
+
+ /**
+ * Read data from the socket. Returns $(D_KEYWORD true) if the reading
+ * is completed. In the case that the peer closed the connection, returns
+ * $(D_KEYWORD true) aswell.
+ *
+ * Returns: Whether the reading is completed.
+ *
+ * Throws: $(D_PSYMBOL TransportException) if a read error is occured.
+ */
+ bool receive()
+ {
+ auto readCount = recv(socket, output.buffer, output.free, 0);
+
+ if (readCount > 0)
+ {
+ output_ ~= output.buffer[0..readCount];
+ return false;
+ }
+ else if (readCount == 0)
+ {
+ disconnected_ = true;
+ return true;
+ }
+ else if (errno == EAGAIN || errno == EWOULDBLOCK)
+ {
+ return true;
+ }
+ else
+ {
+ disconnected_ = true;
+ throw make!TransportException(defaultAllocator,
+ "Read from the socket failed.");
+ }
+ }
+
+ /**
+ * Returns: Whether the writing is completed.
+ *
+ * Throws: $(D_PSYMBOL TransportException) if a read error is occured.
+ */
+ bool send()
+ {
+ auto sentCount = core.sys.posix.netinet.in_.send(socket,
+ input.buffer,
+ input.length,
+ 0);
+
+ input.written = sentCount;
+ if (input.length == 0)
+ {
+ return true;
+ }
+ else if (sentCount >= 0)
+ {
+ loop.feed(this);
+
+ return false;
+ }
+ else if (errno == EAGAIN || errno == EWOULDBLOCK)
+ {
+ writeReady = false;
+ loop.feed(this);
+
+ return false;
+ }
+ else
+ {
+ disconnected_ = true;
+ loop.feed(this);
+ throw make!TransportException(defaultAllocator,
+ "Write to the socket failed.");
+ }
+ }
+}