Documentation for CryptoMarketData.



Every exchange is a subtype of AbstractExchange.


Every exchange also has a matching candle type that's a subtype of AbstractCandle. Its purpose is to capture the data given to us by the exchange.


General Functions

get_saved_markets(; datadir)

Return a DataFrame that lists the currently saved markets.

Keyword Arguments

  • datadir="./data" - directory where saved data is stored


julia> saved = get_saved_markets()
10×4 DataFrame
 Row │ exchange       market          start       stop
     │ Any            Any             Any         Any
   1 │ binance        BTCUSD_240628   2023-12-29  2024-02-17
   2 │ binance        BTCUSD_PERP     2020-08-11  2020-08-16
   3 │ bitget         BTCUSD_DMCBL    2019-04-23  2024-02-16
   4 │ bitget         DOGEUSD_DMCBL   2024-02-01  2024-02-20
   5 │ bitmex         ETHUSD          2018-08-02  2024-02-19
   6 │ bitstamp       BTCUSD          2011-08-18  2024-02-25
   7 │ bybit          ADAUSD          2022-03-24  2022-04-21
   8 │ bybit-inverse  ADAUSD          2022-03-24  2022-04-20
   9 │ bybit-linear   10000LADYSUSDT  2023-05-11  2024-03-04
  10 │ pancakeswap    BTCUSD          2023-03-15  2024-03-04

Generalized on Exchange

save!(exchange, market; datadir, startday, endday, delay)

Download 1m candles from the given exchange and market, and save them locally.

Keyword Arguments

  • datadir="./data" - directory where saved data is stored
  • startday - a Date to start fetching candles from
  • endday - a Date to stop fetching candles
  • delay - a delay to be passed to sleep() that will pause between internal calls to save_day!()


julia> bitstamp = Bitstamp()
julia> save!(bitstamp, "BTC/USD", endday=Date("2020-08-16"))
load(exchange, market; datadir, span, tf, table)

Load candles for the given exchange and market from the file system.

Keyword Arguments

  • datadir="./data" - directory where saved data is stored
  • span - a Date span that defines what Dates to load candles. If it's missing, load everything.
  • tf - a Period that is used to aggregate 1m candles into higher timeframes.
  • table - a Tables.jl-compatible struct to load candles into. The default is DataFrame.


julia> bitstamp = Bitstamp()
julia> btcusd4h = load(bitstamp, "BTC/USD"; span=Date("2024-01-01"):Date("2024-02-10"), tf=Hour(4))
get_candles_for_day(exchange, market, day::Date)

Fetch all of the 1m candles for the given exchange, market, and day. The vector and candles returned is just the right size to save to the archives.

save_day!(exchange, market, candles; datadir="./data")

Save a day worth of 1m candles the caller provides for the given exchange and market.

Keyword Arguments

  • datadir="./data" - directory where saved data is stored

Exchange Specific Implementations


Fetch the available markets for the given exchange.


julia> bitstamp = Bitstamp()
julia> markets = get_markets(bitstamp)

This is a convenience method that accepts URLs as strings.


This is the general version of websocket subscription that the other exchange-specific versions of subscribe are built on. It connects to the given uri and returns a struct that contains two Channels that can be used to interact with the WebSocket.


julia> using URIs, JSON3

julia> s = subscribe(URI("wss://ws.bitstamp.net"))
CryptoMarketData.Session(URI("wss://ws.bitstamp.net"), missing, missing, missing, Task (runnable) @0x00007970dac63d00)

julia> btcusd_subscribe = Dict(:event => "bts:subscribe", :data => Dict(:channel => "live_trades_btcusd"))
Dict{Symbol, Any} with 2 entries:
  :event => "bts:subscribe"
  :data  => Dict(:channel=>"live_trades_btcusd")

julia> put!(s.commands, JSON3.write(btcusd_subscribe))

julia> s.messages
Channel{Any}(32) (2 items available)

julia> take!(s.messages)

julia> JSON3.read(take!(s.messages))
JSON3.Object{Base.CodeUnits{UInt8, String}, Vector{UInt64}} with 3 entries:
  :data    => {…
  :channel => "live_trades_btcusd"
  :event   => "trade"


This was originally NHDaly/Select.jl, but because it wasn't easily installable, I absorbed it into this project. If a better way to multiplex multiple streams comes along, this may go away, but I'm going to use it until then.


select(clauses[, block=true]) -> (clause_index, clause_value)

Functional form of the @select macro, intended to be used when the set of clauses is dynamic. In general, this method will be less performant than the macro variant.

Clauses are specified as an array of tuples. Each tuple is expected to have 2 or 3 elements, as follows:

  1. The clause type (:take or :put)
  2. The waitable object
  3. If the clause type is :put, the value to insert into the object.

If block is true (the default), wait for at least one clause to be satisfied and return a tuple whose first elmement is the index of the clause which unblocked first and whose whose second element is the value of the clause (see the manual on select for the meaning of clause value).

Otherwise, an arbitrary available clause will be executed, or a return value of (0, nothing) will be returned immediately if no clause is available.


@select A select expression of the form:

@select begin
     clause1 => body1
     clause2 => body2
     _       => default_body

Wait for multiple clauses simultaneously using a pattern matching syntax, taking a different action depending on which clause is available first. A clause has three possible forms:

  1. event |> value

If event is an AbstractChannel, wait for a value to become available in the channel and assign take!(event) to value. if event is a Task, wait for the task to complete and assign value the return value of the task.

  1. event |< value

Only suppored for AbstractChannels. Wait for the channel to capabity to store an element, and then call put!(event, value).

  1. event

Calls wait on event, discarding the return value. Usable on any "waitable" events", which include channels, tasks, Condition objects, and processes.

If a default branch is provided, @select will check arbitrary choose any event which is ready and execute its body, or will execute default_body if none of them are.

Otherise, @select blocks until at least one event is ready.

For example,

channel1 = Channel()
channel2 = Channel()
task = @task ...
result = @select begin
    channel1 |> value => begin
            info("Took from channel1")
    channel2 <| :test => info("Put :test into channel2")
    task              => info("task finished")