Merge pull request 'development' (#6) from development into main

Reviewed-on: #6
This commit is contained in:
Connor C 2023-09-19 23:07:10 -06:00
commit 4a18249f76
5 changed files with 123 additions and 114 deletions

View File

@ -3,4 +3,6 @@
source 'https://rubygems.org'
gem 'async'
gem 'openssl'
gem 'sequel'
gem 'socket'

View File

@ -4,31 +4,52 @@ require 'socket'
require 'async'
require 'sequel'
require 'openssl'
require 'securerandom'
# Set up the database
DB = Sequel.sqlite # In-memory database
# Create processors table
DB.create_table :processors do
primary_key :proto_handler_id
String :uuid
String :domain
Integer :port
end
# Create orchestrators table
DB.create_table :orchestrators do
primary_key :id
String :domain
Integer :port
end
# Create blacklist table
DB.create_table :blacklist do
primary_key :id
String :uuid
end
processors = DB[:processors]
orchestrators = DB[:orchestrators]
blacklist = DB[:blacklist]
listen_ip = ENV['LISTEN_IP'] || '0.0.0.0'
listen_port = ENV['LISTEN_PORT'] || 3080
server = TCPServer.new(listen_ip, listen_port)
# This hash will store the connections to the consumers
connections = {}
# This hash will store the recently connected UUIDs with their last validation timestamp
recently_connected = {}
def create_socket(ip, port) # rubocop:disable Metrics/MethodLength
# If the IP address is set to "loopback", replace it with the actual loopback IP address
ip = '127.0.0.1' if ip.downcase == 'loopback'
if ip =~ Resolv::IPv4::Regex
# If the IP address is an IPv4 address, create an unencrypted socket
TCPSocket.new(ip, port)
else
# If the IP address is a domain name, create an SSL socket
ssl_context = OpenSSL::SSL::SSLContext.new
ssl_context.verify_mode = OpenSSL::SSL::VERIFY_PEER
# Add certificate and key configuration here if needed
tcp_socket = TCPSocket.new(ip, port)
ssl_socket = OpenSSL::SSL::SSLSocket.new(tcp_socket, ssl_context)
ssl_socket.sync_close = true
@ -37,92 +58,75 @@ def create_socket(ip, port) # rubocop:disable Metrics/MethodLength
end
end
def handle_input(line, processors, orchestrators, blacklist, recently_connected) # rubocop:disable Metrics/AbcSize, Metrics/CyclomaticComplexity, Metrics/MethodLength, Metrics/PerceivedComplexity
command, *args = line.split
uuid = args.shift if command != 'REGISTER'
# Check if the UUID is blacklisted
return 'TERMINATE' if blacklist.where(uuid:).count.positive?
# Check if the UUID is in the recently connected cache
if recently_connected[uuid] && Time.now - recently_connected[uuid] < 120
# UUID is recently connected and within 2 minutes, no need to re-validate
elsif command != 'REGISTER' && (uuid.nil? || processors.where(uuid:).count.zero?)
# UUID is not recently connected or is invalid, add to blacklist
blacklist.insert(uuid:)
return 'ERROR Unrecognized UUID'
else
# UUID is valid, update the recently connected cache
recently_connected[uuid] = Time.now
end
handle_command(command, args, processors, orchestrators)
end
def handle_command(command, args, processors, orchestrators) # rubocop:disable Metrics/MethodLength
if command.start_with?('ORCHESTRATOR')
orchestrator_command = command.split('_', 2).last
orchestrator = orchestrators.first # Assuming you want to pass to the first available orchestrator
case orchestrator_command
when 'REQUEST', 'UPSTATE', 'FINISHED'
orchestrator.puts "#{orchestrator_command} #{args.join(' ')}"
else
puts "Unknown orchestrator command: #{orchestrator_command}"
end
else
consumer = processors.where(uuid: args.last).first # Assuming UUID is the last argument for CHANGE and SHUTDOWN
case command
when 'CHANGE', 'SHUTDOWN'
consumer.puts "#{command} #{args.join(' ')}"
else
puts "Unknown command: #{command}"
end
end
end
def register_orchestrator(line, orchestrators)
_, domain, port = line.split
id = orchestrators.max(:id).to_i + 1
orchestrators.insert(id:, domain:, port:)
puts "Orchestrator registered with domain: #{domain}, port: #{port}"
end
Async do
loop do
Async::Task.new do
client = server.accept
begin
while (line = client.gets)
line = client.gets
# Determine if the connection is from an orchestrator for registration
if is_orchestrator_registration?(line)
register_orchestrator(line, orchestrators)
else
# Here we handle each line of input from the client
handle_input(line, connections)
handle_input(line, processors, blacklist, recently_connected)
end
ensure
# This code will be executed when the fiber is finished, regardless of whether an exception was raised
client.close
Async::Task.current.stop # Stop the current task
Async::Task.current.stop
end
end
end
end
def handle_input(line, connections) # rubocop:disable Metrics/AbcSize, Metrics/MethodLength
# Split the line into command and args
command, *args = line.split
case command
when 'NEW_PROCESSOR'
# Handle new processor registration
id, domain, port = args
# Create a new connection to the processor
Async do
processor_connection = create_socket(domain, port)
# Store the connection in the hash
connections[id] = processor_connection
# Add the processor to the database
processors.insert(consumer_id: id, ip: domain, port: port)
end
when 'REQUEST'
# Handle request from orchestrator
consumer_id, request_type, pcap_chunk_id = args
# Get the connection for this consumer
connection = connections[consumer_id]
# Send the request to the consumer
connection.puts "REQUEST #{request_type} #{pcap_chunk_id}"
when 'UPSTATE'
# Handle upstate from consumer
consumer_id, state, pcap_chunk_id = args
# Do something with the state update...
# For now, we'll just print it out
puts "Consumer #{consumer_id} is now #{state} for chunk #{pcap_chunk_id}"
when 'FINISHED'
# Handle finished from consumer
consumer_id, pcap_chunk_id = args
# Do something with the finished message...
# For now, we'll just print it out
puts "Consumer #{consumer_id} has finished chunk #{pcap_chunk_id}"
when 'CHANGE'
# Handle change from orchestrator
consumer_id, state, urgency = args
# Get the connection for this consumer
connection = connections[consumer_id]
# Send the change to the consumer
connection.puts "CHANGE #{state} #{urgency}"
when 'SHUTDOWN'
# Handle shutdown from orchestrator
consumer_id, urgency = args
# Get the connection for this consumer
connection = connections[consumer_id]
# Send the shutdown to the consumer
connection.puts "SHUTDOWN #{urgency}"
else
puts "Unknown command: #{command}"
end
end

View File

@ -2,25 +2,26 @@ GEM
remote: https://rubygems.org/
specs:
ast (2.4.2)
atk (4.1.8)
glib2 (= 4.1.8)
atk (4.2.0)
glib2 (= 4.2.0)
backport (1.2.0)
base64 (0.1.1)
benchmark (0.2.1)
bigdecimal (3.1.4)
cairo (1.17.12)
native-package-installer (>= 1.0.3)
pkg-config (>= 1.2.2)
red-colors
cairo-gobject (4.1.8)
cairo-gobject (4.2.0)
cairo (>= 1.16.2)
glib2 (= 4.1.8)
console (1.18.0)
glib2 (= 4.2.0)
console (1.23.2)
fiber-annotation
fiber-local
curses (1.4.4)
diff-lcs (1.5.0)
dotenv (2.8.1)
dynamic_curses_input (1.1.0)
dynamic_curses_input (1.2.1)
curses
reline
e2mmap (0.1.0)
@ -33,23 +34,23 @@ GEM
path_expander (~> 1.0)
ruby_parser (~> 3.0)
sexp_processor (~> 4.0)
gdk3 (4.1.8)
cairo-gobject (= 4.1.8)
gdk_pixbuf2 (= 4.1.8)
pango (= 4.1.8)
gdk_pixbuf2 (4.1.8)
gio2 (= 4.1.8)
gio2 (4.1.8)
gdk3 (4.2.0)
cairo-gobject (= 4.2.0)
gdk_pixbuf2 (= 4.2.0)
pango (= 4.2.0)
gdk_pixbuf2 (4.2.0)
gio2 (= 4.2.0)
gio2 (4.2.0)
fiddle
gobject-introspection (= 4.1.8)
glib2 (4.1.8)
gobject-introspection (= 4.2.0)
glib2 (4.2.0)
native-package-installer (>= 1.0.3)
pkg-config (>= 1.3.5)
gobject-introspection (4.1.8)
glib2 (= 4.1.8)
gtk3 (4.1.8)
atk (= 4.1.8)
gdk3 (= 4.1.8)
gobject-introspection (4.2.0)
glib2 (= 4.2.0)
gtk3 (4.2.0)
atk (= 4.2.0)
gdk3 (= 4.2.0)
io-console (0.6.0)
jaro_winkler (1.5.6)
json (2.6.3)
@ -62,21 +63,21 @@ GEM
matrix (0.4.2)
mysql2 (0.5.5)
native-package-installer (1.1.8)
nokogiri (1.15.3-x86_64-linux)
nokogiri (1.15.4-x86_64-linux)
racc (~> 1.4)
openssl (3.1.0)
packetfu (2.0.0)
pcaprub (~> 0.13.1)
pango (4.1.8)
cairo-gobject (= 4.1.8)
gobject-introspection (= 4.1.8)
pango (4.2.0)
cairo-gobject (= 4.2.0)
gobject-introspection (= 4.2.0)
parallel (1.23.0)
parser (3.2.2.3)
ast (~> 2.4.1)
racc
path_expander (1.1.1)
pcaprub (0.13.1)
pkg-config (1.5.2)
pkg-config (1.5.5)
racc (1.7.1)
rainbow (3.1.1)
rbs (2.8.4)
@ -87,12 +88,13 @@ GEM
parser (~> 3.2.0)
rainbow (>= 2.0, < 4.0)
regexp_parser (2.8.1)
reline (0.3.7)
reline (0.3.8)
io-console (~> 0.5)
reverse_markdown (2.1.1)
nokogiri
rexml (3.2.6)
rubocop (1.55.0)
rubocop (1.56.3)
base64 (~> 0.1.1)
json (~> 2.3)
language_server-protocol (>= 3.17.0)
parallel (~> 1.10)
@ -109,7 +111,8 @@ GEM
ruby_parser (3.20.3)
sexp_processor (~> 4.16)
securerandom (0.2.2)
sequel (5.70.0)
sequel (5.72.0)
bigdecimal
sexp_processor (4.17.0)
solargraph (0.49.0)
backport (~> 1.2)
@ -128,7 +131,7 @@ GEM
tilt (~> 2.0)
yard (~> 0.9, >= 0.9.24)
thor (1.2.2)
tilt (2.2.0)
tilt (2.3.0)
tracer (0.2.2)
unicode-display_width (2.4.2)
yaml (0.2.1)
@ -158,4 +161,4 @@ DEPENDENCIES
yaml (~> 0.2.1)
BUNDLED WITH
2.4.16
2.4.10

BIN
lib/bin/gopls Executable file

Binary file not shown.

View File

@ -1,5 +1,5 @@
go.sum database tree
18772708
zUQL+Z/7glb/JRJWTIOwKpTurAD4UkqBoSM/R6Wxx1U=
18982882
5hq5RwlCLIiagi6hZScdHlumu5XIpITikSz2+tAr9LA=
— sum.golang.org Az3grlYd+7jVJcF9/zAb4Pytelt3fRZySKTY7/bjVB7mPom5EO6a2ir09DWokk6RPoUjmc7yd4zez+i6mUr2sYCddg8=
— sum.golang.org Az3grhw2ZJlhe/saIZhYrbHraYSRxzcFocBkNuDAACA6cT3QTOA1P1fa+rhYfXNq2KVoROv5wGw5Y/2naw4QGoEoYAE=