mirror of https://github.com/OpenIdentityPlatform/OpenDJ.git

gbellato
08.50.2007 db8ccd3607a951d74b373afc6ca63b3b18554ab1
Issue 613 :
Add the capability to deal with schema changes made be editing the schema files :
The core server provides a notification to the synchronization. The
synchronization generates a pseudo changes and propagate it to the
other LDAP servers through the synchronization servers.

The core server is not yet able to provide the notification but this is the code
for the synchronization part of the job with its associated unit test.
5 files modified
201 ■■■■ changed files
opends/src/server/org/opends/server/backends/SchemaBackend.java 8 ●●●● patch | view | raw | blame | history
opends/src/server/org/opends/server/synchronization/plugin/MultimasterSynchronization.java 20 ●●●●● patch | view | raw | blame | history
opends/src/server/org/opends/server/synchronization/plugin/SynchronizationDomain.java 45 ●●●● patch | view | raw | blame | history
opends/tests/unit-tests-testng/src/server/org/opends/server/synchronization/SchemaSynchronizationTest.java 126 ●●●●● patch | view | raw | blame | history
opends/tests/unit-tests-testng/src/server/org/opends/server/synchronization/SynchronizationTestCase.java 2 ●●●●● patch | view | raw | blame | history
opends/src/server/org/opends/server/backends/SchemaBackend.java
@@ -3425,10 +3425,10 @@
      if (values != null)
      {
        ArrayList<Attribute> attrList = new ArrayList<Attribute>(1);
        attrList.add(new Attribute(matchingRuleUsesType,
                                 matchingRuleUsesType.getPrimaryName(),
                                 values));
        schemaEntry.putAttribute(matchingRuleUsesType, attrList);
        attrList.add(new Attribute(synchronizationStateType,
                                   synchronizationStateType.getPrimaryName(),
                                   values));
        schemaEntry.putAttribute(synchronizationStateType, attrList);
      }
    }
opends/src/server/org/opends/server/synchronization/plugin/MultimasterSynchronization.java
@@ -27,6 +27,7 @@
package org.opends.server.synchronization.plugin;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import org.opends.server.api.ConfigAddListener;
@@ -47,6 +48,7 @@
import org.opends.server.core.ModifyOperation;
import org.opends.server.core.Operation;
import org.opends.server.types.ConfigChangeResult;
import org.opends.server.types.Modification;
import org.opends.server.types.ResultCode;
import org.opends.server.types.SynchronizationProviderResult;
@@ -535,6 +537,24 @@
    SynchronizationDomain domain = findDomain(dn, null);
    domain.backupEnd();
  }
  /**
   * This method is called whenever the server detects a modification
   * of the schema done by directly modifying the backing files
   * of the schema backend.
   * Call the schema Synchronization Domain if it exists.
   *
   * @param  modifications  The list of modifications that was
   *                                      applied to the schema.
   *
   */
  public static void schemaChangeNotification(List<Modification> modifications)
  {
    SynchronizationDomain domain =
      findDomain(DirectoryServer.getSchemaDN(), null);
    if (domain != null)
      domain.synchronizeModifications(modifications);
  }
}
opends/src/server/org/opends/server/synchronization/plugin/SynchronizationDomain.java
@@ -90,6 +90,7 @@
import org.opends.server.types.Entry;
import org.opends.server.types.ErrorLogCategory;
import org.opends.server.types.ErrorLogSeverity;
import org.opends.server.types.Modification;
import org.opends.server.types.RDN;
import org.opends.server.types.ResultCode;
import org.opends.server.types.SearchFilter;
@@ -259,25 +260,19 @@
    configAttributes.add(baseDn);
    /*
     * Modify conflicts are solved for all suffixes but the cn=schema suffix
     * Modify conflicts are solved for all suffixes but the schema suffix
     * because we don't want to store extra information in the schema
     * ldif files.
     * This has no negative impact because the changes on schema should
     * not produce conflicts.
     */
    try
    if (baseDN.compareTo(DirectoryServer.getSchemaDN()) == 0)
    {
      if (baseDN.compareTo(DN.decode("cn=schema")) == 0)
      {
        solveConflictFlag = false;
      }
      else
      {
        solveConflictFlag = true;
      }
    } catch (DirectoryException e1)
      solveConflictFlag = false;
    }
    else
    {
      // never happens because "cn=schema" is a valid DN
      solveConflictFlag = true;
    }
    state = new PersistentServerState(baseDN);
@@ -1933,4 +1928,30 @@
  {
    // Nothing is needed at the moment
  }
  /**
   * Push the modifications contain the in given parameter has
   * a modification that would happen on a local server.
   * The modifications are not applied to the local database,
   * historical information is not updated but a ChangeNumber
   * is generated and the ServerState associated to this domain is
   * updated.
   * @param modifications The modification to push
   */
  public void synchronizeModifications(List<Modification> modifications)
  {
    Operation op =
      new ModifyOperation(InternalClientConnection.getRootConnection(),
                          InternalClientConnection.nextOperationID(),
                          InternalClientConnection.nextMessageID(),
                          null, DirectoryServer.getSchemaDN(),
                          modifications);
    ChangeNumber cn = generateChangeNumber(op);
    System.out.println("cn is " + cn);
    OperationContext ctx = new ModifyContext(cn, "schema");
    op.setAttachment(SYNCHROCONTEXT, ctx);
    op.setResultCode(ResultCode.SUCCESS);
    synchronize(op);
  }
}
opends/tests/unit-tests-testng/src/server/org/opends/server/synchronization/SchemaSynchronizationTest.java
@@ -30,18 +30,25 @@
import static org.testng.Assert.assertTrue;
import static org.testng.Assert.fail;
import java.io.File;
import java.io.FileInputStream;
import java.net.ServerSocket;
import java.net.SocketTimeoutException;
import java.util.ArrayList;
import java.util.LinkedHashSet;
import java.util.List;
import org.opends.server.TestCaseUtils;
import org.opends.server.TestErrorLogger;
import org.opends.server.core.DirectoryServer;
import org.opends.server.core.ModifyOperation;
import org.opends.server.core.Operation;
import org.opends.server.protocols.internal.InternalClientConnection;
import org.opends.server.protocols.ldap.LDAPModification;
import org.opends.server.synchronization.common.ChangeNumber;
import org.opends.server.synchronization.common.ChangeNumberGenerator;
import org.opends.server.synchronization.plugin.ChangelogBroker;
import org.opends.server.synchronization.plugin.MultimasterSynchronization;
import org.opends.server.synchronization.protocol.ModifyMsg;
import org.opends.server.synchronization.protocol.SynchronizationMessage;
import org.opends.server.types.Attribute;
@@ -62,7 +69,9 @@
public class SchemaSynchronizationTest extends SynchronizationTestCase
{
  ArrayList<Modification> rcvdMods = null;
  private ArrayList<Modification> rcvdMods = null;
  private int changelogPort;
  /**
   * Set up the environment for performing the tests in this Class.
@@ -77,6 +86,11 @@
    // This test suite depends on having the schema available.
    TestCaseUtils.startServer();
    // find  a free port for the changelog server
    ServerSocket socket = TestCaseUtils.bindFreePort();
    changelogPort = socket.getLocalPort();
    socket.close();
    // Create an internal connection
    connection = InternalClientConnection.getRootConnection();
@@ -100,7 +114,8 @@
    String changeLogLdif = "dn: " + changeLogStringDN + "\n"
        + "objectClass: top\n"
        + "objectClass: ds-cfg-synchronization-changelog-server-config\n"
        + "cn: Changelog Server\n" + "ds-cfg-changelog-port: 8989\n"
        + "cn: Changelog Server\n"
        + "ds-cfg-changelog-port: " + changelogPort + "\n"
        + "ds-cfg-changelog-server-id: 1\n";
    changeLogEntry = TestCaseUtils.entryFromLdifString(changeLogLdif);
@@ -110,7 +125,7 @@
        + "objectClass: ds-cfg-synchronization-provider-config\n"
        + "cn: example\n"
        + "ds-cfg-synchronization-dn: cn=schema\n"
        + "ds-cfg-changelog-server: localhost:8989\n"
        + "ds-cfg-changelog-server: localhost:" + changelogPort + "\n"
        + "ds-cfg-directory-server-id: 1\n";
    synchroServerEntry = TestCaseUtils.entryFromLdifString(synchroServerLdif);
@@ -131,7 +146,7 @@
    final DN baseDn = DN.decode("cn=schema");
    ChangelogBroker broker =
      openChangelogSession(baseDn, (short) 2, 100, 8989, 1000, true);
      openChangelogSession(baseDn, (short) 2, 100, changelogPort, 5000, true);
    try
    {
@@ -217,9 +232,11 @@
    final DN baseDn = DN.decode("cn=schema");
    ChangelogBroker broker =
      openChangelogSession(baseDn, (short) 2, 100, 8989, 1000, true);
      openChangelogSession(baseDn, (short) 2, 100, changelogPort, 5000, true);
    ChangeNumberGenerator gen = new ChangeNumberGenerator((short)2, 0);
    ModifyMsg modMsg = new ModifyMsg(new ChangeNumber((long) 10, 1, (short) 2),
    ModifyMsg modMsg = new ModifyMsg(gen.NewChangeNumber(),
                                     baseDn, rcvdMods, "cn=schema");
    broker.publish(modMsg);
@@ -230,4 +247,101 @@
    if (found == false)
      fail("The modification has not been correctly replayed.");
  }
  /**
   * Checks that changes done to the schema files are pushed to the
   * Changelog servers and that the ServerState is updated in the schema
   * file.
   */
  @Test(enabled=true, dependsOnMethods = { "replaySchemaChange" })
  public void pushSchemaFilesChange() throws Exception
  {
    logError(ErrorLogCategory.SYNCHRONIZATION,
        ErrorLogSeverity.NOTICE,
        "Starting synchronization test : pushSchemaFilesChange ", 1);
    final DN baseDn = DN.decode("cn=schema");
    ChangelogBroker broker =
      openChangelogSession(baseDn, (short) 3, 100, changelogPort, 5000, true);
    // create a schema change Notification
    AttributeType attrType =
      DirectoryServer.getAttributeType("attributetypes", true);
    LinkedHashSet<AttributeValue> values = new LinkedHashSet<AttributeValue>();
    values.add(new AttributeValue(attrType, "( 2.5.44.76.35 NAME 'push' )"));
    Attribute attr = new Attribute(attrType, "attributetypes", values);
    List<Modification> mods = new ArrayList<Modification>();
    Modification mod = new Modification(ModificationType.ADD, attr);
    mods.add(mod);
    MultimasterSynchronization.schemaChangeNotification(mods);
    // receive the message on the broker side.
    SynchronizationMessage msg = broker.receive();
    assertTrue(msg instanceof ModifyMsg,
               "The received synchronization message is not a MODIFY msg");
    ModifyMsg modMsg = (ModifyMsg) msg;
    Operation receivedOp = modMsg.createOperation(connection);
    assertTrue(DN.decode(modMsg.getDn()).compareTo(baseDn) == 0,
               "The received message is not for cn=schema");
    assertTrue(receivedOp instanceof ModifyOperation,
               "The received synchronization message is not a MODIFY msg");
    ModifyOperation receivedModifyOperation = (ModifyOperation) receivedOp;
    List<LDAPModification> rcvdRawMods =
      receivedModifyOperation.getRawModifications();
    this.rcvdMods = new ArrayList<Modification>();
    for (LDAPModification m : rcvdRawMods)
    {
      this.rcvdMods.add(m.toModification());
    }
    assertTrue(this.rcvdMods.contains(mod),
               "The received mod does not contain the original change");
    // check that the schema files were updated with the new ServerState.
    // by checking that the ChangeNUmber of msg we just received has been
    // added to the user schema file.
    // build the string to find in the schema file
    String stateStr = modMsg.getChangeNumber().toString();
    // open the schema file
    String buildRoot = System.getProperty(TestCaseUtils.PROPERTY_BUILD_ROOT);
    String path = buildRoot + File.separator + "build" + File.separator +
                  "unit-tests" + File.separator + "package" + File.separator +
                  "config" + File.separator + "schema" + File.separator +
                  "99-user.ldif";
    // it is necessary to loop on this check because the state is not
    // written immediately but only every so often.
    int count = 0;
    while (true)
    {
      File file = new File(path);
      FileInputStream input = new FileInputStream(file);
      byte[] bytes = new byte[input.available()];
      input.read(bytes);
      String fileStr = new String(bytes);
      if (fileStr.indexOf(stateStr) != -1)
      {
        break;
      }
      else
      {
        if (count++ > 50)
        {
          fail("The Schema persistentState (changenumber:"
             + stateStr + ") has not been saved to " + path + " : " + fileStr);
        }
        else
          TestCaseUtils.sleep(100);
      }
    }
  }
}
opends/tests/unit-tests-testng/src/server/org/opends/server/synchronization/SynchronizationTestCase.java
@@ -142,6 +142,8 @@
    broker.start(servers);
    if (timeout != 0)
      broker.setSoTimeout(timeout);
    TestCaseUtils.sleep(100); // give some time to the broker to connect
                              // to the changelog server.
    if (emptyOldChanges)
    {
      /*