Applied Online Learning with Haskell and friends, Part 3

Posted on June 17, 2018 under tag(s) machine learning, functional programming, haskell, python, R, online learning, autoregression, forex

This post is the third in a series on online machine learning techniques. In the first post, I went over online convex optimization basics and showed how to apply the framework to ridge regression. In the second post, I took a brief look at some EUR/USD exchange data, and showed how ridge regression can help predict it via auto-regression.

The full supporting Haskell implementation for this series of posts is documented via Haddock here with hyperlinked sources. These posts are packaged via Nix here so you can play with the source as you like (check out README.md) on any Linux or macOS box.

In this post, I’ll conclude our work on this toy example by interfacing it with Python notebooks, optimizing hyper-parameters, and evaluating the performance of the approach.

Language Bindings

The last post introduced the software architecture we want to build:

Let’s serialize the types from that last post using MessagePack. The data-msgpack package is on Stackage (a good quality indicator), and MessagePack bindings are available for Python and R: three very good arguments for using this serializer. We could derive the instances with generics, but generics support is not very stable at this point, so we’ll write them by hand and do some extra work whenever the interface changes. That’s a price we unfortunately have to pay for using a widely supported serialization library.

The implementation is quite straightforward (file Model.hs – source, haddock, raw):

instance MTC.MessagePack Prediction where
  -- 'Ingested' round-trips through 'None', 'Prediction p' through 'Some p'.
  toObject = let (n :: Option Double) = None
    in \case
      Ingested -> MTC.toObject n
      Prediction p -> MTC.toObject $ Some p
  fromObject t = MTC.fromObject t >>= \case
    Some t' -> return $ Prediction t'
    None -> return Ingested

-- | A 'NewTick' is (de)serialized as a flat 5-tuple.
instance MTC.MessagePack NewTick where
  toObject NewTick{..} =
    MTC.toObject (newOpen,lastClose,lastHigh,lastLow,lastVolume)
  fromObject t = do
    (newOpen,lastClose,lastHigh,lastLow,lastVolume) <- MTC.fromObject t
    return $ NewTick newOpen lastClose lastHigh lastLow lastVolume
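
Since MessagePack is language-agnostic, we can sanity-check the wire format these instances define from the Python side. Here is a minimal sketch using the msgpack-python package (the tick values are made up, and it assumes the Haskell library encodes None as msgpack nil, as Maybe-like options usually are):

import msgpack

# A NewTick travels as a flat 5-element array of doubles...
wire = msgpack.packb([1.1140, 1.1138, 1.1145, 1.1132, 2.8e9])
print(msgpack.unpackb(wire))

# ...and a Prediction comes back as either nil ('Ingested')
# or a single double ('Prediction p').
assert msgpack.unpackb(msgpack.packb(None)) is None
assert msgpack.unpackb(msgpack.packb(1.1147)) == 1.1147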

We’ll now use the Conduits we implemented in the last post to build a pipeline, and expose that pipeline over a ZeroMQ interface. That’s the most verbose part, but it’s a price we have to pay if we want to provide the pipeline as a daemon. It’s not as large a codebase as could be expected in another language, though, since conduit exposes combinator abstractions. The following (file Model.hs – source, haddock, raw) implements the daemon that binds the pipeline to a ZeroMQ socket:

-- | Module : Main
-- Description : ML Pipeline Daemon for the DS experiments.
module Main where
import qualified OCO
import qualified ML
import qualified Model

import qualified System.Environment as SE
import qualified Data.Conduit.Combinators as CC
import qualified Data.Conduit.List as CL
import qualified System.ZMQ4 as ZMQ
import qualified Data.MessagePack as MsgPack
import qualified Data.ByteString.Lazy as BL

import Data.Conduit

-- | Main pipeline. @pipeline sock p lambda rateType rateValue baseline@
-- receives from zmq4 socket 'sock' and learns using a ridge
-- AR('p') model with regularizer size 'lambda', rate type 'rateType'
-- (one of "fixed" or "squared") and rate value 'rateValue'. If 'baseline'
-- is set to true, the pipeline defaults to using the previous tick's
-- 'High' value for the prediction.
pipeline :: (ZMQ.Sender a, ZMQ.Receiver a) =>
  ZMQ.Socket a -> Int -> Double -> String -> Double -> Bool -> IO ()
pipeline sock p lambda rateType rateValue baseline = runConduit $
   CC.repeatM (ZMQ.receive sock)              .| --request
   CL.mapMaybe (MsgPack.unpack.BL.fromStrict) .| --unpacking
   logic                                      .| --learning logic
   CC.map MsgPack.pack                        .| --packing
   CC.mapM_ (ZMQ.send' sock [])                  --response
   where logic = case baseline of
           False -> (ML.aggregator (ML.P p) .|
                     ML.learner rate (OCO.Lambda lambda) initialState Nothing )
           True -> CC.map (\newtick -> ML.Prediction $ Model.lastHigh newtick)
         initialState = ML.initialState p
         rate = case rateType of
          "fixed" -> OCO.fixedRate rateValue
          "squared" -> OCO.squaredRate rateValue
          _ -> error "rate type unknown"

-- | The arguments for this binary have the form:
-- ./PipelineDaemon sockname aggregation lambda rateType rateValue baseline
main :: IO ()
main = do (sockname:aggregation:lambda:rateType:rateValue:baseline:_) <- SE.getArgs
          ZMQ.withContext
            (\ctx -> ZMQ.withSocket ctx ZMQ.Pair
            (\sock -> (ZMQ.bind sock sockname >>
              pipeline sock (read aggregation) (read lambda) rateType
              (read rateValue) (read baseline)
            )))
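
For reference, a typical invocation (the socket path and hyper-parameter values are chosen for illustration only) looks like:

./PipelineDaemon ipc:///tmp/aoeu 3 1.0 fixed 0.1 False

This starts an AR(3) ridge learner with \(\lambda = 1\) and a fixed rate of 0.1, bound to an IPC socket under /tmp.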

Phew. That’s actually all the Haskell code we’ll write in this post. We’ve built a message-passing interface, so the more hackish data-science steps can all be performed in a Python or R notebook. Let’s choose Python and write a wrapper for that interface. That amounts to setting up an IPC socket and piping data through the MsgPack and ZMQ layers (file Pipeline.py – raw):

import subprocess

import msgpack
import zmq

class Pipeline:
  def __init__(self, daemonpath="hs/PipelineDaemon", socketname="aoeu", Lambda=1, p=1, rateType="fixed", rateValue=0.1, baseline=False):
    # spawn the daemon: ./PipelineDaemon sockname aggregation lambda rateType rateValue baseline
    s = "exec %s ipc:///tmp/%s %s %s %s %s %s" % (daemonpath, socketname, p, Lambda, rateType, rateValue, baseline)
    print(s)
    self.daemon = subprocess.Popen([s], shell=True)
    self.ctx = zmq.Context.instance()
    self.socket = self.ctx.socket(zmq.PAIR)
    self.socket.connect("ipc:///tmp/" + socketname)
  def feed(self, newOpen, lastClose, lastHigh, lastLow, lastVolume):
    # one round-trip: send a NewTick, receive a Prediction
    self.socket.send(msgpack.packb([newOpen, lastClose
                                   ,lastHigh, lastLow, lastVolume]))
    return(msgpack.unpackb(self.socket.recv()))
  def __del__(self):
    # tear down the socket, the context and the daemon process
    self.socket.close()
    self.ctx.term()
    self.daemon.kill()
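
As a quick smoke test of the wrapper (a sketch assuming the daemon binary has been built at hs/PipelineDaemon; the tick values are made up):

pp = Pipeline(Lambda=1, p=1, rateType="fixed", rateValue=0.1)
# While the model is still ingesting its first ticks, the daemon
# answers None ('Ingested'); afterwards it answers a predicted 'High'.
print(pp.feed(newOpen=1.1140, lastClose=1.1138, lastHigh=1.1145,
              lastLow=1.1132, lastVolume=2.8e9))
del pp  # closes the socket and kills the daemon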

Great. Let’s move on to the hackish last few steps using notebook-style data wrangling.

Experimental Evaluation

We now have a Python wrapper, so let’s get something out of the way immediately.

import numpy
import pandas

That feels much better. In the rest of this post, I’ll load up the data we downloaded previously and use our Python wrapper to explore the performance of the autoregressive ridge model. I’ll present this in a hackish “trial and error” notebook format, to underline the fact that interactive notebooks are invaluable for this kind of work – that wrapper was really not written in vain.

We’ll load up some of that data from before (file notebook.py – raw) and implement model fitting and loss reporting:

import pandas as pd
import numpy as np

from Python.Pipeline import Pipeline

def read_eurusd(fn):
  df = pd.read_csv(fn)
  # pair each tick with the *next* tick's opening value
  df['newOpen'] = df.Open.shift(periods=-1)
  df = df.drop('Open', axis=1)
  # drop no-trade ticks and renumber the remaining rows
  return(df.drop(df[df.Volume == 0].index).reset_index(drop=True))

train = read_eurusd('data/EURUSD_sample.csv')
test = read_eurusd('data/EURUSD.csv')

# run the model and return the lists of target and predicted values
def run_model(dataset, P=1, Lambda=10, rateType="fixed", rateValue=0.001, baseline=False):
  pp = Pipeline("hs/PipelineDaemon", "aosenuth", Lambda, P, rateType, rateValue, baseline)
  y, targets, predictions = None, [], []
  for index, row in dataset.iterrows():
    predicted = pp.feed(newOpen=row['newOpen']
                        ,lastClose=row['Close'], lastHigh=row['High']
                        ,lastLow=row['Low'], lastVolume=row['Volume'])
    # the daemon answers None ('Ingested') while it is still aggregating
    if y is not None:
      targets.append(row['High'])
      predictions.append(y)
    y = predicted
  return(targets, predictions)

# evaluation helpers
def se(targets,predictions):
  return([(i-j)**2 for i,j in zip(targets,predictions)])
def mse(targets,predictions,cutoff=0):
  losses = se(targets,predictions)[cutoff:]
  return( sum(losses)/len(losses) )
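
A quick sanity check of these helpers on made-up numbers:

print(se([1.0, 2.0, 3.0], [0.0, 1.0, 1.0]))
print(mse([1.0, 2.0, 3.0], [0.0, 1.0, 1.0]))
print(mse([1.0, 2.0, 3.0], [0.0, 1.0, 1.0], cutoff=1))
: [1.0, 1.0, 4.0]
: 2.0
: 2.5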

Let’s learn the model using arbitrary hyperparameters on a training set. We’ll also look at the performance of the most naive baseline possible: using the last tick’s ‘High’ as a prediction.

# fit on the training set and report training MSE for learner and baseline
t_tr_lrn,p_tr_lrn=run_model(train)
t_tr_base,p_tr_base=run_model(train,baseline=True)
print("Mean training MSE of the baseline: %s" %mse(t_tr_base,p_tr_base))
print("Mean training MSE of the learner: %s" %mse(t_tr_lrn,p_tr_lrn))
: Mean training MSE of the baseline: 3.971963273872008e-07
: Mean training MSE of the learner: 1.1353425196525514e+21

28 orders of magnitude above the baseline – something’s fishy. Rewind to the regret bound from the first post in this series:

\[\begin{equation} R_F(T) \leq 2 \lambda^2 \sqrt{T} + \left( \sqrt{T} - \frac{1}{2} \right) \|\nabla c \|^2 \end{equation} \]

where \(\|\nabla c\| = \max_{w,x,y} \left\| x^t \left( w^T x^t - y^t \right) \right\|\).

Here, we have quite large volume values:

train.describe()
:                High           Low         Close        Volume       newOpen
: count  10483.000000  10483.000000  10483.000000  1.048300e+04  10483.000000
: mean       1.114819      1.113976      1.114393  2.855758e+09      1.114393
: std        0.020224      0.020264      0.020244  1.787806e+09      0.020244
: min        1.072250      1.071070      1.071500  3.820000e+07      1.071500
: 25%        1.093730      1.092955      1.093335  1.474655e+09      1.093330
: 50%        1.117720      1.116890      1.117280  2.481320e+09      1.117280
: 75%        1.131630      1.130870      1.131280  3.956030e+09      1.131285
: max        1.161620      1.159960      1.160240  1.228204e+10      1.160220
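
A rough back-of-the-envelope estimate (illustrative only, assuming the raw volume enters the feature vector \(x^t\) directly) shows the scale of the problem: with volumes around \(10^{10}\) and a residual \(\left| w^T x^t - y^t \right|\) of order one,

\[\begin{equation} \|\nabla c \|^2 \approx \left( 10^{10} \right)^2 = 10^{20} \end{equation} \]

which is the scale of the blow-up we just observed.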

The value of \(\|\nabla c \|\) renders the OGD regret bound useless. There are a few ways to approach this problem. Using one of the available adaptive gradient descent methods could do the trick. A recursive range-doubling trick could also work reasonably well, provided that restarting the model \(\log(\|\nabla c \|)\) times is acceptable. In a real system, it usually isn’t.

Here, we’ll stick with OGD and use a third, more pragmatic, data-science-ish approach. What’s the maximum traded volume on the EUR/USD exchange in some offline training data? Let’s double that and normalize. We’d ultimately implement that in the pipeline, but as far as this post is concerned, I’ll just run some notebook tests:

normalizer = max(train['Volume']) * 2
test['Volume'] = test['Volume']/normalizer
train['Volume'] = train['Volume']/normalizer
t_tr_lrn,p_tr_lrn=run_model(train)
print("Mean training MSE of the learner with pre-normalized volume: %s" %mse(t_tr_lrn,p_tr_lrn))
: Mean training MSE of the learner with pre-normalized volume: 0.009773901384921718

That’s better, but something is still off. Let’s look at the evolution of the loss of the learner:

import matplotlib.pyplot as plt
import seaborn as sns
sns.set()
# plot the cumulative squared loss over the ticks
plt.figure(figsize=(6,4))
ax = sns.tsplot(np.cumsum(se(t_tr_lrn, p_tr_lrn)))
ax.set(xlabel='ticks', ylabel='squared loss')
plt.savefig("evolution.png")

It appears most of the contribution to the cumulative objective metric comes from the first few ticks. That is because the starting parameter and the first few gradient steps are very poor. Ideally, we’d set some data aside to pre-train. To keep this post simple, I’ll instead give the algorithm a head start by cutting off the first ticks when reporting the loss:

print("Mean (Cutoff) training MSE of the baseline: %s" %mse(t_tr_base,p_tr_base,cutoff=1000))
print("Mean (Cutoff) training MSE of the learner with pre-normalized volume: %s" %mse(t_tr_lrn,p_tr_lrn,cutoff=1000))
: Mean (Cutoff) training MSE of the baseline: 3.98316292312566e-07
: Mean (Cutoff) training MSE of the learner with pre-normalized volume: 1.6258569899648825e-06

Okay, almost there, but the baseline still beats our learner. Let’s optimize the hyper-parameters using a grid search.

def gridsearch(hyperparameters):
  losses,ps,lambdas,rts,rvs=[],[],[],[],[]
  for P in hyperparameters['P']:
    for Lambda in hyperparameters['Lambda']:
      for rt in hyperparameters['rateType']:
        for rv in hyperparameters['rateValue']:
           targets,predictions=run_model(train,P,Lambda,rt,rv)
           losses.append(mse(targets,predictions,cutoff=1000))
           ps.append(P)
           lambdas.append(Lambda)
           rts.append(rt)
           rvs.append(rv)
  return(pd.DataFrame({'Losses':losses, 'P':ps,
                       'Lambda':lambdas, 'RateType':rts,
                       'RateValue':rvs}))
hp1 = { 'P':[1,10]
      , 'Lambda':[0.1,1,10]
      , 'rateType':["fixed","squared"]
      , 'rateValue':[10,1,0.1,0.01,0.001] }
print(gridsearch(hp1))
:     Lambda        Losses   P RateType  RateValue
: 0      0.1  7.615971e-01   1    fixed     10.000
: 1      0.1  7.615964e-01   1    fixed      1.000
: 2      0.1  7.615943e-01   1    fixed      0.100
: 3      0.1  7.616207e-01   1    fixed      0.010
: 4      0.1  7.617186e-01   1    fixed      0.001
: 5      0.1  7.618420e-01   1  squared     10.000
: 6      0.1  7.619636e-01   1  squared      1.000
: 7      0.1  7.620538e-01   1  squared      0.100
: 8      0.1  1.028639e+00   1  squared      0.010
: 9      0.1  1.226082e+00   1  squared      0.001
: 10     1.0  7.262987e+00   1    fixed     10.000
: 11     1.0  7.262585e+00   1    fixed      1.000
: 12     1.0  3.154518e-07   1    fixed      0.100
: 13     1.0  3.820267e-07   1    fixed      0.010
: 14     1.0  1.625857e-06   1    fixed      0.001
: 15     1.0  2.522215e-05   1  squared     10.000
: 16     1.0  1.561961e-03   1  squared      1.000
: 17     1.0  1.013956e-01   1  squared      0.100
: 18     1.0  1.028639e+00   1  squared      0.010
: 19     1.0  1.226082e+00   1  squared      0.001
: 20    10.0  6.026437e+02   1    fixed     10.000
: 21    10.0  6.026287e+02   1    fixed      1.000
: 22    10.0  3.154518e-07   1    fixed      0.100
: 23    10.0  3.820267e-07   1    fixed      0.010
: 24    10.0  1.625857e-06   1    fixed      0.001
: 25    10.0  4.166677e-05   1  squared     10.000
: 26    10.0  2.198284e-02   1  squared      1.000
: 27    10.0  1.013956e-01   1  squared      0.100
: 28    10.0  1.028639e+00   1  squared      0.010
: 29    10.0  1.226082e+00   1  squared      0.001
: 30     0.1  1.622722e-01  10    fixed     10.000
: 31     0.1  1.622713e-01  10    fixed      1.000
: 32     0.1  1.622676e-01  10    fixed      0.100
: 33     0.1  1.622964e-01  10    fixed      0.010
: 34     0.1  1.624527e-01  10    fixed      0.001
: 35     0.1  1.625985e-01  10  squared     10.000
: 36     0.1  1.627590e-01  10  squared      1.000
: 37     0.1  1.629935e-01  10  squared      0.100
: 38     0.1  1.674918e-01  10  squared      0.010
: 39     0.1  1.060242e+00  10  squared      0.001
: 40     1.0  5.239155e+01  10    fixed     10.000
: 41     1.0  5.239144e+01  10    fixed      1.000
: 42     1.0  5.238925e+01  10    fixed      0.100
: 43     1.0  5.657399e-07  10    fixed      0.010
: 44     1.0  1.379379e-06  10    fixed      0.001
: 45     1.0  1.964126e-06  10  squared     10.000
: 46     1.0  1.947093e-06  10  squared      1.000
: 47     1.0  1.046234e-02  10  squared      0.100
: 48     1.0  1.674918e-01  10  squared      0.010
: 49     1.0  1.060242e+00  10  squared      0.001
: 50    10.0  5.115440e+03  10    fixed     10.000
: 51    10.0  5.115429e+03  10    fixed      1.000
: 52    10.0  5.115232e+03  10    fixed      0.100
: 53    10.0  5.657399e-07  10    fixed      0.010
: 54    10.0  1.379379e-06  10    fixed      0.001
: 55    10.0  4.956356e-06  10  squared     10.000
: 56    10.0  6.287338e-06  10  squared      1.000
: 57    10.0  1.046234e-02  10  squared      0.100
: 58    10.0  1.674918e-01  10  squared      0.010
: 59    10.0  1.060242e+00  10  squared      0.001
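
To pick the winning configuration programmatically rather than by eyeballing the table, a one-liner on the returned DataFrame does the trick (re-running the search here for illustration; idxmin returns the first row in case of ties):

results = gridsearch(hp1)
# best configuration: P=1, Lambda=1.0, fixed rate of 0.1
print(results.loc[results['Losses'].idxmin()])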

Alright, that was a very coarse hyper-parameter search, and we didn’t split the training set for cross-validation. Moreover, the best hyper-parameters don’t really make use of the historical data – it seems that the last tick’s value is enough. Maybe we need a better hyper-parameter search, and maybe this is a modeling issue. This post is getting long, so let’s leave it at that and validate on the testing set right away:

t_te_bas,p_te_bas=run_model(test,baseline=True)
t_te_lrn,p_te_lrn=run_model(test,P=1,Lambda=1,rateType="fixed",rateValue=0.1)

print("Mean (cutoff) testing MSE of the baseline: %s"
  %mse(t_te_bas,p_te_bas,cutoff=1000))
print("Mean (cutoff) testing MSE of the learner with pre-normalized volume: %s" 
  %mse(t_te_lrn,p_te_lrn,cutoff=1000))
: Mean (cutoff) testing MSE of the baseline: 5.39433568176514e-07
: Mean (cutoff) testing MSE of the learner with pre-normalized volume: 4.534395880527734e-07

We’ve reached our goal: the learner outperforms the baseline. The selected aggregation hyper-parameter means this was achieved using an AR(1) model with a constant learning rate. The hyper-parameter optimization method we used is basic: we could achieve better results using a more careful search technique. More precisely, we’d like the value of \(\lambda\) to be smaller; here, the regularization constraint is not really met.

