mirror of https://github.com/OpenIdentityPlatform/OpenDJ.git

ludovicp
27.28.2010 a5c5efbf8ca56c059709953f7fedb647dadaed06
opends/tests/unit-tests-testng/src/server/org/opends/server/replication/InitOnLineTest.java
@@ -22,7 +22,7 @@
 * CDDL HEADER END
 *
 *
 *      Copyright 2006-2009 Sun Microsystems, Inc.
 *      Copyright 2006-2010 Sun Microsystems, Inc.
 */
package org.opends.server.replication;
@@ -61,10 +61,13 @@
import org.opends.server.protocols.internal.InternalClientConnection;
import org.opends.server.protocols.internal.InternalSearchOperation;
import org.opends.server.replication.service.ReplicationBroker;
import org.opends.server.replication.common.ServerStatus;
import org.opends.server.replication.plugin.LDAPReplicationDomain;
import org.opends.server.replication.protocol.DoneMsg;
import org.opends.server.replication.protocol.EntryMsg;
import org.opends.server.replication.protocol.ErrorMsg;
import org.opends.server.replication.protocol.AddMsg;
import org.opends.server.replication.protocol.HeartbeatThread;
import org.opends.server.replication.protocol.InitializeRequestMsg;
import org.opends.server.replication.protocol.InitializeTargetMsg;
import org.opends.server.replication.protocol.ReplicationMsg;
@@ -146,9 +149,11 @@
  boolean emptyOldChanges = true;
  LDAPReplicationDomain replDomain = null;
  int initWindow = 100;
  private void log(String s)
  {
    logError(Message.raw(Category.SYNC, Severity.INFORMATION,
    logError(Message.raw(Category.SYNC, Severity.NOTICE,
        "InitOnLineTests/" + s));
    if (debugEnabled())
    {
@@ -183,7 +188,8 @@
    // clear it.
    LDAPReplicationDomain.clearJEBackend(false, "userRoot", EXAMPLE_DN);
    updatedEntries = newLDIFEntries();
    // For most tests, a limited number of entries is enough
    updatedEntries = newLDIFEntries(2);
    // Create an internal connection in order to provide operations
    // to DS to populate the db -
@@ -283,6 +289,7 @@
  private void waitTaskCompleted(Entry taskEntry, TaskState expectedState,
      long expectedLeft, long expectedDone)
  {
    log("waitTaskCompleted " + taskEntry.toLDIFString());
    try
    {
      // FIXME - Factorize with TasksTestCase
@@ -398,6 +405,22 @@
      for (String ldifEntry : updatedEntries)
      {
        Entry entry = TestCaseUtils.entryFromLdifString(ldifEntry);
        addTestEntryToDB(entry);
        // They will be removed at the end of the test
        entryList.addLast(entry.getDN());
      }
      log("addTestEntriesToDB : " + updatedEntries.length + " successfully added to DB");
    }
    catch(Exception e)
    {
      fail("addEntries Exception:"+ e.getMessage() + " " + stackTraceToSingleLineString(e));
    }
  }
  private void addTestEntryToDB(Entry entry)
  {
    try
    {
        AddOperationBasis addOp = new AddOperationBasis(connection,
            InternalClientConnection.nextOperationID(), InternalClientConnection
            .nextMessageID(), null, entry.getDN(), entry.getObjectClasses(),
@@ -411,18 +434,17 @@
        // They will be removed at the end of the test
        entryList.addLast(entry.getDN());
      }
    }
    catch(Exception e)
    {
      fail("addEntries Exception:"+ e.getMessage() + " " + stackTraceToSingleLineString(e));
      fail("addTestEntryToDB Exception:"+ e.getMessage() + " " + stackTraceToSingleLineString(e));
    }
  }
  /*
   * Creates entries necessary to the test.
   */
  private String[] newLDIFEntries()
  private String[] newLDIFEntries(int entriesCnt)
  {
    // It is relevant to test ReplLDIFInputStream
    // and ReplLDIFOutputStream with big entries
@@ -430,46 +452,76 @@
    for (int i=0; i<bigAttributeValue.length; i++)
      bigAttributeValue[i] = Integer.toString(i).charAt(0);
    String[] entries =
    {
    String[] entries = new String[entriesCnt + 2];
    String filler = "000000000000000000000000000000000000";
    entries[0] = new String(
        "dn: " + EXAMPLE_DN + "\n"
        + "objectClass: top\n"
        + "objectClass: domain\n"
        + "dc: example\n"
        + "entryUUID: 21111111-1111-1111-1111-111111111111\n"
        + "\n",
        + "\n");
    entries[1] = new String(
          "dn: ou=People," + EXAMPLE_DN + "\n"
        + "objectClass: top\n"
        + "objectClass: organizationalUnit\n"
        + "ou: People\n"
        + "entryUUID: 21111111-1111-1111-1111-111111111112\n"
        + "\n",
          "dn: cn=Fiona Jensen,ou=people," + EXAMPLE_DN + "\n"
        + "\n");
    for (int i=0; i<entriesCnt; i++)
    {
      String useri="0000"+i;
      entries[i+2] = new String(
          "dn: cn="+useri+",ou=people," + EXAMPLE_DN + "\n"
        + "objectclass: top\n"
        + "objectclass: person\n"
        + "objectclass: organizationalPerson\n"
        + "objectclass: inetOrgPerson\n"
        + "cn: Fiona Jensen\n"
        + "sn: Jensen\n"
        + "uid: fiona\n"
        + "cn: "+useri+"_cn"+"\n"
        + "sn: "+useri+"_sn"+"\n"
        + "uid: "+useri+"_uid"+"\n"
        + "telephonenumber:: "+ Base64.encode(
            new String(bigAttributeValue).getBytes())+"\n"
        + "entryUUID: 21111111-1111-1111-1111-111111111113\n"
        + "\n",
          "dn: cn=Robert Langman,ou=people," + EXAMPLE_DN + "\n"
        + "entryUUID: 21111111-1111-1111-1111-"+useri+
        filler.substring(0, 12-useri.length())+"\n"
        + "\n");
    };
    return entries;
  }
  /*
   * Creates a single LDIF entry, numbered entryCnt, necessary to the test.
   */
  private String newLDIFEntry(int entryCnt)
  {
    // It is relevant to test ReplLDIFInputStream
    // and ReplLDIFOutputStream with big entries
    char bigAttributeValue[] = new char[30240];
    for (int i=0; i<bigAttributeValue.length; i++)
      bigAttributeValue[i] = Integer.toString(i).charAt(0);
    String filler = "000000000000000000000000000000000000";
    String useri="0000"+entryCnt;
    return  new String(
        "dn: cn="+useri+",ou=people," + EXAMPLE_DN + "\n"
        + "objectclass: top\n"
        + "objectclass: person\n"
        + "objectclass: organizationalPerson\n"
        + "objectclass: inetOrgPerson\n"
        + "cn: Robert Langman\n"
        + "sn: Langman\n"
        + "uid: robert\n"
        + "telephonenumber: "+ new String(bigAttributeValue)+"\n"
        + "entryUUID: 21111111-1111-1111-1111-111111111114\n"
        + "\n"
        };
        + "cn: "+useri+"_cn"+"\n"
        + "sn: "+useri+"_sn"+"\n"
        + "uid: "+useri+"_uid"+"\n"
        + "telephonenumber:: "+ Base64.encode(
            new String(bigAttributeValue).getBytes())+"\n"
            + "entryUUID: 21111111-1111-1111-1111-"+useri+
            filler.substring(0, 12-useri.length())+"\n"
            + "\n");
    return entries;
  }
  /**
@@ -488,15 +540,16 @@
      RoutableMsg initTargetMessage =
        new InitializeTargetMsg(
          EXAMPLE_DN, server2ID, destinationServerID, requestorID,
          updatedEntries.length);
          updatedEntries.length, initWindow);
      broker.publish(initTargetMessage);
      int cnt = 0;
      for (String entry : updatedEntries)
      {
        log("Broker will pusblish 1 entry: bytes:"+ entry.length());
        log("Broker will publish 1 entry: bytes:"+ entry.length());
        EntryMsg entryMsg = new EntryMsg(senderID, destinationServerID,
            entry.getBytes());
            entry.getBytes(), ++cnt);
        broker.publish(entryMsg);
      }
@@ -559,7 +612,7 @@
      }
      catch (SocketTimeoutException e)
      {
        log("SocketTimeoutException while waiting fro entries" +
        log("SocketTimeoutException while waiting for entries" +
            stackTraceToSingleLineString(e));
      }
      catch(Exception e)
@@ -571,6 +624,11 @@
    assertTrue(entriesReceived == updatedEntries.length,
        " Received entries("+entriesReceived +
        ") == Expected entries("+updatedEntries.length+")");
    broker.setGenerationID(EMPTY_DN_GENID);
    broker.reStart(true);
    try { Thread.sleep(500); } catch(Exception e) {}
  }
  /**
@@ -643,6 +701,11 @@
   */
  private void connectServer1ToChangelog(int changelogID)
  {
    // Convenience overload: delegate with a heartbeat of 0, for which the
    // two-argument variant adds no ds-cfg-heartbeat-interval attribute to
    // the domain configuration entry.
    connectServer1ToChangelog(changelogID, 0);
  }
  private void connectServer1ToChangelog(int changelogID, int heartbeat)
  {
    // Connect DS to the replicationServer
    try
    {
@@ -651,7 +714,7 @@
      String synchroServerLdif =
        "dn: cn=" + testName + ", cn=domains," + SYNCHRO_PLUGIN_DN + "\n"
      + "objectClass: top\n"
      + "objectC7lass: ds-cfg-synchronization-provider\n"
      + "objectClass: ds-cfg-synchronization-provider\n"
      + "objectClass: ds-cfg-replication-domain\n"
      + "cn: " + testName + "\n"
      + "ds-cfg-base-dn: " + EXAMPLE_DN + "\n"
@@ -659,7 +722,7 @@
      + getChangelogPort(changelogID)+"\n"
      + "ds-cfg-server-id: " + server1ID + "\n"
      + "ds-cfg-receive-status: true\n"
//    + "ds-cfg-heartbeat-interval: 0 ms\n"
      + (heartbeat>0?"ds-cfg-heartbeat-interval: "+heartbeat+" ms\n":"")
      + "ds-cfg-window-size: " + WINDOW_SIZE;
@@ -706,11 +769,19 @@
  /**
   * Tests the import side of the Initialize task
   * Test steps :
   * - create a task 'InitFromS2' in S1
   * - make S2 export its entries
   * - test that S1 has successfully imported the entries and completed the task.
   *
   * TODO: Error case: make S2 crash/disconnect in the middle of the export
   * and test that, on S1 side, the task ends with an error.
   * State of the backend on S1 partially initialized: ?
   */
  @Test(enabled=true, groups="slow")
  public void initializeImport() throws Exception
  {
    String testCase = "initializeImport";
    String testCase = "initializeImport ";
    log("Starting "+testCase);
@@ -740,8 +811,8 @@
      InitializeRequestMsg initMsg = (InitializeRequestMsg)msg;
      // S2 publishes entries to S1
      makeBrokerPublishEntries(server2, server2ID, initMsg.getsenderID(),
          initMsg.getsenderID());
      makeBrokerPublishEntries(server2, server2ID, initMsg.getSenderID(),
          initMsg.getSenderID());
      // Wait for task (import) completion in S1
      waitTaskCompleted(taskInitFromS2, TaskState.COMPLETED_SUCCESSFULLY,
@@ -757,12 +828,16 @@
      fail(testCase + " Exception:"+ e.getMessage() + " " + stackTraceToSingleLineString(e));
    } finally
    {
      afterTest();
      afterTest(testCase);
    }
  }
  /**
   * Tests the export side of the Initialize task
   * Test steps :
   * - add entries in S1, make S2 publish InitRequest
   * - test that S1 has successfully exported the entries (by receiving them
   *   on S2 side).
   */
  @Test(enabled=true, groups="slow")
  public void initializeExport() throws Exception
@@ -789,20 +864,27 @@
      // Thread.sleep(3000);
      InitializeRequestMsg initMsg = new InitializeRequestMsg(EXAMPLE_DN,
        server2ID, server1ID);
        server2ID, server1ID, 100);
      server2.publish(initMsg);
      // Signal RS we just entered the full update status
      server2.signalStatusChange(ServerStatus.FULL_UPDATE_STATUS);
      receiveUpdatedEntries(server2, server2ID, updatedEntries);
      log("Successfully ending " + testCase);
    } finally
    {
      afterTest();
      afterTest(testCase);
    }
}
  /**
   * Tests the import side of the InitializeTarget task
   * Test steps :
   * - add entries in S1 and create a task 'InitTargetS2' in S1
   * - wait task completed
   * - test that S2 has successfully received the entries
   */
  @Test(enabled=true, groups="slow")
  public void initializeTargetExport() throws Exception
@@ -832,21 +914,33 @@
      // Launch in S1 the task that will initialize S2
      addTask(taskInitTargetS2, ResultCode.SUCCESS, null);
      // Wait for task completion
      waitTaskState(taskInitTargetS2, TaskState.COMPLETED_SUCCESSFULLY, null);
      // Signal RS we just entered the full update status
      server2.signalStatusChange(ServerStatus.FULL_UPDATE_STATUS);
      // Tests that entries have been received by S2
      receiveUpdatedEntries(server2, server2ID, updatedEntries);
      // Wait for task completion
      waitTaskState(taskInitTargetS2, TaskState.COMPLETED_SUCCESSFULLY, null);
      log("Successfully ending " + testCase);
    } finally
    {
      afterTest();
      afterTest(testCase);
    }
  }
  /**
   * Tests the import side of the InitializeTarget task
   * Test steps :
   * - addEntries in S1, create a task 'InitAll' in S1
   * - wait task completed on S1
   * - test that S2 and S3 have successfully imported the entries.
   *
   * TODO: Error case: make S1 crash in the middle of the export and test that
   * the task ends with an error. State of the backend on both S2 and S3: ?
   *
   * TODO: Error case: make S2 crash in the middle of the import and test what??
   */
  @Test(enabled=true, groups="slow")
  public void initializeTargetExportAll() throws Exception
@@ -879,17 +973,22 @@
      // Launch in S1 the task that will initialize S2
      addTask(taskInitTargetAll, ResultCode.SUCCESS, null);
      // Wait for task completion
      waitTaskState(taskInitTargetAll, TaskState.COMPLETED_SUCCESSFULLY, null);
      // Tests that entries have been received by S2
      // Signal RS we just entered the full update status
      server2.signalStatusChange(ServerStatus.FULL_UPDATE_STATUS);
      server3.signalStatusChange(ServerStatus.FULL_UPDATE_STATUS);
      receiveUpdatedEntries(server2, server2ID, updatedEntries);
      receiveUpdatedEntries(server3, server3ID, updatedEntries);
      // Wait for task completion
      waitTaskState(taskInitTargetAll, TaskState.COMPLETED_SUCCESSFULLY, null);
      log("Successfully ending " + testCase);
    } finally
    {
      afterTest();
      afterTest(testCase);
    }
  }
@@ -921,13 +1020,20 @@
      // wait until the replication domain has expected generationID
      // this should indicate that the import occurred correctly.
      for (int count = 0; count < 100; count++)
      for (int count = 0; count < 120; count++)
      {
        if (replDomain.getGenerationID() == 56869)
        if (replDomain.getGenerationID() == 53235)
          break;
        Thread.sleep(200);
        log(testCase + " genId=" + replDomain.getGenerationID());
        Thread.sleep(1000);
      }
      if (replDomain.getGenerationID() != 53235)
      {
        fail(testCase + " Import success waited longer than expected \n" +
            TestCaseUtils.threadStacksToString());
      }
      // Test that entries have been imported in S1
      testEntriesInDb();
@@ -938,7 +1044,7 @@
      fail(testCase + " Exception:"+ e.getMessage() + " " + stackTraceToSingleLineString(e));
    } finally
    {
      afterTest();
      afterTest(testCase);
    }
  }
@@ -993,7 +1099,7 @@
      fail(testCase + " Exception:"+ e.getMessage() + " " + stackTraceToSingleLineString(e));
    } finally
    {
      afterTest();
      afterTest(testCase);
    }
  }
@@ -1052,7 +1158,8 @@
          "ds-task-initialize-domain-dn: " + baseDn,
          "ds-task-initialize-replica-server-id: -3");
      addTask(taskInit, ResultCode.OTHER,
          ERR_INVALID_IMPORT_SOURCE.get());
          ERR_INVALID_IMPORT_SOURCE.get(baseDn.toNormalizedString(),
              Integer.toString(server1ID),"-3",""));
      // Scope containing a serverID absent from the domain
      // createTask(taskInitTargetS2);
@@ -1064,7 +1171,7 @@
      fail(testCase + " Exception:"+ e.getMessage() + " " + stackTraceToSingleLineString(e));
    } finally
    {
      afterTest();
      afterTest(testCase);
    }
  }
@@ -1156,23 +1263,24 @@
    // TODO Test ReplicationServerDomain.getDestinationServers method.
      log("Successfully ending " + testCase);
    } finally
    {
      if (broker2 != null)
        broker2.stop();
      if (broker3 != null)
        broker3.stop();
      afterTest();
      afterTest(testCase);
    }
  }
  @Test(enabled=true, groups="slow")
  public void initializeTargetExportMultiSS() throws Exception
  {
    String testCase = "initializeTargetExportMultiSS";
    try
    {
      String testCase = "initializeTargetExportMultiSS";
      log("Starting " + testCase);
      // Create 2 changelogs
@@ -1190,6 +1298,7 @@
      // connected to changelog2
      if (server2 == null)
      {
        log(testCase + " Will connect server 2 to " + changelog2ID);
        server2 = openReplicationSession(DN.decode(EXAMPLE_DN),
            server2ID, 100, getChangelogPort(changelog2ID), 1000, emptyOldChanges);
      }
@@ -1197,19 +1306,40 @@
     // Thread.sleep(1000);
      // Launch in S1 the task that will initialize S2
      log(testCase + " add task " + Thread.currentThread());
      addTask(taskInitTargetS2, ResultCode.SUCCESS, null);
      // Wait for task completion
      waitTaskState(taskInitTargetS2, TaskState.COMPLETED_SUCCESSFULLY, null);
      log(testCase + " " + server2.getServerId() + " wait target " + Thread.currentThread());
      ReplicationMsg msgrcv;
      do
      {
        msgrcv = server2.receive();
        log(testCase + " " + server2.getServerId() + " receives " + msgrcv);
      }
      while(!(msgrcv instanceof InitializeTargetMsg));
      assertTrue(msgrcv instanceof InitializeTargetMsg, msgrcv.getClass().getCanonicalName());
      // Signal RS we just entered the full update status
      log(testCase + " change status");
      server2.signalStatusChange(ServerStatus.FULL_UPDATE_STATUS);
      // Tests that entries have been received by S2
      log(testCase + " receive entries");
      receiveUpdatedEntries(server2, server2ID, updatedEntries);
      // Wait for task completion
      log(testCase + " wait task completed");
      waitTaskState(taskInitTargetS2, TaskState.COMPLETED_SUCCESSFULLY, null);
      log("Successfully ending " + testCase);
    }
    catch(Exception e)
    {
      log(testCase + e.getLocalizedMessage());
    }
    finally
    {
      afterTest();
      afterTest(testCase);
    }
  }
@@ -1263,17 +1393,34 @@
      // S3 sends init request
      log(testCase + " server 3 Will send reqinit to " + server1ID);
      InitializeRequestMsg initMsg =
        new InitializeRequestMsg(EXAMPLE_DN, server3ID, server1ID);
        new InitializeRequestMsg(EXAMPLE_DN, server3ID, server1ID, 100);
      server3.publish(initMsg);
      // S3 should receive target, entries & done
      log(testCase + " Wait for InitializeTargetMsg");
      ReplicationMsg msgrcv = null;
      do
      {
        msgrcv = server3.receive();
        log(testCase + " receives  "+ msgrcv);
      }
      while (!(msgrcv instanceof InitializeTargetMsg));
      assertTrue(msgrcv instanceof InitializeTargetMsg,msgrcv.getClass().getCanonicalName() +
      msgrcv);
      // Signal RS we just entered the full update status
      server3.signalStatusChange(ServerStatus.FULL_UPDATE_STATUS);
      log(testCase + " Will verify server 3 has received expected entries");
      receiveUpdatedEntries(server3, server3ID, updatedEntries);
      log(testCase + " Will verify no more msgs");
      while (true)
      {
        try
        {
          log(testCase + " Will receive");
          ReplicationMsg msg = server3.receive();
          fail("Receive unexpected message " + msg);
        } catch (SocketTimeoutException e)
@@ -1282,11 +1429,11 @@
          break;
        }
      }
      log("Successfully ending " + testCase);
    } finally
    }
    finally
    {
      afterTest();
      afterTest(testCase);
    }
  }
@@ -1318,7 +1465,8 @@
      addTask(taskInit, ResultCode.SUCCESS, null);
      waitTaskState(taskInit, TaskState.STOPPED_BY_ERROR,
        ERR_NO_REACHABLE_PEER_IN_THE_DOMAIN.get());
        ERR_NO_REACHABLE_PEER_IN_THE_DOMAIN.get(
            baseDn.toString(), "20"));
      // Test 2
      taskInit = TestCaseUtils.makeEntry(
@@ -1331,7 +1479,9 @@
        "ds-task-initialize-domain-dn: " + baseDn,
        "ds-task-initialize-replica-server-id: " + server1ID);
      addTask(taskInit, ResultCode.OTHER, ERR_INVALID_IMPORT_SOURCE.get());
      addTask(taskInit, ResultCode.OTHER, ERR_INVALID_IMPORT_SOURCE.get(
          baseDn.toNormalizedString(),
          Integer.toString(server1ID),"20",""));
      if (replDomain != null)
      {
@@ -1342,7 +1492,7 @@
      log("Successfully ending " + testCase);
    } finally
    {
      afterTest();
      afterTest(testCase);
    }
  }
@@ -1376,7 +1526,7 @@
      addTask(taskInit, ResultCode.SUCCESS, null);
      waitTaskState(taskInit, TaskState.STOPPED_BY_ERROR,
        ERR_NO_REACHABLE_PEER_IN_THE_DOMAIN.get());
        ERR_NO_REACHABLE_PEER_IN_THE_DOMAIN.get(baseDn.toString(), "0"));
      if (replDomain != null)
      {
@@ -1387,7 +1537,7 @@
      log("Successfully ending " + testCase);
    } finally
    {
      afterTest();
      afterTest(testCase);
    }
  }
@@ -1483,14 +1633,15 @@
      log("Successfully ending " + testCase);
    } finally
    {
      afterTest();
      afterTest(testCase);
    }
  }
  /**
   * Disconnect broker and remove entries from the local DB
   * @param testCase The name of the test case.
   */
  protected void afterTest()
  protected void afterTest(String testCase)
  {
    // Check that the domain has completed the import/export task.
@@ -1564,6 +1715,7 @@
    {
      replServerPort[i] = 0;
    }
    log("Successfully cleaned " + testCase);
  }
    /**