mirror of https://github.com/OpenIdentityPlatform/OpenDJ.git

Jean-Noel Rouvignac
14.22.2013 e55bf44a419dc71f5a5afea590095a6b8064310a
opendj-sdk/opends/src/server/org/opends/server/replication/server/DataServerHandler.java
@@ -29,7 +29,6 @@
import java.io.IOException;
import java.util.*;
import java.util.zip.DataFormatException;
import org.opends.messages.Message;
import org.opends.server.replication.common.*;
@@ -39,6 +38,7 @@
import static org.opends.messages.ReplicationMessages.*;
import static org.opends.server.loggers.ErrorLogger.*;
import static org.opends.server.loggers.debug.DebugLogger.*;
import static org.opends.server.replication.common.ServerStatus.*;
import static org.opends.server.replication.common.StatusMachine.*;
import static org.opends.server.replication.protocol.ProtocolVersion.*;
@@ -92,76 +92,10 @@
   */
  public void changeStatusForResetGenId(long newGenId) throws IOException
  {
    final int localRsServerId = replicationServer.getServerId();
    StatusMachineEvent event;
    if (newGenId == -1)
    StatusMachineEvent event = getStatusMachineEvent(newGenId);
    if (event == null)
    {
      // The generation id is being made invalid, let's put the DS
      // into BAD_GEN_ID_STATUS
      event = StatusMachineEvent.TO_BAD_GEN_ID_STATUS_EVENT;
    } else
    {
      if (newGenId == generationId)
      {
        if (status == ServerStatus.BAD_GEN_ID_STATUS)
        {
          // This server has the good new reference generation id.
          // Close connection with him to force his reconnection: DS will
          // reconnect in NORMAL_STATUS or DEGRADED_STATUS.
          if (debugEnabled())
          {
            TRACER.debugInfo(
                "In RS " + localRsServerId +
                ", closing connection to DS " + getServerId() +
                " for baseDn " + getBaseDN() +
                " to force reconnection as new local" +
                " generationId and remote one match and DS is in bad gen id: " +
                newGenId);
          }
          // Connection closure must not be done calling RSD.stopHandler() as it
          // would rewait the RSD lock that we already must have entering this
          // method. This would lead to a reentrant lock which we do not want.
          // So simply close the session, this will make the hang up appear
          // after the reader thread that took the RSD lock releases it.
          if (session != null
              // V4 protocol introduced a StopMsg to properly close the
              // connection between servers
             && getProtocolVersion() >= ProtocolVersion.REPLICATION_PROTOCOL_V4)
          {
            try
            {
              session.publish(new StopMsg());
            }
            catch (IOException ioe)
            {
              // Anyway, going to close session, so nothing to do
            }
          }
          // NOT_CONNECTED_STATUS is the last one in RS session life: handler
          // will soon disappear after this method call...
          status = ServerStatus.NOT_CONNECTED_STATUS;
          return;
        } else
        {
          if (debugEnabled())
          {
            TRACER.debugInfo("In RS " + localRsServerId + ". DS "
                + getServerId() + " for baseDn " + getBaseDN()
                + " has already generation id " + newGenId
                + " so no ChangeStatusMsg sent to him.");
          }
          return;
        }
      } else
      {
        // This server has a bad generation id compared to new reference one,
        // let's put it into BAD_GEN_ID_STATUS
        event = StatusMachineEvent.TO_BAD_GEN_ID_STATUS_EVENT;
      }
      return;
    }
    if (event == StatusMachineEvent.TO_BAD_GEN_ID_STATUS_EVENT
@@ -170,7 +104,7 @@
      // Prevent useless error message (full update status cannot lead to bad
      // gen status)
      Message message = NOTE_BAD_GEN_ID_IN_FULL_UPDATE.get(
              Integer.toString(localRsServerId),
              Integer.toString(replicationServer.getServerId()),
              getBaseDN(),
              Integer.toString(serverId),
              Long.toString(generationId),
@@ -179,30 +113,73 @@
      return;
    }
    ServerStatus newStatus = StatusMachine.computeNewStatus(status, event);
    changeStatus(event, "for reset gen id");
  }
    if (newStatus == ServerStatus.INVALID_STATUS)
  private StatusMachineEvent getStatusMachineEvent(long newGenId)
  {
    if (newGenId == -1)
    {
      Message msg = ERR_RS_CANNOT_CHANGE_STATUS.get(getBaseDN(),
          Integer.toString(serverId), status.toString(), event.toString());
      logError(msg);
      return;
      // The generation id is being made invalid, let's put the DS
      // into BAD_GEN_ID_STATUS
      return StatusMachineEvent.TO_BAD_GEN_ID_STATUS_EVENT;
    }
    if (newGenId != generationId)
    {
      // This server has a bad generation id compared to new reference one,
      // let's put it into BAD_GEN_ID_STATUS
      return StatusMachineEvent.TO_BAD_GEN_ID_STATUS_EVENT;
    }
    // Send message requesting to change the DS status
    ChangeStatusMsg csMsg = new ChangeStatusMsg(newStatus,
        ServerStatus.INVALID_STATUS);
    if (status != ServerStatus.BAD_GEN_ID_STATUS)
    {
      if (debugEnabled())
      {
        TRACER.debugInfo("In RS " + replicationServer.getServerId()
            + ", DS " + getServerId() + " for baseDn " + getBaseDN()
            + " has already generation id " + newGenId
            + " so no ChangeStatusMsg sent to him.");
      }
      return null;
    }
    // This server has the good new reference generation id.
    // Close connection with him to force his reconnection: DS will
    // reconnect in NORMAL_STATUS or DEGRADED_STATUS.
    if (debugEnabled())
    {
      TRACER.debugInfo("In RS " + localRsServerId
          + " Sending change status for reset gen id to " + getServerId()
          + " for baseDn " + getBaseDN() + ":\n" + csMsg);
      TRACER.debugInfo("In RS " + replicationServer.getServerId()
          + ", closing connection to DS " + getServerId() + " for baseDn "
          + getBaseDN() + " to force reconnection as new local"
          + " generationId and remote one match and DS is in bad gen id: "
          + newGenId);
    }
    session.publish(csMsg);
    // Connection closure must not be done calling RSD.stopHandler() as it
    // would rewait the RSD lock that we already must have entering this
    // method. This would lead to a reentrant lock which we do not want.
    // So simply close the session, this will make the hang up appear
    // after the reader thread that took the RSD lock releases it.
    if (session != null
        // V4 protocol introduced a StopMsg to properly close the
        // connection between servers
        && getProtocolVersion() >= ProtocolVersion.REPLICATION_PROTOCOL_V4)
    {
      try
      {
        session.publish(new StopMsg());
      }
      catch (IOException ioe)
      {
        // Anyway, going to close session, so nothing to do
      }
    }
    status = newStatus;
    // NOT_CONNECTED_STATUS is the last one in RS session life: handler
    // will soon disappear after this method call...
    status = ServerStatus.NOT_CONNECTED_STATUS;
    return null;
  }
  /**
@@ -215,6 +192,12 @@
  public ServerStatus changeStatusFromStatusAnalyzer(StatusMachineEvent event)
  throws IOException
  {
    return changeStatus(event, "from status analyzer");
  }
  private ServerStatus changeStatus(StatusMachineEvent event, String origin)
      throws IOException
  {
    // Check state machine allows this new status (Sanity check)
    ServerStatus newStatus = StatusMachine.computeNewStatus(status, event);
    if (newStatus == ServerStatus.INVALID_STATUS)
@@ -222,22 +205,20 @@
      Message msg = ERR_RS_CANNOT_CHANGE_STATUS.get(getBaseDN(),
          Integer.toString(serverId), status.toString(), event.toString());
      logError(msg);
      // Status analyzer must only change from NORMAL_STATUS to DEGRADED_STATUS
      // and vice versa. We may are being trying to change the status while for
      // instance another status has just been entered: e.g a full update has
      // just been engaged. In that case, just ignore attempt to change the
      // status
      // Only change allowed is from NORMAL_STATUS to DEGRADED_STATUS and vice
      // versa. We may be trying to change the status while another status has
      // just been entered: e.g a full update has just been engaged.
      // In that case, just ignore attempt to change the status
      return newStatus;
    }
    // Send message requesting to change the DS status
    ChangeStatusMsg csMsg = new ChangeStatusMsg(newStatus,
        ServerStatus.INVALID_STATUS);
    ChangeStatusMsg csMsg = new ChangeStatusMsg(newStatus, INVALID_STATUS);
    if (debugEnabled())
    {
      TRACER.debugInfo("In RS " + replicationServer.getServerId()
          + " Sending change status from status analyzer to " + getServerId()
          + " Sending change status " + origin + " to " + getServerId()
          + " for baseDn " + getBaseDN() + ":\n" + csMsg);
    }
@@ -589,7 +570,7 @@
          localGenerationId, sslEncryption, getLocalGroupId(),
          replicationServer.getDegradedStatusThreshold(),
          replicationServer.getWeight(),
          replicationServerDomain.getConnectedLDAPservers().size());
          replicationServerDomain.getConnectedDSs().size());
    }
    send(startMsg);
@@ -626,16 +607,10 @@
   * receiving a StopMsg to properly stop the handshake procedure.
   * @return the startSessionMsg received or null DS sent a stop message to
   *         not finish the handshake.
   * @throws DirectoryException
   * @throws IOException
   * @throws ClassNotFoundException
   * @throws DataFormatException
   * @throws NotSupportedOldVersionPDUException
   * @throws Exception
   */
  private StartSessionMsg waitAndProcessStartSessionFromRemoteDS()
  throws DirectoryException, IOException, ClassNotFoundException,
  DataFormatException,
  NotSupportedOldVersionPDUException
      throws Exception
  {
    ReplicationMsg msg = session.receive();