Parallel Patterns Library 개발과 삽질

Visual Studio 2010 부터 지원되는 PPL 에 관련된 자료를 보고
코드 부분만 정리해 보았습니다.

멀티코어 CPU를 사놓고 이제서야 제대로 사용하는 느낌이 드는 군요. OTL

참고로, VS 2010 Beta1 은 ms에서 다운받을수 있습니다.


// http://vsts2010.tistory.com/ 에서 발췌

/* task parallelism
    task_handle
    task_group
*/

/* algorithms
    parallel_for( 0,size,1, [&](int i) { ... } )     // ++, += 지원되는 모든 타입
    parallel_for( v.begin(), v.end(), [&](int i) { ... } );
    parallel_invoke( []{...}, []{...} ... );
    parallel_accumulate( v.begin(), v.end(), init, std::plus );
*/

/* sync prim.
    enter_critical
    reader_writer,
    std::mutex ( C++0x mutex 표준 )
    Event
*/

/* concurrent containers
    combinable<T>
    concurrent_vector<T>
    concurrent_queue<T>
    concurrent_unordered_map<T>
*/

// lamda function의 사용 [ ]
void MulMatrix( int size, double** m1, double** m2, double** result ) {
    for( int i=0; i<size; i++ ) {
        for( int j=0; j<size; j++ ) {
            double temp = 0;
            for( int k=0; k<size; k++) {
                temp += m1[i][k] * m2[k][j];
            }
            result[i][j] = temp;
        }
    }
}

void MulMatrixP( int size, double** m1, double** m2, double** result ) {
    for( int i=0; i<size; i++ ) {
        //for( int j=0; j<size; j++ ) {
        parallel_for( 0,size,1, [&](int j) {    // ++, += 지원되는 모든 타입
        // parallel_for( v.begin(), v.end(), [&](int j) { 와 같은 형태도 가능
            double temp = 0;
            for( int k=0; k<size; k++) {
                // 아랫줄에 브레이크 걸고 Task View등을 연구해보자.
                // 각 Task를 선택할때마다 stack이 바뀌는 것도 확인 가능
                temp += m1[i][k] * m2[k][j];   
            }
            result[i][j] = temp;
        } );
    }
}

// task와 task_group의 도입
void quicksort( int * a, int n ) {
    if( n <= 1 ) return;
    int s = partition(a,n);
    quicksort( a, s );
    quicksort( a+s, n-s );
}

void quicksortP( int * a, int n ) {
    if( n <= 1 ) return;
    int s = partition(a,n);
    task_group g;
    g.run( [&]{ quicksort( a, s ); } );
    g.run( [&]{ quicksort( a+s, n-s ); } );
    g.wait();
}

int Accu()
{
    int sum = 0;
    for( vector<int>::iterator it=vec.begin(); it!=vec.end(); it++ ) {
        int elem = *it;
        SomeComputation(elem);
        sum += elem;
    }
    return sum;
}

int Accu_UsingCombinable()
{
    combinable<int> localSums;
    parallel_for( vec.begin(), vec.end(), [&localSums] (int elem) {
        SomeCompuation(elem);
        localSums.local() += elem;
    } );
    return localSums.combine(std::plus<int>);
}

void loadImages( vector<string> images )
{
    list<CImage*> imageList;    // not safe for MP
   
    for_each( images.begin(), images.end(), [&](string path) {
        CImage* image = new CImage();
        image->Load(path.c_str());
        imageList.push_back(image);
    } );
}

void loadImages_( vector<string> images )
{
    combinable<list<CImage*>> combinableImageList;    // safe for MP
    list<CImage*> imageList;                          // result
   
    parallel_for_each( images.begin(), images.end(), [&](string path) {
        CImage* image = new CImage();
        image->Load(path.c_str());
        combinableImageList.local().push_back(image);
    } );
    combinableImageList.combine_each( [&](list<CImage*> &localList) {
        imageList.splice( resultImageList.begin(), localList);      // ???
    } );
}

/* asynch agent library :
    core message blocks
     unbounded_buffer<T>
    Send, receive
    Transform, call
    choice, join
*/

HRESULT LogChunkFileParser::ParseFile() {
    HRESULT hr = S_OK;
    WCHAR wszLine[MAX_LINE] = { 0 };
    hr = reader->OpenFile(pFIleName);
    if( SUCCEEDED(hr) ) {
        whilre( !reader->EndOfFile() ) {
            hr = reader->ReadLine(wszLine, MAX_LINE);
            if( SUCCEEDED(hr) ) {
                Parse(wszLine);
            }
        }
    }
    return hr;
}

HRESULT LogChunkFileParser::ParseFileP() {
    unbound_buffer<AgentMessage>* MsgBuffer = new unbound_buffer<AgentMessage>();
    ParseAgent parseAgent;
    parseAgent.start();
   
    HRESULT hr = S_OK;
    WCHAR wszLine[MAX_LINE] = { 0 };
    hr = reader->OpenFile(pFIleName);
    if( SUCCEEDED(hr) ) {
        whilre( !reader->EndOfFile() ) {
            hr = reader->ReadLine(wszLine, MAX_LINE);
            if( SUCCEEDED(hr) ) {
                send(MsgBuffer, AgentMessage(wszLine));
            }
        }
        send(MsgBuffer, AgentMessage(EXIT));
        parseAgent.wait();
    }
    return hr;
}

class ParseAgent : agent {
    unbounded_buffer<AgentMessage>& buf_;
public:
    ...
    void Run() {
        AgentMessage msg;
        while( (msg = receive(buf_))->type != EXIT )    // blocking receive
        {
            Parse(msg->pCurrentLine);
            delete(msg->pCurrentLine);           
        }
    }
};

 


덧글

댓글 입력 영역


통계 위젯 (화이트)

51
16
404686

여러가지 알림 목록


Support Wikipedia