Newer
Older
#include "MantidKernel/DiskBuffer.h"
using namespace Mantid::Kernel;
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
namespace Mantid {
namespace Kernel {
//----------------------------------------------------------------------------------------------
/** Default constructor.
 *
 * Sets the write-buffer threshold to 50, zeroes the usage counters and the
 * recorded file length, and starts with an empty free-space map.
 */
DiskBuffer::DiskBuffer()
    : m_writeBufferSize(50),
      m_writeBufferUsed(0),
      m_nObjectsToWrite(0),
      m_free(),
      // secondary index (index 1) of the free-space container
      m_free_bySize(m_free.get<1>()),
      m_fileLength(0) {
  // m_free was value-initialized just above; this keeps it explicitly empty.
  m_free.clear();
}
//----------------------------------------------------------------------------------------------
/** Constructor taking the write-buffer threshold.
 *
 * @param m_writeBufferSize :: Amount of memory to accumulate in the write
 *        buffer before writing. toWrite() compares the amount of buffered
 *        memory against this value to decide when to call writeOldObjects().
 */
DiskBuffer::DiskBuffer(uint64_t m_writeBufferSize)
    : m_writeBufferSize(m_writeBufferSize), // member <- parameter of same name
      m_writeBufferUsed(0),
      m_nObjectsToWrite(0),
      m_free(),
      // secondary index (index 1) of the free-space container
      m_free_bySize(m_free.get<1>()),
      m_fileLength(0) {
  // m_free was value-initialized just above; this keeps it explicitly empty.
  m_free.clear();
}
//----------------------------------------------------------------------------------------------
/** Destructor.
 *
 * Performs no explicit work: in particular it does not call flushCache(), so
 * anything still queued in the to-write buffer is not written here.
 */
DiskBuffer::~DiskBuffer() {
  // intentionally empty
}
//---------------------------------------------------------------------------------------------
/** Call this method when an object is ready to be written
 * out to disk.
 *
 * If the item is already registered in the to-write buffer, only the
 * accounted memory is updated (old size removed, current size added).
 * Otherwise the item is pushed to the front of the buffer and counted.
 *
 * When the to-write buffer is full (used memory exceeds the configured
 * write buffer size), all of it gets written out to disk using
 * writeOldObjects().
 *
 * @param item :: item that can be written to disk. A NULL pointer is ignored.
 */
void DiskBuffer::toWrite(ISaveable *item) {
  if (item == NULL)
    return;
  //    if (!m_useWriteBuffer) return;

  if (item->getBufPostion()) {
    // Already in the buffer and has probably changed its size in memory.
    m_mutex.lock();
    // forget the old memory size ...
    m_writeBufferUsed -= item->getBufferSize();
    // ... and add the new size
    size_t newMemorySize = item->getDataMemorySize();
    m_writeBufferUsed += newMemorySize;
    m_mutex.unlock();
    item->setBufferSize(newMemorySize);
  } else {
    // First time we see this item: register it at the front of the buffer.
    m_mutex.lock();
    m_toWriteBuffer.push_front(item);
    m_writeBufferUsed += item->setBufferPosition(m_toWriteBuffer.begin());
    m_nObjectsToWrite++;
    m_mutex.unlock();
  }

  // Should we now write out the old data?
  // The mutex is released at this point; writeOldObjects() acquires it itself.
  if (m_writeBufferUsed > m_writeBufferSize)
    writeOldObjects();
}
//---------------------------------------------------------------------------------------------
/** Call this method when an object that might be in the cache
 * is getting deleted.
 * The object is removed from the to-write buffer (if present).
 * The space it uses on disk is marked as free.
 *
 * If the object has no buffer position the method returns without touching
 * the free-space map.
 *
 * @param item :: ISaveable object that is getting deleted. A NULL pointer is
 *        ignored.
 */
void DiskBuffer::objectDeleted(ISaveable *item) {
  if (item == NULL)
    return;

  m_mutex.lock();
  // has it ever been in the buffer?
  auto opt2it = item->getBufPostion();
  if (!opt2it) {
    m_mutex.unlock();
    return;
  }

  // Stop accounting for the object's memory and drop it from the buffer.
  // NOTE(review): m_nObjectsToWrite is not decremented here, so it stays
  // stale until the next writeOldObjects() -- confirm whether this is intended.
  m_writeBufferUsed -= item->getBufferSize();
  m_toWriteBuffer.erase(*opt2it);

  // indicate to the object that it is not stored in memory any more
  item->clearBufferState();
  m_mutex.unlock();

  // Mark the amount of space used on disk as free
  if (item->wasSaved())
    this->freeBlock(item->getFilePosition(), item->getFileSize());
}
//---------------------------------------------------------------------------------------------
/** Method to write out the old objects that have been
* stored in the "toWrite" buffer.
*/
void DiskBuffer::writeOldObjects() {
Poco::ScopedLock<Kernel::Mutex> _lock(m_mutex);
// Holder for any objects that you were NOT able to write.
std::list<ISaveable *> couldNotWrite;
size_t objectsNotWritten(0);
size_t memoryNotWritten(0);
// Iterate through the list
auto it = m_toWriteBuffer.begin();
auto it_end = m_toWriteBuffer.end();
ISaveable *obj = NULL;
for (; it != it_end; ++it) {
obj = *it;
if (!obj->isBusy()) {
uint64_t NumObjEvents = obj->getTotalDataSize();
uint64_t fileIndexStart;
if (!obj->wasSaved()) {
fileIndexStart = this->allocate(NumObjEvents);
// Write to the disk; this will call the object specific save function;
// Prevent simultaneous file access (e.g. write while loading)
obj->saveAt(fileIndexStart, NumObjEvents);
} else {
uint64_t NumFileEvents = obj->getFileSize();
if (NumObjEvents != NumFileEvents) {
// Event list changed size. The MRU can tell us where it best fits
// now.
fileIndexStart = this->relocate(obj->getFilePosition(), NumFileEvents,
NumObjEvents);
// Write to the disk; this will call the object specific save
// function;
obj->saveAt(fileIndexStart, NumObjEvents);
} else // despite object size have not been changed, it can be modified
// other way. In this case, the method which changed the data
// should set dataChanged ID
if (obj->isDataChanged()) {
fileIndexStart = obj->getFilePosition();
// Write to the disk; this will call the object specific save
// function;
obj->saveAt(fileIndexStart, NumObjEvents);
// this is questionable operation, which adjust file size in case
// when the file postions were allocated externaly
if (fileIndexStart + NumObjEvents > m_fileLength)
m_fileLength = fileIndexStart + NumObjEvents;
} else // just clean the object up -- it just occupies memory
obj->clearDataFromMemory();
// tell the object that it has been removed from the buffer
obj->clearBufferState();
} else // object busy
// The object is busy, can't write. Save it for later
couldNotWrite.push_back(obj);
// When a prefix or postfix operator is applied to a function argument,
// the value of the argument is
// NOT GUARANTEED to be incremented or decremented before it is passed to
// the function.
std::list<ISaveable *>::iterator it = --couldNotWrite.end();
memoryNotWritten += obj->setBufferPosition(it);
objectsNotWritten++;
}
}
// use last object to clear NeXus buffer and actually write data to HDD
if (obj) {
// NXS needs to flush the writes to file by closing and re-opening the data
// block.
// For speed, it is best to do this only once per write dump, using last
// object saved
obj->flushData();
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
// Exchange with the new map you built out of the not-written blocks.
m_toWriteBuffer.swap(couldNotWrite);
m_writeBufferUsed = memoryNotWritten;
m_nObjectsToWrite = objectsNotWritten;
}
//---------------------------------------------------------------------------------------------
/** Flush out all the data in the memory; and writes out everything in the
* to-write cache. */
void DiskBuffer::flushCache() {
// Now write everything out.
writeOldObjects();
}
//---------------------------------------------------------------------------------------------
/** This method is called by this->relocate when object that has shrunk
 * and so has left a bit of free space after itself on the file;
 * or when an object gets moved to a new spot.
 *
 * The block is inserted into the free-space map and then merged with the
 * block immediately before and/or after it whenever FreeBlock::merge()
 * reports that they can be combined. The whole operation runs under
 * m_freeMutex.
 *
 * @param pos :: position in the file of the START of the new free block
 * @param size :: size of the free block. Nothing is done when this is 0 or
 *        the maximum uint64_t value.
 */
void DiskBuffer::freeBlock(uint64_t const pos, uint64_t const size) {
  if (size == 0 || size == std::numeric_limits<uint64_t>::max())
    return;

  m_freeMutex.lock();

  // Make the block
  FreeBlock newBlock(pos, size);
  // Insert it
  std::pair<freeSpace_t::iterator, bool> p = m_free.insert(newBlock);

  // Failed insert? Should not happen since the map is NOT unique
  // Or, if the map has only 1 item then it cannot do any merging. This solves a
  // hanging bug in MacOS. Refs #3652
  if (!p.second || m_free.size() <= 1) {
    m_freeMutex.unlock();
    return;
  }

  // This is where we inserted
  freeSpace_t::iterator it = p.first;
  if (it != m_free.begin()) {
    freeSpace_t::iterator it_before = it;
    --it_before;
    // There is a block before
    FreeBlock block_before = *it_before;
    if (FreeBlock::merge(block_before, newBlock)) {
      // Change the map by replacing the old "before" block with the new merged
      // one
      m_free.replace(it_before, block_before);
      // Remove the block we just inserted
      m_free.erase(it);
      // For cases where the new block was between two blocks.
      newBlock = block_before;
      it = it_before;
    }
  }

  // Get an iterator to the block AFTER this one
  freeSpace_t::iterator it_after = it;
  ++it_after;
  // There is a block after
  if (it_after != m_free.end()) {
    FreeBlock block_after = *it_after;
    if (FreeBlock::merge(newBlock, block_after)) {
      // Change the map by replacing the old "new" block with the new merged one
      m_free.replace(it, newBlock);
      // Remove the block that was after this one
      m_free.erase(it_after);
    }
  }

  m_freeMutex.unlock();
}
//---------------------------------------------------------------------------------------------
/** Method that defrags free blocks by combining adjacent ones together
 * NOTE: This is not necessary to run since the freeBlock() methods
 * automatically defrags neighboring blocks.
 *
 * Walks the free-space map under m_freeMutex. Each block is repeatedly merged
 * with its successor for as long as FreeBlock::merge() succeeds; only then
 * does the walk advance. An empty or single-element map is left untouched.
 */
void DiskBuffer::defragFreeBlocks() {
  m_freeMutex.lock();

  freeSpace_t::iterator it = m_free.begin();
  while (it != m_free.end()) {
    // Get iterator to the block after "it".
    freeSpace_t::iterator it_after = it;
    ++it_after;
    // The last block has no successor to merge with; never dereference end().
    if (it_after == m_free.end())
      break;

    FreeBlock thisBlock = *it;
    if (FreeBlock::merge(thisBlock, *it_after)) {
      // Change the map by replacing the old "before" block with the new merged
      // one
      m_free.replace(it, thisBlock);
      // Remove the block that was merged out
      m_free.erase(it_after);
      // And stay at this iterator: the merged block may also join the next one
    } else {
      // Move on to the next block
      ++it;
    }
  }

  m_freeMutex.unlock();
}
//---------------------------------------------------------------------------------------------
/** Allocate a block of the given size in a free spot in the file,
 * or at the end of the file if there is no space.
 *
 * The by-size index of the free-space map is searched with lower_bound().
 * When a block is found it is removed from the map and, if it is larger than
 * requested, the unused tail is handed back through freeBlock(). When no
 * block is found, m_fileLength is advanced by newSize and its previous value
 * is returned.
 *
 * @param newSize :: new size of the data
 * @return a new position at which the data can be saved.
 */
uint64_t DiskBuffer::allocate(uint64_t const newSize) {
  m_freeMutex.lock();

  // Now, find the first available block of sufficient size.
  freeSpace_bySize_t::iterator it;
  bool putAtFileEnd = true;
  if (m_free.size() > 0) {
    // Unless there is nothing in the free space map
    it = m_free_bySize.lower_bound(newSize);
    putAtFileEnd = (it == m_free_bySize.end());
  }

  if (putAtFileEnd) {
    // No block found
    // Go to the end of the file.
    uint64_t retVal = m_fileLength;
    // And we assume the file will grow by this much.
    m_fileLength += newSize;
    // Will place the new block at the end of the file
    m_freeMutex.unlock();
    return retVal;
  }

  //      std::cout << "Block found for allocate " << newSize << std::endl;
  uint64_t foundPos = it->getFilePosition();
  uint64_t foundSize = it->getSize();
  // Remove the free block you found - it is no longer free
  m_free_bySize.erase(it);
  // Release before calling freeBlock(), which takes m_freeMutex itself.
  m_freeMutex.unlock();

  // Block was too large - free the bit of space after it.
  if (foundSize > newSize) {
    this->freeBlock(foundPos + newSize, foundSize - newSize);
  }
  return foundPos;
}
//---------------------------------------------------------------------------------------------
/** Move an ISaveable object that no longer fits the space it was given on
 * file.
 *
 * This should only be called when the MRU is saving a block to disk,
 * i.e. the ISaveable cannot be in either the MRU buffer or the toWrite buffer.
 *
 * The old block is released with freeBlock() before allocate() is called.
 *
 * @param oldPos :: original position in the file
 * @param oldSize :: original size in the file. This will be marked as "free"
 * @param newSize :: new size of the data
 * @return a new position at which the data can be saved.
 */
uint64_t DiskBuffer::relocate(uint64_t const oldPos, uint64_t const oldSize,
                              const uint64_t newSize) {
  // Step 1: give the previously used space back to the free-space map.
  this->freeBlock(oldPos, oldSize);
  // Step 2: ask for a block of the new size and report where it is.
  const uint64_t newPos = this->allocate(newSize);
  return newPos;
}
//---------------------------------------------------------------------------------------------
/** Returns a vector with two entries per free block: position and size.
* @param[out] free :: vector to fill */
void DiskBuffer::getFreeSpaceVector(std::vector<uint64_t> &free) const {
free.reserve(m_free.size() * 2);
freeSpace_bySize_t::const_iterator it = m_free_bySize.begin();
freeSpace_bySize_t::const_iterator it_end = m_free_bySize.end();
for (; it != it_end; ++it) {
free.push_back(it->getFilePosition());
free.push_back(it->getSize());
}
}
/** Sets the free space map. Should only be used when loading a file.
 *
 * The vector is read as consecutive (position, size) pairs. Pairs where both
 * values are 0 are skipped. The existing map is cleared before the input is
 * validated, so an odd-sized vector leaves the map empty.
 *
 * @param[in] free :: vector containing free space index to set
 * @throw std::length_error if the vector does not hold an even number of
 *        entries */
void DiskBuffer::setFreeSpaceVector(std::vector<uint64_t> &free) {
  m_free.clear();

  if (free.size() % 2 != 0)
    throw std::length_error("Free vector size is not a factor of 2.");

  for (std::vector<uint64_t>::iterator it = free.begin(); it != free.end();
       it += 2) {
    // Safe: the size is even, so every position entry has a size after it.
    std::vector<uint64_t>::iterator it_next = it + 1;

    if (*it == 0 && *it_next == 0) {
      continue; // Not really a free space block!
    }

    FreeBlock newBlock(*it, *it_next);
    m_free.insert(newBlock);
  }
}
/// Describe the state of the to-write buffer, for debugging.
/// @return a string of the form "Buffer: <memory used> in <count> objects. "
std::string DiskBuffer::getMemoryStr() const {
  std::ostringstream description;
  description << "Buffer: " << m_writeBufferUsed;
  description << " in " << m_nObjectsToWrite << " objects. ";
  return description.str();
}
} // namespace Kernel
} // namespace Mantid