mirror of https://github.com/OpenIdentityPlatform/OpenDJ.git

Jean-Noel Rouvignac
06.41.2013 8ed297692b7674b67b8d05a26fa9b04c20930e37
opends/src/server/org/opends/server/replication/server/ReplicationServerDomain.java
@@ -30,6 +30,7 @@
import java.io.IOException;
import java.io.UnsupportedEncodingException;
import java.util.*;
import java.util.Map.Entry;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.TimeUnit;
@@ -46,9 +47,9 @@
import org.opends.server.loggers.debug.DebugTracer;
import org.opends.server.replication.common.*;
import org.opends.server.replication.protocol.*;
import org.opends.server.replication.server.changelog.api.ChangelogDB;
import org.opends.server.replication.server.changelog.api.ChangelogException;
import org.opends.server.replication.server.changelog.api.ReplicaDBCursor;
import org.opends.server.replication.server.changelog.je.DbHandler;
import org.opends.server.types.*;
import static org.opends.messages.ReplicationMessages.*;
@@ -117,17 +118,26 @@
  private final Queue<MessageHandler> otherHandlers =
    new ConcurrentLinkedQueue<MessageHandler>();
  /**
   * This map contains the List of updates received from each LDAP server.
   */
  private final Map<Integer, DbHandler> sourceDbHandlers =
      new ConcurrentHashMap<Integer, DbHandler>();
  private final ChangelogDB changelogDB;
  /** The ReplicationServer that created the current instance. */
  private ReplicationServer localReplicationServer;
  /** GenerationId management. */
  /**
   * The generationId of the current replication domain. The generationId is
   * computed by hashing the first 1000 entries in the DB.
   */
  private volatile long generationId = -1;
  private boolean generationIdSavedStatus = false;
  /**
   * JNR, this is legacy code, hard to follow logic. I think what this field
   * tries to say is: "is the generationId in use anywhere?", i.e. is there a
   * replication topology in place? As soon as an answer to any of these
   * question comes true, then it is set to true.
   * <p>
   * It looks like the only use of this field is to prevent the
   * {@link #generationId} from being reset by
   * {@link #resetGenerationIdIfPossible()}.
   */
  private volatile boolean generationIdSavedStatus = false;
  /** The tracer object for the debug logger. */
  private static final DebugTracer TRACER = getTracer();
@@ -177,6 +187,7 @@
    this.assuredTimeoutTimer = new Timer("Replication server RS("
        + localReplicationServer.getServerId()
        + ") assured timer for domain \"" + baseDn + "\"", true);
    this.changelogDB = localReplicationServer.getChangelogDB();
    DirectoryServer.registerMonitorProvider(this);
  }
@@ -252,7 +263,7 @@
      }
    }
    if (!publishMessage(update, serverId))
    if (!publishUpdateMsg(update, serverId))
    {
      return;
    }
@@ -390,43 +401,46 @@
    }
  }
  private boolean publishMessage(UpdateMsg update, int serverId)
  private boolean publishUpdateMsg(UpdateMsg updateMsg, int serverId)
  {
    // look for the dbHandler that is responsible for the LDAP server which
    // generated the change.
    DbHandler dbHandler;
    synchronized (sourceDbHandlers)
    try
    {
      dbHandler = sourceDbHandlers.get(serverId);
      if (dbHandler == null)
      if (this.changelogDB.publishUpdateMsg(baseDn, serverId, updateMsg))
      {
        try
        {
          dbHandler = localReplicationServer.newDbHandler(serverId, baseDn);
          generationIdSavedStatus = true;
        } catch (ChangelogException e)
        /*
         * JNR: Matt and I had a hard time figuring out where to put this
         * synchronized block. We elected to put it here, but without a strong
         * conviction.
         */
        synchronized (generationIDLock)
        {
          /*
           * Because of database problem we can't save any more changes
           * from at least one LDAP server.
           * This replicationServer therefore can't do it's job properly anymore
           * and needs to close all its connections and shutdown itself.
           * JNR: I think the generationIdSavedStatus is set to true because
           * method above created a ReplicaDB which assumes the generationId was
           * communicated to another server. Hence setting true on this field
           * prevent the generationId from being reset.
           */
          MessageBuilder mb = new MessageBuilder();
          mb.append(ERR_CHANGELOG_SHUTDOWN_DATABASE_ERROR.get());
          mb.append(" ");
          mb.append(stackTraceToSingleLineString(e));
          logError(mb.toMessage());
          localReplicationServer.shutdown();
          return false;
          generationIdSavedStatus = true;
        }
        sourceDbHandlers.put(serverId, dbHandler);
      }
      return true;
    }
    // Publish the messages to the source handler
    dbHandler.add(update);
    return true;
    catch (ChangelogException e)
    {
      /*
       * Because of database problem we can't save any more changes from at
       * least one LDAP server. This replicationServer therefore can't do it's
       * job properly anymore and needs to close all its connections and
       * shutdown itself.
       */
      MessageBuilder mb = new MessageBuilder();
      mb.append(ERR_CHANGELOG_SHUTDOWN_DATABASE_ERROR.get());
      mb.append(" ");
      mb.append(stackTraceToSingleLineString(e));
      logError(mb.toMessage());
      localReplicationServer.shutdown();
      return false;
    }
  }
  private NotAssuredUpdateMsg addUpdate(ServerHandler sHandler,
@@ -1261,7 +1275,7 @@
   */
  public Set<Integer> getServerIds()
  {
    return sourceDbHandlers.keySet();
    return changelogDB.getDomainServerIds(baseDn);
  }
  /**
@@ -1278,29 +1292,7 @@
   */
  public ReplicaDBCursor getCursorFrom(int serverId, CSN startAfterCSN)
  {
    DbHandler dbHandler = sourceDbHandlers.get(serverId);
    if (dbHandler == null)
    {
      return null;
    }
    ReplicaDBCursor cursor;
    try
    {
      cursor = dbHandler.generateCursorFrom(startAfterCSN);
    }
    catch (Exception e)
    {
      return null;
    }
    if (!cursor.next())
    {
      close(cursor);
      return null;
    }
    return cursor;
    return changelogDB.getCursorFrom(baseDn, serverId, startAfterCSN);
  }
 /**
@@ -1313,12 +1305,7 @@
  */
  public long getCount(int serverId, CSN from, CSN to)
  {
    DbHandler dbHandler = sourceDbHandlers.get(serverId);
    if (dbHandler != null)
    {
      return dbHandler.getCount(from, to);
    }
    return 0;
    return changelogDB.getCount(baseDn, serverId, from, to);
  }
  /**
@@ -1328,12 +1315,7 @@
   */
  public long getChangesCount()
  {
    long entryCount = 0;
    for (DbHandler dbHandler : sourceDbHandlers.values())
    {
      entryCount += dbHandler.getChangesCount();
    }
    return entryCount;
    return changelogDB.getDomainChangesCount(baseDn);
  }
  /**
@@ -1346,24 +1328,6 @@
  }
  /**
   * Sets the provided DbHandler associated to the provided serverId.
   *
   * @param serverId  the serverId for the server to which is
   *                  associated the DbHandler.
   * @param dbHandler the dbHandler associated to the serverId.
   *
   * @throws ChangelogException If a database error happened.
   */
  public void setDbHandler(int serverId, DbHandler dbHandler)
    throws ChangelogException
  {
    synchronized (sourceDbHandlers)
    {
      sourceDbHandlers.put(serverId, dbHandler);
    }
  }
  /**
   * Retrieves the destination handlers for a routable message.
   *
   * @param msg The message to route.
@@ -1734,20 +1698,7 @@
    stopAllServers(true);
    shutdownDbHandlers();
  }
  /** Shutdown all the dbHandlers. */
  private void shutdownDbHandlers()
  {
    synchronized (sourceDbHandlers)
    {
      for (DbHandler dbHandler : sourceDbHandlers.values())
      {
        dbHandler.shutdown();
      }
      sourceDbHandlers.clear();
    }
    changelogDB.shutdownDomain(baseDn);
  }
  /**
@@ -1758,9 +1709,9 @@
  public ServerState getDbServerState()
  {
    ServerState serverState = new ServerState();
    for (DbHandler db : sourceDbHandlers.values())
    for (CSN lastCSN : changelogDB.getDomainLastCSNs(baseDn).values())
    {
      serverState.update(db.getLastChange());
      serverState.update(lastCSN);
    }
    return serverState;
  }
@@ -2235,24 +2186,7 @@
  public void clearDbs()
  {
    // Reset the localchange and state db for the current domain
    synchronized (sourceDbHandlers)
    {
      for (DbHandler dbHandler : sourceDbHandlers.values())
      {
        try
        {
          dbHandler.clear();
        } catch (Exception e)
        {
          // TODO: i18n
          MessageBuilder mb = new MessageBuilder();
          mb.append(ERR_ERROR_CLEARING_DB.get(dbHandler.toString(),
              e.getMessage() + " " + stackTraceToSingleLineString(e)));
          logError(mb.toMessage());
        }
      }
      shutdownDbHandlers();
    }
    changelogDB.clearDomain(baseDn);
    try
    {
      localReplicationServer.clearGenerationId(baseDn);
@@ -2397,20 +2331,6 @@
  }
  /**
   * Set the purge delay on all the db Handlers for this Domain
   * of Replication.
   *
   * @param delay The new purge delay to use.
   */
  public void setPurgeDelay(long delay)
  {
    for (DbHandler dbHandler : sourceDbHandlers.values())
    {
      dbHandler.setPurgeDelay(delay);
    }
  }
  /**
   * Get the map of connected DSs.
   * @return The map of connected DSs
   */
@@ -2667,7 +2587,6 @@
    {
      for (int serverId : dbState)
      {
        DbHandler h = sourceDbHandlers.get(serverId);
        CSN mostRecentDbCSN = dbState.getCSN(serverId);
        try {
          // Is the most recent change in the Db newer than eligible CSN ?
@@ -2676,19 +2595,8 @@
          if (eligibleCSN.olderOrEqual(mostRecentDbCSN))
          {
            // let's try to seek the first change <= eligibleCSN
            ReplicaDBCursor cursor = null;
            try {
              cursor = h.generateCursorFrom(eligibleCSN);
              if (cursor != null && cursor.getChange() != null) {
                CSN newCSN = cursor.getChange().getCSN();
                result.update(newCSN);
              }
            } catch (ChangelogException e) {
              // there's no change older than eligibleCSN (case of s3/csn31)
              result.update(new CSN(0, 0, serverId));
            } finally {
              close(cursor);
            }
            CSN newCSN = changelogDB.getCSNAfter(baseDn, serverId, eligibleCSN);
            result.update(newCSN);
          } else {
            // for this serverId, all changes in the ChangelogDb are holder
            // than eligibleCSN, the most recent in the db is our guy.
@@ -2721,9 +2629,9 @@
  public ServerState getStartState()
  {
    ServerState domainStartState = new ServerState();
    for (DbHandler dbHandler : sourceDbHandlers.values())
    for (CSN firstCSN : changelogDB.getDomainFirstCSNs(baseDn).values())
    {
      domainStartState.update(dbHandler.getFirstChange());
      domainStartState.update(firstCSN);
    }
    return domainStartState;
  }
@@ -2741,10 +2649,12 @@
  {
    CSN eligibleCSN = null;
    for (DbHandler db : sourceDbHandlers.values())
    for (Entry<Integer, CSN> entry :
      changelogDB.getDomainLastCSNs(baseDn).entrySet())
    {
      // Consider this producer (DS/db).
      int serverId = db.getServerId();
      final int serverId = entry.getKey();
      final CSN changelogLastCSN = entry.getValue();
      // Should it be considered for eligibility ?
      CSN heartbeatLastCSN =
@@ -2774,7 +2684,6 @@
        continue;
      }
      CSN changelogLastCSN = db.getLastChange();
      if (changelogLastCSN != null
          && (eligibleCSN == null || changelogLastCSN.newer(eligibleCSN)))
      {
@@ -2935,15 +2844,7 @@
   */
  public long getLatestDomainTrimDate()
  {
    long latest = 0;
    for (DbHandler db : sourceDbHandlers.values())
    {
      if (latest == 0 || latest < db.getLatestTrimDate())
      {
        latest = db.getLatestTrimDate();
      }
    }
    return latest;
    return changelogDB.getDomainLatestTrimDate(baseDn);
  }
  /**