async: Switch to the internal use of the vector instead of built-in arrays

This commit is contained in:
Eugen Wissner 2017-01-12 10:17:12 +01:00
parent 4de42ca227
commit cb6cc65113
4 changed files with 54 additions and 41 deletions

View File

@ -18,6 +18,7 @@ import tanya.async.event.selector;
import tanya.async.loop; import tanya.async.loop;
import tanya.async.transport; import tanya.async.transport;
import tanya.async.watcher; import tanya.async.watcher;
import tanya.container.vector;
import tanya.memory; import tanya.memory;
import tanya.memory.mmappool; import tanya.memory.mmappool;
import tanya.network.socket; import tanya.network.socket;
@ -36,7 +37,7 @@ extern (C) nothrow @nogc
class EpollLoop : SelectorLoop class EpollLoop : SelectorLoop
{ {
protected int fd; protected int fd;
private epoll_event[] events; private Vector!epoll_event events;
/** /**
* Initializes the loop. * Initializes the loop.
@ -48,15 +49,14 @@ class EpollLoop : SelectorLoop
throw MmapPool.instance.make!BadLoopException("epoll initialization failed"); throw MmapPool.instance.make!BadLoopException("epoll initialization failed");
} }
super(); super();
MmapPool.instance.resizeArray(events, maxEvents); events = Vector!epoll_event(maxEvents, MmapPool.instance);
} }
/** /**
* Free loop internals. * Frees loop internals.
*/ */
~this() @nogc ~this() @nogc
{ {
MmapPool.instance.dispose(events);
close(fd); close(fd);
} }
@ -97,8 +97,8 @@ class EpollLoop : SelectorLoop
ev.data.fd = watcher.socket.handle; ev.data.fd = watcher.socket.handle;
ev.events = (events & (Event.read | Event.accept) ? EPOLLIN | EPOLLPRI : 0) ev.events = (events & (Event.read | Event.accept) ? EPOLLIN | EPOLLPRI : 0)
| (events & Event.write ? EPOLLOUT : 0) | (events & Event.write ? EPOLLOUT : 0)
| EPOLLET; | EPOLLET;
return epoll_ctl(fd, op, watcher.socket.handle, &ev) == 0; return epoll_ctl(fd, op, watcher.socket.handle, &ev) == 0;
} }
@ -110,7 +110,7 @@ class EpollLoop : SelectorLoop
{ {
// Don't block // Don't block
immutable timeout = cast(immutable int) blockTime.total!"msecs"; immutable timeout = cast(immutable int) blockTime.total!"msecs";
auto eventCount = epoll_wait(fd, events.ptr, maxEvents, timeout); auto eventCount = epoll_wait(fd, events.data.ptr, maxEvents, timeout);
if (eventCount < 0) if (eventCount < 0)
{ {

View File

@ -41,8 +41,19 @@ else version (DragonFlyBSD)
version (MacBSD): version (MacBSD):
import core.stdc.stdint; // intptr_t, uintptr_t import core.stdc.errno;
import core.sys.posix.time; // timespec import core.sys.posix.time; // timespec
import core.sys.posix.unistd;
import core.time;
import std.algorithm.comparison;
import tanya.async.event.selector;
import tanya.async.loop;
import tanya.async.transport;
import tanya.async.watcher;
import tanya.container.vector;
import tanya.memory;
import tanya.memory.mmappool;
import tanya.network.socket;
void EV_SET(kevent_t* kevp, typeof(kevent_t.tupleof) args) pure nothrow @nogc void EV_SET(kevent_t* kevp, typeof(kevent_t.tupleof) args) pure nothrow @nogc
{ {
@ -102,24 +113,11 @@ extern(C) int kevent(int kq, const kevent_t *changelist, int nchanges,
kevent_t *eventlist, int nevents, const timespec *timeout) kevent_t *eventlist, int nevents, const timespec *timeout)
nothrow @nogc; nothrow @nogc;
import tanya.async.event.selector;
import tanya.async.loop;
import tanya.async.transport;
import tanya.async.watcher;
import tanya.memory;
import tanya.memory.mmappool;
import tanya.network.socket;
import core.stdc.errno;
import core.sys.posix.unistd;
import core.sys.posix.sys.time;
import core.time;
import std.algorithm.comparison;
class KqueueLoop : SelectorLoop class KqueueLoop : SelectorLoop
{ {
protected int fd; protected int fd;
private kevent_t[] events; private Vector!kevent_t events;
private kevent_t[] changes; private Vector!kevent_t changes;
private size_t changeCount; private size_t changeCount;
/** /**
@ -138,19 +136,18 @@ class KqueueLoop : SelectorLoop
if ((fd = kqueue()) == -1) if ((fd = kqueue()) == -1)
{ {
throw MmapPool.instance.make!BadLoopException("epoll initialization failed"); throw make!BadLoopException(defaultAllocator,
"kqueue initialization failed");
} }
MmapPool.instance.resizeArray(events, 64); events = Vector!kevent_t(64, MmapPool.instance);
MmapPool.instance.resizeArray(changes, 64); changes = Vector!kevent_t(64, MmapPool.instance);
} }
/** /**
* Free loop internals. * Frees loop internals.
*/ */
~this() @nogc ~this() @nogc
{ {
MmapPool.instance.dispose(events);
MmapPool.instance.dispose(changes);
close(fd); close(fd);
} }
@ -158,7 +155,7 @@ class KqueueLoop : SelectorLoop
{ {
if (changes.length <= changeCount) if (changes.length <= changeCount)
{ {
MmapPool.instance.resizeArray(changes, changeCount + maxEvents); changes.length = changeCount + maxEvents;
} }
EV_SET(&changes[changeCount], EV_SET(&changes[changeCount],
cast(ulong) socket, cast(ulong) socket,
@ -216,10 +213,15 @@ class KqueueLoop : SelectorLoop
if (changeCount > maxEvents) if (changeCount > maxEvents)
{ {
MmapPool.instance.resizeArray(events, changes.length); events.length = changes.length;
} }
auto eventCount = kevent(fd, changes.ptr, cast(int) changeCount, events.ptr, maxEvents, &ts); auto eventCount = kevent(fd,
changes.data.ptr,
cast(int) changeCount,
events.data.ptr,
maxEvents,
&ts);
changeCount = 0; changeCount = 0;
if (eventCount < 0) if (eventCount < 0)

View File

@ -16,11 +16,10 @@ import tanya.async.loop;
import tanya.async.transport; import tanya.async.transport;
import tanya.async.watcher; import tanya.async.watcher;
import tanya.container.buffer; import tanya.container.buffer;
import tanya.container.vector;
import tanya.memory; import tanya.memory;
import tanya.memory.mmappool; import tanya.memory.mmappool;
import tanya.network.socket; import tanya.network.socket;
import core.sys.posix.netinet.in_;
import core.stdc.errno;
/** /**
* Transport for stream sockets. * Transport for stream sockets.
@ -103,12 +102,12 @@ class SelectorStreamTransport : StreamTransport
abstract class SelectorLoop : Loop abstract class SelectorLoop : Loop
{ {
/// Pending connections. /// Pending connections.
protected ConnectionWatcher[] connections; protected Vector!ConnectionWatcher connections;
this() @nogc this() @nogc
{ {
super(); super();
MmapPool.instance.resizeArray(connections, maxEvents); connections = Vector!ConnectionWatcher(maxEvents, MmapPool.instance);
} }
~this() @nogc ~this() @nogc
@ -124,7 +123,6 @@ abstract class SelectorLoop : Loop
connection = null; connection = null;
} }
} }
MmapPool.instance.dispose(connections);
} }
/** /**
@ -188,7 +186,7 @@ abstract class SelectorLoop : Loop
if (connections.length <= watcher.socket) if (connections.length <= watcher.socket)
{ {
MmapPool.instance.resizeArray(connections, watcher.socket.handle + maxEvents / 2); connections.length = watcher.socket.handle + maxEvents / 2;
} }
connections[watcher.socket.handle] = watcher; connections[watcher.socket.handle] = watcher;
@ -234,7 +232,7 @@ abstract class SelectorLoop : Loop
} }
else else
{ {
MmapPool.instance.resizeArray(connections, client.handle + maxEvents / 2); connections.length = client.handle + maxEvents / 2;
} }
if (io is null) if (io is null)
{ {

View File

@ -1083,7 +1083,7 @@ struct Vector(T)
* Params: * Params:
* dg = $(D_KEYWORD foreach) body. * dg = $(D_KEYWORD foreach) body.
*/ */
int opApply(scope int delegate(ref T) dg) int opApply(scope int delegate(ref T) @nogc dg)
{ {
T* end = vector + length_ - 1; T* end = vector + length_ - 1;
for (T* begin = vector; begin != end; ++begin) for (T* begin = vector; begin != end; ++begin)
@ -1098,7 +1098,7 @@ struct Vector(T)
} }
/// Ditto. /// Ditto.
int opApply(scope int delegate(ref size_t i, ref T) dg) int opApply(scope int delegate(ref size_t i, ref T) @nogc dg)
{ {
for (size_t i = 0; i < length_; ++i) for (size_t i = 0; i < length_; ++i)
{ {
@ -1395,6 +1395,19 @@ struct Vector(T)
assert(v2[1] == 3); assert(v2[1] == 3);
} }
/**
* Returns an array used internally by the vector to store its owned elements.
* The length of the returned array may differ from the size of the allocated
* memory for the vector: the array contains only initialized elements, but
* not the reserved memory.
*
* Returns: The array with elements of this vector.
*/
inout(T[]) data() inout
{
return vector[0 .. length];
}
mixin DefaultAllocator; mixin DefaultAllocator;
} }