/* * CDDL HEADER START * * The contents of this file are subject to the terms of the * Common Development and Distribution License, Version 1.0 only * (the "License"). You may not use this file except in compliance * with the License. * * You can obtain a copy of the license at * trunk/opends/resource/legal-notices/OpenDS.LICENSE * or https://OpenDS.dev.java.net/OpenDS.LICENSE. * See the License for the specific language governing permissions * and limitations under the License. * * When distributing Covered Code, include this CDDL HEADER in each * file and include the License file at * trunk/opends/resource/legal-notices/OpenDS.LICENSE. If applicable, * add the following below this CDDL HEADER, with the fields enclosed * by brackets "[]" replaced with your own identifying * information: * Portions Copyright [yyyy] [name of copyright owner] * * CDDL HEADER END * * * Portions Copyright 2006 Sun Microsystems, Inc. */ package org.opends.server.synchronization; import static org.opends.server.loggers.Error.logError; import static org.opends.server.messages.MessageHandler.getMessage; import static org.opends.server.synchronization.SynchMessages.*; import java.util.ArrayList; import java.util.Date; import java.util.HashMap; import java.util.HashSet; import java.util.LinkedHashSet; import java.util.LinkedList; import java.util.List; import java.util.Set; import java.io.Serializable; import org.opends.server.core.AddOperation; import org.opends.server.types.Control; import org.opends.server.types.DN; import org.opends.server.core.DirectoryException; import org.opends.server.core.DirectoryServer; import org.opends.server.core.ModifyOperation; import org.opends.server.protocols.asn1.ASN1OctetString; import org.opends.server.protocols.internal.InternalClientConnection; import org.opends.server.protocols.internal.InternalSearchOperation; import org.opends.server.protocols.ldap.LDAPAttribute; import org.opends.server.protocols.ldap.LDAPException; import org.opends.server.protocols.ldap.LDAPFilter; import org.opends.server.protocols.ldap.LDAPModification; import org.opends.server.types.Attribute; import org.opends.server.types.AttributeType; import org.opends.server.types.AttributeValue; import org.opends.server.types.DereferencePolicy; import org.opends.server.types.ErrorLogCategory; import org.opends.server.types.ErrorLogSeverity; import org.opends.server.types.ModificationType; import org.opends.server.types.ResultCode; import org.opends.server.types.SearchResultEntry; import org.opends.server.types.SearchScope; /** * ServerState class. * This object is used to store the last update seem on this server * from each server. * It is exchanged with the changelog servers at connection establishment time * It is locally saved in the database */ public class ServerState implements Serializable { private static final long serialVersionUID = 314772980474416183L; private static final String SYNCHRONIZATION_STATE = "ds-sync-state"; private HashMap list; transient private DN baseDn; transient boolean savedStatus = true; transient private InternalClientConnection conn = new InternalClientConnection(); transient private ASN1OctetString serverStateAsn1Dn; transient private DN serverStateDn; /** * create a new ServerState. * @param baseDn The baseDN for which the ServerState is created */ public ServerState(DN baseDn) { list = new HashMap(); this.baseDn = baseDn; serverStateAsn1Dn = new ASN1OctetString( "dc=ffffffff-ffffffff-ffffffff-ffffffff," + baseDn.toString()); try { serverStateDn = DN.decode(serverStateAsn1Dn); } catch (DirectoryException e) { // never happens } } /** * Update the Server State with a ChangeNumber. * All operations with smaller CSN and the same serverID must be committed * before calling this method. * @param changeNumber the committed ChangeNumber. * @return a boolean indicating if the update was meaningfull. */ public boolean update(ChangeNumber changeNumber) { if (changeNumber == null) return false; synchronized(this) { Short id = changeNumber.getServerId(); ChangeNumber oldCN = list.get(id); if (oldCN == null || changeNumber.newer(oldCN)) { list.put(id,changeNumber); savedStatus = false; return true; } else { return false; } } } /** * return a Set of String usable as a textual representation of * a Server state. * format : time seqnum id * * example : * 1 00000109e4666da600220001 * 2 00000109e44567a600220002 * * @return the representation of the Server state */ public Set toStringSet() { HashSet set = new HashSet(); synchronized (this) { for (Short key : list.keySet()) { ChangeNumber change = list.get(key); Date date = new Date(change.getTime()); set.add(change.toString() + " " + date.toString()); } } return set; } /** * return the text representation of ServerState. * @return the text representation of ServerState */ @Override public String toString() { synchronized (this) { String str = null; for (Short key : list.keySet()) { ChangeNumber change = list.get(key); str += change.toString(); } return str; } } /** * Get the largest ChangeNumber seen for a given LDAP server ID. * * @param serverId : the server ID * @return the largest ChangeNumber seen */ public ChangeNumber getMaxChangeNumber(short serverId) { return list.get(serverId); } /** * Save this object to persistent storage. */ public void save() { if (list.size() == 0) return; ArrayList values = new ArrayList(); synchronized (this) { for (Short id : list.keySet()) { ASN1OctetString value = new ASN1OctetString(list.get(id).toString()); values.add(value); } } LDAPAttribute attr = new LDAPAttribute(SYNCHRONIZATION_STATE, values); LDAPModification mod = new LDAPModification(ModificationType.REPLACE, attr); ArrayList mods = new ArrayList(1); mods.add(mod); boolean done = false; while (!done) { /* * Generate a modify operation on the Server State Entry : * cn=ffffffff-ffffffff-ffffffff-ffffffff, baseDn */ ModifyOperation op = new ModifyOperation(conn, InternalClientConnection.nextOperationID(), InternalClientConnection.nextMessageID(), new ArrayList(0), serverStateAsn1Dn, mods); op.setInternalOperation(true); op.setSynchronizationOperation(true); op.run(); ResultCode resultCode = op.getResultCode(); if (resultCode != ResultCode.SUCCESS) { if (resultCode == ResultCode.NO_SUCH_OBJECT) { createStateEntry(); } else { int msgID = MSGID_ERROR_UPDATING_RUV; String message = getMessage(msgID, op.getResultCode().getResultCodeName(), op.toString(), op.getErrorMessage(), baseDn.toString()); logError(ErrorLogCategory.SYNCHRONIZATION, ErrorLogSeverity.SEVERE_ERROR, message, msgID); break; } } else done = true; } } /** * Load the ServerState from the backing entry in database to memory. */ public void loadState() { /* * Read the serverState from the database, * If not there create empty entry */ LDAPFilter filter; try { filter = LDAPFilter.decode("objectclass=*"); } catch (LDAPException e) { // can not happen return; } /* * Search the database entry that is used to periodically * save the ServerState */ InternalSearchOperation search = conn.processSearch(serverStateAsn1Dn, SearchScope.BASE_OBJECT, DereferencePolicy.DEREF_ALWAYS, 0, 0, false, filter,new LinkedHashSet(0)); if (((search.getResultCode() != ResultCode.SUCCESS)) && ((search.getResultCode() != ResultCode.NO_SUCH_OBJECT))) { int msgID = MSGID_ERROR_SEARCHING_RUV; String message = getMessage(msgID, search.getResultCode().getResultCodeName(), search.toString(), search.getErrorMessage(), baseDn.toString()); logError(ErrorLogCategory.SYNCHRONIZATION, ErrorLogSeverity.SEVERE_ERROR, message, msgID); } SearchResultEntry resultEntry = null; if (search.getResultCode() == ResultCode.SUCCESS) { /* * Read the serverState from the SYNCHRONIZATION_STATE attribute */ LinkedList result = search.getSearchEntries(); resultEntry = result.getFirst(); if (resultEntry != null) { AttributeType synchronizationStateType = DirectoryServer.getAttributeType(SYNCHRONIZATION_STATE); List attrs = resultEntry.getAttribute(synchronizationStateType); if (attrs != null) { Attribute attr = attrs.get(0); LinkedHashSet values = attr.getValues(); for (AttributeValue value : values) { ChangeNumber changeNumber = new ChangeNumber(value.getStringValue()); update(changeNumber); } } } /* * TODO : The ServerState is saved to the database periodically, * therefore in case of crash it is possible that is does not contain * the latest changes that have been processed and saved to the * database. * In order to make sure that we don't loose them, search all the entries * that have been updated after this entry. * This is done by using the HistoricalCsnOrderingMatchingRule * and an ordering index for historical attribute */ } if ((resultEntry == null) || ((search.getResultCode() != ResultCode.SUCCESS))) { createStateEntry(); } } /** * Create the Entry that will be used to store the ServerState information. * It will be updated when the server stops and periodically. */ private void createStateEntry() { ArrayList attrs = new ArrayList(); ArrayList values = new ArrayList(); ASN1OctetString value = new ASN1OctetString("extensibleObject"); values.add(value); LDAPAttribute attr = new LDAPAttribute("objectClass", values); value = new ASN1OctetString("domain"); values.add(value); attr = new LDAPAttribute("objectClass", values); attrs.add(attr); values = new ArrayList(); value = new ASN1OctetString("ffffffff-ffffffff-ffffffff-ffffffff"); values.add(value); attr = new LDAPAttribute("dc", values); attrs.add(attr); AddOperation add = conn.processAdd(serverStateAsn1Dn, attrs); ResultCode resultCode = add.getResultCode(); if ((resultCode != ResultCode.SUCCESS) && (resultCode != ResultCode.NO_SUCH_OBJECT)) { int msgID = MSGID_ERROR_UPDATING_RUV; String message = getMessage(msgID, add.getResultCode().getResultCodeName(), add.toString(), add.getErrorMessage(), baseDn.toString()); logError(ErrorLogCategory.SYNCHRONIZATION, ErrorLogSeverity.SEVERE_ERROR, message, msgID); } } /** * Get the Dn where the ServerState is stored. * @return Returns the serverStateDn. */ public DN getServerStateDn() { return serverStateDn; } }