diff options
| author | Eugen Wissner <belka@caraus.de> | 2016-10-08 19:33:06 +0200 |
|---|---|---|
| committer | Eugen Wissner <belka@caraus.de> | 2016-10-08 19:33:06 +0200 |
| commit | 6b093cd5fac7dfa783bd82d4570a21f24cac29ab (patch) | |
| tree | a8caeb4f98e83598455cdbc9581af7354e97b99d | |
| parent | 154e2f2ff7b8e75720c36752caa234b813dcb295 (diff) | |
| download | tanya-6b093cd5fac7dfa783bd82d4570a21f24cac29ab.tar.gz | |
Add Windows IOCP and Kqueue implementations for the event loop
| -rw-r--r-- | source/tanya/async/event/epoll.d | 178 | ||||
| -rw-r--r-- | source/tanya/async/event/iocp.d | 294 | ||||
| -rw-r--r-- | source/tanya/async/event/kqueue.d | 349 | ||||
| -rw-r--r-- | source/tanya/async/event/selector.d | 266 | ||||
| -rw-r--r-- | source/tanya/async/loop.d | 493 | ||||
| -rw-r--r-- | source/tanya/async/package.d (renamed from source/tanya/event/config.d) | 45 | ||||
| -rw-r--r-- | source/tanya/async/protocol.d | 50 | ||||
| -rw-r--r-- | source/tanya/async/transport.d | 63 | ||||
| -rw-r--r-- | source/tanya/async/watcher.d | 242 | ||||
| -rw-r--r-- | source/tanya/container/buffer.d | 1336 | ||||
| -rw-r--r-- | source/tanya/event/internal/epoll.d | 223 | ||||
| -rw-r--r-- | source/tanya/event/internal/selector.d | 216 | ||||
| -rw-r--r-- | source/tanya/event/loop.d | 275 | ||||
| -rw-r--r-- | source/tanya/event/package.d | 16 | ||||
| -rw-r--r-- | source/tanya/event/protocol.d | 46 | ||||
| -rw-r--r-- | source/tanya/event/transport.d | 134 | ||||
| -rw-r--r-- | source/tanya/event/watcher.d | 207 |
17 files changed, 2654 insertions, 1779 deletions
diff --git a/source/tanya/async/event/epoll.d b/source/tanya/async/event/epoll.d new file mode 100644 index 0000000..3451661 --- /dev/null +++ b/source/tanya/async/event/epoll.d @@ -0,0 +1,178 @@ +/* This Source Code Form is subject to the terms of the Mozilla Public
+ * License, v. 2.0. If a copy of the MPL was not distributed with this
+ * file, You can obtain one at http://mozilla.org/MPL/2.0/. */
+
+/**
+ * Copyright: Eugene Wissner 2016.
+ * License: $(LINK2 https://www.mozilla.org/en-US/MPL/2.0/,
+ * Mozilla Public License, v. 2.0).
+ * Authors: $(LINK2 mailto:belka@caraus.de, Eugene Wissner)
+ */
+module tanya.async.event.epoll;
+
+version (linux):
+
+public import core.sys.linux.epoll;
+import tanya.async.protocol;
+import tanya.async.event.selector;
+import tanya.async.loop;
+import tanya.async.transport;
+import tanya.async.watcher;
+import tanya.memory;
+import tanya.memory.mmappool;
+import tanya.network.socket;
+import core.stdc.errno;
+import core.sys.posix.unistd;
+import core.time;
+import std.algorithm.comparison;
+
+class EpollLoop : SelectorLoop
+{
+ protected int fd;
+ private epoll_event[] events;
+
+ /**
+ * Initializes the loop.
+ */
+ this()
+ {
+ if ((fd = epoll_create1(EPOLL_CLOEXEC)) < 0)
+ {
+ throw MmapPool.instance.make!BadLoopException("epoll initialization failed");
+ }
+ super();
+ events = MmapPool.instance.makeArray!epoll_event(maxEvents);
+ }
+
+ /**
+ * Free loop internals.
+ */
+ ~this()
+ {
+ MmapPool.instance.dispose(events);
+ close(fd);
+ }
+
+ /**
+ * Should be called if the backend configuration changes.
+ *
+ * Params:
+ * watcher = Watcher.
+ * oldEvents = The events were already set.
+ * events = The events should be set.
+ *
+ * Returns: $(D_KEYWORD true) if the operation was successful.
+ */
+ protected override bool reify(ConnectionWatcher watcher, EventMask oldEvents, EventMask events)
+ in
+ {
+ assert(watcher !is null);
+ }
+ body
+ {
+ int op = EPOLL_CTL_DEL;
+ epoll_event ev;
+
+ if (events == oldEvents)
+ {
+ return true;
+ }
+ if (events && oldEvents)
+ {
+ op = EPOLL_CTL_MOD;
+ }
+ else if (events && !oldEvents)
+ {
+ op = EPOLL_CTL_ADD;
+ }
+
+ ev.data.fd = watcher.socket.handle;
+ ev.events = (events & (Event.read | Event.accept) ? EPOLLIN | EPOLLPRI : 0)
+ | (events & Event.write ? EPOLLOUT : 0)
+ | EPOLLET;
+
+ return epoll_ctl(fd, op, watcher.socket.handle, &ev) == 0;
+ }
+
+ /**
+ * Does the actual polling.
+ */
+ protected override void poll()
+ {
+ // Don't block
+ immutable timeout = cast(immutable int) blockTime.total!"msecs";
+ auto eventCount = epoll_wait(fd, events.ptr, maxEvents, timeout);
+
+ if (eventCount < 0)
+ {
+ if (errno != EINTR)
+ {
+ throw defaultAllocator.make!BadLoopException();
+ }
+ return;
+ }
+
+ for (auto i = 0; i < eventCount; ++i)
+ {
+ auto io = cast(IOWatcher) connections[events[i].data.fd];
+
+ if (io is null)
+ {
+ acceptConnections(connections[events[i].data.fd]);
+ }
+ else if (events[i].events & EPOLLERR)
+ {
+ kill(io, null);
+ }
+ else if (events[i].events & (EPOLLIN | EPOLLPRI | EPOLLHUP))
+ {
+ auto transport = cast(SelectorStreamTransport) io.transport;
+ assert(transport !is null);
+
+ SocketException exception;
+ try
+ {
+ ptrdiff_t received;
+ do
+ {
+ received = transport.socket.receive(io.output[]);
+ io.output += received;
+ }
+ while (received);
+ }
+ catch (SocketException e)
+ {
+ exception = e;
+ }
+ if (transport.socket.disconnected)
+ {
+ kill(io, exception);
+ }
+ else if (io.output.length)
+ {
+ swapPendings.insertBack(io);
+ }
+ }
+ else if (events[i].events & EPOLLOUT)
+ {
+ auto transport = cast(SelectorStreamTransport) io.transport;
+ assert(transport !is null);
+
+ transport.writeReady = true;
+ if (transport.input.length)
+ {
+ feed(transport);
+ }
+ }
+ }
+ }
+
+ /**
+ * Returns: The blocking time.
+ */
+ override protected @property inout(Duration) blockTime()
+ inout @safe pure nothrow
+ {
+ return min(super.blockTime, 1.dur!"seconds");
+ }
+}
diff --git a/source/tanya/async/event/iocp.d b/source/tanya/async/event/iocp.d new file mode 100644 index 0000000..569f974 --- /dev/null +++ b/source/tanya/async/event/iocp.d @@ -0,0 +1,294 @@ +/* This Source Code Form is subject to the terms of the Mozilla Public + * License, v. 2.0. If a copy of the MPL was not distributed with this + * file, You can obtain one at http://mozilla.org/MPL/2.0/. */ + +/** + * Copyright: Eugene Wissner 2016. + * License: $(LINK2 https://www.mozilla.org/en-US/MPL/2.0/, + * Mozilla Public License, v. 2.0). + * Authors: $(LINK2 mailto:belka@caraus.de, Eugene Wissner) + */ +module tanya.async.event.iocp; + +version (Windows): + +import tanya.container.buffer; +import tanya.async.loop; +import tanya.async.protocol; +import tanya.async.transport; +import tanya.async.watcher; +import tanya.memory; +import tanya.memory.mmappool; +import tanya.network.socket; +import core.sys.windows.basetyps; +import core.sys.windows.mswsock; +import core.sys.windows.winbase; +import core.sys.windows.windef; +import core.sys.windows.winsock2; + +class IOCPStreamTransport : StreamTransport +{ + private OverlappedConnectedSocket socket_; + + private WriteBuffer input; + + /** + * Creates new completion port transport. + * Params: + * socket = Socket. + */ + this(OverlappedConnectedSocket socket) + in + { + assert(socket !is null); + } + body + { + socket_ = socket; + input = MmapPool.instance.make!WriteBuffer(); + } + + ~this() + { + MmapPool.instance.dispose(input); + } + + @property inout(OverlappedConnectedSocket) socket() inout pure nothrow @safe @nogc + { + return socket_; + } + + /** + * Write some data to the transport. + * + * Params: + * data = Data to send. + */ + void write(ubyte[] data) + { + immutable empty = input.length == 0; + input ~= data; + if (empty) + { + SocketState overlapped; + try + { + overlapped = MmapPool.instance.make!SocketState; + socket.beginSend(input[], overlapped); + } + catch (SocketException e) + { + MmapPool.instance.dispose(overlapped); + MmapPool.instance.dispose(e); + } + } + } +} + +class IOCPLoop : Loop +{ + protected HANDLE completionPort; + + protected OVERLAPPED overlap; + + /** + * Initializes the loop. + */ + this() + { + super(); + + completionPort = CreateIoCompletionPort(INVALID_HANDLE_VALUE, NULL, 0, 0); + if (!completionPort) + { + throw defaultAllocator.make!BadLoopException("Creating completion port failed"); + } + } + + /** + * Should be called if the backend configuration changes. + * + * Params: + * watcher = Watcher. + * oldEvents = The events were already set. + * events = The events should be set. + * + * Returns: $(D_KEYWORD true) if the operation was successful. + */ + override protected bool reify(ConnectionWatcher watcher, + EventMask oldEvents, + EventMask events) + { + SocketState overlapped; + if (!(oldEvents & Event.accept) && (events & Event.accept)) + { + auto socket = cast(OverlappedStreamSocket) watcher.socket; + assert(socket !is null); + + if (CreateIoCompletionPort(cast(HANDLE) socket.handle, + completionPort, + cast(ULONG_PTR) (cast(void*) watcher), + 0) !is completionPort) + { + return false; + } + + try + { + overlapped = MmapPool.instance.make!SocketState; + socket.beginAccept(overlapped); + } + catch (SocketException e) + { + MmapPool.instance.dispose(overlapped); + defaultAllocator.dispose(e); + return false; + } + } + if (!(oldEvents & Event.read) && (events & Event.read) + || !(oldEvents & Event.write) && (events & Event.write)) + { + auto io = cast(IOWatcher) watcher; + assert(io !is null); + + auto transport = cast(IOCPStreamTransport) io.transport; + assert(transport !is null); + + if (CreateIoCompletionPort(cast(HANDLE) transport.socket.handle, + completionPort, + cast(ULONG_PTR) (cast(void*) watcher), + 0) !is completionPort) + { + return false; + } + + // Begin to read + if (!(oldEvents & Event.read) && (events & Event.read)) + { + try + { + overlapped = MmapPool.instance.make!SocketState; + transport.socket.beginReceive(io.output[], overlapped); + } + catch (SocketException e) + { + MmapPool.instance.dispose(overlapped); + defaultAllocator.dispose(e); + return false; + } + } + } + return true; + } + + /** + * Does the actual polling. + */ + override protected void poll() + { + DWORD lpNumberOfBytes; + ULONG_PTR key; + LPOVERLAPPED overlap; + immutable timeout = cast(immutable int) blockTime.total!"msecs"; + + auto result = GetQueuedCompletionStatus(completionPort, + &lpNumberOfBytes, + &key, + &overlap, + timeout); + if (result == FALSE && overlap == NULL) + { + return; // Timeout + } + + auto overlapped = (cast(SocketState) ((cast(void*) overlap) - 8)); + assert(overlapped !is null); + scope (failure) + { + MmapPool.instance.dispose(overlapped); + } + + switch (overlapped.event) + { + case OverlappedSocketEvent.accept: + auto connection = cast(ConnectionWatcher) (cast(void*) key); + assert(connection !is null); + + auto listener = cast(OverlappedStreamSocket) connection.socket; + assert(listener !is null); + + auto socket = listener.endAccept(overlapped); + auto transport = MmapPool.instance.make!IOCPStreamTransport(socket); + auto io = MmapPool.instance.make!IOWatcher(transport, connection.protocol); + + connection.incoming.insertBack(io); + + reify(io, EventMask(Event.none), EventMask(Event.read, Event.write)); + + swapPendings.insertBack(connection); + listener.beginAccept(overlapped); + break; + case OverlappedSocketEvent.read: + auto io = cast(IOWatcher) (cast(void*) key); + assert(io !is null); + if (!io.active) + { + MmapPool.instance.dispose(io); + MmapPool.instance.dispose(overlapped); + return; + } + + auto transport = cast(IOCPStreamTransport) io.transport; + assert(transport !is null); + + int received; + SocketException exception; + try + { + received = transport.socket.endReceive(overlapped); + } + catch (SocketException e) + { + exception = e; + } + if (transport.socket.disconnected) + { + // We want to get one last notification to destroy the watcher + transport.socket.beginReceive(io.output[], overlapped); + kill(io, exception); + } + else if (received > 0) + { + immutable full = io.output.free == received; + + io.output += received; + // Receive was interrupted because the buffer is full. We have to continue + if (full) + { + transport.socket.beginReceive(io.output[], overlapped); + } + swapPendings.insertBack(io); + } + break; + case OverlappedSocketEvent.write: + auto io = cast(IOWatcher) (cast(void*) key); + assert(io !is null); + + auto transport = cast(IOCPStreamTransport) io.transport; + assert(transport !is null); + + transport.input += transport.socket.endSend(overlapped); + if (transport.input.length) + { + transport.socket.beginSend(transport.input[], overlapped); + } + else + { + transport.socket.beginReceive(io.output[], overlapped); + } + break; + default: + assert(false, "Unknown event"); + } + } +} diff --git a/source/tanya/async/event/kqueue.d b/source/tanya/async/event/kqueue.d new file mode 100644 index 0000000..37d9c1c --- /dev/null +++ b/source/tanya/async/event/kqueue.d @@ -0,0 +1,349 @@ +/* This Source Code Form is subject to the terms of the Mozilla Public + * License, v. 2.0. If a copy of the MPL was not distributed with this + * file, You can obtain one at http://mozilla.org/MPL/2.0/. */ + +/** + * Copyright: Eugene Wissner 2016. + * License: $(LINK2 https://www.mozilla.org/en-US/MPL/2.0/, + * Mozilla Public License, v. 2.0). + * Authors: $(LINK2 mailto:belka@caraus.de, Eugene Wissner) + */ +module tanya.async.event.kqueue; + +version (OSX) +{ + version = MissingKevent; +} +else version (iOS) +{ + version = MissingKevent; +} +else version (TVOS) +{ + version = MissingKevent; +} +else version (WatchOS) +{ + version = MissingKevent; +} +else version (OpenBSD) +{ + version = MissingKevent; +} +else version (DragonFlyBSD) +{ + version = MissingKevent; +} + +version (MissingKevent) +{ + extern (C): + nothrow: + @nogc: + + import core.stdc.stdint; // intptr_t, uintptr_t + import core.sys.posix.time; // timespec + + enum : short + { + EVFILT_READ = -1, + EVFILT_WRITE = -2, + EVFILT_AIO = -3, /* attached to aio requests */ + EVFILT_VNODE = -4, /* attached to vnodes */ + EVFILT_PROC = -5, /* attached to struct proc */ + EVFILT_SIGNAL = -6, /* attached to struct proc */ + EVFILT_TIMER = -7, /* timers */ + EVFILT_MACHPORT = -8, /* Mach portsets */ + EVFILT_FS = -9, /* filesystem events */ + EVFILT_USER = -10, /* User events */ + EVFILT_VM = -12, /* virtual memory events */ + EVFILT_SYSCOUNT = 11 + } + + extern(D) void EV_SET(kevent_t* kevp, typeof(kevent_t.tupleof) args) + { + *kevp = kevent_t(args); + } + + struct kevent_t + { + uintptr_t ident; /* identifier for this event */ + short filter; /* filter for event */ + ushort flags; + uint fflags; + intptr_t data; + void *udata; /* opaque user data identifier */ + } + + enum + { + /* actions */ + EV_ADD = 0x0001, /* add event to kq (implies enable) */ + EV_DELETE = 0x0002, /* delete event from kq */ + EV_ENABLE = 0x0004, /* enable event */ + EV_DISABLE = 0x0008, /* disable event (not reported) */ + + /* flags */ + EV_ONESHOT = 0x0010, /* only report one occurrence */ + EV_CLEAR = 0x0020, /* clear event state after reporting */ + EV_RECEIPT = 0x0040, /* force EV_ERROR on success, data=0 */ + EV_DISPATCH = 0x0080, /* disable event after reporting */ + + EV_SYSFLAGS = 0xF000, /* reserved by system */ + EV_FLAG1 = 0x2000, /* filter-specific flag */ + + /* returned values */ + EV_EOF = 0x8000, /* EOF detected */ + EV_ERROR = 0x4000, /* error, data contains errno */ + } + + int kqueue(); + int kevent(int kq, const kevent_t *changelist, int nchanges, + kevent_t *eventlist, int nevents, + const timespec *timeout); +} + +version (OSX) +{ + version = MacBSD; +} +else version (iOS) +{ + version = MacBSD; +} +else version (FreeBSD) +{ + version = MacBSD; + public import core.sys.freebsd.sys.event; +} +else version (OpenBSD) +{ + version = MacBSD; +} +else version (DragonFlyBSD) +{ + version = MacBSD; +} + +version (MacBSD): + +import dlib.async.event.selector; +import dlib.async.loop; +import dlib.async.transport; +import dlib.async.watcher; +import dlib.memory; +import dlib.memory.mmappool; +import dlib.network.socket; +import core.stdc.errno; +import core.sys.posix.unistd; +import core.sys.posix.sys.time; +import core.time; +import std.algorithm.comparison; + +class KqueueLoop : SelectorLoop +{ + protected int fd; + private kevent_t[] events; + private kevent_t[] changes; + private size_t changeCount; + + /** + * Returns: Maximal event count can be got at a time + * (should be supported by the backend). + */ + override protected @property inout(uint) maxEvents() inout const pure nothrow @safe @nogc + { + return cast(uint) events.length; + } + + this() + { + super(); + + if ((fd = kqueue()) == -1) + { + throw MmapPool.instance.make!BadLoopException("epoll initialization failed"); + } + events = MmapPool.instance.makeArray!kevent_t(64); + changes = MmapPool.instance.makeArray!kevent_t(64); + } + + /** + * Free loop internals. + */ + ~this() + { + MmapPool.instance.dispose(events); + MmapPool.instance.dispose(changes); + close(fd); + } + + private void set(socket_t socket, short filter, ushort flags) + { + if (changes.length <= changeCount) + { + MmapPool.instance.resizeArray(changes, changeCount + maxEvents); + } + EV_SET(&changes[changeCount], + cast(ulong) socket, + filter, + flags, + 0U, + 0L, + null); + ++changeCount; + } + + /** + * Should be called if the backend configuration changes. + * + * Params: + * watcher = Watcher. + * oldEvents = The events were already set. + * events = The events should be set. + * + * Returns: $(D_KEYWORD true) if the operation was successful. + */ + override protected bool reify(ConnectionWatcher watcher, + EventMask oldEvents, + EventMask events) + { + if (events != oldEvents) + { + if (oldEvents & Event.read || oldEvents & Event.accept) + { + set(watcher.socket.handle, EVFILT_READ, EV_DELETE); + } + if (oldEvents & Event.write) + { + set(watcher.socket.handle, EVFILT_WRITE, EV_DELETE); + } + } + if (events & (Event.read | events & Event.accept)) + { + set(watcher.socket.handle, EVFILT_READ, EV_ADD | EV_ENABLE); + } + if (events & Event.write) + { + set(watcher.socket.handle, EVFILT_WRITE, EV_ADD | EV_DISPATCH); + } + return true; + } + + /** + * Does the actual polling. + */ + protected override void poll() + { + timespec ts; + blockTime.split!("seconds", "nsecs")(ts.tv_sec, ts.tv_nsec); + + if (changeCount > maxEvents) + { + MmapPool.instance.resizeArray(events, changes.length); + } + + auto eventCount = kevent(fd, changes.ptr, cast(int) changeCount, events.ptr, maxEvents, &ts); + changeCount = 0; + + if (eventCount < 0) + { + if (errno != EINTR) + { + throw defaultAllocator.make!BadLoopException(); + } + return; + } + + for (int i; i < eventCount; ++i) + { + assert(connections.length > events[i].ident); + + IOWatcher io = cast(IOWatcher) connections[events[i].ident]; + // If it is a ConnectionWatcher. Accept connections. + if (io is null) + { + acceptConnections(connections[events[i].ident]); + } + else if (events[i].flags & EV_ERROR) + { + kill(io, null); + } + else if (events[i].filter == EVFILT_READ) + { + auto transport = cast(SelectorStreamTransport) io.transport; + assert(transport !is null); + + SocketException exception; + try + { + ptrdiff_t received; + do + { + received = transport.socket.receive(io.output[]); + io.output += received; + } + while (received); + } + catch (SocketException e) + { + exception = e; + } + if (transport.socket.disconnected) + { + kill(io, exception); + } + else if (io.output.length) + { + swapPendings.insertBack(io); + } + } + else if (events[i].filter == EVFILT_WRITE) + { + auto transport = cast(SelectorStreamTransport) io.transport; + assert(transport !is null); + + transport.writeReady = true; + if (transport.input.length) + { + feed(transport); + } + } + } + } + + /** + * Returns: The blocking time. + */ + override protected @property inout(Duration) blockTime() + inout @safe pure nothrow + { + return min(super.blockTime, 1.dur!"seconds"); + } + + /** + * If the transport couldn't send the data, the further sending should + * be handled by the event loop. + * + * Params: + * transport = Transport. + * exception = Exception thrown on sending. + * + * Returns: $(D_KEYWORD true) if the operation could be successfully + * completed or scheduled, $(D_KEYWORD false) otherwise (the + * transport is be destroyed then). + */ + protected override bool feed(SelectorStreamTransport transport, SocketException exception = null) + { + if (!super.feed(transport, exception)) + { + return false; + } + if (!transport.writeReady) + { + set(transport.socket.handle, EVFILT_WRITE, EV_DISPATCH); + return true; + } + return false; + } +} diff --git a/source/tanya/async/event/selector.d b/source/tanya/async/event/selector.d new file mode 100644 index 0000000..fa813d5 --- /dev/null +++ b/source/tanya/async/event/selector.d @@ -0,0 +1,266 @@ +/* This Source Code Form is subject to the terms of the Mozilla Public + * License, v. 2.0. If a copy of the MPL was not distributed with this + * file, You can obtain one at http://mozilla.org/MPL/2.0/. */ + +/** + * Copyright: Eugene Wissner 2016. + * License: $(LINK2 https://www.mozilla.org/en-US/MPL/2.0/, + * Mozilla Public License, v. 2.0). + * Authors: $(LINK2 mailto:belka@caraus.de, Eugene Wissner) + */ +module tanya.async.event.selector; + +version (Posix): + +import tanya.async.loop; +import tanya.async.transport; +import tanya.async.watcher; +import tanya.container.buffer; +import tanya.memory; +import tanya.memory.mmappool; +import tanya.network.socket; +import core.sys.posix.netinet.in_; +import core.stdc.errno; + +/** + * Transport for stream sockets. + */ +class SelectorStreamTransport : StreamTransport +{ + private ConnectedSocket socket_; + + /// Input buffer. + package WriteBuffer input; + + private SelectorLoop loop; + + /// Received notification that the underlying socket is write-ready. + package bool writeReady; + + /** + * Params: + * loop = Event loop. + * socket = Socket. + */ + this(SelectorLoop loop, ConnectedSocket socket) + { + socket_ = socket; + this.loop = loop; + input = MmapPool.instance.make!WriteBuffer(); + } + + /** + * Close the transport and deallocate the data buffers. + */ + ~this() + { + MmapPool.instance.dispose(input); + } + + /** + * Returns: Transport socket. + */ + inout(ConnectedSocket) socket() inout pure nothrow @safe @nogc + { + return socket_; + } + + /** + * Write some data to the transport. + * + * Params: + * data = Data to send. + */ + void write(ubyte[] data) + { + if (!data.length) + { + return; + } + // Try to write if the socket is write ready. + if (writeReady) + { + ptrdiff_t sent; + SocketException exception; + try + { + sent = socket.send(data); + if (sent == 0) + { + writeReady = false; + } + } + catch (SocketException e) + { + writeReady = false; + exception = e; + } + if (sent < data.length) + { + input ~= data[sent..$]; + loop.feed(this, exception); + } + } + else + { + input ~= data; + } + } +} + +abstract class SelectorLoop : Loop +{ + /// Pending connections. + protected ConnectionWatcher[] connections; + + this() + { + super(); + connections = MmapPool.instance.makeArray!ConnectionWatcher(maxEvents); + } + + ~this() + { + foreach (ref connection; connections) + { + // We want to free only IOWatchers. ConnectionWatcher are created by the + // user and should be freed by himself. + auto io = cast(IOWatcher) connection; + if (io !is null) + { + MmapPool.instance.dispose(io); + connection = null; + } + } + MmapPool.instance.dispose(connections); + } + + /** + * If the transport couldn't send the data, the further sending should + * be handled by the event loop. + * + * Params: + * transport = Transport. + * exception = Exception thrown on sending. + * + * Returns: $(D_KEYWORD true) if the operation could be successfully + * completed or scheduled, $(D_KEYWORD false) otherwise (the + * transport will be destroyed then). + */ + protected bool feed(SelectorStreamTransport transport, SocketException exception = null) + { + while (transport.input.length && transport.writeReady) + { + try + { + ptrdiff_t sent = transport.socket.send(transport.input[]); + if (sent == 0) + { + transport.writeReady = false; + } + else + { + transport.input += sent; + } + } + catch (SocketException e) + { + exception = e; + transport.writeReady = false; + } + } + if (exception !is null) + { + auto watcher = cast(IOWatcher) connections[transport.socket.handle]; + assert(watcher !is null); + + kill(watcher, exception); + return false; + } + return true; + } + + /** + * Start watching. + * + * Params: + * watcher = Watcher. + */ + override void start(ConnectionWatcher watcher) + { + if (watcher.active) + { + return; + } + + if (connections.length <= watcher.socket) + { + MmapPool.instance.resizeArray(connections, watcher.socket.handle + maxEvents / 2); + } + connections[watcher.socket.handle] = watcher; + + super.start(watcher); + } + + /** + * Accept incoming connections. + * + * Params: + * connection = Connection watcher ready to accept. + */ + package void acceptConnections(ConnectionWatcher connection) + in + { + assert(connection !is null); + } + body + { + while (true) + { + ConnectedSocket client; + try + { + client = (cast(StreamSocket) connection.socket).accept(); + } + catch (SocketException e) + { + defaultAllocator.dispose(e); + break; + } + if (client is null) + { + break; + } + + IOWatcher io; + auto transport = MmapPool.instance.make!SelectorStreamTransport(this, client); + + if (connections.length >= client.handle) + { + io = cast(IOWatcher) connections[client.handle]; + } + else + { + MmapPool.instance.resizeArray(connections, client.handle + maxEvents / 2); + } + if (io is null) + { + io = MmapPool.instance.make!IOWatcher(transport, + connection.protocol); + connections[client.handle] = io; + } + else + { + io(transport, connection.protocol); + } + + reify(io, EventMask(Event.none), EventMask(Event.read, Event.write)); + connection.incoming.insertBack(io); + } + + if (!connection.incoming.empty) + { + swapPendings.insertBack(connection); + } + } +} diff --git a/source/tanya/async/loop.d b/source/tanya/async/loop.d new file mode 100644 index 0000000..9031a83 --- /dev/null +++ b/source/tanya/async/loop.d @@ -0,0 +1,493 @@ +/* This Source Code Form is subject to the terms of the Mozilla Public
+ * License, v. 2.0. If a copy of the MPL was not distributed with this
+ * file, You can obtain one at http://mozilla.org/MPL/2.0/. */
+
+/**
+ * Copyright: Eugene Wissner 2016.
+ * License: $(LINK2 https://www.mozilla.org/en-US/MPL/2.0/,
+ * Mozilla Public License, v. 2.0).
+ * Authors: $(LINK2 mailto:belka@caraus.de, Eugene Wissner)
+ *
+ * ---
+ * import tanya.async;
+ * import tanya.network.socket;
+ *
+ * class EchoProtocol : TransmissionControlProtocol
+ * {
+ * private DuplexTransport transport;
+ *
+ * void received(ubyte[] data)
+ * {
+ * transport.write(data);
+ * }
+ *
+ * void connected(DuplexTransport transport)
+ * {
+ * this.transport = transport;
+ * }
+ *
+ * void disconnected(SocketException exception = null)
+ * {
+ * }
+ * }
+ *
+ * void main()
+ * {
+ * auto address = new InternetAddress("127.0.0.1", cast(ushort) 8192);
+ * version (Windows)
+ * {
+ * auto sock = new OverlappedStreamSocket(AddressFamily.INET)
+ * }
+ * else
+ * {
+ * auto sock = new StreamSocket(AddressFamily.INET)
+ * sock.blocking = false;
+ * }
+ *
+ * sock.bind(address);
+ * sock.listen(5);
+ *
+ * auto io = new ConnectionWatcher(sock);
+ * io.setProtocol!EchoProtocol;
+ *
+ * defaultLoop.start(io);
+ * defaultLoop.run();
+ *
+ * sock.shutdown();
+ * }
+ * ---
+ */
+module tanya.async.loop;
+
+import tanya.async.protocol;
+import tanya.async.transport;
+import tanya.async.watcher;
+import tanya.container.buffer;
+import tanya.memory;
+import tanya.memory.mmappool;
+import tanya.network.socket;
+import core.time;
+import std.algorithm.iteration;
+import std.algorithm.mutation;
+import std.typecons;
+
+version (DisableBackends)
+{
+}
+else version (linux)
+{
+ import tanya.async.event.epoll;
+ version = Epoll;
+}
+else version (Windows)
+{
+ import tanya.async.event.iocp;
+ version = IOCP;
+}
+else version (OSX)
+{
+ version = Kqueue;
+}
+else version (iOS)
+{
+ version = Kqueue;
+}
+else version (FreeBSD)
+{
+ version = Kqueue;
+}
+else version (OpenBSD)
+{
+ version = Kqueue;
+}
+else version (DragonFlyBSD)
+{
+ version = Kqueue;
+}
+
+/**
+ * Events.
+ */
+enum Event : uint
+{
+ none = 0x00, /// No events.
+ read = 0x01, /// Non-blocking read call.
+ write = 0x02, /// Non-blocking write call.
+ accept = 0x04, /// Connection made.
+ error = 0x80000000, /// Sent when an error occurs.
+}
+
+alias EventMask = BitFlags!Event;
+
+/**
+ * Tries to set $(D_PSYMBOL MmapPool) to the default allocator.
+ */
+shared static this()
+{
+ if (allocator is null)
+ {
+ allocator = MmapPool.instance;
+ }
+}
+
+/**
+ * Event loop.
+ */
+abstract class Loop
+{
+ /// Pending watchers.
+ protected PendingQueue!Watcher pendings;
+
+ protected PendingQueue!Watcher swapPendings;
+
+ /**
+ * Returns: Maximal event count can be got at a time
+ * (should be supported by the backend).
+ */
+ protected @property inout(uint) maxEvents() inout const pure nothrow @safe @nogc
+ {
+ return 128U;
+ }
+
+ /**
+ * Initializes the loop.
+ */
+ this()
+ {
+ pendings = MmapPool.instance.make!(PendingQueue!Watcher);
+ swapPendings = MmapPool.instance.make!(PendingQueue!Watcher);
+ }
+
+ /**
+ * Frees loop internals.
+ */
+ ~this()
+ {
+ MmapPool.instance.dispose(pendings);
+ MmapPool.instance.dispose(swapPendings);
+ }
+
+ /**
+ * Starts the loop.
+ */
+ void run()
+ {
+ done_ = false;
+ do
+ {
+ poll();
+
+ // Invoke pendings
+ swapPendings.each!((ref p) => p.invoke());
+
+ swap(pendings, swapPendings);
+ }
+ while (!done_);
+ }
+
+ /**
+ * Break out of the loop.
+ */
+ void unloop() @safe pure nothrow
+ {
+ done_ = true;
+ }
+
+ /**
+ * Start watching.
+ *
+ * Params:
+ * watcher = Watcher.
+ */
+ void start(ConnectionWatcher watcher)
+ {
+ if (watcher.active)
+ {
+ return;
+ }
+ watcher.active = true;
+ reify(watcher, EventMask(Event.none), EventMask(Event.accept));
+ }
+
+ /**
+ * Stop watching.
+ *
+ * Params:
+ * watcher = Watcher.
+ */
+ void stop(ConnectionWatcher watcher)
+ {
+ if (!watcher.active)
+ {
+ return;
+ }
+ watcher.active = false;
+
+ reify(watcher, EventMask(Event.accept), EventMask(Event.none));
+ }
+
+ /**
+ * Should be called if the backend configuration changes.
+ *
+ * Params:
+ * watcher = Watcher.
+ * oldEvents = The events were already set.
+ * events = The events should be set.
+ *
+ * Returns: $(D_KEYWORD true) if the operation was successful.
+ */
+ abstract protected bool reify(ConnectionWatcher watcher,
+ EventMask oldEvents,
+ EventMask events);
+
+ /**
+ * Returns: The blocking time.
+ */
+ protected @property inout(Duration) blockTime()
+ inout @safe pure nothrow
+ {
+ // Don't block if we have to do.
+ return swapPendings.empty ? blockTime_ : Duration.zero;
+ }
+
+ /**
+ * Sets the blocking time for IO watchers.
+ *
+ * Params:
+ * blockTime = The blocking time. Cannot be larger than
+ * $(D_PSYMBOL maxBlockTime).
+ */
+ protected @property void blockTime(in Duration blockTime) @safe pure nothrow
+ in
+ {
+ assert(blockTime <= 1.dur!"hours", "Too long to wait.");
+ assert(!blockTime.isNegative);
+ }
+ body
+ {
+ blockTime_ = blockTime;
+ }
+
+ /**
+ * Kills the watcher and closes the connection.
+ */
+ protected void kill(IOWatcher watcher, SocketException exception)
+ {
+ watcher.socket.shutdown();
+ defaultAllocator.dispose(watcher.socket);
+ MmapPool.instance.dispose(watcher.transport);
+ watcher.exception = exception;
+ swapPendings.insertBack(watcher);
+ }
+
+ /**
+ * Does the actual polling.
+ */
+ abstract protected void poll();
+
+ /// Whether the event loop should be stopped.
+ private bool done_;
+
+ /// Maximal block time.
+ protected Duration blockTime_ = 1.dur!"minutes";
+}
+
+/**
+ * Exception thrown on errors in the event loop.
+ */
+class BadLoopException : Exception
+{
+@nogc:
+ /**
+ * Params:
+ * file = The file where the exception occurred.
+ * line = The line number where the exception occurred.
+ * next = The previous exception in the chain of exceptions, if any.
+ */
+ this(string file = __FILE__, size_t line = __LINE__, Throwable next = null)
+ pure @safe nothrow const
+ {
+ super("Event loop cannot be initialized.", file, line, next);
+ }
+}
+
+/**
+ * Returns the event loop used by default. If an event loop wasn't set with
+ * $(D_PSYMBOL defaultLoop) before, $(D_PSYMBOL defaultLoop) will try to
+ * choose an event loop supported on the system.
+ *
+ * Returns: The default event loop.
+ */
+@property Loop defaultLoop()
+{
+ if (defaultLoop_ !is null)
+ {
+ return defaultLoop_;
+ }
+ version (Epoll)
+ {
+ defaultLoop_ = MmapPool.instance.make!EpollLoop;
+ }
+ else version (IOCP)
+ {
+ defaultLoop_ = MmapPool.instance.make!IOCPLoop;
+ }
+ else version (Kqueue)
+ {
+ import tanya.async.event.kqueue;
+ defaultLoop_ = MmapPool.instance.make!KqueueLoop;
+ }
+ return defaultLoop_;
+}
+
+/**
+ * Sets the default event loop.
+ *
+ * This property makes it possible to implement your own backends or event
+ * loops, for example, if the system is not supported or if you want to
+ * extend the supported implementation. Just extend $(D_PSYMBOL Loop) and pass
+ * your implementation to this property.
+ *
+ * Params:
+ * loop = The event loop.
+ */
+@property void defaultLoop(Loop loop)
+in
+{
+ assert(loop !is null);
+}
+body
+{
+ defaultLoop_ = loop;
+}
+
+private Loop defaultLoop_;
+
+/**
+ * Queue.
+ *
+ * Params:
+ * T = Content type.
+ */
+class PendingQueue(T)
+{
+ /**
+ * Creates a new $(D_PSYMBOL Queue).
+ */
+ this()
+ {
+ }
+
+ /**
+ * Removes all elements from the queue.
+ */
+ ~this()
+ {
+ foreach (e; this)
+ {
+ MmapPool.instance.dispose(e);
+ }
+ }
+
+ /**
+ * Returns: First element.
+ */
+ @property ref T front()
+ in
+ {
+ assert(!empty);
+ }
+ body
+ {
+ return first.next.content;
+ }
+
+ /**
+ * Inserts a new element.
+ *
+ * Params:
+ * x = New element.
+ *
+ * Returns: $(D_KEYWORD this).
+ */
+ typeof(this) insertBack(T x)
+ {
+ Entry* temp = MmapPool.instance.make!Entry;
+
+ temp.content = x;
+
+ if (empty)
+ {
+ first.next = rear = temp;
+ }
+ else
+ {
+ rear.next = temp;
+ rear = rear.next;
+ }
+
+ return this;
+ }
+
+ alias insert = insertBack;
+
+ /**
+ * Inserts a new element.
+ *
+ * Params:
+ * x = New element.
+ *
+ * Returns: $(D_KEYWORD this).
+ */
+ typeof(this) opOpAssign(string Op)(ref T x)
+ if (Op == "~")
+ {
+ return insertBack(x);
+ }
+
+ /**
+ * Returns: $(D_KEYWORD true) if the queue is empty.
+ */
+ @property bool empty() const @safe pure nothrow
+ {
+ return first.next is null;
+ }
+
+ /**
+ * Move position to the next element.
+ *
+ * Returns: $(D_KEYWORD this).
+ */
+ typeof(this) popFront()
+ in
+ {
+ assert(!empty);
+ }
+ body
+ {
+ auto n = first.next.next;
+
+ MmapPool.instance.dispose(first.next);
+ first.next = n;
+
+ return this;
+ }
+
+ /**
+ * Queue entry.
+ */
+ protected struct Entry
+ {
+ /// Queue item content.
+ T content;
+
+ /// Next list item.
+ Entry* next;
+ }
+
+ /// The first element of the list.
+ protected Entry first;
+
+ /// The last element of the list.
+ protected Entry* rear;
+}
diff --git a/source/tanya/event/config.d b/source/tanya/async/package.d index 11a432a..eb82f12 100644 --- a/source/tanya/event/config.d +++ b/source/tanya/async/package.d @@ -1,26 +1,19 @@ -/* This Source Code Form is subject to the terms of the Mozilla Public - * License, v. 2.0. If a copy of the MPL was not distributed with this - * file, You can obtain one at http://mozilla.org/MPL/2.0/. */ - -/** - * Copyright: Eugene Wissner 2016. - * License: $(LINK2 https://www.mozilla.org/en-US/MPL/2.0/, - * Mozilla Public License, v. 2.0). - * Authors: $(LINK2 mailto:belka@caraus.de, Eugene Wissner) - */ -module tanya.event.config; - -package version (DisableBackends) -{ -} -else -{ - version (linux) - { - enum UseEpoll = true; - } - else - { - enum UseEpoll = false; - } -} +/* This Source Code Form is subject to the terms of the Mozilla Public
+ * License, v. 2.0. If a copy of the MPL was not distributed with this
+ * file, You can obtain one at http://mozilla.org/MPL/2.0/. */
+
+/**
+ * Copyright: Eugene Wissner 2016.
+ * License: $(LINK2 https://www.mozilla.org/en-US/MPL/2.0/,
+ * Mozilla Public License, v. 2.0).
+ * Authors: $(LINK2 mailto:belka@caraus.de, Eugene Wissner)
+ */
+module tanya.async;
+
+public
+{
+ import tanya.async.loop;
+ import tanya.async.protocol;
+ import tanya.async.transport;
+ import tanya.async.watcher;
+}
diff --git a/source/tanya/async/protocol.d b/source/tanya/async/protocol.d new file mode 100644 index 0000000..62c0ab7 --- /dev/null +++ b/source/tanya/async/protocol.d @@ -0,0 +1,50 @@ +/* This Source Code Form is subject to the terms of the Mozilla Public
+ * License, v. 2.0. If a copy of the MPL was not distributed with this
+ * file, You can obtain one at http://mozilla.org/MPL/2.0/. */
+
+/**
+ * Copyright: Eugene Wissner 2016.
+ * License: $(LINK2 https://www.mozilla.org/en-US/MPL/2.0/,
+ * Mozilla Public License, v. 2.0).
+ * Authors: $(LINK2 mailto:belka@caraus.de, Eugene Wissner)
+ */
+module tanya.async.protocol;
+
+import tanya.network.socket;
+import tanya.async.transport;
+
+/**
+ * Common protocol interface.
+ */
+interface Protocol
+{
+ /**
+ * Params:
+ * data = Read data.
+ */
+ void received(ubyte[] data);
+
+ /**
+ * Called when a connection is made.
+ *
+ * Params:
+ * transport = Protocol transport.
+ */
+ void connected(DuplexTransport transport);
+
+ /**
+ * Called when a connection is lost.
+ *
+ * Params:
+ * exception = $(D_PSYMBOL Exception) if an error caused
+ * the disconnect, $(D_KEYWORD null) otherwise.
+ */
+ void disconnected(SocketException exception = null);
+}
+
+/**
+ * Interface for TCP.
+ */
+interface TransmissionControlProtocol : Protocol
+{
+}
diff --git a/source/tanya/async/transport.d b/source/tanya/async/transport.d new file mode 100644 index 0000000..6156815 --- /dev/null +++ b/source/tanya/async/transport.d @@ -0,0 +1,63 @@ +/* This Source Code Form is subject to the terms of the Mozilla Public
+ * License, v. 2.0. If a copy of the MPL was not distributed with this
+ * file, You can obtain one at http://mozilla.org/MPL/2.0/. */
+
+/**
+ * Copyright: Eugene Wissner 2016.
+ * License: $(LINK2 https://www.mozilla.org/en-US/MPL/2.0/,
+ * Mozilla Public License, v. 2.0).
+ * Authors: $(LINK2 mailto:belka@caraus.de, Eugene Wissner)
+ */
+module tanya.async.transport;
+
+import tanya.network.socket;
+
+/**
+ * Base transport interface.
+ */
+interface Transport
+{
+}
+
+/**
+ * Interface for read-only transports.
+ */
+interface ReadTransport : Transport
+{
+}
+
+/**
+ * Interface for write-only transports.
+ */
+interface WriteTransport : Transport
+{
+ /**
+ * Write some data to the transport.
+ *
+ * Params:
+ * data = Data to send.
+ */
+ void write(ubyte[] data);
+}
+
+/**
+ * Represents a bidirectional transport.
+ */
+interface DuplexTransport : ReadTransport, WriteTransport
+{
+}
+
+/**
+ * Represents a socket transport.
+ */
+interface SocketTransport : Transport
+{
+ @property inout(Socket) socket() inout pure nothrow @safe @nogc;
+}
+
+/**
+ * Represents a connection-oriented socket transport.
+ */
+package interface StreamTransport : DuplexTransport, SocketTransport
+{
+}
diff --git a/source/tanya/async/watcher.d b/source/tanya/async/watcher.d new file mode 100644 index 0000000..08fe6df --- /dev/null +++ b/source/tanya/async/watcher.d @@ -0,0 +1,242 @@ +/* This Source Code Form is subject to the terms of the Mozilla Public + * License, v. 2.0. If a copy of the MPL was not distributed with this + * file, You can obtain one at http://mozilla.org/MPL/2.0/. */ + +/** + * Copyright: Eugene Wissner 2016. + * License: $(LINK2 https://www.mozilla.org/en-US/MPL/2.0/, + * Mozilla Public License, v. 2.0). + * Authors: $(LINK2 mailto:belka@caraus.de, Eugene Wissner) + */ +module tanya.async.watcher; + +import tanya.async.loop; +import tanya.async.protocol; +import tanya.async.transport; +import tanya.container.buffer; +import tanya.memory; +import tanya.memory.mmappool; +import tanya.network.socket; +import std.functional; +import std.exception; + +version (Windows) +{ + import core.sys.windows.basetyps; + import core.sys.windows.mswsock; + import core.sys.windows.winbase; + import core.sys.windows.windef; + import core.sys.windows.winsock2; +} + +/** + * A watcher is an opaque structure that you allocate and register to record + * your interest in some event. + */ +abstract class Watcher +{ + /// Whether the watcher is active. + bool active; + + /** + * Invoke some action on event. + */ + void invoke(); +} + +class ConnectionWatcher : Watcher +{ + /// Watched socket. + private Socket socket_; + + /// Protocol factory. + protected Protocol delegate() protocolFactory; + + package PendingQueue!IOWatcher incoming; + + /** + * Params: + * socket = Socket. + */ + this(Socket socket) + { + socket_ = socket; + incoming = MmapPool.instance.make!(PendingQueue!IOWatcher); + } + + /// Ditto. + protected this() + { + } + + ~this() + { + MmapPool.instance.dispose(incoming); + } + + /* + * Params: + * P = Protocol should be used. + */ + void setProtocol(P : Protocol)() + { + this.protocolFactory = () => cast(Protocol) MmapPool.instance.make!P; + } + + /** + * Returns: Socket. + */ + @property inout(Socket) socket() inout pure nothrow @nogc + { + return socket_; + } + + /** + * Returns: New protocol instance. + */ + @property Protocol protocol() + in + { + assert(protocolFactory !is null, "Protocol isn't set."); + } + body + { + return protocolFactory(); + } + + /** + * Invokes new connection callback. + */ + override void invoke() + { + foreach (io; incoming) + { + io.protocol.connected(cast(DuplexTransport) io.transport); + } + } +} + +/** + * Contains a pending watcher with the invoked events or a transport can be + * read from. + */ +class IOWatcher : ConnectionWatcher +{ + /// If an exception was thrown the transport should be already invalid. + private union + { + StreamTransport transport_; + SocketException exception_; + } + + private Protocol protocol_; + + /** + * Returns: Underlying output buffer. + */ + package ReadBuffer output; + + /** + * Params: + * transport = Transport. + * protocol = New instance of the application protocol. + */ + this(StreamTransport transport, Protocol protocol) + in + { + assert(transport !is null); + assert(protocol !is null); + } + body + { + super(); + transport_ = transport; + protocol_ = protocol; + output = MmapPool.instance.make!ReadBuffer(); + active = true; + } + + /** + * Destroys the watcher. + */ + protected ~this() + { + MmapPool.instance.dispose(output); + MmapPool.instance.dispose(protocol_); + } + + /** + * Assigns a transport. + * + * Params: + * transport = Transport. + * protocol = Application protocol. + * + * Returns: $(D_KEYWORD this). + */ + IOWatcher opCall(StreamTransport transport, Protocol protocol) pure nothrow @safe @nogc + in + { + assert(transport !is null); + assert(protocol !is null); + } + body + { + transport_ = transport; + protocol_ = protocol; + active = true; + return this; + } + + /** + * Returns: Transport used by this watcher. + */ + @property inout(StreamTransport) transport() inout pure nothrow @nogc + { + return transport_; + } + + /** + * Sets an exception occurred during a read/write operation. + * + * Params: + * exception = Thrown exception. + */ + @property void exception(SocketException exception) pure nothrow @nogc + { + exception_ = exception; + } + + /** + * Returns: Application protocol. + */ + override @property Protocol protocol() pure nothrow @safe @nogc + { + return protocol_; + } + + /** + * Returns: Socket. + */ + override @property inout(Socket) socket() inout pure nothrow @nogc + { + return transport.socket; + } + + /** + * Invokes the watcher callback. + */ + override void invoke() + { + if (output.length) + { + protocol.received(output[0..$]); + output.clear(); + } + else + { + protocol.disconnected(exception_); + active = false; + } + } +} diff --git a/source/tanya/container/buffer.d b/source/tanya/container/buffer.d index efe2e41..338c0bb 100644 --- a/source/tanya/container/buffer.d +++ b/source/tanya/container/buffer.d @@ -1,636 +1,700 @@ -/* This Source Code Form is subject to the terms of the Mozilla Public - * License, v. 2.0. If a copy of the MPL was not distributed with this - * file, You can obtain one at http://mozilla.org/MPL/2.0/. */ - -/** - * Copyright: Eugene Wissner 2016. - * License: $(LINK2 https://www.mozilla.org/en-US/MPL/2.0/, - * Mozilla Public License, v. 2.0). - * Authors: $(LINK2 mailto:belka@caraus.de, Eugene Wissner) - */ -module tanya.container.buffer; - -import tanya.memory; - -version (unittest) -{ - private int fillBuffer(void* buffer, - in size_t size, - int start = 0, - int end = 10) - in - { - assert(start < end); - } - body - { - ubyte[] buf = cast(ubyte[]) buffer[0..size]; - auto numberRead = end - start; - - for (ubyte i; i < numberRead; ++i) - { - buf[i] = cast(ubyte) (start + i); - } - return numberRead; - } -} - -/** - * Interface for implemeting input/output buffers. - */ -interface Buffer -{ - /** - * Returns: The size of the internal buffer. - */ - @property size_t capacity() const @safe pure nothrow; - - /** - * Returns: Data size. - */ - @property size_t length() const @safe pure nothrow; - - /** - * Returns: Available space. - */ - @property size_t free() const @safe pure nothrow; - - /** - * Appends some data to the buffer. - * - * Params: - * buffer = Buffer chunk got with $(D_PSYMBOL buffer). - */ - Buffer opOpAssign(string op)(void[] buffer) - if (op == "~"); -} - -/** - * Buffer that can be used with C functions accepting void pointer and - * returning the number of the read bytes. - */ -class ReadBuffer : Buffer -{ - /// Internal buffer. - protected ubyte[] _buffer; - - /// Filled buffer length. - protected size_t _length; - - /// Available space. - protected immutable size_t minAvailable; - - /// Size by which the buffer will grow. - protected immutable size_t blockSize; - - private shared Allocator allocator; - - invariant - { - assert(_length <= _buffer.length); - assert(blockSize > 0); - assert(minAvailable > 0); - } - - /** - * Params: - * size = Initial buffer size and the size by which the buffer - * will grow. - * minAvailable = minimal size should be always available to fill. - * So it will reallocate if $(D_INLINECODE - * $(D_PSYMBOL free) < $(D_PARAM minAvailable) - * ). - */ - this(size_t size = 8192, - size_t minAvailable = 1024, - shared Allocator allocator = defaultAllocator) - { - this.allocator = allocator; - this.minAvailable = minAvailable; - this.blockSize = size; - resizeArray!ubyte(this.allocator, _buffer, size); - } - - /** - * Deallocates the internal buffer. - */ - ~this() - { - dispose(allocator, _buffer); - } - - /// - unittest - { - auto b = make!ReadBuffer(defaultAllocator); - assert(b.capacity == 8192); - assert(b.length == 0); - - dispose(defaultAllocator, b); - } - - /** - * Returns: The size of the internal buffer. - */ - @property size_t capacity() const @safe pure nothrow - { - return _buffer.length; - } - - /** - * Returns: Data size. - */ - @property size_t length() const @safe pure nothrow - { - return _length; - } - - /** - * Returns: Available space. - */ - @property size_t free() const @safe pure nothrow - { - return capacity - length; - } - - /// - unittest - { - auto b = make!ReadBuffer(defaultAllocator); - size_t numberRead; - void* buf; - - // Fills the buffer with values 0..10 - assert(b.free == b.blockSize); - buf = b.buffer; - numberRead = fillBuffer(buf, b.free, 0, 10); - b ~= buf[0..numberRead]; - assert(b.free == b.blockSize - numberRead); - b[]; - assert(b.free == b.blockSize); - - dispose(defaultAllocator, b); - } - - /** - * Returns a pointer to a chunk of the internal buffer. You can pass it to - * a function that requires such a buffer. - * - * Set the buffer again after reading something into it. Append - * $(D_KEYWORD ~=) a slice from the beginning of the buffer you got and - * till the number of the read bytes. The data will be appended to the - * existing buffer. - * - * Returns: A chunk of available buffer. - */ - @property void* buffer() - { - if (capacity - length < minAvailable) - { - resizeArray!ubyte(this.allocator, _buffer, capacity + blockSize); - } - return _buffer[_length..$].ptr; - } - - /** - * Appends some data to the buffer. Use only the buffer you got - * with $(D_PSYMBOL buffer)! - * - * Params: - * buffer = Buffer chunk got with $(D_PSYMBOL buffer). - */ - ReadBuffer opOpAssign(string op)(void[] buffer) - if (op == "~") - { - _length += buffer.length; - return this; - } - - /// - unittest - { - auto b = make!ReadBuffer(defaultAllocator); - size_t numberRead; - void* buf; - ubyte[] result; - - // Fills the buffer with values 0..10 - buf = b.buffer; - numberRead = fillBuffer(buf, b.free, 0, 10); - b ~= buf[0..numberRead]; - - result = b[]; - assert(result[0] == 0); - assert(result[1] == 1); - assert(result[9] == 9); - - // It shouldn't overwrite, but append another 5 bytes to the buffer - buf = b.buffer; - numberRead = fillBuffer(buf, b.free, 0, 10); - b ~= buf[0..numberRead]; - - buf = b.buffer; - numberRead = fillBuffer(buf, b.free, 20, 25); - b ~= buf[0..numberRead]; - - result = b[]; - assert(result[0] == 0); - assert(result[1] == 1); - assert(result[9] == 9); - assert(result[10] == 20); - assert(result[14] == 24); - - dispose(defaultAllocator, b); - } - - /** - * Returns the buffer. The buffer is cleared after that. So you can get it - * only one time. - * - * Returns: The buffer as array. - */ - @property ubyte[] opIndex() - { - auto ret = _buffer[0.._length]; - _length = 0; - return ret; - } - - /// - unittest - { - auto b = make!ReadBuffer(defaultAllocator); - size_t numberRead; - void* buf; - ubyte[] result; - - // Fills the buffer with values 0..10 - buf = b.buffer; - numberRead = fillBuffer(buf, b.free, 0, 10); - b ~= buf[0..numberRead]; - - assert(b.length == 10); - result = b[]; - assert(result[0] == 0); - assert(result[9] == 9); - assert(b.length == 0); - - dispose(defaultAllocator, b); - } -} - -/** - * Circular, self-expanding buffer that can be used with C functions accepting - * void pointer and returning the number of the read bytes. - * - * The buffer is optimized for situations where you read all the data from it - * at once (without writing to it occasionally). It can become ineffective if - * you permanently keep some data in the buffer and alternate writing and - * reading, because it may allocate and move elements. - */ -class WriteBuffer : Buffer -{ - /// Internal buffer. - protected ubyte[] _buffer; - - /// Buffer start position. - protected size_t start; - - /// Buffer ring area size. After this position begins buffer overflow area. - protected size_t ring; - - /// Size by which the buffer will grow. - protected immutable size_t blockSize; - - /// The position of the free area in the buffer. - protected size_t position; - - private shared Allocator allocator; - - invariant - { - assert(blockSize > 0); - // position can refer to an element outside the buffer if the buffer is full. - assert(position <= _buffer.length); - } - - /** - * Params: - * size = Initial buffer size and the size by which the buffer - * will grow. - */ - this(size_t size = 8192, - shared Allocator allocator = defaultAllocator) - { - this.allocator = allocator; - blockSize = size; - ring = size - 1; - resizeArray!ubyte(this.allocator, _buffer, size); - } - - /** - * Deallocates the internal buffer. - */ - ~this() - { - dispose(allocator, _buffer); - } - - /** - * Returns: The size of the internal buffer. - */ - @property size_t capacity() const @safe pure nothrow - { - return _buffer.length; - } - - /** - * Note that $(D_PSYMBOL length) doesn't return the real length of the data, - * but only the array length that will be returned with $(D_PSYMBOL buffer) - * next time. Be sure to call $(D_PSYMBOL buffer) and set $(D_PSYMBOL written) - * until $(D_PSYMBOL length) returns 0. - * - * Returns: Data size. - */ - @property size_t length() const @safe pure nothrow - { - if (position > ring || position < start) // Buffer overflowed - { - return ring - start + 1; - } - else - { - return position - start; - } - } - - /// - unittest - { - auto b = make!WriteBuffer(defaultAllocator, 4); - ubyte[3] buf = [48, 23, 255]; - - b ~= buf; - assert(b.length == 3); - b.written = 2; - assert(b.length == 1); - - b ~= buf; - assert(b.length == 2); - b.written = 2; - assert(b.length == 2); - - b ~= buf; - assert(b.length == 5); - b.written = b.length; - assert(b.length == 0); - - dispose(defaultAllocator, b); - } - - /** - * Returns: Available space. - */ - @property size_t free() const @safe pure nothrow - { - return capacity - length; - } - - /** - * Appends data to the buffer. - * - * Params: - * buffer = Buffer chunk got with $(D_PSYMBOL buffer). - */ - WriteBuffer opOpAssign(string op)(ubyte[] buffer) - if (op == "~") - { - size_t end, start; - - if (position >= this.start && position <= ring) - { - auto afterRing = ring + 1; - - end = position + buffer.length; - if (end > afterRing) - { - end = afterRing; - } - start = end - position; - _buffer[position..end] = buffer[0..start]; - if (end == afterRing) - { - position = this.start == 0 ? afterRing : 0; - } - else - { - position = end; - } - } - - // Check if we have some free space at the beginning - if (start < buffer.length && position < this.start) - { - end = position + buffer.length - start; - if (end > this.start) - { - end = this.start; - } - auto areaEnd = end - position + start; - _buffer[position..end] = buffer[start..areaEnd]; - position = end == this.start ? ring + 1 : end - position; - start = areaEnd; - } - - // And if we still haven't found any place, save the rest in the overflow area - if (start < buffer.length) - { - end = position + buffer.length - start; - if (end > capacity) - { - auto newSize = end / blockSize * blockSize + blockSize; - - resizeArray!ubyte(this.allocator, _buffer, newSize); - } - _buffer[position..end] = buffer[start..$]; - position = end; - if (this.start == 0) - { - ring = capacity - 1; - } - } - - return this; - } - - /// - unittest - { - auto b = make!WriteBuffer(defaultAllocator, 4); - ubyte[3] buf = [48, 23, 255]; - - b ~= buf; - assert(b.capacity == 4); - assert(b._buffer[0] == 48 && b._buffer[1] == 23 && b._buffer[2] == 255); - - b.written = 2; - b ~= buf; - assert(b.capacity == 4); - assert(b._buffer[0] == 23 && b._buffer[1] == 255 - && b._buffer[2] == 255 && b._buffer[3] == 48); - - b.written = 2; - b ~= buf; - assert(b.capacity == 8); - assert(b._buffer[0] == 23 && b._buffer[1] == 255 - && b._buffer[2] == 48 && b._buffer[3] == 23 && b._buffer[4] == 255); - - dispose(defaultAllocator, b); - - b = make!WriteBuffer(defaultAllocator, 2); - - b ~= buf; - assert(b.start == 0); - assert(b.capacity == 4); - assert(b.ring == 3); - assert(b.position == 3); - - dispose(defaultAllocator, b); - } - - /** - * Sets how many bytes were written. It will shrink the buffer - * appropriately. Always set this property after calling - * $(D_PSYMBOL buffer). - * - * Params: - * length = Length of the written data. - */ - @property void written(size_t length) @safe pure nothrow - in - { - assert(length <= this.length); - } - body - { - auto afterRing = ring + 1; - auto oldStart = start; - - if (length <= 0) - { - return; - } - else if (position <= afterRing) - { - start += length; - if (start > 0 && position == afterRing) - { - position = oldStart; - } - } - else - { - auto overflow = position - afterRing; - - if (overflow > length) { - _buffer[start.. start + length] = _buffer[afterRing.. afterRing + length]; - _buffer[afterRing.. afterRing + length] = _buffer[afterRing + length ..position]; - position -= length; - } - else if (overflow == length) - { - _buffer[start.. start + overflow] = _buffer[afterRing..position]; - position -= overflow; - } - else - { - _buffer[start.. start + overflow] = _buffer[afterRing..position]; - position = overflow; - } - start += length; - - if (start == position) - { - if (position != afterRing) - { - position = 0; - } - start = 0; - ring = capacity - 1; - } - } - if (start > ring) - { - start = 0; - } - } - - /// - unittest - { - auto b = make!WriteBuffer(defaultAllocator); - ubyte[6] buf = [23, 23, 255, 128, 127, 9]; - - b ~= buf; - assert(b.length == 6); - b.written = 2; - assert(b.length == 4); - b.written = 4; - assert(b.length == 0); - - dispose(defaultAllocator, b); - } - - /** - * Returns a pointer to a buffer chunk with data. You can pass it to - * a function that requires such a buffer. - * - * After calling it, set $(D_PSYMBOL written) to the length could be - * written. - * - * $(D_PSYMBOL buffer) may return only part of the data. You may need - * to call it (and set $(D_PSYMBOL written) several times until - * $(D_PSYMBOL length) is 0. If all the data can be written, - * maximally 3 calls are required. - * - * Returns: A chunk of data buffer. - */ - @property void* buffer() @safe pure nothrow - { - if (position > ring || position < start) // Buffer overflowed - { - return _buffer[start.. ring + 1].ptr; - } - else - { - return _buffer[start..position].ptr; - } - } - - /// - unittest - { - auto b = make!WriteBuffer(defaultAllocator, 6); - ubyte[6] buf = [23, 23, 255, 128, 127, 9]; - void* returnedBuf; - - b ~= buf; - returnedBuf = b.buffer; - assert(returnedBuf[0..b.length] == buf[0..6]); - b.written = 2; - - returnedBuf = b.buffer; - assert(returnedBuf[0..b.length] == buf[2..6]); - - b ~= buf; - returnedBuf = b.buffer; - assert(returnedBuf[0..b.length] == buf[2..6]); - b.written = b.length; - - returnedBuf = b.buffer; - assert(returnedBuf[0..b.length] == buf[0..6]); - b.written = b.length; - - dispose(defaultAllocator, b); - } -} +/* This Source Code Form is subject to the terms of the Mozilla Public
+ * License, v. 2.0. If a copy of the MPL was not distributed with this
+ * file, You can obtain one at http://mozilla.org/MPL/2.0/. */
+
+/**
+ * Copyright: Eugene Wissner 2016.
+ * License: $(LINK2 https://www.mozilla.org/en-US/MPL/2.0/,
+ * Mozilla Public License, v. 2.0).
+ * Authors: $(LINK2 mailto:belka@caraus.de, Eugene Wissner)
+ */
+module tanya.container.buffer;
+
+import tanya.memory;
+
+version (unittest)
+{
+ private int fillBuffer(ubyte[] buffer,
+ in size_t size,
+ int start = 0,
+ int end = 10) @nogc pure nothrow
+ in
+ {
+ assert(start < end);
+ }
+ body
+ {
+ auto numberRead = end - start;
+ for (ubyte i; i < numberRead; ++i)
+ {
+ buffer[i] = cast(ubyte) (start + i);
+ }
+ return numberRead;
+ }
+}
+
+/**
+ * Interface for implemeting input/output buffers.
+ */
+interface Buffer
+{
+ /**
+ * Returns: The size of the internal buffer.
+ */
+ @property size_t capacity() const @nogc @safe pure nothrow;
+
+ /**
+ * Returns: Data size.
+ */
+ @property size_t length() const @nogc @safe pure nothrow;
+
+ /**
+ * Returns: Available space.
+ */
+ @property size_t free() const @nogc @safe pure nothrow;
+
+ /**
+ * Params:
+ * start = Start position.
+ * end = End position.
+ *
+ * Returns: Array between $(D_PARAM start) and $(D_PARAM end).
+ */
+ @property ubyte[] opSlice(size_t start, size_t end)
+ in
+ {
+ assert(start <= end);
+ assert(end <= length);
+ }
+
+ /**
+ * Returns: Length of available data.
+ */
+ @property size_t opDollar() const pure nothrow @safe @nogc;
+
+ /**
+ * Returns: Data chunk.
+ */
+ @property ubyte[] opIndex();
+}
+
+/**
+ * Self-expanding buffer, that can be used with functions returning the number
+ * of the read bytes.
+ *
+ * This buffer supports asynchronous reading. It means you can pass a new chunk
+ * to an asynchronous read function during you are working with already
+ * available data. But only one asynchronous call at a time is supported. Be
+ * sure to call $(D_PSYMBOL ReadBuffer.clear()) before you append the result
+ * of the pended asynchronous call.
+ */
+class ReadBuffer : Buffer
+{
+ /// Internal buffer.
+ protected ubyte[] buffer_;
+
+ /// Filled buffer length.
+ protected size_t length_;
+
+ /// Start of available data.
+ protected size_t start;
+
+ /// Last position returned with $(D_KEYWORD []).
+ protected size_t ring;
+
+ /// Available space.
+ protected immutable size_t minAvailable;
+
+ /// Size by which the buffer will grow.
+ protected immutable size_t blockSize;
+
+ invariant
+ {
+ assert(length_ <= buffer_.length);
+ assert(blockSize > 0);
+ assert(minAvailable > 0);
+ }
+
+ /**
+ * Creates a new read buffer.
+ *
+ * Params:
+ * size = Initial buffer size and the size by which the buffer
+ * will grow.
+ * minAvailable = minimal size should be always available to fill.
+ * So it will reallocate if $(D_INLINECODE
+ * $(D_PSYMBOL free) < $(D_PARAM minAvailable)
+ * ).
+ */
+ this(size_t size = 8192,
+ size_t minAvailable = 1024)
+ {
+ this.minAvailable = minAvailable;
+ this.blockSize = size;
+ defaultAllocator.resizeArray!ubyte(buffer_, size);
+ }
+
+ /**
+ * Deallocates the internal buffer.
+ */
+ ~this()
+ {
+ defaultAllocator.dispose(buffer_);
+ }
+
+ ///
+ unittest
+ {
+ auto b = defaultAllocator.make!ReadBuffer;
+ assert(b.capacity == 8192);
+ assert(b.length == 0);
+
+ defaultAllocator.dispose(b);
+ }
+
+ /**
+ * Returns: The size of the internal buffer.
+ */
+ @property size_t capacity() const @nogc @safe pure nothrow
+ {
+ return buffer_.length;
+ }
+
+ /**
+ * Returns: Data size.
+ */
+ @property size_t length() const @nogc @safe pure nothrow
+ {
+ return length_ - start;
+ }
+
+ /**
+ * Clears the buffer.
+ *
+ * Returns: $(D_KEYWORD this).
+ */
+ ReadBuffer clear() pure nothrow @safe @nogc
+ {
+ start = length_ = ring;
+ return this;
+ }
+
+ /**
+ * Returns: Available space.
+ */
+ @property size_t free() const pure nothrow @safe @nogc
+ {
+ return length > ring ? capacity - length : capacity - ring;
+ }
+
+ ///
+ unittest
+ {
+ auto b = defaultAllocator.make!ReadBuffer;
+ size_t numberRead;
+
+ // Fills the buffer with values 0..10
+ assert(b.free == b.blockSize);
+
+ numberRead = fillBuffer(b[], b.free, 0, 10);
+ b += numberRead;
+ assert(b.free == b.blockSize - numberRead);
+ b.clear();
+ assert(b.free == b.blockSize);
+
+ defaultAllocator.dispose(b);
+ }
+
+ /**
+ * Appends some data to the buffer.
+ *
+ * Params:
+ * length = Number of the bytes read.
+ *
+ * Returns: $(D_KEYWORD this).
+ */
+ ReadBuffer opOpAssign(string op)(size_t length)
+ if (op == "+")
+ {
+ length_ += length;
+ ring = start;
+ return this;
+ }
+
+ ///
+ unittest
+ {
+ auto b = defaultAllocator.make!ReadBuffer;
+ size_t numberRead;
+ ubyte[] result;
+
+ // Fills the buffer with values 0..10
+ numberRead = fillBuffer(b[], b.free, 0, 10);
+ b += numberRead;
+
+ result = b[0..$];
+ assert(result[0] == 0);
+ assert(result[1] == 1);
+ assert(result[9] == 9);
+ b.clear();
+
+ // It shouldn't overwrite, but append another 5 bytes to the buffer
+ numberRead = fillBuffer(b[], b.free, 0, 10);
+ b += numberRead;
+
+ numberRead = fillBuffer(b[], b.free, 20, 25);
+ b += numberRead;
+
+ result = b[0..$];
+ assert(result[0] == 0);
+ assert(result[1] == 1);
+ assert(result[9] == 9);
+ assert(result[10] == 20);
+ assert(result[14] == 24);
+
+ defaultAllocator.dispose(b);
+ }
+
+ /**
+ * Returns: Length of available data.
+ */
+ @property size_t opDollar() const pure nothrow @safe @nogc
+ {
+ return length;
+ }
+
+ /**
+ * Params:
+ * start = Start position.
+ * end = End position.
+ *
+ * Returns: Array between $(D_PARAM start) and $(D_PARAM end).
+ */
+ @property ubyte[] opSlice(size_t start, size_t end) pure nothrow @safe @nogc
+ {
+ return buffer_[this.start + start .. this.start + end];
+ }
+
+ /**
+ * Returns a free chunk of the buffer.
+ *
+ * Add ($(D_KEYWORD +=)) the number of the read bytes after using it.
+ *
+ * Returns: A free chunk of the buffer.
+ */
+ ubyte[] opIndex()
+ {
+ if (start > 0)
+ {
+ auto ret = buffer_[0..start];
+ ring = 0;
+ return ret;
+ }
+ else
+ {
+ if (capacity - length < minAvailable)
+ {
+ defaultAllocator.resizeArray!ubyte(buffer_, capacity + blockSize);
+ }
+ ring = length_;
+ return buffer_[length_..$];
+ }
+ }
+
+ ///
+ unittest
+ {
+ auto b = defaultAllocator.make!ReadBuffer;
+ size_t numberRead;
+ ubyte[] result;
+
+ // Fills the buffer with values 0..10
+ numberRead = fillBuffer(b[], b.free, 0, 10);
+ b += numberRead;
+
+ assert(b.length == 10);
+ result = b[0..$];
+ assert(result[0] == 0);
+ assert(result[9] == 9);
+ b.clear();
+ assert(b.length == 0);
+
+ defaultAllocator.dispose(b);
+ }
+}
+
+/**
+ * Circular, self-expanding buffer with overflow support. Can be used with
+ * functions returning returning the number of the transferred bytes.
+ *
+ * The buffer is optimized for situations where you read all the data from it
+ * at once (without writing to it occasionally). It can become ineffective if
+ * you permanently keep some data in the buffer and alternate writing and
+ * reading, because it may allocate and move elements.
+ */
+class WriteBuffer : Buffer
+{
+ /// Internal buffer.
+ protected ubyte[] buffer_;
+
+ /// Buffer start position.
+ protected size_t start;
+
+ /// Buffer ring area size. After this position begins buffer overflow area.
+ protected size_t ring;
+
+ /// Size by which the buffer will grow.
+ protected immutable size_t blockSize;
+
+ /// The position of the free area in the buffer.
+ protected size_t position;
+
+ invariant
+ {
+ assert(blockSize > 0);
+ // position can refer to an element outside the buffer if the buffer is full.
+ assert(position <= buffer_.length);
+ }
+
+ /**
+ * Params:
+ * size = Initial buffer size and the size by which the buffer
+ * will grow.
+ */
+ this(size_t size = 8192)
+ {
+ blockSize = size;
+ ring = size - 1;
+ defaultAllocator.resizeArray!ubyte(buffer_, size);
+ }
+
+ /**
+ * Deallocates the internal buffer.
+ */
+ ~this()
+ {
+ defaultAllocator.dispose(buffer_);
+ }
+
+ /**
+ * Returns: The size of the internal buffer.
+ */
+ @property size_t capacity() const @nogc @safe pure nothrow
+ {
+ return buffer_.length;
+ }
+
+ /**
+ * Note that $(D_PSYMBOL length) doesn't return the real length of the data,
+ * but only the array length that will be returned with $(D_PSYMBOL buffer)
+ * next time. Be sure to call $(D_PSYMBOL buffer) and set $(D_KEYWORD +=)
+ * until $(D_PSYMBOL length) returns 0.
+ *
+ * Returns: Data size.
+ */
+ @property size_t length() const @nogc @safe pure nothrow
+ {
+ if (position > ring || position < start) // Buffer overflowed
+ {
+ return ring - start + 1;
+ }
+ else
+ {
+ return position - start;
+ }
+ }
+
+ /**
+ * Returns: Length of available data.
+ */
+ @property size_t opDollar() const pure nothrow @safe @nogc
+ {
+ return length;
+ }
+
+ ///
+ unittest
+ {
+ auto b = defaultAllocator.make!WriteBuffer(4);
+ ubyte[3] buf = [48, 23, 255];
+
+ b ~= buf;
+ assert(b.length == 3);
+ b += 2;
+ assert(b.length == 1);
+
+ b ~= buf;
+ assert(b.length == 2);
+ b += 2;
+ assert(b.length == 2);
+
+ b ~= buf;
+ assert(b.length == 5);
+ b += b.length;
+ assert(b.length == 0);
+
+ defaultAllocator.dispose(b);
+ }
+
+ /**
+ * Returns: Available space.
+ */
+ @property size_t free() const @nogc @safe pure nothrow
+ {
+ return capacity - length;
+ }
+
+ /**
+ * Appends data to the buffer.
+ *
+ * Params:
+ * buffer = Buffer chunk got with $(D_PSYMBOL buffer).
+ */
+ WriteBuffer opOpAssign(string op)(ubyte[] buffer)
+ if (op == "~")
+ {
+ size_t end, start;
+
+ if (position >= this.start && position <= ring)
+ {
+ auto afterRing = ring + 1;
+
+ end = position + buffer.length;
+ if (end > afterRing)
+ {
+ end = afterRing;
+ }
+ start = end - position;
+ buffer_[position..end] = buffer[0..start];
+ if (end == afterRing)
+ {
+ position = this.start == 0 ? afterRing : 0;
+ }
+ else
+ {
+ position = end;
+ }
+ }
+
+ // Check if we have some free space at the beginning
+ if (start < buffer.length && position < this.start)
+ {
+ end = position + buffer.length - start;
+ if (end > this.start)
+ {
+ end = this.start;
+ }
+ auto areaEnd = end - position + start;
+ buffer_[position..end] = buffer[start..areaEnd];
+ position = end == this.start ? ring + 1 : end - position;
+ start = areaEnd;
+ }
+
+ // And if we still haven't found any place, save the rest in the overflow area
+ if (start < buffer.length)
+ {
+ end = position + buffer.length - start;
+ if (end > capacity)
+ {
+ auto newSize = end / blockSize * blockSize + blockSize;
+
+ defaultAllocator.resizeArray!ubyte(buffer_, newSize);
+ }
+ buffer_[position..end] = buffer[start..$];
+ position = end;
+ if (this.start == 0)
+ {
+ ring = capacity - 1;
+ }
+ }
+
+ return this;
+ }
+
+ ///
+ unittest
+ {
+ auto b = defaultAllocator.make!WriteBuffer(4);
+ ubyte[3] buf = [48, 23, 255];
+
+ b ~= buf;
+ assert(b.capacity == 4);
+ assert(b.buffer_[0] == 48 && b.buffer_[1] == 23 && b.buffer_[2] == 255);
+
+ b += 2;
+ b ~= buf;
+ assert(b.capacity == 4);
+ assert(b.buffer_[0] == 23 && b.buffer_[1] == 255
+ && b.buffer_[2] == 255 && b.buffer_[3] == 48);
+
+ b += 2;
+ b ~= buf;
+ assert(b.capacity == 8);
+ assert(b.buffer_[0] == 23 && b.buffer_[1] == 255
+ && b.buffer_[2] == 48 && b.buffer_[3] == 23 && b.buffer_[4] == 255);
+
+ defaultAllocator.dispose(b);
+
+ b = make!WriteBuffer(defaultAllocator, 2);
+
+ b ~= buf;
+ assert(b.start == 0);
+ assert(b.capacity == 4);
+ assert(b.ring == 3);
+ assert(b.position == 3);
+
+ defaultAllocator.dispose(b);
+ }
+
+ /**
+ * Sets how many bytes were written. It will shrink the buffer
+ * appropriately. Always set this property after calling
+ * $(D_PSYMBOL buffer).
+ *
+ * Params:
+ * length = Length of the written data.
+ *
+ * Returns: $(D_KEYWORD this).
+ */
+ @property WriteBuffer opOpAssign(string op)(size_t length) pure nothrow @safe @nogc
+ if (op == "+")
+ in
+ {
+ assert(length <= this.length);
+ }
+ body
+ {
+ auto afterRing = ring + 1;
+ auto oldStart = start;
+
+ if (length <= 0)
+ {
+ return this;
+ }
+ else if (position <= afterRing)
+ {
+ start += length;
+ if (start > 0 && position == afterRing)
+ {
+ position = oldStart;
+ }
+ }
+ else
+ {
+ auto overflow = position - afterRing;
+
+ if (overflow > length) {
+ buffer_[start.. start + length] = buffer_[afterRing.. afterRing + length];
+ buffer_[afterRing.. afterRing + length] = buffer_[afterRing + length ..position];
+ position -= length;
+ }
+ else if (overflow == length)
+ {
+ buffer_[start.. start + overflow] = buffer_[afterRing..position];
+ position -= overflow;
+ }
+ else
+ {
+ buffer_[start.. start + overflow] = buffer_[afterRing..position];
+ position = overflow;
+ }
+ start += length;
+
+ if (start == position)
+ {
+ if (position != afterRing)
+ {
+ position = 0;
+ }
+ start = 0;
+ ring = capacity - 1;
+ }
+ }
+ if (start > ring)
+ {
+ start = 0;
+ }
+ return this;
+ }
+
+ ///
+ unittest
+ {
+ auto b = defaultAllocator.make!WriteBuffer;
+ ubyte[6] buf = [23, 23, 255, 128, 127, 9];
+
+ b ~= buf;
+ assert(b.length == 6);
+ b += 2;
+ assert(b.length == 4);
+ b += 4;
+ assert(b.length == 0);
+
+ defaultAllocator.dispose(b);
+ }
+
+ /**
+ * Returns a chunk with data.
+ *
+ * After calling it, set $(D_KEYWORD +=) to the length could be
+ * written.
+ *
+ * $(D_PSYMBOL buffer) may return only part of the data. You may need
+ * to call it (and set $(D_KEYWORD +=) several times until
+ * $(D_PSYMBOL length) is 0. If all the data can be written,
+ * maximally 3 calls are required.
+ *
+ * Returns: A chunk of data buffer.
+ */
+ @property ubyte[] opSlice(size_t start, size_t end) pure nothrow @safe @nogc
+ {
+ immutable internStart = this.start + start;
+
+ if (position > ring || position < start) // Buffer overflowed
+ {
+ return buffer_[this.start.. ring + 1 - length + end];
+ }
+ else
+ {
+ return buffer_[this.start.. this.start + end];
+ }
+ }
+
+ ///
+ unittest
+ {
+ auto b = defaultAllocator.make!WriteBuffer(6);
+ ubyte[6] buf = [23, 23, 255, 128, 127, 9];
+
+ b ~= buf;
+ assert(b[0..$] == buf[0..6]);
+ b += 2;
+
+ assert(b[0..$] == buf[2..6]);
+
+ b ~= buf;
+ assert(b[0..$] == buf[2..6]);
+ b += b.length;
+
+ assert(b[0..$] == buf[0..6]);
+ b += b.length;
+
+ defaultAllocator.dispose(b);
+ }
+
+ /**
+ * After calling it, set $(D_KEYWORD +=) to the length could be
+ * written.
+ *
+ * $(D_PSYMBOL buffer) may return only part of the data. You may need
+ * to call it (and set $(D_KEYWORD +=) several times until
+ * $(D_PSYMBOL length) is 0. If all the data can be written,
+ * maximally 3 calls are required.
+ *
+ * Returns: A chunk of data buffer.
+ */
+ @property ubyte[] opIndex() pure nothrow @safe @nogc
+ {
+ return opSlice(0, length);
+ }
+}
diff --git a/source/tanya/event/internal/epoll.d b/source/tanya/event/internal/epoll.d deleted file mode 100644 index a129661..0000000 --- a/source/tanya/event/internal/epoll.d +++ /dev/null @@ -1,223 +0,0 @@ -/* This Source Code Form is subject to the terms of the Mozilla Public - * License, v. 2.0. If a copy of the MPL was not distributed with this - * file, You can obtain one at http://mozilla.org/MPL/2.0/. */ - -/** - * Copyright: Eugene Wissner 2016. - * License: $(LINK2 https://www.mozilla.org/en-US/MPL/2.0/, - * Mozilla Public License, v. 2.0). - * Authors: $(LINK2 mailto:belka@caraus.de, Eugene Wissner) - */ -module tanya.event.internal.epoll; - -import tanya.event.config; - -static if (UseEpoll): - -public import core.sys.linux.epoll; -import tanya.event.internal.selector; -import tanya.event.protocol; -import tanya.event.transport; -import tanya.event.watcher; -import tanya.event.loop; -import tanya.container.list; -import tanya.memory; -import core.stdc.errno; -import core.sys.posix.fcntl; -import core.sys.posix.netinet.in_; -import core.time; -import std.algorithm.comparison; - -extern (C) nothrow -{ // TODO: Make a pull request for Phobos to mark this extern functions as @nogc. - int epoll_create1(int __flags); - int epoll_ctl(int __epfd, int __op, int __fd, epoll_event *__event); - int epoll_wait(int __epfd, epoll_event *__events, int __maxevents, int __timeout); - int accept4(int, sockaddr*, socklen_t*, int flags); -} - -private enum maxEvents = 128; - -class EpollLoop : Loop -{ - /** - * Initializes the loop. - */ - this() - { - super(); - - if ((fd = epoll_create1(EPOLL_CLOEXEC)) < 0) - { - return; - } - epollEvents = makeArray!epoll_event(defaultAllocator, maxEvents).ptr; - } - - /** - * Frees loop internals. - */ - ~this() - { - dispose(defaultAllocator, epollEvents); - } - - /** - * Should be called if the backend configuration changes. - * - * Params: - * socket = Socket. - * oldEvents = The events were already set. - * events = The events should be set. - * - * Returns: $(D_KEYWORD true) if the operation was successful. - */ - protected override bool modify(int socket, EventMask oldEvents, EventMask events) - { - int op = EPOLL_CTL_DEL; - epoll_event ev; - - if (events == oldEvents) - { - return true; - } - if (events && oldEvents) - { - op = EPOLL_CTL_MOD; - } - else if (events && !oldEvents) - { - op = EPOLL_CTL_ADD; - } - - ev.data.fd = socket; - ev.events = (events & (Event.read | Event.accept) ? EPOLLIN | EPOLLPRI : 0) - | (events & Event.write ? EPOLLOUT : 0) - | EPOLLET; - - return epoll_ctl(fd, op, socket, &ev) == 0; - } - - /** - * Accept incoming connections. - * - * Params: - * protocolFactory = Protocol factory. - * socket = Socket. - */ - protected override void acceptConnection(Protocol delegate() protocolFactory, - int socket) - { - sockaddr_in client_addr; - socklen_t client_len = client_addr.sizeof; - int client = accept4(socket, - cast(sockaddr *)&client_addr, - &client_len, - O_NONBLOCK); - while (client >= 0) - { - auto transport = make!SocketTransport(defaultAllocator, this, client); - IOWatcher connection; - - if (connections.length > client) - { - connection = cast(IOWatcher) connections[client]; - // If it is a ConnectionWatcher - if (connection is null && connections[client] !is null) - { - dispose(defaultAllocator, connections[client]); - connections[client] = null; - } - } - if (connection !is null) - { - connection(protocolFactory, transport); - } - else - { - connections[client] = make!IOWatcher(defaultAllocator, - protocolFactory, - transport); - } - - modify(client, EventMask(Event.none), EventMask(Event.read, Event.write)); - - swapPendings.insertBack(connections[client]); - - client = accept4(socket, - cast(sockaddr *)&client_addr, - &client_len, - O_NONBLOCK); - } - } - - /** - * Does the actual polling. - */ - protected override void poll() - { - // Don't block - immutable timeout = cast(immutable int) blockTime.total!"msecs"; - auto eventCount = epoll_wait(fd, epollEvents, maxEvents, timeout); - - if (eventCount < 0) - { - if (errno != EINTR) - { - throw make!BadLoopException(defaultAllocator); - } - - return; - } - - for (auto i = 0; i < eventCount; ++i) - { - epoll_event *ev = epollEvents + i; - auto connection = cast(IOWatcher) connections[ev.data.fd]; - - if (connection is null) - { - swapPendings.insertBack(connections[ev.data.fd]); -// acceptConnection(connections[ev.data.fd].protocol, -// connections[ev.data.fd].socket); - } - else - { - auto transport = cast(SocketTransport) connection.transport; - assert(transport !is null); - - if (ev.events & (EPOLLIN | EPOLLPRI | EPOLLERR | EPOLLHUP)) - { - try - { - while (!transport.receive()) - { - } - swapPendings.insertBack(connection); - } - catch (TransportException e) - { - swapPendings.insertBack(connection); - dispose(defaultAllocator, e); - } - } - else if (ev.events & (EPOLLOUT | EPOLLERR | EPOLLHUP)) - { - transport.writeReady = true; - } - } - } - } - - /** - * Returns: The blocking time. - */ - override protected @property inout(Duration) blockTime() - inout @safe pure nothrow - { - return min(super.blockTime, 1.dur!"seconds"); - } - - private int fd; - private epoll_event* epollEvents; -} diff --git a/source/tanya/event/internal/selector.d b/source/tanya/event/internal/selector.d deleted file mode 100644 index 95bf5d8..0000000 --- a/source/tanya/event/internal/selector.d +++ /dev/null @@ -1,216 +0,0 @@ -/* This Source Code Form is subject to the terms of the Mozilla Public - * License, v. 2.0. If a copy of the MPL was not distributed with this - * file, You can obtain one at http://mozilla.org/MPL/2.0/. */ - -/** - * Copyright: Eugene Wissner 2016. - * License: $(LINK2 https://www.mozilla.org/en-US/MPL/2.0/, - * Mozilla Public License, v. 2.0). - * Authors: $(LINK2 mailto:belka@caraus.de, Eugene Wissner) - */ -module tanya.event.internal.selector; - -import tanya.memory; -import tanya.container.buffer; -import tanya.event.loop; -import tanya.event.protocol; -import tanya.event.transport; -import core.stdc.errno; -import core.sys.posix.netinet.in_; -import core.sys.posix.unistd; - -/** - * Transport for stream sockets. - */ -class SocketTransport : DuplexTransport -{ - private int socket_ = -1; - - private Protocol protocol_; - - /// Input buffer. - private WriteBuffer input_; - - /// Output buffer. - private ReadBuffer output_; - - private Loop loop; - - private bool disconnected_; - - package bool writeReady; - - /** - * Params: - * loop = Event loop. - * socket = Socket. - * protocol = Protocol. - */ - this(Loop loop, int socket, Protocol protocol = null) - { - socket_ = socket; - protocol_ = protocol; - this.loop = loop; - input_ = make!WriteBuffer(defaultAllocator); - output_ = make!ReadBuffer(defaultAllocator); - } - - /** - * Close the transport and deallocate the data buffers. - */ - ~this() - { - close(socket); - dispose(defaultAllocator, input_); - dispose(defaultAllocator, output_); - dispose(defaultAllocator, protocol_); - } - - /** - * Returns: Transport socket. - */ - int socket() const @safe pure nothrow - { - return socket_; - } - - /** - * Returns: Protocol. - */ - @property Protocol protocol() @safe pure nothrow - { - return protocol_; - } - - /** - * Returns: $(D_KEYWORD true) if the remote peer closed the connection, - * $(D_KEYWORD false) otherwise. - */ - @property immutable(bool) disconnected() const @safe pure nothrow - { - return disconnected_; - } - - /** - * Params: - * protocol = Application protocol. - */ - @property void protocol(Protocol protocol) @safe pure nothrow - { - protocol_ = protocol; - } - - /** - * Returns: Application protocol. - */ - @property inout(Protocol) protocol() inout @safe pure nothrow - { - return protocol_; - } - - /** - * Write some data to the transport. - * - * Params: - * data = Data to send. - */ - void write(ubyte[] data) - { - // If the buffer wasn't empty the transport should be already there. - if (!input.length && data.length) - { - loop.feed(this); - } - input ~= data; - } - - /** - * Returns: Input buffer. - */ - @property WriteBuffer input() @safe pure nothrow - { - return input_; - } - - /** - * Returns: Output buffer. - */ - @property ReadBuffer output() @safe pure nothrow - { - return output_; - } - - /** - * Read data from the socket. Returns $(D_KEYWORD true) if the reading - * is completed. In the case that the peer closed the connection, returns - * $(D_KEYWORD true) aswell. - * - * Returns: Whether the reading is completed. - * - * Throws: $(D_PSYMBOL TransportException) if a read error is occured. - */ - bool receive() - { - auto readCount = recv(socket, output.buffer, output.free, 0); - - if (readCount > 0) - { - output_ ~= output.buffer[0..readCount]; - return false; - } - else if (readCount == 0) - { - disconnected_ = true; - return true; - } - else if (errno == EAGAIN || errno == EWOULDBLOCK) - { - return true; - } - else - { - disconnected_ = true; - throw make!TransportException(defaultAllocator, - "Read from the socket failed."); - } - } - - /** - * Returns: Whether the writing is completed. - * - * Throws: $(D_PSYMBOL TransportException) if a read error is occured. - */ - bool send() - { - auto sentCount = core.sys.posix.netinet.in_.send(socket, - input.buffer, - input.length, - 0); - - input.written = sentCount; - if (input.length == 0) - { - return true; - } - else if (sentCount >= 0) - { - loop.feed(this); - - return false; - } - else if (errno == EAGAIN || errno == EWOULDBLOCK) - { - writeReady = false; - loop.feed(this); - - return false; - } - else - { - disconnected_ = true; - loop.feed(this); - throw make!TransportException(defaultAllocator, - "Write to the socket failed."); - } - } -} diff --git a/source/tanya/event/loop.d b/source/tanya/event/loop.d deleted file mode 100644 index 035c0b6..0000000 --- a/source/tanya/event/loop.d +++ /dev/null @@ -1,275 +0,0 @@ -/* This Source Code Form is subject to the terms of the Mozilla Public - * License, v. 2.0. If a copy of the MPL was not distributed with this - * file, You can obtain one at http://mozilla.org/MPL/2.0/. */ - -/** - * Copyright: Eugene Wissner 2016. - * License: $(LINK2 https://www.mozilla.org/en-US/MPL/2.0/, - * Mozilla Public License, v. 2.0). - * Authors: $(LINK2 mailto:belka@caraus.de, Eugene Wissner) - */ -module tanya.event.loop; - -import tanya.memory; -import tanya.container.queue; -import tanya.container.vector; -import tanya.event.config; -import tanya.event.protocol; -import tanya.event.transport; -import tanya.event.watcher; -import tanya.container.buffer; -import core.thread; -import core.time; -import std.algorithm.iteration; -import std.algorithm.mutation; -import std.typecons; - -static if (UseEpoll) -{ - import tanya.event.internal.epoll; -} - -/** - * Events. - */ -enum Event : uint -{ - none = 0x00, /// No events. - read = 0x01, /// Non-blocking read call. - write = 0x02, /// Non-blocking write call. - accept = 0x04, /// Connection made. -} - -alias EventMask = BitFlags!Event; - -/** - * Event loop. - */ -abstract class Loop -{ - /// Pending watchers. - protected Queue!Watcher pendings; - - protected Queue!Watcher swapPendings; - - /// Pending connections. - protected Vector!ConnectionWatcher connections; - - /** - * Initializes the loop. - */ - this() - { - connections = make!(Vector!ConnectionWatcher)(defaultAllocator); - pendings = make!(Queue!Watcher)(defaultAllocator); - swapPendings = make!(Queue!Watcher)(defaultAllocator); - } - - /** - * Frees loop internals. - */ - ~this() - { - dispose(defaultAllocator, connections); - dispose(defaultAllocator, pendings); - dispose(defaultAllocator, swapPendings); - } - - /** - * Starts the loop. - */ - void run() - { - done_ = false; - do - { - poll(); - - // Invoke pendings - swapPendings.each!((ref p) => p.invoke()); - - swap(pendings, swapPendings); - } - while (!done_); - } - - /** - * Break out of the loop. - */ - void unloop() @safe pure nothrow - { - done_ = true; - } - - /** - * Start watching. - * - * Params: - * watcher = Watcher. - */ - void start(ConnectionWatcher watcher) - { - if (watcher.active) - { - return; - } - watcher.active = true; - watcher.accept = &acceptConnection; - connections[watcher.socket] = watcher; - - modify(watcher.socket, EventMask(Event.none), EventMask(Event.accept)); - } - - /** - * Stop watching. - * - * Params: - * watcher = Watcher. - */ - void stop(ConnectionWatcher watcher) - { - if (!watcher.active) - { - return; - } - watcher.active = false; - - modify(watcher.socket, EventMask(Event.accept), EventMask(Event.none)); - } - - /** - * Feeds the given event set into the event loop, as if the specified event - * had happened for the specified watcher. - * - * Params: - * transport = Affected transport. - */ - void feed(DuplexTransport transport) - { - pendings.insertBack(connections[transport.socket]); - } - - /** - * Should be called if the backend configuration changes. - * - * Params: - * socket = Socket. - * oldEvents = The events were already set. - * events = The events should be set. - * - * Returns: $(D_KEYWORD true) if the operation was successful. - */ - protected bool modify(int socket, EventMask oldEvents, EventMask events); - - /** - * Returns: The blocking time. - */ - protected @property inout(Duration) blockTime() - inout @safe pure nothrow - { - // Don't block if we have to do. - return swapPendings.empty ? blockTime_ : Duration.zero; - } - - /** - * Sets the blocking time for IO watchers. - * - * Params: - * blockTime = The blocking time. Cannot be larger than - * $(D_PSYMBOL maxBlockTime). - */ - protected @property void blockTime(in Duration blockTime) @safe pure nothrow - in - { - assert(blockTime <= 1.dur!"hours", "Too long to wait."); - assert(!blockTime.isNegative); - } - body - { - blockTime_ = blockTime; - } - - /** - * Does the actual polling. - */ - protected void poll(); - - /** - * Accept incoming connections. - * - * Params: - * protocolFactory = Protocol factory. - * socket = Socket. - */ - protected void acceptConnection(Protocol delegate() protocolFactory, - int socket); - - /// Whether the event loop should be stopped. - private bool done_; - - /// Maximal block time. - protected Duration blockTime_ = 1.dur!"minutes"; -} - -/** - * Exception thrown on errors in the event loop. - */ -class BadLoopException : Exception -{ - /** - * Params: - * file = The file where the exception occurred. - * line = The line number where the exception occurred. - * next = The previous exception in the chain of exceptions, if any. - */ - this(string file = __FILE__, size_t line = __LINE__, Throwable next = null) - pure @safe nothrow const @nogc - { - super("Event loop cannot be initialized.", file, line, next); - } -} - -/** - * Returns the event loop used by default. If an event loop wasn't set with - * $(D_PSYMBOL defaultLoop) before, $(D_PSYMBOL getDefaultLoop()) will try to - * choose an event loop supported on the system. - * - * Returns: The default event loop. - */ -Loop getDefaultLoop() -{ - if (_defaultLoop !is null) - { - return _defaultLoop; - } - - static if (UseEpoll) - { - _defaultLoop = make!EpollLoop(defaultAllocator); - } - - return _defaultLoop; -} - -/** - * Sets the default event loop. - * - * This property makes it possible to implement your own backends or event - * loops, for example, if the system is not supported or if you want to - * extend the supported implementation. Just extend $(D_PSYMBOL Loop) and pass - * your implementation to this property. - * - * Params: - * loop = The event loop. - */ -@property void defaultLoop(Loop loop) -in -{ - assert(loop !is null); -} -body -{ - _defaultLoop = loop; -} - -private Loop _defaultLoop; diff --git a/source/tanya/event/package.d b/source/tanya/event/package.d deleted file mode 100644 index 9dcaeeb..0000000 --- a/source/tanya/event/package.d +++ /dev/null @@ -1,16 +0,0 @@ -/* This Source Code Form is subject to the terms of the Mozilla Public - * License, v. 2.0. If a copy of the MPL was not distributed with this - * file, You can obtain one at http://mozilla.org/MPL/2.0/. */ - -/** - * Copyright: Eugene Wissner 2016. - * License: $(LINK2 https://www.mozilla.org/en-US/MPL/2.0/, - * Mozilla Public License, v. 2.0). - * Authors: $(LINK2 mailto:belka@caraus.de, Eugene Wissner) - */ -module tanya.event; - -public import tanya.event.loop; -public import tanya.event.protocol; -public import tanya.event.transport; -public import tanya.event.watcher; diff --git a/source/tanya/event/protocol.d b/source/tanya/event/protocol.d deleted file mode 100644 index 02c2f35..0000000 --- a/source/tanya/event/protocol.d +++ /dev/null @@ -1,46 +0,0 @@ -/* This Source Code Form is subject to the terms of the Mozilla Public - * License, v. 2.0. If a copy of the MPL was not distributed with this - * file, You can obtain one at http://mozilla.org/MPL/2.0/. */ - -/** - * Copyright: Eugene Wissner 2016. - * License: $(LINK2 https://www.mozilla.org/en-US/MPL/2.0/, - * Mozilla Public License, v. 2.0). - * Authors: $(LINK2 mailto:belka@caraus.de, Eugene Wissner) - */ -module tanya.event.protocol; - -import tanya.event.transport; - -/** - * Common protocol interface. - */ -interface Protocol -{ -@nogc: - /** - * Params: - * data = Read data. - */ - void received(ubyte[] data); - - /** - * Called when a connection is made. - * - * Params: - * transport = Protocol transport. - */ - void connected(DuplexTransport transport); - - /** - * Called when a connection is lost. - */ - void disconnected(); -} - -/** - * Interface for TCP. - */ -interface TransmissionControlProtocol : Protocol -{ -} diff --git a/source/tanya/event/transport.d b/source/tanya/event/transport.d deleted file mode 100644 index e1dc6f7..0000000 --- a/source/tanya/event/transport.d +++ /dev/null @@ -1,134 +0,0 @@ -/* This Source Code Form is subject to the terms of the Mozilla Public - * License, v. 2.0. If a copy of the MPL was not distributed with this - * file, You can obtain one at http://mozilla.org/MPL/2.0/. */ - -/** - * Copyright: Eugene Wissner 2016. - * License: $(LINK2 https://www.mozilla.org/en-US/MPL/2.0/, - * Mozilla Public License, v. 2.0). - * Authors: $(LINK2 mailto:belka@caraus.de, Eugene Wissner) - */ -module tanya.event.transport; - -import tanya.container.buffer; -import tanya.event.protocol; - -/** - * Exception thrown on read/write errors. - */ -class TransportException : Exception -{ - /** - * Params: - * msg = Message to output. - * file = The file where the exception occurred. - * line = The line number where the exception occurred. - * next = The previous exception in the chain of exceptions, if any. - */ - this(string msg, - string file = __FILE__, - size_t line = __LINE__, - Throwable next = null) pure @safe nothrow const @nogc - { - super(msg, file, line, next); - } -} - -/** - * Base transport interface. - */ -interface Transport -{ - /** - * Returns: Protocol. - */ - @property Protocol protocol() @safe pure nothrow; - - /** - * Returns: $(D_KEYWORD true) if the peer closed the connection, - * $(D_KEYWORD false) otherwise. - */ - @property immutable(bool) disconnected() const @safe pure nothrow; - - /** - * Params: - * protocol = Application protocol. - */ - @property void protocol(Protocol protocol) @safe pure nothrow - in - { - assert(protocol !is null, "protocolConnected cannot be unset."); - } - - /** - * Returns: Application protocol. - */ - @property inout(Protocol) protocol() inout @safe pure nothrow; - - /** - * Returns: Transport socket. - */ - int socket() const @safe pure nothrow; -} - -/** - * Interface for read-only transports. - */ -interface ReadTransport : Transport -{ - /** - * Returns: Underlying output buffer. - */ - @property ReadBuffer output(); - - /** - * Reads data into the buffer. - * - * Returns: Whether the reading is completed. - * - * Throws: $(D_PSYMBOL TransportException) if a read error is occured. - */ - bool receive() - in - { - assert(!disconnected); - } -} - -/** - * Interface for write-only transports. - */ -interface WriteTransport : Transport -{ - /** - * Returns: Underlying input buffer. - */ - @property WriteBuffer input(); - - /** - * Write some data to the transport. - * - * Params: - * data = Data to send. - */ - void write(ubyte[] data); - - /** - * Returns: Whether the writing is completed. - * - * Throws: $(D_PSYMBOL TransportException) if a read error is occured. - */ - bool send() - in - { - assert(input.length); - assert(!disconnected); - } -} - -/** - * Represents a bidirectional transport. - */ -abstract class DuplexTransport : ReadTransport, WriteTransport -{ -} diff --git a/source/tanya/event/watcher.d b/source/tanya/event/watcher.d deleted file mode 100644 index b9880eb..0000000 --- a/source/tanya/event/watcher.d +++ /dev/null @@ -1,207 +0,0 @@ -/* This Source Code Form is subject to the terms of the Mozilla Public - * License, v. 2.0. If a copy of the MPL was not distributed with this - * file, You can obtain one at http://mozilla.org/MPL/2.0/. */ - -/** - * Copyright: Eugene Wissner 2016. - * License: $(LINK2 https://www.mozilla.org/en-US/MPL/2.0/, - * Mozilla Public License, v. 2.0). - * Authors: $(LINK2 mailto:belka@caraus.de, Eugene Wissner) - */ -module tanya.event.watcher; - -import tanya.event.protocol; -import tanya.event.transport; -import tanya.memory; -import std.functional; - -/** - * A watcher is an opaque structure that you allocate and register to record - * your interest in some event. - */ -abstract class Watcher -{ - /// Whether the watcher is active. - bool active; - - /** - * Invoke some action on event. - */ - void invoke(); -} - -class ConnectionWatcher : Watcher -{ - /// Watched file descriptor. - private int socket_; - - /// Protocol factory. - protected Protocol delegate() protocolFactory; - - /// Callback. - package void delegate(Protocol delegate() protocolFactory, - int socket) accept; - - invariant - { - assert(socket_ >= 0, "Called with negative file descriptor."); - } - - /** - * Params: - * protocolFactory = Function returning a new $(D_PSYMBOL Protocol) instance. - * socket = Socket. - */ - this(Protocol function() protocolFactory, int socket) - { - this.protocolFactory = toDelegate(protocolFactory); - socket_ = socket; - } - - /// Ditto. - this(Protocol delegate() protocolFactory, int socket) - { - this.protocolFactory = protocolFactory; - socket_ = socket; - } - - /// Ditto. - protected this(Protocol function() protocolFactory) - { - this.protocolFactory = toDelegate(protocolFactory); - } - - /// Ditto. - protected this(Protocol delegate() protocolFactory) - { - this.protocolFactory = protocolFactory; - } - - /** - * Returns: Socket. - */ - @property inout(int) socket() inout @safe pure nothrow - { - return socket_; - } - - /** - * Returns: Application protocol factory. - */ - @property inout(Protocol delegate()) protocol() inout - { - return protocolFactory; - } - - override void invoke() - { - accept(protocol, socket); - } -} - -/** - * Contains a pending watcher with the invoked events or a transport can be - * read from. - */ -class IOWatcher : ConnectionWatcher -{ - /// References a watcher or a transport. - DuplexTransport transport_; - - /** - * Params: - * protocolFactory = Function returning application specific protocol. - * transport = Transport. - */ - this(Protocol delegate() protocolFactory, - DuplexTransport transport) - in - { - assert(transport !is null); - assert(protocolFactory !is null); - } - body - { - super(protocolFactory); - this.transport_ = transport; - } - - ~this() - { - dispose(defaultAllocator, transport_); - } - - /** - * Assigns a transport. - * - * Params: - * protocolFactory = Function returning application specific protocol. - * transport = Transport. - * - * Returns: $(D_KEYWORD this). - */ - IOWatcher opCall(Protocol delegate() protocolFactory, - DuplexTransport transport) @safe pure nothrow - in - { - assert(transport !is null); - assert(protocolFactory !is null); - } - body - { - this.protocolFactory = protocolFactory; - this.transport_ = transport; - return this; - } - - /** - * Returns: Transport used by this watcher. - */ - @property inout(DuplexTransport) transport() inout @safe pure nothrow - { - return transport_; - } - - /** - * Returns: Socket. - */ - override @property inout(int) socket() inout @safe pure nothrow - { - return transport.socket; - } - - /** - * Invokes the watcher callback. - * - * Finalizes the transport on disconnect. - */ - override void invoke() - { - if (transport.protocol is null) - { - transport.protocol = protocolFactory(); - transport.protocol.connected(transport); - } - else if (transport.disconnected) - { - transport.protocol.disconnected(); - dispose(defaultAllocator, transport_); - protocolFactory = null; - } - else if (transport.output.length) - { - transport.protocol.received(transport.output[]); - } - else if (transport.input.length) - { - try - { - transport.send(); - } - catch (TransportException e) - { - dispose(defaultAllocator, e); - } - } - } -} |
