Use defaultAllocator in the async

Instead of hard-coded MmapPool.
This commit is contained in:
Eugen Wissner 2018-03-05 17:42:44 +01:00
parent b795267e75
commit fbbdb36853
7 changed files with 33 additions and 42 deletions

View File

@ -31,7 +31,6 @@ import tanya.async.transport;
import tanya.async.watcher; import tanya.async.watcher;
import tanya.container.array; import tanya.container.array;
import tanya.memory; import tanya.memory;
import tanya.memory.mmappool;
import tanya.network.socket; import tanya.network.socket;
extern (C) nothrow @nogc extern (C) nothrow @nogc
@ -56,7 +55,7 @@ final class EpollLoop : SelectorLoop
throw defaultAllocator.make!BadLoopException("epoll initialization failed"); throw defaultAllocator.make!BadLoopException("epoll initialization failed");
} }
super(); super();
events = Array!epoll_event(maxEvents, MmapPool.instance); events = Array!epoll_event(maxEvents);
} }
/** /**

View File

@ -27,7 +27,6 @@ import tanya.async.transport;
import tanya.async.watcher; import tanya.async.watcher;
import tanya.container.buffer; import tanya.container.buffer;
import tanya.memory; import tanya.memory;
import tanya.memory.mmappool;
import tanya.network.socket; import tanya.network.socket;
import tanya.sys.windows.winbase; import tanya.sys.windows.winbase;
@ -57,8 +56,8 @@ final class StreamTransport : SocketWatcher, DuplexTransport, SocketTransport
this(OverlappedConnectedSocket socket) @nogc this(OverlappedConnectedSocket socket) @nogc
{ {
super(socket); super(socket);
output = ReadBuffer!ubyte(8192, 1024, MmapPool.instance); output = ReadBuffer!ubyte(8192, 1024);
input = WriteBuffer!ubyte(8192, MmapPool.instance); input = WriteBuffer!ubyte(8192);
active = true; active = true;
} }
@ -117,8 +116,7 @@ final class StreamTransport : SocketWatcher, DuplexTransport, SocketTransport
/** /**
* Switches the protocol. * Switches the protocol.
* *
* The protocol is deallocated by the event loop, it should currently be * The protocol is deallocated by the event loop.
* allocated with $(D_PSYMBOL MmapPool).
* *
* Params: * Params:
* protocol = Application protocol. * protocol = Application protocol.
@ -150,20 +148,20 @@ final class StreamTransport : SocketWatcher, DuplexTransport, SocketTransport
SocketState overlapped; SocketState overlapped;
try try
{ {
overlapped = MmapPool.instance.make!SocketState; overlapped = defaultAllocator.make!SocketState;
socket.beginSend(input[], overlapped); socket.beginSend(input[], overlapped);
} }
catch (SocketException e) catch (SocketException e)
{ {
MmapPool.instance.dispose(overlapped); defaultAllocator.dispose(overlapped);
MmapPool.instance.dispose(e); defaultAllocator.dispose(e);
} }
} }
} }
else else
{ {
protocol.disconnected(exception); protocol.disconnected(exception);
MmapPool.instance.dispose(protocol_); defaultAllocator.dispose(protocol_);
defaultAllocator.dispose(exception); defaultAllocator.dispose(exception);
active = false; active = false;
} }
@ -221,12 +219,12 @@ final class IOCPLoop : Loop
try try
{ {
overlapped = MmapPool.instance.make!SocketState; overlapped = defaultAllocator.make!SocketState;
socket.beginAccept(overlapped); socket.beginAccept(overlapped);
} }
catch (SocketException e) catch (SocketException e)
{ {
MmapPool.instance.dispose(overlapped); defaultAllocator.dispose(overlapped);
defaultAllocator.dispose(e); defaultAllocator.dispose(e);
return false; return false;
} }
@ -250,12 +248,12 @@ final class IOCPLoop : Loop
{ {
try try
{ {
overlapped = MmapPool.instance.make!SocketState; overlapped = defaultAllocator.make!SocketState;
transport.socket.beginReceive(transport.output[], overlapped); transport.socket.beginReceive(transport.output[], overlapped);
} }
catch (SocketException e) catch (SocketException e)
{ {
MmapPool.instance.dispose(overlapped); defaultAllocator.dispose(overlapped);
defaultAllocator.dispose(e); defaultAllocator.dispose(e);
return false; return false;
} }
@ -302,7 +300,7 @@ final class IOCPLoop : Loop
assert(overlapped !is null); assert(overlapped !is null);
scope (failure) scope (failure)
{ {
MmapPool.instance.dispose(overlapped); defaultAllocator.dispose(overlapped);
} }
switch (overlapped.event) switch (overlapped.event)
@ -315,7 +313,7 @@ final class IOCPLoop : Loop
assert(listener !is null); assert(listener !is null);
auto socket = listener.endAccept(overlapped); auto socket = listener.endAccept(overlapped);
auto transport = MmapPool.instance.make!StreamTransport(socket); auto transport = defaultAllocator.make!StreamTransport(socket);
connection.incoming.enqueue(transport); connection.incoming.enqueue(transport);
@ -330,8 +328,8 @@ final class IOCPLoop : Loop
if (!transport.active) if (!transport.active)
{ {
MmapPool.instance.dispose(transport); defaultAllocator.dispose(transport);
MmapPool.instance.dispose(overlapped); defaultAllocator.dispose(overlapped);
return; return;
} }

View File

@ -59,7 +59,6 @@ import tanya.async.transport;
import tanya.async.watcher; import tanya.async.watcher;
import tanya.container.array; import tanya.container.array;
import tanya.memory; import tanya.memory;
import tanya.memory.mmappool;
import tanya.network.socket; import tanya.network.socket;
void EV_SET(kevent_t* kevp, typeof(kevent_t.tupleof) args) pure nothrow @nogc void EV_SET(kevent_t* kevp, typeof(kevent_t.tupleof) args) pure nothrow @nogc
@ -146,8 +145,8 @@ final class KqueueLoop : SelectorLoop
throw make!BadLoopException(defaultAllocator, throw make!BadLoopException(defaultAllocator,
"kqueue initialization failed"); "kqueue initialization failed");
} }
events = Array!kevent_t(64, MmapPool.instance); events = Array!kevent_t(64);
changes = Array!kevent_t(64, MmapPool.instance); changes = Array!kevent_t(64);
} }
/** /**

View File

@ -26,7 +26,6 @@ import tanya.async.watcher;
import tanya.container.array; import tanya.container.array;
import tanya.container.buffer; import tanya.container.buffer;
import tanya.memory; import tanya.memory;
import tanya.memory.mmappool;
import tanya.network.socket; import tanya.network.socket;
/** /**
@ -65,8 +64,8 @@ package class StreamTransport : SocketWatcher, DuplexTransport, SocketTransport
{ {
super(socket); super(socket);
this.loop = loop; this.loop = loop;
output = ReadBuffer!ubyte(8192, 1024, MmapPool.instance); output = ReadBuffer!ubyte(8192, 1024);
input = WriteBuffer!ubyte(8192, MmapPool.instance); input = WriteBuffer!ubyte(8192);
active = true; active = true;
} }
@ -107,8 +106,7 @@ package class StreamTransport : SocketWatcher, DuplexTransport, SocketTransport
/** /**
* Switches the protocol. * Switches the protocol.
* *
* The protocol is deallocated by the event loop, it should currently be * The protocol is deallocated by the event loop.
* allocated with $(D_PSYMBOL MmapPool).
* *
* Params: * Params:
* protocol = Application protocol. * protocol = Application protocol.
@ -163,7 +161,7 @@ package class StreamTransport : SocketWatcher, DuplexTransport, SocketTransport
else else
{ {
protocol.disconnected(exception); protocol.disconnected(exception);
MmapPool.instance.dispose(protocol_); defaultAllocator.dispose(protocol_);
defaultAllocator.dispose(exception); defaultAllocator.dispose(exception);
active = false; active = false;
} }
@ -220,7 +218,7 @@ abstract class SelectorLoop : Loop
this() @nogc this() @nogc
{ {
super(); super();
connections = Array!SocketWatcher(maxEvents, MmapPool.instance); connections = Array!SocketWatcher(maxEvents);
} }
~this() @nogc ~this() @nogc
@ -231,7 +229,7 @@ abstract class SelectorLoop : Loop
// created by the user and should be freed by himself. // created by the user and should be freed by himself.
if (cast(StreamTransport) connection !is null) if (cast(StreamTransport) connection !is null)
{ {
MmapPool.instance.dispose(connection); defaultAllocator.dispose(connection);
} }
} }
} }
@ -387,7 +385,7 @@ abstract class SelectorLoop : Loop
} }
if (transport is null) if (transport is null)
{ {
transport = MmapPool.instance.make!StreamTransport(this, client); transport = defaultAllocator.make!StreamTransport(this, client);
connections[client.handle] = transport; connections[client.handle] = transport;
} }
else else

View File

@ -78,7 +78,6 @@ import tanya.async.watcher;
import tanya.container.buffer; import tanya.container.buffer;
import tanya.container.queue; import tanya.container.queue;
import tanya.memory; import tanya.memory;
import tanya.memory.mmappool;
import tanya.network.socket; import tanya.network.socket;
version (DisableBackends) version (DisableBackends)
@ -189,7 +188,7 @@ abstract class Loop
*/ */
this() @nogc this() @nogc
{ {
pendings = Queue!Watcher(MmapPool.instance); pendings = Queue!Watcher();
} }
/** /**
@ -199,7 +198,7 @@ abstract class Loop
{ {
foreach (w; pendings) foreach (w; pendings)
{ {
MmapPool.instance.dispose(w); defaultAllocator.dispose(w);
} }
} }
@ -384,16 +383,16 @@ class BadLoopException : Exception
} }
version (Epoll) version (Epoll)
{ {
defaultLoop_ = MmapPool.instance.make!EpollLoop; defaultLoop_ = defaultAllocator.make!EpollLoop;
} }
else version (IOCP) else version (IOCP)
{ {
defaultLoop_ = MmapPool.instance.make!IOCPLoop; defaultLoop_ = defaultAllocator.make!IOCPLoop;
} }
else version (Kqueue) else version (Kqueue)
{ {
import tanya.async.event.kqueue; import tanya.async.event.kqueue;
defaultLoop_ = MmapPool.instance.make!KqueueLoop; defaultLoop_ = defaultAllocator.make!KqueueLoop;
} }
return defaultLoop_; return defaultLoop_;
} }

View File

@ -65,8 +65,7 @@ interface DuplexTransport : ReadTransport, WriteTransport
/** /**
* Switches the protocol. * Switches the protocol.
* *
* The protocol is deallocated by the event loop, it should currently be * The protocol is deallocated by the event loop.
* allocated with $(D_PSYMBOL MmapPool).
* *
* Params: * Params:
* protocol = Application protocol. * protocol = Application protocol.

View File

@ -22,7 +22,6 @@ import tanya.async.transport;
import tanya.container.buffer; import tanya.container.buffer;
import tanya.container.queue; import tanya.container.queue;
import tanya.memory; import tanya.memory;
import tanya.memory.mmappool;
import tanya.network.socket; import tanya.network.socket;
/** /**
@ -103,7 +102,7 @@ class ConnectionWatcher : SocketWatcher
this(Socket socket) @nogc this(Socket socket) @nogc
{ {
super(socket); super(socket);
incoming = Queue!DuplexTransport(MmapPool.instance); incoming = Queue!DuplexTransport();
} }
/** /**
@ -112,7 +111,7 @@ class ConnectionWatcher : SocketWatcher
*/ */
void setProtocol(P : Protocol)() @nogc void setProtocol(P : Protocol)() @nogc
{ {
this.protocolFactory = () @nogc => cast(Protocol) MmapPool.instance.make!P; this.protocolFactory = () @nogc => cast(Protocol) defaultAllocator.make!P;
} }
/** /**