Pass client socket directly to the IOWatcher

This commit is contained in:
Eugen Wissner 2017-02-09 02:58:59 +01:00 committed by Eugene Wissner
parent 63c6226a2a
commit 0e91ea6786
3 changed files with 41 additions and 48 deletions

View File

@ -257,7 +257,7 @@ class IOCPLoop : Loop
auto socket = listener.endAccept(overlapped); auto socket = listener.endAccept(overlapped);
auto protocol = connection.protocol; auto protocol = connection.protocol;
auto transport = MmapPool.instance.make!IOCPStreamTransport(socket, protocol); auto transport = MmapPool.instance.make!IOCPStreamTransport(socket, protocol);
auto io = MmapPool.instance.make!IOWatcher(transport, protocol); auto io = MmapPool.instance.make!IOWatcher(transport, socket, protocol);
connection.incoming.enqueue(io); connection.incoming.enqueue(io);

View File

@ -282,12 +282,12 @@ abstract class SelectorLoop : Loop
} }
if (io is null) if (io is null)
{ {
io = MmapPool.instance.make!IOWatcher(transport, protocol); io = MmapPool.instance.make!IOWatcher(transport, client, protocol);
connections[client.handle] = io; connections[client.handle] = io;
} }
else else
{ {
io(transport, protocol); io(transport, client, protocol);
} }
reify(io, EventMask(Event.none), EventMask(Event.read, Event.write)); reify(io, EventMask(Event.none), EventMask(Event.read, Event.write));

View File

@ -48,7 +48,7 @@ abstract class Watcher
class ConnectionWatcher : Watcher class ConnectionWatcher : Watcher
{ {
/// Watched socket. /// Watched socket.
private Socket socket_; protected Socket socket_;
/// Protocol factory. /// Protocol factory.
protected Protocol delegate() @nogc protocolFactory; protected Protocol delegate() @nogc protocolFactory;
@ -65,11 +65,9 @@ class ConnectionWatcher : Watcher
socket_ = socket; socket_ = socket;
} }
/// Ditto. /**
protected this() pure nothrow @safe @nogc * Destroys the watcher.
{ */
}
~this() @nogc ~this() @nogc
{ {
foreach (w; incoming) foreach (w; incoming)
@ -90,7 +88,7 @@ class ConnectionWatcher : Watcher
/** /**
* Returns: Socket. * Returns: Socket.
*/ */
@property Socket socket() pure nothrow @nogc @property Socket socket() pure nothrow @safe @nogc
{ {
return socket_; return socket_;
} }
@ -126,12 +124,8 @@ class ConnectionWatcher : Watcher
*/ */
class IOWatcher : ConnectionWatcher class IOWatcher : ConnectionWatcher
{ {
/// If an exception was thrown the transport should be already invalid. package StreamTransport transport;
private union package SocketException exception;
{
StreamTransport transport_;
SocketException exception_;
}
private Protocol protocol_; private Protocol protocol_;
@ -143,18 +137,25 @@ class IOWatcher : ConnectionWatcher
/** /**
* Params: * Params:
* transport = Transport. * transport = Transport.
* socket = Socket.
* protocol = New instance of the application protocol. * protocol = New instance of the application protocol.
*
* Precondition: $(D_INLINECODE transport !is null
* && socket !is null
* && protocol !is null)
*/ */
this(StreamTransport transport, Protocol protocol) @nogc this(StreamTransport transport, ConnectedSocket socket, Protocol protocol)
@nogc
in in
{ {
assert(transport !is null); assert(transport !is null);
assert(socket !is null);
assert(protocol !is null); assert(protocol !is null);
} }
body body
{ {
super(); super(socket);
transport_ = transport; this.transport = transport;
protocol_ = protocol; protocol_ = protocol;
output = ReadBuffer!ubyte(8192, 1024, MmapPool.instance); output = ReadBuffer!ubyte(8192, 1024, MmapPool.instance);
active = true; active = true;
@ -169,46 +170,31 @@ class IOWatcher : ConnectionWatcher
} }
/** /**
* Assigns a transport. * Reinitializes the watcher.
* *
* Params: * Params:
* transport = Transport. * transport = Transport.
* protocol = Application protocol. * socket = Socket.
* protocol = New instance of the application protocol.
* *
* Returns: $(D_KEYWORD this). * Precondition: $(D_INLINECODE transport !is null
* && socket !is null
* && protocol !is null)
*/ */
IOWatcher opCall(StreamTransport transport, Protocol protocol) void opCall(StreamTransport transport,
pure nothrow @nogc ConnectedSocket socket,
Protocol protocol) pure nothrow @nogc
in in
{ {
assert(transport !is null); assert(transport !is null);
assert(socket !is null);
assert(protocol !is null); assert(protocol !is null);
} }
body body
{ {
transport_ = transport; this.transport = transport;
protocol_ = protocol; protocol_ = protocol;
active = true; active = true;
return this;
}
/**
* Returns: Transport used by this watcher.
*/
@property inout(StreamTransport) transport() inout pure nothrow @nogc
{
return transport_;
}
/**
* Sets an exception occurred during a read/write operation.
*
* Params:
* exception = Thrown exception.
*/
@property void exception(SocketException exception) pure nothrow @nogc
{
exception_ = exception;
} }
/** /**
@ -221,10 +207,17 @@ class IOWatcher : ConnectionWatcher
/** /**
* Returns: Socket. * Returns: Socket.
*
* Precondition: $(D_INLINECODE socket !is null)
*/ */
override @property Socket socket() pure nothrow @nogc override @property ConnectedSocket socket() pure nothrow @safe @nogc
out (socket)
{ {
return transport.socket; assert(socket !is null);
}
body
{
return cast(ConnectedSocket) socket_;
} }
/** /**
@ -239,7 +232,7 @@ class IOWatcher : ConnectionWatcher
} }
else else
{ {
protocol.disconnected(exception_); protocol.disconnected(exception);
active = false; active = false;
} }
} }