# frozen_string_literal: true

require 'socket'
require 'async'
require 'openssl'
require 'resolv'
require 'securerandom'
require_relative 'db/protohandler_dbmanager'

# Main class for the protocol handler server.
#
# Holds the listen address/port and owns the database manager whose tables
# (processors, orchestrators, blacklist) back the request handling below.
class ProtoServer
  attr_reader :listen_ip, :listen_port, :db_manager

  # @param listen_ip [String] address the TCP server binds to
  # @param listen_port [Integer] port the TCP server binds to
  def initialize(listen_ip, listen_port)
    @listen_ip = listen_ip
    @listen_port = listen_port
    @db_manager = ProtohandlerDBManager.new
  end
end

# Seconds a validated UUID stays in the in-memory cache before it is
# re-checked against the processors table.
RECENT_CONNECTION_TTL = 120

# Sub-commands accepted after the "ORCHESTRATOR_" prefix.
ORCHESTRATOR_COMMANDS = %w[REQUEST UPSTATE FINISHED].freeze

# Commands that are forwarded to a processor.
PROCESSOR_COMMANDS = %w[CHANGE SHUTDOWN].freeze

# Initialize server with config.
# ProtoServer is the only class in this file exposing listen_ip, listen_port
# and db_manager; the name previously used here was not defined in this file.
config = ProtoServer.new('0.0.0.0', 3080)
server = TCPServer.new(config.listen_ip, config.listen_port)

# Initialize database tables
db_manager = config.db_manager
processors = db_manager.db[:processors]
orchestrators = db_manager.db[:orchestrators]
blacklist = db_manager.db[:blacklist]

# This hash will store the recently connected UUIDs with their last validation timestamp
recently_connected = {}

# Open a client connection to +ip+:+port+.
#
# The literal 'loopback' (any case) is mapped to 127.0.0.1. IPv4 literals get
# a plain TCP socket; anything else is treated as a host name and wrapped in
# TLS with peer verification.
#
# @param ip [String] IPv4 literal, 'loopback', or a host name
# @param port [Integer, String] destination port
# @return [TCPSocket, OpenSSL::SSL::SSLSocket] a connected socket
# @raise [SystemCallError, SocketError, OpenSSL::SSL::SSLError] when the
#   connection or TLS handshake fails
def create_socket(ip, port) # rubocop:disable Metrics/MethodLength
  ip = '127.0.0.1' if ip.downcase == 'loopback'
  return TCPSocket.new(ip, port) if Resolv::IPv4::Regex.match?(ip)

  ssl_context = OpenSSL::SSL::SSLContext.new
  # set_params also installs the default trusted CA store; VERIFY_PEER on a
  # bare context has no trust anchors to verify against.
  ssl_context.set_params(verify_mode: OpenSSL::SSL::VERIFY_PEER)
  # Add certificate and key configuration here if needed
  ssl_socket = OpenSSL::SSL::SSLSocket.new(TCPSocket.new(ip, port), ssl_context)
  ssl_socket.sync_close = true
  ssl_socket.hostname = ip # SNI and host name verification
  begin
    ssl_socket.connect
  rescue StandardError
    ssl_socket.close # sync_close also closes the underlying TCP socket
    raise
  end
  ssl_socket
end

# Check if the UUID is blacklisted
#
# @param uuid [String, nil]
# @param blacklist [#where] blacklist dataset
# @return [Boolean]
def blacklisted?(uuid, blacklist)
  blacklist.where(uuid:).count.positive?
end

# Validate if the UUID has recently connected, i.e. was validated less than
# RECENT_CONNECTION_TTL seconds ago.
#
# @param uuid [String, nil]
# @param recently_connected [Hash{String => Time}] validation cache
# @return [Boolean]
def recently_connected?(uuid, recently_connected)
  validated_at = recently_connected[uuid]
  !validated_at.nil? && Time.now - validated_at < RECENT_CONNECTION_TTL
end

# Handle unregistered UUIDs: a UUID missing from the processors table is
# added to the blacklist and rejected.
#
# @param uuid [String, nil]
# @param processors [#where] processors dataset
# @param blacklist [#insert] blacklist dataset
# @return [String, nil] an error message for the client, or nil when the UUID
#   is registered
def handle_unregistered_uuid(uuid, processors, blacklist)
  # A missing UUID is rejected but not stored: a nil row in the blacklist
  # carries no information.
  return 'ERROR Unrecognized UUID' if uuid.nil?
  return nil unless processors.where(uuid:).count.zero?

  blacklist.insert(uuid:)
  'ERROR Unrecognized UUID'
end

# Validate the sender of one request line and dispatch its command.
#
# The first token is the command; for every command except REGISTER the next
# token is taken as the sender's UUID and removed from the argument list.
#
# @param line [String, nil] raw request line (nil when the client sent nothing)
# @param processors [#where] processors dataset
# @param orchestrators [#first] orchestrators dataset
# @param blacklist [#where, #insert] blacklist dataset
# @param recently_connected [Hash{String => Time}] validation cache, updated
#   in place when a UUID passes validation
# @return [String, nil] a message for the client, or nil when there is none
def handle_input(line, processors, orchestrators, blacklist, recently_connected)
  command, *args = line.to_s.split
  return 'ERROR Empty request' if command.nil?

  uuid = args.shift if command != 'REGISTER'
  return 'TERMINATE' if blacklisted?(uuid, blacklist)

  # A UUID validated within the TTL is trusted without another lookup.
  unless recently_connected?(uuid, recently_connected)
    unregistered_response = handle_unregistered_uuid(uuid, processors, blacklist)
    return unregistered_response if unregistered_response

    # UUID is valid, update the recently connected cache
    recently_connected[uuid] = Time.now
  end

  handle_command(command, args, processors, orchestrators)
end

# Route a validated command to an orchestrator or to a processor.
#
# @param command [String] command token, e.g. 'ORCHESTRATOR_REQUEST' or 'CHANGE'
# @param args [Array<String>] remaining tokens of the request
# @param processors [#where] processors dataset
# @param orchestrators [#first] orchestrators dataset
# @return [String, nil] an error message, or nil when the command was
#   forwarded or was unknown (unknown commands are only logged)
def handle_command(command, args, processors, orchestrators)
  if command.start_with?('ORCHESTRATOR')
    forward_to_orchestrator(command.split('_', 2).last, args, orchestrators)
  else
    forward_to_processor(command, args, processors)
  end
end

# Forward an orchestrator sub-command to the first registered orchestrator.
#
# @return [String, nil] an error message, or nil
def forward_to_orchestrator(orchestrator_command, args, orchestrators)
  unless ORCHESTRATOR_COMMANDS.include?(orchestrator_command)
    puts "Unknown orchestrator command: #{orchestrator_command}"
    return nil
  end

  orchestrator = orchestrators.first # Assuming you want to pass to the first available orchestrator
  return 'ERROR No orchestrator registered' if orchestrator.nil?

  deliver(orchestrator, "#{orchestrator_command} #{args.join(' ')}")
end

# Forward CHANGE/SHUTDOWN to the processor whose UUID is the last argument.
#
# @return [String, nil] an error message, or nil
def forward_to_processor(command, args, processors)
  unless PROCESSOR_COMMANDS.include?(command)
    puts "Unknown command: #{command}"
    return nil
  end

  consumer = processors.where(uuid: args.last).first # Assuming UUID is the last argument for CHANGE and SHUTDOWN
  return 'ERROR Unknown processor' if consumer.nil?

  deliver(consumer, "#{command} #{args.join(' ')}")
end

# Send one message line to an endpoint.
#
# An endpoint that publicly responds to #puts (an IO or socket) is written to
# directly. Otherwise it is treated as a database row and a short-lived
# connection is opened from its :domain and :port values.
# NOTE(review): register_orchestrator stores domain/port for orchestrator
# rows; processor rows are assumed to carry the same columns - confirm schema.
#
# @param endpoint [#puts, Hash]
# @param message [String]
# @return [nil]
def deliver(endpoint, message)
  if endpoint.respond_to?(:puts)
    endpoint.puts(message)
    return nil
  end

  socket = create_socket(endpoint[:domain], endpoint[:port])
  begin
    socket.puts(message)
  ensure
    socket.close
  end
  nil
end

# Whether a request line is an orchestrator registration, i.e. its first
# token is REGISTER.
#
# @param line [String, nil]
# @return [Boolean]
def orchestrator_registration?(line)
  line.to_s.split.first == 'REGISTER'
end

# Store a new orchestrator from a "REGISTER <domain> <port>" line.
#
# @param line [String] raw request line
# @param orchestrators [#max, #insert] orchestrators dataset
# @return [String, nil] an error message for a malformed line, or nil
def register_orchestrator(line, orchestrators)
  _, domain, port = line.split
  return 'ERROR Malformed REGISTER' if domain.nil? || port.nil?

  id = orchestrators.max(:id).to_i + 1
  orchestrators.insert(id:, domain:, port:)
  puts "Orchestrator registered with domain: #{domain}, port: #{port}"
  nil
end

# Main Async loop: accept connections in the reactor and handle each client
# in its own child task so a slow client does not block the accept loop.
Async do |task|
  loop do
    client = server.accept

    task.async do
      # Only the first line of each connection is read and handled.
      line = client.gets

      # Determine if the connection is from an orchestrator for registration
      response =
        if orchestrator_registration?(line)
          register_orchestrator(line, orchestrators)
        else
          handle_input(line, processors, orchestrators, blacklist, recently_connected)
        end

      # Report TERMINATE / ERROR outcomes back to the client.
      client.puts(response) if response
    rescue StandardError => e
      # One failing connection must not take down the accept loop.
      warn "Connection handling failed: #{e.class}: #{e.message}"
    ensure
      # Runs whether or not an exception was raised.
      client.close
    end
  end
end