| | |
| | | package org.opends.server.replication.plugin; |
| | | import org.opends.messages.Message; |
| | | import org.opends.messages.MessageBuilder; |
| | | |
| | | import static org.opends.server.config.ConfigConstants.DN_BACKEND_BASE; |
| | | import static org.opends.server.loggers.ErrorLogger.logError; |
| | | import static org.opends.server.loggers.debug.DebugLogger.*; |
| | | import org.opends.server.loggers.debug.DebugTracer; |
| | |
| | | import static org.opends.server.replication.protocol.OperationContext.*; |
| | | import static org.opends.server.util.StaticUtils.createEntry; |
| | | import static org.opends.server.util.StaticUtils.stackTraceToSingleLineString; |
| | | import org.opends.server.protocols.asn1.ASN1OctetString; |
| | | import static org.opends.server.util.ServerConstants.*; |
| | | |
| | | import java.io.IOException; |
| | | import java.net.SocketTimeoutException; |
| | | import java.util.ArrayList; |
| | | import java.util.Collection; |
| | | import java.util.HashSet; |
| | | import java.util.LinkedHashMap; |
| | | import java.util.LinkedHashSet; |
| | | import java.util.LinkedList; |
| | |
| | | import java.util.SortedMap; |
| | | import java.util.TreeMap; |
| | | import java.util.concurrent.atomic.AtomicInteger; |
| | | import java.util.zip.CheckedOutputStream; |
| | | import java.util.zip.DataFormatException; |
| | | import java.util.zip.Adler32; |
| | | import java.io.OutputStream; |
| | | |
| | | import org.opends.server.admin.server.ConfigurationChangeListener; |
| | | import org.opends.server.admin.std.meta.MultimasterDomainCfgDefn.*; |
| | | import org.opends.server.admin.std.server.MultimasterDomainCfg; |
| | | import org.opends.server.admin.std.server.BackendCfg; |
| | | import org.opends.server.api.AlertGenerator; |
| | | import org.opends.server.api.Backend; |
| | | import org.opends.server.api.DirectoryThread; |
| | |
| | | import org.opends.server.protocols.asn1.ASN1Exception; |
| | | import org.opends.server.protocols.internal.InternalClientConnection; |
| | | import org.opends.server.protocols.internal.InternalSearchOperation; |
| | | import org.opends.server.protocols.ldap.LDAPAttribute; |
| | | import org.opends.server.protocols.ldap.LDAPFilter; |
| | | import org.opends.server.protocols.ldap.LDAPModification; |
| | | import org.opends.server.replication.common.ChangeNumber; |
| | | import org.opends.server.replication.common.ChangeNumberGenerator; |
| | | import org.opends.server.replication.common.ServerState; |
| | |
| | | import org.opends.server.types.AttributeType; |
| | | import org.opends.server.types.AttributeValue; |
| | | import org.opends.server.types.ConfigChangeResult; |
| | | import org.opends.server.types.Control; |
| | | import org.opends.server.types.DN; |
| | | import org.opends.server.types.DereferencePolicy; |
| | | import org.opends.server.types.DirectoryException; |
| | |
| | | import org.opends.server.types.ModificationType; |
| | | import org.opends.server.types.Operation; |
| | | import org.opends.server.types.RDN; |
| | | import org.opends.server.types.RawModification; |
| | | import org.opends.server.types.ResultCode; |
| | | import org.opends.server.types.SearchFilter; |
| | | import org.opends.server.types.SearchResultEntry; |
| | |
| | | private int maxSendQueue = 0; |
| | | private int maxReceiveDelay = 0; |
| | | private int maxSendDelay = 0; |
| | | private long generationId = -1; |
| | | private long rejectedGenerationId = -1; |
| | | private boolean requestedResetSinceLastStart = false; |
| | | |
| | | /** |
| | | * This object is used to store the list of update currently being |
| | |
| | | private int window = 100; |
| | | |
| | | /** |
| | | * The isoalation policy that this domain is going to use. |
| | | * The isolation policy that this domain is going to use. |
| | | * This field describes the behavior of the domain when an update is |
| | | * attempted and the domain could not connect to any Replication Server. |
| | | * Possible values are accept-updates or deny-updates, but other values |
| | |
| | | |
| | | // The total entry count expected to be processed |
| | | long entryCount = 0; |
| | | // The count for the entry left to be processed |
| | | // The count for the entry not yet processed |
| | | long entryLeftCount = 0; |
| | | |
| | | boolean checksumOutput = false; |
| | | |
| | | // The exception raised when any |
| | | DirectoryException exception = null; |
| | | long checksumOutputValue = (long)0; |
| | | |
| | | /** |
| | | * Initializes the counters of the task with the provider value. |
| | | * Initializes the import/export counters with the provider value. |
| | | * @param count The value with which to initialize the counters. |
| | | */ |
| | | public void initTaskCounters(long count) |
| | | public void initImportExportCounters(long count) |
| | | { |
| | | entryCount = count; |
| | | entryLeftCount = count; |
| | |
| | | * Update the counters of the task for each entry processed during |
| | | * an import or export. |
| | | */ |
| | | public void updateTaskCounters() |
| | | public void updateCounters() |
| | | { |
| | | entryLeftCount--; |
| | | |
| | |
| | | configDn = configuration.dn(); |
| | | |
| | | /* |
| | | * Modify conflicts are solved for all suffixes but the schema suffix |
| | | * because we don't want to store extra information in the schema |
| | | * ldif files. |
| | | * This has no negative impact because the changes on schema should |
| | | * not produce conflicts. |
| | | */ |
| | | * Modify conflicts are solved for all suffixes but the schema suffix |
| | | * because we don't want to store extra information in the schema |
| | | * ldif files. |
| | | * This has no negative impact because the changes on schema should |
| | | * not produce conflicts. |
| | | */ |
| | | if (baseDN.compareTo(DirectoryServer.getSchemaDN()) == 0) |
| | | { |
| | | solveConflictFlag = false; |
| | |
| | | monitor = new ReplicationMonitor(this); |
| | | DirectoryServer.registerMonitorProvider(monitor); |
| | | |
| | | backend = retrievesBackend(baseDN); |
| | | if (backend == null) |
| | | { |
| | | throw new ConfigException(ERR_SEARCHING_DOMAIN_BACKEND.get( |
| | | baseDN.toNormalizedString())); |
| | | } |
| | | |
| | | try |
| | | { |
| | | generationId = loadGenerationId(); |
| | | } |
| | | catch (DirectoryException e) |
| | | { |
| | | logError(ERR_LOADING_GENERATION_ID.get( |
| | | baseDN.toNormalizedString(), e.getLocalizedMessage())); |
| | | } |
| | | |
| | | /* |
| | | * create the broker object used to publish and receive changes |
| | | */ |
| | | broker = new ReplicationBroker(state, baseDN, serverId, maxReceiveQueue, |
| | | maxReceiveDelay, maxSendQueue, maxSendDelay, window, |
| | | heartbeatInterval, new ReplSessionSecurity(configuration)); |
| | | heartbeatInterval, generationId, |
| | | new ReplSessionSecurity(configuration)); |
| | | |
| | | broker.start(replicationServers); |
| | | |
| | | // Retrieves the related backend and its config entry |
| | | try |
| | | { |
| | | retrievesBackendInfos(baseDN); |
| | | } catch (DirectoryException e) |
| | | { |
| | | // The backend associated to this suffix is not able to |
| | | // perform export and import. |
| | | // The replication can continue but this replicationDomain |
| | | // won't be able to use total update. |
| | | } |
| | | |
| | | /* |
| | | * ChangeNumberGenerator is used to create new unique ChangeNumbers |
| | | * for each operation done on the replication domain. |
| | |
| | | * If not set the ResultCode and the response message, |
| | | * interrupt the operation, and return false |
| | | * |
| | | * @param op The Operation that needs to be checked. |
| | | * @param Operation The Operation that needs to be checked. |
| | | * |
| | | * @return true when it OK to process the Operation, false otherwise. |
| | | * When false is returned the resultCode and the response message |
| | |
| | | // The server is in the shutdown process |
| | | return null; |
| | | } |
| | | log(Message.raw("Broker received message :" + msg)); |
| | | |
| | | if (debugEnabled()) |
| | | if (!(msg instanceof HeartbeatMessage)) |
| | | TRACER.debugInfo("Message received <" + msg + ">"); |
| | | |
| | | if (msg instanceof AckMessage) |
| | | { |
| | | AckMessage ack = (AckMessage) msg; |
| | |
| | | { |
| | | // Another server requests us to provide entries |
| | | // for a total update |
| | | initMsg = (InitializeRequestMessage) msg; |
| | | initMsg = (InitializeRequestMessage)msg; |
| | | } |
| | | else if (msg instanceof InitializeTargetMessage) |
| | | { |
| | |
| | | // bunch of entries from the remote server and we |
| | | // want the import thread to catch them and |
| | | // not the ListenerThread. |
| | | importBackend(importMsg); |
| | | initialize(importMsg); |
| | | } |
| | | catch(DirectoryException de) |
| | | { |
| | | // Returns an error message to notify the sender |
| | | ErrorMessage errorMsg = |
| | | new ErrorMessage(importMsg.getsenderID(), |
| | | de.getMessageObject()); |
| | | MessageBuilder mb = new MessageBuilder(); |
| | | mb.append(de.getMessageObject()); |
| | | mb.append("Backend ID: "); |
| | | mb.append(backend.getBackendID()); |
| | | log(mb.toMessage()); |
| | | |
| | | // Return an error message to notify the sender |
| | | ErrorMessage errorMsg = |
| | | new ErrorMessage(importMsg.getsenderID(), |
| | | de.getMessageObject()); |
| | | TRACER.debugInfo(Message.toString(mb.toMessage())); |
| | | broker.publish(errorMsg); |
| | | } |
| | | } |
| | |
| | | // replicationServer did not find any import source. |
| | | abandonImportExport((ErrorMessage)msg); |
| | | } |
| | | else |
| | | { |
| | | /* We can receive an error message from the replication server |
| | | * in the following cases : |
| | | * - we connected with an incorrect generation id |
| | | */ |
| | | ErrorMessage errorMsg = (ErrorMessage)msg; |
| | | logError(ERR_ERROR_MSG_RECEIVED.get( |
| | | errorMsg.getDetails())); |
| | | |
| | | if (errorMsg.getMsgID() == NOTE_RESET_GENERATION_ID.getId()) |
| | | { |
| | | TRACER.debugInfo("requestedResetSinceLastStart=" + |
| | | requestedResetSinceLastStart + |
| | | "rejectedGenerationId=" + rejectedGenerationId); |
| | | |
| | | if (requestedResetSinceLastStart && (rejectedGenerationId>0)) |
| | | { |
| | | // When the last generation presented was refused and we are |
| | | // the 'reseter' server then restart automatically to become |
| | | // the 'master' |
| | | state.clear(); |
| | | rejectedGenerationId = -1; |
| | | requestedResetSinceLastStart = false; |
| | | broker.stop(); |
| | | broker.start(replicationServers); |
| | | } |
| | | } |
| | | if (errorMsg.getMsgID() == NOTE_BAD_GENERATION_ID.getId()) |
| | | { |
| | | rejectedGenerationId = generationId; |
| | | } |
| | | } |
| | | } |
| | | else if (msg instanceof UpdateMessage) |
| | | { |
| | |
| | | { |
| | | try |
| | | { |
| | | initializeTarget(initMsg.getsenderID(), initMsg.getsenderID(), |
| | | initializeRemote(initMsg.getsenderID(), initMsg.getsenderID(), |
| | | null); |
| | | } |
| | | catch(DirectoryException de) |
| | |
| | | public void disable() |
| | | { |
| | | state.save(); |
| | | state.clear(); |
| | | state.clearInMemory(); |
| | | disabled = true; |
| | | // stop the listener threads |
| | | for (ListenerThread thread : synchroThreads) |
| | |
| | | * The domain will connect back to a replication Server and |
| | | * will recreate threads to listen for messages from the Synchronization |
| | | * server. |
| | | * The generationId will be retrieved or computed if necessary. |
| | | * The ServerState will also be read again from the local database. |
| | | */ |
| | | public void enable() |
| | | { |
| | | state.clear(); |
| | | state.clearInMemory(); |
| | | state.loadState(); |
| | | disabled = false; |
| | | |
| | | |
| | | try |
| | | { |
| | | generationId = loadGenerationId(); |
| | | } |
| | | catch (Exception e) |
| | | { |
| | | /* TODO should mark that replicationServer service is |
| | | * not available, log an error and retry upon timeout |
| | | * should we stop the modifications ? |
| | | */ |
| | | logError(ERR_LOADING_GENERATION_ID.get( |
| | | baseDN.toNormalizedString(), e.getLocalizedMessage())); |
| | | return; |
| | | } |
| | | |
| | | // After an on-line import, the value of the generationId is new |
| | | // and it is necessary for the broker to send this new value as part |
| | | // of the serverStart message. |
| | | broker.setGenerationId(generationId); |
| | | |
| | | broker.start(replicationServers); |
| | | |
| | | createListeners(); |
| | | } |
| | | |
| | | /** |
| | |  * Compute the data generationId associated with the current data present |
| | |  * in the backend for this domain. |
| | |  * <p> |
| | |  * The import/export context is switched to checksum mode, with its entry |
| | |  * count capped at 1000, and exportBackend() is run; the generationId is the |
| | |  * checksum value that the export leaves in the context. |
| | |  * The context is released and checksum mode is switched off even when the |
| | |  * export fails. |
| | |  * |
| | |  * @return The computed generationId. |
| | |  * @throws DirectoryException When an error occurs. |
| | |  */ |
| | | public long computeGenerationId() throws DirectoryException |
| | | { |
| | |   long bec = backend.getEntryCount(); |
| | |   if (bec < 0) |
| | |   { |
| | |     // The backend reported a negative count: look the backend up again |
| | |     // and read the count from the fresh reference. |
| | |     backend = this.retrievesBackend(baseDN); |
| | |     bec = backend.getEntryCount(); |
| | |   } |
| | | |
| | |   this.acquireIEContext(); |
| | |   try |
| | |   { |
| | |     ieContext.checksumOutput = true; |
| | |     ieContext.entryCount = (bec < 1000 ? bec : 1000); |
| | |     ieContext.entryLeftCount = ieContext.entryCount; |
| | |     exportBackend(); |
| | |     long genId = ieContext.checksumOutputValue; |
| | | |
| | |     if (debugEnabled()) |
| | |       TRACER.debugInfo("Computed generationId: #entries=" + bec + |
| | |           " generationId=" + genId); |
| | |     return genId; |
| | |   } |
| | |   finally |
| | |   { |
| | |     // Always leave checksum mode and give the context back, otherwise a |
| | |     // failed export would leave the context held in checksum mode. |
| | |     if (ieContext != null) |
| | |     { |
| | |       ieContext.checksumOutput = false; |
| | |     } |
| | |     this.releaseIEContext(); |
| | |   } |
| | | } |
| | | |
| | | /** |
| | |  * Gives the generationId currently held by this domain. |
| | |  * |
| | |  * @return The generationId. |
| | |  */ |
| | | public long getGenerationId() |
| | | { |
| | |   return this.generationId; |
| | | } |
| | | |
| | | /** |
| | | * The attribute name used to store the generationId in the backend. |
| | | */ |
| | | protected static final String REPLICATION_GENERATION_ID = |
| | | "ds-sync-generation-id"; |
| | | |
| | | /** |
| | |  * Stores the value of the generationId. |
| | |  * <p> |
| | |  * The value is written with an internal modify operation that replaces the |
| | |  * REPLICATION_GENERATION_ID attribute of the root entry of the domain. |
| | |  * A failure is logged and reported through the returned result code. |
| | |  * |
| | |  * @param generationId The value of the generationId. |
| | |  * @return a ResultCode indicating if the method was successful. |
| | |  */ |
| | | public ResultCode saveGenerationId(long generationId) |
| | | { |
| | |   // The generationId is stored in the root entry of the domain. |
| | |   ASN1OctetString rootEntryDn = new ASN1OctetString(baseDN.toString()); |
| | | |
| | |   // Single-valued attribute holding the decimal form of the id. |
| | |   ArrayList<ASN1OctetString> genIdValues = new ArrayList<ASN1OctetString>(); |
| | |   genIdValues.add(new ASN1OctetString(Long.toString(generationId))); |
| | |   LDAPAttribute genIdAttr = |
| | |       new LDAPAttribute(REPLICATION_GENERATION_ID, genIdValues); |
| | | |
| | |   ArrayList<RawModification> modifications = |
| | |       new ArrayList<RawModification>(1); |
| | |   modifications.add( |
| | |       new LDAPModification(ModificationType.REPLACE, genIdAttr)); |
| | | |
| | |   ModifyOperationBasis modifyOp = |
| | |       new ModifyOperationBasis(conn, |
| | |           InternalClientConnection.nextOperationID(), |
| | |           InternalClientConnection.nextMessageID(), |
| | |           new ArrayList<Control>(0), rootEntryDn, |
| | |           modifications); |
| | |   modifyOp.setInternalOperation(true); |
| | |   modifyOp.setSynchronizationOperation(true); |
| | |   modifyOp.setDontSynchronize(true); |
| | | |
| | |   modifyOp.run(); |
| | | |
| | |   ResultCode outcome = modifyOp.getResultCode(); |
| | |   if (outcome == ResultCode.SUCCESS) |
| | |   { |
| | |     return outcome; |
| | |   } |
| | | |
| | |   // The modify failed: log it and hand the failure code to the caller. |
| | |   logError(ERR_UPDATING_GENERATION_ID.get( |
| | |       modifyOp.getResultCode().getResultCodeName() + " " + |
| | |       modifyOp.getErrorMessage(), |
| | |       baseDN.toString())); |
| | |   return outcome; |
| | | } |
| | | |
| | | |
| | | /** |
| | |  * Load the generationId of this domain. |
| | |  * <p> |
| | |  * The value is read from the REPLICATION_GENERATION_ID attribute of the |
| | |  * root entry of the domain. When the entry or the attribute is absent, or |
| | |  * when the attribute does not hold exactly one value, the generationId is |
| | |  * computed with computeGenerationId() and stored with saveGenerationId(). |
| | |  * |
| | |  * @return The generationId that was read or computed; -1 when the search |
| | |  *         filter cannot be decoded or when the single stored value cannot |
| | |  *         be parsed as a long. |
| | |  * @throws DirectoryException When an error occurs. |
| | |  */ |
| | | public long loadGenerationId() |
| | | throws DirectoryException |
| | | { |
| | |   long genId = -1; |
| | | |
| | |   if (debugEnabled()) |
| | |     TRACER.debugInfo( |
| | |         "Attempt to read generation ID from DB " + baseDN.toString()); |
| | | |
| | |   ASN1OctetString asn1BaseDn = new ASN1OctetString(baseDN.toString()); |
| | |   boolean found = false; |
| | |   LDAPFilter filter; |
| | |   try |
| | |   { |
| | |     filter = LDAPFilter.decode("objectclass=*"); |
| | |   } |
| | |   catch (LDAPException e) |
| | |   { |
| | |     // can not happen |
| | |     return -1; |
| | |   } |
| | | |
| | |   /* |
| | |    * Read the generationId attribute from the root entry of the domain. |
| | |    */ |
| | |   LinkedHashSet<String> attributes = new LinkedHashSet<String>(1); |
| | |   attributes.add(REPLICATION_GENERATION_ID); |
| | |   InternalSearchOperation search = conn.processSearch(asn1BaseDn, |
| | |       SearchScope.BASE_OBJECT, |
| | |       DereferencePolicy.DEREF_ALWAYS, 0, 0, false, |
| | |       filter, attributes); |
| | |   if ((search.getResultCode() != ResultCode.SUCCESS) && |
| | |       (search.getResultCode() != ResultCode.NO_SUCH_OBJECT)) |
| | |   { |
| | |     // Any failure other than a missing root entry is reported; the id is |
| | |     // then computed below as if it had not been found. |
| | |     Message message = ERR_SEARCHING_GENERATION_ID.get( |
| | |         search.getResultCode().getResultCodeName() + " " + |
| | |         search.getErrorMessage(), |
| | |         baseDN.toString()); |
| | |     logError(message); |
| | |   } |
| | | |
| | |   if (search.getResultCode() == ResultCode.SUCCESS) |
| | |   { |
| | |     LinkedList<SearchResultEntry> result = search.getSearchEntries(); |
| | |     // getFirst() throws on an empty list, so test for emptiness instead of |
| | |     // relying on a null check of its result. |
| | |     SearchResultEntry resultEntry = |
| | |         result.isEmpty() ? null : result.getFirst(); |
| | |     if (resultEntry != null) |
| | |     { |
| | |       AttributeType synchronizationGenIDType = |
| | |           DirectoryServer.getAttributeType(REPLICATION_GENERATION_ID); |
| | |       List<Attribute> attrs = |
| | |           resultEntry.getAttribute(synchronizationGenIDType); |
| | |       if ((attrs != null) && !attrs.isEmpty()) |
| | |       { |
| | |         Attribute attr = attrs.get(0); |
| | |         LinkedHashSet<AttributeValue> values = attr.getValues(); |
| | |         if (values.size() != 1) |
| | |         { |
| | |           Message message = ERR_LOADING_GENERATION_ID.get( |
| | |               baseDN.toString(), "#Values != 1"); |
| | |           logError(message); |
| | |         } |
| | |         else |
| | |         { |
| | |           // NOTE(review): the id counts as found even when the parse below |
| | |           // fails, in which case -1 is returned and nothing is recomputed |
| | |           // - confirm this is intended. |
| | |           found = true; |
| | |           try |
| | |           { |
| | |             genId = Long.decode(values.iterator().next(). |
| | |                 getStringValue()); |
| | |           } |
| | |           catch (Exception e) |
| | |           { |
| | |             Message message = ERR_LOADING_GENERATION_ID.get( |
| | |                 baseDN.toString(), e.getLocalizedMessage()); |
| | |             logError(message); |
| | |           } |
| | |         } |
| | |       } |
| | |     } |
| | |   } |
| | | |
| | |   if (!found) |
| | |   { |
| | |     // Nothing stored yet: derive the id from the data and persist it. |
| | |     genId = computeGenerationId(); |
| | |     saveGenerationId(genId); |
| | | |
| | |     if (debugEnabled()) |
| | |       TRACER.debugInfo("Generation ID created for domain base DN=" + |
| | |           baseDN.toString() + |
| | |           " generationId=" + genId); |
| | |   } |
| | |   else |
| | |   { |
| | |     if (debugEnabled()) |
| | |       TRACER.debugInfo( |
| | |           "Generation ID successfully read from domain base DN=" + baseDN + |
| | |           " generationId=" + genId); |
| | |   } |
| | |   return genId; |
| | | } |
| | | |
| | | /** |
| | |  * Reset the generationId of this domain in the whole topology. |
| | |  * A ResetGenerationId message is published through the broker so that the |
| | |  * Replication Servers reset their change dbs. |
| | |  */ |
| | | public void resetGenerationId() |
| | | { |
| | |   // Remember that this server is the one that asked for the reset. |
| | |   requestedResetSinceLastStart = true; |
| | |   broker.publish(new ResetGenerationId()); |
| | | } |
| | | |
| | | /** |
| | | * Do whatever is needed when a backup is started. |
| | | * We need to make sure that the serverState is correctly saved. |
| | | */ |
| | |
| | | { |
| | | msg = broker.receive(); |
| | | |
| | | if (debugEnabled()) |
| | | TRACER.debugInfo("Import: EntryBytes received " + msg); |
| | | if (msg == null) |
| | | { |
| | | // The server is in the shutdown process |
| | | return null; |
| | | } |
| | | log(Message.raw("receiveEntryBytes: received " + msg)); |
| | | |
| | | if (msg instanceof EntryMessage) |
| | | { |
| | | // FIXME |
| | | EntryMessage entryMsg = (EntryMessage)msg; |
| | | byte[] entryBytes = entryMsg.getEntryBytes().clone(); |
| | | ieContext.updateTaskCounters(); |
| | | ieContext.updateCounters(); |
| | | return entryBytes; |
| | | } |
| | | else if (msg instanceof DoneMessage) |
| | |
| | | // The error is stored and the import is ended |
| | | // by returning null |
| | | ErrorMessage errorMsg = (ErrorMessage)msg; |
| | | ieContext.exception = new DirectoryException(ResultCode.OTHER, |
| | | errorMsg.getDetails()); |
| | | ieContext.exception = new DirectoryException( |
| | | ResultCode.OTHER, |
| | | errorMsg.getDetails()); |
| | | return null; |
| | | } |
| | | else |
| | |
| | | } |
| | | catch(Exception e) |
| | | { |
| | | // TODO: i18n |
| | | ieContext.exception = new DirectoryException(ResultCode.OTHER, |
| | | Message.raw("received an unexpected message type"), e); |
| | | return null; |
| | | Message.raw("received an unexpected message type" + |
| | | e.getLocalizedMessage())); |
| | | } |
| | | } |
| | | } |
| | |
| | | } |
| | | |
| | | /** |
| | | * Log debug message. |
| | | * @param message The message to log. |
| | | * Export the entries from the backend. |
| | | * The ieContext must have been set before calling. |
| | | * |
| | | * @throws DirectoryException when an error occurred |
| | | */ |
| | | private void log(Message message) |
| | | { |
| | |   // Nothing is formatted or traced unless debug logging is enabled. |
| | |   if (!debugEnabled()) |
| | |   { |
| | |     return; |
| | |   } |
| | |   TRACER.debugInfo("DebugInfo" + message); |
| | | } |
| | | |
| | | /** |
| | | * Export the entries. |
| | | * @throws DirectoryException when an error occurred |
| | | */ |
| | | protected void exportBackend() throws DirectoryException |
| | | protected void exportBackend() |
| | | throws DirectoryException |
| | | { |
| | | // FIXME Temporary workaround - will probably be fixed when implementing |
| | | // dynamic config |
| | | retrievesBackendInfos(this.baseDN); |
| | | backend = retrievesBackend(this.baseDN); |
| | | |
| | | // Acquire a shared lock for the backend. |
| | | try |
| | |
| | | ResultCode.OTHER, message, null); |
| | | } |
| | | |
| | | ReplLDIFOutputStream os = new ReplLDIFOutputStream(this); |
| | | OutputStream os; |
| | | ReplLDIFOutputStream ros; |
| | | |
| | | if (ieContext.checksumOutput) |
| | | { |
| | | ros = new ReplLDIFOutputStream(this, ieContext.entryCount); |
| | | os = new CheckedOutputStream(ros, new Adler32()); |
| | | try |
| | | { |
| | | os.write((Long.toString(ieContext.entryCount)).getBytes()); |
| | | } |
| | | catch(Exception e) |
| | | { |
| | | // Should never happen |
| | | } |
| | | } |
| | | else |
| | | { |
| | | ros = new ReplLDIFOutputStream(this, (short)-1); |
| | | os = ros; |
| | | } |
| | | LDIFExportConfig exportConfig = new LDIFExportConfig(os); |
| | | |
| | | // baseDN branch is the only one included in the export |
| | | List<DN> includeBranches = new ArrayList<DN>(1); |
| | | includeBranches.add(this.baseDN); |
| | | exportConfig.setIncludeBranches(includeBranches); |
| | | |
| | | // For the checksum computing mode, only consider the 'stable' attributes |
| | | if (ieContext.checksumOutput) |
| | | { |
| | | String includeAttributeStrings[] = |
| | | {"objectclass", "sn", "cn", "entryuuid"}; |
| | | HashSet<AttributeType> includeAttributes; |
| | | includeAttributes = new HashSet<AttributeType>(); |
| | | for (String attrName : includeAttributeStrings) |
| | | { |
| | | AttributeType attrType = DirectoryServer.getAttributeType(attrName); |
| | | if (attrType == null) |
| | | { |
| | | attrType = DirectoryServer.getDefaultAttributeType(attrName); |
| | | } |
| | | includeAttributes.add(attrType); |
| | | } |
| | | exportConfig.setIncludeAttributes(includeAttributes); |
| | | } |
| | | |
| | | // Launch the export. |
| | | try |
| | | { |
| | |
| | | } |
| | | finally |
| | | { |
| | | // Clean up after the export by closing the export config. |
| | | exportConfig.close(); |
| | | |
| | | if ((ieContext != null) && (ieContext.checksumOutput)) |
| | | { |
| | | ieContext.checksumOutputValue = |
| | | ((CheckedOutputStream)os).getChecksum().getValue(); |
| | | } |
| | | else |
| | | { |
| | | // Clean up after the export by closing the export config. |
| | | // Will also flush the export and export the remaining entries. |
| | | // This is a real export where writer has been initialized. |
| | | exportConfig.close(); |
| | | } |
| | | |
| | | // Release the shared lock on the backend. |
| | | try |
| | |
| | | } |
| | | |
| | | /** |
| | | * Retrieves the backend object related to the domain and the backend's |
| | | * config entry. They will be used for import and export. |
| | | * TODO This should be in a shared package rather than here. |
| | | * Retrieves the backend related to the domain. |
| | | * |
| | | * @return The backend of that domain. |
| | | * @param baseDN The baseDN to retrieve the backend |
| | | * @throws DirectoryException when an error occurred |
| | | */ |
| | | protected void retrievesBackendInfos(DN baseDN) throws DirectoryException |
| | | protected Backend retrievesBackend(DN baseDN) |
| | | { |
| | | // Retrieves the backend related to this domain |
| | | Backend domainBackend = DirectoryServer.getBackend(baseDN); |
| | | if (domainBackend == null) |
| | | { |
| | | Message message = ERR_CANNOT_DECODE_BASE_DN.get(DN_BACKEND_BASE, ""); |
| | | throw new DirectoryException( |
| | | ResultCode.OTHER, message, null); |
| | | } |
| | | |
| | | // Retrieves its configuration |
| | | BackendCfg backendCfg = TaskUtils.getConfigEntry(domainBackend); |
| | | if (backendCfg == null) |
| | | { |
| | | Message message = |
| | | ERR_LDIFIMPORT_NO_BACKENDS_FOR_ID.get(); |
| | | logError(message); |
| | | throw new DirectoryException( |
| | | ResultCode.OTHER, message, null); |
| | | } |
| | | |
| | | this.backend = domainBackend; |
| | | if (! domainBackend.supportsLDIFImport()) |
| | | { |
| | | Message message = ERR_LDIFIMPORT_CANNOT_IMPORT.get( |
| | | String.valueOf(baseDN)); |
| | | logError(message); |
| | | throw new DirectoryException( |
| | | ResultCode.OTHER, message, null); |
| | | } |
| | | return DirectoryServer.getBackend(baseDN); |
| | | } |
| | | |
| | | |
| | | /** |
| | | * Sends lDIFEntry entry lines to the export target currently set. |
| | | * Exports an entry in LDIF format. |
| | | * |
| | | * @param lDIFEntry The lines for the LDIF entry. |
| | | * @param lDIFEntry The entry to be exported. |
| | | * |
| | | * @throws IOException when an error occurred. |
| | | */ |
| | | public void sendEntryLines(String lDIFEntry) throws IOException |
| | | public void exportLDIFEntry(String lDIFEntry) throws IOException |
| | | { |
| | | // If an error was raised - like receiving an ErrorMessage |
| | | // we just let down the export. |
| | |
| | | throw ioe; |
| | | } |
| | | |
| | | // new entry then send the current one |
| | | EntryMessage entryMessage = new EntryMessage( |
| | | if (ieContext.checksumOutput == false) |
| | | { |
| | | // Actually send the entry |
| | | EntryMessage entryMessage = new EntryMessage( |
| | | serverId, ieContext.exportTarget, lDIFEntry.getBytes()); |
| | | broker.publish(entryMessage); |
| | | |
| | | ieContext.updateTaskCounters(); |
| | | broker.publish(entryMessage); |
| | | } |
| | | ieContext.updateCounters(); |
| | | } |
| | | |
| | | /** |
| | |
| | | * and should be updated of its progress. |
| | | * @throws DirectoryException when an error occurs |
| | | */ |
| | | public void initialize(short source, Task initTask) |
| | | public void initializeFromRemote(short source, Task initTask) |
| | | throws DirectoryException |
| | | { |
| | | // TRACER.debugInfo("Entering initializeFromRemote"); |
| | | |
| | | acquireIEContext(); |
| | | ieContext.initializeTask = initTask; |
| | | |
| | |
| | | /** |
| | | * Verifies that the given string represents a valid source |
| | | * from which this server can be initialized. |
| | | * @param sourceString The string representaing the source |
| | | * @param sourceString The string representing the source |
| | | * @return The source as a short value |
| | | * @throws DirectoryException if the string is not valid |
| | | */ |
| | | public short decodeSource(String sourceString) |
| | | throws DirectoryException |
| | | { |
| | | TRACER.debugInfo("Entering decodeSource"); |
| | | short source = 0; |
| | | Throwable cause = null; |
| | | try |
| | |
| | | // TODO Verifies serverID is in the domain |
| | | // We should check here that this is a server implied |
| | | // in the current domain. |
| | | |
| | | log(Message.raw("Source decoded for import:" + source)); |
| | | return source; |
| | | } |
| | | } |
| | |
| | | ResultCode resultCode = ResultCode.OTHER; |
| | | Message message = ERR_INVALID_IMPORT_SOURCE.get(); |
| | | if (cause != null) |
| | | { |
| | | throw new DirectoryException( |
| | | resultCode, message, cause); |
| | | } |
| | | else |
| | | { |
| | | throw new DirectoryException( |
| | | resultCode, message); |
| | | } |
| | | } |
| | | |
| | | /** |
| | |
| | | * @param target The target that should be initialized |
| | | * @param initTask The task that triggers this initialization and that should |
| | | * be updated with its progress. |
| | | * |
| | | * @exception DirectoryException When an error occurs. |
| | | */ |
| | | public void initializeTarget(short target, Task initTask) |
| | | public void initializeRemote(short target, Task initTask) |
| | | throws DirectoryException |
| | | { |
| | | initializeTarget(target, serverId, initTask); |
| | | initializeRemote(target, serverId, initTask); |
| | | } |
| | | |
| | | /** |
| | | * Process the initialization of some other server or servers in the topology |
| | | * specified by the target argument when this initialization has been |
| | | * initiated by another server than this one. |
| | | * specified by the target argument when this initialization specifying the |
| | | * server that requests the initialization. |
| | | * |
| | | * @param target The target that should be initialized. |
| | | * @param requestorID The server that initiated the export. |
| | | * @param initTask The task that triggers this initialization and that should |
| | | * be updated with its progress. |
| | | * |
| | | * @exception DirectoryException When an error occurs. |
| | | */ |
| | | public void initializeTarget(short target, short requestorID, Task initTask) |
| | | public void initializeRemote(short target, short requestorID, Task initTask) |
| | | throws DirectoryException |
| | | { |
| | | // FIXME Temporary workaround - will probably be fixed when implementing |
| | | // dynamic config |
| | | retrievesBackendInfos(this.baseDN); |
| | | |
| | | acquireIEContext(); |
| | | |
| | | ieContext.exportTarget = target; |
| | | if (initTask != null) |
| | | { |
| | | ieContext.initializeTask = initTask; |
| | | ieContext.initTaskCounters(backend.getEntryCount()); |
| | | } |
| | | |
| | | // Send start message to the peer |
| | | InitializeTargetMessage initializeMessage = new InitializeTargetMessage( |
| | | baseDN, serverId, ieContext.exportTarget, requestorID, |
| | | backend.getEntryCount()); |
| | | |
| | | log(Message.raw("SD : publishes " + initializeMessage + |
| | | " for #entries=" + backend.getEntryCount() + ieContext.entryLeftCount)); |
| | | |
| | | broker.publish(initializeMessage); |
| | | |
| | | try |
| | | { |
| | | // FIXME Temporary workaround - will probably be fixed when implementing |
| | | // dynamic config |
| | | backend = retrievesBackend(this.baseDN); |
| | | |
| | | if (!backend.supportsLDIFExport()) |
| | | { |
| | | Message message = ERR_INIT_EXPORT_NOT_SUPPORTED.get( |
| | | backend.getBackendID().toString()); |
| | | logError(message); |
| | | throw new DirectoryException(ResultCode.OTHER, message); |
| | | } |
| | | |
| | | acquireIEContext(); |
| | | |
| | | ieContext.exportTarget = target; |
| | | if (initTask != null) |
| | | { |
| | | ieContext.initializeTask = initTask; |
| | | ieContext.initImportExportCounters(backend.getEntryCount()); |
| | | } |
| | | |
| | | // Send start message to the peer |
| | | InitializeTargetMessage initializeMessage = new InitializeTargetMessage( |
| | | baseDN, serverId, ieContext.exportTarget, requestorID, |
| | | backend.getEntryCount()); |
| | | |
| | | broker.publish(initializeMessage); |
| | | |
| | | exportBackend(); |
| | | |
| | | // Notify the peer of the success |
| | | DoneMessage doneMsg = new DoneMessage(serverId, |
| | | initializeMessage.getDestination()); |
| | | initializeMessage.getDestination()); |
| | | broker.publish(doneMsg); |
| | | |
| | | releaseIEContext(); |
| | |
| | | catch(DirectoryException de) |
| | | { |
| | | // Notify the peer of the failure |
| | | ErrorMessage errorMsg = new ErrorMessage(target, de.getMessageObject()); |
| | | ErrorMessage errorMsg = |
| | | new ErrorMessage(target, |
| | | de.getMessageObject()); |
| | | broker.publish(errorMsg); |
| | | |
| | | releaseIEContext(); |
| | |
| | | StringBuilder failureReason = new StringBuilder(); |
| | | if (! LockFileManager.acquireExclusiveLock(lockFile, failureReason)) |
| | | { |
| | | Message message = ERR_LDIFIMPORT_CANNOT_LOCK_BACKEND.get( |
| | | backend.getBackendID(), String.valueOf(failureReason)); |
| | | Message message = ERR_INIT_CANNOT_LOCK_BACKEND.get( |
| | | backend.getBackendID(), |
| | | String.valueOf(failureReason)); |
| | | logError(message); |
| | | throw new DirectoryException(ResultCode.OTHER, message); |
| | | } |
| | |
| | | * @param initializeMessage The message that initiated the import. |
| | | * @exception DirectoryException Thrown when an error occurs. |
| | | */ |
| | | protected void importBackend(InitializeTargetMessage initializeMessage) |
| | | protected void initialize(InitializeTargetMessage initializeMessage) |
| | | throws DirectoryException |
| | | { |
| | | LDIFImportConfig importConfig = null; |
| | | DirectoryException de = null; |
| | | |
| | | if (!backend.supportsLDIFImport()) |
| | | { |
| | | Message message = ERR_INIT_IMPORT_NOT_SUPPORTED.get( |
| | | backend.getBackendID().toString()); |
| | | logError(message); |
| | | throw new DirectoryException(ResultCode.OTHER, message); |
| | | } |
| | | |
| | | try |
| | | { |
| | | log(Message.raw("startImport")); |
| | | |
| | | if (initializeMessage.getRequestorID() == serverId) |
| | | { |
| | | // The import responds to a request we did so the IEContext |
| | |
| | | |
| | | ieContext.importSource = initializeMessage.getsenderID(); |
| | | ieContext.entryLeftCount = initializeMessage.getEntryCount(); |
| | | ieContext.initTaskCounters(initializeMessage.getEntryCount()); |
| | | ieContext.initImportExportCounters(initializeMessage.getEntryCount()); |
| | | |
| | | preBackendImport(this.backend); |
| | | |
| | |
| | | // Process import |
| | | this.backend.importLDIF(importConfig); |
| | | |
| | | TRACER.debugInfo("The import has ended successfully."); |
| | | stateSavingDisabled = false; |
| | | |
| | | // Re-exchange state with SS |
| | | broker.stop(); |
| | | broker.start(replicationServers); |
| | | |
| | | } |
| | | catch(Exception e) |
| | | { |
| | | DirectoryException de = |
| | | new DirectoryException( |
| | | ResultCode.OTHER, Message.raw(e.getLocalizedMessage())); |
| | | ieContext.exception = de; |
| | | throw (de); |
| | | de = new DirectoryException(ResultCode.OTHER, |
| | | Message.raw(e.getLocalizedMessage())); |
| | | } |
| | | finally |
| | | { |
| | |
| | | ((InitializeTask)ieContext.initializeTask). |
| | | setState(ieContext.updateTaskCompletionState(),ieContext.exception); |
| | | } |
| | | |
| | | releaseIEContext(); |
| | | |
| | | log(Message.raw("End importBackend")); |
| | | // Retrieves the generation ID associated with the data imported |
| | | try |
| | | { |
| | | generationId = loadGenerationId(); |
| | | } |
| | | catch (DirectoryException e) |
| | | { |
| | | logError(ERR_LOADING_GENERATION_ID.get( |
| | | baseDN.toNormalizedString(), |
| | | e.getLocalizedMessage())); |
| | | } |
| | | rejectedGenerationId = -1; |
| | | |
| | | if (debugEnabled()) |
| | | TRACER.debugInfo( |
| | | "After import, the replication plugin restarts connections" + |
| | | " to all RSs to provide new generation ID=" + generationId); |
| | | broker.setGenerationId(generationId); |
| | | |
| | | // Re-exchange generationID and state with RS |
| | | broker.reStart(); |
| | | } |
| | | // Success |
| | | // Sends up the root error. |
| | | if (de != null) |
| | | throw de; |
| | | } |
| | | |
| | | /** |
| | |
| | | throw new DirectoryException(ResultCode.OTHER, message); |
| | | } |
| | | |
| | | // FIXME setBackendEnabled should be part of TaskUtils ?
| | | TaskUtils.enableBackend(backend.getBackendID()); |
| | | } |
| | | |
| | |
| | | } |
| | | |
| | | /**
| | | * Reports whether this domain is currently connected to a
| | | * ReplicationServer, as indicated by its broker.
| | | *
| | | * @return true if the broker reports being connected, false if not.
| | | */
| | | public boolean isConnected()
| | | {
| | | final boolean connected = broker.isConnected();
| | | return connected;
| | | }
| | | |
| | | /** |
| | | * Determine whether the connection to the replication server is encrypted. |
| | | * @return true if the connection is encrypted, false otherwise. |
| | | */ |