From 3a5116b0fb968711531c50d243d94d2cdfd664be Mon Sep 17 00:00:00 2001
From: ludovicp <ludovicp@localhost>
Date: Tue, 15 Jun 2010 14:43:34 +0000
Subject: [PATCH] Enhance replication conflict attributes comparator, enable/disable replication domain, and some error message for diagnostic

---
 opends/src/server/org/opends/server/replication/service/ReplicationDomain.java |  188 ++++++++++++++++++++++++----------------------
 1 files changed, 99 insertions(+), 89 deletions(-)

diff --git a/opends/src/server/org/opends/server/replication/service/ReplicationDomain.java b/opends/src/server/org/opends/server/replication/service/ReplicationDomain.java
index 77a8d87..23dff58 100644
--- a/opends/src/server/org/opends/server/replication/service/ReplicationDomain.java
+++ b/opends/src/server/org/opends/server/replication/service/ReplicationDomain.java
@@ -32,40 +32,19 @@
 import static org.opends.server.loggers.debug.DebugLogger.getTracer;
 import static org.opends.server.replication.common.StatusMachine.*;
 
-import org.opends.server.replication.common.ChangeNumberGenerator;
-
 import java.io.BufferedOutputStream;
-
-import org.opends.server.tasks.InitializeTargetTask;
-import org.opends.server.tasks.InitializeTask;
-import org.opends.server.types.Attribute;
-
-import org.opends.server.core.DirectoryServer;
-
-
-import java.util.Set;
-
-import org.opends.server.replication.common.DSInfo;
-import org.opends.server.replication.common.RSInfo;
-
-import java.util.HashMap;
-
-import java.util.Map;
-
-import org.opends.server.config.ConfigException;
-import java.util.Collection;
-
-import org.opends.server.replication.protocol.ReplSessionSecurity;
-import org.opends.server.replication.protocol.ResetGenerationIdMsg;
-
 import java.io.IOException;
 import java.io.InputStream;
 import java.io.OutputStream;
 import java.net.SocketTimeoutException;
 import java.util.ArrayList;
+import java.util.Collection;
 import java.util.Date;
+import java.util.HashMap;
 import java.util.HashSet;
 import java.util.List;
+import java.util.Map;
+import java.util.Set;
 import java.util.SortedMap;
 import java.util.TreeMap;
 import java.util.concurrent.ConcurrentHashMap;
@@ -77,9 +56,14 @@
 import org.opends.messages.Severity;
 import org.opends.server.api.DirectoryThread;
 import org.opends.server.backends.task.Task;
+import org.opends.server.config.ConfigException;
+import org.opends.server.core.DirectoryServer;
 import org.opends.server.loggers.debug.DebugTracer;
 import org.opends.server.replication.common.AssuredMode;
 import org.opends.server.replication.common.ChangeNumber;
+import org.opends.server.replication.common.ChangeNumberGenerator;
+import org.opends.server.replication.common.DSInfo;
+import org.opends.server.replication.common.RSInfo;
 import org.opends.server.replication.common.ServerState;
 import org.opends.server.replication.common.ServerStatus;
 import org.opends.server.replication.common.StatusMachine;
@@ -90,15 +74,20 @@
 import org.opends.server.replication.protocol.EntryMsg;
 import org.opends.server.replication.protocol.ErrorMsg;
 import org.opends.server.replication.protocol.HeartbeatMsg;
+import org.opends.server.replication.protocol.InitializeRcvAckMsg;
 import org.opends.server.replication.protocol.InitializeRequestMsg;
 import org.opends.server.replication.protocol.InitializeTargetMsg;
-import org.opends.server.replication.protocol.InitializeRcvAckMsg;
 import org.opends.server.replication.protocol.ProtocolSession;
 import org.opends.server.replication.protocol.ProtocolVersion;
+import org.opends.server.replication.protocol.ReplSessionSecurity;
 import org.opends.server.replication.protocol.ReplicationMsg;
+import org.opends.server.replication.protocol.ResetGenerationIdMsg;
 import org.opends.server.replication.protocol.RoutableMsg;
-import org.opends.server.replication.protocol.UpdateMsg;
 import org.opends.server.replication.protocol.TopologyMsg;
+import org.opends.server.replication.protocol.UpdateMsg;
+import org.opends.server.tasks.InitializeTargetTask;
+import org.opends.server.tasks.InitializeTask;
+import org.opends.server.types.Attribute;
 import org.opends.server.types.DirectoryException;
 import org.opends.server.types.ResultCode;
 
@@ -229,7 +218,7 @@
   private byte groupId = (byte)1;
   // Referrals urls to be published to other servers of the topology
   // TODO: fill that with all currently opened urls if no urls configured
-  private List<String> refUrls = new ArrayList<String>();
+  private final List<String> refUrls = new ArrayList<String>();
 
   /**
    * A set of counters used for Monitoring.
@@ -320,6 +309,12 @@
   Set<String> crossServersECLIncludes = new HashSet<String>();
 
   /**
+   * An object used to protect the initialization of the underlying broker
+   * session of this ReplicationDomain.
+   */
+  private final Object sessionLock = new Object();
+
+  /**
    * Returns the {@link ChangeNumberGenerator} that will be used to
    * generate {@link ChangeNumber} for this domain.
    *
@@ -1065,8 +1060,8 @@
   private class ExportThread extends DirectoryThread
   {
     // Id of server that will be initialized
-    private int serverToInitialize;
-    private int initWindow;
+    private final int serverToInitialize;
+    private final int initWindow;
 
     /**
      * Constructor for the ExportThread.
@@ -1153,7 +1148,8 @@
 
     // Flow control during initialization
     // - for each remote server, counter of messages received
-    private HashMap<Integer, Integer> ackVals = new HashMap<Integer, Integer>();
+    private final HashMap<Integer, Integer> ackVals =
+      new HashMap<Integer, Integer>();
     // - serverId of the slowest server (the one with the smallest non null
     //   counter)
     private int slowestServerId = -1;
@@ -3016,27 +3012,30 @@
       long heartbeatInterval, long changetimeHeartbeatInterval)
   throws ConfigException
   {
-    if (broker == null)
+    synchronized (sessionLock)
     {
-      /*
-       * create the broker object used to publish and receive changes
-       */
-      broker = new ReplicationBroker(
-          this, state, serviceID,
-          serverID, window,
-          getGenerationID(),
-          heartbeatInterval,
-          new ReplSessionSecurity(),
-          getGroupId(),
-          changetimeHeartbeatInterval);
+      if (broker == null)
+      {
+        /*
+         * create the broker object used to publish and receive changes
+         */
+        broker = new ReplicationBroker(
+            this, state, serviceID,
+            serverID, window,
+            getGenerationID(),
+            heartbeatInterval,
+            new ReplSessionSecurity(),
+            getGroupId(),
+            changetimeHeartbeatInterval);
 
-      broker.start(replicationServers);
+        broker.start(replicationServers);
 
-      /*
-       * Create a replication monitor object responsible for publishing
-       * monitoring information below cn=monitor.
-       */
-      monitor = new ReplicationMonitor(this);
+        /*
+         * Create a replication monitor object responsible for publishing
+         * monitoring information below cn=monitor.
+         */
+        monitor = new ReplicationMonitor(this);
+      }
 
       DirectoryServer.registerMonitorProvider(monitor);
     }
@@ -3060,29 +3059,32 @@
       long heartbeatInterval)
   throws ConfigException
   {
-    if (broker == null)
+    synchronized (sessionLock)
     {
-      /*
-       * create the broker object used to publish and receive changes
-       */
-      broker = new ReplicationBroker(
-          this, state, serviceID,
-          serverID, window,
-          getGenerationID(),
-          heartbeatInterval,
-          new ReplSessionSecurity(),
-          getGroupId(),
-          0); // change time heartbeat is disabled
+      if (broker == null)
+      {
+        /*
+         * create the broker object used to publish and receive changes
+         */
+        broker = new ReplicationBroker(
+            this, state, serviceID,
+            serverID, window,
+            getGenerationID(),
+            heartbeatInterval,
+            new ReplSessionSecurity(),
+            getGroupId(),
+            0); // change time heartbeat is disabled
 
-      broker.start(replicationServers);
+        broker.start(replicationServers);
 
-      /*
-       * Create a replication monitor object responsible for publishing
-       * monitoring information below cn=monitor.
-       */
-      monitor = new ReplicationMonitor(this);
+        /*
+         * Create a replication monitor object responsible for publishing
+         * monitoring information below cn=monitor.
+         */
+        monitor = new ReplicationMonitor(this);
 
-      DirectoryServer.registerMonitorProvider(monitor);
+        DirectoryServer.registerMonitorProvider(monitor);
+      }
     }
   }
 
@@ -3098,10 +3100,13 @@
    */
   public void startListenService()
   {
-    //
-    // Create the listener thread
-    listenerThread = new ListenerThread(this);
-    listenerThread.start();
+    synchronized (sessionLock)
+    {
+      //
+      // Create the listener thread
+      listenerThread = new ListenerThread(this);
+      listenerThread.start();
+    }
   }
 
   /**
@@ -3116,21 +3121,23 @@
    */
   public void disableService()
   {
-    // Stop the listener thread
-    if (listenerThread != null)
+    synchronized (sessionLock)
     {
-      listenerThread.shutdown();
+      // Stop the listener thread
+      if (listenerThread != null)
+      {
+        listenerThread.shutdown();
+      }
+
+      if (broker != null)
+      {
+        broker.stop();
+      }
+
+      // Wait for the listener thread to stop
+      if (listenerThread != null)
+        listenerThread.waitForShutdown();
     }
-
-    if (broker != null)
-    {
-      broker.stop();
-    }
-
-    // Wait for the listener thread to stop
-    if (listenerThread != null)
-      listenerThread.waitForShutdown();
-
   }
 
   /**
@@ -3147,11 +3154,14 @@
    */
   public void enableService()
   {
-    broker.start();
+    synchronized (sessionLock)
+    {
+      broker.start();
 
-    // Create the listener thread
-    listenerThread = new ListenerThread(this);
-    listenerThread.start();
+      // Create the listener thread
+      listenerThread = new ListenerThread(this);
+      listenerThread.start();
+    }
   }
 
   /**

--
Gitblit v1.10.0