mirror of https://github.com/OpenIdentityPlatform/OpenDJ.git

Jean-Noel Rouvignac
07.29.2014 7ea8ac48d10e033ba0d6ca0ec0d66ace144062a0
opends/src/server/org/opends/server/replication/server/changelog/je/ReplicationDbEnv.java
@@ -66,6 +66,16 @@
{
  private Environment dbEnvironment;
  private Database changelogStateDb;
  /**
   * The current changelogState. This is in-memory version of what is inside the
   * on-disk changelogStateDB. It improves performances in case the
   * changelogState is read often.
   *
   * @GuardedBy("stateLock")
   */
  private final ChangelogState changelogState;
  /** Exclusive lock to synchronize updates to in-memory and on-disk changelogState */
  private final Object stateLock = new Object();
  private final List<Database> allDbs = new CopyOnWriteArrayList<Database>();
  private ReplicationServer replicationServer;
  private final AtomicBoolean isShuttingDown = new AtomicBoolean(false);
@@ -101,6 +111,7 @@
       * of all the servers that have been seen in the past.
       */
      changelogStateDb = openDatabase("changelogstate");
      changelogState = readOnDiskChangelogState();
    }
    catch (RuntimeException e)
    {
@@ -222,13 +233,23 @@
  }
  /**
   * Read and return the list of known servers from the database.
   * Return the current changelog state.
   *
   * @return the current {@link ChangelogState}
   */
  public ChangelogState getChangelogState()
  {
    return changelogState;
  }
  /**
   * Read and return the changelog state from the database.
   *
   * @return the {@link ChangelogState} read from the changelogState DB
   * @throws ChangelogException
   *           if a database problem occurs
   */
  public ChangelogState readChangelogState() throws ChangelogException
  protected ChangelogState readOnDiskChangelogState() throws ChangelogException
  {
    return decodeChangelogState(readWholeState());
  }
@@ -434,8 +455,13 @@
      // Opens the DB for the changes received from this server on this domain.
      final Database replicaDB = openDatabase(replicaEntry.getKey());
      putInChangelogStateDBIfNotExist(toByteArray(replicaEntry));
      putInChangelogStateDBIfNotExist(toGenIdEntry(baseDN, generationId));
      synchronized (stateLock)
      {
        putInChangelogStateDBIfNotExist(toByteArray(replicaEntry));
        changelogState.addServerIdToDomain(serverId, baseDN);
        putInChangelogStateDBIfNotExist(toGenIdEntry(baseDN, generationId));
        changelogState.setDomainGenerationId(baseDN, generationId);
      }
      return replicaDB;
    }
    catch (RuntimeException e)
@@ -638,9 +664,13 @@
   */
  public void clearGenerationId(DN baseDN) throws ChangelogException
  {
    final int unusedGenId = 0;
    deleteFromChangelogStateDB(toGenIdEntry(baseDN, unusedGenId),
        "clearGenerationId(baseDN=" + baseDN + ")");
    synchronized (stateLock)
    {
      final int unusedGenId = 0;
      deleteFromChangelogStateDB(toGenIdEntry(baseDN, unusedGenId),
          "clearGenerationId(baseDN=" + baseDN + ")");
      changelogState.setDomainGenerationId(baseDN, unusedGenId);
    }
  }
  /**
@@ -656,8 +686,12 @@
   */
  public void clearServerId(DN baseDN, int serverId) throws ChangelogException
  {
    deleteFromChangelogStateDB(toByteArray(toReplicaEntry(baseDN, serverId)),
        "clearServerId(baseDN=" + baseDN + " , serverId=" + serverId + ")");
    synchronized (stateLock)
    {
      deleteFromChangelogStateDB(toByteArray(toReplicaEntry(baseDN, serverId)),
          "clearServerId(baseDN=" + baseDN + " , serverId=" + serverId + ")");
      changelogState.setDomainGenerationId(baseDN, -1);
    }
  }
  private void deleteFromChangelogStateDB(Entry<byte[], ?> entry,
@@ -721,10 +755,14 @@
  public void notifyReplicaOffline(DN baseDN, CSN offlineCSN)
      throws ChangelogException
  {
    // just overwrite any older entry as it is assumed a newly received offline
    // CSN is newer than the previous one
    putInChangelogStateDB(toReplicaOfflineEntry(baseDN, offlineCSN),
        "replicaOffline(baseDN=" + baseDN + ", offlineCSN=" + offlineCSN + ")");
    synchronized (stateLock)
    {
      // just overwrite any older entry as it is assumed a newly received offline
      // CSN is newer than the previous one
      putInChangelogStateDB(toReplicaOfflineEntry(baseDN, offlineCSN),
          "replicaOffline(baseDN=" + baseDN + ", offlineCSN=" + offlineCSN + ")");
      changelogState.addOfflineReplica(baseDN, offlineCSN);
    }
  }
  /**
@@ -742,8 +780,12 @@
   */
  public void notifyReplicaOnline(DN baseDN, int serverId) throws ChangelogException
  {
    deleteFromChangelogStateDB(toEntryWithNullValue(toReplicaOfflineKey(baseDN, serverId)),
        "removeOfflineReplica(baseDN=" + baseDN + ", serverId=" + serverId + ")");
    synchronized (stateLock)
    {
      deleteFromChangelogStateDB(toEntryWithNullValue(toReplicaOfflineKey(baseDN, serverId)),
          "removeOfflineReplica(baseDN=" + baseDN + ", serverId=" + serverId + ")");
      changelogState.removeOfflineReplica(baseDN, serverId);
    }
  }
  private void putInChangelogStateDB(Entry<byte[], byte[]> entry,