diff --git a/docker/netrave-protohandler/netrave_protohandler.rb b/docker/netrave-protohandler/netrave_protohandler.rb index 7ee1eb1..1d8f643 100644 --- a/docker/netrave-protohandler/netrave_protohandler.rb +++ b/docker/netrave-protohandler/netrave_protohandler.rb @@ -49,6 +49,7 @@ def create_socket(ip, port) # rubocop:disable Metrics/MethodLength else ssl_context = OpenSSL::SSL::SSLContext.new ssl_context.verify_mode = OpenSSL::SSL::VERIFY_PEER + # Add certificate and key configuration here if needed tcp_socket = TCPSocket.new(ip, port) ssl_socket = OpenSSL::SSL::SSLSocket.new(tcp_socket, ssl_context) ssl_socket.sync_close = true @@ -57,96 +58,51 @@ def create_socket(ip, port) # rubocop:disable Metrics/MethodLength end end -def handle_input(line, connections, consumer_uuids, blacklisted_ips, client, processors, blacklist, recently_connected) # rubocop:disable Metrics/AbcSize, Metrics/MethodLength, Metrics/CyclomaticComplexity, Metrics/PerceivedComplexity, Metrics/ParameterLists +def handle_input(line, processors, orchestrators, blacklist, recently_connected) # rubocop:disable Metrics/AbcSize, Metrics/CyclomaticComplexity, Metrics/MethodLength, Metrics/PerceivedComplexity command, *args = line.split uuid = args.shift if command != 'REGISTER' # Check if the UUID is blacklisted - if blacklisted_ips.include?(client.peeraddr[3]) || blacklist.where(uuid:).count.positive? - client.puts 'TERMINATE' - return - end + return 'TERMINATE' if blacklist.where(uuid:).count.positive? # Check if the UUID is in the recently connected cache if recently_connected[uuid] && Time.now - recently_connected[uuid] < 120 # UUID is recently connected and within 2 minutes, no need to re-validate - elsif command != 'REGISTER' && (uuid.nil? || !consumer_uuids.values.include?(uuid)) + elsif command != 'REGISTER' && (uuid.nil? || processors.where(uuid:).count.zero?) # UUID is not recently connected or is invalid, add to blacklist - blacklisted_ips.add(client.peeraddr[3]) blacklist.insert(uuid:) - client.puts 'ERROR Unrecognized UUID' - return + return 'ERROR Unrecognized UUID' else # UUID is valid, update the recently connected cache recently_connected[uuid] = Time.now end - case command - when 'REGISTER' - id = processors.max(:proto_handler_id).to_i + 1 - domain, port = args - uuid = SecureRandom.uuid - client.puts "UUID #{uuid}" - consumer_uuids[id] = uuid - Async do - processor_connection = create_socket(domain, port) - connections[id] = processor_connection - processors.insert(proto_handler_id: id, uuid:, domain:, port:) + handle_command(command, args, processors, orchestrators) +end + +def handle_command(command, args, processors, orchestrators) # rubocop:disable Metrics/MethodLength + if command.start_with?('ORCHESTRATOR') + orchestrator_command = command.split('_', 2).last + orchestrator = orchestrators.first # Assuming you want to pass to the first available orchestrator + case orchestrator_command + when 'REQUEST', 'UPSTATE', 'FINISHED' + orchestrator.puts "#{orchestrator_command} #{args.join(' ')}" + else + puts "Unknown orchestrator command: #{orchestrator_command}" end - - when 'REQUEST' - # Handle request from orchestrator - consumer_id, request_type, pcap_chunk_id = args - - # Get the connection for this consumer - connection = connections[consumer_id] - - # Send the request to the consumer - connection.puts "REQUEST #{request_type} #{pcap_chunk_id}" - - when 'UPSTATE' - # Handle upstate from consumer - consumer_id, state, pcap_chunk_id = args - - # Do something with the state update... - # For now, we'll just print it out - puts "Consumer #{consumer_id} is now #{state} for chunk #{pcap_chunk_id}" - - when 'FINISHED' - # Handle finished from consumer - consumer_id, pcap_chunk_id = args - - # Do something with the finished message... - # For now, we'll just print it out - puts "Consumer #{consumer_id} has finished chunk #{pcap_chunk_id}" - - when 'CHANGE' - # Handle change from orchestrator - consumer_id, state, urgency = args - - # Get the connection for this consumer - connection = connections[consumer_id] - - # Send the change to the consumer - connection.puts "CHANGE #{state} #{urgency}" - - when 'SHUTDOWN' - # Handle shutdown from orchestrator - consumer_id, urgency = args - - # Get the connection for this consumer - connection = connections[consumer_id] - - # Send the shutdown to the consumer - connection.puts "SHUTDOWN #{urgency}" - else - puts "Unknown command: #{command}" + consumer = processors.where(uuid: args.last).first # Assuming UUID is the last argument for CHANGE and SHUTDOWN + case command + when 'CHANGE', 'SHUTDOWN' + consumer.puts "#{command} #{args.join(' ')}" + else + puts "Unknown command: #{command}" + end end end def register_orchestrator(line, orchestrators) - domain, port = line.split + _, domain, port = line.split id = orchestrators.max(:id).to_i + 1 orchestrators.insert(id:, domain:, port:) puts "Orchestrator registered with domain: #{domain}, port: #{port}" @@ -160,11 +116,11 @@ Async do begin line = client.gets # Determine if the connection is from an orchestrator for registration - if is_orchestrator_registration?(line) # Define this method to identify orchestrator registration + if is_orchestrator_registration?(line) register_orchestrator(line, orchestrators) else # Here we handle each line of input from the client - handle_input(line, client, processors, blacklist, recently_connected) # Pass the variables here + handle_input(line, processors, blacklist, recently_connected) end ensure # This code will be executed when the fiber is finished, regardless of whether an exception was raised