Compare commits
3 commits
master
...
some-buffe
Author | SHA1 | Date | |
---|---|---|---|
|
c9d179c33f | ||
|
57880ef70a | ||
|
970bb5187a |
2 changed files with 168 additions and 14 deletions
132
src/buffer.rs
Normal file
132
src/buffer.rs
Normal file
|
@ -0,0 +1,132 @@
|
|||
use std::sync::{Arc, Condvar, Mutex};
|
||||
use std::thread;
|
||||
use std::time::Duration;
|
||||
// Using MaybeUninit is the idiomatic way to handle a buffer
|
||||
// of potentially uninitialized items without requiring a Default trait.
|
||||
use std::mem::MaybeUninit;
|
||||
|
||||
/// A fixed-capacity, overwriting ring buffer.
|
||||
pub struct RingBuffer<T, const N: usize> {
|
||||
buffer: [MaybeUninit<T>; N],
|
||||
head: usize,
|
||||
tail: usize,
|
||||
size: usize,
|
||||
}
|
||||
|
||||
impl<T, const N: usize> RingBuffer<T, N> {
|
||||
/// Creates a new, empty RingBuffer.
|
||||
pub fn new() -> Self {
|
||||
Self {
|
||||
// This is a safe way to create an array of uninitialized data.
|
||||
buffer: unsafe { MaybeUninit::uninit().assume_init() },
|
||||
head: 0,
|
||||
tail: 0,
|
||||
size: 0,
|
||||
}
|
||||
}
|
||||
|
||||
/// Checks if the buffer is empty.
|
||||
pub fn is_empty(&self) -> bool {
|
||||
self.size == 0
|
||||
}
|
||||
|
||||
/// Pushes an item into the buffer.
|
||||
/// If the buffer is full, the oldest item is overwritten.
|
||||
pub fn push(&mut self, item: T) {
|
||||
// Write the item to the tail position.
|
||||
// This is safe because we manage initialization with `size`.
|
||||
self.buffer[self.tail].write(item);
|
||||
self.tail = (self.tail + 1) % N;
|
||||
|
||||
if self.size < N {
|
||||
// Buffer is not full, just increment size.
|
||||
self.size += 1;
|
||||
} else {
|
||||
// Buffer was full. The push overwrote the oldest item.
|
||||
// The head must also advance to the new oldest item.
|
||||
self.head = (self.head + 1) % N;
|
||||
}
|
||||
}
|
||||
|
||||
/// Pops an item from the buffer.
|
||||
/// Returns None if the buffer is empty.
|
||||
pub fn pop(&mut self) -> Option<T> {
|
||||
if self.is_empty() {
|
||||
return None;
|
||||
}
|
||||
|
||||
// Read the item from the head, leaving that slot uninitialized.
|
||||
// This is safe because is_empty() check ensures `head` points to valid data.
|
||||
let item = unsafe { self.buffer[self.head].assume_init_read() };
|
||||
self.head = (self.head + 1) % N;
|
||||
self.size -= 1;
|
||||
Some(item)
|
||||
}
|
||||
}
|
||||
|
||||
fn main() {
|
||||
const CAPACITY: usize = 5;
|
||||
println!("Ring Buffer Capacity: {}", CAPACITY);
|
||||
println!("Producer will produce 15 items, so overwrites are expected.");
|
||||
|
||||
// The state is shared between threads using an Arc (Atomic Reference Counter).
|
||||
// The Mutex ensures exclusive access, and the Condvar allows threads to wait efficiently.
|
||||
let pair = Arc::new((
|
||||
Mutex::new(RingBuffer::<i32, CAPACITY>::new()),
|
||||
Condvar::new(),
|
||||
));
|
||||
|
||||
// --- Producer Thread ---
|
||||
let producer_pair = Arc::clone(&pair);
|
||||
let producer_handle = thread::spawn(move || {
|
||||
for i in 0..15 {
|
||||
// Lock the mutex to get exclusive access to the buffer.
|
||||
let (lock, cvar) = &*producer_pair;
|
||||
let mut buffer = lock.lock().unwrap();
|
||||
|
||||
buffer.push(i);
|
||||
println!("➡️ Produced: {}", i);
|
||||
|
||||
// This is crucial: after adding an item, we notify one waiting thread.
|
||||
cvar.notify_one();
|
||||
|
||||
// We don't need the lock anymore, so we can drop it explicitly
|
||||
// before sleeping to allow the consumer to work.
|
||||
drop(buffer);
|
||||
thread::sleep(Duration::from_millis(100)); // Producer is fast
|
||||
}
|
||||
println!("✅ Producer finished.");
|
||||
});
|
||||
|
||||
// --- Consumer Thread ---
|
||||
let consumer_pair = Arc::clone(&pair);
|
||||
let consumer_handle = thread::spawn(move || {
|
||||
let mut items_processed = 0;
|
||||
while items_processed < 15 {
|
||||
let (lock, cvar) = &*consumer_pair;
|
||||
let mut buffer = lock.lock().unwrap();
|
||||
|
||||
// Use a while loop to handle spurious wakeups.
|
||||
// The thread will sleep until the buffer is no longer empty.
|
||||
while buffer.is_empty() {
|
||||
// `cvar.wait` atomically unlocks the mutex and waits.
|
||||
// When woken, it re-locks the mutex before returning.
|
||||
buffer = cvar.wait(buffer).unwrap();
|
||||
}
|
||||
|
||||
// At this point, the buffer is not empty.
|
||||
if let Some(item) = buffer.pop() {
|
||||
println!(" Consumed: {}", item);
|
||||
items_processed += 1;
|
||||
}
|
||||
|
||||
drop(buffer);
|
||||
thread::sleep(Duration::from_millis(300)); // Consumer is slow
|
||||
}
|
||||
println!("✅ Consumer finished.");
|
||||
});
|
||||
|
||||
producer_handle.join().unwrap();
|
||||
consumer_handle.join().unwrap();
|
||||
}
|
||||
|
50
src/main.rs
50
src/main.rs
|
@ -1,7 +1,11 @@
|
|||
mod buffer;
|
||||
use crate::buffer::{RingBuffer};
|
||||
use std::fs::File;
|
||||
use std::io::BufReader;
|
||||
use std::net::{ToSocketAddrs, UdpSocket};
|
||||
use std::time::Duration;
|
||||
use std::sync::{Arc, Condvar, Mutex};
|
||||
use std::thread;
|
||||
|
||||
// Constants from the C code
|
||||
const QUEUE_LEN: usize = 1000;
|
||||
|
@ -96,6 +100,7 @@ impl BouncingImage {
|
|||
struct Display {
|
||||
socket: UdpSocket,
|
||||
bufs: Vec<[u8; MSGSIZE]>,
|
||||
pair: Arc<(Mutex<RingBuffer::<i32, QUEUE_LEN>>,Condvar)>,
|
||||
next_buf: usize,
|
||||
pos_in_buf: usize,
|
||||
}
|
||||
|
@ -117,10 +122,14 @@ impl Display {
|
|||
buf[0] = 0x00;
|
||||
buf[1] = 0x01;
|
||||
}
|
||||
|
||||
let pair = Arc::new((
|
||||
Mutex::new(RingBuffer::<i32, QUEUE_LEN>::new()),
|
||||
Condvar::new(),
|
||||
));
|
||||
Display {
|
||||
socket,
|
||||
bufs,
|
||||
pair,
|
||||
next_buf: 0,
|
||||
pos_in_buf: 0,
|
||||
}
|
||||
|
@ -167,32 +176,45 @@ impl Display {
|
|||
}
|
||||
}
|
||||
}
|
||||
fn send_thread(&mut self) {
|
||||
let consumer_pair = Arc::clone(&self.pair);
|
||||
let consumer_handle = thread::spawn(move || {
|
||||
let mut items_processed = 0;
|
||||
loop {
|
||||
let (lock, cvar) = &*consumer_pair;
|
||||
let mut buffer = lock.lock().unwrap();
|
||||
|
||||
/// Clears the entire screen to black.
|
||||
#[allow(dead_code)]
|
||||
fn blank_screen(&mut self) {
|
||||
for x in 0..DISPLAY_WIDTH {
|
||||
for y in 0..DISPLAY_HEIGHT {
|
||||
self.set_pixel(x as u16, y as u16, 0, 0, 0);
|
||||
// Use a while loop to handle spurious wakeups.
|
||||
// The thread will sleep until the buffer is no longer empty.
|
||||
while buffer.is_empty() {
|
||||
// `cvar.wait` atomically unlocks the mutex and waits.
|
||||
// When woken, it re-locks the mutex before returning.
|
||||
buffer = cvar.wait(buffer).unwrap();
|
||||
}
|
||||
|
||||
// At this point, the buffer is not empty.
|
||||
if let Some(item) = buffer.pop() {
|
||||
println!(" Consumed: {}", item);
|
||||
items_processed += 1;
|
||||
}
|
||||
|
||||
drop(buffer);
|
||||
thread::sleep(Duration::from_millis(300)); // Consumer is slow
|
||||
}
|
||||
self.flush_frame();
|
||||
});
|
||||
|
||||
}
|
||||
}
|
||||
|
||||
fn main() {
|
||||
let mut images = vec![
|
||||
BouncingImage::new("images/unicorn_cc.png", 13, -10, 1, -1, -1),
|
||||
BouncingImage::new("images/windows_logo.png", -8, 3, 2, -1, -1),
|
||||
BouncingImage::new("images/spade.png", 32, -12, 1, 0, 0),
|
||||
BouncingImage::new("images/dvdvideo.png", 20, 6, 5, 1000, 800),
|
||||
BouncingImage::new("images/hackaday.png", 40, 18, 3, 500, 800),
|
||||
];
|
||||
|
||||
let mut display = Display::new(DISPLAY_HOST, DISPLAY_PORT);
|
||||
let mut frame_counter: u32 = 0;
|
||||
|
||||
// display.blank_screen();
|
||||
display.send_thread();
|
||||
|
||||
loop {
|
||||
for (i, bb) in images.iter_mut().enumerate() {
|
||||
|
@ -203,7 +225,7 @@ fn main() {
|
|||
}
|
||||
display.flush_frame();
|
||||
frame_counter += 1;
|
||||
|
||||
println!("test");
|
||||
// A small delay to control the frame rate
|
||||
std::thread::sleep(Duration::from_millis(16));
|
||||
}
|
||||
|
|
Loading…
Add table
Add a link
Reference in a new issue