This is an assignment written for Noroff University to simulate different CPU scheduler methods.
FIFO
from openpyxl import load_workbook
# Open the spreadsheet with the process table and select its active worksheet.
workbook = load_workbook(filename='cpu-scheduling.xlsx')
sheet = workbook.active
# Filled by create_process(): maps each PID to its list of process values.
all_data = {}
#This program was initially written across several files. It used a "CoreConfig.py" file to hold the class Info, a "CoreFunctions.py" file, a "get_data.py" file,
#as well as the four scheduling algorithms and finally a "main.py" file. The config file held the class Info, which used a Borg Design Pattern class
#to hold all the variables the program needed, while letting any file access and update those variables.
#CoreFunctions held all the functions needed to simulate the core.
#The main file was written to run the algorithm chosen by the user.
#This allowed the program to be very modular, and changing an algorithm required very little editing.
#This also allowed the program to be robust, as it could handle a change in the data source without breaking.
#Due to the way the assessment is delivered, I could not keep the program in its original form, and this is why it looks the way it does.
#I have simply copied and pasted "CoreConfig", "CoreFunctions" and "get_data" into each algorithm. This lets each one run on its own as it should.
#This note is written simply to explain why the code might look messy.
def create_process():
    """Load the process table from the worksheet into ``all_data``.

    Every row after the header row becomes one entry keyed by the value in
    the first column (the PID). The stored list holds the next three columns
    followed by a trailing 0.
    """
    # NOTE(review): other functions read index 0 as arrival time, index 1 as
    # instruction load and index 2 as priority; the purpose of the trailing 0
    # is not visible in this file.
    for cells in sheet.iter_rows(min_row=2, values_only=True):
        all_data[cells[0]] = [cells[1], cells[2], cells[3], 0]
class Info():
    """Shared simulation state, reachable from every function.

    ``__new__`` always returns the same instance (a singleton), and the
    containers below are class attributes, so every ``Info()`` call sees and
    updates the same state.
    """

    instance = None

    def __new__(cls):
        # Create the single shared instance on first use, then reuse it.
        if cls.instance is None:
            cls.instance = super().__new__(cls)
        return cls.instance

    PID = 0                   # PID currently selected by the scheduler
    referenceDict = all_data  # every known process, keyed by PID
    timeUnit = 0              # simulation clock
    queueDict = {}            # processes waiting for the core, keyed by PID
    activeCore = {}           # process currently in the core, keyed by PID
    queuePrint = ''           # queue status text shown in the core printout
    waitPrint = ''
    instructionLoad = 0       # remaining instructions of the running process
    processInfo = []
    completed = {}            # finished processes, keyed by PID
    q = 4                     # time-slice length compared in time_slice_check
    timeInCore = 0            # time units the running process has spent in the core

    # These were plain class attributes evaluated once at class creation
    # (``len`` of an empty dict), so they always read 0. As properties they
    # report the live sizes instead.
    @property
    def queueSize(self):
        """Number of processes currently in the queue."""
        return len(self.queueDict)

    @property
    def coreStatus(self):
        """Number of processes currently in the core (0 means idle)."""
        return len(self.activeCore)
def processCore():
    """Run the process in the core for one time unit.

    Prints a status line, decrements the remaining instruction load and
    increments the time spent in the core (read by time_slice_check). When
    the load reaches zero, the process is removed from the core and recorded
    in ``completed``.

    Raises:
        StopIteration: if the core is empty.
    """
    inf = Info()
    PID = next(iter(inf.activeCore))
    startingInst = inf.referenceDict[PID][1]
    # NOTE(review): the original format string was truncated in this copy of
    # the file; only the 'Time Unit' prefix and the five arguments survived.
    # The wording below is a reconstruction - confirm against the original.
    print('Time Unit {}: PID {} (instruction load {}) has {} instructions left. {}'.format(
        inf.timeUnit, PID, startingInst, inf.instructionLoad, inf.queuePrint))
    inf.instructionLoad -= 1
    inf.timeInCore += 1
    # A finished process leaves the core and is kept in ``completed`` for reference.
    if inf.instructionLoad == 0:
        del inf.activeCore[PID]
        inf.completed[PID] = inf.referenceDict[PID]
def enterCore():
    """Move the process selected by ``Info.PID`` into the core.

    Resets the time-in-core counter. With an empty queue the process is taken
    straight from the reference table; otherwise it is moved out of the queue
    if it is waiting there. When the core is occupied afterwards, the current
    instruction load is set from the selected process.
    """
    state = Info()
    pid = state.PID
    state.timeInCore = 0
    if not state.queueDict:
        # Nothing is waiting: load the process directly.
        state.activeCore[pid] = state.referenceDict[pid]
    elif pid in state.queueDict:
        # The chosen process is waiting: take it out of the queue.
        state.activeCore[pid] = state.queueDict.pop(pid)
    if state.activeCore:
        state.instructionLoad = state.activeCore[pid][1]
def processWait():
    """Update the waiting queue for the current time unit.

    Every process whose arrival time equals the current time unit is added to
    the queue, unless it is already in the core. The queue status text used
    by the core printout is then refreshed.
    """
    inf = Info()
    for pid, process in inf.referenceDict.items():
        # Compare by PID, not by value: the original compared the value
        # lists, so a different process with identical values was mistaken
        # for the running one and dropped from the queue.
        if process[0] == inf.timeUnit and pid not in inf.activeCore:
            inf.queueDict[pid] = process
    if not inf.queueDict:
        inf.queuePrint = 'No queue.'
    else:
        # NOTE(review): the original message was truncated in this copy of
        # the file; this wording is a reconstruction - confirm it.
        inf.queuePrint = 'Queue: {}.'.format(
            ', '.join(str(pid) for pid in inf.queueDict))
def system_cycle():
    """Simulate one time unit.

    Advances the clock by one, updates the queue, then runs the process in
    the core.
    """
    clock = Info()
    clock.timeUnit = clock.timeUnit + 1
    processWait()
    processCore()
def check_Shortest():
    """Select the queued process with the smallest instruction load.

    Stores its PID in ``Info.PID``. Raises ValueError when the queue is empty.
    """
    state = Info()
    waiting = state.queueDict
    state.PID = min(waiting, key=lambda pid: waiting[pid][1])
def check_Priority():
    """Select the queued process with the lowest priority value.

    Stores its PID in ``Info.PID``. Raises ValueError when the queue is empty.
    """
    state = Info()
    waiting = state.queueDict
    state.PID = min(waiting, key=lambda pid: waiting[pid][2])
def time_slice_check():
    """Preempt the running process once it has used its whole time slice.

    Does nothing unless the time spent in the core equals ``q`` and the core
    is occupied. Otherwise the process selected by ``Info.PID`` goes back to
    the queue with its remaining instruction load stored, one extra time unit
    is consumed (with a queue update) for the context switch, and the
    instruction load is reset to zero.
    """
    # NOTE(review): the indentation was lost in this copy of the file; all
    # statements are assumed to sit inside both conditions - confirm.
    state = Info()
    if state.timeInCore != state.q or not state.activeCore:
        return
    pid = state.PID
    state.queueDict[pid] = state.activeCore.pop(pid)
    state.queueDict[pid][1] = state.instructionLoad
    # One tick for the context switch.
    state.timeUnit += 1
    processWait()
    state.instructionLoad = 0
def check_q():
    """Select the first queued process whose load fits in one time slice.

    Scans the queue in insertion order. The first process with an instruction
    load of at most ``q`` becomes the active PID. ``Info.PID`` is left
    unchanged when no such process is queued.
    """
    inf = Info()
    # The original passed a lambda to np.where, which never calls it, so the
    # queue was never inspected; a plain scan does what the comment intended
    # and needs no numpy.
    for pid, process in inf.queueDict.items():
        if process[1] <= inf.q:
            inf.PID = pid
            return
def run_fifo():
    """Run the simulation, taking processes in reference-table order.

    Repeats passes over the reference table until every process is recorded
    as completed. On each step the next PID is selected, moved into the core
    if the core reports idle, and cycled until its instruction load is zero.
    """
    # NOTE(review): the indentation was lost in this copy of the file; the
    # nesting below is the most plausible reading - confirm.
    state = Info()
    done = state.completed
    table = state.referenceDict
    print('system running')
    while len(done) != len(table):
        for pid in table:
            state.PID = pid
            if state.coreStatus == 0:
                enterCore()
                while state.instructionLoad > 0:
                    system_cycle()
            if state.PID < len(done):
                break
    print('system finished')
# Load the process table, then run the FIFO simulation.
create_process()
run_fifo()
SJF
from openpyxl import load_workbook
# Open the spreadsheet with the process table and select its active worksheet.
workbook = load_workbook(filename='cpu-scheduling.xlsx')
sheet = workbook.active
# Filled by create_process(): maps each PID to its list of process values.
all_data = {}
def create_process():
    """Load the process table from the worksheet into ``all_data``.

    Every row after the header row becomes one entry keyed by the value in
    the first column (the PID). The stored list holds the next three columns
    followed by a trailing 0.
    """
    # NOTE(review): other functions read index 0 as arrival time, index 1 as
    # instruction load and index 2 as priority; the purpose of the trailing 0
    # is not visible in this file.
    for cells in sheet.iter_rows(min_row=2, values_only=True):
        all_data[cells[0]] = [cells[1], cells[2], cells[3], 0]
class Info():
    """Shared simulation state, reachable from every function.

    ``__new__`` always returns the same instance (a singleton), and the
    containers below are class attributes, so every ``Info()`` call sees and
    updates the same state.
    """

    instance = None

    def __new__(cls):
        # Create the single shared instance on first use, then reuse it.
        if cls.instance is None:
            cls.instance = super().__new__(cls)
        return cls.instance

    PID = 0                   # PID currently selected by the scheduler
    referenceDict = all_data  # every known process, keyed by PID
    timeUnit = 0              # simulation clock
    queueDict = {}            # processes waiting for the core, keyed by PID
    activeCore = {}           # process currently in the core, keyed by PID
    queuePrint = ''           # queue status text shown in the core printout
    waitPrint = ''
    instructionLoad = 0       # remaining instructions of the running process
    processInfo = []
    completed = {}            # finished processes, keyed by PID
    q = 4                     # time-slice length compared in time_slice_check
    timeInCore = 0            # time units the running process has spent in the core

    # These were plain class attributes evaluated once at class creation
    # (``len`` of an empty dict), so they always read 0. As properties they
    # report the live sizes instead.
    @property
    def queueSize(self):
        """Number of processes currently in the queue."""
        return len(self.queueDict)

    @property
    def coreStatus(self):
        """Number of processes currently in the core (0 means idle)."""
        return len(self.activeCore)
def processCore():
    """Run the process in the core for one time unit.

    Prints a status line, decrements the remaining instruction load and
    increments the time spent in the core (read by time_slice_check). When
    the load reaches zero, the process is removed from the core and recorded
    in ``completed``.

    Raises:
        StopIteration: if the core is empty.
    """
    inf = Info()
    PID = next(iter(inf.activeCore))
    startingInst = inf.referenceDict[PID][1]
    # NOTE(review): the original print statement was truncated in this copy
    # of the file; only the 'Time Unit' prefix survived. The wording and the
    # argument list (built from the values this function prepared) are a
    # reconstruction - confirm against the original.
    print('Time Unit {}: PID {} (instruction load {}) has {} instructions left. {}'.format(
        inf.timeUnit, PID, startingInst, inf.instructionLoad, inf.queuePrint))
    inf.instructionLoad -= 1
    inf.timeInCore += 1
    # A finished process leaves the core and is kept in ``completed`` for reference.
    if inf.instructionLoad == 0:
        del inf.activeCore[PID]
        inf.completed[PID] = inf.referenceDict[PID]
def enterCore():
    """Move the process selected by ``Info.PID`` into the core.

    Resets the time-in-core counter. With an empty queue the process is taken
    straight from the reference table; otherwise it is moved out of the queue
    if it is waiting there. When the core is occupied afterwards, the current
    instruction load is set from the selected process.
    """
    state = Info()
    pid = state.PID
    state.timeInCore = 0
    if not state.queueDict:
        # Nothing is waiting: load the process directly.
        state.activeCore[pid] = state.referenceDict[pid]
    elif pid in state.queueDict:
        # The chosen process is waiting: take it out of the queue.
        state.activeCore[pid] = state.queueDict.pop(pid)
    if state.activeCore:
        state.instructionLoad = state.activeCore[pid][1]
def processWait():
    """Update the waiting queue for the current time unit.

    Every process whose arrival time equals the current time unit is added to
    the queue, unless it is already in the core. The queue status text used
    by the core printout is then refreshed.
    """
    inf = Info()
    for pid, process in inf.referenceDict.items():
        # Compare by PID, not by value: the original compared the value
        # lists, so a different process with identical values was mistaken
        # for the running one and dropped from the queue.
        if process[0] == inf.timeUnit and pid not in inf.activeCore:
            inf.queueDict[pid] = process
    if not inf.queueDict:
        inf.queuePrint = 'No queue.'
    else:
        # NOTE(review): the original message was truncated in this copy of
        # the file; this wording is a reconstruction - confirm it.
        inf.queuePrint = 'Queue: {}.'.format(
            ', '.join(str(pid) for pid in inf.queueDict))
def system_cycle():
    """Simulate one time unit.

    Advances the clock by one, updates the queue, then runs the process in
    the core.
    """
    clock = Info()
    clock.timeUnit = clock.timeUnit + 1
    processWait()
    processCore()
def check_Shortest():
    """Select the queued process with the smallest instruction load.

    Stores its PID in ``Info.PID``. Raises ValueError when the queue is empty.
    """
    state = Info()
    waiting = state.queueDict
    state.PID = min(waiting, key=lambda pid: waiting[pid][1])
def check_Priority():
    """Select the queued process with the lowest priority value.

    Stores its PID in ``Info.PID``. Raises ValueError when the queue is empty.
    """
    state = Info()
    waiting = state.queueDict
    state.PID = min(waiting, key=lambda pid: waiting[pid][2])
def time_slice_check():
    """Preempt the running process once it has used its whole time slice.

    Does nothing unless the time spent in the core equals ``q`` and the core
    is occupied. Otherwise the process selected by ``Info.PID`` goes back to
    the queue with its remaining instruction load stored, one extra time unit
    is consumed (with a queue update) for the context switch, and the
    instruction load is reset to zero.
    """
    # NOTE(review): the indentation was lost in this copy of the file; all
    # statements are assumed to sit inside both conditions - confirm.
    state = Info()
    if state.timeInCore != state.q or not state.activeCore:
        return
    pid = state.PID
    state.queueDict[pid] = state.activeCore.pop(pid)
    state.queueDict[pid][1] = state.instructionLoad
    # One tick for the context switch.
    state.timeUnit += 1
    processWait()
    state.instructionLoad = 0
def check_q():
    """Select the first queued process whose load fits in one time slice.

    Scans the queue in insertion order. The first process with an instruction
    load of at most ``q`` becomes the active PID. ``Info.PID`` is left
    unchanged when no such process is queued.
    """
    inf = Info()
    # The original passed a lambda to np.where, which never calls it, so the
    # queue was never inspected; a plain scan does what the comment intended
    # and needs no numpy.
    for pid, process in inf.queueDict.items():
        if process[1] <= inf.q:
            inf.PID = pid
            return
def run_SJF():
    """Run the simulation, preferring the shortest queued job.

    Repeats passes over the reference table until every process is recorded
    as completed. On each step a PID is selected; if the core reports idle
    and the queue is not empty, the selection is replaced by the queued
    process with the smallest instruction load. The selected process is then
    moved into the core and cycled until its instruction load is zero.
    """
    # NOTE(review): the indentation was lost in this copy of the file; the
    # nesting below is the most plausible reading - confirm.
    state = Info()
    done = state.completed
    table = state.referenceDict
    print('system running')
    while len(done) != len(table):
        for pid in table:
            state.PID = pid
            if state.coreStatus == 0:
                if state.queueDict:
                    check_Shortest()
                enterCore()
                while state.instructionLoad > 0:
                    system_cycle()
            if state.PID < len(done):
                break
    print('system finished')
# Load the process table, then run the SJF simulation.
create_process()
run_SJF()
Priority
from openpyxl import load_workbook
# Open the spreadsheet with the process table and select its active worksheet.
workbook = load_workbook(filename='cpu-scheduling.xlsx')
sheet = workbook.active
# Filled by create_process(): maps each PID to its list of process values.
all_data = {}
def create_process():
    """Load the process table from the worksheet into ``all_data``.

    Every row after the header row becomes one entry keyed by the value in
    the first column (the PID). The stored list holds the next three columns
    followed by a trailing 0.
    """
    # NOTE(review): other functions read index 0 as arrival time, index 1 as
    # instruction load and index 2 as priority; the purpose of the trailing 0
    # is not visible in this file.
    for cells in sheet.iter_rows(min_row=2, values_only=True):
        all_data[cells[0]] = [cells[1], cells[2], cells[3], 0]
class Info():
    """Shared simulation state, reachable from every function.

    ``__new__`` always returns the same instance (a singleton), and the
    containers below are class attributes, so every ``Info()`` call sees and
    updates the same state.
    """

    instance = None

    def __new__(cls):
        # Create the single shared instance on first use, then reuse it.
        if cls.instance is None:
            cls.instance = super().__new__(cls)
        return cls.instance

    PID = 0                   # PID currently selected by the scheduler
    referenceDict = all_data  # every known process, keyed by PID
    timeUnit = 0              # simulation clock
    queueDict = {}            # processes waiting for the core, keyed by PID
    activeCore = {}           # process currently in the core, keyed by PID
    queuePrint = ''           # queue status text shown in the core printout
    waitPrint = ''
    instructionLoad = 0       # remaining instructions of the running process
    processInfo = []
    completed = {}            # finished processes, keyed by PID
    q = 4                     # time-slice length compared in time_slice_check
    timeInCore = 0            # time units the running process has spent in the core

    # These were plain class attributes evaluated once at class creation
    # (``len`` of an empty dict), so they always read 0. As properties they
    # report the live sizes instead.
    @property
    def queueSize(self):
        """Number of processes currently in the queue."""
        return len(self.queueDict)

    @property
    def coreStatus(self):
        """Number of processes currently in the core (0 means idle)."""
        return len(self.activeCore)
def processCore():
    """Run the process in the core for one time unit.

    Prints a status line, decrements the remaining instruction load and
    increments the time spent in the core (read by time_slice_check). When
    the load reaches zero, the process is removed from the core and recorded
    in ``completed``.

    Raises:
        StopIteration: if the core is empty.
    """
    inf = Info()
    PID = next(iter(inf.activeCore))
    startingInst = inf.referenceDict[PID][1]
    # NOTE(review): the original print statement was truncated in this copy
    # of the file; only the 'Time Unit' prefix survived. The wording and the
    # argument list (built from the values this function prepared) are a
    # reconstruction - confirm against the original.
    print('Time Unit {}: PID {} (instruction load {}) has {} instructions left. {}'.format(
        inf.timeUnit, PID, startingInst, inf.instructionLoad, inf.queuePrint))
    inf.instructionLoad -= 1
    inf.timeInCore += 1
    # A finished process leaves the core and is kept in ``completed`` for reference.
    if inf.instructionLoad == 0:
        del inf.activeCore[PID]
        inf.completed[PID] = inf.referenceDict[PID]
def enterCore():
    """Move the process selected by ``Info.PID`` into the core.

    Resets the time-in-core counter. With an empty queue the process is taken
    straight from the reference table; otherwise it is moved out of the queue
    if it is waiting there. When the core is occupied afterwards, the current
    instruction load is set from the selected process.
    """
    state = Info()
    pid = state.PID
    state.timeInCore = 0
    if not state.queueDict:
        # Nothing is waiting: load the process directly.
        state.activeCore[pid] = state.referenceDict[pid]
    elif pid in state.queueDict:
        # The chosen process is waiting: take it out of the queue.
        state.activeCore[pid] = state.queueDict.pop(pid)
    if state.activeCore:
        state.instructionLoad = state.activeCore[pid][1]
def processWait():
    """Update the waiting queue for the current time unit.

    Every process whose arrival time equals the current time unit is added to
    the queue, unless it is already in the core. The queue status text used
    by the core printout is then refreshed.
    """
    inf = Info()
    for pid, process in inf.referenceDict.items():
        # Compare by PID, not by value: the original compared the value
        # lists, so a different process with identical values was mistaken
        # for the running one and dropped from the queue.
        if process[0] == inf.timeUnit and pid not in inf.activeCore:
            inf.queueDict[pid] = process
    if not inf.queueDict:
        inf.queuePrint = 'No queue.'
    else:
        # NOTE(review): the original message was truncated in this copy of
        # the file; this wording is a reconstruction - confirm it.
        inf.queuePrint = 'Queue: {}.'.format(
            ', '.join(str(pid) for pid in inf.queueDict))
def system_cycle():
    """Simulate one time unit.

    Advances the clock by one, updates the queue, then runs the process in
    the core.
    """
    clock = Info()
    clock.timeUnit = clock.timeUnit + 1
    processWait()
    processCore()
def check_Shortest():
    """Select the queued process with the smallest instruction load.

    Stores its PID in ``Info.PID``. Raises ValueError when the queue is empty.
    """
    state = Info()
    waiting = state.queueDict
    state.PID = min(waiting, key=lambda pid: waiting[pid][1])
def check_Priority():
    """Select the queued process with the lowest priority value.

    Stores its PID in ``Info.PID``. Raises ValueError when the queue is empty.
    """
    state = Info()
    waiting = state.queueDict
    state.PID = min(waiting, key=lambda pid: waiting[pid][2])
def time_slice_check():
    """Preempt the running process once it has used its whole time slice.

    Does nothing unless the time spent in the core equals ``q`` and the core
    is occupied. Otherwise the process selected by ``Info.PID`` goes back to
    the queue with its remaining instruction load stored, one extra time unit
    is consumed (with a queue update), and the instruction load is reset to
    zero.
    """
    # NOTE(review): the indentation was lost in this copy of the file; all
    # statements are assumed to sit inside both conditions - confirm.
    state = Info()
    if state.timeInCore != state.q or not state.activeCore:
        return
    pid = state.PID
    state.queueDict[pid] = state.activeCore.pop(pid)
    state.queueDict[pid][1] = state.instructionLoad
    state.timeUnit += 1
    processWait()
    state.instructionLoad = 0
def check_q():
    """Select the first queued process whose load fits in one time slice.

    Scans the queue in insertion order. The first process with an instruction
    load of at most ``q`` becomes the active PID. ``Info.PID`` is left
    unchanged when no such process is queued.
    """
    inf = Info()
    # The original passed a lambda to np.where, which never calls it, so the
    # queue was never inspected; a plain scan selects by instruction load
    # and needs no numpy.
    for pid, process in inf.queueDict.items():
        if process[1] <= inf.q:
            inf.PID = pid
            return
def run_Priority():
    """Run the simulation, preferring the lowest priority value in the queue.

    Repeats passes over the reference table until every process is recorded
    as completed. On each step a PID is selected; if the core reports idle
    and the queue is not empty, the selection is replaced by the queued
    process with the lowest priority value. The selected process is then
    moved into the core and cycled until its instruction load is zero.
    """
    # NOTE(review): the indentation was lost in this copy of the file; the
    # nesting below is the most plausible reading - confirm.
    state = Info()
    done = state.completed
    table = state.referenceDict
    print('system running')
    while len(done) != len(table):
        for pid in table:
            state.PID = pid
            if state.coreStatus == 0:
                if state.queueDict:
                    check_Priority()
                enterCore()
                while state.instructionLoad > 0:
                    system_cycle()
            if state.PID < len(done):
                break
    print('system finished')
# Load the process table, then run the priority simulation.
create_process()
run_Priority()
Round Robin
from openpyxl import load_workbook
import numpy as np
# Open the spreadsheet with the process table and select its active worksheet.
workbook = load_workbook(filename='cpu-scheduling.xlsx')
sheet = workbook.active
# Filled by create_process(): maps each PID to its list of process values.
all_data = {}
def create_process():
    """Load the process table from the worksheet into ``all_data``.

    Every row after the header row becomes one entry keyed by the value in
    the first column (the PID). The stored list holds the next three columns
    followed by a trailing 0.
    """
    # NOTE(review): other functions read index 0 as arrival time, index 1 as
    # instruction load and index 2 as priority; the purpose of the trailing 0
    # is not visible in this file.
    for cells in sheet.iter_rows(min_row=2, values_only=True):
        all_data[cells[0]] = [cells[1], cells[2], cells[3], 0]
class Info():
    """Shared simulation state, reachable from every function.

    ``__new__`` always returns the same instance (a singleton), and the
    containers below are class attributes, so every ``Info()`` call sees and
    updates the same state.
    """

    instance = None

    def __new__(cls):
        # Create the single shared instance on first use, then reuse it.
        if cls.instance is None:
            cls.instance = super().__new__(cls)
        return cls.instance

    PID = 0                   # PID currently selected by the scheduler
    referenceDict = all_data  # every known process, keyed by PID
    timeUnit = 0              # simulation clock
    queueDict = {}            # processes waiting for the core, keyed by PID
    activeCore = {}           # process currently in the core, keyed by PID
    queuePrint = ''           # queue status text shown in the core printout
    waitPrint = ''
    instructionLoad = 0       # remaining instructions of the running process
    processInfo = []
    completed = {}            # finished processes, keyed by PID
    q = 4                     # time-slice length compared in time_slice_check
    timeInCore = 0            # time units the running process has spent in the core

    # These were plain class attributes evaluated once at class creation
    # (``len`` of an empty dict), so they always read 0. As properties they
    # report the live sizes instead.
    @property
    def queueSize(self):
        """Number of processes currently in the queue."""
        return len(self.queueDict)

    @property
    def coreStatus(self):
        """Number of processes currently in the core (0 means idle)."""
        return len(self.activeCore)
def processCore():
    """Run the process in the core for one time unit.

    Prints a status line, decrements the remaining instruction load and
    increments the time spent in the core (read by time_slice_check). When
    the load reaches zero, the process is removed from the core and recorded
    in ``completed``.

    Raises:
        StopIteration: if the core is empty.
    """
    inf = Info()
    PID = next(iter(inf.activeCore))
    startingInst = inf.referenceDict[PID][1]
    # NOTE(review): the original print statement was truncated in this copy
    # of the file; only the 'Time Unit' prefix survived. The wording and the
    # argument list (built from the values this function prepared) are a
    # reconstruction - confirm against the original.
    print('Time Unit {}: PID {} (instruction load {}) has {} instructions left. {}'.format(
        inf.timeUnit, PID, startingInst, inf.instructionLoad, inf.queuePrint))
    inf.instructionLoad -= 1
    inf.timeInCore += 1
    # A finished process leaves the core and is kept in ``completed`` for reference.
    if inf.instructionLoad == 0:
        del inf.activeCore[PID]
        inf.completed[PID] = inf.referenceDict[PID]
def enterCore():
    """Move the process selected by ``Info.PID`` into the core.

    Resets the time-in-core counter. With an empty queue the process is taken
    straight from the reference table; otherwise it is moved out of the queue
    if it is waiting there. When the core is occupied afterwards, the current
    instruction load is set from the selected process.
    """
    state = Info()
    pid = state.PID
    state.timeInCore = 0
    if not state.queueDict:
        # Nothing is waiting: load the process directly.
        state.activeCore[pid] = state.referenceDict[pid]
    elif pid in state.queueDict:
        # The chosen process is waiting: take it out of the queue.
        state.activeCore[pid] = state.queueDict.pop(pid)
    if state.activeCore:
        state.instructionLoad = state.activeCore[pid][1]
def processWait():
    """Update the waiting queue for the current time unit.

    Every process whose arrival time equals the current time unit is added to
    the queue, unless it is already in the core. The queue status text used
    by the core printout is then refreshed.
    """
    inf = Info()
    for pid, process in inf.referenceDict.items():
        # Compare by PID, not by value: the original compared the value
        # lists, so a different process with identical values was mistaken
        # for the running one and dropped from the queue.
        if process[0] == inf.timeUnit and pid not in inf.activeCore:
            inf.queueDict[pid] = process
    if not inf.queueDict:
        inf.queuePrint = 'No queue.'
    else:
        # NOTE(review): the original message was truncated in this copy of
        # the file; this wording is a reconstruction - confirm it.
        inf.queuePrint = 'Queue: {}.'.format(
            ', '.join(str(pid) for pid in inf.queueDict))
def system_cycle():
    """Simulate one time unit.

    Advances the clock by one, updates the queue, then runs the process in
    the core.
    """
    clock = Info()
    clock.timeUnit = clock.timeUnit + 1
    processWait()
    processCore()
def check_Shortest():
    """Select the queued process with the smallest instruction load.

    Stores its PID in ``Info.PID``. Raises ValueError when the queue is empty.
    """
    state = Info()
    waiting = state.queueDict
    state.PID = min(waiting, key=lambda pid: waiting[pid][1])
def check_Priority():
    """Select the queued process with the lowest priority value.

    Stores its PID in ``Info.PID``. Raises ValueError when the queue is empty.
    """
    state = Info()
    waiting = state.queueDict
    state.PID = min(waiting, key=lambda pid: waiting[pid][2])
def time_slice_check():
    """Preempt the running process once it has used its whole time slice.

    Does nothing unless the time spent in the core equals ``q`` and the core
    is occupied. Otherwise the process selected by ``Info.PID`` goes back to
    the queue with its remaining instruction load stored, one extra time unit
    is consumed (with a queue update), and the instruction load is reset to
    zero.
    """
    # NOTE(review): the indentation was lost in this copy of the file; all
    # statements are assumed to sit inside both conditions - confirm.
    state = Info()
    if state.timeInCore != state.q or not state.activeCore:
        return
    pid = state.PID
    state.queueDict[pid] = state.activeCore.pop(pid)
    state.queueDict[pid][1] = state.instructionLoad
    state.timeUnit += 1
    processWait()
    state.instructionLoad = 0
def check_q():
    """Select the first queued process whose load fits in one time slice.

    Scans the queue in insertion order. The first process with an instruction
    load of at most ``q`` becomes the active PID. ``Info.PID`` is left
    unchanged when no such process is queued.
    """
    inf = Info()
    # The original passed a lambda to np.where, which never calls it, so the
    # queue was never inspected; a plain scan selects by instruction load.
    for pid, process in inf.queueDict.items():
        if process[1] <= inf.q:
            inf.PID = pid
            return
def run_Priority():
    """Run the simulation, preferring the lowest priority value in the queue.

    Repeats passes over the reference table until every process is recorded
    as completed. On each step a PID is selected; if the core reports idle
    and the queue is not empty, the selection is replaced by the queued
    process with the lowest priority value. The selected process is then
    moved into the core and cycled until its instruction load is zero.
    """
    # NOTE(review): this copy sits under the "Round Robin" heading, yet it
    # performs priority selection and never calls time_slice_check or
    # check_q; the round-robin runner appears to be missing - confirm.
    # NOTE(review): the indentation was lost in this copy of the file; the
    # nesting below is the most plausible reading - confirm.
    state = Info()
    done = state.completed
    table = state.referenceDict
    print('system running')
    while len(done) != len(table):
        for pid in table:
            state.PID = pid
            if state.coreStatus == 0:
                if state.queueDict:
                    check_Priority()
                enterCore()
                while state.instructionLoad > 0:
                    system_cycle()
            if state.PID < len(done):
                break
    print('system finished')
# Load the process table, then run the simulation defined above.
create_process()
run_Priority()
Leave a Reply