91超碰碰碰碰久久久久久综合_超碰av人澡人澡人澡人澡人掠_国产黄大片在线观看画质优化_txt小说免费全本

溫馨提示×

溫馨提示×

您好,登錄后才能下訂單哦!

密碼登錄×
登錄注冊×
其他方式登錄
點擊 登錄注冊 即表示同意《億速云用戶服務條款》

zookeeper(15)源碼分析-服務器(2)

發布時間:2020-08-01 10:47:07 來源:網絡 閱讀:321 作者:shayang88 欄目:編程語言

LearnerZooKeeperServer是所有Follower和Observer的父類,在LearnerZooKeeperServer里有2個重要的屬性:
//提交請求處理器
protected CommitProcessor commitProcessor;
//同步處理器
protected SyncRequestProcessor syncProcessor;

FollowerZooKeeperServer和ObserverZooKeeperServer都繼承了LearnerZooKeeperServer服務器

1、FollowerZooKeeperServer

1.1、類屬性

    //待同步的請求
    ConcurrentLinkedQueue<Request> pendingSyncs;
    //待處理的事務請求
    LinkedBlockingQueue<Request> pendingTxns = new LinkedBlockingQueue<Request>();

1.2、核心函數

1.2.1、setupRequestProcessors

構建請求處理鏈,FollowerZooKeeperServer的請求處理鏈是:
FollowerRequestProcessor -> CommitProcessor ->FinalRequestProcessor

@Override
    protected void setupRequestProcessors() {
        //最后的處理器
        RequestProcessor finalProcessor = new FinalRequestProcessor(this);
        //第二個處理器
        commitProcessor = new CommitProcessor(finalProcessor,
                Long.toString(getServerId()), true, getZooKeeperServerListener());
        commitProcessor.start();
        //第一個請求處理器FollowerRequestProcessor
        firstProcessor = new FollowerRequestProcessor(this, commitProcessor);
        ((FollowerRequestProcessor) firstProcessor).start();
        syncProcessor = new SyncRequestProcessor(this,
                new SendAckRequestProcessor((Learner)getFollower()));
        syncProcessor.start();
    }

1.2.2、logRequest

該函數將請求進行記錄(放入到對應的隊列中),等待處理。

public void logRequest(TxnHeader hdr, Record txn) {
        Request request = new Request(hdr.getClientId(), hdr.getCxid(), hdr.getType(), hdr, txn, hdr.getZxid());
        //zxid不等于0,說明此服務器已經處理過請求
        if ((request.zxid & 0xffffffffL) != 0) {
            // 將該請求放入pendingTxns中,等待事務處理
            pendingTxns.add(request);
        }
        // 使用SyncRequestProcessor處理請求(其會將請求放在隊列中,異步進行處理)
        syncProcessor.proce***equest(request);
    }

1.2.3、commit

函數會提交zxid對應的請求(pendingTxns的隊首元素),其首先會判斷隊首請求對應的zxid是否為傳入的zxid,然后再進行移除和提交(放在committedRequests隊列中)。

public void commit(long zxid) {
        // 沒有還在等待處理的事務
        if (pendingTxns.size() == 0) {
            LOG.warn("Committing " + Long.toHexString(zxid)
                    + " without seeing txn");
            return;
        }
        // 隊首元素的zxid
        long firstElementZxid = pendingTxns.element().zxid;
        // 如果隊首元素的zxid不等于需要提交的zxid,則退出程序
        if (firstElementZxid != zxid) {
            LOG.error("Committing zxid 0x" + Long.toHexString(zxid)
                    + " but next pending txn 0x"
                    + Long.toHexString(firstElementZxid));
            System.exit(12);
        }
        // 從待處理事務請求隊列中移除隊首請求
        Request request = pendingTxns.remove();
        // 提交該請求
        commitProcessor.commit(request);
    }

2、ObserverZooKeeperServer

2.1、類屬性

// 同步處理器是否可用,系統參數控制
private boolean syncRequestProcessorEnabled = this.self.getSyncEnabled();
// 待同步請求隊列
ConcurrentLinkedQueue<Request> pendingSyncs = 
        new ConcurrentLinkedQueue<Request>();

2.2、核心方法

2.2.1、setupRequestProcessors

構建請求處理鏈,ObserverZooKeeperServer的請求處理鏈是:ObserverRequestProcessor->CommitProcessor->FinalRequestProcessor,可能會存在SyncRequestProcessor。

@Override
    protected void setupRequestProcessors() {
        // We might consider changing the processor behaviour of
        // Observers to, for example, remove the disk sync requirements.
        // Currently, they behave almost exactly the same as followers.
        RequestProcessor finalProcessor = new FinalRequestProcessor(this);
        commitProcessor = new CommitProcessor(finalProcessor, Long.toString(getServerId()), true, getZooKeeperServerListener());
        commitProcessor.start();
        firstProcessor = new ObserverRequestProcessor(this, commitProcessor);
        ((ObserverRequestProcessor) firstProcessor).start();

        /*
         * Observer should write to disk, so that the it won't request
         * too old txn from the leader which may lead to getting an entire
         * snapshot.
         *
         * However, this may degrade performance as it has to write to disk
         * and do periodic snapshot which may double the memory requirements
         */
             //是否使用同步處理器,看系統參數配置,會影響性能
        if (syncRequestProcessorEnabled) {
            syncProcessor = new SyncRequestProcessor(this, null);
            syncProcessor.start();
        }
    }

2.2.2、commitRequest

同步處理器可用,則使用同步處理器進行處理(放入同步處理器的queuedRequests隊列中),然后提交請求(放入提交請求處理器的committedRequests隊列中)

public void commitRequest(Request request) {
        if (syncRequestProcessorEnabled) {
            // Write to txnlog and take periodic snapshot
                        //寫事務日志,并定期快照
            syncProcessor.proce***equest(request);
        }
        commitProcessor.commit(request);
    }
向AI問一下細節

免責聲明:本站發布的內容(圖片、視頻和文字)以原創、轉載和分享為主,文章觀點不代表本網站立場,如果涉及侵權請聯系站長郵箱:is@yisu.com進行舉報,并提供相關證據,一經查實,將立刻刪除涉嫌侵權內容。

AI

东山县| 泰来县| 抚州市| 石狮市| 清新县| 阿合奇县| 巴中市| 台江县| 蓝山县| 盐边县| 嘉禾县| 芜湖县| 思茅市| 江城| 临朐县| 射阳县| 安泽县| 高雄县| 雷州市| 洛扎县| 古浪县| 永康市| 河池市| 蕲春县| 大新县| 怀宁县| 靖边县| 合水县| 渑池县| 肇州县| 砚山县| 安宁市| 嘉祥县| 江城| 商城县| 肇源县| 九寨沟县| 文山县| 定襄县| 藁城市| 高唐县|