/*
* CDDL HEADER START
*
* The contents of this file are subject to the terms of the
* Common Development and Distribution License, Version 1.0 only
* (the "License"). You may not use this file except in compliance
* with the License.
*
* You can obtain a copy of the license at
* trunk/opends/resource/legal-notices/OpenDS.LICENSE
* or https://OpenDS.dev.java.net/OpenDS.LICENSE.
* See the License for the specific language governing permissions
* and limitations under the License.
*
* When distributing Covered Code, include this CDDL HEADER in each
* file and include the License file at
* trunk/opends/resource/legal-notices/OpenDS.LICENSE. If applicable,
* add the following below this CDDL HEADER, with the fields enclosed
* by brackets "[]" replaced with your own identifying information:
* Portions Copyright [yyyy] [name of copyright owner]
*
* CDDL HEADER END
*
*
* Portions Copyright 2006-2007 Sun Microsystems, Inc.
*/
package org.opends.server.replication.server;
import static org.opends.messages.BackendMessages.*;
import static org.opends.messages.JebMessages.INFO_JEB_EXPORT_FINAL_STATUS;
import static org.opends.messages.JebMessages.INFO_JEB_EXPORT_PROGRESS_REPORT;
import static org.opends.messages.ReplicationMessages.*;
import static org.opends.server.loggers.ErrorLogger.logError;
import static org.opends.server.loggers.debug.DebugLogger.debugEnabled;
import static org.opends.server.loggers.debug.DebugLogger.getTracer;
import static org.opends.server.util.StaticUtils.getExceptionMessage;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Iterator;
import java.util.LinkedHashSet;
import java.util.List;
import java.util.Map;
import java.util.Timer;
import java.util.TimerTask;
import org.opends.messages.Message;
import org.opends.server.admin.Configuration;
import org.opends.server.admin.std.server.BackendCfg;
import org.opends.server.admin.std.server.JEBackendCfg;
import org.opends.server.api.Backend;
import org.opends.server.backends.jeb.BackupManager;
import org.opends.server.config.ConfigException;
import org.opends.server.core.AddOperation;
import org.opends.server.core.DeleteOperation;
import org.opends.server.core.DirectoryServer;
import org.opends.server.core.ModifyDNOperation;
import org.opends.server.core.ModifyOperation;
import org.opends.server.core.SearchOperation;
import org.opends.server.loggers.debug.DebugTracer;
import org.opends.server.protocols.internal.InternalClientConnection;
import org.opends.server.replication.protocol.AddMsg;
import org.opends.server.replication.protocol.DeleteMsg;
import org.opends.server.replication.protocol.ModifyDNMsg;
import org.opends.server.replication.protocol.ModifyMsg;
import org.opends.server.replication.protocol.UpdateMessage;
import org.opends.server.types.Attribute;
import org.opends.server.types.AttributeType;
import org.opends.server.types.AttributeValue;
import org.opends.server.types.BackupConfig;
import org.opends.server.types.BackupDirectory;
import org.opends.server.types.ConditionResult;
import org.opends.server.types.DN;
import org.opends.server.types.DebugLogLevel;
import org.opends.server.types.DirectoryException;
import org.opends.server.types.Entry;
import org.opends.server.types.InitializationException;
import org.opends.server.types.LDIFExportConfig;
import org.opends.server.types.LDIFImportConfig;
import org.opends.server.types.LDIFImportResult;
import org.opends.server.types.RawAttribute;
import org.opends.server.types.RestoreConfig;
import org.opends.server.types.ResultCode;
import org.opends.server.util.AddChangeRecordEntry;
import org.opends.server.util.DeleteChangeRecordEntry;
import org.opends.server.util.LDIFWriter;
import org.opends.server.util.ModifyChangeRecordEntry;
import org.opends.server.util.ModifyDNChangeRecordEntry;
import org.opends.server.util.Validator;
/**
* This class defines a backend that stores its information in an
* associated replication server object.
* This is primarily intended to take advantage of the backup/restore/
* import/export of the backend API, and to provide an LDAP access
* to the replication server database.
*
* Entries stored in this backend are held in the DB associated with
* the replication server.
*
* Currently, only the backup creation, backup restore and LDIF export
* features are implemented.
*
*/
public class ReplicationBackend
extends Backend
{
/**
* The tracer object for the debug logger.
*/
private static final DebugTracer TRACER = getTracer();
private static final String EXPORT_BASE_DN = "dc=replicationChanges";
// The base DNs for this backend.
private DN[] baseDNs;
// The mapping between parent DNs and their immediate children.
private HashMap> childDNs;
// The base DNs for this backend, in a hash set.
private HashSet baseDNSet;
// The set of supported controls for this backend.
private HashSet supportedControls;
// The set of supported features for this backend.
private HashSet supportedFeatures;
// The directory associated with this backend.
private BackupDirectory backendDirectory;
ReplicationServer server;
/**
* The configuration of this backend.
*/
private JEBackendCfg cfg;
/**
* The number of milliseconds between job progress reports.
*/
private long progressInterval = 10000;
/**
* The current number of entries exported.
*/
private long exportedCount = 0;
/**
* The current number of entries skipped.
*/
private long skippedCount = 0;
/**
 * Builds a replication backend that is not yet usable.  Backend
 * implementations are required to offer a no-argument constructor that
 * chains to super(); this one does nothing else.
 */
public ReplicationBackend()
{
  // All real set-up is deferred to initializeBackend.
  super();
}
/**
 * Assigns the base DNs served by this backend.  Unit tests call this
 * directly so that they do not need to build a configuration object
 * before initializing the backend.  The array is stored as given, it
 * is not copied.
 *
 * @param baseDNs The set of base DNs to be served by this backend.
 */
public void setBaseDNs(DN[] baseDNs)
{
  this.baseDNs = baseDNs;
}
/**
 * {@inheritDoc}
 * <p>
 * When a configuration is supplied, it is recorded, the base DNs it
 * lists become the base DNs of this backend, and the backend directory
 * is derived from it.  A null configuration leaves the backend
 * untouched, so that the base DNs can be provided with setBaseDNs
 * instead.
 *
 * @param config The configuration to apply; it must be a JEBackendCfg
 *               when it is not null.
 */
public void configureBackend(Configuration config) throws ConfigException
{
  if (config == null)
  {
    return;
  }

  // The configuration is cast to JEBackendCfg just below, so that is the
  // type that has to be validated (the check used to accept any
  // BackendCfg and then fail on the cast).
  Validator.ensureTrue(config instanceof JEBackendCfg);
  cfg = (JEBackendCfg) config;

  // Use a distinct local name so the baseDNs field is not shadowed.
  DN[] configuredBaseDNs = new DN[cfg.getBackendBaseDN().size()];
  cfg.getBackendBaseDN().toArray(configuredBaseDNs);
  setBaseDNs(configuredBaseDNs);

  backendDirectory = new BackupDirectory(cfg.getBackendDirectory(), null);
}
/**
* {@inheritDoc}
*/
public synchronized void initializeBackend()
throws ConfigException, InitializationException
{
if ((baseDNs == null) || (baseDNs.length != 1))
{
Message message = ERR_MEMORYBACKEND_REQUIRE_EXACTLY_ONE_BASE.get();
throw new ConfigException(message);
}
baseDNSet = new HashSet();
for (DN dn : baseDNs)
{
baseDNSet.add(dn);
}
childDNs = new HashMap>();
supportedControls = new HashSet();
supportedFeatures = new HashSet();
for (DN dn : baseDNs)
{
try
{
DirectoryServer.registerBaseDN(dn, this, false, false);
}
catch (Exception e)
{
if (debugEnabled())
{
TRACER.debugCaught(DebugLogLevel.ERROR, e);
}
Message message = ERR_BACKEND_CANNOT_REGISTER_BASEDN.get(
dn.toString(), getExceptionMessage(e));
throw new InitializationException(message, e);
}
}
}
/**
 * Removes any data that may have been stored in this backend.
 * Calling this before initializeBackend has created the internal map
 * is a no-op.
 */
public synchronized void clearMemoryBackend()
{
  // childDNs is only created by initializeBackend.
  if (childDNs != null)
  {
    childDNs.clear();
  }
}
/**
 * {@inheritDoc}
 * <p>
 * Deregisters every base DN of this backend from the directory server.
 * A failure to deregister one base DN is traced and does not prevent
 * the remaining ones from being processed.
 */
public synchronized void finalizeBackend()
{
  // initializeBackend refuses to run without base DNs, so the backend
  // may be finalized while none were ever set.
  if (baseDNs == null)
  {
    return;
  }

  for (DN dn : baseDNs)
  {
    try
    {
      DirectoryServer.deregisterBaseDN(dn, false);
    }
    catch (Exception e)
    {
      if (debugEnabled())
      {
        TRACER.debugCaught(DebugLogLevel.ERROR, e);
      }
    }
  }
}
/**
 * {@inheritDoc}
 * <p>
 * The array held by this backend is handed back as is, without copy.
 */
public DN[] getBaseDNs()
{
  return this.baseDNs;
}
/**
 * {@inheritDoc}
 * <p>
 * This implementation does not count anything and always answers -1.
 */
public synchronized long getEntryCount()
{
  return -1L;
}
/**
 * {@inheritDoc}
 * <p>
 * This implementation always answers {@code true}.
 */
public boolean isLocal()
{
  return true;
}
/**
 * {@inheritDoc}
 * <p>
 * No lookup is performed: the answer is {@code null} whatever the
 * requested DN.
 */
public synchronized Entry getEntry(DN entryDN)
{
  return null;
}
/**
 * {@inheritDoc}
 * <p>
 * No lookup is performed: the answer is {@code false} whatever the
 * requested DN.
 */
public synchronized boolean entryExists(DN entryDN)
{
  return false;
}
/**
 * {@inheritDoc}
 * <p>
 * Adding entries is not supported by this backend.
 *
 * @throws DirectoryException Always, with the UNWILLING_TO_PERFORM
 *                            result code.
 */
public synchronized void addEntry(Entry entry, AddOperation addOperation)
       throws DirectoryException
{
  throw new DirectoryException(ResultCode.UNWILLING_TO_PERFORM,
      ERR_BACKUP_ADD_NOT_SUPPORTED.get());
}
/**
 * {@inheritDoc}
 * <p>
 * Deleting entries is not supported by this backend.
 *
 * @throws DirectoryException Always, with the UNWILLING_TO_PERFORM
 *                            result code.
 */
public synchronized void deleteEntry(DN entryDN,
                                     DeleteOperation deleteOperation)
       throws DirectoryException
{
  throw new DirectoryException(ResultCode.UNWILLING_TO_PERFORM,
      ERR_BACKUP_DELETE_NOT_SUPPORTED.get());
}
/**
 * {@inheritDoc}
 * <p>
 * Modifying entries is not supported by this backend.
 *
 * @throws DirectoryException Always, with the UNWILLING_TO_PERFORM
 *                            result code.
 */
public synchronized void replaceEntry(Entry entry,
                                      ModifyOperation modifyOperation)
       throws DirectoryException
{
  throw new DirectoryException(ResultCode.UNWILLING_TO_PERFORM,
      ERR_BACKUP_MODIFY_NOT_SUPPORTED.get());
}
/**
 * {@inheritDoc}
 * <p>
 * Renaming entries is not supported by this backend.
 *
 * @throws DirectoryException Always, with the UNWILLING_TO_PERFORM
 *                            result code.
 */
public synchronized void renameEntry(DN currentDN, Entry entry,
                                     ModifyDNOperation modifyDNOperation)
       throws DirectoryException
{
  throw new DirectoryException(ResultCode.UNWILLING_TO_PERFORM,
      ERR_BACKUP_MODIFY_DN_NOT_SUPPORTED.get());
}
/**
 * {@inheritDoc}
 * <p>
 * Searching is not implemented: every request is rejected as if the
 * search base did not exist, reporting the first base DN of this
 * backend as the matched DN.
 *
 * @throws DirectoryException Always, with the NO_SUCH_OBJECT result
 *                            code.
 */
public synchronized void search(SearchOperation searchOperation)
       throws DirectoryException
{
  DN closestMatch = baseDNs[0];
  DN requestedBase = searchOperation.getBaseDN();

  // FIXME Remove this error message or replace when implementing
  // the search.
  throw new DirectoryException(
      ResultCode.NO_SUCH_OBJECT,
      ERR_MEMORYBACKEND_ENTRY_DOESNT_EXIST.get(
          String.valueOf(requestedBase)),
      closestMatch, null);
}
/**
 * {@inheritDoc}
 * <p>
 * Hands back the set created by initializeBackend, without copy.
 */
public HashSet getSupportedControls()
{
  return this.supportedControls;
}
/**
 * {@inheritDoc}
 * <p>
 * Hands back the set created by initializeBackend, without copy.
 */
public HashSet getSupportedFeatures()
{
  return this.supportedFeatures;
}
/**
 * {@inheritDoc}
 * <p>
 * LDIF export is available, see exportLDIF: the answer is always
 * {@code true}.
 */
public boolean supportsLDIFExport()
{
  return true;
}
/**
 * {@inheritDoc}
 * <p>
 * Writes the changes held by the replication server as LDIF change
 * records.  The replication caches to export are selected with the
 * include branches of the export configuration (all of them when no
 * include branch is given).  The export root and one entry per
 * selected domain are written first, then the changes of each domain.
 * A progress report is logged at fixed intervals while the export runs
 * and a final status is logged at the end.
 *
 * @param exportConfig The configuration of the export to perform.
 *
 * @throws DirectoryException If a domain DN cannot be decoded or if the
 *                            LDIF writer cannot be created.
 */
public synchronized void exportLDIF(LDIFExportConfig exportConfig)
       throws DirectoryException
{
  // The counters describe a single export: restart them so that the
  // progress reports and the final status are not skewed by a previous
  // export run on the same backend instance.
  exportedCount = 0;
  skippedCount = 0;

  List<DN> includeBranches = exportConfig.getIncludeBranches();
  ArrayList<ReplicationCache> exportContainers =
      new ArrayList<ReplicationCache>();

  Iterator<ReplicationCache> rcachei = server.getCacheIterator();
  if (rcachei != null)
  {
    while (rcachei.hasNext())
    {
      ReplicationCache rc = rcachei.next();

      // Skip containers that are not covered by the include branches.
      DN baseDN = DN.decode(rc.getBaseDn().toString() + "," + EXPORT_BASE_DN);
      if (includeBranches == null || includeBranches.isEmpty())
      {
        exportContainers.add(rc);
      }
      else
      {
        for (DN includeBranch : includeBranches)
        {
          if (includeBranch.isDescendantOf(baseDN) ||
              includeBranch.isAncestorOf(baseDN))
          {
            exportContainers.add(rc);
            // One match is enough: without this the container would be
            // added, and later exported, once per matching branch.
            break;
          }
        }
      }
    }
  }

  // Make a note of the time we started.
  long startTime = System.currentTimeMillis();

  // Create the LDIF writer before starting the timer, so that a failure
  // here cannot leave a running timer thread behind.
  LDIFWriter ldifWriter;
  try
  {
    ldifWriter = new LDIFWriter(exportConfig);
  }
  catch (Exception e)
  {
    if (debugEnabled())
    {
      TRACER.debugCaught(DebugLogLevel.ERROR, e);
    }

    Message message =
        ERR_BACKEND_CANNOT_CREATE_LDIF_WRITER.get(String.valueOf(e));
    throw new DirectoryException(DirectoryServer.getServerErrorResultCode(),
        message, e);
  }

  Timer timer = new Timer();
  try
  {
    // Start the periodic progress report.
    TimerTask progressTask = new ProgressTask();
    timer.scheduleAtFixedRate(progressTask, progressInterval,
        progressInterval);

    exportRootChanges(exportContainers, exportConfig, ldifWriter);

    // Iterate through the containers.
    for (ReplicationCache container : exportContainers)
    {
      exportContainer(container, exportConfig, ldifWriter);
    }
  }
  finally
  {
    // Whatever happened above, stop the reports and release the writer.
    timer.cancel();

    try
    {
      ldifWriter.close();
    }
    catch (Exception e)
    {
      if (debugEnabled())
      {
        TRACER.debugCaught(DebugLogLevel.ERROR, e);
      }
    }
  }

  long finishTime = System.currentTimeMillis();
  long totalTime = (finishTime - startTime);

  // Entries per second; left at 0 when no measurable time elapsed.
  float rate = 0;
  if (totalTime > 0)
  {
    rate = 1000f*exportedCount / totalTime;
  }

  Message message = INFO_JEB_EXPORT_FINAL_STATUS.get(
      exportedCount, skippedCount, totalTime/1000, rate);
  logError(message);
}
/*
* Exports the root changes of the export, and one entry by domain.
*/
private void exportRootChanges(List exportContainers,
LDIFExportConfig exportConfig, LDIFWriter ldifWriter)
{
Map> attributes =
new HashMap>();
ArrayList ldapAttrList = new ArrayList();
AttributeType ocType=
DirectoryServer.getAttributeType("objectclass", true);
LinkedHashSet ocValues =
new LinkedHashSet();
ocValues.add(new AttributeValue(ocType, "top"));
ocValues.add(new AttributeValue(ocType, "domain"));
Attribute ocAttr = new Attribute(ocType, "objectclass", ocValues);
ldapAttrList.add(ocAttr);
attributes.put(ocType, ldapAttrList);
try
{
AddChangeRecordEntry changeRecord =
new AddChangeRecordEntry(DN.decode(EXPORT_BASE_DN),
attributes);
ldifWriter.writeChangeRecord(changeRecord);
}
catch (Exception e) {}
for (ReplicationCache exportContainer : exportContainers)
{
attributes.clear();
ldapAttrList.clear();
ldapAttrList.add(ocAttr);
AttributeType stateType=
DirectoryServer.getAttributeType("state", true);
LinkedHashSet stateValues =
new LinkedHashSet();
stateValues.add(new AttributeValue(stateType,
exportContainer.getDbServerState().toString()));
TRACER.debugInfo("State=" +
exportContainer.getDbServerState().toString());
Attribute stateAttr = new Attribute(ocType, "state", stateValues);
ldapAttrList.add(stateAttr);
AttributeType genidType=
DirectoryServer.getAttributeType("generation-id", true);
LinkedHashSet genidValues =
new LinkedHashSet();
genidValues.add(new AttributeValue(genidType,
String.valueOf(exportContainer.getGenerationId())+
exportContainer.getBaseDn()));
Attribute genidAttr = new Attribute(ocType, "generation-id", genidValues);
ldapAttrList.add(genidAttr);
attributes.put(genidType, ldapAttrList);
try
{
AddChangeRecordEntry changeRecord =
new AddChangeRecordEntry(DN.decode(
exportContainer.getBaseDn() + "," + EXPORT_BASE_DN),
attributes);
ldifWriter.writeChangeRecord(changeRecord);
}
catch (Exception e)
{
if (debugEnabled())
{
TRACER.debugCaught(DebugLogLevel.ERROR, e);
}
Message message = ERR_EXPORT_CANNOT_WRITE_ENTRY_TO_LDIF.get(
exportContainer.getBaseDn() + "," + EXPORT_BASE_DN,
String.valueOf(e));
logError(message);
}
}
}
/**
 * Exports the changes held by one replication cache: for each server
 * known to the cache, every change returned by its changelog iterator
 * is handed to exportChange.
 *
 * @param rc           The replication cache to export.
 * @param exportConfig The export configuration, passed on to
 *                     exportChange.
 * @param ldifWriter   The writer receiving the change records.
 */
private void exportContainer(ReplicationCache rc,
    LDIFExportConfig exportConfig, LDIFWriter ldifWriter)
{
  StringBuilder buffer = new StringBuilder();

  // Walk through the servers
  for (Short serverId : rc.getServers())
  {
    ReplicationIterator ri = rc.getChangelogIterator(serverId, null);
    if (ri == null)
    {
      // Nothing to iterate over for this server: move on to the next
      // one rather than giving up on all the remaining servers.
      continue;
    }

    // NOTE(review): the iterator is never explicitly released here; if
    // ReplicationIterator holds a resource that needs releasing, this
    // loop should do so in a finally block - confirm.

    // Walk through the changes
    while (ri.getChange() != null)
    {
      UpdateMessage msg = ri.getChange();
      exportChange(buffer, msg, exportConfig, ldifWriter);
      if (!ri.next())
      {
        break;
      }
    }
  }
}
/**
* Export one change.
*/
private void exportChange(StringBuilder buffer, UpdateMessage msg,
LDIFExportConfig exportConfig, LDIFWriter ldifWriter)
{
InternalClientConnection conn =
InternalClientConnection.getRootConnection();
String dn = null;
try
{
if (msg instanceof AddMsg)
{
AddMsg addMsg = (AddMsg)msg;
AddOperation op = (AddOperation)msg.createOperation(conn);
dn = "puid=" + addMsg.getParentUid() + "," +
"changeNumber=" + msg.getChangeNumber().toString() + "," +
msg.getDn() +","+ "dc=replicationChanges";
Map> attributes =
new HashMap>();
for (RawAttribute a : op.getRawAttributes())
{
Attribute attr = a.toAttribute();
AttributeType attrType = attr.getAttributeType();
List attrs = attributes.get(attrType);
if (attrs == null)
{
attrs = new ArrayList(1);
attrs.add(attr);
attributes.put(attrType, attrs);
}
else
{
attrs.add(attr);
}
}
AddChangeRecordEntry changeRecord =
new AddChangeRecordEntry(DN.decode(dn), attributes);
ldifWriter.writeChangeRecord(changeRecord);
}
else if (msg instanceof DeleteMsg)
{
DeleteMsg delMsg = (DeleteMsg)msg;
// DN
dn = "uuid=" + msg.getUniqueId() + "," +
"changeNumber=" + delMsg.getChangeNumber().toString()+ "," +
msg.getDn() +","+
"dc=replicationChanges";
DeleteChangeRecordEntry changeRecord =
new DeleteChangeRecordEntry(DN.decode(dn));
ldifWriter.writeChangeRecord(changeRecord);
}
else if (msg instanceof ModifyMsg)
{
ModifyOperation op = (ModifyOperation)msg.createOperation(conn);
// DN
dn = "uuid=" + msg.getUniqueId() + "," +
"changeNumber=" + msg.getChangeNumber().toString()+ "," +
msg.getDn() +","+
"dc=replicationChanges";
op.setInternalOperation(true);
ModifyChangeRecordEntry changeRecord =
new ModifyChangeRecordEntry(DN.decode(dn),
op.getRawModifications());
ldifWriter.writeChangeRecord(changeRecord);
}
else if (msg instanceof ModifyDNMsg)
{
ModifyDNOperation op = (ModifyDNOperation)msg.createOperation(conn);
// DN
dn = "uuid=" + msg.getUniqueId() + "," +
"changeNumber=" + msg.getChangeNumber().toString()+ "," +
msg.getDn() +","+
"dc=replicationChanges";
op.setInternalOperation(true);
ModifyDNChangeRecordEntry changeRecord =
new ModifyDNChangeRecordEntry(DN.decode(dn),
op.getNewRDN(), op.deleteOldRDN(),
op.getNewSuperior());
ldifWriter.writeChangeRecord(changeRecord);
}
this.exportedCount++;
}
catch (Exception e)
{
this.skippedCount++;
if (debugEnabled())
{
TRACER.debugCaught(DebugLogLevel.ERROR, e);
}
Message message = ERR_EXPORT_CANNOT_WRITE_ENTRY_TO_LDIF.get(
dn, String.valueOf(e));
logError(message);
}
}
/**
 * {@inheritDoc}
 * <p>
 * LDIF import is not available: the answer is always {@code false}.
 */
public boolean supportsLDIFImport()
{
  return false;
}
/**
 * {@inheritDoc}
 * <p>
 * Nothing is imported: the import configuration is not read and the
 * result is always built from three zero counters.
 */
public synchronized LDIFImportResult importLDIF(LDIFImportConfig importConfig)
       throws DirectoryException
{
  LDIFImportResult nothingImported = new LDIFImportResult(0, 0, 0);
  return nothingImported;
}
/**
 * {@inheritDoc}
 * <p>
 * The answer is always {@code true}; see createBackup and
 * restoreBackup.
 */
public boolean supportsBackup()
{
  // Backups are reported as supported.
  return true;
}
/**
 * {@inheritDoc}
 * <p>
 * Every backup configuration is accepted: the answer is always
 * {@code true} and nothing is appended to the reason buffer.
 */
public boolean supportsBackup(BackupConfig backupConfig,
                              StringBuilder unsupportedReason)
{
  return true;
}
/**
 * {@inheritDoc}
 * <p>
 * The work is delegated to a BackupManager created for the ID of this
 * backend, which receives the configuration of this backend and the
 * requested backup configuration.
 */
public void createBackup(BackupConfig backupConfig)
       throws DirectoryException
{
  new BackupManager(getBackendID()).createBackup(cfg, backupConfig);
}
/**
 * {@inheritDoc}
 * <p>
 * The work is delegated to a BackupManager created for the ID of this
 * backend.
 *
 * @param backupDirectory The backup directory the backup belongs to.
 *                        When it is null, the directory recorded by
 *                        configureBackend is used instead.
 * @param backupID        The ID of the backup to remove.
 */
public void removeBackup(BackupDirectory backupDirectory,
                         String backupID)
       throws DirectoryException
{
  // The backupDirectory argument used to be ignored in favour of the
  // backendDirectory field, which configureBackend builds from the
  // backend's own directory rather than from a backup location.
  BackupDirectory target =
      (backupDirectory != null) ? backupDirectory : this.backendDirectory;

  BackupManager backupManager =
      new BackupManager(getBackendID());
  backupManager.removeBackup(target, backupID);
}
/**
 * {@inheritDoc}
 * <p>
 * The answer is always {@code true}; see restoreBackup.
 */
public boolean supportsRestore()
{
  return true;
}
/**
 * {@inheritDoc}
 * <p>
 * The work is delegated to a BackupManager created for the ID of this
 * backend, which receives the configuration of this backend and the
 * requested restore configuration.
 */
public void restoreBackup(RestoreConfig restoreConfig)
       throws DirectoryException
{
  new BackupManager(getBackendID()).restoreBackup(cfg, restoreConfig);
}
/**
 * Retrieves the number of subordinates for the requested entry.
 * This backend does not implement the operation and rejects every
 * request.
 *
 * @param  entryDN  The distinguished name of the entry.
 *
 * @return  Nothing: this implementation never returns normally.
 *
 * @throws  DirectoryException  Always, with the UNWILLING_TO_PERFORM
 *                              result code.
 */
public long numSubordinates(DN entryDN)
       throws DirectoryException
{
  String normalizedDN = entryDN.toNormalizedString();
  throw new DirectoryException(ResultCode.UNWILLING_TO_PERFORM,
      WARN_ROOTDSE_GET_ENTRY_NONROOT.get(normalizedDN));
}
/**
 * Indicates whether the requested entry has any subordinates.
 * This backend does not implement the operation and rejects every
 * request.
 *
 * @param  entryDN  The distinguished name of the entry.
 *
 * @return  Nothing: this implementation never returns normally.
 *
 * @throws  DirectoryException  Always, with the UNWILLING_TO_PERFORM
 *                              result code.
 */
public ConditionResult hasSubordinates(DN entryDN)
       throws DirectoryException
{
  String normalizedDN = entryDN.toNormalizedString();
  throw new DirectoryException(ResultCode.UNWILLING_TO_PERFORM,
      WARN_ROOTDSE_GET_ENTRY_NONROOT.get(normalizedDN));
}
/**
 * Attaches this backend to a replication server.  exportLDIF obtains
 * the replication caches to export from that server.
 *
 * @param server The replication server.
 */
public void setServer(ReplicationServer server)
{
  this.server = server;
}
/**
 * Timer task that logs how far the running export has progressed each
 * time it is fired.
 */
class ProgressTask extends TimerTask
{
  /**
   * The value of the exported counter when the previous report was
   * logged.
   */
  private long previousCount = 0;

  /**
   * The time, in milliseconds, at which the previous report was logged
   * (or at which this task was created when nothing was logged yet).
   */
  private long previousTime;

  /**
   * Creates the task and takes the current time as the starting point
   * of the first reporting period.
   */
  public ProgressTask()
  {
    previousTime = System.currentTimeMillis();
  }

  /**
   * Logs the number of entries exported and skipped so far, together
   * with the export rate since the previous report.  Nothing is logged
   * when no time elapsed since the previous report.
   */
  public void run()
  {
    long countNow = exportedCount;
    long timeNow = System.currentTimeMillis();
    long elapsed = timeNow - previousTime;

    if (elapsed != 0)
    {
      long exportedSinceLast = countNow - previousCount;

      // Entries per second over the period that just ended.
      float rate = 1000f*exportedSinceLast / elapsed;

      logError(
          INFO_JEB_EXPORT_PROGRESS_REPORT.get(countNow, skippedCount, rate));

      previousCount = countNow;
      previousTime = timeNow;
    }
  }
}
}