Pipeline with thread vectors and queues in C++
Here is my code. It works, but after a few iterations it stops without any error, presumably because of a race condition or a deadlock.
The goal of the code is to model an encoding application: after the creation of fake random frames, the stages of the pipeline first give each frame a type and then "encode" it with an arbitrary operation.
To do this I've used two different vectors of threads (one for each stage) that concurrently use shared queues: once a thread has pushed a frame, it is popped by a thread in the other vector and "encoded".
#include <iostream> #include <vector> #include <algorithm> #include "safequeue.h" using namespace std; const int data_mag = 256; struct frame { int num; char type; bool encoded; vector<vector<int>> grid; }; void do_join(thread& t) { t.join(); } void join_all(vector<thread>& v) { for_each(v.begin(),v.end(),do_join); } void create_input (queue<frame>& stream, int num_frames, int height, int width) { (int = 0; < num_frames; i++) { vector<vector<int>>tmp_grid(height, vector<int>(width, 0)); frame frame; (int j = 0; j < height; j++) { (int k = 0; k < width; k++) { tmp_grid[j][k] = rand()%data_mag; } } frame.grid = tmp_grid; frame.num = i; stream.push(frame); } } void decide_type(int preset, queue<frame>& stream, queue<frame>& typed, vector<char>& param, int num_frames) { cout<<"hello decide"<<" "<<endl; for(int = 0; < num_frames; i++) { frame tmp = stream.pop(); int j = rand() % 10; if(j < preset) { tmp.type = 'i'; } else { tmp.type = 'b'; } param[tmp.num] = tmp.type; typed.push(tmp); } } void decode_flow(int preset, queue<frame>& typed, vector<frame>& encoded, vector<char>& parameters, int num_frames, int height, int width) { cout<<"hello decode"<<" "<<endl; for(int = 0; < num_frames; i++) { frame f = typed.pop(); if (f.type == 'i') { cout<<"hi i"<<" "<<endl; (int j = 0; j < height; j++) { (int k = 0; k < width; k++) { f.grid[j][k] = f.grid[j][k] * 2; } } } else cout<<"hi b"<<" "<<endl; encoded.push_back(f); } } int main() { srand(time(null)); int num_threadsxstage = 2; int width = 500; int height = 500; int num_frames = 100; int frames_thread = num_frames/num_threadsxstage; int preset = 3; vector<frame> final; //vectors of threads vector<thread> typer; vector<thread> encoder; //vector of parameters vector<char> parameters(num_frames); //working queues queue<frame> created; queue<frame> typed; //final vector vector<frame> encoded(num_frames); //movie creation create_input(created, num_frames, height, width); (int = 0; < num_threadsxstage; i++) { //stage 1 
typer.push_back(thread(bind(&decide_type, preset, ref(created), ref(typed), ref(parameters), frames_thread))); //stage 2 encoder.push_back(thread(bind(&decode_flow, preset, ref(typed), ref(encoded), ref(parameters), frames_thread, height, width))); } // join join_all(typer); join_all(encoder); (int = 0; < num_frames; i++) { frame k = typed.pop(); cout<<k.type<<" "; } cout<<endl<<endl; (int = 0; < num_frames; i++) { cout<<parameters[i]<<" "; } }
And here is the code of the thread-safe queue, or at least it is supposed to be one.
#ifndef SAFEQUEUE_H
#define SAFEQUEUE_H

#include <condition_variable>
#include <iostream>
#include <mutex>
#include <queue>
#include <thread>
#include <utility>

// NOTE: this header deliberately has no `using namespace std;`. With that
// directive at file scope the unqualified name `queue` is ambiguous between
// this class and std::queue in every translation unit that includes it.

// Unbounded FIFO queue that may be shared between threads.
//
// push() never blocks for space; pop() blocks the caller until an element is
// available. The queue can be neither copied nor assigned.
template <typename t>
class queue {
public:
    queue() = default;
    queue(const queue&) = delete;             // disable copying
    queue& operator=(const queue&) = delete;  // disable assignment

    // Removes and returns the oldest element, waiting while the queue is
    // empty.
    t pop() {
        std::unique_lock<std::mutex> mlock(mutex_);
        // The predicate is re-checked after every wake-up, which covers
        // spurious wake-ups and another consumer taking the element first.
        cond_.wait(mlock, [this] { return !queue_.empty(); });
        t val = std::move(queue_.front());
        queue_.pop();
        return val;
    }

    // Same as pop(), but stores the removed element in `item`.
    void pop(t& item) {
        std::unique_lock<std::mutex> mlock(mutex_);
        cond_.wait(mlock, [this] { return !queue_.empty(); });
        item = std::move(queue_.front());
        queue_.pop();
    }

    // Appends a copy of `item` and wakes one waiting consumer.
    void push(const t& item) {
        {
            std::lock_guard<std::mutex> lock(mutex_);
            queue_.push(item);
        }
        // Notify after releasing the lock so the woken thread does not
        // immediately block on the mutex.
        cond_.notify_one();
    }

    // Appends `item` by move and wakes one waiting consumer.
    void push(t&& item) {
        {
            std::lock_guard<std::mutex> lock(mutex_);
            queue_.push(std::move(item));
        }
        cond_.notify_one();
    }

private:
    // Must be spelled std::queue: inside this class the bare name `queue`
    // refers to the class itself.
    std::queue<t> queue_;
    std::mutex mutex_;
    std::condition_variable cond_;
};

#endif  // SAFEQUEUE_H
After the threads have finished, you extract all the queued frames from the typed
queue - but this is the intermediate queue between the processing stages, and it is now empty. The call to typed.pop()
will block forever.
You should be extracting the frames from the output container, encoded
.
Comments
Post a Comment