From db607f7602ae2991a6efeb943f297ebdf4603900 Mon Sep 17 00:00:00 2001 From: Eugen Wissner Date: Mon, 30 May 2022 05:50:55 +0200 Subject: [PATCH] Remove unmaintained and unfinished async code --- README.md | 3 +- source/tanya/async/event/epoll.d | 187 ---- source/tanya/async/event/iocp.d | 390 --------- source/tanya/async/event/kqueue.d | 331 ------- source/tanya/async/event/selector.d | 407 --------- source/tanya/async/iocp.d | 56 -- source/tanya/async/loop.d | 352 -------- source/tanya/async/package.d | 20 - source/tanya/async/protocol.d | 58 -- source/tanya/async/transport.d | 104 --- source/tanya/async/watcher.d | 117 --- source/tanya/net/package.d | 3 +- source/tanya/net/socket.d | 1258 --------------------------- tests/tanya/async/tests/loop.d | 97 --- 14 files changed, 2 insertions(+), 3381 deletions(-) delete mode 100644 source/tanya/async/event/epoll.d delete mode 100644 source/tanya/async/event/iocp.d delete mode 100644 source/tanya/async/event/kqueue.d delete mode 100644 source/tanya/async/event/selector.d delete mode 100644 source/tanya/async/iocp.d delete mode 100644 source/tanya/async/loop.d delete mode 100644 source/tanya/async/package.d delete mode 100644 source/tanya/async/protocol.d delete mode 100644 source/tanya/async/transport.d delete mode 100644 source/tanya/async/watcher.d delete mode 100644 source/tanya/net/socket.d delete mode 100644 tests/tanya/async/tests/loop.d diff --git a/README.md b/README.md index 95f535d..053fce9 100644 --- a/README.md +++ b/README.md @@ -20,7 +20,6 @@ programming in D. Tanya consists of the following packages and (top-level) modules: * `algorithm`: Collection of generic algorithms. -* `async`: Event loop (epoll, kqueue and IOCP). * `bitmanip`: Bit manipulation. * `container`: Queue, Array, Singly and doubly linked lists, Buffers, UTF-8 string, Set, Hash table. @@ -164,7 +163,7 @@ parameter is used) | DMD | GCC | |:-------:|:---------:| -| 2.096.0 | 10.3 | +| 2.098.1 | 11.2 | ## Further characteristics diff --git a/source/tanya/async/event/epoll.d b/source/tanya/async/event/epoll.d deleted file mode 100644 index c81d6ff..0000000 --- a/source/tanya/async/event/epoll.d +++ /dev/null @@ -1,187 +0,0 @@ -/* This Source Code Form is subject to the terms of the Mozilla Public - * License, v. 2.0. If a copy of the MPL was not distributed with this - * file, You can obtain one at http://mozilla.org/MPL/2.0/. */ - -/** - * Event loop implementation for Linux. - * - * Copyright: Eugene Wissner 2016-2020. - * License: $(LINK2 https://www.mozilla.org/en-US/MPL/2.0/, - * Mozilla Public License, v. 2.0). - * Authors: $(LINK2 mailto:info@caraus.de, Eugene Wissner) - * Source: $(LINK2 https://github.com/caraus-ecms/tanya/blob/master/source/tanya/async/event/epoll.d, - * tanya/async/event/epoll.d) - */ -module tanya.async.event.epoll; - -version (D_Ddoc) -{ -} -else version (linux): - -import core.stdc.errno; -public import core.sys.linux.epoll; -import core.sys.posix.unistd; -import core.time; -import std.algorithm.comparison; -import tanya.async.event.selector; -import tanya.async.loop; -import tanya.async.protocol; -import tanya.async.transport; -import tanya.async.watcher; -import tanya.container.array; -import tanya.memory.allocator; -import tanya.net.socket; - -extern (C) nothrow @nogc -{ - int epoll_create1(int flags); - int epoll_ctl (int epfd, int op, int fd, epoll_event *event); - int epoll_wait (int epfd, epoll_event *events, int maxevents, int timeout); -} - -final class EpollLoop : SelectorLoop -{ - protected int fd; - private Array!epoll_event events; - - /** - * Initializes the loop. - */ - this() @nogc - { - if ((fd = epoll_create1(EPOLL_CLOEXEC)) < 0) - { - throw defaultAllocator.make!BadLoopException("epoll initialization failed"); - } - super(); - events = Array!epoll_event(maxEvents); - } - - /** - * Frees loop internals. - */ - ~this() @nogc - { - close(fd); - } - - /** - * Should be called if the backend configuration changes. - * - * Params: - * watcher = Watcher. - * oldEvents = The events were already set. - * events = The events should be set. - * - * Returns: $(D_KEYWORD true) if the operation was successful. - */ - protected override bool reify(SocketWatcher watcher, - EventMask oldEvents, - EventMask events) @nogc - { - int op = EPOLL_CTL_DEL; - epoll_event ev; - - if (events == oldEvents) - { - return true; - } - if (events && oldEvents) - { - op = EPOLL_CTL_MOD; - } - else if (events && !oldEvents) - { - op = EPOLL_CTL_ADD; - } - - ev.data.fd = watcher.socket.handle; - ev.events = (events & (Event.read | Event.accept) ? EPOLLIN | EPOLLPRI : 0) - | (events & Event.write ? EPOLLOUT : 0) - | EPOLLET; - - return epoll_ctl(fd, op, watcher.socket.handle, &ev) == 0; - } - - /** - * Does the actual polling. - */ - protected override void poll() @nogc - { - // Don't block - immutable timeout = cast(immutable int) blockTime.total!"msecs"; - auto eventCount = epoll_wait(fd, events.get().ptr, maxEvents, timeout); - - if (eventCount < 0) - { - if (errno != EINTR) - { - throw defaultAllocator.make!BadLoopException(); - } - return; - } - - for (auto i = 0; i < eventCount; ++i) - { - auto transport = cast(StreamTransport) connections[events[i].data.fd]; - - if (transport is null) - { - auto connection = cast(ConnectionWatcher) connections[events[i].data.fd]; - assert(connection !is null); - - acceptConnections(connection); - } - else if (events[i].events & EPOLLERR) - { - kill(transport); - continue; - } - else if (events[i].events & (EPOLLIN | EPOLLPRI | EPOLLHUP)) - { - SocketException exception; - try - { - ptrdiff_t received; - do - { - received = transport.socket.receive(transport.output[]); - transport.output += received; - } - while (received); - } - catch (SocketException e) - { - exception = e; - } - if (transport.socket.disconnected) - { - kill(transport, exception); - continue; - } - else if (transport.output.length) - { - pendings.insertBack(transport); - } - } - if (events[i].events & EPOLLOUT) - { - transport.writeReady = true; - if (transport.input.length) - { - feed(transport); - } - } - } - } - - /** - * Returns: The blocking time. - */ - override protected @property inout(Duration) blockTime() - inout @safe pure nothrow - { - return min(super.blockTime, 1.dur!"seconds"); - } -} diff --git a/source/tanya/async/event/iocp.d b/source/tanya/async/event/iocp.d deleted file mode 100644 index df8c8d6..0000000 --- a/source/tanya/async/event/iocp.d +++ /dev/null @@ -1,390 +0,0 @@ -/* This Source Code Form is subject to the terms of the Mozilla Public - * License, v. 2.0. If a copy of the MPL was not distributed with this - * file, You can obtain one at http://mozilla.org/MPL/2.0/. */ - -/** - * Event loop implementation for Windows. - * - * Copyright: Eugene Wissner 2016-2020. - * License: $(LINK2 https://www.mozilla.org/en-US/MPL/2.0/, - * Mozilla Public License, v. 2.0). - * Authors: $(LINK2 mailto:info@caraus.de, Eugene Wissner) - * Source: $(LINK2 https://github.com/caraus-ecms/tanya/blob/master/source/tanya/async/event/iocp.d, - * tanya/async/event/iocp.d) - */ -module tanya.async.event.iocp; - -version (D_Ddoc) -{ -} -else version (Windows): - -import core.sys.windows.mswsock; -import core.sys.windows.winsock2; -import tanya.async.loop; -import tanya.async.protocol; -import tanya.async.transport; -import tanya.async.watcher; -import tanya.container.buffer; -import tanya.memory.allocator; -import tanya.network.socket; -import tanya.sys.windows.winbase; - -/** - * Transport for stream sockets. - */ -final class StreamTransport : SocketWatcher, DuplexTransport, SocketTransport -{ - private SocketException exception; - - private ReadBuffer!ubyte output; - - private WriteBuffer!ubyte input; - - private Protocol protocol_; - - private bool closing; - - /** - * Creates new completion port transport. - * - * Params: - * socket = Socket. - * - * Precondition: $(D_INLINECODE socket !is null) - */ - this(OverlappedConnectedSocket socket) @nogc - { - super(socket); - output = ReadBuffer!ubyte(8192, 1024); - input = WriteBuffer!ubyte(8192); - active = true; - } - - /** - * Returns: Socket. - * - * Postcondition: $(D_INLINECODE socket !is null) - */ - override @property OverlappedConnectedSocket socket() pure nothrow @safe @nogc - out (socket) - { - assert(socket !is null); - } - do - { - return cast(OverlappedConnectedSocket) socket_; - } - - /** - * Returns $(D_PARAM true) if the transport is closing or closed. - */ - bool isClosing() const pure nothrow @safe @nogc - { - return closing; - } - - /** - * Close the transport. - * - * Buffered data will be flushed. No more data will be received. - */ - void close() pure nothrow @safe @nogc - { - closing = true; - } - - /** - * Write some data to the transport. - * - * Params: - * data = Data to send. - */ - void write(ubyte[] data) @nogc - { - input ~= data; - } - - /** - * Returns: Application protocol. - */ - @property Protocol protocol() pure nothrow @safe @nogc - { - return protocol_; - } - - /** - * Switches the protocol. - * - * The protocol is deallocated by the event loop. - * - * Params: - * protocol = Application protocol. - * - * Precondition: $(D_INLINECODE protocol !is null) - */ - @property void protocol(Protocol protocol) pure nothrow @safe @nogc - in - { - assert(protocol !is null); - } - do - { - protocol_ = protocol; - } - - /** - * Invokes the watcher callback. - */ - override void invoke() @nogc - { - if (output.length) - { - immutable empty = input.length == 0; - protocol.received(output[0 .. $]); - output.clear(); - if (empty) - { - SocketState overlapped; - try - { - overlapped = defaultAllocator.make!SocketState; - socket.beginSend(input[], overlapped); - } - catch (SocketException e) - { - defaultAllocator.dispose(overlapped); - defaultAllocator.dispose(e); - } - } - } - else - { - protocol.disconnected(exception); - defaultAllocator.dispose(protocol_); - defaultAllocator.dispose(exception); - active = false; - } - } -} - -final class IOCPLoop : Loop -{ - protected HANDLE completionPort; - - protected OVERLAPPED overlap; - - /** - * Initializes the loop. - */ - this() @nogc - { - super(); - - completionPort = CreateIoCompletionPort(INVALID_HANDLE_VALUE, null, 0, 0); - if (!completionPort) - { - throw make!BadLoopException(defaultAllocator, - "Creating completion port failed"); - } - } - - /** - * Should be called if the backend configuration changes. - * - * Params: - * watcher = Watcher. - * oldEvents = The events were already set. - * events = The events should be set. - * - * Returns: $(D_KEYWORD true) if the operation was successful. - */ - override protected bool reify(SocketWatcher watcher, - EventMask oldEvents, - EventMask events) @nogc - { - SocketState overlapped; - if (!(oldEvents & Event.accept) && (events & Event.accept)) - { - auto socket = cast(OverlappedStreamSocket) watcher.socket; - assert(socket !is null); - - if (CreateIoCompletionPort(cast(HANDLE) socket.handle, - completionPort, - cast(size_t) (cast(void*) watcher), - 0) !is completionPort) - { - return false; - } - - try - { - overlapped = defaultAllocator.make!SocketState; - socket.beginAccept(overlapped); - } - catch (SocketException e) - { - defaultAllocator.dispose(overlapped); - defaultAllocator.dispose(e); - return false; - } - } - if ((!(oldEvents & Event.read) && (events & Event.read)) - || (!(oldEvents & Event.write) && (events & Event.write))) - { - auto transport = cast(StreamTransport) watcher; - assert(transport !is null); - - if (CreateIoCompletionPort(cast(HANDLE) transport.socket.handle, - completionPort, - cast(size_t) (cast(void*) watcher), - 0) !is completionPort) - { - return false; - } - - // Begin to read - if (!(oldEvents & Event.read) && (events & Event.read)) - { - try - { - overlapped = defaultAllocator.make!SocketState; - transport.socket.beginReceive(transport.output[], overlapped); - } - catch (SocketException e) - { - defaultAllocator.dispose(overlapped); - defaultAllocator.dispose(e); - return false; - } - } - } - return true; - } - - private void kill(StreamTransport transport, - SocketException exception = null) @nogc - in - { - assert(transport !is null); - } - do - { - transport.socket.shutdown(); - defaultAllocator.dispose(transport.socket); - transport.exception = exception; - pendings.insertBack(transport); - } - - /** - * Does the actual polling. - */ - override protected void poll() @nogc - { - DWORD lpNumberOfBytes; - size_t key; - OVERLAPPED* overlap; - immutable timeout = cast(immutable int) blockTime.total!"msecs"; - - auto result = GetQueuedCompletionStatus(completionPort, - &lpNumberOfBytes, - &key, - &overlap, - timeout); - if (result == FALSE && overlap is null) - { - return; // Timeout - } - - enum size_t offset = size_t.sizeof * 2; - auto overlapped = cast(SocketState) ((cast(void*) overlap) - offset); - assert(overlapped !is null); - scope (failure) - { - defaultAllocator.dispose(overlapped); - } - - switch (overlapped.event) - { - case OverlappedSocketEvent.accept: - auto connection = cast(ConnectionWatcher) (cast(void*) key); - assert(connection !is null); - - auto listener = cast(OverlappedStreamSocket) connection.socket; - assert(listener !is null); - - auto socket = listener.endAccept(overlapped); - auto transport = defaultAllocator.make!StreamTransport(socket); - - connection.incoming.insertBack(transport); - - reify(transport, - EventMask(Event.none), - EventMask(Event.read | Event.write)); - - pendings.insertBack(connection); - listener.beginAccept(overlapped); - break; - case OverlappedSocketEvent.read: - auto transport = cast(StreamTransport) (cast(void*) key); - assert(transport !is null); - - if (!transport.active) - { - defaultAllocator.dispose(transport); - defaultAllocator.dispose(overlapped); - return; - } - - int received; - SocketException exception; - try - { - received = transport.socket.endReceive(overlapped); - } - catch (SocketException e) - { - exception = e; - } - if (transport.socket.disconnected) - { - // We want to get one last notification to destroy the watcher. - transport.socket.beginReceive(transport.output[], overlapped); - kill(transport, exception); - } - else if (received > 0) - { - immutable full = transport.output.free == received; - - transport.output += received; - // Receive was interrupted because the buffer is full. We have to continue. - if (full) - { - transport.socket.beginReceive(transport.output[], overlapped); - } - pendings.insertBack(transport); - } - break; - case OverlappedSocketEvent.write: - auto transport = cast(StreamTransport) (cast(void*) key); - assert(transport !is null); - - transport.input += transport.socket.endSend(overlapped); - if (transport.input.length > 0) - { - transport.socket.beginSend(transport.input[], overlapped); - } - else - { - transport.socket.beginReceive(transport.output[], overlapped); - if (transport.isClosing()) - { - kill(transport); - } - } - break; - default: - assert(false, "Unknown event"); - } - } -} diff --git a/source/tanya/async/event/kqueue.d b/source/tanya/async/event/kqueue.d deleted file mode 100644 index b3c1826..0000000 --- a/source/tanya/async/event/kqueue.d +++ /dev/null @@ -1,331 +0,0 @@ -/* This Source Code Form is subject to the terms of the Mozilla Public - * License, v. 2.0. If a copy of the MPL was not distributed with this - * file, You can obtain one at http://mozilla.org/MPL/2.0/. */ - -/* - * Event loop implementation for *BSD. - * - * Copyright: Eugene Wissner 2016-2020. - * License: $(LINK2 https://www.mozilla.org/en-US/MPL/2.0/, - * Mozilla Public License, v. 2.0). - * Authors: $(LINK2 mailto:info@caraus.de, Eugene Wissner) - * Source: $(LINK2 https://github.com/caraus-ecms/tanya/blob/master/source/tanya/async/event/kqueue.d, - * tanya/async/event/kqueue.d) - */ -module tanya.async.event.kqueue; - -version (D_Ddoc) -{ -} -else version (OSX) -{ - version = MacBSD; -} -else version (iOS) -{ - version = MacBSD; -} -else version (TVOS) -{ - version = MacBSD; -} -else version (WatchOS) -{ - version = MacBSD; -} -else version (FreeBSD) -{ - version = MacBSD; -} -else version (OpenBSD) -{ - version = MacBSD; -} -else version (DragonFlyBSD) -{ - version = MacBSD; -} - -version (MacBSD): - -import core.stdc.errno; -import core.sys.posix.time; // timespec -import core.sys.posix.unistd; -import core.time; -import tanya.algorithm.comparison; -import tanya.async.event.selector; -import tanya.async.loop; -import tanya.async.transport; -import tanya.async.watcher; -import tanya.container.array; -import tanya.memory.allocator; -import tanya.network.socket; - -void EV_SET(kevent_t* kevp, typeof(kevent_t.tupleof) args) pure nothrow @nogc -{ - *kevp = kevent_t(args); -} - -enum : short -{ - EVFILT_READ = -1, - EVFILT_WRITE = -2, - EVFILT_AIO = -3, /* attached to aio requests */ - EVFILT_VNODE = -4, /* attached to vnodes */ - EVFILT_PROC = -5, /* attached to struct proc */ - EVFILT_SIGNAL = -6, /* attached to struct proc */ - EVFILT_TIMER = -7, /* timers */ - EVFILT_MACHPORT = -8, /* Mach portsets */ - EVFILT_FS = -9, /* filesystem events */ - EVFILT_USER = -10, /* User events */ - EVFILT_VM = -12, /* virtual memory events */ - EVFILT_SYSCOUNT = 11 -} - -struct kevent_t -{ - uintptr_t ident; // Identifier for this event - short filter; // Filter for event - ushort flags; - uint fflags; - intptr_t data; - void* udata; // Opaque user data identifier -} - -enum -{ - /* actions */ - EV_ADD = 0x0001, /* add event to kq (implies enable) */ - EV_DELETE = 0x0002, /* delete event from kq */ - EV_ENABLE = 0x0004, /* enable event */ - EV_DISABLE = 0x0008, /* disable event (not reported) */ - - /* flags */ - EV_ONESHOT = 0x0010, /* only report one occurrence */ - EV_CLEAR = 0x0020, /* clear event state after reporting */ - EV_RECEIPT = 0x0040, /* force EV_ERROR on success, data=0 */ - EV_DISPATCH = 0x0080, /* disable event after reporting */ - - EV_SYSFLAGS = 0xF000, /* reserved by system */ - EV_FLAG1 = 0x2000, /* filter-specific flag */ - - /* returned values */ - EV_EOF = 0x8000, /* EOF detected */ - EV_ERROR = 0x4000, /* error, data contains errno */ -} - -extern(C) int kqueue() nothrow @nogc; -extern(C) int kevent(int kq, const kevent_t *changelist, int nchanges, - kevent_t *eventlist, int nevents, const timespec *timeout) - nothrow @nogc; - -final class KqueueLoop : SelectorLoop -{ - protected int fd; - private Array!kevent_t events; - private Array!kevent_t changes; - private size_t changeCount; - - /** - * Returns: Maximal event count can be got at a time - * (should be supported by the backend). - */ - override protected @property uint maxEvents() - const pure nothrow @safe @nogc - { - return cast(uint) events.length; - } - - this() @nogc - { - super(); - - if ((fd = kqueue()) == -1) - { - throw make!BadLoopException(defaultAllocator, - "kqueue initialization failed"); - } - events = Array!kevent_t(64); - changes = Array!kevent_t(64); - } - - /** - * Frees loop internals. - */ - ~this() @nogc - { - close(fd); - } - - private void set(SocketType socket, short filter, ushort flags) @nogc - { - if (changes.length <= changeCount) - { - changes.length = changeCount + maxEvents; - } - EV_SET(&changes[changeCount], - cast(ulong) socket, - filter, - flags, - 0U, - 0, - null); - ++changeCount; - } - - /** - * Should be called if the backend configuration changes. - * - * Params: - * watcher = Watcher. - * oldEvents = The events were already set. - * events = The events should be set. - * - * Returns: $(D_KEYWORD true) if the operation was successful. - */ - override protected bool reify(SocketWatcher watcher, - EventMask oldEvents, - EventMask events) @nogc - { - if (events != oldEvents) - { - if (oldEvents & Event.read || oldEvents & Event.accept) - { - set(watcher.socket.handle, EVFILT_READ, EV_DELETE); - } - if (oldEvents & Event.write) - { - set(watcher.socket.handle, EVFILT_WRITE, EV_DELETE); - } - } - if (events & (Event.read | events & Event.accept)) - { - set(watcher.socket.handle, EVFILT_READ, EV_ADD | EV_ENABLE); - } - if (events & Event.write) - { - set(watcher.socket.handle, EVFILT_WRITE, EV_ADD | EV_DISPATCH); - } - return true; - } - - /** - * Does the actual polling. - */ - protected override void poll() @nogc - { - timespec ts; - blockTime.split!("seconds", "nsecs")(ts.tv_sec, ts.tv_nsec); - - if (changeCount > maxEvents) - { - events.length = changes.length; - } - - auto eventCount = kevent(fd, - changes.get().ptr, - cast(int) changeCount, - events.get().ptr, - maxEvents, - &ts); - changeCount = 0; - - if (eventCount < 0) - { - if (errno != EINTR) - { - throw defaultAllocator.make!BadLoopException(); - } - return; - } - - for (int i; i < eventCount; ++i) - { - assert(connections.length > events[i].ident); - - auto transport = cast(StreamTransport) connections[events[i].ident]; - // If it is a ConnectionWatcher. Accept connections. - if (transport is null) - { - auto connection = cast(ConnectionWatcher) connections[events[i].ident]; - assert(connection !is null); - - acceptConnections(connection); - } - else if (events[i].flags & EV_ERROR) - { - kill(transport); - } - else if (events[i].filter == EVFILT_READ) - { - SocketException exception; - try - { - ptrdiff_t received; - do - { - received = transport.socket.receive(transport.output[]); - transport.output += received; - } - while (received); - } - catch (SocketException e) - { - exception = e; - } - if (transport.socket.disconnected) - { - kill(transport, exception); - } - else if (transport.output.length) - { - pendings.insertBack(transport); - } - } - else if (events[i].filter == EVFILT_WRITE) - { - transport.writeReady = true; - if (transport.input.length) - { - feed(transport); - } - } - } - } - - /** - * Returns: The blocking time. - */ - override protected @property inout(Duration) blockTime() - inout @nogc @safe pure nothrow - { - return min(super.blockTime, 1.dur!"seconds"); - } - - /** - * If the transport couldn't send the data, the further sending should - * be handled by the event loop. - * - * Params: - * transport = Transport. - * exception = Exception thrown on sending. - * - * Returns: $(D_KEYWORD true) if the operation could be successfully - * completed or scheduled, $(D_KEYWORD false) otherwise (the - * transport will be destroyed then). - */ - protected override bool feed(StreamTransport transport, - SocketException exception = null) @nogc - { - if (!super.feed(transport, exception)) - { - return false; - } - if (!transport.writeReady) - { - set(transport.socket.handle, EVFILT_WRITE, EV_DISPATCH); - return true; - } - return false; - } -} diff --git a/source/tanya/async/event/selector.d b/source/tanya/async/event/selector.d deleted file mode 100644 index f954e8c..0000000 --- a/source/tanya/async/event/selector.d +++ /dev/null @@ -1,407 +0,0 @@ -/* This Source Code Form is subject to the terms of the Mozilla Public - * License, v. 2.0. If a copy of the MPL was not distributed with this - * file, You can obtain one at http://mozilla.org/MPL/2.0/. */ - -/* - * This module contains base implementations for reactor event loops. - * - * Copyright: Eugene Wissner 2016-2020. - * License: $(LINK2 https://www.mozilla.org/en-US/MPL/2.0/, - * Mozilla Public License, v. 2.0). - * Authors: $(LINK2 mailto:info@caraus.de, Eugene Wissner) - * Source: $(LINK2 https://github.com/caraus-ecms/tanya/blob/master/source/tanya/async/event/selector.d, - * tanya/async/event/selector.d) - */ -module tanya.async.event.selector; - -version (D_Ddoc) -{ -} -else version (Posix): - -import tanya.async.loop; -import tanya.async.protocol; -import tanya.async.transport; -import tanya.async.watcher; -import tanya.container.array; -import tanya.container.buffer; -import tanya.memory.allocator; -import tanya.net.socket; - -/** - * Transport for stream sockets. - */ -package class StreamTransport : SocketWatcher, DuplexTransport, SocketTransport -{ - private SelectorLoop loop; - - private SocketException exception; - - package ReadBuffer!ubyte output; - - package WriteBuffer!ubyte input; - - private Protocol protocol_; - - private bool closing; - - /// Received notification that the underlying socket is write-ready. - package bool writeReady; - - /** - * Params: - * loop = Event loop. - * socket = Socket. - * - * Precondition: $(D_INLINECODE loop !is null && socket !is null) - */ - this(SelectorLoop loop, ConnectedSocket socket) @nogc - in - { - assert(loop !is null); - } - do - { - super(socket); - this.loop = loop; - output = ReadBuffer!ubyte(8192, 1024); - input = WriteBuffer!ubyte(8192); - active = true; - } - - /** - * Returns: Socket. - * - * Postcondition: $(D_INLINECODE socket !is null) - */ - override @property ConnectedSocket socket() pure nothrow @safe @nogc - out (socket) - { - assert(socket !is null); - } - do - { - return cast(ConnectedSocket) socket_; - } - - private @property void socket(ConnectedSocket socket) - pure nothrow @safe @nogc - in - { - assert(socket !is null); - } - do - { - socket_ = socket; - } - - /** - * Returns: Application protocol. - */ - @property Protocol protocol() pure nothrow @safe @nogc - { - return protocol_; - } - - /** - * Switches the protocol. - * - * The protocol is deallocated by the event loop. - * - * Params: - * protocol = Application protocol. - * - * Precondition: $(D_INLINECODE protocol !is null) - */ - @property void protocol(Protocol protocol) pure nothrow @safe @nogc - in - { - assert(protocol !is null); - } - do - { - protocol_ = protocol; - } - - /** - * Returns $(D_PARAM true) if the transport is closing or closed. - */ - bool isClosing() const pure nothrow @safe @nogc - { - return closing; - } - - /** - * Close the transport. - * - * Buffered data will be flushed. No more data will be received. - */ - void close() @nogc - { - closing = true; - loop.reify(this, - EventMask(Event.read | Event.write), - EventMask(Event.write)); - } - - /** - * Invokes the watcher callback. - */ - override void invoke() @nogc - { - if (output.length) - { - protocol.received(output[0 .. $]); - output.clear(); - if (isClosing() && input.length == 0) - { - loop.kill(this); - } - } - else - { - protocol.disconnected(exception); - defaultAllocator.dispose(protocol_); - defaultAllocator.dispose(exception); - active = false; - } - } - - /** - * Write some data to the transport. - * - * Params: - * data = Data to send. - */ - void write(ubyte[] data) @nogc - { - if (!data.length) - { - return; - } - // Try to write if the socket is write ready. - if (writeReady) - { - ptrdiff_t sent; - SocketException exception; - try - { - sent = socket.send(data); - if (sent == 0) - { - writeReady = false; - } - } - catch (SocketException e) - { - writeReady = false; - exception = e; - } - if (sent < data.length) - { - input ~= data[sent..$]; - loop.feed(this, exception); - } - } - else - { - input ~= data; - } - } -} - -abstract class SelectorLoop : Loop -{ - /// Pending connections. - protected Array!SocketWatcher connections; - - this() @nogc - { - super(); - this.connections = Array!SocketWatcher(maxEvents); - } - - ~this() @nogc - { - foreach (ref connection; this.connections[]) - { - // We want to free only the transports. ConnectionWatcher are - // created by the user and should be freed by himself. - if (cast(StreamTransport) connection !is null) - { - defaultAllocator.dispose(connection); - } - } - } - - /** - * Should be called if the backend configuration changes. - * - * Params: - * watcher = Watcher. - * oldEvents = The events were already set. - * events = The events should be set. - * - * Returns: $(D_KEYWORD true) if the operation was successful. - */ - override abstract protected bool reify(SocketWatcher watcher, - EventMask oldEvents, - EventMask events) @nogc; - - /** - * Kills the watcher and closes the connection. - * - * Params: - * transport = Transport. - * exception = Occurred exception. - */ - protected void kill(StreamTransport transport, - SocketException exception = null) @nogc - in - { - assert(transport !is null); - } - do - { - transport.socket.shutdown(); - defaultAllocator.dispose(transport.socket); - transport.exception = exception; - pendings.insertBack(transport); - } - - /** - * If the transport couldn't send the data, the further sending should - * be handled by the event loop. - * - * Params: - * transport = Transport. - * exception = Exception thrown on sending. - * - * Returns: $(D_KEYWORD true) if the operation could be successfully - * completed or scheduled, $(D_KEYWORD false) otherwise (the - * transport will be destroyed then). - */ - protected bool feed(StreamTransport transport, - SocketException exception = null) @nogc - in - { - assert(transport !is null); - } - do - { - while (transport.input.length && transport.writeReady) - { - try - { - ptrdiff_t sent = transport.socket.send(transport.input[]); - if (sent == 0) - { - transport.writeReady = false; - } - else - { - transport.input += sent; - } - } - catch (SocketException e) - { - exception = e; - transport.writeReady = false; - } - } - if (exception !is null) - { - kill(transport, exception); - return false; - } - if (transport.input.length == 0 && transport.isClosing()) - { - kill(transport); - } - return true; - } - - /** - * Start watching. - * - * Params: - * watcher = Watcher. - */ - override void start(ConnectionWatcher watcher) @nogc - { - if (watcher.active) - { - return; - } - - if (connections.length <= watcher.socket) - { - connections.length = watcher.socket.handle + maxEvents / 2; - } - connections[watcher.socket.handle] = watcher; - - super.start(watcher); - } - - /** - * Accept incoming connections. - * - * Params: - * connection = Connection watcher ready to accept. - */ - package void acceptConnections(ConnectionWatcher connection) @nogc - in - { - assert(connection !is null); - } - do - { - while (true) - { - ConnectedSocket client; - try - { - client = (cast(StreamSocket) connection.socket).accept(); - } - catch (SocketException e) - { - defaultAllocator.dispose(e); - break; - } - if (client is null) - { - break; - } - - StreamTransport transport; - - if (connections.length > client.handle) - { - transport = cast(StreamTransport) connections[client.handle]; - } - else - { - connections.length = client.handle + maxEvents / 2; - } - if (transport is null) - { - transport = defaultAllocator.make!StreamTransport(this, client); - connections[client.handle] = transport; - } - else - { - transport.socket = client; - } - - reify(transport, - EventMask(Event.none), - EventMask(Event.read | Event.write)); - connection.incoming.insertBack(transport); - } - - if (!connection.incoming.empty) - { - pendings.insertBack(connection); - } - } -} diff --git a/source/tanya/async/iocp.d b/source/tanya/async/iocp.d deleted file mode 100644 index 156e7be..0000000 --- a/source/tanya/async/iocp.d +++ /dev/null @@ -1,56 +0,0 @@ -/* This Source Code Form is subject to the terms of the Mozilla Public - * License, v. 2.0. If a copy of the MPL was not distributed with this - * file, You can obtain one at http://mozilla.org/MPL/2.0/. */ - -/** - * This module provides API for Windows I/O Completion Ports. - * - * Note: Available only on Windows. - * - * Copyright: Eugene Wissner 2016-2020. - * License: $(LINK2 https://www.mozilla.org/en-US/MPL/2.0/, - * Mozilla Public License, v. 2.0). - * Authors: $(LINK2 mailto:info@caraus.de, Eugene Wissner) - * Source: $(LINK2 https://github.com/caraus-ecms/tanya/blob/master/source/tanya/async/iocp.d, - * tanya/async/iocp.d) - */ -module tanya.async.iocp; - -version (Windows) -{ - version = WindowsDoc; -} -else version (D_Ddoc) -{ - version = WindowsDoc; - version (Windows) - { - } - else - { - private struct OVERLAPPED - { - } - private alias HANDLE = void*; - } -} - -version (WindowsDoc): - -import core.sys.windows.winbase; - -/** - * Provides an extendable representation of a Win32 $(D_PSYMBOL OVERLAPPED) - * structure. - */ -class State -{ - /// For internal use by Windows API. - align(1) OVERLAPPED overlapped; - - /// File/socket handle. - HANDLE handle; - - /// For keeping events or event masks. - int event; -} diff --git a/source/tanya/async/loop.d b/source/tanya/async/loop.d deleted file mode 100644 index 1d88575..0000000 --- a/source/tanya/async/loop.d +++ /dev/null @@ -1,352 +0,0 @@ -/* This Source Code Form is subject to the terms of the Mozilla Public - * License, v. 2.0. If a copy of the MPL was not distributed with this - * file, You can obtain one at http://mozilla.org/MPL/2.0/. */ - -/** - * Interface for the event loop implementations and the default event loop - * chooser. - * - * --- - * import tanya.async; - * import tanya.memory; - * import tanya.network.socket; - * - * class EchoProtocol : TransmissionControlProtocol - * { - * private DuplexTransport transport; - * - * void received(in ubyte[] data) @nogc - * { - * ubyte[512] buffer; - * buffer[0 .. data.length] = data; - * transport.write(buffer[]); - * } - * - * void connected(DuplexTransport transport) @nogc - * { - * this.transport = transport; - * } - * - * void disconnected(SocketException e) @nogc - * { - * } - * } - * - * void main() - * { - * auto address = address4("127.0.0.1"); - * auto endpoint = Endpoint(address.get, cast(ushort) 8192); - * - * version (Windows) - * { - * auto sock = defaultAllocator.make!OverlappedStreamSocket(AddressFamily.inet); - * } - * else - * { - * auto sock = defaultAllocator.make!StreamSocket(AddressFamily.inet); - * sock.blocking = false; - * } - * - * sock.bind(endpoint); - * sock.listen(5); - * - * auto io = defaultAllocator.make!ConnectionWatcher(sock); - * io.setProtocol!EchoProtocol; - * - * defaultLoop.start(io); - * defaultLoop.run(); - * - * sock.shutdown(); - * defaultAllocator.dispose(io); - * defaultAllocator.dispose(sock); - * } - * --- - * - * Copyright: Eugene Wissner 2016-2020. - * License: $(LINK2 https://www.mozilla.org/en-US/MPL/2.0/, - * Mozilla Public License, v. 2.0). - * Authors: $(LINK2 mailto:info@caraus.de, Eugene Wissner) - * Source: $(LINK2 https://github.com/caraus-ecms/tanya/blob/master/source/tanya/async/loop.d, - * tanya/async/loop.d) - */ -module tanya.async.loop; - -import core.time; -import tanya.async.transport; -import tanya.async.watcher; -import tanya.bitmanip; -import tanya.container.buffer; -import tanya.container.list; -import tanya.memory.allocator; -import tanya.net.socket; - -version (DisableBackends) -{ -} -else version (D_Ddoc) -{ -} -else version (linux) -{ - import tanya.async.event.epoll; - version = Epoll; -} -else version (Windows) -{ - import tanya.async.event.iocp; - version = IOCP; -} -else version (OSX) -{ - version = Kqueue; -} -else version (iOS) -{ - version = Kqueue; -} -else version (FreeBSD) -{ - version = Kqueue; -} -else version (OpenBSD) -{ - version = Kqueue; -} -else version (DragonFlyBSD) -{ - version = Kqueue; -} - -/** - * Events. - */ -enum Event : uint -{ - none = 0x00, /// No events. - read = 0x01, /// Non-blocking read call. - write = 0x02, /// Non-blocking write call. - accept = 0x04, /// Connection made. - error = 0x80000000, /// Sent when an error occurs. -} - -alias EventMask = BitFlags!Event; - -/** - * Event loop. - */ -abstract class Loop -{ - protected bool done = true; - - /// Pending watchers. - protected DList!Watcher pendings; - - /** - * Returns: Maximal event count can be got at a time - * (should be supported by the backend). - */ - protected @property uint maxEvents() - const pure nothrow @safe @nogc - { - return 128U; - } - - /** - * Initializes the loop. - */ - this() @nogc - { - } - - /** - * Frees loop internals. - */ - ~this() @nogc - { - for (; !this.pendings.empty; this.pendings.removeFront()) - { - defaultAllocator.dispose(this.pendings.front); - } - } - - /** - * Starts the loop. - */ - void run() @nogc - { - this.done = false; - do - { - poll(); - - // Invoke pendings - for (; !this.pendings.empty; this.pendings.removeFront()) - { - this.pendings.front.invoke(); - } - } - while (!this.done); - } - - /** - * Break out of the loop. - */ - void unloop() @safe pure nothrow @nogc - { - this.done = true; - } - - /** - * Start watching. - * - * Params: - * watcher = Watcher. - */ - void start(ConnectionWatcher watcher) @nogc - { - if (watcher.active) - { - return; - } - watcher.active = true; - - reify(watcher, EventMask(Event.none), EventMask(Event.accept)); - } - - /** - * Stop watching. - * - * Params: - * watcher = Watcher. - */ - void stop(ConnectionWatcher watcher) @nogc - { - if (!watcher.active) - { - return; - } - watcher.active = false; - - reify(watcher, EventMask(Event.accept), EventMask(Event.none)); - } - - /** - * Should be called if the backend configuration changes. - * - * Params: - * watcher = Watcher. - * oldEvents = The events were already set. - * events = The events should be set. - * - * Returns: $(D_KEYWORD true) if the operation was successful. - */ - abstract protected bool reify(SocketWatcher watcher, - EventMask oldEvents, - EventMask events) @nogc; - - /** - * Returns: The blocking time. - */ - protected @property inout(Duration) blockTime() - inout @safe pure nothrow @nogc - { - // Don't block if we have to do. - return pendings.empty ? blockTime_ : Duration.zero; - } - - /** - * Sets the blocking time for IO watchers. - * - * Params: - * blockTime = The blocking time. Cannot be larger than - * $(D_PSYMBOL maxBlockTime). - */ - protected @property void blockTime(in Duration blockTime) @safe pure nothrow @nogc - in - { - assert(blockTime <= 1.dur!"hours", "Too long to wait."); - assert(!blockTime.isNegative); - } - do - { - blockTime_ = blockTime; - } - - /** - * Does the actual polling. - */ - abstract protected void poll() @nogc; - - /// Maximal block time. - protected Duration blockTime_ = 1.dur!"minutes"; -} - -/** - * Exception thrown on errors in the event loop. - */ -class BadLoopException : Exception -{ - /** - * Params: - * file = The file where the exception occurred. - * line = The line number where the exception occurred. - * next = The previous exception in the chain of exceptions, if any. - */ - this(string file = __FILE__, size_t line = __LINE__, Throwable next = null) - pure nothrow const @safe @nogc - { - super("Event loop cannot be initialized.", file, line, next); - } -} - -/** - * Returns the event loop used by default. If an event loop wasn't set with - * $(D_PSYMBOL defaultLoop) before, $(D_PSYMBOL defaultLoop) will try to - * choose an event loop supported on the system. - * - * Returns: The default event loop. - */ -@property Loop defaultLoop() @nogc -{ - if (defaultLoop_ !is null) - { - return defaultLoop_; - } - version (Epoll) - { - defaultLoop_ = defaultAllocator.make!EpollLoop; - } - else version (IOCP) - { - defaultLoop_ = defaultAllocator.make!IOCPLoop; - } - else version (Kqueue) - { - import tanya.async.event.kqueue; - defaultLoop_ = defaultAllocator.make!KqueueLoop; - } - return defaultLoop_; -} - -/** - * Sets the default event loop. - * - * This property makes it possible to implement your own backends or event - * loops, for example, if the system is not supported or if you want to - * extend the supported implementation. Just extend $(D_PSYMBOL Loop) and pass - * your implementation to this property. - * - * Params: - * loop = The event loop. - */ -@property void defaultLoop(Loop loop) @nogc -in -{ - assert(loop !is null); -} -do -{ - defaultLoop_ = loop; -} - -private Loop defaultLoop_; diff --git a/source/tanya/async/package.d b/source/tanya/async/package.d deleted file mode 100644 index 18f361e..0000000 --- a/source/tanya/async/package.d +++ /dev/null @@ -1,20 +0,0 @@ -/* This Source Code Form is subject to the terms of the Mozilla Public - * License, v. 2.0. If a copy of the MPL was not distributed with this - * file, You can obtain one at http://mozilla.org/MPL/2.0/. */ - -/** - * This package provides asynchronous capabilities. - * - * Copyright: Eugene Wissner 2016-2020. - * License: $(LINK2 https://www.mozilla.org/en-US/MPL/2.0/, - * Mozilla Public License, v. 2.0). - * Authors: $(LINK2 mailto:info@caraus.de, Eugene Wissner) - * Source: $(LINK2 https://github.com/caraus-ecms/tanya/blob/master/source/tanya/async/package.d, - * tanya/async/package.d) - */ -module tanya.async; - -public import tanya.async.loop; -public import tanya.async.protocol; -public import tanya.async.transport; -public import tanya.async.watcher; diff --git a/source/tanya/async/protocol.d b/source/tanya/async/protocol.d deleted file mode 100644 index c51dae5..0000000 --- a/source/tanya/async/protocol.d +++ /dev/null @@ -1,58 +0,0 @@ -/* This Source Code Form is subject to the terms of the Mozilla Public - * License, v. 2.0. If a copy of the MPL was not distributed with this - * file, You can obtain one at http://mozilla.org/MPL/2.0/. */ - -/** - * This module contains protocol which handle data in asynchronous - * applications. - * - * When an event from the network arrives, a protocol method gets - * called and can respond to the event. - * - * Copyright: Eugene Wissner 2016-2020. - * License: $(LINK2 https://www.mozilla.org/en-US/MPL/2.0/, - * Mozilla Public License, v. 2.0). - * Authors: $(LINK2 mailto:info@caraus.de, Eugene Wissner) - * Source: $(LINK2 https://github.com/caraus-ecms/tanya/blob/master/source/tanya/async/protocol.d, - * tanya/async/protocol.d) - */ -module tanya.async.protocol; - -import tanya.async.transport; -import tanya.net.socket; - -/** - * Common protocol interface. - */ -interface Protocol -{ - /** - * Params: - * data = Read data. - */ - void received(in ubyte[] data) @nogc; - - /** - * Called when a connection is made. - * - * Params: - * transport = Protocol transport. - */ - void connected(DuplexTransport transport) @nogc; - - /** - * Called when a connection is lost. - * - * Params: - * exception = $(D_PSYMBOL Exception) if an error caused - * the disconnect, $(D_KEYWORD null) otherwise. - */ - void disconnected(SocketException exception) @nogc; -} - -/** - * Interface for TCP. - */ -interface TransmissionControlProtocol : Protocol -{ -} diff --git a/source/tanya/async/transport.d b/source/tanya/async/transport.d deleted file mode 100644 index 87051e8..0000000 --- a/source/tanya/async/transport.d +++ /dev/null @@ -1,104 +0,0 @@ -/* This Source Code Form is subject to the terms of the Mozilla Public - * License, v. 2.0. If a copy of the MPL was not distributed with this - * file, You can obtain one at http://mozilla.org/MPL/2.0/. */ - -/** - * This module contains transports which are responsible for data dilvery - * between two parties of an asynchronous communication. - * - * Copyright: Eugene Wissner 2016-2020. - * License: $(LINK2 https://www.mozilla.org/en-US/MPL/2.0/, - * Mozilla Public License, v. 2.0). - * Authors: $(LINK2 mailto:info@caraus.de, Eugene Wissner) - * Source: $(LINK2 https://github.com/caraus-ecms/tanya/blob/master/source/tanya/async/transport.d, - * tanya/async/transport.d) - */ -module tanya.async.transport; - -import tanya.async.protocol; -import tanya.net.socket; - -/** - * Base transport interface. - */ -interface Transport -{ -} - -/** - * Interface for read-only transports. - */ -interface ReadTransport : Transport -{ -} - -/** - * Interface for write-only transports. - */ -interface WriteTransport : Transport -{ - /** - * Write some data to the transport. - * - * Params: - * data = Data to send. - */ - void write(ubyte[] data) @nogc; -} - -/** - * Represents a bidirectional transport. - */ -interface DuplexTransport : ReadTransport, WriteTransport -{ - /** - * Returns: Application protocol. - * - * Postcondition: $(D_INLINECODE protocol !is null) - */ - @property Protocol protocol() pure nothrow @safe @nogc - out (protocol) - { - assert(protocol !is null); - } - - /** - * Switches the protocol. - * - * The protocol is deallocated by the event loop. - * - * Params: - * protocol = Application protocol. - * - * Precondition: $(D_INLINECODE protocol !is null) - */ - @property void protocol(Protocol protocol) pure nothrow @safe @nogc - in - { - assert(protocol !is null); - } - - - /** - * Returns $(D_PARAM true) if the transport is closing or closed. - */ - bool isClosing() const pure nothrow @safe @nogc; - - /** - * Close the transport. - * - * Buffered data will be flushed. No more data will be received. - */ - void close() @nogc; -} - -/** - * Represents a socket transport. - */ -interface SocketTransport : Transport -{ - /** - * Returns: Socket. - */ - @property Socket socket() pure nothrow @safe @nogc; -} diff --git a/source/tanya/async/watcher.d b/source/tanya/async/watcher.d deleted file mode 100644 index 1159779..0000000 --- a/source/tanya/async/watcher.d +++ /dev/null @@ -1,117 +0,0 @@ -/* This Source Code Form is subject to the terms of the Mozilla Public - * License, v. 2.0. If a copy of the MPL was not distributed with this - * file, You can obtain one at http://mozilla.org/MPL/2.0/. */ - -/** - * Watchers register user's interest in some event. - * - * Copyright: Eugene Wissner 2016-2020. - * License: $(LINK2 https://www.mozilla.org/en-US/MPL/2.0/, - * Mozilla Public License, v. 2.0). - * Authors: $(LINK2 mailto:info@caraus.de, Eugene Wissner) - * Source: $(LINK2 https://github.com/caraus-ecms/tanya/blob/master/source/tanya/async/watcher.d, - * tanya/async/watcher.d) - */ -module tanya.async.watcher; - -import tanya.async.loop; -import tanya.async.protocol; -import tanya.async.transport; -import tanya.container.buffer; -import tanya.container.list; -import tanya.memory.allocator; -import tanya.net.socket; - -/** - * A watcher is an opaque structure that you allocate and register to record - * your interest in some event. - */ -abstract class Watcher -{ - /// Whether the watcher is active. - bool active; - - /** - * Invoke some action on event. - */ - void invoke() @nogc; -} - -/** - * Socket watcher. - */ -abstract class SocketWatcher : Watcher -{ - /// Watched socket. - protected Socket socket_; - - /** - * Params: - * socket = Socket. - * - * Precondition: $(D_INLINECODE socket !is null) - */ - this(Socket socket) pure nothrow @safe @nogc - in - { - assert(socket !is null); - } - do - { - socket_ = socket; - } - - /** - * Returns: Socket. - */ - @property Socket socket() pure nothrow @safe @nogc - { - return socket_; - } -} - -/** - * Connection watcher. - */ -class ConnectionWatcher : SocketWatcher -{ - /// Incoming connection queue. - DList!DuplexTransport incoming; - - private Protocol delegate() @nogc protocolFactory; - - /** - * Params: - * socket = Socket. - */ - this(Socket socket) @nogc - { - super(socket); - } - - /** - * Params: - * P = Protocol should be used. - */ - void setProtocol(P : Protocol)() @nogc - { - this.protocolFactory = () @nogc => cast(Protocol) defaultAllocator.make!P; - } - - /** - * Invokes new connection callback. - */ - override void invoke() @nogc - in - { - assert(protocolFactory !is null, "Protocol isn't set."); - } - do - { - for (; !this.incoming.empty; this.incoming.removeFront()) - { - this.incoming.front.protocol = protocolFactory(); - this.incoming.front.protocol.connected(this.incoming.front); - } - } -} diff --git a/source/tanya/net/package.d b/source/tanya/net/package.d index 37ac2cc..9626b9e 100644 --- a/source/tanya/net/package.d +++ b/source/tanya/net/package.d @@ -5,7 +5,7 @@ /** * Network programming. * - * Copyright: Eugene Wissner 2017-2020. + * Copyright: Eugene Wissner 2017-2022. * License: $(LINK2 https://www.mozilla.org/en-US/MPL/2.0/, * Mozilla Public License, v. 2.0). * Authors: $(LINK2 mailto:info@caraus.de, Eugene Wissner) @@ -17,5 +17,4 @@ module tanya.net; public import tanya.net.iface; public import tanya.net.inet; public import tanya.net.ip; -public import tanya.net.socket; public import tanya.net.uri; diff --git a/source/tanya/net/socket.d b/source/tanya/net/socket.d deleted file mode 100644 index 0bdb84a..0000000 --- a/source/tanya/net/socket.d +++ /dev/null @@ -1,1258 +0,0 @@ -/* This Source Code Form is subject to the terms of the Mozilla Public - * License, v. 2.0. If a copy of the MPL was not distributed with this - * file, You can obtain one at http://mozilla.org/MPL/2.0/. */ - -/** - * Low-level socket programming. - * - * Current API supports only server-side TCP communication. - * - * For an example of an asynchronous server refer to the documentation of the - * $(D_PSYMBOL tanya.async.loop) module. - * - * Copyright: Eugene Wissner 2016-2020. - * License: $(LINK2 https://www.mozilla.org/en-US/MPL/2.0/, - * Mozilla Public License, v. 2.0). - * Authors: $(LINK2 mailto:info@caraus.de, Eugene Wissner) - * Source: $(LINK2 https://github.com/caraus-ecms/tanya/blob/master/source/tanya/network/socket.d, - * tanya/network/socket.d) - */ -module tanya.net.socket; - -import core.stdc.errno; -import core.time; -public import std.socket : SocketOption, SocketOptionLevel; -import tanya.bitmanip; -import tanya.memory.allocator; -import tanya.meta.trait; -import tanya.os.error; -import tanya.net.iface; -import tanya.net.ip; - -/// Value returned by socket operations on error. -enum int socketError = -1; - -version (Posix) -{ - import core.stdc.errno; - import core.sys.posix.fcntl; - import core.sys.posix.netdb; - import core.sys.posix.netinet.in_; - import core.sys.posix.sys.socket; - import core.sys.posix.sys.time; - import core.sys.posix.unistd; - - enum SocketType : int - { - init = -1, - } - - private alias LingerField = int; -} -else version (Windows) -{ - import core.sys.windows.winbase; - import core.sys.windows.winerror; - import core.sys.windows.winsock2 : accept, - addrinfo, - bind, - closesocket, - FIONBIO, - freeaddrinfo, - getaddrinfo, - getsockopt, - ioctlsocket, - listen, - MSG_DONTROUTE, - MSG_OOB, - MSG_PEEK, - recv, - SD_BOTH, - SD_RECEIVE, - SD_SEND, - send, - setsockopt, - shutdown, - SO_TYPE, - SOCKADDR, - sockaddr, - sockaddr_in, - sockaddr_in6, - SOCKADDR_STORAGE, - socket, - socklen_t, - SOL_SOCKET, - WSAEWOULDBLOCK, - WSAGetLastError; - import tanya.async.iocp; - import tanya.sys.windows.def; - public import tanya.sys.windows.winbase; - public import tanya.sys.windows.winsock2; - - enum SocketType : size_t - { - init = ~0, - } - - private alias LingerField = ushort; - - enum OverlappedSocketEvent - { - accept = 1, - read = 2, - write = 3, - } - - class SocketState : State - { - private WSABUF buffer; - } - - /** - * Socket returned if a connection has been established. - * - * Note: Available only on Windows. - */ - class OverlappedConnectedSocket : ConnectedSocket - { - /** - * Create a socket. - * - * Params: - * handle = Socket handle. - * af = Address family. - */ - this(SocketType handle, AddressFamily af) @nogc - { - super(handle, af); - } - - /** - * Begins to asynchronously receive data from a connected socket. - * - * Params: - * buffer = Storage location for the received data. - * flags = Flags. - * overlapped = Unique operation identifier. - * - * Returns: $(D_KEYWORD true) if the operation could be finished synchronously. - * $(D_KEYWORD false) otherwise. - * - * Throws: $(D_PSYMBOL SocketException) if unable to receive. - */ - bool beginReceive(ubyte[] buffer, - SocketState overlapped, - Flags flags = Flags(Flag.none)) @nogc @trusted - { - auto receiveFlags = cast(DWORD) flags; - - overlapped.handle = cast(HANDLE) handle_; - overlapped.event = OverlappedSocketEvent.read; - overlapped.buffer.len = cast(ULONG) buffer.length; - overlapped.buffer.buf = cast(char*) buffer.ptr; - - auto result = WSARecv(handle_, - cast(WSABUF*) &overlapped.buffer, - 1u, - null, - &receiveFlags, - &overlapped.overlapped, - null); - - if (result == socketError && !wouldHaveBlocked) - { - throw defaultAllocator.make!SocketException("Unable to receive"); - } - return result == 0; - } - - /** - * Ends a pending asynchronous read. - * - * Params: - * overlapped = Unique operation identifier. - * - * Returns: Number of bytes received. - * - * Throws: $(D_PSYMBOL SocketException) if unable to receive. - * - * Postcondition: $(D_INLINECODE result >= 0). - */ - int endReceive(SocketState overlapped) @nogc @trusted - out (count) - { - assert(count >= 0); - } - do - { - DWORD lpNumber; - BOOL result = GetOverlappedResult(overlapped.handle, - &overlapped.overlapped, - &lpNumber, - FALSE); - if (result == FALSE && !wouldHaveBlocked) - { - disconnected_ = true; - throw defaultAllocator.make!SocketException("Unable to receive"); - } - if (lpNumber == 0) - { - disconnected_ = true; - } - return lpNumber; - } - - /** - * Sends data asynchronously to a connected socket. - * - * Params: - * buffer = Data to be sent. - * flags = Flags. - * overlapped = Unique operation identifier. - * - * Returns: $(D_KEYWORD true) if the operation could be finished synchronously. - * $(D_KEYWORD false) otherwise. - * - * Throws: $(D_PSYMBOL SocketException) if unable to send. - */ - bool beginSend(ubyte[] buffer, - SocketState overlapped, - Flags flags = Flags(Flag.none)) @nogc @trusted - { - overlapped.handle = cast(HANDLE) handle_; - overlapped.event = OverlappedSocketEvent.write; - overlapped.buffer.len = cast(ULONG) buffer.length; - overlapped.buffer.buf = cast(char*) buffer.ptr; - - auto result = WSASend(handle_, - &overlapped.buffer, - 1u, - null, - cast(DWORD) flags, - &overlapped.overlapped, - null); - - if (result == socketError && !wouldHaveBlocked) - { - disconnected_ = true; - throw defaultAllocator.make!SocketException("Unable to send"); - } - return result == 0; - } - - /** - * Ends a pending asynchronous send. - * - * Params: - * overlapped = Unique operation identifier. - * - * Returns: Number of bytes sent. - * - * Throws: $(D_PSYMBOL SocketException) if unable to receive. - * - * Postcondition: $(D_INLINECODE result >= 0). - */ - int endSend(SocketState overlapped) @nogc @trusted - out (count) - { - assert(count >= 0); - } - do - { - DWORD lpNumber; - BOOL result = GetOverlappedResult(overlapped.handle, - &overlapped.overlapped, - &lpNumber, - FALSE); - if (result == FALSE && !wouldHaveBlocked) - { - disconnected_ = true; - throw defaultAllocator.make!SocketException("Unable to receive"); - } - return lpNumber; - } - } - - /** - * Windows stream socket overlapped I/O. - */ - class OverlappedStreamSocket : StreamSocket - { - // Accept extension function pointer. - package LPFN_ACCEPTEX acceptExtension; - - /** - * Create a socket. - * - * Params: - * af = Address family. - * - * Throws: $(D_PSYMBOL SocketException) on errors. - */ - this(AddressFamily af) @nogc @trusted - { - super(af); - scope (failure) - { - this.close(); - } - blocking = false; - - GUID guidAcceptEx = WSAID_ACCEPTEX; - DWORD dwBytes; - - auto result = WSAIoctl(handle_, - SIO_GET_EXTENSION_FUNCTION_POINTER, - &guidAcceptEx, - guidAcceptEx.sizeof, - &acceptExtension, - acceptExtension.sizeof, - &dwBytes, - null, - null); - if (!result == socketError) - { - throw make!SocketException(defaultAllocator, - "Unable to retrieve an accept extension function pointer"); - } - } - - /** - * Begins an asynchronous operation to accept an incoming connection attempt. - * - * Params: - * overlapped = Unique operation identifier. - * - * Returns: $(D_KEYWORD true) if the operation could be finished synchronously. - * $(D_KEYWORD false) otherwise. - * - * Throws: $(D_PSYMBOL SocketException) on accept errors. - */ - bool beginAccept(SocketState overlapped) @nogc @trusted - { - auto socket = cast(SocketType) socket(addressFamily, 1, 0); - if (socket == SocketType.init) - { - throw defaultAllocator.make!SocketException("Unable to create socket"); - } - scope (failure) - { - closesocket(socket); - } - DWORD dwBytes; - overlapped.handle = cast(HANDLE) socket; - overlapped.event = OverlappedSocketEvent.accept; - - const len = (sockaddr_in.sizeof + 16) * 2; - overlapped.buffer.len = len; - overlapped.buffer.buf = cast(char*) defaultAllocator.allocate(len).ptr; - - // We don't want to get any data now, but only start to accept the connections - BOOL result = acceptExtension(handle_, - socket, - overlapped.buffer.buf, - 0u, - sockaddr_in.sizeof + 16, - sockaddr_in.sizeof + 16, - &dwBytes, - &overlapped.overlapped); - if (result == FALSE && !wouldHaveBlocked) - { - throw defaultAllocator.make!SocketException("Unable to accept socket connection"); - } - return result == TRUE; - } - - /** - * Asynchronously accepts an incoming connection attempt and creates a - * new socket to handle remote host communication. - * - * Params: - * overlapped = Unique operation identifier. - * - * Returns: Connected socket. - * - * Throws: $(D_PSYMBOL SocketException) if unable to accept. - */ - OverlappedConnectedSocket endAccept(SocketState overlapped) - @nogc @trusted - { - scope (exit) - { - defaultAllocator.dispose(overlapped.buffer.buf[0 .. overlapped.buffer.len]); - } - auto socket = make!OverlappedConnectedSocket(defaultAllocator, - cast(SocketType) overlapped.handle, - addressFamily); - scope (failure) - { - defaultAllocator.dispose(socket); - } - socket.setOption(SocketOptionLevel.SOCKET, - cast(SocketOption) SO_UPDATE_ACCEPT_CONTEXT, - cast(size_t) handle); - return socket; - } - } -} - -/** - * Socket option that specifies what should happen when the socket that - * promises reliable delivery still has untransmitted messages when - * it is closed. - */ -struct Linger -{ - /// If nonzero, $(D_PSYMBOL close) and $(D_PSYMBOL shutdown) block until - /// the data are transmitted or the timeout period has expired. - LingerField l_onoff; - - /// Time, in seconds to wait before any buffered data to be sent is - /// discarded. - LingerField l_linger; - - /** - * If $(D_PARAM timeout) is `0`, linger is disabled, otherwise enables the - * linger and sets the timeout. - * - * Params: - * timeout = Timeout, in seconds. - */ - this(const ushort timeout) - { - time = timeout; - } - - /// - unittest - { - { - auto linger = Linger(5); - assert(linger.enabled); - assert(linger.time == 5); - } - { - auto linger = Linger(0); - assert(!linger.enabled); - } - { // Default constructor. - Linger linger; - assert(!linger.enabled); - } - } - - /** - * System dependent constructor. - * - * Params: - * l_onoff = $(D_PSYMBOL l_onoff) value. - * l_linger = $(D_PSYMBOL l_linger) value. - */ - this(LingerField l_onoff, LingerField l_linger) - { - this.l_onoff = l_onoff; - this.l_linger = l_linger; - } - - /// - unittest - { - auto linger = Linger(1, 5); - assert(linger.l_onoff == 1); - assert(linger.l_linger == 5); - } - - /** - * Params: - * value = Whether to linger after the socket is closed. - * - * See_Also: $(D_PSYMBOL time). - */ - @property void enabled(const bool value) pure nothrow @safe @nogc - { - this.l_onoff = value; - } - - /** - * Returns: Whether to linger after the socket is closed. - */ - @property bool enabled() const pure nothrow @safe @nogc - { - return this.l_onoff != 0; - } - - /** - * Returns: Timeout period, in seconds, to wait before closing the socket - * if the $(D_PSYMBOL Linger) is $(D_PSYMBOL enabled). - */ - @property ushort time() const pure nothrow @safe @nogc - { - return this.l_linger & ushort.max; - } - - /** - * Sets timeout period, to wait before closing the socket if the - * $(D_PSYMBOL Linger) is $(D_PSYMBOL enabled), ignored otherwise. - * - * Params: - * timeout = Timeout period, in seconds. - */ - @property void time(const ushort timeout) pure nothrow @safe @nogc - { - this.l_onoff = timeout > 0; - this.l_linger = timeout; - } -} - -version (linux) -{ - enum SOCK_NONBLOCK = O_NONBLOCK; - extern(C) int accept4(int, sockaddr*, socklen_t*, int flags) @nogc nothrow; -} -else version (OSX) -{ - version = MacBSD; -} -else version (iOS) -{ - version = MacBSD; -} -else version (FreeBSD) -{ - version = MacBSD; -} -else version (OpenBSD) -{ - version = MacBSD; -} -else version (DragonFlyBSD) -{ - version = MacBSD; -} - -version (MacBSD) -{ - enum ESOCKTNOSUPPORT = 44; // Socket type not suppoted. -} - -private immutable -{ - typeof(&getaddrinfo) getaddrinfoPointer; - typeof(&freeaddrinfo) freeaddrinfoPointer; -} - -shared static this() -{ - version (Windows) - { - auto ws2Lib = GetModuleHandle("ws2_32.dll"); - - getaddrinfoPointer = cast(typeof(getaddrinfoPointer)) - GetProcAddress(ws2Lib, "getaddrinfo"); - freeaddrinfoPointer = cast(typeof(freeaddrinfoPointer)) - GetProcAddress(ws2Lib, "freeaddrinfo"); - } - else version (Posix) - { - getaddrinfoPointer = &getaddrinfo; - freeaddrinfoPointer = &freeaddrinfo; - } -} - -/** - * $(D_PSYMBOL SocketException) should be thrown only if one of the socket functions - * $(D_PSYMBOL socketError) and sets $(D_PSYMBOL errno), because - * $(D_PSYMBOL SocketException) relies on the $(D_PSYMBOL errno) value. - */ -class SocketException : Exception -{ - const ErrorCode.ErrorNo error = ErrorCode.ErrorNo.success; - - /** - * Params: - * msg = The message for the exception. - * file = The file where the exception occurred. - * line = The line number where the exception occurred. - * next = The previous exception in the chain of exceptions, if any. - */ - this(string msg, - string file = __FILE__, - size_t line = __LINE__, - Throwable next = null) @nogc @safe nothrow - { - super(msg, file, line, next); - - foreach (member; EnumMembers!(ErrorCode.ErrorNo)) - { - if (member == lastError) - { - error = member; - return; - } - } - if (lastError == ENOMEM) - { - error = ErrorCode.ErrorNo.noBufferSpace; - } - else if (lastError == EMFILE) - { - error = ErrorCode.ErrorNo.tooManyDescriptors; - } - else version (linux) - { - if (lastError == ENOSR) - { - error = ErrorCode.ErrorNo.networkDown; - } - } - else version (Posix) - { - if (lastError == EPROTO) - { - error = ErrorCode.ErrorNo.networkDown; - } - } - } -} - -/** - * Class for creating a network communication endpoint using the Berkeley - * sockets interfaces of different types. - */ -abstract class Socket -{ - version (Posix) - { - /** - * How a socket is shutdown. - */ - enum Shutdown : int - { - receive = SHUT_RD, /// Socket receives are disallowed - send = SHUT_WR, /// Socket sends are disallowed - both = SHUT_RDWR, /// Both receive and send - } - } - else version (Windows) - { - /// Property to get or set whether the socket is blocking or nonblocking. - private bool blocking_ = true; - - /** - * How a socket is shutdown. - */ - enum Shutdown : int - { - receive = SD_RECEIVE, /// Socket receives are disallowed. - send = SD_SEND, /// Socket sends are disallowed. - both = SD_BOTH, /// Both receive and send. - } - - // The WinSock timeouts seem to be effectively skewed by a constant - // offset of about half a second (in milliseconds). - private enum WINSOCK_TIMEOUT_SKEW = 500; - } - - /// Socket handle. - protected SocketType handle_; - - /// Address family. - protected AddressFamily family; - - private @property void handle(SocketType handle) @nogc - in - { - assert(handle != SocketType.init); - assert(handle_ == SocketType.init, "Socket handle cannot be changed"); - } - do - { - handle_ = handle; - - // Set the option to disable SIGPIPE on send() if the platform - // has it (e.g. on OS X). - static if (is(typeof(SO_NOSIGPIPE))) - { - setOption(SocketOptionLevel.SOCKET, cast(SocketOption)SO_NOSIGPIPE, true); - } - } - - @property inout(SocketType) handle() inout const pure nothrow @safe @nogc - { - return handle_; - } - - /** - * Create a socket. - * - * Params: - * handle = Socket. - * af = Address family. - */ - this(SocketType handle, AddressFamily af) @nogc - in - { - assert(handle != SocketType.init); - } - do - { - scope (failure) - { - this.close(); - } - this.handle = handle; - family = af; - } - - /** - * Closes the socket and calls the destructor on itself. - */ - ~this() nothrow @trusted @nogc - { - this.close(); - } - - /** - * Get a socket option. - * - * Params: - * level = Protocol level at that the option exists. - * option = Option. - * result = Buffer to save the result. - * - * Returns: The number of bytes written to $(D_PARAM result). - * - * Throws: $(D_PSYMBOL SocketException) on error. - */ - protected int getOption(SocketOptionLevel level, - SocketOption option, - void[] result) const @trusted @nogc - { - auto length = cast(socklen_t) result.length; - if (getsockopt(handle_, - cast(int) level, - cast(int) option, - result.ptr, - &length) == socketError) - { - throw defaultAllocator.make!SocketException("Unable to get socket option"); - } - return length; - } - - /// Ditto. - int getOption(SocketOptionLevel level, - SocketOption option, - out size_t result) const @trusted @nogc - { - return getOption(level, option, (&result)[0 .. 1]); - } - - /// Ditto. - int getOption(SocketOptionLevel level, - SocketOption option, - out Linger result) const @trusted @nogc - { - return getOption(level, option, (&result)[0 .. 1]); - } - - /// Ditto. - int getOption(SocketOptionLevel level, - SocketOption option, - out Duration result) const @trusted @nogc - { - // WinSock returns the timeout values as a milliseconds DWORD, - // while Linux and BSD return a timeval struct. - version (Posix) - { - timeval tv; - auto ret = getOption(level, option, (&tv)[0 .. 1]); - result = dur!"seconds"(tv.tv_sec) + dur!"usecs"(tv.tv_usec); - } - else version (Windows) - { - int msecs; - auto ret = getOption(level, option, (&msecs)[0 .. 1]); - if (option == SocketOption.RCVTIMEO) - { - msecs += WINSOCK_TIMEOUT_SKEW; - } - result = dur!"msecs"(msecs); - } - return ret; - } - - /** - * Set a socket option. - * - * Params: - * level = Protocol level at that the option exists. - * option = Option. - * value = Option value. - * - * Throws: $(D_PSYMBOL SocketException) on error. - */ - protected void setOption(SocketOptionLevel level, - SocketOption option, - void[] value) const @trusted @nogc - { - if (setsockopt(handle_, - cast(int)level, - cast(int)option, - value.ptr, - cast(uint) value.length) == socketError) - { - throw defaultAllocator.make!SocketException("Unable to set socket option"); - } - } - - /// Ditto. - void setOption(SocketOptionLevel level, SocketOption option, size_t value) - const @trusted @nogc - { - setOption(level, option, (&value)[0 .. 1]); - } - - /// Ditto. - void setOption(SocketOptionLevel level, SocketOption option, Linger value) - const @trusted @nogc - { - setOption(level, option, (&value)[0 .. 1]); - } - - /// Ditto. - void setOption(SocketOptionLevel level, SocketOption option, Duration value) - const @trusted @nogc - { - version (Posix) - { - timeval tv; - value.split!("seconds", "usecs")(tv.tv_sec, tv.tv_usec); - setOption(level, option, (&tv)[0 .. 1]); - } - else version (Windows) - { - auto msecs = cast(int) value.total!"msecs"; - if (msecs > 0 && option == SocketOption.RCVTIMEO) - { - msecs = max(1, msecs - WINSOCK_TIMEOUT_SKEW); - } - setOption(level, option, msecs); - } - } - - /** - * Returns: Socket's blocking flag. - */ - @property inout(bool) blocking() inout const nothrow @nogc - { - version (Posix) - { - return !(fcntl(handle_, F_GETFL, 0) & O_NONBLOCK); - } - else version (Windows) - { - return this.blocking_; - } - } - - /** - * Params: - * yes = Socket's blocking flag. - */ - @property void blocking(bool yes) @nogc - { - version (Posix) - { - int fl = fcntl(handle_, F_GETFL, 0); - - if (fl != socketError) - { - fl = yes ? fl & ~O_NONBLOCK : fl | O_NONBLOCK; - fl = fcntl(handle_, F_SETFL, fl); - } - if (fl == socketError) - { - throw make!SocketException(defaultAllocator, - "Unable to set socket blocking"); - } - } - else version (Windows) - { - uint num = !yes; - if (ioctlsocket(handle_, FIONBIO, &num) == socketError) - { - throw make!SocketException(defaultAllocator, - "Unable to set socket blocking"); - } - this.blocking_ = yes; - } - } - - /** - * Returns: The socket's address family. - */ - @property AddressFamily addressFamily() const @nogc @safe pure nothrow - { - return family; - } - - /** - * Returns: $(D_KEYWORD true) if this is a valid, alive socket. - */ - @property bool isAlive() @trusted const nothrow @nogc - { - int type; - socklen_t typesize = cast(socklen_t) type.sizeof; - return !getsockopt(handle_, SOL_SOCKET, SO_TYPE, cast(char*)&type, &typesize); - } - - /** - * Disables sends and/or receives. - * - * Params: - * how = What to disable. - * - * See_Also: - * $(D_PSYMBOL Shutdown) - */ - void shutdown(Shutdown how = Shutdown.both) @nogc @trusted const nothrow - { - .shutdown(handle_, cast(int)how); - } - - /** - * Immediately drop any connections and release socket resources. - * Calling $(D_PSYMBOL shutdown) before $(D_PSYMBOL close) is recommended - * for connection-oriented sockets. The $(D_PSYMBOL Socket) object is no - * longer usable after $(D_PSYMBOL close). - */ - void close() nothrow @trusted @nogc - { - version(Windows) - { - .closesocket(handle_); - } - else version(Posix) - { - .close(handle_); - } - handle_ = SocketType.init; - } - - /** - * Listen for an incoming connection. $(D_PSYMBOL bind) must be called before you - * can $(D_PSYMBOL listen). - * - * Params: - * backlog = Request of how many pending incoming connections are - * queued until $(D_PSYMBOL accept)ed. - */ - void listen(int backlog) const @trusted @nogc - { - if (.listen(handle_, backlog) == socketError) - { - throw defaultAllocator.make!SocketException("Unable to listen on socket"); - } - } - - /** - * Compare handles. - * - * Params: - * that = Another handle. - * - * Returns: Comparision result. - */ - int opCmp(size_t that) const pure nothrow @safe @nogc - { - return handle_ < that ? -1 : handle_ > that ? 1 : 0; - } -} - -/** - * Interface with common fileds for stream and connected sockets. - */ -interface ConnectionOrientedSocket -{ - /** - * Flags may be OR'ed together. - */ - enum Flag : int - { - /// No flags specified. - none = 0, - /// Out-of-band stream data. - outOfBand = MSG_OOB, - /// Peek at incoming data without removing it from the queue, only for receiving. - peek = MSG_PEEK, - /// Data should not be subject to routing; this flag may be ignored. Only for sending. - dontRoute = MSG_DONTROUTE, - } - - alias Flags = BitFlags!Flag; -} - -class StreamSocket : Socket, ConnectionOrientedSocket -{ - /** - * Create a socket. - * - * Params: - * af = Address family. - */ - this(AddressFamily af) @trusted @nogc - { - auto handle = cast(SocketType) socket(af, 1, 0); - if (handle == SocketType.init) - { - throw defaultAllocator.make!SocketException("Unable to create socket"); - } - super(handle, af); - } - - /** - * Associate a local address with this socket. - * - * Params: - * endpoint = Local address. - * - * Throws: $(D_PSYMBOL SocketException) if unable to bind. - */ - void bind(const Endpoint endpoint) const @nogc - { - if (.bind(handle_, cast(const(sockaddr)*) &endpoint, Endpoint.sizeof)) - { - throw defaultAllocator.make!SocketException("Unable to bind socket"); - } - } - - /** - * Accept an incoming connection. - * - * The blocking mode is always inherited. - * - * Returns: $(D_PSYMBOL Socket) for the accepted connection or - * $(D_KEYWORD null) if the call would block on a - * non-blocking socket. - * - * Throws: $(D_PSYMBOL SocketException) if unable to accept. - */ - ConnectedSocket accept() @trusted @nogc - { - SocketType sock; - - version (linux) - { - int flags; - if (!blocking) - { - flags |= SOCK_NONBLOCK; - } - sock = cast(SocketType).accept4(handle_, null, null, flags); - } - else - { - sock = cast(SocketType).accept(handle_, null, null); - } - - if (sock == SocketType.init) - { - if (wouldHaveBlocked()) - { - return null; - } - throw make!SocketException(defaultAllocator, - "Unable to accept socket connection"); - } - - auto newSocket = defaultAllocator.make!ConnectedSocket(sock, addressFamily); - - version (linux) - { // Blocking mode already set - } - else version (Posix) - { - if (!blocking) - { - try - { - newSocket.blocking = blocking; - } - catch (SocketException e) - { - defaultAllocator.dispose(newSocket); - throw e; - } - } - } - else version (Windows) - { // Inherits blocking mode - newSocket.blocking_ = blocking; - } - return newSocket; - } -} - -/** - * Socket returned if a connection has been established. - */ -class ConnectedSocket : Socket, ConnectionOrientedSocket -{ - /** - * $(D_KEYWORD true) if the stream socket peer has performed an orderly - * shutdown. - */ - protected bool disconnected_; - - /** - * Returns: $(D_KEYWORD true) if the stream socket peer has performed an orderly - * shutdown. - */ - @property inout(bool) disconnected() inout const pure nothrow @safe @nogc - { - return disconnected_; - } - - /** - * Create a socket. - * - * Params: - * handle = Socket. - * af = Address family. - */ - this(SocketType handle, AddressFamily af) @nogc - { - super(handle, af); - } - - version (Windows) - { - private static int capToMaxBuffer(size_t size) pure nothrow @safe @nogc - { - // Windows uses int instead of size_t for length arguments. - // Luckily, the send/recv functions make no guarantee that - // all the data is sent, so we use that to send at most - // int.max bytes. - return size > size_t (int.max) ? int.max : cast(int) size; - } - } - else - { - private static size_t capToMaxBuffer(size_t size) pure nothrow @safe @nogc - { - return size; - } - } - - /** - * Receive data on the connection. - * - * Params: - * buf = Buffer to save received data. - * flags = Flags. - * - * Returns: The number of bytes received or 0 if nothing received - * because the call would block. - * - * Throws: $(D_PSYMBOL SocketException) if unable to receive. - */ - ptrdiff_t receive(ubyte[] buf, Flags flags = Flag.none) @trusted @nogc - { - ptrdiff_t ret; - if (!buf.length) - { - return 0; - } - - ret = recv(handle_, buf.ptr, capToMaxBuffer(buf.length), cast(int) flags); - if (ret == 0) - { - disconnected_ = true; - } - else if (ret == socketError) - { - if (wouldHaveBlocked()) - { - return 0; - } - disconnected_ = true; - throw defaultAllocator.make!SocketException("Unable to receive"); - } - return ret; - } - - /** - * Send data on the connection. If the socket is blocking and there is no - * buffer space left, $(D_PSYMBOL send) waits, non-blocking socket returns - * 0 in this case. - * - * Params: - * buf = Data to be sent. - * flags = Flags. - * - * Returns: The number of bytes actually sent. - * - * Throws: $(D_PSYMBOL SocketException) if unable to send. - */ - ptrdiff_t send(const(ubyte)[] buf, Flags flags = Flag.none) - const @trusted @nogc - { - int sendFlags = cast(int) flags; - ptrdiff_t sent; - - static if (is(typeof(MSG_NOSIGNAL))) - { - sendFlags |= MSG_NOSIGNAL; - } - - sent = .send(handle_, buf.ptr, capToMaxBuffer(buf.length), sendFlags); - if (sent != socketError) - { - return sent; - } - else if (wouldHaveBlocked()) - { - return 0; - } - throw defaultAllocator.make!SocketException("Unable to send"); - } -} - -/** - * Checks if the last error is a serious error or just a special - * behaviour error of non-blocking sockets (for example an error - * returned because the socket would block or because the - * asynchronous operation was successfully started but not finished yet). - * - * Returns: $(D_KEYWORD false) if a serious error happened, $(D_KEYWORD true) - * otherwise. - */ -bool wouldHaveBlocked() nothrow @trusted @nogc -{ - version (Posix) - { - return errno == EAGAIN || errno == EWOULDBLOCK; - } - else version (Windows) - { - return WSAGetLastError() == ERROR_IO_PENDING - || WSAGetLastError() == WSAEWOULDBLOCK - || WSAGetLastError() == ERROR_IO_INCOMPLETE; - } -} - -/** - * Returns: Platform specific error code. - */ -private @property int lastError() nothrow @safe @nogc -{ - version (Windows) - { - return WSAGetLastError(); - } - else - { - return errno; - } -} diff --git a/tests/tanya/async/tests/loop.d b/tests/tanya/async/tests/loop.d deleted file mode 100644 index afee3bf..0000000 --- a/tests/tanya/async/tests/loop.d +++ /dev/null @@ -1,97 +0,0 @@ -/* This Source Code Form is subject to the terms of the Mozilla Public - * License, v. 2.0. If a copy of the MPL was not distributed with this - * file, You can obtain one at http://mozilla.org/MPL/2.0/. */ -module tanya.async.tests.loop; - -import core.time; -import tanya.async.loop; -import tanya.async.watcher; -import tanya.memory.allocator; - -private final class DummyWatcher : Watcher -{ - bool invoked; - - override void invoke() @nogc - { - this.invoked = true; - } -} - -private final class TestLoop : Loop -{ - override protected bool reify(SocketWatcher watcher, - EventMask oldEvents, - EventMask events) @nogc - { - return true; - } - - override protected void poll() @nogc - { - assert(!this.done); - unloop(); - } - - override protected @property uint maxEvents() - const pure nothrow @safe @nogc - { - return 64U; - } - - @nogc @system unittest - { - auto loop = defaultAllocator.make!TestLoop; - assert(loop.blockTime == 1.dur!"minutes"); - - loop.blockTime = 2.dur!"minutes"; - assert(loop.blockTime == 2.dur!"minutes"); - - defaultAllocator.dispose(loop); - } - - @nogc @system unittest - { - auto loop = defaultAllocator.make!TestLoop; - assert(loop.done); - - loop.run(); - assert(loop.done); - - defaultAllocator.dispose(loop); - } - - @nogc @system unittest - { - auto loop = defaultAllocator.make!TestLoop; - auto watcher = defaultAllocator.make!DummyWatcher; - loop.pendings.insertBack(watcher); - - assert(!watcher.invoked); - loop.run(); - assert(watcher.invoked); - - defaultAllocator.dispose(loop); - defaultAllocator.dispose(watcher); - } -} - -@nogc @system unittest -{ - auto loop = defaultAllocator.make!TestLoop; - assert(loop.maxEvents == 64); - - defaultAllocator.dispose(loop); -} - -@nogc @system unittest -{ - auto oldLoop = defaultLoop; - auto loop = defaultAllocator.make!TestLoop; - - defaultLoop = loop; - assert(defaultLoop is loop); - - defaultLoop = oldLoop; - defaultAllocator.dispose(loop); -}