mirror of https://github.com/OpenIdentityPlatform/OpenDJ.git

pgamba
17.00.2007 bd724fad0c954f1e607a0a90cbca3eb41d1f2460
opends/src/quicksetup/org/opends/quicksetup/installer/Installer.java
@@ -3974,6 +3974,7 @@
                        ne), ne);
      }
    }
    resetGenerationId(ctx, suffixDn, true, sourceServerDisplay);
  }
  /**
@@ -4023,4 +4024,138 @@
  {
    return (random.nextInt() & modulo);
  }
  private void resetGenerationId(InitialLdapContext ctx,
      String suffixDn, boolean displayProgress, String sourceServerDisplay)
  throws ApplicationException
  {
    boolean taskCreated = false;
    int i = 1;
    boolean isOver = false;
    String dn = null;
    BasicAttributes attrs = new BasicAttributes();
    Attribute oc = new BasicAttribute("objectclass");
    oc.add("top");
    oc.add("ds-task");
    oc.add("ds-task-reset-generation-id");
    attrs.put(oc);
    attrs.put("ds-task-class-name",
        "org.opends.server.tasks.SetGenerationIdTask");
    attrs.put("ds-task-reset-generation-id-domain-base-dn", suffixDn);
    while (!taskCreated)
    {
      String id = "quicksetup-reset-generation-id-"+i;
      dn = "ds-task-id="+id+",cn=Scheduled Tasks,cn=Tasks";
      attrs.put("ds-task-id", id);
      try
      {
        DirContext dirCtx = ctx.createSubcontext(dn, attrs);
        taskCreated = true;
        LOG.log(Level.INFO, "created task entry: "+attrs);
        dirCtx.close();
      }
      catch (NameAlreadyBoundException x)
      {
      }
      catch (NamingException ne)
      {
        LOG.log(Level.SEVERE, "Error creating task "+attrs, ne);
        throw new ApplicationException(
            ReturnCode.APPLICATION_ERROR,
                getThrowableMsg(INFO_ERROR_LAUNCHING_INITIALIZATION.get(
                        sourceServerDisplay
                ), ne), ne);
      }
      i++;
    }
    // Wait until it is over
    SearchControls searchControls = new SearchControls();
    searchControls.setCountLimit(1);
    searchControls.setSearchScope(
        SearchControls. OBJECT_SCOPE);
    String filter = "objectclass=*";
    searchControls.setReturningAttributes(
        new String[] {
            "ds-task-log-message",
            "ds-task-state"
        });
    Message lastDisplayedMsg = null;
    String lastLogMsg = null;
    long lastTimeMsgDisplayed = -1;
    while (!isOver)
    {
      try
      {
        Thread.sleep(500);
      }
      catch (Throwable t)
      {
      }
      try
      {
        NamingEnumeration res = ctx.search(dn, filter, searchControls);
        SearchResult sr = (SearchResult)res.next();
        String logMsg = getFirstValue(sr, "ds-task-log-message");
        if (logMsg != null)
        {
          if (!logMsg.equals(lastLogMsg))
          {
            LOG.log(Level.INFO, logMsg);
            lastLogMsg = logMsg;
          }
        }
        InstallerHelper helper = new InstallerHelper();
        String state = getFirstValue(sr, "ds-task-state");
        if (helper.isDone(state) || helper.isStoppedByError(state))
        {
          isOver = true;
          Message errorMsg;
          if (lastLogMsg == null)
          {
            errorMsg = INFO_ERROR_DURING_INITIALIZATION_NO_LOG.get(
                    sourceServerDisplay, state, sourceServerDisplay);
          }
          else
          {
            errorMsg = INFO_ERROR_DURING_INITIALIZATION_LOG.get(
                    sourceServerDisplay, lastLogMsg, state,
                    sourceServerDisplay);
          }
          if (helper.isCompletedWithErrors(state))
          {
            notifyListeners(getFormattedWarning(errorMsg));
          }
          else if (!helper.isSuccessful(state) ||
              helper.isStoppedByError(state))
          {
            ApplicationException ae = new ApplicationException(
                ReturnCode.APPLICATION_ERROR, errorMsg,
                null);
            throw ae;
          }
          else if (displayProgress)
          {
            notifyListeners(getFormattedProgress(
                INFO_SUFFIX_INITIALIZED_SUCCESSFULLY.get()));
          }
        }
      }
      catch (NameNotFoundException x)
      {
        isOver = true;
        notifyListeners(getFormattedProgress(
            INFO_SUFFIX_INITIALIZED_SUCCESSFULLY.get()));
      }
      catch (NamingException ne)
      {
        throw new ApplicationException(
            ReturnCode.APPLICATION_ERROR,
                getThrowableMsg(INFO_ERROR_POOLING_INITIALIZATION.get(
                        sourceServerDisplay),
                        ne), ne);
      }
    }
  }
}
opends/src/server/org/opends/server/replication/plugin/ReplicationBroker.java
@@ -122,7 +122,7 @@
   */
  private boolean connectionError = false;
  private Object connectPhaseLock = new Object();
  private final Object connectPhaseLock = new Object();
  /**
   * Creates a new ReplicationServer Broker for a particular ReplicationDomain.
opends/src/server/org/opends/server/replication/plugin/ReplicationDomain.java
@@ -2415,7 +2415,7 @@
  public void resetGenerationId()
  {
    requestedResetSinceLastStart = true;
    ResetGenerationId genIdMessage = new ResetGenerationId();
    ResetGenerationId genIdMessage = new ResetGenerationId(this.generationId);
    broker.publish(genIdMessage);
  }
opends/src/server/org/opends/server/replication/protocol/ResetGenerationId.java
@@ -26,7 +26,10 @@
 */
package org.opends.server.replication.protocol;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.Serializable;
import java.io.UnsupportedEncodingException;
import java.util.zip.DataFormatException;
@@ -38,13 +41,15 @@
    Serializable
{
  private static final long serialVersionUID = 7657049716115572226L;
  private long generationId;
  /**
   * Creates a new message.
   * @param generationId The new reference value of the generationID.
   */
  public ResetGenerationId()
  public ResetGenerationId(long generationId)
  {
    this.generationId = generationId;
  }
  /**
@@ -57,9 +62,24 @@
   */
  public ResetGenerationId(byte[] in) throws DataFormatException
  {
    if (in[0] != MSG_TYPE_RESET_GENERATION_ID)
      throw new
      DataFormatException("input is not a valid GenerationId Message");
    try
    {
      if (in[0] != MSG_TYPE_RESET_GENERATION_ID)
        throw new
        DataFormatException("input is not a valid GenerationId Message");
      int pos = 1;
      /* read the generationId */
      int length = getNextLength(in, pos);
      generationId = Long.valueOf(new String(in, pos, length,
      "UTF-8"));
      pos += length +1;
    } catch (UnsupportedEncodingException e)
    {
      throw new DataFormatException("UTF-8 is not supported by this jvm.");
    }
  }
  /**
@@ -68,11 +88,33 @@
  @Override
  public byte[] getBytes()
  {
    int length = 1;
    byte[] resultByteArray = new byte[length];
    try
    {
      ByteArrayOutputStream oStream = new ByteArrayOutputStream();
    /* put the type of the operation */
    resultByteArray[0] = MSG_TYPE_RESET_GENERATION_ID;
    return resultByteArray;
      /* Put the message type */
      oStream.write(MSG_TYPE_RESET_GENERATION_ID);
      // Put the generationId
      oStream.write(String.valueOf(generationId).getBytes("UTF-8"));
      oStream.write(0);
      return oStream.toByteArray();
    }
    catch (IOException e)
    {
      // never happens
      return null;
    }
  }
  /**
   * Returns the generation Id set in this message.
   * @return the value of the generation ID.
   *
   */
  public long getGenerationId()
  {
    return this.generationId;
  }
}
opends/src/server/org/opends/server/replication/server/ReplicationCache.java
@@ -959,19 +959,6 @@
    }
    /**
     * Sets the replication server informations for the provided
     * handler from the provided ReplServerInfoMessage.
     *
     * @param handler The server handler from which the info was received.
     * @param infoMsg The information message that was received.
     */
    public void setReplServerInfo(
        ServerHandler handler, ReplServerInfoMessage infoMsg)
    {
      handler.setReplServerInfo(infoMsg);
    }
    /**
     * Sets the provided value as the new in memory generationId.
     *
     * @param generationId The new value of generationId.
@@ -1007,20 +994,27 @@
     *
     * @param senderHandler The handler associated to the server
     *        that requested to reset the generationId.
     * @param genIdMsg The reset generation ID msg received.
     */
    public void resetGenerationId(ServerHandler senderHandler)
    public void resetGenerationId(ServerHandler senderHandler,
        ResetGenerationId genIdMsg)
    {
      long newGenId = genIdMsg.getGenerationId();
      if (debugEnabled())
        TRACER.debugInfo(
          "In " + this.replicationServer.getMonitorInstanceName() +
          " baseDN=" + baseDn +
          " RCache.resetGenerationId");
          " RCache.resetGenerationId received new ref genId="  + newGenId);
      // Notifies the others LDAP servers that from now on
      // they have the bad generationId
      for (ServerHandler handler : connectedServers.values())
      {
        handler.resetGenerationId();
        if (newGenId != handler.getGenerationId())
        {
          handler.resetGenerationId();
        }
      }
      // Propagates the reset message to the others replication servers
@@ -1031,7 +1025,7 @@
        {
          try
          {
            handler.sendGenerationId(new ResetGenerationId());
            handler.sendGenerationId(genIdMsg);
          }
          catch (IOException e)
          {
@@ -1041,45 +1035,48 @@
        }
      }
      // Reset the localchange and state db for the current domain
      synchronized (sourceDbHandlers)
      if (this.generationId != newGenId)
      {
        for (DbHandler dbHandler : sourceDbHandlers.values())
        // Reset the localchange and state db for the current domain
        synchronized (sourceDbHandlers)
        {
          try
          for (DbHandler dbHandler : sourceDbHandlers.values())
          {
            dbHandler.clear();
            try
            {
              dbHandler.clear();
            }
            catch (Exception e)
            {
              // TODO: i18n
              logError(Message.raw(
                  "Exception caught while clearing dbHandler:" +
                  e.getLocalizedMessage()));
            }
          }
          catch (Exception e)
          {
            // TODO: i18n
            logError(Message.raw(
                "Exception caught while clearing dbHandler:" +
                e.getLocalizedMessage()));
          }
          sourceDbHandlers.clear();
          if (debugEnabled())
            TRACER.debugInfo(
                "In " + this.replicationServer.getMonitorInstanceName() +
                " baseDN=" + baseDn +
            " The source db handler has been cleared");
        }
        sourceDbHandlers.clear();
        try
        {
          replicationServer.clearGenerationId(baseDn);
        }
        catch (Exception e)
        {
          // TODO: i18n
          logError(Message.raw(
              "Exception caught while clearing generationId:" +
              e.getLocalizedMessage()));
        }
        if (debugEnabled())
          TRACER.debugInfo(
              "In " + this.replicationServer.getMonitorInstanceName() +
              " baseDN=" + baseDn +
              " The source db handler has been cleared");
        // Reset the in memory domain generationId
        generationId = newGenId;
      }
      try
      {
        replicationServer.clearGenerationId(baseDn);
      }
      catch (Exception e)
      {
        // TODO: i18n
        logError(Message.raw(
            "Exception caught while clearing generationId:" +
            e.getLocalizedMessage()));
      }
      // Reset the in memory domain generationId
      generationId = -1;
    }
    /**
opends/src/server/org/opends/server/replication/server/ReplicationServer.java
@@ -130,6 +130,14 @@
  // ID of the backend
  private static final String backendId = "replicationChanges";
  // At startup, the listen thread wait on this flag for the connet
  // thread to look for other servers in the topology.
  // TODO when a replication server is out of date (has old changes
  // to receive from other servers, the listen thread should not accept
  // connection from ldap servers. (issue 1302)
  private boolean connectedInTopology = false;
  private final Object connectedInTopologyLock = new Object();
  /**
   * The tracer object for the debug logger.
   */
@@ -211,6 +219,23 @@
  void runListen()
  {
    Socket newSocket;
    // wait for the connect thread to find other replication
    // servers in the topology before starting to accept connections
    // from the ldap servers.
    synchronized (connectedInTopologyLock)
    {
      if (connectedInTopology == false)
      {
        try
        {
          connectedInTopologyLock.wait(1000);
        } catch (InterruptedException e)
        {
        }
      }
    }
    while ((shutdown == false) && (stopListen  == false))
    {
      // Wait on the replicationServer port.
@@ -286,6 +311,15 @@
          }
        }
      }
      synchronized (connectedInTopologyLock)
      {
        // wake up the listen thread if necessary.
        if (connectedInTopology == false)
        {
          connectedInTopologyLock.notify();
          connectedInTopology = true;
        }
      }
      try
      {
        synchronized (this)
@@ -391,7 +425,7 @@
      // FIXME : Is it better to have the time to receive the ReplServerInfo
      // from all the other replication servers since this info is necessary
      // to route an early received total update request.
      try { Thread.sleep(300);} catch(Exception e) {}
      if (debugEnabled())
        TRACER.debugInfo("RS " +getMonitorInstanceName()+
            " creates listen threads");
@@ -798,6 +832,7 @@
        // Add the replication backend
        DirectoryServer.getConfigHandler().addEntry(backendConfigEntry, null);
      }
      ldifImportConfig.close();
    }
    catch(Exception e)
    {
opends/src/server/org/opends/server/replication/server/ServerHandler.java
@@ -1617,7 +1617,7 @@
    *
    * @param infoMsg The information message.
    */
   public void setReplServerInfo(ReplServerInfoMessage infoMsg)
   public void receiveReplServerInfo(ReplServerInfoMessage infoMsg)
   {
     if (debugEnabled())
       TRACER.debugInfo("In " + replicationCache.getReplicationServer().
@@ -1749,6 +1749,7 @@
   public void sendGenerationId(ResetGenerationId msg)
   throws IOException
   {
     generationId = msg.getGenerationId();
     session.publish(msg);
   }
}
opends/src/server/org/opends/server/replication/server/ServerReader.java
@@ -182,7 +182,7 @@
        else if (msg instanceof ResetGenerationId)
        {
          ResetGenerationId genIdMsg = (ResetGenerationId) msg;
          replicationCache.resetGenerationId(this.handler);
          replicationCache.resetGenerationId(this.handler, genIdMsg);
        }
        else if (msg instanceof WindowProbe)
        {
@@ -192,7 +192,7 @@
        else if (msg instanceof ReplServerInfoMessage)
        {
          ReplServerInfoMessage infoMsg = (ReplServerInfoMessage)msg;
          handler.setReplServerInfo(infoMsg);
          handler.receiveReplServerInfo(infoMsg);
          if (debugEnabled())
          {
opends/tests/unit-tests-testng/src/server/org/opends/server/replication/GenerationIdTest.java
@@ -719,7 +719,6 @@
      long genId;
      replServer1 = createReplicationServer(changelog1ID, false, testCase);
      assertEquals(replServer1.getGenerationId(baseDn), -1);
      /*
       * Test  : empty replicated backend
@@ -892,6 +891,13 @@
       * Test: Reset the replication server in order to allow new data set.
       */
      debugInfo("Launch an on-line import on DS.");
      genId=-1;
      Entry importTask = getTaskImport();
      addTask(importTask, ResultCode.SUCCESS, null);
      waitTaskState(importTask, TaskState.COMPLETED_SUCCESSFULLY, null);
      Thread.sleep(500);
      Entry taskReset = TestCaseUtils.makeEntry(
          "dn: ds-task-id=resetgenid"+genId+ UUID.randomUUID() +
          ",cn=Scheduled Tasks,cn=Tasks",
@@ -908,33 +914,17 @@
      // TODO: Test that replication server db has been cleared
      assertEquals(replServer1.getGenerationId(baseDn),
          -1, "Expected genId to be reset in replServer1");
      debugInfo("Expect new genId to be computed on DS and sent to all replServers after on-line import.");
      genId = readGenId();
      assertTrue(genId != -1, "DS is expected to have a new genID computed " +
          " after on-line import but genId=" + genId);
      ReplicationMessage rcvmsg = broker2.receive();
      if (!(rcvmsg instanceof ErrorMessage))
      {
        fail("Broker2 is expected to receive an ErrorMessage " +
            " to signal degradation due to reset" + rcvmsg);
      }
      ErrorMessage emsg = (ErrorMessage)rcvmsg;
      debugInfo(testCase + " " + emsg.getMsgID() + " " + emsg.getDetails());
      rgenId = replServer1.getGenerationId(baseDn);
      assertEquals(genId, rgenId, "DS and replServer are expected to have same genId.");
      rcvmsg = broker3.receive();
      if (!(rcvmsg instanceof ErrorMessage))
      {
        fail("Broker3 is expected to receive an ErrorMessage " +
            " to signal degradation due to reset" + rcvmsg);
      }
      emsg = (ErrorMessage)rcvmsg;
      debugInfo(testCase + " " + emsg.getMsgID() + " " + emsg.getDetails());
      rgenId = replServer1.getGenerationId(baseDn);
      assertTrue(rgenId==-1,"Expecting that genId has been reset in replServer1: rgenId="+rgenId);
      assertTrue(replServer1.getReplicationCache(baseDn, false).
      assertTrue(!replServer1.getReplicationCache(baseDn, false).
          isDegradedDueToGenerationId(server1ID),
      "Expecting that DS is degraded since domain genId has been reset");
      "Expecting that DS is not degraded since domain genId has been reset");
      assertTrue(replServer1.getReplicationCache(baseDn, false).
          isDegradedDueToGenerationId(server2ID),
@@ -946,8 +936,39 @@
      // Now create a change that normally would be replicated
      // but will not be replicated here since DS and brokers are degraded
      String[] ent2 = { createEntry(UUID.randomUUID()) };
      this.addTestEntriesToDB(ent2);
      String[] ent3 = { createEntry(UUID.randomUUID()) };
      this.addTestEntriesToDB(ent3);
      try
      {
        ReplicationMessage msg = broker2.receive();
        if (!(msg instanceof ErrorMessage))
        {
          fail("Broker 2 connection is expected to receive an ErrorMessage."
              + msg);
        }
        ErrorMessage emsg = (ErrorMessage)msg;
        debugInfo(testCase + " " + emsg.getMsgID() + " " + emsg.getDetails());
      }
      catch(SocketTimeoutException se)
      {
        fail("Broker 2 is expected to receive an ErrorMessage.");
      }
      try
      {
        ReplicationMessage msg = broker3.receive();
        if (!(msg instanceof ErrorMessage))
        {
          fail("Broker 3 connection is expected to receive an ErrorMessage."
              + msg);
        }
        ErrorMessage emsg = (ErrorMessage)msg;
        debugInfo(testCase + " " + emsg.getMsgID() + " " + emsg.getDetails());
      }
      catch(SocketTimeoutException se)
      {
        fail("Broker 3 is expected to receive an ErrorMessage.");
      }
      try
      {
@@ -969,23 +990,7 @@
        ReplicationMessage msg = broker3.receive();
        fail("No update message is supposed to be received by degraded broker3"+ msg);
      } catch(SocketTimeoutException e) { /* expected */ }
      debugInfo("Launch an on-line import on DS.");
      genId=-1;
      Entry importTask = getTaskImport();
      addTask(importTask, ResultCode.SUCCESS, null);
      waitTaskState(importTask, TaskState.COMPLETED_SUCCESSFULLY, null);
      Thread.sleep(500);
      debugInfo("Expect new genId to be computed on DS and sent to all replServers after on-line import.");
      genId = readGenId();
      assertTrue(genId != -1, "DS is expected to have a new genID computed " +
          " after on-line import but genId=" + genId);
      rgenId = replServer1.getGenerationId(baseDn);
      assertEquals(genId, rgenId, "DS and replServer are expected to have same genId.");
      // In S1 launch the total update to initialize S2
      addTask(taskInitRemoteS2, ResultCode.SUCCESS, null);
@@ -1008,7 +1013,7 @@
      Thread.sleep(200);
      debugInfo("Verifying that replServer1 has been reset.");
      assertEquals(replServer1.getGenerationId(baseDn), -1);
      assertEquals(replServer1.getGenerationId(baseDn), rgenId);
      debugInfo("Disconnect DS from replServer1 (required in order to DEL entries).");
      disconnectFromReplServer(changelog1ID);
@@ -1163,9 +1168,9 @@
    
    debugInfo("Verifying that all replservers genIds have been reset.");
    genId = readGenId();
    assertEquals(replServer1.getGenerationId(baseDn), -1);
    assertEquals(replServer2.getGenerationId(baseDn), -1);
    assertEquals(replServer3.getGenerationId(baseDn), -1);
    assertEquals(replServer2.getGenerationId(baseDn), genId);
    assertEquals(replServer2.getGenerationId(baseDn), genId);
    assertEquals(replServer3.getGenerationId(baseDn), genId);
    debugInfo("Disconnect DS from replServer1 (required in order to DEL entries).");
    disconnectFromReplServer(changelog1ID);
opends/tests/unit-tests-testng/src/server/org/opends/server/replication/server/ReplicationServerTest.java
@@ -48,6 +48,9 @@
import org.opends.server.backends.task.TaskState;
import org.opends.server.core.ModifyDNOperationBasis;
import org.opends.server.loggers.debug.DebugTracer;
import org.opends.server.protocols.asn1.ASN1OctetString;
import org.opends.server.protocols.internal.InternalSearchOperation;
import org.opends.server.protocols.ldap.LDAPFilter;
import org.opends.server.replication.ReplicationTestCase;
import org.opends.server.replication.common.ChangeNumber;
import org.opends.server.replication.common.ChangeNumberGenerator;
@@ -75,6 +78,7 @@
import org.opends.server.types.ModificationType;
import org.opends.server.types.RDN;
import org.opends.server.types.ResultCode;
import org.opends.server.types.SearchScope;
import org.opends.server.util.TimeThread;
import org.opends.server.workflowelement.localbackend.LocalBackendModifyDNOperation;
import org.testng.annotations.AfterClass;
@@ -1198,4 +1202,4 @@
     catch(Exception e) {};
     return l;
   }
}
 }