Remove StreamTransport interface

Implement DuplexTransport and SocketTransport separately.
This commit is contained in:
Eugen Wissner 2017-02-10 22:30:12 +01:00
parent b74e5aa4ee
commit a012ca4003
7 changed files with 122 additions and 170 deletions

View File

@ -70,14 +70,9 @@ final class EpollLoop : SelectorLoop
* *
* Returns: $(D_KEYWORD true) if the operation was successful. * Returns: $(D_KEYWORD true) if the operation was successful.
*/ */
protected override bool reify(ConnectionWatcher watcher, protected override bool reify(SocketWatcher watcher,
EventMask oldEvents, EventMask oldEvents,
EventMask events) @nogc EventMask events) @nogc
in
{
assert(watcher !is null);
}
body
{ {
int op = EPOLL_CTL_DEL; int op = EPOLL_CTL_DEL;
epoll_event ev; epoll_event ev;
@ -123,15 +118,18 @@ final class EpollLoop : SelectorLoop
for (auto i = 0; i < eventCount; ++i) for (auto i = 0; i < eventCount; ++i)
{ {
auto transport = cast(SelectorStreamTransport) connections[events[i].data.fd]; auto transport = cast(StreamTransport) connections[events[i].data.fd];
if (transport is null) if (transport is null)
{ {
acceptConnections(connections[events[i].data.fd]); auto connection = cast(ConnectionWatcher) connections[events[i].data.fd];
assert(connection !is null);
acceptConnections(connection);
} }
else if (events[i].events & EPOLLERR) else if (events[i].events & EPOLLERR)
{ {
kill(transport, null); kill(transport);
continue; continue;
} }
else if (events[i].events & (EPOLLIN | EPOLLPRI | EPOLLHUP)) else if (events[i].events & (EPOLLIN | EPOLLPRI | EPOLLHUP))

View File

@ -26,59 +26,39 @@ import core.sys.windows.winbase;
import core.sys.windows.windef; import core.sys.windows.windef;
import core.sys.windows.winsock2; import core.sys.windows.winsock2;
final class IOCPStreamTransport : StreamTransport /**
* Transport for stream sockets.
*/
final class StreamTransport : IOWatcher, SocketTransport
{ {
private WriteBuffer!ubyte input;
/** /**
* Creates new completion port transport. * Creates new completion port transport.
* *
* Params: * Params:
* socket = Socket. * socket = Socket.
* *
* Precondition: $(D_INLINECODE socket) * Precondition: $(D_INLINECODE socket !is null)
*/ */
this(OverlappedConnectedSocket socket) @nogc this(OverlappedConnectedSocket socket) @nogc
in {
super(socket);
}
/**
* Returns: Socket.
*
* Postcondition: $(D_INLINECODE socket !is null)
*/
override @property OverlappedConnectedSocket socket() pure nothrow @safe @nogc
out (socket)
{ {
assert(socket !is null); assert(socket !is null);
} }
body body
{
super(socket);
input = WriteBuffer!ubyte(8192, MmapPool.instance);
}
/**
* Returns: Socket.
*/
override @property OverlappedConnectedSocket socket() pure nothrow @safe @nogc
{ {
return cast(OverlappedConnectedSocket) socket_; return cast(OverlappedConnectedSocket) socket_;
} }
/**
* Switches the protocol.
*
* The protocol is deallocated by the event loop, it should currently be
* allocated with $(D_PSYMBOL MmapPool).
*
* Params:
* protocol = Application protocol.
*
* Precondition: $(D_INLINECODE protocol !is null)
*/
@property void protocol(Protocol protocol) pure nothrow @safe @nogc
in
{
assert(protocol !is null);
}
body
{
protocol_ = protocol;
}
/** /**
* Write some data to the transport. * Write some data to the transport.
* *
@ -137,7 +117,7 @@ final class IOCPLoop : Loop
* *
* Returns: $(D_KEYWORD true) if the operation was successful. * Returns: $(D_KEYWORD true) if the operation was successful.
*/ */
override protected bool reify(ConnectionWatcher watcher, override protected bool reify(SocketWatcher watcher,
EventMask oldEvents, EventMask oldEvents,
EventMask events) @nogc EventMask events) @nogc
{ {
@ -170,7 +150,7 @@ final class IOCPLoop : Loop
if (!(oldEvents & Event.read) && (events & Event.read) if (!(oldEvents & Event.read) && (events & Event.read)
|| !(oldEvents & Event.write) && (events & Event.write)) || !(oldEvents & Event.write) && (events & Event.write))
{ {
auto transport = cast(IOCPStreamTransport) watcher; auto transport = cast(StreamTransport) watcher;
assert(transport !is null); assert(transport !is null);
if (CreateIoCompletionPort(cast(HANDLE) transport.socket.handle, if (CreateIoCompletionPort(cast(HANDLE) transport.socket.handle,
@ -237,7 +217,7 @@ final class IOCPLoop : Loop
assert(listener !is null); assert(listener !is null);
auto socket = listener.endAccept(overlapped); auto socket = listener.endAccept(overlapped);
auto transport = MmapPool.instance.make!IOCPStreamTransport(socket); auto transport = MmapPool.instance.make!StreamTransport(socket);
connection.incoming.enqueue(transport); connection.incoming.enqueue(transport);
@ -247,7 +227,7 @@ final class IOCPLoop : Loop
listener.beginAccept(overlapped); listener.beginAccept(overlapped);
break; break;
case OverlappedSocketEvent.read: case OverlappedSocketEvent.read:
auto transport = cast(IOCPStreamTransport) (cast(void*) key); auto transport = cast(StreamTransport) (cast(void*) key);
assert(transport !is null); assert(transport !is null);
if (!transport.active) if (!transport.active)
@ -287,7 +267,7 @@ final class IOCPLoop : Loop
} }
break; break;
case OverlappedSocketEvent.write: case OverlappedSocketEvent.write:
auto transport = cast(IOCPStreamTransport) (cast(void*) key); auto transport = cast(StreamTransport) (cast(void*) key);
assert(transport !is null); assert(transport !is null);
transport.input += transport.socket.endSend(overlapped); transport.input += transport.socket.endSend(overlapped);

View File

@ -177,7 +177,7 @@ final class KqueueLoop : SelectorLoop
* *
* Returns: $(D_KEYWORD true) if the operation was successful. * Returns: $(D_KEYWORD true) if the operation was successful.
*/ */
override protected bool reify(ConnectionWatcher watcher, override protected bool reify(SocketWatcher watcher,
EventMask oldEvents, EventMask oldEvents,
EventMask events) @nogc EventMask events) @nogc
{ {
@ -237,15 +237,18 @@ final class KqueueLoop : SelectorLoop
{ {
assert(connections.length > events[i].ident); assert(connections.length > events[i].ident);
auto transport = cast(SelectorStreamTransport) connections[events[i].ident]; auto transport = cast(StreamTransport) connections[events[i].ident];
// If it is a ConnectionWatcher. Accept connections. // If it is a ConnectionWatcher. Accept connections.
if (transport is null) if (transport is null)
{ {
auto connection = cast(ConnectionWatcher) connections[events[i].data.fd];
assert(connection !is null);
acceptConnections(connections[events[i].ident]); acceptConnections(connections[events[i].ident]);
} }
else if (events[i].flags & EV_ERROR) else if (events[i].flags & EV_ERROR)
{ {
kill(transport, null); kill(transport);
} }
else if (events[i].filter == EVFILT_READ) else if (events[i].filter == EVFILT_READ)
{ {
@ -300,12 +303,8 @@ final class KqueueLoop : SelectorLoop
* Params: * Params:
* transport = Transport. * transport = Transport.
* exception = Exception thrown on sending. * exception = Exception thrown on sending.
*
* Returns: $(D_KEYWORD true) if the operation could be successfully
* completed or scheduled, $(D_KEYWORD false) otherwise (the
* transport is be destroyed then).
*/ */
protected override bool feed(SelectorStreamTransport transport, protected override void feed(StreamTransport transport,
SocketException exception = null) @nogc SocketException exception = null) @nogc
{ {
if (!super.feed(transport, exception)) if (!super.feed(transport, exception))

View File

@ -25,11 +25,8 @@ import tanya.network.socket;
/** /**
* Transport for stream sockets. * Transport for stream sockets.
*/ */
final class SelectorStreamTransport : IOWatcher, StreamTransport final class StreamTransport : IOWatcher, SocketTransport
{ {
/// Input buffer.
package WriteBuffer!ubyte input;
private SelectorLoop loop; private SelectorLoop loop;
/// Received notification that the underlying socket is write-ready. /// Received notification that the underlying socket is write-ready.
@ -37,8 +34,8 @@ final class SelectorStreamTransport : IOWatcher, StreamTransport
/** /**
* Params: * Params:
* loop = Event loop. * loop = Event loop.
* socket = Socket. * socket = Socket.
* *
* Precondition: $(D_INLINECODE loop !is null && socket !is null) * Precondition: $(D_INLINECODE loop !is null && socket !is null)
*/ */
@ -46,19 +43,24 @@ final class SelectorStreamTransport : IOWatcher, StreamTransport
in in
{ {
assert(loop !is null); assert(loop !is null);
assert(socket !is null);
} }
body body
{ {
super(socket); super(socket);
this.loop = loop; this.loop = loop;
input = WriteBuffer!ubyte(8192, MmapPool.instance);
} }
/** /**
* Returns: Socket. * Returns: Socket.
*
* Postcondition: $(D_INLINECODE socket !is null)
*/ */
override @property ConnectedSocket socket() pure nothrow @safe @nogc override @property ConnectedSocket socket() pure nothrow @safe @nogc
out (socket)
{
assert(socket !is null);
}
body
{ {
return cast(ConnectedSocket) socket_; return cast(ConnectedSocket) socket_;
} }
@ -73,27 +75,6 @@ final class SelectorStreamTransport : IOWatcher, StreamTransport
socket_ = socket; socket_ = socket;
} }
/**
* Switches the protocol.
*
* The protocol is deallocated by the event loop, it should currently be
* allocated with $(D_PSYMBOL MmapPool).
*
* Params:
* protocol = Application protocol.
*
* Precondition: $(D_INLINECODE protocol !is null)
*/
@property void protocol(Protocol protocol) pure nothrow @safe @nogc
in
{
assert(protocol !is null);
}
body
{
protocol_ = protocol;
}
/** /**
* Write some data to the transport. * Write some data to the transport.
* *
@ -140,12 +121,12 @@ final class SelectorStreamTransport : IOWatcher, StreamTransport
abstract class SelectorLoop : Loop abstract class SelectorLoop : Loop
{ {
/// Pending connections. /// Pending connections.
protected Vector!ConnectionWatcher connections; protected Vector!SocketWatcher connections;
this() @nogc this() @nogc
{ {
super(); super();
connections = Vector!ConnectionWatcher(maxEvents, MmapPool.instance); connections = Vector!SocketWatcher(maxEvents, MmapPool.instance);
} }
~this() @nogc ~this() @nogc
@ -154,11 +135,9 @@ abstract class SelectorLoop : Loop
{ {
// We want to free only IOWatchers. ConnectionWatcher are created by the // We want to free only IOWatchers. ConnectionWatcher are created by the
// user and should be freed by himself. // user and should be freed by himself.
auto io = cast(IOWatcher) connection; if (cast(IOWatcher) connection !is null)
if (io !is null)
{ {
MmapPool.instance.dispose(io); MmapPool.instance.dispose(connection);
connection = null;
} }
} }
} }
@ -170,13 +149,14 @@ abstract class SelectorLoop : Loop
* Params: * Params:
* transport = Transport. * transport = Transport.
* exception = Exception thrown on sending. * exception = Exception thrown on sending.
*
* Returns: $(D_KEYWORD true) if the operation could be successfully
* completed or scheduled, $(D_KEYWORD false) otherwise (the
* transport will be destroyed then).
*/ */
protected bool feed(SelectorStreamTransport transport, protected void feed(StreamTransport transport,
SocketException exception = null) @nogc SocketException exception = null) @nogc
in
{
assert(transport !is null);
}
body
{ {
while (transport.input.length && transport.writeReady) while (transport.input.length && transport.writeReady)
{ {
@ -204,9 +184,7 @@ abstract class SelectorLoop : Loop
assert(watcher !is null); assert(watcher !is null);
kill(watcher, exception); kill(watcher, exception);
return false;
} }
return true;
} }
/** /**
@ -215,7 +193,7 @@ abstract class SelectorLoop : Loop
* Params: * Params:
* watcher = Watcher. * watcher = Watcher.
*/ */
override void start(ConnectionWatcher watcher) @nogc override void start(SocketWatcher watcher) @nogc
{ {
if (watcher.active) if (watcher.active)
{ {
@ -261,11 +239,11 @@ abstract class SelectorLoop : Loop
break; break;
} }
SelectorStreamTransport transport; StreamTransport transport;
if (connections.length > client.handle) if (connections.length > client.handle)
{ {
transport = cast(SelectorStreamTransport) connections[client.handle]; transport = cast(StreamTransport) connections[client.handle];
} }
else else
{ {
@ -273,7 +251,7 @@ abstract class SelectorLoop : Loop
} }
if (transport is null) if (transport is null)
{ {
transport = MmapPool.instance.make!SelectorStreamTransport(this, client); transport = MmapPool.instance.make!StreamTransport(this, client);
connections[client.handle] = transport; connections[client.handle] = transport;
} }
else else

View File

@ -196,7 +196,7 @@ abstract class Loop
* Params: * Params:
* watcher = Watcher. * watcher = Watcher.
*/ */
void start(ConnectionWatcher watcher) @nogc void start(SocketWatcher watcher) @nogc
{ {
if (watcher.active) if (watcher.active)
{ {
@ -234,9 +234,9 @@ abstract class Loop
* *
* Returns: $(D_KEYWORD true) if the operation was successful. * Returns: $(D_KEYWORD true) if the operation was successful.
*/ */
abstract protected bool reify(ConnectionWatcher watcher, abstract protected bool reify(SocketWatcher watcher,
EventMask oldEvents, EventMask oldEvents,
EventMask events) @nogc; EventMask events) @nogc;
/** /**
* Returns: The blocking time. * Returns: The blocking time.
@ -273,7 +273,7 @@ abstract class Loop
* watcher = Watcher. * watcher = Watcher.
* exception = Occurred exception. * exception = Occurred exception.
*/ */
protected void kill(IOWatcher watcher, SocketException exception) @nogc protected void kill(IOWatcher watcher, SocketException exception = null) @nogc
in in
{ {
assert(watcher !is null); assert(watcher !is null);

View File

@ -85,10 +85,3 @@ interface SocketTransport : Transport
*/ */
@property Socket socket() pure nothrow @safe @nogc; @property Socket socket() pure nothrow @safe @nogc;
} }
/**
* Represents a connection-oriented socket transport.
*/
package interface StreamTransport : DuplexTransport, SocketTransport
{
}

View File

@ -36,15 +36,48 @@ abstract class Watcher
void invoke() @nogc; void invoke() @nogc;
} }
class ConnectionWatcher : Watcher /**
* Socket watcher.
*/
abstract class SocketWatcher : Watcher
{ {
/// Watched socket. /// Watched socket.
protected Socket socket_; protected Socket socket_;
/// Protocol factory. /**
protected Protocol delegate() @nogc protocolFactory; * Params:
* socket = Socket.
*
* Precondition: $(D_INLINECODE socket !is null)
*/
this(Socket socket) pure nothrow @safe @nogc
in
{
assert(socket !is null);
}
body
{
socket_ = socket;
}
package Queue!DuplexTransport incoming; /**
* Returns: Socket.
*/
@property Socket socket() pure nothrow @safe @nogc
{
return socket_;
}
}
/**
* Connection watcher.
*/
class ConnectionWatcher : SocketWatcher
{
/// Incoming connection queue.
Queue!DuplexTransport incoming;
private Protocol delegate() @nogc protocolFactory;
/** /**
* Params: * Params:
@ -52,19 +85,8 @@ class ConnectionWatcher : Watcher
*/ */
this(Socket socket) @nogc this(Socket socket) @nogc
{ {
super(socket);
incoming = Queue!DuplexTransport(MmapPool.instance); incoming = Queue!DuplexTransport(MmapPool.instance);
socket_ = socket;
}
/**
* Destroys the watcher.
*/
~this() @nogc
{
foreach (w; incoming)
{
MmapPool.instance.dispose(w);
}
} }
/** /**
@ -76,14 +98,6 @@ class ConnectionWatcher : Watcher
this.protocolFactory = () @nogc => cast(Protocol) MmapPool.instance.make!P; this.protocolFactory = () @nogc => cast(Protocol) MmapPool.instance.make!P;
} }
/**
* Returns: Socket.
*/
@property Socket socket() pure nothrow @safe @nogc
{
return socket_;
}
/** /**
* Invokes new connection callback. * Invokes new connection callback.
*/ */
@ -102,21 +116,17 @@ class ConnectionWatcher : Watcher
} }
} }
/** package abstract class IOWatcher : SocketWatcher, DuplexTransport
* Contains a pending watcher with the invoked events or a transport can be
* read from.
*/
class IOWatcher : ConnectionWatcher
{ {
package SocketException exception; package SocketException exception;
protected Protocol protocol_;
/**
* Returns: Underlying output buffer.
*/
package ReadBuffer!ubyte output; package ReadBuffer!ubyte output;
package WriteBuffer!ubyte input;
/// Application protocol.
protected Protocol protocol_;
/** /**
* Params: * Params:
* socket = Socket. * socket = Socket.
@ -124,25 +134,13 @@ class IOWatcher : ConnectionWatcher
* Precondition: $(D_INLINECODE socket !is null) * Precondition: $(D_INLINECODE socket !is null)
*/ */
this(ConnectedSocket socket) @nogc this(ConnectedSocket socket) @nogc
in
{
assert(socket !is null);
}
body
{ {
super(socket); super(socket);
output = ReadBuffer!ubyte(8192, 1024, MmapPool.instance); output = ReadBuffer!ubyte(8192, 1024, MmapPool.instance);
input = WriteBuffer!ubyte(8192, MmapPool.instance);
active = true; active = true;
} }
/**
* Destroys the watcher.
*/
~this() @nogc
{
MmapPool.instance.dispose(protocol_);
}
/** /**
* Returns: Application protocol. * Returns: Application protocol.
*/ */
@ -152,18 +150,24 @@ class IOWatcher : ConnectionWatcher
} }
/** /**
* Returns: Socket. * Switches the protocol.
* *
* Precondition: $(D_INLINECODE socket !is null) * The protocol is deallocated by the event loop, it should currently be
* allocated with $(D_PSYMBOL MmapPool).
*
* Params:
* protocol = Application protocol.
*
* Precondition: $(D_INLINECODE protocol !is null)
*/ */
override @property ConnectedSocket socket() pure nothrow @safe @nogc @property void protocol(Protocol protocol) pure nothrow @safe @nogc
out (socket) in
{ {
assert(socket !is null); assert(protocol !is null);
} }
body body
{ {
return cast(ConnectedSocket) socket_; protocol_ = protocol;
} }
/** /**
@ -179,7 +183,7 @@ class IOWatcher : ConnectionWatcher
else else
{ {
protocol.disconnected(exception); protocol.disconnected(exception);
MmapPool.instance.dispose(protocol); MmapPool.instance.dispose(protocol_);
defaultAllocator.dispose(exception); defaultAllocator.dispose(exception);
active = false; active = false;
} }