Inherit IOCPTransport from IOWatcher

This commit is contained in:
Eugen Wissner 2017-02-09 16:35:09 +01:00 committed by Eugene Wissner
parent f9023cf0ab
commit 530a482402
2 changed files with 23 additions and 57 deletions

View File

@ -28,48 +28,33 @@ import core.sys.windows.winsock2;
class IOCPStreamTransport : StreamTransport class IOCPStreamTransport : StreamTransport
{ {
private OverlappedConnectedSocket socket_;
private Protocol protocol_;
private WriteBuffer!ubyte input; private WriteBuffer!ubyte input;
/** /**
* Creates new completion port transport. * Creates new completion port transport.
* *
* Params: * Params:
* socket = Socket. * socket = Socket.
* protocol = Application protocol.
* *
* Precondition: $(D_INLINECODE socket !is null && protocol !is null) * Precondition: $(D_INLINECODE socket)
*/ */
this(OverlappedConnectedSocket socket, Protocol protocol) @nogc this(OverlappedConnectedSocket socket) @nogc
in in
{ {
assert(socket !is null); assert(socket !is null);
assert(protocol !is null);
} }
body body
{ {
socket_ = socket; super(socket);
protocol_ = protocol;
input = WriteBuffer!ubyte(8192, MmapPool.instance); input = WriteBuffer!ubyte(8192, MmapPool.instance);
} }
/** /**
* Returns: Socket. * Returns: Socket.
*/ */
@property OverlappedConnectedSocket socket() pure nothrow @safe @nogc override @property OverlappedConnectedSocket socket() pure nothrow @safe @nogc
{ {
return socket_; return cast(OverlappedConnectedSocket) socket_;
}
/**
* Returns: Application protocol.
*/
@property Protocol protocol() pure nothrow @safe @nogc
{
return protocol_;
} }
/** /**
@ -185,10 +170,7 @@ class IOCPLoop : Loop
if (!(oldEvents & Event.read) && (events & Event.read) if (!(oldEvents & Event.read) && (events & Event.read)
|| !(oldEvents & Event.write) && (events & Event.write)) || !(oldEvents & Event.write) && (events & Event.write))
{ {
auto io = cast(IOWatcher) watcher; auto transport = cast(IOCPStreamTransport) watcher;
assert(io !is null);
auto transport = cast(IOCPStreamTransport) io.transport;
assert(transport !is null); assert(transport !is null);
if (CreateIoCompletionPort(cast(HANDLE) transport.socket.handle, if (CreateIoCompletionPort(cast(HANDLE) transport.socket.handle,
@ -205,7 +187,7 @@ class IOCPLoop : Loop
try try
{ {
overlapped = MmapPool.instance.make!SocketState; overlapped = MmapPool.instance.make!SocketState;
transport.socket.beginReceive(io.output[], overlapped); transport.socket.beginReceive(transport.output[], overlapped);
} }
catch (SocketException e) catch (SocketException e)
{ {
@ -255,30 +237,26 @@ class IOCPLoop : Loop
assert(listener !is null); assert(listener !is null);
auto socket = listener.endAccept(overlapped); auto socket = listener.endAccept(overlapped);
auto protocol = connection.protocol; auto transport = MmapPool.instance.make!IOCPStreamTransport(socket);
auto transport = MmapPool.instance.make!IOCPStreamTransport(socket, protocol);
auto io = MmapPool.instance.make!IOWatcher(transport, socket, protocol);
connection.incoming.enqueue(transport); connection.incoming.enqueue(transport);
reify(io, EventMask(Event.none), EventMask(Event.read, Event.write)); reify(transport, EventMask(Event.none), EventMask(Event.read, Event.write));
pendings.enqueue(connection); pendings.enqueue(connection);
listener.beginAccept(overlapped); listener.beginAccept(overlapped);
break; break;
case OverlappedSocketEvent.read: case OverlappedSocketEvent.read:
auto io = cast(IOWatcher) (cast(void*) key); auto transport = cast(IOCPStreamTransport) (cast(void*) key);
assert(io !is null); assert(transport !is null);
if (!io.active)
if (!transport.active)
{ {
MmapPool.instance.dispose(io); MmapPool.instance.dispose(transport);
MmapPool.instance.dispose(overlapped); MmapPool.instance.dispose(overlapped);
return; return;
} }
auto transport = cast(IOCPStreamTransport) io.transport;
assert(transport !is null);
int received; int received;
SocketException exception; SocketException exception;
try try
@ -292,27 +270,24 @@ class IOCPLoop : Loop
if (transport.socket.disconnected) if (transport.socket.disconnected)
{ {
// We want to get one last notification to destroy the watcher // We want to get one last notification to destroy the watcher
transport.socket.beginReceive(io.output[], overlapped); transport.socket.beginReceive(transport.output[], overlapped);
kill(io, exception); kill(transport, exception);
} }
else if (received > 0) else if (received > 0)
{ {
immutable full = io.output.free == received; immutable full = transport.output.free == received;
io.output += received; transport.output += received;
// Receive was interrupted because the buffer is full. We have to continue // Receive was interrupted because the buffer is full. We have to continue
if (full) if (full)
{ {
transport.socket.beginReceive(io.output[], overlapped); transport.socket.beginReceive(transport.output[], overlapped);
} }
pendings.enqueue(io); pendings.enqueue(transport);
} }
break; break;
case OverlappedSocketEvent.write: case OverlappedSocketEvent.write:
auto io = cast(IOWatcher) (cast(void*) key); auto transport = cast(IOCPStreamTransport) (cast(void*) key);
assert(io !is null);
auto transport = cast(IOCPStreamTransport) io.transport;
assert(transport !is null); assert(transport !is null);
transport.input += transport.socket.endSend(overlapped); transport.input += transport.socket.endSend(overlapped);
@ -322,7 +297,7 @@ class IOCPLoop : Loop
} }
else else
{ {
transport.socket.beginReceive(io.output[], overlapped); transport.socket.beginReceive(transport.output[], overlapped);
} }
break; break;
default: default:

View File

@ -21,15 +21,6 @@ import tanya.memory;
import tanya.memory.mmappool; import tanya.memory.mmappool;
import tanya.network.socket; import tanya.network.socket;
version (Windows)
{
import core.sys.windows.basetyps;
import core.sys.windows.mswsock;
import core.sys.windows.winbase;
import core.sys.windows.windef;
import core.sys.windows.winsock2;
}
/** /**
* A watcher is an opaque structure that you allocate and register to record * A watcher is an opaque structure that you allocate and register to record
* your interest in some event. * your interest in some event.