From 5e495b3c867de5b83117834386859da67cbbedc5 Mon Sep 17 00:00:00 2001
From: Jean-Noel Rouvignac <jean-noel.rouvignac@forgerock.com>
Date: Tue, 13 Aug 2013 13:32:13 +0000
Subject: [PATCH] OPENDJ-1116 Introduce abstraction for the changelog DB

---
 opends/src/server/org/opends/server/replication/server/JEReplicationIterator.java                       |   28 +++------
 opends/src/server/org/opends/server/replication/server/MessageHandler.java                              |   29 ++++-----
 opends/src/server/org/opends/server/replication/server/changelog/api/ReplicationIteratorComparator.java |    4 +
 opends/src/server/org/opends/server/replication/server/changelog/api/ReplicationIterator.java           |   60 ++++++++++++++++++++
 opends/src/server/org/opends/server/replication/server/ReplicationBackend.java                          |    1 
 opends/tests/unit-tests-testng/src/server/org/opends/server/replication/server/DbHandlerTest.java       |    1 
 opends/src/server/org/opends/server/replication/server/ReplicationServerDomain.java                     |    1 
 opends/src/server/org/opends/server/replication/server/DbHandler.java                                   |    3 
 8 files changed, 91 insertions(+), 36 deletions(-)

diff --git a/opends/src/server/org/opends/server/replication/server/DbHandler.java b/opends/src/server/org/opends/server/replication/server/DbHandler.java
index 661a7ac..b5bae4e 100644
--- a/opends/src/server/org/opends/server/replication/server/DbHandler.java
+++ b/opends/src/server/org/opends/server/replication/server/DbHandler.java
@@ -42,6 +42,7 @@
 import org.opends.server.replication.protocol.UpdateMsg;
 import org.opends.server.replication.server.ReplicationDB.ReplServerDBCursor;
 import org.opends.server.replication.server.changelog.api.ChangelogException;
+import org.opends.server.replication.server.changelog.api.ReplicationIterator;
 import org.opends.server.types.Attribute;
 import org.opends.server.types.Attributes;
 import org.opends.server.types.InitializationException;
@@ -276,7 +277,7 @@
     {
       flush();
     }
-    return new ReplicationIterator(db, changeNumber, this);
+    return new JEReplicationIterator(db, changeNumber, this);
   }
 
   /**
diff --git a/opends/src/server/org/opends/server/replication/server/ReplicationIterator.java b/opends/src/server/org/opends/server/replication/server/JEReplicationIterator.java
similarity index 84%
rename from opends/src/server/org/opends/server/replication/server/ReplicationIterator.java
rename to opends/src/server/org/opends/server/replication/server/JEReplicationIterator.java
index 82b201f..16e6243 100644
--- a/opends/src/server/org/opends/server/replication/server/ReplicationIterator.java
+++ b/opends/src/server/org/opends/server/replication/server/JEReplicationIterator.java
@@ -32,12 +32,12 @@
 import org.opends.server.replication.protocol.UpdateMsg;
 import org.opends.server.replication.server.ReplicationDB.ReplServerDBCursor;
 import org.opends.server.replication.server.changelog.api.ChangelogException;
+import org.opends.server.replication.server.changelog.api.ReplicationIterator;
 
 /**
- * This class allows to iterate through the changes received from a given
- * LDAP Server Identifier.
+ * Berkeley DB JE implementation of IReplicationIterator.
  */
-public class ReplicationIterator
+public class JEReplicationIterator implements ReplicationIterator
 {
   private UpdateMsg currentChange = null;
   private ReplServerDBCursor cursor = null;
@@ -55,7 +55,7 @@
    * @param dbHandler The associated DbHandler.
    * @throws ChangelogException if a database problem happened.
    */
-  public ReplicationIterator(ReplicationDB db, ChangeNumber changeNumber,
+  public JEReplicationIterator(ReplicationDB db, ChangeNumber changeNumber,
       DbHandler dbHandler) throws ChangelogException
   {
     this.db = db;
@@ -86,20 +86,15 @@
     }
   }
 
-  /**
-   * Get the UpdateMsg where the iterator is currently set.
-   * @return The UpdateMsg where the iterator is currently set.
-   */
+  /** {@inheritDoc} */
+  @Override
   public UpdateMsg getChange()
   {
     return currentChange;
   }
 
-  /**
-   * Go to the next change in the ReplicationDB or in the server Queue.
-   * @return false if the iterator is already on the last change before
-   *         this call.
-   */
+  /** {@inheritDoc} */
+  @Override
   public boolean next()
   {
     currentChange = cursor.next();
@@ -136,11 +131,8 @@
     return currentChange != null;
   }
 
-  /**
-   * Release the resources and locks used by this Iterator.
-   * This method must be called when the iterator is no longer used.
-   * Failure to do it could cause DB deadlock.
-   */
+  /** {@inheritDoc} */
+  @Override
   public void releaseCursor()
   {
     synchronized (this)
diff --git a/opends/src/server/org/opends/server/replication/server/MessageHandler.java b/opends/src/server/org/opends/server/replication/server/MessageHandler.java
index 626c08b..ed652c7 100644
--- a/opends/src/server/org/opends/server/replication/server/MessageHandler.java
+++ b/opends/src/server/org/opends/server/replication/server/MessageHandler.java
@@ -27,9 +27,6 @@
  */
 package org.opends.server.replication.server;
 
-import static org.opends.messages.ReplicationMessages.*;
-import static org.opends.server.loggers.debug.DebugLogger.*;
-
 import java.util.ArrayList;
 import java.util.List;
 import java.util.SortedSet;
@@ -45,8 +42,12 @@
 import org.opends.server.replication.common.ChangeNumber;
 import org.opends.server.replication.common.ServerState;
 import org.opends.server.replication.protocol.UpdateMsg;
+import org.opends.server.replication.server.changelog.api.*;
 import org.opends.server.types.*;
 
+import static org.opends.messages.ReplicationMessages.*;
+import static org.opends.server.loggers.debug.DebugLogger.*;
+
 /**
  * This class implements a buffering/producer/consumer mechanism of
  * replication changes (UpdateMsg) used inside the replication server.
@@ -281,17 +282,15 @@
            *   load this change on the delayList
            *
            */
-          ReplicationIteratorComparator comparator =
-            new ReplicationIteratorComparator();
           SortedSet<ReplicationIterator> iteratorSortedSet =
-            new TreeSet<ReplicationIterator>(comparator);
+              new TreeSet<ReplicationIterator>(
+                  new ReplicationIteratorComparator());
           try
           {
             /* fill the lateQueue */
             for (int serverId : replicationServerDomain.getServers())
             {
-              ChangeNumber lastCsn = serverState
-                  .getChangeNumber(serverId);
+              ChangeNumber lastCsn = serverState.getChangeNumber(serverId);
               ReplicationIterator iterator = replicationServerDomain
                   .getChangelogIterator(serverId, lastCsn);
               if (iterator != null)
@@ -318,8 +317,7 @@
                 && (lateQueue.count() < 100)
                 && (lateQueue.bytesCount() < 50000))
             {
-              ReplicationIterator iterator = iteratorSortedSet
-                  .first();
+              ReplicationIterator iterator = iteratorSortedSet.first();
               iteratorSortedSet.remove(iterator);
               lateQueue.add(iterator.getChange());
               if (iterator.next())
@@ -376,7 +374,7 @@
                 {
                   msg1 = msgQueue.removeFirst();
                 } while (!msg.getChangeNumber().equals(msg1.getChangeNumber()));
-                this.updateServerState(msg);
+                updateServerState(msg);
                 return msg1;
               }
             }
@@ -388,7 +386,7 @@
           {
             msg = lateQueue.removeFirst();
           }
-          this.updateServerState(msg);
+          updateServerState(msg);
           return msg;
         }
       }
@@ -412,7 +410,7 @@
           }
           msg = msgQueue.removeFirst();
 
-          if (this.updateServerState(msg))
+          if (updateServerState(msg))
           {
             /*
              * Only push the message if it has not yet been seen
@@ -462,10 +460,9 @@
           there. So let's take the last change not sent directly from
           the db.
           */
-          ReplicationIteratorComparator comparator =
-            new ReplicationIteratorComparator();
           SortedSet<ReplicationIterator> iteratorSortedSet =
-            new TreeSet<ReplicationIterator>(comparator);
+              new TreeSet<ReplicationIterator>(
+                  new ReplicationIteratorComparator());
           try
           {
             // Build a list of candidates iterator (i.e. db i.e. server)
diff --git a/opends/src/server/org/opends/server/replication/server/ReplicationBackend.java b/opends/src/server/org/opends/server/replication/server/ReplicationBackend.java
index 0e31927..f1b4c56 100644
--- a/opends/src/server/org/opends/server/replication/server/ReplicationBackend.java
+++ b/opends/src/server/org/opends/server/replication/server/ReplicationBackend.java
@@ -72,6 +72,7 @@
 import org.opends.server.replication.protocol.ModifyDNMsg;
 import org.opends.server.replication.protocol.ModifyMsg;
 import org.opends.server.replication.protocol.UpdateMsg;
+import org.opends.server.replication.server.changelog.api.ReplicationIterator;
 import org.opends.server.types.Attribute;
 import org.opends.server.types.AttributeBuilder;
 import org.opends.server.types.AttributeType;
diff --git a/opends/src/server/org/opends/server/replication/server/ReplicationServerDomain.java b/opends/src/server/org/opends/server/replication/server/ReplicationServerDomain.java
index 947fcbf..13c3b20 100644
--- a/opends/src/server/org/opends/server/replication/server/ReplicationServerDomain.java
+++ b/opends/src/server/org/opends/server/replication/server/ReplicationServerDomain.java
@@ -47,6 +47,7 @@
 import org.opends.server.replication.common.*;
 import org.opends.server.replication.protocol.*;
 import org.opends.server.replication.server.changelog.api.ChangelogException;
+import org.opends.server.replication.server.changelog.api.ReplicationIterator;
 import org.opends.server.types.*;
 import org.opends.server.util.TimeThread;
 
diff --git a/opends/src/server/org/opends/server/replication/server/changelog/api/ReplicationIterator.java b/opends/src/server/org/opends/server/replication/server/changelog/api/ReplicationIterator.java
new file mode 100644
index 0000000..b44c16c
--- /dev/null
+++ b/opends/src/server/org/opends/server/replication/server/changelog/api/ReplicationIterator.java
@@ -0,0 +1,60 @@
+/*
+ * CDDL HEADER START
+ *
+ * The contents of this file are subject to the terms of the
+ * Common Development and Distribution License, Version 1.0 only
+ * (the "License").  You may not use this file except in compliance
+ * with the License.
+ *
+ * You can obtain a copy of the license at
+ * trunk/opends/resource/legal-notices/OpenDS.LICENSE
+ * or https://OpenDS.dev.java.net/OpenDS.LICENSE.
+ * See the License for the specific language governing permissions
+ * and limitations under the License.
+ *
+ * When distributing Covered Code, include this CDDL HEADER in each
+ * file and include the License file at
+ * trunk/opends/resource/legal-notices/OpenDS.LICENSE.  If applicable,
+ * add the following below this CDDL HEADER, with the fields enclosed
+ * by brackets "[]" replaced with your own identifying information:
+ *      Portions Copyright [yyyy] [name of copyright owner]
+ *
+ * CDDL HEADER END
+ *
+ *
+ *      Copyright 2013 ForgeRock AS
+ */
+package org.opends.server.replication.server.changelog.api;
+
+import org.opends.server.replication.protocol.UpdateMsg;
+
+/**
+ * This interface allows to iterate through the changes received from a given
+ * LDAP Server Identifier.
+ */
+public interface ReplicationIterator
+{
+
+  /**
+   * Get the UpdateMsg where the iterator is currently set.
+   *
+   * @return The UpdateMsg where the iterator is currently set.
+   */
+  UpdateMsg getChange();
+
+  /**
+   * Go to the next change in the ReplicationDB or in the server Queue.
+   *
+   * @return false if the iterator is already on the last change before this
+   *         call.
+   */
+  boolean next();
+
+  /**
+   * Release the resources and locks used by this Iterator. This method must be
+   * called when the iterator is no longer used. Failure to do it could cause DB
+   * deadlock.
+   */
+  void releaseCursor();
+
+}
diff --git a/opends/src/server/org/opends/server/replication/server/ReplicationIteratorComparator.java b/opends/src/server/org/opends/server/replication/server/changelog/api/ReplicationIteratorComparator.java
similarity index 93%
rename from opends/src/server/org/opends/server/replication/server/ReplicationIteratorComparator.java
rename to opends/src/server/org/opends/server/replication/server/changelog/api/ReplicationIteratorComparator.java
index b805e30..e02d810 100644
--- a/opends/src/server/org/opends/server/replication/server/ReplicationIteratorComparator.java
+++ b/opends/src/server/org/opends/server/replication/server/changelog/api/ReplicationIteratorComparator.java
@@ -23,8 +23,9 @@
  *
  *
  *      Copyright 2006-2008 Sun Microsystems, Inc.
+ *      Portions copyright 2013 ForgeRock AS
  */
-package org.opends.server.replication.server;
+package org.opends.server.replication.server.changelog.api;
 
 import java.util.Comparator;
 
@@ -45,6 +46,7 @@
    * @param o2 second ReplicationIterator.
    * @return result of the comparison.
    */
+  @Override
   public int compare(ReplicationIterator o1, ReplicationIterator o2)
   {
     ChangeNumber csn1 = o1.getChange().getChangeNumber();
diff --git a/opends/tests/unit-tests-testng/src/server/org/opends/server/replication/server/DbHandlerTest.java b/opends/tests/unit-tests-testng/src/server/org/opends/server/replication/server/DbHandlerTest.java
index 5da4f99..88001a7 100644
--- a/opends/tests/unit-tests-testng/src/server/org/opends/server/replication/server/DbHandlerTest.java
+++ b/opends/tests/unit-tests-testng/src/server/org/opends/server/replication/server/DbHandlerTest.java
@@ -42,6 +42,7 @@
 import org.opends.server.replication.common.ChangeNumber;
 import org.opends.server.replication.common.ChangeNumberGenerator;
 import org.opends.server.replication.protocol.DeleteMsg;
+import org.opends.server.replication.server.changelog.api.ReplicationIterator;
 import org.testng.annotations.Test;
 
 /**

--
Gitblit v1.10.0