您好,登錄后才能下訂單哦!
今天就跟大家聊聊有關如何用Python開發數字貨幣交易機器人,可能很多人都不太了解,為了讓大家更加了解,小編給大家總結了以下內容,希望大家根據這篇文章可以有所收獲。
眾所周知,幣圈一天,人間一年。我們進行數字貨幣交易時,在交易所 APP 或者網站 盯盤并手動下單非常耗時,當幣價波動非常大的時候往往會錯失良機。這時我們可以創建一個簡單的 telegram 交易機器人,來幫助我們進行做空和做多交易。
該機器人可以實現以下功能:
做空交易 - 以指定的價格賣出持有貨幣并在價格下跌時回購
做多交易 - 指定的價格購買貨幣并在價格上漲時賣出
列出交易訂單
顯示可用余額
設置 Telegram 機器人
首先需要一個 Telegram 賬號,如果沒有的話請自己注冊一個。然后與BotFather進行對話,通過輸入/newbot來新建一個telegram機器人,根據指示一步步創建并記住你的token。
獲取交易所的 API keys
查找你的交易所API文檔,看看如何獲取對訂單和賬戶余額的訪問權限和步驟,記住你的密碼和API keys。本例中我們以bitfinex為例,Bitmex交易所是目前市面上交易量最大的比特幣期貨交易所,交易量和交易深度非常大。
安裝依賴包
我們這邊用的是Python 3.6版本,同時我們還需要利用CCXT框架獲取Bitmex交易所數據,CCXT是一個JavaScript / Python / PHP 開發庫,用于數字貨幣的交易,支持眾多的比特幣/以太幣/山寨幣交易市場和交易所API。
CCXT庫用于連接數字貨幣交易所并在世界范圍內進行交易和支付處理。使用 ccxt可以快速訪問數字貨幣市場數據,可以用于存儲、分析、可視化、指標開發、 量化交易、策略回溯測試、交易機器人程序以及相關的軟件工程。
然后我們將使用python-telegram-bot與Telegram進行通訊,對聊天消息做出反應并進行交易。
只需要用下面方法安裝以上兩個依賴包:
pip install python-telegram-bot ccxt
我們需要交易機器人實現的基本類功能:
1、獲取交易所概況,允許創建訂單,列出訂單詳情并獲取余額。這將是以 ccxt 實現的裝飾器。
2、交易執行者,因為我們希望自動執行做空和做多交易。
3、即時響應的telegram 機器人。
編寫機器人
項目結構如下:
main.py \config \core \model \util
我們將從一個簡單的模型開始。因為多空交易兩者有很多共同點,可以在\ model中創建一個基類TradeDetails:
import abc class TradeDetails(metaclass=abc.ABCMeta): def __init__(self, start_price: float, symbol: str, amount: float, currency: str = "USD"): self.start_price = start_price self.symbol = symbol.upper() self.amount = amount self.currency = currency @property def exchange_symbol(self): return f"{self.symbol.upper()}/{self.currency}" @property @abc.abstractmethod def exit_price(self): pass def __str__(self) -> str: return f"order for {self.amount} {self.exchange_symbol} with enter price: {self.start_price:.5}, " \ f"exit_price: {self.exit_price:.5}"
具體的為:
LongTrade
from fasttrade.model.trade import TradeDetails class LongTrade(TradeDetails): def __init__(self, start_price: float, symbol: str, amount: float, percent_change: float = 0.5, currency: str = "USD") -> None: super().__init__(start_price, symbol, amount, currency) self.end_price = start_price * (1 + percent_change / 100) @property def exit_price(self): return self.end_price def __str__(self) -> str: return "Long " + super().__str__()
ShortTrade
from fasttrade.model.trade import TradeDetails class ShortTrade(TradeDetails): def __init__(self, start_price: float, symbol: str, amount: float, percent_change: float = 0.5, currency: str = "USD") -> None: super().__init__(start_price, symbol, amount, currency) self.end_price = start_price * (1 - percent_change / 100) @property def exit_price(self): return self.end_price def __str__(self) -> str: return "Short " + super().__str__()
接下來是獲取交易所數據:
from ccxt import Exchange, OrderNotFound class CryptoExchange: def __init__(self, exchange: Exchange): self.exchange = exchange self.exchange.load_markets() @property def free_balance(self): balance = self.exchange.fetch_free_balance() # surprisingly there are balances with 0, so we need to filter these out return {k: v for k, v in balance.items() if v > 0} def fetch_open_orders(self, symbol: str = None): return self.exchange.fetch_open_orders(symbolsymbol=symbol) def fetch_order(self, order_id: int): return self.exchange.fetch_order(order_id) def cancel_order(self, order_id: int): try: self.exchange.cancel_order(order_id) except OrderNotFound: # treat as success pass def create_sell_order(self, symbol: str, amount: float, price: float): return self.exchange.create_order(symbolsymbol=symbol, type="limit", side="sell", amountamount=amount, priceprice=price) def create_buy_order(self, symbol: str, amount: float, price: float): return self.exchange.create_order(symbolsymbol=symbol, type="limit", side="buy", amountamount=amount, priceprice=price)
然后,我們將執行交易程序。程序將接受交易所數據和超時情況以檢查訂單是否完成。當做空時,我們以設定的價格賣出,當價格下降到一定水平時回購。我們使用asyncio協程進行編碼,以使等待不會阻塞:
import asyncio import logging from ccxt import ExchangeError from model.longtrade import LongTrade from model.shorttrade import ShortTrade class TradeExecutor: def __init__(self, exchange, check_timeout: int = 15): self.check_timeout = check_timeout self.exchange = exchange async def execute_trade(self, trade): if isinstance(trade, ShortTrade): await self.execute_short_trade(trade) elif isinstance(trade, LongTrade): await self.execute_long_trade(trade) async def execute_short_trade(self, trade: ShortTrade): sell_price = trade.start_price buy_price = trade.exit_price symbol = trade.exchange_symbol amount = trade.amount order = self.exchange.create_sell_order(symbol, amount, sell_price) logging.info(f'Opened sell order: {amount} of {symbol}. Target sell {sell_price}, buy price {buy_price}') await self._wait_order_complete(order['id']) # post buy order order = self.exchange.create_buy_order(symbol, amount, buy_price) await self._wait_order_complete(order['id']) logging.info(f'Completed short trade: {amount} of {symbol}. Sold at {sell_price} and bought at {buy_price}') async def execute_long_trade(self, trade: LongTrade): buy_price = trade.start_price sell_price = trade.exit_price symbol = trade.exchange_symbol amount = trade.amount order = self.exchange.create_buy_order(symbol, amount, buy_price) logging.info(f'Opened long trade: {amount} of {symbol}. Target buy {buy_price}, sell price {sell_price}') await self._wait_order_complete(order.id) # post sell order order = self.exchange.create_sell_order(symbol, amount, sell_price) await self._wait_order_complete(order.id) logging.info(f'Completed long trade: {amount} of {symbol}. Bought at {buy_price} and sold at {sell_price}') async def _wait_order_complete(self, order_id): status = 'open' while status is 'open': await asyncio.sleep(self.check_timeout) order = self.exchange.fetch_order(order_id) status = order['status'] logging.info(f'Finished order {order_id} with {status} status') # do not proceed further if we canceled order if status == 'canceled': raise ExchangeError('Trade has been canceled')
ccxt使用REST API進行數據傳輸。它不如某些交易所支持的WebSockets快,但是對于這個簡單的機器人來說,速度或許差別。
async def _wait_order_complete(self, order_id): status = 'open' order = None while status is 'open': await asyncio.sleep(self.check_timeout) order = self.exchange.fetch_order(order_id) status = order['status'] logging.info(f'Finished order {order_id} with {status} status') # do not proceed further if we canceled order if status == 'canceled': raise ExchangeError('Trade has been canceled') return order
接下來將創建Telegram機器人,這是最有難度的部分,我們將使其擁有以下指令:
1、列出/取消有效訂單
2、顯示可用余額
3、建立做多或做空交易
我們還需要對機器人做一些安全限制,使其僅對你的消息做出響應,而其他人則無法使用你的帳戶進行交易。
主要是進行做多和做空交易的部分:
1、選擇做空或者做多
2、輸入數字貨幣品種
3、輸入交易數量
4、所占百分比
5、每個價格
6、顯示確認信息
7、顯示最終交易信息
我們來創建telegrambot.py并添加以下常量:
SELECTION = "selection" SHORT_TRADE = "short_trade" LONG_TRADE = "long_trade" OPEN_ORDERS = "open_orders" FREE_BALANCE = "free_balance" CANCEL_ORD = "cancel_order" PROCESS_ORD_CANCEL = "process_ord_cancel" COIN_NAME = "coin_select" PERCENT_CHANGE = "percent_select" AMOUNT = "amount" PRICE = "price" PROCESS_TRADE = "process_trade" CONFIRM = "confirm" CANCEL = "cancel" END_CONVERSATION = ConversationHandler.END
我們可以通過擴展BaseFilter來實現對user_id的限制。這樣機器人必須接受被允許用戶的token、id才能執行操作。
class TelegramBot: class PrivateUserFiler(BaseFilter): def __init__(self, user_id): self.user_id = int(user_id) def filter(self, message): return message.from_user.id == self.user_id def __init__(self, token: str, allowed_user_id, trade_executor: TradeExecutor): self.updater = Updater(tokentoken=token) selfself.dispatcher = self.updater.dispatcher self.trade_executor = trade_executor selfself.exchange = self.trade_executor.exchange selfself.private_filter = self.PrivateUserFiler(allowed_user_id) self._prepare()
在_prepare()函數中,我們將創建所有處理函數并將其附加到調度程序。我們開始與機器人聊天時希望顯示的基本選項:
def _prepare(self): # Create our handlers def show_help(bot, update): update.effective_message.reply_text('Type /trade to show options ') def show_options(bot, update): button_list = [ [InlineKeyboardButton("Short trade", callback_data=SHORT_TRADE), InlineKeyboardButton("Long trade", callback_data=LONG_TRADE), ], [InlineKeyboardButton("Open orders", callback_data=OPEN_ORDERS), InlineKeyboardButton("Available balance", callback_data=FREE_BALANCE)], ] update.message.reply_text("Trade options:", reply_markup=InlineKeyboardMarkup(button_list)) return TRADE_SELECT
InlineKeyboardButton允許我們將文本選項顯示為鍵盤。這比鍵入所有命令更為直觀。callback_data允許在按下按鈕時傳遞其他數據。show_options返回下一個繼續進行對話的處理函數的名稱。其他處理函數將使用類似的方法。然后我們執行用戶選擇的處理程序。在這里,我們主要從一個問題轉到另一個問題:
def process_trade_selection(bot, update, user_data): query = update.callback_query selection = query.data if selection == OPEN_ORDERS: orders = self.exchange.fetch_open_orders() if len(orders) == 0: bot.edit_message_text(text="You don't have open orders", chat_id=query.message.chat_id, message_id=query.message.message_id) return END_CONVERSATION # show the option to cancel active orders keyboard = [ [InlineKeyboardButton("Ok", callback_data=CONFIRM), InlineKeyboardButton("Cancel order", callback_data=CANCEL)] ] bot.edit_message_text(text=formatter.format_open_orders(orders), chat_id=query.message.chat_id, message_id=query.message.message_id, reply_markup=InlineKeyboardMarkup(keyboard)) # attach opened orders, so that we can cancel by index user_data[OPEN_ORDERS] = orders return CANCEL_ORD elif selection == FREE_BALANCE: balance = self.exchange.free_balance msg = "You don't have any available balance" if len(balance) == 0 \ else f"Your available balance:\n{formatter.format_balance(balance)}" bot.edit_message_text(text=msg, chat_id=query.message.chat_id, message_id=query.message.message_id) return END_CONVERSATION user_data[TRADE_SELECT] = selection bot.edit_message_text(text=f'Enter coin name for {selection}', chat_id=query.message.chat_id, message_id=query.message.message_id) return COIN_NAME def cancel_order(bot, update): query = update.callback_query if query.data == CANCEL: query.message.reply_text('Enter order index to cancel: ') return PROCESS_ORD_CANCEL show_help(bot, update) return END_CONVERSATION def process_order_cancel(bot, update, user_data): idx = int(update.message.text) order = user_data[OPEN_ORDERS][idx] self.exchange.cancel_order(order['id']) update.message.reply_text(f'Canceled order: {formatter.format_order(order)}') return END_CONVERSATION def process_coin_name(bot, update, user_data): user_data[COIN_NAME] = update.message.text.upper() update.message.reply_text(f'What amount of {user_data[COIN_NAME]}') return AMOUNT def process_amount(bot, update, user_data): user_data[AMOUNT] = float(update.message.text) update.message.reply_text(f'What % change for {user_data[AMOUNT]} {user_data[COIN_NAME]}') return PERCENT_CHANGE def process_percent(bot, update, user_data): user_data[PERCENT_CHANGE] = float(update.message.text) update.message.reply_text(f'What price for 1 unit of {user_data[COIN_NAME]}') return PRICE def process_price(bot, update, user_data): user_data[PRICE] = float(update.message.text) keyboard = [ [InlineKeyboardButton("Confirm", callback_data=CONFIRM), InlineKeyboardButton("Cancel", callback_data=CANCEL)] ] update.message.reply_text(f"Confirm the trade: '{TelegramBot.build_trade(user_data)}'", reply_markup=InlineKeyboardMarkup(keyboard)) return PROCESS_TRADE
最后,我們構建會話處理程序,設置錯誤處理程序,并將所有處理程序添加到調度程序中。
def process_trade(bot, update, user_data): query = update.callback_query if query.data == CONFIRM: trade = TelegramBot.build_trade(user_data) self._execute_trade(trade) update.callback_query.message.reply_text(f'Scheduled: {trade}') else: show_help(bot, update) return END_CONVERSATION def handle_error(bot, update, error): logging.warning('Update "%s" caused error "%s"', update, error) update.message.reply_text(f'Unexpected error:\n{error}') # configure our handlers def build_conversation_handler(): entry_handler = CommandHandler('trade', filters=self.private_filter, callback=show_options) conversation_handler = ConversationHandler( entry_points=[entry_handler], fallbacks=[entry_handler], states={ TRADE_SELECT: [CallbackQueryHandler(process_trade_selection, pass_user_data=True)], CANCEL_ORD: [CallbackQueryHandler(cancel_order)], PROCESS_ORD_CANCEL: [MessageHandler(filters=Filters.text, callback=process_order_cancel, pass_user_data=True)], COIN_NAME: [MessageHandler(filters=Filters.text, callback=process_coin_name, pass_user_data=True)], AMOUNT: [MessageHandler(Filters.text, callback=process_amount, pass_user_data=True)], PERCENT_CHANGE: [MessageHandler(Filters.text, callback=process_percent, pass_user_data=True)], PRICE: [MessageHandler(Filters.text, callback=process_price, pass_user_data=True)], PROCESS_TRADE: [CallbackQueryHandler(process_trade, pass_user_data=True)], }, ) return conversation_handler self.dispatcher.add_handler(CommandHandler('start', filters=self.private_filter, callback=show_help)) self.dispatcher.add_handler(build_conversation_handler()) self.dispatcher.add_error_handler(handle_error)
傳遞用戶數據時允許我們向處理程序提供其他user_data參數。這樣可以確保機器人從一個處理程序傳遞到另一個處理程序時,保持所有答復的對話狀態。我們需要run_async裝飾器在后臺執行交易,而又不會阻止機器人對新消息進行響應:
def start_bot(self): self.updater.start_polling() @run_async def _execute_trade(self, trade): loop = asyncio.new_event_loop() task = loop.create_task(self.trade_executor.execute_trade(trade)) loop.run_until_complete(task) @staticmethod def build_trade(user_data): current_trade = user_data[TRADE_SELECT] price = user_data[PRICE] coin_name = user_data[COIN_NAME] amount = user_data[AMOUNT] percent_change = user_data[PERCENT_CHANGE] if current_trade == LONG_TRADE: return LongTrade(price, coin_name, amount, percent_change) elif current_trade == SHORT_TRADE: return ShortTrade(price, coin_name, amount, percent_change) else: raise NotImplementedError
這是用于訂單和余額顯示的格式化程序:
TITLES = ['idx', 'type', 'remaining', 'symbol', 'price'] SPACING = [4, 6, 8, 10, 8] def format_open_orders(orders) -> str: def join_line(ln): return ' | '.join(str(item).center(SPACING[i]) for i, item in enumerate(ln)) title_line = join_line(TITLES) lines = [title_line] for idx, order in enumerate(orders): line = [idx, order['side'], order['remaining'], order['symbol'], order['price']] lines.append(join_line(line)) separator_line = '-' * len(title_line) return f"\n{separator_line}\n".join(lines) def format_order(order): return f"{order['amount']} {order['symbol']} priced at {order['price']}" def format_balance(balance) -> str: coin_balance_as_list = list(f"{coin}: {val}" for coin, val in balance.items()) return "\n".join(coin_balance_as_list)
最后,我們創建main.py并將所有內容歸結在一起:
import logging import os import ccxt from core.exchange import CryptoExchange from core.telegrambot import TelegramBot from core.tradeexcutor import TradeExecutor if __name__ == '__main__': logging.basicConfig(format='%(asctime)s - %(levelname)s - %(message)s', level=logging.INFO) c_dir = os.path.dirname(__file__) with open(os.path.join(c_dir, "config/secrets.txt")) as key_file: api_key, secret, telegram_tkn, user_id = key_file.read().splitlines() ccxtccxt_ex = ccxt.bitfinex() ccxt_ex.apiKey = api_key ccxt_ex.secret = secret exchange = CryptoExchange(ccxt_ex) trade_executor = TradeExecutor(exchange) telegram_bot = TelegramBot(telegram_tkn, user_id, trade_executor) telegram_bot.start_bot()
我們從secrets.txt文件中獲取交易所密鑰,telegram的token和用戶ID,構造核心類并啟動機器人。使用以下內容在config文件夾中創建secrets.txt:
# YOUR_API_KEY # YOUR_SECRET # YOUR_TELEGRAM_TOKEN # YOUR_TELEGRAM_USER_ID
總結
對于想要簡化交易并擁有更好使用體驗的人來說,該機器人更像是一個輔助工具。它不是最先進的算法交易機器人。后面可以進行以下改進:
當進行做空交易時獲取可用余額并顯示用戶可以根據余額做空的最大值
要求在交易所執行之前驗證創建的訂單
添加TA指標、信號以通知最佳交易時間
止盈/止損操作和其他統計數據
有策略地根據超時等原因取消訂單
看完上述內容,你們對如何用Python開發數字貨幣交易機器人有進一步的了解嗎?如果還想了解更多知識或者相關內容,請關注億速云行業資訊頻道,感謝大家的支持。
免責聲明:本站發布的內容(圖片、視頻和文字)以原創、轉載和分享為主,文章觀點不代表本網站立場,如果涉及侵權請聯系站長郵箱:is@yisu.com進行舉報,并提供相關證據,一經查實,將立刻刪除涉嫌侵權內容。