91超碰碰碰碰久久久久久综合_超碰av人澡人澡人澡人澡人掠_国产黄大片在线观看画质优化_txt小说免费全本

溫馨提示×

溫馨提示×

您好,登錄后才能下訂單哦!

密碼登錄×
登錄注冊×
其他方式登錄
點擊 登錄注冊 即表示同意《億速云用戶服務條款》

七、HDFS上傳和下載原理(有源碼解析)

發布時間:2020-06-05 17:20:20 來源:網絡 閱讀:1292 作者:隔壁小白 欄目:大數據

[TOC]

一、HDFS文件上傳基本原理

1、基本流程

七、HDFS上傳和下載原理(有源碼解析)

1)客戶端通過本地通過RPC與namenode建立rpc通信,然后請求上傳文件
2)namenode收到請求后,會檢查是否能創建該文件(比如校驗用戶是否有該權限,文件是否已經存在等)。如果檢查通過,namenode就會開始記錄該新文件的元信息(先寫入到edits文件,然后更新內存中的metadata),并響應client可以開始上傳。
3)client 在本地將文件進行切塊(按照指定的block大小)。然后請求namemode上傳第一個block。
4)namenode根據策略以及每個datanode的情況,返回3個datanode地址給client(這里默認3副本)。
5)client與請求namenode返回的3個datanode建立pipeline,即 client請求dn1,dn1請求dn2,dn2請求dn3,這樣一個串行通道。
6)3個datanode逐級響應,最終響應給client。表示可以傳輸數據
7)client會將每個block還會分割成一個個packet,然后放入 data queue中,等待上傳。每傳輸一個packet,就會將packet加入到另外一個 ack queue中,等到pipeline中的datanode響應傳輸完成后,就會講相應的packet從ack queue中移除。
8)后面就是重復上面的流程,直到client關閉通道,并將所有的queue中的packet刷寫到pipeline中之后,datanode就會標記文件已完成。

注意:client完成寫入之后,此時block 才是可見的,正在寫的block是不可見的。當調用sync方法時(將緩沖區數據刷寫到磁盤中),client才確認寫入已經完成。client關閉流時調用 的close方法,底層就會調用sync。是否需要手動調用取決你根據程序需 要在數據健壯性和吞吐率之間的權衡。

2、datanode發生錯誤的解決方式

問題:傳輸過程中,某個datanode發生錯誤,hdfs是怎么解決?
1)pipeline關閉掉
2)為了防止丟包,ack queue中的packet會同步到data queue中。重新進行下一次傳輸。
3)把產生錯誤的datanode上當前在寫,但未完成的block刪除掉
4)剩下的block寫到剩余兩個正常的datanode中。
5)namenode會自動尋找另外合適的一個datanode復制另外兩個datanode中刷寫的block,完成3副本的寫入。當然,這個操作namenode的內部機制,對client來說是無感知的。

3、元數據存儲

namenode使用兩種文件保存元數據,fsimag和edits文件。
fsimage:元數據鏡像文件,存儲某一時間段內的namenode的內存元數據信息
edits:操作日志文件。
fstime:保存最近一次checkpoint的時間。
更詳細的 fsimage和edits文件講解,請看 “hdfs體系架構”

4、元數據的合并

? namenode所有的元數據信息從啟動時就已經全部加載到內存中(為了提高查詢性能),用于處理讀請求的查詢操作。到有寫操作時,namenode會先向edits文件中寫入操作日志,完成后才會修改內存中的metadata,這個主要是保證元數據已經存儲到磁盤中不丟失。
? hdfs內部維護的fsimage文件其實就是內存中的metadata的鏡像,但是兩者并不是實時一致的。fsimage的更新是通過合并edits來實現的。而這個合并操作是 secondaryNameNode完成的,主要流程如下:
七、HDFS上傳和下載原理(有源碼解析)

1)首先是 SNN通知 NN切換edits文件,主要是保證合并過程有新的寫入操作時能夠正常寫入edits文件。
2)SNN通過http請求從NN獲取 fsimage和edits文件。
3)SNN將fsiamge載入內存,開始合并edits到fsimage,生成新的fsimage
4)SNN將新的fsimage發送給NN
5)NN用新的fsimage,替換舊的fsimage。

4、寫入時的網絡拓撲選擇

? 寫入操作時,默認3副本,那么副本分布在哪些datanode節點上,會影響寫入速度。在hdfs的網絡拓撲中,有那么四種物理范圍,同一節點、同一機架上的不同節點、同一機房中不同節點、不同機房中的不同節點。這4中物理范圍表示節點間的距離逐漸增大。這種物理距離越遠會影響副本之間所在節點之間的傳輸效率,即傳輸效率越低。

5、機架感知

上面說到副本的選擇的節點的位置會影響寫效率,那么hdfs是如何選擇節點位置的。
(1)舊版本的方式
七、HDFS上傳和下載原理(有源碼解析)

路徑是 r1/n1 --> r2/n1 --> r2/n2

(2)新版本方式
七、HDFS上傳和下載原理(有源碼解析)

路徑是 r1/n1 --> r1/n2 --> r2/n2(后面這個其實任意都行,主要處于不同機架就好)
這種方式比第一種要好,因為這種方式數據經過的總路徑更短了,只要一個副本需要跨機架傳輸,而上面的則有兩個副本需要跨機架傳輸。

二、HDFS上傳文件源碼分析

下面的分析過程基于 hadoop2.8.4 的源碼分析的。

1、client初始化源碼分析

一般來說,會先通過 FileSystem.get() 獲取到操作hdfs 的客戶端對象,后面所有的操作都通過調用該對象的方法完成的。

FileSystem client = FileSystem.get(new URI("hdfs://bigdata121:9000"), conf);

接著我們看看 FileSystem.get() 的實現

public static FileSystem get(URI uri, Configuration conf) throws IOException {
        String scheme = uri.getScheme();
        String authority = uri.getAuthority();
        if (scheme == null && authority == null) {
            return get(conf);
        } else {
            if (scheme != null && authority == null) {
                URI defaultUri = getDefaultUri(conf);
                if (scheme.equals(defaultUri.getScheme()) && defaultUri.getAuthority() != null) {
                    return get(defaultUri, conf);
                }
            }

            String disableCacheName = String.format("fs.%s.impl.disable.cache", scheme);

  /*
  這里是關鍵代碼,表示進入 CACHE.get() 方法
  */
            return conf.getBoolean(disableCacheName, false) ? createFileSystem(uri, conf) : CACHE.get(uri, conf);
        }
    }

CACHE是FileSystem的一個靜態內部類Cache 的對象。繼續看看 CACHE.get()方法

FileSystem get(URI uri, Configuration conf) throws IOException {
            FileSystem.Cache.Key key = new FileSystem.Cache.Key(uri, conf);
            //進入CACHE對象的 getInternal() 方法
            return this.getInternal(uri, conf, key);
        }

進入CACHE對象的 getInternal() 方法

 private FileSystem getInternal(URI uri, Configuration conf, FileSystem.Cache.Key key) throws IOException {
            FileSystem fs;
            synchronized(this) {
            /*
            獲取map中的filesytem對象,表示之前已經初始化了filesystem對象,并存儲到map集合中,現在直接從map中獲取就好。      
            */
                fs = (FileSystem)this.map.get(key);
            }

            if (fs != null) {
                //如果fs存在,就直接返回存在的filesytem實例即可
                return fs;
            } else {
            //如果是初次使用filesystem,就得創建并初始化
                fs = FileSystem.createFileSystem(uri, conf);
                synchronized(this) {
                    FileSystem oldfs = (FileSystem)this.map.get(key);
                    if (oldfs != null) {
                        fs.close();
                        return oldfs;
                    } else {
                        if (this.map.isEmpty() && !ShutdownHookManager.get().isShutdownInProgress()) {
                            ShutdownHookManager.get().addShutdownHook(this.clientFinalizer, 10);
                        }

                        fs.key = key;
                        this.map.put(key, fs);
                        if (conf.getBoolean("fs.automatic.close", true)) {
                            this.toAutoClose.add(key);
                        }

                        return fs;
                    }
                }
            }
        }

我們看到了上面有兩種方式,一種是如果filesytem對象已存在,那么直接從map獲取并返回對象即可。如果不存在,就調用 FileSystem.createFileSystem() 方法創建,創建完成后返回fs。下面看看這個方法.

private static FileSystem createFileSystem(URI uri, Configuration conf) throws IOException {
        Tracer tracer = FsTracer.get(conf);
        TraceScope scope = tracer.newScope("FileSystem#createFileSystem");
        scope.addKVAnnotation("scheme", uri.getScheme());

        FileSystem var6;
        try {
            Class<?> clazz = getFileSystemClass(uri.getScheme(), conf);
            FileSystem fs = (FileSystem)ReflectionUtils.newInstance(clazz, conf);

            //這是關鍵性的代碼,看名字就知道,對filesytem 進行初始化
            fs.initialize(uri, conf);
            var6 = fs;
        } finally {
            scope.close();
        }

        return var6;
    }

我們要注意,FileSystem這個類是抽象類,它的實現子類是 DistributedFileSystem,所以雖然 fs是FileSystem類型的,但是對象本身是DistributedFileSystem類型的,也就是java 的多態特性。所以fs.initialize() 調用的實際上是 DistributedFileSystem中initialize()方法。下面看看這個方法

/*
DistributedFileSystem.class
*/

public void initialize(URI uri, Configuration conf) throws IOException {
        super.initialize(uri, conf);
        this.setConf(conf);
        String host = uri.getHost();
        if (host == null) {
            throw new IOException("Incomplete HDFS URI, no host: " + uri);
        } else {
            this.homeDirPrefix = conf.get("dfs.user.home.dir.prefix", "/user");

            //這是關鍵性代碼,創建了一個DFSClient對象,顧名思義就是RPC的客戶端
            this.dfs = new DFSClient(uri, conf, this.statistics);
            this.uri = URI.create(uri.getScheme() + "://" + uri.getAuthority());
            this.workingDir = this.getHomeDirectory();
            this.storageStatistics = (DFSOpsCountStatistics)GlobalStorageStatistics.INSTANCE.put("DFSOpsCountStatistics", new StorageStatisticsProvider() {
                public StorageStatistics provide() {
                    return new DFSOpsCountStatistics();
                }
            });
        }
    }

看到上面創建了一個 DFSClient() 對象,賦值給了 this.dfs。下面看看這個類的構造方法。

/*
DFSClient.class
*/

public DFSClient(URI nameNodeUri, ClientProtocol rpcNamenode, Configuration conf, Statistics stats) throws IOException {
        .............................
        /*源碼比較長,所以截取重要的部分顯示*/

        //這是一個關鍵性變量,其實就是namenode代理對象,只不過還沒有創建對象
        ProxyAndInfo<ClientProtocol> proxyInfo = null;
        ...............................

        //下面開始創建namenode代理對象
        if (proxyInfo != null) {
            this.dtService = proxyInfo.getDelegationTokenService();
            this.namenode = (ClientProtocol)proxyInfo.getProxy();
        } else if (rpcNamenode != null) {
            Preconditions.checkArgument(nameNodeUri == null);
            this.namenode = rpcNamenode;
            this.dtService = null;
        } else {
            Preconditions.checkArgument(nameNodeUri != null, "null URI");
            //這里創建代理對象信息
            proxyInfo = NameNodeProxiesClient.createProxyWithClientProtocol(conf, nameNodeUri, nnFallbackToSimpleAuth);
            this.dtService = proxyInfo.getDelegationTokenService();

            //這里可以看到直接通過 proxyInfo.getProxy()獲取namenode代理對象,并將引用賦值給 this.namenode。并且類型是 ClientProtocol 類型的。
            this.namenode = (ClientProtocol)proxyInfo.getProxy();
        }

    /*下面省略一堆代碼*/    

    }

可以看到上面已經通過 this.namenode = (ClientProtocol)proxyInfo.getProxy(); 獲取到了 namenode 的代理對象,也就是rpc的客戶端對象。下面看看 ClientProtocol 這個是啥東西,因為代理對象是這個類型的。

/*
ClientProtocol.class

這是個接口
*/
public interface ClientProtocol {
    long versionID = 69L;
    /*

    下面主要是定義很多個抽象方法,主要就是用于對hdfs進行操作的接口,比如,open,create等這些常用方法。
    */
}

下面看看 proxyInfo創建代理對象的方法

/*
NameNodeProxiesClient
*/

    public static NameNodeProxiesClient.ProxyAndInfo<ClientProtocol> createProxyWithClientProtocol(Configuration conf, URI nameNodeUri, AtomicBoolean fallbackToSimpleAuth) throws IOException {
        AbstractNNFailoverProxyProvider<ClientProtocol> failoverProxyProvider = createFailoverProxyProvider(conf, nameNodeUri, ClientProtocol.class, true, fallbackToSimpleAuth);
        if (failoverProxyProvider == null) {

        //創建無HA的代理對象
            InetSocketAddress nnAddr = DFSUtilClient.getNNAddress(nameNodeUri);
            Text dtService = SecurityUtil.buildTokenService(nnAddr);
            //創建proxy對象
            ClientProtocol proxy = createNonHAProxyWithClientProtocol(nnAddr, conf, UserGroupInformation.getCurrentUser(), true, fallbackToSimpleAuth);

            //ProxyAndInfo是一個靜態內部類,將前面的proxy通過該類封裝后返回,可通過該類的 getProxy 方法返回已創建的proxy對象
            return new NameNodeProxiesClient.ProxyAndInfo(proxy, dtService, nnAddr);
        } else {

        //創建有HA的代理對象
            return createHAProxy(conf, nameNodeUri, ClientProtocol.class, failoverProxyProvider);
        }
    }

可以看到上面是已經創建了 proxy對象,并返回,而且我們也可以看到,創建的proxy對象就是clientProtocol類型的。下面看看創建proxy對象的方法 createNonHAProxyWithClientProtocol()

/*
NameNodeProxiesClient
*/

   public static ClientProtocol createNonHAProxyWithClientProtocol(InetSocketAddress address, Configuration conf, UserGroupInformation ugi, boolean withRetries, AtomicBoolean fallbackToSimpleAuth) throws IOException {
        RPC.setProtocolEngine(conf, ClientNamenodeProtocolPB.class, ProtobufRpcEngine.class);
        RetryPolicy defaultPolicy = RetryUtils.getDefaultRetryPolicy(conf, "dfs.client.retry.policy.enabled", false, "dfs.client.retry.policy.spec", "10000,6,60000,10", SafeModeException.class.getName());
        long version = RPC.getProtocolVersion(ClientNamenodeProtocolPB.class);

        //這里是核心代碼,可以明顯看到調用 RPC 模塊中的方法創建proxy對象
        ClientNamenodeProtocolPB proxy = (ClientNamenodeProtocolPB)RPC.getProtocolProxy(ClientNamenodeProtocolPB.class, version, address, ugi, conf, NetUtils.getDefaultSocketFactory(conf), Client.getTimeout(conf), defaultPolicy, fallbackToSimpleAuth).getProxy();
        if (withRetries) {
            Map<String, RetryPolicy> methodNameToPolicyMap = new HashMap();
            ClientProtocol translatorProxy = new ClientNamenodeProtocolTranslatorPB(proxy);
            return (ClientProtocol)RetryProxy.create(ClientProtocol.class, new DefaultFailoverProxyProvider(ClientProtocol.class, translatorProxy), methodNameToPolicyMap, defaultPolicy);
        } else {
            return new ClientNamenodeProtocolTranslatorPB(proxy);
        }
    }

所以至此我們可以發現,客戶端和namenode之間通信的方式就是通過RPC實現的。

總結來說,方法的調用時序圖如下:
七、HDFS上傳和下載原理(有源碼解析)

2、上傳源碼分析

一般來說,上傳操作,首先得

OutputStream os = fs.create(new Path("xxxx"));

即創建文件,然后再上傳文件數據。上傳數據的流程和普通的流操作沒什么區別。
下面看看這個 create方法。

/*
FileSystem.class
*/

    public abstract FSDataOutputStream create(Path var1, FsPermission var2, boolean var3, int var4, short var5, long var6, Progressable var8) throws IOException;

可以看到這是個抽象方法,前面也說到,它的實現子類是 DistributedFileSystem,這里這里是調用子類的 create方法,繼續看

/*
DistributedFileSystem.class
*/    

    public FSDataOutputStream create(Path f, final FsPermission permission, final EnumSet<CreateFlag> cflags, final int bufferSize, final short replication, final long blockSize, final Progressable progress, final ChecksumOpt checksumOpt) throws IOException {
        this.statistics.incrementWriteOps(1);
        this.storageStatistics.incrementOpCounter(OpType.CREATE);
        Path absF = this.fixRelativePart(f);
        return (FSDataOutputStream)(new FileSystemLinkResolver<FSDataOutputStream>() {
            public FSDataOutputStream doCall(Path p) throws IOException {

                //這里是核心代碼,this.dfs前面說到了就是存儲了DFSClient對象的引用的。可以通過該客戶端調用很多操作hdfs的方法。這里調用create方法,創建了一個 DFSOutputStream 對象。輸出流對象
                DFSOutputStream dfsos = DistributedFileSystem.this.dfs.create(DistributedFileSystem.this.getPathName(p), permission, cflags, replication, blockSize, progress, bufferSize, checksumOpt);

                //這里將上面創建的dfsos進行包裝并返回
                return DistributedFileSystem.this.dfs.createWrappedOutputStream(dfsos, DistributedFileSystem.this.statistics);
            }

            public FSDataOutputStream next(FileSystem fs, Path p) throws IOException {
                return fs.create(p, permission, cflags, bufferSize, replication, blockSize, progress, checksumOpt);
            }
        }).resolve(this, absF);
    }

可以看見上面創建返回了 DFSOutputStream 輸出流對象。下面看看DFSClient.create方法的實現代碼。

/*
DFSClient.class
*/ 

public DFSOutputStream create(String src, FsPermission permission, EnumSet<CreateFlag> flag, boolean createParent, short replication, long blockSize, Progressable progress, int buffersize, ChecksumOpt checksumOpt, InetSocketAddress[] favoredNodes) throws IOException {
        this.checkOpen();
        FsPermission masked = this.applyUMask(permission);
        LOG.debug("{}: masked={}", src, masked);

        //創建輸出流對象
        DFSOutputStream result = DFSOutputStream.newStreamForCreate(this, src, masked, flag, createParent, replication, blockSize, progress, this.dfsClientConf.createChecksum(checksumOpt), this.getFavoredNodesStr(favoredNodes));
        this.beginFileLease(result.getFileId(), result);
        return result;
    }

繼續看 DFSOutputStream.newStreamForCreate 這個方法.

/*
DistributedFileSystem.class
*/ 

static DFSOutputStream newStreamForCreate(DFSClient dfsClient, String src, FsPermission masked, EnumSet<CreateFlag> flag, boolean createParent, short replication, long blockSize, Progressable progress, DataChecksum checksum, String[] favoredNodes) throws IOException {
        TraceScope ignored = dfsClient.newPathTraceScope("newStreamForCreate", src);
        Throwable var12 = null;

        try {
            HdfsFileStatus stat = null;
            boolean shouldRetry = true;
            int retryCount = 10;

            while(true) {
                if (shouldRetry) {
                    shouldRetry = false;

                    try {

                    //這里是核心代碼,可以看見是調用 dfsclient.namenode這個代理對象中的create方法創建文件,并返回狀態
                        stat = dfsClient.namenode.create(src, masked, dfsClient.clientName, new EnumSetWritable(flag), createParent, replication, blockSize, SUPPORTED_CRYPTO_VERSIONS);
                    } catch (RemoteException var27) {
                        IOException e = var27.unwrapRemoteException(new Class[]{AccessControlException.class, DSQuotaExceededException.class, QuotaByStorageTypeExceededException.class, FileAlreadyExistsException.class, FileNotFoundException.class, ParentNotDirectoryException.class, NSQuotaExceededException.class, RetryStartFileException.class, SafeModeException.class, UnresolvedPathException.class, SnapshotAccessControlException.class, UnknownCryptoProtocolVersionException.class});
                        if (e instanceof RetryStartFileException) {
                            if (retryCount <= 0) {
                                throw new IOException("Too many retries because of encryption zone operations", e);
                            }

                            shouldRetry = true;
                            --retryCount;
                            continue;
                        }

                        throw e;
                    }
                }

                Preconditions.checkNotNull(stat, "HdfsFileStatus should not be null!");

                //這里將上面創建文件的狀態傳入輸出流作為參數
                DFSOutputStream out = new DFSOutputStream(dfsClient, src, stat, flag, progress, checksum, favoredNodes);
                //看見一個神奇的方法
                out.start();
                DFSOutputStream var30 = out;

                //返回輸出流
                return var30;
            }
        } catch (Throwable var28) {
            var12 = var28;
            throw var28;
        } finally {
            if (ignored != null) {
                if (var12 != null) {
                    try {
                        ignored.close();
                    } catch (Throwable var26) {
                        var12.addSuppressed(var26);
                    }
                } else {
                    ignored.close();
                }
            }

        }
    }

上面看到 DFSOutputStream 對象居然有一個 start方法,來看看先。

/*
DFSOutputStream.class
*/ 
   protected synchronized void start() {
        this.getStreamer().start();
    }

// 繼續看 this.getStreamer() 這個方法,可以看到這個方法返回的是DataStreamer,繼續看這個類
 protected DataStreamer getStreamer() {
        return this.streamer;
    }

/*
DataStreamer.class
*/
//可以看到這個類繼承了 Daemon類,而Daemon本身是繼承了 Thread類
class DataStreamer extends Daemon {  }

由此可得知,DFSOutputStream 這個類本身并沒有繼承 Thread類,但是使用DataStreamer這個繼承了 Thread類的來新建線程傳輸數據,不占用當前線程。而在 DataStreamer 這個類中,重寫了 Thread標志性的 run 方法。傳輸數據就是在這里完成的。前面說到的 hdfs的 pipeline 也是這個run方法中實現的,里面是一個while死循環,知道傳輸完數據為止,或者客戶端關閉。代碼過長,就不看了。反正看到這里已經成功獲取了 client的輸出流對象,后面就是傳統的輸入流和輸出流的對接了,這里不細講了。

方法時序圖如下:
七、HDFS上傳和下載原理(有源碼解析)

1、FileSystem初始化,Client拿到NameNodeRpcServer代理對象,建立與NameNode的RPC通信
2、調用FileSystem的create()方法,由于實現類為DistributedFileSystem,所有是調用該類中的create()方法
3、DistributedFileSystem持有DFSClient的引用,繼續調用DFSClient中的create()方法
4、DFSOutputStream提供的靜態newStreamForCreate()方法中調用NameNodeRpcServer服務端的create()方法并創建DFSOutputStream輸出流對象返回
5、通過hadoop提供的IOUtil工具類將輸出流輸出到本地

三、HDFS下載基本原理

1、基本流程
七、HDFS上傳和下載原理(有源碼解析)

1)客戶端向namenode請求下載文件,namenode在內存的metadata查找對應的文件的元數據,如果無則返回無,有則返回對應文件的block位置信息。而且,namenode會根據客戶端所在的位置,根據datanode以及client之間的距離大小,將返回的 block 的副本的datanode節點從距離小到大排序,距離最近的datanode則排在第一位。
2)client通過機架感知策略,選擇最近的datanode進行block請求讀取
3)datanode開始傳輸數據給client,以packet為單位,并做校驗
4)客戶端接收packet之后,本地緩存,然后再往本地路徑寫入該block。
5)第二塊,第三塊block重復以上過程

注意:

如果在讀數據的時候, DFSInputStream和datanode的通訊發生異常,就會嘗試正在讀的block的排序第二近的datanode,并且會記錄哪個 datanode發生錯誤,剩余的blocks讀的時候就會直接跳過該datanode。 DFSInputStream也會檢查block數據校驗和,如果發現一個壞的block,就會先報告到namenode節點,然后 DFSInputStream在其他的datanode上讀該block的鏡像。

四、HDFS下載源碼分析

client的初始化代碼是一樣的,這里不重復分析了。直接看下載
首先通過 open方法獲取目標文件的輸入流對象。

FSDataInputStream fis = client.open(getPath);

下面看看這個open方法

/*
FileSystem.class
*/

public FSDataInputStream open(Path f) throws IOException {
        return this.open(f, this.getConf().getInt("io.file.buffer.size", 4096));
    }

public abstract FSDataInputStream open(Path var1, int var2) throws IOException;

可以看到,依舊是抽象方法,所以依舊是調用 DistributedFileSystem的open方法。

/*
DistributedFileSystem.class
*/
    public FSDataInputStream open(Path f, final int bufferSize) throws IOException {
        this.statistics.incrementReadOps(1);
        this.storageStatistics.incrementOpCounter(OpType.OPEN);
        Path absF = this.fixRelativePart(f);
        return (FSDataInputStream)(new FileSystemLinkResolver<FSDataInputStream>() {
            public FSDataInputStream doCall(Path p) throws IOException {

            //核心代碼,這里調用dfsclient的open方法穿件輸入流
                DFSInputStream dfsis = DistributedFileSystem.this.dfs.open(DistributedFileSystem.this.getPathName(p), bufferSize, DistributedFileSystem.this.verifyChecksum);
                return DistributedFileSystem.this.dfs.createWrappedInputStream(dfsis);
            }

            public FSDataInputStream next(FileSystem fs, Path p) throws IOException {
                return fs.open(p, bufferSize);
            }
        }).resolve(this, absF);
    }

熟悉的套路,依舊調用 dfsclient的open方法,創建輸入流,下面看看這個open方法

/*
DFSClient.class
*/

public DFSInputStream open(String src, int buffersize, boolean verifyChecksum) throws IOException {
        this.checkOpen();
        TraceScope ignored = this.newPathTraceScope("newDFSInputStream", src);
        Throwable var5 = null;

        DFSInputStream var6;
        try {

         //這里直接創建一個輸入流對象,如果按照上面上傳文件的套路來說,應該是  dfsclient.namenode.open(xxx)才對的,這里并沒有
            var6 = new DFSInputStream(this, src, verifyChecksum, (LocatedBlocks)null);
        } catch (Throwable var15) {
            var5 = var15;
            throw var15;
        } finally {
            if (ignored != null) {
                if (var5 != null) {
                    try {
                        ignored.close();
                    } catch (Throwable var14) {
                        var5.addSuppressed(var14);
                    }
                } else {
                    ignored.close();
                }
            }

        }

        return var6;
    }

上面并沒有調用DFSClient.open,而是將DFSClient作為參數傳入DFSInputStream。下面看看 DFSInputStream 這個神奇的類。

/*
DFSInputStream.class
*/

DFSInputStream(DFSClient dfsClient, String src, boolean verifyChecksum, LocatedBlocks locatedBlocks) throws IOException {
        //將dfsclinet保存到當前類中
        this.dfsClient = dfsClient;
        this.verifyChecksum = verifyChecksum;
        this.src = src;
        synchronized(this.infoLock) {
            this.cachingStrategy = dfsClient.getDefaultReadCachingStrategy();
        }

        this.locatedBlocks = locatedBlocks;
        //核心方法,開始獲取block信息,如有多少個block,以及每個block所在的datanode節點名
        this.openInfo(false);
    }

下面看看 openInfo() 方法

/*
DFSInputStream.class
*/

    void openInfo(boolean refreshLocatedBlocks) throws IOException {
        DfsClientConf conf = this.dfsClient.getConf();
        synchronized(this.infoLock) {

            //獲取block的位置信息以及最后一個block的長度(因為最后一個block肯定不是完整的128M的長度)
            this.lastBlockBeingWrittenLength = this.fetchLocatedBlocksAndGetLastBlockLength(refreshLocatedBlocks);

            int retriesForLastBlockLength;
            for(retriesForLastBlockLength = conf.getRetryTimesForGetLastBlockLength(); retriesForLastBlockLength > 0 && this.lastBlockBeingWrittenLength == -1L; --retriesForLastBlockLength) {
                DFSClient.LOG.warn("Last block locations not available. Datanodes might not have reported blocks completely. Will retry for " + retriesForLastBlockLength + " times");
                this.waitFor(conf.getRetryIntervalForGetLastBlockLength());
                this.lastBlockBeingWrittenLength = this.fetchLocatedBlocksAndGetLastBlockLength(true);
            }

            if (this.lastBlockBeingWrittenLength == -1L && retriesForLastBlockLength == 0) {
                throw new IOException("Could not obtain the last block locations.");
            }
        }
    }

下面看看 fetchLocatedBlocksAndGetLastBlockLength 這個獲取block信息的方法

/*
DFSInputStream.class
*/

private long fetchLocatedBlocksAndGetLastBlockLength(boolean refresh) throws IOException {
        LocatedBlocks newInfo = this.locatedBlocks;
        if (this.locatedBlocks == null || refresh) {

            //可以看到這里是調用 dfsclient中的方法倆獲取block信息
            newInfo = this.dfsClient.getLocatedBlocks(this.src, 0L);
        }

        DFSClient.LOG.debug("newInfo = {}", newInfo);
        if (newInfo == null) {
            throw new IOException("Cannot open filename " + this.src);
        } else {
            if (this.locatedBlocks != null) {
                Iterator<LocatedBlock> oldIter = this.locatedBlocks.getLocatedBlocks().iterator();
                Iterator newIter = newInfo.getLocatedBlocks().iterator();

                while(oldIter.hasNext() && newIter.hasNext()) {
                    if (!((LocatedBlock)oldIter.next()).getBlock().equals(((LocatedBlock)newIter.next()).getBlock())) {
                        throw new IOException("Blocklist for " + this.src + " has changed!");
                    }
                }
            }

            this.locatedBlocks = newInfo;
            long lastBlockBeingWrittenLength = 0L;
            if (!this.locatedBlocks.isLastBlockComplete()) {
                LocatedBlock last = this.locatedBlocks.getLastLocatedBlock();
                if (last != null) {
                    if (last.getLocations().length == 0) {
                        if (last.getBlockSize() == 0L) {
                            return 0L;
                        }

                        return -1L;
                    }

                    long len = this.readBlockLength(last);
                    last.getBlock().setNumBytes(len);
                    lastBlockBeingWrittenLength = len;
                }
            }

            this.fileEncryptionInfo = this.locatedBlocks.getFileEncryptionInfo();
            return lastBlockBeingWrittenLength;
        }
    }

看到上面又回到調用 dfsClient.getLocatedBlocks,看看這個方法

/*
DFSClient.class
*/

public LocatedBlocks getLocatedBlocks(String src, long start) throws IOException {
        return this.getLocatedBlocks(src, start, this.dfsClientConf.getPrefetchSize());
    }

//繼續調用下面這個方法
    public LocatedBlocks getLocatedBlocks(String src, long start, long length) throws IOException {
        TraceScope ignored = this.newPathTraceScope("getBlockLocations", src);
        Throwable var7 = null;

        LocatedBlocks var8;
        try {

        //調用這個靜態方法獲取 block位置信息
            var8 = callGetBlockLocations(this.namenode, src, start, length);
        } catch (Throwable var17) {
            var7 = var17;
            throw var17;
        } finally {
            if (ignored != null) {
                if (var7 != null) {
                    try {
                        ignored.close();
                    } catch (Throwable var16) {
                        var7.addSuppressed(var16);
                    }
                } else {
                    ignored.close();
                }
            }

        }

        return var8;
    }

//繼續看
    static LocatedBlocks callGetBlockLocations(ClientProtocol namenode, String src, long start, long length) throws IOException {
        try {

        //熟悉的味道,通過 namenode 的代理對象獲取block信息
            return namenode.getBlockLocations(src, start, length);
        } catch (RemoteException var7) {
            throw var7.unwrapRemoteException(new Class[]{AccessControlException.class, FileNotFoundException.class, UnresolvedPathException.class});
        }
    }

上面可以看到,仍舊是通過 namenode代理對象發起操作,下面看看 namenode.getBlockLocations。因為代理對象的類型是 ClientProtocol類型的,是個接口,所以得到實現子類中查看 ,ClientNamenodeProtocolTranslatorPB這個類。

/*
ClientNamenodeProtocolTranslatorPB.class
*/

public LocatedBlocks getBlockLocations(String src, long offset, long length) throws IOException {
        GetBlockLocationsRequestProto req = GetBlockLocationsRequestProto.newBuilder().setSrc(src).setOffset(offset).setLength(length).build();

        try {

            //熟悉的味道,調用 rcpProxy 向namenode server 發起操作。
            GetBlockLocationsResponseProto resp = this.rpcProxy.getBlockLocations((RpcController)null, req);
            return resp.hasLocations() ? PBHelperClient.convert(resp.getLocations()) : null;
        } catch (ServiceException var8) {
            throw ProtobufHelper.getRemoteException(var8);
        }
    }

看到這里,下面就是RPC底層的操作了。

方法時序圖如下:
七、HDFS上傳和下載原理(有源碼解析)1、FileSystem初始化,Client拿到NameNodeRpcServer代理對象,建立與NameNode的RPC通信(與前面一樣)
2、調用FileSystem的open()方法,由于實現類為DistributedFileSystem,所有是調用該類中的open()方法
3、DistributedFileSystem持有DFSClient的引用,繼續調用DFSClient中的open()方法
4、實例化DFSInputStream輸入流
5、調用openinfo()方法
6、調用fetchLocatedBlocksAndGetLastBlockLength()方法,抓取block信息并獲取最后block長度
7、調用DFSClient中的getLocatedBlocks()方法,獲取block信息
8、在callGetBlockLocations()方法中通過NameNode代理對象調用NameNodeRpcServer的getBlockLocations()方法
9、將block信息寫入輸出流,在8中會將 block 位置信息保存到DFSInputStream輸入流對象中的成員變量中
10、交給IOUtil,下載文件到本地

向AI問一下細節

免責聲明:本站發布的內容(圖片、視頻和文字)以原創、轉載和分享為主,文章觀點不代表本網站立場,如果涉及侵權請聯系站長郵箱:is@yisu.com進行舉報,并提供相關證據,一經查實,將立刻刪除涉嫌侵權內容。

AI

偏关县| 华容县| 镇赉县| 于田县| 武鸣县| 永福县| 阿拉善右旗| 镇沅| 宜兰市| 德兴市| 曲松县| 萍乡市| 偏关县| 日照市| 恭城| 虎林市| 建始县| 江油市| 台江县| 清水河县| 弋阳县| 宜丰县| 松江区| 博兴县| 武乡县| 抚顺县| 嘉荫县| 新和县| 信宜市| 溆浦县| 锦州市| 麻阳| 冷水江市| 万宁市| 津市市| 明星| 华蓥市| 和林格尔县| 贵南县| 资源县| 盐津县|