These changes try to reduce replication overhead:
- improve the encoding/decoding of replication messages
- move some of the processing outside of the PendingChanges lock
- improve the management of the Replication Server flush thread so that the
replication log is not the replication bottleneck
| | |
| | | * CDDL HEADER END |
| | | * |
| | | * |
| | | * Copyright 2006-2008 Sun Microsystems, Inc. |
| | | * Copyright 2006-2009 Sun Microsystems, Inc. |
| | | */ |
| | | package org.opends.server.replication.common; |
| | | |
| | |
| | | java.lang.Comparable<ChangeNumber> |
| | | { |
| | | private static final long serialVersionUID = -8802722277749190740L; |
| | | private long timeStamp; |
| | | private int seqnum; |
| | | private short serverId; |
| | | private final long timeStamp; |
| | | private final int seqnum; |
| | | private final short serverId; |
| | | |
| | | // A String representation of the ChangeNumber suitable for network |
| | | // transmission. |
| | | private String formatedString = null;; |
| | | |
| | | /** |
| | | * Create a new ChangeNumber from a String. |
| | |
| | | |
| | | temp = str.substring(20, 28); |
| | | seqnum = Integer.parseInt(temp, 16); |
| | | |
| | | formatedString = str; |
| | | } |
| | | |
| | | /** |
| | |
| | | */ |
| | | public String toString() |
| | | { |
| | | return format(); |
| | | } |
| | | |
| | | /** |
| | | * Convert the ChangeNumber to a String that is suitable for network |
| | | * transmission. |
| | | * |
| | | * @return the string |
| | | */ |
| | | public String format() |
| | | { |
| | | if (formatedString != null) |
| | | return formatedString; |
| | | |
| | | return String.format("%016x%04x%08x", timeStamp, serverId, seqnum); |
| | | } |
| | | |
| | | /** |
| | | * Convert the ChangeNumber to a printable String that is . |
| | | * Convert the ChangeNumber to a printable String with a user friendly |
| | | * format. |
| | | * |
| | | * @return the string |
| | | */ |
| | | public String toStringUI() |
| | |
| | | import java.io.File; |
| | | import java.io.InputStream; |
| | | import java.io.OutputStream; |
| | | import java.io.UnsupportedEncodingException; |
| | | import java.util.ArrayList; |
| | | import java.util.HashSet; |
| | | import java.util.LinkedHashMap; |
| | |
| | | } |
| | | else |
| | | { |
| | | pendingChanges.commit(curChangeNumber, msg); |
| | | // If assured replication is configured, this will prepare blocking |
| | | // mechanism. If assured replication is disabled, this returns |
| | | // immediately |
| | | prepareWaitForAckIfAssuredEnabled(msg); |
| | | try |
| | | { |
| | | msg.encode(); |
| | | } catch (UnsupportedEncodingException e) |
| | | { |
| | | // will be caught at publish time. |
| | | } |
| | | pendingChanges.commitAndPushCommittedChanges(curChangeNumber, msg); |
| | | } |
| | | } |
| | | catch (NoSuchElementException e) |
| | |
| | | if (curChangeNumber != null) |
| | | { |
| | | pendingChanges.remove(curChangeNumber); |
| | | pendingChanges.pushCommittedChanges(); |
| | | } |
| | | } |
| | | |
| | | if (!op.isSynchronizationOperation()) |
| | | { |
| | | // If assured replication is configured, this will prepare blocking |
| | | // mechanism. If assured replication is disabled, this returns |
| | | // immediately |
| | | prepareWaitForAckIfAssuredEnabled(msg); |
| | | |
| | | pendingChanges.pushCommittedChanges(); |
| | | |
| | | // If assured replication is enabled, this will wait for the matching |
| | | // ack or time out. If assured replication is disabled, this returns |
| | | // immediately |
| | |
| | | * CDDL HEADER END |
| | | * |
| | | * |
| | | * Copyright 2008 Sun Microsystems, Inc. |
| | | * Copyright 2009 Sun Microsystems, Inc. |
| | | */ |
| | | package org.opends.server.replication.plugin; |
| | | |
| | |
| | | public synchronized void commit(ChangeNumber changeNumber, |
| | | LDAPUpdateMsg msg) |
| | | { |
| | | _commit(changeNumber, msg); |
| | | } |
| | | /** |
| | | * Mark an update message as committed. |
| | | * |
| | | * @param changeNumber The ChangeNumber of the update message that must be |
| | | * set as committed. |
| | | * @param msg The message associated to the update. |
| | | */ |
| | | public void _commit(ChangeNumber changeNumber, |
| | | LDAPUpdateMsg msg) |
| | | { |
| | | PendingChange curChange = pendingChanges.get(changeNumber); |
| | | if (curChange == null) |
| | | { |
| | |
| | | */ |
| | | public synchronized ChangeNumber putLocalOperation(PluginOperation operation) |
| | | { |
| | | return _putLocalOperation(operation); |
| | | } |
| | | /** |
| | | * Add a new UpdateMsg to the pending list from the provided local |
| | | * operation. |
| | | * |
| | | * @param operation The local operation for which an UpdateMsg must |
| | | * be added in the pending list. |
| | | * @return The ChangeNumber now associated to the operation. |
| | | */ |
| | | public ChangeNumber _putLocalOperation(PluginOperation operation) |
| | | { |
| | | ChangeNumber changeNumber; |
| | | |
| | | changeNumber = changeNumberGenerator.newChangeNumber(); |
| | |
| | | */ |
| | | public synchronized int pushCommittedChanges() |
| | | { |
| | | return _pushCommittedChanges(); |
| | | } |
| | | /** |
| | | * Push all committed local changes to the replicationServer service. |
| | | * |
| | | * @return The number of pushed updates. |
| | | */ |
| | | public int _pushCommittedChanges() |
| | | { |
| | | int numSentUpdates = 0; |
| | | if (pendingChanges.isEmpty()) |
| | | return numSentUpdates; |
| | |
| | | } |
| | | return numSentUpdates; |
| | | } |
| | | |
| | | /** |
| | | * Mark an update message as committed, then |
| | | * push all committed local changes to the replicationServer service |
| | | * in a single atomic operation. |
| | | * |
| | | * |
| | | * @param changeNumber The ChangeNumber of the update message that must be |
| | | * set as committed. |
| | | * @param msg The message associated to the update. |
| | | * |
| | | * @return The number of pushed updates. |
| | | */ |
| | | public synchronized int commitAndPushCommittedChanges( |
| | | ChangeNumber changeNumber, |
| | | LDAPUpdateMsg msg) |
| | | { |
| | | _commit(changeNumber, msg); |
| | | return _pushCommittedChanges(); |
| | | } |
| | | } |
| | |
| | | * CDDL HEADER END |
| | | * |
| | | * |
| | | * Copyright 2006-2008 Sun Microsystems, Inc. |
| | | * Copyright 2006-2009 Sun Microsystems, Inc. |
| | | */ |
| | | package org.opends.server.replication.protocol; |
| | | |
| | |
| | | * error><failed server ids> |
| | | */ |
| | | |
| | | ByteArrayOutputStream oStream = new ByteArrayOutputStream(); |
| | | ByteArrayOutputStream oStream = new ByteArrayOutputStream(200); |
| | | |
| | | /* Put the type of the operation */ |
| | | oStream.write(MSG_TYPE_ACK); |
| | | |
| | | /* Put the ChangeNumber */ |
| | | byte[] changeNumberByte = changeNumber.toString().getBytes("UTF-8"); |
| | | byte[] changeNumberByte = changeNumber.format().getBytes("UTF-8"); |
| | | oStream.write(changeNumberByte); |
| | | oStream.write(0); |
| | | |
| | |
| | | * CDDL HEADER END |
| | | * |
| | | * |
| | | * Copyright 2006-2008 Sun Microsystems, Inc. |
| | | * Copyright 2006-2009 Sun Microsystems, Inc. |
| | | */ |
| | | package org.opends.server.replication.protocol; |
| | | |
| | |
| | | @Override |
| | | public byte[] getBytes() throws UnsupportedEncodingException |
| | | { |
| | | if (bytes == null) |
| | | { |
| | | int length = encodedAttributes.length; |
| | | byte[] byteParentId = null; |
| | | if (parentUniqueId != null) |
| | |
| | | } |
| | | return resultByteArray; |
| | | } |
| | | else |
| | | { |
| | | return bytes; |
| | | } |
| | | } |
| | | |
| | | /** |
| | | * {@inheritDoc} |
| | |
| | | * CDDL HEADER END |
| | | * |
| | | * |
| | | * Copyright 2006-2008 Sun Microsystems, Inc. |
| | | * Copyright 2006-2009 Sun Microsystems, Inc. |
| | | */ |
| | | package org.opends.server.replication.protocol; |
| | | |
| | |
| | | @Override |
| | | public byte[] getBytes() throws UnsupportedEncodingException |
| | | { |
| | | if (bytes == null) |
| | | { |
| | | return encodeHeader(MSG_TYPE_DELETE, 0); |
| | | } |
| | | else |
| | | { |
| | | return bytes; |
| | | } |
| | | } |
| | | |
| | | /** |
| | | * {@inheritDoc} |
| | |
| | | * CDDL HEADER END |
| | | * |
| | | * |
| | | * Copyright 2006-2008 Sun Microsystems, Inc. |
| | | * Copyright 2006-2009 Sun Microsystems, Inc. |
| | | */ |
| | | package org.opends.server.replication.protocol; |
| | | |
| | |
| | | protected String uniqueId; |
| | | |
| | | /** |
| | | * Encoded form of the LDAPUpdateMsg. |
| | | */ |
| | | protected byte[] bytes = null; |
| | | |
| | | /** |
| | | * Creates a new UpdateMsg. |
| | | */ |
| | | public LDAPUpdateMsg() |
| | |
| | | return uniqueId; |
| | | } |
| | | |
| | | /** |
| | | * Do all the work necessary for the encoding. |
| | | * |
| | | * This is useful in case when one wants to perform this outside |
| | | * of a synchronized portion of code. |
| | | * |
| | | * This method is not synchronized and therefore not MT safe. |
| | | * |
| | | * @throws UnsupportedEncodingException when encoding fails. |
| | | */ |
| | | public void encode() throws UnsupportedEncodingException |
| | | { |
| | | bytes = getBytes(); |
| | | } |
| | | |
| | | /** |
| | | * Create and Operation from the message. |
| | |
| | | * CDDL HEADER END |
| | | * |
| | | * |
| | | * Copyright 2006-2008 Sun Microsystems, Inc. |
| | | * Copyright 2006-2009 Sun Microsystems, Inc. |
| | | */ |
| | | package org.opends.server.replication.protocol; |
| | | |
| | |
| | | */ |
| | | public byte[] getBytes_V1() throws UnsupportedEncodingException |
| | | { |
| | | if (bytes == null) |
| | | { |
| | | byte[] byteNewRdn = newRDN.getBytes("UTF-8"); |
| | | byte[] byteNewSuperior = null; |
| | | byte[] byteNewSuperiorId = null; |
| | |
| | | |
| | | return resultByteArray; |
| | | } |
| | | else |
| | | { |
| | | return bytes; |
| | | } |
| | | } |
| | | } |
| | |
| | | * CDDL HEADER END |
| | | * |
| | | * |
| | | * Copyright 2006-2008 Sun Microsystems, Inc. |
| | | * Copyright 2006-2009 Sun Microsystems, Inc. |
| | | */ |
| | | package org.opends.server.replication.protocol; |
| | | |
| | |
| | | public ModifyMsg(byte[] in) throws DataFormatException, |
| | | UnsupportedEncodingException |
| | | { |
| | | bytes = in; |
| | | |
| | | // Decode header |
| | | byte[] allowedPduTypes = new byte[2]; |
| | | allowedPduTypes[0] = MSG_TYPE_MODIFY; |
| | |
| | | } |
| | | |
| | | /** |
| | | * Creates a new Modify message from a V1 byte[]. |
| | | * |
| | | * @param in The byte[] from which the operation must be read. |
| | | * @throws DataFormatException If the input byte[] is not a valid ModifyMsg |
| | | * @throws UnsupportedEncodingException If UTF8 is not supported by the JVM. |
| | | * |
| | | * @return The created ModifyMsg. |
| | | */ |
| | | public static ModifyMsg createV1(byte[] in) throws DataFormatException, |
| | | UnsupportedEncodingException |
| | | { |
| | | ModifyMsg msg = new ModifyMsg(in); |
| | | msg.bytes = null; |
| | | |
| | | return msg; |
| | | } |
| | | |
| | | /** |
| | | * {@inheritDoc} |
| | | */ |
| | | @Override |
| | | public byte[] getBytes() throws UnsupportedEncodingException |
| | | { |
| | | if (bytes == null) |
| | | { |
| | | /* encode the header in a byte[] large enough to also contain the mods */ |
| | | byte[] encodedMsg = encodeHeader(MSG_TYPE_MODIFY, encodedMods.length + 1); |
| | | byte[] mybytes = encodeHeader(MSG_TYPE_MODIFY, encodedMods.length + 1); |
| | | |
| | | /* add the mods */ |
| | | int pos = encodedMsg.length - (encodedMods.length + 1); |
| | | addByteArray(encodedMods, encodedMsg, pos); |
| | | int pos = mybytes.length - (encodedMods.length + 1); |
| | | addByteArray(encodedMods, mybytes, pos); |
| | | |
| | | return encodedMsg; |
| | | return mybytes; |
| | | } |
| | | else |
| | | { |
| | | return bytes; |
| | | } |
| | | } |
| | | |
| | | /** |
| | |
| | | * CDDL HEADER END |
| | | * |
| | | * |
| | | * Copyright 2006-2008 Sun Microsystems, Inc. |
| | | * Copyright 2006-2009 Sun Microsystems, Inc. |
| | | */ |
| | | package org.opends.server.replication.protocol; |
| | | |
| | |
| | | throw new NotSupportedOldVersionPDUException("Replication Server Info", |
| | | ProtocolVersion.REPLICATION_PROTOCOL_V1, buffer[0]); |
| | | case MSG_TYPE_MODIFY: |
| | | case MSG_TYPE_MODIFY_V1: |
| | | msg = new ModifyMsg(buffer); |
| | | break; |
| | | case MSG_TYPE_MODIFY_V1: |
| | | msg = ModifyMsg.createV1(buffer); |
| | | break; |
| | | case MSG_TYPE_ADD: |
| | | case MSG_TYPE_ADD_V1: |
| | | msg = new AddMsg(buffer); |
| | |
| | | * CDDL HEADER END |
| | | * |
| | | * |
| | | * Copyright 2006-2008 Sun Microsystems, Inc. |
| | | * Copyright 2006-2009 Sun Microsystems, Inc. |
| | | */ |
| | | package org.opends.server.replication.server; |
| | | import org.opends.messages.MessageBuilder; |
| | |
| | | // the threads calling add() method will be blocked if the size of |
| | | // msgQueue becomes larger than the queueHimark and will resume |
| | | // only when the size of the msgQueue goes below queueLowmark. |
| | | int queueHimark = 5000; |
| | | int queueLowmark = 4000; |
| | | int queueMaxSize = 5000; |
| | | int queueLowmark = 1000; |
| | | int queueHimark = 4000; |
| | | |
| | | // The queue himark and lowmark in bytes, this is set to 100 times the |
| | | // himark and lowmark in number of updates. |
| | | int queueHimarkBytes = 100 * queueHimark; |
| | | int queueMaxBytes = 100 * queueMaxSize; |
| | | int queueLowmarkBytes = 100 * queueLowmark; |
| | | int queueHimarkBytes = 100 * queueHimark; |
| | | |
| | | // The number of bytes currently in the queue |
| | | int queueByteSize = 0; |
| | |
| | | serverId = id; |
| | | this.baseDn = baseDn; |
| | | trimage = replicationServer.getTrimage(); |
| | | queueHimark = queueSize; |
| | | queueLowmark = queueSize * 4 / 5; |
| | | queueHimarkBytes = 100 * queueHimark; |
| | | queueLowmarkBytes = 100 * queueLowmark; |
| | | queueMaxSize = queueSize; |
| | | queueLowmark = queueSize * 1 / 5; |
| | | queueHimark = queueSize * 4 / 5; |
| | | queueMaxBytes = 200 * queueMaxSize; |
| | | queueLowmarkBytes = 200 * queueLowmark; |
| | | queueHimarkBytes = 200 * queueLowmark; |
| | | db = new ReplicationDB(id, baseDn, replicationServer, dbenv); |
| | | firstChange = db.readFirstChange(); |
| | | lastChange = db.readLastChange(); |
| | |
| | | synchronized (msgQueue) |
| | | { |
| | | int size = msgQueue.size(); |
| | | while ((size > queueHimark) || (queueByteSize > queueHimarkBytes)) |
| | | if ((size > queueHimark) || (queueByteSize > queueHimarkBytes)) |
| | | msgQueue.notify(); |
| | | |
| | | while ((size > queueMaxSize) || (queueByteSize > queueMaxBytes)) |
| | | { |
| | | try |
| | | { |
| | | msgQueue.wait(500); |
| | | msgQueue.wait(5000); |
| | | } catch (InterruptedException e) |
| | | { |
| | | // simply loop to try again. |
| | |
| | | { |
| | | while (shutdown == false) |
| | | { |
| | | try { |
| | | try |
| | | { |
| | | flush(); |
| | | trim(); |
| | | |
| | | synchronized (this) |
| | | synchronized (msgQueue) |
| | | { |
| | | if ((msgQueue.size() < queueLowmark) && |
| | | (queueByteSize < queueLowmarkBytes)) |
| | | { |
| | | try |
| | | { |
| | | this.wait(1000); |
| | | msgQueue.wait(10000); |
| | | } catch (InterruptedException e) |
| | | { } |
| | | } |
| | | } |
| | | } catch (Exception end) |
| | | { |
| | | MessageBuilder mb = new MessageBuilder(); |
| | |
| | | int tries = 0; |
| | | while ((tries++ < DEADLOCK_RETRIES) && (!done)) |
| | | { |
| | | synchronized (flushLock) |
| | | { |
| | | /* the trim is done by group in order to save some CPU and IO bandwidth |
| | | * start the transaction then do a bunch of remove then commit |
| | | */ |
| | |
| | | } |
| | | } |
| | | } |
| | | } |
| | | |
| | | /** |
| | | * Flush a number of updates from the memory list to the stable storage. |
| | |
| | | private void flush() |
| | | { |
| | | int size; |
| | | int chunksize = (500 < queueHimark ? 500 : queueHimark); |
| | | int chunksize = (500 < queueMaxSize ? 500 : queueMaxSize); |
| | | |
| | | do |
| | | { |
| | |
| | | { |
| | | msgQueue.clear(); |
| | | queueByteSize = 0; |
| | | } |
| | | |
| | | db.clear(); |
| | | firstChange = db.readFirstChange(); |
| | | lastChange = db.readLastChange(); |
| | | } |
| | | } |
| | | } |
| | |
| | | logError(errorMsg); |
| | | } else if (sourceGroupId != groupId) |
| | | { |
| | | // Assured feature does not cross different group ids |
| | | // Assured feature does not cross different group IDS |
| | | } else |
| | | { |
| | | if ((generationId > 0) && |
| | | (generationId == sourceHandler.getGenerationId())) |
| | | // Ignore assured updates from wrong generationid servers |
| | | // Ignore assured updates from wrong generationId servers |
| | | { |
| | | if (sourceHandler.isLDAPserver()) |
| | | { |
| | |
| | | } |
| | | } |
| | | } else |
| | | { // A RS sent us the safe data message, for sure no futher acks to wait |
| | | { // A RS sent us the safe data message, for sure no further ack to wait |
| | | if (safeDataLevel == (byte) 1) |
| | | { |
| | | /** |
| | |
| | | * CDDL HEADER END |
| | | * |
| | | * |
| | | * Copyright 2006-2008 Sun Microsystems, Inc. |
| | | * Copyright 2006-2009 Sun Microsystems, Inc. |
| | | */ |
| | | package org.opends.server.replication; |
| | | |
| | |
| | | Message expectedMessage) |
| | | { |
| | | TaskState taskState = null; |
| | | int cpt=10; |
| | | int cpt=40; |
| | | try |
| | | { |
| | | SearchFilter filter = |
| | |
| | | * CDDL HEADER END |
| | | * |
| | | * |
| | | * Copyright 2006-2008 Sun Microsystems, Inc. |
| | | * Copyright 2006-2009 Sun Microsystems, Inc. |
| | | */ |
| | | package org.opends.server.replication.server; |
| | | |