Pipeline with thread vectors and queues C++ -


Here is my code. It works for a few iterations and then stops without any error, probably because of a race condition or a deadlock.

The goal of the code is to model an encoding application: after the creation of some fake random frames, the stages of the pipeline first give a type to the frames and then encode them with some random operation.

To do that I've used 2 different vectors of threads (one for each stage) that are used concurrently with some shared queues; once one thread has pushed a frame, it is popped by a thread in the other vector and "encoded".

 #include <iostream>  #include <vector>  #include <algorithm>   #include "safequeue.h"   using namespace std;   const int data_mag = 256;   struct frame  {     int num;      char type;      bool encoded;      vector<vector<int>> grid; };  void do_join(thread& t) {     t.join(); }  void join_all(vector<thread>& v) {     for_each(v.begin(),v.end(),do_join); }  void create_input (queue<frame>& stream, int num_frames, int height, int width) {     (int = 0; < num_frames; i++)     {         vector<vector<int>>tmp_grid(height, vector<int>(width, 0));          frame frame;          (int j = 0; j < height; j++)         {             (int k = 0; k < width; k++)             {                 tmp_grid[j][k] = rand()%data_mag;             }         }          frame.grid = tmp_grid;         frame.num = i;          stream.push(frame);     } }   void decide_type(int preset, queue<frame>& stream, queue<frame>& typed, vector<char>& param, int num_frames) {     cout<<"hello decide"<<" "<<endl;      for(int = 0; < num_frames; i++)     {         frame tmp = stream.pop();          int j = rand() % 10;          if(j < preset)         {             tmp.type = 'i';         }          else         {             tmp.type = 'b';         }          param[tmp.num] = tmp.type;          typed.push(tmp);     } }  void decode_flow(int preset, queue<frame>& typed, vector<frame>& encoded,                     vector<char>& parameters, int num_frames, int height, int width) {     cout<<"hello decode"<<" "<<endl;      for(int = 0; < num_frames; i++)     {         frame f = typed.pop();          if (f.type == 'i')         {             cout<<"hi i"<<" "<<endl;             (int j = 0; j < height; j++)             {                 (int k = 0; k < width; k++)                 {                     f.grid[j][k] = f.grid[j][k] * 2;                 }             }         }          else cout<<"hi b"<<" "<<endl;          encoded.push_back(f);     } }     int main() {     srand(time(null));      int 
num_threadsxstage = 2;      int width = 500;     int height = 500;      int num_frames = 100;      int frames_thread = num_frames/num_threadsxstage;      int preset = 3;      vector<frame> final;      //vectors of threads     vector<thread> typer;     vector<thread> encoder;      //vector of parameters     vector<char> parameters(num_frames);      //working queues     queue<frame> created;     queue<frame> typed;      //final vector     vector<frame> encoded(num_frames);      //movie creation      create_input(created, num_frames, height, width);    (int = 0; < num_threadsxstage; i++)     {         //stage 1         typer.push_back(thread(bind(&decide_type, preset, ref(created),                                     ref(typed), ref(parameters), frames_thread)));          //stage 2         encoder.push_back(thread(bind(&decode_flow, preset, ref(typed), ref(encoded),                                       ref(parameters), frames_thread, height, width)));     }       // join      join_all(typer);      join_all(encoder);       (int = 0; < num_frames; i++)     {         frame k = typed.pop();          cout<<k.type<<" ";     }      cout<<endl<<endl;      (int = 0; < num_frames; i++)     {         cout<<parameters[i]<<" ";     } } 

And here is the code of my thread-safe queue, or at least it is supposed to be.

#ifndef SAFEQUEUE_H
#define SAFEQUEUE_H

#include <condition_variable>
#include <iostream>
#include <mutex>
#include <queue>
#include <thread>
#include <utility>

// Blocking FIFO queue guarded by a mutex and a condition variable.
//
// push() never blocks waiting for space; pop() blocks until an element is
// available. The class is neither copyable nor assignable.
//
// The underlying container is spelled std::queue explicitly: inside this
// class the unqualified name `queue` refers to the class itself.
template <typename t>
class queue
{
private:
    std::queue<t> queue_;
    std::mutex mutex_;
    std::condition_variable cond_;

public:
    queue() = default;
    queue(const queue&) = delete;            // disable copying
    queue& operator=(const queue&) = delete; // disable assignment

    // Removes and returns the oldest element, waiting while the queue is empty.
    t pop()
    {
        std::unique_lock<std::mutex> mlock(mutex_);
        // The predicate is re-checked after every wake-up, so spurious
        // wake-ups and races with other consumers are handled.
        cond_.wait(mlock, [this] { return !queue_.empty(); });

        t val = std::move(queue_.front());
        queue_.pop();
        return val;
    }

    // Removes the oldest element and stores it in item, waiting while the
    // queue is empty.
    void pop(t& item)
    {
        std::unique_lock<std::mutex> mlock(mutex_);
        cond_.wait(mlock, [this] { return !queue_.empty(); });

        item = std::move(queue_.front());
        queue_.pop();
    }

    // Appends a copy of item and wakes one waiting consumer.
    void push(const t& item)
    {
        {
            std::lock_guard<std::mutex> mlock(mutex_);
            queue_.push(item);
        }
        // Notify after releasing the lock so the woken thread can take it
        // immediately.
        cond_.notify_one();
    }

    // Appends item by moving from it and wakes one waiting consumer.
    void push(t&& item)
    {
        {
            std::lock_guard<std::mutex> mlock(mutex_);
            queue_.push(std::move(item));
        }
        cond_.notify_one();
    }
};

#endif // SAFEQUEUE_H

After all the threads have finished, you extract the queued frames from the typed queue - but this is the intermediate queue between the processing stages, and it is now empty. The call to typed.pop() will block forever.

You should be extracting the frames from the output queue, encoded.


Comments

Popular posts from this blog

javascript - Laravel datatable invalid JSON response -

java - Exception in thread "main" org.springframework.context.ApplicationContextException: Unable to start embedded container; -

sql server 2008 - My Sql Code Get An Error Of Msg 245, Level 16, State 1, Line 1 Conversion failed when converting the varchar value '8:45 AM' to data type int -