| | |
| | | */ |
| | | package org.opends.server.replication.server; |
| | | import static org.opends.messages.BackendMessages.*; |
| | | import static org.opends.messages.JebMessages.INFO_JEB_EXPORT_FINAL_STATUS; |
| | | import static org.opends.messages.JebMessages.INFO_JEB_EXPORT_PROGRESS_REPORT; |
| | | import static org.opends.messages.ReplicationMessages.*; |
| | | import static org.opends.server.loggers.ErrorLogger.logError; |
| | | import static org.opends.server.loggers.debug.DebugLogger.debugEnabled; |
| | | import static org.opends.server.loggers.debug.DebugLogger.getTracer; |
| | | import static org.opends.server.util.StaticUtils.getExceptionMessage; |
| | | |
| | | import java.util.ArrayList; |
| | | import java.util.HashMap; |
| | | import java.util.HashSet; |
| | | import java.util.Iterator; |
| | | import java.util.LinkedHashSet; |
| | | import java.util.List; |
| | | import java.util.Map; |
| | | import java.util.Timer; |
| | | import java.util.TimerTask; |
| | | |
| | | import org.opends.messages.Message; |
| | | import org.opends.server.admin.Configuration; |
| | |
| | | import org.opends.server.core.ModifyOperation; |
| | | import org.opends.server.core.SearchOperation; |
| | | import org.opends.server.loggers.debug.DebugTracer; |
| | | import org.opends.server.protocols.internal.InternalClientConnection; |
| | | import org.opends.server.replication.protocol.AddMsg; |
| | | import org.opends.server.replication.protocol.DeleteMsg; |
| | | import org.opends.server.replication.protocol.ModifyDNMsg; |
| | | import org.opends.server.replication.protocol.ModifyMsg; |
| | | import org.opends.server.replication.protocol.UpdateMessage; |
| | | import org.opends.server.types.Attribute; |
| | | import org.opends.server.types.AttributeType; |
| | | import org.opends.server.types.AttributeValue; |
| | | import org.opends.server.types.BackupConfig; |
| | | import org.opends.server.types.BackupDirectory; |
| | | import org.opends.server.types.ConditionResult; |
| | |
| | | import org.opends.server.types.LDIFExportConfig; |
| | | import org.opends.server.types.LDIFImportConfig; |
| | | import org.opends.server.types.LDIFImportResult; |
| | | import org.opends.server.types.RawAttribute; |
| | | import org.opends.server.types.RestoreConfig; |
| | | import org.opends.server.types.ResultCode; |
| | | import org.opends.server.util.AddChangeRecordEntry; |
| | | import org.opends.server.util.DeleteChangeRecordEntry; |
| | | import org.opends.server.util.LDIFWriter; |
| | | import org.opends.server.util.ModifyChangeRecordEntry; |
| | | import org.opends.server.util.ModifyDNChangeRecordEntry; |
| | | import org.opends.server.util.Validator; |
| | | |
| | | /** |
| | |
| | | */ |
| | | private static final DebugTracer TRACER = getTracer(); |
| | | |
| | | private static final String EXPORT_BASE_DN = "dc=replicationChanges"; |
| | | |
| | | // The base DNs for this backend. |
| | | private DN[] baseDNs; |
| | | |
| | |
| | | private JEBackendCfg cfg; |
| | | |
| | | /** |
| | | * The number of milliseconds between job progress reports. |
| | | */ |
| | | private long progressInterval = 10000; |
| | | |
| | | /** |
| | | * The current number of entries exported. |
| | | */ |
| | | private long exportedCount = 0; |
| | | |
| | | /** |
| | | * The current number of entries skipped. |
| | | */ |
| | | private long skippedCount = 0; |
| | | |
| | | /** |
| | | * Creates a new backend with the provided information. All backend |
| | | * implementations must implement a default constructor that use |
| | | * <CODE>super()</CODE> to invoke this constructor. |
| | |
| | | */ |
| | | public boolean supportsLDIFExport() |
| | | { |
| | | return false; |
| | | return true; |
| | | } |
| | | |
| | | /** |
| | | * {@inheritDoc} |
| | | */ |
| | | public synchronized void exportLDIF(LDIFExportConfig exportConfig) |
| | | throws DirectoryException |
| | | throws DirectoryException |
| | | { |
| | | // TODO |
| | | List<DN> includeBranches = exportConfig.getIncludeBranches(); |
| | | DN baseDN; |
| | | ArrayList<ReplicationCache> exportContainers = |
| | | new ArrayList<ReplicationCache>(); |
| | | |
| | | Iterator<ReplicationCache> rcachei = server.getCacheIterator(); |
| | | if (rcachei != null) |
| | | { |
| | | while (rcachei.hasNext()) |
| | | { |
| | | ReplicationCache rc = rcachei.next(); |
| | | |
| | | // Skip containers that are not covered by the include branches. |
| | | baseDN = DN.decode(rc.getBaseDn().toString() + "," + EXPORT_BASE_DN); |
| | | |
| | | if (includeBranches == null || includeBranches.isEmpty()) |
| | | { |
| | | exportContainers.add(rc); |
| | | } |
| | | else |
| | | { |
| | | for (DN includeBranch : includeBranches) |
| | | { |
| | | if (includeBranch.isDescendantOf(baseDN) || |
| | | includeBranch.isAncestorOf(baseDN)) |
| | | { |
| | | exportContainers.add(rc); |
| | | } |
| | | } |
| | | } |
| | | } |
| | | } |
| | | |
| | | // Make a note of the time we started. |
| | | long startTime = System.currentTimeMillis(); |
| | | |
| | | // Start a timer for the progress report. |
| | | Timer timer = new Timer(); |
| | | TimerTask progressTask = new ProgressTask(); |
| | | timer.scheduleAtFixedRate(progressTask, progressInterval, |
| | | progressInterval); |
| | | |
| | | // Create the LDIF writer. |
| | | LDIFWriter ldifWriter; |
| | | try |
| | | { |
| | | ldifWriter = new LDIFWriter(exportConfig); |
| | | } |
| | | catch (Exception e) |
| | | { |
| | | if (debugEnabled()) |
| | | { |
| | | TRACER.debugCaught(DebugLogLevel.ERROR, e); |
| | | } |
| | | |
| | | Message message = |
| | | ERR_BACKEND_CANNOT_CREATE_LDIF_WRITER.get(String.valueOf(e)); |
| | | throw new DirectoryException(DirectoryServer.getServerErrorResultCode(), |
| | | message, e); |
| | | } |
| | | |
| | | exportRootChanges(exportContainers, exportConfig, ldifWriter); |
| | | |
| | | // Iterate through the containers. |
| | | try |
| | | { |
| | | for (ReplicationCache exportContainer : exportContainers) |
| | | { |
| | | exportContainer(exportContainer, exportConfig, ldifWriter); |
| | | } |
| | | } |
| | | finally |
| | | { |
| | | timer.cancel(); |
| | | |
| | | // Close the LDIF writer |
| | | try |
| | | { |
| | | ldifWriter.close(); |
| | | } |
| | | catch (Exception e) |
| | | { |
| | | if (debugEnabled()) |
| | | { |
| | | TRACER.debugCaught(DebugLogLevel.ERROR, e); |
| | | } |
| | | } |
| | | } |
| | | |
| | | long finishTime = System.currentTimeMillis(); |
| | | long totalTime = (finishTime - startTime); |
| | | |
| | | float rate = 0; |
| | | if (totalTime > 0) |
| | | { |
| | | rate = 1000f*exportedCount / totalTime; |
| | | } |
| | | |
| | | Message message = INFO_JEB_EXPORT_FINAL_STATUS.get( |
| | | exportedCount, skippedCount, totalTime/1000, rate); |
| | | logError(message); |
| | | } |
| | | |
| | | /* |
| | | * Exports the root changes of the export, and one entry by domain. |
| | | */ |
| | | private void exportRootChanges(List<ReplicationCache> exportContainers, |
| | | LDIFExportConfig exportConfig, LDIFWriter ldifWriter) |
| | | { |
| | | Map<AttributeType,List<Attribute>> attributes = |
| | | new HashMap<AttributeType,List<Attribute>>(); |
| | | ArrayList<Attribute> ldapAttrList = new ArrayList<Attribute>(); |
| | | |
| | | AttributeType ocType= |
| | | DirectoryServer.getAttributeType("objectclass", true); |
| | | LinkedHashSet<AttributeValue> ocValues = |
| | | new LinkedHashSet<AttributeValue>(); |
| | | ocValues.add(new AttributeValue(ocType, "top")); |
| | | ocValues.add(new AttributeValue(ocType, "domain")); |
| | | Attribute ocAttr = new Attribute(ocType, "objectclass", ocValues); |
| | | ldapAttrList.add(ocAttr); |
| | | attributes.put(ocType, ldapAttrList); |
| | | |
| | | try |
| | | { |
| | | AddChangeRecordEntry changeRecord = |
| | | new AddChangeRecordEntry(DN.decode(EXPORT_BASE_DN), |
| | | attributes); |
| | | ldifWriter.writeChangeRecord(changeRecord); |
| | | } |
| | | catch (Exception e) {} |
| | | |
| | | for (ReplicationCache exportContainer : exportContainers) |
| | | { |
| | | attributes.clear(); |
| | | ldapAttrList.clear(); |
| | | |
| | | ldapAttrList.add(ocAttr); |
| | | |
| | | AttributeType stateType= |
| | | DirectoryServer.getAttributeType("state", true); |
| | | LinkedHashSet<AttributeValue> stateValues = |
| | | new LinkedHashSet<AttributeValue>(); |
| | | stateValues.add(new AttributeValue(stateType, |
| | | exportContainer.getDbServerState().toString())); |
| | | TRACER.debugInfo("State=" + |
| | | exportContainer.getDbServerState().toString()); |
| | | Attribute stateAttr = new Attribute(ocType, "state", stateValues); |
| | | ldapAttrList.add(stateAttr); |
| | | |
| | | AttributeType genidType= |
| | | DirectoryServer.getAttributeType("generation-id", true); |
| | | LinkedHashSet<AttributeValue> genidValues = |
| | | new LinkedHashSet<AttributeValue>(); |
| | | genidValues.add(new AttributeValue(genidType, |
| | | String.valueOf(exportContainer.getGenerationId())+ |
| | | exportContainer.getBaseDn())); |
| | | Attribute genidAttr = new Attribute(ocType, "generation-id", genidValues); |
| | | ldapAttrList.add(genidAttr); |
| | | attributes.put(genidType, ldapAttrList); |
| | | |
| | | try |
| | | { |
| | | AddChangeRecordEntry changeRecord = |
| | | new AddChangeRecordEntry(DN.decode( |
| | | exportContainer.getBaseDn() + "," + EXPORT_BASE_DN), |
| | | attributes); |
| | | ldifWriter.writeChangeRecord(changeRecord); |
| | | } |
| | | catch (Exception e) |
| | | { |
| | | if (debugEnabled()) |
| | | { |
| | | TRACER.debugCaught(DebugLogLevel.ERROR, e); |
| | | } |
| | | Message message = ERR_EXPORT_CANNOT_WRITE_ENTRY_TO_LDIF.get( |
| | | exportContainer.getBaseDn() + "," + EXPORT_BASE_DN, |
| | | String.valueOf(e)); |
| | | logError(message); |
| | | } |
| | | } |
| | | } |
| | | |
| | | /** |
| | | * Export the changes for a given ReplicationCache. |
| | | */ |
| | | private void exportContainer(ReplicationCache rc, |
| | | LDIFExportConfig exportConfig, LDIFWriter ldifWriter) |
| | | { |
| | | StringBuilder buffer = new StringBuilder(); |
| | | |
| | | // Walk through the servers |
| | | for (Short serverId : rc.getServers()) |
| | | { |
| | | ReplicationIterator ri = rc.getChangelogIterator(serverId, |
| | | null); |
| | | |
| | | if (ri == null) |
| | | break; |
| | | |
| | | // Walk through the changes |
| | | while (ri.getChange() != null) |
| | | { |
| | | UpdateMessage msg = ri.getChange(); |
| | | exportChange(buffer, msg, exportConfig, ldifWriter); |
| | | if (!ri.next()) |
| | | break; |
| | | } |
| | | } |
| | | } |
| | | |
| | | /** |
| | | * Export one change. |
| | | */ |
| | | private void exportChange(StringBuilder buffer, UpdateMessage msg, |
| | | LDIFExportConfig exportConfig, LDIFWriter ldifWriter) |
| | | { |
| | | InternalClientConnection conn = |
| | | InternalClientConnection.getRootConnection(); |
| | | String dn = null; |
| | | try |
| | | { |
| | | if (msg instanceof AddMsg) |
| | | { |
| | | AddMsg addMsg = (AddMsg)msg; |
| | | AddOperation op = (AddOperation)msg.createOperation(conn); |
| | | dn = "puid=" + addMsg.getParentUid() + "," + |
| | | "changeNumber=" + msg.getChangeNumber().toString() + "," + |
| | | msg.getDn() +","+ "dc=replicationChanges"; |
| | | |
| | | Map<AttributeType,List<Attribute>> attributes = |
| | | new HashMap<AttributeType,List<Attribute>>(); |
| | | |
| | | for (RawAttribute a : op.getRawAttributes()) |
| | | { |
| | | Attribute attr = a.toAttribute(); |
| | | AttributeType attrType = attr.getAttributeType(); |
| | | List<Attribute> attrs = attributes.get(attrType); |
| | | if (attrs == null) |
| | | { |
| | | attrs = new ArrayList<Attribute>(1); |
| | | attrs.add(attr); |
| | | attributes.put(attrType, attrs); |
| | | } |
| | | else |
| | | { |
| | | attrs.add(attr); |
| | | } |
| | | } |
| | | AddChangeRecordEntry changeRecord = |
| | | new AddChangeRecordEntry(DN.decode(dn), attributes); |
| | | ldifWriter.writeChangeRecord(changeRecord); |
| | | } |
| | | else if (msg instanceof DeleteMsg) |
| | | { |
| | | DeleteMsg delMsg = (DeleteMsg)msg; |
| | | // DN |
| | | dn = "uuid=" + msg.getUniqueId() + "," + |
| | | "changeNumber=" + delMsg.getChangeNumber().toString()+ "," + |
| | | msg.getDn() +","+ |
| | | "dc=replicationChanges"; |
| | | DeleteChangeRecordEntry changeRecord = |
| | | new DeleteChangeRecordEntry(DN.decode(dn)); |
| | | ldifWriter.writeChangeRecord(changeRecord); |
| | | } |
| | | else if (msg instanceof ModifyMsg) |
| | | { |
| | | ModifyOperation op = (ModifyOperation)msg.createOperation(conn); |
| | | // DN |
| | | dn = "uuid=" + msg.getUniqueId() + "," + |
| | | "changeNumber=" + msg.getChangeNumber().toString()+ "," + |
| | | msg.getDn() +","+ |
| | | "dc=replicationChanges"; |
| | | op.setInternalOperation(true); |
| | | ModifyChangeRecordEntry changeRecord = |
| | | new ModifyChangeRecordEntry(DN.decode(dn), |
| | | op.getRawModifications()); |
| | | ldifWriter.writeChangeRecord(changeRecord); |
| | | } |
| | | else if (msg instanceof ModifyDNMsg) |
| | | { |
| | | ModifyDNOperation op = (ModifyDNOperation)msg.createOperation(conn); |
| | | // DN |
| | | dn = "uuid=" + msg.getUniqueId() + "," + |
| | | "changeNumber=" + msg.getChangeNumber().toString()+ "," + |
| | | msg.getDn() +","+ |
| | | "dc=replicationChanges"; |
| | | op.setInternalOperation(true); |
| | | ModifyDNChangeRecordEntry changeRecord = |
| | | new ModifyDNChangeRecordEntry(DN.decode(dn), |
| | | op.getNewRDN(), op.deleteOldRDN(), |
| | | op.getNewSuperior()); |
| | | ldifWriter.writeChangeRecord(changeRecord); |
| | | } |
| | | this.exportedCount++; |
| | | } |
| | | catch (Exception e) |
| | | { |
| | | this.skippedCount++; |
| | | if (debugEnabled()) |
| | | { |
| | | TRACER.debugCaught(DebugLogLevel.ERROR, e); |
| | | } |
| | | Message message = ERR_EXPORT_CANNOT_WRITE_ENTRY_TO_LDIF.get( |
| | | dn, String.valueOf(e)); |
| | | logError(message); |
| | | |
| | | } |
| | | } |
| | | |
| | | /** |
| | |
| | | { |
| | | this.server = server; |
| | | } |
| | | |
| | | /** |
| | | * This class reports progress of the export job at fixed intervals. |
| | | */ |
| | | class ProgressTask extends TimerTask |
| | | { |
| | | /** |
| | | * The number of entries that had been exported at the time of the |
| | | * previous progress report. |
| | | */ |
| | | private long previousCount = 0; |
| | | |
| | | /** |
| | | * The time in milliseconds of the previous progress report. |
| | | */ |
| | | private long previousTime; |
| | | |
| | | /** |
| | | * Create a new export progress task. |
| | | */ |
| | | public ProgressTask() |
| | | { |
| | | previousTime = System.currentTimeMillis(); |
| | | } |
| | | |
| | | /** |
| | | * The action to be performed by this timer task. |
| | | */ |
| | | public void run() |
| | | { |
| | | long latestCount = exportedCount; |
| | | long deltaCount = (latestCount - previousCount); |
| | | long latestTime = System.currentTimeMillis(); |
| | | long deltaTime = latestTime - previousTime; |
| | | |
| | | if (deltaTime == 0) |
| | | { |
| | | return; |
| | | } |
| | | |
| | | float rate = 1000f*deltaCount / deltaTime; |
| | | |
| | | Message message = |
| | | INFO_JEB_EXPORT_PROGRESS_REPORT.get(latestCount, skippedCount, rate); |
| | | logError(message); |
| | | |
| | | previousCount = latestCount; |
| | | previousTime = latestTime; |
| | | } |
| | | }; |
| | | } |