mirror of https://github.com/OpenIdentityPlatform/OpenDJ.git

Nicolas Capponi
22.55.2014 df8af541e0b60da31ebd5963d50bb2646fea9846
OPENDJ-1467 :  File Based Changelog must support replicas temporarily leaving the topology

[Note: real merge of all changelog.file package content to be done in one shot in
a future commit]

* File-based changelog
Store the offline CSN when a replica goes offline
Read the offline CSN if present to build the changelog state at startup

Update ReplicationEnvironment.java to manage read and write from/to storage
Update ReplicationEnvironmentTest.java with more unit tests
Update replication.properties with new messages

* File and JE based changelog
Remove the offline CSN if present when receiving an heartbeat or an
update message

Update FileChangelogDB.java, JEChangelogDB.java, ReplicationDBEnv.java to
manage online replica notification

* Other minor updates : renaming, comments

7 files modified
115 ■■■■ changed files
opendj3-server-dev/src/messages/messages/replication.properties 16 ●●●● patch | view | raw | blame | history
opendj3-server-dev/src/server/org/opends/server/replication/server/ReplicationServerDomain.java 2 ●●● patch | view | raw | blame | history
opendj3-server-dev/src/server/org/opends/server/replication/server/changelog/api/ReplicationDomainDB.java 2 ●●● patch | view | raw | blame | history
opendj3-server-dev/src/server/org/opends/server/replication/server/changelog/file/FileChangelogDB.java 16 ●●●● patch | view | raw | blame | history
opendj3-server-dev/src/server/org/opends/server/replication/server/changelog/je/ChangeNumberIndexer.java 15 ●●●●● patch | view | raw | blame | history
opendj3-server-dev/src/server/org/opends/server/replication/server/changelog/je/JEChangelogDB.java 15 ●●●● patch | view | raw | blame | history
opendj3-server-dev/src/server/org/opends/server/replication/server/changelog/je/ReplicationDbEnv.java 49 ●●●● patch | view | raw | blame | history
opendj3-server-dev/src/messages/messages/replication.properties
@@ -574,8 +574,8 @@
 to file system for log file '%s'
ERR_CHANGELOG_UNABLE_TO_SEEK_260=Could not seek to position %d for reader \
 on log file '%s'
ERR_CHANGELOG_UNABLE_TO_CREATE_LOG_DIRECTORY_261=Could not create root directory '%s' for \
 log file
ERR_CHANGELOG_UNABLE_TO_CREATE_LOG_DIRECTORY_261=Could not create root \
 directory '%s' for log file
ERR_CHANGELOG_UNABLE_TO_DECODE_DN_FROM_DOMAIN_STATE_FILE_262=Could not decode DN \
 from domain state file '%s', from line '%s'
ERR_CHANGELOG_UNABLE_TO_READ_DOMAIN_STATE_FILE_263=Could not read domain state \
@@ -606,4 +606,14 @@
ERR_CHANGELOG_UNABLE_TO_RENAME_HEAD_LOG_FILE_275=Could not rename \
 head log file from '%s' to '%s'
INFO_CHANGELOG_LOG_FILE_ROTATION_276=Rotation needed for log file '%s', \
 size of head log file is %d bytes
 size of head log file is %d bytes
ERR_CHANGELOG_UNABLE_TO_ADD_REPLICA_OFFLINE_WRONG_PATH_277=Could not add replica \
 offline for domain %s and server id %d because the path '%s' does not exist
ERR_CHANGELOG_UNABLE_TO_WRITE_REPLICA_OFFLINE_STATE_FILE_278=Could not write offline \
 replica information for domain %s and server id %d, using path '%s' (offline CSN is %s)
ERR_CHANGELOG_INVALID_REPLICA_OFFLINE_STATE_FILE_279=Could not read replica offline \
 state file '%s' for domain %s, it should contain exactly one line corresponding to the offline CSN
ERR_CHANGELOG_UNABLE_TO_READ_REPLICA_OFFLINE_STATE_FILE_280=Could not read content of \
 replica offline state file '%s' for domain %s
ERR_CHANGELOG_UNABLE_TO_DELETE_REPLICA_OFFLINE_STATE_FILE_281=Could not delete replica \
 offline state file '%s' for domain %s and server id %d
opendj3-server-dev/src/server/org/opends/server/replication/server/ReplicationServerDomain.java
@@ -496,7 +496,7 @@
      if (updateMsg instanceof ReplicaOfflineMsg)
      {
        final ReplicaOfflineMsg offlineMsg = (ReplicaOfflineMsg) updateMsg;
        this.domainDB.replicaOffline(baseDN, offlineMsg.getCSN());
        this.domainDB.notifyReplicaOffline(baseDN, offlineMsg.getCSN());
        return true;
      }
opendj3-server-dev/src/server/org/opends/server/replication/server/changelog/api/ReplicationDomainDB.java
@@ -225,5 +225,5 @@
   * @throws ChangelogException
   *           If a database problem happened
   */
  void replicaOffline(DN baseDN, CSN offlineCSN) throws ChangelogException;
  void notifyReplicaOffline(DN baseDN, CSN offlineCSN) throws ChangelogException;
}
opendj3-server-dev/src/server/org/opends/server/replication/server/changelog/file/FileChangelogDB.java
@@ -138,14 +138,6 @@
  /** {@inheritDoc} */
  @Override
  public void replicaOffline(DN baseDN, CSN offlineCSN)
      throws ChangelogException
  {
    throw new RuntimeException("Not implemented");
  }
  /** {@inheritDoc} */
  @Override
  public void initializeDB()
  {
    throw new RuntimeException("Not implemented");
@@ -182,6 +174,14 @@
  /** {@inheritDoc} */
  @Override
  public void notifyReplicaOffline(DN baseDN, CSN offlineCSN)
      throws ChangelogException
  {
    throw new RuntimeException("Not implemented");
  }
  /** {@inheritDoc} */
  @Override
  public ChangeNumberIndexDB getChangeNumberIndexDB()
  {
    throw new RuntimeException("Not implemented");
opendj3-server-dev/src/server/org/opends/server/replication/server/changelog/je/ChangeNumberIndexer.java
@@ -174,6 +174,21 @@
  }
  /**
   * Indicates if the replica corresponding to provided domain DN and server id
   * is offline.
   *
   * @param domainDN
   *          base DN of the replica
   * @param serverId
   *          server id of the replica
   * @return {@code true} if replica is offline, {@code false} otherwise
   */
  public boolean isReplicaOffline(DN domainDN, int serverId)
  {
    return replicasOffline.getCSN(domainDN, serverId) != null;
  }
  /**
   * Ensures the medium consistency point is updated by UpdateMsg.
   *
   * @param baseDN
opendj3-server-dev/src/server/org/opends/server/replication/server/changelog/je/JEChangelogDB.java
@@ -827,6 +827,7 @@
    final ChangeNumberIndexer indexer = cnIndexer.get();
    if (indexer != null)
    {
      notifyReplicaOnline(indexer, baseDN, updateMsg.getCSN().getServerId());
      indexer.publishUpdateMsg(baseDN, updateMsg);
    }
    return pair.getSecond(); // replica DB was created
@@ -839,15 +840,25 @@
    final ChangeNumberIndexer indexer = cnIndexer.get();
    if (indexer != null)
    {
      notifyReplicaOnline(indexer, baseDN, heartbeatCSN.getServerId());
      indexer.publishHeartbeat(baseDN, heartbeatCSN);
    }
  }
  private void notifyReplicaOnline(final ChangeNumberIndexer indexer, final DN baseDN, final int serverId)
      throws ChangelogException
  {
    if (indexer.isReplicaOffline(baseDN, serverId))
    {
      replicationEnv.notifyReplicaOnline(baseDN, serverId);
    }
  }
  /** {@inheritDoc} */
  @Override
  public void replicaOffline(final DN baseDN, final CSN offlineCSN) throws ChangelogException
  public void notifyReplicaOffline(final DN baseDN, final CSN offlineCSN) throws ChangelogException
  {
    replicationEnv.addOfflineReplica(baseDN, offlineCSN);
    replicationEnv.notifyReplicaOffline(baseDN, offlineCSN);
    final ChangeNumberIndexer indexer = cnIndexer.get();
    if (indexer != null)
    {
opendj3-server-dev/src/server/org/opends/server/replication/server/changelog/je/ReplicationDbEnv.java
@@ -530,14 +530,32 @@
   */
  static Entry<byte[], byte[]> toReplicaOfflineEntry(DN baseDN, CSN offlineCSN)
  {
    final byte[] key =
        toBytes(OFFLINE_TAG + FIELD_SEPARATOR + offlineCSN.getServerId()
            + FIELD_SEPARATOR + baseDN.toNormalizedString());
    final byte[] key = toReplicaOfflineKey(baseDN, offlineCSN.getServerId());
    final ByteStringBuilder data = new ByteStringBuilder(8); // store a long
    data.append(offlineCSN.getTime());
    return new SimpleImmutableEntry<byte[], byte[]>(key, data.toByteArray());
  }
  /**
   * Return the key for a replica offline entry in the changelog state database.
   *
   * @param baseDN
   *          the replica's baseDN
   * @param serverId
   *          the replica's serverId
   * @return the key used in the database to store offline time of the replica
   */
  private static byte[] toReplicaOfflineKey(DN baseDN, int serverId)
  {
    return toBytes(OFFLINE_TAG + FIELD_SEPARATOR + serverId + FIELD_SEPARATOR + baseDN.toNormalizedString());
  }
  /** Returns an entry with the provided key and a null value. */
  private SimpleImmutableEntry<byte[], byte[]> toEntryWithNullValue(byte[] key)
  {
    return new SimpleImmutableEntry<byte[], byte[]>(key, null);
  }
  private void putInChangelogStateDBIfNotExist(Entry<byte[], byte[]> entry)
      throws ChangelogException, RuntimeException
  {
@@ -722,7 +740,9 @@
  }
  /**
   * Add the information about an offline replica to the changelog state DB.
   * Notify that replica is offline.
   * <p>
   * This information is stored in the changelog state DB.
   *
   * @param baseDN
   *          the domain of the offline replica
@@ -731,7 +751,7 @@
   * @throws ChangelogException
   *           if a database problem occurred
   */
  public void addOfflineReplica(DN baseDN, CSN offlineCSN)
  public void notifyReplicaOffline(DN baseDN, CSN offlineCSN)
      throws ChangelogException
  {
    synchronized (stateLock)
@@ -744,6 +764,25 @@
    }
  }
  /**
   * Notify that replica is online.
   * <p>
   * Update the changelog state DB if necessary (ie, replica was known to be
   * offline).
   *
   * @param baseDN
   *          the domain of replica
   * @param serverId
   *          the serverId of replica
   * @throws ChangelogException
   *           if a database problem occurred
   */
  public void notifyReplicaOnline(DN baseDN, int serverId) throws ChangelogException
  {
    deleteFromChangelogStateDB(toEntryWithNullValue(toReplicaOfflineKey(baseDN, serverId)),
        "removeOfflineReplica(baseDN=" + baseDN + ", serverId=" + serverId + ")");
  }
  private void putInChangelogStateDB(Entry<byte[], byte[]> entry,
      String methodInvocation) throws ChangelogException
  {