Add scalar type template parameter for buffers

This commit is contained in:
Eugen Wissner 2016-12-19 21:24:28 +01:00
parent f1bc4dc2e2
commit e32af2d09e
5 changed files with 582 additions and 573 deletions

View File

@ -30,7 +30,7 @@ class IOCPStreamTransport : StreamTransport
{ {
private OverlappedConnectedSocket socket_; private OverlappedConnectedSocket socket_;
private WriteBuffer input; private WriteBuffer!ubyte input;
/** /**
* Creates new completion port transport. * Creates new completion port transport.
@ -45,12 +45,7 @@ class IOCPStreamTransport : StreamTransport
body body
{ {
socket_ = socket; socket_ = socket;
input = MmapPool.instance.make!WriteBuffer(8192, MmapPool.instance); input = WriteBuffer!ubyte(8192, MmapPool.instance);
}
~this()
{
MmapPool.instance.dispose(input);
} }
@property inout(OverlappedConnectedSocket) socket() @property inout(OverlappedConnectedSocket) socket()

View File

@ -30,7 +30,7 @@ class SelectorStreamTransport : StreamTransport
private ConnectedSocket socket_; private ConnectedSocket socket_;
/// Input buffer. /// Input buffer.
package WriteBuffer input; package WriteBuffer!ubyte input;
private SelectorLoop loop; private SelectorLoop loop;
@ -46,15 +46,7 @@ class SelectorStreamTransport : StreamTransport
{ {
socket_ = socket; socket_ = socket;
this.loop = loop; this.loop = loop;
input = MmapPool.instance.make!WriteBuffer(8192, MmapPool.instance); input = WriteBuffer!ubyte(8192, MmapPool.instance);
}
/**
* Close the transport and deallocate the data buffers.
*/
~this() @nogc
{
MmapPool.instance.dispose(input);
} }
/** /**

View File

@ -139,7 +139,7 @@ class IOWatcher : ConnectionWatcher
/** /**
* Returns: Underlying output buffer. * Returns: Underlying output buffer.
*/ */
package ReadBuffer output; package ReadBuffer!ubyte output;
/** /**
* Params: * Params:
@ -157,7 +157,7 @@ class IOWatcher : ConnectionWatcher
super(); super();
transport_ = transport; transport_ = transport;
protocol_ = protocol; protocol_ = protocol;
output = MmapPool.instance.make!ReadBuffer(8192, 1024, MmapPool.instance); output = ReadBuffer!ubyte(8192, 1024, MmapPool.instance);
active = true; active = true;
} }
@ -166,7 +166,6 @@ class IOWatcher : ConnectionWatcher
*/ */
protected ~this() @nogc protected ~this() @nogc
{ {
MmapPool.instance.dispose(output);
MmapPool.instance.dispose(protocol_); MmapPool.instance.dispose(protocol_);
} }

View File

@ -10,6 +10,7 @@
*/ */
module tanya.container.buffer; module tanya.container.buffer;
import std.traits;
import tanya.memory; import tanya.memory;
version (unittest) version (unittest)
@ -42,36 +43,36 @@ version (unittest)
* available data. But only one asynchronous call at a time is supported. Be * available data. But only one asynchronous call at a time is supported. Be
* sure to call $(D_PSYMBOL ReadBuffer.clear()) before you append the result * sure to call $(D_PSYMBOL ReadBuffer.clear()) before you append the result
* of the pended asynchronous call. * of the pended asynchronous call.
*
* Params:
* T = Buffer type.
*/ */
class ReadBuffer struct ReadBuffer(T = ubyte)
if (isScalarType!T)
{ {
/// Internal buffer. /// Internal buffer.
protected ubyte[] buffer_; private T[] buffer_;
/// Filled buffer length. /// Filled buffer length.
protected size_t length_; private size_t length_;
/// Start of available data. /// Start of available data.
protected size_t start; private size_t start;
/// Last position returned with $(D_KEYWORD []). /// Last position returned with $(D_KEYWORD []).
protected size_t ring; private size_t ring;
/// Available space. /// Available space.
protected immutable size_t minAvailable; private immutable size_t minAvailable = 1024;
/// Size by which the buffer will grow. /// Size by which the buffer will grow.
protected immutable size_t blockSize; private immutable size_t blockSize = 8192;
/// Allocator. invariant
protected shared Allocator allocator;
@nogc invariant
{ {
assert(length_ <= buffer_.length); assert(length_ <= buffer_.length);
assert(blockSize > 0); assert(blockSize > 0);
assert(minAvailable > 0); assert(minAvailable > 0);
assert(allocator !is null);
} }
/** /**
@ -85,38 +86,47 @@ class ReadBuffer
* $(D_PSYMBOL free) < $(D_PARAM minAvailable)). * $(D_PSYMBOL free) < $(D_PARAM minAvailable)).
* allocator = Allocator. * allocator = Allocator.
*/ */
this(size_t size = 8192, this(in size_t size,
size_t minAvailable = 1024, in size_t minAvailable = 1024,
shared Allocator allocator = defaultAllocator) @nogc shared Allocator allocator = defaultAllocator) @trusted
{ {
this(allocator_);
this.minAvailable = minAvailable; this.minAvailable = minAvailable;
this.blockSize = size; this.blockSize = size;
this.allocator = allocator; buffer_ = cast(T[]) allocator_.allocate(size * T.sizeof);
allocator.resizeArray!ubyte(buffer_, size); }
/// Ditto.
this(shared Allocator allocator)
in
{
assert(allocator_ is null);
}
body
{
allocator_ = allocator;
} }
/** /**
* Deallocates the internal buffer. * Deallocates the internal buffer.
*/ */
~this() @nogc ~this() @trusted
{ {
allocator.dispose(buffer_); allocator.deallocate(buffer_);
} }
/// ///
unittest unittest
{ {
auto b = defaultAllocator.make!ReadBuffer; ReadBuffer!ubyte b;
assert(b.capacity == 8192); assert(b.capacity == 0);
assert(b.length == 0); assert(b.length == 0);
defaultAllocator.dispose(b);
} }
/** /**
* Returns: The size of the internal buffer. * Returns: The size of the internal buffer.
*/ */
@property size_t capacity() const @nogc @safe pure nothrow @property size_t capacity() const
{ {
return buffer_.length; return buffer_.length;
} }
@ -124,26 +134,28 @@ class ReadBuffer
/** /**
* Returns: Data size. * Returns: Data size.
*/ */
@property size_t length() const @nogc @safe pure nothrow @property size_t length() const
{ {
return length_ - start; return length_ - start;
} }
/// Ditto.
alias opDollar = length;
/** /**
* Clears the buffer. * Clears the buffer.
* *
* Returns: $(D_KEYWORD this). * Returns: $(D_KEYWORD this).
*/ */
ReadBuffer clear() pure nothrow @safe @nogc void clear()
{ {
start = length_ = ring; start = length_ = ring;
return this;
} }
/** /**
* Returns: Available space. * Returns: Available space.
*/ */
@property size_t free() const pure nothrow @safe @nogc @property size_t free() const
{ {
return length > ring ? capacity - length : capacity - ring; return length > ring ? capacity - length : capacity - ring;
} }
@ -151,19 +163,17 @@ class ReadBuffer
/// ///
unittest unittest
{ {
auto b = defaultAllocator.make!ReadBuffer; ReadBuffer!ubyte b;
size_t numberRead; size_t numberRead;
// Fills the buffer with values 0..10 assert(b.free == 0);
assert(b.free == b.blockSize);
// Fills the buffer with values 0..10
numberRead = fillBuffer(b[], b.free, 0, 10); numberRead = fillBuffer(b[], b.free, 0, 10);
b += numberRead; b += numberRead;
assert(b.free == b.blockSize - numberRead); assert(b.free == b.blockSize - numberRead);
b.clear(); b.clear();
assert(b.free == b.blockSize); assert(b.free == b.blockSize);
defaultAllocator.dispose(b);
} }
/** /**
@ -174,7 +184,7 @@ class ReadBuffer
* *
* Returns: $(D_KEYWORD this). * Returns: $(D_KEYWORD this).
*/ */
ReadBuffer opOpAssign(string op)(size_t length) @nogc ref ReadBuffer opOpAssign(string op)(in size_t length)
if (op == "+") if (op == "+")
{ {
length_ += length; length_ += length;
@ -185,7 +195,7 @@ class ReadBuffer
/// ///
unittest unittest
{ {
auto b = defaultAllocator.make!ReadBuffer; ReadBuffer!ubyte b;
size_t numberRead; size_t numberRead;
ubyte[] result; ubyte[] result;
@ -212,16 +222,6 @@ class ReadBuffer
assert(result[9] == 9); assert(result[9] == 9);
assert(result[10] == 20); assert(result[10] == 20);
assert(result[14] == 24); assert(result[14] == 24);
defaultAllocator.dispose(b);
}
/**
* Returns: Length of available data.
*/
@property size_t opDollar() const pure nothrow @safe @nogc
{
return length;
} }
/** /**
@ -231,7 +231,7 @@ class ReadBuffer
* *
* Returns: Array between $(D_PARAM start) and $(D_PARAM end). * Returns: Array between $(D_PARAM start) and $(D_PARAM end).
*/ */
@property ubyte[] opSlice(size_t start, size_t end) pure nothrow @safe @nogc T[] opSlice(in size_t start, in size_t end)
{ {
return buffer_[this.start + start .. this.start + end]; return buffer_[this.start + start .. this.start + end];
} }
@ -243,11 +243,11 @@ class ReadBuffer
* *
* Returns: A free chunk of the buffer. * Returns: A free chunk of the buffer.
*/ */
ubyte[] opIndex() @nogc T[] opIndex()
{ {
if (start > 0) if (start > 0)
{ {
auto ret = buffer_[0..start]; auto ret = buffer_[0 .. start];
ring = 0; ring = 0;
return ret; return ret;
} }
@ -255,17 +255,23 @@ class ReadBuffer
{ {
if (capacity - length < minAvailable) if (capacity - length < minAvailable)
{ {
allocator.resizeArray!ubyte(buffer_, capacity + blockSize); void[] buf = buffer_;
immutable cap = capacity;
() @trusted {
allocator.reallocate(buf, (cap + blockSize) * T.sizeof);
buffer_ = cast(T[]) buf;
}();
buffer_[cap .. $] = T.init;
} }
ring = length_; ring = length_;
return buffer_[length_..$]; return buffer_[length_ .. $];
} }
} }
/// ///
unittest unittest
{ {
auto b = defaultAllocator.make!ReadBuffer; ReadBuffer!ubyte b;
size_t numberRead; size_t numberRead;
ubyte[] result; ubyte[] result;
@ -279,46 +285,51 @@ class ReadBuffer
assert(result[9] == 9); assert(result[9] == 9);
b.clear(); b.clear();
assert(b.length == 0); assert(b.length == 0);
defaultAllocator.dispose(b);
} }
mixin DefaultAllocator;
}
private unittest
{
static assert(is(ReadBuffer!int));
} }
/** /**
* Circular, self-expanding buffer with overflow support. Can be used with * Circular, self-expanding buffer with overflow support. Can be used with
* functions returning returning the number of the transferred bytes. * functions returning the number of the transferred bytes.
* *
* The buffer is optimized for situations where you read all the data from it * The buffer is optimized for situations where you read all the data from it
* at once (without writing to it occasionally). It can become ineffective if * at once (without writing to it occasionally). It can become ineffective if
* you permanently keep some data in the buffer and alternate writing and * you permanently keep some data in the buffer and alternate writing and
* reading, because it may allocate and move elements. * reading, because it may allocate and move elements.
*
* Params:
* T = Buffer type.
*/ */
class WriteBuffer struct WriteBuffer(T = ubyte)
if (isScalarType!T)
{ {
/// Internal buffer. /// Internal buffer.
protected ubyte[] buffer_; private T[] buffer_;
/// Buffer start position. /// Buffer start position.
protected size_t start; private size_t start;
/// Buffer ring area size. After this position begins buffer overflow area. /// Buffer ring area size. After this position begins buffer overflow area.
protected size_t ring; private size_t ring;
/// Size by which the buffer will grow. /// Size by which the buffer will grow.
protected immutable size_t blockSize; private immutable size_t blockSize;
/// The position of the free area in the buffer. /// The position of the free area in the buffer.
protected size_t position; private size_t position;
/// Allocator. invariant
protected shared Allocator allocator;
@nogc invariant
{ {
assert(blockSize > 0); assert(blockSize > 0);
// position can refer to an element outside the buffer if the buffer is full. // Position can refer to an element outside the buffer if the buffer is full.
assert(position <= buffer_.length); assert(position <= buffer_.length);
assert(allocator !is null);
} }
/** /**
@ -327,40 +338,57 @@ class WriteBuffer
* grow. * grow.
* allocator = Allocator. * allocator = Allocator.
*/ */
this(size_t size = 8192, shared Allocator allocator = defaultAllocator) this(in size_t size, shared Allocator allocator = defaultAllocator) @trusted
@nogc in
{
assert(size > 0);
}
body
{ {
this.allocator = allocator;
blockSize = size; blockSize = size;
ring = size - 1; ring = size - 1;
allocator.resizeArray!ubyte(buffer_, size); this(allocator);
buffer_ = cast(T[]) allocator_.allocate(size * T.sizeof);
}
@disable this();
/// Ditto.
this(shared Allocator allocator)
in
{
assert(allocator !is null);
}
body
{
allocator_ = allocator;
} }
/** /**
* Deallocates the internal buffer. * Deallocates the internal buffer.
*/ */
~this() @nogc ~this()
{ {
allocator.dispose(buffer_); allocator.deallocate(buffer_);
} }
/** /**
* Returns: The size of the internal buffer. * Returns: The size of the internal buffer.
*/ */
@property size_t capacity() const @nogc @safe pure nothrow @property size_t capacity() const
{ {
return buffer_.length; return buffer_.length;
} }
/** /**
* Note that $(D_PSYMBOL length) doesn't return the real length of the data, * Note that $(D_PSYMBOL length) doesn't return the real length of the data,
* but only the array length that will be returned with $(D_PSYMBOL buffer) * but only the array length that will be returned with $(D_PSYMBOL opIndex)
* next time. Be sure to call $(D_PSYMBOL buffer) and set $(D_KEYWORD +=) * next time. Be sure to call $(D_PSYMBOL opIndex) and set $(D_KEYWORD +=)
* until $(D_PSYMBOL length) returns 0. * until $(D_PSYMBOL length) returns 0.
* *
* Returns: Data size. * Returns: Data size.
*/ */
@property size_t length() const @nogc @safe pure nothrow @property size_t length() const
{ {
if (position > ring || position < start) // Buffer overflowed if (position > ring || position < start) // Buffer overflowed
{ {
@ -372,18 +400,13 @@ class WriteBuffer
} }
} }
/** /// Ditto.
* Returns: Length of available data. alias opDollar = length;
*/
@property size_t opDollar() const pure nothrow @safe @nogc
{
return length;
}
/// ///
unittest unittest
{ {
auto b = defaultAllocator.make!WriteBuffer(4); auto b = WriteBuffer!ubyte(4);
ubyte[3] buf = [48, 23, 255]; ubyte[3] buf = [48, 23, 255];
b ~= buf; b ~= buf;
@ -400,14 +423,12 @@ class WriteBuffer
assert(b.length == 5); assert(b.length == 5);
b += b.length; b += b.length;
assert(b.length == 0); assert(b.length == 0);
defaultAllocator.dispose(b);
} }
/** /**
* Returns: Available space. * Returns: Available space.
*/ */
@property size_t free() const @nogc @safe pure nothrow @property size_t free() const
{ {
return capacity - length; return capacity - length;
} }
@ -416,9 +437,9 @@ class WriteBuffer
* Appends data to the buffer. * Appends data to the buffer.
* *
* Params: * Params:
* buffer = Buffer chunk got with $(D_PSYMBOL buffer). * buffer = Buffer chunk got with $(D_PSYMBOL opIndex).
*/ */
WriteBuffer opOpAssign(string op)(ubyte[] buffer) @nogc ref WriteBuffer opOpAssign(string op)(in T[] buffer)
if (op == "~") if (op == "~")
{ {
size_t end, start; size_t end, start;
@ -433,7 +454,7 @@ class WriteBuffer
end = afterRing; end = afterRing;
} }
start = end - position; start = end - position;
buffer_[position..end] = buffer[0..start]; buffer_[position .. end] = buffer[0 .. start];
if (end == afterRing) if (end == afterRing)
{ {
position = this.start == 0 ? afterRing : 0; position = this.start == 0 ? afterRing : 0;
@ -453,7 +474,7 @@ class WriteBuffer
end = this.start; end = this.start;
} }
auto areaEnd = end - position + start; auto areaEnd = end - position + start;
buffer_[position..end] = buffer[start..areaEnd]; buffer_[position .. end] = buffer[start .. areaEnd];
position = end == this.start ? ring + 1 : end - position; position = end == this.start ? ring + 1 : end - position;
start = areaEnd; start = areaEnd;
} }
@ -464,11 +485,14 @@ class WriteBuffer
end = position + buffer.length - start; end = position + buffer.length - start;
if (end > capacity) if (end > capacity)
{ {
auto newSize = end / blockSize * blockSize + blockSize; auto newSize = (end / blockSize * blockSize + blockSize) * T.sizeof;
() @trusted {
allocator.resizeArray!ubyte(buffer_, newSize); void[] buf = buffer_;
allocator.reallocate(buf, newSize);
buffer_ = cast(T[]) buf;
}();
} }
buffer_[position..end] = buffer[start..$]; buffer_[position .. end] = buffer[start .. $];
position = end; position = end;
if (this.start == 0) if (this.start == 0)
{ {
@ -482,7 +506,7 @@ class WriteBuffer
/// ///
unittest unittest
{ {
auto b = defaultAllocator.make!WriteBuffer(4); auto b = WriteBuffer!ubyte(4);
ubyte[3] buf = [48, 23, 255]; ubyte[3] buf = [48, 23, 255];
b ~= buf; b ~= buf;
@ -501,30 +525,25 @@ class WriteBuffer
assert(b.buffer_[0] == 23 && b.buffer_[1] == 255 assert(b.buffer_[0] == 23 && b.buffer_[1] == 255
&& b.buffer_[2] == 48 && b.buffer_[3] == 23 && b.buffer_[4] == 255); && b.buffer_[2] == 48 && b.buffer_[3] == 23 && b.buffer_[4] == 255);
defaultAllocator.dispose(b); b = WriteBuffer!ubyte(2);
b = make!WriteBuffer(defaultAllocator, 2);
b ~= buf; b ~= buf;
assert(b.start == 0); assert(b.start == 0);
assert(b.capacity == 4); assert(b.capacity == 4);
assert(b.ring == 3); assert(b.ring == 3);
assert(b.position == 3); assert(b.position == 3);
defaultAllocator.dispose(b);
} }
/** /**
* Sets how many bytes were written. It will shrink the buffer * Sets how many bytes were written. It will shrink the buffer
* appropriately. Always set this property after calling * appropriately. Always call it after $(D_PSYMBOL opIndex).
* $(D_PSYMBOL buffer).
* *
* Params: * Params:
* length = Length of the written data. * length = Length of the written data.
* *
* Returns: $(D_KEYWORD this). * Returns: $(D_KEYWORD this).
*/ */
@property WriteBuffer opOpAssign(string op)(size_t length) pure nothrow @safe @nogc ref WriteBuffer opOpAssign(string op)(in size_t length)
if (op == "+") if (op == "+")
in in
{ {
@ -551,19 +570,21 @@ class WriteBuffer
{ {
auto overflow = position - afterRing; auto overflow = position - afterRing;
if (overflow > length) { if (overflow > length)
buffer_[start.. start + length] = buffer_[afterRing.. afterRing + length]; {
buffer_[afterRing.. afterRing + length] = buffer_[afterRing + length ..position]; immutable afterLength = afterRing + length;
buffer_[start .. start + length] = buffer_[afterRing .. afterLength];
buffer_[afterRing .. afterLength] = buffer_[afterLength .. position];
position -= length; position -= length;
} }
else if (overflow == length) else if (overflow == length)
{ {
buffer_[start.. start + overflow] = buffer_[afterRing..position]; buffer_[start .. start + overflow] = buffer_[afterRing .. position];
position -= overflow; position -= overflow;
} }
else else
{ {
buffer_[start.. start + overflow] = buffer_[afterRing..position]; buffer_[start .. start + overflow] = buffer_[afterRing .. position];
position = overflow; position = overflow;
} }
start += length; start += length;
@ -588,7 +609,7 @@ class WriteBuffer
/// ///
unittest unittest
{ {
auto b = defaultAllocator.make!WriteBuffer; auto b = WriteBuffer!ubyte(6);
ubyte[6] buf = [23, 23, 255, 128, 127, 9]; ubyte[6] buf = [23, 23, 255, 128, 127, 9];
b ~= buf; b ~= buf;
@ -597,8 +618,6 @@ class WriteBuffer
assert(b.length == 4); assert(b.length == 4);
b += 4; b += 4;
assert(b.length == 0); assert(b.length == 0);
defaultAllocator.dispose(b);
} }
/** /**
@ -607,62 +626,67 @@ class WriteBuffer
* After calling it, set $(D_KEYWORD +=) to the length could be * After calling it, set $(D_KEYWORD +=) to the length could be
* written. * written.
* *
* $(D_PSYMBOL buffer) may return only part of the data. You may need * $(D_PSYMBOL opIndex) may return only part of the data. You may need
* to call it (and set $(D_KEYWORD +=) several times until * to call it (and set $(D_KEYWORD +=) several times until
* $(D_PSYMBOL length) is 0. If all the data can be written, * $(D_PSYMBOL length) is 0. If all the data can be written,
* maximally 3 calls are required. * maximally 3 calls are required.
* *
* Returns: A chunk of data buffer. * Returns: A chunk of data buffer.
*/ */
@property ubyte[] opSlice(size_t start, size_t end) pure nothrow @safe @nogc T[] opSlice(in size_t start, in size_t end)
{ {
immutable internStart = this.start + start; immutable internStart = this.start + start;
if (position > ring || position < start) // Buffer overflowed if (position > ring || position < start) // Buffer overflowed
{ {
return buffer_[this.start.. ring + 1 - length + end]; return buffer_[this.start .. ring + 1 - length + end];
} }
else else
{ {
return buffer_[this.start.. this.start + end]; return buffer_[this.start .. this.start + end];
} }
} }
/// ///
unittest unittest
{ {
auto b = defaultAllocator.make!WriteBuffer(6); auto b = WriteBuffer!ubyte(6);
ubyte[6] buf = [23, 23, 255, 128, 127, 9]; ubyte[6] buf = [23, 23, 255, 128, 127, 9];
b ~= buf; b ~= buf;
assert(b[0..$] == buf[0..6]); assert(b[0 .. $] == buf[0 .. 6]);
b += 2; b += 2;
assert(b[0..$] == buf[2..6]); assert(b[0 .. $] == buf[2 .. 6]);
b ~= buf; b ~= buf;
assert(b[0..$] == buf[2..6]); assert(b[0 .. $] == buf[2 .. 6]);
b += b.length; b += b.length;
assert(b[0..$] == buf[0..6]); assert(b[0 .. $] == buf[0 .. 6]);
b += b.length; b += b.length;
defaultAllocator.dispose(b);
} }
/** /**
* After calling it, set $(D_KEYWORD +=) to the length could be * After calling it, set $(D_KEYWORD +=) to the length could be
* written. * written.
* *
* $(D_PSYMBOL buffer) may return only part of the data. You may need * $(D_PSYMBOL opIndex) may return only part of the data. You may need
* to call it (and set $(D_KEYWORD +=) several times until * to call it (and set $(D_KEYWORD +=) several times until
* $(D_PSYMBOL length) is 0. If all the data can be written, * $(D_PSYMBOL length) is 0. If all the data can be written,
* maximally 3 calls are required. * maximally 3 calls are required.
* *
* Returns: A chunk of data buffer. * Returns: A chunk of data buffer.
*/ */
@property ubyte[] opIndex() pure nothrow @safe @nogc T[] opIndex()
{ {
return opSlice(0, length); return opSlice(0, length);
} }
mixin DefaultAllocator;
}
private unittest
{
static assert(is(WriteBuffer!int));
} }

View File

@ -54,10 +54,9 @@ interface Allocator
/** /**
* The mixin generates common methods for classes and structs using * The mixin generates common methods for classes and structs using
* allocators. It provides a protected member, constructor and a read-only * allocators. It provides a protected member, constructor and a read-only property,
* property, that checks if an allocator was already set and sets it to the * that checks if an allocator was already set and sets it to the default
* default one, if not (useful for structs which don't have a default * one, if not (useful for structs which don't have a default constructor).
* constructor).
*/ */
mixin template DefaultAllocator() mixin template DefaultAllocator()
{ {