diff --git a/src/buffer.rs b/src/buffer.rs new file mode 100644 index 0000000..33f3877 --- /dev/null +++ b/src/buffer.rs @@ -0,0 +1,132 @@ +use std::sync::{Arc, Condvar, Mutex}; +use std::thread; +use std::time::Duration; +// Using MaybeUninit is the idiomatic way to handle a buffer +// of potentially uninitialized items without requiring a Default trait. +use std::mem::MaybeUninit; + +/// A fixed-capacity, overwriting ring buffer. +pub struct RingBuffer { + buffer: [MaybeUninit; N], + head: usize, + tail: usize, + size: usize, +} + +impl RingBuffer { + /// Creates a new, empty RingBuffer. + pub fn new() -> Self { + Self { + // This is a safe way to create an array of uninitialized data. + buffer: unsafe { MaybeUninit::uninit().assume_init() }, + head: 0, + tail: 0, + size: 0, + } + } + + /// Checks if the buffer is empty. + pub fn is_empty(&self) -> bool { + self.size == 0 + } + + /// Pushes an item into the buffer. + /// If the buffer is full, the oldest item is overwritten. + pub fn push(&mut self, item: T) { + // Write the item to the tail position. + // This is safe because we manage initialization with `size`. + self.buffer[self.tail].write(item); + self.tail = (self.tail + 1) % N; + + if self.size < N { + // Buffer is not full, just increment size. + self.size += 1; + } else { + // Buffer was full. The push overwrote the oldest item. + // The head must also advance to the new oldest item. + self.head = (self.head + 1) % N; + } + } + + /// Pops an item from the buffer. + /// Returns None if the buffer is empty. + pub fn pop(&mut self) -> Option { + if self.is_empty() { + return None; + } + + // Read the item from the head, leaving that slot uninitialized. + // This is safe because is_empty() check ensures `head` points to valid data. + let item = unsafe { self.buffer[self.head].assume_init_read() }; + self.head = (self.head + 1) % N; + self.size -= 1; + Some(item) + } +} + +fn main() { + const CAPACITY: usize = 5; + println!("Ring Buffer Capacity: {}", CAPACITY); + println!("Producer will produce 15 items, so overwrites are expected."); + + // The state is shared between threads using an Arc (Atomic Reference Counter). + // The Mutex ensures exclusive access, and the Condvar allows threads to wait efficiently. + let pair = Arc::new(( + Mutex::new(RingBuffer::::new()), + Condvar::new(), + )); + + // --- Producer Thread --- + let producer_pair = Arc::clone(&pair); + let producer_handle = thread::spawn(move || { + for i in 0..15 { + // Lock the mutex to get exclusive access to the buffer. + let (lock, cvar) = &*producer_pair; + let mut buffer = lock.lock().unwrap(); + + buffer.push(i); + println!("➡️ Produced: {}", i); + + // This is crucial: after adding an item, we notify one waiting thread. + cvar.notify_one(); + + // We don't need the lock anymore, so we can drop it explicitly + // before sleeping to allow the consumer to work. + drop(buffer); + thread::sleep(Duration::from_millis(100)); // Producer is fast + } + println!("✅ Producer finished."); + }); + + // --- Consumer Thread --- + let consumer_pair = Arc::clone(&pair); + let consumer_handle = thread::spawn(move || { + let mut items_processed = 0; + while items_processed < 15 { + let (lock, cvar) = &*consumer_pair; + let mut buffer = lock.lock().unwrap(); + + // Use a while loop to handle spurious wakeups. + // The thread will sleep until the buffer is no longer empty. + while buffer.is_empty() { + // `cvar.wait` atomically unlocks the mutex and waits. + // When woken, it re-locks the mutex before returning. + buffer = cvar.wait(buffer).unwrap(); + } + + // At this point, the buffer is not empty. + if let Some(item) = buffer.pop() { + println!(" Consumed: {}", item); + items_processed += 1; + } + + drop(buffer); + thread::sleep(Duration::from_millis(300)); // Consumer is slow + } + println!("✅ Consumer finished."); + }); + + producer_handle.join().unwrap(); + consumer_handle.join().unwrap(); +} + diff --git a/src/main.rs b/src/main.rs index 23bc058..3e56a03 100644 --- a/src/main.rs +++ b/src/main.rs @@ -1,7 +1,11 @@ +mod buffer; +use crate::buffer::{RingBuffer}; use std::fs::File; use std::io::BufReader; use std::net::{ToSocketAddrs, UdpSocket}; use std::time::Duration; +use std::sync::{Arc, Condvar, Mutex}; +use std::thread; // Constants from the C code const QUEUE_LEN: usize = 1000; @@ -96,6 +100,7 @@ impl BouncingImage { struct Display { socket: UdpSocket, bufs: Vec<[u8; MSGSIZE]>, + pair: Arc<(Mutex>,Condvar)>, next_buf: usize, pos_in_buf: usize, } @@ -117,10 +122,14 @@ impl Display { buf[0] = 0x00; buf[1] = 0x01; } - + let pair = Arc::new(( + Mutex::new(RingBuffer::::new()), + Condvar::new(), + )); Display { socket, bufs, + pair, next_buf: 0, pos_in_buf: 0, } @@ -167,32 +176,45 @@ impl Display { } } } + fn send_thread(&mut self) { + let consumer_pair = Arc::clone(&self.pair); + let consumer_handle = thread::spawn(move || { + let mut items_processed = 0; + loop { + let (lock, cvar) = &*consumer_pair; + let mut buffer = lock.lock().unwrap(); - /// Clears the entire screen to black. - #[allow(dead_code)] - fn blank_screen(&mut self) { - for x in 0..DISPLAY_WIDTH { - for y in 0..DISPLAY_HEIGHT { - self.set_pixel(x as u16, y as u16, 0, 0, 0); + // Use a while loop to handle spurious wakeups. + // The thread will sleep until the buffer is no longer empty. + while buffer.is_empty() { + // `cvar.wait` atomically unlocks the mutex and waits. + // When woken, it re-locks the mutex before returning. + buffer = cvar.wait(buffer).unwrap(); } + + // At this point, the buffer is not empty. + if let Some(item) = buffer.pop() { + println!(" Consumed: {}", item); + items_processed += 1; + } + + drop(buffer); + thread::sleep(Duration::from_millis(300)); // Consumer is slow } - self.flush_frame(); + }); + } } fn main() { let mut images = vec![ BouncingImage::new("images/unicorn_cc.png", 13, -10, 1, -1, -1), - BouncingImage::new("images/windows_logo.png", -8, 3, 2, -1, -1), - BouncingImage::new("images/spade.png", 32, -12, 1, 0, 0), - BouncingImage::new("images/dvdvideo.png", 20, 6, 5, 1000, 800), - BouncingImage::new("images/hackaday.png", 40, 18, 3, 500, 800), ]; let mut display = Display::new(DISPLAY_HOST, DISPLAY_PORT); let mut frame_counter: u32 = 0; - // display.blank_screen(); + display.send_thread(); loop { for (i, bb) in images.iter_mut().enumerate() { @@ -203,7 +225,7 @@ fn main() { } display.flush_frame(); frame_counter += 1; - + println!("test"); // A small delay to control the frame rate std::thread::sleep(Duration::from_millis(16)); }