| | |
| | | * Copyright 2006-2010 Sun Microsystems, Inc. |
| | | * Portions Copyright 2011-2013 ForgeRock AS |
| | | */ |
| | | |
| | | package org.opends.server.replication.plugin; |
| | | |
| | | import static org.opends.messages.ReplicationMessages.*; |
| | |
| | | import static org.opends.server.util.ServerConstants.*; |
| | | import static org.opends.server.util.StaticUtils.*; |
| | | |
| | | import java.io.File; |
| | | import java.io.InputStream; |
| | | import java.io.OutputStream; |
| | | import java.io.StringReader; |
| | | import java.io.UnsupportedEncodingException; |
| | | import java.io.*; |
| | | import java.util.*; |
| | | import java.util.concurrent.BlockingQueue; |
| | | import java.util.concurrent.TimeUnit; |
| | | import java.util.concurrent.TimeoutException; |
| | | import java.util.concurrent.atomic.AtomicBoolean; |
| | | import java.util.concurrent.atomic.AtomicInteger; |
| | | import java.util.zip.DataFormatException; |
| | | |
| | |
| | | import org.opends.server.backends.jeb.BackendImpl; |
| | | import org.opends.server.backends.task.Task; |
| | | import org.opends.server.config.ConfigException; |
| | | import org.opends.server.core.AddOperation; |
| | | import org.opends.server.core.DeleteOperation; |
| | | import org.opends.server.core.DirectoryServer; |
| | | import org.opends.server.core.LockFileManager; |
| | | import org.opends.server.core.ModifyDNOperation; |
| | | import org.opends.server.core.ModifyDNOperationBasis; |
| | | import org.opends.server.core.ModifyOperation; |
| | | import org.opends.server.core.ModifyOperationBasis; |
| | | import org.opends.server.core.*; |
| | | import org.opends.server.loggers.debug.DebugTracer; |
| | | import org.opends.server.protocols.asn1.ASN1Exception; |
| | | import org.opends.server.protocols.internal.InternalClientConnection; |
| | |
| | | import org.opends.server.protocols.ldap.LDAPControl; |
| | | import org.opends.server.protocols.ldap.LDAPFilter; |
| | | import org.opends.server.protocols.ldap.LDAPModification; |
| | | import org.opends.server.replication.common.AssuredMode; |
| | | import org.opends.server.replication.common.ChangeNumber; |
| | | import org.opends.server.replication.common.ChangeNumberGenerator; |
| | | import org.opends.server.replication.common.ServerState; |
| | | import org.opends.server.replication.common.ServerStatus; |
| | | import org.opends.server.replication.common.StatusMachineEvent; |
| | | import org.opends.server.replication.common.*; |
| | | import org.opends.server.replication.protocol.*; |
| | | import org.opends.server.replication.service.ReplicationBroker; |
| | | import org.opends.server.replication.service.ReplicationDomain; |
| | |
| | | import org.opends.server.tasks.PurgeConflictsHistoricalTask; |
| | | import org.opends.server.tasks.TaskUtils; |
| | | import org.opends.server.types.*; |
| | | import org.opends.server.types.operation.PluginOperation; |
| | | import org.opends.server.types.operation.PostOperationAddOperation; |
| | | import org.opends.server.types.operation.PostOperationDeleteOperation; |
| | | import org.opends.server.types.operation.PostOperationModifyDNOperation; |
| | | import org.opends.server.types.operation.PostOperationModifyOperation; |
| | | import org.opends.server.types.operation.PostOperationOperation; |
| | | import org.opends.server.types.operation.PreOperationAddOperation; |
| | | import org.opends.server.types.operation.PreOperationDeleteOperation; |
| | | import org.opends.server.types.operation.PreOperationModifyDNOperation; |
| | | import org.opends.server.types.operation.PreOperationModifyOperation; |
| | | import org.opends.server.types.operation.*; |
| | | import org.opends.server.util.LDIFReader; |
| | | import org.opends.server.util.TimeThread; |
| | | import org.opends.server.workflowelement.externalchangelog.ECLWorkflowElement; |
| | |
| | | */ |
| | | private static final DebugTracer TRACER = getTracer(); |
| | | |
| | | // The update to replay message queue where the listener thread is going to |
| | | // push incoming update messages. |
| | | /** |
| | | * The update to replay message queue where the listener thread is going to |
| | | * push incoming update messages. |
| | | */ |
| | | private final BlockingQueue<UpdateToReplay> updateToReplayQueue; |
| | | private final AtomicInteger numResolvedNamingConflicts = new AtomicInteger(); |
| | | private final AtomicInteger numResolvedModifyConflicts = new AtomicInteger(); |
| | |
| | | private volatile boolean disabled = false; |
| | | private volatile boolean stateSavingDisabled = false; |
| | | |
| | | // This list is used to temporary store operations that needs |
| | | // to be replayed at session establishment time. |
| | | private final TreeMap<ChangeNumber, FakeOperation> replayOperations = |
| | | /** |
| | | * This list is used to temporary store operations that needs to be replayed |
| | | * at session establishment time. |
| | | */ |
| | | private final SortedMap<ChangeNumber, FakeOperation> replayOperations = |
| | | new TreeMap<ChangeNumber, FakeOperation>(); |
| | | |
| | | /** |
| | |
| | | * Fractional replication variables. |
| | | */ |
| | | |
| | | // Holds the fractional configuration for this domain, if any. |
| | | /** Holds the fractional configuration for this domain, if any. */ |
| | | private FractionalConfig fractionalConfig = null; |
| | | |
| | | /** |
| | |
| | | * fractionalFilterOperation(PreOperationModifyOperation |
| | | * modifyOperation, boolean performFiltering) method |
| | | */ |
| | | // The operation contains attributes subject to fractional filtering according |
| | | // to the fractional configuration |
| | | /** |
| | | * The operation contains attributes subject to fractional filtering according |
| | | * to the fractional configuration. |
| | | */ |
| | | private static final int FRACTIONAL_HAS_FRACTIONAL_FILTERED_ATTRIBUTES = 1; |
| | | // The operation contains no attributes subject to fractional filtering |
| | | // according to the fractional configuration |
| | | /** |
| | | * The operation contains no attributes subject to fractional filtering |
| | | * according to the fractional configuration. |
| | | */ |
| | | private static final int FRACTIONAL_HAS_NO_FRACTIONAL_FILTERED_ATTRIBUTES = 2; |
| | | // The operation should become a no-op |
| | | /** The operation should become a no-op. */ |
| | | private static final int FRACTIONAL_BECOME_NO_OP = 3; |
| | | |
| | | // This configuration boolean indicates if this ReplicationDomain should log |
| | | // ChangeNumbers. |
| | | /** |
| | | * This configuration boolean indicates if this ReplicationDomain should log |
| | | * ChangeNumbers. |
| | | */ |
| | | private boolean logChangeNumber = false; |
| | | |
| | | // This configuration integer indicates the time the domain keeps the |
| | | // historical information necessary to solve conflicts. |
| | | // When a change stored in the historical part of the user entry has a date |
| | | // (from its replication ChangeNumber) older than this delay, it is candidate |
| | | // to be purged. |
| | | /** |
| | | * This configuration integer indicates the time the domain keeps the |
| | | * historical information necessary to solve conflicts.<br> |
| | | * When a change stored in the historical part of the user entry has a date |
| | | * (from its replication ChangeNumber) older than this delay, it is candidate |
| | | * to be purged. |
| | | */ |
| | | private long histPurgeDelayInMilliSec = 0; |
| | | |
| | | // The last change number purged in this domain. Allows to have a continuous |
| | | // purging process from one purge processing (task run) to the next one. |
| | | // Values 0 when the server starts. |
| | | /** |
| | | * The last change number purged in this domain. Allows to have a continuous |
| | | * purging process from one purge processing (task run) to the next one. |
| | | * Values 0 when the server starts. |
| | | */ |
| | | private ChangeNumber lastChangeNumberPurgedFromHist = new ChangeNumber(0,0,0); |
| | | |
| | | /** |
| | |
| | | if (fractionalConfig.isFractional()) |
| | | { |
| | | // Set new fractional configuration values |
| | | if (newFractionalMode == FractionalConfig.EXCLUSIVE_FRACTIONAL) |
| | | fractionalConfig.setFractionalExclusive(true); |
| | | else |
| | | fractionalConfig.setFractionalExclusive(false); |
| | | fractionalConfig.setFractionalExclusive( |
| | | newFractionalMode == FractionalConfig.EXCLUSIVE_FRACTIONAL); |
| | | fractionalConfig.setFractionalSpecificClassesAttributes( |
| | | newFractionalConfig.getFractionalSpecificClassesAttributes()); |
| | | fractionalConfig.setFractionalAllClassesAttributes( |
| | |
| | | // Set stored fractional configuration values |
| | | if (storedFractionalConfig.isFractional()) |
| | | { |
| | | if (storedFractionalMode == FractionalConfig.EXCLUSIVE_FRACTIONAL) |
| | | storedFractionalConfig.setFractionalExclusive(true); |
| | | else |
| | | storedFractionalConfig.setFractionalExclusive(false); |
| | | storedFractionalConfig.setFractionalExclusive( |
| | | storedFractionalMode == FractionalConfig.EXCLUSIVE_FRACTIONAL); |
| | | } |
| | | storedFractionalConfig.setFractionalSpecificClassesAttributes( |
| | | storedFractionalSpecificClassesAttributes); |
| | |
| | | AttributeValue rdnAttributeValue = |
| | | entryRdn.getAttributeValue(attributeType); |
| | | List<Attribute> attrList = attributesMap.get(attributeType); |
| | | Iterator<Attribute> attrIt = attrList.iterator(); |
| | | AttributeValue sameAttrValue = null; |
| | | // Locate the attribute value identical to the one in the RDN |
| | | while(attrIt.hasNext()) |
| | | for (Attribute attr : attrList) |
| | | { |
| | | Attribute attr = attrIt.next(); |
| | | if (attr.contains(rdnAttributeValue)) |
| | | { |
| | | for (AttributeValue attrValue : attr) { |
| | |
| | | /** |
| | | * Create and replay a synchronized Operation from an UpdateMsg. |
| | | * |
| | | * @param msg The UpdateMsg to be replayed. |
| | | * @param msg |
| | | * The UpdateMsg to be replayed. |
| | | * @param shutdown |
| | | * whether the server initiated shutdown |
| | | */ |
| | | public void replay(LDAPUpdateMsg msg) |
| | | public void replay(LDAPUpdateMsg msg, AtomicBoolean shutdown) |
| | | { |
| | | Operation op = null; |
| | | boolean replayDone = false; |
| | |
| | | |
| | | while ((!dependency) && (!replayDone) && (retryCount-- > 0)) |
| | | { |
| | | if (shutdown.get()) |
| | | { |
| | | // shutdown initiated, let's leave |
| | | return; |
| | | } |
| | | // Try replay the operation |
| | | op.setInternalOperation(true); |
| | | op.setSynchronizationOperation(true); |
| | |
| | | // renamed by a more recent modify DN. |
| | | replayDone = true; |
| | | } |
| | | else if (result == ResultCode.BUSY) |
| | | { |
| | | /* |
| | | * We probably could not get a lock (OPENDJ-885). Give the server |
| | | * another chance to process this operation immediately. |
| | | */ |
| | | Thread.yield(); |
| | | continue; |
| | | } |
| | | else if (result == ResultCode.UNAVAILABLE) |
| | | { |
| | | /* |
| | | * It can happen when a rebuild is performed or the backend is |
| | | * offline (OPENDJ-49). Give the server another chance to process |
| | | * this operation after some time. |
| | | */ |
| | | Thread.sleep(50); |
| | | continue; |
| | | } |
| | | else if (op instanceof ModifyOperation) |
| | | { |
| | | ModifyOperation newOp = (ModifyOperation) op; |
| | |
| | | } |
| | | } catch (ASN1Exception e) |
| | | { |
| | | Message message = ERR_EXCEPTION_DECODING_OPERATION.get( |
| | | String.valueOf(msg) + stackTraceToSingleLineString(e)); |
| | | logError(message); |
| | | replayErrorMsg = message.toString(); |
| | | replayErrorMsg = logDecodingOperationError(msg, e); |
| | | } catch (LDAPException e) |
| | | { |
| | | Message message = ERR_EXCEPTION_DECODING_OPERATION.get( |
| | | String.valueOf(msg) + stackTraceToSingleLineString(e)); |
| | | logError(message); |
| | | replayErrorMsg = message.toString(); |
| | | replayErrorMsg = logDecodingOperationError(msg, e); |
| | | } catch (DataFormatException e) |
| | | { |
| | | Message message = ERR_EXCEPTION_DECODING_OPERATION.get( |
| | | String.valueOf(msg) + stackTraceToSingleLineString(e)); |
| | | logError(message); |
| | | replayErrorMsg = message.toString(); |
| | | replayErrorMsg = logDecodingOperationError(msg, e); |
| | | } catch (Exception e) |
| | | { |
| | | if (changeNumber != null) |
| | |
| | | updateError(changeNumber); |
| | | } else |
| | | { |
| | | Message message = ERR_EXCEPTION_DECODING_OPERATION.get( |
| | | String.valueOf(msg) + stackTraceToSingleLineString(e)); |
| | | logError(message); |
| | | replayErrorMsg = message.toString(); |
| | | replayErrorMsg = logDecodingOperationError(msg, e); |
| | | } |
| | | } finally |
| | | { |
| | |
| | | } while (msg != null); |
| | | } |
| | | |
| | | private String logDecodingOperationError(LDAPUpdateMsg msg, Exception e) |
| | | { |
| | | Message message = ERR_EXCEPTION_DECODING_OPERATION.get( |
| | | String.valueOf(msg) + stackTraceToSingleLineString(e)); |
| | | logError(message); |
| | | return message.toString(); |
| | | } |
| | | |
| | | /** |
| | | * This method is called when an error happens while replaying |
| | | * an operation. |
| | |
| | | * {@inheritDoc} |
| | | */ |
| | | @Override |
| | | public boolean processUpdate(UpdateMsg updateMsg) |
| | | public boolean processUpdate(UpdateMsg updateMsg, AtomicBoolean shutdown) |
| | | { |
| | | // Ignore message if fractional configuration is inconsistent and |
| | | // we have been passed into bad data set status |
| | |
| | | |
| | | // Put update message into the replay queue |
| | | // (block until some place in the queue is available) |
| | | UpdateToReplay updateToReplay = new UpdateToReplay(msg, this); |
| | | updateToReplayQueue.offer(updateToReplay); |
| | | final UpdateToReplay updateToReplay = new UpdateToReplay(msg, this); |
| | | while (!shutdown.get()) |
| | | { |
| | | // loop until we can offer to the queue or shutdown was initiated |
| | | try |
| | | { |
| | | if (updateToReplayQueue.offer(updateToReplay, 1, TimeUnit.SECONDS)) |
| | | { |
| | | // successful offer to the queue, let's exit the loop |
| | | break; |
| | | } |
| | | } |
| | | catch (InterruptedException e) |
| | | { |
| | | // Thread interrupted: check for shutdown. |
| | | } |
| | | } |
| | | |
| | | return false; |
| | | } |
| | |
| | | case EXCLUSIVE_FRACTIONAL: |
| | | case INCLUSIVE_FRACTIONAL: |
| | | result.setFractional(true); |
| | | if (newFractionalMode == EXCLUSIVE_FRACTIONAL) |
| | | result.setFractionalExclusive(true); |
| | | else |
| | | result.setFractionalExclusive(false); |
| | | result.setFractionalExclusive( |
| | | newFractionalMode == EXCLUSIVE_FRACTIONAL); |
| | | break; |
| | | } |
| | | result.setFractionalSpecificClassesAttributes( |