Compare commits
No commits in common. "4a18249f76450d13d295ecab01c020277b0f74c2" and "65bae7f27e31b962968b2783dbd9e8e39c772f55" have entirely different histories.
4a18249f76
...
65bae7f27e
|
@ -3,6 +3,4 @@
|
||||||
source 'https://rubygems.org'
|
source 'https://rubygems.org'
|
||||||
|
|
||||||
gem 'async'
|
gem 'async'
|
||||||
gem 'openssl'
|
|
||||||
gem 'sequel'
|
gem 'sequel'
|
||||||
gem 'socket'
|
|
||||||
|
|
|
@ -4,52 +4,31 @@ require 'socket'
|
||||||
require 'async'
|
require 'async'
|
||||||
require 'sequel'
|
require 'sequel'
|
||||||
require 'openssl'
|
require 'openssl'
|
||||||
require 'securerandom'
|
|
||||||
|
|
||||||
# Set up the database
|
# Set up the database
|
||||||
DB = Sequel.sqlite # In-memory database
|
DB = Sequel.sqlite # In-memory database
|
||||||
|
|
||||||
# Create processors table
|
|
||||||
DB.create_table :processors do
|
|
||||||
primary_key :proto_handler_id
|
|
||||||
String :uuid
|
|
||||||
String :domain
|
|
||||||
Integer :port
|
|
||||||
end
|
|
||||||
|
|
||||||
# Create orchestrators table
|
|
||||||
DB.create_table :orchestrators do
|
|
||||||
primary_key :id
|
|
||||||
String :domain
|
|
||||||
Integer :port
|
|
||||||
end
|
|
||||||
|
|
||||||
# Create blacklist table
|
|
||||||
DB.create_table :blacklist do
|
|
||||||
primary_key :id
|
|
||||||
String :uuid
|
|
||||||
end
|
|
||||||
|
|
||||||
processors = DB[:processors]
|
processors = DB[:processors]
|
||||||
orchestrators = DB[:orchestrators]
|
|
||||||
blacklist = DB[:blacklist]
|
|
||||||
|
|
||||||
listen_ip = ENV['LISTEN_IP'] || '0.0.0.0'
|
listen_ip = ENV['LISTEN_IP'] || '0.0.0.0'
|
||||||
listen_port = ENV['LISTEN_PORT'] || 3080
|
listen_port = ENV['LISTEN_PORT'] || 3080
|
||||||
|
|
||||||
server = TCPServer.new(listen_ip, listen_port)
|
server = TCPServer.new(listen_ip, listen_port)
|
||||||
|
|
||||||
# This hash will store the recently connected UUIDs with their last validation timestamp
|
# This hash will store the connections to the consumers
|
||||||
recently_connected = {}
|
connections = {}
|
||||||
|
|
||||||
def create_socket(ip, port) # rubocop:disable Metrics/MethodLength
|
def create_socket(ip, port) # rubocop:disable Metrics/MethodLength
|
||||||
|
# If the IP address is set to "loopback", replace it with the actual loopback IP address
|
||||||
ip = '127.0.0.1' if ip.downcase == 'loopback'
|
ip = '127.0.0.1' if ip.downcase == 'loopback'
|
||||||
|
|
||||||
if ip =~ Resolv::IPv4::Regex
|
if ip =~ Resolv::IPv4::Regex
|
||||||
|
# If the IP address is an IPv4 address, create an unencrypted socket
|
||||||
TCPSocket.new(ip, port)
|
TCPSocket.new(ip, port)
|
||||||
else
|
else
|
||||||
|
# If the IP address is a domain name, create an SSL socket
|
||||||
ssl_context = OpenSSL::SSL::SSLContext.new
|
ssl_context = OpenSSL::SSL::SSLContext.new
|
||||||
ssl_context.verify_mode = OpenSSL::SSL::VERIFY_PEER
|
ssl_context.verify_mode = OpenSSL::SSL::VERIFY_PEER
|
||||||
# Add certificate and key configuration here if needed
|
|
||||||
tcp_socket = TCPSocket.new(ip, port)
|
tcp_socket = TCPSocket.new(ip, port)
|
||||||
ssl_socket = OpenSSL::SSL::SSLSocket.new(tcp_socket, ssl_context)
|
ssl_socket = OpenSSL::SSL::SSLSocket.new(tcp_socket, ssl_context)
|
||||||
ssl_socket.sync_close = true
|
ssl_socket.sync_close = true
|
||||||
|
@ -58,75 +37,92 @@ def create_socket(ip, port) # rubocop:disable Metrics/MethodLength
|
||||||
end
|
end
|
||||||
end
|
end
|
||||||
|
|
||||||
def handle_input(line, processors, orchestrators, blacklist, recently_connected) # rubocop:disable Metrics/AbcSize, Metrics/CyclomaticComplexity, Metrics/MethodLength, Metrics/PerceivedComplexity
|
|
||||||
command, *args = line.split
|
|
||||||
uuid = args.shift if command != 'REGISTER'
|
|
||||||
|
|
||||||
# Check if the UUID is blacklisted
|
|
||||||
return 'TERMINATE' if blacklist.where(uuid:).count.positive?
|
|
||||||
|
|
||||||
# Check if the UUID is in the recently connected cache
|
|
||||||
if recently_connected[uuid] && Time.now - recently_connected[uuid] < 120
|
|
||||||
# UUID is recently connected and within 2 minutes, no need to re-validate
|
|
||||||
elsif command != 'REGISTER' && (uuid.nil? || processors.where(uuid:).count.zero?)
|
|
||||||
# UUID is not recently connected or is invalid, add to blacklist
|
|
||||||
blacklist.insert(uuid:)
|
|
||||||
return 'ERROR Unrecognized UUID'
|
|
||||||
else
|
|
||||||
# UUID is valid, update the recently connected cache
|
|
||||||
recently_connected[uuid] = Time.now
|
|
||||||
end
|
|
||||||
|
|
||||||
handle_command(command, args, processors, orchestrators)
|
|
||||||
end
|
|
||||||
|
|
||||||
def handle_command(command, args, processors, orchestrators) # rubocop:disable Metrics/MethodLength
|
|
||||||
if command.start_with?('ORCHESTRATOR')
|
|
||||||
orchestrator_command = command.split('_', 2).last
|
|
||||||
orchestrator = orchestrators.first # Assuming you want to pass to the first available orchestrator
|
|
||||||
case orchestrator_command
|
|
||||||
when 'REQUEST', 'UPSTATE', 'FINISHED'
|
|
||||||
orchestrator.puts "#{orchestrator_command} #{args.join(' ')}"
|
|
||||||
else
|
|
||||||
puts "Unknown orchestrator command: #{orchestrator_command}"
|
|
||||||
end
|
|
||||||
else
|
|
||||||
consumer = processors.where(uuid: args.last).first # Assuming UUID is the last argument for CHANGE and SHUTDOWN
|
|
||||||
case command
|
|
||||||
when 'CHANGE', 'SHUTDOWN'
|
|
||||||
consumer.puts "#{command} #{args.join(' ')}"
|
|
||||||
else
|
|
||||||
puts "Unknown command: #{command}"
|
|
||||||
end
|
|
||||||
end
|
|
||||||
end
|
|
||||||
|
|
||||||
def register_orchestrator(line, orchestrators)
|
|
||||||
_, domain, port = line.split
|
|
||||||
id = orchestrators.max(:id).to_i + 1
|
|
||||||
orchestrators.insert(id:, domain:, port:)
|
|
||||||
puts "Orchestrator registered with domain: #{domain}, port: #{port}"
|
|
||||||
end
|
|
||||||
|
|
||||||
Async do
|
Async do
|
||||||
loop do
|
loop do
|
||||||
Async::Task.new do
|
Async::Task.new do
|
||||||
client = server.accept
|
client = server.accept
|
||||||
|
|
||||||
begin
|
begin
|
||||||
line = client.gets
|
while (line = client.gets)
|
||||||
# Determine if the connection is from an orchestrator for registration
|
|
||||||
if is_orchestrator_registration?(line)
|
|
||||||
register_orchestrator(line, orchestrators)
|
|
||||||
else
|
|
||||||
# Here we handle each line of input from the client
|
# Here we handle each line of input from the client
|
||||||
handle_input(line, processors, blacklist, recently_connected)
|
handle_input(line, connections)
|
||||||
end
|
end
|
||||||
ensure
|
ensure
|
||||||
# This code will be executed when the fiber is finished, regardless of whether an exception was raised
|
# This code will be executed when the fiber is finished, regardless of whether an exception was raised
|
||||||
client.close
|
client.close
|
||||||
Async::Task.current.stop
|
Async::Task.current.stop # Stop the current task
|
||||||
end
|
end
|
||||||
end
|
end
|
||||||
end
|
end
|
||||||
end
|
end
|
||||||
|
|
||||||
|
def handle_input(line, connections) # rubocop:disable Metrics/AbcSize, Metrics/MethodLength
|
||||||
|
# Split the line into command and args
|
||||||
|
command, *args = line.split
|
||||||
|
|
||||||
|
case command
|
||||||
|
when 'NEW_PROCESSOR'
|
||||||
|
# Handle new processor registration
|
||||||
|
id, domain, port = args
|
||||||
|
|
||||||
|
# Create a new connection to the processor
|
||||||
|
Async do
|
||||||
|
processor_connection = create_socket(domain, port)
|
||||||
|
|
||||||
|
# Store the connection in the hash
|
||||||
|
connections[id] = processor_connection
|
||||||
|
|
||||||
|
# Add the processor to the database
|
||||||
|
processors.insert(consumer_id: id, ip: domain, port: port)
|
||||||
|
end
|
||||||
|
|
||||||
|
when 'REQUEST'
|
||||||
|
# Handle request from orchestrator
|
||||||
|
consumer_id, request_type, pcap_chunk_id = args
|
||||||
|
|
||||||
|
# Get the connection for this consumer
|
||||||
|
connection = connections[consumer_id]
|
||||||
|
|
||||||
|
# Send the request to the consumer
|
||||||
|
connection.puts "REQUEST #{request_type} #{pcap_chunk_id}"
|
||||||
|
|
||||||
|
when 'UPSTATE'
|
||||||
|
# Handle upstate from consumer
|
||||||
|
consumer_id, state, pcap_chunk_id = args
|
||||||
|
|
||||||
|
# Do something with the state update...
|
||||||
|
# For now, we'll just print it out
|
||||||
|
puts "Consumer #{consumer_id} is now #{state} for chunk #{pcap_chunk_id}"
|
||||||
|
|
||||||
|
when 'FINISHED'
|
||||||
|
# Handle finished from consumer
|
||||||
|
consumer_id, pcap_chunk_id = args
|
||||||
|
|
||||||
|
# Do something with the finished message...
|
||||||
|
# For now, we'll just print it out
|
||||||
|
puts "Consumer #{consumer_id} has finished chunk #{pcap_chunk_id}"
|
||||||
|
|
||||||
|
when 'CHANGE'
|
||||||
|
# Handle change from orchestrator
|
||||||
|
consumer_id, state, urgency = args
|
||||||
|
|
||||||
|
# Get the connection for this consumer
|
||||||
|
connection = connections[consumer_id]
|
||||||
|
|
||||||
|
# Send the change to the consumer
|
||||||
|
connection.puts "CHANGE #{state} #{urgency}"
|
||||||
|
|
||||||
|
when 'SHUTDOWN'
|
||||||
|
# Handle shutdown from orchestrator
|
||||||
|
consumer_id, urgency = args
|
||||||
|
|
||||||
|
# Get the connection for this consumer
|
||||||
|
connection = connections[consumer_id]
|
||||||
|
|
||||||
|
# Send the shutdown to the consumer
|
||||||
|
connection.puts "SHUTDOWN #{urgency}"
|
||||||
|
|
||||||
|
else
|
||||||
|
puts "Unknown command: #{command}"
|
||||||
|
end
|
||||||
|
end
|
||||||
|
|
|
@ -2,26 +2,25 @@ GEM
|
||||||
remote: https://rubygems.org/
|
remote: https://rubygems.org/
|
||||||
specs:
|
specs:
|
||||||
ast (2.4.2)
|
ast (2.4.2)
|
||||||
atk (4.2.0)
|
atk (4.1.8)
|
||||||
glib2 (= 4.2.0)
|
glib2 (= 4.1.8)
|
||||||
backport (1.2.0)
|
backport (1.2.0)
|
||||||
base64 (0.1.1)
|
base64 (0.1.1)
|
||||||
benchmark (0.2.1)
|
benchmark (0.2.1)
|
||||||
bigdecimal (3.1.4)
|
|
||||||
cairo (1.17.12)
|
cairo (1.17.12)
|
||||||
native-package-installer (>= 1.0.3)
|
native-package-installer (>= 1.0.3)
|
||||||
pkg-config (>= 1.2.2)
|
pkg-config (>= 1.2.2)
|
||||||
red-colors
|
red-colors
|
||||||
cairo-gobject (4.2.0)
|
cairo-gobject (4.1.8)
|
||||||
cairo (>= 1.16.2)
|
cairo (>= 1.16.2)
|
||||||
glib2 (= 4.2.0)
|
glib2 (= 4.1.8)
|
||||||
console (1.23.2)
|
console (1.18.0)
|
||||||
fiber-annotation
|
fiber-annotation
|
||||||
fiber-local
|
fiber-local
|
||||||
curses (1.4.4)
|
curses (1.4.4)
|
||||||
diff-lcs (1.5.0)
|
diff-lcs (1.5.0)
|
||||||
dotenv (2.8.1)
|
dotenv (2.8.1)
|
||||||
dynamic_curses_input (1.2.1)
|
dynamic_curses_input (1.1.0)
|
||||||
curses
|
curses
|
||||||
reline
|
reline
|
||||||
e2mmap (0.1.0)
|
e2mmap (0.1.0)
|
||||||
|
@ -34,23 +33,23 @@ GEM
|
||||||
path_expander (~> 1.0)
|
path_expander (~> 1.0)
|
||||||
ruby_parser (~> 3.0)
|
ruby_parser (~> 3.0)
|
||||||
sexp_processor (~> 4.0)
|
sexp_processor (~> 4.0)
|
||||||
gdk3 (4.2.0)
|
gdk3 (4.1.8)
|
||||||
cairo-gobject (= 4.2.0)
|
cairo-gobject (= 4.1.8)
|
||||||
gdk_pixbuf2 (= 4.2.0)
|
gdk_pixbuf2 (= 4.1.8)
|
||||||
pango (= 4.2.0)
|
pango (= 4.1.8)
|
||||||
gdk_pixbuf2 (4.2.0)
|
gdk_pixbuf2 (4.1.8)
|
||||||
gio2 (= 4.2.0)
|
gio2 (= 4.1.8)
|
||||||
gio2 (4.2.0)
|
gio2 (4.1.8)
|
||||||
fiddle
|
fiddle
|
||||||
gobject-introspection (= 4.2.0)
|
gobject-introspection (= 4.1.8)
|
||||||
glib2 (4.2.0)
|
glib2 (4.1.8)
|
||||||
native-package-installer (>= 1.0.3)
|
native-package-installer (>= 1.0.3)
|
||||||
pkg-config (>= 1.3.5)
|
pkg-config (>= 1.3.5)
|
||||||
gobject-introspection (4.2.0)
|
gobject-introspection (4.1.8)
|
||||||
glib2 (= 4.2.0)
|
glib2 (= 4.1.8)
|
||||||
gtk3 (4.2.0)
|
gtk3 (4.1.8)
|
||||||
atk (= 4.2.0)
|
atk (= 4.1.8)
|
||||||
gdk3 (= 4.2.0)
|
gdk3 (= 4.1.8)
|
||||||
io-console (0.6.0)
|
io-console (0.6.0)
|
||||||
jaro_winkler (1.5.6)
|
jaro_winkler (1.5.6)
|
||||||
json (2.6.3)
|
json (2.6.3)
|
||||||
|
@ -63,21 +62,21 @@ GEM
|
||||||
matrix (0.4.2)
|
matrix (0.4.2)
|
||||||
mysql2 (0.5.5)
|
mysql2 (0.5.5)
|
||||||
native-package-installer (1.1.8)
|
native-package-installer (1.1.8)
|
||||||
nokogiri (1.15.4-x86_64-linux)
|
nokogiri (1.15.3-x86_64-linux)
|
||||||
racc (~> 1.4)
|
racc (~> 1.4)
|
||||||
openssl (3.1.0)
|
openssl (3.1.0)
|
||||||
packetfu (2.0.0)
|
packetfu (2.0.0)
|
||||||
pcaprub (~> 0.13.1)
|
pcaprub (~> 0.13.1)
|
||||||
pango (4.2.0)
|
pango (4.1.8)
|
||||||
cairo-gobject (= 4.2.0)
|
cairo-gobject (= 4.1.8)
|
||||||
gobject-introspection (= 4.2.0)
|
gobject-introspection (= 4.1.8)
|
||||||
parallel (1.23.0)
|
parallel (1.23.0)
|
||||||
parser (3.2.2.3)
|
parser (3.2.2.3)
|
||||||
ast (~> 2.4.1)
|
ast (~> 2.4.1)
|
||||||
racc
|
racc
|
||||||
path_expander (1.1.1)
|
path_expander (1.1.1)
|
||||||
pcaprub (0.13.1)
|
pcaprub (0.13.1)
|
||||||
pkg-config (1.5.5)
|
pkg-config (1.5.2)
|
||||||
racc (1.7.1)
|
racc (1.7.1)
|
||||||
rainbow (3.1.1)
|
rainbow (3.1.1)
|
||||||
rbs (2.8.4)
|
rbs (2.8.4)
|
||||||
|
@ -88,13 +87,12 @@ GEM
|
||||||
parser (~> 3.2.0)
|
parser (~> 3.2.0)
|
||||||
rainbow (>= 2.0, < 4.0)
|
rainbow (>= 2.0, < 4.0)
|
||||||
regexp_parser (2.8.1)
|
regexp_parser (2.8.1)
|
||||||
reline (0.3.8)
|
reline (0.3.7)
|
||||||
io-console (~> 0.5)
|
io-console (~> 0.5)
|
||||||
reverse_markdown (2.1.1)
|
reverse_markdown (2.1.1)
|
||||||
nokogiri
|
nokogiri
|
||||||
rexml (3.2.6)
|
rexml (3.2.6)
|
||||||
rubocop (1.56.3)
|
rubocop (1.55.0)
|
||||||
base64 (~> 0.1.1)
|
|
||||||
json (~> 2.3)
|
json (~> 2.3)
|
||||||
language_server-protocol (>= 3.17.0)
|
language_server-protocol (>= 3.17.0)
|
||||||
parallel (~> 1.10)
|
parallel (~> 1.10)
|
||||||
|
@ -111,8 +109,7 @@ GEM
|
||||||
ruby_parser (3.20.3)
|
ruby_parser (3.20.3)
|
||||||
sexp_processor (~> 4.16)
|
sexp_processor (~> 4.16)
|
||||||
securerandom (0.2.2)
|
securerandom (0.2.2)
|
||||||
sequel (5.72.0)
|
sequel (5.70.0)
|
||||||
bigdecimal
|
|
||||||
sexp_processor (4.17.0)
|
sexp_processor (4.17.0)
|
||||||
solargraph (0.49.0)
|
solargraph (0.49.0)
|
||||||
backport (~> 1.2)
|
backport (~> 1.2)
|
||||||
|
@ -131,7 +128,7 @@ GEM
|
||||||
tilt (~> 2.0)
|
tilt (~> 2.0)
|
||||||
yard (~> 0.9, >= 0.9.24)
|
yard (~> 0.9, >= 0.9.24)
|
||||||
thor (1.2.2)
|
thor (1.2.2)
|
||||||
tilt (2.3.0)
|
tilt (2.2.0)
|
||||||
tracer (0.2.2)
|
tracer (0.2.2)
|
||||||
unicode-display_width (2.4.2)
|
unicode-display_width (2.4.2)
|
||||||
yaml (0.2.1)
|
yaml (0.2.1)
|
||||||
|
@ -161,4 +158,4 @@ DEPENDENCIES
|
||||||
yaml (~> 0.2.1)
|
yaml (~> 0.2.1)
|
||||||
|
|
||||||
BUNDLED WITH
|
BUNDLED WITH
|
||||||
2.4.10
|
2.4.16
|
||||||
|
|
BIN
lib/bin/gopls
BIN
lib/bin/gopls
Binary file not shown.
|
@ -1,5 +1,5 @@
|
||||||
go.sum database tree
|
go.sum database tree
|
||||||
18982882
|
18772708
|
||||||
5hq5RwlCLIiagi6hZScdHlumu5XIpITikSz2+tAr9LA=
|
zUQL+Z/7glb/JRJWTIOwKpTurAD4UkqBoSM/R6Wxx1U=
|
||||||
|
|
||||||
— sum.golang.org Az3grhw2ZJlhe/saIZhYrbHraYSRxzcFocBkNuDAACA6cT3QTOA1P1fa+rhYfXNq2KVoROv5wGw5Y/2naw4QGoEoYAE=
|
— sum.golang.org Az3grlYd+7jVJcF9/zAb4Pytelt3fRZySKTY7/bjVB7mPom5EO6a2ir09DWokk6RPoUjmc7yd4zez+i6mUr2sYCddg8=
|
||||||
|
|
Loading…
Reference in New Issue
Block a user