diff options
Diffstat (limited to 'source/tanya/event/internal/epoll.d')
| -rw-r--r-- | source/tanya/event/internal/epoll.d | 226 |
1 files changed, 226 insertions, 0 deletions
diff --git a/source/tanya/event/internal/epoll.d b/source/tanya/event/internal/epoll.d new file mode 100644 index 0000000..f4e99d9 --- /dev/null +++ b/source/tanya/event/internal/epoll.d @@ -0,0 +1,226 @@ +/* This Source Code Form is subject to the terms of the Mozilla Public + * License, v. 2.0. If a copy of the MPL was not distributed with this + * file, You can obtain one at http://mozilla.org/MPL/2.0/. */ + +/** + * Copyright: Eugene Wissner 2016. + * License: $(LINK2 https://www.mozilla.org/en-US/MPL/2.0/, + * Mozilla Public License, v. 2.0). + * Authors: $(LINK2 mailto:belka@caraus.de, Eugene Wissner) + */ +module tanya.event.internal.epoll; + +import tanya.event.config; + +static if (UseEpoll): + +public import core.sys.linux.epoll; +import tanya.event.internal.selector; +import tanya.event.protocol; +import tanya.event.transport; +import tanya.event.watcher; +import tanya.event.loop; +import tanya.container.list; +import tanya.memory; +import core.stdc.errno; +import core.sys.posix.fcntl; +import core.sys.posix.netinet.in_; +import core.time; +import std.algorithm.comparison; + +@nogc: + +extern (C) nothrow +{ // TODO: Make a pull request for Phobos to mark this extern functions as @nogc. + int epoll_create1(int __flags); + int epoll_ctl(int __epfd, int __op, int __fd, epoll_event *__event); + int epoll_wait(int __epfd, epoll_event *__events, int __maxevents, int __timeout); + int accept4(int, sockaddr*, socklen_t*, int flags); +} + +private enum maxEvents = 128; + +class EpollLoop : Loop +{ +@nogc: + /** + * Initializes the loop. + */ + this() + { + super(); + + if ((fd = epoll_create1(EPOLL_CLOEXEC)) < 0) + { + return; + } + epollEvents = makeArray!epoll_event(defaultAllocator, maxEvents).ptr; + } + + /** + * Frees loop internals. + */ + ~this() + { + finalize(defaultAllocator, epollEvents); + } + + /** + * Should be called if the backend configuration changes. + * + * Params: + * socket = Socket. + * oldEvents = The events were already set. + * events = The events should be set. + * + * Returns: $(D_KEYWORD true) if the operation was successful. + */ + protected override bool modify(int socket, EventMask oldEvents, EventMask events) + { + int op = EPOLL_CTL_DEL; + epoll_event ev; + + if (events == oldEvents) + { + return true; + } + if (events && oldEvents) + { + op = EPOLL_CTL_MOD; + } + else if (events && !oldEvents) + { + op = EPOLL_CTL_ADD; + } + + ev.data.fd = socket; + ev.events = (events & (Event.read | Event.accept) ? EPOLLIN | EPOLLPRI : 0) + | (events & Event.write ? EPOLLOUT : 0) + | EPOLLET; + + return epoll_ctl(fd, op, socket, &ev) == 0; + } + + /** + * Accept incoming connections. + * + * Params: + * protocolFactory = Protocol factory. + * socket = Socket. + */ + protected override void acceptConnection(Protocol delegate() @nogc protocolFactory, + int socket) + { + sockaddr_in client_addr; + socklen_t client_len = client_addr.sizeof; + int client = accept4(socket, + cast(sockaddr *)&client_addr, + &client_len, + O_NONBLOCK); + while (client >= 0) + { + auto transport = make!SocketTransport(defaultAllocator, this, client); + IOWatcher connection; + + if (connections.length > client) + { + connection = cast(IOWatcher) connections[client]; + // If it is a ConnectionWatcher + if (connection is null && connections[client] !is null) + { + finalize(defaultAllocator, connections[client]); + connections[client] = null; + } + } + if (connection !is null) + { + connection(protocolFactory, transport); + } + else + { + connections[client] = make!IOWatcher(defaultAllocator, + protocolFactory, + transport); + } + + modify(client, EventMask(Event.none), EventMask(Event.read, Event.write)); + + swapPendings.insertBack(connections[client]); + + client = accept4(socket, + cast(sockaddr *)&client_addr, + &client_len, + O_NONBLOCK); + } + } + + /** + * Does the actual polling. + */ + protected override void poll() + { + // Don't block + immutable timeout = cast(immutable int) blockTime.total!"msecs"; + auto eventCount = epoll_wait(fd, epollEvents, maxEvents, timeout); + + if (eventCount < 0) + { + if (errno != EINTR) + { + throw make!BadLoopException(defaultAllocator); + } + + return; + } + + for (auto i = 0; i < eventCount; ++i) + { + epoll_event *ev = epollEvents + i; + auto connection = cast(IOWatcher) connections[ev.data.fd]; + + if (connection is null) + { + swapPendings.insertBack(connections[ev.data.fd]); +// acceptConnection(connections[ev.data.fd].protocol, +// connections[ev.data.fd].socket); + } + else + { + auto transport = cast(SocketTransport) connection.transport; + assert(transport !is null); + + if (ev.events & (EPOLLIN | EPOLLPRI | EPOLLERR | EPOLLHUP)) + { + try + { + while (!transport.receive()) + { + } + swapPendings.insertBack(connection); + } + catch (TransportException e) + { + swapPendings.insertBack(connection); + finalize(defaultAllocator, e); + } + } + else if (ev.events & (EPOLLOUT | EPOLLERR | EPOLLHUP)) + { + transport.writeReady = true; + } + } + } + } + + /** + * Returns: The blocking time. + */ + override protected @property inout(Duration) blockTime() + inout @safe pure nothrow + { + return min(super.blockTime, 1.dur!"seconds"); + } + + private int fd; + private epoll_event* epollEvents; +} |
