From 7ea8ac48d10e033ba0d6ca0ec0d66ace144062a0 Mon Sep 17 00:00:00 2001
From: Jean-Noel Rouvignac <jean-noel.rouvignac@forgerock.com>
Date: Mon, 07 Jul 2014 15:29:48 +0000
Subject: [PATCH] OPENDJ-1453 Replica offline messages should be synced with updates

---
 opends/src/server/org/opends/server/replication/server/changelog/file/ReplicationEnvironment.java |  100 +++++++++++++++++++++++++++++++++-----------------
 1 files changed, 66 insertions(+), 34 deletions(-)

diff --git a/opends/src/server/org/opends/server/replication/server/changelog/file/ReplicationEnvironment.java b/opends/src/server/org/opends/server/replication/server/changelog/file/ReplicationEnvironment.java
index 65cd39c..c04fc94 100644
--- a/opends/src/server/org/opends/server/replication/server/changelog/file/ReplicationEnvironment.java
+++ b/opends/src/server/org/opends/server/replication/server/changelog/file/ReplicationEnvironment.java
@@ -173,15 +173,33 @@
 
   /** Root path where the replication log is stored. */
   private final String replicationRootPath;
+  /**
+   * The current changelogState. This is in-memory version of what is inside the
+   * on-disk changelogStateDB. It improves performances in case the
+   * changelogState is read often.
+   *
+   * @GuardedBy("domainsLock")
+   */
+  private final ChangelogState changelogState;
 
   /** The list of logs that are in use. */
   private final List<Log<?, ?>> logs = new CopyOnWriteArrayList<Log<?, ?>>();
 
-  /** Maps each domain DN to a domain id that is used to name directory in file system. */
+  /**
+   * Maps each domain DN to a domain id that is used to name directory in file system.
+   *
+   * @GuardedBy("domainsLock")
+   */
   private final Map<DN, String> domains = new HashMap<DN, String>();
 
-  /** Exclusive lock to guard the domains mapping and change of state to a domain.*/
-  private final Object domainLock = new Object();
+  /**
+   * Exclusive lock to synchronize:
+   * <ul>
+   * <li>the domains mapping</li>
+   * <li>changes to the in-memory changelogState</li>
+   * <li>changes to the on-disk state of a domain</li>
+   */
+  private final Object domainsLock = new Object();
 
   /** The underlying replication server. */
   private final ReplicationServer replicationServer;
@@ -203,21 +221,21 @@
   {
     this.replicationRootPath = rootPath;
     this.replicationServer = replicationServer;
+    this.changelogState = readOnDiskChangelogState();
   }
 
   /**
-   * Returns the state of the replication changelog, which includes the list of
-   * known servers and the generation id.
+   * Returns the state of the replication changelog.
    *
-   * @return the {@link ChangelogState}
+   * @return the {@link ChangelogState} read from the changelogState DB
    * @throws ChangelogException
-   *           if a problem occurs while retrieving the state.
+   *           if a database problem occurs
    */
-  ChangelogState readChangelogState() throws ChangelogException
+  ChangelogState readOnDiskChangelogState() throws ChangelogException
   {
     final ChangelogState state = new ChangelogState();
     final File changelogPath = new File(replicationRootPath);
-    synchronized (domainLock)
+    synchronized (domainsLock)
     {
       readDomainsStateFile();
       checkDomainDirectories(changelogPath);
@@ -230,6 +248,16 @@
   }
 
   /**
+   * Returns the current state of the replication changelog.
+   *
+   * @return the current {@link ChangelogState}
+   */
+  ChangelogState getChangelogState()
+  {
+    return changelogState;
+  }
+
+  /**
    * Finds or creates the log used to store changes from the replication server
    * with the given serverId and the given baseDN.
    *
@@ -256,7 +284,7 @@
       ensureRootDirectoryExists();
 
       String domainId = null;
-      synchronized (domainLock)
+      synchronized (domainsLock)
       {
         domainId = domains.get(domainDN);
         if (domainId == null)
@@ -266,9 +294,11 @@
 
         final File serverIdPath = getServerIdPath(domainId, serverId);
         ensureServerIdDirectoryExists(serverIdPath);
+        changelogState.addServerIdToDomain(serverId, domainDN);
 
         final File generationIdPath = getGenerationIdPath(domainId, generationId);
         ensureGenerationIdFileExists(generationIdPath);
+        changelogState.setDomainGenerationId(domainDN, generationId);
 
         return openLog(serverIdPath, FileReplicaDB.RECORD_PARSER);
       }
@@ -333,12 +363,12 @@
    */
   void clearGenerationId(final DN domainDN) throws ChangelogException
   {
-    synchronized (domainLock)
+    synchronized (domainsLock)
     {
       final String domainId = domains.get(domainDN);
       if (domainId == null)
       {
-        return; // unknow domain => no-op
+        return; // unknown domain => no-op
       }
       final File idFile = retrieveGenerationIdFile(getDomainPath(domainId));
       if (idFile != null)
@@ -350,6 +380,7 @@
               ERR_CHANGELOG_UNABLE_TO_DELETE_GENERATION_ID_FILE.get(idFile.getPath(), domainDN.toString()));
         }
       }
+      changelogState.setDomainGenerationId(domainDN, NO_GENERATION_ID);
     }
   }
 
@@ -364,16 +395,17 @@
    */
   void resetGenerationId(final DN baseDN) throws ChangelogException
   {
-    synchronized (domainLock)
+    synchronized (domainsLock)
     {
       clearGenerationId(baseDN);
       final String domainId = domains.get(baseDN);
       if (domainId == null)
       {
-        return; // unknow domain => no-op
+        return; // unknown domain => no-op
       }
       final File generationIdPath = getGenerationIdPath(domainId, NO_GENERATION_ID);
       ensureGenerationIdFileExists(generationIdPath);
+      changelogState.setDomainGenerationId(baseDN, NO_GENERATION_ID);
     }
   }
 
@@ -390,12 +422,12 @@
    */
   void notifyReplicaOffline(DN domainDN, CSN offlineCSN) throws ChangelogException
   {
-    synchronized (domainLock)
+    synchronized (domainsLock)
     {
       final String domainId = domains.get(domainDN);
       if (domainId == null)
       {
-        return; // unknow domain => no-op
+        return; // unknown domain => no-op
       }
       final File serverIdPath = getServerIdPath(domainId, offlineCSN.getServerId());
       if (!serverIdPath.exists())
@@ -409,6 +441,7 @@
         // Overwrite file, only the last sent offline CSN is kept
         writer = newFileWriter(offlineFile);
         writer.write(offlineCSN.toString());
+        changelogState.addOfflineReplica(domainDN, offlineCSN);
       }
       catch (IOException e)
       {
@@ -435,12 +468,12 @@
    */
   void notifyReplicaOnline(DN domainDN, int serverId) throws ChangelogException
   {
-    synchronized (domainLock)
+    synchronized (domainsLock)
     {
       final String domainId = domains.get(domainDN);
       if (domainId == null)
       {
-        return; // unknow domain => no-op
+        return; // unknown domain => no-op
       }
       final File offlineFile = new File(getServerIdPath(domainId, serverId), REPLICA_OFFLINE_STATE_FILENAME);
       if (offlineFile.exists())
@@ -452,6 +485,7 @@
               offlineFile.getPath(), domainDN.toString(), serverId));
         }
       }
+      changelogState.removeOfflineReplica(domainDN, serverId);
     }
   }
 
@@ -497,24 +531,22 @@
   private void checkDomainDirectories(final File changelogPath) throws ChangelogException
   {
     final File[] dnDirectories = changelogPath.listFiles(DOMAIN_FILE_FILTER);
-    if (dnDirectories == null)
+    if (dnDirectories != null)
     {
-      throw new ChangelogException(ERR_CHANGELOG_READ_STATE_WRONG_ROOT_PATH.get(replicationRootPath));
-    }
+      final Set<String> domainIdsFromFileSystem = new HashSet<String>();
+      for (final File dnDir : dnDirectories)
+      {
+        final String fileName = dnDir.getName();
+        final String domainId = fileName.substring(0, fileName.length() - DOMAIN_SUFFIX.length());
+        domainIdsFromFileSystem.add(domainId);
+      }
 
-    Set<String> domainIdsFromFileSystem = new HashSet<String>();
-    for (final File dnDir : dnDirectories)
-    {
-      final String fileName = dnDir.getName();
-      final String domainId = fileName.substring(0, fileName.length() - DOMAIN_SUFFIX.length());
-      domainIdsFromFileSystem.add(domainId);
-    }
-
-    Set<String> expectedDomainIds = new HashSet<String>(domains.values());
-    if (!domainIdsFromFileSystem.equals(expectedDomainIds))
-    {
-      throw new ChangelogException(ERR_CHANGELOG_INCOHERENT_DOMAIN_STATE.get(domains.values().toString(),
-          domainIdsFromFileSystem.toString()));
+      final Set<String> expectedDomainIds = new HashSet<String>(domains.values());
+      if (!domainIdsFromFileSystem.equals(expectedDomainIds))
+      {
+        throw new ChangelogException(ERR_CHANGELOG_INCOHERENT_DOMAIN_STATE.get(domains.values().toString(),
+            domainIdsFromFileSystem.toString()));
+      }
     }
   }
 

--
Gitblit v1.10.0