#!/usr/bin/env python3
import os
import socket
import subprocess as sp
import selectors
import json
import hashlib
import uuid
class Interface:
    """Wrap a named pipe (FIFO) path and the raw descriptor opened on it.

    Constructing an instance creates the FIFO when nothing exists at
    ``filename`` yet. Only an instance that created the file removes it again
    in :meth:`close`.
    """

    def __init__(self, filename, mode):
        """Remember the path and mode, then make sure the FIFO exists.

        Args:
            filename: Path of the FIFO.
            mode: ``"r"`` selects the read end; any other value selects the
                write end.
        """
        self.filename = filename
        self.mode = mode
        self.created = False
        self.opened = False
        self.fh = None
        self._create()

    def _create(self):
        """Create the FIFO if the path does not exist and return the path."""
        if not os.path.exists(self.filename):
            os.mkfifo(self.filename)
            self.created = True
        return self.filename

    def get_mode(self):
        """Return the mode string given at construction."""
        return self.mode

    def get_file(self):
        """Return the FIFO path."""
        return self.filename

    def get_fh(self):
        """Return the raw descriptor, or ``None`` while not open."""
        return self.fh

    def open(self):
        """Open the FIFO in non-blocking mode and store the descriptor.

        The access mode follows ``self.mode``: read-only for ``"r"``,
        write-only otherwise. Non-blocking is required so the network
        software never stalls on a pipe.

        A non-blocking write-only open fails with ``ENXIO`` while no process
        has the read end open. In that case the FIFO is opened read/write
        instead, which Linux documents as succeeding without a reader (POSIX
        leaves it undefined); where that is unsupported the ``OSError``
        propagates.

        Raises:
            OSError: If the FIFO cannot be opened.
        """
        import errno  # local import: only this method needs it

        # The mode must be compared explicitly; a bare string constant is
        # always truthy and would select read-only for every interface.
        access = os.O_RDONLY if self.mode == "r" else os.O_WRONLY
        try:
            self.fh = os.open(self.filename, access | os.O_NONBLOCK)
        except OSError as exc:
            if access != os.O_WRONLY or exc.errno != errno.ENXIO:
                raise
            # No reader yet: hold the pipe open read/write so writes work.
            self.fh = os.open(self.filename, os.O_RDWR | os.O_NONBLOCK)
        self.opened = True

    def close(self):
        """Close the descriptor if open and delete the FIFO if we created it.

        State flags are reset, so calling this twice is harmless.
        """
        if self.opened:
            os.close(self.fh)
            self.opened = False
            self.fh = None
        if self.created:
            os.remove(self.filename)
            self.created = False
class Antenna:
    """A child process together with the FIFO interfaces passed to it.

    The process is started through the shell with the command line
    ``<process> <modes> <one FIFO path per mode> <extra args>`` and with
    stdin, stdout and stderr connected to pipes.
    """

    def __init__(self, process, ant_type, modes, original_process_args, file_path, interfaces=None):
        """Create the interfaces and launch the process.

        Args:
            process: Command used to start the antenna.
            ant_type: Type label; becomes the prefix of :meth:`name`.
            modes: Iterable of single mode strings (for example the string
                ``"rw"``); one interface is made per mode.
            original_process_args: Extra arguments appended after the FIFO
                paths.
            file_path: Directory in which generated FIFOs are placed.
            interfaces: Optional mapping of mode to an existing FIFO path to
                use instead of a generated one.
        """
        # A fresh dict per call; a ``{}`` default would be shared by all calls.
        supplied = {} if interfaces is None else interfaces
        self.ant_type = ant_type
        self.modes = modes
        self.uuid = uuid.uuid4().hex[:8]
        # One interface per mode, reusing a supplied path when there is one.
        self.interfaces = {}
        for mode in self.modes:
            if mode in supplied:
                path = supplied[mode]
            else:
                path = self._create_filename(file_path, mode)
            self.interfaces[mode] = self._create_interface(path, mode)
        fifo_paths = [self.interfaces[mode].get_file() for mode in self.modes]
        # "".join keeps a str unchanged and also accepts a list of modes.
        argv = [process, "".join(self.modes)] + fifo_paths + list(original_process_args)
        cmd = " ".join(argv)
        # NOTE(review): the command is interpreted by the shell, so arguments
        # are subject to shell expansion; callers must pass trusted values.
        self.process = sp.Popen(cmd, shell=True, stdin=sp.PIPE, stdout=sp.PIPE, stderr=sp.PIPE)

    def name(self):
        """Return ``<ant_type>_<8 hex chars>`` identifying this antenna."""
        return self.ant_type + "_" + self.uuid

    def is_closed(self):
        """Return True once the process has exited."""
        return not self.is_open()

    def is_open(self):
        """Return True while the process is still running."""
        return self.process.poll() is None

    def get_interfaces(self):
        """Return the interfaces in the order given by ``modes``."""
        return [self.interfaces[mode] for mode in self.modes]

    def call(self, data):
        """Send ``data`` to the process's stdin and return ``(stdout, stderr)``.

        Uses ``Popen.communicate`` with a one second timeout, so
        ``subprocess.TimeoutExpired`` is raised if the process has not exited
        by then.
        """
        return self.process.communicate(data, timeout=1)

    def status(self):
        """Return ``"Working"`` while running, else the exit code as text."""
        return "Working" if self.is_open() else str(self.process.returncode)

    def close(self):
        """Stop the process, close every interface and release the pipes.

        The process is terminated first and, if it has not exited after one
        second, killed.

        Returns:
            The exit code as a string, or ``"Failed to kill"`` if the process
            survived both attempts.
        """
        self.process.terminate()
        for interface in self.interfaces.values():
            interface.close()
        try:
            try:
                return str(self.process.wait(timeout=1))
            except sp.TimeoutExpired:
                pass
            self.process.kill()
            try:
                return str(self.process.wait(timeout=1))
            except sp.TimeoutExpired:
                pass
            print("Failed to close antenna")
            return "Failed to kill"
        finally:
            # Release our ends of the child's pipes so descriptors don't leak.
            for pipe in (self.process.stdin, self.process.stdout, self.process.stderr):
                if pipe is not None and not pipe.closed:
                    try:
                        pipe.close()
                    except OSError:
                        pass

    def get_stderr(self):
        """Return the process's stderr pipe object."""
        return self.process.stderr

    def _create_filename(self, file_path, mode):
        """Return ``<file_path>/<name>_<mode>`` for a generated FIFO."""
        return os.path.join(file_path, self.name() + "_" + mode)

    def _create_interface(self, file, mode):
        """Build the Interface for ``file`` in ``mode``."""
        return Interface(file, mode)
class NetworkManager:
    """Serve antenna-management commands from a Unix socket and a FIFO pair.

    A request is UTF-8 text terminated by a NUL byte, of the form
    ``"<command> <data>"``; the reply is the handler's bytes followed by a
    NUL byte. Known commands are ``create``, ``create_attach``, ``upload``,
    ``download``, ``info``, ``close`` and ``run``.

    The ``config`` mapping is read for the keys ``"pipe dir"``,
    ``"server socket"``, ``"processes"`` and, optionally, ``"commands"``.
    """

    # Upper bound on re-requesting a file whose hash keeps mismatching.
    _MAX_DOWNLOAD_ATTEMPTS = 5

    def __init__(self, config):
        """Store hashing settings, then build all runtime state from ``config``."""
        # Set these before _setup(): it runs the config's commands, and
        # "upload"/"download" need the hash algorithm.
        self.hash_algo = hashlib.sha256
        self.block_size = 65536
        self._setup(config)

    def reset(self, config):
        """Tear everything down and set up again with ``config``."""
        self.close()
        self._setup(config)

    def process(self):
        """Wait for events and invoke the callback stored with each key."""
        for key, _mask in self.sel.select():
            callback = key.data[0]
            callback(key)

    def close(self):
        """Close the selector, antennas, interfaces and server socket."""
        self.sel.close()
        for antenna in self.antennas:
            antenna.close()
        self.in_interface.close()
        self.out_interface.close()
        self.server_socket.close()
        os.remove(self.config["server socket"])

    def _setup(self, config):
        """Create the pipe directory, interfaces, selector and listening socket."""
        self.config = config
        os.makedirs(self.config["pipe dir"], exist_ok=True)
        self.antennas = []
        self.antenna_dict = {}
        self.sel = selectors.DefaultSelector()
        self.fifo_files = set()
        # Commands listed in the config run before the interfaces exist.
        self._run_config_precommands()
        pipe_dir = self.config["pipe dir"]
        self.in_interface = Interface(os.path.join(pipe_dir, "network_manager_r"), "r")
        self.out_interface = Interface(os.path.join(pipe_dir, "network_manager_w"), "w")
        self.in_interface.open()
        self.out_interface.open()
        # A socket file left by an unclean shutdown would make bind() fail.
        try:
            os.remove(self.config["server socket"])
        except OSError:
            pass
        self.server_socket = socket.socket(socket.AF_UNIX, socket.SOCK_STREAM)
        self.server_socket.bind(self.config["server socket"])
        self.server_socket.listen()
        self.sel.register(self.server_socket, selectors.EVENT_READ, data=(self._handle_connection, None))
        self.sel.register(self.in_interface.get_fh(), selectors.EVENT_READ, data=(self._handle_file, None))

    def _run_config_precommands(self):
        """Run every entry of ``config["commands"]``, if that key exists."""
        for cmd in self.config.get("commands", []):
            self._process_command(cmd.split(" ", 1))

    def _launch_antenna(self, ant_type, modes, process_args, interfaces):
        """Start an antenna, record it and watch its stderr; return it."""
        antenna = Antenna(self.config["processes"][ant_type], ant_type, modes,
                          process_args, self.config["pipe dir"], interfaces=interfaces)
        print("Antenna started")
        self.antennas.append(antenna)
        self.antenna_dict[antenna.name()] = antenna
        # Activity on stderr is treated as the antenna failing.
        self.sel.register(antenna.get_stderr(), selectors.EVENT_READ, data=(self._antenna_error, antenna))
        return antenna

    def _create_connection(self, data):
        """Handle ``create``: ``"<type> <modes> [args...]"``.

        Returns the antenna name followed by its FIFO paths, space-separated.
        """
        ant_type, modes, *process_args = data.split(" ")
        antenna = self._launch_antenna(ant_type, modes, process_args, {})
        fields = [antenna.name()] + [iface.get_file() for iface in antenna.get_interfaces()]
        return " ".join(fields).encode('utf-8')

    def _create_attach_connection(self, data):
        """Handle ``create_attach``: ``"<type> <modes> <path per mode> [args...]"``.

        The first ``len(modes)`` arguments are FIFO paths to attach, one per
        mode in order; a mode with no path supplied gets a generated FIFO.
        Returns the FIFO paths, space-separated (without the antenna name).
        """
        ant_type, modes, *rest = data.split(" ")
        attach_paths = rest[:len(modes)]
        process_args = rest[len(modes):]
        interfaces = dict(zip(modes, attach_paths))
        antenna = self._launch_antenna(ant_type, modes, process_args, interfaces)
        return " ".join([iface.get_file() for iface in antenna.get_interfaces()]).encode('utf-8')

    def _call_antenna(self, data):
        """Forward ``data[1:]`` to the antenna named by ``data[0]``.

        NOTE(review): no command is mapped to this method, so the intended
        shape of ``data`` cannot be confirmed from this file.
        """
        antenna_name = data[0]
        antenna = self.antenna_dict[antenna_name]
        antenna.call(data[1:])

    def _run_shell_command(self, data):
        """Handle ``run``: execute ``data`` in a shell, return its exit code."""
        # NOTE(review): runs arbitrary shell text received from a client by
        # design; access to the socket and FIFO must be restricted.
        out = sp.run(data, shell=True)
        return str(out.returncode).encode('utf-8')

    def _get_info(self, data):
        """Handle ``info``: ``"<info type> <antenna id>"``; only ``status`` exists."""
        fields = data.split(" ")
        info_type = fields[0]
        antenna_id = fields[1]
        antenna = self.antenna_dict[antenna_id]
        info_types = {"status": antenna.status}
        return info_types[info_type]().encode('utf-8')

    def _upload_file(self, filename):
        """Handle ``upload``: return the file's hash digest followed by its bytes."""
        with open(filename, "rb") as fh:
            payload = fh.read()
        hasher = self.hash_algo()
        hasher.update(payload)
        return hasher.digest() + payload

    def _request_upload(self, remote_file):
        """Ask the peer for ``remote_file``; return ``(digest, payload)``.

        Reads the digest first, then data until a read returns no bytes.
        NOTE(review): the request is written without a NUL terminator and any
        trailing NUL in the reply is kept in the payload - confirm the peer's
        framing.
        """
        in_fd = self.in_interface.get_fh()
        os.write(self.out_interface.get_fh(), ("upload " + remote_file).encode('utf-8'))
        digest = os.read(in_fd, self.hash_algo().digest_size)
        chunks = []
        while True:
            chunk = os.read(in_fd, 1024)
            if not chunk:
                break
            chunks.append(chunk)
        return digest, b"".join(chunks)

    def _download_file(self, data):
        """Handle ``download``: ``"<local out file> <remote file>"``.

        Requests the remote file, verifies it against the transmitted digest
        and writes it to the local file. The input FIFO is unregistered from
        the selector for the duration and always re-registered.

        Returns:
            Empty bytes, so callers can append the reply terminator.

        Raises:
            OSError: If a pipe read/write fails, or the digest still does not
                match after ``_MAX_DOWNLOAD_ATTEMPTS`` tries.
        """
        out_file, download_file = data.split(" ")
        in_fd = self.in_interface.get_fh()
        self.sel.unregister(in_fd)
        try:
            for _attempt in range(self._MAX_DOWNLOAD_ATTEMPTS):
                expected, payload = self._request_upload(download_file)
                hasher = self.hash_algo()
                hasher.update(payload)
                if hasher.digest() == expected:
                    break
            else:
                raise OSError("hash mismatch downloading " + download_file)
            with open(out_file, "wb") as out_fh:
                out_fh.write(payload)
        finally:
            self.sel.register(in_fd, selectors.EVENT_READ, data=(self._handle_file, None))
        return b""

    def _process_command(self, command_list):
        """Dispatch ``[name]`` or ``[name, data]`` to the matching handler.

        A missing data part is passed as an empty string. An unknown command
        is reported and yields empty bytes. Exceptions raised by the handler
        itself propagate to the caller.
        """
        commands = {"create": self._create_connection,
                    "create_attach": self._create_attach_connection,
                    "upload": self._upload_file,
                    "download": self._download_file,
                    "info": self._get_info,
                    "close": self._close_connection,
                    "run": self._run_shell_command}
        command_name = command_list[0]
        command_data = command_list[1] if len(command_list) > 1 else ""
        try:
            command_func = commands[command_name]
        except KeyError:
            print("Invalid command: " + str(command_name))
            return "".encode('utf-8')
        return command_func(command_data)

    def _dispatch_request(self, message):
        """Run one client request; never let bad input stop the server.

        Lookup, parsing and I/O errors raised by a handler are reported and
        turned into an empty reply.
        """
        try:
            result = self._process_command(message.split(" ", 1))
        except (KeyError, ValueError, IndexError, OSError) as exc:
            print("Command failed: " + repr(exc))
            return b""
        return b"" if result is None else result

    def _close_connection(self, antenna_id):
        """Handle ``close``: close the antenna and return its result text."""
        antenna = self.antenna_dict[antenna_id]
        return self._close_antenna(antenna).encode('utf-8')

    def _close_antenna(self, antenna):
        """Stop watching the antenna's stderr, drop it from the list, close it.

        The ``antenna_dict`` entry is kept, so ``info status`` can still
        report the exit code afterwards.
        """
        self.sel.unregister(antenna.get_stderr())
        self.antennas.remove(antenna)
        return antenna.close()

    def _antenna_error(self, key):
        """Report what an antenna wrote to stderr, then close it."""
        print("Antenna failed")
        print(os.read(key.fd, 1024).decode('utf-8', errors='replace'))
        self._close_antenna(key.data[1])

    @staticmethod
    def _read_message(read_chunk):
        """Read until the data ends with NUL; return the stripped text.

        ``read_chunk(size)`` must return bytes. Returns ``None`` if it returns
        no bytes (peer closed) before a terminator arrives, which would
        otherwise loop forever.
        """
        raw = b""
        while not raw.endswith(b"\0"):
            chunk = read_chunk(1024)
            if not chunk:
                return None
            raw += chunk
        # Decode once at the end so a multi-byte character split across
        # reads cannot break decoding.
        return raw[:-1].decode('utf-8').strip()

    def _handle_file(self, key):
        """Serve one request from the input FIFO, reply on the output FIFO."""
        in_fd = self.in_interface.get_fh()
        try:
            message = self._read_message(lambda size: os.read(in_fd, size))
        except OSError:
            # Non-blocking read with nothing (more) available: drop it.
            return
        if message is None:
            return
        val = self._dispatch_request(message)
        try:
            os.write(self.out_interface.get_fh(), val + b'\0')
        except OSError as e:
            print("Send file IO ERROR: " + str(e))

    def _handle_connection(self, key):
        """Accept one socket client, serve a single request and disconnect.

        NOTE(review): the accepted socket is read in blocking mode, so a
        client that connects but sends nothing holds up the event loop.
        """
        conn, _addr = self.server_socket.accept()
        try:
            try:
                message = self._read_message(conn.recv)
            except OSError as e:
                print("Receive connection IO ERROR: " + str(e))
                return
            if message is None:
                return
            val = self._dispatch_request(message)
            try:
                # sendall: send() may transmit only part of a large reply.
                conn.sendall(val + b'\0')
            except OSError as e:
                print("Send connection IO ERROR: " + str(e))
        finally:
            try:
                conn.close()
            except OSError:
                pass
def main():
    """Load ``default_config.json`` and run a NetworkManager until interrupted.

    The manager is closed on the way out whether the loop ends through
    Ctrl-C or through any other exception.
    """
    # Read the configuration from the working directory
    with open("default_config.json") as config_file:
        settings = json.load(config_file)
    # Build the manager from the loaded settings
    manager = NetworkManager(settings)
    # Keep servicing events until the user interrupts
    try:
        while True:
            manager.process()
    except KeyboardInterrupt:
        print("\nClosing...")
    finally:
        manager.close()


if __name__ == "__main__":
    main()