mirror of https://github.com/OpenIdentityPlatform/OpenDJ.git

Jean-Noel Rouvignac
07.29.2014 7ea8ac48d10e033ba0d6ca0ec0d66ace144062a0
opends/src/server/org/opends/server/replication/server/changelog/file/ReplicationEnvironment.java
@@ -173,15 +173,33 @@
  /** Root path where the replication log is stored. */
  private final String replicationRootPath;
  /**
   * The current changelogState. This is in-memory version of what is inside the
   * on-disk changelogStateDB. It improves performances in case the
   * changelogState is read often.
   *
   * @GuardedBy("domainsLock")
   */
  private final ChangelogState changelogState;
  /** The list of logs that are in use. */
  private final List<Log<?, ?>> logs = new CopyOnWriteArrayList<Log<?, ?>>();
  /** Maps each domain DN to a domain id that is used to name directory in file system. */
  /**
   * Maps each domain DN to a domain id that is used to name directory in file system.
   *
   * @GuardedBy("domainsLock")
   */
  private final Map<DN, String> domains = new HashMap<DN, String>();
  /** Exclusive lock to guard the domains mapping and change of state to a domain.*/
  private final Object domainLock = new Object();
  /**
   * Exclusive lock to synchronize:
   * <ul>
   * <li>the domains mapping</li>
   * <li>changes to the in-memory changelogState</li>
   * <li>changes to the on-disk state of a domain</li>
   */
  private final Object domainsLock = new Object();
  /** The underlying replication server. */
  private final ReplicationServer replicationServer;
@@ -203,21 +221,21 @@
  {
    this.replicationRootPath = rootPath;
    this.replicationServer = replicationServer;
    this.changelogState = readOnDiskChangelogState();
  }
  /**
   * Returns the state of the replication changelog, which includes the list of
   * known servers and the generation id.
   * Returns the state of the replication changelog.
   *
   * @return the {@link ChangelogState}
   * @return the {@link ChangelogState} read from the changelogState DB
   * @throws ChangelogException
   *           if a problem occurs while retrieving the state.
   *           if a database problem occurs
   */
  ChangelogState readChangelogState() throws ChangelogException
  ChangelogState readOnDiskChangelogState() throws ChangelogException
  {
    final ChangelogState state = new ChangelogState();
    final File changelogPath = new File(replicationRootPath);
    synchronized (domainLock)
    synchronized (domainsLock)
    {
      readDomainsStateFile();
      checkDomainDirectories(changelogPath);
@@ -230,6 +248,16 @@
  }
  /**
   * Returns the current state of the replication changelog.
   *
   * @return the current {@link ChangelogState}
   */
  ChangelogState getChangelogState()
  {
    return changelogState;
  }
  /**
   * Finds or creates the log used to store changes from the replication server
   * with the given serverId and the given baseDN.
   *
@@ -256,7 +284,7 @@
      ensureRootDirectoryExists();
      String domainId = null;
      synchronized (domainLock)
      synchronized (domainsLock)
      {
        domainId = domains.get(domainDN);
        if (domainId == null)
@@ -266,9 +294,11 @@
        final File serverIdPath = getServerIdPath(domainId, serverId);
        ensureServerIdDirectoryExists(serverIdPath);
        changelogState.addServerIdToDomain(serverId, domainDN);
        final File generationIdPath = getGenerationIdPath(domainId, generationId);
        ensureGenerationIdFileExists(generationIdPath);
        changelogState.setDomainGenerationId(domainDN, generationId);
        return openLog(serverIdPath, FileReplicaDB.RECORD_PARSER);
      }
@@ -333,12 +363,12 @@
   */
  void clearGenerationId(final DN domainDN) throws ChangelogException
  {
    synchronized (domainLock)
    synchronized (domainsLock)
    {
      final String domainId = domains.get(domainDN);
      if (domainId == null)
      {
        return; // unknow domain => no-op
        return; // unknown domain => no-op
      }
      final File idFile = retrieveGenerationIdFile(getDomainPath(domainId));
      if (idFile != null)
@@ -350,6 +380,7 @@
              ERR_CHANGELOG_UNABLE_TO_DELETE_GENERATION_ID_FILE.get(idFile.getPath(), domainDN.toString()));
        }
      }
      changelogState.setDomainGenerationId(domainDN, NO_GENERATION_ID);
    }
  }
@@ -364,16 +395,17 @@
   */
  void resetGenerationId(final DN baseDN) throws ChangelogException
  {
    synchronized (domainLock)
    synchronized (domainsLock)
    {
      clearGenerationId(baseDN);
      final String domainId = domains.get(baseDN);
      if (domainId == null)
      {
        return; // unknow domain => no-op
        return; // unknown domain => no-op
      }
      final File generationIdPath = getGenerationIdPath(domainId, NO_GENERATION_ID);
      ensureGenerationIdFileExists(generationIdPath);
      changelogState.setDomainGenerationId(baseDN, NO_GENERATION_ID);
    }
  }
@@ -390,12 +422,12 @@
   */
  void notifyReplicaOffline(DN domainDN, CSN offlineCSN) throws ChangelogException
  {
    synchronized (domainLock)
    synchronized (domainsLock)
    {
      final String domainId = domains.get(domainDN);
      if (domainId == null)
      {
        return; // unknow domain => no-op
        return; // unknown domain => no-op
      }
      final File serverIdPath = getServerIdPath(domainId, offlineCSN.getServerId());
      if (!serverIdPath.exists())
@@ -409,6 +441,7 @@
        // Overwrite file, only the last sent offline CSN is kept
        writer = newFileWriter(offlineFile);
        writer.write(offlineCSN.toString());
        changelogState.addOfflineReplica(domainDN, offlineCSN);
      }
      catch (IOException e)
      {
@@ -435,12 +468,12 @@
   */
  void notifyReplicaOnline(DN domainDN, int serverId) throws ChangelogException
  {
    synchronized (domainLock)
    synchronized (domainsLock)
    {
      final String domainId = domains.get(domainDN);
      if (domainId == null)
      {
        return; // unknow domain => no-op
        return; // unknown domain => no-op
      }
      final File offlineFile = new File(getServerIdPath(domainId, serverId), REPLICA_OFFLINE_STATE_FILENAME);
      if (offlineFile.exists())
@@ -452,6 +485,7 @@
              offlineFile.getPath(), domainDN.toString(), serverId));
        }
      }
      changelogState.removeOfflineReplica(domainDN, serverId);
    }
  }
@@ -497,24 +531,22 @@
  private void checkDomainDirectories(final File changelogPath) throws ChangelogException
  {
    final File[] dnDirectories = changelogPath.listFiles(DOMAIN_FILE_FILTER);
    if (dnDirectories == null)
    if (dnDirectories != null)
    {
      throw new ChangelogException(ERR_CHANGELOG_READ_STATE_WRONG_ROOT_PATH.get(replicationRootPath));
    }
      final Set<String> domainIdsFromFileSystem = new HashSet<String>();
      for (final File dnDir : dnDirectories)
      {
        final String fileName = dnDir.getName();
        final String domainId = fileName.substring(0, fileName.length() - DOMAIN_SUFFIX.length());
        domainIdsFromFileSystem.add(domainId);
      }
    Set<String> domainIdsFromFileSystem = new HashSet<String>();
    for (final File dnDir : dnDirectories)
    {
      final String fileName = dnDir.getName();
      final String domainId = fileName.substring(0, fileName.length() - DOMAIN_SUFFIX.length());
      domainIdsFromFileSystem.add(domainId);
    }
    Set<String> expectedDomainIds = new HashSet<String>(domains.values());
    if (!domainIdsFromFileSystem.equals(expectedDomainIds))
    {
      throw new ChangelogException(ERR_CHANGELOG_INCOHERENT_DOMAIN_STATE.get(domains.values().toString(),
          domainIdsFromFileSystem.toString()));
      final Set<String> expectedDomainIds = new HashSet<String>(domains.values());
      if (!domainIdsFromFileSystem.equals(expectedDomainIds))
      {
        throw new ChangelogException(ERR_CHANGELOG_INCOHERENT_DOMAIN_STATE.get(domains.values().toString(),
            domainIdsFromFileSystem.toString()));
      }
    }
  }