summaryrefslogtreecommitdiff
path: root/source
diff options
context:
space:
mode:
authorEugen Wissner <belka@caraus.de>2017-02-10 22:30:12 +0100
committerEugen Wissner <belka@caraus.de>2017-02-10 22:30:12 +0100
commita012ca40037566b223dbb9d40480d801282f449a (patch)
treea6e420d99bfcbb32af43848c9220f3d38a276064 /source
parentb74e5aa4ee57af17450d7ef63547002d99dffdbe (diff)
downloadtanya-a012ca40037566b223dbb9d40480d801282f449a.tar.gz
Remove StreamTransport interface
Implement DuplexTransport and SocketTransport separately.
Diffstat (limited to 'source')
-rw-r--r--source/tanya/async/event/epoll.d16
-rw-r--r--source/tanya/async/event/iocp.d50
-rw-r--r--source/tanya/async/event/kqueue.d15
-rw-r--r--source/tanya/async/event/selector.d70
-rw-r--r--source/tanya/async/loop.d10
-rw-r--r--source/tanya/async/transport.d7
-rw-r--r--source/tanya/async/watcher.d104
7 files changed, 112 insertions, 160 deletions
diff --git a/source/tanya/async/event/epoll.d b/source/tanya/async/event/epoll.d
index 998ad18..c10b241 100644
--- a/source/tanya/async/event/epoll.d
+++ b/source/tanya/async/event/epoll.d
@@ -70,14 +70,9 @@ final class EpollLoop : SelectorLoop
*
* Returns: $(D_KEYWORD true) if the operation was successful.
*/
- protected override bool reify(ConnectionWatcher watcher,
+ protected override bool reify(SocketWatcher watcher,
EventMask oldEvents,
EventMask events) @nogc
- in
- {
- assert(watcher !is null);
- }
- body
{
int op = EPOLL_CTL_DEL;
epoll_event ev;
@@ -123,15 +118,18 @@ final class EpollLoop : SelectorLoop
for (auto i = 0; i < eventCount; ++i)
{
- auto transport = cast(SelectorStreamTransport) connections[events[i].data.fd];
+ auto transport = cast(StreamTransport) connections[events[i].data.fd];
if (transport is null)
{
- acceptConnections(connections[events[i].data.fd]);
+ auto connection = cast(ConnectionWatcher) connections[events[i].data.fd];
+ assert(connection !is null);
+
+ acceptConnections(connection);
}
else if (events[i].events & EPOLLERR)
{
- kill(transport, null);
+ kill(transport);
continue;
}
else if (events[i].events & (EPOLLIN | EPOLLPRI | EPOLLHUP))
diff --git a/source/tanya/async/event/iocp.d b/source/tanya/async/event/iocp.d
index 31e57c8..84296a8 100644
--- a/source/tanya/async/event/iocp.d
+++ b/source/tanya/async/event/iocp.d
@@ -26,59 +26,39 @@ import core.sys.windows.winbase;
import core.sys.windows.windef;
import core.sys.windows.winsock2;
-final class IOCPStreamTransport : StreamTransport
+/**
+ * Transport for stream sockets.
+ */
+final class StreamTransport : IOWatcher, SocketTransport
{
- private WriteBuffer!ubyte input;
-
/**
* Creates new completion port transport.
*
* Params:
* socket = Socket.
*
- * Precondition: $(D_INLINECODE socket)
+ * Precondition: $(D_INLINECODE socket !is null)
*/
this(OverlappedConnectedSocket socket) @nogc
- in
- {
- assert(socket !is null);
- }
- body
{
super(socket);
- input = WriteBuffer!ubyte(8192, MmapPool.instance);
}
/**
* Returns: Socket.
- */
- override @property OverlappedConnectedSocket socket() pure nothrow @safe @nogc
- {
- return cast(OverlappedConnectedSocket) socket_;
- }
-
- /**
- * Switches the protocol.
- *
- * The protocol is deallocated by the event loop, it should currently be
- * allocated with $(D_PSYMBOL MmapPool).
- *
- * Params:
- * protocol = Application protocol.
*
- * Precondition: $(D_INLINECODE protocol !is null)
+ * Postcondition: $(D_INLINECODE socket !is null)
*/
- @property void protocol(Protocol protocol) pure nothrow @safe @nogc
- in
+ override @property OverlappedConnectedSocket socket() pure nothrow @safe @nogc
+ out (socket)
{
- assert(protocol !is null);
+ assert(socket !is null);
}
body
{
- protocol_ = protocol;
+ return cast(OverlappedConnectedSocket) socket_;
}
-
/**
* Write some data to the transport.
*
@@ -137,7 +117,7 @@ final class IOCPLoop : Loop
*
* Returns: $(D_KEYWORD true) if the operation was successful.
*/
- override protected bool reify(ConnectionWatcher watcher,
+ override protected bool reify(SocketWatcher watcher,
EventMask oldEvents,
EventMask events) @nogc
{
@@ -170,7 +150,7 @@ final class IOCPLoop : Loop
if (!(oldEvents & Event.read) && (events & Event.read)
|| !(oldEvents & Event.write) && (events & Event.write))
{
- auto transport = cast(IOCPStreamTransport) watcher;
+ auto transport = cast(StreamTransport) watcher;
assert(transport !is null);
if (CreateIoCompletionPort(cast(HANDLE) transport.socket.handle,
@@ -237,7 +217,7 @@ final class IOCPLoop : Loop
assert(listener !is null);
auto socket = listener.endAccept(overlapped);
- auto transport = MmapPool.instance.make!IOCPStreamTransport(socket);
+ auto transport = MmapPool.instance.make!StreamTransport(socket);
connection.incoming.enqueue(transport);
@@ -247,7 +227,7 @@ final class IOCPLoop : Loop
listener.beginAccept(overlapped);
break;
case OverlappedSocketEvent.read:
- auto transport = cast(IOCPStreamTransport) (cast(void*) key);
+ auto transport = cast(StreamTransport) (cast(void*) key);
assert(transport !is null);
if (!transport.active)
@@ -287,7 +267,7 @@ final class IOCPLoop : Loop
}
break;
case OverlappedSocketEvent.write:
- auto transport = cast(IOCPStreamTransport) (cast(void*) key);
+ auto transport = cast(StreamTransport) (cast(void*) key);
assert(transport !is null);
transport.input += transport.socket.endSend(overlapped);
diff --git a/source/tanya/async/event/kqueue.d b/source/tanya/async/event/kqueue.d
index e52a737..721a792 100644
--- a/source/tanya/async/event/kqueue.d
+++ b/source/tanya/async/event/kqueue.d
@@ -177,7 +177,7 @@ final class KqueueLoop : SelectorLoop
*
* Returns: $(D_KEYWORD true) if the operation was successful.
*/
- override protected bool reify(ConnectionWatcher watcher,
+ override protected bool reify(SocketWatcher watcher,
EventMask oldEvents,
EventMask events) @nogc
{
@@ -237,15 +237,18 @@ final class KqueueLoop : SelectorLoop
{
assert(connections.length > events[i].ident);
- auto transport = cast(SelectorStreamTransport) connections[events[i].ident];
+ auto transport = cast(StreamTransport) connections[events[i].ident];
// If it is a ConnectionWatcher. Accept connections.
if (transport is null)
{
+ auto connection = cast(ConnectionWatcher) connections[events[i].data.fd];
+ assert(connection !is null);
+
acceptConnections(connections[events[i].ident]);
}
else if (events[i].flags & EV_ERROR)
{
- kill(transport, null);
+ kill(transport);
}
else if (events[i].filter == EVFILT_READ)
{
@@ -300,12 +303,8 @@ final class KqueueLoop : SelectorLoop
* Params:
* transport = Transport.
* exception = Exception thrown on sending.
- *
- * Returns: $(D_KEYWORD true) if the operation could be successfully
- * completed or scheduled, $(D_KEYWORD false) otherwise (the
- * transport is be destroyed then).
*/
- protected override bool feed(SelectorStreamTransport transport,
+ protected override void feed(StreamTransport transport,
SocketException exception = null) @nogc
{
if (!super.feed(transport, exception))
diff --git a/source/tanya/async/event/selector.d b/source/tanya/async/event/selector.d
index 83a9fce..06662b5 100644
--- a/source/tanya/async/event/selector.d
+++ b/source/tanya/async/event/selector.d
@@ -25,11 +25,8 @@ import tanya.network.socket;
/**
* Transport for stream sockets.
*/
-final class SelectorStreamTransport : IOWatcher, StreamTransport
+final class StreamTransport : IOWatcher, SocketTransport
{
- /// Input buffer.
- package WriteBuffer!ubyte input;
-
private SelectorLoop loop;
/// Received notification that the underlying socket is write-ready.
@@ -37,8 +34,8 @@ final class SelectorStreamTransport : IOWatcher, StreamTransport
/**
* Params:
- * loop = Event loop.
- * socket = Socket.
+ * loop = Event loop.
+ * socket = Socket.
*
* Precondition: $(D_INLINECODE loop !is null && socket !is null)
*/
@@ -46,52 +43,36 @@ final class SelectorStreamTransport : IOWatcher, StreamTransport
in
{
assert(loop !is null);
- assert(socket !is null);
}
body
{
super(socket);
this.loop = loop;
- input = WriteBuffer!ubyte(8192, MmapPool.instance);
}
/**
* Returns: Socket.
+ *
+ * Postcondition: $(D_INLINECODE socket !is null)
*/
override @property ConnectedSocket socket() pure nothrow @safe @nogc
- {
- return cast(ConnectedSocket) socket_;
- }
-
- private @property void socket(ConnectedSocket socket) pure nothrow @safe @nogc
- in
+ out (socket)
{
assert(socket !is null);
}
body
{
- socket_ = socket;
+ return cast(ConnectedSocket) socket_;
}
- /**
- * Switches the protocol.
- *
- * The protocol is deallocated by the event loop, it should currently be
- * allocated with $(D_PSYMBOL MmapPool).
- *
- * Params:
- * protocol = Application protocol.
- *
- * Precondition: $(D_INLINECODE protocol !is null)
- */
- @property void protocol(Protocol protocol) pure nothrow @safe @nogc
+ private @property void socket(ConnectedSocket socket) pure nothrow @safe @nogc
in
{
- assert(protocol !is null);
+ assert(socket !is null);
}
body
{
- protocol_ = protocol;
+ socket_ = socket;
}
/**
@@ -140,12 +121,12 @@ final class SelectorStreamTransport : IOWatcher, StreamTransport
abstract class SelectorLoop : Loop
{
/// Pending connections.
- protected Vector!ConnectionWatcher connections;
+ protected Vector!SocketWatcher connections;
this() @nogc
{
super();
- connections = Vector!ConnectionWatcher(maxEvents, MmapPool.instance);
+ connections = Vector!SocketWatcher(maxEvents, MmapPool.instance);
}
~this() @nogc
@@ -154,11 +135,9 @@ abstract class SelectorLoop : Loop
{
// We want to free only IOWatchers. ConnectionWatcher are created by the
// user and should be freed by himself.
- auto io = cast(IOWatcher) connection;
- if (io !is null)
+ if (cast(IOWatcher) connection !is null)
{
- MmapPool.instance.dispose(io);
- connection = null;
+ MmapPool.instance.dispose(connection);
}
}
}
@@ -170,13 +149,14 @@ abstract class SelectorLoop : Loop
* Params:
* transport = Transport.
* exception = Exception thrown on sending.
- *
- * Returns: $(D_KEYWORD true) if the operation could be successfully
- * completed or scheduled, $(D_KEYWORD false) otherwise (the
- * transport will be destroyed then).
*/
- protected bool feed(SelectorStreamTransport transport,
+ protected void feed(StreamTransport transport,
SocketException exception = null) @nogc
+ in
+ {
+ assert(transport !is null);
+ }
+ body
{
while (transport.input.length && transport.writeReady)
{
@@ -204,9 +184,7 @@ abstract class SelectorLoop : Loop
assert(watcher !is null);
kill(watcher, exception);
- return false;
}
- return true;
}
/**
@@ -215,7 +193,7 @@ abstract class SelectorLoop : Loop
* Params:
* watcher = Watcher.
*/
- override void start(ConnectionWatcher watcher) @nogc
+ override void start(SocketWatcher watcher) @nogc
{
if (watcher.active)
{
@@ -261,11 +239,11 @@ abstract class SelectorLoop : Loop
break;
}
- SelectorStreamTransport transport;
+ StreamTransport transport;
if (connections.length > client.handle)
{
- transport = cast(SelectorStreamTransport) connections[client.handle];
+ transport = cast(StreamTransport) connections[client.handle];
}
else
{
@@ -273,7 +251,7 @@ abstract class SelectorLoop : Loop
}
if (transport is null)
{
- transport = MmapPool.instance.make!SelectorStreamTransport(this, client);
+ transport = MmapPool.instance.make!StreamTransport(this, client);
connections[client.handle] = transport;
}
else
diff --git a/source/tanya/async/loop.d b/source/tanya/async/loop.d
index b7854a6..70f1f66 100644
--- a/source/tanya/async/loop.d
+++ b/source/tanya/async/loop.d
@@ -196,7 +196,7 @@ abstract class Loop
* Params:
* watcher = Watcher.
*/
- void start(ConnectionWatcher watcher) @nogc
+ void start(SocketWatcher watcher) @nogc
{
if (watcher.active)
{
@@ -234,9 +234,9 @@ abstract class Loop
*
* Returns: $(D_KEYWORD true) if the operation was successful.
*/
- abstract protected bool reify(ConnectionWatcher watcher,
- EventMask oldEvents,
- EventMask events) @nogc;
+ abstract protected bool reify(SocketWatcher watcher,
+ EventMask oldEvents,
+ EventMask events) @nogc;
/**
* Returns: The blocking time.
@@ -273,7 +273,7 @@ abstract class Loop
* watcher = Watcher.
* exception = Occurred exception.
*/
- protected void kill(IOWatcher watcher, SocketException exception) @nogc
+ protected void kill(IOWatcher watcher, SocketException exception = null) @nogc
in
{
assert(watcher !is null);
diff --git a/source/tanya/async/transport.d b/source/tanya/async/transport.d
index b06bec7..f18fbee 100644
--- a/source/tanya/async/transport.d
+++ b/source/tanya/async/transport.d
@@ -85,10 +85,3 @@ interface SocketTransport : Transport
*/
@property Socket socket() pure nothrow @safe @nogc;
}
-
-/**
- * Represents a connection-oriented socket transport.
- */
-package interface StreamTransport : DuplexTransport, SocketTransport
-{
-}
diff --git a/source/tanya/async/watcher.d b/source/tanya/async/watcher.d
index 37742e2..997e033 100644
--- a/source/tanya/async/watcher.d
+++ b/source/tanya/async/watcher.d
@@ -36,52 +36,66 @@ abstract class Watcher
void invoke() @nogc;
}
-class ConnectionWatcher : Watcher
+/**
+ * Socket watcher.
+ */
+abstract class SocketWatcher : Watcher
{
/// Watched socket.
protected Socket socket_;
- /// Protocol factory.
- protected Protocol delegate() @nogc protocolFactory;
-
- package Queue!DuplexTransport incoming;
-
/**
* Params:
* socket = Socket.
+ *
+ * Precondition: $(D_INLINECODE socket !is null)
*/
- this(Socket socket) @nogc
+ this(Socket socket) pure nothrow @safe @nogc
+ in
+ {
+ assert(socket !is null);
+ }
+ body
{
- incoming = Queue!DuplexTransport(MmapPool.instance);
socket_ = socket;
}
/**
- * Destroys the watcher.
+ * Returns: Socket.
*/
- ~this() @nogc
+ @property Socket socket() pure nothrow @safe @nogc
{
- foreach (w; incoming)
- {
- MmapPool.instance.dispose(w);
- }
+ return socket_;
}
+}
+
+/**
+ * Connection watcher.
+ */
+class ConnectionWatcher : SocketWatcher
+{
+ /// Incoming connection queue.
+ Queue!DuplexTransport incoming;
+
+ private Protocol delegate() @nogc protocolFactory;
/**
* Params:
- * P = Protocol should be used.
+ * socket = Socket.
*/
- void setProtocol(P : Protocol)() @nogc
+ this(Socket socket) @nogc
{
- this.protocolFactory = () @nogc => cast(Protocol) MmapPool.instance.make!P;
+ super(socket);
+ incoming = Queue!DuplexTransport(MmapPool.instance);
}
/**
- * Returns: Socket.
+ * Params:
+ * P = Protocol should be used.
*/
- @property Socket socket() pure nothrow @safe @nogc
+ void setProtocol(P : Protocol)() @nogc
{
- return socket_;
+ this.protocolFactory = () @nogc => cast(Protocol) MmapPool.instance.make!P;
}
/**
@@ -102,21 +116,17 @@ class ConnectionWatcher : Watcher
}
}
-/**
- * Contains a pending watcher with the invoked events or a transport can be
- * read from.
- */
-class IOWatcher : ConnectionWatcher
+package abstract class IOWatcher : SocketWatcher, DuplexTransport
{
package SocketException exception;
- protected Protocol protocol_;
-
- /**
- * Returns: Underlying output buffer.
- */
package ReadBuffer!ubyte output;
+ package WriteBuffer!ubyte input;
+
+ /// Application protocol.
+ protected Protocol protocol_;
+
/**
* Params:
* socket = Socket.
@@ -124,26 +134,14 @@ class IOWatcher : ConnectionWatcher
* Precondition: $(D_INLINECODE socket !is null)
*/
this(ConnectedSocket socket) @nogc
- in
- {
- assert(socket !is null);
- }
- body
{
super(socket);
output = ReadBuffer!ubyte(8192, 1024, MmapPool.instance);
+ input = WriteBuffer!ubyte(8192, MmapPool.instance);
active = true;
}
/**
- * Destroys the watcher.
- */
- ~this() @nogc
- {
- MmapPool.instance.dispose(protocol_);
- }
-
- /**
* Returns: Application protocol.
*/
@property Protocol protocol() pure nothrow @safe @nogc
@@ -152,18 +150,24 @@ class IOWatcher : ConnectionWatcher
}
/**
- * Returns: Socket.
+ * Switches the protocol.
*
- * Precondition: $(D_INLINECODE socket !is null)
+ * The protocol is deallocated by the event loop, it should currently be
+ * allocated with $(D_PSYMBOL MmapPool).
+ *
+ * Params:
+ * protocol = Application protocol.
+ *
+ * Precondition: $(D_INLINECODE protocol !is null)
*/
- override @property ConnectedSocket socket() pure nothrow @safe @nogc
- out (socket)
+ @property void protocol(Protocol protocol) pure nothrow @safe @nogc
+ in
{
- assert(socket !is null);
+ assert(protocol !is null);
}
body
{
- return cast(ConnectedSocket) socket_;
+ protocol_ = protocol;
}
/**
@@ -179,7 +183,7 @@ class IOWatcher : ConnectionWatcher
else
{
protocol.disconnected(exception);
- MmapPool.instance.dispose(protocol);
+ MmapPool.instance.dispose(protocol_);
defaultAllocator.dispose(exception);
active = false;
}