76 Commits

Author SHA1 Message Date
278e851414 Rename String.toString to String.get()
Last changed it only by the Range and forgotten about the string itself.
2017-06-08 07:59:16 +02:00
6f549df243 Update README description 2017-06-07 08:04:50 +02:00
4633bcc680 Set: Fix comparing with removed elements 2017-06-07 07:57:22 +02:00
dc39efd316 Add some unit tests for InternetAddress 2017-06-03 15:18:53 +02:00
260937e4fb Put socket overlapped I/O docs into a D_Ddoc block 2017-06-03 13:20:32 +02:00
e17fff2881 Update 2.074 compiler 2017-06-02 22:01:13 +02:00
bc32511254 Fix template parameters for Set 2017-06-01 22:36:38 +02:00
f30972f948 Add basic constructors and opAssign to Set 2017-06-01 06:26:06 +02:00
ea33ca62c8 Implement lookups in the Set 2017-05-31 10:29:07 +02:00
0f365758e1 Add optional fieldnames for Pair 2017-05-30 20:20:20 +02:00
2815b53a88 Implement Set Range 2017-05-30 15:52:18 +02:00
6c0588164a Rename String.toString back to get()
Since it is expected that the return type of toString() is
immutable(char)[] and not char[] or const(char)[].
2017-05-29 11:41:49 +02:00
8ee1d647ce Close issue 212
Introduce Range and constRange aliases for containers.
2017-05-29 11:26:39 +02:00
25791775e6 Add information about the Set to README 2017-05-29 10:58:37 +02:00
f013e2f1f4 Implement a Set container first 2017-05-29 10:50:01 +02:00
ac3935d71b Merge branch 'master' into horton-table 2017-05-28 10:15:02 +02:00
b1c217e272 Fix kqueue to work with SocketType 2017-05-25 22:21:45 +02:00
d007aaa310 Rename socket_t to SocketType 2017-05-25 21:59:40 +02:00
8aaf9e14be Add HashTable struct 2017-05-23 22:17:35 +02:00
ae3e6b46f6 Import std.algorithm.comparison for network.socket on Windows 2017-05-21 10:25:54 +02:00
8687df1fbb Define AddressFamily in network.socket 2017-05-21 10:20:57 +02:00
ba0aff6737 Add tanya.typecons.Pair 2017-05-19 21:15:56 +02:00
a648e2120a Fix parameter count for docs in container.string 2017-05-19 20:01:04 +02:00
bc61809050 Implement DList.insertBack 2017-05-16 13:16:18 +02:00
8c42cbfd63 Rename Vector to Array
For consistency with Phobos.
2017-05-16 12:12:57 +02:00
58664570f9 Add new branch, add DList to package description 2017-05-15 20:09:32 +02:00
decb82f437 Remove crypto.mac for now
It wasn't released yet and needs some work.
2017-05-15 19:57:36 +02:00
357c7e279d Add doubly-linked list
DList is an adjusted copy of SList. Missing:
* insertBack
* insertAfter
* removeBack
2017-05-15 19:50:20 +02:00
32e19c8b58 Rename String.get into toString. Add String.toStringz 2017-05-14 11:56:57 +02:00
f5c6c5b483 Add Payload template for memory.types 2017-05-13 08:43:49 +02:00
ba2d086fb8 Add memory.types.Scoped 2017-05-13 08:27:51 +02:00
7a0241b484 Fix unittest text for strings 2017-05-12 22:42:43 +02:00
36dad80e18 Use char ranges to avoid compilation errors on elder compilers 2017-05-12 21:46:48 +02:00
29d883150e Fix unittests on 2.072 and 2.071 2017-05-12 21:10:22 +02:00
e2bed0cfcb Replace tabs with spaces in mmappool and buffer 2017-05-12 21:02:24 +02:00
38afeac071 Insert String.insertFront and String.insertBack 2017-05-12 20:55:42 +02:00
001c7c3e33 Replace immutable with const in Vector 2017-05-12 20:35:16 +02:00
d4ab339feb Add String.remove 2017-05-12 20:23:16 +02:00
8477312769 Add editorconfig 2017-05-11 13:57:24 +02:00
67f90e137d Add codecov badge 2017-05-11 13:15:04 +02:00
f264fd5597 Generate unittest coverage information 2017-05-11 13:11:12 +02:00
9e75620f1b Fix appveyor branch badges 2017-05-11 07:05:13 +02:00
45825946c0 Appveyor (#10)
* Add appveyor.yml

* Try major VC version

* Switch to VC 2015

* Try new version

* Try enterprise

* Try another path

* Change VC template

* Set arch

* Set LINKCMD64

* Fix quotes

* Update LINKCMD64

* remove dir

* Update arch

* Fix syntax

* Set arch to x64

* Remove extra dub downloading

* Remove dub version

* Download dub for 2.071.2

* Use DVersion

* Fix nul in powershell

* Put quotes to commands

* Add badges
2017-05-11 06:26:59 +02:00
8afb552d59 mp.Integer: add two's complement constructor 2017-05-10 19:27:25 +02:00
e4091669f8 Add information about io branch 2017-05-10 13:18:58 +02:00
1cb9349226 math.mp.Integer.toVector return two's complement 2017-05-09 06:27:30 +02:00
06620dc5df math.mp.Integer: Return two's complement length 2017-05-08 21:09:52 +02:00
708d95db49 Remove utf8string branch 2017-05-06 11:55:20 +02:00
85d9361bfb Fix fill with char on older compilers 2017-05-05 07:03:16 +02:00
a6a6f496eb Implement string slice assignments 2017-05-04 23:17:50 +02:00
db12f03264 Merge branch 'master' into utf8string 2017-05-03 19:15:13 +02:00
231aedb8ad Add HMAC 2017-05-03 19:05:23 +02:00
c3b63ee40d Merge branch 'master' into utf8string 2017-05-02 10:59:00 +02:00
6f405c5e08 Make Vector's opSliceAssign accept only own ranges
Vector.opSliceAssign and Vector.opIndexAssign should accept only vector
ranges. For assigning other ranges, std.algorithm.mutation.copy and
std.algorithm.mutation.fill should be used.
2017-05-02 10:56:32 +02:00
16cf8478cf Add ByCodePoint 2017-05-01 20:17:37 +02:00
8915a0c7a7 Implement opCmp and opEquals for the String 2017-05-01 18:43:12 +02:00
e5c7edb72c Implement String opAssign 2017-05-01 12:58:37 +02:00
64e0d666ed Merge branch 'master' of github.com:caraus-ecms/tanya into utf8string 2017-05-01 09:59:29 +02:00
f2aac680c5 Fix container ctors and opAssign ref parameters
Container constructors and opAssign should accept any ref container and
not only const, otherwise the source container will be copied because
the constructor/opAssign without ref would be a better match.
2017-05-01 09:48:12 +02:00
d629525a4b Make String to be a char Slice alias 2017-04-21 14:03:20 +02:00
33d321f0d7 Merge branch 'master' into utf8string 2017-04-20 17:32:59 +02:00
3d64d59ba9 Merge branch 'master' of github.com:caraus-ecms/tanya 2017-04-20 17:32:29 +02:00
4635835a99 Rename Vector range to Slice 2017-04-20 17:32:16 +02:00
8725ec5f20 Make Integer representation little endian 2017-04-19 13:49:44 +02:00
9a4c8cea06 Merge branch 'master' into utf8string 2017-04-16 20:52:40 +02:00
eb360bda38 Add unittest to check RefCounted calles struct destructors 2017-04-16 20:52:24 +02:00
4b1cd2cbfd Merge branch 'master' into utf8string 2017-04-16 20:15:11 +02:00
cd944a61b7 Merge remote-tracking branch 'origin/master' into utf8string 2017-04-13 16:03:00 +02:00
47ef787353 Add missing constructors to the String 2017-04-10 08:10:08 +02:00
6436ad49df Add ByteRange to the String 2017-04-08 17:44:08 +02:00
e1964e47a5 Merge branch 'master' into utf8string 2017-04-08 08:44:21 +02:00
43319e4e3a Initialization from a UTF-16 string 2017-02-27 11:27:24 +01:00
33dbf042c2 Add dchar constructor 2017-02-26 22:40:27 +01:00
885fca9b5e Add String.reserve and shrink 2017-02-20 12:01:15 +01:00
074d027629 Merge branch 'master' into utf8string 2017-02-20 08:02:01 +01:00
f4b90d8b51 Add string skeleton 2017-02-10 19:22:46 +01:00
27 changed files with 8056 additions and 3895 deletions

9
.editorconfig Normal file
View File

@ -0,0 +1,9 @@
root = true
[*]
end_of_line = lf
insert_final_newline = true
charset = utf-8
indent_style = space
indent_size = 4
trim_trailing_whitespace = true

View File

@ -7,7 +7,7 @@ os:
language: d
d:
- dmd-2.074.0
- dmd-2.074.1
- dmd-2.073.2
- dmd-2.072.2
- dmd-2.071.2
@ -17,4 +17,7 @@ env:
- ARCH=x86_64
script:
- dub test --arch=$ARCH
- dub test -b unittest-cov --arch=$ARCH
after_success:
- bash <(curl -s https://codecov.io/bash)

View File

@ -1,6 +1,8 @@
# Tanya
[![Build Status](https://travis-ci.org/caraus-ecms/tanya.svg?branch=master)](https://travis-ci.org/caraus-ecms/tanya)
[![Build status](https://travis-ci.org/caraus-ecms/tanya.svg?branch=master)](https://travis-ci.org/caraus-ecms/tanya)
[![Build status](https://ci.appveyor.com/api/projects/status/djkmverdfsylc7ti/branch/master?svg=true)](https://ci.appveyor.com/project/belka-ew/tanya/branch/master)
[![codecov](https://codecov.io/gh/caraus-ecms/tanya/branch/master/graph/badge.svg)](https://codecov.io/gh/caraus-ecms/tanya)
[![Dub version](https://img.shields.io/dub/v/tanya.svg)](https://code.dlang.org/packages/tanya)
[![Dub downloads](https://img.shields.io/dub/dt/tanya.svg)](https://code.dlang.org/packages/tanya)
[![License](https://img.shields.io/badge/license-MPL_2.0-blue.svg)](https://raw.githubusercontent.com/caraus-ecms/tanya/master/LICENSE)
@ -13,7 +15,7 @@ Garbage Collector heap. Everything in the library is usable in @nogc code.
Tanya extends Phobos functionality and provides alternative implementations for
data structures and utilities that depend on the Garbage Collector in Phobos.
* [Bug tracker](https://issues.caraus.io/projects/tanya)
* [Bug tracker](https://issues.caraus.io/projects/tanya/issues)
* [Documentation](https://docs.caraus.io/tanya)
## Overview
@ -21,51 +23,46 @@ data structures and utilities that depend on the Garbage Collector in Phobos.
Tanya consists of the following packages:
* `async`: Event loop (epoll, kqueue and IOCP).
* `container`: Queue, Vector, Singly linked list, buffers.
* `container`: Queue, Array, Singly and doubly linked lists, Buffers, UTF-8
string, Hash set.
* `math`: Arbitrary precision integer and a set of functions.
* `memory`: Tools for manual memory management (allocator, reference counting,
helper functions).
* `network`: URL-Parsing, sockets.
* `memory`: Tools for manual memory management (allocators, smart pointers).
* `network`: URL-Parsing, sockets, utilities.
### Supported compilers
| dmd |
|:-------:|
| 2.074.0 |
| 2.074.1 |
| 2.073.2 |
| 2.072.2 |
| 2.071.2 |
### Current status
The library is currently under development, but the API is becoming gradually
stable.
Following modules are under development:
Following modules are coming soon:
| Feature | Build status |
|--------------|:-----------------------------------------------------------------------------------------------------------------------:|
| UTF-8 string | [![utf8string](https://travis-ci.org/caraus-ecms/tanya.svg?branch=utf8string)](https://travis-ci.org/caraus-ecms/tanya) |
| BitVector | [![bitvector](https://travis-ci.org/caraus-ecms/tanya.svg?branch=bitvector)](https://travis-ci.org/caraus-ecms/tanya) |
| Hash table | N/A |
`math` package contains an arbitrary precision integer implementation that
needs more test cases, better performance and some additional features
(constructing from a string and an ubyte array, and converting it back).
| Feature | Branch | Build status |
|----------|:---------:|-----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------|
| BitArray | bitvector | [![bitvector](https://travis-ci.org/caraus-ecms/tanya.svg?branch=bitvector)](https://travis-ci.org/caraus-ecms/tanya) [![bitvector](https://ci.appveyor.com/api/projects/status/djkmverdfsylc7ti/branch/bitvector?svg=true)](https://ci.appveyor.com/project/belka-ew/tanya/branch/bitvector) |
| TLS | crypto | [![crypto](https://travis-ci.org/caraus-ecms/tanya.svg?branch=crypto)](https://travis-ci.org/caraus-ecms/tanya) [![crypto](https://ci.appveyor.com/api/projects/status/djkmverdfsylc7ti/branch/crypto?svg=true)](https://ci.appveyor.com/project/belka-ew/tanya/branch/crypto) |
| File IO | io | [![io](https://travis-ci.org/caraus-ecms/tanya.svg?branch=io)](https://travis-ci.org/caraus-ecms/tanya) [![io](https://ci.appveyor.com/api/projects/status/djkmverdfsylc7ti/branch/io?svg=true)](https://ci.appveyor.com/project/belka-ew/tanya/branch/io) |
### Further characteristics
* Tanya is a native D library.
* Tanya is a native D library without any external dependencies.
* Tanya is cross-platform. The development happens on a 64-bit Linux, but it
is being tested on Windows and FreeBSD as well.
* The library isn't thread-safe. Thread-safity should be added later.
* The library isn't thread-safe yet.
## Release management
3-week release cycle.
Deprecated features are removed after one release (in approximately 6 weeks after deprecating).
## Contributing
Since I'm mostly busy writing new code and implementing new features I would
appreciate, if anyone uses the library. It would help me to improve the
codebase and fix issues.
Feel free to contact me if you have any questions.
Feel free to contact me if you have any questions: info@caraus.de.

52
appveyor.yml Normal file
View File

@ -0,0 +1,52 @@
platform: x64
os: Visual Studio 2017
environment:
matrix:
- DC: dmd
DVersion: 2.074.1
arch: x86
- DC: dmd
DVersion: 2.073.2
arch: x86
- DC: dmd
DVersion: 2.072.2
arch: x86
- DC: dmd
DVersion: 2.071.2
arch: x86
skip_tags: true
install:
- ps: function SetUpDCompiler
{
$env:toolchain = "msvc";
$version = $env:DVersion;
Invoke-WebRequest "http://downloads.dlang.org/releases/2.x/$($version)/dmd.$($version).windows.7z" -OutFile "c:\dmd.7z";
echo "finished.";
pushd c:\\;
7z x dmd.7z > $null;
popd;
}
- ps: SetUpDCompiler
- ps: if($env:DVersion -eq "2.071.2"){
Invoke-WebRequest "http://code.dlang.org/files/dub-1.2.1-windows-x86.zip" -OutFile "dub.zip";
7z x dub.zip -odub > $null;
Move-Item "dub/dub.exe" "C:\dmd2\windows\bin"
}
before_build:
- ps: $env:PATH += ";C:\dmd2\windows\bin;";
- call "C:\Program Files (x86)\Microsoft Visual Studio\2017\Community\Common7\Tools\VsDevCmd.bat" -arch=%arch%
build_script:
- echo dummy build script - dont remove me
test_script:
- echo %DC%
- echo %PATH%
- 'dub --version'
- '%DC% --version'
- dub test --arch=x86 --compiler=%DC%

View File

@ -1,6 +1,6 @@
{
"name": "tanya",
"description": "General purpose, @nogc library",
"description": "General purpose, @nogc library. Containers, networking, memory management, utilities",
"license": "MPL-2.0",
"copyright": "(c) Eugene Wissner <info@caraus.de>",
"authors": [

View File

@ -18,7 +18,7 @@ import tanya.async.event.selector;
import tanya.async.loop;
import tanya.async.transport;
import tanya.async.watcher;
import tanya.container.vector;
import tanya.container.array;
import tanya.memory;
import tanya.memory.mmappool;
import tanya.network.socket;
@ -29,153 +29,153 @@ import std.algorithm.comparison;
extern (C) nothrow @nogc
{
int epoll_create1(int flags);
int epoll_ctl (int epfd, int op, int fd, epoll_event *event);
int epoll_wait (int epfd, epoll_event *events, int maxevents, int timeout);
int epoll_create1(int flags);
int epoll_ctl (int epfd, int op, int fd, epoll_event *event);
int epoll_wait (int epfd, epoll_event *events, int maxevents, int timeout);
}
final class EpollLoop : SelectorLoop
{
protected int fd;
private Vector!epoll_event events;
protected int fd;
private Array!epoll_event events;
/**
* Initializes the loop.
*/
this() @nogc
{
if ((fd = epoll_create1(EPOLL_CLOEXEC)) < 0)
{
throw defaultAllocator.make!BadLoopException("epoll initialization failed");
}
super();
events = Vector!epoll_event(maxEvents, MmapPool.instance);
}
/**
* Initializes the loop.
*/
this() @nogc
{
if ((fd = epoll_create1(EPOLL_CLOEXEC)) < 0)
{
throw defaultAllocator.make!BadLoopException("epoll initialization failed");
}
super();
events = Array!epoll_event(maxEvents, MmapPool.instance);
}
/**
* Frees loop internals.
*/
~this() @nogc
{
close(fd);
}
/**
* Frees loop internals.
*/
~this() @nogc
{
close(fd);
}
/**
* Should be called if the backend configuration changes.
*
* Params:
* watcher = Watcher.
* oldEvents = The events were already set.
* events = The events should be set.
*
* Returns: $(D_KEYWORD true) if the operation was successful.
*/
protected override bool reify(SocketWatcher watcher,
EventMask oldEvents,
EventMask events) @nogc
{
int op = EPOLL_CTL_DEL;
epoll_event ev;
/**
* Should be called if the backend configuration changes.
*
* Params:
* watcher = Watcher.
* oldEvents = The events were already set.
* events = The events should be set.
*
* Returns: $(D_KEYWORD true) if the operation was successful.
*/
protected override bool reify(SocketWatcher watcher,
EventMask oldEvents,
EventMask events) @nogc
{
int op = EPOLL_CTL_DEL;
epoll_event ev;
if (events == oldEvents)
{
return true;
}
if (events && oldEvents)
{
op = EPOLL_CTL_MOD;
}
else if (events && !oldEvents)
{
op = EPOLL_CTL_ADD;
}
if (events == oldEvents)
{
return true;
}
if (events && oldEvents)
{
op = EPOLL_CTL_MOD;
}
else if (events && !oldEvents)
{
op = EPOLL_CTL_ADD;
}
ev.data.fd = watcher.socket.handle;
ev.events = (events & (Event.read | Event.accept) ? EPOLLIN | EPOLLPRI : 0)
| (events & Event.write ? EPOLLOUT : 0)
| EPOLLET;
ev.data.fd = watcher.socket.handle;
ev.events = (events & (Event.read | Event.accept) ? EPOLLIN | EPOLLPRI : 0)
| (events & Event.write ? EPOLLOUT : 0)
| EPOLLET;
return epoll_ctl(fd, op, watcher.socket.handle, &ev) == 0;
}
return epoll_ctl(fd, op, watcher.socket.handle, &ev) == 0;
}
/**
* Does the actual polling.
*/
protected override void poll() @nogc
{
// Don't block
immutable timeout = cast(immutable int) blockTime.total!"msecs";
auto eventCount = epoll_wait(fd, events.get().ptr, maxEvents, timeout);
/**
* Does the actual polling.
*/
protected override void poll() @nogc
{
// Don't block
immutable timeout = cast(immutable int) blockTime.total!"msecs";
auto eventCount = epoll_wait(fd, events.get().ptr, maxEvents, timeout);
if (eventCount < 0)
{
if (errno != EINTR)
{
throw defaultAllocator.make!BadLoopException();
}
return;
}
if (eventCount < 0)
{
if (errno != EINTR)
{
throw defaultAllocator.make!BadLoopException();
}
return;
}
for (auto i = 0; i < eventCount; ++i)
{
auto transport = cast(StreamTransport) connections[events[i].data.fd];
for (auto i = 0; i < eventCount; ++i)
{
auto transport = cast(StreamTransport) connections[events[i].data.fd];
if (transport is null)
{
auto connection = cast(ConnectionWatcher) connections[events[i].data.fd];
assert(connection !is null);
if (transport is null)
{
auto connection = cast(ConnectionWatcher) connections[events[i].data.fd];
assert(connection !is null);
acceptConnections(connection);
}
else if (events[i].events & EPOLLERR)
{
kill(transport);
continue;
}
else if (events[i].events & (EPOLLIN | EPOLLPRI | EPOLLHUP))
{
SocketException exception;
try
{
ptrdiff_t received;
do
{
received = transport.socket.receive(transport.output[]);
transport.output += received;
}
while (received);
}
catch (SocketException e)
{
exception = e;
}
if (transport.socket.disconnected)
{
kill(transport, exception);
continue;
}
else if (transport.output.length)
{
pendings.enqueue(transport);
}
}
if (events[i].events & EPOLLOUT)
{
transport.writeReady = true;
if (transport.input.length)
{
feed(transport);
}
}
}
}
acceptConnections(connection);
}
else if (events[i].events & EPOLLERR)
{
kill(transport);
continue;
}
else if (events[i].events & (EPOLLIN | EPOLLPRI | EPOLLHUP))
{
SocketException exception;
try
{
ptrdiff_t received;
do
{
received = transport.socket.receive(transport.output[]);
transport.output += received;
}
while (received);
}
catch (SocketException e)
{
exception = e;
}
if (transport.socket.disconnected)
{
kill(transport, exception);
continue;
}
else if (transport.output.length)
{
pendings.enqueue(transport);
}
}
if (events[i].events & EPOLLOUT)
{
transport.writeReady = true;
if (transport.input.length)
{
feed(transport);
}
}
}
}
/**
* Returns: The blocking time.
*/
override protected @property inout(Duration) blockTime()
inout @safe pure nothrow
{
return min(super.blockTime, 1.dur!"seconds");
}
/**
* Returns: The blocking time.
*/
override protected @property inout(Duration) blockTime()
inout @safe pure nothrow
{
return min(super.blockTime, 1.dur!"seconds");
}
}

View File

@ -31,354 +31,354 @@ import core.sys.windows.winsock2;
*/
final class StreamTransport : SocketWatcher, DuplexTransport, SocketTransport
{
private SocketException exception;
private SocketException exception;
private ReadBuffer!ubyte output;
private ReadBuffer!ubyte output;
private WriteBuffer!ubyte input;
private WriteBuffer!ubyte input;
private Protocol protocol_;
private Protocol protocol_;
private bool closing;
private bool closing;
/**
* Creates new completion port transport.
*
* Params:
* socket = Socket.
*
* Precondition: $(D_INLINECODE socket !is null)
*/
this(OverlappedConnectedSocket socket) @nogc
{
super(socket);
output = ReadBuffer!ubyte(8192, 1024, MmapPool.instance);
input = WriteBuffer!ubyte(8192, MmapPool.instance);
active = true;
}
/**
* Creates new completion port transport.
*
* Params:
* socket = Socket.
*
* Precondition: $(D_INLINECODE socket !is null)
*/
this(OverlappedConnectedSocket socket) @nogc
{
super(socket);
output = ReadBuffer!ubyte(8192, 1024, MmapPool.instance);
input = WriteBuffer!ubyte(8192, MmapPool.instance);
active = true;
}
/**
* Returns: Socket.
*
* Postcondition: $(D_INLINECODE socket !is null)
*/
override @property OverlappedConnectedSocket socket() pure nothrow @safe @nogc
out (socket)
{
assert(socket !is null);
}
body
{
return cast(OverlappedConnectedSocket) socket_;
}
/**
* Returns: Socket.
*
* Postcondition: $(D_INLINECODE socket !is null)
*/
override @property OverlappedConnectedSocket socket() pure nothrow @safe @nogc
out (socket)
{
assert(socket !is null);
}
body
{
return cast(OverlappedConnectedSocket) socket_;
}
/**
* Returns $(D_PARAM true) if the transport is closing or closed.
*/
bool isClosing() const pure nothrow @safe @nogc
{
return closing;
}
/**
* Returns $(D_PARAM true) if the transport is closing or closed.
*/
bool isClosing() const pure nothrow @safe @nogc
{
return closing;
}
/**
* Close the transport.
*
* Buffered data will be flushed. No more data will be received.
*/
void close() pure nothrow @safe @nogc
{
closing = true;
}
/**
* Close the transport.
*
* Buffered data will be flushed. No more data will be received.
*/
void close() pure nothrow @safe @nogc
{
closing = true;
}
/**
* Write some data to the transport.
*
* Params:
* data = Data to send.
*/
void write(ubyte[] data) @nogc
{
input ~= data;
}
/**
* Write some data to the transport.
*
* Params:
* data = Data to send.
*/
void write(ubyte[] data) @nogc
{
input ~= data;
}
/**
* Returns: Application protocol.
*/
@property Protocol protocol() pure nothrow @safe @nogc
{
return protocol_;
}
/**
* Returns: Application protocol.
*/
@property Protocol protocol() pure nothrow @safe @nogc
{
return protocol_;
}
/**
* Switches the protocol.
*
* The protocol is deallocated by the event loop, it should currently be
* allocated with $(D_PSYMBOL MmapPool).
*
* Params:
* protocol = Application protocol.
*
* Precondition: $(D_INLINECODE protocol !is null)
*/
@property void protocol(Protocol protocol) pure nothrow @safe @nogc
in
{
assert(protocol !is null);
}
body
{
protocol_ = protocol;
}
/**
* Switches the protocol.
*
* The protocol is deallocated by the event loop, it should currently be
* allocated with $(D_PSYMBOL MmapPool).
*
* Params:
* protocol = Application protocol.
*
* Precondition: $(D_INLINECODE protocol !is null)
*/
@property void protocol(Protocol protocol) pure nothrow @safe @nogc
in
{
assert(protocol !is null);
}
body
{
protocol_ = protocol;
}
/**
* Invokes the watcher callback.
*/
override void invoke() @nogc
{
if (output.length)
{
immutable empty = input.length == 0;
protocol.received(output[0 .. $]);
output.clear();
if (empty)
{
SocketState overlapped;
try
{
overlapped = MmapPool.instance.make!SocketState;
socket.beginSend(input[], overlapped);
}
catch (SocketException e)
{
MmapPool.instance.dispose(overlapped);
MmapPool.instance.dispose(e);
}
}
}
else
{
protocol.disconnected(exception);
MmapPool.instance.dispose(protocol_);
defaultAllocator.dispose(exception);
active = false;
}
}
/**
* Invokes the watcher callback.
*/
override void invoke() @nogc
{
if (output.length)
{
immutable empty = input.length == 0;
protocol.received(output[0 .. $]);
output.clear();
if (empty)
{
SocketState overlapped;
try
{
overlapped = MmapPool.instance.make!SocketState;
socket.beginSend(input[], overlapped);
}
catch (SocketException e)
{
MmapPool.instance.dispose(overlapped);
MmapPool.instance.dispose(e);
}
}
}
else
{
protocol.disconnected(exception);
MmapPool.instance.dispose(protocol_);
defaultAllocator.dispose(exception);
active = false;
}
}
}
final class IOCPLoop : Loop
{
protected HANDLE completionPort;
protected HANDLE completionPort;
protected OVERLAPPED overlap;
protected OVERLAPPED overlap;
/**
* Initializes the loop.
*/
this() @nogc
{
super();
/**
* Initializes the loop.
*/
this() @nogc
{
super();
completionPort = CreateIoCompletionPort(INVALID_HANDLE_VALUE, NULL, 0, 0);
if (!completionPort)
{
throw make!BadLoopException(defaultAllocator,
"Creating completion port failed");
}
}
completionPort = CreateIoCompletionPort(INVALID_HANDLE_VALUE, NULL, 0, 0);
if (!completionPort)
{
throw make!BadLoopException(defaultAllocator,
"Creating completion port failed");
}
}
/**
* Should be called if the backend configuration changes.
*
* Params:
* watcher = Watcher.
* oldEvents = The events were already set.
* events = The events should be set.
*
* Returns: $(D_KEYWORD true) if the operation was successful.
*/
override protected bool reify(SocketWatcher watcher,
EventMask oldEvents,
EventMask events) @nogc
{
SocketState overlapped;
if (!(oldEvents & Event.accept) && (events & Event.accept))
{
auto socket = cast(OverlappedStreamSocket) watcher.socket;
assert(socket !is null);
/**
* Should be called if the backend configuration changes.
*
* Params:
* watcher = Watcher.
* oldEvents = The events were already set.
* events = The events should be set.
*
* Returns: $(D_KEYWORD true) if the operation was successful.
*/
override protected bool reify(SocketWatcher watcher,
EventMask oldEvents,
EventMask events) @nogc
{
SocketState overlapped;
if (!(oldEvents & Event.accept) && (events & Event.accept))
{
auto socket = cast(OverlappedStreamSocket) watcher.socket;
assert(socket !is null);
if (CreateIoCompletionPort(cast(HANDLE) socket.handle,
completionPort,
cast(ULONG_PTR) (cast(void*) watcher),
0) !is completionPort)
{
return false;
}
if (CreateIoCompletionPort(cast(HANDLE) socket.handle,
completionPort,
cast(ULONG_PTR) (cast(void*) watcher),
0) !is completionPort)
{
return false;
}
try
{
overlapped = MmapPool.instance.make!SocketState;
socket.beginAccept(overlapped);
}
catch (SocketException e)
{
MmapPool.instance.dispose(overlapped);
defaultAllocator.dispose(e);
return false;
}
}
if (!(oldEvents & Event.read) && (events & Event.read)
|| !(oldEvents & Event.write) && (events & Event.write))
{
auto transport = cast(StreamTransport) watcher;
assert(transport !is null);
try
{
overlapped = MmapPool.instance.make!SocketState;
socket.beginAccept(overlapped);
}
catch (SocketException e)
{
MmapPool.instance.dispose(overlapped);
defaultAllocator.dispose(e);
return false;
}
}
if (!(oldEvents & Event.read) && (events & Event.read)
|| !(oldEvents & Event.write) && (events & Event.write))
{
auto transport = cast(StreamTransport) watcher;
assert(transport !is null);
if (CreateIoCompletionPort(cast(HANDLE) transport.socket.handle,
completionPort,
cast(ULONG_PTR) (cast(void*) watcher),
0) !is completionPort)
{
return false;
}
if (CreateIoCompletionPort(cast(HANDLE) transport.socket.handle,
completionPort,
cast(ULONG_PTR) (cast(void*) watcher),
0) !is completionPort)
{
return false;
}
// Begin to read
if (!(oldEvents & Event.read) && (events & Event.read))
{
try
{
overlapped = MmapPool.instance.make!SocketState;
transport.socket.beginReceive(transport.output[], overlapped);
}
catch (SocketException e)
{
MmapPool.instance.dispose(overlapped);
defaultAllocator.dispose(e);
return false;
}
}
}
return true;
}
// Begin to read
if (!(oldEvents & Event.read) && (events & Event.read))
{
try
{
overlapped = MmapPool.instance.make!SocketState;
transport.socket.beginReceive(transport.output[], overlapped);
}
catch (SocketException e)
{
MmapPool.instance.dispose(overlapped);
defaultAllocator.dispose(e);
return false;
}
}
}
return true;
}
private void kill(StreamTransport transport,
SocketException exception = null) @nogc
in
{
assert(transport !is null);
}
body
{
transport.socket.shutdown();
defaultAllocator.dispose(transport.socket);
transport.exception = exception;
pendings.enqueue(transport);
}
private void kill(StreamTransport transport,
SocketException exception = null) @nogc
in
{
assert(transport !is null);
}
body
{
transport.socket.shutdown();
defaultAllocator.dispose(transport.socket);
transport.exception = exception;
pendings.enqueue(transport);
}
/**
* Does the actual polling.
*/
override protected void poll() @nogc
{
DWORD lpNumberOfBytes;
ULONG_PTR key;
LPOVERLAPPED overlap;
immutable timeout = cast(immutable int) blockTime.total!"msecs";
/**
* Does the actual polling.
*/
override protected void poll() @nogc
{
DWORD lpNumberOfBytes;
ULONG_PTR key;
LPOVERLAPPED overlap;
immutable timeout = cast(immutable int) blockTime.total!"msecs";
auto result = GetQueuedCompletionStatus(completionPort,
&lpNumberOfBytes,
&key,
&overlap,
timeout);
if (result == FALSE && overlap == NULL)
{
return; // Timeout
}
auto result = GetQueuedCompletionStatus(completionPort,
&lpNumberOfBytes,
&key,
&overlap,
timeout);
if (result == FALSE && overlap == NULL)
{
return; // Timeout
}
auto overlapped = (cast(SocketState) ((cast(void*) overlap) - 8));
assert(overlapped !is null);
scope (failure)
{
MmapPool.instance.dispose(overlapped);
}
auto overlapped = (cast(SocketState) ((cast(void*) overlap) - 8));
assert(overlapped !is null);
scope (failure)
{
MmapPool.instance.dispose(overlapped);
}
switch (overlapped.event)
{
case OverlappedSocketEvent.accept:
auto connection = cast(ConnectionWatcher) (cast(void*) key);
assert(connection !is null);
switch (overlapped.event)
{
case OverlappedSocketEvent.accept:
auto connection = cast(ConnectionWatcher) (cast(void*) key);
assert(connection !is null);
auto listener = cast(OverlappedStreamSocket) connection.socket;
assert(listener !is null);
auto listener = cast(OverlappedStreamSocket) connection.socket;
assert(listener !is null);
auto socket = listener.endAccept(overlapped);
auto transport = MmapPool.instance.make!StreamTransport(socket);
auto socket = listener.endAccept(overlapped);
auto transport = MmapPool.instance.make!StreamTransport(socket);
connection.incoming.enqueue(transport);
connection.incoming.enqueue(transport);
reify(transport, EventMask(Event.none), EventMask(Event.read, Event.write));
reify(transport, EventMask(Event.none), EventMask(Event.read, Event.write));
pendings.enqueue(connection);
listener.beginAccept(overlapped);
break;
case OverlappedSocketEvent.read:
auto transport = cast(StreamTransport) (cast(void*) key);
assert(transport !is null);
pendings.enqueue(connection);
listener.beginAccept(overlapped);
break;
case OverlappedSocketEvent.read:
auto transport = cast(StreamTransport) (cast(void*) key);
assert(transport !is null);
if (!transport.active)
{
MmapPool.instance.dispose(transport);
MmapPool.instance.dispose(overlapped);
return;
}
if (!transport.active)
{
MmapPool.instance.dispose(transport);
MmapPool.instance.dispose(overlapped);
return;
}
int received;
SocketException exception;
try
{
received = transport.socket.endReceive(overlapped);
}
catch (SocketException e)
{
exception = e;
}
if (transport.socket.disconnected)
{
// We want to get one last notification to destroy the watcher.
transport.socket.beginReceive(transport.output[], overlapped);
kill(transport, exception);
}
else if (received > 0)
{
immutable full = transport.output.free == received;
int received;
SocketException exception;
try
{
received = transport.socket.endReceive(overlapped);
}
catch (SocketException e)
{
exception = e;
}
if (transport.socket.disconnected)
{
// We want to get one last notification to destroy the watcher.
transport.socket.beginReceive(transport.output[], overlapped);
kill(transport, exception);
}
else if (received > 0)
{
immutable full = transport.output.free == received;
transport.output += received;
// Receive was interrupted because the buffer is full. We have to continue.
if (full)
{
transport.socket.beginReceive(transport.output[], overlapped);
}
pendings.enqueue(transport);
}
break;
case OverlappedSocketEvent.write:
auto transport = cast(StreamTransport) (cast(void*) key);
assert(transport !is null);
transport.output += received;
// Receive was interrupted because the buffer is full. We have to continue.
if (full)
{
transport.socket.beginReceive(transport.output[], overlapped);
}
pendings.enqueue(transport);
}
break;
case OverlappedSocketEvent.write:
auto transport = cast(StreamTransport) (cast(void*) key);
assert(transport !is null);
transport.input += transport.socket.endSend(overlapped);
if (transport.input.length > 0)
{
transport.socket.beginSend(transport.input[], overlapped);
}
else
{
transport.socket.beginReceive(transport.output[], overlapped);
if (transport.isClosing())
{
kill(transport);
}
}
break;
default:
assert(false, "Unknown event");
}
}
}
transport.input += transport.socket.endSend(overlapped);
if (transport.input.length > 0)
{
transport.socket.beginSend(transport.input[], overlapped);
}
else
{
transport.socket.beginReceive(transport.output[], overlapped);
if (transport.isClosing())
{
kill(transport);
}
}
break;
default:
assert(false, "Unknown event");
}
}
}

View File

@ -12,31 +12,31 @@ module tanya.async.event.kqueue;
version (OSX)
{
version = MacBSD;
version = MacBSD;
}
else version (iOS)
{
version = MacBSD;
version = MacBSD;
}
else version (TVOS)
{
version = MacBSD;
version = MacBSD;
}
else version (WatchOS)
{
version = MacBSD;
version = MacBSD;
}
else version (FreeBSD)
{
version = MacBSD;
version = MacBSD;
}
else version (OpenBSD)
{
version = MacBSD;
version = MacBSD;
}
else version (DragonFlyBSD)
{
version = MacBSD;
version = MacBSD;
}
version (MacBSD):
@ -50,62 +50,62 @@ import tanya.async.event.selector;
import tanya.async.loop;
import tanya.async.transport;
import tanya.async.watcher;
import tanya.container.vector;
import tanya.container.array;
import tanya.memory;
import tanya.memory.mmappool;
import tanya.network.socket;
void EV_SET(kevent_t* kevp, typeof(kevent_t.tupleof) args) pure nothrow @nogc
{
*kevp = kevent_t(args);
*kevp = kevent_t(args);
}
enum : short
{
EVFILT_READ = -1,
EVFILT_WRITE = -2,
EVFILT_AIO = -3, /* attached to aio requests */
EVFILT_VNODE = -4, /* attached to vnodes */
EVFILT_PROC = -5, /* attached to struct proc */
EVFILT_SIGNAL = -6, /* attached to struct proc */
EVFILT_TIMER = -7, /* timers */
EVFILT_MACHPORT = -8, /* Mach portsets */
EVFILT_FS = -9, /* filesystem events */
EVFILT_USER = -10, /* User events */
EVFILT_VM = -12, /* virtual memory events */
EVFILT_SYSCOUNT = 11
EVFILT_READ = -1,
EVFILT_WRITE = -2,
EVFILT_AIO = -3, /* attached to aio requests */
EVFILT_VNODE = -4, /* attached to vnodes */
EVFILT_PROC = -5, /* attached to struct proc */
EVFILT_SIGNAL = -6, /* attached to struct proc */
EVFILT_TIMER = -7, /* timers */
EVFILT_MACHPORT = -8, /* Mach portsets */
EVFILT_FS = -9, /* filesystem events */
EVFILT_USER = -10, /* User events */
EVFILT_VM = -12, /* virtual memory events */
EVFILT_SYSCOUNT = 11
}
struct kevent_t
{
uintptr_t ident; /* identifier for this event */
short filter; /* filter for event */
ushort flags;
uint fflags;
intptr_t data;
void *udata; /* opaque user data identifier */
uintptr_t ident; /* identifier for this event */
short filter; /* filter for event */
ushort flags;
uint fflags;
intptr_t data;
void *udata; /* opaque user data identifier */
}
enum
{
/* actions */
EV_ADD = 0x0001, /* add event to kq (implies enable) */
EV_DELETE = 0x0002, /* delete event from kq */
EV_ENABLE = 0x0004, /* enable event */
EV_DISABLE = 0x0008, /* disable event (not reported) */
/* actions */
EV_ADD = 0x0001, /* add event to kq (implies enable) */
EV_DELETE = 0x0002, /* delete event from kq */
EV_ENABLE = 0x0004, /* enable event */
EV_DISABLE = 0x0008, /* disable event (not reported) */
/* flags */
EV_ONESHOT = 0x0010, /* only report one occurrence */
EV_CLEAR = 0x0020, /* clear event state after reporting */
EV_RECEIPT = 0x0040, /* force EV_ERROR on success, data=0 */
EV_DISPATCH = 0x0080, /* disable event after reporting */
/* flags */
EV_ONESHOT = 0x0010, /* only report one occurrence */
EV_CLEAR = 0x0020, /* clear event state after reporting */
EV_RECEIPT = 0x0040, /* force EV_ERROR on success, data=0 */
EV_DISPATCH = 0x0080, /* disable event after reporting */
EV_SYSFLAGS = 0xF000, /* reserved by system */
EV_FLAG1 = 0x2000, /* filter-specific flag */
EV_SYSFLAGS = 0xF000, /* reserved by system */
EV_FLAG1 = 0x2000, /* filter-specific flag */
/* returned values */
EV_EOF = 0x8000, /* EOF detected */
EV_ERROR = 0x4000, /* error, data contains errno */
/* returned values */
EV_EOF = 0x8000, /* EOF detected */
EV_ERROR = 0x4000, /* error, data contains errno */
}
extern(C) int kqueue() nothrow @nogc;
@ -115,211 +115,211 @@ extern(C) int kevent(int kq, const kevent_t *changelist, int nchanges,
final class KqueueLoop : SelectorLoop
{
protected int fd;
private Vector!kevent_t events;
private Vector!kevent_t changes;
private size_t changeCount;
protected int fd;
private Array!kevent_t events;
private Array!kevent_t changes;
private size_t changeCount;
/**
* Returns: Maximal event count can be got at a time
* (should be supported by the backend).
*/
override protected @property uint maxEvents()
const pure nothrow @safe @nogc
{
return cast(uint) events.length;
}
/**
* Returns: Maximal event count can be got at a time
* (should be supported by the backend).
*/
override protected @property uint maxEvents()
const pure nothrow @safe @nogc
{
return cast(uint) events.length;
}
this() @nogc
{
super();
this() @nogc
{
super();
if ((fd = kqueue()) == -1)
{
throw make!BadLoopException(defaultAllocator,
"kqueue initialization failed");
}
events = Vector!kevent_t(64, MmapPool.instance);
changes = Vector!kevent_t(64, MmapPool.instance);
}
if ((fd = kqueue()) == -1)
{
throw make!BadLoopException(defaultAllocator,
"kqueue initialization failed");
}
events = Array!kevent_t(64, MmapPool.instance);
changes = Array!kevent_t(64, MmapPool.instance);
}
/**
* Frees loop internals.
*/
~this() @nogc
{
close(fd);
}
/**
* Frees loop internals.
*/
~this() @nogc
{
close(fd);
}
private void set(socket_t socket, short filter, ushort flags) @nogc
{
if (changes.length <= changeCount)
{
changes.length = changeCount + maxEvents;
}
EV_SET(&changes[changeCount],
cast(ulong) socket,
filter,
flags,
0U,
0L,
null);
++changeCount;
}
private void set(SocketType socket, short filter, ushort flags) @nogc
{
if (changes.length <= changeCount)
{
changes.length = changeCount + maxEvents;
}
EV_SET(&changes[changeCount],
cast(ulong) socket,
filter,
flags,
0U,
0L,
null);
++changeCount;
}
/**
* Should be called if the backend configuration changes.
*
* Params:
* watcher = Watcher.
* oldEvents = The events were already set.
* events = The events should be set.
*
* Returns: $(D_KEYWORD true) if the operation was successful.
*/
override protected bool reify(SocketWatcher watcher,
EventMask oldEvents,
EventMask events) @nogc
{
if (events != oldEvents)
{
if (oldEvents & Event.read || oldEvents & Event.accept)
{
set(watcher.socket.handle, EVFILT_READ, EV_DELETE);
}
if (oldEvents & Event.write)
{
set(watcher.socket.handle, EVFILT_WRITE, EV_DELETE);
}
}
if (events & (Event.read | events & Event.accept))
{
set(watcher.socket.handle, EVFILT_READ, EV_ADD | EV_ENABLE);
}
if (events & Event.write)
{
set(watcher.socket.handle, EVFILT_WRITE, EV_ADD | EV_DISPATCH);
}
return true;
}
/**
* Should be called if the backend configuration changes.
*
* Params:
* watcher = Watcher.
* oldEvents = The events were already set.
* events = The events should be set.
*
* Returns: $(D_KEYWORD true) if the operation was successful.
*/
override protected bool reify(SocketWatcher watcher,
EventMask oldEvents,
EventMask events) @nogc
{
if (events != oldEvents)
{
if (oldEvents & Event.read || oldEvents & Event.accept)
{
set(watcher.socket.handle, EVFILT_READ, EV_DELETE);
}
if (oldEvents & Event.write)
{
set(watcher.socket.handle, EVFILT_WRITE, EV_DELETE);
}
}
if (events & (Event.read | events & Event.accept))
{
set(watcher.socket.handle, EVFILT_READ, EV_ADD | EV_ENABLE);
}
if (events & Event.write)
{
set(watcher.socket.handle, EVFILT_WRITE, EV_ADD | EV_DISPATCH);
}
return true;
}
/**
* Does the actual polling.
*/
protected override void poll() @nogc
{
timespec ts;
blockTime.split!("seconds", "nsecs")(ts.tv_sec, ts.tv_nsec);
/**
* Does the actual polling.
*/
protected override void poll() @nogc
{
timespec ts;
blockTime.split!("seconds", "nsecs")(ts.tv_sec, ts.tv_nsec);
if (changeCount > maxEvents)
{
events.length = changes.length;
}
if (changeCount > maxEvents)
{
events.length = changes.length;
}
auto eventCount = kevent(fd,
changes.get().ptr,
cast(int) changeCount,
events.get().ptr,
maxEvents,
&ts);
changeCount = 0;
auto eventCount = kevent(fd,
changes.get().ptr,
cast(int) changeCount,
events.get().ptr,
maxEvents,
&ts);
changeCount = 0;
if (eventCount < 0)
{
if (errno != EINTR)
{
throw defaultAllocator.make!BadLoopException();
}
return;
}
if (eventCount < 0)
{
if (errno != EINTR)
{
throw defaultAllocator.make!BadLoopException();
}
return;
}
for (int i; i < eventCount; ++i)
{
assert(connections.length > events[i].ident);
for (int i; i < eventCount; ++i)
{
assert(connections.length > events[i].ident);
auto transport = cast(StreamTransport) connections[events[i].ident];
// If it is a ConnectionWatcher. Accept connections.
if (transport is null)
{
auto connection = cast(ConnectionWatcher) connections[events[i].ident];
assert(connection !is null);
auto transport = cast(StreamTransport) connections[events[i].ident];
// If it is a ConnectionWatcher. Accept connections.
if (transport is null)
{
auto connection = cast(ConnectionWatcher) connections[events[i].ident];
assert(connection !is null);
acceptConnections(connection);
}
else if (events[i].flags & EV_ERROR)
{
kill(transport);
}
else if (events[i].filter == EVFILT_READ)
{
SocketException exception;
try
{
ptrdiff_t received;
do
{
received = transport.socket.receive(transport.output[]);
transport.output += received;
}
while (received);
}
catch (SocketException e)
{
exception = e;
}
if (transport.socket.disconnected)
{
kill(transport, exception);
}
else if (transport.output.length)
{
pendings.enqueue(transport);
}
}
else if (events[i].filter == EVFILT_WRITE)
{
transport.writeReady = true;
if (transport.input.length)
{
feed(transport);
}
}
}
}
acceptConnections(connection);
}
else if (events[i].flags & EV_ERROR)
{
kill(transport);
}
else if (events[i].filter == EVFILT_READ)
{
SocketException exception;
try
{
ptrdiff_t received;
do
{
received = transport.socket.receive(transport.output[]);
transport.output += received;
}
while (received);
}
catch (SocketException e)
{
exception = e;
}
if (transport.socket.disconnected)
{
kill(transport, exception);
}
else if (transport.output.length)
{
pendings.enqueue(transport);
}
}
else if (events[i].filter == EVFILT_WRITE)
{
transport.writeReady = true;
if (transport.input.length)
{
feed(transport);
}
}
}
}
/**
* Returns: The blocking time.
*/
override protected @property inout(Duration) blockTime()
inout @nogc @safe pure nothrow
{
return min(super.blockTime, 1.dur!"seconds");
}
/**
* Returns: The blocking time.
*/
override protected @property inout(Duration) blockTime()
inout @nogc @safe pure nothrow
{
return min(super.blockTime, 1.dur!"seconds");
}
/**
* If the transport couldn't send the data, the further sending should
* be handled by the event loop.
*
* Params:
* transport = Transport.
* exception = Exception thrown on sending.
*
* Returns: $(D_KEYWORD true) if the operation could be successfully
* completed or scheduled, $(D_KEYWORD false) otherwise (the
* transport will be destroyed then).
*/
protected override bool feed(StreamTransport transport,
SocketException exception = null) @nogc
{
if (!super.feed(transport, exception))
{
return false;
}
if (!transport.writeReady)
{
set(transport.socket.handle, EVFILT_WRITE, EV_DISPATCH);
return true;
}
return false;
}
/**
* If the transport couldn't send the data, the further sending should
* be handled by the event loop.
*
* Params:
* transport = Transport.
* exception = Exception thrown on sending.
*
* Returns: $(D_KEYWORD true) if the operation could be successfully
* completed or scheduled, $(D_KEYWORD false) otherwise (the
* transport will be destroyed then).
*/
protected override bool feed(StreamTransport transport,
SocketException exception = null) @nogc
{
if (!super.feed(transport, exception))
{
return false;
}
if (!transport.writeReady)
{
set(transport.socket.handle, EVFILT_WRITE, EV_DISPATCH);
return true;
}
return false;
}
}

View File

@ -17,7 +17,7 @@ import tanya.async.protocol;
import tanya.async.transport;
import tanya.async.watcher;
import tanya.container.buffer;
import tanya.container.vector;
import tanya.container.array;
import tanya.memory;
import tanya.memory.mmappool;
import tanya.network.socket;
@ -27,371 +27,374 @@ import tanya.network.socket;
*/
package class StreamTransport : SocketWatcher, DuplexTransport, SocketTransport
{
private SelectorLoop loop;
private SelectorLoop loop;
private SocketException exception;
private SocketException exception;
package ReadBuffer!ubyte output;
package ReadBuffer!ubyte output;
package WriteBuffer!ubyte input;
package WriteBuffer!ubyte input;
private Protocol protocol_;
private Protocol protocol_;
private bool closing;
private bool closing;
/// Received notification that the underlying socket is write-ready.
package bool writeReady;
/// Received notification that the underlying socket is write-ready.
package bool writeReady;
/**
* Params:
* loop = Event loop.
* socket = Socket.
*
* Precondition: $(D_INLINECODE loop !is null && socket !is null)
*/
this(SelectorLoop loop, ConnectedSocket socket) @nogc
in
{
assert(loop !is null);
}
body
{
super(socket);
this.loop = loop;
output = ReadBuffer!ubyte(8192, 1024, MmapPool.instance);
input = WriteBuffer!ubyte(8192, MmapPool.instance);
active = true;
}
/**
* Params:
* loop = Event loop.
* socket = Socket.
*
* Precondition: $(D_INLINECODE loop !is null && socket !is null)
*/
this(SelectorLoop loop, ConnectedSocket socket) @nogc
in
{
assert(loop !is null);
}
body
{
super(socket);
this.loop = loop;
output = ReadBuffer!ubyte(8192, 1024, MmapPool.instance);
input = WriteBuffer!ubyte(8192, MmapPool.instance);
active = true;
}
/**
* Returns: Socket.
*
* Postcondition: $(D_INLINECODE socket !is null)
*/
override @property ConnectedSocket socket() pure nothrow @safe @nogc
out (socket)
{
assert(socket !is null);
}
body
{
return cast(ConnectedSocket) socket_;
}
/**
* Returns: Socket.
*
* Postcondition: $(D_INLINECODE socket !is null)
*/
override @property ConnectedSocket socket() pure nothrow @safe @nogc
out (socket)
{
assert(socket !is null);
}
body
{
return cast(ConnectedSocket) socket_;
}
private @property void socket(ConnectedSocket socket) pure nothrow @safe @nogc
in
{
assert(socket !is null);
}
body
{
socket_ = socket;
}
private @property void socket(ConnectedSocket socket)
pure nothrow @safe @nogc
in
{
assert(socket !is null);
}
body
{
socket_ = socket;
}
/**
* Returns: Application protocol.
*/
@property Protocol protocol() pure nothrow @safe @nogc
{
return protocol_;
}
/**
* Returns: Application protocol.
*/
@property Protocol protocol() pure nothrow @safe @nogc
{
return protocol_;
}
/**
* Switches the protocol.
*
* The protocol is deallocated by the event loop, it should currently be
* allocated with $(D_PSYMBOL MmapPool).
*
* Params:
* protocol = Application protocol.
*
* Precondition: $(D_INLINECODE protocol !is null)
*/
@property void protocol(Protocol protocol) pure nothrow @safe @nogc
in
{
assert(protocol !is null);
}
body
{
protocol_ = protocol;
}
/**
* Switches the protocol.
*
* The protocol is deallocated by the event loop, it should currently be
* allocated with $(D_PSYMBOL MmapPool).
*
* Params:
* protocol = Application protocol.
*
* Precondition: $(D_INLINECODE protocol !is null)
*/
@property void protocol(Protocol protocol) pure nothrow @safe @nogc
in
{
assert(protocol !is null);
}
body
{
protocol_ = protocol;
}
/**
* Returns $(D_PARAM true) if the transport is closing or closed.
*/
bool isClosing() const pure nothrow @safe @nogc
{
return closing;
}
/**
* Returns $(D_PARAM true) if the transport is closing or closed.
*/
bool isClosing() const pure nothrow @safe @nogc
{
return closing;
}
/**
* Close the transport.
*
* Buffered data will be flushed. No more data will be received.
*/
void close() @nogc
{
closing = true;
loop.reify(this, EventMask(Event.read, Event.write), EventMask(Event.write));
}
/**
* Close the transport.
*
* Buffered data will be flushed. No more data will be received.
*/
void close() @nogc
{
closing = true;
loop.reify(this,
EventMask(Event.read, Event.write),
EventMask(Event.write));
}
/**
* Invokes the watcher callback.
*/
override void invoke() @nogc
{
if (output.length)
{
protocol.received(output[0 .. $]);
output.clear();
if (isClosing() && input.length == 0)
{
loop.kill(this);
}
}
else
{
protocol.disconnected(exception);
MmapPool.instance.dispose(protocol_);
defaultAllocator.dispose(exception);
active = false;
}
}
/**
* Invokes the watcher callback.
*/
override void invoke() @nogc
{
if (output.length)
{
protocol.received(output[0 .. $]);
output.clear();
if (isClosing() && input.length == 0)
{
loop.kill(this);
}
}
else
{
protocol.disconnected(exception);
MmapPool.instance.dispose(protocol_);
defaultAllocator.dispose(exception);
active = false;
}
}
/**
* Write some data to the transport.
*
* Params:
* data = Data to send.
*/
void write(ubyte[] data) @nogc
{
if (!data.length)
{
return;
}
// Try to write if the socket is write ready.
if (writeReady)
{
ptrdiff_t sent;
SocketException exception;
try
{
sent = socket.send(data);
if (sent == 0)
{
writeReady = false;
}
}
catch (SocketException e)
{
writeReady = false;
exception = e;
}
if (sent < data.length)
{
input ~= data[sent..$];
loop.feed(this, exception);
}
}
else
{
input ~= data;
}
}
/**
* Write some data to the transport.
*
* Params:
* data = Data to send.
*/
void write(ubyte[] data) @nogc
{
if (!data.length)
{
return;
}
// Try to write if the socket is write ready.
if (writeReady)
{
ptrdiff_t sent;
SocketException exception;
try
{
sent = socket.send(data);
if (sent == 0)
{
writeReady = false;
}
}
catch (SocketException e)
{
writeReady = false;
exception = e;
}
if (sent < data.length)
{
input ~= data[sent..$];
loop.feed(this, exception);
}
}
else
{
input ~= data;
}
}
}
abstract class SelectorLoop : Loop
{
/// Pending connections.
protected Vector!SocketWatcher connections;
/// Pending connections.
protected Array!SocketWatcher connections;
this() @nogc
{
super();
connections = Vector!SocketWatcher(maxEvents, MmapPool.instance);
}
this() @nogc
{
super();
connections = Array!SocketWatcher(maxEvents, MmapPool.instance);
}
~this() @nogc
{
foreach (ref connection; connections)
{
// We want to free only the transports. ConnectionWatcher are created by the
// user and should be freed by himself.
if (cast(StreamTransport) connection !is null)
{
MmapPool.instance.dispose(connection);
}
}
}
~this() @nogc
{
foreach (ref connection; connections)
{
// We want to free only the transports. ConnectionWatcher are
// created by the user and should be freed by himself.
if (cast(StreamTransport) connection !is null)
{
MmapPool.instance.dispose(connection);
}
}
}
/**
* Should be called if the backend configuration changes.
*
* Params:
* watcher = Watcher.
* oldEvents = The events were already set.
* events = The events should be set.
*
* Returns: $(D_KEYWORD true) if the operation was successful.
*/
override abstract protected bool reify(SocketWatcher watcher,
EventMask oldEvents,
EventMask events) @nogc;
/**
* Should be called if the backend configuration changes.
*
* Params:
* watcher = Watcher.
* oldEvents = The events were already set.
* events = The events should be set.
*
* Returns: $(D_KEYWORD true) if the operation was successful.
*/
override abstract protected bool reify(SocketWatcher watcher,
EventMask oldEvents,
EventMask events) @nogc;
/**
* Kills the watcher and closes the connection.
*
* Params:
* transport = Transport.
* exception = Occurred exception.
*/
protected void kill(StreamTransport transport,
SocketException exception = null) @nogc
in
{
assert(transport !is null);
}
body
{
transport.socket.shutdown();
defaultAllocator.dispose(transport.socket);
transport.exception = exception;
pendings.enqueue(transport);
}
/**
* Kills the watcher and closes the connection.
*
* Params:
* transport = Transport.
* exception = Occurred exception.
*/
protected void kill(StreamTransport transport,
SocketException exception = null) @nogc
in
{
assert(transport !is null);
}
body
{
transport.socket.shutdown();
defaultAllocator.dispose(transport.socket);
transport.exception = exception;
pendings.enqueue(transport);
}
/**
* If the transport couldn't send the data, the further sending should
* be handled by the event loop.
*
* Params:
* transport = Transport.
* exception = Exception thrown on sending.
*
* Returns: $(D_KEYWORD true) if the operation could be successfully
* completed or scheduled, $(D_KEYWORD false) otherwise (the
* transport will be destroyed then).
*/
protected bool feed(StreamTransport transport,
SocketException exception = null) @nogc
in
{
assert(transport !is null);
}
body
{
while (transport.input.length && transport.writeReady)
{
try
{
ptrdiff_t sent = transport.socket.send(transport.input[]);
if (sent == 0)
{
transport.writeReady = false;
}
else
{
transport.input += sent;
}
}
catch (SocketException e)
{
exception = e;
transport.writeReady = false;
}
}
if (exception !is null)
{
kill(transport, exception);
return false;
}
if (transport.input.length == 0 && transport.isClosing())
{
kill(transport);
}
return true;
}
/**
* If the transport couldn't send the data, the further sending should
* be handled by the event loop.
*
* Params:
* transport = Transport.
* exception = Exception thrown on sending.
*
* Returns: $(D_KEYWORD true) if the operation could be successfully
* completed or scheduled, $(D_KEYWORD false) otherwise (the
* transport will be destroyed then).
*/
protected bool feed(StreamTransport transport,
SocketException exception = null) @nogc
in
{
assert(transport !is null);
}
body
{
while (transport.input.length && transport.writeReady)
{
try
{
ptrdiff_t sent = transport.socket.send(transport.input[]);
if (sent == 0)
{
transport.writeReady = false;
}
else
{
transport.input += sent;
}
}
catch (SocketException e)
{
exception = e;
transport.writeReady = false;
}
}
if (exception !is null)
{
kill(transport, exception);
return false;
}
if (transport.input.length == 0 && transport.isClosing())
{
kill(transport);
}
return true;
}
/**
* Start watching.
*
* Params:
* watcher = Watcher.
*/
override void start(ConnectionWatcher watcher) @nogc
{
if (watcher.active)
{
return;
}
/**
* Start watching.
*
* Params:
* watcher = Watcher.
*/
override void start(ConnectionWatcher watcher) @nogc
{
if (watcher.active)
{
return;
}
if (connections.length <= watcher.socket)
{
connections.length = watcher.socket.handle + maxEvents / 2;
}
connections[watcher.socket.handle] = watcher;
if (connections.length <= watcher.socket)
{
connections.length = watcher.socket.handle + maxEvents / 2;
}
connections[watcher.socket.handle] = watcher;
super.start(watcher);
}
super.start(watcher);
}
/**
* Accept incoming connections.
*
* Params:
* connection = Connection watcher ready to accept.
*/
package void acceptConnections(ConnectionWatcher connection) @nogc
in
{
assert(connection !is null);
}
body
{
while (true)
{
ConnectedSocket client;
try
{
client = (cast(StreamSocket) connection.socket).accept();
}
catch (SocketException e)
{
defaultAllocator.dispose(e);
break;
}
if (client is null)
{
break;
}
/**
* Accept incoming connections.
*
* Params:
* connection = Connection watcher ready to accept.
*/
package void acceptConnections(ConnectionWatcher connection) @nogc
in
{
assert(connection !is null);
}
body
{
while (true)
{
ConnectedSocket client;
try
{
client = (cast(StreamSocket) connection.socket).accept();
}
catch (SocketException e)
{
defaultAllocator.dispose(e);
break;
}
if (client is null)
{
break;
}
StreamTransport transport;
StreamTransport transport;
if (connections.length > client.handle)
{
transport = cast(StreamTransport) connections[client.handle];
}
else
{
connections.length = client.handle + maxEvents / 2;
}
if (transport is null)
{
transport = MmapPool.instance.make!StreamTransport(this, client);
connections[client.handle] = transport;
}
else
{
transport.socket = client;
}
if (connections.length > client.handle)
{
transport = cast(StreamTransport) connections[client.handle];
}
else
{
connections.length = client.handle + maxEvents / 2;
}
if (transport is null)
{
transport = MmapPool.instance.make!StreamTransport(this, client);
connections[client.handle] = transport;
}
else
{
transport.socket = client;
}
reify(transport, EventMask(Event.none), EventMask(Event.read, Event.write));
connection.incoming.enqueue(transport);
}
reify(transport, EventMask(Event.none), EventMask(Event.read, Event.write));
connection.incoming.enqueue(transport);
}
if (!connection.incoming.empty)
{
pendings.enqueue(connection);
}
}
if (!connection.incoming.empty)
{
pendings.enqueue(connection);
}
}
}

View File

@ -21,12 +21,12 @@ import core.sys.windows.windef;
*/
class State
{
/// For internal use by Windows API.
align(1) OVERLAPPED overlapped;
/// For internal use by Windows API.
align(1) OVERLAPPED overlapped;
/// File/socket handle.
HANDLE handle;
/// File/socket handle.
HANDLE handle;
/// For keeping events or event masks.
int event;
/// For keeping events or event masks.
int event;
}

View File

@ -15,50 +15,50 @@
*
* class EchoProtocol : TransmissionControlProtocol
* {
* private DuplexTransport transport;
* private DuplexTransport transport;
*
* void received(in ubyte[] data) @nogc
* {
* transport.write(data);
* }
* void received(in ubyte[] data) @nogc
* {
* transport.write(data);
* }
*
* void connected(DuplexTransport transport) @nogc
* {
* this.transport = transport;
* }
* void connected(DuplexTransport transport) @nogc
* {
* this.transport = transport;
* }
*
* void disconnected(SocketException e) @nogc
* {
* }
* void disconnected(SocketException e) @nogc
* {
* }
* }
*
* void main()
* {
* auto address = defaultAllocator.make!InternetAddress("127.0.0.1", cast(ushort) 8192);
* auto address = defaultAllocator.make!InternetAddress("127.0.0.1", cast(ushort) 8192);
*
* version (Windows)
* {
* auto sock = defaultAllocator.make!OverlappedStreamSocket(AddressFamily.INET);
* }
* else
* {
* auto sock = defaultAllocator.make!StreamSocket(AddressFamily.INET);
* sock.blocking = false;
* }
* version (Windows)
* {
* auto sock = defaultAllocator.make!OverlappedStreamSocket(AddressFamily.inet);
* }
* else
* {
* auto sock = defaultAllocator.make!StreamSocket(AddressFamily.inet);
* sock.blocking = false;
* }
*
* sock.bind(address);
* sock.listen(5);
* sock.bind(address);
* sock.listen(5);
*
* auto io = defaultAllocator.make!ConnectionWatcher(sock);
* io.setProtocol!EchoProtocol;
* auto io = defaultAllocator.make!ConnectionWatcher(sock);
* io.setProtocol!EchoProtocol;
*
* defaultLoop.start(io);
* defaultLoop.run();
* defaultLoop.start(io);
* defaultLoop.run();
*
* sock.shutdown();
* defaultAllocator.dispose(io);
* defaultAllocator.dispose(sock);
* defaultAllocator.dispose(address);
* sock.shutdown();
* defaultAllocator.dispose(io);
* defaultAllocator.dispose(sock);
* defaultAllocator.dispose(address);
* }
* ---
*/
@ -81,33 +81,33 @@ version (DisableBackends)
}
else version (linux)
{
import tanya.async.event.epoll;
version = Epoll;
import tanya.async.event.epoll;
version = Epoll;
}
else version (Windows)
{
import tanya.async.event.iocp;
version = IOCP;
import tanya.async.event.iocp;
version = IOCP;
}
else version (OSX)
{
version = Kqueue;
version = Kqueue;
}
else version (iOS)
{
version = Kqueue;
version = Kqueue;
}
else version (FreeBSD)
{
version = Kqueue;
version = Kqueue;
}
else version (OpenBSD)
{
version = Kqueue;
version = Kqueue;
}
else version (DragonFlyBSD)
{
version = Kqueue;
version = Kqueue;
}
/**
@ -115,11 +115,11 @@ else version (DragonFlyBSD)
*/
enum Event : uint
{
none = 0x00, /// No events.
read = 0x01, /// Non-blocking read call.
write = 0x02, /// Non-blocking write call.
accept = 0x04, /// Connection made.
error = 0x80000000, /// Sent when an error occurs.
none = 0x00, /// No events.
read = 0x01, /// Non-blocking read call.
write = 0x02, /// Non-blocking write call.
accept = 0x04, /// Connection made.
error = 0x80000000, /// Sent when an error occurs.
}
alias EventMask = BitFlags!Event;
@ -129,150 +129,150 @@ alias EventMask = BitFlags!Event;
*/
abstract class Loop
{
private bool done;
private bool done;
/// Pending watchers.
protected Queue!Watcher pendings;
/// Pending watchers.
protected Queue!Watcher pendings;
/**
* Returns: Maximal event count can be got at a time
* (should be supported by the backend).
*/
protected @property uint maxEvents()
const pure nothrow @safe @nogc
{
return 128U;
}
/**
* Returns: Maximal event count can be got at a time
* (should be supported by the backend).
*/
protected @property uint maxEvents()
const pure nothrow @safe @nogc
{
return 128U;
}
/**
* Initializes the loop.
*/
this() @nogc
{
pendings = Queue!Watcher(MmapPool.instance);
}
/**
* Initializes the loop.
*/
this() @nogc
{
pendings = Queue!Watcher(MmapPool.instance);
}
/**
* Frees loop internals.
*/
~this() @nogc
{
foreach (w; pendings)
{
MmapPool.instance.dispose(w);
}
}
/**
* Frees loop internals.
*/
~this() @nogc
{
foreach (w; pendings)
{
MmapPool.instance.dispose(w);
}
}
/**
* Starts the loop.
*/
void run() @nogc
{
done = false;
do
{
poll();
/**
* Starts the loop.
*/
void run() @nogc
{
done = false;
do
{
poll();
// Invoke pendings
foreach (ref w; pendings)
{
w.invoke();
}
}
while (!done);
}
// Invoke pendings
foreach (ref w; pendings)
{
w.invoke();
}
}
while (!done);
}
/**
* Break out of the loop.
*/
void unloop() @safe pure nothrow @nogc
{
done = true;
}
/**
* Break out of the loop.
*/
void unloop() @safe pure nothrow @nogc
{
done = true;
}
/**
* Start watching.
*
* Params:
* watcher = Watcher.
*/
void start(ConnectionWatcher watcher) @nogc
{
if (watcher.active)
{
return;
}
watcher.active = true;
/**
* Start watching.
*
* Params:
* watcher = Watcher.
*/
void start(ConnectionWatcher watcher) @nogc
{
if (watcher.active)
{
return;
}
watcher.active = true;
reify(watcher, EventMask(Event.none), EventMask(Event.accept));
}
reify(watcher, EventMask(Event.none), EventMask(Event.accept));
}
/**
* Stop watching.
*
* Params:
* watcher = Watcher.
*/
void stop(ConnectionWatcher watcher) @nogc
{
if (!watcher.active)
{
return;
}
watcher.active = false;
/**
* Stop watching.
*
* Params:
* watcher = Watcher.
*/
void stop(ConnectionWatcher watcher) @nogc
{
if (!watcher.active)
{
return;
}
watcher.active = false;
reify(watcher, EventMask(Event.accept), EventMask(Event.none));
}
reify(watcher, EventMask(Event.accept), EventMask(Event.none));
}
/**
* Should be called if the backend configuration changes.
*
* Params:
* watcher = Watcher.
* oldEvents = The events were already set.
* events = The events should be set.
*
* Returns: $(D_KEYWORD true) if the operation was successful.
*/
abstract protected bool reify(SocketWatcher watcher,
EventMask oldEvents,
EventMask events) @nogc;
/**
* Should be called if the backend configuration changes.
*
* Params:
* watcher = Watcher.
* oldEvents = The events were already set.
* events = The events should be set.
*
* Returns: $(D_KEYWORD true) if the operation was successful.
*/
abstract protected bool reify(SocketWatcher watcher,
EventMask oldEvents,
EventMask events) @nogc;
/**
* Returns: The blocking time.
*/
protected @property inout(Duration) blockTime()
inout @safe pure nothrow @nogc
{
// Don't block if we have to do.
return pendings.empty ? blockTime_ : Duration.zero;
}
/**
* Returns: The blocking time.
*/
protected @property inout(Duration) blockTime()
inout @safe pure nothrow @nogc
{
// Don't block if we have to do.
return pendings.empty ? blockTime_ : Duration.zero;
}
/**
* Sets the blocking time for IO watchers.
*
* Params:
* blockTime = The blocking time. Cannot be larger than
* $(D_PSYMBOL maxBlockTime).
*/
protected @property void blockTime(in Duration blockTime) @safe pure nothrow @nogc
in
{
assert(blockTime <= 1.dur!"hours", "Too long to wait.");
assert(!blockTime.isNegative);
}
body
{
blockTime_ = blockTime;
}
/**
* Sets the blocking time for IO watchers.
*
* Params:
* blockTime = The blocking time. Cannot be larger than
* $(D_PSYMBOL maxBlockTime).
*/
protected @property void blockTime(in Duration blockTime) @safe pure nothrow @nogc
in
{
assert(blockTime <= 1.dur!"hours", "Too long to wait.");
assert(!blockTime.isNegative);
}
body
{
blockTime_ = blockTime;
}
/**
* Does the actual polling.
*/
abstract protected void poll() @nogc;
/**
* Does the actual polling.
*/
abstract protected void poll() @nogc;
/// Maximal block time.
protected Duration blockTime_ = 1.dur!"minutes";
/// Maximal block time.
protected Duration blockTime_ = 1.dur!"minutes";
}
/**
@ -280,17 +280,17 @@ abstract class Loop
*/
class BadLoopException : Exception
{
/**
* Params:
* file = The file where the exception occurred.
* line = The line number where the exception occurred.
* next = The previous exception in the chain of exceptions, if any.
*/
this(string file = __FILE__, size_t line = __LINE__, Throwable next = null)
pure nothrow const @safe @nogc
{
super("Event loop cannot be initialized.", file, line, next);
}
/**
* Params:
* file = The file where the exception occurred.
* line = The line number where the exception occurred.
* next = The previous exception in the chain of exceptions, if any.
*/
this(string file = __FILE__, size_t line = __LINE__, Throwable next = null)
pure nothrow const @safe @nogc
{
super("Event loop cannot be initialized.", file, line, next);
}
}
/**
@ -302,24 +302,24 @@ class BadLoopException : Exception
*/
@property Loop defaultLoop() @nogc
{
if (defaultLoop_ !is null)
{
return defaultLoop_;
}
version (Epoll)
{
defaultLoop_ = MmapPool.instance.make!EpollLoop;
}
else version (IOCP)
{
defaultLoop_ = MmapPool.instance.make!IOCPLoop;
}
else version (Kqueue)
{
import tanya.async.event.kqueue;
defaultLoop_ = MmapPool.instance.make!KqueueLoop;
}
return defaultLoop_;
if (defaultLoop_ !is null)
{
return defaultLoop_;
}
version (Epoll)
{
defaultLoop_ = MmapPool.instance.make!EpollLoop;
}
else version (IOCP)
{
defaultLoop_ = MmapPool.instance.make!IOCPLoop;
}
else version (Kqueue)
{
import tanya.async.event.kqueue;
defaultLoop_ = MmapPool.instance.make!KqueueLoop;
}
return defaultLoop_;
}
/**
@ -331,16 +331,16 @@ class BadLoopException : Exception
* your implementation to this property.
*
* Params:
* loop = The event loop.
* loop = The event loop.
*/
@property void defaultLoop(Loop loop) @nogc
in
{
assert(loop !is null);
assert(loop !is null);
}
body
{
defaultLoop_ = loop;
defaultLoop_ = loop;
}
private Loop defaultLoop_;

View File

@ -18,28 +18,28 @@ import tanya.async.transport;
*/
interface Protocol
{
/**
* Params:
* data = Read data.
*/
void received(in ubyte[] data) @nogc;
/**
* Params:
* data = Read data.
*/
void received(in ubyte[] data) @nogc;
/**
* Called when a connection is made.
*
* Params:
* transport = Protocol transport.
*/
void connected(DuplexTransport transport) @nogc;
/**
* Called when a connection is made.
*
* Params:
* transport = Protocol transport.
*/
void connected(DuplexTransport transport) @nogc;
/**
* Called when a connection is lost.
*
* Params:
* exception = $(D_PSYMBOL Exception) if an error caused
* the disconnect, $(D_KEYWORD null) otherwise.
*/
void disconnected(SocketException exception) @nogc;
/**
* Called when a connection is lost.
*
* Params:
* exception = $(D_PSYMBOL Exception) if an error caused
* the disconnect, $(D_KEYWORD null) otherwise.
*/
void disconnected(SocketException exception) @nogc;
}
/**

View File

@ -32,13 +32,13 @@ interface ReadTransport : Transport
*/
interface WriteTransport : Transport
{
/**
* Write some data to the transport.
*
* Params:
* data = Data to send.
*/
void write(ubyte[] data) @nogc;
/**
* Write some data to the transport.
*
* Params:
* data = Data to send.
*/
void write(ubyte[] data) @nogc;
}
/**
@ -46,46 +46,46 @@ interface WriteTransport : Transport
*/
interface DuplexTransport : ReadTransport, WriteTransport
{
/**
* Returns: Application protocol.
*
* Postcondition: $(D_INLINECODE protocol !is null)
*/
@property Protocol protocol() pure nothrow @safe @nogc
out (protocol)
{
assert(protocol !is null);
}
/**
* Returns: Application protocol.
*
* Postcondition: $(D_INLINECODE protocol !is null)
*/
@property Protocol protocol() pure nothrow @safe @nogc
out (protocol)
{
assert(protocol !is null);
}
/**
* Switches the protocol.
*
* The protocol is deallocated by the event loop, it should currently be
* allocated with $(D_PSYMBOL MmapPool).
*
* Params:
* protocol = Application protocol.
*
* Precondition: $(D_INLINECODE protocol !is null)
*/
@property void protocol(Protocol protocol) pure nothrow @safe @nogc
in
{
assert(protocol !is null);
}
/**
* Switches the protocol.
*
* The protocol is deallocated by the event loop, it should currently be
* allocated with $(D_PSYMBOL MmapPool).
*
* Params:
* protocol = Application protocol.
*
* Precondition: $(D_INLINECODE protocol !is null)
*/
@property void protocol(Protocol protocol) pure nothrow @safe @nogc
in
{
assert(protocol !is null);
}
/**
* Returns $(D_PARAM true) if the transport is closing or closed.
*/
bool isClosing() const pure nothrow @safe @nogc;
/**
* Returns $(D_PARAM true) if the transport is closing or closed.
*/
bool isClosing() const pure nothrow @safe @nogc;
/**
* Close the transport.
*
* Buffered data will be flushed. No more data will be received.
*/
void close() @nogc;
/**
* Close the transport.
*
* Buffered data will be flushed. No more data will be received.
*/
void close() @nogc;
}
/**
@ -93,8 +93,8 @@ interface DuplexTransport : ReadTransport, WriteTransport
*/
interface SocketTransport : Transport
{
/**
* Returns: Socket.
*/
@property Socket socket() pure nothrow @safe @nogc;
/**
* Returns: Socket.
*/
@property Socket socket() pure nothrow @safe @nogc;
}

View File

@ -27,13 +27,13 @@ import tanya.network.socket;
*/
abstract class Watcher
{
/// Whether the watcher is active.
bool active;
/// Whether the watcher is active.
bool active;
/**
* Invoke some action on event.
*/
void invoke() @nogc;
/**
* Invoke some action on event.
*/
void invoke() @nogc;
}
/**
@ -41,32 +41,32 @@ abstract class Watcher
*/
abstract class SocketWatcher : Watcher
{
/// Watched socket.
protected Socket socket_;
/// Watched socket.
protected Socket socket_;
/**
* Params:
* socket = Socket.
*
* Precondition: $(D_INLINECODE socket !is null)
*/
this(Socket socket) pure nothrow @safe @nogc
in
{
assert(socket !is null);
}
body
{
socket_ = socket;
}
/**
* Params:
* socket = Socket.
*
* Precondition: $(D_INLINECODE socket !is null)
*/
this(Socket socket) pure nothrow @safe @nogc
in
{
assert(socket !is null);
}
body
{
socket_ = socket;
}
/**
* Returns: Socket.
*/
@property Socket socket() pure nothrow @safe @nogc
{
return socket_;
}
/**
* Returns: Socket.
*/
@property Socket socket() pure nothrow @safe @nogc
{
return socket_;
}
}
/**
@ -74,44 +74,44 @@ abstract class SocketWatcher : Watcher
*/
class ConnectionWatcher : SocketWatcher
{
/// Incoming connection queue.
Queue!DuplexTransport incoming;
/// Incoming connection queue.
Queue!DuplexTransport incoming;
private Protocol delegate() @nogc protocolFactory;
private Protocol delegate() @nogc protocolFactory;
/**
* Params:
* socket = Socket.
*/
this(Socket socket) @nogc
{
super(socket);
incoming = Queue!DuplexTransport(MmapPool.instance);
}
/**
* Params:
* socket = Socket.
*/
this(Socket socket) @nogc
{
super(socket);
incoming = Queue!DuplexTransport(MmapPool.instance);
}
/**
* Params:
* P = Protocol should be used.
*/
void setProtocol(P : Protocol)() @nogc
{
this.protocolFactory = () @nogc => cast(Protocol) MmapPool.instance.make!P;
}
/**
* Params:
* P = Protocol should be used.
*/
void setProtocol(P : Protocol)() @nogc
{
this.protocolFactory = () @nogc => cast(Protocol) MmapPool.instance.make!P;
}
/**
* Invokes new connection callback.
*/
override void invoke() @nogc
in
{
assert(protocolFactory !is null, "Protocol isn't set.");
}
body
{
foreach (transport; incoming)
{
transport.protocol = protocolFactory();
transport.protocol.connected(transport);
}
}
/**
* Invokes new connection callback.
*/
override void invoke() @nogc
in
{
assert(protocolFactory !is null, "Protocol isn't set.");
}
body
{
foreach (transport; incoming)
{
transport.protocol = protocolFactory();
transport.protocol.connected(transport);
}
}
}

File diff suppressed because it is too large Load Diff

View File

@ -333,9 +333,9 @@ struct WriteBuffer(T = ubyte)
/**
* Params:
* size = Initial buffer size and the size by which the buffer will
* grow.
* allocator = Allocator.
* size = Initial buffer size and the size by which the buffer will
* grow.
* allocator = Allocator.
*
* Precondition: $(D_INLINECODE size > 0 && allocator !is null)
*/
@ -428,7 +428,7 @@ struct WriteBuffer(T = ubyte)
* Appends data to the buffer.
*
* Params:
* buffer = Buffer chunk got with $(D_PSYMBOL opIndex).
* buffer = Buffer chunk got with $(D_PSYMBOL opIndex).
*/
ref WriteBuffer opOpAssign(string op)(in T[] buffer)
if (op == "~")
@ -535,7 +535,7 @@ struct WriteBuffer(T = ubyte)
* appropriately. Always call it after $(D_PSYMBOL opIndex).
*
* Params:
* length = Length of the written data.
* length = Length of the written data.
*
* Returns: $(D_KEYWORD this).
*/

View File

@ -12,11 +12,100 @@
*/
module tanya.container.entry;
import std.traits;
import tanya.typecons;
package struct SEntry(T)
{
/// Item content.
// Item content.
T content;
/// Next item.
// Next item.
SEntry* next;
}
package struct DEntry(T)
{
// Item content.
T content;
// Previous and next item.
DEntry* next, prev;
}
package struct HashEntry(K, V)
{
this(ref K key, ref V value)
{
this.pair = Pair!(K, V)(key, value);
}
Pair!(K, V) pair;
HashEntry* next;
}
package enum BucketStatus : byte
{
deleted = -1,
empty = 0,
used = 1,
}
package struct Bucket(T)
{
this(ref T content)
{
this.content = content;
}
@property void content(ref T content)
{
this.content_ = content;
this.status = BucketStatus.used;
}
@property ref inout(T) content() inout
{
return this.content_;
}
bool opEquals(ref T content)
{
if (this.status == BucketStatus.used && this.content == content)
{
return true;
}
return false;
}
bool opEquals(ref T content) const
{
if (this.status == BucketStatus.used && this.content == content)
{
return true;
}
return false;
}
bool opEquals(ref typeof(this) that)
{
return this.content == that.content && this.status == that.status;
}
bool opEquals(ref typeof(this) that) const
{
return this.content == that.content && this.status == that.status;
}
void remove()
{
static if (hasElaborateDestructor!T)
{
destroy(this.content);
}
this.status = BucketStatus.deleted;
}
T content_;
BucketStatus status = BucketStatus.empty;
}

File diff suppressed because it is too large Load Diff

View File

@ -12,7 +12,31 @@
*/
module tanya.container;
public import tanya.container.array;
public import tanya.container.buffer;
public import tanya.container.set;
public import tanya.container.list;
public import tanya.container.vector;
public import tanya.container.string;
public import tanya.container.queue;
/**
* Thrown if $(D_PSYMBOL Set) cannot insert a new element because the container
* is full.
*/
class HashContainerFullException : Exception
{
/**
* Params:
* msg = The message for the exception.
* file = The file where the exception occurred.
* line = The line number where the exception occurred.
* next = The previous exception in the chain of exceptions, if any.
*/
this(string msg,
string file = __FILE__,
size_t line = __LINE__,
Throwable next = null) @nogc @safe pure nothrow
{
super(msg, file, line, next);
}
}

View File

@ -0,0 +1,710 @@
/* This Source Code Form is subject to the terms of the Mozilla Public
* License, v. 2.0. If a copy of the MPL was not distributed with this
* file, You can obtain one at http://mozilla.org/MPL/2.0/. */
/**
* This module implements a $(D_PSYMBOL Set) container that stores unique
* values without any particular order.
*
* Copyright: Eugene Wissner 2017.
* License: $(LINK2 https://www.mozilla.org/en-US/MPL/2.0/,
* Mozilla Public License, v. 2.0).
* Authors: $(LINK2 mailto:info@caraus.de, Eugene Wissner)
*/
module tanya.container.set;
import std.algorithm.mutation;
import std.traits;
import tanya.container;
import tanya.container.entry;
import tanya.memory;
/**
* Bidirectional range that iterates over the $(D_PSYMBOL Set)'s values.
*
* Params:
* E = Element type.
*/
struct Range(E)
{
static if (isMutable!E)
{
private alias DataRange = Array!(Bucket!(Unqual!E)).Range;
}
else
{
private alias DataRange = Array!(Bucket!(Unqual!E)).ConstRange;
}
private DataRange dataRange;
@disable this();
private this(DataRange dataRange)
{
while (!dataRange.empty && dataRange.front.status != BucketStatus.used)
{
dataRange.popFront();
}
while (!dataRange.empty && dataRange.back.status != BucketStatus.used)
{
dataRange.popBack();
}
this.dataRange = dataRange;
}
@property Range save()
{
return this;
}
@property bool empty() const
{
return this.dataRange.empty();
}
@property void popFront()
in
{
assert(!this.dataRange.empty);
assert(this.dataRange.front.status == BucketStatus.used);
}
out
{
assert(this.dataRange.empty
|| this.dataRange.back.status == BucketStatus.used);
}
body
{
do
{
dataRange.popFront();
}
while (!dataRange.empty && dataRange.front.status != BucketStatus.used);
}
@property void popBack()
in
{
assert(!this.dataRange.empty);
assert(this.dataRange.back.status == BucketStatus.used);
}
out
{
assert(this.dataRange.empty
|| this.dataRange.back.status == BucketStatus.used);
}
body
{
do
{
dataRange.popBack();
}
while (!dataRange.empty && dataRange.back.status != BucketStatus.used);
}
@property ref inout(E) front() inout
in
{
assert(!this.dataRange.empty);
assert(this.dataRange.front.status == BucketStatus.used);
}
body
{
return dataRange.front.content;
}
@property ref inout(E) back() inout
in
{
assert(!this.dataRange.empty);
assert(this.dataRange.back.status == BucketStatus.used);
}
body
{
return dataRange.back.content;
}
Range opIndex()
{
return typeof(return)(this.dataRange[]);
}
Range!(const E) opIndex() const
{
return typeof(return)(this.dataRange[]);
}
}
/**
* Set is a data structure that stores unique values without any particular
* order.
*
* This $(D_PSYMBOL Set) is implemented using closed hashing. Hash collisions
* are resolved with linear probing.
*
* Currently works only with integral types.
*
* Params:
* T = Element type.
*/
struct Set(T)
if (isIntegral!T || is(Unqual!T == bool))
{
/// The range types for $(D_PSYMBOL Set).
alias Range = .Range!T;
/// Ditto.
alias ConstRange = .Range!(const T);
invariant
{
assert(this.lengthIndex < primes.length);
assert(this.data.length == 0
|| this.data.length == primes[this.lengthIndex]);
}
/**
* Constructor.
*
* Params:
* n = Minimum number of buckets.
* allocator = Allocator.
*
* Precondition: $(D_INLINECODE allocator !is null).
*/
this(const size_t n, shared Allocator allocator = defaultAllocator)
in
{
assert(allocator !is null);
}
body
{
this(allocator);
rehash(n);
}
/// Ditto.
this(shared Allocator allocator)
in
{
assert(allocator !is null);
}
body
{
this.data = typeof(this.data)(allocator);
}
///
unittest
{
{
auto set = Set!int(defaultAllocator);
assert(set.capacity == 0);
}
{
auto set = Set!int(8);
assert(set.capacity == 13);
}
}
/**
* Initializes this $(D_PARAM Set) from another one.
*
* If $(D_PARAM init) is passed by reference, it will be copied.
* If $(D_PARAM init) is passed by value, it will be moved.
*
* Params:
* S = Source set type.
* init = Source set.
* allocator = Allocator.
*/
this(S)(ref S init, shared Allocator allocator = defaultAllocator)
if (is(Unqual!S == Set))
in
{
assert(allocator !is null);
}
body
{
this.data = typeof(this.data)(init.data, allocator);
}
/// Ditto.
this(S)(S init, shared Allocator allocator = defaultAllocator)
if (is(S == Set))
in
{
assert(allocator !is null);
}
body
{
this.data = typeof(this.data)(move(init.data), allocator);
this.lengthIndex = init.lengthIndex;
init.lengthIndex = 0;
}
/**
* Assigns another set.
*
* If $(D_PARAM that) is passed by reference, it will be copied.
* If $(D_PARAM that) is passed by value, it will be moved.
*
* Params:
* S = Content type.
* that = The value should be assigned.
*
* Returns: $(D_KEYWORD this).
*/
ref typeof(this) opAssign(S)(ref S that)
if (is(Unqual!S == Set))
{
this.data = that.data;
this.lengthIndex = that.lengthIndex;
return this;
}
/// Ditto.
ref typeof(this) opAssign(S)(S that) @trusted
if (is(S == Set))
{
swap(this.data, that.data);
swap(this.lengthIndex, that.lengthIndex);
return this;
}
/**
* Returns: Used allocator.
*
* Postcondition: $(D_INLINECODE allocator !is null)
*/
@property shared(Allocator) allocator() const
out (allocator)
{
assert(allocator !is null);
}
body
{
return cast(shared Allocator) this.data.allocator;
}
/**
* Maximum amount of elements this $(D_PSYMBOL Set) can hold without
* resizing and rehashing. Note that it doesn't mean that the
* $(D_PSYMBOL Set) will hold $(I exactly) $(D_PSYMBOL capacity) elements.
* $(D_PSYMBOL capacity) tells the size of the container under a best-case
* distribution of elements.
*
* Returns: $(D_PSYMBOL Set) capacity.
*/
@property size_t capacity() const
{
return this.data.length;
}
///
unittest
{
Set!int set;
assert(set.capacity == 0);
set.insert(8);
assert(set.capacity == 3);
}
/**
* Iterates over the $(D_PSYMBOL Set) and counts the elements.
*
* Returns: Count of elements within the $(D_PSYMBOL Set).
*/
@property size_t length() const
{
size_t count;
foreach (ref e; this.data[])
{
if (e.status == BucketStatus.used)
{
++count;
}
}
return count;
}
///
unittest
{
Set!int set;
assert(set.length == 0);
set.insert(8);
assert(set.length == 1);
}
private static const size_t[41] primes = [
3, 7, 13, 23, 29, 37, 53, 71, 97, 131, 163, 193, 239, 293, 389, 521,
769, 919, 1103, 1327, 1543, 2333, 3079, 4861, 6151, 12289, 24593,
49157, 98317, 196613, 393241, 786433, 1572869, 3145739, 6291469,
12582917, 25165843, 139022417, 282312799, 573292817, 1164186217,
];
/// The maximum number of buckets the container can have.
enum size_t maxBucketCount = primes[$ - 1];
static private size_t calculateHash(ref T value)
{
static if (isIntegral!T || isSomeChar!T || is(T == bool))
{
return (cast(size_t) value);
}
else
{
static assert(false);
}
}
static private size_t locateBucket(ref const DataType buckets, size_t hash)
{
return hash % buckets.length;
}
private enum InsertStatus : byte
{
found = -1,
failed = 0,
added = 1,
}
/*
* Inserts the value in an empty or deleted bucket. If the value is
* already in there, does nothing and returns InsertStatus.found. If the
* hash array is full returns InsertStatus.failed.
*/
private InsertStatus insertInUnusedBucket(ref T value)
{
auto bucketPosition = locateBucket(this.data, calculateHash(value));
foreach (ref e; this.data[bucketPosition .. $])
{
if (e == value) // Already in the set.
{
return InsertStatus.found;
}
else if (e.status != BucketStatus.used) // Insert the value.
{
e.content = value;
return InsertStatus.added;
}
}
return InsertStatus.failed;
}
/**
* Inserts a new element.
*
* Params:
* value = Element value.
*
* Returns: Amount of new elements inserted.
*
* Throws: $(D_PSYMBOL HashContainerFullException) if the insertion failed.
*/
size_t insert(T value)
{
if (this.data.length == 0)
{
this.data = DataType(primes[0], allocator);
}
InsertStatus status = insertInUnusedBucket(value);
for (; !status; status = insertInUnusedBucket(value))
{
if ((this.primes.length - 1) == this.lengthIndex)
{
throw make!HashContainerFullException(defaultAllocator,
"Set is full");
}
rehashToSize(this.lengthIndex + 1);
}
return status == InsertStatus.added;
}
///
unittest
{
Set!int set;
assert(8 !in set);
assert(set.insert(8) == 1);
assert(set.length == 1);
assert(8 in set);
assert(set.insert(8) == 0);
assert(set.length == 1);
assert(8 in set);
assert(set.remove(8));
assert(set.insert(8) == 1);
}
/**
* Removes an element.
*
* Params:
* value = Element value.
*
* Returns: Number of elements removed, which is in the container with
* unique values `1` if an element existed, and `0` otherwise.
*/
size_t remove(T value)
{
if (this.data.length == 0)
{
return 0;
}
auto bucketPosition = locateBucket(this.data, calculateHash(value));
foreach (ref e; this.data[bucketPosition .. $])
{
if (e == value) // Found.
{
e.remove();
return 1;
}
else if (e.status == BucketStatus.empty)
{
break;
}
}
return 0;
}
///
unittest
{
Set!int set;
assert(8 !in set);
set.insert(8);
assert(8 in set);
assert(set.remove(8) == 1);
assert(set.remove(8) == 0);
assert(8 !in set);
}
/**
* $(D_KEYWORD in) operator.
*
* Params:
* value = Element to be searched for.
*
* Returns: $(D_KEYWORD true) if the given element exists in the container,
* $(D_KEYWORD false) otherwise.
*/
bool opBinaryRight(string op : "in")(auto ref T value)
{
if (this.data.length == 0)
{
return 0;
}
auto bucketPosition = locateBucket(this.data, calculateHash(value));
foreach (ref e; this.data[bucketPosition .. $])
{
if (e == value) // Found.
{
return true;
}
else if (e.status == BucketStatus.empty)
{
break;
}
}
return false;
}
/// Ditto.
bool opBinaryRight(string op : "in")(auto ref const T value) const
{
if (this.data.length == 0)
{
return 0;
}
auto bucketPosition = locateBucket(this.data, calculateHash(value));
foreach (ref e; this.data[bucketPosition .. $])
{
if (e.status == BucketStatus.used && e.content == value) // Found.
{
return true;
}
else if (e.status == BucketStatus.empty)
{
break;
}
}
return false;
}
///
unittest
{
Set!int set;
assert(5 !in set);
set.insert(5);
assert(5 in set);
}
/**
* Sets the number of buckets in the container to at least $(D_PARAM n)
* and rearranges all the elements according to their hash values.
*
* If $(D_PARAM n) is greater than the current $(D_PSYMBOL capacity)
* and lower than or equal to $(D_PSYMBOL maxBucketCount), a rehash is
* forced.
*
* If $(D_PARAM n) is greater than $(D_PSYMBOL maxBucketCount),
* $(D_PSYMBOL maxBucketCount) is used instead as a new number of buckets.
*
* If $(D_PARAM n) is equal to the current $(D_PSYMBOL capacity), rehashing
* is forced without resizing the container.
*
* If $(D_PARAM n) is lower than the current $(D_PSYMBOL capacity), the
* function may have no effect.
*
* Rehashing is automatically performed whenever the container needs space
* to insert new elements.
*
* Params:
* n = Minimum number of buckets.
*/
void rehash(const size_t n)
{
size_t lengthIndex;
for (; lengthIndex < primes.length; ++lengthIndex)
{
if (primes[lengthIndex] >= n)
{
break;
}
}
rehashToSize(lengthIndex);
}
// Takes an index in the primes array.
private void rehashToSize(const size_t n)
{
auto storage = DataType(primes[n], allocator);
DataLoop: foreach (ref e1; this.data[])
{
if (e1.status == BucketStatus.used)
{
auto bucketPosition = locateBucket(storage,
calculateHash(e1.content));
foreach (ref e2; storage[bucketPosition .. $])
{
if (e2.status != BucketStatus.used) // Insert the value.
{
e2.content = e1.content;
continue DataLoop;
}
}
return; // Rehashing failed.
}
}
move(storage, this.data);
this.lengthIndex = n;
}
/**
* Returns: A bidirectional range that iterates over the $(D_PSYMBOL Set)'s
* elements.
*/
Range opIndex()
{
return typeof(return)(this.data[]);
}
/// Ditto.
ConstRange opIndex() const
{
return typeof(return)(this.data[]);
}
///
unittest
{
Set!int set;
assert(set[].empty);
set.insert(8);
assert(!set[].empty);
assert(set[].front == 8);
assert(set[].back == 8);
set.remove(8);
assert(set[].empty);
}
private alias DataType = Array!(Bucket!T);
private DataType data;
private size_t lengthIndex;
}
// Basic insertion logic.
private unittest
{
Set!int set;
assert(set.insert(5) == 1);
assert(set.data[0].status == BucketStatus.empty);
assert(set.data[1].status == BucketStatus.empty);
assert(set.data[2].content == 5 && set.data[2].status == BucketStatus.used);
assert(set.data.length == 3);
assert(set.insert(5) == 0);
assert(set.data[0].status == BucketStatus.empty);
assert(set.data[1].status == BucketStatus.empty);
assert(set.data[2].content == 5 && set.data[2].status == BucketStatus.used);
assert(set.data.length == 3);
assert(set.insert(9) == 1);
assert(set.data[0].content == 9 && set.data[0].status == BucketStatus.used);
assert(set.data[1].status == BucketStatus.empty);
assert(set.data[2].content == 5 && set.data[2].status == BucketStatus.used);
assert(set.data.length == 3);
assert(set.insert(7) == 1);
assert(set.insert(8) == 1);
assert(set.data[0].content == 7);
assert(set.data[1].content == 8);
assert(set.data[2].content == 9);
assert(set.data[3].status == BucketStatus.empty);
assert(set.data[5].content == 5);
assert(set.data.length == 7);
assert(set.insert(16) == 1);
assert(set.data[2].content == 9);
assert(set.data[3].content == 16);
assert(set.data[4].status == BucketStatus.empty);
}
// Static checks.
private unittest
{
import std.range.primitives;
static assert(isBidirectionalRange!(Set!int.ConstRange));
static assert(isBidirectionalRange!(Set!int.Range));
static assert(!isInfinite!(Set!int.Range));
static assert(!hasLength!(Set!int.Range));
static assert(is(Set!uint));
static assert(is(Set!long));
static assert(is(Set!ulong));
static assert(is(Set!short));
static assert(is(Set!ushort));
static assert(is(Set!bool));
}

File diff suppressed because it is too large Load Diff

File diff suppressed because it is too large Load Diff

View File

@ -16,7 +16,7 @@ import std.algorithm;
import std.ascii;
import std.range;
import std.traits;
import tanya.container.vector;
import tanya.container.array;
import tanya.memory;
/**
@ -160,6 +160,45 @@ struct Integer
assert(integer == 7383520307673030126);
}
/**
* Constructs the integer from a two's complement representation.
*
* Params:
* R = Range type.
* value = Range.
* allocator = Allocator.
*
* Precondition: $(D_INLINECODE allocator !is null)
*/
this(R)(R value,
shared Allocator allocator = defaultAllocator)
if (isBidirectionalRange!R && hasLength!R
&& is(Unqual!(ElementType!R) == ubyte))
{
this(Sign.positive, value, allocator);
if (!value.empty && ((value.front & 0x80) != 0))
{
// Negative number.
opOpAssign!"-"(exp2(countBits()));
}
}
///
nothrow @safe @nogc unittest
{
{
ubyte[8] range = [ 0x66, 0x77, 0x88, 0x99, 0xaa, 0xbb, 0xdd, 0xee ];
auto integer = Integer(range[]);
assert(integer == 7383520307673030126);
}
{
ubyte[8] range = [ 0xe6, 0x77, 0x88, 0x99, 0xaa, 0xbb, 0xdd, 0xee ];
auto integer = Integer(range[]);
assert(integer == -1839851729181745682);
}
}
/**
* Copies the integer.
*/
@ -178,12 +217,64 @@ struct Integer
allocator.resize(this.rep, 0);
}
static private const short[16] bitCounts = [
4, 0, 1, 0, 2, 0, 1, 0, 3, 0, 1, 0, 2, 0, 1, 0
];
// Counts the number of LSBs before the first non-zero bit.
private ptrdiff_t countLSBs() const pure nothrow @safe @nogc
{
if (this.size == 0)
{
return 0;
}
ptrdiff_t bits;
for (bits = 0; (bits < this.size) && (this.rep[bits] == 0); ++bits)
{
}
digit nonZero = this.rep[bits];
bits *= digitBitCount;
/* now scan this digit until a 1 is found */
if ((nonZero & 0x01) == 0)
{
digit bitCountsPos;
do
{
bitCountsPos = nonZero & 0x0f;
bits += bitCounts[bitCountsPos];
nonZero >>= 4;
}
while (bitCountsPos == 0);
}
return bits;
}
/**
* Returns: Integer byte length.
* Returns: Number of bytes in the two's complement representation.
*/
@property size_t length() const pure nothrow @safe @nogc
{
return (countBits() + 7) / 8; // Round up.
if (this.sign)
{
const bc = countBits();
auto length = bc + (8 - (bc & 0x07));
if (((countLSBs() + 1) == bc) && ((bc & 0x07) == 0))
{
--length;
}
return length / 8;
}
else if (this.size == 0)
{
return 0;
}
else
{
return (countBits() / 8) + 1;
}
}
/**
@ -245,16 +336,10 @@ struct Integer
ref Integer opAssign(T)(T value) nothrow @safe @nogc
if (is(T == Integer))
{
if (this.allocator is value.allocator)
{
swap(this.rep, value.rep);
swap(this.sign, value.sign);
swap(this.size, value.size);
}
else
{
this = value;
}
swap(this.rep, value.rep);
swap(this.sign, value.sign);
swap(this.size, value.size);
swap(this.allocator_, value.allocator_);
return this;
}
@ -1343,79 +1428,72 @@ struct Integer
return result;
}
Vector!ubyte toVector() const nothrow @safe @nogc
// Returns 2^^n.
private Integer exp2(size_t n) const nothrow @safe @nogc
{
Vector!ubyte vector;
bool firstBit;
ubyte carry;
auto ret = Integer(allocator);
const bytes = n / digitBitCount;
ret.grow(bytes + 1);
ret.size = bytes + 1;
ret.rep[bytes] = (cast(digit) 1) << (n % digitBitCount);
return ret;
}
/**
* Returns: Two's complement representation of the integer.
*/
Array!ubyte toArray() const nothrow @safe @nogc
out (array)
{
assert(array.length == length);
}
body
{
Array!ubyte array;
if (this.size == 0)
{
return vector;
return array;
}
vector.reserve(this.size * digit.sizeof);
const bc = countBits();
const remainingBits = bc & 0x07;
// The first digit needs extra handling since it can have leading
// non significant zeros.
int digitCount = digitBitCount - 8;
const first = this.rep[this.size - 1];
const prevBitCount = ((this.size - 1) * digitBitCount);
const fullBytesBitCount = ((prevBitCount - 1) / 8 + 1) * 8;
// Find out the right alignment of the first byte.
if ((fullBytesBitCount - prevBitCount) == 0)
array.reserve(bc / 8);
if (remainingBits == 0)
{
digitCount -= digit.sizeof * 8 - digitBitCount;
array.insertBack(ubyte.init);
}
for (; digitCount >= 0; digitCount -= 8)
Integer tmp;
if (this.sign)
{
if (firstBit || ((first >> digitCount) != 0))
auto length = bc + (8 - remainingBits);
if (((countLSBs() + 1) == bc) && (remainingBits == 0))
{
firstBit = true;
vector.insertBack(cast(ubyte) (first >> digitCount));
length -= 8;
}
}
if (digitCount >= -8)
{
carry = (first << -digitCount) & 0xff;
digitCount += digitBitCount;
tmp = exp2(length) + this;
}
else
{
carry = 0;
digitCount = digitBitCount - 8;
tmp = this;
}
foreach_reverse (d; this.rep[0 .. this.size - 1])
do
{
if (carry != 0) // Check the carry from the previous digit.
{
vector.insertBack(cast(ubyte) (carry | (d >> digitCount)));
digitCount -= 8;
}
// Write the digit by bytes.
for (; digitCount >= 0; digitCount -= 8)
{
vector.insertBack(cast(ubyte) (d >> digitCount));
}
// Check for an incomplete byte.
if (digitCount >= -8)
{
carry = (d << -digitCount) & 0xff;
digitCount += digitBitCount;
}
else
{
carry = 0;
digitCount = digitBitCount - 8;
}
}
if (carry != 0)
{
vector.insertBack(cast(ubyte) (carry >> (digitBitCount - digitCount)));
array.insertBack(cast(ubyte) (tmp.rep[0] & 0xff));
tmp >>= 8;
}
while (tmp != 0);
return vector;
array[].reverse();
return array;
}
///
@ -1425,15 +1503,15 @@ struct Integer
auto integer = Integer(0x66778899aabbddee);
ubyte[8] expected = [ 0x66, 0x77, 0x88, 0x99, 0xaa, 0xbb, 0xdd, 0xee ];
auto vector = integer.toVector();
assert(equal(vector[], expected[]));
auto array = integer.toArray();
assert(equal(array[], expected[]));
}
{
auto integer = Integer(0x03);
ubyte[1] expected = [ 0x03 ];
auto vector = integer.toVector();
assert(equal(vector[], expected[]));
auto array = integer.toArray();
assert(equal(array[], expected[]));
}
{
ubyte[63] expected = [
@ -1448,8 +1526,8 @@ struct Integer
];
auto integer = Integer(Sign.positive, expected[]);
auto vector = integer.toVector();
assert(equal(vector[], expected[]));
auto array = integer.toArray();
assert(equal(array[], expected[]));
}
{
ubyte[14] expected = [
@ -1458,8 +1536,8 @@ struct Integer
];
auto integer = Integer(Sign.positive, expected[]);
auto vector = integer.toVector();
assert(equal(vector[], expected[]));
auto array = integer.toArray();
assert(equal(array[], expected[]));
}
}

File diff suppressed because it is too large Load Diff

View File

@ -18,7 +18,19 @@ import std.range;
import std.traits;
import tanya.memory;
package(tanya) final class RefCountedStore(T)
private template Payload(T)
{
static if (is(T == class) || is(T == interface) || isArray!T)
{
alias Payload = T;
}
else
{
alias Payload = T*;
}
}
final class RefCountedStore(T)
{
T payload;
size_t counter = 1;
@ -34,7 +46,7 @@ package(tanya) final class RefCountedStore(T)
mixin("return " ~ op ~ "counter;");
}
int opCmp(size_t counter)
int opCmp(const size_t counter)
{
if (this.counter > counter)
{
@ -50,7 +62,7 @@ package(tanya) final class RefCountedStore(T)
}
}
int opEquals(size_t counter)
int opEquals(const size_t counter)
{
return this.counter == counter;
}
@ -81,15 +93,7 @@ private void unifiedDeleter(T)(RefCountedStore!T storage,
*/
struct RefCounted(T)
{
static if (is(T == class) || is(T == interface) || isArray!T)
{
private alias Payload = T;
}
else
{
private alias Payload = T*;
}
private alias Storage = RefCountedStore!Payload;
private alias Storage = RefCountedStore!(Payload!T);
private Storage storage;
private void function(Storage storage,
@ -112,13 +116,18 @@ struct RefCounted(T)
*
* Precondition: $(D_INLINECODE allocator !is null)
*/
this()(auto ref Payload value,
this()(auto ref Payload!T value,
shared Allocator allocator = defaultAllocator)
{
this(allocator);
this.storage = allocator.make!Storage();
this.deleter = &separateDeleter!Payload;
this.deleter = &separateDeleter!(Payload!T);
move(value, this.storage.payload);
static if (__traits(isRef, value))
{
value = null;
}
}
/// Ditto.
@ -139,7 +148,7 @@ struct RefCounted(T)
{
if (count != 0)
{
++storage;
++this.storage;
}
}
@ -152,7 +161,7 @@ struct RefCounted(T)
{
if (this.storage !is null && !(this.storage.counter && --this.storage))
{
deleter(storage, allocator);
deleter(this.storage, allocator);
}
}
@ -163,34 +172,34 @@ struct RefCounted(T)
* If it is the last reference of the previously owned object,
* it will be destroyed.
*
* To reset the $(D_PSYMBOL RefCounted) assign $(D_KEYWORD null).
* To reset $(D_PSYMBOL RefCounted) assign $(D_KEYWORD null).
*
* If the allocator wasn't set before, $(D_PSYMBOL defaultAllocator) will
* be used. If you need a different allocator, create a new
* $(D_PSYMBOL RefCounted) and assign it.
*
* Params:
* rhs = $(D_KEYWORD this).
* rhs = New object.
*
* Returns: $(D_KEYWORD this).
*/
ref typeof(this) opAssign()(auto ref Payload rhs)
ref typeof(this) opAssign()(auto ref Payload!T rhs)
{
if (this.storage is null)
{
this.storage = allocator.make!Storage();
this.deleter = &separateDeleter!Payload;
this.deleter = &separateDeleter!(Payload!T);
}
else if (this.storage > 1)
{
--this.storage;
this.storage = allocator.make!Storage();
this.deleter = &separateDeleter!Payload;
this.deleter = &separateDeleter!(Payload!T);
}
else
{
// Created with refCounted. Always destroyed togethter with the pointer.
assert(this.storage.counter != 0);
finalize(this.storage.payload);
this.storage.payload = Payload.init;
this.storage.payload = Payload!T.init;
}
move(rhs, this.storage.payload);
return this;
@ -206,15 +215,13 @@ struct RefCounted(T)
else if (this.storage > 1)
{
--this.storage;
this.storage = null;
}
else
{
// Created with refCounted. Always destroyed togethter with the pointer.
assert(this.storage.counter != 0);
finalize(this.storage.payload);
this.storage.payload = Payload.init;
deleter(this.storage, allocator);
}
this.storage = null;
return this;
}
@ -229,8 +236,10 @@ struct RefCounted(T)
/**
* Returns: Reference to the owned object.
*
* Precondition: $(D_INLINECODE cound > 0).
*/
inout(Payload) get() inout pure nothrow @safe @nogc
Payload!T get() pure nothrow @safe @nogc
in
{
assert(count > 0, "Attempted to access an uninitialized reference.");
@ -240,7 +249,7 @@ struct RefCounted(T)
return storage.payload;
}
static if (isPointer!Payload)
version (D_Ddoc)
{
/**
* Params:
@ -251,6 +260,11 @@ struct RefCounted(T)
*
* Returns: Reference to the pointed value.
*/
ref T opUnary(string op)()
if (op == "*");
}
else static if (isPointer!(Payload!T))
{
ref T opUnary(string op)()
if (op == "*")
{
@ -352,6 +366,44 @@ private unittest
assert(rc.count == 1);
}
private unittest
{
auto rc = defaultAllocator.refCounted!int(5);
assert(rc.count == 1);
void func(RefCounted!int rc)
{
assert(rc.count == 2);
rc = null;
assert(!rc.isInitialized);
assert(rc.count == 0);
}
assert(rc.count == 1);
func(rc);
assert(rc.count == 1);
rc = null;
assert(!rc.isInitialized);
assert(rc.count == 0);
}
private unittest
{
auto rc = defaultAllocator.refCounted!int(5);
assert(*rc == 5);
void func(RefCounted!int rc)
{
assert(rc.count == 2);
rc = defaultAllocator.refCounted!int(4);
assert(*rc == 4);
assert(rc.count == 1);
}
func(rc);
assert(*rc == 5);
}
private unittest
{
static assert(is(typeof(RefCounted!int.storage.payload) == int*));
@ -414,13 +466,13 @@ body
auto ptr = (() @trusted => (cast(T*) mem[storageSize .. $].ptr))();
rc.storage.payload = emplace!T(ptr, args);
}
rc.deleter = &unifiedDeleter!(RefCounted!T.Payload);
rc.deleter = &unifiedDeleter!(Payload!T);
return rc;
}
/**
* Constructs a new array with $(D_PARAM size) elements and wraps it in a
* $(D_PSYMBOL RefCounted) using.
* $(D_PSYMBOL RefCounted).
*
* Params:
* T = Array type.
@ -432,7 +484,8 @@ body
* Precondition: $(D_INLINECODE allocator !is null
* && size <= size_t.max / ElementType!T.sizeof)
*/
RefCounted!T refCounted(T)(shared Allocator allocator, const size_t size)
RefCounted!T refCounted(T)(shared Allocator allocator, const size_t size)
@trusted
if (isArray!T)
in
{
@ -441,8 +494,7 @@ in
}
body
{
auto payload = cast(T) allocator.allocate(ElementType!T.sizeof * size);
initializeAll(payload);
auto payload = allocator.resize!(ElementType!T)(null, size);
return RefCounted!T(payload, allocator);
}
@ -495,3 +547,266 @@ private @nogc unittest
auto rc = defaultAllocator.refCounted!(int[])(5);
assert(rc.length == 5);
}
private @nogc unittest
{
static bool destroyed = false;
struct F
{
~this() @nogc
{
destroyed = true;
}
}
{
auto rc = defaultAllocator.refCounted!F();
}
assert(destroyed);
}
/**
* $(D_PSYMBOL Scoped) stores an object that gets destroyed at the end of its scope.
*
* Params:
* T = Value type.
*/
struct Scoped(T)
{
private Payload!T payload;
invariant
{
assert(payload is null || allocator_ !is null);
}
/**
* Takes ownership over $(D_PARAM value), setting the counter to 1.
* $(D_PARAM value) may be a pointer, an object or a dynamic array.
*
* Params:
* value = Value whose ownership is taken over.
* allocator = Allocator used to destroy the $(D_PARAM value) and to
* allocate/deallocate internal storage.
*
* Precondition: $(D_INLINECODE allocator !is null)
*/
this()(auto ref Payload!T value,
shared Allocator allocator = defaultAllocator)
{
this(allocator);
move(value, this.payload);
static if (__traits(isRef, value))
{
value = null;
}
}
/// Ditto.
this(shared Allocator allocator)
in
{
assert(allocator !is null);
}
body
{
this.allocator_ = allocator;
}
/**
* $(D_PSYMBOL Scoped) is noncopyable.
*/
@disable this(this);
/**
* Destroys the owned object.
*/
~this()
{
if (this.payload !is null)
{
allocator.dispose(this.payload);
}
}
/**
* Initialized this $(D_PARAM Scoped) and takes ownership over
* $(D_PARAM rhs).
*
* To reset $(D_PSYMBOL Scoped) assign $(D_KEYWORD null).
*
* If the allocator wasn't set before, $(D_PSYMBOL defaultAllocator) will
* be used. If you need a different allocator, create a new
* $(D_PSYMBOL Scoped) and assign it.
*
* Params:
* rhs = New object.
*
* Returns: $(D_KEYWORD this).
*/
ref typeof(this) opAssign()(auto ref Payload!T rhs)
{
allocator.dispose(this.payload);
move(rhs, this.payload);
return this;
}
/// Ditto.
ref typeof(this) opAssign(typeof(null))
{
allocator.dispose(this.payload);
return this;
}
/// Ditto.
ref typeof(this) opAssign(typeof(this) rhs)
{
swap(this.allocator_, rhs.allocator_);
swap(this.payload, rhs.payload);
return this;
}
/**
* Returns: Reference to the owned object.
*/
Payload!T get() pure nothrow @safe @nogc
{
return payload;
}
version (D_Ddoc)
{
/**
* Params:
* op = Operation.
*
* Dereferences the pointer. It is defined only for pointers, not for
* reference types like classes, that can be accessed directly.
*
* Returns: Reference to the pointed value.
*/
ref T opUnary(string op)()
if (op == "*");
}
else static if (isPointer!(Payload!T))
{
ref T opUnary(string op)()
if (op == "*")
{
return *payload;
}
}
mixin DefaultAllocator;
alias get this;
}
///
@nogc unittest
{
auto p = defaultAllocator.make!int(5);
auto s = Scoped!int(p, defaultAllocator);
assert(p is null);
assert(*s == 5);
}
///
@nogc unittest
{
static bool destroyed = false;
struct F
{
~this() @nogc
{
destroyed = true;
}
}
{
auto s = Scoped!F(defaultAllocator.make!F(), defaultAllocator);
}
assert(destroyed);
}
/**
* Constructs a new object of type $(D_PARAM T) and wraps it in a
* $(D_PSYMBOL Scoped) using $(D_PARAM args) as the parameter list for
* the constructor of $(D_PARAM T).
*
* Params:
* T = Type of the constructed object.
* A = Types of the arguments to the constructor of $(D_PARAM T).
* allocator = Allocator.
* args = Constructor arguments of $(D_PARAM T).
*
* Returns: Newly created $(D_PSYMBOL Scoped!T).
*
* Precondition: $(D_INLINECODE allocator !is null)
*/
Scoped!T scoped(T, A...)(shared Allocator allocator, auto ref A args)
if (!is(T == interface) && !isAbstractClass!T
&& !isAssociativeArray!T && !isArray!T)
in
{
assert(allocator !is null);
}
body
{
auto payload = allocator.make!(T, shared Allocator, A)(args);
return Scoped!T(payload, allocator);
}
/**
* Constructs a new array with $(D_PARAM size) elements and wraps it in a
* $(D_PSYMBOL Scoped).
*
* Params:
* T = Array type.
* size = Array size.
* allocator = Allocator.
*
* Returns: Newly created $(D_PSYMBOL Scoped!T).
*
* Precondition: $(D_INLINECODE allocator !is null
* && size <= size_t.max / ElementType!T.sizeof)
*/
Scoped!T scoped(T)(shared Allocator allocator, const size_t size)
@trusted
if (isArray!T)
in
{
assert(allocator !is null);
assert(size <= size_t.max / ElementType!T.sizeof);
}
body
{
auto payload = allocator.resize!(ElementType!T)(null, size);
return Scoped!T(payload, allocator);
}
private unittest
{
static assert(is(typeof(defaultAllocator.scoped!B(5))));
static assert(is(typeof(defaultAllocator.scoped!(int[])(5))));
}
private unittest
{
auto s = defaultAllocator.scoped!int(5);
assert(*s == 5);
s = null;
assert(s is null);
}
private unittest
{
auto s = defaultAllocator.scoped!int(5);
assert(*s == 5);
s = defaultAllocator.scoped!int(4);
assert(*s == 4);
}

View File

@ -3,7 +3,7 @@
* file, You can obtain one at http://mozilla.org/MPL/2.0/. */
/**
* Socket programming.
* Low-level socket programming.
*
* Copyright: Eugene Wissner 2016-2017.
* License: $(LINK2 https://www.mozilla.org/en-US/MPL/2.0/,
@ -12,15 +12,16 @@
*/
module tanya.network.socket;
import tanya.memory;
import core.stdc.errno;
import core.time;
import std.algorithm.comparison;
import std.algorithm.searching;
public import std.socket : socket_t, Linger, SocketOptionLevel, SocketOption,
SocketType, AddressFamily, AddressInfo;
public import std.socket : SocketOptionLevel, SocketOption;
import std.traits;
import std.typecons;
import tanya.memory;
/// Value returned by socket operations on error.
enum int socketError = -1;
version (Posix)
{
@ -32,7 +33,12 @@ version (Posix)
import core.sys.posix.sys.time;
import core.sys.posix.unistd;
private enum SOCKET_ERROR = -1;
enum SocketType : int
{
init = -1,
}
private alias LingerField = int;
}
else version (Windows)
{
@ -43,16 +49,23 @@ else version (Windows)
import core.sys.windows.windef;
import core.sys.windows.winsock2;
enum SocketType : size_t
{
init = ~0,
}
private alias LingerField = ushort;
enum : uint
{
IOC_UNIX = 0x00000000,
IOC_WS2 = 0x08000000,
IOC_PROTOCOL = 0x10000000,
IOC_VOID = 0x20000000, /// No parameters.
IOC_OUT = 0x40000000, /// Copy parameters back.
IOC_IN = 0x80000000, /// Copy parameters into.
IOC_VOID = 0x20000000, // No parameters.
IOC_OUT = 0x40000000, // Copy parameters back.
IOC_IN = 0x80000000, // Copy parameters into.
IOC_VENDOR = 0x18000000,
IOC_INOUT = (IOC_IN | IOC_OUT), /// Copy parameter into and get back.
IOC_INOUT = (IOC_IN | IOC_OUT), // Copy parameter into and get back.
}
template _WSAIO(int x, int y)
@ -183,19 +196,26 @@ else version (Windows)
LPINT,
SOCKADDR**,
LPINT);
const GUID WSAID_GETACCEPTEXSOCKADDRS = {0xb5367df2,0xcbac,0x11cf,[0x95,0xca,0x00,0x80,0x5f,0x48,0xa1,0x92]};
const GUID WSAID_GETACCEPTEXSOCKADDRS = {
0xb5367df2, 0xcbac, 0x11cf,
[ 0x95, 0xca, 0x00, 0x80, 0x5f, 0x48, 0xa1, 0x92 ],
};
struct WSABUF {
struct WSABUF
{
ULONG len;
CHAR* buf;
}
alias WSABUF* LPWSABUF;
struct WSAOVERLAPPED {
struct WSAOVERLAPPED
{
ULONG_PTR Internal;
ULONG_PTR InternalHigh;
union {
struct {
union
{
struct
{
DWORD Offset;
DWORD OffsetHigh;
}
@ -219,36 +239,13 @@ else version (Windows)
private WSABUF buffer;
}
/**
* Socket returned if a connection has been established.
*/
class OverlappedConnectedSocket : ConnectedSocket
{
/**
* Create a socket.
*
* Params:
* handle = Socket handle.
* af = Address family.
*/
this(socket_t handle, AddressFamily af) @nogc
this(SocketType handle, AddressFamily af) @nogc
{
super(handle, af);
}
/**
* Begins to asynchronously receive data from a connected socket.
*
* Params:
* buffer = Storage location for the received data.
* flags = Flags.
* overlapped = Unique operation identifier.
*
* Returns: $(D_KEYWORD true) if the operation could be finished synchronously.
* $(D_KEYWORD false) otherwise.
*
* Throws: $(D_PSYMBOL SocketException) if unable to receive.
*/
bool beginReceive(ubyte[] buffer,
SocketState overlapped,
Flags flags = Flags(Flag.none)) @nogc @trusted
@ -268,23 +265,13 @@ else version (Windows)
&overlapped.overlapped,
NULL);
if (result == SOCKET_ERROR && !wouldHaveBlocked)
if (result == socketError && !wouldHaveBlocked)
{
throw defaultAllocator.make!SocketException("Unable to receive");
}
return result == 0;
}
/**
* Ends a pending asynchronous read.
*
* Params
* overlapped = Unique operation identifier.
*
* Returns: Number of bytes received.
*
* Throws: $(D_PSYMBOL SocketException) if unable to receive.
*/
int endReceive(SocketState overlapped) @nogc @trusted
out (count)
{
@ -309,19 +296,6 @@ else version (Windows)
return lpNumber;
}
/**
* Sends data asynchronously to a connected socket.
*
* Params:
* buffer = Data to be sent.
* flags = Flags.
* overlapped = Unique operation identifier.
*
* Returns: $(D_KEYWORD true) if the operation could be finished synchronously.
* $(D_KEYWORD false) otherwise.
*
* Throws: $(D_PSYMBOL SocketException) if unable to send.
*/
bool beginSend(ubyte[] buffer,
SocketState overlapped,
Flags flags = Flags(Flag.none)) @nogc @trusted
@ -339,7 +313,7 @@ else version (Windows)
&overlapped.overlapped,
NULL);
if (result == SOCKET_ERROR && !wouldHaveBlocked)
if (result == socketError && !wouldHaveBlocked)
{
disconnected_ = true;
throw defaultAllocator.make!SocketException("Unable to send");
@ -347,16 +321,6 @@ else version (Windows)
return result == 0;
}
/**
* Ends a pending asynchronous send.
*
* Params
* overlapped = Unique operation identifier.
*
* Returns: Number of bytes sent.
*
* Throws: $(D_PSYMBOL SocketException) if unable to receive.
*/
int endSend(SocketState overlapped) @nogc @trusted
out (count)
{
@ -380,17 +344,9 @@ else version (Windows)
class OverlappedStreamSocket : StreamSocket
{
/// Accept extension function pointer.
// Accept extension function pointer.
package LPFN_ACCEPTEX acceptExtension;
/**
* Create a socket.
*
* Params:
* af = Address family.
*
* Throws: $(D_PSYMBOL SocketException) on errors.
*/
this(AddressFamily af) @nogc @trusted
{
super(af);
@ -412,28 +368,17 @@ else version (Windows)
&dwBytes,
NULL,
NULL);
if (!result == SOCKET_ERROR)
if (!result == socketError)
{
throw make!SocketException(defaultAllocator,
"Unable to retrieve an accept extension function pointer");
}
}
/**
* Begins an asynchronous operation to accept an incoming connection attempt.
*
* Params:
* overlapped = Unique operation identifier.
*
* Returns: $(D_KEYWORD true) if the operation could be finished synchronously.
* $(D_KEYWORD false) otherwise.
*
* Throws: $(D_PSYMBOL SocketException) on accept errors.
*/
bool beginAccept(SocketState overlapped) @nogc @trusted
{
auto socket = cast(socket_t) socket(addressFamily, SOCK_STREAM, 0);
if (socket == socket_t.init)
auto socket = cast(SocketType) socket(addressFamily, 1, 0);
if (socket == SocketType.init)
{
throw defaultAllocator.make!SocketException("Unable to create socket");
}
@ -445,7 +390,7 @@ else version (Windows)
overlapped.handle = cast(HANDLE) socket;
overlapped.event = OverlappedSocketEvent.accept;
immutable len = (sockaddr_in.sizeof + 16) * 2;
const len = (sockaddr_in.sizeof + 16) * 2;
overlapped.buffer.len = len;
overlapped.buffer.buf = cast(char*) defaultAllocator.allocate(len).ptr;
@ -465,6 +410,139 @@ else version (Windows)
return result == TRUE;
}
OverlappedConnectedSocket endAccept(SocketState overlapped)
@nogc @trusted
{
scope (exit)
{
defaultAllocator.dispose(overlapped.buffer.buf[0 .. overlapped.buffer.len]);
}
auto socket = make!OverlappedConnectedSocket(defaultAllocator,
cast(SocketType) overlapped.handle,
addressFamily);
scope (failure)
{
defaultAllocator.dispose(socket);
}
socket.setOption(SocketOptionLevel.SOCKET,
cast(SocketOption) SO_UPDATE_ACCEPT_CONTEXT,
cast(size_t) handle);
return socket;
}
}
}
else version (D_Ddoc)
{
/// Native socket representation type.
enum SocketType;
/**
* Socket returned if a connection has been established.
*
* Note: Available only on Windows.
*/
class OverlappedConnectedSocket : ConnectedSocket
{
/**
* Create a socket.
*
* Params:
* handle = Socket handle.
* af = Address family.
*/
this(SocketType handle, AddressFamily af) @nogc;
/**
* Begins to asynchronously receive data from a connected socket.
*
* Params:
* buffer = Storage location for the received data.
* flags = Flags.
* overlapped = Unique operation identifier.
*
* Returns: $(D_KEYWORD true) if the operation could be finished synchronously.
* $(D_KEYWORD false) otherwise.
*
* Throws: $(D_PSYMBOL SocketException) if unable to receive.
*/
bool beginReceive(ubyte[] buffer,
SocketState overlapped,
Flags flags = Flags(Flag.none)) @nogc @trusted;
/**
* Ends a pending asynchronous read.
*
* Params:
* overlapped = Unique operation identifier.
*
* Returns: Number of bytes received.
*
* Throws: $(D_PSYMBOL SocketException) if unable to receive.
*
* Postcondition: $(D_INLINECODE result >= 0).
*/
int endReceive(SocketState overlapped) @nogc @trusted;
/**
* Sends data asynchronously to a connected socket.
*
* Params:
* buffer = Data to be sent.
* flags = Flags.
* overlapped = Unique operation identifier.
*
* Returns: $(D_KEYWORD true) if the operation could be finished synchronously.
* $(D_KEYWORD false) otherwise.
*
* Throws: $(D_PSYMBOL SocketException) if unable to send.
*/
bool beginSend(ubyte[] buffer,
SocketState overlapped,
Flags flags = Flags(Flag.none)) @nogc @trusted;
/**
* Ends a pending asynchronous send.
*
* Params:
* overlapped = Unique operation identifier.
*
* Returns: Number of bytes sent.
*
* Throws: $(D_PSYMBOL SocketException) if unable to receive.
*
* Postcondition: $(D_INLINECODE result >= 0).
*/
int endSend(SocketState overlapped) @nogc @trusted;
}
/**
* Windows stream socket overlapped I/O.
*/
class OverlappedStreamSocket : StreamSocket
{
/**
* Create a socket.
*
* Params:
* af = Address family.
*
* Throws: $(D_PSYMBOL SocketException) on errors.
*/
this(AddressFamily af) @nogc @trusted;
/**
* Begins an asynchronous operation to accept an incoming connection attempt.
*
* Params:
* overlapped = Unique operation identifier.
*
* Returns: $(D_KEYWORD true) if the operation could be finished synchronously.
* $(D_KEYWORD false) otherwise.
*
* Throws: $(D_PSYMBOL SocketException) on accept errors.
*/
bool beginAccept(SocketState overlapped) @nogc @trusted;
/**
* Asynchronously accepts an incoming connection attempt and creates a
* new socket to handle remote host communication.
@ -476,24 +554,116 @@ else version (Windows)
*
* Throws: $(D_PSYMBOL SocketException) if unable to accept.
*/
OverlappedConnectedSocket endAccept(SocketState overlapped) @nogc @trusted
OverlappedConnectedSocket endAccept(SocketState overlapped)
@nogc @trusted;
}
}
/**
* Socket option that specifies what should happen when the socket that
* promises reliable delivery still has untransmitted messages when
* it is closed.
*/
struct Linger
{
/// If nonzero, $(D_PSYMBOL close) and $(D_PSYMBOL shutdown) block until
/// the data are transmitted or the timeout period has expired.
LingerField l_onoff;
/// Time, in seconds to wait before any buffered data to be sent is
/// discarded.
LingerField l_linger;
/**
* If $(D_PARAM timeout) is `0`, linger is disabled, otherwise enables the
* linger and sets the timeout.
*
* Params:
* timeout = Timeout, in seconds.
*/
this(const ushort timeout)
{
time = timeout;
}
///
unittest
{
{
scope (exit)
{
defaultAllocator.dispose(overlapped.buffer.buf[0 .. overlapped.buffer.len]);
}
auto socket = make!OverlappedConnectedSocket(defaultAllocator,
cast(socket_t) overlapped.handle,
addressFamily);
scope (failure)
{
defaultAllocator.dispose(socket);
}
socket.setOption(SocketOptionLevel.SOCKET,
cast(SocketOption) SO_UPDATE_ACCEPT_CONTEXT,
cast(size_t) handle);
return socket;
auto linger = Linger(5);
assert(linger.enabled);
assert(linger.time == 5);
}
{
auto linger = Linger(0);
assert(!linger.enabled);
}
{ // Default constructor.
Linger linger;
assert(!linger.enabled);
}
}
/**
* System dependent constructor.
*
* Params:
* l_onoff = $(D_PSYMBOL l_onoff) value.
* l_linger = $(D_PSYMBOL l_linger) value.
*/
this(LingerField l_onoff, LingerField l_linger)
{
this.l_onoff = l_onoff;
this.l_linger = l_linger;
}
///
unittest
{
auto linger = Linger(1, 5);
assert(linger.l_onoff == 1);
assert(linger.l_linger == 5);
}
/**
* Params:
* value = Whether to linger after the socket is closed.
*
* See_Also: $(D_PSYMBOL time).
*/
@property enabled(const bool value) pure nothrow @safe @nogc
{
this.l_onoff = value;
}
/**
* Returns: Whether to linger after the socket is closed.
*/
@property bool enabled() const pure nothrow @safe @nogc
{
return this.l_onoff != 0;
}
/**
* Returns: Timeout period, in seconds, to wait before closing the socket
* if the $(D_PSYMBOL Linger) is $(D_PSYMBOL enabled).
*/
@property ushort time() const pure nothrow @safe @nogc
{
return this.l_linger & ushort.max;
}
/**
* Sets timeout period, to wait before closing the socket if the
* $(D_PSYMBOL Linger) is $(D_PSYMBOL enabled), ignored otherwise.
*
* Params:
* timeout = Timeout period, in seconds.
*/
@property void time(const ushort timeout) pure nothrow @safe @nogc
{
this.l_onoff = timeout > 0;
this.l_linger = timeout;
}
}
@ -525,7 +695,7 @@ else version (DragonFlyBSD)
version (MacBSD)
{
enum ESOCKTNOSUPPORT = 44; /// Socket type not suppoted.
enum ESOCKTNOSUPPORT = 44; // Socket type not suppoted.
}
private immutable
@ -552,12 +722,32 @@ shared static this()
}
}
/**
* $(D_PSYMBOL AddressFamily) specifies a communication domain; this selects
* the protocol family which will be used for communication.
*/
enum AddressFamily : int
{
unspec = 0, /// Unspecified.
local = 1, /// Local to host (pipes and file-domain).
unix = local, /// POSIX name for PF_LOCAL.
inet = 2, /// IP protocol family.
ax25 = 3, /// Amateur Radio AX.25.
ipx = 4, /// Novell Internet Protocol.
appletalk = 5, /// Appletalk DDP.
netrom = 6, /// Amateur radio NetROM.
bridge = 7, /// Multiprotocol bridge.
atmpvc = 8, /// ATM PVCs.
x25 = 9, /// Reserved for X.25 project.
inet6 = 10, /// IP version 6.
}
/**
* Error codes for $(D_PSYMBOL Socket).
*/
enum SocketError : int
{
/// Unknown error
/// Unknown error.
unknown = 0,
/// Firewall rules forbid connection.
accessDenied = EPERM,
@ -587,12 +777,12 @@ enum SocketError : int
/**
* $(D_PSYMBOL SocketException) should be thrown only if one of the socket functions
* returns -1 or $(D_PSYMBOL SOCKET_ERROR) and sets $(D_PSYMBOL errno),
* because $(D_PSYMBOL SocketException) relies on the $(D_PSYMBOL errno) value.
* $(D_PSYMBOL socketError) and sets $(D_PSYMBOL errno), because
* $(D_PSYMBOL SocketException) relies on the $(D_PSYMBOL errno) value.
*/
class SocketException : Exception
{
immutable SocketError error = SocketError.unknown;
const SocketError error = SocketError.unknown;
/**
* Params:
@ -680,16 +870,16 @@ abstract class Socket
}
/// Socket handle.
protected socket_t handle_;
protected SocketType handle_;
/// Address family.
protected AddressFamily family;
private @property void handle(socket_t handle) @nogc
private @property void handle(SocketType handle) @nogc
in
{
assert(handle != socket_t.init);
assert(handle_ == socket_t.init, "Socket handle cannot be changed");
assert(handle != SocketType.init);
assert(handle_ == SocketType.init, "Socket handle cannot be changed");
}
body
{
@ -703,7 +893,7 @@ abstract class Socket
}
}
@property inout(socket_t) handle() inout const pure nothrow @safe @nogc
@property inout(SocketType) handle() inout const pure nothrow @safe @nogc
{
return handle_;
}
@ -715,10 +905,10 @@ abstract class Socket
* handle = Socket.
* af = Address family.
*/
this(socket_t handle, AddressFamily af) @nogc
this(SocketType handle, AddressFamily af) @nogc
in
{
assert(handle != socket_t.init);
assert(handle != SocketType.init);
}
body
{
@ -759,7 +949,7 @@ abstract class Socket
cast(int) level,
cast(int) option,
result.ptr,
&length) == SOCKET_ERROR)
&length) == socketError)
{
throw defaultAllocator.make!SocketException("Unable to get socket option");
}
@ -771,7 +961,7 @@ abstract class Socket
SocketOption option,
out size_t result) const @trusted @nogc
{
return getOption(level, option, (&result)[0..1]);
return getOption(level, option, (&result)[0 .. 1]);
}
/// Ditto.
@ -779,7 +969,7 @@ abstract class Socket
SocketOption option,
out Linger result) const @trusted @nogc
{
return getOption(level, option, (&result.clinger)[0..1]);
return getOption(level, option, (&result)[0 .. 1]);
}
/// Ditto.
@ -792,7 +982,7 @@ abstract class Socket
version (Posix)
{
timeval tv;
auto ret = getOption(level, option, (&tv)[0..1]);
auto ret = getOption(level, option, (&tv)[0 .. 1]);
result = dur!"seconds"(tv.tv_sec) + dur!"usecs"(tv.tv_usec);
}
else version (Windows)
@ -826,7 +1016,7 @@ abstract class Socket
cast(int)level,
cast(int)option,
value.ptr,
cast(uint) value.length) == SOCKET_ERROR)
cast(uint) value.length) == socketError)
{
throw defaultAllocator.make!SocketException("Unable to set socket option");
}
@ -836,14 +1026,14 @@ abstract class Socket
void setOption(SocketOptionLevel level, SocketOption option, size_t value)
const @trusted @nogc
{
setOption(level, option, (&value)[0..1]);
setOption(level, option, (&value)[0 .. 1]);
}
/// Ditto.
void setOption(SocketOptionLevel level, SocketOption option, Linger value)
const @trusted @nogc
{
setOption(level, option, (&value.clinger)[0..1]);
setOption(level, option, (&value)[0 .. 1]);
}
/// Ditto.
@ -854,7 +1044,7 @@ abstract class Socket
{
timeval tv;
value.split!("seconds", "usecs")(tv.tv_sec, tv.tv_usec);
setOption(level, option, (&tv)[0..1]);
setOption(level, option, (&tv)[0 .. 1]);
}
else version (Windows)
{
@ -878,7 +1068,7 @@ abstract class Socket
}
else version (Windows)
{
return blocking_;
return this.blocking_;
}
}
@ -892,12 +1082,12 @@ abstract class Socket
{
int fl = fcntl(handle_, F_GETFL, 0);
if (fl != SOCKET_ERROR)
if (fl != socketError)
{
fl = yes ? fl & ~O_NONBLOCK : fl | O_NONBLOCK;
fl = fcntl(handle_, F_SETFL, fl);
}
if (fl == SOCKET_ERROR)
if (fl == socketError)
{
throw make!SocketException(defaultAllocator,
"Unable to set socket blocking");
@ -906,12 +1096,12 @@ abstract class Socket
else version (Windows)
{
uint num = !yes;
if (ioctlsocket(handle_, FIONBIO, &num) == SOCKET_ERROR)
if (ioctlsocket(handle_, FIONBIO, &num) == socketError)
{
throw make!SocketException(defaultAllocator,
"Unable to set socket blocking");
}
blocking_ = yes;
this.blocking_ = yes;
}
}
@ -963,7 +1153,7 @@ abstract class Socket
{
.close(handle_);
}
handle_ = socket_t.init;
handle_ = SocketType.init;
}
/**
@ -971,12 +1161,12 @@ abstract class Socket
* can $(D_PSYMBOL listen).
*
* Params:
* backlog = Request of how many pending incoming connections are
* backlog = Request of how many pending incoming connections are
* queued until $(D_PSYMBOL accept)ed.
*/
void listen(int backlog) const @trusted @nogc
{
if (.listen(handle_, backlog) == SOCKET_ERROR)
if (.listen(handle_, backlog) == socketError)
{
throw defaultAllocator.make!SocketException("Unable to listen on socket");
}
@ -1029,8 +1219,8 @@ class StreamSocket : Socket, ConnectionOrientedSocket
*/
this(AddressFamily af) @trusted @nogc
{
auto handle = cast(socket_t) socket(af, SOCK_STREAM, 0);
if (handle == socket_t.init)
auto handle = cast(SocketType) socket(af, 1, 0);
if (handle == SocketType.init)
{
throw defaultAllocator.make!SocketException("Unable to create socket");
}
@ -1047,7 +1237,7 @@ class StreamSocket : Socket, ConnectionOrientedSocket
*/
void bind(Address address) const @trusted @nogc
{
if (.bind(handle_, address.name, address.length) == SOCKET_ERROR)
if (.bind(handle_, address.name, address.length) == socketError)
{
throw defaultAllocator.make!SocketException("Unable to bind socket");
}
@ -1066,7 +1256,7 @@ class StreamSocket : Socket, ConnectionOrientedSocket
*/
ConnectedSocket accept() @trusted @nogc
{
socket_t sock;
SocketType sock;
version (linux)
{
@ -1075,14 +1265,14 @@ class StreamSocket : Socket, ConnectionOrientedSocket
{
flags |= SOCK_NONBLOCK;
}
sock = cast(socket_t).accept4(handle_, null, null, flags);
sock = cast(SocketType).accept4(handle_, null, null, flags);
}
else
{
sock = cast(socket_t).accept(handle_, null, null);
sock = cast(SocketType).accept(handle_, null, null);
}
if (sock == socket_t.init)
if (sock == SocketType.init)
{
if (wouldHaveBlocked())
{
@ -1147,7 +1337,7 @@ class ConnectedSocket : Socket, ConnectionOrientedSocket
* handle = Socket.
* af = Address family.
*/
this(socket_t handle, AddressFamily af) @nogc
this(SocketType handle, AddressFamily af) @nogc
{
super(handle, af);
}
@ -1196,7 +1386,7 @@ class ConnectedSocket : Socket, ConnectionOrientedSocket
{
disconnected_ = true;
}
else if (ret == SOCKET_ERROR)
else if (ret == socketError)
{
if (wouldHaveBlocked())
{
@ -1233,7 +1423,7 @@ class ConnectedSocket : Socket, ConnectionOrientedSocket
}
sent = .send(handle_, buf.ptr, capToMaxBuffer(buf.length), sendFlags);
if (sent != SOCKET_ERROR)
if (sent != socketError)
{
return sent;
}
@ -1273,14 +1463,11 @@ class InternetAddress : Address
/// Internal internet address representation.
protected sockaddr_storage storage;
}
immutable ushort port_;
const ushort port_;
enum
{
anyPort = 0,
}
enum ushort anyPort = 0;
this(in string host, ushort port = anyPort) @nogc
this(string host, const ushort port = anyPort) @nogc
{
if (getaddrinfoPointer is null || freeaddrinfoPointer is null)
{
@ -1288,11 +1475,11 @@ class InternetAddress : Address
"Address info lookup is not available on this system");
}
addrinfo* ai_res;
port_ = port;
this.port_ = port;
// Make C-string from host.
auto node = cast(char[]) allocator.allocate(host.length + 1);
node[0.. $ - 1] = host;
node[0 .. $ - 1] = host;
node[$ - 1] = '\0';
scope (exit)
{
@ -1304,18 +1491,19 @@ class InternetAddress : Address
const(char)* servicePointer;
if (port)
{
ushort originalPort = port;
ushort start;
for (ushort j = 10, i = 4; i > 0; j *= 10, --i)
{
ushort rest = port % 10;
ushort rest = originalPort % 10;
if (rest != 0)
{
service[i] = cast(char) (rest + '0');
start = i;
}
port /= 10;
originalPort /= 10;
}
servicePointer = service[start..$].ptr;
servicePointer = service[start .. $].ptr;
}
auto ret = getaddrinfoPointer(node.ptr, servicePointer, null, &ai_res);
@ -1327,18 +1515,29 @@ class InternetAddress : Address
{
freeaddrinfoPointer(ai_res);
}
ubyte* dp = cast(ubyte*) &storage, sp = cast(ubyte*) ai_res.ai_addr;
for (auto i = ai_res.ai_addrlen; i > 0; --i, *dp++, *sp++)
{
*dp = *sp;
}
if (ai_res.ai_family != AddressFamily.INET && ai_res.ai_family != AddressFamily.INET6)
if (ai_res.ai_family != AddressFamily.inet && ai_res.ai_family != AddressFamily.inet6)
{
throw defaultAllocator.make!SocketException("Wrong address family");
}
}
///
unittest
{
auto address = defaultAllocator.make!InternetAddress("127.0.0.1");
assert(address.port == InternetAddress.anyPort);
assert(address.name !is null);
assert(address.family == AddressFamily.inet);
defaultAllocator.dispose(address);
}
/**
* Returns: Pointer to underlying $(D_PSYMBOL sockaddr) structure.
*/
@ -1355,9 +1554,9 @@ class InternetAddress : Address
// FreeBSD wants to know the exact length of the address on bind.
switch (family)
{
case AddressFamily.INET:
case AddressFamily.inet:
return sockaddr_in.sizeof;
case AddressFamily.INET6:
case AddressFamily.inet6:
return sockaddr_in6.sizeof;
default:
assert(false);
@ -1376,6 +1575,15 @@ class InternetAddress : Address
{
return port_;
}
///
unittest
{
auto address = defaultAllocator.make!InternetAddress("127.0.0.1",
cast(ushort) 1234);
assert(address.port == 1234);
defaultAllocator.dispose(address);
}
}
/**

107
source/tanya/typecons.d Normal file
View File

@ -0,0 +1,107 @@
/* This Source Code Form is subject to the terms of the Mozilla Public
* License, v. 2.0. If a copy of the MPL was not distributed with this
* file, You can obtain one at http://mozilla.org/MPL/2.0/. */
/**
* Type constructors.
*
* This module contains templates that allow to build new types from the
* available ones.
*
* Copyright: Eugene Wissner 2017.
* License: $(LINK2 https://www.mozilla.org/en-US/MPL/2.0/,
* Mozilla Public License, v. 2.0).
* Authors: $(LINK2 mailto:info@caraus.de, Eugene Wissner)
*/
module tanya.typecons;
import std.meta;
/**
* $(D_PSYMBOL Pair) can store two heterogeneous objects.
*
* The objects can by accessed by index as $(D_INLINECODE obj[0]) and
* $(D_INLINECODE obj[1]) or by optional names (e.g.
* $(D_INLINECODE obj.first)).
*
* $(D_PARAM Specs) contains a list of object types and names. First
* comes the object type, then an optional string containing the name.
* If you want the object be accessible only by its index (`0` or `1`),
* just skip the name.
*
* Params:
* Specs = Field types and names.
*/
template Pair(Specs...)
{
template parseSpecs(int fieldCount, Specs...)
{
static if (Specs.length == 0)
{
alias parseSpecs = AliasSeq!();
}
else static if (is(Specs[0]) && fieldCount < 2)
{
static if (is(typeof(Specs[1]) == string))
{
alias parseSpecs
= AliasSeq!(Specs[0],
parseSpecs!(fieldCount + 1, Specs[2 .. $]));
}
else
{
alias parseSpecs
= AliasSeq!(Specs[0],
parseSpecs!(fieldCount + 1, Specs[1 .. $]));
}
}
else
{
static assert(false, "Invalid argument: " ~ Specs[0].stringof);
}
}
struct Pair
{
/// Field types.
alias Types = parseSpecs!(0, Specs);
static assert(Types.length == 2, "Invalid argument count.");
// Create field aliases.
static if (is(typeof(Specs[1]) == string))
{
mixin("alias " ~ Specs[1] ~ " = expand[0];");
}
static if (is(typeof(Specs[2]) == string))
{
mixin("alias " ~ Specs[2] ~ " = expand[1];");
}
else static if (is(typeof(Specs[3]) == string))
{
mixin("alias " ~ Specs[3] ~ " = expand[1];");
}
/// Represents the values of the $(D_PSYMBOL Pair) as a list of values.
Types expand;
alias expand this;
}
}
///
unittest
{
static assert(is(Pair!(int, int)));
static assert(!is(Pair!(int, 5)));
static assert(is(Pair!(int, "first", int)));
static assert(is(Pair!(int, "first", int, "second")));
static assert(is(Pair!(int, "first", int)));
static assert(is(Pair!(int, int, "second")));
static assert(!is(Pair!("first", int, "second", int)));
static assert(!is(Pair!(int, int, int)));
static assert(!is(Pair!(int, "first")));
}