mirror of https://github.com/OpenIdentityPlatform/OpenDJ.git

ludovicp
15.43.2010 3a5116b0fb968711531c50d243d94d2cdfd664be
Enhance replication conflict attributes comparator, enable/disable replication domain, and some error message for diagnostic
4 files modified
210 ■■■■ changed files
opends/src/messages/messages/replication.properties 2 ●●● patch | view | raw | blame | history
opends/src/server/org/opends/server/replication/plugin/LDAPReplicationDomain.java 17 ●●●● patch | view | raw | blame | history
opends/src/server/org/opends/server/replication/service/ReplicationBroker.java 3 ●●●● patch | view | raw | blame | history
opends/src/server/org/opends/server/replication/service/ReplicationDomain.java 188 ●●●● patch | view | raw | blame | history
opends/src/messages/messages/replication.properties
@@ -87,7 +87,7 @@
 replication server that has seen all the local changes on suffix %s. Found %d \
replications server(s) not up to date. Going to replay changes
NOTICE_COULD_NOT_FIND_CHANGELOG_23=Could not connect to any replication \
 server on suffix %s, retrying...
 server on suffix %s among the following RS candidates %s, retrying...
NOTICE_EXCEPTION_CLOSING_DATABASE_24=Error closing Replication Server database \
 %s :
SEVERE_ERR_EXCEPTION_DECODING_OPERATION_25=Error trying to replay %s, \
opends/src/server/org/opends/server/replication/plugin/LDAPReplicationDomain.java
@@ -2538,7 +2538,7 @@
       AttributeType attrType =
         DirectoryServer.getAttributeType(DS_SYNC_CONFLICT, true);
       Attribute attr = Attributes.create(attrType, AttributeValues.create(
           attrType, targetDN.toString()));
           attrType, targetDN.toNormalizedString()));
       Modification mod = new Modification(ModificationType.REPLACE, attr);
       newOp.addModification(mod);
     }
@@ -3413,7 +3413,7 @@
    AttributeType attrType = DirectoryServer.getAttributeType(DS_SYNC_CONFLICT,
        true);
    Attribute attr = Attributes.create(attrType, AttributeValues.create(
        attrType, conflictDN.toString()));
        attrType, conflictDN.toNormalizedString()));
    List<Modification> mods = new ArrayList<Modification>();
    Modification mod = new Modification(ModificationType.REPLACE, attr);
    mods.add(mod);
@@ -3458,14 +3458,23 @@
   */
  private void addConflict(AddMsg msg) throws ASN1Exception
  {
    String normalizedDN;
    try
    {
      normalizedDN = DN.decode(msg.getDn()).toNormalizedString();
    } catch (DirectoryException e)
    {
      normalizedDN = msg.getDn();
    }
    // Generate an alert to let the administrator know that some
    // conflict could not be solved.
    Message alertMessage = NOTE_UNRESOLVED_CONFLICT.get(msg.getDn());
    Message alertMessage = NOTE_UNRESOLVED_CONFLICT.get(normalizedDN);
    DirectoryServer.sendAlertNotification(this,
        ALERT_TYPE_REPLICATION_UNRESOLVED_CONFLICT, alertMessage);
    // Add the conflict attribute
    msg.addAttribute(DS_SYNC_CONFLICT, msg.getDn());
    msg.addAttribute(DS_SYNC_CONFLICT, normalizedDN);
  }
  /**
opends/src/server/org/opends/server/replication/service/ReplicationBroker.java
@@ -865,6 +865,7 @@
      // Get info from every available replication servers
      replicationServerInfos = collectReplicationServersInfo();
      String rsis = replicationServerInfos.toString();
      ReplicationServerInfo replicationServerInfo = null;
@@ -1057,7 +1058,7 @@
          connectionError = true;
          connectPhaseLock.notify();
          Message message =
            NOTE_COULD_NOT_FIND_CHANGELOG.get(baseDn.toString());
            NOTE_COULD_NOT_FIND_CHANGELOG.get(baseDn.toString(), rsis);
          logError(message);
        }
      }
opends/src/server/org/opends/server/replication/service/ReplicationDomain.java
@@ -32,40 +32,19 @@
import static org.opends.server.loggers.debug.DebugLogger.getTracer;
import static org.opends.server.replication.common.StatusMachine.*;
import org.opends.server.replication.common.ChangeNumberGenerator;
import java.io.BufferedOutputStream;
import org.opends.server.tasks.InitializeTargetTask;
import org.opends.server.tasks.InitializeTask;
import org.opends.server.types.Attribute;
import org.opends.server.core.DirectoryServer;
import java.util.Set;
import org.opends.server.replication.common.DSInfo;
import org.opends.server.replication.common.RSInfo;
import java.util.HashMap;
import java.util.Map;
import org.opends.server.config.ConfigException;
import java.util.Collection;
import org.opends.server.replication.protocol.ReplSessionSecurity;
import org.opends.server.replication.protocol.ResetGenerationIdMsg;
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import java.net.SocketTimeoutException;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Date;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.SortedMap;
import java.util.TreeMap;
import java.util.concurrent.ConcurrentHashMap;
@@ -77,9 +56,14 @@
import org.opends.messages.Severity;
import org.opends.server.api.DirectoryThread;
import org.opends.server.backends.task.Task;
import org.opends.server.config.ConfigException;
import org.opends.server.core.DirectoryServer;
import org.opends.server.loggers.debug.DebugTracer;
import org.opends.server.replication.common.AssuredMode;
import org.opends.server.replication.common.ChangeNumber;
import org.opends.server.replication.common.ChangeNumberGenerator;
import org.opends.server.replication.common.DSInfo;
import org.opends.server.replication.common.RSInfo;
import org.opends.server.replication.common.ServerState;
import org.opends.server.replication.common.ServerStatus;
import org.opends.server.replication.common.StatusMachine;
@@ -90,15 +74,20 @@
import org.opends.server.replication.protocol.EntryMsg;
import org.opends.server.replication.protocol.ErrorMsg;
import org.opends.server.replication.protocol.HeartbeatMsg;
import org.opends.server.replication.protocol.InitializeRcvAckMsg;
import org.opends.server.replication.protocol.InitializeRequestMsg;
import org.opends.server.replication.protocol.InitializeTargetMsg;
import org.opends.server.replication.protocol.InitializeRcvAckMsg;
import org.opends.server.replication.protocol.ProtocolSession;
import org.opends.server.replication.protocol.ProtocolVersion;
import org.opends.server.replication.protocol.ReplSessionSecurity;
import org.opends.server.replication.protocol.ReplicationMsg;
import org.opends.server.replication.protocol.ResetGenerationIdMsg;
import org.opends.server.replication.protocol.RoutableMsg;
import org.opends.server.replication.protocol.UpdateMsg;
import org.opends.server.replication.protocol.TopologyMsg;
import org.opends.server.replication.protocol.UpdateMsg;
import org.opends.server.tasks.InitializeTargetTask;
import org.opends.server.tasks.InitializeTask;
import org.opends.server.types.Attribute;
import org.opends.server.types.DirectoryException;
import org.opends.server.types.ResultCode;
@@ -229,7 +218,7 @@
  private byte groupId = (byte)1;
  // Referrals urls to be published to other servers of the topology
  // TODO: fill that with all currently opened urls if no urls configured
  private List<String> refUrls = new ArrayList<String>();
  private final List<String> refUrls = new ArrayList<String>();
  /**
   * A set of counters used for Monitoring.
@@ -320,6 +309,12 @@
  Set<String> crossServersECLIncludes = new HashSet<String>();
  /**
   * An object used to protect the initialization of the underlying broker
   * session of this ReplicationDomain.
   */
  private final Object sessionLock = new Object();
  /**
   * Returns the {@link ChangeNumberGenerator} that will be used to
   * generate {@link ChangeNumber} for this domain.
   *
@@ -1065,8 +1060,8 @@
  private class ExportThread extends DirectoryThread
  {
    // Id of server that will be initialized
    private int serverToInitialize;
    private int initWindow;
    private final int serverToInitialize;
    private final int initWindow;
    /**
     * Constructor for the ExportThread.
@@ -1153,7 +1148,8 @@
    // Flow control during initialization
    // - for each remote server, counter of messages received
    private HashMap<Integer, Integer> ackVals = new HashMap<Integer, Integer>();
    private final HashMap<Integer, Integer> ackVals =
      new HashMap<Integer, Integer>();
    // - serverId of the slowest server (the one with the smallest non null
    //   counter)
    private int slowestServerId = -1;
@@ -3016,27 +3012,30 @@
      long heartbeatInterval, long changetimeHeartbeatInterval)
  throws ConfigException
  {
    if (broker == null)
    synchronized (sessionLock)
    {
      /*
       * create the broker object used to publish and receive changes
       */
      broker = new ReplicationBroker(
          this, state, serviceID,
          serverID, window,
          getGenerationID(),
          heartbeatInterval,
          new ReplSessionSecurity(),
          getGroupId(),
          changetimeHeartbeatInterval);
      if (broker == null)
      {
        /*
         * create the broker object used to publish and receive changes
         */
        broker = new ReplicationBroker(
            this, state, serviceID,
            serverID, window,
            getGenerationID(),
            heartbeatInterval,
            new ReplSessionSecurity(),
            getGroupId(),
            changetimeHeartbeatInterval);
      broker.start(replicationServers);
        broker.start(replicationServers);
      /*
       * Create a replication monitor object responsible for publishing
       * monitoring information below cn=monitor.
       */
      monitor = new ReplicationMonitor(this);
        /*
         * Create a replication monitor object responsible for publishing
         * monitoring information below cn=monitor.
         */
        monitor = new ReplicationMonitor(this);
      }
      DirectoryServer.registerMonitorProvider(monitor);
    }
@@ -3060,29 +3059,32 @@
      long heartbeatInterval)
  throws ConfigException
  {
    if (broker == null)
    synchronized (sessionLock)
    {
      /*
       * create the broker object used to publish and receive changes
       */
      broker = new ReplicationBroker(
          this, state, serviceID,
          serverID, window,
          getGenerationID(),
          heartbeatInterval,
          new ReplSessionSecurity(),
          getGroupId(),
          0); // change time heartbeat is disabled
      if (broker == null)
      {
        /*
         * create the broker object used to publish and receive changes
         */
        broker = new ReplicationBroker(
            this, state, serviceID,
            serverID, window,
            getGenerationID(),
            heartbeatInterval,
            new ReplSessionSecurity(),
            getGroupId(),
            0); // change time heartbeat is disabled
      broker.start(replicationServers);
        broker.start(replicationServers);
      /*
       * Create a replication monitor object responsible for publishing
       * monitoring information below cn=monitor.
       */
      monitor = new ReplicationMonitor(this);
        /*
         * Create a replication monitor object responsible for publishing
         * monitoring information below cn=monitor.
         */
        monitor = new ReplicationMonitor(this);
      DirectoryServer.registerMonitorProvider(monitor);
        DirectoryServer.registerMonitorProvider(monitor);
      }
    }
  }
@@ -3098,10 +3100,13 @@
   */
  public void startListenService()
  {
    //
    // Create the listener thread
    listenerThread = new ListenerThread(this);
    listenerThread.start();
    synchronized (sessionLock)
    {
      //
      // Create the listener thread
      listenerThread = new ListenerThread(this);
      listenerThread.start();
    }
  }
  /**
@@ -3116,21 +3121,23 @@
   */
  public void disableService()
  {
    // Stop the listener thread
    if (listenerThread != null)
    synchronized (sessionLock)
    {
      listenerThread.shutdown();
      // Stop the listener thread
      if (listenerThread != null)
      {
        listenerThread.shutdown();
      }
      if (broker != null)
      {
        broker.stop();
      }
      // Wait for the listener thread to stop
      if (listenerThread != null)
        listenerThread.waitForShutdown();
    }
    if (broker != null)
    {
      broker.stop();
    }
    // Wait for the listener thread to stop
    if (listenerThread != null)
      listenerThread.waitForShutdown();
  }
  /**
@@ -3147,11 +3154,14 @@
   */
  public void enableService()
  {
    broker.start();
    synchronized (sessionLock)
    {
      broker.start();
    // Create the listener thread
    listenerThread = new ListenerThread(this);
    listenerThread.start();
      // Create the listener thread
      listenerThread = new ListenerThread(this);
      listenerThread.start();
    }
  }
  /**