您好,登錄后才能下訂單哦!
這篇文章主要講解了“怎么接入Apache Kafka服務器”,文中的講解內容簡單清晰,易于學習與理解,下面請大家跟著小編的思路慢慢深入,一起來研究和學習“怎么接入Apache Kafka服務器”吧!
交易撮合引擎(Matching/Trading Engine),顧名思義是用來撮合交易的軟件,廣泛地應用在金融、證券、加密貨幣交易等領域。交易引擎負責管理加密資產市場中所有的開口訂單(Open Orders),并在發現匹配的訂單對(Trading Pair)時自動執行交易。本文將首先介紹有關加密資產交易撮合引擎的基本概念,例如委托單、交易委托賬本等,然后使用Golang實現一個原理性的撮合引擎。
在開始打造交易撮合引擎之前,讓我們首先熟悉相關的基本概念與術語。
正如前面所述,交易撮合引擎是用來撮合交易的軟件,可以先把交易撮合引擎看作一個黑盒子,它有一些輸入和輸出。
例如,可能的輸入包括:
創建新的委托單(NewOrder):一個新的委托單可以作為交易撮合引擎的輸入,引擎會嘗試將其與已有的 委托單進行撮合。
取消已有的委托單(CancelOrder):用戶也可以取消一個之前輸入的委托單,如果它還沒有執行的話,即開口訂單。
當然你可以定義其他的輸入,出于簡化考慮,我們現在只定義上述兩個輸入。
交易撮合引擎的輸出是一些事件,以便及時通知其他應用處理。例如,當引擎撮合了一筆交易后,就會觸發一個TradesGenerated事件;而當取消了一個已有的委托單后,引擎就會觸發rderCancelled。同樣,你可以根據自己的需求來定義引擎的輸出,這里我們還是簡單點,只定義這兩個輸出事件。
交易委托賬本(Order Book)就是一個買方委托單或買方委托單的列表,通常按照價格和時間排序。
當一個新的買方(買方)委托單進入引擎后,引擎就會將嘗試其與現有的賣方(買方)委托賬本 進行匹配,看是否存在執行交易的可能。如果找到了匹配的對手單,引擎就可以執行這兩個委托單了,也就是撮合成功了。
在任何交易引擎中,都可能有多種類型的委托單供用戶選擇。其中常見的類型包括:
限價委托單
限價委托單是在當前的加密貨幣交易環境中最常用的委托類型。這種委托單允許用戶指定一個價格,只有當撮合引擎找到同樣價格甚至更好價格的對手單時才執行交易。
對于一個買方委托單而言,這意味著如果你的委托價格是¥100,那么該委托單將會在任何不高于¥100的價格成交 —— 買到指定的價格或者更便宜的價格;而對于一個賣方委托單而言,同樣的委托價格意味著該委托單將在任何不低于¥100的價格成交—— 賣出指定的價格或者更高的價格。
市價委托單
市價委托單的撮合會完全忽略價格因素,而致力于有限完成指定數量的成交。市價委托單在交易委托賬本中有較高的優先級,在流動性充足的市場中市價單可以保證成交。
例如,當用戶委托購買2個以太幣時,該委托單可以在¥900、¥1000、¥2000或任何其他價位成交,這依賴于市場中當前的敞口委托單的情況。
止損委托單
止損委托單盡在市場價格到達指定價位時才被激活,因此它的執行方式與市價委托單相反。一旦止損委托單激活,它們可以自動轉化為市價委托單或限價委托單。
如果你希望打造一個高級的交易所,那么還有其他一些需要了解的概念,例如流動性、多/空交易、FIX/FAST協議等等,但是同樣出于簡化考慮,我們將這些內容留給你自己去發現。
現在,對于交易撮合引擎的構成我們已經有了一些了解,那么讓我們看一下整個系統的架構,以及我們將要使用的技術:
正如你上面看到的,我們的系統將包含引擎的多個客戶端,這些客戶端可以是交易所系統中的其他組件,例如接收終端用戶委托請求的App等等。
在客戶端和引擎之間的通信是使用Apache Kafka作為消息總線來實現的,每個交易對都對應Kafka的一個主題,這樣我們可以確保當消息隊列接收到用戶委托單時,引擎將以同樣的先后順序處理委托單。這保證了即使引擎崩潰重啟我們也可以重建交易委托賬本。
引擎將監聽Kafka主題,執行委托賬本命令并將引擎的輸出事件發布到消息隊列中。當然如果能夠監測委托單的處理速度以及交易的執行情況會更酷。我們可以使用Prometheus來采集性能指標,使用grafana來實現一個監視儀表盤。
可以選擇你熟悉的開發語言,不過由于交易撮合引擎計算量巨大,通常我們應當選擇底層系列的語言,例如:C/C++、GoLang、Rust、Java等等。在這個教程中,我們使用Golang,因為它很快、容易理解、并發實現簡單,而且我也有好久沒有用C++了。
我們將按照以下的步驟來開發交易撮合引擎:
基礎類型定義
Consumer實現
Order Book實現
Producer實現
Monitoring實現
我們需要首先定義一些基礎類型,這包括Order、OrderBook和Trade,分別表示委托單、交易委托賬本和交易:
下面是engine/order.go
文件的內容:
package engine import "encoding/json" type Order struct { Amount uint64 `json:"amount"` Price uint64 `json:"price"` ID string `json:"id"` Side int8 `json:"side"` } func (order *Order) FromJSON(msg []byte) error { return json.Unmarshal(msg, order) } func (order *Order) ToJSON() []byte { str, _ := json.Marshal(order) return str }
這里我們就是簡單地創建了一個結構用來記錄訂單的主要信息,然后添加了一個方法用于快速的JSON轉換。
類似地engine/trade.go
文件的內容:
package engine import "encoding/json" type Trade struct { TakerOrderID string `json:"taker_order_id"` MakerOrderID string `json:"maker_order_id"` Amount uint64 `json:"amount"` Price uint64 `json:"price"` } func (trade *Trade) FromJSON(msg []byte) error { return json.Unmarshal(msg, trade) } func (trade *Trade) ToJSON() []byte { str, _ := json.Marshal(trade) return str }
現在我們已經定義了基本的輸入和輸出類型,現在看看交易委托賬本engine/order_book.go
文件的內容:
package engine // OrderBook type type OrderBook struct { BuyOrders []Order SellOrders []Order } // Add a buy order to the order book func (book *OrderBook) addBuyOrder(order Order) { n := len(book.BuyOrders) var i int for i := n - 1; i >= 0; i-- { buyOrder := book.BuyOrders[i] if buyOrder.Price < order.Price { break } } if i == n-1 { book.BuyOrders = append(book.BuyOrders, order) } else { copy(book.BuyOrders[i+1:], book.BuyOrders[i:]) book.BuyOrders[i] = order } } // Add a sell order to the order book func (book *OrderBook) addSellOrder(order Order) { n := len(book.SellOrders) var i int for i := n - 1; i >= 0; i-- { sellOrder := book.SellOrders[i] if sellOrder.Price > order.Price { break } } if i == n-1 { book.SellOrders = append(book.SellOrders, order) } else { copy(book.SellOrders[i+1:], book.SellOrders[i:]) book.SellOrders[i] = order } } // Remove a buy order from the order book at a given index func (book *OrderBook) removeBuyOrder(index int) { book.BuyOrders = append(book.BuyOrders[:index], book.BuyOrders[index+1:]...) } // Remove a sell order from the order book at a given index func (book *OrderBook) removeSellOrder(index int) { book.SellOrders = append(book.SellOrders[:index], book.SellOrders[index+1:]...) }
在交易委托賬本中,除了創建保存買/賣方委托單的列表外,我們還需要定義添加新委托單的方法。
委托單列表應當根據其類型按升序或降序排列:賣方委托單是按降序排列的,這樣在列表中序號最大的委托單價格最低;買方委托單是按升序排列的,因此在其列表中最后的委托單價格最高。
由于絕大多數交易會在市場價格附近成交,我們可以輕松地從這些數組中插入或移除成員。
現在讓我們來處理委托單。
在下面的代碼中我們添加了一個命令來實現對限價委托單的處理。
文件engine/order_book_limit_order.go
的內容:
package engine // Process an order and return the trades generated before adding the remaining amount to the market func (book *OrderBook) Process(order Order) []Trade { if order.Side == 1 { return book.processLimitBuy(order) } return book.processLimitSell(order) } // Process a limit buy order func (book *OrderBook) processLimitBuy(order Order) []Trade { trades := make([]Trade, 0, 1) n := len(book.SellOrders) // check if we have at least one matching order if n != 0 || book.SellOrders[n-1].Price <= order.Price { // traverse all orders that match for i := n - 1; i >= 0; i-- { sellOrder := book.SellOrders[i] if sellOrder.Price > order.Price { break } // fill the entire order if sellOrder.Amount >= order.Amount { trades = append(trades, Trade{order.ID, sellOrder.ID, order.Amount, sellOrder.Price}) sellOrder.Amount -= order.Amount if sellOrder.Amount == 0 { book.removeSellOrder(i) } return trades } // fill a partial order and continue if sellOrder.Amount < order.Amount { trades = append(trades, Trade{order.ID, sellOrder.ID, sellOrder.Amount, sellOrder.Price}) order.Amount -= sellOrder.Amount book.removeSellOrder(i) continue } } } // finally add the remaining order to the list book.addBuyOrder(order) return trades } // Process a limit sell order func (book *OrderBook) processLimitSell(order Order) []Trade { trades := make([]Trade, 0, 1) n := len(book.BuyOrders) // check if we have at least one matching order if n != 0 || book.BuyOrders[n-1].Price >= order.Price { // traverse all orders that match for i := n - 1; i >= 0; i-- { buyOrder := book.BuyOrders[i] if buyOrder.Price < order.Price { break } // fill the entire order if buyOrder.Amount >= order.Amount { trades = append(trades, Trade{order.ID, buyOrder.ID, order.Amount, buyOrder.Price}) buyOrder.Amount -= order.Amount if buyOrder.Amount == 0 { book.removeBuyOrder(i) } return trades } // fill a partial order and continue if buyOrder.Amount < order.Amount { trades = append(trades, Trade{order.ID, buyOrder.ID, buyOrder.Amount, buyOrder.Price}) order.Amount -= buyOrder.Amount book.removeBuyOrder(i) continue } } } // finally add the remaining order to the list book.addSellOrder(order) return trades }
看起來我們將一個方法變成了兩個,分別處理買方委托單和賣方委托單。這兩個方法在每個方面 都很相似,除了處理的市場側不同。
算法非常簡單。我們將一個買方委托單與所有的賣方委托單進行匹配,找出任何與買方委托價格 一致甚至更低的賣方委托單。當這一條件不能滿足時,或者該買方委托單完成后,我們返會撮合 的交易。
現在就快完成我們的交易引擎了,還需要接入Apache Kafka服務器,然后開始監聽委托單。
main.go
文件的內容:
package main import ( "engine/engine" "log" "github.com/Shopify/sarama" cluster "github.com/bsm/sarama-cluster" ) func main() { // create the consumer and listen for new order messages consumer := createConsumer() // create the producer of trade messages producer := createProducer() // create the order book book := engine.OrderBook{ BuyOrders: make([]engine.Order, 0, 100), SellOrders: make([]engine.Order, 0, 100), } // create a signal channel to know when we are done done := make(chan bool) // start processing orders go func() { for msg := range consumer.Messages() { var order engine.Order // decode the message order.FromJSON(msg.Value) // process the order trades := book.Process(order) // send trades to message queue for _, trade := range trades { rawTrade := trade.ToJSON() producer.Input() <- &sarama.ProducerMessage{ Topic: "trades", Value: sarama.ByteEncoder(rawTrade), } } // mark the message as processed consumer.MarkOffset(msg, "") } done <- true }() // wait until we are done <-done } // // Create the consumer // func createConsumer() *cluster.Consumer { // define our configuration to the cluster config := cluster.NewConfig() config.Consumer.Return.Errors = false config.Group.Return.Notifications = false config.Consumer.Offsets.Initial = sarama.OffsetOldest // create the consumer consumer, err := cluster.NewConsumer([]string{"127.0.0.1:9092"}, "myconsumer", []string{"orders"}, config) if err != nil { log.Fatal("Unable to connect consumer to kafka cluster") } go handleErrors(consumer) go handleNotifications(consumer) return consumer } func handleErrors(consumer *cluster.Consumer) { for err := range consumer.Errors() { log.Printf("Error: %s\n", err.Error()) } } func handleNotifications(consumer *cluster.Consumer) { for ntf := range consumer.Notifications() { log.Printf("Rebalanced: %+v\n", ntf) } } // // Create the producer // func createProducer() sarama.AsyncProducer { config := sarama.NewConfig() config.Producer.Return.Successes = false config.Producer.Return.Errors = true config.Producer.RequiredAcks = sarama.WaitForAll producer, err := sarama.NewAsyncProducer([]string{"127.0.0.1:9092"}, config) if err != nil { log.Fatal("Unable to connect producer to kafka server") } return producer }
利用Golang的Sarama Kafka客戶端開發庫,我們可以分別創建一個接入Kafka的消費者和生產者。
消費者將在指定的Kafka主題上等待新的委托單,然后進行撮合處理。生成的交易接下來使用生產者發送到指定的交易主題。
Kafka消息采用字節數組編碼,因此我們需要將其解碼。反之,將交易傳入消息隊列時,我們還需要進行必要的編碼。
感謝各位的閱讀,以上就是“怎么接入Apache Kafka服務器”的內容了,經過本文的學習后,相信大家對怎么接入Apache Kafka服務器這一問題有了更深刻的體會,具體使用情況還需要大家實踐驗證。這里是億速云,小編將為大家推送更多相關知識點的文章,歡迎關注!
免責聲明:本站發布的內容(圖片、視頻和文字)以原創、轉載和分享為主,文章觀點不代表本網站立場,如果涉及侵權請聯系站長郵箱:is@yisu.com進行舉報,并提供相關證據,一經查實,將立刻刪除涉嫌侵權內容。