mirror of https://github.com/OpenIdentityPlatform/OpenDJ.git

pgamba
16.10.2007 65f071d6b9adf6414c8074381f5e95acb1297565
When a replica connects to a replication server that is "late" compared to the replica data , the replica must update the replication server with the missing changes.
1 line fix , the entryUUID attribute must be part of the searched attributes in order to rebuild the operations - and created unit tests for this.


1 files added
5 files modified
393 ■■■■■ changed files
opends/src/messages/messages/replication.properties 8 ●●●● patch | view | raw | blame | history
opends/src/server/org/opends/server/replication/plugin/Historical.java 2 ●●● patch | view | raw | blame | history
opends/src/server/org/opends/server/replication/plugin/HistoricalCsnOrderingMatchingRule.java 2 ●●● patch | view | raw | blame | history
opends/src/server/org/opends/server/replication/plugin/ReplicationBroker.java 76 ●●●● patch | view | raw | blame | history
opends/tests/unit-tests-testng/src/server/org/opends/server/replication/plugin/HistoricalCsnOrderingTest.java 303 ●●●●● patch | view | raw | blame | history
opends/tests/unit-tests-testng/src/server/org/opends/server/replication/server/ReplicationServerTest.java 2 ●●● patch | view | raw | blame | history
opends/src/messages/messages/replication.properties
@@ -170,8 +170,8 @@
 This replication server will now shutdown
SEVERE_ERR_REPLICATION_COULD_NOT_CONNECT_61=The Replication is configured for \
 suffix  %s but was not able to connect to any Replication Server
NOTICE_NOW_FOUND_CHANGELOG_62=Replication Server %s now used for Replication \
 Domain %s
NOTICE_NOW_FOUND_SAME_GENERATION_CHANGELOG_62=Replication is up and running \
 for domain %s with replication server %s - data generation is %s
NOTICE_DISCONNECTED_FROM_CHANGELOG_63=The connection to Replication Server %s \
 has been dropped by the Replication Server
SEVERE_ERR_CHANGELOG_ERROR_SENDING_INFO_64=An unexpected error occurred \
@@ -239,3 +239,7 @@
 the replication server backend
SEVERE_ERR_UNKNOWN_DN_95=The base DN %s is not stored by any of the \
 Directory Server backend
NOTICE_NOW_FOUND_BAD_GENERATION_CHANGELOG_96=Replication is up but degraded \
 for domain %s with replication server %s - local data generation is %s \
 - replication server data generation is %s - This may be only temporary \
  or require a full resynchronization
opends/src/server/org/opends/server/replication/plugin/Historical.java
@@ -84,7 +84,7 @@
  /**
   * The name of the entryuuid attribute.
   */
  static final String ENTRYUIDNAME = "entryuuid";
  public static final String ENTRYUIDNAME = "entryuuid";
  /*
opends/src/server/org/opends/server/replication/plugin/HistoricalCsnOrderingMatchingRule.java
@@ -60,7 +60,7 @@
   * Compare two ByteString values containing historical information.
   * @param value1 first value to compare
   * @param value2 second value to compare
   * @return 0 when equals, -1 ot 1 to establish order
   * @return 0 when equals, -1 or 1 to establish order
   */
  @Override
  public int compareValues(ByteString value1, ByteString value2)
opends/src/server/org/opends/server/replication/plugin/ReplicationBroker.java
@@ -202,7 +202,7 @@
   */
  private void connect()
  {
    ReplServerStartMessage replServerStartMsg;
    ReplServerStartMessage replServerStartMsg = null;
    // Stop any existing heartbeat monitor from a previous session.
    if (heartbeatMonitor != null)
@@ -331,21 +331,11 @@
                /*
                 * Get all the changes that have not been seen by this
                 * replicationServer and update it
                 * replicationServer and populate the replayOperations
                 * list.
                 */
                InternalClientConnection conn =
                  InternalClientConnection.getRootConnection();
                LDAPFilter filter = LDAPFilter.decode(
                    "("+ Historical.HISTORICALATTRIBUTENAME +
                    ">=dummy:" + replServerMaxChangeNumber + ")");
                LinkedHashSet<String> attrs = new LinkedHashSet<String>(1);
                attrs.add(Historical.HISTORICALATTRIBUTENAME);
                InternalSearchOperation op = conn.processSearch(
                    new ASN1OctetString(baseDn.toString()),
                    SearchScope.WHOLE_SUBTREE,
                    DereferencePolicy.NEVER_DEREF_ALIASES,
                    0, 0, false, filter,
                    attrs, this);
                InternalSearchOperation op = seachForChangedEntries(
                    baseDn, replServerMaxChangeNumber, this);
                if (op.getResultCode() != ResultCode.SUCCESS)
                {
                  /*
@@ -451,9 +441,27 @@
          sendWindow.release(Integer.MAX_VALUE);
        this.sendWindow = new Semaphore(maxSendWindow);
        connectPhaseLock.notify();
        Message message =
            NOTE_NOW_FOUND_CHANGELOG.get(replicationServer, baseDn.toString());
        logError(message);
        if ((replServerStartMsg.getGenerationId() == this.generationId) ||
           (replServerStartMsg.getGenerationId() == -1))
        {
          Message message =
            NOTE_NOW_FOUND_SAME_GENERATION_CHANGELOG.get(
                baseDn.toString(),
                replicationServer,
                Long.toString(this.generationId));
          logError(message);
        }
        else
        {
          Message message =
            NOTE_NOW_FOUND_BAD_GENERATION_CHANGELOG.get(
                baseDn.toString(),
                replicationServer,
                Long.toString(this.generationId),
                Long.toString(replServerStartMsg.getGenerationId()));
          logError(message);
        }
      }
      else
      {
@@ -476,6 +484,38 @@
  }
  /**
   * Search for the changes that happened since fromChangeNumber
   * based on the historical attribute.
   * @param baseDn the base DN
   * @param fromChangeNumber The change number from which we want the changes
   * @param resultListener that will process the entries returned.
   * @return the internal search operation
   * @throws Exception when raised.
   */
  public static InternalSearchOperation seachForChangedEntries(
      DN baseDn,
      ChangeNumber fromChangeNumber,
      InternalSearchListener resultListener)
  throws Exception
  {
    InternalClientConnection conn =
      InternalClientConnection.getRootConnection();
    LDAPFilter filter = LDAPFilter.decode(
        "("+ Historical.HISTORICALATTRIBUTENAME +
        ">=dummy:" + fromChangeNumber + ")");
    LinkedHashSet<String> attrs = new LinkedHashSet<String>(1);
    attrs.add(Historical.HISTORICALATTRIBUTENAME);
    attrs.add(Historical.ENTRYUIDNAME);
    return conn.processSearch(
        new ASN1OctetString(baseDn.toString()),
        SearchScope.WHOLE_SUBTREE,
        DereferencePolicy.NEVER_DEREF_ALIASES,
        0, 0, false, filter,
        attrs,
        resultListener);
  }
  /**
   * Start the heartbeat monitor thread.
   */
  private void startHeartBeat()
opends/tests/unit-tests-testng/src/server/org/opends/server/replication/plugin/HistoricalCsnOrderingTest.java
New file
@@ -0,0 +1,303 @@
/*
 * CDDL HEADER START
 *
 * The contents of this file are subject to the terms of the
 * Common Development and Distribution License, Version 1.0 only
 * (the "License").  You may not use this file except in compliance
 * with the License.
 *
 * You can obtain a copy of the license at
 * trunk/opends/resource/legal-notices/OpenDS.LICENSE
 * or https://OpenDS.dev.java.net/OpenDS.LICENSE.
 * See the License for the specific language governing permissions
 * and limitations under the License.
 *
 * When distributing Covered Code, include this CDDL HEADER in each
 * file and include the License file at
 * trunk/opends/resource/legal-notices/OpenDS.LICENSE.  If applicable,
 * add the following below this CDDL HEADER, with the fields enclosed
 * by brackets "[]" replaced with your own identifying information:
 *      Portions Copyright [yyyy] [name of copyright owner]
 *
 * CDDL HEADER END
 *
 *
 *      Portions Copyright 2007 Sun Microsystems, Inc.
 */
package org.opends.server.replication.plugin;
import static org.opends.server.loggers.ErrorLogger.logError;
import static org.testng.Assert.assertEquals;
import static org.testng.Assert.assertNotNull;
import static org.testng.Assert.assertTrue;
import java.net.ServerSocket;
import java.util.Iterator;
import java.util.List;
import org.opends.messages.Category;
import org.opends.messages.Message;
import org.opends.messages.Severity;
import org.opends.server.TestCaseUtils;
import org.opends.server.core.AddOperationBasis;
import org.opends.server.core.DirectoryServer;
import org.opends.server.protocols.asn1.ASN1OctetString;
import org.opends.server.protocols.internal.InternalClientConnection;
import org.opends.server.protocols.internal.InternalSearchOperation;
import org.opends.server.replication.ReplicationTestCase;
import org.opends.server.replication.common.ChangeNumber;
import org.opends.server.types.Attribute;
import org.opends.server.types.AttributeType;
import org.opends.server.types.AttributeValue;
import org.opends.server.types.ByteString;
import org.opends.server.types.DN;
import org.opends.server.types.Entry;
import org.opends.server.types.ResultCode;
import org.opends.server.types.SearchResultEntry;
import org.testng.annotations.BeforeClass;
import org.testng.annotations.Test;
/**
 * Test the usage of the historical data of the replication.
 */
public class HistoricalCsnOrderingTest
extends ReplicationTestCase
{
  /**
   * A "person" entry
   */
  protected Entry personEntry;
  private int replServerPort;
  /**
   * Set up the environment for performing the tests in this Class.
   *
   * @throws Exception
   *           If the environment could not be set up.
   */
  @BeforeClass
  @Override
  public void setUp() throws Exception
  {
    super.setUp();
    // Create backend top level entries
    String[] topEntries = new String[2];
    topEntries[0] = "dn: dc=example,dc=com\n" + "objectClass: top\n"
    + "objectClass: domain\n";
    topEntries[1] = "dn: ou=People,dc=example,dc=com\n" + "objectClass: top\n"
    + "objectClass: organizationalUnit\n"
    + "entryUUID: 11111111-1111-1111-1111-111111111111\n";
    for (String entryStr : topEntries)
    {
      addEntry(TestCaseUtils.entryFromLdifString(entryStr));
    }
    // top level synchro provider
    String synchroStringDN = "cn=Synchronization Providers,cn=config";
    // Multimaster Synchro plugin
    synchroPluginStringDN = "cn=Multimaster Synchronization, "
      + synchroStringDN;
    // find  a free port for the replicationServer
    ServerSocket socket = TestCaseUtils.bindFreePort();
    replServerPort = socket.getLocalPort();
    socket.close();
    // replication server
    String replServerLdif =
      "dn: cn=Replication Server, " + synchroPluginStringDN + "\n"
      + "objectClass: top\n"
      + "objectClass: ds-cfg-replication-server\n"
      + "cn: Replication Server\n"
      + "ds-cfg-replication-port: " + replServerPort + "\n"
      + "ds-cfg-replication-server-id: 1\n";
    replServerEntry = TestCaseUtils.entryFromLdifString(replServerLdif);
    // suffix synchronized
    String synchroServerLdif =
      "dn: cn=example, cn=domains, " + synchroPluginStringDN + "\n"
      + "objectClass: top\n"
      + "objectClass: ds-cfg-replication-domain\n"
      + "cn: example\n"
      + "ds-cfg-base-dn: ou=People,dc=example,dc=com\n"
      + "ds-cfg-replication-server: localhost:" + replServerPort + "\n"
      + "ds-cfg-server-id: 1\n" + "ds-cfg-receive-status: true\n";
    synchroServerEntry = TestCaseUtils.entryFromLdifString(synchroServerLdif);
    String personLdif = "dn: uid=user.1,ou=People,dc=example,dc=com\n"
      + "objectClass: top\n" + "objectClass: person\n"
      + "objectClass: organizationalPerson\n"
      + "objectClass: inetOrgPerson\n" + "uid: user.1\n"
      + "homePhone: 951-245-7634\n"
      + "description: This is the description for Aaccf Amar.\n" + "st: NC\n"
      + "mobile: 027-085-0537\n"
      + "postalAddress: Aaccf Amar$17984 Thirteenth Street"
      + "$Rockford, NC  85762\n" + "mail: user.1@example.com\n"
      + "cn: Aaccf Amar\n" + "l: Rockford\n" + "pager: 508-763-4246\n"
      + "street: 17984 Thirteenth Street\n"
      + "telephoneNumber: 216-564-6748\n" + "employeeNumber: 1\n"
      + "sn: Amar\n" + "givenName: Aaccf\n" + "postalCode: 85762\n"
      + "userPassword: password\n" + "initials: AA\n";
    personEntry = TestCaseUtils.entryFromLdifString(personLdif);
    configureReplication();
  }
  /**
   * Add an entry in the datatbase
   *
   */
  private void addEntry(Entry entry) throws Exception
  {
    AddOperationBasis addOp = new AddOperationBasis(connection,
        InternalClientConnection.nextOperationID(), InternalClientConnection
        .nextMessageID(), null, entry.getDN(), entry.getObjectClasses(),
        entry.getUserAttributes(), entry.getOperationalAttributes());
    addOp.setInternalOperation(true);
    addOp.run();
    assertNotNull(getEntry(entry.getDN(), 1000, true));
  }
  /**
   * Check the basic comparator on the HistoricalCsnOrderingMatchingRule
   */
  @Test()
  public void basicRuleTest()
  throws Exception
  {
    // Creates a rule
    HistoricalCsnOrderingMatchingRule r =
      new HistoricalCsnOrderingMatchingRule();
    ChangeNumber del1 = new ChangeNumber(1, (short) 0, (short) 1);
    ChangeNumber del2 = new ChangeNumber(1, (short) 1, (short) 1);
    ByteString v1 = new ASN1OctetString("a"+":"+del1.toString());
    ByteString v2 = new ASN1OctetString("a"+":"+del2.toString());
    int cmp = r.compareValues(v1, v1);
    assertTrue(cmp == 0);
    cmp = r.compareValues(v1, v2);
    assertTrue(cmp == -1);
    cmp = r.compareValues(v2, v1);
    assertTrue(cmp == 1);
  }
  /**
   * Test that we can retrieve the entries that were missed by
   * a replication server and can  re-build operations from the historical
   * informations.
   */
  @Test()
  public void changesCmpTest()
  throws Exception
  {
    final DN baseDn = DN.decode("ou=People,dc=example,dc=com");
    final DN dn1 = DN.decode("cn=test1," + baseDn.toString());
    final AttributeType histType =
      DirectoryServer.getAttributeType(Historical.HISTORICALATTRIBUTENAME);
    logError(Message.raw(Category.SYNC, Severity.INFORMATION,
    "Starting replication test : changesCmpTest"));
    // Add the first test entry.
    TestCaseUtils.addEntry(
        "dn: cn=test1," + baseDn.toString(),
        "displayname: Test1",
        "objectClass: top",
        "objectClass: person",
        "objectClass: organizationalPerson",
        "objectClass: inetOrgPerson",
        "cn: test1",
        "sn: test"
    );
    // Perform a first modification to update the historical attribute
    int resultCode = TestCaseUtils.applyModifications(
        "dn: cn=test1," + baseDn.toString(),
        "changetype: modify",
        "add: description",
    "description: foo");
    assertEquals(resultCode, 0);
    // Read the entry back to get its historical and included changeNumber
    Entry entry = DirectoryServer.getEntry(dn1);
    List<Attribute> attrs1 = entry.getAttribute(histType);
    assertTrue(attrs1 != null);
    assertTrue(attrs1.isEmpty() != true);
    String histValue =
      attrs1.get(0).getValues().iterator().next().getStringValue();
    logError(Message.raw(Category.SYNC, Severity.INFORMATION,
        "First historical value:" + histValue));
    // Perform a 2nd modification to update the hist attribute with
    // a second value
    resultCode = TestCaseUtils.applyModifications(
        "dn: cn=test1," + baseDn.toString(),
        "changetype: modify",
        "add: description",
    "description: bar");
    assertEquals(resultCode, 0);
    Entry entry2 = DirectoryServer.getEntry(dn1);
    List<Attribute> attrs2 = entry2.getAttribute(histType);
    assertTrue(attrs2 != null);
    assertTrue(attrs2.isEmpty() != true);
    Iterator<AttributeValue> iav = attrs2.get(0).getValues().iterator();
    try
    {
      while (true)
      {
        AttributeValue av = iav.next();
        logError(Message.raw(Category.SYNC, Severity.INFORMATION,
            "Secnd historical value:" + av.getNormalizedStringValue()));
      }
    }
    catch(Exception e)
    {
    }
    // Build a change number from the first modification
    String hv[] = histValue.split(":");
    logError(Message.raw(Category.SYNC, Severity.INFORMATION,
        hv[1]));
    ChangeNumber fromChangeNumber =
      new ChangeNumber(hv[1]);
    // Retrieves the entries that have changed since the first modification
    InternalSearchOperation op =
      ReplicationBroker.seachForChangedEntries(baseDn, fromChangeNumber, null);
    // The expected result is one entry .. the one previously modified
    assertTrue(op.getResultCode() == ResultCode.SUCCESS);
    assertTrue(op.getSearchEntries().size()==1);
    // From the historical of this entry, rebuild operations
    // Since there have been 2 modifications, there should be 2
    // operations rebuild from this state.
    int updatesCnt = 0;
    for (SearchResultEntry searchEntry : op.getSearchEntries())
    {
      logError(Message.raw(Category.SYNC, Severity.INFORMATION,
          searchEntry.toString()));
      Iterable<FakeOperation> updates =
        Historical.generateFakeOperations(searchEntry);
      for (FakeOperation fop : updates)
      {
        logError(Message.raw(Category.SYNC, Severity.INFORMATION,
            fop.generateMessage().toString()));
        updatesCnt++;
      }
    }
    assertTrue(updatesCnt == 2);
  }
}
opends/tests/unit-tests-testng/src/server/org/opends/server/replication/server/ReplicationServerTest.java
@@ -1260,7 +1260,7 @@
     {
       server1.publish(msg);
     }
     Thread.sleep(200);
     Thread.sleep(500);
     // Sets manually the association backend-replication server since
     // no config object exist for our replication server.