Module Commands
Expand source code
# ****************************************************
# Copyright: 2020 Team Visualizer (Carlos Miguel Sayao, Connor Bettermann, Issac Greenfield, Madeleine Elyea, Tanner Sundwall, Ted Moore, Prerna Agarwal)
# License: MIT
# ****************************************************
# Purpose: Global command buffer queue data structure.
# When a command is registered, it is placed on the queue.
# If the queue is at max size, discard the command.
# When a command is to be executed, dequeue and return command.
# If queue is empty, enqueue a NONE to indicate no activity.
# (Leader is constantly sending IDLE to show presence.)
# Sources: Python doc: queue, https://docs.python.org/3.7/library/queue.html
# GFG queue tutorial, https://www.geeksforgeeks.org/queue-in-python/
# ****************************************************
from datetime import datetime
from datetime import timedelta
from ErrorHandler import ErrorHandler
from OrderCode import *
from queue import Queue
from constants import *
from MessageMan import MessageMan
def init():
    """Create the module-level command queue and leader flag.

    Rebinds the global ``CommandQueue`` to a fresh ``Queue`` bounded by
    ``MAX_QUEUE_SIZE`` and seeds it with one message composed by
    ``MessageMan`` from an empty first argument, ``START_TIME`` and
    ``OrderCode.NONE``. Afterwards the global ``IsLeader`` is set to False.

    Args:
        None
    Returns:
        None
    """
    global CommandQueue, IsLeader

    CommandQueue = Queue(maxsize=MAX_QUEUE_SIZE)

    # Seed the new queue with a single NONE-order message.
    composer = MessageMan(ErrorHandler())
    idle_message = composer.compose_message("", START_TIME, OrderCode.NONE)
    CommandQueue.put(idle_message)

    IsLeader = False
class Commands:
    """Accessor for the module-level ``CommandQueue``.

    Every instance operates on the same global queue; the instance itself
    only stores the error handler that is passed to ``MessageMan`` when an
    idle (``OrderCode.NONE``) message has to be composed.
    """

    def __init__(self, error_handler=None):
        """Remember the error handler used when composing idle messages.

        Args:
            error_handler: Object handed to ``MessageMan``; may be None.
        """
        self.eh = error_handler

    def enqueue(self, to_queue):
        """ Places new commands on the queue, discarding them when it is full.
        Args:
            to_queue (dict): Command message to add.
        Returns:
            None
        """
        # Function-scope import: cannot be shadowed by the module's star imports.
        from queue import Full

        # We don't want to try adding another one if the queue is full.
        if CommandQueue.qsize() >= MAX_QUEUE_SIZE:
            return
        try:
            CommandQueue.put_nowait(to_queue)
        except Full:
            # qsize() is only a snapshot: the queue filled up between the
            # check and the put (or its real bound is below MAX_QUEUE_SIZE).
            # Discard the command, as for any other full queue.
            pass

    def dequeue(self) -> dict:
        """ Pulls oldest command from the front of the queue.

        When the queue is empty, a NONE-order message is composed, placed on
        the queue and also returned.
        Args:
            None
        Returns:
            dict: The dictionary that holds the command information
        """
        # Function-scope import: cannot be shadowed by the module's star imports.
        from queue import Empty

        try:
            # Ask the queue directly instead of checking empty() first; the
            # check could be stale by the time get_nowait() runs.
            return CommandQueue.get_nowait()
        except Empty:
            pass

        header = MessageMan(self.eh).compose_message("", START_TIME, OrderCode.NONE)
        self.enqueue(header)
        return header

    def flush_queue(self):
        """ Discards every command currently on the queue.
        Args:
            None
        Returns:
            None
        """
        while not CommandQueue.empty():
            self.dequeue()
Functions
def init()
-
Expand source code
def init():
    """Create the module-level command queue and leader flag.

    Rebinds the global ``CommandQueue`` to a fresh ``Queue`` bounded by
    ``MAX_QUEUE_SIZE`` and seeds it with one message composed by
    ``MessageMan`` from an empty first argument, ``START_TIME`` and
    ``OrderCode.NONE``. Afterwards the global ``IsLeader`` is set to False.

    Args:
        None
    Returns:
        None
    """
    global CommandQueue, IsLeader

    CommandQueue = Queue(maxsize=MAX_QUEUE_SIZE)

    # Seed the new queue with a single NONE-order message.
    composer = MessageMan(ErrorHandler())
    idle_message = composer.compose_message("", START_TIME, OrderCode.NONE)
    CommandQueue.put(idle_message)

    IsLeader = False
Classes
class Commands (error_handler=None)
-
Expand source code
class Commands:
    """Accessor for the module-level ``CommandQueue``.

    Every instance operates on the same global queue; the instance itself
    only stores the error handler that is passed to ``MessageMan`` when an
    idle (``OrderCode.NONE``) message has to be composed.
    """

    def __init__(self, error_handler=None):
        """Remember the error handler used when composing idle messages.

        Args:
            error_handler: Object handed to ``MessageMan``; may be None.
        """
        self.eh = error_handler

    def enqueue(self, to_queue):
        """ Places new commands on the queue, discarding them when it is full.
        Args:
            to_queue (dict): Command message to add.
        Returns:
            None
        """
        # Function-scope import: cannot be shadowed by the module's star imports.
        from queue import Full

        # We don't want to try adding another one if the queue is full.
        if CommandQueue.qsize() >= MAX_QUEUE_SIZE:
            return
        try:
            CommandQueue.put_nowait(to_queue)
        except Full:
            # qsize() is only a snapshot: the queue filled up between the
            # check and the put (or its real bound is below MAX_QUEUE_SIZE).
            # Discard the command, as for any other full queue.
            pass

    def dequeue(self) -> dict:
        """ Pulls oldest command from the front of the queue.

        When the queue is empty, a NONE-order message is composed, placed on
        the queue and also returned.
        Args:
            None
        Returns:
            dict: The dictionary that holds the command information
        """
        # Function-scope import: cannot be shadowed by the module's star imports.
        from queue import Empty

        try:
            # Ask the queue directly instead of checking empty() first; the
            # check could be stale by the time get_nowait() runs.
            return CommandQueue.get_nowait()
        except Empty:
            pass

        header = MessageMan(self.eh).compose_message("", START_TIME, OrderCode.NONE)
        self.enqueue(header)
        return header

    def flush_queue(self):
        """ Discards every command currently on the queue.
        Args:
            None
        Returns:
            None
        """
        while not CommandQueue.empty():
            self.dequeue()
Methods
def dequeue(self)
-
Pulls oldest command from the front of the queue.
Args
None
Returns
dict
- The dictionary that holds the command information
Expand source code
def dequeue(self) -> dict:
    """ Pulls oldest command from the front of the queue.

    When the queue is empty, a NONE-order message is composed, placed on
    the queue and also returned.
    Args:
        None
    Returns:
        dict: The dictionary that holds the command information
    """
    # Function-scope import: cannot be shadowed by the module's star imports.
    from queue import Empty

    try:
        # Ask the queue directly instead of checking empty() first; the
        # check could be stale by the time get_nowait() runs.
        return CommandQueue.get_nowait()
    except Empty:
        pass

    header = MessageMan(self.eh).compose_message("", START_TIME, OrderCode.NONE)
    self.enqueue(header)
    return header
def enqueue(self, to_queue)
-
Places new commands on the queue.
Args
to_queue
:dict
- Command message to add.
Returns
None
Expand source code
def enqueue(self, to_queue):
    """ Places new commands on the queue, discarding them when it is full.
    Args:
        to_queue (dict): Command message to add.
    Returns:
        None
    """
    # Function-scope import: cannot be shadowed by the module's star imports.
    from queue import Full

    # We don't want to try adding another one if the queue is full.
    if CommandQueue.qsize() >= MAX_QUEUE_SIZE:
        return
    try:
        CommandQueue.put_nowait(to_queue)
    except Full:
        # qsize() is only a snapshot: the queue filled up between the
        # check and the put (or its real bound is below MAX_QUEUE_SIZE).
        # Discard the command, as for any other full queue.
        pass
def flush_queue(self)
-
Expand source code
def flush_queue(self):
    """ Discards every command currently on the queue.
    Args:
        None
    Returns:
        None
    """
    while True:
        # Stop as soon as the queue reports that nothing is left.
        if CommandQueue.empty():
            break
        self.dequeue()