// Implementation of three Win32 thread helper classes declared in StdThread.h:
//   - ProducerThread:      runs ThreadFunction() on one worker thread.
//   - ConsumerThread:      one worker thread draining m_queData.
//   - MultiConsumerThread: several worker threads draining one shared m_queData.
//
// NOTE(review): m_bEngineRunning is written by one thread and polled by
// another. Its declaration is not visible in this file -- confirm that it is
// declared in a way that makes those cross-thread reads/writes well defined.

#include "stdafx.h"
#include "StdThread.h"
#include <process.h>   // _beginthreadex

namespace
{
    // Upper bound on how long StartThread() waits for a freshly created
    // worker to raise m_bEngineRunning, and the polling interval used
    // while waiting (both in milliseconds).
    const int kStartupTimeoutMs = 3000;
    const int kStartupPollMs    = 10;
}

// ---------------------------------------------------------------------------
// ProducerThread
// ---------------------------------------------------------------------------

// Creates an idle object: no worker thread, not running.
ProducerThread::ProducerThread()
{
    m_bEngineRunning = false;
    m_hHandle = NULL;
}

// Stops the worker thread (if any) before the object goes away.
ProducerThread::~ProducerThread(void)
{
    StopThread();
}

// Starts the worker thread if it is not already started.
//
// After the thread is created this call polls m_bEngineRunning every
// kStartupPollMs for at most kStartupTimeoutMs so that the return value
// reflects whether the worker has actually begun executing.
//
// Returns the value of m_bEngineRunning at the time of return.
bool ProducerThread::StartThread()
{
    if ( !m_bEngineRunning && m_hHandle == NULL )
    {
        m_hHandle = reinterpret_cast<HANDLE>(
            _beginthreadex(NULL, 0, TheThread, this, 0, NULL));

        if ( m_hHandle != NULL )
        {
            for ( int nWaited = 0;
                  !m_bEngineRunning && nWaited < kStartupTimeoutMs;
                  nWaited += kStartupPollMs )
            {
                Sleep(kStartupPollMs);
            }
        }
    }

    return m_bEngineRunning;
}

// Requests the worker to stop, joins it and releases its handle.
//
// Clears m_bEngineRunning and then blocks until the thread exits.
// Presumably ThreadFunction() polls m_bEngineRunning to know when to
// return -- its implementation is not visible here.
//
// Always returns true.
bool ProducerThread::StopThread()
{
    if ( m_bEngineRunning )
    {
        m_bEngineRunning = false;

        if ( m_hHandle != NULL )
        {
            WaitForSingleObject(m_hHandle, INFINITE);

            // _beginthreadex handles must be closed by the creator; resetting
            // the member also lets StartThread() be called again later.
            CloseHandle(m_hHandle);
            m_hHandle = NULL;
        }
    }

    return true;
}

// Thread entry point passed to _beginthreadex.
//
// pParam is the owning ProducerThread. Marks the object as running and
// then executes ThreadFunction() on this thread. Always returns 0.
unsigned int ProducerThread::TheThread(void *pParam)
{
    ProducerThread *pThis = static_cast<ProducerThread *>(pParam);

    pThis->m_bEngineRunning = true;
    pThis->ThreadFunction();

    return 0;
}

// ---------------------------------------------------------------------------
// ConsumerThread
// ---------------------------------------------------------------------------

// Creates an idle object: no worker thread, not running.
ConsumerThread::ConsumerThread()
{
    m_bEngineRunning = false;
    m_hHandle = NULL;
}

// Stops the worker thread (if any) before the object goes away.
ConsumerThread::~ConsumerThread(void)
{
    StopThread();
}

// Starts the consumer thread if it is not already started.
//
// Uses the same bounded startup wait as ProducerThread::StartThread().
// Returns the value of m_bEngineRunning at the time of return.
bool ConsumerThread::StartThread()
{
    if ( !m_bEngineRunning && m_hHandle == NULL )
    {
        m_hHandle = reinterpret_cast<HANDLE>(
            _beginthreadex(NULL, 0, TheThread, this, 0, NULL));

        if ( m_hHandle != NULL )
        {
            for ( int nWaited = 0;
                  !m_bEngineRunning && nWaited < kStartupTimeoutMs;
                  nWaited += kStartupPollMs )
            {
                Sleep(kStartupPollMs);
            }
        }
    }

    return m_bEngineRunning;
}

// Stops the consumer thread, then releases every item still queued.
//
// A NULL item is pushed as a wake-up sentinel so that a worker blocked in
// wait_and_pop() returns and leaves its loop. Once the thread has exited,
// the remaining queue entries are popped and non-NULL ones are handed to
// ReleaseData().
//
// Always returns true.
bool ConsumerThread::StopThread()
{
    if ( m_bEngineRunning )
    {
        m_bEngineRunning = false;
        m_queData.push_back( NULL );

        if ( m_hHandle != NULL )
        {
            WaitForSingleObject(m_hHandle, INFINITE);

            // Close the handle before forgetting it; otherwise it leaks.
            CloseHandle(m_hHandle);
            m_hHandle = NULL;
        }

        // The worker is gone: dispose of whatever it did not consume.
        // An unconsumed sentinel shows up here as NULL and is skipped.
        void* pData = NULL;
        while ( m_queData.PopFront( pData ) )
        {
            if ( pData != NULL )
            {
                ReleaseData(pData);
            }
        }
    }

    return true;
}

// Appends pData to the queue consumed by the worker thread.
// A NULL pData acts as the stop sentinel for the worker loop.
void ConsumerThread::PutDataToQue( void * pData )
{
    m_queData.push_back( pData );
}

// Thread entry point passed to _beginthreadex.
//
// pParam is the owning ConsumerThread. While m_bEngineRunning is set,
// takes one item at a time from the queue, passes it to
// ConsumeDataFromQue() and then to ReleaseData(). A NULL item ends the
// loop. Always returns 0.
unsigned int ConsumerThread::TheThread(void *pParam)
{
    ConsumerThread *pThis = static_cast<ConsumerThread *>(pParam);

    pThis->m_bEngineRunning = true;

    while ( pThis->m_bEngineRunning )
    {
        void* pData = NULL;
        pThis->m_queData.wait_and_pop( pData );

        if ( pData == NULL )
        {
            break;  // stop sentinel
        }

        pThis->ConsumeDataFromQue( pData );
        pThis->ReleaseData( pData );
    }

    return 0;
}

// ---------------------------------------------------------------------------
// MultiConsumerThread
// ---------------------------------------------------------------------------

// Creates an idle object: no worker threads, not running.
MultiConsumerThread::MultiConsumerThread()
{
    m_bEngineRunning = false;
    m_vecHandle.clear();
}

// Stops all worker threads (if any) before the object goes away.
MultiConsumerThread::~MultiConsumerThread(void)
{
    StopThread();
}

// Creates up to iThread consumer threads sharing one queue.
//
// Handles of successfully created threads are stored in m_vecHandle. If at
// least one thread was created, waits (bounded by kStartupTimeoutMs) for a
// worker to raise m_bEngineRunning, so that the return value is meaningful
// and a subsequent StopThread() is not skipped while workers are alive.
//
// Returns the value of m_bEngineRunning at the time of return.
bool MultiConsumerThread::StartThread(int iThread)
{
    bool bCreatedAny = false;

    for ( int i = 0; i < iThread; ++i )
    {
        HANDLE hHandle = reinterpret_cast<HANDLE>(
            _beginthreadex(NULL, 0, TheThread, this, 0, NULL));

        if ( hHandle != NULL )
        {
            m_vecHandle.push_back(hHandle);
            bCreatedAny = true;
        }
    }

    if ( bCreatedAny )
    {
        for ( int nWaited = 0;
              !m_bEngineRunning && nWaited < kStartupTimeoutMs;
              nWaited += kStartupPollMs )
        {
            Sleep(kStartupPollMs);
        }
    }

    return m_bEngineRunning;
}

// Stops every worker thread, then releases every item still queued.
//
// One NULL sentinel is pushed per stored handle so that each worker blocked
// in wait_and_pop() can wake up and exit. Every thread is then joined and
// its handle closed.
//
// Always returns true.
bool MultiConsumerThread::StopThread()
{
    if ( m_bEngineRunning )
    {
        m_bEngineRunning = false;

        const size_t nThreads = m_vecHandle.size();

        for ( size_t i = 0; i < nThreads; ++i )
        {
            m_queData.push_back( NULL );
        }

        for ( size_t i = 0; i < nThreads; ++i )
        {
            WaitForSingleObject(m_vecHandle[i], INFINITE);
            CloseHandle(m_vecHandle[i]);
        }

        // Drop the now-closed handles so a later start/stop cycle never
        // waits on, or double-closes, a stale handle.
        m_vecHandle.clear();

        // A worker that started late may have set the flag again after it
        // was cleared above; all workers have exited now, so settle it.
        m_bEngineRunning = false;

        // Dispose of whatever the workers did not consume; leftover
        // sentinels show up here as NULL and are skipped.
        void* pData = NULL;
        while ( m_queData.PopFront( pData ) )
        {
            if ( pData != NULL )
            {
                ReleaseData(pData);
            }
        }
    }

    return true;
}

// Appends pData to the queue shared by the worker threads.
// A NULL pData acts as a stop sentinel for whichever worker receives it.
void MultiConsumerThread::PutDataToQue( void * pData )
{
    m_queData.push_back( pData );
}

// Thread entry point passed to _beginthreadex (one instance per worker).
//
// pParam is the owning MultiConsumerThread. While m_bEngineRunning is set,
// takes one item at a time from the shared queue, passes it to
// ConsumeDataFromQue() and then to ReleaseData(). A NULL item ends the
// loop. Always returns 0.
unsigned int MultiConsumerThread::TheThread(void *pParam)
{
    MultiConsumerThread *pThis = static_cast<MultiConsumerThread *>(pParam);

    pThis->m_bEngineRunning = true;

    while ( pThis->m_bEngineRunning )
    {
        void* pData = NULL;
        pThis->m_queData.wait_and_pop( pData );

        if ( pData == NULL )
        {
            break;  // stop sentinel
        }

        pThis->ConsumeDataFromQue( pData );
        pThis->ReleaseData( pData );
    }

    return 0;
}