91超碰碰碰碰久久久久久综合_超碰av人澡人澡人澡人澡人掠_国产黄大片在线观看画质优化_txt小说免费全本

溫馨提示×

溫馨提示×

您好,登錄后才能下訂單哦!

密碼登錄×
登錄注冊×
其他方式登錄
點擊 登錄注冊 即表示同意《億速云用戶服務條款》

K8S中如何利用Exec Websocket接口實現Pod間的文件拷貝

發布時間:2021-12-16 10:37:50 來源:億速云 閱讀:281 作者:柒染 欄目:互聯網科技

今天就跟大家聊聊有關K8S中如何利用Exec Websocket接口實現Pod間的文件拷貝,可能很多人都不太了解,為了讓大家更加了解,小編給大家總結了以下內容,希望大家根據這篇文章可以有所收獲。

 需求

想想咱們遇到以下問題一般怎么解決?

新建了一個Pod, 想把另外一個Pod中的文件拷貝到新Pod中進行分析, 怎么實現呢?

如何在項目中, 如何像kubectl cp拷貝文件一樣, 實現Pod間文件拷貝呢?

新Pod與實例Pod共享pvc? 或者封裝一個帶認證上下文的kubectl執行命令行?

簡介


K8S中如何利用Exec Websocket接口實現Pod間的文件拷貝

流程說明

  • 首先初始化信號通道, 用于協程間的信號通知, 收到信號的協程執行暫停/退出循環/關閉通道等操作

  • 初始化數據通道srcStdOutCh, 類型為字節數組[]byte, 用于將源Pod的標準輸出放入通道,  發送給目的Pod標準輸入的數據就是從該數據通道中讀取

  • 拼接exec接口的訪問地址(集群連接,token), tar壓縮命令, 標準輸入/輸出,tty, pod名,容器名等參數. tar czf -  /var/log/xxx.log 表示將該文件樹結構壓縮為數據流

  • 調用websocket的Dialer方法與源Pod容器建立websocket連接, 并開啟協程將標準輸出寫入數據通道srcStdOutCh

  • 參考源pod exec接口, 拼接目的Pod exec訪問連接, tar xzf - -C /tmp表示從標準輸入讀取數據流,  并解壓成文件樹結構(注意:解壓后包含文件目錄樹結構)

  • 與目的Pod建立wss連接, 開啟協程從數據通道srcStdOutCh中讀取源Pod標準輸出, 并寫入目的Pod的標準輸入,  如果從數據通道讀取超時,則表示數據已經傳輸完畢, 此時停止向目的容器輸入數據, 并發送通知信號, 通知主協程可以退出,關閉源Pod的wss連接

注意事項

  • wesocket連上源Pod時, 標準輸出中會輸出空數據, tar命令輸出等干擾數據, 所以接收數據的時候需要傳入一個過濾器回調函數,  用于數據過濾

  • 向目的容器發送數據時, 需要將源容器收到的第一個字節刪除, 一般為1, 表示標準輸出標識, 發送給目的容器是不需要該字節的

  • 發送數據時, 需要設置第一個字節為0, 表示發送到標準輸入

參考代碼

cp.go

/* 總結: 1.不帶緩沖的通道需要先讀后寫 2.websocket ReadMessage方法是阻塞讀取的, 如果要中斷讀取, 關閉連接, 捕獲錯誤即可 */ package cpFilePod2Pod  import (   "crypto/tls"   "errors"   "fmt"   "log"   "net/url"   "regexp"   "strings"   "sync"   "time"    "github.com/gorilla/websocket" )  // 定義過濾器回調函數 type filterCallback func(input string) bool  // 帶有互斥鎖的Websocket連接對象 type WsConn struct {   Conn *websocket.Conn   mu   sync.Mutex }  // 發送字符串, 自動添加換行符 func (self *WsConn) Send(sender string, str string) {   self.mu.Lock()   defer self.mu.Unlock()   // 利用k8s exec websocket接口發送數據時, 第一個字節需要設置為0, 表示將數據發送到標準輸入   data := []byte{0}   data = append(data, []byte(str+"\n")...)   err := self.Conn.WriteMessage(websocket.BinaryMessage, data) //發送二進制數據類型   if err != nil {     log.Printf("發送錯誤, %s", err.Error())   }   log.Printf("%s, 數據:%s, 字節:%+v", sender, str, []byte(str+"\n")) }  //發送字符串, 不添加換行符, 內部做字節過濾,等操作 func (self *WsConn) SendWithFilter(sender string, str string) {   self.mu.Lock()   defer self.mu.Unlock()   // log.Printf("向目的容器發送數據:%s", str)   str = strings.ReplaceAll(str, "\r\n", "\n") // /r=13, /n=10, windows換行符轉Linux換行符   //去掉第一個字節(標準輸出1, byte:[0 1 ...]), 因為從源容器輸出的字節中, 第一位標識了標準輸出1, 給目的容器發送字節時, 需要去除該標志   //當WebSocket建立連接后,發送數據時需要在字節Buffer第一個字節設置為stdin(buf[0] = 0),而接受數據時, 需要判斷第一個字節, stdout(buf[0] = 1)或stderr(buf[0] = 2)   strByte := append([]byte(str)[:0], []byte(str)[1:]...)   data := []byte{0}   data = append(data, strByte...)   err := self.Conn.WriteMessage(websocket.BinaryMessage, data)   log.Printf("向目的容器標準輸入發送數據:\n%s, 字節數:%d, 字節:%+v", string(data), len(data), data)   if err != nil {     log.Printf("發送錯誤, %s", err.Error())   } }  //從連接中獲取數據流, 并寫入字節數組通道中, 內部執行過濾器(回調函數) func (self *WsConn) Receive(receiver string, ch chan []byte, filter filterCallback) error {   self.mu.Lock()   defer self.mu.Unlock()   msgType, msgByte, err := self.Conn.ReadMessage() //阻塞讀取, 類型為2表示二進制數據, 1表示文本, -1表示連接已關閉:websocket: close 1000 (normal)   log.Printf("%s, 讀取到數據:%s, 類型:%d, 字節數:%d, 字節:%+v", receiver, string(msgByte), msgType, len(msgByte), msgByte)   if err != nil {     log.Printf("%s, 讀取出錯, %s", receiver, err.Error())     return err   }   if filter(string(msgByte)) && len(msgByte) > 1 {     ch <- msgByte   } else {     log.Printf("%s, 數據不滿足, 直接丟棄數據, 字符:%s, 字節數:%d, 字節:%v", receiver, string(msgByte), len(msgByte), msgByte)   }   return nil }  func NewWsConn(host string, path string, params map[string]string, headers map[string][]string) (*websocket.Conn, error) {   paramArray := []string{}   for k, v := range params {     paramArray = append(paramArray, fmt.Sprintf("%s=%s", k, v))   }   u := url.URL{Scheme: "wss", Host: host, Path: path, RawQuery: strings.Join(paramArray, "&")}   log.Printf("API:%s", u.String())   dialer := websocket.Dialer{TLSClientConfig: &tls.Config{RootCAs: nil, InsecureSkipVerify: true}}   conn, _, err := dialer.Dial(u.String(), headers)   if err != nil {     return nil, errors.New(fmt.Sprintf("連接錯誤:%s", err.Error()))   }   return conn, nil }  //核心: tar -cf - 將具有文件夾結構的數據轉換成數據流, 通過 tar -xf - 將數據流轉換成 linux 文件系統 func CpPod2Pod() {   //通知主函數可以退出的信號通道   signalExit := make(chan bool, 1)   defer close(signalExit)    //下發不要給目的容器發送數據的信號   signalStopDstSend := make(chan bool, 1)   defer close(signalStopDstSend)    //下發不要從源容器讀取數據的信號   signalStopSrcRead := make(chan bool, 1)   defer close(signalStopSrcRead)    //下發不要從目的容器讀取數據的信號   signalStopDstRead := make(chan bool, 1)   defer close(signalStopDstRead)    //下發不要打印目的容器的輸出數據   signalStopPrintDstStdout := make(chan bool, 1)   defer close(signalStopPrintDstStdout)    //連接pod   host := "172.16.xxx.xxx:6443"   token := "xxx"   headers := map[string][]string{"authorization": {fmt.Sprintf("Bearer %s", token)}}    pathSrc := "/api/v1/namespaces/xxx/pods/xxx/exec"   commandSrc := "tar&command=czf&command=-&command=/var/log/mysql/slow.log" //tar czf - sourceFile   paraSrc := map[string]string{"stdout": "1", "stdin": "0", "stderr": "1", "tty": "0", "container": "xxx", "command": commandSrc}   srcConn, err := NewWsConn(host, pathSrc, paraSrc, headers)   if err != nil {     log.Printf("源Pod連接出錯, %s", err.Error())   }    pathDst := "/api/v1/namespaces/xxx/pods/xxx/exec"   commandDst := "tar&command=xzf&command=-&command=-C&command=/tmp" // tar xzf - -C /tmp   // paraDst := map[string]string{"stdout": "1", "stdin": "1", "stderr": "1", "tty": "0", "container": "xxx", "command": commandDst}   paraDst := map[string]string{"stdout": "0", "stdin": "1", "stderr": "0", "tty": "0", "container": "xxx", "command": commandDst} //關閉目的Pod標準輸出和錯誤輸出   dstConn, err := NewWsConn(host, pathDst, paraDst, headers)   if err != nil {     log.Printf("目的Pod連接出錯, %s", err.Error())   }    wsSrc := WsConn{     Conn: srcConn,   }    wsDst := WsConn{     Conn: dstConn,   }    defer srcConn.Close()   defer dstConn.Close()    srcStdOutCh := make(chan []byte, 2048)   dstStdOutCh := make(chan []byte)   defer close(srcStdOutCh)   defer close(dstStdOutCh)    // 接收源容器標準輸出到數據通道中   go func() {     i := 1     for {       log.Printf("第%d次, 從源容器讀取標準輸出", i)       i++       //定義匿名過濾器回調方法, 對源容器標準輸出中不需要的數據進行過濾       err := wsSrc.Receive("源容器", srcStdOutCh, func(input string) bool {         if input == "cat /var/log/mysql/slow.log" {           return false           // } else if match, _ := regexp.MatchString("root@(.+)#", input); match {           //   return false           // } else if match, _ := regexp.MatchString("cat /(.+).log", input); match {           //   return false           // } else if match, _ := regexp.MatchString("cat /tmp/(.+)", input); match {           //   return false         } else if match, _ := regexp.MatchString("tar: Removing leading(.+)", input); match {           return false         } else if len(input) == 0 { //過濾空消息           // log.Printf("讀取到標準錯誤輸出")           return false         }         return true       })       if err != nil {         log.Printf("讀取源容器標準輸出失敗")         // signalExit <- true         break       }       // time.Sleep(time.Microsecond * 100)     }   }()    /* 注意, 這里不能開啟并發協程去讀取目的容器的標準輸出, 如果開啟可能會與發送數據的協程搶鎖, 從而阻塞向目的容器發送數據*/   // // 從目的容器獲取標準輸出到數據通道中   // go func() {   //   // i := 0   //   for {   //     // 該過濾器直接返回true, 僅占位   //     err := wsDst.Receive("目的容器", dstStdOutCh, func(input string) bool {   //       return true   //     })   //     if err != nil {   //       log.Printf("從目的容器讀取數據失敗")   //       break   //     }   //     // wsDst.Send()   //     time.Sleep(time.Microsecond * 100000)   //   }   //   // log.Printf("從目的容器讀取數據, 第%d次循環", i)   //   // i++   // }()    // //從數據通道中讀取, 目的容器的標準輸出, 并打印   // go func() {   // BreakPrintDstPodStdout:   //   for {   //     select {   //     case data := <-dstStdOutCh:   //       log.Printf("目的容器標準輸出:%s", string(data))   //       // time.Sleep(time.Microsecond * 200)   //     case <-signalStopPrintDstStdout:   //       log.Printf("收到信號, 停止打印目的容器標準輸出")   //       // close(dataOutput)   //       // close(dataCh)   //       // signalStopRead <- true   //       // log.Printf("發送停止讀信號")   //       // close(dataOutput)   //       // close(dataCh)   //       break BreakPrintDstPodStdout   //     }   //     // time.Sleep(time.Microsecond * 100)   //   }   // }()    //從源容器標準輸出的數據通道獲取數據, 然后發送給目的容器標準輸入   //定義超時時間   timeOutSecond := 3   timer := time.NewTimer(time.Second * time.Duration(timeOutSecond)) Break2Main:   for {     select {     case data := <-srcStdOutCh:       wsDst.SendWithFilter("向目的容器發送", string(data))       // time.Sleep(time.Millisecond * 200)       timer.Reset(time.Second * time.Duration(timeOutSecond))     case <-timer.C:       // time.Sleep(time.Second * 5)       log.Printf("================ 源容器標準輸出,沒有新的數據,獲取超時,停止向目的容器發送數據 ================")       // log.Printf("發送信號:停止打印目的容器標準輸出")       // signalStopPrintDstStdout <- true       log.Printf("發送信號:停止從源容器讀取數據")       wsSrc.Conn.Close()       // log.Printf("發送信號:停止從目的容器讀取數據")       // wsDst.Conn.Close()       log.Printf("發送信號:主函數可以退出了")       signalExit <- true       log.Printf("所有信號發送完畢")       log.Printf("================== 跳出循環 =================")       break Break2Main     }     // time.Sleep(time.Microsecond * 1000)   }    // signalStopRead <- true   <-signalExit //阻塞通道, 直到收到一個信號   // signalStopRead <- true   log.Printf("主函數收到信號, 準備退出")   // close(dataCh)   // time.Sleep(time.Second)   // close(dataOutput)   // time.Sleep(time.Second)   // select {} }

cp_test.go

  1. package cpFilePod2Pod 

  2.  

  3. import ( 

  4.   "log" 

  5.   "testing" 

  6.  

  7. // go test -race -test.run TestCpPod2Pod  切到該目錄執行該測試 

  8. func TestCpPod2Pod(t *testing.T) { 

  9.   log.Printf("開始測試") 

  10.   CpPod2Pod() 


參考結果: 源容器: root@xxx-mysql-0:/var/log/mysql# md5sum slow.log 16577613b6ea957ecb5d9d5e976d9c50  slow.log 目的容器: root@xxx-75bdcdb8cf-hq9wf:/tmp/var/log/mysql# md5sum slow.log 16577613b6ea957ecb5d9d5e976d9c50  slow.log

看完上述內容,你們對K8S中如何利用Exec Websocket接口實現Pod間的文件拷貝有進一步的了解嗎?如果還想了解更多知識或者相關內容,請關注億速云行業資訊頻道,感謝大家的支持。

向AI問一下細節

免責聲明:本站發布的內容(圖片、視頻和文字)以原創、轉載和分享為主,文章觀點不代表本網站立場,如果涉及侵權請聯系站長郵箱:is@yisu.com進行舉報,并提供相關證據,一經查實,將立刻刪除涉嫌侵權內容。

AI

五峰| 兴仁县| 宁晋县| 修文县| 镇原县| 金塔县| 宝鸡市| 鹤峰县| 星子县| 运城市| 华宁县| 吉林省| 万宁市| 左云县| 宣恩县| 安宁市| 东港市| 庆元县| 湖南省| 察隅县| 榆中县| 漳浦县| 山东省| 广宁县| 大关县| 宜君县| 陇川县| 江津市| 洪江市| 磐安县| 额尔古纳市| 中宁县| 涟源市| 德兴市| 土默特左旗| 微博| 荔浦县| 政和县| 馆陶县| 桐梓县| 姜堰市|