From b48ce50fdf4d73e8be3799e3a7c6c2bf9d1b2965 Mon Sep 17 00:00:00 2001
From: pgamba <pgamba@localhost>
Date: Sun, 02 Sep 2007 17:58:07 +0000
Subject: [PATCH] fix for #1733 & #845 - Initialization of replication

---
 opends/src/server/org/opends/server/replication/server/ReplicationDbEnv.java |  481 +++++++++++++++++++++++++++++++++++++++++++----------
 1 files changed, 391 insertions(+), 90 deletions(-)

diff --git a/opends/src/server/org/opends/server/replication/server/ReplicationDbEnv.java b/opends/src/server/org/opends/server/replication/server/ReplicationDbEnv.java
index 0a4f630..fc7742c 100644
--- a/opends/src/server/org/opends/server/replication/server/ReplicationDbEnv.java
+++ b/opends/src/server/org/opends/server/replication/server/ReplicationDbEnv.java
@@ -25,10 +25,12 @@
  *      Portions Copyright 2006-2007 Sun Microsystems, Inc.
  */
 package org.opends.server.replication.server;
-import org.opends.messages.Message;
-import org.opends.messages.MessageBuilder;
+import org.opends.messages.*;
 
 import static org.opends.server.loggers.ErrorLogger.logError;
+import static org.opends.server.loggers.debug.DebugLogger.debugEnabled;
+import static org.opends.server.loggers.debug.DebugLogger.getTracer;
+import org.opends.server.loggers.debug.DebugTracer;
 import static org.opends.messages.ReplicationMessages.*;
 import static org.opends.server.util.StaticUtils.stackTraceToSingleLineString;
 
@@ -58,6 +60,12 @@
   private Environment dbEnvironment = null;
   private Database stateDb = null;
   private ReplicationServer replicationServer = null;
+  private static final String GENERATION_ID_TAG = "GENID";
+  private static final String FIELD_SEPARATOR = " ";
+  /**
+   * The tracer object for the debug logger.
+   */
+  private static final DebugTracer TRACER = getTracer();
 
   /**
    * Initialize this class.
@@ -117,16 +125,120 @@
     Cursor cursor = stateDb.openCursor(null, null);
     DatabaseEntry key = new DatabaseEntry();
     DatabaseEntry data = new DatabaseEntry();
+
     try
     {
+      /*
+       *  Get the domain base DN/ generationIDs records
+       */
       OperationStatus status = cursor.getFirst(key, data, LockMode.DEFAULT);
       while (status == OperationStatus.SUCCESS)
       {
         try
         {
           String stringData = new String(data.getData(), "UTF-8");
-          String[] str = stringData.split(" ", 2);
-          short serverId = new Short(str[0]);
+
+          if (debugEnabled())
+            TRACER.debugInfo(
+                "In " + this.replicationServer.getMonitorInstanceName() +
+                " Read tag baseDn generationId=" + stringData);
+
+          String[] str = stringData.split(FIELD_SEPARATOR, 3);
+          if (str[0].equals(GENERATION_ID_TAG))
+          {
+            long generationId=-1;
+
+            DN baseDn;
+
+            try
+            {
+              // <generationId>
+              generationId = new Long(str[1]);
+            }
+            catch (NumberFormatException e)
+            {
+              // should never happen
+              // TODO: i18n
+              throw new ReplicationDBException(Message.raw(
+                  "replicationServer state database has a wrong format: " +
+                  e.getLocalizedMessage()
+                  + "<" + str[1] + ">"));
+            }
+
+            // <baseDn>
+            baseDn = null;
+            try
+            {
+              baseDn = DN.decode(str[2]);
+            } catch (DirectoryException e)
+            {
+              Message message =
+                ERR_IGNORE_BAD_DN_IN_DATABASE_IDENTIFIER.get(str[1]);
+              logError(message);
+
+            }
+
+            if (debugEnabled())
+              TRACER.debugInfo(
+                "In " + this.replicationServer.getMonitorInstanceName() +
+                " Has read baseDn=" + baseDn
+                + " generationId=" + generationId);
+
+            replicationServer.getReplicationCache(baseDn, true).
+            setGenerationId(generationId, true);
+          }
+        }
+        catch (UnsupportedEncodingException e)
+        {
+          // should never happens
+          // TODO: i18n
+          throw new ReplicationDBException(Message.raw("need UTF-8 support"));
+        }
+        status = cursor.getNext(key, data, LockMode.DEFAULT);
+      }
+
+      /*
+       * Get the server Id / domain base DN records
+       */
+      status = cursor.getFirst(key, data, LockMode.DEFAULT);
+      while (status == OperationStatus.SUCCESS)
+      {
+        String stringData = null;
+        try
+        {
+          stringData = new String(data.getData(), "UTF-8");
+        }
+        catch (UnsupportedEncodingException e)
+        {
+          // should never happens
+          // TODO: i18n
+          throw new ReplicationDBException(Message.raw(
+          "need UTF-8 support"));
+        }
+
+        if (debugEnabled())
+          TRACER.debugInfo(
+            "In " + this.replicationServer.getMonitorInstanceName() +
+            " Read serverId BaseDN=" + stringData);
+
+        String[] str = stringData.split(FIELD_SEPARATOR, 2);
+        if (!str[0].equals(GENERATION_ID_TAG))
+        {
+          short serverId = -1;
+          try
+          {
+            // <serverId>
+            serverId = new Short(str[0]);
+          } catch (NumberFormatException e)
+          {
+            // should never happen
+            // TODO: i18n
+            throw new ReplicationDBException(Message.raw(
+                "replicationServer state database has a wrong format: " +
+                e.getLocalizedMessage()
+                + "<" + str[0] + ">"));
+          }
+          // <baseDn>
           DN baseDn = null;
           try
           {
@@ -134,113 +246,302 @@
           } catch (DirectoryException e)
           {
             Message message =
-                ERR_IGNORE_BAD_DN_IN_DATABASE_IDENTIFIER.get(str[1]);
+              ERR_IGNORE_BAD_DN_IN_DATABASE_IDENTIFIER.get(str[1]);
             logError(message);
           }
+
+          if (debugEnabled())
+            TRACER.debugInfo(
+              "In " + this.replicationServer.getMonitorInstanceName() +
+              " Has read: baseDn=" + baseDn
+              + " serverId=" + serverId);
+
           DbHandler dbHandler =
-            new DbHandler(serverId, baseDn, replicationServer, this);
-          replicationServer.getReplicationCache(baseDn).newDb(serverId,
-                                                            dbHandler);
-        } catch (NumberFormatException e)
-        {
-          // should never happen
-          // TODO: i18n
-          throw new ReplicationDBException(Message.raw(
-              "replicationServer state database has a wrong format"));
-        } catch (UnsupportedEncodingException e)
-        {
-          // should never happens
-          // TODO: i18n
-          throw new ReplicationDBException(Message.raw(
-                  "need UTF-8 support"));
+            new DbHandler(serverId, baseDn, replicationServer, this, 1);
+
+          replicationServer.getReplicationCache(baseDn, true).
+          setDbHandler(serverId, dbHandler);
         }
+
         status = cursor.getNext(key, data, LockMode.DEFAULT);
       }
       cursor.close();
 
-    } catch (DatabaseException dbe) {
+    }
+    catch (DatabaseException dbe)
+    {
       cursor.close();
       throw dbe;
     }
   }
 
-  /**
-   * Find or create the database used to store changes from the server
-   * with the given serverId and the given baseDn.
-   * @param serverId The server id that identifies the server.
-   * @param baseDn The baseDn that identifies the server.
-   * @return the Database.
-   * @throws DatabaseException in case of underlying Exception.
-   */
-  public Database getOrAddDb(Short serverId, DN baseDn)
-                  throws DatabaseException
-  {
-    try
+    /**
+     * Finds or creates the database used to store changes from the server
+     * with the given serverId and the given baseDn.
+     *
+     * @param serverId     The server id that identifies the server.
+     * @param baseDn       The baseDn that identifies the domain.
+     * @param generationId The generationId associated to this domain.
+     * @return the Database.
+     * @throws DatabaseException in case of underlying Exception.
+     */
+    public Database getOrAddDb(Short serverId, DN baseDn, Long generationId)
+    throws DatabaseException
     {
-      String stringId = serverId.toString() + " " + baseDn.toNormalizedString();
-      byte[] byteId;
-
-      byteId = stringId.getBytes("UTF-8");
-
-      // Open the database. Create it if it does not already exist.
-      DatabaseConfig dbConfig = new DatabaseConfig();
-      dbConfig.setAllowCreate(true);
-      dbConfig.setTransactional(true);
-      Database db = dbEnvironment.openDatabase(null, stringId, dbConfig);
-
-      DatabaseEntry key = new DatabaseEntry();
-      key.setData(byteId);
-      DatabaseEntry data = new DatabaseEntry();
-      OperationStatus status = stateDb.get(null, key, data, LockMode.DEFAULT);
-      if (status == OperationStatus.NOTFOUND)
+      if (debugEnabled())
+        TRACER.debugInfo("ReplicationDbEnv.getOrAddDb() " +
+          serverId + " " + baseDn + " " + generationId);
+      try
       {
-        Transaction txn = dbEnvironment.beginTransaction(null, null);
-        try {
-          data.setData(byteId);
-          stateDb.put(txn, key, data);
-          txn.commitWriteNoSync();
-        } catch (DatabaseException dbe)
+        String stringId = serverId.toString() + FIELD_SEPARATOR
+                          + baseDn.toNormalizedString();
+
+        // Opens the database for the changes received from this server
+        // on this domain. Create it if it does not already exist.
+        DatabaseConfig dbConfig = new DatabaseConfig();
+        dbConfig.setAllowCreate(true);
+        dbConfig.setTransactional(true);
+        Database db = dbEnvironment.openDatabase(null, stringId, dbConfig);
+
+        // Creates the record serverId/domain base Dn in the stateDb
+        // if it does not already exist.
+        byte[] byteId;
+        byteId = stringId.getBytes("UTF-8");
+        DatabaseEntry key = new DatabaseEntry();
+        key.setData(byteId);
+        DatabaseEntry data = new DatabaseEntry();
+        OperationStatus status = stateDb.get(null, key, data, LockMode.DEFAULT);
+        if (status == OperationStatus.NOTFOUND)
         {
-          // Abort the txn and propagate the Exception to the caller
-          txn.abort();
-          throw dbe;
+          Transaction txn = dbEnvironment.beginTransaction(null, null);
+          try {
+            data.setData(byteId);
+            if (debugEnabled())
+              TRACER.debugInfo("getOrAddDb() Created in the state Db record " +
+                " serverId/Domain=<"+stringId+">");
+            stateDb.put(txn, key, data);
+            txn.commitWriteNoSync();
+          } catch (DatabaseException dbe)
+          {
+            // Abort the txn and propagate the Exception to the caller
+            txn.abort();
+            throw dbe;
+          }
+        }
+
+        // Creates the record domain base Dn/ generationId in the stateDb
+        // if it does not already exist.
+        stringId = GENERATION_ID_TAG + FIELD_SEPARATOR +
+          baseDn.toNormalizedString();
+        String dataStringId = GENERATION_ID_TAG + FIELD_SEPARATOR +
+        generationId.toString() + FIELD_SEPARATOR +
+          baseDn.toNormalizedString();
+        byteId = stringId.getBytes("UTF-8");
+        byte[] dataByteId;
+        dataByteId = dataStringId.getBytes("UTF-8");
+        key = new DatabaseEntry();
+        key.setData(byteId);
+        data = new DatabaseEntry();
+        status = stateDb.get(null, key, data, LockMode.DEFAULT);
+        if (status == OperationStatus.NOTFOUND)
+        {
+          Transaction txn = dbEnvironment.beginTransaction(null, null);
+          try {
+            data.setData(dataByteId);
+            if (debugEnabled())
+              TRACER.debugInfo(
+                  "Created in the state Db record Tag/Domain/GenId key=" +
+                  stringId + " value=" + dataStringId);
+            stateDb.put(txn, key, data);
+            txn.commitWriteNoSync();
+          } catch (DatabaseException dbe)
+          {
+            // Abort the txn and propagate the Exception to the caller
+            txn.abort();
+            throw dbe;
+          }
+        }
+        return db;
+      }
+      catch (UnsupportedEncodingException e)
+      {
+        // can't happen
+        return null;
+      }
+    }
+
+    /**
+     * Creates a new transaction.
+     *
+     * @return the transaction.
+     * @throws DatabaseException in case of underlying database Exception.
+     */
+    public Transaction beginTransaction() throws DatabaseException
+    {
+      return dbEnvironment.beginTransaction(null, null);
+    }
+
+    /**
+     * Shutdown the Db environment.
+     */
+    public void shutdown()
+    {
+      try
+      {
+        stateDb.close();
+        dbEnvironment.close();
+      } catch (DatabaseException e)
+      {
+        MessageBuilder mb = new MessageBuilder();
+        mb.append(ERR_ERROR_CLOSING_CHANGELOG_ENV.get());
+        mb.append(stackTraceToSingleLineString(e));
+        logError(mb.toMessage());
+      }
+    }
+
+    /**
+     * Clears the provided generationId associated to the provided baseDn
+     * from the state Db.
+     *
+     * @param baseDn The baseDn for which the generationID must be cleared.
+     *
+     */
+    public void clearGenerationId(DN baseDn)
+    {
+      if (debugEnabled())
+        TRACER.debugInfo(
+            "In " + this.replicationServer.getMonitorInstanceName() +
+          " clearGenerationId " + baseDn);
+      try
+      {
+        // Deletes the record domain base Dn/ generationId in the stateDb
+        String stringId = GENERATION_ID_TAG + FIELD_SEPARATOR +
+          baseDn.toNormalizedString();
+        byte[] byteId = stringId.getBytes("UTF-8");
+        DatabaseEntry key = new DatabaseEntry();
+        key.setData(byteId);
+        DatabaseEntry data = new DatabaseEntry();
+        OperationStatus status = stateDb.get(null, key, data, LockMode.DEFAULT);
+        if ((status == OperationStatus.SUCCESS) ||
+            (status == OperationStatus.KEYEXIST))
+        {
+          Transaction txn = dbEnvironment.beginTransaction(null, null);
+          try
+          {
+            stateDb.delete(txn, key);
+            txn.commitWriteNoSync();
+            if (debugEnabled())
+              TRACER.debugInfo(
+                "In " + this.replicationServer.getMonitorInstanceName() +
+                " clearGenerationId (" +
+                baseDn +") succeeded.");
+          }
+          catch (DatabaseException dbe)
+          {
+            // Abort the txn and propagate the Exception to the caller
+            txn.abort();
+            throw dbe;
+          }
+        }
+        else
+        {
+          // TODO : should have a better error logging
+          if (debugEnabled())
+            TRACER.debugInfo(
+              "In " + this.replicationServer.getMonitorInstanceName() +
+              " clearGenerationId ("+ baseDn + " failed" + status.toString());
         }
       }
-      return db;
-    } catch (UnsupportedEncodingException e)
-    {
-      // can't happen
-      return null;
+      catch (UnsupportedEncodingException e)
+      {
+        // can't happen
+      }
+      catch (DatabaseException dbe)
+      {
+        // can't happen
+      }
     }
-  }
 
-  /**
-   * Creates a new transaction.
-   *
-   * @return the transaction.
-   * @throws DatabaseException in case of underlying database Exception.
-   */
-  public Transaction beginTransaction() throws DatabaseException
-  {
-    return dbEnvironment.beginTransaction(null, null);
-  }
+    /**
+     * Clears the provided serverId associated to the provided baseDn
+     * from the state Db.
+     *
+     * @param baseDn The baseDn for which the generationID must be cleared.
+     * @param serverId The serverId to remove from the Db.
+     *
+     */
+    public void clearServerId(DN baseDn, Short serverId)
+    {
+      if (debugEnabled())
+        TRACER.debugInfo(
+            "In " + this.replicationServer.getMonitorInstanceName() +
+            "clearServerId(baseDN=" + baseDn + ", serverId=" + serverId);
+      try
+      {
+        String stringId = serverId.toString() + FIELD_SEPARATOR
+                         + baseDn.toNormalizedString();
 
-  /**
-   * Shutdown the Db environment.
-   */
-  public void shutdown()
-  {
-    try
-    {
-      stateDb.close();
-      dbEnvironment.close();
-    } catch (DatabaseException e)
-    {
-      MessageBuilder mb = new MessageBuilder();
-      mb.append(ERR_ERROR_CLOSING_CHANGELOG_ENV.get());
-      mb.append(stackTraceToSingleLineString(e));
-      logError(mb.toMessage());
+        // Deletes the record serverId/domain base Dn in the stateDb
+        byte[] byteId;
+        byteId = stringId.getBytes("UTF-8");
+        DatabaseEntry key = new DatabaseEntry();
+        key.setData(byteId);
+        DatabaseEntry data = new DatabaseEntry();
+        OperationStatus status = stateDb.get(null, key, data, LockMode.DEFAULT);
+        if (status != OperationStatus.NOTFOUND)
+        {
+          Transaction txn = dbEnvironment.beginTransaction(null, null);
+          try {
+            data.setData(byteId);
+            stateDb.delete(txn, key);
+            txn.commitWriteNoSync();
+            if (debugEnabled())
+              TRACER.debugInfo(
+                  " In " + this.replicationServer.getMonitorInstanceName() +
+                  " clearServerId() succeeded " + baseDn + " " +
+                  serverId);
+          }
+          catch (DatabaseException dbe)
+          {
+            // Abort the txn and propagate the Exception to the caller
+            txn.abort();
+            throw dbe;
+          }
+        }
+      }
+      catch (UnsupportedEncodingException e)
+      {
+        // can't happen
+      }
+      catch (DatabaseException dbe)
+      {
+        // can't happen
+      }
     }
-  }
 
+    /**
+     * Clears the database.
+     *
+     * @param databaseName The name of the database to clear.
+     */
+    public final void clearDb(String databaseName)
+    {
+      try
+      {
+        if (debugEnabled())
+          TRACER.debugInfo(
+              "In " + this.replicationServer.getMonitorInstanceName() +
+              "clearDb" + databaseName);
+
+        Transaction txn = dbEnvironment.beginTransaction(null, null);
+        dbEnvironment.truncateDatabase(txn, databaseName, false);
+      }
+      catch (DatabaseException dbe)
+      {
+        MessageBuilder mb = new MessageBuilder();
+        mb.append(ERR_ERROR_CLEARING_DB.get(databaseName,
+            dbe.getLocalizedMessage()));
+        logError(mb.toMessage());
+      }
+    }
 }

--
Gitblit v1.10.0