Replace class Queue with the struct Queue

This commit is contained in:
2016-12-13 10:56:29 +01:00
parent 711855474c
commit ab9f96e0c7
10 changed files with 141 additions and 113 deletions

View File

@@ -26,6 +26,13 @@ import core.sys.posix.unistd;
import core.time;
import std.algorithm.comparison;
extern (C) nothrow @nogc
{
int epoll_create1(int flags);
int epoll_ctl (int epfd, int op, int fd, epoll_event *event);
int epoll_wait (int epfd, epoll_event *events, int maxevents, int timeout);
}
class EpollLoop : SelectorLoop
{
protected int fd;
@@ -34,7 +41,7 @@ class EpollLoop : SelectorLoop
/**
* Initializes the loop.
*/
this()
this() @nogc
{
if ((fd = epoll_create1(EPOLL_CLOEXEC)) < 0)
{
@@ -47,7 +54,7 @@ class EpollLoop : SelectorLoop
/**
* Free loop internals.
*/
~this()
~this() @nogc
{
MmapPool.instance.dispose(events);
close(fd);
@@ -63,7 +70,9 @@ class EpollLoop : SelectorLoop
*
* Returns: $(D_KEYWORD true) if the operation was successful.
*/
protected override bool reify(ConnectionWatcher watcher, EventMask oldEvents, EventMask events)
protected override bool reify(ConnectionWatcher watcher,
EventMask oldEvents,
EventMask events) @nogc
in
{
assert(watcher !is null);
@@ -97,7 +106,7 @@ class EpollLoop : SelectorLoop
/**
* Does the actual polling.
*/
protected override void poll()
protected override void poll() @nogc
{
// Don't block
immutable timeout = cast(immutable int) blockTime.total!"msecs";

View File

@@ -37,7 +37,7 @@ class IOCPStreamTransport : StreamTransport
* Params:
* socket = Socket.
*/
this(OverlappedConnectedSocket socket)
this(OverlappedConnectedSocket socket) @nogc
in
{
assert(socket !is null);
@@ -53,7 +53,8 @@ class IOCPStreamTransport : StreamTransport
MmapPool.instance.dispose(input);
}
@property inout(OverlappedConnectedSocket) socket() inout pure nothrow @safe @nogc
@property inout(OverlappedConnectedSocket) socket()
inout pure nothrow @safe @nogc
{
return socket_;
}
@@ -64,7 +65,7 @@ class IOCPStreamTransport : StreamTransport
* Params:
* data = Data to send.
*/
void write(ubyte[] data)
void write(ubyte[] data) @nogc
{
immutable empty = input.length == 0;
input ~= data;
@@ -85,8 +86,8 @@ class IOCPStreamTransport : StreamTransport
}
}
class IOCPLoop : Loop
{
class IOCPLoop : Loop
{
protected HANDLE completionPort;
protected OVERLAPPED overlap;
@@ -94,7 +95,7 @@ class IOCPStreamTransport : StreamTransport
/**
* Initializes the loop.
*/
this()
this() @nogc
{
super();
@@ -118,7 +119,7 @@ class IOCPStreamTransport : StreamTransport
*/
override protected bool reify(ConnectionWatcher watcher,
EventMask oldEvents,
EventMask events)
EventMask events) @nogc
{
SocketState overlapped;
if (!(oldEvents & Event.accept) && (events & Event.accept))
@@ -185,7 +186,7 @@ class IOCPStreamTransport : StreamTransport
/**
* Does the actual polling.
*/
override protected void poll()
override protected void poll() @nogc
{
DWORD lpNumberOfBytes;
ULONG_PTR key;

View File

@@ -151,12 +151,13 @@ class KqueueLoop : SelectorLoop
* Returns: Maximal event count can be got at a time
* (should be supported by the backend).
*/
override protected @property inout(uint) maxEvents() inout const pure nothrow @safe @nogc
override protected @property inout(uint) maxEvents()
inout const pure nothrow @safe @nogc
{
return cast(uint) events.length;
}
this()
this() @nogc
{
super();
@@ -171,14 +172,14 @@ class KqueueLoop : SelectorLoop
/**
* Free loop internals.
*/
~this()
~this() @nogc
{
MmapPool.instance.dispose(events);
MmapPool.instance.dispose(changes);
close(fd);
}
private void set(socket_t socket, short filter, ushort flags)
private void set(socket_t socket, short filter, ushort flags) @nogc
{
if (changes.length <= changeCount)
{
@@ -206,7 +207,7 @@ class KqueueLoop : SelectorLoop
*/
override protected bool reify(ConnectionWatcher watcher,
EventMask oldEvents,
EventMask events)
EventMask events) @nogc
{
if (events != oldEvents)
{
@@ -233,7 +234,7 @@ class KqueueLoop : SelectorLoop
/**
* Does the actual polling.
*/
protected override void poll()
protected override void poll() @nogc
{
timespec ts;
blockTime.split!("seconds", "nsecs")(ts.tv_sec, ts.tv_nsec);
@@ -316,7 +317,7 @@ class KqueueLoop : SelectorLoop
* Returns: The blocking time.
*/
override protected @property inout(Duration) blockTime()
inout @safe pure nothrow
inout @nogc @safe pure nothrow
{
return min(super.blockTime, 1.dur!"seconds");
}
@@ -333,7 +334,8 @@ class KqueueLoop : SelectorLoop
* completed or scheduled, $(D_KEYWORD false) otherwise (the
* transport is be destroyed then).
*/
protected override bool feed(SelectorStreamTransport transport, SocketException exception = null)
protected override bool feed(SelectorStreamTransport transport,
SocketException exception = null) @nogc
{
if (!super.feed(transport, exception))
{

View File

@@ -42,7 +42,7 @@ class SelectorStreamTransport : StreamTransport
* loop = Event loop.
* socket = Socket.
*/
this(SelectorLoop loop, ConnectedSocket socket)
this(SelectorLoop loop, ConnectedSocket socket) @nogc
{
socket_ = socket;
this.loop = loop;
@@ -52,7 +52,7 @@ class SelectorStreamTransport : StreamTransport
/**
* Close the transport and deallocate the data buffers.
*/
~this()
~this() @nogc
{
MmapPool.instance.dispose(input);
}
@@ -71,7 +71,7 @@ class SelectorStreamTransport : StreamTransport
* Params:
* data = Data to send.
*/
void write(ubyte[] data)
void write(ubyte[] data) @nogc
{
if (!data.length)
{
@@ -113,13 +113,13 @@ abstract class SelectorLoop : Loop
/// Pending connections.
protected ConnectionWatcher[] connections;
this()
this() @nogc
{
super();
connections = MmapPool.instance.makeArray!ConnectionWatcher(maxEvents);
}
~this()
~this() @nogc
{
foreach (ref connection; connections)
{
@@ -147,7 +147,8 @@ abstract class SelectorLoop : Loop
* completed or scheduled, $(D_KEYWORD false) otherwise (the
* transport will be destroyed then).
*/
protected bool feed(SelectorStreamTransport transport, SocketException exception = null)
protected bool feed(SelectorStreamTransport transport,
SocketException exception = null) @nogc
{
while (transport.input.length && transport.writeReady)
{
@@ -186,7 +187,7 @@ abstract class SelectorLoop : Loop
* Params:
* watcher = Watcher.
*/
override void start(ConnectionWatcher watcher)
override void start(ConnectionWatcher watcher) @nogc
{
if (watcher.active)
{
@@ -208,7 +209,7 @@ abstract class SelectorLoop : Loop
* Params:
* connection = Connection watcher ready to accept.
*/
package void acceptConnections(ConnectionWatcher connection)
package void acceptConnections(ConnectionWatcher connection) @nogc
in
{
assert(connection !is null);

View File

@@ -131,15 +131,16 @@ alias EventMask = BitFlags!Event;
abstract class Loop
{
/// Pending watchers.
protected Queue!Watcher pendings;
protected Queue!Watcher* pendings;
protected Queue!Watcher swapPendings;
protected Queue!Watcher* swapPendings;
/**
* Returns: Maximal event count can be got at a time
* (should be supported by the backend).
*/
protected @property inout(uint) maxEvents() inout const pure nothrow @safe @nogc
protected @property inout(uint) maxEvents()
inout const pure nothrow @safe @nogc
{
return 128U;
}
@@ -147,7 +148,7 @@ abstract class Loop
/**
* Initializes the loop.
*/
this()
this() @nogc
{
pendings = MmapPool.instance.make!(Queue!Watcher);
swapPendings = MmapPool.instance.make!(Queue!Watcher);
@@ -156,15 +157,15 @@ abstract class Loop
/**
* Frees loop internals.
*/
~this()
~this() @nogc
{
foreach (w; pendings)
foreach (w; *pendings)
{
MmapPool.instance.dispose(w);
}
MmapPool.instance.dispose(pendings);
foreach (w; swapPendings)
foreach (w; *swapPendings)
{
MmapPool.instance.dispose(w);
}
@@ -174,7 +175,7 @@ abstract class Loop
/**
* Starts the loop.
*/
void run()
void run() @nogc
{
done_ = false;
do
@@ -182,7 +183,7 @@ abstract class Loop
poll();
// Invoke pendings
swapPendings.each!((ref p) => p.invoke());
swapPendings.each!((ref p) @nogc => p.invoke());
swap(pendings, swapPendings);
}
@@ -192,7 +193,7 @@ abstract class Loop
/**
* Break out of the loop.
*/
void unloop() @safe pure nothrow
void unloop() @safe pure nothrow @nogc
{
done_ = true;
}
@@ -203,7 +204,7 @@ abstract class Loop
* Params:
* watcher = Watcher.
*/
void start(ConnectionWatcher watcher)
void start(ConnectionWatcher watcher) @nogc
{
if (watcher.active)
{
@@ -219,7 +220,7 @@ abstract class Loop
* Params:
* watcher = Watcher.
*/
void stop(ConnectionWatcher watcher)
void stop(ConnectionWatcher watcher) @nogc
{
if (!watcher.active)
{
@@ -242,13 +243,13 @@ abstract class Loop
*/
abstract protected bool reify(ConnectionWatcher watcher,
EventMask oldEvents,
EventMask events);
EventMask events) @nogc;
/**
* Returns: The blocking time.
*/
protected @property inout(Duration) blockTime()
inout @safe pure nothrow
inout @safe pure nothrow @nogc
{
// Don't block if we have to do.
return swapPendings.empty ? blockTime_ : Duration.zero;
@@ -261,7 +262,7 @@ abstract class Loop
* blockTime = The blocking time. Cannot be larger than
* $(D_PSYMBOL maxBlockTime).
*/
protected @property void blockTime(in Duration blockTime) @safe pure nothrow
protected @property void blockTime(in Duration blockTime) @safe pure nothrow @nogc
in
{
assert(blockTime <= 1.dur!"hours", "Too long to wait.");
@@ -275,7 +276,7 @@ abstract class Loop
/**
* Kills the watcher and closes the connection.
*/
protected void kill(IOWatcher watcher, SocketException exception)
protected void kill(IOWatcher watcher, SocketException exception) @nogc
{
watcher.socket.shutdown();
defaultAllocator.dispose(watcher.socket);
@@ -287,7 +288,7 @@ abstract class Loop
/**
* Does the actual polling.
*/
abstract protected void poll();
abstract protected void poll() @nogc;
/// Whether the event loop should be stopped.
private bool done_;
@@ -321,7 +322,7 @@ class BadLoopException : Exception
*
* Returns: The default event loop.
*/
@property Loop defaultLoop()
@property Loop defaultLoop() @nogc
{
if (defaultLoop_ !is null)
{
@@ -354,7 +355,7 @@ class BadLoopException : Exception
* Params:
* loop = The event loop.
*/
@property void defaultLoop(Loop loop)
@property void defaultLoop(Loop loop) @nogc
in
{
assert(loop !is null);

View File

@@ -22,7 +22,7 @@ interface Protocol
* Params:
* data = Read data.
*/
void received(ubyte[] data);
void received(ubyte[] data) @nogc;
/**
* Called when a connection is made.
@@ -30,7 +30,7 @@ interface Protocol
* Params:
* transport = Protocol transport.
*/
void connected(DuplexTransport transport);
void connected(DuplexTransport transport) @nogc;
/**
* Called when a connection is lost.
@@ -39,7 +39,7 @@ interface Protocol
* exception = $(D_PSYMBOL Exception) if an error caused
* the disconnect, $(D_KEYWORD null) otherwise.
*/
void disconnected(SocketException exception = null);
void disconnected(SocketException exception = null) @nogc;
}
/**

View File

@@ -37,7 +37,7 @@ interface WriteTransport : Transport
* Params:
* data = Data to send.
*/
void write(ubyte[] data);
void write(ubyte[] data) @nogc;
}
/**

View File

@@ -42,7 +42,7 @@ abstract class Watcher
/**
* Invoke some action on event.
*/
void invoke();
void invoke() @nogc;
}
class ConnectionWatcher : Watcher
@@ -51,28 +51,28 @@ class ConnectionWatcher : Watcher
private Socket socket_;
/// Protocol factory.
protected Protocol delegate() protocolFactory;
protected Protocol delegate() @nogc protocolFactory;
package Queue!IOWatcher incoming;
package Queue!IOWatcher* incoming;
/**
* Params:
* socket = Socket.
*/
this(Socket socket)
this(Socket socket) @nogc
{
socket_ = socket;
incoming = MmapPool.instance.make!(Queue!IOWatcher);
}
/// Ditto.
protected this()
protected this() pure nothrow @safe @nogc
{
}
~this()
~this() @nogc
{
foreach (w; incoming)
foreach (w; *incoming)
{
MmapPool.instance.dispose(w);
}
@@ -83,9 +83,9 @@ class ConnectionWatcher : Watcher
* Params:
* P = Protocol should be used.
*/
void setProtocol(P : Protocol)()
void setProtocol(P : Protocol)() @nogc
{
this.protocolFactory = () => cast(Protocol) MmapPool.instance.make!P;
this.protocolFactory = () @nogc => cast(Protocol) MmapPool.instance.make!P;
}
/**
@@ -99,7 +99,7 @@ class ConnectionWatcher : Watcher
/**
* Returns: New protocol instance.
*/
@property Protocol protocol()
@property Protocol protocol() @nogc
in
{
assert(protocolFactory !is null, "Protocol isn't set.");
@@ -112,9 +112,9 @@ class ConnectionWatcher : Watcher
/**
* Invokes new connection callback.
*/
override void invoke()
override void invoke() @nogc
{
foreach (io; incoming)
foreach (io; *incoming)
{
io.protocol.connected(cast(DuplexTransport) io.transport);
}
@@ -146,7 +146,7 @@ class IOWatcher : ConnectionWatcher
* transport = Transport.
* protocol = New instance of the application protocol.
*/
this(StreamTransport transport, Protocol protocol)
this(StreamTransport transport, Protocol protocol) @nogc
in
{
assert(transport !is null);
@@ -164,7 +164,7 @@ class IOWatcher : ConnectionWatcher
/**
* Destroys the watcher.
*/
protected ~this()
protected ~this() @nogc
{
MmapPool.instance.dispose(output);
MmapPool.instance.dispose(protocol_);
@@ -179,7 +179,8 @@ class IOWatcher : ConnectionWatcher
*
* Returns: $(D_KEYWORD this).
*/
IOWatcher opCall(StreamTransport transport, Protocol protocol) pure nothrow @nogc
IOWatcher opCall(StreamTransport transport, Protocol protocol)
pure nothrow @nogc
in
{
assert(transport !is null);
@@ -231,7 +232,7 @@ class IOWatcher : ConnectionWatcher
/**
* Invokes the watcher callback.
*/
override void invoke()
override void invoke() @nogc
{
if (output.length)
{