/**
Copyright (C) 2014 SpiffSpaceman
This program is free software: you can redistribute it and/or modify
it under the terms of the GNU General Public License as published by
the Free Software Foundation, either version 3 of the License, or
any later version.
This program is distributed in the hope that it will be useful,
but WITHOUT ANY WARRANTY; without even the implied warranty of
MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
GNU General Public License for more details.
You should have received a copy of the GNU General Public License
along with this program. If not, see <http://www.gnu.org/licenses/>
**/
#include "worker.h"
#include "misc_util.h"
#include "amibroker_feed.h"
#include <windows.h>
#include <process.h>
#include <iostream>
#include <sstream>
#include <limits>
/**
* Read Scrips and setup DS. Start Timers and start AB thread
*/
Worker::Worker(){
// _T() - character set Neutral
Event_RTD_Update = CreateEvent( NULL, false, FALSE, _T("RTD_UPDATE") );// Manual Reset = false - Event resets to nonsignaled on 1 wait release
Event_StopNow = CreateEvent( NULL, true, FALSE, NULL ); // Initialize state to FALSE. Read data only after callback
Event_Stopped = CreateEvent( NULL, true, FALSE, NULL );
AB_timer = CreateWaitableTimer( NULL, false,NULL );
today_date = MiscUtil::getTime("%Y%m%d"); // Get todays date - yyyymmdd
settings.loadSettings();
rtd_client = new RTDClient( settings.rtd_server_prog_id );
current = new ScripState[ settings.no_of_scrips ] ;
previous = new ScripState[ settings.no_of_scrips ] ;
for( int i=0 ; i<settings.no_of_scrips ; i++ ){ // Make map key topic_id and value = scripd_id,field_id
for( int j=0 ; j<FIELD_COUNT ; j++ ){ // topic_id generated using FIELD_COUNT as base multiplier for each scrip
topic_id_to_scrip_field_map[ i*FIELD_COUNT + j ] = std::make_pair( i,j );
}
}
LARGE_INTEGER start_now = {0}; // Start Timers immediately
// SetWaitableTimer( AB_timer , &start_now, settings.bar_period, NULL, NULL, false );
SetWaitableTimer( AB_timer , &start_now, 1000, NULL, NULL, false ); // Bar period hardcoded by Josh1
InitializeCriticalSection( &lock );
_beginthread( threadEntryDummy, 0, this ); // Start Amibroker Poller Thread
}
/**
* Cleanup
*/
Worker::~Worker(){
CancelWaitableTimer( AB_timer ) ;
CloseHandle(AB_timer) ;
CloseHandle(Event_RTD_Update) ;
CloseHandle(Event_StopNow) ;
CloseHandle(Event_Stopped) ;
if( csv_file_out.is_open() ){ // Close file if not done - just in case
csv_file_out.close();
}
delete [] current; current = 0;
delete [] previous; previous = 0;
delete rtd_client; rtd_client = 0;
}
/**
* Signal Thread to stop and wait for it - Wait Maximum 3 seconds
**/
void Worker::stop(){
if( SetEvent(Event_StopNow) ){
WaitForSingleObject( Event_Stopped, 3*1000 );
}
}
Worker::ScripState::ScripState() :
ltp(0), vol_today(0), oi(0), bar_high(0), bar_low(std::numeric_limits<double>::infinity()), bar_open(0)
{}
void Worker::ScripState::reset(){
ltp = 0; vol_today = 0; oi =0; bar_high = 0; bar_low = std::numeric_limits<double>::infinity(); bar_open = 0;
}
bool Worker::ScripState::operator==(const ScripState& right) const{
return (ltp == right.ltp) && (vol_today == right.vol_today) &&
(oi == right.oi ) && (bar_high == right.bar_high) &&
(ltt == right.ltt) && (bar_low == right.bar_low ) && (volume == right.volume);
}
//Inserted by Josh1
Worker::RTData::RTData() :
ltp(0), vol_today(0), oi(0),volume(0), ltt()
{}
void Worker::RTData::reset(){
ltp = 0; vol_today = 0; oi =0; volume= 0; ltt.empty(); ticker.empty();
}
/**
* Connect Topics
*/
void Worker::connect(){
rtd_client->startServer();
for( int i=0 ; i<settings.no_of_scrips ; i++ ){
std::cout << settings.scrips_array[i].ticker << std::endl ;
long topic_id = i * FIELD_COUNT;
std::string topic_1 = settings.scrips_array[i].topic_name;
rtd_client->connectTopic(topic_id+LTP, topic_1, settings.scrips_array[i].topic_LTP );
rtd_client->connectTopic(topic_id+LTT, topic_1, settings.scrips_array[i].topic_LTT );
rtd_client->connectTopic(topic_id+VOLUME_TODAY, topic_1, settings.scrips_array[i].topic_vol_today );
rtd_client->connectTopic(topic_id+OI, topic_1, settings.scrips_array[i].topic_OI );
}
}
/**
* Wait for RTD Update event. On event, read new data and setup Current Bars
*/
void Worker::poll(){
while(1){
if( WaitForSingleObject( Event_RTD_Update, INFINITE ) == WAIT_OBJECT_0 ){
std::map<long,CComVariant>* data = rtd_client->readNewData() ;
if( data != 0 && !data->empty() ){
processRTDData( data );
}
delete data;
}
}
}
/**
* Read TopicId-Value data from COM and update Current Bar
**/
void Worker::processRTDData( const std::map<long,CComVariant>* data ){
RTData mydata; //inserted by Josh1
int prev_script_id = 999999; // set prev_script_id as a large integer which can never be reached
for( auto i=data->begin(), end=data->end() ; i!=end ; ++i ){
const long topic_id = i->first;
CComVariant topic_value = i->second;
std::pair<int,int> &ids = topic_id_to_scrip_field_map.at( topic_id ) ;
int script_id = ids.first; // Resolve Topic id to Scrip id and field
int field_id = ids.second;
// Changes made by Josh1 - fill all fields of mydata from RTDdata.
//****************************************************************************
if(mydata.sid != script_id && prev_script_id != 999999) { //if data for different scrip and it is not
array1.push_front(mydata); // first scrip then push it to array1
mydata.reset();
};
mydata.ticker = settings.scrips_array[script_id].ticker; // fill scrip name from settings
mydata.sid = script_id; // store script id also
prev_script_id = script_id; // and push to previous script id for comparing next time
switch( field_id ){ //Start filling data in my data.
case LTP :{
double ltp = MiscUtil::getDouble( topic_value );
mydata.ltp = ltp;
break ;
}
case VOLUME_TODAY :{
long long vol_today = MiscUtil::getLong ( topic_value );
mydata.vol_today = vol_today;
break ;
}
case LTT : {
mydata.ltt = MiscUtil::getString( topic_value );
break ;
}
case OI : mydata.oi = MiscUtil::getLong ( topic_value ); break ;
}
/*
//Uncomment these lines to view data received from RTD server
std::cout << mydata.ticker << " - " <<
script_id <<" - " <<
current[script_id].ltt <<" - " <<
current[script_id].ltp <<" - "<<
current[script_id].vol_today <<" - "<<
current[script_id].oi << std::endl;
*/
/*
//Uncomment these lines to view data filled in mydata structure
std::cout << mydata.ticker << " - " <<
mydata.sid <<" - " <<
mydata.ltt <<" - " <<
mydata.ltp <<" - "<<
mydata.vol_today <<" - "<<
mydata.oi << std::endl;
*/
//****************************************************************************
// End Changes made by Josh1
}
}
/**
* New Thread Entry Point
**/
void Worker::threadEntryDummy(void* _this){
((Worker*)_this)->amibrokerPoller();
}
void Worker::amibrokerPoller(){
std::vector<ScripBar> new_bars;
amibroker = new Amibroker( settings.ab_db_path, settings.csv_path, std::string("rtd.format") );
// amibroker constructor has to be called in new thread
while(1){
// Use events and timers instead of sleep which would be blocking
// Need to exit thread cleanly and immediately on application quit - sleep would block
HANDLE events[] = {Event_StopNow,AB_timer}; // (A) Wait For Timer Event / Application Quit Event
DWORD return_code = WaitForMultipleObjects( 2, events , false, INFINITE );
if( return_code == WAIT_OBJECT_0 ){ // Quit Event
delete amibroker; amibroker = 0;
SetEvent(Event_Stopped) ;
std::cout << "AB Feeder Thread Stopped" << std::endl;
return;
}
else if( return_code != WAIT_OBJECT_0 + 1 ){ // If not Timer Event, then we have some error
std::stringstream msg; msg << "WaitForSingleObject Failed - " << return_code;
throw( msg.str() );
}
// Shared data access start
EnterCriticalSection( &lock );
array2 = array1;
array1.clear();
LeaveCriticalSection( &lock );
// Shared data access end
array2.reverse();
for( auto it = array2.begin(); it!= array2.end() ; ++it ){
/******************************************************************
// Uncomment these lines to view data filled in array2
std::cout << it->ticker << " - " <<
it->ltt <<" - " <<
it->ltp <<" - "<<
it->vol_today <<" - "<<
it->oi << " before "<<std::endl;
*/
int s_id = it->sid;
ScripState *_current = ¤t[s_id];
ScripState *_prev = &previous[s_id];
_current->ltp = it->ltp; //Start creating bars
_current->ltt = it->ltt;; // ltt can be empty for index scrips
_current->vol_today = it->vol_today;
_current->oi = it->oi;
if (_current->vol_today == 0 //&& current->ltt.empty()
){ //Index does not have LTT Topic but Vol_today is always zero
_current->ltt = MiscUtil::getTime( "%H:%M:%S" ); //Hence set ltt for Index
}
//Convert time to "HHmmss" format
_current->ltt.erase(2,1); // Remove colons in the time string
// If Bar Period is 60000 then "HHmm" format, remove
//all characters after 4th to create 1min bars
settings.bar_period == 60000 ? _current->ltt.erase(4) : _current->ltt.erase(4,1);
if (_current->ltt != _prev->ltt || _prev->ltt.empty()) { // For startup or first tick of subsequent
_current->bar_high = _current->ltp; // period, ltt != previous ltt
_current->bar_low = _current->ltp; // so O,H,L are equalto LTP
_current->bar_open = _current->ltp;
_current->volume = _current->vol_today - _prev->vol_today; // calculate volume for bar
_prev->ltt = _current->ltt; // Push current bar time and volume
_prev->vol_today = _current->vol_today; // to previous bar
// _prev = _current;
} // if ltt is same, we have to adjust
else { // High and Low of bar
if( _current->bar_high < _current->ltp ) _current->bar_high = _current->ltp;
if( _current->bar_low >_current->ltp ) _current->bar_low = _current->ltp;
_current->volume = _current->vol_today - _prev->vol_today; //calculate volume for bar
}
/***************************************************************************
// Uncomment these lines to view data filled in current script
std::cout <<
it->ticker << " - " <<
_current->ltt <<" - " <<
_current->bar_open <<" - " <<
_current->bar_high <<" - " <<
_current->bar_low <<" - " <<
_current->ltp << " - " <<
_current->volume <<" - "<<
_current->oi <<" - "<<
std::endl;
*/
// If data not changed, skip
if( (_current->bar_open == 0) || // 1. No New data from readNewData()
(_current->volume==0 && _current->vol_today!=0) || // 2. Also skip if bar volume 0 but allow 0 volume scrips like forex
((*_current) == (*_prev)) // 3. We got new data from readNewData() but its duplicate
) continue; // NEST RTD sends all fields even if unconnected field (ex B/A) changes
/********************************************************************************************************************************************************
// Following code inserted by Josh1 for removing ticks that are away from current time by three minutes or more.
/*******************************************************************************************************************************************************
std::string Current_time = MiscUtil::getTime( settings.time_format.c_str() ); // get current time and
Current_time.erase(2,1); // remove colon from time
Current_time.erase(4,1);
// Current_time.erase(4); // Remove Colon and Seconds from Current time
//convert bar_ltt and current time to integers and find time difference
int i_Current_time = std::stoi (Current_time,nullptr,0);
int i_ltt = std::stoi (bar_ltt,nullptr,0);
int time_diff = i_ltt - i_Current_time;
if(time_diff > 300) { // if time difference is greater than 3 minutes, skip to next scrip.
_current->reset();
continue; // This is to remove Previous days's quotes of 15:29 captured in the morning.
} // also to remove stray quotes received if any.
}
*/
new_bars.push_back( ScripBar() );
ScripBar* bar = &new_bars.back();
bar->ltt = _current->ltt;
bar->volume = _current->volume;
bar->ticker = it->ticker;
bar->bar_open = _current->bar_open;
bar->bar_high = _current->bar_high;
bar->bar_low = _current->bar_low;
bar->bar_close = _current->ltp;
bar->oi = _current->oi;
/************************************************************
// Uncomment these lines to view bars sent to Amibroker
std::cout << bar->ticker << " - " <<
bar->ltt <<" - " <<
bar->bar_close <<" - "<<
bar->bar_open <<" - "<<
bar->bar_high <<" - "<<
bar->bar_low <<" - "<<
bar->volume <<" - "<<
bar->oi <<" - "<< std::endl;
*/
}
array2.clear();
if( !new_bars.empty() ){ // (C) Write to csv and Send to Amibroker
writeCsv( new_bars );
amibroker->import();
}
new_bars.clear();
}
}
void Worker::writeCsv( const std::vector<ScripBar> & bars ){
csv_file_out.open( settings.csv_path ); // Setup output stream to csv file
if( !csv_file_out.is_open() ){ // Reopening will also clear old content by default
throw( "Error opening file - " + settings.csv_path );
}
size_t size = bars.size();
const ScripBar *bar;
for( size_t i=0 ; i<size ; i++ ){ // $FORMAT Ticker, Date_YMD, Time, Open, High, Low, Close, Volume, OpenInt
bar = &bars[i];
csv_file_out << bar->ticker << ','
<< today_date << ','
<< bar->ltt << ','
<< bar->bar_open << ','
<< bar->bar_high << ','
<< bar->bar_low << ','
<< bar->bar_close << ','
<< bar->volume << ','
<< bar->oi << std::endl ;
}
csv_file_out.close();
}