001/* 002 * The contents of this file are subject to the terms of the Common Development and 003 * Distribution License (the License). You may not use this file except in compliance with the 004 * License. 005 * 006 * You can obtain a copy of the License at legal/CDDLv1.0.txt. See the License for the 007 * specific language governing permission and limitations under the License. 008 * 009 * When distributing Covered Software, include this CDDL Header Notice in each file and include 010 * the License file at legal/CDDLv1.0.txt. If applicable, add the following below the CDDL 011 * Header, with the fields enclosed by brackets [] replaced by your own identifying 012 * information: "Portions Copyright [year] [name of copyright owner]". 013 * 014 * Copyright 2009-2010 Sun Microsystems, Inc. 015 * Portions Copyright 2011-2016 ForgeRock AS. 016 */ 017package org.forgerock.opendj.ldap; 018 019import static org.forgerock.opendj.ldap.RequestHandlerFactoryAdapter.adaptRequestHandler; 020import static org.forgerock.util.time.Duration.duration; 021 022import java.net.InetAddress; 023import java.net.InetSocketAddress; 024import java.util.Collection; 025import java.util.concurrent.ScheduledExecutorService; 026import java.util.concurrent.ThreadLocalRandom; 027import java.util.concurrent.TimeUnit; 028import java.util.concurrent.atomic.AtomicInteger; 029import java.util.concurrent.atomic.AtomicLongArray; 030import org.forgerock.i18n.slf4j.LocalizedLogger; 031import org.forgerock.opendj.ldap.RequestLoadBalancer.PartitionedRequest; 032import org.forgerock.opendj.ldap.requests.AddRequest; 033import org.forgerock.opendj.ldap.requests.CRAMMD5SASLBindRequest; 034import org.forgerock.opendj.ldap.requests.CompareRequest; 035import org.forgerock.opendj.ldap.requests.DeleteRequest; 036import org.forgerock.opendj.ldap.requests.DigestMD5SASLBindRequest; 037import org.forgerock.opendj.ldap.requests.GSSAPISASLBindRequest; 038import org.forgerock.opendj.ldap.requests.ModifyDNRequest; 039import 
org.forgerock.opendj.ldap.requests.ModifyRequest; 040import org.forgerock.opendj.ldap.requests.PasswordModifyExtendedRequest; 041import org.forgerock.opendj.ldap.requests.PlainSASLBindRequest; 042import org.forgerock.opendj.ldap.requests.Request; 043import org.forgerock.opendj.ldap.requests.Requests; 044import org.forgerock.opendj.ldap.requests.SearchRequest; 045import org.forgerock.opendj.ldap.requests.SimpleBindRequest; 046import org.forgerock.util.Function; 047import org.forgerock.util.Option; 048import org.forgerock.util.Options; 049import org.forgerock.util.Reject; 050import org.forgerock.util.promise.NeverThrowsException; 051import org.forgerock.util.promise.Promise; 052import org.forgerock.util.time.Duration; 053 054import com.forgerock.opendj.ldap.CoreMessages; 055import com.forgerock.opendj.ldap.controls.AffinityControl; 056 057/** 058 * This class contains methods for creating and manipulating connection 059 * factories and connections. 060 */ 061public final class Connections { 062 private static final LocalizedLogger logger = LocalizedLogger.getLoggerForThisClass(); 063 064 /** 065 * Specifies the interval between successive attempts to reconnect to offline load-balanced connection factories. 066 * The default configuration is to attempt to reconnect every second. 067 */ 068 public static final Option<Duration> LOAD_BALANCER_MONITORING_INTERVAL = Option.withDefault(duration("1 seconds")); 069 070 /** 071 * Specifies the event listener which should be notified whenever a load-balanced connection factory changes state 072 * from online to offline or vice-versa. By default events will be logged to the {@code LoadBalancingAlgorithm} 073 * logger using the {@link LoadBalancerEventListener#LOG_EVENTS} listener. 
074 */ 075 public static final Option<LoadBalancerEventListener> LOAD_BALANCER_EVENT_LISTENER = 076 Option.of(LoadBalancerEventListener.class, LoadBalancerEventListener.LOG_EVENTS); 077 078 /** 079 * Specifies the scheduler which will be used for periodically reconnecting to offline connection factories. A 080 * system-wide scheduler will be used by default. 081 */ 082 public static final Option<ScheduledExecutorService> LOAD_BALANCER_SCHEDULER = 083 Option.of(ScheduledExecutorService.class, null); 084 085 /** 086 * Creates a new connection pool which creates new connections as needed 087 * using the provided connection factory, but will reuse previously 088 * allocated connections when they are available. 089 * <p> 090 * Connections which have not been used for sixty seconds are closed and 091 * removed from the pool. Thus, a pool that remains idle for long enough 092 * will not contain any cached connections. 093 * <p> 094 * Connections obtained from the connection pool are guaranteed to be valid 095 * immediately before being returned to the calling application. More 096 * specifically, connections which have remained idle in the connection pool 097 * for a long time and which have been remotely closed due to a time out 098 * will never be returned. However, once a pooled connection has been 099 * obtained it is the responsibility of the calling application to handle 100 * subsequent connection failures, these being signaled via a 101 * {@link ConnectionException}. 102 * 103 * @param factory 104 * The connection factory to use for creating new connections. 105 * @return The new connection pool. 106 * @throws NullPointerException 107 * If {@code factory} was {@code null}. 
108 */ 109 public static ConnectionPool newCachedConnectionPool(final ConnectionFactory factory) { 110 return new CachedConnectionPool(factory, 0, Integer.MAX_VALUE, 60L, TimeUnit.SECONDS, null); 111 } 112 113 /** 114 * Creates a new connection pool which creates new connections as needed 115 * using the provided connection factory, but will reuse previously 116 * allocated connections when they are available. 117 * <p> 118 * Attempts to use more than {@code maximumPoolSize} connections at once 119 * will block until a connection is released back to the pool. In other 120 * words, this pool will prevent applications from using more than 121 * {@code maximumPoolSize} connections at the same time. 122 * <p> 123 * Connections which have not been used for the provided {@code idleTimeout} 124 * period are closed and removed from the pool, until there are only 125 * {@code corePoolSize} connections remaining. 126 * <p> 127 * Connections obtained from the connection pool are guaranteed to be valid 128 * immediately before being returned to the calling application. More 129 * specifically, connections which have remained idle in the connection pool 130 * for a long time and which have been remotely closed due to a time out 131 * will never be returned. However, once a pooled connection has been 132 * obtained it is the responsibility of the calling application to handle 133 * subsequent connection failures, these being signaled via a 134 * {@link ConnectionException}. 135 * 136 * @param factory 137 * The connection factory to use for creating new connections. 138 * @param corePoolSize 139 * The minimum number of connections to keep in the pool, even if 140 * they are idle. 141 * @param maximumPoolSize 142 * The maximum number of connections to allow in the pool. 143 * @param idleTimeout 144 * The time out period, after which unused non-core connections 145 * will be closed. 146 * @param unit 147 * The time unit for the {@code keepAliveTime} argument. 
148 * @return The new connection pool. 149 * @throws IllegalArgumentException 150 * If {@code corePoolSize}, {@code maximumPoolSize} are less 151 * than or equal to zero, or if {@code idleTimeout} is negative, 152 * or if {@code corePoolSize} is greater than 153 * {@code maximumPoolSize}, or if {@code idleTimeout} is 154 * non-zero and {@code unit} is {@code null}. 155 * @throws NullPointerException 156 * If {@code factory} was {@code null}. 157 */ 158 public static ConnectionPool newCachedConnectionPool(final ConnectionFactory factory, 159 final int corePoolSize, final int maximumPoolSize, final long idleTimeout, 160 final TimeUnit unit) { 161 return new CachedConnectionPool(factory, corePoolSize, maximumPoolSize, idleTimeout, unit, 162 null); 163 } 164 165 /** 166 * Creates a new connection pool which creates new connections as needed 167 * using the provided connection factory, but will reuse previously 168 * allocated connections when they are available. 169 * <p> 170 * Attempts to use more than {@code maximumPoolSize} connections at once 171 * will block until a connection is released back to the pool. In other 172 * words, this pool will prevent applications from using more than 173 * {@code maximumPoolSize} connections at the same time. 174 * <p> 175 * Connections which have not been used for the provided {@code idleTimeout} 176 * period are closed and removed from the pool, until there are only 177 * {@code corePoolSize} connections remaining. 178 * <p> 179 * Connections obtained from the connection pool are guaranteed to be valid 180 * immediately before being returned to the calling application. More 181 * specifically, connections which have remained idle in the connection pool 182 * for a long time and which have been remotely closed due to a time out 183 * will never be returned. 
However, once a pooled connection has been 184 * obtained it is the responsibility of the calling application to handle 185 * subsequent connection failures, these being signaled via a 186 * {@link ConnectionException}. 187 * 188 * @param factory 189 * The connection factory to use for creating new connections. 190 * @param corePoolSize 191 * The minimum number of connections to keep in the pool, even if 192 * they are idle. 193 * @param maximumPoolSize 194 * The maximum number of connections to allow in the pool. 195 * @param idleTimeout 196 * The time out period, after which unused non-core connections 197 * will be closed. 198 * @param unit 199 * The time unit for the {@code keepAliveTime} argument. 200 * @param scheduler 201 * The scheduler which should be used for periodically checking 202 * for idle connections, or {@code null} if the default scheduler 203 * should be used. 204 * @return The new connection pool. 205 * @throws IllegalArgumentException 206 * If {@code corePoolSize}, {@code maximumPoolSize} are less 207 * than or equal to zero, or if {@code idleTimeout} is negative, 208 * or if {@code corePoolSize} is greater than 209 * {@code maximumPoolSize}, or if {@code idleTimeout} is 210 * non-zero and {@code unit} is {@code null}. 211 * @throws NullPointerException 212 * If {@code factory} was {@code null}. 213 */ 214 public static ConnectionPool newCachedConnectionPool(final ConnectionFactory factory, 215 final int corePoolSize, final int maximumPoolSize, final long idleTimeout, 216 final TimeUnit unit, final ScheduledExecutorService scheduler) { 217 return new CachedConnectionPool(factory, corePoolSize, maximumPoolSize, idleTimeout, unit, 218 scheduler); 219 } 220 221 /** 222 * Creates a new connection pool which will maintain {@code poolSize} 223 * connections created using the provided connection factory. 224 * <p> 225 * Attempts to use more than {@code poolSize} connections at once will block 226 * until a connection is released back to the pool. 
In other words, this 227 * pool will prevent applications from using more than {@code poolSize} 228 * connections at the same time. 229 * <p> 230 * Connections obtained from the connection pool are guaranteed to be valid 231 * immediately before being returned to the calling application. More 232 * specifically, connections which have remained idle in the connection pool 233 * for a long time and which have been remotely closed due to a time out 234 * will never be returned. However, once a pooled connection has been 235 * obtained it is the responsibility of the calling application to handle 236 * subsequent connection failures, these being signaled via a 237 * {@link ConnectionException}. 238 * 239 * @param factory 240 * The connection factory to use for creating new connections. 241 * @param poolSize 242 * The maximum size of the connection pool. 243 * @return The new connection pool. 244 * @throws IllegalArgumentException 245 * If {@code poolSize} is negative. 246 * @throws NullPointerException 247 * If {@code factory} was {@code null}. 248 */ 249 public static ConnectionPool newFixedConnectionPool(final ConnectionFactory factory, 250 final int poolSize) { 251 return new CachedConnectionPool(factory, poolSize, poolSize, 0L, null, null); 252 } 253 254 /** 255 * Creates a new internal client connection which will route requests to the 256 * provided {@code RequestHandler}. 257 * <p> 258 * When processing requests, {@code RequestHandler} implementations are 259 * passed a {@code RequestContext} having a pseudo {@code requestID} which 260 * is incremented for each successive internal request on a per client 261 * connection basis. The request ID may be useful for logging purposes. 262 * <p> 263 * An internal connection does not require {@code RequestHandler} 264 * implementations to return a result when processing requests. However, it 265 * is recommended that implementations do always return results even for 266 * abandoned requests. 
This is because application client threads may block 267 * indefinitely waiting for results. 268 * 269 * @param requestHandler 270 * The request handler which will be used for all client 271 * connections. 272 * @return The new internal connection. 273 * @throws NullPointerException 274 * If {@code requestHandler} was {@code null}. 275 */ 276 public static Connection newInternalConnection( 277 final RequestHandler<RequestContext> requestHandler) { 278 Reject.ifNull(requestHandler); 279 return newInternalConnection(adaptRequestHandler(requestHandler)); 280 } 281 282 /** 283 * Creates a new internal client connection which will route requests to the 284 * provided {@code ServerConnection}. 285 * <p> 286 * When processing requests, {@code ServerConnection} implementations are 287 * passed an integer as the first parameter. This integer represents a 288 * pseudo {@code requestID} which is incremented for each successive 289 * internal request on a per client connection basis. The request ID may be 290 * useful for logging purposes. 291 * <p> 292 * An internal connection does not require {@code ServerConnection} 293 * implementations to return a result when processing requests. However, it 294 * is recommended that implementations do always return results even for 295 * abandoned requests. This is because application client threads may block 296 * indefinitely waiting for results. 297 * 298 * @param serverConnection 299 * The server connection. 300 * @return The new internal connection. 301 * @throws NullPointerException 302 * If {@code serverConnection} was {@code null}. 303 */ 304 public static Connection newInternalConnection(final ServerConnection<Integer> serverConnection) { 305 Reject.ifNull(serverConnection); 306 return new InternalConnection(serverConnection); 307 } 308 309 /** 310 * Creates a new connection factory which binds internal client connections 311 * to the provided {@link RequestHandler}s. 
312 * <p> 313 * When processing requests, {@code RequestHandler} implementations are 314 * passed an integer as the first parameter. This integer represents a 315 * pseudo {@code requestID} which is incremented for each successive 316 * internal request on a per client connection basis. The request ID may be 317 * useful for logging purposes. 318 * <p> 319 * An internal connection factory does not require {@code RequestHandler} 320 * implementations to return a result when processing requests. However, it 321 * is recommended that implementations do always return results even for 322 * abandoned requests. This is because application client threads may block 323 * indefinitely waiting for results. 324 * 325 * @param requestHandler 326 * The request handler which will be used for all client 327 * connections. 328 * @return The new internal connection factory. 329 * @throws NullPointerException 330 * If {@code requestHandler} was {@code null}. 331 */ 332 public static ConnectionFactory newInternalConnectionFactory( 333 final RequestHandler<RequestContext> requestHandler) { 334 Reject.ifNull(requestHandler); 335 return new InternalConnectionFactory<>( 336 Connections.<Void> newServerConnectionFactory(requestHandler), null); 337 } 338 339 /** 340 * Creates a new connection factory which binds internal client connections 341 * to {@link RequestHandler}s created using the provided 342 * {@link RequestHandlerFactory}. 343 * <p> 344 * When processing requests, {@code RequestHandler} implementations are 345 * passed an integer as the first parameter. This integer represents a 346 * pseudo {@code requestID} which is incremented for each successive 347 * internal request on a per client connection basis. The request ID may be 348 * useful for logging purposes. 349 * <p> 350 * An internal connection factory does not require {@code RequestHandler} 351 * implementations to return a result when processing requests. 
However, it 352 * is recommended that implementations do always return results even for 353 * abandoned requests. This is because application client threads may block 354 * indefinitely waiting for results. 355 * 356 * @param <C> 357 * The type of client context. 358 * @param factory 359 * The request handler factory to use for creating connections. 360 * @param clientContext 361 * The client context. 362 * @return The new internal connection factory. 363 * @throws NullPointerException 364 * If {@code factory} was {@code null}. 365 */ 366 public static <C> ConnectionFactory newInternalConnectionFactory( 367 final RequestHandlerFactory<C, RequestContext> factory, final C clientContext) { 368 Reject.ifNull(factory); 369 return new InternalConnectionFactory<>(newServerConnectionFactory(factory), clientContext); 370 } 371 372 /** 373 * Creates a new connection factory which binds internal client connections 374 * to {@link ServerConnection}s created using the provided 375 * {@link ServerConnectionFactory}. 376 * <p> 377 * When processing requests, {@code ServerConnection} implementations are 378 * passed an integer as the first parameter. This integer represents a 379 * pseudo {@code requestID} which is incremented for each successive 380 * internal request on a per client connection basis. The request ID may be 381 * useful for logging purposes. 382 * <p> 383 * An internal connection factory does not require {@code ServerConnection} 384 * implementations to return a result when processing requests. However, it 385 * is recommended that implementations do always return results even for 386 * abandoned requests. This is because application client threads may block 387 * indefinitely waiting for results. 388 * 389 * @param <C> 390 * The type of client context. 391 * @param factory 392 * The server connection factory to use for creating connections. 393 * @param clientContext 394 * The client context. 395 * @return The new internal connection factory. 
396 * @throws NullPointerException 397 * If {@code factory} was {@code null}. 398 */ 399 public static <C> ConnectionFactory newInternalConnectionFactory( 400 final ServerConnectionFactory<C, Integer> factory, final C clientContext) { 401 Reject.ifNull(factory); 402 return new InternalConnectionFactory<>(factory, clientContext); 403 } 404 405 /** 406 * Creates a new "round-robin" load-balancer which will load-balance connections across the provided set of 407 * connection factories. A round robin load balancing algorithm distributes connection requests across a list of 408 * connection factories one at a time. When the end of the list is reached, the algorithm starts again from the 409 * beginning. 410 * <p/> 411 * This algorithm is typically used for load-balancing <i>within</i> data centers, where load must be distributed 412 * equally across multiple data sources. This algorithm contrasts with the 413 * {@link #newFailoverLoadBalancer(Collection, Options)} which is used for load-balancing <i>between</i> data 414 * centers. 415 * <p/> 416 * If a problem occurs that temporarily prevents connections from being obtained for one of the connection 417 * factories, then this algorithm automatically "fails over" to the next operational connection factory in the list. 418 * If none of the connection factories are operational then a {@code ConnectionException} is returned to the 419 * client. 420 * <p/> 421 * The implementation periodically attempts to connect to failed connection factories in order to determine if they 422 * have become available again. 423 * 424 * @param factories 425 * The connection factories. 426 * @param options 427 * This configuration options for the load-balancer. 428 * @return The new round-robin load balancer. 
429 * @see #newAffinityRequestLoadBalancer(Collection, Options) 430 * @see #newFailoverLoadBalancer(Collection, Options) 431 * @see #newLeastRequestsLoadBalancer(Collection, Options) 432 * @see #LOAD_BALANCER_EVENT_LISTENER 433 * @see #LOAD_BALANCER_MONITORING_INTERVAL 434 * @see #LOAD_BALANCER_SCHEDULER 435 */ 436 public static ConnectionFactory newRoundRobinLoadBalancer( 437 final Collection<? extends ConnectionFactory> factories, final Options options) { 438 return new ConnectionLoadBalancer("RoundRobinLoadBalancer", factories, options) { 439 private final int maxIndex = factories.size(); 440 private final AtomicInteger nextIndex = new AtomicInteger(-1); 441 442 @Override 443 int getInitialConnectionFactoryIndex() { 444 // A round robin pool of one connection factories is unlikely in 445 // practice and requires special treatment. 446 if (maxIndex == 1) { 447 return 0; 448 } 449 450 // Determine the next factory to use: avoid blocking algorithm. 451 int oldNextIndex; 452 int newNextIndex; 453 do { 454 oldNextIndex = nextIndex.get(); 455 newNextIndex = oldNextIndex + 1; 456 if (newNextIndex == maxIndex) { 457 newNextIndex = 0; 458 } 459 } while (!nextIndex.compareAndSet(oldNextIndex, newNextIndex)); 460 461 // There's a potential, but benign, race condition here: other threads 462 // could jump in and rotate through the list before we return the 463 // connection factory. 464 return newNextIndex; 465 } 466 }; 467 } 468 469 /** 470 * Creates a new "fail-over" load-balancer which will load-balance connections across the provided set of connection 471 * factories. A fail-over load balancing algorithm provides fault tolerance across multiple underlying connection 472 * factories. 473 * <p/> 474 * This algorithm is typically used for load-balancing <i>between</i> data centers, where there is preference to 475 * always forward connection requests to the <i>closest available</i> data center. 
This algorithm contrasts with the 476 * {@link #newRoundRobinLoadBalancer(Collection, Options)} which is used for load-balancing <i>within</i> a data 477 * center. 478 * <p/> 479 * This algorithm selects connection factories based on the order in which they were provided during construction. 480 * More specifically, an attempt to obtain a connection factory will always return the <i>first operational</i> 481 * connection factory in the list. Applications should, therefore, organize the connection factories such that the 482 * <i>preferred</i> (usually the closest) connection factories appear before those which are less preferred. 483 * <p/> 484 * If a problem occurs that temporarily prevents connections from being obtained for one of the connection 485 * factories, then this algorithm automatically "fails over" to the next operational connection factory in the list. 486 * If none of the connection factories are operational then a {@code ConnectionException} is returned to the 487 * client. 488 * <p/> 489 * The implementation periodically attempts to connect to failed connection factories in order to determine if they 490 * have become available again. 491 * 492 * @param factories 493 * The connection factories. 494 * @param options 495 * This configuration options for the load-balancer. 496 * @return The new fail-over load balancer. 497 * @see #newRoundRobinLoadBalancer(Collection, Options) 498 * @see #newAffinityRequestLoadBalancer(Collection, Options) 499 * @see #newLeastRequestsLoadBalancer(Collection, Options) 500 * @see #LOAD_BALANCER_EVENT_LISTENER 501 * @see #LOAD_BALANCER_MONITORING_INTERVAL 502 * @see #LOAD_BALANCER_SCHEDULER 503 */ 504 public static ConnectionFactory newFailoverLoadBalancer( 505 final Collection<? 
extends ConnectionFactory> factories, final Options options) { 506 return new ConnectionLoadBalancer("FailoverLoadBalancer", factories, options) { 507 @Override 508 int getInitialConnectionFactoryIndex() { 509 // Always start with the first connection factory. 510 return 0; 511 } 512 }; 513 } 514 515 /** 516 * Creates a new "affinity" load-balancer which will load-balance individual requests across the provided set of 517 * connection factories, each typically representing a single replica, using an algorithm that ensures that requests 518 * targeting a given DN will always be routed to the same replica. In other words, this load-balancer increases 519 * consistency whilst maintaining read-scalability by simulating a "single master" replication topology, where each 520 * replica is responsible for a subset of the entries. When a replica is unavailable the load-balancer "fails over" 521 * by performing a linear probe in order to find the next available replica thus ensuring high-availability when a 522 * network partition occurs while sacrificing consistency, since the unavailable replica may still be visible to 523 * other clients. 524 * <p/> 525 * This load-balancer distributes requests based on the hash of their target DN and handles all core operations, as 526 * well as any password modify extended requests and SASL bind requests which use authentication IDs having the 527 * "dn:" form. Note that subtree operations (searches, subtree deletes, and modify DN) are likely to include entries 528 * which are "mastered" on different replicas, so client applications should be more tolerant of inconsistencies. 529 * Requests that are either unrecognized or that do not have a parameter that may be considered to be a target DN 530 * will be routed randomly. 531 * <p/> 532 * <b>NOTE:</b> this connection factory returns fake connections, since real connections are obtained for each 533 * request. 
Therefore, the returned fake connections have certain limitations: abandon requests will be ignored 534 * since they cannot be routed; connection event listeners can be registered, but will only be notified when the 535 * fake connection is closed or when all of the connection factories are unavailable. 536 * <p/> 537 * <b>NOTE:</b> in deployments where there are multiple client applications, care should be taken to ensure that 538 * the factories are configured using the same ordering, otherwise requests will not be routed consistently 539 * across the client applications. 540 * <p/> 541 * The implementation periodically attempts to connect to failed connection factories in order to determine if they 542 * have become available again. 543 * 544 * @param factories 545 * The connection factories. 546 * @param options 547 * This configuration options for the load-balancer. 548 * @return The new affinity load balancer. 549 * @see #newRoundRobinLoadBalancer(Collection, Options) 550 * @see #newFailoverLoadBalancer(Collection, Options) 551 * @see #newLeastRequestsLoadBalancer(Collection, Options) 552 * @see #LOAD_BALANCER_EVENT_LISTENER 553 * @see #LOAD_BALANCER_MONITORING_INTERVAL 554 * @see #LOAD_BALANCER_SCHEDULER 555 */ 556 public static ConnectionFactory newAffinityRequestLoadBalancer( 557 final Collection<? extends ConnectionFactory> factories, final Options options) { 558 return new RequestLoadBalancer("AffinityRequestLoadBalancer", 559 factories, 560 options, 561 newAffinityRequestLoadBalancerNextFunction(factories), 562 NOOP_END_OF_REQUEST_FUNCTION); 563 } 564 565 static Function<Request, PartitionedRequest, NeverThrowsException> newAffinityRequestLoadBalancerNextFunction( 566 final Collection<? 
extends ConnectionFactory> factories) { 567 return new Function<Request, PartitionedRequest, NeverThrowsException>() { 568 private final int maxPartitionId = factories.size(); 569 570 @Override 571 public PartitionedRequest apply(final Request request) { 572 final int partitionId = computePartitionIdFromDN(dnOfRequest(request), maxPartitionId); 573 return new PartitionedRequest(request, partitionId); 574 } 575 }; 576 } 577 578 /** 579 * Returns the partition ID which should be selected based on the provided DN and number of partitions, taking care 580 * of negative hash values and especially Integer.MIN_VALUE (see doc for Math.abs()). If the provided DN is 581 * {@code null} then a random partition ID will be returned. 582 * 583 * @param dn 584 * The DN whose partition ID is to be determined, which may be {@code null}. 585 * @param numberOfPartitions 586 * The total number of partitions. 587 * @return A partition ID in the range 0 <= partitionID < numberOfPartitions. 588 */ 589 private static int computePartitionIdFromDN(final DN dn, final int numberOfPartitions) { 590 final int partitionId = dn != null ? dn.hashCode() : ThreadLocalRandom.current().nextInt(0, numberOfPartitions); 591 return partitionId == Integer.MIN_VALUE ? 0 : (Math.abs(partitionId) % numberOfPartitions); 592 } 593 594 /** 595 * Returns the DN of the entry targeted by the provided request, or {@code null} if the target entry cannot be 596 * determined. This method will return {@code null} for most extended operations and SASL bind requests which 597 * specify a non-DN authorization ID. 598 * 599 * @param request 600 * The request whose target entry DN is to be determined. 601 * @return The DN of the entry targeted by the provided request, or {@code null} if the target entry cannot be 602 * determined. 
603 */ 604 static DN dnOfRequest(final Request request) { 605 // The following conditions are ordered such that the most common operations appear first in order to 606 // reduce the average number of branches. A better solution would be to use a visitor, but a visitor 607 // would only apply to the core operations, not extended operations or SASL binds. 608 if (request instanceof SearchRequest) { 609 return ((SearchRequest) request).getName(); 610 } else if (request instanceof ModifyRequest) { 611 return ((ModifyRequest) request).getName(); 612 } else if (request instanceof SimpleBindRequest) { 613 return dnOf(((SimpleBindRequest) request).getName()); 614 } else if (request instanceof AddRequest) { 615 return ((AddRequest) request).getName(); 616 } else if (request instanceof DeleteRequest) { 617 return ((DeleteRequest) request).getName(); 618 } else if (request instanceof CompareRequest) { 619 return ((CompareRequest) request).getName(); 620 } else if (request instanceof ModifyDNRequest) { 621 return ((ModifyDNRequest) request).getName(); 622 } else if (request instanceof PasswordModifyExtendedRequest) { 623 return dnOfAuthzid(((PasswordModifyExtendedRequest) request).getUserIdentityAsString()); 624 } else if (request instanceof PlainSASLBindRequest) { 625 return dnOfAuthzid(((PlainSASLBindRequest) request).getAuthenticationID()); 626 } else if (request instanceof DigestMD5SASLBindRequest) { 627 return dnOfAuthzid(((DigestMD5SASLBindRequest) request).getAuthenticationID()); 628 } else if (request instanceof GSSAPISASLBindRequest) { 629 return dnOfAuthzid(((GSSAPISASLBindRequest) request).getAuthenticationID()); 630 } else if (request instanceof CRAMMD5SASLBindRequest) { 631 return dnOfAuthzid(((CRAMMD5SASLBindRequest) request).getAuthenticationID()); 632 } else { 633 return null; 634 } 635 } 636 637 private static DN dnOfAuthzid(final String authzid) { 638 if (authzid != null && authzid.startsWith("dn:")) { 639 return dnOf(authzid.substring(3)); 640 } 641 return 
null; 642 } 643 644 private static DN dnOf(final String dnString) { 645 try { 646 return DN.valueOf(dnString); 647 } catch (final IllegalArgumentException ignored) { 648 return null; 649 } 650 } 651 652 /** 653 * Creates a distribution load balancer which uses consistent hashing to distributes requests across a set of 654 * partitions based on a hash of each request's target DN. More precisely, a partition is selected as follows: 655 * <ul> 656 * <li>if the targeted entry lies beneath the partition base DN then the partition is selected based on a hash of 657 * the DN which is superior to the target DN and immediately subordinate to the partition base DN</li> 658 * <li>otherwise, if the request is not a search then the request is routed to a random partition</li> 659 * <li>otherwise, if the search request targets the partition base DN, or a superior thereof, then the search is 660 * routed to a random partition or broadcast to all partitions, depending on the search scope. When broadcasting, 661 * care is taken to re-scope sub-requests in order to avoid returning duplicate entries</li> 662 * <li>otherwise, the search is routed to a random partition because its scope lies outside of the partition 663 * space.</li> 664 * </ul> 665 * This load balancer allows client applications to linearly scale their deployment for write throughput as well 666 * as total number of entries. For example, if a single replicated topology can support 10000 updates/s and a 667 * total of 100M entries, then a 4 way distributed topology could support up to 40000 updates/s and 400M entries. 668 * <p/> 669 * <b>NOTE:</b> there are a number of assumptions in the design of this load balancer as well as a number of 670 * limitations: 671 * <ul> 672 * <li>simple paged results, server side sorting, and VLV request controls are not supported for searches which 673 * traverse all partitions. 
</li> 674 * <li>persistent searches which traverse all partitions are only supported if they request changes only. </li> 675 * <li>requests which target an entry which is not below the partition base DN will be routed to a partition 676 * selected based on the request's DN, thereby providing affinity. Note that this behavior assumes that entries 677 * which are not below the partition base DN are replicated across all partitions.</li> 678 * <li>searches that traverse multiple partitions as well as entries above the partition base DN may return 679 * results in a non-hierarchical order. Specifically, entries from a partition (below the partition base DN) 680 * may be returned before entries above the partition base DN. Although not required by the LDAP standard, some 681 * legacy clients expect entries to be returned in hierarchical order. 682 * </li> 683 * </ul> 684 * 685 * @param partitionBaseDN 686 * The DN beneath which data is partitioned. All other data is assumed to be shared across all partitions. 687 * @param partitions 688 * The consistent hash map containing the partitions to be distributed. 689 * @param options 690 * The configuration options for the load-balancer (no options are supported currently). 691 * @return The new distribution load balancer. 692 */ 693 @SuppressWarnings("unused") 694 public static ConnectionFactory newFixedSizeDistributionLoadBalancer(final DN partitionBaseDN, 695 final ConsistentHashMap<? extends ConnectionFactory> partitions, final Options options) { 696 return new ConsistentHashDistributionLoadBalancer(partitionBaseDN, partitions); 697 } 698 699 /** 700 * Creates a new "least requests" load-balancer which will load-balance individual requests across the provided 701 * set of connection factories, each typically representing a single replica, using an algorithm that ensures that 702 * requests are routed to the replica which has the minimum number of active requests. 
* <p>
     * In other words, this load-balancer provides availability and partition tolerance, but sacrifices consistency.
     * When a replica is not available, its number of active requests will not decrease until the requests time out,
     * which will have the effect of directing requests to the other replicas. Consistency is low compared to the
     * "affinity" load-balancer, because there is no guarantee that requests for the same DN are directed to the same
     * replica.
     * <p>
     * It is possible to increase consistency by providing an {@link AffinityControl} with a
     * request. The control value will then be used to compute a hash that will determine the connection to use. In that
     * case, the "least requests" behavior is completely overridden, i.e. the most saturated connection may be chosen
     * depending on the hash value.
     * <p>
     * <b>NOTE:</b> this connection factory returns fake connections, since real connections are obtained for each
     * request. Therefore, the returned fake connections have certain limitations: abandon requests will be ignored
     * since they cannot be routed; connection event listeners can be registered, but will only be notified when the
     * fake connection is closed or when all of the connection factories are unavailable.
     * <p>
     * <b>NOTE:</b> server selection is only based on information which is local to the client application. If other
     * applications are accessing the same servers then their additional load is not taken into account. Therefore,
     * this load balancer is only effective if all client applications access the servers in a similar way.
     * <p>
     * The implementation periodically attempts to connect to failed connection factories in order to determine if they
     * have become available again.
     *
     * @param factories
     *            The connection factories.
     * @param options
     *            The configuration options for the load-balancer.
731 * @return The new least requests load balancer. 732 * @see #newRoundRobinLoadBalancer(Collection, Options) 733 * @see #newFailoverLoadBalancer(Collection, Options) 734 * @see #newAffinityRequestLoadBalancer(Collection, Options) 735 * @see #LOAD_BALANCER_EVENT_LISTENER 736 * @see #LOAD_BALANCER_MONITORING_INTERVAL 737 * @see #LOAD_BALANCER_SCHEDULER 738 */ 739 public static ConnectionFactory newLeastRequestsLoadBalancer( 740 final Collection<? extends ConnectionFactory> factories, final Options options) { 741 final LeastRequestsDispatcher dispatcher = new LeastRequestsDispatcher(factories.size()); 742 return new RequestLoadBalancer("SaturationBasedRequestLoadBalancer", factories, options, 743 newLeastRequestsLoadBalancerNextFunction(dispatcher), 744 newLeastRequestsLoadBalancerEndOfRequestFunction(dispatcher)); 745 } 746 747 private static final DecodeOptions CONTROL_DECODE_OPTIONS = new DecodeOptions(); 748 749 static Function<Request, PartitionedRequest, NeverThrowsException> newLeastRequestsLoadBalancerNextFunction( 750 final LeastRequestsDispatcher dispatcher) { 751 return new Function<Request, PartitionedRequest, NeverThrowsException>() { 752 private final int maxIndex = dispatcher.size(); 753 754 @Override 755 public PartitionedRequest apply(final Request request) { 756 int affinityBasedIndex = parseAffinityRequestControl(request); 757 int finalIndex = dispatcher.selectServer(affinityBasedIndex); 758 Request cleanedRequest = (affinityBasedIndex == -1) 759 ? request : Requests.shallowCopyOfRequest(request, AffinityControl.OID); 760 return new PartitionedRequest(cleanedRequest, finalIndex); 761 } 762 763 private int parseAffinityRequestControl(final Request request) { 764 try { 765 AffinityControl control = request.getControl(AffinityControl.DECODER, CONTROL_DECODE_OPTIONS); 766 if (control != null) { 767 int index = control.getAffinityValue().hashCode(); 768 return index == Integer.MIN_VALUE ? 
0 : (Math.abs(index) % maxIndex); 769 } 770 } catch (DecodeException e) { 771 logger.warn(CoreMessages.WARN_DECODING_AFFINITY_CONTROL.get(e.getMessage())); 772 } 773 return -1; 774 } 775 }; 776 } 777 778 static Function<Integer, Void, NeverThrowsException> newLeastRequestsLoadBalancerEndOfRequestFunction( 779 final LeastRequestsDispatcher dispatcher) { 780 return new Function<Integer, Void, NeverThrowsException>() { 781 @Override 782 public Void apply(final Integer index) { 783 dispatcher.terminatedRequest(index); 784 return null; 785 } 786 }; 787 } 788 789 /** No-op "end of request" function for the saturation-based request load balancer. */ 790 static final Function<Integer, Void, NeverThrowsException> NOOP_END_OF_REQUEST_FUNCTION = 791 new Function<Integer, Void, NeverThrowsException>() { 792 @Override 793 public Void apply(Integer index) { 794 return null; 795 } 796 }; 797 798 /** 799 * Dispatch requests to the server index which has the least active requests. 800 * <p> 801 * A server is actually represented only by its index. Provided an initial number of servers, the requests are 802 * dispatched to the less saturated index, i.e. which corresponds to the server that has the lowest number of active 803 * requests. 804 */ 805 static class LeastRequestsDispatcher { 806 /** Counter for each server. */ 807 private final AtomicLongArray serversCounters; 808 809 LeastRequestsDispatcher(int numberOfServers) { 810 serversCounters = new AtomicLongArray(numberOfServers); 811 } 812 813 int size() { 814 return serversCounters.length(); 815 } 816 817 /** 818 * Returns the server index to use. 819 * 820 * @param forceIndex 821 * Forces a server index to use if different from -1. In that case, the default behavior of the 822 * dispatcher is overridden. If -1 is provided, then the default behavior of the dispatcher applies. 823 * @return the server index 824 */ 825 int selectServer(int forceIndex) { 826 int index = forceIndex == -1 ? 
getLessSaturatedIndex() : forceIndex; 827 serversCounters.incrementAndGet(index); 828 return index; 829 } 830 831 /** 832 * Signals to this dispatcher that a request has been finished for the provided server index. 833 * 834 * @param index 835 * The index of server that processed the request. 836 */ 837 void terminatedRequest(Integer index) { 838 serversCounters.decrementAndGet(index); 839 } 840 841 private int getLessSaturatedIndex() { 842 long min = Long.MAX_VALUE; 843 int minIndex = -1; 844 // Modifications during this loop are ok, effects on result should not be dramatic 845 for (int i = 0; i < serversCounters.length(); i++) { 846 long count = serversCounters.get(i); 847 if (count < min) { 848 min = count; 849 minIndex = i; 850 } 851 } 852 return minIndex; 853 } 854 } 855 856 /** 857 * Creates a new connection factory which forwards connection requests to 858 * the provided factory, but whose {@code toString} method will always 859 * return {@code name}. 860 * <p> 861 * This method may be useful for debugging purposes in order to more easily 862 * identity connection factories. 863 * 864 * @param factory 865 * The connection factory to be named. 866 * @param name 867 * The name of the connection factory. 868 * @return The named connection factory. 869 * @throws NullPointerException 870 * If {@code factory} or {@code name} was {@code null}. 
871 */ 872 public static ConnectionFactory newNamedConnectionFactory(final ConnectionFactory factory, 873 final String name) { 874 Reject.ifNull(factory, name); 875 876 return new ConnectionFactory() { 877 878 @Override 879 public void close() { 880 factory.close(); 881 } 882 883 @Override 884 public Connection getConnection() throws LdapException { 885 return factory.getConnection(); 886 } 887 888 @Override 889 public Promise<Connection, LdapException> getConnectionAsync() { 890 return factory.getConnectionAsync(); 891 } 892 893 @Override 894 public String toString() { 895 return name; 896 } 897 898 }; 899 } 900 901 /** 902 * Creates a new server connection factory using the provided 903 * {@link RequestHandler}. The returned factory will manage connection and 904 * request life-cycle, including request cancellation. 905 * <p> 906 * When processing requests, {@link RequestHandler} implementations are 907 * passed a {@link RequestContext} as the first parameter which may be used 908 * for detecting whether the request should be aborted due to 909 * cancellation requests or other events, such as connection failure. 910 * <p> 911 * The returned factory maintains state information which includes a table 912 * of active requests. Therefore, {@code RequestHandler} implementations are 913 * required to always return results in order to avoid potential memory 914 * leaks. 915 * 916 * @param <C> 917 * The type of client context. 918 * @param requestHandler 919 * The request handler which will be used for all client 920 * connections. 921 * @return The new server connection factory. 922 * @throws NullPointerException 923 * If {@code requestHandler} was {@code null}. 
924 */ 925 public static <C> ServerConnectionFactory<C, Integer> newServerConnectionFactory( 926 final RequestHandler<RequestContext> requestHandler) { 927 Reject.ifNull(requestHandler); 928 return new RequestHandlerFactoryAdapter<>(new RequestHandlerFactory<C, RequestContext>() { 929 @Override 930 public RequestHandler<RequestContext> handleAccept(final C clientContext) { 931 return requestHandler; 932 } 933 }); 934 } 935 936 /** 937 * Creates a new server connection factory using the provided 938 * {@link RequestHandlerFactory}. The returned factory will manage 939 * connection and request life-cycle, including request cancellation. 940 * <p> 941 * When processing requests, {@link RequestHandler} implementations are 942 * passed a {@link RequestContext} as the first parameter which may be used 943 * for detecting whether the request should be aborted due to 944 * cancellation requests or other events, such as connection failure. 945 * <p> 946 * The returned factory maintains state information which includes a table 947 * of active requests. Therefore, {@code RequestHandler} implementations are 948 * required to always return results in order to avoid potential memory 949 * leaks. 950 * 951 * @param <C> 952 * The type of client context. 953 * @param factory 954 * The request handler factory to use for associating request 955 * handlers with client connections. 956 * @return The new server connection factory. 957 * @throws NullPointerException 958 * If {@code factory} was {@code null}. 959 */ 960 public static <C> ServerConnectionFactory<C, Integer> newServerConnectionFactory( 961 final RequestHandlerFactory<C, RequestContext> factory) { 962 Reject.ifNull(factory); 963 return new RequestHandlerFactoryAdapter<>(factory); 964 } 965 966 /** 967 * Returns an uncloseable view of the provided connection. Attempts to call 968 * {@link Connection#close()} or 969 * {@link Connection#close(org.forgerock.opendj.ldap.requests.UnbindRequest, String)} 970 * will be ignored. 
971 * 972 * @param connection 973 * The connection whose {@code close} methods are to be disabled. 974 * @return An uncloseable view of the provided connection. 975 */ 976 public static Connection uncloseable(Connection connection) { 977 return new AbstractConnectionWrapper<Connection>(connection) { 978 @Override 979 public void close() { 980 // Do nothing. 981 } 982 983 @Override 984 public void close(org.forgerock.opendj.ldap.requests.UnbindRequest request, 985 String reason) { 986 // Do nothing. 987 } 988 }; 989 } 990 991 /** 992 * Returns an uncloseable view of the provided connection factory. Attempts 993 * to call {@link ConnectionFactory#close()} will be ignored. 994 * 995 * @param factory 996 * The connection factory whose {@code close} method is to be 997 * disabled. 998 * @return An uncloseable view of the provided connection factory. 999 */ 1000 public static ConnectionFactory uncloseable(final ConnectionFactory factory) { 1001 return new ConnectionFactory() { 1002 1003 @Override 1004 public Promise<Connection, LdapException> getConnectionAsync() { 1005 return factory.getConnectionAsync(); 1006 } 1007 1008 @Override 1009 public Connection getConnection() throws LdapException { 1010 return factory.getConnection(); 1011 } 1012 1013 @Override 1014 public void close() { 1015 // Do nothing. 1016 } 1017 }; 1018 } 1019 1020 /** 1021 * Returns the host name associated with the provided 1022 * {@code InetSocketAddress}, without performing a DNS lookup. This method 1023 * attempts to provide functionality which is compatible with 1024 * {@code InetSocketAddress.getHostString()} in JDK7. It can be removed once 1025 * we drop support for JDK6. 1026 * 1027 * @param socketAddress 1028 * The socket address which is expected to be an instance of 1029 * {@code InetSocketAddress}. 1030 * @return The host name associated with the provided {@code SocketAddress}, 1031 * or {@code null} if it is unknown. 
1032 */ 1033 public static String getHostString(final InetSocketAddress socketAddress) { 1034 /* 1035 * See OPENDJ-1270 for more information about slow DNS queries. 1036 * 1037 * We must avoid calling getHostName() in the case where it is likely to 1038 * perform a blocking DNS query. Ideally we would call getHostString(), 1039 * but the method was only added in JDK7. 1040 */ 1041 if (socketAddress.isUnresolved()) { 1042 /* 1043 * Usually socket addresses are resolved on creation. If the address 1044 * is unresolved then there must be a user provided hostname instead 1045 * and getHostName will not perform a reverse lookup. 1046 */ 1047 return socketAddress.getHostName(); 1048 } else { 1049 /* 1050 * Simulate getHostString() by parsing the toString() 1051 * representation. This assumes that the toString() representation 1052 * is stable, which I assume it is because it is documented. 1053 */ 1054 final InetAddress address = socketAddress.getAddress(); 1055 final String hostSlashIp = address.toString(); 1056 final int slashPos = hostSlashIp.indexOf('/'); 1057 if (slashPos == 0) { 1058 return hostSlashIp.substring(1); 1059 } else { 1060 return hostSlashIp.substring(0, slashPos); 1061 } 1062 } 1063 } 1064 1065 /** Prevent instantiation. */ 1066 private Connections() { 1067 // Do nothing. 1068 } 1069}