summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorEugen Wissner <belka@caraus.de>2016-10-08 19:33:06 +0200
committerEugen Wissner <belka@caraus.de>2016-10-08 19:33:06 +0200
commit6b093cd5fac7dfa783bd82d4570a21f24cac29ab (patch)
treea8caeb4f98e83598455cdbc9581af7354e97b99d
parent154e2f2ff7b8e75720c36752caa234b813dcb295 (diff)
downloadtanya-6b093cd5fac7dfa783bd82d4570a21f24cac29ab.tar.gz
Add Windows IOCP and Kqueue implementations for the event loop
-rw-r--r--source/tanya/async/event/epoll.d178
-rw-r--r--source/tanya/async/event/iocp.d294
-rw-r--r--source/tanya/async/event/kqueue.d349
-rw-r--r--source/tanya/async/event/selector.d266
-rw-r--r--source/tanya/async/loop.d493
-rw-r--r--source/tanya/async/package.d (renamed from source/tanya/event/config.d)45
-rw-r--r--source/tanya/async/protocol.d50
-rw-r--r--source/tanya/async/transport.d63
-rw-r--r--source/tanya/async/watcher.d242
-rw-r--r--source/tanya/container/buffer.d1336
-rw-r--r--source/tanya/event/internal/epoll.d223
-rw-r--r--source/tanya/event/internal/selector.d216
-rw-r--r--source/tanya/event/loop.d275
-rw-r--r--source/tanya/event/package.d16
-rw-r--r--source/tanya/event/protocol.d46
-rw-r--r--source/tanya/event/transport.d134
-rw-r--r--source/tanya/event/watcher.d207
17 files changed, 2654 insertions, 1779 deletions
diff --git a/source/tanya/async/event/epoll.d b/source/tanya/async/event/epoll.d
new file mode 100644
index 0000000..3451661
--- /dev/null
+++ b/source/tanya/async/event/epoll.d
@@ -0,0 +1,178 @@
+/* This Source Code Form is subject to the terms of the Mozilla Public
+ * License, v. 2.0. If a copy of the MPL was not distributed with this
+ * file, You can obtain one at http://mozilla.org/MPL/2.0/. */
+
+/**
+ * Copyright: Eugene Wissner 2016.
+ * License: $(LINK2 https://www.mozilla.org/en-US/MPL/2.0/,
+ * Mozilla Public License, v. 2.0).
+ * Authors: $(LINK2 mailto:belka@caraus.de, Eugene Wissner)
+ */
+module tanya.async.event.epoll;
+
+version (linux):
+
+public import core.sys.linux.epoll;
+import tanya.async.protocol;
+import tanya.async.event.selector;
+import tanya.async.loop;
+import tanya.async.transport;
+import tanya.async.watcher;
+import tanya.memory;
+import tanya.memory.mmappool;
+import tanya.network.socket;
+import core.stdc.errno;
+import core.sys.posix.unistd;
+import core.time;
+import std.algorithm.comparison;
+
+class EpollLoop : SelectorLoop
+{
+ protected int fd;
+ private epoll_event[] events;
+
+ /**
+ * Initializes the loop.
+ */
+ this()
+ {
+ if ((fd = epoll_create1(EPOLL_CLOEXEC)) < 0)
+ {
+ throw MmapPool.instance.make!BadLoopException("epoll initialization failed");
+ }
+ super();
+ events = MmapPool.instance.makeArray!epoll_event(maxEvents);
+ }
+
+ /**
+ * Free loop internals.
+ */
+ ~this()
+ {
+ MmapPool.instance.dispose(events);
+ close(fd);
+ }
+
+ /**
+ * Should be called if the backend configuration changes.
+ *
+ * Params:
+ * watcher = Watcher.
+ * oldEvents = The events were already set.
+ * events = The events should be set.
+ *
+ * Returns: $(D_KEYWORD true) if the operation was successful.
+ */
+ protected override bool reify(ConnectionWatcher watcher, EventMask oldEvents, EventMask events)
+ in
+ {
+ assert(watcher !is null);
+ }
+ body
+ {
+ int op = EPOLL_CTL_DEL;
+ epoll_event ev;
+
+ if (events == oldEvents)
+ {
+ return true;
+ }
+ if (events && oldEvents)
+ {
+ op = EPOLL_CTL_MOD;
+ }
+ else if (events && !oldEvents)
+ {
+ op = EPOLL_CTL_ADD;
+ }
+
+ ev.data.fd = watcher.socket.handle;
+ ev.events = (events & (Event.read | Event.accept) ? EPOLLIN | EPOLLPRI : 0)
+ | (events & Event.write ? EPOLLOUT : 0)
+ | EPOLLET;
+
+ return epoll_ctl(fd, op, watcher.socket.handle, &ev) == 0;
+ }
+
+ /**
+ * Does the actual polling.
+ */
+ protected override void poll()
+ {
+ // Don't block
+ immutable timeout = cast(immutable int) blockTime.total!"msecs";
+ auto eventCount = epoll_wait(fd, events.ptr, maxEvents, timeout);
+
+ if (eventCount < 0)
+ {
+ if (errno != EINTR)
+ {
+ throw defaultAllocator.make!BadLoopException();
+ }
+ return;
+ }
+
+ for (auto i = 0; i < eventCount; ++i)
+ {
+ auto io = cast(IOWatcher) connections[events[i].data.fd];
+
+ if (io is null)
+ {
+ acceptConnections(connections[events[i].data.fd]);
+ }
+ else if (events[i].events & EPOLLERR)
+ {
+ kill(io, null);
+ }
+ else if (events[i].events & (EPOLLIN | EPOLLPRI | EPOLLHUP))
+ {
+ auto transport = cast(SelectorStreamTransport) io.transport;
+ assert(transport !is null);
+
+ SocketException exception;
+ try
+ {
+ ptrdiff_t received;
+ do
+ {
+ received = transport.socket.receive(io.output[]);
+ io.output += received;
+ }
+ while (received);
+ }
+ catch (SocketException e)
+ {
+ exception = e;
+ }
+ if (transport.socket.disconnected)
+ {
+ kill(io, exception);
+ }
+ else if (io.output.length)
+ {
+ swapPendings.insertBack(io);
+ }
+ }
+ else if (events[i].events & EPOLLOUT)
+ {
+ auto transport = cast(SelectorStreamTransport) io.transport;
+ assert(transport !is null);
+
+ transport.writeReady = true;
+ if (transport.input.length)
+ {
+ feed(transport);
+ }
+ }
+ }
+ }
+
+ /**
+ * Returns: The blocking time.
+ */
+ override protected @property inout(Duration) blockTime()
+ inout @safe pure nothrow
+ {
+ return min(super.blockTime, 1.dur!"seconds");
+ }
+}
diff --git a/source/tanya/async/event/iocp.d b/source/tanya/async/event/iocp.d
new file mode 100644
index 0000000..569f974
--- /dev/null
+++ b/source/tanya/async/event/iocp.d
@@ -0,0 +1,294 @@
+/* This Source Code Form is subject to the terms of the Mozilla Public
+ * License, v. 2.0. If a copy of the MPL was not distributed with this
+ * file, You can obtain one at http://mozilla.org/MPL/2.0/. */
+
+/**
+ * Copyright: Eugene Wissner 2016.
+ * License: $(LINK2 https://www.mozilla.org/en-US/MPL/2.0/,
+ * Mozilla Public License, v. 2.0).
+ * Authors: $(LINK2 mailto:belka@caraus.de, Eugene Wissner)
+ */
+module tanya.async.event.iocp;
+
+version (Windows):
+
+import tanya.container.buffer;
+import tanya.async.loop;
+import tanya.async.protocol;
+import tanya.async.transport;
+import tanya.async.watcher;
+import tanya.memory;
+import tanya.memory.mmappool;
+import tanya.network.socket;
+import core.sys.windows.basetyps;
+import core.sys.windows.mswsock;
+import core.sys.windows.winbase;
+import core.sys.windows.windef;
+import core.sys.windows.winsock2;
+
+class IOCPStreamTransport : StreamTransport
+{
+ private OverlappedConnectedSocket socket_;
+
+ private WriteBuffer input;
+
+ /**
+ * Creates new completion port transport.
+ * Params:
+ * socket = Socket.
+ */
+ this(OverlappedConnectedSocket socket)
+ in
+ {
+ assert(socket !is null);
+ }
+ body
+ {
+ socket_ = socket;
+ input = MmapPool.instance.make!WriteBuffer();
+ }
+
+ ~this()
+ {
+ MmapPool.instance.dispose(input);
+ }
+
+ @property inout(OverlappedConnectedSocket) socket() inout pure nothrow @safe @nogc
+ {
+ return socket_;
+ }
+
+ /**
+ * Write some data to the transport.
+ *
+ * Params:
+ * data = Data to send.
+ */
+ void write(ubyte[] data)
+ {
+ immutable empty = input.length == 0;
+ input ~= data;
+ if (empty)
+ {
+ SocketState overlapped;
+ try
+ {
+ overlapped = MmapPool.instance.make!SocketState;
+ socket.beginSend(input[], overlapped);
+ }
+ catch (SocketException e)
+ {
+ MmapPool.instance.dispose(overlapped);
+ MmapPool.instance.dispose(e);
+ }
+ }
+ }
+}
+
+class IOCPLoop : Loop
+{
+ protected HANDLE completionPort;
+
+ protected OVERLAPPED overlap;
+
+ /**
+ * Initializes the loop.
+ */
+ this()
+ {
+ super();
+
+ completionPort = CreateIoCompletionPort(INVALID_HANDLE_VALUE, NULL, 0, 0);
+ if (!completionPort)
+ {
+ throw defaultAllocator.make!BadLoopException("Creating completion port failed");
+ }
+ }
+
+ /**
+ * Should be called if the backend configuration changes.
+ *
+ * Params:
+ * watcher = Watcher.
+ * oldEvents = The events were already set.
+ * events = The events should be set.
+ *
+ * Returns: $(D_KEYWORD true) if the operation was successful.
+ */
+ override protected bool reify(ConnectionWatcher watcher,
+ EventMask oldEvents,
+ EventMask events)
+ {
+ SocketState overlapped;
+ if (!(oldEvents & Event.accept) && (events & Event.accept))
+ {
+ auto socket = cast(OverlappedStreamSocket) watcher.socket;
+ assert(socket !is null);
+
+ if (CreateIoCompletionPort(cast(HANDLE) socket.handle,
+ completionPort,
+ cast(ULONG_PTR) (cast(void*) watcher),
+ 0) !is completionPort)
+ {
+ return false;
+ }
+
+ try
+ {
+ overlapped = MmapPool.instance.make!SocketState;
+ socket.beginAccept(overlapped);
+ }
+ catch (SocketException e)
+ {
+ MmapPool.instance.dispose(overlapped);
+ defaultAllocator.dispose(e);
+ return false;
+ }
+ }
+ if (!(oldEvents & Event.read) && (events & Event.read)
+ || !(oldEvents & Event.write) && (events & Event.write))
+ {
+ auto io = cast(IOWatcher) watcher;
+ assert(io !is null);
+
+ auto transport = cast(IOCPStreamTransport) io.transport;
+ assert(transport !is null);
+
+ if (CreateIoCompletionPort(cast(HANDLE) transport.socket.handle,
+ completionPort,
+ cast(ULONG_PTR) (cast(void*) watcher),
+ 0) !is completionPort)
+ {
+ return false;
+ }
+
+ // Begin to read
+ if (!(oldEvents & Event.read) && (events & Event.read))
+ {
+ try
+ {
+ overlapped = MmapPool.instance.make!SocketState;
+ transport.socket.beginReceive(io.output[], overlapped);
+ }
+ catch (SocketException e)
+ {
+ MmapPool.instance.dispose(overlapped);
+ defaultAllocator.dispose(e);
+ return false;
+ }
+ }
+ }
+ return true;
+ }
+
+ /**
+ * Does the actual polling.
+ */
+ override protected void poll()
+ {
+ DWORD lpNumberOfBytes;
+ ULONG_PTR key;
+ LPOVERLAPPED overlap;
+ immutable timeout = cast(immutable int) blockTime.total!"msecs";
+
+ auto result = GetQueuedCompletionStatus(completionPort,
+ &lpNumberOfBytes,
+ &key,
+ &overlap,
+ timeout);
+ if (result == FALSE && overlap == NULL)
+ {
+ return; // Timeout
+ }
+
+ auto overlapped = (cast(SocketState) ((cast(void*) overlap) - 8));
+ assert(overlapped !is null);
+ scope (failure)
+ {
+ MmapPool.instance.dispose(overlapped);
+ }
+
+ switch (overlapped.event)
+ {
+ case OverlappedSocketEvent.accept:
+ auto connection = cast(ConnectionWatcher) (cast(void*) key);
+ assert(connection !is null);
+
+ auto listener = cast(OverlappedStreamSocket) connection.socket;
+ assert(listener !is null);
+
+ auto socket = listener.endAccept(overlapped);
+ auto transport = MmapPool.instance.make!IOCPStreamTransport(socket);
+ auto io = MmapPool.instance.make!IOWatcher(transport, connection.protocol);
+
+ connection.incoming.insertBack(io);
+
+ reify(io, EventMask(Event.none), EventMask(Event.read, Event.write));
+
+ swapPendings.insertBack(connection);
+ listener.beginAccept(overlapped);
+ break;
+ case OverlappedSocketEvent.read:
+ auto io = cast(IOWatcher) (cast(void*) key);
+ assert(io !is null);
+ if (!io.active)
+ {
+ MmapPool.instance.dispose(io);
+ MmapPool.instance.dispose(overlapped);
+ return;
+ }
+
+ auto transport = cast(IOCPStreamTransport) io.transport;
+ assert(transport !is null);
+
+ int received;
+ SocketException exception;
+ try
+ {
+ received = transport.socket.endReceive(overlapped);
+ }
+ catch (SocketException e)
+ {
+ exception = e;
+ }
+ if (transport.socket.disconnected)
+ {
+ // We want to get one last notification to destroy the watcher
+ transport.socket.beginReceive(io.output[], overlapped);
+ kill(io, exception);
+ }
+ else if (received > 0)
+ {
+ immutable full = io.output.free == received;
+
+ io.output += received;
+ // Receive was interrupted because the buffer is full. We have to continue
+ if (full)
+ {
+ transport.socket.beginReceive(io.output[], overlapped);
+ }
+ swapPendings.insertBack(io);
+ }
+ break;
+ case OverlappedSocketEvent.write:
+ auto io = cast(IOWatcher) (cast(void*) key);
+ assert(io !is null);
+
+ auto transport = cast(IOCPStreamTransport) io.transport;
+ assert(transport !is null);
+
+ transport.input += transport.socket.endSend(overlapped);
+ if (transport.input.length)
+ {
+ transport.socket.beginSend(transport.input[], overlapped);
+ }
+ else
+ {
+ transport.socket.beginReceive(io.output[], overlapped);
+ }
+ break;
+ default:
+ assert(false, "Unknown event");
+ }
+ }
+}
diff --git a/source/tanya/async/event/kqueue.d b/source/tanya/async/event/kqueue.d
new file mode 100644
index 0000000..37d9c1c
--- /dev/null
+++ b/source/tanya/async/event/kqueue.d
@@ -0,0 +1,349 @@
+/* This Source Code Form is subject to the terms of the Mozilla Public
+ * License, v. 2.0. If a copy of the MPL was not distributed with this
+ * file, You can obtain one at http://mozilla.org/MPL/2.0/. */
+
+/**
+ * Copyright: Eugene Wissner 2016.
+ * License: $(LINK2 https://www.mozilla.org/en-US/MPL/2.0/,
+ * Mozilla Public License, v. 2.0).
+ * Authors: $(LINK2 mailto:belka@caraus.de, Eugene Wissner)
+ */
+module tanya.async.event.kqueue;
+
+version (OSX)
+{
+ version = MissingKevent;
+}
+else version (iOS)
+{
+ version = MissingKevent;
+}
+else version (TVOS)
+{
+ version = MissingKevent;
+}
+else version (WatchOS)
+{
+ version = MissingKevent;
+}
+else version (OpenBSD)
+{
+ version = MissingKevent;
+}
+else version (DragonFlyBSD)
+{
+ version = MissingKevent;
+}
+
+version (MissingKevent)
+{
+ extern (C):
+ nothrow:
+ @nogc:
+
+ import core.stdc.stdint; // intptr_t, uintptr_t
+ import core.sys.posix.time; // timespec
+
+ enum : short
+ {
+ EVFILT_READ = -1,
+ EVFILT_WRITE = -2,
+ EVFILT_AIO = -3, /* attached to aio requests */
+ EVFILT_VNODE = -4, /* attached to vnodes */
+ EVFILT_PROC = -5, /* attached to struct proc */
+ EVFILT_SIGNAL = -6, /* attached to struct proc */
+ EVFILT_TIMER = -7, /* timers */
+ EVFILT_MACHPORT = -8, /* Mach portsets */
+ EVFILT_FS = -9, /* filesystem events */
+ EVFILT_USER = -10, /* User events */
+ EVFILT_VM = -12, /* virtual memory events */
+ EVFILT_SYSCOUNT = 11
+ }
+
+ extern(D) void EV_SET(kevent_t* kevp, typeof(kevent_t.tupleof) args)
+ {
+ *kevp = kevent_t(args);
+ }
+
+ struct kevent_t
+ {
+ uintptr_t ident; /* identifier for this event */
+ short filter; /* filter for event */
+ ushort flags;
+ uint fflags;
+ intptr_t data;
+ void *udata; /* opaque user data identifier */
+ }
+
+ enum
+ {
+ /* actions */
+ EV_ADD = 0x0001, /* add event to kq (implies enable) */
+ EV_DELETE = 0x0002, /* delete event from kq */
+ EV_ENABLE = 0x0004, /* enable event */
+ EV_DISABLE = 0x0008, /* disable event (not reported) */
+
+ /* flags */
+ EV_ONESHOT = 0x0010, /* only report one occurrence */
+ EV_CLEAR = 0x0020, /* clear event state after reporting */
+ EV_RECEIPT = 0x0040, /* force EV_ERROR on success, data=0 */
+ EV_DISPATCH = 0x0080, /* disable event after reporting */
+
+ EV_SYSFLAGS = 0xF000, /* reserved by system */
+ EV_FLAG1 = 0x2000, /* filter-specific flag */
+
+ /* returned values */
+ EV_EOF = 0x8000, /* EOF detected */
+ EV_ERROR = 0x4000, /* error, data contains errno */
+ }
+
+ int kqueue();
+ int kevent(int kq, const kevent_t *changelist, int nchanges,
+ kevent_t *eventlist, int nevents,
+ const timespec *timeout);
+}
+
+version (OSX)
+{
+ version = MacBSD;
+}
+else version (iOS)
+{
+ version = MacBSD;
+}
+else version (FreeBSD)
+{
+ version = MacBSD;
+ public import core.sys.freebsd.sys.event;
+}
+else version (OpenBSD)
+{
+ version = MacBSD;
+}
+else version (DragonFlyBSD)
+{
+ version = MacBSD;
+}
+
+version (MacBSD):
+
+import dlib.async.event.selector;
+import dlib.async.loop;
+import dlib.async.transport;
+import dlib.async.watcher;
+import dlib.memory;
+import dlib.memory.mmappool;
+import dlib.network.socket;
+import core.stdc.errno;
+import core.sys.posix.unistd;
+import core.sys.posix.sys.time;
+import core.time;
+import std.algorithm.comparison;
+
+class KqueueLoop : SelectorLoop
+{
+ protected int fd;
+ private kevent_t[] events;
+ private kevent_t[] changes;
+ private size_t changeCount;
+
+ /**
+ * Returns: Maximal event count can be got at a time
+ * (should be supported by the backend).
+ */
+ override protected @property inout(uint) maxEvents() inout const pure nothrow @safe @nogc
+ {
+ return cast(uint) events.length;
+ }
+
+ this()
+ {
+ super();
+
+ if ((fd = kqueue()) == -1)
+ {
+ throw MmapPool.instance.make!BadLoopException("epoll initialization failed");
+ }
+ events = MmapPool.instance.makeArray!kevent_t(64);
+ changes = MmapPool.instance.makeArray!kevent_t(64);
+ }
+
+ /**
+ * Free loop internals.
+ */
+ ~this()
+ {
+ MmapPool.instance.dispose(events);
+ MmapPool.instance.dispose(changes);
+ close(fd);
+ }
+
+ private void set(socket_t socket, short filter, ushort flags)
+ {
+ if (changes.length <= changeCount)
+ {
+ MmapPool.instance.resizeArray(changes, changeCount + maxEvents);
+ }
+ EV_SET(&changes[changeCount],
+ cast(ulong) socket,
+ filter,
+ flags,
+ 0U,
+ 0L,
+ null);
+ ++changeCount;
+ }
+
+ /**
+ * Should be called if the backend configuration changes.
+ *
+ * Params:
+ * watcher = Watcher.
+ * oldEvents = The events were already set.
+ * events = The events should be set.
+ *
+ * Returns: $(D_KEYWORD true) if the operation was successful.
+ */
+ override protected bool reify(ConnectionWatcher watcher,
+ EventMask oldEvents,
+ EventMask events)
+ {
+ if (events != oldEvents)
+ {
+ if (oldEvents & Event.read || oldEvents & Event.accept)
+ {
+ set(watcher.socket.handle, EVFILT_READ, EV_DELETE);
+ }
+ if (oldEvents & Event.write)
+ {
+ set(watcher.socket.handle, EVFILT_WRITE, EV_DELETE);
+ }
+ }
+ if (events & (Event.read | events & Event.accept))
+ {
+ set(watcher.socket.handle, EVFILT_READ, EV_ADD | EV_ENABLE);
+ }
+ if (events & Event.write)
+ {
+ set(watcher.socket.handle, EVFILT_WRITE, EV_ADD | EV_DISPATCH);
+ }
+ return true;
+ }
+
+ /**
+ * Does the actual polling.
+ */
+ protected override void poll()
+ {
+ timespec ts;
+ blockTime.split!("seconds", "nsecs")(ts.tv_sec, ts.tv_nsec);
+
+ if (changeCount > maxEvents)
+ {
+ MmapPool.instance.resizeArray(events, changes.length);
+ }
+
+ auto eventCount = kevent(fd, changes.ptr, cast(int) changeCount, events.ptr, maxEvents, &ts);
+ changeCount = 0;
+
+ if (eventCount < 0)
+ {
+ if (errno != EINTR)
+ {
+ throw defaultAllocator.make!BadLoopException();
+ }
+ return;
+ }
+
+ for (int i; i < eventCount; ++i)
+ {
+ assert(connections.length > events[i].ident);
+
+ IOWatcher io = cast(IOWatcher) connections[events[i].ident];
+ // If it is a ConnectionWatcher. Accept connections.
+ if (io is null)
+ {
+ acceptConnections(connections[events[i].ident]);
+ }
+ else if (events[i].flags & EV_ERROR)
+ {
+ kill(io, null);
+ }
+ else if (events[i].filter == EVFILT_READ)
+ {
+ auto transport = cast(SelectorStreamTransport) io.transport;
+ assert(transport !is null);
+
+ SocketException exception;
+ try
+ {
+ ptrdiff_t received;
+ do
+ {
+ received = transport.socket.receive(io.output[]);
+ io.output += received;
+ }
+ while (received);
+ }
+ catch (SocketException e)
+ {
+ exception = e;
+ }
+ if (transport.socket.disconnected)
+ {
+ kill(io, exception);
+ }
+ else if (io.output.length)
+ {
+ swapPendings.insertBack(io);
+ }
+ }
+ else if (events[i].filter == EVFILT_WRITE)
+ {
+ auto transport = cast(SelectorStreamTransport) io.transport;
+ assert(transport !is null);
+
+ transport.writeReady = true;
+ if (transport.input.length)
+ {
+ feed(transport);
+ }
+ }
+ }
+ }
+
+ /**
+ * Returns: The blocking time.
+ */
+ override protected @property inout(Duration) blockTime()
+ inout @safe pure nothrow
+ {
+ return min(super.blockTime, 1.dur!"seconds");
+ }
+
+ /**
+ * If the transport couldn't send the data, the further sending should
+ * be handled by the event loop.
+ *
+ * Params:
+ * transport = Transport.
+ * exception = Exception thrown on sending.
+ *
+ * Returns: $(D_KEYWORD true) if the operation could be successfully
+ * completed or scheduled, $(D_KEYWORD false) otherwise (the
+ * transport is be destroyed then).
+ */
+ protected override bool feed(SelectorStreamTransport transport, SocketException exception = null)
+ {
+ if (!super.feed(transport, exception))
+ {
+ return false;
+ }
+ if (!transport.writeReady)
+ {
+ set(transport.socket.handle, EVFILT_WRITE, EV_DISPATCH);
+ return true;
+ }
+ return false;
+ }
+}
diff --git a/source/tanya/async/event/selector.d b/source/tanya/async/event/selector.d
new file mode 100644
index 0000000..fa813d5
--- /dev/null
+++ b/source/tanya/async/event/selector.d
@@ -0,0 +1,266 @@
+/* This Source Code Form is subject to the terms of the Mozilla Public
+ * License, v. 2.0. If a copy of the MPL was not distributed with this
+ * file, You can obtain one at http://mozilla.org/MPL/2.0/. */
+
+/**
+ * Copyright: Eugene Wissner 2016.
+ * License: $(LINK2 https://www.mozilla.org/en-US/MPL/2.0/,
+ * Mozilla Public License, v. 2.0).
+ * Authors: $(LINK2 mailto:belka@caraus.de, Eugene Wissner)
+ */
+module tanya.async.event.selector;
+
+version (Posix):
+
+import tanya.async.loop;
+import tanya.async.transport;
+import tanya.async.watcher;
+import tanya.container.buffer;
+import tanya.memory;
+import tanya.memory.mmappool;
+import tanya.network.socket;
+import core.sys.posix.netinet.in_;
+import core.stdc.errno;
+
+/**
+ * Transport for stream sockets.
+ */
+class SelectorStreamTransport : StreamTransport
+{
+ private ConnectedSocket socket_;
+
+ /// Input buffer.
+ package WriteBuffer input;
+
+ private SelectorLoop loop;
+
+ /// Received notification that the underlying socket is write-ready.
+ package bool writeReady;
+
+ /**
+ * Params:
+ * loop = Event loop.
+ * socket = Socket.
+ */
+ this(SelectorLoop loop, ConnectedSocket socket)
+ {
+ socket_ = socket;
+ this.loop = loop;
+ input = MmapPool.instance.make!WriteBuffer();
+ }
+
+ /**
+ * Close the transport and deallocate the data buffers.
+ */
+ ~this()
+ {
+ MmapPool.instance.dispose(input);
+ }
+
+ /**
+ * Returns: Transport socket.
+ */
+ inout(ConnectedSocket) socket() inout pure nothrow @safe @nogc
+ {
+ return socket_;
+ }
+
+ /**
+ * Write some data to the transport.
+ *
+ * Params:
+ * data = Data to send.
+ */
+ void write(ubyte[] data)
+ {
+ if (!data.length)
+ {
+ return;
+ }
+ // Try to write if the socket is write ready.
+ if (writeReady)
+ {
+ ptrdiff_t sent;
+ SocketException exception;
+ try
+ {
+ sent = socket.send(data);
+ if (sent == 0)
+ {
+ writeReady = false;
+ }
+ }
+ catch (SocketException e)
+ {
+ writeReady = false;
+ exception = e;
+ }
+ if (sent < data.length)
+ {
+ input ~= data[sent..$];
+ loop.feed(this, exception);
+ }
+ }
+ else
+ {
+ input ~= data;
+ }
+ }
+}
+
+abstract class SelectorLoop : Loop
+{
+ /// Pending connections.
+ protected ConnectionWatcher[] connections;
+
+ this()
+ {
+ super();
+ connections = MmapPool.instance.makeArray!ConnectionWatcher(maxEvents);
+ }
+
+ ~this()
+ {
+ foreach (ref connection; connections)
+ {
+ // We want to free only IOWatchers. ConnectionWatcher are created by the
+ // user and should be freed by himself.
+ auto io = cast(IOWatcher) connection;
+ if (io !is null)
+ {
+ MmapPool.instance.dispose(io);
+ connection = null;
+ }
+ }
+ MmapPool.instance.dispose(connections);
+ }
+
+ /**
+ * If the transport couldn't send the data, the further sending should
+ * be handled by the event loop.
+ *
+ * Params:
+ * transport = Transport.
+ * exception = Exception thrown on sending.
+ *
+ * Returns: $(D_KEYWORD true) if the operation could be successfully
+ * completed or scheduled, $(D_KEYWORD false) otherwise (the
+ * transport will be destroyed then).
+ */
+ protected bool feed(SelectorStreamTransport transport, SocketException exception = null)
+ {
+ while (transport.input.length && transport.writeReady)
+ {
+ try
+ {
+ ptrdiff_t sent = transport.socket.send(transport.input[]);
+ if (sent == 0)
+ {
+ transport.writeReady = false;
+ }
+ else
+ {
+ transport.input += sent;
+ }
+ }
+ catch (SocketException e)
+ {
+ exception = e;
+ transport.writeReady = false;
+ }
+ }
+ if (exception !is null)
+ {
+ auto watcher = cast(IOWatcher) connections[transport.socket.handle];
+ assert(watcher !is null);
+
+ kill(watcher, exception);
+ return false;
+ }
+ return true;
+ }
+
+ /**
+ * Start watching.
+ *
+ * Params:
+ * watcher = Watcher.
+ */
+ override void start(ConnectionWatcher watcher)
+ {
+ if (watcher.active)
+ {
+ return;
+ }
+
+ if (connections.length <= watcher.socket)
+ {
+ MmapPool.instance.resizeArray(connections, watcher.socket.handle + maxEvents / 2);
+ }
+ connections[watcher.socket.handle] = watcher;
+
+ super.start(watcher);
+ }
+
+ /**
+ * Accept incoming connections.
+ *
+ * Params:
+ * connection = Connection watcher ready to accept.
+ */
+ package void acceptConnections(ConnectionWatcher connection)
+ in
+ {
+ assert(connection !is null);
+ }
+ body
+ {
+ while (true)
+ {
+ ConnectedSocket client;
+ try
+ {
+ client = (cast(StreamSocket) connection.socket).accept();
+ }
+ catch (SocketException e)
+ {
+ defaultAllocator.dispose(e);
+ break;
+ }
+ if (client is null)
+ {
+ break;
+ }
+
+ IOWatcher io;
+ auto transport = MmapPool.instance.make!SelectorStreamTransport(this, client);
+
+ if (connections.length >= client.handle)
+ {
+ io = cast(IOWatcher) connections[client.handle];
+ }
+ else
+ {
+ MmapPool.instance.resizeArray(connections, client.handle + maxEvents / 2);
+ }
+ if (io is null)
+ {
+ io = MmapPool.instance.make!IOWatcher(transport,
+ connection.protocol);
+ connections[client.handle] = io;
+ }
+ else
+ {
+ io(transport, connection.protocol);
+ }
+
+ reify(io, EventMask(Event.none), EventMask(Event.read, Event.write));
+ connection.incoming.insertBack(io);
+ }
+
+ if (!connection.incoming.empty)
+ {
+ swapPendings.insertBack(connection);
+ }
+ }
+}
diff --git a/source/tanya/async/loop.d b/source/tanya/async/loop.d
new file mode 100644
index 0000000..9031a83
--- /dev/null
+++ b/source/tanya/async/loop.d
@@ -0,0 +1,493 @@
+/* This Source Code Form is subject to the terms of the Mozilla Public
+ * License, v. 2.0. If a copy of the MPL was not distributed with this
+ * file, You can obtain one at http://mozilla.org/MPL/2.0/. */
+
+/**
+ * Copyright: Eugene Wissner 2016.
+ * License: $(LINK2 https://www.mozilla.org/en-US/MPL/2.0/,
+ * Mozilla Public License, v. 2.0).
+ * Authors: $(LINK2 mailto:belka@caraus.de, Eugene Wissner)
+ *
+ * ---
+ * import tanya.async;
+ * import tanya.network.socket;
+ *
+ * class EchoProtocol : TransmissionControlProtocol
+ * {
+ * private DuplexTransport transport;
+ *
+ * void received(ubyte[] data)
+ * {
+ * transport.write(data);
+ * }
+ *
+ * void connected(DuplexTransport transport)
+ * {
+ * this.transport = transport;
+ * }
+ *
+ * void disconnected(SocketException exception = null)
+ * {
+ * }
+ * }
+ *
+ * void main()
+ * {
+ * auto address = new InternetAddress("127.0.0.1", cast(ushort) 8192);
+ * version (Windows)
+ * {
+ * auto sock = new OverlappedStreamSocket(AddressFamily.INET)
+ * }
+ * else
+ * {
+ * auto sock = new StreamSocket(AddressFamily.INET)
+ * sock.blocking = false;
+ * }
+ *
+ * sock.bind(address);
+ * sock.listen(5);
+ *
+ * auto io = new ConnectionWatcher(sock);
+ * io.setProtocol!EchoProtocol;
+ *
+ * defaultLoop.start(io);
+ * defaultLoop.run();
+ *
+ * sock.shutdown();
+ * }
+ * ---
+ */
+module tanya.async.loop;
+
+import tanya.async.protocol;
+import tanya.async.transport;
+import tanya.async.watcher;
+import tanya.container.buffer;
+import tanya.memory;
+import tanya.memory.mmappool;
+import tanya.network.socket;
+import core.time;
+import std.algorithm.iteration;
+import std.algorithm.mutation;
+import std.typecons;
+
+version (DisableBackends)
+{
+}
+else version (linux)
+{
+ import tanya.async.event.epoll;
+ version = Epoll;
+}
+else version (Windows)
+{
+ import tanya.async.event.iocp;
+ version = IOCP;
+}
+else version (OSX)
+{
+ version = Kqueue;
+}
+else version (iOS)
+{
+ version = Kqueue;
+}
+else version (FreeBSD)
+{
+ version = Kqueue;
+}
+else version (OpenBSD)
+{
+ version = Kqueue;
+}
+else version (DragonFlyBSD)
+{
+ version = Kqueue;
+}
+
+/**
+ * Events.
+ */
+enum Event : uint
+{
+ none = 0x00, /// No events.
+ read = 0x01, /// Non-blocking read call.
+ write = 0x02, /// Non-blocking write call.
+ accept = 0x04, /// Connection made.
+ error = 0x80000000, /// Sent when an error occurs.
+}
+
+alias EventMask = BitFlags!Event;
+
+/**
+ * Tries to set $(D_PSYMBOL MmapPool) to the default allocator.
+ */
+shared static this()
+{
+ if (allocator is null)
+ {
+ allocator = MmapPool.instance;
+ }
+}
+
+/**
+ * Event loop.
+ */
+abstract class Loop
+{
+ /// Pending watchers.
+ protected PendingQueue!Watcher pendings;
+
+ protected PendingQueue!Watcher swapPendings;
+
+ /**
+ * Returns: Maximal event count can be got at a time
+ * (should be supported by the backend).
+ */
+ protected @property inout(uint) maxEvents() inout const pure nothrow @safe @nogc
+ {
+ return 128U;
+ }
+
+ /**
+ * Initializes the loop.
+ */
+ this()
+ {
+ pendings = MmapPool.instance.make!(PendingQueue!Watcher);
+ swapPendings = MmapPool.instance.make!(PendingQueue!Watcher);
+ }
+
+ /**
+ * Frees loop internals.
+ */
+ ~this()
+ {
+ MmapPool.instance.dispose(pendings);
+ MmapPool.instance.dispose(swapPendings);
+ }
+
+ /**
+ * Starts the loop.
+ */
+ void run()
+ {
+ done_ = false;
+ do
+ {
+ poll();
+
+ // Invoke pendings
+ swapPendings.each!((ref p) => p.invoke());
+
+ swap(pendings, swapPendings);
+ }
+ while (!done_);
+ }
+
+ /**
+ * Break out of the loop.
+ */
+ void unloop() @safe pure nothrow
+ {
+ done_ = true;
+ }
+
+ /**
+ * Start watching.
+ *
+ * Params:
+ * watcher = Watcher.
+ */
+ void start(ConnectionWatcher watcher)
+ {
+ if (watcher.active)
+ {
+ return;
+ }
+ watcher.active = true;
+ reify(watcher, EventMask(Event.none), EventMask(Event.accept));
+ }
+
+ /**
+ * Stop watching.
+ *
+ * Params:
+ * watcher = Watcher.
+ */
+ void stop(ConnectionWatcher watcher)
+ {
+ if (!watcher.active)
+ {
+ return;
+ }
+ watcher.active = false;
+
+ reify(watcher, EventMask(Event.accept), EventMask(Event.none));
+ }
+
+ /**
+ * Should be called if the backend configuration changes.
+ *
+ * Params:
+ * watcher = Watcher.
+ * oldEvents = The events were already set.
+ * events = The events should be set.
+ *
+ * Returns: $(D_KEYWORD true) if the operation was successful.
+ */
+ abstract protected bool reify(ConnectionWatcher watcher,
+ EventMask oldEvents,
+ EventMask events);
+
+ /**
+ * Returns: The blocking time.
+ */
+ protected @property inout(Duration) blockTime()
+ inout @safe pure nothrow
+ {
+ // Don't block if we have to do.
+ return swapPendings.empty ? blockTime_ : Duration.zero;
+ }
+
+ /**
+ * Sets the blocking time for IO watchers.
+ *
+ * Params:
+ * blockTime = The blocking time. Cannot be larger than
+ * $(D_PSYMBOL maxBlockTime).
+ */
+ protected @property void blockTime(in Duration blockTime) @safe pure nothrow
+ in
+ {
+ assert(blockTime <= 1.dur!"hours", "Too long to wait.");
+ assert(!blockTime.isNegative);
+ }
+ body
+ {
+ blockTime_ = blockTime;
+ }
+
+ /**
+ * Kills the watcher and closes the connection.
+ */
+ protected void kill(IOWatcher watcher, SocketException exception)
+ {
+ watcher.socket.shutdown();
+ defaultAllocator.dispose(watcher.socket);
+ MmapPool.instance.dispose(watcher.transport);
+ watcher.exception = exception;
+ swapPendings.insertBack(watcher);
+ }
+
+ /**
+ * Does the actual polling.
+ */
+ abstract protected void poll();
+
+ /// Whether the event loop should be stopped.
+ private bool done_;
+
+ /// Maximal block time.
+ protected Duration blockTime_ = 1.dur!"minutes";
+}
+
+/**
+ * Exception thrown on errors in the event loop.
+ */
+class BadLoopException : Exception
+{
+@nogc:
+ /**
+ * Params:
+ * file = The file where the exception occurred.
+ * line = The line number where the exception occurred.
+ * next = The previous exception in the chain of exceptions, if any.
+ */
+ this(string file = __FILE__, size_t line = __LINE__, Throwable next = null)
+ pure @safe nothrow const
+ {
+ super("Event loop cannot be initialized.", file, line, next);
+ }
+}
+
+/**
+ * Returns the event loop used by default. If an event loop wasn't set with
+ * $(D_PSYMBOL defaultLoop) before, $(D_PSYMBOL defaultLoop) will try to
+ * choose an event loop supported on the system.
+ *
+ * Returns: The default event loop.
+ */
+@property Loop defaultLoop()
+{
+ if (defaultLoop_ !is null)
+ {
+ return defaultLoop_;
+ }
+ version (Epoll)
+ {
+ defaultLoop_ = MmapPool.instance.make!EpollLoop;
+ }
+ else version (IOCP)
+ {
+ defaultLoop_ = MmapPool.instance.make!IOCPLoop;
+ }
+ else version (Kqueue)
+ {
+ import tanya.async.event.kqueue;
+ defaultLoop_ = MmapPool.instance.make!KqueueLoop;
+ }
+ return defaultLoop_;
+}
+
+/**
+ * Sets the default event loop.
+ *
+ * This property makes it possible to implement your own backends or event
+ * loops, for example, if the system is not supported or if you want to
+ * extend the supported implementation. Just extend $(D_PSYMBOL Loop) and pass
+ * your implementation to this property.
+ *
+ * Params:
+ * loop = The event loop.
+ */
+@property void defaultLoop(Loop loop)
+in
+{
+ assert(loop !is null);
+}
+body
+{
+ defaultLoop_ = loop;
+}
+
+private Loop defaultLoop_;
+
+/**
+ * Queue.
+ *
+ * Params:
+ * T = Content type.
+ */
+class PendingQueue(T)
+{
+ /**
+ * Creates a new $(D_PSYMBOL Queue).
+ */
+ this()
+ {
+ }
+
+ /**
+ * Removes all elements from the queue.
+ */
+ ~this()
+ {
+ foreach (e; this)
+ {
+ MmapPool.instance.dispose(e);
+ }
+ }
+
+ /**
+ * Returns: First element.
+ */
+ @property ref T front()
+ in
+ {
+ assert(!empty);
+ }
+ body
+ {
+ return first.next.content;
+ }
+
+ /**
+ * Inserts a new element.
+ *
+ * Params:
+ * x = New element.
+ *
+ * Returns: $(D_KEYWORD this).
+ */
+ typeof(this) insertBack(T x)
+ {
+ Entry* temp = MmapPool.instance.make!Entry;
+
+ temp.content = x;
+
+ if (empty)
+ {
+ first.next = rear = temp;
+ }
+ else
+ {
+ rear.next = temp;
+ rear = rear.next;
+ }
+
+ return this;
+ }
+
+ alias insert = insertBack;
+
+ /**
+ * Inserts a new element.
+ *
+ * Params:
+ * x = New element.
+ *
+ * Returns: $(D_KEYWORD this).
+ */
+ typeof(this) opOpAssign(string Op)(ref T x)
+ if (Op == "~")
+ {
+ return insertBack(x);
+ }
+
+ /**
+ * Returns: $(D_KEYWORD true) if the queue is empty.
+ */
+ @property bool empty() const @safe pure nothrow
+ {
+ return first.next is null;
+ }
+
+ /**
+ * Move position to the next element.
+ *
+ * Returns: $(D_KEYWORD this).
+ */
+ typeof(this) popFront()
+ in
+ {
+ assert(!empty);
+ }
+ body
+ {
+ auto n = first.next.next;
+
+ MmapPool.instance.dispose(first.next);
+ first.next = n;
+
+ return this;
+ }
+
+ /**
+ * Queue entry.
+ */
+ protected struct Entry
+ {
+ /// Queue item content.
+ T content;
+
+ /// Next list item.
+ Entry* next;
+ }
+
+ /// The first element of the list.
+ protected Entry first;
+
+ /// The last element of the list.
+ protected Entry* rear;
+}
diff --git a/source/tanya/event/config.d b/source/tanya/async/package.d
index 11a432a..eb82f12 100644
--- a/source/tanya/event/config.d
+++ b/source/tanya/async/package.d
@@ -1,26 +1,19 @@
-/* This Source Code Form is subject to the terms of the Mozilla Public
- * License, v. 2.0. If a copy of the MPL was not distributed with this
- * file, You can obtain one at http://mozilla.org/MPL/2.0/. */
-
-/**
- * Copyright: Eugene Wissner 2016.
- * License: $(LINK2 https://www.mozilla.org/en-US/MPL/2.0/,
- * Mozilla Public License, v. 2.0).
- * Authors: $(LINK2 mailto:belka@caraus.de, Eugene Wissner)
- */
-module tanya.event.config;
-
-package version (DisableBackends)
-{
-}
-else
-{
- version (linux)
- {
- enum UseEpoll = true;
- }
- else
- {
- enum UseEpoll = false;
- }
-}
+/* This Source Code Form is subject to the terms of the Mozilla Public
+ * License, v. 2.0. If a copy of the MPL was not distributed with this
+ * file, You can obtain one at http://mozilla.org/MPL/2.0/. */
+
+/**
+ * Copyright: Eugene Wissner 2016.
+ * License: $(LINK2 https://www.mozilla.org/en-US/MPL/2.0/,
+ * Mozilla Public License, v. 2.0).
+ * Authors: $(LINK2 mailto:belka@caraus.de, Eugene Wissner)
+ */
+module tanya.async;
+
+public
+{
+ import tanya.async.loop;
+ import tanya.async.protocol;
+ import tanya.async.transport;
+ import tanya.async.watcher;
+}
diff --git a/source/tanya/async/protocol.d b/source/tanya/async/protocol.d
new file mode 100644
index 0000000..62c0ab7
--- /dev/null
+++ b/source/tanya/async/protocol.d
@@ -0,0 +1,50 @@
+/* This Source Code Form is subject to the terms of the Mozilla Public
+ * License, v. 2.0. If a copy of the MPL was not distributed with this
+ * file, You can obtain one at http://mozilla.org/MPL/2.0/. */
+
+/**
+ * Copyright: Eugene Wissner 2016.
+ * License: $(LINK2 https://www.mozilla.org/en-US/MPL/2.0/,
+ * Mozilla Public License, v. 2.0).
+ * Authors: $(LINK2 mailto:belka@caraus.de, Eugene Wissner)
+ */
+module tanya.async.protocol;
+
+import tanya.network.socket;
+import tanya.async.transport;
+
+/**
+ * Common protocol interface.
+ */
+interface Protocol
+{
+ /**
+ * Params:
+ * data = Read data.
+ */
+ void received(ubyte[] data);
+
+ /**
+ * Called when a connection is made.
+ *
+ * Params:
+ * transport = Protocol transport.
+ */
+ void connected(DuplexTransport transport);
+
+ /**
+ * Called when a connection is lost.
+ *
+ * Params:
+ * exception = $(D_PSYMBOL Exception) if an error caused
+ * the disconnect, $(D_KEYWORD null) otherwise.
+ */
+ void disconnected(SocketException exception = null);
+}
+
+/**
+ * Interface for TCP.
+ */
+interface TransmissionControlProtocol : Protocol
+{
+}
diff --git a/source/tanya/async/transport.d b/source/tanya/async/transport.d
new file mode 100644
index 0000000..6156815
--- /dev/null
+++ b/source/tanya/async/transport.d
@@ -0,0 +1,63 @@
+/* This Source Code Form is subject to the terms of the Mozilla Public
+ * License, v. 2.0. If a copy of the MPL was not distributed with this
+ * file, You can obtain one at http://mozilla.org/MPL/2.0/. */
+
+/**
+ * Copyright: Eugene Wissner 2016.
+ * License: $(LINK2 https://www.mozilla.org/en-US/MPL/2.0/,
+ * Mozilla Public License, v. 2.0).
+ * Authors: $(LINK2 mailto:belka@caraus.de, Eugene Wissner)
+ */
+module tanya.async.transport;
+
+import tanya.network.socket;
+
+/**
+ * Base transport interface.
+ */
+interface Transport
+{
+}
+
+/**
+ * Interface for read-only transports.
+ */
+interface ReadTransport : Transport
+{
+}
+
+/**
+ * Interface for write-only transports.
+ */
+interface WriteTransport : Transport
+{
+ /**
+ * Write some data to the transport.
+ *
+ * Params:
+ * data = Data to send.
+ */
+ void write(ubyte[] data);
+}
+
+/**
+ * Represents a bidirectional transport.
+ */
+interface DuplexTransport : ReadTransport, WriteTransport
+{
+}
+
+/**
+ * Represents a socket transport.
+ */
+interface SocketTransport : Transport
+{
+ @property inout(Socket) socket() inout pure nothrow @safe @nogc;
+}
+
+/**
+ * Represents a connection-oriented socket transport.
+ */
+package interface StreamTransport : DuplexTransport, SocketTransport
+{
+}
diff --git a/source/tanya/async/watcher.d b/source/tanya/async/watcher.d
new file mode 100644
index 0000000..08fe6df
--- /dev/null
+++ b/source/tanya/async/watcher.d
@@ -0,0 +1,242 @@
+/* This Source Code Form is subject to the terms of the Mozilla Public
+ * License, v. 2.0. If a copy of the MPL was not distributed with this
+ * file, You can obtain one at http://mozilla.org/MPL/2.0/. */
+
+/**
+ * Copyright: Eugene Wissner 2016.
+ * License: $(LINK2 https://www.mozilla.org/en-US/MPL/2.0/,
+ * Mozilla Public License, v. 2.0).
+ * Authors: $(LINK2 mailto:belka@caraus.de, Eugene Wissner)
+ */
+module tanya.async.watcher;
+
+import tanya.async.loop;
+import tanya.async.protocol;
+import tanya.async.transport;
+import tanya.container.buffer;
+import tanya.memory;
+import tanya.memory.mmappool;
+import tanya.network.socket;
+import std.functional;
+import std.exception;
+
+version (Windows)
+{
+ import core.sys.windows.basetyps;
+ import core.sys.windows.mswsock;
+ import core.sys.windows.winbase;
+ import core.sys.windows.windef;
+ import core.sys.windows.winsock2;
+}
+
+/**
+ * A watcher is an opaque structure that you allocate and register to record
+ * your interest in some event.
+ */
+abstract class Watcher
+{
+ /// Whether the watcher is active.
+ bool active;
+
+ /**
+ * Invoke some action on event.
+ */
+ void invoke();
+}
+
+class ConnectionWatcher : Watcher
+{
+ /// Watched socket.
+ private Socket socket_;
+
+ /// Protocol factory.
+ protected Protocol delegate() protocolFactory;
+
+ package PendingQueue!IOWatcher incoming;
+
+ /**
+ * Params:
+ * socket = Socket.
+ */
+ this(Socket socket)
+ {
+ socket_ = socket;
+ incoming = MmapPool.instance.make!(PendingQueue!IOWatcher);
+ }
+
+ /// Ditto.
+ protected this()
+ {
+ }
+
+ ~this()
+ {
+ MmapPool.instance.dispose(incoming);
+ }
+
+ /*
+ * Params:
+ * P = Protocol should be used.
+ */
+ void setProtocol(P : Protocol)()
+ {
+ this.protocolFactory = () => cast(Protocol) MmapPool.instance.make!P;
+ }
+
+ /**
+ * Returns: Socket.
+ */
+ @property inout(Socket) socket() inout pure nothrow @nogc
+ {
+ return socket_;
+ }
+
+ /**
+ * Returns: New protocol instance.
+ */
+ @property Protocol protocol()
+ in
+ {
+ assert(protocolFactory !is null, "Protocol isn't set.");
+ }
+ body
+ {
+ return protocolFactory();
+ }
+
+ /**
+ * Invokes new connection callback.
+ */
+ override void invoke()
+ {
+ foreach (io; incoming)
+ {
+ io.protocol.connected(cast(DuplexTransport) io.transport);
+ }
+ }
+}
+
+/**
+ * Contains a pending watcher with the invoked events or a transport can be
+ * read from.
+ */
+class IOWatcher : ConnectionWatcher
+{
+ /// If an exception was thrown the transport should be already invalid.
+ private union
+ {
+ StreamTransport transport_;
+ SocketException exception_;
+ }
+
+ private Protocol protocol_;
+
+ /**
+ * Returns: Underlying output buffer.
+ */
+ package ReadBuffer output;
+
+ /**
+ * Params:
+ * transport = Transport.
+ * protocol = New instance of the application protocol.
+ */
+ this(StreamTransport transport, Protocol protocol)
+ in
+ {
+ assert(transport !is null);
+ assert(protocol !is null);
+ }
+ body
+ {
+ super();
+ transport_ = transport;
+ protocol_ = protocol;
+ output = MmapPool.instance.make!ReadBuffer();
+ active = true;
+ }
+
+ /**
+ * Destroys the watcher.
+ */
+ protected ~this()
+ {
+ MmapPool.instance.dispose(output);
+ MmapPool.instance.dispose(protocol_);
+ }
+
+ /**
+ * Assigns a transport.
+ *
+ * Params:
+ * transport = Transport.
+ * protocol = Application protocol.
+ *
+ * Returns: $(D_KEYWORD this).
+ */
+ IOWatcher opCall(StreamTransport transport, Protocol protocol) pure nothrow @safe @nogc
+ in
+ {
+ assert(transport !is null);
+ assert(protocol !is null);
+ }
+ body
+ {
+ transport_ = transport;
+ protocol_ = protocol;
+ active = true;
+ return this;
+ }
+
+ /**
+ * Returns: Transport used by this watcher.
+ */
+ @property inout(StreamTransport) transport() inout pure nothrow @nogc
+ {
+ return transport_;
+ }
+
+ /**
+ * Sets an exception occurred during a read/write operation.
+ *
+ * Params:
+ * exception = Thrown exception.
+ */
+ @property void exception(SocketException exception) pure nothrow @nogc
+ {
+ exception_ = exception;
+ }
+
+ /**
+ * Returns: Application protocol.
+ */
+ override @property Protocol protocol() pure nothrow @safe @nogc
+ {
+ return protocol_;
+ }
+
+ /**
+ * Returns: Socket.
+ */
+ override @property inout(Socket) socket() inout pure nothrow @nogc
+ {
+ return transport.socket;
+ }
+
+ /**
+ * Invokes the watcher callback.
+ */
+ override void invoke()
+ {
+ if (output.length)
+ {
+ protocol.received(output[0..$]);
+ output.clear();
+ }
+ else
+ {
+ protocol.disconnected(exception_);
+ active = false;
+ }
+ }
+}
diff --git a/source/tanya/container/buffer.d b/source/tanya/container/buffer.d
index efe2e41..338c0bb 100644
--- a/source/tanya/container/buffer.d
+++ b/source/tanya/container/buffer.d
@@ -1,636 +1,700 @@
-/* This Source Code Form is subject to the terms of the Mozilla Public
- * License, v. 2.0. If a copy of the MPL was not distributed with this
- * file, You can obtain one at http://mozilla.org/MPL/2.0/. */
-
-/**
- * Copyright: Eugene Wissner 2016.
- * License: $(LINK2 https://www.mozilla.org/en-US/MPL/2.0/,
- * Mozilla Public License, v. 2.0).
- * Authors: $(LINK2 mailto:belka@caraus.de, Eugene Wissner)
- */
-module tanya.container.buffer;
-
-import tanya.memory;
-
-version (unittest)
-{
- private int fillBuffer(void* buffer,
- in size_t size,
- int start = 0,
- int end = 10)
- in
- {
- assert(start < end);
- }
- body
- {
- ubyte[] buf = cast(ubyte[]) buffer[0..size];
- auto numberRead = end - start;
-
- for (ubyte i; i < numberRead; ++i)
- {
- buf[i] = cast(ubyte) (start + i);
- }
- return numberRead;
- }
-}
-
-/**
- * Interface for implemeting input/output buffers.
- */
-interface Buffer
-{
- /**
- * Returns: The size of the internal buffer.
- */
- @property size_t capacity() const @safe pure nothrow;
-
- /**
- * Returns: Data size.
- */
- @property size_t length() const @safe pure nothrow;
-
- /**
- * Returns: Available space.
- */
- @property size_t free() const @safe pure nothrow;
-
- /**
- * Appends some data to the buffer.
- *
- * Params:
- * buffer = Buffer chunk got with $(D_PSYMBOL buffer).
- */
- Buffer opOpAssign(string op)(void[] buffer)
- if (op == "~");
-}
-
-/**
- * Buffer that can be used with C functions accepting void pointer and
- * returning the number of the read bytes.
- */
-class ReadBuffer : Buffer
-{
- /// Internal buffer.
- protected ubyte[] _buffer;
-
- /// Filled buffer length.
- protected size_t _length;
-
- /// Available space.
- protected immutable size_t minAvailable;
-
- /// Size by which the buffer will grow.
- protected immutable size_t blockSize;
-
- private shared Allocator allocator;
-
- invariant
- {
- assert(_length <= _buffer.length);
- assert(blockSize > 0);
- assert(minAvailable > 0);
- }
-
- /**
- * Params:
- * size = Initial buffer size and the size by which the buffer
- * will grow.
- * minAvailable = minimal size should be always available to fill.
- * So it will reallocate if $(D_INLINECODE
- * $(D_PSYMBOL free) < $(D_PARAM minAvailable)
- * ).
- */
- this(size_t size = 8192,
- size_t minAvailable = 1024,
- shared Allocator allocator = defaultAllocator)
- {
- this.allocator = allocator;
- this.minAvailable = minAvailable;
- this.blockSize = size;
- resizeArray!ubyte(this.allocator, _buffer, size);
- }
-
- /**
- * Deallocates the internal buffer.
- */
- ~this()
- {
- dispose(allocator, _buffer);
- }
-
- ///
- unittest
- {
- auto b = make!ReadBuffer(defaultAllocator);
- assert(b.capacity == 8192);
- assert(b.length == 0);
-
- dispose(defaultAllocator, b);
- }
-
- /**
- * Returns: The size of the internal buffer.
- */
- @property size_t capacity() const @safe pure nothrow
- {
- return _buffer.length;
- }
-
- /**
- * Returns: Data size.
- */
- @property size_t length() const @safe pure nothrow
- {
- return _length;
- }
-
- /**
- * Returns: Available space.
- */
- @property size_t free() const @safe pure nothrow
- {
- return capacity - length;
- }
-
- ///
- unittest
- {
- auto b = make!ReadBuffer(defaultAllocator);
- size_t numberRead;
- void* buf;
-
- // Fills the buffer with values 0..10
- assert(b.free == b.blockSize);
- buf = b.buffer;
- numberRead = fillBuffer(buf, b.free, 0, 10);
- b ~= buf[0..numberRead];
- assert(b.free == b.blockSize - numberRead);
- b[];
- assert(b.free == b.blockSize);
-
- dispose(defaultAllocator, b);
- }
-
- /**
- * Returns a pointer to a chunk of the internal buffer. You can pass it to
- * a function that requires such a buffer.
- *
- * Set the buffer again after reading something into it. Append
- * $(D_KEYWORD ~=) a slice from the beginning of the buffer you got and
- * till the number of the read bytes. The data will be appended to the
- * existing buffer.
- *
- * Returns: A chunk of available buffer.
- */
- @property void* buffer()
- {
- if (capacity - length < minAvailable)
- {
- resizeArray!ubyte(this.allocator, _buffer, capacity + blockSize);
- }
- return _buffer[_length..$].ptr;
- }
-
- /**
- * Appends some data to the buffer. Use only the buffer you got
- * with $(D_PSYMBOL buffer)!
- *
- * Params:
- * buffer = Buffer chunk got with $(D_PSYMBOL buffer).
- */
- ReadBuffer opOpAssign(string op)(void[] buffer)
- if (op == "~")
- {
- _length += buffer.length;
- return this;
- }
-
- ///
- unittest
- {
- auto b = make!ReadBuffer(defaultAllocator);
- size_t numberRead;
- void* buf;
- ubyte[] result;
-
- // Fills the buffer with values 0..10
- buf = b.buffer;
- numberRead = fillBuffer(buf, b.free, 0, 10);
- b ~= buf[0..numberRead];
-
- result = b[];
- assert(result[0] == 0);
- assert(result[1] == 1);
- assert(result[9] == 9);
-
- // It shouldn't overwrite, but append another 5 bytes to the buffer
- buf = b.buffer;
- numberRead = fillBuffer(buf, b.free, 0, 10);
- b ~= buf[0..numberRead];
-
- buf = b.buffer;
- numberRead = fillBuffer(buf, b.free, 20, 25);
- b ~= buf[0..numberRead];
-
- result = b[];
- assert(result[0] == 0);
- assert(result[1] == 1);
- assert(result[9] == 9);
- assert(result[10] == 20);
- assert(result[14] == 24);
-
- dispose(defaultAllocator, b);
- }
-
- /**
- * Returns the buffer. The buffer is cleared after that. So you can get it
- * only one time.
- *
- * Returns: The buffer as array.
- */
- @property ubyte[] opIndex()
- {
- auto ret = _buffer[0.._length];
- _length = 0;
- return ret;
- }
-
- ///
- unittest
- {
- auto b = make!ReadBuffer(defaultAllocator);
- size_t numberRead;
- void* buf;
- ubyte[] result;
-
- // Fills the buffer with values 0..10
- buf = b.buffer;
- numberRead = fillBuffer(buf, b.free, 0, 10);
- b ~= buf[0..numberRead];
-
- assert(b.length == 10);
- result = b[];
- assert(result[0] == 0);
- assert(result[9] == 9);
- assert(b.length == 0);
-
- dispose(defaultAllocator, b);
- }
-}
-
-/**
- * Circular, self-expanding buffer that can be used with C functions accepting
- * void pointer and returning the number of the read bytes.
- *
- * The buffer is optimized for situations where you read all the data from it
- * at once (without writing to it occasionally). It can become ineffective if
- * you permanently keep some data in the buffer and alternate writing and
- * reading, because it may allocate and move elements.
- */
-class WriteBuffer : Buffer
-{
- /// Internal buffer.
- protected ubyte[] _buffer;
-
- /// Buffer start position.
- protected size_t start;
-
- /// Buffer ring area size. After this position begins buffer overflow area.
- protected size_t ring;
-
- /// Size by which the buffer will grow.
- protected immutable size_t blockSize;
-
- /// The position of the free area in the buffer.
- protected size_t position;
-
- private shared Allocator allocator;
-
- invariant
- {
- assert(blockSize > 0);
- // position can refer to an element outside the buffer if the buffer is full.
- assert(position <= _buffer.length);
- }
-
- /**
- * Params:
- * size = Initial buffer size and the size by which the buffer
- * will grow.
- */
- this(size_t size = 8192,
- shared Allocator allocator = defaultAllocator)
- {
- this.allocator = allocator;
- blockSize = size;
- ring = size - 1;
- resizeArray!ubyte(this.allocator, _buffer, size);
- }
-
- /**
- * Deallocates the internal buffer.
- */
- ~this()
- {
- dispose(allocator, _buffer);
- }
-
- /**
- * Returns: The size of the internal buffer.
- */
- @property size_t capacity() const @safe pure nothrow
- {
- return _buffer.length;
- }
-
- /**
- * Note that $(D_PSYMBOL length) doesn't return the real length of the data,
- * but only the array length that will be returned with $(D_PSYMBOL buffer)
- * next time. Be sure to call $(D_PSYMBOL buffer) and set $(D_PSYMBOL written)
- * until $(D_PSYMBOL length) returns 0.
- *
- * Returns: Data size.
- */
- @property size_t length() const @safe pure nothrow
- {
- if (position > ring || position < start) // Buffer overflowed
- {
- return ring - start + 1;
- }
- else
- {
- return position - start;
- }
- }
-
- ///
- unittest
- {
- auto b = make!WriteBuffer(defaultAllocator, 4);
- ubyte[3] buf = [48, 23, 255];
-
- b ~= buf;
- assert(b.length == 3);
- b.written = 2;
- assert(b.length == 1);
-
- b ~= buf;
- assert(b.length == 2);
- b.written = 2;
- assert(b.length == 2);
-
- b ~= buf;
- assert(b.length == 5);
- b.written = b.length;
- assert(b.length == 0);
-
- dispose(defaultAllocator, b);
- }
-
- /**
- * Returns: Available space.
- */
- @property size_t free() const @safe pure nothrow
- {
- return capacity - length;
- }
-
- /**
- * Appends data to the buffer.
- *
- * Params:
- * buffer = Buffer chunk got with $(D_PSYMBOL buffer).
- */
- WriteBuffer opOpAssign(string op)(ubyte[] buffer)
- if (op == "~")
- {
- size_t end, start;
-
- if (position >= this.start && position <= ring)
- {
- auto afterRing = ring + 1;
-
- end = position + buffer.length;
- if (end > afterRing)
- {
- end = afterRing;
- }
- start = end - position;
- _buffer[position..end] = buffer[0..start];
- if (end == afterRing)
- {
- position = this.start == 0 ? afterRing : 0;
- }
- else
- {
- position = end;
- }
- }
-
- // Check if we have some free space at the beginning
- if (start < buffer.length && position < this.start)
- {
- end = position + buffer.length - start;
- if (end > this.start)
- {
- end = this.start;
- }
- auto areaEnd = end - position + start;
- _buffer[position..end] = buffer[start..areaEnd];
- position = end == this.start ? ring + 1 : end - position;
- start = areaEnd;
- }
-
- // And if we still haven't found any place, save the rest in the overflow area
- if (start < buffer.length)
- {
- end = position + buffer.length - start;
- if (end > capacity)
- {
- auto newSize = end / blockSize * blockSize + blockSize;
-
- resizeArray!ubyte(this.allocator, _buffer, newSize);
- }
- _buffer[position..end] = buffer[start..$];
- position = end;
- if (this.start == 0)
- {
- ring = capacity - 1;
- }
- }
-
- return this;
- }
-
- ///
- unittest
- {
- auto b = make!WriteBuffer(defaultAllocator, 4);
- ubyte[3] buf = [48, 23, 255];
-
- b ~= buf;
- assert(b.capacity == 4);
- assert(b._buffer[0] == 48 && b._buffer[1] == 23 && b._buffer[2] == 255);
-
- b.written = 2;
- b ~= buf;
- assert(b.capacity == 4);
- assert(b._buffer[0] == 23 && b._buffer[1] == 255
- && b._buffer[2] == 255 && b._buffer[3] == 48);
-
- b.written = 2;
- b ~= buf;
- assert(b.capacity == 8);
- assert(b._buffer[0] == 23 && b._buffer[1] == 255
- && b._buffer[2] == 48 && b._buffer[3] == 23 && b._buffer[4] == 255);
-
- dispose(defaultAllocator, b);
-
- b = make!WriteBuffer(defaultAllocator, 2);
-
- b ~= buf;
- assert(b.start == 0);
- assert(b.capacity == 4);
- assert(b.ring == 3);
- assert(b.position == 3);
-
- dispose(defaultAllocator, b);
- }
-
- /**
- * Sets how many bytes were written. It will shrink the buffer
- * appropriately. Always set this property after calling
- * $(D_PSYMBOL buffer).
- *
- * Params:
- * length = Length of the written data.
- */
- @property void written(size_t length) @safe pure nothrow
- in
- {
- assert(length <= this.length);
- }
- body
- {
- auto afterRing = ring + 1;
- auto oldStart = start;
-
- if (length <= 0)
- {
- return;
- }
- else if (position <= afterRing)
- {
- start += length;
- if (start > 0 && position == afterRing)
- {
- position = oldStart;
- }
- }
- else
- {
- auto overflow = position - afterRing;
-
- if (overflow > length) {
- _buffer[start.. start + length] = _buffer[afterRing.. afterRing + length];
- _buffer[afterRing.. afterRing + length] = _buffer[afterRing + length ..position];
- position -= length;
- }
- else if (overflow == length)
- {
- _buffer[start.. start + overflow] = _buffer[afterRing..position];
- position -= overflow;
- }
- else
- {
- _buffer[start.. start + overflow] = _buffer[afterRing..position];
- position = overflow;
- }
- start += length;
-
- if (start == position)
- {
- if (position != afterRing)
- {
- position = 0;
- }
- start = 0;
- ring = capacity - 1;
- }
- }
- if (start > ring)
- {
- start = 0;
- }
- }
-
- ///
- unittest
- {
- auto b = make!WriteBuffer(defaultAllocator);
- ubyte[6] buf = [23, 23, 255, 128, 127, 9];
-
- b ~= buf;
- assert(b.length == 6);
- b.written = 2;
- assert(b.length == 4);
- b.written = 4;
- assert(b.length == 0);
-
- dispose(defaultAllocator, b);
- }
-
- /**
- * Returns a pointer to a buffer chunk with data. You can pass it to
- * a function that requires such a buffer.
- *
- * After calling it, set $(D_PSYMBOL written) to the length could be
- * written.
- *
- * $(D_PSYMBOL buffer) may return only part of the data. You may need
- * to call it (and set $(D_PSYMBOL written) several times until
- * $(D_PSYMBOL length) is 0. If all the data can be written,
- * maximally 3 calls are required.
- *
- * Returns: A chunk of data buffer.
- */
- @property void* buffer() @safe pure nothrow
- {
- if (position > ring || position < start) // Buffer overflowed
- {
- return _buffer[start.. ring + 1].ptr;
- }
- else
- {
- return _buffer[start..position].ptr;
- }
- }
-
- ///
- unittest
- {
- auto b = make!WriteBuffer(defaultAllocator, 6);
- ubyte[6] buf = [23, 23, 255, 128, 127, 9];
- void* returnedBuf;
-
- b ~= buf;
- returnedBuf = b.buffer;
- assert(returnedBuf[0..b.length] == buf[0..6]);
- b.written = 2;
-
- returnedBuf = b.buffer;
- assert(returnedBuf[0..b.length] == buf[2..6]);
-
- b ~= buf;
- returnedBuf = b.buffer;
- assert(returnedBuf[0..b.length] == buf[2..6]);
- b.written = b.length;
-
- returnedBuf = b.buffer;
- assert(returnedBuf[0..b.length] == buf[0..6]);
- b.written = b.length;
-
- dispose(defaultAllocator, b);
- }
-}
+/* This Source Code Form is subject to the terms of the Mozilla Public
+ * License, v. 2.0. If a copy of the MPL was not distributed with this
+ * file, You can obtain one at http://mozilla.org/MPL/2.0/. */
+
+/**
+ * Copyright: Eugene Wissner 2016.
+ * License: $(LINK2 https://www.mozilla.org/en-US/MPL/2.0/,
+ * Mozilla Public License, v. 2.0).
+ * Authors: $(LINK2 mailto:belka@caraus.de, Eugene Wissner)
+ */
+module tanya.container.buffer;
+
+import tanya.memory;
+
+version (unittest)
+{
+ private int fillBuffer(ubyte[] buffer,
+ in size_t size,
+ int start = 0,
+ int end = 10) @nogc pure nothrow
+ in
+ {
+ assert(start < end);
+ }
+ body
+ {
+ auto numberRead = end - start;
+ for (ubyte i; i < numberRead; ++i)
+ {
+ buffer[i] = cast(ubyte) (start + i);
+ }
+ return numberRead;
+ }
+}
+
+/**
+ * Interface for implemeting input/output buffers.
+ */
+interface Buffer
+{
+ /**
+ * Returns: The size of the internal buffer.
+ */
+ @property size_t capacity() const @nogc @safe pure nothrow;
+
+ /**
+ * Returns: Data size.
+ */
+ @property size_t length() const @nogc @safe pure nothrow;
+
+ /**
+ * Returns: Available space.
+ */
+ @property size_t free() const @nogc @safe pure nothrow;
+
+ /**
+ * Params:
+ * start = Start position.
+ * end = End position.
+ *
+ * Returns: Array between $(D_PARAM start) and $(D_PARAM end).
+ */
+ @property ubyte[] opSlice(size_t start, size_t end)
+ in
+ {
+ assert(start <= end);
+ assert(end <= length);
+ }
+
+ /**
+ * Returns: Length of available data.
+ */
+ @property size_t opDollar() const pure nothrow @safe @nogc;
+
+ /**
+ * Returns: Data chunk.
+ */
+ @property ubyte[] opIndex();
+}
+
+/**
+ * Self-expanding buffer, that can be used with functions returning the number
+ * of the read bytes.
+ *
+ * This buffer supports asynchronous reading. It means you can pass a new chunk
+ * to an asynchronous read function during you are working with already
+ * available data. But only one asynchronous call at a time is supported. Be
+ * sure to call $(D_PSYMBOL ReadBuffer.clear()) before you append the result
+ * of the pended asynchronous call.
+ */
+class ReadBuffer : Buffer
+{
+ /// Internal buffer.
+ protected ubyte[] buffer_;
+
+ /// Filled buffer length.
+ protected size_t length_;
+
+ /// Start of available data.
+ protected size_t start;
+
+ /// Last position returned with $(D_KEYWORD []).
+ protected size_t ring;
+
+ /// Available space.
+ protected immutable size_t minAvailable;
+
+ /// Size by which the buffer will grow.
+ protected immutable size_t blockSize;
+
+ invariant
+ {
+ assert(length_ <= buffer_.length);
+ assert(blockSize > 0);
+ assert(minAvailable > 0);
+ }
+
+ /**
+ * Creates a new read buffer.
+ *
+ * Params:
+ * size = Initial buffer size and the size by which the buffer
+ * will grow.
+ * minAvailable = minimal size should be always available to fill.
+ * So it will reallocate if $(D_INLINECODE
+ * $(D_PSYMBOL free) < $(D_PARAM minAvailable)
+ * ).
+ */
+ this(size_t size = 8192,
+ size_t minAvailable = 1024)
+ {
+ this.minAvailable = minAvailable;
+ this.blockSize = size;
+ defaultAllocator.resizeArray!ubyte(buffer_, size);
+ }
+
+ /**
+ * Deallocates the internal buffer.
+ */
+ ~this()
+ {
+ defaultAllocator.dispose(buffer_);
+ }
+
+ ///
+ unittest
+ {
+ auto b = defaultAllocator.make!ReadBuffer;
+ assert(b.capacity == 8192);
+ assert(b.length == 0);
+
+ defaultAllocator.dispose(b);
+ }
+
+ /**
+ * Returns: The size of the internal buffer.
+ */
+ @property size_t capacity() const @nogc @safe pure nothrow
+ {
+ return buffer_.length;
+ }
+
+ /**
+ * Returns: Data size.
+ */
+ @property size_t length() const @nogc @safe pure nothrow
+ {
+ return length_ - start;
+ }
+
+ /**
+ * Clears the buffer.
+ *
+ * Returns: $(D_KEYWORD this).
+ */
+ ReadBuffer clear() pure nothrow @safe @nogc
+ {
+ start = length_ = ring;
+ return this;
+ }
+
+ /**
+ * Returns: Available space.
+ */
+ @property size_t free() const pure nothrow @safe @nogc
+ {
+ return length > ring ? capacity - length : capacity - ring;
+ }
+
+ ///
+ unittest
+ {
+ auto b = defaultAllocator.make!ReadBuffer;
+ size_t numberRead;
+
+ // Fills the buffer with values 0..10
+ assert(b.free == b.blockSize);
+
+ numberRead = fillBuffer(b[], b.free, 0, 10);
+ b += numberRead;
+ assert(b.free == b.blockSize - numberRead);
+ b.clear();
+ assert(b.free == b.blockSize);
+
+ defaultAllocator.dispose(b);
+ }
+
+ /**
+ * Appends some data to the buffer.
+ *
+ * Params:
+ * length = Number of the bytes read.
+ *
+ * Returns: $(D_KEYWORD this).
+ */
+ ReadBuffer opOpAssign(string op)(size_t length)
+ if (op == "+")
+ {
+ length_ += length;
+ ring = start;
+ return this;
+ }
+
+ ///
+ unittest
+ {
+ auto b = defaultAllocator.make!ReadBuffer;
+ size_t numberRead;
+ ubyte[] result;
+
+ // Fills the buffer with values 0..10
+ numberRead = fillBuffer(b[], b.free, 0, 10);
+ b += numberRead;
+
+ result = b[0..$];
+ assert(result[0] == 0);
+ assert(result[1] == 1);
+ assert(result[9] == 9);
+ b.clear();
+
+ // It shouldn't overwrite, but append another 5 bytes to the buffer
+ numberRead = fillBuffer(b[], b.free, 0, 10);
+ b += numberRead;
+
+ numberRead = fillBuffer(b[], b.free, 20, 25);
+ b += numberRead;
+
+ result = b[0..$];
+ assert(result[0] == 0);
+ assert(result[1] == 1);
+ assert(result[9] == 9);
+ assert(result[10] == 20);
+ assert(result[14] == 24);
+
+ defaultAllocator.dispose(b);
+ }
+
+ /**
+ * Returns: Length of available data.
+ */
+ @property size_t opDollar() const pure nothrow @safe @nogc
+ {
+ return length;
+ }
+
+ /**
+ * Params:
+ * start = Start position.
+ * end = End position.
+ *
+ * Returns: Array between $(D_PARAM start) and $(D_PARAM end).
+ */
+ @property ubyte[] opSlice(size_t start, size_t end) pure nothrow @safe @nogc
+ {
+ return buffer_[this.start + start .. this.start + end];
+ }
+
+ /**
+ * Returns a free chunk of the buffer.
+ *
+ * Add ($(D_KEYWORD +=)) the number of the read bytes after using it.
+ *
+ * Returns: A free chunk of the buffer.
+ */
+ ubyte[] opIndex()
+ {
+ if (start > 0)
+ {
+ auto ret = buffer_[0..start];
+ ring = 0;
+ return ret;
+ }
+ else
+ {
+ if (capacity - length < minAvailable)
+ {
+ defaultAllocator.resizeArray!ubyte(buffer_, capacity + blockSize);
+ }
+ ring = length_;
+ return buffer_[length_..$];
+ }
+ }
+
+ ///
+ unittest
+ {
+ auto b = defaultAllocator.make!ReadBuffer;
+ size_t numberRead;
+ ubyte[] result;
+
+ // Fills the buffer with values 0..10
+ numberRead = fillBuffer(b[], b.free, 0, 10);
+ b += numberRead;
+
+ assert(b.length == 10);
+ result = b[0..$];
+ assert(result[0] == 0);
+ assert(result[9] == 9);
+ b.clear();
+ assert(b.length == 0);
+
+ defaultAllocator.dispose(b);
+ }
+}
+
+/**
+ * Circular, self-expanding buffer with overflow support. Can be used with
+ * functions returning returning the number of the transferred bytes.
+ *
+ * The buffer is optimized for situations where you read all the data from it
+ * at once (without writing to it occasionally). It can become ineffective if
+ * you permanently keep some data in the buffer and alternate writing and
+ * reading, because it may allocate and move elements.
+ */
+class WriteBuffer : Buffer
+{
+ /// Internal buffer.
+ protected ubyte[] buffer_;
+
+ /// Buffer start position.
+ protected size_t start;
+
+ /// Buffer ring area size. After this position begins buffer overflow area.
+ protected size_t ring;
+
+ /// Size by which the buffer will grow.
+ protected immutable size_t blockSize;
+
+ /// The position of the free area in the buffer.
+ protected size_t position;
+
+ invariant
+ {
+ assert(blockSize > 0);
+ // position can refer to an element outside the buffer if the buffer is full.
+ assert(position <= buffer_.length);
+ }
+
+ /**
+ * Params:
+ * size = Initial buffer size and the size by which the buffer
+ * will grow.
+ */
+ this(size_t size = 8192)
+ {
+ blockSize = size;
+ ring = size - 1;
+ defaultAllocator.resizeArray!ubyte(buffer_, size);
+ }
+
+ /**
+ * Deallocates the internal buffer.
+ */
+ ~this()
+ {
+ defaultAllocator.dispose(buffer_);
+ }
+
+ /**
+ * Returns: The size of the internal buffer.
+ */
+ @property size_t capacity() const @nogc @safe pure nothrow
+ {
+ return buffer_.length;
+ }
+
+ /**
+ * Note that $(D_PSYMBOL length) doesn't return the real length of the data,
+ * but only the array length that will be returned with $(D_PSYMBOL buffer)
+ * next time. Be sure to call $(D_PSYMBOL buffer) and set $(D_KEYWORD +=)
+ * until $(D_PSYMBOL length) returns 0.
+ *
+ * Returns: Data size.
+ */
+ @property size_t length() const @nogc @safe pure nothrow
+ {
+ if (position > ring || position < start) // Buffer overflowed
+ {
+ return ring - start + 1;
+ }
+ else
+ {
+ return position - start;
+ }
+ }
+
+ /**
+ * Returns: Length of available data.
+ */
+ @property size_t opDollar() const pure nothrow @safe @nogc
+ {
+ return length;
+ }
+
+ ///
+ unittest
+ {
+ auto b = defaultAllocator.make!WriteBuffer(4);
+ ubyte[3] buf = [48, 23, 255];
+
+ b ~= buf;
+ assert(b.length == 3);
+ b += 2;
+ assert(b.length == 1);
+
+ b ~= buf;
+ assert(b.length == 2);
+ b += 2;
+ assert(b.length == 2);
+
+ b ~= buf;
+ assert(b.length == 5);
+ b += b.length;
+ assert(b.length == 0);
+
+ defaultAllocator.dispose(b);
+ }
+
+ /**
+ * Returns: Available space.
+ */
+ @property size_t free() const @nogc @safe pure nothrow
+ {
+ return capacity - length;
+ }
+
+ /**
+ * Appends data to the buffer.
+ *
+ * Params:
+ * buffer = Buffer chunk got with $(D_PSYMBOL buffer).
+ */
+ WriteBuffer opOpAssign(string op)(ubyte[] buffer)
+ if (op == "~")
+ {
+ size_t end, start;
+
+ if (position >= this.start && position <= ring)
+ {
+ auto afterRing = ring + 1;
+
+ end = position + buffer.length;
+ if (end > afterRing)
+ {
+ end = afterRing;
+ }
+ start = end - position;
+ buffer_[position..end] = buffer[0..start];
+ if (end == afterRing)
+ {
+ position = this.start == 0 ? afterRing : 0;
+ }
+ else
+ {
+ position = end;
+ }
+ }
+
+ // Check if we have some free space at the beginning
+ if (start < buffer.length && position < this.start)
+ {
+ end = position + buffer.length - start;
+ if (end > this.start)
+ {
+ end = this.start;
+ }
+ auto areaEnd = end - position + start;
+ buffer_[position..end] = buffer[start..areaEnd];
+ position = end == this.start ? ring + 1 : end - position;
+ start = areaEnd;
+ }
+
+ // And if we still haven't found any place, save the rest in the overflow area
+ if (start < buffer.length)
+ {
+ end = position + buffer.length - start;
+ if (end > capacity)
+ {
+ auto newSize = end / blockSize * blockSize + blockSize;
+
+ defaultAllocator.resizeArray!ubyte(buffer_, newSize);
+ }
+ buffer_[position..end] = buffer[start..$];
+ position = end;
+ if (this.start == 0)
+ {
+ ring = capacity - 1;
+ }
+ }
+
+ return this;
+ }
+
+ ///
+ unittest
+ {
+ auto b = defaultAllocator.make!WriteBuffer(4);
+ ubyte[3] buf = [48, 23, 255];
+
+ b ~= buf;
+ assert(b.capacity == 4);
+ assert(b.buffer_[0] == 48 && b.buffer_[1] == 23 && b.buffer_[2] == 255);
+
+ b += 2;
+ b ~= buf;
+ assert(b.capacity == 4);
+ assert(b.buffer_[0] == 23 && b.buffer_[1] == 255
+ && b.buffer_[2] == 255 && b.buffer_[3] == 48);
+
+ b += 2;
+ b ~= buf;
+ assert(b.capacity == 8);
+ assert(b.buffer_[0] == 23 && b.buffer_[1] == 255
+ && b.buffer_[2] == 48 && b.buffer_[3] == 23 && b.buffer_[4] == 255);
+
+ defaultAllocator.dispose(b);
+
+ b = make!WriteBuffer(defaultAllocator, 2);
+
+ b ~= buf;
+ assert(b.start == 0);
+ assert(b.capacity == 4);
+ assert(b.ring == 3);
+ assert(b.position == 3);
+
+ defaultAllocator.dispose(b);
+ }
+
+ /**
+ * Sets how many bytes were written. It will shrink the buffer
+ * appropriately. Always set this property after calling
+ * $(D_PSYMBOL buffer).
+ *
+ * Params:
+ * length = Length of the written data.
+ *
+ * Returns: $(D_KEYWORD this).
+ */
+ @property WriteBuffer opOpAssign(string op)(size_t length) pure nothrow @safe @nogc
+ if (op == "+")
+ in
+ {
+ assert(length <= this.length);
+ }
+ body
+ {
+ auto afterRing = ring + 1;
+ auto oldStart = start;
+
+ if (length <= 0)
+ {
+ return this;
+ }
+ else if (position <= afterRing)
+ {
+ start += length;
+ if (start > 0 && position == afterRing)
+ {
+ position = oldStart;
+ }
+ }
+ else
+ {
+ auto overflow = position - afterRing;
+
+ if (overflow > length) {
+ buffer_[start.. start + length] = buffer_[afterRing.. afterRing + length];
+ buffer_[afterRing.. afterRing + length] = buffer_[afterRing + length ..position];
+ position -= length;
+ }
+ else if (overflow == length)
+ {
+ buffer_[start.. start + overflow] = buffer_[afterRing..position];
+ position -= overflow;
+ }
+ else
+ {
+ buffer_[start.. start + overflow] = buffer_[afterRing..position];
+ position = overflow;
+ }
+ start += length;
+
+ if (start == position)
+ {
+ if (position != afterRing)
+ {
+ position = 0;
+ }
+ start = 0;
+ ring = capacity - 1;
+ }
+ }
+ if (start > ring)
+ {
+ start = 0;
+ }
+ return this;
+ }
+
+ ///
+ unittest
+ {
+ auto b = defaultAllocator.make!WriteBuffer;
+ ubyte[6] buf = [23, 23, 255, 128, 127, 9];
+
+ b ~= buf;
+ assert(b.length == 6);
+ b += 2;
+ assert(b.length == 4);
+ b += 4;
+ assert(b.length == 0);
+
+ defaultAllocator.dispose(b);
+ }
+
+ /**
+ * Returns a chunk with data.
+ *
+ * After calling it, set $(D_KEYWORD +=) to the length could be
+ * written.
+ *
+ * $(D_PSYMBOL buffer) may return only part of the data. You may need
+ * to call it (and set $(D_KEYWORD +=) several times until
+ * $(D_PSYMBOL length) is 0. If all the data can be written,
+ * maximally 3 calls are required.
+ *
+ * Returns: A chunk of data buffer.
+ */
+ @property ubyte[] opSlice(size_t start, size_t end) pure nothrow @safe @nogc
+ {
+ immutable internStart = this.start + start;
+
+ if (position > ring || position < start) // Buffer overflowed
+ {
+ return buffer_[this.start.. ring + 1 - length + end];
+ }
+ else
+ {
+ return buffer_[this.start.. this.start + end];
+ }
+ }
+
+ ///
+ unittest
+ {
+ auto b = defaultAllocator.make!WriteBuffer(6);
+ ubyte[6] buf = [23, 23, 255, 128, 127, 9];
+
+ b ~= buf;
+ assert(b[0..$] == buf[0..6]);
+ b += 2;
+
+ assert(b[0..$] == buf[2..6]);
+
+ b ~= buf;
+ assert(b[0..$] == buf[2..6]);
+ b += b.length;
+
+ assert(b[0..$] == buf[0..6]);
+ b += b.length;
+
+ defaultAllocator.dispose(b);
+ }
+
+ /**
+ * After calling it, set $(D_KEYWORD +=) to the length could be
+ * written.
+ *
+ * $(D_PSYMBOL buffer) may return only part of the data. You may need
+ * to call it (and set $(D_KEYWORD +=) several times until
+ * $(D_PSYMBOL length) is 0. If all the data can be written,
+ * maximally 3 calls are required.
+ *
+ * Returns: A chunk of data buffer.
+ */
+ @property ubyte[] opIndex() pure nothrow @safe @nogc
+ {
+ return opSlice(0, length);
+ }
+}
diff --git a/source/tanya/event/internal/epoll.d b/source/tanya/event/internal/epoll.d
deleted file mode 100644
index a129661..0000000
--- a/source/tanya/event/internal/epoll.d
+++ /dev/null
@@ -1,223 +0,0 @@
-/* This Source Code Form is subject to the terms of the Mozilla Public
- * License, v. 2.0. If a copy of the MPL was not distributed with this
- * file, You can obtain one at http://mozilla.org/MPL/2.0/. */
-
-/**
- * Copyright: Eugene Wissner 2016.
- * License: $(LINK2 https://www.mozilla.org/en-US/MPL/2.0/,
- * Mozilla Public License, v. 2.0).
- * Authors: $(LINK2 mailto:belka@caraus.de, Eugene Wissner)
- */
-module tanya.event.internal.epoll;
-
-import tanya.event.config;
-
-static if (UseEpoll):
-
-public import core.sys.linux.epoll;
-import tanya.event.internal.selector;
-import tanya.event.protocol;
-import tanya.event.transport;
-import tanya.event.watcher;
-import tanya.event.loop;
-import tanya.container.list;
-import tanya.memory;
-import core.stdc.errno;
-import core.sys.posix.fcntl;
-import core.sys.posix.netinet.in_;
-import core.time;
-import std.algorithm.comparison;
-
-extern (C) nothrow
-{ // TODO: Make a pull request for Phobos to mark this extern functions as @nogc.
- int epoll_create1(int __flags);
- int epoll_ctl(int __epfd, int __op, int __fd, epoll_event *__event);
- int epoll_wait(int __epfd, epoll_event *__events, int __maxevents, int __timeout);
- int accept4(int, sockaddr*, socklen_t*, int flags);
-}
-
-private enum maxEvents = 128;
-
-class EpollLoop : Loop
-{
- /**
- * Initializes the loop.
- */
- this()
- {
- super();
-
- if ((fd = epoll_create1(EPOLL_CLOEXEC)) < 0)
- {
- return;
- }
- epollEvents = makeArray!epoll_event(defaultAllocator, maxEvents).ptr;
- }
-
- /**
- * Frees loop internals.
- */
- ~this()
- {
- dispose(defaultAllocator, epollEvents);
- }
-
- /**
- * Should be called if the backend configuration changes.
- *
- * Params:
- * socket = Socket.
- * oldEvents = The events were already set.
- * events = The events should be set.
- *
- * Returns: $(D_KEYWORD true) if the operation was successful.
- */
- protected override bool modify(int socket, EventMask oldEvents, EventMask events)
- {
- int op = EPOLL_CTL_DEL;
- epoll_event ev;
-
- if (events == oldEvents)
- {
- return true;
- }
- if (events && oldEvents)
- {
- op = EPOLL_CTL_MOD;
- }
- else if (events && !oldEvents)
- {
- op = EPOLL_CTL_ADD;
- }
-
- ev.data.fd = socket;
- ev.events = (events & (Event.read | Event.accept) ? EPOLLIN | EPOLLPRI : 0)
- | (events & Event.write ? EPOLLOUT : 0)
- | EPOLLET;
-
- return epoll_ctl(fd, op, socket, &ev) == 0;
- }
-
- /**
- * Accept incoming connections.
- *
- * Params:
- * protocolFactory = Protocol factory.
- * socket = Socket.
- */
- protected override void acceptConnection(Protocol delegate() protocolFactory,
- int socket)
- {
- sockaddr_in client_addr;
- socklen_t client_len = client_addr.sizeof;
- int client = accept4(socket,
- cast(sockaddr *)&client_addr,
- &client_len,
- O_NONBLOCK);
- while (client >= 0)
- {
- auto transport = make!SocketTransport(defaultAllocator, this, client);
- IOWatcher connection;
-
- if (connections.length > client)
- {
- connection = cast(IOWatcher) connections[client];
- // If it is a ConnectionWatcher
- if (connection is null && connections[client] !is null)
- {
- dispose(defaultAllocator, connections[client]);
- connections[client] = null;
- }
- }
- if (connection !is null)
- {
- connection(protocolFactory, transport);
- }
- else
- {
- connections[client] = make!IOWatcher(defaultAllocator,
- protocolFactory,
- transport);
- }
-
- modify(client, EventMask(Event.none), EventMask(Event.read, Event.write));
-
- swapPendings.insertBack(connections[client]);
-
- client = accept4(socket,
- cast(sockaddr *)&client_addr,
- &client_len,
- O_NONBLOCK);
- }
- }
-
- /**
- * Does the actual polling.
- */
- protected override void poll()
- {
- // Don't block
- immutable timeout = cast(immutable int) blockTime.total!"msecs";
- auto eventCount = epoll_wait(fd, epollEvents, maxEvents, timeout);
-
- if (eventCount < 0)
- {
- if (errno != EINTR)
- {
- throw make!BadLoopException(defaultAllocator);
- }
-
- return;
- }
-
- for (auto i = 0; i < eventCount; ++i)
- {
- epoll_event *ev = epollEvents + i;
- auto connection = cast(IOWatcher) connections[ev.data.fd];
-
- if (connection is null)
- {
- swapPendings.insertBack(connections[ev.data.fd]);
-// acceptConnection(connections[ev.data.fd].protocol,
-// connections[ev.data.fd].socket);
- }
- else
- {
- auto transport = cast(SocketTransport) connection.transport;
- assert(transport !is null);
-
- if (ev.events & (EPOLLIN | EPOLLPRI | EPOLLERR | EPOLLHUP))
- {
- try
- {
- while (!transport.receive())
- {
- }
- swapPendings.insertBack(connection);
- }
- catch (TransportException e)
- {
- swapPendings.insertBack(connection);
- dispose(defaultAllocator, e);
- }
- }
- else if (ev.events & (EPOLLOUT | EPOLLERR | EPOLLHUP))
- {
- transport.writeReady = true;
- }
- }
- }
- }
-
- /**
- * Returns: The blocking time.
- */
- override protected @property inout(Duration) blockTime()
- inout @safe pure nothrow
- {
- return min(super.blockTime, 1.dur!"seconds");
- }
-
- private int fd;
- private epoll_event* epollEvents;
-}
diff --git a/source/tanya/event/internal/selector.d b/source/tanya/event/internal/selector.d
deleted file mode 100644
index 95bf5d8..0000000
--- a/source/tanya/event/internal/selector.d
+++ /dev/null
@@ -1,216 +0,0 @@
-/* This Source Code Form is subject to the terms of the Mozilla Public
- * License, v. 2.0. If a copy of the MPL was not distributed with this
- * file, You can obtain one at http://mozilla.org/MPL/2.0/. */
-
-/**
- * Copyright: Eugene Wissner 2016.
- * License: $(LINK2 https://www.mozilla.org/en-US/MPL/2.0/,
- * Mozilla Public License, v. 2.0).
- * Authors: $(LINK2 mailto:belka@caraus.de, Eugene Wissner)
- */
-module tanya.event.internal.selector;
-
-import tanya.memory;
-import tanya.container.buffer;
-import tanya.event.loop;
-import tanya.event.protocol;
-import tanya.event.transport;
-import core.stdc.errno;
-import core.sys.posix.netinet.in_;
-import core.sys.posix.unistd;
-
-/**
- * Transport for stream sockets.
- */
-class SocketTransport : DuplexTransport
-{
- private int socket_ = -1;
-
- private Protocol protocol_;
-
- /// Input buffer.
- private WriteBuffer input_;
-
- /// Output buffer.
- private ReadBuffer output_;
-
- private Loop loop;
-
- private bool disconnected_;
-
- package bool writeReady;
-
- /**
- * Params:
- * loop = Event loop.
- * socket = Socket.
- * protocol = Protocol.
- */
- this(Loop loop, int socket, Protocol protocol = null)
- {
- socket_ = socket;
- protocol_ = protocol;
- this.loop = loop;
- input_ = make!WriteBuffer(defaultAllocator);
- output_ = make!ReadBuffer(defaultAllocator);
- }
-
- /**
- * Close the transport and deallocate the data buffers.
- */
- ~this()
- {
- close(socket);
- dispose(defaultAllocator, input_);
- dispose(defaultAllocator, output_);
- dispose(defaultAllocator, protocol_);
- }
-
- /**
- * Returns: Transport socket.
- */
- int socket() const @safe pure nothrow
- {
- return socket_;
- }
-
- /**
- * Returns: Protocol.
- */
- @property Protocol protocol() @safe pure nothrow
- {
- return protocol_;
- }
-
- /**
- * Returns: $(D_KEYWORD true) if the remote peer closed the connection,
- * $(D_KEYWORD false) otherwise.
- */
- @property immutable(bool) disconnected() const @safe pure nothrow
- {
- return disconnected_;
- }
-
- /**
- * Params:
- * protocol = Application protocol.
- */
- @property void protocol(Protocol protocol) @safe pure nothrow
- {
- protocol_ = protocol;
- }
-
- /**
- * Returns: Application protocol.
- */
- @property inout(Protocol) protocol() inout @safe pure nothrow
- {
- return protocol_;
- }
-
- /**
- * Write some data to the transport.
- *
- * Params:
- * data = Data to send.
- */
- void write(ubyte[] data)
- {
- // If the buffer wasn't empty the transport should be already there.
- if (!input.length && data.length)
- {
- loop.feed(this);
- }
- input ~= data;
- }
-
- /**
- * Returns: Input buffer.
- */
- @property WriteBuffer input() @safe pure nothrow
- {
- return input_;
- }
-
- /**
- * Returns: Output buffer.
- */
- @property ReadBuffer output() @safe pure nothrow
- {
- return output_;
- }
-
- /**
- * Read data from the socket. Returns $(D_KEYWORD true) if the reading
- * is completed. In the case that the peer closed the connection, returns
- * $(D_KEYWORD true) aswell.
- *
- * Returns: Whether the reading is completed.
- *
- * Throws: $(D_PSYMBOL TransportException) if a read error is occured.
- */
- bool receive()
- {
- auto readCount = recv(socket, output.buffer, output.free, 0);
-
- if (readCount > 0)
- {
- output_ ~= output.buffer[0..readCount];
- return false;
- }
- else if (readCount == 0)
- {
- disconnected_ = true;
- return true;
- }
- else if (errno == EAGAIN || errno == EWOULDBLOCK)
- {
- return true;
- }
- else
- {
- disconnected_ = true;
- throw make!TransportException(defaultAllocator,
- "Read from the socket failed.");
- }
- }
-
- /**
- * Returns: Whether the writing is completed.
- *
- * Throws: $(D_PSYMBOL TransportException) if a read error is occured.
- */
- bool send()
- {
- auto sentCount = core.sys.posix.netinet.in_.send(socket,
- input.buffer,
- input.length,
- 0);
-
- input.written = sentCount;
- if (input.length == 0)
- {
- return true;
- }
- else if (sentCount >= 0)
- {
- loop.feed(this);
-
- return false;
- }
- else if (errno == EAGAIN || errno == EWOULDBLOCK)
- {
- writeReady = false;
- loop.feed(this);
-
- return false;
- }
- else
- {
- disconnected_ = true;
- loop.feed(this);
- throw make!TransportException(defaultAllocator,
- "Write to the socket failed.");
- }
- }
-}
diff --git a/source/tanya/event/loop.d b/source/tanya/event/loop.d
deleted file mode 100644
index 035c0b6..0000000
--- a/source/tanya/event/loop.d
+++ /dev/null
@@ -1,275 +0,0 @@
-/* This Source Code Form is subject to the terms of the Mozilla Public
- * License, v. 2.0. If a copy of the MPL was not distributed with this
- * file, You can obtain one at http://mozilla.org/MPL/2.0/. */
-
-/**
- * Copyright: Eugene Wissner 2016.
- * License: $(LINK2 https://www.mozilla.org/en-US/MPL/2.0/,
- * Mozilla Public License, v. 2.0).
- * Authors: $(LINK2 mailto:belka@caraus.de, Eugene Wissner)
- */
-module tanya.event.loop;
-
-import tanya.memory;
-import tanya.container.queue;
-import tanya.container.vector;
-import tanya.event.config;
-import tanya.event.protocol;
-import tanya.event.transport;
-import tanya.event.watcher;
-import tanya.container.buffer;
-import core.thread;
-import core.time;
-import std.algorithm.iteration;
-import std.algorithm.mutation;
-import std.typecons;
-
-static if (UseEpoll)
-{
- import tanya.event.internal.epoll;
-}
-
-/**
- * Events.
- */
-enum Event : uint
-{
- none = 0x00, /// No events.
- read = 0x01, /// Non-blocking read call.
- write = 0x02, /// Non-blocking write call.
- accept = 0x04, /// Connection made.
-}
-
-alias EventMask = BitFlags!Event;
-
-/**
- * Event loop.
- */
-abstract class Loop
-{
- /// Pending watchers.
- protected Queue!Watcher pendings;
-
- protected Queue!Watcher swapPendings;
-
- /// Pending connections.
- protected Vector!ConnectionWatcher connections;
-
- /**
- * Initializes the loop.
- */
- this()
- {
- connections = make!(Vector!ConnectionWatcher)(defaultAllocator);
- pendings = make!(Queue!Watcher)(defaultAllocator);
- swapPendings = make!(Queue!Watcher)(defaultAllocator);
- }
-
- /**
- * Frees loop internals.
- */
- ~this()
- {
- dispose(defaultAllocator, connections);
- dispose(defaultAllocator, pendings);
- dispose(defaultAllocator, swapPendings);
- }
-
- /**
- * Starts the loop.
- */
- void run()
- {
- done_ = false;
- do
- {
- poll();
-
- // Invoke pendings
- swapPendings.each!((ref p) => p.invoke());
-
- swap(pendings, swapPendings);
- }
- while (!done_);
- }
-
- /**
- * Break out of the loop.
- */
- void unloop() @safe pure nothrow
- {
- done_ = true;
- }
-
- /**
- * Start watching.
- *
- * Params:
- * watcher = Watcher.
- */
- void start(ConnectionWatcher watcher)
- {
- if (watcher.active)
- {
- return;
- }
- watcher.active = true;
- watcher.accept = &acceptConnection;
- connections[watcher.socket] = watcher;
-
- modify(watcher.socket, EventMask(Event.none), EventMask(Event.accept));
- }
-
- /**
- * Stop watching.
- *
- * Params:
- * watcher = Watcher.
- */
- void stop(ConnectionWatcher watcher)
- {
- if (!watcher.active)
- {
- return;
- }
- watcher.active = false;
-
- modify(watcher.socket, EventMask(Event.accept), EventMask(Event.none));
- }
-
- /**
- * Feeds the given event set into the event loop, as if the specified event
- * had happened for the specified watcher.
- *
- * Params:
- * transport = Affected transport.
- */
- void feed(DuplexTransport transport)
- {
- pendings.insertBack(connections[transport.socket]);
- }
-
- /**
- * Should be called if the backend configuration changes.
- *
- * Params:
- * socket = Socket.
- * oldEvents = The events were already set.
- * events = The events should be set.
- *
- * Returns: $(D_KEYWORD true) if the operation was successful.
- */
- protected bool modify(int socket, EventMask oldEvents, EventMask events);
-
- /**
- * Returns: The blocking time.
- */
- protected @property inout(Duration) blockTime()
- inout @safe pure nothrow
- {
- // Don't block if we have to do.
- return swapPendings.empty ? blockTime_ : Duration.zero;
- }
-
- /**
- * Sets the blocking time for IO watchers.
- *
- * Params:
- * blockTime = The blocking time. Cannot be larger than
- * $(D_PSYMBOL maxBlockTime).
- */
- protected @property void blockTime(in Duration blockTime) @safe pure nothrow
- in
- {
- assert(blockTime <= 1.dur!"hours", "Too long to wait.");
- assert(!blockTime.isNegative);
- }
- body
- {
- blockTime_ = blockTime;
- }
-
- /**
- * Does the actual polling.
- */
- protected void poll();
-
- /**
- * Accept incoming connections.
- *
- * Params:
- * protocolFactory = Protocol factory.
- * socket = Socket.
- */
- protected void acceptConnection(Protocol delegate() protocolFactory,
- int socket);
-
- /// Whether the event loop should be stopped.
- private bool done_;
-
- /// Maximal block time.
- protected Duration blockTime_ = 1.dur!"minutes";
-}
-
-/**
- * Exception thrown on errors in the event loop.
- */
-class BadLoopException : Exception
-{
- /**
- * Params:
- * file = The file where the exception occurred.
- * line = The line number where the exception occurred.
- * next = The previous exception in the chain of exceptions, if any.
- */
- this(string file = __FILE__, size_t line = __LINE__, Throwable next = null)
- pure @safe nothrow const @nogc
- {
- super("Event loop cannot be initialized.", file, line, next);
- }
-}
-
-/**
- * Returns the event loop used by default. If an event loop wasn't set with
- * $(D_PSYMBOL defaultLoop) before, $(D_PSYMBOL getDefaultLoop()) will try to
- * choose an event loop supported on the system.
- *
- * Returns: The default event loop.
- */
-Loop getDefaultLoop()
-{
- if (_defaultLoop !is null)
- {
- return _defaultLoop;
- }
-
- static if (UseEpoll)
- {
- _defaultLoop = make!EpollLoop(defaultAllocator);
- }
-
- return _defaultLoop;
-}
-
-/**
- * Sets the default event loop.
- *
- * This property makes it possible to implement your own backends or event
- * loops, for example, if the system is not supported or if you want to
- * extend the supported implementation. Just extend $(D_PSYMBOL Loop) and pass
- * your implementation to this property.
- *
- * Params:
- * loop = The event loop.
- */
-@property void defaultLoop(Loop loop)
-in
-{
- assert(loop !is null);
-}
-body
-{
- _defaultLoop = loop;
-}
-
-private Loop _defaultLoop;
diff --git a/source/tanya/event/package.d b/source/tanya/event/package.d
deleted file mode 100644
index 9dcaeeb..0000000
--- a/source/tanya/event/package.d
+++ /dev/null
@@ -1,16 +0,0 @@
-/* This Source Code Form is subject to the terms of the Mozilla Public
- * License, v. 2.0. If a copy of the MPL was not distributed with this
- * file, You can obtain one at http://mozilla.org/MPL/2.0/. */
-
-/**
- * Copyright: Eugene Wissner 2016.
- * License: $(LINK2 https://www.mozilla.org/en-US/MPL/2.0/,
- * Mozilla Public License, v. 2.0).
- * Authors: $(LINK2 mailto:belka@caraus.de, Eugene Wissner)
- */
-module tanya.event;
-
-public import tanya.event.loop;
-public import tanya.event.protocol;
-public import tanya.event.transport;
-public import tanya.event.watcher;
diff --git a/source/tanya/event/protocol.d b/source/tanya/event/protocol.d
deleted file mode 100644
index 02c2f35..0000000
--- a/source/tanya/event/protocol.d
+++ /dev/null
@@ -1,46 +0,0 @@
-/* This Source Code Form is subject to the terms of the Mozilla Public
- * License, v. 2.0. If a copy of the MPL was not distributed with this
- * file, You can obtain one at http://mozilla.org/MPL/2.0/. */
-
-/**
- * Copyright: Eugene Wissner 2016.
- * License: $(LINK2 https://www.mozilla.org/en-US/MPL/2.0/,
- * Mozilla Public License, v. 2.0).
- * Authors: $(LINK2 mailto:belka@caraus.de, Eugene Wissner)
- */
-module tanya.event.protocol;
-
-import tanya.event.transport;
-
-/**
- * Common protocol interface.
- */
-interface Protocol
-{
-@nogc:
- /**
- * Params:
- * data = Read data.
- */
- void received(ubyte[] data);
-
- /**
- * Called when a connection is made.
- *
- * Params:
- * transport = Protocol transport.
- */
- void connected(DuplexTransport transport);
-
- /**
- * Called when a connection is lost.
- */
- void disconnected();
-}
-
-/**
- * Interface for TCP.
- */
-interface TransmissionControlProtocol : Protocol
-{
-}
diff --git a/source/tanya/event/transport.d b/source/tanya/event/transport.d
deleted file mode 100644
index e1dc6f7..0000000
--- a/source/tanya/event/transport.d
+++ /dev/null
@@ -1,134 +0,0 @@
-/* This Source Code Form is subject to the terms of the Mozilla Public
- * License, v. 2.0. If a copy of the MPL was not distributed with this
- * file, You can obtain one at http://mozilla.org/MPL/2.0/. */
-
-/**
- * Copyright: Eugene Wissner 2016.
- * License: $(LINK2 https://www.mozilla.org/en-US/MPL/2.0/,
- * Mozilla Public License, v. 2.0).
- * Authors: $(LINK2 mailto:belka@caraus.de, Eugene Wissner)
- */
-module tanya.event.transport;
-
-import tanya.container.buffer;
-import tanya.event.protocol;
-
-/**
- * Exception thrown on read/write errors.
- */
-class TransportException : Exception
-{
- /**
- * Params:
- * msg = Message to output.
- * file = The file where the exception occurred.
- * line = The line number where the exception occurred.
- * next = The previous exception in the chain of exceptions, if any.
- */
- this(string msg,
- string file = __FILE__,
- size_t line = __LINE__,
- Throwable next = null) pure @safe nothrow const @nogc
- {
- super(msg, file, line, next);
- }
-}
-
-/**
- * Base transport interface.
- */
-interface Transport
-{
- /**
- * Returns: Protocol.
- */
- @property Protocol protocol() @safe pure nothrow;
-
- /**
- * Returns: $(D_KEYWORD true) if the peer closed the connection,
- * $(D_KEYWORD false) otherwise.
- */
- @property immutable(bool) disconnected() const @safe pure nothrow;
-
- /**
- * Params:
- * protocol = Application protocol.
- */
- @property void protocol(Protocol protocol) @safe pure nothrow
- in
- {
- assert(protocol !is null, "protocolConnected cannot be unset.");
- }
-
- /**
- * Returns: Application protocol.
- */
- @property inout(Protocol) protocol() inout @safe pure nothrow;
-
- /**
- * Returns: Transport socket.
- */
- int socket() const @safe pure nothrow;
-}
-
-/**
- * Interface for read-only transports.
- */
-interface ReadTransport : Transport
-{
- /**
- * Returns: Underlying output buffer.
- */
- @property ReadBuffer output();
-
- /**
- * Reads data into the buffer.
- *
- * Returns: Whether the reading is completed.
- *
- * Throws: $(D_PSYMBOL TransportException) if a read error is occured.
- */
- bool receive()
- in
- {
- assert(!disconnected);
- }
-}
-
-/**
- * Interface for write-only transports.
- */
-interface WriteTransport : Transport
-{
- /**
- * Returns: Underlying input buffer.
- */
- @property WriteBuffer input();
-
- /**
- * Write some data to the transport.
- *
- * Params:
- * data = Data to send.
- */
- void write(ubyte[] data);
-
- /**
- * Returns: Whether the writing is completed.
- *
- * Throws: $(D_PSYMBOL TransportException) if a read error is occured.
- */
- bool send()
- in
- {
- assert(input.length);
- assert(!disconnected);
- }
-}
-
-/**
- * Represents a bidirectional transport.
- */
-abstract class DuplexTransport : ReadTransport, WriteTransport
-{
-}
diff --git a/source/tanya/event/watcher.d b/source/tanya/event/watcher.d
deleted file mode 100644
index b9880eb..0000000
--- a/source/tanya/event/watcher.d
+++ /dev/null
@@ -1,207 +0,0 @@
-/* This Source Code Form is subject to the terms of the Mozilla Public
- * License, v. 2.0. If a copy of the MPL was not distributed with this
- * file, You can obtain one at http://mozilla.org/MPL/2.0/. */
-
-/**
- * Copyright: Eugene Wissner 2016.
- * License: $(LINK2 https://www.mozilla.org/en-US/MPL/2.0/,
- * Mozilla Public License, v. 2.0).
- * Authors: $(LINK2 mailto:belka@caraus.de, Eugene Wissner)
- */
-module tanya.event.watcher;
-
-import tanya.event.protocol;
-import tanya.event.transport;
-import tanya.memory;
-import std.functional;
-
-/**
- * A watcher is an opaque structure that you allocate and register to record
- * your interest in some event.
- */
-abstract class Watcher
-{
- /// Whether the watcher is active.
- bool active;
-
- /**
- * Invoke some action on event.
- */
- void invoke();
-}
-
-class ConnectionWatcher : Watcher
-{
- /// Watched file descriptor.
- private int socket_;
-
- /// Protocol factory.
- protected Protocol delegate() protocolFactory;
-
- /// Callback.
- package void delegate(Protocol delegate() protocolFactory,
- int socket) accept;
-
- invariant
- {
- assert(socket_ >= 0, "Called with negative file descriptor.");
- }
-
- /**
- * Params:
- * protocolFactory = Function returning a new $(D_PSYMBOL Protocol) instance.
- * socket = Socket.
- */
- this(Protocol function() protocolFactory, int socket)
- {
- this.protocolFactory = toDelegate(protocolFactory);
- socket_ = socket;
- }
-
- /// Ditto.
- this(Protocol delegate() protocolFactory, int socket)
- {
- this.protocolFactory = protocolFactory;
- socket_ = socket;
- }
-
- /// Ditto.
- protected this(Protocol function() protocolFactory)
- {
- this.protocolFactory = toDelegate(protocolFactory);
- }
-
- /// Ditto.
- protected this(Protocol delegate() protocolFactory)
- {
- this.protocolFactory = protocolFactory;
- }
-
- /**
- * Returns: Socket.
- */
- @property inout(int) socket() inout @safe pure nothrow
- {
- return socket_;
- }
-
- /**
- * Returns: Application protocol factory.
- */
- @property inout(Protocol delegate()) protocol() inout
- {
- return protocolFactory;
- }
-
- override void invoke()
- {
- accept(protocol, socket);
- }
-}
-
-/**
- * Contains a pending watcher with the invoked events or a transport can be
- * read from.
- */
-class IOWatcher : ConnectionWatcher
-{
- /// References a watcher or a transport.
- DuplexTransport transport_;
-
- /**
- * Params:
- * protocolFactory = Function returning application specific protocol.
- * transport = Transport.
- */
- this(Protocol delegate() protocolFactory,
- DuplexTransport transport)
- in
- {
- assert(transport !is null);
- assert(protocolFactory !is null);
- }
- body
- {
- super(protocolFactory);
- this.transport_ = transport;
- }
-
- ~this()
- {
- dispose(defaultAllocator, transport_);
- }
-
- /**
- * Assigns a transport.
- *
- * Params:
- * protocolFactory = Function returning application specific protocol.
- * transport = Transport.
- *
- * Returns: $(D_KEYWORD this).
- */
- IOWatcher opCall(Protocol delegate() protocolFactory,
- DuplexTransport transport) @safe pure nothrow
- in
- {
- assert(transport !is null);
- assert(protocolFactory !is null);
- }
- body
- {
- this.protocolFactory = protocolFactory;
- this.transport_ = transport;
- return this;
- }
-
- /**
- * Returns: Transport used by this watcher.
- */
- @property inout(DuplexTransport) transport() inout @safe pure nothrow
- {
- return transport_;
- }
-
- /**
- * Returns: Socket.
- */
- override @property inout(int) socket() inout @safe pure nothrow
- {
- return transport.socket;
- }
-
- /**
- * Invokes the watcher callback.
- *
- * Finalizes the transport on disconnect.
- */
- override void invoke()
- {
- if (transport.protocol is null)
- {
- transport.protocol = protocolFactory();
- transport.protocol.connected(transport);
- }
- else if (transport.disconnected)
- {
- transport.protocol.disconnected();
- dispose(defaultAllocator, transport_);
- protocolFactory = null;
- }
- else if (transport.output.length)
- {
- transport.protocol.received(transport.output[]);
- }
- else if (transport.input.length)
- {
- try
- {
- transport.send();
- }
- catch (TransportException e)
- {
- dispose(defaultAllocator, e);
- }
- }
- }
-}