summaryrefslogtreecommitdiff
path: root/source
diff options
context:
space:
mode:
Diffstat (limited to 'source')
-rw-r--r--source/tanya/async/event/epoll.d187
-rw-r--r--source/tanya/async/event/iocp.d390
-rw-r--r--source/tanya/async/event/kqueue.d331
-rw-r--r--source/tanya/async/event/selector.d407
-rw-r--r--source/tanya/async/iocp.d56
-rw-r--r--source/tanya/async/loop.d352
-rw-r--r--source/tanya/async/package.d20
-rw-r--r--source/tanya/async/protocol.d58
-rw-r--r--source/tanya/async/transport.d104
-rw-r--r--source/tanya/async/watcher.d117
-rw-r--r--source/tanya/net/package.d3
-rw-r--r--source/tanya/net/socket.d1258
12 files changed, 1 insertions, 3282 deletions
diff --git a/source/tanya/async/event/epoll.d b/source/tanya/async/event/epoll.d
deleted file mode 100644
index c81d6ff..0000000
--- a/source/tanya/async/event/epoll.d
+++ /dev/null
@@ -1,187 +0,0 @@
-/* This Source Code Form is subject to the terms of the Mozilla Public
- * License, v. 2.0. If a copy of the MPL was not distributed with this
- * file, You can obtain one at http://mozilla.org/MPL/2.0/. */
-
-/**
- * Event loop implementation for Linux.
- *
- * Copyright: Eugene Wissner 2016-2020.
- * License: $(LINK2 https://www.mozilla.org/en-US/MPL/2.0/,
- * Mozilla Public License, v. 2.0).
- * Authors: $(LINK2 mailto:info@caraus.de, Eugene Wissner)
- * Source: $(LINK2 https://github.com/caraus-ecms/tanya/blob/master/source/tanya/async/event/epoll.d,
- * tanya/async/event/epoll.d)
- */
-module tanya.async.event.epoll;
-
-version (D_Ddoc)
-{
-}
-else version (linux):
-
-import core.stdc.errno;
-public import core.sys.linux.epoll;
-import core.sys.posix.unistd;
-import core.time;
-import std.algorithm.comparison;
-import tanya.async.event.selector;
-import tanya.async.loop;
-import tanya.async.protocol;
-import tanya.async.transport;
-import tanya.async.watcher;
-import tanya.container.array;
-import tanya.memory.allocator;
-import tanya.net.socket;
-
-extern (C) nothrow @nogc
-{
- int epoll_create1(int flags);
- int epoll_ctl (int epfd, int op, int fd, epoll_event *event);
- int epoll_wait (int epfd, epoll_event *events, int maxevents, int timeout);
-}
-
-final class EpollLoop : SelectorLoop
-{
- protected int fd;
- private Array!epoll_event events;
-
- /**
- * Initializes the loop.
- */
- this() @nogc
- {
- if ((fd = epoll_create1(EPOLL_CLOEXEC)) < 0)
- {
- throw defaultAllocator.make!BadLoopException("epoll initialization failed");
- }
- super();
- events = Array!epoll_event(maxEvents);
- }
-
- /**
- * Frees loop internals.
- */
- ~this() @nogc
- {
- close(fd);
- }
-
- /**
- * Should be called if the backend configuration changes.
- *
- * Params:
- * watcher = Watcher.
- * oldEvents = The events were already set.
- * events = The events should be set.
- *
- * Returns: $(D_KEYWORD true) if the operation was successful.
- */
- protected override bool reify(SocketWatcher watcher,
- EventMask oldEvents,
- EventMask events) @nogc
- {
- int op = EPOLL_CTL_DEL;
- epoll_event ev;
-
- if (events == oldEvents)
- {
- return true;
- }
- if (events && oldEvents)
- {
- op = EPOLL_CTL_MOD;
- }
- else if (events && !oldEvents)
- {
- op = EPOLL_CTL_ADD;
- }
-
- ev.data.fd = watcher.socket.handle;
- ev.events = (events & (Event.read | Event.accept) ? EPOLLIN | EPOLLPRI : 0)
- | (events & Event.write ? EPOLLOUT : 0)
- | EPOLLET;
-
- return epoll_ctl(fd, op, watcher.socket.handle, &ev) == 0;
- }
-
- /**
- * Does the actual polling.
- */
- protected override void poll() @nogc
- {
- // Don't block
- immutable timeout = cast(immutable int) blockTime.total!"msecs";
- auto eventCount = epoll_wait(fd, events.get().ptr, maxEvents, timeout);
-
- if (eventCount < 0)
- {
- if (errno != EINTR)
- {
- throw defaultAllocator.make!BadLoopException();
- }
- return;
- }
-
- for (auto i = 0; i < eventCount; ++i)
- {
- auto transport = cast(StreamTransport) connections[events[i].data.fd];
-
- if (transport is null)
- {
- auto connection = cast(ConnectionWatcher) connections[events[i].data.fd];
- assert(connection !is null);
-
- acceptConnections(connection);
- }
- else if (events[i].events & EPOLLERR)
- {
- kill(transport);
- continue;
- }
- else if (events[i].events & (EPOLLIN | EPOLLPRI | EPOLLHUP))
- {
- SocketException exception;
- try
- {
- ptrdiff_t received;
- do
- {
- received = transport.socket.receive(transport.output[]);
- transport.output += received;
- }
- while (received);
- }
- catch (SocketException e)
- {
- exception = e;
- }
- if (transport.socket.disconnected)
- {
- kill(transport, exception);
- continue;
- }
- else if (transport.output.length)
- {
- pendings.insertBack(transport);
- }
- }
- if (events[i].events & EPOLLOUT)
- {
- transport.writeReady = true;
- if (transport.input.length)
- {
- feed(transport);
- }
- }
- }
- }
-
- /**
- * Returns: The blocking time.
- */
- override protected @property inout(Duration) blockTime()
- inout @safe pure nothrow
- {
- return min(super.blockTime, 1.dur!"seconds");
- }
-}
diff --git a/source/tanya/async/event/iocp.d b/source/tanya/async/event/iocp.d
deleted file mode 100644
index df8c8d6..0000000
--- a/source/tanya/async/event/iocp.d
+++ /dev/null
@@ -1,390 +0,0 @@
-/* This Source Code Form is subject to the terms of the Mozilla Public
- * License, v. 2.0. If a copy of the MPL was not distributed with this
- * file, You can obtain one at http://mozilla.org/MPL/2.0/. */
-
-/**
- * Event loop implementation for Windows.
- *
- * Copyright: Eugene Wissner 2016-2020.
- * License: $(LINK2 https://www.mozilla.org/en-US/MPL/2.0/,
- * Mozilla Public License, v. 2.0).
- * Authors: $(LINK2 mailto:info@caraus.de, Eugene Wissner)
- * Source: $(LINK2 https://github.com/caraus-ecms/tanya/blob/master/source/tanya/async/event/iocp.d,
- * tanya/async/event/iocp.d)
- */
-module tanya.async.event.iocp;
-
-version (D_Ddoc)
-{
-}
-else version (Windows):
-
-import core.sys.windows.mswsock;
-import core.sys.windows.winsock2;
-import tanya.async.loop;
-import tanya.async.protocol;
-import tanya.async.transport;
-import tanya.async.watcher;
-import tanya.container.buffer;
-import tanya.memory.allocator;
-import tanya.network.socket;
-import tanya.sys.windows.winbase;
-
-/**
- * Transport for stream sockets.
- */
-final class StreamTransport : SocketWatcher, DuplexTransport, SocketTransport
-{
- private SocketException exception;
-
- private ReadBuffer!ubyte output;
-
- private WriteBuffer!ubyte input;
-
- private Protocol protocol_;
-
- private bool closing;
-
- /**
- * Creates new completion port transport.
- *
- * Params:
- * socket = Socket.
- *
- * Precondition: $(D_INLINECODE socket !is null)
- */
- this(OverlappedConnectedSocket socket) @nogc
- {
- super(socket);
- output = ReadBuffer!ubyte(8192, 1024);
- input = WriteBuffer!ubyte(8192);
- active = true;
- }
-
- /**
- * Returns: Socket.
- *
- * Postcondition: $(D_INLINECODE socket !is null)
- */
- override @property OverlappedConnectedSocket socket() pure nothrow @safe @nogc
- out (socket)
- {
- assert(socket !is null);
- }
- do
- {
- return cast(OverlappedConnectedSocket) socket_;
- }
-
- /**
- * Returns $(D_PARAM true) if the transport is closing or closed.
- */
- bool isClosing() const pure nothrow @safe @nogc
- {
- return closing;
- }
-
- /**
- * Close the transport.
- *
- * Buffered data will be flushed. No more data will be received.
- */
- void close() pure nothrow @safe @nogc
- {
- closing = true;
- }
-
- /**
- * Write some data to the transport.
- *
- * Params:
- * data = Data to send.
- */
- void write(ubyte[] data) @nogc
- {
- input ~= data;
- }
-
- /**
- * Returns: Application protocol.
- */
- @property Protocol protocol() pure nothrow @safe @nogc
- {
- return protocol_;
- }
-
- /**
- * Switches the protocol.
- *
- * The protocol is deallocated by the event loop.
- *
- * Params:
- * protocol = Application protocol.
- *
- * Precondition: $(D_INLINECODE protocol !is null)
- */
- @property void protocol(Protocol protocol) pure nothrow @safe @nogc
- in
- {
- assert(protocol !is null);
- }
- do
- {
- protocol_ = protocol;
- }
-
- /**
- * Invokes the watcher callback.
- */
- override void invoke() @nogc
- {
- if (output.length)
- {
- immutable empty = input.length == 0;
- protocol.received(output[0 .. $]);
- output.clear();
- if (empty)
- {
- SocketState overlapped;
- try
- {
- overlapped = defaultAllocator.make!SocketState;
- socket.beginSend(input[], overlapped);
- }
- catch (SocketException e)
- {
- defaultAllocator.dispose(overlapped);
- defaultAllocator.dispose(e);
- }
- }
- }
- else
- {
- protocol.disconnected(exception);
- defaultAllocator.dispose(protocol_);
- defaultAllocator.dispose(exception);
- active = false;
- }
- }
-}
-
-final class IOCPLoop : Loop
-{
- protected HANDLE completionPort;
-
- protected OVERLAPPED overlap;
-
- /**
- * Initializes the loop.
- */
- this() @nogc
- {
- super();
-
- completionPort = CreateIoCompletionPort(INVALID_HANDLE_VALUE, null, 0, 0);
- if (!completionPort)
- {
- throw make!BadLoopException(defaultAllocator,
- "Creating completion port failed");
- }
- }
-
- /**
- * Should be called if the backend configuration changes.
- *
- * Params:
- * watcher = Watcher.
- * oldEvents = The events were already set.
- * events = The events should be set.
- *
- * Returns: $(D_KEYWORD true) if the operation was successful.
- */
- override protected bool reify(SocketWatcher watcher,
- EventMask oldEvents,
- EventMask events) @nogc
- {
- SocketState overlapped;
- if (!(oldEvents & Event.accept) && (events & Event.accept))
- {
- auto socket = cast(OverlappedStreamSocket) watcher.socket;
- assert(socket !is null);
-
- if (CreateIoCompletionPort(cast(HANDLE) socket.handle,
- completionPort,
- cast(size_t) (cast(void*) watcher),
- 0) !is completionPort)
- {
- return false;
- }
-
- try
- {
- overlapped = defaultAllocator.make!SocketState;
- socket.beginAccept(overlapped);
- }
- catch (SocketException e)
- {
- defaultAllocator.dispose(overlapped);
- defaultAllocator.dispose(e);
- return false;
- }
- }
- if ((!(oldEvents & Event.read) && (events & Event.read))
- || (!(oldEvents & Event.write) && (events & Event.write)))
- {
- auto transport = cast(StreamTransport) watcher;
- assert(transport !is null);
-
- if (CreateIoCompletionPort(cast(HANDLE) transport.socket.handle,
- completionPort,
- cast(size_t) (cast(void*) watcher),
- 0) !is completionPort)
- {
- return false;
- }
-
- // Begin to read
- if (!(oldEvents & Event.read) && (events & Event.read))
- {
- try
- {
- overlapped = defaultAllocator.make!SocketState;
- transport.socket.beginReceive(transport.output[], overlapped);
- }
- catch (SocketException e)
- {
- defaultAllocator.dispose(overlapped);
- defaultAllocator.dispose(e);
- return false;
- }
- }
- }
- return true;
- }
-
- private void kill(StreamTransport transport,
- SocketException exception = null) @nogc
- in
- {
- assert(transport !is null);
- }
- do
- {
- transport.socket.shutdown();
- defaultAllocator.dispose(transport.socket);
- transport.exception = exception;
- pendings.insertBack(transport);
- }
-
- /**
- * Does the actual polling.
- */
- override protected void poll() @nogc
- {
- DWORD lpNumberOfBytes;
- size_t key;
- OVERLAPPED* overlap;
- immutable timeout = cast(immutable int) blockTime.total!"msecs";
-
- auto result = GetQueuedCompletionStatus(completionPort,
- &lpNumberOfBytes,
- &key,
- &overlap,
- timeout);
- if (result == FALSE && overlap is null)
- {
- return; // Timeout
- }
-
- enum size_t offset = size_t.sizeof * 2;
- auto overlapped = cast(SocketState) ((cast(void*) overlap) - offset);
- assert(overlapped !is null);
- scope (failure)
- {
- defaultAllocator.dispose(overlapped);
- }
-
- switch (overlapped.event)
- {
- case OverlappedSocketEvent.accept:
- auto connection = cast(ConnectionWatcher) (cast(void*) key);
- assert(connection !is null);
-
- auto listener = cast(OverlappedStreamSocket) connection.socket;
- assert(listener !is null);
-
- auto socket = listener.endAccept(overlapped);
- auto transport = defaultAllocator.make!StreamTransport(socket);
-
- connection.incoming.insertBack(transport);
-
- reify(transport,
- EventMask(Event.none),
- EventMask(Event.read | Event.write));
-
- pendings.insertBack(connection);
- listener.beginAccept(overlapped);
- break;
- case OverlappedSocketEvent.read:
- auto transport = cast(StreamTransport) (cast(void*) key);
- assert(transport !is null);
-
- if (!transport.active)
- {
- defaultAllocator.dispose(transport);
- defaultAllocator.dispose(overlapped);
- return;
- }
-
- int received;
- SocketException exception;
- try
- {
- received = transport.socket.endReceive(overlapped);
- }
- catch (SocketException e)
- {
- exception = e;
- }
- if (transport.socket.disconnected)
- {
- // We want to get one last notification to destroy the watcher.
- transport.socket.beginReceive(transport.output[], overlapped);
- kill(transport, exception);
- }
- else if (received > 0)
- {
- immutable full = transport.output.free == received;
-
- transport.output += received;
- // Receive was interrupted because the buffer is full. We have to continue.
- if (full)
- {
- transport.socket.beginReceive(transport.output[], overlapped);
- }
- pendings.insertBack(transport);
- }
- break;
- case OverlappedSocketEvent.write:
- auto transport = cast(StreamTransport) (cast(void*) key);
- assert(transport !is null);
-
- transport.input += transport.socket.endSend(overlapped);
- if (transport.input.length > 0)
- {
- transport.socket.beginSend(transport.input[], overlapped);
- }
- else
- {
- transport.socket.beginReceive(transport.output[], overlapped);
- if (transport.isClosing())
- {
- kill(transport);
- }
- }
- break;
- default:
- assert(false, "Unknown event");
- }
- }
-}
diff --git a/source/tanya/async/event/kqueue.d b/source/tanya/async/event/kqueue.d
deleted file mode 100644
index b3c1826..0000000
--- a/source/tanya/async/event/kqueue.d
+++ /dev/null
@@ -1,331 +0,0 @@
-/* This Source Code Form is subject to the terms of the Mozilla Public
- * License, v. 2.0. If a copy of the MPL was not distributed with this
- * file, You can obtain one at http://mozilla.org/MPL/2.0/. */
-
-/*
- * Event loop implementation for *BSD.
- *
- * Copyright: Eugene Wissner 2016-2020.
- * License: $(LINK2 https://www.mozilla.org/en-US/MPL/2.0/,
- * Mozilla Public License, v. 2.0).
- * Authors: $(LINK2 mailto:info@caraus.de, Eugene Wissner)
- * Source: $(LINK2 https://github.com/caraus-ecms/tanya/blob/master/source/tanya/async/event/kqueue.d,
- * tanya/async/event/kqueue.d)
- */
-module tanya.async.event.kqueue;
-
-version (D_Ddoc)
-{
-}
-else version (OSX)
-{
- version = MacBSD;
-}
-else version (iOS)
-{
- version = MacBSD;
-}
-else version (TVOS)
-{
- version = MacBSD;
-}
-else version (WatchOS)
-{
- version = MacBSD;
-}
-else version (FreeBSD)
-{
- version = MacBSD;
-}
-else version (OpenBSD)
-{
- version = MacBSD;
-}
-else version (DragonFlyBSD)
-{
- version = MacBSD;
-}
-
-version (MacBSD):
-
-import core.stdc.errno;
-import core.sys.posix.time; // timespec
-import core.sys.posix.unistd;
-import core.time;
-import tanya.algorithm.comparison;
-import tanya.async.event.selector;
-import tanya.async.loop;
-import tanya.async.transport;
-import tanya.async.watcher;
-import tanya.container.array;
-import tanya.memory.allocator;
-import tanya.network.socket;
-
-void EV_SET(kevent_t* kevp, typeof(kevent_t.tupleof) args) pure nothrow @nogc
-{
- *kevp = kevent_t(args);
-}
-
-enum : short
-{
- EVFILT_READ = -1,
- EVFILT_WRITE = -2,
- EVFILT_AIO = -3, /* attached to aio requests */
- EVFILT_VNODE = -4, /* attached to vnodes */
- EVFILT_PROC = -5, /* attached to struct proc */
- EVFILT_SIGNAL = -6, /* attached to struct proc */
- EVFILT_TIMER = -7, /* timers */
- EVFILT_MACHPORT = -8, /* Mach portsets */
- EVFILT_FS = -9, /* filesystem events */
- EVFILT_USER = -10, /* User events */
- EVFILT_VM = -12, /* virtual memory events */
- EVFILT_SYSCOUNT = 11
-}
-
-struct kevent_t
-{
- uintptr_t ident; // Identifier for this event
- short filter; // Filter for event
- ushort flags;
- uint fflags;
- intptr_t data;
- void* udata; // Opaque user data identifier
-}
-
-enum
-{
- /* actions */
- EV_ADD = 0x0001, /* add event to kq (implies enable) */
- EV_DELETE = 0x0002, /* delete event from kq */
- EV_ENABLE = 0x0004, /* enable event */
- EV_DISABLE = 0x0008, /* disable event (not reported) */
-
- /* flags */
- EV_ONESHOT = 0x0010, /* only report one occurrence */
- EV_CLEAR = 0x0020, /* clear event state after reporting */
- EV_RECEIPT = 0x0040, /* force EV_ERROR on success, data=0 */
- EV_DISPATCH = 0x0080, /* disable event after reporting */
-
- EV_SYSFLAGS = 0xF000, /* reserved by system */
- EV_FLAG1 = 0x2000, /* filter-specific flag */
-
- /* returned values */
- EV_EOF = 0x8000, /* EOF detected */
- EV_ERROR = 0x4000, /* error, data contains errno */
-}
-
-extern(C) int kqueue() nothrow @nogc;
-extern(C) int kevent(int kq, const kevent_t *changelist, int nchanges,
- kevent_t *eventlist, int nevents, const timespec *timeout)
- nothrow @nogc;
-
-final class KqueueLoop : SelectorLoop
-{
- protected int fd;
- private Array!kevent_t events;
- private Array!kevent_t changes;
- private size_t changeCount;
-
- /**
- * Returns: Maximal event count can be got at a time
- * (should be supported by the backend).
- */
- override protected @property uint maxEvents()
- const pure nothrow @safe @nogc
- {
- return cast(uint) events.length;
- }
-
- this() @nogc
- {
- super();
-
- if ((fd = kqueue()) == -1)
- {
- throw make!BadLoopException(defaultAllocator,
- "kqueue initialization failed");
- }
- events = Array!kevent_t(64);
- changes = Array!kevent_t(64);
- }
-
- /**
- * Frees loop internals.
- */
- ~this() @nogc
- {
- close(fd);
- }
-
- private void set(SocketType socket, short filter, ushort flags) @nogc
- {
- if (changes.length <= changeCount)
- {
- changes.length = changeCount + maxEvents;
- }
- EV_SET(&changes[changeCount],
- cast(ulong) socket,
- filter,
- flags,
- 0U,
- 0,
- null);
- ++changeCount;
- }
-
- /**
- * Should be called if the backend configuration changes.
- *
- * Params:
- * watcher = Watcher.
- * oldEvents = The events were already set.
- * events = The events should be set.
- *
- * Returns: $(D_KEYWORD true) if the operation was successful.
- */
- override protected bool reify(SocketWatcher watcher,
- EventMask oldEvents,
- EventMask events) @nogc
- {
- if (events != oldEvents)
- {
- if (oldEvents & Event.read || oldEvents & Event.accept)
- {
- set(watcher.socket.handle, EVFILT_READ, EV_DELETE);
- }
- if (oldEvents & Event.write)
- {
- set(watcher.socket.handle, EVFILT_WRITE, EV_DELETE);
- }
- }
- if (events & (Event.read | events & Event.accept))
- {
- set(watcher.socket.handle, EVFILT_READ, EV_ADD | EV_ENABLE);
- }
- if (events & Event.write)
- {
- set(watcher.socket.handle, EVFILT_WRITE, EV_ADD | EV_DISPATCH);
- }
- return true;
- }
-
- /**
- * Does the actual polling.
- */
- protected override void poll() @nogc
- {
- timespec ts;
- blockTime.split!("seconds", "nsecs")(ts.tv_sec, ts.tv_nsec);
-
- if (changeCount > maxEvents)
- {
- events.length = changes.length;
- }
-
- auto eventCount = kevent(fd,
- changes.get().ptr,
- cast(int) changeCount,
- events.get().ptr,
- maxEvents,
- &ts);
- changeCount = 0;
-
- if (eventCount < 0)
- {
- if (errno != EINTR)
- {
- throw defaultAllocator.make!BadLoopException();
- }
- return;
- }
-
- for (int i; i < eventCount; ++i)
- {
- assert(connections.length > events[i].ident);
-
- auto transport = cast(StreamTransport) connections[events[i].ident];
- // If it is a ConnectionWatcher. Accept connections.
- if (transport is null)
- {
- auto connection = cast(ConnectionWatcher) connections[events[i].ident];
- assert(connection !is null);
-
- acceptConnections(connection);
- }
- else if (events[i].flags & EV_ERROR)
- {
- kill(transport);
- }
- else if (events[i].filter == EVFILT_READ)
- {
- SocketException exception;
- try
- {
- ptrdiff_t received;
- do
- {
- received = transport.socket.receive(transport.output[]);
- transport.output += received;
- }
- while (received);
- }
- catch (SocketException e)
- {
- exception = e;
- }
- if (transport.socket.disconnected)
- {
- kill(transport, exception);
- }
- else if (transport.output.length)
- {
- pendings.insertBack(transport);
- }
- }
- else if (events[i].filter == EVFILT_WRITE)
- {
- transport.writeReady = true;
- if (transport.input.length)
- {
- feed(transport);
- }
- }
- }
- }
-
- /**
- * Returns: The blocking time.
- */
- override protected @property inout(Duration) blockTime()
- inout @nogc @safe pure nothrow
- {
- return min(super.blockTime, 1.dur!"seconds");
- }
-
- /**
- * If the transport couldn't send the data, the further sending should
- * be handled by the event loop.
- *
- * Params:
- * transport = Transport.
- * exception = Exception thrown on sending.
- *
- * Returns: $(D_KEYWORD true) if the operation could be successfully
- * completed or scheduled, $(D_KEYWORD false) otherwise (the
- * transport will be destroyed then).
- */
- protected override bool feed(StreamTransport transport,
- SocketException exception = null) @nogc
- {
- if (!super.feed(transport, exception))
- {
- return false;
- }
- if (!transport.writeReady)
- {
- set(transport.socket.handle, EVFILT_WRITE, EV_DISPATCH);
- return true;
- }
- return false;
- }
-}
diff --git a/source/tanya/async/event/selector.d b/source/tanya/async/event/selector.d
deleted file mode 100644
index f954e8c..0000000
--- a/source/tanya/async/event/selector.d
+++ /dev/null
@@ -1,407 +0,0 @@
-/* This Source Code Form is subject to the terms of the Mozilla Public
- * License, v. 2.0. If a copy of the MPL was not distributed with this
- * file, You can obtain one at http://mozilla.org/MPL/2.0/. */
-
-/*
- * This module contains base implementations for reactor event loops.
- *
- * Copyright: Eugene Wissner 2016-2020.
- * License: $(LINK2 https://www.mozilla.org/en-US/MPL/2.0/,
- * Mozilla Public License, v. 2.0).
- * Authors: $(LINK2 mailto:info@caraus.de, Eugene Wissner)
- * Source: $(LINK2 https://github.com/caraus-ecms/tanya/blob/master/source/tanya/async/event/selector.d,
- * tanya/async/event/selector.d)
- */
-module tanya.async.event.selector;
-
-version (D_Ddoc)
-{
-}
-else version (Posix):
-
-import tanya.async.loop;
-import tanya.async.protocol;
-import tanya.async.transport;
-import tanya.async.watcher;
-import tanya.container.array;
-import tanya.container.buffer;
-import tanya.memory.allocator;
-import tanya.net.socket;
-
-/**
- * Transport for stream sockets.
- */
-package class StreamTransport : SocketWatcher, DuplexTransport, SocketTransport
-{
- private SelectorLoop loop;
-
- private SocketException exception;
-
- package ReadBuffer!ubyte output;
-
- package WriteBuffer!ubyte input;
-
- private Protocol protocol_;
-
- private bool closing;
-
- /// Received notification that the underlying socket is write-ready.
- package bool writeReady;
-
- /**
- * Params:
- * loop = Event loop.
- * socket = Socket.
- *
- * Precondition: $(D_INLINECODE loop !is null && socket !is null)
- */
- this(SelectorLoop loop, ConnectedSocket socket) @nogc
- in
- {
- assert(loop !is null);
- }
- do
- {
- super(socket);
- this.loop = loop;
- output = ReadBuffer!ubyte(8192, 1024);
- input = WriteBuffer!ubyte(8192);
- active = true;
- }
-
- /**
- * Returns: Socket.
- *
- * Postcondition: $(D_INLINECODE socket !is null)
- */
- override @property ConnectedSocket socket() pure nothrow @safe @nogc
- out (socket)
- {
- assert(socket !is null);
- }
- do
- {
- return cast(ConnectedSocket) socket_;
- }
-
- private @property void socket(ConnectedSocket socket)
- pure nothrow @safe @nogc
- in
- {
- assert(socket !is null);
- }
- do
- {
- socket_ = socket;
- }
-
- /**
- * Returns: Application protocol.
- */
- @property Protocol protocol() pure nothrow @safe @nogc
- {
- return protocol_;
- }
-
- /**
- * Switches the protocol.
- *
- * The protocol is deallocated by the event loop.
- *
- * Params:
- * protocol = Application protocol.
- *
- * Precondition: $(D_INLINECODE protocol !is null)
- */
- @property void protocol(Protocol protocol) pure nothrow @safe @nogc
- in
- {
- assert(protocol !is null);
- }
- do
- {
- protocol_ = protocol;
- }
-
- /**
- * Returns $(D_PARAM true) if the transport is closing or closed.
- */
- bool isClosing() const pure nothrow @safe @nogc
- {
- return closing;
- }
-
- /**
- * Close the transport.
- *
- * Buffered data will be flushed. No more data will be received.
- */
- void close() @nogc
- {
- closing = true;
- loop.reify(this,
- EventMask(Event.read | Event.write),
- EventMask(Event.write));
- }
-
- /**
- * Invokes the watcher callback.
- */
- override void invoke() @nogc
- {
- if (output.length)
- {
- protocol.received(output[0 .. $]);
- output.clear();
- if (isClosing() && input.length == 0)
- {
- loop.kill(this);
- }
- }
- else
- {
- protocol.disconnected(exception);
- defaultAllocator.dispose(protocol_);
- defaultAllocator.dispose(exception);
- active = false;
- }
- }
-
- /**
- * Write some data to the transport.
- *
- * Params:
- * data = Data to send.
- */
- void write(ubyte[] data) @nogc
- {
- if (!data.length)
- {
- return;
- }
- // Try to write if the socket is write ready.
- if (writeReady)
- {
- ptrdiff_t sent;
- SocketException exception;
- try
- {
- sent = socket.send(data);
- if (sent == 0)
- {
- writeReady = false;
- }
- }
- catch (SocketException e)
- {
- writeReady = false;
- exception = e;
- }
- if (sent < data.length)
- {
- input ~= data[sent..$];
- loop.feed(this, exception);
- }
- }
- else
- {
- input ~= data;
- }
- }
-}
-
-abstract class SelectorLoop : Loop
-{
- /// Pending connections.
- protected Array!SocketWatcher connections;
-
- this() @nogc
- {
- super();
- this.connections = Array!SocketWatcher(maxEvents);
- }
-
- ~this() @nogc
- {
- foreach (ref connection; this.connections[])
- {
- // We want to free only the transports. ConnectionWatcher are
- // created by the user and should be freed by himself.
- if (cast(StreamTransport) connection !is null)
- {
- defaultAllocator.dispose(connection);
- }
- }
- }
-
- /**
- * Should be called if the backend configuration changes.
- *
- * Params:
- * watcher = Watcher.
- * oldEvents = The events were already set.
- * events = The events should be set.
- *
- * Returns: $(D_KEYWORD true) if the operation was successful.
- */
- override abstract protected bool reify(SocketWatcher watcher,
- EventMask oldEvents,
- EventMask events) @nogc;
-
- /**
- * Kills the watcher and closes the connection.
- *
- * Params:
- * transport = Transport.
- * exception = Occurred exception.
- */
- protected void kill(StreamTransport transport,
- SocketException exception = null) @nogc
- in
- {
- assert(transport !is null);
- }
- do
- {
- transport.socket.shutdown();
- defaultAllocator.dispose(transport.socket);
- transport.exception = exception;
- pendings.insertBack(transport);
- }
-
- /**
- * If the transport couldn't send the data, the further sending should
- * be handled by the event loop.
- *
- * Params:
- * transport = Transport.
- * exception = Exception thrown on sending.
- *
- * Returns: $(D_KEYWORD true) if the operation could be successfully
- * completed or scheduled, $(D_KEYWORD false) otherwise (the
- * transport will be destroyed then).
- */
- protected bool feed(StreamTransport transport,
- SocketException exception = null) @nogc
- in
- {
- assert(transport !is null);
- }
- do
- {
- while (transport.input.length && transport.writeReady)
- {
- try
- {
- ptrdiff_t sent = transport.socket.send(transport.input[]);
- if (sent == 0)
- {
- transport.writeReady = false;
- }
- else
- {
- transport.input += sent;
- }
- }
- catch (SocketException e)
- {
- exception = e;
- transport.writeReady = false;
- }
- }
- if (exception !is null)
- {
- kill(transport, exception);
- return false;
- }
- if (transport.input.length == 0 && transport.isClosing())
- {
- kill(transport);
- }
- return true;
- }
-
- /**
- * Start watching.
- *
- * Params:
- * watcher = Watcher.
- */
- override void start(ConnectionWatcher watcher) @nogc
- {
- if (watcher.active)
- {
- return;
- }
-
- if (connections.length <= watcher.socket)
- {
- connections.length = watcher.socket.handle + maxEvents / 2;
- }
- connections[watcher.socket.handle] = watcher;
-
- super.start(watcher);
- }
-
- /**
- * Accept incoming connections.
- *
- * Params:
- * connection = Connection watcher ready to accept.
- */
- package void acceptConnections(ConnectionWatcher connection) @nogc
- in
- {
- assert(connection !is null);
- }
- do
- {
- while (true)
- {
- ConnectedSocket client;
- try
- {
- client = (cast(StreamSocket) connection.socket).accept();
- }
- catch (SocketException e)
- {
- defaultAllocator.dispose(e);
- break;
- }
- if (client is null)
- {
- break;
- }
-
- StreamTransport transport;
-
- if (connections.length > client.handle)
- {
- transport = cast(StreamTransport) connections[client.handle];
- }
- else
- {
- connections.length = client.handle + maxEvents / 2;
- }
- if (transport is null)
- {
- transport = defaultAllocator.make!StreamTransport(this, client);
- connections[client.handle] = transport;
- }
- else
- {
- transport.socket = client;
- }
-
- reify(transport,
- EventMask(Event.none),
- EventMask(Event.read | Event.write));
- connection.incoming.insertBack(transport);
- }
-
- if (!connection.incoming.empty)
- {
- pendings.insertBack(connection);
- }
- }
-}
diff --git a/source/tanya/async/iocp.d b/source/tanya/async/iocp.d
deleted file mode 100644
index 156e7be..0000000
--- a/source/tanya/async/iocp.d
+++ /dev/null
@@ -1,56 +0,0 @@
-/* This Source Code Form is subject to the terms of the Mozilla Public
- * License, v. 2.0. If a copy of the MPL was not distributed with this
- * file, You can obtain one at http://mozilla.org/MPL/2.0/. */
-
-/**
- * This module provides API for Windows I/O Completion Ports.
- *
- * Note: Available only on Windows.
- *
- * Copyright: Eugene Wissner 2016-2020.
- * License: $(LINK2 https://www.mozilla.org/en-US/MPL/2.0/,
- * Mozilla Public License, v. 2.0).
- * Authors: $(LINK2 mailto:info@caraus.de, Eugene Wissner)
- * Source: $(LINK2 https://github.com/caraus-ecms/tanya/blob/master/source/tanya/async/iocp.d,
- * tanya/async/iocp.d)
- */
-module tanya.async.iocp;
-
-version (Windows)
-{
- version = WindowsDoc;
-}
-else version (D_Ddoc)
-{
- version = WindowsDoc;
- version (Windows)
- {
- }
- else
- {
- private struct OVERLAPPED
- {
- }
- private alias HANDLE = void*;
- }
-}
-
-version (WindowsDoc):
-
-import core.sys.windows.winbase;
-
-/**
- * Provides an extendable representation of a Win32 $(D_PSYMBOL OVERLAPPED)
- * structure.
- */
-class State
-{
- /// For internal use by Windows API.
- align(1) OVERLAPPED overlapped;
-
- /// File/socket handle.
- HANDLE handle;
-
- /// For keeping events or event masks.
- int event;
-}
diff --git a/source/tanya/async/loop.d b/source/tanya/async/loop.d
deleted file mode 100644
index 1d88575..0000000
--- a/source/tanya/async/loop.d
+++ /dev/null
@@ -1,352 +0,0 @@
-/* This Source Code Form is subject to the terms of the Mozilla Public
- * License, v. 2.0. If a copy of the MPL was not distributed with this
- * file, You can obtain one at http://mozilla.org/MPL/2.0/. */
-
-/**
- * Interface for the event loop implementations and the default event loop
- * chooser.
- *
- * ---
- * import tanya.async;
- * import tanya.memory;
- * import tanya.network.socket;
- *
- * class EchoProtocol : TransmissionControlProtocol
- * {
- * private DuplexTransport transport;
- *
- * void received(in ubyte[] data) @nogc
- * {
- * ubyte[512] buffer;
- * buffer[0 .. data.length] = data;
- * transport.write(buffer[]);
- * }
- *
- * void connected(DuplexTransport transport) @nogc
- * {
- * this.transport = transport;
- * }
- *
- * void disconnected(SocketException e) @nogc
- * {
- * }
- * }
- *
- * void main()
- * {
- * auto address = address4("127.0.0.1");
- * auto endpoint = Endpoint(address.get, cast(ushort) 8192);
- *
- * version (Windows)
- * {
- * auto sock = defaultAllocator.make!OverlappedStreamSocket(AddressFamily.inet);
- * }
- * else
- * {
- * auto sock = defaultAllocator.make!StreamSocket(AddressFamily.inet);
- * sock.blocking = false;
- * }
- *
- * sock.bind(endpoint);
- * sock.listen(5);
- *
- * auto io = defaultAllocator.make!ConnectionWatcher(sock);
- * io.setProtocol!EchoProtocol;
- *
- * defaultLoop.start(io);
- * defaultLoop.run();
- *
- * sock.shutdown();
- * defaultAllocator.dispose(io);
- * defaultAllocator.dispose(sock);
- * }
- * ---
- *
- * Copyright: Eugene Wissner 2016-2020.
- * License: $(LINK2 https://www.mozilla.org/en-US/MPL/2.0/,
- * Mozilla Public License, v. 2.0).
- * Authors: $(LINK2 mailto:info@caraus.de, Eugene Wissner)
- * Source: $(LINK2 https://github.com/caraus-ecms/tanya/blob/master/source/tanya/async/loop.d,
- * tanya/async/loop.d)
- */
-module tanya.async.loop;
-
-import core.time;
-import tanya.async.transport;
-import tanya.async.watcher;
-import tanya.bitmanip;
-import tanya.container.buffer;
-import tanya.container.list;
-import tanya.memory.allocator;
-import tanya.net.socket;
-
-version (DisableBackends)
-{
-}
-else version (D_Ddoc)
-{
-}
-else version (linux)
-{
- import tanya.async.event.epoll;
- version = Epoll;
-}
-else version (Windows)
-{
- import tanya.async.event.iocp;
- version = IOCP;
-}
-else version (OSX)
-{
- version = Kqueue;
-}
-else version (iOS)
-{
- version = Kqueue;
-}
-else version (FreeBSD)
-{
- version = Kqueue;
-}
-else version (OpenBSD)
-{
- version = Kqueue;
-}
-else version (DragonFlyBSD)
-{
- version = Kqueue;
-}
-
-/**
- * Events.
- */
-enum Event : uint
-{
- none = 0x00, /// No events.
- read = 0x01, /// Non-blocking read call.
- write = 0x02, /// Non-blocking write call.
- accept = 0x04, /// Connection made.
- error = 0x80000000, /// Sent when an error occurs.
-}
-
-alias EventMask = BitFlags!Event;
-
-/**
- * Event loop.
- */
-abstract class Loop
-{
- protected bool done = true;
-
- /// Pending watchers.
- protected DList!Watcher pendings;
-
- /**
- * Returns: Maximal event count can be got at a time
- * (should be supported by the backend).
- */
- protected @property uint maxEvents()
- const pure nothrow @safe @nogc
- {
- return 128U;
- }
-
- /**
- * Initializes the loop.
- */
- this() @nogc
- {
- }
-
- /**
- * Frees loop internals.
- */
- ~this() @nogc
- {
- for (; !this.pendings.empty; this.pendings.removeFront())
- {
- defaultAllocator.dispose(this.pendings.front);
- }
- }
-
- /**
- * Starts the loop.
- */
- void run() @nogc
- {
- this.done = false;
- do
- {
- poll();
-
- // Invoke pendings
- for (; !this.pendings.empty; this.pendings.removeFront())
- {
- this.pendings.front.invoke();
- }
- }
- while (!this.done);
- }
-
- /**
- * Break out of the loop.
- */
- void unloop() @safe pure nothrow @nogc
- {
- this.done = true;
- }
-
- /**
- * Start watching.
- *
- * Params:
- * watcher = Watcher.
- */
- void start(ConnectionWatcher watcher) @nogc
- {
- if (watcher.active)
- {
- return;
- }
- watcher.active = true;
-
- reify(watcher, EventMask(Event.none), EventMask(Event.accept));
- }
-
- /**
- * Stop watching.
- *
- * Params:
- * watcher = Watcher.
- */
- void stop(ConnectionWatcher watcher) @nogc
- {
- if (!watcher.active)
- {
- return;
- }
- watcher.active = false;
-
- reify(watcher, EventMask(Event.accept), EventMask(Event.none));
- }
-
- /**
- * Should be called if the backend configuration changes.
- *
- * Params:
- * watcher = Watcher.
- * oldEvents = The events were already set.
- * events = The events should be set.
- *
- * Returns: $(D_KEYWORD true) if the operation was successful.
- */
- abstract protected bool reify(SocketWatcher watcher,
- EventMask oldEvents,
- EventMask events) @nogc;
-
- /**
- * Returns: The blocking time.
- */
- protected @property inout(Duration) blockTime()
- inout @safe pure nothrow @nogc
- {
- // Don't block if we have to do.
- return pendings.empty ? blockTime_ : Duration.zero;
- }
-
- /**
- * Sets the blocking time for IO watchers.
- *
- * Params:
- * blockTime = The blocking time. Cannot be larger than
- * $(D_PSYMBOL maxBlockTime).
- */
- protected @property void blockTime(in Duration blockTime) @safe pure nothrow @nogc
- in
- {
- assert(blockTime <= 1.dur!"hours", "Too long to wait.");
- assert(!blockTime.isNegative);
- }
- do
- {
- blockTime_ = blockTime;
- }
-
- /**
- * Does the actual polling.
- */
- abstract protected void poll() @nogc;
-
- /// Maximal block time.
- protected Duration blockTime_ = 1.dur!"minutes";
-}
-
-/**
- * Exception thrown on errors in the event loop.
- */
-class BadLoopException : Exception
-{
- /**
- * Params:
- * file = The file where the exception occurred.
- * line = The line number where the exception occurred.
- * next = The previous exception in the chain of exceptions, if any.
- */
- this(string file = __FILE__, size_t line = __LINE__, Throwable next = null)
- pure nothrow const @safe @nogc
- {
- super("Event loop cannot be initialized.", file, line, next);
- }
-}
-
-/**
- * Returns the event loop used by default. If an event loop wasn't set with
- * $(D_PSYMBOL defaultLoop) before, $(D_PSYMBOL defaultLoop) will try to
- * choose an event loop supported on the system.
- *
- * Returns: The default event loop.
- */
-@property Loop defaultLoop() @nogc
-{
- if (defaultLoop_ !is null)
- {
- return defaultLoop_;
- }
- version (Epoll)
- {
- defaultLoop_ = defaultAllocator.make!EpollLoop;
- }
- else version (IOCP)
- {
- defaultLoop_ = defaultAllocator.make!IOCPLoop;
- }
- else version (Kqueue)
- {
- import tanya.async.event.kqueue;
- defaultLoop_ = defaultAllocator.make!KqueueLoop;
- }
- return defaultLoop_;
-}
-
-/**
- * Sets the default event loop.
- *
- * This property makes it possible to implement your own backends or event
- * loops, for example, if the system is not supported or if you want to
- * extend the supported implementation. Just extend $(D_PSYMBOL Loop) and pass
- * your implementation to this property.
- *
- * Params:
- * loop = The event loop.
- */
-@property void defaultLoop(Loop loop) @nogc
-in
-{
- assert(loop !is null);
-}
-do
-{
- defaultLoop_ = loop;
-}
-
-private Loop defaultLoop_;
diff --git a/source/tanya/async/package.d b/source/tanya/async/package.d
deleted file mode 100644
index 18f361e..0000000
--- a/source/tanya/async/package.d
+++ /dev/null
@@ -1,20 +0,0 @@
-/* This Source Code Form is subject to the terms of the Mozilla Public
- * License, v. 2.0. If a copy of the MPL was not distributed with this
- * file, You can obtain one at http://mozilla.org/MPL/2.0/. */
-
-/**
- * This package provides asynchronous capabilities.
- *
- * Copyright: Eugene Wissner 2016-2020.
- * License: $(LINK2 https://www.mozilla.org/en-US/MPL/2.0/,
- * Mozilla Public License, v. 2.0).
- * Authors: $(LINK2 mailto:info@caraus.de, Eugene Wissner)
- * Source: $(LINK2 https://github.com/caraus-ecms/tanya/blob/master/source/tanya/async/package.d,
- * tanya/async/package.d)
- */
-module tanya.async;
-
-public import tanya.async.loop;
-public import tanya.async.protocol;
-public import tanya.async.transport;
-public import tanya.async.watcher;
diff --git a/source/tanya/async/protocol.d b/source/tanya/async/protocol.d
deleted file mode 100644
index c51dae5..0000000
--- a/source/tanya/async/protocol.d
+++ /dev/null
@@ -1,58 +0,0 @@
-/* This Source Code Form is subject to the terms of the Mozilla Public
- * License, v. 2.0. If a copy of the MPL was not distributed with this
- * file, You can obtain one at http://mozilla.org/MPL/2.0/. */
-
-/**
- * This module contains protocol which handle data in asynchronous
- * applications.
- *
- * When an event from the network arrives, a protocol method gets
- * called and can respond to the event.
- *
- * Copyright: Eugene Wissner 2016-2020.
- * License: $(LINK2 https://www.mozilla.org/en-US/MPL/2.0/,
- * Mozilla Public License, v. 2.0).
- * Authors: $(LINK2 mailto:info@caraus.de, Eugene Wissner)
- * Source: $(LINK2 https://github.com/caraus-ecms/tanya/blob/master/source/tanya/async/protocol.d,
- * tanya/async/protocol.d)
- */
-module tanya.async.protocol;
-
-import tanya.async.transport;
-import tanya.net.socket;
-
-/**
- * Common protocol interface.
- */
-interface Protocol
-{
- /**
- * Params:
- * data = Read data.
- */
- void received(in ubyte[] data) @nogc;
-
- /**
- * Called when a connection is made.
- *
- * Params:
- * transport = Protocol transport.
- */
- void connected(DuplexTransport transport) @nogc;
-
- /**
- * Called when a connection is lost.
- *
- * Params:
- * exception = $(D_PSYMBOL Exception) if an error caused
- * the disconnect, $(D_KEYWORD null) otherwise.
- */
- void disconnected(SocketException exception) @nogc;
-}
-
-/**
- * Interface for TCP.
- */
-interface TransmissionControlProtocol : Protocol
-{
-}
diff --git a/source/tanya/async/transport.d b/source/tanya/async/transport.d
deleted file mode 100644
index 87051e8..0000000
--- a/source/tanya/async/transport.d
+++ /dev/null
@@ -1,104 +0,0 @@
-/* This Source Code Form is subject to the terms of the Mozilla Public
- * License, v. 2.0. If a copy of the MPL was not distributed with this
- * file, You can obtain one at http://mozilla.org/MPL/2.0/. */
-
-/**
- * This module contains transports which are responsible for data dilvery
- * between two parties of an asynchronous communication.
- *
- * Copyright: Eugene Wissner 2016-2020.
- * License: $(LINK2 https://www.mozilla.org/en-US/MPL/2.0/,
- * Mozilla Public License, v. 2.0).
- * Authors: $(LINK2 mailto:info@caraus.de, Eugene Wissner)
- * Source: $(LINK2 https://github.com/caraus-ecms/tanya/blob/master/source/tanya/async/transport.d,
- * tanya/async/transport.d)
- */
-module tanya.async.transport;
-
-import tanya.async.protocol;
-import tanya.net.socket;
-
-/**
- * Base transport interface.
- */
-interface Transport
-{
-}
-
-/**
- * Interface for read-only transports.
- */
-interface ReadTransport : Transport
-{
-}
-
-/**
- * Interface for write-only transports.
- */
-interface WriteTransport : Transport
-{
- /**
- * Write some data to the transport.
- *
- * Params:
- * data = Data to send.
- */
- void write(ubyte[] data) @nogc;
-}
-
-/**
- * Represents a bidirectional transport.
- */
-interface DuplexTransport : ReadTransport, WriteTransport
-{
- /**
- * Returns: Application protocol.
- *
- * Postcondition: $(D_INLINECODE protocol !is null)
- */
- @property Protocol protocol() pure nothrow @safe @nogc
- out (protocol)
- {
- assert(protocol !is null);
- }
-
- /**
- * Switches the protocol.
- *
- * The protocol is deallocated by the event loop.
- *
- * Params:
- * protocol = Application protocol.
- *
- * Precondition: $(D_INLINECODE protocol !is null)
- */
- @property void protocol(Protocol protocol) pure nothrow @safe @nogc
- in
- {
- assert(protocol !is null);
- }
-
-
- /**
- * Returns $(D_PARAM true) if the transport is closing or closed.
- */
- bool isClosing() const pure nothrow @safe @nogc;
-
- /**
- * Close the transport.
- *
- * Buffered data will be flushed. No more data will be received.
- */
- void close() @nogc;
-}
-
-/**
- * Represents a socket transport.
- */
-interface SocketTransport : Transport
-{
- /**
- * Returns: Socket.
- */
- @property Socket socket() pure nothrow @safe @nogc;
-}
diff --git a/source/tanya/async/watcher.d b/source/tanya/async/watcher.d
deleted file mode 100644
index 1159779..0000000
--- a/source/tanya/async/watcher.d
+++ /dev/null
@@ -1,117 +0,0 @@
-/* This Source Code Form is subject to the terms of the Mozilla Public
- * License, v. 2.0. If a copy of the MPL was not distributed with this
- * file, You can obtain one at http://mozilla.org/MPL/2.0/. */
-
-/**
- * Watchers register user's interest in some event.
- *
- * Copyright: Eugene Wissner 2016-2020.
- * License: $(LINK2 https://www.mozilla.org/en-US/MPL/2.0/,
- * Mozilla Public License, v. 2.0).
- * Authors: $(LINK2 mailto:info@caraus.de, Eugene Wissner)
- * Source: $(LINK2 https://github.com/caraus-ecms/tanya/blob/master/source/tanya/async/watcher.d,
- * tanya/async/watcher.d)
- */
-module tanya.async.watcher;
-
-import tanya.async.loop;
-import tanya.async.protocol;
-import tanya.async.transport;
-import tanya.container.buffer;
-import tanya.container.list;
-import tanya.memory.allocator;
-import tanya.net.socket;
-
-/**
- * A watcher is an opaque structure that you allocate and register to record
- * your interest in some event.
- */
-abstract class Watcher
-{
- /// Whether the watcher is active.
- bool active;
-
- /**
- * Invoke some action on event.
- */
- void invoke() @nogc;
-}
-
-/**
- * Socket watcher.
- */
-abstract class SocketWatcher : Watcher
-{
- /// Watched socket.
- protected Socket socket_;
-
- /**
- * Params:
- * socket = Socket.
- *
- * Precondition: $(D_INLINECODE socket !is null)
- */
- this(Socket socket) pure nothrow @safe @nogc
- in
- {
- assert(socket !is null);
- }
- do
- {
- socket_ = socket;
- }
-
- /**
- * Returns: Socket.
- */
- @property Socket socket() pure nothrow @safe @nogc
- {
- return socket_;
- }
-}
-
-/**
- * Connection watcher.
- */
-class ConnectionWatcher : SocketWatcher
-{
- /// Incoming connection queue.
- DList!DuplexTransport incoming;
-
- private Protocol delegate() @nogc protocolFactory;
-
- /**
- * Params:
- * socket = Socket.
- */
- this(Socket socket) @nogc
- {
- super(socket);
- }
-
- /**
- * Params:
- * P = Protocol should be used.
- */
- void setProtocol(P : Protocol)() @nogc
- {
- this.protocolFactory = () @nogc => cast(Protocol) defaultAllocator.make!P;
- }
-
- /**
- * Invokes new connection callback.
- */
- override void invoke() @nogc
- in
- {
- assert(protocolFactory !is null, "Protocol isn't set.");
- }
- do
- {
- for (; !this.incoming.empty; this.incoming.removeFront())
- {
- this.incoming.front.protocol = protocolFactory();
- this.incoming.front.protocol.connected(this.incoming.front);
- }
- }
-}
diff --git a/source/tanya/net/package.d b/source/tanya/net/package.d
index 37ac2cc..9626b9e 100644
--- a/source/tanya/net/package.d
+++ b/source/tanya/net/package.d
@@ -5,7 +5,7 @@
/**
* Network programming.
*
- * Copyright: Eugene Wissner 2017-2020.
+ * Copyright: Eugene Wissner 2017-2022.
* License: $(LINK2 https://www.mozilla.org/en-US/MPL/2.0/,
* Mozilla Public License, v. 2.0).
* Authors: $(LINK2 mailto:info@caraus.de, Eugene Wissner)
@@ -17,5 +17,4 @@ module tanya.net;
public import tanya.net.iface;
public import tanya.net.inet;
public import tanya.net.ip;
-public import tanya.net.socket;
public import tanya.net.uri;
diff --git a/source/tanya/net/socket.d b/source/tanya/net/socket.d
deleted file mode 100644
index 0bdb84a..0000000
--- a/source/tanya/net/socket.d
+++ /dev/null
@@ -1,1258 +0,0 @@
-/* This Source Code Form is subject to the terms of the Mozilla Public
- * License, v. 2.0. If a copy of the MPL was not distributed with this
- * file, You can obtain one at http://mozilla.org/MPL/2.0/. */
-
-/**
- * Low-level socket programming.
- *
- * Current API supports only server-side TCP communication.
- *
- * For an example of an asynchronous server refer to the documentation of the
- * $(D_PSYMBOL tanya.async.loop) module.
- *
- * Copyright: Eugene Wissner 2016-2020.
- * License: $(LINK2 https://www.mozilla.org/en-US/MPL/2.0/,
- * Mozilla Public License, v. 2.0).
- * Authors: $(LINK2 mailto:info@caraus.de, Eugene Wissner)
- * Source: $(LINK2 https://github.com/caraus-ecms/tanya/blob/master/source/tanya/network/socket.d,
- * tanya/network/socket.d)
- */
-module tanya.net.socket;
-
-import core.stdc.errno;
-import core.time;
-public import std.socket : SocketOption, SocketOptionLevel;
-import tanya.bitmanip;
-import tanya.memory.allocator;
-import tanya.meta.trait;
-import tanya.os.error;
-import tanya.net.iface;
-import tanya.net.ip;
-
-/// Value returned by socket operations on error.
-enum int socketError = -1;
-
-version (Posix)
-{
- import core.stdc.errno;
- import core.sys.posix.fcntl;
- import core.sys.posix.netdb;
- import core.sys.posix.netinet.in_;
- import core.sys.posix.sys.socket;
- import core.sys.posix.sys.time;
- import core.sys.posix.unistd;
-
- enum SocketType : int
- {
- init = -1,
- }
-
- private alias LingerField = int;
-}
-else version (Windows)
-{
- import core.sys.windows.winbase;
- import core.sys.windows.winerror;
- import core.sys.windows.winsock2 : accept,
- addrinfo,
- bind,
- closesocket,
- FIONBIO,
- freeaddrinfo,
- getaddrinfo,
- getsockopt,
- ioctlsocket,
- listen,
- MSG_DONTROUTE,
- MSG_OOB,
- MSG_PEEK,
- recv,
- SD_BOTH,
- SD_RECEIVE,
- SD_SEND,
- send,
- setsockopt,
- shutdown,
- SO_TYPE,
- SOCKADDR,
- sockaddr,
- sockaddr_in,
- sockaddr_in6,
- SOCKADDR_STORAGE,
- socket,
- socklen_t,
- SOL_SOCKET,
- WSAEWOULDBLOCK,
- WSAGetLastError;
- import tanya.async.iocp;
- import tanya.sys.windows.def;
- public import tanya.sys.windows.winbase;
- public import tanya.sys.windows.winsock2;
-
- enum SocketType : size_t
- {
- init = ~0,
- }
-
- private alias LingerField = ushort;
-
- enum OverlappedSocketEvent
- {
- accept = 1,
- read = 2,
- write = 3,
- }
-
- class SocketState : State
- {
- private WSABUF buffer;
- }
-
- /**
- * Socket returned if a connection has been established.
- *
- * Note: Available only on Windows.
- */
- class OverlappedConnectedSocket : ConnectedSocket
- {
- /**
- * Create a socket.
- *
- * Params:
- * handle = Socket handle.
- * af = Address family.
- */
- this(SocketType handle, AddressFamily af) @nogc
- {
- super(handle, af);
- }
-
- /**
- * Begins to asynchronously receive data from a connected socket.
- *
- * Params:
- * buffer = Storage location for the received data.
- * flags = Flags.
- * overlapped = Unique operation identifier.
- *
- * Returns: $(D_KEYWORD true) if the operation could be finished synchronously.
- * $(D_KEYWORD false) otherwise.
- *
- * Throws: $(D_PSYMBOL SocketException) if unable to receive.
- */
- bool beginReceive(ubyte[] buffer,
- SocketState overlapped,
- Flags flags = Flags(Flag.none)) @nogc @trusted
- {
- auto receiveFlags = cast(DWORD) flags;
-
- overlapped.handle = cast(HANDLE) handle_;
- overlapped.event = OverlappedSocketEvent.read;
- overlapped.buffer.len = cast(ULONG) buffer.length;
- overlapped.buffer.buf = cast(char*) buffer.ptr;
-
- auto result = WSARecv(handle_,
- cast(WSABUF*) &overlapped.buffer,
- 1u,
- null,
- &receiveFlags,
- &overlapped.overlapped,
- null);
-
- if (result == socketError && !wouldHaveBlocked)
- {
- throw defaultAllocator.make!SocketException("Unable to receive");
- }
- return result == 0;
- }
-
- /**
- * Ends a pending asynchronous read.
- *
- * Params:
- * overlapped = Unique operation identifier.
- *
- * Returns: Number of bytes received.
- *
- * Throws: $(D_PSYMBOL SocketException) if unable to receive.
- *
- * Postcondition: $(D_INLINECODE result >= 0).
- */
- int endReceive(SocketState overlapped) @nogc @trusted
- out (count)
- {
- assert(count >= 0);
- }
- do
- {
- DWORD lpNumber;
- BOOL result = GetOverlappedResult(overlapped.handle,
- &overlapped.overlapped,
- &lpNumber,
- FALSE);
- if (result == FALSE && !wouldHaveBlocked)
- {
- disconnected_ = true;
- throw defaultAllocator.make!SocketException("Unable to receive");
- }
- if (lpNumber == 0)
- {
- disconnected_ = true;
- }
- return lpNumber;
- }
-
- /**
- * Sends data asynchronously to a connected socket.
- *
- * Params:
- * buffer = Data to be sent.
- * flags = Flags.
- * overlapped = Unique operation identifier.
- *
- * Returns: $(D_KEYWORD true) if the operation could be finished synchronously.
- * $(D_KEYWORD false) otherwise.
- *
- * Throws: $(D_PSYMBOL SocketException) if unable to send.
- */
- bool beginSend(ubyte[] buffer,
- SocketState overlapped,
- Flags flags = Flags(Flag.none)) @nogc @trusted
- {
- overlapped.handle = cast(HANDLE) handle_;
- overlapped.event = OverlappedSocketEvent.write;
- overlapped.buffer.len = cast(ULONG) buffer.length;
- overlapped.buffer.buf = cast(char*) buffer.ptr;
-
- auto result = WSASend(handle_,
- &overlapped.buffer,
- 1u,
- null,
- cast(DWORD) flags,
- &overlapped.overlapped,
- null);
-
- if (result == socketError && !wouldHaveBlocked)
- {
- disconnected_ = true;
- throw defaultAllocator.make!SocketException("Unable to send");
- }
- return result == 0;
- }
-
- /**
- * Ends a pending asynchronous send.
- *
- * Params:
- * overlapped = Unique operation identifier.
- *
- * Returns: Number of bytes sent.
- *
- * Throws: $(D_PSYMBOL SocketException) if unable to receive.
- *
- * Postcondition: $(D_INLINECODE result >= 0).
- */
- int endSend(SocketState overlapped) @nogc @trusted
- out (count)
- {
- assert(count >= 0);
- }
- do
- {
- DWORD lpNumber;
- BOOL result = GetOverlappedResult(overlapped.handle,
- &overlapped.overlapped,
- &lpNumber,
- FALSE);
- if (result == FALSE && !wouldHaveBlocked)
- {
- disconnected_ = true;
- throw defaultAllocator.make!SocketException("Unable to receive");
- }
- return lpNumber;
- }
- }
-
- /**
- * Windows stream socket overlapped I/O.
- */
- class OverlappedStreamSocket : StreamSocket
- {
- // Accept extension function pointer.
- package LPFN_ACCEPTEX acceptExtension;
-
- /**
- * Create a socket.
- *
- * Params:
- * af = Address family.
- *
- * Throws: $(D_PSYMBOL SocketException) on errors.
- */
- this(AddressFamily af) @nogc @trusted
- {
- super(af);
- scope (failure)
- {
- this.close();
- }
- blocking = false;
-
- GUID guidAcceptEx = WSAID_ACCEPTEX;
- DWORD dwBytes;
-
- auto result = WSAIoctl(handle_,
- SIO_GET_EXTENSION_FUNCTION_POINTER,
- &guidAcceptEx,
- guidAcceptEx.sizeof,
- &acceptExtension,
- acceptExtension.sizeof,
- &dwBytes,
- null,
- null);
- if (!result == socketError)
- {
- throw make!SocketException(defaultAllocator,
- "Unable to retrieve an accept extension function pointer");
- }
- }
-
- /**
- * Begins an asynchronous operation to accept an incoming connection attempt.
- *
- * Params:
- * overlapped = Unique operation identifier.
- *
- * Returns: $(D_KEYWORD true) if the operation could be finished synchronously.
- * $(D_KEYWORD false) otherwise.
- *
- * Throws: $(D_PSYMBOL SocketException) on accept errors.
- */
- bool beginAccept(SocketState overlapped) @nogc @trusted
- {
- auto socket = cast(SocketType) socket(addressFamily, 1, 0);
- if (socket == SocketType.init)
- {
- throw defaultAllocator.make!SocketException("Unable to create socket");
- }
- scope (failure)
- {
- closesocket(socket);
- }
- DWORD dwBytes;
- overlapped.handle = cast(HANDLE) socket;
- overlapped.event = OverlappedSocketEvent.accept;
-
- const len = (sockaddr_in.sizeof + 16) * 2;
- overlapped.buffer.len = len;
- overlapped.buffer.buf = cast(char*) defaultAllocator.allocate(len).ptr;
-
- // We don't want to get any data now, but only start to accept the connections
- BOOL result = acceptExtension(handle_,
- socket,
- overlapped.buffer.buf,
- 0u,
- sockaddr_in.sizeof + 16,
- sockaddr_in.sizeof + 16,
- &dwBytes,
- &overlapped.overlapped);
- if (result == FALSE && !wouldHaveBlocked)
- {
- throw defaultAllocator.make!SocketException("Unable to accept socket connection");
- }
- return result == TRUE;
- }
-
- /**
- * Asynchronously accepts an incoming connection attempt and creates a
- * new socket to handle remote host communication.
- *
- * Params:
- * overlapped = Unique operation identifier.
- *
- * Returns: Connected socket.
- *
- * Throws: $(D_PSYMBOL SocketException) if unable to accept.
- */
- OverlappedConnectedSocket endAccept(SocketState overlapped)
- @nogc @trusted
- {
- scope (exit)
- {
- defaultAllocator.dispose(overlapped.buffer.buf[0 .. overlapped.buffer.len]);
- }
- auto socket = make!OverlappedConnectedSocket(defaultAllocator,
- cast(SocketType) overlapped.handle,
- addressFamily);
- scope (failure)
- {
- defaultAllocator.dispose(socket);
- }
- socket.setOption(SocketOptionLevel.SOCKET,
- cast(SocketOption) SO_UPDATE_ACCEPT_CONTEXT,
- cast(size_t) handle);
- return socket;
- }
- }
-}
-
-/**
- * Socket option that specifies what should happen when the socket that
- * promises reliable delivery still has untransmitted messages when
- * it is closed.
- */
-struct Linger
-{
- /// If nonzero, $(D_PSYMBOL close) and $(D_PSYMBOL shutdown) block until
- /// the data are transmitted or the timeout period has expired.
- LingerField l_onoff;
-
- /// Time, in seconds to wait before any buffered data to be sent is
- /// discarded.
- LingerField l_linger;
-
- /**
- * If $(D_PARAM timeout) is `0`, linger is disabled, otherwise enables the
- * linger and sets the timeout.
- *
- * Params:
- * timeout = Timeout, in seconds.
- */
- this(const ushort timeout)
- {
- time = timeout;
- }
-
- ///
- unittest
- {
- {
- auto linger = Linger(5);
- assert(linger.enabled);
- assert(linger.time == 5);
- }
- {
- auto linger = Linger(0);
- assert(!linger.enabled);
- }
- { // Default constructor.
- Linger linger;
- assert(!linger.enabled);
- }
- }
-
- /**
- * System dependent constructor.
- *
- * Params:
- * l_onoff = $(D_PSYMBOL l_onoff) value.
- * l_linger = $(D_PSYMBOL l_linger) value.
- */
- this(LingerField l_onoff, LingerField l_linger)
- {
- this.l_onoff = l_onoff;
- this.l_linger = l_linger;
- }
-
- ///
- unittest
- {
- auto linger = Linger(1, 5);
- assert(linger.l_onoff == 1);
- assert(linger.l_linger == 5);
- }
-
- /**
- * Params:
- * value = Whether to linger after the socket is closed.
- *
- * See_Also: $(D_PSYMBOL time).
- */
- @property void enabled(const bool value) pure nothrow @safe @nogc
- {
- this.l_onoff = value;
- }
-
- /**
- * Returns: Whether to linger after the socket is closed.
- */
- @property bool enabled() const pure nothrow @safe @nogc
- {
- return this.l_onoff != 0;
- }
-
- /**
- * Returns: Timeout period, in seconds, to wait before closing the socket
- * if the $(D_PSYMBOL Linger) is $(D_PSYMBOL enabled).
- */
- @property ushort time() const pure nothrow @safe @nogc
- {
- return this.l_linger & ushort.max;
- }
-
- /**
- * Sets timeout period, to wait before closing the socket if the
- * $(D_PSYMBOL Linger) is $(D_PSYMBOL enabled), ignored otherwise.
- *
- * Params:
- * timeout = Timeout period, in seconds.
- */
- @property void time(const ushort timeout) pure nothrow @safe @nogc
- {
- this.l_onoff = timeout > 0;
- this.l_linger = timeout;
- }
-}
-
-version (linux)
-{
- enum SOCK_NONBLOCK = O_NONBLOCK;
- extern(C) int accept4(int, sockaddr*, socklen_t*, int flags) @nogc nothrow;
-}
-else version (OSX)
-{
- version = MacBSD;
-}
-else version (iOS)
-{
- version = MacBSD;
-}
-else version (FreeBSD)
-{
- version = MacBSD;
-}
-else version (OpenBSD)
-{
- version = MacBSD;
-}
-else version (DragonFlyBSD)
-{
- version = MacBSD;
-}
-
-version (MacBSD)
-{
- enum ESOCKTNOSUPPORT = 44; // Socket type not suppoted.
-}
-
-private immutable
-{
- typeof(&getaddrinfo) getaddrinfoPointer;
- typeof(&freeaddrinfo) freeaddrinfoPointer;
-}
-
-shared static this()
-{
- version (Windows)
- {
- auto ws2Lib = GetModuleHandle("ws2_32.dll");
-
- getaddrinfoPointer = cast(typeof(getaddrinfoPointer))
- GetProcAddress(ws2Lib, "getaddrinfo");
- freeaddrinfoPointer = cast(typeof(freeaddrinfoPointer))
- GetProcAddress(ws2Lib, "freeaddrinfo");
- }
- else version (Posix)
- {
- getaddrinfoPointer = &getaddrinfo;
- freeaddrinfoPointer = &freeaddrinfo;
- }
-}
-
-/**
- * $(D_PSYMBOL SocketException) should be thrown only if one of the socket functions
- * $(D_PSYMBOL socketError) and sets $(D_PSYMBOL errno), because
- * $(D_PSYMBOL SocketException) relies on the $(D_PSYMBOL errno) value.
- */
-class SocketException : Exception
-{
- const ErrorCode.ErrorNo error = ErrorCode.ErrorNo.success;
-
- /**
- * Params:
- * msg = The message for the exception.
- * file = The file where the exception occurred.
- * line = The line number where the exception occurred.
- * next = The previous exception in the chain of exceptions, if any.
- */
- this(string msg,
- string file = __FILE__,
- size_t line = __LINE__,
- Throwable next = null) @nogc @safe nothrow
- {
- super(msg, file, line, next);
-
- foreach (member; EnumMembers!(ErrorCode.ErrorNo))
- {
- if (member == lastError)
- {
- error = member;
- return;
- }
- }
- if (lastError == ENOMEM)
- {
- error = ErrorCode.ErrorNo.noBufferSpace;
- }
- else if (lastError == EMFILE)
- {
- error = ErrorCode.ErrorNo.tooManyDescriptors;
- }
- else version (linux)
- {
- if (lastError == ENOSR)
- {
- error = ErrorCode.ErrorNo.networkDown;
- }
- }
- else version (Posix)
- {
- if (lastError == EPROTO)
- {
- error = ErrorCode.ErrorNo.networkDown;
- }
- }
- }
-}
-
-/**
- * Class for creating a network communication endpoint using the Berkeley
- * sockets interfaces of different types.
- */
-abstract class Socket
-{
- version (Posix)
- {
- /**
- * How a socket is shutdown.
- */
- enum Shutdown : int
- {
- receive = SHUT_RD, /// Socket receives are disallowed
- send = SHUT_WR, /// Socket sends are disallowed
- both = SHUT_RDWR, /// Both receive and send
- }
- }
- else version (Windows)
- {
- /// Property to get or set whether the socket is blocking or nonblocking.
- private bool blocking_ = true;
-
- /**
- * How a socket is shutdown.
- */
- enum Shutdown : int
- {
- receive = SD_RECEIVE, /// Socket receives are disallowed.
- send = SD_SEND, /// Socket sends are disallowed.
- both = SD_BOTH, /// Both receive and send.
- }
-
- // The WinSock timeouts seem to be effectively skewed by a constant
- // offset of about half a second (in milliseconds).
- private enum WINSOCK_TIMEOUT_SKEW = 500;
- }
-
- /// Socket handle.
- protected SocketType handle_;
-
- /// Address family.
- protected AddressFamily family;
-
- private @property void handle(SocketType handle) @nogc
- in
- {
- assert(handle != SocketType.init);
- assert(handle_ == SocketType.init, "Socket handle cannot be changed");
- }
- do
- {
- handle_ = handle;
-
- // Set the option to disable SIGPIPE on send() if the platform
- // has it (e.g. on OS X).
- static if (is(typeof(SO_NOSIGPIPE)))
- {
- setOption(SocketOptionLevel.SOCKET, cast(SocketOption)SO_NOSIGPIPE, true);
- }
- }
-
- @property inout(SocketType) handle() inout const pure nothrow @safe @nogc
- {
- return handle_;
- }
-
- /**
- * Create a socket.
- *
- * Params:
- * handle = Socket.
- * af = Address family.
- */
- this(SocketType handle, AddressFamily af) @nogc
- in
- {
- assert(handle != SocketType.init);
- }
- do
- {
- scope (failure)
- {
- this.close();
- }
- this.handle = handle;
- family = af;
- }
-
- /**
- * Closes the socket and calls the destructor on itself.
- */
- ~this() nothrow @trusted @nogc
- {
- this.close();
- }
-
- /**
- * Get a socket option.
- *
- * Params:
- * level = Protocol level at that the option exists.
- * option = Option.
- * result = Buffer to save the result.
- *
- * Returns: The number of bytes written to $(D_PARAM result).
- *
- * Throws: $(D_PSYMBOL SocketException) on error.
- */
- protected int getOption(SocketOptionLevel level,
- SocketOption option,
- void[] result) const @trusted @nogc
- {
- auto length = cast(socklen_t) result.length;
- if (getsockopt(handle_,
- cast(int) level,
- cast(int) option,
- result.ptr,
- &length) == socketError)
- {
- throw defaultAllocator.make!SocketException("Unable to get socket option");
- }
- return length;
- }
-
- /// Ditto.
- int getOption(SocketOptionLevel level,
- SocketOption option,
- out size_t result) const @trusted @nogc
- {
- return getOption(level, option, (&result)[0 .. 1]);
- }
-
- /// Ditto.
- int getOption(SocketOptionLevel level,
- SocketOption option,
- out Linger result) const @trusted @nogc
- {
- return getOption(level, option, (&result)[0 .. 1]);
- }
-
- /// Ditto.
- int getOption(SocketOptionLevel level,
- SocketOption option,
- out Duration result) const @trusted @nogc
- {
- // WinSock returns the timeout values as a milliseconds DWORD,
- // while Linux and BSD return a timeval struct.
- version (Posix)
- {
- timeval tv;
- auto ret = getOption(level, option, (&tv)[0 .. 1]);
- result = dur!"seconds"(tv.tv_sec) + dur!"usecs"(tv.tv_usec);
- }
- else version (Windows)
- {
- int msecs;
- auto ret = getOption(level, option, (&msecs)[0 .. 1]);
- if (option == SocketOption.RCVTIMEO)
- {
- msecs += WINSOCK_TIMEOUT_SKEW;
- }
- result = dur!"msecs"(msecs);
- }
- return ret;
- }
-
- /**
- * Set a socket option.
- *
- * Params:
- * level = Protocol level at that the option exists.
- * option = Option.
- * value = Option value.
- *
- * Throws: $(D_PSYMBOL SocketException) on error.
- */
- protected void setOption(SocketOptionLevel level,
- SocketOption option,
- void[] value) const @trusted @nogc
- {
- if (setsockopt(handle_,
- cast(int)level,
- cast(int)option,
- value.ptr,
- cast(uint) value.length) == socketError)
- {
- throw defaultAllocator.make!SocketException("Unable to set socket option");
- }
- }
-
- /// Ditto.
- void setOption(SocketOptionLevel level, SocketOption option, size_t value)
- const @trusted @nogc
- {
- setOption(level, option, (&value)[0 .. 1]);
- }
-
- /// Ditto.
- void setOption(SocketOptionLevel level, SocketOption option, Linger value)
- const @trusted @nogc
- {
- setOption(level, option, (&value)[0 .. 1]);
- }
-
- /// Ditto.
- void setOption(SocketOptionLevel level, SocketOption option, Duration value)
- const @trusted @nogc
- {
- version (Posix)
- {
- timeval tv;
- value.split!("seconds", "usecs")(tv.tv_sec, tv.tv_usec);
- setOption(level, option, (&tv)[0 .. 1]);
- }
- else version (Windows)
- {
- auto msecs = cast(int) value.total!"msecs";
- if (msecs > 0 && option == SocketOption.RCVTIMEO)
- {
- msecs = max(1, msecs - WINSOCK_TIMEOUT_SKEW);
- }
- setOption(level, option, msecs);
- }
- }
-
- /**
- * Returns: Socket's blocking flag.
- */
- @property inout(bool) blocking() inout const nothrow @nogc
- {
- version (Posix)
- {
- return !(fcntl(handle_, F_GETFL, 0) & O_NONBLOCK);
- }
- else version (Windows)
- {
- return this.blocking_;
- }
- }
-
- /**
- * Params:
- * yes = Socket's blocking flag.
- */
- @property void blocking(bool yes) @nogc
- {
- version (Posix)
- {
- int fl = fcntl(handle_, F_GETFL, 0);
-
- if (fl != socketError)
- {
- fl = yes ? fl & ~O_NONBLOCK : fl | O_NONBLOCK;
- fl = fcntl(handle_, F_SETFL, fl);
- }
- if (fl == socketError)
- {
- throw make!SocketException(defaultAllocator,
- "Unable to set socket blocking");
- }
- }
- else version (Windows)
- {
- uint num = !yes;
- if (ioctlsocket(handle_, FIONBIO, &num) == socketError)
- {
- throw make!SocketException(defaultAllocator,
- "Unable to set socket blocking");
- }
- this.blocking_ = yes;
- }
- }
-
- /**
- * Returns: The socket's address family.
- */
- @property AddressFamily addressFamily() const @nogc @safe pure nothrow
- {
- return family;
- }
-
- /**
- * Returns: $(D_KEYWORD true) if this is a valid, alive socket.
- */
- @property bool isAlive() @trusted const nothrow @nogc
- {
- int type;
- socklen_t typesize = cast(socklen_t) type.sizeof;
- return !getsockopt(handle_, SOL_SOCKET, SO_TYPE, cast(char*)&type, &typesize);
- }
-
- /**
- * Disables sends and/or receives.
- *
- * Params:
- * how = What to disable.
- *
- * See_Also:
- * $(D_PSYMBOL Shutdown)
- */
- void shutdown(Shutdown how = Shutdown.both) @nogc @trusted const nothrow
- {
- .shutdown(handle_, cast(int)how);
- }
-
- /**
- * Immediately drop any connections and release socket resources.
- * Calling $(D_PSYMBOL shutdown) before $(D_PSYMBOL close) is recommended
- * for connection-oriented sockets. The $(D_PSYMBOL Socket) object is no
- * longer usable after $(D_PSYMBOL close).
- */
- void close() nothrow @trusted @nogc
- {
- version(Windows)
- {
- .closesocket(handle_);
- }
- else version(Posix)
- {
- .close(handle_);
- }
- handle_ = SocketType.init;
- }
-
- /**
- * Listen for an incoming connection. $(D_PSYMBOL bind) must be called before you
- * can $(D_PSYMBOL listen).
- *
- * Params:
- * backlog = Request of how many pending incoming connections are
- * queued until $(D_PSYMBOL accept)ed.
- */
- void listen(int backlog) const @trusted @nogc
- {
- if (.listen(handle_, backlog) == socketError)
- {
- throw defaultAllocator.make!SocketException("Unable to listen on socket");
- }
- }
-
- /**
- * Compare handles.
- *
- * Params:
- * that = Another handle.
- *
- * Returns: Comparision result.
- */
- int opCmp(size_t that) const pure nothrow @safe @nogc
- {
- return handle_ < that ? -1 : handle_ > that ? 1 : 0;
- }
-}
-
-/**
- * Interface with common fileds for stream and connected sockets.
- */
-interface ConnectionOrientedSocket
-{
- /**
- * Flags may be OR'ed together.
- */
- enum Flag : int
- {
- /// No flags specified.
- none = 0,
- /// Out-of-band stream data.
- outOfBand = MSG_OOB,
- /// Peek at incoming data without removing it from the queue, only for receiving.
- peek = MSG_PEEK,
- /// Data should not be subject to routing; this flag may be ignored. Only for sending.
- dontRoute = MSG_DONTROUTE,
- }
-
- alias Flags = BitFlags!Flag;
-}
-
-class StreamSocket : Socket, ConnectionOrientedSocket
-{
- /**
- * Create a socket.
- *
- * Params:
- * af = Address family.
- */
- this(AddressFamily af) @trusted @nogc
- {
- auto handle = cast(SocketType) socket(af, 1, 0);
- if (handle == SocketType.init)
- {
- throw defaultAllocator.make!SocketException("Unable to create socket");
- }
- super(handle, af);
- }
-
- /**
- * Associate a local address with this socket.
- *
- * Params:
- * endpoint = Local address.
- *
- * Throws: $(D_PSYMBOL SocketException) if unable to bind.
- */
- void bind(const Endpoint endpoint) const @nogc
- {
- if (.bind(handle_, cast(const(sockaddr)*) &endpoint, Endpoint.sizeof))
- {
- throw defaultAllocator.make!SocketException("Unable to bind socket");
- }
- }
-
- /**
- * Accept an incoming connection.
- *
- * The blocking mode is always inherited.
- *
- * Returns: $(D_PSYMBOL Socket) for the accepted connection or
- * $(D_KEYWORD null) if the call would block on a
- * non-blocking socket.
- *
- * Throws: $(D_PSYMBOL SocketException) if unable to accept.
- */
- ConnectedSocket accept() @trusted @nogc
- {
- SocketType sock;
-
- version (linux)
- {
- int flags;
- if (!blocking)
- {
- flags |= SOCK_NONBLOCK;
- }
- sock = cast(SocketType).accept4(handle_, null, null, flags);
- }
- else
- {
- sock = cast(SocketType).accept(handle_, null, null);
- }
-
- if (sock == SocketType.init)
- {
- if (wouldHaveBlocked())
- {
- return null;
- }
- throw make!SocketException(defaultAllocator,
- "Unable to accept socket connection");
- }
-
- auto newSocket = defaultAllocator.make!ConnectedSocket(sock, addressFamily);
-
- version (linux)
- { // Blocking mode already set
- }
- else version (Posix)
- {
- if (!blocking)
- {
- try
- {
- newSocket.blocking = blocking;
- }
- catch (SocketException e)
- {
- defaultAllocator.dispose(newSocket);
- throw e;
- }
- }
- }
- else version (Windows)
- { // Inherits blocking mode
- newSocket.blocking_ = blocking;
- }
- return newSocket;
- }
-}
-
-/**
- * Socket returned if a connection has been established.
- */
-class ConnectedSocket : Socket, ConnectionOrientedSocket
-{
- /**
- * $(D_KEYWORD true) if the stream socket peer has performed an orderly
- * shutdown.
- */
- protected bool disconnected_;
-
- /**
- * Returns: $(D_KEYWORD true) if the stream socket peer has performed an orderly
- * shutdown.
- */
- @property inout(bool) disconnected() inout const pure nothrow @safe @nogc
- {
- return disconnected_;
- }
-
- /**
- * Create a socket.
- *
- * Params:
- * handle = Socket.
- * af = Address family.
- */
- this(SocketType handle, AddressFamily af) @nogc
- {
- super(handle, af);
- }
-
- version (Windows)
- {
- private static int capToMaxBuffer(size_t size) pure nothrow @safe @nogc
- {
- // Windows uses int instead of size_t for length arguments.
- // Luckily, the send/recv functions make no guarantee that
- // all the data is sent, so we use that to send at most
- // int.max bytes.
- return size > size_t (int.max) ? int.max : cast(int) size;
- }
- }
- else
- {
- private static size_t capToMaxBuffer(size_t size) pure nothrow @safe @nogc
- {
- return size;
- }
- }
-
- /**
- * Receive data on the connection.
- *
- * Params:
- * buf = Buffer to save received data.
- * flags = Flags.
- *
- * Returns: The number of bytes received or 0 if nothing received
- * because the call would block.
- *
- * Throws: $(D_PSYMBOL SocketException) if unable to receive.
- */
- ptrdiff_t receive(ubyte[] buf, Flags flags = Flag.none) @trusted @nogc
- {
- ptrdiff_t ret;
- if (!buf.length)
- {
- return 0;
- }
-
- ret = recv(handle_, buf.ptr, capToMaxBuffer(buf.length), cast(int) flags);
- if (ret == 0)
- {
- disconnected_ = true;
- }
- else if (ret == socketError)
- {
- if (wouldHaveBlocked())
- {
- return 0;
- }
- disconnected_ = true;
- throw defaultAllocator.make!SocketException("Unable to receive");
- }
- return ret;
- }
-
- /**
- * Send data on the connection. If the socket is blocking and there is no
- * buffer space left, $(D_PSYMBOL send) waits, non-blocking socket returns
- * 0 in this case.
- *
- * Params:
- * buf = Data to be sent.
- * flags = Flags.
- *
- * Returns: The number of bytes actually sent.
- *
- * Throws: $(D_PSYMBOL SocketException) if unable to send.
- */
- ptrdiff_t send(const(ubyte)[] buf, Flags flags = Flag.none)
- const @trusted @nogc
- {
- int sendFlags = cast(int) flags;
- ptrdiff_t sent;
-
- static if (is(typeof(MSG_NOSIGNAL)))
- {
- sendFlags |= MSG_NOSIGNAL;
- }
-
- sent = .send(handle_, buf.ptr, capToMaxBuffer(buf.length), sendFlags);
- if (sent != socketError)
- {
- return sent;
- }
- else if (wouldHaveBlocked())
- {
- return 0;
- }
- throw defaultAllocator.make!SocketException("Unable to send");
- }
-}
-
-/**
- * Checks if the last error is a serious error or just a special
- * behaviour error of non-blocking sockets (for example an error
- * returned because the socket would block or because the
- * asynchronous operation was successfully started but not finished yet).
- *
- * Returns: $(D_KEYWORD false) if a serious error happened, $(D_KEYWORD true)
- * otherwise.
- */
-bool wouldHaveBlocked() nothrow @trusted @nogc
-{
- version (Posix)
- {
- return errno == EAGAIN || errno == EWOULDBLOCK;
- }
- else version (Windows)
- {
- return WSAGetLastError() == ERROR_IO_PENDING
- || WSAGetLastError() == WSAEWOULDBLOCK
- || WSAGetLastError() == ERROR_IO_INCOMPLETE;
- }
-}
-
-/**
- * Returns: Platform specific error code.
- */
-private @property int lastError() nothrow @safe @nogc
-{
- version (Windows)
- {
- return WSAGetLastError();
- }
- else
- {
- return errno;
- }
-}