mirror of https://github.com/OpenIdentityPlatform/OpenDJ.git

Nicolas Capponi
03.45.2014 2d735189c834108a2e5f7a795610372eb6d00aed
opends/src/server/org/opends/server/replication/server/changelog/file/ReplicationEnvironment.java
@@ -30,10 +30,12 @@
import java.io.File;
import java.io.FileFilter;
import java.io.FileInputStream;
import java.io.FileNotFoundException;
import java.io.FileOutputStream;
import java.io.IOException;
import java.io.InputStreamReader;
import java.io.OutputStreamWriter;
import java.io.UnsupportedEncodingException;
import java.io.Writer;
import java.util.HashMap;
import java.util.HashSet;
@@ -77,15 +79,21 @@
 * <ul>
 * <li>A "generation_[id].id" file, where [id] is the generation id</li>
 * <li>One directory per server id, named after "[id].server" where [id] is the
 * id of the server. Each directory contains the log files for the given server
 * id.</li>
 * id of the server.</li>
 * </ul>
 * All log files end with the ".log" suffix. Log files always include the "head.log"
 * file and optionally zero to many read-only log files named after the lowest key
 * and highest key present in the log file.
 * Each server id directory contains the following files :
 * <ul>
 * <li>The "head.log" file, which is the more recent log file where records are appended.</li>
 * <li>Zero to many read-only log files named after the lowest key
 * and highest key present in the log file (they all end with the ".log" suffix.</li>
 * <li>Optionally, a "offline.state" file that indicates that this particular server id
 *  of the domain is offline. This file contains the offline CSN, encoded as a String on a single line.</li>
 * </ul>
 * See {@code Log} class for details on the log files.
 *
 * <p>
 * Layout example with two domains "o=test1" and "o=test2", each having server
 * ids 22 and 33 :
 * ids 22 and 33, with server id 33 for domain "o=test1" being offline :
 *
 * <pre>
 * +---changelog
@@ -96,15 +104,16 @@
 * |   \---1.domain
 * |       \---generation1.id
 * |       \---22.server
 * |           \---head.log
 * |           \---head.log [contains last records written]
 * |       \---33.server
 * |           \---head.log
 * |           \---head.log [contains last records written]
 *             \---offline.state
 * |   \---2.domain
 * |       \---generation1.id
 * |       \---22.server
 * |           \---head.log
 * |           \---head.log [contains last records written]
 * |       \---33.server
 * |           \---head.log
 * |           \---head.log [contains last records written]
 * </pre>
 */
class ReplicationEnvironment
@@ -120,6 +129,8 @@
  private static final String DOMAINS_STATE_FILENAME = "domains.state";
  static final String REPLICA_OFFLINE_STATE_FILENAME = "offline.state";
  private static final String DOMAIN_STATE_SEPARATOR = ":";
  private static final String DOMAIN_SUFFIX = ".domain";
@@ -130,6 +141,8 @@
  private static final String GENERATION_ID_FILE_SUFFIX = ".id";
  private static final String UTF8_ENCODING = "UTF-8";
  private static final FileFilter DOMAIN_FILE_FILTER = new FileFilter()
  {
    @Override
@@ -169,7 +182,6 @@
  private final Map<DN, String> domains = new HashMap<DN, String>();
  /** Exclusive lock to guard the domains mapping and change of state to a domain.*/
  // TODO : review the usefulness of this lock
  private final Object domainLock = new Object();
  /** The underlying replication server. */
@@ -358,6 +370,77 @@
    }
  }
  /**
   * Notify that the replica corresponding to provided domain and provided CSN
   * is offline.
   *
   * @param domainDN
   *          the domain of the offline replica
   * @param offlineCSN
   *          the offline replica serverId and offline timestamp
   * @throws ChangelogException
   *           if a problem occurs
   */
  void notifyReplicaOffline(DN domainDN, CSN offlineCSN) throws ChangelogException
  {
    synchronized (domainLock)
    {
      final String domainId = domains.get(domainDN);
      final File serverIdPath = getServerIdPath(domainId, offlineCSN.getServerId());
      if (!serverIdPath.exists())
      {
        throw new ChangelogException(ERR_CHANGELOG_UNABLE_TO_ADD_REPLICA_OFFLINE_WRONG_PATH.get(
            domainDN.toString(), offlineCSN.getServerId(), serverIdPath.getPath()));
      }
      final File offlineFile = new File(serverIdPath, REPLICA_OFFLINE_STATE_FILENAME);
      Writer writer = null;
      try
      {
        // Overwrite file, only the last sent offline CSN is kept
        writer = newFileWriter(offlineFile);
        writer.write(offlineCSN.toString());
      }
      catch (IOException e)
      {
        throw new ChangelogException(ERR_CHANGELOG_UNABLE_TO_WRITE_REPLICA_OFFLINE_STATE_FILE.get(
            domainDN.toString(), offlineCSN.getServerId(), offlineFile.getPath(), offlineCSN.toString()), e);
      }
      finally
      {
        StaticUtils.close(writer);
      }
    }
  }
  /**
   * Notify that the replica corresponding to provided domain and server id
   * is online.
   *
   * @param domainDN
   *          the domain of the replica
   * @param serverId
   *          the replica serverId
   * @throws ChangelogException
   *           if a problem occurs
   */
  void notifyReplicaOnline(DN domainDN, int serverId) throws ChangelogException
  {
    synchronized (domainLock)
    {
      final String domainId = domains.get(domainDN);
      final File offlineFile = new File(getServerIdPath(domainId, serverId), REPLICA_OFFLINE_STATE_FILENAME);
      if (offlineFile.exists())
      {
        final boolean isDeleted = offlineFile.delete();
        if (!isDeleted)
        {
          throw new ChangelogException(ERR_CHANGELOG_UNABLE_TO_DELETE_REPLICA_OFFLINE_STATE_FILE.get(
              offlineFile.getPath(), domainDN.toString(), serverId));
        }
      }
    }
  }
  /** Reads the domain state file to find mapping between each domainDN and its associated domainId. */
  private void readDomainsStateFile() throws ChangelogException
  {
@@ -368,7 +451,7 @@
      String line = null;
      try
      {
        reader = new BufferedReader(new InputStreamReader(new FileInputStream(domainsStateFile), "UTF-8"));
        reader = newFileReader(domainsStateFile);
        while ((line = reader.readLine()) != null)
        {
          final int separatorPos = line.indexOf(DOMAIN_STATE_SEPARATOR);
@@ -425,7 +508,7 @@
   * Update the changelog state with the state corresponding to the provided
   * domain DN.
   */
  private void readStateForDomain(final Entry<DN, String> domainEntry, final ChangelogState result)
  private void readStateForDomain(final Entry<DN, String> domainEntry, final ChangelogState state)
      throws ChangelogException
  {
    final File domainDirectory = getDomainPath(domainEntry.getValue());
@@ -436,7 +519,7 @@
          replicationRootPath, domainDirectory.getPath()));
    }
    final DN domainDN = domainEntry.getKey();
    result.setDomainGenerationId(domainDN, toGenerationId(generationId));
    state.setDomainGenerationId(domainDN, toGenerationId(generationId));
    final File[] serverIds = domainDirectory.listFiles(SERVER_ID_FILE_FILTER);
    if (serverIds == null)
@@ -446,7 +529,43 @@
    }
    for (final File serverId : serverIds)
    {
      result.addServerIdToDomain(toServerId(serverId.getName()), domainDN);
      readStateForServerId(domainDN, serverId, state);
    }
  }
  private void readStateForServerId(DN domainDN, File serverIdPath, ChangelogState state) throws ChangelogException
  {
    state.addServerIdToDomain(toServerId(serverIdPath.getName()), domainDN);
    final File offlineFile = new File(serverIdPath, REPLICA_OFFLINE_STATE_FILENAME);
    if (offlineFile.exists())
    {
      final CSN offlineCSN = readOfflineStateFile(offlineFile, domainDN);
      state.addOfflineReplica(domainDN, offlineCSN);
    }
  }
  private CSN readOfflineStateFile(final File offlineFile, DN domainDN) throws ChangelogException
  {
    BufferedReader reader = null;
    try
    {
      reader = newFileReader(offlineFile);
      String line = reader.readLine();
      if (line == null || reader.readLine() != null)
      {
        throw new ChangelogException(ERR_CHANGELOG_INVALID_REPLICA_OFFLINE_STATE_FILE.get(
            domainDN.toString(), offlineFile.getPath()));
      }
      return new CSN(line);
    }
    catch(IOException e)
    {
      throw new ChangelogException(ERR_CHANGELOG_UNABLE_TO_READ_REPLICA_OFFLINE_STATE_FILE.get(
          domainDN.toString(), offlineFile.getPath()), e);
    }
    finally {
      StaticUtils.close(reader);
    }
  }
@@ -458,13 +577,13 @@
    Writer writer = null;
    try
    {
      writer = new BufferedWriter(new OutputStreamWriter(new FileOutputStream(domainsStateFile), "UTF-8"));
      writer = newFileWriter(domainsStateFile);
      for (final Entry<DN, String> entry : domains.entrySet())
      {
        writer.write(String.format("%s%s%s%n", entry.getValue(), DOMAIN_STATE_SEPARATOR, entry.getKey()));
      }
    }
    catch (Exception e)
    catch (IOException e)
    {
      throw new ChangelogException(ERR_CHANGELOG_UNABLE_TO_UPDATE_DOMAIN_STATE_FILE.get(nextDomainId,
          domainDN.toString(), domainsStateFile.getPath()), e);
@@ -561,7 +680,17 @@
    return new File(replicationRootPath, domainId + DOMAIN_SUFFIX);
  }
  private File getServerIdPath(final String domainId, final int serverId)
  /**
   * Return the path for the provided domain id and server id.
   * Package private to be usable in tests.
   *
   * @param domainId
   *            The id corresponding to a domain DN
   * @param serverId
   *            The server id to retrieve
   * @return the path
   */
  File getServerIdPath(final String domainId, final int serverId)
  {
    return new File(getDomainPath(domainId), String.valueOf(serverId) + SERVER_ID_SUFFIX);
  }
@@ -673,4 +802,16 @@
      throw new ChangelogException(ERR_CHANGELOG_GENERATION_ID_WRONG_FORMAT.get(data), e);
    }
  }
  /** Returns a buffered writer on the provided file. */
  private BufferedWriter newFileWriter(final File file) throws UnsupportedEncodingException, FileNotFoundException
  {
    return new BufferedWriter(new OutputStreamWriter(new FileOutputStream(file), UTF8_ENCODING));
  }
  /** Returns a buffered reader on the provided file. */
  private BufferedReader newFileReader(final File file) throws UnsupportedEncodingException, FileNotFoundException
  {
    return new BufferedReader(new InputStreamReader(new FileInputStream(file), UTF8_ENCODING));
  }
}