mirror of https://github.com/OpenIdentityPlatform/OpenDJ.git

pgamba
26.49.2009 1284f3f2c26a3308d2fe84740d7ab0a9c20cd71a
opends/src/server/org/opends/server/replication/server/ReplicationServerDomain.java
@@ -1142,20 +1142,31 @@
    {
      try
      {
        // Acquire lock on domain (see more details in comment of start()
        // method of ServerHandler)
        lock();
      } catch (InterruptedException ex)
      {
        // Try doing job anyway...
        try
        {
          // Acquire lock on domain (see more details in comment of start()
          // method of ServerHandler)
          lock();
        } catch (InterruptedException ex)
        {
          // Try doing job anyway...
        }
        if (otherHandlers.contains(handler))
        {
          unRegisterHandler(handler);
          handler.shutdown();
        }
      }
      if (otherHandlers.contains(handler))
      catch(Exception e)
      {
        unRegisterHandler(handler);
        handler.shutdown();
        logError(Message.raw(Category.SYNC, Severity.NOTICE,
            stackTraceToSingleLineString(e)));
      }
      finally
      {
        release();
      }
    }
    release();
  }
  /**
@@ -2025,86 +2036,96 @@
  {
    if (debugEnabled())
      TRACER.debugInfo(
        "In " + this +
        " Receiving ResetGenerationIdMsg from " + senderHandler.getServerId() +
        " for baseDn " + baseDn + ":\n" + genIdMsg);
          "In " + this +
          " Receiving ResetGenerationIdMsg from " + senderHandler.getServerId()+
          " for baseDn " + baseDn + ":\n" + genIdMsg);
    try
    {
      // Acquire lock on domain (see more details in comment of start() method
      // of ServerHandler)
      lock();
    } catch (InterruptedException ex)
    {
      // Try doing job anyway...
    }
    long newGenId = genIdMsg.getGenerationId();
    if (newGenId != this.generationId)
    {
      changeGenerationId(newGenId, false);
    }
    else
    {
      // Order to take a gen id we already have, just ignore
      if (debugEnabled())
        TRACER.debugInfo(
          "In " + this
          + " Reset generation id requested for baseDn " + baseDn
          + " but generation id was already " + this.generationId
          + ":\n" + genIdMsg);
      }
    // If we are the first replication server warned,
    // then forwards the reset message to the remote replication servers
    for (ServerHandler rsHandler : replicationServers.values())
    {
      try
      {
        // After we'll have sent the message , the remote RS will adopt
        // the new genId
        rsHandler.setGenerationId(newGenId);
        if (senderHandler.isDataServer())
        // Acquire lock on domain (see more details in comment of start() method
        // of ServerHandler)
        lock();
      } catch (InterruptedException ex)
      {
        // Try doing job anyway...
      }
      long newGenId = genIdMsg.getGenerationId();
      if (newGenId != this.generationId)
      {
        changeGenerationId(newGenId, false);
      }
      else
      {
        // Order to take a gen id we already have, just ignore
        if (debugEnabled())
          TRACER.debugInfo(
              "In " + this
              + " Reset generation id requested for baseDn " + baseDn
              + " but generation id was already " + this.generationId
              + ":\n" + genIdMsg);
      }
      // If we are the first replication server warned,
      // then forwards the reset message to the remote replication servers
      for (ServerHandler rsHandler : replicationServers.values())
      {
        try
        {
          rsHandler.send(genIdMsg);
          // After we'll have sent the message , the remote RS will adopt
          // the new genId
          rsHandler.setGenerationId(newGenId);
          if (senderHandler.isDataServer())
          {
            rsHandler.send(genIdMsg);
          }
        } catch (IOException e)
        {
          logError(ERR_EXCEPTION_FORWARDING_RESET_GEN_ID.get(baseDn.toString(),
              e.getMessage()));
        }
      } catch (IOException e)
      {
        logError(ERR_EXCEPTION_FORWARDING_RESET_GEN_ID.get(baseDn.toString(),
          e.getMessage()));
      }
    }
    // Change status of the connected DSs according to the requested new
    // reference generation id
    for (DataServerHandler dsHandler : directoryServers.values())
      // Change status of the connected DSs according to the requested new
      // reference generation id
      for (DataServerHandler dsHandler : directoryServers.values())
      {
        try
        {
          dsHandler.changeStatusForResetGenId(newGenId);
        } catch (IOException e)
        {
          logError(ERR_EXCEPTION_CHANGING_STATUS_AFTER_RESET_GEN_ID.get(baseDn.
              toString(),
              Integer.toString(dsHandler.getServerId()),
              e.getMessage()));
        }
      }
      // Update every peers (RS/DS) with potential topology changes (status
      // change). Rather than doing that each time a DS has a status change
      // (consecutive to reset gen id message), we prefer advertising once for
      // all after changes (less packet sent), here at the end of the reset msg
      // treatment.
      buildAndSendTopoInfoToDSs(null);
      buildAndSendTopoInfoToRSs();
      Message message = NOTE_RESET_GENERATION_ID.get(baseDn.toString(),
          Long.toString(newGenId));
      logError(message);
    }
    catch(Exception e)
    {
      try
      {
        dsHandler.changeStatusForResetGenId(newGenId);
      } catch (IOException e)
      {
        logError(ERR_EXCEPTION_CHANGING_STATUS_AFTER_RESET_GEN_ID.get(baseDn.
          toString(),
          Integer.toString(dsHandler.getServerId()),
          e.getMessage()));
      }
      logError(Message.raw(Category.SYNC, Severity.NOTICE,
          stackTraceToSingleLineString(e)));
    }
    // Update every peers (RS/DS) with potential topology changes (status
    // change). Rather than doing that each time a DS has a status change
    // (consecutive to reset gen id message), we prefer advertising once for
    // all after changes (less packet sent), here at the end of the reset msg
    // treatment.
    buildAndSendTopoInfoToDSs(null);
    buildAndSendTopoInfoToRSs();
    Message message = NOTE_RESET_GENERATION_ID.get(baseDn.toString(),
      Long.toString(newGenId));
    logError(message);
    release();
    finally
    {
      release();
    }
  }
  /**
@@ -2119,41 +2140,51 @@
    if (debugEnabled())
    {
      TRACER.debugInfo(
        "In RS " + getReplicationServer().getServerId() +
        " Receiving ChangeStatusMsg from " + senderHandler.getServerId() +
        " for baseDn " + baseDn + ":\n" + csMsg);
          "In RS " + getReplicationServer().getServerId() +
          " Receiving ChangeStatusMsg from " + senderHandler.getServerId() +
          " for baseDn " + baseDn + ":\n" + csMsg);
    }
    try
    {
      // Acquire lock on domain (see more details in comment of start() method
      // of ServerHandler)
      lock();
    } catch (InterruptedException ex)
    {
      // Try doing job anyway...
    }
      try
      {
        // Acquire lock on domain (see more details in comment of start() method
        // of ServerHandler)
        lock();
      } catch (InterruptedException ex)
      {
        // Try doing job anyway...
      }
    ServerStatus newStatus = senderHandler.processNewStatus(csMsg);
    if (newStatus == ServerStatus.INVALID_STATUS)
      ServerStatus newStatus = senderHandler.processNewStatus(csMsg);
      if (newStatus == ServerStatus.INVALID_STATUS)
      {
        // Already logged an error in processNewStatus()
        // just return not to forward a bad status to topology
        return;
      }
      // Update every peers (RS/DS) with topology changes
      buildAndSendTopoInfoToDSs(senderHandler);
      buildAndSendTopoInfoToRSs();
      Message message = NOTE_DIRECTORY_SERVER_CHANGED_STATUS.get(
          Integer.toString(senderHandler.getServerId()),
          baseDn.toString(),
          newStatus.toString());
      logError(message);
    }
    catch(Exception e)
    {
      // Already logged an error in processNewStatus()
      // just return not to forward a bad status to topology
      logError(Message.raw(Category.SYNC, Severity.NOTICE,
          stackTraceToSingleLineString(e)));
    }
    finally
    {
      release();
      return;
    }
    // Update every peers (RS/DS) with topology changes
    buildAndSendTopoInfoToDSs(senderHandler);
    buildAndSendTopoInfoToRSs();
    Message message = NOTE_DIRECTORY_SERVER_CHANGED_STATUS.get(
        Integer.toString(senderHandler.getServerId()),
      baseDn.toString(),
      newStatus.toString());
    logError(message);
    release();
  }
  /**
@@ -2168,6 +2199,8 @@
  {
    try
    {
    try
    {
      // Acquire lock on domain (see more details in comment of start() method
      // of ServerHandler)
      lock();
@@ -2210,7 +2243,6 @@
      (newStatus == oldStatus) )
    {
      // Change was impossible or already occurred (see StatusAnalyzer comments)
      release();
      return false;
    }
@@ -2218,7 +2250,16 @@
    buildAndSendTopoInfoToDSs(serverHandler);
    buildAndSendTopoInfoToRSs();
    release();
    }
    catch(Exception e)
    {
      logError(Message.raw(Category.SYNC, Severity.NOTICE,
          stackTraceToSingleLineString(e)));
    }
    finally
    {
      release();
    }
    return false;
  }
@@ -2338,45 +2379,56 @@
      // Try doing job anyway...
    }
    /*
     * Store DS connected to remote RS and update information about the peer RS
     */
    handler.processTopoInfoFromRS(topoMsg);
    /*
     * Handle generation id
     */
    if (allowResetGenId)
    try
    {
      // Check if generation id has to be resetted
      mayResetGenerationId();
      if (generationId < 0)
        generationId = handler.getGenerationId();
    }
      /*
       * Store DS connected to remote RS & update information about the peer RS
       */
      handler.processTopoInfoFromRS(topoMsg);
    if (generationId > 0 && (generationId != handler.getGenerationId()))
      /*
       * Handle generation id
       */
      if (allowResetGenId)
      {
        // Check if generation id has to be resetted
        mayResetGenerationId();
        if (generationId < 0)
          generationId = handler.getGenerationId();
      }
      if (generationId > 0 && (generationId != handler.getGenerationId()))
      {
        Message message = NOTE_BAD_GENERATION_ID_FROM_RS.get(
            baseDn,
            Integer.toString(handler.getServerId()),
            Long.toString(handler.getGenerationId()),
            Long.toString(generationId));
        logError(message);
        ErrorMsg errorMsg = new ErrorMsg(
            getReplicationServer().getServerId(),
            handler.getServerId(),
            message);
        handler.sendError(errorMsg);
      }
      /*
       * Sends the currently known topology information to every connected
       * DS we have.
       */
      buildAndSendTopoInfoToDSs(null);
    }
    catch(Exception e)
    {
      Message message = NOTE_BAD_GENERATION_ID_FROM_RS.get(
        baseDn,
        Integer.toString(handler.getServerId()),
        Long.toString(handler.getGenerationId()),
        Long.toString(generationId));
      logError(message);
      ErrorMsg errorMsg = new ErrorMsg(
        getReplicationServer().getServerId(),
        handler.getServerId(),
        message);
      handler.sendError(errorMsg);
      logError(Message.raw(Category.SYNC, Severity.NOTICE,
          stackTraceToSingleLineString(e)));
    }
    /*
     * Sends the currently known topology information to every connected
     * DS we have.
     */
    buildAndSendTopoInfoToDSs(null);
    release();
    finally
    {
      release();
    }
  }
  /* =======================
@@ -3048,6 +3100,50 @@
      // Consider this producer (DS/db).
      int sid = db.getServerId();
      // Should it be considered for eligibility ?
      ChangeNumber heartbeatLastDN =
        getChangeTimeHeartbeatState().getMaxChangeNumber(sid);
      // If the most recent UpdateMsg or CLHeartbeatMsg received is very old
      // then the domain is considered down and not considered for eligibility
      /*
      if ((heartbeatLastDN != null) &&
          (TimeThread.getTime()- heartbeatLastDN.getTime() > 5000))
      {
        if (debugEnabled())
          TRACER.debugInfo("In " + this.getName() +
            " Server " + sid
            + " is not considered for eligibility ... potentially down");
        continue;
      }
      */
      boolean sidConnected = false;
      if (directoryServers.containsKey(sid))
      {
        sidConnected = true;
      }
      else
      {
        // not directly connected
        for (ReplicationServerHandler rsh : replicationServers.values())
        {
          if (rsh.isRemoteLDAPServer(sid))
          {
            sidConnected = true;
            break;
          }
        }
      }
      if (!sidConnected)
      {
        if (debugEnabled())
          TRACER.debugInfo("In " + this.getName() +
            " Server " + sid
            + " is not considered for eligibility ... potentially down");
        continue;
      }
      ChangeNumber changelogLastCN = db.getLastChange();
      if (changelogLastCN != null)
      {
@@ -3057,9 +3153,6 @@
        }
      }
      ChangeNumber heartbeatLastDN =
        getChangeTimeHeartbeatState().getMaxChangeNumber(sid);
      if ((heartbeatLastDN != null) &&
       ((eligibleCN == null) || (heartbeatLastDN.newer(eligibleCN))))
      {