From 3a5116b0fb968711531c50d243d94d2cdfd664be Mon Sep 17 00:00:00 2001
From: ludovicp <ludovicp@localhost>
Date: Tue, 15 Jun 2010 14:43:34 +0000
Subject: [PATCH] Enhance replication conflict attributes comparator, enable/disable replication domain, and some error message for diagnostic
---
opends/src/server/org/opends/server/replication/service/ReplicationDomain.java | 188 ++++++++++++++++++++++++----------------------
1 files changed, 99 insertions(+), 89 deletions(-)
diff --git a/opends/src/server/org/opends/server/replication/service/ReplicationDomain.java b/opends/src/server/org/opends/server/replication/service/ReplicationDomain.java
index 77a8d87..23dff58 100644
--- a/opends/src/server/org/opends/server/replication/service/ReplicationDomain.java
+++ b/opends/src/server/org/opends/server/replication/service/ReplicationDomain.java
@@ -32,40 +32,19 @@
import static org.opends.server.loggers.debug.DebugLogger.getTracer;
import static org.opends.server.replication.common.StatusMachine.*;
-import org.opends.server.replication.common.ChangeNumberGenerator;
-
import java.io.BufferedOutputStream;
-
-import org.opends.server.tasks.InitializeTargetTask;
-import org.opends.server.tasks.InitializeTask;
-import org.opends.server.types.Attribute;
-
-import org.opends.server.core.DirectoryServer;
-
-
-import java.util.Set;
-
-import org.opends.server.replication.common.DSInfo;
-import org.opends.server.replication.common.RSInfo;
-
-import java.util.HashMap;
-
-import java.util.Map;
-
-import org.opends.server.config.ConfigException;
-import java.util.Collection;
-
-import org.opends.server.replication.protocol.ReplSessionSecurity;
-import org.opends.server.replication.protocol.ResetGenerationIdMsg;
-
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import java.net.SocketTimeoutException;
import java.util.ArrayList;
+import java.util.Collection;
import java.util.Date;
+import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
+import java.util.Map;
+import java.util.Set;
import java.util.SortedMap;
import java.util.TreeMap;
import java.util.concurrent.ConcurrentHashMap;
@@ -77,9 +56,14 @@
import org.opends.messages.Severity;
import org.opends.server.api.DirectoryThread;
import org.opends.server.backends.task.Task;
+import org.opends.server.config.ConfigException;
+import org.opends.server.core.DirectoryServer;
import org.opends.server.loggers.debug.DebugTracer;
import org.opends.server.replication.common.AssuredMode;
import org.opends.server.replication.common.ChangeNumber;
+import org.opends.server.replication.common.ChangeNumberGenerator;
+import org.opends.server.replication.common.DSInfo;
+import org.opends.server.replication.common.RSInfo;
import org.opends.server.replication.common.ServerState;
import org.opends.server.replication.common.ServerStatus;
import org.opends.server.replication.common.StatusMachine;
@@ -90,15 +74,20 @@
import org.opends.server.replication.protocol.EntryMsg;
import org.opends.server.replication.protocol.ErrorMsg;
import org.opends.server.replication.protocol.HeartbeatMsg;
+import org.opends.server.replication.protocol.InitializeRcvAckMsg;
import org.opends.server.replication.protocol.InitializeRequestMsg;
import org.opends.server.replication.protocol.InitializeTargetMsg;
-import org.opends.server.replication.protocol.InitializeRcvAckMsg;
import org.opends.server.replication.protocol.ProtocolSession;
import org.opends.server.replication.protocol.ProtocolVersion;
+import org.opends.server.replication.protocol.ReplSessionSecurity;
import org.opends.server.replication.protocol.ReplicationMsg;
+import org.opends.server.replication.protocol.ResetGenerationIdMsg;
import org.opends.server.replication.protocol.RoutableMsg;
-import org.opends.server.replication.protocol.UpdateMsg;
import org.opends.server.replication.protocol.TopologyMsg;
+import org.opends.server.replication.protocol.UpdateMsg;
+import org.opends.server.tasks.InitializeTargetTask;
+import org.opends.server.tasks.InitializeTask;
+import org.opends.server.types.Attribute;
import org.opends.server.types.DirectoryException;
import org.opends.server.types.ResultCode;
@@ -229,7 +218,7 @@
private byte groupId = (byte)1;
// Referrals urls to be published to other servers of the topology
// TODO: fill that with all currently opened urls if no urls configured
- private List<String> refUrls = new ArrayList<String>();
+ private final List<String> refUrls = new ArrayList<String>();
/**
* A set of counters used for Monitoring.
@@ -320,6 +309,12 @@
Set<String> crossServersECLIncludes = new HashSet<String>();
/**
+ * An object used to protect the initialization of the underlying broker
+ * session of this ReplicationDomain.
+ */
+ private final Object sessionLock = new Object();
+
+ /**
* Returns the {@link ChangeNumberGenerator} that will be used to
* generate {@link ChangeNumber} for this domain.
*
@@ -1065,8 +1060,8 @@
private class ExportThread extends DirectoryThread
{
// Id of server that will be initialized
- private int serverToInitialize;
- private int initWindow;
+ private final int serverToInitialize;
+ private final int initWindow;
/**
* Constructor for the ExportThread.
@@ -1153,7 +1148,8 @@
// Flow control during initialization
// - for each remote server, counter of messages received
- private HashMap<Integer, Integer> ackVals = new HashMap<Integer, Integer>();
+ private final HashMap<Integer, Integer> ackVals =
+ new HashMap<Integer, Integer>();
// - serverId of the slowest server (the one with the smallest non null
// counter)
private int slowestServerId = -1;
@@ -3016,27 +3012,30 @@
long heartbeatInterval, long changetimeHeartbeatInterval)
throws ConfigException
{
- if (broker == null)
+ synchronized (sessionLock)
{
- /*
- * create the broker object used to publish and receive changes
- */
- broker = new ReplicationBroker(
- this, state, serviceID,
- serverID, window,
- getGenerationID(),
- heartbeatInterval,
- new ReplSessionSecurity(),
- getGroupId(),
- changetimeHeartbeatInterval);
+ if (broker == null)
+ {
+ /*
+ * create the broker object used to publish and receive changes
+ */
+ broker = new ReplicationBroker(
+ this, state, serviceID,
+ serverID, window,
+ getGenerationID(),
+ heartbeatInterval,
+ new ReplSessionSecurity(),
+ getGroupId(),
+ changetimeHeartbeatInterval);
- broker.start(replicationServers);
+ broker.start(replicationServers);
- /*
- * Create a replication monitor object responsible for publishing
- * monitoring information below cn=monitor.
- */
- monitor = new ReplicationMonitor(this);
+ /*
+ * Create a replication monitor object responsible for publishing
+ * monitoring information below cn=monitor.
+ */
+ monitor = new ReplicationMonitor(this);
+ }
DirectoryServer.registerMonitorProvider(monitor);
}
@@ -3060,29 +3059,32 @@
long heartbeatInterval)
throws ConfigException
{
- if (broker == null)
+ synchronized (sessionLock)
{
- /*
- * create the broker object used to publish and receive changes
- */
- broker = new ReplicationBroker(
- this, state, serviceID,
- serverID, window,
- getGenerationID(),
- heartbeatInterval,
- new ReplSessionSecurity(),
- getGroupId(),
- 0); // change time heartbeat is disabled
+ if (broker == null)
+ {
+ /*
+ * create the broker object used to publish and receive changes
+ */
+ broker = new ReplicationBroker(
+ this, state, serviceID,
+ serverID, window,
+ getGenerationID(),
+ heartbeatInterval,
+ new ReplSessionSecurity(),
+ getGroupId(),
+ 0); // change time heartbeat is disabled
- broker.start(replicationServers);
+ broker.start(replicationServers);
- /*
- * Create a replication monitor object responsible for publishing
- * monitoring information below cn=monitor.
- */
- monitor = new ReplicationMonitor(this);
+ /*
+ * Create a replication monitor object responsible for publishing
+ * monitoring information below cn=monitor.
+ */
+ monitor = new ReplicationMonitor(this);
- DirectoryServer.registerMonitorProvider(monitor);
+ DirectoryServer.registerMonitorProvider(monitor);
+ }
}
}
@@ -3098,10 +3100,13 @@
*/
public void startListenService()
{
- //
- // Create the listener thread
- listenerThread = new ListenerThread(this);
- listenerThread.start();
+ synchronized (sessionLock)
+ {
+ //
+ // Create the listener thread
+ listenerThread = new ListenerThread(this);
+ listenerThread.start();
+ }
}
/**
@@ -3116,21 +3121,23 @@
*/
public void disableService()
{
- // Stop the listener thread
- if (listenerThread != null)
+ synchronized (sessionLock)
{
- listenerThread.shutdown();
+ // Stop the listener thread
+ if (listenerThread != null)
+ {
+ listenerThread.shutdown();
+ }
+
+ if (broker != null)
+ {
+ broker.stop();
+ }
+
+ // Wait for the listener thread to stop
+ if (listenerThread != null)
+ listenerThread.waitForShutdown();
}
-
- if (broker != null)
- {
- broker.stop();
- }
-
- // Wait for the listener thread to stop
- if (listenerThread != null)
- listenerThread.waitForShutdown();
-
}
/**
@@ -3147,11 +3154,14 @@
*/
public void enableService()
{
- broker.start();
+ synchronized (sessionLock)
+ {
+ broker.start();
- // Create the listener thread
- listenerThread = new ListenerThread(this);
- listenerThread.start();
+ // Create the listener thread
+ listenerThread = new ListenerThread(this);
+ listenerThread.start();
+ }
}
/**
--
Gitblit v1.10.0