Let Transport extend IOWatcher
This commit is contained in:
parent
0e91ea6786
commit
f9023cf0ab
@ -123,30 +123,27 @@ class EpollLoop : SelectorLoop
|
|||||||
|
|
||||||
for (auto i = 0; i < eventCount; ++i)
|
for (auto i = 0; i < eventCount; ++i)
|
||||||
{
|
{
|
||||||
auto io = cast(IOWatcher) connections[events[i].data.fd];
|
auto transport = cast(SelectorStreamTransport) connections[events[i].data.fd];
|
||||||
|
|
||||||
if (io is null)
|
if (transport is null)
|
||||||
{
|
{
|
||||||
acceptConnections(connections[events[i].data.fd]);
|
acceptConnections(connections[events[i].data.fd]);
|
||||||
}
|
}
|
||||||
else if (events[i].events & EPOLLERR)
|
else if (events[i].events & EPOLLERR)
|
||||||
{
|
{
|
||||||
kill(io, null);
|
kill(transport, null);
|
||||||
continue;
|
continue;
|
||||||
}
|
}
|
||||||
else if (events[i].events & (EPOLLIN | EPOLLPRI | EPOLLHUP))
|
else if (events[i].events & (EPOLLIN | EPOLLPRI | EPOLLHUP))
|
||||||
{
|
{
|
||||||
auto transport = cast(SelectorStreamTransport) io.transport;
|
|
||||||
assert(transport !is null);
|
|
||||||
|
|
||||||
SocketException exception;
|
SocketException exception;
|
||||||
try
|
try
|
||||||
{
|
{
|
||||||
ptrdiff_t received;
|
ptrdiff_t received;
|
||||||
do
|
do
|
||||||
{
|
{
|
||||||
received = transport.socket.receive(io.output[]);
|
received = transport.socket.receive(transport.output[]);
|
||||||
io.output += received;
|
transport.output += received;
|
||||||
}
|
}
|
||||||
while (received);
|
while (received);
|
||||||
}
|
}
|
||||||
@ -156,19 +153,16 @@ class EpollLoop : SelectorLoop
|
|||||||
}
|
}
|
||||||
if (transport.socket.disconnected)
|
if (transport.socket.disconnected)
|
||||||
{
|
{
|
||||||
kill(io, exception);
|
kill(transport, exception);
|
||||||
continue;
|
continue;
|
||||||
}
|
}
|
||||||
else if (io.output.length)
|
else if (transport.output.length)
|
||||||
{
|
{
|
||||||
pendings.enqueue(io);
|
pendings.enqueue(transport);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
if (events[i].events & EPOLLOUT)
|
if (events[i].events & EPOLLOUT)
|
||||||
{
|
{
|
||||||
auto transport = cast(SelectorStreamTransport) io.transport;
|
|
||||||
assert(transport !is null);
|
|
||||||
|
|
||||||
transport.writeReady = true;
|
transport.writeReady = true;
|
||||||
if (transport.input.length)
|
if (transport.input.length)
|
||||||
{
|
{
|
||||||
|
@ -259,7 +259,7 @@ class IOCPLoop : Loop
|
|||||||
auto transport = MmapPool.instance.make!IOCPStreamTransport(socket, protocol);
|
auto transport = MmapPool.instance.make!IOCPStreamTransport(socket, protocol);
|
||||||
auto io = MmapPool.instance.make!IOWatcher(transport, socket, protocol);
|
auto io = MmapPool.instance.make!IOWatcher(transport, socket, protocol);
|
||||||
|
|
||||||
connection.incoming.enqueue(io);
|
connection.incoming.enqueue(transport);
|
||||||
|
|
||||||
reify(io, EventMask(Event.none), EventMask(Event.read, Event.write));
|
reify(io, EventMask(Event.none), EventMask(Event.read, Event.write));
|
||||||
|
|
||||||
|
@ -25,12 +25,8 @@ import tanya.network.socket;
|
|||||||
/**
|
/**
|
||||||
* Transport for stream sockets.
|
* Transport for stream sockets.
|
||||||
*/
|
*/
|
||||||
class SelectorStreamTransport : StreamTransport
|
class SelectorStreamTransport : IOWatcher, StreamTransport
|
||||||
{
|
{
|
||||||
private ConnectedSocket socket_;
|
|
||||||
|
|
||||||
private Protocol protocol_;
|
|
||||||
|
|
||||||
/// Input buffer.
|
/// Input buffer.
|
||||||
package WriteBuffer!ubyte input;
|
package WriteBuffer!ubyte input;
|
||||||
|
|
||||||
@ -43,41 +39,38 @@ class SelectorStreamTransport : StreamTransport
|
|||||||
* Params:
|
* Params:
|
||||||
* loop = Event loop.
|
* loop = Event loop.
|
||||||
* socket = Socket.
|
* socket = Socket.
|
||||||
* protocol = Application protocol.
|
|
||||||
*
|
*
|
||||||
* Precondition: $(D_INLINECODE loop !is null
|
* Precondition: $(D_INLINECODE loop !is null && socket !is null)
|
||||||
* && socket !is null
|
|
||||||
* && protocol !is null)
|
|
||||||
*/
|
*/
|
||||||
this(SelectorLoop loop, ConnectedSocket socket, Protocol protocol) @nogc
|
this(SelectorLoop loop, ConnectedSocket socket) @nogc
|
||||||
in
|
in
|
||||||
{
|
{
|
||||||
assert(loop !is null);
|
assert(loop !is null);
|
||||||
assert(socket !is null);
|
assert(socket !is null);
|
||||||
assert(protocol !is null);
|
|
||||||
}
|
}
|
||||||
body
|
body
|
||||||
{
|
{
|
||||||
socket_ = socket;
|
super(socket);
|
||||||
this.loop = loop;
|
this.loop = loop;
|
||||||
protocol_ = protocol;
|
|
||||||
input = WriteBuffer!ubyte(8192, MmapPool.instance);
|
input = WriteBuffer!ubyte(8192, MmapPool.instance);
|
||||||
}
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Returns: Socket.
|
* Returns: Socket.
|
||||||
*/
|
*/
|
||||||
ConnectedSocket socket() pure nothrow @safe @nogc
|
override @property ConnectedSocket socket() pure nothrow @safe @nogc
|
||||||
{
|
{
|
||||||
return socket_;
|
return cast(ConnectedSocket) socket_;
|
||||||
}
|
}
|
||||||
|
|
||||||
/**
|
private @property void socket(ConnectedSocket socket) pure nothrow @safe @nogc
|
||||||
* Returns: Application protocol.
|
in
|
||||||
*/
|
|
||||||
@property Protocol protocol() pure nothrow @safe @nogc
|
|
||||||
{
|
{
|
||||||
return protocol_;
|
assert(socket !is null);
|
||||||
|
}
|
||||||
|
body
|
||||||
|
{
|
||||||
|
socket_ = socket;
|
||||||
}
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
@ -268,30 +261,28 @@ abstract class SelectorLoop : Loop
|
|||||||
break;
|
break;
|
||||||
}
|
}
|
||||||
|
|
||||||
IOWatcher io;
|
SelectorStreamTransport transport;
|
||||||
auto protocol = connection.protocol;
|
|
||||||
auto transport = MmapPool.instance.make!SelectorStreamTransport(this, client, protocol);
|
|
||||||
|
|
||||||
if (connections.length > client.handle)
|
if (connections.length > client.handle)
|
||||||
{
|
{
|
||||||
io = cast(IOWatcher) connections[client.handle];
|
transport = cast(SelectorStreamTransport) connections[client.handle];
|
||||||
}
|
}
|
||||||
else
|
else
|
||||||
{
|
{
|
||||||
connections.length = client.handle + maxEvents / 2;
|
connections.length = client.handle + maxEvents / 2;
|
||||||
}
|
}
|
||||||
if (io is null)
|
if (transport is null)
|
||||||
{
|
{
|
||||||
io = MmapPool.instance.make!IOWatcher(transport, client, protocol);
|
transport = MmapPool.instance.make!SelectorStreamTransport(this, client);
|
||||||
connections[client.handle] = io;
|
connections[client.handle] = transport;
|
||||||
}
|
}
|
||||||
else
|
else
|
||||||
{
|
{
|
||||||
io(transport, client, protocol);
|
transport.socket = client;
|
||||||
}
|
}
|
||||||
|
|
||||||
reify(io, EventMask(Event.none), EventMask(Event.read, Event.write));
|
reify(transport, EventMask(Event.none), EventMask(Event.read, Event.write));
|
||||||
connection.incoming.enqueue(io);
|
connection.incoming.enqueue(transport);
|
||||||
}
|
}
|
||||||
|
|
||||||
if (!connection.incoming.empty)
|
if (!connection.incoming.empty)
|
||||||
|
@ -68,7 +68,6 @@ import core.time;
|
|||||||
import std.algorithm.iteration;
|
import std.algorithm.iteration;
|
||||||
import std.algorithm.mutation;
|
import std.algorithm.mutation;
|
||||||
import std.typecons;
|
import std.typecons;
|
||||||
import tanya.async.protocol;
|
|
||||||
import tanya.async.transport;
|
import tanya.async.transport;
|
||||||
import tanya.async.watcher;
|
import tanya.async.watcher;
|
||||||
import tanya.container.buffer;
|
import tanya.container.buffer;
|
||||||
@ -269,12 +268,20 @@ abstract class Loop
|
|||||||
|
|
||||||
/**
|
/**
|
||||||
* Kills the watcher and closes the connection.
|
* Kills the watcher and closes the connection.
|
||||||
|
*
|
||||||
|
* Params:
|
||||||
|
* watcher = Watcher.
|
||||||
|
* exception = Occurred exception.
|
||||||
*/
|
*/
|
||||||
protected void kill(IOWatcher watcher, SocketException exception) @nogc
|
protected void kill(IOWatcher watcher, SocketException exception) @nogc
|
||||||
|
in
|
||||||
|
{
|
||||||
|
assert(watcher !is null);
|
||||||
|
}
|
||||||
|
body
|
||||||
{
|
{
|
||||||
watcher.socket.shutdown();
|
watcher.socket.shutdown();
|
||||||
defaultAllocator.dispose(watcher.socket);
|
defaultAllocator.dispose(watcher.socket);
|
||||||
MmapPool.instance.dispose(watcher.transport);
|
|
||||||
watcher.exception = exception;
|
watcher.exception = exception;
|
||||||
pendings.enqueue(watcher);
|
pendings.enqueue(watcher);
|
||||||
}
|
}
|
||||||
|
@ -53,7 +53,7 @@ class ConnectionWatcher : Watcher
|
|||||||
/// Protocol factory.
|
/// Protocol factory.
|
||||||
protected Protocol delegate() @nogc protocolFactory;
|
protected Protocol delegate() @nogc protocolFactory;
|
||||||
|
|
||||||
package Queue!IOWatcher incoming;
|
package Queue!DuplexTransport incoming;
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Params:
|
* Params:
|
||||||
@ -61,7 +61,7 @@ class ConnectionWatcher : Watcher
|
|||||||
*/
|
*/
|
||||||
this(Socket socket) @nogc
|
this(Socket socket) @nogc
|
||||||
{
|
{
|
||||||
incoming = Queue!IOWatcher(MmapPool.instance);
|
incoming = Queue!DuplexTransport(MmapPool.instance);
|
||||||
socket_ = socket;
|
socket_ = socket;
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -94,26 +94,19 @@ class ConnectionWatcher : Watcher
|
|||||||
}
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Returns: New protocol instance.
|
* Invokes new connection callback.
|
||||||
*/
|
*/
|
||||||
@property Protocol protocol() @nogc
|
override void invoke() @nogc
|
||||||
in
|
in
|
||||||
{
|
{
|
||||||
assert(protocolFactory !is null, "Protocol isn't set.");
|
assert(protocolFactory !is null, "Protocol isn't set.");
|
||||||
}
|
}
|
||||||
body
|
body
|
||||||
{
|
{
|
||||||
return protocolFactory();
|
foreach (transport; incoming)
|
||||||
}
|
|
||||||
|
|
||||||
/**
|
|
||||||
* Invokes new connection callback.
|
|
||||||
*/
|
|
||||||
override void invoke() @nogc
|
|
||||||
{
|
|
||||||
foreach (io; incoming)
|
|
||||||
{
|
{
|
||||||
io.protocol.connected(cast(DuplexTransport) io.transport);
|
transport.protocol = protocolFactory();
|
||||||
|
transport.protocol.connected(transport);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
@ -124,10 +117,9 @@ class ConnectionWatcher : Watcher
|
|||||||
*/
|
*/
|
||||||
class IOWatcher : ConnectionWatcher
|
class IOWatcher : ConnectionWatcher
|
||||||
{
|
{
|
||||||
package StreamTransport transport;
|
|
||||||
package SocketException exception;
|
package SocketException exception;
|
||||||
|
|
||||||
private Protocol protocol_;
|
protected Protocol protocol_;
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Returns: Underlying output buffer.
|
* Returns: Underlying output buffer.
|
||||||
@ -136,27 +128,18 @@ class IOWatcher : ConnectionWatcher
|
|||||||
|
|
||||||
/**
|
/**
|
||||||
* Params:
|
* Params:
|
||||||
* transport = Transport.
|
* socket = Socket.
|
||||||
* socket = Socket.
|
|
||||||
* protocol = New instance of the application protocol.
|
|
||||||
*
|
*
|
||||||
* Precondition: $(D_INLINECODE transport !is null
|
* Precondition: $(D_INLINECODE socket !is null)
|
||||||
* && socket !is null
|
|
||||||
* && protocol !is null)
|
|
||||||
*/
|
*/
|
||||||
this(StreamTransport transport, ConnectedSocket socket, Protocol protocol)
|
this(ConnectedSocket socket) @nogc
|
||||||
@nogc
|
|
||||||
in
|
in
|
||||||
{
|
{
|
||||||
assert(transport !is null);
|
|
||||||
assert(socket !is null);
|
assert(socket !is null);
|
||||||
assert(protocol !is null);
|
|
||||||
}
|
}
|
||||||
body
|
body
|
||||||
{
|
{
|
||||||
super(socket);
|
super(socket);
|
||||||
this.transport = transport;
|
|
||||||
protocol_ = protocol;
|
|
||||||
output = ReadBuffer!ubyte(8192, 1024, MmapPool.instance);
|
output = ReadBuffer!ubyte(8192, 1024, MmapPool.instance);
|
||||||
active = true;
|
active = true;
|
||||||
}
|
}
|
||||||
@ -169,38 +152,10 @@ class IOWatcher : ConnectionWatcher
|
|||||||
MmapPool.instance.dispose(protocol_);
|
MmapPool.instance.dispose(protocol_);
|
||||||
}
|
}
|
||||||
|
|
||||||
/**
|
|
||||||
* Reinitializes the watcher.
|
|
||||||
*
|
|
||||||
* Params:
|
|
||||||
* transport = Transport.
|
|
||||||
* socket = Socket.
|
|
||||||
* protocol = New instance of the application protocol.
|
|
||||||
*
|
|
||||||
* Precondition: $(D_INLINECODE transport !is null
|
|
||||||
* && socket !is null
|
|
||||||
* && protocol !is null)
|
|
||||||
*/
|
|
||||||
void opCall(StreamTransport transport,
|
|
||||||
ConnectedSocket socket,
|
|
||||||
Protocol protocol) pure nothrow @nogc
|
|
||||||
in
|
|
||||||
{
|
|
||||||
assert(transport !is null);
|
|
||||||
assert(socket !is null);
|
|
||||||
assert(protocol !is null);
|
|
||||||
}
|
|
||||||
body
|
|
||||||
{
|
|
||||||
this.transport = transport;
|
|
||||||
protocol_ = protocol;
|
|
||||||
active = true;
|
|
||||||
}
|
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Returns: Application protocol.
|
* Returns: Application protocol.
|
||||||
*/
|
*/
|
||||||
override @property Protocol protocol() pure nothrow @safe @nogc
|
@property Protocol protocol() pure nothrow @safe @nogc
|
||||||
{
|
{
|
||||||
return protocol_;
|
return protocol_;
|
||||||
}
|
}
|
||||||
@ -233,6 +188,8 @@ class IOWatcher : ConnectionWatcher
|
|||||||
else
|
else
|
||||||
{
|
{
|
||||||
protocol.disconnected(exception);
|
protocol.disconnected(exception);
|
||||||
|
MmapPool.instance.dispose(protocol);
|
||||||
|
defaultAllocator.dispose(exception);
|
||||||
active = false;
|
active = false;
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
Loading…
x
Reference in New Issue
Block a user