Applied Online Learning with Haskell and friends, Part 2

Posted on June 17, 2018 under tag(s) machine learning, functional programming, haskell, python, R, online learning, autoregression, forex

This post is the second in a series on online machine learning techniques. Check out the first post if you haven’t read it yet.

We continue our exposition of the previously introduced online convex programming framework by playing with the online gradient descent (OGD) algorithm for online convex optimization. I want to use the implementation from the previous post to build a toy predictor for the EUR/USD exchange rate. Here, we’ll explore the data with R and build an auto-regression model in Haskell, and in the next post we’ll run our learner in pure notebook data-science fashion using Python notebooks. The full supporting Haskell implementation for the first three posts in this series is documented via Haddock here with hyperlinked sources. The posts are packaged via Nix here so you can play with the sources as you like (check out on any Linux or macOS box).

We’ll use data from this Kaggle contribution and build an autoregressive model. Auto-regression (AR) is a non-IID setting: consecutive inputs overlap, so they are neither independent nor identically distributed. Fortunately, the setup for online convex optimization is non-stochastic and makes no distributional assumption on the data. Therefore, we can feed auto-regression data to the fitRidge function from the last post with no second thoughts.
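As a reminder of what such a learner does on each round, here is a minimal sketch of online gradient descent on a ridge-regularized squared loss. It is not the fitRidge from the previous post: the names `dot`, `ogdStep` and `runOgd` are made up for illustration, the step size is fixed, and plain lists stand in for vectors. Note that nothing in it assumes anything about how the data was generated; examples are simply consumed in arrival order.

```haskell
-- Hypothetical sketch of OGD for ridge regression (not the post's fitRidge).

-- | Inner product of two equally sized lists.
dot :: [Double] -> [Double] -> Double
dot u v = sum (zipWith (*) u v)

-- | One OGD step on the loss (<w,x> - y)^2 + lambda * ||w||^2,
-- with a fixed step size eta (an assumption of this sketch).
ogdStep :: Double -> Double -> [Double] -> ([Double], Double) -> [Double]
ogdStep eta lambda w (x, y) = zipWith step w x
  where
    residual = dot w x - y
    -- Gradient coordinate: 2 * residual * x_i + 2 * lambda * w_i
    step wi xi = wi - eta * (2 * residual * xi + 2 * lambda * wi)

-- | Run the learner over a stream in arrival order, returning the
-- prediction made on each example before the model is updated with it.
runOgd :: Double -> Double -> [Double] -> [([Double], Double)] -> [Double]
runOgd _ _ _ [] = []
runOgd eta lambda w (ex@(x, _) : rest) =
  dot w x : runOgd eta lambda (ogdStep eta lambda w ex) rest
```

For instance, `runOgd 0.1 0 [0, 0] examples` starts from a zero model and predicts on each example before learning from it, which is the predict-then-update protocol the rest of this post builds on.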

Data exploration

Let’s download the data to disk in CSV format and take a peek using R.

mkdir -p data
master/notebooks/EURUSD.csv > data/EURUSD.csv
master/notebooks/EURUSD_sample.csv > data/EURUSD_sample.csv

The CSV files have headers and contain the opening, closing and extreme (high and low) values of the exchange rate for each tick, measured every 15 minutes. We also have access to the trading volume during each 15-minute period:

head -n 3  data/EURUSD_sample.csv
2015-12-29 00:00,1.09746,1.09783,1.09741,1.09772,486680003.2
2015-12-29 00:15,1.09772,1.098,1.0977,1.0979,445919999.1

This is rather coarse, pre-processed market data. Indeed, we could use tick information with a finer grain. Yet, it’s a good toy dataset to showcase online learning methods.

Upon inspecting the data with a CSV viewer, I saw that many volume values are zero. Let’s take a look using R (file analysis.R – raw):

library(magrittr)   # provides %>%
library(lubridate)  # provides wday
sample <- read.csv("data/EURUSD_sample.csv",
 colClasses=c(Time="character", Open="numeric", High="numeric",
              Low="numeric", Close="numeric", Volume="numeric"))
sample$Time <- as.Date(sample$Time,format="%Y-%m-%d %H:%M")
sample[which(sample$Volume==0),]$Time %>% wday %>% ftable
   1     2    5  6   7
1996  800  8  200 1392

According to this frequency table of the weekdays on which these zero volumes occur, this is mostly normal activity: the forex market closes on weekends, which are days 1 (Sunday) and 7 (Saturday) in the default numbering of wday. Let’s take a further look and build a frequency table of the lengths of the zero-volume sequences.

repeats <- rle(sample$Volume)
rtable <- repeats[["lengths"]][which(repeats[["values"]]==0)] %>%
          ftable %>%
          as.data.frame
#The data is in increments of 15 minutes. Let's look at hours instead.
rtable$. <- as.numeric(as.vector(rtable$.)) /4
   . Freq
1 20    1
2 47    1
3 48   20
4 72    1

The zero-volume ticks are mostly grouped in sequences of two days (48 hours). Occasionally, though, the market data starts one hour earlier (the 47-hour run), and there may be sporadic days off too. We will have to keep that in mind when building both ingestion heuristics and features for our AR model. We could do much more with just this dataset, but let’s not get sidetracked: at this point, I think we know enough to move on to the modeling step.
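Since the ingestion side will live in Haskell, the same run-length check is easy to reproduce there. The sketch below mirrors the `rle` call above using only `Data.List`; the function names `zeroRunLengths` and `barsToHours` are hypothetical and are not part of the post’s code base.

```haskell
import Data.List (group)

-- | Lengths of the maximal runs of zero volume, counted in bars.
-- 'group' splits the list into runs of equal consecutive values,
-- which is what R's rle does.
zeroRunLengths :: [Double] -> [Int]
zeroRunLengths volumes =
  [ length run | run@(v : _) <- group volumes, v == 0 ]

-- | Convert a run length in 15-minute bars to hours.
barsToHours :: Int -> Double
barsToHours n = fromIntegral n / 4
```

Applied to the volume column, `map barsToHours . zeroRunLengths` would give the run durations that the frequency table above counts.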

Ingestion and Learning pipeline

Let’s decide on a specification for the service. I’d like to write a tool for users of these ticks. As a proof of concept, I’m picking the following: we’ll predict the “High” of the currently open tick.

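We’ll opt for a simplistic dense feature set for the autoregression model: a constant bias term followed by, for each of the last \(p\) ticks, the open of the current bar and the close, high, low and volume of the previous bar.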

All in all, this looks like an AR(\(p\))-ish model, but not quite. The following types are all we need for learning (file Model.hs – source, haddock, raw).

-- | Type for a new incoming tick
data NewTick = NewTick
  { newOpen    :: Double  -- Open of the current bar
  , lastClose  :: Double  -- Close of the previous bar
  , lastHigh   :: Double  -- High of the previous bar
  , lastLow    :: Double  -- Low of the previous bar
  , lastVolume :: Double  -- Volume of the previous bar
  } deriving Generic
-- | x^t
data X = X { lastpticks :: [NewTick] } deriving Generic --Learner input x^t
instance Default X           --We'll need this for automating model dimensionality

-- | y^t
data Y = Y { high :: Double } --Learner target y^t

At this point, we can choose the in-memory representation of our data and of the model. Here, the data is dense, so we’ll use a Vector of Double. Let’s implement the dumbest feature mapper ever (file ML.hs – source, haddock, raw):

featureBuilder :: X -> V.Vector Double
featureBuilder X{..} = V.fromList $ 1.0 : (concat $ (map tickToList lastpticks))
  where tickToList NewTick{..} = [ newOpen
                                 , lastClose
                                 , lastHigh
                                 , lastLow
                                 , lastVolume]
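To make the layout of the feature vector concrete, here is a small list-based analogue of featureBuilder. It is a sketch under assumptions: the record below is a simplified stand-in for NewTick (no Generic), `features` and `dimension` are hypothetical names, and lists replace Vector so that the snippet only needs the Prelude.

```haskell
-- Simplified stand-in for the post's NewTick record.
data NewTick = NewTick
  { newOpen, lastClose, lastHigh, lastLow, lastVolume :: Double }

-- | List-based analogue of featureBuilder: a bias term,
-- then five values per aggregated tick, oldest tick first.
features :: [NewTick] -> [Double]
features ticks = 1.0 : concatMap tickToList ticks
  where
    tickToList t =
      [newOpen t, lastClose t, lastHigh t, lastLow t, lastVolume t]

-- | Number of model weights needed when p ticks are aggregated.
dimension :: Int -> Int
dimension p = 1 + 5 * p
```

With \(p = 3\), for example, the model has `dimension 3`, that is 16 weights: one for the bias and five for each tick in the window.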

We now need to start writing our ML pipeline around this type. Ideally, we would share as much code as possible between the supposed production platform and the R&D activity.

We’ll write the learner code in Haskell as if it were in the production environment, and run our data-science (DS) steps around it in a more hackish fashion with Python. Being able to run exploratory steps in a notebook is almost a requirement for DS. Let’s suppose that the data pipeline in the production environment looks like this:

The system feeds bar data as it arrives, along with the opening of the new bar for which the high must be predicted. This corresponds to the NewTick type previously defined. A data aggregator collects \(p\) ticks and feeds X’s and Y’s alternately to the learner. The learner builds its features using featureBuilder and returns its Prediction.

This prediction type will be designed so that we can let the rest of the pipeline know if the data was aggregated but no prediction is produced. That can happen when aggregating the first \(p\) ticks for autoregression (file PipelineDaemon.hs – source, haddock, raw):

-- | Prediction output: either 'Prediction' y^t, or 'Ingested', which
-- means the data point was aggregated, but no prediction was
-- produced.
data Prediction = Prediction
     { prediction :: Double } --Learner output \widehat{y^t}
   | Ingested  -- Sometimes, the aggregator takes the value but no
               -- prediction happens.

Let’s use Conduits as the framework for the implementations of aggregator and learner (file ML.hs – source, haddock, raw):

-- | aggregation size p
newtype P = P Int
-- | AR(p) model builder. Conduit input: New tick information, output: y^t-1, x^t.
aggregator :: Monad m =>
  P ->
  ConduitT NewTick (Maybe (Y, X)) m ()
aggregator (P p) = agg []
  where agg q = awaitForever $ process q
        process q t =
          if length q < p
          then do yield Nothing
                  agg $ q ++ [t]
          else let q' = tail q ++ [t]
               in do yield $ Just $ (Y (lastHigh t), X q')
                     agg $ q'

-- | Learner. Conduit input: (y^t-1, x^t), output: \widehat{y}^t
learner :: (Monad m) =>
  LearningRate Double ->
  Lambda Double ->              --lambda(regularizer size)
  Ogdstate V.Vector Double ->   --w^t-1
  Maybe X ->                    --x^t-1
  ConduitT (Maybe (Y, X)) Prediction m ()
learner rate (Lambda lambda) state x' = awaitForever process
   where fitter = OCO.fitRidge rate (Lambda lambda)
         update y = case x' of
            Just x'' -> fitter state (featureBuilder x'') (high y)
            Nothing -> state
         process (Just (y, x)) =
          let state' = update y
          in do
           let xv = featureBuilder x
           yield $ Prediction $ OCO.predictReg state' xv
           learner rate (Lambda lambda) state' $ Just x
         process Nothing = do
           yield $ Ingested
           learner rate (Lambda lambda) state x'

-- | Helper function. Builds an initial state for 'learner'.
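initialState :: Int -> Ogdstate V.Vector Double
initialState p = OCO.initialOGDState $ V.replicate n 0.0
  where -- 'def :: X' holds an empty tick list, so measure the per-tick
        -- feature count on a single placeholder tick (minus the bias term).
        perTick = V.length (featureBuilder $ X [NewTick 0 0 0 0 0]) - 1
        n = 1 + p * perTick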
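To see what the aggregator emits over time, it helps to model it on plain lists. The function below is a hypothetical, conduit-free analogue (`aggregateList` is not part of ML.hs): it takes the window size, a function extracting the target from the newest item, and the input stream.

```haskell
-- | List-based model of the aggregator. While the first p items are
-- being collected it emits Nothing; afterwards, each new item yields
-- the target read from that item together with the updated window.
aggregateList :: Int -> (a -> b) -> [a] -> [Maybe (b, [a])]
aggregateList p target = go []
  where
    go _ [] = []
    go q (t : ts)
      | length q < p = Nothing : go (q ++ [t]) ts
      | otherwise =
          let q' = drop 1 q ++ [t]  -- slide the window by one item
          in Just (target t, q') : go q' ts
```

With a window of 2 over the items 1 to 4, the first two outputs are Nothing and the next two carry the windows [2,3] and [3,4]. In the pipeline, this is why the first \(p\) ticks come back as Ingested.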

That’s it! The learning pipeline is implemented. Note that it doesn’t expose a very rich interface. We’d like a real pipeline to have more capabilities, such as model check-pointing, input/output logging, model export and live metrics. This exchange rate example is really a toy, and I won’t deploy any infrastructure, so let’s leave it at that.

The DS workflow around the Haskell learner code should use notebooks as much as possible. There are basically two ways to interface this code with Python notebooks. When working on self-contained parts of the pipeline (such as featureBuilder), the Haskell FFI would be sufficient. However, we’ll also want to run experiments with the whole pipeline, and in that case the learner should use its own logic for maintaining its mutable state. Otherwise, that wouldn’t really amount to doing DS experiments with the production code. For whole-pipeline experiments, it’s easier to just run the pipeline as a daemon, and then it’s best to go all the way and use a third-party message-passing library and a serializer:

In the next post, I’ll implement this software architecture and go through the final data-science steps with our learner. I’ll do all message passing with standard libraries, using ZeroMQ and MsgPack.