mirror of https://github.com/OpenIdentityPlatform/OpenDJ.git

Jean-Noel Rouvignac
14.22.2013 5d1c7d62d688c64af2627ceb8b1556ef313954ec
opends/src/server/org/opends/server/replication/server/ReplicationServerDomain.java
@@ -92,7 +92,7 @@
   * we are currently publishing the first update in the balanced tree is the
   * next change that we must push to this particular server.
   */
  private final Map<Integer, DataServerHandler> directoryServers =
  private final Map<Integer, DataServerHandler> connectedDSs =
    new ConcurrentHashMap<Integer, DataServerHandler>();
  /**
@@ -101,7 +101,7 @@
   * in the balanced tree is the next change that we must push to this
   * particular server.
   */
  private final Map<Integer, ReplicationServerHandler> replicationServers =
  private final Map<Integer, ReplicationServerHandler> connectedRSs =
    new ConcurrentHashMap<Integer, ReplicationServerHandler>();
  private final Queue<MessageHandler> otherHandlers =
@@ -358,12 +358,10 @@
     */
    NotAssuredUpdateMsg notAssuredUpdate = null;
    /*
     * Push the message to the replication servers
     */
    // Push the message to the replication servers
    if (sourceHandler.isDataServer())
    {
      for (ReplicationServerHandler handler : replicationServers.values())
      for (ReplicationServerHandler handler : connectedRSs.values())
      {
        /**
         * Ignore updates to RS with bad gen id
@@ -392,10 +390,8 @@
      }
    }
    /*
     * Push the message to the LDAP servers
     */
    for (DataServerHandler handler : directoryServers.values())
    // Push the message to the LDAP servers
    for (DataServerHandler handler : connectedDSs.values())
    {
      // Don't forward the change to the server that just sent it
      if (handler == sourceHandler)
@@ -576,7 +572,7 @@
      }
      // Look for DS eligible for assured
      for (DataServerHandler handler : directoryServers.values())
      for (DataServerHandler handler : connectedDSs.values())
      {
        // Don't forward the change to the server that just sent it
        if (handler == sourceHandler)
@@ -735,7 +731,7 @@
  private void collectRSsEligibleForAssuredReplication(byte groupId,
      List<Integer> expectedServers)
  {
    for (ReplicationServerHandler handler : replicationServers.values())
    for (ReplicationServerHandler handler : connectedRSs.values())
    {
      if (handler.getGroupId() == groupId
      // No ack expected from a RS with different group id
@@ -908,9 +904,8 @@
          List<Integer> serversInTimeout = expectedAcksInfo.getTimeoutServers();
          for (Integer serverId : serversInTimeout)
          {
            ServerHandler expectedDSInTimeout = directoryServers.get(serverId);
            ServerHandler expectedRSInTimeout =
                replicationServers.get(serverId);
            ServerHandler expectedDSInTimeout = connectedDSs.get(serverId);
            ServerHandler expectedRSInTimeout = connectedRSs.get(serverId);
            if (expectedDSInTimeout != null)
            {
              if (safeRead)
@@ -950,7 +945,7 @@
   */
  public void stopReplicationServers(Collection<String> replServers)
  {
    for (ReplicationServerHandler handler : replicationServers.values())
    for (ReplicationServerHandler handler : connectedRSs.values())
    {
      if (replServers.contains(handler.getServerAddressURL()))
      {
@@ -968,13 +963,13 @@
  public void stopAllServers(boolean shutdown)
  {
    // Close session with other replication servers
    for (ReplicationServerHandler serverHandler : replicationServers.values())
    for (ReplicationServerHandler serverHandler : connectedRSs.values())
    {
      stopServer(serverHandler, shutdown);
    }
    // Close session with other LDAP servers
    for (DataServerHandler serverHandler : directoryServers.values())
    for (DataServerHandler serverHandler : connectedDSs.values())
    {
      stopServer(serverHandler, shutdown);
    }
@@ -988,12 +983,12 @@
   */
  public boolean checkForDuplicateDS(DataServerHandler handler)
  {
    if (directoryServers.containsKey(handler.getServerId()))
    if (connectedDSs.containsKey(handler.getServerId()))
    {
      // looks like two connected LDAP servers have the same serverId
      Message message = ERR_DUPLICATE_SERVER_ID.get(
          localReplicationServer.getMonitorInstanceName(),
          directoryServers.get(handler.getServerId()).toString(),
          connectedDSs.get(handler.getServerId()).toString(),
          handler.toString(), handler.getServerId());
      logError(message);
      return false;
@@ -1047,7 +1042,7 @@
      try
      {
        // Stop useless monitoring publisher if no more RS or DS in domain
        if ( (directoryServers.size() + replicationServers.size() )== 1)
        if ( (connectedDSs.size() + connectedRSs.size() )== 1)
        {
          if (debugEnabled())
          {
@@ -1062,7 +1057,7 @@
        if (handler.isReplicationServer())
        {
          if (replicationServers.containsKey(handler.getServerId()))
          if (connectedRSs.containsKey(handler.getServerId()))
          {
            unregisterServerHandler(handler);
            handler.shutdown();
@@ -1076,11 +1071,11 @@
              buildAndSendTopoInfoToDSs(null);
            }
          }
        } else if (directoryServers.containsKey(handler.getServerId()))
        } else if (connectedDSs.containsKey(handler.getServerId()))
        {
          // If this is the last DS for the domain,
          // shutdown the status analyzer
          if (directoryServers.size() == 1)
          if (connectedDSs.size() == 1)
          {
            if (debugEnabled())
            {
@@ -1193,11 +1188,11 @@
  {
    if (handler.isReplicationServer())
    {
      replicationServers.remove(handler.getServerId());
      connectedRSs.remove(handler.getServerId());
    }
    else
    {
      directoryServers.remove(handler.getServerId());
      connectedDSs.remove(handler.getServerId());
    }
  }
@@ -1224,9 +1219,9 @@
    // topology and the generationId has never been saved, then we can reset
    // it and the next LDAP server to connect will become the new reference.
    boolean lDAPServersConnectedInTheTopology = false;
    if (directoryServers.isEmpty())
    if (connectedDSs.isEmpty())
    {
      for (ReplicationServerHandler rsh : replicationServers.values())
      for (ReplicationServerHandler rsh : connectedRSs.values())
      {
        if (generationId != rsh.getGenerationId())
        {
@@ -1283,7 +1278,7 @@
  throws DirectoryException
  {
    ReplicationServerHandler oldHandler =
      replicationServers.get(handler.getServerId());
      connectedRSs.get(handler.getServerId());
    if (oldHandler != null)
    {
      if (oldHandler.getServerAddressURL().equals(
@@ -1339,7 +1334,7 @@
  public Set<String> getChangelogs()
  {
    Set<String> results = new LinkedHashSet<String>();
    for (ReplicationServerHandler handler : replicationServers.values())
    for (ReplicationServerHandler handler : connectedRSs.values())
    {
      results.add(handler.getServerAddressURL());
    }
@@ -1358,22 +1353,6 @@
  }
  /**
   * Returns as a set of String the list of LDAP servers connected to us.
   * Each string is the serverID of a connected LDAP server.
   *
   * @return The set of connected LDAP servers
   */
  public List<String> getConnectedLDAPservers()
  {
    List<String> results = new ArrayList<String>(0);
    for (DataServerHandler handler : directoryServers.values())
    {
      results.add(String.valueOf(handler.getServerId()));
    }
    return results;
  }
  /**
   * Creates and returns an iterator.
   * When the iterator is not used anymore, the caller MUST call the
   * ReplicationIterator.releaseCursor() method to free the resources
@@ -1493,7 +1472,7 @@
      {
        // Send to all replication servers with a least one remote
        // server connected
        for (ReplicationServerHandler rsh : replicationServers.values())
        for (ReplicationServerHandler rsh : connectedRSs.values())
        {
          if (rsh.hasRemoteLDAPServers())
          {
@@ -1503,7 +1482,7 @@
      }
      // Sends to all connected LDAP servers
      for (DataServerHandler destinationHandler : directoryServers.values())
      for (DataServerHandler destinationHandler : connectedDSs.values())
      {
        // Don't loop on the sender
        if (destinationHandler == senderHandler)
@@ -1516,7 +1495,7 @@
    {
      // Destination is one server
      DataServerHandler destinationHandler =
        directoryServers.get(msg.getDestination());
        connectedDSs.get(msg.getDestination());
      if (destinationHandler != null)
      {
        servers.add(destinationHandler);
@@ -1527,13 +1506,13 @@
        // have the targeted server connected.
        if (senderHandler.isDataServer())
        {
          for (ReplicationServerHandler h : replicationServers.values())
          for (ReplicationServerHandler rsHandler : connectedRSs.values())
          {
            // Send to all replication servers with a least one remote
            // server connected
            if (h.isRemoteLDAPServer(msg.getDestination()))
            if (rsHandler.isRemoteLDAPServer(msg.getDestination()))
            {
              servers.add(h);
              servers.add(rsHandler);
            }
          }
        }
@@ -1808,14 +1787,14 @@
      // from the states stored in the serverHandler.
      // - the server state
      // - the older missing change
      for (DataServerHandler lsh : this.directoryServers.values())
      for (DataServerHandler lsh : this.connectedDSs.values())
      {
        monitorMsg.setServerState(lsh.getServerId(),
            lsh.getServerState(), lsh.getApproxFirstMissingDate(), true);
      }
      // Same for the connected RS
      for (ReplicationServerHandler rsh : this.replicationServers.values())
      for (ReplicationServerHandler rsh : this.connectedRSs.values())
      {
        monitorMsg.setServerState(rsh.getServerId(),
            rsh.getServerState(), rsh.getApproxFirstMissingDate(), false);
@@ -1891,7 +1870,7 @@
   */
  public void buildAndSendTopoInfoToDSs(ServerHandler notThisOne)
  {
    for (DataServerHandler handler : directoryServers.values())
    for (DataServerHandler handler : connectedDSs.values())
    {
      if (notThisOne == null || handler != notThisOne)
        // All except passed one
@@ -1932,7 +1911,7 @@
  public void buildAndSendTopoInfoToRSs()
  {
    TopologyMsg topoMsg = createTopologyMsgForRS();
    for (ReplicationServerHandler handler : replicationServers.values())
    for (ReplicationServerHandler handler : connectedRSs.values())
    {
      for (int i=1; i<=2; i++)
      {
@@ -1976,7 +1955,7 @@
    List<DSInfo> dsInfos = new ArrayList<DSInfo>();
    // Go through every DSs
    for (DataServerHandler serverHandler : directoryServers.values())
    for (DataServerHandler serverHandler : connectedDSs.values())
    {
      dsInfos.add(serverHandler.toDSInfo());
    }
@@ -2001,7 +1980,7 @@
  {
    // Go through every DSs (except recipient of msg)
    List<DSInfo> dsInfos = new ArrayList<DSInfo>();
    for (DataServerHandler serverHandler : directoryServers.values())
    for (DataServerHandler serverHandler : connectedDSs.values())
    {
      if (serverHandler.getServerId() == destDsId)
      {
@@ -2017,7 +1996,7 @@
    // Go through every peer RSs (and get their connected DSs), also add info
    // for RSs
    for (ReplicationServerHandler serverHandler : replicationServers.values())
    for (ReplicationServerHandler serverHandler : connectedRSs.values())
    {
      rsInfos.add(serverHandler.toRSInfo());
@@ -2150,7 +2129,7 @@
      // If we are the first replication server warned,
      // then forwards the reset message to the remote replication servers
      for (ServerHandler rsHandler : replicationServers.values())
      for (ServerHandler rsHandler : connectedRSs.values())
      {
        try
        {
@@ -2170,7 +2149,7 @@
      // Change status of the connected DSs according to the requested new
      // reference generation id
      for (DataServerHandler dsHandler : directoryServers.values())
      for (DataServerHandler dsHandler : connectedDSs.values())
      {
        try
        {
@@ -2362,8 +2341,7 @@
          // TODO: i18n
          MessageBuilder mb = new MessageBuilder();
          mb.append(ERR_ERROR_CLEARING_DB.get(dbHandler.toString(),
            e.getMessage() + " " +
            stackTraceToSingleLineString(e)));
              e.getMessage() + " " + stackTraceToSingleLineString(e)));
          logError(mb.toMessage());
        }
      }
@@ -2400,10 +2378,10 @@
          + " given local generation Id=" + this.generationId);
    }
    ServerHandler handler = replicationServers.get(serverId);
    ServerHandler handler = connectedRSs.get(serverId);
    if (handler == null)
    {
      handler = directoryServers.get(serverId);
      handler = connectedDSs.get(serverId);
      if (handler == null)
      {
        return false;
@@ -2549,7 +2527,7 @@
            initializePendingMonitorData();
            // Send the monitor requests to the connected replication servers.
            for (ReplicationServerHandler rs : replicationServers.values())
            for (ReplicationServerHandler rs : connectedRSs.values())
            {
              // Add server ID to pending table.
              int serverId = rs.getServerId();
@@ -2657,13 +2635,12 @@
    // So for a given DS connected we can take the state and the max from
    // the DS/state.
    for (ServerHandler ds : directoryServers.values())
    for (ServerHandler ds : connectedDSs.values())
    {
      int serverID = ds.getServerId();
      // the state comes from the state stored in the SH
      ServerState dsState = ds.getServerState()
          .duplicate();
      ServerState dsState = ds.getServerState().duplicate();
      // the max CN sent by that LS also comes from the SH
      ChangeNumber maxcn = dsState.getChangeNumber(serverID);
@@ -2725,46 +2702,44 @@
        pendingMonitorData.setRSState(msg.getSenderID(), replServerState);
        // Store the remote LDAP servers states
        Iterator<Integer> lsidIterator = msg.ldapIterator();
        while (lsidIterator.hasNext())
        Iterator<Integer> dsServerIdIterator = msg.ldapIterator();
        while (dsServerIdIterator.hasNext())
        {
          int sid = lsidIterator.next();
          ServerState dsServerState = msg.getLDAPServerState(sid);
          int dsServerId = dsServerIdIterator.next();
          ServerState dsServerState = msg.getLDAPServerState(dsServerId);
          pendingMonitorData.setMaxCNs(dsServerState);
          pendingMonitorData.setLDAPServerState(sid, dsServerState);
          pendingMonitorData.setFirstMissingDate(sid,
              msg.getLDAPApproxFirstMissingDate(sid));
          pendingMonitorData.setLDAPServerState(dsServerId, dsServerState);
          pendingMonitorData.setFirstMissingDate(dsServerId,
              msg.getLDAPApproxFirstMissingDate(dsServerId));
        }
        // Process the latency reported by the remote RSi on its connections
        // to the other RSes
        Iterator<Integer> rsidIterator = msg.rsIterator();
        while (rsidIterator.hasNext())
        Iterator<Integer> rsServerIdIterator = msg.rsIterator();
        while (rsServerIdIterator.hasNext())
        {
          int rsid = rsidIterator.next();
          if (rsid == localReplicationServer.getServerId())
          int rsServerId = rsServerIdIterator.next();
          long newFmd = msg.getRSApproxFirstMissingDate(rsServerId);
          if (rsServerId == localReplicationServer.getServerId())
          {
            // this is the latency of the remote RSi regarding the current RS
            // let's update the fmd of my connected LS
            for (ServerHandler connectedlsh : directoryServers
                .values())
            for (DataServerHandler connectedDS : connectedDSs.values())
            {
              int connectedlsid = connectedlsh.getServerId();
              Long newfmd = msg.getRSApproxFirstMissingDate(rsid);
              pendingMonitorData.setFirstMissingDate(connectedlsid, newfmd);
              int connectedServerId = connectedDS.getServerId();
              pendingMonitorData.setFirstMissingDate(connectedServerId, newFmd);
            }
          }
          else
          {
            // this is the latency of the remote RSi regarding another RSj
            // let's update the latency of the LSes connected to RSj
            ReplicationServerHandler rsjHdr = replicationServers.get(rsid);
            ReplicationServerHandler rsjHdr = connectedRSs.get(rsServerId);
            if (rsjHdr != null)
            {
              for (int remotelsid : rsjHdr.getConnectedDirectoryServerIds())
              for (int remoteServerId : rsjHdr.getConnectedDirectoryServerIds())
              {
                Long newfmd = msg.getRSApproxFirstMissingDate(rsid);
                pendingMonitorData.setFirstMissingDate(remotelsid, newfmd);
                pendingMonitorData.setFirstMissingDate(remoteServerId, newFmd);
              }
            }
          }
@@ -2773,8 +2748,8 @@
      catch (RuntimeException e)
      {
        // FIXME: do we really expect these???
        logError(ERR_PROCESSING_REMOTE_MONITOR_DATA.get(e
            .getMessage() + stackTraceToSingleLineString(e)));
        logError(ERR_PROCESSING_REMOTE_MONITOR_DATA.get(
            e.getMessage() + stackTraceToSingleLineString(e)));
      }
      finally
      {
@@ -2808,7 +2783,7 @@
   */
  public Map<Integer, DataServerHandler> getConnectedDSs()
  {
    return directoryServers;
    return connectedDSs;
  }
  /**
@@ -2817,7 +2792,7 @@
   */
  public Map<Integer, ReplicationServerHandler> getConnectedRSs()
  {
    return replicationServers;
    return connectedRSs;
  }
@@ -3131,9 +3106,8 @@
            result.update(mostRecentDbCN);
          }
        } catch (Exception e) {
          Message errMessage = ERR_WRITER_UNEXPECTED_EXCEPTION.get(
              " " + stackTraceToSingleLineString(e));
          logError(errMessage);
          logError(ERR_WRITER_UNEXPECTED_EXCEPTION
              .get(stackTraceToSingleLineString(e)));
          TRACER.debugCaught(DebugLogLevel.ERROR, e);
        }
      }
@@ -3236,13 +3210,13 @@
  private boolean isServerConnected(int serverId)
  {
    if (directoryServers.containsKey(serverId))
    if (connectedDSs.containsKey(serverId))
    {
      return true;
    }
    // not directly connected
    for (ReplicationServerHandler rsHandler : replicationServers.values())
    for (ReplicationServerHandler rsHandler : connectedRSs.values())
    {
      if (rsHandler.isRemoteLDAPServer(serverId))
      {
@@ -3283,7 +3257,7 @@
      {
        // If we are the first replication server warned,
        // then forwards the message to the remote replication servers
        for (ReplicationServerHandler rsHandler : replicationServers.values())
        for (ReplicationServerHandler rsHandler : connectedRSs.values())
        {
          try
          {