5df588674e
So, let's get into it. First, we are straying from the in-memory ideation of storing all our pcap chunks in the memory I fear that we might bottle up the ram if we depend solely on it. So, i've made things default to sqlite flatfile, with the option for both remote and in memory This way, dependant on the users needs, they can use a different implementation. That's why the creation of a great deal of files and an entire new directory space I've almost finished implementing the database system into itself, but I want the DB system to be largely independent of the rest of the system. Because, you know me, I am a fan of encapsulation. and I like making software coded to be independent. Yes, if the rest of the system isn't working, the database has nothing to docker but at least it makes the system more robust, if even a major system can collapse, and the system as a whole remains functional. Philisophically, I find this is largely lacking in modern software, and I believe that these practices will see the protohandler performing much better than we would expect Since this system as a whole will be independent of the main NETRAVE system, it opens the door for further expanding, or perhaps even completely repurposing this protocol going forward.
138 lines
4.2 KiB
Ruby
138 lines
4.2 KiB
Ruby
# frozen_string_literal: true
|
|
|
|
require 'socket'
|
|
require 'async'
|
|
require 'openssl'
|
|
require 'securerandom'
|
|
require_relative 'db/protohandler_dbmanager'
|
|
|
|
# main class for the protocol handler server
|
|
class ProtoServer
|
|
attr_reader :listen_ip, :listen_port, :db_manager
|
|
|
|
def initialize(listen_ip, listen_port)
|
|
@listen_ip = listen_ip
|
|
@listen_port = listen_port
|
|
@db_manager = ProtohandlerDBManager.new
|
|
end
|
|
end
|
|
|
|
# Initialize server with config
|
|
config = ServerConfig.new('0.0.0.0', 3080)
|
|
server = TCPServer.new(config.listen_ip, config.listen_port)
|
|
|
|
# Initialize database tables
|
|
db_manager = config.db_manager
|
|
processors = db_manager.db[:processors]
|
|
orchestrators = db_manager.db[:orchestrators]
|
|
blacklist = db_manager.db[:blacklist]
|
|
|
|
# This hash will store the recently connected UUIDs with their last validation timestamp
|
|
recently_connected = {}
|
|
|
|
def create_socket(ip, port) # rubocop:disable Metrics/MethodLength
|
|
ip = '127.0.0.1' if ip.downcase == 'loopback'
|
|
if ip =~ Resolv::IPv4::Regex
|
|
TCPSocket.new(ip, port)
|
|
else
|
|
ssl_context = OpenSSL::SSL::SSLContext.new
|
|
ssl_context.verify_mode = OpenSSL::SSL::VERIFY_PEER
|
|
# Add certificate and key configuration here if needed
|
|
tcp_socket = TCPSocket.new(ip, port)
|
|
ssl_socket = OpenSSL::SSL::SSLSocket.new(tcp_socket, ssl_context)
|
|
ssl_socket.sync_close = true
|
|
ssl_socket.connect
|
|
ssl_socket
|
|
end
|
|
end
|
|
|
|
# Check if the UUID is blacklisted
|
|
def blacklisted?(uuid, blacklist)
|
|
blacklist.where(uuid:).count.positive?
|
|
end
|
|
|
|
# Validate if the UUID has recently connected
|
|
def recently_connected?(uuid, recently_connected)
|
|
recently_connected[uuid] && Time.now - recently_connected[uuid] < 120
|
|
end
|
|
|
|
# Handle unregistered UUIDs
|
|
def handle_unregistered_uuid(uuid, processors, blacklist)
|
|
if uuid.nil? || processors.where(uuid:).count.zero?
|
|
blacklist.insert(uuid:)
|
|
return 'ERROR Unrecognized UUID'
|
|
end
|
|
nil
|
|
end
|
|
|
|
def handle_input(line, processors, orchestrators, blacklist, recently_connected)
|
|
command, *args = line.split
|
|
uuid = args.shift if command != 'REGISTER'
|
|
|
|
return 'TERMINATE' if blacklisted?(uuid, blacklist)
|
|
|
|
if recently_connected?(uuid, recently_connected)
|
|
# UUID is recently connected and within 2 minutes, no need to re-validate
|
|
else
|
|
unregistered_response = handle_unregistered_uuid(uuid, processors, blacklist)
|
|
return unregistered_response if unregistered_response
|
|
|
|
# UUID is valid, update the recently connected cache
|
|
recently_connected[uuid] = Time.now
|
|
end
|
|
|
|
handle_command(command, args, processors, orchestrators)
|
|
end
|
|
|
|
def handle_command(command, args, processors, orchestrators) # rubocop:disable Metrics/MethodLength
|
|
if command.start_with?('ORCHESTRATOR')
|
|
orchestrator_command = command.split('_', 2).last
|
|
orchestrator = orchestrators.first # Assuming you want to pass to the first available orchestrator
|
|
case orchestrator_command
|
|
when 'REQUEST', 'UPSTATE', 'FINISHED'
|
|
orchestrator.puts "#{orchestrator_command} #{args.join(' ')}"
|
|
else
|
|
puts "Unknown orchestrator command: #{orchestrator_command}"
|
|
end
|
|
else
|
|
consumer = processors.where(uuid: args.last).first # Assuming UUID is the last argument for CHANGE and SHUTDOWN
|
|
case command
|
|
when 'CHANGE', 'SHUTDOWN'
|
|
consumer.puts "#{command} #{args.join(' ')}"
|
|
else
|
|
puts "Unknown command: #{command}"
|
|
end
|
|
end
|
|
end
|
|
|
|
def register_orchestrator(line, orchestrators)
|
|
_, domain, port = line.split
|
|
id = orchestrators.max(:id).to_i + 1
|
|
orchestrators.insert(id:, domain:, port:)
|
|
puts "Orchestrator registered with domain: #{domain}, port: #{port}"
|
|
end
|
|
|
|
# Main Async loop
|
|
Async do
|
|
loop do
|
|
Async::Task.new do
|
|
client = server.accept
|
|
|
|
begin
|
|
line = client.gets
|
|
# Determine if the connection is from an orchestrator for registration
|
|
if is_orchestrator_registration?(line)
|
|
register_orchestrator(line, orchestrators)
|
|
else
|
|
# Here we handle each line of input from the client
|
|
handle_input(line, processors, orchestrators, blacklist, recently_connected)
|
|
end
|
|
ensure
|
|
# This code will be executed when the fiber is finished, regardless of whether an exception was raised
|
|
client.close
|
|
Async::Task.current.stop
|
|
end
|
|
end
|
|
end
|
|
end
|