Module EtherTalk

Expand source code
# ****************************************************
# Purpose:  Manages the ethernet connection between pis.
#           Set up hostname, port.
#           Establish connection.
#           Listen for messages.
#           Send messages.
#           To connect pis, add to constants.py:KNOWN_IPS
#
# Sources:  Disable WiFi, https://raspberrypi.stackexchange.com/q/43720
#           Send/Receive files, https://www.thepythoncode.com/article/send-receive-files-using-sockets-python
#           Netifaces: https://pypi.org/project/netifaces/, Has open-source MIT license
#           Socket doc for 3.7.6, https://docs.python.org/3.7/library/socket.html
#           Concurrent Futures, https://docs.python.org/3.7/library/concurrent.futures.html
#
# Debug     [+] process happening
# Notes:    [*] network process
#           [!] important
#           [&] address
#           [x] something broke...
#           Asserts show places with high probability of failure along
#           with 'notes'.
#
# ****************************************************

import os
import sys
import time
import socket
import pickle
import traceback
import threading
from ast import literal_eval
from datetime import datetime
from datetime import timedelta
import netifaces as ni
from OrderCode import *
from constants import *
from ErrorHandler import ErrorHandler
from enum import IntEnum


Talk = IntEnum('Talk', 'HEADER')


class EtherTalk:

    def __init__(self, is_leader=True, error_handler=None, test=False, verbose=False):
        """ Class manages ethernet communication.

        Args:
            error_handler (ErrorHandler class) Logging class for errors.
            is_leader (bool): Indicate if leader or not, default True.
            test (bool): Indicate if testing on localhost, default False.
            verbose (bool): Indicate verbose output, default False.

        Returns:
            None
        """
        self.test = test
        self.eh = error_handler
        self.v = verbose
        self.ip = None
        self.serverip = None
        self.hostip = None
        self.netmask = None
        # Scan network to instantiate self.serverip and self.ip
        self.scan()
        self.port = 5001
        self.socket = None
        # Listener ip, reachable on all ips
        self.server_host = "0.0.0.0"

    def get_status(self):
        print(f"[!] self.ip:       {self.ip}")
        print(f"[!] self.serverip: {self.serverip}\n")

    def __isup(self, addr) -> int:
        """ Check to see if an address exists on network.

        Args:
            addr (str): address to ping

        Returns:
            int: 0 if down, 1 if up
        """
        # Subshell command. "-c" stops after sending 1 packet, redirect
        # output to the almighty blackhole.
        # os.system() returns subshell returncode.
        res = os.system("ping -c 1 -q " + addr + " >> /dev/null 2>&1")
        if res == 0:
            print(f"[+] {addr} is up!")
            return 1
        else:
            print(f"[x] {addr} is down")
            return 0

    def scan(self) -> bool:
        """ Scan for active network hosts in constants.KNOWN_IPS.
        If found, set self.serverip to found ip, return True. There is someone
        on LAN, but we don't yet know if it is accepting connections.
        Must be used after instantiating EtherTalk class.

        Args: 
            None

        Returns:
            bool: True indicates success, False indicates nobody there.
        """
        if self.test == True:
            # Loopback device for local
            interface = ni.ifaddresses('lo')[ni.AF_INET][0]
            self.ip = interface['addr']
            self.serverip = "localhost"
            return True
        else:
            # Rpi specific ethernet interface 'eth0'.
            # This fails and ret 0 if there is no ethernet conn.
            try:
                interface = ni.ifaddresses('eth0')[ni.AF_INET][0]
                self.ip = interface['addr']
            except:
                self.ip = None
                self.serverip = None
                return False
        print("[*] Scanning for ips on lan")
        print("[*] My ip:", self.ip)
        for ip in KNOWN_IPS:
            print("[*] Pinging", ip)
            if self.ip == ip:
                print("[!] SAME!")
                continue
            if self.__isup(ip):
                print(f"[+] Active host found at {ip}.\n")
                self.serverip = ip
                return True
        print()
        return False

    def ping(self, timeout_time=TIMEOUT_DURATION) -> int:
        """ Scan network, if active found we know we are not the first to start.

        Args:
            None

        Returns:
            int: 0 on connection, 1 on nothing found
        """
        loop_end_time = datetime.now() + timedelta(minutes=timeout_time)
        # Loop for TIMEOUT_DURATION minutes or until we first connection.
        while datetime.now() < loop_end_time:
            # Try to set serverip
            print("Ping trying to connect")
            if self.scan():
                return 0
        return 1

    def __connect(self) -> int:
        """ Attempt to establish connection preceding a send(). Two cases:

        If we have a serverip (follower), connect to to it with our
        serverip and port. If we fail to connect to that follower, it might
        have gone down, return 1.
        
        If we did not have a follower, scan network. If there is someone on
        the LAN, set serverip to that host, return 0. If the scan comes
        back negative, we know there is **still** nobody out there.

        Args:
            None

        Returns:
            int
        0 on successful connection.
        1 someone was there before, but our connection failed,
        likely went down.
        2 nobody at all on our LAN.
        """
        time.sleep(.1)
        # Create TCP server socket, set to reuse address and disable Nagle
        self.socket = socket.socket()
        # Set timeout for receiving confirmation
        self.socket.settimeout(CONF_TIMEOUT)
        self.socket.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
        self.socket.setsockopt(socket.IPPROTO_TCP, socket.TCP_NODELAY, 1)
        # If we a follower, try to connect to it
        if self.serverip is not None:
            try:  # Fails when connection refused
                print(f"[+] Connecting to {self.serverip}:{self.port}")
                self.socket.connect((self.serverip, self.port))
                print("[+] Connected.")
                return 0
            except Exception as e:
                # Our serverip is now failing, that means follower is down
                print(f"[x] {str(e)}")
                # Set serverip to None because we no longer have follower
                self.serverip = None
                # We have no follower, no need for socket
                self.socket.close()
                # Return 1 indicating we lost a follower
                return 1
        else:   # If we don't have a follower, try scanning for one
            # Try scanning for ips
            stat = self.scan() 
            # Scan returns False, there is nobody on LAN
            if stat == False:
                self.socket.close()
                return 2
            # Scan returns True, we have a follower after having had no follower.
            # We then want to try to connect again so our send function can try
            # again.
            elif stat == True:
                try:
                    self.socket.connect((self.serverip, self.port))
                    return 0
                # The new serverip we acquired didn't work! Just call it
                # and return 1...
                except Exception as e:
                    self.serverip = None
                    self.socket.close()
                    return 1
        print("[[]] EtherTalk says: 'We should not've gotten here!!'")
        sys.exit()
        
    def __close(self) -> int:
        """
        Close our socket

        Returns  : (int) 0 success, 1 error
        """
        try:
            self.socket.close()
            return 0
        except:
            return 1
    
    def __getconf(self, kind) -> None:
        """Wait for confirmation.

        Args:
            kind (Talk IntEnum): type of conf we want
        """
        print(f"[*] Listening for {kind.name} conf")
        try:
            print("Entered getconf", time.time())
            conf = self.socket.recv(BUFFER_SIZE)
            conf = pickle.loads(conf)
            if conf == kind:
                print(f"[!] Received {Talk(conf)}")
                print(f"[+] Correctly received conf")
            else:
                print(f"[x] Did not correctly receive conf")
                print(f"[x] Wanted {kind}, conf received: {Talk(conf)}")
        except EOFError:
            print("EOFerror", conf)
        except ConnectionResetError as e:
            print(f"[!] Connection closed before handshake could finish!")
        except Exception as e:
            print(f"[x] Could not get conf {e}")
    
    def __sendconf(self, kind, client_socket) -> None:
        """Send confirmation.

        Args:
            kind (Talk IntEnum): type of conf to send
            client_socket (socket obj): send conf through client connection
        """
        print(f"[*] Sending conf {kind.name}")
        conf = pickle.dumps(kind.value)
        print("KIND",kind.value)
        print("Entered sendconf", time.time())
        client_socket.sendall(conf)

    def send(self, header) -> int:
        """ Send message to the other pi.
        
        Check that message is valid, then connect and send message. Then wait
        for confirmation.

        Args:
            header (dict): dictionary of header info

        Returns:
            int
        0 on successful connection.
        1 someone was there before, but our connection failed,
        likely went down.
        2 nobody at all on our LAN.
        3 connection was ok, but send failed.

        Example:
            `from OrderCode import OrderCode`
            `EtherTalkObject.send(OrderCode.SHUT_DOWN)`

        """
        assert(type(header['code']) == type(OrderCode.SHUT_DOWN.value))

        print(f"\n[+] Sending {header['code']}")

        # Keep trying to connect until CONF_TIMEOUT
        stat = self.__connect()
        tf = time.time() + CONF_TIMEOUT
        while stat != 0 and time.time() < tf:
            stat = self.__connect()
            print(f"[!] send connect stat {stat}")
        # If we've connect was not successful upon timeout, return stat
        if stat > 0:
            return stat

        try:
            # Pickle our payload, send it, then confirm
            self.socket.sendall(pickle.dumps(f"{header}{HEADER}"))
            self.__getconf(Talk.HEADER)
            return 0
        except Exception as e:
            print(f"[x] Failed to send {e}")
            self.socket.close()
            return 3

    def send_file(self, header, filename) -> int:
        """ Send file to connected pi. Attempts to establish connection, then
        sends file. If no connection (no serverip), then scans once to attempt
        setting serverip to active host on network.

        Args:
            header (dict): Generated by MessageMan. We need to add filename
                and filsize.
            filename (str): Path to file, relative to sender. This will be
                stripped away.

        Returns:
            int
        0 on successful connection.
        1 someone was there before, but our connection failed,
        likely went down.
        2 nobody at all on our LAN.
        3 connection was ok, but send failed.

            message =
                {
                    "code"       : (OrderCode),
                    "sent_time"  : (float)),
                    "start_time" : (start_time),
                    "message"    : (input_message)
                }                
        """
        print(f"\n[!] Sending {filename}")

        assert(os.path.isfile(filename))

        # Keep trying to connect until CONF_TIMEOUT
        stat = self.__connect()
        tf = time.time() + CONF_TIMEOUT
        while stat != 0 and (tf-time.time()) > 0:
            stat = self.__connect()
            print(f"[!] send connect stat {stat}")
        # If we've connect was not successful upon timeout, return stat
        if stat > 0:
            return stat
        
        # Put file name and size into header (to let follower know we're
        # about to send a file)
        header['file'] = filename if filename else ""
        header['filesize'] = os.path.getsize(filename) if filename else ""

        filesize = header['filesize']
        try:
            start = time.time()

            # Send our header declaring a file is to be sent
            package = pickle.dumps(f"{header}{HEADER}")
            self.socket.sendall(package)

            self.__getconf(Talk.HEADER)

            print("[*] Sending file.")
            f = open(filename, 'rb')
            bytes_sent = self.socket.sendfile(f)

            print(f"[!] Bytes sent {bytes_sent}")
            if bytes_sent - filesize != 0:
                print("[x] Something went wrong with our sending!")
            f.close()

            end = time.time()
            print(f"[!] Sent in {end-start}s")

            print(f"[+] {filename} sent")
            self.socket.close()
            return 0
        except Exception as e:
            self.eh.log_error(e)
            print(f"[x] Sending file error: {e}")
            print("[x] Failed to send file.")
            return 3

    def listen(self, path=None) -> (int, dict):
        """ Listener that spawns threads for PiMan. When a header is received,
        spawn a client thread to manage connection.

        Args:
            path (str): optional path to write received file to.

        Returns:
            tuple (int, dict): int 0 Success, > 0 failure,
        """
        print("\nListen")
        # Create TCP server socket, set to reuse address and disable Nagle
        self.socket = socket.socket()
        self.socket.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
        self.socket.setsockopt(socket.IPPROTO_TCP, socket.TCP_NODELAY, 1)

        # Bind socket to our local address and port
        self.socket.bind((self.server_host, self.port))
        # Accept 5 connections
        self.socket.listen(10)
        print(f"[*] Listening as {self.server_host}:{self.port}")

        # Accept connection and create new socket representing connection
        try:
            client_socket, address = self.socket.accept()
        except Exception as e:
            return 2, {'code': OrderCode.FAILED.value}

        print(f"[+] {address} is connected.")
        client_socket.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
        client_socket.setsockopt(socket.IPPROTO_TCP, socket.TCP_NODELAY, 1)

        status, header = self.__client_thread(client_socket, path)

        client_socket.close()
        print("[x] Closed client socket.")
        self.socket.close()
        print("[x] Closed main socket.")
        return (status, header)

    def __client_thread(self, client_socket, path=None) -> (int, dict):
        """ Client thread to handle input. Receive with given client socket.

        Args:
            client_socket (socket): Client socket
            path (str): optional path to write to. Default to current working 
            dir.

        Returns:
            tuple (int, dict): 0 success, 1 error and dict of header.
        """
        print(f"[*] Client thread")

        package = client_socket.recv(BUFFER_SIZE)
        # NOTE: Sometimes file gets sent with header. Turned off Nagle's
        #       algorithm, but for very small files it may send the file
        #       with the header.
        header, payload = pickle.loads(package).split(HEADER)
        header = literal_eval(header)  # Load the header
        code = header['code']  # Evaluate code
        print(f"[+] Code: {header['code']}")
        if header['message']:
            print(f"[!] Message: {header['message']}")
        # Confirm payload secure
        print("[*] Sending conf")
        self.__sendconf(Talk.HEADER, client_socket)
        # If we're getting a file, pass to receive_file()
        if code == OrderCode.SEND_FILE.value:
            return self.__receive_file(header, payload, client_socket, path)

        return 0, header

    def __receive_file(self, header, payload, client_socket, path=None) -> (int, dict):
        """ Receive file from pi. Listens on all ips. If not successful, 
        dictionary will be empty.

        Args:
            header (dict): header information
            payload (str): files sometimes catted to header
            client_socket (socket): 
            path (str): optional. Default to cwd.

        Returns:
            tuple (int, dict): 0 success, 1 error and header
        """
        # Receive file info using client socket, not server socket
        filename = header['file']
        filesize = header['filesize']
        filename = os.path.basename(filename)
        filename = path + filename
        filesize = int(filesize)

        print(f"[!] Receiving {filename}, size {filesize}")

        try:
            start = time.time()

            f = open(filename, 'wb')
            while True:
                bytes_read = client_socket.recv(BUFFER_SIZE)
                if not bytes_read:
                    break
                f.write(bytes_read)
            f.close()

            end = time.time()
            print(f"[!] Received in {end-start}s")

            print(f"[+] Received {filename}")

            return (0, header)
        except Exception as e:
            self.eh.log_error(e)
            print("[x] Failed to receive file.")
            return (1, {})

Classes

class EtherTalk (is_leader=True, error_handler=None, test=False, verbose=False)

Class manages ethernet communication.

Args

error_handler (ErrorHandler class) Logging class for errors.
is_leader : bool
Indicate if leader or not, default True.
test : bool
Indicate if testing on localhost, default False.
verbose : bool
Indicate verbose output, default False.

Returns

None
 
Expand source code
class EtherTalk:

    def __init__(self, is_leader=True, error_handler=None, test=False, verbose=False):
        """ Class manages ethernet communication.

        Args:
            error_handler (ErrorHandler class) Logging class for errors.
            is_leader (bool): Indicate if leader or not, default True.
            test (bool): Indicate if testing on localhost, default False.
            verbose (bool): Indicate verbose output, default False.

        Returns:
            None
        """
        self.test = test
        self.eh = error_handler
        self.v = verbose
        self.ip = None
        self.serverip = None
        self.hostip = None
        self.netmask = None
        # Scan network to instantiate self.serverip and self.ip
        self.scan()
        self.port = 5001
        self.socket = None
        # Listener ip, reachable on all ips
        self.server_host = "0.0.0.0"

    def get_status(self):
        print(f"[!] self.ip:       {self.ip}")
        print(f"[!] self.serverip: {self.serverip}\n")

    def __isup(self, addr) -> int:
        """ Check to see if an address exists on network.

        Args:
            addr (str): address to ping

        Returns:
            int: 0 if down, 1 if up
        """
        # Subshell command. "-c" stops after sending 1 packet, redirect
        # output to the almighty blackhole.
        # os.system() returns subshell returncode.
        res = os.system("ping -c 1 -q " + addr + " >> /dev/null 2>&1")
        if res == 0:
            print(f"[+] {addr} is up!")
            return 1
        else:
            print(f"[x] {addr} is down")
            return 0

    def scan(self) -> bool:
        """ Scan for active network hosts in constants.KNOWN_IPS.
        If found, set self.serverip to found ip, return True. There is someone
        on LAN, but we don't yet know if it is accepting connections.
        Must be used after instantiating EtherTalk class.

        Args: 
            None

        Returns:
            bool: True indicates success, False indicates nobody there.
        """
        if self.test == True:
            # Loopback device for local
            interface = ni.ifaddresses('lo')[ni.AF_INET][0]
            self.ip = interface['addr']
            self.serverip = "localhost"
            return True
        else:
            # Rpi specific ethernet interface 'eth0'.
            # This fails and ret 0 if there is no ethernet conn.
            try:
                interface = ni.ifaddresses('eth0')[ni.AF_INET][0]
                self.ip = interface['addr']
            except:
                self.ip = None
                self.serverip = None
                return False
        print("[*] Scanning for ips on lan")
        print("[*] My ip:", self.ip)
        for ip in KNOWN_IPS:
            print("[*] Pinging", ip)
            if self.ip == ip:
                print("[!] SAME!")
                continue
            if self.__isup(ip):
                print(f"[+] Active host found at {ip}.\n")
                self.serverip = ip
                return True
        print()
        return False

    def ping(self, timeout_time=TIMEOUT_DURATION) -> int:
        """ Scan network, if active found we know we are not the first to start.

        Args:
            None

        Returns:
            int: 0 on connection, 1 on nothing found
        """
        loop_end_time = datetime.now() + timedelta(minutes=timeout_time)
        # Loop for TIMEOUT_DURATION minutes or until we first connection.
        while datetime.now() < loop_end_time:
            # Try to set serverip
            print("Ping trying to connect")
            if self.scan():
                return 0
        return 1

    def __connect(self) -> int:
        """ Attempt to establish connection preceding a send(). Two cases:

        If we have a serverip (follower), connect to to it with our
        serverip and port. If we fail to connect to that follower, it might
        have gone down, return 1.
        
        If we did not have a follower, scan network. If there is someone on
        the LAN, set serverip to that host, return 0. If the scan comes
        back negative, we know there is **still** nobody out there.

        Args:
            None

        Returns:
            int
        0 on successful connection.
        1 someone was there before, but our connection failed,
        likely went down.
        2 nobody at all on our LAN.
        """
        time.sleep(.1)
        # Create TCP server socket, set to reuse address and disable Nagle
        self.socket = socket.socket()
        # Set timeout for receiving confirmation
        self.socket.settimeout(CONF_TIMEOUT)
        self.socket.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
        self.socket.setsockopt(socket.IPPROTO_TCP, socket.TCP_NODELAY, 1)
        # If we a follower, try to connect to it
        if self.serverip is not None:
            try:  # Fails when connection refused
                print(f"[+] Connecting to {self.serverip}:{self.port}")
                self.socket.connect((self.serverip, self.port))
                print("[+] Connected.")
                return 0
            except Exception as e:
                # Our serverip is now failing, that means follower is down
                print(f"[x] {str(e)}")
                # Set serverip to None because we no longer have follower
                self.serverip = None
                # We have no follower, no need for socket
                self.socket.close()
                # Return 1 indicating we lost a follower
                return 1
        else:   # If we don't have a follower, try scanning for one
            # Try scanning for ips
            stat = self.scan() 
            # Scan returns False, there is nobody on LAN
            if stat == False:
                self.socket.close()
                return 2
            # Scan returns True, we have a follower after having had no follower.
            # We then want to try to connect again so our send function can try
            # again.
            elif stat == True:
                try:
                    self.socket.connect((self.serverip, self.port))
                    return 0
                # The new serverip we acquired didn't work! Just call it
                # and return 1...
                except Exception as e:
                    self.serverip = None
                    self.socket.close()
                    return 1
        print("[[]] EtherTalk says: 'We should not've gotten here!!'")
        sys.exit()
        
    def __close(self) -> int:
        """
        Close our socket

        Returns  : (int) 0 success, 1 error
        """
        try:
            self.socket.close()
            return 0
        except:
            return 1
    
    def __getconf(self, kind) -> None:
        """Wait for confirmation.

        Args:
            kind (Talk IntEnum): type of conf we want
        """
        print(f"[*] Listening for {kind.name} conf")
        try:
            print("Entered getconf", time.time())
            conf = self.socket.recv(BUFFER_SIZE)
            conf = pickle.loads(conf)
            if conf == kind:
                print(f"[!] Received {Talk(conf)}")
                print(f"[+] Correctly received conf")
            else:
                print(f"[x] Did not correctly receive conf")
                print(f"[x] Wanted {kind}, conf received: {Talk(conf)}")
        except EOFError:
            print("EOFerror", conf)
        except ConnectionResetError as e:
            print(f"[!] Connection closed before handshake could finish!")
        except Exception as e:
            print(f"[x] Could not get conf {e}")
    
    def __sendconf(self, kind, client_socket) -> None:
        """Send confirmation.

        Args:
            kind (Talk IntEnum): type of conf to send
            client_socket (socket obj): send conf through client connection
        """
        print(f"[*] Sending conf {kind.name}")
        conf = pickle.dumps(kind.value)
        print("KIND",kind.value)
        print("Entered sendconf", time.time())
        client_socket.sendall(conf)

    def send(self, header) -> int:
        """ Send message to the other pi.
        
        Check that message is valid, then connect and send message. Then wait
        for confirmation.

        Args:
            header (dict): dictionary of header info

        Returns:
            int
        0 on successful connection.
        1 someone was there before, but our connection failed,
        likely went down.
        2 nobody at all on our LAN.
        3 connection was ok, but send failed.

        Example:
            `from OrderCode import OrderCode`
            `EtherTalkObject.send(OrderCode.SHUT_DOWN)`

        """
        assert(type(header['code']) == type(OrderCode.SHUT_DOWN.value))

        print(f"\n[+] Sending {header['code']}")

        # Keep trying to connect until CONF_TIMEOUT
        stat = self.__connect()
        tf = time.time() + CONF_TIMEOUT
        while stat != 0 and time.time() < tf:
            stat = self.__connect()
            print(f"[!] send connect stat {stat}")
        # If we've connect was not successful upon timeout, return stat
        if stat > 0:
            return stat

        try:
            # Pickle our payload, send it, then confirm
            self.socket.sendall(pickle.dumps(f"{header}{HEADER}"))
            self.__getconf(Talk.HEADER)
            return 0
        except Exception as e:
            print(f"[x] Failed to send {e}")
            self.socket.close()
            return 3

    def send_file(self, header, filename) -> int:
        """ Send file to connected pi. Attempts to establish connection, then
        sends file. If no connection (no serverip), then scans once to attempt
        setting serverip to active host on network.

        Args:
            header (dict): Generated by MessageMan. We need to add filename
                and filsize.
            filename (str): Path to file, relative to sender. This will be
                stripped away.

        Returns:
            int
        0 on successful connection.
        1 someone was there before, but our connection failed,
        likely went down.
        2 nobody at all on our LAN.
        3 connection was ok, but send failed.

            message =
                {
                    "code"       : (OrderCode),
                    "sent_time"  : (float)),
                    "start_time" : (start_time),
                    "message"    : (input_message)
                }                
        """
        print(f"\n[!] Sending {filename}")

        assert(os.path.isfile(filename))

        # Keep trying to connect until CONF_TIMEOUT
        stat = self.__connect()
        tf = time.time() + CONF_TIMEOUT
        while stat != 0 and (tf-time.time()) > 0:
            stat = self.__connect()
            print(f"[!] send connect stat {stat}")
        # If we've connect was not successful upon timeout, return stat
        if stat > 0:
            return stat
        
        # Put file name and size into header (to let follower know we're
        # about to send a file)
        header['file'] = filename if filename else ""
        header['filesize'] = os.path.getsize(filename) if filename else ""

        filesize = header['filesize']
        try:
            start = time.time()

            # Send our header declaring a file is to be sent
            package = pickle.dumps(f"{header}{HEADER}")
            self.socket.sendall(package)

            self.__getconf(Talk.HEADER)

            print("[*] Sending file.")
            f = open(filename, 'rb')
            bytes_sent = self.socket.sendfile(f)

            print(f"[!] Bytes sent {bytes_sent}")
            if bytes_sent - filesize != 0:
                print("[x] Something went wrong with our sending!")
            f.close()

            end = time.time()
            print(f"[!] Sent in {end-start}s")

            print(f"[+] {filename} sent")
            self.socket.close()
            return 0
        except Exception as e:
            self.eh.log_error(e)
            print(f"[x] Sending file error: {e}")
            print("[x] Failed to send file.")
            return 3

    def listen(self, path=None) -> (int, dict):
        """ Listener that spawns threads for PiMan. When a header is received,
        spawn a client thread to manage connection.

        Args:
            path (str): optional path to write received file to.

        Returns:
            tuple (int, dict): int 0 Success, > 0 failure,
        """
        print("\nListen")
        # Create TCP server socket, set to reuse address and disable Nagle
        self.socket = socket.socket()
        self.socket.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
        self.socket.setsockopt(socket.IPPROTO_TCP, socket.TCP_NODELAY, 1)

        # Bind socket to our local address and port
        self.socket.bind((self.server_host, self.port))
        # Accept 5 connections
        self.socket.listen(10)
        print(f"[*] Listening as {self.server_host}:{self.port}")

        # Accept connection and create new socket representing connection
        try:
            client_socket, address = self.socket.accept()
        except Exception as e:
            return 2, {'code': OrderCode.FAILED.value}

        print(f"[+] {address} is connected.")
        client_socket.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
        client_socket.setsockopt(socket.IPPROTO_TCP, socket.TCP_NODELAY, 1)

        status, header = self.__client_thread(client_socket, path)

        client_socket.close()
        print("[x] Closed client socket.")
        self.socket.close()
        print("[x] Closed main socket.")
        return (status, header)

    def __client_thread(self, client_socket, path=None) -> (int, dict):
        """ Client thread to handle input. Receive with given client socket.

        Args:
            client_socket (socket): Client socket
            path (str): optional path to write to. Default to current working 
            dir.

        Returns:
            tuple (int, dict): 0 success, 1 error and dict of header.
        """
        print(f"[*] Client thread")

        package = client_socket.recv(BUFFER_SIZE)
        # NOTE: Sometimes file gets sent with header. Turned off Nagle's
        #       algorithm, but for very small files it may send the file
        #       with the header.
        header, payload = pickle.loads(package).split(HEADER)
        header = literal_eval(header)  # Load the header
        code = header['code']  # Evaluate code
        print(f"[+] Code: {header['code']}")
        if header['message']:
            print(f"[!] Message: {header['message']}")
        # Confirm payload secure
        print("[*] Sending conf")
        self.__sendconf(Talk.HEADER, client_socket)
        # If we're getting a file, pass to receive_file()
        if code == OrderCode.SEND_FILE.value:
            return self.__receive_file(header, payload, client_socket, path)

        return 0, header

    def __receive_file(self, header, payload, client_socket, path=None) -> (int, dict):
        """ Receive file from pi. Listens on all ips. If not successful, 
        dictionary will be empty.

        Args:
            header (dict): header information
            payload (str): files sometimes catted to header
            client_socket (socket): 
            path (str): optional. Default to cwd.

        Returns:
            tuple (int, dict): 0 success, 1 error and header
        """
        # Receive file info using client socket, not server socket
        filename = header['file']
        filesize = header['filesize']
        filename = os.path.basename(filename)
        filename = path + filename
        filesize = int(filesize)

        print(f"[!] Receiving {filename}, size {filesize}")

        try:
            start = time.time()

            f = open(filename, 'wb')
            while True:
                bytes_read = client_socket.recv(BUFFER_SIZE)
                if not bytes_read:
                    break
                f.write(bytes_read)
            f.close()

            end = time.time()
            print(f"[!] Received in {end-start}s")

            print(f"[+] Received {filename}")

            return (0, header)
        except Exception as e:
            self.eh.log_error(e)
            print("[x] Failed to receive file.")
            return (1, {})

Methods

def get_status(self)
Expand source code
def get_status(self):
    print(f"[!] self.ip:       {self.ip}")
    print(f"[!] self.serverip: {self.serverip}\n")
def listen(self, path=None)

Listener that spawns threads for PiMan. When a header is received, spawn a client thread to manage connection.

Args

path : str
optional path to write received file to.

Returns

tuple : int, dict
int 0 Success, > 0 failure,
Expand source code
def listen(self, path=None) -> (int, dict):
    """ Listener that spawns threads for PiMan. When a header is received,
    spawn a client thread to manage connection.

    Args:
        path (str): optional path to write received file to.

    Returns:
        tuple (int, dict): int 0 Success, > 0 failure,
    """
    print("\nListen")
    # Create TCP server socket, set to reuse address and disable Nagle
    self.socket = socket.socket()
    self.socket.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
    self.socket.setsockopt(socket.IPPROTO_TCP, socket.TCP_NODELAY, 1)

    # Bind socket to our local address and port
    self.socket.bind((self.server_host, self.port))
    # Accept 5 connections
    self.socket.listen(10)
    print(f"[*] Listening as {self.server_host}:{self.port}")

    # Accept connection and create new socket representing connection
    try:
        client_socket, address = self.socket.accept()
    except Exception as e:
        return 2, {'code': OrderCode.FAILED.value}

    print(f"[+] {address} is connected.")
    client_socket.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
    client_socket.setsockopt(socket.IPPROTO_TCP, socket.TCP_NODELAY, 1)

    status, header = self.__client_thread(client_socket, path)

    client_socket.close()
    print("[x] Closed client socket.")
    self.socket.close()
    print("[x] Closed main socket.")
    return (status, header)
def ping(self, timeout_time=2)

Scan network, if active found we know we are not the first to start.

Args

None
 

Returns

int
0 on connection, 1 on nothing found
Expand source code
def ping(self, timeout_time=TIMEOUT_DURATION) -> int:
    """ Scan network, if active found we know we are not the first to start.

    Args:
        None

    Returns:
        int: 0 on connection, 1 on nothing found
    """
    loop_end_time = datetime.now() + timedelta(minutes=timeout_time)
    # Loop for TIMEOUT_DURATION minutes or until we first connection.
    while datetime.now() < loop_end_time:
        # Try to set serverip
        print("Ping trying to connect")
        if self.scan():
            return 0
    return 1
def scan(self)

Scan for active network hosts in constants.KNOWN_IPS. If found, set self.serverip to found ip, return True. There is someone on LAN, but we don't yet know if it is accepting connections. Must be used after instantiating EtherTalk class.

Args: None

Returns

bool
True indicates success, False indicates nobody there.
Expand source code
def scan(self) -> bool:
    """ Scan for active network hosts in constants.KNOWN_IPS.
    If found, set self.serverip to found ip, return True. There is someone
    on LAN, but we don't yet know if it is accepting connections.
    Must be used after instantiating EtherTalk class.

    Args: 
        None

    Returns:
        bool: True indicates success, False indicates nobody there.
    """
    if self.test == True:
        # Loopback device for local
        interface = ni.ifaddresses('lo')[ni.AF_INET][0]
        self.ip = interface['addr']
        self.serverip = "localhost"
        return True
    else:
        # Rpi specific ethernet interface 'eth0'.
        # This fails and ret 0 if there is no ethernet conn.
        try:
            interface = ni.ifaddresses('eth0')[ni.AF_INET][0]
            self.ip = interface['addr']
        except:
            self.ip = None
            self.serverip = None
            return False
    print("[*] Scanning for ips on lan")
    print("[*] My ip:", self.ip)
    for ip in KNOWN_IPS:
        print("[*] Pinging", ip)
        if self.ip == ip:
            print("[!] SAME!")
            continue
        if self.__isup(ip):
            print(f"[+] Active host found at {ip}.\n")
            self.serverip = ip
            return True
    print()
    return False
def send(self, header)

Send message to the other pi.

Check that message is valid, then connect and send message. Then wait for confirmation.

Args

header : dict
dictionary of header info

Returns

int
 
0 on successful connection.
1 someone was there before, but our connection failed,
 

likely went down. 2 nobody at all on our LAN. 3 connection was ok, but send failed.

Example

from OrderCode import OrderCode EtherTalkObject.send(OrderCode.SHUT_DOWN)

Expand source code
def send(self, header) -> int:
    """ Send message to the other pi.
    
    Check that message is valid, then connect and send message. Then wait
    for confirmation.

    Args:
        header (dict): dictionary of header info

    Returns:
        int
    0 on successful connection.
    1 someone was there before, but our connection failed,
    likely went down.
    2 nobody at all on our LAN.
    3 connection was ok, but send failed.

    Example:
        `from OrderCode import OrderCode`
        `EtherTalkObject.send(OrderCode.SHUT_DOWN)`

    """
    assert(type(header['code']) == type(OrderCode.SHUT_DOWN.value))

    print(f"\n[+] Sending {header['code']}")

    # Keep trying to connect until CONF_TIMEOUT
    stat = self.__connect()
    tf = time.time() + CONF_TIMEOUT
    while stat != 0 and time.time() < tf:
        stat = self.__connect()
        print(f"[!] send connect stat {stat}")
    # If we've connect was not successful upon timeout, return stat
    if stat > 0:
        return stat

    try:
        # Pickle our payload, send it, then confirm
        self.socket.sendall(pickle.dumps(f"{header}{HEADER}"))
        self.__getconf(Talk.HEADER)
        return 0
    except Exception as e:
        print(f"[x] Failed to send {e}")
        self.socket.close()
        return 3
def send_file(self, header, filename)

Send file to connected pi. Attempts to establish connection, then sends file. If no connection (no serverip), then scans once to attempt setting serverip to active host on network.

Args

header : dict
Generated by MessageMan. We need to add filename and filsize.
filename : str
Path to file, relative to sender. This will be stripped away.

Returns

int
 
0 on successful connection.
1 someone was there before, but our connection failed,
 

likely went down. 2 nobody at all on our LAN. 3 connection was ok, but send failed.

message =
    {
        "code"       : (OrderCode),
        "sent_time"  : (float)),
        "start_time" : (start_time),
        "message"    : (input_message)
    }
Expand source code
def send_file(self, header, filename) -> int:
    """ Send file to connected pi. Attempts to establish connection, then
    sends file. If no connection (no serverip), then scans once to attempt
    setting serverip to active host on network.

    Args:
        header (dict): Generated by MessageMan. We need to add filename
            and filsize.
        filename (str): Path to file, relative to sender. This will be
            stripped away.

    Returns:
        int
    0 on successful connection.
    1 someone was there before, but our connection failed,
    likely went down.
    2 nobody at all on our LAN.
    3 connection was ok, but send failed.

        message =
            {
                "code"       : (OrderCode),
                "sent_time"  : (float)),
                "start_time" : (start_time),
                "message"    : (input_message)
            }                
    """
    print(f"\n[!] Sending {filename}")

    assert(os.path.isfile(filename))

    # Keep trying to connect until CONF_TIMEOUT
    stat = self.__connect()
    tf = time.time() + CONF_TIMEOUT
    while stat != 0 and (tf-time.time()) > 0:
        stat = self.__connect()
        print(f"[!] send connect stat {stat}")
    # If we've connect was not successful upon timeout, return stat
    if stat > 0:
        return stat
    
    # Put file name and size into header (to let follower know we're
    # about to send a file)
    header['file'] = filename if filename else ""
    header['filesize'] = os.path.getsize(filename) if filename else ""

    filesize = header['filesize']
    try:
        start = time.time()

        # Send our header declaring a file is to be sent
        package = pickle.dumps(f"{header}{HEADER}")
        self.socket.sendall(package)

        self.__getconf(Talk.HEADER)

        print("[*] Sending file.")
        f = open(filename, 'rb')
        bytes_sent = self.socket.sendfile(f)

        print(f"[!] Bytes sent {bytes_sent}")
        if bytes_sent - filesize != 0:
            print("[x] Something went wrong with our sending!")
        f.close()

        end = time.time()
        print(f"[!] Sent in {end-start}s")

        print(f"[+] {filename} sent")
        self.socket.close()
        return 0
    except Exception as e:
        self.eh.log_error(e)
        print(f"[x] Sending file error: {e}")
        print("[x] Failed to send file.")
        return 3
class Talk (value, names=None, *, module=None, qualname=None, type=None, start=1)

An enumeration.

Ancestors

  • enum.IntEnum
  • builtins.int
  • enum.Enum

Class variables

var HEADER

An enumeration.