module jaster.stream.memory;

private
{
    import std.experimental.allocator, std.experimental.allocator.gc_allocator, std.experimental.allocator.mallocator;
    import jaster.stream.core, jaster.stream.util;
}

/// A `MemoryStream` that uses the GC.
alias MemoryStreamGC = MemoryStream!GCAllocator;

/// A `MemoryStream` that uses malloc
alias MemoryStreamMalloc = MemoryStream!Mallocator;

/++
 + A `Stream` that writes into a memory buffer.
 +
 + This stream supports `std.experimental.allocator` for all of its allocations and freeing.
 +
 + This stream will automatically call `dispose` during its destructor.
 +
 + Outside of exceptions, this stream doesn't allocate GC memory unless the given allocator does.
 +
 + Invariant maintained by this class: the cursor (`position`) never exceeds `length`.
 + ++/
class MemoryStream(Alloc) : Stream
{
    private
    {
        ubyte[]           _buffer;     // Backing storage, owned by `_alloc`. `null` when empty or disposed.
        bool              _isDisposed; // Set once by `dispose`; most operations throw afterwards.
        size_t            _position;   // Cursor into `_buffer`, kept <= `_buffer.length`.
        AllocHelper!Alloc _alloc;      // Wrapper around the allocator used for `_buffer`.
    }

    /// Creates an empty, readable, writable and seekable stream (allocators that need no instance).
    static if(_alloc.IsStatic)
    this()
    {
        super(Stream.Can.Write | Stream.Can.Read | Stream.Can.Seek);
    }

    /++
     + Creates an empty, readable, writable and seekable stream.
     +
     + Params:
     +  alloc = The allocator instance used for every allocation made by this stream.
     + ++/
    static if(!_alloc.IsStatic)
    this(Alloc alloc)
    {
        this._alloc.alloc = alloc;
        super(Stream.Can.Write | Stream.Can.Read | Stream.Can.Seek);
    }

    // NOTE(review): if this runs as a GC finalizer, the allocator may not be usable
    // (e.g. a GC-backed allocator, or an allocator instance that was itself GC-allocated).
    // Prefer calling `dispose` explicitly - TODO confirm.
    ~this()
    {
        this.dispose();
    }

    public override
    {
        /++
         + Writes `data` at the current position, growing the stream if required,
         + and moves the position to the end of the written data.
         +
         + Returns:
         +  `data`, unchanged.
         +
         + Throws:
         +  Whatever `enforceNotDisposed`/`enforceCanWrite` throw, and `StreamException` if the buffer cannot be grown.
         + ++/
        const(ubyte[]) write(scope const ubyte[] data)
        {
            enforceNotDisposed(this);
            enforceCanWrite(this);

            auto end = (this.position + data.length);
            if(end > this.length)
                this.length = end;

            this._buffer[this.position..end] = data[];
            this.position = end;

            return data;
        }

        /++
         + Copies up to `buffer.length` bytes, starting at the current position, into `buffer`,
         + and advances the position by the amount copied.
         +
         + Returns:
         +  The slice of `buffer` that was filled. It is empty when the position is at the end of the stream.
         + ++/
        ubyte[] readToBuffer(scope ref ubyte[] buffer)
        {
            enforceNotDisposed(this);
            enforceCanRead(this);

            auto end = (this.position + buffer.length);
            if(end > this.length)
                end = this.length;

            // Cannot underflow: `position <= length` is maintained by the `position` and `length` setters.
            auto amount = (end - this.position);

            buffer[0..amount] = this._buffer[this.position..end];
            this.position = end;

            return buffer[0..amount];
        }

        /++
         + Releases the buffer back to the allocator and marks the stream as disposed.
         +
         + Calling this more than once is harmless; only the first call does anything.
         + ++/
        void dispose()
        {
            if(!this.isDisposed)
            {
                this._isDisposed = true;
                this._alloc.dispose(this._buffer);

                // Drop the reference so that `data` can never hand out a slice of released memory.
                this._buffer = null;
            }
        }

        /// Does nothing; every write goes directly into the memory buffer.
        void flush(){}

        /++
         + Moves the position to `amount` bytes relative to `from`.
         +
         + The result is validated by the `position` setter, so a target outside of `0..length`
         + (including a negative offset that wraps around) throws a `StreamException`.
         + ++/
        void seek(SeekFrom from, ptrdiff_t amount)
        {
            auto start = (from == SeekFrom.Start)
                         ? 0
                         : (from == SeekFrom.Current)
                            ? this.position
                            : this.length;
            this.position = start + amount;
        }

        /// Returns: Whether `dispose` has been called on this stream.
        @property
        bool isDisposed() const
        {
            return this._isDisposed;
        }

        /// Always throws `StreamCannotTimeoutException` (after the disposal check).
        @property
        void writeTimeout(Duration dur)
        {
            enforceNotDisposed(this);
            throw new StreamCannotTimeoutException();
        }

        /// ditto
        @property
        Duration writeTimeout()
        {
            enforceNotDisposed(this);
            throw new StreamCannotTimeoutException();
        }

        /// ditto
        @property
        void readTimeout(Duration dur)
        {
            enforceNotDisposed(this);
            throw new StreamCannotTimeoutException();
        }

        /// ditto
        @property
        Duration readTimeout()
        {
            enforceNotDisposed(this);
            throw new StreamCannotTimeoutException();
        }

        /++
         + Resizes the stream to exactly `size` bytes.
         +
         + If the stream shrinks below the current position, the position is moved back to the new end,
         + so it never points outside of the buffer.
         +
         + Throws:
         +  `StreamException` if the allocator fails to allocate, grow or shrink the buffer.
         + ++/
        @property
        void length(size_t size)
        {
            import std.exception : enforce;

            enforceNotDisposed(this);

            // Allocation results are checked with `enforce` rather than `assert`,
            // so that failures are still detected when asserts are compiled out.
            if(this._buffer.ptr is null)
            {
                this._buffer = this._alloc.makeArray!ubyte(size);
                enforce!StreamException(this._buffer.length == size, "Failed to allocate the stream's buffer.");
            }
            else if(size > this.length)
            {
                auto success = this._alloc.expandArray(this._buffer, (size - this.length));
                enforce!StreamException(success, "Failed to grow the stream's buffer.");
            }
            else if(size < this.length)
            {
                auto success = this._alloc.shrinkArray(this._buffer, (this.length - size));
                enforce!StreamException(success, "Failed to shrink the stream's buffer.");
            }

            // Keep `position <= length`; otherwise `readToBuffer` would underflow when computing its read size.
            if(this._position > this._buffer.length)
                this._position = this._buffer.length;
        }

        /// Returns: The size of the stream's buffer, in bytes.
        @property
        size_t length()
        {
            enforceNotDisposed(this);
            return this._buffer.length;
        }

        /++
         + Moves the cursor to `pos`.
         +
         + Throws:
         +  `StreamException` if `pos` is greater than `length`.
         + ++/
        @property
        void position(size_t pos)
        {
            import std.exception : enforce;

            enforceNotDisposed(this);
            enforce!StreamException(pos <= this.length, "Attempted to seek past the end of the stream.");
            this._position = pos;
        }

        /// Returns: The cursor's current offset from the start of the stream.
        @property
        size_t position()
        {
            enforceNotDisposed(this);
            return this._position;
        }
    }

    public
    {
        /++
         + Gets the internal buffer of the stream.
         +
         + Notes:
         +  While this function by itself is `@safe`, usage of this function is unsafe.
         +
         +  This is because the buffer can be reallocated and deallocated during certain usage of the stream, so
         +  the slice returned by this function can point to freed/invalid memory, causing a crash (hopefully).
         +
         +  Once the stream has been disposed, this returns `null`.
         + ++/
        @property @nogc
        inout(ubyte[]) data() nothrow inout
        {
            return this._buffer;
        }
    }
}
///
unittest
{
    import std.experimental.allocator.mallocator      : Mallocator;
    import std.experimental.allocator.building_blocks : StatsCollector, Options;

    alias Alloc   = StatsCollector!(Mallocator, Options.all, Options.all);
    alias MStream = MemoryStream!(Alloc*);
    auto alloc    = new Alloc();

    auto stream = new MStream(alloc);
    assert(stream.position == 0);
    assert(stream.length == 0);
    assert(stream.canWrite);
    assert(stream.canRead);
    assert(stream.canSeek);
    assert(!stream.canTimeout);
    assert(!stream.isDisposed);

    stream.write([0, 0, 0, 0]); // Testing to see if it cleans memory after use.
    stream.dispose();
    assert(stream.isDisposed);

    stream = new MStream(alloc);
    stream.write([0, 1, 2, 3, 4]);
    assert(stream.position == 5);
    assert(stream.length == 5);
    assert(stream.data == [0, 1, 2, 3, 4]);

    stream.length = 20;
    assert(stream.length == 20);

    stream.length = 5;
    assert(stream.length == 5);

    stream.position = stream.position - 2;
    assert(stream.read(2) == [3, 4]);

    assert(alloc.bytesUsed > 0);
    destroy(stream); // To simulate the GC collecting it
    // import std.stdio;
    // alloc.reportStatistics(stdout);

    assert(alloc.bytesUsed == 0);
}

// Regression test: shrinking the stream below the cursor must pull the cursor back,
// and disposing must not leave a dangling buffer behind.
unittest
{
    import std.experimental.allocator.mallocator      : Mallocator;
    import std.experimental.allocator.building_blocks : StatsCollector, Options;

    alias Alloc   = StatsCollector!(Mallocator, Options.all, Options.all);
    alias MStream = MemoryStream!(Alloc*);
    auto alloc    = new Alloc();

    auto stream = new MStream(alloc);
    stream.write([0, 1, 2, 3]);
    assert(stream.position == 4);

    stream.length = 2;
    assert(stream.length == 2);
    assert(stream.position == 2);

    // Reading at the (new) end of the stream yields nothing, instead of an out-of-range slice.
    ubyte[4] storage;
    ubyte[] buffer = storage[];
    assert(stream.readToBuffer(buffer).length == 0);
    assert(stream.position == 2);

    stream.length = 0;
    assert(stream.length == 0);
    assert(stream.position == 0);

    stream.write([9]);
    assert(stream.data == [9]);

    stream.dispose();
    assert(stream.data is null);
    assert(alloc.bytesUsed == 0);
}

// CopyTo test
unittest
{
    import std.experimental.allocator.mallocator      : Mallocator;
    import std.experimental.allocator.building_blocks : StatsCollector, Options;

    alias Alloc   = StatsCollector!(Mallocator, Options.all, Options.all);
    alias MStream = MemoryStream!(Alloc*);
    auto alloc    = new Alloc();

    auto master = new MStream(alloc);
    auto slave  = new MStream(alloc);

    void doAssert(ubyte[] buffer1 = null, ubyte[] buffer2 = null)
    {
        import std.format : format;

        if(buffer1 is null)
            buffer1 = slave.data;

        if(buffer2 is null)
            buffer2 = master.data;

        assert(buffer1 == buffer2, format("%s\n\n\n%s", buffer1, buffer2));
    }

    // Optimisation example:
    //  Without the line below, `alloc` reports 1000 reallocations.
    //  With it, it reports a single one.
    master.length = 1_000;
    foreach(i; 0..master.length)
    {
        auto data = cast(ubyte)i;
        master.write((&data)[0..1]);
    }
    assert(master.position == master.length);

    // Test copyToAlloc, with a small buffer, and a static allocator.
    master.position = 0;
    master.copyToAlloc!Mallocator(slave, 32);
    assert(master.position == master.length);
    doAssert();

    // Test copyToAlloc, with a large buffer, and a non-static allocator.
    master.position = 0;
    slave.length    = 0;
    slave.position  = 0;
    master.copyToAlloc(slave, 8126, alloc);
    assert(master.position == master.length);
    doAssert();

    // Test copyToStack, with a small buffer, with the master stream not at the start.
    master.position = master.length / 2;
    slave.length    = 0;
    slave.position  = 0;
    master.copyToStack!64(slave);
    assert(master.position == master.length);
    assert(slave.length == master.length / 2);
    doAssert(slave.data, master.data[master.length/2..$]);

    // Test copyToGC, with an average buffer, with the slave stream not at the start.
    master.position = 0;
    doAssert(slave.data, master.data[$/2..$]);
    master.copyToGC(slave, 512);
    assert(master.position == master.length);
    assert(slave.length == master.length + (master.length / 2));
    doAssert(slave.data[0..master.length/2], master.data[$/2..$]);
    doAssert(slave.data[master.length/2..$], master.data);

    master.dispose();
    slave.dispose();

    // import std.stdio;
    // alloc.reportStatistics(stdout);
    assert(alloc.bytesUsed == 0);
}