[LAU] Simple, easy multithreaded circular buffer library for Linux?

Fons Adriaensen fons at kokkinizita.net
Fri Oct 17 09:39:43 EDT 2008


On Fri, Oct 17, 2008 at 03:02:30PM +0200, Paul Davis wrote:

> we can and *do* anticipate that context switches will occur there -
> there is no general way to that execute that operation
> (increment-and-mask) in a lock-free atomic way. the idea is that even if
> it does happen, the resulting error prevents the buffer from being
> misused. of course, its a little inefficient. but until we get cpu's
> with atomic inc-and-mask instructions, its the price we pay for going
> lock-free (and its a relatively small cost).

The original test fails here as well (dual i686).

I attach the same test modified to use the simple
C++ class I use normally. It has a slightly different
API (allowing random access in the block read or written),
but the essential difference is that the operations that
modify the state seen by the other end (the commit() calls)
are atomic.

The _nrd and _nwr counters (similar to the r/w pointers
in jack_ringbuffer) are never masked - the mask is
applied in the testing and r/w code only.
It would be quite easy to do something similar in the
jack_ringbuffer code.

This version procudes no errors here.

Ciao,


#include <unistd.h>
#include <pthread.h>
#include <stdio.h>
#include <inttypes.h>
#include <assert.h>


// compile using   g++ -I. -o testrb2 -Wall testrb2.cc -lpthread


#define ARRAY_SIZE 64
#define MAX_VALUE 0x10000



class Lfq_u32
{
public:

    Lfq_u32 (int size);
    ~Lfq_u32 (void); 

    int       write_avail (void) const { return _size - _nwr + _nrd; } 
    void      write_commit (int n) { _nwr += n; }
    void      write (int i, uint32_t v) { _data [(_nwr + i) & _mask] = v; }

    int       read_avail (void) const { return _nwr - _nrd; } 
    void      read_commit (int n) { _nrd += n; }
    uint32_t  read (int i) { return _data [(_nrd + i) & _mask]; }

private:

    uint32_t *_data;
    int       _size;
    int       _mask;
    int       _nwr;
    int       _nrd;
};


Lfq_u32::Lfq_u32 (int size) : _size (size), _mask (_size - 1), _nwr (0), _nrd (0)
{
    assert (!(_size & _mask));
    _data = new uint32_t [_size];
}

Lfq_u32::~Lfq_u32 (void)
{
    delete[] _data;
} 



static Lfq_u32 *Q;



static int fill_int_array (int *array, int start, int count)
{
    int i, j = start;
    for (i = 0; i < count; i++)
    {
	array[i] = j;
	j = (j + 1) % MAX_VALUE;
    }
    return j;
}


static int cmp_array (int *array1, int *array2, int count)
{
    int i;
    for (i = 0; i < count; i++)
	if (array1[i] != array2[i])
	{
	    printf("%d != %d at offset %d\n", array1[i], array2[i], i);
	    return 0;
	}

    return 1;
}


static void *reader_start (void * arg)
{
    int k, i = 0, a[ARRAY_SIZE], b[ARRAY_SIZE];

    unsigned long j = 0, nfailures = 0;
    printf("reader started on cpu %d\n", sched_getcpu());
    i = fill_int_array (a, i, ARRAY_SIZE);
    while (1)
    {
	if (Q->read_avail() >= ARRAY_SIZE)
	{
	    for (k = 0; k < ARRAY_SIZE; k++) b [k] = Q->read (k);
	    Q->read_commit (ARRAY_SIZE);
	    if (!cmp_array (a, b, ARRAY_SIZE))
	    {
		nfailures++;
		printf("failure in chunk %lu - probability: %lu/%lu = %.3f per million\n",
		       j, nfailures, j, (float) nfailures / (j + 1) * 1000000);
		i = (b[0] + ARRAY_SIZE) % MAX_VALUE;
	    }
	    i = fill_int_array (a, i, ARRAY_SIZE);
	    j++;
	}
    }
    return NULL;
}


static void *writer_start (void * arg)
{
    int k, i = 0, a[ARRAY_SIZE];
    printf("writer started on cpu %d\n", sched_getcpu());

    i = fill_int_array (a, i, ARRAY_SIZE);

    while (1)
    {
	if (Q->write_avail() >= ARRAY_SIZE)
	{
	    for (k = 0; k < ARRAY_SIZE; k++) Q->write (k, a [k]);
	    Q->write_commit (ARRAY_SIZE);
  	    i = fill_int_array (a, i, ARRAY_SIZE);
	}
    }
    return NULL;
}


int main(int argc, char *argv[])
{
    int size;
    pthread_t reader_thread, writer_thread;

    if (argc < 2) return 1;
    sscanf(argv[1], "%d", &size);
    printf("starting ringbuffer stress test\n");
    printf("buffer size (bytes): %d\n", size);
    printf("array size (bytes): %d\n", sizeof(int) * ARRAY_SIZE);

    Q = new Lfq_u32 (size);

    pthread_create (&reader_thread, NULL, reader_start, NULL);
    pthread_create (&writer_thread, NULL, writer_start, NULL);
    while (1) sleep(1);

    return 0;
}






-- 
FA

Laboratorio di Acustica ed Elettroacustica
Parma, Italia

Lascia la spina, cogli la rosa.




More information about the Linux-audio-user mailing list