diff options
Diffstat (limited to 'source')
| -rw-r--r-- | source/tanya/async/event/epoll.d | 187 | ||||
| -rw-r--r-- | source/tanya/async/event/iocp.d | 390 | ||||
| -rw-r--r-- | source/tanya/async/event/kqueue.d | 331 | ||||
| -rw-r--r-- | source/tanya/async/event/selector.d | 407 | ||||
| -rw-r--r-- | source/tanya/async/iocp.d | 56 | ||||
| -rw-r--r-- | source/tanya/async/loop.d | 352 | ||||
| -rw-r--r-- | source/tanya/async/package.d | 20 | ||||
| -rw-r--r-- | source/tanya/async/protocol.d | 58 | ||||
| -rw-r--r-- | source/tanya/async/transport.d | 104 | ||||
| -rw-r--r-- | source/tanya/async/watcher.d | 117 | ||||
| -rw-r--r-- | source/tanya/net/package.d | 3 | ||||
| -rw-r--r-- | source/tanya/net/socket.d | 1258 |
12 files changed, 1 insertions, 3282 deletions
diff --git a/source/tanya/async/event/epoll.d b/source/tanya/async/event/epoll.d deleted file mode 100644 index c81d6ff..0000000 --- a/source/tanya/async/event/epoll.d +++ /dev/null @@ -1,187 +0,0 @@ -/* This Source Code Form is subject to the terms of the Mozilla Public - * License, v. 2.0. If a copy of the MPL was not distributed with this - * file, You can obtain one at http://mozilla.org/MPL/2.0/. */ - -/** - * Event loop implementation for Linux. - * - * Copyright: Eugene Wissner 2016-2020. - * License: $(LINK2 https://www.mozilla.org/en-US/MPL/2.0/, - * Mozilla Public License, v. 2.0). - * Authors: $(LINK2 mailto:info@caraus.de, Eugene Wissner) - * Source: $(LINK2 https://github.com/caraus-ecms/tanya/blob/master/source/tanya/async/event/epoll.d, - * tanya/async/event/epoll.d) - */ -module tanya.async.event.epoll; - -version (D_Ddoc) -{ -} -else version (linux): - -import core.stdc.errno; -public import core.sys.linux.epoll; -import core.sys.posix.unistd; -import core.time; -import std.algorithm.comparison; -import tanya.async.event.selector; -import tanya.async.loop; -import tanya.async.protocol; -import tanya.async.transport; -import tanya.async.watcher; -import tanya.container.array; -import tanya.memory.allocator; -import tanya.net.socket; - -extern (C) nothrow @nogc -{ - int epoll_create1(int flags); - int epoll_ctl (int epfd, int op, int fd, epoll_event *event); - int epoll_wait (int epfd, epoll_event *events, int maxevents, int timeout); -} - -final class EpollLoop : SelectorLoop -{ - protected int fd; - private Array!epoll_event events; - - /** - * Initializes the loop. - */ - this() @nogc - { - if ((fd = epoll_create1(EPOLL_CLOEXEC)) < 0) - { - throw defaultAllocator.make!BadLoopException("epoll initialization failed"); - } - super(); - events = Array!epoll_event(maxEvents); - } - - /** - * Frees loop internals. - */ - ~this() @nogc - { - close(fd); - } - - /** - * Should be called if the backend configuration changes. - * - * Params: - * watcher = Watcher. - * oldEvents = The events were already set. - * events = The events should be set. - * - * Returns: $(D_KEYWORD true) if the operation was successful. - */ - protected override bool reify(SocketWatcher watcher, - EventMask oldEvents, - EventMask events) @nogc - { - int op = EPOLL_CTL_DEL; - epoll_event ev; - - if (events == oldEvents) - { - return true; - } - if (events && oldEvents) - { - op = EPOLL_CTL_MOD; - } - else if (events && !oldEvents) - { - op = EPOLL_CTL_ADD; - } - - ev.data.fd = watcher.socket.handle; - ev.events = (events & (Event.read | Event.accept) ? EPOLLIN | EPOLLPRI : 0) - | (events & Event.write ? EPOLLOUT : 0) - | EPOLLET; - - return epoll_ctl(fd, op, watcher.socket.handle, &ev) == 0; - } - - /** - * Does the actual polling. - */ - protected override void poll() @nogc - { - // Don't block - immutable timeout = cast(immutable int) blockTime.total!"msecs"; - auto eventCount = epoll_wait(fd, events.get().ptr, maxEvents, timeout); - - if (eventCount < 0) - { - if (errno != EINTR) - { - throw defaultAllocator.make!BadLoopException(); - } - return; - } - - for (auto i = 0; i < eventCount; ++i) - { - auto transport = cast(StreamTransport) connections[events[i].data.fd]; - - if (transport is null) - { - auto connection = cast(ConnectionWatcher) connections[events[i].data.fd]; - assert(connection !is null); - - acceptConnections(connection); - } - else if (events[i].events & EPOLLERR) - { - kill(transport); - continue; - } - else if (events[i].events & (EPOLLIN | EPOLLPRI | EPOLLHUP)) - { - SocketException exception; - try - { - ptrdiff_t received; - do - { - received = transport.socket.receive(transport.output[]); - transport.output += received; - } - while (received); - } - catch (SocketException e) - { - exception = e; - } - if (transport.socket.disconnected) - { - kill(transport, exception); - continue; - } - else if (transport.output.length) - { - pendings.insertBack(transport); - } - } - if (events[i].events & EPOLLOUT) - { - transport.writeReady = true; - if (transport.input.length) - { - feed(transport); - } - } - } - } - - /** - * Returns: The blocking time. - */ - override protected @property inout(Duration) blockTime() - inout @safe pure nothrow - { - return min(super.blockTime, 1.dur!"seconds"); - } -} diff --git a/source/tanya/async/event/iocp.d b/source/tanya/async/event/iocp.d deleted file mode 100644 index df8c8d6..0000000 --- a/source/tanya/async/event/iocp.d +++ /dev/null @@ -1,390 +0,0 @@ -/* This Source Code Form is subject to the terms of the Mozilla Public - * License, v. 2.0. If a copy of the MPL was not distributed with this - * file, You can obtain one at http://mozilla.org/MPL/2.0/. */ - -/** - * Event loop implementation for Windows. - * - * Copyright: Eugene Wissner 2016-2020. - * License: $(LINK2 https://www.mozilla.org/en-US/MPL/2.0/, - * Mozilla Public License, v. 2.0). - * Authors: $(LINK2 mailto:info@caraus.de, Eugene Wissner) - * Source: $(LINK2 https://github.com/caraus-ecms/tanya/blob/master/source/tanya/async/event/iocp.d, - * tanya/async/event/iocp.d) - */ -module tanya.async.event.iocp; - -version (D_Ddoc) -{ -} -else version (Windows): - -import core.sys.windows.mswsock; -import core.sys.windows.winsock2; -import tanya.async.loop; -import tanya.async.protocol; -import tanya.async.transport; -import tanya.async.watcher; -import tanya.container.buffer; -import tanya.memory.allocator; -import tanya.network.socket; -import tanya.sys.windows.winbase; - -/** - * Transport for stream sockets. - */ -final class StreamTransport : SocketWatcher, DuplexTransport, SocketTransport -{ - private SocketException exception; - - private ReadBuffer!ubyte output; - - private WriteBuffer!ubyte input; - - private Protocol protocol_; - - private bool closing; - - /** - * Creates new completion port transport. - * - * Params: - * socket = Socket. - * - * Precondition: $(D_INLINECODE socket !is null) - */ - this(OverlappedConnectedSocket socket) @nogc - { - super(socket); - output = ReadBuffer!ubyte(8192, 1024); - input = WriteBuffer!ubyte(8192); - active = true; - } - - /** - * Returns: Socket. - * - * Postcondition: $(D_INLINECODE socket !is null) - */ - override @property OverlappedConnectedSocket socket() pure nothrow @safe @nogc - out (socket) - { - assert(socket !is null); - } - do - { - return cast(OverlappedConnectedSocket) socket_; - } - - /** - * Returns $(D_PARAM true) if the transport is closing or closed. - */ - bool isClosing() const pure nothrow @safe @nogc - { - return closing; - } - - /** - * Close the transport. - * - * Buffered data will be flushed. No more data will be received. - */ - void close() pure nothrow @safe @nogc - { - closing = true; - } - - /** - * Write some data to the transport. - * - * Params: - * data = Data to send. - */ - void write(ubyte[] data) @nogc - { - input ~= data; - } - - /** - * Returns: Application protocol. - */ - @property Protocol protocol() pure nothrow @safe @nogc - { - return protocol_; - } - - /** - * Switches the protocol. - * - * The protocol is deallocated by the event loop. - * - * Params: - * protocol = Application protocol. - * - * Precondition: $(D_INLINECODE protocol !is null) - */ - @property void protocol(Protocol protocol) pure nothrow @safe @nogc - in - { - assert(protocol !is null); - } - do - { - protocol_ = protocol; - } - - /** - * Invokes the watcher callback. - */ - override void invoke() @nogc - { - if (output.length) - { - immutable empty = input.length == 0; - protocol.received(output[0 .. $]); - output.clear(); - if (empty) - { - SocketState overlapped; - try - { - overlapped = defaultAllocator.make!SocketState; - socket.beginSend(input[], overlapped); - } - catch (SocketException e) - { - defaultAllocator.dispose(overlapped); - defaultAllocator.dispose(e); - } - } - } - else - { - protocol.disconnected(exception); - defaultAllocator.dispose(protocol_); - defaultAllocator.dispose(exception); - active = false; - } - } -} - -final class IOCPLoop : Loop -{ - protected HANDLE completionPort; - - protected OVERLAPPED overlap; - - /** - * Initializes the loop. - */ - this() @nogc - { - super(); - - completionPort = CreateIoCompletionPort(INVALID_HANDLE_VALUE, null, 0, 0); - if (!completionPort) - { - throw make!BadLoopException(defaultAllocator, - "Creating completion port failed"); - } - } - - /** - * Should be called if the backend configuration changes. - * - * Params: - * watcher = Watcher. - * oldEvents = The events were already set. - * events = The events should be set. - * - * Returns: $(D_KEYWORD true) if the operation was successful. - */ - override protected bool reify(SocketWatcher watcher, - EventMask oldEvents, - EventMask events) @nogc - { - SocketState overlapped; - if (!(oldEvents & Event.accept) && (events & Event.accept)) - { - auto socket = cast(OverlappedStreamSocket) watcher.socket; - assert(socket !is null); - - if (CreateIoCompletionPort(cast(HANDLE) socket.handle, - completionPort, - cast(size_t) (cast(void*) watcher), - 0) !is completionPort) - { - return false; - } - - try - { - overlapped = defaultAllocator.make!SocketState; - socket.beginAccept(overlapped); - } - catch (SocketException e) - { - defaultAllocator.dispose(overlapped); - defaultAllocator.dispose(e); - return false; - } - } - if ((!(oldEvents & Event.read) && (events & Event.read)) - || (!(oldEvents & Event.write) && (events & Event.write))) - { - auto transport = cast(StreamTransport) watcher; - assert(transport !is null); - - if (CreateIoCompletionPort(cast(HANDLE) transport.socket.handle, - completionPort, - cast(size_t) (cast(void*) watcher), - 0) !is completionPort) - { - return false; - } - - // Begin to read - if (!(oldEvents & Event.read) && (events & Event.read)) - { - try - { - overlapped = defaultAllocator.make!SocketState; - transport.socket.beginReceive(transport.output[], overlapped); - } - catch (SocketException e) - { - defaultAllocator.dispose(overlapped); - defaultAllocator.dispose(e); - return false; - } - } - } - return true; - } - - private void kill(StreamTransport transport, - SocketException exception = null) @nogc - in - { - assert(transport !is null); - } - do - { - transport.socket.shutdown(); - defaultAllocator.dispose(transport.socket); - transport.exception = exception; - pendings.insertBack(transport); - } - - /** - * Does the actual polling. - */ - override protected void poll() @nogc - { - DWORD lpNumberOfBytes; - size_t key; - OVERLAPPED* overlap; - immutable timeout = cast(immutable int) blockTime.total!"msecs"; - - auto result = GetQueuedCompletionStatus(completionPort, - &lpNumberOfBytes, - &key, - &overlap, - timeout); - if (result == FALSE && overlap is null) - { - return; // Timeout - } - - enum size_t offset = size_t.sizeof * 2; - auto overlapped = cast(SocketState) ((cast(void*) overlap) - offset); - assert(overlapped !is null); - scope (failure) - { - defaultAllocator.dispose(overlapped); - } - - switch (overlapped.event) - { - case OverlappedSocketEvent.accept: - auto connection = cast(ConnectionWatcher) (cast(void*) key); - assert(connection !is null); - - auto listener = cast(OverlappedStreamSocket) connection.socket; - assert(listener !is null); - - auto socket = listener.endAccept(overlapped); - auto transport = defaultAllocator.make!StreamTransport(socket); - - connection.incoming.insertBack(transport); - - reify(transport, - EventMask(Event.none), - EventMask(Event.read | Event.write)); - - pendings.insertBack(connection); - listener.beginAccept(overlapped); - break; - case OverlappedSocketEvent.read: - auto transport = cast(StreamTransport) (cast(void*) key); - assert(transport !is null); - - if (!transport.active) - { - defaultAllocator.dispose(transport); - defaultAllocator.dispose(overlapped); - return; - } - - int received; - SocketException exception; - try - { - received = transport.socket.endReceive(overlapped); - } - catch (SocketException e) - { - exception = e; - } - if (transport.socket.disconnected) - { - // We want to get one last notification to destroy the watcher. - transport.socket.beginReceive(transport.output[], overlapped); - kill(transport, exception); - } - else if (received > 0) - { - immutable full = transport.output.free == received; - - transport.output += received; - // Receive was interrupted because the buffer is full. We have to continue. - if (full) - { - transport.socket.beginReceive(transport.output[], overlapped); - } - pendings.insertBack(transport); - } - break; - case OverlappedSocketEvent.write: - auto transport = cast(StreamTransport) (cast(void*) key); - assert(transport !is null); - - transport.input += transport.socket.endSend(overlapped); - if (transport.input.length > 0) - { - transport.socket.beginSend(transport.input[], overlapped); - } - else - { - transport.socket.beginReceive(transport.output[], overlapped); - if (transport.isClosing()) - { - kill(transport); - } - } - break; - default: - assert(false, "Unknown event"); - } - } -} diff --git a/source/tanya/async/event/kqueue.d b/source/tanya/async/event/kqueue.d deleted file mode 100644 index b3c1826..0000000 --- a/source/tanya/async/event/kqueue.d +++ /dev/null @@ -1,331 +0,0 @@ -/* This Source Code Form is subject to the terms of the Mozilla Public - * License, v. 2.0. If a copy of the MPL was not distributed with this - * file, You can obtain one at http://mozilla.org/MPL/2.0/. */ - -/* - * Event loop implementation for *BSD. - * - * Copyright: Eugene Wissner 2016-2020. - * License: $(LINK2 https://www.mozilla.org/en-US/MPL/2.0/, - * Mozilla Public License, v. 2.0). - * Authors: $(LINK2 mailto:info@caraus.de, Eugene Wissner) - * Source: $(LINK2 https://github.com/caraus-ecms/tanya/blob/master/source/tanya/async/event/kqueue.d, - * tanya/async/event/kqueue.d) - */ -module tanya.async.event.kqueue; - -version (D_Ddoc) -{ -} -else version (OSX) -{ - version = MacBSD; -} -else version (iOS) -{ - version = MacBSD; -} -else version (TVOS) -{ - version = MacBSD; -} -else version (WatchOS) -{ - version = MacBSD; -} -else version (FreeBSD) -{ - version = MacBSD; -} -else version (OpenBSD) -{ - version = MacBSD; -} -else version (DragonFlyBSD) -{ - version = MacBSD; -} - -version (MacBSD): - -import core.stdc.errno; -import core.sys.posix.time; // timespec -import core.sys.posix.unistd; -import core.time; -import tanya.algorithm.comparison; -import tanya.async.event.selector; -import tanya.async.loop; -import tanya.async.transport; -import tanya.async.watcher; -import tanya.container.array; -import tanya.memory.allocator; -import tanya.network.socket; - -void EV_SET(kevent_t* kevp, typeof(kevent_t.tupleof) args) pure nothrow @nogc -{ - *kevp = kevent_t(args); -} - -enum : short -{ - EVFILT_READ = -1, - EVFILT_WRITE = -2, - EVFILT_AIO = -3, /* attached to aio requests */ - EVFILT_VNODE = -4, /* attached to vnodes */ - EVFILT_PROC = -5, /* attached to struct proc */ - EVFILT_SIGNAL = -6, /* attached to struct proc */ - EVFILT_TIMER = -7, /* timers */ - EVFILT_MACHPORT = -8, /* Mach portsets */ - EVFILT_FS = -9, /* filesystem events */ - EVFILT_USER = -10, /* User events */ - EVFILT_VM = -12, /* virtual memory events */ - EVFILT_SYSCOUNT = 11 -} - -struct kevent_t -{ - uintptr_t ident; // Identifier for this event - short filter; // Filter for event - ushort flags; - uint fflags; - intptr_t data; - void* udata; // Opaque user data identifier -} - -enum -{ - /* actions */ - EV_ADD = 0x0001, /* add event to kq (implies enable) */ - EV_DELETE = 0x0002, /* delete event from kq */ - EV_ENABLE = 0x0004, /* enable event */ - EV_DISABLE = 0x0008, /* disable event (not reported) */ - - /* flags */ - EV_ONESHOT = 0x0010, /* only report one occurrence */ - EV_CLEAR = 0x0020, /* clear event state after reporting */ - EV_RECEIPT = 0x0040, /* force EV_ERROR on success, data=0 */ - EV_DISPATCH = 0x0080, /* disable event after reporting */ - - EV_SYSFLAGS = 0xF000, /* reserved by system */ - EV_FLAG1 = 0x2000, /* filter-specific flag */ - - /* returned values */ - EV_EOF = 0x8000, /* EOF detected */ - EV_ERROR = 0x4000, /* error, data contains errno */ -} - -extern(C) int kqueue() nothrow @nogc; -extern(C) int kevent(int kq, const kevent_t *changelist, int nchanges, - kevent_t *eventlist, int nevents, const timespec *timeout) - nothrow @nogc; - -final class KqueueLoop : SelectorLoop -{ - protected int fd; - private Array!kevent_t events; - private Array!kevent_t changes; - private size_t changeCount; - - /** - * Returns: Maximal event count can be got at a time - * (should be supported by the backend). - */ - override protected @property uint maxEvents() - const pure nothrow @safe @nogc - { - return cast(uint) events.length; - } - - this() @nogc - { - super(); - - if ((fd = kqueue()) == -1) - { - throw make!BadLoopException(defaultAllocator, - "kqueue initialization failed"); - } - events = Array!kevent_t(64); - changes = Array!kevent_t(64); - } - - /** - * Frees loop internals. - */ - ~this() @nogc - { - close(fd); - } - - private void set(SocketType socket, short filter, ushort flags) @nogc - { - if (changes.length <= changeCount) - { - changes.length = changeCount + maxEvents; - } - EV_SET(&changes[changeCount], - cast(ulong) socket, - filter, - flags, - 0U, - 0, - null); - ++changeCount; - } - - /** - * Should be called if the backend configuration changes. - * - * Params: - * watcher = Watcher. - * oldEvents = The events were already set. - * events = The events should be set. - * - * Returns: $(D_KEYWORD true) if the operation was successful. - */ - override protected bool reify(SocketWatcher watcher, - EventMask oldEvents, - EventMask events) @nogc - { - if (events != oldEvents) - { - if (oldEvents & Event.read || oldEvents & Event.accept) - { - set(watcher.socket.handle, EVFILT_READ, EV_DELETE); - } - if (oldEvents & Event.write) - { - set(watcher.socket.handle, EVFILT_WRITE, EV_DELETE); - } - } - if (events & (Event.read | events & Event.accept)) - { - set(watcher.socket.handle, EVFILT_READ, EV_ADD | EV_ENABLE); - } - if (events & Event.write) - { - set(watcher.socket.handle, EVFILT_WRITE, EV_ADD | EV_DISPATCH); - } - return true; - } - - /** - * Does the actual polling. - */ - protected override void poll() @nogc - { - timespec ts; - blockTime.split!("seconds", "nsecs")(ts.tv_sec, ts.tv_nsec); - - if (changeCount > maxEvents) - { - events.length = changes.length; - } - - auto eventCount = kevent(fd, - changes.get().ptr, - cast(int) changeCount, - events.get().ptr, - maxEvents, - &ts); - changeCount = 0; - - if (eventCount < 0) - { - if (errno != EINTR) - { - throw defaultAllocator.make!BadLoopException(); - } - return; - } - - for (int i; i < eventCount; ++i) - { - assert(connections.length > events[i].ident); - - auto transport = cast(StreamTransport) connections[events[i].ident]; - // If it is a ConnectionWatcher. Accept connections. - if (transport is null) - { - auto connection = cast(ConnectionWatcher) connections[events[i].ident]; - assert(connection !is null); - - acceptConnections(connection); - } - else if (events[i].flags & EV_ERROR) - { - kill(transport); - } - else if (events[i].filter == EVFILT_READ) - { - SocketException exception; - try - { - ptrdiff_t received; - do - { - received = transport.socket.receive(transport.output[]); - transport.output += received; - } - while (received); - } - catch (SocketException e) - { - exception = e; - } - if (transport.socket.disconnected) - { - kill(transport, exception); - } - else if (transport.output.length) - { - pendings.insertBack(transport); - } - } - else if (events[i].filter == EVFILT_WRITE) - { - transport.writeReady = true; - if (transport.input.length) - { - feed(transport); - } - } - } - } - - /** - * Returns: The blocking time. - */ - override protected @property inout(Duration) blockTime() - inout @nogc @safe pure nothrow - { - return min(super.blockTime, 1.dur!"seconds"); - } - - /** - * If the transport couldn't send the data, the further sending should - * be handled by the event loop. - * - * Params: - * transport = Transport. - * exception = Exception thrown on sending. - * - * Returns: $(D_KEYWORD true) if the operation could be successfully - * completed or scheduled, $(D_KEYWORD false) otherwise (the - * transport will be destroyed then). - */ - protected override bool feed(StreamTransport transport, - SocketException exception = null) @nogc - { - if (!super.feed(transport, exception)) - { - return false; - } - if (!transport.writeReady) - { - set(transport.socket.handle, EVFILT_WRITE, EV_DISPATCH); - return true; - } - return false; - } -} diff --git a/source/tanya/async/event/selector.d b/source/tanya/async/event/selector.d deleted file mode 100644 index f954e8c..0000000 --- a/source/tanya/async/event/selector.d +++ /dev/null @@ -1,407 +0,0 @@ -/* This Source Code Form is subject to the terms of the Mozilla Public - * License, v. 2.0. If a copy of the MPL was not distributed with this - * file, You can obtain one at http://mozilla.org/MPL/2.0/. */ - -/* - * This module contains base implementations for reactor event loops. - * - * Copyright: Eugene Wissner 2016-2020. - * License: $(LINK2 https://www.mozilla.org/en-US/MPL/2.0/, - * Mozilla Public License, v. 2.0). - * Authors: $(LINK2 mailto:info@caraus.de, Eugene Wissner) - * Source: $(LINK2 https://github.com/caraus-ecms/tanya/blob/master/source/tanya/async/event/selector.d, - * tanya/async/event/selector.d) - */ -module tanya.async.event.selector; - -version (D_Ddoc) -{ -} -else version (Posix): - -import tanya.async.loop; -import tanya.async.protocol; -import tanya.async.transport; -import tanya.async.watcher; -import tanya.container.array; -import tanya.container.buffer; -import tanya.memory.allocator; -import tanya.net.socket; - -/** - * Transport for stream sockets. - */ -package class StreamTransport : SocketWatcher, DuplexTransport, SocketTransport -{ - private SelectorLoop loop; - - private SocketException exception; - - package ReadBuffer!ubyte output; - - package WriteBuffer!ubyte input; - - private Protocol protocol_; - - private bool closing; - - /// Received notification that the underlying socket is write-ready. - package bool writeReady; - - /** - * Params: - * loop = Event loop. - * socket = Socket. - * - * Precondition: $(D_INLINECODE loop !is null && socket !is null) - */ - this(SelectorLoop loop, ConnectedSocket socket) @nogc - in - { - assert(loop !is null); - } - do - { - super(socket); - this.loop = loop; - output = ReadBuffer!ubyte(8192, 1024); - input = WriteBuffer!ubyte(8192); - active = true; - } - - /** - * Returns: Socket. - * - * Postcondition: $(D_INLINECODE socket !is null) - */ - override @property ConnectedSocket socket() pure nothrow @safe @nogc - out (socket) - { - assert(socket !is null); - } - do - { - return cast(ConnectedSocket) socket_; - } - - private @property void socket(ConnectedSocket socket) - pure nothrow @safe @nogc - in - { - assert(socket !is null); - } - do - { - socket_ = socket; - } - - /** - * Returns: Application protocol. - */ - @property Protocol protocol() pure nothrow @safe @nogc - { - return protocol_; - } - - /** - * Switches the protocol. - * - * The protocol is deallocated by the event loop. - * - * Params: - * protocol = Application protocol. - * - * Precondition: $(D_INLINECODE protocol !is null) - */ - @property void protocol(Protocol protocol) pure nothrow @safe @nogc - in - { - assert(protocol !is null); - } - do - { - protocol_ = protocol; - } - - /** - * Returns $(D_PARAM true) if the transport is closing or closed. - */ - bool isClosing() const pure nothrow @safe @nogc - { - return closing; - } - - /** - * Close the transport. - * - * Buffered data will be flushed. No more data will be received. - */ - void close() @nogc - { - closing = true; - loop.reify(this, - EventMask(Event.read | Event.write), - EventMask(Event.write)); - } - - /** - * Invokes the watcher callback. - */ - override void invoke() @nogc - { - if (output.length) - { - protocol.received(output[0 .. $]); - output.clear(); - if (isClosing() && input.length == 0) - { - loop.kill(this); - } - } - else - { - protocol.disconnected(exception); - defaultAllocator.dispose(protocol_); - defaultAllocator.dispose(exception); - active = false; - } - } - - /** - * Write some data to the transport. - * - * Params: - * data = Data to send. - */ - void write(ubyte[] data) @nogc - { - if (!data.length) - { - return; - } - // Try to write if the socket is write ready. - if (writeReady) - { - ptrdiff_t sent; - SocketException exception; - try - { - sent = socket.send(data); - if (sent == 0) - { - writeReady = false; - } - } - catch (SocketException e) - { - writeReady = false; - exception = e; - } - if (sent < data.length) - { - input ~= data[sent..$]; - loop.feed(this, exception); - } - } - else - { - input ~= data; - } - } -} - -abstract class SelectorLoop : Loop -{ - /// Pending connections. - protected Array!SocketWatcher connections; - - this() @nogc - { - super(); - this.connections = Array!SocketWatcher(maxEvents); - } - - ~this() @nogc - { - foreach (ref connection; this.connections[]) - { - // We want to free only the transports. ConnectionWatcher are - // created by the user and should be freed by himself. - if (cast(StreamTransport) connection !is null) - { - defaultAllocator.dispose(connection); - } - } - } - - /** - * Should be called if the backend configuration changes. - * - * Params: - * watcher = Watcher. - * oldEvents = The events were already set. - * events = The events should be set. - * - * Returns: $(D_KEYWORD true) if the operation was successful. - */ - override abstract protected bool reify(SocketWatcher watcher, - EventMask oldEvents, - EventMask events) @nogc; - - /** - * Kills the watcher and closes the connection. - * - * Params: - * transport = Transport. - * exception = Occurred exception. - */ - protected void kill(StreamTransport transport, - SocketException exception = null) @nogc - in - { - assert(transport !is null); - } - do - { - transport.socket.shutdown(); - defaultAllocator.dispose(transport.socket); - transport.exception = exception; - pendings.insertBack(transport); - } - - /** - * If the transport couldn't send the data, the further sending should - * be handled by the event loop. - * - * Params: - * transport = Transport. - * exception = Exception thrown on sending. - * - * Returns: $(D_KEYWORD true) if the operation could be successfully - * completed or scheduled, $(D_KEYWORD false) otherwise (the - * transport will be destroyed then). - */ - protected bool feed(StreamTransport transport, - SocketException exception = null) @nogc - in - { - assert(transport !is null); - } - do - { - while (transport.input.length && transport.writeReady) - { - try - { - ptrdiff_t sent = transport.socket.send(transport.input[]); - if (sent == 0) - { - transport.writeReady = false; - } - else - { - transport.input += sent; - } - } - catch (SocketException e) - { - exception = e; - transport.writeReady = false; - } - } - if (exception !is null) - { - kill(transport, exception); - return false; - } - if (transport.input.length == 0 && transport.isClosing()) - { - kill(transport); - } - return true; - } - - /** - * Start watching. - * - * Params: - * watcher = Watcher. - */ - override void start(ConnectionWatcher watcher) @nogc - { - if (watcher.active) - { - return; - } - - if (connections.length <= watcher.socket) - { - connections.length = watcher.socket.handle + maxEvents / 2; - } - connections[watcher.socket.handle] = watcher; - - super.start(watcher); - } - - /** - * Accept incoming connections. - * - * Params: - * connection = Connection watcher ready to accept. - */ - package void acceptConnections(ConnectionWatcher connection) @nogc - in - { - assert(connection !is null); - } - do - { - while (true) - { - ConnectedSocket client; - try - { - client = (cast(StreamSocket) connection.socket).accept(); - } - catch (SocketException e) - { - defaultAllocator.dispose(e); - break; - } - if (client is null) - { - break; - } - - StreamTransport transport; - - if (connections.length > client.handle) - { - transport = cast(StreamTransport) connections[client.handle]; - } - else - { - connections.length = client.handle + maxEvents / 2; - } - if (transport is null) - { - transport = defaultAllocator.make!StreamTransport(this, client); - connections[client.handle] = transport; - } - else - { - transport.socket = client; - } - - reify(transport, - EventMask(Event.none), - EventMask(Event.read | Event.write)); - connection.incoming.insertBack(transport); - } - - if (!connection.incoming.empty) - { - pendings.insertBack(connection); - } - } -} diff --git a/source/tanya/async/iocp.d b/source/tanya/async/iocp.d deleted file mode 100644 index 156e7be..0000000 --- a/source/tanya/async/iocp.d +++ /dev/null @@ -1,56 +0,0 @@ -/* This Source Code Form is subject to the terms of the Mozilla Public - * License, v. 2.0. If a copy of the MPL was not distributed with this - * file, You can obtain one at http://mozilla.org/MPL/2.0/. */ - -/** - * This module provides API for Windows I/O Completion Ports. - * - * Note: Available only on Windows. - * - * Copyright: Eugene Wissner 2016-2020. - * License: $(LINK2 https://www.mozilla.org/en-US/MPL/2.0/, - * Mozilla Public License, v. 2.0). - * Authors: $(LINK2 mailto:info@caraus.de, Eugene Wissner) - * Source: $(LINK2 https://github.com/caraus-ecms/tanya/blob/master/source/tanya/async/iocp.d, - * tanya/async/iocp.d) - */ -module tanya.async.iocp; - -version (Windows) -{ - version = WindowsDoc; -} -else version (D_Ddoc) -{ - version = WindowsDoc; - version (Windows) - { - } - else - { - private struct OVERLAPPED - { - } - private alias HANDLE = void*; - } -} - -version (WindowsDoc): - -import core.sys.windows.winbase; - -/** - * Provides an extendable representation of a Win32 $(D_PSYMBOL OVERLAPPED) - * structure. - */ -class State -{ - /// For internal use by Windows API. - align(1) OVERLAPPED overlapped; - - /// File/socket handle. - HANDLE handle; - - /// For keeping events or event masks. - int event; -} diff --git a/source/tanya/async/loop.d b/source/tanya/async/loop.d deleted file mode 100644 index 1d88575..0000000 --- a/source/tanya/async/loop.d +++ /dev/null @@ -1,352 +0,0 @@ -/* This Source Code Form is subject to the terms of the Mozilla Public
- * License, v. 2.0. If a copy of the MPL was not distributed with this
- * file, You can obtain one at http://mozilla.org/MPL/2.0/. */
-
-/**
- * Interface for the event loop implementations and the default event loop
- * chooser.
- *
- * ---
- * import tanya.async;
- * import tanya.memory;
- * import tanya.network.socket;
- *
- * class EchoProtocol : TransmissionControlProtocol
- * {
- * private DuplexTransport transport;
- *
- * void received(in ubyte[] data) @nogc
- * {
- * ubyte[512] buffer;
- * buffer[0 .. data.length] = data;
- * transport.write(buffer[]);
- * }
- *
- * void connected(DuplexTransport transport) @nogc
- * {
- * this.transport = transport;
- * }
- *
- * void disconnected(SocketException e) @nogc
- * {
- * }
- * }
- *
- * void main()
- * {
- * auto address = address4("127.0.0.1");
- * auto endpoint = Endpoint(address.get, cast(ushort) 8192);
- *
- * version (Windows)
- * {
- * auto sock = defaultAllocator.make!OverlappedStreamSocket(AddressFamily.inet);
- * }
- * else
- * {
- * auto sock = defaultAllocator.make!StreamSocket(AddressFamily.inet);
- * sock.blocking = false;
- * }
- *
- * sock.bind(endpoint);
- * sock.listen(5);
- *
- * auto io = defaultAllocator.make!ConnectionWatcher(sock);
- * io.setProtocol!EchoProtocol;
- *
- * defaultLoop.start(io);
- * defaultLoop.run();
- *
- * sock.shutdown();
- * defaultAllocator.dispose(io);
- * defaultAllocator.dispose(sock);
- * }
- * ---
- *
- * Copyright: Eugene Wissner 2016-2020.
- * License: $(LINK2 https://www.mozilla.org/en-US/MPL/2.0/,
- * Mozilla Public License, v. 2.0).
- * Authors: $(LINK2 mailto:info@caraus.de, Eugene Wissner)
- * Source: $(LINK2 https://github.com/caraus-ecms/tanya/blob/master/source/tanya/async/loop.d,
- * tanya/async/loop.d)
- */
-module tanya.async.loop;
-
-import core.time;
-import tanya.async.transport;
-import tanya.async.watcher;
-import tanya.bitmanip;
-import tanya.container.buffer;
-import tanya.container.list;
-import tanya.memory.allocator;
-import tanya.net.socket;
-
-version (DisableBackends)
-{
-}
-else version (D_Ddoc)
-{
-}
-else version (linux)
-{
- import tanya.async.event.epoll;
- version = Epoll;
-}
-else version (Windows)
-{
- import tanya.async.event.iocp;
- version = IOCP;
-}
-else version (OSX)
-{
- version = Kqueue;
-}
-else version (iOS)
-{
- version = Kqueue;
-}
-else version (FreeBSD)
-{
- version = Kqueue;
-}
-else version (OpenBSD)
-{
- version = Kqueue;
-}
-else version (DragonFlyBSD)
-{
- version = Kqueue;
-}
-
-/**
- * Events.
- */
-enum Event : uint
-{
- none = 0x00, /// No events.
- read = 0x01, /// Non-blocking read call.
- write = 0x02, /// Non-blocking write call.
- accept = 0x04, /// Connection made.
- error = 0x80000000, /// Sent when an error occurs.
-}
-
-alias EventMask = BitFlags!Event;
-
-/**
- * Event loop.
- */
-abstract class Loop
-{
- protected bool done = true;
-
- /// Pending watchers.
- protected DList!Watcher pendings;
-
- /**
- * Returns: Maximal event count can be got at a time
- * (should be supported by the backend).
- */
- protected @property uint maxEvents()
- const pure nothrow @safe @nogc
- {
- return 128U;
- }
-
- /**
- * Initializes the loop.
- */
- this() @nogc
- {
- }
-
- /**
- * Frees loop internals.
- */
- ~this() @nogc
- {
- for (; !this.pendings.empty; this.pendings.removeFront())
- {
- defaultAllocator.dispose(this.pendings.front);
- }
- }
-
- /**
- * Starts the loop.
- */
- void run() @nogc
- {
- this.done = false;
- do
- {
- poll();
-
- // Invoke pendings
- for (; !this.pendings.empty; this.pendings.removeFront())
- {
- this.pendings.front.invoke();
- }
- }
- while (!this.done);
- }
-
- /**
- * Break out of the loop.
- */
- void unloop() @safe pure nothrow @nogc
- {
- this.done = true;
- }
-
- /**
- * Start watching.
- *
- * Params:
- * watcher = Watcher.
- */
- void start(ConnectionWatcher watcher) @nogc
- {
- if (watcher.active)
- {
- return;
- }
- watcher.active = true;
-
- reify(watcher, EventMask(Event.none), EventMask(Event.accept));
- }
-
- /**
- * Stop watching.
- *
- * Params:
- * watcher = Watcher.
- */
- void stop(ConnectionWatcher watcher) @nogc
- {
- if (!watcher.active)
- {
- return;
- }
- watcher.active = false;
-
- reify(watcher, EventMask(Event.accept), EventMask(Event.none));
- }
-
- /**
- * Should be called if the backend configuration changes.
- *
- * Params:
- * watcher = Watcher.
- * oldEvents = The events were already set.
- * events = The events should be set.
- *
- * Returns: $(D_KEYWORD true) if the operation was successful.
- */
- abstract protected bool reify(SocketWatcher watcher,
- EventMask oldEvents,
- EventMask events) @nogc;
-
- /**
- * Returns: The blocking time.
- */
- protected @property inout(Duration) blockTime()
- inout @safe pure nothrow @nogc
- {
- // Don't block if we have to do.
- return pendings.empty ? blockTime_ : Duration.zero;
- }
-
- /**
- * Sets the blocking time for IO watchers.
- *
- * Params:
- * blockTime = The blocking time. Cannot be larger than
- * $(D_PSYMBOL maxBlockTime).
- */
- protected @property void blockTime(in Duration blockTime) @safe pure nothrow @nogc
- in
- {
- assert(blockTime <= 1.dur!"hours", "Too long to wait.");
- assert(!blockTime.isNegative);
- }
- do
- {
- blockTime_ = blockTime;
- }
-
- /**
- * Does the actual polling.
- */
- abstract protected void poll() @nogc;
-
- /// Maximal block time.
- protected Duration blockTime_ = 1.dur!"minutes";
-}
-
-/**
- * Exception thrown on errors in the event loop.
- */
-class BadLoopException : Exception
-{
- /**
- * Params:
- * file = The file where the exception occurred.
- * line = The line number where the exception occurred.
- * next = The previous exception in the chain of exceptions, if any.
- */
- this(string file = __FILE__, size_t line = __LINE__, Throwable next = null)
- pure nothrow const @safe @nogc
- {
- super("Event loop cannot be initialized.", file, line, next);
- }
-}
-
-/**
- * Returns the event loop used by default. If an event loop wasn't set with
- * $(D_PSYMBOL defaultLoop) before, $(D_PSYMBOL defaultLoop) will try to
- * choose an event loop supported on the system.
- *
- * Returns: The default event loop.
- */
-@property Loop defaultLoop() @nogc
-{
- if (defaultLoop_ !is null)
- {
- return defaultLoop_;
- }
- version (Epoll)
- {
- defaultLoop_ = defaultAllocator.make!EpollLoop;
- }
- else version (IOCP)
- {
- defaultLoop_ = defaultAllocator.make!IOCPLoop;
- }
- else version (Kqueue)
- {
- import tanya.async.event.kqueue;
- defaultLoop_ = defaultAllocator.make!KqueueLoop;
- }
- return defaultLoop_;
-}
-
-/**
- * Sets the default event loop.
- *
- * This property makes it possible to implement your own backends or event
- * loops, for example, if the system is not supported or if you want to
- * extend the supported implementation. Just extend $(D_PSYMBOL Loop) and pass
- * your implementation to this property.
- *
- * Params:
- * loop = The event loop.
- */
-@property void defaultLoop(Loop loop) @nogc
-in
-{
- assert(loop !is null);
-}
-do
-{
- defaultLoop_ = loop;
-}
-
-private Loop defaultLoop_;
diff --git a/source/tanya/async/package.d b/source/tanya/async/package.d deleted file mode 100644 index 18f361e..0000000 --- a/source/tanya/async/package.d +++ /dev/null @@ -1,20 +0,0 @@ -/* This Source Code Form is subject to the terms of the Mozilla Public
- * License, v. 2.0. If a copy of the MPL was not distributed with this
- * file, You can obtain one at http://mozilla.org/MPL/2.0/. */
-
-/**
- * This package provides asynchronous capabilities.
- *
- * Copyright: Eugene Wissner 2016-2020.
- * License: $(LINK2 https://www.mozilla.org/en-US/MPL/2.0/,
- * Mozilla Public License, v. 2.0).
- * Authors: $(LINK2 mailto:info@caraus.de, Eugene Wissner)
- * Source: $(LINK2 https://github.com/caraus-ecms/tanya/blob/master/source/tanya/async/package.d,
- * tanya/async/package.d)
- */
-module tanya.async;
-
-public import tanya.async.loop;
-public import tanya.async.protocol;
-public import tanya.async.transport;
-public import tanya.async.watcher;
diff --git a/source/tanya/async/protocol.d b/source/tanya/async/protocol.d deleted file mode 100644 index c51dae5..0000000 --- a/source/tanya/async/protocol.d +++ /dev/null @@ -1,58 +0,0 @@ -/* This Source Code Form is subject to the terms of the Mozilla Public
- * License, v. 2.0. If a copy of the MPL was not distributed with this
- * file, You can obtain one at http://mozilla.org/MPL/2.0/. */
-
-/**
- * This module contains protocol which handle data in asynchronous
- * applications.
- *
- * When an event from the network arrives, a protocol method gets
- * called and can respond to the event.
- *
- * Copyright: Eugene Wissner 2016-2020.
- * License: $(LINK2 https://www.mozilla.org/en-US/MPL/2.0/,
- * Mozilla Public License, v. 2.0).
- * Authors: $(LINK2 mailto:info@caraus.de, Eugene Wissner)
- * Source: $(LINK2 https://github.com/caraus-ecms/tanya/blob/master/source/tanya/async/protocol.d,
- * tanya/async/protocol.d)
- */
-module tanya.async.protocol;
-
-import tanya.async.transport;
-import tanya.net.socket;
-
-/**
- * Common protocol interface.
- */
-interface Protocol
-{
- /**
- * Params:
- * data = Read data.
- */
- void received(in ubyte[] data) @nogc;
-
- /**
- * Called when a connection is made.
- *
- * Params:
- * transport = Protocol transport.
- */
- void connected(DuplexTransport transport) @nogc;
-
- /**
- * Called when a connection is lost.
- *
- * Params:
- * exception = $(D_PSYMBOL Exception) if an error caused
- * the disconnect, $(D_KEYWORD null) otherwise.
- */
- void disconnected(SocketException exception) @nogc;
-}
-
-/**
- * Interface for TCP.
- */
-interface TransmissionControlProtocol : Protocol
-{
-}
diff --git a/source/tanya/async/transport.d b/source/tanya/async/transport.d deleted file mode 100644 index 87051e8..0000000 --- a/source/tanya/async/transport.d +++ /dev/null @@ -1,104 +0,0 @@ -/* This Source Code Form is subject to the terms of the Mozilla Public
- * License, v. 2.0. If a copy of the MPL was not distributed with this
- * file, You can obtain one at http://mozilla.org/MPL/2.0/. */
-
-/**
- * This module contains transports which are responsible for data dilvery
- * between two parties of an asynchronous communication.
- *
- * Copyright: Eugene Wissner 2016-2020.
- * License: $(LINK2 https://www.mozilla.org/en-US/MPL/2.0/,
- * Mozilla Public License, v. 2.0).
- * Authors: $(LINK2 mailto:info@caraus.de, Eugene Wissner)
- * Source: $(LINK2 https://github.com/caraus-ecms/tanya/blob/master/source/tanya/async/transport.d,
- * tanya/async/transport.d)
- */
-module tanya.async.transport;
-
-import tanya.async.protocol;
-import tanya.net.socket;
-
-/**
- * Base transport interface.
- */
-interface Transport
-{
-}
-
-/**
- * Interface for read-only transports.
- */
-interface ReadTransport : Transport
-{
-}
-
-/**
- * Interface for write-only transports.
- */
-interface WriteTransport : Transport
-{
- /**
- * Write some data to the transport.
- *
- * Params:
- * data = Data to send.
- */
- void write(ubyte[] data) @nogc;
-}
-
-/**
- * Represents a bidirectional transport.
- */
-interface DuplexTransport : ReadTransport, WriteTransport
-{
- /**
- * Returns: Application protocol.
- *
- * Postcondition: $(D_INLINECODE protocol !is null)
- */
- @property Protocol protocol() pure nothrow @safe @nogc
- out (protocol)
- {
- assert(protocol !is null);
- }
-
- /**
- * Switches the protocol.
- *
- * The protocol is deallocated by the event loop.
- *
- * Params:
- * protocol = Application protocol.
- *
- * Precondition: $(D_INLINECODE protocol !is null)
- */
- @property void protocol(Protocol protocol) pure nothrow @safe @nogc
- in
- {
- assert(protocol !is null);
- }
-
-
- /**
- * Returns $(D_PARAM true) if the transport is closing or closed.
- */
- bool isClosing() const pure nothrow @safe @nogc;
-
- /**
- * Close the transport.
- *
- * Buffered data will be flushed. No more data will be received.
- */
- void close() @nogc;
-}
-
-/**
- * Represents a socket transport.
- */
-interface SocketTransport : Transport
-{
- /**
- * Returns: Socket.
- */
- @property Socket socket() pure nothrow @safe @nogc;
-}
diff --git a/source/tanya/async/watcher.d b/source/tanya/async/watcher.d deleted file mode 100644 index 1159779..0000000 --- a/source/tanya/async/watcher.d +++ /dev/null @@ -1,117 +0,0 @@ -/* This Source Code Form is subject to the terms of the Mozilla Public - * License, v. 2.0. If a copy of the MPL was not distributed with this - * file, You can obtain one at http://mozilla.org/MPL/2.0/. */ - -/** - * Watchers register user's interest in some event. - * - * Copyright: Eugene Wissner 2016-2020. - * License: $(LINK2 https://www.mozilla.org/en-US/MPL/2.0/, - * Mozilla Public License, v. 2.0). - * Authors: $(LINK2 mailto:info@caraus.de, Eugene Wissner) - * Source: $(LINK2 https://github.com/caraus-ecms/tanya/blob/master/source/tanya/async/watcher.d, - * tanya/async/watcher.d) - */ -module tanya.async.watcher; - -import tanya.async.loop; -import tanya.async.protocol; -import tanya.async.transport; -import tanya.container.buffer; -import tanya.container.list; -import tanya.memory.allocator; -import tanya.net.socket; - -/** - * A watcher is an opaque structure that you allocate and register to record - * your interest in some event. - */ -abstract class Watcher -{ - /// Whether the watcher is active. - bool active; - - /** - * Invoke some action on event. - */ - void invoke() @nogc; -} - -/** - * Socket watcher. - */ -abstract class SocketWatcher : Watcher -{ - /// Watched socket. - protected Socket socket_; - - /** - * Params: - * socket = Socket. - * - * Precondition: $(D_INLINECODE socket !is null) - */ - this(Socket socket) pure nothrow @safe @nogc - in - { - assert(socket !is null); - } - do - { - socket_ = socket; - } - - /** - * Returns: Socket. - */ - @property Socket socket() pure nothrow @safe @nogc - { - return socket_; - } -} - -/** - * Connection watcher. - */ -class ConnectionWatcher : SocketWatcher -{ - /// Incoming connection queue. - DList!DuplexTransport incoming; - - private Protocol delegate() @nogc protocolFactory; - - /** - * Params: - * socket = Socket. - */ - this(Socket socket) @nogc - { - super(socket); - } - - /** - * Params: - * P = Protocol should be used. - */ - void setProtocol(P : Protocol)() @nogc - { - this.protocolFactory = () @nogc => cast(Protocol) defaultAllocator.make!P; - } - - /** - * Invokes new connection callback. - */ - override void invoke() @nogc - in - { - assert(protocolFactory !is null, "Protocol isn't set."); - } - do - { - for (; !this.incoming.empty; this.incoming.removeFront()) - { - this.incoming.front.protocol = protocolFactory(); - this.incoming.front.protocol.connected(this.incoming.front); - } - } -} diff --git a/source/tanya/net/package.d b/source/tanya/net/package.d index 37ac2cc..9626b9e 100644 --- a/source/tanya/net/package.d +++ b/source/tanya/net/package.d @@ -5,7 +5,7 @@ /** * Network programming. * - * Copyright: Eugene Wissner 2017-2020. + * Copyright: Eugene Wissner 2017-2022. * License: $(LINK2 https://www.mozilla.org/en-US/MPL/2.0/, * Mozilla Public License, v. 2.0). * Authors: $(LINK2 mailto:info@caraus.de, Eugene Wissner) @@ -17,5 +17,4 @@ module tanya.net; public import tanya.net.iface; public import tanya.net.inet; public import tanya.net.ip; -public import tanya.net.socket; public import tanya.net.uri; diff --git a/source/tanya/net/socket.d b/source/tanya/net/socket.d deleted file mode 100644 index 0bdb84a..0000000 --- a/source/tanya/net/socket.d +++ /dev/null @@ -1,1258 +0,0 @@ -/* This Source Code Form is subject to the terms of the Mozilla Public - * License, v. 2.0. If a copy of the MPL was not distributed with this - * file, You can obtain one at http://mozilla.org/MPL/2.0/. */ - -/** - * Low-level socket programming. - * - * Current API supports only server-side TCP communication. - * - * For an example of an asynchronous server refer to the documentation of the - * $(D_PSYMBOL tanya.async.loop) module. - * - * Copyright: Eugene Wissner 2016-2020. - * License: $(LINK2 https://www.mozilla.org/en-US/MPL/2.0/, - * Mozilla Public License, v. 2.0). - * Authors: $(LINK2 mailto:info@caraus.de, Eugene Wissner) - * Source: $(LINK2 https://github.com/caraus-ecms/tanya/blob/master/source/tanya/network/socket.d, - * tanya/network/socket.d) - */ -module tanya.net.socket; - -import core.stdc.errno; -import core.time; -public import std.socket : SocketOption, SocketOptionLevel; -import tanya.bitmanip; -import tanya.memory.allocator; -import tanya.meta.trait; -import tanya.os.error; -import tanya.net.iface; -import tanya.net.ip; - -/// Value returned by socket operations on error. -enum int socketError = -1; - -version (Posix) -{ - import core.stdc.errno; - import core.sys.posix.fcntl; - import core.sys.posix.netdb; - import core.sys.posix.netinet.in_; - import core.sys.posix.sys.socket; - import core.sys.posix.sys.time; - import core.sys.posix.unistd; - - enum SocketType : int - { - init = -1, - } - - private alias LingerField = int; -} -else version (Windows) -{ - import core.sys.windows.winbase; - import core.sys.windows.winerror; - import core.sys.windows.winsock2 : accept, - addrinfo, - bind, - closesocket, - FIONBIO, - freeaddrinfo, - getaddrinfo, - getsockopt, - ioctlsocket, - listen, - MSG_DONTROUTE, - MSG_OOB, - MSG_PEEK, - recv, - SD_BOTH, - SD_RECEIVE, - SD_SEND, - send, - setsockopt, - shutdown, - SO_TYPE, - SOCKADDR, - sockaddr, - sockaddr_in, - sockaddr_in6, - SOCKADDR_STORAGE, - socket, - socklen_t, - SOL_SOCKET, - WSAEWOULDBLOCK, - WSAGetLastError; - import tanya.async.iocp; - import tanya.sys.windows.def; - public import tanya.sys.windows.winbase; - public import tanya.sys.windows.winsock2; - - enum SocketType : size_t - { - init = ~0, - } - - private alias LingerField = ushort; - - enum OverlappedSocketEvent - { - accept = 1, - read = 2, - write = 3, - } - - class SocketState : State - { - private WSABUF buffer; - } - - /** - * Socket returned if a connection has been established. - * - * Note: Available only on Windows. - */ - class OverlappedConnectedSocket : ConnectedSocket - { - /** - * Create a socket. - * - * Params: - * handle = Socket handle. - * af = Address family. - */ - this(SocketType handle, AddressFamily af) @nogc - { - super(handle, af); - } - - /** - * Begins to asynchronously receive data from a connected socket. - * - * Params: - * buffer = Storage location for the received data. - * flags = Flags. - * overlapped = Unique operation identifier. - * - * Returns: $(D_KEYWORD true) if the operation could be finished synchronously. - * $(D_KEYWORD false) otherwise. - * - * Throws: $(D_PSYMBOL SocketException) if unable to receive. - */ - bool beginReceive(ubyte[] buffer, - SocketState overlapped, - Flags flags = Flags(Flag.none)) @nogc @trusted - { - auto receiveFlags = cast(DWORD) flags; - - overlapped.handle = cast(HANDLE) handle_; - overlapped.event = OverlappedSocketEvent.read; - overlapped.buffer.len = cast(ULONG) buffer.length; - overlapped.buffer.buf = cast(char*) buffer.ptr; - - auto result = WSARecv(handle_, - cast(WSABUF*) &overlapped.buffer, - 1u, - null, - &receiveFlags, - &overlapped.overlapped, - null); - - if (result == socketError && !wouldHaveBlocked) - { - throw defaultAllocator.make!SocketException("Unable to receive"); - } - return result == 0; - } - - /** - * Ends a pending asynchronous read. - * - * Params: - * overlapped = Unique operation identifier. - * - * Returns: Number of bytes received. - * - * Throws: $(D_PSYMBOL SocketException) if unable to receive. - * - * Postcondition: $(D_INLINECODE result >= 0). - */ - int endReceive(SocketState overlapped) @nogc @trusted - out (count) - { - assert(count >= 0); - } - do - { - DWORD lpNumber; - BOOL result = GetOverlappedResult(overlapped.handle, - &overlapped.overlapped, - &lpNumber, - FALSE); - if (result == FALSE && !wouldHaveBlocked) - { - disconnected_ = true; - throw defaultAllocator.make!SocketException("Unable to receive"); - } - if (lpNumber == 0) - { - disconnected_ = true; - } - return lpNumber; - } - - /** - * Sends data asynchronously to a connected socket. - * - * Params: - * buffer = Data to be sent. - * flags = Flags. - * overlapped = Unique operation identifier. - * - * Returns: $(D_KEYWORD true) if the operation could be finished synchronously. - * $(D_KEYWORD false) otherwise. - * - * Throws: $(D_PSYMBOL SocketException) if unable to send. - */ - bool beginSend(ubyte[] buffer, - SocketState overlapped, - Flags flags = Flags(Flag.none)) @nogc @trusted - { - overlapped.handle = cast(HANDLE) handle_; - overlapped.event = OverlappedSocketEvent.write; - overlapped.buffer.len = cast(ULONG) buffer.length; - overlapped.buffer.buf = cast(char*) buffer.ptr; - - auto result = WSASend(handle_, - &overlapped.buffer, - 1u, - null, - cast(DWORD) flags, - &overlapped.overlapped, - null); - - if (result == socketError && !wouldHaveBlocked) - { - disconnected_ = true; - throw defaultAllocator.make!SocketException("Unable to send"); - } - return result == 0; - } - - /** - * Ends a pending asynchronous send. - * - * Params: - * overlapped = Unique operation identifier. - * - * Returns: Number of bytes sent. - * - * Throws: $(D_PSYMBOL SocketException) if unable to receive. - * - * Postcondition: $(D_INLINECODE result >= 0). - */ - int endSend(SocketState overlapped) @nogc @trusted - out (count) - { - assert(count >= 0); - } - do - { - DWORD lpNumber; - BOOL result = GetOverlappedResult(overlapped.handle, - &overlapped.overlapped, - &lpNumber, - FALSE); - if (result == FALSE && !wouldHaveBlocked) - { - disconnected_ = true; - throw defaultAllocator.make!SocketException("Unable to receive"); - } - return lpNumber; - } - } - - /** - * Windows stream socket overlapped I/O. - */ - class OverlappedStreamSocket : StreamSocket - { - // Accept extension function pointer. - package LPFN_ACCEPTEX acceptExtension; - - /** - * Create a socket. - * - * Params: - * af = Address family. - * - * Throws: $(D_PSYMBOL SocketException) on errors. - */ - this(AddressFamily af) @nogc @trusted - { - super(af); - scope (failure) - { - this.close(); - } - blocking = false; - - GUID guidAcceptEx = WSAID_ACCEPTEX; - DWORD dwBytes; - - auto result = WSAIoctl(handle_, - SIO_GET_EXTENSION_FUNCTION_POINTER, - &guidAcceptEx, - guidAcceptEx.sizeof, - &acceptExtension, - acceptExtension.sizeof, - &dwBytes, - null, - null); - if (!result == socketError) - { - throw make!SocketException(defaultAllocator, - "Unable to retrieve an accept extension function pointer"); - } - } - - /** - * Begins an asynchronous operation to accept an incoming connection attempt. - * - * Params: - * overlapped = Unique operation identifier. - * - * Returns: $(D_KEYWORD true) if the operation could be finished synchronously. - * $(D_KEYWORD false) otherwise. - * - * Throws: $(D_PSYMBOL SocketException) on accept errors. - */ - bool beginAccept(SocketState overlapped) @nogc @trusted - { - auto socket = cast(SocketType) socket(addressFamily, 1, 0); - if (socket == SocketType.init) - { - throw defaultAllocator.make!SocketException("Unable to create socket"); - } - scope (failure) - { - closesocket(socket); - } - DWORD dwBytes; - overlapped.handle = cast(HANDLE) socket; - overlapped.event = OverlappedSocketEvent.accept; - - const len = (sockaddr_in.sizeof + 16) * 2; - overlapped.buffer.len = len; - overlapped.buffer.buf = cast(char*) defaultAllocator.allocate(len).ptr; - - // We don't want to get any data now, but only start to accept the connections - BOOL result = acceptExtension(handle_, - socket, - overlapped.buffer.buf, - 0u, - sockaddr_in.sizeof + 16, - sockaddr_in.sizeof + 16, - &dwBytes, - &overlapped.overlapped); - if (result == FALSE && !wouldHaveBlocked) - { - throw defaultAllocator.make!SocketException("Unable to accept socket connection"); - } - return result == TRUE; - } - - /** - * Asynchronously accepts an incoming connection attempt and creates a - * new socket to handle remote host communication. - * - * Params: - * overlapped = Unique operation identifier. - * - * Returns: Connected socket. - * - * Throws: $(D_PSYMBOL SocketException) if unable to accept. - */ - OverlappedConnectedSocket endAccept(SocketState overlapped) - @nogc @trusted - { - scope (exit) - { - defaultAllocator.dispose(overlapped.buffer.buf[0 .. overlapped.buffer.len]); - } - auto socket = make!OverlappedConnectedSocket(defaultAllocator, - cast(SocketType) overlapped.handle, - addressFamily); - scope (failure) - { - defaultAllocator.dispose(socket); - } - socket.setOption(SocketOptionLevel.SOCKET, - cast(SocketOption) SO_UPDATE_ACCEPT_CONTEXT, - cast(size_t) handle); - return socket; - } - } -} - -/** - * Socket option that specifies what should happen when the socket that - * promises reliable delivery still has untransmitted messages when - * it is closed. - */ -struct Linger -{ - /// If nonzero, $(D_PSYMBOL close) and $(D_PSYMBOL shutdown) block until - /// the data are transmitted or the timeout period has expired. - LingerField l_onoff; - - /// Time, in seconds to wait before any buffered data to be sent is - /// discarded. - LingerField l_linger; - - /** - * If $(D_PARAM timeout) is `0`, linger is disabled, otherwise enables the - * linger and sets the timeout. - * - * Params: - * timeout = Timeout, in seconds. - */ - this(const ushort timeout) - { - time = timeout; - } - - /// - unittest - { - { - auto linger = Linger(5); - assert(linger.enabled); - assert(linger.time == 5); - } - { - auto linger = Linger(0); - assert(!linger.enabled); - } - { // Default constructor. - Linger linger; - assert(!linger.enabled); - } - } - - /** - * System dependent constructor. - * - * Params: - * l_onoff = $(D_PSYMBOL l_onoff) value. - * l_linger = $(D_PSYMBOL l_linger) value. - */ - this(LingerField l_onoff, LingerField l_linger) - { - this.l_onoff = l_onoff; - this.l_linger = l_linger; - } - - /// - unittest - { - auto linger = Linger(1, 5); - assert(linger.l_onoff == 1); - assert(linger.l_linger == 5); - } - - /** - * Params: - * value = Whether to linger after the socket is closed. - * - * See_Also: $(D_PSYMBOL time). - */ - @property void enabled(const bool value) pure nothrow @safe @nogc - { - this.l_onoff = value; - } - - /** - * Returns: Whether to linger after the socket is closed. - */ - @property bool enabled() const pure nothrow @safe @nogc - { - return this.l_onoff != 0; - } - - /** - * Returns: Timeout period, in seconds, to wait before closing the socket - * if the $(D_PSYMBOL Linger) is $(D_PSYMBOL enabled). - */ - @property ushort time() const pure nothrow @safe @nogc - { - return this.l_linger & ushort.max; - } - - /** - * Sets timeout period, to wait before closing the socket if the - * $(D_PSYMBOL Linger) is $(D_PSYMBOL enabled), ignored otherwise. - * - * Params: - * timeout = Timeout period, in seconds. - */ - @property void time(const ushort timeout) pure nothrow @safe @nogc - { - this.l_onoff = timeout > 0; - this.l_linger = timeout; - } -} - -version (linux) -{ - enum SOCK_NONBLOCK = O_NONBLOCK; - extern(C) int accept4(int, sockaddr*, socklen_t*, int flags) @nogc nothrow; -} -else version (OSX) -{ - version = MacBSD; -} -else version (iOS) -{ - version = MacBSD; -} -else version (FreeBSD) -{ - version = MacBSD; -} -else version (OpenBSD) -{ - version = MacBSD; -} -else version (DragonFlyBSD) -{ - version = MacBSD; -} - -version (MacBSD) -{ - enum ESOCKTNOSUPPORT = 44; // Socket type not suppoted. -} - -private immutable -{ - typeof(&getaddrinfo) getaddrinfoPointer; - typeof(&freeaddrinfo) freeaddrinfoPointer; -} - -shared static this() -{ - version (Windows) - { - auto ws2Lib = GetModuleHandle("ws2_32.dll"); - - getaddrinfoPointer = cast(typeof(getaddrinfoPointer)) - GetProcAddress(ws2Lib, "getaddrinfo"); - freeaddrinfoPointer = cast(typeof(freeaddrinfoPointer)) - GetProcAddress(ws2Lib, "freeaddrinfo"); - } - else version (Posix) - { - getaddrinfoPointer = &getaddrinfo; - freeaddrinfoPointer = &freeaddrinfo; - } -} - -/** - * $(D_PSYMBOL SocketException) should be thrown only if one of the socket functions - * $(D_PSYMBOL socketError) and sets $(D_PSYMBOL errno), because - * $(D_PSYMBOL SocketException) relies on the $(D_PSYMBOL errno) value. - */ -class SocketException : Exception -{ - const ErrorCode.ErrorNo error = ErrorCode.ErrorNo.success; - - /** - * Params: - * msg = The message for the exception. - * file = The file where the exception occurred. - * line = The line number where the exception occurred. - * next = The previous exception in the chain of exceptions, if any. - */ - this(string msg, - string file = __FILE__, - size_t line = __LINE__, - Throwable next = null) @nogc @safe nothrow - { - super(msg, file, line, next); - - foreach (member; EnumMembers!(ErrorCode.ErrorNo)) - { - if (member == lastError) - { - error = member; - return; - } - } - if (lastError == ENOMEM) - { - error = ErrorCode.ErrorNo.noBufferSpace; - } - else if (lastError == EMFILE) - { - error = ErrorCode.ErrorNo.tooManyDescriptors; - } - else version (linux) - { - if (lastError == ENOSR) - { - error = ErrorCode.ErrorNo.networkDown; - } - } - else version (Posix) - { - if (lastError == EPROTO) - { - error = ErrorCode.ErrorNo.networkDown; - } - } - } -} - -/** - * Class for creating a network communication endpoint using the Berkeley - * sockets interfaces of different types. - */ -abstract class Socket -{ - version (Posix) - { - /** - * How a socket is shutdown. - */ - enum Shutdown : int - { - receive = SHUT_RD, /// Socket receives are disallowed - send = SHUT_WR, /// Socket sends are disallowed - both = SHUT_RDWR, /// Both receive and send - } - } - else version (Windows) - { - /// Property to get or set whether the socket is blocking or nonblocking. - private bool blocking_ = true; - - /** - * How a socket is shutdown. - */ - enum Shutdown : int - { - receive = SD_RECEIVE, /// Socket receives are disallowed. - send = SD_SEND, /// Socket sends are disallowed. - both = SD_BOTH, /// Both receive and send. - } - - // The WinSock timeouts seem to be effectively skewed by a constant - // offset of about half a second (in milliseconds). - private enum WINSOCK_TIMEOUT_SKEW = 500; - } - - /// Socket handle. - protected SocketType handle_; - - /// Address family. - protected AddressFamily family; - - private @property void handle(SocketType handle) @nogc - in - { - assert(handle != SocketType.init); - assert(handle_ == SocketType.init, "Socket handle cannot be changed"); - } - do - { - handle_ = handle; - - // Set the option to disable SIGPIPE on send() if the platform - // has it (e.g. on OS X). - static if (is(typeof(SO_NOSIGPIPE))) - { - setOption(SocketOptionLevel.SOCKET, cast(SocketOption)SO_NOSIGPIPE, true); - } - } - - @property inout(SocketType) handle() inout const pure nothrow @safe @nogc - { - return handle_; - } - - /** - * Create a socket. - * - * Params: - * handle = Socket. - * af = Address family. - */ - this(SocketType handle, AddressFamily af) @nogc - in - { - assert(handle != SocketType.init); - } - do - { - scope (failure) - { - this.close(); - } - this.handle = handle; - family = af; - } - - /** - * Closes the socket and calls the destructor on itself. - */ - ~this() nothrow @trusted @nogc - { - this.close(); - } - - /** - * Get a socket option. - * - * Params: - * level = Protocol level at that the option exists. - * option = Option. - * result = Buffer to save the result. - * - * Returns: The number of bytes written to $(D_PARAM result). - * - * Throws: $(D_PSYMBOL SocketException) on error. - */ - protected int getOption(SocketOptionLevel level, - SocketOption option, - void[] result) const @trusted @nogc - { - auto length = cast(socklen_t) result.length; - if (getsockopt(handle_, - cast(int) level, - cast(int) option, - result.ptr, - &length) == socketError) - { - throw defaultAllocator.make!SocketException("Unable to get socket option"); - } - return length; - } - - /// Ditto. - int getOption(SocketOptionLevel level, - SocketOption option, - out size_t result) const @trusted @nogc - { - return getOption(level, option, (&result)[0 .. 1]); - } - - /// Ditto. - int getOption(SocketOptionLevel level, - SocketOption option, - out Linger result) const @trusted @nogc - { - return getOption(level, option, (&result)[0 .. 1]); - } - - /// Ditto. - int getOption(SocketOptionLevel level, - SocketOption option, - out Duration result) const @trusted @nogc - { - // WinSock returns the timeout values as a milliseconds DWORD, - // while Linux and BSD return a timeval struct. - version (Posix) - { - timeval tv; - auto ret = getOption(level, option, (&tv)[0 .. 1]); - result = dur!"seconds"(tv.tv_sec) + dur!"usecs"(tv.tv_usec); - } - else version (Windows) - { - int msecs; - auto ret = getOption(level, option, (&msecs)[0 .. 1]); - if (option == SocketOption.RCVTIMEO) - { - msecs += WINSOCK_TIMEOUT_SKEW; - } - result = dur!"msecs"(msecs); - } - return ret; - } - - /** - * Set a socket option. - * - * Params: - * level = Protocol level at that the option exists. - * option = Option. - * value = Option value. - * - * Throws: $(D_PSYMBOL SocketException) on error. - */ - protected void setOption(SocketOptionLevel level, - SocketOption option, - void[] value) const @trusted @nogc - { - if (setsockopt(handle_, - cast(int)level, - cast(int)option, - value.ptr, - cast(uint) value.length) == socketError) - { - throw defaultAllocator.make!SocketException("Unable to set socket option"); - } - } - - /// Ditto. - void setOption(SocketOptionLevel level, SocketOption option, size_t value) - const @trusted @nogc - { - setOption(level, option, (&value)[0 .. 1]); - } - - /// Ditto. - void setOption(SocketOptionLevel level, SocketOption option, Linger value) - const @trusted @nogc - { - setOption(level, option, (&value)[0 .. 1]); - } - - /// Ditto. - void setOption(SocketOptionLevel level, SocketOption option, Duration value) - const @trusted @nogc - { - version (Posix) - { - timeval tv; - value.split!("seconds", "usecs")(tv.tv_sec, tv.tv_usec); - setOption(level, option, (&tv)[0 .. 1]); - } - else version (Windows) - { - auto msecs = cast(int) value.total!"msecs"; - if (msecs > 0 && option == SocketOption.RCVTIMEO) - { - msecs = max(1, msecs - WINSOCK_TIMEOUT_SKEW); - } - setOption(level, option, msecs); - } - } - - /** - * Returns: Socket's blocking flag. - */ - @property inout(bool) blocking() inout const nothrow @nogc - { - version (Posix) - { - return !(fcntl(handle_, F_GETFL, 0) & O_NONBLOCK); - } - else version (Windows) - { - return this.blocking_; - } - } - - /** - * Params: - * yes = Socket's blocking flag. - */ - @property void blocking(bool yes) @nogc - { - version (Posix) - { - int fl = fcntl(handle_, F_GETFL, 0); - - if (fl != socketError) - { - fl = yes ? fl & ~O_NONBLOCK : fl | O_NONBLOCK; - fl = fcntl(handle_, F_SETFL, fl); - } - if (fl == socketError) - { - throw make!SocketException(defaultAllocator, - "Unable to set socket blocking"); - } - } - else version (Windows) - { - uint num = !yes; - if (ioctlsocket(handle_, FIONBIO, &num) == socketError) - { - throw make!SocketException(defaultAllocator, - "Unable to set socket blocking"); - } - this.blocking_ = yes; - } - } - - /** - * Returns: The socket's address family. - */ - @property AddressFamily addressFamily() const @nogc @safe pure nothrow - { - return family; - } - - /** - * Returns: $(D_KEYWORD true) if this is a valid, alive socket. - */ - @property bool isAlive() @trusted const nothrow @nogc - { - int type; - socklen_t typesize = cast(socklen_t) type.sizeof; - return !getsockopt(handle_, SOL_SOCKET, SO_TYPE, cast(char*)&type, &typesize); - } - - /** - * Disables sends and/or receives. - * - * Params: - * how = What to disable. - * - * See_Also: - * $(D_PSYMBOL Shutdown) - */ - void shutdown(Shutdown how = Shutdown.both) @nogc @trusted const nothrow - { - .shutdown(handle_, cast(int)how); - } - - /** - * Immediately drop any connections and release socket resources. - * Calling $(D_PSYMBOL shutdown) before $(D_PSYMBOL close) is recommended - * for connection-oriented sockets. The $(D_PSYMBOL Socket) object is no - * longer usable after $(D_PSYMBOL close). - */ - void close() nothrow @trusted @nogc - { - version(Windows) - { - .closesocket(handle_); - } - else version(Posix) - { - .close(handle_); - } - handle_ = SocketType.init; - } - - /** - * Listen for an incoming connection. $(D_PSYMBOL bind) must be called before you - * can $(D_PSYMBOL listen). - * - * Params: - * backlog = Request of how many pending incoming connections are - * queued until $(D_PSYMBOL accept)ed. - */ - void listen(int backlog) const @trusted @nogc - { - if (.listen(handle_, backlog) == socketError) - { - throw defaultAllocator.make!SocketException("Unable to listen on socket"); - } - } - - /** - * Compare handles. - * - * Params: - * that = Another handle. - * - * Returns: Comparision result. - */ - int opCmp(size_t that) const pure nothrow @safe @nogc - { - return handle_ < that ? -1 : handle_ > that ? 1 : 0; - } -} - -/** - * Interface with common fileds for stream and connected sockets. - */ -interface ConnectionOrientedSocket -{ - /** - * Flags may be OR'ed together. - */ - enum Flag : int - { - /// No flags specified. - none = 0, - /// Out-of-band stream data. - outOfBand = MSG_OOB, - /// Peek at incoming data without removing it from the queue, only for receiving. - peek = MSG_PEEK, - /// Data should not be subject to routing; this flag may be ignored. Only for sending. - dontRoute = MSG_DONTROUTE, - } - - alias Flags = BitFlags!Flag; -} - -class StreamSocket : Socket, ConnectionOrientedSocket -{ - /** - * Create a socket. - * - * Params: - * af = Address family. - */ - this(AddressFamily af) @trusted @nogc - { - auto handle = cast(SocketType) socket(af, 1, 0); - if (handle == SocketType.init) - { - throw defaultAllocator.make!SocketException("Unable to create socket"); - } - super(handle, af); - } - - /** - * Associate a local address with this socket. - * - * Params: - * endpoint = Local address. - * - * Throws: $(D_PSYMBOL SocketException) if unable to bind. - */ - void bind(const Endpoint endpoint) const @nogc - { - if (.bind(handle_, cast(const(sockaddr)*) &endpoint, Endpoint.sizeof)) - { - throw defaultAllocator.make!SocketException("Unable to bind socket"); - } - } - - /** - * Accept an incoming connection. - * - * The blocking mode is always inherited. - * - * Returns: $(D_PSYMBOL Socket) for the accepted connection or - * $(D_KEYWORD null) if the call would block on a - * non-blocking socket. - * - * Throws: $(D_PSYMBOL SocketException) if unable to accept. - */ - ConnectedSocket accept() @trusted @nogc - { - SocketType sock; - - version (linux) - { - int flags; - if (!blocking) - { - flags |= SOCK_NONBLOCK; - } - sock = cast(SocketType).accept4(handle_, null, null, flags); - } - else - { - sock = cast(SocketType).accept(handle_, null, null); - } - - if (sock == SocketType.init) - { - if (wouldHaveBlocked()) - { - return null; - } - throw make!SocketException(defaultAllocator, - "Unable to accept socket connection"); - } - - auto newSocket = defaultAllocator.make!ConnectedSocket(sock, addressFamily); - - version (linux) - { // Blocking mode already set - } - else version (Posix) - { - if (!blocking) - { - try - { - newSocket.blocking = blocking; - } - catch (SocketException e) - { - defaultAllocator.dispose(newSocket); - throw e; - } - } - } - else version (Windows) - { // Inherits blocking mode - newSocket.blocking_ = blocking; - } - return newSocket; - } -} - -/** - * Socket returned if a connection has been established. - */ -class ConnectedSocket : Socket, ConnectionOrientedSocket -{ - /** - * $(D_KEYWORD true) if the stream socket peer has performed an orderly - * shutdown. - */ - protected bool disconnected_; - - /** - * Returns: $(D_KEYWORD true) if the stream socket peer has performed an orderly - * shutdown. - */ - @property inout(bool) disconnected() inout const pure nothrow @safe @nogc - { - return disconnected_; - } - - /** - * Create a socket. - * - * Params: - * handle = Socket. - * af = Address family. - */ - this(SocketType handle, AddressFamily af) @nogc - { - super(handle, af); - } - - version (Windows) - { - private static int capToMaxBuffer(size_t size) pure nothrow @safe @nogc - { - // Windows uses int instead of size_t for length arguments. - // Luckily, the send/recv functions make no guarantee that - // all the data is sent, so we use that to send at most - // int.max bytes. - return size > size_t (int.max) ? int.max : cast(int) size; - } - } - else - { - private static size_t capToMaxBuffer(size_t size) pure nothrow @safe @nogc - { - return size; - } - } - - /** - * Receive data on the connection. - * - * Params: - * buf = Buffer to save received data. - * flags = Flags. - * - * Returns: The number of bytes received or 0 if nothing received - * because the call would block. - * - * Throws: $(D_PSYMBOL SocketException) if unable to receive. - */ - ptrdiff_t receive(ubyte[] buf, Flags flags = Flag.none) @trusted @nogc - { - ptrdiff_t ret; - if (!buf.length) - { - return 0; - } - - ret = recv(handle_, buf.ptr, capToMaxBuffer(buf.length), cast(int) flags); - if (ret == 0) - { - disconnected_ = true; - } - else if (ret == socketError) - { - if (wouldHaveBlocked()) - { - return 0; - } - disconnected_ = true; - throw defaultAllocator.make!SocketException("Unable to receive"); - } - return ret; - } - - /** - * Send data on the connection. If the socket is blocking and there is no - * buffer space left, $(D_PSYMBOL send) waits, non-blocking socket returns - * 0 in this case. - * - * Params: - * buf = Data to be sent. - * flags = Flags. - * - * Returns: The number of bytes actually sent. - * - * Throws: $(D_PSYMBOL SocketException) if unable to send. - */ - ptrdiff_t send(const(ubyte)[] buf, Flags flags = Flag.none) - const @trusted @nogc - { - int sendFlags = cast(int) flags; - ptrdiff_t sent; - - static if (is(typeof(MSG_NOSIGNAL))) - { - sendFlags |= MSG_NOSIGNAL; - } - - sent = .send(handle_, buf.ptr, capToMaxBuffer(buf.length), sendFlags); - if (sent != socketError) - { - return sent; - } - else if (wouldHaveBlocked()) - { - return 0; - } - throw defaultAllocator.make!SocketException("Unable to send"); - } -} - -/** - * Checks if the last error is a serious error or just a special - * behaviour error of non-blocking sockets (for example an error - * returned because the socket would block or because the - * asynchronous operation was successfully started but not finished yet). - * - * Returns: $(D_KEYWORD false) if a serious error happened, $(D_KEYWORD true) - * otherwise. - */ -bool wouldHaveBlocked() nothrow @trusted @nogc -{ - version (Posix) - { - return errno == EAGAIN || errno == EWOULDBLOCK; - } - else version (Windows) - { - return WSAGetLastError() == ERROR_IO_PENDING - || WSAGetLastError() == WSAEWOULDBLOCK - || WSAGetLastError() == ERROR_IO_INCOMPLETE; - } -} - -/** - * Returns: Platform specific error code. - */ -private @property int lastError() nothrow @safe @nogc -{ - version (Windows) - { - return WSAGetLastError(); - } - else - { - return errno; - } -} |
