mirror of https://github.com/OpenIdentityPlatform/OpenDJ.git

mrossign
05.31.2007 17beeae33bb7d73dee3f1a4f9bdf18e5645717d7
opends/src/server/org/opends/server/replication/server/ServerHandler.java
@@ -111,7 +111,7 @@
  private MsgQueue lateQueue = new MsgQueue();
  private final Map<ChangeNumber, AckMessageList> waitingAcks  =
          new HashMap<ChangeNumber, AckMessageList>();
  private ReplicationCache replicationCache = null;
  private ReplicationServerDomain replicationServerDomain = null;
  private String serverURL;
  private int outCount = 0; // number of update sent to the server
  private int inCount = 0;  // number of updates received from the server
@@ -227,11 +227,13 @@
        // This is an outgoing connection. Publish our start message.
        this.baseDn = baseDn;
        // Get or create the ReplicationCache
        replicationCache = replicationServer.getReplicationCache(baseDn, true);
        localGenerationId = replicationCache.getGenerationId();
        // Get or create the ReplicationServerDomain
        replicationServerDomain =
                replicationServer.getReplicationServerDomain(baseDn, true);
        localGenerationId = replicationServerDomain.getGenerationId();
        ServerState localServerState = replicationCache.getDbServerState();
        ServerState localServerState =
                replicationServerDomain.getDbServerState();
        ReplServerStartMessage msg =
          new ReplServerStartMessage(replicationServerId, replicationServerURL,
                                    baseDn, windowSize, localServerState,
@@ -298,12 +300,13 @@
        serverIsLDAPserver = true;
        // Get or Create the ReplicationCache
        replicationCache = replicationServer.getReplicationCache(this.baseDn,
            true);
        localGenerationId = replicationCache.getGenerationId();
        // Get or Create the ReplicationServerDomain
        replicationServerDomain =
                replicationServer.getReplicationServerDomain(this.baseDn, true);
        localGenerationId = replicationServerDomain.getGenerationId();
        ServerState localServerState = replicationCache.getDbServerState();
        ServerState localServerState =
                replicationServerDomain.getDbServerState();
        // This an incoming connection. Publish our start message
        ReplServerStartMessage myStartMsg =
          new ReplServerStartMessage(replicationServerId, replicationServerURL,
@@ -322,9 +325,10 @@
        if (debugEnabled())
        {
          Set<String> ss = this.serverState.toStringSet();
          Set<String> lss = replicationCache.getDbServerState().toStringSet();
          TRACER.debugInfo("In " + replicationCache.getReplicationServer().
                   getMonitorInstanceName() +
          Set<String> lss =
                  replicationServerDomain.getDbServerState().toStringSet();
          TRACER.debugInfo("In " + replicationServerDomain.
                   getReplicationServer().getMonitorInstanceName() +
                   ", SH received START from LS serverId=" + serverId +
                   " baseDN=" + this.baseDn +
                   " generationId=" + generationId +
@@ -376,7 +380,7 @@
          }
          else
          {
            replicationCache.setGenerationId(generationId, false);
            replicationServerDomain.setGenerationId(generationId, false);
          }
        }
      }
@@ -396,11 +400,11 @@
        this.baseDn = receivedMsg.getBaseDn();
        if (baseDn == null)
        {
          // Get or create the ReplicationCache
          replicationCache = replicationServer.getReplicationCache(this.baseDn,
              true);
          localGenerationId = replicationCache.getGenerationId();
          ServerState serverState = replicationCache.getDbServerState();
          // Get or create the ReplicationServerDomain
          replicationServerDomain = replicationServer.
                  getReplicationServerDomain(this.baseDn, true);
          localGenerationId = replicationServerDomain.getGenerationId();
          ServerState serverState = replicationServerDomain.getDbServerState();
          // The session initiator decides whether to use SSL.
          sslEncryption = receivedMsg.getSSLEncryption();
@@ -431,9 +435,10 @@
        if (debugEnabled())
        {
          Set<String> ss = this.serverState.toStringSet();
          Set<String> lss = replicationCache.getDbServerState().toStringSet();
          TRACER.debugInfo("In " + replicationCache.getReplicationServer().
                   getMonitorInstanceName() +
          Set<String> lss =
                  replicationServerDomain.getDbServerState().toStringSet();
          TRACER.debugInfo("In " + replicationServerDomain.
                   getReplicationServer().getMonitorInstanceName() +
                   ", SH received START from RS serverId=" + serverId +
                   " baseDN=" + this.baseDn +
                   " generationId=" + generationId +
@@ -448,7 +453,8 @@
        {
          if (debugEnabled())
          {
            TRACER.debugInfo("In " + replicationCache.getReplicationServer().
            TRACER.debugInfo("In " +
                    replicationServerDomain.getReplicationServer().
              getMonitorInstanceName() + " RS with serverID=" + serverId +
              " is connected with the right generation ID");
          }
@@ -464,7 +470,7 @@
              if (generationId != localGenerationId)
              {
                // if the 2 RS have different generationID
                if (replicationCache.getGenerationIdSavedStatus())
                if (replicationServerDomain.getGenerationIdSavedStatus())
                {
                  // it the present RS has received changes regarding its
                  //     gen ID and so won't change without a reset
@@ -497,7 +503,8 @@
                  //         set the gen ID received from the peer RS
                  //         specially if the peer has a non nul state and
                  //         we have a nul state ?
                  // replicationCache.setGenerationId(generationId, false);
                  // replicationServerDomain.
                  // setGenerationId(generationId, false);
                  Message message = NOTE_BAD_GENERATION_ID.get(
                      this.baseDn.toNormalizedString(),
                      Short.toString(receivedMsg.getServerId()),
@@ -519,7 +526,7 @@
          else
          {
            // The local RS is not initialized - take the one received
            replicationCache.setGenerationId(generationId, false);
            replicationServerDomain.setGenerationId(generationId, false);
          }
        }
      }
@@ -529,18 +536,18 @@
        return;   // we did not recognize the message, ignore it
      }
      // Get or create the ReplicationCache
      replicationCache = replicationServer.getReplicationCache(this.baseDn,
          true);
      // Get or create the ReplicationServerDomain
      replicationServerDomain = replicationServer.
              getReplicationServerDomain(this.baseDn,true);
      boolean started;
      if (serverIsLDAPserver)
      {
        started = replicationCache.startServer(this);
        started = replicationServerDomain.startServer(this);
      }
      else
      {
        started = replicationCache.startReplicationServer(this);
        started = replicationServerDomain.startReplicationServer(this);
      }
      if (started)
@@ -548,8 +555,10 @@
        // sendWindow MUST be created before starting the writer
        sendWindow = new Semaphore(sendWindowSize);
        writer = new ServerWriter(session, serverId, this, replicationCache);
        reader = new ServerReader(session, serverId, this, replicationCache);
        writer = new ServerWriter(session, serverId,
                this, replicationServerDomain);
        reader = new ServerReader(session, serverId,
                this, replicationServerDomain);
        reader.start();
        writer.start();
@@ -575,7 +584,8 @@
        {
          if (debugEnabled())
          {
            TRACER.debugInfo("In " + replicationCache.getReplicationServer().
            TRACER.debugInfo("In " +
              replicationServerDomain.getReplicationServer().
              getMonitorInstanceName() + " RS failed to start locally " +
              " the connection from serverID="+serverId);
          }
@@ -812,7 +822,7 @@
        * the sum of the number of missing changes for every dbHandler.
        */
       int totalCount = 0;
       ServerState dbState = replicationCache.getDbServerState();
       ServerState dbState = replicationServerDomain.getDbServerState();
       for (short id : dbState)
       {
         int max = dbState.getMaxChangeNumber(id).getSeqnum();
@@ -926,7 +936,7 @@
     * Ignore updates from a server that is degraded due to
     * its inconsistent generationId
     */
    long referenceGenerationId = replicationCache.getGenerationId();
    long referenceGenerationId = replicationServerDomain.getGenerationId();
    if ((referenceGenerationId>0) &&
        (referenceGenerationId != generationId))
    {
@@ -993,7 +1003,7 @@
      saturationCount = 0;
      try
      {
        replicationCache.checkAllSaturation();
        replicationServerDomain.checkAllSaturation();
      }
      catch (IOException e)
      {
@@ -1059,11 +1069,11 @@
          SortedSet<ReplicationIterator> iteratorSortedSet =
            new TreeSet<ReplicationIterator>(comparator);
          /* fill the lateQueue */
          for (short serverId : replicationCache.getServers())
          for (short serverId : replicationServerDomain.getServers())
          {
            ChangeNumber lastCsn = serverState.getMaxChangeNumber(serverId);
            ReplicationIterator iterator =
              replicationCache.getChangelogIterator(serverId, lastCsn);
              replicationServerDomain.getChangelogIterator(serverId, lastCsn);
            if ((iterator != null) && (iterator.getChange() != null))
            {
              iteratorSortedSet.add(iterator);
@@ -1244,7 +1254,7 @@
    }
    if (completedFlag)
    {
      replicationCache.sendAck(changeNumber, true);
      replicationServerDomain.sendAck(changeNumber, true);
    }
  }
@@ -1274,8 +1284,9 @@
    }
    if (completedFlag)
    {
      ReplicationCache replicationCache = ackList.getChangelogCache();
      replicationCache.sendAck(changeNumber, false,
      ReplicationServerDomain replicationServerDomain =
              ackList.getChangelogCache();
      replicationServerDomain.sendAck(changeNumber, false,
                             ackList.getReplicationServerId());
    }
  }
@@ -1304,20 +1315,22 @@
   * @param update The update that must be added to the list.
   * @param ChangelogServerId The identifier of the replicationServer that sent
   *                          the update.
   * @param replicationCache The ReplicationCache from which the change was
   *                         processed and to which the ack must later be sent.
   * @param replicationServerDomain The ReplicationServerDomain from which the
   *                                change was processed and to which the ack
   *                                must later be sent.
   * @param nbWaitedAck The number of ack that must be received before
   *                    the update is fully acked.
   */
  public static void addWaitingAck(
      UpdateMessage update,
      short ChangelogServerId, ReplicationCache replicationCache,
      short ChangelogServerId, ReplicationServerDomain replicationServerDomain,
      int nbWaitedAck)
  {
    ReplServerAckMessageList ackList =
          new ReplServerAckMessageList(update.getChangeNumber(),
                                      nbWaitedAck,
                                      ChangelogServerId, replicationCache);
                                      ChangelogServerId,
                                      replicationServerDomain);
    synchronized(changelogsWaitingAcks)
    {
      changelogsWaitingAcks.put(update.getChangeNumber(), ackList);
@@ -1561,7 +1574,7 @@
    {
      if (flowControl)
      {
        if (replicationCache.restartAfterSaturation(this))
        if (replicationServerDomain.restartAfterSaturation(this))
        {
          flowControl = false;
        }
@@ -1605,11 +1618,11 @@
  public void process(RoutableMessage msg)
  {
    if (debugEnabled())
       TRACER.debugInfo("In " + replicationCache.getReplicationServer().
       TRACER.debugInfo("In " + replicationServerDomain.getReplicationServer().
                 getMonitorInstanceName() +
                 " SH for remote server " + this.getMonitorInstanceName() +
                 " processes received msg=" + msg);
    replicationCache.process(msg, this);
    replicationServerDomain.process(msg, this);
  }
  /**
@@ -1623,7 +1636,7 @@
   throws IOException
   {
     if (debugEnabled())
       TRACER.debugInfo("In " + replicationCache.getReplicationServer().
       TRACER.debugInfo("In " + replicationServerDomain.getReplicationServer().
           getMonitorInstanceName() +
           " SH for remote server " + this.getMonitorInstanceName() +
           " sends message=" + info);
@@ -1640,7 +1653,7 @@
   public void receiveReplServerInfo(ReplServerInfoMessage infoMsg)
   {
     if (debugEnabled())
       TRACER.debugInfo("In " + replicationCache.getReplicationServer().
       TRACER.debugInfo("In " + replicationServerDomain.getReplicationServer().
           getMonitorInstanceName() +
           " SH for remote server " + this.getMonitorInstanceName() +
           " sets replServerInfo " + "<" + infoMsg + ">");
@@ -1691,7 +1704,8 @@
  public void send(RoutableMessage msg) throws IOException
  {
    if (debugEnabled())
          TRACER.debugInfo("In " + replicationCache.getReplicationServer().
          TRACER.debugInfo("In " +
              replicationServerDomain.getReplicationServer().
              getMonitorInstanceName() +
              " SH for remote server " + this.getMonitorInstanceName() +
              " sends message=" + msg);