From 1c9161da82b4128644d12f9df1812960ec48b395 Mon Sep 17 00:00:00 2001
From: ludovicp <ludovicp@localhost>
Date: Tue, 15 Jun 2010 14:43:34 +0000
Subject: [PATCH] Enhance replication conflict attributes comparator, enable/disable replication domain, and some error message for diagnostic
---
opendj-sdk/opends/src/server/org/opends/server/replication/service/ReplicationBroker.java | 3
opendj-sdk/opends/src/server/org/opends/server/replication/plugin/LDAPReplicationDomain.java | 17 +++-
opendj-sdk/opends/src/messages/messages/replication.properties | 2
opendj-sdk/opends/src/server/org/opends/server/replication/service/ReplicationDomain.java | 188 ++++++++++++++++++++++++----------------------
4 files changed, 115 insertions(+), 95 deletions(-)
diff --git a/opendj-sdk/opends/src/messages/messages/replication.properties b/opendj-sdk/opends/src/messages/messages/replication.properties
index 0189c92..c62dcf4 100644
--- a/opendj-sdk/opends/src/messages/messages/replication.properties
+++ b/opendj-sdk/opends/src/messages/messages/replication.properties
@@ -87,7 +87,7 @@
replication server that has seen all the local changes on suffix %s. Found %d \
replications server(s) not up to date. Going to replay changes
NOTICE_COULD_NOT_FIND_CHANGELOG_23=Could not connect to any replication \
- server on suffix %s, retrying...
+ server on suffix %s among the following RS candidates %s, retrying...
NOTICE_EXCEPTION_CLOSING_DATABASE_24=Error closing Replication Server database \
%s :
SEVERE_ERR_EXCEPTION_DECODING_OPERATION_25=Error trying to replay %s, \
diff --git a/opendj-sdk/opends/src/server/org/opends/server/replication/plugin/LDAPReplicationDomain.java b/opendj-sdk/opends/src/server/org/opends/server/replication/plugin/LDAPReplicationDomain.java
index 513e014..e406d54 100644
--- a/opendj-sdk/opends/src/server/org/opends/server/replication/plugin/LDAPReplicationDomain.java
+++ b/opendj-sdk/opends/src/server/org/opends/server/replication/plugin/LDAPReplicationDomain.java
@@ -2538,7 +2538,7 @@
AttributeType attrType =
DirectoryServer.getAttributeType(DS_SYNC_CONFLICT, true);
Attribute attr = Attributes.create(attrType, AttributeValues.create(
- attrType, targetDN.toString()));
+ attrType, targetDN.toNormalizedString()));
Modification mod = new Modification(ModificationType.REPLACE, attr);
newOp.addModification(mod);
}
@@ -3413,7 +3413,7 @@
AttributeType attrType = DirectoryServer.getAttributeType(DS_SYNC_CONFLICT,
true);
Attribute attr = Attributes.create(attrType, AttributeValues.create(
- attrType, conflictDN.toString()));
+ attrType, conflictDN.toNormalizedString()));
List<Modification> mods = new ArrayList<Modification>();
Modification mod = new Modification(ModificationType.REPLACE, attr);
mods.add(mod);
@@ -3458,14 +3458,23 @@
*/
private void addConflict(AddMsg msg) throws ASN1Exception
{
+ String normalizedDN;
+ try
+ {
+ normalizedDN = DN.decode(msg.getDn()).toNormalizedString();
+ } catch (DirectoryException e)
+ {
+ normalizedDN = msg.getDn();
+ }
+
// Generate an alert to let the administrator know that some
// conflict could not be solved.
- Message alertMessage = NOTE_UNRESOLVED_CONFLICT.get(msg.getDn());
+ Message alertMessage = NOTE_UNRESOLVED_CONFLICT.get(normalizedDN);
DirectoryServer.sendAlertNotification(this,
ALERT_TYPE_REPLICATION_UNRESOLVED_CONFLICT, alertMessage);
// Add the conflict attribute
- msg.addAttribute(DS_SYNC_CONFLICT, msg.getDn());
+ msg.addAttribute(DS_SYNC_CONFLICT, normalizedDN);
}
/**
diff --git a/opendj-sdk/opends/src/server/org/opends/server/replication/service/ReplicationBroker.java b/opendj-sdk/opends/src/server/org/opends/server/replication/service/ReplicationBroker.java
index 32a4d33..a4ca80e 100644
--- a/opendj-sdk/opends/src/server/org/opends/server/replication/service/ReplicationBroker.java
+++ b/opendj-sdk/opends/src/server/org/opends/server/replication/service/ReplicationBroker.java
@@ -865,6 +865,7 @@
// Get info from every available replication servers
replicationServerInfos = collectReplicationServersInfo();
+ String rsis = replicationServerInfos.toString();
ReplicationServerInfo replicationServerInfo = null;
@@ -1057,7 +1058,7 @@
connectionError = true;
connectPhaseLock.notify();
Message message =
- NOTE_COULD_NOT_FIND_CHANGELOG.get(baseDn.toString());
+ NOTE_COULD_NOT_FIND_CHANGELOG.get(baseDn.toString(), rsis);
logError(message);
}
}
diff --git a/opendj-sdk/opends/src/server/org/opends/server/replication/service/ReplicationDomain.java b/opendj-sdk/opends/src/server/org/opends/server/replication/service/ReplicationDomain.java
index 77a8d87..23dff58 100644
--- a/opendj-sdk/opends/src/server/org/opends/server/replication/service/ReplicationDomain.java
+++ b/opendj-sdk/opends/src/server/org/opends/server/replication/service/ReplicationDomain.java
@@ -32,40 +32,19 @@
import static org.opends.server.loggers.debug.DebugLogger.getTracer;
import static org.opends.server.replication.common.StatusMachine.*;
-import org.opends.server.replication.common.ChangeNumberGenerator;
-
import java.io.BufferedOutputStream;
-
-import org.opends.server.tasks.InitializeTargetTask;
-import org.opends.server.tasks.InitializeTask;
-import org.opends.server.types.Attribute;
-
-import org.opends.server.core.DirectoryServer;
-
-
-import java.util.Set;
-
-import org.opends.server.replication.common.DSInfo;
-import org.opends.server.replication.common.RSInfo;
-
-import java.util.HashMap;
-
-import java.util.Map;
-
-import org.opends.server.config.ConfigException;
-import java.util.Collection;
-
-import org.opends.server.replication.protocol.ReplSessionSecurity;
-import org.opends.server.replication.protocol.ResetGenerationIdMsg;
-
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import java.net.SocketTimeoutException;
import java.util.ArrayList;
+import java.util.Collection;
import java.util.Date;
+import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
+import java.util.Map;
+import java.util.Set;
import java.util.SortedMap;
import java.util.TreeMap;
import java.util.concurrent.ConcurrentHashMap;
@@ -77,9 +56,14 @@
import org.opends.messages.Severity;
import org.opends.server.api.DirectoryThread;
import org.opends.server.backends.task.Task;
+import org.opends.server.config.ConfigException;
+import org.opends.server.core.DirectoryServer;
import org.opends.server.loggers.debug.DebugTracer;
import org.opends.server.replication.common.AssuredMode;
import org.opends.server.replication.common.ChangeNumber;
+import org.opends.server.replication.common.ChangeNumberGenerator;
+import org.opends.server.replication.common.DSInfo;
+import org.opends.server.replication.common.RSInfo;
import org.opends.server.replication.common.ServerState;
import org.opends.server.replication.common.ServerStatus;
import org.opends.server.replication.common.StatusMachine;
@@ -90,15 +74,20 @@
import org.opends.server.replication.protocol.EntryMsg;
import org.opends.server.replication.protocol.ErrorMsg;
import org.opends.server.replication.protocol.HeartbeatMsg;
+import org.opends.server.replication.protocol.InitializeRcvAckMsg;
import org.opends.server.replication.protocol.InitializeRequestMsg;
import org.opends.server.replication.protocol.InitializeTargetMsg;
-import org.opends.server.replication.protocol.InitializeRcvAckMsg;
import org.opends.server.replication.protocol.ProtocolSession;
import org.opends.server.replication.protocol.ProtocolVersion;
+import org.opends.server.replication.protocol.ReplSessionSecurity;
import org.opends.server.replication.protocol.ReplicationMsg;
+import org.opends.server.replication.protocol.ResetGenerationIdMsg;
import org.opends.server.replication.protocol.RoutableMsg;
-import org.opends.server.replication.protocol.UpdateMsg;
import org.opends.server.replication.protocol.TopologyMsg;
+import org.opends.server.replication.protocol.UpdateMsg;
+import org.opends.server.tasks.InitializeTargetTask;
+import org.opends.server.tasks.InitializeTask;
+import org.opends.server.types.Attribute;
import org.opends.server.types.DirectoryException;
import org.opends.server.types.ResultCode;
@@ -229,7 +218,7 @@
private byte groupId = (byte)1;
// Referrals urls to be published to other servers of the topology
// TODO: fill that with all currently opened urls if no urls configured
- private List<String> refUrls = new ArrayList<String>();
+ private final List<String> refUrls = new ArrayList<String>();
/**
* A set of counters used for Monitoring.
@@ -320,6 +309,12 @@
Set<String> crossServersECLIncludes = new HashSet<String>();
/**
+ * An object used to protect the initialization of the underlying broker
+ * session of this ReplicationDomain.
+ */
+ private final Object sessionLock = new Object();
+
+ /**
* Returns the {@link ChangeNumberGenerator} that will be used to
* generate {@link ChangeNumber} for this domain.
*
@@ -1065,8 +1060,8 @@
private class ExportThread extends DirectoryThread
{
// Id of server that will be initialized
- private int serverToInitialize;
- private int initWindow;
+ private final int serverToInitialize;
+ private final int initWindow;
/**
* Constructor for the ExportThread.
@@ -1153,7 +1148,8 @@
// Flow control during initialization
// - for each remote server, counter of messages received
- private HashMap<Integer, Integer> ackVals = new HashMap<Integer, Integer>();
+ private final HashMap<Integer, Integer> ackVals =
+ new HashMap<Integer, Integer>();
// - serverId of the slowest server (the one with the smallest non null
// counter)
private int slowestServerId = -1;
@@ -3016,27 +3012,30 @@
long heartbeatInterval, long changetimeHeartbeatInterval)
throws ConfigException
{
- if (broker == null)
+ synchronized (sessionLock)
{
- /*
- * create the broker object used to publish and receive changes
- */
- broker = new ReplicationBroker(
- this, state, serviceID,
- serverID, window,
- getGenerationID(),
- heartbeatInterval,
- new ReplSessionSecurity(),
- getGroupId(),
- changetimeHeartbeatInterval);
+ if (broker == null)
+ {
+ /*
+ * create the broker object used to publish and receive changes
+ */
+ broker = new ReplicationBroker(
+ this, state, serviceID,
+ serverID, window,
+ getGenerationID(),
+ heartbeatInterval,
+ new ReplSessionSecurity(),
+ getGroupId(),
+ changetimeHeartbeatInterval);
- broker.start(replicationServers);
+ broker.start(replicationServers);
- /*
- * Create a replication monitor object responsible for publishing
- * monitoring information below cn=monitor.
- */
- monitor = new ReplicationMonitor(this);
+ /*
+ * Create a replication monitor object responsible for publishing
+ * monitoring information below cn=monitor.
+ */
+ monitor = new ReplicationMonitor(this);
+ }
DirectoryServer.registerMonitorProvider(monitor);
}
@@ -3060,29 +3059,32 @@
long heartbeatInterval)
throws ConfigException
{
- if (broker == null)
+ synchronized (sessionLock)
{
- /*
- * create the broker object used to publish and receive changes
- */
- broker = new ReplicationBroker(
- this, state, serviceID,
- serverID, window,
- getGenerationID(),
- heartbeatInterval,
- new ReplSessionSecurity(),
- getGroupId(),
- 0); // change time heartbeat is disabled
+ if (broker == null)
+ {
+ /*
+ * create the broker object used to publish and receive changes
+ */
+ broker = new ReplicationBroker(
+ this, state, serviceID,
+ serverID, window,
+ getGenerationID(),
+ heartbeatInterval,
+ new ReplSessionSecurity(),
+ getGroupId(),
+ 0); // change time heartbeat is disabled
- broker.start(replicationServers);
+ broker.start(replicationServers);
- /*
- * Create a replication monitor object responsible for publishing
- * monitoring information below cn=monitor.
- */
- monitor = new ReplicationMonitor(this);
+ /*
+ * Create a replication monitor object responsible for publishing
+ * monitoring information below cn=monitor.
+ */
+ monitor = new ReplicationMonitor(this);
- DirectoryServer.registerMonitorProvider(monitor);
+ DirectoryServer.registerMonitorProvider(monitor);
+ }
}
}
@@ -3098,10 +3100,13 @@
*/
public void startListenService()
{
- //
- // Create the listener thread
- listenerThread = new ListenerThread(this);
- listenerThread.start();
+ synchronized (sessionLock)
+ {
+ //
+ // Create the listener thread
+ listenerThread = new ListenerThread(this);
+ listenerThread.start();
+ }
}
/**
@@ -3116,21 +3121,23 @@
*/
public void disableService()
{
- // Stop the listener thread
- if (listenerThread != null)
+ synchronized (sessionLock)
{
- listenerThread.shutdown();
+ // Stop the listener thread
+ if (listenerThread != null)
+ {
+ listenerThread.shutdown();
+ }
+
+ if (broker != null)
+ {
+ broker.stop();
+ }
+
+ // Wait for the listener thread to stop
+ if (listenerThread != null)
+ listenerThread.waitForShutdown();
}
-
- if (broker != null)
- {
- broker.stop();
- }
-
- // Wait for the listener thread to stop
- if (listenerThread != null)
- listenerThread.waitForShutdown();
-
}
/**
@@ -3147,11 +3154,14 @@
*/
public void enableService()
{
- broker.start();
+ synchronized (sessionLock)
+ {
+ broker.start();
- // Create the listener thread
- listenerThread = new ListenerThread(this);
- listenerThread.start();
+ // Create the listener thread
+ listenerThread = new ListenerThread(this);
+ listenerThread.start();
+ }
}
/**
--
Gitblit v1.10.0