[Issue 1085] Synchronization protocol must be extensible
2 files added
9 files modified
| | |
| | | import static org.opends.server.messages.ReplicationMessages.*; |
| | | import static org.opends.server.util.StaticUtils.stackTraceToSingleLineString; |
| | | |
| | | import java.util.Collection; |
| | | import java.util.LinkedHashSet; |
| | | import java.util.TreeSet; |
| | | import java.util.concurrent.Semaphore; |
| | | import java.io.IOException; |
| | | import java.net.ConnectException; |
| | | import java.net.InetAddress; |
| | |
| | | import java.net.Socket; |
| | | import java.net.SocketException; |
| | | import java.net.SocketTimeoutException; |
| | | import java.util.Collection; |
| | | import java.util.LinkedHashSet; |
| | | import java.util.TreeSet; |
| | | import java.util.concurrent.Semaphore; |
| | | |
| | | import org.opends.server.protocols.asn1.ASN1OctetString; |
| | | import org.opends.server.protocols.internal.InternalClientConnection; |
| | |
| | | import org.opends.server.protocols.ldap.LDAPFilter; |
| | | import org.opends.server.replication.common.ChangeNumber; |
| | | import org.opends.server.replication.common.ServerState; |
| | | import org.opends.server.replication.protocol.ReplServerStartMessage; |
| | | import org.opends.server.replication.protocol.ProtocolSession; |
| | | import org.opends.server.replication.protocol.ProtocolVersion; |
| | | import org.opends.server.replication.protocol.ReplServerStartMessage; |
| | | import org.opends.server.replication.protocol.ReplicationMessage; |
| | | import org.opends.server.replication.protocol.ServerStartMessage; |
| | | import org.opends.server.replication.protocol.SocketSession; |
| | | import org.opends.server.replication.protocol.ReplicationMessage; |
| | | import org.opends.server.replication.protocol.UpdateMessage; |
| | | import org.opends.server.replication.protocol.WindowMessage; |
| | | import org.opends.server.types.DN; |
| | |
| | | private int halfRcvWindow; |
| | | private int maxRcvWindow; |
| | | private int timeout = 0; |
| | | private short protocolVersion; |
| | | |
| | | /** |
| | | * The time in milliseconds between heartbeats from the replication |
| | |
| | | this.maxRcvWindow = window; |
| | | this.halfRcvWindow = window/2; |
| | | this.heartbeatInterval = heartbeatInterval; |
| | | this.protocolVersion = ProtocolVersion.currentVersion(); |
| | | } |
| | | |
| | | /** |
| | |
| | | */ |
| | | ServerStartMessage msg = new ServerStartMessage(serverID, baseDn, |
| | | maxReceiveDelay, maxReceiveQueue, maxSendDelay, maxSendQueue, |
| | | halfRcvWindow*2, heartbeatInterval, state); |
| | | halfRcvWindow*2, heartbeatInterval, state, |
| | | protocolVersion); |
| | | session.publish(msg); |
| | | |
| | | |
| | |
| | | */ |
| | | session.setSoTimeout(1000); |
| | | startMsg = (ReplServerStartMessage) session.receive(); |
| | | |
| | | /* |
| | | * We have sent our own protocol version to the replication server. |
| | | * The replication server will use the same one (or an older one |
| | | * if it is an old replication server). |
| | | */ |
| | | protocolVersion = ProtocolVersion.minWithCurrent( |
| | | startMsg.getVersion()); |
| | | session.setSoTimeout(timeout); |
| | | |
| | | /* |
| | |
| | | // session with the replicationServer or renegociate the parameters that |
| | | // were sent in the ServerStart message |
| | | } |
| | | |
| | | /** |
| | | * Get the version of the replication protocol. |
| | | * @return The version of the replication protocol. |
| | | */ |
| | | public short getProtocolVersion() |
| | | { |
| | | return protocolVersion; |
| | | } |
| | | |
| | | } |
| New file |
| | |
| | | /* |
| | | * CDDL HEADER START |
| | | * |
| | | * The contents of this file are subject to the terms of the |
| | | * Common Development and Distribution License, Version 1.0 only |
| | | * (the "License"). You may not use this file except in compliance |
| | | * with the License. |
| | | * |
| | | * You can obtain a copy of the license at |
| | | * trunk/opends/resource/legal-notices/OpenDS.LICENSE |
| | | * or https://OpenDS.dev.java.net/OpenDS.LICENSE. |
| | | * See the License for the specific language governing permissions |
| | | * and limitations under the License. |
| | | * |
| | | * When distributing Covered Code, include this CDDL HEADER in each |
| | | * file and include the License file at |
| | | * trunk/opends/resource/legal-notices/OpenDS.LICENSE. If applicable, |
| | | * add the following below this CDDL HEADER, with the fields enclosed |
| | | * by brackets "[]" replaced with your own identifying information: |
| | | * Portions Copyright [yyyy] [name of copyright owner] |
| | | * |
| | | * CDDL HEADER END |
| | | * |
| | | * |
| | | * Portions Copyright 2006-2007 Sun Microsystems, Inc. |
| | | */ |
| | | package org.opends.server.replication.protocol; |
| | | |
| | | |
| | | /** |
| | | * The version utility class for the replication protocol. |
| | | */ |
| | | public class ProtocolVersion |
| | | { |
| | | /** |
| | | * Get the version included in the Start Message mean the replication |
| | | * protocol version used by the server that created the message. |
| | | * |
| | | * @return The version used by the server that created the message. |
| | | */ |
| | | static short CURRENT_VERSION = 1; |
| | | |
| | | /** |
| | | * Specifies the current version of the replication protocol. |
| | | * |
| | | * @return The current version of the protocol. |
| | | */ |
| | | public static short currentVersion() |
| | | { |
| | | return CURRENT_VERSION; |
| | | } |
| | | |
| | | /** |
| | | * For test purpose. |
| | | * @param currentVersion The provided current version. |
| | | */ |
| | | public static void setCurrentVersion(short currentVersion) |
| | | { |
| | | CURRENT_VERSION = currentVersion; |
| | | } |
| | | |
| | | /** |
| | | * Specifies the oldest version of the protocol from the provided one |
| | | * and the current one. |
| | | * |
| | | * @param version The version to be compared to the current one. |
| | | * @return The minimal protocol version. |
| | | */ |
| | | public static short minWithCurrent(short version) |
| | | { |
| | | Short sVersion = Short.valueOf(version); |
| | | Short newVersion = (sVersion<CURRENT_VERSION?sVersion:CURRENT_VERSION); |
| | | return newVersion; |
| | | } |
| | | } |
| | | |
| | |
| | | * Message sent by a replication server to another replication server |
| | | * at Startup. |
| | | */ |
| | | public class ReplServerStartMessage extends ReplicationMessage implements |
| | | public class ReplServerStartMessage extends StartMessage implements |
| | | Serializable |
| | | { |
| | | private static final long serialVersionUID = -5871385537169856856L; |
| | |
| | | * @param baseDn base DN for which the ReplServerStartMessage is created. |
| | | * @param windowSize The window size. |
| | | * @param serverState our ServerState for this baseDn. |
| | | * @param protocolVersion The replication protocol version of the creator. |
| | | */ |
| | | public ReplServerStartMessage(short serverId, String serverURL, DN baseDn, |
| | | int windowSize, |
| | | ServerState serverState) |
| | | ServerState serverState, |
| | | short protocolVersion) |
| | | { |
| | | super(protocolVersion); |
| | | this.serverId = serverId; |
| | | this.serverURL = serverURL; |
| | | if (baseDn != null) |
| | |
| | | /* The ReplServerStartMessage is encoded in the form : |
| | | * <baseDn><ServerId><ServerUrl><windowsize><ServerState> |
| | | */ |
| | | super(MSG_TYPE_REPL_SERVER_START, in); |
| | | |
| | | try |
| | | { |
| | | /* first byte is the type */ |
| | | if (in[0] != MSG_TYPE_REPL_SERVER_START) |
| | | throw new DataFormatException( |
| | | "input is not a valid ReplServerStartMsg"); |
| | | int pos = 1; |
| | | /* first bytes are the header */ |
| | | int pos = headerLength; |
| | | |
| | | /* read the dn |
| | | * first calculate the length then construct the string |
| | |
| | | byte[] byteServerState = serverState.getBytes(); |
| | | byte[] byteWindowSize = String.valueOf(windowSize).getBytes("UTF-8"); |
| | | |
| | | int length = 1 + byteDn.length + 1 + byteServerId.length + 1 + |
| | | byteServerUrl.length + 1 + byteWindowSize.length + 1 + |
| | | byteServerState.length + 1; |
| | | int length = byteDn.length + 1 + byteServerId.length + 1 + |
| | | byteServerUrl.length + 1 + byteWindowSize.length + 1 + |
| | | byteServerState.length + 1; |
| | | |
| | | byte[] resultByteArray = new byte[length]; |
| | | |
| | | /* put the type of the operation */ |
| | | resultByteArray[0] = MSG_TYPE_REPL_SERVER_START; |
| | | int pos = 1; |
| | | /* encode the header in a byte[] large enough to also contain the mods */ |
| | | byte resultByteArray[] = encodeHeader(MSG_TYPE_REPL_SERVER_START, length); |
| | | int pos = headerLength; |
| | | |
| | | /* put the baseDN and a terminating 0 */ |
| | | pos = addByteArray(byteDn, resultByteArray, pos); |
| | |
| | | import java.util.zip.DataFormatException; |
| | | |
| | | import org.opends.server.replication.common.ServerState; |
| | | import org.opends.server.types.DirectoryException; |
| | | import org.opends.server.types.DN; |
| | | import org.opends.server.types.DirectoryException; |
| | | |
| | | /** |
| | | * This message is used by LDAP server when they first connect. |
| | | * to a replication server to let them know who they are and what is their state |
| | | * (their RUV) |
| | | */ |
| | | public class ServerStartMessage extends ReplicationMessage implements |
| | | public class ServerStartMessage extends StartMessage implements |
| | | Serializable |
| | | { |
| | | private static final long serialVersionUID = 8649393307038290287L; |
| | |
| | | * @param windowSize The window size used by this server. |
| | | * @param heartbeatInterval The requested heartbeat interval. |
| | | * @param serverState The state of this server. |
| | | * @param protocolVersion The replication protocol version of the creator. |
| | | */ |
| | | public ServerStartMessage(short serverId, DN baseDn, int maxReceiveDelay, |
| | | int maxReceiveQueue, int maxSendDelay, |
| | | int maxSendQueue, int windowSize, |
| | | long heartbeatInterval, |
| | | ServerState serverState) |
| | | ServerState serverState, |
| | | short protocolVersion) |
| | | { |
| | | super(protocolVersion); |
| | | |
| | | this.serverId = serverId; |
| | | this.baseDn = baseDn.toString(); |
| | | this.maxReceiveDelay = maxReceiveDelay; |
| | |
| | | */ |
| | | public ServerStartMessage(byte[] in) throws DataFormatException |
| | | { |
| | | super(MSG_TYPE_SERVER_START, in); |
| | | |
| | | /* The ServerStartMessage is encoded in the form : |
| | | * <operation type><baseDn><ServerId><ServerUrl><maxRecvDelay><maxRecvQueue> |
| | | * <header><baseDn><ServerId><ServerUrl><maxRecvDelay><maxRecvQueue> |
| | | * <maxSendDelay><maxSendQueue><window><heartbeatInterval><ServerState> |
| | | */ |
| | | try |
| | | { |
| | | /* first byte is the type */ |
| | | if (in[0] != MSG_TYPE_SERVER_START) |
| | | throw new DataFormatException("input is not a valid ServerStart msg"); |
| | | int pos = 1; |
| | | /* first bytes are the header */ |
| | | int pos = headerLength; |
| | | |
| | | /* |
| | | * read the dn |
| | |
| | | String.valueOf(heartbeatInterval).getBytes("UTF-8"); |
| | | byte[] byteServerState = serverState.getBytes(); |
| | | |
| | | int length = 1 + byteDn.length + 1 + byteServerId.length + 1 + |
| | | int length = byteDn.length + 1 + byteServerId.length + 1 + |
| | | byteServerUrl.length + 1 + |
| | | byteMaxRecvDelay.length + 1 + |
| | | byteMaxRecvQueue.length + 1 + |
| | |
| | | byteHeartbeatInterval.length + 1 + |
| | | byteServerState.length + 1; |
| | | |
| | | byte[] resultByteArray = new byte[length]; |
| | | |
| | | /* put the type of the operation */ |
| | | resultByteArray[0] = MSG_TYPE_SERVER_START; |
| | | int pos = 1; |
| | | /* encode the header in a byte[] large enough to also contain the mods */ |
| | | byte resultByteArray[] = encodeHeader(MSG_TYPE_SERVER_START, length); |
| | | int pos = headerLength; |
| | | |
| | | pos = addByteArray(byteDn, resultByteArray, pos); |
| | | |
| New file |
| | |
| | | /* |
| | | * CDDL HEADER START |
| | | * |
| | | * The contents of this file are subject to the terms of the |
| | | * Common Development and Distribution License, Version 1.0 only |
| | | * (the "License"). You may not use this file except in compliance |
| | | * with the License. |
| | | * |
| | | * You can obtain a copy of the license at |
| | | * trunk/opends/resource/legal-notices/OpenDS.LICENSE |
| | | * or https://OpenDS.dev.java.net/OpenDS.LICENSE. |
| | | * See the License for the specific language governing permissions |
| | | * and limitations under the License. |
| | | * |
| | | * When distributing Covered Code, include this CDDL HEADER in each |
| | | * file and include the License file at |
| | | * trunk/opends/resource/legal-notices/OpenDS.LICENSE. If applicable, |
| | | * add the following below this CDDL HEADER, with the fields enclosed |
| | | * by brackets "[]" replaced with your own identifying information: |
| | | * Portions Copyright [yyyy] [name of copyright owner] |
| | | * |
| | | * CDDL HEADER END |
| | | * |
| | | * |
| | | * Portions Copyright 2006-2007 Sun Microsystems, Inc. |
| | | */ |
| | | package org.opends.server.replication.protocol; |
| | | |
| | | import java.io.UnsupportedEncodingException; |
| | | import java.util.zip.DataFormatException; |
| | | |
| | | |
| | | /** |
| | | * This abstract message class is the superclass for start messages used |
| | | * by LDAP servers and Replication servers to initiate their communications. |
| | | * This class specifies a message header that contains the Replication |
| | | * Protocol version. |
| | | */ |
| | | public abstract class StartMessage extends ReplicationMessage |
| | | { |
| | | private short protocolVersion; |
| | | |
| | | /** |
| | | * The length of the header of this message. |
| | | */ |
| | | protected int headerLength; |
| | | |
| | | /** |
| | | * Create a new StartMessage. |
| | | * |
| | | * @param protocolVersion The Replication Protocol version of the server |
| | | * for which the StartMessage is created. |
| | | */ |
| | | public StartMessage(short protocolVersion) |
| | | { |
| | | this.protocolVersion = protocolVersion; |
| | | } |
| | | |
| | | /** |
| | | * Creates a new ServerStartMessage from its encoded form. |
| | | * |
| | | * @param type The type of the message to create. |
| | | * @param encodedMsg The byte array containing the encoded form of the |
| | | * StartMessage. |
| | | * @throws DataFormatException If the byte array does not contain a valid |
| | | * encoded form of the ServerStartMessage. |
| | | */ |
| | | public StartMessage(byte type, byte [] encodedMsg) throws DataFormatException |
| | | { |
| | | headerLength = decodeHeader(type, encodedMsg); |
| | | } |
| | | |
| | | /** |
| | | * Encode the header for the start message. |
| | | * |
| | | * @param type The type of the message to create. |
| | | * @param additionalLength additional length needed to encode the remaining |
| | | * part of the UpdateMessage. |
| | | * @return a byte array containing the common header and enough space to |
| | | * encode the reamining bytes of the UpdateMessage as was specified |
| | | * by the additionalLength. |
| | | * (byte array length = common header length + additionalLength) |
| | | * @throws UnsupportedEncodingException if UTF-8 is not supported. |
| | | */ |
| | | public byte[] encodeHeader(byte type, int additionalLength) |
| | | throws UnsupportedEncodingException |
| | | { |
| | | byte[] versionByte = Short.toString(protocolVersion).getBytes("UTF-8"); |
| | | |
| | | /* The message header is stored in the form : |
| | | * <message type><protocol version> |
| | | */ |
| | | int length = 1 + versionByte.length + 1 + |
| | | additionalLength; |
| | | |
| | | byte[] encodedMsg = new byte[length]; |
| | | |
| | | /* put the type of the operation */ |
| | | encodedMsg[0] = type; |
| | | int pos = 1; |
| | | |
| | | /* put the protocol version */ |
| | | headerLength = addByteArray(versionByte, encodedMsg, pos); |
| | | |
| | | return encodedMsg; |
| | | } |
| | | |
| | | /** |
| | | * Decode the Header part of this message, and check its type. |
| | | * |
| | | * @param type The type of this message. |
| | | * @param encodedMsg the encoded form of the message. |
| | | * @return the position at which the remaining part of the message starts. |
| | | * @throws DataFormatException if the encodedMsg does not contain a valid |
| | | * common header. |
| | | */ |
| | | public int decodeHeader(byte type, byte [] encodedMsg) |
| | | throws DataFormatException |
| | | { |
| | | /* first byte is the type */ |
| | | if (encodedMsg[0] != type) |
| | | throw new DataFormatException("byte[] is not a valid msg"); |
| | | |
| | | try |
| | | { |
| | | /* then read the version */ |
| | | int pos = 1; |
| | | int length = getNextLength(encodedMsg, pos); |
| | | protocolVersion = Short.valueOf( |
| | | new String(encodedMsg, pos, length, "UTF-8")); |
| | | pos += length + 1; |
| | | return pos; |
| | | } catch (UnsupportedEncodingException e) |
| | | { |
| | | throw new DataFormatException("UTF-8 is not supported by this jvm."); |
| | | } |
| | | |
| | | } |
| | | |
| | | /** |
| | | * Get the version included in the Start Message mean the replication |
| | | * protocol version used by the server that created the message. |
| | | * |
| | | * @return The version used by the server that created the message. |
| | | */ |
| | | public short getVersion() |
| | | { |
| | | return protocolVersion; |
| | | } |
| | | } |
| | |
| | | import org.opends.server.core.DirectoryServer; |
| | | import org.opends.server.replication.common.ChangeNumber; |
| | | import org.opends.server.replication.common.ServerState; |
| | | import org.opends.server.replication.protocol.ProtocolVersion; |
| | | import org.opends.server.replication.protocol.AckMessage; |
| | | import org.opends.server.replication.protocol.ReplServerStartMessage; |
| | | import org.opends.server.replication.protocol.HeartbeatThread; |
| | |
| | | private int saturationCount = 0; |
| | | private short replicationServerId; |
| | | |
| | | private short protocolVersion; |
| | | |
| | | /** |
| | | * The time in milliseconds between heartbeats from the replication |
| | | * server. Zero means heartbeats are off. |
| | |
| | | super("Server Handler"); |
| | | this.session = session; |
| | | this.maxQueueSize = queueSize; |
| | | this.protocolVersion = ProtocolVersion.currentVersion(); |
| | | } |
| | | |
| | | /** |
| | |
| | | { |
| | | if (baseDn != null) |
| | | { |
| | | // This is an outgoing connection. Publish our start message. |
| | | this.baseDn = baseDn; |
| | | replicationCache = replicationServer.getReplicationCache(baseDn); |
| | | ServerState localServerState = replicationCache.getDbServerState(); |
| | | ReplServerStartMessage msg = |
| | | new ReplServerStartMessage(replicationServerId, replicationServerURL, |
| | | baseDn, windowSize, localServerState); |
| | | baseDn, windowSize, localServerState, |
| | | protocolVersion); |
| | | |
| | | session.publish(msg); |
| | | } |
| | | |
| | | // Wait and process ServerStart or ReplServerStart |
| | | ReplicationMessage msg = session.receive(); |
| | | if (msg instanceof ServerStartMessage) |
| | | { |
| | | // The remote server is an LDAP Server |
| | | ServerStartMessage receivedMsg = (ServerStartMessage) msg; |
| | | |
| | | protocolVersion = ProtocolVersion.minWithCurrent( |
| | | receivedMsg.getVersion()); |
| | | serverId = receivedMsg.getServerId(); |
| | | serverURL = receivedMsg.getServerURL(); |
| | | this.baseDn = receivedMsg.getBaseDn(); |
| | |
| | | |
| | | serverIsLDAPserver = true; |
| | | |
| | | // This an incoming connection. Publish our start message |
| | | replicationCache = replicationServer.getReplicationCache(this.baseDn); |
| | | ServerState localServerState = replicationCache.getDbServerState(); |
| | | ReplServerStartMessage myStartMsg = |
| | | new ReplServerStartMessage(replicationServerId, replicationServerURL, |
| | | this.baseDn, windowSize, localServerState); |
| | | this.baseDn, windowSize, localServerState, |
| | | protocolVersion); |
| | | session.publish(myStartMsg); |
| | | sendWindowSize = receivedMsg.getWindowSize(); |
| | | } |
| | | else if (msg instanceof ReplServerStartMessage) |
| | | { |
| | | // The remote server is a replication server |
| | | ReplServerStartMessage receivedMsg = (ReplServerStartMessage) msg; |
| | | protocolVersion = ProtocolVersion.minWithCurrent( |
| | | receivedMsg.getVersion()); |
| | | serverId = receivedMsg.getServerId(); |
| | | serverURL = receivedMsg.getServerURL(); |
| | | int separator = serverURL.lastIndexOf(':'); |
| | |
| | | { |
| | | replicationCache = replicationServer.getReplicationCache(this.baseDn); |
| | | ServerState serverState = replicationCache.getDbServerState(); |
| | | |
| | | // Publish our start message |
| | | ReplServerStartMessage outMsg = |
| | | new ReplServerStartMessage(replicationServerId, |
| | | replicationServerURL, |
| | | this.baseDn, windowSize, serverState); |
| | | this.baseDn, windowSize, serverState, |
| | | protocolVersion); |
| | | session.publish(outMsg); |
| | | } |
| | | else |
| | | { |
| | | this.baseDn = baseDn; |
| | | } |
| | | this.serverState = receivedMsg.getServerState(); |
| | | sendWindowSize = receivedMsg.getWindowSize(); |
| | | } |
| | |
| | | } |
| | | catch(DirectoryException de) |
| | | { |
| | | // This log will go to the task log message |
| | | logError(ErrorLogCategory.TASK, |
| | | ErrorLogSeverity.SEVERE_ERROR, |
| | | "Initialize Task stopped by error", 1); |
| | | "Initialize Task stopped by error" + de.getErrorMessage(), 1); |
| | | |
| | | return TaskState.STOPPED_BY_ERROR; |
| | | } |
| | |
| | | */ |
| | | @Override public void initializeTask() throws DirectoryException |
| | | { |
| | | if (TaskState.isDone(getTaskState())) |
| | | { |
| | | return; |
| | | } |
| | | |
| | | // FIXME -- Do we need any special authorization here? |
| | | Entry taskEntry = getTaskEntry(); |
| | |
| | | |
| | | domain=ReplicationDomain.retrievesReplicationDomain(domainDN); |
| | | |
| | | |
| | | attrList = taskEntry.getAttribute(typeSourceScope); |
| | | String sourceString = TaskUtils.getSingleValueString(attrList); |
| | | source = domain.decodeSource(sourceString); |
| | |
| | | package org.opends.server.replication; |
| | | |
| | | import static org.opends.server.loggers.ErrorLogger.logError; |
| | | import static org.testng.Assert.*; |
| | | import static org.testng.Assert.assertEquals; |
| | | import static org.testng.Assert.assertTrue; |
| | | |
| | | import java.io.IOException; |
| | | import java.net.ServerSocket; |
| | | import java.net.SocketTimeoutException; |
| | | import java.util.ArrayList; |
| | | import java.util.List; |
| | | |
| | | import org.opends.server.TestCaseUtils; |
| | |
| | | import org.opends.server.protocols.internal.InternalClientConnection; |
| | | import org.opends.server.protocols.internal.InternalSearchOperation; |
| | | import org.opends.server.protocols.ldap.LDAPFilter; |
| | | import org.opends.server.replication.common.ServerState; |
| | | import org.opends.server.replication.plugin.ReplicationBroker; |
| | | import org.opends.server.replication.protocol.AddMsg; |
| | | import org.opends.server.replication.protocol.ProtocolVersion; |
| | | import org.opends.server.replication.protocol.ReplicationMessage; |
| | | import org.opends.server.types.DN; |
| | | import org.opends.server.types.Entry; |
| | |
| | | assertEquals(modOp.getResultCode(), ResultCode.SUCCESS); |
| | | } |
| | | } |
| | | |
| | | @Test(enabled=true) |
| | | public void protocolVersion() throws Exception |
| | | { |
| | | logError(ErrorLogCategory.SYNCHRONIZATION, |
| | | ErrorLogSeverity.NOTICE, |
| | | "Starting Replication ProtocolWindowTest : protocolVersion" , 1); |
| | | |
| | | final DN baseDn = DN.decode("ou=People,dc=example,dc=com"); |
| | | |
| | | // Test : Make a broker degrade its version when connecting to an old |
| | | // replication server. |
| | | ProtocolVersion.setCurrentVersion((short)2); |
| | | |
| | | ReplicationBroker broker = new ReplicationBroker( |
| | | new ServerState(), |
| | | baseDn, |
| | | (short) 13, 0, 0, 0, 0, 1000, 0); |
| | | |
| | | |
| | | // Check broker hard-coded version |
| | | short pversion = broker.getProtocolVersion(); |
| | | assertEquals(pversion, 2); |
| | | |
| | | // Connect the broker to the replication server |
| | | ProtocolVersion.setCurrentVersion((short)0); |
| | | ArrayList<String> servers = new ArrayList<String>(1); |
| | | servers.add("localhost:" + replServerPort); |
| | | broker.start(servers); |
| | | TestCaseUtils.sleep(100); // wait for connection established |
| | | |
| | | // Check broker negociated version |
| | | pversion = broker.getProtocolVersion(); |
| | | assertEquals(pversion, 0); |
| | | } |
| | | } |
| | |
| | | DN.decode(synchroPluginStringDN)), |
| | | "Unable to add the Multimaster replication plugin"); |
| | | |
| | | |
| | | // Add the replication server |
| | | DirectoryServer.getConfigHandler().addEntry(replServerEntry, null); |
| | | assertNotNull(DirectoryServer.getConfigEntry(replServerEntry.getDN()), |
| | | if (replServerEntry != null) |
| | | { |
| | | // Add the replication server |
| | | DirectoryServer.getConfigHandler().addEntry(replServerEntry, null); |
| | | assertNotNull(DirectoryServer.getConfigEntry(replServerEntry.getDN()), |
| | | "Unable to add the replication server"); |
| | | configEntryList.add(replServerEntry.getDN()); |
| | | configEntryList.add(replServerEntry.getDN()); |
| | | } |
| | | |
| | | // We also have a replicated suffix (replication domain) |
| | | DirectoryServer.getConfigHandler().addEntry(synchroServerEntry, null); |
| | | assertNotNull(DirectoryServer.getConfigEntry(synchroServerEntry.getDN()), |
| | | "Unable to add the synchronized server"); |
| | | configEntryList.add(synchroServerEntry.getDN()); |
| | | if (synchroServerEntry != null) |
| | | { |
| | | // We also have a replicated suffix (replication domain) |
| | | DirectoryServer.getConfigHandler().addEntry(synchroServerEntry, null); |
| | | assertNotNull(DirectoryServer.getConfigEntry(synchroServerEntry.getDN()), |
| | | "Unable to add the synchronized server"); |
| | | configEntryList.add(synchroServerEntry.getDN()); |
| | | } |
| | | } |
| | | |
| | | /** |
| | |
| | | { |
| | | state.update(new ChangeNumber((long)1, 1,(short)1)); |
| | | ServerStartMessage msg = new ServerStartMessage(serverId, baseDN, |
| | | window, window, window, window, window, window, state); |
| | | window, window, window, window, window, window, state, (short)1); |
| | | ServerStartMessage newMsg = new ServerStartMessage(msg.getBytes()); |
| | | assertEquals(msg.getServerId(), newMsg.getServerId()); |
| | | assertEquals(msg.getBaseDn(), newMsg.getBaseDn()); |
| | |
| | | assertEquals(msg.getHeartbeatInterval(), newMsg.getHeartbeatInterval()); |
| | | assertEquals(msg.getServerState().getMaxChangeNumber((short)1), |
| | | newMsg.getServerState().getMaxChangeNumber((short)1)); |
| | | assertEquals(msg.getVersion(), newMsg.getVersion()); |
| | | } |
| | | |
| | | @DataProvider(name="changelogStart") |
| | |
| | | { |
| | | state.update(new ChangeNumber((long)1, 1,(short)1)); |
| | | ReplServerStartMessage msg = new ReplServerStartMessage(serverId, |
| | | url, baseDN, window, state); |
| | | url, baseDN, window, state, (short)1); |
| | | ReplServerStartMessage newMsg = new ReplServerStartMessage(msg.getBytes()); |
| | | assertEquals(msg.getServerId(), newMsg.getServerId()); |
| | | assertEquals(msg.getBaseDn(), newMsg.getBaseDn()); |
| | |
| | | assertEquals(msg.getWindowSize(), newMsg.getWindowSize()); |
| | | assertEquals(msg.getServerState().getMaxChangeNumber((short)1), |
| | | newMsg.getServerState().getMaxChangeNumber((short)1)); |
| | | assertEquals(msg.getVersion(), newMsg.getVersion()); |
| | | } |
| | | |
| | | /** |