001/*
002 * The contents of this file are subject to the terms of the Common Development and
003 * Distribution License (the License). You may not use this file except in compliance with the
004 * License.
005 *
006 * You can obtain a copy of the License at legal/CDDLv1.0.txt. See the License for the
007 * specific language governing permission and limitations under the License.
008 *
009 * When distributing Covered Software, include this CDDL Header Notice in each file and include
010 * the License file at legal/CDDLv1.0.txt. If applicable, add the following below the CDDL
011 * Header, with the fields enclosed by brackets [] replaced by your own identifying
012 * information: "Portions Copyright [year] [name of copyright owner]".
013 *
014 * Copyright 2009-2010 Sun Microsystems, Inc.
015 * Portions Copyright 2011-2016 ForgeRock AS.
016 */
017package org.forgerock.opendj.ldap;
018
019import static org.forgerock.opendj.ldap.RequestHandlerFactoryAdapter.adaptRequestHandler;
020import static org.forgerock.util.time.Duration.duration;
021
022import java.net.InetAddress;
023import java.net.InetSocketAddress;
024import java.util.Collection;
025import java.util.concurrent.ScheduledExecutorService;
026import java.util.concurrent.ThreadLocalRandom;
027import java.util.concurrent.TimeUnit;
028import java.util.concurrent.atomic.AtomicInteger;
029import java.util.concurrent.atomic.AtomicLongArray;
030import org.forgerock.i18n.slf4j.LocalizedLogger;
031import org.forgerock.opendj.ldap.RequestLoadBalancer.PartitionedRequest;
032import org.forgerock.opendj.ldap.requests.AddRequest;
033import org.forgerock.opendj.ldap.requests.CRAMMD5SASLBindRequest;
034import org.forgerock.opendj.ldap.requests.CompareRequest;
035import org.forgerock.opendj.ldap.requests.DeleteRequest;
036import org.forgerock.opendj.ldap.requests.DigestMD5SASLBindRequest;
037import org.forgerock.opendj.ldap.requests.GSSAPISASLBindRequest;
038import org.forgerock.opendj.ldap.requests.ModifyDNRequest;
039import org.forgerock.opendj.ldap.requests.ModifyRequest;
040import org.forgerock.opendj.ldap.requests.PasswordModifyExtendedRequest;
041import org.forgerock.opendj.ldap.requests.PlainSASLBindRequest;
042import org.forgerock.opendj.ldap.requests.Request;
043import org.forgerock.opendj.ldap.requests.Requests;
044import org.forgerock.opendj.ldap.requests.SearchRequest;
045import org.forgerock.opendj.ldap.requests.SimpleBindRequest;
046import org.forgerock.util.Function;
047import org.forgerock.util.Option;
048import org.forgerock.util.Options;
049import org.forgerock.util.Reject;
050import org.forgerock.util.promise.NeverThrowsException;
051import org.forgerock.util.promise.Promise;
052import org.forgerock.util.time.Duration;
053
054import com.forgerock.opendj.ldap.CoreMessages;
055import com.forgerock.opendj.ldap.controls.AffinityControl;
056
057/**
058 * This class contains methods for creating and manipulating connection
059 * factories and connections.
060 */
061public final class Connections {
062    private static final LocalizedLogger logger = LocalizedLogger.getLoggerForThisClass();
063
064    /**
065     * Specifies the interval between successive attempts to reconnect to offline load-balanced connection factories.
066     * The default configuration is to attempt to reconnect every second.
067     */
068    public static final Option<Duration> LOAD_BALANCER_MONITORING_INTERVAL = Option.withDefault(duration("1 seconds"));
069
070    /**
071     * Specifies the event listener which should be notified whenever a load-balanced connection factory changes state
072     * from online to offline or vice-versa. By default events will be logged to the {@code LoadBalancingAlgorithm}
073     * logger using the {@link LoadBalancerEventListener#LOG_EVENTS} listener.
074     */
075    public static final Option<LoadBalancerEventListener> LOAD_BALANCER_EVENT_LISTENER =
076            Option.of(LoadBalancerEventListener.class, LoadBalancerEventListener.LOG_EVENTS);
077
078    /**
079     * Specifies the scheduler which will be used for periodically reconnecting to offline connection factories. A
080     * system-wide scheduler will be used by default.
081     */
082    public static final Option<ScheduledExecutorService> LOAD_BALANCER_SCHEDULER =
083            Option.of(ScheduledExecutorService.class, null);
084
085    /**
086     * Creates a new connection pool which creates new connections as needed
087     * using the provided connection factory, but will reuse previously
088     * allocated connections when they are available.
089     * <p>
090     * Connections which have not been used for sixty seconds are closed and
091     * removed from the pool. Thus, a pool that remains idle for long enough
092     * will not contain any cached connections.
093     * <p>
094     * Connections obtained from the connection pool are guaranteed to be valid
095     * immediately before being returned to the calling application. More
096     * specifically, connections which have remained idle in the connection pool
097     * for a long time and which have been remotely closed due to a time out
098     * will never be returned. However, once a pooled connection has been
099     * obtained it is the responsibility of the calling application to handle
100     * subsequent connection failures, these being signaled via a
101     * {@link ConnectionException}.
102     *
103     * @param factory
104     *            The connection factory to use for creating new connections.
105     * @return The new connection pool.
106     * @throws NullPointerException
107     *             If {@code factory} was {@code null}.
108     */
109    public static ConnectionPool newCachedConnectionPool(final ConnectionFactory factory) {
110        return new CachedConnectionPool(factory, 0, Integer.MAX_VALUE, 60L, TimeUnit.SECONDS, null);
111    }
112
113    /**
114     * Creates a new connection pool which creates new connections as needed
115     * using the provided connection factory, but will reuse previously
116     * allocated connections when they are available.
117     * <p>
118     * Attempts to use more than {@code maximumPoolSize} connections at once
119     * will block until a connection is released back to the pool. In other
120     * words, this pool will prevent applications from using more than
121     * {@code maximumPoolSize} connections at the same time.
122     * <p>
123     * Connections which have not been used for the provided {@code idleTimeout}
124     * period are closed and removed from the pool, until there are only
125     * {@code corePoolSize} connections remaining.
126     * <p>
127     * Connections obtained from the connection pool are guaranteed to be valid
128     * immediately before being returned to the calling application. More
129     * specifically, connections which have remained idle in the connection pool
130     * for a long time and which have been remotely closed due to a time out
131     * will never be returned. However, once a pooled connection has been
132     * obtained it is the responsibility of the calling application to handle
133     * subsequent connection failures, these being signaled via a
134     * {@link ConnectionException}.
135     *
136     * @param factory
137     *            The connection factory to use for creating new connections.
138     * @param corePoolSize
139     *            The minimum number of connections to keep in the pool, even if
140     *            they are idle.
141     * @param maximumPoolSize
142     *            The maximum number of connections to allow in the pool.
143     * @param idleTimeout
144     *            The time out period, after which unused non-core connections
145     *            will be closed.
146     * @param unit
147     *            The time unit for the {@code keepAliveTime} argument.
148     * @return The new connection pool.
149     * @throws IllegalArgumentException
150     *             If {@code corePoolSize}, {@code maximumPoolSize} are less
151     *             than or equal to zero, or if {@code idleTimeout} is negative,
152     *             or if {@code corePoolSize} is greater than
153     *             {@code maximumPoolSize}, or if {@code idleTimeout} is
154     *             non-zero and {@code unit} is {@code null}.
155     * @throws NullPointerException
156     *             If {@code factory} was {@code null}.
157     */
158    public static ConnectionPool newCachedConnectionPool(final ConnectionFactory factory,
159            final int corePoolSize, final int maximumPoolSize, final long idleTimeout,
160            final TimeUnit unit) {
161        return new CachedConnectionPool(factory, corePoolSize, maximumPoolSize, idleTimeout, unit,
162                null);
163    }
164
165    /**
166     * Creates a new connection pool which creates new connections as needed
167     * using the provided connection factory, but will reuse previously
168     * allocated connections when they are available.
169     * <p>
170     * Attempts to use more than {@code maximumPoolSize} connections at once
171     * will block until a connection is released back to the pool. In other
172     * words, this pool will prevent applications from using more than
173     * {@code maximumPoolSize} connections at the same time.
174     * <p>
175     * Connections which have not been used for the provided {@code idleTimeout}
176     * period are closed and removed from the pool, until there are only
177     * {@code corePoolSize} connections remaining.
178     * <p>
179     * Connections obtained from the connection pool are guaranteed to be valid
180     * immediately before being returned to the calling application. More
181     * specifically, connections which have remained idle in the connection pool
182     * for a long time and which have been remotely closed due to a time out
183     * will never be returned. However, once a pooled connection has been
184     * obtained it is the responsibility of the calling application to handle
185     * subsequent connection failures, these being signaled via a
186     * {@link ConnectionException}.
187     *
188     * @param factory
189     *            The connection factory to use for creating new connections.
190     * @param corePoolSize
191     *            The minimum number of connections to keep in the pool, even if
192     *            they are idle.
193     * @param maximumPoolSize
194     *            The maximum number of connections to allow in the pool.
195     * @param idleTimeout
196     *            The time out period, after which unused non-core connections
197     *            will be closed.
198     * @param unit
199     *            The time unit for the {@code keepAliveTime} argument.
200     * @param scheduler
201     *            The scheduler which should be used for periodically checking
202     *            for idle connections, or {@code null} if the default scheduler
203     *            should be used.
204     * @return The new connection pool.
205     * @throws IllegalArgumentException
206     *             If {@code corePoolSize}, {@code maximumPoolSize} are less
207     *             than or equal to zero, or if {@code idleTimeout} is negative,
208     *             or if {@code corePoolSize} is greater than
209     *             {@code maximumPoolSize}, or if {@code idleTimeout} is
210     *             non-zero and {@code unit} is {@code null}.
211     * @throws NullPointerException
212     *             If {@code factory} was {@code null}.
213     */
214    public static ConnectionPool newCachedConnectionPool(final ConnectionFactory factory,
215            final int corePoolSize, final int maximumPoolSize, final long idleTimeout,
216            final TimeUnit unit, final ScheduledExecutorService scheduler) {
217        return new CachedConnectionPool(factory, corePoolSize, maximumPoolSize, idleTimeout, unit,
218                scheduler);
219    }
220
221    /**
222     * Creates a new connection pool which will maintain {@code poolSize}
223     * connections created using the provided connection factory.
224     * <p>
225     * Attempts to use more than {@code poolSize} connections at once will block
226     * until a connection is released back to the pool. In other words, this
227     * pool will prevent applications from using more than {@code poolSize}
228     * connections at the same time.
229     * <p>
230     * Connections obtained from the connection pool are guaranteed to be valid
231     * immediately before being returned to the calling application. More
232     * specifically, connections which have remained idle in the connection pool
233     * for a long time and which have been remotely closed due to a time out
234     * will never be returned. However, once a pooled connection has been
235     * obtained it is the responsibility of the calling application to handle
236     * subsequent connection failures, these being signaled via a
237     * {@link ConnectionException}.
238     *
239     * @param factory
240     *            The connection factory to use for creating new connections.
241     * @param poolSize
242     *            The maximum size of the connection pool.
243     * @return The new connection pool.
244     * @throws IllegalArgumentException
245     *             If {@code poolSize} is negative.
246     * @throws NullPointerException
247     *             If {@code factory} was {@code null}.
248     */
249    public static ConnectionPool newFixedConnectionPool(final ConnectionFactory factory,
250            final int poolSize) {
251        return new CachedConnectionPool(factory, poolSize, poolSize, 0L, null, null);
252    }
253
254    /**
255     * Creates a new internal client connection which will route requests to the
256     * provided {@code RequestHandler}.
257     * <p>
258     * When processing requests, {@code RequestHandler} implementations are
259     * passed a {@code RequestContext} having a pseudo {@code requestID} which
260     * is incremented for each successive internal request on a per client
261     * connection basis. The request ID may be useful for logging purposes.
262     * <p>
263     * An internal connection does not require {@code RequestHandler}
264     * implementations to return a result when processing requests. However, it
265     * is recommended that implementations do always return results even for
266     * abandoned requests. This is because application client threads may block
267     * indefinitely waiting for results.
268     *
269     * @param requestHandler
270     *            The request handler which will be used for all client
271     *            connections.
272     * @return The new internal connection.
273     * @throws NullPointerException
274     *             If {@code requestHandler} was {@code null}.
275     */
276    public static Connection newInternalConnection(
277            final RequestHandler<RequestContext> requestHandler) {
278        Reject.ifNull(requestHandler);
279        return newInternalConnection(adaptRequestHandler(requestHandler));
280    }
281
282    /**
283     * Creates a new internal client connection which will route requests to the
284     * provided {@code ServerConnection}.
285     * <p>
286     * When processing requests, {@code ServerConnection} implementations are
287     * passed an integer as the first parameter. This integer represents a
288     * pseudo {@code requestID} which is incremented for each successive
289     * internal request on a per client connection basis. The request ID may be
290     * useful for logging purposes.
291     * <p>
292     * An internal connection does not require {@code ServerConnection}
293     * implementations to return a result when processing requests. However, it
294     * is recommended that implementations do always return results even for
295     * abandoned requests. This is because application client threads may block
296     * indefinitely waiting for results.
297     *
298     * @param serverConnection
299     *            The server connection.
300     * @return The new internal connection.
301     * @throws NullPointerException
302     *             If {@code serverConnection} was {@code null}.
303     */
304    public static Connection newInternalConnection(final ServerConnection<Integer> serverConnection) {
305        Reject.ifNull(serverConnection);
306        return new InternalConnection(serverConnection);
307    }
308
309    /**
310     * Creates a new connection factory which binds internal client connections
311     * to the provided {@link RequestHandler}s.
312     * <p>
313     * When processing requests, {@code RequestHandler} implementations are
314     * passed an integer as the first parameter. This integer represents a
315     * pseudo {@code requestID} which is incremented for each successive
316     * internal request on a per client connection basis. The request ID may be
317     * useful for logging purposes.
318     * <p>
319     * An internal connection factory does not require {@code RequestHandler}
320     * implementations to return a result when processing requests. However, it
321     * is recommended that implementations do always return results even for
322     * abandoned requests. This is because application client threads may block
323     * indefinitely waiting for results.
324     *
325     * @param requestHandler
326     *            The request handler which will be used for all client
327     *            connections.
328     * @return The new internal connection factory.
329     * @throws NullPointerException
330     *             If {@code requestHandler} was {@code null}.
331     */
332    public static ConnectionFactory newInternalConnectionFactory(
333            final RequestHandler<RequestContext> requestHandler) {
334        Reject.ifNull(requestHandler);
335        return new InternalConnectionFactory<>(
336            Connections.<Void> newServerConnectionFactory(requestHandler), null);
337    }
338
339    /**
340     * Creates a new connection factory which binds internal client connections
341     * to {@link RequestHandler}s created using the provided
342     * {@link RequestHandlerFactory}.
343     * <p>
344     * When processing requests, {@code RequestHandler} implementations are
345     * passed an integer as the first parameter. This integer represents a
346     * pseudo {@code requestID} which is incremented for each successive
347     * internal request on a per client connection basis. The request ID may be
348     * useful for logging purposes.
349     * <p>
350     * An internal connection factory does not require {@code RequestHandler}
351     * implementations to return a result when processing requests. However, it
352     * is recommended that implementations do always return results even for
353     * abandoned requests. This is because application client threads may block
354     * indefinitely waiting for results.
355     *
356     * @param <C>
357     *            The type of client context.
358     * @param factory
359     *            The request handler factory to use for creating connections.
360     * @param clientContext
361     *            The client context.
362     * @return The new internal connection factory.
363     * @throws NullPointerException
364     *             If {@code factory} was {@code null}.
365     */
366    public static <C> ConnectionFactory newInternalConnectionFactory(
367            final RequestHandlerFactory<C, RequestContext> factory, final C clientContext) {
368        Reject.ifNull(factory);
369        return new InternalConnectionFactory<>(newServerConnectionFactory(factory), clientContext);
370    }
371
372    /**
373     * Creates a new connection factory which binds internal client connections
374     * to {@link ServerConnection}s created using the provided
375     * {@link ServerConnectionFactory}.
376     * <p>
377     * When processing requests, {@code ServerConnection} implementations are
378     * passed an integer as the first parameter. This integer represents a
379     * pseudo {@code requestID} which is incremented for each successive
380     * internal request on a per client connection basis. The request ID may be
381     * useful for logging purposes.
382     * <p>
383     * An internal connection factory does not require {@code ServerConnection}
384     * implementations to return a result when processing requests. However, it
385     * is recommended that implementations do always return results even for
386     * abandoned requests. This is because application client threads may block
387     * indefinitely waiting for results.
388     *
389     * @param <C>
390     *            The type of client context.
391     * @param factory
392     *            The server connection factory to use for creating connections.
393     * @param clientContext
394     *            The client context.
395     * @return The new internal connection factory.
396     * @throws NullPointerException
397     *             If {@code factory} was {@code null}.
398     */
399    public static <C> ConnectionFactory newInternalConnectionFactory(
400            final ServerConnectionFactory<C, Integer> factory, final C clientContext) {
401        Reject.ifNull(factory);
402        return new InternalConnectionFactory<>(factory, clientContext);
403    }
404
405    /**
406     * Creates a new "round-robin" load-balancer which will load-balance connections across the provided set of
407     * connection factories. A round robin load balancing algorithm distributes connection requests across a list of
408     * connection factories one at a time. When the end of the list is reached, the algorithm starts again from the
409     * beginning.
410     * <p/>
411     * This algorithm is typically used for load-balancing <i>within</i> data centers, where load must be distributed
412     * equally across multiple data sources. This algorithm contrasts with the
413     * {@link #newFailoverLoadBalancer(Collection, Options)} which is used for load-balancing <i>between</i> data
414     * centers.
415     * <p/>
416     * If a problem occurs that temporarily prevents connections from being obtained for one of the connection
417     * factories, then this algorithm automatically "fails over" to the next operational connection factory in the list.
418     * If none of the connection factories are operational then a {@code ConnectionException} is returned to the
419     * client.
420     * <p/>
421     * The implementation periodically attempts to connect to failed connection factories in order to determine if they
422     * have become available again.
423     *
424     * @param factories
425     *         The connection factories.
426     * @param options
427     *         This configuration options for the load-balancer.
428     * @return The new round-robin load balancer.
429     * @see #newAffinityRequestLoadBalancer(Collection, Options)
430     * @see #newFailoverLoadBalancer(Collection, Options)
431     * @see #newLeastRequestsLoadBalancer(Collection, Options)
432     * @see #LOAD_BALANCER_EVENT_LISTENER
433     * @see #LOAD_BALANCER_MONITORING_INTERVAL
434     * @see #LOAD_BALANCER_SCHEDULER
435     */
436    public static ConnectionFactory newRoundRobinLoadBalancer(
437            final Collection<? extends ConnectionFactory> factories, final Options options) {
438        return new ConnectionLoadBalancer("RoundRobinLoadBalancer", factories, options) {
439            private final int maxIndex = factories.size();
440            private final AtomicInteger nextIndex = new AtomicInteger(-1);
441
442            @Override
443            int getInitialConnectionFactoryIndex() {
444                // A round robin pool of one connection factories is unlikely in
445                // practice and requires special treatment.
446                if (maxIndex == 1) {
447                    return 0;
448                }
449
450                // Determine the next factory to use: avoid blocking algorithm.
451                int oldNextIndex;
452                int newNextIndex;
453                do {
454                    oldNextIndex = nextIndex.get();
455                    newNextIndex = oldNextIndex + 1;
456                    if (newNextIndex == maxIndex) {
457                        newNextIndex = 0;
458                    }
459                } while (!nextIndex.compareAndSet(oldNextIndex, newNextIndex));
460
461                // There's a potential, but benign, race condition here: other threads
462                // could jump in and rotate through the list before we return the
463                // connection factory.
464                return newNextIndex;
465            }
466        };
467    }
468
469    /**
470     * Creates a new "fail-over" load-balancer which will load-balance connections across the provided set of connection
471     * factories. A fail-over load balancing algorithm provides fault tolerance across multiple underlying connection
472     * factories.
473     * <p/>
474     * This algorithm is typically used for load-balancing <i>between</i> data centers, where there is preference to
475     * always forward connection requests to the <i>closest available</i> data center. This algorithm contrasts with the
476     * {@link #newRoundRobinLoadBalancer(Collection, Options)} which is used for load-balancing <i>within</i> a data
477     * center.
478     * <p/>
479     * This algorithm selects connection factories based on the order in which they were provided during construction.
480     * More specifically, an attempt to obtain a connection factory will always return the <i>first operational</i>
481     * connection factory in the list. Applications should, therefore, organize the connection factories such that the
482     * <i>preferred</i> (usually the closest) connection factories appear before those which are less preferred.
483     * <p/>
484     * If a problem occurs that temporarily prevents connections from being obtained for one of the connection
485     * factories, then this algorithm automatically "fails over" to the next operational connection factory in the list.
486     * If none of the connection factories are operational then a {@code ConnectionException} is returned to the
487     * client.
488     * <p/>
489     * The implementation periodically attempts to connect to failed connection factories in order to determine if they
490     * have become available again.
491     *
492     * @param factories
493     *         The connection factories.
494     * @param options
495     *         This configuration options for the load-balancer.
496     * @return The new fail-over load balancer.
497     * @see #newRoundRobinLoadBalancer(Collection, Options)
498     * @see #newAffinityRequestLoadBalancer(Collection, Options)
499     * @see #newLeastRequestsLoadBalancer(Collection, Options)
500     * @see #LOAD_BALANCER_EVENT_LISTENER
501     * @see #LOAD_BALANCER_MONITORING_INTERVAL
502     * @see #LOAD_BALANCER_SCHEDULER
503     */
504    public static ConnectionFactory newFailoverLoadBalancer(
505            final Collection<? extends ConnectionFactory> factories, final Options options) {
506        return new ConnectionLoadBalancer("FailoverLoadBalancer", factories, options) {
507            @Override
508            int getInitialConnectionFactoryIndex() {
509                // Always start with the first connection factory.
510                return 0;
511            }
512        };
513    }
514
515    /**
516     * Creates a new "affinity" load-balancer which will load-balance individual requests across the provided set of
517     * connection factories, each typically representing a single replica, using an algorithm that ensures that requests
518     * targeting a given DN will always be routed to the same replica. In other words, this load-balancer increases
519     * consistency whilst maintaining read-scalability by simulating a "single master" replication topology, where each
520     * replica is responsible for a subset of the entries. When a replica is unavailable the load-balancer "fails over"
521     * by performing a linear probe in order to find the next available replica thus ensuring high-availability when a
522     * network partition occurs while sacrificing consistency, since the unavailable replica may still be visible to
523     * other clients.
524     * <p/>
525     * This load-balancer distributes requests based on the hash of their target DN and handles all core operations, as
526     * well as any password modify extended requests and SASL bind requests which use authentication IDs having the
527     * "dn:" form. Note that subtree operations (searches, subtree deletes, and modify DN) are likely to include entries
528     * which are "mastered" on different replicas, so client applications should be more tolerant of inconsistencies.
529     * Requests that are either unrecognized or that do not have a parameter that may be considered to be a target DN
530     * will be routed randomly.
531     * <p/>
532     * <b>NOTE:</b> this connection factory returns fake connections, since real connections are obtained for each
533     * request. Therefore, the returned fake connections have certain limitations: abandon requests will be ignored
534     * since they cannot be routed; connection event listeners can be registered, but will only be notified when the
535     * fake connection is closed or when all of the connection factories are unavailable.
536     * <p/>
537     * <b>NOTE:</b> in deployments where there are multiple client applications, care should be taken to ensure that
538     * the factories are configured using the same ordering, otherwise requests will not be routed consistently
539     * across the client applications.
540     * <p/>
541     * The implementation periodically attempts to connect to failed connection factories in order to determine if they
542     * have become available again.
543     *
544     * @param factories
545     *         The connection factories.
546     * @param options
547     *         This configuration options for the load-balancer.
548     * @return The new affinity load balancer.
549     * @see #newRoundRobinLoadBalancer(Collection, Options)
550     * @see #newFailoverLoadBalancer(Collection, Options)
551     * @see #newLeastRequestsLoadBalancer(Collection, Options)
552     * @see #LOAD_BALANCER_EVENT_LISTENER
553     * @see #LOAD_BALANCER_MONITORING_INTERVAL
554     * @see #LOAD_BALANCER_SCHEDULER
555     */
556    public static ConnectionFactory newAffinityRequestLoadBalancer(
557            final Collection<? extends ConnectionFactory> factories, final Options options) {
558        return new RequestLoadBalancer("AffinityRequestLoadBalancer",
559                                       factories,
560                                       options,
561                                       newAffinityRequestLoadBalancerNextFunction(factories),
562                                       NOOP_END_OF_REQUEST_FUNCTION);
563    }
564
565    static Function<Request, PartitionedRequest, NeverThrowsException> newAffinityRequestLoadBalancerNextFunction(
566            final Collection<? extends ConnectionFactory> factories) {
567        return new Function<Request, PartitionedRequest, NeverThrowsException>() {
568            private final int maxPartitionId = factories.size();
569
570            @Override
571            public PartitionedRequest apply(final Request request) {
572                final int partitionId = computePartitionIdFromDN(dnOfRequest(request), maxPartitionId);
573                return new PartitionedRequest(request, partitionId);
574            }
575        };
576    }
577
578    /**
579     * Returns the partition ID which should be selected based on the provided DN and number of partitions, taking care
580     * of negative hash values and especially Integer.MIN_VALUE (see doc for Math.abs()). If the provided DN is
581     * {@code null} then a random partition ID will be returned.
582     *
583     * @param dn
584     *         The DN whose partition ID is to be determined, which may be {@code null}.
585     * @param numberOfPartitions
586     *         The total number of partitions.
587     * @return A partition ID in the range 0 <= partitionID < numberOfPartitions.
588     */
589    private static int computePartitionIdFromDN(final DN dn, final int numberOfPartitions) {
590        final int partitionId = dn != null ? dn.hashCode() : ThreadLocalRandom.current().nextInt(0, numberOfPartitions);
591        return partitionId == Integer.MIN_VALUE ? 0 : (Math.abs(partitionId) % numberOfPartitions);
592    }
593
594    /**
595     * Returns the DN of the entry targeted by the provided request, or {@code null} if the target entry cannot be
596     * determined. This method will return {@code null} for most extended operations and SASL bind requests which
597     * specify a non-DN authorization ID.
598     *
599     * @param request
600     *         The request whose target entry DN is to be determined.
601     * @return The DN of the entry targeted by the provided request, or {@code null} if the target entry cannot be
602     *         determined.
603     */
604    static DN dnOfRequest(final Request request) {
605        // The following conditions are ordered such that the most common operations appear first in order to
606        // reduce the average number of branches. A better solution would be to use a visitor, but a visitor
607        // would only apply to the core operations, not extended operations or SASL binds.
608        if (request instanceof SearchRequest) {
609            return ((SearchRequest) request).getName();
610        } else if (request instanceof ModifyRequest) {
611            return ((ModifyRequest) request).getName();
612        } else if (request instanceof SimpleBindRequest) {
613            return dnOf(((SimpleBindRequest) request).getName());
614        } else if (request instanceof AddRequest) {
615            return ((AddRequest) request).getName();
616        } else if (request instanceof DeleteRequest) {
617            return ((DeleteRequest) request).getName();
618        } else if (request instanceof CompareRequest) {
619            return ((CompareRequest) request).getName();
620        } else if (request instanceof ModifyDNRequest) {
621            return ((ModifyDNRequest) request).getName();
622        } else if (request instanceof PasswordModifyExtendedRequest) {
623            return dnOfAuthzid(((PasswordModifyExtendedRequest) request).getUserIdentityAsString());
624        } else if (request instanceof PlainSASLBindRequest) {
625            return dnOfAuthzid(((PlainSASLBindRequest) request).getAuthenticationID());
626        } else if (request instanceof DigestMD5SASLBindRequest) {
627            return dnOfAuthzid(((DigestMD5SASLBindRequest) request).getAuthenticationID());
628        } else if (request instanceof GSSAPISASLBindRequest) {
629            return dnOfAuthzid(((GSSAPISASLBindRequest) request).getAuthenticationID());
630        } else if (request instanceof CRAMMD5SASLBindRequest) {
631            return dnOfAuthzid(((CRAMMD5SASLBindRequest) request).getAuthenticationID());
632        } else {
633            return null;
634        }
635    }
636
637    private static DN dnOfAuthzid(final String authzid) {
638        if (authzid != null && authzid.startsWith("dn:")) {
639            return dnOf(authzid.substring(3));
640        }
641        return null;
642    }
643
644    private static DN dnOf(final String dnString) {
645        try {
646            return DN.valueOf(dnString);
647        } catch (final IllegalArgumentException ignored) {
648            return null;
649        }
650    }
651
652    /**
653     * Creates a distribution load balancer which uses consistent hashing to distributes requests across a set of
654     * partitions based on a hash of each request's target DN. More precisely, a partition is selected as follows:
655     * <ul>
656     * <li>if the targeted entry lies beneath the partition base DN then the partition is selected based on a hash of
657     * the DN which is superior to the target DN and immediately subordinate to the partition base DN</li>
658     * <li>otherwise, if the request is not a search then the request is routed to a random partition</li>
659     * <li>otherwise, if the search request targets the partition base DN, or a superior thereof, then the search is
660     * routed to a random partition or broadcast to all partitions, depending on the search scope. When broadcasting,
661     * care is taken to re-scope sub-requests in order to avoid returning duplicate entries</li>
662     * <li>otherwise, the search is routed to a random partition because its scope lies outside of the partition
663     * space.</li>
664     * </ul>
665     * This load balancer allows client applications to linearly scale their deployment for write throughput as well
666     * as total number of entries. For example, if a single replicated topology can support 10000 updates/s and a
667     * total of 100M entries, then a 4 way distributed topology could support up to 40000 updates/s and 400M entries.
668     * <p/>
669     * <b>NOTE:</b> there are a number of assumptions in the design of this load balancer as well as a number of
670     * limitations:
671     * <ul>
672     * <li>simple paged results, server side sorting, and VLV request controls are not supported for searches which
673     * traverse all partitions. </li>
674     * <li>persistent searches which traverse all partitions are only supported if they request changes only. </li>
675     * <li>requests which target an entry which is not below the partition base DN will be routed to a partition
676     * selected based on the request's DN, thereby providing affinity. Note that this behavior assumes that entries
677     * which are not below the partition base DN are replicated across all partitions.</li>
678     * <li>searches that traverse multiple partitions as well as entries above the partition base DN may return
679     * results in a non-hierarchical order. Specifically, entries from a partition (below the partition base DN)
680     * may be returned before entries above the partition base DN. Although not required by the LDAP standard, some
681     * legacy clients expect entries to be returned in hierarchical order.
682     * </li>
683     * </ul>
684     *
685     * @param partitionBaseDN
686     *         The DN beneath which data is partitioned. All other data is assumed to be shared across all partitions.
687     * @param partitions
688     *         The consistent hash map containing the partitions to be distributed.
689     * @param options
690     *         The configuration options for the load-balancer (no options are supported currently).
691     * @return The new distribution load balancer.
692     */
693    @SuppressWarnings("unused")
694    public static ConnectionFactory newFixedSizeDistributionLoadBalancer(final DN partitionBaseDN,
695            final ConsistentHashMap<? extends ConnectionFactory> partitions, final Options options) {
696        return new ConsistentHashDistributionLoadBalancer(partitionBaseDN, partitions);
697    }
698
699    /**
700     * Creates a new "least requests" load-balancer which will load-balance individual requests across the provided
701     * set of connection factories, each typically representing a single replica, using an algorithm that ensures that
702     * requests are routed to the replica which has the minimum number of active requests.
703     * <p>
704     * In other words, this load-balancer provides availability and partition tolerance, but sacrifices consistency.
705     * When a replica is not available, its number of active requests will not decrease until the requests time out,
706     * which will have the effect of directing requests to the other replicas. Consistency is low compared to the
707     * "affinity" load-balancer, because there is no guarantee that requests for the same DN are directed to the same
708     * replica.
709     * <p/>
710     * It is possible to increase consistency by providing a {@link AffinityControl} with a
711     * request. The control value will then be used to compute a hash that will determine the connection to use. In that
712     * case, the "least requests" behavior is completely overridden, i.e. the most saturated connection may be chosen
713     * depending on the hash value.
714     * <p/>
715     * <b>NOTE:</b> this connection factory returns fake connections, since real connections are obtained for each
716     * request. Therefore, the returned fake connections have certain limitations: abandon requests will be ignored
717     * since they cannot be routed; connection event listeners can be registered, but will only be notified when the
718     * fake connection is closed or when all of the connection factories are unavailable.
719     * <p/>
720     * <b>NOTE:</b>Server selection is only based on information which is local to the client application. If other
721     * applications are accessing the same servers then their additional load is not taken into account. Therefore,
722     * this load balancer is only effective if all client applications access the servers in a similar way.
723     * <p/>
724     * The implementation periodically attempts to connect to failed connection factories in order to determine if they
725     * have become available again.
726     *
727     * @param factories
728     *            The connection factories.
729     * @param options
730     *            This configuration options for the load-balancer.
731     * @return The new least requests load balancer.
732     * @see #newRoundRobinLoadBalancer(Collection, Options)
733     * @see #newFailoverLoadBalancer(Collection, Options)
734     * @see #newAffinityRequestLoadBalancer(Collection, Options)
735     * @see #LOAD_BALANCER_EVENT_LISTENER
736     * @see #LOAD_BALANCER_MONITORING_INTERVAL
737     * @see #LOAD_BALANCER_SCHEDULER
738     */
739    public static ConnectionFactory newLeastRequestsLoadBalancer(
740            final Collection<? extends ConnectionFactory> factories, final Options options) {
741        final LeastRequestsDispatcher dispatcher = new LeastRequestsDispatcher(factories.size());
742        return new RequestLoadBalancer("SaturationBasedRequestLoadBalancer", factories, options,
743                newLeastRequestsLoadBalancerNextFunction(dispatcher),
744                newLeastRequestsLoadBalancerEndOfRequestFunction(dispatcher));
745    }
746
747    private static final DecodeOptions CONTROL_DECODE_OPTIONS = new DecodeOptions();
748
749    static Function<Request, PartitionedRequest, NeverThrowsException> newLeastRequestsLoadBalancerNextFunction(
750            final LeastRequestsDispatcher dispatcher) {
751        return new Function<Request, PartitionedRequest, NeverThrowsException>() {
752            private final int maxIndex = dispatcher.size();
753
754            @Override
755            public PartitionedRequest apply(final Request request) {
756                int affinityBasedIndex = parseAffinityRequestControl(request);
757                int finalIndex = dispatcher.selectServer(affinityBasedIndex);
758                Request cleanedRequest = (affinityBasedIndex == -1)
759                        ? request : Requests.shallowCopyOfRequest(request, AffinityControl.OID);
760                return new PartitionedRequest(cleanedRequest, finalIndex);
761            }
762
763            private int parseAffinityRequestControl(final Request request) {
764                try {
765                    AffinityControl control = request.getControl(AffinityControl.DECODER, CONTROL_DECODE_OPTIONS);
766                    if (control != null) {
767                        int index = control.getAffinityValue().hashCode();
768                        return index == Integer.MIN_VALUE ? 0 : (Math.abs(index) % maxIndex);
769                    }
770                } catch (DecodeException e) {
771                    logger.warn(CoreMessages.WARN_DECODING_AFFINITY_CONTROL.get(e.getMessage()));
772                }
773                return -1;
774            }
775        };
776    }
777
778    static Function<Integer, Void, NeverThrowsException> newLeastRequestsLoadBalancerEndOfRequestFunction(
779            final LeastRequestsDispatcher dispatcher) {
780        return new Function<Integer, Void, NeverThrowsException>() {
781            @Override
782            public Void apply(final Integer index) {
783                dispatcher.terminatedRequest(index);
784                return null;
785            }
786        };
787    }
788
789    /** No-op "end of request" function for the saturation-based request load balancer. */
790    static final Function<Integer, Void, NeverThrowsException> NOOP_END_OF_REQUEST_FUNCTION =
791            new Function<Integer, Void, NeverThrowsException>() {
792                @Override
793                public Void apply(Integer index) {
794                    return null;
795                }
796            };
797
798    /**
799     * Dispatch requests to the server index which has the least active requests.
800     * <p>
801     * A server is actually represented only by its index. Provided an initial number of servers, the requests are
802     * dispatched to the less saturated index, i.e. which corresponds to the server that has the lowest number of active
803     * requests.
804     */
805    static class LeastRequestsDispatcher {
806        /** Counter for each server. */
807        private final AtomicLongArray serversCounters;
808
809        LeastRequestsDispatcher(int numberOfServers) {
810            serversCounters = new AtomicLongArray(numberOfServers);
811        }
812
813        int size() {
814            return serversCounters.length();
815        }
816
817        /**
818         * Returns the server index to use.
819         *
820         * @param forceIndex
821         *            Forces a server index to use if different from -1. In that case, the default behavior of the
822         *            dispatcher is overridden. If -1 is provided, then the default behavior of the dispatcher applies.
823         * @return the server index
824         */
825        int selectServer(int forceIndex) {
826            int index = forceIndex == -1 ? getLessSaturatedIndex() : forceIndex;
827            serversCounters.incrementAndGet(index);
828            return index;
829        }
830
831        /**
832         * Signals to this dispatcher that a request has been finished for the provided server index.
833         *
834         * @param index
835         *            The index of server that processed the request.
836         */
837        void terminatedRequest(Integer index) {
838            serversCounters.decrementAndGet(index);
839        }
840
841        private int getLessSaturatedIndex() {
842            long min = Long.MAX_VALUE;
843            int minIndex = -1;
844            // Modifications during this loop are ok, effects on result should not be dramatic
845            for (int i = 0; i < serversCounters.length(); i++) {
846                long count = serversCounters.get(i);
847                if (count < min) {
848                    min = count;
849                    minIndex = i;
850                }
851            }
852            return minIndex;
853        }
854    }
855
856    /**
857     * Creates a new connection factory which forwards connection requests to
858     * the provided factory, but whose {@code toString} method will always
859     * return {@code name}.
860     * <p>
861     * This method may be useful for debugging purposes in order to more easily
862     * identity connection factories.
863     *
864     * @param factory
865     *            The connection factory to be named.
866     * @param name
867     *            The name of the connection factory.
868     * @return The named connection factory.
869     * @throws NullPointerException
870     *             If {@code factory} or {@code name} was {@code null}.
871     */
872    public static ConnectionFactory newNamedConnectionFactory(final ConnectionFactory factory,
873            final String name) {
874        Reject.ifNull(factory, name);
875
876        return new ConnectionFactory() {
877
878            @Override
879            public void close() {
880                factory.close();
881            }
882
883            @Override
884            public Connection getConnection() throws LdapException {
885                return factory.getConnection();
886            }
887
888            @Override
889            public Promise<Connection, LdapException> getConnectionAsync() {
890                return factory.getConnectionAsync();
891            }
892
893            @Override
894            public String toString() {
895                return name;
896            }
897
898        };
899    }
900
901    /**
902     * Creates a new server connection factory using the provided
903     * {@link RequestHandler}. The returned factory will manage connection and
904     * request life-cycle, including request cancellation.
905     * <p>
906     * When processing requests, {@link RequestHandler} implementations are
907     * passed a {@link RequestContext} as the first parameter which may be used
908     * for detecting whether the request should be aborted due to
909     * cancellation requests or other events, such as connection failure.
910     * <p>
911     * The returned factory maintains state information which includes a table
912     * of active requests. Therefore, {@code RequestHandler} implementations are
913     * required to always return results in order to avoid potential memory
914     * leaks.
915     *
916     * @param <C>
917     *            The type of client context.
918     * @param requestHandler
919     *            The request handler which will be used for all client
920     *            connections.
921     * @return The new server connection factory.
922     * @throws NullPointerException
923     *             If {@code requestHandler} was {@code null}.
924     */
925    public static <C> ServerConnectionFactory<C, Integer> newServerConnectionFactory(
926            final RequestHandler<RequestContext> requestHandler) {
927        Reject.ifNull(requestHandler);
928        return new RequestHandlerFactoryAdapter<>(new RequestHandlerFactory<C, RequestContext>() {
929            @Override
930            public RequestHandler<RequestContext> handleAccept(final C clientContext) {
931                return requestHandler;
932            }
933        });
934    }
935
936    /**
937     * Creates a new server connection factory using the provided
938     * {@link RequestHandlerFactory}. The returned factory will manage
939     * connection and request life-cycle, including request cancellation.
940     * <p>
941     * When processing requests, {@link RequestHandler} implementations are
942     * passed a {@link RequestContext} as the first parameter which may be used
943     * for detecting whether the request should be aborted due to
944     * cancellation requests or other events, such as connection failure.
945     * <p>
946     * The returned factory maintains state information which includes a table
947     * of active requests. Therefore, {@code RequestHandler} implementations are
948     * required to always return results in order to avoid potential memory
949     * leaks.
950     *
951     * @param <C>
952     *            The type of client context.
953     * @param factory
954     *            The request handler factory to use for associating request
955     *            handlers with client connections.
956     * @return The new server connection factory.
957     * @throws NullPointerException
958     *             If {@code factory} was {@code null}.
959     */
960    public static <C> ServerConnectionFactory<C, Integer> newServerConnectionFactory(
961            final RequestHandlerFactory<C, RequestContext> factory) {
962        Reject.ifNull(factory);
963        return new RequestHandlerFactoryAdapter<>(factory);
964    }
965
966    /**
967     * Returns an uncloseable view of the provided connection. Attempts to call
968     * {@link Connection#close()} or
969     * {@link Connection#close(org.forgerock.opendj.ldap.requests.UnbindRequest, String)}
970     * will be ignored.
971     *
972     * @param connection
973     *            The connection whose {@code close} methods are to be disabled.
974     * @return An uncloseable view of the provided connection.
975     */
976    public static Connection uncloseable(Connection connection) {
977        return new AbstractConnectionWrapper<Connection>(connection) {
978            @Override
979            public void close() {
980                // Do nothing.
981            }
982
983            @Override
984            public void close(org.forgerock.opendj.ldap.requests.UnbindRequest request,
985                    String reason) {
986                // Do nothing.
987            }
988        };
989    }
990
991    /**
992     * Returns an uncloseable view of the provided connection factory. Attempts
993     * to call {@link ConnectionFactory#close()} will be ignored.
994     *
995     * @param factory
996     *            The connection factory whose {@code close} method is to be
997     *            disabled.
998     * @return An uncloseable view of the provided connection factory.
999     */
1000    public static ConnectionFactory uncloseable(final ConnectionFactory factory) {
1001        return new ConnectionFactory() {
1002
1003            @Override
1004            public Promise<Connection, LdapException> getConnectionAsync() {
1005                return factory.getConnectionAsync();
1006            }
1007
1008            @Override
1009            public Connection getConnection() throws LdapException {
1010                return factory.getConnection();
1011            }
1012
1013            @Override
1014            public void close() {
1015                // Do nothing.
1016            }
1017        };
1018    }
1019
1020    /**
1021     * Returns the host name associated with the provided
1022     * {@code InetSocketAddress}, without performing a DNS lookup. This method
1023     * attempts to provide functionality which is compatible with
1024     * {@code InetSocketAddress.getHostString()} in JDK7. It can be removed once
1025     * we drop support for JDK6.
1026     *
1027     * @param socketAddress
1028     *            The socket address which is expected to be an instance of
1029     *            {@code InetSocketAddress}.
1030     * @return The host name associated with the provided {@code SocketAddress},
1031     *         or {@code null} if it is unknown.
1032     */
1033    public static String getHostString(final InetSocketAddress socketAddress) {
1034        /*
1035         * See OPENDJ-1270 for more information about slow DNS queries.
1036         *
1037         * We must avoid calling getHostName() in the case where it is likely to
1038         * perform a blocking DNS query. Ideally we would call getHostString(),
1039         * but the method was only added in JDK7.
1040         */
1041        if (socketAddress.isUnresolved()) {
1042            /*
1043             * Usually socket addresses are resolved on creation. If the address
1044             * is unresolved then there must be a user provided hostname instead
1045             * and getHostName will not perform a reverse lookup.
1046             */
1047            return socketAddress.getHostName();
1048        } else {
1049            /*
1050             * Simulate getHostString() by parsing the toString()
1051             * representation. This assumes that the toString() representation
1052             * is stable, which I assume it is because it is documented.
1053             */
1054            final InetAddress address = socketAddress.getAddress();
1055            final String hostSlashIp = address.toString();
1056            final int slashPos = hostSlashIp.indexOf('/');
1057            if (slashPos == 0) {
1058                return hostSlashIp.substring(1);
1059            } else {
1060                return hostSlashIp.substring(0, slashPos);
1061            }
1062        }
1063    }
1064
/** Private constructor: this class is not meant to be instantiated. */
private Connections() {
    // Nothing to initialize.
}
1069}