mirror of https://github.com/OpenIdentityPlatform/OpenDJ.git

pgamba
02.58.2007 b48ce50fdf4d73e8be3799e3a7c6c2bf9d1b2965
opends/src/server/org/opends/server/replication/server/ServerReader.java
@@ -30,6 +30,7 @@
import static org.opends.server.loggers.ErrorLogger.logError;
import static org.opends.messages.ReplicationMessages.*;
import static org.opends.server.loggers.debug.DebugLogger.*;
import static org.opends.server.util.StaticUtils.stackTraceToSingleLineString;
import java.io.IOException;
@@ -39,6 +40,7 @@
import org.opends.server.replication.protocol.DoneMessage;
import org.opends.server.replication.protocol.EntryMessage;
import org.opends.server.replication.protocol.ErrorMessage;
import org.opends.server.replication.protocol.ResetGenerationId;
import org.opends.server.replication.protocol.InitializeRequestMessage;
import org.opends.server.replication.protocol.InitializeTargetMessage;
import org.opends.server.replication.protocol.ProtocolSession;
@@ -76,7 +78,7 @@
   * Constructor for the LDAP server reader part of the replicationServer.
   *
   * @param session The ProtocolSession from which to read the data.
   * @param serverId The server ID of the server from which we read changes.
   * @param serverId The server ID of the server from which we read messages.
   * @param handler The server handler for this server reader.
   * @param replicationCache The ReplicationCache for this server reader.
   */
@@ -97,14 +99,11 @@
  {
    if (debugEnabled())
    {
      if (handler.isReplicationServer())
      {
        TRACER.debugInfo("Replication server reader starting " + serverId);
      }
      else
      {
        TRACER.debugInfo("LDAP server reader starting " + serverId);
      }
      TRACER.debugInfo(
          "In RS <" + replicationCache.getReplicationServer().
          getMonitorInstanceName() +
          (handler.isReplicationServer()?" RS ":" LS")+
          " reader starting for serverId=" + serverId);
    }
    /*
     * wait on input stream
@@ -116,6 +115,25 @@
      {
        ReplicationMessage msg = session.receive();
        if (debugEnabled())
        {
          if (handler.isReplicationServer())
          {
            TRACER.debugInfo(
                "In RS <" + replicationCache.getReplicationServer().
                getMonitorInstanceName() +
                "> from RS server with serverId=" + serverId +
                " receives " + msg);
          }
          else
          {
            TRACER.debugInfo(
                "In RS <" + replicationCache.getReplicationServer().
                getMonitorInstanceName() +
                "> from LDAP server with serverId=" + serverId +
                " receives " + msg);
          }
        }
        if (msg instanceof AckMessage)
        {
          AckMessage ack = (AckMessage) msg;
@@ -124,9 +142,22 @@
        }
        else if (msg instanceof UpdateMessage)
        {
          UpdateMessage update = (UpdateMessage) msg;
          handler.decAndCheckWindow();
          replicationCache.put(update, handler);
          // Ignore update received from a replica with
          // a bad generation ID
          long referenceGenerationId = replicationCache.getGenerationId();
          if ((referenceGenerationId>0) &&
              (referenceGenerationId != handler.getGenerationId()))
          {
            logError(ERR_IGNORING_UPDATE_FROM.get(
                msg.toString(),
                handler.getMonitorInstanceName()));
          }
          else
          {
            UpdateMessage update = (UpdateMessage) msg;
            handler.decAndCheckWindow();
            replicationCache.put(update, handler);
          }
        }
        else if (msg instanceof WindowMessage)
        {
@@ -159,6 +190,11 @@
          ErrorMessage errorMsg = (ErrorMessage) msg;
          handler.process(errorMsg);
        }
        else if (msg instanceof ResetGenerationId)
        {
          ResetGenerationId genIdMsg = (ResetGenerationId) msg;
          replicationCache.resetGenerationId(this.handler);
        }
        else if (msg instanceof WindowProbe)
        {
          WindowProbe windowProbeMsg = (WindowProbe) msg;
@@ -168,6 +204,52 @@
        {
          ReplServerInfoMessage infoMsg = (ReplServerInfoMessage)msg;
          handler.setReplServerInfo(infoMsg);
          if (debugEnabled())
          {
            if (handler.isReplicationServer())
              TRACER.debugInfo(
               "In RS " + replicationCache.getReplicationServer().
               getServerId() +
               " Receiving replServerInfo from " + handler.getServerId() +
               " baseDn=" + replicationCache.getBaseDn() +
               " genId=" + infoMsg.getGenerationId());
          }
          if (replicationCache.getGenerationId()<0)
          {
            // Here is the case where a ReplicationServer receives from
            // another ReplicationServer the generationId for a domain
            // for which the generation ID has never been set.
            replicationCache.setGenerationId(infoMsg.getGenerationId(), false);
          }
          else
          {
            if (infoMsg.getGenerationId()<0)
            {
              // Here is the case where another ReplicationServer
              // signals that it has no generationId set for the domain.
              // If we have generationId set locally and no server currently
              // connected for that domain in the topology then we may also
              // reset the generationId localy.
              replicationCache.mayResetGenerationId();
            }
            if (replicationCache.getGenerationId() != infoMsg.getGenerationId())
            {
              Message message = NOTE_BAD_GENERATION_ID.get(
                  replicationCache.getBaseDn().toNormalizedString(),
                  Short.toString(handler.getServerId()),
                  Long.toString(infoMsg.getGenerationId()),
                  Long.toString(replicationCache.getGenerationId()));
              ErrorMessage errorMsg = new ErrorMessage(
                  replicationCache.getReplicationServer().getServerId(),
                  handler.getServerId(),
                  message);
              session.publish(errorMsg);
            }
          }
        }
        else if (msg == null)
        {
@@ -187,21 +269,40 @@
       * Log a message and exit from this loop
       * So that this handler is stopped.
       */
      if (debugEnabled())
        TRACER.debugInfo(
          "In RS <" + replicationCache.getReplicationServer().
          getMonitorInstanceName() +
          " reader IO EXCEPTION serverID=" + serverId
          + stackTraceToSingleLineString(e) + e.getLocalizedMessage() +
          e.getCause());
      Message message = NOTE_SERVER_DISCONNECT.get(handler.toString());
      logError(message);
    } catch (ClassNotFoundException e)
    {
      if (debugEnabled())
        TRACER.debugInfo(
          "In RS <" + replicationCache.getReplicationServer().
          getMonitorInstanceName() +
          " reader CNF EXCEPTION serverID=" + serverId
          + stackTraceToSingleLineString(e));
      /*
       * The remote server has sent an unknown message,
       * close the conenction.
       * close the connection.
       */
      Message message = ERR_UNKNOWN_MESSAGE.get(handler.toString());
      logError(message);
    } catch (Exception e)
    {
      if (debugEnabled())
        TRACER.debugInfo(
          "In RS <" + replicationCache.getReplicationServer().
          getMonitorInstanceName() +
          " server reader EXCEPTION serverID=" + serverId
          + stackTraceToSingleLineString(e));
      /*
       * The remote server has sent an unknown message,
       * close the conenction.
       * close the connection.
       */
      Message message = NOTE_READER_EXCEPTION.get(handler.toString());
      logError(message);
@@ -213,6 +314,12 @@
       * happen.
       * Attempt to close the socket and stop the server handler.
       */
      if (debugEnabled())
        TRACER.debugInfo(
          "In RS <" + replicationCache.getReplicationServer().
          getMonitorInstanceName() +
          " reader CLOSE serverID=" + serverId
          + stackTraceToSingleLineString(new Exception()));
      try
      {
        session.close();
@@ -223,15 +330,11 @@
      replicationCache.stopServer(handler);
    }
    if (debugEnabled())
    {
      if (handler.isReplicationServer())
      {
        TRACER.debugInfo("Replication server reader stopping " + serverId);
      }
      else
      {
        TRACER.debugInfo("LDAP server reader stopping " + serverId);
      }
    }
      TRACER.debugInfo(
          "In RS <" + replicationCache.getReplicationServer().
          getMonitorInstanceName() +
          (handler.isReplicationServer()?"RS":"LDAP") +
          " server reader stopped for serverID=" + serverId
          + stackTraceToSingleLineString(new Exception()));
  }
}