mirror of https://github.com/OpenIdentityPlatform/OpenDJ.git

mrossign
05.31.2007 17beeae33bb7d73dee3f1a4f9bdf18e5645717d7
opends/src/server/org/opends/server/replication/server/ServerReader.java
@@ -72,7 +72,7 @@
  private short serverId;
  private ProtocolSession session;
  private ServerHandler handler;
  private ReplicationCache replicationCache;
  private ReplicationServerDomain replicationServerDomain;
  /**
   * Constructor for the LDAP server reader part of the replicationServer.
@@ -80,16 +80,18 @@
   * @param session The ProtocolSession from which to read the data.
   * @param serverId The server ID of the server from which we read messages.
   * @param handler The server handler for this server reader.
   * @param replicationCache The ReplicationCache for this server reader.
   * @param replicationServerDomain The ReplicationServerDomain for this server
   *        reader.
   */
  public ServerReader(ProtocolSession session, short serverId,
                      ServerHandler handler, ReplicationCache replicationCache)
                      ServerHandler handler,
                      ReplicationServerDomain replicationServerDomain)
  {
    super(handler.toString() + " reader");
    this.session = session;
    this.serverId = serverId;
    this.handler = handler;
    this.replicationCache = replicationCache;
    this.replicationServerDomain = replicationServerDomain;
  }
  /**
@@ -100,14 +102,15 @@
    if (debugEnabled())
    {
      TRACER.debugInfo(
          "In RS " + replicationCache.getReplicationServer().
          "In RS " + replicationServerDomain.getReplicationServer().
          getMonitorInstanceName() +
          (handler.isReplicationServer()?" RS ":" LS")+
          " reader starting for serverId=" + serverId);
    }
    /*
     * wait on input stream
     * grab all incoming messages and publish them to the replicationCache
     * grab all incoming messages and publish them to the
     * replicationServerDomain
     */
    try
    {
@@ -118,7 +121,7 @@
        if (debugEnabled())
        {
          TRACER.debugInfo(
              "In RS " + replicationCache.getReplicationServer().
              "In RS " + replicationServerDomain.getReplicationServer().
              getMonitorInstanceName() +
              (handler.isReplicationServer()?" From RS ":" From LS")+
              " with serverId=" + serverId + " receives " + msg);
@@ -127,13 +130,14 @@
        {
          AckMessage ack = (AckMessage) msg;
          handler.checkWindow();
          replicationCache.ack(ack, serverId);
          replicationServerDomain.ack(ack, serverId);
        }
        else if (msg instanceof UpdateMessage)
        {
          // Ignore update received from a replica with
          // a bad generation ID
          long referenceGenerationId = replicationCache.getGenerationId();
          long referenceGenerationId =
                  replicationServerDomain.getGenerationId();
          if ((referenceGenerationId>0) &&
              (referenceGenerationId != handler.getGenerationId()))
          {
@@ -145,7 +149,7 @@
          {
            UpdateMessage update = (UpdateMessage) msg;
            handler.decAndCheckWindow();
            replicationCache.put(update, handler);
            replicationServerDomain.put(update, handler);
          }
        }
        else if (msg instanceof WindowMessage)
@@ -182,7 +186,7 @@
        else if (msg instanceof ResetGenerationId)
        {
          ResetGenerationId genIdMsg = (ResetGenerationId) msg;
          replicationCache.resetGenerationId(this.handler, genIdMsg);
          replicationServerDomain.resetGenerationId(this.handler, genIdMsg);
        }
        else if (msg instanceof WindowProbe)
        {
@@ -198,19 +202,20 @@
          {
            if (handler.isReplicationServer())
              TRACER.debugInfo(
               "In RS " + replicationCache.getReplicationServer().
               "In RS " + replicationServerDomain.getReplicationServer().
               getServerId() +
               " Receiving replServerInfo from " + handler.getServerId() +
               " baseDn=" + replicationCache.getBaseDn() +
               " baseDn=" + replicationServerDomain.getBaseDn() +
               " genId=" + infoMsg.getGenerationId());
          }
          if (replicationCache.getGenerationId()<0)
          if (replicationServerDomain.getGenerationId()<0)
          {
            // Here is the case where a ReplicationServer receives from
            // another ReplicationServer the generationId for a domain
            // for which the generation ID has never been set.
            replicationCache.setGenerationId(infoMsg.getGenerationId(), false);
            replicationServerDomain.
                    setGenerationId(infoMsg.getGenerationId(),false);
          }
          else
          {
@@ -221,19 +226,20 @@
              // If we have generationId set locally and no server currently
              // connected for that domain in the topology then we may also
              // reset the generationId localy.
              replicationCache.mayResetGenerationId();
              replicationServerDomain.mayResetGenerationId();
            }
            if (replicationCache.getGenerationId() != infoMsg.getGenerationId())
            if (replicationServerDomain.getGenerationId() !=
                    infoMsg.getGenerationId())
            {
              Message message = NOTE_BAD_GENERATION_ID.get(
                  replicationCache.getBaseDn().toNormalizedString(),
                  replicationServerDomain.getBaseDn().toNormalizedString(),
                  Short.toString(handler.getServerId()),
                  Long.toString(infoMsg.getGenerationId()),
                  Long.toString(replicationCache.getGenerationId()));
                  Long.toString(replicationServerDomain.getGenerationId()));
              ErrorMessage errorMsg = new ErrorMessage(
                  replicationCache.getReplicationServer().getServerId(),
                  replicationServerDomain.getReplicationServer().getServerId(),
                  handler.getServerId(),
                  message);
              session.publish(errorMsg);
@@ -260,7 +266,7 @@
       */
      if (debugEnabled())
        TRACER.debugInfo(
          "In RS " + replicationCache.getReplicationServer().
          "In RS " + replicationServerDomain.getReplicationServer().
          getMonitorInstanceName() +
          " reader IO EXCEPTION for serverID=" + serverId
          + stackTraceToSingleLineString(e) + " " + e.getLocalizedMessage());
@@ -270,7 +276,7 @@
    {
      if (debugEnabled())
        TRACER.debugInfo(
          "In RS <" + replicationCache.getReplicationServer().
          "In RS <" + replicationServerDomain.getReplicationServer().
          getMonitorInstanceName() +
          " reader CNF EXCEPTION serverID=" + serverId
          + stackTraceToSingleLineString(e));
@@ -284,7 +290,7 @@
    {
      if (debugEnabled())
        TRACER.debugInfo(
          "In RS <" + replicationCache.getReplicationServer().
          "In RS <" + replicationServerDomain.getReplicationServer().
          getMonitorInstanceName() +
          " server reader EXCEPTION serverID=" + serverId
          + stackTraceToSingleLineString(e));
@@ -304,7 +310,7 @@
       */
      if (debugEnabled())
        TRACER.debugInfo(
          "In RS " + replicationCache.getReplicationServer().
          "In RS " + replicationServerDomain.getReplicationServer().
          getMonitorInstanceName() +
          " server reader for serverID=" + serverId +
          " is closing the session");
@@ -315,11 +321,11 @@
      {
       // ignore
      }
      replicationCache.stopServer(handler);
      replicationServerDomain.stopServer(handler);
    }
    if (debugEnabled())
      TRACER.debugInfo(
          "In RS " + replicationCache.getReplicationServer().
          "In RS " + replicationServerDomain.getReplicationServer().
          getMonitorInstanceName() +
          (handler.isReplicationServer()?" RS":" LDAP") +
          " server reader stopped for serverID=" + serverId);