summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorEugen Wissner <belka@caraus.de>2017-02-20 08:02:01 +0100
committerEugen Wissner <belka@caraus.de>2017-02-20 08:02:01 +0100
commit074d02762927f706a4e02002a785ef1c7492e0ab (patch)
tree6b448351826de34f1d76a187f0ad269bb5fadd6d
parentf4b90d8b511c12324e6801386bd9f2d822a547dc (diff)
parent70632d975da09471d0fac5df51a96bd3d9bbeb6f (diff)
downloadtanya-074d02762927f706a4e02002a785ef1c7492e0ab.tar.gz
Merge branch 'master' into utf8string
-rw-r--r--README.md4
-rw-r--r--source/tanya/async/event/epoll.d16
-rw-r--r--source/tanya/async/event/iocp.d145
-rw-r--r--source/tanya/async/event/kqueue.d15
-rw-r--r--source/tanya/async/event/selector.d146
-rw-r--r--source/tanya/async/loop.d26
-rw-r--r--source/tanya/async/transport.d20
-rw-r--r--source/tanya/async/watcher.d128
8 files changed, 298 insertions, 202 deletions
diff --git a/README.md b/README.md
index d7da98a..ffb554c 100644
--- a/README.md
+++ b/README.md
@@ -14,6 +14,7 @@ Tanya extends Phobos functionality and provides alternative implementations for
data structures and utilities that depend on the Garbage Collector in Phobos.
* [Bug tracker](https://issues.caraus.io/projects/tanya)
+* [Documentation](https://docs.caraus.io/tanya)
## Overview
@@ -51,9 +52,6 @@ needs more test cases, better performance and some additional features
* Tanya is a native D library.
-* Documentation and usage examples can be found in the source code.
-Online documentation will be published soon.
-
* Tanya is cross-platform. The development happens on a 64-bit Linux, but it
is being tested on Windows and FreeBSD as well.
diff --git a/source/tanya/async/event/epoll.d b/source/tanya/async/event/epoll.d
index 998ad18..c10b241 100644
--- a/source/tanya/async/event/epoll.d
+++ b/source/tanya/async/event/epoll.d
@@ -70,14 +70,9 @@ final class EpollLoop : SelectorLoop
*
* Returns: $(D_KEYWORD true) if the operation was successful.
*/
- protected override bool reify(ConnectionWatcher watcher,
+ protected override bool reify(SocketWatcher watcher,
EventMask oldEvents,
EventMask events) @nogc
- in
- {
- assert(watcher !is null);
- }
- body
{
int op = EPOLL_CTL_DEL;
epoll_event ev;
@@ -123,15 +118,18 @@ final class EpollLoop : SelectorLoop
for (auto i = 0; i < eventCount; ++i)
{
- auto transport = cast(SelectorStreamTransport) connections[events[i].data.fd];
+ auto transport = cast(StreamTransport) connections[events[i].data.fd];
if (transport is null)
{
- acceptConnections(connections[events[i].data.fd]);
+ auto connection = cast(ConnectionWatcher) connections[events[i].data.fd];
+ assert(connection !is null);
+
+ acceptConnections(connection);
}
else if (events[i].events & EPOLLERR)
{
- kill(transport, null);
+ kill(transport);
continue;
}
else if (events[i].events & (EPOLLIN | EPOLLPRI | EPOLLHUP))
diff --git a/source/tanya/async/event/iocp.d b/source/tanya/async/event/iocp.d
index 31e57c8..26678e1 100644
--- a/source/tanya/async/event/iocp.d
+++ b/source/tanya/async/event/iocp.d
@@ -26,38 +26,90 @@ import core.sys.windows.winbase;
import core.sys.windows.windef;
import core.sys.windows.winsock2;
-final class IOCPStreamTransport : StreamTransport
+/**
+ * Transport for stream sockets.
+ */
+final class StreamTransport : SocketWatcher, DuplexTransport, SocketTransport
{
+ private SocketException exception;
+
+ private ReadBuffer!ubyte output;
+
private WriteBuffer!ubyte input;
+ private Protocol protocol_;
+
+ private bool closing;
+
/**
* Creates new completion port transport.
*
* Params:
* socket = Socket.
*
- * Precondition: $(D_INLINECODE socket)
+ * Precondition: $(D_INLINECODE socket !is null)
*/
this(OverlappedConnectedSocket socket) @nogc
- in
- {
- assert(socket !is null);
- }
- body
{
super(socket);
+ output = ReadBuffer!ubyte(8192, 1024, MmapPool.instance);
input = WriteBuffer!ubyte(8192, MmapPool.instance);
+ active = true;
}
/**
* Returns: Socket.
+ *
+ * Postcondition: $(D_INLINECODE socket !is null)
*/
override @property OverlappedConnectedSocket socket() pure nothrow @safe @nogc
+ out (socket)
+ {
+ assert(socket !is null);
+ }
+ body
{
return cast(OverlappedConnectedSocket) socket_;
}
/**
+ * Returns $(D_PARAM true) if the transport is closing or closed.
+ */
+ bool isClosing() const pure nothrow @safe @nogc
+ {
+ return closing;
+ }
+
+ /**
+ * Close the transport.
+ *
+ * Buffered data will be flushed. No more data will be received.
+ */
+ void close() pure nothrow @safe @nogc
+ {
+ closing = true;
+ }
+
+ /**
+ * Write some data to the transport.
+ *
+ * Params:
+ * data = Data to send.
+ */
+ void write(ubyte[] data) @nogc
+ {
+ input ~= data;
+ }
+
+ /**
+ * Returns: Application protocol.
+ */
+ @property Protocol protocol() pure nothrow @safe @nogc
+ {
+ return protocol_;
+ }
+
+ /**
* Switches the protocol.
*
* The protocol is deallocated by the event loop, it should currently be
@@ -78,31 +130,38 @@ final class IOCPStreamTransport : StreamTransport
protocol_ = protocol;
}
-
/**
- * Write some data to the transport.
- *
- * Params:
- * data = Data to send.
+ * Invokes the watcher callback.
*/
- void write(ubyte[] data) @nogc
+ override void invoke() @nogc
{
- immutable empty = input.length == 0;
- input ~= data;
- if (empty)
+ if (output.length)
{
- SocketState overlapped;
- try
+ immutable empty = input.length == 0;
+ protocol.received(output[0 .. $]);
+ output.clear();
+ if (empty)
{
- overlapped = MmapPool.instance.make!SocketState;
- socket.beginSend(input[], overlapped);
- }
- catch (SocketException e)
- {
- MmapPool.instance.dispose(overlapped);
- MmapPool.instance.dispose(e);
+ SocketState overlapped;
+ try
+ {
+ overlapped = MmapPool.instance.make!SocketState;
+ socket.beginSend(input[], overlapped);
+ }
+ catch (SocketException e)
+ {
+ MmapPool.instance.dispose(overlapped);
+ MmapPool.instance.dispose(e);
+ }
}
}
+ else
+ {
+ protocol.disconnected(exception);
+ MmapPool.instance.dispose(protocol_);
+ defaultAllocator.dispose(exception);
+ active = false;
+ }
}
}
@@ -137,7 +196,7 @@ final class IOCPLoop : Loop
*
* Returns: $(D_KEYWORD true) if the operation was successful.
*/
- override protected bool reify(ConnectionWatcher watcher,
+ override protected bool reify(SocketWatcher watcher,
EventMask oldEvents,
EventMask events) @nogc
{
@@ -170,7 +229,7 @@ final class IOCPLoop : Loop
if (!(oldEvents & Event.read) && (events & Event.read)
|| !(oldEvents & Event.write) && (events & Event.write))
{
- auto transport = cast(IOCPStreamTransport) watcher;
+ auto transport = cast(StreamTransport) watcher;
assert(transport !is null);
if (CreateIoCompletionPort(cast(HANDLE) transport.socket.handle,
@@ -200,6 +259,20 @@ final class IOCPLoop : Loop
return true;
}
+ private void kill(StreamTransport transport,
+ SocketException exception = null) @nogc
+ in
+ {
+ assert(transport !is null);
+ }
+ body
+ {
+ transport.socket.shutdown();
+ defaultAllocator.dispose(transport.socket);
+ transport.exception = exception;
+ pendings.enqueue(transport);
+ }
+
/**
* Does the actual polling.
*/
@@ -237,7 +310,7 @@ final class IOCPLoop : Loop
assert(listener !is null);
auto socket = listener.endAccept(overlapped);
- auto transport = MmapPool.instance.make!IOCPStreamTransport(socket);
+ auto transport = MmapPool.instance.make!StreamTransport(socket);
connection.incoming.enqueue(transport);
@@ -247,7 +320,7 @@ final class IOCPLoop : Loop
listener.beginAccept(overlapped);
break;
case OverlappedSocketEvent.read:
- auto transport = cast(IOCPStreamTransport) (cast(void*) key);
+ auto transport = cast(StreamTransport) (cast(void*) key);
assert(transport !is null);
if (!transport.active)
@@ -269,7 +342,7 @@ final class IOCPLoop : Loop
}
if (transport.socket.disconnected)
{
- // We want to get one last notification to destroy the watcher
+ // We want to get one last notification to destroy the watcher.
transport.socket.beginReceive(transport.output[], overlapped);
kill(transport, exception);
}
@@ -278,7 +351,7 @@ final class IOCPLoop : Loop
immutable full = transport.output.free == received;
transport.output += received;
- // Receive was interrupted because the buffer is full. We have to continue
+ // Receive was interrupted because the buffer is full. We have to continue.
if (full)
{
transport.socket.beginReceive(transport.output[], overlapped);
@@ -287,21 +360,25 @@ final class IOCPLoop : Loop
}
break;
case OverlappedSocketEvent.write:
- auto transport = cast(IOCPStreamTransport) (cast(void*) key);
+ auto transport = cast(StreamTransport) (cast(void*) key);
assert(transport !is null);
transport.input += transport.socket.endSend(overlapped);
- if (transport.input.length)
+ if (transport.input.length > 0)
{
transport.socket.beginSend(transport.input[], overlapped);
}
else
{
transport.socket.beginReceive(transport.output[], overlapped);
+ if (transport.isClosing())
+ {
+ kill(transport);
+ }
}
break;
default:
assert(false, "Unknown event");
}
}
-}
+} \ No newline at end of file
diff --git a/source/tanya/async/event/kqueue.d b/source/tanya/async/event/kqueue.d
index e52a737..8673ff7 100644
--- a/source/tanya/async/event/kqueue.d
+++ b/source/tanya/async/event/kqueue.d
@@ -177,7 +177,7 @@ final class KqueueLoop : SelectorLoop
*
* Returns: $(D_KEYWORD true) if the operation was successful.
*/
- override protected bool reify(ConnectionWatcher watcher,
+ override protected bool reify(SocketWatcher watcher,
EventMask oldEvents,
EventMask events) @nogc
{
@@ -237,15 +237,18 @@ final class KqueueLoop : SelectorLoop
{
assert(connections.length > events[i].ident);
- auto transport = cast(SelectorStreamTransport) connections[events[i].ident];
+ auto transport = cast(StreamTransport) connections[events[i].ident];
// If it is a ConnectionWatcher. Accept connections.
if (transport is null)
{
- acceptConnections(connections[events[i].ident]);
+ auto connection = cast(ConnectionWatcher) connections[events[i].ident];
+ assert(connection !is null);
+
+ acceptConnections(connection);
}
else if (events[i].flags & EV_ERROR)
{
- kill(transport, null);
+ kill(transport);
}
else if (events[i].filter == EVFILT_READ)
{
@@ -303,9 +306,9 @@ final class KqueueLoop : SelectorLoop
*
* Returns: $(D_KEYWORD true) if the operation could be successfully
* completed or scheduled, $(D_KEYWORD false) otherwise (the
- * transport is be destroyed then).
+ * transport will be destroyed then).
*/
- protected override bool feed(SelectorStreamTransport transport,
+ protected override bool feed(StreamTransport transport,
SocketException exception = null) @nogc
{
if (!super.feed(transport, exception))
diff --git a/source/tanya/async/event/selector.d b/source/tanya/async/event/selector.d
index 83a9fce..13254ce 100644
--- a/source/tanya/async/event/selector.d
+++ b/source/tanya/async/event/selector.d
@@ -25,20 +25,27 @@ import tanya.network.socket;
/**
* Transport for stream sockets.
*/
-final class SelectorStreamTransport : IOWatcher, StreamTransport
+package class StreamTransport : SocketWatcher, DuplexTransport, SocketTransport
{
- /// Input buffer.
+ private SelectorLoop loop;
+
+ private SocketException exception;
+
+ package ReadBuffer!ubyte output;
+
package WriteBuffer!ubyte input;
- private SelectorLoop loop;
+ private Protocol protocol_;
+
+ private bool closing;
/// Received notification that the underlying socket is write-ready.
package bool writeReady;
/**
* Params:
- * loop = Event loop.
- * socket = Socket.
+ * loop = Event loop.
+ * socket = Socket.
*
* Precondition: $(D_INLINECODE loop !is null && socket !is null)
*/
@@ -46,19 +53,27 @@ final class SelectorStreamTransport : IOWatcher, StreamTransport
in
{
assert(loop !is null);
- assert(socket !is null);
}
body
{
super(socket);
this.loop = loop;
+ output = ReadBuffer!ubyte(8192, 1024, MmapPool.instance);
input = WriteBuffer!ubyte(8192, MmapPool.instance);
+ active = true;
}
/**
* Returns: Socket.
+ *
+ * Postcondition: $(D_INLINECODE socket !is null)
*/
override @property ConnectedSocket socket() pure nothrow @safe @nogc
+ out (socket)
+ {
+ assert(socket !is null);
+ }
+ body
{
return cast(ConnectedSocket) socket_;
}
@@ -74,6 +89,14 @@ final class SelectorStreamTransport : IOWatcher, StreamTransport
}
/**
+ * Returns: Application protocol.
+ */
+ @property Protocol protocol() pure nothrow @safe @nogc
+ {
+ return protocol_;
+ }
+
+ /**
* Switches the protocol.
*
* The protocol is deallocated by the event loop, it should currently be
@@ -95,6 +118,48 @@ final class SelectorStreamTransport : IOWatcher, StreamTransport
}
/**
+ * Returns $(D_PARAM true) if the transport is closing or closed.
+ */
+ bool isClosing() const pure nothrow @safe @nogc
+ {
+ return closing;
+ }
+
+ /**
+ * Close the transport.
+ *
+ * Buffered data will be flushed. No more data will be received.
+ */
+ void close() @nogc
+ {
+ closing = true;
+ loop.reify(this, EventMask(Event.read, Event.write), EventMask(Event.write));
+ }
+
+ /**
+ * Invokes the watcher callback.
+ */
+ override void invoke() @nogc
+ {
+ if (output.length)
+ {
+ protocol.received(output[0 .. $]);
+ output.clear();
+ if (isClosing() && input.length == 0)
+ {
+ loop.kill(this);
+ }
+ }
+ else
+ {
+ protocol.disconnected(exception);
+ MmapPool.instance.dispose(protocol_);
+ defaultAllocator.dispose(exception);
+ active = false;
+ }
+ }
+
+ /**
* Write some data to the transport.
*
* Params:
@@ -140,30 +205,63 @@ final class SelectorStreamTransport : IOWatcher, StreamTransport
abstract class SelectorLoop : Loop
{
/// Pending connections.
- protected Vector!ConnectionWatcher connections;
+ protected Vector!SocketWatcher connections;
this() @nogc
{
super();
- connections = Vector!ConnectionWatcher(maxEvents, MmapPool.instance);
+ connections = Vector!SocketWatcher(maxEvents, MmapPool.instance);
}
~this() @nogc
{
foreach (ref connection; connections)
{
- // We want to free only IOWatchers. ConnectionWatcher are created by the
+ // We want to free only the transports. ConnectionWatcher are created by the
// user and should be freed by himself.
- auto io = cast(IOWatcher) connection;
- if (io !is null)
+ if (cast(StreamTransport) connection !is null)
{
- MmapPool.instance.dispose(io);
- connection = null;
+ MmapPool.instance.dispose(connection);
}
}
}
/**
+ * Should be called if the backend configuration changes.
+ *
+ * Params:
+ * watcher = Watcher.
+ * oldEvents = The events were already set.
+ * events = The events should be set.
+ *
+ * Returns: $(D_KEYWORD true) if the operation was successful.
+ */
+ override abstract protected bool reify(SocketWatcher watcher,
+ EventMask oldEvents,
+ EventMask events) @nogc;
+
+ /**
+ * Kills the watcher and closes the connection.
+ *
+ * Params:
+ * transport = Transport.
+ * exception = Occurred exception.
+ */
+ protected void kill(StreamTransport transport,
+ SocketException exception = null) @nogc
+ in
+ {
+ assert(transport !is null);
+ }
+ body
+ {
+ transport.socket.shutdown();
+ defaultAllocator.dispose(transport.socket);
+ transport.exception = exception;
+ pendings.enqueue(transport);
+ }
+
+ /**
* If the transport couldn't send the data, the further sending should
* be handled by the event loop.
*
@@ -175,8 +273,13 @@ abstract class SelectorLoop : Loop
* completed or scheduled, $(D_KEYWORD false) otherwise (the
* transport will be destroyed then).
*/
- protected bool feed(SelectorStreamTransport transport,
+ protected bool feed(StreamTransport transport,
SocketException exception = null) @nogc
+ in
+ {
+ assert(transport !is null);
+ }
+ body
{
while (transport.input.length && transport.writeReady)
{
@@ -200,12 +303,13 @@ abstract class SelectorLoop : Loop
}
if (exception !is null)
{
- auto watcher = cast(IOWatcher) connections[transport.socket.handle];
- assert(watcher !is null);
-
- kill(watcher, exception);
+ kill(transport, exception);
return false;
}
+ if (transport.input.length == 0 && transport.isClosing())
+ {
+ kill(transport);
+ }
return true;
}
@@ -261,11 +365,11 @@ abstract class SelectorLoop : Loop
break;
}
- SelectorStreamTransport transport;
+ StreamTransport transport;
if (connections.length > client.handle)
{
- transport = cast(SelectorStreamTransport) connections[client.handle];
+ transport = cast(StreamTransport) connections[client.handle];
}
else
{
@@ -273,7 +377,7 @@ abstract class SelectorLoop : Loop
}
if (transport is null)
{
- transport = MmapPool.instance.make!SelectorStreamTransport(this, client);
+ transport = MmapPool.instance.make!StreamTransport(this, client);
connections[client.handle] = transport;
}
else
diff --git a/source/tanya/async/loop.d b/source/tanya/async/loop.d
index b7854a6..54b5238 100644
--- a/source/tanya/async/loop.d
+++ b/source/tanya/async/loop.d
@@ -234,9 +234,9 @@ abstract class Loop
*
* Returns: $(D_KEYWORD true) if the operation was successful.
*/
- abstract protected bool reify(ConnectionWatcher watcher,
- EventMask oldEvents,
- EventMask events) @nogc;
+ abstract protected bool reify(SocketWatcher watcher,
+ EventMask oldEvents,
+ EventMask events) @nogc;
/**
* Returns: The blocking time.
@@ -267,26 +267,6 @@ abstract class Loop
}
/**
- * Kills the watcher and closes the connection.
- *
- * Params:
- * watcher = Watcher.
- * exception = Occurred exception.
- */
- protected void kill(IOWatcher watcher, SocketException exception) @nogc
- in
- {
- assert(watcher !is null);
- }
- body
- {
- watcher.socket.shutdown();
- defaultAllocator.dispose(watcher.socket);
- watcher.exception = exception;
- pendings.enqueue(watcher);
- }
-
- /**
* Does the actual polling.
*/
abstract protected void poll() @nogc;
diff --git a/source/tanya/async/transport.d b/source/tanya/async/transport.d
index b06bec7..4550522 100644
--- a/source/tanya/async/transport.d
+++ b/source/tanya/async/transport.d
@@ -73,6 +73,19 @@ interface DuplexTransport : ReadTransport, WriteTransport
{
assert(protocol !is null);
}
+
+
+ /**
+ * Returns $(D_PARAM true) if the transport is closing or closed.
+ */
+ bool isClosing() const pure nothrow @safe @nogc;
+
+ /**
+ * Close the transport.
+ *
+ * Buffered data will be flushed. No more data will be received.
+ */
+ void close() @nogc;
}
/**
@@ -85,10 +98,3 @@ interface SocketTransport : Transport
*/
@property Socket socket() pure nothrow @safe @nogc;
}
-
-/**
- * Represents a connection-oriented socket transport.
- */
-package interface StreamTransport : DuplexTransport, SocketTransport
-{
-}
diff --git a/source/tanya/async/watcher.d b/source/tanya/async/watcher.d
index 37742e2..9756d99 100644
--- a/source/tanya/async/watcher.d
+++ b/source/tanya/async/watcher.d
@@ -36,44 +36,28 @@ abstract class Watcher
void invoke() @nogc;
}
-class ConnectionWatcher : Watcher
+/**
+ * Socket watcher.
+ */
+abstract class SocketWatcher : Watcher
{
/// Watched socket.
protected Socket socket_;
- /// Protocol factory.
- protected Protocol delegate() @nogc protocolFactory;
-
- package Queue!DuplexTransport incoming;
-
/**
* Params:
* socket = Socket.
+ *
+ * Precondition: $(D_INLINECODE socket !is null)
*/
- this(Socket socket) @nogc
- {
- incoming = Queue!DuplexTransport(MmapPool.instance);
- socket_ = socket;
- }
-
- /**
- * Destroys the watcher.
- */
- ~this() @nogc
+ this(Socket socket) pure nothrow @safe @nogc
+ in
{
- foreach (w; incoming)
- {
- MmapPool.instance.dispose(w);
- }
+ assert(socket !is null);
}
-
- /**
- * Params:
- * P = Protocol should be used.
- */
- void setProtocol(P : Protocol)() @nogc
+ body
{
- this.protocolFactory = () @nogc => cast(Protocol) MmapPool.instance.make!P;
+ socket_ = socket;
}
/**
@@ -83,105 +67,51 @@ class ConnectionWatcher : Watcher
{
return socket_;
}
-
- /**
- * Invokes new connection callback.
- */
- override void invoke() @nogc
- in
- {
- assert(protocolFactory !is null, "Protocol isn't set.");
- }
- body
- {
- foreach (transport; incoming)
- {
- transport.protocol = protocolFactory();
- transport.protocol.connected(transport);
- }
- }
}
/**
- * Contains a pending watcher with the invoked events or a transport can be
- * read from.
+ * Connection watcher.
*/
-class IOWatcher : ConnectionWatcher
+class ConnectionWatcher : SocketWatcher
{
- package SocketException exception;
-
- protected Protocol protocol_;
+ /// Incoming connection queue.
+ Queue!DuplexTransport incoming;
- /**
- * Returns: Underlying output buffer.
- */
- package ReadBuffer!ubyte output;
+ private Protocol delegate() @nogc protocolFactory;
/**
* Params:
* socket = Socket.
- *
- * Precondition: $(D_INLINECODE socket !is null)
*/
- this(ConnectedSocket socket) @nogc
- in
- {
- assert(socket !is null);
- }
- body
+ this(Socket socket) @nogc
{
super(socket);
- output = ReadBuffer!ubyte(8192, 1024, MmapPool.instance);
- active = true;
- }
-
- /**
- * Destroys the watcher.
- */
- ~this() @nogc
- {
- MmapPool.instance.dispose(protocol_);
+ incoming = Queue!DuplexTransport(MmapPool.instance);
}
/**
- * Returns: Application protocol.
+ * Params:
+ * P = Protocol should be used.
*/
- @property Protocol protocol() pure nothrow @safe @nogc
+ void setProtocol(P : Protocol)() @nogc
{
- return protocol_;
+ this.protocolFactory = () @nogc => cast(Protocol) MmapPool.instance.make!P;
}
/**
- * Returns: Socket.
- *
- * Precondition: $(D_INLINECODE socket !is null)
+ * Invokes new connection callback.
*/
- override @property ConnectedSocket socket() pure nothrow @safe @nogc
- out (socket)
+ override void invoke() @nogc
+ in
{
- assert(socket !is null);
+ assert(protocolFactory !is null, "Protocol isn't set.");
}
body
{
- return cast(ConnectedSocket) socket_;
- }
-
- /**
- * Invokes the watcher callback.
- */
- override void invoke() @nogc
- {
- if (output.length)
- {
- protocol.received(output[0 .. $]);
- output.clear();
- }
- else
+ foreach (transport; incoming)
{
- protocol.disconnected(exception);
- MmapPool.instance.dispose(protocol);
- defaultAllocator.dispose(exception);
- active = false;
+ transport.protocol = protocolFactory();
+ transport.protocol.connected(transport);
}
}
}