| | |
| | | import java.nio.charset.Charset; |
| | | import java.nio.file.Files; |
| | | import java.util.ArrayList; |
| | | import java.util.Arrays; |
| | | import java.util.Collection; |
| | | import java.util.Comparator; |
| | | import java.util.HashMap;
| | | import java.util.Iterator;
| | | import java.util.List;
| | | import java.util.Map;
| | |
| | | import java.util.NavigableSet; |
| | | import java.util.NoSuchElementException; |
| | | import java.util.Set; |
| | | import java.util.SortedMap; |
| | | import java.util.SortedSet; |
| | | import java.util.TimerTask; |
| | | import java.util.TreeMap; |
| | | import java.util.TreeSet; |
| | | import java.util.concurrent.Callable; |
| | | import java.util.concurrent.ConcurrentHashMap;
| | | import java.util.concurrent.ConcurrentMap;
| | | import java.util.concurrent.ConcurrentNavigableMap;
| | | import java.util.concurrent.ConcurrentSkipListMap;
| | | import java.util.concurrent.ConcurrentSkipListSet; |
| | | import java.util.concurrent.ExecutorService; |
| | | import java.util.concurrent.Executors; |
| | |
| | | throw new UnsupportedOperationException("Not implemented");
| | | } |
| | | |
| | | /** Concurrent {@link Set} implementation backed by a {@link ConcurrentHashMap}. */ |
| | | private static final class ConcurrentHashSet<E> implements Set<E> |
| | | { |
| | | private final ConcurrentHashMap<E, E> delegate = new ConcurrentHashMap<>(); |
| | | |
| | | @Override |
| | | public boolean add(E e) |
| | | { |
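| | | // Map each element to itself; put() returns null iff the element was absent.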
| | | return delegate.put(e, e) == null; |
| | | } |
| | | |
| | | @Override |
| | | public boolean addAll(Collection<? extends E> c) |
| | | { |
| | | boolean changed = false; |
| | | for (E e : c) |
| | | { |
| | | changed |= add(e);
| | | } |
| | | return changed; |
| | | } |
| | | |
| | | @Override |
| | | public void clear() |
| | | { |
| | | delegate.clear(); |
| | | } |
| | | |
| | | @Override |
| | | public boolean contains(Object o) |
| | | { |
| | | return delegate.containsKey(o); |
| | | } |
| | | |
| | | @Override |
| | | public boolean containsAll(Collection<?> c) |
| | | { |
| | | return delegateSet().containsAll(c); |
| | | } |
| | | |
| | | @Override |
| | | public boolean equals(Object o) |
| | | { |
| | | return delegateSet().equals(o); |
| | | } |
| | | |
| | | @Override |
| | | public int hashCode() |
| | | { |
| | | return delegateSet().hashCode(); |
| | | } |
| | | |
| | | @Override |
| | | public boolean isEmpty() |
| | | { |
| | | return delegate.isEmpty(); |
| | | } |
| | | |
| | | @Override |
| | | public Iterator<E> iterator() |
| | | { |
| | | return delegateSet().iterator(); |
| | | } |
| | | |
| | | @Override |
| | | public boolean remove(Object o) |
| | | { |
| | | return delegateSet().remove(o); |
| | | } |
| | | |
| | | @Override |
| | | public boolean removeAll(Collection<?> c) |
| | | { |
| | | return delegateSet().removeAll(c); |
| | | } |
| | | |
| | | @Override |
| | | public boolean retainAll(Collection<?> c) |
| | | { |
| | | return delegateSet().retainAll(c); |
| | | } |
| | | |
| | | @Override |
| | | public int size() |
| | | { |
| | | return delegate.size(); |
| | | } |
| | | |
| | | @Override |
| | | public Object[] toArray() |
| | | { |
| | | return delegateSet().toArray(); |
| | | } |
| | | |
| | | @Override |
| | | public <T> T[] toArray(T[] a) |
| | | { |
| | | return delegateSet().toArray(a); |
| | | } |
| | | |
| | | @Override |
| | | public String toString() |
| | | { |
| | | return delegateSet().toString(); |
| | | } |
| | | |
| | | private Set<E> delegateSet() |
| | | { |
| | | return delegate.keySet(); |
| | | } |
| | | } |
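| | | /*
| | |  * Note: assuming Java 8+ can be targeted, ConcurrentHashMap.newKeySet() provides
| | |  * an equivalent concurrent Set view and could replace this class:
| | |  *
| | |  *   Set<String> set = ConcurrentHashMap.newKeySet();
| | |  */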
| | | |
| | | /** Data to put into the id2entry tree. */
| | | private static final class Id2EntryData |
| | | { |
| | |
| | | private final FileChannel fileChannel; |
| | | private final List<Integer> bufferPositions = new ArrayList<>(); |
| | | /** TODO JNR offer configuration for this. */ |
| | | private int bufferSize = 10 * MB;
| | | |
| | | // FIXME this is not thread safe yet!!! |
| | | /** |
| | |
| | | * <p> |
| | | * This will be persisted once {@link #maximumExpectedSizeOnDisk} reaches {@link #bufferSize}.
| | | * <p>
| | | * This code uses a {@link ConcurrentHashMap} instead of a {@link ConcurrentSkipListMap} because
| | | * performance testing showed that this code spent a lot of time in
| | | * {@link ByteString#compareTo(ByteSequence)} when putting entries into the map. At this point,
| | | * we only need to put data into the map very quickly; keys do not need to be sorted.
| | | * <p>
| | | * Note: using a {@link Set} here is a problem for id2childrencount, where value deduplication
| | | * is not wanted. How can this be solved?
| | | */ |
| | | private ConcurrentMap<ByteSequence, Set<ByteSequence>> inMemoryStore = new ConcurrentHashMap<>();
| | | /** Projected on-disk size of the data stored in {@link #inMemoryStore}. */
| | | private int maximumExpectedSizeOnDisk; |
| | | |
| | |
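| | | // Records are written length-prefixed: [keySize][keyBytes][valueSize][valueBytes].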
| | | int recordSize = INT_SIZE + key.length() + INT_SIZE + value.length(); |
| | | if (bufferSize < maximumExpectedSizeOnDisk + recordSize) |
| | | { |
| | | flushToMappedByteBuffer();
| | | inMemoryStore.clear(); |
| | | maximumExpectedSizeOnDisk = 0; |
| | | } |
| | |
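| | | // putIfAbsent idiom: if several threads race to create the set for this key,
| | | // only the set that won the race is kept.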
| | | Set<ByteSequence> values = inMemoryStore.get(key); |
| | | if (values == null) |
| | | { |
| | | values = new ConcurrentHashSet<>();
| | | Set<ByteSequence> existingValues = inMemoryStore.putIfAbsent(key, values); |
| | | if (existingValues != null) |
| | | { |
| | |
| | | maximumExpectedSizeOnDisk += recordSize; |
| | | } |
| | | |
| | | private void flushToMappedByteBuffer() throws IOException
| | | { |
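| | | // Copy into a TreeMap so that records are written to disk in key order.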
| | | final SortedMap<ByteSequence, Set<ByteSequence>> sortedStore = new TreeMap<>(inMemoryStore); |
| | | |
| | | MappedByteBuffer byteBuffer = nextBuffer(); |
| | | for (Map.Entry<ByteSequence, Set<ByteSequence>> mapEntry : sortedStore.entrySet())
| | | { |
| | | ByteSequence key = mapEntry.getKey(); |
| | | // FIXME JNR merge values before put |
| | |
| | | |
| | | private MappedByteBuffer nextBuffer() throws IOException |
| | | { |
| | | // FIXME JNR when merging duplicate keys during phase one,
| | | // maximumExpectedSizeOnDisk is an acceptable over-approximation.
| | | return fileChannel.map(MapMode.READ_WRITE, getLastPosition(bufferPositions), maximumExpectedSizeOnDisk);
| | | } |
| | | |
| | | private int getLastPosition(List<Integer> l) |
| | |
| | | byteBuffer.putInt(b.length()); |
| | | // Need to do all of this because b.copyTo(byteBuffer) calls ByteBuffer.flip(). |
| | | // Why does it do that? |
| | | final int limitBeforeFlip = byteBuffer.limit(); |
| | | final int posBeforeFlip = byteBuffer.position(); |
| | | b.copyTo(byteBuffer); |
| | | byteBuffer.limit(limitBeforeFlip);
| | | byteBuffer.position(posBeforeFlip + b.length()); |
| | | } |
| | | |
| | | void flush() throws IOException
| | | { |
| | | flushToMappedByteBuffer(); |
| | | writeBufferIndexFile(); |
| | | } |
| | | |
| | |
| | | String treeName = "/" + file.getParentFile().getName() + "/" + file.getName(); |
| | | return getClass().getSimpleName() |
| | | + "(treeName=\"" + treeName + "\"" |
| | | + ", currentBuffer has " + inMemoryStore.size() + " record(s)" |
| | | + ", current buffer holds " + inMemoryStore.size() + " record(s)" |
| | | + " and " + (bufferSize - maximumExpectedSizeOnDisk) + " byte(s) remaining)"; |
| | | } |
| | | } |
| | |
| | | @Override |
| | | public void close() |
| | | { |
| | | try
| | | {
| | | for (Buffer buffer : treeNameToBufferMap.values()) |
| | | { |
| | | buffer.flush(); |
| | | } |
| | | } |
| | | catch (IOException e) |
| | | { |
| | | throw new StorageRuntimeException(e); |
| | | } |
| | | } |
| | | |
| | |
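| | | // Periodically report progress of the second (on-disk merge) phase.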
| | | ScheduledThreadPoolExecutor timerService = new ScheduledThreadPoolExecutor(1); |
| | | scheduleAtFixedRate(timerService, new SecondPhaseProgressTask()); |
| | | |
| | | final Set<TreeName> treeNames = inStorage.listTrees();
| | | ExecutorService dbService = Executors.newFixedThreadPool(treeNames.size()); |
| | | try (Importer importer = outStorage.startImport()) |
| | | { |
| | |
| | | // key conflicts == merge EntryIDSets |
| | | return new ImportIDSetsMerger(index); |
| | | } |
| | | else if (treeName.getIndexId().equals(ID2CHILDREN_COUNT_NAME)) |
| | | { |
| | | // key conflicts == sum values |
| | | // TODO JNR |
| | | } |
| | | else if (treeName.getIndexId().equals(DN2ID_INDEX_NAME) |
| | | || treeName.getIndexId().equals(DN2URI_INDEX_NAME) |
| | | || isVLVIndex(entryContainer, treeName)) |
| | |
| | | } |
| | | else if (values.size() == 1) |
| | | { |
| | | // Avoids unnecessary decoding + encoding |
| | | return values.iterator().next(); |
| | | } |
| | | |
| | | return index.toValue(buildEntryIDSet(values));
| | | } |
| | | finally |
| | | { |
| | |
| | | values.clear(); |
| | | } |
| | | } |
| | | |
| | | private EntryIDSet buildEntryIDSet(Set<ByteString> values) |
| | | { |
| | | // Accumulate all entry IDs into a single array.
| | | int i = 0; |
| | | long[] entryIDs = new long[index.getIndexEntryLimit()]; |
| | | for (ByteString value : values) |
| | | { |
| | | final EntryIDSet entryIDSet = index.decodeValue(ByteString.empty(), value); |
| | | if (!entryIDSet.isDefined() || i + entryIDSet.size() >= index.getIndexEntryLimit()) |
| | | { |
| | | // above index entry limit |
| | | return EntryIDSet.newUndefinedSet(); |
| | | } |
| | | |
| | | for (EntryID entryID : entryIDSet) |
| | | { |
| | | entryIDs[i++] = entryID.longValue(); |
| | | } |
| | | } |
| | | |
| | | // Due to how the entryIDSets are built, there should not be any duplicate entryIDs.
| | | // Only the first i slots of the array are filled: sort and keep just that range,
| | | // otherwise the trailing zeroes would end up in the returned set.
| | | Arrays.sort(entryIDs, 0, i);
| | | return EntryIDSet.newDefinedSet(Arrays.copyOf(entryIDs, i));
| | | } |
| | | } |
| | | |
| | | /** Task used to migrate an excluded branch. */
| | |
| | | { |
| | | processDN2ID(suffix, entry.getName(), entryID); |
| | | } |
| | | |
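| | | // Update the remaining indexes for this entry, then queue the entry write for id2entry.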
| | | processDN2URI(suffix, entry); |
| | | processIndexes(suffix, entry, entryID); |
| | | processVLVIndexes(suffix, entry, entryID); |
| | | id2EntryPutTask.put(suffix, entryID, entry); |
| | | |
| | | importCount.getAndIncrement(); |
| | | } |
| | | |
| | |
| | | @Override |
| | | public boolean insert(final DN dn, final EntryID entryID) |
| | | { |
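| | | // The AtomicBoolean captures the result computed inside the WriteOperation callback.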
| | | final AtomicBoolean result = new AtomicBoolean(); |
| | | try |
| | | { |
| | | storage.write(new WriteOperation() |
| | | { |
| | | @Override |
| | |
| | | result.set(suffix.getDN2ID().insert(txn, dn, entryID)); |
| | | } |
| | | }); |
| | | } |
| | | catch (Exception e) |
| | | { |
| | | throw new StorageRuntimeException(e); |
| | | } |
| | | return result.get(); |
| | | } |
| | | |
| | | @Override |