OS Assignments 6.x

Solving the 6.x problems from the OS lecture.

I'm not writing this with a "behold, the correct answer (smug)" attitude; it's just a B2 (second-year undergraduate) with average technical skills working through the problems.

I'm writing this article after getting permission to blog about my work on the assignments.

I don't guarantee that this reflects the latest version of the assignments or that the answers are correct.

The author takes no responsibility for any trouble readers may run into because of the information in this blog.


6.1 Pipeline execution in Rust

Solving the problem on this page.

Using async and future, or channel, implement a pipeline execution similar to

Mercurial ssh:/net/home/hg/teacher/kono/os/ex/WorkerModel1

First, confirm that all of the pipelines run and that the program stops properly.

Fetch it from /home/hg/teacher/kono/os/ex/WorkerModel1 on amane:

$ rsync -avPz amane:/home/hg/teacher/kono/os/ex/WorkerModel1 ./6.x/6.1/

.
└── WorkerModel1
    └── src
        └── worker
            ├── Task.java
            ├── ThreadPoolTest.java
            └── Worker.java

WorkerModel1/src/worker/Task.java

package worker;

public class Task {

    private String s;

    public Task(String s) {
        this.s = s;
    }
    
    public String get() {
        return s;
    }

    public void work() throws InterruptedException {
        Thread.sleep(100);
        System.out.print(s);
    }

}

WorkerModel1/src/worker/Worker.java

package worker;

import java.util.concurrent.SynchronousQueue;

public class Worker implements Runnable {
    
    public SynchronousQueue<Task> queue;
    
    public Worker(SynchronousQueue<Task> queue) {
        this.queue = queue;
    }

    @Override
    public void run() {
        while(true) {
            Task t;
            try {
                t = queue.take();
                t.work();
            } catch (InterruptedException e1) {
                e1.printStackTrace();
            }
        }
    }

}

WorkerModel1/src/worker/ThreadPoolTest.java

package worker;

import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.SynchronousQueue;

public class ThreadPoolTest {
    static public void main(String [] arg) {
        int cpu = 16;
        System.out.println("hello");
        ExecutorService pool = Executors.newFixedThreadPool(cpu);
        System.out.println(pool);
        SynchronousQueue<Task> queue = new SynchronousQueue<Task>();
        System.out.println(queue);
        
        for(int i =0;i<cpu;i++) {
            pool.execute(new Worker(queue));
        }
        
        new Thread(new Runnable(){
            @Override
            public void run() {
                while(true) {
                    try {
                        Thread.sleep(50);
                        System.out.println(); // " chick");
                    } catch (InterruptedException e) {
                        e.printStackTrace();
                    }
                }
            }
        }).start();
        String [] data = {
                "a","b","c","d","e","f",
                "a","b","c","d","e","f",
                "a","b","c","d","e","f",
                "a","b","c","d","e","f",
                "a","b","c","d","e","f",
                "a","b","c","d","e","f",
                "a","b","c","d","e","f",
                "a","b","c","d","e","f",
        ""};
        for( String s : data) {
            try {
                queue.put(new Task(s));
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }
        
        
        
        
    }
}

Code implemented in Rust

  • Worker pool
    • Both programs have a pool of workers that carry out the processing.
    • In Java this is provided by ExecutorService; in Rust it is implemented by hand.
  • Task execution
    • In both programs, each worker takes a task from a queue and processes it.
    • Java uses SynchronousQueue (a queue with no buffer), while Rust uses mpsc::channel.
  • Asynchronous tasks
    • The tasks are processed asynchronously.
    • Java's Task class uses Thread.sleep(), while the Rust version uses tokio::time::sleep.

use std::error::Error;
use std::sync::Arc;
use tokio::sync::{mpsc, Mutex};

#[derive(Clone)]
struct Task {
    s: Arc<String>,
}

impl Task {
    // Constructs a new `Task`.
    fn new(s: Arc<String>) -> Self {
        Task { s }
    }

    // Simulates work asynchronously.
    async fn work(&self) {
        tokio::time::sleep(tokio::time::Duration::from_millis(100)).await;
        println!("{}", self.s);
    }
}

struct Worker {
    receiver: Arc<Mutex<mpsc::Receiver<Task>>>,
}

impl Worker {
    // Constructs a new `Worker`.
    fn new(receiver: Arc<Mutex<mpsc::Receiver<Task>>>) -> Self {
        Worker { receiver }
    }

    // Runs the worker, listening for new tasks.
    async fn run(&self) {
        // `lock().await` is used to acquire the lock asynchronously.
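        // Note: the MutexGuard obtained here is a temporary of the `while let`,
        // so it stays locked for the whole loop body. Other workers cannot call
        // recv() while this one is inside task.work(), which is why the output
        // below shows the tasks being handled one at a time.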
        while let Some(task) = self.receiver.lock().await.recv().await {
            task.work().await;
        }
    }
}

#[tokio::main]
async fn main() -> Result<(), Box<dyn Error>> {
    let cpu_count = 16;
    let (tx, rx) = mpsc::channel::<Task>(cpu_count); // Buffer size can be adjusted as needed.

    let rx = Arc::new(Mutex::new(rx)); // Share Receiver among several workers.

    // Launch multiple workers.
    for _ in 0..cpu_count {
        let worker = Worker::new(rx.clone()); // Pass a clone of Arc<Mutex<Receiver>> to each worker.

        tokio::spawn(async move {
            worker.run().await;
        });
    }

    // Spawn a new thread to periodically output messages.
    tokio::spawn(async {
        loop {
            tokio::time::sleep(tokio::time::Duration::from_millis(50)).await;
            println!(); // Use println!("chick"); if you want to output "chick".
        }
    });

    let data = [
        "a", "b", "c", "d", "e", "f", "a", "b", "c", "d", "e", "f", "a", "b", "c", "d", "e", "f",
        "a", "b", "c", "d", "e", "f", "a", "b", "c", "d", "e", "f", "a", "b", "c", "d", "e", "f",
        "a", "b", "c", "d", "e", "f", "a", "b", "c", "d", "e", "f",
    ];

    // Send tasks.
    for s in &data {
        let task_message = Arc::new(s.to_string());
        let task = Task::new(task_message);
        tx.send(task).await?;
    }

    // Once all tasks are sent, drop tx to close the channel.
    // This will cause the worker's loop to terminate, ending the worker tasks as a result.
    drop(tx);

    // When main returns here the tokio runtime shuts down, so tasks still sitting
    // in the channel buffer (and the periodic printer task) are simply dropped.
    // This is why the output below stops partway through the data.
    Ok(())
}
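
As the comment at the end of main notes, this version exits as soon as the last task has been handed to the channel, so whatever is still buffered is never processed. One possible variant (my own sketch, not the code that produced the results below) keeps the workers' JoinHandles and awaits them after dropping tx. It reuses the Task and Worker types above and leaves out the periodic blank-line printer.

// Alternative main (my sketch): wait for the workers so every queued task runs.
#[tokio::main]
async fn main() -> Result<(), Box<dyn Error>> {
    let cpu_count = 16;
    let (tx, rx) = mpsc::channel::<Task>(cpu_count);
    let rx = Arc::new(Mutex::new(rx));

    // Spawn the workers and keep their JoinHandles.
    let mut handles = Vec::new();
    for _ in 0..cpu_count {
        let worker = Worker::new(rx.clone());
        handles.push(tokio::spawn(async move { worker.run().await }));
    }

    // Send the same 48 items ("a" through "f", eight times).
    for s in ["a", "b", "c", "d", "e", "f"].iter().cycle().take(48) {
        tx.send(Task::new(Arc::new(s.to_string()))).await?;
    }

    // Closing the channel lets each worker's loop end once the buffer is drained.
    drop(tx);

    // Wait for all workers to finish before returning from main.
    for handle in handles {
        handle.await?;
    }
    Ok(())
}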

Execution result

a


b

c


d


e


f


a


b


c


d


e


f


a


b


c


d


e


f


a


b


c


d


e


f


a


b


c


d


e


f


a ← the program stops here

Source code


6.2 Implementing a Semaphore in golang

Solving the problem on this page.

golang has synchronization based on channels.

Use this to implement a Semaphore.

Implement it yourself, without using a library.

The code I wrote

  • Semaphore implementation
    • The semaphore is implemented as a channel with a fixed buffer size (a rough Rust sketch of the same idea appears at the end of this section).
    • The Acquire method obtains a permit by sending a value into the channel.
    • The Release method releases a permit by receiving a value from the channel.
  • Using the semaphore
    • The semaphore can be used to limit how many goroutines access a resource at the same time.
    • In this code, the heavy, resource-consuming work is simulated with time.Sleep.

package main

import (
    "fmt"
    "math/rand"
    "sync"
    "time"
)

// Semaphore is a type representing a semaphore.
type Semaphore chan struct{}

// NewSemaphore creates a new semaphore with the specified number of permits.
func NewSemaphore(n int) Semaphore {
    return make(Semaphore, n)
}

// Acquire obtains one permit from the semaphore.
func (s Semaphore) Acquire() {
    s <- struct{}{} // Send to the channel to acquire a permit. This will block if the channel is full.
}

// Release releases one permit back to the semaphore.
func (s Semaphore) Release() {
    <-s // Receive from the channel to release a permit.
}

func heavyProcess(id int, sema Semaphore, wg *sync.WaitGroup) {
    defer wg.Done() // Decrement the counter of the WaitGroup as this goroutine completes.

    sema.Acquire()       // Acquire a permit from the semaphore.
    defer sema.Release() // Release the permit when this function exits.

    // Simulate some heavy work.
    fmt.Printf("Goroutine %d is starting heavy processing\n", id)
    processTime := time.Duration(rand.Intn(3)+1) * time.Second
    time.Sleep(processTime) // Representing heavy work just by sleeping here.
    fmt.Printf("Goroutine %d has completed heavy processing in %v\n", id, processTime)
}

func main() {
    rand.Seed(time.Now().UnixNano()) // Initialize the random seed.
    sema := NewSemaphore(3)          // Only 3 goroutines are allowed to run concurrently.
    var wg sync.WaitGroup            // WaitGroup for waiting for all goroutines to complete.

    for i := 0; i < 10; i++ {
        wg.Add(1) // Increment the WaitGroup counter for each goroutine.
        go heavyProcess(i, sema, &wg)
    }

    wg.Wait() // Wait for all goroutines to complete.
    fmt.Println("All goroutines have completed processing")
}

Execution result

Goroutine 3 is starting heavy processing
Goroutine 5 is starting heavy processing
Goroutine 9 is starting heavy processing
Goroutine 3 has completed heavy processing in 1s
Goroutine 6 is starting heavy processing
Goroutine 5 has completed heavy processing in 2s
Goroutine 7 is starting heavy processing
Goroutine 7 has completed heavy processing in 1s
Goroutine 8 is starting heavy processing
Goroutine 9 has completed heavy processing in 3s
Goroutine 2 is starting heavy processing
Goroutine 6 has completed heavy processing in 2s
Goroutine 0 is starting heavy processing
Goroutine 8 has completed heavy processing in 1s
Goroutine 4 is starting heavy processing
Goroutine 0 has completed heavy processing in 1s
Goroutine 1 is starting heavy processing
Goroutine 2 has completed heavy processing in 2s
Goroutine 1 has completed heavy processing in 1s
Goroutine 4 has completed heavy processing in 1s
All goroutines have completed processing
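
As a side note, only loosely related to the assignment (which asks for Go): the same trick of building a semaphore out of a bounded channel can be sketched in Rust, the language used in 6.1. This is my own illustrative sketch with std::sync::mpsc::sync_channel, not part of the assignment or its solution; sending into the bounded channel acquires a permit and receiving from it releases one.

use std::sync::mpsc::{sync_channel, Receiver, SyncSender};
use std::sync::{Arc, Mutex};
use std::thread;
use std::time::Duration;

// Semaphore built from a bounded channel: a send takes a permit (blocking while
// all `permits` buffer slots are in use) and a receive gives one back.
struct ChannelSemaphore {
    tx: SyncSender<()>,
    rx: Mutex<Receiver<()>>, // Receiver is not Sync, so guard it with a Mutex.
}

impl ChannelSemaphore {
    fn new(permits: usize) -> Self {
        let (tx, rx) = sync_channel(permits);
        ChannelSemaphore { tx, rx: Mutex::new(rx) }
    }

    fn acquire(&self) {
        self.tx.send(()).unwrap(); // Blocks while all permits are taken.
    }

    fn release(&self) {
        self.rx.lock().unwrap().recv().unwrap(); // Frees one buffer slot.
    }
}

fn main() {
    let sema = Arc::new(ChannelSemaphore::new(3)); // At most 3 threads at once.
    let handles: Vec<_> = (0..10)
        .map(|id| {
            let sema = Arc::clone(&sema);
            thread::spawn(move || {
                sema.acquire();
                println!("thread {} is starting heavy processing", id);
                thread::sleep(Duration::from_secs(1)); // Stand-in for heavy work.
                println!("thread {} has completed heavy processing", id);
                sema.release();
            })
        })
        .collect();
    for handle in handles {
        handle.join().unwrap();
    }
}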

6.3 Dead Lock

Solving the problem on this page.

Brutally hard.