91超碰碰碰碰久久久久久综合_超碰av人澡人澡人澡人澡人掠_国产黄大片在线观看画质优化_txt小说免费全本

溫馨提示×

溫馨提示×

您好,登錄后才能下訂單哦!

密碼登錄×
登錄注冊×
其他方式登錄
點擊 登錄注冊 即表示同意《億速云用戶服務條款》

python如何實現查詢bucket已用量腳本

發布時間:2021-11-08 10:03:25 來源:億速云 閱讀:232 作者:小新 欄目:云計算

小編給大家分享一下python如何實現查詢bucket已用量腳本,相信大部分人都還不怎么了解,因此分享這篇文章給大家參考一下,希望大家閱讀完這篇文章后大有收獲,下面讓我們一起去了解一下吧!

目前僅支持ceph的s3方案,具體配置看說明

# -*- coding: utf-8 -*-
import requests
import json
from email.utils import formatdate
import hmac


py3k = False
from hashlib import sha1 as sha
try:
    from urlparse import urlparse
    from base64 import encodestring

except:
    py3k = True
    from urllib.parse import urlparse
    from base64 import encodebytes as encodestring


class AuthBase(object):
    """Base class that all auth implementations derive from"""

    def __call__(self, r):
        raise NotImplementedError('Auth hooks must be callable.')


class S3Auth(AuthBase):

    """Attaches AWS Authentication to the given Request object."""

    service_base_url = 's3.amazonaws.com'
    # List of Query String Arguments of Interest
    special_params = [
        'acl', 'location', 'logging', 'partNumber', 'policy', 'requestPayment',
        'torrent', 'versioning', 'versionId', 'versions', 'website', 'uploads',
        'uploadId', 'response-content-type', 'response-content-language',
        'response-expires', 'response-cache-control', 'delete', 'lifecycle',
        'response-content-disposition', 'response-content-encoding'
    ]

    def __init__(self, access_key, secret_key, service_url=None):
        if service_url:
            self.service_base_url = service_url
        self.access_key = str(access_key)
        self.secret_key = str(secret_key)
        self.au =""

    def __call__(self, r):
        # Create date header if it is not created yet.
        if not 'date' in r.headers and not 'x-amz-date' in r.headers:
            r.headers['date'] = formatdate(
                timeval=None,
                localtime=False,
                usegmt=True)
        signature = self.get_signature(r)
        if py3k:
            signature = signature.decode('utf-8')
        r.headers['Authorization'] = 'AWS %s:%s' % (self.access_key, signature)
        self.au = r.headers
        # print self.au
        return r

    def get_signature(self, r):
        canonical_string = self.get_canonical_string(
            r.url, r.headers, r.method)
        if py3k:
            key = self.secret_key.encode('utf-8')
            msg = canonical_string.encode('utf-8')
        else:
            key = self.secret_key
            msg = canonical_string
        h = hmac.new(key, msg, digestmod=sha)
        return encodestring(h.digest()).strip()

    def get_canonical_string(self, url, headers, method):
        parsedurl = urlparse(url)
        objectkey = parsedurl.path[1:]
        query_args = sorted(parsedurl.query.split('&'))

        bucket = parsedurl.netloc[:-len(self.service_base_url)]
        if len(bucket) > 1:
            # remove last dot
            bucket = bucket[:-1]

        interesting_headers = {
            'content-md5': '',
            'content-type': '',
            'date': ''}
        for key in headers:
            lk = key.lower()
            try:
                lk = lk.decode('utf-8')
            except:
                pass
            if headers[key] and (lk in interesting_headers.keys() or lk.startswith('x-amz-')):
                interesting_headers[lk] = headers[key].strip()

        # If x-amz-date is used it supersedes the date header.
        if not py3k:
            if 'x-amz-date' in interesting_headers:
                interesting_headers['date'] = ''
        else:
            if 'x-amz-date' in interesting_headers:
                interesting_headers['date'] = ''

        buf = '%s\n' % method
        for key in sorted(interesting_headers.keys()):
            val = interesting_headers[key]
            if key.startswith('x-amz-'):
                buf += '%s:%s\n' % (key, val)
            else:
                buf += '%s\n' % val

        # append the bucket if it exists
        if bucket != '':
            buf += '/%s' % bucket

        # add the objectkey. even if it doesn't exist, add the slash
        buf += '/%s' % objectkey

        params_found = False

        # handle special query string arguments
        for q in query_args:
            k = q.split('=')[0]
            if k in self.special_params:
                if params_found:
                    buf += '&%s' % q
                else:
                    buf += '?%s' % q
                params_found = True
        return buf

class S3Admin():
    def __init__(self):
        self.access_key = '' #填access_key
        self.secret_key = '' #填secret_key
        self.endpoint = 's3.ceph.work'  #填endpoint

    def get_bucket_usage(self,bucketname):
        #url = 'http://%s/%s/' % (self.endpoint, bucketname) #path style
        url = 'http://%s.%s/' % (bucketname,self.endpoint) #virtual hosted styple
        r = requests.head(url, auth=S3Auth(self.access_key, self.secret_key, self.endpoint))
        print r.headers
        return r.headers


s3client = S3Admin()
bucket_name= 'xxx'  #替換成相應的bucket名稱
result = s3client.get_bucket_usage(bucket_name)
print 'objects_num= %s , total_Bytes_Used= %s ' % (result['X-RGW-Object-Count'],result['X-RGW-Bytes-Used'])
#注意 objects_num 為當前bucket內的object數量,total_Bytes_Used為當前bucket內的已用容量(單位為Byte)

以上是“python如何實現查詢bucket已用量腳本”這篇文章的所有內容,感謝各位的閱讀!相信大家都有了一定的了解,希望分享的內容對大家有所幫助,如果還想學習更多知識,歡迎關注億速云行業資訊頻道!

向AI問一下細節

免責聲明:本站發布的內容(圖片、視頻和文字)以原創、轉載和分享為主,文章觀點不代表本網站立場,如果涉及侵權請聯系站長郵箱:is@yisu.com進行舉報,并提供相關證據,一經查實,將立刻刪除涉嫌侵權內容。

AI

轮台县| 荥阳市| 阿克苏市| 康平县| 河间市| 永昌县| 皮山县| 当阳市| 平顺县| 富源县| 岐山县| 左云县| 定南县| 渭源县| 天祝| 奉节县| 中卫市| 阜城县| 无为县| 沐川县| 洛扎县| 商南县| 西昌市| 米泉市| 宜良县| 泰安市| 怀宁县| 唐海县| 卓尼县| 连江县| 阜城县| 化州市| 辽宁省| 隆回县| 翁牛特旗| 云阳县| 罗城| 太仓市| 文山县| 女性| 开封县|