1 module jaster.stream.memory;
2 
3 private
4 {
5     import std.experimental.allocator, std.experimental.allocator.gc_allocator, std.experimental.allocator.mallocator;
6     import jaster.stream.core, jaster.stream.util;
7 }
8 
9 /// A `MemoryStream` that uses the GC.
10 alias MemoryStreamGC = MemoryStream!GCAllocator;
11 
12 /// A `MemoryStream` that uses malloc
13 alias MemoryStreamMalloc = MemoryStream!Mallocator;
14 
15 /++
16  + A `Stream` that writes into a memory buffer.
17  +
18  + This stream supports `std.experimental.allocator` for all of it's allocations and freeing.
19  +
20  + This stream will automatically call `dispose` during it's deconstructor.
21  +
22  + Outside of exceptions, this stream doesn't allocate GC memory unless the given allocator does.
23  + ++/
24 class MemoryStream(Alloc) : Stream
25 {
26     private
27     {
28         ubyte[]           _buffer;
29         bool              _isDisposed;
30         size_t            _position;
31         AllocHelper!Alloc _alloc;
32     }
33 
34     static if(_alloc.IsStatic)
35     this()
36     {
37         super(Stream.Can.Write | Stream.Can.Read | Stream.Can.Seek);
38     }
39 
40     static if(!_alloc.IsStatic)
41     this(Alloc alloc)
42     {
43         this._alloc.alloc = alloc;
44         super(Stream.Can.Write | Stream.Can.Read | Stream.Can.Seek);
45     }
46 
47     ~this()
48     {
49         this.dispose();
50     }
51 
52     public override
53     {
54         const(ubyte[]) write(scope const ubyte[] data)
55         {
56             enforceNotDisposed(this);
57             enforceCanWrite(this);
58 
59             auto end = (this.position + data.length);
60             if(end > this.length)
61                 this.length = end;
62 
63             this._buffer[this.position..end] = data[];
64             this.position = end;
65 
66             return data;
67         }
68 
69         ubyte[] readToBuffer(scope ref ubyte[] buffer)
70         {
71             enforceNotDisposed(this);
72             enforceCanRead(this);
73 
74             auto end = (this.position + buffer.length);
75             if(end > this.length)
76                 end = this.length;
77 
78             auto amount = (end - this.position);
79 
80             buffer[0..amount] = this._buffer[this.position..end];
81             this.position = end;
82 
83             return buffer[0..amount];
84         }
85 
86         void dispose()
87         {
88             if(!this.isDisposed)
89             {
90                 this._isDisposed = true;
91                 this._alloc.dispose(this._buffer);
92             }
93         }
94 
95         void flush(){}
96 
97         void seek(SeekFrom from, ptrdiff_t amount)
98         {
99             auto start = (from == SeekFrom.Start)
100                          ? 0
101                          : (from == SeekFrom.Current)
102                             ? this.position
103                             : this.length;
104             this.position = start + amount;
105         }
106 
107         @property
108         bool isDisposed() const
109         {
110             return this._isDisposed;
111         }
112 
113         @property
114         void writeTimeout(Duration dur)
115         {
116             enforceNotDisposed(this);
117             throw new StreamCannotTimeoutException();
118         }
119 
120         @property
121         Duration writeTimeout()
122         {
123             enforceNotDisposed(this);
124             throw new StreamCannotTimeoutException();
125         }
126 
127         @property
128         void readTimeout(Duration dur)
129         {
130             enforceNotDisposed(this);
131             throw new StreamCannotTimeoutException();
132         }
133 
134         @property
135         Duration readTimeout()
136         {
137             enforceNotDisposed(this);
138             throw new StreamCannotTimeoutException();
139         }
140 
141         @property
142         void length(size_t size)
143         {
144             enforceNotDisposed(this);
145 
146             if(this._buffer.ptr is null)
147             {
148                 this._buffer = this._alloc.makeArray!ubyte(size);
149             }
150             else if(size > this.length)
151             {
152                 auto success = this._alloc.expandArray(this._buffer, (size - this.length));
153                 assert(success);
154             }
155             else if(size < this.length)
156             {
157                 auto success = this._alloc.shrinkArray(this._buffer, (this.length - size));
158                 assert(success);
159             }
160         }
161 
162         @property
163         size_t length()
164         {
165             enforceNotDisposed(this);
166             return this._buffer.length;
167         }
168 
169         @property
170         void position(size_t pos)
171         {
172             import std.exception : enforce;
173 
174             enforceNotDisposed(this);
175             enforce!StreamException(pos <= this.length, "Attempted to seek past the end of the stream.");
176             this._position = pos;
177         }
178 
179         @property
180         size_t position()
181         {
182             enforceNotDisposed(this);
183             return this._position;
184         }
185     }
186 
187     public
188     {
189         /++
190          + Gets the internal buffer of the stream.
191          +
192          + Notes:
193          +  While this function by itself is `@safe`, usage of this function is unsafe.
194          +
195          +  This is because the buffer can be reallocated and deallocated during certain usage of the stream, so
196          +  the slice returned by this function can point to freed/invalid memory, causing a crash (hopefully).
197          + ++/
198         @property @nogc
199         inout(ubyte[]) data() nothrow inout
200         {
201             return this._buffer;
202         }
203     }
204 }
205 ///
206 unittest
207 {
208     import std.experimental.allocator.mallocator      : Mallocator;
209     import std.experimental.allocator.building_blocks : StatsCollector, Options;
210 
211     alias Alloc   = StatsCollector!(Mallocator, Options.all, Options.all);
212     alias MStream = MemoryStream!(Alloc*);
213     auto alloc    = new Alloc();
214 
215     auto stream = new MStream(alloc);
216     assert(stream.position == 0);
217     assert(stream.length   == 0);
218     assert(stream.canWrite);
219     assert(stream.canRead);
220     assert(stream.canSeek);
221     assert(!stream.canTimeout);
222     assert(!stream.isDisposed);
223 
224     stream.write([0, 0, 0, 0]); // Testing to see if it cleans memory after use.
225     stream.dispose();
226     assert(stream.isDisposed);
227 
228     stream = new MStream(alloc);
229     stream.write([0, 1, 2, 3, 4]);
230     assert(stream.position == 5);
231     assert(stream.length == 5);
232     assert(stream.data == [0, 1, 2, 3, 4]);
233 
234     stream.length = 20;
235     assert(stream.length == 20);
236 
237     stream.length = 5;
238     assert(stream.length == 5);
239 
240     stream.position = stream.position - 2;
241     assert(stream.read(2) == [3, 4]);
242 
243     assert(alloc.bytesUsed > 0);
244     destroy(stream); // To simulate the GC collecting it
245     // import std.stdio;
246     // alloc.reportStatistics(stdout);
247     
248     assert(alloc.bytesUsed == 0);
249 }
250 
251 // CopyTo test
252 unittest
253 {
254     import std.experimental.allocator.mallocator      : Mallocator;
255     import std.experimental.allocator.building_blocks : StatsCollector, Options;
256 
257     alias Alloc   = StatsCollector!(Mallocator, Options.all, Options.all);
258     alias MStream = MemoryStream!(Alloc*);
259     auto alloc    = new Alloc();
260 
261     auto master = new MStream(alloc);
262     auto slave  = new MStream(alloc);
263 
264     void doAssert(ubyte[] buffer1 = null, ubyte[] buffer2 = null)
265     {
266         import std.format : format;
267 
268         if(buffer1 is null)
269             buffer1 = slave.data;
270         
271         if(buffer2 is null)
272             buffer2 = master.data;
273 
274         assert(buffer1 == buffer2, format("%s\n\n\n%s", buffer1, buffer2));
275     }
276 
277     // Optimisation example:
278     //  Without the line below, `alloc` reports 1000 reallocations.
279     //  With it, it reports a single one.
280     master.length = 1_000;
281     foreach(i; 0..master.length)
282     {
283         auto data = cast(ubyte)i;
284         master.write((&data)[0..1]);
285     }
286     assert(master.position == master.length);
287 
288     // Test copyToAlloc, with a small buffer, and a static allocator.
289     master.position = 0;
290     master.copyToAlloc!Mallocator(slave, 32);
291     assert(master.position == master.length);
292     doAssert();
293 
294     // Test copyToAlloc, with a large buffer, and a non-static allocator.
295     master.position = 0;
296     slave.length    = 0;
297     slave.position  = 0;
298     master.copyToAlloc(slave, 8126, alloc);
299     assert(master.position == master.length);
300     doAssert();
301 
302     // Test copyToStack, with a small buffer, with the master stream not at the start.
303     master.position = master.length / 2;
304     slave.length    = 0;
305     slave.position  = 0;
306     master.copyToStack!64(slave);
307     assert(master.position == master.length);
308     assert(slave.length    == master.length / 2);
309     doAssert(slave.data, master.data[master.length/2..$]);
310 
311     // Test copyToGC, with an average buffer, with the slave stream not at the start.
312     master.position = 0;
313     doAssert(slave.data, master.data[$/2..$]);
314     master.copyToGC(slave, 512);
315     assert(master.position == master.length);
316     assert(slave.length    == master.length + (master.length / 2));
317     doAssert(slave.data[0..master.length/2], master.data[$/2..$]);
318     doAssert(slave.data[master.length/2..$], master.data);
319 
320     master.dispose();
321     slave.dispose();
322 
323     // import std.stdio;
324     // alloc.reportStatistics(stdout);
325     assert(alloc.bytesUsed == 0);
326 }