| | |
| | | * |
| | | * Copyright 2006-2010 Sun Microsystems, Inc. |
| | | */ |
| | | |
| | | package org.opends.server.replication.plugin; |
| | | |
| | | import static org.opends.messages.ReplicationMessages.*; |
| | |
| | | import java.util.concurrent.TimeoutException; |
| | | import java.util.concurrent.atomic.AtomicInteger; |
| | | import java.util.zip.DataFormatException; |
| | | import java.util.Date; |
| | | |
| | | import org.opends.server.util.TimeThread; |
| | | import org.opends.messages.Category; |
| | | import org.opends.messages.Message; |
| | | import org.opends.messages.MessageBuilder; |
| | | import org.opends.messages.Severity; |
| | | import org.opends.server.admin.server.ConfigurationChangeListener; |
| | | import org.opends.server.admin.std.meta.ReplicationDomainCfgDefn.*; |
| | | import org.opends.server.admin.std.server.ExternalChangelogDomainCfg; |
| | |
| | | import org.opends.server.util.LDIFReader; |
| | | import org.opends.server.workflowelement.externalchangelog.ECLWorkflowElement; |
| | | import org.opends.server.workflowelement.localbackend.*; |
| | | import org.opends.server.tasks.PurgeConflictsHistoricalTask; |
| | | |
| | | /** |
| | | * This class implements the bulk part of the Directory Server side |
| | |
| | | */ |
| | | public static final String DS_SYNC_CONFLICT = "ds-sync-conflict"; |
| | | |
| | | /** |
| | | /** |
| | | * The tracer object for the debug logger. |
| | | */ |
| | | private static final DebugTracer TRACER = getTracer(); |
| | |
| | | // ChangeNumbers. |
| | | private boolean logChangeNumber = false; |
| | | |
| | | // This configuration integer indicates the time the domain keeps the |
| | | // historical information necessary to solve conflicts. |
| | | // When a change stored in the historical part of the user entry has a date |
| | | // (from its replication ChangeNumber) older than this delay, it is candidate |
| | | // to be purged. |
| | | private long histPurgeDelayInMilliSec = 0; |
| | | |
| | | // The last change number purged in this domain. It allows the purging |
| | | // process to continue from one purge processing (task run) to the next one. |
| | | // Its value is 0 when the server starts. |
| | | private ChangeNumber lastChangeNumberPurgedFromHist = new ChangeNumber(0,0,0); |
| | | |
| | | /** |
| | | * The thread that periodically saves the ServerState of this |
| | | * LDAPReplicationDomain in the database. |
| | |
| | | |
| | | // Read the configuration parameters. |
| | | Set<String> replicationServers = configuration.getReplicationServer(); |
| | | serverId = configuration.getServerId(); |
| | | baseDn = configuration.getBaseDN(); |
| | | |
| | | this.serverId = configuration.getServerId(); |
| | | this.baseDn = configuration.getBaseDN(); |
| | | int window = configuration.getWindowSize(); |
| | | heartbeatInterval = configuration.getHeartbeatInterval(); |
| | | isolationpolicy = configuration.getIsolationPolicy(); |
| | | configDn = configuration.dn(); |
| | | logChangeNumber = configuration.isLogChangenumber(); |
| | | this.isolationpolicy = configuration.getIsolationPolicy(); |
| | | this.configDn = configuration.dn(); |
| | | this.logChangeNumber = configuration.isLogChangenumber(); |
| | | this.updateToReplayQueue = updateToReplayQueue; |
| | | this.histPurgeDelayInMilliSec = |
| | | configuration.getConflictsHistoricalPurgeDelay()*60*1000; |
| | | |
| | | // Get assured configuration |
| | | readAssuredConfig(configuration, false); |
| | |
| | | * Check if the operation that just happened has cleared a conflict: |
| | | * clearing a conflict happens if the operation has freed a DN |
| | | * for which another entry was in conflict. |
| | | * Steps: |
| | | * - get the DN freed by a DELETE or MODRDN op |
| | | * - search for entries put in the conflict space (dn=entryuid'+'....) |
| | | * because the expected DN was not available (ds-sync-conflict=expected DN) |
| | | * - retain the entry with the oldest conflict |
| | | * - rename this entry with the freedDN as it was expected originally |
| | | */ |
| | | private void checkForClearedConflict(PostOperationOperation op) |
| | | { |
| | |
| | | return; |
| | | } |
| | | |
| | | DN targetDN; |
| | | DN freedDN; |
| | | if (type == OperationType.DELETE) |
| | | { |
| | | targetDN = ((PostOperationDeleteOperation) op).getEntryDN(); |
| | | freedDN = ((PostOperationDeleteOperation) op).getEntryDN(); |
| | | } |
| | | else if (type == OperationType.MODIFY_DN) |
| | | { |
| | | targetDN = ((PostOperationModifyDNOperation) op).getEntryDN(); |
| | | freedDN = ((PostOperationModifyDNOperation) op).getEntryDN(); |
| | | } |
| | | else |
| | | { |
| | |
| | | try |
| | | { |
| | | filter = LDAPFilter.decode( |
| | | DS_SYNC_CONFLICT + "=" + targetDN.toNormalizedString()); |
| | | DS_SYNC_CONFLICT + "=" + freedDN.toNormalizedString()); |
| | | } catch (LDAPException e) |
| | | { |
| | | // Not possible. We know the filter just above is correct. |
| | |
| | | { |
| | | DN entryDN = entrytoRename.getDN(); |
| | | ModifyDNOperationBasis newOp = renameEntry( |
| | | entryDN, targetDN.getRDN(), targetDN.getParent(), false); |
| | | entryDN, freedDN.getRDN(), freedDN.getParent(), false); |
| | | |
| | | ResultCode res = newOp.getResultCode(); |
| | | if (res != ResultCode.SUCCESS) |
| | |
| | | { |
| | | isolationpolicy = configuration.getIsolationPolicy(); |
| | | logChangeNumber = configuration.isLogChangenumber(); |
| | | histPurgeDelayInMilliSec = |
| | | configuration.getConflictsHistoricalPurgeDelay()*60*1000; |
| | | |
| | | changeConfig( |
| | | configuration.getReplicationServer(), |
| | |
| | | { |
| | | return this.eclDomain.isEnabled(); |
| | | } |
| | | |
| | | /** |
| | |  * Gives the delay, expressed in milliseconds, during which this domain |
| | |  * keeps the historical information stored in entries to solve conflicts |
| | |  * before that information becomes a candidate for purging. |
| | |  * |
| | |  * @return the historical purge delay, in milliseconds. |
| | |  */ |
| | | public long getHistoricalPurgeDelay() |
| | | { |
| | |   final long purgeDelayInMilliSec = this.histPurgeDelayInMilliSec; |
| | |   return purgeDelayInMilliSec; |
| | | } |
| | | |
| | | /** |
| | |  * Purge the historical information, kept in entries to solve conflicts, |
| | |  * for the entries of this domain. |
| | |  * Steps: |
| | |  * - search the whole subtree under the domain base DN for entries whose |
| | |  *   historical attribute is greater than or equal to the last change |
| | |  *   number purged by a previous run (so that a new run resumes where the |
| | |  *   previous one stopped) |
| | |  * - for each entry found, set the configured purge delay on its historical |
| | |  *   information, call encodeAndPurge() and replace the historical |
| | |  *   attribute of the entry with the result, using an internal modify |
| | |  *   operation flagged as a synchronization operation that must not be |
| | |  *   synchronized |
| | |  * - report the progress (last change number processed and number of purged |
| | |  *   values) to the task, initially and after each successful modify |
| | |  * - give up with an ADMIN_LIMIT_EXCEEDED exception as soon as the end date |
| | |  *   is over. |
| | |  * |
| | |  * A failed modify is only logged: the purge goes on with the next entry. |
| | |  * |
| | |  * @param task the task raising this purge. |
| | |  * @param endDate the date to stop this task whether the job is done or not. |
| | |  * @throws DirectoryException when an exception happens, in particular |
| | |  *         when the end date is reached before all entries are processed. |
| | |  * |
| | |  */ |
| | | public void purgeConflictsHistorical(PurgeConflictsHistoricalTask task, |
| | |     long endDate) |
| | | throws DirectoryException |
| | | { |
| | |   LDAPFilter filter = null; |
| | | |
| | |   // Keep a separator before each label so the trace stays readable. |
| | |   TRACER.debugInfo("[PURGE] purgeConflictsHistorical " |
| | |       + "on domain: " + baseDn |
| | |       + " endDate: " + new Date(endDate) |
| | |       + " lastChangeNumberPurgedFromHist: " |
| | |       + lastChangeNumberPurgedFromHist.toStringUI()); |
| | | |
| | |   try |
| | |   { |
| | |     // Only consider entries at or after the point reached by the last run. |
| | |     filter = LDAPFilter.decode( |
| | |         "(" + EntryHistorical.HISTORICALATTRIBUTENAME + ">=dummy:" |
| | |         + lastChangeNumberPurgedFromHist + ")"); |
| | | |
| | |   } catch (LDAPException e) |
| | |   { |
| | |     // Not possible. We know the filter just above is correct. |
| | |   } |
| | | |
| | |   // Three attributes are requested: size the set accordingly. |
| | |   LinkedHashSet<String> attrs = new LinkedHashSet<String>(3); |
| | |   attrs.add(EntryHistorical.HISTORICALATTRIBUTENAME); |
| | |   attrs.add(EntryHistorical.ENTRYUIDNAME); |
| | |   attrs.add("*"); |
| | |   InternalSearchOperation searchOp = conn.processSearch( |
| | |       ByteString.valueOf(baseDn.toString()), |
| | |       SearchScope.WHOLE_SUBTREE, |
| | |       DereferencePolicy.NEVER_DEREF_ALIASES, |
| | |       0, 0, false, filter, |
| | |       attrs, null); |
| | | |
| | |   int count = 0; |
| | |   task.setProgressStats(lastChangeNumberPurgedFromHist, count); |
| | | |
| | |   LinkedList<SearchResultEntry> entries = searchOp.getSearchEntries(); |
| | |   for (SearchResultEntry entry : entries) |
| | |   { |
| | |     // Stop as soon as the time allotted to this run is over. |
| | |     long maxTimeToRun = endDate - TimeThread.getTime(); |
| | |     if (maxTimeToRun<0) |
| | |     { |
| | |       Message errMsg = Message.raw(Category.SYNC, Severity.NOTICE, |
| | |           " end date reached"); |
| | |       throw new DirectoryException( |
| | |           ResultCode.ADMIN_LIMIT_EXCEEDED, |
| | |           errMsg); |
| | |     } |
| | | |
| | |     EntryHistorical entryHist = EntryHistorical.newInstanceFromEntry(entry); |
| | |     // Remember where we are so that the next run can resume from here. |
| | |     lastChangeNumberPurgedFromHist = entryHist.getOldestCN(); |
| | |     entryHist.setPurgeDelay(this.histPurgeDelayInMilliSec); |
| | |     Attribute attr = entryHist.encodeAndPurge(); |
| | |     count += entryHist.getLastPurgedValuesCount(); |
| | |     List<Modification> mods = new LinkedList<Modification>(); |
| | |     mods.add(new Modification(ModificationType.REPLACE, attr)); |
| | | |
| | |     ModifyOperationBasis newOp = |
| | |       new ModifyOperationBasis( |
| | |           conn, InternalClientConnection.nextOperationID(), |
| | |           InternalClientConnection.nextMessageID(), |
| | |           new ArrayList<Control>(0), |
| | |           entry.getDN(), |
| | |           mods); |
| | |     newOp.setInternalOperation(true); |
| | |     newOp.setSynchronizationOperation(true); |
| | |     newOp.setDontSynchronize(true); |
| | | |
| | |     newOp.run(); |
| | | |
| | |     if (newOp.getResultCode() != ResultCode.SUCCESS) |
| | |     { |
| | |       // Log information for the repair tool. |
| | |       MessageBuilder mb = new MessageBuilder(); |
| | |       mb.append(ERR_CANNOT_ADD_CONFLICT_ATTRIBUTE.get()); |
| | |       mb.append(String.valueOf(newOp)); |
| | |       mb.append(" "); |
| | |       mb.append(String.valueOf(newOp.getResultCode())); |
| | |       logError(mb.toMessage()); |
| | |     } |
| | |     else |
| | |     { |
| | |       task.setProgressStats(lastChangeNumberPurgedFromHist, count); |
| | |     } |
| | |   } |
| | | } |
| | | } |