/*
|
* CDDL HEADER START
|
*
|
* The contents of this file are subject to the terms of the
|
* Common Development and Distribution License, Version 1.0 only
|
* (the "License"). You may not use this file except in compliance
|
* with the License.
|
*
|
* You can obtain a copy of the license at
|
* trunk/opends/resource/legal-notices/OpenDS.LICENSE
|
* or https://OpenDS.dev.java.net/OpenDS.LICENSE.
|
* See the License for the specific language governing permissions
|
* and limitations under the License.
|
*
|
* When distributing Covered Code, include this CDDL HEADER in each
|
* file and include the License file at
|
* trunk/opends/resource/legal-notices/OpenDS.LICENSE. If applicable,
|
* add the following below this CDDL HEADER, with the fields enclosed
|
* by brackets "[]" replaced with your own identifying * information:
|
* Portions Copyright [yyyy] [name of copyright owner]
|
*
|
* CDDL HEADER END
|
*
|
*
|
* Portions Copyright 2006 Sun Microsystems, Inc.
|
*/
|
package org.opends.server.changelog;
|
|
import static org.opends.server.loggers.Error.logError;
|
import static org.opends.server.messages.MessageHandler.getMessage;
|
import static org.opends.server.synchronization.SynchMessages.*;
|
import static org.opends.server.util.StaticUtils.stackTraceToSingleLineString;
|
|
import java.util.ArrayList;
|
import java.util.Date;
|
import java.util.List;
|
import java.util.LinkedList;
|
|
import org.opends.server.api.DirectoryThread;
|
import org.opends.server.api.MonitorProvider;
|
import org.opends.server.changelog.ChangelogDB.ChangelogCursor;
|
import org.opends.server.config.ConfigEntry;
|
import org.opends.server.config.ConfigException;
|
import org.opends.server.types.Attribute;
|
import org.opends.server.types.DN;
|
import org.opends.server.types.ErrorLogCategory;
|
import org.opends.server.types.ErrorLogSeverity;
|
import org.opends.server.types.InitializationException;
|
import org.opends.server.util.TimeThread;
|
import org.opends.server.core.DirectoryServer;
|
import org.opends.server.synchronization.ChangeNumber;
|
import org.opends.server.synchronization.UpdateMessage;
|
|
import com.sleepycat.je.DatabaseException;
|
|
/**
 * This class is used for managing the changelog database for each server
 * in the topology.
 * It is responsible for efficiently saving the updates that are received from
 * each master server into stable storage.
 * This class is also able to generate a ChangelogIterator that can be
 * used to read all changes from a given ChangeNumber.
 *
 * This class publishes some monitoring information below cn=monitor.
 *
 */
|
public class DbHandler implements Runnable
|
{
|
// This queue hold all the updates not yet saved to stable storage
|
// it is only used as a temporary placeholder so that the write
|
// in the stable storage can be grouped for efficiency reason.
|
// it is never read back by changelog threads that are responsible
|
// for pushing the changes to other changelog server or to LDAP server
|
private LinkedList<UpdateMessage> msgQueue = new LinkedList<UpdateMessage>();
|
private ChangelogDB db;
|
private ChangeNumber firstChange = null;
|
private ChangeNumber lastChange = null;
|
private short serverId;
|
private DN baseDn;
|
private DbMonitorProvider dbMonitor = new DbMonitorProvider();
|
private boolean shutdown = false;
|
private boolean done = false;
|
private DirectoryThread thread = null;
|
|
/**
|
* Creates a New dbHandler associated to a given LDAP server.
|
*
|
* @param id Identifier of the DB.
|
* @param baseDn of the DB.
|
* @throws DatabaseException If a database problem happened
|
*/
|
public DbHandler(short id, DN baseDn) throws DatabaseException
|
{
|
this.serverId = id;
|
this.baseDn = baseDn;
|
db = new ChangelogDB(id, baseDn);
|
firstChange = db.readFirstChange();
|
lastChange = db.readLastChange();
|
thread = new DirectoryThread(this, "changelog db " + id + " " + baseDn);
|
thread.start();
|
|
DirectoryServer.deregisterMonitorProvider(
|
dbMonitor.getMonitorInstanceName());
|
DirectoryServer.registerMonitorProvider(dbMonitor);
|
}
|
|
/**
|
* Add an update to the list of messages that must be saved to the db
|
* managed by this db handler.
|
* This method is blocking if the size of the list of message is larger
|
* than its maximum.
|
*
|
* @param update The update that must be saved to the db managed by this db
|
* handler.
|
*/
|
public void add(UpdateMessage update)
|
{
|
synchronized (msgQueue)
|
{
|
int size = msgQueue.size();
|
while (size > 5000) /* TODO : max size should be configurable */
|
{
|
try
|
{
|
msgQueue.wait(500);
|
} catch (InterruptedException e)
|
{
|
// simply loop to try again.
|
}
|
size = msgQueue.size();
|
}
|
|
msgQueue.add(update);
|
if (lastChange == null || lastChange.older(update.getChangeNumber()))
|
{
|
lastChange = update.getChangeNumber();
|
}
|
if (firstChange == null)
|
firstChange = update.getChangeNumber();
|
}
|
}
|
|
/**
|
* Get some changes out of the message queue of the LDAP server.
|
*
|
* @param number the number of messages to extract.
|
* @return a List containing number changes extracted from the queue.
|
*/
|
private List<UpdateMessage> getChanges(int number)
|
{
|
int current = 0;
|
LinkedList<UpdateMessage> changes = new LinkedList<UpdateMessage>();
|
|
synchronized (msgQueue)
|
{
|
int size = msgQueue.size();
|
while ((current < number) && (current < size))
|
{
|
UpdateMessage msg = msgQueue.get(current);
|
current++;
|
changes.add(msg);
|
}
|
}
|
return changes;
|
}
|
|
/**
|
* Get the firstChange.
|
* @return Returns the firstChange.
|
*/
|
public ChangeNumber getFirstChange()
|
{
|
return firstChange;
|
}
|
|
/**
|
* Get the lastChange.
|
* @return Returns the lastChange.
|
*/
|
public ChangeNumber getLastChange()
|
{
|
return lastChange;
|
}
|
|
/**
|
* Generate a new ChangelogIterator that allows to browse the db
|
* managed by this dbHandler and starting at the position defined
|
* by a given changeNumber.
|
*
|
* @param changeNumber The position where the iterator must start.
|
*
|
* @return a new ChangelogIterator that allows to browse the db
|
* managed by this dbHandler and starting at the position defined
|
* by a given changeNumber.
|
*
|
* @throws DatabaseException if a database problem happened.
|
* @throws Exception If there is no other change to push after change
|
* with changeNumber number.
|
*/
|
public ChangelogIterator generateIterator(ChangeNumber changeNumber)
|
throws DatabaseException, Exception
|
{
|
return new ChangelogIterator(serverId, db, changeNumber);
|
}
|
|
/**
|
* Removes message in a subList of the msgQueue from the msgQueue.
|
*
|
* @param number the number of changes to be removed.
|
*/
|
private void clear(int number)
|
{
|
synchronized (msgQueue)
|
{
|
int current = 0;
|
while ((current < number) && (!msgQueue.isEmpty()))
|
{
|
msgQueue.remove();
|
current++;
|
}
|
if (msgQueue.size() < 5000)
|
msgQueue.notify();
|
}
|
}
|
|
/**
|
* Shutdown this dbHandler.
|
*/
|
public void shutdown()
|
{
|
shutdown = true;
|
synchronized (this)
|
{
|
this.notifyAll();
|
}
|
|
synchronized (this)
|
{
|
while (done == false)
|
{
|
try
|
{
|
this.wait();
|
} catch (Exception e)
|
{}
|
}
|
}
|
db.shutdown();
|
}
|
|
/**
|
* Run method for this class.
|
* Periodically Flushes the ChangelogCache from memory to the stable storage
|
* and trims the old updates.
|
*/
|
public void run()
|
{
|
while (shutdown == false)
|
{
|
try {
|
flush();
|
trim();
|
|
synchronized (this)
|
{
|
try
|
{
|
this.wait(1000);
|
} catch (InterruptedException e)
|
{ }
|
}
|
} catch (Exception end)
|
{
|
int msgID = MSGID_EXCEPTION_CHANGELOG_TRIM_FLUSH;
|
String message = getMessage(msgID) + stackTraceToSingleLineString(end);
|
logError(ErrorLogCategory.SYNCHRONIZATION,
|
ErrorLogSeverity.SEVERE_ERROR,
|
message, msgID);
|
}
|
}
|
// call flush a last time before exiting to make sure that
|
// no change was forgotten in the msgQueue
|
flush();
|
|
synchronized (this)
|
{
|
done = true;
|
this.notifyAll();
|
}
|
}
|
|
/**
|
* Flush old change information from this changelog database.
|
* @throws DatabaseException In case of database problem.
|
*/
|
private void trim() throws DatabaseException, Exception
|
{
|
int size = 0;
|
boolean finished = false;
|
int trimage = 24*60*60*1000; // TODO : make trim-age a config parameter
|
ChangeNumber trimDate = new ChangeNumber(TimeThread.getTime() - trimage,
|
(short) 0, (short)0);
|
|
/* the trim is done by group in order to save some CPU and IO bandwidth
|
* start the transaction then do a bunch of remove then commit
|
*/
|
ChangelogCursor cursor;
|
|
cursor = db.openDeleteCursor();
|
|
try {
|
while ((size < 5000 ) && (!finished))
|
{
|
ChangeNumber changeNumber = cursor.nextChangeNumber();
|
if ((changeNumber != null) && (!changeNumber.equals(lastChange))
|
&& (changeNumber.older(trimDate)))
|
{
|
size++;
|
cursor.delete();
|
}
|
else
|
{
|
firstChange = changeNumber;
|
finished = true;
|
}
|
}
|
|
cursor.close();
|
} catch (DatabaseException e)
|
{
|
cursor.close();
|
throw (e);
|
}
|
}
|
|
/**
|
* Flush a number of updates from the memory list to the stable storage.
|
*/
|
private void flush()
|
{
|
int size;
|
|
do
|
{
|
// get N messages to save in the DB
|
List<UpdateMessage> changes = getChanges(500);
|
|
// if no more changes to save exit immediately.
|
if ((changes == null) || ((size = changes.size()) == 0))
|
return;
|
|
// save the change to the stable storage.
|
db.addEntries(changes);
|
|
// remove the changes from the list of changes to be saved.
|
clear(changes.size());
|
|
} while (size >=500);
|
}
|
|
/**
|
* This internal class is used to implement the Monitoring capabilities
|
* of the dbHandler.
|
*/
|
private class DbMonitorProvider extends MonitorProvider
|
{
|
private DbMonitorProvider()
|
{
|
super("Changelog Database");
|
}
|
|
/**
|
* {@inheritDoc}
|
*/
|
@Override
|
public ArrayList<Attribute> getMonitorData()
|
{
|
ArrayList<Attribute> attributes = new ArrayList<Attribute>();
|
attributes.add(new Attribute("changelog-database",
|
String.valueOf(serverId)));
|
attributes.add(new Attribute("base-dn", baseDn.toString()));
|
ChangeNumber first = getFirstChange();
|
ChangeNumber last = getLastChange();
|
if (first != null)
|
{
|
Date firstTime = new Date(first.getTime());
|
attributes.add(new Attribute("first-change",
|
first.toString() + " " + firstTime.toString()));
|
}
|
if (last != null)
|
{
|
Date lastTime = new Date(last.getTime());
|
attributes.add(new Attribute("last-change",
|
last.toString() + " " + lastTime.toString()));
|
}
|
|
return attributes;
|
}
|
|
/**
|
* {@inheritDoc}
|
*/
|
@Override
|
public String getMonitorInstanceName()
|
{
|
return "Changelog database " + baseDn.toString() +
|
" " + String.valueOf(serverId);
|
}
|
|
/**
|
* {@inheritDoc}
|
*/
|
@Override
|
public long getUpdateInterval()
|
{
|
/* we don't wont to do polling on this monitor */
|
return 0;
|
}
|
|
/**
|
* {@inheritDoc}
|
*/
|
@Override
|
public void initializeMonitorProvider(ConfigEntry configEntry)
|
throws ConfigException,InitializationException
|
{
|
// Nothing to do for now
|
}
|
|
/**
|
* {@inheritDoc}
|
*/
|
@Override
|
public void updateMonitorData()
|
{
|
// As long as getUpdateInterval() returns 0, this will never get called
|
}
|
}
|
|
/**
|
* {@inheritDoc}
|
*/
|
@Override
|
public String toString()
|
{
|
return(baseDn + " " + serverId + " " + firstChange + " " + lastChange);
|
}
|
|
}
|