mirror of https://github.com/OpenIdentityPlatform/OpenDJ.git

ludovicp
31.33.2010 2caca3a5c55076f212fe2b2d725769737160c59c
Fix for issue #4526. Fixes a race condition in Replication Server when resetting the GenerationID
3 files modified
115 ■■■■ changed files
opends/src/server/org/opends/server/replication/service/ReplicationBroker.java 25 ●●●●● patch | view | raw | blame | history
opends/src/server/org/opends/server/replication/service/ReplicationDomain.java 84 ●●●●● patch | view | raw | blame | history
opends/tests/unit-tests-testng/src/server/org/opends/server/replication/service/ReplicationDomainTest.java 6 ●●●●● patch | view | raw | blame | history
opends/src/server/org/opends/server/replication/service/ReplicationBroker.java
@@ -1810,8 +1810,12 @@
  /**
   * Creates a new list that contains only replication servers that have the
   * passed generation id, from a passed replication server list.
   * @param bestServers The list of replication servers to filter
   * provided generation id, from a provided replication server list.
   * When the selected replication servers have no change (empty serverState)
   * then the 'empty'(generationId==-1) replication servers are also included
   * in the result list.
   *
   * @param bestServers  The list of replication servers to filter
   * @param generationId The generation id that must match
   * @return The sub list of replication servers matching the requested
   * generation id (which may be empty)
@@ -1822,6 +1826,7 @@
  {
    Map<Integer, ReplicationServerInfo> result =
      new HashMap<Integer, ReplicationServerInfo>();
    boolean emptyState = true;
    for (Integer rsId : bestServers.keySet())
    {
@@ -1829,6 +1834,22 @@
      if (replicationServerInfo.getGenerationId() == generationId)
      {
        result.put(rsId, replicationServerInfo);
        if (!replicationServerInfo.serverState.isEmpty())
          emptyState = false;
      }
    }
    if (emptyState)
    {
      // If the RS with a generationId have all an empty state,
      // then the 'empty'(genId=-1) RSes are also candidate
      for (Integer rsId : bestServers.keySet())
      {
        ReplicationServerInfo replicationServerInfo = bestServers.get(rsId);
        if (replicationServerInfo.getGenerationId() == -1)
        {
          result.put(rsId, replicationServerInfo);
        }
      }
    }
    return result;
opends/src/server/org/opends/server/replication/service/ReplicationDomain.java
@@ -591,19 +591,21 @@
  }
  /**
   * Check if a remote replica (DS) is connected to the topology based on
   * the TopologyMsg we received when the remote replica connected or
   * disconnected.
   * Returns informations about the DS server related to the provided serverId.
   * based on the TopologyMsg we received when the remote replica connected or
   * disconnected. Return null when no server with the provided serverId is
   * connected.
   *
   * @param serverId The provided serverId of the remote replica
   * @return whether the remote replica is connected or not.
   * @param  serverId The provided serverId of the remote replica
   * @return the info related to this remote server if it is connected,
   *                  null is the server is NOT connected.
   */
  public boolean isRemoteDSConnected(int serverId)
  public DSInfo isRemoteDSConnected(int serverId)
  {
    for (DSInfo remoteDS : getReplicasList())
      if (remoteDS.getDsId() == serverId)
        return true;
    return false;
        return remoteDS;
    return null;
  }
  /**
@@ -1670,13 +1672,12 @@
  }
  /*
   * For all remote servers in tht start list,
   * For all remote servers in the start list,
   * - wait it has finished the import and present the expected generationID
   * - build the failureList
   */
  private void waitForRemoteEndOfInit()
  {
    int waitResultAttempt = 0;
    Set<Integer> replicasWeAreWaitingFor =  new HashSet<Integer>(0);
    for (Integer sid : ieContext.startList)
@@ -1696,36 +1697,60 @@
    do
    {
      done = true;
      for (DSInfo dsi : getReplicasList())
      short reconnectMaxDelayInSec = 10;
      short reconnectWait = 0;
      for (int serverId : replicasWeAreWaitingFor)
      {
        if (debugEnabled())
          TRACER.debugInfo(
            "[IE] wait for end dsid " + dsi.getDsId()
            + " " + dsi.getStatus()
            + " " + dsi.getGenerationId()
            + " " + this.getGenerationID());
        if (!ieContext.failureList.contains(dsi.getDsId()))
        if (ieContext.failureList.contains(serverId))
        {
          if (dsi.getStatus() == ServerStatus.FULL_UPDATE_STATUS)
          // this server has already been in error during initialization
          // dont't wait for it
          continue;
        }
        DSInfo dsInfo = null;
        dsInfo = isRemoteDSConnected(serverId);
        if (dsInfo == null)
        {
          // this server is disconnected
          // may be for a long time if it crashed or had been stopped
          // may be just the time to reconnect after import : should be short
          if (++reconnectWait<reconnectMaxDelayInSec)
          {
            // let's still wait to give a chance to this server to reconnect
            done = false;
          }
          else
          {
            // we left enough time to the servers to reconnect - now it's too
            // late
          }
        }
        else
        {
          // this server is connected
          if (dsInfo.getStatus() == ServerStatus.FULL_UPDATE_STATUS)
          {
            // this one is still doing the Full Update ... retry later
            done = false;
            try
            { Thread.sleep(1000); } catch (InterruptedException e) {} // 1s
            waitResultAttempt++;
            break;
          }
          else
          {
            // this one is done with the Full Update
            if (dsi.getGenerationId() == this.getGenerationID())
            if (dsInfo.getGenerationId() == this.getGenerationID())
            {
              // and with the expected generationId
              replicasWeAreWaitingFor.remove(dsi.getDsId());
              replicasWeAreWaitingFor.remove(serverId);
            }
          }
        }
      }
      // loop and wait
      if (!done)
        try { Thread.sleep(1000); } catch (InterruptedException e) {} // 1sec
    }
    while ((!done) && (!broker.shuttingDown())); // infinite wait
@@ -1921,7 +1946,7 @@
          // Other messages received during an import are trashed except
          // the topologyMsg.
          if ((msg instanceof TopologyMsg) &&
              (!this.isRemoteDSConnected(ieContext.importSource)))
              (isRemoteDSConnected(ieContext.importSource)==null))
          {
            Message errMsg =
              Message.raw(Category.SYNC, Severity.NOTICE,
@@ -2013,7 +2038,7 @@
        throw(new IOException(ieContext.getException().getMessage()));
      int slowestServerId = ieContext.getSlowestServer();
      if (!isRemoteDSConnected(slowestServerId))
      if (isRemoteDSConnected(slowestServerId)==null)
      {
        ieContext.setException(new DirectoryException(ResultCode.OTHER,
            ERR_INIT_HEARTBEAT_LOST_DURING_EXPORT.get(
@@ -2491,12 +2516,14 @@
  {
    boolean allset = true;
    for (int i = 0; i< 10; i++)
    for (int i = 0; i< 50; i++)
    {
      allset = true;
      for (RSInfo rsInfo : getRsList())
      {
        if (rsInfo.getGenerationId() != generationID)
        // the 'empty' RSes (generationId==-1) are considered as good citizens
        if ((rsInfo.getGenerationId() != -1) &&
            (rsInfo.getGenerationId() != generationID))
        {
          try
          {
@@ -2513,7 +2540,6 @@
        break;
      }
    }
    if (!allset)
    {
      ResultCode resultCode = ResultCode.OTHER;
opends/tests/unit-tests-testng/src/server/org/opends/server/replication/service/ReplicationDomainTest.java
@@ -162,13 +162,15 @@
      domain1.setGenerationID(2);
      domain1.resetReplicationLog();
      Thread.sleep(500);
      replServers = domain1.getRsList();
      for (RSInfo replServerInfo : replServers)
      {
        // The generation Id of the remote should now be 2
        assertTrue(replServerInfo.getGenerationId() == 2);
        assertEquals(replServerInfo.getGenerationId(), 2,
            "Unexpected value of generationId in RSInfo for RS="
            + replServerInfo.toString());
      }
      int sleepTime = 50;