官网资料,从1.3开始到3.1都是一样的,源码将以2.0.6版本为例,官网介绍如下:
可以修改 master.properties,指定负载均衡算法:
MasterConfig.java
默认为线性负载 lowerweight:
HostManagerConfig.java
根据配置选择算法:
父类 CommonHostManager,获取有效的 worker 列表:
RandomSelector
RandomHostManager
AbstractSelector
用到了原子类
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicLong;
AtomicBoolean
/** ORACLE PROPRIETARY/CONFIDENTIAL. Use is subject to license terms.* Written by Doug Lea with assistance from members of JCP JSR-166* Expert Group and released to the public domain, as explained at* http://creativecommons.org/publicdomain/zero/1.0/*/package java.util.concurrent.atomic;
import sun.misc.Unsafe;/*** A {@code boolean} value that may be updated atomically. See the* {@link java.util.concurrent.atomic} package specification for* description of the properties of atomic variables. An* {@code AtomicBoolean} is used in applications such as atomically* updated flags, and cannot be used as a replacement for a* {@link java.lang.Boolean}.** @since 1.5* @author Doug Lea*/
public class AtomicBoolean implements java.io.Serializable {
    private static final long serialVersionUID = 4654671469794556979L;

    // setup to use Unsafe.compareAndSwapInt for updates
    private static final Unsafe unsafe = Unsafe.getUnsafe();
    private static final long valueOffset;

    static {
        try {
            valueOffset = unsafe.objectFieldOffset
                (AtomicBoolean.class.getDeclaredField("value"));
        } catch (Exception ex) { throw new Error(ex); }
    }

    // The boolean is stored as an int (1 = true, 0 = false) so that it can be
    // updated through Unsafe.compareAndSwapInt.
    private volatile int value;

    /**
     * Creates a new {@code AtomicBoolean} with the given initial value.
     *
     * @param initialValue the initial value
     */
    public AtomicBoolean(boolean initialValue) {
        value = initialValue ? 1 : 0;
    }

    /**
     * Creates a new {@code AtomicBoolean} with initial value {@code false}.
     */
    public AtomicBoolean() {
    }

    /**
     * Returns the current value.
     *
     * @return the current value
     */
    public final boolean get() {
        return value != 0;
    }

    /**
     * Atomically sets the value to the given updated value
     * if the current value {@code ==} the expected value.
     *
     * @param expect the expected value
     * @param update the new value
     * @return {@code true} if successful. False return indicates that
     * the actual value was not equal to the expected value.
     */
    public final boolean compareAndSet(boolean expect, boolean update) {
        int e = expect ? 1 : 0;
        int u = update ? 1 : 0;
        return unsafe.compareAndSwapInt(this, valueOffset, e, u);
    }

    /**
     * Atomically sets the value to the given updated value
     * if the current value {@code ==} the expected value.
     *
     * <p>May fail spuriously and does not provide ordering guarantees,
     * so is only rarely an appropriate alternative to {@code compareAndSet}.
     *
     * @param expect the expected value
     * @param update the new value
     * @return {@code true} if successful
     */
    public boolean weakCompareAndSet(boolean expect, boolean update) {
        int e = expect ? 1 : 0;
        int u = update ? 1 : 0;
        return unsafe.compareAndSwapInt(this, valueOffset, e, u);
    }

    /**
     * Unconditionally sets to the given value.
     *
     * @param newValue the new value
     */
    public final void set(boolean newValue) {
        value = newValue ? 1 : 0;
    }

    /**
     * Eventually sets to the given value.
     *
     * @param newValue the new value
     * @since 1.6
     */
    public final void lazySet(boolean newValue) {
        int v = newValue ? 1 : 0;
        unsafe.putOrderedInt(this, valueOffset, v);
    }

    /**
     * Atomically sets to the given value and returns the previous value.
     *
     * @param newValue the new value
     * @return the previous value
     */
    public final boolean getAndSet(boolean newValue) {
        boolean prev;
        // Retry until the CAS succeeds against the value that was just read.
        do {
            prev = get();
        } while (!compareAndSet(prev, newValue));
        return prev;
    }

    /**
     * Returns the String representation of the current value.
     *
     * @return the String representation of the current value
     */
    public String toString() {
        return Boolean.toString(get());
    }
}
AtomicLong
/** ORACLE PROPRIETARY/CONFIDENTIAL. Use is subject to license terms.* Written by Doug Lea with assistance from members of JCP JSR-166* Expert Group and released to the public domain, as explained at* http://creativecommons.org/publicdomain/zero/1.0/*/package java.util.concurrent.atomic;
import java.util.function.LongUnaryOperator;
import java.util.function.LongBinaryOperator;
import sun.misc.Unsafe;/*** A {@code long} value that may be updated atomically. See the* {@link java.util.concurrent.atomic} package specification for* description of the properties of atomic variables. An* {@code AtomicLong} is used in applications such as atomically* incremented sequence numbers, and cannot be used as a replacement* for a {@link java.lang.Long}. However, this class does extend* {@code Number} to allow uniform access by tools and utilities that* deal with numerically-based classes.** @since 1.5* @author Doug Lea*/
public class AtomicLong extends Number implements java.io.Serializable {
    private static final long serialVersionUID = 1927816293512124184L;

    // setup to use Unsafe.compareAndSwapLong for updates
    private static final Unsafe unsafe = Unsafe.getUnsafe();
    private static final long valueOffset;

    /**
     * Records whether the underlying JVM supports lockless
     * compareAndSwap for longs. While the Unsafe.compareAndSwapLong
     * method works in either case, some constructions should be
     * handled at Java level to avoid locking user-visible locks.
     */
    static final boolean VM_SUPPORTS_LONG_CAS = VMSupportsCS8();

    /**
     * Returns whether underlying JVM supports lockless CompareAndSet
     * for longs. Called only once and cached in VM_SUPPORTS_LONG_CAS.
     */
    private static native boolean VMSupportsCS8();

    static {
        try {
            valueOffset = unsafe.objectFieldOffset
                (AtomicLong.class.getDeclaredField("value"));
        } catch (Exception ex) { throw new Error(ex); }
    }

    private volatile long value;

    /**
     * Creates a new AtomicLong with the given initial value.
     *
     * @param initialValue the initial value
     */
    public AtomicLong(long initialValue) {
        value = initialValue;
    }

    /**
     * Creates a new AtomicLong with initial value {@code 0}.
     */
    public AtomicLong() {
    }

    /**
     * Gets the current value.
     *
     * @return the current value
     */
    public final long get() {
        return value;
    }

    /**
     * Sets to the given value.
     *
     * @param newValue the new value
     */
    public final void set(long newValue) {
        value = newValue;
    }

    /**
     * Eventually sets to the given value.
     *
     * @param newValue the new value
     * @since 1.6
     */
    public final void lazySet(long newValue) {
        unsafe.putOrderedLong(this, valueOffset, newValue);
    }

    /**
     * Atomically sets to the given value and returns the old value.
     *
     * @param newValue the new value
     * @return the previous value
     */
    public final long getAndSet(long newValue) {
        return unsafe.getAndSetLong(this, valueOffset, newValue);
    }

    /**
     * Atomically sets the value to the given updated value
     * if the current value {@code ==} the expected value.
     *
     * @param expect the expected value
     * @param update the new value
     * @return {@code true} if successful. False return indicates that
     * the actual value was not equal to the expected value.
     */
    public final boolean compareAndSet(long expect, long update) {
        return unsafe.compareAndSwapLong(this, valueOffset, expect, update);
    }

    /**
     * Atomically sets the value to the given updated value
     * if the current value {@code ==} the expected value.
     *
     * <p>May fail spuriously and does not provide ordering guarantees,
     * so is only rarely an appropriate alternative to {@code compareAndSet}.
     *
     * @param expect the expected value
     * @param update the new value
     * @return {@code true} if successful
     */
    public final boolean weakCompareAndSet(long expect, long update) {
        return unsafe.compareAndSwapLong(this, valueOffset, expect, update);
    }

    /**
     * Atomically increments by one the current value.
     *
     * @return the previous value
     */
    public final long getAndIncrement() {
        return unsafe.getAndAddLong(this, valueOffset, 1L);
    }

    /**
     * Atomically decrements by one the current value.
     *
     * @return the previous value
     */
    public final long getAndDecrement() {
        return unsafe.getAndAddLong(this, valueOffset, -1L);
    }

    /**
     * Atomically adds the given value to the current value.
     *
     * @param delta the value to add
     * @return the previous value
     */
    public final long getAndAdd(long delta) {
        return unsafe.getAndAddLong(this, valueOffset, delta);
    }

    /**
     * Atomically increments by one the current value.
     *
     * @return the updated value
     */
    public final long incrementAndGet() {
        return unsafe.getAndAddLong(this, valueOffset, 1L) + 1L;
    }

    /**
     * Atomically decrements by one the current value.
     *
     * @return the updated value
     */
    public final long decrementAndGet() {
        return unsafe.getAndAddLong(this, valueOffset, -1L) - 1L;
    }

    /**
     * Atomically adds the given value to the current value.
     *
     * @param delta the value to add
     * @return the updated value
     */
    public final long addAndGet(long delta) {
        return unsafe.getAndAddLong(this, valueOffset, delta) + delta;
    }

    /**
     * Atomically updates the current value with the results of
     * applying the given function, returning the previous value. The
     * function should be side-effect-free, since it may be re-applied
     * when attempted updates fail due to contention among threads.
     *
     * @param updateFunction a side-effect-free function
     * @return the previous value
     * @since 1.8
     */
    public final long getAndUpdate(LongUnaryOperator updateFunction) {
        long prev, next;
        do {
            prev = get();
            next = updateFunction.applyAsLong(prev);
        } while (!compareAndSet(prev, next));
        return prev;
    }

    /**
     * Atomically updates the current value with the results of
     * applying the given function, returning the updated value. The
     * function should be side-effect-free, since it may be re-applied
     * when attempted updates fail due to contention among threads.
     *
     * @param updateFunction a side-effect-free function
     * @return the updated value
     * @since 1.8
     */
    public final long updateAndGet(LongUnaryOperator updateFunction) {
        long prev, next;
        do {
            prev = get();
            next = updateFunction.applyAsLong(prev);
        } while (!compareAndSet(prev, next));
        return next;
    }

    /**
     * Atomically updates the current value with the results of
     * applying the given function to the current and given values,
     * returning the previous value. The function should be
     * side-effect-free, since it may be re-applied when attempted
     * updates fail due to contention among threads. The function
     * is applied with the current value as its first argument,
     * and the given update as the second argument.
     *
     * @param x the update value
     * @param accumulatorFunction a side-effect-free function of two arguments
     * @return the previous value
     * @since 1.8
     */
    public final long getAndAccumulate(long x,
                                       LongBinaryOperator accumulatorFunction) {
        long prev, next;
        do {
            prev = get();
            next = accumulatorFunction.applyAsLong(prev, x);
        } while (!compareAndSet(prev, next));
        return prev;
    }

    /**
     * Atomically updates the current value with the results of
     * applying the given function to the current and given values,
     * returning the updated value. The function should be
     * side-effect-free, since it may be re-applied when attempted
     * updates fail due to contention among threads. The function
     * is applied with the current value as its first argument,
     * and the given update as the second argument.
     *
     * @param x the update value
     * @param accumulatorFunction a side-effect-free function of two arguments
     * @return the updated value
     * @since 1.8
     */
    public final long accumulateAndGet(long x,
                                       LongBinaryOperator accumulatorFunction) {
        long prev, next;
        do {
            prev = get();
            next = accumulatorFunction.applyAsLong(prev, x);
        } while (!compareAndSet(prev, next));
        return next;
    }

    /**
     * Returns the String representation of the current value.
     *
     * @return the String representation of the current value
     */
    public String toString() {
        return Long.toString(get());
    }

    /**
     * Returns the value of this {@code AtomicLong} as an {@code int}
     * after a narrowing primitive conversion.
     *
     * @jls 5.1.3 Narrowing Primitive Conversions
     */
    public int intValue() {
        return (int)get();
    }

    /**
     * Returns the value of this {@code AtomicLong} as a {@code long}.
     */
    public long longValue() {
        return get();
    }

    /**
     * Returns the value of this {@code AtomicLong} as a {@code float}
     * after a widening primitive conversion.
     *
     * @jls 5.1.2 Widening Primitive Conversions
     */
    public float floatValue() {
        return (float)get();
    }

    /**
     * Returns the value of this {@code AtomicLong} as a {@code double}
     * after a widening primitive conversion.
     *
     * @jls 5.1.2 Widening Primitive Conversions
     */
    public double doubleValue() {
        return (double)get();
    }
}
真正的源码RoundRobinSelector
…神马玩意!!!!
默认的负载均衡算法,估计也是真正使用的,另外两个就是凑数的,select方法都是单独写的
ExecutorDispatcher:只有 LowerWeightHostManager 重写了 select(ExecutionContext context)
/** Licensed to the Apache Software Foundation (ASF) under one or more* contributor license agreements. See the NOTICE file distributed with* this work for additional information regarding copyright ownership.* The ASF licenses this file to You under the Apache License, Version 2.0* (the "License"); you may not use this file except in compliance with* the License. You may obtain a copy of the License at** http://www.apache.org/licenses/LICENSE-2.0** Unless required by applicable law or agreed to in writing, software* distributed under the License is distributed on an "AS IS" BASIS,* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.* See the License for the specific language governing permissions and* limitations under the License.*/package org.apache.dolphinscheduler.server.master.dispatch.host.assign;import java.util.Collection;
import java.util.Comparator;
import java.util.List;
import java.util.stream.Collectors;import com.google.common.collect.Lists;/*** lower weight round robin*/
public class LowerWeightRoundRobin extends AbstractSelector {/*** select** @param sources sources* @return HostWeight*/@Overridepublic HostWeight doSelect(Collection sources) {double totalWeight = 0;double lowWeight = 0;HostWeight lowerNode = null;List weights = canAssignTaskHost(sources);for (HostWeight hostWeight : weights) {totalWeight += hostWeight.getWeight();hostWeight.setCurrentWeight(hostWeight.getCurrentWeight() + hostWeight.getWeight());if (lowerNode == null || lowWeight > hostWeight.getCurrentWeight()) {lowerNode = hostWeight;lowWeight = hostWeight.getCurrentWeight();}}lowerNode.setCurrentWeight(lowerNode.getCurrentWeight() + totalWeight);return lowerNode;}private List canAssignTaskHost(Collection sources) {List zeroWaitingTask = sources.stream().filter(h -> h.getWaitingTaskCount() == 0).collect(Collectors.toList());if (!zeroWaitingTask.isEmpty()) {return zeroWaitingTask;}HostWeight hostWeight = sources.stream().min(Comparator.comparing(HostWeight::getWaitingTaskCount)).get();List waitingTask = Lists.newArrayList(hostWeight);List equalWaitingTask = sources.stream().filter(h -> !h.getHost().equals(hostWeight.getHost()) && h.getWaitingTaskCount() == hostWeight.getWaitingTaskCount()).collect(Collectors.toList());if (!equalWaitingTask.isEmpty()) {waitingTask.addAll(equalWaitingTask);}return waitingTask;}
}
@Bean注解官网
Spring 的 @Bean 注解标注在方法上,用于告诉 Spring:该方法会产生一个 Bean 对象,并且这个 Bean 对象要交给 Spring 管理。在默认的单例作用域下,产生这个 Bean 对象的方法 Spring 只会调用一次,随后将这个 Bean 对象放在自己的 IOC 容器中。
Spring IOC 容器管理一个或者多个 bean。使用 @Bean 的方法通常定义在带有 @Configuration 注解的类中;在一个方法上使用 @Bean 注解,就表明这个方法返回的对象需要交给 Spring 进行管理。