From 8c65daf79d1c7fbe47f556c4d4bba2c2859851d1 Mon Sep 17 00:00:00 2001
From: boli <boli@localhost>
Date: Mon, 21 Apr 2008 21:08:12 +0000
Subject: [PATCH] This patch adds index buffering capabilities to the JE backend as to avoid using a fixed lock timeout for subtree delete and mod DN operations. Previously, any index modifications to subordinate entries of the affected operations will be performed with dn2id and id2entry modifications. This creates multiple random access to index database keys which could cause deadlocks in face of multiple parallel operations. With this fix, all index modifications are buffered up until the end of the operation so that each key of each index will be accessed once and in order. This maintains the DB access ordering in the JE backend of dn2id, id2entry, dn2uri, indexes in config order, VLV indexes in config order, and finally id2children and id2subtree. Since deadlocks should no longer occur in the JE backend, JE lock timeouts are now disabled at the JE environment level instead of the txn level. With this change, the performance of subtree deletes and mod DN operations have increased dramatically.
---
opends/src/server/org/opends/server/backends/jeb/Index.java | 433 ++++++++++++++++++++++++++++++++++++++++++++++++++++++
1 files changed, 433 insertions(+), 0 deletions(-)
diff --git a/opends/src/server/org/opends/server/backends/jeb/Index.java b/opends/src/server/org/opends/server/backends/jeb/Index.java
index 829bfec..e81afe7 100644
--- a/opends/src/server/org/opends/server/backends/jeb/Index.java
+++ b/opends/src/server/org/opends/server/backends/jeb/Index.java
@@ -185,6 +185,55 @@
}
/**
+ * Add an add entry ID operation into a index buffer.
+ *
+ * @param buffer The index buffer to insert the ID into.
+ * @param keyBytes The index key bytes.
+ * @param entryID The entry ID.
+ * @return True if the entry ID is inserted or ignored because the entry limit
+ * count is exceeded. False if it already exists in the entry ID set
+ * for the given key.
+ */
+ public boolean insertID(IndexBuffer buffer, byte[] keyBytes,
+ EntryID entryID)
+ {
+ TreeMap<byte[], IndexBuffer.BufferedIndexValues> bufferedOperations =
+ buffer.getBufferedIndex(this);
+ IndexBuffer.BufferedIndexValues values = null;
+
+ if(bufferedOperations == null)
+ {
+ bufferedOperations = new TreeMap<byte[],
+ IndexBuffer.BufferedIndexValues>(comparator);
+ buffer.putBufferedIndex(this, bufferedOperations);
+ }
+ else
+ {
+ values = bufferedOperations.get(keyBytes);
+ }
+
+ if(values == null)
+ {
+ values = new IndexBuffer.BufferedIndexValues();
+ bufferedOperations.put(keyBytes, values);
+ }
+
+ if(values.deletedIDs != null && values.deletedIDs.contains(entryID))
+ {
+ values.deletedIDs.remove(entryID);
+ return true;
+ }
+
+ if(values.addedIDs == null)
+ {
+ values.addedIDs = new EntryIDSet(keyBytes, null);
+ }
+
+ values.addedIDs.add(entryID);
+ return true;
+ }
+
+ /**
* Insert an entry ID into the set of IDs indexed by a given key.
*
* @param txn A database transaction, or null if none is required.
@@ -378,6 +427,276 @@
}
/**
+ * Update the set of entry IDs for a given key.
+ *
+ * @param txn A database transaction, or null if none is required.
+ * @param key The database key.
+ * @param deletedIDs The IDs to remove for the key.
+ * @param addedIDs the IDs to add for the key.
+ * @throws DatabaseException If a database error occurs.
+ */
+ void updateKey(Transaction txn, DatabaseEntry key,
+ EntryIDSet deletedIDs, EntryIDSet addedIDs)
+ throws DatabaseException
+ {
+ OperationStatus status;
+ DatabaseEntry data = new DatabaseEntry();
+
+ // Handle cases where nothing is changed early to avoid
+ // DB access.
+ if(deletedIDs != null && deletedIDs.size() == 0 &&
+ (addedIDs == null || addedIDs.size() == 0))
+ {
+ return;
+ }
+
+ if(addedIDs != null && addedIDs.size() == 0 &&
+ (deletedIDs == null || deletedIDs.size() == 0))
+ {
+ return;
+ }
+
+
+ if(deletedIDs == null && addedIDs == null)
+ {
+ status = delete(txn, key);
+
+ if(status != OperationStatus.SUCCESS)
+ {
+ if(debugEnabled())
+ {
+ StringBuilder builder = new StringBuilder();
+ StaticUtils.byteArrayToHexPlusAscii(builder, key.getData(), 4);
+ TRACER.debugError("The expected key does not exist in the " +
+ "index %s.\nKey:%s", name, builder.toString());
+ }
+ }
+
+ return;
+ }
+
+ if(maintainCount)
+ {
+ for(int i = 0; i < phantomWriteRetires; i++)
+ {
+ if(updateKeyWithRMW(txn, key, data, deletedIDs, addedIDs) ==
+ OperationStatus.SUCCESS)
+ {
+ return;
+ }
+ }
+ }
+ else
+ {
+ status = read(txn, key, data, LockMode.READ_COMMITTED);
+ if(status == OperationStatus.SUCCESS)
+ {
+ EntryIDSet entryIDList =
+ new EntryIDSet(key.getData(), data.getData());
+
+ if (entryIDList.isDefined())
+ {
+ for(int i = 0; i < phantomWriteRetires; i++)
+ {
+ if(updateKeyWithRMW(txn, key, data, deletedIDs, addedIDs) ==
+ OperationStatus.SUCCESS)
+ {
+ return;
+ }
+ }
+ }
+ }
+ else
+ {
+ if(rebuildRunning || trusted)
+ {
+ if(deletedIDs != null)
+ {
+ if(debugEnabled())
+ {
+ StringBuilder builder = new StringBuilder();
+ StaticUtils.byteArrayToHexPlusAscii(builder, key.getData(), 4);
+ TRACER.debugError("The expected key does not exist in the " +
+ "index %s.\nKey:%s", name, builder.toString());
+ }
+ }
+ data.setData(addedIDs.toDatabase());
+
+ status = insert(txn, key, data);
+ if(status == OperationStatus.KEYEXIST)
+ {
+ for(int i = 1; i < phantomWriteRetires; i++)
+ {
+ if(updateKeyWithRMW(txn, key, data, deletedIDs, addedIDs) ==
+ OperationStatus.SUCCESS)
+ {
+ return;
+ }
+ }
+ }
+ }
+ }
+ }
+ }
+
+ private OperationStatus updateKeyWithRMW(Transaction txn,
+ DatabaseEntry key,
+ DatabaseEntry data,
+ EntryIDSet deletedIDs,
+ EntryIDSet addedIDs)
+ throws DatabaseException
+ {
+ OperationStatus status;
+
+ status = read(txn, key, data, LockMode.RMW);
+ if(status == OperationStatus.SUCCESS)
+ {
+ EntryIDSet entryIDList =
+ new EntryIDSet(key.getData(), data.getData());
+
+ if(addedIDs != null)
+ {
+ if(entryIDList.isDefined() && indexEntryLimit > 0)
+ {
+ long idCountDelta = addedIDs.size();
+ if(deletedIDs != null)
+ {
+ idCountDelta -= deletedIDs.size();
+ }
+ if(idCountDelta + entryIDList.size() >= indexEntryLimit)
+ {
+ if(maintainCount)
+ {
+ entryIDList = new EntryIDSet(entryIDList.size() + idCountDelta);
+ }
+ else
+ {
+ entryIDList = new EntryIDSet();
+ }
+ entryLimitExceededCount++;
+
+ if(debugEnabled())
+ {
+ StringBuilder builder = new StringBuilder();
+ StaticUtils.byteArrayToHexPlusAscii(builder, key.getData(), 4);
+ TRACER.debugInfo("Index entry exceeded in index %s. " +
+ "Limit: %d. ID list size: %d.\nKey:",
+ name, indexEntryLimit, idCountDelta + addedIDs.size(),
+ builder);
+
+ }
+ }
+ else
+ {
+ entryIDList.addAll(addedIDs);
+ if(deletedIDs != null)
+ {
+ entryIDList.deleteAll(deletedIDs);
+ }
+ }
+ }
+ else
+ {
+ entryIDList.addAll(addedIDs);
+ if(deletedIDs != null)
+ {
+ entryIDList.deleteAll(deletedIDs);
+ }
+ }
+ }
+ else if(deletedIDs != null)
+ {
+ entryIDList.deleteAll(deletedIDs);
+ }
+
+ byte[] after = entryIDList.toDatabase();
+ if (after == null)
+ {
+ // No more IDs, so remove the key. If index is not
+ // trusted then this will cause all subsequent reads
+ // for this key to return undefined set.
+ return delete(txn, key);
+ }
+ else
+ {
+ data.setData(after);
+ return put(txn, key, data);
+ }
+ }
+ else
+ {
+ if(rebuildRunning || trusted)
+ {
+ if(deletedIDs != null)
+ {
+ if(debugEnabled())
+ {
+ StringBuilder builder = new StringBuilder();
+ StaticUtils.byteArrayToHexPlusAscii(builder, key.getData(), 4);
+ TRACER.debugError("The expected key does not exist in the " +
+ "index %s.\nKey:%s", name, builder.toString());
+ }
+ }
+ data.setData(addedIDs.toDatabase());
+ return insert(txn, key, data);
+ }
+ else
+ {
+ return OperationStatus.SUCCESS;
+ }
+ }
+ }
+
+ /**
+ * Add an remove entry ID operation into a index buffer.
+ *
+ * @param buffer The index buffer to insert the ID into.
+ * @param keyBytes The index key bytes.
+ * @param entryID The entry ID.
+ * @return True if the entry ID is inserted or ignored because the entry limit
+ * count is exceeded. False if it already exists in the entry ID set
+ * for the given key.
+ */
+ public boolean removeID(IndexBuffer buffer, byte[] keyBytes,
+ EntryID entryID)
+ {
+ TreeMap<byte[], IndexBuffer.BufferedIndexValues> bufferedOperations =
+ buffer.getBufferedIndex(this);
+ IndexBuffer.BufferedIndexValues values = null;
+
+ if(bufferedOperations == null)
+ {
+ bufferedOperations = new TreeMap<byte[],
+ IndexBuffer.BufferedIndexValues>(comparator);
+ buffer.putBufferedIndex(this, bufferedOperations);
+ }
+ else
+ {
+ values = bufferedOperations.get(keyBytes);
+ }
+
+ if(values == null)
+ {
+ values = new IndexBuffer.BufferedIndexValues();
+ bufferedOperations.put(keyBytes, values);
+ }
+
+ if(values.addedIDs != null && values.addedIDs.contains(entryID))
+ {
+ values.addedIDs.remove(entryID);
+ return true;
+ }
+
+ if(values.deletedIDs == null)
+ {
+ values.deletedIDs = new EntryIDSet(keyBytes, null);
+ }
+
+ values.deletedIDs.add(entryID);
+ return true;
+ }
+
+ /**
* Remove an entry ID from the set of IDs indexed by a given key.
*
* @param txn A database transaction, or null if none is required.
@@ -516,6 +835,35 @@
}
/**
+ * Buffered delete of a key from the JE database.
+ * @param buffer The index buffer to use to store the deleted keys
+ * @param keyBytes The index key bytes.
+ */
+ public void delete(IndexBuffer buffer, byte[] keyBytes)
+ {
+ TreeMap<byte[], IndexBuffer.BufferedIndexValues> bufferedOperations =
+ buffer.getBufferedIndex(this);
+ IndexBuffer.BufferedIndexValues values = null;
+
+ if(bufferedOperations == null)
+ {
+ bufferedOperations = new TreeMap<byte[],
+ IndexBuffer.BufferedIndexValues>(comparator);
+ buffer.putBufferedIndex(this, bufferedOperations);
+ }
+ else
+ {
+ values = bufferedOperations.get(keyBytes);
+ }
+
+ if(values == null)
+ {
+ values = new IndexBuffer.BufferedIndexValues();
+ bufferedOperations.put(keyBytes, values);
+ }
+ }
+
+ /**
* Check if an entry ID is in the set of IDs indexed by a given key.
*
* @param txn A database transaction, or null if none is required.
@@ -790,6 +1138,36 @@
}
/**
+ * Update the index buffer for a deleted entry.
+ *
+ * @param buffer The index buffer to use to store the deleted keys
+ * @param entryID The entry ID.
+ * @param entry The entry to be indexed.
+ * @return True if all the indexType keys for the entry are added. False if
+ * the entry ID already exists for some keys.
+ * @throws DatabaseException If an error occurs in the JE database.
+ * @throws DirectoryException If a Directory Server error occurs.
+ */
+ public boolean addEntry(IndexBuffer buffer, EntryID entryID, Entry entry)
+ throws DatabaseException, DirectoryException
+ {
+ HashSet<byte[]> addKeys = new HashSet<byte[]>();
+ boolean success = true;
+
+ indexer.indexEntry(null, entry, addKeys);
+
+ for (byte[] keyBytes : addKeys)
+ {
+ if(!insertID(buffer, keyBytes, entryID))
+ {
+ success = false;
+ }
+ }
+
+ return success;
+ }
+
+ /**
* Update the index for a new entry.
*
* @param txn A database transaction, or null if none is required.
@@ -821,6 +1199,27 @@
return success;
}
+ /**
+ * Update the index buffer for a deleted entry.
+ *
+ * @param buffer The index buffer to use to store the deleted keys
+ * @param entryID The entry ID
+ * @param entry The contents of the deleted entry.
+ * @throws DatabaseException If an error occurs in the JE database.
+ * @throws DirectoryException If a Directory Server error occurs.
+ */
+ public void removeEntry(IndexBuffer buffer, EntryID entryID, Entry entry)
+ throws DatabaseException, DirectoryException
+ {
+ HashSet<byte[]> delKeys = new HashSet<byte[]>();
+
+ indexer.indexEntry(null, entry, delKeys);
+
+ for (byte[] keyBytes : delKeys)
+ {
+ removeID(buffer, keyBytes, entryID);
+ }
+ }
/**
* Update the index for a deleted entry.
@@ -885,6 +1284,40 @@
}
/**
+ * Update the index to reflect a sequence of modifications in a Modify
+ * operation.
+ *
+ * @param buffer The index buffer to use to store the deleted keys
+ * @param entryID The ID of the entry that was modified.
+ * @param oldEntry The entry before the modifications were applied.
+ * @param newEntry The entry after the modifications were applied.
+ * @param mods The sequence of modifications in the Modify operation.
+ * @throws DatabaseException If an error occurs in the JE database.
+ */
+ public void modifyEntry(IndexBuffer buffer,
+ EntryID entryID,
+ Entry oldEntry,
+ Entry newEntry,
+ List<Modification> mods)
+ throws DatabaseException
+ {
+ TreeSet<byte[]> addKeys = new TreeSet<byte[]>(indexer.getComparator());
+ TreeSet<byte[]> delKeys = new TreeSet<byte[]>(indexer.getComparator());
+
+ indexer.modifyEntry(null, oldEntry, newEntry, mods, addKeys, delKeys);
+
+ for (byte[] keyBytes : delKeys)
+ {
+ removeID(buffer, keyBytes, entryID);
+ }
+
+ for (byte[] keyBytes : addKeys)
+ {
+ insertID(buffer, keyBytes, entryID);
+ }
+ }
+
+ /**
* Set the index entry limit.
*
* @param indexEntryLimit The index entry limit to set.
--
Gitblit v1.10.0