Saturday, March 3, 2012

Capturing Tick Data via C#, Interactive Brokers, and MySQL

Interactive Brokers is a discount brokerage that provides a good API for programmatically accessing their platform.  The purpose of this post is to create an application that will capture tick-level data and save that data into a database for future use.

I started to use the IBrokers package in R to do this post.  However, as R is NOT easily threaded and the IB API is heavily threaded, well... oil and water.

Instead I went with the C# port of the API from DinosaurTech.  It's good and it's free.

Luckily, you do not need an account with Interactive Brokers for this project.  They have a demo environment available on their webpage.  The data is FAKE, but it's good enough to test connectivity.  Simply run the demo, then go to Configure->API->Settings and ensure that "Enable ActiveX and Socket Clients" is checked and the port is set to 7496.

To follow along in this post, you will need:
  1. MySQL, MySQL Workbench, and MySQL Connector for .NET (all available here).
  2. VS2010 with C#.  You can download the free Express version here.
  3. The libraries from DinosaurTech available at the link above.
  4. A basic understanding of C# (available here [Pro C# 2010 and the .NET 4 Platform], here [Beginning Visual C# 2010 (Wrox Programmer to Programmer)], or here [Google]).
To start, install all of the above items.  In MySQL workbench, run the following SQL to create the table to store ticks.
SET @OLD_UNIQUE_CHECKS=@@UNIQUE_CHECKS, UNIQUE_CHECKS=0;
SET @OLD_FOREIGN_KEY_CHECKS=@@FOREIGN_KEY_CHECKS, FOREIGN_KEY_CHECKS=0;
SET @OLD_SQL_MODE=@@SQL_MODE, SQL_MODE='TRADITIONAL';

DROP SCHEMA IF EXISTS `tick_db` ;
CREATE SCHEMA IF NOT EXISTS `tick_db` DEFAULT CHARACTER SET latin1 COLLATE latin1_swedish_ci ;
USE `tick_db` ;

-- -----------------------------------------------------
-- Table `ticks`
-- -----------------------------------------------------
DROP TABLE IF EXISTS `ticks` ;

CREATE  TABLE IF NOT EXISTS `ticks` (
  `idticks` INT NOT NULL ,
  `symbol` VARCHAR(8) NOT NULL ,
  `date` DATE NOT NULL  ,
  `time` TIME NOT NULL  ,
  `value` FLOAT NOT NULL ,
  `type` VARCHAR(12) NOT NULL ,
  PRIMARY KEY (`idticks`, `date`) )
ENGINE = InnoDB PARTITION BY KEY(date) PARTITIONS 1;

CREATE INDEX `Symbol` ON `ticks` (`symbol` ASC) ;

SET SQL_MODE=@OLD_SQL_MODE;
SET FOREIGN_KEY_CHECKS=@OLD_FOREIGN_KEY_CHECKS;
SET UNIQUE_CHECKS=@OLD_UNIQUE_CHECKS;
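You will note that we are partitioning the table by date.  The goal is that queries stay fast as the data grows day over day.  Keep in mind that PARTITIONS 1 creates only a single partition, so that number has to be raised before partitioning can actually help.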

If you do not have a user in the database, create one and give it permissions on this table (my user is dom and the password, a super secure dom123).

Before we go on, let's discuss how the IB API works.  Your program connects to their trading platform, TWS (Trader Workstation), while it is running on your computer, and TWS uses its own connection to the IB servers to make your requests.  When you request market tick data for a stock, the data arrives whenever it wants.  There is no synchronous get() function.  You ask for data and the API fires events when that data comes back.  Because of this, you can have multiple requests running at the same time.  You just have to keep straight which equity each event belongs to.
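To make the event-driven flow concrete, here is a minimal sketch that does not use the IB library at all.  FakeFeed, PriceEventArgs, and FeedDemo are hypothetical names invented for this illustration; the point is only that data is pushed to a handler, and that the handler uses the request ID to work out which symbol the data belongs to.

```csharp
using System;
using System.Collections.Generic;
using System.Globalization;

// Hypothetical stand-in for an event-driven market data client.
// None of these names come from the IB API.
public class PriceEventArgs : EventArgs
{
    public int RequestId { get; private set; }
    public decimal Price { get; private set; }

    public PriceEventArgs(int requestId, decimal price)
    {
        RequestId = requestId;
        Price = price;
    }
}

public class FakeFeed
{
    // Raised whenever a price arrives; there is no blocking Get() call.
    public event EventHandler<PriceEventArgs> Price;

    // Simulates the feed pushing a value for a given request ID.
    public void Push(int requestId, decimal price)
    {
        EventHandler<PriceEventArgs> handler = Price;
        if (handler != null)
            handler(this, new PriceEventArgs(requestId, price));
    }
}

public static class FeedDemo
{
    // Routes two pushed prices through an ID -> symbol map
    // and returns one "SYMBOL=price" string per event.
    public static List<string> Run()
    {
        Dictionary<int, string> symbols = new Dictionary<int, string>();
        symbols.Add(1, "GOOG");
        symbols.Add(2, "SPY");

        List<string> seen = new List<string>();
        FakeFeed feed = new FakeFeed();
        feed.Price += delegate(object sender, PriceEventArgs e)
        {
            seen.Add(symbols[e.RequestId] + "=" +
                     e.Price.ToString(CultureInfo.InvariantCulture));
        };

        // Events can arrive in any order across requests.
        feed.Push(2, 135.5m);
        feed.Push(1, 621.25m);
        return seen;
    }
}
```

Calling FeedDemo.Run() returns the two entries in arrival order, SPY first and GOOG second, even though GOOG holds the lower request ID.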

This threaded nature means that we can process a large number of requests.  It also means that we need to free up the threads that are receiving data as quickly as possible, i.e., they shouldn't be writing to the database.  Instead, the receiving threads will put each tick into a queue, and a separate thread will post the queued ticks to the database as quickly as it can.
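As an aside, .NET 4 ships a ready-made container for this kind of hand-off, BlockingCollection<T>.  The sketch below is not the approach used in this post (which uses a plain Queue and a lock); it is an alternative under the assumption that you are on .NET 4 or later.  HandOffDemo is a hypothetical name, and the integers stand in for ticks.

```csharp
using System;
using System.Collections.Concurrent;
using System.Collections.Generic;
using System.Threading.Tasks;

public static class HandOffDemo
{
    // The producer adds items quickly; a single consumer task drains them.
    public static List<int> Run(int count)
    {
        BlockingCollection<int> pending = new BlockingCollection<int>();
        List<int> written = new List<int>();

        // Consumer: stands in for the slow database writer.
        Task writer = Task.Factory.StartNew(() =>
        {
            // GetConsumingEnumerable blocks until an item is available and
            // ends once CompleteAdding has been called and the queue is empty.
            foreach (int item in pending.GetConsumingEnumerable())
                written.Add(item);
        });

        // Producer: stands in for the tick event handlers.
        for (int i = 0; i < count; i++)
            pending.Add(i);

        pending.CompleteAdding();   // signal that no more items will arrive
        writer.Wait();              // wait for the consumer to finish draining
        return written;
    }
}
```

With one producer and one consumer, HandOffDemo.Run(1000) returns the 1000 items in the order they were added.  The consumer needs no Sleep() polling, because it simply blocks while the collection is empty.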

Ticks are reported as updates to BID, BID_SIZE, ASK, ASK_SIZE, LAST, LAST_SIZE, VOLUME, HIGH, LOW, and CLOSE.

Here is the plan of action:

  1. Create a new console application in C#.
  2. Create a Tick class for holding the tick and give it a method that returns the SQL INSERT statement to put that tick into our database.
  3. Create a TickQueue class that will process ticks that are inserted into its Queue object as fast as possible.
  4. Create a TickMain class that will request all the market data and insert the tick events into the TickQueue.
  5. Call the TickMain class from a separate thread in the console application and watch the magic happen.
First, our Tick class
public class Tick
{
    public String type = "";
    public decimal value = 0;
    public String date = System.DateTime.Now.ToString("yyyy-MM-dd");
    public String time = System.DateTime.Now.ToString("H:mm:ss.ff");  // "ff" = hundredths of a second; ".ss" would just repeat the seconds
    public String symbol = "";
    public Int32 index = 0;

    public Tick(String type, decimal value, String symbol, Int32 index)
    {
        this.type = type;
        this.value = value;
        this.symbol = symbol;
        this.index = index;
    }

    public Tick() { }

    public String toInsert()
    {
        String output = "insert into ticks (idticks,symbol,date,time,value,type) values (" +
                            index +
                            ",'" + symbol +
                            "', DATE('" + date +
                            "'),TIME('" + time + "')," +
                            value +
                            ",'" + type + "')";

        return output;
    }
}
This class is basic.  We simply hold the information about the tick, and create a method that will create our SQL INSERT statement.
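One thing to watch when building SQL text by string concatenation: a decimal is formatted with the current culture, and on a machine whose locale uses a decimal comma the number would be written with a comma and break the statement.  The sketch below shows the difference.  SqlNumberDemo and its methods are hypothetical helpers, not part of the project, and the comma format is built by hand to mimic such a locale.

```csharp
using System;
using System.Globalization;

public static class SqlNumberDemo
{
    // Formats a decimal for embedding in SQL text,
    // independent of the machine's locale.
    public static string ToSqlLiteral(decimal value)
    {
        return value.ToString(CultureInfo.InvariantCulture);
    }

    // Formats the same value the way a comma-decimal locale would.
    public static string ToCommaDecimal(decimal value)
    {
        NumberFormatInfo commaDecimal = new NumberFormatInfo();
        commaDecimal.NumberDecimalSeparator = ",";
        return value.ToString(commaDecimal);
    }
}
```

For 621.25m, ToSqlLiteral gives 621.25 while ToCommaDecimal gives 621,25, which MySQL would read as two separate values in an INSERT.  Parameterized commands avoid the issue entirely, at the cost of a little more code.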

Next is the TickQueue.  This is more involved...

public class TickQueue
{
    // Internal queue of db inserts
    private Queue<Tick> queue = new Queue<Tick>();
       
    // Locking Object.  Necessary as multiple threads will be
    // accessing this object simultaneously
    private Object qLockObj = new object();

    //Connection object to MySQL
    public MySqlConnection conn = null;
    //Flag to stop processing (volatile: written by one thread, read by another)
    public volatile bool stop = false;

    //Method to enqueue a write
    public void Add(Tick item)
    {
        //Lock for single access only
        lock(qLockObj)
        {
            queue.Enqueue(item);
        }
    }

    //Method to write items as they are enqueued
    public void Run()
    {
        int n = 0;
        //Loop while the stop flag is false
        while (!stop)
        {
            //Lock and get a count of the object in the queue
            lock (qLockObj)
            {
                n = queue.Count;
            }

            //If there are objects in the queue, then process them
            if (n > 0)
            {
                process();
            }

            //Sleep for .1 seconds before looping again
            System.Threading.Thread.Sleep(100);
        }

        //When the shutdown flag is received, write any
        //values still in the queue and then stop
        Console.WriteLine("Shutting Down TickQueue; " + queue.Count + " items left");
        process();
    }

    //Method to process items in the queue
    private void process()
    {
        List<Tick> inserts = new List<Tick>();
        //Drain the queue into a list.  Loop on Count > 0 because Count
        //shrinks with every Dequeue; comparing it to a rising counter
        //would leave half of the items behind.
        lock (qLockObj)
        {
            while (queue.Count > 0)
                inserts.Add(queue.Dequeue());
        }

        Console.WriteLine("Processing " + inserts.Count + " items into database");

        //call insert for each item.
        foreach (Tick t in inserts)
            insert(t);
    }

    //Method to insert a tick
    private void insert(Tick t)
    {
        using (MySqlCommand cmd = conn.CreateCommand())
        {
            cmd.CommandText = t.toInsert();
            try
            {
                cmd.ExecuteNonQuery();
            }
            catch (Exception exp)
            {
                Console.WriteLine("OOPS " + exp.Message);
            }
        }
    }
}
You will notice that I am not creating threads in this object.  The TickMain object will have the worker thread that calls the Run() method in TickQueue.  Because multiple threads will be accessing the object, I've surrounded all access points to the actual queue with a lock() statement.  That will ensure that only one thread at a time gets access.
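The lock-then-drain pattern can be sketched on its own.  This is a minimal illustration rather than the TickQueue class itself; DrainDemo and DrainAll are hypothetical names.  Everything is copied out of the queue while the lock is held, so that the slow work (the database writes) can happen after the lock has been released and the producers are never kept waiting on the database.

```csharp
using System;
using System.Collections.Generic;

public static class DrainDemo
{
    // Moves every queued item into a list while holding the lock.
    public static List<T> DrainAll<T>(Queue<T> queue, object gate)
    {
        List<T> batch = new List<T>();
        lock (gate)
        {
            // Loop on Count > 0: Count shrinks with every Dequeue, so
            // comparing it to a rising counter would stop halfway.
            while (queue.Count > 0)
                batch.Add(queue.Dequeue());
        }
        return batch;
    }
}
```

Given a queue holding five items, DrainAll returns all five in FIFO order and leaves the queue empty.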

I've tried to comment enough to give an idea of what is happening in there.  If not, let me know and I will expand.  The same goes for TickMain below.

Now the TickMain class:
public class TickMain
{
    //Private and public accessor for the IBClient object
    private IBClient _client = null;
    public IBClient client
    {
        get { return _client; }
        set
        {
            _client = value;
            //If the client is connected, then set the queue.stop = false
            if (_client.Connected)
                queue.stop = false;
            else
                queue.stop = true;
        }

    }

    public List<String> stockList = new List<string>();
    public MySqlConnection conn = null;
    public volatile bool doGet = true;  //volatile: set from the console thread, read in Run()

    private TickQueue queue = new TickQueue();
    private BackgroundWorker bg = new BackgroundWorker();
    private Dictionary<int, String> tickId = new Dictionary<int, string>();
    private int tickIndex = 0;
    private object lockObj = new object();

    //Constructors
    public TickMain()
    {
        initialize();
    }

    public TickMain(IBClient client, List<String> stockList, MySqlConnection conn)
    {
        this.client = client;
        this.stockList = stockList;
        this.conn = conn;
        initialize();
    }

    //Initialization method
    private void initialize()
    {
        //Setup the background worker to run the queue
        bg.DoWork += new DoWorkEventHandler(bg_DoWork);

        //Connect to MySQL if we haven't already
        if (conn.State != System.Data.ConnectionState.Open)
            conn.Open();
        //Don't process the queue if not connected to IB
        if (!client.Connected)
            queue.stop = true;

        //Set the MySQL connection for the queue
        queue.conn = conn;

        //Get the next value of the queue index
        using (MySqlCommand cmd = conn.CreateCommand())
        {
            cmd.CommandText = "select coalesce(max(idticks),0) from ticks";
            MySqlDataReader Reader;

            Reader = cmd.ExecuteReader();
            Reader.Read();
            tickIndex = Reader.GetInt32(0) + 1;
            Reader.Close();
        }
    }

    //Method for getting market prices
    public void Run()
    {
        if (client.Connected)
        {
            //Set up the event handlers for the ticks
            client.TickPrice += new EventHandler<TickPriceEventArgs>(client_TickPrice);
            client.TickSize += new EventHandler<TickSizeEventArgs>(client_TickSize);
               
            //Initialize a counter for stock symbols
            int i = 1;
               
            //Start the queue
            bg.RunWorkerAsync();

            //Request market data for each stock in the stockList
            foreach (String str in stockList)
            {
                tickId.Add(i, str);
                client.RequestMarketData(i, new Equity(str), null, false, false);
                i++;
            }

            //Hang out until told otherwise
            while (doGet)
            {
                System.Threading.Thread.Sleep(100);
            }

            //Remove event handlers
            Console.WriteLine("Shutting Down TickMain");
            client.TickPrice -= new EventHandler<TickPriceEventArgs>(client_TickPrice);
            client.TickSize -= new EventHandler<TickSizeEventArgs>(client_TickSize);
            queue.stop = true;
        }
    }

    //Event handler for TickSize events
    void client_TickSize(object sender, TickSizeEventArgs e)
    {
        //Get the symbol from the dictionary
        String symbol = tickId[e.TickerId];
        int i = 0;
           
        //As this is asynchronous, lock and get the current tick index
        lock (lockObj)
        {
            i = tickIndex;
            tickIndex++;
        }

        //Create a tick object and enqueue it
        Tick tick = new Tick(EnumDescConverter.GetEnumDescription(e.TickType),
            e.Size, symbol, i);
        queue.Add(tick);
    }

    //Event Handler for TickPrice events
    void client_TickPrice(object sender, TickPriceEventArgs e)
    {
        //Get the symbol from the dictionary
        String symbol = tickId[e.TickerId];
        int i = 0;

        //As this is asynchronous, lock and get the current tick index
        lock (lockObj)
        {
            i = tickIndex;
            tickIndex++;
        }

        //Create a tick object and enqueue it
        Tick tick = new Tick(EnumDescConverter.GetEnumDescription(e.TickType),
            e.Price, symbol, i);
        queue.Add(tick);
    }

    //BackgroundWorker delegate to run the queue.
    private void bg_DoWork(object sender, DoWorkEventArgs e)
    {
        queue.Run();
    }

}
Stock requests are given a unique ID.  The tick events carry this ID, not the symbol, so we keep track of the ID and symbol pairs in a Dictionary.  You will note that we have a BackgroundWorker in here that calls and runs the database write queue.  The tick event handlers process each tick, assign it a unique index ID (another source of possible thread contention, hence the lock() around the index creation), and add it to the queue.
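For a counter as simple as the tick index, Interlocked.Increment is a possible alternative to a lock().  The sketch below is an assumption about how one might do it, not the code used in this project; IdGenerator and IdDemo are hypothetical names.

```csharp
using System;
using System.Collections.Concurrent;
using System.Threading;
using System.Threading.Tasks;

public class IdGenerator
{
    private int last;

    // The first ID handed out will be 'first'.
    public IdGenerator(int first)
    {
        last = first - 1;
    }

    // Atomically reserves and returns the next ID.
    public int Next()
    {
        return Interlocked.Increment(ref last);
    }
}

public static class IdDemo
{
    // Hands out 1000 IDs from several threads and
    // reports how many distinct values were seen.
    public static int CountDistinct()
    {
        IdGenerator ids = new IdGenerator(1);
        ConcurrentDictionary<int, bool> seen = new ConcurrentDictionary<int, bool>();
        Parallel.For(0, 1000, i => { seen.TryAdd(ids.Next(), true); });
        return seen.Count;
    }
}
```

IdDemo.CountDistinct() returns 1000, since no ID is ever handed out twice.  The lock() version in the post is just as correct; the atomic increment only saves taking a lock for a single integer.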

Finally, the Program class in the console application:
class Program
{
    static MySqlConnection conn =
        new MySqlConnection("server=LOCALHOST;DATABASE=tick_db;USER=dom;PASSWORD=dom123");
    static TickMain main = null;

    static void Main(string[] args)
    {
        //Open the connections.
        conn.Open();
        IBClient client = new IBClient();
        client.Connect("localhost", 7496, 2);

        //List of stock ticks to get
        List<String> stockList = new List<string>();
        stockList.Add("GOOG");
        stockList.Add("SPY");
        stockList.Add("SH");
        stockList.Add("DIA");

        //Initialize the TickMain object
        main = new TickMain(client, stockList, conn);
        main.doGet = true;

        //Set up a worker to call main.Run() asynchronously.
        BackgroundWorker bg = new BackgroundWorker();
        bg.DoWork += new DoWorkEventHandler(bg_DoWork);
        bg.RunWorkerAsync();
           
        //Chill until the user hits enter then stop the TickMain object
        Console.ReadLine();
        main.doGet = false;

        //disconnect
        client.Disconnect();
           
        Console.WriteLine("Hit Enter to Continue");
        Console.ReadLine();
    }

    //Delegate for main.Run()
    static void bg_DoWork(object sender, DoWorkEventArgs e)
    {
        main.Run();
    }
  
}
The application will run, scrolling updates on how many records are being written to the database until you hit the enter key.  After that, the system shuts down and you are prompted to hit enter once more to exit the application.

The entire VS2010 project and SQL for the table can be found here.

My plan is to set this up to run all next week with live data, pull it into R next weekend, and see what we can see.

11 comments:

  1. IF you put this into R next week,
    Why did you label this R now?
    :(

  2. I would love for an R user to prove me wrong and show me how I can do this in R.

  3. Bugs in the code above prevent fractional seconds from being saved to the database. See the next post for updated code.

  4. An old thread but I saw this and would suggest saving data to a memory based database (like memcache) and then using a separate thread to write to sqlserver. That way you avoid latency issues when sqlserver slows down due to disk access, and there are lots of ticks coming through

    Replies
    1. If you are actively running a trading program against the values as they are coming in and are reading them from the database, then I agree.

      If you are just trying to capture the data, then the code here should be sufficient. The database writes are performed in a separate thread. Incoming ticks are put into a queue and dumped to the DB every so often. This is basically what you advocate, just in a form that cannot be queried from another process.

      If I were building a trading system, I would have the tick handler pushing the data to a messaging system as well as the queue process as written here. That would allow me to have trading algos running independent of the data capture, possibly on separate machines.

  5. Hi Dom, Thanks! for your post. I was wondering if there is a file i can go to (I don't know much c# programming) to customize the fields that come into mysql table- for example, to not receive some of these fields, or to get any additional fields that might be available for example, bid exchange, ask exchange, last exchange etc. Thanks!

  6. New API users will want to read:
    http://www.interactivebrokers.com/en/software/tws/usersguidebook/configuretws/api_settings.htm
    and note that first setting:
    Enable Active X and Socket Clients - Check to enable integration with TWS using ActiveX or socket clients including Java and C++. Download sample programs from the Software page on the IB website.
