您现在的位置是:首页 > 智能机电

client-go和golang源码中的技巧

智慧创新站 2025-05-17【智能机电】48人已围观

简介client-go中有很多比较有意思的实现,如定时器,同步机制等,可以作为移植使用。下面就遇到的一些技术讲解,首先看第一个:(/apimachinery/pkg/util/sets/)实现了对golangmap的key的处理,如计算交集,并集等。实际中可能会遇到需要判断两个map的key是否重合的场...


client-go中有很多比较有意思的实现,如定时器,同步机制等,可以作为移植使用。下面就遇到的一些技术讲解,首先看第一个:

(/apimachinery/pkg/util/sets/)

实现了对golangmap的key的处理,如计算交集,并集等。实际中可能会遇到需要判断两个map的key是否重合的场景,此时可以使用下述方式实现,函数将入参的map的key抽取成一个String类型,这样就可以使用String的方法操作key

ps:更多功能参见源码

packagemainimport("fmt""/apimachinery/pkg/util/sets")funcmain(){map1:=map[string]int{"aaa":1,"bbb":2,"ccc":3}map2:=map[string]int{"ccc":1,"ddd":2,"eee":3}newmap1:=(map1)newmap2:=(map2)((),())((()))//3个点用于把数组打散为单个元素}结果:true

同步机制(golang内置方法),用于数据同步

有2个方法:

func(m*Mutex)Lock()func(m*Mutex)Unlock()

类似C语言线程的互斥锁,用于对数据进行加解锁操作。当数据被加锁后,未获得该锁的程序将无法读取被加锁的数据。从下面例子可以看出在数据被解锁前其他协程无法对该数据进行读写操作。

ps:readdata的数据也可能为“data”

packagemainimport("fmt""sync")typeLockTeststruct{}funcmain(){lockTest:=LockTest{{},"data"}gofunc(){()("sleepbegin")(*2)("sleep")()}()(*1)gofunc(){()("readdata:",)()}()gofunc(){()("writedatabegin")="newdata"("writedata")()}()(*5)}结果:sleepbeginsleepwritedatabeginwritedatareaddata:newdata

(golang内置方法),用于数据同步

读写锁,含4个方法,前2个为读锁,后2个为写锁,使用时要一一对应。写锁会阻塞读写操作,读锁不会阻塞写操作,读锁可以有多个,读锁之间不会相互阻塞,适用于读多写少的场景。因此如果单纯使用/与使用/效果相同

func(rw*RWMutex)RLock()func(rw*RWMutex)RUnlock()func(rw*RWMutex)Lock()func(rw*RWMutex)Unlock()

读写锁一般是读锁和写锁结合使用的。在有写锁的时候,读锁会被阻塞,等待写锁释放后才能进行读操作。

ps:和一般都是内置在结构体中使用,用于保护本结构体的数据

packagemainimport("fmt""sync")typeLockTeststruct{}funcmain(){lockTest:=LockTest{{},"data"}gofunc(){()("writedatabegin")="newdata"(*3)("writedata")()}()(*1)gofunc(){()//阻塞等待写锁释放("readbegin")("readdata:",)("readbegin")()}()(*5)}结果:writedatabeginwritedatareadbeginreaddata:newdatareadbegin

(golang内置方法),用于条件变量

用于条件等待,在满足某些条件时程序才能继续执行。它包含如下3个方法:Wait()会挂起其所在的协程等待Signal()或Broadcast()的唤醒。

func(c*Cond)Wait()func(c*Cond)Signal()func(c*Cond)Broadcast()

官方推荐的典型用法如下。由于唤醒协程并不意味着条件已就绪,因此在唤醒后需要检测是否本协程的条件已经满足。

()for!condition(){()}()

使用Signal()唤醒的方式如下,Signal()用于当次唤醒一个协程。如果注释掉下例中的Signal(),那么两个协程会一直Wait(),并不会继续执行。

packagemainimport("fmt""sync")funcmain(){l:={}c:=(l)condition1:=falsecondition2:=falsegofunc(){()for!condition1{()}("condition1=true,run1")()}()gofunc(){()for!condition2{()}("condition2=true,run2")()}()(*1)("signal-1")condition1=()(*1)("signal-2")condition2=()(*10)}结果:signal-1condition1=true,run1signal-2condition2=true,run2

使用Signal()唤醒协程时需要注意,在多个协程等待时,该函数并没有指定需要唤醒哪一个协程。下面程序的输出可能为“condition1=true,run1”也可能为“condition2=true,run2”。因此Signal一般适用于仅有一个协程等待的情况,否则可能造成混乱。

packagemainimport("fmt""sync")funcmain(){l:={}c:=(l)condition1:=falsecondition2:=falsegofunc(){()for!condition1{()}("condition1=true,run1")()}()gofunc(){()for!condition2{()}("condition2=true,run2")()}()(*1)condition1=truecondition2=()(*10)}

Broadcast()比较简单,即唤醒所有等待的协程

packagemainimport("fmt""sync")funcmain(){l:={}c:=(l)condition1:=falsecondition2:=falsegofunc(){()for!condition1{()}("condition1=true,run1")()}()gofunc(){()for!condition2{()}("condition2=true,run2")()}()(*1)condition1=truecondition2=()(*10)}结果:condition1=true,run1condition2=true,run2

,用于等待协程执行完成

有如下3个方法,Add(deltaint)入参表示需要等待的协程的个数,如2表示需要等待2个协程完成;Done()表示该协程结束;Wait()用于阻塞主协程,等待所有协程结束后释放。

func(wg*WaitGroup)Add(deltaint)func(wg*WaitGroup)Done()func(wg*WaitGroup)Wait()

举例如下,启动10个协程,Wait()会阻塞,直到所有的协程执行Done()。

ps:Add(deltaint)函数的入参很重要,入参大于实际需要等待的协程会导致主协程一致阻塞,小于需要等待的协程会导致某些协程提前退出

import("fmt""sync")funcmain(){wg:={}(10)fori:=0;i10;i++{gofunc(iint){()(i,"")}(i)}()}结果:9401236578

协程间使用chan进行同步

下例中使用chan实现主协程控制write,并使用write控制read。协程关闭使用close()函数

ps:使用chan进行协程同步一般将chan作为入参传入,或在函数内部实现协程间的同步。为方便验证,下面例子将所有chan作为全局变量

packagemainimport("fmt""sync")varspeakCh=make(chanstring)varstopReadChan=make(chanstruct{})varstopWriteChan=make(chanstruct{})funcreadChan(stopCh-chanstruct{}){for{select{casewords:=-speakCh:("received:",words)case-stopCh:("stopread!")return}}}funcwriteChan(stopCh-chanstruct{}){for{select{case-stopCh:("stopwrite!")close(stopReadChan)returndefault:}speakCh-"hi"(*2)}}funcmain(){goreadChan(stopReadChan)gowriteChan(stopWriteChan)(*6)close(stopWriteChan)(*6)}结果:received:hireceived:hireceived:histopwrite!stopread!

协程间使用context进行同步

context用于对协程进行管理,如主动退出协程,超时退出协程等,可以看作是使用chan管理协程的扩展。在使用时首先创建一个context,使用cancel()可以取消context,并使用Done()返回的chan管理协程。

官方推荐的用法如下:

funcStream(,outchan-Value)error{for{v,err:=DoSomething(ctx)iferr!=nil{returnerr}select{():()caseout-v:}}}

下例中使用创建一个context,使用cancel()给这一组context发送信号,在协程中使用Done()处理退出事件。

packagemainimport("fmt""context")funcmain(){ctx,cancel:=(())gotestCtx(ctx,"ctx1")gotestCtx(ctx,"ctx2")gotestCtx(ctx,"ctx3")(*3)cancel()(*5)}functestCtx(,namestring)error{for{select{():(":",name)()default:("default:",name)(*2)}}}结果:default:ctx1default:ctx3default:ctx2default:ctx3default:ctx1default::::ctx2

创建context的方式如下,其余三个可以看作是WithCancel的扩展

funcWithCancel(parentContext)(ctxContext,cancelCancelFunc)//需要主动取消contextfuncWithDeadline(parentContext,)(Context,CancelFunc)//在deadline时间点后取消contextfuncWithTimeout(parentContext,)(Context,CancelFunc)//在超时后取消contextfuncWithValue(parentContext,key,valinterface{})Context

再看一个WithTimeout的例子,下面设置context的超时时间为3s且没有主动cancel(),3s超时后可以看到该context对应的协程正常退出

funcmain(){ctx,_:=((),*3)gotestCtx(ctx,"ctx1")gotestCtx(ctx,"ctx2")gotestCtx(ctx,"ctx3")(*5)}结果:default:ctx3default:ctx1default:ctx2default:ctx3default:ctx1default::::ctx1

context可以看作是一个树,当cancel一个context时,会同时cancle它的子context。下面首先创建一个ctx,然后在此ctx下面创建一个subctx。当执行cancle()ctx时会同时cancel()该的subctx。

()就是已经实现的首个context。

funcmain(){ctx,cancel:=(())subctx,_:=(ctx)gotestCtx(ctx,"ctx1")gotestCtx(subctx,"subctx1")gotestCtx(subctx,"subctx2")(*3)canclel()(*10)}结果:default:subctx2default:ctx1default:subctx1default:subctx2default:ctx1default::::subctx2

下例中仅cancel()subctx,可以看到并没有影响subctx的parent。

funcmain(){ctx,_:=(())subctx,subcancel:=(ctx)gotestCtx(ctx,"ctx1")gotestCtx(subctx,"subctx1")gotestCtx(subctx,"subctx2")(*3)subcancel()(*10)}结果:default:subctx1default:subctx2default:ctx1default:ctx1default:subctx1default::subctx2default::subctx1default:ctx1default:ctx1default:ctx1default:ctx1

(/apimachinery/pkg/util/wait/)

client-go中的创造性地将与chan和ctx结合,实现了协程间同步和等待全部Group中的协程结束的功能。由于StartWithChannel和StartWithContext的入参函数类型比较固定,因此使用上并不通用,但可以作为参考,在实际中扩展使用。下例中给出了简单用法。

func(g*Group)Wait()func(g*Group)StartWithChannel(stopCh-chanstruct{},ffunc(stopCh-chanstruct{}))func(g*Group)StartWithContext(,ffunc())
funcmain(){f1:=func(){for{select{():returndefault:("hi11")()}}}wg:={}ctx,cancel:=(())(ctx,f1)(*3)cancel()()}结果:hihihi

定时器ticker定时器

首先看一下一般使用的定时器,client-go中比较复杂的定时器也是在此基础上封装的。下面例子中给出的是ticker定时器,它会按照一定的时间频率往中发类型的数据,可以在协程中通过判断来执行定时任务。下例来自官方,实现每秒执行一次打印,

import("fmt""time")funcmain(){ticker:=()()done:=make(chanbool)gofunc(){(10*)done-true}()for{select{case-done:("Done!")returncaset:=-:("Currenttime:",t)}}}结果:Currenttime:2019-07-0414:30:37.9088968+0800CSTm=+5.328291301Currenttime:2019-07-0414:30:38.9089349+0800CSTm=+6.328328801Currenttime:2019-07-0414:30:39.9101415+0800CSTm=+7.329534901Currenttime:2019-07-0414:30:40.9095174+0800CSTm=+8.328910201Currenttime:2019-07-0414:30:41.9092961+0800CSTm=+9.328688301Currenttime:2019-07-0414:30:42.9087682+0800CSTm=+10.328159801Currenttime:2019-07-0414:30:43.9088604+0800CSTm=+11.328251401Currenttime:2019-07-0414:30:44.909609+0800CSTm=+12.328999501Currenttime:2019-07-0414:30:45.9094782+0800CSTm=+13.328868101Currenttime:2019-07-0414:30:46.909006+0800CSTm=+14.328395401Done!

需要注意的是使用ticker并不能保证程序被精确性调度,如果程序的执行时间大于ticker的调度周期,那么程序的触发周期会发生偏差(可能由于系统cpu占用过高,网络延迟等原因)。如下面例子中,ticker触发周期为1s,但程序执行大于2s,此时会出现程序执行频率不一致的情况。适用于周期性触发一个任务。

funcmain(){ticker:=()()done:=make(chanbool)gofunc(){(10*)done-true}()for{select{case-done:("Done!")returncaset:=-:(*2)("Currenttime:",t)}}}结果:Currenttime:2019-07-0414:56:52.5446526+0800CSTm=+5.281916601Currenttime:2019-07-0414:56:53.5452488+0800CSTm=+6.282512201//和上一条相差1s,但和下一条相差2sCurrenttime:2019-07-0414:56:55.5443528+0800CSTm=+8.281615101Currenttime:2019-07-0414:56:57.5449183+0800CSTm=+10.282179401Currenttime:2019-07-0414:56:59.5448671+0800CSTm=+12.282127101Done!

timer定时器

timer的机制和ticker相同,在定时器超时后往一个chan中发送数据。不同的是ticker可以周期性调度,timer只会执行一次,如果需要重复调度,需要使用Reset函数重置timer。利用该机制,可以在同一个timer上以不同间隔调度程序。

funcmain(){timer:=()()t:=-("Currenttime:",t)(*2)t=-("Currenttime:",t)(*3)t=-("Currenttime:",t)}结果:Currenttime:2019-07-0415:47:01.7518201+0800CSTm=+5.312710501Currenttime:2019-07-0415:47:03.7766692+0800CSTm=+7.337558501Currenttime:2019-07-0415:47:06.7770913+0800CSTm=+10.337978901

使用timer需要注意Reset函数只能在timer超时后使用,否则将无效。因为的长度只有1,如果前面一个定时器结束前执行了Reset,那么前面的定时器会被取消。具体可以参见这里

funcNewTimer(dDuration)*Timer{c:=make(chanTime,1)}

下面例子中可以看出,多次执行Reset并不会多次触发定时任务,在前一个定时器超时前执行Reset,会取消前一个定时器并以Reset中的duration开始计时。

funcmain(){("nowtime:"())timer:=(*5)()(*2)(*2)(*2)gofunc(){for;;{select{caset:=-:("Currenttime:",t)}}}()(*10)}结果:nowtime:2019-07-0416:16:31.7246084+0800CSTm=+4.281414201Currenttime:2019-07-0416:16:33.7505395+0800CSTm=+6.307344201

官方推荐的用法如下,由于没有加锁,此方法不能在多个协程中同时使用。

if!(){-}(d)

funcAfterFunc(dDuration,ffunc())*Timer函数用于在d时间超时后,执行f函数。注意返回的timer需要手动stop

timer:=(*5,func(){("timeout")})(*6)()

更多timer的用法可以参见官方文档

wait实现(/apimachinery/pkg/util/wait/)

wait中实现了很多与定时相关的函数,首先来看第一组:

funcForever(ffunc(),)funcUntil(ffunc(),,stopCh-chanstruct{})funcUntilWithContext(,ffunc(),)funcNonSlidingUntil(ffunc(),,stopCh-chanstruct{})funcNonSlidingUntilWithContext(,ffunc(),)

Until函数每period会调度f函数,如果stopCh中有停止信号,则退出。当程序运行时间超过period时,也不会退出调度循环,该特性和Ticker相同。底层使用Timer实现。

Until和NonSlidingUntil为一对,UntilWithContext和NonSlidingUntilWithContext为一对,区别只是定时器启动时间点不同,可以简单用下图表示:

这两种(带“NonSliding”前缀的)函数在处理正常程序时没有什么区别,但在一些场景下会有不同的地方。下面例子中使用处理的程序中sleep了2s,这可以表示程序因为某种原因导致超出正常处理时间。此时可以看到结果中的“num1”和“num2”是同时调用的

funcmain(){first:=truenum:=0stopCh:=make(chanstruct{})gofunc(){(*10)close(stopCh)("done")}()(func(){iftrue==first{(*2)first=false}num=num+1("num:",num,"time",())},*1,stopCh)(*100)}结果:num:1time2019-07-0421:05:59.5298524+0800CSTm=+26.277103101num:2time2019-07-0421:05:59.554999+0800CSTm=+26.302249701num:3time2019-07-0421:06:00.5559679+0800CSTm=+27.303218601num:4time2019-07-0421:06:01.5566608+0800CSTm=+28.303911501

将上述程序的替换为,得到如下结果,可以看到首次(异常)和第二次(正常)的间隔正好是中设置的调度周期,即1s。

ps:大部分场景下两者使用上并没有什么不同,毕竟正常情况下程序运行时间必然小于程序调度周期。如果需要在程序处理延时的情况下尽快进行下一次调度,则选择带”NonSliding“前缀的函数

结果:num:1time2019-07-0421:09:14.9643889+0800CSTm=+2.010865201num:2time2019-07-0421:09:15.9935285+0800CSTm=+3.040004801num:3time2019-07-0421:09:16.9956846+0800CSTm=+4.042160901

funcForever(ffunc(),)

该函数比较简单,就是取消了用于控制Until停止的stopCh。以永远不停止的方式周期性执行f函数

funcExponentialBackoff(backoffBackoff,conditionConditionFunc)error

ExponentialBackoff可以实现在函数执行错误后实现以指数退避方式的延时重试。ExponentialBackoff内部使用的是

ExponentialBackoff的首个入参Backoff如下:

Duration:表示初始的延时时间

Factor:指数退避的因子

Jitter:可以看作是偏差因子,该值越大,每次重试的延时的可选区间越大

Steps:指数退避的步数,可以看作程序的最大重试次数

Cap:用于在Factor非0时限制最大延时时间和最大重试次数,为0表示不限制最大延时时间

typeBackoffstruct{////////////,initial//////Thereturneddurationwillneverbegreaterthancap*before*jitter//`cap*(1.0+jitter)`.}

第二个参数ConditionFunc表示运行的函数,返回的bool值表示该函数是否执行成功,如果执行成功则会退出指数退避

typeConditionFuncfunc()(donebool,errerror)

下面做几组测试:

=当Factor和Jitter都为0时,可以看到调度周期是相同的,即Duration的值(1s)。

import("fmt""/apimachinery/pkg/util/wait""time")funcmain(){varDefaultRetry={Steps:5,Duration:1*,Factor:0,Jitter:0,}((DefaultRetry,func()(bool,error){(())returnfalse,nil}))}结果:2019-07-0510:17:33.9610108+0800CSTm=+0.0798311012019-07-0510:17:34.961132+0800CSTm=+1.0799523012019-07-0510:17:35.961512+0800CSTm=+2.0803323012019-07-0510:17:36.9625144+0800CSTm=+3.08-07-0510:17:37.9636334+0800CSTm=+4.082453701timedoutwaitingforthecondition

=先看Jitter对duration的影响,Jitter(duration,)的计算方式如下,如果入参的Factor为0,而Jitter非0,则将Factor调整为1。()为[0.0,1.0)的伪随机数。

将Jitter调整为0.5,根据下面计算方式预期duration为[1s,1.5s)。运行程序得出如下结果,观察可以发现,duration大概是1.4s

ifmaxFactor=0.0{maxFactor=1.0}wait:=duration+(()*maxFactor*float64(duration))
varDefaultRetry={Steps:5,Duration:1*,Factor:0,Jitter:0.5,}结果:2019-07-0510:21:49.5993445+0800CSTm=+2.3826691012019-07-0510:21:50.9026701+0800CSTm=+3.6859947012019-07-0510:21:52.3759019+0800CSTm=+5.19-07-0510:21:53.7086265+0800CSTm=+6.4919510012019-07-0510:21:54.9283913+0800CSTm=+7.711715901timedoutwaitingforthecondition

=Factor非0且Jitter为0时,对duration的调整如下

!=0{=(float64()*){==0}}

从公式中可以得出,Factor对程序执行的延的影响如下,可以看到Factor为1时并没有什么作用

duration(1)=durationduration(2)=Factor*duration(1)duration(3)=Factor*duration(2)duration(n)=Factor*duration(n-1)

Factor为1时,可以看到函数执行间隔均为1s

varDefaultRetry={Steps:5,Duration:1*,Factor:1,Jitter:0,}结果:2019-07-0510:28:50.8481017+0800CSTm=+2.3639839012019-07-0510:28:51.8482274+0800CSTm=+3.3641096012019-07-0510:28:52.8482359+0800CSTm=+4.3641182012019-07-0510:28:53.848687+0800CSTm=+5.3645693012019-07-0510:28:54.849409+0800CSTm=+6.365291201timedoutwaitingforthecondition

调整Factor为3,预期延时时间为1s,3s,9s,27s,从测试结果看与预期相符

varDefaultRetry={Steps:5,Duration:1*,Factor:3,Jitter:0,}结果:2019-07-0510:35:06.9030165+0800CSTm=+0.0777461012019-07-0510:35:07.9038392+0800CSTm=+1.0785687012019-07-0510:35:10.9038733+0800CSTm=+4.0786029012019-07-0510:35:19.9042141+0800CSTm=+13.0789436012019-07-0510:35:46.904647+0800CSTm=+40.079376501timedoutwaitingforthecondition

=当Factor和Jitter非0时的延迟计算方式如下:

save_duration(0)=durationduration(1)=Jitter(save_duration(0),)save_duration(1)=Factor*save_duration(0)duration(2)=Jitter(save_duration(1),)save_duration(2)=Factor*save_duration(1)duration(3)=Jitter(save_duration(2),)save_duration=Factor*save_duration(2)duration(n)=Jitter(save_duration(n-1),)

设置Backoff参数如下,按照上述公式得出的期望延时为[1,1.1),[3,3.3),[9,9.9),[27,29.7)。实际运行如下,小数点一位后四舍五入得出实际延时为1.1,3.3,9.6,28.2,与预期相符。

varDefaultRetry={Steps:5,Duration:1*,Factor:3,Jitter:0.1,}结果:2019-07-0511:42:54.8779046+0800CSTm=+0.19-07-0511:42:55.9399737+0800CSTm=+1.1977829012019-07-0511:42:59.2240904+0800CSTm=+4.48-07-0511:43:08.8232438+0800CSTm=+14.0807305012019-07-0511:43:37.0058953+0800CSTm=+42.262752301timedoutwaitingforthecondition

=最后看下的影响。设置Cap为10s,预期会比上面不带Cap的少执行2次(不带Cap限制的在Step为0时还会执行一次)。实际执行上也是如此

varDefaultRetry={Steps:5,Duration:1*,Factor:3,Jitter:0.1,Cap:*10,}结果:2019-07-0512:02:43.8678742+0800CSTm=+0.1206739012019-07-0512:02:44.9294079+0800CSTm=+1.19-07-0512:02:48.2125558+0800CSTm=+4.465333301

ExponentialBackoff借鉴了TCP协议的指数退避算法,适用于可能会产生资源竞争的场景。指数退避可以有效地在没有缓存处理或缓存不足的场景下减小服务端的压力。

wait库的第二组

funcPoll(interval,,conditionConditionFunc)errorfuncPollImmediate(interval,,conditionConditionFunc)errorfuncPollInfinite(,conditionConditionFunc)errorfuncPollImmediateInfinite(,conditionConditionFunc)errorfuncPollUntil(,conditionConditionFunc,stopCh-chanstruct{})errorfuncPollImmediateUntil(,conditionConditionFunc,stopCh-chanstruct{})error

Poll表示以interval的周期执行condition函数,直到timeout超时或condition返回true/err非空。

和使用上还是有些类似的,区别在于一个使用timeout限制超时时间,一个使用chan提供主动停止调度。

import("fmt""/apimachinery/pkg/util/wait""time")funcmain(){(,*5,func()(donebool,errerror){(())returnfalse,nil})结果:2019-07-0513:43:31.2622405+0800CSTm=+1.0693249012019-07-0513:43:32.2619663+0800CSTm=+2.0690507012019-07-0513:43:33.2626114+0800CSTm=+3.0696958012019-07-0513:43:34.2626876+0800CSTm=+4.0697720012019-07-0513:43:35.2624168+0800CSTm=+5.0695012012019-07-0513:43:35.2624168+0800CSTm=+5.069501201

PollInfinite相比Poll取消了timeout的限制。

PollUntil相比Until来说,PollUntil在condition函数返回true或error的时候会退出调度。

heap堆(/client-go/tools/cache)

实现heap需要实现下面Interface接口,heap使用队列实现了一个完全二叉树

//{(xinterface{})//addxaselementLen()Pop()interface{}//removeandreturnelementLen()-1.}//{//()int//Lessreportswhethertheelementwith//(i,jint)bool//(i,jint)}

heap对外提供的方法为如下:

funcInit(hInterface)funcPush(hInterface,xinterface{})funcPop(hInterface)interface{}funcRemove(hInterface,iint)interface{}funcFix(hInterface,iint)//当修改完队列中的index=i的元素后,重新排序

例子如下:

import("container/heap""fmt")funcGetAllHeapItems(tHeap_t,namestring){items:=[]interface{}{}()!=0{items=app(items,(t))}(name,":",items)}typeHeap_t[]intfunc(hHeap_t)Len()int{returnlen(h)}func(hHeap_t)Less(i,jint)bool{returnh[i]h[j]}func(hHeap_t)Swap(i,jint){h[i],h[j]=h[j],h[i]}func(h*Heap_t)Push(xinterface{}){*h=app(*h,x.(int))}func(h*Heap_t)Pop()interface{}{()==0{returnnil}x:=(*h)[len(*h)-1]*h=(*h)[0:(len(*h)-1)]returnx}funcmain(){h:=Heap_t{4,2,6,80,100,45}//[1248804562356100](h)GetAllHeapItems(*h,"h")h1:=Heap_t{4,2,6,80,100,45}(h1)(3)GetAllHeapItems(*h1,"h1")h2:=Heap_t{4,2,6,80,100,45}(h2)GetAllHeapItems(*h2,"h2")h3:=Heap_t{4,2,6,80,100,45}(h3)(*h3)[2]=200(1111,h3)(h3,2)(2222,h3)GetAllHeapItems(*h3,"h3")}结果:h:[2464580100]h1:[23464580100]h2:[2464580100]1111[242008010045]2222[244580100200]h3:[244580100200]

heap的实现比较巧妙,使用队列实现了完全二叉树,比较适用于查询频繁的场景,原理解析可以参见这里

更多使用和例子参见官方文档

klog(/klog)实现执行日志打印

使用select{}实现主协程不退出

funcmain(){select{}}

可以使用switch对地址进行判断

packagemainimport("fmt")funcmain(){typeemptyCtxintbackground:=new(emptyCtx)todo:=new(emptyCtx)typeSwitch:=func(iinterface{}){switchi{casebackground:("background")casetodo:("todo")default:("default")}}typeSwitch(background)}结果:true

限流("/x/time/rate")

使用令牌桶实现限流,它共有3组对外方法,多数场景下使用Wait,用于等待令牌。更多解析可以参见这里

func(lim*Limiter)Wait()(errerror)func(lim*Limiter)WaitN(,nint)(errerror)
func(lim*Limiter)Allow()boolfunc(lim*Limiter)AllowN(,nint)bool
func(lim*Limiter)Reserve()*Reservationfunc(lim*Limiter)ReserveN(,nint)*Reservation

Limiter使用如下函数初始化,b为令牌桶大小,初始是满的;r为每秒往桶里面放token的速率

funcNewLimiter(rLimit,bint)*Limiter

下面是Limiter的简单用法,其最终耗费时间为5s。计算方式为:

需要处理20个事件由于桶一开始是满的,所以立即可以处理已有的10个token还剩下10个事件,此时桶已经空了,每秒往桶里面放token的速率为每秒2个,因此每秒可以处理2个事件,处理10个事件需要5秒,这就是5s的由来,即(20-b)/r

ps:NewLimiter的入参r可以大于b,但其实此时大于的部分并没有意义,受限于桶的大小,多余的token会被丢弃

import("context""fmt""/x/time/rate""time")funcmain(){l:=(2,10)ctx,cancel:=(())defercancel()f:=func()error{select{():()default:}//dosomethingreturnnil}start:=()fori:=0;i20;i++{err:=(ctx)ifnil!=err{(err)((start))return}gof(ctx)}((start))}结果:5.0000404s

下例中,如果每秒处理的令牌小于2,调度频率为实际执行频率(每秒一次)

funcmain(){l:=(2,10)ctx,cancel:=(())defercancel()f:=func()error{select{():()default:}(1*)returnnil}start:=()fori:=0;i20;i++{err:=(ctx)ifnil!=err{(err)((start))return}f(ctx)}((start))}结果:20.0107691s

WaitN用于判断是否可以同时执行n个事件,每次消耗n个令牌。如下例子的总时间算法为:(5*6-10)/2=10

import("context""fmt""/x/time/rate""time")funcmain(){l:=(2,10)ctx,cancel:=(())defercancel()f:=func()error{select{():()default:}returnnil}start:=()fori:=0;i6;i++{err:=(ctx,5)ifnil!=err{(err)((start))return}f(ctx)}((start))}结果:10.0011304s

读取yaml文件

假设yaml文件内容如下,yaml文件中的每个首字母必须小写。PS:yaml文件中的所有字符建议都小写

scrapeInterval:60points:-point::LTAI2KSu0MDauu2raccessKeySecret:D3m0j7vDmrAWf33SFUh3LJRF1QGgTuproject:avacar-slb-slslogstore:avacar-slsconsumerGroupName:CursorconsumerName:Consumer1cursorPosition:END_CURSOR

在代码中定义对应的解析结构体,结构体成员首字母大写,注意每个元素后面的yaml对应的字符串需要与yaml文件中的元素对应,大小写一致。

typeConfigstruct{ScrapeIntervalint32points[]points`yaml:"points"`}typepointsstruct{pointstring`yaml:"point"`AccessKeyIDstring`yaml:"accessKeyID"`AccessKeySecretstring`yaml:"accessKeySecret"`Projectstring`yaml:"project"`Logstorestring`yaml:"logstore"`ConsumerGroupNamestring`yaml:"consumerGroupName"`ConsumerNamestring`yaml:"consumerName"`CursorPositionstring`yaml:"cursorPosition"`}

使用如下方式即可将yaml文件的内容提取出来,即config

varconfigConfigconfigContent,err:=("D:\\")iferr!=nil{("openfilefailed")return}(configContent,config)

设计的目的是用来保存和复用临时对象,以减少内存分配,降低CG压力,在大量复用变量的场景下能显著提高运行效率

typeSstruct{numint}funcBenchmarkWithPool(b*){vars*Svarpool={New:func()interface{}{returnnew(S)},}fori:=0;;i++{forj:=0;j10000;j++{s=().(*S)=1++(s)}}}funcBenchmarkWithNoPool(b*){vars*Sfori:=0;;i++{forj:=0;j10000;j++{s=S{num:1}++}}}funcmain(){t1:=().Nanosecond()BenchmarkWithPool({N:10})t2:=().Nanosecond()-("t2=",t2)t3:=().Nanosecond()BenchmarkWithNoPool({N:10})t4:=().Nanosecond()-("t4=",t4)}}

结果:

t2=1992800t4=999000

从下面可以看出,put和get是按照顺序一对一的,如果get完,则调用New函数创建一个新的元素返回

//建立对象varpipe={New:func()interface{}{return"Hello,BeiJing"}}//放入("Hello,World1")("Hello,World2")("Hello,World3")//取出(())(())(())//再取就没有了,会自动调用(())

结果:

2019/12/0215:24:47Hello,World12019/12/0215:24:47Hello,World22019/12/0215:24:47Hello,World32019/12/0215:24:47Hello,BeiJing

参考:

作者:charlieroro

出处:

很赞哦!(122)