From d90a03781f4833f726597d735b79b49073940d75 Mon Sep 17 00:00:00 2001
From: Fabio Pistolesi <fabio.pistolesi@forgerock.com>
Date: Fri, 06 Nov 2015 17:46:01 +0000
Subject: [PATCH] OPENDJ-1937 Replication draft change log changeNumber attribute should be synchronized with other RSs during initialization
---
opendj-server-legacy/src/main/java/org/opends/server/tools/dsreplication/ReplicationCliMain.java | 726 +++++++++++++++++++++++++++++++++++-------------------
1 files changed, 467 insertions(+), 259 deletions(-)
diff --git a/opendj-server-legacy/src/main/java/org/opends/server/tools/dsreplication/ReplicationCliMain.java b/opendj-server-legacy/src/main/java/org/opends/server/tools/dsreplication/ReplicationCliMain.java
index 8c5494f..c612fad 100644
--- a/opendj-server-legacy/src/main/java/org/opends/server/tools/dsreplication/ReplicationCliMain.java
+++ b/opendj-server-legacy/src/main/java/org/opends/server/tools/dsreplication/ReplicationCliMain.java
@@ -36,7 +36,6 @@
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
-import java.util.Collections;
import java.util.Comparator;
import java.util.Date;
import java.util.HashMap;
@@ -49,6 +48,7 @@
import java.util.Objects;
import java.util.Set;
import java.util.SortedSet;
+import java.util.TreeMap;
import java.util.TreeSet;
import java.util.concurrent.atomic.AtomicReference;
@@ -57,13 +57,12 @@
import javax.naming.NamingEnumeration;
import javax.naming.NamingException;
import javax.naming.NoPermissionException;
-import javax.naming.directory.Attribute;
-import javax.naming.directory.BasicAttribute;
import javax.naming.directory.BasicAttributes;
import javax.naming.directory.DirContext;
import javax.naming.directory.SearchControls;
import javax.naming.directory.SearchResult;
import javax.naming.ldap.InitialLdapContext;
+import javax.naming.ldap.LdapName;
import javax.net.ssl.KeyManager;
import javax.net.ssl.SSLException;
import javax.net.ssl.SSLHandshakeException;
@@ -112,6 +111,8 @@
import org.opends.server.tools.tasks.TaskScheduleInteraction;
import org.opends.server.tools.tasks.TaskScheduleUserData;
import org.opends.server.types.DN;
+import org.opends.server.types.DirectoryException;
+import org.opends.server.types.HostPort;
import org.opends.server.types.InitializationException;
import org.opends.server.types.NullOutputStream;
import org.opends.server.types.OpenDsException;
@@ -147,6 +148,7 @@
import static com.forgerock.opendj.cli.Utils.*;
import static com.forgerock.opendj.util.OperatingSystem.*;
+import static java.util.Collections.*;
import static org.forgerock.util.Utils.*;
import static org.opends.admin.ads.util.ConnectionUtils.*;
import static org.opends.admin.ads.util.PreferredConnection.*;
@@ -220,6 +222,9 @@
STATUS(STATUS_REPLICATION_SUBCMD_NAME, INFO_REPLICATION_STATUS_MENU_PROMPT.get()),
/** Replication purge historical. */
PURGE_HISTORICAL(PURGE_HISTORICAL_SUBCMD_NAME, INFO_REPLICATION_PURGE_HISTORICAL_MENU_PROMPT.get()),
+ /** Set changelog change number from another server. */
+ RESET_CHANGE_NUMBER(ReplicationCliArgumentParser.RESET_CHANGE_NUMBER_SUBCMD_NAME,
+ INFO_DESCRIPTION_RESET_CHANGE_NUMBER.get()),
/** Cancel operation. */
CANCEL(null, null);
@@ -256,6 +261,34 @@
}
}
+ /** Abstract some of the operations when two servers must be queried for information. */
+ private interface OperationBetweenSourceAndDestinationServers
+ {
+ /**
+ * Returns whether we should stop processing after asking the user for additional information.
+ * Might connect to servers to run configuration checks.
+ * @param baseDNs user specified baseDNs
+ * @param source the source server
+ * @param dest the destination server
+ * @param interactive if user has to input information
+ * @return whether we should stop
+ */
+ boolean continueAfterUserInput(Collection<String> baseDNs, InitialLdapContext source, InitialLdapContext dest,
+ boolean interactive);
+
+ /**
+ * Confirm with the user whether the current task should continue.
+ *
+ * @param uData servers address and authentication parameters
+ * @param ctxSource LDAP Context for the destination server
+ * @param ctxDestination LDAP Context for the source server
+ * @param defaultValue default yes or no
+ * @return whether the current task should be interrupted
+ */
+ boolean confirmOperation(SourceDestinationServerUserData uData, InitialLdapContext ctxSource,
+ InitialLdapContext ctxDestination, final boolean defaultValue);
+ }
+
/** The argument parser to be used. */
private ReplicationCliArgumentParser argParser;
private FileBasedArgument userProvidedAdminPwdFile;
@@ -527,6 +560,8 @@
return statusReplication();
case PURGE_HISTORICAL:
return purgeHistorical();
+ case RESET_CHANGE_NUMBER:
+ return resetChangeNumber();
default:
return SUCCESSFUL_NOP;
}
@@ -543,15 +578,7 @@
*/
private String askForAdministratorUID(String defaultValue, LocalizedLogger logger)
{
- try
- {
- return readInput(INFO_ADMINISTRATOR_UID_PROMPT.get(), defaultValue);
- }
- catch (ClientException ce)
- {
- logger.warn(LocalizableMessage.raw("Error reading input: " + ce, ce));
- return defaultValue;
- }
+ return ask(logger, INFO_ADMINISTRATOR_UID_PROMPT.get(), defaultValue);
}
/**
@@ -574,6 +601,19 @@
}
}
+ private String ask(LocalizedLogger logger, LocalizableMessage msgPrompt, String defaultValue)
+ {
+ try
+ {
+ return readInput(msgPrompt, defaultValue);
+ }
+ catch (ClientException ce)
+ {
+ logger.warn(LocalizableMessage.raw("Error reading input: " + ce, ce));
+ return defaultValue;
+ }
+ }
+
/**
* Commodity method used to repeatidly ask the user to provide an integer
* value.
@@ -1316,6 +1356,219 @@
}
}
+ private ReplicationCliReturnCode resetChangeNumber()
+ {
+ final String changeNumber;
+ final SourceDestinationServerUserData uData = new SourceDestinationServerUserData();
+
+ if (!argParser.isInteractive())
+ {
+ initializeWithArgParser(uData);
+ return resetChangeNumber(uData);
+ }
+ OperationBetweenSourceAndDestinationServers
+ resetChangeNumberOperations = new OperationBetweenSourceAndDestinationServers()
+ {
+ @Override
+ public boolean continueAfterUserInput(Collection<String> baseDNs, InitialLdapContext source,
+ InitialLdapContext dest, boolean interactive)
+ {
+ TopologyCacheFilter filter = new TopologyCacheFilter();
+ filter.setSearchMonitoringInformation(false);
+
+ if (!argParser.resetChangeNumber.isPresent())
+ {
+ String cn = getNewestChangeNumber(source);
+ if (cn.isEmpty())
+ {
+ return true;
+ }
+ argParser.setResetChangeNumber(
+ ask(logger, INFO_RESET_CHANGE_NUMBER_TO.get(uData.getSource(), uData.getDestination()), cn));
+ }
+ return false;
+ }
+
+ @Override
+ public boolean confirmOperation(SourceDestinationServerUserData uData, InitialLdapContext ctxSource,
+ InitialLdapContext ctxDestination, boolean defaultValue)
+ {
+ return !askConfirmation(INFO_RESET_CHANGE_NUMBER_CONFIRM_RESET.get(uData.getDestinationHostPort()),
+ defaultValue);
+ }
+ };
+
+ return promptIfRequired(uData, resetChangeNumberOperations) ? resetChangeNumber(uData) : USER_CANCELLED;
+ }
+
+ private ReplicationCliReturnCode resetChangeNumber(SourceDestinationServerUserData uData)
+ {
+
+ InitialLdapContext ctxSource = createAdministrativeContext(uData, uData.getSource());
+ InitialLdapContext ctxDest = createAdministrativeContext(uData, uData.getDestination());
+ if (!getCommonSuffixes(ctxSource, ctxDest, SuffixRelationType.NOT_FULLY_REPLICATED).isEmpty())
+ {
+ errPrintln(ERROR_RESET_CHANGE_NUMBER_SERVERS_BASEDNS_DIFFER.get(uData.getSourceHostPort(),
+ uData.getDestinationHostPort()));
+ return ERROR_RESET_CHANGE_NUMBER_BASEDNS_SHOULD_EQUAL;
+ }
+ if (mustPrintCommandBuilder())
+ {
+ printNewCommandBuilder(RESET_CHANGE_NUMBER_SUBCMD_NAME, uData);
+ }
+ try
+ {
+ String newStartCN;
+ if (argParser.resetChangeNumber.isPresent())
+ {
+ newStartCN = String.valueOf(argParser.getResetChangeNumber());
+ }
+ else
+ {
+ newStartCN = getNewestChangeNumber(ctxSource);
+ if (newStartCN.isEmpty())
+ {
+ return ERROR_UNKNOWN_CHANGE_NUMBER;
+ }
+ argParser.setResetChangeNumber(newStartCN);
+ }
+ SearchControls ctls = new SearchControls();
+ ctls.setSearchScope(SearchControls.SUBTREE_SCOPE);
+ ctls.setReturningAttributes(
+ new String[] {
+ "changeNumber",
+ "replicationCSN",
+ "targetDN"
+ });
+ NamingEnumeration<SearchResult> listeners = ctxSource.search(new LdapName("cn=changelog"),
+ "(changeNumber=" + newStartCN + ")", ctls);
+ if (!listeners.hasMore())
+ {
+ errPrintln(ERROR_RESET_CHANGE_NUMBER_UNKNOWN_NUMBER.get(newStartCN, uData.getSourceHostPort()));
+ return ERROR_UNKNOWN_CHANGE_NUMBER;
+ }
+ SearchResult sr = listeners.next();
+ String newStartCSN = getFirstValue(sr, "replicationCSN");
+ if (newStartCSN == null)
+ {
+ errPrintln(ERROR_RESET_CHANGE_NUMBER_NO_CSN_FOUND.get(newStartCN, uData.getSourceHostPort()));
+ return ERROR_RESET_CHANGE_NUMBER_NO_CSN;
+ }
+ String targetDN = getFirstValue(sr, "targetDN");
+ DN targetBaseDN = DN.rootDN();
+ try
+ {
+ for (String adn : getCommonSuffixes(ctxSource, ctxDest, SuffixRelationType.REPLICATED))
+ {
+ DN dn = DN.valueOf(adn);
+ if (DN.valueOf(targetDN).isDescendantOf(dn) && dn.isDescendantOf(targetBaseDN))
+ {
+ targetBaseDN = dn;
+ }
+ }
+ }
+ catch (DirectoryException de)
+ {
+ errPrintln(ERROR_RESET_CHANGE_NUMBER_EXCEPTION.get(de.getLocalizedMessage()));
+ return ERROR_RESET_CHANGE_NUMBER_PROBLEM;
+ }
+ if (targetBaseDN.isRootDN())
+ {
+ errPrintln(ERROR_RESET_CHANGE_NUMBER_NO_BASEDN.get(newStartCN, targetDN, newStartCSN));
+ return ERROR_RESET_CHANGE_NUMBER_UNKNOWN_BASEDN;
+ }
+ logger.info(INFO_RESET_CHANGE_NUMBER_INFO.get(uData.getDestinationHostPort(),
+ newStartCN, newStartCSN, targetBaseDN.toString()));
+ Map<String, String> taskAttrs = new TreeMap<>();
+ taskAttrs.put("ds-task-reset-change-number-to", newStartCN);
+ taskAttrs.put("ds-task-reset-change-number-csn", newStartCSN);
+ taskAttrs.put("ds-task-reset-change-number-base-dn", targetBaseDN.toString());
+ String taskDN = createServerTask(ctxDest,
+ "ds-task-reset-change-number", "org.opends.server.tasks.ResetChangeNumberTask", "dsreplication-reset-cn",
+ taskAttrs);
+ waitUntilResetChangeNumberTaskEnds(ctxDest, taskDN);
+ return SUCCESSFUL;
+ }
+ catch (ReplicationCliException | NamingException | NullPointerException e)
+ {
+ errPrintln(ERROR_RESET_CHANGE_NUMBER_EXCEPTION.get(e.getLocalizedMessage()));
+ return ERROR_RESET_CHANGE_NUMBER_PROBLEM;
+ }
+ }
+
+ private String getNewestChangeNumber(InitialLdapContext source)
+ {
+ try
+ {
+ SearchControls ctls = new SearchControls();
+ ctls.setSearchScope(SearchControls.OBJECT_SCOPE);
+ ctls.setReturningAttributes(new String[] {"lastChangeNumber"});
+ NamingEnumeration<SearchResult> results = source.search(new LdapName(""), "objectclass=*", ctls);
+ if (results.hasMore()) {
+ return getFirstValue(results.next(), "lastChangeNumber");
+ }
+ }
+ catch (NamingException e)
+ {
+ errPrintln(ERROR_RESET_CHANGE_NUMBER_EXCEPTION.get(e.getLocalizedMessage()));
+ }
+
+ return "";
+ }
+
+ private void waitUntilResetChangeNumberTaskEnds(InitialLdapContext server, String taskDN)
+ throws ReplicationCliException
+ {
+ String lastLogMsg = null;
+ while (true)
+ {
+ sleepCatchInterrupt(500);
+ try
+ {
+ SearchResult sr = getLastSearchResult(server, taskDN, "ds-task-log-message", "ds-task-state" );
+ String logMsg = getFirstValue(sr, "ds-task-log-message");
+ if (logMsg != null && !logMsg.equals(lastLogMsg))
+ {
+ logger.info(LocalizableMessage.raw(logMsg));
+ lastLogMsg = logMsg;
+ }
+ InstallerHelper helper = new InstallerHelper();
+ String state = getFirstValue(sr, "ds-task-state");
+
+ if (helper.isDone(state) || helper.isStoppedByError(state))
+ {
+ LocalizableMessage errorMsg = ERR_UNEXPECTED_DURING_TASK_WITH_LOG.get(lastLogMsg, state, server);
+
+ if (helper.isCompletedWithErrors(state))
+ {
+ logger.warn(LocalizableMessage.raw("Completed with error: " + errorMsg));
+ errPrintln(errorMsg);
+ }
+ else if (!helper.isSuccessful(state) || helper.isStoppedByError(state))
+ {
+ logger.warn(LocalizableMessage.raw("Error: " + errorMsg));
+ throw new ReplicationCliException(errorMsg, ERROR_LAUNCHING_RESET_CHANGE_NUMBER, null);
+ }
+ else
+ {
+ print(INFO_RESET_CHANGE_NUMBER_TASK_FINISHED.get());
+ println();
+ }
+ return;
+ }
+ }
+ catch (NameNotFoundException x)
+ {
+ return;
+ }
+ catch (NamingException ne)
+ {
+ throw new ReplicationCliException(getThrowableMsg(ERR_READING_SERVER_TASK_PROGRESS.get(), ne),
+ ERROR_CONNECTING, ne);
+ }
+ }
+ }
+
private InitialLdapContext createAdministrativeContext(MonoServerReplicationUserData uData)
{
final String bindDn = getAdministratorDN(uData.getAdminUid());
@@ -1411,40 +1664,22 @@
getThrowableMsg(msg, ne), code, ne);
}
}
- // Wait until it is over
- SearchControls searchControls = new SearchControls();
- searchControls.setCountLimit(1);
- searchControls.setSearchScope(SearchControls.OBJECT_SCOPE);
- searchControls.setReturningAttributes(
- new String[] {
- "ds-task-log-message",
- "ds-task-state",
- "ds-task-purge-conflicts-historical-purged-values-count",
- "ds-task-purge-conflicts-historical-purge-completed-in-time",
- "ds-task-purge-conflicts-historical-purge-completed-in-time",
- "ds-task-purge-conflicts-historical-last-purged-changenumber"
- });
- String filter = "objectclass=*";
- String lastLogMsg = null;
// Polling only makes sense when we are recurrently scheduling a task
// or the task is being executed now.
+ String lastLogMsg = null;
while (!isOver && uData.getTaskSchedule().getStartDate() == null)
{
sleepCatchInterrupt(500);
try
{
- NamingEnumeration<SearchResult> res =
- ctx.search(dn, filter, searchControls);
- SearchResult sr = null;
- try
- {
- sr = res.next();
- }
- finally
- {
- res.close();
- }
+ SearchResult sr = getFirstSearchResult(ctx, dn,
+ "ds-task-log-message",
+ "ds-task-state",
+ "ds-task-purge-conflicts-historical-purged-values-count",
+ "ds-task-purge-conflicts-historical-purge-completed-in-time",
+ "ds-task-purge-conflicts-historical-purge-completed-in-time",
+ "ds-task-purge-conflicts-historical-last-purged-changenumber");
String logMsg = getFirstValue(sr, "ds-task-log-message");
if (logMsg != null && !logMsg.equals(lastLogMsg))
{
@@ -1479,7 +1714,7 @@
}
catch (NamingException ne)
{
- LocalizableMessage msg = ERR_POOLING_PURGE_HISTORICAL.get();
+ LocalizableMessage msg = ERR_READING_SERVER_TASK_PROGRESS.get();
throw new ReplicationCliException(
getThrowableMsg(msg, ne), ERROR_CONNECTING, ne);
}
@@ -1492,14 +1727,34 @@
return returnCode;
}
+ private SearchResult getFirstSearchResult(InitialLdapContext ctx, String dn, String... returnedAttributes)
+ throws NamingException
+ {
+ SearchControls searchControls = new SearchControls();
+ searchControls.setCountLimit(1);
+ searchControls.setSearchScope(SearchControls.OBJECT_SCOPE);
+ searchControls.setReturningAttributes(returnedAttributes);
+ NamingEnumeration<SearchResult> res = ctx.search(dn, "objectclass=*", searchControls);
+ try
+ {
+ SearchResult sr = null;
+ sr = res.next();
+ return sr;
+ }
+ finally
+ {
+ res.close();
+ }
+ }
+
private LocalizableMessage getPurgeErrorMsg(String lastLogMsg, String state, InitialLdapContext ctx)
{
String server = getHostPort(ctx);
if (lastLogMsg != null)
{
- return INFO_ERROR_DURING_PURGE_HISTORICAL_LOG.get(lastLogMsg, state, server);
+ return ERR_UNEXPECTED_DURING_TASK_WITH_LOG.get(lastLogMsg, state, server);
}
- return INFO_ERROR_DURING_PURGE_HISTORICAL_NO_LOG.get(state, server);
+ return ERR_UNEXPECTED_DURING_TASK_NO_LOG.get(state, server);
}
/**
@@ -1556,7 +1811,7 @@
}
replica.setBackendName(backend.getBackendID());
replica.setSuffix(suffix);
- suffix.setReplicas(Collections.singleton(replica));
+ suffix.setReplicas(singleton(replica));
replicas.add(replica);
}
@@ -1685,23 +1940,32 @@
*/
private ReplicationCliReturnCode initializeReplication()
{
- InitializeReplicationUserData uData = new InitializeReplicationUserData();
- if (argParser.isInteractive())
- {
- if (promptIfRequired(uData))
- {
- return initializeReplication(uData);
- }
- else
- {
- return USER_CANCELLED;
- }
- }
- else
+ SourceDestinationServerUserData uData = new SourceDestinationServerUserData();
+ if (!argParser.isInteractive())
{
initializeWithArgParser(uData);
return initializeReplication(uData);
}
+
+ OperationBetweenSourceAndDestinationServers
+ initializeReplicationOperations = new OperationBetweenSourceAndDestinationServers()
+ {
+ @Override
+ public boolean continueAfterUserInput(Collection<String> baseDNs, InitialLdapContext source,
+ InitialLdapContext dest, boolean interactive)
+ {
+ checkSuffixesForInitializeReplication(baseDNs, source, dest, interactive);
+ return baseDNs.isEmpty();
+ }
+
+ @Override
+ public boolean confirmOperation(SourceDestinationServerUserData uData, InitialLdapContext ctxSource,
+ InitialLdapContext ctxDestination, boolean defaultValue)
+ {
+ return !askConfirmation(getInitializeReplicationPrompt(uData, ctxSource, ctxDestination), defaultValue);
+ }
+ };
+ return promptIfRequired(uData, initializeReplicationOperations) ? initializeReplication(uData) : USER_CANCELLED;
}
/**
@@ -2782,7 +3046,7 @@
{
if (uData instanceof InitializeAllReplicationUserData)
{
- sourceServerCI.setHeadingMessage(INFO_REPLICATION_INITIALIZE_SOURCE_CONNECTION_PARAMETERS.get());
+ sourceServerCI.setHeadingMessage(INFO_INITIALIZE_SOURCE_CONNECTION_PARAMETERS.get());
}
sourceServerCI.run();
@@ -2871,10 +3135,12 @@
* is missing, ask the user to provide valid data.
* We assume that if this method is called we are in interactive mode.
* @param uData the object to be updated.
+ * @param serversOperations Additional processing for the command
* @return <CODE>true</CODE> if the object was successfully updated and
* <CODE>false</CODE> if the user cancelled the operation.
*/
- private boolean promptIfRequired(InitializeReplicationUserData uData)
+ private boolean promptIfRequired(SourceDestinationServerUserData uData,
+ OperationBetweenSourceAndDestinationServers serversOperations)
{
boolean cancelled = false;
@@ -2903,7 +3169,7 @@
{
try
{
- sourceServerCI.setHeadingMessage(INFO_REPLICATION_INITIALIZE_SOURCE_CONNECTION_PARAMETERS.get());
+ sourceServerCI.setHeadingMessage(INFO_INITIALIZE_SOURCE_CONNECTION_PARAMETERS.get());
sourceServerCI.run();
hostSource = sourceServerCI.getHostName();
portSource = sourceServerCI.getPortNumber();
@@ -2966,7 +3232,7 @@
{
try
{
- destinationServerCI.setHeadingMessage(INFO_REPLICATION_INITIALIZE_DESTINATION_CONNECTION_PARAMETERS.get());
+ destinationServerCI.setHeadingMessage(INFO_INITIALIZE_DESTINATION_CONNECTION_PARAMETERS.get());
destinationServerCI.run();
hostDestination = destinationServerCI.getHostName();
portDestination = destinationServerCI.getPortNumber();
@@ -2977,7 +3243,7 @@
{
portDestination = -1;
errPrintln();
- errPrintln(ERR_REPLICATION_INITIALIZE_SAME_SERVER_PORT.get(hostSource, portSource));
+ errPrintln(ERR_SOURCE_DESTINATION_INITIALIZE_SAME_SERVER_PORT.get(hostSource, portSource));
errPrintln();
error = true;
}
@@ -3016,17 +3282,14 @@
if (!cancelled)
{
LinkedList<String> suffixes = argParser.getBaseDNs();
- checkSuffixesForInitializeReplication(suffixes, ctxSource, ctxDestination, true);
- cancelled = suffixes.isEmpty();
-
+ cancelled = serversOperations.continueAfterUserInput(suffixes, ctxSource, ctxDestination, true);
uData.setBaseDNs(suffixes);
}
if (!cancelled)
{
- // Ask for confirmation to initialize.
println();
- cancelled = !askConfirmation(getPrompt(uData, ctxSource, ctxDestination), true);
+ cancelled = serversOperations.confirmOperation(uData, ctxSource, ctxDestination, true);
println();
}
@@ -3034,8 +3297,8 @@
return !cancelled;
}
- private LocalizableMessage getPrompt(InitializeReplicationUserData uData, InitialLdapContext ctxSource,
- InitialLdapContext ctxDestination)
+ private LocalizableMessage getInitializeReplicationPrompt(SourceDestinationServerUserData uData,
+ InitialLdapContext ctxSource, InitialLdapContext ctxDestination)
{
String hostPortSource = getHostPort(ctxSource);
String hostPortDestination = getHostPort(ctxDestination);
@@ -3138,7 +3401,7 @@
* user.
* @param uData the initialize replication user data object to be initialized.
*/
- private void initializeWithArgParser(InitializeReplicationUserData uData)
+ private void initializeWithArgParser(SourceDestinationServerUserData uData)
{
initialize(uData);
@@ -3250,8 +3513,7 @@
}
catch (Throwable t)
{
- logger.warn(LocalizableMessage.raw(
- "Unexpected error retrieving the replication port: "+t, t));
+ logger.warn(LocalizableMessage.raw("Unexpected error retrieving the replication port: " + t, t));
}
return replicationPort;
}
@@ -3547,8 +3809,7 @@
* @return a Collection containing a list of suffixes that are replicated
* (or those that can be replicated) in two servers.
*/
- private Collection<String> getCommonSuffixes(
- InitialLdapContext ctx1, InitialLdapContext ctx2, SuffixRelationType type)
+ private List<String> getCommonSuffixes(InitialLdapContext ctx1, InitialLdapContext ctx2, SuffixRelationType type)
{
LinkedList<String> suffixes = new LinkedList<>();
try
@@ -4005,11 +4266,10 @@
* @return ReplicationCliReturnCode.SUCCESSFUL if the operation was
* successful and an error code otherwise.
*/
- private ReplicationCliReturnCode initializeReplication(
- InitializeReplicationUserData uData)
+ private ReplicationCliReturnCode initializeReplication(SourceDestinationServerUserData uData)
{
- InitialLdapContext ctxSource = createAdministrativeContext(uData, true);
- InitialLdapContext ctxDestination = createAdministrativeContext(uData, false);
+ InitialLdapContext ctxSource = createAdministrativeContext(uData, uData.getSource());
+ InitialLdapContext ctxDestination = createAdministrativeContext(uData, uData.getDestination());
try
{
if (ctxSource == null || ctxDestination == null)
@@ -4056,22 +4316,19 @@
}
}
- private InitialLdapContext createAdministrativeContext(InitializeReplicationUserData uData, boolean isSource)
+ private InitialLdapContext createAdministrativeContext(SourceDestinationServerUserData uData, HostPort server)
{
- final String host = isSource ? uData.getHostNameSource() : uData.getHostNameDestination();
- final int port = isSource ? uData.getPortSource() : uData.getPortDestination();
try
{
return createAdministrativeContext(
- host, port, useSSL, useStartTLS,
+ server.getHost(), server.getPort(), useSSL, useStartTLS,
getAdministratorDN(uData.getAdminUid()), uData.getAdminPwd(),
getConnectTimeout(), getTrustManager(sourceServerCI));
}
catch (NamingException ne)
{
- final String hostPort = getServerRepresentation(host, port);
errPrintln();
- errPrintln(getMessageForException(ne, hostPort));
+ errPrintln(getMessageForException(ne, server.toString()));
logger.error(LocalizableMessage.raw("Complete error stack:"), ne);
return null;
}
@@ -6890,8 +7147,7 @@
public void progressUpdate(ProgressUpdateEvent ev)
{
LocalizableMessage newLogDetails = ev.getNewLogs();
- if (newLogDetails != null
- && !"".equals(newLogDetails.toString().trim()))
+ if (newLogDetails != null && !"".equals(newLogDetails.toString().trim()))
{
print(newLogDetails);
println();
@@ -6904,8 +7160,7 @@
{
try
{
- installer.initializeSuffix(ctxDestination, replicationId, baseDN,
- displayProgress, getHostPort(ctxSource));
+ installer.initializeSuffix(ctxDestination, replicationId, baseDN, displayProgress, getHostPort(ctxSource));
initDone = true;
}
catch (PeerNotFoundException pnfe)
@@ -7019,81 +7274,37 @@
private void postPreExternalInitialization(String baseDN,
InitialLdapContext ctx, boolean isPre) throws ReplicationCliException
{
- boolean taskCreated = false;
- int i = 1;
boolean isOver = false;
String dn = null;
- BasicAttributes attrs = new BasicAttributes();
- Attribute oc = new BasicAttribute("objectclass");
- oc.add("top");
- oc.add("ds-task");
- oc.add("ds-task-reset-generation-id");
- attrs.put(oc);
- attrs.put("ds-task-class-name",
- "org.opends.server.tasks.SetGenerationIdTask");
+ Map<String, String> attrMap = new TreeMap<>();
if (isPre)
{
- attrs.put("ds-task-reset-generation-id-new-value", "-1");
+ attrMap.put("ds-task-reset-generation-id-new-value", "-1");
}
- attrs.put("ds-task-reset-generation-id-domain-base-dn", baseDN);
- while (!taskCreated)
+ attrMap.put("ds-task-reset-generation-id-domain-base-dn", baseDN);
+
+ try {
+ dn = createServerTask(ctx, "ds-task-reset-generation-id", "org.opends.server.tasks.SetGenerationIdTask",
+ "dsreplication-reset-generation-id", attrMap);
+ }
+ catch (NamingException ne)
{
- String id = "dsreplication-reset-generation-id-"+i;
- dn = "ds-task-id="+id+",cn=Scheduled Tasks,cn=Tasks";
- attrs.put("ds-task-id", id);
- try
- {
- DirContext dirCtx = ctx.createSubcontext(dn, attrs);
- taskCreated = true;
- logger.info(LocalizableMessage.raw("created task entry: "+attrs));
- dirCtx.close();
- }
- catch (NameAlreadyBoundException x)
- {
- }
- catch (NamingException ne)
- {
- logger.error(LocalizableMessage.raw("Error creating task "+attrs, ne));
- LocalizableMessage msg = isPre ?
- ERR_LAUNCHING_PRE_EXTERNAL_INITIALIZATION.get():
+ LocalizableMessage msg = isPre ?
+ ERR_LAUNCHING_PRE_EXTERNAL_INITIALIZATION.get():
ERR_LAUNCHING_POST_EXTERNAL_INITIALIZATION.get();
- ReplicationCliReturnCode code = isPre?
- ERROR_LAUNCHING_PRE_EXTERNAL_INITIALIZATION:
- ERROR_LAUNCHING_POST_EXTERNAL_INITIALIZATION;
- throw new ReplicationCliException(
- getThrowableMsg(msg, ne), code, ne);
- }
- i++;
+ ReplicationCliReturnCode code = isPre?
+ ERROR_LAUNCHING_PRE_EXTERNAL_INITIALIZATION:
+ ERROR_LAUNCHING_POST_EXTERNAL_INITIALIZATION;
+ throw new ReplicationCliException(getThrowableMsg(msg, ne), code, ne);
}
- // Wait until it is over
- SearchControls searchControls = new SearchControls();
- searchControls.setSearchScope(SearchControls.OBJECT_SCOPE);
- searchControls.setReturningAttributes(
- new String[] {
- "ds-task-log-message",
- "ds-task-state"
- });
- String filter = "objectclass=*";
+
String lastLogMsg = null;
while (!isOver)
{
sleepCatchInterrupt(500);
try
{
- NamingEnumeration<SearchResult> res =
- ctx.search(dn, filter, searchControls);
- SearchResult sr = null;
- try
- {
- while (res.hasMore())
- {
- sr = res.next();
- }
- }
- finally
- {
- res.close();
- }
+ SearchResult sr = getLastSearchResult(ctx, dn, "ds-task-log-message", "ds-task-state");
String logMsg = getFirstValue(sr, "ds-task-log-message");
if (logMsg != null && !logMsg.equals(lastLogMsg))
{
@@ -7106,7 +7317,7 @@
if (helper.isDone(state) || helper.isStoppedByError(state))
{
isOver = true;
- LocalizableMessage errorMsg = getPrePostErrorMsg(isPre, lastLogMsg, state, ctx);
+ LocalizableMessage errorMsg = getPrePostErrorMsg(lastLogMsg, state, ctx);
if (helper.isCompletedWithErrors(state))
{
@@ -7130,27 +7341,20 @@
}
catch (NamingException ne)
{
- LocalizableMessage msg = isPre ?
- ERR_POOLING_PRE_EXTERNAL_INITIALIZATION.get():
- ERR_POOLING_POST_EXTERNAL_INITIALIZATION.get();
- throw new ReplicationCliException(
- getThrowableMsg(msg, ne), ERROR_CONNECTING, ne);
+ throw new ReplicationCliException(getThrowableMsg(ERR_READING_SERVER_TASK_PROGRESS.get(), ne),
+ ERROR_CONNECTING, ne);
}
}
}
- private LocalizableMessage getPrePostErrorMsg(boolean isPre, String lastLogMsg, String state, InitialLdapContext ctx)
+ private LocalizableMessage getPrePostErrorMsg(String lastLogMsg, String state, InitialLdapContext ctx)
{
String server = getHostPort(ctx);
if (lastLogMsg != null)
{
- return isPre
- ? INFO_ERROR_DURING_PRE_EXTERNAL_INITIALIZATION_LOG.get(lastLogMsg, state, server)
- : INFO_ERROR_DURING_POST_EXTERNAL_INITIALIZATION_LOG.get(lastLogMsg, state, server);
+ return ERR_UNEXPECTED_DURING_TASK_WITH_LOG.get(lastLogMsg, state, server);
}
- return isPre
- ? INFO_ERROR_DURING_PRE_EXTERNAL_INITIALIZATION_NO_LOG.get(state, server)
- : INFO_ERROR_DURING_POST_EXTERNAL_INITIALIZATION_NO_LOG.get(state, server);
+ return ERR_UNEXPECTED_DURING_TASK_NO_LOG.get(state, server);
}
private void sleepCatchInterrupt(long millis)
@@ -7179,58 +7383,23 @@
boolean displayProgress)
throws ClientException, PeerNotFoundException
{
- boolean taskCreated = false;
- int i = 1;
boolean isOver = false;
String dn = null;
String serverDisplay = getHostPort(ctx);
- BasicAttributes attrs = new BasicAttributes();
- Attribute oc = new BasicAttribute("objectclass");
- oc.add("top");
- oc.add("ds-task");
- oc.add("ds-task-initialize-remote-replica");
- attrs.put(oc);
- attrs.put("ds-task-class-name",
- "org.opends.server.tasks.InitializeTargetTask");
- attrs.put("ds-task-initialize-domain-dn", baseDN);
- attrs.put("ds-task-initialize-replica-server-id", "all");
- while (!taskCreated)
+ Map<String, String> attrsMap = new TreeMap<>();
+ attrsMap.put("ds-task-initialize-domain-dn", baseDN);
+ attrsMap.put("ds-task-initialize-replica-server-id", "all");
+ try
{
- String id = "dsreplication-initialize"+i;
- dn = "ds-task-id="+id+",cn=Scheduled Tasks,cn=Tasks";
- attrs.put("ds-task-id", id);
- try
- {
- DirContext dirCtx = ctx.createSubcontext(dn, attrs);
- taskCreated = true;
- logger.info(LocalizableMessage.raw("created task entry: "+attrs));
- dirCtx.close();
- }
- catch (NameAlreadyBoundException x)
- {
- logger.warn(LocalizableMessage.raw("A task with dn: "+dn+" already existed."));
- }
- catch (NamingException ne)
- {
- logger.error(LocalizableMessage.raw("Error creating task "+attrs, ne));
- throw new ClientException(
- ReturnCode.APPLICATION_ERROR,
- getThrowableMsg(INFO_ERROR_LAUNCHING_INITIALIZATION.get(
- serverDisplay), ne), ne);
- }
- i++;
+ dn = createServerTask(ctx, "ds-task-initialize-remote-replica", "org.opends.server.tasks.InitializeTargetTask",
+ "dsreplication-initialize", attrsMap);
}
- // Wait until it is over
- SearchControls searchControls = new SearchControls();
- searchControls.setSearchScope(SearchControls.OBJECT_SCOPE);
- searchControls.setReturningAttributes(
- new String[] {
- "ds-task-unprocessed-entry-count",
- "ds-task-processed-entry-count",
- "ds-task-log-message",
- "ds-task-state"
- });
- String filter = "objectclass=*";
+ catch (NamingException ne)
+ {
+ throw new ClientException(ReturnCode.APPLICATION_ERROR,
+ getThrowableMsg(INFO_ERROR_LAUNCHING_INITIALIZATION.get(serverDisplay), ne), ne);
+ }
+
LocalizableMessage lastDisplayedMsg = null;
String lastLogMsg = null;
long lastTimeMsgDisplayed = -1;
@@ -7241,23 +7410,10 @@
sleepCatchInterrupt(500);
try
{
- NamingEnumeration<SearchResult> res =
- ctx.search(dn, filter, searchControls);
- SearchResult sr = null;
- try
- {
- while (res.hasMore())
- {
- sr = res.next();
- }
- }
- finally
- {
- res.close();
- }
+ SearchResult sr = getLastSearchResult(ctx, dn, "ds-task-unprocessed-entry-count",
+ "ds-task-processed-entry-count", "ds-task-log-message", "ds-task-state" );
- // Get the number of entries that have been handled and
- // a percentage...
+ // Get the number of entries that have been handled and a percentage...
String sProcessed = getFirstValue(sr, "ds-task-processed-entry-count");
String sUnprocessed = getFirstValue(sr, "ds-task-unprocessed-entry-count");
long processed = -1;
@@ -7376,6 +7532,65 @@
}
}
+ private SearchResult getLastSearchResult(InitialLdapContext ctx, String dn, String... returnedAttributes)
+ throws NamingException
+ {
+ SearchControls searchControls = new SearchControls();
+ searchControls.setSearchScope(SearchControls.OBJECT_SCOPE);
+ searchControls.setReturningAttributes(returnedAttributes);
+ NamingEnumeration<SearchResult> res = ctx.search(dn, "objectclass=*", searchControls);
+ try
+ {
+ SearchResult sr = null;
+ while (res.hasMore())
+ {
+ sr = res.next();
+ }
+ return sr;
+ }
+ finally
+ {
+ res.close();
+ }
+ }
+
+ private String createServerTask(InitialLdapContext ctx, String taskObjectclass,
+ String taskJavaClass, String taskID, Map<String, String> taskAttrs) throws NamingException
+ {
+ int i = 1;
+ String dn = "";
+ BasicAttributes attrs = new BasicAttributes();
+ attrs.put("objectclass", taskObjectclass);
+ attrs.put("ds-task-class-name", taskJavaClass);
+ for (Map.Entry<String, String> attr : taskAttrs.entrySet())
+ {
+ attrs.put(attr.getKey(), attr.getValue());
+ }
+
+ while (true)
+ {
+ String id = taskID + "-" + i;
+ dn = "ds-task-id=" + id + ",cn=Scheduled Tasks,cn=Tasks";
+ try
+ {
+ DirContext dirCtx = ctx.createSubcontext(dn, attrs);
+ logger.info(LocalizableMessage.raw("created task entry: " + attrs));
+ dirCtx.close();
+ return dn;
+ }
+ catch (NameAlreadyBoundException x)
+ {
+ logger.warn(LocalizableMessage.raw("A task with dn: " + dn + " already existed."));
+ }
+ catch (NamingException ne)
+ {
+ logger.error(LocalizableMessage.raw("Error creating task " + attrs, ne));
+ throw ne;
+ }
+ i++;
+ }
+ }
+
private LocalizableMessage getInitializeAllErrorMsg(String serverDisplay, String lastLogMsg, String state)
{
if (lastLogMsg != null)
@@ -7497,8 +7712,7 @@
if (areDnsEqual(domain.getBaseDN().toString(), baseDN))
{
print(formatter.getFormattedWithPoints(
- INFO_REPLICATION_REMOVING_REFERENCES_ON_REMOTE.get(baseDN,
- hostPort)));
+ INFO_REPLICATION_REMOVING_REFERENCES_ON_REMOTE.get(baseDN, hostPort)));
Set<String> replServers = domain.getReplicationServer();
if (replServers != null)
{
@@ -8199,19 +8413,18 @@
{
String commandName = getCommandName();
- CommandBuilder commandBuilder =
- new CommandBuilder(commandName, subcommandName);
+ CommandBuilder commandBuilder = new CommandBuilder(commandName, subcommandName);
if (ENABLE_REPLICATION_SUBCMD_NAME.equals(subcommandName))
{
// All the arguments for enable replication are update here.
updateCommandBuilder(commandBuilder, (EnableReplicationUserData)uData);
}
- else if (INITIALIZE_REPLICATION_SUBCMD_NAME.equals(subcommandName))
+ else if (INITIALIZE_REPLICATION_SUBCMD_NAME.equals(subcommandName) ||
+ RESET_CHANGE_NUMBER_SUBCMD_NAME.equals(subcommandName))
{
// All the arguments for initialize replication are update here.
- updateCommandBuilder(commandBuilder,
- (InitializeReplicationUserData)uData);
+ updateCommandBuilder(commandBuilder, (SourceDestinationServerUserData)uData);
}
else if (PURGE_HISTORICAL_SUBCMD_NAME.equals(subcommandName))
{
@@ -8315,8 +8528,7 @@
commandBuilder, taskSchedule);
}
- private void addGlobalArguments(CommandBuilder commandBuilder,
- ReplicationUserData uData)
+ private void addGlobalArguments(CommandBuilder commandBuilder, ReplicationUserData uData)
throws ArgumentException
{
List<String> baseDNs = uData.getBaseDNs();
@@ -8331,6 +8543,11 @@
}
commandBuilder.addArgument(baseDNsArg);
+ if (argParser.resetChangeNumber.isPresent())
+ {
+ commandBuilder.addArgument(argParser.resetChangeNumber);
+ }
+
// Try to find some arguments and put them at the end.
String[] identifiersToMove ={
OPTION_LONG_ADMIN_UID,
@@ -8548,8 +8765,8 @@
{
if (OPTION_LONG_HOST.equals(arg.getLongIdentifier()))
{
- commandBuilder.addArgument(getHostArg("host2", 'O', server2.getHostName(),
- INFO_DESCRIPTION_ENABLE_REPLICATION_HOST2));
+ commandBuilder.addArgument(
+ getHostArg("host2", 'O', server2.getHostName(), INFO_DESCRIPTION_ENABLE_REPLICATION_HOST2));
}
else if (OPTION_LONG_PORT.equals(arg.getLongIdentifier()))
{
@@ -8714,7 +8931,7 @@
"usesecondserverasschemasource", null,
"useSecondServerAsSchemaSource",
INFO_DESCRIPTION_ENABLE_REPLICATION_USE_SECOND_AS_SCHEMA_SOURCE.get(
- "--"+argParser.noSchemaReplicationArg.getLongIdentifier())));
+ "--" + argParser.noSchemaReplicationArg.getLongIdentifier())));
}
}
@@ -8794,7 +9011,7 @@
}
private void updateCommandBuilder(CommandBuilder commandBuilder,
- InitializeReplicationUserData uData)
+ SourceDestinationServerUserData uData)
throws ArgumentException
{
// Update the arguments used in the console interaction with the
@@ -9182,8 +9399,8 @@
String hostPortDestination = getHostPort(ctxDestination);
if (isInteractive())
{
- LocalizableMessage msg = INFO_REPLICATION_MERGING_REGISTRIES_CONFIRMATION.get(
- hostPortSource, hostPortDestination, hostPortSource, hostPortDestination);
+ LocalizableMessage msg = INFO_REPLICATION_MERGING_REGISTRIES_CONFIRMATION.get(hostPortSource,
+ hostPortDestination, hostPortSource, hostPortDestination);
if (!askConfirmation(msg, true))
{
throw new ReplicationCliException(ERR_REPLICATION_USER_CANCELLED.get(), USER_CANCELLED, null);
@@ -9191,8 +9408,8 @@
}
else
{
- LocalizableMessage msg = INFO_REPLICATION_MERGING_REGISTRIES_DESCRIPTION.get(
- hostPortSource, hostPortDestination, hostPortSource, hostPortDestination);
+ LocalizableMessage msg = INFO_REPLICATION_MERGING_REGISTRIES_DESCRIPTION.get(hostPortSource,
+ hostPortDestination, hostPortSource, hostPortDestination);
println(msg);
println();
}
@@ -9234,8 +9451,7 @@
if (!commonRepServerIDErrors.isEmpty())
{
mb.append(ERR_REPLICATION_ENABLE_COMMON_REPLICATION_SERVER_ID.get(
- getMessageFromCollection(commonRepServerIDErrors,
- Constants.LINE_SEPARATOR)));
+ getMessageFromCollection(commonRepServerIDErrors, Constants.LINE_SEPARATOR)));
}
if (!commonDomainIDErrors.isEmpty())
{
@@ -9244,8 +9460,7 @@
mb.append(Constants.LINE_SEPARATOR);
}
mb.append(ERR_REPLICATION_ENABLE_COMMON_DOMAIN_ID.get(
- getMessageFromCollection(commonDomainIDErrors,
- Constants.LINE_SEPARATOR)));
+ getMessageFromCollection(commonDomainIDErrors, Constants.LINE_SEPARATOR)));
}
throw new ReplicationCliException(mb.toMessage(),
REPLICATION_ADS_MERGE_NOT_SUPPORTED, null);
@@ -9317,8 +9532,8 @@
catch (Throwable t)
{
logger.error(LocalizableMessage.raw("Error seeding truststore: "+t, t));
- LocalizableMessage msg = ERR_REPLICATION_ENABLE_SEEDING_TRUSTSTORE.get(
- getHostPort(adsCtx2.getDirContext()), getHostPort(adsCtx1.getDirContext()), toString(t));
+ LocalizableMessage msg = ERR_REPLICATION_ENABLE_SEEDING_TRUSTSTORE.get(getHostPort(adsCtx2.getDirContext()),
+ getHostPort(adsCtx1.getDirContext()), toString(t));
throw new ReplicationCliException(msg, ERROR_SEEDING_TRUSTORE, t);
}
pointAdder.stop();
@@ -9375,8 +9590,8 @@
if (server2.isReplicationServer() && server2.getReplicationServerId() == replicationID1
&& !server2.getReplicationServerHostPort().equalsIgnoreCase(replServerHostPort1))
{
- commonRepServerIDErrors.add(ERR_REPLICATION_ENABLE_COMMON_REPLICATION_SERVER_ID_ARG.get(serverToFind
- .getHostPort(true), server2.getHostPort(true), replicationID1));
+ commonRepServerIDErrors.add(ERR_REPLICATION_ENABLE_COMMON_REPLICATION_SERVER_ID_ARG.get(
+ serverToFind.getHostPort(true), server2.getHostPort(true), replicationID1));
return true;
}
}
@@ -9399,12 +9614,8 @@
&& domain1Id == replica2.getReplicationId())
{
commonDomainIDErrors.add(
- ERR_REPLICATION_ENABLE_COMMON_DOMAIN_ID_ARG.get(
- replica1.getServer().getHostPort(true),
- suffix1DN,
- replica2.getServer().getHostPort(true),
- suffix2.getDN(),
- domain1Id));
+ ERR_REPLICATION_ENABLE_COMMON_DOMAIN_ID_ARG.get(replica1.getServer().getHostPort(true), suffix1DN,
+ replica2.getServer().getHostPort(true), suffix2.getDN(), domain1Id));
return true;
}
}
@@ -9507,11 +9718,8 @@
private boolean displayLogFileAtEnd(String subCommand)
{
- final List<String> subCommands = Arrays.asList(
- ENABLE_REPLICATION_SUBCMD_NAME,
- DISABLE_REPLICATION_SUBCMD_NAME,
- INITIALIZE_ALL_REPLICATION_SUBCMD_NAME,
- INITIALIZE_REPLICATION_SUBCMD_NAME);
+ final List<String> subCommands = Arrays.asList(ENABLE_REPLICATION_SUBCMD_NAME, DISABLE_REPLICATION_SUBCMD_NAME,
+ INITIALIZE_ALL_REPLICATION_SUBCMD_NAME, INITIALIZE_REPLICATION_SUBCMD_NAME, RESET_CHANGE_NUMBER_SUBCMD_NAME);
return subCommands.contains(subCommand);
}
--
Gitblit v1.10.0