Compare commits

..

6 Commits

Author SHA1 Message Date
4a18249f76 Merge pull request 'development' (#6) from development into main
Reviewed-on: #6
2023-09-19 23:07:10 -06:00
5dde3f6ca4 Merge branch 'main' into development 2023-09-19 23:06:08 -06:00
VetheonGames
e8f2792bf6 Update license and bundle 2023-09-19 22:59:57 -06:00
VetheonGames
140860e1ea August 10th 2023 - Proto_handler update
Fix the proto_handler to separate request handling from input handling
Make the proto_handler just pass messages between. We don't need to actually process anything
Add some logic to determine if the request is coming from the orchestrator or consumer
Some other various proto_handler fixes
2023-08-10 15:37:51 -06:00
VetheonGames
674aa5df91 Setup UUID generation for Consumers
Setup registration for orchestrators
make the proto_handler properly use it's in-memory DB for registrations
Ensure the Proto_handler goes both ways
Add a cache system so that consumers don't need to constantly re-register
Add a blacklist system to prevent unauthorized Consumers from getting access to the message queue
Add a ID assignment for each registered consumer to the proto_handler so we can keep track of them in the proto_handler
Add a async task to actually handle the input from the consumers
2023-08-10 15:01:48 -06:00
VetheonGames
f66f125a49 Update Gopls 2023-08-10 12:45:34 -06:00
5 changed files with 123 additions and 114 deletions

View File

@ -3,4 +3,6 @@
source 'https://rubygems.org' source 'https://rubygems.org'
gem 'async' gem 'async'
gem 'openssl'
gem 'sequel' gem 'sequel'
gem 'socket'

View File

@ -4,31 +4,52 @@ require 'socket'
require 'async' require 'async'
require 'sequel' require 'sequel'
require 'openssl' require 'openssl'
require 'securerandom'
# Set up the database # Set up the database
DB = Sequel.sqlite # In-memory database DB = Sequel.sqlite # In-memory database
# Create processors table
DB.create_table :processors do
primary_key :proto_handler_id
String :uuid
String :domain
Integer :port
end
# Create orchestrators table
DB.create_table :orchestrators do
primary_key :id
String :domain
Integer :port
end
# Create blacklist table
DB.create_table :blacklist do
primary_key :id
String :uuid
end
processors = DB[:processors] processors = DB[:processors]
orchestrators = DB[:orchestrators]
blacklist = DB[:blacklist]
listen_ip = ENV['LISTEN_IP'] || '0.0.0.0' listen_ip = ENV['LISTEN_IP'] || '0.0.0.0'
listen_port = ENV['LISTEN_PORT'] || 3080 listen_port = ENV['LISTEN_PORT'] || 3080
server = TCPServer.new(listen_ip, listen_port) server = TCPServer.new(listen_ip, listen_port)
# This hash will store the connections to the consumers # This hash will store the recently connected UUIDs with their last validation timestamp
connections = {} recently_connected = {}
def create_socket(ip, port) # rubocop:disable Metrics/MethodLength def create_socket(ip, port) # rubocop:disable Metrics/MethodLength
# If the IP address is set to "loopback", replace it with the actual loopback IP address
ip = '127.0.0.1' if ip.downcase == 'loopback' ip = '127.0.0.1' if ip.downcase == 'loopback'
if ip =~ Resolv::IPv4::Regex if ip =~ Resolv::IPv4::Regex
# If the IP address is an IPv4 address, create an unencrypted socket
TCPSocket.new(ip, port) TCPSocket.new(ip, port)
else else
# If the IP address is a domain name, create an SSL socket
ssl_context = OpenSSL::SSL::SSLContext.new ssl_context = OpenSSL::SSL::SSLContext.new
ssl_context.verify_mode = OpenSSL::SSL::VERIFY_PEER ssl_context.verify_mode = OpenSSL::SSL::VERIFY_PEER
# Add certificate and key configuration here if needed
tcp_socket = TCPSocket.new(ip, port) tcp_socket = TCPSocket.new(ip, port)
ssl_socket = OpenSSL::SSL::SSLSocket.new(tcp_socket, ssl_context) ssl_socket = OpenSSL::SSL::SSLSocket.new(tcp_socket, ssl_context)
ssl_socket.sync_close = true ssl_socket.sync_close = true
@ -37,92 +58,75 @@ def create_socket(ip, port) # rubocop:disable Metrics/MethodLength
end end
end end
def handle_input(line, processors, orchestrators, blacklist, recently_connected) # rubocop:disable Metrics/AbcSize, Metrics/CyclomaticComplexity, Metrics/MethodLength, Metrics/PerceivedComplexity
command, *args = line.split
uuid = args.shift if command != 'REGISTER'
# Check if the UUID is blacklisted
return 'TERMINATE' if blacklist.where(uuid:).count.positive?
# Check if the UUID is in the recently connected cache
if recently_connected[uuid] && Time.now - recently_connected[uuid] < 120
# UUID is recently connected and within 2 minutes, no need to re-validate
elsif command != 'REGISTER' && (uuid.nil? || processors.where(uuid:).count.zero?)
# UUID is not recently connected or is invalid, add to blacklist
blacklist.insert(uuid:)
return 'ERROR Unrecognized UUID'
else
# UUID is valid, update the recently connected cache
recently_connected[uuid] = Time.now
end
handle_command(command, args, processors, orchestrators)
end
def handle_command(command, args, processors, orchestrators) # rubocop:disable Metrics/MethodLength
if command.start_with?('ORCHESTRATOR')
orchestrator_command = command.split('_', 2).last
orchestrator = orchestrators.first # Assuming you want to pass to the first available orchestrator
case orchestrator_command
when 'REQUEST', 'UPSTATE', 'FINISHED'
orchestrator.puts "#{orchestrator_command} #{args.join(' ')}"
else
puts "Unknown orchestrator command: #{orchestrator_command}"
end
else
consumer = processors.where(uuid: args.last).first # Assuming UUID is the last argument for CHANGE and SHUTDOWN
case command
when 'CHANGE', 'SHUTDOWN'
consumer.puts "#{command} #{args.join(' ')}"
else
puts "Unknown command: #{command}"
end
end
end
def register_orchestrator(line, orchestrators)
_, domain, port = line.split
id = orchestrators.max(:id).to_i + 1
orchestrators.insert(id:, domain:, port:)
puts "Orchestrator registered with domain: #{domain}, port: #{port}"
end
Async do Async do
loop do loop do
Async::Task.new do Async::Task.new do
client = server.accept client = server.accept
begin begin
while (line = client.gets) line = client.gets
# Determine if the connection is from an orchestrator for registration
if is_orchestrator_registration?(line)
register_orchestrator(line, orchestrators)
else
# Here we handle each line of input from the client # Here we handle each line of input from the client
handle_input(line, connections) handle_input(line, processors, blacklist, recently_connected)
end end
ensure ensure
# This code will be executed when the fiber is finished, regardless of whether an exception was raised # This code will be executed when the fiber is finished, regardless of whether an exception was raised
client.close client.close
Async::Task.current.stop # Stop the current task Async::Task.current.stop
end end
end end
end end
end end
def handle_input(line, connections) # rubocop:disable Metrics/AbcSize, Metrics/MethodLength
# Split the line into command and args
command, *args = line.split
case command
when 'NEW_PROCESSOR'
# Handle new processor registration
id, domain, port = args
# Create a new connection to the processor
Async do
processor_connection = create_socket(domain, port)
# Store the connection in the hash
connections[id] = processor_connection
# Add the processor to the database
processors.insert(consumer_id: id, ip: domain, port: port)
end
when 'REQUEST'
# Handle request from orchestrator
consumer_id, request_type, pcap_chunk_id = args
# Get the connection for this consumer
connection = connections[consumer_id]
# Send the request to the consumer
connection.puts "REQUEST #{request_type} #{pcap_chunk_id}"
when 'UPSTATE'
# Handle upstate from consumer
consumer_id, state, pcap_chunk_id = args
# Do something with the state update...
# For now, we'll just print it out
puts "Consumer #{consumer_id} is now #{state} for chunk #{pcap_chunk_id}"
when 'FINISHED'
# Handle finished from consumer
consumer_id, pcap_chunk_id = args
# Do something with the finished message...
# For now, we'll just print it out
puts "Consumer #{consumer_id} has finished chunk #{pcap_chunk_id}"
when 'CHANGE'
# Handle change from orchestrator
consumer_id, state, urgency = args
# Get the connection for this consumer
connection = connections[consumer_id]
# Send the change to the consumer
connection.puts "CHANGE #{state} #{urgency}"
when 'SHUTDOWN'
# Handle shutdown from orchestrator
consumer_id, urgency = args
# Get the connection for this consumer
connection = connections[consumer_id]
# Send the shutdown to the consumer
connection.puts "SHUTDOWN #{urgency}"
else
puts "Unknown command: #{command}"
end
end

View File

@ -2,25 +2,26 @@ GEM
remote: https://rubygems.org/ remote: https://rubygems.org/
specs: specs:
ast (2.4.2) ast (2.4.2)
atk (4.1.8) atk (4.2.0)
glib2 (= 4.1.8) glib2 (= 4.2.0)
backport (1.2.0) backport (1.2.0)
base64 (0.1.1) base64 (0.1.1)
benchmark (0.2.1) benchmark (0.2.1)
bigdecimal (3.1.4)
cairo (1.17.12) cairo (1.17.12)
native-package-installer (>= 1.0.3) native-package-installer (>= 1.0.3)
pkg-config (>= 1.2.2) pkg-config (>= 1.2.2)
red-colors red-colors
cairo-gobject (4.1.8) cairo-gobject (4.2.0)
cairo (>= 1.16.2) cairo (>= 1.16.2)
glib2 (= 4.1.8) glib2 (= 4.2.0)
console (1.18.0) console (1.23.2)
fiber-annotation fiber-annotation
fiber-local fiber-local
curses (1.4.4) curses (1.4.4)
diff-lcs (1.5.0) diff-lcs (1.5.0)
dotenv (2.8.1) dotenv (2.8.1)
dynamic_curses_input (1.1.0) dynamic_curses_input (1.2.1)
curses curses
reline reline
e2mmap (0.1.0) e2mmap (0.1.0)
@ -33,23 +34,23 @@ GEM
path_expander (~> 1.0) path_expander (~> 1.0)
ruby_parser (~> 3.0) ruby_parser (~> 3.0)
sexp_processor (~> 4.0) sexp_processor (~> 4.0)
gdk3 (4.1.8) gdk3 (4.2.0)
cairo-gobject (= 4.1.8) cairo-gobject (= 4.2.0)
gdk_pixbuf2 (= 4.1.8) gdk_pixbuf2 (= 4.2.0)
pango (= 4.1.8) pango (= 4.2.0)
gdk_pixbuf2 (4.1.8) gdk_pixbuf2 (4.2.0)
gio2 (= 4.1.8) gio2 (= 4.2.0)
gio2 (4.1.8) gio2 (4.2.0)
fiddle fiddle
gobject-introspection (= 4.1.8) gobject-introspection (= 4.2.0)
glib2 (4.1.8) glib2 (4.2.0)
native-package-installer (>= 1.0.3) native-package-installer (>= 1.0.3)
pkg-config (>= 1.3.5) pkg-config (>= 1.3.5)
gobject-introspection (4.1.8) gobject-introspection (4.2.0)
glib2 (= 4.1.8) glib2 (= 4.2.0)
gtk3 (4.1.8) gtk3 (4.2.0)
atk (= 4.1.8) atk (= 4.2.0)
gdk3 (= 4.1.8) gdk3 (= 4.2.0)
io-console (0.6.0) io-console (0.6.0)
jaro_winkler (1.5.6) jaro_winkler (1.5.6)
json (2.6.3) json (2.6.3)
@ -62,21 +63,21 @@ GEM
matrix (0.4.2) matrix (0.4.2)
mysql2 (0.5.5) mysql2 (0.5.5)
native-package-installer (1.1.8) native-package-installer (1.1.8)
nokogiri (1.15.3-x86_64-linux) nokogiri (1.15.4-x86_64-linux)
racc (~> 1.4) racc (~> 1.4)
openssl (3.1.0) openssl (3.1.0)
packetfu (2.0.0) packetfu (2.0.0)
pcaprub (~> 0.13.1) pcaprub (~> 0.13.1)
pango (4.1.8) pango (4.2.0)
cairo-gobject (= 4.1.8) cairo-gobject (= 4.2.0)
gobject-introspection (= 4.1.8) gobject-introspection (= 4.2.0)
parallel (1.23.0) parallel (1.23.0)
parser (3.2.2.3) parser (3.2.2.3)
ast (~> 2.4.1) ast (~> 2.4.1)
racc racc
path_expander (1.1.1) path_expander (1.1.1)
pcaprub (0.13.1) pcaprub (0.13.1)
pkg-config (1.5.2) pkg-config (1.5.5)
racc (1.7.1) racc (1.7.1)
rainbow (3.1.1) rainbow (3.1.1)
rbs (2.8.4) rbs (2.8.4)
@ -87,12 +88,13 @@ GEM
parser (~> 3.2.0) parser (~> 3.2.0)
rainbow (>= 2.0, < 4.0) rainbow (>= 2.0, < 4.0)
regexp_parser (2.8.1) regexp_parser (2.8.1)
reline (0.3.7) reline (0.3.8)
io-console (~> 0.5) io-console (~> 0.5)
reverse_markdown (2.1.1) reverse_markdown (2.1.1)
nokogiri nokogiri
rexml (3.2.6) rexml (3.2.6)
rubocop (1.55.0) rubocop (1.56.3)
base64 (~> 0.1.1)
json (~> 2.3) json (~> 2.3)
language_server-protocol (>= 3.17.0) language_server-protocol (>= 3.17.0)
parallel (~> 1.10) parallel (~> 1.10)
@ -109,7 +111,8 @@ GEM
ruby_parser (3.20.3) ruby_parser (3.20.3)
sexp_processor (~> 4.16) sexp_processor (~> 4.16)
securerandom (0.2.2) securerandom (0.2.2)
sequel (5.70.0) sequel (5.72.0)
bigdecimal
sexp_processor (4.17.0) sexp_processor (4.17.0)
solargraph (0.49.0) solargraph (0.49.0)
backport (~> 1.2) backport (~> 1.2)
@ -128,7 +131,7 @@ GEM
tilt (~> 2.0) tilt (~> 2.0)
yard (~> 0.9, >= 0.9.24) yard (~> 0.9, >= 0.9.24)
thor (1.2.2) thor (1.2.2)
tilt (2.2.0) tilt (2.3.0)
tracer (0.2.2) tracer (0.2.2)
unicode-display_width (2.4.2) unicode-display_width (2.4.2)
yaml (0.2.1) yaml (0.2.1)
@ -158,4 +161,4 @@ DEPENDENCIES
yaml (~> 0.2.1) yaml (~> 0.2.1)
BUNDLED WITH BUNDLED WITH
2.4.16 2.4.10

BIN
lib/bin/gopls Executable file

Binary file not shown.

View File

@ -1,5 +1,5 @@
go.sum database tree go.sum database tree
18772708 18982882
zUQL+Z/7glb/JRJWTIOwKpTurAD4UkqBoSM/R6Wxx1U= 5hq5RwlCLIiagi6hZScdHlumu5XIpITikSz2+tAr9LA=
— sum.golang.org Az3grlYd+7jVJcF9/zAb4Pytelt3fRZySKTY7/bjVB7mPom5EO6a2ir09DWokk6RPoUjmc7yd4zez+i6mUr2sYCddg8= — sum.golang.org Az3grhw2ZJlhe/saIZhYrbHraYSRxzcFocBkNuDAACA6cT3QTOA1P1fa+rhYfXNq2KVoROv5wGw5Y/2naw4QGoEoYAE=