Compare commits

...
Sign in to create a new pull request.

3 commits

Author SHA1 Message Date
0m.ax
c9d179c33f threading 2025-07-18 23:03:45 +02:00
0m.ax
57880ef70a begin queu method 2025-07-18 22:53:57 +02:00
0m.ax
970bb5187a 1 image 2025-07-18 22:17:22 +02:00
2 changed files with 168 additions and 14 deletions

132
src/buffer.rs Normal file
View file

@ -0,0 +1,132 @@
use std::sync::{Arc, Condvar, Mutex};
use std::thread;
use std::time::Duration;
// Using MaybeUninit is the idiomatic way to handle a buffer
// of potentially uninitialized items without requiring a Default trait.
use std::mem::MaybeUninit;
/// A fixed-capacity, overwriting ring buffer.
pub struct RingBuffer<T, const N: usize> {
buffer: [MaybeUninit<T>; N],
head: usize,
tail: usize,
size: usize,
}
impl<T, const N: usize> RingBuffer<T, N> {
/// Creates a new, empty RingBuffer.
pub fn new() -> Self {
Self {
// This is a safe way to create an array of uninitialized data.
buffer: unsafe { MaybeUninit::uninit().assume_init() },
head: 0,
tail: 0,
size: 0,
}
}
/// Checks if the buffer is empty.
pub fn is_empty(&self) -> bool {
self.size == 0
}
/// Pushes an item into the buffer.
/// If the buffer is full, the oldest item is overwritten.
pub fn push(&mut self, item: T) {
// Write the item to the tail position.
// This is safe because we manage initialization with `size`.
self.buffer[self.tail].write(item);
self.tail = (self.tail + 1) % N;
if self.size < N {
// Buffer is not full, just increment size.
self.size += 1;
} else {
// Buffer was full. The push overwrote the oldest item.
// The head must also advance to the new oldest item.
self.head = (self.head + 1) % N;
}
}
/// Pops an item from the buffer.
/// Returns None if the buffer is empty.
pub fn pop(&mut self) -> Option<T> {
if self.is_empty() {
return None;
}
// Read the item from the head, leaving that slot uninitialized.
// This is safe because is_empty() check ensures `head` points to valid data.
let item = unsafe { self.buffer[self.head].assume_init_read() };
self.head = (self.head + 1) % N;
self.size -= 1;
Some(item)
}
}
fn main() {
const CAPACITY: usize = 5;
println!("Ring Buffer Capacity: {}", CAPACITY);
println!("Producer will produce 15 items, so overwrites are expected.");
// The state is shared between threads using an Arc (Atomic Reference Counter).
// The Mutex ensures exclusive access, and the Condvar allows threads to wait efficiently.
let pair = Arc::new((
Mutex::new(RingBuffer::<i32, CAPACITY>::new()),
Condvar::new(),
));
// --- Producer Thread ---
let producer_pair = Arc::clone(&pair);
let producer_handle = thread::spawn(move || {
for i in 0..15 {
// Lock the mutex to get exclusive access to the buffer.
let (lock, cvar) = &*producer_pair;
let mut buffer = lock.lock().unwrap();
buffer.push(i);
println!("➡️ Produced: {}", i);
// This is crucial: after adding an item, we notify one waiting thread.
cvar.notify_one();
// We don't need the lock anymore, so we can drop it explicitly
// before sleeping to allow the consumer to work.
drop(buffer);
thread::sleep(Duration::from_millis(100)); // Producer is fast
}
println!("✅ Producer finished.");
});
// --- Consumer Thread ---
let consumer_pair = Arc::clone(&pair);
let consumer_handle = thread::spawn(move || {
let mut items_processed = 0;
while items_processed < 15 {
let (lock, cvar) = &*consumer_pair;
let mut buffer = lock.lock().unwrap();
// Use a while loop to handle spurious wakeups.
// The thread will sleep until the buffer is no longer empty.
while buffer.is_empty() {
// `cvar.wait` atomically unlocks the mutex and waits.
// When woken, it re-locks the mutex before returning.
buffer = cvar.wait(buffer).unwrap();
}
// At this point, the buffer is not empty.
if let Some(item) = buffer.pop() {
println!(" Consumed: {}", item);
items_processed += 1;
}
drop(buffer);
thread::sleep(Duration::from_millis(300)); // Consumer is slow
}
println!("✅ Consumer finished.");
});
producer_handle.join().unwrap();
consumer_handle.join().unwrap();
}

View file

@ -1,7 +1,11 @@
mod buffer;
use crate::buffer::{RingBuffer};
use std::fs::File;
use std::io::BufReader;
use std::net::{ToSocketAddrs, UdpSocket};
use std::time::Duration;
use std::sync::{Arc, Condvar, Mutex};
use std::thread;
// Constants from the C code
const QUEUE_LEN: usize = 1000;
@ -96,6 +100,7 @@ impl BouncingImage {
struct Display {
socket: UdpSocket,
bufs: Vec<[u8; MSGSIZE]>,
pair: Arc<(Mutex<RingBuffer::<i32, QUEUE_LEN>>,Condvar)>,
next_buf: usize,
pos_in_buf: usize,
}
@ -117,10 +122,14 @@ impl Display {
buf[0] = 0x00;
buf[1] = 0x01;
}
let pair = Arc::new((
Mutex::new(RingBuffer::<i32, QUEUE_LEN>::new()),
Condvar::new(),
));
Display {
socket,
bufs,
pair,
next_buf: 0,
pos_in_buf: 0,
}
@ -167,32 +176,45 @@ impl Display {
}
}
}
fn send_thread(&mut self) {
let consumer_pair = Arc::clone(&self.pair);
let consumer_handle = thread::spawn(move || {
let mut items_processed = 0;
loop {
let (lock, cvar) = &*consumer_pair;
let mut buffer = lock.lock().unwrap();
/// Clears the entire screen to black.
#[allow(dead_code)]
fn blank_screen(&mut self) {
for x in 0..DISPLAY_WIDTH {
for y in 0..DISPLAY_HEIGHT {
self.set_pixel(x as u16, y as u16, 0, 0, 0);
// Use a while loop to handle spurious wakeups.
// The thread will sleep until the buffer is no longer empty.
while buffer.is_empty() {
// `cvar.wait` atomically unlocks the mutex and waits.
// When woken, it re-locks the mutex before returning.
buffer = cvar.wait(buffer).unwrap();
}
// At this point, the buffer is not empty.
if let Some(item) = buffer.pop() {
println!(" Consumed: {}", item);
items_processed += 1;
}
drop(buffer);
thread::sleep(Duration::from_millis(300)); // Consumer is slow
}
self.flush_frame();
});
}
}
fn main() {
let mut images = vec![
BouncingImage::new("images/unicorn_cc.png", 13, -10, 1, -1, -1),
BouncingImage::new("images/windows_logo.png", -8, 3, 2, -1, -1),
BouncingImage::new("images/spade.png", 32, -12, 1, 0, 0),
BouncingImage::new("images/dvdvideo.png", 20, 6, 5, 1000, 800),
BouncingImage::new("images/hackaday.png", 40, 18, 3, 500, 800),
];
let mut display = Display::new(DISPLAY_HOST, DISPLAY_PORT);
let mut frame_counter: u32 = 0;
// display.blank_screen();
display.send_thread();
loop {
for (i, bb) in images.iter_mut().enumerate() {
@ -203,7 +225,7 @@ fn main() {
}
display.flush_frame();
frame_counter += 1;
println!("test");
// A small delay to control the frame rate
std::thread::sleep(Duration::from_millis(16));
}