summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorEugen Wissner <belka@caraus.de>2016-12-02 19:18:37 +0100
committerEugen Wissner <belka@caraus.de>2016-12-02 19:18:37 +0100
commitf7f92e79066350cf7167700bb5a019994a4fdb63 (patch)
tree6b4aa79dd1adea7616423a74f108b8c98675b68c
parent1123d01e6c205796db376d4404cd6c53962788de (diff)
downloadtanya-f7f92e79066350cf7167700bb5a019994a4fdb63.tar.gz
Switch to container.queue. Remove PendingQueue
-rw-r--r--source/tanya/async/event/epoll.d294
-rw-r--r--source/tanya/async/event/iocp.d522
-rw-r--r--source/tanya/async/event/kqueue.d560
-rw-r--r--source/tanya/async/event/selector.d426
-rw-r--r--source/tanya/async/iocp.d12
-rw-r--r--source/tanya/async/loop.d612
-rw-r--r--source/tanya/async/protocol.d40
-rw-r--r--source/tanya/async/transport.d16
-rw-r--r--source/tanya/async/watcher.d401
-rw-r--r--source/tanya/container/queue.d5
10 files changed, 1389 insertions, 1499 deletions
diff --git a/source/tanya/async/event/epoll.d b/source/tanya/async/event/epoll.d
index 7fec708..7717b52 100644
--- a/source/tanya/async/event/epoll.d
+++ b/source/tanya/async/event/epoll.d
@@ -28,151 +28,151 @@ import std.algorithm.comparison;
class EpollLoop : SelectorLoop
{
- protected int fd;
- private epoll_event[] events;
-
- /**
- * Initializes the loop.
- */
- this()
- {
- if ((fd = epoll_create1(EPOLL_CLOEXEC)) < 0)
- {
- throw MmapPool.instance.make!BadLoopException("epoll initialization failed");
- }
- super();
- events = MmapPool.instance.makeArray!epoll_event(maxEvents);
- }
-
- /**
- * Free loop internals.
- */
- ~this()
- {
- MmapPool.instance.dispose(events);
- close(fd);
- }
-
- /**
- * Should be called if the backend configuration changes.
- *
- * Params:
- * watcher = Watcher.
- * oldEvents = The events were already set.
- * events = The events should be set.
- *
- * Returns: $(D_KEYWORD true) if the operation was successful.
- */
- protected override bool reify(ConnectionWatcher watcher, EventMask oldEvents, EventMask events)
- in
- {
- assert(watcher !is null);
- }
- body
- {
- int op = EPOLL_CTL_DEL;
- epoll_event ev;
-
- if (events == oldEvents)
- {
- return true;
- }
- if (events && oldEvents)
- {
- op = EPOLL_CTL_MOD;
- }
- else if (events && !oldEvents)
- {
- op = EPOLL_CTL_ADD;
- }
-
- ev.data.fd = watcher.socket.handle;
- ev.events = (events & (Event.read | Event.accept) ? EPOLLIN | EPOLLPRI : 0)
- | (events & Event.write ? EPOLLOUT : 0)
- | EPOLLET;
-
- return epoll_ctl(fd, op, watcher.socket.handle, &ev) == 0;
- }
-
- /**
- * Does the actual polling.
- */
- protected override void poll()
- {
- // Don't block
- immutable timeout = cast(immutable int) blockTime.total!"msecs";
- auto eventCount = epoll_wait(fd, events.ptr, maxEvents, timeout);
-
- if (eventCount < 0)
- {
- if (errno != EINTR)
- {
- throw theAllocator.make!BadLoopException();
- }
- return;
- }
-
- for (auto i = 0; i < eventCount; ++i)
- {
- auto io = cast(IOWatcher) connections[events[i].data.fd];
-
- if (io is null)
- {
- acceptConnections(connections[events[i].data.fd]);
- }
- else if (events[i].events & EPOLLERR)
- {
- kill(io, null);
- }
- else if (events[i].events & (EPOLLIN | EPOLLPRI | EPOLLHUP))
- {
- auto transport = cast(SelectorStreamTransport) io.transport;
- assert(transport !is null);
-
- SocketException exception;
- try
- {
- ptrdiff_t received;
- do
- {
- received = transport.socket.receive(io.output[]);
- io.output += received;
- }
- while (received);
- }
- catch (SocketException e)
- {
- exception = e;
- }
- if (transport.socket.disconnected)
- {
- kill(io, exception);
- }
- else if (io.output.length)
- {
- swapPendings.insertBack(io);
- }
- }
- else if (events[i].events & EPOLLOUT)
- {
- auto transport = cast(SelectorStreamTransport) io.transport;
- assert(transport !is null);
-
- transport.writeReady = true;
- if (transport.input.length)
- {
- feed(transport);
- }
- }
- }
- }
-
- /**
- * Returns: The blocking time.
- */
- override protected @property inout(Duration) blockTime()
- inout @safe pure nothrow
- {
- return min(super.blockTime, 1.dur!"seconds");
- }
+ protected int fd;
+ private epoll_event[] events;
+
+ /**
+ * Initializes the loop.
+ */
+ this()
+ {
+ if ((fd = epoll_create1(EPOLL_CLOEXEC)) < 0)
+ {
+ throw MmapPool.instance.make!BadLoopException("epoll initialization failed");
+ }
+ super();
+ events = MmapPool.instance.makeArray!epoll_event(maxEvents);
+ }
+
+ /**
+ * Free loop internals.
+ */
+ ~this()
+ {
+ MmapPool.instance.dispose(events);
+ close(fd);
+ }
+
+ /**
+ * Should be called if the backend configuration changes.
+ *
+ * Params:
+ * watcher = Watcher.
+ * oldEvents = The events were already set.
+ * events = The events should be set.
+ *
+ * Returns: $(D_KEYWORD true) if the operation was successful.
+ */
+ protected override bool reify(ConnectionWatcher watcher, EventMask oldEvents, EventMask events)
+ in
+ {
+ assert(watcher !is null);
+ }
+ body
+ {
+ int op = EPOLL_CTL_DEL;
+ epoll_event ev;
+
+ if (events == oldEvents)
+ {
+ return true;
+ }
+ if (events && oldEvents)
+ {
+ op = EPOLL_CTL_MOD;
+ }
+ else if (events && !oldEvents)
+ {
+ op = EPOLL_CTL_ADD;
+ }
+
+ ev.data.fd = watcher.socket.handle;
+ ev.events = (events & (Event.read | Event.accept) ? EPOLLIN | EPOLLPRI : 0)
+ | (events & Event.write ? EPOLLOUT : 0)
+ | EPOLLET;
+
+ return epoll_ctl(fd, op, watcher.socket.handle, &ev) == 0;
+ }
+
+ /**
+ * Does the actual polling.
+ */
+ protected override void poll()
+ {
+ // Don't block
+ immutable timeout = cast(immutable int) blockTime.total!"msecs";
+ auto eventCount = epoll_wait(fd, events.ptr, maxEvents, timeout);
+
+ if (eventCount < 0)
+ {
+ if (errno != EINTR)
+ {
+ throw theAllocator.make!BadLoopException();
+ }
+ return;
+ }
+
+ for (auto i = 0; i < eventCount; ++i)
+ {
+ auto io = cast(IOWatcher) connections[events[i].data.fd];
+
+ if (io is null)
+ {
+ acceptConnections(connections[events[i].data.fd]);
+ }
+ else if (events[i].events & EPOLLERR)
+ {
+ kill(io, null);
+ }
+ else if (events[i].events & (EPOLLIN | EPOLLPRI | EPOLLHUP))
+ {
+ auto transport = cast(SelectorStreamTransport) io.transport;
+ assert(transport !is null);
+
+ SocketException exception;
+ try
+ {
+ ptrdiff_t received;
+ do
+ {
+ received = transport.socket.receive(io.output[]);
+ io.output += received;
+ }
+ while (received);
+ }
+ catch (SocketException e)
+ {
+ exception = e;
+ }
+ if (transport.socket.disconnected)
+ {
+ kill(io, exception);
+ }
+ else if (io.output.length)
+ {
+ swapPendings.insertBack(io);
+ }
+ }
+ else if (events[i].events & EPOLLOUT)
+ {
+ auto transport = cast(SelectorStreamTransport) io.transport;
+ assert(transport !is null);
+
+ transport.writeReady = true;
+ if (transport.input.length)
+ {
+ feed(transport);
+ }
+ }
+ }
+ }
+
+ /**
+ * Returns: The blocking time.
+ */
+ override protected @property inout(Duration) blockTime()
+ inout @safe pure nothrow
+ {
+ return min(super.blockTime, 1.dur!"seconds");
+ }
}
diff --git a/source/tanya/async/event/iocp.d b/source/tanya/async/event/iocp.d
index bcfda71..935dd5e 100644
--- a/source/tanya/async/event/iocp.d
+++ b/source/tanya/async/event/iocp.d
@@ -28,267 +28,267 @@ import core.sys.windows.winsock2;
class IOCPStreamTransport : StreamTransport
{
- private OverlappedConnectedSocket socket_;
-
- private WriteBuffer input;
-
- /**
- * Creates new completion port transport.
- * Params:
- * socket = Socket.
- */
- this(OverlappedConnectedSocket socket)
- in
- {
- assert(socket !is null);
- }
- body
- {
- socket_ = socket;
- input = MmapPool.instance.make!WriteBuffer();
- }
-
- ~this()
- {
- MmapPool.instance.dispose(input);
- }
-
- @property inout(OverlappedConnectedSocket) socket() inout pure nothrow @safe @nogc
- {
- return socket_;
- }
-
- /**
- * Write some data to the transport.
- *
- * Params:
- * data = Data to send.
- */
- void write(ubyte[] data)
- {
- immutable empty = input.length == 0;
- input ~= data;
- if (empty)
- {
- SocketState overlapped;
- try
- {
- overlapped = MmapPool.instance.make!SocketState;
- socket.beginSend(input[], overlapped);
- }
- catch (SocketException e)
- {
- MmapPool.instance.dispose(overlapped);
- MmapPool.instance.dispose(e);
- }
- }
- }
+ private OverlappedConnectedSocket socket_;
+
+ private WriteBuffer input;
+
+ /**
+ * Creates new completion port transport.
+ * Params:
+ * socket = Socket.
+ */
+ this(OverlappedConnectedSocket socket)
+ in
+ {
+ assert(socket !is null);
+ }
+ body
+ {
+ socket_ = socket;
+ input = MmapPool.instance.make!WriteBuffer();
+ }
+
+ ~this()
+ {
+ MmapPool.instance.dispose(input);
+ }
+
+ @property inout(OverlappedConnectedSocket) socket() inout pure nothrow @safe @nogc
+ {
+ return socket_;
+ }
+
+ /**
+ * Write some data to the transport.
+ *
+ * Params:
+ * data = Data to send.
+ */
+ void write(ubyte[] data)
+ {
+ immutable empty = input.length == 0;
+ input ~= data;
+ if (empty)
+ {
+ SocketState overlapped;
+ try
+ {
+ overlapped = MmapPool.instance.make!SocketState;
+ socket.beginSend(input[], overlapped);
+ }
+ catch (SocketException e)
+ {
+ MmapPool.instance.dispose(overlapped);
+ MmapPool.instance.dispose(e);
+ }
+ }
+ }
}
-class IOCPLoop : Loop
-{
- protected HANDLE completionPort;
-
- protected OVERLAPPED overlap;
-
- /**
- * Initializes the loop.
- */
- this()
- {
- super();
-
- completionPort = CreateIoCompletionPort(INVALID_HANDLE_VALUE, NULL, 0, 0);
- if (!completionPort)
- {
- throw theAllocator.make!BadLoopException("Creating completion port failed");
- }
- }
-
- /**
- * Should be called if the backend configuration changes.
- *
- * Params:
- * watcher = Watcher.
- * oldEvents = The events were already set.
- * events = The events should be set.
- *
- * Returns: $(D_KEYWORD true) if the operation was successful.
- */
- override protected bool reify(ConnectionWatcher watcher,
- EventMask oldEvents,
- EventMask events)
- {
- SocketState overlapped;
- if (!(oldEvents & Event.accept) && (events & Event.accept))
- {
- auto socket = cast(OverlappedStreamSocket) watcher.socket;
- assert(socket !is null);
-
- if (CreateIoCompletionPort(cast(HANDLE) socket.handle,
- completionPort,
- cast(ULONG_PTR) (cast(void*) watcher),
- 0) !is completionPort)
- {
- return false;
- }
-
- try
- {
- overlapped = MmapPool.instance.make!SocketState;
- socket.beginAccept(overlapped);
- }
- catch (SocketException e)
- {
- MmapPool.instance.dispose(overlapped);
- theAllocator.dispose(e);
- return false;
- }
- }
- if (!(oldEvents & Event.read) && (events & Event.read)
- || !(oldEvents & Event.write) && (events & Event.write))
- {
- auto io = cast(IOWatcher) watcher;
- assert(io !is null);
-
- auto transport = cast(IOCPStreamTransport) io.transport;
- assert(transport !is null);
-
- if (CreateIoCompletionPort(cast(HANDLE) transport.socket.handle,
- completionPort,
- cast(ULONG_PTR) (cast(void*) watcher),
- 0) !is completionPort)
- {
- return false;
- }
-
- // Begin to read
- if (!(oldEvents & Event.read) && (events & Event.read))
- {
- try
- {
- overlapped = MmapPool.instance.make!SocketState;
- transport.socket.beginReceive(io.output[], overlapped);
- }
- catch (SocketException e)
- {
- MmapPool.instance.dispose(overlapped);
- theAllocator.dispose(e);
- return false;
- }
- }
- }
- return true;
- }
-
- /**
- * Does the actual polling.
- */
- override protected void poll()
- {
- DWORD lpNumberOfBytes;
- ULONG_PTR key;
- LPOVERLAPPED overlap;
- immutable timeout = cast(immutable int) blockTime.total!"msecs";
-
- auto result = GetQueuedCompletionStatus(completionPort,
- &lpNumberOfBytes,
- &key,
- &overlap,
- timeout);
- if (result == FALSE && overlap == NULL)
- {
- return; // Timeout
- }
-
- auto overlapped = (cast(SocketState) ((cast(void*) overlap) - 8));
- assert(overlapped !is null);
- scope (failure)
- {
- MmapPool.instance.dispose(overlapped);
- }
-
- switch (overlapped.event)
- {
- case OverlappedSocketEvent.accept:
- auto connection = cast(ConnectionWatcher) (cast(void*) key);
- assert(connection !is null);
-
- auto listener = cast(OverlappedStreamSocket) connection.socket;
- assert(listener !is null);
-
- auto socket = listener.endAccept(overlapped);
- auto transport = MmapPool.instance.make!IOCPStreamTransport(socket);
- auto io = MmapPool.instance.make!IOWatcher(transport, connection.protocol);
-
- connection.incoming.insertBack(io);
-
- reify(io, EventMask(Event.none), EventMask(Event.read, Event.write));
-
- swapPendings.insertBack(connection);
- listener.beginAccept(overlapped);
- break;
- case OverlappedSocketEvent.read:
- auto io = cast(IOWatcher) (cast(void*) key);
- assert(io !is null);
- if (!io.active)
- {
- MmapPool.instance.dispose(io);
- MmapPool.instance.dispose(overlapped);
- return;
- }
-
- auto transport = cast(IOCPStreamTransport) io.transport;
- assert(transport !is null);
-
- int received;
- SocketException exception;
- try
- {
- received = transport.socket.endReceive(overlapped);
- }
- catch (SocketException e)
- {
- exception = e;
- }
- if (transport.socket.disconnected)
- {
- // We want to get one last notification to destroy the watcher
- transport.socket.beginReceive(io.output[], overlapped);
- kill(io, exception);
- }
- else if (received > 0)
- {
- immutable full = io.output.free == received;
-
- io.output += received;
- // Receive was interrupted because the buffer is full. We have to continue
- if (full)
- {
- transport.socket.beginReceive(io.output[], overlapped);
- }
- swapPendings.insertBack(io);
- }
- break;
- case OverlappedSocketEvent.write:
- auto io = cast(IOWatcher) (cast(void*) key);
- assert(io !is null);
-
- auto transport = cast(IOCPStreamTransport) io.transport;
- assert(transport !is null);
-
- transport.input += transport.socket.endSend(overlapped);
- if (transport.input.length)
- {
- transport.socket.beginSend(transport.input[], overlapped);
- }
- else
- {
- transport.socket.beginReceive(io.output[], overlapped);
- }
- break;
- default:
- assert(false, "Unknown event");
- }
- }
+ class IOCPLoop : Loop
+ {
+ protected HANDLE completionPort;
+
+ protected OVERLAPPED overlap;
+
+ /**
+ * Initializes the loop.
+ */
+ this()
+ {
+ super();
+
+ completionPort = CreateIoCompletionPort(INVALID_HANDLE_VALUE, NULL, 0, 0);
+ if (!completionPort)
+ {
+ throw theAllocator.make!BadLoopException("Creating completion port failed");
+ }
+ }
+
+ /**
+ * Should be called if the backend configuration changes.
+ *
+ * Params:
+ * watcher = Watcher.
+ * oldEvents = The events were already set.
+ * events = The events should be set.
+ *
+ * Returns: $(D_KEYWORD true) if the operation was successful.
+ */
+ override protected bool reify(ConnectionWatcher watcher,
+ EventMask oldEvents,
+ EventMask events)
+ {
+ SocketState overlapped;
+ if (!(oldEvents & Event.accept) && (events & Event.accept))
+ {
+ auto socket = cast(OverlappedStreamSocket) watcher.socket;
+ assert(socket !is null);
+
+ if (CreateIoCompletionPort(cast(HANDLE) socket.handle,
+ completionPort,
+ cast(ULONG_PTR) (cast(void*) watcher),
+ 0) !is completionPort)
+ {
+ return false;
+ }
+
+ try
+ {
+ overlapped = MmapPool.instance.make!SocketState;
+ socket.beginAccept(overlapped);
+ }
+ catch (SocketException e)
+ {
+ MmapPool.instance.dispose(overlapped);
+ theAllocator.dispose(e);
+ return false;
+ }
+ }
+ if (!(oldEvents & Event.read) && (events & Event.read)
+ || !(oldEvents & Event.write) && (events & Event.write))
+ {
+ auto io = cast(IOWatcher) watcher;
+ assert(io !is null);
+
+ auto transport = cast(IOCPStreamTransport) io.transport;
+ assert(transport !is null);
+
+ if (CreateIoCompletionPort(cast(HANDLE) transport.socket.handle,
+ completionPort,
+ cast(ULONG_PTR) (cast(void*) watcher),
+ 0) !is completionPort)
+ {
+ return false;
+ }
+
+ // Begin to read
+ if (!(oldEvents & Event.read) && (events & Event.read))
+ {
+ try
+ {
+ overlapped = MmapPool.instance.make!SocketState;
+ transport.socket.beginReceive(io.output[], overlapped);
+ }
+ catch (SocketException e)
+ {
+ MmapPool.instance.dispose(overlapped);
+ theAllocator.dispose(e);
+ return false;
+ }
+ }
+ }
+ return true;
+ }
+
+ /**
+ * Does the actual polling.
+ */
+ override protected void poll()
+ {
+ DWORD lpNumberOfBytes;
+ ULONG_PTR key;
+ LPOVERLAPPED overlap;
+ immutable timeout = cast(immutable int) blockTime.total!"msecs";
+
+ auto result = GetQueuedCompletionStatus(completionPort,
+ &lpNumberOfBytes,
+ &key,
+ &overlap,
+ timeout);
+ if (result == FALSE && overlap == NULL)
+ {
+ return; // Timeout
+ }
+
+ auto overlapped = (cast(SocketState) ((cast(void*) overlap) - 8));
+ assert(overlapped !is null);
+ scope (failure)
+ {
+ MmapPool.instance.dispose(overlapped);
+ }
+
+ switch (overlapped.event)
+ {
+ case OverlappedSocketEvent.accept:
+ auto connection = cast(ConnectionWatcher) (cast(void*) key);
+ assert(connection !is null);
+
+ auto listener = cast(OverlappedStreamSocket) connection.socket;
+ assert(listener !is null);
+
+ auto socket = listener.endAccept(overlapped);
+ auto transport = MmapPool.instance.make!IOCPStreamTransport(socket);
+ auto io = MmapPool.instance.make!IOWatcher(transport, connection.protocol);
+
+ connection.incoming.insertBack(io);
+
+ reify(io, EventMask(Event.none), EventMask(Event.read, Event.write));
+
+ swapPendings.insertBack(connection);
+ listener.beginAccept(overlapped);
+ break;
+ case OverlappedSocketEvent.read:
+ auto io = cast(IOWatcher) (cast(void*) key);
+ assert(io !is null);
+ if (!io.active)
+ {
+ MmapPool.instance.dispose(io);
+ MmapPool.instance.dispose(overlapped);
+ return;
+ }
+
+ auto transport = cast(IOCPStreamTransport) io.transport;
+ assert(transport !is null);
+
+ int received;
+ SocketException exception;
+ try
+ {
+ received = transport.socket.endReceive(overlapped);
+ }
+ catch (SocketException e)
+ {
+ exception = e;
+ }
+ if (transport.socket.disconnected)
+ {
+ // We want to get one last notification to destroy the watcher
+ transport.socket.beginReceive(io.output[], overlapped);
+ kill(io, exception);
+ }
+ else if (received > 0)
+ {
+ immutable full = io.output.free == received;
+
+ io.output += received;
+ // Receive was interrupted because the buffer is full. We have to continue
+ if (full)
+ {
+ transport.socket.beginReceive(io.output[], overlapped);
+ }
+ swapPendings.insertBack(io);
+ }
+ break;
+ case OverlappedSocketEvent.write:
+ auto io = cast(IOWatcher) (cast(void*) key);
+ assert(io !is null);
+
+ auto transport = cast(IOCPStreamTransport) io.transport;
+ assert(transport !is null);
+
+ transport.input += transport.socket.endSend(overlapped);
+ if (transport.input.length)
+ {
+ transport.socket.beginSend(transport.input[], overlapped);
+ }
+ else
+ {
+ transport.socket.beginReceive(io.output[], overlapped);
+ }
+ break;
+ default:
+ assert(false, "Unknown event");
+ }
+ }
}
diff --git a/source/tanya/async/event/kqueue.d b/source/tanya/async/event/kqueue.d
index 773ea12..fa9261f 100644
--- a/source/tanya/async/event/kqueue.d
+++ b/source/tanya/async/event/kqueue.d
@@ -12,117 +12,117 @@ module tanya.async.event.kqueue;
version (OSX)
{
- version = MissingKevent;
+ version = MissingKevent;
}
else version (iOS)
{
- version = MissingKevent;
+ version = MissingKevent;
}
else version (TVOS)
{
- version = MissingKevent;
+ version = MissingKevent;
}
else version (WatchOS)
{
- version = MissingKevent;
+ version = MissingKevent;
}
else version (OpenBSD)
{
- version = MissingKevent;
+ version = MissingKevent;
}
else version (DragonFlyBSD)
{
- version = MissingKevent;
+ version = MissingKevent;
}
version (MissingKevent)
{
- extern (C):
- nothrow:
- @nogc:
-
- import core.stdc.stdint; // intptr_t, uintptr_t
- import core.sys.posix.time; // timespec
-
- enum : short
- {
- EVFILT_READ = -1,
- EVFILT_WRITE = -2,
- EVFILT_AIO = -3, /* attached to aio requests */
- EVFILT_VNODE = -4, /* attached to vnodes */
- EVFILT_PROC = -5, /* attached to struct proc */
- EVFILT_SIGNAL = -6, /* attached to struct proc */
- EVFILT_TIMER = -7, /* timers */
- EVFILT_MACHPORT = -8, /* Mach portsets */
- EVFILT_FS = -9, /* filesystem events */
- EVFILT_USER = -10, /* User events */
- EVFILT_VM = -12, /* virtual memory events */
- EVFILT_SYSCOUNT = 11
- }
-
- extern(D) void EV_SET(kevent_t* kevp, typeof(kevent_t.tupleof) args)
- {
- *kevp = kevent_t(args);
- }
-
- struct kevent_t
- {
- uintptr_t ident; /* identifier for this event */
- short filter; /* filter for event */
- ushort flags;
- uint fflags;
- intptr_t data;
- void *udata; /* opaque user data identifier */
- }
-
- enum
- {
- /* actions */
- EV_ADD = 0x0001, /* add event to kq (implies enable) */
- EV_DELETE = 0x0002, /* delete event from kq */
- EV_ENABLE = 0x0004, /* enable event */
- EV_DISABLE = 0x0008, /* disable event (not reported) */
-
- /* flags */
- EV_ONESHOT = 0x0010, /* only report one occurrence */
- EV_CLEAR = 0x0020, /* clear event state after reporting */
- EV_RECEIPT = 0x0040, /* force EV_ERROR on success, data=0 */
- EV_DISPATCH = 0x0080, /* disable event after reporting */
-
- EV_SYSFLAGS = 0xF000, /* reserved by system */
- EV_FLAG1 = 0x2000, /* filter-specific flag */
-
- /* returned values */
- EV_EOF = 0x8000, /* EOF detected */
- EV_ERROR = 0x4000, /* error, data contains errno */
- }
-
- int kqueue();
- int kevent(int kq, const kevent_t *changelist, int nchanges,
- kevent_t *eventlist, int nevents,
- const timespec *timeout);
+ extern (C):
+ nothrow:
+ @nogc:
+
+ import core.stdc.stdint; // intptr_t, uintptr_t
+ import core.sys.posix.time; // timespec
+
+ enum : short
+ {
+ EVFILT_READ = -1,
+ EVFILT_WRITE = -2,
+ EVFILT_AIO = -3, /* attached to aio requests */
+ EVFILT_VNODE = -4, /* attached to vnodes */
+ EVFILT_PROC = -5, /* attached to struct proc */
+ EVFILT_SIGNAL = -6, /* attached to struct proc */
+ EVFILT_TIMER = -7, /* timers */
+ EVFILT_MACHPORT = -8, /* Mach portsets */
+ EVFILT_FS = -9, /* filesystem events */
+ EVFILT_USER = -10, /* User events */
+ EVFILT_VM = -12, /* virtual memory events */
+ EVFILT_SYSCOUNT = 11
+ }
+
+ extern(D) void EV_SET(kevent_t* kevp, typeof(kevent_t.tupleof) args)
+ {
+ *kevp = kevent_t(args);
+ }
+
+ struct kevent_t
+ {
+ uintptr_t ident; /* identifier for this event */
+ short filter; /* filter for event */
+ ushort flags;
+ uint fflags;
+ intptr_t data;
+ void *udata; /* opaque user data identifier */
+ }
+
+ enum
+ {
+ /* actions */
+ EV_ADD = 0x0001, /* add event to kq (implies enable) */
+ EV_DELETE = 0x0002, /* delete event from kq */
+ EV_ENABLE = 0x0004, /* enable event */
+ EV_DISABLE = 0x0008, /* disable event (not reported) */
+
+ /* flags */
+ EV_ONESHOT = 0x0010, /* only report one occurrence */
+ EV_CLEAR = 0x0020, /* clear event state after reporting */
+ EV_RECEIPT = 0x0040, /* force EV_ERROR on success, data=0 */
+ EV_DISPATCH = 0x0080, /* disable event after reporting */
+
+ EV_SYSFLAGS = 0xF000, /* reserved by system */
+ EV_FLAG1 = 0x2000, /* filter-specific flag */
+
+ /* returned values */
+ EV_EOF = 0x8000, /* EOF detected */
+ EV_ERROR = 0x4000, /* error, data contains errno */
+ }
+
+ int kqueue();
+ int kevent(int kq, const kevent_t *changelist, int nchanges,
+ kevent_t *eventlist, int nevents,
+ const timespec *timeout);
}
version (OSX)
{
- version = MacBSD;
+ version = MacBSD;
}
else version (iOS)
{
- version = MacBSD;
+ version = MacBSD;
}
else version (FreeBSD)
{
- version = MacBSD;
- public import core.sys.freebsd.sys.event;
+ version = MacBSD;
+ public import core.sys.freebsd.sys.event;
}
else version (OpenBSD)
{
- version = MacBSD;
+ version = MacBSD;
}
else version (DragonFlyBSD)
{
- version = MacBSD;
+ version = MacBSD;
}
version (MacBSD):
@@ -142,208 +142,208 @@ import std.algorithm.comparison;
class KqueueLoop : SelectorLoop
{
- protected int fd;
- private kevent_t[] events;
- private kevent_t[] changes;
- private size_t changeCount;
-
- /**
- * Returns: Maximal event count can be got at a time
- * (should be supported by the backend).
- */
- override protected @property inout(uint) maxEvents() inout const pure nothrow @safe @nogc
- {
- return cast(uint) events.length;
- }
-
- this()
- {
- super();
-
- if ((fd = kqueue()) == -1)
- {
- throw MmapPool.instance.make!BadLoopException("epoll initialization failed");
- }
- events = MmapPool.instance.makeArray!kevent_t(64);
- changes = MmapPool.instance.makeArray!kevent_t(64);
- }
-
- /**
- * Free loop internals.
- */
- ~this()
- {
- MmapPool.instance.dispose(events);
- MmapPool.instance.dispose(changes);
- close(fd);
- }
-
- private void set(socket_t socket, short filter, ushort flags)
- {
- if (changes.length <= changeCount)
- {
- MmapPool.instance.resizeArray(changes, changeCount + maxEvents);
- }
- EV_SET(&changes[changeCount],
- cast(ulong) socket,
- filter,
- flags,
- 0U,
- 0L,
- null);
- ++changeCount;
- }
-
- /**
- * Should be called if the backend configuration changes.
- *
- * Params:
- * watcher = Watcher.
- * oldEvents = The events were already set.
- * events = The events should be set.
- *
- * Returns: $(D_KEYWORD true) if the operation was successful.
- */
- override protected bool reify(ConnectionWatcher watcher,
- EventMask oldEvents,
- EventMask events)
- {
- if (events != oldEvents)
- {
- if (oldEvents & Event.read || oldEvents & Event.accept)
- {
- set(watcher.socket.handle, EVFILT_READ, EV_DELETE);
- }
- if (oldEvents & Event.write)
- {
- set(watcher.socket.handle, EVFILT_WRITE, EV_DELETE);
- }
- }
- if (events & (Event.read | events & Event.accept))
- {
- set(watcher.socket.handle, EVFILT_READ, EV_ADD | EV_ENABLE);
- }
- if (events & Event.write)
- {
- set(watcher.socket.handle, EVFILT_WRITE, EV_ADD | EV_DISPATCH);
- }
- return true;
- }
-
- /**
- * Does the actual polling.
- */
- protected override void poll()
- {
- timespec ts;
- blockTime.split!("seconds", "nsecs")(ts.tv_sec, ts.tv_nsec);
-
- if (changeCount > maxEvents)
- {
- MmapPool.instance.resizeArray(events, changes.length);
- }
-
- auto eventCount = kevent(fd, changes.ptr, cast(int) changeCount, events.ptr, maxEvents, &ts);
- changeCount = 0;
-
- if (eventCount < 0)
- {
- if (errno != EINTR)
- {
- throw theAllocator.make!BadLoopException();
- }
- return;
- }
-
- for (int i; i < eventCount; ++i)
- {
- assert(connections.length > events[i].ident);
-
- IOWatcher io = cast(IOWatcher) connections[events[i].ident];
- // If it is a ConnectionWatcher. Accept connections.
- if (io is null)
- {
- acceptConnections(connections[events[i].ident]);
- }
- else if (events[i].flags & EV_ERROR)
- {
- kill(io, null);
- }
- else if (events[i].filter == EVFILT_READ)
- {
- auto transport = cast(SelectorStreamTransport) io.transport;
- assert(transport !is null);
-
- SocketException exception;
- try
- {
- ptrdiff_t received;
- do
- {
- received = transport.socket.receive(io.output[]);
- io.output += received;
- }
- while (received);
- }
- catch (SocketException e)
- {
- exception = e;
- }
- if (transport.socket.disconnected)
- {
- kill(io, exception);
- }
- else if (io.output.length)
- {
- swapPendings.insertBack(io);
- }
- }
- else if (events[i].filter == EVFILT_WRITE)
- {
- auto transport = cast(SelectorStreamTransport) io.transport;
- assert(transport !is null);
-
- transport.writeReady = true;
- if (transport.input.length)
- {
- feed(transport);
- }
- }
- }
- }
-
- /**
- * Returns: The blocking time.
- */
- override protected @property inout(Duration) blockTime()
- inout @safe pure nothrow
- {
- return min(super.blockTime, 1.dur!"seconds");
- }
-
- /**
- * If the transport couldn't send the data, the further sending should
- * be handled by the event loop.
- *
- * Params:
- * transport = Transport.
- * exception = Exception thrown on sending.
- *
- * Returns: $(D_KEYWORD true) if the operation could be successfully
- * completed or scheduled, $(D_KEYWORD false) otherwise (the
- * transport is be destroyed then).
- */
- protected override bool feed(SelectorStreamTransport transport, SocketException exception = null)
- {
- if (!super.feed(transport, exception))
- {
- return false;
- }
- if (!transport.writeReady)
- {
- set(transport.socket.handle, EVFILT_WRITE, EV_DISPATCH);
- return true;
- }
- return false;
- }
+ protected int fd;
+ private kevent_t[] events;
+ private kevent_t[] changes;
+ private size_t changeCount;
+
+ /**
+ * Returns: Maximal event count can be got at a time
+ * (should be supported by the backend).
+ */
+ override protected @property inout(uint) maxEvents() inout const pure nothrow @safe @nogc
+ {
+ return cast(uint) events.length;
+ }
+
+ this()
+ {
+ super();
+
+ if ((fd = kqueue()) == -1)
+ {
+ throw MmapPool.instance.make!BadLoopException("epoll initialization failed");
+ }
+ events = MmapPool.instance.makeArray!kevent_t(64);
+ changes = MmapPool.instance.makeArray!kevent_t(64);
+ }
+
+ /**
+ * Free loop internals.
+ */
+ ~this()
+ {
+ MmapPool.instance.dispose(events);
+ MmapPool.instance.dispose(changes);
+ close(fd);
+ }
+
+ private void set(socket_t socket, short filter, ushort flags)
+ {
+ if (changes.length <= changeCount)
+ {
+ MmapPool.instance.resizeArray(changes, changeCount + maxEvents);
+ }
+ EV_SET(&changes[changeCount],
+ cast(ulong) socket,
+ filter,
+ flags,
+ 0U,
+ 0L,
+ null);
+ ++changeCount;
+ }
+
+ /**
+ * Should be called if the backend configuration changes.
+ *
+ * Params:
+ * watcher = Watcher.
+ * oldEvents = The events were already set.
+ * events = The events should be set.
+ *
+ * Returns: $(D_KEYWORD true) if the operation was successful.
+ */
+ override protected bool reify(ConnectionWatcher watcher,
+ EventMask oldEvents,
+ EventMask events)
+ {
+ if (events != oldEvents)
+ {
+ if (oldEvents & Event.read || oldEvents & Event.accept)
+ {
+ set(watcher.socket.handle, EVFILT_READ, EV_DELETE);
+ }
+ if (oldEvents & Event.write)
+ {
+ set(watcher.socket.handle, EVFILT_WRITE, EV_DELETE);
+ }
+ }
+ if (events & (Event.read | events & Event.accept))
+ {
+ set(watcher.socket.handle, EVFILT_READ, EV_ADD | EV_ENABLE);
+ }
+ if (events & Event.write)
+ {
+ set(watcher.socket.handle, EVFILT_WRITE, EV_ADD | EV_DISPATCH);
+ }
+ return true;
+ }
+
+ /**
+ * Does the actual polling.
+ */
+ protected override void poll()
+ {
+ timespec ts;
+ blockTime.split!("seconds", "nsecs")(ts.tv_sec, ts.tv_nsec);
+
+ if (changeCount > maxEvents)
+ {
+ MmapPool.instance.resizeArray(events, changes.length);
+ }
+
+ auto eventCount = kevent(fd, changes.ptr, cast(int) changeCount, events.ptr, maxEvents, &ts);
+ changeCount = 0;
+
+ if (eventCount < 0)
+ {
+ if (errno != EINTR)
+ {
+ throw theAllocator.make!BadLoopException();
+ }
+ return;
+ }
+
+ for (int i; i < eventCount; ++i)
+ {
+ assert(connections.length > events[i].ident);
+
+ IOWatcher io = cast(IOWatcher) connections[events[i].ident];
+ // If it is a ConnectionWatcher. Accept connections.
+ if (io is null)
+ {
+ acceptConnections(connections[events[i].ident]);
+ }
+ else if (events[i].flags & EV_ERROR)
+ {
+ kill(io, null);
+ }
+ else if (events[i].filter == EVFILT_READ)
+ {
+ auto transport = cast(SelectorStreamTransport) io.transport;
+ assert(transport !is null);
+
+ SocketException exception;
+ try
+ {
+ ptrdiff_t received;
+ do
+ {
+ received = transport.socket.receive(io.output[]);
+ io.output += received;
+ }
+ while (received);
+ }
+ catch (SocketException e)
+ {
+ exception = e;
+ }
+ if (transport.socket.disconnected)
+ {
+ kill(io, exception);
+ }
+ else if (io.output.length)
+ {
+ swapPendings.insertBack(io);
+ }
+ }
+ else if (events[i].filter == EVFILT_WRITE)
+ {
+ auto transport = cast(SelectorStreamTransport) io.transport;
+ assert(transport !is null);
+
+ transport.writeReady = true;
+ if (transport.input.length)
+ {
+ feed(transport);
+ }
+ }
+ }
+ }
+
+ /**
+ * Returns: The blocking time.
+ */
+ override protected @property inout(Duration) blockTime()
+ inout @safe pure nothrow
+ {
+ return min(super.blockTime, 1.dur!"seconds");
+ }
+
+ /**
+ * If the transport couldn't send the data, the further sending should
+ * be handled by the event loop.
+ *
+ * Params:
+ * transport = Transport.
+ * exception = Exception thrown on sending.
+ *
+ * Returns: $(D_KEYWORD true) if the operation could be successfully
+ * completed or scheduled, $(D_KEYWORD false) otherwise (the
+ * transport is be destroyed then).
+ */
+ protected override bool feed(SelectorStreamTransport transport, SocketException exception = null)
+ {
+ if (!super.feed(transport, exception))
+ {
+ return false;
+ }
+ if (!transport.writeReady)
+ {
+ set(transport.socket.handle, EVFILT_WRITE, EV_DISPATCH);
+ return true;
+ }
+ return false;
+ }
}
diff --git a/source/tanya/async/event/selector.d b/source/tanya/async/event/selector.d
index e9a103d..c35a782 100644
--- a/source/tanya/async/event/selector.d
+++ b/source/tanya/async/event/selector.d
@@ -27,240 +27,240 @@ import core.stdc.errno;
*/
class SelectorStreamTransport : StreamTransport
{
- private ConnectedSocket socket_;
+ private ConnectedSocket socket_;
- /// Input buffer.
- package WriteBuffer input;
+ /// Input buffer.
+ package WriteBuffer input;
- private SelectorLoop loop;
+ private SelectorLoop loop;
- /// Received notification that the underlying socket is write-ready.
- package bool writeReady;
+ /// Received notification that the underlying socket is write-ready.
+ package bool writeReady;
- /**
- * Params:
- * loop = Event loop.
- * socket = Socket.
- */
- this(SelectorLoop loop, ConnectedSocket socket)
- {
- socket_ = socket;
- this.loop = loop;
- input = MmapPool.instance.make!WriteBuffer();
- }
+ /**
+ * Params:
+ * loop = Event loop.
+ * socket = Socket.
+ */
+ this(SelectorLoop loop, ConnectedSocket socket)
+ {
+ socket_ = socket;
+ this.loop = loop;
+ input = MmapPool.instance.make!WriteBuffer();
+ }
- /**
- * Close the transport and deallocate the data buffers.
- */
- ~this()
- {
- MmapPool.instance.dispose(input);
- }
+ /**
+ * Close the transport and deallocate the data buffers.
+ */
+ ~this()
+ {
+ MmapPool.instance.dispose(input);
+ }
- /**
- * Returns: Transport socket.
- */
- inout(ConnectedSocket) socket() inout pure nothrow @safe @nogc
- {
- return socket_;
- }
+ /**
+ * Returns: Transport socket.
+ */
+ inout(ConnectedSocket) socket() inout pure nothrow @safe @nogc
+ {
+ return socket_;
+ }
- /**
- * Write some data to the transport.
- *
- * Params:
- * data = Data to send.
- */
- void write(ubyte[] data)
- {
- if (!data.length)
- {
- return;
- }
- // Try to write if the socket is write ready.
- if (writeReady)
- {
- ptrdiff_t sent;
- SocketException exception;
- try
- {
- sent = socket.send(data);
- if (sent == 0)
- {
- writeReady = false;
- }
- }
- catch (SocketException e)
- {
- writeReady = false;
- exception = e;
- }
- if (sent < data.length)
- {
- input ~= data[sent..$];
- loop.feed(this, exception);
- }
- }
- else
- {
- input ~= data;
- }
- }
+ /**
+ * Write some data to the transport.
+ *
+ * Params:
+ * data = Data to send.
+ */
+ void write(ubyte[] data)
+ {
+ if (!data.length)
+ {
+ return;
+ }
+ // Try to write if the socket is write ready.
+ if (writeReady)
+ {
+ ptrdiff_t sent;
+ SocketException exception;
+ try
+ {
+ sent = socket.send(data);
+ if (sent == 0)
+ {
+ writeReady = false;
+ }
+ }
+ catch (SocketException e)
+ {
+ writeReady = false;
+ exception = e;
+ }
+ if (sent < data.length)
+ {
+ input ~= data[sent..$];
+ loop.feed(this, exception);
+ }
+ }
+ else
+ {
+ input ~= data;
+ }
+ }
}
abstract class SelectorLoop : Loop
{
- /// Pending connections.
- protected ConnectionWatcher[] connections;
+ /// Pending connections.
+ protected ConnectionWatcher[] connections;
- this()
- {
- super();
- connections = MmapPool.instance.makeArray!ConnectionWatcher(maxEvents);
- }
+ this()
+ {
+ super();
+ connections = MmapPool.instance.makeArray!ConnectionWatcher(maxEvents);
+ }
- ~this()
- {
- foreach (ref connection; connections)
- {
- // We want to free only IOWatchers. ConnectionWatcher are created by the
- // user and should be freed by himself.
- auto io = cast(IOWatcher) connection;
- if (io !is null)
- {
- MmapPool.instance.dispose(io);
- connection = null;
- }
- }
- MmapPool.instance.dispose(connections);
- }
+ ~this()
+ {
+ foreach (ref connection; connections)
+ {
+ // We want to free only IOWatchers. ConnectionWatcher are created by the
+ // user and should be freed by himself.
+ auto io = cast(IOWatcher) connection;
+ if (io !is null)
+ {
+ MmapPool.instance.dispose(io);
+ connection = null;
+ }
+ }
+ MmapPool.instance.dispose(connections);
+ }
- /**
- * If the transport couldn't send the data, the further sending should
- * be handled by the event loop.
- *
- * Params:
- * transport = Transport.
- * exception = Exception thrown on sending.
- *
- * Returns: $(D_KEYWORD true) if the operation could be successfully
- * completed or scheduled, $(D_KEYWORD false) otherwise (the
- * transport will be destroyed then).
- */
- protected bool feed(SelectorStreamTransport transport, SocketException exception = null)
- {
- while (transport.input.length && transport.writeReady)
- {
- try
- {
- ptrdiff_t sent = transport.socket.send(transport.input[]);
- if (sent == 0)
- {
- transport.writeReady = false;
- }
- else
- {
- transport.input += sent;
- }
- }
- catch (SocketException e)
- {
- exception = e;
- transport.writeReady = false;
- }
- }
- if (exception !is null)
- {
- auto watcher = cast(IOWatcher) connections[transport.socket.handle];
- assert(watcher !is null);
+ /**
+ * If the transport couldn't send the data, the further sending should
+ * be handled by the event loop.
+ *
+ * Params:
+ * transport = Transport.
+ * exception = Exception thrown on sending.
+ *
+ * Returns: $(D_KEYWORD true) if the operation could be successfully
+ * completed or scheduled, $(D_KEYWORD false) otherwise (the
+ * transport will be destroyed then).
+ */
+ protected bool feed(SelectorStreamTransport transport, SocketException exception = null)
+ {
+ while (transport.input.length && transport.writeReady)
+ {
+ try
+ {
+ ptrdiff_t sent = transport.socket.send(transport.input[]);
+ if (sent == 0)
+ {
+ transport.writeReady = false;
+ }
+ else
+ {
+ transport.input += sent;
+ }
+ }
+ catch (SocketException e)
+ {
+ exception = e;
+ transport.writeReady = false;
+ }
+ }
+ if (exception !is null)
+ {
+ auto watcher = cast(IOWatcher) connections[transport.socket.handle];
+ assert(watcher !is null);
- kill(watcher, exception);
- return false;
- }
- return true;
- }
+ kill(watcher, exception);
+ return false;
+ }
+ return true;
+ }
- /**
- * Start watching.
- *
- * Params:
- * watcher = Watcher.
- */
- override void start(ConnectionWatcher watcher)
- {
- if (watcher.active)
- {
- return;
- }
+ /**
+ * Start watching.
+ *
+ * Params:
+ * watcher = Watcher.
+ */
+ override void start(ConnectionWatcher watcher)
+ {
+ if (watcher.active)
+ {
+ return;
+ }
- if (connections.length <= watcher.socket)
- {
- MmapPool.instance.resizeArray(connections, watcher.socket.handle + maxEvents / 2);
- }
- connections[watcher.socket.handle] = watcher;
+ if (connections.length <= watcher.socket)
+ {
+ MmapPool.instance.resizeArray(connections, watcher.socket.handle + maxEvents / 2);
+ }
+ connections[watcher.socket.handle] = watcher;
- super.start(watcher);
- }
+ super.start(watcher);
+ }
- /**
- * Accept incoming connections.
- *
- * Params:
- * connection = Connection watcher ready to accept.
- */
- package void acceptConnections(ConnectionWatcher connection)
- in
- {
- assert(connection !is null);
- }
- body
- {
- while (true)
- {
- ConnectedSocket client;
- try
- {
- client = (cast(StreamSocket) connection.socket).accept();
- }
- catch (SocketException e)
- {
- theAllocator.dispose(e);
- break;
- }
- if (client is null)
- {
- break;
- }
+ /**
+ * Accept incoming connections.
+ *
+ * Params:
+ * connection = Connection watcher ready to accept.
+ */
+ package void acceptConnections(ConnectionWatcher connection)
+ in
+ {
+ assert(connection !is null);
+ }
+ body
+ {
+ while (true)
+ {
+ ConnectedSocket client;
+ try
+ {
+ client = (cast(StreamSocket) connection.socket).accept();
+ }
+ catch (SocketException e)
+ {
+ theAllocator.dispose(e);
+ break;
+ }
+ if (client is null)
+ {
+ break;
+ }
- IOWatcher io;
- auto transport = MmapPool.instance.make!SelectorStreamTransport(this, client);
+ IOWatcher io;
+ auto transport = MmapPool.instance.make!SelectorStreamTransport(this, client);
- if (connections.length > client.handle)
- {
- io = cast(IOWatcher) connections[client.handle];
- }
- else
- {
- MmapPool.instance.resizeArray(connections, client.handle + maxEvents / 2);
- }
- if (io is null)
- {
- io = MmapPool.instance.make!IOWatcher(transport,
- connection.protocol);
- connections[client.handle] = io;
- }
- else
- {
- io(transport, connection.protocol);
- }
+ if (connections.length > client.handle)
+ {
+ io = cast(IOWatcher) connections[client.handle];
+ }
+ else
+ {
+ MmapPool.instance.resizeArray(connections, client.handle + maxEvents / 2);
+ }
+ if (io is null)
+ {
+ io = MmapPool.instance.make!IOWatcher(transport,
+ connection.protocol);
+ connections[client.handle] = io;
+ }
+ else
+ {
+ io(transport, connection.protocol);
+ }
- reify(io, EventMask(Event.none), EventMask(Event.read, Event.write));
- connection.incoming.insertBack(io);
- }
+ reify(io, EventMask(Event.none), EventMask(Event.read, Event.write));
+ connection.incoming.insertBack(io);
+ }
- if (!connection.incoming.empty)
- {
- swapPendings.insertBack(connection);
- }
- }
+ if (!connection.incoming.empty)
+ {
+ swapPendings.insertBack(connection);
+ }
+ }
}
diff --git a/source/tanya/async/iocp.d b/source/tanya/async/iocp.d
index 8817e0c..f915e36 100644
--- a/source/tanya/async/iocp.d
+++ b/source/tanya/async/iocp.d
@@ -21,12 +21,12 @@ import core.sys.windows.windef;
*/
class State
{
- /// For internal use by Windows API.
- align(1) OVERLAPPED overlapped;
+ /// For internal use by Windows API.
+ align(1) OVERLAPPED overlapped;
- /// File/socket handle.
- HANDLE handle;
+ /// File/socket handle.
+ HANDLE handle;
- /// For keeping events or event masks.
- int event;
+ /// For keeping events or event masks.
+ int event;
}
diff --git a/source/tanya/async/loop.d b/source/tanya/async/loop.d
index de1e401..15dfafd 100644
--- a/source/tanya/async/loop.d
+++ b/source/tanya/async/loop.d
@@ -10,100 +10,105 @@
*
* ---
* import tanya.async;
+ * import tanya.memory;
* import tanya.network.socket;
*
* class EchoProtocol : TransmissionControlProtocol
* {
- * private DuplexTransport transport;
+ * private DuplexTransport transport;
*
- * void received(ubyte[] data)
- * {
- * transport.write(data);
- * }
+ * void received(ubyte[] data)
+ * {
+ * transport.write(data);
+ * }
*
- * void connected(DuplexTransport transport)
- * {
- * this.transport = transport;
- * }
+ * void connected(DuplexTransport transport)
+ * {
+ * this.transport = transport;
+ * }
*
- * void disconnected(SocketException e = null)
- * {
- * }
+ * void disconnected(SocketException e = null)
+ * {
+ * }
* }
*
* void main()
* {
- * auto address = new InternetAddress("127.0.0.1", cast(ushort) 8192);
+ * auto address = theAllocator.make!InternetAddress("127.0.0.1", cast(ushort) 8192);
*
- * version (Windows)
- * {
- * auto sock = new OverlappedStreamSocket(AddressFamily.INET);
- * }
- * else
- * {
- * auto sock = new StreamSocket(AddressFamily.INET);
- * sock.blocking = false;
- * }
+ * version (Windows)
+ * {
+ * auto sock = theAllocator.make!OverlappedStreamSocket(AddressFamily.INET);
+ * }
+ * else
+ * {
+ * auto sock = theAllocator.make!StreamSocket(AddressFamily.INET);
+ * sock.blocking = false;
+ * }
*
- * sock.bind(address);
- * sock.listen(5);
+ * sock.bind(address);
+ * sock.listen(5);
*
- * auto io = new ConnectionWatcher(sock);
- * io.setProtocol!EchoProtocol;
+ * auto io = theAllocator.make!ConnectionWatcher(sock);
+ * io.setProtocol!EchoProtocol;
*
- * defaultLoop.start(io);
- * defaultLoop.run();
+ * defaultLoop.start(io);
+ * defaultLoop.run();
*
- * sock.shutdown();
+ * sock.shutdown();
+ * theAllocator.dispose(io);
+ * theAllocator.dispose(sock);
+ * theAllocator.dispose(address);
* }
* ---
*/
module tanya.async.loop;
+import core.time;
+import std.algorithm.iteration;
+import std.algorithm.mutation;
+import std.typecons;
import tanya.async.protocol;
import tanya.async.transport;
import tanya.async.watcher;
import tanya.container.buffer;
+import tanya.container.queue;
import tanya.memory;
import tanya.memory.mmappool;
import tanya.network.socket;
-import core.time;
-import std.algorithm.iteration;
-import std.algorithm.mutation;
-import std.typecons;
version (DisableBackends)
{
}
else version (linux)
{
- import tanya.async.event.epoll;
- version = Epoll;
+ import tanya.async.event.epoll;
+ version = Epoll;
}
else version (Windows)
{
- import tanya.async.event.iocp;
- version = IOCP;
+ import tanya.async.event.iocp;
+ version = IOCP;
}
else version (OSX)
{
- version = Kqueue;
+ version = Kqueue;
}
else version (iOS)
{
- version = Kqueue;
+ version = Kqueue;
}
else version (FreeBSD)
{
- version = Kqueue;
+ version = Kqueue;
}
else version (OpenBSD)
{
- version = Kqueue;
+ version = Kqueue;
}
else version (DragonFlyBSD)
{
- version = Kqueue;
+ version = Kqueue;
}
/**
@@ -111,11 +116,11 @@ else version (DragonFlyBSD)
*/
enum Event : uint
{
- none = 0x00, /// No events.
- read = 0x01, /// Non-blocking read call.
- write = 0x02, /// Non-blocking write call.
- accept = 0x04, /// Connection made.
- error = 0x80000000, /// Sent when an error occurs.
+ none = 0x00, /// No events.
+ read = 0x01, /// Non-blocking read call.
+ write = 0x02, /// Non-blocking write call.
+ accept = 0x04, /// Connection made.
+ error = 0x80000000, /// Sent when an error occurs.
}
alias EventMask = BitFlags!Event;
@@ -125,161 +130,170 @@ alias EventMask = BitFlags!Event;
*/
abstract class Loop
{
- /// Pending watchers.
- protected PendingQueue!Watcher pendings;
-
- protected PendingQueue!Watcher swapPendings;
-
- /**
- * Returns: Maximal event count can be got at a time
- * (should be supported by the backend).
- */
- protected @property inout(uint) maxEvents() inout const pure nothrow @safe @nogc
- {
- return 128U;
- }
-
- /**
- * Initializes the loop.
- */
- this()
- {
- pendings = MmapPool.instance.make!(PendingQueue!Watcher);
- swapPendings = MmapPool.instance.make!(PendingQueue!Watcher);
- }
-
- /**
- * Frees loop internals.
- */
- ~this()
- {
- MmapPool.instance.dispose(pendings);
- MmapPool.instance.dispose(swapPendings);
- }
-
- /**
- * Starts the loop.
- */
- void run()
- {
- done_ = false;
- do
- {
- poll();
-
- // Invoke pendings
- swapPendings.each!((ref p) => p.invoke());
-
- swap(pendings, swapPendings);
- }
- while (!done_);
- }
-
- /**
- * Break out of the loop.
- */
- void unloop() @safe pure nothrow
- {
- done_ = true;
- }
-
- /**
- * Start watching.
- *
- * Params:
- * watcher = Watcher.
- */
- void start(ConnectionWatcher watcher)
- {
- if (watcher.active)
- {
- return;
- }
- watcher.active = true;
- reify(watcher, EventMask(Event.none), EventMask(Event.accept));
- }
-
- /**
- * Stop watching.
- *
- * Params:
- * watcher = Watcher.
- */
- void stop(ConnectionWatcher watcher)
- {
- if (!watcher.active)
- {
- return;
- }
- watcher.active = false;
-
- reify(watcher, EventMask(Event.accept), EventMask(Event.none));
- }
-
- /**
- * Should be called if the backend configuration changes.
- *
- * Params:
- * watcher = Watcher.
- * oldEvents = The events were already set.
- * events = The events should be set.
- *
- * Returns: $(D_KEYWORD true) if the operation was successful.
- */
- abstract protected bool reify(ConnectionWatcher watcher,
- EventMask oldEvents,
- EventMask events);
-
- /**
- * Returns: The blocking time.
- */
- protected @property inout(Duration) blockTime()
- inout @safe pure nothrow
- {
- // Don't block if we have to do.
- return swapPendings.empty ? blockTime_ : Duration.zero;
- }
-
- /**
- * Sets the blocking time for IO watchers.
- *
- * Params:
- * blockTime = The blocking time. Cannot be larger than
- * $(D_PSYMBOL maxBlockTime).
- */
- protected @property void blockTime(in Duration blockTime) @safe pure nothrow
- in
- {
- assert(blockTime <= 1.dur!"hours", "Too long to wait.");
- assert(!blockTime.isNegative);
- }
- body
- {
- blockTime_ = blockTime;
- }
-
- /**
- * Kills the watcher and closes the connection.
- */
- protected void kill(IOWatcher watcher, SocketException exception)
- {
- watcher.socket.shutdown();
- theAllocator.dispose(watcher.socket);
- MmapPool.instance.dispose(watcher.transport);
- watcher.exception = exception;
- swapPendings.insertBack(watcher);
- }
-
- /**
- * Does the actual polling.
- */
- abstract protected void poll();
-
- /// Whether the event loop should be stopped.
- private bool done_;
-
- /// Maximal block time.
- protected Duration blockTime_ = 1.dur!"minutes";
+ /// Pending watchers.
+ protected Queue!Watcher pendings;
+
+ protected Queue!Watcher swapPendings;
+
+ /**
+ * Returns: Maximal event count can be got at a time
+ * (should be supported by the backend).
+ */
+ protected @property inout(uint) maxEvents() inout const pure nothrow @safe @nogc
+ {
+ return 128U;
+ }
+
+ /**
+ * Initializes the loop.
+ */
+ this()
+ {
+ pendings = MmapPool.instance.make!(Queue!Watcher);
+ swapPendings = MmapPool.instance.make!(Queue!Watcher);
+ }
+
+ /**
+ * Frees loop internals.
+ */
+ ~this()
+ {
+ foreach (w; pendings)
+ {
+ MmapPool.instance.dispose(w);
+ }
+ MmapPool.instance.dispose(pendings);
+
+ foreach (w; swapPendings)
+ {
+ MmapPool.instance.dispose(w);
+ }
+ MmapPool.instance.dispose(swapPendings);
+ }
+
+ /**
+ * Starts the loop.
+ */
+ void run()
+ {
+ done_ = false;
+ do
+ {
+ poll();
+
+ // Invoke pendings
+ swapPendings.each!((ref p) => p.invoke());
+
+ swap(pendings, swapPendings);
+ }
+ while (!done_);
+ }
+
+ /**
+ * Break out of the loop.
+ */
+ void unloop() @safe pure nothrow
+ {
+ done_ = true;
+ }
+
+ /**
+ * Start watching.
+ *
+ * Params:
+ * watcher = Watcher.
+ */
+ void start(ConnectionWatcher watcher)
+ {
+ if (watcher.active)
+ {
+ return;
+ }
+ watcher.active = true;
+ reify(watcher, EventMask(Event.none), EventMask(Event.accept));
+ }
+
+ /**
+ * Stop watching.
+ *
+ * Params:
+ * watcher = Watcher.
+ */
+ void stop(ConnectionWatcher watcher)
+ {
+ if (!watcher.active)
+ {
+ return;
+ }
+ watcher.active = false;
+
+ reify(watcher, EventMask(Event.accept), EventMask(Event.none));
+ }
+
+ /**
+ * Should be called if the backend configuration changes.
+ *
+ * Params:
+ * watcher = Watcher.
+ * oldEvents = The events were already set.
+ * events = The events should be set.
+ *
+ * Returns: $(D_KEYWORD true) if the operation was successful.
+ */
+ abstract protected bool reify(ConnectionWatcher watcher,
+ EventMask oldEvents,
+ EventMask events);
+
+ /**
+ * Returns: The blocking time.
+ */
+ protected @property inout(Duration) blockTime()
+ inout @safe pure nothrow
+ {
+ // Don't block if we have to do.
+ return swapPendings.empty ? blockTime_ : Duration.zero;
+ }
+
+ /**
+ * Sets the blocking time for IO watchers.
+ *
+ * Params:
+ * blockTime = The blocking time. Cannot be larger than
+ * $(D_PSYMBOL maxBlockTime).
+ */
+ protected @property void blockTime(in Duration blockTime) @safe pure nothrow
+ in
+ {
+ assert(blockTime <= 1.dur!"hours", "Too long to wait.");
+ assert(!blockTime.isNegative);
+ }
+ body
+ {
+ blockTime_ = blockTime;
+ }
+
+ /**
+ * Kills the watcher and closes the connection.
+ */
+ protected void kill(IOWatcher watcher, SocketException exception)
+ {
+ watcher.socket.shutdown();
+ theAllocator.dispose(watcher.socket);
+ MmapPool.instance.dispose(watcher.transport);
+ watcher.exception = exception;
+ swapPendings.insertBack(watcher);
+ }
+
+ /**
+ * Does the actual polling.
+ */
+ abstract protected void poll();
+
+ /// Whether the event loop should be stopped.
+ private bool done_;
+
+ /// Maximal block time.
+ protected Duration blockTime_ = 1.dur!"minutes";
}
/**
@@ -287,18 +301,17 @@ abstract class Loop
*/
class BadLoopException : Exception
{
-@nogc:
- /**
- * Params:
- * file = The file where the exception occurred.
- * line = The line number where the exception occurred.
- * next = The previous exception in the chain of exceptions, if any.
- */
- this(string file = __FILE__, size_t line = __LINE__, Throwable next = null)
- pure @safe nothrow const
- {
- super("Event loop cannot be initialized.", file, line, next);
- }
+ /**
+ * Params:
+ * file = The file where the exception occurred.
+ * line = The line number where the exception occurred.
+ * next = The previous exception in the chain of exceptions, if any.
+ */
+ this(string file = __FILE__, size_t line = __LINE__, Throwable next = null)
+ pure nothrow const @safe @nogc
+ {
+ super("Event loop cannot be initialized.", file, line, next);
+ }
}
/**
@@ -310,24 +323,24 @@ class BadLoopException : Exception
*/
@property Loop defaultLoop()
{
- if (defaultLoop_ !is null)
- {
- return defaultLoop_;
- }
- version (Epoll)
- {
- defaultLoop_ = MmapPool.instance.make!EpollLoop;
- }
- else version (IOCP)
- {
- defaultLoop_ = MmapPool.instance.make!IOCPLoop;
- }
- else version (Kqueue)
- {
- import tanya.async.event.kqueue;
- defaultLoop_ = MmapPool.instance.make!KqueueLoop;
- }
- return defaultLoop_;
+ if (defaultLoop_ !is null)
+ {
+ return defaultLoop_;
+ }
+ version (Epoll)
+ {
+ defaultLoop_ = MmapPool.instance.make!EpollLoop;
+ }
+ else version (IOCP)
+ {
+ defaultLoop_ = MmapPool.instance.make!IOCPLoop;
+ }
+ else version (Kqueue)
+ {
+ import tanya.async.event.kqueue;
+ defaultLoop_ = MmapPool.instance.make!KqueueLoop;
+ }
+ return defaultLoop_;
}
/**
@@ -339,145 +352,16 @@ class BadLoopException : Exception
* your implementation to this property.
*
* Params:
- * loop = The event loop.
+ * loop = The event loop.
*/
@property void defaultLoop(Loop loop)
in
{
- assert(loop !is null);
+ assert(loop !is null);
}
body
{
- defaultLoop_ = loop;
+ defaultLoop_ = loop;
}
private Loop defaultLoop_;
-
-/**
- * Queue.
- *
- * Params:
- * T = Content type.
- */
-class PendingQueue(T)
-{
- /**
- * Creates a new $(D_PSYMBOL Queue).
- */
- this()
- {
- }
-
- /**
- * Removes all elements from the queue.
- */
- ~this()
- {
- foreach (e; this)
- {
- MmapPool.instance.dispose(e);
- }
- }
-
- /**
- * Returns: First element.
- */
- @property ref T front()
- in
- {
- assert(!empty);
- }
- body
- {
- return first.next.content;
- }
-
- /**
- * Inserts a new element.
- *
- * Params:
- * x = New element.
- *
- * Returns: $(D_KEYWORD this).
- */
- typeof(this) insertBack(T x)
- {
- Entry* temp = MmapPool.instance.make!Entry;
-
- temp.content = x;
-
- if (empty)
- {
- first.next = rear = temp;
- }
- else
- {
- rear.next = temp;
- rear = rear.next;
- }
-
- return this;
- }
-
- alias insert = insertBack;
-
- /**
- * Inserts a new element.
- *
- * Params:
- * x = New element.
- *
- * Returns: $(D_KEYWORD this).
- */
- typeof(this) opOpAssign(string Op)(ref T x)
- if (Op == "~")
- {
- return insertBack(x);
- }
-
- /**
- * Returns: $(D_KEYWORD true) if the queue is empty.
- */
- @property bool empty() const @safe pure nothrow
- {
- return first.next is null;
- }
-
- /**
- * Move position to the next element.
- *
- * Returns: $(D_KEYWORD this).
- */
- typeof(this) popFront()
- in
- {
- assert(!empty);
- }
- body
- {
- auto n = first.next.next;
-
- MmapPool.instance.dispose(first.next);
- first.next = n;
-
- return this;
- }
-
- /**
- * Queue entry.
- */
- protected struct Entry
- {
- /// Queue item content.
- T content;
-
- /// Next list item.
- Entry* next;
- }
-
- /// The first element of the list.
- protected Entry first;
-
- /// The last element of the list.
- protected Entry* rear;
-}
diff --git a/source/tanya/async/protocol.d b/source/tanya/async/protocol.d
index 62c0ab7..959937f 100644
--- a/source/tanya/async/protocol.d
+++ b/source/tanya/async/protocol.d
@@ -18,28 +18,28 @@ import tanya.async.transport;
*/
interface Protocol
{
- /**
- * Params:
- * data = Read data.
- */
- void received(ubyte[] data);
+ /**
+ * Params:
+ * data = Read data.
+ */
+ void received(ubyte[] data);
- /**
- * Called when a connection is made.
- *
- * Params:
- * transport = Protocol transport.
- */
- void connected(DuplexTransport transport);
+ /**
+ * Called when a connection is made.
+ *
+ * Params:
+ * transport = Protocol transport.
+ */
+ void connected(DuplexTransport transport);
- /**
- * Called when a connection is lost.
- *
- * Params:
- * exception = $(D_PSYMBOL Exception) if an error caused
- * the disconnect, $(D_KEYWORD null) otherwise.
- */
- void disconnected(SocketException exception = null);
+ /**
+ * Called when a connection is lost.
+ *
+ * Params:
+ * exception = $(D_PSYMBOL Exception) if an error caused
+ * the disconnect, $(D_KEYWORD null) otherwise.
+ */
+ void disconnected(SocketException exception = null);
}
/**
diff --git a/source/tanya/async/transport.d b/source/tanya/async/transport.d
index 6156815..8191678 100644
--- a/source/tanya/async/transport.d
+++ b/source/tanya/async/transport.d
@@ -31,13 +31,13 @@ interface ReadTransport : Transport
*/
interface WriteTransport : Transport
{
- /**
- * Write some data to the transport.
- *
- * Params:
- * data = Data to send.
- */
- void write(ubyte[] data);
+ /**
+ * Write some data to the transport.
+ *
+ * Params:
+ * data = Data to send.
+ */
+ void write(ubyte[] data);
}
/**
@@ -52,7 +52,7 @@ interface DuplexTransport : ReadTransport, WriteTransport
*/
interface SocketTransport : Transport
{
- @property inout(Socket) socket() inout pure nothrow @safe @nogc;
+ @property inout(Socket) socket() inout pure nothrow @safe @nogc;
}
/**
diff --git a/source/tanya/async/watcher.d b/source/tanya/async/watcher.d
index 260c6a3..9a9e441 100644
--- a/source/tanya/async/watcher.d
+++ b/source/tanya/async/watcher.d
@@ -10,23 +10,24 @@
*/
module tanya.async.watcher;
+import std.functional;
+import std.exception;
import tanya.async.loop;
import tanya.async.protocol;
import tanya.async.transport;
import tanya.container.buffer;
+import tanya.container.queue;
import tanya.memory;
import tanya.memory.mmappool;
import tanya.network.socket;
-import std.functional;
-import std.exception;
version (Windows)
{
- import core.sys.windows.basetyps;
- import core.sys.windows.mswsock;
- import core.sys.windows.winbase;
- import core.sys.windows.windef;
- import core.sys.windows.winsock2;
+ import core.sys.windows.basetyps;
+ import core.sys.windows.mswsock;
+ import core.sys.windows.winbase;
+ import core.sys.windows.windef;
+ import core.sys.windows.winsock2;
}
/**
@@ -35,85 +36,89 @@ version (Windows)
*/
abstract class Watcher
{
- /// Whether the watcher is active.
- bool active;
+ /// Whether the watcher is active.
+ bool active;
- /**
- * Invoke some action on event.
- */
- void invoke();
+ /**
+ * Invoke some action on event.
+ */
+ void invoke();
}
class ConnectionWatcher : Watcher
{
- /// Watched socket.
- private Socket socket_;
-
- /// Protocol factory.
- protected Protocol delegate() protocolFactory;
-
- package PendingQueue!IOWatcher incoming;
-
- /**
- * Params:
- * socket = Socket.
- */
- this(Socket socket)
- {
- socket_ = socket;
- incoming = MmapPool.instance.make!(PendingQueue!IOWatcher);
- }
-
- /// Ditto.
- protected this()
- {
- }
-
- ~this()
- {
- MmapPool.instance.dispose(incoming);
- }
-
- /*
- * Params:
- * P = Protocol should be used.
- */
- void setProtocol(P : Protocol)()
- {
- this.protocolFactory = () => cast(Protocol) MmapPool.instance.make!P;
- }
-
- /**
- * Returns: Socket.
- */
- @property inout(Socket) socket() inout pure nothrow @nogc
- {
- return socket_;
- }
-
- /**
- * Returns: New protocol instance.
- */
- @property Protocol protocol()
- in
- {
- assert(protocolFactory !is null, "Protocol isn't set.");
- }
- body
- {
- return protocolFactory();
- }
-
- /**
- * Invokes new connection callback.
- */
- override void invoke()
- {
- foreach (io; incoming)
- {
- io.protocol.connected(cast(DuplexTransport) io.transport);
- }
- }
+ /// Watched socket.
+ private Socket socket_;
+
+ /// Protocol factory.
+ protected Protocol delegate() protocolFactory;
+
+ package Queue!IOWatcher incoming;
+
+ /**
+ * Params:
+ * socket = Socket.
+ */
+ this(Socket socket)
+ {
+ socket_ = socket;
+ incoming = MmapPool.instance.make!(Queue!IOWatcher);
+ }
+
+ /// Ditto.
+ protected this()
+ {
+ }
+
+ ~this()
+ {
+ foreach (w; incoming)
+ {
+ MmapPool.instance.dispose(w);
+ }
+ MmapPool.instance.dispose(incoming);
+ }
+
+ /*
+ * Params:
+ * P = Protocol should be used.
+ */
+ void setProtocol(P : Protocol)()
+ {
+ this.protocolFactory = () => cast(Protocol) MmapPool.instance.make!P;
+ }
+
+ /**
+ * Returns: Socket.
+ */
+ @property inout(Socket) socket() inout pure nothrow @nogc
+ {
+ return socket_;
+ }
+
+ /**
+ * Returns: New protocol instance.
+ */
+ @property Protocol protocol()
+ in
+ {
+ assert(protocolFactory !is null, "Protocol isn't set.");
+ }
+ body
+ {
+ return protocolFactory();
+ }
+
+ /**
+ * Invokes new connection callback.
+ */
+ override void invoke()
+ {
+ foreach (io; incoming)
+ {
+ io.protocol.connected(cast(DuplexTransport) io.transport);
+ }
+ }
}
/**
@@ -122,121 +127,121 @@ class ConnectionWatcher : Watcher
*/
class IOWatcher : ConnectionWatcher
{
- /// If an exception was thrown the transport should be already invalid.
- private union
- {
- StreamTransport transport_;
- SocketException exception_;
- }
-
- private Protocol protocol_;
-
- /**
- * Returns: Underlying output buffer.
- */
- package ReadBuffer output;
-
- /**
- * Params:
- * transport = Transport.
- * protocol = New instance of the application protocol.
- */
- this(StreamTransport transport, Protocol protocol)
- in
- {
- assert(transport !is null);
- assert(protocol !is null);
- }
- body
- {
- super();
- transport_ = transport;
- protocol_ = protocol;
- output = MmapPool.instance.make!ReadBuffer();
- active = true;
- }
-
- /**
- * Destroys the watcher.
- */
- protected ~this()
- {
- MmapPool.instance.dispose(output);
- MmapPool.instance.dispose(protocol_);
- }
-
- /**
- * Assigns a transport.
- *
- * Params:
- * transport = Transport.
- * protocol = Application protocol.
- *
- * Returns: $(D_KEYWORD this).
- */
- IOWatcher opCall(StreamTransport transport, Protocol protocol) pure nothrow @nogc
- in
- {
- assert(transport !is null);
- assert(protocol !is null);
- }
- body
- {
- transport_ = transport;
- protocol_ = protocol;
- active = true;
- return this;
- }
-
- /**
- * Returns: Transport used by this watcher.
- */
- @property inout(StreamTransport) transport() inout pure nothrow @nogc
- {
- return transport_;
- }
-
- /**
- * Sets an exception occurred during a read/write operation.
- *
- * Params:
- * exception = Thrown exception.
- */
- @property void exception(SocketException exception) pure nothrow @nogc
- {
- exception_ = exception;
- }
-
- /**
- * Returns: Application protocol.
- */
- override @property Protocol protocol() pure nothrow @safe @nogc
- {
- return protocol_;
- }
-
- /**
- * Returns: Socket.
- */
- override @property inout(Socket) socket() inout pure nothrow @nogc
- {
- return transport.socket;
- }
-
- /**
- * Invokes the watcher callback.
- */
- override void invoke()
- {
- if (output.length)
- {
- protocol.received(output[0..$]);
- output.clear();
- }
- else
- {
- protocol.disconnected(exception_);
- active = false;
- }
- }
+ /// If an exception was thrown the transport should be already invalid.
+ private union
+ {
+ StreamTransport transport_;
+ SocketException exception_;
+ }
+
+ private Protocol protocol_;
+
+ /**
+ * Returns: Underlying output buffer.
+ */
+ package ReadBuffer output;
+
+ /**
+ * Params:
+ * transport = Transport.
+ * protocol = New instance of the application protocol.
+ */
+ this(StreamTransport transport, Protocol protocol)
+ in
+ {
+ assert(transport !is null);
+ assert(protocol !is null);
+ }
+ body
+ {
+ super();
+ transport_ = transport;
+ protocol_ = protocol;
+ output = MmapPool.instance.make!ReadBuffer();
+ active = true;
+ }
+
+ /**
+ * Destroys the watcher.
+ */
+ protected ~this()
+ {
+ MmapPool.instance.dispose(output);
+ MmapPool.instance.dispose(protocol_);
+ }
+
+ /**
+ * Assigns a transport.
+ *
+ * Params:
+ * transport = Transport.
+ * protocol = Application protocol.
+ *
+ * Returns: $(D_KEYWORD this).
+ */
+ IOWatcher opCall(StreamTransport transport, Protocol protocol) pure nothrow @nogc
+ in
+ {
+ assert(transport !is null);
+ assert(protocol !is null);
+ }
+ body
+ {
+ transport_ = transport;
+ protocol_ = protocol;
+ active = true;
+ return this;
+ }
+
+ /**
+ * Returns: Transport used by this watcher.
+ */
+ @property inout(StreamTransport) transport() inout pure nothrow @nogc
+ {
+ return transport_;
+ }
+
+ /**
+ * Sets an exception occurred during a read/write operation.
+ *
+ * Params:
+ * exception = Thrown exception.
+ */
+ @property void exception(SocketException exception) pure nothrow @nogc
+ {
+ exception_ = exception;
+ }
+
+ /**
+ * Returns: Application protocol.
+ */
+ override @property Protocol protocol() pure nothrow @safe @nogc
+ {
+ return protocol_;
+ }
+
+ /**
+ * Returns: Socket.
+ */
+ override @property inout(Socket) socket() inout pure nothrow @nogc
+ {
+ return transport.socket;
+ }
+
+ /**
+ * Invokes the watcher callback.
+ */
+ override void invoke()
+ {
+ if (output.length)
+ {
+ protocol.received(output[0..$]);
+ output.clear();
+ }
+ else
+ {
+ protocol.disconnected(exception_);
+ active = false;
+ }
+ }
}
diff --git a/source/tanya/container/queue.d b/source/tanya/container/queue.d
index ef09da2..cee163d 100644
--- a/source/tanya/container/queue.d
+++ b/source/tanya/container/queue.d
@@ -121,7 +121,7 @@ class Queue(T)
/**
* Returns: $(D_KEYWORD true) if the queue is empty.
*/
- @property bool empty() inout const
+ @property bool empty() inout const pure nothrow @safe @nogc
{
return first.next is null;
}
@@ -170,7 +170,8 @@ class Queue(T)
}
/**
- * $(D_KEYWORD foreach) iteration.
+ * $(D_KEYWORD foreach) iteration. The elements will be automatically
+ * dequeued.
*
* Params:
* dg = $(D_KEYWORD foreach) body.