OSの講義の6.xの問題を解く
これが正解だぜ(どや)みたいなノリでは書いてないです、ただの平均的な技術力を持っているB2が解いているだけ
課題の取り組みをブログで書いてもいいという許可をもらった上で記事を書いてます
最新の課題内容であることや、答えが正しいかどうかは保証しません
このブログの情報で読者に何らかの不都合が生じても著者は責任は取らないです
6.1 Rust上のPipeline実行
このページの問題を解く
async と future または channel を使って Mercural ssh:/net/home/hg/teacher/kono/os/ex/WorkerModel1 と同じような pipeline 実行を実現してみよ。 まず、 すべての pipeline を実行してきちんと停止することを確認する 。
amaneの/home/hg/teacher/kono/os/ex/WorkerModel1
から取ってくる
$ rsync -avPz amane:/home/hg/teacher/kono/os/ex/WorkerModel1 ./6.x/6.1/
. └── WorkerModel1 └── src └── worker ├── Task.java ├── ThreadPoolTest.java └── Worker.java
WorkerModel1/src/worker/Task.java
package worker; public class Task { private String s; public Task(String s) { this.s = s; } public String get() { return s; } public void work() throws InterruptedException { Thread.sleep(100); System.out.print(s); } }
WorkerModel1/src/worker/Worker.javak
package worker; import java.util.concurrent.SynchronousQueue; public class Worker implements Runnable { public SynchronousQueue<Task> queue; public Worker(SynchronousQueue<Task> queue) { this.queue = queue; } @Override public void run() { while(true) { Task t; try { t = queue.take(); t.work(); } catch (InterruptedException e1) { e1.printStackTrace(); } } } }
WorkerModel1/src/worker/ThreadPoolTest.java
package worker; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; import java.util.concurrent.SynchronousQueue; public class ThreadPoolTest { static public void main(String [] arg) { int cpu = 16; System.out.println("hello"); ExecutorService pool = Executors.newFixedThreadPool(cpu); System.out.println(pool); SynchronousQueue<Task> queue = new SynchronousQueue<Task>(); System.out.println(queue); for(int i =0;i<cpu;i++) { pool.execute(new Worker(queue)); } new Thread(new Runnable(){ @Override public void run() { while(true) { try { Thread.sleep(50); System.out.println(); // " chick"); } catch (InterruptedException e) { e.printStackTrace(); } } } }).start(); String [] data = { "a","b","c","d","e","f", "a","b","c","d","e","f", "a","b","c","d","e","f", "a","b","c","d","e","f", "a","b","c","d","e","f", "a","b","c","d","e","f", "a","b","c","d","e","f", "a","b","c","d","e","f", ""}; for( String s : data) { try { queue.put(new Task(s)); } catch (InterruptedException e) { e.printStackTrace(); } } } }
Rustで実装したコード
- ワーカープール
- 両方のコードには、処理を実行するためのワーカーのプールがある。
- JavaではExecutorServiceがこれを提供し、Rustでは手動で実装されている。
- タスクの実行
- 両方のコードでは、各ワーカーはキューからタスクを取得し、それを処理する。
- JavaではSynchronousQueueが使用され、Rustではmpsc::channelが使用される。
- 非同期タスク
use std::error::Error; use std::sync::Arc; use tokio::sync::{mpsc, Mutex}; #[derive(Clone)] struct Task { s: Arc<String>, } impl Task { // Constructs a new `Task`. fn new(s: Arc<String>) -> Self { Task { s } } // Simulates work asynchronously. async fn work(&self) { tokio::time::sleep(tokio::time::Duration::from_millis(100)).await; println!("{}", self.s); } } struct Worker { receiver: Arc<Mutex<mpsc::Receiver<Task>>>, } impl Worker { // Constructs a new `Worker`. fn new(receiver: Arc<Mutex<mpsc::Receiver<Task>>>) -> Self { Worker { receiver } } // Runs the worker, listening for new tasks. async fn run(&self) { // `lock().await` is used to acquire the lock asynchronously. while let Some(task) = self.receiver.lock().await.recv().await { task.work().await; } } } #[tokio::main] async fn main() -> Result<(), Box<dyn Error>> { let cpu_count = 16; let (tx, rx) = mpsc::channel::<Task>(cpu_count); // Buffer size can be adjusted as needed. let rx = Arc::new(Mutex::new(rx)); // Share Receiver among several workers. // Launch multiple workers. for _ in 0..cpu_count { let worker = Worker::new(rx.clone()); // Pass a clone of Arc<Mutex<Receiver>> to each worker. tokio::spawn(async move { worker.run().await; }); } // Spawn a new thread to periodically output messages. tokio::spawn(async { loop { tokio::time::sleep(tokio::time::Duration::from_millis(50)).await; println!(); // Use println!("chick"); if you want to output "chick". } }); let data = [ "a", "b", "c", "d", "e", "f", "a", "b", "c", "d", "e", "f", "a", "b", "c", "d", "e", "f", "a", "b", "c", "d", "e", "f", "a", "b", "c", "d", "e", "f", "a", "b", "c", "d", "e", "f", "a", "b", "c", "d", "e", "f", "a", "b", "c", "d", "e", "f", ]; // Send tasks. for s in &data { let task_message = Arc::new(s.to_string()); let task = Task::new(task_message); tx.send(task).await?; } // Once all tasks are sent, drop tx to close the channel. // This will cause the worker's loop to terminate, ending the worker tasks as a result. drop(tx); // At this point, the main task ends, but the program continues running until all background tasks are complete. Ok(()) }
実行結果
a b c d e f a b c d e f a b c d e f a b c d e f a b c d e f a ←ここで終了
6.2 golang で Semaphor を実装してみる
このページの問題を解く
golang は channel をつかった同期をもっている。 これを使って Semaphor を実装せよ。 libary を使わずに自分で実装すること
書いたコード
- セマフォの実装
- セマフォは固定バッファサイズのチャネルとして実装されている。
- Acquireメソッドでは、チャネルに値を送信することで許可を取得する。
- Releaseメソッドでは、チャネルから値を受信することで許可を解放する。
- セマフォの応用
- 実装したセマフォは同時にリソースにアクセスできるゴルーチンの数を制限するのに使用できる
- このコードでは、リソースを消費する重い処理をtime.Sleepで模倣している
package main import ( "fmt" "math/rand" "sync" "time" ) // Semaphore is a type representing a semaphore. type Semaphore chan struct{} // NewSemaphore creates a new semaphore with the specified number of permits. func NewSemaphore(n int) Semaphore { return make(Semaphore, n) } // Acquire obtains one permit from the semaphore. func (s Semaphore) Acquire() { s <- struct{}{} // Send to the channel to acquire a permit. This will block if the channel is full. } // Release releases one permit back to the semaphore. func (s Semaphore) Release() { <-s // Receive from the channel to release a permit. } func heavyProcess(id int, sema Semaphore, wg *sync.WaitGroup) { defer wg.Done() // Decrement the counter of the WaitGroup as this goroutine completes. sema.Acquire() // Acquire a permit from the semaphore. defer sema.Release() // Release the permit when this function exits. // Simulate some heavy work. fmt.Printf("Goroutine %d is starting heavy processing\n", id) processTime := time.Duration(rand.Intn(3)+1) * time.Second time.Sleep(processTime) // Representing heavy work just by sleeping here. fmt.Printf("Goroutine %d has completed heavy processing in %v\n", id, processTime) } func main() { rand.Seed(time.Now().UnixNano()) // Initialize the random seed. sema := NewSemaphore(3) // Only 3 goroutines are allowed to run concurrently. var wg sync.WaitGroup // WaitGroup for waiting for all goroutines to complete. for i := 0; i < 10; i++ { wg.Add(1) // Increment the WaitGroup counter for each goroutine. go heavyProcess(i, sema, &wg) } wg.Wait() // Wait for all goroutines to complete. fmt.Println("All goroutines have completed processing") }
実行結果
Goroutine 3 is starting heavy processing Goroutine 5 is starting heavy processing Goroutine 9 is starting heavy processing Goroutine 3 has completed heavy processing in 1s Goroutine 6 is starting heavy processing Goroutine 5 has completed heavy processing in 2s Goroutine 7 is starting heavy processing Goroutine 7 has completed heavy processing in 1s Goroutine 8 is starting heavy processing Goroutine 9 has completed heavy processing in 3s Goroutine 2 is starting heavy processing Goroutine 6 has completed heavy processing in 2s Goroutine 0 is starting heavy processing Goroutine 8 has completed heavy processing in 1s Goroutine 4 is starting heavy processing Goroutine 0 has completed heavy processing in 1s Goroutine 1 is starting heavy processing Goroutine 2 has completed heavy processing in 2s Goroutine 1 has completed heavy processing in 1s Goroutine 4 has completed heavy processing in 1s All goroutines have completed processing
6.3 Dead Lock
このページの問題を解く
激むず