opends/src/server/org/opends/server/replication/common/ChangeNumber.java
@@ -22,7 +22,7 @@ * CDDL HEADER END * * * Copyright 2006-2008 Sun Microsystems, Inc. * Copyright 2006-2009 Sun Microsystems, Inc. */ package org.opends.server.replication.common; @@ -35,9 +35,13 @@ java.lang.Comparable<ChangeNumber> { private static final long serialVersionUID = -8802722277749190740L; private long timeStamp; private int seqnum; private short serverId; private final long timeStamp; private final int seqnum; private final short serverId; // A String representation of the ChangeNumber suitable for network // transmission. private String formatedString = null;; /** * Create a new ChangeNumber from a String. @@ -54,6 +58,8 @@ temp = str.substring(20, 28); seqnum = Integer.parseInt(temp, 16); formatedString = str; } /** @@ -141,11 +147,27 @@ */ public String toString() { return format(); } /** * Convert the ChangeNumber to a String that is suitable for network * transmission. * * @return the string */ public String format() { if (formatedString != null) return formatedString; return String.format("%016x%04x%08x", timeStamp, serverId, seqnum); } /** * Convert the ChangeNumber to a printable String that is . * Convert the ChangeNumber to a printable String with a user friendly * format. * * @return the string */ public String toStringUI() opends/src/server/org/opends/server/replication/plugin/LDAPReplicationDomain.java
@@ -49,6 +49,7 @@ import java.io.File; import java.io.InputStream; import java.io.OutputStream; import java.io.UnsupportedEncodingException; import java.util.ArrayList; import java.util.HashSet; import java.util.LinkedHashMap; @@ -869,7 +870,18 @@ } else { pendingChanges.commit(curChangeNumber, msg); // If assured replication is configured, this will prepare blocking // mechanism. If assured replication is disabled, this returns // immediately prepareWaitForAckIfAssuredEnabled(msg); try { msg.encode(); } catch (UnsupportedEncodingException e) { // will be caught at publish time. } pendingChanges.commitAndPushCommittedChanges(curChangeNumber, msg); } } catch (NoSuchElementException e) @@ -892,18 +904,12 @@ if (curChangeNumber != null) { pendingChanges.remove(curChangeNumber); pendingChanges.pushCommittedChanges(); } } if (!op.isSynchronizationOperation()) { // If assured replication is configured, this will prepare blocking // mechanism. If assured replication is disabled, this returns // immediately prepareWaitForAckIfAssuredEnabled(msg); pendingChanges.pushCommittedChanges(); // If assured replication is enabled, this will wait for the matching // ack or time out. If assured replication is disabled, this returns // immediately opends/src/server/org/opends/server/replication/plugin/PendingChanges.java
@@ -22,7 +22,7 @@ * CDDL HEADER END * * * Copyright 2008 Sun Microsystems, Inc. * Copyright 2009 Sun Microsystems, Inc. */ package org.opends.server.replication.plugin; @@ -113,6 +113,18 @@ public synchronized void commit(ChangeNumber changeNumber, LDAPUpdateMsg msg) { _commit(changeNumber, msg); } /** * Mark an update message as committed. * * @param changeNumber The ChangeNumber of the update message that must be * set as committed. * @param msg The message associated to the update. */ public void _commit(ChangeNumber changeNumber, LDAPUpdateMsg msg) { PendingChange curChange = pendingChanges.get(changeNumber); if (curChange == null) { @@ -149,6 +161,18 @@ */ public synchronized ChangeNumber putLocalOperation(PluginOperation operation) { return _putLocalOperation(operation); } /** * Add a new UpdateMsg to the pending list from the provided local * operation. * * @param operation The local operation for which an UpdateMsg must * be added in the pending list. * @return The ChangeNumber now associated to the operation. */ public ChangeNumber _putLocalOperation(PluginOperation operation) { ChangeNumber changeNumber; changeNumber = changeNumberGenerator.newChangeNumber(); @@ -165,6 +189,15 @@ */ public synchronized int pushCommittedChanges() { return _pushCommittedChanges(); } /** * Push all committed local changes to the replicationServer service. * * @return The number of pushed updates. */ public int _pushCommittedChanges() { int numSentUpdates = 0; if (pendingChanges.isEmpty()) return numSentUpdates; @@ -195,4 +228,24 @@ } return numSentUpdates; } /** * Mark an update message as committed, then * push all committed local changes to the replicationServer service * in a single atomic operation. * * * @param changeNumber The ChangeNumber of the update message that must be * set as committed. * @param msg The message associated to the update. * * @return The number of pushed updates. */ public synchronized int commitAndPushCommittedChanges( ChangeNumber changeNumber, LDAPUpdateMsg msg) { _commit(changeNumber, msg); return _pushCommittedChanges(); } } opends/src/server/org/opends/server/replication/protocol/AckMsg.java
@@ -22,7 +22,7 @@ * CDDL HEADER END * * * Copyright 2006-2008 Sun Microsystems, Inc. * Copyright 2006-2009 Sun Microsystems, Inc. */ package org.opends.server.replication.protocol; @@ -241,13 +241,13 @@ * error><failed server ids> */ ByteArrayOutputStream oStream = new ByteArrayOutputStream(); ByteArrayOutputStream oStream = new ByteArrayOutputStream(200); /* Put the type of the operation */ oStream.write(MSG_TYPE_ACK); /* Put the ChangeNumber */ byte[] changeNumberByte = changeNumber.toString().getBytes("UTF-8"); byte[] changeNumberByte = changeNumber.format().getBytes("UTF-8"); oStream.write(changeNumberByte); oStream.write(0); opends/src/server/org/opends/server/replication/protocol/AddMsg.java
@@ -22,7 +22,7 @@ * CDDL HEADER END * * * Copyright 2006-2008 Sun Microsystems, Inc. * Copyright 2006-2009 Sun Microsystems, Inc. */ package org.opends.server.replication.protocol; @@ -258,34 +258,41 @@ @Override public byte[] getBytes() throws UnsupportedEncodingException { int length = encodedAttributes.length; byte[] byteParentId = null; if (parentUniqueId != null) if (bytes == null) { byteParentId = parentUniqueId.getBytes("UTF-8"); length += byteParentId.length + 1; int length = encodedAttributes.length; byte[] byteParentId = null; if (parentUniqueId != null) { byteParentId = parentUniqueId.getBytes("UTF-8"); length += byteParentId.length + 1; } else { length += 1; } /* encode the header in a byte[] large enough to also contain the mods */ byte [] resultByteArray = encodeHeader(MSG_TYPE_ADD, length); int pos = resultByteArray.length - length; if (byteParentId != null) pos = addByteArray(byteParentId, resultByteArray, pos); else resultByteArray[pos++] = 0; /* put the attributes */ for (int i=0; i<encodedAttributes.length; i++,pos++) { resultByteArray[pos] = encodedAttributes[i]; } return resultByteArray; } else { length += 1; return bytes; } /* encode the header in a byte[] large enough to also contain the mods */ byte [] resultByteArray = encodeHeader(MSG_TYPE_ADD, length); int pos = resultByteArray.length - length; if (byteParentId != null) pos = addByteArray(byteParentId, resultByteArray, pos); else resultByteArray[pos++] = 0; /* put the attributes */ for (int i=0; i<encodedAttributes.length; i++,pos++) { resultByteArray[pos] = encodedAttributes[i]; } return resultByteArray; } /** opends/src/server/org/opends/server/replication/protocol/DeleteMsg.java
@@ -22,7 +22,7 @@ * CDDL HEADER END * * * Copyright 2006-2008 Sun Microsystems, Inc. * Copyright 2006-2009 Sun Microsystems, Inc. */ package org.opends.server.replication.protocol; @@ -106,7 +106,14 @@ @Override public byte[] getBytes() throws UnsupportedEncodingException { return encodeHeader(MSG_TYPE_DELETE, 0); if (bytes == null) { return encodeHeader(MSG_TYPE_DELETE, 0); } else { return bytes; } } /** opends/src/server/org/opends/server/replication/protocol/LDAPUpdateMsg.java
@@ -22,7 +22,7 @@ * CDDL HEADER END * * * Copyright 2006-2008 Sun Microsystems, Inc. * Copyright 2006-2009 Sun Microsystems, Inc. */ package org.opends.server.replication.protocol; @@ -58,6 +58,11 @@ protected String uniqueId; /** * Encoded form of the LDAPUpdateMsg. */ protected byte[] bytes = null; /** * Creates a new UpdateMsg. */ public LDAPUpdateMsg() @@ -158,6 +163,20 @@ return uniqueId; } /** * Do all the work necessary for the encoding. * * This is useful in case when one wants to perform this outside * of a synchronized portion of code. * * This method is not synchronized and therefore not MT safe. * * @throws UnsupportedEncodingException when encoding fails. */ public void encode() throws UnsupportedEncodingException { bytes = getBytes(); } /** * Create and Operation from the message. opends/src/server/org/opends/server/replication/protocol/ModifyDNMsg.java
@@ -22,7 +22,7 @@ * CDDL HEADER END * * * Copyright 2006-2008 Sun Microsystems, Inc. * Copyright 2006-2009 Sun Microsystems, Inc. */ package org.opends.server.replication.protocol; @@ -524,56 +524,63 @@ */ public byte[] getBytes_V1() throws UnsupportedEncodingException { byte[] byteNewRdn = newRDN.getBytes("UTF-8"); byte[] byteNewSuperior = null; byte[] byteNewSuperiorId = null; // calculate the length necessary to encode the parameters int length = byteNewRdn.length + 1 + 1; if (newSuperior != null) if (bytes == null) { byteNewSuperior = newSuperior.getBytes("UTF-8"); length += byteNewSuperior.length + 1; byte[] byteNewRdn = newRDN.getBytes("UTF-8"); byte[] byteNewSuperior = null; byte[] byteNewSuperiorId = null; // calculate the length necessary to encode the parameters int length = byteNewRdn.length + 1 + 1; if (newSuperior != null) { byteNewSuperior = newSuperior.getBytes("UTF-8"); length += byteNewSuperior.length + 1; } else length += 1; if (newSuperiorId != null) { byteNewSuperiorId = newSuperiorId.getBytes("UTF-8"); length += byteNewSuperiorId.length + 1; } else length += 1; byte[] resultByteArray = encodeHeader_V1(MSG_TYPE_MODIFYDN_V1, length); int pos = resultByteArray.length - length; /* put the new RDN and a terminating 0 */ pos = addByteArray(byteNewRdn, resultByteArray, pos); /* put the newsuperior and a terminating 0 */ if (newSuperior != null) { pos = addByteArray(byteNewSuperior, resultByteArray, pos); } else resultByteArray[pos++] = 0; /* put the newsuperiorId and a terminating 0 */ if (newSuperiorId != null) { pos = addByteArray(byteNewSuperiorId, resultByteArray, pos); } else resultByteArray[pos++] = 0; /* put the deleteoldrdn flag */ if (deleteOldRdn) resultByteArray[pos++] = 1; else resultByteArray[pos++] = 0; return resultByteArray; } else length += 1; if (newSuperiorId != null) { byteNewSuperiorId = newSuperiorId.getBytes("UTF-8"); length += byteNewSuperiorId.length + 1; return bytes; } else length += 1; byte[] resultByteArray = encodeHeader_V1(MSG_TYPE_MODIFYDN_V1, length); int pos = resultByteArray.length - length; /* put the new RDN and a terminating 0 */ pos = addByteArray(byteNewRdn, resultByteArray, pos); /* put the newsuperior and a terminating 0 */ if (newSuperior != null) { pos = addByteArray(byteNewSuperior, resultByteArray, pos); } else resultByteArray[pos++] = 0; /* put the newsuperiorId and a terminating 0 */ if (newSuperiorId != null) { pos = addByteArray(byteNewSuperiorId, resultByteArray, pos); } else resultByteArray[pos++] = 0; /* put the deleteoldrdn flag */ if (deleteOldRdn) resultByteArray[pos++] = 1; else resultByteArray[pos++] = 0; return resultByteArray; } } opends/src/server/org/opends/server/replication/protocol/ModifyMsg.java
@@ -22,7 +22,7 @@ * CDDL HEADER END * * * Copyright 2006-2008 Sun Microsystems, Inc. * Copyright 2006-2009 Sun Microsystems, Inc. */ package org.opends.server.replication.protocol; @@ -92,6 +92,8 @@ public ModifyMsg(byte[] in) throws DataFormatException, UnsupportedEncodingException { bytes = in; // Decode header byte[] allowedPduTypes = new byte[2]; allowedPduTypes[0] = MSG_TYPE_MODIFY; @@ -117,19 +119,44 @@ } /** * Creates a new Modify message from a V1 byte[]. * * @param in The byte[] from which the operation must be read. * @throws DataFormatException If the input byte[] is not a valid ModifyMsg * @throws UnsupportedEncodingException If UTF8 is not supported by the JVM. * * @return The created ModifyMsg. */ public static ModifyMsg createV1(byte[] in) throws DataFormatException, UnsupportedEncodingException { ModifyMsg msg = new ModifyMsg(in); msg.bytes = null; return msg; } /** * {@inheritDoc} */ @Override public byte[] getBytes() throws UnsupportedEncodingException { /* encode the header in a byte[] large enough to also contain the mods */ byte[] encodedMsg = encodeHeader(MSG_TYPE_MODIFY, encodedMods.length + 1); if (bytes == null) { /* encode the header in a byte[] large enough to also contain the mods */ byte[] mybytes = encodeHeader(MSG_TYPE_MODIFY, encodedMods.length + 1); /* add the mods */ int pos = encodedMsg.length - (encodedMods.length + 1); addByteArray(encodedMods, encodedMsg, pos); /* add the mods */ int pos = mybytes.length - (encodedMods.length + 1); addByteArray(encodedMods, mybytes, pos); return encodedMsg; return mybytes; } else { return bytes; } } /** opends/src/server/org/opends/server/replication/protocol/ReplicationMsg.java
@@ -22,7 +22,7 @@ * CDDL HEADER END * * * Copyright 2006-2008 Sun Microsystems, Inc. * Copyright 2006-2009 Sun Microsystems, Inc. */ package org.opends.server.replication.protocol; @@ -143,8 +143,10 @@ throw new NotSupportedOldVersionPDUException("Replication Server Info", ProtocolVersion.REPLICATION_PROTOCOL_V1, buffer[0]); case MSG_TYPE_MODIFY: msg = new ModifyMsg(buffer); break; case MSG_TYPE_MODIFY_V1: msg = new ModifyMsg(buffer); msg = ModifyMsg.createV1(buffer); break; case MSG_TYPE_ADD: case MSG_TYPE_ADD_V1: opends/src/server/org/opends/server/replication/server/DbHandler.java
@@ -22,7 +22,7 @@ * CDDL HEADER END * * * Copyright 2006-2008 Sun Microsystems, Inc. * Copyright 2006-2009 Sun Microsystems, Inc. */ package org.opends.server.replication.server; import org.opends.messages.MessageBuilder; @@ -86,13 +86,15 @@ // the threads calling add() method will be blocked if the size of // msgQueue becomes larger than the queueHimark and will resume // only when the size of the msgQueue goes below queueLowmark. int queueHimark = 5000; int queueLowmark = 4000; int queueMaxSize = 5000; int queueLowmark = 1000; int queueHimark = 4000; // The queue himark and lowmark in bytes, this is set to 100 times the // himark and lowmark in number of updates. int queueHimarkBytes = 100 * queueHimark; int queueMaxBytes = 100 * queueMaxSize; int queueLowmarkBytes = 100 * queueLowmark; int queueHimarkBytes = 100 * queueHimark; // The number of bytes currently in the queue int queueByteSize = 0; @@ -140,10 +142,12 @@ serverId = id; this.baseDn = baseDn; trimage = replicationServer.getTrimage(); queueHimark = queueSize; queueLowmark = queueSize * 4 / 5; queueHimarkBytes = 100 * queueHimark; queueLowmarkBytes = 100 * queueLowmark; queueMaxSize = queueSize; queueLowmark = queueSize * 1 / 5; queueHimark = queueSize * 4 / 5; queueMaxBytes = 200 * queueMaxSize; queueLowmarkBytes = 200 * queueLowmark; queueHimarkBytes = 200 * queueLowmark; db = new ReplicationDB(id, baseDn, replicationServer, dbenv); firstChange = db.readFirstChange(); lastChange = db.readLastChange(); @@ -171,11 +175,14 @@ synchronized (msgQueue) { int size = msgQueue.size(); while ((size > queueHimark) || (queueByteSize > queueHimarkBytes)) if ((size > queueHimark) || (queueByteSize > queueHimarkBytes)) msgQueue.notify(); while ((size > queueMaxSize) || (queueByteSize > queueMaxBytes)) { try { msgQueue.wait(500); msgQueue.wait(5000); } catch (InterruptedException e) { // simply loop to try again. @@ -379,17 +386,22 @@ { while (shutdown == false) { try { try { flush(); trim(); synchronized (this) synchronized (msgQueue) { try if ((msgQueue.size() < queueLowmark) && (queueByteSize < queueLowmarkBytes)) { this.wait(1000); } catch (InterruptedException e) { } try { msgQueue.wait(10000); } catch (InterruptedException e) { } } } } catch (Exception end) { @@ -434,56 +446,59 @@ int tries = 0; while ((tries++ < DEADLOCK_RETRIES) && (!done)) { /* the trim is done by group in order to save some CPU and IO bandwidth * start the transaction then do a bunch of remove then commit */ ReplServerDBCursor cursor; cursor = db.openDeleteCursor(); try synchronized (flushLock) { while ((size < 5000 ) && (!finished)) /* the trim is done by group in order to save some CPU and IO bandwidth * start the transaction then do a bunch of remove then commit */ ReplServerDBCursor cursor; cursor = db.openDeleteCursor(); try { ChangeNumber changeNumber = cursor.nextChangeNumber(); if (changeNumber != null) while ((size < 5000 ) && (!finished)) { if ((!changeNumber.equals(lastChange)) && (changeNumber.older(trimDate))) ChangeNumber changeNumber = cursor.nextChangeNumber(); if (changeNumber != null) { size++; cursor.delete(); if ((!changeNumber.equals(lastChange)) && (changeNumber.older(trimDate))) { size++; cursor.delete(); } else { firstChange = changeNumber; finished = true; } } else { firstChange = changeNumber; finished = true; } } else finished = true; cursor.close(); done = true; } cursor.close(); done = true; } catch (DeadlockException e) { cursor.abort(); if (tries == DEADLOCK_RETRIES) catch (DeadlockException e) { // could not handle the Deadlock after DEADLOCK_RETRIES tries. // shutdown the ReplicationServer. cursor.abort(); if (tries == DEADLOCK_RETRIES) { // could not handle the Deadlock after DEADLOCK_RETRIES tries. // shutdown the ReplicationServer. shutdown = true; throw (e); } } catch (DatabaseException e) { // mark shutdown for this db so that we don't try again to // stop it from cursor.close() or methods called by cursor.close() shutdown = true; cursor.abort(); throw (e); } } catch (DatabaseException e) { // mark shutdown for this db so that we don't try again to // stop it from cursor.close() or methods called by cursor.close() shutdown = true; cursor.abort(); throw (e); } } } @@ -493,7 +508,7 @@ private void flush() { int size; int chunksize = (500 < queueHimark ? 500 : queueHimark); int chunksize = (500 < queueMaxSize ? 500 : queueMaxSize); do { @@ -630,9 +645,10 @@ { msgQueue.clear(); queueByteSize = 0; db.clear(); firstChange = db.readFirstChange(); lastChange = db.readLastChange(); } db.clear(); firstChange = db.readFirstChange(); lastChange = db.readLastChange(); } } opends/src/server/org/opends/server/replication/server/ReplicationServerDomain.java
@@ -626,12 +626,12 @@ logError(errorMsg); } else if (sourceGroupId != groupId) { // Assured feature does not cross different group ids // Assured feature does not cross different group IDS } else { if ((generationId > 0) && (generationId == sourceHandler.getGenerationId())) // Ignore assured updates from wrong generationid servers // Ignore assured updates from wrong generationId servers { if (sourceHandler.isLDAPserver()) { @@ -662,7 +662,7 @@ } } } else { // A RS sent us the safe data message, for sure no futher acks to wait { // A RS sent us the safe data message, for sure no further ack to wait if (safeDataLevel == (byte) 1) { /** opends/tests/unit-tests-testng/src/server/org/opends/server/replication/ReplicationTestCase.java
@@ -22,7 +22,7 @@ * CDDL HEADER END * * * Copyright 2006-2008 Sun Microsystems, Inc. * Copyright 2006-2009 Sun Microsystems, Inc. */ package org.opends.server.replication; @@ -978,7 +978,7 @@ Message expectedMessage) { TaskState taskState = null; int cpt=10; int cpt=40; try { SearchFilter filter = opends/tests/unit-tests-testng/src/server/org/opends/server/replication/server/ReplicationServerTest.java
@@ -22,7 +22,7 @@ * CDDL HEADER END * * * Copyright 2006-2008 Sun Microsystems, Inc. * Copyright 2006-2009 Sun Microsystems, Inc. */ package org.opends.server.replication.server;