Merge branch 'master' into utf8string

This commit is contained in:
Eugen Wissner 2017-02-20 08:02:01 +01:00
commit 074d027629
8 changed files with 308 additions and 212 deletions

View File

@ -14,6 +14,7 @@ Tanya extends Phobos functionality and provides alternative implementations for
data structures and utilities that depend on the Garbage Collector in Phobos. data structures and utilities that depend on the Garbage Collector in Phobos.
* [Bug tracker](https://issues.caraus.io/projects/tanya) * [Bug tracker](https://issues.caraus.io/projects/tanya)
* [Documentation](https://docs.caraus.io/tanya)
## Overview ## Overview
@ -51,9 +52,6 @@ needs more test cases, better performance and some additional features
* Tanya is a native D library. * Tanya is a native D library.
* Documentation and usage examples can be found in the source code.
Online documentation will be published soon.
* Tanya is cross-platform. The development happens on a 64-bit Linux, but it * Tanya is cross-platform. The development happens on a 64-bit Linux, but it
is being tested on Windows and FreeBSD as well. is being tested on Windows and FreeBSD as well.

View File

@ -70,14 +70,9 @@ final class EpollLoop : SelectorLoop
* *
* Returns: $(D_KEYWORD true) if the operation was successful. * Returns: $(D_KEYWORD true) if the operation was successful.
*/ */
protected override bool reify(ConnectionWatcher watcher, protected override bool reify(SocketWatcher watcher,
EventMask oldEvents, EventMask oldEvents,
EventMask events) @nogc EventMask events) @nogc
in
{
assert(watcher !is null);
}
body
{ {
int op = EPOLL_CTL_DEL; int op = EPOLL_CTL_DEL;
epoll_event ev; epoll_event ev;
@ -123,15 +118,18 @@ final class EpollLoop : SelectorLoop
for (auto i = 0; i < eventCount; ++i) for (auto i = 0; i < eventCount; ++i)
{ {
auto transport = cast(SelectorStreamTransport) connections[events[i].data.fd]; auto transport = cast(StreamTransport) connections[events[i].data.fd];
if (transport is null) if (transport is null)
{ {
acceptConnections(connections[events[i].data.fd]); auto connection = cast(ConnectionWatcher) connections[events[i].data.fd];
assert(connection !is null);
acceptConnections(connection);
} }
else if (events[i].events & EPOLLERR) else if (events[i].events & EPOLLERR)
{ {
kill(transport, null); kill(transport);
continue; continue;
} }
else if (events[i].events & (EPOLLIN | EPOLLPRI | EPOLLHUP)) else if (events[i].events & (EPOLLIN | EPOLLPRI | EPOLLHUP))

View File

@ -26,35 +26,87 @@ import core.sys.windows.winbase;
import core.sys.windows.windef; import core.sys.windows.windef;
import core.sys.windows.winsock2; import core.sys.windows.winsock2;
final class IOCPStreamTransport : StreamTransport /**
* Transport for stream sockets.
*/
final class StreamTransport : SocketWatcher, DuplexTransport, SocketTransport
{ {
private SocketException exception;
private ReadBuffer!ubyte output;
private WriteBuffer!ubyte input; private WriteBuffer!ubyte input;
private Protocol protocol_;
private bool closing;
/** /**
* Creates new completion port transport. * Creates new completion port transport.
* *
* Params: * Params:
* socket = Socket. * socket = Socket.
* *
* Precondition: $(D_INLINECODE socket) * Precondition: $(D_INLINECODE socket !is null)
*/ */
this(OverlappedConnectedSocket socket) @nogc this(OverlappedConnectedSocket socket) @nogc
in {
super(socket);
output = ReadBuffer!ubyte(8192, 1024, MmapPool.instance);
input = WriteBuffer!ubyte(8192, MmapPool.instance);
active = true;
}
/**
* Returns: Socket.
*
* Postcondition: $(D_INLINECODE socket !is null)
*/
override @property OverlappedConnectedSocket socket() pure nothrow @safe @nogc
out (socket)
{ {
assert(socket !is null); assert(socket !is null);
} }
body body
{ {
super(socket); return cast(OverlappedConnectedSocket) socket_;
input = WriteBuffer!ubyte(8192, MmapPool.instance);
} }
/** /**
* Returns: Socket. * Returns $(D_PARAM true) if the transport is closing or closed.
*/ */
override @property OverlappedConnectedSocket socket() pure nothrow @safe @nogc bool isClosing() const pure nothrow @safe @nogc
{ {
return cast(OverlappedConnectedSocket) socket_; return closing;
}
/**
* Close the transport.
*
* Buffered data will be flushed. No more data will be received.
*/
void close() pure nothrow @safe @nogc
{
closing = true;
}
/**
* Write some data to the transport.
*
* Params:
* data = Data to send.
*/
void write(ubyte[] data) @nogc
{
input ~= data;
}
/**
* Returns: Application protocol.
*/
@property Protocol protocol() pure nothrow @safe @nogc
{
return protocol_;
} }
/** /**
@ -78,31 +130,38 @@ final class IOCPStreamTransport : StreamTransport
protocol_ = protocol; protocol_ = protocol;
} }
/** /**
* Write some data to the transport. * Invokes the watcher callback.
*
* Params:
* data = Data to send.
*/ */
void write(ubyte[] data) @nogc override void invoke() @nogc
{ {
immutable empty = input.length == 0; if (output.length)
input ~= data;
if (empty)
{ {
SocketState overlapped; immutable empty = input.length == 0;
try protocol.received(output[0 .. $]);
output.clear();
if (empty)
{ {
overlapped = MmapPool.instance.make!SocketState; SocketState overlapped;
socket.beginSend(input[], overlapped); try
} {
catch (SocketException e) overlapped = MmapPool.instance.make!SocketState;
{ socket.beginSend(input[], overlapped);
MmapPool.instance.dispose(overlapped); }
MmapPool.instance.dispose(e); catch (SocketException e)
{
MmapPool.instance.dispose(overlapped);
MmapPool.instance.dispose(e);
}
} }
} }
else
{
protocol.disconnected(exception);
MmapPool.instance.dispose(protocol_);
defaultAllocator.dispose(exception);
active = false;
}
} }
} }
@ -137,7 +196,7 @@ final class IOCPLoop : Loop
* *
* Returns: $(D_KEYWORD true) if the operation was successful. * Returns: $(D_KEYWORD true) if the operation was successful.
*/ */
override protected bool reify(ConnectionWatcher watcher, override protected bool reify(SocketWatcher watcher,
EventMask oldEvents, EventMask oldEvents,
EventMask events) @nogc EventMask events) @nogc
{ {
@ -170,7 +229,7 @@ final class IOCPLoop : Loop
if (!(oldEvents & Event.read) && (events & Event.read) if (!(oldEvents & Event.read) && (events & Event.read)
|| !(oldEvents & Event.write) && (events & Event.write)) || !(oldEvents & Event.write) && (events & Event.write))
{ {
auto transport = cast(IOCPStreamTransport) watcher; auto transport = cast(StreamTransport) watcher;
assert(transport !is null); assert(transport !is null);
if (CreateIoCompletionPort(cast(HANDLE) transport.socket.handle, if (CreateIoCompletionPort(cast(HANDLE) transport.socket.handle,
@ -200,6 +259,20 @@ final class IOCPLoop : Loop
return true; return true;
} }
private void kill(StreamTransport transport,
SocketException exception = null) @nogc
in
{
assert(transport !is null);
}
body
{
transport.socket.shutdown();
defaultAllocator.dispose(transport.socket);
transport.exception = exception;
pendings.enqueue(transport);
}
/** /**
* Does the actual polling. * Does the actual polling.
*/ */
@ -237,7 +310,7 @@ final class IOCPLoop : Loop
assert(listener !is null); assert(listener !is null);
auto socket = listener.endAccept(overlapped); auto socket = listener.endAccept(overlapped);
auto transport = MmapPool.instance.make!IOCPStreamTransport(socket); auto transport = MmapPool.instance.make!StreamTransport(socket);
connection.incoming.enqueue(transport); connection.incoming.enqueue(transport);
@ -247,7 +320,7 @@ final class IOCPLoop : Loop
listener.beginAccept(overlapped); listener.beginAccept(overlapped);
break; break;
case OverlappedSocketEvent.read: case OverlappedSocketEvent.read:
auto transport = cast(IOCPStreamTransport) (cast(void*) key); auto transport = cast(StreamTransport) (cast(void*) key);
assert(transport !is null); assert(transport !is null);
if (!transport.active) if (!transport.active)
@ -269,7 +342,7 @@ final class IOCPLoop : Loop
} }
if (transport.socket.disconnected) if (transport.socket.disconnected)
{ {
// We want to get one last notification to destroy the watcher // We want to get one last notification to destroy the watcher.
transport.socket.beginReceive(transport.output[], overlapped); transport.socket.beginReceive(transport.output[], overlapped);
kill(transport, exception); kill(transport, exception);
} }
@ -278,7 +351,7 @@ final class IOCPLoop : Loop
immutable full = transport.output.free == received; immutable full = transport.output.free == received;
transport.output += received; transport.output += received;
// Receive was interrupted because the buffer is full. We have to continue // Receive was interrupted because the buffer is full. We have to continue.
if (full) if (full)
{ {
transport.socket.beginReceive(transport.output[], overlapped); transport.socket.beginReceive(transport.output[], overlapped);
@ -287,21 +360,25 @@ final class IOCPLoop : Loop
} }
break; break;
case OverlappedSocketEvent.write: case OverlappedSocketEvent.write:
auto transport = cast(IOCPStreamTransport) (cast(void*) key); auto transport = cast(StreamTransport) (cast(void*) key);
assert(transport !is null); assert(transport !is null);
transport.input += transport.socket.endSend(overlapped); transport.input += transport.socket.endSend(overlapped);
if (transport.input.length) if (transport.input.length > 0)
{ {
transport.socket.beginSend(transport.input[], overlapped); transport.socket.beginSend(transport.input[], overlapped);
} }
else else
{ {
transport.socket.beginReceive(transport.output[], overlapped); transport.socket.beginReceive(transport.output[], overlapped);
if (transport.isClosing())
{
kill(transport);
}
} }
break; break;
default: default:
assert(false, "Unknown event"); assert(false, "Unknown event");
} }
} }
} }

View File

@ -177,7 +177,7 @@ final class KqueueLoop : SelectorLoop
* *
* Returns: $(D_KEYWORD true) if the operation was successful. * Returns: $(D_KEYWORD true) if the operation was successful.
*/ */
override protected bool reify(ConnectionWatcher watcher, override protected bool reify(SocketWatcher watcher,
EventMask oldEvents, EventMask oldEvents,
EventMask events) @nogc EventMask events) @nogc
{ {
@ -237,15 +237,18 @@ final class KqueueLoop : SelectorLoop
{ {
assert(connections.length > events[i].ident); assert(connections.length > events[i].ident);
auto transport = cast(SelectorStreamTransport) connections[events[i].ident]; auto transport = cast(StreamTransport) connections[events[i].ident];
// If it is a ConnectionWatcher. Accept connections. // If it is a ConnectionWatcher. Accept connections.
if (transport is null) if (transport is null)
{ {
acceptConnections(connections[events[i].ident]); auto connection = cast(ConnectionWatcher) connections[events[i].ident];
assert(connection !is null);
acceptConnections(connection);
} }
else if (events[i].flags & EV_ERROR) else if (events[i].flags & EV_ERROR)
{ {
kill(transport, null); kill(transport);
} }
else if (events[i].filter == EVFILT_READ) else if (events[i].filter == EVFILT_READ)
{ {
@ -303,9 +306,9 @@ final class KqueueLoop : SelectorLoop
* *
* Returns: $(D_KEYWORD true) if the operation could be successfully * Returns: $(D_KEYWORD true) if the operation could be successfully
* completed or scheduled, $(D_KEYWORD false) otherwise (the * completed or scheduled, $(D_KEYWORD false) otherwise (the
* transport is be destroyed then). * transport will be destroyed then).
*/ */
protected override bool feed(SelectorStreamTransport transport, protected override bool feed(StreamTransport transport,
SocketException exception = null) @nogc SocketException exception = null) @nogc
{ {
if (!super.feed(transport, exception)) if (!super.feed(transport, exception))

View File

@ -25,20 +25,27 @@ import tanya.network.socket;
/** /**
* Transport for stream sockets. * Transport for stream sockets.
*/ */
final class SelectorStreamTransport : IOWatcher, StreamTransport package class StreamTransport : SocketWatcher, DuplexTransport, SocketTransport
{ {
/// Input buffer. private SelectorLoop loop;
private SocketException exception;
package ReadBuffer!ubyte output;
package WriteBuffer!ubyte input; package WriteBuffer!ubyte input;
private SelectorLoop loop; private Protocol protocol_;
private bool closing;
/// Received notification that the underlying socket is write-ready. /// Received notification that the underlying socket is write-ready.
package bool writeReady; package bool writeReady;
/** /**
* Params: * Params:
* loop = Event loop. * loop = Event loop.
* socket = Socket. * socket = Socket.
* *
* Precondition: $(D_INLINECODE loop !is null && socket !is null) * Precondition: $(D_INLINECODE loop !is null && socket !is null)
*/ */
@ -46,19 +53,27 @@ final class SelectorStreamTransport : IOWatcher, StreamTransport
in in
{ {
assert(loop !is null); assert(loop !is null);
assert(socket !is null);
} }
body body
{ {
super(socket); super(socket);
this.loop = loop; this.loop = loop;
output = ReadBuffer!ubyte(8192, 1024, MmapPool.instance);
input = WriteBuffer!ubyte(8192, MmapPool.instance); input = WriteBuffer!ubyte(8192, MmapPool.instance);
active = true;
} }
/** /**
* Returns: Socket. * Returns: Socket.
*
* Postcondition: $(D_INLINECODE socket !is null)
*/ */
override @property ConnectedSocket socket() pure nothrow @safe @nogc override @property ConnectedSocket socket() pure nothrow @safe @nogc
out (socket)
{
assert(socket !is null);
}
body
{ {
return cast(ConnectedSocket) socket_; return cast(ConnectedSocket) socket_;
} }
@ -73,6 +88,14 @@ final class SelectorStreamTransport : IOWatcher, StreamTransport
socket_ = socket; socket_ = socket;
} }
/**
* Returns: Application protocol.
*/
@property Protocol protocol() pure nothrow @safe @nogc
{
return protocol_;
}
/** /**
* Switches the protocol. * Switches the protocol.
* *
@ -94,6 +117,48 @@ final class SelectorStreamTransport : IOWatcher, StreamTransport
protocol_ = protocol; protocol_ = protocol;
} }
/**
* Returns $(D_PARAM true) if the transport is closing or closed.
*/
bool isClosing() const pure nothrow @safe @nogc
{
return closing;
}
/**
* Close the transport.
*
* Buffered data will be flushed. No more data will be received.
*/
void close() @nogc
{
closing = true;
loop.reify(this, EventMask(Event.read, Event.write), EventMask(Event.write));
}
/**
* Invokes the watcher callback.
*/
override void invoke() @nogc
{
if (output.length)
{
protocol.received(output[0 .. $]);
output.clear();
if (isClosing() && input.length == 0)
{
loop.kill(this);
}
}
else
{
protocol.disconnected(exception);
MmapPool.instance.dispose(protocol_);
defaultAllocator.dispose(exception);
active = false;
}
}
/** /**
* Write some data to the transport. * Write some data to the transport.
* *
@ -140,29 +205,62 @@ final class SelectorStreamTransport : IOWatcher, StreamTransport
abstract class SelectorLoop : Loop abstract class SelectorLoop : Loop
{ {
/// Pending connections. /// Pending connections.
protected Vector!ConnectionWatcher connections; protected Vector!SocketWatcher connections;
this() @nogc this() @nogc
{ {
super(); super();
connections = Vector!ConnectionWatcher(maxEvents, MmapPool.instance); connections = Vector!SocketWatcher(maxEvents, MmapPool.instance);
} }
~this() @nogc ~this() @nogc
{ {
foreach (ref connection; connections) foreach (ref connection; connections)
{ {
// We want to free only IOWatchers. ConnectionWatcher are created by the // We want to free only the transports. ConnectionWatcher are created by the
// user and should be freed by himself. // user and should be freed by himself.
auto io = cast(IOWatcher) connection; if (cast(StreamTransport) connection !is null)
if (io !is null)
{ {
MmapPool.instance.dispose(io); MmapPool.instance.dispose(connection);
connection = null;
} }
} }
} }
/**
* Should be called if the backend configuration changes.
*
* Params:
* watcher = Watcher.
* oldEvents = The events were already set.
* events = The events should be set.
*
* Returns: $(D_KEYWORD true) if the operation was successful.
*/
override abstract protected bool reify(SocketWatcher watcher,
EventMask oldEvents,
EventMask events) @nogc;
/**
* Kills the watcher and closes the connection.
*
* Params:
* transport = Transport.
* exception = Occurred exception.
*/
protected void kill(StreamTransport transport,
SocketException exception = null) @nogc
in
{
assert(transport !is null);
}
body
{
transport.socket.shutdown();
defaultAllocator.dispose(transport.socket);
transport.exception = exception;
pendings.enqueue(transport);
}
/** /**
* If the transport couldn't send the data, the further sending should * If the transport couldn't send the data, the further sending should
* be handled by the event loop. * be handled by the event loop.
@ -175,8 +273,13 @@ abstract class SelectorLoop : Loop
* completed or scheduled, $(D_KEYWORD false) otherwise (the * completed or scheduled, $(D_KEYWORD false) otherwise (the
* transport will be destroyed then). * transport will be destroyed then).
*/ */
protected bool feed(SelectorStreamTransport transport, protected bool feed(StreamTransport transport,
SocketException exception = null) @nogc SocketException exception = null) @nogc
in
{
assert(transport !is null);
}
body
{ {
while (transport.input.length && transport.writeReady) while (transport.input.length && transport.writeReady)
{ {
@ -200,12 +303,13 @@ abstract class SelectorLoop : Loop
} }
if (exception !is null) if (exception !is null)
{ {
auto watcher = cast(IOWatcher) connections[transport.socket.handle]; kill(transport, exception);
assert(watcher !is null);
kill(watcher, exception);
return false; return false;
} }
if (transport.input.length == 0 && transport.isClosing())
{
kill(transport);
}
return true; return true;
} }
@ -261,11 +365,11 @@ abstract class SelectorLoop : Loop
break; break;
} }
SelectorStreamTransport transport; StreamTransport transport;
if (connections.length > client.handle) if (connections.length > client.handle)
{ {
transport = cast(SelectorStreamTransport) connections[client.handle]; transport = cast(StreamTransport) connections[client.handle];
} }
else else
{ {
@ -273,7 +377,7 @@ abstract class SelectorLoop : Loop
} }
if (transport is null) if (transport is null)
{ {
transport = MmapPool.instance.make!SelectorStreamTransport(this, client); transport = MmapPool.instance.make!StreamTransport(this, client);
connections[client.handle] = transport; connections[client.handle] = transport;
} }
else else

View File

@ -234,9 +234,9 @@ abstract class Loop
* *
* Returns: $(D_KEYWORD true) if the operation was successful. * Returns: $(D_KEYWORD true) if the operation was successful.
*/ */
abstract protected bool reify(ConnectionWatcher watcher, abstract protected bool reify(SocketWatcher watcher,
EventMask oldEvents, EventMask oldEvents,
EventMask events) @nogc; EventMask events) @nogc;
/** /**
* Returns: The blocking time. * Returns: The blocking time.
@ -266,26 +266,6 @@ abstract class Loop
blockTime_ = blockTime; blockTime_ = blockTime;
} }
/**
* Kills the watcher and closes the connection.
*
* Params:
* watcher = Watcher.
* exception = Occurred exception.
*/
protected void kill(IOWatcher watcher, SocketException exception) @nogc
in
{
assert(watcher !is null);
}
body
{
watcher.socket.shutdown();
defaultAllocator.dispose(watcher.socket);
watcher.exception = exception;
pendings.enqueue(watcher);
}
/** /**
* Does the actual polling. * Does the actual polling.
*/ */

View File

@ -73,6 +73,19 @@ interface DuplexTransport : ReadTransport, WriteTransport
{ {
assert(protocol !is null); assert(protocol !is null);
} }
/**
* Returns $(D_PARAM true) if the transport is closing or closed.
*/
bool isClosing() const pure nothrow @safe @nogc;
/**
* Close the transport.
*
* Buffered data will be flushed. No more data will be received.
*/
void close() @nogc;
} }
/** /**
@ -85,10 +98,3 @@ interface SocketTransport : Transport
*/ */
@property Socket socket() pure nothrow @safe @nogc; @property Socket socket() pure nothrow @safe @nogc;
} }
/**
* Represents a connection-oriented socket transport.
*/
package interface StreamTransport : DuplexTransport, SocketTransport
{
}

View File

@ -36,15 +36,48 @@ abstract class Watcher
void invoke() @nogc; void invoke() @nogc;
} }
class ConnectionWatcher : Watcher /**
* Socket watcher.
*/
abstract class SocketWatcher : Watcher
{ {
/// Watched socket. /// Watched socket.
protected Socket socket_; protected Socket socket_;
/// Protocol factory. /**
protected Protocol delegate() @nogc protocolFactory; * Params:
* socket = Socket.
*
* Precondition: $(D_INLINECODE socket !is null)
*/
this(Socket socket) pure nothrow @safe @nogc
in
{
assert(socket !is null);
}
body
{
socket_ = socket;
}
package Queue!DuplexTransport incoming; /**
* Returns: Socket.
*/
@property Socket socket() pure nothrow @safe @nogc
{
return socket_;
}
}
/**
* Connection watcher.
*/
class ConnectionWatcher : SocketWatcher
{
/// Incoming connection queue.
Queue!DuplexTransport incoming;
private Protocol delegate() @nogc protocolFactory;
/** /**
* Params: * Params:
@ -52,19 +85,8 @@ class ConnectionWatcher : Watcher
*/ */
this(Socket socket) @nogc this(Socket socket) @nogc
{ {
super(socket);
incoming = Queue!DuplexTransport(MmapPool.instance); incoming = Queue!DuplexTransport(MmapPool.instance);
socket_ = socket;
}
/**
* Destroys the watcher.
*/
~this() @nogc
{
foreach (w; incoming)
{
MmapPool.instance.dispose(w);
}
} }
/** /**
@ -76,14 +98,6 @@ class ConnectionWatcher : Watcher
this.protocolFactory = () @nogc => cast(Protocol) MmapPool.instance.make!P; this.protocolFactory = () @nogc => cast(Protocol) MmapPool.instance.make!P;
} }
/**
* Returns: Socket.
*/
@property Socket socket() pure nothrow @safe @nogc
{
return socket_;
}
/** /**
* Invokes new connection callback. * Invokes new connection callback.
*/ */
@ -101,87 +115,3 @@ class ConnectionWatcher : Watcher
} }
} }
} }
/**
* Contains a pending watcher with the invoked events or a transport can be
* read from.
*/
class IOWatcher : ConnectionWatcher
{
package SocketException exception;
protected Protocol protocol_;
/**
* Returns: Underlying output buffer.
*/
package ReadBuffer!ubyte output;
/**
* Params:
* socket = Socket.
*
* Precondition: $(D_INLINECODE socket !is null)
*/
this(ConnectedSocket socket) @nogc
in
{
assert(socket !is null);
}
body
{
super(socket);
output = ReadBuffer!ubyte(8192, 1024, MmapPool.instance);
active = true;
}
/**
* Destroys the watcher.
*/
~this() @nogc
{
MmapPool.instance.dispose(protocol_);
}
/**
* Returns: Application protocol.
*/
@property Protocol protocol() pure nothrow @safe @nogc
{
return protocol_;
}
/**
* Returns: Socket.
*
* Precondition: $(D_INLINECODE socket !is null)
*/
override @property ConnectedSocket socket() pure nothrow @safe @nogc
out (socket)
{
assert(socket !is null);
}
body
{
return cast(ConnectedSocket) socket_;
}
/**
* Invokes the watcher callback.
*/
override void invoke() @nogc
{
if (output.length)
{
protocol.received(output[0 .. $]);
output.clear();
}
else
{
protocol.disconnected(exception);
MmapPool.instance.dispose(protocol);
defaultAllocator.dispose(exception);
active = false;
}
}
}