Move all IOWatcher members to the transports

This commit is contained in:
Eugen Wissner 2017-02-11 19:47:57 +01:00
parent c41fa2e98f
commit 3454a1965a
4 changed files with 164 additions and 105 deletions

View File

@ -29,8 +29,16 @@ import core.sys.windows.winsock2;
/** /**
* Transport for stream sockets. * Transport for stream sockets.
*/ */
final class StreamTransport : IOWatcher, SocketTransport final class StreamTransport : SocketWatcher, DuplexTransport, SocketTransport
{ {
private SocketException exception;
private ReadBuffer!ubyte output;
private WriteBuffer!ubyte input;
private Protocol protocol_;
/** /**
* Creates new completion port transport. * Creates new completion port transport.
* *
@ -42,6 +50,9 @@ final class StreamTransport : IOWatcher, SocketTransport
this(OverlappedConnectedSocket socket) @nogc this(OverlappedConnectedSocket socket) @nogc
{ {
super(socket); super(socket);
output = ReadBuffer!ubyte(8192, 1024, MmapPool.instance);
input = WriteBuffer!ubyte(8192, MmapPool.instance);
active = true;
} }
/** /**
@ -84,6 +95,54 @@ final class StreamTransport : IOWatcher, SocketTransport
} }
} }
} }
/**
* Returns: Application protocol.
*/
@property Protocol protocol() pure nothrow @safe @nogc
{
return protocol_;
}
/**
* Switches the protocol.
*
* The protocol is deallocated by the event loop, it should currently be
* allocated with $(D_PSYMBOL MmapPool).
*
* Params:
* protocol = Application protocol.
*
* Precondition: $(D_INLINECODE protocol !is null)
*/
@property void protocol(Protocol protocol) pure nothrow @safe @nogc
in
{
assert(protocol !is null);
}
body
{
protocol_ = protocol;
}
/**
* Invokes the watcher callback.
*/
override void invoke() @nogc
{
if (output.length)
{
protocol.received(output[0 .. $]);
output.clear();
}
else
{
protocol.disconnected(exception);
MmapPool.instance.dispose(protocol_);
defaultAllocator.dispose(exception);
active = false;
}
}
} }
final class IOCPLoop : Loop final class IOCPLoop : Loop
@ -251,7 +310,10 @@ final class IOCPLoop : Loop
{ {
// We want to get one last notification to destroy the watcher // We want to get one last notification to destroy the watcher
transport.socket.beginReceive(transport.output[], overlapped); transport.socket.beginReceive(transport.output[], overlapped);
kill(transport, exception); transport.socket.shutdown();
defaultAllocator.dispose(transport.socket);
transport.exception = exception;
pendings.enqueue(transport);
} }
else if (received > 0) else if (received > 0)
{ {

View File

@ -25,10 +25,18 @@ import tanya.network.socket;
/** /**
* Transport for stream sockets. * Transport for stream sockets.
*/ */
final class StreamTransport : IOWatcher, SocketTransport package class StreamTransport : SocketWatcher, DuplexTransport, SocketTransport
{ {
private SelectorLoop loop; private SelectorLoop loop;
private SocketException exception;
package ReadBuffer!ubyte output;
package WriteBuffer!ubyte input;
private Protocol protocol_;
/// Received notification that the underlying socket is write-ready. /// Received notification that the underlying socket is write-ready.
package bool writeReady; package bool writeReady;
@ -48,6 +56,9 @@ final class StreamTransport : IOWatcher, SocketTransport
{ {
super(socket); super(socket);
this.loop = loop; this.loop = loop;
output = ReadBuffer!ubyte(8192, 1024, MmapPool.instance);
input = WriteBuffer!ubyte(8192, MmapPool.instance);
active = true;
} }
/** /**
@ -75,6 +86,54 @@ final class StreamTransport : IOWatcher, SocketTransport
socket_ = socket; socket_ = socket;
} }
/**
* Returns: Application protocol.
*/
@property Protocol protocol() pure nothrow @safe @nogc
{
return protocol_;
}
/**
* Switches the protocol.
*
* The protocol is deallocated by the event loop, it should currently be
* allocated with $(D_PSYMBOL MmapPool).
*
* Params:
* protocol = Application protocol.
*
* Precondition: $(D_INLINECODE protocol !is null)
*/
@property void protocol(Protocol protocol) pure nothrow @safe @nogc
in
{
assert(protocol !is null);
}
body
{
protocol_ = protocol;
}
/**
* Invokes the watcher callback.
*/
override void invoke() @nogc
{
if (output.length)
{
protocol.received(output[0 .. $]);
output.clear();
}
else
{
protocol.disconnected(exception);
MmapPool.instance.dispose(protocol_);
defaultAllocator.dispose(exception);
active = false;
}
}
/** /**
* Write some data to the transport. * Write some data to the transport.
* *
@ -133,15 +192,50 @@ abstract class SelectorLoop : Loop
{ {
foreach (ref connection; connections) foreach (ref connection; connections)
{ {
// We want to free only IOWatchers. ConnectionWatcher are created by the // We want to free only the transports. ConnectionWatcher are created by the
// user and should be freed by himself. // user and should be freed by himself.
if (cast(IOWatcher) connection !is null) if (cast(StreamTransport) connection !is null)
{ {
MmapPool.instance.dispose(connection); MmapPool.instance.dispose(connection);
} }
} }
} }
/**
* Should be called if the backend configuration changes.
*
* Params:
* watcher = Watcher.
* oldEvents = The events were already set.
* events = The events should be set.
*
* Returns: $(D_KEYWORD true) if the operation was successful.
*/
override abstract protected bool reify(SocketWatcher watcher,
EventMask oldEvents,
EventMask events) @nogc;
/**
* Kills the watcher and closes the connection.
*
* Params:
* transport = Transport.
* exception = Occurred exception.
*/
protected void kill(StreamTransport transport,
SocketException exception = null) @nogc
in
{
assert(transport !is null);
}
body
{
transport.socket.shutdown();
defaultAllocator.dispose(transport.socket);
transport.exception = exception;
pendings.enqueue(transport);
}
/** /**
* If the transport couldn't send the data, the further sending should * If the transport couldn't send the data, the further sending should
* be handled by the event loop. * be handled by the event loop.
@ -184,10 +278,7 @@ abstract class SelectorLoop : Loop
} }
if (exception !is null) if (exception !is null)
{ {
auto watcher = cast(IOWatcher) connections[transport.socket.handle]; kill(transport, exception);
assert(watcher !is null);
kill(watcher, exception);
return false; return false;
} }
return true; return true;
@ -199,7 +290,7 @@ abstract class SelectorLoop : Loop
* Params: * Params:
* watcher = Watcher. * watcher = Watcher.
*/ */
override void start(SocketWatcher watcher) @nogc override void start(ConnectionWatcher watcher) @nogc
{ {
if (watcher.active) if (watcher.active)
{ {

View File

@ -196,7 +196,7 @@ abstract class Loop
* Params: * Params:
* watcher = Watcher. * watcher = Watcher.
*/ */
void start(SocketWatcher watcher) @nogc void start(ConnectionWatcher watcher) @nogc
{ {
if (watcher.active) if (watcher.active)
{ {
@ -266,26 +266,6 @@ abstract class Loop
blockTime_ = blockTime; blockTime_ = blockTime;
} }
/**
* Kills the watcher and closes the connection.
*
* Params:
* watcher = Watcher.
* exception = Occurred exception.
*/
protected void kill(IOWatcher watcher, SocketException exception = null) @nogc
in
{
assert(watcher !is null);
}
body
{
watcher.socket.shutdown();
defaultAllocator.dispose(watcher.socket);
watcher.exception = exception;
pendings.enqueue(watcher);
}
/** /**
* Does the actual polling. * Does the actual polling.
*/ */

View File

@ -115,77 +115,3 @@ class ConnectionWatcher : SocketWatcher
} }
} }
} }
package abstract class IOWatcher : SocketWatcher, DuplexTransport
{
package SocketException exception;
package ReadBuffer!ubyte output;
package WriteBuffer!ubyte input;
/// Application protocol.
protected Protocol protocol_;
/**
* Params:
* socket = Socket.
*
* Precondition: $(D_INLINECODE socket !is null)
*/
this(ConnectedSocket socket) @nogc
{
super(socket);
output = ReadBuffer!ubyte(8192, 1024, MmapPool.instance);
input = WriteBuffer!ubyte(8192, MmapPool.instance);
active = true;
}
/**
* Returns: Application protocol.
*/
@property Protocol protocol() pure nothrow @safe @nogc
{
return protocol_;
}
/**
* Switches the protocol.
*
* The protocol is deallocated by the event loop, it should currently be
* allocated with $(D_PSYMBOL MmapPool).
*
* Params:
* protocol = Application protocol.
*
* Precondition: $(D_INLINECODE protocol !is null)
*/
@property void protocol(Protocol protocol) pure nothrow @safe @nogc
in
{
assert(protocol !is null);
}
body
{
protocol_ = protocol;
}
/**
* Invokes the watcher callback.
*/
override void invoke() @nogc
{
if (output.length)
{
protocol.received(output[0 .. $]);
output.clear();
}
else
{
protocol.disconnected(exception);
MmapPool.instance.dispose(protocol_);
defaultAllocator.dispose(exception);
active = false;
}
}
}