API
Documentation for CryptoMarketData.
Types
AbstractExchange
Every exchange is a subtype of AbstractExchange.
AbstractCandle
Every exchange also has a matching candle type that's a subtype of AbstractCandle. Its purpose is to capture the data given to us by the exchange.
Session
This contains the data needed for a persistent connection to an exchange's WebSocket API. Through this, one can start and stop a WebSocket connection. The connection will also be supervised and automatically restarted if prematurely disconnected.
@kwdef mutable struct Session
exchange::AbstractExchange
market::AbstractString
candles::Vector{<: AbstractCandle}
new_candle::Observable
last_candle::Any # Union{Missing, DataType, AbstractCandle}
ws::Union{Missing, HTTP.WebSockets.WebSocket}
supervisor::Union{Missing, Visor.Supervisor}
supervisor_ws::Union{Missing, Visor.Supervisor}
end
Functions
General Functions
CryptoMarketData.get_saved_markets — Function
get_saved_markets(; datadir) -> DataFrames.DataFrame
Return a DataFrame that lists the currently saved markets.
Keyword Arguments
- datadir="./data" - directory where saved data is stored
Example
julia> saved = get_saved_markets()
10×4 DataFrame
 Row │ exchange       market          start       stop
     │ Any            Any             Any         Any
─────┼────────────────────────────────────────────────────────
   1 │ binance        BTCUSD_240628   2023-12-29  2024-02-17
   2 │ binance        BTCUSD_PERP     2020-08-11  2020-08-16
   3 │ bitget         BTCUSD_DMCBL    2019-04-23  2024-02-16
   4 │ bitget         DOGEUSD_DMCBL   2024-02-01  2024-02-20
   5 │ bitmex         ETHUSD          2018-08-02  2024-02-19
   6 │ bitstamp       BTCUSD          2011-08-18  2024-02-25
   7 │ bybit          ADAUSD          2022-03-24  2022-04-21
   8 │ bybit-inverse  ADAUSD          2022-03-24  2022-04-20
   9 │ bybit-linear   10000LADYSUSDT  2023-05-11  2024-03-04
  10 │ pancakeswap    BTCUSD          2023-03-15  2024-03-04
Generalized on Exchange
CryptoMarketData.save! — Function
save!(exchange, market; datadir, startday, endday, delay)
Download 1m candles from the given exchange and market, and save them locally.
Keyword Arguments
- datadir="./data" - directory where saved data is stored
- startday - a Date to start fetching candles from
- endday - a Date to stop fetching candles
- delay - a delay to be passed to sleep() that will pause between internal calls to save_day!()
Example
julia> bitstamp = Bitstamp()
julia> save!(bitstamp, "BTC/USD", endday=Date("2020-08-16"))
To monitor progress on long downloads, set the JULIA_DEBUG environment variable. This will cause debug log messages to be emitted before each day of candles is downloaded.
julia> ENV["JULIA_DEBUG"] = "CryptoMarketData"
CryptoMarketData.load — Function
load(
exchange::AbstractExchange,
market;
datadir,
span,
tf,
table,
remote
) -> Union{Missing, DataFrames.DataFrame}
Load candles for the given exchange and market from the file system.
Keyword Arguments
- datadir="./data" - directory where saved data is stored
- span - a Date span that defines which dates to load candles for. If it's missing, load everything
- tf - a Period that is used to aggregate 1m candles into higher timeframes
- table - a Tables.jl-compatible struct to load candles into. The default is DataFrame
- remote - if true, fill in gaps in local storage by fetching from remote sources
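As a rough illustration of what the tf keyword implies, the sketch below rolls 1m candles up into a larger Period. It is not the library's implementation: aggregate_sketch and the NamedTuple layout (ts, o, h, l, c, v) are assumptions made for this example, and it assumes candles arrive in ascending timestamp order.

```julia
using Dates

# Hypothetical helper (not part of CryptoMarketData): roll 1m candles,
# given as NamedTuples sorted by timestamp, into buckets of length `tf`.
function aggregate_sketch(candles, tf::Period)
    out = NamedTuple[]
    for c in candles
        bucket = floor(c.ts, tf)  # start of the timeframe this minute falls in
        if !isempty(out) && out[end].ts == bucket
            p = out[end]
            # Same bucket: keep the open, widen high/low, take the latest close, sum volume.
            out[end] = (ts=bucket, o=p.o, h=max(p.h, c.h), l=min(p.l, c.l), c=c.c, v=p.v + c.v)
        else
            # First minute of a new bucket starts a new aggregated candle.
            push!(out, (ts=bucket, o=c.o, h=c.h, l=c.l, c=c.c, v=c.v))
        end
    end
    return out
end

t0 = DateTime(2024, 1, 1)
minutes = [(ts=t0 + Minute(i), o=100.0 + i, h=101.0 + i, l=99.0 + i, c=100.5 + i, v=1.0) for i in 0:9]
five = aggregate_sketch(minutes, Minute(5))  # ten 1m candles become two 5m candles
```

The same idea applies to tf=Hour(4): each 1m candle is assigned to the 4-hour bucket its timestamp floors to.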
Example
julia> bitstamp = Bitstamp()
julia> btcusd4h = load(bitstamp, "BTC/USD"; span=Date("2024-01-01"):Date("2024-02-10"), tf=Hour(4))
CryptoMarketData.load_remote — Function
load_remote(
exchange::AbstractExchange,
market;
span,
tf,
delay,
table
) -> Any
Load candles using the exchange's API.
CryptoMarketData.earliest_candle — Function
earliest_candle(exchange, market)
Return the earliest candle for the given market in the 1m timeframe.
CryptoMarketData.get_candles_for_day — Function
get_candles_for_day(
exchange::AbstractExchange,
market,
day::Dates.Date
) -> Vector
Fetch all of the 1m candles for the given exchange, market, and day. The vector of candles returned is just the right size to save to the archives.
CryptoMarketData.save_day! — Function
save_day!(exchange, market, candles; datadir="./data")
Save a day's worth of 1m candles that the caller provides for the given exchange and market.
Keyword Arguments
- datadir="./data" - directory where saved data is stored
WebSocket Functions
The following 3 functions are the main ways one interacts with WebSockets through this system.
CryptoMarketData.start — Function
start(
exchange::AbstractExchange,
market::AbstractString;
wait
) -> Session
This initiates a websocket connection with an exchange, and subscribes to the given market. It also creates a supervision tree using Visor.jl that tries very hard to keep websocket connections alive.
Example
julia> bitstamp = Bitstamp();
julia> ses = start(bitstamp, "BTCUSD");
CryptoMarketData.stop — Function
stop(session::Session)
Close the session's websocket and shut down all of its supervised processes.
Example
julia> stop(ses)
CryptoMarketData.stream — Function
stream(
session
) -> Tuple{Channel, Task, Observables.Observable{Observables.Observable}}
stream(
session,
from::Dates.Date
) -> Tuple{Channel, Task, Observables.Observable{Observables.Observable}}
Stream finished 1 minute candles for a market from an exchange. A finished candle is one that will no longer change, because the minute it represents is in the past. Candles from the past as specified by the from parameter are also published to the channel before switching to websockets.
Notes
- The returned channel is what will be interacted with the most.
- The task is the async process that loads the initial candles into the channel and switches to publishing from websockets afterward.
- The observer is what waits for a candle to finish before publishing.
- The task and observer are returned for the sake of completeness, but they're rarely interacted with directly.
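The hand-off described in these notes (publish past candles first, then switch to live ones) can be pictured with plain Channels. This is only a sketch of the pattern, not how stream is implemented: stream_sketch is a hypothetical name, integers stand in for candles, and the live channel stands in for the WebSocket feed.

```julia
# Stand-ins: `history` plays the role of candles loaded from `from` onward,
# `live` the role of finished candles arriving later over the WebSocket.
function stream_sketch(history::Vector{Int}, live::Channel{Int})
    ch = Channel{Int}(32)
    task = @async begin
        for c in history   # first, publish everything from the past
            put!(ch, c)
        end
        for c in live      # then forward live items as they arrive
            put!(ch, c)
        end
        close(ch)          # live source finished, so close the output
    end
    return ch, task
end

live = Channel{Int}(8)
ch, task = stream_sketch([1, 2, 3], live)
put!(live, 4)
put!(live, 5)
close(live)
received = collect(ch)     # consumers only ever touch the returned channel
```

As in the notes above, the consumer reads from the channel and rarely needs the task itself.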
Example
julia> bitstamp = Bitstamp();
julia> ses = start(bitstamp, "BTCUSD");
julia> (ch, task, observer) = stream(ses, today() - Day(2));
The following functions support the above, but a user of this library probably never needs to call them. Nevertheless, they're documented here for completeness.
CryptoMarketData.observe — Function
observe(
session::Session,
ch::Channel
) -> Tuple{Channel, Observables.ObserverFunction}
Observe the current session for new candles and publish them to ch.
Example
julia> (ch, observer) = observe(session, preload(session, Date("2026-05-05")))
CryptoMarketData.feed — Function
feed(
session::Session,
ch::Channel,
from::Dates.Date,
live::Observables.Observable
) -> Observables.ObserverFunction
Feed a channel with candles. Once the candles are up to date with the present, start observing session.new_candle, and feed those into the given channel as they come.
CryptoMarketData.ws_process — Function
ws_process(td::Visor.Process, s::Session)
This Visor.Process is responsible for connecting to an exchange's websocket and initiating a subscription to whatever data the session wants.
CryptoMarketData.accumulator_process — Function
accumulator_process(td::Visor.Process, s::Session)
This Visor.Process's job is to accumulate price data into finished candles and send them to the command_process.
CryptoMarketData.command_process — Function
command_process(td::Visor.Process, s::Session)
This Visor.Process is the main command loop that users of the session will interact with. It also receives messages from accumulator_process whenever it completes a candle.
Exchange Specific Implementations
- csv_headers
- csv_select
- ts2datetime_fn
- short_name
- candles_max
- get_markets
- get_candles
- update!
- ws_handle_message
- ws_subscribe_commands
- ws_uri
- Base.merge
CryptoMarketData.csv_headers — Function
csv_headers(exchange::AbstractExchange) -> Vector{Symbol}
Return headings for each column of candle data.
Example
julia> bitstamp = Bitstamp()
julia> columns = csv_headers(bitstamp)
6-element Vector{Symbol}:
:ts
:o
:h
:l
:c
:v
CryptoMarketData.get_markets — Function
get_markets(exchange) -> Vector{String}
Fetch the available markets for the given exchange.
Example
julia> bitstamp = Bitstamp()
julia> markets = get_markets(bitstamp)
CryptoMarketData.update! — Function
update!(
candles::AbstractVector{<:AbstractCandle},
candle::AbstractCandle
) -> Symbol
Destructively update a vector of candles with new candle data. One of 3 things can happen as a result of calling this function.
- Return :first if candles was previously empty.
- Return :updated if the timestamps are the same and update the last candle.
- Return :new if the timestamps are different and push the new candle.
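The three outcomes can be sketched with a stand-in type. Nothing here is the library's code: SketchCandle and update_sketch! are hypothetical names, and two details are assumptions (that the :first case also stores the candle, and that :updated simply overwrites the last candle rather than merging it).

```julia
using Dates

# Hypothetical minimal candle; real candle types subtype AbstractCandle.
struct SketchCandle
    ts::DateTime
    o::Float64
    h::Float64
    l::Float64
    c::Float64
    v::Float64
end

# Illustrative stand-in for update!, mirroring the three documented return values.
function update_sketch!(candles::Vector{SketchCandle}, candle::SketchCandle)
    if isempty(candles)
        push!(candles, candle)   # assumption: the first candle is stored
        return :first
    elseif candles[end].ts == candle.ts
        candles[end] = candle    # assumption: same timestamp overwrites the last candle
        return :updated
    else
        push!(candles, candle)   # different timestamp: append a new candle
        return :new
    end
end

candles = SketchCandle[]
t = DateTime(2024, 1, 1, 0, 0)
r1 = update_sketch!(candles, SketchCandle(t, 100.0, 101.0, 99.0, 100.5, 1.0))
r2 = update_sketch!(candles, SketchCandle(t, 100.0, 102.0, 99.0, 101.5, 2.0))
r3 = update_sketch!(candles, SketchCandle(t + Minute(1), 101.5, 101.5, 101.5, 101.5, 0.5))
```

After these three calls the vector holds two candles: the updated candle for the first minute and a new one for the second.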
CryptoMarketData.ws_handle_message — Function
ws_handle_message(
bitstamp::Bitstamp,
s::Session,
msg::AbstractString
) -> Any
While inside accumulator_process, handle a websocket message from the exchange. The main job of this function is to send candles to command_process. Secondary jobs may include acknowledging successful subscriptions or handling reconnect requests.
CryptoMarketData.ws_subscribe_commands — Function
ws_subscribe_commands(
bitstamp::Bitstamp,
market::AbstractString
) -> Vector
Return a vector of JSON commands to send to an exchange's WebSocket API to subscribe to market data.
CryptoMarketData.ws_uri — Function
ws_uri(bitstamp::Bitstamp) -> URIs.URI
Return the websocket URI for the given exchange.
Base.merge — Function
Base.merge(a::C, b::C) where {C <: CryptoMarketData.AbstractCandle} -> C
Every concrete candle type should implement a Base.merge method that will take the current candle a and a newer candle b with the same timestamp, and perform a merge such that high, low, and volume are updated as necessary. It should return a new candle with the merged data.
(This will be used by code that consumes unfinished candle data from WebSockets.)
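A minimal sketch of such a method, using a hypothetical DemoCandle type rather than any of the package's candle types. How volume should be combined depends on what the exchange reports; this example assumes the newer candle already carries the running volume for the minute, so it takes b's volume instead of summing.

```julia
using Dates

# Hypothetical candle type for illustration only.
struct DemoCandle
    ts::DateTime
    o::Float64
    h::Float64
    l::Float64
    c::Float64
    v::Float64
end

# Merge a newer candle `b` into the current candle `a` for the same timestamp.
function Base.merge(a::DemoCandle, b::DemoCandle)
    a.ts == b.ts || throw(ArgumentError("candles must share a timestamp"))
    return DemoCandle(
        a.ts,
        a.o,              # the open never changes within the minute
        max(a.h, b.h),    # widen the high if needed
        min(a.l, b.l),    # widen the low if needed
        b.c,              # the close follows the newer candle
        b.v,              # assumption: newer candle reports the running volume
    )
end

t = DateTime(2024, 1, 1, 0, 0)
merged = merge(DemoCandle(t, 100.0, 101.0, 99.0, 100.5, 1.0),
               DemoCandle(t, 100.5, 103.0, 100.0, 102.0, 1.8))
```

The inputs are left untouched and a new candle is returned, matching the requirement above.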
Other
Base.convert — Function
convert(
_::Type{NamedTuple},
c::AbstractCandle
) -> NamedTuple
Take any AbstractCandle c and return a NamedTuple that can be pushed into a DataFrame.
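One way such a conversion could work is to mirror the struct's fields into a NamedTuple. The sketch below uses a hypothetical TinyCandle type and a standalone to_namedtuple helper instead of extending Base.convert; the field names are assumptions for illustration.

```julia
# Hypothetical candle type; the field names are assumptions for illustration.
struct TinyCandle
    ts::Int
    o::Float64
    h::Float64
    l::Float64
    c::Float64
    v::Float64
end

# Build a NamedTuple whose keys mirror the struct's field names.
function to_namedtuple(c)
    names = fieldnames(typeof(c))
    values = ntuple(i -> getfield(c, i), fieldcount(typeof(c)))
    return NamedTuple{names}(values)
end

row = to_namedtuple(TinyCandle(1704067200, 100.0, 101.0, 99.0, 100.5, 2.5))
```

A NamedTuple of this shape is the kind of row that Tables.jl-compatible containers such as DataFrame accept via push!.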
Select
This was originally NHDaly/Select.jl, but because it wasn't easily installable, I absorbed it into this project. If a better way to multiplex multiple streams comes along, this may go away, but I'm going to use it until then.
CryptoMarketData.Select.select — Function
select(clauses[, block=true]) -> (clause_index, clause_value)
Functional form of the @select macro, intended to be used when the set of clauses is dynamic. In general, this method will be less performant than the macro variant.
Clauses are specified as an array of tuples. Each tuple is expected to have 2 or 3 elements, as follows:
- The clause type (:take or :put)
- The waitable object
- If the clause type is :put, the value to insert into the object.
If block is true (the default), wait for at least one clause to be satisfied and return a tuple whose first element is the index of the clause which unblocked first and whose second element is the value of the clause (see the manual on select for the meaning of clause value).
Otherwise, an arbitrary available clause will be executed, or a return value of (0, nothing) will be returned immediately if no clause is available.
CryptoMarketData.Select.@select — Macro
@select A select expression of the form:
@select begin
clause1 => body1
clause2 => body2
_ => default_body
end
Wait for multiple clauses simultaneously using a pattern matching syntax, taking a different action depending on which clause is available first. A clause has three possible forms:
event |> value
If event is an AbstractChannel, wait for a value to become available in the channel and assign take!(event) to value. If event is a Task, wait for the task to complete and assign value the return value of the task.
event <| value
Only supported for AbstractChannels. Wait for the channel to have capacity to store an element, and then call put!(event, value).
event
Calls wait on event, discarding the return value. Usable on any "waitable" events, which include channels, tasks, Condition objects, and processes.
If a default branch is provided, @select will arbitrarily choose any event which is ready and execute its body, or will execute default_body if none of them are.
Otherwise, @select blocks until at least one event is ready.
For example,
channel1 = Channel()
channel2 = Channel()
task = @task ...
result = @select begin
channel1 |> value => begin
@info("Took from channel1")
value
end
channel2 <| :test => @info("Put :test into channel2")
task => @info("task finished")
end