Module EtherTalk
Expand source code
# ****************************************************
# Purpose: Manages the ethernet connection between pis.
# Set up hostname, port.
# Establish connection.
# Listen for messages.
# Send messages.
# To connect pis, add to constants.py:KNOWN_IPS
#
# Sources: Disable WiFi, https://raspberrypi.stackexchange.com/q/43720
# Send/Receive files, https://www.thepythoncode.com/article/send-receive-files-using-sockets-python
# Netifaces: https://pypi.org/project/netifaces/, Has open-source MIT license
# Socket doc for 3.7.6, https://docs.python.org/3.7/library/socket.html
# Concurrent Futures, https://docs.python.org/3.7/library/concurrent.futures.html
#
# Debug [+] process happening
# Notes: [*] network process
# [!] important
# [&] address
# [x] something broke...
# Asserts show places with high probability of failure along
# with 'notes'.
#
# ****************************************************
import os
import sys
import time
import socket
import pickle
import traceback
import threading
from ast import literal_eval
from datetime import datetime
from datetime import timedelta
import netifaces as ni
from OrderCode import *
from constants import *
from ErrorHandler import ErrorHandler
from enum import IntEnum
Talk = IntEnum('Talk', 'HEADER')
class EtherTalk:
def __init__(self, is_leader=True, error_handler=None, test=False, verbose=False):
""" Class manages ethernet communication.
Args:
error_handler (ErrorHandler class) Logging class for errors.
is_leader (bool): Indicate if leader or not, default True.
test (bool): Indicate if testing on localhost, default False.
verbose (bool): Indicate verbose output, default False.
Returns:
None
"""
self.test = test
self.eh = error_handler
self.v = verbose
self.ip = None
self.serverip = None
self.hostip = None
self.netmask = None
# Scan network to instantiate self.serverip and self.ip
self.scan()
self.port = 5001
self.socket = None
# Listener ip, reachable on all ips
self.server_host = "0.0.0.0"
def get_status(self):
print(f"[!] self.ip: {self.ip}")
print(f"[!] self.serverip: {self.serverip}\n")
def __isup(self, addr) -> int:
""" Check to see if an address exists on network.
Args:
addr (str): address to ping
Returns:
int: 0 if down, 1 if up
"""
# Subshell command. "-c" stops after sending 1 packet, redirect
# output to the almighty blackhole.
# os.system() returns subshell returncode.
res = os.system("ping -c 1 -q " + addr + " >> /dev/null 2>&1")
if res == 0:
print(f"[+] {addr} is up!")
return 1
else:
print(f"[x] {addr} is down")
return 0
def scan(self) -> bool:
""" Scan for active network hosts in constants.KNOWN_IPS.
If found, set self.serverip to found ip, return True. There is someone
on LAN, but we don't yet know if it is accepting connections.
Must be used after instantiating EtherTalk class.
Args:
None
Returns:
bool: True indicates success, False indicates nobody there.
"""
if self.test == True:
# Loopback device for local
interface = ni.ifaddresses('lo')[ni.AF_INET][0]
self.ip = interface['addr']
self.serverip = "localhost"
return True
else:
# Rpi specific ethernet interface 'eth0'.
# This fails and ret 0 if there is no ethernet conn.
try:
interface = ni.ifaddresses('eth0')[ni.AF_INET][0]
self.ip = interface['addr']
except:
self.ip = None
self.serverip = None
return False
print("[*] Scanning for ips on lan")
print("[*] My ip:", self.ip)
for ip in KNOWN_IPS:
print("[*] Pinging", ip)
if self.ip == ip:
print("[!] SAME!")
continue
if self.__isup(ip):
print(f"[+] Active host found at {ip}.\n")
self.serverip = ip
return True
print()
return False
def ping(self, timeout_time=TIMEOUT_DURATION) -> int:
""" Scan network, if active found we know we are not the first to start.
Args:
None
Returns:
int: 0 on connection, 1 on nothing found
"""
loop_end_time = datetime.now() + timedelta(minutes=timeout_time)
# Loop for TIMEOUT_DURATION minutes or until we first connection.
while datetime.now() < loop_end_time:
# Try to set serverip
print("Ping trying to connect")
if self.scan():
return 0
return 1
def __connect(self) -> int:
""" Attempt to establish connection preceding a send(). Two cases:
If we have a serverip (follower), connect to to it with our
serverip and port. If we fail to connect to that follower, it might
have gone down, return 1.
If we did not have a follower, scan network. If there is someone on
the LAN, set serverip to that host, return 0. If the scan comes
back negative, we know there is **still** nobody out there.
Args:
None
Returns:
int
0 on successful connection.
1 someone was there before, but our connection failed,
likely went down.
2 nobody at all on our LAN.
"""
time.sleep(.1)
# Create TCP server socket, set to reuse address and disable Nagle
self.socket = socket.socket()
# Set timeout for receiving confirmation
self.socket.settimeout(CONF_TIMEOUT)
self.socket.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
self.socket.setsockopt(socket.IPPROTO_TCP, socket.TCP_NODELAY, 1)
# If we a follower, try to connect to it
if self.serverip is not None:
try: # Fails when connection refused
print(f"[+] Connecting to {self.serverip}:{self.port}")
self.socket.connect((self.serverip, self.port))
print("[+] Connected.")
return 0
except Exception as e:
# Our serverip is now failing, that means follower is down
print(f"[x] {str(e)}")
# Set serverip to None because we no longer have follower
self.serverip = None
# We have no follower, no need for socket
self.socket.close()
# Return 1 indicating we lost a follower
return 1
else: # If we don't have a follower, try scanning for one
# Try scanning for ips
stat = self.scan()
# Scan returns False, there is nobody on LAN
if stat == False:
self.socket.close()
return 2
# Scan returns True, we have a follower after having had no follower.
# We then want to try to connect again so our send function can try
# again.
elif stat == True:
try:
self.socket.connect((self.serverip, self.port))
return 0
# The new serverip we acquired didn't work! Just call it
# and return 1...
except Exception as e:
self.serverip = None
self.socket.close()
return 1
print("[[]] EtherTalk says: 'We should not've gotten here!!'")
sys.exit()
def __close(self) -> int:
"""
Close our socket
Returns : (int) 0 success, 1 error
"""
try:
self.socket.close()
return 0
except:
return 1
def __getconf(self, kind) -> None:
"""Wait for confirmation.
Args:
kind (Talk IntEnum): type of conf we want
"""
print(f"[*] Listening for {kind.name} conf")
try:
print("Entered getconf", time.time())
conf = self.socket.recv(BUFFER_SIZE)
conf = pickle.loads(conf)
if conf == kind:
print(f"[!] Received {Talk(conf)}")
print(f"[+] Correctly received conf")
else:
print(f"[x] Did not correctly receive conf")
print(f"[x] Wanted {kind}, conf received: {Talk(conf)}")
except EOFError:
print("EOFerror", conf)
except ConnectionResetError as e:
print(f"[!] Connection closed before handshake could finish!")
except Exception as e:
print(f"[x] Could not get conf {e}")
def __sendconf(self, kind, client_socket) -> None:
"""Send confirmation.
Args:
kind (Talk IntEnum): type of conf to send
client_socket (socket obj): send conf through client connection
"""
print(f"[*] Sending conf {kind.name}")
conf = pickle.dumps(kind.value)
print("KIND",kind.value)
print("Entered sendconf", time.time())
client_socket.sendall(conf)
def send(self, header) -> int:
""" Send message to the other pi.
Check that message is valid, then connect and send message. Then wait
for confirmation.
Args:
header (dict): dictionary of header info
Returns:
int
0 on successful connection.
1 someone was there before, but our connection failed,
likely went down.
2 nobody at all on our LAN.
3 connection was ok, but send failed.
Example:
`from OrderCode import OrderCode`
`EtherTalkObject.send(OrderCode.SHUT_DOWN)`
"""
assert(type(header['code']) == type(OrderCode.SHUT_DOWN.value))
print(f"\n[+] Sending {header['code']}")
# Keep trying to connect until CONF_TIMEOUT
stat = self.__connect()
tf = time.time() + CONF_TIMEOUT
while stat != 0 and time.time() < tf:
stat = self.__connect()
print(f"[!] send connect stat {stat}")
# If we've connect was not successful upon timeout, return stat
if stat > 0:
return stat
try:
# Pickle our payload, send it, then confirm
self.socket.sendall(pickle.dumps(f"{header}{HEADER}"))
self.__getconf(Talk.HEADER)
return 0
except Exception as e:
print(f"[x] Failed to send {e}")
self.socket.close()
return 3
def send_file(self, header, filename) -> int:
""" Send file to connected pi. Attempts to establish connection, then
sends file. If no connection (no serverip), then scans once to attempt
setting serverip to active host on network.
Args:
header (dict): Generated by MessageMan. We need to add filename
and filsize.
filename (str): Path to file, relative to sender. This will be
stripped away.
Returns:
int
0 on successful connection.
1 someone was there before, but our connection failed,
likely went down.
2 nobody at all on our LAN.
3 connection was ok, but send failed.
message =
{
"code" : (OrderCode),
"sent_time" : (float)),
"start_time" : (start_time),
"message" : (input_message)
}
"""
print(f"\n[!] Sending {filename}")
assert(os.path.isfile(filename))
# Keep trying to connect until CONF_TIMEOUT
stat = self.__connect()
tf = time.time() + CONF_TIMEOUT
while stat != 0 and (tf-time.time()) > 0:
stat = self.__connect()
print(f"[!] send connect stat {stat}")
# If we've connect was not successful upon timeout, return stat
if stat > 0:
return stat
# Put file name and size into header (to let follower know we're
# about to send a file)
header['file'] = filename if filename else ""
header['filesize'] = os.path.getsize(filename) if filename else ""
filesize = header['filesize']
try:
start = time.time()
# Send our header declaring a file is to be sent
package = pickle.dumps(f"{header}{HEADER}")
self.socket.sendall(package)
self.__getconf(Talk.HEADER)
print("[*] Sending file.")
f = open(filename, 'rb')
bytes_sent = self.socket.sendfile(f)
print(f"[!] Bytes sent {bytes_sent}")
if bytes_sent - filesize != 0:
print("[x] Something went wrong with our sending!")
f.close()
end = time.time()
print(f"[!] Sent in {end-start}s")
print(f"[+] {filename} sent")
self.socket.close()
return 0
except Exception as e:
self.eh.log_error(e)
print(f"[x] Sending file error: {e}")
print("[x] Failed to send file.")
return 3
def listen(self, path=None) -> (int, dict):
""" Listener that spawns threads for PiMan. When a header is received,
spawn a client thread to manage connection.
Args:
path (str): optional path to write received file to.
Returns:
tuple (int, dict): int 0 Success, > 0 failure,
"""
print("\nListen")
# Create TCP server socket, set to reuse address and disable Nagle
self.socket = socket.socket()
self.socket.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
self.socket.setsockopt(socket.IPPROTO_TCP, socket.TCP_NODELAY, 1)
# Bind socket to our local address and port
self.socket.bind((self.server_host, self.port))
# Accept 5 connections
self.socket.listen(10)
print(f"[*] Listening as {self.server_host}:{self.port}")
# Accept connection and create new socket representing connection
try:
client_socket, address = self.socket.accept()
except Exception as e:
return 2, {'code': OrderCode.FAILED.value}
print(f"[+] {address} is connected.")
client_socket.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
client_socket.setsockopt(socket.IPPROTO_TCP, socket.TCP_NODELAY, 1)
status, header = self.__client_thread(client_socket, path)
client_socket.close()
print("[x] Closed client socket.")
self.socket.close()
print("[x] Closed main socket.")
return (status, header)
def __client_thread(self, client_socket, path=None) -> (int, dict):
""" Client thread to handle input. Receive with given client socket.
Args:
client_socket (socket): Client socket
path (str): optional path to write to. Default to current working
dir.
Returns:
tuple (int, dict): 0 success, 1 error and dict of header.
"""
print(f"[*] Client thread")
package = client_socket.recv(BUFFER_SIZE)
# NOTE: Sometimes file gets sent with header. Turned off Nagle's
# algorithm, but for very small files it may send the file
# with the header.
header, payload = pickle.loads(package).split(HEADER)
header = literal_eval(header) # Load the header
code = header['code'] # Evaluate code
print(f"[+] Code: {header['code']}")
if header['message']:
print(f"[!] Message: {header['message']}")
# Confirm payload secure
print("[*] Sending conf")
self.__sendconf(Talk.HEADER, client_socket)
# If we're getting a file, pass to receive_file()
if code == OrderCode.SEND_FILE.value:
return self.__receive_file(header, payload, client_socket, path)
return 0, header
def __receive_file(self, header, payload, client_socket, path=None) -> (int, dict):
""" Receive file from pi. Listens on all ips. If not successful,
dictionary will be empty.
Args:
header (dict): header information
payload (str): files sometimes catted to header
client_socket (socket):
path (str): optional. Default to cwd.
Returns:
tuple (int, dict): 0 success, 1 error and header
"""
# Receive file info using client socket, not server socket
filename = header['file']
filesize = header['filesize']
filename = os.path.basename(filename)
filename = path + filename
filesize = int(filesize)
print(f"[!] Receiving {filename}, size {filesize}")
try:
start = time.time()
f = open(filename, 'wb')
while True:
bytes_read = client_socket.recv(BUFFER_SIZE)
if not bytes_read:
break
f.write(bytes_read)
f.close()
end = time.time()
print(f"[!] Received in {end-start}s")
print(f"[+] Received {filename}")
return (0, header)
except Exception as e:
self.eh.log_error(e)
print("[x] Failed to receive file.")
return (1, {})
Classes
class EtherTalk (is_leader=True, error_handler=None, test=False, verbose=False)
-
Class manages ethernet communication.
Args
- error_handler (ErrorHandler class) Logging class for errors.
is_leader
:bool
- Indicate if leader or not, default True.
test
:bool
- Indicate if testing on localhost, default False.
verbose
:bool
- Indicate verbose output, default False.
Returns
None
Expand source code
class EtherTalk: def __init__(self, is_leader=True, error_handler=None, test=False, verbose=False): """ Class manages ethernet communication. Args: error_handler (ErrorHandler class) Logging class for errors. is_leader (bool): Indicate if leader or not, default True. test (bool): Indicate if testing on localhost, default False. verbose (bool): Indicate verbose output, default False. Returns: None """ self.test = test self.eh = error_handler self.v = verbose self.ip = None self.serverip = None self.hostip = None self.netmask = None # Scan network to instantiate self.serverip and self.ip self.scan() self.port = 5001 self.socket = None # Listener ip, reachable on all ips self.server_host = "0.0.0.0" def get_status(self): print(f"[!] self.ip: {self.ip}") print(f"[!] self.serverip: {self.serverip}\n") def __isup(self, addr) -> int: """ Check to see if an address exists on network. Args: addr (str): address to ping Returns: int: 0 if down, 1 if up """ # Subshell command. "-c" stops after sending 1 packet, redirect # output to the almighty blackhole. # os.system() returns subshell returncode. res = os.system("ping -c 1 -q " + addr + " >> /dev/null 2>&1") if res == 0: print(f"[+] {addr} is up!") return 1 else: print(f"[x] {addr} is down") return 0 def scan(self) -> bool: """ Scan for active network hosts in constants.KNOWN_IPS. If found, set self.serverip to found ip, return True. There is someone on LAN, but we don't yet know if it is accepting connections. Must be used after instantiating EtherTalk class. Args: None Returns: bool: True indicates success, False indicates nobody there. """ if self.test == True: # Loopback device for local interface = ni.ifaddresses('lo')[ni.AF_INET][0] self.ip = interface['addr'] self.serverip = "localhost" return True else: # Rpi specific ethernet interface 'eth0'. # This fails and ret 0 if there is no ethernet conn. try: interface = ni.ifaddresses('eth0')[ni.AF_INET][0] self.ip = interface['addr'] except: self.ip = None self.serverip = None return False print("[*] Scanning for ips on lan") print("[*] My ip:", self.ip) for ip in KNOWN_IPS: print("[*] Pinging", ip) if self.ip == ip: print("[!] SAME!") continue if self.__isup(ip): print(f"[+] Active host found at {ip}.\n") self.serverip = ip return True print() return False def ping(self, timeout_time=TIMEOUT_DURATION) -> int: """ Scan network, if active found we know we are not the first to start. Args: None Returns: int: 0 on connection, 1 on nothing found """ loop_end_time = datetime.now() + timedelta(minutes=timeout_time) # Loop for TIMEOUT_DURATION minutes or until we first connection. while datetime.now() < loop_end_time: # Try to set serverip print("Ping trying to connect") if self.scan(): return 0 return 1 def __connect(self) -> int: """ Attempt to establish connection preceding a send(). Two cases: If we have a serverip (follower), connect to to it with our serverip and port. If we fail to connect to that follower, it might have gone down, return 1. If we did not have a follower, scan network. If there is someone on the LAN, set serverip to that host, return 0. If the scan comes back negative, we know there is **still** nobody out there. Args: None Returns: int 0 on successful connection. 1 someone was there before, but our connection failed, likely went down. 2 nobody at all on our LAN. """ time.sleep(.1) # Create TCP server socket, set to reuse address and disable Nagle self.socket = socket.socket() # Set timeout for receiving confirmation self.socket.settimeout(CONF_TIMEOUT) self.socket.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1) self.socket.setsockopt(socket.IPPROTO_TCP, socket.TCP_NODELAY, 1) # If we a follower, try to connect to it if self.serverip is not None: try: # Fails when connection refused print(f"[+] Connecting to {self.serverip}:{self.port}") self.socket.connect((self.serverip, self.port)) print("[+] Connected.") return 0 except Exception as e: # Our serverip is now failing, that means follower is down print(f"[x] {str(e)}") # Set serverip to None because we no longer have follower self.serverip = None # We have no follower, no need for socket self.socket.close() # Return 1 indicating we lost a follower return 1 else: # If we don't have a follower, try scanning for one # Try scanning for ips stat = self.scan() # Scan returns False, there is nobody on LAN if stat == False: self.socket.close() return 2 # Scan returns True, we have a follower after having had no follower. # We then want to try to connect again so our send function can try # again. elif stat == True: try: self.socket.connect((self.serverip, self.port)) return 0 # The new serverip we acquired didn't work! Just call it # and return 1... except Exception as e: self.serverip = None self.socket.close() return 1 print("[[]] EtherTalk says: 'We should not've gotten here!!'") sys.exit() def __close(self) -> int: """ Close our socket Returns : (int) 0 success, 1 error """ try: self.socket.close() return 0 except: return 1 def __getconf(self, kind) -> None: """Wait for confirmation. Args: kind (Talk IntEnum): type of conf we want """ print(f"[*] Listening for {kind.name} conf") try: print("Entered getconf", time.time()) conf = self.socket.recv(BUFFER_SIZE) conf = pickle.loads(conf) if conf == kind: print(f"[!] Received {Talk(conf)}") print(f"[+] Correctly received conf") else: print(f"[x] Did not correctly receive conf") print(f"[x] Wanted {kind}, conf received: {Talk(conf)}") except EOFError: print("EOFerror", conf) except ConnectionResetError as e: print(f"[!] Connection closed before handshake could finish!") except Exception as e: print(f"[x] Could not get conf {e}") def __sendconf(self, kind, client_socket) -> None: """Send confirmation. Args: kind (Talk IntEnum): type of conf to send client_socket (socket obj): send conf through client connection """ print(f"[*] Sending conf {kind.name}") conf = pickle.dumps(kind.value) print("KIND",kind.value) print("Entered sendconf", time.time()) client_socket.sendall(conf) def send(self, header) -> int: """ Send message to the other pi. Check that message is valid, then connect and send message. Then wait for confirmation. Args: header (dict): dictionary of header info Returns: int 0 on successful connection. 1 someone was there before, but our connection failed, likely went down. 2 nobody at all on our LAN. 3 connection was ok, but send failed. Example: `from OrderCode import OrderCode` `EtherTalkObject.send(OrderCode.SHUT_DOWN)` """ assert(type(header['code']) == type(OrderCode.SHUT_DOWN.value)) print(f"\n[+] Sending {header['code']}") # Keep trying to connect until CONF_TIMEOUT stat = self.__connect() tf = time.time() + CONF_TIMEOUT while stat != 0 and time.time() < tf: stat = self.__connect() print(f"[!] send connect stat {stat}") # If we've connect was not successful upon timeout, return stat if stat > 0: return stat try: # Pickle our payload, send it, then confirm self.socket.sendall(pickle.dumps(f"{header}{HEADER}")) self.__getconf(Talk.HEADER) return 0 except Exception as e: print(f"[x] Failed to send {e}") self.socket.close() return 3 def send_file(self, header, filename) -> int: """ Send file to connected pi. Attempts to establish connection, then sends file. If no connection (no serverip), then scans once to attempt setting serverip to active host on network. Args: header (dict): Generated by MessageMan. We need to add filename and filsize. filename (str): Path to file, relative to sender. This will be stripped away. Returns: int 0 on successful connection. 1 someone was there before, but our connection failed, likely went down. 2 nobody at all on our LAN. 3 connection was ok, but send failed. message = { "code" : (OrderCode), "sent_time" : (float)), "start_time" : (start_time), "message" : (input_message) } """ print(f"\n[!] Sending {filename}") assert(os.path.isfile(filename)) # Keep trying to connect until CONF_TIMEOUT stat = self.__connect() tf = time.time() + CONF_TIMEOUT while stat != 0 and (tf-time.time()) > 0: stat = self.__connect() print(f"[!] send connect stat {stat}") # If we've connect was not successful upon timeout, return stat if stat > 0: return stat # Put file name and size into header (to let follower know we're # about to send a file) header['file'] = filename if filename else "" header['filesize'] = os.path.getsize(filename) if filename else "" filesize = header['filesize'] try: start = time.time() # Send our header declaring a file is to be sent package = pickle.dumps(f"{header}{HEADER}") self.socket.sendall(package) self.__getconf(Talk.HEADER) print("[*] Sending file.") f = open(filename, 'rb') bytes_sent = self.socket.sendfile(f) print(f"[!] Bytes sent {bytes_sent}") if bytes_sent - filesize != 0: print("[x] Something went wrong with our sending!") f.close() end = time.time() print(f"[!] Sent in {end-start}s") print(f"[+] {filename} sent") self.socket.close() return 0 except Exception as e: self.eh.log_error(e) print(f"[x] Sending file error: {e}") print("[x] Failed to send file.") return 3 def listen(self, path=None) -> (int, dict): """ Listener that spawns threads for PiMan. When a header is received, spawn a client thread to manage connection. Args: path (str): optional path to write received file to. Returns: tuple (int, dict): int 0 Success, > 0 failure, """ print("\nListen") # Create TCP server socket, set to reuse address and disable Nagle self.socket = socket.socket() self.socket.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1) self.socket.setsockopt(socket.IPPROTO_TCP, socket.TCP_NODELAY, 1) # Bind socket to our local address and port self.socket.bind((self.server_host, self.port)) # Accept 5 connections self.socket.listen(10) print(f"[*] Listening as {self.server_host}:{self.port}") # Accept connection and create new socket representing connection try: client_socket, address = self.socket.accept() except Exception as e: return 2, {'code': OrderCode.FAILED.value} print(f"[+] {address} is connected.") client_socket.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1) client_socket.setsockopt(socket.IPPROTO_TCP, socket.TCP_NODELAY, 1) status, header = self.__client_thread(client_socket, path) client_socket.close() print("[x] Closed client socket.") self.socket.close() print("[x] Closed main socket.") return (status, header) def __client_thread(self, client_socket, path=None) -> (int, dict): """ Client thread to handle input. Receive with given client socket. Args: client_socket (socket): Client socket path (str): optional path to write to. Default to current working dir. Returns: tuple (int, dict): 0 success, 1 error and dict of header. """ print(f"[*] Client thread") package = client_socket.recv(BUFFER_SIZE) # NOTE: Sometimes file gets sent with header. Turned off Nagle's # algorithm, but for very small files it may send the file # with the header. header, payload = pickle.loads(package).split(HEADER) header = literal_eval(header) # Load the header code = header['code'] # Evaluate code print(f"[+] Code: {header['code']}") if header['message']: print(f"[!] Message: {header['message']}") # Confirm payload secure print("[*] Sending conf") self.__sendconf(Talk.HEADER, client_socket) # If we're getting a file, pass to receive_file() if code == OrderCode.SEND_FILE.value: return self.__receive_file(header, payload, client_socket, path) return 0, header def __receive_file(self, header, payload, client_socket, path=None) -> (int, dict): """ Receive file from pi. Listens on all ips. If not successful, dictionary will be empty. Args: header (dict): header information payload (str): files sometimes catted to header client_socket (socket): path (str): optional. Default to cwd. Returns: tuple (int, dict): 0 success, 1 error and header """ # Receive file info using client socket, not server socket filename = header['file'] filesize = header['filesize'] filename = os.path.basename(filename) filename = path + filename filesize = int(filesize) print(f"[!] Receiving {filename}, size {filesize}") try: start = time.time() f = open(filename, 'wb') while True: bytes_read = client_socket.recv(BUFFER_SIZE) if not bytes_read: break f.write(bytes_read) f.close() end = time.time() print(f"[!] Received in {end-start}s") print(f"[+] Received {filename}") return (0, header) except Exception as e: self.eh.log_error(e) print("[x] Failed to receive file.") return (1, {})
Methods
def get_status(self)
-
Expand source code
def get_status(self): print(f"[!] self.ip: {self.ip}") print(f"[!] self.serverip: {self.serverip}\n")
def listen(self, path=None)
-
Listener that spawns threads for PiMan. When a header is received, spawn a client thread to manage connection.
Args
path
:str
- optional path to write received file to.
Returns
tuple
:int
,dict
- int 0 Success, > 0 failure,
Expand source code
def listen(self, path=None) -> (int, dict): """ Listener that spawns threads for PiMan. When a header is received, spawn a client thread to manage connection. Args: path (str): optional path to write received file to. Returns: tuple (int, dict): int 0 Success, > 0 failure, """ print("\nListen") # Create TCP server socket, set to reuse address and disable Nagle self.socket = socket.socket() self.socket.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1) self.socket.setsockopt(socket.IPPROTO_TCP, socket.TCP_NODELAY, 1) # Bind socket to our local address and port self.socket.bind((self.server_host, self.port)) # Accept 5 connections self.socket.listen(10) print(f"[*] Listening as {self.server_host}:{self.port}") # Accept connection and create new socket representing connection try: client_socket, address = self.socket.accept() except Exception as e: return 2, {'code': OrderCode.FAILED.value} print(f"[+] {address} is connected.") client_socket.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1) client_socket.setsockopt(socket.IPPROTO_TCP, socket.TCP_NODELAY, 1) status, header = self.__client_thread(client_socket, path) client_socket.close() print("[x] Closed client socket.") self.socket.close() print("[x] Closed main socket.") return (status, header)
def ping(self, timeout_time=2)
-
Scan network, if active found we know we are not the first to start.
Args
None
Returns
int
- 0 on connection, 1 on nothing found
Expand source code
def ping(self, timeout_time=TIMEOUT_DURATION) -> int: """ Scan network, if active found we know we are not the first to start. Args: None Returns: int: 0 on connection, 1 on nothing found """ loop_end_time = datetime.now() + timedelta(minutes=timeout_time) # Loop for TIMEOUT_DURATION minutes or until we first connection. while datetime.now() < loop_end_time: # Try to set serverip print("Ping trying to connect") if self.scan(): return 0 return 1
def scan(self)
-
Scan for active network hosts in constants.KNOWN_IPS. If found, set self.serverip to found ip, return True. There is someone on LAN, but we don't yet know if it is accepting connections. Must be used after instantiating EtherTalk class.
Args: None
Returns
bool
- True indicates success, False indicates nobody there.
Expand source code
def scan(self) -> bool: """ Scan for active network hosts in constants.KNOWN_IPS. If found, set self.serverip to found ip, return True. There is someone on LAN, but we don't yet know if it is accepting connections. Must be used after instantiating EtherTalk class. Args: None Returns: bool: True indicates success, False indicates nobody there. """ if self.test == True: # Loopback device for local interface = ni.ifaddresses('lo')[ni.AF_INET][0] self.ip = interface['addr'] self.serverip = "localhost" return True else: # Rpi specific ethernet interface 'eth0'. # This fails and ret 0 if there is no ethernet conn. try: interface = ni.ifaddresses('eth0')[ni.AF_INET][0] self.ip = interface['addr'] except: self.ip = None self.serverip = None return False print("[*] Scanning for ips on lan") print("[*] My ip:", self.ip) for ip in KNOWN_IPS: print("[*] Pinging", ip) if self.ip == ip: print("[!] SAME!") continue if self.__isup(ip): print(f"[+] Active host found at {ip}.\n") self.serverip = ip return True print() return False
def send(self, header)
-
Send message to the other pi.
Check that message is valid, then connect and send message. Then wait for confirmation.
Args
header
:dict
- dictionary of header info
Returns
int
- 0 on successful connection.
1
someone
was
there
before
,but
our
connection
failed
,
likely went down. 2 nobody at all on our LAN. 3 connection was ok, but send failed.
Example
from OrderCode import OrderCode
EtherTalkObject.send(OrderCode.SHUT_DOWN)
Expand source code
def send(self, header) -> int: """ Send message to the other pi. Check that message is valid, then connect and send message. Then wait for confirmation. Args: header (dict): dictionary of header info Returns: int 0 on successful connection. 1 someone was there before, but our connection failed, likely went down. 2 nobody at all on our LAN. 3 connection was ok, but send failed. Example: `from OrderCode import OrderCode` `EtherTalkObject.send(OrderCode.SHUT_DOWN)` """ assert(type(header['code']) == type(OrderCode.SHUT_DOWN.value)) print(f"\n[+] Sending {header['code']}") # Keep trying to connect until CONF_TIMEOUT stat = self.__connect() tf = time.time() + CONF_TIMEOUT while stat != 0 and time.time() < tf: stat = self.__connect() print(f"[!] send connect stat {stat}") # If we've connect was not successful upon timeout, return stat if stat > 0: return stat try: # Pickle our payload, send it, then confirm self.socket.sendall(pickle.dumps(f"{header}{HEADER}")) self.__getconf(Talk.HEADER) return 0 except Exception as e: print(f"[x] Failed to send {e}") self.socket.close() return 3
def send_file(self, header, filename)
-
Send file to connected pi. Attempts to establish connection, then sends file. If no connection (no serverip), then scans once to attempt setting serverip to active host on network.
Args
header
:dict
- Generated by MessageMan. We need to add filename and filsize.
filename
:str
- Path to file, relative to sender. This will be stripped away.
Returns
int
- 0 on successful connection.
1
someone
was
there
before
,but
our
connection
failed
,
likely went down. 2 nobody at all on our LAN. 3 connection was ok, but send failed.
message = { "code" : (OrderCode), "sent_time" : (float)), "start_time" : (start_time), "message" : (input_message) }
Expand source code
def send_file(self, header, filename) -> int: """ Send file to connected pi. Attempts to establish connection, then sends file. If no connection (no serverip), then scans once to attempt setting serverip to active host on network. Args: header (dict): Generated by MessageMan. We need to add filename and filsize. filename (str): Path to file, relative to sender. This will be stripped away. Returns: int 0 on successful connection. 1 someone was there before, but our connection failed, likely went down. 2 nobody at all on our LAN. 3 connection was ok, but send failed. message = { "code" : (OrderCode), "sent_time" : (float)), "start_time" : (start_time), "message" : (input_message) } """ print(f"\n[!] Sending {filename}") assert(os.path.isfile(filename)) # Keep trying to connect until CONF_TIMEOUT stat = self.__connect() tf = time.time() + CONF_TIMEOUT while stat != 0 and (tf-time.time()) > 0: stat = self.__connect() print(f"[!] send connect stat {stat}") # If we've connect was not successful upon timeout, return stat if stat > 0: return stat # Put file name and size into header (to let follower know we're # about to send a file) header['file'] = filename if filename else "" header['filesize'] = os.path.getsize(filename) if filename else "" filesize = header['filesize'] try: start = time.time() # Send our header declaring a file is to be sent package = pickle.dumps(f"{header}{HEADER}") self.socket.sendall(package) self.__getconf(Talk.HEADER) print("[*] Sending file.") f = open(filename, 'rb') bytes_sent = self.socket.sendfile(f) print(f"[!] Bytes sent {bytes_sent}") if bytes_sent - filesize != 0: print("[x] Something went wrong with our sending!") f.close() end = time.time() print(f"[!] Sent in {end-start}s") print(f"[+] {filename} sent") self.socket.close() return 0 except Exception as e: self.eh.log_error(e) print(f"[x] Sending file error: {e}") print("[x] Failed to send file.") return 3
class Talk (value, names=None, *, module=None, qualname=None, type=None, start=1)
-
An enumeration.
Ancestors
- enum.IntEnum
- builtins.int
- enum.Enum
Class variables
var HEADER
-
An enumeration.