Newer
Older
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
#include "MantidKernel/DiskBuffer.h"
#include "MantidKernel/System.h"
#include <iostream>
#include <sstream>
using namespace Mantid::Kernel;
namespace Mantid
{
namespace Kernel
{
//----------------------------------------------------------------------------------------------
/** Constructor
*/
DiskBuffer::DiskBuffer() :
m_useWriteBuffer(false),
m_writeBufferSize(50),
m_writeBuffer_byId( m_writeBuffer.get<1>() ),
m_writeBufferUsed(0),
m_free(),
m_free_bySize( m_free.get<1>() ),
m_fileLength(0)
{
m_free.clear();
}
//----------------------------------------------------------------------------------------------
/** Constructor
*
* @param m_writeBufferSize :: Amount of memory to accumulate in the write buffer before writing.
* @return
*/
DiskBuffer::DiskBuffer(uint64_t m_writeBufferSize) :
m_useWriteBuffer(m_writeBufferSize > 0),
m_writeBufferSize(m_writeBufferSize),
m_writeBuffer_byId( m_writeBuffer.get<1>() ),
m_writeBufferUsed(0),
m_free(),
m_free_bySize( m_free.get<1>() ),
m_fileLength(0)
{
m_free.clear();
}
//----------------------------------------------------------------------------------------------
/** Destructor
*/
DiskBuffer::~DiskBuffer()
{
}
//---------------------------------------------------------------------------------------------
/** Call this method when an object is ready to be written
* out to disk.
*
* When the to-write buffer is full, all of it gets written
* out to disk using writeOldObjects()
*
* @param item :: item that can be written to disk.
*/
void DiskBuffer::toWrite(const ISaveable * item)
{
if (item == NULL) return;
if (!m_useWriteBuffer) return;
m_mutex.lock();
// And put it in the queue of stuff to write.
// std::cout << "DiskBuffer adding ID " << item->getId() << " to current size " << m_writeBuffer.size() << std::endl;
std::pair<writeBuffer_t::iterator,bool> result = m_writeBuffer.insert(item);
// Result.second is FALSE if the item was already there
if (result.second)
{
// Track the memory change
m_writeBufferUsed += item->getMRUMemorySize();
// Should we now write out the old data?
if (m_writeBufferUsed >= m_writeBufferSize)
writeOldObjects();
}
m_mutex.unlock();
}
//---------------------------------------------------------------------------------------------
/** Call this method when an object that might be in the cache
* is getting deleted.
* The object is removed from the to-write buffer (if present).
* The space it uses on disk is marked as free.
*
* @param item :: ISaveable object that is getting deleted.
* @param sizeOnFile :: size that the object used on file. This amount of space is marked as "free"
*/
void DiskBuffer::objectDeleted(const ISaveable * item)
// const uint64_t sizeOnFile
size_t id = item->getId();
uint64_t size = item->getMRUMemorySize();
m_mutex.lock();
// Take it out of the to-write buffer
writeBuffer_byId_t::iterator it2 = m_writeBuffer_byId.find(id);
if (it2 != m_writeBuffer_byId.end())
{
m_writeBuffer_byId.erase(it2);
m_writeBufferUsed -= size;
}
m_mutex.unlock();
//std::cout << "DiskBuffer deleting ID " << item->getId() << "; new size " << m_writeBuffer.size() << std::endl;
// Mark the amount of space used on disk as free
//this->freeBlock(item->getFilePosition(), sizeOnFile);
this->freeBlock(item->getFilePosition(), size);
}
//---------------------------------------------------------------------------------------------
/** Method to write out the old objects that have been
* stored in the "toWrite" buffer.
*/
void DiskBuffer::writeOldObjects()
{
if (m_writeBufferUsed > 0)
std::cout << "DiskBuffer:: Writing out " << m_writeBufferUsed << " events in " << m_writeBuffer.size() << " blocks." << std::endl;
// std::cout << getMemoryStr() << std::endl;
// std::cout << getFreeSpaceMap().size() << " entries in the free size map." << std::endl;
// for (freeSpace_t::iterator it = m_free.begin(); it != m_free.end(); it++)
// std::cout << " Free : " << it->getFilePosition() << " size " << it->getSize() << std::endl;
// std::cout << m_fileLength << " length of file" << std::endl;
// Holder for any objects that you were NOT able to write.
writeBuffer_t couldNotWrite;
size_t memoryNotWritten = 0;
// Prevent simultaneous file access (e.g. write while loading)
m_fileMutex.lock();
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
// Iterate through the map
writeBuffer_t::iterator it = m_writeBuffer.begin();
writeBuffer_t::iterator it_end = m_writeBuffer.end();
const ISaveable * obj = NULL;
for (; it != it_end; ++it)
{
obj = *it;
if (!obj->dataBusy())
{
// Write to the disk
obj->save();
}
else
{
// The object is busy, can't write. Save it for later
//couldNotWrite.insert( pairObj_t(obj->getFilePosition(), obj) );
couldNotWrite.insert( obj );
memoryNotWritten += obj->getMRUMemorySize();
}
}
if (obj)
{
// NXS needs to flush the writes to file by closing and re-opening the data block.
// For speed, it is best to do this only once per write dump.
obj->flushData();
}
// Exchange with the new map you built out of the not-written blocks.
m_writeBuffer.swap(couldNotWrite);
m_writeBufferUsed = memoryNotWritten;
m_fileMutex.unlock();
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
}
//---------------------------------------------------------------------------------------------
/** Flush out all the data in the memory; and writes out everything in the to-write cache.
* Mostly used for debugging and unit tests */
void DiskBuffer::flushCache()
{
m_mutex.lock();
// Now write everything out.
writeOldObjects();
m_mutex.unlock();
}
//---------------------------------------------------------------------------------------------
/** This method is called by an ISaveable object that has shrunk
* and so has left a bit of free space after itself on the file;
* or when an object gets moved to a new spot.
*
* @param pos :: position in the file of the START of the new free block
* @param size :: size of the free block
*/
void DiskBuffer::freeBlock(uint64_t const pos, uint64_t const size)
{
if (size == 0) return;
m_freeMutex.lock();
// Make the block
FreeBlock newBlock(pos, size);
// Insert it
std::pair<freeSpace_t::iterator,bool> p = m_free.insert( newBlock );
// Failed insert? Should not happen since the map is NOT unique
// Or, if the map has only 1 item then it cannot do any merging. This solves a hanging bug in MacOS. Refs #3652
if (!p.second || m_free.size() <= 1)
{
m_freeMutex.unlock();
return;
}
// This is where we inserted
freeSpace_t::iterator it = p.first;
if (it != m_free.begin())
{
freeSpace_t::iterator it_before = it; --it_before;
// There is a block before
FreeBlock block_before = *it_before;
if (FreeBlock::merge(block_before, newBlock))
{
// Change the map by replacing the old "before" block with the new merged one
m_free.replace(it_before, block_before);
// Remove the block we just inserted
m_free.erase(it);
// For cases where the new block was between two blocks.
newBlock = block_before;
it = it_before;
}
}
// Get an iterator to the block AFTER this one
freeSpace_t::iterator it_after = it; ++it_after;
// There is a block after
if (it_after != m_free.end())
{
FreeBlock block_after = *it_after;
if (FreeBlock::merge(newBlock, block_after))
{
// Change the map by replacing the old "new" block with the new merged one
m_free.replace(it, newBlock);
// Remove the block that was after this one
m_free.erase(it_after);
}
}
m_freeMutex.unlock();
}
//---------------------------------------------------------------------------------------------
/** Method that defrags free blocks by combining adjacent ones together
* NOTE: This is not necessary to run since the freeBlock() methods
* automatically defrags neighboring blocks.
*/
void DiskBuffer::defragFreeBlocks()
{
m_freeMutex.lock();
freeSpace_t::iterator it = m_free.begin();
FreeBlock thisBlock;
thisBlock = *it;
while (it != m_free.end())
{
// Get iterator to the block after "it".
freeSpace_t::iterator it_after = it;
++it_after;
FreeBlock block_after = *it_after;
if (FreeBlock::merge(thisBlock, *it_after))
{
// Change the map by replacing the old "before" block with the new merged one
m_free.replace(it, thisBlock);
// Remove the block that was merged out
m_free.erase(it_after);
// And stay at this iterator to
}
else
{
// Move on to the next block
++it;
thisBlock = *it;
}
}
m_freeMutex.unlock();
}
//---------------------------------------------------------------------------------------------
/** Allocate a block of the given size in a free spot in the file,
* or at the end of the file if there is no space.
*
* @param newSize :: new size of the data
* @return a new position at which the data can be saved.
*/
uint64_t DiskBuffer::allocate(uint64_t const newSize)
{
m_freeMutex.lock();
// Now, find the first available block of sufficient size.
freeSpace_bySize_t::iterator it;
bool putAtFileEnd = true;
if (m_free.size() > 0)
{
// Unless there is nothing in the free space map
it = m_free_bySize.lower_bound( newSize );
putAtFileEnd = (it == m_free_bySize.end());
}
if (putAtFileEnd)
{
// No block found
// Go to the end of the file.
uint64_t retVal = m_fileLength;
// And we assume the file will grow by this much.
m_fileLength += newSize;
// Will place the new block at the end of the file
m_freeMutex.unlock();
return retVal;
}
else
{
// std::cout << "Block found for allocate " << newSize << std::endl;
uint64_t foundPos = it->getFilePosition();
uint64_t foundSize = it->getSize();
// Remove the free block you found - it is no longer free
m_free_bySize.erase(it);
m_freeMutex.unlock();
// Block was too large - free the bit of space after it.
if (foundSize > newSize)
{
this->freeBlock( foundPos + newSize, foundSize - newSize );
}
return foundPos;
}
}
//---------------------------------------------------------------------------------------------
/** This method is called by an ISaveable object that has outgrown
* its space allocated on file and needs to relocate.
*
* This should only be called when the MRU is saving a block to disk,
* i.e. the ISaveable cannot be in either the MRU buffer or the toWrite buffer.
*
* @param oldPos :: original position in the file
* @param oldSize :: original size in the file. This will be marked as "free"
* @param newSize :: new size of the data
* @return a new position at which the data can be saved.
*/
uint64_t DiskBuffer::relocate(uint64_t const oldPos, uint64_t const oldSize, const uint64_t newSize)
{
//std::cout << "Relocating " << oldPos << ", " << oldSize << ", " << newSize << std::endl;
// First, release the space in the old block.
this->freeBlock(oldPos, oldSize);
return this->allocate(newSize);
}
//---------------------------------------------------------------------------------------------
/** Returns a vector with two entries per free block: position and size.
* @param[out] free :: vector to fill */
void DiskBuffer::getFreeSpaceVector(std::vector<uint64_t> & free) const
{
free.reserve( m_free.size() * 2);
freeSpace_bySize_t::const_iterator it = m_free_bySize.begin();
freeSpace_bySize_t::const_iterator it_end = m_free_bySize.end();
for (; it != it_end; ++it)
{
free.push_back(it->getFilePosition());
free.push_back(it->getSize());
}
}
/// @return a string describing the memory buffers, for debugging.
std::string DiskBuffer::getMemoryStr() const
{
std::ostringstream mess;
mess << "Buffer: " << m_writeBufferUsed << " in " << m_writeBuffer.size() << " blocks. ";
return mess.str();
}
} // namespace Mantid
} // namespace Kernel