Spot Trading Bot Templates¶
The templates presented below serve as starting points for the development of a trading algorithms for Spot trading on the crypto asset exchange Kraken using the python-kraken-sdk.
The trading strategy can be implemented using the TradingBot class. This
class has access to all REST clients and receives all messages that are sent by
the subscribed websocket feeds via the on_message function.
Template to build a trading bot using the Kraken Spot Websocket API v2¶
1# !/usr/bin/env python3
2# -*- mode: python; coding: utf-8 -*-
3#
4# Copyright (C) 2023 Benjamin Thomas Schwertfeger
5# All rights reserved.
6# https://github.com/btschwertfeger
7#
8
9"""
10Module that provides a template to build a Spot trading algorithm using the
11python-kraken-sdk and Kraken Spot websocket API v2.
12"""
13
14from __future__ import annotations
15
16import asyncio
17import logging
18import os
19import sys
20import traceback
21
22import requests
23import urllib3
24
25from kraken.exceptions import KrakenAuthenticationError # , KrakenPermissionDeniedError
26from kraken.spot import Funding, Market, SpotWSClient, Trade, User
27
28logging.basicConfig(
29 format="%(asctime)s %(module)s,line: %(lineno)d %(levelname)8s | %(message)s",
30 datefmt="%Y/%m/%d %H:%M:%S",
31 level=logging.INFO,
32)
33logging.getLogger("requests").setLevel(logging.WARNING)
34logging.getLogger("urllib3").setLevel(logging.WARNING)
35
36LOG: logging.Logger = logging.getLogger(__name__)
37
38
39class TradingBot(SpotWSClient):
40 """
41 Class that implements the trading strategy
42
43 * The on_message function gets all messages sent by the websocket feeds.
44 * Decisions can be made based on these messages
45 * Can place trades using the self.__trade client or self.send_message
46 * Do everything you want
47
48 ====== P A R A M E T E R S ======
49 config: dict
50 configuration like: {
51 "key": "kraken-spot-key",
52 "secret": "kraken-spot-secret",
53 "pairs": ["DOT/USD", "BTC/USD"],
54 }
55 """
56
57 def __init__(
58 self: TradingBot,
59 config: dict,
60 **kwargs: object | dict | set | tuple | list | str | float | None,
61 ) -> None:
62 super().__init__(
63 key=config["key"],
64 secret=config["secret"],
65 **kwargs,
66 )
67 self.__config: dict = config
68
69 self.__user: User = User(key=config["key"], secret=config["secret"])
70 self.__trade: Trade = Trade(key=config["key"], secret=config["secret"])
71 self.__market: Market = Market(key=config["key"], secret=config["secret"])
72 self.__funding: Funding = Funding(key=config["key"], secret=config["secret"])
73
74 async def on_message(self: TradingBot, message: dict) -> None:
75 """Receives all messages of the websocket connection(s)"""
76 if message.get("method") == "pong" or message.get("channel") == "heartbeat":
77 return
78 if "error" in message:
79 # handle exceptions/errors sent by websocket connection …
80 pass
81
82 LOG.info(message)
83
84 # == apply your trading strategy here ==
85
86 # Call functions of `self.__trade` and other clients if conditions met …
87 # try:
88 # print(self.__trade.create_order(
89 # ordertype='limit',
90 # side='buy',
91 # volume=2,
92 # pair='XBTUSD',
93 # price=12000
94 # ))
95 # except KrakenPermissionDeniedError:
96 # # … handle exceptions
97 # pass
98
99 # The spot websocket client also allow sending orders via websockets
100 # this is way faster than using REST endpoints.
101 # await self.send_message(
102 # message={
103 # "method": "add_order",
104 # "params": {
105 # "limit_price": 1234.56,
106 # "order_type": "limit",
107 # "order_userref": 123456789,
108 # "order_qty": 1.0,
109 # "side": "buy",
110 # "symbol": "BTC/USD",
111 # "validate": True,
112 # },
113 # }
114 # )
115
116 # You can also un-/subscribe here using `self.subscribe(...)` or
117 # `self.unsubscribe(...)`.
118 #
119 # … more can be found in the documentation
120 # (https://python-kraken-sdk.readthedocs.io/en/stable/).
121
122 # Add more functions to customize the trading strategy …
123
124 def save_exit(self: TradingBot, reason: str | None = "") -> None:
125 """controlled shutdown of the strategy"""
126 LOG.warning("Save exit triggered, reason: %s", reason)
127 # Some ideas:
128 # * Save the current data
129 # * Close trades
130 # * Enable dead man's switch
131 sys.exit(1)
132
133
134class Manager:
135 """
136 Class to manage the trading strategy
137
138 … subscribes to desired feeds, instantiates the strategy and runs as long
139 as there is no error.
140
141 ====== P A R A M E T E R S ======
142 config: dict
143 configuration like: {
144 "key" "kraken-spot-key",
145 "secret": "kraken-secret-key",
146 "pairs": ["DOT/USD", "BTC/USD"],
147 }
148 """
149
150 def __init__(self: Manager, config: dict) -> None:
151 self.__config: dict = config
152 self.__trading_strategy: TradingBot | None = None
153
154 def run(self: Manager) -> None:
155 """Starts the event loop and bot"""
156 if not self.__check_credentials():
157 sys.exit(1)
158
159 try:
160 asyncio.run(self.__main())
161 except KeyboardInterrupt:
162 self.save_exit(reason="KeyboardInterrupt")
163 else:
164 self.save_exit(reason="Asyncio loop left")
165
166 async def __main(self: Manager) -> None:
167 """
168 Instantiates the trading strategy and subscribes to the desired
169 websocket feeds. While no exception within the strategy occur run the
170 loop.
171
172 The variable `exception_occur` which is an attribute of the SpotWSClient
173 can be set individually but is also being set to `True` if the websocket
174 connection has some fatal error. This is used to exit the asyncio loop -
175 but you can also apply your own reconnect rules.
176 """
177 try:
178 self.__trading_strategy = TradingBot(config=self.__config)
179 await self.__trading_strategy.start()
180
181 await self.__trading_strategy.subscribe(
182 params={"channel": "ticker", "symbol": self.__config["pairs"]},
183 )
184 await self.__trading_strategy.subscribe(
185 params={
186 "channel": "ohlc",
187 "interval": 15,
188 "symbol": self.__config["pairs"],
189 },
190 )
191
192 await self.__trading_strategy.subscribe(params={"channel": "executions"})
193
194 while not self.__trading_strategy.exception_occur:
195 # Check if the algorithm feels good
196 # Send a status update every day via Telegram or Mail
197 # …
198 await asyncio.sleep(6)
199
200 except Exception as exc:
201 LOG.error(message := f"Exception in main: {exc} {traceback.format_exc()}")
202 self.__trading_strategy.save_exit(reason=message)
203 finally:
204 # Close the sessions properly.
205 await self.__trading_strategy.close()
206
207 def __check_credentials(self: Manager) -> bool:
208 """Checks the user credentials and the connection to Kraken"""
209 try:
210 User(self.__config["key"], self.__config["secret"]).get_account_balance()
211 LOG.info("Client credentials are valid.")
212 return True
213 except urllib3.exceptions.MaxRetryError:
214 LOG.error("MaxRetryError, can't connect.")
215 return False
216 except requests.exceptions.ConnectionError:
217 LOG.error("ConnectionError, Kraken not available.")
218 return False
219 except KrakenAuthenticationError:
220 LOG.error("Invalid credentials!")
221 return False
222
223 def save_exit(self: Manager, reason: str = "") -> None:
224 """Invoke the save exit function of the trading strategy"""
225 print(f"Save exit triggered - {reason}")
226 if self.__trading_strategy is not None:
227 self.__trading_strategy.save_exit(reason=reason)
228 else:
229 sys.exit(1)
230
231
232def main() -> None:
233 """Example main - load environment variables and run the strategy."""
234 manager: Manager = Manager(
235 config={
236 "key": os.getenv("SPOT_API_KEY"),
237 "secret": os.getenv("SPOT_SECRET_KEY"),
238 "pairs": ["DOT/USD", "BTC/USD"],
239 },
240 )
241
242 try:
243 manager.run()
244 except Exception:
245 manager.save_exit(
246 reason=f"manageBot.run() has ended: {traceback.format_exc()}",
247 )
248
249
250if __name__ == "__main__":
251 main()