/* * CDDL HEADER START * * The contents of this file are subject to the terms of the * Common Development and Distribution License, Version 1.0 only * (the "License"). You may not use this file except in compliance * with the License. * * You can obtain a copy of the license at * trunk/opends/resource/legal-notices/OpenDS.LICENSE * or https://OpenDS.dev.java.net/OpenDS.LICENSE. * See the License for the specific language governing permissions * and limitations under the License. * * When distributing Covered Code, include this CDDL HEADER in each * file and include the License file at * trunk/opends/resource/legal-notices/OpenDS.LICENSE. If applicable, * add the following below this CDDL HEADER, with the fields enclosed * by brackets "[]" replaced with your own identifying information: * Portions Copyright [yyyy] [name of copyright owner] * * CDDL HEADER END * * * Copyright 2008-2009 Sun Microsystems, Inc. */ package org.opends.server.backends.ndb; import com.mysql.cluster.ndbj.Ndb; import com.mysql.cluster.ndbj.NdbApiException; import com.mysql.cluster.ndbj.NdbApiTemporaryException; import com.mysql.cluster.ndbj.NdbClusterConnection; import org.opends.messages.Message; import java.util.concurrent.ConcurrentHashMap; import java.util.*; import java.util.concurrent.LinkedBlockingQueue; import org.opends.server.types.DN; import org.opends.server.types.ConfigChangeResult; import org.opends.server.types.ResultCode; import org.opends.server.api.Backend; import org.opends.server.admin.server.ConfigurationChangeListener; import org.opends.server.admin.std.server.NdbBackendCfg; import org.opends.server.config.ConfigException; import static org.opends.server.loggers.ErrorLogger.logError; import static org.opends.server.loggers.debug.DebugLogger.*; import static org.opends.messages.NdbMessages.*; import org.opends.server.loggers.debug.DebugTracer; import org.opends.server.types.DebugLogLevel; /** * Root container holds all the entry containers for each base DN. * It also maintains all the openings and closings of the entry * containers. */ public class RootContainer implements ConfigurationChangeListener { /** * The tracer object for the debug logger. */ private static final DebugTracer TRACER = getTracer(); /** * The backend configuration. */ private NdbBackendCfg config; /** * The backend to which this entry root container belongs. */ private Backend backend; /** * The base DNs contained in this entryContainer. */ private ConcurrentHashMap entryContainers; /** * NDB connection objects. */ private static NdbClusterConnection[] ndbConns; /** * NDB handle objects. */ private static LinkedBlockingQueue ndbQueue; /** * NDB thread count. */ private int ndbThreadCount; /** * NDB number of connections. */ private int ndbNumConnections; /** * The range to use when requesting next ID. */ private static final long NDB_NEXTID_RANGE = 1000; /** * The maximum number of NDB threads. */ private static final int NDB_MAX_THREAD_COUNT = 128; /** * Timeout for the first node/group to become ready. */ private static final int NDB_TIMEOUT_FIRST_ALIVE = 60; /** * Timeout for the rest of nodes/groups to become ready. */ private static final int NDB_TIMEOUT_AFTER_FIRST_ALIVE = 60; /** * Creates a new RootContainer object. * * @param config The configuration of the NDB backend. * @param backend A reference to the NDB backend that is creating this * root container. */ public RootContainer(Backend backend, NdbBackendCfg config) { this.entryContainers = new ConcurrentHashMap(); this.backend = backend; this.config = config; this.ndbNumConnections = this.config.getNdbNumConnections(); this.ndbConns = new NdbClusterConnection[ndbNumConnections]; this.ndbThreadCount = this.config.getNdbThreadCount(); if (this.ndbThreadCount > NDB_MAX_THREAD_COUNT) { this.ndbThreadCount = NDB_MAX_THREAD_COUNT; } this.ndbQueue = new LinkedBlockingQueue( this.ndbThreadCount); config.addNdbChangeListener(this); } /** * Opens the root container using the NDB configuration object provided. * * @throws NdbApiException If an error occurs when opening. * @throws ConfigException If an configuration error occurs while opening. * @throws Exception If an unknown error occurs when opening. */ public void open() throws NdbApiException, ConfigException, Exception { // Log a message indicating upcoming NDB connect. logError(NOTE_NDB_WAITING_FOR_CLUSTER.get()); // Connect to the cluster. for (int i = 0; i < this.ndbNumConnections; i++) { try { this.ndbConns[i] = NdbClusterConnection.create( this.config.getNdbConnectString()); this.ndbConns[i].connect(5, 3, true); this.ndbConns[i].waitUntilReady(NDB_TIMEOUT_FIRST_ALIVE, NDB_TIMEOUT_AFTER_FIRST_ALIVE); } catch (NdbApiTemporaryException e) { if (debugEnabled()) { TRACER.debugCaught(DebugLogLevel.ERROR, e); } // Retry. if (this.ndbConns[i] != null) { this.ndbConns[i].close(); this.ndbConns[i] = null; } i--; continue; } } // Get NDB objects. int connsIndex = 0; for (int i = 0; i < this.ndbThreadCount; i++) { Ndb ndb = ndbConns[connsIndex].createNdb( BackendImpl.DATABASE_NAME, 1024); connsIndex++; if (connsIndex >= this.ndbNumConnections) { connsIndex = 0; } try { this.ndbQueue.put(ndb); } catch (Exception e) { if (debugEnabled()) { TRACER.debugCaught(DebugLogLevel.ERROR, e); } if (ndb != null) { ndb.close(); } } } openAndRegisterEntryContainers(config.getBaseDN()); } /** * Opens the entry container for a base DN. If the entry container does not * exist for the base DN, it will be created. The entry container will be * opened with the same mode as the root container. Any entry containers * opened in a read only root container will also be read only. Any entry * containers opened in a non transactional root container will also be non * transactional. * * @param baseDN The base DN of the entry container to open. * @return The opened entry container. * @throws NdbApiException If an error occurs while opening the entry * container. * @throws ConfigException If an configuration error occurs while opening * the entry container. */ public EntryContainer openEntryContainer(DN baseDN) throws NdbApiException, ConfigException { String databasePrefix = baseDN.toNormalizedString(); EntryContainer ec = new EntryContainer(baseDN, databasePrefix, backend, config, this); ec.open(); return ec; } /** * Registeres the entry container for a base DN. * * @param baseDN The base DN of the entry container to register. * @param entryContainer The entry container to register for the baseDN. * @throws Exception If an error occurs while registering the entry * container. */ public void registerEntryContainer(DN baseDN, EntryContainer entryContainer) throws Exception { EntryContainer ec1 = this.entryContainers.get(baseDN); // If an entry container for this baseDN is already open we don't allow // another to be opened. if (ec1 != null) // FIXME: Should be NDBException instance. throw new Exception("An entry container named " + ec1.getDatabasePrefix() + " is alreadly registered for base DN " + baseDN.toString()); this.entryContainers.put(baseDN, entryContainer); } /** * Opens the entry containers for multiple base DNs. * * @param baseDNs The base DNs of the entry containers to open. * @throws NdbApiException If an error occurs while opening the entry * container. * @throws ConfigException if a configuration error occurs while opening the * container. */ private void openAndRegisterEntryContainers(Set baseDNs) throws NdbApiException, ConfigException, Exception { for(DN baseDN : baseDNs) { EntryContainer ec = openEntryContainer(baseDN); registerEntryContainer(baseDN, ec); } } /** * Unregisteres the entry container for a base DN. * * @param baseDN The base DN of the entry container to close. * @return The entry container that was unregistered or NULL if a entry * container for the base DN was not registered. */ public EntryContainer unregisterEntryContainer(DN baseDN) { return entryContainers.remove(baseDN); } /** * Close the root entryContainer. * * @throws NdbApiException If an error occurs while attempting to close * the entryContainer. */ public void close() throws NdbApiException { for(DN baseDN : entryContainers.keySet()) { EntryContainer ec = unregisterEntryContainer(baseDN); ec.exclusiveLock.lock(); try { ec.close(); } finally { ec.exclusiveLock.unlock(); } } while (!this.ndbQueue.isEmpty()) { Ndb ndb = null; try { ndb = this.ndbQueue.poll(); if (ndb != null) { ndb.close(); } } catch (Exception e) { if (debugEnabled()) { TRACER.debugCaught(DebugLogLevel.ERROR, e); } } } this.ndbQueue.clear(); for (NdbClusterConnection ndbConn : ndbConns) { ndbConn.close(); } config.removeNdbChangeListener(this); } /** * Get NDB handle from the queue. * @return NDB handle. */ protected Ndb getNDB() { try { return ndbQueue.take(); } catch (Exception e) { if (debugEnabled()) { TRACER.debugCaught(DebugLogLevel.ERROR, e); } return null; } } /** * Release NDB handle to the queue. * @param ndb handle to release. */ protected void releaseNDB(Ndb ndb) { try { ndbQueue.put(ndb); } catch (Exception e) { if (debugEnabled()) { TRACER.debugCaught(DebugLogLevel.ERROR, e); } if (ndb != null) { ndb.close(); } return; } } /** * Return all the entry containers in this root container. * * @return The entry containers in this root container. */ public Collection getEntryContainers() { return entryContainers.values(); } /** * Returns all the baseDNs this root container stores. * * @return The set of DNs this root container stores. */ public Set getBaseDNs() { return entryContainers.keySet(); } /** * Return the entry container for a specific base DN. * * @param baseDN The base DN of the entry container to retrive. * @return The entry container for the base DN. */ public EntryContainer getEntryContainer(DN baseDN) { EntryContainer ec = null; DN nodeDN = baseDN; while (ec == null && nodeDN != null) { ec = entryContainers.get(nodeDN); if (ec == null) { nodeDN = nodeDN.getParentDNInSuffix(); } } return ec; } /** * Get the backend configuration used by this root container. * * @return The NDB backend configuration used by this root container. */ public NdbBackendCfg getConfiguration() { return config; } /** * Get the total number of entries in this root container. * * @return The number of entries in this root container * @throws NdbApiException If an error occurs while retrieving the entry * count. */ public long getEntryCount() throws NdbApiException { long entryCount = 0; for(EntryContainer ec : this.entryContainers.values()) { ec.sharedLock.lock(); try { entryCount += ec.getEntryCount(); } finally { ec.sharedLock.unlock(); } } return entryCount; } /** * Assign the next entry ID. * @param ndb Ndb handle. * @return The assigned entry ID. */ public long getNextEntryID(Ndb ndb) { long eid = 0; try { eid = ndb.getAutoIncrementValue(BackendImpl.NEXTID_TABLE, NDB_NEXTID_RANGE); } catch (NdbApiException e) { if (debugEnabled()) { TRACER.debugCaught(DebugLogLevel.ERROR, e); } } return eid; } /** * {@inheritDoc} */ public boolean isConfigurationChangeAcceptable( NdbBackendCfg cfg, List unacceptableReasons) { boolean acceptable = true; return acceptable; } /** * {@inheritDoc} */ public ConfigChangeResult applyConfigurationChange(NdbBackendCfg cfg) { ConfigChangeResult ccr; boolean adminActionRequired = false; ArrayList messages = new ArrayList(); ccr = new ConfigChangeResult(ResultCode.SUCCESS, adminActionRequired, messages); return ccr; } }