/* This Source Code Form is subject to the terms of the Mozilla Public * License, v. 2.0. If a copy of the MPL was not distributed with this * file, You can obtain one at http://mozilla.org/MPL/2.0/. */ /** * Event loop implementation for Linux. * * Copyright: Eugene Wissner 2016-2020. * License: $(LINK2 https://www.mozilla.org/en-US/MPL/2.0/, * Mozilla Public License, v. 2.0). * Authors: $(LINK2 mailto:info@caraus.de, Eugene Wissner) * Source: $(LINK2 https://github.com/caraus-ecms/tanya/blob/master/source/tanya/async/event/epoll.d, * tanya/async/event/epoll.d) */ module tanya.async.event.epoll; version (D_Ddoc) { } else version (linux): import core.stdc.errno; public import core.sys.linux.epoll; import core.sys.posix.unistd; import core.time; import tanya.algorithm.comparison; import tanya.async.event.selector; import tanya.async.loop; import tanya.async.protocol; import tanya.async.transport; import tanya.async.watcher; import tanya.container.array; import tanya.memory.allocator; import tanya.network.socket; extern (C) nothrow @nogc { int epoll_create1(int flags); int epoll_ctl (int epfd, int op, int fd, epoll_event *event); int epoll_wait (int epfd, epoll_event *events, int maxevents, int timeout); } final class EpollLoop : SelectorLoop { protected int fd; private Array!epoll_event events; /** * Initializes the loop. */ this() @nogc { if ((fd = epoll_create1(EPOLL_CLOEXEC)) < 0) { throw defaultAllocator.make!BadLoopException("epoll initialization failed"); } super(); events = Array!epoll_event(maxEvents); } /** * Frees loop internals. */ ~this() @nogc { close(fd); } /** * Should be called if the backend configuration changes. * * Params: * watcher = Watcher. * oldEvents = The events were already set. * events = The events should be set. * * Returns: $(D_KEYWORD true) if the operation was successful. */ protected override bool reify(SocketWatcher watcher, EventMask oldEvents, EventMask events) @nogc { int op = EPOLL_CTL_DEL; epoll_event ev; if (events == oldEvents) { return true; } if (events && oldEvents) { op = EPOLL_CTL_MOD; } else if (events && !oldEvents) { op = EPOLL_CTL_ADD; } ev.data.fd = watcher.socket.handle; ev.events = (events & (Event.read | Event.accept) ? EPOLLIN | EPOLLPRI : 0) | (events & Event.write ? EPOLLOUT : 0) | EPOLLET; return epoll_ctl(fd, op, watcher.socket.handle, &ev) == 0; } /** * Does the actual polling. */ protected override void poll() @nogc { // Don't block immutable timeout = cast(immutable int) blockTime.total!"msecs"; auto eventCount = epoll_wait(fd, events.get().ptr, maxEvents, timeout); if (eventCount < 0) { if (errno != EINTR) { throw defaultAllocator.make!BadLoopException(); } return; } for (auto i = 0; i < eventCount; ++i) { auto transport = cast(StreamTransport) connections[events[i].data.fd]; if (transport is null) { auto connection = cast(ConnectionWatcher) connections[events[i].data.fd]; assert(connection !is null); acceptConnections(connection); } else if (events[i].events & EPOLLERR) { kill(transport); continue; } else if (events[i].events & (EPOLLIN | EPOLLPRI | EPOLLHUP)) { SocketException exception; try { ptrdiff_t received; do { received = transport.socket.receive(transport.output[]); transport.output += received; } while (received); } catch (SocketException e) { exception = e; } if (transport.socket.disconnected) { kill(transport, exception); continue; } else if (transport.output.length) { pendings.insertBack(transport); } } if (events[i].events & EPOLLOUT) { transport.writeReady = true; if (transport.input.length) { feed(transport); } } } } /** * Returns: The blocking time. */ override protected @property inout(Duration) blockTime() inout @safe pure nothrow { return min(super.blockTime, 1.dur!"seconds"); } }