summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorEugen Wissner <belka@caraus.de>2017-02-11 19:47:57 +0100
committerEugen Wissner <belka@caraus.de>2017-02-11 19:47:57 +0100
commit3454a1965a8760deb9acfa44d5c2cf311710daec (patch)
tree578783ac6967f760d7d19c3dd014833c8ef905d3
parentc41fa2e98fdab61e4806869dc49576a105176eb8 (diff)
downloadtanya-3454a1965a8760deb9acfa44d5c2cf311710daec.tar.gz
Move all IOWatcher members to the transports
-rw-r--r--source/tanya/async/event/iocp.d66
-rw-r--r--source/tanya/async/event/selector.d107
-rw-r--r--source/tanya/async/loop.d22
-rw-r--r--source/tanya/async/watcher.d74
4 files changed, 164 insertions, 105 deletions
diff --git a/source/tanya/async/event/iocp.d b/source/tanya/async/event/iocp.d
index 84296a8..8dc6f2d 100644
--- a/source/tanya/async/event/iocp.d
+++ b/source/tanya/async/event/iocp.d
@@ -29,8 +29,16 @@ import core.sys.windows.winsock2;
/**
* Transport for stream sockets.
*/
-final class StreamTransport : IOWatcher, SocketTransport
+final class StreamTransport : SocketWatcher, DuplexTransport, SocketTransport
{
+ private SocketException exception;
+
+ private ReadBuffer!ubyte output;
+
+ private WriteBuffer!ubyte input;
+
+ private Protocol protocol_;
+
/**
* Creates new completion port transport.
*
@@ -42,6 +50,9 @@ final class StreamTransport : IOWatcher, SocketTransport
this(OverlappedConnectedSocket socket) @nogc
{
super(socket);
+ output = ReadBuffer!ubyte(8192, 1024, MmapPool.instance);
+ input = WriteBuffer!ubyte(8192, MmapPool.instance);
+ active = true;
}
/**
@@ -84,6 +95,54 @@ final class StreamTransport : IOWatcher, SocketTransport
}
}
}
+
+ /**
+ * Returns: Application protocol.
+ */
+ @property Protocol protocol() pure nothrow @safe @nogc
+ {
+ return protocol_;
+ }
+
+ /**
+ * Switches the protocol.
+ *
+ * The protocol is deallocated by the event loop, it should currently be
+ * allocated with $(D_PSYMBOL MmapPool).
+ *
+ * Params:
+ * protocol = Application protocol.
+ *
+ * Precondition: $(D_INLINECODE protocol !is null)
+ */
+ @property void protocol(Protocol protocol) pure nothrow @safe @nogc
+ in
+ {
+ assert(protocol !is null);
+ }
+ body
+ {
+ protocol_ = protocol;
+ }
+
+ /**
+ * Invokes the watcher callback.
+ */
+ override void invoke() @nogc
+ {
+ if (output.length)
+ {
+ protocol.received(output[0 .. $]);
+ output.clear();
+ }
+ else
+ {
+ protocol.disconnected(exception);
+ MmapPool.instance.dispose(protocol_);
+ defaultAllocator.dispose(exception);
+ active = false;
+ }
+ }
}
final class IOCPLoop : Loop
@@ -251,7 +310,10 @@ final class IOCPLoop : Loop
{
// We want to get one last notification to destroy the watcher
transport.socket.beginReceive(transport.output[], overlapped);
- kill(transport, exception);
+ transport.socket.shutdown();
+ defaultAllocator.dispose(transport.socket);
+ transport.exception = exception;
+ pendings.enqueue(transport);
}
else if (received > 0)
{
diff --git a/source/tanya/async/event/selector.d b/source/tanya/async/event/selector.d
index 67efd7f..9251b31 100644
--- a/source/tanya/async/event/selector.d
+++ b/source/tanya/async/event/selector.d
@@ -25,10 +25,18 @@ import tanya.network.socket;
/**
* Transport for stream sockets.
*/
-final class StreamTransport : IOWatcher, SocketTransport
+package class StreamTransport : SocketWatcher, DuplexTransport, SocketTransport
{
private SelectorLoop loop;
+ private SocketException exception;
+
+ package ReadBuffer!ubyte output;
+
+ package WriteBuffer!ubyte input;
+
+ private Protocol protocol_;
+
/// Received notification that the underlying socket is write-ready.
package bool writeReady;
@@ -48,6 +56,9 @@ final class StreamTransport : IOWatcher, SocketTransport
{
super(socket);
this.loop = loop;
+ output = ReadBuffer!ubyte(8192, 1024, MmapPool.instance);
+ input = WriteBuffer!ubyte(8192, MmapPool.instance);
+ active = true;
}
/**
@@ -76,6 +87,54 @@ final class StreamTransport : IOWatcher, SocketTransport
}
/**
+ * Returns: Application protocol.
+ */
+ @property Protocol protocol() pure nothrow @safe @nogc
+ {
+ return protocol_;
+ }
+
+ /**
+ * Switches the protocol.
+ *
+ * The protocol is deallocated by the event loop, it should currently be
+ * allocated with $(D_PSYMBOL MmapPool).
+ *
+ * Params:
+ * protocol = Application protocol.
+ *
+ * Precondition: $(D_INLINECODE protocol !is null)
+ */
+ @property void protocol(Protocol protocol) pure nothrow @safe @nogc
+ in
+ {
+ assert(protocol !is null);
+ }
+ body
+ {
+ protocol_ = protocol;
+ }
+
+ /**
+ * Invokes the watcher callback.
+ */
+ override void invoke() @nogc
+ {
+ if (output.length)
+ {
+ protocol.received(output[0 .. $]);
+ output.clear();
+ }
+ else
+ {
+ protocol.disconnected(exception);
+ MmapPool.instance.dispose(protocol_);
+ defaultAllocator.dispose(exception);
+ active = false;
+ }
+ }
+
+ /**
* Write some data to the transport.
*
* Params:
@@ -133,9 +192,9 @@ abstract class SelectorLoop : Loop
{
foreach (ref connection; connections)
{
- // We want to free only IOWatchers. ConnectionWatcher are created by the
+ // We want to free only the transports. ConnectionWatcher are created by the
// user and should be freed by himself.
- if (cast(IOWatcher) connection !is null)
+ if (cast(StreamTransport) connection !is null)
{
MmapPool.instance.dispose(connection);
}
@@ -143,6 +202,41 @@ abstract class SelectorLoop : Loop
}
/**
+ * Should be called if the backend configuration changes.
+ *
+ * Params:
+ * watcher = Watcher.
+ * oldEvents = The events were already set.
+ * events = The events should be set.
+ *
+ * Returns: $(D_KEYWORD true) if the operation was successful.
+ */
+ override abstract protected bool reify(SocketWatcher watcher,
+ EventMask oldEvents,
+ EventMask events) @nogc;
+
+ /**
+ * Kills the watcher and closes the connection.
+ *
+ * Params:
+ * transport = Transport.
+ * exception = Occurred exception.
+ */
+ protected void kill(StreamTransport transport,
+ SocketException exception = null) @nogc
+ in
+ {
+ assert(transport !is null);
+ }
+ body
+ {
+ transport.socket.shutdown();
+ defaultAllocator.dispose(transport.socket);
+ transport.exception = exception;
+ pendings.enqueue(transport);
+ }
+
+ /**
* If the transport couldn't send the data, the further sending should
* be handled by the event loop.
*
@@ -184,10 +278,7 @@ abstract class SelectorLoop : Loop
}
if (exception !is null)
{
- auto watcher = cast(IOWatcher) connections[transport.socket.handle];
- assert(watcher !is null);
-
- kill(watcher, exception);
+ kill(transport, exception);
return false;
}
return true;
@@ -199,7 +290,7 @@ abstract class SelectorLoop : Loop
* Params:
* watcher = Watcher.
*/
- override void start(SocketWatcher watcher) @nogc
+ override void start(ConnectionWatcher watcher) @nogc
{
if (watcher.active)
{
diff --git a/source/tanya/async/loop.d b/source/tanya/async/loop.d
index 70f1f66..54b5238 100644
--- a/source/tanya/async/loop.d
+++ b/source/tanya/async/loop.d
@@ -196,7 +196,7 @@ abstract class Loop
* Params:
* watcher = Watcher.
*/
- void start(SocketWatcher watcher) @nogc
+ void start(ConnectionWatcher watcher) @nogc
{
if (watcher.active)
{
@@ -267,26 +267,6 @@ abstract class Loop
}
/**
- * Kills the watcher and closes the connection.
- *
- * Params:
- * watcher = Watcher.
- * exception = Occurred exception.
- */
- protected void kill(IOWatcher watcher, SocketException exception = null) @nogc
- in
- {
- assert(watcher !is null);
- }
- body
- {
- watcher.socket.shutdown();
- defaultAllocator.dispose(watcher.socket);
- watcher.exception = exception;
- pendings.enqueue(watcher);
- }
-
- /**
* Does the actual polling.
*/
abstract protected void poll() @nogc;
diff --git a/source/tanya/async/watcher.d b/source/tanya/async/watcher.d
index 997e033..9756d99 100644
--- a/source/tanya/async/watcher.d
+++ b/source/tanya/async/watcher.d
@@ -115,77 +115,3 @@ class ConnectionWatcher : SocketWatcher
}
}
}
-
-package abstract class IOWatcher : SocketWatcher, DuplexTransport
-{
- package SocketException exception;
-
- package ReadBuffer!ubyte output;
-
- package WriteBuffer!ubyte input;
-
- /// Application protocol.
- protected Protocol protocol_;
-
- /**
- * Params:
- * socket = Socket.
- *
- * Precondition: $(D_INLINECODE socket !is null)
- */
- this(ConnectedSocket socket) @nogc
- {
- super(socket);
- output = ReadBuffer!ubyte(8192, 1024, MmapPool.instance);
- input = WriteBuffer!ubyte(8192, MmapPool.instance);
- active = true;
- }
-
- /**
- * Returns: Application protocol.
- */
- @property Protocol protocol() pure nothrow @safe @nogc
- {
- return protocol_;
- }
-
- /**
- * Switches the protocol.
- *
- * The protocol is deallocated by the event loop, it should currently be
- * allocated with $(D_PSYMBOL MmapPool).
- *
- * Params:
- * protocol = Application protocol.
- *
- * Precondition: $(D_INLINECODE protocol !is null)
- */
- @property void protocol(Protocol protocol) pure nothrow @safe @nogc
- in
- {
- assert(protocol !is null);
- }
- body
- {
- protocol_ = protocol;
- }
-
- /**
- * Invokes the watcher callback.
- */
- override void invoke() @nogc
- {
- if (output.length)
- {
- protocol.received(output[0 .. $]);
- output.clear();
- }
- else
- {
- protocol.disconnected(exception);
- MmapPool.instance.dispose(protocol_);
- defaultAllocator.dispose(exception);
- active = false;
- }
- }
-}