NETRAVE/docker/netrave-protohandler/netrave_protohandler.rb
VetheonGames 674aa5df91 Setup UUID generation for Consumers
Setup registration for orchestrators
make the proto_handler properly use it's in-memory DB for registrations
Ensure the Proto_handler goes both ways
Add a cache system so that consumers don't need to constantly re-register
Add a blacklist system to prevent unauthorized Consumers from getting access to the message queue
Add a ID assignment for each registered consumer to the proto_handler so we can keep track of them in the proto_handler
Add a async task to actually handle the input from the consumers
2023-08-10 15:01:48 -06:00

177 lines
5.1 KiB
Ruby

# frozen_string_literal: true
require 'socket'
require 'async'
require 'sequel'
require 'openssl'
require 'securerandom'
# Set up the database
DB = Sequel.sqlite # In-memory database
# Create processors table
DB.create_table :processors do
primary_key :proto_handler_id
String :uuid
String :domain
Integer :port
end
# Create orchestrators table
DB.create_table :orchestrators do
primary_key :id
String :domain
Integer :port
end
# Create blacklist table
DB.create_table :blacklist do
primary_key :id
String :uuid
end
processors = DB[:processors]
orchestrators = DB[:orchestrators]
blacklist = DB[:blacklist]
listen_ip = ENV['LISTEN_IP'] || '0.0.0.0'
listen_port = ENV['LISTEN_PORT'] || 3080
server = TCPServer.new(listen_ip, listen_port)
# This hash will store the recently connected UUIDs with their last validation timestamp
recently_connected = {}
def create_socket(ip, port) # rubocop:disable Metrics/MethodLength
ip = '127.0.0.1' if ip.downcase == 'loopback'
if ip =~ Resolv::IPv4::Regex
TCPSocket.new(ip, port)
else
ssl_context = OpenSSL::SSL::SSLContext.new
ssl_context.verify_mode = OpenSSL::SSL::VERIFY_PEER
tcp_socket = TCPSocket.new(ip, port)
ssl_socket = OpenSSL::SSL::SSLSocket.new(tcp_socket, ssl_context)
ssl_socket.sync_close = true
ssl_socket.connect
ssl_socket
end
end
def handle_input(line, connections, consumer_uuids, blacklisted_ips, client, processors, blacklist, recently_connected) # rubocop:disable Metrics/AbcSize, Metrics/MethodLength, Metrics/CyclomaticComplexity, Metrics/PerceivedComplexity, Metrics/ParameterLists
command, *args = line.split
uuid = args.shift if command != 'REGISTER'
# Check if the UUID is blacklisted
if blacklisted_ips.include?(client.peeraddr[3]) || blacklist.where(uuid:).count.positive?
client.puts 'TERMINATE'
return
end
# Check if the UUID is in the recently connected cache
if recently_connected[uuid] && Time.now - recently_connected[uuid] < 120
# UUID is recently connected and within 2 minutes, no need to re-validate
elsif command != 'REGISTER' && (uuid.nil? || !consumer_uuids.values.include?(uuid))
# UUID is not recently connected or is invalid, add to blacklist
blacklisted_ips.add(client.peeraddr[3])
blacklist.insert(uuid:)
client.puts 'ERROR Unrecognized UUID'
return
else
# UUID is valid, update the recently connected cache
recently_connected[uuid] = Time.now
end
case command
when 'REGISTER'
id = processors.max(:proto_handler_id).to_i + 1
domain, port = args
uuid = SecureRandom.uuid
client.puts "UUID #{uuid}"
consumer_uuids[id] = uuid
Async do
processor_connection = create_socket(domain, port)
connections[id] = processor_connection
processors.insert(proto_handler_id: id, uuid:, domain:, port:)
end
when 'REQUEST'
# Handle request from orchestrator
consumer_id, request_type, pcap_chunk_id = args
# Get the connection for this consumer
connection = connections[consumer_id]
# Send the request to the consumer
connection.puts "REQUEST #{request_type} #{pcap_chunk_id}"
when 'UPSTATE'
# Handle upstate from consumer
consumer_id, state, pcap_chunk_id = args
# Do something with the state update...
# For now, we'll just print it out
puts "Consumer #{consumer_id} is now #{state} for chunk #{pcap_chunk_id}"
when 'FINISHED'
# Handle finished from consumer
consumer_id, pcap_chunk_id = args
# Do something with the finished message...
# For now, we'll just print it out
puts "Consumer #{consumer_id} has finished chunk #{pcap_chunk_id}"
when 'CHANGE'
# Handle change from orchestrator
consumer_id, state, urgency = args
# Get the connection for this consumer
connection = connections[consumer_id]
# Send the change to the consumer
connection.puts "CHANGE #{state} #{urgency}"
when 'SHUTDOWN'
# Handle shutdown from orchestrator
consumer_id, urgency = args
# Get the connection for this consumer
connection = connections[consumer_id]
# Send the shutdown to the consumer
connection.puts "SHUTDOWN #{urgency}"
else
puts "Unknown command: #{command}"
end
end
def register_orchestrator(line, orchestrators)
domain, port = line.split
id = orchestrators.max(:id).to_i + 1
orchestrators.insert(id:, domain:, port:)
puts "Orchestrator registered with domain: #{domain}, port: #{port}"
end
Async do
loop do
Async::Task.new do
client = server.accept
begin
line = client.gets
# Determine if the connection is from an orchestrator for registration
if is_orchestrator_registration?(line) # Define this method to identify orchestrator registration
register_orchestrator(line, orchestrators)
else
# Here we handle each line of input from the client
handle_input(line, client, processors, blacklist, recently_connected) # Pass the variables here
end
ensure
# This code will be executed when the fiber is finished, regardless of whether an exception was raised
client.close
Async::Task.current.stop
end
end
end
end