From a72ae6523fc66a21ced5b5ab04c23e1629ae4d20 Mon Sep 17 00:00:00 2001
From: boli <boli@localhost>
Date: Thu, 20 Dec 2007 17:45:49 +0000
Subject: [PATCH] Updated indexes to order the keys before inserting them into the database. This assures no deadlocks will occur between multiple adds and mods.  Disabled lock timeouts for add and mod operations since deadlocks can not occur. This prevents txn aborts and op retry expiration due to lock timeouts of add and mod operations when the server is under high write load. 

---
 opends/src/server/org/opends/server/backends/jeb/Index.java |  219 ++++++++++++++++++++++++++++++++++++++++++------------
 1 files changed, 170 insertions(+), 49 deletions(-)

diff --git a/opends/src/server/org/opends/server/backends/jeb/Index.java b/opends/src/server/org/opends/server/backends/jeb/Index.java
index 9f00700..c031aad 100644
--- a/opends/src/server/org/opends/server/backends/jeb/Index.java
+++ b/opends/src/server/org/opends/server/backends/jeb/Index.java
@@ -32,7 +32,6 @@
 
 import com.sleepycat.je.*;
 
-import org.opends.server.protocols.asn1.ASN1OctetString;
 import org.opends.server.types.*;
 import org.opends.server.util.StaticUtils;
 import static org.opends.messages.JebMessages.*;
@@ -79,6 +78,17 @@
    */
   private int entryLimitExceededCount;
 
+  /**
+   * The max number of tries to rewrite phantom records.
+   */
+  final int phantomWriteRetires = 3;
+
+  /**
+   * Whether to maintain a count of IDs for a key once the entry limit
+   * has exceeded.
+   */
+  boolean maintainCount;
+
   private State state;
 
   /**
@@ -111,13 +121,15 @@
    * @param indexEntryLimit The configured limit on the number of entry IDs
    * that may be indexed by one key.
    * @param cursorEntryLimit The configured limit on the number of entry IDs
+   * @param maintainCount Whether to maintain a count of IDs for a key once
+   * the entry limit has exceeded.
    * @param env The JE Environemnt
    * @param entryContainer The database entryContainer holding this index.
    * @throws DatabaseException If an error occurs in the JE database.
    */
   public Index(String name, Indexer indexer, State state,
-        int indexEntryLimit, int cursorEntryLimit, Environment env,
-        EntryContainer entryContainer)
+        int indexEntryLimit, int cursorEntryLimit, boolean maintainCount,
+        Environment env, EntryContainer entryContainer)
       throws DatabaseException
   {
     super(name, env, entryContainer);
@@ -125,6 +137,7 @@
     this.comparator = indexer.getComparator();
     this.indexEntryLimit = indexEntryLimit;
     this.cursorEntryLimit = cursorEntryLimit;
+    this.maintainCount = maintainCount;
 
     DatabaseConfig dbNodupsConfig = new DatabaseConfig();
 
@@ -183,60 +196,61 @@
        throws DatabaseException
   {
     OperationStatus status;
-    LockMode lockMode = LockMode.RMW;
     DatabaseEntry entryIDData = entryID.getDatabaseEntry();
     DatabaseEntry data = new DatabaseEntry();
-    boolean success = true;
-    boolean done = false;
+    boolean success = false;
 
-    while(!done)
+    if(maintainCount)
     {
-      status = read(txn, key, data, lockMode);
-
-      if (status == OperationStatus.SUCCESS)
+      for(int i = 0; i < phantomWriteRetires; i++)
+      {
+        if(insertIDWithRMW(txn, key, data, entryIDData, entryID) ==
+            OperationStatus.SUCCESS)
+        {
+          return true;
+        }
+      }
+    }
+    else
+    {
+      status = read(txn, key, data, LockMode.READ_COMMITTED);
+      if(status == OperationStatus.SUCCESS)
       {
         EntryIDSet entryIDList =
             new EntryIDSet(key.getData(), data.getData());
+
         if (entryIDList.isDefined())
         {
-          if (indexEntryLimit > 0 && entryIDList.size() >= indexEntryLimit)
+          for(int i = 0; i < phantomWriteRetires; i++)
           {
-            entryIDList = new EntryIDSet(entryIDList.size());
-            entryLimitExceededCount++;
-
-            if(debugEnabled())
+            if(insertIDWithRMW(txn, key, data, entryIDData, entryID) ==
+                OperationStatus.SUCCESS)
             {
-              StringBuilder builder = new StringBuilder();
-              StaticUtils.byteArrayToHexPlusAscii(builder, key.getData(), 4);
-              TRACER.debugInfo("Index entry exceeded in index %s. " +
-                  "Limit: %d. ID list size: %d.\nKey:",
-                               name, indexEntryLimit, entryIDList.size(),
-                               builder);
-
+              return true;
             }
           }
         }
-
-        success = entryIDList.add(entryID);
-
-        byte[] after = entryIDList.toDatabase();
-        data.setData(after);
-        put(txn, key, data);
-        done = true;
       }
       else
       {
         if(rebuildRunning || trusted)
         {
           status = insert(txn, key, entryIDData);
-          if(status == OperationStatus.SUCCESS)
+          if(status == OperationStatus.KEYEXIST)
           {
-            done = true;
+            for(int i = 1; i < phantomWriteRetires; i++)
+            {
+              if(insertIDWithRMW(txn, key, data, entryIDData, entryID) ==
+                  OperationStatus.SUCCESS)
+              {
+                return true;
+              }
+            }
           }
         }
         else
         {
-          done = true;
+          return true;
         }
       }
     }
@@ -244,6 +258,63 @@
     return success;
   }
 
+  private OperationStatus insertIDWithRMW(Transaction txn, DatabaseEntry key,
+                                          DatabaseEntry data,
+                                          DatabaseEntry entryIDData,
+                                          EntryID entryID)
+      throws DatabaseException
+  {
+    OperationStatus status;
+
+    status = read(txn, key, data, LockMode.RMW);
+    if(status == OperationStatus.SUCCESS)
+    {
+      EntryIDSet entryIDList =
+          new EntryIDSet(key.getData(), data.getData());
+      if (entryIDList.isDefined() && indexEntryLimit > 0 &&
+          entryIDList.size() >= indexEntryLimit)
+      {
+        if(maintainCount)
+        {
+          entryIDList = new EntryIDSet(entryIDList.size());
+        }
+        else
+        {
+          entryIDList = new EntryIDSet();
+        }
+        entryLimitExceededCount++;
+
+        if(debugEnabled())
+        {
+          StringBuilder builder = new StringBuilder();
+          StaticUtils.byteArrayToHexPlusAscii(builder, key.getData(), 4);
+          TRACER.debugInfo("Index entry exceeded in index %s. " +
+              "Limit: %d. ID list size: %d.\nKey:",
+              name, indexEntryLimit, entryIDList.size(),
+              builder);
+
+        }
+      }
+
+      entryIDList.add(entryID);
+
+      byte[] after = entryIDList.toDatabase();
+      data.setData(after);
+      return put(txn, key, data);
+    }
+    else
+    {
+      if(rebuildRunning || trusted)
+      {
+        return insert(txn, key, entryIDData);
+      }
+      else
+      {
+        return OperationStatus.SUCCESS;
+      }
+    }
+  }
+
   /**
    * Remove an entry ID from the set of IDs indexed by a given key.
    *
@@ -256,10 +327,51 @@
       throws DatabaseException
   {
     OperationStatus status;
-    LockMode lockMode = LockMode.RMW;
     DatabaseEntry data = new DatabaseEntry();
 
-    status = read(txn, key, data, lockMode);
+    if(maintainCount)
+    {
+      removeIDWithRMW(txn, key, data, entryID);
+    }
+    else
+    {
+      status = read(txn, key, data, LockMode.READ_COMMITTED);
+      if(status == OperationStatus.SUCCESS)
+      {
+        EntryIDSet entryIDList = new EntryIDSet(key.getData(), data.getData());
+        if(entryIDList.isDefined())
+        {
+          removeIDWithRMW(txn, key, data, entryID);
+        }
+      }
+      else
+      {
+        // Ignore failures if rebuild is running since a empty entryIDset
+        // will probably not be rebuilt.
+        if(trusted && !rebuildRunning)
+        {
+          setTrusted(txn, false);
+
+          if(debugEnabled())
+          {
+            StringBuilder builder = new StringBuilder();
+            StaticUtils.byteArrayToHexPlusAscii(builder, key.getData(), 4);
+            TRACER.debugError("The expected key does not exist in the " +
+                "index %s.\nKey:%s", name, builder.toString());
+          }
+
+          logError(ERR_JEB_INDEX_CORRUPT_REQUIRES_REBUILD.get(name));
+        }
+      }
+    }
+  }
+
+  private void removeIDWithRMW(Transaction txn, DatabaseEntry key,
+                               DatabaseEntry data, EntryID entryID)
+      throws DatabaseException
+  {
+    OperationStatus status;
+    status = read(txn, key, data, LockMode.RMW);
 
     if (status == OperationStatus.SUCCESS)
     {
@@ -272,15 +384,13 @@
         {
           setTrusted(txn, false);
 
-
-
           if(debugEnabled())
           {
             StringBuilder builder = new StringBuilder();
             StaticUtils.byteArrayToHexPlusAscii(builder, key.getData(), 4);
             TRACER.debugError("The expected entry ID does not exist in " +
                 "the entry ID list for index %s.\nKey:%s",
-                              name, builder.toString());
+                name, builder.toString());
           }
 
           logError(ERR_JEB_INDEX_CORRUPT_REQUIRES_REBUILD.get(name));
@@ -612,15 +722,15 @@
   public boolean addEntry(Transaction txn, EntryID entryID, Entry entry)
        throws DatabaseException, DirectoryException
   {
-    HashSet<ASN1OctetString> addKeys = new HashSet<ASN1OctetString>();
+    TreeSet<byte[]> addKeys = new TreeSet<byte[]>(indexer.getComparator());
     boolean success = true;
 
     indexer.indexEntry(txn, entry, addKeys);
 
     DatabaseEntry key = new DatabaseEntry();
-    for (ASN1OctetString keyBytes : addKeys)
+    for (byte[] keyBytes : addKeys)
     {
-      key.setData(keyBytes.value());
+      key.setData(keyBytes);
       if(!insertID(txn, key, entryID))
       {
         success = false;
@@ -643,14 +753,14 @@
   public void removeEntry(Transaction txn, EntryID entryID, Entry entry)
        throws DatabaseException, DirectoryException
   {
-    HashSet<ASN1OctetString> delKeys = new HashSet<ASN1OctetString>();
+    TreeSet<byte[]> delKeys = new TreeSet<byte[]>(indexer.getComparator());
 
     indexer.indexEntry(txn, entry, delKeys);
 
     DatabaseEntry key = new DatabaseEntry();
-    for (ASN1OctetString keyBytes : delKeys)
+    for (byte[] keyBytes : delKeys)
     {
-      key.setData(keyBytes.value());
+      key.setData(keyBytes);
       removeID(txn, key, entryID);
     }
   }
@@ -674,21 +784,21 @@
                           List<Modification> mods)
        throws DatabaseException
   {
-    HashSet<ASN1OctetString> addKeys = new HashSet<ASN1OctetString>();
-    HashSet<ASN1OctetString> delKeys = new HashSet<ASN1OctetString>();
+    TreeSet<byte[]> addKeys = new TreeSet<byte[]>(indexer.getComparator());
+    TreeSet<byte[]> delKeys = new TreeSet<byte[]>(indexer.getComparator());
 
     indexer.modifyEntry(txn, oldEntry, newEntry, mods, addKeys, delKeys);
 
     DatabaseEntry key = new DatabaseEntry();
-    for (ASN1OctetString keyBytes : delKeys)
+    for (byte[] keyBytes : delKeys)
     {
-      key.setData(keyBytes.value());
+      key.setData(keyBytes);
       removeID(txn, key, entryID);
     }
 
-    for (ASN1OctetString keyBytes : addKeys)
+    for (byte[] keyBytes : addKeys)
     {
-      key.setData(keyBytes.value());
+      key.setData(keyBytes);
       insertID(txn, key, entryID);
     }
   }
@@ -754,4 +864,15 @@
   {
     this.rebuildRunning = rebuildRunning;
   }
+
+  /**
+   * Whether this index maintains a count of IDs for keys once the
+   * entry limit has exceeded.
+   * @return <code>true</code> if this index maintains court of IDs
+   * or <code>false</code> otherwise
+   */
+  public boolean getMaintainCount()
+  {
+    return maintainCount;
+  }
 }

--
Gitblit v1.10.0