mirror of https://github.com/OpenIdentityPlatform/OpenDJ.git

Jean-Noël Rouvignac
10.54.2015 28215a00c6a6c49ab982f51dee97d501ce954ad3
opendj-server-legacy/src/test/java/org/opends/server/replication/SchemaReplicationTest.java
@@ -28,15 +28,18 @@
import java.io.File;
import java.io.FileInputStream;
import java.io.FileNotFoundException;
import java.io.IOException;
import java.net.SocketTimeoutException;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.Callable;
import org.assertj.core.api.Assertions;
import org.forgerock.i18n.LocalizableMessage;
import org.forgerock.i18n.slf4j.LocalizedLogger;
import org.forgerock.opendj.ldap.ModificationType;
import org.forgerock.opendj.ldap.ResultCode;
import org.opends.server.TestCaseUtils;
import org.opends.server.admin.std.server.SynchronizationProviderCfg;
import org.opends.server.api.SynchronizationProvider;
import org.opends.server.core.DirectoryServer;
import org.opends.server.core.ModifyOperation;
@@ -46,30 +49,34 @@
import org.opends.server.replication.protocol.ModifyMsg;
import org.opends.server.replication.protocol.ReplicationMsg;
import org.opends.server.replication.service.ReplicationBroker;
import org.opends.server.types.*;
import org.forgerock.opendj.ldap.ResultCode;
import org.opends.server.types.Attribute;
import org.opends.server.types.Attributes;
import org.opends.server.types.DN;
import org.opends.server.types.Modification;
import org.opends.server.types.Operation;
import org.opends.server.types.RawModification;
import org.opends.server.util.TestTimer;
import org.testng.annotations.BeforeClass;
import org.testng.annotations.Test;
import static java.util.concurrent.TimeUnit.*;
import static org.forgerock.opendj.ldap.ModificationType.*;
import static org.opends.server.core.DirectoryServer.*;
import static org.opends.server.util.CollectionUtils.*;
import static org.testng.Assert.*;
/**
 * Test for the schema replication.
 */
/** Test for the schema replication. */
@SuppressWarnings("javadoc")
public class SchemaReplicationTest extends ReplicationTestCase
{
  private static final LocalizedLogger logger = LocalizedLogger.getLoggerForThisClass();
  private List<Modification> rcvdMods;
  private int replServerPort;
  /**
   * Set up the environment for performing the tests in this Class.
   */
  /** Set up the environment for performing the tests in this Class. */
  @Override
  @BeforeClass
  public void setUp() throws Exception
@@ -107,10 +114,7 @@
    configureReplication(replServerLdif, domainLdif);
  }
  /**
   * Checks that changes done to the schema are pushed to the replicationServer
   * clients.
   */
  /** Checks that changes done to the schema are pushed to the replicationServer clients. */
  @Test
  public void pushSchemaChange() throws Exception
  {
@@ -128,45 +132,18 @@
      // Modify the schema
      Attribute attr = Attributes.create("attributetypes",
          "( 2.5.44.77.33 NAME 'dummy' )");
      List<Modification> mods = new ArrayList<>();
      Modification mod = new Modification(ModificationType.ADD, attr);
      mods.add(mod);
      ModifyOperation modOp = connection.processModify(baseDN, mods);
      assertEquals(modOp.getResultCode(), ResultCode.SUCCESS,
          "The original operation failed");
      Modification mod = new Modification(ADD, attr);
      processModify(baseDN, mod);
      // See if the client has received the msg
      ReplicationMsg msg = broker.receive();
      Assertions.assertThat(msg).isInstanceOf(ModifyMsg.class);
      ModifyMsg modMsg = (ModifyMsg) msg;
      ModifyMsg modMsg = receiveModifyMsg(broker);
      assertModReceived(mod, baseDN, modMsg);
      Operation receivedOp = modMsg.createOperation(connection);
      assertEquals(modMsg.getDN(), baseDN, "The received message is not for cn=schema");
      Assertions.assertThat(receivedOp).isInstanceOf(ModifyOperation.class);
      ModifyOperation receivedModifyOperation = (ModifyOperation) receivedOp;
      this.rcvdMods = new ArrayList<>();
      for (RawModification m : receivedModifyOperation.getRawModifications())
      {
        this.rcvdMods.add(m.toModification());
      }
      assertTrue(this.rcvdMods.contains(mod),
                 "The received mod does not contain the original change");
      /*
       * Now cleanup the schema for the next test
       */
      mod = new Modification(ModificationType.DELETE, attr);
      mods.clear();
      mods.add(mod);
      modOp = connection.processModify(baseDN, mods);
      assertEquals(modOp.getResultCode(), ResultCode.SUCCESS,
          "The original operation failed");
      /* Now cleanup the schema for the next test */
      processModify(baseDN, new Modification(DELETE, attr));
      // See if the client has received the msg
      msg = broker.receive();
      Assertions.assertThat(msg).isInstanceOf(ModifyMsg.class);
      receiveModifyMsg(broker);
    }
    finally
    {
@@ -174,6 +151,12 @@
    }
  }
  private void processModify(final DN baseDN, Modification mod)
  {
    ModifyOperation modOp = connection.processModify(baseDN, newArrayList(mod));
    assertEquals(modOp.getResultCode(), ResultCode.SUCCESS);
  }
  /**
   * Checks that changes to the schema pushed to the replicationServer
   * are received and correctly replayed by replication plugin.
@@ -198,10 +181,8 @@
          EntryHistorical.getEntryUUID(DirectoryServer.getEntry(baseDN)));
      broker.publish(modMsg);
      boolean found = checkEntryHasAttribute(baseDN, "attributetypes",
        "( 2.5.44.77.33 NAME 'dummy' )",
        10000, true);
      assertTrue(found, "The modification has not been correctly replayed.");
      checkEntryHasAttributeValue(baseDN, "attributetypes", "( 2.5.44.77.33 NAME 'dummy' )", 10,
          "The modification has not been correctly replayed.");
    }
    finally
    {
@@ -231,73 +212,91 @@
      // create a schema change Notification
      Attribute attr = Attributes.create("attributetypes",
        "( 2.5.44.76.35 NAME 'push' )");
      List<Modification> mods = new ArrayList<>();
      Modification mod = new Modification(ModificationType.ADD, attr);
      mods.add(mod);
      Modification mod = new Modification(ADD, attr);
      List<Modification> mods = newArrayList(mod);
      for (SynchronizationProvider<SynchronizationProviderCfg> provider : DirectoryServer.
        getSynchronizationProviders())
      for (SynchronizationProvider<?> provider : getSynchronizationProviders())
      {
        provider.processSchemaChange(mods);
      }
      // receive the message on the broker side.
      ReplicationMsg msg = broker.receive();
      Assertions.assertThat(msg).isInstanceOf(ModifyMsg.class);
      ModifyMsg modMsg = (ModifyMsg) msg;
      Operation receivedOp = modMsg.createOperation(connection);
      assertEquals(modMsg.getDN(), baseDN, "The received message is not for cn=schema");
      Assertions.assertThat(receivedOp).isInstanceOf(ModifyOperation.class);
      ModifyOperation receivedModifyOperation = (ModifyOperation) receivedOp;
      this.rcvdMods = new ArrayList<>();
      for (RawModification m : receivedModifyOperation.getRawModifications())
      {
        this.rcvdMods.add(m.toModification());
      }
      assertTrue(this.rcvdMods.contains(mod),
        "The received mod does not contain the original change");
      ModifyMsg modMsg = receiveModifyMsg(broker);
      assertModReceived(mod, baseDN, modMsg);
      // check that the schema files were updated with the new ServerState.
      // by checking that the CSN of msg we just received has been
      // added to the user schema file.
      // build the string to find in the schema file
      String stateStr = modMsg.getCSN().toString();
      final String stateStr = modMsg.getCSN().toString();
      // open the schema file
      String buildRoot = System.getProperty(TestCaseUtils.PROPERTY_BUILD_ROOT);
      String buildDir = System.getProperty(TestCaseUtils.PROPERTY_BUILD_DIR,
              buildRoot + File.separator + "build");
      String path = buildDir + File.separator +
        "unit-tests" + File.separator + "package-instance" + File.separator +
        "config" + File.separator + "schema" + File.separator +
        "99-user.ldif";
      final File schemaFile = getSchemaFile();
      // it is necessary to loop on this check because the state is not
      // written immediately but only every so often.
      int count = 0;
      while (true)
      TestTimer timer = new TestTimer.Builder()
        .maxSleep(5, SECONDS)
        .sleepTimes(100, MILLISECONDS)
        .toTimer();
      timer.repeatUntilSuccess(new Callable<Void>()
      {
        File file = new File(path);
        FileInputStream input = new FileInputStream(file);
        byte[] bytes = new byte[input.available()];
        input.read(bytes);
        String fileStr = new String(bytes);
        if (fileStr.contains(stateStr))
        @Override
        public Void call() throws Exception
        {
          break;
          String fileStr = readAsString(schemaFile);
          assertTrue(fileStr.contains(stateStr), "The Schema persistentState (CSN:" + stateStr
              + ") has not been saved to " + schemaFile + " : " + fileStr);
          return null;
        }
        assertTrue(count++ <= 50, "The Schema persistentState (CSN:" + stateStr
            + ") has not been saved to " + path + " : " + fileStr);
        TestCaseUtils.sleep(100);
      }
      });
    } finally
    {
      broker.stop();
    }
    logger.error(LocalizableMessage.raw("Ending replication test : pushSchemaFilesChange "));
  }
  private File getSchemaFile()
  {
    String sep = File.separator;
    String buildRoot = System.getProperty(TestCaseUtils.PROPERTY_BUILD_ROOT);
    String buildDir = System.getProperty(TestCaseUtils.PROPERTY_BUILD_DIR, buildRoot + sep + "target");
    final String path = buildDir + sep
        + "unit-tests" + sep + "package-instance" + sep + "config" + sep + "schema" + sep + "99-user.ldif";
    return new File(path);
  }
  private ModifyMsg receiveModifyMsg(ReplicationBroker broker) throws SocketTimeoutException
  {
    ReplicationMsg msg = broker.receive();
    Assertions.assertThat(msg).isInstanceOf(ModifyMsg.class);
    return (ModifyMsg) msg;
  }
  private void assertModReceived(Modification mod, final DN baseDN, ModifyMsg modMsg) throws Exception
  {
    Operation receivedOp = modMsg.createOperation(connection);
    assertEquals(modMsg.getDN(), baseDN, "The received message is not for cn=schema");
    Assertions.assertThat(receivedOp).isInstanceOf(ModifyOperation.class);
    ModifyOperation receivedModifyOperation = (ModifyOperation) receivedOp;
    this.rcvdMods = new ArrayList<>();
    for (RawModification m : receivedModifyOperation.getRawModifications())
    {
      this.rcvdMods.add(m.toModification());
    }
    Assertions.assertThat(this.rcvdMods)
      .as("The received mod does not contain the original change")
      .contains(mod);
  }
  private String readAsString(File file) throws FileNotFoundException, IOException
  {
    FileInputStream input = new FileInputStream(file);
    byte[] bytes = new byte[input.available()];
    input.read(bytes);
    return new String(bytes);
  }
}