mirror of https://github.com/OpenIdentityPlatform/OpenDJ.git

Jean-Noel Rouvignac
04.03.2013 e3add78e22e130ef6ece000a7989f4f4317d4da8
opends/src/server/org/opends/server/replication/server/changelog/je/JEChangelogDB.java
@@ -59,8 +59,8 @@
  /**
   * This map contains the List of updates received from each LDAP server.
   */
  private final Map<DN, Map<Integer, DbHandler>> sourceDbHandlers =
      new ConcurrentHashMap<DN, Map<Integer, DbHandler>>();
  private final Map<DN, Map<Integer, JEReplicaDB>> domainToReplicaDBs =
      new ConcurrentHashMap<DN, Map<Integer, JEReplicaDB>>();
  private ReplicationDbEnv dbEnv;
  private final String dbDirectoryName;
  private final File dbDirectory;
@@ -152,9 +152,9 @@
    }
  }
  private Map<Integer, DbHandler> getDomainMap(DN baseDN)
  private Map<Integer, JEReplicaDB> getDomainMap(DN baseDN)
  {
    final Map<Integer, DbHandler> domainMap = sourceDbHandlers.get(baseDN);
    final Map<Integer, JEReplicaDB> domainMap = domainToReplicaDBs.get(baseDN);
    if (domainMap != null)
    {
      return domainMap;
@@ -162,7 +162,7 @@
    return Collections.emptyMap();
  }
  private DbHandler getDbHandler(DN baseDN, int serverId)
  private JEReplicaDB getReplicaDB(DN baseDN, int serverId)
  {
    return getDomainMap(baseDN).get(serverId);
  }
@@ -181,44 +181,44 @@
  private void commission(DN baseDN, int serverId, ReplicationServer rs)
      throws ChangelogException
  {
    getOrCreateDbHandler(baseDN, serverId, rs);
    getOrCreateReplicaDB(baseDN, serverId, rs);
  }
  /**
   * Returns a DbHandler, possibly creating it.
   * Returns a {@link JEReplicaDB}, possibly creating it.
   *
   * @param baseDN
   *          the baseDN for which to create a DbHandler
   *          the baseDN for which to create a ReplicaDB
   * @param serverId
   *          the baseserverId for which to create a DbHandler
   *          the serverId for which to create a ReplicaDB
   * @param rs
   *          the ReplicationServer
   * @return a Pair with the DbHandler and a a boolean indicating if it has been
   *         created
   * @return a Pair with the JEReplicaDB and a boolean indicating whether it had
   *         to be created
   * @throws ChangelogException
   *           if a problem occurred with the database
   */
  Pair<DbHandler, Boolean> getOrCreateDbHandler(DN baseDN,
  Pair<JEReplicaDB, Boolean> getOrCreateReplicaDB(DN baseDN,
      int serverId, ReplicationServer rs) throws ChangelogException
  {
    synchronized (sourceDbHandlers)
    synchronized (domainToReplicaDBs)
    {
      Map<Integer, DbHandler> domainMap = sourceDbHandlers.get(baseDN);
      Map<Integer, JEReplicaDB> domainMap = domainToReplicaDBs.get(baseDN);
      if (domainMap == null)
      {
        domainMap = new ConcurrentHashMap<Integer, DbHandler>();
        sourceDbHandlers.put(baseDN, domainMap);
        domainMap = new ConcurrentHashMap<Integer, JEReplicaDB>();
        domainToReplicaDBs.put(baseDN, domainMap);
      }
      DbHandler dbHandler = domainMap.get(serverId);
      if (dbHandler == null)
      JEReplicaDB replicaDB = domainMap.get(serverId);
      if (replicaDB == null)
      {
        dbHandler =
            new DbHandler(serverId, baseDN, rs, dbEnv, rs.getQueueSize());
        domainMap.put(serverId, dbHandler);
        return Pair.of(dbHandler, true);
        replicaDB =
            new JEReplicaDB(serverId, baseDN, rs, dbEnv, rs.getQueueSize());
        domainMap.put(serverId, replicaDB);
        return Pair.of(replicaDB, true);
      }
      return Pair.of(dbHandler, false);
      return Pair.of(replicaDB, false);
    }
  }
@@ -318,7 +318,7 @@
    // - then throw the first encountered exception
    ChangelogException firstException = null;
    for (DN baseDN : this.sourceDbHandlers.keySet())
    for (DN baseDN : this.domainToReplicaDBs.keySet())
    {
      removeDomain(baseDN);
    }
@@ -377,10 +377,10 @@
  @Override
  public long getCount(DN baseDN, CSN from, CSN to)
  {
    DbHandler dbHandler = getDbHandler(baseDN, from.getServerId());
    if (dbHandler != null)
    JEReplicaDB replicaDB = getReplicaDB(baseDN, from.getServerId());
    if (replicaDB != null)
    {
      return dbHandler.getCount(from, to);
      return replicaDB.getCount(from, to);
    }
    return 0;
  }
@@ -390,9 +390,9 @@
  public long getDomainChangesCount(DN baseDN)
  {
    long entryCount = 0;
    for (DbHandler dbHandler : getDomainMap(baseDN).values())
    for (JEReplicaDB replicaDB : getDomainMap(baseDN).values())
    {
      entryCount += dbHandler.getChangesCount();
      entryCount += replicaDB.getChangesCount();
    }
    return entryCount;
  }
@@ -401,17 +401,17 @@
  @Override
  public void shutdownDomain(DN baseDN)
  {
    shutdownDbHandlers(getDomainMap(baseDN));
    sourceDbHandlers.remove(baseDN);
    shutdownReplicaDBs(getDomainMap(baseDN));
    domainToReplicaDBs.remove(baseDN);
  }
  private void shutdownDbHandlers(Map<Integer, DbHandler> domainMap)
  private void shutdownReplicaDBs(Map<Integer, JEReplicaDB> domainMap)
  {
    synchronized (domainMap)
    {
      for (DbHandler dbHandler : domainMap.values())
      for (JEReplicaDB replicaDB : domainMap.values())
      {
        dbHandler.shutdown();
        replicaDB.shutdown();
      }
      domainMap.clear();
    }
@@ -422,9 +422,9 @@
  public ServerState getDomainOldestCSNs(DN baseDN)
  {
    final ServerState result = new ServerState();
    for (DbHandler dbHandler : getDomainMap(baseDN).values())
    for (JEReplicaDB replicaDB : getDomainMap(baseDN).values())
    {
      result.update(dbHandler.getOldestCSN());
      result.update(replicaDB.getOldestCSN());
    }
    return result;
  }
@@ -434,9 +434,9 @@
  public ServerState getDomainNewestCSNs(DN baseDN)
  {
    final ServerState result = new ServerState();
    for (DbHandler dbHandler : getDomainMap(baseDN).values())
    for (JEReplicaDB replicaDB : getDomainMap(baseDN).values())
    {
      result.update(dbHandler.getNewestCSN());
      result.update(replicaDB.getNewestCSN());
    }
    return result;
  }
@@ -451,22 +451,22 @@
    ChangelogException firstException = null;
    // 1- clear the replica DBs
    final Map<Integer, DbHandler> domainMap = getDomainMap(baseDN);
    final Map<Integer, JEReplicaDB> domainMap = getDomainMap(baseDN);
    synchronized (domainMap)
    {
      for (DbHandler dbHandler : domainMap.values())
      for (JEReplicaDB replicaDB : domainMap.values())
      {
        try
        {
          dbHandler.clear();
          replicaDB.clear();
        }
        catch (ChangelogException e)
        {
          firstException = e;
        }
      }
      shutdownDbHandlers(domainMap);
      sourceDbHandlers.remove(baseDN);
      shutdownReplicaDBs(domainMap);
      domainToReplicaDBs.remove(baseDN);
    }
    // 2- clear the ChangeNumber index DB
@@ -511,11 +511,11 @@
  @Override
  public void setPurgeDelay(long delay)
  {
    for (Map<Integer, DbHandler> domainMap : sourceDbHandlers.values())
    for (Map<Integer, JEReplicaDB> domainMap : domainToReplicaDBs.values())
    {
      for (DbHandler dbHandler : domainMap.values())
      for (JEReplicaDB replicaDB : domainMap.values())
      {
        dbHandler.setPurgeDelay(delay);
        replicaDB.setPurgeDelay(delay);
      }
    }
  }
@@ -525,11 +525,11 @@
  public long getDomainLatestTrimDate(DN baseDN)
  {
    long latest = 0;
    for (DbHandler dbHandler : getDomainMap(baseDN).values())
    for (JEReplicaDB replicaDB : getDomainMap(baseDN).values())
    {
      if (latest == 0 || latest < dbHandler.getLatestTrimDate())
      if (latest == 0 || latest < replicaDB.getLatestTrimDate())
      {
        latest = dbHandler.getLatestTrimDate();
        latest = replicaDB.getLatestTrimDate();
      }
    }
    return latest;
@@ -539,12 +539,12 @@
  @Override
  public CSN getCSNAfter(DN baseDN, int serverId, CSN startAfterCSN)
  {
    final DbHandler dbHandler = getDbHandler(baseDN, serverId);
    final JEReplicaDB replicaDB = getReplicaDB(baseDN, serverId);
    ReplicaDBCursor cursor = null;
    try
    {
      cursor = dbHandler.generateCursorFrom(startAfterCSN);
      cursor = replicaDB.generateCursorFrom(startAfterCSN);
      if (cursor != null && cursor.getChange() != null)
      {
        return cursor.getChange().getCSN();
@@ -572,7 +572,7 @@
      {
        try
        {
          cnIndexDB = new DraftCNDbHandler(replicationServer, this.dbEnv);
          cnIndexDB = new JEChangeNumberIndexDB(replicationServer, this.dbEnv);
        }
        catch (Exception e)
        {
@@ -595,12 +595,12 @@
  public ReplicaDBCursor getCursorFrom(DN baseDN, int serverId,
      CSN startAfterCSN)
  {
    DbHandler dbHandler = getDbHandler(baseDN, serverId);
    if (dbHandler != null)
    JEReplicaDB replicaDB = getReplicaDB(baseDN, serverId);
    if (replicaDB != null)
    {
      try
      {
        ReplicaDBCursor cursor = dbHandler.generateCursorFrom(startAfterCSN);
        ReplicaDBCursor cursor = replicaDB.generateCursorFrom(startAfterCSN);
        cursor.next();
        return cursor;
      }
@@ -617,12 +617,12 @@
  public boolean publishUpdateMsg(DN baseDN, int serverId,
      UpdateMsg updateMsg) throws ChangelogException
  {
    final Pair<DbHandler, Boolean> pair =
        getOrCreateDbHandler(baseDN, serverId, replicationServer);
    final DbHandler dbHandler = pair.getFirst();
    final Pair<JEReplicaDB, Boolean> pair =
        getOrCreateReplicaDB(baseDN, serverId, replicationServer);
    final JEReplicaDB replicaDB = pair.getFirst();
    final boolean wasCreated = pair.getSecond();
    dbHandler.add(updateMsg);
    replicaDB.add(updateMsg);
    return wasCreated;
  }