next up previous contents index
Next: MMap.h Up: The Sources Previous: PersistentStorage.h

PersistentStorage.cc

// PersistentStorage.cc
// Copyright (C) 1997,98 Mori Tetsuya
// 12/16/97
// 12/17/97 
// 12/20/97 map()
// 12/21/97 grow()
// 12/24/97 initializeObjectTable(), splitTopChunk(), ...
// 12/26/97 destroy(), findChunk(), split()
// 12/28/97 removeRedundantWindow(), -O3 optimization bug avoidance
// 12/29/97 map() with validation, minimumChunkSize()
// 12/30/97 cleanUp()
// 1/2/98   setObjectFlag(), getEntryPoint(), stabilized
// 1/3/98   PointerPair, PersistentObject

#pragma implementation "MMap.h"
#pragma implementation "PersistentStorage.h"
#pragma implementation "FileStat.h"
#pragma implementation "SortedTable.h"

#ifdef DEBUG
// Empty function that exists only in DEBUG builds.  The DEBUG-only code
// paths in this file call it when they detect an unexpected condition,
// so a debugger breakpoint set here stops at those points.
void breakpoint() {} 
#endif

#include "PersistentStorage.h"
#include <strstream.h>
#include <new.h>

// Definitions of the static 'nullObject' members of the SortableObject
// classes; each one is a default-constructed instance.
PersistentOffset PersistentOffset::nullObject = PersistentOffset();
Entry Entry::nullObject = Entry();

// This is the only persistent storage object in the program.
// It is 0 until a PersistentStorage constructor succeeds and is reset
// to 0 by the destructor.
PersistentStorage *ps = 0;

// PersistentStorage member functions

/*
  constructor -- initialization of persistent storage

  This constructor initializes the global pointer PersistentStorage *ps
  with 'this' pointer.  The pointer is referenced by PersistentPointer
  objects to find out the unique PersistentStorage object.  Therefore,
  no two PersistentStorage object can co-exist.

  Construction fails silently (the object is left unregistered, with
  baseDir == 0) when dir is null, when another PersistentStorage is
  already registered in ps, or when dir is not a readable and writable
  directory.

  parameter:
  const char *dir -- the directory for arenas
 */
PersistentStorage::PersistentStorage(const char *dir) {
  // Give the members that the destructor and synchronize() inspect a
  // defined value before any early return; otherwise a failed
  // construction leaves them holding garbage.
  baseDir = 0;
  totalWindowSize = 0;

  if (dir == 0) {
    // storage directory is not specified
    return;
  }
  if (ps != 0) { 
    // this constructor can be called only once
    return;
  }
  FileStat fs(dir);
  if (!(fs.isDirectory() && 
        fs.isReadable() && 
        fs.isWritable())) {
    return;
  }
  baseDir = new char[strlen(dir) + 1];
  strcpy(baseDir, dir);
  
  initializeArenaTable();
  // ps must be set before initializeEntryTable(): that function grabs
  // the stored entry points to validate them.
  ps = this;
  initializeEntryTable();
}

// destructor for PersistentStorage
//
// Only the object registered in the global 'ps' owns resources: the
// constructor allocates baseDir immediately before it sets ps = this
// and has no early return in between.  An object whose construction was
// rejected therefore has nothing to release and must not unregister
// the live storage object.
PersistentStorage::~PersistentStorage() {
  if (ps != this) {
    return;
  }
  //cleanUp(True);
  synchronize();
  arenaTable.removeAll();
  //entryTable.removeAll();
  if (baseDir != 0) {
    delete [] baseDir; // allocated with new char[]
    baseDir = 0;
  }
  ps = 0;
}

// public operations on PersistentStorage

/*
  allocate -- one of the primitive operations on PersistentStorage

  This operation finds an appropriate Arena and allocates a storage
  block.  For a persistent request the Arenas whose key lies strictly
  between aidBase and aidMax are tried in table order; each one is grown
  once if its first attempt fails.  If none of them can satisfy the
  request, a new Arena file is created under baseDir.  A non-persistent
  request is served from the Arena with key 0, which is created on
  demand.

  parameters:
  SizeType size -- the size of the new storage, excluding Chunk object
  BoolType persistent -- False for requesting non-persistent storage block

  return value:
  PersistentPointer -- the pointer to the allocated Chunk, 
                       nullObject for error
 */
PersistentPointer 
PersistentStorage::allocate(SizeType size, BoolType persistent) {
  OffsetType off;
  ArenaID aid;
  if (size > 0) {
    if (persistent) {
      cleanUp();
      ArenaID aidBase = Arena::calculateMinimumChunkSizeBits(size);
      ArenaID aidMax = aidBase + (1 << 24);
      aid = 1;
      off = 0;
      SizeType max = arenaTable.searchByKey(aidMax) + 1;
      if (arenaTable.getSize() < max)
        max = arenaTable.getSize();
      SizeType index = arenaTable.searchByKey(aidBase + 1);
      Arena *arena = 0;
      Arena *lastArena = 0;
#ifdef DEBUG
      {
        SizeType tmpindex;
        SizeType tmpMax = arenaTable.getSize();
        cerr << "ArenaTable " << tmpMax << endl;
        for (tmpindex = 0; tmpindex < tmpMax; tmpindex++) {
          arena = arenaTable(tmpindex);
          cerr << "(" << tmpindex << ") " 
               << arena->masterWindow.getFileName()
               << " " << (void *)arena->getKey() 
               << " " << arena->getMinimumChunkSize()
               << ((tmpindex == index) ? " index " : "")
               << ((tmpindex == max) ? " max " : "")
               << endl;
        }
        cerr << "AidBase " << (void *)aidBase << endl
             << "AidMax " << (void *)aidMax << endl
             << "size " << size << endl;
      }
#endif
      // try every existing Arena in the key range for this size
      for (; index < max; index++) {
        arena = arenaTable(index);
        if (arena == 0)
          continue;
        aid = arena->getKey();
        if (aidBase < aid && aid < aidMax) {
#ifdef DEBUG
          cerr << "selected Arena " 
               << arena->masterWindow.getFileName() 
               << " for size " << size << endl;
#endif
          lastArena = arena;
          off = arena->allocate(size);
          if (off > 0) { // allocation succeeded 
            break;
          }
          else {
#ifdef DEBUG
            cerr << "growing Arena " 
                 << arena->masterWindow.getFileName() 
                 << " for size " << size << endl;
#endif
            arena->grow();
            off = arena->allocate(size);
            if (off > 0)
              break;
#ifdef DEBUG
            else 
              cerr << "grown Arena "
                   << arena->masterWindow.getFileName() 
                   << " allocation failed for size " << size << endl;
#endif
          }
        }
      }
      if (off <= 0) { // allocation failed
        if (lastArena != 0 && 
            aidBase < lastArena->getKey() && 
            lastArena->getKey() < aidMax) { 
          // there are some Arena objects for the size
          aid = lastArena->getKey() + 1; // next ArenaID
        } else { // there are no persistent Arena objects
          aid = aidBase + 1;
        } 
        // Size the path buffer from baseDir instead of using a fixed
        // 512 bytes, so that a long directory name cannot overflow it.
        // The formatted file name always gets more room than the fixed
        // buffer used to leave for it.
        char *file = new char[strlen(baseDir) + 512];
        strcpy(file, baseDir);
        strcat(file, "/");
        sprintf(file + strlen(file), arenaFilenameFormat, aid);
#ifdef DEBUG
        cerr << "creating arena " << file << " for size " << size << endl;
#endif
        arena = new Arena(aid, file); // create new Arena
        // Release the temporary path.  initializeArenaTable() hands the
        // same constructor a stack buffer that it reuses for every
        // file, so Arena is assumed not to keep the caller's pointer.
        delete [] file;
        if (arena != 0 && arena->getKey() == aid) {
          arenaTable += arena;
          off = arena->allocate(size);
        } else { // Arena creation failed
          if (arena != 0) 
            delete arena; // clean up the junk Arena
          off = 0;
        }
      }
    } else { // non-persistent allocation requested
      aid = 0;
      Arena *arena = arenaTable[aid];
      if (arena == 0) {
        arena = new Arena(0, "/dev/zero"); // create non-persistent Arena
        if (arena != 0) {
          if (arena->getKey() == 0) {
            arenaTable += arena;
          } else {
            delete arena;
            arena = 0;
          }
        } 
      } 
      if (arena != 0) {
        off = arena->allocate(size);
        if (off == 0) {
          // one retry after growing the Arena
          arena->grow();
          off = arena->allocate(size);
        }
      } else {
        off = 0;
      }
    }
  } else { // size <= 0: nothing to allocate
    aid = 0;
    off = 0;
  }
  PersistentPointer pp(aid, off);
#ifdef DEBUG
  if (aid != 0 && aid != 1) {
    cerr << "PersistentStorage::allocate() aid " << aid 
         << " off " << off << endl;
  }
#endif
  return pp;
}

/*
  destroy -- one of the primitive operations on PersistentStorage

  Hands an allocated Chunk back to its Arena so that it returns to the
  free Chunk list.  Every precondition is checked in turn and the first
  one that fails ends the call with False.

  parameter:
  PersistentPointer &pp -- the pointer to the allocated Chunk; it must
                           hold the (exclusive) grab on the object and
                           no pointer may hold a read-only grab on it.
                           On success its grabbed state is cleared.

  return value:
  BoolType -- True if successful

 */
BoolType PersistentStorage::destroy(PersistentPointer &pp) {
  // pp must be grabbed
  if (!pp.isGrabbed()) {
#ifdef DEBUG
      cerr << "PersistentStorage::destroy pp is not grabbed" << endl;
#endif
    return False;
  }

  // pp must not be null
  if (!(pp.getArenaID() >= 0 && pp.getOffset() != 0)) {
#ifdef DEBUG
    cerr << "PersistentStorage::destroy pp is null" << endl;
#endif
    return False;
  }

  // the Arena that owns the object must be known
  Arena *owner = getArena(pp.getArenaID());
  if (owner == 0) {
#ifdef DEBUG
    cerr << "PersistentStorage::destroy arena not found" << endl;
#endif
    return False;
  }

  // the object must not be grabbed read-only
  if (owner->getGrabCountReadOnly(pp.getOffset()) != 0) {
#ifdef DEBUG
    cerr << "PersistentStorage::destroy grabbed read-only" << endl;
#endif
    return False;
  }

  if (!owner->destroy(pp.getOffset())) {
#ifdef DEBUG
    cerr << "PersistentStorage::destroy destruction failed" << endl;
#endif
    return False;
  }

  pp.clearGrabbed();
  cleanUp();
  return True;
}

// Looks up the Entry stored under eid in entryTable and returns a copy
// of its entryPoint member.
PersistentPointer PersistentStorage::getEntryPoint(EntryID eid) {
  Entry found(entryTable[eid]);
  return found.entryPoint;
}

// Registers pp as the entry point for eid, replacing any existing one.
// When pp compares equal to 0 the existing entry is only removed.
// Always returns True.
BoolType PersistentStorage::setEntryPoint(EntryID eid, PersistentPointer &pp) {
  // drop whatever is currently registered under eid
  if (!entryTable[eid].isNull()) {
    entryTable -= eid;
  }

  // a null pointer means "unregister only"
  if (!(pp == 0)) {
    Entry replacement(eid, pp);
    entryTable += replacement;
  }

  return True;
}

// Synchronizes every Arena in arenaTable and then writes the entry
// point table to "<baseDir>/<entryPointTableFilename>".
//
// The file is only (re)written when entryTable is not empty.
//
// return value:
// BoolType -- False when baseDir is not set, when the entry file path
//             does not fit the local buffer, or when the path exists
//             but is not a regular file; True otherwise
BoolType PersistentStorage::synchronize() {
  SizeType index;
  SizeType size = arenaTable.getSize();
  Arena *arena;
  for (index = 0; index < size; index++) {
    arena = arenaTable(index);
    if (arena != 0) 
      arena->synchronize();
  }
  
  SizeType entrySize;
  char entryFilePath[1024];
  
  if (baseDir == 0)
    return False;
  // baseDir + "/" + file name + terminating NUL must fit the buffer;
  // refuse instead of overrunning the stack.
  if (strlen(baseDir) + 1 + strlen(entryPointTableFilename) + 1 >
      sizeof(entryFilePath))
    return False;
  strcpy(entryFilePath, baseDir);
  strcat(entryFilePath, "/");
  strcat(entryFilePath, entryPointTableFilename);

  RawFile entryFile(entryFilePath);

  if (entryFile.exists() && !entryFile.isRegularFile()) 
    return False; // file type is inappropriate

  entrySize = entryTable.getSize();
  if (entrySize > 0) {
    entryFile.create();
    Entry *table = entryTable.getTable();
    entryFile.write((void *)table, entrySize * sizeof(Entry));
    entryFile.close();
  }
  return True;
}

// protected member functions of PersistentStorage

// Grabs the object pp refers to and returns its memory block.
// pp must not be grabbed already and must not be null.  On success pp
// is marked grabbed; on any failure 0 is returned and pp is unchanged.
PointerType PersistentStorage::grab(PersistentPointer &pp) {
  // pp must not be grabbed
  if (pp.isGrabbed()) {
#ifdef DEBUG
    cerr << "PersistentStorage::grab pp has been grabbed" << endl;
#endif
    return 0;
  }

  // pp must not be null
  if (!(pp.getArenaID() >= 0 && pp.getOffset() != 0)) {
#ifdef DEBUG
      cerr << "PersistentStorage::grab pp is null" << endl;
#endif
    return 0;
  }

  Arena *owner = getArena(pp.getArenaID());
  if (owner == 0) {
#ifdef DEBUG
        cerr << "PersistentStorage::grab Arena not found" << endl;
#endif
    return 0;
  }

  // grab first, then clean up, whether or not the grab succeeded
  Chunk *grabbed = owner->grab(pp.getOffset());
  cleanUp();
  if (grabbed == 0) {
#ifdef DEBUG
          cerr << "PersistentStorage::grab grabbing failed" << endl;
#endif
    return 0;
  }

  pp.setGrabbed();
  return grabbed->getMemBlock();
}

// Releases the grab held through pp.  pp must be grabbed and must not
// be null.  On success the grabbed state of pp is cleared and True is
// returned; otherwise False.
BoolType PersistentStorage::release(PersistentPointer &pp) {
  if (!pp.isGrabbed()) // pp has not been grabbed
    return False;

  if (!(pp.getArenaID() >= 0 && pp.getOffset() != 0)) // pp is null
    return False;

  Arena *owner = getArena(pp.getArenaID());
  if (owner == 0) // Arena not found
    return False;

  if (!owner->release(pp.getOffset())) // the Arena refused the release
    return False;

  pp.clearGrabbed();
  return True;
}

// Read-only counterpart of grab(): grabs the object pp refers to for
// reading and returns its memory block.  pp must not already hold a
// read-only grab and must not be null.  Returns 0 on any failure.
PointerType PersistentStorage::grabReadOnly(PersistentPointer &pp) {
  if (pp.isGrabbedReadOnly()) // pp has been grabbed
    return 0;

  if (!(pp.getArenaID() >= 0 && pp.getOffset() != 0)) // pp is null
    return 0;

  Arena *owner = getArena(pp.getArenaID());
  if (owner == 0) // Arena not found
    return 0;

  // grab first, then clean up, whether or not the grab succeeded
  Chunk *grabbed = owner->grabReadOnly(pp.getOffset());
  cleanUp();
  if (grabbed == 0) // grabbing failed
    return 0;

  pp.setGrabbedReadOnly();
  return grabbed->getMemBlock();
}

// Read-only counterpart of release(): gives up the read-only grab held
// through pp.  pp must hold a read-only grab and must not be null.
// Returns True and clears the read-only state on success.
BoolType PersistentStorage::releaseReadOnly(PersistentPointer &pp) {
  if (!pp.isGrabbedReadOnly()) // pp has not been grabbed
    return False;

  if (!(pp.getArenaID() >= 0 && pp.getOffset() != 0)) // pp is null
    return False;

  Arena *owner = getArena(pp.getArenaID());
  if (owner == 0) // Arena not found
    return False;

  if (!owner->releaseReadOnly(pp.getOffset())) // the Arena refused
    return False;

  pp.clearGrabbedReadOnly();
  return True;
}

// Fills arenaTable: first the non-persistent Arena (key 0, backed by
// "/dev/zero"), then one Arena for every file in baseDir that matches
// arenaFilenameWildcard and is a readable and writable regular file.
// The ArenaID of each file is scanned from its name with
// arenaFilenameFormat.
//
// return value:
// BoolType -- False when baseDir is not set, when the directory scan
//             reports a negative entry count, or when an Arena does not
//             come up with the ID scanned from its file name;
//             True otherwise
BoolType PersistentStorage::initializeArenaTable() {
  if (baseDir == 0)
    return False;
  ScanDir storageDir(baseDir, arenaFilenameWildcard);
  SizeType number = storageDir.entries();
  if (number < 0) 
    return False;
  ArenaID aid = 0;
  Arena *arena = new Arena(aid, "/dev/zero");
  arenaTable += arena;
  arena = 0;
  const char *storageFilename = 0;
  char storageFile[1024];
  storageDir.initFilter(); 
  while ((storageFilename = storageDir.getNextEntry()) != 0) {
    // baseDir + "/" + entry name + terminating NUL must fit the
    // buffer; an entry whose path is too long is skipped, like any
    // other unusable file, instead of overrunning the stack.
    if (strlen(baseDir) + 1 + strlen(storageFilename) + 1 >
        sizeof(storageFile))
      continue;
    strcpy(storageFile, baseDir);
    strcat(storageFile, "/");
    strcat(storageFile, storageFilename);
    FileStat fs(storageFile);
    if (!(fs.isRegularFile() && 
        fs.isReadable() && 
        fs.isWritable()))
      continue;
    istrstream iss(storageFilename);
    iss.scan(arenaFilenameFormat, &aid);
#ifdef DEBUG
    cerr << "initializing Arena " << storageFile << endl;
#endif
    arena = new Arena(aid, storageFile);
    if (arena->getKey() == aid) {
      arenaTable += arena;
    } else {
      // the Arena reports a different key than requested
      delete arena;
      return False;
    }
  }
  return True;
}

// Loads the entry point table from
// "<baseDir>/<entryPointTableFilename>" into entryTable and removes
// every entry whose entry point cannot be grabbed.
// A missing file is not an error; the table is left as it is.
//
// return value:
// BoolType -- False when baseDir is not set, when the path does not fit
//             the local buffer, when the path exists but is not a
//             regular file, when the file holds less than one Entry,
//             or when entryTable ends up smaller than the number of
//             entries read; True otherwise
BoolType PersistentStorage::initializeEntryTable() {
  SizeType entrySize;
  char entryFilePath[1024];
  
  if (baseDir == 0)
    return False;
  // baseDir + "/" + file name + terminating NUL must fit the buffer
  if (strlen(baseDir) + 1 + strlen(entryPointTableFilename) + 1 >
      sizeof(entryFilePath))
    return False;
  strcpy(entryFilePath, baseDir);
  strcat(entryFilePath, "/");
  strcat(entryFilePath, entryPointTableFilename);

  RawFile entryFile(entryFilePath);

  if (entryFile.exists() && !entryFile.isRegularFile()) 
    return False; // file type is inappropriate

  if (entryFile.exists()) {
    entrySize = entryFile.getSize(); // get the size of the file in bytes
    if (entrySize < (SizeType)sizeof(Entry))
      return False;
    entryFile.open();
    entrySize /= sizeof(Entry); // number of whole entries in the file
    // The buffer is raw storage obtained with new char[]; it must be
    // released as a char array (delete [] (char *)buf), not with a
    // scalar delete through an Entry pointer.
    Entry *buf = (Entry *)new char[entrySize * sizeof(Entry)];
    entrySize = 
      entryFile.read((void *)buf, entrySize * sizeof(Entry));
    entryFile.close();
    if (entrySize < (SizeType)sizeof(Entry)) { // short read
      delete [] (char *)buf;
      return False;
    }
    entrySize /= sizeof(Entry); // number of entries actually read
    entryTable.SortedObjectTable(buf, entrySize);
    if (entryTable.getSize() < entrySize) {
      delete [] (char *)buf;
      return False;
    }
    delete [] (char *)buf;
  } else {
    // the file does not exist
    entrySize = 0;
  }
  // validate the loaded entries: an entry point that cannot be grabbed
  // is collected first and removed afterwards, so the table is not
  // modified while it is being scanned
  SizeType index;
  SortedObjectTable<Entry, EntryID> invalidEntries;
  for (index = 0; index < entrySize; index++) {
    Entry entry(entryTable(index));
    if (entry.entryPoint.grab() == 0)
      invalidEntries += entry;
    else
      entry.entryPoint.release();
  }
  for (index = 0; index < invalidEntries.getSize(); index++) {
    entryTable -= invalidEntries(index).getKey();
  }
  return True;
}

// Asks the Arenas to give up releasable Windows once totalWindowSize
// has reached maxTotalWindowSize.
//
// The Arenas are visited from the last table slot down to slot 1;
// slot 0 is never visited.  Unless 'completely' is set, the scan stops
// as soon as more than maxTotalWindowSize / 4 has been reclaimed.
//
// parameter:
// BoolType completely -- passed on to Arena::cleanUp(); when True the
//                        early stop is disabled
//
// return value:
// SizeType -- the sum of the values returned by Arena::cleanUp(),
//             0 when no clean-up was necessary
SizeType PersistentStorage::cleanUp(BoolType completely) {
  SizeType before;
  if ((before = totalWindowSize) < maxTotalWindowSize) 
    return 0;
  SizeType tableSize = arenaTable.getSize();
  SizeType index = tableSize - 1;
  SizeType size = 0;
  while (index > 0) {
    Arena *arena = arenaTable(index);
    // allocate() and synchronize() both treat a null table slot as
    // possible, so skip it here as well instead of dereferencing it
    if (arena != 0)
      size += arena->cleanUp(completely);
    if (!completely && size > maxTotalWindowSize / 4)
      break;
    index--;
  }
#ifdef DEBUG
  cerr << "\nPersistentStorage::cleanUp before "
       << before << " after " << totalWindowSize 
       << " size " << size << endl;
#endif
  return size;
}

// Returns the global 'ps' pointer: the PersistentStorage object that
// registered itself in its constructor, or 0 when none is registered.
inline PersistentStorage *PersistentStorage::getThePS() {
  return ps;
}

// member functions of Arena 

// constructor
//
// Builds the Arena identified by aid on top of storageFile.
//   aid == 0 : non-persistent Arena; it starts empty and is grown once.
//   aid != 0 : persistent Arena.  An existing readable and writable
//              regular file is adopted if the ArenaInitialBlock mapped
//              from its first page carries the same arenaID.  A missing
//              file is created with a zero-filled first page, a fresh
//              ArenaInitialBlock and one grow().
// Failure is reported through the key: arenaID is set to -1, so callers
// compare getKey() with the aid they asked for.
Arena::Arena(ArenaID aid, const char *storageFile) : 
  arenaID(aid), masterWindow(storageFile) {
    if (aid == 0) { // non-persistent arena
      arenaSize = PersistentStorage::pageSize;
      freeList = 0;
      topChunk = 0;
      maximumFreeChunk = 0;
      if (grow()) {
        arenaID = aid;
      } else {
        arenaID = -1;
      }
    } else { // persistent arena
      FileStat fs(storageFile);
      if (fs.isRegularFile() && 
          fs.isReadable() &&
          fs.isWritable()) {
        // existing file: read the bookkeeping from its first page
        masterWindow.MMap::map(0, PersistentStorage::pageSize);
        ArenaInitialBlock *aib = 
          (ArenaInitialBlock *)masterWindow.getAddr();
        if (aib != 0 && 
            aib->arenaID == arenaID) {
          arenaSize = aib->arenaSize;
          freeList = aib->freeList;
          topChunk = aib->topChunk;
          maximumFreeChunk = 0;
          arenaID = aid;
        } else {
          arenaID = -1;
        }
      } else if (!fs.exists()) { // storage file should be created
        RawFile sf(storageFile);
        sf.create();
        // create initial block
        char *buf = new char[PersistentStorage::pageSize];
        memset((PointerType)buf, 0, PersistentStorage::pageSize);
        sf.write(buf, PersistentStorage::pageSize);
        delete [] buf; // allocated with new char[]
        sf.close();
        // re-create masterWindow
        masterWindow.Window(storageFile);
        masterWindow.MMap::map(0,PersistentStorage::pageSize);
        // construct the initial block in place in the mapped page
        ArenaInitialBlock *aib =
          new(masterWindow.getAddr()) ArenaInitialBlock(aid);
        arenaSize = aib->arenaSize = PersistentStorage::pageSize;
        freeList = aib->freeList = 0;
        topChunk = aib->topChunk = 0;
        maximumFreeChunk = 0;
        if (grow()) { // generate a free block
          masterWindow.sync(MS_ASYNC);
          arenaID = aid; // successful
        } else {
          arenaID = -1;
        }
      } else { // erroneous storage file
        arenaID = -1;
      }
    }
}

// destructor
// Requests an asynchronous sync of the master window, then calls
// synchronize() without an argument, and finally empties windowTable.
Arena::~Arena() {
  masterWindow.sync(MS_ASYNC);
  synchronize();
  windowTable.removeAll();
}

void Arena::printStatus() {
  SizeType size;
  ostream &os = cerr;
  os << "Arena {\n  arenaID " << arenaID
     << "\n  arenaSize " << arenaSize 
     << "\n  storageFile " << masterWindow.getFileName() 
     << "\n  freeList " << freeList
     << "\n  topChunk " << topChunk
     << "\n  windowTable {\n  size " << (size = windowTable.getSize())
     << "\n";
  Window *window;
  SizeType index;
  for (index = 0; index < size; index++) {
    window = windowTable(index);
    window->printStatus();
  }
  if (freeList != 0) {
    OffsetChunkPair fChunk(freeList);
    os << "freeList {\n";
    do {
      if (fChunk.map(this) == 0) 
        break;
      os << "freeChunk " << (OffsetType)fChunk 
         << " size " << PChunk(fChunk)->getSize() 
         << " backward " << PChunk(fChunk)->backward
         << " forward " << PChunk(fChunk)->forward
         << "\n";
      fChunk = PChunk(fChunk)->forward;
    } while (fChunk != freeList);
    os << "} // freeList\n";
  }
  os << "} // Arena\n";
}

// Cross-checks the object tables of neighbouring Windows.
//
// For every Window, each object is compared with the record the
// previous and the next Window hold for the same offset (neighbours
// that are separated from the current Window are ignored):
//   - a fragmented object must not be fragmented in the previous Window
//   - a non-fragmented object must be fragmented in the next Window
//     whenever that Window covers its offset
//   - the in-use state must agree with both neighbours
// Every Window that fails is reported on cerr together with its
// neighbours.
//
// return value:
// BoolType -- True when no Window failed the check
BoolType Arena::objectTableCheck() {
  ostream &report = cerr;

  BoolType allConsistent = True;
  SizeType windowCount = windowTable.getSize();
  SizeType w;
  for (w = 0; w < windowCount; w++) {
    Window *current = windowTable(w);
    Window *before = windowTable(w - 1);
    Window *after = windowTable(w + 1);
    // a neighbour that is separated from the current Window is ignored
    if (before != 0 && current->isSeparatedFrom(*before))
      before = 0;
    if (after != 0 && current->isSeparatedFrom(*after))
      after = 0;

    BoolType windowConsistent = True;
    SizeType objectCount = current->objectTable.getSize();
    SizeType k;
    for (k = 0; k < objectCount; k++) {
      OffsetType off = current->objectTable(k).getOffset();

      // fragmentation flag
      if (current->objectTable(k).isFragmented()) {
        if (before != 0 &&
            before->isWithinRange(off) &&
            before->objectTable[off].isFragmented())
          windowConsistent = False;
      } else {
        if (after != 0 &&
            after->isWithinRange(off) &&
            !after->objectTable[off].isFragmented())
          windowConsistent = False;
      }

      // in-use flag must match in both neighbours
      if (current->objectTable(k).isInUse()) {
        if (before != 0 &&
            before->isWithinRange(off) &&
            !before->objectTable[off].isInUse())
          windowConsistent = False;
        if (after != 0 &&
            after->isWithinRange(off) &&
            !after->objectTable[off].isInUse())
          windowConsistent = False;
      } else {
        if (before != 0 &&
            before->isWithinRange(off) &&
            before->objectTable[off].isInUse())
          windowConsistent = False;
        if (after != 0 &&
            after->isWithinRange(off) &&
            after->objectTable[off].isInUse())
          windowConsistent = False;
      }
    }

    if (windowConsistent)
      continue;

    allConsistent = False;
    report << "Arena::objectTableCheck {\n";
    if (before != 0) {
      report << "before ";
      before->printStatus();
    }
    report << "current ";
    current->printStatus();
    if (after != 0) {
      report << "after ";
      after->printStatus();
    }
    report << "} // Arena::objectTableCheck" << endl;
  }
  return allConsistent;
}



// Allocates a Chunk of at least 'size' bytes (after alignment) from
// this Arena.  The free list is searched first; if nothing fits there,
// the Chunk is split off the top Chunk instead.
//
// return value:
// OffsetType -- the offset of the allocated Chunk, 0 when no Chunk
//               could be obtained or mapped
OffsetType Arena::allocate(SizeType size) {
  size = alignChunkSize(size);

  OffsetChunkPair picked = findChunk(size);
  if (!(picked != 0)) { // not found in freeList
    picked = splitTopChunk(size);
    removeRedundantWindow(picked);
  } else { // found in freeList
    OffsetChunkPair remainder = split(picked, size);
    removeRedundantWindow(picked);
    if (remainder != 0)
      freeList = remainder.offset;
  }

#ifdef DEBUG
  if (freeList != 0 && !isValidFreeChunk(freeList)) 
    breakpoint();
#endif

  if (!(picked != 0))
    return 0;
  if (!(picked.map(this, False) != 0))
    return 0;

  // clear the list links of the allocated Chunk
  picked.chunk->forward = picked.chunk->backward = 0; // non-demanding operation
  return picked.offset;
}

// Destroys the Chunk at offset o.  The Chunk must currently be grabbed
// (exclusively); it is released first, then destroyed, and finally the
// Arena tries to consolidate free space into the top Chunk.
//
// return value:
// BoolType -- False when o is 0, when the Chunk is not grabbed, when
//             the release or the destruction fails; otherwise the
//             result of consolidateIntoTopChunk()
BoolType Arena::destroy(OffsetType o) {
#ifdef DEBUG
  cerr << "Arena::destroy " << o << " ";
#endif
  if (o == 0) {
#ifdef DEBUG
    cerr << "Arena::destroy try to destroy 0" << endl;
#endif
    return False;
  }
  BoolType success = True;
  if (isGrabbed(o)) { // must be grabbed exclusively
    if (release(o)) {
      success = destroyChunk(o);
      removeRedundantWindow(o);
#ifdef DEBUG
      if (getObject(freeList, True).isInUse()) {
        cerr << "freeList " << freeList << " is InUse " << endl;
        STATUS("freeList");
      }
#endif
      if (success) {
        BoolType consolidated = consolidateIntoTopChunk();
#ifdef DEBUG
        if (!consolidated)
          cerr << "Arena::destroy consolidateIntoTopChunk failed" << endl;
#endif
        return consolidated;
      } else {
        // this branch is reached when destroyChunk() failed, so the
        // message names destroyChunk, not the consolidation step
#ifdef DEBUG
        cerr << "Arena::destroy destroyChunk failed" << endl;
#endif
        return False;
      }
    } else { // release failed
#ifdef DEBUG
      cerr << "Arena::destroy release failed" << endl;
#endif
      return False;
    }
  } else { // attempt to destroy non-grabbed object
#ifdef DEBUG
    cerr << "Arena::destroy not grabbed" << endl;
#endif
    return False;
  }
}

// Maps the Chunk at offset o.  The Window is looked up with o as
// passed in (including any flag bits); the Chunk itself is then mapped
// with the flag bits masked off by PersistentOffset::offsetMask.
// Returns 0 when no Window could be obtained.
Chunk *Arena::map(OffsetType o) {
  Window *holder = mapWindow(o);
  if (holder == 0)
    return 0;
  return holder->map(o & PersistentOffset::offsetMask);
}

// Returns the Window that covers offset o, creating and registering a
// new one when the offset is not mapped yet.
//
// Two flag bits carried in o select the mode:
//   PersistentOffset::inUseMask      set -> mapObjects is False
//                                           ("anywhere": an existing
//                                           Window is returned without
//                                           validating o)
//   PersistentOffset::fragmentedMask set -> validateObject is False
//                                           (the minimum range is just
//                                           one Chunk header)
//
// A new Window is only created for a persistent Arena (arenaID > 0)
// and only for an offset within [pageSize, arenaSize).
//
// return value:
// Window * -- the Window, or 0 when the offset is invalid, out of
//             range, or cannot be mapped
Window *Arena::mapWindow(OffsetType o) {
  BoolType mapObjects = 
    o & PersistentOffset::inUseMask ? False : True;
  BoolType validateObject = 
    o & PersistentOffset::fragmentedMask ? False : True;
  o &= ~PersistentOffset::inUseMask;
  Window *window = windowTable[o];
  if (window != 0) { // already mapped
    if (!mapObjects) { // anywhere
      o &= PersistentOffset::offsetMask;
      return window;
    }
    o &= PersistentOffset::offsetMask;
    if (window->isValid(o)) { // maybe fragmented
      return window;
    } else {
      return 0;
    }
  } else { // not mapped
    if (arenaID > 0) { // persistent
      o &= PersistentOffset::offsetMask;
      // the offset is out of the Arena
      if (!(PersistentStorage::pageSize <= o && o < arenaSize)) {
        return 0;
      }

      OffsetType begin;
      OffsetType end;
      SizeType size;
      // SizeType windowTableSize = windowTable.getSize();
      // neighbouring Windows around the table position found for o
      SizeType index = windowTable.searchByKey(o);
      Window *before = windowTable(index - 1);
      Window *after = windowTable(index);
      
#ifdef DEBUG
      // NOTE(review): the condition uses || while the message says &&;
      // confirm which of the two was intended.
      if (before != 0 || after != 0)
        cerr << "Window::mapWindow before != 0 && after != 0" << endl;
#endif

      // calculate the minimum range of mapping
      if (validateObject) {
        BoolType withinBefore = before != 0 && before->isWithinRange(o);
        BoolType withinAfter = after != 0 && after->isWithinRange(o);
        size = 0;
        begin = o;
        if (withinBefore) {
          // get the size from the before Window
          size = before->map(o)->getSize();
        } else if (withinAfter) {
          // get the size from the after Window
          size = after->map(o)->getSize();
        } else if (after != 0) {
          // get the size from the first object of the after Window
          OffsetChunkPair next(after->objectTable(0).getOffset());
          if (next.map(after) != 0) 
            if ((OffsetType)next - PChunk(next)->getPrevSize() == o)
              size = PChunk(next)->getPrevSize();
        }
        if (size > 0) {
          // the object size detected
          end = begin + size;
        } else {
          // the object size not detected
          // the size may be less than or equal to minimumChunkSize * 2
          end = o + getMinimumChunkSize() * 2;
          // the range must not exceed arenaSize
          if (end > arenaSize)
            end = arenaSize;
          // the range must not contain 
          // the first object of the after Window
          if (after != 0 && after->objectTable.getSize() > 0) {
            OffsetChunkPair next(after->objectTable(0).getOffset());
            if ((OffsetType)next < end)
              end = next;
          }
        }
      } else { // validation not required
        // just one chunk
        begin = o;
        end = o + sizeof(Chunk);
      }
      
      // grow from the minimum requirement
      if (end - begin < PersistentStorage::standardWindowSize) {
        // the range should grow up
        OffsetType lowerLimit = begin;
        OffsetType upperLimit = end;
        // detect lowerLimit
        if (before != 0) {
          OffsetType beforeEnd = 
            (OffsetType)before->getOffset() + 
            (OffsetType)before->getLength();
          if (beforeEnd <= begin) {
            // the last object of the before Window decides how far
            // down the new range may reach
            SizeType index = before->objectTable.getSize() - 1;
            if (index >= 0) {
              if (before->objectTable(index).isFragmented()) {
                lowerLimit = before->objectTable(index).getOffset();
              } else {
                lowerLimit = beforeEnd;
              }    
            }
          }
        } else { // before == 0
          lowerLimit = PersistentStorage::pageSize;
        }
        // detect upperLimit
        if (after != 0) {
          OffsetType afterBegin = (OffsetType)after->getOffset();
          if (end < afterBegin) {
            upperLimit = after->objectTable(0).getOffset();
          }
        } else { // after == 0
          upperLimit = arenaSize;
        }
        // clamp both limits so that the range stays within
        // standardWindowSize of the minimum range
        OffsetType tmp = end - PersistentStorage::standardWindowSize;
        if (lowerLimit < tmp)
          lowerLimit = tmp;
        tmp = begin + PersistentStorage::standardWindowSize;
        if (tmp < upperLimit)
          upperLimit = tmp;
        begin = lowerLimit;
        end = upperLimit;
      }
      // round begin down to a multiple of pageSize
      // (assumes pageSize is a power of two -- TODO confirm)
      begin &= ~(PersistentStorage::pageSize - 1);

      if (begin < end) {
        size = end - begin;
#ifdef DEBUG
        if ((before != 0 && before->isWithinRange(begin)) ||
            (after != 0 && after->isWithinRange(end))) {
          cerr << "Overlapped Window mapping " 
               << " begin " << begin << " end " << end 
               << " object " << o << endl;
          cerr << "before Window ";
          if (before != 0) {
            before->printStatus();
          } else {
            cerr << "null ";
          }
          cerr << "after Window ";
          if (after != 0) {
            after->printStatus();
          } else {
            cerr << "null ";
          }
        }
#endif
        if (mapObjects) {
          // the new Window is given o and both neighbours
          window = 
            new Window(masterWindow, begin, size, o, before, after, this,
                       getMinimumChunkSize());
#ifdef DEBUG
          if ((before != 0 && before->isWithinRange(begin)) ||
              (after != 0 && after->isWithinRange(end))) {
            cerr << "just mapped" << endl;
            window->printStatus();
          }
#endif
          // keep the Window only if o is valid in it
          if (window->isValid(o)) {
            windowTable += window;
            return window;
          } else {
#ifdef DEBUG
            breakpoint();
#endif
            delete window;
            return 0;
          }
        } else {
          // "anywhere" mode: no object offset and no neighbours are
          // passed, and o only has to lie within the Window's range
          window = 
            new Window(masterWindow, begin, size, 0, 0, 0, this,
                       getMinimumChunkSize());
          if (window->isWithinRange(o)) {
            windowTable += window;
            return window;
          } else {
#ifdef DEBUG
            breakpoint();
#endif
            delete window;
            return 0;
          }
        }           
      } else { // empty range, nothing to map
#ifdef DEBUG
        breakpoint();
#endif
        return 0;
      }
    } else { // non-persistent
      return 0;
    }
  }
}

// Synchronizes mapped memory of a persistent Arena (arenaID > 0).
//   o != 0 : only the Window found for o in windowTable is synced.
//   o == 0 : every dirty Window is synced and its dirty flag cleared,
//            then the master window is synced.
// A non-persistent Arena has nothing to synchronize.
// Always returns True.
BoolType Arena::synchronize(OffsetType o) {
  if (!(arenaID > 0)) // non-persistent
    return True;

  if (o != 0) {
    Window *target = windowTable[o];
    if (target != 0) 
      target->sync(MS_SYNC);
    return True;
  }

  SizeType windowCount = windowTable.getSize();
  SizeType slot;
  for (slot = 0; slot < windowCount; slot++) {
    Window *candidate = windowTable(slot);
    if (candidate == 0 || !candidate->isDirty())
      continue;
    candidate->sync(MS_SYNC);
    candidate->clearDirty();
  }
  masterWindow.sync(MS_SYNC);
  return True;
}

// Removes completely released Windows from this Arena, but only while
// the storage-wide totalWindowSize exceeds maxTotalWindowSize.
//
// Unless 'completely' is set, the collection stops as soon as the
// Windows gathered so far add up to more than maxTotalWindowSize / 4.
// Each collected Window is synced asynchronously and taken out of
// windowTable.
//
// return value:
// SizeType -- the summed length of the removed Windows, 0 when cleaning
//             up is unnecessary
SizeType Arena::cleanUp(BoolType completely) {
  PersistentStorage *storage = PersistentStorage::getThePS();
  if (storage == 0 ||
      !(storage->totalWindowSize > PersistentStorage::maxTotalWindowSize))
    return 0; // cleaning up is unnecessary

  SizeType windowCount = windowTable.getSize();
  SortedPointerTable<Window, OffsetType> releasable(windowCount);
  SizeType reclaimed = 0;
  SizeType slot;

  // pass 1: collect the Windows nobody holds any more
  for (slot = 0; slot < windowCount; slot++) {
    Window *candidate = windowTable(slot);
    if (candidate->isCompletelyReleased()) {
      releasable += candidate;
      reclaimed += candidate->getLength();
    }
    if (!completely &&
        reclaimed > PersistentStorage::maxTotalWindowSize / 4) 
      break;
  }

  // pass 2: sync them and take them out of the table
  SizeType releasableCount = releasable.getSize();
  if (releasableCount > 0) {
    for (slot = 0; slot < releasableCount; slot++) {
      Window *doomed = releasable(slot);
      doomed->sync(MS_ASYNC);
      windowTable -= doomed;
    }
    releasable.removeAll();
  }
  return reclaimed;
}


// private member functions of Arena

/*
  findChunk -- search the free list for a chunk of at least 'size'

  Starting at freeList and following the 'backward' links, each free
  chunk is mapped and its size is examined.  The first chunk whose
  size is >= size is returned (first fit).  While walking, the largest
  chunk seen so far is remembered.  The walk is abandoned (tooFar)
  when the distance from the starting point exceeds
  PersistentStorage::maxTotalWindowSize / 4.
  parameter:
  SizeType size -- the required chunk size
  returns:
  the fitting chunk, or 0 when the free list is empty, when the
  recorded maximumFreeChunk is smaller than size, when a chunk could
  not be mapped, or when no fitting chunk was found.
  side effects:
  when the whole list was walked without a fit, freeList is moved to
  the largest chunk seen and maximumFreeChunk is set to its size.
 */
OffsetChunkPair Arena::findChunk(SizeType size) {
#ifdef DEBUG
  cerr << "Arena::findChunk " << freeList << " ";
#endif
  // no free chunk at all
  if (freeList == 0) 
    return 0;

  // the recorded maximum (when known, i.e. > 0) already rules out a fit
  if (maximumFreeChunk > 0 && maximumFreeChunk < size)
    return 0;

  OffsetChunkPair fitChunk(0);
  Window *window;

  // The following block is compiled out.  It scanned the object
  // tables of the mapped windows before walking the free list.
  // NOTE(review): as written, the inner while loop never advances
  // 'index', so it must not be re-enabled unchanged.
#if 0  
  OffsetChunkPair completeFitChunk(0);
  SizeType windowTableSize = windowTable.getSize();
  SizeType windowIndex = 0;
  SizeType tableSize;
  SizeType index;
  SizeType freeChunkSize;
  SizeType minimum = getMinimumChunkSize();
  // search first in the mapped Windows
  for (windowIndex = 0; 
       windowIndex < windowTableSize; 
       windowIndex++) {
    window = windowTable(windowIndex);
    index = 0;
    tableSize = window->objectTable.getSize();
    while (index < tableSize) {
      if (!window->objectTable(index).isInUse()) {
        completeFitChunk = window->objectTable(index).getOffset();
        if (index + 1 < tableSize) {
          freeChunkSize = 
            window->objectTable(index + 1).getOffset() - 
            completeFitChunk;
        } else {
          completeFitChunk.map(window);
          freeChunkSize = PChunk(completeFitChunk)->getSize();
        }
        if (freeChunkSize >= size) {
          fitChunk = completeFitChunk;
          if (index + 1 < tableSize) {
            return completeFitChunk;
          } else {
            if (freeChunkSize - size >= minimum) {
              // really split
              if (window->isWithinRange((OffsetType)completeFitChunk + 
                                        size + sizeof(Chunk) - 1)) {
                return completeFitChunk;
              }
            } else { 
              // not split, allocate the whole Chunk
              if (window->isWithinRange((OffsetType)completeFitChunk + 
                                        size - 1)) {
                return completeFitChunk;
              }
            }
          }
        }
      }
    }
  }
  if ((OffsetType)fitChunk != 0)
    return fitChunk;
#endif
  // walk the free list, starting at freeList
  fitChunk = freeList;
  OffsetType nextChunk;
  OffsetType maxChunk = freeList; // largest chunk seen so far
  OffsetType topFreeChunk = 0;    // first visited chunk above freeList
  BoolType tooFar = False;        // set when the walk is abandoned
  SizeType s = 0;                 // size of the chunk just examined
  SizeType max = 0;               // size of maxChunk
  CounterType probeCount = 0;     // number of chunks examined
  do {
    // map the window holding the chunk header
    window = mapWindow((OffsetType)fitChunk | 
                       PersistentOffset::fragmentedMask);
    if (fitChunk.map(window) == 0) {
      cerr << "fitChunk is null" << endl;
      return 0;
    }
    if (arenaID > 0) {
      // hold the window while the chunk header is read; it is
      // released again below
      // this grabbing should be changed to read-only grabbing
      window->grab(fitChunk | PersistentOffset::fragmentedMask);
#ifdef DEBUG
      SizeType cleanedUp = cleanUp();
      if (cleanedUp > 0) 
        cerr << "clean up " << cleanedUp << endl;
#else
      cleanUp();
#endif
    }
#ifdef DEBUG
    cerr << "fitChunk " << (OffsetType)fitChunk 
         << " size " << PChunk(fitChunk)->getSize()  
         << " backward " << PChunk(fitChunk)->backward
         << " forward " << PChunk(fitChunk)->forward << endl;
#endif
    // read the link and the size before the window is released
    nextChunk = PChunk(fitChunk)->backward;
    s = PChunk(fitChunk)->getSize();
    if (max < s) {
      maxChunk = fitChunk;
      max = s;
    }
    if (arenaID > 0) {
      window->release(fitChunk | PersistentOffset::fragmentedMask);
    }    
    probeCount++;
    if (s >= size) {
      break; // first fit scheme
    }
    fitChunk = nextChunk;
    // give up when the walk has moved too far below the start
    if (freeList - fitChunk > 
        PersistentStorage::maxTotalWindowSize / 4) {
      tooFar = True;
      break;
    }
    // remember the first chunk visited that lies above freeList
    if (topFreeChunk == 0 && 
        freeList < (OffsetType)fitChunk)
      topFreeChunk = fitChunk;
    // ... and give up when the walk has moved too far below it
    if (topFreeChunk != 0 &&
        topFreeChunk - (OffsetType)fitChunk > 
        PersistentStorage::maxTotalWindowSize / 4) {
      tooFar = True;
      break;
    }
  } while (fitChunk != freeList);
  if (s >= size) {
    return fitChunk;
  } else if (!tooFar) {
    // the whole list was examined: record the largest chunk so that
    // later requests can be rejected by the check at the top
    freeList = maxChunk;
    maximumFreeChunk = max;
    return 0;
  } else { // tooFar
    // freeList = fitChunk;
    return 0;
  }
}

/*
  splitTopChunk -- carve 'size' off the front of the top chunk

  The current top chunk is shrunk to 'size' and flagged as in use, and
  a new top chunk holding the remainder is constructed right behind
  it.
  parameter:
  SizeType size -- the size of the chunk to carve off
  returns:
  the carved chunk, or 0 when the top chunk cannot be mapped or is
  smaller than size + sizeof(Chunk)
 */
OffsetChunkPair Arena::splitTopChunk(SizeType size) {
  OffsetChunkPair carved(topChunk);
  carved.map(this, True, False);
  if (PChunk(carved) == 0)
    return 0; // topChunk could not be mapped
  const SizeType oldTopSize = PChunk(carved)->getSize();
  if (oldTopSize < size + (SizeType)sizeof(Chunk))
    return 0; // topChunk is too small

  // the front part becomes the allocated chunk
  PChunk(carved)->setSize(size);
  PChunk(carved)->setPrevInUse();
  setObjectFlag(carved, True, size);

  // the remainder becomes the new topChunk
  OffsetChunkPair rest(topChunk + size);
  rest.map(this, True, True);
  new(PChunk(rest)) Chunk(size, oldTopSize - size, True, 0, 0);
  topChunk = rest;
  insertObject(topChunk, False);

  // sanity check: the new topChunk must end exactly at arenaSize
  rest = topChunk;
  if (rest.map(this, True) == 0) {
    STATUS("Arena::splitTopChunk newTop is null\n");
  } else if ((OffsetType)rest + PChunk(rest)->getSize() != arenaSize) {
    STATUS("Arena::splitTopChunk insertion of newTopChunk failed\n");
  }

  updateInitialBlock();
  return carved;
}

/*
  split -- allocate 'size' from the free chunk fChunk

  If the remainder (chunk size - size) would be smaller than
  getMinimumChunkSize(), the whole chunk is taken: fChunk is unlinked
  from the free list, the following chunk is marked prev-in-use and
  fChunk is flagged as in use.
  Otherwise fChunk is shrunk to 'size' and a new free chunk holding
  the remainder is constructed at fChunk + size; it takes fChunk's
  place in the free list.
  parameters:
  OffsetChunkPair fChunk -- a chunk on the free list
  SizeType size          -- the size to allocate from it
  returns:
  the new free chunk holding the remainder, or 0.
  NOTE(review): 0 is returned both when the whole chunk was taken and
  when a mapping failed, so the return value alone does not
  distinguish the two.
 */
OffsetChunkPair Arena::split(OffsetChunkPair fChunk, SizeType size) {
  // map fChunk and its two free-list neighbours
  fChunk.map(this);
  if (PChunk(fChunk) == 0) {
    STATUS("Arena::split() fChunk is null\n");
    return 0;
  }
  OffsetChunkPair bFree(PChunk(fChunk)->backward);
  bFree.map(this);
  if (PChunk(bFree) == 0) {
    STATUS("Arena::split() bFree is null\n");
    return 0;
  }
  OffsetChunkPair fFree(PChunk(fChunk)->forward);
  fFree.map(this);
  if (PChunk(fFree) == 0) {
    STATUS("Arena::split() fFree is null\n");
    return 0;
  }
  // size of what would be left over after taking 'size'
  SizeType splitFreeChunkSize = PChunk(fChunk)->getSize() - size;
  if (splitFreeChunkSize < getMinimumChunkSize()) { 
    // cannot split, then grab the whole
    if (fChunk == bFree && fChunk == fFree) { 
      freeList = 0; // fChunk is the last free Chunk
    } else {
      // unlink fChunk by joining its neighbours
      // maybe fFree == bFree
      PChunk(bFree)->forward = fFree;
      PChunk(fFree)->backward = bFree;
      if (fChunk == freeList) {
        freeList = bFree;
      }
    }
    // NOTE(review): the free list has already been modified when the
    // mapping of nChunk is checked below.
    OffsetChunkPair nChunk(fChunk.getNextChunk(this));
    nChunk.map(this);
    if (PChunk(nChunk) == 0) {
      STATUS("Arena::split() nChunk is null\n");
      return 0;
    }
    // fChunk is now allocated as a whole
    PChunk(nChunk)->setPrevInUse();

    // size 0: only the in-use flag is changed
    setObjectFlag(fChunk, True, 0);
    return 0;
  } else { // split 
    // get the nChunk
    OffsetChunkPair nChunk(fChunk.getNextChunk(this));
    nChunk.map(this);
    if (PChunk(nChunk) == 0) {
      STATUS("Arena::split() nChunk is null\n");
      return 0;
    }
    // set the size
    PChunk(fChunk)->setSize(size);
    // nChunk's predecessor will be the remainder chunk
    PChunk(nChunk)->setPrevSize(splitFreeChunkSize);

    // put new splitFreeChunk
    OffsetChunkPair splitFreeChunk((OffsetType)fChunk + size);
    splitFreeChunk.map(this, True, True);
    if (PChunk(splitFreeChunk) == 0) {
      STATUS("Arena::split() splitFreeChunk is null\n");
      return 0;
    }
    // construct the remainder chunk in place, linked to fChunk's
    // former neighbours
    new(PChunk(splitFreeChunk)) 
      Chunk(size, splitFreeChunkSize, True, bFree, fFree);
    if (bFree == fChunk && fFree == fChunk) {
      // double linked to itself
      PChunk(splitFreeChunk)->forward = 
        PChunk(splitFreeChunk)->backward = splitFreeChunk;
    } else {
      // the neighbours now point at the remainder instead of fChunk
      PChunk(bFree)->forward = PChunk(fFree)->backward 
        = splitFreeChunk;
    }
    // update freeList
    if (fChunk == freeList) {
      freeList = splitFreeChunk;
    }
    // splitFreeChunk object
    insertObject(splitFreeChunk, False);

    // update objectTables
    // fChunk object
    setObjectFlag(fChunk, True, size);

    updateInitialBlock();
    return splitFreeChunk;
  }
}

/*
  consolidateIntoTopChunk -- merge a free chunk lying directly below
  the top chunk into the top chunk

  returns:
  True when nothing had to be done or the merge succeeded,
  False when a chunk could not be mapped, the free-list links of the
  lower chunk are 0, or the old top chunk object could not be removed.
 */
BoolType Arena::consolidateIntoTopChunk() {
  OffsetChunkPair oldTop(topChunk);
  oldTop.map(this);
  if (PChunk(oldTop) == 0) {
    STATUS("Arena::consolidateIntoTopChunk() top is null\n");
    return False;
  }
  // nothing to merge: the previous chunk is in use, or no region is
  // allocated
  if (PChunk(oldTop)->isPrevInUse() ||
      PChunk(oldTop)->getPrevSize() == 0)
    return True;

  // the free chunk directly below the top chunk
  OffsetChunkPair lower(oldTop.getPrevChunk(this));
  lower.map(this);
  if (PChunk(lower) == 0) {
    STATUS("Arena::consolidateIntoTopChunk() prev of top is null\n");
    return False;
  }

  // its neighbours on the free list
  OffsetChunkPair succ(PChunk(lower)->forward);
  OffsetChunkPair pred(PChunk(lower)->backward);
  if (succ == 0 || pred == 0) {
    STATUS("Arena::consolidateIntoTopChunk() freeList corrupted\n");
    return False;
  }
  succ.map(this);
  if (PChunk(succ) == 0) {
    STATUS("Arena::consolidateIntoTopChunk() fFree is null\n");
    return False;
  }
  pred.map(this);
  if (PChunk(pred) == 0) {
    STATUS("Arena::consolidateIntoTopChunk() bFree is null\n");
    return False;
  }

  // take the lower chunk out of the free list
  PChunk(succ)->backward = pred;
  PChunk(pred)->forward = succ;
  if (lower == freeList) {
    // the lower chunk may have been the only free chunk
    if (lower == pred)
      freeList = 0;
    else
      freeList = pred;
  }

  // the lower chunk grows over the old top chunk and becomes the top
  SizeType mergedSize = 
    PChunk(lower)->getSize() + PChunk(oldTop)->getSize();
  PChunk(lower)->setSize(mergedSize);

  // the old top chunk no longer exists as an object
  if (!removeObject(oldTop)) {
    STATUS("Arena::consolidateIntoTopChunk() removeObject(top) failed\n");
    return False;
  }
  topChunk = lower;

  // bring the object tables and the initial block up to date
  setObjectFlag(lower, False, mergedSize);
  removeRedundantWindow(oldTop);
  updateInitialBlock();
  return True;
}

/*
  destroyChunk -- return the chunk at offset o to the free list

  The chunk is coalesced with free neighbours.  Four cases are
  distinguished (the three-letter tags are the ones printed under
  DESTROYLOG; 'a' = allocated, 'f' = free, in the order
  previous / this / next):
    aaf -- merge with the next chunk, taking its place in the free list
    aaa -- no merge; link the chunk into the free list next to the
           nearest free chunk
    faa -- merge into the previous chunk
    faf -- merge the previous, this and the next chunk into one
  maximumFreeChunk is raised when the resulting free chunk is larger.
  parameter:
  OffsetType o -- the offset of the chunk to free
  returns:
  True on success, False when a chunk could not be mapped or an
  object table entry could not be removed.
 */
BoolType Arena::destroyChunk(OffsetType o) {
  OffsetChunkPair aChunk(o);
  aChunk.map(this, False);
  if (PChunk(aChunk) == 0) {
    STATUS("Arena::destroyChunk() aChunk is null\n");
    return False;
  }
  if (PChunk(aChunk)->isPrevInUse()) {
    // the previous chunk is in use: only the next chunk may be merged
    OffsetChunkPair nChunk = aChunk.getNextChunk(this);
    nChunk.map(this);
    if (PChunk(nChunk) == 0) {
      STATUS("Arena::destroyChunk() nChunk is null\n");
      return False;
    }
    OffsetChunkPair nnChunk(0); // the next of the next chunk
    if (nChunk != topChunk) {
      nnChunk = nChunk.getNextChunk(this);
      nnChunk.map(this);
      if (PChunk(nnChunk) == 0) {
        STATUS("Arena::destroyChunk() nnChunk is null\n");
        return False;
      }
    }
    // nChunk is free when nnChunk's prev-in-use flag is clear; the
    // top chunk is never merged here
    if (nChunk != topChunk && 
        !PChunk(nnChunk)->isPrevInUse()) {
#ifdef DESTROYLOG
      cerr << "aaf " << "aChunk " << aChunk
           << "nChunk " << nChunk << endl;
#endif
      // consolidate with nChunk
      OffsetChunkPair fFree(PChunk(nChunk)->forward);
      OffsetChunkPair bFree(PChunk(nChunk)->backward);
      // calculate the new size
      SizeType newSize = 
        PChunk(aChunk)->getSize() + PChunk(nChunk)->getSize();
      // set the new size
      // remove nChunk from objectTables
      if (!removeObject(nChunk)) {
        STATUS("Arena::destroyChunk() removeObject(nChunk) failed\n");
        return False;
      }

      PChunk(aChunk)->setSize(newSize);
      PChunk(nnChunk)->setPrevSize(newSize);

      // in case freeList has only one element
      if (fFree == nChunk) 
        fFree = aChunk;
      if (bFree == nChunk)
        bFree = aChunk;

      // setup aChunk as a new free Chunk
      PChunk(aChunk)->forward = fFree;
      PChunk(aChunk)->backward = bFree;
      fFree.map(this);
      if (PChunk(fFree) == 0) {
        STATUS("Arena::destroyChunk() fFree is null\n");
        return False;
      }
      bFree.map(this);
      if (PChunk(bFree) == 0) {
        STATUS("Arena::destroyChunk() bFree is null\n");
        return False;
      }
      // update freeList
      // aChunk replaces nChunk between its former neighbours
      PChunk(bFree)->forward = PChunk(fFree)->backward = aChunk;
      if (nChunk == freeList) {
        freeList = aChunk;
      }

      setObjectFlag(aChunk, False, newSize);
      if (maximumFreeChunk < newSize)
        maximumFreeChunk = newSize;
    } else {
#ifdef DESTROYLOG
      cerr << "aaa " << "aChunk " << aChunk << endl;
#endif
      // no consolidation
      // bFree / fFree will become the backward / forward neighbours
      // of aChunk on the free list
      OffsetChunkPair fFree(0);
      OffsetChunkPair bFree(0);
      if (freeList == 0) {
        // aChunk becomes the only free chunk, linked to itself
        freeList = bFree = fFree = aChunk;
      } else {
        Window *window = windowTable[aChunk];
        if (window == 0) {
          STATUS("Arena::destroyChunk() windowTable[aChunk] is null\n");
          return False;
        }
        
        // find the nearest free Chunk
        bFree = findNearestFreeChunk(aChunk);
#ifdef DEBUG
        if (!isValidFreeChunk(bFree))
          breakpoint();
#endif
        if (bFree != 0) {
          if ((OffsetType)bFree < aChunk) {
            // the found chunk lies below aChunk: follow the forward
            // links while the next free chunk is still below aChunk
            bFree.map(this);
            if (PChunk(bFree) == 0) {
              STATUS("Arena::destroyChunk() bFree is null\n");
              return False;
            }
            fFree = PChunk(bFree)->forward;
            if (bFree != fFree) {
              while ((OffsetType)bFree < fFree && 
                     (OffsetType)fFree < aChunk) {
                fFree.map(this);
                if (PChunk(fFree) == 0) {
                  STATUS("Arena::destroyChunk() fFree is null\n");
                  return False;
                }
                bFree = fFree;
                fFree = PChunk(fFree)->forward;
              }
            } 
          } else { 
            // the found chunk lies above aChunk: follow the backward
            // links while the previous free chunk is still above
            // aChunk
            fFree = bFree;
            fFree.map(this);
            if (PChunk(fFree) == 0) {
              STATUS("Arena::destroyChunk() fFree is null\n");
              return False;
            }
            bFree = PChunk(fFree)->backward;
            if (bFree != fFree) {
              while ((OffsetType)bFree < fFree && 
                     (OffsetType)bFree > aChunk) {
                bFree.map(this);
                if (PChunk(bFree) == 0) {
                  STATUS("Arena::destroyChunk() bFree is null\n");
                  return False;
                }
                fFree = bFree;
                bFree = PChunk(bFree)->backward;
              }
            } 
          }
        } else {
          // no free chunk was found: start a new list with aChunk
          freeList = bFree = fFree = aChunk;
        }
      }
      fFree.map(this);
      if (PChunk(fFree) == 0) {
        STATUS("Arena::destroyChunk() fFree is null\n");
        return False;
      }
      bFree.map(this);
      if (PChunk(bFree) == 0) {
        STATUS("Arena::destroyChunk() bFree is null\n");
        return False;
      }

      // aChunk is free now: tell nChunk and link aChunk between
      // bFree and fFree
      PChunk(nChunk)->clearPrevInUse();
      PChunk(aChunk)->forward = fFree;
      PChunk(aChunk)->backward = bFree;
      PChunk(fFree)->backward = PChunk(bFree)->forward = aChunk;

      
      // size 0: only the in-use flag is changed
      setObjectFlag(aChunk, False, 0);
      if (maximumFreeChunk < PChunk(aChunk)->getSize())
        maximumFreeChunk = PChunk(aChunk)->getSize();
    }
  } else {
    // pChunk is not in use
    OffsetChunkPair pChunk(aChunk.getPrevChunk(this));
    pChunk.map(this);
    if (PChunk(pChunk) == 0) {
      STATUS("Arena::destroyChunk() pChunk is null\n");
      return False;
    }
    OffsetChunkPair nChunk(aChunk.getNextChunk(this));
    nChunk.map(this);
    if (PChunk(nChunk) == 0) {
      STATUS("Arena::destroyChunk() nChunk is null\n");
      return False;
    }
    // nChunk is treated as free exactly when it is pChunk's forward
    // neighbour on the free list
    // NOTE(review): this presumably relies on the free list being
    // kept in address order -- confirm.
    if (nChunk != PChunk(pChunk)->forward) {
#ifdef DESTROYLOG
      cerr << "faa " << "pChunk" << pChunk
           << " aChunk " << aChunk << endl;
#endif
      // nChunk is in use
      // consolidate with pChunk
      SizeType newSize = 
        PChunk(pChunk)->getSize() + PChunk(aChunk)->getSize();
      // remove aChunk
      if (!removeObject(aChunk)) {
        STATUS("Arena::destroyChunk() removeObject(aChunk) failed\n");
        return False;
      }
      // pChunk stays where it is on the free list; only sizes and
      // nChunk's flag change
      PChunk(pChunk)->setSize(newSize);
      PChunk(nChunk)->setPrevSize(newSize);
      PChunk(nChunk)->clearPrevInUse();


      // update fragmentation flag of pChunk
      setObjectFlag(pChunk, False, newSize);
      if (maximumFreeChunk < newSize)
        maximumFreeChunk = newSize;
    } else {
#ifdef DESTROYLOG
      cerr << "faf " << "pChunk " << pChunk
           << " aChunk " << aChunk 
           << " nChunk " << nChunk << endl;
#endif
      // nChunk is not in use
      // consolidate with pChunk and nChunk
      OffsetChunkPair nnChunk(nChunk.getNextChunk(this));
      nnChunk.map(this);
      if (PChunk(nnChunk) == 0) {
        STATUS("Arena::destroyChunk() nnChunk is null\n");
        return False;
      }
      // nfChunk: the free chunk following nChunk on the free list
      OffsetChunkPair nfChunk(PChunk(nChunk)->forward);
      nfChunk.map(this);
      if (PChunk(nfChunk) == 0) {
        STATUS("Arena::destroyChunk() nfChunk is null\n");
        return False;
      }
      SizeType newSize = 
        PChunk(pChunk)->getSize() + 
        PChunk(aChunk)->getSize() + 
        PChunk(nChunk)->getSize();
      if (!removeObject(aChunk)) {
        STATUS("Arena::destroyChunk() removeObject(aChunk) failed\n");
        return False;
      }
      if (!removeObject(nChunk)) {
        STATUS("Arena::destroyChunk() removeObject(nChunk) failed\n");
        return False;
      }
      PChunk(pChunk)->setSize(newSize);
      PChunk(nnChunk)->setPrevSize(newSize);
      PChunk(nnChunk)->clearPrevInUse();
      // unlink nChunk: pChunk is followed directly by nfChunk
      PChunk(pChunk)->forward = nfChunk;
      PChunk(nfChunk)->backward = pChunk;
      if (freeList == nChunk) {
        freeList = pChunk;
      }
      // update fragmentation flag of pChunk
      setObjectFlag(pChunk, False, newSize);
      if (maximumFreeChunk < newSize)
        maximumFreeChunk = newSize;
    }
  }
  updateInitialBlock();
  return True;
} // Arena::destroyChunk()


/*
  insertObject -- register the chunk at 'offset' in the object tables
  of the mapped windows that contain it

  Up to two windows are considered: window1, the window located by
  searching windowTable with the fragmented key, and window2, a
  neighbouring window that also contains the offset.  A
  PersistentOffset entry is added to each; an empty object table is
  initialized by initializeObjectTable() instead.
  parameters:
  OffsetType offset -- the offset of the chunk
  BoolType inUse    -- the in-use flag stored with the entry
  returns:
  True when the entry was registered, False when no window could be
  located for the offset or the entry is missing afterwards.
 */
BoolType Arena::insertObject(OffsetType offset, BoolType inUse) {
  SizeType tableSize =
    windowTable.getSize();
  SizeType index = 
    windowTable.searchByKey(offset | PersistentOffset::fragmentedMask);

  Window *window1 = 0;
  Window *window2 = 0;
  // locate window1; the search result is adjusted by one position
  // according to the comparison with the found window
  if (0 <= index && index < tableSize) {
    window1 = windowTable(index);
    ComparisonType cmp = 
      window1->compare(offset | PersistentOffset::fragmentedMask);
    if (cmp > 0) {
      index--;
      window1 = windowTable(index);
    } else if (cmp < 0) {
      index++;
      window1 = windowTable(index);
    }
  }
  // look for a second window containing the offset, first below and
  // then above window1
  if (0 <= index - 1 && index - 1 < tableSize) {
    window2 = windowTable(index - 1);
    if (window2 != 0 && !window2->isWithinRange(offset))
      window2 = 0;
  }
  if (window2 == 0 &&
      0 <= index + 1 && index + 1 < tableSize) {
    window2 = windowTable(index + 1);
    if (window2 != 0 && !window2->isWithinRange(offset))
      window2 = 0;
  }
  // decide which of the two windows plays the role of window1
  if (window1 != 0 && window2 != 0) {
    if (window1->objectTable.getSize() == 0) {
      Window *tmp = window2;
      window2 = window1;
      window1 = tmp;
    } else if (window2->objectTable.getSize() > 0) {
      if (window1->compare(*window2) > 0) {
        if (window1->objectTable(0).isFragmented()) {
          Window *tmp = window2;
          window2 = window1;
          window1 = tmp;
        }
      } else {
        if (!window2->objectTable(0).isFragmented()) {
          Window *tmp = window2;
          window2 = window1;
          window1 = tmp;
        }
      }
    }
  }
#ifdef DEBUG
  if (window1 == window2)
    breakpoint();
#endif
  // window1 is still 0 when the search index was out of range or was
  // adjusted off the table; everything below dereferences it, so
  // report failure instead
  if (window1 == 0)
    return False;

  OffsetChunkPair obj(offset);
  obj.map(window1);
  if (window1->objectTable.getSize() > 0) {
    // the entry is fragmented in window1 when the chunk's last byte
    // lies outside window1
    BoolType fragmented = 
      !window1->isWithinRange((OffsetType)obj + PChunk(obj)->getSize() - 1);
    PersistentOffset po(offset, inUse, fragmented);
    window1->objectTable += po;
    window1->setDirty();
    if (window2 == 0) {
      return True;
    } else {
      if (window2->objectTable.getSize() > 0) {
        // window2's entry is marked non-fragmented only when
        // window1's entry is fragmented and window2 holds the whole
        // chunk
        fragmented = 
          !(fragmented && 
            window2->isWithinRange((OffsetType)obj + 
                                   PChunk(obj)->getSize() - 1));
        PersistentOffset po2(offset, inUse, fragmented);
        window2->objectTable += po2;
        window2->setDirty();
        if (window2->objectTable[obj].isNull())
          return False;
        else 
          return True;
      } else {
        window2->initializeObjectTable(obj, 0, 0, this);
        window2->setDirty();
#ifdef DEBUG
        window2->printStatus();
#endif
        if (window2->objectTable[obj].isNull())
          return False;
        else 
          return True;
      }
    }
  } else {
    if (window2 == 0) {
      window1->initializeObjectTable(obj, 0, 0, this);
#ifdef DEBUG
      window1->printStatus();
#endif
      if (window1->objectTable[obj].isNull())
        return False;
      else 
        return True;
    } else { // should not come here
      return False;
    }
  }
}

/*
  removeObject -- remove the entry for 'offset' from the object table
  of every mapped window that contains the offset

  The windows are located by scanning windowTable downwards and
  upwards from the position found by searchByKey() for as long as the
  windows contain the offset.
  parameter:
  OffsetType offset -- the offset of the chunk to unregister
  returns:
  always True; a missing entry is only reported in DEBUG builds.
 */
BoolType Arena::removeObject(OffsetType offset) {
  SizeType tableSize =
    windowTable.getSize();
  SizeType index = 
    windowTable.searchByKey(offset | PersistentOffset::fragmentedMask);
  SizeType min = index;
  SizeType max = index;
  Window *probe;
  
  // extend [min, max] over the windows containing the offset; the
  // null checks cover a search index that lies outside the table
  while (min >= 0) {
    probe = windowTable(min);
    if (probe == 0 || !probe->isWithinRange(offset))
      break;
    min--;
  }
  min++;
  while (max < tableSize) {
    probe = windowTable(max);
    if (probe == 0 || !probe->isWithinRange(offset)) 
      break;
    max++;
  }
  max--;

  // every window in [min, max] was checked to be non-null above
  BoolType success = True;
  for (index = min; index <= max; index++) {
    Window *window = windowTable(index);
    window->setDirty();
    if ((window->objectTable -= offset).isNull())
      success = False;
  }


#ifdef DEBUG
  if (!success) {
    cerr << "Arena::removeObject " << offset 
         << "failed" << endl;
    for (index = min; index <= max; index++) {
      cerr << "index " << index << " ";
      windowTable(index)->printStatus();
    }
    objectTableCheck();
  }
#endif

  // 'success' is deliberately not returned: callers abort on False
  return True; // success1 && success2;
}

/*
  setObjectFlag -- update the in-use and fragmented flags of the entry
  for 'offset' in every mapped window that contains the offset

  parameters:
  OffsetType offset -- the offset of the chunk
  BoolType inUse    -- the new value of the in-use flag
  SizeType size     -- the chunk size used to recompute the
                       fragmented flag; when 0 the fragmented flag is
                       left untouched
  returns:
  False when one of the windows has no entry for the offset,
  True otherwise.

  When size > 0 the loop arranges that at most one window holding the
  whole chunk keeps a non-fragmented entry ('cleared'); the other
  windows get the fragmented flag.
 */
BoolType Arena::setObjectFlag(OffsetType offset, 
                              BoolType inUse,
                              SizeType size) {
  SizeType tableSize =
    windowTable.getSize();
  SizeType index = 
    windowTable.searchByKey(offset | PersistentOffset::fragmentedMask);

  SizeType min = index;
  SizeType max = index;
  Window *probe;

  // extend [min, max] over the windows containing the offset; the
  // null checks cover a search index that lies outside the table
  while (min >= 0) {
    probe = windowTable(min);
    if (probe == 0 || !probe->isWithinRange(offset))
      break;
    min--;
  }
  min++;
  while (max < tableSize) {
    probe = windowTable(max);
    if (probe == 0 || !probe->isWithinRange(offset)) 
      break;
    max++;
  }
  max--;

  BoolType success = True;
  BoolType cleared = False; // a non-fragmented entry has been settled
  SizeType candidate = -1;  // window whose entry may be cleared later
  for (index = min; index <= max; index++) {
    Window *window = windowTable(index);
    window->setDirty();
    PersistentOffset &po = window->objectTable[offset];
    if (po.isNull()) {
      success = False;
      continue;
    }
    if (inUse) 
      po.setInUse();
    else
      po.clearInUse();
    if (size > 0) {
      // does this window hold the chunk up to its last byte?
      BoolType complete = window->isWithinRange(offset + size - 1);
      if (!complete) {
        po.setFragmented();
      } else {
        if (po.isFragmented()) {
          ComparisonType cmp = window->compare(offset);
          if (!cleared) {
            if (cmp >= 0) {
              po.clearFragmented();
              cleared = True;
            } else {
              // keep as a fallback in case no other window qualifies
              candidate = index;
            } 
          }
        } else {
          if (cleared)
            po.setFragmented();
          else 
            cleared = True;
        }
      }
    } else {
      cleared = True;
    }
  }
  // no window qualified directly: fall back to the candidate
  if (!cleared && candidate >= 0) {
    windowTable(candidate)->objectTable[offset].clearFragmented();
    cleared = True;
  }

#ifdef DEBUG
  if (!cleared || max > min)
    if (!(max == min && 
          !windowTable(min)->isWithinRange(offset + size - 1)))
      breakpoint();
  if (!success) {
    cerr << "Arena::setObjectFlag " << offset 
         << "failed" << endl;
    for (index = min; index <= max; index++) {
      cerr << "index " << index << " ";
      windowTable(index)->printStatus();
    }
    objectTableCheck();
  }
#endif
  return success;
}

/*
  isValidFreeChunk -- check the free-list linkage of the chunk at
  'offset'

  returns:
  True when the chunk can be mapped, its forward link is non-zero and
  can be mapped, and that chunk's backward link points back to
  'offset'; False otherwise.
 */
BoolType Arena::isValidFreeChunk(OffsetType offset) {
  OffsetChunkPair candidate(offset);
  if (candidate.map(this) == 0)
    return False;

  // the forward neighbour must exist and be mappable
  OffsetChunkPair successor(PChunk(candidate)->forward);
  if (successor == 0 || successor.map(this) == 0)
    return False;

  // ... and must link back to the candidate
  return (PChunk(successor)->backward == candidate) ? True : False;
}

/*
  findNearestFreeChunk -- find a free chunk near 'hint' using only the
  object tables of the mapped windows

  Search order:
    1. the window containing hint: downwards, then upwards from the
       entry found for hint
    2. the windows from that position down to the first one, each
       scanned from its last entry to its first
    3. the windows above that position, each scanned from its first
       entry to its last
  The top chunk is never returned.
  parameter:
  OffsetType hint -- the offset near which to search
  returns:
  0 when the free list is empty; otherwise the first free chunk found,
  or freeList when hint is 0 or no mapped window records a free chunk.
 */
OffsetType Arena::findNearestFreeChunk(OffsetType hint) {
  if (freeList == 0)
    return 0;
  if (hint == 0)
    return freeList;
  SizeType wtableSize = windowTable.getSize();
  SizeType windex = windowTable.searchByKey(hint);
  OffsetType fChunk = 0;
  Window *window;
  if (!(wtableSize > 0))
    return freeList;

  SizeType tableSize;
  SizeType index;
  // windex may lie outside the table, in which case there is no
  // window to look into
  window = windowTable(windex);
  if (window != 0 && window->isWithinRange(hint)) {
    tableSize = window->objectTable.getSize();
    // downwards from the entry found for hint
    for (index = window->objectTable.searchByKey(hint);
         0 <= index && index < tableSize;
         index--) {
      if (!window->objectTable(index).isInUse()) {
        fChunk = window->objectTable(index).getOffset();
        if (fChunk != topChunk)
          return fChunk;
      }
    }
    // upwards from the entry found for hint
    for (index = window->objectTable.searchByKey(hint);
         0 <= index && index < tableSize;
         index++) {
      if (!window->objectTable(index).isInUse()) {
        fChunk = window->objectTable(index).getOffset();
        if (fChunk != topChunk)
          return fChunk;
      }
    }
  }
  // search backwards
  SizeType windex2 = windex; // backup
  for (; 0 <= windex && windex < wtableSize; windex--) {
    window = windowTable(windex);
    if (window == 0)
      continue;
    tableSize = window->objectTable.getSize();
    if (tableSize > 0) {
      for (index = tableSize - 1;
           0 <= index && index < tableSize;
           index--) {
        if (!window->objectTable(index).isInUse()) {
          fChunk = window->objectTable(index).getOffset();
          if (fChunk != topChunk) 
            return fChunk;
        }
      }
    }
  }
  // search forwards
  for (windex = windex2 + 1;
       0 <= windex && windex < wtableSize;
       windex++) {
    window = windowTable(windex);
    if (window == 0)
      continue;
    tableSize = window->objectTable.getSize();
    if (tableSize > 0) {
      for (index = 0; 0 <= index && index < tableSize; index++) {
        if (!window->objectTable(index).isInUse()) {
          fChunk = window->objectTable(index).getOffset();
          if (fChunk != topChunk) 
            return fChunk;
        }
      }
    }
  }
  // there is no mapped freeChunk
  return freeList;
}

/*
  getObject -- look up the object table entry for 'offset'

  parameters:
  OffsetType offset   -- the offset of the chunk
  BoolType fragmented -- which kind of entry is wanted: the one with
                         the fragmented flag set, or the one without
  returns:
  a reference to the matching entry (its window is marked dirty), or
  PersistentOffset::nullObject when there is none.
 */
PersistentOffset &Arena::getObject(OffsetType offset, 
                            BoolType fragmented = False) {
  Window *window = 
    windowTable[offset | 
               (fragmented ? PersistentOffset::fragmentedMask : 0)];
  if (window == 0)
    return PersistentOffset::nullObject;

  if (!fragmented) {
    // only a non-fragmented entry in the found window qualifies
    PersistentOffset &plain = window->objectTable[offset];
    if (plain.isFragmented())
      return PersistentOffset::nullObject;
    window->setDirty();
    return plain;
  }

  // fragmented entry wanted: try the found window first
  PersistentOffset &here = window->objectTable[offset];
  if (here.isFragmented()) {
    window->setDirty();
    return here;
  }

  // then the neighbouring windows; the first neighbour containing
  // the offset decides the result
  SizeType pos = windowTable.searchByKey(offset);
  if (pos - 1 >= 0) {
    window = windowTable(pos - 1);
    if (window->isWithinRange(offset)) {
      PersistentOffset &below = window->objectTable[offset];
      if (!below.isFragmented())
        return PersistentOffset::nullObject;
      window->setDirty();
      return below;
    }
  }
  if (pos + 1 < windowTable.getSize()) {
    window = windowTable(pos + 1);
    if (window->isWithinRange(offset)) {
      PersistentOffset &above = window->objectTable[offset];
      if (!above.isFragmented())
        return PersistentOffset::nullObject;
      window->setDirty();
      return above;
    }
  }
  return PersistentOffset::nullObject;
}

/*
  getWindow -- find the window whose object table holds the entry for
  'offset' with the requested fragmented state

  parameters:
  OffsetType offset   -- the offset of the chunk
  BoolType fragmented -- which kind of entry is wanted: the one with
                         the fragmented flag set, or the one without
  returns:
  the window holding the matching entry, or 0 when there is none.
 */
Window *Arena::getWindow(OffsetType offset, 
                  BoolType fragmented = False) {
  Window *window = 
    windowTable[offset | 
               (fragmented ? PersistentOffset::fragmentedMask : 0)];
  if (window == 0)
    return 0;

  if (!fragmented) {
    // only a non-fragmented entry in the found window qualifies
    PersistentOffset &plain = window->objectTable[offset];
    if (plain.isFragmented())
      return 0;
    return window;
  }

  // fragmented entry wanted: try the found window first
  PersistentOffset &here = window->objectTable[offset];
  if (here.isFragmented())
    return window;

  // then the window below and the window above
  SizeType pos = windowTable.searchByKey(offset);
  if (pos - 1 >= 0) {
    window = windowTable(pos - 1);
    if (window->isWithinRange(offset)) {
      PersistentOffset &below = window->objectTable[offset];
      if (below.isFragmented())
        return window;
    }
  }
  if (pos + 1 < windowTable.getSize()) {
    window = windowTable(pos + 1);
    if (window->isWithinRange(offset)) {
      PersistentOffset &above = window->objectTable[offset];
      if (above.isFragmented())
        return window;
    }
  }
  return 0;
}

/*
  updateFragmentedFlags -- recompute the fragmented flag of every
  entry in the object table of the window at 'windowIndex'

  An entry is fragmented when the window does not hold the chunk up to
  its last byte.  When the window does hold the whole chunk and a
  neighbouring, non-separated window also contains the chunk's offset,
  the entry gets the opposite of the neighbour's flag.
  parameter:
  SizeType windowIndex -- position of the window in windowTable
  returns:
  always True
 */
BoolType Arena::updateFragmentedFlags(SizeType windowIndex) {
  Window *window = windowTable(windowIndex);
  if (window == 0)
    return True;
  SizeType entryCount = window->objectTable.getSize();
  if (entryCount == 0)
    return True;

  // neighbours count only when they are not separated from window
  Window *before = windowTable(windowIndex - 1);
  Window *after = windowTable(windowIndex + 1);
  if (before != 0 && window->isSeparatedFrom(*before))
    before = 0;
  if (after != 0 && window->isSeparatedFrom(*after))
    after = 0;

  for (SizeType i = 0; i < entryCount; i++) {
    PersistentOffset &po = window->objectTable(i);

    // the end of the chunk: the next entry's offset, or for the last
    // entry the offset plus the chunk's own size
    OffsetType chunkEnd;
    if (i < entryCount - 1)
      chunkEnd = window->objectTable(i + 1).getOffset();
    else
      chunkEnd = po.getOffset() + window->map(po.getOffset())->getSize();

    BoolType complete = window->isWithinRange(chunkEnd - 1);
    BoolType wasFragmented = po.isFragmented();

    // a neighbour that also contains this chunk's offset
    Window *overlap = 0;
    if (before != 0 && before->isWithinRange(po.getOffset()))
      overlap = before;
    else if (after != 0 && after->isWithinRange(po.getOffset()))
      overlap = after;

    // decide the flag this entry should carry
    BoolType wantFragmented;
    if (!complete) {
      wantFragmented = True;
    } else if (overlap != 0) {
      wantFragmented = 
        overlap->objectTable[po.getOffset()].isFragmented() ? False : True;
    } else {
      wantFragmented = False;
    }

    // touch the entry only when the flag actually changes
    if (wantFragmented) {
      if (!wasFragmented)
        po.setFragmented();
    } else {
      if (wasFragmented)
        po.clearFragmented();
    }
  }
  return True;
}

/*
  removeRedundantWindow -- delete the window found for 'offset' when
  isRedundant() reports it as redundant

  A dirty window is synced first.  After the removal the fragmented
  flags of the windows around the vacated position are recomputed.
  parameter:
  OffsetType offset -- an offset used to locate the window
  returns:
  False when the window could not be removed from windowTable,
  True otherwise (including when nothing had to be removed).
 */
BoolType Arena::removeRedundantWindow(OffsetType offset) {
  const SizeType windowCount = windowTable.getSize();
  SizeType slot = windowTable.searchByKey(offset);
  if (!(slot < windowCount && isRedundant(slot)))
    return True;

#ifdef DEBUG
  cerr << "Arena::removeRedundantWindow(" << offset << ")\n";
#endif
  Window *victim = windowTable(slot);
  // write back pending changes before the window goes away
  if (victim->isDirty()) {
    victim->sync();
    victim->clearDirty();
  }
  if ((windowTable -= victim) == 0)
    return False;
  delete victim;

  // the table has changed: look the position up again and refresh
  // the flags there and on both sides of it
  slot = windowTable.searchByKey(offset);
  updateFragmentedFlags(slot);
  updateFragmentedFlags(slot - 1);
  updateFragmentedFlags(slot + 1);
  return True;
}

/*
  isRedundant -- test whether a Window could be unmapped without losing
  access to any object listed in its objectTable

  A Window is reported redundant when no object in it is grabbed and
  every entry of its objectTable is covered by the preceding or the
  following Window in windowTable:
  - a non-fragmented entry must lie entirely (first to last byte)
    inside one of the two neighbors;
  - a fragmented entry only needs its starting offset inside one of
    the two neighbors.

  parameter:
  SizeType windowIndex -- index of the Window in windowTable

  return value:
  BoolType -- True if the Window is redundant; False if there is no
  such Window, some object is still grabbed, a chunk cannot be mapped,
  or some entry is not covered by a neighbor
 */
BoolType Arena::isRedundant(SizeType windowIndex) {
  Window *window = windowTable(windowIndex);
  if (window == 0)
    return False; // no such window
  if (!window->isCompletelyReleased())
    return False; // some objects are grabbed

  // The neighbors do not change while the table is scanned.
  Window *before = windowTable(windowIndex - 1);
  Window *after = windowTable(windowIndex + 1);
  // read the size only after the null check above
  SizeType tableSize = window->objectTable.getSize();

  for (SizeType index = 0; index < tableSize; index++) {
    PersistentOffset &po = window->objectTable(index);
    OffsetType off = po.getOffset();

    if (!po.isFragmented()) {
      // end of this chunk: start of the next entry, or for the last
      // entry the chunk's own offset + size
      OffsetType nextOffset;
      if (index + 1 < tableSize) {
        nextOffset = window->objectTable(index + 1).getOffset();
      } else {
        Chunk *chunk = window->map(off);
        if (chunk == 0)
          return False; // cannot verify the extent; keep the window
        nextOffset = off + chunk->getSize();
      }
      OffsetType lastByte = nextOffset - 1;

      if (!(before != 0 &&
            before->isWithinRange(off) &&
            before->isWithinRange(lastByte)))
        if (!(after != 0 &&
              after->isWithinRange(off) &&
              after->isWithinRange(lastByte)))
          return False; // complete chunk not fully covered by a neighbor
    } else {
      if (!(before != 0 && before->isWithinRange(off)))
        if (!(after != 0 && after->isWithinRange(off)))
          return False; // found a non-overlapped chunk
    }
  }
  return True;
}

/*
  updateInitialBlock -- copy arenaSize, topChunk and freeList into the
  ArenaInitialBlock located at the address of masterWindow

  parameter:
  BoolType sync -- when set, masterWindow.sync() is called afterwards

  return value:
  BoolType -- False if masterWindow has no address; True otherwise
 */
BoolType Arena::updateInitialBlock(BoolType sync = False) {
  ArenaInitialBlock *header = (ArenaInitialBlock *)masterWindow.getAddr();
  if (header != 0) {
    header->arenaSize = arenaSize;
    header->topChunk = topChunk;
    header->freeList = freeList;
    if (sync) {
      masterWindow.sync();
    }
    return True;
  }
  return False;
}



/*
  grow -- expand the size of the Arena storage

  Persistent arena (arenaID > 0): zero-filled pages are appended to the
  backing file, aiming at PersistentStorage::growingChunkSize more
  bytes.  The top chunk is enlarged by the amount actually gained, or
  created at offset PersistentStorage::pageSize when there is none, and
  the ArenaInitialBlock at the address of masterWindow is refreshed.

  Non-persistent arena (arenaID == 0): a new Window of
  PersistentStorage::standardWindowSizeNonPersistent bytes is created
  at offset arenaSize.  A dummy Chunk of sizeof(Chunk) bytes is placed
  at its start, followed by the new top chunk.  The former top chunk,
  if any, is linked into the free list.

  parameter: none
  
  return value:
  BoolType -- True if successful
 */
BoolType Arena::grow() {
  if (arenaID > 0) { // persistent arena
    RawFile storage(masterWindow.getFd(), masterWindow.getFileName());
    // refuse to grow beyond the configured maximum
    if (arenaSize + PersistentStorage::growingChunkSize > 
        PersistentStorage::maxArenaSize) 
      return False;
    // the file must currently be exactly as large as the arena
    if (storage.getSize() != arenaSize)
      return False;
    OffsetType tail = storage.seek(arenaSize, SEEK_SET);
    if (tail != arenaSize)
      return False;
    Chunk *tc = 0;
    if (topChunk != 0) {
      tc = map(topChunk);
      if (tc == 0) 
        return False;
      // the top chunk must end exactly at the end of the arena
      if (topChunk + tc->getSize() != arenaSize)
        return False;
    } else {
      // without a top chunk the arena must consist of one page only
      if (arenaSize != PersistentStorage::pageSize)
        return False;
    }
    // NOTE(review): this array bound needs pageSize to be a constant
    // expression (or a compiler extension) -- confirm
    char buf[PersistentStorage::pageSize];
    memset(buf, 0, PersistentStorage::pageSize);
    OffsetType grownArenaSize = 
      arenaSize + PersistentStorage::growingChunkSize;
    // append zero pages; 'tail' tracks how far the file has grown
    for (SizeType bytes = 0; tail < grownArenaSize; tail += bytes) {
      bytes = storage.write(buf, PersistentStorage::pageSize);
      if (bytes != PersistentStorage::pageSize) {
        // short write: account for the partial page and stop
        // (break skips the loop increment, so it is counted once)
        // NOTE(review): a negative error return from write() is only
        // caught by 'bytes > 0' if SizeType is signed -- confirm
        if (bytes > 0) 
          tail += bytes;
        break;
      }
    }
    // round tail down to a multiple of sizeof(Chunk)
    // NOTE(review): exact only if sizeof(Chunk) is a power of two -- confirm
    tail &= ~(sizeof(Chunk) - 1);
    if (arenaSize == tail)
      return False; // nothing was gained
    if (storage.seek(tail, SEEK_SET) != tail)
      return False;
    if (topChunk != 0) {
      // enlarge the existing top chunk by the gained bytes
      tc->setSize(tc->getSize() + (tail - arenaSize));
      arenaSize = tail;
    } else {
      // create the first top chunk through a temporary one-page Window
      // at offset pageSize
      // NOTE(review): the Window constructor signals a failed mapping
      // by setting totalGrabCount to -1; that is not checked here
      // before getAddr() is used -- confirm
      Window *window = new Window(masterWindow, 
                                  PersistentStorage::pageSize,
                                  PersistentStorage::pageSize);
      // presumably Chunk(prevSize, size, prevInUse) -- TODO confirm
      tc = new(window->getAddr()) Chunk(0, tail - arenaSize, True);
      window->sync();
      arenaSize = tail;
      delete window;
      topChunk = PersistentStorage::pageSize;
    }
    // store the new values in the initial block
    // (same assignments as updateInitialBlock())
    ArenaInitialBlock *aib = (ArenaInitialBlock *)masterWindow.getAddr();
    if (aib == 0) 
      return False;
    aib->arenaSize = arenaSize;
    aib->topChunk = topChunk;
    aib->freeList = freeList;
    return True;
  } else if (arenaID == 0) { // non-persistent arena
    OffsetType tail = arenaSize;
    // size checking is missing
    Chunk *tc = 0;
    OffsetType lastFreeChunk = 0;
    Chunk *fChunk = 0;
    if (topChunk != 0) {
      tc = map(topChunk);
      if (tc == 0)
        return False;
      if (freeList != 0) {
        // follow the forward links while they lead to a higher offset;
        // the chunk where that stops is taken as the last free chunk
        // (presumably the list is circular and ordered by offset --
        // TODO confirm)
        // NOTE(review): the results of map(f) are not checked for 0
        OffsetType f = freeList;
        fChunk = map(f);
        for (; fChunk->forward > f; f = fChunk->forward, fChunk = map(f))
          ;
        lastFreeChunk = f;
      }
    } 
    // new Window starting at the current end of the arena
    Window *window = 
      new Window(masterWindow,
                 tail,
                 PersistentStorage::standardWindowSizeNonPersistent,
                 0, 0, 0, this);
    // dummy Chunk of sizeof(Chunk) bytes at the start of the Window;
    // its first argument is the size of the former top chunk (0 if none)
    Chunk *dummy = new(window->getAddr()) 
      Chunk((tc != 0 ? tc->getSize() : 0), 
            sizeof(Chunk),
            (tc != 0 ? False : True),
            0, 0, True);
    // the new top chunk takes the rest of the Window
    Chunk *newTopChunk = new(window->getAddr() + dummy->getSize())
      Chunk(dummy->getSize(),
            PersistentStorage::standardWindowSizeNonPersistent - 
            dummy->getSize(), 
            True);
    if (topChunk != 0) {
      // put the former top chunk on the free list
      if (lastFreeChunk != 0) {
        // insert it after the last free chunk
        // NOTE(review): the backward link of the chunk that now
        // follows tc is not updated here -- confirm this is intended
        tc->backward = lastFreeChunk;
        tc->forward = fChunk->forward;
        fChunk->forward = topChunk;
      } else {
        // it becomes the only element, linked to itself
        tc->backward = topChunk;
        tc->forward = topChunk;
        freeList = topChunk;
      }
    }
    arenaSize = tail + PersistentStorage::standardWindowSizeNonPersistent;
    windowTable += window;
    topChunk = tail + dummy->getSize();
    window->initializeObjectTable(tail, 0, 0, this);
#ifdef DEBUG
    window->printStatus();
#endif
    // sanity check: mapping the new top chunk through the arena must
    // give the address it was constructed at
    // NOTE(review): arenaSize, topChunk, freeList and windowTable have
    // already been changed when this returns False
    Chunk *newTopChunk2 = map(topChunk | PersistentOffset::fragmentedMask);
    if (newTopChunk2 != newTopChunk)
      return False;
    return True;
  } else { // arenaID < 0
    return False;
  }
}




// member functions of Window

// constructor
/*
  constructor -- map a region of the file shared with Window 'w'

  The region [off, off + len) is mapped.  When a hint 'obj' is given
  (non-zero) objectTable is built from it.  On success the
  PersistentStorage object, if there is one, is notified of the new
  mapping.  Failure of either step is signalled by totalGrabCount == -1.

  parameters:
  Window &w -- Window whose file descriptor is inherited
  OffsetType off, SizeType len -- region to map
  OffsetType obj -- hint for initializeObjectTable(), 0 for none
  Window *before, *after, Arena *arena -- passed to
  initializeObjectTable()
  SizeType minimumChunkSize -- used to size objectTable
 */
Window::Window(Window &w, 
               OffsetType off, 
               SizeType len, 
               OffsetType obj,
               Window *before,
               Window *after,
               Arena *arena,
               SizeType minimumChunkSize):
  MMap(w), // share the file descriptor of the given Window
  dirty(False),
  totalGrabCount(0), // counters start from zero
  totalGrabCountReadOnly(0),
  objectTable((len / minimumChunkSize) + 1) {
    if (MMap::map(off, len) != 0) {
#ifdef DEBUG
      cerr << "Window::Window(offset " << off 
           << ", length " << len
           << ", object " << obj << ")" << endl;
#endif
      // build objectTable only when a hint is supplied
      if (obj == 0 || 
          initializeObjectTable(obj, before, after, arena)) {
        PersistentStorage *storage = PersistentStorage::getThePS();
        if (storage != 0) {
          storage->notifyMapping(len, this);
        }
        return;
      }
    }
    // mapping or table construction failed
    totalGrabCount = -1;
}

// destructor
// Empties objectTable and, when a PersistentStorage object exists,
// reports the unmapping of getLength() bytes to it.
Window::~Window() {
  objectTable.removeAll();
  PersistentStorage *storage = PersistentStorage::getThePS();
  if (storage == 0)
    return;
  storage->notifyUnmapping(getLength(), this);
}


void Window::printStatus() {
  SizeType size;
  ostream &os = cerr;
  os << "Window {\n  begin " << getOffset()
     << "\n  length " << getLength()
     << "\n  end " << getOffset() + getLength()
     << "\n  objectTable {\n  size " << (size = objectTable.getSize())
       << "\n";
  SizeType index;
  OffsetType o;
  BoolType consistent = True;
  SizeType psize = 0;
  for (index = 0; index < size; index++) {
    o = objectTable(index).getOffset();
    Chunk *chunk = map(o);
    if (chunk == 0) {
      os << "  offset " << o << " chunk null "
         << (objectTable(index).isInUse() ? "Used " : "Free ") 
         << (objectTable(index).isFragmented() ? 
             "Frag " : "Comp ") 
         << (objectTable(index).isGrabbed() ? 
             "Grab" : "Rel ") 
         << " Count " 
         << objectTable(index).getGrabCountReadOnly()
         << "\n";
    } else {
      os << "  offset " << o 
         << " size " << chunk->getSize()
         << " pSize " << chunk->getPrevSize()
         << " pIU " << (chunk->isPrevInUse() ? "True  " : "False ")
         << (objectTable(index).isInUse() ? "Used " : "Free ") 
         << (objectTable(index).isFragmented() ? 
             "Frag " : "Comp ") 
         << (objectTable(index).isGrabbed() ? 
             "Grab" : "Rel ") 
         << " Count " 
         << objectTable(index).getGrabCountReadOnly()
         << "\n";
    }
    if (chunk != 0 && psize != 0 && psize != chunk->getPrevSize()) 
      consistent = False;
    psize = chunk->getSize();
    if (psize == 0)
      consistent = False;
  }
  os << "} // Window\n";
#ifdef DEBUG
  if (!consistent)
    breakpoint();
#endif
}


/*
  initializeObjectTable -- (re)build this Window's objectTable from a
  hint offset

  Starting at the chunk at 'obj', the chunk chain is followed forward
  to the last chunk that begins inside the Window, then backward
  through the prevSize fields, adding one PersistentOffset per chunk.
  The in-use state of a chunk is taken from its successor's
  isPrevInUse() flag.  The state of the last chunk is worked out from
  several hints (arena->topChunk, arena->freeList, free chunks of the
  neighboring Windows, the chunk following the last one).  Finally
  entries are marked fragmented depending on the neighboring Windows,
  and the last entry when its chunk extends beyond the Window.

  parameters:
  OffsetType obj -- offset of a chunk that begins inside this Window
  Window *before, Window *after -- neighboring Windows; when both are 0
  and 'arena' is given they are looked up in arena->windowTable
  Arena *arena -- arena used for the hints above, may be 0

  return value:
  BoolType -- False if 'obj' is outside the Window, fails the size
  check against an adjacent chunk, or the chunk after the last one
  cannot be mapped; True otherwise
 */
BoolType Window::initializeObjectTable(OffsetType obj, 
                                       Window *before, 
                                       Window *after,
                                       Arena *arena) {
#ifdef DEBUG
  cerr << "Window::initializeObjectTable(object " << obj << ")" << endl;
#endif
  // get the neighbors with the hint arena
  if (arena != 0 && before == 0 && after == 0) {
    SizeType tableSize = arena->windowTable.getSize();
    if (tableSize > 0) {
      SizeType index = arena->windowTable.searchByElement(this);
      Window *self = arena->windowTable(index);
      if (self == this) { // 'this' is in windowTable 
        // 0 is assigned when the index is illegal
        before = arena->windowTable(index - 1);
        after = arena->windowTable(index + 1);
      } else {
        // 'this' is not in the table yet: the Windows at 'index' and
        // 'index + 1' are taken as the neighbors
        // 0 is assigned when the index is illegal
        before = arena->windowTable(index);
        after = arena->windowTable(index + 1);
      }    
    }  
  }
  // boundaries
  OffsetType begin = getOffset();
  OffsetType end = begin + getLength();
  if (!(begin <= obj && obj < end)) { // obj must be within the range
    cerr << "Window::initializeObjectTable() invalid hint obj\n"
         << "begin: " << begin << " end: " << end << " hint: " << obj
         << endl;
    return False;
  }
  
  // current objectTable status
  SizeType objects = objectTable.getSize();
  // NOTE(review): 'True ||' makes this condition always hold, so the
  // table is always rebuilt and the else branch at the end is never
  // reached
  if (True || objects == 0) { // complete initialization required
    if (objects > 0) 
      objectTable.removeAll();
    OffsetChunkPair hint(obj);
    // NOTE(review): the result of this mapping is not checked for 0
    hint.map(this);
    { // check if the hint is valid
      // compare the size fields of the hint chunk with an adjacent
      // chunk that also begins inside the Window
      OffsetChunkPair neighbor;
      if (isWithinRange(neighbor = 
                        (OffsetType)hint - 
                        PChunk(hint)->getPrevSize())) {
        // previous chunk is within the range
        neighbor.map(this);
        if (PChunk(hint)->getPrevSize() != 0 &&
            PChunk(neighbor)->getSize() != PChunk(hint)->getPrevSize()) {
#ifdef DEBUG
          cerr << "Window::initializeObjectTable() invalid hint obj\n"
               << "begin: " << begin << " end: " << end << " hint: " << obj
               << endl;
#endif
          return False;
        }
      } else if (isWithinRange(neighbor = 
                               (OffsetType)hint + 
                               PChunk(hint)->getSize())) {
        // next chunk is within the range
        neighbor.map(this);
        if (PChunk(neighbor)->getPrevSize() != PChunk(hint)->getSize()) {
#ifdef DEBUG
          cerr << "Window::initializeObjectTable() invalid hint obj\n"
               << "begin: " << begin << " end: " << end << " hint: " << obj
               << endl;
#endif
          return False;
        }
      } else {
        // more strict checking is desirable
      }
    } // the hint obj is valid
    // trace to the last object in the range
    OffsetChunkPair o = hint;
    OffsetChunkPair last = o;
    while (isWithinRange(o)) {
      last = o;
      last.map(this);
      o = (OffsetType)last + PChunk(last)->getSize();
    }
    // 'last' is mapped
    // whether 'last' Chunk is in use or not is unknown
    BoolType lastChunkUsageUnknown = True;
    // provisional state for 'last'; corrected below once it is known
    BoolType inUse = True;
    OffsetChunkPair prev;
    o = last;
    // walk backward from 'last' via prevSize, one table entry per chunk
    while (isWithinRange(o)) {
      PersistentOffset po(o, inUse);
      objectTable += po; // add new object in objectTable
      o.map(this);
      if (lastChunkUsageUnknown && !inUse) { // find a free Chunk
        // only the first free chunk met on the way back is examined
        lastChunkUsageUnknown = False;
        if (PChunk(o)->forward == last) { // next free Chunk is 'last'
          objectTable[last].clearInUse();
        } 
      }
      // a prevSize of 0 ends the walk
      if (PChunk(o)->getPrevSize() == 0)
        break;
      prev = (OffsetType)o - PChunk(o)->getPrevSize();
      // the current chunk records whether its predecessor is in use
      inUse = PChunk(o)->isPrevInUse();
      o = prev;
    }
    // detect the usage of the last Chunk
    if (lastChunkUsageUnknown) {
      // trying various hints
      if (lastChunkUsageUnknown && 
          arena != 0 && arena->topChunk == last) {
        // 'last' Chunk is the topChunk
        lastChunkUsageUnknown = False;
        objectTable[last].clearInUse();
      }  
      if (lastChunkUsageUnknown && arena != 0) {
        if (arena->freeList == last) {
          // 'last' Chunk is the freeList
          lastChunkUsageUnknown = False;
          objectTable[last].clearInUse();
        } else if (arena->freeList == 0) {
          // the free list is empty, so 'last' is treated as in use
          lastChunkUsageUnknown = False;
          objectTable[last].setInUse();
        }         
      }  
      if (lastChunkUsageUnknown && before != 0) {
        // look at the free chunk with the highest index in 'before'
        SizeType tableSize = before->objectTable.getSize();
        if (tableSize > 0) {
          SizeType index = tableSize - 1;
          // NOTE(review): if SizeType is unsigned, 'index >= 0' is
          // always true and 'index--' wraps around after 0 when no
          // free entry is found -- confirm the type of SizeType
          while (lastChunkUsageUnknown && index >= 0) {
            PersistentOffset &po = before->objectTable(index);
            if (!po.isInUse()) {
              OffsetChunkPair f(po.getOffset());
              f.map(before);
              if (PChunk(f)->forward == last) {
                lastChunkUsageUnknown = False;
                objectTable[last].clearInUse();
              } else if (PChunk(f)->forward > last) {
                // the forward link points beyond 'last'
                lastChunkUsageUnknown = False;
                objectTable[last].setInUse();
              }
              break;
            }
            index--;
          }
        }
      }
      if (lastChunkUsageUnknown && after != 0) {
        SizeType tableSize = after->objectTable.getSize();
        if (tableSize > 0) {
          // 'after' may hold an entry for 'last' itself
          PersistentOffset &po = after->objectTable[last];
          if (!po.isNull()) {
            lastChunkUsageUnknown = False;
            if (!po.isInUse()) {
              objectTable[last].clearInUse();
            } else {
              objectTable[last].setInUse();
            }         
          } else {
            // otherwise look at the free chunk with the lowest index
            // in 'after'
            SizeType index = 0;
            while (lastChunkUsageUnknown && index < tableSize) {
              if (!after->objectTable(index).isInUse()) {
                OffsetChunkPair f(after->objectTable(index)
                                  .getOffset());
                f.map(after);
                if (PChunk(f)->backward == last) {
                  lastChunkUsageUnknown = False;
                  objectTable[last].clearInUse();
                } else if (PChunk(f)->backward < last) {
                  // the backward link points before 'last'
                  lastChunkUsageUnknown = False;
                  objectTable[last].setInUse();
                }
                break;
              }
              index++;
            }
          }
        }
      }
      // remaining hints require Window mapping
      if (lastChunkUsageUnknown) {
        // read the prevInUse flag of the chunk that follows 'last'
        OffsetChunkPair nextOfLast((OffsetType)last + 
                                   PChunk(last)->getSize());
        // NOTE(review): 'arena' may still be 0 at this point, and
        // OffsetChunkPair::map(Arena *) dereferences it -- confirm
        nextOfLast.map(arena);
        if (PChunk(nextOfLast) == 0) {
#ifdef DEBUG
          STATUS("Window::initializeObjectTable next of last chunk is null");
#endif
          return False;
        }
        if (PChunk(nextOfLast)->isPrevInUse()) {
          lastChunkUsageUnknown = False;
          objectTable[last].setInUse();
        } else {
          lastChunkUsageUnknown = False;
          objectTable[last].clearInUse();
        }
      }
      // gave up 
#ifdef DEBUG
      if (lastChunkUsageUnknown) {
        cerr << "Window::initializeObjectTable()\n"
          "  giving up the detection of the last Chunk usage" << endl;
      }
#endif
    }
    // fragmentation check
    if (before != 0 && !isSeparatedFrom(*before)) {
      // overlapped with 'before' Window
      SizeType tableSize = objectTable.getSize();
      SizeType beforeTableSize = before->objectTable.getSize();
      OffsetType off;
      if (tableSize > 0 && beforeTableSize > 0) {
        // find the non-fragmented entry with the highest index in 'before'
        SizeType index = beforeTableSize - 1;
        while (index > 0 && 
               before->objectTable(index).isFragmented()) {
          index--;
        }
        if (!before->objectTable(index).isFragmented()) {
          off = before->objectTable(index).getOffset();
          index = objectTable.searchByKey(off);
          if (objectTable(index).getOffset() == off) {
            // mark that entry and all entries with a lower index
            // NOTE(review): if SizeType is unsigned this loop does not
            // stop at 0 -- confirm the type of SizeType
            while (index >= 0) {
              objectTable(index).setFragmented();
              index--;
            }
          }
        }
      }
    }
    if (after != 0 && !isSeparatedFrom(*after)) {
      // overlapped with 'after' Window
      SizeType tableSize = objectTable.getSize();
      SizeType afterTableSize = after->objectTable.getSize();
      if (tableSize > 0 && afterTableSize > 0) {
        // every non-fragmented entry of 'after' that starts before the
        // end of this Window is marked fragmented here
        SizeType index = 0;
        OffsetType off;
        while (index < afterTableSize && 
               (off = 
                after->objectTable(index).getOffset()) < 
               end) {
          if (!after->objectTable(index).isFragmented()) {
            objectTable[off].setFragmented();
          }
          index++;
        }
      }
    }
    // the last chunk extends beyond the end of this Window
    if (!isWithinRange((OffsetType)last + PChunk(last)->getSize() - 1)) {
      objectTable[last].setFragmented();
    }
    return True;
  } else { // objects > 0; additional initialization required
    return False;
  }
}

// member functions of OffsetChunkPair

/*
  getPrevChunk -- offset of the chunk that precedes this one

  The chunk is mapped through 'arena' and its prevSize is subtracted
  from the offset.

  return value:
  OffsetType -- the offset of the previous chunk; 0 if 'arena' is 0 or
  the chunk cannot be mapped
 */
inline OffsetType OffsetChunkPair::getPrevChunk(Arena *arena, 
                                                BoolType fragmented) {
  if (arena == 0) 
    return 0;
  // map() yields 0 on failure; do not read the header in that case
  if (this->map(arena, fragmented) == 0)
    return 0;
  return offset - PChunk(*this)->getPrevSize();
}

/*
  getNextChunk -- offset of the chunk that follows this one

  The chunk is mapped through 'arena' and its size is added to the
  offset.

  return value:
  OffsetType -- the offset of the next chunk; 0 if 'arena' is 0 or the
  chunk cannot be mapped
 */
inline OffsetType OffsetChunkPair::getNextChunk(Arena *arena, 
                                                BoolType fragmented) {
  if (arena == 0) 
    return 0;
  // map() yields 0 on failure; do not read the header in that case
  if (this->map(arena, fragmented) == 0)
    return 0;
  return offset + PChunk(*this)->getSize();
}

/*
  map -- obtain the Chunk for this offset through an Arena

  The result is cached in 'chunk'; the arena is consulted only while
  no chunk has been obtained yet.  'fragmented' and 'anywhere' are
  passed on to Arena::map() as PersistentOffset::fragmentedMask and
  PersistentOffset::inUseMask bits or'ed into the offset.

  return value:
  Chunk * -- the mapped chunk; 0 if it could not be mapped or no
  arena was given
 */
inline Chunk *OffsetChunkPair::map(Arena *arena, 
                                   BoolType fragmented, 
                                   BoolType anywhere) {
  if (chunk != 0)
    return chunk; // already mapped
  if (arena == 0)
    return 0; // callers may pass a missing arena; nothing to map with
  chunk = arena->map(offset | 
                     (fragmented ? PersistentOffset::fragmentedMask : 0) |
                     (anywhere ? PersistentOffset::inUseMask : 0));
#ifdef DEBUG
  // a zero-sized chunk is suspicious unless 'anywhere' was requested
  if (chunk != 0)
    if (!anywhere && chunk->getSize() == 0)
      breakpoint();
#endif
  return chunk;
}

// Obtain the Chunk for this offset through a Window.  The result is
// cached in 'chunk', so the Window is consulted only while no chunk
// has been obtained yet.
Chunk *OffsetChunkPair::map(Window *window) {
  if (chunk != 0)
    return chunk;
  chunk = window->map(offset);
  return chunk;
}

// member functions of PersistentPointer

// redirect primitive operations to the PersistentStorage object

// Forwards to PersistentStorage::grab(); yields 0 when there is no
// PersistentStorage object.
inline PointerType PersistentPointer::grab() {
  PersistentStorage *s = PersistentStorage::getThePS();
  if (s == 0)
    return 0;
  return s->grab(*this);
}

// Forwards to PersistentStorage::release(); yields False when there is
// no PersistentStorage object.
inline BoolType PersistentPointer::release() {
  PersistentStorage *s = PersistentStorage::getThePS();
  if (s == 0)
    return False;
  return s->release(*this);
}

// Forwards to PersistentStorage::grabReadOnly(); yields 0 when there
// is no PersistentStorage object.
inline PointerType PersistentPointer::grabReadOnly() {
  PersistentStorage *s = PersistentStorage::getThePS();
  if (s == 0)
    return 0;
  return s->grabReadOnly(*this);
}

// Forwards to PersistentStorage::releaseReadOnly(); yields False when
// there is no PersistentStorage object.
inline BoolType PersistentPointer::releaseReadOnly() {
  PersistentStorage *s = PersistentStorage::getThePS();
  if (s == 0)
    return False;
  return s->releaseReadOnly(*this);
}



Mori Tetsuya / t2y3141592@gmail.com