From 2dd63b9cc119a57154c7d7a752698493eea24903 Mon Sep 17 00:00:00 2001
From: pgamba <pgamba@localhost>
Date: Tue, 16 Oct 2007 07:10:39 +0000
Subject: [PATCH] When a replica connects to a replication server that is "late" compared to the replica data , the replica must update the replication server with the missing changes. 1 line fix , the entryUUID attribute must be part of the searched attributes in order to rebuild the operations - and created unit tests for this.
---
opendj-sdk/opends/src/server/org/opends/server/replication/plugin/Historical.java | 2
opendj-sdk/opends/src/server/org/opends/server/replication/plugin/ReplicationBroker.java | 76 ++++++++--
opendj-sdk/opends/src/messages/messages/replication.properties | 8
opendj-sdk/opends/tests/unit-tests-testng/src/server/org/opends/server/replication/server/ReplicationServerTest.java | 2
opendj-sdk/opends/src/server/org/opends/server/replication/plugin/HistoricalCsnOrderingMatchingRule.java | 2
opendj-sdk/opends/tests/unit-tests-testng/src/server/org/opends/server/replication/plugin/HistoricalCsnOrderingTest.java | 303 +++++++++++++++++++++++++++++++++++++++++++
6 files changed, 370 insertions(+), 23 deletions(-)
diff --git a/opendj-sdk/opends/src/messages/messages/replication.properties b/opendj-sdk/opends/src/messages/messages/replication.properties
index 0f08170..47f7f6b 100644
--- a/opendj-sdk/opends/src/messages/messages/replication.properties
+++ b/opendj-sdk/opends/src/messages/messages/replication.properties
@@ -170,8 +170,8 @@
This replication server will now shutdown
SEVERE_ERR_REPLICATION_COULD_NOT_CONNECT_61=The Replication is configured for \
suffix %s but was not able to connect to any Replication Server
-NOTICE_NOW_FOUND_CHANGELOG_62=Replication Server %s now used for Replication \
- Domain %s
+NOTICE_NOW_FOUND_SAME_GENERATION_CHANGELOG_62=Replication is up and running \
+ for domain %s with replication server %s - data generation is %s
NOTICE_DISCONNECTED_FROM_CHANGELOG_63=The connection to Replication Server %s \
has been dropped by the Replication Server
SEVERE_ERR_CHANGELOG_ERROR_SENDING_INFO_64=An unexpected error occurred \
@@ -239,3 +239,7 @@
the replication server backend
SEVERE_ERR_UNKNOWN_DN_95=The base DN %s is not stored by any of the \
Directory Server backend
+NOTICE_NOW_FOUND_BAD_GENERATION_CHANGELOG_96=Replication is up but degraded \
+ for domain %s with replication server %s - local data generation is %s \
+ - replication server data generation is %s - This may be only temporary \
+ or require a full resynchronization
diff --git a/opendj-sdk/opends/src/server/org/opends/server/replication/plugin/Historical.java b/opendj-sdk/opends/src/server/org/opends/server/replication/plugin/Historical.java
index 2f740df..42b2f5b 100644
--- a/opendj-sdk/opends/src/server/org/opends/server/replication/plugin/Historical.java
+++ b/opendj-sdk/opends/src/server/org/opends/server/replication/plugin/Historical.java
@@ -84,7 +84,7 @@
/**
* The name of the entryuuid attribute.
*/
- static final String ENTRYUIDNAME = "entryuuid";
+ public static final String ENTRYUIDNAME = "entryuuid";
/*
diff --git a/opendj-sdk/opends/src/server/org/opends/server/replication/plugin/HistoricalCsnOrderingMatchingRule.java b/opendj-sdk/opends/src/server/org/opends/server/replication/plugin/HistoricalCsnOrderingMatchingRule.java
index 675f517..6709a78 100644
--- a/opendj-sdk/opends/src/server/org/opends/server/replication/plugin/HistoricalCsnOrderingMatchingRule.java
+++ b/opendj-sdk/opends/src/server/org/opends/server/replication/plugin/HistoricalCsnOrderingMatchingRule.java
@@ -60,7 +60,7 @@
* Compare two ByteString values containing historical information.
* @param value1 first value to compare
* @param value2 second value to compare
- * @return 0 when equals, -1 ot 1 to establish order
+ * @return 0 when equals, -1 or 1 to establish order
*/
@Override
public int compareValues(ByteString value1, ByteString value2)
diff --git a/opendj-sdk/opends/src/server/org/opends/server/replication/plugin/ReplicationBroker.java b/opendj-sdk/opends/src/server/org/opends/server/replication/plugin/ReplicationBroker.java
index e561fb7..52e0ac4 100644
--- a/opendj-sdk/opends/src/server/org/opends/server/replication/plugin/ReplicationBroker.java
+++ b/opendj-sdk/opends/src/server/org/opends/server/replication/plugin/ReplicationBroker.java
@@ -202,7 +202,7 @@
*/
private void connect()
{
- ReplServerStartMessage replServerStartMsg;
+ ReplServerStartMessage replServerStartMsg = null;
// Stop any existing heartbeat monitor from a previous session.
if (heartbeatMonitor != null)
@@ -331,21 +331,11 @@
/*
* Get all the changes that have not been seen by this
- * replicationServer and update it
+ * replicationServer and populate the replayOperations
+ * list.
*/
- InternalClientConnection conn =
- InternalClientConnection.getRootConnection();
- LDAPFilter filter = LDAPFilter.decode(
- "("+ Historical.HISTORICALATTRIBUTENAME +
- ">=dummy:" + replServerMaxChangeNumber + ")");
- LinkedHashSet<String> attrs = new LinkedHashSet<String>(1);
- attrs.add(Historical.HISTORICALATTRIBUTENAME);
- InternalSearchOperation op = conn.processSearch(
- new ASN1OctetString(baseDn.toString()),
- SearchScope.WHOLE_SUBTREE,
- DereferencePolicy.NEVER_DEREF_ALIASES,
- 0, 0, false, filter,
- attrs, this);
+ InternalSearchOperation op = seachForChangedEntries(
+ baseDn, replServerMaxChangeNumber, this);
if (op.getResultCode() != ResultCode.SUCCESS)
{
/*
@@ -451,9 +441,27 @@
sendWindow.release(Integer.MAX_VALUE);
this.sendWindow = new Semaphore(maxSendWindow);
connectPhaseLock.notify();
- Message message =
- NOTE_NOW_FOUND_CHANGELOG.get(replicationServer, baseDn.toString());
- logError(message);
+
+ if ((replServerStartMsg.getGenerationId() == this.generationId) ||
+ (replServerStartMsg.getGenerationId() == -1))
+ {
+ Message message =
+ NOTE_NOW_FOUND_SAME_GENERATION_CHANGELOG.get(
+ baseDn.toString(),
+ replicationServer,
+ Long.toString(this.generationId));
+ logError(message);
+ }
+ else
+ {
+ Message message =
+ NOTE_NOW_FOUND_BAD_GENERATION_CHANGELOG.get(
+ baseDn.toString(),
+ replicationServer,
+ Long.toString(this.generationId),
+ Long.toString(replServerStartMsg.getGenerationId()));
+ logError(message);
+ }
}
else
{
@@ -476,6 +484,38 @@
}
/**
+ * Search for the changes that happened since fromChangeNumber
+ * based on the historical attribute.
+ * @param baseDn the base DN
+ * @param fromChangeNumber The change number from which we want the changes
+ * @param resultListener that will process the entries returned.
+ * @return the internal search operation
+ * @throws Exception when raised.
+ */
+ public static InternalSearchOperation seachForChangedEntries(
+ DN baseDn,
+ ChangeNumber fromChangeNumber,
+ InternalSearchListener resultListener)
+ throws Exception
+ {
+ InternalClientConnection conn =
+ InternalClientConnection.getRootConnection();
+ LDAPFilter filter = LDAPFilter.decode(
+ "("+ Historical.HISTORICALATTRIBUTENAME +
+ ">=dummy:" + fromChangeNumber + ")");
+ LinkedHashSet<String> attrs = new LinkedHashSet<String>(1);
+ attrs.add(Historical.HISTORICALATTRIBUTENAME);
+ attrs.add(Historical.ENTRYUIDNAME);
+ return conn.processSearch(
+ new ASN1OctetString(baseDn.toString()),
+ SearchScope.WHOLE_SUBTREE,
+ DereferencePolicy.NEVER_DEREF_ALIASES,
+ 0, 0, false, filter,
+ attrs,
+ resultListener);
+ }
+
+ /**
* Start the heartbeat monitor thread.
*/
private void startHeartBeat()
diff --git a/opendj-sdk/opends/tests/unit-tests-testng/src/server/org/opends/server/replication/plugin/HistoricalCsnOrderingTest.java b/opendj-sdk/opends/tests/unit-tests-testng/src/server/org/opends/server/replication/plugin/HistoricalCsnOrderingTest.java
new file mode 100644
index 0000000..2f31563
--- /dev/null
+++ b/opendj-sdk/opends/tests/unit-tests-testng/src/server/org/opends/server/replication/plugin/HistoricalCsnOrderingTest.java
@@ -0,0 +1,303 @@
+/*
+ * CDDL HEADER START
+ *
+ * The contents of this file are subject to the terms of the
+ * Common Development and Distribution License, Version 1.0 only
+ * (the "License"). You may not use this file except in compliance
+ * with the License.
+ *
+ * You can obtain a copy of the license at
+ * trunk/opends/resource/legal-notices/OpenDS.LICENSE
+ * or https://OpenDS.dev.java.net/OpenDS.LICENSE.
+ * See the License for the specific language governing permissions
+ * and limitations under the License.
+ *
+ * When distributing Covered Code, include this CDDL HEADER in each
+ * file and include the License file at
+ * trunk/opends/resource/legal-notices/OpenDS.LICENSE. If applicable,
+ * add the following below this CDDL HEADER, with the fields enclosed
+ * by brackets "[]" replaced with your own identifying information:
+ * Portions Copyright [yyyy] [name of copyright owner]
+ *
+ * CDDL HEADER END
+ *
+ *
+ * Portions Copyright 2007 Sun Microsystems, Inc.
+ */
+package org.opends.server.replication.plugin;
+
+import static org.opends.server.loggers.ErrorLogger.logError;
+import static org.testng.Assert.assertEquals;
+import static org.testng.Assert.assertNotNull;
+import static org.testng.Assert.assertTrue;
+
+import java.net.ServerSocket;
+import java.util.Iterator;
+import java.util.List;
+
+import org.opends.messages.Category;
+import org.opends.messages.Message;
+import org.opends.messages.Severity;
+import org.opends.server.TestCaseUtils;
+import org.opends.server.core.AddOperationBasis;
+import org.opends.server.core.DirectoryServer;
+import org.opends.server.protocols.asn1.ASN1OctetString;
+import org.opends.server.protocols.internal.InternalClientConnection;
+import org.opends.server.protocols.internal.InternalSearchOperation;
+import org.opends.server.replication.ReplicationTestCase;
+import org.opends.server.replication.common.ChangeNumber;
+import org.opends.server.types.Attribute;
+import org.opends.server.types.AttributeType;
+import org.opends.server.types.AttributeValue;
+import org.opends.server.types.ByteString;
+import org.opends.server.types.DN;
+import org.opends.server.types.Entry;
+import org.opends.server.types.ResultCode;
+import org.opends.server.types.SearchResultEntry;
+import org.testng.annotations.BeforeClass;
+import org.testng.annotations.Test;
+
+/**
+ * Test the usage of the historical data of the replication.
+ */
+public class HistoricalCsnOrderingTest
+extends ReplicationTestCase
+{
+ /**
+ * A "person" entry
+ */
+ protected Entry personEntry;
+ private int replServerPort;
+
+ /**
+ * Set up the environment for performing the tests in this Class.
+ *
+ * @throws Exception
+ * If the environment could not be set up.
+ */
+ @BeforeClass
+ @Override
+ public void setUp() throws Exception
+ {
+ super.setUp();
+
+ // Create backend top level entries
+ String[] topEntries = new String[2];
+ topEntries[0] = "dn: dc=example,dc=com\n" + "objectClass: top\n"
+ + "objectClass: domain\n";
+ topEntries[1] = "dn: ou=People,dc=example,dc=com\n" + "objectClass: top\n"
+ + "objectClass: organizationalUnit\n"
+ + "entryUUID: 11111111-1111-1111-1111-111111111111\n";
+ for (String entryStr : topEntries)
+ {
+ addEntry(TestCaseUtils.entryFromLdifString(entryStr));
+ }
+
+ // top level synchro provider
+ String synchroStringDN = "cn=Synchronization Providers,cn=config";
+
+ // Multimaster Synchro plugin
+ synchroPluginStringDN = "cn=Multimaster Synchronization, "
+ + synchroStringDN;
+
+ // find a free port for the replicationServer
+ ServerSocket socket = TestCaseUtils.bindFreePort();
+ replServerPort = socket.getLocalPort();
+ socket.close();
+
+ // replication server
+ String replServerLdif =
+ "dn: cn=Replication Server, " + synchroPluginStringDN + "\n"
+ + "objectClass: top\n"
+ + "objectClass: ds-cfg-replication-server\n"
+ + "cn: Replication Server\n"
+ + "ds-cfg-replication-port: " + replServerPort + "\n"
+ + "ds-cfg-replication-server-id: 1\n";
+ replServerEntry = TestCaseUtils.entryFromLdifString(replServerLdif);
+
+ // suffix synchronized
+ String synchroServerLdif =
+ "dn: cn=example, cn=domains, " + synchroPluginStringDN + "\n"
+ + "objectClass: top\n"
+ + "objectClass: ds-cfg-replication-domain\n"
+ + "cn: example\n"
+ + "ds-cfg-base-dn: ou=People,dc=example,dc=com\n"
+ + "ds-cfg-replication-server: localhost:" + replServerPort + "\n"
+ + "ds-cfg-server-id: 1\n" + "ds-cfg-receive-status: true\n";
+ synchroServerEntry = TestCaseUtils.entryFromLdifString(synchroServerLdif);
+
+ String personLdif = "dn: uid=user.1,ou=People,dc=example,dc=com\n"
+ + "objectClass: top\n" + "objectClass: person\n"
+ + "objectClass: organizationalPerson\n"
+ + "objectClass: inetOrgPerson\n" + "uid: user.1\n"
+ + "homePhone: 951-245-7634\n"
+ + "description: This is the description for Aaccf Amar.\n" + "st: NC\n"
+ + "mobile: 027-085-0537\n"
+ + "postalAddress: Aaccf Amar$17984 Thirteenth Street"
+ + "$Rockford, NC 85762\n" + "mail: user.1@example.com\n"
+ + "cn: Aaccf Amar\n" + "l: Rockford\n" + "pager: 508-763-4246\n"
+ + "street: 17984 Thirteenth Street\n"
+ + "telephoneNumber: 216-564-6748\n" + "employeeNumber: 1\n"
+ + "sn: Amar\n" + "givenName: Aaccf\n" + "postalCode: 85762\n"
+ + "userPassword: password\n" + "initials: AA\n";
+ personEntry = TestCaseUtils.entryFromLdifString(personLdif);
+
+ configureReplication();
+ }
+
+ /**
+ * Add an entry in the datatbase
+ *
+ */
+ private void addEntry(Entry entry) throws Exception
+ {
+ AddOperationBasis addOp = new AddOperationBasis(connection,
+ InternalClientConnection.nextOperationID(), InternalClientConnection
+ .nextMessageID(), null, entry.getDN(), entry.getObjectClasses(),
+ entry.getUserAttributes(), entry.getOperationalAttributes());
+ addOp.setInternalOperation(true);
+ addOp.run();
+ assertNotNull(getEntry(entry.getDN(), 1000, true));
+ }
+
+ /**
+ * Check the basic comparator on the HistoricalCsnOrderingMatchingRule
+ */
+ @Test()
+ public void basicRuleTest()
+ throws Exception
+ {
+ // Creates a rule
+ HistoricalCsnOrderingMatchingRule r =
+ new HistoricalCsnOrderingMatchingRule();
+
+ ChangeNumber del1 = new ChangeNumber(1, (short) 0, (short) 1);
+ ChangeNumber del2 = new ChangeNumber(1, (short) 1, (short) 1);
+
+ ByteString v1 = new ASN1OctetString("a"+":"+del1.toString());
+ ByteString v2 = new ASN1OctetString("a"+":"+del2.toString());
+
+ int cmp = r.compareValues(v1, v1);
+ assertTrue(cmp == 0);
+
+ cmp = r.compareValues(v1, v2);
+ assertTrue(cmp == -1);
+
+ cmp = r.compareValues(v2, v1);
+ assertTrue(cmp == 1);
+ }
+
+ /**
+ * Test that we can retrieve the entries that were missed by
+ * a replication server and can re-build operations from the historical
+ * informations.
+ */
+ @Test()
+ public void changesCmpTest()
+ throws Exception
+ {
+ final DN baseDn = DN.decode("ou=People,dc=example,dc=com");
+ final DN dn1 = DN.decode("cn=test1," + baseDn.toString());
+ final AttributeType histType =
+ DirectoryServer.getAttributeType(Historical.HISTORICALATTRIBUTENAME);
+
+ logError(Message.raw(Category.SYNC, Severity.INFORMATION,
+ "Starting replication test : changesCmpTest"));
+
+ // Add the first test entry.
+ TestCaseUtils.addEntry(
+ "dn: cn=test1," + baseDn.toString(),
+ "displayname: Test1",
+ "objectClass: top",
+ "objectClass: person",
+ "objectClass: organizationalPerson",
+ "objectClass: inetOrgPerson",
+ "cn: test1",
+ "sn: test"
+ );
+
+ // Perform a first modification to update the historical attribute
+ int resultCode = TestCaseUtils.applyModifications(
+ "dn: cn=test1," + baseDn.toString(),
+ "changetype: modify",
+ "add: description",
+ "description: foo");
+ assertEquals(resultCode, 0);
+
+ // Read the entry back to get its historical and included changeNumber
+ Entry entry = DirectoryServer.getEntry(dn1);
+ List<Attribute> attrs1 = entry.getAttribute(histType);
+
+ assertTrue(attrs1 != null);
+ assertTrue(attrs1.isEmpty() != true);
+
+ String histValue =
+ attrs1.get(0).getValues().iterator().next().getStringValue();
+
+ logError(Message.raw(Category.SYNC, Severity.INFORMATION,
+ "First historical value:" + histValue));
+
+ // Perform a 2nd modification to update the hist attribute with
+ // a second value
+ resultCode = TestCaseUtils.applyModifications(
+ "dn: cn=test1," + baseDn.toString(),
+ "changetype: modify",
+ "add: description",
+ "description: bar");
+ assertEquals(resultCode, 0);
+
+ Entry entry2 = DirectoryServer.getEntry(dn1);
+ List<Attribute> attrs2 = entry2.getAttribute(histType);
+
+ assertTrue(attrs2 != null);
+ assertTrue(attrs2.isEmpty() != true);
+
+ Iterator<AttributeValue> iav = attrs2.get(0).getValues().iterator();
+ try
+ {
+ while (true)
+ {
+ AttributeValue av = iav.next();
+ logError(Message.raw(Category.SYNC, Severity.INFORMATION,
+ "Secnd historical value:" + av.getNormalizedStringValue()));
+ }
+ }
+ catch(Exception e)
+ {
+ }
+
+ // Build a change number from the first modification
+ String hv[] = histValue.split(":");
+ logError(Message.raw(Category.SYNC, Severity.INFORMATION,
+ hv[1]));
+ ChangeNumber fromChangeNumber =
+ new ChangeNumber(hv[1]);
+
+ // Retrieves the entries that have changed since the first modification
+ InternalSearchOperation op =
+ ReplicationBroker.seachForChangedEntries(baseDn, fromChangeNumber, null);
+
+ // The expected result is one entry .. the one previously modified
+ assertTrue(op.getResultCode() == ResultCode.SUCCESS);
+ assertTrue(op.getSearchEntries().size()==1);
+
+ // From the historical of this entry, rebuild operations
+ // Since there have been 2 modifications, there should be 2
+ // operations rebuild from this state.
+ int updatesCnt = 0;
+ for (SearchResultEntry searchEntry : op.getSearchEntries())
+ {
+ logError(Message.raw(Category.SYNC, Severity.INFORMATION,
+ searchEntry.toString()));
+ Iterable<FakeOperation> updates =
+ Historical.generateFakeOperations(searchEntry);
+ for (FakeOperation fop : updates)
+ {
+ logError(Message.raw(Category.SYNC, Severity.INFORMATION,
+ fop.generateMessage().toString()));
+ updatesCnt++;
+ }
+ }
+ assertTrue(updatesCnt == 2);
+ }
+}
diff --git a/opendj-sdk/opends/tests/unit-tests-testng/src/server/org/opends/server/replication/server/ReplicationServerTest.java b/opendj-sdk/opends/tests/unit-tests-testng/src/server/org/opends/server/replication/server/ReplicationServerTest.java
index 165d085..e23c770 100644
--- a/opendj-sdk/opends/tests/unit-tests-testng/src/server/org/opends/server/replication/server/ReplicationServerTest.java
+++ b/opendj-sdk/opends/tests/unit-tests-testng/src/server/org/opends/server/replication/server/ReplicationServerTest.java
@@ -1260,7 +1260,7 @@
{
server1.publish(msg);
}
- Thread.sleep(200);
+ Thread.sleep(500);
// Sets manually the association backend-replication server since
// no config object exist for our replication server.
--
Gitblit v1.10.0