Run Leadership in Rcpp

To install Systematic Investor Toolbox (SIT) please visit About page.

Let’s continue with Run Correlation in Rcpp post and implement running window Leadership score based on lead-lag relationship [Detecting Leaders from Correlated Time Series D. Wu, Y. Ke, J. Yu, P. Yu, L. Chen]

First, let’s load historical prices for S&P 500:

#*****************************************************************
# Load historical end of day data
#*****************************************************************
library(SIT)
load.packages('quantmod')

filename = 'big.test.Rdata'

if(!file.exists(filename)) {
  tickers = nasdaq.100.components()
  tickers = sp500.components()$tickers

  data = env()
  getSymbols.extra(tickers, src = 'yahoo', from = '1970-01-01', env = data, auto.assign = T)
  
	rm.index = which(sapply(ls(data), function(i) is.null(data[[i]])))
	if(any(rm.index)) env.del(names(rm.index), data)
  
  for(i in ls(data)) data[[i]] = adjustOHLC(data[[i]], use.Adjusted=T)
  #print(bt.start.dates(data))

  bt.prep(data, align='keep.all', dates='2000::', fill.gaps=T)

  # remove ones with little history
  prices = data$prices
  bt.prep.remove.symbols(data, which(
   	count(prices, side = 2) < 3*252 | is.na(last(prices)) 
  ))
       
  # show the ones removed
  print(setdiff(tickers,names(data$prices)))

  prices = data$prices
  save(prices, file=filename)
}

load(file=filename)

#*****************************************************************
# Run Lead Lag Correlation
#*****************************************************************
prices = prices[,1:5]

n = ncol(prices)
nperiod = nrow(prices)
ret = prices / mlag(prices) - 1

index =  (nperiod-20):nperiod
ret = ret[index,]
nperiod = nrow(ret)
nwindow = 15

# lead - lag sample function
rtest = function(ret, nwindow) {
	nlag = floor(nwindow/2)
	n = ncol(ret)
	nperiod = nrow(ret)
	out = array(0, c(n,n,nperiod))
	index = !lower.tri(out[,,1])

	for(i in nwindow:nperiod) {
		temp = coredata(ret[(i-nwindow+1):i,])
		temp.out = matrix(0, n,n)
		
		for (c1 in 2:n) {
			data1 = temp[,c1]
			c2.index = 1:(c1-1)
			data2 = temp[,c2.index,drop=F]

			# if c1.cor : c2 -> c1 i.e. cor(data1[(lag+1):nwindow], data2[1:(nwindow-lag),])
			c1.cor = rep(0,ncol(data2))
			for (lag in 0:nlag) {
				cor.temp = cor(data1[(lag+1):nwindow], data2[1:(nwindow-lag),])
				c1.cor = c1.cor + pmax(cor.temp, 0)
			}
			c1.cor = c1.cor / (nlag + 1)
			
			# if c2.cor : c1 -> c2 i.e. cor(data1[1:(nwindow-lag)], data2[(lag+1):nwindow,])
			c2.cor = rep(0,ncol(data2))
			for (lag in 1:nlag) {
				cor.temp = cor(data1[1:(nwindow-lag)], data2[(lag+1):nwindow,])
				c2.cor = c2.cor + pmax(cor.temp, 0)
			}
			c2.cor = c2.cor / nlag
			
			# if c1.cor >= c2.cor hence c2 -> c1 i.e. cor(data1[(lag+1):nwindow], data2[1:(nwindow-lag),])
			index = c1.cor >= c2.cor
			if(any(index))
				temp.out[c2.index[index], c1] = c1.cor[index]
				
			# if c2.cor >= c1.cor hence c1 -> c2 i.e. cor(data1[1:(nwindow-lag)], data2[(lag+1):nwindow,])
			index = !index
			if(any(index))
				temp.out[c1, c2.index[index]] = c2.cor[index]							
		}			
		out[,,i] = temp.out
	}
	out
}

r.cor = rtest(ret, nwindow)

Next, let’s implement a function using Rcpp. I will re-use code from Run Correlation in Rcpp post.

#include <Rcpp.h>
using namespace Rcpp;
using namespace std;

// [[Rcpp::plugins(cpp11)]]


// [[Rcpp::depends(RcppParallel)]]
#include <RcppParallel.h>
using namespace RcppParallel;


/*------ pre-compute sum and stdev ------*/
struct cor_smart_p1 : public Worker {
	const RMatrix<double> mat;
	int rstart, rend;
	const int nperiod;

	RVector<double> rsum, rsum2, rstdev;

	bool first_run;

	cor_smart_p1(const NumericMatrix& mat, const int rstart, const int rend,
		NumericVector rsum, NumericVector rsum2, NumericVector rstdev)
		: mat(mat), rstart(rstart), rend(rend), 
		nperiod(rend - rstart), rsum(rsum), rsum2(rsum2), rstdev(rstdev), first_run(true) { }

	void update(int i) {
		rstart = i;
		rend = rstart + nperiod;
		first_run = false;
	}
	void do_first_run(size_t begin, size_t end) {
		for (size_t c = begin; c < end; c++) {
			double sum, sum2;
			sum = sum2 = 0;

			for (int r = rstart; r < rend; r++) {
				double d = mat(r, c);
				sum += d;
				sum2 += pow(d, 2);
			}

			rsum[c] = sum;
			rsum2[c] = sum2;
			rstdev[c] = sqrt(nperiod * sum2 - pow(sum, 2));
		}
	}
	void do_update(size_t begin, size_t end) {
		for (size_t c = begin; c < end; c++) {
			double d0 = mat(rstart - 1, c);
			double d1 = mat(rend - 1, c);
			rsum[c] += -d0 + d1;
			rsum2[c] += -pow(d0, 2) + pow(d1, 2);
			rstdev[c] = sqrt(nperiod * rsum2[c] - pow(rsum[c], 2));
		}
	}
	void operator()(size_t begin, size_t end) {
		if (first_run)
			do_first_run(begin, end);
		else
			do_update(begin, end);
	}
};


/*------ Running Leadership ------*/
// compute correlation
struct leadership_smart_p2 : public Worker {
	const RMatrix<double> mat;
	int rstart, rend;
	const int nperiod, nc;
	const RVector<double> sum, sum2, stdev;

	vector<vector<double>>& infoXY;
	RMatrix<double> rmat;

	bool first_run;
	int nlag;

	vector<double> cor_mat;

	leadership_smart_p2(const NumericMatrix& mat, int rstart, int rend,
		const NumericVector& sum, const NumericVector& sum2, const NumericVector& stdev,
		vector<vector<double>>& infoXY, NumericMatrix rmat)
		: mat(mat), rstart(rstart), rend(rend), nperiod(rend - rstart), nc(mat.ncol()),
		sum(sum), sum2(sum2), stdev(stdev), infoXY(infoXY), rmat(rmat),
		first_run(true), nlag(floor(nperiod/2))
	{ }

	void update(int i) {
		rstart = i;
		rend = rstart + nperiod;
		first_run = false;
	}

	// first run: pre-compute all correlations
	void do_first_run(size_t begin, size_t end) {
		int xyi = floor(begin * (begin - 1) / 2);
		for (size_t c1 = begin; c1 < end; c1++) {
			for (size_t c2 = 0; c2 < c1; c2++, xyi++) {
				for (int lag = 0; lag <= nlag; lag++) {
					double sXY = 0;
					for (int r1 = rstart + lag, r2 = rstart; r1 < rend; r1++, r2++) {
						sXY += mat(r1, c1) * mat(r2, c2); // r1 always ends at rend
					}
					infoXY[lag][xyi] = sXY;
				}

				for (int lag = nlag+1; lag <= 2*nlag; lag++) {
					double sXY = 0;
					for (int r1 = rstart + lag - nlag, r2 = rstart; r1 < rend; r1++, r2++) {
						sXY += mat(r2, c1) * mat(r1, c2); // r1 always ends at rend
					}
					infoXY[lag][xyi] = sXY;
				}
			}
		}
	}

	// update run: use pre-compute correlations and sums to compute stats
	void do_update(size_t begin, size_t end) {
		int xyi = floor(begin * (begin - 1) / 2);
		for (size_t c1 = begin; c1 < end; c1++) {
			for (size_t c2 = 0; c2 < c1; c2++, xyi++) {
				double c1cor, c2cor;
				double r1sum, r1sum2, r1stdev;
				double r2sum, r2sum2, r2stdev;
				r1sum = sum[c1]; r1sum2 = sum2[c1];
				r2sum = sum[c2]; r2sum2 = sum2[c2];

if (!first_run) infoXY[0][xyi] += -mat(rstart - 1, c1) * mat(rstart - 1, c2) + mat(rend - 1, c1) * mat(rend - 1, c2);

				r1stdev = sqrt(nperiod * r1sum2 - pow(r1sum, 2));
				r2stdev = sqrt(nperiod * r2sum2 - pow(r2sum, 2));
				double temp = (nperiod * infoXY[0][xyi] - r1sum * r2sum) / (r1stdev * r2stdev);
				c1cor = max(temp, 0.0);
				
				int r20 = rstart - 1;
				int r11 = rend - 1;

				for (int lag = 1; lag <= nlag; lag++) {
					int r10 = rstart - 1 + lag;
					int r21 = rend - 1 - lag;
if (!first_run) infoXY[lag][xyi] += -mat(r10, c1) * mat(r20, c2) + mat(r11, c1) * mat(r21, c2);

					double d1 = mat(r10, c1);
					double d2 = mat(r21 + 1, c2);
					r1sum -= d1;
					r2sum -= d2;
					r1sum2 -= pow(d1, 2);
					r2sum2 -= pow(d2, 2);
					int nperiod1 = nperiod - lag;
					r1stdev = sqrt(nperiod1 * r1sum2 - pow(r1sum, 2));
					r2stdev = sqrt(nperiod1 * r2sum2 - pow(r2sum, 2));

					temp = (nperiod1 * infoXY[lag][xyi] - r1sum * r2sum) / (r1stdev * r2stdev);
					c1cor += max(temp, 0.0);
				}

				r1sum = sum[c1]; r1sum2 = sum2[c1];
				r2sum = sum[c2]; r2sum2 = sum2[c2];
				c2cor = 0;
				for (int lag = nlag + 1; lag <= 2 * nlag; lag++) {
					int r10 = rstart - 1 + lag - nlag;
					int r21 = rend - 1 - (lag - nlag);
if (!first_run) infoXY[lag][xyi] += -mat(r20, c1) * mat(r10, c2) + mat(r21, c1) * mat(r11, c2);

					double d1 = mat(r21 + 1, c1);
					double d2 = mat(r10, c2);
					r1sum -= d1;
					r2sum -= d2;
					r1sum2 -= pow(d1, 2);
					r2sum2 -= pow(d2, 2);
					int nperiod1 = nperiod - (lag - nlag);
					r1stdev = sqrt(nperiod1 * r1sum2 - pow(r1sum, 2));
					r2stdev = sqrt(nperiod1 * r2sum2 - pow(r2sum, 2));

					temp = (nperiod1 * infoXY[lag][xyi] - r1sum * r2sum) / (r1stdev * r2stdev);
					c2cor += max(temp, 0.0);
				}

				c1cor = c1cor / (nlag + 1);
				c2cor = c2cor / nlag;

				if (c1cor >= c2cor) {
					rmat(c2, c1) = c1cor;
					rmat(c1, c2) = 0.0;
				} else {
					rmat(c1, c2) = c2cor;
					rmat(c2, c1) = 0.0;
				}
			}
		}
	}

	// main entry
	void operator()(size_t begin, size_t end) {
		if (first_run) 
			do_first_run(begin, end);			
		do_update(begin, end);		
	}

};

// [[Rcpp::export]]
NumericVector cp_run_leadership_smart(NumericMatrix mat, int nwindow, bool runInParallel) {
	//bool runInParallel = true;
	int nc = mat.ncol();
	int nperiod = mat.nrow();
	NumericVector ret = NumericVector(Dimension(nc, nc, nperiod));
		fill_n(ret.begin(), ((0 + nwindow - 1) * nc * nc), 0.0);

	// pre-compute first run
	NumericVector rsum(nc), rsum2(nc), rstdev(nc);
	cor_smart_p1 p1(mat, 0, nwindow, rsum, rsum2, rstdev);
	if(runInParallel)
		parallelFor(0, nc, p1);
	else
		p1.do_first_run(0, nc);

	NumericMatrix cor(nc, nc);
		fill(cor.begin(), cor.end(), 0.0);
	int nlag = floor(nwindow / 2);

	vector<vector<double>> infoXY(2*nlag + 1, vector<double>( floor((nc-1)*nc / 2) ));
	leadership_smart_p2 p2(mat, 0, nwindow, rsum, rsum2, rstdev, infoXY, cor);

	if (runInParallel)
		parallelFor(1, nc, p2);
	else {
		p2.do_first_run(1, nc);
		p2.do_update(1, nc);
	}

	std::copy(cor.begin(), cor.end(), ret.begin() + ((0 + nwindow - 1) * nc * nc));

	// for loop and append
	for (int i = 1; i < (nperiod - nwindow + 1); i++) {
		p1.update(i);
		if (runInParallel)
			parallelFor(0, nc, p1);
		else
			p1.do_update(0, nc);

		p2.update(i);
		if (runInParallel)
			parallelFor(1, nc, p2);
		else
			p2.do_update(1, nc);

		std::copy(cor.begin(), cor.end(), ret.begin() + ((i + nwindow - 1) * nc * nc));
	}

	return ret;
}

Please save above code in the lead.lag.correlation.cpp file or download lead.lag.correlation.cpp.

Please note that above code requires lot’s of memory to store results; hence, you might want to add --max-mem-size=8000M to your R parameters.

Next let’s make sure it produces same results and compare the run times.

#*****************************************************************
# Run Lead Lag Correlation
#*****************************************************************
# make sure to install [Rtools on windows](http://cran.r-project.org/bin/windows/Rtools/)
load.packages('Rcpp')
load.packages('RcppParallel')
# [RcppParallel help site](http://rcppcore.github.io/RcppParallel/)
# [RcppParallel source code](https://github.com/RcppCore/RcppParallel)
# [RcppParallel introduction](http://dirk.eddelbuettel.com/blog/2014/07/16/#introducing_rcppparallel)
# [RcppParallel gallery](http://gallery.rcpp.org/articles/parallel-distance-matrix/)
# defaultNumThreads()
#[High performance functions with Rcpp](http://adv-r.had.co.nz/Rcpp.html)

#*****************************************************************
# Test on small example
#*****************************************************************
# load Rcpp functions
sourceCpp('lead.lag.correlation.cpp')

r.cor = rtest(ret, nwindow)
c.cor = cp_run_leadership_smart(ret, nwindow, T)

print(test.equality(r.cor, c.cor))
item1 item2 equal
r.cor c.cor TRUE
print(max(abs(r.cor - c.cor), na.rm=T))

1.11022302462516e-16

# compare performance
library(rbenchmark)
res <- benchmark(rtest(ret, nwindow),
			cp_run_leadership_smart(ret, nwindow,T),
			cp_run_leadership_smart(ret, nwindow,F),
			replications = 2,
			order="relative")
print(res[,1:4])
rownames(x) test replications elapsed relative
1 rtest(ret, nwindow) 2 0.08  
2 cp_run_leadership_smart(ret, nwindow, T) 2 0.00  
3 cp_run_leadership_smart(ret, nwindow, F) 2 0.00  
#*****************************************************************
# Test on large example
#*****************************************************************
load(file=filename)
prices = prices[,1:25]
n = ncol(prices)
nperiod = nrow(prices)
ret = prices / mlag(prices) - 1

index =  (nperiod-100):nperiod
ret = ret[index,]
nperiod = nrow(ret)
nwindow = 19

r.cor = rtest(ret, nwindow)
c.cor = cp_run_leadership_smart(ret, nwindow, T)

print(test.equality(r.cor, c.cor))
item1 item2 equal
r.cor c.cor TRUE
print(max(abs(r.cor - c.cor), na.rm=T))

1.02695629777827e-15

# compare performance
library(rbenchmark)
res <- benchmark(rtest(ret, nwindow),
			cp_run_leadership_smart(ret, nwindow,T),
			cp_run_leadership_smart(ret, nwindow,F),
			replications = 2,
			order="relative")
print(res[,1:4])
rownames(x) test replications elapsed relative
1 rtest(ret, nwindow) 2 8.26  
2 cp_run_leadership_smart(ret, nwindow, T) 2 0.00  
3 cp_run_leadership_smart(ret, nwindow, F) 2 0.01  
#*****************************************************************
# Test on large example
#*****************************************************************
load(file=filename)
n = ncol(prices)
nperiod = nrow(prices)
ret = prices / mlag(prices) - 1

index =  (nperiod-200):nperiod
ret = ret[index,]
nperiod = nrow(ret)
nwindow = 19

c.cor.F = cp_run_leadership_smart(ret, nwindow, F)
c.cor.T = cp_run_leadership_smart(ret, nwindow, T)

print(test.equality(c.cor.T, c.cor.F))
item1 item2 equal
c.cor.T c.cor.F TRUE
print(max(abs(c.cor.T - c.cor.F), na.rm=T))

0

# compare performance
library(rbenchmark)
res <- benchmark(
			cp_run_leadership_smart(ret, nwindow,T),
			cp_run_leadership_smart(ret, nwindow,F),
			replications = 1,
			order="relative")
print(res[,1:4])
rownames(x) test replications elapsed relative
1 cp_run_leadership_smart(ret, nwindow, T) 1 2.03 1.000
2 cp_run_leadership_smart(ret, nwindow, F) 1 9.71 4.783

The RcppParallel version is about 4 times faster.

(this report was produced on: 2015-04-23)