From 2d735189c834108a2e5f7a795610372eb6d00aed Mon Sep 17 00:00:00 2001
From: Nicolas Capponi <nicolas.capponi@forgerock.com>
Date: Tue, 03 Jun 2014 08:45:08 +0000
Subject: [PATCH] OPENDJ-1467 :  File Based Changelog must support replicas temporarily leaving the topology

---
 opends/src/server/org/opends/server/replication/server/changelog/file/ReplicationEnvironment.java |  177 +++++++++++++++++++++++++++++++++++++++++++++++++++++------
 1 files changed, 159 insertions(+), 18 deletions(-)

diff --git a/opends/src/server/org/opends/server/replication/server/changelog/file/ReplicationEnvironment.java b/opends/src/server/org/opends/server/replication/server/changelog/file/ReplicationEnvironment.java
index ee69dd7..1e658c7 100644
--- a/opends/src/server/org/opends/server/replication/server/changelog/file/ReplicationEnvironment.java
+++ b/opends/src/server/org/opends/server/replication/server/changelog/file/ReplicationEnvironment.java
@@ -30,10 +30,12 @@
 import java.io.File;
 import java.io.FileFilter;
 import java.io.FileInputStream;
+import java.io.FileNotFoundException;
 import java.io.FileOutputStream;
 import java.io.IOException;
 import java.io.InputStreamReader;
 import java.io.OutputStreamWriter;
+import java.io.UnsupportedEncodingException;
 import java.io.Writer;
 import java.util.HashMap;
 import java.util.HashSet;
@@ -77,15 +79,21 @@
  * <ul>
  * <li>A "generation_[id].id" file, where [id] is the generation id</li>
  * <li>One directory per server id, named after "[id].server" where [id] is the
- * id of the server. Each directory contains the log files for the given server
- * id.</li>
+ * id of the server.</li>
  * </ul>
- * All log files end with the ".log" suffix. Log files always include the "head.log"
- * file and optionally zero to many read-only log files named after the lowest key
- * and highest key present in the log file.
+ * Each server id directory contains the following files :
+ * <ul>
+ * <li>The "head.log" file, which is the more recent log file where records are appended.</li>
+ * <li>Zero to many read-only log files named after the lowest key
+ * and highest key present in the log file (they all end with the ".log" suffix.</li>
+ * <li>Optionally, a "offline.state" file that indicates that this particular server id
+ *  of the domain is offline. This file contains the offline CSN, encoded as a String on a single line.</li>
+ * </ul>
+ * See {@code Log} class for details on the log files.
+ *
  * <p>
  * Layout example with two domains "o=test1" and "o=test2", each having server
- * ids 22 and 33 :
+ * ids 22 and 33, with server id 33 for domain "o=test1" being offline :
  *
  * <pre>
  * +---changelog
@@ -96,15 +104,16 @@
  * |   \---1.domain
  * |       \---generation1.id
  * |       \---22.server
- * |           \---head.log
+ * |           \---head.log [contains last records written]
  * |       \---33.server
- * |           \---head.log
+ * |           \---head.log [contains last records written]
+ *             \---offline.state
  * |   \---2.domain
  * |       \---generation1.id
  * |       \---22.server
- * |           \---head.log
+ * |           \---head.log [contains last records written]
  * |       \---33.server
- * |           \---head.log
+ * |           \---head.log [contains last records written]
  * </pre>
  */
 class ReplicationEnvironment
@@ -120,6 +129,8 @@
 
   private static final String DOMAINS_STATE_FILENAME = "domains.state";
 
+  static final String REPLICA_OFFLINE_STATE_FILENAME = "offline.state";
+
   private static final String DOMAIN_STATE_SEPARATOR = ":";
 
   private static final String DOMAIN_SUFFIX = ".domain";
@@ -130,6 +141,8 @@
 
   private static final String GENERATION_ID_FILE_SUFFIX = ".id";
 
+  private static final String UTF8_ENCODING = "UTF-8";
+
   private static final FileFilter DOMAIN_FILE_FILTER = new FileFilter()
   {
     @Override
@@ -169,7 +182,6 @@
   private final Map<DN, String> domains = new HashMap<DN, String>();
 
   /** Exclusive lock to guard the domains mapping and change of state to a domain.*/
-  // TODO : review the usefulness of this lock
   private final Object domainLock = new Object();
 
   /** The underlying replication server. */
@@ -358,6 +370,77 @@
     }
   }
 
+  /**
+   * Notify that the replica corresponding to provided domain and provided CSN
+   * is offline.
+   *
+   * @param domainDN
+   *          the domain of the offline replica
+   * @param offlineCSN
+   *          the offline replica serverId and offline timestamp
+   * @throws ChangelogException
+   *           if a problem occurs
+   */
+  void notifyReplicaOffline(DN domainDN, CSN offlineCSN) throws ChangelogException
+  {
+    synchronized (domainLock)
+    {
+      final String domainId = domains.get(domainDN);
+      final File serverIdPath = getServerIdPath(domainId, offlineCSN.getServerId());
+      if (!serverIdPath.exists())
+      {
+        throw new ChangelogException(ERR_CHANGELOG_UNABLE_TO_ADD_REPLICA_OFFLINE_WRONG_PATH.get(
+            domainDN.toString(), offlineCSN.getServerId(), serverIdPath.getPath()));
+      }
+      final File offlineFile = new File(serverIdPath, REPLICA_OFFLINE_STATE_FILENAME);
+      Writer writer = null;
+      try
+      {
+        // Overwrite file, only the last sent offline CSN is kept
+        writer = newFileWriter(offlineFile);
+        writer.write(offlineCSN.toString());
+      }
+      catch (IOException e)
+      {
+        throw new ChangelogException(ERR_CHANGELOG_UNABLE_TO_WRITE_REPLICA_OFFLINE_STATE_FILE.get(
+            domainDN.toString(), offlineCSN.getServerId(), offlineFile.getPath(), offlineCSN.toString()), e);
+      }
+      finally
+      {
+        StaticUtils.close(writer);
+      }
+    }
+  }
+
+  /**
+   * Notify that the replica corresponding to provided domain and server id
+   * is online.
+   *
+   * @param domainDN
+   *          the domain of the replica
+   * @param serverId
+   *          the replica serverId
+   * @throws ChangelogException
+   *           if a problem occurs
+   */
+  void notifyReplicaOnline(DN domainDN, int serverId) throws ChangelogException
+  {
+    synchronized (domainLock)
+    {
+      final String domainId = domains.get(domainDN);
+      final File offlineFile = new File(getServerIdPath(domainId, serverId), REPLICA_OFFLINE_STATE_FILENAME);
+      if (offlineFile.exists())
+      {
+        final boolean isDeleted = offlineFile.delete();
+        if (!isDeleted)
+        {
+          throw new ChangelogException(ERR_CHANGELOG_UNABLE_TO_DELETE_REPLICA_OFFLINE_STATE_FILE.get(
+              offlineFile.getPath(), domainDN.toString(), serverId));
+        }
+      }
+    }
+  }
+
   /** Reads the domain state file to find mapping between each domainDN and its associated domainId. */
   private void readDomainsStateFile() throws ChangelogException
   {
@@ -368,7 +451,7 @@
       String line = null;
       try
       {
-        reader = new BufferedReader(new InputStreamReader(new FileInputStream(domainsStateFile), "UTF-8"));
+        reader = newFileReader(domainsStateFile);
         while ((line = reader.readLine()) != null)
         {
           final int separatorPos = line.indexOf(DOMAIN_STATE_SEPARATOR);
@@ -425,7 +508,7 @@
    * Update the changelog state with the state corresponding to the provided
    * domain DN.
    */
-  private void readStateForDomain(final Entry<DN, String> domainEntry, final ChangelogState result)
+  private void readStateForDomain(final Entry<DN, String> domainEntry, final ChangelogState state)
       throws ChangelogException
   {
     final File domainDirectory = getDomainPath(domainEntry.getValue());
@@ -436,7 +519,7 @@
           replicationRootPath, domainDirectory.getPath()));
     }
     final DN domainDN = domainEntry.getKey();
-    result.setDomainGenerationId(domainDN, toGenerationId(generationId));
+    state.setDomainGenerationId(domainDN, toGenerationId(generationId));
 
     final File[] serverIds = domainDirectory.listFiles(SERVER_ID_FILE_FILTER);
     if (serverIds == null)
@@ -446,7 +529,43 @@
     }
     for (final File serverId : serverIds)
     {
-      result.addServerIdToDomain(toServerId(serverId.getName()), domainDN);
+      readStateForServerId(domainDN, serverId, state);
+    }
+  }
+
+  private void readStateForServerId(DN domainDN, File serverIdPath, ChangelogState state) throws ChangelogException
+  {
+    state.addServerIdToDomain(toServerId(serverIdPath.getName()), domainDN);
+
+    final File offlineFile = new File(serverIdPath, REPLICA_OFFLINE_STATE_FILENAME);
+    if (offlineFile.exists())
+    {
+      final CSN offlineCSN = readOfflineStateFile(offlineFile, domainDN);
+      state.addOfflineReplica(domainDN, offlineCSN);
+    }
+  }
+
+  private CSN readOfflineStateFile(final File offlineFile, DN domainDN) throws ChangelogException
+  {
+    BufferedReader reader = null;
+    try
+    {
+      reader = newFileReader(offlineFile);
+      String line = reader.readLine();
+      if (line == null || reader.readLine() != null)
+      {
+        throw new ChangelogException(ERR_CHANGELOG_INVALID_REPLICA_OFFLINE_STATE_FILE.get(
+            domainDN.toString(), offlineFile.getPath()));
+      }
+      return new CSN(line);
+    }
+    catch(IOException e)
+    {
+      throw new ChangelogException(ERR_CHANGELOG_UNABLE_TO_READ_REPLICA_OFFLINE_STATE_FILE.get(
+          domainDN.toString(), offlineFile.getPath()), e);
+    }
+    finally {
+      StaticUtils.close(reader);
     }
   }
 
@@ -458,13 +577,13 @@
     Writer writer = null;
     try
     {
-      writer = new BufferedWriter(new OutputStreamWriter(new FileOutputStream(domainsStateFile), "UTF-8"));
+      writer = newFileWriter(domainsStateFile);
       for (final Entry<DN, String> entry : domains.entrySet())
       {
         writer.write(String.format("%s%s%s%n", entry.getValue(), DOMAIN_STATE_SEPARATOR, entry.getKey()));
       }
     }
-    catch (Exception e)
+    catch (IOException e)
     {
       throw new ChangelogException(ERR_CHANGELOG_UNABLE_TO_UPDATE_DOMAIN_STATE_FILE.get(nextDomainId,
           domainDN.toString(), domainsStateFile.getPath()), e);
@@ -561,7 +680,17 @@
     return new File(replicationRootPath, domainId + DOMAIN_SUFFIX);
   }
 
-  private File getServerIdPath(final String domainId, final int serverId)
+  /**
+   * Return the path for the provided domain id and server id.
+   * Package private to be usable in tests.
+   *
+   * @param domainId
+   *            The id corresponding to a domain DN
+   * @param serverId
+   *            The server id to retrieve
+   * @return the path
+   */
+  File getServerIdPath(final String domainId, final int serverId)
   {
     return new File(getDomainPath(domainId), String.valueOf(serverId) + SERVER_ID_SUFFIX);
   }
@@ -673,4 +802,16 @@
       throw new ChangelogException(ERR_CHANGELOG_GENERATION_ID_WRONG_FORMAT.get(data), e);
     }
   }
+
+  /** Returns a buffered writer on the provided file. */
+  private BufferedWriter newFileWriter(final File file) throws UnsupportedEncodingException, FileNotFoundException
+  {
+    return new BufferedWriter(new OutputStreamWriter(new FileOutputStream(file), UTF8_ENCODING));
+  }
+
+  /** Returns a buffered reader on the provided file. */
+  private BufferedReader newFileReader(final File file) throws UnsupportedEncodingException, FileNotFoundException
+  {
+    return new BufferedReader(new InputStreamReader(new FileInputStream(file), UTF8_ENCODING));
+  }
 }

--
Gitblit v1.10.0