网站建设功能文档,网络平台推广引流,python如何做网页,出台网站集约化建设通知近段时间在做云原生AI算力平台#xff0c;之前提到使用k8s informer机制管控多渠道提交的训练任务。 上面第4点#xff1a; informer会监听通过cli和网页portal提交的job#xff0c; 回显到portal平台#xff0c;并在job发生状态变更时通知用户。 1. informer是实现声明式c…近段时间在做云原生AI算力平台之前提到使用k8s informer机制管控多渠道提交的训练任务。上面第4点informer会监听通过cli和网页portal提交的job 回显到portal平台并在job发生状态变更时通知用户。1. informer是实现声明式controller设计的核心k8s采用声明式设计 以结果为导向 实现这一关键能力的组件是k8s各种Controller定义某对象的期望状态实时监控并达成这个状态调谐Reconcile No signal was sent. No webhook fired。informer是k8s client-go库的一部分① 监听资源② 并本地缓存③ 通知上层应用发生了一些事件job创建、job pending、job运行、job完成/失败减少了apiserver的调用流量、优化性能、反应式自动化运维 我当前的需求有点类似于反应式自动化运维。2. informer核心使用流程运行一个完整的informer list --- watch --- cache--- react。① 从apiserver拉取指定的gvr资源, 形成首次资源快照② 持续监听资源的变更事件进deltaFIFO队列这里是通过HTTP/1.1 的Chunked Transfer Encoding分块传输编码来实现的③ 通过上述①②两步得到资源的最新状态并缓存注意缓存的是资源对象而不是资源变更事件 另外是线程安全的存储。④ 事件处理应用在业务层面的动作可以写日志可以做controller的Reconcile动作。开发者主要考量在reactEventHandler阶段其余能力client-go sdk会提供。apiserver--reflector(拉取/监听)--DeltaFiFO(队列)-- Process处理--Handler用户代码2.1 Watch机制 chunked transfer encoding分块传输能力是http1.1 常见功能不需要像websocket那样升级协议到帧格式http连接中每个事件是独立的直到连接关闭。请求spiserver时查询参数加上watchtrue, 会提示服务器本次是监听请求 响应核心特征是响应头包含Transfer-Encoding: chunked验证终端1kubectl proxy --port8081 在主机上8081端口代理aiserver服务 终端2 curl http://localhost:8081/api/v1/namespaces/team-a/pods?watchtrue --verbose在client-go sdk中会为informer watch建立稳定的长连接断线重连、重试等3. 一个常规的informer实践利用kubeconfig创建informer绑定gvr 启动informer带终止信道package main import ( fmt time v1 k8s.io/api/core/v1 k8s.io/apimachinery/pkg/fields k8s.io/client-go/kubernetes k8s.io/client-go/tools/cache k8s.io/client-go/tools/clientcmd ) func main() { config, _ : clientcmd.BuildConfigFromFlags(, /home/user/.kube/config) clientset, _ : kubernetes.NewForConfig(config) podInformer : cache.NewSharedIndexInformer( cache.NewListWatchFromClient( clientset.CoreV1().RESTClient(), pods, v1.NamespaceAll, fields.Everything(), ), v1.Pod{}, time.Minute*10, // resync 周期 cache.Indexers{ // cache上的快速过滤器 byNode: func(obj interface{}) ([]string, error) { pod : obj.(*v1.Pod) return []string{pod.Spec.NodeName}, nil }}, ) podInformer.AddEventHandler(cache.ResourceEventHandlerFuncs{ DeleteFunc: func(obj interface{}) { pod : obj.(*v1.Pod) fmt.Printf([DELETED] Pod: %s/%s\n, pod.Namespace, pod.Name) }, }) // 启动 informer (必须在独立goroutine中因为Run方法是同步方法) stopCh : make(chan struct{}) defer close(stopCh) go func() { fmt.Println(Starting PodInformer...) podInformer.Run(stopCh) // 同步方法会阻塞直到 stopCh 关闭 fmt.Println(PodInformer stopped) }() // 等待缓存同步就绪 if !cache.WaitForCacheSync(stopCh, podInformer.HasSynced) { panic(failed to wait for cache sync) } -stopCh }informer有resync机制 周期性重放数据目的是为业务提供补偿机会上面设置了10分钟重放周期, 0或者不设置则不重放。使用cache.NewListWatchFromClient设置了informer的local cache 开发者可以直接把local cache当成监听对象的集合,client-go会确保local cache正确反映当前的资源对象。informer.Run(stopCh)是一个同步的函数持续执行list-watch-cache-react这个引擎 在应用层面需要以子goroutine形式client-go另有informer工厂informerFactory.Start(stopCh) 内部也是启协程这里也要认识到信道stopCh在golang中的通信作用。为加快这个informer cache访问速度还可以给这个cache加上索引器Indexers, 后面可直接使用索引器访问cache。3.1 ☕ 依托答辩如果把业务需求都做在EventHandler里面长此以往会拉一坨大的。首先这是一个事件队列消费模型Add/Update/Delete变更事件是从一个叫deltaFIFO队列中pop出来的 既然是队列模型那么队列消费的高可用、高性能、可扩展问题就避免不了事件需要同步挨个处理否则控制器侧拿到的最终资源状态可能不对那么这种挨个处理也就谈不上高性能。但是应用又是多实例部署 多个informer都走同样的list-watch-cache-react流程 客观上围绕informer deltaFIIO又形成多生产者多消费者模型这种局面EventHandler就要考量幂等和资源一致性问题。队列常规的高可用考量① 消费者宕机时事件丢失 ② 消费失败如何重试重试又有幂等性问题informer有resync机制会对local cache中的资源构造onUpdate事件也会走EventHandler, 所以EventHandler做的很重会很麻烦。如果业务逻辑很重或者强依赖重试推荐上[workQueue](https://pkg.go.dev/k8s.io/client-gov0.35.0/util/workqueue), 支持以下功能公平按添加顺序处理元素元素去重单个item不会被并发消费多次如果一个item在消费前被多次添加它只会被消费一次多个消费者和生产者 支持消费时重排关闭通知4. controller 声明式实现controller的架构除了informer还提供了额外的工具帮助开发者高效感知最新的资源执行调谐工作。packageroleinformerEyes(watch and cache them)listerMemory(read from lcoal cache)workQueueTask list (reconcile work)type PodController struct { clientset kubernetes.Interface queue workqueue.TypedRateLimitingInterface[string] informer cache.SharedIndexInformer indexer cache.Indexer } func NewPodController(clientset kubernetes.Interface) *PodController { // 创建 Pod informer podInformer : cache.NewSharedIndexInformer( cache.NewListWatchFromClient( clientset.CoreV1().RESTClient(), pods, v1.NamespaceAll, fields.Everything(), ), v1.Pod{}, time.Minute*10, // resync 周期 cache.Indexers{ byNode: func(obj interface{}) ([]string, error) { pod : obj.(*v1.Pod) return []string{pod.Spec.NodeName}, nil }}, ) // 创建控制器 controller : PodController{ clientset: clientset, queue: workqueue.NewTypedRateLimitingQueue( workqueue.DefaultTypedControllerRateLimiter[string]( string), ), informer: podInformer, indexer: podInformer.GetIndexer(), } // 注册事件处理器 podInformer.AddEventHandler(cache.ResourceEventHandlerFuncs{ AddFunc: controller.handleAdd, UpdateFunc: controller.handleUpdate, DeleteFunc: controller.handleDelete, }) return controller }① Informers: The Eyes on the Clusterinformer是一个管道当发生变化时该informer管道可确保本地缓存local cache已更新任何已注册的事件处理程序(add/update/delete)都会收到通知informer不会直接调用控制器的Reconcile方法事件处理程序的唯一工作是将key (namespace/name)入队workQueue为什么使用key资源对象变化很快→键更稳定→更适合数据去重。func (c *PodController) handleAdd(obj interface{}) { key, err : cache.MetaNamespaceKeyFunc(obj) if err ! nil { klog.Errorf(Couldnt get key for object %v: %v, obj, err) return } c.queue.Add(key) }② key一旦入队就由workQueue接管去重重试、出队消费失败重排 队列行为增强系统可用性限速 弹性设计增强可用性和效率③ lister其实就是Indexers索引器使用lister从local cache 读取最新的资源状态不需要从api server读取最终执行控制器的Reconcile逻辑。通过workQueue将架构从基于资源事件的队列转换为基于资源的队列 。注意这个时候的workQueue有事件压缩的效果 在被消费之前如果该资源有多个变更事件只会保留首次入队更新时间戳。这和适合控制器的声明式设计Controller不关心对象如何到达当前状态只关心当前状态与期望状态是否一致并做动作使其一致。func (c *PodController) Run(workers int, stopCh -chan struct{}) { defer c.queue.ShutDown() klog.Info(Starting Pod controller) go c.informer.Run(stopCh) if !cache.WaitForCacheSync(stopCh, c.informer.HasSynced) { klog.Error(Timed out waiting for caches to sync) return } for i : 0; i workers; i { go wait.Until(c.runWorker, time.Second, stopCh) } -stopCh klog.Info(Stopping Pod controller) } func (c *PodController) runWorker() { for c.processNextWorkItem() { } } func (c *PodController) processNextWorkItem() bool { key, quit : c.queue.Get() if quit { return false } defer c.queue.Done(key) err : c.syncPod(key) if err ! nil { klog.Errorf(Error syncing pod %s: %v, key, err) c.queue.AddRateLimited(key) return true } c.queue.Forget(key) return true } func main() { // 创建 clientset // config, _ : rest.InClusterConfig() homepath : homedir.HomeDir() kubeconfig : filepath.Join(homepath, .kube, config) config, err : clientcmd.BuildConfigFromFlags(, kubeconfig) // 集群外认证访问 apiserver if err ! nil { klog.Errorf(Error building kubeconfig: %v, err) } clientset, _ : kubernetes.NewForConfig(config) controller : NewPodController(clientset) stopCh : make(chan struct{}) controller.Run(3, stopCh) }4.1 屎上雕花上文产生的一坨大的是一个重度的事件处理行为sync到mysql并做出通知。咱就缺一个队列可以使用workQueue 但是本次需求要跟踪每次状态变更于是要规避workQueue的事件压缩效果。于是本次将(原资源key 资源版本 资源状态)整体作为入队元素。item : QueueItem{ Key: fmt.Sprintf(%s/%s, pod.Namespace, pod.Name), Version: pod.ResourceVersion, Status pod.Status }利用队列削峰填谷满足了业务的需求和高可用、可扩展要求。注意出队消费时仍要保证幂等操作 可采用资源key资源version) 作为幂等键实现幂等的判定。这就是某企业项目屎上雕花的经历 大家尽管喷 。 ️ ☕ ️新来的外包限流算法用的这么6新来的外包在大群分享了它的限流算法的实现大意了大家都会网关上的限流器两张大图一次性讲透k8s调度器工作原理Go动态感知资源变更的常规套路你指定用过Golang的文本模板你指定没用过幂等的双份请求双倍快乐本文文字原创搁笔常恐意味尽愿闻读者金玉声 ”永久更新“地址见原文。点“赞”戳“在看”