mirror of https://github.com/OpenIdentityPlatform/OpenDJ.git

pgamba
29.14.2007 c8816d7020a4bf6dc3cd5b116b40a94bf7acabc3
opendj-sdk/opends/src/server/org/opends/server/synchronization/changelog/Changelog.java
@@ -26,13 +26,23 @@
 */
package org.opends.server.synchronization.changelog;
import static org.opends.server.loggers.Error.logError;
import static org.opends.server.messages.MessageHandler.getMessage;
import static org.opends.server.synchronization.common.LogMessages.*;
import static org.opends.server.util.StaticUtils.getFileForPath;
import java.io.File;
import java.io.IOException;
import java.net.InetAddress;
import java.net.InetSocketAddress;
import java.net.ServerSocket;
import java.net.Socket;
import java.net.UnknownHostException;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Set;
import com.sleepycat.je.DatabaseException;
import org.opends.server.api.ConfigurableComponent;
import org.opends.server.api.DirectoryThread;
import org.opends.server.config.ConfigAttribute;
@@ -48,17 +58,7 @@
import org.opends.server.types.ErrorLogCategory;
import org.opends.server.types.ErrorLogSeverity;
import static org.opends.server.loggers.Error.logError;
import static org.opends.server.messages.MessageHandler.getMessage;
import static org.opends.server.synchronization.common.LogMessages.*;
import java.net.InetAddress;
import java.net.InetSocketAddress;
import java.net.ServerSocket;
import java.net.Socket;
import java.net.UnknownHostException;
import java.io.File;
import java.io.IOException;
import com.sleepycat.je.DatabaseException;
/**
 * Changelog Listener.
@@ -99,12 +99,14 @@
  private ChangelogDbEnv dbEnv;
  private int rcvWindow;
  private int queueSize;
  private String dbDirname = null;
  static final String CHANGELOG_SERVER_ATTR = "ds-cfg-changelog-server";
  static final String SERVER_ID_ATTR = "ds-cfg-changelog-server-id";
  static final String CHANGELOG_PORT_ATTR = "ds-cfg-changelog-port";
  static final String WINDOW_SIZE_ATTR = "ds-cfg-window-size";
  static final String QUEUE_SIZE_ATTR = "ds-cfg-changelog-max-queue-size";
  static final String CHANGELOG_DIR_PATH_ATTR = "ds-cfg-changelog-db-dirname";
  static final IntegerConfigAttribute changelogPortStub =
    new IntegerConfigAttribute(CHANGELOG_PORT_ATTR, "changelog port",
@@ -128,6 +130,11 @@
    new IntegerConfigAttribute(QUEUE_SIZE_ATTR, "changelog queue size",
                               false, false, false, true, 0, false, 0);
  static final StringConfigAttribute dbDirnameStub =
    new StringConfigAttribute(CHANGELOG_DIR_PATH_ATTR,
        "changelog storage directory path", false,
        false, true);
  /**
   * Check if a ConfigEntry is valid.
   * @param config The config entry that needs to be checked.
@@ -263,6 +270,35 @@
      configAttributes.add(queueSizeAttr);
    }
    /*
     * read the storage directory path attribute
     */
    StringConfigAttribute dbDirnameAttr =
      (StringConfigAttribute) config.getConfigAttribute(dbDirnameStub);
    if (dbDirnameAttr == null)
    {
      dbDirname = "changelogDb";
    }
    else
    {
      dbDirname = dbDirnameAttr.activeValue();
      configAttributes.add(changelogServer);
    }
    // Exists or Create
    File f = getFileForPath(dbDirname);
    try
    {
      if (!f.exists())
      {
        f.mkdir();
      }
    }
    catch (Exception e)
    {
      throw new ConfigException(MSGID_FILE_CHECK_CREATE_FAILED,
          e.getMessage() + " " + getFileForPath(dbDirname));
    }
    initialize(changelogServerId, changelogPort);
    configDn = config.getDN();
@@ -442,10 +478,8 @@
    {
      /*
       * Initialize the changelog database.
       * TODO : the changelog db path should be configurable
       */
      dbEnv = new ChangelogDbEnv(
          DirectoryServer.getServerRoot() + File.separator + "changelogDb",
      dbEnv = new ChangelogDbEnv(getFileForPath(dbDirname).getAbsolutePath(),
          this);
      /*
@@ -475,13 +509,13 @@
    } catch (DatabaseException e)
    {
      int msgID = MSGID_COULD_NOT_INITIALIZE_DB;
      String message = getMessage(msgID, "changelogDb");
      String message = getMessage(msgID, dbDirname);
      logError(ErrorLogCategory.SYNCHRONIZATION, ErrorLogSeverity.SEVERE_ERROR,
               message, msgID);
    } catch (ChangelogDBException e)
    {
      int msgID = MSGID_COULD_NOT_READ_DB;
      String message = getMessage(msgID, "changelogDb");
      String message = getMessage(msgID, dbDirname);
      message += getMessage(e.getMessageID());
      logError(ErrorLogCategory.SYNCHRONIZATION, ErrorLogSeverity.SEVERE_ERROR,
               message, msgID);
opendj-sdk/opends/src/server/org/opends/server/synchronization/common/LogMessages.java
@@ -22,7 +22,7 @@
 * CDDL HEADER END
 *
 *
 *      Portions Copyright 2006 Sun Microsystems, Inc.
 *      Portions Copyright 2006-2007 Sun Microsystems, Inc.
 */
package org.opends.server.synchronization.common;
@@ -265,6 +265,15 @@
    CATEGORY_MASK_SYNC | SEVERITY_MASK_MILD_ERROR | 35;
  /**
   * Failure when test existence or try to create directory
   * for the changelog database.  This message takes one
   * string argument containing details of the exception
   * and path of the directory.
   */
  public static final int MSGID_FILE_CHECK_CREATE_FAILED =
    CATEGORY_MASK_SYNC | SEVERITY_MASK_MILD_ERROR | 36;
  /**
   * Register the messages from this class in the core server.
   *
   */
@@ -354,6 +363,9 @@
        MSGID_EXCEPTION_RECEIVING_SYNCHRONIZATION_MESSAGE,
        "An Exception was caught while receiving synchronization message : %s");
    MessageHandler.registerMessage(MSGID_LOOP_REPLAYING_OPERATION,
         "A loop was detected while replaying operation: %s");
        "A loop was detected while replaying operation: %s");
    MessageHandler.registerMessage(MSGID_FILE_CHECK_CREATE_FAILED,
        "An Exception was caught while testing existence or trying " +
        " to create the directory for the changelog database : %s");
  }
}
opendj-sdk/opends/tests/unit-tests-testng/src/server/org/opends/server/synchronization/changelog/ChangelogTest.java
@@ -26,25 +26,38 @@
 */
package org.opends.server.synchronization.changelog;
import static org.opends.server.synchronization.protocol.OperationContext.SYNCHROCONTEXT;
import static org.testng.Assert.assertTrue;
import static org.testng.Assert.fail;
import java.net.ServerSocket;
import java.util.ArrayList;
import java.util.List;
import org.opends.server.TestCaseUtils;
import org.opends.server.config.ConfigEntry;
import org.opends.server.core.DirectoryServer;
import org.opends.server.core.ModifyDNOperation;
import org.opends.server.synchronization.SynchronizationTestCase;
import org.opends.server.synchronization.common.ChangeNumber;
import org.opends.server.synchronization.common.ChangeNumberGenerator;
import org.opends.server.synchronization.common.ServerState;
import org.opends.server.synchronization.plugin.ChangelogBroker;
import org.opends.server.synchronization.protocol.AddMsg;
import org.opends.server.synchronization.protocol.DeleteMsg;
import org.opends.server.synchronization.protocol.ModifyDNMsg;
import org.opends.server.synchronization.protocol.ModifyDnContext;
import org.opends.server.synchronization.protocol.ModifyMsg;
import org.opends.server.synchronization.protocol.SynchronizationMessage;
import org.opends.server.types.Attribute;
import org.opends.server.types.DN;
import org.opends.server.types.Entry;
import org.opends.server.types.Modification;
import org.opends.server.types.ModificationType;
import org.opends.server.types.RDN;
import org.opends.server.util.TimeThread;
import org.testng.annotations.AfterClass;
import org.testng.annotations.BeforeClass;
import org.testng.annotations.Test;
import static org.testng.Assert.*;
/**
 * Tests for the changelog service code.
@@ -447,4 +460,218 @@
      }
    }
  }
  /**
   * Chaining tests of the changelog code with 2 changelog servers involved
   * 2 tests are done here (itest=0 or itest=1)
   *
   * Test 1
   * - Create changelog server 1
   * - Create changelog server 2 connected with changelog server 1
   * - Create and connect client 1 to changelog server 1
   * - Create and connect client 2 to changelog server 2
   * - Make client1 publish changes
   * - Check that client 2 receives the changes published by client 1
   *
   * Test 2
   * - Create changelog server 1
   * - Create and connect client1 to changelog server 1
   * - Make client1 publish changes
   * - Create changelog server 2 connected with changelog server 1
   * - Create and connect client 2 to changelog server 2
   * - Check that client 2 receives the changes published by client 1
   *
   */
  @Test(enabled=true)
  public void changelogChaining() throws Exception
  {
    for (int itest = 0; itest <2; itest++)
    {
      ChangelogBroker broker2 = null;
      boolean emptyOldChanges = true;
      // - Create 2 connected changelog servers
      Changelog[] changelogs = new Changelog[2];
      int[] changelogPorts = new int[2];
      int[] changelogIds = new int[2];
      short[] brokerIds = new short[2];
      ServerSocket socket = null;
      // Find 2 free ports
      for (int i = 0; i <= 1; i++)
      {
        // find  a free port
        socket = TestCaseUtils.bindFreePort();
        changelogPorts[i] = socket.getLocalPort();
        changelogIds[i] = i + 10;
        brokerIds[i] = (short) (100+i);
        if ((itest==0) || (i ==0))
          socket.close();
      }
      for (int i = 0; i <= ((itest == 0) ? 1 : 0); i++)
      {
        changelogs[i] = null;
        // for itest=0, create the 2 connected changelog servers
        // for itest=1, create the 1rst changelog server, the second
        // one will be created later
        String changelogLdif = "dn: cn=Changelog Server\n"
          + "objectClass: top\n"
          + "objectClass: ds-cfg-synchronization-changelog-server-config\n"
          + "cn: Changelog Server\n"
          + "ds-cfg-changelog-port: " + changelogPorts[i] + "\n"
          + "ds-cfg-changelog-server: localhost:" + ((i == 0) ? changelogPorts[1] : changelogPorts[0]) + "\n"
          + "ds-cfg-changelog-server-id: " + changelogIds[0] + "\n"
          + "ds-cfg-window-size: 100" + "\n"
          + "ds-cfg-changelog-db-dirname: changelogDb"+i;
        Entry tmp = TestCaseUtils.entryFromLdifString(changelogLdif);
        ConfigEntry changelogConfig = new ConfigEntry(tmp, null);
        changelogs[i] = new Changelog(changelogConfig);
      }
      ChangelogBroker broker1 = null;
      try
      {
        // For itest=0, create and connect client1 to changelog1
        //              and client2 to changelog2
        // For itest=1, only create and connect client1 to changelog1
        //              client2 will be created later
        broker1 = openChangelogSession(DN.decode("dc=example,dc=com"),
            (short) brokerIds[0], 100, changelogPorts[0], 1000, !emptyOldChanges);
        if (itest == 0)
        {
          broker2 = openChangelogSession(DN.decode("dc=example,dc=com"),
              (short) brokerIds[1], 100, changelogPorts[0], 1000, !emptyOldChanges);
        }
        // - Test messages between clients by publishing now
        // - Delete
        long time = TimeThread.getTime();
        int ts = 1;
        ChangeNumber cn = new ChangeNumber((long) time, ts++, (short) brokerIds[0]);
        DeleteMsg delMsg = new DeleteMsg("o=test"+itest+",dc=example,dc=com", cn, "uid");
        broker1.publish(delMsg);
        String user1entryUUID = "33333333-3333-3333-3333-333333333333";
        String baseUUID = "22222222-2222-2222-2222-222222222222";
        // - Add
        String lentry = new String("dn: dc=example,dc=com\n"
            + "objectClass: top\n" + "objectClass: domain\n"
            + "entryUUID: 11111111-1111-1111-1111-111111111111\n");
        Entry entry = TestCaseUtils.entryFromLdifString(lentry);
        cn = new ChangeNumber((long) time, ts++, (short) brokerIds[0]);
        AddMsg addMsg = new AddMsg(cn, "o=test,dc=example,dc=com",
            user1entryUUID, baseUUID, entry.getObjectClassAttribute(), entry
            .getAttributes(), new ArrayList<Attribute>());
        broker1.publish(addMsg);
        // - Modify
        Attribute attr1 = new Attribute("description", "new value");
        Modification mod1 = new Modification(ModificationType.REPLACE, attr1);
        List<Modification> mods = new ArrayList<Modification>();
        mods.add(mod1);
        cn = new ChangeNumber((long) time, ts++, (short) brokerIds[0]);
        ModifyMsg modMsg = new ModifyMsg(cn, DN
            .decode("o=test,dc=example,dc=com"), mods, "fakeuniqueid");
        broker1.publish(modMsg);
        // - ModifyDN
        cn = new ChangeNumber((long) time, ts++, (short) brokerIds[0]);
        ModifyDNOperation op = new ModifyDNOperation(connection, 1, 1, null, DN
            .decode("o=test,dc=example,dc=com"), RDN.decode("o=test2"), true,
            null);
        op.setAttachment(SYNCHROCONTEXT, new ModifyDnContext(cn, "uniqueid",
        "newparentId"));
        ModifyDNMsg modDNMsg = new ModifyDNMsg(op);
        broker1.publish(modDNMsg);
        if (itest > 0)
        {
          socket.close();
          String changelogLdif = "dn: cn=Changelog Server\n"
            + "objectClass: top\n"
            + "objectClass: ds-cfg-synchronization-changelog-server-config\n"
            + "cn: Changelog Server\n"
            + "ds-cfg-changelog-port: " + changelogPorts[1] + "\n"
            + "ds-cfg-changelog-server: localhost:" + changelogPorts[0] + "\n"
            + "ds-cfg-changelog-server-id: " + changelogIds[1] + "\n";
          Entry tmp = TestCaseUtils.entryFromLdifString(changelogLdif);
          ConfigEntry changelogConfig = new ConfigEntry(tmp, null);
          changelogs[1] = new Changelog(changelogConfig);
          // Connect broker 2 to changelog2
          broker2 = openChangelogSession(DN.decode("dc=example,dc=com"),
              (short) brokerIds[1], 100, changelogPorts[1], 2000, !emptyOldChanges);
        }
        // - Check msg receives by broker, through changeLog2
        while (ts > 1)
        {
          SynchronizationMessage msg2;
          try
          {
            msg2 = broker2.receive();
            if (msg2 == null)
              break;
          }
          catch (Exception e)
          {
            fail("Broker receive failed: " + e.getMessage() + "#Msg:" + ts + "#itest:" + itest);
            break;
          }
          if (msg2 instanceof DeleteMsg)
          {
            DeleteMsg delMsg2 = (DeleteMsg) msg2;
            if (delMsg2.toString().equals(delMsg.toString()))
              ts--;
          }
          else if (msg2 instanceof AddMsg)
          {
            AddMsg addMsg2 = (AddMsg) msg2;
            if (addMsg2.toString().equals(addMsg.toString()))
              ts--;
          }
          else if (msg2 instanceof ModifyMsg)
          {
            ModifyMsg modMsg2 = (ModifyMsg) msg2;
            if (modMsg.equals(modMsg2))
              ts--;
          }
          else if (msg2 instanceof ModifyDNMsg)
          {
            ModifyDNMsg modDNMsg2 = (ModifyDNMsg) msg2;
            if (modDNMsg.equals(modDNMsg2))
              ts--;
          }
          else
          {
            fail("Changelog transmission failed: no expected message class.");
            break;
          }
        }
        // Check that everything expected has been received
        assertTrue(ts == 1, "Broker2 did not receive the complete set of"
            + " expected messages: #msg received " + ts);
      }
      finally
      {
        if (changelogs[0] != null)
          changelogs[0].shutdown();
        if (changelogs[1] != null)
          changelogs[1].shutdown();
        if (broker1 != null)
          broker1.stop();
        if (broker2 != null)
          broker2.stop();
      }
    }
  }
}