Remove dead code in the replication without any functionality change.
| | |
| | | */ |
| | | package org.opends.server.replication.common; |
| | | |
| | | import static org.opends.server.loggers.debug.DebugLogger.getTracer; |
| | | |
| | | import java.util.Iterator; |
| | | import java.util.TreeMap; |
| | | |
| | | import org.opends.server.loggers.debug.DebugTracer; |
| | | |
| | | |
| | | /** |
| | | * This object is used to store a list of ServerState object, one by |
| | |
| | | public class MultiDomainServerState implements Iterable<String> |
| | | { |
| | | /** |
| | | * The tracer object for the debug logger. |
| | | */ |
| | | private static final DebugTracer TRACER = getTracer(); |
| | | |
| | | /** |
| | | * The list of (domain service id, ServerState). |
| | | */ |
| | | private TreeMap<String, ServerState> list; |
| | |
| | | * @param serverId The serverId of the server for which the ServerStartMsg |
| | | * is created. |
| | | * @param baseDn The base DN. |
| | | * @param maxReceiveDelay The max receive delay for this server. |
| | | * @param maxReceiveQueue The max receive Queue for this server. |
| | | * @param maxSendDelay The max Send Delay from this server. |
| | | * @param maxSendQueue The max send Queue from this server. |
| | | * @param windowSize The window size used by this server. |
| | | * @param heartbeatInterval The requested heartbeat interval. |
| | | * @param serverState The state of this server. |
| | |
| | | * after the start messages have been exchanged. |
| | | * @param groupId The group id of the DS for this DN |
| | | */ |
| | | public ServerStartMsg(short serverId, String baseDn, int maxReceiveDelay, |
| | | int maxReceiveQueue, int maxSendDelay, |
| | | int maxSendQueue, int windowSize, |
| | | public ServerStartMsg(short serverId, String baseDn, int windowSize, |
| | | long heartbeatInterval, |
| | | ServerState serverState, |
| | | short protocolVersion, |
| | |
| | | |
| | | this.serverId = serverId; |
| | | this.baseDn = baseDn; |
| | | this.maxReceiveDelay = maxReceiveDelay; |
| | | this.maxReceiveQueue = maxReceiveQueue; |
| | | this.maxSendDelay = maxSendDelay; |
| | | this.maxSendQueue = maxSendQueue; |
| | | this.maxReceiveDelay = 0; |
| | | this.maxReceiveQueue = 0; |
| | | this.maxSendDelay = 0; |
| | | this.maxSendQueue = 0; |
| | | this.windowSize = windowSize; |
| | | this.heartbeatInterval = heartbeatInterval; |
| | | this.sslEncryption = sslEncryption; |
| | |
| | | serverId = serverStartMsg.getServerId(); |
| | | serverURL = serverStartMsg.getServerURL(); |
| | | groupId = serverStartMsg.getGroupId(); |
| | | maxReceiveDelay = serverStartMsg.getMaxReceiveDelay(); |
| | | maxReceiveQueue = serverStartMsg.getMaxReceiveQueue(); |
| | | maxSendDelay = serverStartMsg.getMaxSendDelay(); |
| | | maxSendQueue = serverStartMsg.getMaxSendQueue(); |
| | | heartbeatInterval = serverStartMsg.getHeartbeatInterval(); |
| | | |
| | | // generic stuff |
| | |
| | | setInitialServerState(serverStartMsg.getServerState()); |
| | | setSendWindowSize(serverStartMsg.getWindowSize()); |
| | | |
| | | if (maxReceiveQueue > 0) |
| | | restartReceiveQueue = (maxReceiveQueue > 1000 ? maxReceiveQueue - |
| | | 200 : maxReceiveQueue * 8 / 10); |
| | | else |
| | | restartReceiveQueue = 0; |
| | | |
| | | if (maxSendQueue > 0) |
| | | restartSendQueue = |
| | | (maxSendQueue > 1000 ? maxSendQueue - 200 : maxSendQueue * 8 / |
| | | 10); |
| | | else |
| | | restartSendQueue = 0; |
| | | |
| | | if (maxReceiveDelay > 0) |
| | | restartReceiveDelay = (maxReceiveDelay > 10 ? maxReceiveDelay - 1 |
| | | : maxReceiveDelay); |
| | | else |
| | | restartReceiveDelay = 0; |
| | | |
| | | if (maxSendDelay > 0) |
| | | restartSendDelay = |
| | | (maxSendDelay > 10 ? maxSendDelay - 1 : maxSendDelay); |
| | | else |
| | | restartSendDelay = 0; |
| | | |
| | | if (heartbeatInterval < 0) |
| | | { |
| | | heartbeatInterval = 0; |
| | |
| | | package org.opends.server.replication.server; |
| | | |
| | | import static org.opends.messages.ReplicationMessages.*; |
| | | import static org.opends.server.util.StaticUtils.stackTraceToSingleLineString; |
| | | import static org.opends.server.loggers.debug.DebugLogger.debugEnabled; |
| | | |
| | | import java.io.IOException; |
| | |
| | | attributes.add(Attributes.create("External-Changelog-Server", |
| | | serverURL)); |
| | | |
| | | try |
| | | { |
| | | MonitorData md; |
| | | md = replicationServerDomain.computeMonitorData(); |
| | | // FIXME:ECL No monitoring exist for ECL. |
| | | } |
| | | catch (Exception e) |
| | | { |
| | | Message message = |
| | | ERR_ERROR_RETRIEVING_MONITOR_DATA.get(stackTraceToSingleLineString(e)); |
| | | // We failed retrieving the monitor data. |
| | | attributes.add(Attributes.create("error", message.toString())); |
| | | } |
| | | // FIXME:ECL No monitoring exist for ECL. |
| | | return attributes; |
| | | } |
| | | /** |
| | |
| | | */ |
| | | protected int inCount = 0; |
| | | /** |
| | | * Specifies the max receive queue for this handler. |
| | | */ |
| | | protected int maxReceiveQueue = 0; |
| | | /** |
| | | * Specifies the max send queue for this handler. |
| | | */ |
| | | protected int maxSendQueue = 0; |
| | | /** |
| | | * Specifies the max receive delay for this handler. |
| | | */ |
| | | protected int maxReceiveDelay = 0; |
| | | /** |
| | | * Specifies the max send delay for this handler. |
| | | */ |
| | | protected int maxSendDelay = 0; |
| | | /** |
| | | * Specifies the max queue size for this handler. |
| | | */ |
| | | protected int maxQueueSize = 5000; |
| | |
| | | */ |
| | | protected int maxQueueBytesSize = maxQueueSize * 100; |
| | | /** |
| | | * Specifies the max restart receive queue for this handler. |
| | | */ |
| | | protected int restartReceiveQueue; |
| | | /** |
| | | * Specifies the max restart send queue for this handler. |
| | | */ |
| | | protected int restartSendQueue; |
| | | /** |
| | | * Specifies the max restart receive delay for this handler. |
| | | */ |
| | | protected int restartReceiveDelay; |
| | | /** |
| | | * Specifies the max restart send delay for this handler. |
| | | */ |
| | | protected int restartSendDelay; |
| | | /** |
| | | * Specifies whether the consumer is following the producer (is not late). |
| | | */ |
| | | protected boolean following = false; |
| | |
| | | */ |
| | | private String serviceId = null; |
| | | /** |
| | | * Specifies whether the server is flow controlled and should be stopped from |
| | | * sending messages. |
| | | */ |
| | | protected boolean flowControl = false; |
| | | /** |
| | | * Specifies whether the consumer is still active or not. |
| | | * If not active, the handler will not return any message. |
| | | * Called at the beginning of shutdown process. |
| | |
| | | msgQueue.removeFirst(); |
| | | } |
| | | } |
| | | |
| | | if (isSaturated(update.getChangeNumber(), sourceHandler)) |
| | | { |
| | | sourceHandler.setSaturated(true); |
| | | } |
| | | |
| | | } |
| | | /** |
| | | * Set the shut down flag to true and returns the previous value of the flag. |
| | |
| | | } |
| | | |
| | | /** |
| | | * Check is this server is saturated (this server has already been |
| | | * sent a bunch of updates and has not processed them so they are staying |
| | | * in the message queue for this server an the size of the queue |
| | | * for this server is above the configured limit. |
| | | * |
| | | * The limit can be defined in number of updates or with a maximum delay |
| | | * |
| | | * @param changeNumber The changenumber to use to make the delay calculations. |
| | | * @param sourceHandler The ServerHandler which is sending the update. |
| | | * @return true is saturated false if not saturated. |
| | | */ |
| | | public boolean isSaturated(ChangeNumber changeNumber, |
| | | MessageHandler sourceHandler) |
| | | { |
| | | synchronized (msgQueue) |
| | | { |
| | | int size = msgQueue.count(); |
| | | |
| | | if ((maxReceiveQueue > 0) && (size >= maxReceiveQueue)) |
| | | return true; |
| | | |
| | | if ((sourceHandler.maxSendQueue > 0) && |
| | | (size >= sourceHandler.maxSendQueue)) |
| | | return true; |
| | | |
| | | if (!msgQueue.isEmpty()) |
| | | { |
| | | UpdateMsg firstUpdate = msgQueue.first(); |
| | | |
| | | if (firstUpdate != null) |
| | | { |
| | | long timeDiff = changeNumber.getTimeSec() - |
| | | firstUpdate.getChangeNumber().getTimeSec(); |
| | | |
| | | if ((maxReceiveDelay > 0) && (timeDiff >= maxReceiveDelay)) |
| | | return true; |
| | | |
| | | if ((sourceHandler.maxSendDelay > 0) && |
| | | (timeDiff >= sourceHandler.maxSendDelay)) |
| | | return true; |
| | | } |
| | | } |
| | | return false; |
| | | } |
| | | } |
| | | |
| | | /** |
| | | * Check that the size of the Server Handler messages Queue has lowered |
| | | * below the limit and therefore allowing the reception of messages |
| | | * from other servers to restart. |
| | | * @param source The ServerHandler which was sending the update. |
| | | * can be null. |
| | | * @return true if the processing can restart |
| | | */ |
| | | public boolean restartAfterSaturation(MessageHandler source) |
| | | { |
| | | synchronized (msgQueue) |
| | | { |
| | | int queueSize = msgQueue.count(); |
| | | if ((maxReceiveQueue > 0) && (queueSize >= restartReceiveQueue)) |
| | | return false; |
| | | if ((source != null) && (source.maxSendQueue > 0) && |
| | | (queueSize >= source.restartSendQueue)) |
| | | return false; |
| | | |
| | | if (!msgQueue.isEmpty()) |
| | | { |
| | | UpdateMsg firstUpdate = msgQueue.first(); |
| | | UpdateMsg lastUpdate = msgQueue.last(); |
| | | |
| | | if ((firstUpdate != null) && (lastUpdate != null)) |
| | | { |
| | | long timeDiff = lastUpdate.getChangeNumber().getTimeSec() - |
| | | firstUpdate.getChangeNumber().getTimeSec(); |
| | | if ((maxReceiveDelay > 0) && (timeDiff >= restartReceiveDelay)) |
| | | return false; |
| | | if ((source != null) && (source.maxSendDelay > 0) && (timeDiff >= |
| | | source.restartSendDelay)) |
| | | return false; |
| | | } |
| | | } |
| | | } |
| | | return true; |
| | | } |
| | | |
| | | /** |
| | | * Set that the consumer is now becoming inactive and thus getNextMessage |
| | | * should not return any UpdateMsg any more. |
| | | * @param active the provided state of the consumer. |
| | |
| | | this.following = following; |
| | | } |
| | | |
| | | private void setSaturated(boolean value) |
| | | { |
| | | flowControl = value; |
| | | } |
| | | |
| | | /** |
| | | * Set the initial value of the serverState for this handler. |
| | | * Expected to be done once, then the state will be updated using |
| | |
| | | */ |
| | | public class ReplicationServerDomain extends MonitorProvider<MonitorProviderCfg> |
| | | { |
| | | private final Object flowControlLock = new Object(); |
| | | private final String baseDn; |
| | | // The Status analyzer that periodically verifis if the connected DSs are |
| | | // late or not |
| | |
| | | * So this methods simply need to check that dependencies are OK |
| | | * and update this replicaId RUV |
| | | * |
| | | * TODO : dependency : |
| | | * before forwarding change, we should check that the dependency |
| | | * that is indicated in this change is OK (change already in the RUV) |
| | | */ |
| | | msg = handler.take(); |
| | | synchronized (flowControlLock) |
| | | { |
| | | if (handler.restartAfterSaturation(null)) |
| | | flowControlLock.notifyAll(); |
| | | } |
| | | |
| | | return msg; |
| | | } |
| | | |
| | |
| | | } catch (IOException ioe) |
| | | { |
| | | /* |
| | | * An error happened trying the send a routabled message |
| | | * An error happened trying the send a routable message |
| | | * to its destination server. |
| | | * Send back an error to the originator of the message. |
| | | */ |
| | |
| | | } |
| | | |
| | | /** |
| | | * Check if some server Handler should be removed from flow control state. |
| | | * @throws IOException If an error happened. |
| | | */ |
| | | public void checkAllSaturation() throws IOException |
| | | { |
| | | for (ReplicationServerHandler handler : replicationServers.values()) |
| | | { |
| | | handler.checkWindow(); |
| | | } |
| | | |
| | | for (DataServerHandler handler : directoryServers.values()) |
| | | { |
| | | handler.checkWindow(); |
| | | } |
| | | } |
| | | |
| | | /** |
| | | * Check if a server that was in flow control can now restart |
| | | * sending updates. |
| | | * @param sourceHandler The server that must be checked. |
| | | * @return true if the server can restart sending changes. |
| | | * false if the server can't restart sending changes. |
| | | */ |
| | | public boolean restartAfterSaturation(MessageHandler sourceHandler) |
| | | { |
| | | for (MessageHandler handler : replicationServers.values()) |
| | | { |
| | | if (!handler.restartAfterSaturation(sourceHandler)) |
| | | return false; |
| | | } |
| | | |
| | | for (MessageHandler handler : directoryServers.values()) |
| | | { |
| | | if (!handler.restartAfterSaturation(sourceHandler)) |
| | | return false; |
| | | } |
| | | return true; |
| | | } |
| | | |
| | | /** |
| | | * Send a TopologyMsg to all the connected directory servers in order to |
| | | * let. |
| | | * them know the topology (every known DSs and RSs) |
| | |
| | | */ |
| | | int sendWindowSize; |
| | | |
| | | private int saturationCount = 0; |
| | | |
| | | /** |
| | | * The protocol version established with the remote server. |
| | | */ |
| | |
| | | { |
| | | if (rcvWindow < rcvWindowSizeHalf) |
| | | { |
| | | if (flowControl) |
| | | { |
| | | if (replicationServerDomain.restartAfterSaturation(this)) |
| | | { |
| | | flowControl = false; |
| | | } |
| | | } |
| | | if (!flowControl) |
| | | { |
| | | WindowMsg msg = new WindowMsg(rcvWindowSizeHalf); |
| | | session.publish(msg); |
| | | rcvWindow += rcvWindowSizeHalf; |
| | | } |
| | | WindowMsg msg = new WindowMsg(rcvWindowSizeHalf); |
| | | session.publish(msg); |
| | | rcvWindow += rcvWindowSizeHalf; |
| | | } |
| | | } |
| | | |
| | |
| | | boolean interrupted = true; |
| | | UpdateMsg msg = getnextMessage(true); // synchronous:block until msg |
| | | |
| | | /* |
| | | * When we remove a message from the queue we need to check if another |
| | | * server is waiting in flow control because this queue was too long. |
| | | * This check might cause a performance penalty an therefore it |
| | | * is not done for every message removed but only every few messages. |
| | | */ |
| | | if (++saturationCount > 10) |
| | | { |
| | | saturationCount = 0; |
| | | try |
| | | { |
| | | replicationServerDomain.checkAllSaturation(); |
| | | } catch (IOException e) |
| | | { |
| | | } |
| | | } |
| | | boolean acquired = false; |
| | | do |
| | | { |
| | |
| | | /* |
| | | * Send our ServerStartMsg. |
| | | */ |
| | | ServerStartMsg serverStartMsg = new ServerStartMsg(serverId, |
| | | baseDn, 0, 0, 0, 0, |
| | | ServerStartMsg serverStartMsg = new ServerStartMsg(serverId, baseDn, |
| | | maxRcvWindow, heartbeatInterval, state, |
| | | ProtocolVersion.getCurrentVersion(), this.getGenerationID(), |
| | | isSslEncryption, |
| | |
| | | throws Exception |
| | | { |
| | | AckMsg msg1, msg2 ; |
| | | |
| | | |
| | | // Consctructor test (with ChangeNumber) |
| | | // Chech that retrieved CN is OK |
| | | msg1 = new AckMsg(cn); |
| | | assertEquals(msg1.getChangeNumber().compareTo(cn), 0); |
| | | |
| | | |
| | | // Check default values for error info |
| | | assertFalse(msg1.hasTimeout()); |
| | | assertFalse(msg1.hasWrongStatus()); |
| | | assertFalse(msg1.hasReplayError()); |
| | | assertTrue(msg1.getFailedServers().size() == 0); |
| | | |
| | | |
| | | // Check constructor with error info |
| | | msg1 = new AckMsg(cn, hasTimeout, hasWrongStatus, hasReplayError, failedServers); |
| | | assertEquals(msg1.getChangeNumber().compareTo(cn), 0); |
| | |
| | | assertTrue(msg1.hasWrongStatus() == hasWrongStatus); |
| | | assertTrue(msg1.hasReplayError() == hasReplayError); |
| | | assertEquals(msg1.getFailedServers(), failedServers); |
| | | |
| | | |
| | | // Consctructor test (with byte[]) |
| | | msg2 = new AckMsg(msg1.getBytes()); |
| | | assertEquals(msg2.getChangeNumber().compareTo(cn), 0); |
| | |
| | | assertTrue(msg1.hasWrongStatus() == msg2.hasWrongStatus()); |
| | | assertTrue(msg1.hasReplayError() == msg2.hasReplayError()); |
| | | assertEquals(msg1.getFailedServers(), msg2.getFailedServers()); |
| | | |
| | | |
| | | // Check invalid bytes for constructor |
| | | byte[] b = msg1.getBytes(); |
| | | b[0] = ReplicationMsg.MSG_TYPE_ADD; |
| | |
| | | { |
| | | assertTrue(true); |
| | | } |
| | | |
| | | |
| | | // Check that retrieved CN is OK |
| | | msg2 = (AckMsg) ReplicationMsg.generateMsg(msg1.getBytes()); |
| | | } |
| | |
| | | String serviceId = "serviceid"; |
| | | |
| | | // create a cookie |
| | | MultiDomainServerState cookie = |
| | | MultiDomainServerState cookie = |
| | | new MultiDomainServerState( |
| | | "o=test:000001210b6f21e904b100000001 000001210b6f21e904b200000001;" + |
| | | "o=test2:000001210b6f21e904b100000002 000001210b6f21e904b200000002;"); |
| | | |
| | | |
| | | // Constructor test |
| | | ECLUpdateMsg msg1 = new ECLUpdateMsg(delmsg, cookie, serviceId); |
| | | assertTrue(msg1.getCookie().equalsTo(cookie)); |
| | |
| | | public void serverStartMsgTest(short serverId, String baseDN, int window, |
| | | ServerState state, long genId, boolean sslEncryption, byte groupId) throws Exception |
| | | { |
| | | ServerStartMsg msg = new ServerStartMsg(serverId, baseDN, |
| | | window, window, window, window, window, window, state, |
| | | ServerStartMsg msg = new ServerStartMsg( |
| | | serverId, baseDN, window, window, state, |
| | | ProtocolVersion.getCurrentVersion(), genId, sslEncryption, groupId); |
| | | ServerStartMsg newMsg = new ServerStartMsg(msg.getBytes()); |
| | | assertEquals(msg.getServerId(), newMsg.getServerId()); |
| | | assertEquals(msg.getBaseDn(), newMsg.getBaseDn()); |
| | | assertEquals(msg.getMaxReceiveDelay(), newMsg.getMaxReceiveDelay()); |
| | | assertEquals(msg.getMaxReceiveQueue(), newMsg.getMaxReceiveQueue()); |
| | | assertEquals(msg.getMaxSendDelay(), newMsg.getMaxSendDelay()); |
| | | assertEquals(msg.getMaxSendQueue(), newMsg.getMaxSendQueue()); |
| | | assertEquals(msg.getWindowSize(), newMsg.getWindowSize()); |
| | | assertEquals(msg.getHeartbeatInterval(), newMsg.getHeartbeatInterval()); |
| | | assertEquals(msg.getSSLEncryption(), newMsg.getSSLEncryption()); |
| | |
| | | // LS2 state |
| | | ServerState s2 = new ServerState(); |
| | | short sid2 = 222; |
| | | Long now = TimeThread.getTime(); |
| | | Long now = ((Integer)10).longValue(); |
| | | ChangeNumber cn2 = new ChangeNumber(now, |
| | | (short) 123, sid2); |
| | | s2.update(cn2); |
| | |
| | | UpdateMsg newMsg = new UpdateMsg(msg.getBytes()); |
| | | assertEquals(test.getBytes(), newMsg.getPayload()); |
| | | } |
| | | |
| | | |
| | | /** |
| | | * Test that ServerStartMsg encoding and decoding works |
| | | * by checking that : msg == new ServerStartMsg(msg.getBytes()). |
| | |
| | | ServerState state = new ServerState(); |
| | | assertTrue(state.update(new ChangeNumber((long)75, 5,(short)263))); |
| | | short mode = 3; |
| | | int firstDraftChangeNumber = 13; |
| | | int lastDraftChangeNumber = 14; |
| | | int firstDraftChangeNumber = 13; |
| | | int lastDraftChangeNumber = 14; |
| | | String myopid = new String("fakeopid"); |
| | | // create original |
| | | StartECLSessionMsg msg = new StartECLSessionMsg(); |
| | |
| | | import java.util.TreeSet; |
| | | import java.util.UUID; |
| | | |
| | | import org.opends.messages.Category; |
| | | import org.opends.messages.Message; |
| | | import org.opends.messages.Severity; |
| | | import org.opends.server.TestCaseUtils; |
| | | import org.opends.server.api.SynchronizationProvider; |
| | | import org.opends.server.backends.task.TaskState; |
| | | import org.opends.server.core.DirectoryServer; |
| | | import org.opends.server.core.ModifyDNOperationBasis; |
| | | import org.opends.server.loggers.ErrorLogger; |
| | | import org.opends.server.loggers.debug.DebugTracer; |
| | | import org.opends.server.protocols.internal.InternalSearchOperation; |
| | | import org.opends.server.protocols.internal.InternalClientConnection; |
| | |
| | | // send a ServerStartMsg with an empty ServerState. |
| | | ServerStartMsg msg = |
| | | new ServerStartMsg((short) 1723, TEST_ROOT_DN_STRING, |
| | | 0, 0, 0, 0, WINDOW, (long) 5000, new ServerState(), |
| | | WINDOW, (long) 5000, new ServerState(), |
| | | ProtocolVersion.getCurrentVersion(), 0, sslEncryption, (byte)-1); |
| | | session.publish(msg); |
| | | |
| | |
| | | DN baseDn = DN.decode(TEST_ROOT_DN_STRING); |
| | | msg = new ServerStartMsg( |
| | | (short) 1724, TEST_ROOT_DN_STRING, |
| | | 0, 0, 0, 0, WINDOW, (long) 5000, replServerState, |
| | | WINDOW, (long) 5000, replServerState, |
| | | ProtocolVersion.getCurrentVersion(), |
| | | ReplicationTestCase.getGenerationId(baseDn), |
| | | sslEncryption, (byte)10); |
| | |
| | | new DeleteMsg("o=example," + TEST_ROOT_DN_STRING, gen.newChangeNumber(), |
| | | "uid"); |
| | | broker.publish(msg); |
| | | |
| | | |
| | | if ((count % 10) == 0) |
| | | debugInfo("BrokerWriter " + broker.getServerId() + " sent="+count); |
| | | } |