Any C++ programmers willing to help modify RTDMan?

josh1

Well-Known Member
#1
Most members here know RTDMan used to feed RT data from NOW/Nest to AmiBroker.

Original application builds 1 sec. bars.
I have modified it to create 1 min. bars. However there are small glitches which I am not able to understand.

It will be great if some C++ programmer helps to remove the bug as well as help to make it faster.
 
#2
member called kelvinhand seems to have good knowledge of programming in c++ and java.another one can be Happy_Singh or tradebullet or something but idk.
i say paste here code and i am sure some member will arise .

i am good at doing little bit reverse engineering :( sadly.
 

josh1

Well-Known Member
#3
member called kelvinhand seems to have good knowledge of programming in c++ and java.another one can be Happy_Singh or tradebullet or something but idk.
i say paste here code and i am sure some member will arise .

i am good at doing little bit reverse engineering :( sadly.
Thanks test123. kelvinhand and Mastermind know C++. tracerbullet seems busy. Anyway I shall post the code with comments tonight. The code is working but there is a stupid bug. It was quite an effort for me to learn C++
let's see.
 
Last edited:

Blackhole

Well-Known Member
#4
member called kelvinhand seems to have good knowledge of programming in c++ and java.another one can be Happy_Singh or tradebullet or something but idk.
i say paste here code and i am sure some member will arise .

i am good at doing little bit reverse engineering :( sadly.
i believe mastermind007 can help u!
 

copypasteaee

Humbled by Markets
#5
Thanks test123. kelvinhand and Mastermind know C++. tracerbullet seems busy. Anyway I shall post the code with comments tonight. The code is working but there is a stupid bug. It was quite an effort for me to learn C++
let's see.
post the code, will review it. i will try my best to resolve the issue.
 

josh1

Well-Known Member
#6
post the code, will review it. i will try my best to resolve the issue.
Well, Original source code for RTDMan_0.1.4_2 is available from the link in my signature. It can be compiled in VC++ express 2010 or above. ATL MFC is also required which I can provide or it can be downloaded from Microsoft.

Tracerbullet has done the job of polling real time data from Now/Nest beautifully. There is no problem there. Our main code lies in two files. Worker.h and worker.cpp which process the data and send to Amibroker. I have made some additions in worker.h and made lot of changes in worker.cpp.

This is how it worked. (in words of Tracerbullet)
Tracerbullet; said:
I have two threads. Thread A maintains O/H/L/C of each scrip.
Thread B takes current bar data, sends it to AB and resets O/H/L/C for next bar
Two threads are used so that AB calls dont block getting data from NEST.

Problem - If we use hh::mm and send data to AB every second, Thread B will have to do O/H/L/C reset only after LTT changes ( ie after minute changes ) to maintain correct O/H/L/C for the minute. Previously we did it on every AB call.

This change will cause us to loose some data after minute change
why? - say Thread B sends data at 10:00:58 and then 10:00:59. Because minute is still 00 it wont reset O/H/L/C.

Then we get 3 ticks from Nest, 1 of which is say at 10:01:01. All three ticks are read and set to O/H/L/C by Thread A with LTT at 10:01. When Thread B reads this data, it sees that LTT changed, so it will send the data till 10:00:59 + the 3 ticks to AB and then reset O/H/L/C for next minute.

So the first candle update for 10:01 will actually be duplicate of 10:00 + 3 ticks. Next update in 10:01 will overwrite this first candle update.

Solution will need invasive changes that i dont want to do now. Maybe in a few months

1. Option 1 is for thread A to call thread B synchronously whenever LTT changes to next minute. This will still be complicated as we have more than 1 scrips only some of which may have LTT changed to next minute.
Anyway this will block thread A which i dont want

2. Option 2 is for thread A to not maintain O/H/L/C. Instead it collects all ticks in an array for each scrip and thread B constructs O/H/L/C and differentates between 10:00 and 10:01
This is cleaner. It will probably need more cpu time but thats ok.

I hope you understood. You can look at code and suggest simpler ways.
Alternative - Create an AB scrip that compresses its database to 1min (if possible)
 

josh1

Well-Known Member
#7
In my first attempt, I made changes to the way in which thread A (worker::processRTDdata) was maintaining OHLC of scrips. Entire processing is done in that thread and thread B (AmibrokerPoller) accesses the bars every second and pushes them to Amibroker.

The new code is here-
New header file-
Code:
#ifndef RTDMAN_WORKER_H
#define RTDMAN_WORKER_H

#include "rtd_client.h"
#include "amibroker_feed.h"
#include "settings.h"

#include <string>
#include <vector>
#include <map>
#include <utility>
#include <fstream> 

#include <forward_list>

class Worker{

public:
    Worker();
    ~Worker();

    void connect();
    void poll();    
    void stop();                                                            // Stop Thread and quit    

private:
    struct ScripBar; struct ScripState;  struct RTData;                                   // Forward Declare

    RTDClient                           *rtd_client;
    Amibroker                           *amibroker;
    Settings                             settings;    

    std::map< long, std::pair<int,int>>  topic_id_to_scrip_field_map;       // topic_id  :  scripd_id,field_id
    ScripState                          *current, *previous;                // Maintains Current and last state of each Scrip
    std::string                          today_date;                            // (in same order as Settings::Scrip::topic_name)
    std::ofstream                        csv_file_out;    

	std::forward_list<RTData>			 array1, array2;

    CRITICAL_SECTION                     lock;                              // Thread Data sync    
    HANDLE                               Event_RTD_Update;                  // Signaled by RTD Callback
    HANDLE                               AB_timer;                          // Timer for AB poller    
    HANDLE                               Event_StopNow;                     // This will be used to stop AB thread 
    HANDLE                               Event_Stopped;                     // This will be fired after thread is done    
    

    void        loadSettings    ();    
    void        processRTDData  ( const std::map<long,CComVariant>* data );
    static void threadEntryDummy( void* _this);                             // Entry Point for Amibroker Feeder thread
    void        amibrokerPoller ();                                             // Fetches Bar data and feeds to Amibroker
    void        writeCsv        ( const std::vector<ScripBar> & bars  );        // This thread uses members - current , previous, settings
    Worker( const Worker& );                                                // Disable copy
    Worker operator=(const Worker& );

    
    // Used to create and resolve Topic ids
    enum SCRIP_FIELDS{                                                      // -- Topic 2 --        
        LTP=0,                                                              // "LTP"
        LTT=1,                                                              // "LTT"        
        VOLUME_TODAY=2,                                                     // "Volume Traded Today"
        OI=3,                                                               // "Open Interest"        
        FIELD_COUNT=4                                                       // No of Fields used
    };
    struct ScripState {                                                        
        double       ltp;                                                    
        std::string  ltt;                                                   // ltt can be empty for index scrips
//        std::string  last_bar_ltt;                                          // Removed by Josh1
        long long    vol_today;
        long long    oi;
		long long    volume;												//Inserted by Josh1
                
        double       bar_high;
        double       bar_low;
        double       bar_open;        

        ScripState();
        void  reset();
        bool  operator==(const ScripState& right) const ;
    };
    struct ScripBar{                                                        // Bar data to Amibroker
        std::string  ticker;
        std::string  ltt;

        double       bar_open;    
        double       bar_high;
        double       bar_low;
        double       bar_close;
        long long    volume;
        long long    oi;
    };

	
	//RTData Inserted by Josh1
    struct RTData {   
		int			 sid;
		std::string  ticker;
        double       ltp;                                                    
        std::string  ltt;                                                   // ltt can be empty for index scrips
        long long    vol_today;
        long long    oi;
		long long    volume;												//Inserted by Josh1

		RTData();
		void  reset();
    };

};


#endif
New worker.cpp
Code:
#include "worker.h"
#include "misc_util.h"
#include "amibroker_feed.h"

#include <windows.h> 
#include <process.h>

#include <iostream>
#include <sstream>
#include <limits>

/**
 * Read Scrips and setup DS. Start Timers and start AB thread
 */
Worker::Worker(){
                                                                           // _T()  - character set Neutral
    Event_RTD_Update = CreateEvent( NULL, false, FALSE, _T("RTD_UPDATE") );// Manual Reset = false - Event resets to nonsignaled on 1 wait release
    Event_StopNow    = CreateEvent( NULL, true,  FALSE, NULL );                // Initialize state to FALSE. Read data only after callback
    Event_Stopped    = CreateEvent( NULL, true,  FALSE, NULL );
    AB_timer         = CreateWaitableTimer( NULL, false,NULL );

    today_date       = MiscUtil::getTime("%Y%m%d");                        // Get todays date - yyyymmdd

    settings.loadSettings();

    rtd_client = new RTDClient( settings.rtd_server_prog_id  );
    current    = new ScripState[ settings.no_of_scrips ] ;
    previous   = new ScripState[ settings.no_of_scrips ] ;
                                                                                    
    for( int i=0 ; i<settings.no_of_scrips ; i++ ){                        // Make map key topic_id and value = scripd_id,field_id 
        for( int j=0 ; j<FIELD_COUNT ; j++ ){                              // topic_id generated using FIELD_COUNT as base multiplier for each scrip
            topic_id_to_scrip_field_map[ i*FIELD_COUNT + j  ]  =  std::make_pair( i,j );
        }
    }    

    LARGE_INTEGER start_now = {0};                                         // Start Timers immediately    
 //   SetWaitableTimer( AB_timer , &start_now, settings.bar_period, NULL, NULL, false );
    SetWaitableTimer( AB_timer , &start_now, 1000, NULL, NULL, false );	// Bar period hardcoded by Josh1

    InitializeCriticalSection( &lock );
    _beginthread( threadEntryDummy, 0, this );                             // Start Amibroker Poller Thread
}


/**
 * Cleanup
 */
Worker::~Worker(){    
    CancelWaitableTimer( AB_timer ) ;
        
    CloseHandle(AB_timer) ;    
    CloseHandle(Event_RTD_Update) ;    
    CloseHandle(Event_StopNow) ;
    CloseHandle(Event_Stopped) ;

    if( csv_file_out.is_open() ){                                          // Close file if not done - just in case
        csv_file_out.close();
    }    

    delete [] current;     current    = 0;
    delete [] previous;    previous   = 0;
    delete rtd_client;     rtd_client = 0;
}

/**
 * Signal Thread to stop and wait for it - Wait Maximum 3 seconds
 **/
void Worker::stop(){    

    if( SetEvent(Event_StopNow)   ){        
        WaitForSingleObject( Event_Stopped, 3*1000 );        
    }    
}

Worker::ScripState::ScripState() : 
    ltp(0), vol_today(0), oi(0), bar_high(0), bar_low(std::numeric_limits<double>::infinity()), bar_open(0)    
{}

void Worker::ScripState::reset(){
    ltp = 0; vol_today = 0; oi =0; bar_high = 0; bar_low = std::numeric_limits<double>::infinity(); bar_open = 0; last_bar_ltt="";
}

bool Worker::ScripState::operator==(const ScripState& right) const{
    return (ltp == right.ltp)  && (vol_today == right.vol_today) &&
           (oi  == right.oi )  && (bar_high  == right.bar_high)  &&
           (ltt == right.ltt)  && (bar_low   == right.bar_low ); 
}



/** 
 * Connect Topics
 */
void Worker::connect(){
    
    rtd_client->startServer();

    for( int i=0 ; i<settings.no_of_scrips ; i++ ){        
        
        std::cout <<  settings.scrips_array[i].ticker  << std::endl ;

        long        topic_id = i * FIELD_COUNT;
        std::string topic_1  = settings.scrips_array[i].topic_name;
                        
        rtd_client->connectTopic(topic_id+LTP,          topic_1, settings.scrips_array[i].topic_LTP       );
        rtd_client->connectTopic(topic_id+LTT,          topic_1, settings.scrips_array[i].topic_LTT       );
        rtd_client->connectTopic(topic_id+VOLUME_TODAY, topic_1, settings.scrips_array[i].topic_vol_today );
        rtd_client->connectTopic(topic_id+OI,           topic_1, settings.scrips_array[i].topic_OI        );
    }     
}

    
/**
 * Wait for RTD Update event. On event, read new data and setup Current Bars
 */
void Worker::poll(){
        
    while(1){    
        if( WaitForSingleObject( Event_RTD_Update, INFINITE ) ==  WAIT_OBJECT_0 ){                    

            std::map<long,CComVariant>*  data = rtd_client->readNewData() ;
            if( data != 0 && !data->empty() ){
                processRTDData( data );                
            }            
            delete data;            
        }
    }
}

/**
 * Read TopicId-Value data from COM and update Current Bar
 **/
void Worker::processRTDData( const std::map<long,CComVariant>* data ){

	int script_id , prev_script_id = 999999
                
    for( auto i=data->begin(), end=data->end() ;  i!=end ; ++i  ){

        const long   topic_id     = i->first;
        CComVariant  topic_value  = i->second;
                
        std::pair<int,int> &ids = topic_id_to_scrip_field_map.at( topic_id ) ; 
        int script_id   =  ids.first;                                      // Resolve Topic id to Scrip id and field
        int field_id    =  ids.second;

		EnterCriticalSection( &lock );                                     // Lock when accessing current[] / previous[]

//		Changes made by Josh1 
//****************************************************************************

        ScripState *_current = & current[script_id];

		switch( field_id ){                        
            case LTP :{
                double      ltp      = MiscUtil::getDouble( topic_value );
                _current->ltp = ltp;
                break ;    
            }
            case VOLUME_TODAY :{  
                long long vol_today          = MiscUtil::getLong  ( topic_value );
                _current->vol_today = vol_today;
                 break ;
            }
            case LTT  : {
				_current->ltt  = MiscUtil::getString( topic_value );  
				//Convert time from "%H:%M:%S" to "HHmm" format
				_current->ltt.erase(2,1);											// Remove colons in the time string
					// If Bar Period is 60000 then "HHmm" format, remove all characters after 4th
				settings.bar_period == 60000 ? _current->ltt.erase(4) : _current->ltt.erase(4,1);	// Remove Colon and Seconds from time string
				break ;
			}

            case OI   :  _current->oi   = MiscUtil::getLong  ( topic_value ); break ;
        }	
		//Data from COM server comes in pairs of Topic values  
		//I assume that all pairs for one scrip come together
		//Therefore, creation of current bar with info from data is complete here


		if (_current->vol_today == 0){									//Index does not have LTT Topic but Vol_today is always zero
					_current->ltt = MiscUtil::getTime( "%H:%M:%S" );	//Hence set ltt for Index 
					//Convert time to "HHmmss" format
					_current->ltt.erase(2,1);							// Remove colons in the time string
					// If Bar Period is 60000 then "HHmm" format, all characters after 4th
					settings.bar_period == 60000 ? _current->ltt.erase(4) : _current->ltt.erase(4,1);	
				}

        if (current[script_id].ltt  != previous[script_id].ltt) {			// For startup or subsequent period, ltt != previous ltt
                _current->bar_high = _current->ltp;							// initialise O,H,L with LTP
                _current->bar_low  = _current->ltp;
                _current->bar_open = _current->ltp;
//Last change
				if(previous[script_id].vol_today = 0)  previous[script_id].vol_today = _current->vol_today  ;
//Last change end
				_current->volume = _current->vol_today - previous[script_id].vol_today;
				previous[script_id].ltt = current[script_id].ltt;
//Last change
				previous[script_id].vol_today = _current->vol_today;
//Last change end
		}
		else {
                if( _current->bar_high < _current->ltp )    _current->bar_high = _current->ltp;
                if( _current->bar_low  >_current->ltp )    _current->bar_low  = _current->ltp;
				_current->volume = _current->vol_today - previous[script_id].vol_today;
		}

		LeaveCriticalSection( &lock ) ;
/*
		std::cout << script_id << " - " <<
					current[script_id].ltt <<" - " << 
					current[script_id].ltp <<" - "<<
					current[script_id].vol_today <<" - "<<
					current[script_id].oi <<" - "<<
					previous[script_id].ltt << std::endl;
*/
//****************************************************************************
//		End Changes made by Josh1 

	}    

}


/**
 *    New Thread Entry Point
 **/
void Worker::threadEntryDummy(void* _this){
    ((Worker*)_this)->amibrokerPoller();
}

void Worker::amibrokerPoller(){

    std::vector<ScripBar>  new_bars;
    amibroker = new Amibroker( settings.ab_db_path, settings.csv_path, std::string("rtd.format") );
                                                                           // amibroker constructor has to be called in new thread 
    while(1){    
        // Use events and timers instead of sleep which would be blocking 
        // Need to exit thread cleanly and immediately on application quit - sleep would block

        HANDLE   events[]    = {Event_StopNow,AB_timer};                   // (A) Wait For Timer Event / Application Quit Event
        DWORD    return_code = WaitForMultipleObjects( 2, events , false, INFINITE );                
                                                                            
        if( return_code == WAIT_OBJECT_0 ){                                // Quit Event
            delete amibroker; amibroker = 0;
            SetEvent(Event_Stopped) ;
            std::cout << "AB Feeder Thread Stopped" << std::endl;
            return;
        }
        else if( return_code != WAIT_OBJECT_0 + 1 ){                       // If not Timer Event, then we have some error
            std::stringstream msg;  msg << "WaitForSingleObject Failed - " << return_code;
            throw( msg.str() );                
        }        

    // Shared data access start
        EnterCriticalSection( &lock );

        for( int i=0 ; i<settings.no_of_scrips ; i++  ){                   // (B) Setup Bar data for each updated scrip using current and previous

            ScripState *_current  =  &current[i];
            ScripState *_prev     =  &previous[i];
            long long bar_volume  =  _current->volume;
                                                                           // If data not changed, skip
            if( (_current->bar_open == 0)                   ||             // 1. No New data from readNewData()     
                (bar_volume==0 && _current->vol_today!=0)   ||             // 2. Also skip if bar volume 0 but allow 0 volume scrips like forex
                ((*_current) == (*_prev))                                  // 3. We got new data from readNewData() but its duplicate
              )    continue;                                               //    NEST RTD sends all fields even if unconnected field (ex B/A) changes    
                                    
            std::string bar_ltt;                                           // Use ltt if present else use current time  
			bar_ltt = _current->ltt;

			
/********************************************************************************************************************************************************
//		Following code inserted by Josh1 for removing ticks that are away from current time by three minutes or more.
/********************************************************************************************************************************************************			
			 std::string Current_time = MiscUtil::getTime( settings.time_format.c_str() ); // get current time and
			 Current_time.erase(2,1);														// remove colon from time
			 Current_time.erase(4,1);
//			 Current_time.erase(4);											// Remove Colon and Seconds from Current time
			 //convert bar_ltt and current time to integers and find time difference
			 int i_Current_time = std::stoi (Current_time,nullptr,0);
			 int i_ltt = std::stoi (bar_ltt,nullptr,0);
			 int time_diff = i_ltt - i_Current_time;

			 if(time_diff > 300) {						// if time difference is greater than 3 minutes, skip to next scrip.
				 _current->reset();
				 continue;								// This is to remove Previous days's quotes of 15:29 captured in the morning.
				}										// also to remove stray quotes received if any. 

//**************End changes by Josh1**********************************************************************************************************************/


			 
			 new_bars.push_back( ScripBar() );
            ScripBar* bar = &new_bars.back();

            bar->ltt                = _current->ltt;
             _current->last_bar_ltt = bar_ltt;

//Last change
//            _prev->vol_today !=0    ? bar->volume = _current->volume    : bar->volume = 0;// Ignore First bar volume as prev bar is not set.
			 bar->volume = _current->volume;
//Last change end
														
            bar->ticker     = settings.scrips_array[i].ticker;             // Otherwise, we get today's volume = First Bar volume
            bar->bar_open   = _current->bar_open;                                
            bar->bar_high   = _current->bar_high;
            bar->bar_low    = _current->bar_low;
            bar->bar_close  = _current->ltp;            
            bar->oi         = _current->oi;


//			Inserted by Josh1
//            if(  bar_ltt != _prev->ltt  ){                        // If time stamp of current bar is not same as previous bar then
//               _prev->vol_today = _current->vol_today;             // On startup prev vol is 0, Set it so that we can get first bar volume
//			   _prev->ltt = bar_ltt;
//			}

/*
		std::cout << bar->ticker << " - " <<
					bar->ltt <<" - " << 
					bar->bar_close <<" - "<<
					bar->bar_open <<" - "<<
					bar->bar_high <<" - "<<
					bar->bar_low  <<" - "<<
					settings.bar_period <<" - "<<
					a << std::endl;
*/			
        }

		LeaveCriticalSection( &lock );
    // Shared data access end

		if( !new_bars.empty() ){                                           // (C) Write to csv    and Send to Amibroker
            writeCsv( new_bars );
            amibroker->import();            
        }
        new_bars.clear();
    }
}
 

void Worker::writeCsv( const std::vector<ScripBar> & bars ){
    
    csv_file_out.open( settings.csv_path  );                               // Setup output stream to csv file
    if( !csv_file_out.is_open() ){                                         // Reopening will also clear old content by default
        throw( "Error opening file - " + settings.csv_path );        
    }

    size_t          size = bars.size();
    const ScripBar *bar;

    for( size_t i=0 ; i<size ; i++ ){                                      // $FORMAT Ticker, Date_YMD, Time, Open, High, Low, Close, Volume, OpenInt
        bar = &bars[i];
        csv_file_out << bar->ticker     << ',' 
                     << today_date      << ',' 
                     << bar->ltt        << ',' 
                     << bar->bar_open   << ',' 
                     << bar->bar_high   << ',' 
                     << bar->bar_low    << ',' 
                     << bar->bar_close  << ',' 
                     << bar->volume     << ',' 
                     << bar->oi         << std::endl ;
    }

    csv_file_out.close();
}
This code is working well. I have hardcoded the refresh period as 1 second.
It creates 1sec. and 1 min. bars optionally as per bar period in settings.ini set as 1000 or 60000.

I tallied the results with Hourly statistics data from NOW. It is tallies to a large extent.
However, it is missing some ticks. (In every 1 crore nifty futures, 6000 to 20000 shares are missed). This happens because thread A is maintaining OHLC from data received in array. It creates one current and one previous bar for each scrip. If it receives 1 tic of one period and 1 tick of next period in the same array, it creates new current bar and data for first tick is lost as it is pushed to previous bar. This also creates slight 5, 10 paise deviation in OHLC and volume of bars.

Though this is acceptable in time frames of 3 min and above, I am not satisfied with it. Therefore I decided to go for option 2 of Tracerbullet.
 
Last edited:

josh1

Well-Known Member
#8
I created new structure RTDData to hold tic data and two forward_lists, array1 and array2 containing the structure.

Thread Process_RTDdata receives real time data from RTD server and puts it into array1 with push.front.

Thread Amibroker_Poller copies array1 into array2 -- clears array1 -- reverses array2 and thereafter processes data from array 2 for maintaining OHLC of current Bar. OHLC is maintained as per the same logic as my earlier code. The current bar is sent to Amibroker every second.

This code is also working properly. Though I have not tallied it with Hourly statistics data yet.
However, there is one stupid bug in the code. It is not creating bars for the last scrip given in settings.ini. This problem can be solved easily by including one extra scrip in settings.ini. However, that is bad work around. I need immediate help to solve this.

Here is the code (worker.cpp) for second option.
Code:
/**
  Copyright (C) 2014  SpiffSpaceman

  This program is free software: you can redistribute it and/or modify
  it under the terms of the GNU General Public License as published by
  the Free Software Foundation, either version 3 of the License, or
  any later version.

  This program is distributed in the hope that it will be useful,
  but WITHOUT ANY WARRANTY; without even the implied warranty of
  MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
  GNU General Public License for more details.

  You should have received a copy of the GNU General Public License
  along with this program.  If not, see <http://www.gnu.org/licenses/>
**/


#include "worker.h"
#include "misc_util.h"
#include "amibroker_feed.h"

#include <windows.h> 
#include <process.h>

#include <iostream>
#include <sstream>
#include <limits>


/**
 * Read Scrips and setup DS. Start Timers and start AB thread
 */
Worker::Worker(){
                                                                           // _T()  - character set Neutral
    Event_RTD_Update = CreateEvent( NULL, false, FALSE, _T("RTD_UPDATE") );// Manual Reset = false - Event resets to nonsignaled on 1 wait release
    Event_StopNow    = CreateEvent( NULL, true,  FALSE, NULL );                // Initialize state to FALSE. Read data only after callback
    Event_Stopped    = CreateEvent( NULL, true,  FALSE, NULL );
    AB_timer         = CreateWaitableTimer( NULL, false,NULL );

    today_date       = MiscUtil::getTime("%Y%m%d");                        // Get todays date - yyyymmdd

    settings.loadSettings();

    rtd_client = new RTDClient( settings.rtd_server_prog_id  );
    current    = new ScripState[ settings.no_of_scrips ] ;
    previous   = new ScripState[ settings.no_of_scrips ] ;
                                                                                    
    for( int i=0 ; i<settings.no_of_scrips ; i++ ){                        // Make map key topic_id and value = scripd_id,field_id 
        for( int j=0 ; j<FIELD_COUNT ; j++ ){                              // topic_id generated using FIELD_COUNT as base multiplier for each scrip
            topic_id_to_scrip_field_map[ i*FIELD_COUNT + j  ]  =  std::make_pair( i,j );
        }
    }    

    LARGE_INTEGER start_now = {0};                                         // Start Timers immediately    
 //   SetWaitableTimer( AB_timer , &start_now, settings.bar_period, NULL, NULL, false );
    SetWaitableTimer( AB_timer , &start_now, 1000, NULL, NULL, false );	// Bar period hardcoded by Josh1

    InitializeCriticalSection( &lock );
    _beginthread( threadEntryDummy, 0, this );                             // Start Amibroker Poller Thread
}


/**
 * Cleanup
 */
Worker::~Worker(){    
    CancelWaitableTimer( AB_timer ) ;
        
    CloseHandle(AB_timer) ;    
    CloseHandle(Event_RTD_Update) ;    
    CloseHandle(Event_StopNow) ;
    CloseHandle(Event_Stopped) ;

    if( csv_file_out.is_open() ){                                          // Close file if not done - just in case
        csv_file_out.close();
    }    

    delete [] current;     current    = 0;
    delete [] previous;    previous   = 0;
    delete rtd_client;     rtd_client = 0;
}

/**
 * Signal Thread to stop and wait for it - Wait Maximum 3 seconds
 **/
void Worker::stop(){    

    if( SetEvent(Event_StopNow)   ){        
        WaitForSingleObject( Event_Stopped, 3*1000 );        
    }    
}

Worker::ScripState::ScripState() : 
    ltp(0), vol_today(0), oi(0), bar_high(0), bar_low(std::numeric_limits<double>::infinity()), bar_open(0)    
{}

void Worker::ScripState::reset(){
    ltp = 0; vol_today = 0; oi =0; bar_high = 0; bar_low = std::numeric_limits<double>::infinity(); bar_open = 0; 
}

bool Worker::ScripState::operator==(const ScripState& right) const{
    return (ltp == right.ltp)  && (vol_today == right.vol_today) &&
           (oi  == right.oi )  && (bar_high  == right.bar_high)  &&
		   (ltt == right.ltt)  && (bar_low   == right.bar_low ) && (volume == right.volume); 
}

//Inserted by Josh1
Worker::RTData::RTData() : 
    ltp(0), vol_today(0), oi(0),volume(0), ltt()    
{}

void Worker::RTData::reset(){
    ltp = 0; vol_today = 0; oi =0; volume= 0; ltt.empty(); ticker.empty();
}


/** 
 * Connect Topics
 */
void Worker::connect(){
    
    rtd_client->startServer();

    for( int i=0 ; i<settings.no_of_scrips ; i++ ){        
        
        std::cout <<  settings.scrips_array[i].ticker  << std::endl ;

        long        topic_id = i * FIELD_COUNT;
        std::string topic_1  = settings.scrips_array[i].topic_name;
                        
        rtd_client->connectTopic(topic_id+LTP,          topic_1, settings.scrips_array[i].topic_LTP       );
        rtd_client->connectTopic(topic_id+LTT,          topic_1, settings.scrips_array[i].topic_LTT       );
        rtd_client->connectTopic(topic_id+VOLUME_TODAY, topic_1, settings.scrips_array[i].topic_vol_today );
        rtd_client->connectTopic(topic_id+OI,           topic_1, settings.scrips_array[i].topic_OI        );
    }     
}

    
/**
 * Wait for RTD Update event. On event, read new data and setup Current Bars
 */
void Worker::poll(){
        
    while(1){    
        if( WaitForSingleObject( Event_RTD_Update, INFINITE ) ==  WAIT_OBJECT_0 ){                    

            std::map<long,CComVariant>*  data = rtd_client->readNewData() ;
            if( data != 0 && !data->empty() ){
                processRTDData( data );                
            }            
            delete data;            
        }
    }
}

/**
 * Read TopicId-Value data from COM and update Current Bar
 **/
void Worker::processRTDData( const std::map<long,CComVariant>* data ){
	
	RTData mydata;												//inserted by Josh1
	int  prev_script_id = 999999;								// set prev_script_id as a large integer which can never be reached 

	for( auto i=data->begin(), end=data->end() ;  i!=end ; ++i  ){
		
        const long   topic_id     = i->first;
        CComVariant  topic_value  = i->second;
                
        std::pair<int,int> &ids = topic_id_to_scrip_field_map.at( topic_id ) ; 

		int script_id   =  ids.first;                                      // Resolve Topic id to Scrip id and field
        int field_id    =  ids.second;

//		Changes made by Josh1 - fill all fields of mydata from RTDdata.
//****************************************************************************

		if(mydata.sid != script_id && prev_script_id != 999999) {	//if data for different scrip and it is not 
			array1.push_front(mydata);									// first scrip then push it to array1
			mydata.reset();
		};

		mydata.ticker = settings.scrips_array[script_id].ticker;		// fill scrip name from settings
		mydata.sid = script_id;											// store script id also
		prev_script_id = script_id;										// and push to previous script id for comparing next time

		switch( field_id ){											//Start filling data in my data.
            case LTP :{
                double      ltp      = MiscUtil::getDouble( topic_value );
                mydata.ltp = ltp;
                break ;    
            }
            case VOLUME_TODAY :{  
                long long vol_today          = MiscUtil::getLong  ( topic_value );
                mydata.vol_today = vol_today;
                 break ;
            }
            case LTT  : {
				mydata.ltt  = MiscUtil::getString( topic_value );  
				break ;
			}

            case OI   :  mydata.oi   = MiscUtil::getLong  ( topic_value ); break ;
        }	
/*
//Uncomment these lines to view data received from RTD server
		std::cout << mydata.ticker << " - " <<
					script_id <<" - " << 
					current[script_id].ltt <<" - " << 
					current[script_id].ltp <<" - "<<
					current[script_id].vol_today <<" - "<<
					current[script_id].oi  << std::endl;
*/
/*
//Uncomment these lines to view data filled in mydata structure
		std::cout << mydata.ticker << " - " <<
					mydata.sid <<" - " << 
					mydata.ltt <<" - " << 
					mydata.ltp <<" - "<<
					mydata.vol_today <<" - "<<
					mydata.oi  << std::endl;
*/

//****************************************************************************
//		End Changes made by Josh1 

	}    

}


/**
 *    New Thread Entry Point
 **/
void Worker::threadEntryDummy(void* _this){
    ((Worker*)_this)->amibrokerPoller();
}

void Worker::amibrokerPoller(){

    std::vector<ScripBar>  new_bars;
    amibroker = new Amibroker( settings.ab_db_path, settings.csv_path, std::string("rtd.format") );
                                                                           // amibroker constructor has to be called in new thread 
    while(1){    
        // Use events and timers instead of sleep which would be blocking 
        // Need to exit thread cleanly and immediately on application quit - sleep would block

        HANDLE   events[]    = {Event_StopNow,AB_timer};                   // (A) Wait For Timer Event / Application Quit Event
        DWORD    return_code = WaitForMultipleObjects( 2, events , false, INFINITE );                
                                                                            
        if( return_code == WAIT_OBJECT_0 ){                                // Quit Event
            delete amibroker; amibroker = 0;
            SetEvent(Event_Stopped) ;
            std::cout << "AB Feeder Thread Stopped" << std::endl;
            return;
        }
        else if( return_code != WAIT_OBJECT_0 + 1 ){                       // If not Timer Event, then we have some error
            std::stringstream msg;  msg << "WaitForSingleObject Failed - " << return_code;
            throw( msg.str() );                
        }        

    // Shared data access start
        EnterCriticalSection( &lock );
			array2 = array1;
			array1.clear();
		LeaveCriticalSection( &lock );
    // Shared data access end

		array2.reverse();

	for( auto it = array2.begin(); it!= array2.end() ;  ++it  ){

/******************************************************************
// Uncomment these lines to view data filled in array2
			std::cout << it->ticker <<	" - " <<
					 it->ltt <<" - " << 
					 it->ltp <<" - "<<
					 it->vol_today <<" - "<<
					 it->oi  << " before "<<std::endl;
*/
		int s_id   = it->sid;
		
		ScripState *_current  =  &current[s_id];
        ScripState *_prev     =  &previous[s_id];

        _current->ltp		= it->ltp;							//Start creating bars                              
        _current->ltt		= it->ltt;;                         // ltt can be empty for index scrips
        _current->vol_today	= it->vol_today;
		_current->oi		= it->oi;

		
		if (_current->vol_today == 0 //&& current->ltt.empty()
			){									//Index does not have LTT Topic but Vol_today is always zero
			_current->ltt = MiscUtil::getTime( "%H:%M:%S" );	//Hence set ltt for Index 
		}
																//Convert time to "HHmmss" format
			_current->ltt.erase(2,1);							// Remove colons in the time string
																// If Bar Period is 60000 then "HHmm" format, remove 
  			 													//all characters after 4th to create 1min bars
			settings.bar_period == 60000 ?	_current->ltt.erase(4) : _current->ltt.erase(4,1);					

        if (_current->ltt  != _prev->ltt || _prev->ltt.empty()) {			// For startup or first tick of subsequent 
                _current->bar_high = _current->ltp;							// period, ltt != previous ltt 
                _current->bar_low  = _current->ltp;							// so O,H,L are equalto LTP
                _current->bar_open = _current->ltp;
				_current->volume = _current->vol_today - _prev->vol_today;	// calculate volume for bar
				_prev->ltt = _current->ltt;									// Push current bar time and volume 
				_prev->vol_today = _current->vol_today;						// to previous bar
//				_prev = _current;
		}																	// if ltt is same, we have to adjust
		else {																// High and Low of bar
                if( _current->bar_high < _current->ltp )    _current->bar_high = _current->ltp;
                if( _current->bar_low  >_current->ltp )    _current->bar_low  = _current->ltp;
				_current->volume = _current->vol_today - _prev->vol_today;	//calculate volume for bar
		}

/***************************************************************************
// Uncomment these lines to view data filled in current script
					std::cout <<
						it->ticker << " - " <<
					_current->ltt <<" - " << 
					_current->bar_open <<" - " <<
					_current->bar_high <<" - " <<
					_current->bar_low <<" - " <<
					_current->ltp << " - " <<
					_current->volume <<" - "<<
					_current->oi <<" - "<<
					 std::endl;
*/

			// If data not changed, skip
            if( (_current->bar_open == 0)                   ||             // 1. No New data from readNewData()     
                (_current->volume==0 && _current->vol_today!=0)   ||       // 2. Also skip if bar volume 0 but allow 0 volume scrips like forex
                ((*_current) == (*_prev))                                  // 3. We got new data from readNewData() but its duplicate
              )    continue;                                               //    NEST RTD sends all fields even if unconnected field (ex B/A) changes    
                                    
		
/********************************************************************************************************************************************************
//		Following code inserted by Josh1 for removing ticks that are away from current time by three minutes or more.
/*******************************************************************************************************************************************************
			 std::string Current_time = MiscUtil::getTime( settings.time_format.c_str() ); // get current time and
			 Current_time.erase(2,1);														// remove colon from time
			 Current_time.erase(4,1);
//			 Current_time.erase(4);											// Remove Colon and Seconds from Current time
			 //convert bar_ltt and current time to integers and find time difference
			 int i_Current_time = std::stoi (Current_time,nullptr,0);
			 int i_ltt = std::stoi (bar_ltt,nullptr,0);
			 int time_diff = i_ltt - i_Current_time;

			 if(time_diff > 300) {						// if time difference is greater than 3 minutes, skip to next scrip.
				 _current->reset();
				 continue;								// This is to remove Previous days's quotes of 15:29 captured in the morning.
				}										// also to remove stray quotes received if any. 
		}

*/				 
			 new_bars.push_back( ScripBar() );
            ScripBar* bar = &new_bars.back();

            bar->ltt        = _current->ltt;
			bar->volume		= _current->volume;
            bar->ticker     = it->ticker;             
            bar->bar_open   = _current->bar_open;                                
            bar->bar_high   = _current->bar_high;
            bar->bar_low    = _current->bar_low;
            bar->bar_close  = _current->ltp;            
            bar->oi         = _current->oi;
/************************************************************
// Uncomment these lines to view bars sent to Amibroker
		std::cout << bar->ticker << " - " <<
					bar->ltt <<" - " << 
					bar->bar_close <<" - "<<
					bar->bar_open <<" - "<<
					bar->bar_high <<" - "<<
					bar->bar_low  <<" - "<<
					bar->volume  <<" - "<<
					bar->oi <<" - "<< std::endl;
*/		
        }

		array2.clear();

		if( !new_bars.empty() ){                                           // (C) Write to csv    and Send to Amibroker
            writeCsv( new_bars );
            amibroker->import();            
        }
        new_bars.clear();

	}

}
 

void Worker::writeCsv( const std::vector<ScripBar> & bars ){
    
    csv_file_out.open( settings.csv_path  );                               // Setup output stream to csv file
    if( !csv_file_out.is_open() ){                                         // Reopening will also clear old content by default
        throw( "Error opening file - " + settings.csv_path );        
    }

    size_t          size = bars.size();
    const ScripBar *bar;

    for( size_t i=0 ; i<size ; i++ ){                                      // $FORMAT Ticker, Date_YMD, Time, Open, High, Low, Close, Volume, OpenInt
        bar = &bars[i];
        csv_file_out << bar->ticker     << ',' 
                     << today_date      << ',' 
                     << bar->ltt        << ',' 
                     << bar->bar_open   << ',' 
                     << bar->bar_high   << ',' 
                     << bar->bar_low    << ',' 
                     << bar->bar_close  << ',' 
                     << bar->volume     << ',' 
                     << bar->oi         << std::endl ;
    }

    csv_file_out.close();
}
There are some other issues which can be looked at later.
1. Whether list can be used instead of forward_list.
2. RTD server sometimes sends previous days' last tic in the morning. Tracerbullet's code ignores first tic of the day as a workaround. However, tic is missed in case of scrips for which RTD server sends current day's data.
3. compact the code.
 
Last edited:

mastermind007

Well-Known Member
#9
To be able to help you with this, I will need to see and understand the exact tick data that comes from the source you happen to have. Looking at source alone will not reveal the error
 

josh1

Well-Known Member
#10
To be able to help you with this, I will need to see and understand the exact tick data that comes from the source you happen to have. Looking at source alone will not reveal the error
Are you using NOW or Nest Trader as your trading terminal? If yes, it is easy. You can obtain the application free from my other thread. Here - http://www.traderji.com/intraday/97...est-trader-amibroker-fcharts.html#post1050832http://www.traderji.com/intraday/97637-real-time-data-now-nest-trader-amibroker-fcharts.html#post1050832

My logic is similar to yours in previous post.
 
Last edited:

Similar threads