From 7c30dbb5403772b323df3ad907d9ed15d23b5aee Mon Sep 17 00:00:00 2001
From: ludovicp <ludovicp@localhost>
Date: Thu, 29 Apr 2010 20:35:40 +0000
Subject: [PATCH] Last batch of changes for this week. This adds support for the IETF based Password Policy for LDAP as SubEntry. Also resolves the following issues : - 4544 :  initializeBackend() should not set JE env config params directly. - 4478 : ECL in draft compat mode / search lastchangenumber can be very long - 4538 : Virtual attributes not retrieved when entry cache configured - 4547 : Search Filter Matching differ for cn=Directory Manager and plain user. - 4514 : Logs shows unexpected message with replication monitoring data missing (Partial fix) - 4534 : Replication using security does not work after server restart - 4516 : SEVERE_ERROR: servers (...) have the same ServerId In addition, they also improve reliability and performance in various areas including CollectiveAttributes, Virtual Attributes and Subentries management, Schema loading, Replication...

---
 opends/src/server/org/opends/server/replication/server/ReplicationDB.java |  393 ++++++++++++++++++++++++++++++++++++++++++++++++++++++-
 1 files changed, 383 insertions(+), 10 deletions(-)

diff --git a/opends/src/server/org/opends/server/replication/server/ReplicationDB.java b/opends/src/server/org/opends/server/replication/server/ReplicationDB.java
index 0e045bc..86e27fb 100644
--- a/opends/src/server/org/opends/server/replication/server/ReplicationDB.java
+++ b/opends/src/server/org/opends/server/replication/server/ReplicationDB.java
@@ -22,7 +22,7 @@
  * CDDL HEADER END
  *
  *
- *      Copyright 2006-2009 Sun Microsystems, Inc.
+ *      Copyright 2006-2010 Sun Microsystems, Inc.
  */
 package org.opends.server.replication.server;
 import org.opends.messages.MessageBuilder;
@@ -37,6 +37,7 @@
 import org.opends.server.replication.common.ChangeNumber;
 import org.opends.server.replication.protocol.UpdateMsg;
 import java.util.concurrent.locks.ReentrantReadWriteLock;
+import java.util.zip.DataFormatException;
 
 import com.sleepycat.je.Cursor;
 import com.sleepycat.je.DatabaseEntry;
@@ -67,6 +68,44 @@
   // close the db (shutdown or clear).
   private ReentrantReadWriteLock dbCloseLock;
 
+  // Change counter management
+  // The Db itself does not allow to count records between a start and an end
+  // change. And we cannot rely on the replication seqnum that is part of the
+  // changenumber, since there can be holes (when an operation is canceled).
+  // And traversing all the records from the start one to the end one works
+  // fine but can be very long (ECL:lastChangeNumber).
+  //
+  // So we are storing special records in the DB (called counter records),
+  // that contain the number of changes since the previous counter record.
+  // One special record is :
+  // - a special key : changetime , serverid=0  seqnum=0
+  // - a counter value : count of changes since previous counter record.
+  //
+  // A counter record has to follow the order of the db, so it needs to have
+  // a changenumber key that follow the order.
+  // A counter record must have its own chagenumber key since the Db does not
+  // support duplicate key (it is a compatibility breaker character of the DB).
+  //
+  // We define 2 conditions to store a counter record :
+  // 1/- at least 'counterWindowSize' changes have been stored in the Db
+  //     since the previous counter record
+  // 2/- the change to be stored has a new timestamp - so that the counter
+  //     record is the first record for this timestamp.
+  //
+
+
+  private int  counterCurrValue = 1;
+  // Current value of the counter.
+
+  private long counterTsLimit = 0;
+  // When not null,
+  // the next change with a ts different from tsForNewCounterRecord will lead
+  // to store a new counterRecord.
+
+  private int  counterWindowSize = 1000;
+  // The counter record will never be written to the db more often than each
+  // counterWindowSize changes.
+
  /**
    * Creates a new database or open existing database that will be used
    * to store and retrieve changes from an LDAP server.
@@ -92,6 +131,64 @@
         true).getGenerationId());
 
     dbCloseLock = new ReentrantReadWriteLock(true);
+
+    //
+    Cursor cursor = null;
+    Transaction txn = null;
+    DatabaseEntry key = new DatabaseEntry();
+    DatabaseEntry data = new DatabaseEntry();
+    OperationStatus status;
+    int distBackToCounterRecord = 0;
+
+    // Initialize counter
+    this.counterCurrValue = 1;
+    cursor = db.openCursor(txn, null);
+    status = cursor.getLast(key, data, LockMode.DEFAULT);
+    while (status == OperationStatus.SUCCESS)
+    {
+      try
+      {
+        ChangeNumber cn =new ChangeNumber(new String(key.getData(), "UTF-8"));
+        if (!ReplicationDB.isaCounter(cn))
+        {
+          status = cursor.getPrev(key, data, LockMode.DEFAULT);
+          distBackToCounterRecord++;
+        }
+        else
+        {
+          // counter record
+          counterCurrValue = decodeCounterValue(data.getData())+1;
+          counterTsLimit = cn.getTime();
+          break;
+        }
+      }
+      catch (UnsupportedEncodingException e)
+      {
+        MessageBuilder mb = new MessageBuilder();
+        mb.append(ERR_CHANGELOG_UNSUPPORTED_UTF8_ENCODING.get());
+        mb.append(stackTraceToSingleLineString(e));
+        logError(mb.toMessage());
+        replicationServer.shutdown();
+        if (txn != null)
+        {
+          try
+          {
+            txn.abort();
+          } catch (DatabaseException e1)
+          {
+            // can't do much more. The ReplicationServer is shuting down.
+          }
+        }
+        replicationServer.shutdown();
+      }
+      catch (DataFormatException e)
+      {
+        // Should never happen
+      }
+    }
+    counterCurrValue += distBackToCounterRecord;
+    cursor.close();
+
   }
 
   /**
@@ -123,9 +220,31 @@
           {
             DatabaseEntry key = new ReplicationKey(change.getChangeNumber());
             DatabaseEntry data = new ReplicationData(change);
-            db.put(txn, key, data);
-          }
 
+            if ((counterCurrValue!=0) &&
+                (counterCurrValue%counterWindowSize == 0))
+            {
+              // enough changes to generate a counter record - wait for the next
+              // change fo time
+              counterTsLimit = change.getChangeNumber().getTime();
+            }
+            if ((counterTsLimit!=0)
+                && (change.getChangeNumber().getTime() != counterTsLimit))
+            {
+              // Write the counter record
+              DatabaseEntry counterKey = new ReplicationKey(
+                  new ChangeNumber(
+                  change.getChangeNumber().getTime(),
+                  0, 0));
+              DatabaseEntry counterValue =
+                encodeCounterValue(counterCurrValue-1);
+              db.put(txn, counterKey, counterValue);
+              counterTsLimit=0;
+            }
+            db.put(txn, key, data);
+            counterCurrValue++;
+
+          }
           txn.commitWriteNoSync();
           txn = null;
           done = true;
@@ -275,6 +394,7 @@
   {
     Cursor cursor = null;
     String str = null;
+    ChangeNumber cn = null;
 
     try
     {
@@ -301,11 +421,25 @@
         try
         {
           str = new String(key.getData(), "UTF-8");
+          cn = new ChangeNumber(str);
+          if (ReplicationDB.isaCounter(cn))
+          {
+            // First record is a counter record .. go next
+            status = cursor.getNext(key, data, LockMode.DEFAULT);
+            if (status != OperationStatus.SUCCESS)
+            {
+              // DB contains only a counter record
+              return null;
+            }
+            else
+            {
+              cn = new ChangeNumber(new String(key.getData(), "UTF-8"));
+            }
+          }
         } catch (UnsupportedEncodingException e)
         {
           // never happens
         }
-        return new ChangeNumber(str);
       }
       finally
       {
@@ -320,8 +454,9 @@
       mb.append(stackTraceToSingleLineString(e));
       logError(mb.toMessage());
       replicationServer.shutdown();
-      return null;
+      cn = null;
     }
+    return cn;
   }
 
   /**
@@ -331,7 +466,7 @@
   public ChangeNumber readLastChange()
   {
     Cursor cursor = null;
-    String str = null;
+    ChangeNumber cn = null;
 
     try
     {
@@ -349,13 +484,23 @@
         }
         try
         {
-          str = new String(key.getData(), "UTF-8");
+          String str = new String(key.getData(), "UTF-8");
+          cn = new ChangeNumber(str);
+          if (ReplicationDB.isaCounter(cn))
+          {
+            if (cursor.getPrev(key, data, LockMode.DEFAULT) !=
+              OperationStatus.SUCCESS)
+            {
+              /* database only contain a counter record - don't know
+               * how much it can be possible but ... */
+              cn = null;
+            }
+          }
         }
         catch (UnsupportedEncodingException e)
         {
           // never happens
         }
-        return new ChangeNumber(str);
       }
       finally
       {
@@ -369,8 +514,9 @@
       mb.append(stackTraceToSingleLineString(e));
       logError(mb.toMessage());
       replicationServer.shutdown();
-      return null;
+      cn = null;
     }
+    return cn;
   }
 
   /**
@@ -611,7 +757,14 @@
         {
           return null;
         }
-        try {
+        try
+        {
+          ChangeNumber cn=new ChangeNumber(new String(key.getData(), "UTF-8"));
+          if(ReplicationDB.isaCounter(cn))
+          {
+            // counter record
+            continue;
+          }
           currentChange = ReplicationData.generateChange(data.getData());
         } catch (Exception e) {
           /*
@@ -681,4 +834,224 @@
       dbCloseLock.writeLock().unlock();
     }
   }
+  /**
+   * Count the number of changes between 2 changes numbers (inclusive).
+   * @param start The lower limit of the count.
+   * @param stop The higher limit of the count.
+   * @return The number of changes between provided start and stop changeNumber.
+   * Returns -1 when an error occurs.
+   */
+  public int count(ChangeNumber start, ChangeNumber stop)
+  {
+    int counterRecord1 = 0;
+    int counterRecord2 = 0;
+    int distToCounterRecord1 = 0;
+    int distBackToCounterRecord2 = 0;
+    int count=0;
+    Cursor cursor = null;
+    Transaction txn = null;
+    OperationStatus status;
+    try
+    {
+      ChangeNumber cn ;
+
+      if ((start==null)&&(stop==null))
+        return (int)db.count();
+
+      // Step 1 : from the start point, traverse db to the next counter record
+      // or to the stop point.
+      DatabaseEntry key = new DatabaseEntry();
+      DatabaseEntry data = new DatabaseEntry();
+      cursor = db.openCursor(txn, null);
+      if (start != null)
+      {
+        key = new ReplicationKey(start);
+        status = cursor.getSearchKey(key, data, LockMode.DEFAULT);
+        if (status == OperationStatus.NOTFOUND)
+          status = cursor.getSearchKeyRange(key, data, LockMode.DEFAULT);
+      }
+      else
+      {
+        status = cursor.getNext(key, data, LockMode.DEFAULT);
+      }
+
+      while (status == OperationStatus.SUCCESS)
+      {
+        // test whether the record is a regular change or a counter
+        String csnString = new String(key.getData(), "UTF-8");
+        cn = new ChangeNumber(csnString);
+        if (cn.getServerId() != 0)
+        {
+          // reached a regular change record
+          // test whether we reached the 'stop' target
+          if (!cn.newer(stop))
+          {
+            // let's loop
+            distToCounterRecord1++;
+            status = cursor.getNext(key, data, LockMode.DEFAULT);
+          }
+          else
+          {
+            // reached the end
+            break;
+          }
+        }
+        else
+        {
+          // counter record
+          counterRecord1 = decodeCounterValue(data.getData());
+          break;
+        }
+      }
+      cursor.close();
+
+      // cases
+      //
+      if (counterRecord1==0)
+        return distToCounterRecord1;
+
+      // Step 2 : from the stop point, traverse db to the next counter record
+      // or to the start point.
+      txn = null;
+      data = new DatabaseEntry();
+      key = new ReplicationKey(stop);
+      cursor = db.openCursor(txn, null);
+      status = cursor.getSearchKey(key, data, LockMode.DEFAULT);
+      if (status == OperationStatus.SUCCESS)
+      {
+        cn = new ChangeNumber(new String(key.getData(), "UTF-8"));
+      }
+      else
+      {
+        key = new DatabaseEntry();
+        data = new DatabaseEntry();
+        status = cursor.getLast(key, data, LockMode.DEFAULT);
+        if (status != OperationStatus.SUCCESS)
+        {
+          /* database is empty */
+          return 0;
+        }
+      }
+      while (status == OperationStatus.SUCCESS)
+      {
+        cn = new ChangeNumber(new String(key.getData(), "UTF-8"));
+        if (!ReplicationDB.isaCounter(cn))
+        {
+          // regular change record
+          if (!cn.older(start))
+          {
+            distBackToCounterRecord2++;
+            status = cursor.getPrev(key, data, LockMode.DEFAULT);
+          }
+          else
+            break;
+        }
+        else
+        {
+          // counter record
+          counterRecord2 = decodeCounterValue(data.getData());
+          break;
+        }
+      }
+      cursor.close();
+
+      // Step 3 : Now consolidates the result
+      if (counterRecord1!=0)
+      {
+        if (counterRecord1 == counterRecord2)
+        {
+          // only one cp between from and to - no need to use it
+          count = distToCounterRecord1 + distBackToCounterRecord2;
+        }
+        else
+        {
+          // 2 cp between from and to
+          count = distToCounterRecord1 + (counterRecord2-counterRecord1)
+            + distBackToCounterRecord2;
+        }
+      }
+    }
+    catch (UnsupportedEncodingException e)
+    {
+      MessageBuilder mb = new MessageBuilder();
+      mb.append(ERR_CHANGELOG_UNSUPPORTED_UTF8_ENCODING.get());
+      mb.append(stackTraceToSingleLineString(e));
+      logError(mb.toMessage());
+      replicationServer.shutdown();
+    }
+    catch (DataFormatException e)
+    {
+      // Should never happen
+    }
+    finally
+    {
+      if (cursor != null)
+        cursor.close();
+      if (txn != null)
+      {
+        try
+        {
+          txn.abort();
+        } catch (DatabaseException e1)
+        {
+          // can't do much more. The ReplicationServer is shuting down.
+        }
+      }
+    }
+    return count;
+  }
+
+  /**
+   * Test if a provided changeNumber represents a counter record.
+   * @param cn The provided changeNumber.
+   * @return True if the provided changenumber is a counter.
+   */
+  static private boolean isaCounter(ChangeNumber cn)
+  {
+    return ((cn.getServerId()== 0) && (cn.getSeqnum()==0));
+  }
+
+  /**
+   * Decode the provided database entry as a the value of a counter.
+   * @param entry The provided entry.
+   * @return The counter value.
+   * @throws DataFormatException
+   */
+  private static int decodeCounterValue(byte[] entry)
+  throws DataFormatException
+  {
+    try
+    {
+      String numAckStr = new String(entry, 0, entry.length, "UTF-8");
+      return Integer.parseInt(numAckStr);
+
+    } catch (UnsupportedEncodingException e)
+    {
+      throw new DataFormatException("UTF-8 is not supported by this jvm.");
+    }
+  }
+
+  /**
+   * Encode the provided counter value in a database entry.
+   * @param entry The provided entry.
+   * @return The databse entry with the counter value encoded inside..
+   * @throws UnsupportedEncodingException
+   */
+  static private DatabaseEntry encodeCounterValue(int value)
+  throws UnsupportedEncodingException
+  {
+    DatabaseEntry entry = new DatabaseEntry();
+    entry.setData(String.valueOf(value).getBytes("UTF-8"));
+    return entry;
+  }
+
+  /**
+   * Set the counter writing window size (public method for unit tests only).
+   * @param size Size in number of record.
+   */
+  public void setCounterWindowSize(int size)
+  {
+    this.counterWindowSize = size;
+  }
+
 }

--
Gitblit v1.10.0