91超碰碰碰碰久久久久久综合_超碰av人澡人澡人澡人澡人掠_国产黄大片在线观看画质优化_txt小说免费全本

溫馨提示×

溫馨提示×

您好,登錄后才能下訂單哦!

密碼登錄×
登錄注冊×
其他方式登錄
點擊 登錄注冊 即表示同意《億速云用戶服務條款》

基于redis樂觀鎖怎么實現并發排隊

發布時間:2022-12-27 16:26:29 來源:億速云 閱讀:135 作者:iii 欄目:開發技術

這篇“基于redis樂觀鎖怎么實現并發排隊”文章的知識點大部分人都不太理解,所以小編給大家總結了以下內容,內容詳細,步驟清晰,具有一定的借鑒價值,希望大家閱讀完這篇文章能有所收獲,下面我們一起來看看這篇“基于redis樂觀鎖怎么實現并發排隊”文章吧。

有個需求場景是這樣的,使用redis控制scrapy運行的數量。當系統的后臺設置為4時,只允許scapry啟動4個任務,多余的任務則進行排隊。

概況

最近做了一個django + scrapy + celery + redis 的爬蟲系統,客戶購買的主機除了跑其他程序外,還要跑我開發的這套程序,所以需要手動控制scrapy的實例數量,避免過多的爬蟲給系統造成負擔。

流程設計

1、爬蟲任務由用戶以請求的方式發起,所有的用戶的請求統一進入到celery進行排隊;
2、任務數量控制的執行就交給reids,經由celery保存到redis,包含了爬蟲啟動所需要的必要信息,從redis取一條信息即可啟動一個爬蟲;
3、通過scrapyd的接口來獲取當前在運行的爬蟲數量,以便決定下一步流程:如果小于4,則從redis中取相應數量的信息來啟動爬蟲,如果大于等于4,則繼續等待;
4、如果在運行爬蟲的數量有所減少,則及時從reids中取相應數量的信息來啟動爬蟲。

代碼實現

業務代碼有點復雜和啰嗦,此處使用偽代碼來演示

import redis

# 實例化一個redis連接池
pool = redis.ConnectionPool(host='127.0.0.1', port=6379, decode_responses=True, db=4, password='')

r = redis.Redis(connection_pool=pool)
# 爬蟲實例限制為4 即只允許4個scrapy實例在運行
limited = 4

# 聲明redis的樂觀鎖
lock = r.Lock()

# lock.acquire中有while循環,即它會線程阻塞,直到當前線程獲得redis的lock,才會繼續往下執行代碼
if lock.acquire():
	# 1、從reids中取一條爬蟲信息
	info = redis.get() 
	
	# 2、while循環監聽爬蟲運行的數量
	while True:
		req = requests.get('http://127.0.0.1:6800/daemonstatus.json').json()
		# 統計當前有多少個爬蟲在運行
		running = req.get('running') + req.get('pending')
		
		# 3、判斷是否等待還是要增加爬蟲數量
		# 3.1 如果在運行的數量大于等于設置到量 則繼續等待
		if running >= limited:
			continue
		
		# 3.2 如果小于 則啟動爬蟲
		start_scrapy(info)
		# 3.3 將info從redis中刪除
		redis.delete(info)
		# 3.4 釋放鎖
		lock.release()
		break		

當前,這只是偽代碼而已,實際的業務邏輯可能是非常復雜的,如:

@shared_task
def scrapy_control(key_uuid):

    r = redis.Redis(connection_pool=pool)
    db = MysqlDB()
    speed_limited = db.fetch_config('REPTILE_SPEED')
    speed_limited = int(speed_limited[0])

    keywords_num = MysqlDB().fetch_config('SEARCH_RANDOM')
    keywords_num = int(keywords_num[0])


    # while True:
    lock = r.lock('lock')
    with open('log/celery/info.log', 'a') as f: f.write(str(datetime.datetime.now()) + '--' + str(key_uuid) + ' 進入處理環節' +  '\n')
    try:
        # acquire默認阻塞 如果獲取不到鎖時 會一直阻塞在這個函數的while循環中
        if lock.acquire():
            with open('log/celery/info.log', 'a') as f: f.write(str(datetime.datetime.now()) + '--' + str(key_uuid) + ' 獲得鎖' +  '\n')
            # 1 從redis中獲取信息
            redis_obj = json.loads(r.get(key_uuid))
            user_id = redis_obj.get('user_id')
            contents = redis_obj.get('contents')
            
            # 2 使用while循環處理核心邏輯          
            is_hold_print = True
            while True:
                req = requests.get('http://127.0.0.1:6800/daemonstatus.json').json()
                running = req.get('running') + req.get('pending')
                # 3 如果仍然有足夠的爬蟲在運行 則hold住redis鎖,等待有空余的爬蟲位置讓出
                if running >= speed_limited:
                    if is_hold_print:
                        with open('log/celery/info.log', 'a') as f: f.write(str(datetime.datetime.now()) + '--' + str(key_uuid) + ' 爬蟲在運行,線程等待中' +  '\n')
                        is_hold_print = False
                    time.sleep(1)
                    continue
                
                # 4 有空余的爬蟲位置 則往下走
                # 4.1 處理完所有的內容后 釋放鎖
                if len(contents) == 0:
                    r.delete(key_uuid)
                    with open('log/celery/info.log', 'a') as f: f.write(str(datetime.datetime.now()) + '--' + str(key_uuid) + ' 任務已完成,從redis中刪除' +  '\n')
                    lock.release()
                    with open('log/celery/info.log', 'a') as f: f.write(str(datetime.datetime.now()) + '--' + str(key_uuid) + ' 釋放鎖' +  '\n')
                    break

                # 4.2 創建task任務
                task_uuid = str(uuid.uuid4())
                article_obj = contents.pop()
                article_id = article_obj.get('article_id')
                article = article_obj.get('content')
                try:
                    Task.objects.create(
                        task_uuid = task_uuid,
                        user_id = user_id,
                        article_id = article_id,
                        content = article
                    )
                except Exception as e:
                    with open('log/celery/error.log', 'a') as f: f.write(str(datetime.datetime.now()) + '--' + str(key_uuid) + '->' + str(task_uuid) + ' 創建Task出錯: ' + str(e) +  '\n')
                # finally:
                # 4.3 啟動爬蟲任務 即便創建task失敗也會啟動
                try:
                    task_chain(user_id, article, task_uuid, keywords_num)
                except Exception as e:
                    with open('log/celery/error.log', 'a') as f: f.write(str(datetime.datetime.now()) + '--' + str(key_uuid) + ' 啟動任務鏈失敗: ' + str(e) +  '\n')
                
                # 加入sleep 防止代碼執行速度快于爬蟲啟動速度而導致當前線程啟動額外的爬蟲
                time.sleep(5)

    except Exception as e:
        with open('log/celery/error.log', 'a') as f: f.write(str(datetime.datetime.now()) + '--' + str(key_uuid) + ' 獲得鎖之后的操作出錯: ' + str(e) +  '\n')
        lock.release()

小坑
scrapy啟動速度相對較慢,所以while循環中,代碼中執行到了爬蟲的啟動,需要sleep一下再去通過scrapyd接口獲取爬蟲運行的數量,如果立刻讀取,可能會造成誤判。

以上就是關于“基于redis樂觀鎖怎么實現并發排隊”這篇文章的內容,相信大家都有了一定的了解,希望小編分享的內容對大家有幫助,若想了解更多相關的知識內容,請關注億速云行業資訊頻道。

向AI問一下細節

免責聲明:本站發布的內容(圖片、視頻和文字)以原創、轉載和分享為主,文章觀點不代表本網站立場,如果涉及侵權請聯系站長郵箱:is@yisu.com進行舉報,并提供相關證據,一經查實,將立刻刪除涉嫌侵權內容。

AI

扶余县| 金阳县| 张家港市| 黑河市| 满洲里市| 中西区| 大方县| 阳江市| 四会市| 泗阳县| 南澳县| 南开区| 清远市| 黄大仙区| 鹰潭市| 金平| 五河县| 阳山县| 策勒县| 呼和浩特市| 临安市| 泰顺县| 荆州市| 平江县| 阳信县| 芦溪县| 鄂伦春自治旗| 宜宾市| 金塔县| 香格里拉县| 平乡县| 且末县| 新宁县| 和顺县| 武威市| 昌乐县| 建阳市| 朝阳区| 莒南县| 武乡县| 茌平县|