mirror of https://github.com/OpenIdentityPlatform/OpenDJ.git

Jean-Noel Rouvignac
26.41.2013 c64a67b3d0b51743d9f2a2bf110cb365b8b104af
opends/src/server/org/opends/server/replication/server/ReplicationServer.java
@@ -52,6 +52,7 @@
import org.opends.server.replication.common.*;
import org.opends.server.replication.plugin.MultimasterReplication;
import org.opends.server.replication.protocol.*;
import org.opends.server.replication.server.changelog.api.ChangelogDB;
import org.opends.server.replication.server.changelog.api.ChangelogException;
import org.opends.server.replication.server.changelog.je.DbHandler;
import org.opends.server.replication.server.changelog.je.DraftCNDbHandler;
@@ -65,6 +66,7 @@
import static org.opends.messages.ReplicationMessages.*;
import static org.opends.server.loggers.ErrorLogger.*;
import static org.opends.server.loggers.debug.DebugLogger.*;
import static org.opends.server.types.ResultCode.*;
import static org.opends.server.util.ServerConstants.*;
import static org.opends.server.util.StaticUtils.*;
@@ -143,23 +145,22 @@
  private long monitoringPublisherPeriod = 3000;
  /**
   * The handler of the draft change numbers database, the database used to
   * store the relation between a draft change number ('seqnum') and the
   * associated cookie.
   * The handler of the changelog database, the database stores the relation
   * between a draft change number ('seqnum') and the associated cookie.
   * <p>
   * Guarded by draftCNLock
   * Guarded by changelogDBLock
   */
  private DraftCNDbHandler draftCNDbHandler;
  private ChangelogDB changelogDB;
  /**
   * The last value generated of the draft change number.
   * <p>
   * Guarded by draftCNLock
   * Guarded by changelogDBLock
   **/
  private int lastGeneratedDraftCN = 0;
  /** Used for protecting draft CN related state. */
  private final Object draftCNLock = new Object();
  /** Used for protecting changelogDB related state. */
  private final Object changelogDBLock = new Object();
  /**
   * The tracer object for the debug logger.
@@ -183,7 +184,7 @@
  private long domainTicket = 0L;
  /** BaseDNs excluded for ECL. */
  private Collection<String> excludedBaseDNs = new ArrayList<String>();
  private Set<String> excludedBaseDNs = new HashSet<String>();
  /**
   * The weight affected to the replication server.
@@ -470,7 +471,7 @@
  private Set<String> getConnectedRSUrls(ReplicationServerDomain domain)
  {
    Set<String> results = new LinkedHashSet<String>();
    Set<String> results = new HashSet<String>();
    for (ReplicationServerHandler rsHandler : domain.getConnectedRSs().values())
    {
      results.add(normalizeServerURL(rsHandler.getServerAddressURL()));
@@ -714,11 +715,11 @@
      eclwe.finalizeWorkflowElement();
    }
    synchronized (draftCNLock)
    synchronized (changelogDBLock)
    {
      if (draftCNDbHandler != null)
      if (changelogDB != null)
      {
        draftCNDbHandler.shutdown();
        changelogDB.shutdown();
      }
    }
  }
@@ -900,42 +901,39 @@
    {
      dbEnv.clearGenerationId(baseDn);
    }
    catch (Exception e)
    catch (Exception ignored)
    {
      // Ignore.
      if (debugEnabled())
      {
        TRACER.debugCaught(DebugLogLevel.WARNING, e);
        TRACER.debugCaught(DebugLogLevel.WARNING, ignored);
      }
    }
    synchronized (draftCNLock)
    synchronized (changelogDBLock)
    {
      if (draftCNDbHandler != null)
      if (changelogDB != null)
      {
        try
        {
          draftCNDbHandler.clear(baseDn);
          changelogDB.clear(baseDn);
        }
        catch (Exception e)
        catch (Exception ignored)
        {
          // Ignore.
          if (debugEnabled())
          {
            TRACER.debugCaught(DebugLogLevel.WARNING, e);
            TRACER.debugCaught(DebugLogLevel.WARNING, ignored);
          }
        }
        try
        {
          lastGeneratedDraftCN = draftCNDbHandler.getLastKey();
          lastGeneratedDraftCN = changelogDB.getLastKey();
        }
        catch (Exception e)
        catch (Exception ignored)
        {
          // Ignore.
          if (debugEnabled())
          {
            TRACER.debugCaught(DebugLogLevel.WARNING, e);
            TRACER.debugCaught(DebugLogLevel.WARNING, ignored);
          }
        }
      }
@@ -1352,12 +1350,10 @@
  public void processExportBegin(Backend backend, LDIFExportConfig config)
  {
    if (debugEnabled())
      TRACER.debugInfo("RS " +getMonitorInstanceName()+
          " Export starts");
      TRACER.debugInfo("RS " + getMonitorInstanceName() + " Export starts");
    if (backend.getBackendID().equals(backendId))
    {
      // Retrieves the backend related to this replicationServerDomain
      // backend =
      ReplicationBackend b =
      (ReplicationBackend)DirectoryServer.getBackend(backendId);
      b.setServer(this);
@@ -1394,38 +1390,36 @@
      rsd.clearDbs();
    }
    synchronized (draftCNLock)
    synchronized (changelogDBLock)
    {
      if (draftCNDbHandler != null)
      if (changelogDB != null)
      {
        try
        {
          draftCNDbHandler.clear();
          changelogDB.clear();
        }
        catch (Exception e)
        catch (Exception ignored)
        {
          // Ignore.
          if (debugEnabled())
          {
            TRACER.debugCaught(DebugLogLevel.WARNING, e);
            TRACER.debugCaught(DebugLogLevel.WARNING, ignored);
          }
        }
        try
        {
          draftCNDbHandler.shutdown();
          changelogDB.shutdown();
        }
        catch (Exception e)
        catch (Exception ignored)
        {
          // Ignore.
          if (debugEnabled())
          {
            TRACER.debugCaught(DebugLogLevel.WARNING, e);
            TRACER.debugCaught(DebugLogLevel.WARNING, ignored);
          }
        }
        lastGeneratedDraftCN = 0;
        draftCNDbHandler = null;
        changelogDB = null;
      }
    }
  }
@@ -1614,67 +1608,70 @@
    ChangeNumber eligibleCN = null;
    for (ReplicationServerDomain domain : getReplicationServerDomains())
    {
      if ((excludedBaseDNs != null) &&
          excludedBaseDNs.contains(domain.getBaseDn()))
      if (contains(excludedBaseDNs, domain.getBaseDn()))
        continue;
      ChangeNumber domainEligibleCN = domain.getEligibleCN();
      String dates = "";
      if (domainEligibleCN != null)
      final ChangeNumber domainEligibleCN = domain.getEligibleCN();
      if (eligibleCN == null
          || (domainEligibleCN != null && domainEligibleCN.older(eligibleCN)))
      {
        if ((eligibleCN == null) || (domainEligibleCN.older(eligibleCN)))
        {
          eligibleCN = domainEligibleCN;
        }
        dates = new Date(domainEligibleCN.getTime()).toString();
        eligibleCN = domainEligibleCN;
      }
      debugLog += "[dn=" + domain.getBaseDn()
           + "] [eligibleCN=" + domainEligibleCN + ", " + dates + "]";
      if (debugEnabled())
      {
        final String dates = domainEligibleCN == null ?
            "" : new Date(domainEligibleCN.getTime()).toString();
        debugLog += "[baseDN=" + domain.getBaseDn()
            + "] [eligibleCN=" + domainEligibleCN + ", " + dates + "]";
      }
    }
    if (eligibleCN==null)
    if (eligibleCN==null )
    {
      eligibleCN = new ChangeNumber(TimeThread.getTime(), 0, 0);
    }
    if (debugEnabled())
    if (debugEnabled()) {
      TRACER.debugInfo("In " + this + " getEligibleCN() ends with " +
        " the following domainEligibleCN for each domain :" + debugLog +
        " thus CrossDomainEligibleCN=" + eligibleCN +
        "  ts=" + new Date(eligibleCN.getTime()).toString());
    }
    return eligibleCN;
  }
  private boolean contains(Set<String> col, String elem)
  {
    return col != null && col.contains(elem);
  }
  /**
   * Get or create a handler on a Db on DraftCN for external changelog.
   * Get (or create) a handler on the ChangelogDB for external changelog.
   *
   * @return the handler.
   * @throws DirectoryException
   *           when needed.
   */
  public DraftCNDbHandler getDraftCNDbHandler() throws DirectoryException
  public ChangelogDB getChangelogDB() throws DirectoryException
  {
    synchronized (draftCNLock)
    synchronized (changelogDBLock)
    {
      try
      {
        if (draftCNDbHandler == null)
        if (changelogDB == null)
        {
          draftCNDbHandler = new DraftCNDbHandler(this, this.dbEnv);
          changelogDB = new DraftCNDbHandler(this, this.dbEnv);
          lastGeneratedDraftCN = getLastDraftChangeNumber();
        }
        return draftCNDbHandler;
        return changelogDB;
      }
      catch (Exception e)
      {
        TRACER.debugCaught(DebugLogLevel.ERROR, e);
        MessageBuilder mb = new MessageBuilder();
        mb.append(ERR_DRAFT_CHANGENUMBER_DATABASE.get(""));
        throw new DirectoryException(ResultCode.OPERATIONS_ERROR,
            mb.toMessage(), e);
        throw new DirectoryException(OPERATIONS_ERROR, mb.toMessage(), e);
      }
    }
  }
@@ -1685,11 +1682,11 @@
   */
  public int getFirstDraftChangeNumber()
  {
    synchronized (draftCNLock)
    synchronized (changelogDBLock)
    {
      if (draftCNDbHandler != null)
      if (changelogDB != null)
      {
        return draftCNDbHandler.getFirstKey();
        return changelogDB.getFirstKey();
      }
      return 0;
    }
@@ -1701,11 +1698,11 @@
   */
  public int getLastDraftChangeNumber()
  {
    synchronized (draftCNLock)
    synchronized (changelogDBLock)
    {
      if (draftCNDbHandler != null)
      if (changelogDB != null)
      {
        return draftCNDbHandler.getLastKey();
        return changelogDB.getLastKey();
      }
      return 0;
    }
@@ -1717,7 +1714,7 @@
   */
  public int getNewDraftCN()
  {
    synchronized (draftCNLock)
    synchronized (changelogDBLock)
    {
      return ++lastGeneratedDraftCN;
    }
@@ -1756,12 +1753,11 @@
     */
    int lastDraftCN;
    Boolean dbEmpty = false;
    Long newestDate = 0L;
    DraftCNDbHandler draftCNDbH = getDraftCNDbHandler();
    boolean dbEmpty = false;
    long newestDate = 0L;
    ChangelogDB changelogDB = getChangelogDB();
    // Get the first DraftCN from the DraftCNdb
    int firstDraftCN = draftCNDbH.getFirstKey();
    int firstDraftCN = changelogDB.getFirstKey();
    Map<String,ServerState> domainsServerStateForLastSeqnum = null;
    ChangeNumber changeNumberForLastSeqnum = null;
    String domainForLastSeqnum = null;
@@ -1773,12 +1769,11 @@
    }
    else
    {
      // Get the last DraftCN from the DraftCNdb
      lastDraftCN = draftCNDbH.getLastKey();
      lastDraftCN = changelogDB.getLastKey();
      // Get the generalized state associated with the current last DraftCN
      // and initializes from it the startStates table
      String lastSeqnumGenState = draftCNDbH.getValue(lastDraftCN);
      String lastSeqnumGenState = changelogDB.getPreviousCookie(lastDraftCN);
      if ((lastSeqnumGenState != null) && (lastSeqnumGenState.length()>0))
      {
        domainsServerStateForLastSeqnum = MultiDomainServerState.
@@ -1786,16 +1781,16 @@
      }
      // Get the changeNumber associated with the current last DraftCN
      changeNumberForLastSeqnum = draftCNDbH.getChangeNumber(lastDraftCN);
      changeNumberForLastSeqnum = changelogDB.getChangeNumber(lastDraftCN);
      // Get the domain associated with the current last DraftCN
      domainForLastSeqnum = draftCNDbH.getBaseDN(lastDraftCN);
      domainForLastSeqnum = changelogDB.getBaseDN(lastDraftCN);
    }
    // Domain by domain
    for (ReplicationServerDomain rsd : getReplicationServerDomains())
    {
      if (excludedBaseDNs.contains(rsd.getBaseDn()))
      if (contains(excludedBaseDNs, rsd.getBaseDn()))
        continue;
      // for this domain, have the state in the replchangelog
@@ -1860,15 +1855,12 @@
  {
    disableEligibility(excludedBaseDNs);
    // Initialize start state for all running domains with empty state
    MultiDomainServerState result = new MultiDomainServerState();
    // Initialize start state for  all running domains with empty state
    for (ReplicationServerDomain rsd : getReplicationServerDomains())
    {
      if ((excludedBaseDNs != null)
          && (excludedBaseDNs.contains(rsd.getBaseDn())))
        continue;
      if (rsd.getDbServerState().isEmpty())
      if (contains(excludedBaseDNs, rsd.getBaseDn())
          || rsd.getDbServerState().isEmpty())
        continue;
      result.update(rsd.getBaseDn(), rsd.getEligibleState(getEligibleCN()));