diff options
| author | Eugen Wissner <belka@caraus.de> | 2016-12-02 19:18:37 +0100 |
|---|---|---|
| committer | Eugen Wissner <belka@caraus.de> | 2016-12-02 19:18:37 +0100 |
| commit | f7f92e79066350cf7167700bb5a019994a4fdb63 (patch) | |
| tree | 6b4aa79dd1adea7616423a74f108b8c98675b68c /source | |
| parent | 1123d01e6c205796db376d4404cd6c53962788de (diff) | |
| download | tanya-f7f92e79066350cf7167700bb5a019994a4fdb63.tar.gz | |
Switch to container.queue. Remove PendingQueue
Diffstat (limited to 'source')
| -rw-r--r-- | source/tanya/async/event/epoll.d | 294 | ||||
| -rw-r--r-- | source/tanya/async/event/iocp.d | 522 | ||||
| -rw-r--r-- | source/tanya/async/event/kqueue.d | 560 | ||||
| -rw-r--r-- | source/tanya/async/event/selector.d | 426 | ||||
| -rw-r--r-- | source/tanya/async/iocp.d | 12 | ||||
| -rw-r--r-- | source/tanya/async/loop.d | 612 | ||||
| -rw-r--r-- | source/tanya/async/protocol.d | 40 | ||||
| -rw-r--r-- | source/tanya/async/transport.d | 16 | ||||
| -rw-r--r-- | source/tanya/async/watcher.d | 401 | ||||
| -rw-r--r-- | source/tanya/container/queue.d | 5 |
10 files changed, 1389 insertions, 1499 deletions
diff --git a/source/tanya/async/event/epoll.d b/source/tanya/async/event/epoll.d index 7fec708..7717b52 100644 --- a/source/tanya/async/event/epoll.d +++ b/source/tanya/async/event/epoll.d @@ -28,151 +28,151 @@ import std.algorithm.comparison; class EpollLoop : SelectorLoop
{
- protected int fd;
- private epoll_event[] events;
-
- /**
- * Initializes the loop.
- */
- this()
- {
- if ((fd = epoll_create1(EPOLL_CLOEXEC)) < 0)
- {
- throw MmapPool.instance.make!BadLoopException("epoll initialization failed");
- }
- super();
- events = MmapPool.instance.makeArray!epoll_event(maxEvents);
- }
-
- /**
- * Free loop internals.
- */
- ~this()
- {
- MmapPool.instance.dispose(events);
- close(fd);
- }
-
- /**
- * Should be called if the backend configuration changes.
- *
- * Params:
- * watcher = Watcher.
- * oldEvents = The events were already set.
- * events = The events should be set.
- *
- * Returns: $(D_KEYWORD true) if the operation was successful.
- */
- protected override bool reify(ConnectionWatcher watcher, EventMask oldEvents, EventMask events)
- in
- {
- assert(watcher !is null);
- }
- body
- {
- int op = EPOLL_CTL_DEL;
- epoll_event ev;
-
- if (events == oldEvents)
- {
- return true;
- }
- if (events && oldEvents)
- {
- op = EPOLL_CTL_MOD;
- }
- else if (events && !oldEvents)
- {
- op = EPOLL_CTL_ADD;
- }
-
- ev.data.fd = watcher.socket.handle;
- ev.events = (events & (Event.read | Event.accept) ? EPOLLIN | EPOLLPRI : 0)
- | (events & Event.write ? EPOLLOUT : 0)
- | EPOLLET;
-
- return epoll_ctl(fd, op, watcher.socket.handle, &ev) == 0;
- }
-
- /**
- * Does the actual polling.
- */
- protected override void poll()
- {
- // Don't block
- immutable timeout = cast(immutable int) blockTime.total!"msecs";
- auto eventCount = epoll_wait(fd, events.ptr, maxEvents, timeout);
-
- if (eventCount < 0)
- {
- if (errno != EINTR)
- {
- throw theAllocator.make!BadLoopException();
- }
- return;
- }
-
- for (auto i = 0; i < eventCount; ++i)
- {
- auto io = cast(IOWatcher) connections[events[i].data.fd];
-
- if (io is null)
- {
- acceptConnections(connections[events[i].data.fd]);
- }
- else if (events[i].events & EPOLLERR)
- {
- kill(io, null);
- }
- else if (events[i].events & (EPOLLIN | EPOLLPRI | EPOLLHUP))
- {
- auto transport = cast(SelectorStreamTransport) io.transport;
- assert(transport !is null);
-
- SocketException exception;
- try
- {
- ptrdiff_t received;
- do
- {
- received = transport.socket.receive(io.output[]);
- io.output += received;
- }
- while (received);
- }
- catch (SocketException e)
- {
- exception = e;
- }
- if (transport.socket.disconnected)
- {
- kill(io, exception);
- }
- else if (io.output.length)
- {
- swapPendings.insertBack(io);
- }
- }
- else if (events[i].events & EPOLLOUT)
- {
- auto transport = cast(SelectorStreamTransport) io.transport;
- assert(transport !is null);
-
- transport.writeReady = true;
- if (transport.input.length)
- {
- feed(transport);
- }
- }
- }
- }
-
- /**
- * Returns: The blocking time.
- */
- override protected @property inout(Duration) blockTime()
- inout @safe pure nothrow
- {
- return min(super.blockTime, 1.dur!"seconds");
- }
+ protected int fd;
+ private epoll_event[] events;
+
+ /**
+ * Initializes the loop.
+ */
+ this()
+ {
+ if ((fd = epoll_create1(EPOLL_CLOEXEC)) < 0)
+ {
+ throw MmapPool.instance.make!BadLoopException("epoll initialization failed");
+ }
+ super();
+ events = MmapPool.instance.makeArray!epoll_event(maxEvents);
+ }
+
+ /**
+ * Free loop internals.
+ */
+ ~this()
+ {
+ MmapPool.instance.dispose(events);
+ close(fd);
+ }
+
+ /**
+ * Should be called if the backend configuration changes.
+ *
+ * Params:
+ * watcher = Watcher.
+ * oldEvents = The events were already set.
+ * events = The events should be set.
+ *
+ * Returns: $(D_KEYWORD true) if the operation was successful.
+ */
+ protected override bool reify(ConnectionWatcher watcher, EventMask oldEvents, EventMask events)
+ in
+ {
+ assert(watcher !is null);
+ }
+ body
+ {
+ int op = EPOLL_CTL_DEL;
+ epoll_event ev;
+
+ if (events == oldEvents)
+ {
+ return true;
+ }
+ if (events && oldEvents)
+ {
+ op = EPOLL_CTL_MOD;
+ }
+ else if (events && !oldEvents)
+ {
+ op = EPOLL_CTL_ADD;
+ }
+
+ ev.data.fd = watcher.socket.handle;
+ ev.events = (events & (Event.read | Event.accept) ? EPOLLIN | EPOLLPRI : 0)
+ | (events & Event.write ? EPOLLOUT : 0)
+ | EPOLLET;
+
+ return epoll_ctl(fd, op, watcher.socket.handle, &ev) == 0;
+ }
+
+ /**
+ * Does the actual polling.
+ */
+ protected override void poll()
+ {
+ // Don't block
+ immutable timeout = cast(immutable int) blockTime.total!"msecs";
+ auto eventCount = epoll_wait(fd, events.ptr, maxEvents, timeout);
+
+ if (eventCount < 0)
+ {
+ if (errno != EINTR)
+ {
+ throw theAllocator.make!BadLoopException();
+ }
+ return;
+ }
+
+ for (auto i = 0; i < eventCount; ++i)
+ {
+ auto io = cast(IOWatcher) connections[events[i].data.fd];
+
+ if (io is null)
+ {
+ acceptConnections(connections[events[i].data.fd]);
+ }
+ else if (events[i].events & EPOLLERR)
+ {
+ kill(io, null);
+ }
+ else if (events[i].events & (EPOLLIN | EPOLLPRI | EPOLLHUP))
+ {
+ auto transport = cast(SelectorStreamTransport) io.transport;
+ assert(transport !is null);
+
+ SocketException exception;
+ try
+ {
+ ptrdiff_t received;
+ do
+ {
+ received = transport.socket.receive(io.output[]);
+ io.output += received;
+ }
+ while (received);
+ }
+ catch (SocketException e)
+ {
+ exception = e;
+ }
+ if (transport.socket.disconnected)
+ {
+ kill(io, exception);
+ }
+ else if (io.output.length)
+ {
+ swapPendings.insertBack(io);
+ }
+ }
+ else if (events[i].events & EPOLLOUT)
+ {
+ auto transport = cast(SelectorStreamTransport) io.transport;
+ assert(transport !is null);
+
+ transport.writeReady = true;
+ if (transport.input.length)
+ {
+ feed(transport);
+ }
+ }
+ }
+ }
+
+ /**
+ * Returns: The blocking time.
+ */
+ override protected @property inout(Duration) blockTime()
+ inout @safe pure nothrow
+ {
+ return min(super.blockTime, 1.dur!"seconds");
+ }
}
diff --git a/source/tanya/async/event/iocp.d b/source/tanya/async/event/iocp.d index bcfda71..935dd5e 100644 --- a/source/tanya/async/event/iocp.d +++ b/source/tanya/async/event/iocp.d @@ -28,267 +28,267 @@ import core.sys.windows.winsock2; class IOCPStreamTransport : StreamTransport { - private OverlappedConnectedSocket socket_; - - private WriteBuffer input; - - /** - * Creates new completion port transport. - * Params: - * socket = Socket. - */ - this(OverlappedConnectedSocket socket) - in - { - assert(socket !is null); - } - body - { - socket_ = socket; - input = MmapPool.instance.make!WriteBuffer(); - } - - ~this() - { - MmapPool.instance.dispose(input); - } - - @property inout(OverlappedConnectedSocket) socket() inout pure nothrow @safe @nogc - { - return socket_; - } - - /** - * Write some data to the transport. - * - * Params: - * data = Data to send. - */ - void write(ubyte[] data) - { - immutable empty = input.length == 0; - input ~= data; - if (empty) - { - SocketState overlapped; - try - { - overlapped = MmapPool.instance.make!SocketState; - socket.beginSend(input[], overlapped); - } - catch (SocketException e) - { - MmapPool.instance.dispose(overlapped); - MmapPool.instance.dispose(e); - } - } - } + private OverlappedConnectedSocket socket_; + + private WriteBuffer input; + + /** + * Creates new completion port transport. + * Params: + * socket = Socket. + */ + this(OverlappedConnectedSocket socket) + in + { + assert(socket !is null); + } + body + { + socket_ = socket; + input = MmapPool.instance.make!WriteBuffer(); + } + + ~this() + { + MmapPool.instance.dispose(input); + } + + @property inout(OverlappedConnectedSocket) socket() inout pure nothrow @safe @nogc + { + return socket_; + } + + /** + * Write some data to the transport. + * + * Params: + * data = Data to send. + */ + void write(ubyte[] data) + { + immutable empty = input.length == 0; + input ~= data; + if (empty) + { + SocketState overlapped; + try + { + overlapped = MmapPool.instance.make!SocketState; + socket.beginSend(input[], overlapped); + } + catch (SocketException e) + { + MmapPool.instance.dispose(overlapped); + MmapPool.instance.dispose(e); + } + } + } } -class IOCPLoop : Loop -{ - protected HANDLE completionPort; - - protected OVERLAPPED overlap; - - /** - * Initializes the loop. - */ - this() - { - super(); - - completionPort = CreateIoCompletionPort(INVALID_HANDLE_VALUE, NULL, 0, 0); - if (!completionPort) - { - throw theAllocator.make!BadLoopException("Creating completion port failed"); - } - } - - /** - * Should be called if the backend configuration changes. - * - * Params: - * watcher = Watcher. - * oldEvents = The events were already set. - * events = The events should be set. - * - * Returns: $(D_KEYWORD true) if the operation was successful. - */ - override protected bool reify(ConnectionWatcher watcher, - EventMask oldEvents, - EventMask events) - { - SocketState overlapped; - if (!(oldEvents & Event.accept) && (events & Event.accept)) - { - auto socket = cast(OverlappedStreamSocket) watcher.socket; - assert(socket !is null); - - if (CreateIoCompletionPort(cast(HANDLE) socket.handle, - completionPort, - cast(ULONG_PTR) (cast(void*) watcher), - 0) !is completionPort) - { - return false; - } - - try - { - overlapped = MmapPool.instance.make!SocketState; - socket.beginAccept(overlapped); - } - catch (SocketException e) - { - MmapPool.instance.dispose(overlapped); - theAllocator.dispose(e); - return false; - } - } - if (!(oldEvents & Event.read) && (events & Event.read) - || !(oldEvents & Event.write) && (events & Event.write)) - { - auto io = cast(IOWatcher) watcher; - assert(io !is null); - - auto transport = cast(IOCPStreamTransport) io.transport; - assert(transport !is null); - - if (CreateIoCompletionPort(cast(HANDLE) transport.socket.handle, - completionPort, - cast(ULONG_PTR) (cast(void*) watcher), - 0) !is completionPort) - { - return false; - } - - // Begin to read - if (!(oldEvents & Event.read) && (events & Event.read)) - { - try - { - overlapped = MmapPool.instance.make!SocketState; - transport.socket.beginReceive(io.output[], overlapped); - } - catch (SocketException e) - { - MmapPool.instance.dispose(overlapped); - theAllocator.dispose(e); - return false; - } - } - } - return true; - } - - /** - * Does the actual polling. - */ - override protected void poll() - { - DWORD lpNumberOfBytes; - ULONG_PTR key; - LPOVERLAPPED overlap; - immutable timeout = cast(immutable int) blockTime.total!"msecs"; - - auto result = GetQueuedCompletionStatus(completionPort, - &lpNumberOfBytes, - &key, - &overlap, - timeout); - if (result == FALSE && overlap == NULL) - { - return; // Timeout - } - - auto overlapped = (cast(SocketState) ((cast(void*) overlap) - 8)); - assert(overlapped !is null); - scope (failure) - { - MmapPool.instance.dispose(overlapped); - } - - switch (overlapped.event) - { - case OverlappedSocketEvent.accept: - auto connection = cast(ConnectionWatcher) (cast(void*) key); - assert(connection !is null); - - auto listener = cast(OverlappedStreamSocket) connection.socket; - assert(listener !is null); - - auto socket = listener.endAccept(overlapped); - auto transport = MmapPool.instance.make!IOCPStreamTransport(socket); - auto io = MmapPool.instance.make!IOWatcher(transport, connection.protocol); - - connection.incoming.insertBack(io); - - reify(io, EventMask(Event.none), EventMask(Event.read, Event.write)); - - swapPendings.insertBack(connection); - listener.beginAccept(overlapped); - break; - case OverlappedSocketEvent.read: - auto io = cast(IOWatcher) (cast(void*) key); - assert(io !is null); - if (!io.active) - { - MmapPool.instance.dispose(io); - MmapPool.instance.dispose(overlapped); - return; - } - - auto transport = cast(IOCPStreamTransport) io.transport; - assert(transport !is null); - - int received; - SocketException exception; - try - { - received = transport.socket.endReceive(overlapped); - } - catch (SocketException e) - { - exception = e; - } - if (transport.socket.disconnected) - { - // We want to get one last notification to destroy the watcher - transport.socket.beginReceive(io.output[], overlapped); - kill(io, exception); - } - else if (received > 0) - { - immutable full = io.output.free == received; - - io.output += received; - // Receive was interrupted because the buffer is full. We have to continue - if (full) - { - transport.socket.beginReceive(io.output[], overlapped); - } - swapPendings.insertBack(io); - } - break; - case OverlappedSocketEvent.write: - auto io = cast(IOWatcher) (cast(void*) key); - assert(io !is null); - - auto transport = cast(IOCPStreamTransport) io.transport; - assert(transport !is null); - - transport.input += transport.socket.endSend(overlapped); - if (transport.input.length) - { - transport.socket.beginSend(transport.input[], overlapped); - } - else - { - transport.socket.beginReceive(io.output[], overlapped); - } - break; - default: - assert(false, "Unknown event"); - } - } + class IOCPLoop : Loop + { + protected HANDLE completionPort; + + protected OVERLAPPED overlap; + + /** + * Initializes the loop. + */ + this() + { + super(); + + completionPort = CreateIoCompletionPort(INVALID_HANDLE_VALUE, NULL, 0, 0); + if (!completionPort) + { + throw theAllocator.make!BadLoopException("Creating completion port failed"); + } + } + + /** + * Should be called if the backend configuration changes. + * + * Params: + * watcher = Watcher. + * oldEvents = The events were already set. + * events = The events should be set. + * + * Returns: $(D_KEYWORD true) if the operation was successful. + */ + override protected bool reify(ConnectionWatcher watcher, + EventMask oldEvents, + EventMask events) + { + SocketState overlapped; + if (!(oldEvents & Event.accept) && (events & Event.accept)) + { + auto socket = cast(OverlappedStreamSocket) watcher.socket; + assert(socket !is null); + + if (CreateIoCompletionPort(cast(HANDLE) socket.handle, + completionPort, + cast(ULONG_PTR) (cast(void*) watcher), + 0) !is completionPort) + { + return false; + } + + try + { + overlapped = MmapPool.instance.make!SocketState; + socket.beginAccept(overlapped); + } + catch (SocketException e) + { + MmapPool.instance.dispose(overlapped); + theAllocator.dispose(e); + return false; + } + } + if (!(oldEvents & Event.read) && (events & Event.read) + || !(oldEvents & Event.write) && (events & Event.write)) + { + auto io = cast(IOWatcher) watcher; + assert(io !is null); + + auto transport = cast(IOCPStreamTransport) io.transport; + assert(transport !is null); + + if (CreateIoCompletionPort(cast(HANDLE) transport.socket.handle, + completionPort, + cast(ULONG_PTR) (cast(void*) watcher), + 0) !is completionPort) + { + return false; + } + + // Begin to read + if (!(oldEvents & Event.read) && (events & Event.read)) + { + try + { + overlapped = MmapPool.instance.make!SocketState; + transport.socket.beginReceive(io.output[], overlapped); + } + catch (SocketException e) + { + MmapPool.instance.dispose(overlapped); + theAllocator.dispose(e); + return false; + } + } + } + return true; + } + + /** + * Does the actual polling. + */ + override protected void poll() + { + DWORD lpNumberOfBytes; + ULONG_PTR key; + LPOVERLAPPED overlap; + immutable timeout = cast(immutable int) blockTime.total!"msecs"; + + auto result = GetQueuedCompletionStatus(completionPort, + &lpNumberOfBytes, + &key, + &overlap, + timeout); + if (result == FALSE && overlap == NULL) + { + return; // Timeout + } + + auto overlapped = (cast(SocketState) ((cast(void*) overlap) - 8)); + assert(overlapped !is null); + scope (failure) + { + MmapPool.instance.dispose(overlapped); + } + + switch (overlapped.event) + { + case OverlappedSocketEvent.accept: + auto connection = cast(ConnectionWatcher) (cast(void*) key); + assert(connection !is null); + + auto listener = cast(OverlappedStreamSocket) connection.socket; + assert(listener !is null); + + auto socket = listener.endAccept(overlapped); + auto transport = MmapPool.instance.make!IOCPStreamTransport(socket); + auto io = MmapPool.instance.make!IOWatcher(transport, connection.protocol); + + connection.incoming.insertBack(io); + + reify(io, EventMask(Event.none), EventMask(Event.read, Event.write)); + + swapPendings.insertBack(connection); + listener.beginAccept(overlapped); + break; + case OverlappedSocketEvent.read: + auto io = cast(IOWatcher) (cast(void*) key); + assert(io !is null); + if (!io.active) + { + MmapPool.instance.dispose(io); + MmapPool.instance.dispose(overlapped); + return; + } + + auto transport = cast(IOCPStreamTransport) io.transport; + assert(transport !is null); + + int received; + SocketException exception; + try + { + received = transport.socket.endReceive(overlapped); + } + catch (SocketException e) + { + exception = e; + } + if (transport.socket.disconnected) + { + // We want to get one last notification to destroy the watcher + transport.socket.beginReceive(io.output[], overlapped); + kill(io, exception); + } + else if (received > 0) + { + immutable full = io.output.free == received; + + io.output += received; + // Receive was interrupted because the buffer is full. We have to continue + if (full) + { + transport.socket.beginReceive(io.output[], overlapped); + } + swapPendings.insertBack(io); + } + break; + case OverlappedSocketEvent.write: + auto io = cast(IOWatcher) (cast(void*) key); + assert(io !is null); + + auto transport = cast(IOCPStreamTransport) io.transport; + assert(transport !is null); + + transport.input += transport.socket.endSend(overlapped); + if (transport.input.length) + { + transport.socket.beginSend(transport.input[], overlapped); + } + else + { + transport.socket.beginReceive(io.output[], overlapped); + } + break; + default: + assert(false, "Unknown event"); + } + } } diff --git a/source/tanya/async/event/kqueue.d b/source/tanya/async/event/kqueue.d index 773ea12..fa9261f 100644 --- a/source/tanya/async/event/kqueue.d +++ b/source/tanya/async/event/kqueue.d @@ -12,117 +12,117 @@ module tanya.async.event.kqueue; version (OSX) { - version = MissingKevent; + version = MissingKevent; } else version (iOS) { - version = MissingKevent; + version = MissingKevent; } else version (TVOS) { - version = MissingKevent; + version = MissingKevent; } else version (WatchOS) { - version = MissingKevent; + version = MissingKevent; } else version (OpenBSD) { - version = MissingKevent; + version = MissingKevent; } else version (DragonFlyBSD) { - version = MissingKevent; + version = MissingKevent; } version (MissingKevent) { - extern (C): - nothrow: - @nogc: - - import core.stdc.stdint; // intptr_t, uintptr_t - import core.sys.posix.time; // timespec - - enum : short - { - EVFILT_READ = -1, - EVFILT_WRITE = -2, - EVFILT_AIO = -3, /* attached to aio requests */ - EVFILT_VNODE = -4, /* attached to vnodes */ - EVFILT_PROC = -5, /* attached to struct proc */ - EVFILT_SIGNAL = -6, /* attached to struct proc */ - EVFILT_TIMER = -7, /* timers */ - EVFILT_MACHPORT = -8, /* Mach portsets */ - EVFILT_FS = -9, /* filesystem events */ - EVFILT_USER = -10, /* User events */ - EVFILT_VM = -12, /* virtual memory events */ - EVFILT_SYSCOUNT = 11 - } - - extern(D) void EV_SET(kevent_t* kevp, typeof(kevent_t.tupleof) args) - { - *kevp = kevent_t(args); - } - - struct kevent_t - { - uintptr_t ident; /* identifier for this event */ - short filter; /* filter for event */ - ushort flags; - uint fflags; - intptr_t data; - void *udata; /* opaque user data identifier */ - } - - enum - { - /* actions */ - EV_ADD = 0x0001, /* add event to kq (implies enable) */ - EV_DELETE = 0x0002, /* delete event from kq */ - EV_ENABLE = 0x0004, /* enable event */ - EV_DISABLE = 0x0008, /* disable event (not reported) */ - - /* flags */ - EV_ONESHOT = 0x0010, /* only report one occurrence */ - EV_CLEAR = 0x0020, /* clear event state after reporting */ - EV_RECEIPT = 0x0040, /* force EV_ERROR on success, data=0 */ - EV_DISPATCH = 0x0080, /* disable event after reporting */ - - EV_SYSFLAGS = 0xF000, /* reserved by system */ - EV_FLAG1 = 0x2000, /* filter-specific flag */ - - /* returned values */ - EV_EOF = 0x8000, /* EOF detected */ - EV_ERROR = 0x4000, /* error, data contains errno */ - } - - int kqueue(); - int kevent(int kq, const kevent_t *changelist, int nchanges, - kevent_t *eventlist, int nevents, - const timespec *timeout); + extern (C): + nothrow: + @nogc: + + import core.stdc.stdint; // intptr_t, uintptr_t + import core.sys.posix.time; // timespec + + enum : short + { + EVFILT_READ = -1, + EVFILT_WRITE = -2, + EVFILT_AIO = -3, /* attached to aio requests */ + EVFILT_VNODE = -4, /* attached to vnodes */ + EVFILT_PROC = -5, /* attached to struct proc */ + EVFILT_SIGNAL = -6, /* attached to struct proc */ + EVFILT_TIMER = -7, /* timers */ + EVFILT_MACHPORT = -8, /* Mach portsets */ + EVFILT_FS = -9, /* filesystem events */ + EVFILT_USER = -10, /* User events */ + EVFILT_VM = -12, /* virtual memory events */ + EVFILT_SYSCOUNT = 11 + } + + extern(D) void EV_SET(kevent_t* kevp, typeof(kevent_t.tupleof) args) + { + *kevp = kevent_t(args); + } + + struct kevent_t + { + uintptr_t ident; /* identifier for this event */ + short filter; /* filter for event */ + ushort flags; + uint fflags; + intptr_t data; + void *udata; /* opaque user data identifier */ + } + + enum + { + /* actions */ + EV_ADD = 0x0001, /* add event to kq (implies enable) */ + EV_DELETE = 0x0002, /* delete event from kq */ + EV_ENABLE = 0x0004, /* enable event */ + EV_DISABLE = 0x0008, /* disable event (not reported) */ + + /* flags */ + EV_ONESHOT = 0x0010, /* only report one occurrence */ + EV_CLEAR = 0x0020, /* clear event state after reporting */ + EV_RECEIPT = 0x0040, /* force EV_ERROR on success, data=0 */ + EV_DISPATCH = 0x0080, /* disable event after reporting */ + + EV_SYSFLAGS = 0xF000, /* reserved by system */ + EV_FLAG1 = 0x2000, /* filter-specific flag */ + + /* returned values */ + EV_EOF = 0x8000, /* EOF detected */ + EV_ERROR = 0x4000, /* error, data contains errno */ + } + + int kqueue(); + int kevent(int kq, const kevent_t *changelist, int nchanges, + kevent_t *eventlist, int nevents, + const timespec *timeout); } version (OSX) { - version = MacBSD; + version = MacBSD; } else version (iOS) { - version = MacBSD; + version = MacBSD; } else version (FreeBSD) { - version = MacBSD; - public import core.sys.freebsd.sys.event; + version = MacBSD; + public import core.sys.freebsd.sys.event; } else version (OpenBSD) { - version = MacBSD; + version = MacBSD; } else version (DragonFlyBSD) { - version = MacBSD; + version = MacBSD; } version (MacBSD): @@ -142,208 +142,208 @@ import std.algorithm.comparison; class KqueueLoop : SelectorLoop { - protected int fd; - private kevent_t[] events; - private kevent_t[] changes; - private size_t changeCount; - - /** - * Returns: Maximal event count can be got at a time - * (should be supported by the backend). - */ - override protected @property inout(uint) maxEvents() inout const pure nothrow @safe @nogc - { - return cast(uint) events.length; - } - - this() - { - super(); - - if ((fd = kqueue()) == -1) - { - throw MmapPool.instance.make!BadLoopException("epoll initialization failed"); - } - events = MmapPool.instance.makeArray!kevent_t(64); - changes = MmapPool.instance.makeArray!kevent_t(64); - } - - /** - * Free loop internals. - */ - ~this() - { - MmapPool.instance.dispose(events); - MmapPool.instance.dispose(changes); - close(fd); - } - - private void set(socket_t socket, short filter, ushort flags) - { - if (changes.length <= changeCount) - { - MmapPool.instance.resizeArray(changes, changeCount + maxEvents); - } - EV_SET(&changes[changeCount], - cast(ulong) socket, - filter, - flags, - 0U, - 0L, - null); - ++changeCount; - } - - /** - * Should be called if the backend configuration changes. - * - * Params: - * watcher = Watcher. - * oldEvents = The events were already set. - * events = The events should be set. - * - * Returns: $(D_KEYWORD true) if the operation was successful. - */ - override protected bool reify(ConnectionWatcher watcher, - EventMask oldEvents, - EventMask events) - { - if (events != oldEvents) - { - if (oldEvents & Event.read || oldEvents & Event.accept) - { - set(watcher.socket.handle, EVFILT_READ, EV_DELETE); - } - if (oldEvents & Event.write) - { - set(watcher.socket.handle, EVFILT_WRITE, EV_DELETE); - } - } - if (events & (Event.read | events & Event.accept)) - { - set(watcher.socket.handle, EVFILT_READ, EV_ADD | EV_ENABLE); - } - if (events & Event.write) - { - set(watcher.socket.handle, EVFILT_WRITE, EV_ADD | EV_DISPATCH); - } - return true; - } - - /** - * Does the actual polling. - */ - protected override void poll() - { - timespec ts; - blockTime.split!("seconds", "nsecs")(ts.tv_sec, ts.tv_nsec); - - if (changeCount > maxEvents) - { - MmapPool.instance.resizeArray(events, changes.length); - } - - auto eventCount = kevent(fd, changes.ptr, cast(int) changeCount, events.ptr, maxEvents, &ts); - changeCount = 0; - - if (eventCount < 0) - { - if (errno != EINTR) - { - throw theAllocator.make!BadLoopException(); - } - return; - } - - for (int i; i < eventCount; ++i) - { - assert(connections.length > events[i].ident); - - IOWatcher io = cast(IOWatcher) connections[events[i].ident]; - // If it is a ConnectionWatcher. Accept connections. - if (io is null) - { - acceptConnections(connections[events[i].ident]); - } - else if (events[i].flags & EV_ERROR) - { - kill(io, null); - } - else if (events[i].filter == EVFILT_READ) - { - auto transport = cast(SelectorStreamTransport) io.transport; - assert(transport !is null); - - SocketException exception; - try - { - ptrdiff_t received; - do - { - received = transport.socket.receive(io.output[]); - io.output += received; - } - while (received); - } - catch (SocketException e) - { - exception = e; - } - if (transport.socket.disconnected) - { - kill(io, exception); - } - else if (io.output.length) - { - swapPendings.insertBack(io); - } - } - else if (events[i].filter == EVFILT_WRITE) - { - auto transport = cast(SelectorStreamTransport) io.transport; - assert(transport !is null); - - transport.writeReady = true; - if (transport.input.length) - { - feed(transport); - } - } - } - } - - /** - * Returns: The blocking time. - */ - override protected @property inout(Duration) blockTime() - inout @safe pure nothrow - { - return min(super.blockTime, 1.dur!"seconds"); - } - - /** - * If the transport couldn't send the data, the further sending should - * be handled by the event loop. - * - * Params: - * transport = Transport. - * exception = Exception thrown on sending. - * - * Returns: $(D_KEYWORD true) if the operation could be successfully - * completed or scheduled, $(D_KEYWORD false) otherwise (the - * transport is be destroyed then). - */ - protected override bool feed(SelectorStreamTransport transport, SocketException exception = null) - { - if (!super.feed(transport, exception)) - { - return false; - } - if (!transport.writeReady) - { - set(transport.socket.handle, EVFILT_WRITE, EV_DISPATCH); - return true; - } - return false; - } + protected int fd; + private kevent_t[] events; + private kevent_t[] changes; + private size_t changeCount; + + /** + * Returns: Maximal event count can be got at a time + * (should be supported by the backend). + */ + override protected @property inout(uint) maxEvents() inout const pure nothrow @safe @nogc + { + return cast(uint) events.length; + } + + this() + { + super(); + + if ((fd = kqueue()) == -1) + { + throw MmapPool.instance.make!BadLoopException("epoll initialization failed"); + } + events = MmapPool.instance.makeArray!kevent_t(64); + changes = MmapPool.instance.makeArray!kevent_t(64); + } + + /** + * Free loop internals. + */ + ~this() + { + MmapPool.instance.dispose(events); + MmapPool.instance.dispose(changes); + close(fd); + } + + private void set(socket_t socket, short filter, ushort flags) + { + if (changes.length <= changeCount) + { + MmapPool.instance.resizeArray(changes, changeCount + maxEvents); + } + EV_SET(&changes[changeCount], + cast(ulong) socket, + filter, + flags, + 0U, + 0L, + null); + ++changeCount; + } + + /** + * Should be called if the backend configuration changes. + * + * Params: + * watcher = Watcher. + * oldEvents = The events were already set. + * events = The events should be set. + * + * Returns: $(D_KEYWORD true) if the operation was successful. + */ + override protected bool reify(ConnectionWatcher watcher, + EventMask oldEvents, + EventMask events) + { + if (events != oldEvents) + { + if (oldEvents & Event.read || oldEvents & Event.accept) + { + set(watcher.socket.handle, EVFILT_READ, EV_DELETE); + } + if (oldEvents & Event.write) + { + set(watcher.socket.handle, EVFILT_WRITE, EV_DELETE); + } + } + if (events & (Event.read | events & Event.accept)) + { + set(watcher.socket.handle, EVFILT_READ, EV_ADD | EV_ENABLE); + } + if (events & Event.write) + { + set(watcher.socket.handle, EVFILT_WRITE, EV_ADD | EV_DISPATCH); + } + return true; + } + + /** + * Does the actual polling. + */ + protected override void poll() + { + timespec ts; + blockTime.split!("seconds", "nsecs")(ts.tv_sec, ts.tv_nsec); + + if (changeCount > maxEvents) + { + MmapPool.instance.resizeArray(events, changes.length); + } + + auto eventCount = kevent(fd, changes.ptr, cast(int) changeCount, events.ptr, maxEvents, &ts); + changeCount = 0; + + if (eventCount < 0) + { + if (errno != EINTR) + { + throw theAllocator.make!BadLoopException(); + } + return; + } + + for (int i; i < eventCount; ++i) + { + assert(connections.length > events[i].ident); + + IOWatcher io = cast(IOWatcher) connections[events[i].ident]; + // If it is a ConnectionWatcher. Accept connections. + if (io is null) + { + acceptConnections(connections[events[i].ident]); + } + else if (events[i].flags & EV_ERROR) + { + kill(io, null); + } + else if (events[i].filter == EVFILT_READ) + { + auto transport = cast(SelectorStreamTransport) io.transport; + assert(transport !is null); + + SocketException exception; + try + { + ptrdiff_t received; + do + { + received = transport.socket.receive(io.output[]); + io.output += received; + } + while (received); + } + catch (SocketException e) + { + exception = e; + } + if (transport.socket.disconnected) + { + kill(io, exception); + } + else if (io.output.length) + { + swapPendings.insertBack(io); + } + } + else if (events[i].filter == EVFILT_WRITE) + { + auto transport = cast(SelectorStreamTransport) io.transport; + assert(transport !is null); + + transport.writeReady = true; + if (transport.input.length) + { + feed(transport); + } + } + } + } + + /** + * Returns: The blocking time. + */ + override protected @property inout(Duration) blockTime() + inout @safe pure nothrow + { + return min(super.blockTime, 1.dur!"seconds"); + } + + /** + * If the transport couldn't send the data, the further sending should + * be handled by the event loop. + * + * Params: + * transport = Transport. + * exception = Exception thrown on sending. + * + * Returns: $(D_KEYWORD true) if the operation could be successfully + * completed or scheduled, $(D_KEYWORD false) otherwise (the + * transport is be destroyed then). + */ + protected override bool feed(SelectorStreamTransport transport, SocketException exception = null) + { + if (!super.feed(transport, exception)) + { + return false; + } + if (!transport.writeReady) + { + set(transport.socket.handle, EVFILT_WRITE, EV_DISPATCH); + return true; + } + return false; + } } diff --git a/source/tanya/async/event/selector.d b/source/tanya/async/event/selector.d index e9a103d..c35a782 100644 --- a/source/tanya/async/event/selector.d +++ b/source/tanya/async/event/selector.d @@ -27,240 +27,240 @@ import core.stdc.errno; */ class SelectorStreamTransport : StreamTransport { - private ConnectedSocket socket_; + private ConnectedSocket socket_; - /// Input buffer. - package WriteBuffer input; + /// Input buffer. + package WriteBuffer input; - private SelectorLoop loop; + private SelectorLoop loop; - /// Received notification that the underlying socket is write-ready. - package bool writeReady; + /// Received notification that the underlying socket is write-ready. + package bool writeReady; - /** - * Params: - * loop = Event loop. - * socket = Socket. - */ - this(SelectorLoop loop, ConnectedSocket socket) - { - socket_ = socket; - this.loop = loop; - input = MmapPool.instance.make!WriteBuffer(); - } + /** + * Params: + * loop = Event loop. + * socket = Socket. + */ + this(SelectorLoop loop, ConnectedSocket socket) + { + socket_ = socket; + this.loop = loop; + input = MmapPool.instance.make!WriteBuffer(); + } - /** - * Close the transport and deallocate the data buffers. - */ - ~this() - { - MmapPool.instance.dispose(input); - } + /** + * Close the transport and deallocate the data buffers. + */ + ~this() + { + MmapPool.instance.dispose(input); + } - /** - * Returns: Transport socket. - */ - inout(ConnectedSocket) socket() inout pure nothrow @safe @nogc - { - return socket_; - } + /** + * Returns: Transport socket. + */ + inout(ConnectedSocket) socket() inout pure nothrow @safe @nogc + { + return socket_; + } - /** - * Write some data to the transport. - * - * Params: - * data = Data to send. - */ - void write(ubyte[] data) - { - if (!data.length) - { - return; - } - // Try to write if the socket is write ready. - if (writeReady) - { - ptrdiff_t sent; - SocketException exception; - try - { - sent = socket.send(data); - if (sent == 0) - { - writeReady = false; - } - } - catch (SocketException e) - { - writeReady = false; - exception = e; - } - if (sent < data.length) - { - input ~= data[sent..$]; - loop.feed(this, exception); - } - } - else - { - input ~= data; - } - } + /** + * Write some data to the transport. + * + * Params: + * data = Data to send. + */ + void write(ubyte[] data) + { + if (!data.length) + { + return; + } + // Try to write if the socket is write ready. + if (writeReady) + { + ptrdiff_t sent; + SocketException exception; + try + { + sent = socket.send(data); + if (sent == 0) + { + writeReady = false; + } + } + catch (SocketException e) + { + writeReady = false; + exception = e; + } + if (sent < data.length) + { + input ~= data[sent..$]; + loop.feed(this, exception); + } + } + else + { + input ~= data; + } + } } abstract class SelectorLoop : Loop { - /// Pending connections. - protected ConnectionWatcher[] connections; + /// Pending connections. + protected ConnectionWatcher[] connections; - this() - { - super(); - connections = MmapPool.instance.makeArray!ConnectionWatcher(maxEvents); - } + this() + { + super(); + connections = MmapPool.instance.makeArray!ConnectionWatcher(maxEvents); + } - ~this() - { - foreach (ref connection; connections) - { - // We want to free only IOWatchers. ConnectionWatcher are created by the - // user and should be freed by himself. - auto io = cast(IOWatcher) connection; - if (io !is null) - { - MmapPool.instance.dispose(io); - connection = null; - } - } - MmapPool.instance.dispose(connections); - } + ~this() + { + foreach (ref connection; connections) + { + // We want to free only IOWatchers. ConnectionWatcher are created by the + // user and should be freed by himself. + auto io = cast(IOWatcher) connection; + if (io !is null) + { + MmapPool.instance.dispose(io); + connection = null; + } + } + MmapPool.instance.dispose(connections); + } - /** - * If the transport couldn't send the data, the further sending should - * be handled by the event loop. - * - * Params: - * transport = Transport. - * exception = Exception thrown on sending. - * - * Returns: $(D_KEYWORD true) if the operation could be successfully - * completed or scheduled, $(D_KEYWORD false) otherwise (the - * transport will be destroyed then). - */ - protected bool feed(SelectorStreamTransport transport, SocketException exception = null) - { - while (transport.input.length && transport.writeReady) - { - try - { - ptrdiff_t sent = transport.socket.send(transport.input[]); - if (sent == 0) - { - transport.writeReady = false; - } - else - { - transport.input += sent; - } - } - catch (SocketException e) - { - exception = e; - transport.writeReady = false; - } - } - if (exception !is null) - { - auto watcher = cast(IOWatcher) connections[transport.socket.handle]; - assert(watcher !is null); + /** + * If the transport couldn't send the data, the further sending should + * be handled by the event loop. + * + * Params: + * transport = Transport. + * exception = Exception thrown on sending. + * + * Returns: $(D_KEYWORD true) if the operation could be successfully + * completed or scheduled, $(D_KEYWORD false) otherwise (the + * transport will be destroyed then). + */ + protected bool feed(SelectorStreamTransport transport, SocketException exception = null) + { + while (transport.input.length && transport.writeReady) + { + try + { + ptrdiff_t sent = transport.socket.send(transport.input[]); + if (sent == 0) + { + transport.writeReady = false; + } + else + { + transport.input += sent; + } + } + catch (SocketException e) + { + exception = e; + transport.writeReady = false; + } + } + if (exception !is null) + { + auto watcher = cast(IOWatcher) connections[transport.socket.handle]; + assert(watcher !is null); - kill(watcher, exception); - return false; - } - return true; - } + kill(watcher, exception); + return false; + } + return true; + } - /** - * Start watching. - * - * Params: - * watcher = Watcher. - */ - override void start(ConnectionWatcher watcher) - { - if (watcher.active) - { - return; - } + /** + * Start watching. + * + * Params: + * watcher = Watcher. + */ + override void start(ConnectionWatcher watcher) + { + if (watcher.active) + { + return; + } - if (connections.length <= watcher.socket) - { - MmapPool.instance.resizeArray(connections, watcher.socket.handle + maxEvents / 2); - } - connections[watcher.socket.handle] = watcher; + if (connections.length <= watcher.socket) + { + MmapPool.instance.resizeArray(connections, watcher.socket.handle + maxEvents / 2); + } + connections[watcher.socket.handle] = watcher; - super.start(watcher); - } + super.start(watcher); + } - /** - * Accept incoming connections. - * - * Params: - * connection = Connection watcher ready to accept. - */ - package void acceptConnections(ConnectionWatcher connection) - in - { - assert(connection !is null); - } - body - { - while (true) - { - ConnectedSocket client; - try - { - client = (cast(StreamSocket) connection.socket).accept(); - } - catch (SocketException e) - { - theAllocator.dispose(e); - break; - } - if (client is null) - { - break; - } + /** + * Accept incoming connections. + * + * Params: + * connection = Connection watcher ready to accept. + */ + package void acceptConnections(ConnectionWatcher connection) + in + { + assert(connection !is null); + } + body + { + while (true) + { + ConnectedSocket client; + try + { + client = (cast(StreamSocket) connection.socket).accept(); + } + catch (SocketException e) + { + theAllocator.dispose(e); + break; + } + if (client is null) + { + break; + } - IOWatcher io; - auto transport = MmapPool.instance.make!SelectorStreamTransport(this, client); + IOWatcher io; + auto transport = MmapPool.instance.make!SelectorStreamTransport(this, client); - if (connections.length > client.handle) - { - io = cast(IOWatcher) connections[client.handle]; - } - else - { - MmapPool.instance.resizeArray(connections, client.handle + maxEvents / 2); - } - if (io is null) - { - io = MmapPool.instance.make!IOWatcher(transport, - connection.protocol); - connections[client.handle] = io; - } - else - { - io(transport, connection.protocol); - } + if (connections.length > client.handle) + { + io = cast(IOWatcher) connections[client.handle]; + } + else + { + MmapPool.instance.resizeArray(connections, client.handle + maxEvents / 2); + } + if (io is null) + { + io = MmapPool.instance.make!IOWatcher(transport, + connection.protocol); + connections[client.handle] = io; + } + else + { + io(transport, connection.protocol); + } - reify(io, EventMask(Event.none), EventMask(Event.read, Event.write)); - connection.incoming.insertBack(io); - } + reify(io, EventMask(Event.none), EventMask(Event.read, Event.write)); + connection.incoming.insertBack(io); + } - if (!connection.incoming.empty) - { - swapPendings.insertBack(connection); - } - } + if (!connection.incoming.empty) + { + swapPendings.insertBack(connection); + } + } } diff --git a/source/tanya/async/iocp.d b/source/tanya/async/iocp.d index 8817e0c..f915e36 100644 --- a/source/tanya/async/iocp.d +++ b/source/tanya/async/iocp.d @@ -21,12 +21,12 @@ import core.sys.windows.windef; */ class State { - /// For internal use by Windows API. - align(1) OVERLAPPED overlapped; + /// For internal use by Windows API. + align(1) OVERLAPPED overlapped; - /// File/socket handle. - HANDLE handle; + /// File/socket handle. + HANDLE handle; - /// For keeping events or event masks. - int event; + /// For keeping events or event masks. + int event; } diff --git a/source/tanya/async/loop.d b/source/tanya/async/loop.d index de1e401..15dfafd 100644 --- a/source/tanya/async/loop.d +++ b/source/tanya/async/loop.d @@ -10,100 +10,105 @@ *
* ---
* import tanya.async;
+ * import tanya.memory;
* import tanya.network.socket;
*
* class EchoProtocol : TransmissionControlProtocol
* {
- * private DuplexTransport transport;
+ * private DuplexTransport transport;
*
- * void received(ubyte[] data)
- * {
- * transport.write(data);
- * }
+ * void received(ubyte[] data)
+ * {
+ * transport.write(data);
+ * }
*
- * void connected(DuplexTransport transport)
- * {
- * this.transport = transport;
- * }
+ * void connected(DuplexTransport transport)
+ * {
+ * this.transport = transport;
+ * }
*
- * void disconnected(SocketException e = null)
- * {
- * }
+ * void disconnected(SocketException e = null)
+ * {
+ * }
* }
*
* void main()
* {
- * auto address = new InternetAddress("127.0.0.1", cast(ushort) 8192);
+ * auto address = theAllocator.make!InternetAddress("127.0.0.1", cast(ushort) 8192);
*
- * version (Windows)
- * {
- * auto sock = new OverlappedStreamSocket(AddressFamily.INET);
- * }
- * else
- * {
- * auto sock = new StreamSocket(AddressFamily.INET);
- * sock.blocking = false;
- * }
+ * version (Windows)
+ * {
+ * auto sock = theAllocator.make!OverlappedStreamSocket(AddressFamily.INET);
+ * }
+ * else
+ * {
+ * auto sock = theAllocator.make!StreamSocket(AddressFamily.INET);
+ * sock.blocking = false;
+ * }
*
- * sock.bind(address);
- * sock.listen(5);
+ * sock.bind(address);
+ * sock.listen(5);
*
- * auto io = new ConnectionWatcher(sock);
- * io.setProtocol!EchoProtocol;
+ * auto io = theAllocator.make!ConnectionWatcher(sock);
+ * io.setProtocol!EchoProtocol;
*
- * defaultLoop.start(io);
- * defaultLoop.run();
+ * defaultLoop.start(io);
+ * defaultLoop.run();
*
- * sock.shutdown();
+ * sock.shutdown();
+ * theAllocator.dispose(io);
+ * theAllocator.dispose(sock);
+ * theAllocator.dispose(address);
* }
* ---
*/
module tanya.async.loop;
+import core.time;
+import std.algorithm.iteration;
+import std.algorithm.mutation;
+import std.typecons;
import tanya.async.protocol;
import tanya.async.transport;
import tanya.async.watcher;
import tanya.container.buffer;
+import tanya.container.queue;
import tanya.memory;
import tanya.memory.mmappool;
import tanya.network.socket;
-import core.time;
-import std.algorithm.iteration;
-import std.algorithm.mutation;
-import std.typecons;
version (DisableBackends)
{
}
else version (linux)
{
- import tanya.async.event.epoll;
- version = Epoll;
+ import tanya.async.event.epoll;
+ version = Epoll;
}
else version (Windows)
{
- import tanya.async.event.iocp;
- version = IOCP;
+ import tanya.async.event.iocp;
+ version = IOCP;
}
else version (OSX)
{
- version = Kqueue;
+ version = Kqueue;
}
else version (iOS)
{
- version = Kqueue;
+ version = Kqueue;
}
else version (FreeBSD)
{
- version = Kqueue;
+ version = Kqueue;
}
else version (OpenBSD)
{
- version = Kqueue;
+ version = Kqueue;
}
else version (DragonFlyBSD)
{
- version = Kqueue;
+ version = Kqueue;
}
/**
@@ -111,11 +116,11 @@ else version (DragonFlyBSD) */
enum Event : uint
{
- none = 0x00, /// No events.
- read = 0x01, /// Non-blocking read call.
- write = 0x02, /// Non-blocking write call.
- accept = 0x04, /// Connection made.
- error = 0x80000000, /// Sent when an error occurs.
+ none = 0x00, /// No events.
+ read = 0x01, /// Non-blocking read call.
+ write = 0x02, /// Non-blocking write call.
+ accept = 0x04, /// Connection made.
+ error = 0x80000000, /// Sent when an error occurs.
}
alias EventMask = BitFlags!Event;
@@ -125,161 +130,170 @@ alias EventMask = BitFlags!Event; */
abstract class Loop
{
- /// Pending watchers.
- protected PendingQueue!Watcher pendings;
-
- protected PendingQueue!Watcher swapPendings;
-
- /**
- * Returns: Maximal event count can be got at a time
- * (should be supported by the backend).
- */
- protected @property inout(uint) maxEvents() inout const pure nothrow @safe @nogc
- {
- return 128U;
- }
-
- /**
- * Initializes the loop.
- */
- this()
- {
- pendings = MmapPool.instance.make!(PendingQueue!Watcher);
- swapPendings = MmapPool.instance.make!(PendingQueue!Watcher);
- }
-
- /**
- * Frees loop internals.
- */
- ~this()
- {
- MmapPool.instance.dispose(pendings);
- MmapPool.instance.dispose(swapPendings);
- }
-
- /**
- * Starts the loop.
- */
- void run()
- {
- done_ = false;
- do
- {
- poll();
-
- // Invoke pendings
- swapPendings.each!((ref p) => p.invoke());
-
- swap(pendings, swapPendings);
- }
- while (!done_);
- }
-
- /**
- * Break out of the loop.
- */
- void unloop() @safe pure nothrow
- {
- done_ = true;
- }
-
- /**
- * Start watching.
- *
- * Params:
- * watcher = Watcher.
- */
- void start(ConnectionWatcher watcher)
- {
- if (watcher.active)
- {
- return;
- }
- watcher.active = true;
- reify(watcher, EventMask(Event.none), EventMask(Event.accept));
- }
-
- /**
- * Stop watching.
- *
- * Params:
- * watcher = Watcher.
- */
- void stop(ConnectionWatcher watcher)
- {
- if (!watcher.active)
- {
- return;
- }
- watcher.active = false;
-
- reify(watcher, EventMask(Event.accept), EventMask(Event.none));
- }
-
- /**
- * Should be called if the backend configuration changes.
- *
- * Params:
- * watcher = Watcher.
- * oldEvents = The events were already set.
- * events = The events should be set.
- *
- * Returns: $(D_KEYWORD true) if the operation was successful.
- */
- abstract protected bool reify(ConnectionWatcher watcher,
- EventMask oldEvents,
- EventMask events);
-
- /**
- * Returns: The blocking time.
- */
- protected @property inout(Duration) blockTime()
- inout @safe pure nothrow
- {
- // Don't block if we have to do.
- return swapPendings.empty ? blockTime_ : Duration.zero;
- }
-
- /**
- * Sets the blocking time for IO watchers.
- *
- * Params:
- * blockTime = The blocking time. Cannot be larger than
- * $(D_PSYMBOL maxBlockTime).
- */
- protected @property void blockTime(in Duration blockTime) @safe pure nothrow
- in
- {
- assert(blockTime <= 1.dur!"hours", "Too long to wait.");
- assert(!blockTime.isNegative);
- }
- body
- {
- blockTime_ = blockTime;
- }
-
- /**
- * Kills the watcher and closes the connection.
- */
- protected void kill(IOWatcher watcher, SocketException exception)
- {
- watcher.socket.shutdown();
- theAllocator.dispose(watcher.socket);
- MmapPool.instance.dispose(watcher.transport);
- watcher.exception = exception;
- swapPendings.insertBack(watcher);
- }
-
- /**
- * Does the actual polling.
- */
- abstract protected void poll();
-
- /// Whether the event loop should be stopped.
- private bool done_;
-
- /// Maximal block time.
- protected Duration blockTime_ = 1.dur!"minutes";
+ /// Pending watchers.
+ protected Queue!Watcher pendings;
+
+ protected Queue!Watcher swapPendings;
+
+ /**
+ * Returns: Maximal event count can be got at a time
+ * (should be supported by the backend).
+ */
+ protected @property inout(uint) maxEvents() inout const pure nothrow @safe @nogc
+ {
+ return 128U;
+ }
+
+ /**
+ * Initializes the loop.
+ */
+ this()
+ {
+ pendings = MmapPool.instance.make!(Queue!Watcher);
+ swapPendings = MmapPool.instance.make!(Queue!Watcher);
+ }
+
+ /**
+ * Frees loop internals.
+ */
+ ~this()
+ {
+ foreach (w; pendings)
+ {
+ MmapPool.instance.dispose(w);
+ }
+ MmapPool.instance.dispose(pendings);
+
+ foreach (w; swapPendings)
+ {
+ MmapPool.instance.dispose(w);
+ }
+ MmapPool.instance.dispose(swapPendings);
+ }
+
+ /**
+ * Starts the loop.
+ */
+ void run()
+ {
+ done_ = false;
+ do
+ {
+ poll();
+
+ // Invoke pendings
+ swapPendings.each!((ref p) => p.invoke());
+
+ swap(pendings, swapPendings);
+ }
+ while (!done_);
+ }
+
+ /**
+ * Break out of the loop.
+ */
+ void unloop() @safe pure nothrow
+ {
+ done_ = true;
+ }
+
+ /**
+ * Start watching.
+ *
+ * Params:
+ * watcher = Watcher.
+ */
+ void start(ConnectionWatcher watcher)
+ {
+ if (watcher.active)
+ {
+ return;
+ }
+ watcher.active = true;
+ reify(watcher, EventMask(Event.none), EventMask(Event.accept));
+ }
+
+ /**
+ * Stop watching.
+ *
+ * Params:
+ * watcher = Watcher.
+ */
+ void stop(ConnectionWatcher watcher)
+ {
+ if (!watcher.active)
+ {
+ return;
+ }
+ watcher.active = false;
+
+ reify(watcher, EventMask(Event.accept), EventMask(Event.none));
+ }
+
+ /**
+ * Should be called if the backend configuration changes.
+ *
+ * Params:
+ * watcher = Watcher.
+ * oldEvents = The events were already set.
+ * events = The events should be set.
+ *
+ * Returns: $(D_KEYWORD true) if the operation was successful.
+ */
+ abstract protected bool reify(ConnectionWatcher watcher,
+ EventMask oldEvents,
+ EventMask events);
+
+ /**
+ * Returns: The blocking time.
+ */
+ protected @property inout(Duration) blockTime()
+ inout @safe pure nothrow
+ {
+ // Don't block if we have to do.
+ return swapPendings.empty ? blockTime_ : Duration.zero;
+ }
+
+ /**
+ * Sets the blocking time for IO watchers.
+ *
+ * Params:
+ * blockTime = The blocking time. Cannot be larger than
+ * $(D_PSYMBOL maxBlockTime).
+ */
+ protected @property void blockTime(in Duration blockTime) @safe pure nothrow
+ in
+ {
+ assert(blockTime <= 1.dur!"hours", "Too long to wait.");
+ assert(!blockTime.isNegative);
+ }
+ body
+ {
+ blockTime_ = blockTime;
+ }
+
+ /**
+ * Kills the watcher and closes the connection.
+ */
+ protected void kill(IOWatcher watcher, SocketException exception)
+ {
+ watcher.socket.shutdown();
+ theAllocator.dispose(watcher.socket);
+ MmapPool.instance.dispose(watcher.transport);
+ watcher.exception = exception;
+ swapPendings.insertBack(watcher);
+ }
+
+ /**
+ * Does the actual polling.
+ */
+ abstract protected void poll();
+
+ /// Whether the event loop should be stopped.
+ private bool done_;
+
+ /// Maximal block time.
+ protected Duration blockTime_ = 1.dur!"minutes";
}
/**
@@ -287,18 +301,17 @@ abstract class Loop */
class BadLoopException : Exception
{
-@nogc:
- /**
- * Params:
- * file = The file where the exception occurred.
- * line = The line number where the exception occurred.
- * next = The previous exception in the chain of exceptions, if any.
- */
- this(string file = __FILE__, size_t line = __LINE__, Throwable next = null)
- pure @safe nothrow const
- {
- super("Event loop cannot be initialized.", file, line, next);
- }
+ /**
+ * Params:
+ * file = The file where the exception occurred.
+ * line = The line number where the exception occurred.
+ * next = The previous exception in the chain of exceptions, if any.
+ */
+ this(string file = __FILE__, size_t line = __LINE__, Throwable next = null)
+ pure nothrow const @safe @nogc
+ {
+ super("Event loop cannot be initialized.", file, line, next);
+ }
}
/**
@@ -310,24 +323,24 @@ class BadLoopException : Exception */
@property Loop defaultLoop()
{
- if (defaultLoop_ !is null)
- {
- return defaultLoop_;
- }
- version (Epoll)
- {
- defaultLoop_ = MmapPool.instance.make!EpollLoop;
- }
- else version (IOCP)
- {
- defaultLoop_ = MmapPool.instance.make!IOCPLoop;
- }
- else version (Kqueue)
- {
- import tanya.async.event.kqueue;
- defaultLoop_ = MmapPool.instance.make!KqueueLoop;
- }
- return defaultLoop_;
+ if (defaultLoop_ !is null)
+ {
+ return defaultLoop_;
+ }
+ version (Epoll)
+ {
+ defaultLoop_ = MmapPool.instance.make!EpollLoop;
+ }
+ else version (IOCP)
+ {
+ defaultLoop_ = MmapPool.instance.make!IOCPLoop;
+ }
+ else version (Kqueue)
+ {
+ import tanya.async.event.kqueue;
+ defaultLoop_ = MmapPool.instance.make!KqueueLoop;
+ }
+ return defaultLoop_;
}
/**
@@ -339,145 +352,16 @@ class BadLoopException : Exception * your implementation to this property.
*
* Params:
- * loop = The event loop.
+ * loop = The event loop.
*/
@property void defaultLoop(Loop loop)
in
{
- assert(loop !is null);
+ assert(loop !is null);
}
body
{
- defaultLoop_ = loop;
+ defaultLoop_ = loop;
}
private Loop defaultLoop_;
-
-/**
- * Queue.
- *
- * Params:
- * T = Content type.
- */
-class PendingQueue(T)
-{
- /**
- * Creates a new $(D_PSYMBOL Queue).
- */
- this()
- {
- }
-
- /**
- * Removes all elements from the queue.
- */
- ~this()
- {
- foreach (e; this)
- {
- MmapPool.instance.dispose(e);
- }
- }
-
- /**
- * Returns: First element.
- */
- @property ref T front()
- in
- {
- assert(!empty);
- }
- body
- {
- return first.next.content;
- }
-
- /**
- * Inserts a new element.
- *
- * Params:
- * x = New element.
- *
- * Returns: $(D_KEYWORD this).
- */
- typeof(this) insertBack(T x)
- {
- Entry* temp = MmapPool.instance.make!Entry;
-
- temp.content = x;
-
- if (empty)
- {
- first.next = rear = temp;
- }
- else
- {
- rear.next = temp;
- rear = rear.next;
- }
-
- return this;
- }
-
- alias insert = insertBack;
-
- /**
- * Inserts a new element.
- *
- * Params:
- * x = New element.
- *
- * Returns: $(D_KEYWORD this).
- */
- typeof(this) opOpAssign(string Op)(ref T x)
- if (Op == "~")
- {
- return insertBack(x);
- }
-
- /**
- * Returns: $(D_KEYWORD true) if the queue is empty.
- */
- @property bool empty() const @safe pure nothrow
- {
- return first.next is null;
- }
-
- /**
- * Move position to the next element.
- *
- * Returns: $(D_KEYWORD this).
- */
- typeof(this) popFront()
- in
- {
- assert(!empty);
- }
- body
- {
- auto n = first.next.next;
-
- MmapPool.instance.dispose(first.next);
- first.next = n;
-
- return this;
- }
-
- /**
- * Queue entry.
- */
- protected struct Entry
- {
- /// Queue item content.
- T content;
-
- /// Next list item.
- Entry* next;
- }
-
- /// The first element of the list.
- protected Entry first;
-
- /// The last element of the list.
- protected Entry* rear;
-}
diff --git a/source/tanya/async/protocol.d b/source/tanya/async/protocol.d index 62c0ab7..959937f 100644 --- a/source/tanya/async/protocol.d +++ b/source/tanya/async/protocol.d @@ -18,28 +18,28 @@ import tanya.async.transport; */
interface Protocol
{
- /**
- * Params:
- * data = Read data.
- */
- void received(ubyte[] data);
+ /**
+ * Params:
+ * data = Read data.
+ */
+ void received(ubyte[] data);
- /**
- * Called when a connection is made.
- *
- * Params:
- * transport = Protocol transport.
- */
- void connected(DuplexTransport transport);
+ /**
+ * Called when a connection is made.
+ *
+ * Params:
+ * transport = Protocol transport.
+ */
+ void connected(DuplexTransport transport);
- /**
- * Called when a connection is lost.
- *
- * Params:
- * exception = $(D_PSYMBOL Exception) if an error caused
- * the disconnect, $(D_KEYWORD null) otherwise.
- */
- void disconnected(SocketException exception = null);
+ /**
+ * Called when a connection is lost.
+ *
+ * Params:
+ * exception = $(D_PSYMBOL Exception) if an error caused
+ * the disconnect, $(D_KEYWORD null) otherwise.
+ */
+ void disconnected(SocketException exception = null);
}
/**
diff --git a/source/tanya/async/transport.d b/source/tanya/async/transport.d index 6156815..8191678 100644 --- a/source/tanya/async/transport.d +++ b/source/tanya/async/transport.d @@ -31,13 +31,13 @@ interface ReadTransport : Transport */
interface WriteTransport : Transport
{
- /**
- * Write some data to the transport.
- *
- * Params:
- * data = Data to send.
- */
- void write(ubyte[] data);
+ /**
+ * Write some data to the transport.
+ *
+ * Params:
+ * data = Data to send.
+ */
+ void write(ubyte[] data);
}
/**
@@ -52,7 +52,7 @@ interface DuplexTransport : ReadTransport, WriteTransport */
interface SocketTransport : Transport
{
- @property inout(Socket) socket() inout pure nothrow @safe @nogc;
+ @property inout(Socket) socket() inout pure nothrow @safe @nogc;
}
/**
diff --git a/source/tanya/async/watcher.d b/source/tanya/async/watcher.d index 260c6a3..9a9e441 100644 --- a/source/tanya/async/watcher.d +++ b/source/tanya/async/watcher.d @@ -10,23 +10,24 @@ */ module tanya.async.watcher; +import std.functional; +import std.exception; import tanya.async.loop; import tanya.async.protocol; import tanya.async.transport; import tanya.container.buffer; +import tanya.container.queue; import tanya.memory; import tanya.memory.mmappool; import tanya.network.socket; -import std.functional; -import std.exception; version (Windows) { - import core.sys.windows.basetyps; - import core.sys.windows.mswsock; - import core.sys.windows.winbase; - import core.sys.windows.windef; - import core.sys.windows.winsock2; + import core.sys.windows.basetyps; + import core.sys.windows.mswsock; + import core.sys.windows.winbase; + import core.sys.windows.windef; + import core.sys.windows.winsock2; } /** @@ -35,85 +36,89 @@ version (Windows) */ abstract class Watcher { - /// Whether the watcher is active. - bool active; + /// Whether the watcher is active. + bool active; - /** - * Invoke some action on event. - */ - void invoke(); + /** + * Invoke some action on event. + */ + void invoke(); } class ConnectionWatcher : Watcher { - /// Watched socket. - private Socket socket_; - - /// Protocol factory. - protected Protocol delegate() protocolFactory; - - package PendingQueue!IOWatcher incoming; - - /** - * Params: - * socket = Socket. - */ - this(Socket socket) - { - socket_ = socket; - incoming = MmapPool.instance.make!(PendingQueue!IOWatcher); - } - - /// Ditto. - protected this() - { - } - - ~this() - { - MmapPool.instance.dispose(incoming); - } - - /* - * Params: - * P = Protocol should be used. - */ - void setProtocol(P : Protocol)() - { - this.protocolFactory = () => cast(Protocol) MmapPool.instance.make!P; - } - - /** - * Returns: Socket. - */ - @property inout(Socket) socket() inout pure nothrow @nogc - { - return socket_; - } - - /** - * Returns: New protocol instance. - */ - @property Protocol protocol() - in - { - assert(protocolFactory !is null, "Protocol isn't set."); - } - body - { - return protocolFactory(); - } - - /** - * Invokes new connection callback. - */ - override void invoke() - { - foreach (io; incoming) - { - io.protocol.connected(cast(DuplexTransport) io.transport); - } - } + /// Watched socket. + private Socket socket_; + + /// Protocol factory. + protected Protocol delegate() protocolFactory; + + package Queue!IOWatcher incoming; + + /** + * Params: + * socket = Socket. + */ + this(Socket socket) + { + socket_ = socket; + incoming = MmapPool.instance.make!(Queue!IOWatcher); + } + + /// Ditto. + protected this() + { + } + + ~this() + { + foreach (w; incoming) + { + MmapPool.instance.dispose(w); + } + MmapPool.instance.dispose(incoming); + } + + /* + * Params: + * P = Protocol should be used. + */ + void setProtocol(P : Protocol)() + { + this.protocolFactory = () => cast(Protocol) MmapPool.instance.make!P; + } + + /** + * Returns: Socket. + */ + @property inout(Socket) socket() inout pure nothrow @nogc + { + return socket_; + } + + /** + * Returns: New protocol instance. + */ + @property Protocol protocol() + in + { + assert(protocolFactory !is null, "Protocol isn't set."); + } + body + { + return protocolFactory(); + } + + /** + * Invokes new connection callback. + */ + override void invoke() + { + foreach (io; incoming) + { + io.protocol.connected(cast(DuplexTransport) io.transport); + } + } } /** @@ -122,121 +127,121 @@ class ConnectionWatcher : Watcher */ class IOWatcher : ConnectionWatcher { - /// If an exception was thrown the transport should be already invalid. - private union - { - StreamTransport transport_; - SocketException exception_; - } - - private Protocol protocol_; - - /** - * Returns: Underlying output buffer. - */ - package ReadBuffer output; - - /** - * Params: - * transport = Transport. - * protocol = New instance of the application protocol. - */ - this(StreamTransport transport, Protocol protocol) - in - { - assert(transport !is null); - assert(protocol !is null); - } - body - { - super(); - transport_ = transport; - protocol_ = protocol; - output = MmapPool.instance.make!ReadBuffer(); - active = true; - } - - /** - * Destroys the watcher. - */ - protected ~this() - { - MmapPool.instance.dispose(output); - MmapPool.instance.dispose(protocol_); - } - - /** - * Assigns a transport. - * - * Params: - * transport = Transport. - * protocol = Application protocol. - * - * Returns: $(D_KEYWORD this). - */ - IOWatcher opCall(StreamTransport transport, Protocol protocol) pure nothrow @nogc - in - { - assert(transport !is null); - assert(protocol !is null); - } - body - { - transport_ = transport; - protocol_ = protocol; - active = true; - return this; - } - - /** - * Returns: Transport used by this watcher. - */ - @property inout(StreamTransport) transport() inout pure nothrow @nogc - { - return transport_; - } - - /** - * Sets an exception occurred during a read/write operation. - * - * Params: - * exception = Thrown exception. - */ - @property void exception(SocketException exception) pure nothrow @nogc - { - exception_ = exception; - } - - /** - * Returns: Application protocol. - */ - override @property Protocol protocol() pure nothrow @safe @nogc - { - return protocol_; - } - - /** - * Returns: Socket. - */ - override @property inout(Socket) socket() inout pure nothrow @nogc - { - return transport.socket; - } - - /** - * Invokes the watcher callback. - */ - override void invoke() - { - if (output.length) - { - protocol.received(output[0..$]); - output.clear(); - } - else - { - protocol.disconnected(exception_); - active = false; - } - } + /// If an exception was thrown the transport should be already invalid. + private union + { + StreamTransport transport_; + SocketException exception_; + } + + private Protocol protocol_; + + /** + * Returns: Underlying output buffer. + */ + package ReadBuffer output; + + /** + * Params: + * transport = Transport. + * protocol = New instance of the application protocol. + */ + this(StreamTransport transport, Protocol protocol) + in + { + assert(transport !is null); + assert(protocol !is null); + } + body + { + super(); + transport_ = transport; + protocol_ = protocol; + output = MmapPool.instance.make!ReadBuffer(); + active = true; + } + + /** + * Destroys the watcher. + */ + protected ~this() + { + MmapPool.instance.dispose(output); + MmapPool.instance.dispose(protocol_); + } + + /** + * Assigns a transport. + * + * Params: + * transport = Transport. + * protocol = Application protocol. + * + * Returns: $(D_KEYWORD this). + */ + IOWatcher opCall(StreamTransport transport, Protocol protocol) pure nothrow @nogc + in + { + assert(transport !is null); + assert(protocol !is null); + } + body + { + transport_ = transport; + protocol_ = protocol; + active = true; + return this; + } + + /** + * Returns: Transport used by this watcher. + */ + @property inout(StreamTransport) transport() inout pure nothrow @nogc + { + return transport_; + } + + /** + * Sets an exception occurred during a read/write operation. + * + * Params: + * exception = Thrown exception. + */ + @property void exception(SocketException exception) pure nothrow @nogc + { + exception_ = exception; + } + + /** + * Returns: Application protocol. + */ + override @property Protocol protocol() pure nothrow @safe @nogc + { + return protocol_; + } + + /** + * Returns: Socket. + */ + override @property inout(Socket) socket() inout pure nothrow @nogc + { + return transport.socket; + } + + /** + * Invokes the watcher callback. + */ + override void invoke() + { + if (output.length) + { + protocol.received(output[0..$]); + output.clear(); + } + else + { + protocol.disconnected(exception_); + active = false; + } + } } diff --git a/source/tanya/container/queue.d b/source/tanya/container/queue.d index ef09da2..cee163d 100644 --- a/source/tanya/container/queue.d +++ b/source/tanya/container/queue.d @@ -121,7 +121,7 @@ class Queue(T) /** * Returns: $(D_KEYWORD true) if the queue is empty. */ - @property bool empty() inout const + @property bool empty() inout const pure nothrow @safe @nogc { return first.next is null; } @@ -170,7 +170,8 @@ class Queue(T) } /** - * $(D_KEYWORD foreach) iteration. + * $(D_KEYWORD foreach) iteration. The elements will be automatically + * dequeued. * * Params: * dg = $(D_KEYWORD foreach) body. |
