Abort called when joining a vector of threads - c++

I am trying to join a vector of threads but the program keeps aborting with a debug error.
I have stepped through with debugger and it always aborts when trying to join the final thread at the end of the loop, exiting with code 3 (0x3).
// Runs one generation of training: each iteration partitions the population
// into contiguous slices, launches one worker thread per slice, joins all
// workers, then sorts the players by money. Afterwards the top
// m_numOfParents players seed the next generation.
// NOTE(review): _players is taken by value and copied again into every
// thread argument below -- confirm that per-thread copies are intended.
void NNTrainer::Train(std::vector<NNPlayer*> _players)
{
// Players per worker; ceil so the whole population is covered even when it
// does not divide evenly by the thread count.
int popPerThread = std::ceil(static_cast<float>(m_population) / this->m_threads);
for (int i = 0; i < m_itersPerGen; i++)
{
// Create vector to contain threads
std::vector<std::thread> threads;
threads.resize(this->m_threads);
int thread = 0;
// Create threads
// Each worker handles the half-open slice
// [startPopPos, min(startPopPos + popPerThread, m_population)).
// NOTE(review): playersPerTable is not declared in this function --
// presumably a member or global; verify.
for (int startPopPos = 0; startPopPos < m_population; startPopPos += popPerThread)
{
threads.at(thread) = std::thread(&NNTrainer::TrainThread, this, _players, playersPerTable, thread,
startPopPos, (startPopPos + popPerThread > m_population ? m_population : startPopPos + popPerThread));
thread++;
}
// Join threads
// NOTE(review): this inner `i` shadows the generation counter above.
// The joinable() guard skips slots that never received a thread.
for (int i = 0; i < this->m_threads; i++)
{
if (threads.at(i).joinable())
{
threads.at(i).join(); // Aborts here!
}
}
// Sort players based on their money
// NOTE(review): if _players is empty, _players.size() - 1 wraps around
// (size() is unsigned) -- confirm callers never pass an empty vector.
Sort(_players, 0, _players.size() - 1);
}
// Keep the best m_numOfParents players (assumes Sort placed them first --
// TODO confirm the sort order) and reset their money before breeding.
std::vector<NNPlayer*> parents(m_numOfParents);
for (int p = 0; p < m_numOfParents; p++)
{
parents.at(p) = _players.at(p);
parents.at(p)->ResetMoney();
}
GenerateNextGen(_players, parents);
}

Related

double free or corruption

I met a runtime error "double free or corruption" in my C++ program that calls a reliable library ANN and uses OpenMP to parallelize a for loop.
*** glibc detected *** /home/tim/test/debug/test: double free or corruption (!prev): 0x0000000002527260 ***
Does it mean that the memory at address 0x0000000002527260 is freed more than once?
The error happens at "_search_struct->annkSearch(queryPt, k_max, nnIdx, dists, _eps);" inside function classify_various_k(), which is in turn inside the OpenMP for-loop inside function tune_complexity().
Note that the error happens when there are more than one threads for OpenMP, and does not happen in single thread case. Not sure why.
Following is my code. If it is not enough for diagnose, just let me know. Thanks for your help!
// (Re)builds the ANN search structure over the given training set.
// NOTE(review): _dataPts and _labels alias the caller's arrays -- no copy is
// made, so those arrays must outlive every use of the search structure.
void KNNClassifier::train(int nb_examples, int dim, double **features, int * labels) {
_nPts = nb_examples;
_labels = labels;
_dataPts = features;
setting_ANN(_dist_type,1);
delete _search_struct;
// Null the pointer immediately: if neither branch below matches
// _search_neighbors, _search_struct would otherwise keep pointing at the
// freed object, and a later delete would free it a second time (the
// reported "double free or corruption"). Deleting a null pointer is a
// safe no-op, so the first call to train() is also fine provided the
// constructor initializes _search_struct to NULL.
_search_struct = NULL;
if(strcmp(_search_neighbors, "brutal") == 0) {
_search_struct = new ANNbruteForce(_dataPts, _nPts, dim);
}else if(strcmp(_search_neighbors, "kdtree") == 0) {
_search_struct = new ANNkd_tree(_dataPts, _nPts, dim);
}
}
// Classifies one feature vector for several neighbour counts at once
// (ks[0..nb_ks-1], each expected <= k_max): a single k_max-sized search is
// done and its prefixes are reused for every smaller k. errors[j] is
// incremented when the ks[j]-nearest-neighbour vote disagrees in sign with
// `label`.
void KNNClassifier::classify_various_k(int dim, double *feature, int label, int *ks, double * errors, int nb_ks, int k_max) {
ANNpoint queryPt = 0;
ANNidxArray nnIdx = 0;
ANNdistArray dists = 0;
queryPt = feature;
// Per-call scratch buffers -- private to each caller by construction.
nnIdx = new ANNidx[k_max];
dists = new ANNdist[k_max];
// NOTE(review): both branches issue the identical call. _search_struct is
// shared state and this function runs inside an OpenMP loop, so annkSearch
// must be thread-safe for this to be correct -- the reported crash happens
// here only in the multi-threaded case. Verify against the ANN docs.
if(strcmp(_search_neighbors, "brutal") == 0) {
_search_struct->annkSearch(queryPt, k_max, nnIdx, dists, _eps);
}else if(strcmp(_search_neighbors, "kdtree") == 0) {
_search_struct->annkSearch(queryPt, k_max, nnIdx, dists, _eps); // where error occurs
}
// Vote: sum the labels of the ks[j] nearest neighbours; a sign mismatch
// with the true label counts as one error for that k.
for (int j = 0; j < nb_ks; j++)
{
scalar_t result = 0.0;
for (int i = 0; i < ks[j]; i++) {
result+=_labels[ nnIdx[i] ];
}
// NOTE(review): errors[] is shared across OpenMP threads -- this
// unsynchronized increment is a data race.
if (result*label<0) errors[j]++;
}
delete [] nnIdx;
delete [] dists;
}
// Cross-validates the classifier over the candidate neighbour counts
// ks[i] = _k_min + i*_k_step, filling error_validation with the mean error
// rate on the test set. Only the "ct" branch is shown in the original post.
void KNNClassifier::tune_complexity(int nb_examples, int dim, double **features, int *labels, int fold, char *method, int nb_examples_test, double **features_test, int *labels_test) {
int nb_try = (_k_max - _k_min) / scalar_t(_k_step);
scalar_t *error_validation = new scalar_t [nb_try];
int *ks = new int [nb_try];
for(int i=0; i < nb_try; i ++){
ks[i] = _k_min + _k_step * i;
}
if (strcmp(method, "ct")==0)
{
train(nb_examples, dim, features, labels );// train once for all nb of nbs in ks
// Drop any k larger than the training-set size and zero the error
// accumulators for the ks that remain.
for(int i=0; i < nb_try; i ++){
if (ks[i] > nb_examples){nb_try=i; break;}
error_validation[i] = 0;
}
int i = 0;
// NOTE(review): error_validation is written concurrently inside
// classify_various_k with no synchronization, and the classifier's
// members (_search_struct, _labels) are implicitly shared through
// `this` -- consistent with the crash below appearing only when more
// than one OpenMP thread runs the loop.
#pragma omp parallel shared(nb_examples_test, error_validation,features_test, labels_test, nb_try, ks) private(i)
{
#pragma omp for schedule(dynamic) nowait
for (i=0; i < nb_examples_test; i++)
{
classify_various_k(dim, features_test[i], labels_test[i], ks, error_validation, nb_try, ks[nb_try - 1]); // where error occurs
}
}
// Convert raw error counts into mean error rates.
for (i=0; i < nb_try; i++)
{
error_validation[i]/=nb_examples_test;
}
}
// (Remainder of the function elided in the original post.)
......
}
UPDATE:
Thanks! I am now trying to correct the conflict of writing to same memory problem in classify_various_k() by using "#pragma omp critical":
// Revised version of classify_various_k: an "#pragma omp critical" section
// was added around the errors[] update to serialize the concurrent
// increments flagged earlier.
void KNNClassifier::classify_various_k(int dim, double *feature, int label, int *ks, double * errors, int nb_ks, int k_max) {
ANNpoint queryPt = 0;
ANNidxArray nnIdx = 0;
ANNdistArray dists = 0;
queryPt = feature; //for (int i = 0; i < Vignette::size; i++){ queryPt[i] = vignette->content[i];}
// Per-call scratch buffers for the single k_max-sized search.
nnIdx = new ANNidx[k_max];
dists = new ANNdist[k_max];
if(strcmp(_search_neighbors, "brutal") == 0) {// search
_search_struct->annkSearch(queryPt, k_max, nnIdx, dists, _eps);
}else if(strcmp(_search_neighbors, "kdtree") == 0) {
_search_struct->annkSearch(queryPt, k_max, nnIdx, dists, _eps);
}
for (int j = 0; j < nb_ks; j++)
{
scalar_t result = 0.0;
for (int i = 0; i < ks[j]; i++) {
// NOTE(review): a fault here suggests nnIdx[i] holds an invalid
// index -- consistent with the shared _search_struct search above
// not being thread-safe, which the critical section below does
// not protect. Verify before relying on this fix.
result+=_labels[ nnIdx[i] ]; // Program received signal SIGSEGV, Segmentation fault
}
if (result*label<0)
{
// Serializes only the write to the shared errors array.
#pragma omp critical
{
errors[j]++;
}
}
}
delete [] nnIdx;
delete [] dists;
}
However, there is a new segment fault error at "result+=_labels[ nnIdx[i] ];". Some idea? Thanks!
Okay, since you've stated that it works correctly on a single-thread case, then "normal" methods won't work. You need to do the following:
find all variables that are accessed in parallel
especially take a look at those that are modified
don't call delete on a shared resource
take a look at all library functions that operate on shared resources - check if they don't do allocation/deallocation
This is the list of candidates that are double deleted:
shared(nb_examples_test, error_validation,features_test, labels_test, nb_try, ks)
Also, this code might not be thread safe:
for (int i = 0; i < ks[j]; i++) {
result+=_labels[ nnIdx[i] ];
}
if (result*label<0) errors[j]++;
Because two or more processes may try to do a write to errors array.
And a big advice -- try not to access (especially modify!) anything while in the threaded mode, that is not a parameter to the function!
I don't know if this is your problem, but:
void KNNClassifier::train(int nb_examples, int dim, double **features, int * labels) {
...
delete _search_struct;
if(strcmp(_search_neighbors, "brutal") == 0) {
_search_struct = new ANNbruteForce(_dataPts, _nPts, dim);
}else if(strcmp(_search_neighbors, "kdtree") == 0) {
_search_struct = new ANNkd_tree(_dataPts, _nPts, dim);
}
}
What happens if you don't fall into either the if or the else if clauses? You've deleted _search_struct and left it pointing to garbage. You should set it to NULL afterward.
If this isn't the problem, you could try replacing:
delete p;
with:
assert(p != NULL);
delete p;
p = NULL;
(or similarly for delete[] sites). (This probably would pose a problem for the first invocation of KNNClassifier::train, however.)
Also, obligatory: do you really need to do all of these manual allocations and deallocations? Why aren't you at least using std::vector instead of new[]/delete[] (which are almost always bad)?
Your train method deletes _search_struct before allocating new memory to it. So the first time train is called, it is deleted. Is there code to allocate it before that call to train? You could end up trying to delete junk memory (we don't have the code to tell, though).

Thread Pool C++

I have the following for loop:
for (int i = 0; i < 100; i++) {
someJob();
}
I want to run this function only with 5 threads, how can I do it?
What I tried is to make an array with 5 threads and, when the index reaches 5, to wait for all the threads and then do it again — but I'm sure there is a better way to do that:
// Batch approach: fill an array of THREAD_COUNT threads, then join the whole
// batch before starting the next one. Drawback: a batch proceeds at the pace
// of its slowest job, so up to THREAD_COUNT-1 threads sit idle while the
// last one finishes.
// NOTE(review): any threads started in a final partial batch are never
// joined here -- fine for 100 % THREAD_COUNT == 0, otherwise the std::thread
// destructor would terminate the program. Verify the counts.
std::thread t[THREAD_COUNT];
int j=0;
for (int i = 0; i < 100; i++) {
t[j++] = std::thread(someJob);
if (j == THREAD_COUNT)
{
// Batch is full: join everything before launching more.
for (int k = 0; k < THREAD_COUNT; k++)
{
if (t[k].joinable())
t[k].join();
}
j = 0;
}
}
Any suggestion? (I can't use boost)
Thanks!
You could make a function that tests your thread array to find a vacant thread to run each successive job on. Something like this:
// synchronized output to prevent interleaving of results
#define sync_out(m) do{std::ostringstream o; o << m << '\n'; std::cout << o.str();}while(0)
void someJob(int id)
{
sync_out("thread: " << id);
}
// Runs `job` on a free slot of `threads`. A slot is free when its thread is
// not joinable (default-constructed, or already joined). If every slot holds
// a live thread, blocks on the first joinable slot, joins it, and reuses it.
// An empty vector silently drops the job (same contract as a full scan that
// finds nothing).
template<typename Job>
void start_thread(std::vector<std::thread>& threads, Job&& job)
{
    // Pass 1: claim a slot that holds no live thread.
    for (std::size_t slot = 0; slot < threads.size(); ++slot)
    {
        if (threads[slot].joinable())   // still running, or finished but unjoined
            continue;
        threads[slot] = std::thread(job);
        return;
    }
    // Pass 2: every slot is busy -- wait for the first live one to finish,
    // then reuse its slot for the new job.
    for (std::size_t slot = 0; slot < threads.size(); ++slot)
    {
        if (!threads[slot].joinable())  // never started or already joined
            continue;
        threads[slot].join();
        threads[slot] = std::thread(job);
        return;
    }
}
// Drives 100 jobs through a fixed pool of 5 reusable thread slots, then
// joins whatever is still running before exit.
int main()
{
std::vector<std::thread> threads(5); // 5 threads
for(int i = 0; i < 100; i++)
start_thread(threads, [=]{someJob(i);}); // [=] copies i into this job's closure
// wait for any unfinished threads
for(auto&& thread: threads)
if(thread.joinable())
thread.join();
}
You can simply use std::async for that.
If you want to execute this function 100 times in 5 different asynchronous actions , then each asynchronous function will execute the function 20 times:
// Five asynchronous tasks, each running the function 20 times.
// Fixed from the original: std::future requires its result type as a
// template argument (std::future<void> here), and a future comes from
// std::async -- a bare lambda cannot be emplaced into the vector directly.
std::vector<std::future<void>> results;
results.reserve(5);
for (auto i = 0; i < 5; i++){
results.emplace_back(std::async(std::launch::async, []{
for(auto j = 0; j < 20; j++){
doSomeFunction();
}
}));
}
// get() blocks until the corresponding task finishes and rethrows any
// exception the task raised.
for (auto& f : results){
f.get();
}
the same code can be modified to use naked std::thread.
You should use a Thread Pool.
Specifically, you might use the C++ Thread Pool Library CPTL, with which your code would look like this:
ctpl::thread_pool p(2 /* two threads in the pool */);
for (int i = 0; i < 100; i++) {
p.push(someJob, "additional_param");
}
OpenMP will allow you to do this trivially, while hiding the entire threadpool. Most compilers have build in support, but consult your manual for the specific options. (gcc simply requires passing -fopenmp as an option).
#pragma omp parallel for num_threads(5)
for (int i = 0; i < 100; i++) {
someJob(i);
}
will then split your work over 5 threads. If you leave out num_threads(5) it will choose a number of threads itself.
Here's one way to implement a thread pool on the fly, while staying safe.
#include <thread>
#include <vector>
#include <algorithm>
#include <mutex>
void someJob() { /* some lengthy process */ }
// On-the-fly "pool": every job spawns a real thread that erases itself from
// the shared vector when done; the main loop busy-waits (with yield)
// whenever THREAD_COUNT jobs are in flight, and again at the end until the
// pool is empty.
int main()
{
const size_t THREAD_COUNT = 5;
std::vector<std::thread> threadPool;
std::mutex mtx; // to block access to the pool
for (int i = 0; i < 100; i++)
{
{
// add new thread to the pool.
std::lock_guard<std::mutex> lock(mtx);
threadPool.emplace_back(std::thread([&mtx, &threadPool]()
{
someJob();
// task is done, remove thread from pool
std::lock_guard<std::mutex> lock(mtx);
threadPool.erase(
std::find_if(threadPool.begin(), threadPool.end(),
[](std::thread& x)
{
// Identify this worker's own entry by thread id.
if (x.get_id() == std::this_thread::get_id())
{
x.detach(); // have to call detach, since we can't
return true; // destroy an attached thread.
}
return false;
})
);
}));
}
// Throttle: busy-wait until the pool drops below THREAD_COUNT.
for (;;)
{
// wait for a slot to be freed.
std::this_thread::yield();
std::lock_guard<std::mutex> lock(mtx);
if (threadPool.size() < THREAD_COUNT)
{
break;
}
}
}
// wait for the last tasks to be done
// NOTE(review): because the workers are detached, join() is impossible; an
// empty pool only means each worker has ERASED its entry, not that its
// final instructions have finished -- confirm that is acceptable before
// letting main return.
for (;;)
{
std::this_thread::yield();
std::lock_guard<std::mutex> lock(mtx); // works fine without.. but...
if (threadPool.empty()) // <-- can't call join here, since detached
{ // threads are not joinable()
break;
}
}
return 0;
}

OpenMP function calls in parallel

I'm looking for a way to call a function in parallel.
For example, if I have 4 threads, I want to each of them to call the same function with their own thread id as an argument.
Because of the argument, no thread will work on the same data.
#pragma omp parallel
{
for(int p = 0; p < numberOfThreads; p++)
{
if(p == omp_get_thread_num())
parDF(p);
}
}
Thread 0 should run parDF(0)
Thread 1 should run parDF(1)
Thread 2 should run parDF(2)
Thread 3 should run parDF(3)
All this should be done at the same time...
This (obviously) doesn't work, but what is the right way to do parallel function calls?
EDIT: The actual code (This might be too much information... But it was asked for...)
From the function that calls parDF():
omp_set_num_threads(NUM_THREADS);
#pragma omp parallel
{
numberOfThreads = omp_get_num_threads();
//split nodeQueue
#pragma omp master
{
splitNodeQueue(numberOfThreads);
}
int tid = omp_get_thread_num();
//printf("Hello World from thread = %d\n", tid);
#pragma omp parallel for private(tid)
for(int i = 0; i < numberOfThreads; ++i)
{
parDF(tid, originalQueueSize, DFlevel);
}
}
The parDF function:
bool Tree::parDF(int id, int originalQueueSize, int DFlevel)
{
double possibilities[20];
double sequence[3];
double workingSequence[3];
int nodesToExpand = originalQueueSize/omp_get_num_threads();
int tenthsTicks = nodesToExpand/10;
int numPossibilities = 0;
int percentage = 0;
list<double>::iterator i;
list<TreeNode*>::iterator n;
cout << "My ID is: "<< omp_get_thread_num() << endl;
while(parNodeQueue[id].size() > 0 and parNodeQueue[id].back()->depth == DFlevel)
{
if(parNodeQueue[id].size()%tenthsTicks == 0)
{
cout << endl;
cout << percentage*10 << "% done..." << endl;
if(percentage == 10)
{
percentage = 0;
}
percentage++;
}
//countStartPoints++;
depthFirstQueue.push_back(parNodeQueue[id].back());
numPossibilities = 0;
for(i = parNodeQueue[id].back()->content.sortedPoints.begin(); i != parNodeQueue[id].back()->content.sortedPoints.end(); i++)
{
for(int j = 0; j < deltas; j++)
{
if(parNodeQueue[id].back()->content.doesPointExist((*i) + delta[j]))
{
for(int k = 0; k <= numPossibilities; k++)
{
if(fabs((*i) + delta[j] - possibilities[k]) < 0.01)
{
goto pointAlreadyAdded;
}
}
possibilities[numPossibilities] = ((*i) + delta[j]);
numPossibilities++;
pointAlreadyAdded:
continue;
}
}
}
// Out of the list of possible points. All combinations of 3 are added, building small subtrees in from the node.
// If a subtree succesfully breaks the lower bound, true is returned.
for(int i = 0; i < numPossibilities; i++)
{
for(int j = 0; j < numPossibilities; j++)
{
for(int k = 0; k < numPossibilities; k++)
{
if( k != j and j != i and i != k)
{
sequence[0] = possibilities[i];
sequence[1] = possibilities[j];
sequence[2] = possibilities[k];
//countSeq++;
if(addSequence(sequence, id))
{
//successes++;
workingSequence[0] = sequence[0];
workingSequence[1] = sequence[1];
workingSequence[2] = sequence[2];
parNodeQueue[id].back()->workingSequence[0] = sequence[0];
parNodeQueue[id].back()->workingSequence[1] = sequence[1];
parNodeQueue[id].back()->workingSequence[2] = sequence[2];
parNodeQueue[id].back()->live = false;
succesfulNodes.push_back(parNodeQueue[id].back());
goto nextNode;
}
else
{
destroySubtree(parNodeQueue[id].back());
}
}
}
}
}
nextNode:
parNodeQueue[id].pop_back();
}
Is this what you are after?
Live On Coliru
#include <omp.h>
#include <cstdio>
// Minimal OpenMP demo: every thread in the team prints its own id; only the
// master (tid == 0) additionally reports the team size. Output order is
// nondeterministic, as the sample run below shows.
int main()
{
// nthreads is written only by thread 0; tid is made private per thread
// by the clause on the pragma.
int nthreads, tid;
#pragma omp parallel private(tid)
{
tid = ::omp_get_thread_num();
printf("Hello World from thread = %d\n", tid);
/* Only master thread does this */
if (tid == 0) {
nthreads = ::omp_get_num_threads();
printf("Number of threads = %d\n", nthreads);
}
} /* All threads join master thread and terminate */
}
Output:
Hello World from thread = 0
Number of threads = 8
Hello World from thread = 4
Hello World from thread = 3
Hello World from thread = 5
Hello World from thread = 2
Hello World from thread = 1
Hello World from thread = 6
Hello World from thread = 7
There are two ways to achieve what you want:
Exactly the way you are describing it: each thread starts the function with it's own thread id:
#pragma omp parallel
{
int threadId = omp_get_thread_num();
parDF(threadId);
}
The parallel block starts as many threads as the system reports that it supports, and each of them executes the block. Since they differ in threadId, they will process different data. To force the starting of more threads you can add a num_threads(100) clause (or whatever count you need) to the pragma.
The correct way to do what you want is to use a parallel for block.
#pragma omp parallel for
for (int i=0; i < numThreads; ++i) {
parDF(i);
}
This way each iteration of the loop (value of i) gets assigned to a thread, that executes it. As many iterations will be ran in parallel, as there are available threads.
Method 1. is not very general, and is inefficient because you have to have as many threads as you want function calls. Method 2. is the canonical (right) way to get your problem solved.
You should be doing something like this :
#pragma omp parallel private(tid)
{
tid = omp_get_thread_num();
parDF(tid);
}
I think it's quite straightforward.

I'm trying to shoot with vectors

I am currently writing a game in C++ with DirectX, and I use a vector to store and draw all my sprites. I can't get the bullets to work in the vector.
The bullet shoots but only one comes out when there should be 50
//this vector also contains all other sprites i have like my player and my score and other sprites
///Draw is a class
vector<Draw*> chap;
vector<Draw*>::iterator it;
Draw *bullet = NULL;
///initialization of stuff
// BUG (explained in the answer below): `new Draw[gt]` inside the loop
// allocates a fresh gt-element array on every iteration, leaking all the
// previous arrays and leaving earlier chap entries pointing into them;
// only element i of the last array is ever initialized. The allocation
// belongs before the loop.
for (int i = 0; i < gt; i++)
{
bullet = new Draw[gt];
//this sets the x position and y position and other properties
bullet[i].setDraw(blet,0,0,.1,.1,0,64,64,1,0,1,color,0,0,90,0,0,false);
chap.push_back(&bullet[i]);
}
//end initialization
///game loop
for (int i = 0; i < bell; i++)
{
// Every sprite in chap is asked to shoot and the player/monster move
// each pass.
for (it = chap.begin(); it != chap.end(); it++)
{
(*it)->testShoot(&bullet[i], ME);
ME->playerMove();
monster1->move();
}
}
///end loop there is other stuff in loop but doesn't effect anything
/// what i'm using
// Spawns at most one bullet per call, rate-limited to one every 100 ms:
// scans for the first dead slot in the bullet array and launches it from
// sprite1's position, moving right.
void Draw::testShoot(Draw *sprite, Draw *sprite1)
{
// Rate limit: ignore calls within 100 ms of the last accepted shot.
if ((int)timeGetTime() < timer + 100)return;
timer = timeGetTime();
// NOTE(review): the 50 assumes `sprite` points at an array of at least 50
// Draw objects, but the caller above passes &bullet[i] -- a pointer to a
// single element -- so sprite[i] for i > 0 indexes past it. Verify.
for (int i = 0; i < 50; i++)
{
if (Key_Down(DIK_SPACE))
{
if (!sprite[i].alive)
{
// Spawn at a quarter-offset of the shooter's position, moving
// right at unit speed; break so only one bullet fires per call.
sprite[i].x = sprite1->x + sprite1->width / 4;
sprite[i].y = sprite1->y + sprite1->height / 4;
sprite[i].velx = 1;
sprite[i].vely = 0;
sprite[i].alive = true;
break;
}
}
}
}
bullet = new Draw[gt]; should go before the for loop, not inside it. Right now you're making a new array each iteration of the loop, which only has one populated element, and losing the previous ones you created when you overwrite the value in bullet each time round.
In other words, this:
for (int i = 0; i < gt; i++)
{
bullet = new Draw[gt];
bullet[i].setDraw( ...
should be this:
bullet = new Draw[gt];
for (int i = 0; i < gt; i++)
{
bullet[i].setDraw( ...

resize vector element of a struct - segv

I am trying to resize a vector member of a structure and it causes a segfault. But when I tried the same thing on its own with a small struct it worked fine. I am curious how memory is allocated for a structure that contains a vector member which can be resized. The line marked with a comment below causes the segfault in the first iteration (type_index = 0).
Structure:-
// Per-thread work description handed to pthread_create in the snippet below.
struct thread_data {
dbPointer_t pObj;
dbObjId_t objId;
dbObjTypeId_t type;
dbObjId_t topCellId;
dbIteratorId_t objsIterId;
int precision;
int64_t shape_objs;
// 3-D grid of header pointers (layer x row x col). A std::vector is a
// non-trivial class, so this struct must be created with `new` (which runs
// the vector's constructor), never with malloc.
vector<vector<vector<PL_trp_header_t *> > > ps_hdrs;
int pool;
int num_layers;
int to_cell_id;
};
Below is the snippet of code:-
// Builds one thread_data per shape type and launches a worker thread on it.
thread_data *t_data[types_length];
for(int type_index=0; type_index < types_length; ++type_index) {
// BUG (see the answer below): malloc allocates raw bytes and runs no
// constructors, so the ps_hdrs vector member is uninitialized garbage --
// the resize() a few lines down then operates on it and crashes.
t_data[type_index] = (thread_data*)malloc(sizeof(thread_data));
t_data[type_index]->pObj = NULL;
t_data[type_index]->objId = objId;
t_data[type_index]->type = shape_types[type_index];
t_data[type_index]->topCellId = topCellId;
t_data[type_index]->objsIterId = objsIterId;
t_data[type_index]->precision = nparams.unit_precision;
t_data[type_index]->shape_objs = 0;
t_data[type_index]->ps_hdrs.resize(num_layers); //this line causes segv
t_data[type_index]->pool = pool;
t_data[type_index]->num_layers = num_layers;
t_data[type_index]->to_cell_id = tocell_id;
// Pre-size the header grid to num_layers x index_limit x index_limit.
for (int num = 0; num < num_layers; num++) {
t_data[type_index]->ps_hdrs[num].resize(index_limit);
for (int rows = 0; rows < index_limit; rows++)
t_data[type_index]->ps_hdrs[num][rows].resize(index_limit);
}
// Fill every cell with a freshly allocated header from the pool.
for(int i = 0; i < num_layers; i++) {
for (int rows = 0; rows < index_limit; rows++) {
for (int cols = 0; cols < index_limit; cols++) {
t_data[type_index]->ps_hdrs[i][rows][cols] = alloc_hdr(pool);
}
}
}
printf("In main: creating thread %d \n", type_index);
// NOTE(review): &t_data[type_index] passes a thread_data** -- confirm
// thread_fn casts its argument accordingly and not to thread_data*.
rc_thread = pthread_create(&threads[type_index], NULL, thread_fn, (void *) &t_data[type_index]);
if (rc_thread){
// NOTE(review): prints `rc`, but the result was stored in `rc_thread`.
printf("ERROR; return code from pthread_create() is %d\n", rc);
exit(-1);
}
// NOTE(review): freeing immediately after pthread_create races with the
// still-running worker's use of this data, and free() also skips the
// vector member's destructor. Verify the intended ownership.
free(t_data[type_index]);
}
I think you are allocating your data with malloc. In that case no constructors for your objects and their members are called. This works with PODs but not with classes like vector. In the line with the error you try to access uninitialized memory, namely the vector member. Try new and delete instead of malloc and free to solve this issue.

Resources