| File was renamed from opendj-sdk/opends/src/server/org/opends/server/replication/plugin/ReplicationBroker.java |
| | |
| | | * |
| | | * Copyright 2006-2008 Sun Microsystems, Inc. |
| | | */ |
| | | package org.opends.server.replication.plugin; |
| | | package org.opends.server.replication.service; |
| | | |
| | | import org.opends.server.replication.protocol.HeartbeatMonitor; |
| | | |
| | | import org.opends.messages.*; |
| | | |
| | | import static org.opends.server.loggers.ErrorLogger.logError; |
| | | import static org.opends.server.loggers.debug.DebugLogger.*; |
| | | |
| | | import org.opends.server.loggers.debug.DebugTracer; |
| | | import static org.opends.messages.ReplicationMessages.*; |
| | | import static org.opends.server.util.StaticUtils.stackTraceToSingleLineString; |
| | |
| | | import java.util.Collection; |
| | | import java.util.HashMap; |
| | | import java.util.Iterator; |
| | | import java.util.LinkedHashSet; |
| | | import java.util.List; |
| | | import java.util.TreeSet; |
| | | import java.util.concurrent.Semaphore; |
| | | import java.util.concurrent.TimeUnit; |
| | | |
| | | import org.opends.server.api.DirectoryThread; |
| | | import org.opends.server.protocols.asn1.ASN1OctetString; |
| | | import org.opends.server.protocols.internal.InternalClientConnection; |
| | | import org.opends.server.protocols.internal.InternalSearchListener; |
| | | import org.opends.server.protocols.internal.InternalSearchOperation; |
| | | import org.opends.server.protocols.ldap.LDAPFilter; |
| | | import org.opends.server.replication.common.ChangeNumber; |
| | | import org.opends.server.replication.common.DSInfo; |
| | | import org.opends.server.replication.common.RSInfo; |
| | | import org.opends.server.replication.common.ServerState; |
| | | import org.opends.server.replication.common.ServerStatus; |
| | | import org.opends.server.replication.protocol.*; |
| | | import org.opends.server.types.DN; |
| | | import org.opends.server.types.DereferencePolicy; |
| | | import org.opends.server.types.ResultCode; |
| | | import org.opends.server.types.SearchResultEntry; |
| | | import org.opends.server.types.SearchResultReference; |
| | | import org.opends.server.types.SearchScope; |
| | | |
| | | /** |
| | | * The broker for Multi-master Replication. |
| | | */ |
| | | public class ReplicationBroker implements InternalSearchListener |
| | | public class ReplicationBroker |
| | | { |
| | | |
| | | /** |
| | |
| | | private Collection<String> servers; |
| | | private boolean connected = false; |
| | | private String replicationServer = "Not connected"; |
| | | private TreeSet<FakeOperation> replayOperations; |
| | | private ProtocolSession session = null; |
| | | private final ServerState state; |
| | | private final DN baseDn; |
| | | private final String baseDn; |
| | | private final short serverId; |
| | | private int maxSendDelay; |
| | | private int maxReceiveDelay; |
| | | private int maxSendQueue; |
| | | private int maxReceiveQueue; |
| | | private Semaphore sendWindow; |
| | | private int maxSendWindow; |
| | | private int rcvWindow; |
| | | private int halfRcvWindow; |
| | | private int maxRcvWindow; |
| | | private int rcvWindow = 100; |
| | | private int halfRcvWindow = rcvWindow/2; |
| | | private int maxRcvWindow = rcvWindow; |
| | | private int timeout = 0; |
| | | private short protocolVersion; |
| | | private long generationId = -1; |
| | | private ReplSessionSecurity replSessionSecurity; |
| | | // My group id |
| | | private byte groupId = (byte) -1; |
| | |
| | | // The server URL of the RS we are connected to |
| | | private String rsServerUrl = null; |
| | | // Our replication domain |
| | | private ReplicationDomain replicationDomain = null; |
| | | private ReplicationDomain domain = null; |
| | | |
| | | // Trick to avoid an inner class for the multiple values returned by the |
| | | // performPhaseOneHandshake method. |
| | |
| | | // Same group id poller thread |
| | | private SameGroupIdPoller sameGroupIdPoller = null; |
| | | |
| | | /* |
| | | * Properties for the last topology info received from the network. |
| | | */ |
| | | // Info for other DSs. |
| | | // Warning: does not contain info for us (for our server id) |
| | | private List<DSInfo> dsList = new ArrayList<DSInfo>(); |
| | | // Info for other RSs. |
| | | private List<RSInfo> rsList = new ArrayList<RSInfo>(); |
| | | |
| | | private long generationID; |
| | | |
| | | /** |
| | | * Creates a new ReplicationServer Broker for a particular ReplicationDomain. |
| | | * |
| | |
| | | * when negotiating the session with the replicationServer. |
| | | * @param serverId The server ID that should be used by this broker |
| | | * when negotiating the session with the replicationServer. |
| | | * @param maxReceiveQueue The maximum size of the receive queue to use on |
| | | * the replicationServer. |
| | | * @param maxReceiveDelay The maximum replication delay to use on the |
| | | * replicationServer. |
| | | * @param maxSendQueue The maximum size of the send queue to use on |
| | | * the replicationServer. |
| | | * @param maxSendDelay The maximum send delay to use on the replicationServer. |
| | | * @param window The size of the send and receive window to use. |
| | | * @param heartbeatInterval The interval between heartbeats requested of the |
| | | * replicationServer, or zero if no heartbeats are requested. |
| | |
| | | * @param groupId The group id of our domain. |
| | | */ |
| | | public ReplicationBroker(ReplicationDomain replicationDomain, |
| | | ServerState state, DN baseDn, short serverId, int maxReceiveQueue, |
| | | int maxReceiveDelay, int maxSendQueue, int maxSendDelay, int window, |
| | | long heartbeatInterval, long generationId, |
| | | ServerState state, String baseDn, short serverId, int window, |
| | | long generationId, long heartbeatInterval, |
| | | ReplSessionSecurity replSessionSecurity, byte groupId) |
| | | { |
| | | this.replicationDomain = replicationDomain; |
| | | this.domain = replicationDomain; |
| | | this.baseDn = baseDn; |
| | | this.serverId = serverId; |
| | | this.maxReceiveDelay = maxReceiveDelay; |
| | | this.maxSendDelay = maxSendDelay; |
| | | this.maxReceiveQueue = maxReceiveQueue; |
| | | this.maxSendQueue = maxSendQueue; |
| | | this.state = state; |
| | | replayOperations = |
| | | new TreeSet<FakeOperation>(new FakeOperationComparator()); |
| | | this.rcvWindow = window; |
| | | this.maxRcvWindow = window; |
| | | this.halfRcvWindow = window / 2; |
| | | this.heartbeatInterval = heartbeatInterval; |
| | | this.protocolVersion = ProtocolVersion.getCurrentVersion(); |
| | | this.generationId = generationId; |
| | | this.replSessionSecurity = replSessionSecurity; |
| | | this.groupId = groupId; |
| | | this.generationID = generationId; |
| | | this.heartbeatInterval = heartbeatInterval; |
| | | this.maxRcvWindow = window; |
| | | this.maxRcvWindow = window; |
| | | this.halfRcvWindow = window /2; |
| | | } |
| | | |
| | | /** |
| | | * Starts the ReplicationBroker. |
| | | * <p> |
| | | * Clears the shutdown flag, resets the receive window to its configured |
| | | * maximum and then calls {@code connect()}. |
| | | */ |
| | | public void start() |
| | | { |
| | | shutdown = false; |
| | | this.rcvWindow = this.maxRcvWindow; |
| | | this.connect(); |
| | | } |
| | | |
| | | /** |
| | |
| | | */ |
| | | shutdown = false; |
| | | this.servers = servers; |
| | | |
| | | if (servers.size() < 1) |
| | | { |
| | | Message message = NOTE_NEED_MORE_THAN_ONE_CHANGELOG_SERVER.get(); |
| | |
| | | } |
| | | |
| | | /** |
| | | * Gets the generation ID used by this broker. |
| | | * <p> |
| | | * When a replication domain is attached to this broker, the generation ID |
| | | * of that domain is returned; otherwise the value stored in this broker |
| | | * is returned. |
| | | * |
| | | * @return The generation ID |
| | | */ |
| | | private long getGenerationID() |
| | | { |
| | | if (domain != null) |
| | | return domain.getGenerationID(); |
| | | else |
| | | return generationID; |
| | | } |
| | | |
| | | /** |
| | | * Gets the server url of the RS we are connected to. |
| | | * @return The server url of the RS we are connected to |
| | | */ |
| | |
| | | |
| | | // May have created a broker with null replication domain for |
| | | // unit test purpose. |
| | | if (replicationDomain != null) |
| | | if (domain != null) |
| | | { |
| | | // If a first connect or a connection failure occur, we go through here. |
| | | // force status machine to NOT_CONNECTED_STATUS so that monitoring can |
| | | // see that we are not connected. |
| | | replicationDomain.toNotConnectedStatus(); |
| | | domain.toNotConnectedStatus(); |
| | | } |
| | | |
| | | // Stop any existing poller and heartbeat monitor from a previous session. |
| | |
| | | ServerStatus initStatus = |
| | | computeInitialServerStatus(replServerStartMsg.getGenerationId(), |
| | | bestServerInfo.getServerState(), |
| | | replServerStartMsg.getDegradedStatusThreshold(), generationId); |
| | | replServerStartMsg.getDegradedStatusThreshold(), |
| | | this.getGenerationID()); |
| | | |
| | | // Perform session start (handshake phase 2) |
| | | TopologyMsg topologyMsg = performPhaseTwoHandshake(bestServer, |
| | |
| | | if ((tmpRsGroupId == groupId) || |
| | | ((tmpRsGroupId != groupId) && !someServersWithSameGroupId)) |
| | | { |
| | | /* |
| | | * We must not publish changes to a replicationServer that has |
| | | * not seen all our previous changes because this could cause |
| | | * some other ldap servers to miss those changes. |
| | | * Check that the ReplicationServer has seen all our previous |
| | | * changes. |
| | | */ |
| | | ChangeNumber replServerMaxChangeNumber = |
| | | replServerStartMsg.getServerState(). |
| | | getMaxChangeNumber(serverId); |
| | | |
| | | if (replServerMaxChangeNumber == null) |
| | | { |
| | | replServerMaxChangeNumber = new ChangeNumber(0, 0, serverId); |
| | | } |
| | | ChangeNumber ourMaxChangeNumber = |
| | | state.getMaxChangeNumber(serverId); |
| | | |
| | | if ((ourMaxChangeNumber != null) && |
| | | (!ourMaxChangeNumber.olderOrEqual(replServerMaxChangeNumber))) |
| | | { |
| | | |
| | | // Replication server is missing some of our changes: let's |
| | | // send them to him. |
| | | |
| | | Message message = DEBUG_GOING_TO_SEARCH_FOR_CHANGES.get(); |
| | | logError(message); |
| | | |
| | | /* |
| | | * Get all the changes that have not been seen by this |
| | | * replication server and populate the replayOperations |
| | | * list. |
| | | */ |
| | | InternalSearchOperation op = searchForChangedEntries( |
| | | baseDn, replServerMaxChangeNumber, this); |
| | | if (op.getResultCode() != ResultCode.SUCCESS) |
| | | { |
| | | /* |
| | | * An error happened trying to search for the updates |
| | | * This server will start accepting new updates again but |
| | | * some inconsistencies will stay between servers. |
| | | * Log an error for the repair tool |
| | | * that will need to resynchronize the servers. |
| | | */ |
| | | message = ERR_CANNOT_RECOVER_CHANGES.get( |
| | | baseDn.toNormalizedString()); |
| | | logError(message); |
| | | } else |
| | | { |
| | | for (FakeOperation replayOp : replayOperations) |
| | | { |
| | | ChangeNumber cn = replayOp.getChangeNumber(); |
| | | /* |
| | | * Because the entry returned by the search operation |
| | | * can contain old historical information, it is |
| | | * possible that some of the FakeOperation are |
| | | * actually older than the last ChangeNumber known by the |
| | | * Replication Server. Only send the Operation if it is newer |
| | | * than that ChangeNumber. |
| | | */ |
| | | if (cn.newer(replServerMaxChangeNumber)) |
| | | { |
| | | message = |
| | | DEBUG_SENDING_CHANGE.get( |
| | | replayOp.getChangeNumber().toString()); |
| | | logError(message); |
| | | session.publish(replayOp.generateMessage()); |
| | | } |
| | | } |
| | | message = DEBUG_CHANGES_SENT.get(); |
| | | logError(message); |
| | | } |
| | | replayOperations.clear(); |
| | | } |
| | | |
| | | replicationServer = tmpReadableServerName; |
| | | maxSendWindow = replServerStartMsg.getWindowSize(); |
| | | rsGroupId = replServerStartMsg.getGroupId(); |
| | |
| | | |
| | | // May have created a broker with null replication domain for |
| | | // unit test purpose. |
| | | if (replicationDomain != null) |
| | | if (domain != null) |
| | | { |
| | | replicationDomain.setInitialStatus(initStatus); |
| | | replicationDomain.receiveTopo(topologyMsg); |
| | | domain.sessionInitiated( |
| | | initStatus, replServerStartMsg.getServerState(), |
| | | session); |
| | | } |
| | | receiveTopo(topologyMsg); |
| | | connected = true; |
| | | if (getRsGroupId() != groupId) |
| | | { |
| | |
| | | // Do not log connection error |
| | | newServerWithSameGroupId = true; |
| | | } |
| | | } catch (IOException e) |
| | | { |
| | | Message message = ERR_PUBLISHING_FAKE_OPS.get( |
| | | baseDn.toNormalizedString(), bestServer, |
| | | e.getLocalizedMessage() + stackTraceToSingleLineString(e)); |
| | | logError(message); |
| | | } catch (Exception e) |
| | | { |
| | | Message message = ERR_COMPUTING_FAKE_OPS.get( |
| | | baseDn.toNormalizedString(), bestServer, |
| | | baseDn, bestServer, |
| | | e.getLocalizedMessage() + stackTraceToSingleLineString(e)); |
| | | logError(message); |
| | | } finally |
| | |
| | | this.sendWindow = new Semaphore(maxSendWindow); |
| | | connectPhaseLock.notify(); |
| | | |
| | | if ((replServerStartMsg.getGenerationId() == this.generationId) || |
| | | if ((replServerStartMsg.getGenerationId() == this.getGenerationID()) || |
| | | (replServerStartMsg.getGenerationId() == -1)) |
| | | { |
| | | Message message = |
| | |
| | | Short.toString(rsServerId), |
| | | replicationServer, |
| | | Short.toString(serverId), |
| | | Long.toString(this.generationId)); |
| | | Long.toString(this.getGenerationID())); |
| | | logError(message); |
| | | } else |
| | | { |
| | |
| | | NOTE_NOW_FOUND_BAD_GENERATION_CHANGELOG.get( |
| | | baseDn.toString(), |
| | | replicationServer, |
| | | Long.toString(this.generationId), |
| | | Long.toString(this.getGenerationID()), |
| | | Long.toString(replServerStartMsg.getGenerationId())); |
| | | logError(message); |
| | | } |
| | |
| | | |
| | | if (debugEnabled()) |
| | | { |
| | | TRACER.debugInfo("RB for dn " + baseDn.toNormalizedString() + |
| | | TRACER.debugInfo("RB for dn " + baseDn + |
| | | " and with server id " + Short.toString(serverId) + " computed " + |
| | | Integer.toString(nChanges) + " changes late."); |
| | | } |
| | |
| | | /* |
| | | * Send our ServerStartMsg. |
| | | */ |
| | | ServerStartMsg serverStartMsg = new ServerStartMsg(serverId, baseDn, |
| | | maxReceiveDelay, maxReceiveQueue, maxSendDelay, maxSendQueue, |
| | | halfRcvWindow * 2, heartbeatInterval, state, |
| | | ProtocolVersion.getCurrentVersion(), generationId, isSslEncryption, |
| | | ServerStartMsg serverStartMsg = new ServerStartMsg(serverId, |
| | | baseDn, 0, 0, 0, 0, |
| | | maxRcvWindow, heartbeatInterval, state, |
| | | ProtocolVersion.getCurrentVersion(), this.getGenerationID(), |
| | | isSslEncryption, |
| | | groupId); |
| | | localSession.publish(serverStartMsg); |
| | | |
| | |
| | | } |
| | | |
| | | // Sanity check |
| | | DN repDn = replServerStartMsg.getBaseDn(); |
| | | String repDn = replServerStartMsg.getBaseDn(); |
| | | if (!(this.baseDn.equals(repDn))) |
| | | { |
| | | Message message = ERR_DS_DN_DOES_NOT_MATCH.get(repDn.toString(), |
| | | this.baseDn.toString()); |
| | | this.baseDn); |
| | | logError(message); |
| | | error = true; |
| | | } |
| | |
| | | if ( (e instanceof SocketTimeoutException) && debugEnabled() ) |
| | | { |
| | | TRACER.debugInfo("Timeout trying to connect to RS " + server + |
| | | " for dn: " + baseDn.toNormalizedString()); |
| | | " for dn: " + baseDn); |
| | | } |
| | | Message message = ERR_EXCEPTION_STARTING_SESSION_PHASE.get("1", |
| | | baseDn.toNormalizedString(), server, e.getLocalizedMessage() + |
| | | baseDn, server, e.getLocalizedMessage() + |
| | | stackTraceToSingleLineString(e)); |
| | | if (keepConnection) // Log error message only for final connection |
| | | { |
| | |
| | | StartSessionMsg startSessionMsg = null; |
| | | // May have created a broker with null replication domain for |
| | | // unit test purpose. |
| | | if (replicationDomain != null) |
| | | if (domain != null) |
| | | { |
| | | startSessionMsg = new StartSessionMsg(initStatus, |
| | | replicationDomain.getRefUrls(), |
| | | replicationDomain.isAssured(), |
| | | replicationDomain.getAssuredMode(), |
| | | replicationDomain.getAssuredSdLevel()); |
| | | startSessionMsg = |
| | | new StartSessionMsg( |
| | | initStatus, |
| | | domain.getRefUrls(), |
| | | domain.isAssured(), |
| | | domain.getAssuredMode(), |
| | | domain.getAssuredSdLevel()); |
| | | } else |
| | | { |
| | | startSessionMsg = |
| | |
| | | } catch (Exception e) |
| | | { |
| | | Message message = ERR_EXCEPTION_STARTING_SESSION_PHASE.get("2", |
| | | baseDn.toNormalizedString(), server, e.getLocalizedMessage() + |
| | | baseDn, server, e.getLocalizedMessage() + |
| | | stackTraceToSingleLineString(e)); |
| | | logError(message); |
| | | |
| | |
| | | * @return The computed best replication server. |
| | | */ |
| | | public static String computeBestReplicationServer(ServerState myState, |
| | | HashMap<String, ServerInfo> rsInfos, short serverId, DN baseDn, |
| | | HashMap<String, ServerInfo> rsInfos, short serverId, String baseDn, |
| | | byte groupId) |
| | | { |
| | | /* |
| | |
| | | * @return The computed best replication server. |
| | | */ |
| | | private static String searchForBestReplicationServer(ServerState myState, |
| | | HashMap<String, ServerInfo> rsInfos, short serverId, DN baseDn) |
| | | HashMap<String, ServerInfo> rsInfos, short serverId, String baseDn) |
| | | { |
| | | /* |
| | | * Find replication servers who are up to date (or more up to date than us, |
| | |
| | | */ |
| | | |
| | | Message message = NOTE_FOUND_CHANGELOGS_WITH_MY_CHANGES.get( |
| | | upToDateServers.size(), |
| | | baseDn.toNormalizedString(), |
| | | Short.toString(serverId)); |
| | | upToDateServers.size(), baseDn, Short.toString(serverId)); |
| | | logError(message); |
| | | |
| | | /* |
| | |
| | | */ |
| | | // lateOnes cannot be empty |
| | | Message message = NOTE_COULD_NOT_FIND_CHANGELOG_WITH_MY_CHANGES.get( |
| | | baseDn.toNormalizedString(), lateOnes.size()); |
| | | baseDn, lateOnes.size()); |
| | | logError(message); |
| | | |
| | | // Min of the shifts |
| | |
| | | } |
| | | |
| | | /** |
| | | * Search for the changes that happened since fromChangeNumber |
| | | * based on the historical attribute. The only changes that will |
| | | * be sent will be the ones generated on the serverId provided in |
| | | * fromChangeNumber. |
| | | * <p> |
| | | * The search is a whole-subtree internal search run on the root |
| | | * connection, with aliases never dereferenced. The filter requires the |
| | | * historical attribute to be greater than or equal to fromChangeNumber |
| | | * and lower than or equal to an upper bound built from the server id of |
| | | * fromChangeNumber (formatted as four hexadecimal digits) surrounded by |
| | | * 'f' digits. The historical attribute, the entry UID attribute and "*" |
| | | * are requested. |
| | | * |
| | | * @param baseDn the base DN |
| | | * @param fromChangeNumber The change number from which we want the changes |
| | | * @param resultListener that will process the entries returned. |
| | | * @return the internal search operation |
| | | * @throws Exception when raised. |
| | | */ |
| | | public static InternalSearchOperation searchForChangedEntries( |
| | | DN baseDn, |
| | | ChangeNumber fromChangeNumber, |
| | | InternalSearchListener resultListener) |
| | | throws Exception |
| | | { |
| | | InternalClientConnection conn = |
| | | InternalClientConnection.getRootConnection(); |
| | | Short serverId = fromChangeNumber.getServerId(); |
| | | |
| | | String maxValueForId = "ffffffffffffffff" + |
| | | String.format("%04x", serverId) + "ffffffff"; |
| | | |
| | | LDAPFilter filter = LDAPFilter.decode( |
| | | "(&(" + Historical.HISTORICALATTRIBUTENAME + ">=dummy:" |
| | | + fromChangeNumber + ")(" + Historical.HISTORICALATTRIBUTENAME + |
| | | "<=dummy:" + maxValueForId + "))"); |
| | | |
| | | LinkedHashSet<String> attrs = new LinkedHashSet<String>(1); |
| | | attrs.add(Historical.HISTORICALATTRIBUTENAME); |
| | | attrs.add(Historical.ENTRYUIDNAME); |
| | | attrs.add("*"); |
| | | return conn.processSearch( |
| | | new ASN1OctetString(baseDn.toString()), |
| | | SearchScope.WHOLE_SUBTREE, |
| | | DereferencePolicy.NEVER_DEREF_ALIASES, |
| | | 0, 0, false, filter, |
| | | attrs, |
| | | resultListener); |
| | | } |
| | | |
| | | /** |
| | | * Start the heartbeat monitor thread. |
| | | */ |
| | | private void startHeartBeat() |
| | |
| | | { |
| | | MessageBuilder mb = new MessageBuilder(); |
| | | mb.append(NOTE_EXCEPTION_RESTARTING_SESSION.get( |
| | | baseDn.toNormalizedString(), e.getLocalizedMessage())); |
| | | baseDn, e.getLocalizedMessage())); |
| | | mb.append(stackTraceToSingleLineString(e)); |
| | | logError(mb.toMessage()); |
| | | } |
| | |
| | | } |
| | | } |
| | | } |
| | | if (!credit) |
| | | if ((!credit) && (currentWindowSemaphore.availablePermits() == 0)) |
| | | { |
| | | // the window is still closed. |
| | | // Send a WindowProbeMsg message to wakeup the receiver in case the |
| | |
| | | if (debugEnabled()) |
| | | { |
| | | debugInfo("ReplicationBroker.publish() " + |
| | | "IO exception raised : " + e.getLocalizedMessage()); |
| | | "Interrupted exception raised : " + e.getLocalizedMessage()); |
| | | } |
| | | } |
| | | } |
| | |
| | | { |
| | | WindowMsg windowMsg = (WindowMsg) msg; |
| | | sendWindow.release(windowMsg.getNumAck()); |
| | | } else |
| | | } |
| | | else if (msg instanceof TopologyMsg) |
| | | { |
| | | TopologyMsg topoMsg = (TopologyMsg)msg; |
| | | receiveTopo(topoMsg); |
| | | } |
| | | else |
| | | { |
| | | return msg; |
| | | } |
| | |
| | | } |
| | | |
| | | /** |
| | | * Sets the value of the generationId for this broker. Normally the |
| | | * generationId is set through the constructor, but there are cases |
| | | * where the value of the generationId must be changed while the broker |
| | | * already exists, for example after an on-line import. |
| | | * |
| | | * @param generationId The value of the generationId. |
| | | * |
| | | */ |
| | | public void setGenerationId(long generationId) |
| | | { |
| | | this.generationId = generationId; |
| | | } |
| | | |
| | | /** |
| | | * Get the name of the replicationServer to which this broker is currently |
| | | * connected. |
| | | * |
| | |
| | | } |
| | | |
| | | /** |
| | | * {@inheritDoc} |
| | | * <p> |
| | | * Generates the FakeOperation objects corresponding to the historical |
| | | * information of the returned entry and adds them to replayOperations. |
| | | */ |
| | | public void handleInternalSearchEntry( |
| | | InternalSearchOperation searchOperation, |
| | | SearchResultEntry searchEntry) |
| | | { |
| | | /* |
| | | * This callback is called during the session establishment phase |
| | | * for each entry that has been changed by this server and whose changes |
| | | * have not been sent to any Replication Server. |
| | | * The role of this method is to build equivalent operations from |
| | | * the historical information and add them to the replayOperations |
| | | * table. |
| | | */ |
| | | Iterable<FakeOperation> updates = |
| | | Historical.generateFakeOperations(searchEntry); |
| | | for (FakeOperation op : updates) |
| | | { |
| | | replayOperations.add(op); |
| | | } |
| | | } |
| | | |
| | | /** |
| | | * {@inheritDoc} |
| | | * <p> |
| | | * Search references are currently ignored: the method body is empty. |
| | | */ |
| | | public void handleInternalSearchReference( |
| | | InternalSearchOperation searchOperation, |
| | | SearchResultReference searchReference) |
| | | { |
| | | // TODO to be implemented |
| | | } |
| | | |
| | | /** |
| | | * Get the maximum receive window size. |
| | | * |
| | | * @return The maximum receive window size. |
| | |
| | | } |
| | | |
| | | /** |
| | | * Change some config parameters. |
| | | * Change some configuration parameters. |
| | | * |
| | | * @param replicationServers The new list of replication servers. |
| | | * @param maxReceiveQueue The max size of receive queue. |
| | | * @param maxReceiveDelay The max receive delay. |
| | | * @param maxSendQueue The max send queue. |
| | | * @param maxSendDelay The max Send Delay. |
| | | * @param replicationServers The new list of replication servers. |
| | | * @param window The max window size. |
| | | * @param heartbeatInterval The heartbeat interval. |
| | | * @param heartbeatInterval The heartBeat interval. |
| | | * |
| | | * @return A boolean indicating whether the changes |
| | | * require restarting the session with the ReplicationServer. |
| | | */ |
| | | public void changeConfig(Collection<String> replicationServers, |
| | | int maxReceiveQueue, int maxReceiveDelay, int maxSendQueue, |
| | | int maxSendDelay, int window, long heartbeatInterval) |
| | | public boolean changeConfig( |
| | | Collection<String> replicationServers, int window, long heartbeatInterval) |
| | | { |
| | | this.servers = replicationServers; |
| | | this.maxRcvWindow = window; |
| | | this.heartbeatInterval = heartbeatInterval; |
| | | this.maxReceiveDelay = maxReceiveDelay; |
| | | this.maxReceiveQueue = maxReceiveQueue; |
| | | this.maxSendDelay = maxSendDelay; |
| | | this.maxSendQueue = maxSendQueue; |
| | | // These parameters need to be renegotiated with the ReplicationServer, |
| | | // so if they have changed, the session with the ReplicationServer |
| | | // has to be restarted. |
| | | Boolean needToRestartSession = false; |
| | | |
| | | // For info, a new session with the replicationServer |
| | | // will be recreated in the replication domain |
| | | // to take into account the new configuration. |
| | | // A new session is necessary only when information regarding |
| | | // the connection is modified |
| | | if ((servers == null) || |
| | | (!(replicationServers.size() == servers.size() |
| | | && replicationServers.containsAll(servers))) || |
| | | window != this.maxRcvWindow || |
| | | heartbeatInterval != this.heartbeatInterval) |
| | | { |
| | | needToRestartSession = true; |
| | | } |
| | | |
| | | this.servers = replicationServers; |
| | | this.rcvWindow = window; |
| | | this.maxRcvWindow = window; |
| | | this.halfRcvWindow = window / 2; |
| | | this.heartbeatInterval = heartbeatInterval; |
| | | |
| | | return needToRestartSession; |
| | | } |
| | | |
| | | /** |
| | |
| | | { |
| | | try |
| | | { |
| | | |
| | | ChangeStatusMsg csMsg = new ChangeStatusMsg(ServerStatus.INVALID_STATUS, |
| | | newStatus); |
| | | session.publish(csMsg); |
| | | } catch (IOException ex) |
| | | { |
| | | Message message = ERR_EXCEPTION_SENDING_CS.get( |
| | | baseDn.toNormalizedString(), |
| | | baseDn, |
| | | Short.toString(serverId), |
| | | ex.getLocalizedMessage() + stackTraceToSingleLineString(ex)); |
| | | logError(message); |
| | |
| | | { |
| | | this.groupId = groupId; |
| | | } |
| | | |
| | | /** |
| | | * Gets the info for DSs in the topology (except us). |
| | | * <p> |
| | | * The list held by this broker is returned directly, without copying. |
| | | * |
| | | * @return The info for DSs in the topology (except us) |
| | | */ |
| | | public List<DSInfo> getDsList() |
| | | { |
| | | return dsList; |
| | | } |
| | | |
| | | /** |
| | | * Gets the info for RSs in the topology (except the one we are connected |
| | | * to). |
| | | * <p> |
| | | * The list held by this broker is returned directly, without copying. |
| | | * |
| | | * @return The info for RSs in the topology (except the one we are connected |
| | | * to) |
| | | */ |
| | | public List<RSInfo> getRsList() |
| | | { |
| | | return rsList; |
| | | } |
| | | |
| | | /** |
| | | * Processes an incoming TopologyMsg. |
| | | * Updates the structures for the local view of the topology. |
| | | * <p> |
| | | * The DS and RS lists of this broker are replaced by the lists carried in |
| | | * the message, while holding the monitors of the lists being replaced. |
| | | * When debugging is enabled, the received message is traced first. |
| | | * <p> |
| | | * NOTE(review): the monitors are taken on the old list instances, so code |
| | | * that synchronizes on the lists after this call presumably locks |
| | | * different objects - confirm that this is the intended locking scheme. |
| | | * |
| | | * @param topoMsg The topology information received from RS. |
| | | */ |
| | | public void receiveTopo(TopologyMsg topoMsg) |
| | | { |
| | | |
| | | if (debugEnabled()) |
| | | TRACER.debugInfo("Replication domain " + baseDn |
| | | + " received topology info update:\n" + topoMsg); |
| | | |
| | | // Store new lists |
| | | synchronized(getDsList()) |
| | | { |
| | | synchronized(getRsList()) |
| | | { |
| | | dsList = topoMsg.getDsList(); |
| | | rsList = topoMsg.getRsList(); |
| | | } |
| | | } |
| | | } |
| | | } |