dolphinscheduler 2.0.6 负载均衡源码
创始人
2024-02-09 14:52:26
0

目录

  • 🐬官网介绍
  • 🐬负载均衡
    • 🐠加权随机(random)
    • 🐠平滑轮询(roundrobin)
    • 🐠线性负载(lowerweight)
  • 🐵其它

🐬官网介绍


官网资料,从1.3开始到3.1都是一样的,源码将以2.0.6版本为例,官网介绍如下:
在这里插入图片描述

🐬负载均衡


可以修改master.properties,指定负载均衡算法:
在这里插入图片描述
MasterConfig.java默认为线性负载lowerweight
在这里插入图片描述
HostManagerConfig.java根据配置选择算法:
在这里插入图片描述
父类CommonHostManager,获取有效的worker列表:
在这里插入图片描述

🐠加权随机(random)


RandomSelector
RandomHostManager
AbstractSelector

在这里插入图片描述

🐠平滑轮询(roundrobin)


用到了原子类

import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicLong;
  • AtomicBoolean
    在这里插入图片描述

    /** ORACLE PROPRIETARY/CONFIDENTIAL. Use is subject to license terms.* Written by Doug Lea with assistance from members of JCP JSR-166* Expert Group and released to the public domain, as explained at* http://creativecommons.org/publicdomain/zero/1.0/*/package java.util.concurrent.atomic;
    import sun.misc.Unsafe;/*** A {@code boolean} value that may be updated atomically. See the* {@link java.util.concurrent.atomic} package specification for* description of the properties of atomic variables. An* {@code AtomicBoolean} is used in applications such as atomically* updated flags, and cannot be used as a replacement for a* {@link java.lang.Boolean}.** @since 1.5* @author Doug Lea*/
    public class AtomicBoolean implements java.io.Serializable {
        private static final long serialVersionUID = 4654671469794556979L;

        // setup to use Unsafe.compareAndSwapInt for updates
        private static final Unsafe unsafe = Unsafe.getUnsafe();
        private static final long valueOffset;

        static {
            try {
                valueOffset = unsafe.objectFieldOffset(AtomicBoolean.class.getDeclaredField("value"));
            } catch (Exception ex) { throw new Error(ex); }
        }

        // The boolean is stored as an int (1 = true, 0 = false) so that
        // Unsafe.compareAndSwapInt can be used for the atomic updates.
        private volatile int value;

        /**
         * Creates a new {@code AtomicBoolean} with the given initial value.
         *
         * @param initialValue the initial value
         */
        public AtomicBoolean(boolean initialValue) {
            value = initialValue ? 1 : 0;
        }

        /**
         * Creates a new {@code AtomicBoolean} with initial value {@code false}.
         */
        public AtomicBoolean() {
        }

        /**
         * Returns the current value.
         *
         * @return the current value
         */
        public final boolean get() {
            return value != 0;
        }

        /**
         * Atomically sets the value to the given updated value
         * if the current value {@code ==} the expected value.
         *
         * @param expect the expected value
         * @param update the new value
         * @return {@code true} if successful. False return indicates that
         * the actual value was not equal to the expected value.
         */
        public final boolean compareAndSet(boolean expect, boolean update) {
            int e = expect ? 1 : 0;
            int u = update ? 1 : 0;
            return unsafe.compareAndSwapInt(this, valueOffset, e, u);
        }

        /**
         * Atomically sets the value to the given updated value
         * if the current value {@code ==} the expected value.
         *
         * <p>May fail
         * spuriously and does not provide ordering guarantees, so is
         * only rarely an appropriate alternative to {@code compareAndSet}.
         *
         * @param expect the expected value
         * @param update the new value
         * @return {@code true} if successful
         */
        public boolean weakCompareAndSet(boolean expect, boolean update) {
            int e = expect ? 1 : 0;
            int u = update ? 1 : 0;
            return unsafe.compareAndSwapInt(this, valueOffset, e, u);
        }

        /**
         * Unconditionally sets to the given value.
         *
         * @param newValue the new value
         */
        public final void set(boolean newValue) {
            value = newValue ? 1 : 0;
        }

        /**
         * Eventually sets to the given value.
         *
         * @param newValue the new value
         * @since 1.6
         */
        public final void lazySet(boolean newValue) {
            int v = newValue ? 1 : 0;
            unsafe.putOrderedInt(this, valueOffset, v);
        }

        /**
         * Atomically sets to the given value and returns the previous value.
         *
         * @param newValue the new value
         * @return the previous value
         */
        public final boolean getAndSet(boolean newValue) {
            boolean prev;
            // Retry until the CAS succeeds against the value that was just read.
            do {
                prev = get();
            } while (!compareAndSet(prev, newValue));
            return prev;
        }

        /**
         * Returns the String representation of the current value.
         * @return the String representation of the current value
         */
        public String toString() {
            return Boolean.toString(get());
        }
    }

  • AtomicLong
    在这里插入图片描述

    /** ORACLE PROPRIETARY/CONFIDENTIAL. Use is subject to license terms.* Written by Doug Lea with assistance from members of JCP JSR-166* Expert Group and released to the public domain, as explained at* http://creativecommons.org/publicdomain/zero/1.0/*/package java.util.concurrent.atomic;
    import java.util.function.LongUnaryOperator;
    import java.util.function.LongBinaryOperator;
    import sun.misc.Unsafe;/*** A {@code long} value that may be updated atomically.  See the* {@link java.util.concurrent.atomic} package specification for* description of the properties of atomic variables. An* {@code AtomicLong} is used in applications such as atomically* incremented sequence numbers, and cannot be used as a replacement* for a {@link java.lang.Long}. However, this class does extend* {@code Number} to allow uniform access by tools and utilities that* deal with numerically-based classes.** @since 1.5* @author Doug Lea*/
    public class AtomicLong extends Number implements java.io.Serializable {
        private static final long serialVersionUID = 1927816293512124184L;

        // setup to use Unsafe.compareAndSwapLong for updates
        private static final Unsafe unsafe = Unsafe.getUnsafe();
        private static final long valueOffset;

        /**
         * Records whether the underlying JVM supports lockless
         * compareAndSwap for longs. While the Unsafe.compareAndSwapLong
         * method works in either case, some constructions should be
         * handled at Java level to avoid locking user-visible locks.
         */
        static final boolean VM_SUPPORTS_LONG_CAS = VMSupportsCS8();

        /**
         * Returns whether underlying JVM supports lockless CompareAndSet
         * for longs. Called only once and cached in VM_SUPPORTS_LONG_CAS.
         */
        private static native boolean VMSupportsCS8();

        static {
            try {
                valueOffset = unsafe.objectFieldOffset(AtomicLong.class.getDeclaredField("value"));
            } catch (Exception ex) { throw new Error(ex); }
        }

        private volatile long value;

        /**
         * Creates a new AtomicLong with the given initial value.
         *
         * @param initialValue the initial value
         */
        public AtomicLong(long initialValue) {
            value = initialValue;
        }

        /**
         * Creates a new AtomicLong with initial value {@code 0}.
         */
        public AtomicLong() {
        }

        /**
         * Gets the current value.
         *
         * @return the current value
         */
        public final long get() {
            return value;
        }

        /**
         * Sets to the given value.
         *
         * @param newValue the new value
         */
        public final void set(long newValue) {
            value = newValue;
        }

        /**
         * Eventually sets to the given value.
         *
         * @param newValue the new value
         * @since 1.6
         */
        public final void lazySet(long newValue) {
            unsafe.putOrderedLong(this, valueOffset, newValue);
        }

        /**
         * Atomically sets to the given value and returns the old value.
         *
         * @param newValue the new value
         * @return the previous value
         */
        public final long getAndSet(long newValue) {
            return unsafe.getAndSetLong(this, valueOffset, newValue);
        }

        /**
         * Atomically sets the value to the given updated value
         * if the current value {@code ==} the expected value.
         *
         * @param expect the expected value
         * @param update the new value
         * @return {@code true} if successful. False return indicates that
         * the actual value was not equal to the expected value.
         */
        public final boolean compareAndSet(long expect, long update) {
            return unsafe.compareAndSwapLong(this, valueOffset, expect, update);
        }

        /**
         * Atomically sets the value to the given updated value
         * if the current value {@code ==} the expected value.
         *
         * <p>May fail
         * spuriously and does not provide ordering guarantees, so is
         * only rarely an appropriate alternative to {@code compareAndSet}.
         *
         * @param expect the expected value
         * @param update the new value
         * @return {@code true} if successful
         */
        public final boolean weakCompareAndSet(long expect, long update) {
            return unsafe.compareAndSwapLong(this, valueOffset, expect, update);
        }

        /**
         * Atomically increments by one the current value.
         *
         * @return the previous value
         */
        public final long getAndIncrement() {
            return unsafe.getAndAddLong(this, valueOffset, 1L);
        }

        /**
         * Atomically decrements by one the current value.
         *
         * @return the previous value
         */
        public final long getAndDecrement() {
            return unsafe.getAndAddLong(this, valueOffset, -1L);
        }

        /**
         * Atomically adds the given value to the current value.
         *
         * @param delta the value to add
         * @return the previous value
         */
        public final long getAndAdd(long delta) {
            return unsafe.getAndAddLong(this, valueOffset, delta);
        }

        /**
         * Atomically increments by one the current value.
         *
         * @return the updated value
         */
        public final long incrementAndGet() {
            return unsafe.getAndAddLong(this, valueOffset, 1L) + 1L;
        }

        /**
         * Atomically decrements by one the current value.
         *
         * @return the updated value
         */
        public final long decrementAndGet() {
            return unsafe.getAndAddLong(this, valueOffset, -1L) - 1L;
        }

        /**
         * Atomically adds the given value to the current value.
         *
         * @param delta the value to add
         * @return the updated value
         */
        public final long addAndGet(long delta) {
            return unsafe.getAndAddLong(this, valueOffset, delta) + delta;
        }

        /**
         * Atomically updates the current value with the results of
         * applying the given function, returning the previous value. The
         * function should be side-effect-free, since it may be re-applied
         * when attempted updates fail due to contention among threads.
         *
         * @param updateFunction a side-effect-free function
         * @return the previous value
         * @since 1.8
         */
        public final long getAndUpdate(LongUnaryOperator updateFunction) {
            long prev, next;
            do {
                prev = get();
                next = updateFunction.applyAsLong(prev);
            } while (!compareAndSet(prev, next));
            return prev;
        }

        /**
         * Atomically updates the current value with the results of
         * applying the given function, returning the updated value. The
         * function should be side-effect-free, since it may be re-applied
         * when attempted updates fail due to contention among threads.
         *
         * @param updateFunction a side-effect-free function
         * @return the updated value
         * @since 1.8
         */
        public final long updateAndGet(LongUnaryOperator updateFunction) {
            long prev, next;
            do {
                prev = get();
                next = updateFunction.applyAsLong(prev);
            } while (!compareAndSet(prev, next));
            return next;
        }

        /**
         * Atomically updates the current value with the results of
         * applying the given function to the current and given values,
         * returning the previous value. The function should be
         * side-effect-free, since it may be re-applied when attempted
         * updates fail due to contention among threads. The function
         * is applied with the current value as its first argument,
         * and the given update as the second argument.
         *
         * @param x the update value
         * @param accumulatorFunction a side-effect-free function of two arguments
         * @return the previous value
         * @since 1.8
         */
        public final long getAndAccumulate(long x,
                                           LongBinaryOperator accumulatorFunction) {
            long prev, next;
            do {
                prev = get();
                next = accumulatorFunction.applyAsLong(prev, x);
            } while (!compareAndSet(prev, next));
            return prev;
        }

        /**
         * Atomically updates the current value with the results of
         * applying the given function to the current and given values,
         * returning the updated value. The function should be
         * side-effect-free, since it may be re-applied when attempted
         * updates fail due to contention among threads. The function
         * is applied with the current value as its first argument,
         * and the given update as the second argument.
         *
         * @param x the update value
         * @param accumulatorFunction a side-effect-free function of two arguments
         * @return the updated value
         * @since 1.8
         */
        public final long accumulateAndGet(long x,
                                           LongBinaryOperator accumulatorFunction) {
            long prev, next;
            do {
                prev = get();
                next = accumulatorFunction.applyAsLong(prev, x);
            } while (!compareAndSet(prev, next));
            return next;
        }

        /**
         * Returns the String representation of the current value.
         * @return the String representation of the current value
         */
        public String toString() {
            return Long.toString(get());
        }

        /**
         * Returns the value of this {@code AtomicLong} as an {@code int}
         * after a narrowing primitive conversion.
         * @jls 5.1.3 Narrowing Primitive Conversions
         */
        public int intValue() {
            return (int)get();
        }

        /**
         * Returns the value of this {@code AtomicLong} as a {@code long}.
         */
        public long longValue() {
            return get();
        }

        /**
         * Returns the value of this {@code AtomicLong} as a {@code float}
         * after a widening primitive conversion.
         * @jls 5.1.2 Widening Primitive Conversions
         */
        public float floatValue() {
            return (float)get();
        }

        /**
         * Returns the value of this {@code AtomicLong} as a {@code double}
         * after a widening primitive conversion.
         * @jls 5.1.2 Widening Primitive Conversions
         */
        public double doubleValue() {
            return (double)get();
        }
    }

真正的源码RoundRobinSelector…神马玩意!!!!
在这里插入图片描述

🐠线性负载(lowerweight)


默认的负载均衡算法,估计也是真正使用的,另外两个就是凑数的,select方法都是单独写的
在这里插入图片描述
ExecutorDispatcher:只有LowerWeightHostManager重写了select(ExecutionContext context)
在这里插入图片描述

  • LowerWeightRoundRobin
    在这里插入图片描述
    /** Licensed to the Apache Software Foundation (ASF) under one or more* contributor license agreements.  See the NOTICE file distributed with* this work for additional information regarding copyright ownership.* The ASF licenses this file to You under the Apache License, Version 2.0* (the "License"); you may not use this file except in compliance with* the License.  You may obtain a copy of the License at**    http://www.apache.org/licenses/LICENSE-2.0** Unless required by applicable law or agreed to in writing, software* distributed under the License is distributed on an "AS IS" BASIS,* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.* See the License for the specific language governing permissions and* limitations under the License.*/package org.apache.dolphinscheduler.server.master.dispatch.host.assign;import java.util.Collection;
    import java.util.Comparator;
    import java.util.List;
    import java.util.stream.Collectors;import com.google.common.collect.Lists;/*** lower weight round robin*/
    public class LowerWeightRoundRobin extends AbstractSelector {/*** select** @param sources sources* @return HostWeight*/@Overridepublic HostWeight doSelect(Collection sources) {double totalWeight = 0;double lowWeight = 0;HostWeight lowerNode = null;List weights = canAssignTaskHost(sources);for (HostWeight hostWeight : weights) {totalWeight += hostWeight.getWeight();hostWeight.setCurrentWeight(hostWeight.getCurrentWeight() + hostWeight.getWeight());if (lowerNode == null || lowWeight > hostWeight.getCurrentWeight()) {lowerNode = hostWeight;lowWeight = hostWeight.getCurrentWeight();}}lowerNode.setCurrentWeight(lowerNode.getCurrentWeight() + totalWeight);return lowerNode;}private List canAssignTaskHost(Collection sources) {List zeroWaitingTask = sources.stream().filter(h -> h.getWaitingTaskCount() == 0).collect(Collectors.toList());if (!zeroWaitingTask.isEmpty()) {return zeroWaitingTask;}HostWeight hostWeight = sources.stream().min(Comparator.comparing(HostWeight::getWaitingTaskCount)).get();List waitingTask = Lists.newArrayList(hostWeight);List equalWaitingTask = sources.stream().filter(h -> !h.getHost().equals(hostWeight.getHost()) && h.getWaitingTaskCount() == hostWeight.getWaitingTaskCount()).collect(Collectors.toList());if (!equalWaitingTask.isEmpty()) {waitingTask.addAll(equalWaitingTask);}return waitingTask;}
    }

🐵其它


@Bean注解官网

Spring的@Bean注解标注在方法上,用于告诉Spring:该方法会产生一个Bean对象,并且这个Bean对象要交给Spring管理。在默认的单例作用域下,产生这个Bean对象的方法Spring只会调用一次,随后将这个Bean对象放在自己的IOC容器中。

Spring IOC容器管理一个或者多个bean,这些bean通常在标注了@Configuration注解的类中创建;在一个方法上使用@Bean注解,就表明这个方法返回的对象需要交给Spring进行管理。

在这里插入图片描述

相关内容

热门资讯

前7个月广义财政支出超21万亿... 更加积极财政政策陆续落地,推动经济平稳运行。 根据财政部最新数据,今年前7个月广义财政(全国一般公共...
顺发恒能股份:关联方资金往来及... 8月21日,顺发恒能股份公司发布公告,其关联方资金往来及对外担保管理制度修订内容已经公司2025年8...
美联储政策框架巨变在即:稳通胀... 鲍威尔周五要放大招!美联储货币政策新框架即将出炉,新框架可能强调稳定通胀是良好就业市场的基础,重塑政...
深圳市人社局提醒:第一代社保卡... 近日,深圳市人力资源和社会保障局提醒,目前我市正在发行第三代加载金融功能的社会保障卡,第一代社保卡已...
锐明技术发布对外担保管理制度,... 2025年8月21日,锐明技术发布公告,公布了其对外担保管理制度。 该制度旨在规范深圳市锐明技术股份...
芜湖福赛科技股份有限公司发布对... 2025年8月21日,芜湖福赛科技股份有限公司发布公告,公布其对外担保管理制度。 该制度旨在规范公司...
并购贷款政策十年大修: 参股型... [ 《办法》将占并购交易价款的比例上限从60%提升至70%,要求权益性资金不低于30%;参股型并购贷...
HWG!罗马诺:AC米兰签药厂... 直播吧08月21日讯 罗马诺以标志性的Here we go确认,AC米兰将租借勒沃库森前锋博尼法斯。...
贝蒂斯全力追求安东尼:租借+买... 在这个转会窗口即将关闭之际,西甲球队贝蒂斯正在全力以赴地追求曼联的年轻天才安东尼。根据记者Álvar...
全国首例!女子出差时被领导性侵... 2023年9月22日一次商务酒宴,作为公司销售总监的崔丽丽受公司指派赴杭州出差,期间参加商务宴请后醉...