/* * CDDL HEADER START * * The contents of this file are subject to the terms of the * Common Development and Distribution License, Version 1.0 only * (the "License"). You may not use this file except in compliance * with the License. * * You can obtain a copy of the license at * trunk/opends/resource/legal-notices/OpenDS.LICENSE * or https://OpenDS.dev.java.net/OpenDS.LICENSE. * See the License for the specific language governing permissions * and limitations under the License. * * When distributing Covered Code, include this CDDL HEADER in each * file and include the License file at * trunk/opends/resource/legal-notices/OpenDS.LICENSE. If applicable, * add the following below this CDDL HEADER, with the fields enclosed * by brackets "[]" replaced with your own identifying information: * Portions Copyright [yyyy] [name of copyright owner] * * CDDL HEADER END * * * Copyright 2009 Sun Microsystems, Inc. */ package org.opends.server.replication.server; import static org.opends.messages.ReplicationMessages.*; import static org.opends.server.loggers.debug.DebugLogger.debugEnabled; import java.io.IOException; import java.util.ArrayList; import java.util.Date; import java.util.HashMap; import java.util.Iterator; import java.util.concurrent.Semaphore; import java.util.concurrent.TimeUnit; import java.util.zip.DataFormatException; import org.opends.messages.Category; import org.opends.messages.Message; import org.opends.messages.Severity; import org.opends.server.api.DirectoryThread; import org.opends.server.replication.common.ChangeNumber; import org.opends.server.replication.common.MultiDomainServerState; import org.opends.server.replication.common.ServerState; import org.opends.server.replication.common.ServerStatus; import org.opends.server.replication.protocol.*; import org.opends.server.types.Attribute; import org.opends.server.types.Attributes; import org.opends.server.types.DebugLogLevel; import org.opends.server.types.DirectoryException; import 
org.opends.server.types.ResultCode;
import org.opends.server.util.TimeThread;

/**
 * This class defines a server handler, which handles all interaction with a
 * peer replication server.
 * <p>
 * NOTE(review): judging by the class name and by the "external changelog"
 * wording used on getnextUpdate(), this handler looks dedicated to external
 * changelog (ECL) sessions - confirm and reword the sentence above if so.
 */
public class ECLServerHandler extends ServerHandler
{

  // Properties filled only if remote server is a RS
  // (serverAddressURL is assigned by processStartFromRemote()).
  private String serverAddressURL;

  // NOTE(review): package-private and never assigned in the visible part of
  // this file - purpose to be confirmed.
  String operationId;

  /**
   * CLDomainContext : contains the state properties for the search
   * currently being processed, by replication domain.
   * One instance per domain is stored in the clDomCtxts table.
   */
  private class CLDomainContext
  {
    ReplicationServerDomain rsd; // the repl server domain
    boolean active;              // is the domain still active
    MessageHandler mh;           // the message handler associated
    // The next candidate change of this domain; it is the message wrapped
    // into the ECLUpdateMsg when this domain holds the oldest change.
    UpdateMsg nextMsg;
    // Only printed in the visible code (as "nextNonEligibleMsg");
    // presumably a change read from the domain but not yet eligible - confirm.
    UpdateMsg nonElligiblemsg;
    // State the search starts from for this domain.
    ServerState startState;
    // State updated with the change number of each change returned.
    ServerState currentState;
    // Once currentState covers this state the domain is flagged inactive.
    ServerState stopState;

    /**
     * Add to the provided buffer a string representation of this object.
     *
     * @param buffer The buffer the representation is appended to.
     * @param i      The index printed as label of the entry; it is also used
     *               to look up the "act" flag in the clDomCtxts table.
     */
    public void toString(StringBuilder buffer, int i)
    {
      // NOTE(review): only "act" is read through clDomCtxts[i]; every other
      // value comes from this instance. Presumably callers always pass the
      // index of this very context - confirm.
      CLDomainContext xx = clDomCtxts[i];
      buffer.append(
          " clDomCtxts(" + i + ") [act=" + xx.active +
          " rsd=" + rsd +
          " nextMsg=" + nextMsg + "(" +
          // human readable date of the change number, empty when no message
          (nextMsg != null?
              new Date(nextMsg.getChangeNumber().getTime()).toString():"")
              + ")" +
          " nextNonEligibleMsg=" + nonElligiblemsg +
          " startState=" + startState +
          " stopState= " + stopState +
          " currState= " + currentState + "]");
    }
  }

  // The list of contexts by domain for the current search
  CLDomainContext[] clDomCtxts = new CLDomainContext[0];

  // NOTE(review): the text of this file looks truncated from the middle of the
  // loop header below up to the "excludedServiceIDs" declaration: the rest of
  // this method and the beginning of the following declarations are missing.
  // Restore them from version control before relying on this section.
  private void clDomCtxtsToString(String msg)
  {
    StringBuilder buffer = new StringBuilder();
    buffer.append(msg+"\n");
    for (int i=0;i excludedServiceIDs = new ArrayList();

    /**
     * Provides a string representation of this object.
     * @return the string representation.
*/ public String toString() { return new String( this.getClass().getCanonicalName() + ":[" + " nextSeqnum=" + nextSeqnum + " persistent=" + isPersistent + " stopSeqnum" + stopSeqnum + " endOfSeqnumdbReached=" + endOfSeqnumdbReached + " searchPhase=" + searchPhase + " generalizedStartState=" + generalizedStartState + "]"); } } // The context of the current search private CLTraverseCtxt cLSearchCtxt = new CLTraverseCtxt(); /** * Starts this handler based on a start message received from remote server. * @param inECLStartMsg The start msg provided by the remote server. * @return Whether the remote server requires encryption or not. * @throws DirectoryException When a problem occurs. */ public boolean processStartFromRemote(ServerStartECLMsg inECLStartMsg) throws DirectoryException { try { protocolVersion = ProtocolVersion.minWithCurrent( inECLStartMsg.getVersion()); generationId = inECLStartMsg.getGenerationId(); serverURL = inECLStartMsg.getServerURL(); int separator = serverURL.lastIndexOf(':'); serverAddressURL = session.getRemoteAddress() + ":" + serverURL.substring(separator + 1); setInitialServerState(inECLStartMsg.getServerState()); setSendWindowSize(inECLStartMsg.getWindowSize()); if (protocolVersion > ProtocolVersion.REPLICATION_PROTOCOL_V1) { // We support connection from a V1 RS // Only V2 protocol has the group id in repl server start message this.groupId = inECLStartMsg.getGroupId(); } // FIXME:ECL Any generationID must be removed, it makes no sense here. oldGenerationId = -100; } catch(Exception e) { Message message = Message.raw(e.getLocalizedMessage()); throw new DirectoryException(ResultCode.OTHER, message); } return inECLStartMsg.getSSLEncryption(); } /** * Creates a new handler object to a remote replication server. * @param session The session with the remote RS. * @param queueSize The queue size to manage updates to that RS. * @param replicationServerURL The hosting local RS URL. * @param replicationServerId The hosting local RS serverId. 
* @param replicationServer The hosting local RS object. * @param rcvWindowSize The receiving window size. */ public ECLServerHandler( ProtocolSession session, int queueSize, String replicationServerURL, short replicationServerId, ReplicationServer replicationServer, int rcvWindowSize) { super(session, queueSize, replicationServerURL, replicationServerId, replicationServer, rcvWindowSize); try { setServiceIdAndDomain("cn=changelog"); } catch(DirectoryException de) { // no chance to have a bad domain set here } } /** * Creates a new handler object to a remote replication server. * @param replicationServerURL The hosting local RS URL. * @param replicationServerId The hosting local RS serverId. * @param replicationServer The hosting local RS object. * @param startECLSessionMsg the start parameters. * @throws DirectoryException when an errors occurs. */ public ECLServerHandler( String replicationServerURL, short replicationServerId, ReplicationServer replicationServer, StartECLSessionMsg startECLSessionMsg) throws DirectoryException { // FIXME:ECL queueSize is hard coded to 1 else Handler hangs for some reason super(null, 1, replicationServerURL, replicationServerId, replicationServer, 0); try { setServiceIdAndDomain("cn=changelog"); } catch(DirectoryException de) { // no chance to have a bad domain set here } this.initialize(startECLSessionMsg); } /** * Starts the handler from a remote start message received from * the remote server. * @param inECLStartMsg The provided ReplServerStart message received. 
*/ public void startFromRemoteServer(ServerStartECLMsg inECLStartMsg) { try { // Process start from remote boolean sessionInitiatorSSLEncryption = processStartFromRemote(inECLStartMsg); // lock with timeout lockDomain(true); // send start to remote ReplServerStartMsg outReplServerStartMsg = sendStartToRemote((short)-1); // log logStartHandshakeRCVandSND(inECLStartMsg, outReplServerStartMsg); // until here session is encrypted then it depends on the negociation // The session initiator decides whether to use SSL. if (!sessionInitiatorSSLEncryption) session.stopEncryption(); // wait and process StartSessionMsg from remote RS StartECLSessionMsg inStartECLSessionMsg = waitAndProcessStartSessionECLFromRemoteServer(); logStartECLSessionHandshake(inStartECLSessionMsg); // initialization initialize(inStartECLSessionMsg); } catch(DirectoryException de) { abortStart(de.getMessageObject()); } catch(Exception e) { abortStart(Message.raw(e.getLocalizedMessage())); } finally { if ((replicationServerDomain != null) && replicationServerDomain.hasLock()) { replicationServerDomain.release(); } } } /** * Wait receiving the StartSessionMsg from the remote DS and process it. * @return the startSessionMsg received * @throws DirectoryException * @throws IOException * @throws ClassNotFoundException * @throws DataFormatException * @throws NotSupportedOldVersionPDUException */ private StartECLSessionMsg waitAndProcessStartSessionECLFromRemoteServer() throws DirectoryException, IOException, ClassNotFoundException, DataFormatException, NotSupportedOldVersionPDUException { ReplicationMsg msg = null; msg = session.receive(); if (!(msg instanceof StartECLSessionMsg)) { Message message = Message.raw( "Protocol error: StartECLSessionMsg required." + msg + " received."); abortStart(message); } // Process StartSessionMsg sent by remote DS StartECLSessionMsg startECLSessionMsg = (StartECLSessionMsg) msg; return startECLSessionMsg; } /** * Initialize the handler from a provided cookie value. 
* @param providedGeneralizedStartState The provided cookie value.
   * @throws DirectoryException When an error is raised.
   */
  public void initializeCLSearchFromGenState(
      String providedGeneralizedStartState)
  throws DirectoryException
  {
    // -1 is the "no seqnum" marker tested by the traversal code
    this.cLSearchCtxt.nextSeqnum = -1; // will not generate seqnum
    initializeCLDomCtxts(providedGeneralizedStartState);
  }

  /**
   * Initialize the context for each domain.
   * <p>
   * The provided state is parsed as a list of per-domain entries separated
   * by ';'. Each entry is a base DN, optionally followed by ':' and a list of
   * change numbers separated by a single space. Domains not mentioned start
   * from an empty state.
   *
   * @param providedGeneralizedStartState the provided generalized state,
   *        may be null or empty
   * @throws DirectoryException When an error occurs: unknown domain in the
   *         provided state, state not covering the start state of its domain,
   *         or any exception raised while parsing (all reported as
   *         UNWILLING_TO_PERFORM with the "full resync required" message).
   */
  public void initializeCLDomCtxts(String providedGeneralizedStartState)
  throws DirectoryException
  {
    // start state by domain base DN
    // NOTE(review): raw types here and on the iterators below - the generic
    // type arguments look lost from the text of this file, confirm.
    HashMap startStates = new HashMap();

    ReplicationServer rs = replicationServerDomain.getReplicationServer();

    try
    {
      // Initialize start state for all running domains with empty state
      Iterator rsdk = rs.getCacheIterator();
      if (rsdk != null)
      {
        while (rsdk.hasNext())
        {
          // process a domain
          ReplicationServerDomain rsd = rsdk.next();
          // skip the changelog domain
          if (rsd == this.replicationServerDomain)
            continue;
          startStates.put(rsd.getBaseDn(), new ServerState());
        }
      }

      // Overwrite start state from the cookie provided in the request
      if ((providedGeneralizedStartState != null) &&
          (providedGeneralizedStartState.length()>0))
      {
        String[] domains = providedGeneralizedStartState.split(";");
        for (String domainState : domains)
        {
          // Split baseDN and serverState
          String[] fields = domainState.split(":");

          // BaseDN - Check it
          // (only the domains registered in the loop above are accepted)
          String domainBaseDNReceived = fields[0];
          if (!startStates.containsKey(domainBaseDNReceived))
            throw new DirectoryException(ResultCode.UNWILLING_TO_PERFORM,
                ERR_INVALID_COOKIE_FULL_RESYNC_REQUIRED.get(
                    "unknown " + domainBaseDNReceived));

          // ServerState
          // (left empty when the entry holds a base DN only)
          ServerState domainServerState = new ServerState();
          if (fields.length>1)
          {
            String strState = fields[1];
            String[] strCN = strState.split(" ");
            for (String sr : strCN)
            {
              ChangeNumber fromChangeNumber = new ChangeNumber(sr);
              domainServerState.update(fromChangeNumber);
            }
          }
          startStates.put(domainBaseDNReceived, domainServerState);

          // FIXME: ECL first cookie value check
          // ECL For each of the provided state, it this state is older
          // than the older change stored in the replication changelog ....
          // then a purge occured since the time the cookie was published
          // it is recommended to do a full resync
          // NOTE(review): rsd is dereferenced without a null check; a null
          // result would end up in the generic catch below - confirm whether
          // getReplicationServerDomain(..., false) may return null here.
          ReplicationServerDomain rsd =
            rs.getReplicationServerDomain(domainBaseDNReceived, false);
          ServerState domainStartState = rsd.getStartState();
          if (!domainServerState.cover(domainStartState))
          {
            throw new DirectoryException(ResultCode.UNWILLING_TO_PERFORM,
                ERR_INVALID_COOKIE_FULL_RESYNC_REQUIRED.get(
                    "too old cookie provided " + providedGeneralizedStartState
                    + " first acceptable change for " + rsd.getBaseDn()
                    + " is " + rsd.getStartState()));
          }
        }
      }
    }
    catch(DirectoryException de)
    {
      // already carries the right result code and message
      throw de;
    }
    catch(Exception e)
    {
      // anything else (e.g. a malformed change number) is reported as an
      // invalid cookie as well
      throw new DirectoryException(ResultCode.UNWILLING_TO_PERFORM,
          ERR_INVALID_COOKIE_FULL_RESYNC_REQUIRED.get(
              "Exception raised: " + e.getMessage()));
    }

    try
    {
      // Now traverse all domains and build the initial changelog context
      Iterator rsdi = rs.getCacheIterator();

      // Creates the table that will contain the real-time info by domain.
clDomCtxts = new CLDomainContext[rs.getCacheSize()-1 -this.cLSearchCtxt.excludedServiceIDs.size()]; int i =0; if (rsdi != null) { while (rsdi.hasNext()) { // process a domain ReplicationServerDomain rsd = rsdi.next(); // skip the 'unreal' changelog domain if (rsd == this.replicationServerDomain) continue; // skip the excluded domains boolean excluded = false; for(String excludedServiceID : this.cLSearchCtxt.excludedServiceIDs) { if (excludedServiceID.equalsIgnoreCase(rsd.getBaseDn())) { excluded=true; break; } } if (excluded) continue; // Creates the context record CLDomainContext newContext = new CLDomainContext(); newContext.active = true; newContext.rsd = rsd; if (this.cLSearchCtxt.isPersistent == StartECLSessionMsg.PERSISTENT_CHANGES_ONLY) { newContext.startState = rsd.getCLElligibleState(); } else { newContext.startState = startStates.get(rsd.getBaseDn()); newContext.stopState = rsd.getCLElligibleState(); } newContext.currentState = new ServerState(); // Creates an unconnected SH MessageHandler mh = new MessageHandler(maxQueueSize, replicationServerURL, replicationServerId, replicationServer); // set initial state mh.setInitialServerState(newContext.startState); // set serviceID and domain mh.setServiceIdAndDomain(rsd.getBaseDn()); // register into domain rsd.registerHandler(mh); newContext.mh = mh; // store the new context clDomCtxts[i] = newContext; i++; } } // the next record from the seqnumdb should be the one cLSearchCtxt.endOfSeqnumdbReached = false; cLSearchCtxt.generalizedStartState = providedGeneralizedStartState; // Initializes all domain with the next elligible message for (int j=0; j 10) { saturationCount = 0; try { replicationServerDomain.checkAllSaturation(); } catch (IOException e) { } } */ boolean acquired = false; do { try { acquired = sendWindow.tryAcquire((long) 500, TimeUnit.MILLISECONDS); interrupted = false; } catch (InterruptedException e) { // loop until not interrupted } } while (((interrupted) || (!acquired)) && (!shutdownWriter)); if 
(msg != null) { incrementOutCount(); } return msg; } /** * Get the next message - non blocking - null when none. * * @param synchronous - not used - always non blocking. * @return the next message - null when none. */ protected UpdateMsg getnextMessage(boolean synchronous) { UpdateMsg msg = null; try { ECLUpdateMsg eclMsg = getnextUpdate(); if (eclMsg!=null) msg = eclMsg.getUpdateMsg(); } catch(DirectoryException de) { TRACER.debugCaught(DebugLogLevel.ERROR, de); } return msg; } /** * Get the next external changelog update. * * @return The ECL update, null when none. * @exception DirectoryException when any problem occurs. */ protected ECLUpdateMsg getnextUpdate() throws DirectoryException { return getGeneralizedNextECLUpdate(this.cLSearchCtxt); } /** * Computes the cross domain eligible message (non blocking). * Return null when search is covered */ private ECLUpdateMsg getGeneralizedNextECLUpdate(CLTraverseCtxt cLSearchCtxt) throws DirectoryException { ECLUpdateMsg theOldestChange = null; try { TRACER.debugInfo("In " + replicationServerDomain.getReplicationServer(). getMonitorInstanceName() + "," + this + " getGeneralizedNextECLUpdate starts with ctxt=" + cLSearchCtxt); if ((cLSearchCtxt.nextSeqnum != -1) && (!cLSearchCtxt.endOfSeqnumdbReached)) { /* TODO:ECL G Good changelog draft compat. // First time , initialise the cursor to traverse the seqnumdb if (seqnumDbReadIterator == null) { try { seqnumDbReadIterator = replicationServerDomain.getReplicationServer(). 
getSeqnumDbHandler().generateIterator(cLSearchCtxt.nextSeqnum); TRACER.debugInfo("getGeneralizedNextMessage(): " + " creates seqnumDbReadIterator from nextSeqnum=" + cLSearchCtxt.nextSeqnum + " 1rst=" + seqnumDbReadIterator.getSeqnum() + " CN=" + seqnumDbReadIterator.getChangeNumber() + cLSearchCtxt); } catch(Exception e) { cLSearchCtxt.endOfSeqnumdbReached = true; } } */ } // Search / no seqnum / not persistent // ----------------------------------- // init: all domain are candidate // get one msg from each // no (null) msg returned: should not happen since we go to a state // that is computed/expected // getMessage: // get the oldest msg: // after: // if stopState of domain is covered then domain is out candidate // until no domain candidate mean generalized stopState // has been reached // else // get one msg from that domain // no (null) msg returned: should not happen since we go to a state // that is computed/expected // step 2: send DoneMsg // Persistent: // ---------- // step 1&2: same as non persistent // // step 3: reinit all domain are candidate // take the oldest // if one domain has no msg, still is candidate int iDom = 0; boolean nextclchange = true; while ((nextclchange) && (cLSearchCtxt.searchPhase==1)) { // Step 1 & 2 if (cLSearchCtxt.searchPhase==1) { if (debugEnabled()) clDomCtxtsToString("In getGeneralizedNextMessage : " + "looking for the generalized oldest change"); // Retrieves the index in the subx table of the // generalizedOldestChange iDom = getGeneralizedOldestChange(); // idomain != -1 means that we got one // generalized oldest change to process if (iDom==-1) { closePhase1(); // signify end of phase 1 to the caller return null; } // idomain != -1 means that we got one // generalized oldest change to process String suffix = this.clDomCtxts[iDom].rsd.getBaseDn(); theOldestChange = new ECLUpdateMsg( (LDAPUpdateMsg)clDomCtxts[iDom].nextMsg, null, // set later suffix); nextclchange = false; /* TODO:ECL G Good change log draft compat. 
if (cLSearchCtxt.nextSeqnum!=-1) { // Should either retrieve or generate a seqnum // we also need to check if the seqnumdb is acccurate reagrding // the changelogdb. // if not, 2 potential reasons // -1 : purge from the changelog .. let's traverse the seqnumdb // -2 : changelog is late .. let's traverse the changelog // replogcn : the oldest change from the changelog db ChangeNumber replogcn = theOldestChange.getChangeNumber(); DN replogReplDomDN = clDomCtxts[iDom].rsd.getBaseDn(); while (true) { if (!cLSearchCtxt.endOfSeqnumdbReached) { // we did not reach yet the end of the seqnumdb // seqnumcn : the next change from from the seqnum db ChangeNumber seqnumcn = seqnumDbReadIterator.getChangeNumber(); // are replogcn and seqnumcn should be the same change ? int cmp = replogcn.compareTo(seqnumcn); DN seqnumReplDomDN=DN.decode(seqnumDbReadIterator. getDomainDN()); TRACER.debugInfo("seqnumgen: comparing the 2 db " + " changelogdb:" + replogReplDomDN + "=" + replogcn + " ts=" + new Date(replogcn.getTime()).toString() + "## seqnumdb:" + seqnumReplDomDN + "=" + seqnumcn + " ts=" + new Date(seqnumcn.getTime()).toString() + " sn older=" + seqnumcn.older(replogcn)); if ((replogReplDomDN.compareTo(seqnumReplDomDN)==0) && (cmp==0)) { // same domain and same CN => same change // assign the seqnum from the seqnumdb // to the change from the changelogdb TRACER.debugInfo("seqnumgen: assigning seqnum=" + seqnumDbReadIterator.getSeqnum() + " to change=" + theOldestChange); theOldestChange.setSeqnum(seqnumDbReadIterator.getSeqnum()); // prepare the next seqnum for the potential next change added // to the seqnumDb cLSearchCtxt.nextSeqnum = seqnumDbReadIterator.getSeqnum() + 1; break; } else { // replogcn and seqnumcn are NOT the same change if (seqnumcn.older(replogcn)) { // the change from the seqnumDb is older // that means that the change has been purged from the // changelog try { // let's traverse the seqnumdb searching for the change // found in the changelogDb. 
TRACER.debugInfo("seqnumgen: will skip " + seqnumcn + " and next from the seqnum"); cLSearchCtxt.endOfSeqnumdbReached = (seqnumDbReadIterator.next()==false); TRACER.debugInfo("seqnumgen: has nexted cr to " + " sn=" + seqnumDbReadIterator.getSeqnum() + " cn=" + seqnumDbReadIterator.getChangeNumber() + " and reached end " + " of seqnumdb:"+cLSearchCtxt.endOfSeqnumdbReached); if (cLSearchCtxt.endOfSeqnumdbReached) { // we are at the end of the seqnumdb in the append mode // store in seqnumdb the pair // seqnum of the cur change,state before this change) replicationServerDomain.addSeqnum( cLSearchCtxt.nextSeqnum, getGenState(), clDomCtxts[iDom].rsd.getBaseDn().toNormalizedString(), theOldestChange.getChangeNumber()); theOldestChange.setSeqnum(cLSearchCtxt.nextSeqnum); cLSearchCtxt.nextSeqnum++; break; } else { // next change from seqnumdb cLSearchCtxt.nextSeqnum = seqnumDbReadIterator.getSeqnum() + 1; continue; } } catch(Exception e) { } } else { // the change from the changelogDb is older // it should have been stored lately // let's continue to traverse the changelogdb TRACER.debugInfo("seqnumgen: will skip " + replogcn + " and next from the CL"); nextclchange = true; break; // TO BE CHECKED } } } else { // we are at the end of the seqnumdb in the append mode // store in seqnumdb the pair // (seqnum of the current change, state before this change) replicationServerDomain.addSeqnum( cLSearchCtxt.nextSeqnum, getGenState(), clDomCtxts[iDom].rsd.getBaseDn().toNormalizedString(), theOldestChange.getChangeNumber()); theOldestChange.setSeqnum(cLSearchCtxt.nextSeqnum); cLSearchCtxt.nextSeqnum++; break; } } // while seqnum } // nextseqnum !- -1 */ // here we have the right oldest change and in the seqnum case we // have its seqnum // Set and test the domain of the oldestChange see if we reached // the end of the phase for this domain clDomCtxts[iDom].currentState.update( theOldestChange.getUpdateMsg().getChangeNumber()); if 
(clDomCtxts[iDom].currentState.cover(clDomCtxts[iDom].stopState)) { clDomCtxts[iDom].active = false; } // Test the seqnum of the oldestChange see if we reached // the end of operation /* TODO:ECL G Good changelog draft compat. Not yet implemented if ((cLSearchCtxt.stopSeqnum>0) && (theOldestChange.getSeqnum()>=cLSearchCtxt.stopSeqnum)) { closePhase1(); // means end of phase 1 to the calling writer return null; } */ if (clDomCtxts[iDom].active) { // populates the table with the next eligible msg from idomain // in non blocking mode, return null when no more eligible msg getNextElligibleMessage(iDom); } } // phase ==1 } // while (nextclchange) if (cLSearchCtxt.searchPhase==2) { clDomCtxtsToString("In getGeneralizedNextMessage (persistent): " + "looking for the generalized oldest change"); for (int ido=0; ido