mirror of https://github.com/OpenIdentityPlatform/OpenDJ.git

ludovicp
15.43.2010 3a5116b0fb968711531c50d243d94d2cdfd664be
Enhance replication conflict attributes comparator, enable/disable replication domain, and some error message for diagnostic
4 files modified
96 ■■■■■ changed files
opends/src/messages/messages/replication.properties 2 ●●● patch | view | raw | blame | history
opends/src/server/org/opends/server/replication/plugin/LDAPReplicationDomain.java 17 ●●●● patch | view | raw | blame | history
opends/src/server/org/opends/server/replication/service/ReplicationBroker.java 3 ●●●● patch | view | raw | blame | history
opends/src/server/org/opends/server/replication/service/ReplicationDomain.java 74 ●●●●● patch | view | raw | blame | history
opends/src/messages/messages/replication.properties
@@ -87,7 +87,7 @@
 replication server that has seen all the local changes on suffix %s. Found %d \
replications server(s) not up to date. Going to replay changes
NOTICE_COULD_NOT_FIND_CHANGELOG_23=Could not connect to any replication \
 server on suffix %s, retrying...
 server on suffix %s among the following RS candidates %s, retrying...
NOTICE_EXCEPTION_CLOSING_DATABASE_24=Error closing Replication Server database \
 %s :
SEVERE_ERR_EXCEPTION_DECODING_OPERATION_25=Error trying to replay %s, \
opends/src/server/org/opends/server/replication/plugin/LDAPReplicationDomain.java
@@ -2538,7 +2538,7 @@
       AttributeType attrType =
         DirectoryServer.getAttributeType(DS_SYNC_CONFLICT, true);
       Attribute attr = Attributes.create(attrType, AttributeValues.create(
           attrType, targetDN.toString()));
           attrType, targetDN.toNormalizedString()));
       Modification mod = new Modification(ModificationType.REPLACE, attr);
       newOp.addModification(mod);
     }
@@ -3413,7 +3413,7 @@
    AttributeType attrType = DirectoryServer.getAttributeType(DS_SYNC_CONFLICT,
        true);
    Attribute attr = Attributes.create(attrType, AttributeValues.create(
        attrType, conflictDN.toString()));
        attrType, conflictDN.toNormalizedString()));
    List<Modification> mods = new ArrayList<Modification>();
    Modification mod = new Modification(ModificationType.REPLACE, attr);
    mods.add(mod);
@@ -3458,14 +3458,23 @@
   */
  private void addConflict(AddMsg msg) throws ASN1Exception
  {
    String normalizedDN;
    try
    {
      normalizedDN = DN.decode(msg.getDn()).toNormalizedString();
    } catch (DirectoryException e)
    {
      normalizedDN = msg.getDn();
    }
    // Generate an alert to let the administrator know that some
    // conflict could not be solved.
    Message alertMessage = NOTE_UNRESOLVED_CONFLICT.get(msg.getDn());
    Message alertMessage = NOTE_UNRESOLVED_CONFLICT.get(normalizedDN);
    DirectoryServer.sendAlertNotification(this,
        ALERT_TYPE_REPLICATION_UNRESOLVED_CONFLICT, alertMessage);
    // Add the conflict attribute
    msg.addAttribute(DS_SYNC_CONFLICT, msg.getDn());
    msg.addAttribute(DS_SYNC_CONFLICT, normalizedDN);
  }
  /**
opends/src/server/org/opends/server/replication/service/ReplicationBroker.java
@@ -865,6 +865,7 @@
      // Get info from every available replication servers
      replicationServerInfos = collectReplicationServersInfo();
      String rsis = replicationServerInfos.toString();
      ReplicationServerInfo replicationServerInfo = null;
@@ -1057,7 +1058,7 @@
          connectionError = true;
          connectPhaseLock.notify();
          Message message =
            NOTE_COULD_NOT_FIND_CHANGELOG.get(baseDn.toString());
            NOTE_COULD_NOT_FIND_CHANGELOG.get(baseDn.toString(), rsis);
          logError(message);
        }
      }
opends/src/server/org/opends/server/replication/service/ReplicationDomain.java
@@ -32,40 +32,19 @@
import static org.opends.server.loggers.debug.DebugLogger.getTracer;
import static org.opends.server.replication.common.StatusMachine.*;
import org.opends.server.replication.common.ChangeNumberGenerator;
import java.io.BufferedOutputStream;
import org.opends.server.tasks.InitializeTargetTask;
import org.opends.server.tasks.InitializeTask;
import org.opends.server.types.Attribute;
import org.opends.server.core.DirectoryServer;
import java.util.Set;
import org.opends.server.replication.common.DSInfo;
import org.opends.server.replication.common.RSInfo;
import java.util.HashMap;
import java.util.Map;
import org.opends.server.config.ConfigException;
import java.util.Collection;
import org.opends.server.replication.protocol.ReplSessionSecurity;
import org.opends.server.replication.protocol.ResetGenerationIdMsg;
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import java.net.SocketTimeoutException;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Date;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.SortedMap;
import java.util.TreeMap;
import java.util.concurrent.ConcurrentHashMap;
@@ -77,9 +56,14 @@
import org.opends.messages.Severity;
import org.opends.server.api.DirectoryThread;
import org.opends.server.backends.task.Task;
import org.opends.server.config.ConfigException;
import org.opends.server.core.DirectoryServer;
import org.opends.server.loggers.debug.DebugTracer;
import org.opends.server.replication.common.AssuredMode;
import org.opends.server.replication.common.ChangeNumber;
import org.opends.server.replication.common.ChangeNumberGenerator;
import org.opends.server.replication.common.DSInfo;
import org.opends.server.replication.common.RSInfo;
import org.opends.server.replication.common.ServerState;
import org.opends.server.replication.common.ServerStatus;
import org.opends.server.replication.common.StatusMachine;
@@ -90,15 +74,20 @@
import org.opends.server.replication.protocol.EntryMsg;
import org.opends.server.replication.protocol.ErrorMsg;
import org.opends.server.replication.protocol.HeartbeatMsg;
import org.opends.server.replication.protocol.InitializeRcvAckMsg;
import org.opends.server.replication.protocol.InitializeRequestMsg;
import org.opends.server.replication.protocol.InitializeTargetMsg;
import org.opends.server.replication.protocol.InitializeRcvAckMsg;
import org.opends.server.replication.protocol.ProtocolSession;
import org.opends.server.replication.protocol.ProtocolVersion;
import org.opends.server.replication.protocol.ReplSessionSecurity;
import org.opends.server.replication.protocol.ReplicationMsg;
import org.opends.server.replication.protocol.ResetGenerationIdMsg;
import org.opends.server.replication.protocol.RoutableMsg;
import org.opends.server.replication.protocol.UpdateMsg;
import org.opends.server.replication.protocol.TopologyMsg;
import org.opends.server.replication.protocol.UpdateMsg;
import org.opends.server.tasks.InitializeTargetTask;
import org.opends.server.tasks.InitializeTask;
import org.opends.server.types.Attribute;
import org.opends.server.types.DirectoryException;
import org.opends.server.types.ResultCode;
@@ -229,7 +218,7 @@
  private byte groupId = (byte)1;
  // Referrals urls to be published to other servers of the topology
  // TODO: fill that with all currently opened urls if no urls configured
  private List<String> refUrls = new ArrayList<String>();
  private final List<String> refUrls = new ArrayList<String>();
  /**
   * A set of counters used for Monitoring.
@@ -320,6 +309,12 @@
  Set<String> crossServersECLIncludes = new HashSet<String>();
  /**
   * An object used to protect the initialization of the underlying broker
   * session of this ReplicationDomain.
   */
  private final Object sessionLock = new Object();
  /**
   * Returns the {@link ChangeNumberGenerator} that will be used to
   * generate {@link ChangeNumber} for this domain.
   *
@@ -1065,8 +1060,8 @@
  private class ExportThread extends DirectoryThread
  {
    // Id of server that will be initialized
    private int serverToInitialize;
    private int initWindow;
    private final int serverToInitialize;
    private final int initWindow;
    /**
     * Constructor for the ExportThread.
@@ -1153,7 +1148,8 @@
    // Flow control during initialization
    // - for each remote server, counter of messages received
    private HashMap<Integer, Integer> ackVals = new HashMap<Integer, Integer>();
    private final HashMap<Integer, Integer> ackVals =
      new HashMap<Integer, Integer>();
    // - serverId of the slowest server (the one with the smallest non null
    //   counter)
    private int slowestServerId = -1;
@@ -3016,6 +3012,8 @@
      long heartbeatInterval, long changetimeHeartbeatInterval)
  throws ConfigException
  {
    synchronized (sessionLock)
    {
    if (broker == null)
    {
      /*
@@ -3037,6 +3035,7 @@
       * monitoring information below cn=monitor.
       */
      monitor = new ReplicationMonitor(this);
      }
      DirectoryServer.registerMonitorProvider(monitor);
    }
@@ -3060,6 +3059,8 @@
      long heartbeatInterval)
  throws ConfigException
  {
    synchronized (sessionLock)
    {
    if (broker == null)
    {
      /*
@@ -3085,6 +3086,7 @@
      DirectoryServer.registerMonitorProvider(monitor);
    }
  }
  }
  /**
   * Starts the receiver side of the Replication Service.
@@ -3098,11 +3100,14 @@
   */
  public void startListenService()
  {
    synchronized (sessionLock)
    {
    //
    // Create the listener thread
    listenerThread = new ListenerThread(this);
    listenerThread.start();
  }
  }
  /**
   * Temporarily disable the Replication Service.
@@ -3116,6 +3121,8 @@
   */
  public void disableService()
  {
    synchronized (sessionLock)
    {
    // Stop the listener thread
    if (listenerThread != null)
    {
@@ -3130,7 +3137,7 @@
    // Wait for the listener thread to stop
    if (listenerThread != null)
      listenerThread.waitForShutdown();
    }
  }
  /**
@@ -3147,12 +3154,15 @@
   */
  public void enableService()
  {
    synchronized (sessionLock)
    {
    broker.start();
    // Create the listener thread
    listenerThread = new ListenerThread(this);
    listenerThread.start();
  }
  }
  /**
   * Definitively stops the Replication Service.