689 lines
19 KiB
C++
689 lines
19 KiB
C++
// Copyright (C) 2006 Davis E. King (davis@dlib.net)
|
|
// License: Boost Software License See LICENSE.txt for the full license.
|
|
|
|
|
|
#include <sstream>
|
|
#include <string>
|
|
#include <cstdlib>
|
|
#include <ctime>
|
|
#include <dlib/misc_api.h>
|
|
#include <dlib/pipe.h>
|
|
|
|
#include "tester.h"
|
|
|
|
namespace
|
|
{
|
|
using namespace test;
|
|
using namespace dlib;
|
|
using namespace std;
|
|
|
|
logger dlog("test.pipe");
|
|
|
|
namespace pipe_kernel_test_helpers
|
|
{
|
|
const unsigned long proc1_count = 10000;
|
|
dlib::mutex m;
|
|
signaler s(m);
|
|
unsigned long threads_running = 0;
|
|
bool found_error;
|
|
|
|
inline void add_running_thread (
|
|
)
|
|
{
|
|
auto_mutex M(m);
|
|
++threads_running;
|
|
}
|
|
|
|
inline void remove_running_thread (
|
|
)
|
|
{
|
|
auto_mutex M(m);
|
|
--threads_running;
|
|
s.broadcast();
|
|
}
|
|
|
|
inline void wait_for_threads (
|
|
)
|
|
{
|
|
auto_mutex M(m);
|
|
while (threads_running > 0)
|
|
s.wait();
|
|
}
|
|
|
|
template <
|
|
typename pipe
|
|
>
|
|
void threadproc1 (
|
|
void* param
|
|
)
|
|
{
|
|
add_running_thread();
|
|
pipe& p = *static_cast<pipe*>(param);
|
|
try
|
|
{
|
|
|
|
int last = -1;
|
|
for (unsigned long i = 0; i < proc1_count; ++i)
|
|
{
|
|
int cur=0;
|
|
DLIB_TEST(p.dequeue(cur) == true);
|
|
DLIB_TEST(last + 1 == cur);
|
|
last = cur;
|
|
}
|
|
DLIB_TEST(p.size() == 0);
|
|
}
|
|
catch(exception& e)
|
|
{
|
|
auto_mutex M(m);
|
|
found_error = true;
|
|
cout << "\n\nERRORS FOUND" << endl;
|
|
cout << e.what() << endl;
|
|
dlog << LWARN << "ERRORS FOUND";
|
|
dlog << LWARN << e.what();
|
|
p.disable();
|
|
}
|
|
|
|
remove_running_thread();
|
|
}
|
|
|
|
|
|
template <
|
|
typename pipe
|
|
>
|
|
void threadproc2 (
|
|
void* param
|
|
)
|
|
{
|
|
add_running_thread();
|
|
pipe& p = *static_cast<pipe*>(param);
|
|
try
|
|
{
|
|
|
|
int last = -1;
|
|
int cur;
|
|
while (p.dequeue(cur))
|
|
{
|
|
DLIB_TEST(last < cur);
|
|
last = cur;
|
|
}
|
|
auto_mutex M(m);
|
|
}
|
|
catch(exception& e)
|
|
{
|
|
auto_mutex M(m);
|
|
found_error = true;
|
|
cout << "\n\nERRORS FOUND" << endl;
|
|
cout << e.what() << endl;
|
|
dlog << LWARN << "ERRORS FOUND";
|
|
dlog << LWARN << e.what();
|
|
p.disable();
|
|
}
|
|
remove_running_thread();
|
|
}
|
|
|
|
|
|
|
|
template <
|
|
typename pipe
|
|
>
|
|
void threadproc3 (
|
|
void* param
|
|
)
|
|
{
|
|
add_running_thread();
|
|
pipe& p = *static_cast<pipe*>(param);
|
|
try
|
|
{
|
|
|
|
int last = -1;
|
|
int cur;
|
|
while (p.dequeue_or_timeout(cur,100000))
|
|
{
|
|
DLIB_TEST(last < cur);
|
|
last = cur;
|
|
}
|
|
auto_mutex M(m);
|
|
}
|
|
catch(exception& e)
|
|
{
|
|
auto_mutex M(m);
|
|
found_error = true;
|
|
cout << "\n\nERRORS FOUND" << endl;
|
|
cout << e.what() << endl;
|
|
dlog << LWARN << "ERRORS FOUND";
|
|
dlog << LWARN << e.what();
|
|
p.disable();
|
|
}
|
|
remove_running_thread();
|
|
}
|
|
|
|
|
|
}
|
|
|
|
// ----------------------------------------------------------------------------------------
|
|
// ----------------------------------------------------------------------------------------
|
|
|
|
template<typename in_type, typename out_type>
|
|
class PipelineProcessor : private dlib::threaded_object
|
|
{
|
|
public:
|
|
PipelineProcessor(
|
|
dlib::pipe<in_type> & in,
|
|
dlib::pipe<out_type> & out) :
|
|
InPipe(in),
|
|
OutPipe(out),
|
|
InMsg(),
|
|
OutMsg() {
|
|
start();
|
|
}
|
|
|
|
~PipelineProcessor() {
|
|
// signal the thread to stop
|
|
stop();
|
|
wait();
|
|
}
|
|
|
|
private:
|
|
dlib::pipe<in_type> & InPipe;
|
|
dlib::pipe<out_type> & OutPipe;
|
|
|
|
in_type InMsg;
|
|
out_type OutMsg;
|
|
|
|
void thread()
|
|
{
|
|
while (!should_stop()) {
|
|
if(InPipe.dequeue_or_timeout(InMsg, 100))
|
|
{
|
|
// if function signals ready to send OutMsg
|
|
while (!OutPipe.enqueue_or_timeout(OutMsg, 100))
|
|
{
|
|
// try to send until should stop
|
|
if (should_stop())
|
|
{
|
|
return;
|
|
}
|
|
}
|
|
}
|
|
}
|
|
};
|
|
};
|
|
|
|
|
|
void do_zero_size_test_with_timeouts()
|
|
{
|
|
dlog << LINFO << "in do_zero_size_test_with_timeouts()";
|
|
// make sure we can get though this without deadlocking
|
|
for (int k = 0; k < 10; ++k)
|
|
{
|
|
dlib::pipe<int> in_pipe(10);
|
|
dlib::pipe<float> out_pipe(0);
|
|
{
|
|
PipelineProcessor<int, float> pp(in_pipe, out_pipe);
|
|
|
|
int in = 1;
|
|
in_pipe.enqueue(in);
|
|
in = 2;
|
|
in_pipe.enqueue(in);
|
|
in = 3;
|
|
in_pipe.enqueue(in);
|
|
// sleep to make sure thread enqueued
|
|
dlib::sleep(100);
|
|
|
|
float out = 1.0f;
|
|
out_pipe.dequeue(out);
|
|
dlib::sleep(100);
|
|
}
|
|
print_spinner();
|
|
}
|
|
|
|
}
|
|
|
|
// ----------------------------------------------------------------------------------------
|
|
// ----------------------------------------------------------------------------------------
|
|
|
|
template <
|
|
typename pipe
|
|
>
|
|
void pipe_kernel_test (
|
|
)
|
|
/*!
|
|
requires
|
|
- pipe is an implementation of pipe/pipe_kernel_abstract.h and
|
|
is instantiated with int
|
|
ensures
|
|
- runs tests on pipe for compliance with the specs
|
|
!*/
|
|
{
|
|
using namespace pipe_kernel_test_helpers;
|
|
found_error = false;
|
|
|
|
|
|
print_spinner();
|
|
pipe test(10), test2(100);
|
|
pipe test_0(0), test2_0(0);
|
|
pipe test_1(1), test2_1(1);
|
|
|
|
DLIB_TEST(test.size() == 0);
|
|
DLIB_TEST(test2.size() == 0);
|
|
DLIB_TEST(test_0.size() == 0);
|
|
DLIB_TEST(test2_0.size() == 0);
|
|
DLIB_TEST(test_1.size() == 0);
|
|
DLIB_TEST(test2_1.size() == 0);
|
|
|
|
DLIB_TEST(test.is_enqueue_enabled() == true);
|
|
DLIB_TEST(test.is_dequeue_enabled() == true);
|
|
DLIB_TEST(test.is_enabled() == true);
|
|
|
|
test.empty();
|
|
test2.empty();
|
|
DLIB_TEST(test.size() == 0);
|
|
DLIB_TEST(test2.size() == 0);
|
|
|
|
test_0.empty();
|
|
test2_0.empty();
|
|
DLIB_TEST(test_0.size() == 0);
|
|
DLIB_TEST(test2_0.size() == 0);
|
|
|
|
test_1.empty();
|
|
test2_1.empty();
|
|
DLIB_TEST(test_1.size() == 0);
|
|
DLIB_TEST(test2_1.size() == 0);
|
|
|
|
|
|
|
|
int a;
|
|
a = 3;
|
|
test.enqueue(a);
|
|
DLIB_TEST(test.size() == 1);
|
|
a = 5;
|
|
test.enqueue(a);
|
|
DLIB_TEST(test.size() == 2);
|
|
|
|
a = 0;
|
|
test.dequeue(a);
|
|
DLIB_TEST(a == 3);
|
|
DLIB_TEST(test.size() == 1);
|
|
|
|
a = 0;
|
|
test.dequeue(a);
|
|
DLIB_TEST(a == 5);
|
|
DLIB_TEST(test.size() == 0);
|
|
|
|
|
|
print_spinner();
|
|
{
|
|
dlog << LINFO << "starting normal length pipe tests";
|
|
create_new_thread(&threadproc1<pipe>,&test);
|
|
create_new_thread(&threadproc2<pipe>,&test2);
|
|
create_new_thread(&threadproc2<pipe>,&test2);
|
|
create_new_thread(&threadproc2<pipe>,&test2);
|
|
|
|
for (unsigned long i = 0; i < proc1_count; ++i)
|
|
{
|
|
a = i;
|
|
test.enqueue(a);
|
|
}
|
|
DLIB_TEST(test.is_enqueue_enabled() == true);
|
|
test.disable_enqueue();
|
|
DLIB_TEST(test.is_enqueue_enabled() == false);
|
|
for (unsigned long i = 0; i < proc1_count; ++i)
|
|
{
|
|
a = i;
|
|
test.enqueue(a);
|
|
}
|
|
|
|
for (unsigned long i = 0; i < 100000; ++i)
|
|
{
|
|
a = i;
|
|
if (i%2 == 0)
|
|
test2.enqueue(a);
|
|
else
|
|
test2.enqueue_or_timeout(a,100000);
|
|
}
|
|
|
|
test2.wait_for_num_blocked_dequeues(3);
|
|
DLIB_TEST(test2.size() == 0);
|
|
test2.disable();
|
|
|
|
wait_for_threads();
|
|
DLIB_TEST(test2.size() == 0);
|
|
|
|
test2.enable();
|
|
|
|
print_spinner();
|
|
|
|
create_new_thread(&threadproc3<pipe>,&test2);
|
|
create_new_thread(&threadproc3<pipe>,&test2);
|
|
|
|
|
|
for (unsigned long i = 0; i < 100000; ++i)
|
|
{
|
|
a = i;
|
|
if (i%2 == 0)
|
|
test2.enqueue(a);
|
|
else
|
|
test2.enqueue_or_timeout(a,100000);
|
|
}
|
|
|
|
test2.wait_for_num_blocked_dequeues(2);
|
|
DLIB_TEST(test2.size() == 0);
|
|
test2.disable();
|
|
|
|
wait_for_threads();
|
|
DLIB_TEST(test2.size() == 0);
|
|
|
|
}
|
|
|
|
|
|
print_spinner();
|
|
{
|
|
dlog << LINFO << "starting 0 length pipe tests";
|
|
create_new_thread(&threadproc1<pipe>,&test_0);
|
|
create_new_thread(&threadproc2<pipe>,&test2_0);
|
|
create_new_thread(&threadproc2<pipe>,&test2_0);
|
|
create_new_thread(&threadproc2<pipe>,&test2_0);
|
|
dlog << LTRACE << "0: 1";
|
|
|
|
for (unsigned long i = 0; i < proc1_count; ++i)
|
|
{
|
|
a = i;
|
|
test_0.enqueue(a);
|
|
}
|
|
|
|
dlog << LTRACE << "0: 2";
|
|
DLIB_TEST(test_0.is_enqueue_enabled() == true);
|
|
test_0.disable_enqueue();
|
|
DLIB_TEST(test_0.is_enqueue_enabled() == false);
|
|
for (unsigned long i = 0; i < proc1_count; ++i)
|
|
{
|
|
a = i;
|
|
test_0.enqueue(a);
|
|
}
|
|
|
|
dlog << LTRACE << "0: 3";
|
|
for (unsigned long i = 0; i < 100000; ++i)
|
|
{
|
|
a = i;
|
|
if (i%2 == 0)
|
|
test2_0.enqueue(a);
|
|
else
|
|
test2_0.enqueue_or_timeout(a,100000);
|
|
}
|
|
|
|
print_spinner();
|
|
dlog << LTRACE << "0: 4";
|
|
test2_0.wait_for_num_blocked_dequeues(3);
|
|
DLIB_TEST(test2_0.size() == 0);
|
|
test2_0.disable();
|
|
|
|
wait_for_threads();
|
|
DLIB_TEST(test2_0.size() == 0);
|
|
|
|
dlog << LTRACE << "0: 5";
|
|
test2_0.enable();
|
|
|
|
|
|
create_new_thread(&threadproc3<pipe>,&test2_0);
|
|
create_new_thread(&threadproc3<pipe>,&test2_0);
|
|
|
|
|
|
for (unsigned long i = 0; i < 20000; ++i)
|
|
{
|
|
if ((i%100) == 0)
|
|
print_spinner();
|
|
|
|
a = i;
|
|
if (i%2 == 0)
|
|
test2_0.enqueue(a);
|
|
else
|
|
test2_0.enqueue_or_timeout(a,100000);
|
|
}
|
|
|
|
dlog << LTRACE << "0: 6";
|
|
test2_0.wait_for_num_blocked_dequeues(2);
|
|
DLIB_TEST(test2_0.size() == 0);
|
|
test2_0.disable();
|
|
|
|
wait_for_threads();
|
|
DLIB_TEST(test2_0.size() == 0);
|
|
|
|
dlog << LTRACE << "0: 7";
|
|
}
|
|
|
|
print_spinner();
|
|
{
|
|
dlog << LINFO << "starting 1 length pipe tests";
|
|
create_new_thread(&threadproc1<pipe>,&test_1);
|
|
create_new_thread(&threadproc2<pipe>,&test2_1);
|
|
create_new_thread(&threadproc2<pipe>,&test2_1);
|
|
create_new_thread(&threadproc2<pipe>,&test2_1);
|
|
|
|
for (unsigned long i = 0; i < proc1_count; ++i)
|
|
{
|
|
a = i;
|
|
test_1.enqueue(a);
|
|
}
|
|
DLIB_TEST(test_1.is_enqueue_enabled() == true);
|
|
test_1.disable_enqueue();
|
|
DLIB_TEST(test_1.is_enqueue_enabled() == false);
|
|
for (unsigned long i = 0; i < proc1_count; ++i)
|
|
{
|
|
a = i;
|
|
test_1.enqueue(a);
|
|
}
|
|
print_spinner();
|
|
|
|
for (unsigned long i = 0; i < 100000; ++i)
|
|
{
|
|
a = i;
|
|
if (i%2 == 0)
|
|
test2_1.enqueue(a);
|
|
else
|
|
test2_1.enqueue_or_timeout(a,100000);
|
|
}
|
|
|
|
test2_1.wait_for_num_blocked_dequeues(3);
|
|
DLIB_TEST(test2_1.size() == 0);
|
|
test2_1.disable();
|
|
|
|
wait_for_threads();
|
|
DLIB_TEST(test2_1.size() == 0);
|
|
|
|
test2_1.enable();
|
|
|
|
|
|
create_new_thread(&threadproc3<pipe>,&test2_1);
|
|
create_new_thread(&threadproc3<pipe>,&test2_1);
|
|
|
|
|
|
for (unsigned long i = 0; i < 100000; ++i)
|
|
{
|
|
a = i;
|
|
if (i%2 == 0)
|
|
test2_1.enqueue(a);
|
|
else
|
|
test2_1.enqueue_or_timeout(a,100000);
|
|
}
|
|
|
|
test2_1.wait_for_num_blocked_dequeues(2);
|
|
DLIB_TEST(test2_1.size() == 0);
|
|
test2_1.disable();
|
|
|
|
wait_for_threads();
|
|
DLIB_TEST(test2_1.size() == 0);
|
|
|
|
}
|
|
|
|
test.enable_enqueue();
|
|
test_0.enable_enqueue();
|
|
test_1.enable_enqueue();
|
|
|
|
DLIB_TEST(test.is_enabled());
|
|
DLIB_TEST(test.is_enqueue_enabled());
|
|
DLIB_TEST(test_0.is_enabled());
|
|
DLIB_TEST(test_0.is_enqueue_enabled());
|
|
DLIB_TEST(test_1.is_enabled());
|
|
DLIB_TEST(test_1.is_enqueue_enabled());
|
|
|
|
DLIB_TEST(test.size() == 0);
|
|
DLIB_TEST(test_0.size() == 0);
|
|
DLIB_TEST(test_1.size() == 0);
|
|
DLIB_TEST(test.max_size() == 10);
|
|
DLIB_TEST(test_0.max_size() == 0);
|
|
DLIB_TEST(test_1.max_size() == 1);
|
|
|
|
|
|
for (int i = 0; i < 100; ++i)
|
|
{
|
|
a = 1;
|
|
test.enqueue_or_timeout(a,0);
|
|
a = 1;
|
|
test_0.enqueue_or_timeout(a,0);
|
|
a = 1;
|
|
test_1.enqueue_or_timeout(a,0);
|
|
}
|
|
|
|
DLIB_TEST_MSG(test.size() == 10,"size: " << test.size() );
|
|
DLIB_TEST_MSG(test_0.size() == 0,"size: " << test.size() );
|
|
DLIB_TEST_MSG(test_1.size() == 1,"size: " << test.size() );
|
|
|
|
for (int i = 0; i < 10; ++i)
|
|
{
|
|
a = 0;
|
|
DLIB_TEST(test.enqueue_or_timeout(a,10) == false);
|
|
a = 0;
|
|
DLIB_TEST(test_0.enqueue_or_timeout(a,10) == false);
|
|
a = 0;
|
|
DLIB_TEST(test_1.enqueue_or_timeout(a,10) == false);
|
|
}
|
|
|
|
DLIB_TEST_MSG(test.size() == 10,"size: " << test.size() );
|
|
DLIB_TEST_MSG(test_0.size() == 0,"size: " << test.size() );
|
|
DLIB_TEST_MSG(test_1.size() == 1,"size: " << test.size() );
|
|
|
|
for (int i = 0; i < 10; ++i)
|
|
{
|
|
a = 0;
|
|
DLIB_TEST(test.dequeue_or_timeout(a,0) == true);
|
|
DLIB_TEST(a == 1);
|
|
}
|
|
|
|
DLIB_TEST(test.max_size() == 10);
|
|
DLIB_TEST(test_0.max_size() == 0);
|
|
DLIB_TEST(test_1.max_size() == 1);
|
|
|
|
a = 0;
|
|
DLIB_TEST(test_1.dequeue_or_timeout(a,0) == true);
|
|
|
|
DLIB_TEST(test.max_size() == 10);
|
|
DLIB_TEST(test_0.max_size() == 0);
|
|
DLIB_TEST(test_1.max_size() == 1);
|
|
|
|
|
|
DLIB_TEST_MSG(a == 1,"a: " << a);
|
|
|
|
DLIB_TEST(test.size() == 0);
|
|
DLIB_TEST(test_0.size() == 0);
|
|
DLIB_TEST(test_1.size() == 0);
|
|
|
|
DLIB_TEST(test.dequeue_or_timeout(a,0) == false);
|
|
DLIB_TEST(test_0.dequeue_or_timeout(a,0) == false);
|
|
DLIB_TEST(test_1.dequeue_or_timeout(a,0) == false);
|
|
DLIB_TEST(test.dequeue_or_timeout(a,10) == false);
|
|
DLIB_TEST(test_0.dequeue_or_timeout(a,10) == false);
|
|
DLIB_TEST(test_1.dequeue_or_timeout(a,10) == false);
|
|
|
|
DLIB_TEST(test.size() == 0);
|
|
DLIB_TEST(test_0.size() == 0);
|
|
DLIB_TEST(test_1.size() == 0);
|
|
|
|
DLIB_TEST(found_error == false);
|
|
|
|
|
|
|
|
|
|
{
|
|
test.enable();
|
|
test.enable_enqueue();
|
|
test.empty();
|
|
DLIB_TEST(test.size() == 0);
|
|
DLIB_TEST(test.is_enabled() == true);
|
|
DLIB_TEST(test.is_enqueue_enabled() == true);
|
|
DLIB_TEST(test.is_dequeue_enabled() == true);
|
|
test.disable_dequeue();
|
|
dlog << LINFO << "Make sure disable_dequeue() works right...";
|
|
DLIB_TEST(test.is_dequeue_enabled() == false);
|
|
DLIB_TEST(test.dequeue(a) == false);
|
|
test.wait_until_empty();
|
|
a = 4;
|
|
test.enqueue(a);
|
|
test.wait_until_empty();
|
|
test.wait_for_num_blocked_dequeues(4);
|
|
DLIB_TEST(test.size() == 1);
|
|
DLIB_TEST(test.dequeue(a) == false);
|
|
DLIB_TEST(test.dequeue_or_timeout(a,10000) == false);
|
|
DLIB_TEST(test.size() == 1);
|
|
a = 0;
|
|
test.enable_dequeue();
|
|
DLIB_TEST(test.is_dequeue_enabled() == true);
|
|
DLIB_TEST(test.dequeue(a) == true);
|
|
DLIB_TEST(a == 4);
|
|
test_1.wait_until_empty();
|
|
}
|
|
{
|
|
test_1.enable();
|
|
test_1.enable_enqueue();
|
|
test_1.empty();
|
|
DLIB_TEST(test_1.size() == 0);
|
|
DLIB_TEST(test_1.is_enabled() == true);
|
|
DLIB_TEST(test_1.is_enqueue_enabled() == true);
|
|
DLIB_TEST(test_1.is_dequeue_enabled() == true);
|
|
test_1.disable_dequeue();
|
|
dlog << LINFO << "Make sure disable_dequeue() works right...";
|
|
DLIB_TEST(test_1.is_dequeue_enabled() == false);
|
|
DLIB_TEST(test_1.dequeue(a) == false);
|
|
a = 4;
|
|
test_1.wait_for_num_blocked_dequeues(4);
|
|
test_1.wait_for_num_blocked_dequeues(0);
|
|
test_1.enqueue(a);
|
|
test_1.wait_until_empty();
|
|
DLIB_TEST(test_1.size() == 1);
|
|
DLIB_TEST(test_1.dequeue(a) == false);
|
|
DLIB_TEST(test_1.dequeue_or_timeout(a,10000) == false);
|
|
DLIB_TEST(test_1.size() == 1);
|
|
a = 0;
|
|
test_1.enable_dequeue();
|
|
DLIB_TEST(test_1.is_dequeue_enabled() == true);
|
|
DLIB_TEST(test_1.dequeue(a) == true);
|
|
DLIB_TEST(a == 4);
|
|
test_1.wait_until_empty();
|
|
}
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
class pipe_tester : public tester
|
|
{
|
|
public:
|
|
pipe_tester (
|
|
) :
|
|
tester ("test_pipe",
|
|
"Runs tests on the pipe component.")
|
|
{}
|
|
|
|
void perform_test (
|
|
)
|
|
{
|
|
pipe_kernel_test<dlib::pipe<int> >();
|
|
|
|
do_zero_size_test_with_timeouts();
|
|
}
|
|
} a;
|
|
|
|
}
|
|
|
|
|