mirror of https://github.com/OpenIdentityPlatform/OpenDJ.git

Jean-Noel Rouvignac
05.03.2013 58716319f378d324ebdf129fa75705759a1d7b57
opends/src/server/org/opends/server/replication/server/ECLServerHandler.java
@@ -362,8 +362,8 @@
    if (getProtocolVersion() < ProtocolVersion.REPLICATION_PROTOCOL_V4)
    {
      // Peer DS uses protocol < V4 : send it a ReplServerStartMsg
      startMsg = new ReplServerStartMsg(replicationServerId,
          replicationServerURL, getBaseDN(), maxRcvWindow,
      startMsg = new ReplServerStartMsg(getReplicationServerId(),
              getReplicationServerURL(), getBaseDN(), maxRcvWindow,
          replicationServerDomain.getDbServerState(),
          localGenerationId, sslEncryption, getLocalGroupId(),
          replicationServerDomain.getReplicationServer()
@@ -372,8 +372,8 @@
    else
    {
      // Peer DS uses protocol V4 : send it a ReplServerStartDSMsg
      startMsg = new ReplServerStartDSMsg(replicationServerId,
          replicationServerURL, getBaseDN(), maxRcvWindow,
      startMsg = new ReplServerStartDSMsg(getReplicationServerId(),
              getReplicationServerURL(), getBaseDN(), maxRcvWindow,
          new ServerState(), localGenerationId, sslEncryption,
          getLocalGroupId(), 0, replicationServer.getWeight(), 0);
    }
@@ -386,21 +386,16 @@
   * Creates a new handler object to a remote replication server.
   * @param session The session with the remote RS.
   * @param queueSize The queue size to manage updates to that RS.
   * @param replicationServerURL The hosting local RS URL.
   * @param replicationServerId The hosting local RS serverId.
   * @param replicationServer The hosting local RS object.
   * @param rcvWindowSize The receiving window size.
   */
  public ECLServerHandler(
      Session session,
      int queueSize,
      String replicationServerURL,
      int replicationServerId,
      ReplicationServer replicationServer,
      int rcvWindowSize)
  {
    super(session, queueSize, replicationServerURL, replicationServerId,
        replicationServer, rcvWindowSize);
    super(session, queueSize, replicationServer, rcvWindowSize);
    try
    {
      setBaseDNAndDomain(ServerConstants.DN_EXTERNAL_CHANGELOG_ROOT, true);
@@ -413,30 +408,17 @@
  /**
   * Creates a new handler object to a remote replication server.
   * @param replicationServerURL The hosting local RS URL.
   * @param replicationServerId The hosting local RS serverId.
   * @param replicationServer The hosting local RS object.
   * @param startECLSessionMsg the start parameters.
   * @throws DirectoryException when an errors occurs.
   */
  public ECLServerHandler(
      String replicationServerURL,
      int replicationServerId,
      ReplicationServer replicationServer,
      StartECLSessionMsg startECLSessionMsg)
  throws DirectoryException
  {
    // queueSize is hard coded to 1 else super class hangs for some reason
    super(null, 1, replicationServerURL, replicationServerId,
        replicationServer, 0);
    try
    {
      setBaseDNAndDomain(ServerConstants.DN_EXTERNAL_CHANGELOG_ROOT, true);
    }
    catch(DirectoryException de)
    {
      // no chance to have a bad domain set here
    }
    this(null, 1, replicationServer, 0);
    initialize(startECLSessionMsg);
  }
@@ -724,7 +706,7 @@
      // Creates the table that will contain the real-time info for each
      // and every domain.
      HashSet<DomainContext> tmpSet = new HashSet<DomainContext>();
      Set<DomainContext> tmpSet = new HashSet<DomainContext>();
      String missingDomains = "";
      if (rsdi != null)
      {
@@ -830,8 +812,8 @@
          newDomainCtxt.currentState = new ServerState();
          // Creates an unconnected SH for the domain
          MessageHandler mh = new MessageHandler(maxQueueSize,
              replicationServerURL, replicationServerId, replicationServer);
          MessageHandler mh =
              new MessageHandler(maxQueueSize, replicationServer);
          mh.setInitialServerState(newDomainCtxt.startState);
          mh.setBaseDNAndDomain(rsd.getBaseDn(), false);
          // register the unconnected into the domain
@@ -994,14 +976,13 @@
   *          requested.
   */
  @Override
  public ArrayList<Attribute> getMonitorData()
  public List<Attribute> getMonitorData()
  {
    // Get the generic ones
    ArrayList<Attribute> attributes = super.getMonitorData();
    List<Attribute> attributes = super.getMonitorData();
    // Add the specific RS ones
    attributes.add(Attributes.create("External-Changelog-Server",
        serverURL));
    attributes.add(Attributes.create("External-Changelog-Server", serverURL));
    // TODO:ECL No monitoring exist for ECL.
    return attributes;
@@ -1274,7 +1255,6 @@
      //    take the oldest
      //    if one domain has no msg, still is candidate
      int iDom;
      boolean continueLooping = true;
      while (continueLooping && searchPhase == INIT_PHASE)
      {
@@ -1284,11 +1264,9 @@
          // Default is not to loop, with one exception
          continueLooping = false;
          iDom = getOldestChangeFromDomainCtxts();
          // iDom == -1 means that there is no oldest change to process
          int iDom = getOldestChangeFromDomainCtxts();
          if (iDom == -1)
          {
          { // there is no oldest change to process
            closeInitPhase();
            // signals end of phase 1 to the caller
@@ -1296,12 +1274,14 @@
          }
          // Build the ECLUpdateMsg to be returned
          DomainContext oldestContext = domainCtxts[iDom];
          String suffix = oldestContext.rsd.getBaseDn();
          oldestChange = new ECLUpdateMsg(
              (LDAPUpdateMsg)domainCtxts[iDom].nextMsg,
              (LDAPUpdateMsg)oldestContext.nextMsg,
              null, // cookie will be set later
              domainCtxts[iDom].rsd.getBaseDn(),
              suffix,
              0); //  draftChangeNumber may be set later
          domainCtxts[iDom].nextMsg = null;
          oldestContext.nextMsg = null;
          if (draftCompat)
          {
@@ -1321,7 +1301,7 @@
            // replogcn : the oldest change from the changelog db
            ChangeNumber cnFromChangelogDb =
              oldestChange.getUpdateMsg().getChangeNumber();
            String dnFromChangelogDb = domainCtxts[iDom].rsd.getBaseDn();
            String dnFromChangelogDb = suffix;
            while (true)
            {
@@ -1435,7 +1415,7 @@
                draftCNDb.add(
                    oldestChange.getDraftChangeNumber(),
                    this.previousCookie.toString(),
                    domainCtxts[iDom].rsd.getBaseDn(),
                    suffix,
                    oldestChange.getUpdateMsg().getChangeNumber());
                break;
@@ -1448,23 +1428,23 @@
          // Set and test the domain of the oldestChange see if we reached
          // the end of the phase for this domain
          domainCtxts[iDom].currentState.update(
          oldestContext.currentState.update(
              oldestChange.getUpdateMsg().getChangeNumber());
          if (domainCtxts[iDom].currentState.cover(domainCtxts[iDom].stopState))
          if (oldestContext.currentState.cover(oldestContext.stopState))
          {
            domainCtxts[iDom].active = false;
            oldestContext.active = false;
          }
          if (draftCompat && (lastDraftCN>0) &&
              (oldestChange.getDraftChangeNumber()>lastDraftCN))
          {
            domainCtxts[iDom].active = false;
            oldestContext.active = false;
          }
          if (domainCtxts[iDom].active)
          if (oldestContext.active)
          {
            // populates the table with the next eligible msg from iDom
            // in non blocking mode, return null when no more eligible msg
            domainCtxts[iDom].getNextEligibleMessageForDomain(operationId);
            oldestContext.getNextEligibleMessageForDomain(operationId);
          }
        } // phase == INIT_PHASE
      } // while (...)
@@ -1481,19 +1461,19 @@
        }
        // take the oldest one
        iDom = getOldestChangeFromDomainCtxts();
        int iDom = getOldestChangeFromDomainCtxts();
        if (iDom != -1)
        {
          String suffix = this.domainCtxts[iDom].rsd.getBaseDn();
          DomainContext oldestContext = domainCtxts[iDom];
          String suffix = oldestContext.rsd.getBaseDn();
          oldestChange = new ECLUpdateMsg(
              (LDAPUpdateMsg)domainCtxts[iDom].nextMsg,
              (LDAPUpdateMsg)oldestContext.nextMsg,
              null, // set later
              suffix, 0);
          domainCtxts[iDom].nextMsg = null; // clean
          oldestContext.nextMsg = null; // clean
          domainCtxts[iDom].currentState.update(
          oldestContext.currentState.update(
              oldestChange.getUpdateMsg().getChangeNumber());
          if (draftCompat)
@@ -1509,7 +1489,7 @@
            draftCNDb.add(
                oldestChange.getDraftChangeNumber(),
                this.previousCookie.toString(),
                domainCtxts[iDom].rsd.getBaseDn(),
                suffix,
                oldestChange.getUpdateMsg().getChangeNumber());
          }
        }