Switch to container.queue. Remove PendingQueue

This commit is contained in:
Eugen Wissner 2016-12-02 19:18:37 +01:00
parent 1123d01e6c
commit f7f92e7906
10 changed files with 1289 additions and 1399 deletions

View File

@ -10,6 +10,7 @@
* *
* --- * ---
* import tanya.async; * import tanya.async;
* import tanya.memory;
* import tanya.network.socket; * import tanya.network.socket;
* *
* class EchoProtocol : TransmissionControlProtocol * class EchoProtocol : TransmissionControlProtocol
@ -33,44 +34,48 @@
* *
* void main() * void main()
* { * {
* auto address = new InternetAddress("127.0.0.1", cast(ushort) 8192); * auto address = theAllocator.make!InternetAddress("127.0.0.1", cast(ushort) 8192);
* *
* version (Windows) * version (Windows)
* { * {
* auto sock = new OverlappedStreamSocket(AddressFamily.INET); * auto sock = theAllocator.make!OverlappedStreamSocket(AddressFamily.INET);
* } * }
* else * else
* { * {
* auto sock = new StreamSocket(AddressFamily.INET); * auto sock = theAllocator.make!StreamSocket(AddressFamily.INET);
* sock.blocking = false; * sock.blocking = false;
* } * }
* *
* sock.bind(address); * sock.bind(address);
* sock.listen(5); * sock.listen(5);
* *
* auto io = new ConnectionWatcher(sock); * auto io = theAllocator.make!ConnectionWatcher(sock);
* io.setProtocol!EchoProtocol; * io.setProtocol!EchoProtocol;
* *
* defaultLoop.start(io); * defaultLoop.start(io);
* defaultLoop.run(); * defaultLoop.run();
* *
* sock.shutdown(); * sock.shutdown();
* theAllocator.dispose(io);
* theAllocator.dispose(sock);
* theAllocator.dispose(address);
* } * }
* --- * ---
*/ */
module tanya.async.loop; module tanya.async.loop;
import tanya.async.protocol;
import tanya.async.transport;
import tanya.async.watcher;
import tanya.container.buffer;
import tanya.memory;
import tanya.memory.mmappool;
import tanya.network.socket;
import core.time; import core.time;
import std.algorithm.iteration; import std.algorithm.iteration;
import std.algorithm.mutation; import std.algorithm.mutation;
import std.typecons; import std.typecons;
import tanya.async.protocol;
import tanya.async.transport;
import tanya.async.watcher;
import tanya.container.buffer;
import tanya.container.queue;
import tanya.memory;
import tanya.memory.mmappool;
import tanya.network.socket;
version (DisableBackends) version (DisableBackends)
{ {
@ -126,9 +131,9 @@ alias EventMask = BitFlags!Event;
abstract class Loop abstract class Loop
{ {
/// Pending watchers. /// Pending watchers.
protected PendingQueue!Watcher pendings; protected Queue!Watcher pendings;
protected PendingQueue!Watcher swapPendings; protected Queue!Watcher swapPendings;
/** /**
* Returns: Maximal event count can be got at a time * Returns: Maximal event count can be got at a time
@ -144,8 +149,8 @@ abstract class Loop
*/ */
this() this()
{ {
pendings = MmapPool.instance.make!(PendingQueue!Watcher); pendings = MmapPool.instance.make!(Queue!Watcher);
swapPendings = MmapPool.instance.make!(PendingQueue!Watcher); swapPendings = MmapPool.instance.make!(Queue!Watcher);
} }
/** /**
@ -153,7 +158,16 @@ abstract class Loop
*/ */
~this() ~this()
{ {
foreach (w; pendings)
{
MmapPool.instance.dispose(w);
}
MmapPool.instance.dispose(pendings); MmapPool.instance.dispose(pendings);
foreach (w; swapPendings)
{
MmapPool.instance.dispose(w);
}
MmapPool.instance.dispose(swapPendings); MmapPool.instance.dispose(swapPendings);
} }
@ -287,7 +301,6 @@ abstract class Loop
*/ */
class BadLoopException : Exception class BadLoopException : Exception
{ {
@nogc:
/** /**
* Params: * Params:
* file = The file where the exception occurred. * file = The file where the exception occurred.
@ -295,7 +308,7 @@ class BadLoopException : Exception
* next = The previous exception in the chain of exceptions, if any. * next = The previous exception in the chain of exceptions, if any.
*/ */
this(string file = __FILE__, size_t line = __LINE__, Throwable next = null) this(string file = __FILE__, size_t line = __LINE__, Throwable next = null)
pure @safe nothrow const pure nothrow const @safe @nogc
{ {
super("Event loop cannot be initialized.", file, line, next); super("Event loop cannot be initialized.", file, line, next);
} }
@ -352,132 +365,3 @@ body
} }
private Loop defaultLoop_; private Loop defaultLoop_;
/**
* Queue.
*
* Params:
* T = Content type.
*/
class PendingQueue(T)
{
/**
* Creates a new $(D_PSYMBOL Queue).
*/
this()
{
}
/**
* Removes all elements from the queue.
*/
~this()
{
foreach (e; this)
{
MmapPool.instance.dispose(e);
}
}
/**
* Returns: First element.
*/
@property ref T front()
in
{
assert(!empty);
}
body
{
return first.next.content;
}
/**
* Inserts a new element.
*
* Params:
* x = New element.
*
* Returns: $(D_KEYWORD this).
*/
typeof(this) insertBack(T x)
{
Entry* temp = MmapPool.instance.make!Entry;
temp.content = x;
if (empty)
{
first.next = rear = temp;
}
else
{
rear.next = temp;
rear = rear.next;
}
return this;
}
alias insert = insertBack;
/**
* Inserts a new element.
*
* Params:
* x = New element.
*
* Returns: $(D_KEYWORD this).
*/
typeof(this) opOpAssign(string Op)(ref T x)
if (Op == "~")
{
return insertBack(x);
}
/**
* Returns: $(D_KEYWORD true) if the queue is empty.
*/
@property bool empty() const @safe pure nothrow
{
return first.next is null;
}
/**
* Move position to the next element.
*
* Returns: $(D_KEYWORD this).
*/
typeof(this) popFront()
in
{
assert(!empty);
}
body
{
auto n = first.next.next;
MmapPool.instance.dispose(first.next);
first.next = n;
return this;
}
/**
* Queue entry.
*/
protected struct Entry
{
/// Queue item content.
T content;
/// Next list item.
Entry* next;
}
/// The first element of the list.
protected Entry first;
/// The last element of the list.
protected Entry* rear;
}

View File

@ -10,15 +10,16 @@
*/ */
module tanya.async.watcher; module tanya.async.watcher;
import std.functional;
import std.exception;
import tanya.async.loop; import tanya.async.loop;
import tanya.async.protocol; import tanya.async.protocol;
import tanya.async.transport; import tanya.async.transport;
import tanya.container.buffer; import tanya.container.buffer;
import tanya.container.queue;
import tanya.memory; import tanya.memory;
import tanya.memory.mmappool; import tanya.memory.mmappool;
import tanya.network.socket; import tanya.network.socket;
import std.functional;
import std.exception;
version (Windows) version (Windows)
{ {
@ -52,7 +53,7 @@ class ConnectionWatcher : Watcher
/// Protocol factory. /// Protocol factory.
protected Protocol delegate() protocolFactory; protected Protocol delegate() protocolFactory;
package PendingQueue!IOWatcher incoming; package Queue!IOWatcher incoming;
/** /**
* Params: * Params:
@ -61,7 +62,7 @@ class ConnectionWatcher : Watcher
this(Socket socket) this(Socket socket)
{ {
socket_ = socket; socket_ = socket;
incoming = MmapPool.instance.make!(PendingQueue!IOWatcher); incoming = MmapPool.instance.make!(Queue!IOWatcher);
} }
/// Ditto. /// Ditto.
@ -71,6 +72,10 @@ class ConnectionWatcher : Watcher
~this() ~this()
{ {
foreach (w; incoming)
{
MmapPool.instance.dispose(w);
}
MmapPool.instance.dispose(incoming); MmapPool.instance.dispose(incoming);
} }

View File

@ -121,7 +121,7 @@ class Queue(T)
/** /**
* Returns: $(D_KEYWORD true) if the queue is empty. * Returns: $(D_KEYWORD true) if the queue is empty.
*/ */
@property bool empty() inout const @property bool empty() inout const pure nothrow @safe @nogc
{ {
return first.next is null; return first.next is null;
} }
@ -170,7 +170,8 @@ class Queue(T)
} }
/** /**
* $(D_KEYWORD foreach) iteration. * $(D_KEYWORD foreach) iteration. The elements will be automatically
* dequeued.
* *
* Params: * Params:
* dg = $(D_KEYWORD foreach) body. * dg = $(D_KEYWORD foreach) body.