From b48ce50fdf4d73e8be3799e3a7c6c2bf9d1b2965 Mon Sep 17 00:00:00 2001
From: pgamba <pgamba@localhost>
Date: Sun, 02 Sep 2007 17:58:07 +0000
Subject: [PATCH] fix for #1733 & #845 - Initialization of replication
---
opends/tests/unit-tests-testng/src/server/org/opends/server/replication/ReplicationTestCase.java | 144 ++++++++++++++++++++++++++++++++++++++++-------
1 files changed, 122 insertions(+), 22 deletions(-)
diff --git a/opends/tests/unit-tests-testng/src/server/org/opends/server/replication/ReplicationTestCase.java b/opends/tests/unit-tests-testng/src/server/org/opends/server/replication/ReplicationTestCase.java
index 6bf5821..93193b2 100644
--- a/opends/tests/unit-tests-testng/src/server/org/opends/server/replication/ReplicationTestCase.java
+++ b/opends/tests/unit-tests-testng/src/server/org/opends/server/replication/ReplicationTestCase.java
@@ -28,13 +28,13 @@
import static org.opends.server.config.ConfigConstants.ATTR_TASK_COMPLETION_TIME;
import static org.opends.server.config.ConfigConstants.ATTR_TASK_STATE;
-import org.opends.server.config.ConfigException;
import static org.opends.server.loggers.ErrorLogger.logError;
import static org.testng.Assert.assertEquals;
import static org.testng.Assert.assertNotNull;
import static org.testng.Assert.assertTrue;
import static org.testng.Assert.fail;
+import java.net.SocketException;
import java.util.ArrayList;
import java.util.LinkedHashSet;
import java.util.LinkedList;
@@ -42,13 +42,14 @@
import java.util.NoSuchElementException;
import java.util.concurrent.locks.Lock;
+import org.opends.messages.Category;
+import org.opends.messages.Message;
+import org.opends.messages.MessageBuilder;
+import org.opends.messages.Severity;
import org.opends.server.DirectoryServerTestCase;
import org.opends.server.TestCaseUtils;
-import org.opends.messages.MessageBuilder;
-import org.opends.messages.Message;
-import org.opends.messages.Category;
-import org.opends.messages.Severity;
import org.opends.server.backends.task.TaskState;
+import org.opends.server.config.ConfigException;
import org.opends.server.core.AddOperation;
import org.opends.server.core.DeleteOperationBasis;
import org.opends.server.core.DirectoryServer;
@@ -58,7 +59,10 @@
import org.opends.server.replication.common.ServerState;
import org.opends.server.replication.plugin.PersistentServerState;
import org.opends.server.replication.plugin.ReplicationBroker;
+import org.opends.server.replication.plugin.ReplicationDomain;
+import org.opends.server.replication.protocol.ErrorMessage;
import org.opends.server.replication.protocol.ReplSessionSecurity;
+import org.opends.server.replication.protocol.ReplicationMessage;
import org.opends.server.schema.DirectoryStringSyntax;
import org.opends.server.schema.IntegerSyntax;
import org.opends.server.types.Attribute;
@@ -136,13 +140,50 @@
}
/**
- * Open a replicationServer session to the local ReplicationServer.
+ * Retrieves the domain associated to the baseDn, and the value of the generationId
+ * of this domain. If the domain does not exist, returns the default hard-coded\
+ * value of the generationId corresponding to 'no entry'.
*
+ * @param baseDn The baseDn for which we want the generationId
+ * @return The value of the generationId.
+ */
+ static protected long getGenerationId(DN baseDn)
+ {
+ // This is the value of the generationId computed by the server when the
+ // suffix is empty.
+ long genId = 3276850;
+ try
+ {
+ ReplicationDomain replDomain = ReplicationDomain.retrievesReplicationDomain(baseDn);
+ genId = replDomain.getGenerationId();
+ }
+ catch(Exception e) {}
+ return genId;
+ }
+
+ /**
+ * Open a replicationServer session to the local ReplicationServer.
+ * The generation is read from the replicationDomain object. If it
+ * does not exist, take the 'empty backend' generationID.
*/
protected ReplicationBroker openReplicationSession(
final DN baseDn, short serverId, int window_size,
int port, int timeout, boolean emptyOldChanges)
- throws Exception
+ throws Exception, SocketException
+ {
+ return openReplicationSession(baseDn, serverId, window_size,
+ port, timeout, emptyOldChanges, getGenerationId(baseDn));
+ }
+
+ /**
+ * Open a replicationServer session to the local ReplicationServer
+ * providing the generationId.
+ */
+ protected ReplicationBroker openReplicationSession(
+ final DN baseDn, short serverId, int window_size,
+ int port, int timeout, boolean emptyOldChanges,
+ long generationId)
+ throws Exception, SocketException
{
ServerState state;
if (emptyOldChanges)
@@ -151,8 +192,8 @@
state = new ServerState();
ReplicationBroker broker = new ReplicationBroker(
- state, baseDn, serverId, 0, 0, 0, 0, window_size, 0,
- getReplSessionSecurity());
+ state, baseDn, serverId, 0, 0, 0, 0,
+ window_size, 0, generationId, getReplSessionSecurity());
ArrayList<String> servers = new ArrayList<String>(1);
servers.add("localhost:" + port);
broker.start(servers);
@@ -170,7 +211,15 @@
{
while (true)
{
- broker.receive();
+ ReplicationMessage rMsg = broker.receive();
+ if (rMsg instanceof ErrorMessage)
+ {
+ ErrorMessage eMsg = (ErrorMessage)rMsg;
+ logError(new MessageBuilder(
+ "ReplicationTestCase/openReplicationSession ").append(
+ " received ErrorMessage when emptying old changes ").append(
+ eMsg.getDetails()).toMessage());
+ }
}
}
catch (Exception e)
@@ -184,16 +233,30 @@
}
/**
+ * Open a replicationServer session to the local ReplicationServer
+ * with a default value generationId.
+ *
+ */
+ protected ReplicationBroker openReplicationSession(
+ final DN baseDn, short serverId, int window_size,
+ int port, int timeout, ServerState state)
+ throws Exception, SocketException
+ {
+ return openReplicationSession(baseDn, serverId, window_size,
+ port, timeout, state, getGenerationId(baseDn));
+ }
+
+ /**
* Open a new session to the ReplicationServer
* starting with a given ServerState.
*/
protected ReplicationBroker openReplicationSession(
final DN baseDn, short serverId, int window_size,
- int port, int timeout, ServerState state)
- throws Exception
+ int port, int timeout, ServerState state, long generationId)
+ throws Exception, SocketException
{
ReplicationBroker broker = new ReplicationBroker(
- state, baseDn, serverId, 0, 0, 0, 0, window_size, 0,
+ state, baseDn, serverId, 0, 0, 0, 0, window_size, 0, generationId,
getReplSessionSecurity());
ArrayList<String> servers = new ArrayList<String>(1);
servers.add("localhost:" + port);
@@ -213,7 +276,18 @@
final DN baseDn, short serverId, int window_size,
int port, int timeout, int maxSendQueue, int maxRcvQueue,
boolean emptyOldChanges)
- throws Exception
+ throws Exception, SocketException
+ {
+ return openReplicationSession(baseDn, serverId, window_size,
+ port, timeout, maxSendQueue, maxRcvQueue, emptyOldChanges,
+ getGenerationId(baseDn));
+ }
+
+ protected ReplicationBroker openReplicationSession(
+ final DN baseDn, short serverId, int window_size,
+ int port, int timeout, int maxSendQueue, int maxRcvQueue,
+ boolean emptyOldChanges, long generationId)
+ throws Exception, SocketException
{
ServerState state;
if (emptyOldChanges)
@@ -223,7 +297,7 @@
ReplicationBroker broker = new ReplicationBroker(
state, baseDn, serverId, maxRcvQueue, 0,
- maxSendQueue, 0, window_size, 0,
+ maxSendQueue, 0, window_size, 0, generationId,
getReplSessionSecurity());
ArrayList<String> servers = new ArrayList<String>(1);
servers.add("localhost:" + port);
@@ -240,7 +314,15 @@
{
while (true)
{
- broker.receive();
+ ReplicationMessage rMsg = broker.receive();
+ if (rMsg instanceof ErrorMessage)
+ {
+ ErrorMessage eMsg = (ErrorMessage)rMsg;
+ logError(new MessageBuilder(
+ "ReplicationTestCase/openReplicationSession ").append(
+ " received ErrorMessage when emptying old changes ").append(
+ eMsg.getDetails()).toMessage());
+ }
}
}
catch (Exception e)
@@ -264,13 +346,22 @@
while (true)
{
DN dn = configEntryList.removeLast();
- logError(Message.raw(Category.SYNC, Severity.NOTICE,
+
+ logError(Message.raw(Category.SYNC, Severity.NOTICE,
"cleaning config entry " + dn));
op = new DeleteOperationBasis(connection, InternalClientConnection
.nextOperationID(), InternalClientConnection.nextMessageID(), null,
dn);
op.run();
+ if ((op.getResultCode() != ResultCode.SUCCESS) &&
+ (op.getResultCode() != ResultCode.NO_SUCH_OBJECT))
+ {
+ logError(Message.raw(Category.SYNC, Severity.NOTICE,
+ "ReplicationTestCase/Cleaning config entries" +
+ "DEL " + dn +
+ " failed " + op.getResultCode().getResultCodeName()));
+ }
}
}
catch (NoSuchElementException e) {
@@ -293,14 +384,23 @@
while (true)
{
DN dn = entryList.removeLast();
- logError(Message.raw(Category.SYNC, Severity.NOTICE,
- "cleaning entry " + dn));
- op = new DeleteOperationBasis(connection, InternalClientConnection
- .nextOperationID(), InternalClientConnection.nextMessageID(), null,
- dn);
+ op = new DeleteOperationBasis(connection,
+ InternalClientConnection.nextOperationID(),
+ InternalClientConnection.nextMessageID(),
+ null,
+ dn);
op.run();
+
+ if ((op.getResultCode() != ResultCode.SUCCESS) &&
+ (op.getResultCode() != ResultCode.NO_SUCH_OBJECT))
+ {
+ logError(Message.raw(Category.SYNC, Severity.NOTICE,
+ "ReplicationTestCase/Cleaning entries" +
+ "DEL " + dn +
+ " failed " + op.getResultCode().getResultCodeName()));
+ }
}
}
catch (NoSuchElementException e) {
--
Gitblit v1.10.0