mirror of https://github.com/OpenIdentityPlatform/OpenDJ.git

Jean-Noel Rouvignac
14.30.2013 0a51f5fbeb5e99c52f1be8973ae656de34fab75f
opends/src/server/org/opends/server/replication/server/DataServerHandler.java
@@ -27,35 +27,21 @@
 */
package org.opends.server.replication.server;
import java.io.IOException;
import java.util.*;
import java.util.zip.DataFormatException;
import org.opends.messages.Message;
import org.opends.server.replication.common.*;
import org.opends.server.replication.protocol.*;
import org.opends.server.types.*;
import static org.opends.messages.ReplicationMessages.*;
import static org.opends.server.loggers.ErrorLogger.*;
import static org.opends.server.loggers.debug.DebugLogger.*;
import static org.opends.server.replication.common.StatusMachine.*;
import static org.opends.server.replication.protocol.ProtocolVersion.*;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Date;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.zip.DataFormatException;
import org.opends.messages.Message;
import org.opends.server.replication.common.AssuredMode;
import org.opends.server.replication.common.DSInfo;
import org.opends.server.replication.common.ServerState;
import org.opends.server.replication.common.ServerStatus;
import org.opends.server.replication.common.StatusMachine;
import org.opends.server.replication.common.StatusMachineEvent;
import org.opends.server.replication.protocol.*;
import org.opends.server.types.Attribute;
import org.opends.server.types.AttributeBuilder;
import org.opends.server.types.Attributes;
import org.opends.server.types.DirectoryException;
import org.opends.server.types.ResultCode;
/**
 * This class defines a server handler, which handles all interaction with a
 * peer server (RS or DS).
@@ -104,11 +90,11 @@
   * @param newGenId The new generation id to take into account
   * @throws IOException If IO error occurred.
   */
  public void changeStatusForResetGenId(long newGenId)
  throws IOException
  public void changeStatusForResetGenId(long newGenId) throws IOException
  {
    StatusMachineEvent event;
    final int localRsServerId = replicationServer.getServerId();
    StatusMachineEvent event;
    if (newGenId == -1)
    {
      // The generation id is being made invalid, let's put the DS
@@ -127,9 +113,8 @@
          if (debugEnabled())
          {
            TRACER.debugInfo(
                "In RS " +
                replicationServerDomain.getReplicationServer().getServerId() +
                ". Closing connection to DS " + getServerId() +
                "In RS " + localRsServerId +
                ", closing connection to DS " + getServerId() +
                " for baseDn " + getBaseDN() +
                " to force reconnection as new local" +
                " generationId and remote one match and DS is in bad gen id: " +
@@ -140,20 +125,19 @@
          // would rewait the RSD lock that we already must have entering this
          // method. This would lead to a reentrant lock which we do not want.
          // So simply close the session, this will make the hang up appear
          // after the reader thread that took the RSD lock realeases it.
          if (session != null)
          // after the reader thread that took the RSD lock releases it.
          if (session != null
              // V4 protocol introduced a StopMsg to properly close the
              // connection between servers
             && getProtocolVersion() >= ProtocolVersion.REPLICATION_PROTOCOL_V4)
          {
            // V4 protocol introduces a StopMsg to properly close the
            // connection between servers
            if (getProtocolVersion() >= ProtocolVersion.REPLICATION_PROTOCOL_V4)
            try
            {
              try
              {
                session.publish(new StopMsg());
              } catch (IOException ioe)
              {
                // Anyway, going to close session, so nothing to do
              }
              session.publish(new StopMsg());
            }
            catch (IOException ioe)
            {
              // Anyway, going to close session, so nothing to do
            }
          }
@@ -165,12 +149,10 @@
        {
          if (debugEnabled())
          {
            TRACER.debugInfo(
                "In RS " +
                replicationServerDomain.getReplicationServer().getServerId() +
                ". DS " + getServerId() + " for baseDn " + getBaseDN() +
                " has already generation id " + newGenId +
            " so no ChangeStatusMsg sent to him.");
            TRACER.debugInfo("In RS " + localRsServerId + ". DS "
                + getServerId() + " for baseDn " + getBaseDN()
                + " has already generation id " + newGenId
                + " so no ChangeStatusMsg sent to him.");
          }
          return;
        }
@@ -182,14 +164,13 @@
      }
    }
    if ((event == StatusMachineEvent.TO_BAD_GEN_ID_STATUS_EVENT) &&
        (status == ServerStatus.FULL_UPDATE_STATUS))
    if (event == StatusMachineEvent.TO_BAD_GEN_ID_STATUS_EVENT
        && status == ServerStatus.FULL_UPDATE_STATUS)
    {
      // Prevent useless error message (full update status cannot lead to bad
      // gen status)
      Message message = NOTE_BAD_GEN_ID_IN_FULL_UPDATE.get(
          Integer.toString(replicationServerDomain.
              getReplicationServer().getServerId()),
              Integer.toString(localRsServerId),
              getBaseDN(),
              Integer.toString(serverId),
              Long.toString(generationId),
@@ -214,11 +195,9 @@
    if (debugEnabled())
    {
      TRACER.debugInfo(
          "In RS " +
          replicationServerDomain.getReplicationServer().getServerId() +
          " Sending change status for reset gen id to " + getServerId() +
          " for baseDn " + getBaseDN() + ":\n" + csMsg);
      TRACER.debugInfo("In RS " + localRsServerId
          + " Sending change status for reset gen id to " + getServerId()
          + " for baseDn " + getBaseDN() + ":\n" + csMsg);
    }
    session.publish(csMsg);
@@ -257,11 +236,9 @@
    if (debugEnabled())
    {
      TRACER.debugInfo(
          "In RS " +
          replicationServerDomain.getReplicationServer().getServerId() +
          " Sending change status from status analyzer to " + getServerId() +
          " for baseDn " + getBaseDN() + ":\n" + csMsg);
      TRACER.debugInfo("In RS " + replicationServer.getServerId()
          + " Sending change status from status analyzer to " + getServerId()
          + " for baseDn " + getBaseDN() + ":\n" + csMsg);
    }
    session.publish(csMsg);
@@ -296,14 +273,13 @@
    // Add the specific DS ones
    attributes.add(Attributes.create("replica", serverURL));
    attributes.add(Attributes.create("connected-to",
        this.replicationServerDomain.getReplicationServer()
        .getMonitorInstanceName()));
        this.replicationServer.getMonitorInstanceName()));
    MonitorData md = replicationServerDomain.getDomainMonitorData();
    // Oldest missing update
    Long approxFirstMissingDate = md.getApproxFirstMissingDate(serverId);
    if ((approxFirstMissingDate != null) && (approxFirstMissingDate > 0))
    long approxFirstMissingDate = md.getApproxFirstMissingDate(serverId);
    if (approxFirstMissingDate > 0)
    {
      Date date = new Date(approxFirstMissingDate);
      attributes.add(Attributes.create(
@@ -314,14 +290,12 @@
    }
    // Missing changes
    long missingChanges = md.getMissingChanges(serverId);
    attributes.add(Attributes.create("missing-changes", String
        .valueOf(missingChanges)));
    attributes.add(Attributes.create("missing-changes",
        String.valueOf(md.getMissingChanges(serverId))));
    // Replication delay
    long delay = md.getApproxDelay(serverId);
    attributes.add(Attributes.create("approximate-delay", String
        .valueOf(delay)));
    attributes.add(Attributes.create("approximate-delay",
        String.valueOf(md.getApproxDelay(serverId))));
    /* get the Server State */
    AttributeBuilder builder = new AttributeBuilder("server-state");
@@ -541,18 +515,9 @@
      {
        Message errMessage = ERR_DS_DISCONNECTED_DURING_HANDSHAKE.get(
          Integer.toString(inServerStartMsg.getServerId()),
          Integer.toString(replicationServerDomain.getReplicationServer().
          getServerId()));
          Integer.toString(replicationServer.getServerId()));
        throw new DirectoryException(ResultCode.OTHER, errMessage);
      }
      catch (NotSupportedOldVersionPDUException e)
      {
        // We do not need to support DS V1 connection, we just accept RS V1
        // connection:
        // We just trash the message, log the event for debug purpose and close
        // the connection
        throw new DirectoryException(ResultCode.OTHER, null, null);
      }
      catch (Exception e)
      {
        // We do not need to support DS V1 connection, we just accept RS V1
@@ -588,7 +553,7 @@
    }
    finally
    {
      if ((replicationServerDomain != null) &&
      if (replicationServerDomain != null &&
          replicationServerDomain.hasLock())
        replicationServerDomain.release();
    }
@@ -610,21 +575,20 @@
    {
      // Peer DS uses protocol < V4 : send it a ReplServerStartMsg
      startMsg = new ReplServerStartMsg(getReplicationServerId(),
              getReplicationServerURL(), getBaseDN(), maxRcvWindow,
          getReplicationServerURL(), getBaseDN(), maxRcvWindow,
          replicationServerDomain.getDbServerState(),
          localGenerationId, sslEncryption, getLocalGroupId(),
          replicationServerDomain.getReplicationServer()
              .getDegradedStatusThreshold());
          replicationServer.getDegradedStatusThreshold());
    }
    else
    {
      // Peer DS uses protocol V4 : send it a ReplServerStartDSMsg
      startMsg = new ReplServerStartDSMsg(getReplicationServerId(),
              getReplicationServerURL(), getBaseDN(), maxRcvWindow,
          getReplicationServerURL(), getBaseDN(), maxRcvWindow,
          replicationServerDomain.getDbServerState(),
          localGenerationId, sslEncryption, getLocalGroupId(),
          replicationServerDomain.getReplicationServer()
              .getDegradedStatusThreshold(), replicationServer.getWeight(),
          replicationServer.getDegradedStatusThreshold(),
          replicationServer.getWeight(),
          replicationServerDomain.getConnectedLDAPservers().size());
    }
@@ -651,17 +615,10 @@
  {
    if (serverId != 0)
    {
      StringBuilder builder = new StringBuilder("Replica DS(");
      builder.append(serverId);
      builder.append(") for domain \"");
      builder.append(replicationServerDomain.getBaseDn());
      builder.append("\"");
      return builder.toString();
      return "Replica DS(" + serverId + ") for domain \""
          + replicationServerDomain.getBaseDn() + "\"";
    }
    else
    {
      return "Unknown server";
    }
    return "Unknown server";
  }
  /**
@@ -740,7 +697,7 @@
    else
    {
      // We are an empty ReplicationServer
      if ((generationId > 0) && (!getServerState().isEmpty()))
      if (generationId > 0 && !getServerState().isEmpty())
      {
        // If the LDAP server has already sent changes
        // it is not expected to connect to an empty RS