From b48ce50fdf4d73e8be3799e3a7c6c2bf9d1b2965 Mon Sep 17 00:00:00 2001
From: pgamba <pgamba@localhost>
Date: Sun, 02 Sep 2007 17:58:07 +0000
Subject: [PATCH] fix for #1733 & #845 - Initialization of replication

---
 opends/src/server/org/opends/server/replication/server/ReplicationServer.java |  139 +++++++++++++++++++++++++++++++++++++++++-----
 1 files changed, 124 insertions(+), 15 deletions(-)

diff --git a/opends/src/server/org/opends/server/replication/server/ReplicationServer.java b/opends/src/server/org/opends/server/replication/server/ReplicationServer.java
index 2ae06c4..414dd88 100644
--- a/opends/src/server/org/opends/server/replication/server/ReplicationServer.java
+++ b/opends/src/server/org/opends/server/replication/server/ReplicationServer.java
@@ -25,13 +25,13 @@
  *      Portions Copyright 2006-2007 Sun Microsystems, Inc.
  */
 package org.opends.server.replication.server;
-import org.opends.messages.Message;
-
+import org.opends.messages.*;
 import static org.opends.server.loggers.ErrorLogger.logError;
 import static org.opends.messages.ReplicationMessages.*;
 
 import org.opends.messages.MessageBuilder;
 import static org.opends.server.util.StaticUtils.getFileForPath;
+import static org.opends.server.util.StaticUtils.stackTraceToSingleLineString;
 
 import java.io.File;
 import java.io.IOException;
@@ -61,6 +61,8 @@
 import org.opends.server.types.ConfigChangeResult;
 import org.opends.server.types.DN;
 import org.opends.server.types.ResultCode;
+import static org.opends.server.loggers.debug.DebugLogger.*;
+import org.opends.server.loggers.debug.DebugTracer;
 
 import com.sleepycat.je.DatabaseException;
 
@@ -101,12 +103,17 @@
   private int queueSize;
   private String dbDirname = null;
   private long trimAge; // the time (in sec) after which the  changes must
+                        // be deleted from the persistent storage.
   private int replicationPort;
-                        // de deleted from the persistent storage.
   private boolean stopListen = false;
   private ReplSessionSecurity replSessionSecurity;
 
   /**
+   * The tracer object for the debug logger.
+   */
+  private static final DebugTracer TRACER = getTracer();
+
+  /**
    * Creates a new Replication server using the provided configuration entry.
    *
    * @param configuration The configuration of this replication server.
@@ -191,7 +198,7 @@
         // The socket has probably been closed as part of the
         // shutdown or changing the port number process.
         // just log debug information and loop.
-        Message message = DEBUG_REPLICATION_PORT_IOEXCEPTION.get();
+        Message message = ERR_EXCEPTION_LISTENING.get(e.getLocalizedMessage());
         logError(message);
       }
     }
@@ -272,6 +279,10 @@
     String hostname = serverURL.substring(0, separator);
     boolean sslEncryption = replSessionSecurity.isSslEncryption(serverURL);
 
+    if (debugEnabled())
+      TRACER.debugInfo("RS " + this.getMonitorInstanceName() +
+               " connects to " + serverURL);
+
     try
     {
       InetSocketAddress ServerAddr = new InetSocketAddress(
@@ -329,22 +340,41 @@
       listenSocket.bind(new InetSocketAddress(changelogPort));
 
       /*
-       * create working threads
+       * creates working threads
+       * We must first connect, then start to listen.
        */
-      listenThread =
-        new ReplicationServerListenThread("Replication Server Listener", this);
-      listenThread.start();
+      if (debugEnabled())
+        TRACER.debugInfo("RS " +getMonitorInstanceName()+
+            " creates connect threads");
       connectThread =
         new ReplicationServerConnectThread("Replication Server Connect", this);
       connectThread.start();
 
+      // FIXME : Is it better to have the time to receive the ReplServerInfo
+      // from all the other replication servers since this info is necessary
+      // to route an early received total update request.
+
+      if (debugEnabled())
+        TRACER.debugInfo("RS " +getMonitorInstanceName()+
+            " creates listen threads");
+
+      listenThread =
+        new ReplicationServerListenThread("Replication Server Listener", this);
+      listenThread.start();
+
+      if (debugEnabled())
+        TRACER.debugInfo("RS " +getMonitorInstanceName()+
+            " successfully initialized");
+
     } catch (DatabaseException e)
     {
-      Message message = ERR_COULD_NOT_INITIALIZE_DB.get(dbDirname);
+      Message message = ERR_COULD_NOT_INITIALIZE_DB.get(
+        getFileForPath(dbDirname).getAbsolutePath());
       logError(message);
     } catch (ReplicationDBException e)
     {
-      Message message = ERR_COULD_NOT_READ_DB.get(dbDirname);
+      Message message = ERR_COULD_NOT_READ_DB.get(dbDirname,
+          e.getLocalizedMessage());
       logError(message);
     } catch (UnknownHostException e)
     {
@@ -362,18 +392,22 @@
    * Get the ReplicationCache associated to the base DN given in parameter.
    *
    * @param baseDn The base Dn for which the ReplicationCache must be returned.
+   * @param create Specifies whether to create the ReplicationCache if it does
+   *        not already exist.
    * @return The ReplicationCache associated to the base DN given in parameter.
    */
-  public ReplicationCache getReplicationCache(DN baseDn)
+  public ReplicationCache getReplicationCache(DN baseDn, boolean create)
   {
     ReplicationCache replicationCache;
 
     synchronized (baseDNs)
     {
       replicationCache = baseDNs.get(baseDn);
-      if (replicationCache == null)
+      if ((replicationCache == null) && (create))
+      {
         replicationCache = new ReplicationCache(baseDn, this);
-      baseDNs.put(baseDn, replicationCache);
+        baseDNs.put(baseDn, replicationCache);
+      }
     }
 
     return replicationCache;
@@ -384,6 +418,9 @@
    */
   public void shutdown()
   {
+    if (shutdown)
+      return;
+
     shutdown = true;
 
     // shutdown the connect thread
@@ -404,6 +441,12 @@
       // replication Server service is closing anyway.
     }
 
+    // shutdown the listen thread
+    if (listenThread != null)
+    {
+      listenThread.interrupt();
+    }
+
     // shutdown all the ChangelogCaches
     for (ReplicationCache replicationCache : baseDNs.values())
     {
@@ -424,13 +467,36 @@
    *
    * @param id The serverId for which the dbHandler must be created.
    * @param baseDn The DN for which the dbHandler muste be created.
+   * @param generationId The generationId for this server and this domain.
    * @return The new DB handler for this ReplicationServer and the serverId and
    *         DN given in parameter.
    * @throws DatabaseException in case of underlying database problem.
    */
-  DbHandler newDbHandler(short id, DN baseDn) throws DatabaseException
+  public DbHandler newDbHandler(short id, DN baseDn, long generationId)
+  throws DatabaseException
   {
-    return new DbHandler(id, baseDn, this, dbEnv);
+    return new DbHandler(id, baseDn, this, dbEnv, generationId);
+  }
+
+  /**
+   * Clears the generationId for the domain related to the provided baseDn.
+   * @param  baseDn The baseDn for which to delete the generationId.
+   * @throws DatabaseException When it occurs.
+   */
+  public void clearGenerationId(DN baseDn)
+  throws DatabaseException
+  {
+    try
+    {
+      dbEnv.clearGenerationId(baseDn);
+    }
+    catch(Exception e)
+    {
+      TRACER.debugInfo(
+          "In RS <" + getMonitorInstanceName() +
+          " Exception in clearGenerationId" +
+          stackTraceToSingleLineString(e) + e.getLocalizedMessage());
+    }
   }
 
   /**
@@ -618,7 +684,50 @@
     Attribute bases = new Attribute(baseType, "base-dn", baseValues);
     attributes.add(bases);
 
+    // Publish to monitor the generation ID by domain
+    AttributeType generationIdType=
+      DirectoryServer.getAttributeType("base-dn-generation-id", true);
+    LinkedHashSet<AttributeValue> generationIdValues =
+      new LinkedHashSet<AttributeValue>();
+    for (DN base : baseDNs.keySet())
+    {
+      long generationId=-1;
+      ReplicationCache cache = getReplicationCache(base, false);
+      if (cache != null)
+        generationId = cache.getGenerationId();
+      generationIdValues.add(new AttributeValue(generationIdType,
+          base.toString() + " " + generationId));
+    }
+    Attribute generationIds = new Attribute(generationIdType, "generation-id",
+        generationIdValues);
+    attributes.add(generationIds);
+
     return attributes;
   }
 
+  /**
+   * Get the value of generationId for the replication domain
+   * associated with the provided baseDN.
+   *
+   * @param baseDN The baseDN of the domain.
+   * @return The value of the generationID.
+   */
+  public long getGenerationId(DN baseDN)
+  {
+    ReplicationCache rc = this.getReplicationCache(baseDN, false);
+    if (rc!=null)
+      return rc.getGenerationId();
+    return -1;
+  }
+
+  /**
+   * Get the serverId for this replication server.
+   *
+   * @return The value of the serverId.
+   *
+   */
+  public short getServerId()
+  {
+    return serverId;
+  }
 }

--
Gitblit v1.10.0