Navin Kumar
2 min readJul 9, 2022

--

The Below Code is in Java, and I have implemented the Thread Safe Blocking queue using LinkedList.
also created Consumer and Producer Worker whose responsibility is to create work and consume work using BoundedBlockingQueue.
package com.navin.learn;

import java.util.LinkedList;
import java.util.List;
import java.util.concurrent.locks.ReentrantReadWriteLock;

class Consumer implements Runnable{

BoundedBlockingQueue boundedBlockingQueue;
private final int stopper;
Consumer(BoundedBlockingQueue boundedBlockingQueue, final int stopper){
this.boundedBlockingQueue = boundedBlockingQueue;
this.stopper = stopper;
}


@Override
public void run() {
try{
while(true){
int x = boundedBlockingQueue.dequeue();
if(x == stopper){
return;
}
System.out.println(Thread.currentThread().getId() + " Thread message consumed : " + x);
}
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
}
}
}

class Producer implements Runnable{

BoundedBlockingQueue boundedBlockingQueue;
private final int stopper;
private final int numberOfStoppers;
Producer(BoundedBlockingQueue boundedBlockingQueue, final int stopper, final int numberOfStoppers){
this.boundedBlockingQueue = boundedBlockingQueue;
this.numberOfStoppers = numberOfStoppers;
this.stopper = stopper;
}


@Override
public void run() {
try{
for (int i = 0; i < 100; i++) {
boundedBlockingQueue.enqueue(i);
}
for (int i = 0; i < numberOfStoppers; i++) {
boundedBlockingQueue.enqueue(stopper);
}
}catch (InterruptedException e){
Thread.currentThread().interrupt();
}
}

}
public class BoundedBlockingQueue {
private static volatile BoundedBlockingQueue boundedBlockingQueue;
private final int capacity;
private List<Integer> quueue;
ReentrantReadWriteLock reentrantReadWriteLock;

private BoundedBlockingQueue(final int capacity){
this.capacity = capacity;
this.quueue = new LinkedList<>();
reentrantReadWriteLock = new ReentrantReadWriteLock();
}

public static BoundedBlockingQueue getInstance(int capacity){
if(boundedBlockingQueue == null){
synchronized (BoundedBlockingQueue.class){
if(boundedBlockingQueue == null){
boundedBlockingQueue = new BoundedBlockingQueue(capacity);
}
}
}
return boundedBlockingQueue;
}

public synchronized void enqueue(int ele) throws InterruptedException {
if(this.quueue.size() == this.capacity){
System.out.println("waiting to message consumed");
wait();
}
reentrantReadWriteLock.writeLock().lock();
this.quueue.add(ele);
reentrantReadWriteLock.writeLock().unlock();
notifyAll();
}

public synchronized int dequeue() throws InterruptedException {
if(this.quueue.isEmpty()){
System.out.println("Message are not present");
wait();
}
reentrantReadWriteLock.writeLock().lock();
int ele = this.quueue.remove(0);
reentrantReadWriteLock.writeLock().unlock();
System.out.println("element { "+ ele +"} is removed...");
notifyAll();
return ele;
}

public synchronized int getSize(){
reentrantReadWriteLock.readLock().lock();
int size = this.quueue.size();
reentrantReadWriteLock.readLock().unlock();
return size;
}

public static void main(String[] args) throws InterruptedException {
int boundLimit = 10;
int numberOfProducers = 4;
int numberOfConsumers = Runtime.getRuntime().availableProcessors();
int stopper = Integer.MAX_VALUE;
int numberOfStoppers = numberOfConsumers / numberOfProducers;
int mod = numberOfConsumers % numberOfProducers;

BoundedBlockingQueue boundedBlockingQueue = BoundedBlockingQueue.getInstance(boundLimit);

for (int i = 1; i < numberOfProducers; i++) {
new Thread(new Producer(boundedBlockingQueue, stopper, numberOfStoppers)).start();
}

for (int j = 0; j < numberOfConsumers; j++) {
new Thread(new Consumer(boundedBlockingQueue, stopper)).start();
}

new Thread(new Producer(boundedBlockingQueue, stopper, numberOfStoppers + mod)).start();
}
}

--

--