Futures Websocket

The examples presented below serve to demonstrate the usage of the Futures websocket clients provided by python-kraken-sdk to access Kraken’s Websocket API.

For questions, feedback, additions, suggestions for improvement, or problems, python-kraken-sdk/discussions or python-kraken-sdk/issues may be helpful.

Example access and usage for Kraken Futures Websocket API
  1# -*- mode: python; coding: utf-8 -*-
  2# !/usr/bin/env python3
  3#
  4# Copyright (C) 2023 Benjamin Thomas Schwertfeger
  5# All rights reserved.
  6# https://github.com/btschwertfeger
  7#
  8
  9"""
 10Module that provides an example usage for the Kraken Futures websocket client.
 11"""
 12
 13from __future__ import annotations
 14
 15import asyncio
 16import logging
 17import logging.config
 18import os
 19import time
 20
 21from kraken.futures import FuturesWSClient
 22
 23logging.basicConfig(
 24    format="%(asctime)s %(module)s,line: %(lineno)d %(levelname)8s | %(message)s",
 25    datefmt="%Y/%m/%d %H:%M:%S",
 26    level=logging.INFO,
 27)
 28logging.getLogger("requests").setLevel(logging.WARNING)
 29logging.getLogger("urllib3").setLevel(logging.WARNING)
 30LOG: logging.Logger = logging.getLogger(__name__)
 31
 32clients = []
 33
 34
 35# Custom client
 36class Client(FuturesWSClient):
 37    """Can be used to create a custom trading strategy"""
 38
 39    async def on_message(self: Client, message: list | dict) -> None:
 40        """Receives the websocket messages"""
 41        LOG.info(message)
 42        # … apply your trading strategy in this class
 43        # … you can also combine this with the Futures REST clients
 44
 45
 46async def main() -> None:
 47    """Create a client and subscribe to channels/feeds"""
 48
 49    key = os.getenv("FUTURES_API_KEY")
 50    secret = os.getenv("FUTURES_SECRET_KEY")
 51
 52    try:
 53        # _____Public_Websocket_Feeds___________________
 54        client = Client()
 55        clients.append(client)
 56        await client.start()
 57        # print(client.get_available_public_subscription_feeds())
 58
 59        products = ["PI_XBTUSD", "PF_SOLUSD"]
 60        # subscribe to a public websocket feed
 61        await client.subscribe(feed="ticker", products=products)
 62        await client.subscribe(feed="book", products=products)
 63        # await client.subscribe(feed='trade', products=products)
 64        # await client.subscribe(feed='ticker_lite', products=products)
 65        # await client.subscribe(feed='heartbeat')
 66        # time.sleep(2)
 67
 68        # unsubscribe from a websocket feed
 69        time.sleep(2)  # in case subscribe is not done yet
 70        # await client.unsubscribe(feed='ticker', products=['PI_XBTUSD'])
 71        await client.unsubscribe(feed="ticker", products=["PF_XBTUSD"])
 72        await client.unsubscribe(feed="book", products=products)
 73        # ...
 74
 75        # _____Private_Websocket_Feeds_________________
 76        if key and secret:
 77            client_auth = Client(key=key, secret=secret)
 78            clients.append(client_auth)
 79            await client_auth.start()
 80            # print(client_auth.get_available_private_subscription_feeds())
 81
 82            # subscribe to a private/authenticated websocket feed
 83            await client_auth.subscribe(feed="fills")
 84            await client_auth.subscribe(feed="open_positions")
 85            # await client_auth.subscribe(feed='open_orders')
 86            # await client_auth.subscribe(feed='open_orders_verbose')
 87            # await client_auth.subscribe(feed='deposits_withdrawals')
 88            # await client_auth.subscribe(feed='account_balances_and_margins')
 89            # await client_auth.subscribe(feed='balances')
 90            # await client_auth.subscribe(feed='account_log')
 91            # await client_auth.subscribe(feed='notifications_auth')
 92
 93            # authenticated clients can also subscribe to public feeds
 94            # await client_auth.subscribe(feed='ticker', products=['PI_XBTUSD', 'PF_ETHUSD'])
 95
 96            # time.sleep(1)
 97            # unsubscribe from a private/authenticated websocket feed
 98            await client_auth.unsubscribe(feed="fills")
 99            await client_auth.unsubscribe(feed="open_positions")
100            # ...
101
102        while not client.exception_occur:  # and not client_auth.exception_occur:
103            await asyncio.sleep(6)
104    finally:
105        # Close the sessions properly.
106        for _client in clients:
107            await _client.close()
108
109
if __name__ == "__main__":
    # The websocket client will send {'event': 'asyncio.CancelledError'} via
    # on_message, so you can handle the behavior/next actions individually
    # within your strategy.
    try:
        asyncio.run(main())
    except KeyboardInterrupt:
        # Ctrl+C is the expected way to stop this example. asyncio.run()
        # cancels main(), whose ``finally`` block closes the clients, so
        # there is nothing left to clean up here; just avoid the traceback.
        LOG.info("Interrupted by user.")
114
115# ============================================================
116# Alternative - as ContextManager:
117
118# from kraken.futures import FuturesWSClient
119# import asyncio
120
121# async def on_message(message):
122#     print(message)
123
124# async def main() -> None:
125#     async with FuturesWSClient(callback=on_message) as session:
126#         await session.subscribe(feed="ticker", products=["PF_XBTUSD"])
127#         while True:
128#             await asyncio.sleep(6)
129
130# if __name__ == "__main__":
131#     try:
132#         asyncio.run(main())
133#     except KeyboardInterrupt:
134#         pass