opendj3-server-dev/src/messages/messages/replication.properties
@@ -574,8 +574,8 @@ to file system for log file '%s' ERR_CHANGELOG_UNABLE_TO_SEEK_260=Could not seek to position %d for reader \ on log file '%s' ERR_CHANGELOG_UNABLE_TO_CREATE_LOG_DIRECTORY_261=Could not create root directory '%s' for \ log file ERR_CHANGELOG_UNABLE_TO_CREATE_LOG_DIRECTORY_261=Could not create root \ directory '%s' for log file ERR_CHANGELOG_UNABLE_TO_DECODE_DN_FROM_DOMAIN_STATE_FILE_262=Could not decode DN \ from domain state file '%s', from line '%s' ERR_CHANGELOG_UNABLE_TO_READ_DOMAIN_STATE_FILE_263=Could not read domain state \ @@ -606,4 +606,14 @@ ERR_CHANGELOG_UNABLE_TO_RENAME_HEAD_LOG_FILE_275=Could not rename \ head log file from '%s' to '%s' INFO_CHANGELOG_LOG_FILE_ROTATION_276=Rotation needed for log file '%s', \ size of head log file is %d bytes size of head log file is %d bytes ERR_CHANGELOG_UNABLE_TO_ADD_REPLICA_OFFLINE_WRONG_PATH_277=Could not add replica \ offline for domain %s and server id %d because the path '%s' does not exist ERR_CHANGELOG_UNABLE_TO_WRITE_REPLICA_OFFLINE_STATE_FILE_278=Could not write offline \ replica information for domain %s and server id %d, using path '%s' (offline CSN is %s) ERR_CHANGELOG_INVALID_REPLICA_OFFLINE_STATE_FILE_279=Could not read replica offline \ state file '%s' for domain %s, it should contain exactly one line corresponding to the offline CSN ERR_CHANGELOG_UNABLE_TO_READ_REPLICA_OFFLINE_STATE_FILE_280=Could not read content of \ replica offline state file '%s' for domain %s ERR_CHANGELOG_UNABLE_TO_DELETE_REPLICA_OFFLINE_STATE_FILE_281=Could not delete replica \ offline state file '%s' for domain %s and server id %d opendj3-server-dev/src/server/org/opends/server/replication/server/ReplicationServerDomain.java
@@ -496,7 +496,7 @@ if (updateMsg instanceof ReplicaOfflineMsg) { final ReplicaOfflineMsg offlineMsg = (ReplicaOfflineMsg) updateMsg; this.domainDB.replicaOffline(baseDN, offlineMsg.getCSN()); this.domainDB.notifyReplicaOffline(baseDN, offlineMsg.getCSN()); return true; } opendj3-server-dev/src/server/org/opends/server/replication/server/changelog/api/ReplicationDomainDB.java
@@ -225,5 +225,5 @@ * @throws ChangelogException * If a database problem happened */ void replicaOffline(DN baseDN, CSN offlineCSN) throws ChangelogException; void notifyReplicaOffline(DN baseDN, CSN offlineCSN) throws ChangelogException; } opendj3-server-dev/src/server/org/opends/server/replication/server/changelog/file/FileChangelogDB.java
@@ -138,14 +138,6 @@ /** {@inheritDoc} */ @Override public void replicaOffline(DN baseDN, CSN offlineCSN) throws ChangelogException { throw new RuntimeException("Not implemented"); } /** {@inheritDoc} */ @Override public void initializeDB() { throw new RuntimeException("Not implemented"); @@ -182,6 +174,14 @@ /** {@inheritDoc} */ @Override public void notifyReplicaOffline(DN baseDN, CSN offlineCSN) throws ChangelogException { throw new RuntimeException("Not implemented"); } /** {@inheritDoc} */ @Override public ChangeNumberIndexDB getChangeNumberIndexDB() { throw new RuntimeException("Not implemented"); opendj3-server-dev/src/server/org/opends/server/replication/server/changelog/je/ChangeNumberIndexer.java
@@ -174,6 +174,21 @@ } /** * Indicates if the replica corresponding to provided domain DN and server id * is offline. * * @param domainDN * base DN of the replica * @param serverId * server id of the replica * @return {@code true} if replica is offline, {@code false} otherwise */ public boolean isReplicaOffline(DN domainDN, int serverId) { return replicasOffline.getCSN(domainDN, serverId) != null; } /** * Ensures the medium consistency point is updated by UpdateMsg. * * @param baseDN opendj3-server-dev/src/server/org/opends/server/replication/server/changelog/je/JEChangelogDB.java
@@ -827,6 +827,7 @@ final ChangeNumberIndexer indexer = cnIndexer.get(); if (indexer != null) { notifyReplicaOnline(indexer, baseDN, updateMsg.getCSN().getServerId()); indexer.publishUpdateMsg(baseDN, updateMsg); } return pair.getSecond(); // replica DB was created @@ -839,15 +840,25 @@ final ChangeNumberIndexer indexer = cnIndexer.get(); if (indexer != null) { notifyReplicaOnline(indexer, baseDN, heartbeatCSN.getServerId()); indexer.publishHeartbeat(baseDN, heartbeatCSN); } } private void notifyReplicaOnline(final ChangeNumberIndexer indexer, final DN baseDN, final int serverId) throws ChangelogException { if (indexer.isReplicaOffline(baseDN, serverId)) { replicationEnv.notifyReplicaOnline(baseDN, serverId); } } /** {@inheritDoc} */ @Override public void replicaOffline(final DN baseDN, final CSN offlineCSN) throws ChangelogException public void notifyReplicaOffline(final DN baseDN, final CSN offlineCSN) throws ChangelogException { replicationEnv.addOfflineReplica(baseDN, offlineCSN); replicationEnv.notifyReplicaOffline(baseDN, offlineCSN); final ChangeNumberIndexer indexer = cnIndexer.get(); if (indexer != null) { opendj3-server-dev/src/server/org/opends/server/replication/server/changelog/je/ReplicationDbEnv.java
@@ -530,14 +530,32 @@ */ static Entry<byte[], byte[]> toReplicaOfflineEntry(DN baseDN, CSN offlineCSN) { final byte[] key = toBytes(OFFLINE_TAG + FIELD_SEPARATOR + offlineCSN.getServerId() + FIELD_SEPARATOR + baseDN.toNormalizedString()); final byte[] key = toReplicaOfflineKey(baseDN, offlineCSN.getServerId()); final ByteStringBuilder data = new ByteStringBuilder(8); // store a long data.append(offlineCSN.getTime()); return new SimpleImmutableEntry<byte[], byte[]>(key, data.toByteArray()); } /** * Return the key for a replica offline entry in the changelog state database. * * @param baseDN * the replica's baseDN * @param serverId * the replica's serverId * @return the key used in the database to store offline time of the replica */ private static byte[] toReplicaOfflineKey(DN baseDN, int serverId) { return toBytes(OFFLINE_TAG + FIELD_SEPARATOR + serverId + FIELD_SEPARATOR + baseDN.toNormalizedString()); } /** Returns an entry with the provided key and a null value. */ private SimpleImmutableEntry<byte[], byte[]> toEntryWithNullValue(byte[] key) { return new SimpleImmutableEntry<byte[], byte[]>(key, null); } private void putInChangelogStateDBIfNotExist(Entry<byte[], byte[]> entry) throws ChangelogException, RuntimeException { @@ -722,7 +740,9 @@ } /** * Add the information about an offline replica to the changelog state DB. * Notify that replica is offline. * <p> * This information is stored in the changelog state DB. * * @param baseDN * the domain of the offline replica @@ -731,7 +751,7 @@ * @throws ChangelogException * if a database problem occurred */ public void addOfflineReplica(DN baseDN, CSN offlineCSN) public void notifyReplicaOffline(DN baseDN, CSN offlineCSN) throws ChangelogException { synchronized (stateLock) @@ -744,6 +764,25 @@ } } /** * Notify that replica is online. * <p> * Update the changelog state DB if necessary (ie, replica was known to be * offline). * * @param baseDN * the domain of replica * @param serverId * the serverId of replica * @throws ChangelogException * if a database problem occurred */ public void notifyReplicaOnline(DN baseDN, int serverId) throws ChangelogException { deleteFromChangelogStateDB(toEntryWithNullValue(toReplicaOfflineKey(baseDN, serverId)), "removeOfflineReplica(baseDN=" + baseDN + ", serverId=" + serverId + ")"); } private void putInChangelogStateDB(Entry<byte[], byte[]> entry, String methodInvocation) throws ChangelogException {