91超碰碰碰碰久久久久久综合_超碰av人澡人澡人澡人澡人掠_国产黄大片在线观看画质优化_txt小说免费全本

溫馨提示×

溫馨提示×

您好,登錄后才能下訂單哦!

密碼登錄×
登錄注冊×
其他方式登錄
點擊 登錄注冊 即表示同意《億速云用戶服務條款》

使用celery怎么實現集群管理

發布時間:2021-06-24 17:41:51 來源:億速云 閱讀:413 作者:Leah 欄目:云計算

本篇文章給大家分享的是有關使用celery怎么實現集群管理,小編覺得挺實用的,因此分享給大家學習,希望大家閱讀完這篇文章后可以有所收獲,話不多說,跟著小編一起來看看吧。

架構:

使用celery怎么實現集群管理

這里作為例子的celery app為myapp:

root@workgroup0:~/celeryapp# ls myapp
agent.py   celery.py   config.py   __init__.py
root@workgroup0:~/celeryapp#

公用代碼部分:

celery.py:(備注:172.16.77.175是任務發布節點的ip地址)

from __future__ import absolute_import
from celery import Celery
app = Celery('myapp',
             broker='amqp://guest@172.16.77.175//',
             backend='amqp://guest@172.16.77.175//',
             include=['myapp.agent'])

app.config_from_object('myapp.config')

if __name__ == '__main__':
  app.start()

config.py:

from __future__ import absolute_import
from kombu import Queue,Exchange
from datetime import timedelta

CELERY_TASK_RESULT_EXPIRES=3600
CELERY_TASK_SERIALIZER='json'
CELERY_ACCEPT_CONTENT=['json']
CELERY_RESULT_SERIALIZER='json'

CELERY_DEFAULT_EXCHANGE = 'agent'
CELERY_DEFAULT_EXCHANGE_TYPE = 'direct'

CELERT_QUEUES =  (
  Queue('machine1',exchange='agent',routing_key='machine1'),
  Queue('machine2',exchange='agent',routing_key='machine2'),
)

__init__.py:(空白)

任務發布節點的agent.py:

from __future__ import absolute_import
from myapp.celery import app

@app.task
def add(x,y):
    return {'the value is ':str(x+y)}

@app.task
def writefile():
    out=open('/tmp/data.txt','w')
    out.write('hello'+'\n')
    out.close()

@app.task
def mul(x,y):
    return x*y

@app.task
def xsum(numbers):
    return sum(numbers)

@app.task
def getl(stri):
    return getlength(stri)


def getlength(stri):
    return len(stri)

docker1上的agent.py:

from __future__ import absolute_import
from myapp.celery import app

@app.task
def add(x,y):
    return {'value':str(x+y),'node_name':'docker1'}                   #增加了node_name用來識別節點

@app.task
def writefile():
    out=open('/tmp/data.txt','w')
    out.write('hello'+'\n')
    out.close()

@app.task
def mul(x,y):
    return x*y

@app.task
def xsum(numbers):
    return sum(numbers)

@app.task
def getl(stri):
    return getlength(stri)


def getlength(stri):
    return len(stri)

docker2上的:

from __future__ import absolute_import
from myapp.celery import app

@app.task
def add(x,y):
    return {'value':str(x+y),'node_name':'docker2'}

@app.task
def writefile():
    out=open('/tmp/data.txt','w')
    out.write('hello'+'\n')
    out.close()

@app.task
def mul(x,y):
    return x*y

@app.task
def xsum(numbers):
    return sum(numbers)

@app.task
def getl(stri):
    return getlength(stri)


def getlength(stri):
    return len(stri)

在這個例子中我只測試add()函數:

在docker1節點上啟動worker:(用-Q指定監聽的queue)

root@workgroup1:~/celeryapp# celery -A myapp worker -l info -Q machine1
/usr/local/lib/python2.7/dist-packages/celery/platforms.py:766: RuntimeWarning: You are running the worker with superuser privileges, which is
absolutely not recommended!

Please specify a different user using the -u option.

User information: uid=0 euid=0 gid=0 egid=0

  uid=uid, euid=euid, gid=gid, egid=egid,
 
 -------------- celery@workgroup1.hzg.com v3.1.17 (Cipater)
---- **** ----- 
--- * ***  * -- Linux-3.13.0-24-generic-x86_64-with-Ubuntu-14.04-trusty
-- * - **** --- 
- ** ---------- [config]
- ** ---------- .> app:         myapp:0x7f472d73f190
- ** ---------- .> transport:   amqp://guest:**@172.16.77.175:5672//
- ** ---------- .> results:     amqp://guest@172.16.77.175//
- *** --- * --- .> concurrency: 1 (prefork)
-- ******* ---- 
--- ***** ----- [queues]
 -------------- .> machine1         exchange=machine1(direct) key=machine1
                

[tasks]
  . myapp.agent.add
  . myapp.agent.getl
  . myapp.agent.mul
  . myapp.agent.writefile
  . myapp.agent.xsum

[2015-10-18 15:07:51,313: INFO/MainProcess] Connected to amqp://guest:**@172.16.77.175:5672//
[2015-10-18 15:07:51,340: INFO/MainProcess] mingle: searching for neighbors
[2015-10-18 15:07:52,372: INFO/MainProcess] mingle: sync with 1 nodes
[2015-10-18 15:07:52,374: INFO/MainProcess] mingle: sync complete
[2015-10-18 15:07:52,423: WARNING/MainProcess] celery@workgroup1.hzg.com ready.

啟動docker2上的worker:

root@workgroup2:~/celeryapp# celery -A myapp worker -l info -Q machine2
/usr/local/lib/python2.7/dist-packages/celery/platforms.py:766: RuntimeWarning: You are running the worker with superuser privileges, which is
absolutely not recommended!

Please specify a different user using the -u option.

User information: uid=0 euid=0 gid=0 egid=0

  uid=uid, euid=euid, gid=gid, egid=egid,
 
 -------------- celery@workgroup2.hzg.com v3.1.18 (Cipater)
---- **** ----- 
--- * ***  * -- Linux-3.13.0-24-generic-x86_64-with-Ubuntu-14.04-trusty
-- * - **** --- 
- ** ---------- [config]
- ** ---------- .> app:         myapp:0x7f708cb8ec10
- ** ---------- .> transport:   amqp://guest:**@172.16.77.175:5672//
- ** ---------- .> results:     amqp://guest@172.16.77.175//
- *** --- * --- .> concurrency: 1 (prefork)
-- ******* ---- 
--- ***** ----- [queues]
 -------------- .> machine2         exchange=machine2(direct) key=machine2
                

[tasks]
  . myapp.agent.add
  . myapp.agent.getl
  . myapp.agent.mul
  . myapp.agent.writefile
  . myapp.agent.xsum

[2015-10-18 15:08:52,114: INFO/MainProcess] Connected to amqp://guest:**@172.16.77.175:5672//
[2015-10-18 15:08:52,144: INFO/MainProcess] mingle: searching for neighbors
[2015-10-18 15:08:53,174: INFO/MainProcess] mingle: sync with 1 nodes
[2015-10-18 15:08:53,176: INFO/MainProcess] mingle: sync complete
[2015-10-18 15:08:53,227: WARNING/MainProcess] celery@workgroup2.hzg.com ready.

在任務發布節點發布一個計算任務給docker1:

root@workgroup0:~/celeryapp# ls
default.etcd  hots.sh  hotswap.py  myapp  myapp1tmp  people.db  resp  sora  test.py
root@workgroup0:~/celeryapp# python
Python 2.7.6 (default, Mar 22 2014, 22:59:56) 
[GCC 4.8.2] on linux2
Type "help", "copyright", "credits" or "license" for more information.
>>> from myapp.agent import add
>>> res = add.apply_async(args=[122,34],queue='machine1',routing_key='machine1')
>>> res.get()
{u'value': u'156', u'node_name': u'docker1'}

用get()可以看到來自docker1的返回,再看看docker1的顯示:

[2015-10-18 15:11:51,217: INFO/MainProcess] Task myapp.agent.add[c487a9a2-e5cc-462b-a131-784b363a1952] succeeded in 0.03602907s: {'value': '156', 'node_name': 'docker1'}

至于docker2,一點沒動:

[2015-10-18 15:08:53,176: INFO/MainProcess] mingle: sync complete
[2015-10-18 15:08:53,227: WARNING/MainProcess] celery@workgroup2.hzg.com ready.

發布一個任務給docker2:

>>> res = add.apply_async(args=[1440,900],queue='machine2',routing_key='machine2')
>>> res.get()
{u'value': u'2340', u'node_name': u'docker2'}
>>>

以上就是使用celery怎么實現集群管理,小編相信有部分知識點可能是我們日常工作會見到或用到的。希望你能通過這篇文章學到更多知識。更多詳情敬請關注億速云行業資訊頻道。

向AI問一下細節

免責聲明:本站發布的內容(圖片、視頻和文字)以原創、轉載和分享為主,文章觀點不代表本網站立場,如果涉及侵權請聯系站長郵箱:is@yisu.com進行舉報,并提供相關證據,一經查實,將立刻刪除涉嫌侵權內容。

AI

桐乡市| 庆云县| 福贡县| 交口县| 彭州市| 宜宾县| 文成县| 荃湾区| 咸丰县| 温宿县| 杨浦区| 徐水县| 海安县| 龙川县| 本溪| 鄂尔多斯市| 蒙阴县| 阿图什市| 沙河市| 濉溪县| 兰溪市| 舒城县| 克什克腾旗| 土默特左旗| 清远市| 莆田市| 鞍山市| 博乐市| 双柏县| 德州市| 同江市| 乌拉特前旗| 如东县| 青铜峡市| 文昌市| 周宁县| 兴和县| 锦州市| 文水县| 江陵县| 伊宁市|