From a670c31c686f19c2e8619602de6518cde032f61f Mon Sep 17 00:00:00 2001
From: Jean-Noel Rouvignac <jean-noel.rouvignac@forgerock.com>
Date: Fri, 06 Sep 2013 21:41:58 +0000
Subject: [PATCH] OPENDJ-1116 Introduce abstraction for the changelog DB

---
 opendj-sdk/opends/src/server/org/opends/server/replication/server/ReplicationServer.java |  147 +++++++++---------------------------------------
 1 files changed, 28 insertions(+), 119 deletions(-)

diff --git a/opendj-sdk/opends/src/server/org/opends/server/replication/server/ReplicationServer.java b/opendj-sdk/opends/src/server/org/opends/server/replication/server/ReplicationServer.java
index 62fee3a..f351728 100644
--- a/opendj-sdk/opends/src/server/org/opends/server/replication/server/ReplicationServer.java
+++ b/opendj-sdk/opends/src/server/org/opends/server/replication/server/ReplicationServer.java
@@ -27,7 +27,6 @@
  */
 package org.opends.server.replication.server;
 
-import java.io.File;
 import java.io.IOException;
 import java.io.StringReader;
 import java.net.*;
@@ -54,10 +53,9 @@
 import org.opends.server.replication.protocol.*;
 import org.opends.server.replication.server.changelog.api.CNIndexRecord;
 import org.opends.server.replication.server.changelog.api.ChangeNumberIndexDB;
+import org.opends.server.replication.server.changelog.api.ChangelogDB;
 import org.opends.server.replication.server.changelog.api.ChangelogException;
-import org.opends.server.replication.server.changelog.je.DbHandler;
-import org.opends.server.replication.server.changelog.je.DraftCNDbHandler;
-import org.opends.server.replication.server.changelog.je.ReplicationDbEnv;
+import org.opends.server.replication.server.changelog.je.JEChangelogDB;
 import org.opends.server.types.*;
 import org.opends.server.util.LDIFReader;
 import org.opends.server.util.ServerConstants;
@@ -101,10 +99,9 @@
           new HashMap<String, ReplicationServerDomain>();
 
   private volatile boolean shutdown = false;
-  private ReplicationDbEnv dbEnv;
   private int rcvWindow;
   private int queueSize;
-  private String dbDirname = null;
+  private final ChangelogDB changelogDB = new JEChangelogDB(this);
 
   /**
    * The delay (in sec) after which the changes must be deleted from the
@@ -225,30 +222,11 @@
       replicationServerUrls = new ArrayList<String>();
     queueSize = configuration.getQueueSize();
     purgeDelay = configuration.getReplicationPurgeDelay();
-    dbDirname = configuration.getReplicationDBDirectory();
     rcvWindow = configuration.getWindowSize();
-    if (dbDirname == null)
-    {
-      dbDirname = "changelogDb";
-    }
-    // Check that this path exists or create it.
-    File f = getFileForPath(dbDirname);
-    try
-    {
-      if (!f.exists())
-      {
-        f.mkdir();
-      }
-    }
-    catch (Exception e)
-    {
-      MessageBuilder mb = new MessageBuilder();
-      mb.append(e.getLocalizedMessage());
-      mb.append(" ");
-      mb.append(String.valueOf(getFileForPath(dbDirname)));
-      Message msg = ERR_FILE_CHECK_CREATE_FAILED.get(mb.toString());
-      throw new ConfigException(msg, e);
-    }
+
+    this.changelogDB.setReplicationDBDirectory(configuration
+        .getReplicationDBDirectory());
+
     groupId = (byte)configuration.getGroupId();
     weight = configuration.getWeight();
     assuredTimeout = configuration.getAssuredTimeout();
@@ -504,10 +482,7 @@
 
     try
     {
-      // Initialize the replicationServer database.
-      dbEnv = new ReplicationDbEnv(getFileForPath(dbDirname).getAbsolutePath(),
-          this);
-      dbEnv.initializeFromChangelogStateDB();
+      this.changelogDB.initializeDB();
 
       setServerURL();
       listenSocket = new ServerSocket();
@@ -539,16 +514,9 @@
       if (debugEnabled())
         TRACER.debugInfo("RS " +getMonitorInstanceName()+
             " successfully initialized");
-    } catch (ChangelogException e)
-    {
-      Message message = ERR_COULD_NOT_READ_DB.get(
-              getFileForPath(dbDirname).getAbsolutePath(),
-              e.getLocalizedMessage());
-      logError(message);
     } catch (UnknownHostException e)
     {
-      Message message = ERR_UNKNOWN_HOSTNAME.get();
-      logError(message);
+      logError(ERR_UNKNOWN_HOSTNAME.get());
     } catch (IOException e)
     {
       Message message =
@@ -827,37 +795,12 @@
 
     shutdownECL();
 
-    if (dbEnv != null)
-    {
-      dbEnv.shutdown();
-    }
+    this.changelogDB.shutdownDB();
 
     // Remove this instance from the global instance list
     allInstances.remove(this);
   }
 
-
-  /**
-   * Creates a new DB handler for this ReplicationServer and the serverId and DN
-   * given in parameter.
-   *
-   * @param serverId
-   *          The serverId for which the dbHandler must be created.
-   * @param baseDn
-   *          The DN for which the dbHandler must be created.
-   * @return The new DB handler for this ReplicationServer and the serverId and
-   *         DN given in parameter.
-   * @throws ChangelogException
-   *           in case of underlying database problem.
-   */
-  public DbHandler newDbHandler(int serverId, String baseDn)
-      throws ChangelogException
-  {
-    return new DbHandler(serverId, baseDn, this, dbEnv, queueSize);
-  }
-
-
-
   /**
    * Clears the generationId for the replicationServerDomain related to the
    * provided baseDn.
@@ -867,18 +810,6 @@
    */
   public void clearGenerationId(String baseDn)
   {
-    try
-    {
-      dbEnv.clearGenerationId(baseDn);
-    }
-    catch (Exception ignored)
-    {
-      if (debugEnabled())
-      {
-        TRACER.debugCaught(DebugLogLevel.WARNING, ignored);
-      }
-    }
-
     synchronized (cnIndexDBLock)
     {
       if (cnIndexDB != null)
@@ -962,11 +893,7 @@
     if (newPurgeDelay != purgeDelay)
     {
       purgeDelay = newPurgeDelay;
-      // propagate
-      for (ReplicationServerDomain domain : getReplicationServerDomains())
-      {
-        domain.setPurgeDelay(purgeDelay*1000);
-      }
+      this.changelogDB.setPurgeDelay(purgeDelay * 1000);
     }
 
     rcvWindow = configuration.getWindowSize();
@@ -1047,7 +974,7 @@
     }
 
     final String newDir = configuration.getReplicationDBDirectory();
-    if (newDir != null && !dbDirname.equals(newDir))
+    if (newDir != null && !this.changelogDB.getDBDirName().equals(newDir))
     {
       return new ConfigChangeResult(ResultCode.SUCCESS, true);
     }
@@ -1597,7 +1524,7 @@
    * @throws DirectoryException
    *           when needed.
    */
-  public ChangeNumberIndexDB getChangeNumberIndexDB() throws DirectoryException
+  ChangeNumberIndexDB getChangeNumberIndexDB() throws DirectoryException
   {
     synchronized (cnIndexDBLock)
     {
@@ -1605,9 +1532,9 @@
       {
         if (cnIndexDB == null)
         {
-          cnIndexDB = new DraftCNDbHandler(this, this.dbEnv);
+          cnIndexDB = this.changelogDB.newChangeNumberIndexDB();
           final CNIndexRecord lastCNRecord = cnIndexDB.getLastRecord();
-          // initialization of the lastGeneratedChangeNumebr from the DB content
+          // initialization of the lastGeneratedChangeNumber from the DB content
           // if DB is empty => last record does not exist => default to 0
           lastGeneratedChangeNumber =
               (lastCNRecord != null) ? lastCNRecord.getChangeNumber() : 0;
@@ -1617,7 +1544,8 @@
       catch (Exception e)
       {
         TRACER.debugCaught(DebugLogLevel.ERROR, e);
-        Message message = ERR_CHANGENUMBER_DATABASE.get(e.getMessage());
+        Message message =
+            ERR_CHANGENUMBER_DATABASE.get(e.getLocalizedMessage());
         throw new DirectoryException(OPERATIONS_ERROR, message, e);
       }
     }
@@ -1816,6 +1744,16 @@
   }
 
   /**
+   * Returns the changelogDB.
+   *
+   * @return the changelogDB.
+   */
+  ChangelogDB getChangelogDB()
+  {
+    return this.changelogDB;
+  }
+
+  /**
    * Get the replication server DB directory.
    * This is useful for tests to be able to do some cleanup. Might even be
    * useful for the server some day.
@@ -1824,7 +1762,7 @@
    */
   public String getDbDirName()
   {
-    return dbDirname;
+    return this.changelogDB.getDBDirName();
   }
 
   /*
@@ -1896,33 +1834,4 @@
         + baseDNs.keySet();
   }
 
-  /**
-   * Initializes the generationId for the specified replication domain.
-   *
-   * @param baseDn
-   *          the replication domain
-   * @param generationId
-   *          the the generationId value for initialization
-   */
-  public void initDomainGenerationID(String baseDn, long generationId)
-  {
-    getReplicationServerDomain(baseDn, true).initGenerationID(generationId);
-  }
-
-  /**
-   * Adds the specified serverId to the specified replication domain.
-   *
-   * @param serverId
-   *          the server Id to add to the replication domain
-   * @param baseDn
-   *          the replication domain where to add the serverId
-   * @throws ChangelogException
-   *           If a database error happened.
-   */
-  public void addServerIdToDomain(int serverId, String baseDn)
-      throws ChangelogException
-  {
-    DbHandler dbHandler = newDbHandler(serverId, baseDn);
-    getReplicationServerDomain(baseDn, true).setDbHandler(serverId, dbHandler);
-  }
 }

--
Gitblit v1.10.0