begin queu method

This commit is contained in:
0m.ax 2025-07-18 22:53:57 +02:00
parent 970bb5187a
commit 57880ef70a
2 changed files with 142 additions and 2 deletions

132
src/buffer.rs Normal file
View file

@ -0,0 +1,132 @@
use std::sync::{Arc, Condvar, Mutex};
use std::thread;
use std::time::Duration;
// Using MaybeUninit is the idiomatic way to handle a buffer
// of potentially uninitialized items without requiring a Default trait.
use std::mem::MaybeUninit;
/// A fixed-capacity, overwriting ring buffer.
pub struct RingBuffer<T, const N: usize> {
buffer: [MaybeUninit<T>; N],
head: usize,
tail: usize,
size: usize,
}
impl<T, const N: usize> RingBuffer<T, N> {
/// Creates a new, empty RingBuffer.
pub fn new() -> Self {
Self {
// This is a safe way to create an array of uninitialized data.
buffer: unsafe { MaybeUninit::uninit().assume_init() },
head: 0,
tail: 0,
size: 0,
}
}
/// Checks if the buffer is empty.
pub fn is_empty(&self) -> bool {
self.size == 0
}
/// Pushes an item into the buffer.
/// If the buffer is full, the oldest item is overwritten.
pub fn push(&mut self, item: T) {
// Write the item to the tail position.
// This is safe because we manage initialization with `size`.
self.buffer[self.tail].write(item);
self.tail = (self.tail + 1) % N;
if self.size < N {
// Buffer is not full, just increment size.
self.size += 1;
} else {
// Buffer was full. The push overwrote the oldest item.
// The head must also advance to the new oldest item.
self.head = (self.head + 1) % N;
}
}
/// Pops an item from the buffer.
/// Returns None if the buffer is empty.
pub fn pop(&mut self) -> Option<T> {
if self.is_empty() {
return None;
}
// Read the item from the head, leaving that slot uninitialized.
// This is safe because is_empty() check ensures `head` points to valid data.
let item = unsafe { self.buffer[self.head].assume_init_read() };
self.head = (self.head + 1) % N;
self.size -= 1;
Some(item)
}
}
fn main() {
const CAPACITY: usize = 5;
println!("Ring Buffer Capacity: {}", CAPACITY);
println!("Producer will produce 15 items, so overwrites are expected.");
// The state is shared between threads using an Arc (Atomic Reference Counter).
// The Mutex ensures exclusive access, and the Condvar allows threads to wait efficiently.
let pair = Arc::new((
Mutex::new(RingBuffer::<i32, CAPACITY>::new()),
Condvar::new(),
));
// --- Producer Thread ---
let producer_pair = Arc::clone(&pair);
let producer_handle = thread::spawn(move || {
for i in 0..15 {
// Lock the mutex to get exclusive access to the buffer.
let (lock, cvar) = &*producer_pair;
let mut buffer = lock.lock().unwrap();
buffer.push(i);
println!("➡️ Produced: {}", i);
// This is crucial: after adding an item, we notify one waiting thread.
cvar.notify_one();
// We don't need the lock anymore, so we can drop it explicitly
// before sleeping to allow the consumer to work.
drop(buffer);
thread::sleep(Duration::from_millis(100)); // Producer is fast
}
println!("✅ Producer finished.");
});
// --- Consumer Thread ---
let consumer_pair = Arc::clone(&pair);
let consumer_handle = thread::spawn(move || {
let mut items_processed = 0;
while items_processed < 15 {
let (lock, cvar) = &*consumer_pair;
let mut buffer = lock.lock().unwrap();
// Use a while loop to handle spurious wakeups.
// The thread will sleep until the buffer is no longer empty.
while buffer.is_empty() {
// `cvar.wait` atomically unlocks the mutex and waits.
// When woken, it re-locks the mutex before returning.
buffer = cvar.wait(buffer).unwrap();
}
// At this point, the buffer is not empty.
if let Some(item) = buffer.pop() {
println!(" Consumed: {}", item);
items_processed += 1;
}
drop(buffer);
thread::sleep(Duration::from_millis(300)); // Consumer is slow
}
println!("✅ Consumer finished.");
});
producer_handle.join().unwrap();
consumer_handle.join().unwrap();
}

View file

@ -1,7 +1,10 @@
mod buffer;
use crate::buffer::{RingBuffer};
use std::fs::File;
use std::io::BufReader;
use std::net::{ToSocketAddrs, UdpSocket};
use std::time::Duration;
use std::sync::{Arc, Condvar, Mutex};
// Constants from the C code
const QUEUE_LEN: usize = 1000;
@ -96,6 +99,7 @@ impl BouncingImage {
struct Display {
socket: UdpSocket,
bufs: Vec<[u8; MSGSIZE]>,
pair: Arc<(Mutex<RingBuffer::<i32, QUEUE_LEN>>,Condvar)>,
next_buf: usize,
pos_in_buf: usize,
}
@ -117,10 +121,14 @@ impl Display {
buf[0] = 0x00;
buf[1] = 0x01;
}
let pair = Arc::new((
Mutex::new(RingBuffer::<i32, QUEUE_LEN>::new()),
Condvar::new(),
));
Display {
socket,
bufs,
pair,
next_buf: 0,
pos_in_buf: 0,
}
@ -188,7 +196,7 @@ fn main() {
}
display.flush_frame();
frame_counter += 1;
println!("test");
// A small delay to control the frame rate
std::thread::sleep(Duration::from_millis(16));
}