diff options
Diffstat (limited to 'source/tanya/event/internal')
| -rw-r--r-- | source/tanya/event/internal/epoll.d | 226 | ||||
| -rw-r--r-- | source/tanya/event/internal/selector.d | 217 |
2 files changed, 443 insertions, 0 deletions
diff --git a/source/tanya/event/internal/epoll.d b/source/tanya/event/internal/epoll.d new file mode 100644 index 0000000..f4e99d9 --- /dev/null +++ b/source/tanya/event/internal/epoll.d @@ -0,0 +1,226 @@ +/* This Source Code Form is subject to the terms of the Mozilla Public + * License, v. 2.0. If a copy of the MPL was not distributed with this + * file, You can obtain one at http://mozilla.org/MPL/2.0/. */ + +/** + * Copyright: Eugene Wissner 2016. + * License: $(LINK2 https://www.mozilla.org/en-US/MPL/2.0/, + * Mozilla Public License, v. 2.0). + * Authors: $(LINK2 mailto:belka@caraus.de, Eugene Wissner) + */ +module tanya.event.internal.epoll; + +import tanya.event.config; + +static if (UseEpoll): + +public import core.sys.linux.epoll; +import tanya.event.internal.selector; +import tanya.event.protocol; +import tanya.event.transport; +import tanya.event.watcher; +import tanya.event.loop; +import tanya.container.list; +import tanya.memory; +import core.stdc.errno; +import core.sys.posix.fcntl; +import core.sys.posix.netinet.in_; +import core.time; +import std.algorithm.comparison; + +@nogc: + +extern (C) nothrow +{ // TODO: Make a pull request for Phobos to mark this extern functions as @nogc. + int epoll_create1(int __flags); + int epoll_ctl(int __epfd, int __op, int __fd, epoll_event *__event); + int epoll_wait(int __epfd, epoll_event *__events, int __maxevents, int __timeout); + int accept4(int, sockaddr*, socklen_t*, int flags); +} + +private enum maxEvents = 128; + +class EpollLoop : Loop +{ +@nogc: + /** + * Initializes the loop. + */ + this() + { + super(); + + if ((fd = epoll_create1(EPOLL_CLOEXEC)) < 0) + { + return; + } + epollEvents = makeArray!epoll_event(defaultAllocator, maxEvents).ptr; + } + + /** + * Frees loop internals. + */ + ~this() + { + finalize(defaultAllocator, epollEvents); + } + + /** + * Should be called if the backend configuration changes. + * + * Params: + * socket = Socket. + * oldEvents = The events were already set. + * events = The events should be set. + * + * Returns: $(D_KEYWORD true) if the operation was successful. + */ + protected override bool modify(int socket, EventMask oldEvents, EventMask events) + { + int op = EPOLL_CTL_DEL; + epoll_event ev; + + if (events == oldEvents) + { + return true; + } + if (events && oldEvents) + { + op = EPOLL_CTL_MOD; + } + else if (events && !oldEvents) + { + op = EPOLL_CTL_ADD; + } + + ev.data.fd = socket; + ev.events = (events & (Event.read | Event.accept) ? EPOLLIN | EPOLLPRI : 0) + | (events & Event.write ? EPOLLOUT : 0) + | EPOLLET; + + return epoll_ctl(fd, op, socket, &ev) == 0; + } + + /** + * Accept incoming connections. + * + * Params: + * protocolFactory = Protocol factory. + * socket = Socket. + */ + protected override void acceptConnection(Protocol delegate() @nogc protocolFactory, + int socket) + { + sockaddr_in client_addr; + socklen_t client_len = client_addr.sizeof; + int client = accept4(socket, + cast(sockaddr *)&client_addr, + &client_len, + O_NONBLOCK); + while (client >= 0) + { + auto transport = make!SocketTransport(defaultAllocator, this, client); + IOWatcher connection; + + if (connections.length > client) + { + connection = cast(IOWatcher) connections[client]; + // If it is a ConnectionWatcher + if (connection is null && connections[client] !is null) + { + finalize(defaultAllocator, connections[client]); + connections[client] = null; + } + } + if (connection !is null) + { + connection(protocolFactory, transport); + } + else + { + connections[client] = make!IOWatcher(defaultAllocator, + protocolFactory, + transport); + } + + modify(client, EventMask(Event.none), EventMask(Event.read, Event.write)); + + swapPendings.insertBack(connections[client]); + + client = accept4(socket, + cast(sockaddr *)&client_addr, + &client_len, + O_NONBLOCK); + } + } + + /** + * Does the actual polling. + */ + protected override void poll() + { + // Don't block + immutable timeout = cast(immutable int) blockTime.total!"msecs"; + auto eventCount = epoll_wait(fd, epollEvents, maxEvents, timeout); + + if (eventCount < 0) + { + if (errno != EINTR) + { + throw make!BadLoopException(defaultAllocator); + } + + return; + } + + for (auto i = 0; i < eventCount; ++i) + { + epoll_event *ev = epollEvents + i; + auto connection = cast(IOWatcher) connections[ev.data.fd]; + + if (connection is null) + { + swapPendings.insertBack(connections[ev.data.fd]); +// acceptConnection(connections[ev.data.fd].protocol, +// connections[ev.data.fd].socket); + } + else + { + auto transport = cast(SocketTransport) connection.transport; + assert(transport !is null); + + if (ev.events & (EPOLLIN | EPOLLPRI | EPOLLERR | EPOLLHUP)) + { + try + { + while (!transport.receive()) + { + } + swapPendings.insertBack(connection); + } + catch (TransportException e) + { + swapPendings.insertBack(connection); + finalize(defaultAllocator, e); + } + } + else if (ev.events & (EPOLLOUT | EPOLLERR | EPOLLHUP)) + { + transport.writeReady = true; + } + } + } + } + + /** + * Returns: The blocking time. + */ + override protected @property inout(Duration) blockTime() + inout @safe pure nothrow + { + return min(super.blockTime, 1.dur!"seconds"); + } + + private int fd; + private epoll_event* epollEvents; +} diff --git a/source/tanya/event/internal/selector.d b/source/tanya/event/internal/selector.d new file mode 100644 index 0000000..9682a5d --- /dev/null +++ b/source/tanya/event/internal/selector.d @@ -0,0 +1,217 @@ +/* This Source Code Form is subject to the terms of the Mozilla Public + * License, v. 2.0. If a copy of the MPL was not distributed with this + * file, You can obtain one at http://mozilla.org/MPL/2.0/. */ + +/** + * Copyright: Eugene Wissner 2016. + * License: $(LINK2 https://www.mozilla.org/en-US/MPL/2.0/, + * Mozilla Public License, v. 2.0). + * Authors: $(LINK2 mailto:belka@caraus.de, Eugene Wissner) + */ +module tanya.event.internal.selector; + +import tanya.memory; +import tanya.container.buffer; +import tanya.event.loop; +import tanya.event.protocol; +import tanya.event.transport; +import core.stdc.errno; +import core.sys.posix.netinet.in_; +import core.sys.posix.unistd; + +/** + * Transport for stream sockets. + */ +class SocketTransport : DuplexTransport +{ +@nogc: + private int socket_ = -1; + + private Protocol protocol_; + + /// Input buffer. + private WriteBuffer input_; + + /// Output buffer. + private ReadBuffer output_; + + private Loop loop; + + private bool disconnected_; + + package bool writeReady; + + /** + * Params: + * loop = Event loop. + * socket = Socket. + * protocol = Protocol. + */ + this(Loop loop, int socket, Protocol protocol = null) + { + socket_ = socket; + protocol_ = protocol; + this.loop = loop; + input_ = make!WriteBuffer(defaultAllocator); + output_ = make!ReadBuffer(defaultAllocator); + } + + /** + * Close the transport and deallocate the data buffers. + */ + ~this() + { + close(socket); + finalize(defaultAllocator, input_); + finalize(defaultAllocator, output_); + finalize(defaultAllocator, protocol_); + } + + /** + * Returns: Transport socket. + */ + int socket() const @safe pure nothrow + { + return socket_; + } + + /** + * Returns: Protocol. + */ + @property Protocol protocol() @safe pure nothrow + { + return protocol_; + } + + /** + * Returns: $(D_KEYWORD true) if the remote peer closed the connection, + * $(D_KEYWORD false) otherwise. + */ + @property immutable(bool) disconnected() const @safe pure nothrow + { + return disconnected_; + } + + /** + * Params: + * protocol = Application protocol. + */ + @property void protocol(Protocol protocol) @safe pure nothrow + { + protocol_ = protocol; + } + + /** + * Returns: Application protocol. + */ + @property inout(Protocol) protocol() inout @safe pure nothrow + { + return protocol_; + } + + /** + * Write some data to the transport. + * + * Params: + * data = Data to send. + */ + void write(ubyte[] data) + { + // If the buffer wasn't empty the transport should be already there. + if (!input.length && data.length) + { + loop.feed(this); + } + input ~= data; + } + + /** + * Returns: Input buffer. + */ + @property WriteBuffer input() @safe pure nothrow + { + return input_; + } + + /** + * Returns: Output buffer. + */ + @property ReadBuffer output() @safe pure nothrow + { + return output_; + } + + /** + * Read data from the socket. Returns $(D_KEYWORD true) if the reading + * is completed. In the case that the peer closed the connection, returns + * $(D_KEYWORD true) aswell. + * + * Returns: Whether the reading is completed. + * + * Throws: $(D_PSYMBOL TransportException) if a read error is occured. + */ + bool receive() + { + auto readCount = recv(socket, output.buffer, output.free, 0); + + if (readCount > 0) + { + output_ ~= output.buffer[0..readCount]; + return false; + } + else if (readCount == 0) + { + disconnected_ = true; + return true; + } + else if (errno == EAGAIN || errno == EWOULDBLOCK) + { + return true; + } + else + { + disconnected_ = true; + throw make!TransportException(defaultAllocator, + "Read from the socket failed."); + } + } + + /** + * Returns: Whether the writing is completed. + * + * Throws: $(D_PSYMBOL TransportException) if a read error is occured. + */ + bool send() + { + auto sentCount = core.sys.posix.netinet.in_.send(socket, + input.buffer, + input.length, + 0); + + input.written = sentCount; + if (input.length == 0) + { + return true; + } + else if (sentCount >= 0) + { + loop.feed(this); + + return false; + } + else if (errno == EAGAIN || errno == EWOULDBLOCK) + { + writeReady = false; + loop.feed(this); + + return false; + } + else + { + disconnected_ = true; + loop.feed(this); + throw make!TransportException(defaultAllocator, + "Write to the socket failed."); + } + } +} |
