This is an assignment written for Noroff University that simulates different CPU scheduling algorithms.

FIFO

from openpyxl import load_workbook  
# Open the workbook that holds the process table; its active sheet is the
# data source read by create_process().
workbook = load_workbook(filename='cpu-scheduling.xlsx')
sheet = workbook.active

# Process table filled by create_process(): PID -> list of values for that process.
all_data = {}

#This program was initially written across several files. It used a "CoreConfig.py" file to hold the class Info, a "CoreFunctions.py", a "get_data.py",
#the four scheduling algorithms and finally a "main.py" file. CoreConfig held the class Info, which used a Borg Design Pattern class
#to hold all the variables the program needed, while letting any file access and update the variables.
#CoreFunctions held all the functions needed to simulate the core  
#The main file was written to run the algorithm the user would input.  
#This allowed the program to be very modular, and changing an algorithm required very little editing.  
#This also allows the program to be robust, as it can handle a change in the data source without breaking.

#Due to the way the assessment is delivered, I could not keep the program in its original form, which is why it looks the way it does.
#I have simply copied and pasted "CoreConfig", "CoreFunctions" and "get_data" into each algorithm. This lets each one run as it should on its own.
#This is written simply to explain why the code might look messy.

def create_process():
    """Load every process row from the worksheet into ``all_data``.

    Rows are read from the second worksheet row onwards. Each entry is keyed
    by the first column (the PID) and stored as
    ``[row[1], row[2], row[3], 0]``. The rest of this file reads index 0 as
    the arrival time, index 1 as the instruction load and index 2 as the
    priority.
    """
    for row in sheet.iter_rows(min_row=2, values_only=True):
        processID = row[0]
        # Empty cells are returned as None. A row without a PID would put a
        # None key (and None load) into the table and break the numeric
        # comparisons made by the scheduler loop, so such rows are skipped.
        if processID is None:
            continue
        all_data[processID] = [row[1], row[2], row[3], 0]

  

class Info():
    """Shared simulation state, reachable from every function in this file.

    ``__new__`` always returns the same object, so this is a Singleton:
    every ``Info()`` call yields the one shared instance. The mutable
    containers are class attributes and are therefore shared as well.
    """
    instance = None

    def __new__(cls):
        if cls.instance is None:
            cls.instance = super(Info, cls).__new__(cls)
        return cls.instance

    PID = 0                    # PID currently selected by the scheduler
    referenceDict = all_data   # every known process, keyed by PID
    timeUnit = 0               # simulation clock
    queueDict = {}             # processes waiting for the core, keyed by PID
    activeCore = {}            # process currently in the core, keyed by PID
    queuePrint = ''            # queue status text appended to the status line
    waitPrint = ''
    instructionLoad = 0        # instructions left for the process in the core
    processInfo = []
    completed = {}             # finished processes, keyed by PID
    q = 4                      # time-slice length compared against timeInCore
    timeInCore = 0             # time units the current process has spent in the core

    # These two were plain class attributes computed once with len() while
    # both dicts were still empty, so they always read 0. As properties they
    # report the live sizes whenever they are read from the instance.
    @property
    def queueSize(self):
        """Number of processes currently waiting in ``queueDict``."""
        return len(self.queueDict)

    @property
    def coreStatus(self):
        """Number of processes currently in ``activeCore`` (0 means free)."""
        return len(self.activeCore)

  

def processCore():
    """Run the process in the core for one time unit.

    Prints a status line, then counts the remaining instruction load down by
    one and the time spent in the core up by one. When the load reaches zero
    the process is removed from the core and recorded in ``completed``.

    Raises:
        StopIteration: if the core is empty when this is called.
    """
    inf = Info()
    PID = next(iter(inf.activeCore))
    startingInst = inf.referenceDict[PID][1]
    # NOTE(review): the message text was truncated in this copy of the file.
    # Only the 'Time Unit ' prefix and the argument list survive, so the
    # wording between the placeholders is a reconstruction - confirm it.
    print('Time Unit {}: PID {} executing. Instructions at start: {}, '
          'remaining: {}. {}'.format(
              inf.timeUnit, PID, startingInst, inf.instructionLoad,
              inf.queuePrint))
    inf.instructionLoad -= 1
    inf.timeInCore += 1
    # A finished process leaves the core and is kept in `completed` for reference.
    if inf.instructionLoad == 0:
        del inf.activeCore[PID]
        inf.completed[PID] = inf.referenceDict[PID]

  

def enterCore():
    """Move the process selected in ``Info.PID`` into the core.

    With an empty queue the process is taken straight from ``referenceDict``;
    otherwise it is moved out of the queue if it is waiting there. The time
    spent in the core is reset, and the live instruction load is set from the
    process entry once it is in the core.
    """
    inf = Info()
    PID = inf.PID
    inf.timeInCore = 0
    if len(inf.queueDict) == 0:
        # Nothing is waiting: enter directly from the reference table.
        inf.activeCore[PID] = inf.referenceDict[PID]
    elif PID in inf.queueDict:
        inf.activeCore[PID] = inf.queueDict.pop(PID)
    # Test for this PID rather than for "core not empty": indexing the core
    # with a PID it does not hold raised KeyError.
    if PID in inf.activeCore:
        inf.instructionLoad = inf.activeCore[PID][1]

  

def processWait():
    """Admit processes arriving in this time unit and refresh the queue text.

    Every process whose arrival time (index 0) equals the current time unit is
    placed in the queue, unless it is the one already in the core.
    ``queuePrint`` is then set according to whether the queue is empty.
    """
    inf = Info()
    for check, details in inf.referenceDict.items():
        if details[0] != inf.timeUnit:
            continue
        # "Already in the core" is decided by PID. Comparing the stored lists
        # by value also matched a different process with identical figures,
        # and indexing the core with Info.PID raised KeyError when the core
        # held another PID.
        if check in inf.activeCore:
            inf.queueDict.pop(check, None)
        else:
            inf.queueDict[check] = details
    if len(inf.queueDict) == 0:
        inf.queuePrint = 'No queue.'
    else:
        # NOTE(review): this message was truncated to an opening quote in this
        # copy of the file; the wording below is a reconstruction - confirm it.
        inf.queuePrint = 'In queue: {}.'.format(
            ', '.join(str(pid) for pid in inf.queueDict))

  

def system_cycle():
    """Advance the simulation by one time unit.

    Increments the clock, updates the queue for the new time unit and then
    runs the process in the core for that unit.
    """
    inf = Info()
    inf.timeUnit += 1
    processWait()
    processCore()

  

def check_Shortest():
    """Point ``Info.PID`` at the queued process with the smallest instruction load.

    Ties go to the process that was queued first. ``Info.PID`` is left
    unchanged when the queue is empty.
    """
    inf = Info()
    if not inf.queueDict:
        return  # min() raises ValueError on an empty queue
    inf.PID = min(inf.queueDict, key=lambda pid: inf.queueDict[pid][1])

  

def check_Priority():
    """Point ``Info.PID`` at the queued process with the lowest priority value.

    Ties go to the process that was queued first. ``Info.PID`` is left
    unchanged when the queue is empty.
    """
    inf = Info()
    if not inf.queueDict:
        return  # min() raises ValueError on an empty queue
    inf.PID = min(inf.queueDict, key=lambda pid: inf.queueDict[pid][2])

  

def time_slice_check():
    """Preempt the running process once it has used its whole time slice.

    When ``timeInCore`` equals ``q`` and the selected PID is in the core, the
    process goes back to the end of the queue with its remaining instruction
    load, one time unit is spent on the context switch, the queue is updated
    and the live instruction load is zeroed.
    """
    inf = Info()
    PID = inf.PID
    if inf.timeInCore != inf.q or PID not in inf.activeCore:
        return
    # Requeue a copy carrying the remaining load. The core entry is the same
    # list object as the referenceDict entry, so writing the remaining load
    # into it also overwrote the figure processCore reports as the starting
    # instruction count.
    requeued = list(inf.activeCore.pop(PID))
    requeued[1] = inf.instructionLoad
    inf.queueDict[PID] = requeued
    # One time unit passes for the context switch.
    inf.timeUnit += 1
    processWait()
    # Zero the live load so a "while instructionLoad > 0" driver loop stops.
    inf.instructionLoad = 0

  

def check_q():
    """Select the first queued process that fits inside one time slice.

    Sets ``Info.PID`` to the first PID in ``queueDict`` whose instruction load
    (index 1) is at most ``Info.q``. ``Info.PID`` is left unchanged when no
    queued process qualifies.
    """
    inf = Info()
    # The previous np.where(lambda ...) call never evaluated the lambda (the
    # function object itself was taken as the condition), and numpy is not
    # imported in this part of the file. A plain scan does what the
    # description asks for.
    for pid, details in inf.queueDict.items():
        if details[1] <= inf.q:
            inf.PID = pid
            break

def run_fifo():
    """Run the simulation, serving processes in ``referenceDict`` order.

    Repeats passes over the process table until every process is recorded
    in ``completed``.
    """
    inf = Info()
    completed = inf.completed
    referenceDict = inf.referenceDict

    print('system running')
    while len(completed) != len(referenceDict):
        for process in referenceDict:
            inf.PID = process
            # Look at the core dict itself so the test reflects its live state.
            if not inf.activeCore:
                enterCore()
            while inf.instructionLoad > 0:
                system_cycle()
            # NOTE(review): restarts the pass when the selected PID is smaller
            # than the number of completed processes; assumes numeric PIDs.
            # Intent is unclear - kept as written.
            if inf.PID < len(completed):
                break

    print('system finished')

# Script entry point: load the process table, then run the FIFO simulation.
create_process()
run_fifo()

SJF

from openpyxl import load_workbook  
# Open the workbook that holds the process table; its active sheet is the
# data source read by create_process().
workbook = load_workbook(filename='cpu-scheduling.xlsx')
sheet = workbook.active

# Process table filled by create_process(): PID -> list of values for that process.
all_data = {}

def create_process():
    """Load every process row from the worksheet into ``all_data``.

    Rows are read from the second worksheet row onwards. Each entry is keyed
    by the first column (the PID) and stored as
    ``[row[1], row[2], row[3], 0]``. The rest of this file reads index 0 as
    the arrival time, index 1 as the instruction load and index 2 as the
    priority.
    """
    for row in sheet.iter_rows(min_row=2, values_only=True):
        processID = row[0]
        # Empty cells are returned as None. A row without a PID would put a
        # None key (and None load) into the table and break the numeric
        # comparisons made by the scheduler loop, so such rows are skipped.
        if processID is None:
            continue
        all_data[processID] = [row[1], row[2], row[3], 0]

  

class Info():
    """Shared simulation state, reachable from every function in this file.

    ``__new__`` always returns the same object, so this is a Singleton:
    every ``Info()`` call yields the one shared instance. The mutable
    containers are class attributes and are therefore shared as well.
    """
    instance = None

    def __new__(cls):
        if cls.instance is None:
            cls.instance = super(Info, cls).__new__(cls)
        return cls.instance

    PID = 0                    # PID currently selected by the scheduler
    referenceDict = all_data   # every known process, keyed by PID
    timeUnit = 0               # simulation clock
    queueDict = {}             # processes waiting for the core, keyed by PID
    activeCore = {}            # process currently in the core, keyed by PID
    queuePrint = ''            # queue status text appended to the status line
    waitPrint = ''
    instructionLoad = 0        # instructions left for the process in the core
    processInfo = []
    completed = {}             # finished processes, keyed by PID
    q = 4                      # time-slice length compared against timeInCore
    timeInCore = 0             # time units the current process has spent in the core

    # These two were plain class attributes computed once with len() while
    # both dicts were still empty, so they always read 0. As properties they
    # report the live sizes whenever they are read from the instance.
    @property
    def queueSize(self):
        """Number of processes currently waiting in ``queueDict``."""
        return len(self.queueDict)

    @property
    def coreStatus(self):
        """Number of processes currently in ``activeCore`` (0 means free)."""
        return len(self.activeCore)

def processCore():
    """Run the process in the core for one time unit.

    Prints a status line, then counts the remaining instruction load down by
    one and the time spent in the core up by one. When the load reaches zero
    the process is removed from the core and recorded in ``completed``.

    Raises:
        StopIteration: if the core is empty when this is called.
    """
    inf = Info()
    PID = next(iter(inf.activeCore))
    startingInst = inf.referenceDict[PID][1]
    # NOTE(review): this print call was cut off right after 'Time Unit ' in
    # this copy of the file. The argument order follows the FIFO copy of this
    # function, where the argument list survived; the wording between the
    # placeholders is a reconstruction - confirm it.
    print('Time Unit {}: PID {} executing. Instructions at start: {}, '
          'remaining: {}. {}'.format(
              inf.timeUnit, PID, startingInst, inf.instructionLoad,
              inf.queuePrint))
    inf.instructionLoad -= 1
    inf.timeInCore += 1
    # A finished process leaves the core and is kept in `completed` for reference.
    if inf.instructionLoad == 0:
        del inf.activeCore[PID]
        inf.completed[PID] = inf.referenceDict[PID]

def enterCore():
    """Move the process selected in ``Info.PID`` into the core.

    With an empty queue the process is taken straight from ``referenceDict``;
    otherwise it is moved out of the queue if it is waiting there. The time
    spent in the core is reset, and the live instruction load is set from the
    process entry once it is in the core.
    """
    inf = Info()
    PID = inf.PID
    inf.timeInCore = 0
    if len(inf.queueDict) == 0:
        # Nothing is waiting: enter directly from the reference table.
        inf.activeCore[PID] = inf.referenceDict[PID]
    elif PID in inf.queueDict:
        inf.activeCore[PID] = inf.queueDict.pop(PID)
    # Test for this PID rather than for "core not empty": indexing the core
    # with a PID it does not hold raised KeyError.
    if PID in inf.activeCore:
        inf.instructionLoad = inf.activeCore[PID][1]

def processWait():
    """Admit processes arriving in this time unit and refresh the queue text.

    Every process whose arrival time (index 0) equals the current time unit is
    placed in the queue, unless it is the one already in the core.
    ``queuePrint`` is then set according to whether the queue is empty.
    """
    inf = Info()
    for check, details in inf.referenceDict.items():
        if details[0] != inf.timeUnit:
            continue
        # "Already in the core" is decided by PID. Comparing the stored lists
        # by value also matched a different process with identical figures,
        # and indexing the core with Info.PID raised KeyError when the core
        # held another PID.
        if check in inf.activeCore:
            inf.queueDict.pop(check, None)
        else:
            inf.queueDict[check] = details
    if len(inf.queueDict) == 0:
        inf.queuePrint = 'No queue.'
    else:
        # NOTE(review): this message was truncated to an opening quote in this
        # copy of the file; the wording below is a reconstruction - confirm it.
        inf.queuePrint = 'In queue: {}.'.format(
            ', '.join(str(pid) for pid in inf.queueDict))

  
  

def system_cycle():
    """Advance the simulation by one time unit.

    Increments the clock, updates the queue for the new time unit and then
    runs the process in the core for that unit.
    """
    inf = Info()
    inf.timeUnit += 1
    processWait()
    processCore()

def check_Shortest():
    """Point ``Info.PID`` at the queued process with the smallest instruction load.

    Ties go to the process that was queued first. ``Info.PID`` is left
    unchanged when the queue is empty.
    """
    inf = Info()
    if not inf.queueDict:
        return  # min() raises ValueError on an empty queue
    inf.PID = min(inf.queueDict, key=lambda pid: inf.queueDict[pid][1])

def check_Priority():
    """Point ``Info.PID`` at the queued process with the lowest priority value.

    Ties go to the process that was queued first. ``Info.PID`` is left
    unchanged when the queue is empty.
    """
    inf = Info()
    if not inf.queueDict:
        return  # min() raises ValueError on an empty queue
    inf.PID = min(inf.queueDict, key=lambda pid: inf.queueDict[pid][2])

def time_slice_check():
    """Preempt the running process once it has used its whole time slice.

    When ``timeInCore`` equals ``q`` and the selected PID is in the core, the
    process goes back to the end of the queue with its remaining instruction
    load, one time unit is spent on the context switch, the queue is updated
    and the live instruction load is zeroed.
    """
    inf = Info()
    PID = inf.PID
    if inf.timeInCore != inf.q or PID not in inf.activeCore:
        return
    # Requeue a copy carrying the remaining load. The core entry is the same
    # list object as the referenceDict entry, so writing the remaining load
    # into it also overwrote the figure processCore reports as the starting
    # instruction count.
    requeued = list(inf.activeCore.pop(PID))
    requeued[1] = inf.instructionLoad
    inf.queueDict[PID] = requeued
    # One time unit passes for the context switch.
    inf.timeUnit += 1
    processWait()
    # Zero the live load so a "while instructionLoad > 0" driver loop stops.
    inf.instructionLoad = 0

def check_q():
    """Select the first queued process that fits inside one time slice.

    Sets ``Info.PID`` to the first PID in ``queueDict`` whose instruction load
    (index 1) is at most ``Info.q``. ``Info.PID`` is left unchanged when no
    queued process qualifies.
    """
    inf = Info()
    # The previous np.where(lambda ...) call never evaluated the lambda (the
    # function object itself was taken as the condition), and numpy is not
    # imported in this part of the file. A plain scan does what the
    # description asks for.
    for pid, details in inf.queueDict.items():
        if details[1] <= inf.q:
            inf.PID = pid
            break

  
  

def run_SJF():
    """Run the simulation, preferring the shortest queued job.

    Each time the core is free, the queued process with the smallest
    instruction load is selected (when anything is queued) and entered into
    the core, then run until its load reaches zero. Passes over the process
    table repeat until every process is recorded in ``completed``.
    """
    inf = Info()
    completed = inf.completed
    referenceDict = inf.referenceDict

    print('system running')
    while len(completed) != len(referenceDict):
        for process in referenceDict:
            inf.PID = process
            # Look at the core dict itself so the test reflects its live state.
            if not inf.activeCore:
                if inf.queueDict:
                    check_Shortest()
                enterCore()
            while inf.instructionLoad > 0:
                system_cycle()
            # NOTE(review): restarts the pass when the selected PID is smaller
            # than the number of completed processes; assumes numeric PIDs.
            # Intent is unclear - kept as written.
            if inf.PID < len(completed):
                break

    print('system finished')

# Script entry point: load the process table, then run the SJF simulation.
create_process()
run_SJF()

Priority

from openpyxl import load_workbook  
# Open the workbook that holds the process table; its active sheet is the
# data source read by create_process().
workbook = load_workbook(filename='cpu-scheduling.xlsx')
sheet = workbook.active

# Process table filled by create_process(): PID -> list of values for that process.
all_data = {}

def create_process():
    """Load every process row from the worksheet into ``all_data``.

    Rows are read from the second worksheet row onwards. Each entry is keyed
    by the first column (the PID) and stored as
    ``[row[1], row[2], row[3], 0]``. The rest of this file reads index 0 as
    the arrival time, index 1 as the instruction load and index 2 as the
    priority.
    """
    for row in sheet.iter_rows(min_row=2, values_only=True):
        processID = row[0]
        # Empty cells are returned as None. A row without a PID would put a
        # None key (and None load) into the table and break the numeric
        # comparisons made by the scheduler loop, so such rows are skipped.
        if processID is None:
            continue
        all_data[processID] = [row[1], row[2], row[3], 0]

  

class Info():
    """Shared simulation state, reachable from every function in this file.

    ``__new__`` always returns the same object, so this is a Singleton:
    every ``Info()`` call yields the one shared instance. The mutable
    containers are class attributes and are therefore shared as well.
    """
    instance = None

    def __new__(cls):
        if cls.instance is None:
            cls.instance = super(Info, cls).__new__(cls)
        return cls.instance

    PID = 0                    # PID currently selected by the scheduler
    referenceDict = all_data   # every known process, keyed by PID
    timeUnit = 0               # simulation clock
    queueDict = {}             # processes waiting for the core, keyed by PID
    activeCore = {}            # process currently in the core, keyed by PID
    queuePrint = ''            # queue status text appended to the status line
    waitPrint = ''
    instructionLoad = 0        # instructions left for the process in the core
    processInfo = []
    completed = {}             # finished processes, keyed by PID
    q = 4                      # time-slice length compared against timeInCore
    timeInCore = 0             # time units the current process has spent in the core

    # These two were plain class attributes computed once with len() while
    # both dicts were still empty, so they always read 0. As properties they
    # report the live sizes whenever they are read from the instance.
    @property
    def queueSize(self):
        """Number of processes currently waiting in ``queueDict``."""
        return len(self.queueDict)

    @property
    def coreStatus(self):
        """Number of processes currently in ``activeCore`` (0 means free)."""
        return len(self.activeCore)

def processCore():
    """Run the process in the core for one time unit.

    Prints a status line, then counts the remaining instruction load down by
    one and the time spent in the core up by one. When the load reaches zero
    the process is removed from the core and recorded in ``completed``.

    Raises:
        StopIteration: if the core is empty when this is called.
    """
    inf = Info()
    PID = next(iter(inf.activeCore))
    startingInst = inf.referenceDict[PID][1]
    # NOTE(review): this print call was cut off right after 'Time Unit ' in
    # this copy of the file. The argument order follows the FIFO copy of this
    # function, where the argument list survived; the wording between the
    # placeholders is a reconstruction - confirm it.
    print('Time Unit {}: PID {} executing. Instructions at start: {}, '
          'remaining: {}. {}'.format(
              inf.timeUnit, PID, startingInst, inf.instructionLoad,
              inf.queuePrint))
    inf.instructionLoad -= 1
    inf.timeInCore += 1
    # A finished process leaves the core and is kept in `completed` for reference.
    if inf.instructionLoad == 0:
        del inf.activeCore[PID]
        inf.completed[PID] = inf.referenceDict[PID]

def enterCore():
    """Move the process selected in ``Info.PID`` into the core.

    With an empty queue the process is taken straight from ``referenceDict``;
    otherwise it is moved out of the queue if it is waiting there. The time
    spent in the core is reset, and the live instruction load is set from the
    process entry once it is in the core.
    """
    inf = Info()
    PID = inf.PID
    inf.timeInCore = 0
    if len(inf.queueDict) == 0:
        # Nothing is waiting: enter directly from the reference table.
        inf.activeCore[PID] = inf.referenceDict[PID]
    elif PID in inf.queueDict:
        inf.activeCore[PID] = inf.queueDict.pop(PID)
    # Test for this PID rather than for "core not empty": indexing the core
    # with a PID it does not hold raised KeyError.
    if PID in inf.activeCore:
        inf.instructionLoad = inf.activeCore[PID][1]

def processWait():
    """Admit processes arriving in this time unit and refresh the queue text.

    Every process whose arrival time (index 0) equals the current time unit is
    placed in the queue, unless it is the one already in the core.
    ``queuePrint`` is then set according to whether the queue is empty.
    """
    inf = Info()
    for check, details in inf.referenceDict.items():
        if details[0] != inf.timeUnit:
            continue
        # "Already in the core" is decided by PID. Comparing the stored lists
        # by value also matched a different process with identical figures,
        # and indexing the core with Info.PID raised KeyError when the core
        # held another PID.
        if check in inf.activeCore:
            inf.queueDict.pop(check, None)
        else:
            inf.queueDict[check] = details
    if len(inf.queueDict) == 0:
        inf.queuePrint = 'No queue.'
    else:
        # NOTE(review): this message was truncated to an opening quote in this
        # copy of the file; the wording below is a reconstruction - confirm it.
        inf.queuePrint = 'In queue: {}.'.format(
            ', '.join(str(pid) for pid in inf.queueDict))

  
  

def system_cycle():
    """Advance the simulation by one time unit.

    Increments the clock, updates the queue for the new time unit and then
    runs the process in the core for that unit.
    """
    inf = Info()
    inf.timeUnit += 1
    processWait()
    processCore()

def check_Shortest():
    """Point ``Info.PID`` at the queued process with the smallest instruction load.

    Ties go to the process that was queued first. ``Info.PID`` is left
    unchanged when the queue is empty.
    """
    inf = Info()
    if not inf.queueDict:
        return  # min() raises ValueError on an empty queue
    inf.PID = min(inf.queueDict, key=lambda pid: inf.queueDict[pid][1])

def check_Priority():
    """Point ``Info.PID`` at the queued process with the lowest priority value.

    Ties go to the process that was queued first. ``Info.PID`` is left
    unchanged when the queue is empty.
    """
    inf = Info()
    if not inf.queueDict:
        return  # min() raises ValueError on an empty queue
    inf.PID = min(inf.queueDict, key=lambda pid: inf.queueDict[pid][2])

def time_slice_check():
    """Preempt the running process once it has used its whole time slice.

    When ``timeInCore`` equals ``q`` and the selected PID is in the core, the
    process goes back to the end of the queue with its remaining instruction
    load, one time unit is spent on the context switch, the queue is updated
    and the live instruction load is zeroed.
    """
    inf = Info()
    PID = inf.PID
    if inf.timeInCore != inf.q or PID not in inf.activeCore:
        return
    # Requeue a copy carrying the remaining load. The core entry is the same
    # list object as the referenceDict entry, so writing the remaining load
    # into it also overwrote the figure processCore reports as the starting
    # instruction count.
    requeued = list(inf.activeCore.pop(PID))
    requeued[1] = inf.instructionLoad
    inf.queueDict[PID] = requeued
    # One time unit passes for the context switch.
    inf.timeUnit += 1
    processWait()
    # Zero the live load so a "while instructionLoad > 0" driver loop stops.
    inf.instructionLoad = 0

def check_q():
    """Select the first queued process that fits inside one time slice.

    Sets ``Info.PID`` to the first PID in ``queueDict`` whose instruction load
    (index 1) is at most ``Info.q``. ``Info.PID`` is left unchanged when no
    queued process qualifies.
    """
    inf = Info()
    # The previous np.where(lambda ...) call never evaluated the lambda (the
    # function object itself was taken as the condition), and numpy is not
    # imported in this part of the file. A plain scan does what the
    # description asks for.
    for pid, details in inf.queueDict.items():
        if details[1] <= inf.q:
            inf.PID = pid
            break

  

def run_Priority():
    """Run the simulation, preferring the lowest priority value in the queue.

    Each time the core is free, the queued process with the lowest priority
    value is selected (when anything is queued) and entered into the core,
    then run until its load reaches zero. Passes over the process table
    repeat until every process is recorded in ``completed``.
    """
    inf = Info()
    completed = inf.completed
    referenceDict = inf.referenceDict

    print('system running')
    while len(completed) != len(referenceDict):
        for process in referenceDict:
            inf.PID = process
            # Look at the core dict itself so the test reflects its live state.
            if not inf.activeCore:
                if inf.queueDict:
                    check_Priority()
                enterCore()
            while inf.instructionLoad > 0:
                system_cycle()
            # NOTE(review): restarts the pass when the selected PID is smaller
            # than the number of completed processes; assumes numeric PIDs.
            # Intent is unclear - kept as written.
            if inf.PID < len(completed):
                break

    print('system finished')

# Script entry point: load the process table, then run the priority simulation.
create_process()
run_Priority()

Round Robin

from openpyxl import load_workbook
import numpy as np
# Open the workbook that holds the process table; its active sheet is the
# data source read by create_process().
workbook = load_workbook(filename='cpu-scheduling.xlsx')
sheet = workbook.active

# Process table filled by create_process(): PID -> list of values for that process.
all_data = {}

def create_process():
    """Load every process row from the worksheet into ``all_data``.

    Rows are read from the second worksheet row onwards. Each entry is keyed
    by the first column (the PID) and stored as
    ``[row[1], row[2], row[3], 0]``. The rest of this file reads index 0 as
    the arrival time, index 1 as the instruction load and index 2 as the
    priority.
    """
    for row in sheet.iter_rows(min_row=2, values_only=True):
        processID = row[0]
        # Empty cells are returned as None. A row without a PID would put a
        # None key (and None load) into the table and break the numeric
        # comparisons made by the scheduler loop, so such rows are skipped.
        if processID is None:
            continue
        all_data[processID] = [row[1], row[2], row[3], 0]


class Info():
    """Shared simulation state, reachable from every function in this file.

    ``__new__`` always returns the same object, so this is a Singleton:
    every ``Info()`` call yields the one shared instance. The mutable
    containers are class attributes and are therefore shared as well.
    """
    instance = None

    def __new__(cls):
        if cls.instance is None:
            cls.instance = super(Info, cls).__new__(cls)
        return cls.instance

    PID = 0                    # PID currently selected by the scheduler
    referenceDict = all_data   # every known process, keyed by PID
    timeUnit = 0               # simulation clock
    queueDict = {}             # processes waiting for the core, keyed by PID
    activeCore = {}            # process currently in the core, keyed by PID
    queuePrint = ''            # queue status text appended to the status line
    waitPrint = ''
    instructionLoad = 0        # instructions left for the process in the core
    processInfo = []
    completed = {}             # finished processes, keyed by PID
    q = 4                      # time-slice length compared against timeInCore
    timeInCore = 0             # time units the current process has spent in the core

    # These two were plain class attributes computed once with len() while
    # both dicts were still empty, so they always read 0. As properties they
    # report the live sizes whenever they are read from the instance.
    @property
    def queueSize(self):
        """Number of processes currently waiting in ``queueDict``."""
        return len(self.queueDict)

    @property
    def coreStatus(self):
        """Number of processes currently in ``activeCore`` (0 means free)."""
        return len(self.activeCore)

def processCore():
    """Run the process in the core for one time unit.

    Prints a status line, then counts the remaining instruction load down by
    one and the time spent in the core up by one. When the load reaches zero
    the process is removed from the core and recorded in ``completed``.

    Raises:
        StopIteration: if the core is empty when this is called.
    """
    inf = Info()
    PID = next(iter(inf.activeCore))
    startingInst = inf.referenceDict[PID][1]
    # NOTE(review): this print call was cut off right after 'Time Unit ' in
    # this copy of the file. The argument order follows the FIFO copy of this
    # function, where the argument list survived; the wording between the
    # placeholders is a reconstruction - confirm it.
    print('Time Unit {}: PID {} executing. Instructions at start: {}, '
          'remaining: {}. {}'.format(
              inf.timeUnit, PID, startingInst, inf.instructionLoad,
              inf.queuePrint))
    inf.instructionLoad -= 1
    inf.timeInCore += 1
    # A finished process leaves the core and is kept in `completed` for reference.
    if inf.instructionLoad == 0:
        del inf.activeCore[PID]
        inf.completed[PID] = inf.referenceDict[PID]

def enterCore():
    """Move the process selected in ``Info.PID`` into the core.

    With an empty queue the process is taken straight from ``referenceDict``;
    otherwise it is moved out of the queue if it is waiting there. The time
    spent in the core is reset, and the live instruction load is set from the
    process entry once it is in the core.
    """
    inf = Info()
    PID = inf.PID
    inf.timeInCore = 0
    if len(inf.queueDict) == 0:
        # Nothing is waiting: enter directly from the reference table.
        inf.activeCore[PID] = inf.referenceDict[PID]
    elif PID in inf.queueDict:
        inf.activeCore[PID] = inf.queueDict.pop(PID)
    # Test for this PID rather than for "core not empty": indexing the core
    # with a PID it does not hold raised KeyError.
    if PID in inf.activeCore:
        inf.instructionLoad = inf.activeCore[PID][1]

def processWait():
    """Admit processes arriving in this time unit and refresh the queue text.

    Every process whose arrival time (index 0) equals the current time unit is
    placed in the queue, unless it is the one already in the core.
    ``queuePrint`` is then set according to whether the queue is empty.
    """
    inf = Info()
    for check, details in inf.referenceDict.items():
        if details[0] != inf.timeUnit:
            continue
        # "Already in the core" is decided by PID. Comparing the stored lists
        # by value also matched a different process with identical figures,
        # and indexing the core with Info.PID raised KeyError when the core
        # held another PID.
        if check in inf.activeCore:
            inf.queueDict.pop(check, None)
        else:
            inf.queueDict[check] = details
    if len(inf.queueDict) == 0:
        inf.queuePrint = 'No queue.'
    else:
        # NOTE(review): this message was truncated to an opening quote in this
        # copy of the file; the wording below is a reconstruction - confirm it.
        inf.queuePrint = 'In queue: {}.'.format(
            ', '.join(str(pid) for pid in inf.queueDict))



def system_cycle():
    """Advance the simulation by one time unit.

    The clock is incremented first, then the queue is updated for the new
    time unit, and finally the process in the core runs for that unit.
    """
    state = Info()
    state.timeUnit = state.timeUnit + 1
    processWait()
    processCore()

def check_Shortest():
    """Point ``Info.PID`` at the queued process with the smallest instruction load.

    Ties go to the process that was queued first. ``Info.PID`` is left
    unchanged when the queue is empty.
    """
    inf = Info()
    if not inf.queueDict:
        return  # min() raises ValueError on an empty queue
    inf.PID = min(inf.queueDict, key=lambda pid: inf.queueDict[pid][1])

def check_Priority():
    """Point ``Info.PID`` at the queued process with the lowest priority value.

    Ties go to the process that was queued first. ``Info.PID`` is left
    unchanged when the queue is empty.
    """
    inf = Info()
    if not inf.queueDict:
        return  # min() raises ValueError on an empty queue
    inf.PID = min(inf.queueDict, key=lambda pid: inf.queueDict[pid][2])

def time_slice_check():
    """Preempt the running process once it has used its whole time slice.

    When ``timeInCore`` equals ``q`` and the selected PID is in the core, the
    process goes back to the end of the queue with its remaining instruction
    load, one time unit is spent on the context switch, the queue is updated
    and the live instruction load is zeroed.
    """
    inf = Info()
    PID = inf.PID
    if inf.timeInCore != inf.q or PID not in inf.activeCore:
        return
    # Requeue a copy carrying the remaining load. The core entry is the same
    # list object as the referenceDict entry, so writing the remaining load
    # into it also overwrote the figure processCore reports as the starting
    # instruction count.
    requeued = list(inf.activeCore.pop(PID))
    requeued[1] = inf.instructionLoad
    inf.queueDict[PID] = requeued
    # One time unit passes for the context switch.
    inf.timeUnit += 1
    processWait()
    # Zero the live load so a "while instructionLoad > 0" driver loop stops.
    inf.instructionLoad = 0

def check_q():
    """Select the first queued process that fits inside one time slice.

    Sets ``Info.PID`` to the first PID in ``queueDict`` whose instruction load
    (index 1) is at most ``Info.q``. ``Info.PID`` is left unchanged when no
    queued process qualifies.
    """
    inf = Info()
    # The previous np.where(lambda ...) call never evaluated the lambda: the
    # function object itself was taken as the condition, so the queue was
    # never inspected. A plain scan does what the description asks for.
    for pid, details in inf.queueDict.items():
        if details[1] <= inf.q:
            inf.PID = pid
            break


def run_Priority():
    """Run the simulation, preferring the lowest priority value in the queue.

    Each time the core is free, the queued process with the lowest priority
    value is selected (when anything is queued) and entered into the core,
    then run until its load reaches zero. Passes over the process table
    repeat until every process is recorded in ``completed``.

    NOTE(review): this section of the file is headed "Round Robin", but this
    driver selects by priority and never calls time_slice_check() or
    check_q(), so no time slicing takes place. Presumably a round-robin
    driver was intended here - confirm with the author.
    """
    inf = Info()
    completed = inf.completed
    referenceDict = inf.referenceDict

    print('system running')
    while len(completed) != len(referenceDict):
        for process in referenceDict:
            inf.PID = process
            # Look at the core dict itself so the test reflects its live state.
            if not inf.activeCore:
                if inf.queueDict:
                    check_Priority()
                enterCore()
            while inf.instructionLoad > 0:
                system_cycle()
            # NOTE(review): restarts the pass when the selected PID is smaller
            # than the number of completed processes; assumes numeric PIDs.
            # Intent is unclear - kept as written.
            if inf.PID < len(completed):
                break

    print('system finished')

# Script entry point: load the process table, then run the simulation driver
# defined in this section.
create_process()
run_Priority()