您好,欢迎来到锐游网。
搜索
您的当前位置:首页 > golang实现任务调度器dispatcher

golang实现任务调度器dispatcher

来源:锐游网
package dispatcher

// Job is the unit of work handled by the dispatcher: any value that has a
// Do method returning an error can be queued with Dispatcher.Dispatch.
type Job interface {
	// Do is called by a worker goroutine once the job has been handed to it.
	Do() error
}

// Package-level state shared by the dispatcher and its workers.
var (
	// JobQueue is the channel jobs are pushed onto by Dispatch and drained
	// from by the goroutine started in Dispatcher.Run. New (re)creates it
	// with the requested capacity.
	JobQueue chan Job

	// DispatcherInstance is the dispatcher most recently built by New.
	// Workers post their ids to its idleWorker channel.
	DispatcherInstance *Dispatcher
)

// worker holds the state of one job-processing goroutine.
//
//   - workerId is the worker's index in Dispatcher.workerList and the value
//     it posts to the idleWorker channel when it is ready for a job.
//   - workerQueue is the channel on which the dispatcher hands this worker
//     its next job (Run creates it unbuffered).
//   - quit receives a value when the worker should stop (Run creates it
//     unbuffered).
type worker struct {
	workerId    int
	workerQueue chan Job
	quit        chan bool
}

// Start launches the worker's processing loop in a new goroutine.
//
// Each pass of the loop first posts the worker's id to
// DispatcherInstance.idleWorker to advertise that it is free, then waits for
// whichever arrives first: a job on workerQueue, which is executed, or a
// value on quit, which ends the goroutine. The error returned by Job.Do is
// discarded.
func (w *worker) Start() {
	loop := func() {
		for {
			DispatcherInstance.idleWorker <- w.workerId

			select {
			case <-w.quit:
				return
			case job := <-w.workerQueue:
				_ = job.Do()
			}
		}
	}
	go loop()
}
// Stop asks the worker goroutine to exit without blocking the caller.
//
// The quit value is sent from a helper goroutine, which stays parked on the
// send until the worker loop receives it.
func (w worker) Stop() {
	quit := w.quit
	go func() { quit <- true }()
}

// Dispatcher forwards jobs taken from JobQueue to a fixed set of workers.
//
//   - Name is a label for the dispatcher; New sets it to "Dispatcher001".
//   - maxWorkers is the number of workers Run creates.
//   - idleWorker carries the ids of workers that are waiting for a job; New
//     gives it a buffer of maxWorkers entries.
//   - workerList holds the workers, indexed by workerId; Run populates it.
type Dispatcher struct {
	Name       string
	maxWorkers int
	idleWorker chan int
	workerList []worker
}

// New builds a Dispatcher for maxWorkerNum workers and returns it.
//
// As a side effect it replaces the package-level JobQueue with a fresh
// channel of capacity maxQueueNum and stores the new dispatcher in
// DispatcherInstance. No workers are started here; call Run for that.
func New(maxWorkerNum, maxQueueNum int) *Dispatcher {
	JobQueue = make(chan Job, maxQueueNum)

	d := new(Dispatcher)
	d.Name = "Dispatcher001"
	d.maxWorkers = maxWorkerNum
	// One slot per worker, so a worker announcing itself never blocks.
	d.idleWorker = make(chan int, maxWorkerNum)

	DispatcherInstance = d
	return d
}

// Run creates maxWorkers workers, starts each of them, and then starts a
// goroutine that moves jobs from JobQueue to whichever worker reports idle.
//
// Run returns immediately; dispatching continues in the background until
// JobQueue is closed and drained.
func (d *Dispatcher) Run() {
	d.workerList = make([]worker, d.maxWorkers)
	for i := range d.workerList {
		d.workerList[i] = worker{
			workerId:    i,
			workerQueue: make(chan Job),
			quit:        make(chan bool),
		}
		d.workerList[i].Start()
	}

	go func() {
		// Ranging over the channel (instead of a single-case select inside an
		// endless for) lets this goroutine finish when JobQueue is closed.
		// The previous form kept receiving nil jobs from a closed channel and
		// forwarded them to workers, which would panic calling Do on nil.
		for job := range JobQueue {
			// Block until some worker has announced it is free, then hand
			// the job to that worker's private queue.
			id := <-d.idleWorker
			d.workerList[id].workerQueue <- job
		}
	}()
}

// Dispatch enqueues job on the package-level JobQueue.
//
// The send blocks while the queue's buffer is full, so the caller waits until
// the dispatch goroutine started by Run has taken a job off the queue.
func (d *Dispatcher) Dispatch(job Job) {
	JobQueue <- job
}

// Close asks every worker in workerList to stop.
//
// Each Stop call only schedules the quit signal, so Close returns without
// waiting for the workers to exit. It does not close JobQueue.
func (d *Dispatcher) Close() {
	for i := range d.workerList {
		d.workerList[i].Stop()
	}
}

使用

package dispatcher

import (
	"log"
	"strconv"
	"time"
)

// Run is a usage example: it builds a dispatcher with 10 workers and a queue
// of 100 slots, submits 100 Dog jobs named "dog1" through "dog100", and then
// sleeps for five seconds to give the workers time to process them.
func Run() {
	d := New(10, 100)
	d.Run()

	for n := 0; n < 100; n++ {
		name := "dog" + strconv.Itoa(n+1)
		d.Dispatch(Dog{Name: name})
	}

	time.Sleep(5 * time.Second)
}

// Dog is a sample job; its Do method has the signature required by Job.
type Dog struct {
	// Name is included in the log line written by Do.
	Name string
}

// Do writes "Dog Eat" followed by the dog's name to the standard logger.
// It always returns nil.
func (dog Dog) Do() error {
	log.Println("Dog Eat", dog.Name)
	return nil
}

因篇幅问题不能全部显示,请点此查看更多更全内容

Copyright © 2019- ryyc.cn 版权所有

违法及侵权请联系:TEL:199 1889 7713 E-MAIL:2724546146@qq.com

本站由北京市万商天勤律师事务所王兴未律师提供法律服务