91超碰碰碰碰久久久久久综合_超碰av人澡人澡人澡人澡人掠_国产黄大片在线观看画质优化_txt小说免费全本

溫馨提示×

溫馨提示×

您好,登錄后才能下訂單哦!

密碼登錄×
登錄注冊×
其他方式登錄
點擊 登錄注冊 即表示同意《億速云用戶服務條款》

machinery中TaskProcessor的用法

發布時間:2021-07-12 15:42:36 來源:億速云 閱讀:203 作者:chen 欄目:編程語言

這篇文章主要介紹“machinery中TaskProcessor的用法”,在日常操作中,相信很多人在machinery中TaskProcessor的用法問題上存在疑惑,小編查閱了各式資料,整理出簡單好用的操作方法,希望對大家解答”machinery中TaskProcessor的用法”的疑惑有所幫助!接下來,請跟著小編一起來學習吧!

本文主要研究一下machinery的TaskProcessor

TaskProcessor

// TaskProcessor - can process a delivered task
// This will probably always be a worker instance
type TaskProcessor interface {
	Process(signature *tasks.Signature) error
	CustomQueue() string
	PreConsumeHandler() bool
}

TaskProcessor接口定義了Process、CustomQueue、PreConsumeHandler方法

Worker

// Worker represents a single worker process
type Worker struct {
	server            *Server
	ConsumerTag       string
	Concurrency       int
	Queue             string
	errorHandler      func(err error)
	preTaskHandler    func(*tasks.Signature)
	postTaskHandler   func(*tasks.Signature)
	preConsumeHandler func(*Worker) bool
}

// CustomQueue returns Custom Queue of the running worker process
func (worker *Worker) CustomQueue() string {
	return worker.Queue
}

// Process handles received tasks and triggers success/error callbacks
func (worker *Worker) Process(signature *tasks.Signature) error {
	// If the task is not registered with this worker, do not continue
	// but only return nil as we do not want to restart the worker process
	if !worker.server.IsTaskRegistered(signature.Name) {
		return nil
	}

	taskFunc, err := worker.server.GetRegisteredTask(signature.Name)
	if err != nil {
		return nil
	}

	// Update task state to RECEIVED
	if err = worker.server.GetBackend().SetStateReceived(signature); err != nil {
		return fmt.Errorf("Set state to 'received' for task %s returned error: %s", signature.UUID, err)
	}

	// Prepare task for processing
	task, err := tasks.NewWithSignature(taskFunc, signature)
	// if this failed, it means the task is malformed, probably has invalid
	// signature, go directly to task failed without checking whether to retry
	if err != nil {
		worker.taskFailed(signature, err)
		return err
	}

	// try to extract trace span from headers and add it to the function context
	// so it can be used inside the function if it has context.Context as the first
	// argument. Start a new span if it isn't found.
	taskSpan := tracing.StartSpanFromHeaders(signature.Headers, signature.Name)
	tracing.AnnotateSpanWithSignatureInfo(taskSpan, signature)
	task.Context = opentracing.ContextWithSpan(task.Context, taskSpan)

	// Update task state to STARTED
	if err = worker.server.GetBackend().SetStateStarted(signature); err != nil {
		return fmt.Errorf("Set state to 'started' for task %s returned error: %s", signature.UUID, err)
	}

	//Run handler before the task is called
	if worker.preTaskHandler != nil {
		worker.preTaskHandler(signature)
	}

	//Defer run handler for the end of the task
	if worker.postTaskHandler != nil {
		defer worker.postTaskHandler(signature)
	}

	// Call the task
	results, err := task.Call()
	if err != nil {
		// If a tasks.ErrRetryTaskLater was returned from the task,
		// retry the task after specified duration
		retriableErr, ok := interface{}(err).(tasks.ErrRetryTaskLater)
		if ok {
			return worker.retryTaskIn(signature, retriableErr.RetryIn())
		}

		// Otherwise, execute default retry logic based on signature.RetryCount
		// and signature.RetryTimeout values
		if signature.RetryCount > 0 {
			return worker.taskRetry(signature)
		}

		return worker.taskFailed(signature, err)
	}

	return worker.taskSucceeded(signature, results)
}

//SetPreConsumeHandler sets a custom handler for the end of a job
func (worker *Worker) SetPreConsumeHandler(handler func(*Worker) bool) {
	worker.preConsumeHandler = handler
}

Worker實現了TaskProcessor接口,其Process方法先通過worker.server.GetRegisteredTask獲取taskFunc,然后通過signature更新state為RECEIVED,之后設置為STARTED,之后執行task.Call(),最后根據結果更新task為failed或者success

小結

machinery的TaskProcessor接口定義了Process、CustomQueue、PreConsumeHandler方法。Worker實現了TaskProcessor接口,其Process方法先通過worker.server.GetRegisteredTask獲取taskFunc,然后通過signature更新state為RECEIVED,之后設置為STARTED,之后執行task.Call(),最后根據結果更新task為failed或者success。

到此,關于“machinery中TaskProcessor的用法”的學習就結束了,希望能夠解決大家的疑惑。理論與實踐的搭配能更好的幫助大家學習,快去試試吧!若想繼續學習更多相關知識,請繼續關注億速云網站,小編會繼續努力為大家帶來更多實用的文章!

向AI問一下細節

免責聲明:本站發布的內容(圖片、視頻和文字)以原創、轉載和分享為主,文章觀點不代表本網站立場,如果涉及侵權請聯系站長郵箱:is@yisu.com進行舉報,并提供相關證據,一經查實,將立刻刪除涉嫌侵權內容。

AI

盐城市| 襄垣县| 黔江区| 张北县| 城固县| 岢岚县| 远安县| 平果县| 府谷县| 巧家县| 宜州市| 沈丘县| 交口县| 河曲县| 宽城| 个旧市| 龙山县| 稻城县| 德州市| 肥东县| 武宣县| 育儿| 龙泉市| 宁强县| 常熟市| 佛冈县| 社会| 龙海市| 紫金县| 祁连县| 滁州市| 兴化市| 吴旗县| 平顶山市| 巴林左旗| 岗巴县| 乌兰浩特市| 大足县| 珠海市| 犍为县| 岑溪市|