From b891a41e31f548f0ef888cf8de090f4785715169 Mon Sep 17 00:00:00 2001
From: gbellato <gbellato@localhost>
Date: Tue, 06 Jan 2009 08:52:11 +0000
Subject: [PATCH]
---
opends/src/server/org/opends/server/replication/service/ReplicationDomain.java | 112 ++++++++++++++++++++++++++++++++++++++++++++------------
1 files changed, 88 insertions(+), 24 deletions(-)
diff --git a/opends/src/server/org/opends/server/replication/service/ReplicationDomain.java b/opends/src/server/org/opends/server/replication/service/ReplicationDomain.java
index 7c4df2b..1f54b7f 100644
--- a/opends/src/server/org/opends/server/replication/service/ReplicationDomain.java
+++ b/opends/src/server/org/opends/server/replication/service/ReplicationDomain.java
@@ -22,7 +22,7 @@
* CDDL HEADER END
*
*
- * Copyright 2008 Sun Microsystems, Inc.
+ * Copyright 2008-2009 Sun Microsystems, Inc.
*/
package org.opends.server.replication.service;
@@ -170,7 +170,7 @@
* The ReplicationBroker that is used by this ReplicationDomain to
* connect to the ReplicationService.
*/
- private ReplicationBroker broker;
+ private ReplicationBroker broker = null;
/**
* This Map is used to store all outgoing assured messages in order
@@ -991,8 +991,6 @@
{
// The task that initiated the operation.
Task initializeTask;
- // The input stream for the import
- ReplInputStream ldifImportInputStream = null;
// The target in the case of an export
short exportTarget = RoutableMsg.UNKNOWN_SERVER;
// The source in the case of an import
@@ -1553,7 +1551,6 @@
ieContext.setCounters(
initializeMessage.getEntryCount(),
initializeMessage.getEntryCount());
- ieContext.ldifImportInputStream = new ReplInputStream(this);
try
{
@@ -1682,6 +1679,52 @@
}
/**
+ * Check the value of the Replication Servers generation ID.
+ *
+ * @param generationID The expected value of the generation ID.
+ *
+ * @throws DirectoryException When the generation ID of the Replication
+ * Servers is not the expected value.
+ */
+ private void checkGenerationID(long generationID) throws DirectoryException
+ {
+ boolean flag = false;
+
+ for (int i = 0; i< 10; i++)
+ {
+ for (RSInfo rsInfo : getRsList())
+ {
+ if (rsInfo.getGenerationId() == generationID)
+ {
+ flag = true;
+ break;
+ }
+ else
+ {
+ try
+ {
+ Thread.sleep(i*100);
+ } catch (InterruptedException e)
+ {
+ }
+ }
+ }
+ if (flag)
+ {
+ break;
+ }
+ }
+
+ if (!flag)
+ {
+ ResultCode resultCode = ResultCode.OTHER;
+ Message message = ERR_RESET_GENERATION_ID_FAILED.get(serviceID);
+ throw new DirectoryException(
+ resultCode, message);
+ }
+ }
+
+ /**
* Reset the Replication Log.
* Calling this method will remove all the Replication information that
* was kept on all the Replication Servers currently connected in the
@@ -1693,7 +1736,21 @@
*/
public void resetReplicationLog() throws DirectoryException
{
+ // Reset the Generation ID to -1 to clean the ReplicationServers.
resetGenerationId((long)-1);
+
+ // check that at least one ReplicationServer did change its generation-id
+ checkGenerationID(-1);
+
+ // Reconnect to the Replication Server so that it adopt our
+ // GenerationID.
+ disableService();
+ enableService();
+
+ resetGenerationId(getGenerationID());
+
+ // check that at least one ReplicationServer did change its generation-id
+ checkGenerationID(getGenerationID());
}
/**
@@ -1715,8 +1772,7 @@
if (!isConnected())
{
ResultCode resultCode = ResultCode.OTHER;
- Message message = ERR_RESET_GENERATION_CONN_ERR_ID.get(
- serviceID);
+ Message message = ERR_RESET_GENERATION_CONN_ERR_ID.get(serviceID);
throw new DirectoryException(
resultCode, message);
}
@@ -2088,26 +2144,29 @@
Collection<String> replicationServers, int window,
long heartbeatInterval) throws ConfigException
{
- /*
- * create the broker object used to publish and receive changes
- */
- broker = new ReplicationBroker(
- this, state, serviceID,
- serverID, window,
- getGenerationID(),
- heartbeatInterval,
- new ReplSessionSecurity(),
- getGroupId());
+ if (broker == null)
+ {
+ /*
+ * create the broker object used to publish and receive changes
+ */
+ broker = new ReplicationBroker(
+ this, state, serviceID,
+ serverID, window,
+ getGenerationID(),
+ heartbeatInterval,
+ new ReplSessionSecurity(),
+ getGroupId());
- broker.start(replicationServers);
+ broker.start(replicationServers);
- /*
- * Create a replication monitor object responsible for publishing
- * monitoring information below cn=monitor.
- */
- monitor = new ReplicationMonitor(this);
+ /*
+ * Create a replication monitor object responsible for publishing
+ * monitoring information below cn=monitor.
+ */
+ monitor = new ReplicationMonitor(this);
- DirectoryServer.registerMonitorProvider(monitor);
+ DirectoryServer.registerMonitorProvider(monitor);
+ }
}
/**
@@ -2115,9 +2174,14 @@
* <p>
* After this method has been called, the Replication Service will start
* calling the {@link #processUpdate(UpdateMsg)}.
+ * <p>
+ * This method must be called once and must be called after the
+ * {@link #startPublishService(Collection, int, long)}.
+ *
*/
public void startListenService()
{
+ //
// Create the listener thread
listenerThread = new ListenerThread(this);
listenerThread.start();
--
Gitblit v1.10.0