From fd9eb9cbbbc01d8cd50d9bbb8f53cf1b1ad8a16d Mon Sep 17 00:00:00 2001
From: gbellato <gbellato@localhost>
Date: Thu, 08 Mar 2007 08:50:49 +0000
Subject: [PATCH] Issue 613 : Add the capability to deal with schema changes made be editing the schema files : The core server provides a notification to the synchronization. The synchronization generates a pseudo changes and propagate it to the other LDAP servers through the synchronization servers.

---
 opendj-sdk/opends/tests/unit-tests-testng/src/server/org/opends/server/synchronization/SchemaSynchronizationTest.java |  126 ++++++++++++++++++++++++++++++-
 opendj-sdk/opends/tests/unit-tests-testng/src/server/org/opends/server/synchronization/SynchronizationTestCase.java   |    2 
 opendj-sdk/opends/src/server/org/opends/server/synchronization/plugin/SynchronizationDomain.java                      |   45 ++++++++---
 opendj-sdk/opends/src/server/org/opends/server/synchronization/plugin/MultimasterSynchronization.java                 |   20 +++++
 opendj-sdk/opends/src/server/org/opends/server/backends/SchemaBackend.java                                            |    8 +-
 5 files changed, 179 insertions(+), 22 deletions(-)

diff --git a/opendj-sdk/opends/src/server/org/opends/server/backends/SchemaBackend.java b/opendj-sdk/opends/src/server/org/opends/server/backends/SchemaBackend.java
index 24fd759..4a68e5b 100644
--- a/opendj-sdk/opends/src/server/org/opends/server/backends/SchemaBackend.java
+++ b/opendj-sdk/opends/src/server/org/opends/server/backends/SchemaBackend.java
@@ -3425,10 +3425,10 @@
       if (values != null)
       {
         ArrayList<Attribute> attrList = new ArrayList<Attribute>(1);
-        attrList.add(new Attribute(matchingRuleUsesType,
-                                 matchingRuleUsesType.getPrimaryName(),
-                                 values));
-        schemaEntry.putAttribute(matchingRuleUsesType, attrList);
+        attrList.add(new Attribute(synchronizationStateType,
+                                   synchronizationStateType.getPrimaryName(),
+                                   values));
+        schemaEntry.putAttribute(synchronizationStateType, attrList);
       }
     }
 
diff --git a/opendj-sdk/opends/src/server/org/opends/server/synchronization/plugin/MultimasterSynchronization.java b/opendj-sdk/opends/src/server/org/opends/server/synchronization/plugin/MultimasterSynchronization.java
index d0cb7b9..d11b99f 100644
--- a/opendj-sdk/opends/src/server/org/opends/server/synchronization/plugin/MultimasterSynchronization.java
+++ b/opendj-sdk/opends/src/server/org/opends/server/synchronization/plugin/MultimasterSynchronization.java
@@ -27,6 +27,7 @@
 package org.opends.server.synchronization.plugin;
 
 import java.util.HashMap;
+import java.util.List;
 import java.util.Map;
 
 import org.opends.server.api.ConfigAddListener;
@@ -47,6 +48,7 @@
 import org.opends.server.core.ModifyOperation;
 import org.opends.server.core.Operation;
 import org.opends.server.types.ConfigChangeResult;
+import org.opends.server.types.Modification;
 import org.opends.server.types.ResultCode;
 import org.opends.server.types.SynchronizationProviderResult;
 
@@ -535,6 +537,24 @@
     SynchronizationDomain domain = findDomain(dn, null);
     domain.backupEnd();
   }
+
+  /**
+   * This method is called whenever the server detects a modification
+   * of the schema done by directly modifying the backing files
+   * of the schema backend.
+   * Call the schema Synchronization Domain if it exists.
+   *
+   * @param  modifications  The list of modifications that was
+   *                                      applied to the schema.
+   *
+   */
+  public static void schemaChangeNotification(List<Modification> modifications)
+  {
+    SynchronizationDomain domain =
+      findDomain(DirectoryServer.getSchemaDN(), null);
+    if (domain != null)
+      domain.synchronizeModifications(modifications);
+  }
 }
 
 
diff --git a/opendj-sdk/opends/src/server/org/opends/server/synchronization/plugin/SynchronizationDomain.java b/opendj-sdk/opends/src/server/org/opends/server/synchronization/plugin/SynchronizationDomain.java
index 7a4f620..11880b9 100644
--- a/opendj-sdk/opends/src/server/org/opends/server/synchronization/plugin/SynchronizationDomain.java
+++ b/opendj-sdk/opends/src/server/org/opends/server/synchronization/plugin/SynchronizationDomain.java
@@ -90,6 +90,7 @@
 import org.opends.server.types.Entry;
 import org.opends.server.types.ErrorLogCategory;
 import org.opends.server.types.ErrorLogSeverity;
+import org.opends.server.types.Modification;
 import org.opends.server.types.RDN;
 import org.opends.server.types.ResultCode;
 import org.opends.server.types.SearchFilter;
@@ -259,25 +260,19 @@
     configAttributes.add(baseDn);
 
     /*
-     * Modify conflicts are solved for all suffixes but the cn=schema suffix
+     * Modify conflicts are solved for all suffixes but the schema suffix
      * because we don't want to store extra information in the schema
      * ldif files.
      * This has no negative impact because the changes on schema should
      * not produce conflicts.
      */
-    try
+    if (baseDN.compareTo(DirectoryServer.getSchemaDN()) == 0)
     {
-      if (baseDN.compareTo(DN.decode("cn=schema")) == 0)
-      {
-        solveConflictFlag = false;
-      }
-      else
-      {
-        solveConflictFlag = true;
-      }
-    } catch (DirectoryException e1)
+      solveConflictFlag = false;
+    }
+    else
     {
-      // never happens because "cn=schema" is a valid DN
+      solveConflictFlag = true;
     }
 
     state = new PersistentServerState(baseDN);
@@ -1933,4 +1928,30 @@
   {
     // Nothing is needed at the moment
   }
+
+  /**
+   * Push the modifications contain the in given parameter has
+   * a modification that would happen on a local server.
+   * The modifications are not applied to the local database,
+   * historical information is not updated but a ChangeNumber
+   * is generated and the ServerState associated to this domain is
+   * updated.
+   * @param modifications The modification to push
+   */
+  public void synchronizeModifications(List<Modification> modifications)
+  {
+    Operation op =
+      new ModifyOperation(InternalClientConnection.getRootConnection(),
+                          InternalClientConnection.nextOperationID(),
+                          InternalClientConnection.nextMessageID(),
+                          null, DirectoryServer.getSchemaDN(),
+                          modifications);
+
+    ChangeNumber cn = generateChangeNumber(op);
+    System.out.println("cn is " + cn);
+    OperationContext ctx = new ModifyContext(cn, "schema");
+    op.setAttachment(SYNCHROCONTEXT, ctx);
+    op.setResultCode(ResultCode.SUCCESS);
+    synchronize(op);
+  }
 }
diff --git a/opendj-sdk/opends/tests/unit-tests-testng/src/server/org/opends/server/synchronization/SchemaSynchronizationTest.java b/opendj-sdk/opends/tests/unit-tests-testng/src/server/org/opends/server/synchronization/SchemaSynchronizationTest.java
index 79b5e8c..7cc2103 100644
--- a/opendj-sdk/opends/tests/unit-tests-testng/src/server/org/opends/server/synchronization/SchemaSynchronizationTest.java
+++ b/opendj-sdk/opends/tests/unit-tests-testng/src/server/org/opends/server/synchronization/SchemaSynchronizationTest.java
@@ -30,18 +30,25 @@
 import static org.testng.Assert.assertTrue;
 import static org.testng.Assert.fail;
 
+import java.io.File;
+import java.io.FileInputStream;
+import java.net.ServerSocket;
+import java.net.SocketTimeoutException;
 import java.util.ArrayList;
 import java.util.LinkedHashSet;
 import java.util.List;
 
 import org.opends.server.TestCaseUtils;
+import org.opends.server.TestErrorLogger;
 import org.opends.server.core.DirectoryServer;
 import org.opends.server.core.ModifyOperation;
 import org.opends.server.core.Operation;
 import org.opends.server.protocols.internal.InternalClientConnection;
 import org.opends.server.protocols.ldap.LDAPModification;
 import org.opends.server.synchronization.common.ChangeNumber;
+import org.opends.server.synchronization.common.ChangeNumberGenerator;
 import org.opends.server.synchronization.plugin.ChangelogBroker;
+import org.opends.server.synchronization.plugin.MultimasterSynchronization;
 import org.opends.server.synchronization.protocol.ModifyMsg;
 import org.opends.server.synchronization.protocol.SynchronizationMessage;
 import org.opends.server.types.Attribute;
@@ -62,7 +69,9 @@
 public class SchemaSynchronizationTest extends SynchronizationTestCase
 {
 
-  ArrayList<Modification> rcvdMods = null;
+  private ArrayList<Modification> rcvdMods = null;
+
+  private int changelogPort;
 
   /**
    * Set up the environment for performing the tests in this Class.
@@ -77,6 +86,11 @@
     // This test suite depends on having the schema available.
     TestCaseUtils.startServer();
 
+    // find  a free port for the changelog server
+    ServerSocket socket = TestCaseUtils.bindFreePort();
+    changelogPort = socket.getLocalPort();
+    socket.close();
+
     // Create an internal connection
     connection = InternalClientConnection.getRootConnection();
 
@@ -100,7 +114,8 @@
     String changeLogLdif = "dn: " + changeLogStringDN + "\n"
         + "objectClass: top\n"
         + "objectClass: ds-cfg-synchronization-changelog-server-config\n"
-        + "cn: Changelog Server\n" + "ds-cfg-changelog-port: 8989\n"
+        + "cn: Changelog Server\n"
+        + "ds-cfg-changelog-port: " + changelogPort + "\n"
         + "ds-cfg-changelog-server-id: 1\n";
     changeLogEntry = TestCaseUtils.entryFromLdifString(changeLogLdif);
 
@@ -110,7 +125,7 @@
         + "objectClass: ds-cfg-synchronization-provider-config\n"
         + "cn: example\n"
         + "ds-cfg-synchronization-dn: cn=schema\n"
-        + "ds-cfg-changelog-server: localhost:8989\n"
+        + "ds-cfg-changelog-server: localhost:" + changelogPort + "\n"
         + "ds-cfg-directory-server-id: 1\n";
     synchroServerEntry = TestCaseUtils.entryFromLdifString(synchroServerLdif);
 
@@ -131,7 +146,7 @@
     final DN baseDn = DN.decode("cn=schema");
 
     ChangelogBroker broker =
-      openChangelogSession(baseDn, (short) 2, 100, 8989, 1000, true);
+      openChangelogSession(baseDn, (short) 2, 100, changelogPort, 5000, true);
 
     try
     {
@@ -217,9 +232,11 @@
     final DN baseDn = DN.decode("cn=schema");
 
     ChangelogBroker broker =
-      openChangelogSession(baseDn, (short) 2, 100, 8989, 1000, true);
+      openChangelogSession(baseDn, (short) 2, 100, changelogPort, 5000, true);
+    
+    ChangeNumberGenerator gen = new ChangeNumberGenerator((short)2, 0);
 
-    ModifyMsg modMsg = new ModifyMsg(new ChangeNumber((long) 10, 1, (short) 2),
+    ModifyMsg modMsg = new ModifyMsg(gen.NewChangeNumber(),
                                      baseDn, rcvdMods, "cn=schema");
     broker.publish(modMsg);
 
@@ -230,4 +247,101 @@
     if (found == false)
       fail("The modification has not been correctly replayed.");
   }
+
+  /**
+   * Checks that changes done to the schema files are pushed to the
+   * Changelog servers and that the ServerState is updated in the schema
+   * file.
+   */
+  @Test(enabled=true, dependsOnMethods = { "replaySchemaChange" })
+  public void pushSchemaFilesChange() throws Exception
+  {
+    logError(ErrorLogCategory.SYNCHRONIZATION,
+        ErrorLogSeverity.NOTICE,
+        "Starting synchronization test : pushSchemaFilesChange ", 1);
+
+    final DN baseDn = DN.decode("cn=schema");
+
+    ChangelogBroker broker =
+      openChangelogSession(baseDn, (short) 3, 100, changelogPort, 5000, true);
+
+    // create a schema change Notification
+    AttributeType attrType =
+      DirectoryServer.getAttributeType("attributetypes", true);
+    LinkedHashSet<AttributeValue> values = new LinkedHashSet<AttributeValue>();
+    values.add(new AttributeValue(attrType, "( 2.5.44.76.35 NAME 'push' )"));
+    Attribute attr = new Attribute(attrType, "attributetypes", values);
+    List<Modification> mods = new ArrayList<Modification>();
+    Modification mod = new Modification(ModificationType.ADD, attr);
+    mods.add(mod);
+
+    MultimasterSynchronization.schemaChangeNotification(mods);
+
+    // receive the message on the broker side.
+    SynchronizationMessage msg = broker.receive();
+
+    assertTrue(msg instanceof ModifyMsg,
+               "The received synchronization message is not a MODIFY msg");
+    ModifyMsg modMsg = (ModifyMsg) msg;
+
+    Operation receivedOp = modMsg.createOperation(connection);
+    assertTrue(DN.decode(modMsg.getDn()).compareTo(baseDn) == 0,
+               "The received message is not for cn=schema");
+
+    assertTrue(receivedOp instanceof ModifyOperation,
+               "The received synchronization message is not a MODIFY msg");
+    ModifyOperation receivedModifyOperation = (ModifyOperation) receivedOp;
+
+    List<LDAPModification> rcvdRawMods =
+      receivedModifyOperation.getRawModifications();
+
+    this.rcvdMods = new ArrayList<Modification>();
+    for (LDAPModification m : rcvdRawMods)
+    {
+      this.rcvdMods.add(m.toModification());
+    }
+
+    assertTrue(this.rcvdMods.contains(mod),
+               "The received mod does not contain the original change");
+    
+    // check that the schema files were updated with the new ServerState.
+    // by checking that the ChangeNUmber of msg we just received has been
+    // added to the user schema file.
+    
+    // build the string to find in the schema file
+    String stateStr = modMsg.getChangeNumber().toString();
+
+    // open the schema file
+    String buildRoot = System.getProperty(TestCaseUtils.PROPERTY_BUILD_ROOT);
+    String path = buildRoot + File.separator + "build" + File.separator +
+                  "unit-tests" + File.separator + "package" + File.separator +
+                  "config" + File.separator + "schema" + File.separator + 
+                  "99-user.ldif";
+    
+    // it is necessary to loop on this check because the state is not
+    // written immediately but only every so often.
+    int count = 0;
+    while (true)
+    {
+      File file = new File(path);
+      FileInputStream input = new FileInputStream(file);
+      byte[] bytes = new byte[input.available()];
+      input.read(bytes);
+      String fileStr = new String(bytes);
+      if (fileStr.indexOf(stateStr) != -1)
+      {
+        break;
+      }
+      else
+      {
+        if (count++ > 50)
+        {
+          fail("The Schema persistentState (changenumber:"
+             + stateStr + ") has not been saved to " + path + " : " + fileStr);
+        }
+        else
+          TestCaseUtils.sleep(100);
+      }
+    }
+  }
 }
diff --git a/opendj-sdk/opends/tests/unit-tests-testng/src/server/org/opends/server/synchronization/SynchronizationTestCase.java b/opendj-sdk/opends/tests/unit-tests-testng/src/server/org/opends/server/synchronization/SynchronizationTestCase.java
index 8da2471..f89e5a2 100644
--- a/opendj-sdk/opends/tests/unit-tests-testng/src/server/org/opends/server/synchronization/SynchronizationTestCase.java
+++ b/opendj-sdk/opends/tests/unit-tests-testng/src/server/org/opends/server/synchronization/SynchronizationTestCase.java
@@ -142,6 +142,8 @@
     broker.start(servers);
     if (timeout != 0)
       broker.setSoTimeout(timeout);
+    TestCaseUtils.sleep(100); // give some time to the broker to connect
+                              // to the changelog server.
     if (emptyOldChanges)
     {
       /*

--
Gitblit v1.10.0