diff --git a/docker/netrave-protohandler/Gemfile b/docker/netrave-protohandler/Gemfile index 75b89af..14edeeb 100644 --- a/docker/netrave-protohandler/Gemfile +++ b/docker/netrave-protohandler/Gemfile @@ -3,4 +3,6 @@ source 'https://rubygems.org' gem 'async' +gem 'openssl' gem 'sequel' +gem 'socket' diff --git a/docker/netrave-protohandler/netrave_protohandler.rb b/docker/netrave-protohandler/netrave_protohandler.rb index 82b217e..1d8f643 100644 --- a/docker/netrave-protohandler/netrave_protohandler.rb +++ b/docker/netrave-protohandler/netrave_protohandler.rb @@ -4,31 +4,52 @@ require 'socket' require 'async' require 'sequel' require 'openssl' +require 'securerandom' # Set up the database DB = Sequel.sqlite # In-memory database + +# Create processors table +DB.create_table :processors do + primary_key :proto_handler_id + String :uuid + String :domain + Integer :port +end + +# Create orchestrators table +DB.create_table :orchestrators do + primary_key :id + String :domain + Integer :port +end + +# Create blacklist table +DB.create_table :blacklist do + primary_key :id + String :uuid +end + processors = DB[:processors] +orchestrators = DB[:orchestrators] +blacklist = DB[:blacklist] listen_ip = ENV['LISTEN_IP'] || '0.0.0.0' listen_port = ENV['LISTEN_PORT'] || 3080 server = TCPServer.new(listen_ip, listen_port) -# This hash will store the connections to the consumers -connections = {} +# This hash will store the recently connected UUIDs with their last validation timestamp +recently_connected = {} def create_socket(ip, port) # rubocop:disable Metrics/MethodLength - # If the IP address is set to "loopback", replace it with the actual loopback IP address ip = '127.0.0.1' if ip.downcase == 'loopback' - if ip =~ Resolv::IPv4::Regex - # If the IP address is an IPv4 address, create an unencrypted socket TCPSocket.new(ip, port) else - # If the IP address is a domain name, create an SSL socket ssl_context = OpenSSL::SSL::SSLContext.new ssl_context.verify_mode = OpenSSL::SSL::VERIFY_PEER - + # Add certificate and key configuration here if needed tcp_socket = TCPSocket.new(ip, port) ssl_socket = OpenSSL::SSL::SSLSocket.new(tcp_socket, ssl_context) ssl_socket.sync_close = true @@ -37,92 +58,75 @@ def create_socket(ip, port) # rubocop:disable Metrics/MethodLength end end +def handle_input(line, processors, orchestrators, blacklist, recently_connected) # rubocop:disable Metrics/AbcSize, Metrics/CyclomaticComplexity, Metrics/MethodLength, Metrics/PerceivedComplexity + command, *args = line.split + uuid = args.shift if command != 'REGISTER' + + # Check if the UUID is blacklisted + return 'TERMINATE' if blacklist.where(uuid:).count.positive? + + # Check if the UUID is in the recently connected cache + if recently_connected[uuid] && Time.now - recently_connected[uuid] < 120 + # UUID is recently connected and within 2 minutes, no need to re-validate + elsif command != 'REGISTER' && (uuid.nil? || processors.where(uuid:).count.zero?) + # UUID is not recently connected or is invalid, add to blacklist + blacklist.insert(uuid:) + return 'ERROR Unrecognized UUID' + else + # UUID is valid, update the recently connected cache + recently_connected[uuid] = Time.now + end + + handle_command(command, args, processors, orchestrators) +end + +def handle_command(command, args, processors, orchestrators) # rubocop:disable Metrics/MethodLength + if command.start_with?('ORCHESTRATOR') + orchestrator_command = command.split('_', 2).last + orchestrator = orchestrators.first # Assuming you want to pass to the first available orchestrator + case orchestrator_command + when 'REQUEST', 'UPSTATE', 'FINISHED' + orchestrator.puts "#{orchestrator_command} #{args.join(' ')}" + else + puts "Unknown orchestrator command: #{orchestrator_command}" + end + else + consumer = processors.where(uuid: args.last).first # Assuming UUID is the last argument for CHANGE and SHUTDOWN + case command + when 'CHANGE', 'SHUTDOWN' + consumer.puts "#{command} #{args.join(' ')}" + else + puts "Unknown command: #{command}" + end + end +end + +def register_orchestrator(line, orchestrators) + _, domain, port = line.split + id = orchestrators.max(:id).to_i + 1 + orchestrators.insert(id:, domain:, port:) + puts "Orchestrator registered with domain: #{domain}, port: #{port}" +end + Async do loop do Async::Task.new do client = server.accept begin - while (line = client.gets) + line = client.gets + # Determine if the connection is from an orchestrator for registration + if is_orchestrator_registration?(line) + register_orchestrator(line, orchestrators) + else # Here we handle each line of input from the client - handle_input(line, connections) + handle_input(line, processors, blacklist, recently_connected) end ensure # This code will be executed when the fiber is finished, regardless of whether an exception was raised client.close - Async::Task.current.stop # Stop the current task + Async::Task.current.stop end end end end - -def handle_input(line, connections) # rubocop:disable Metrics/AbcSize, Metrics/MethodLength - # Split the line into command and args - command, *args = line.split - - case command - when 'NEW_PROCESSOR' - # Handle new processor registration - id, domain, port = args - - # Create a new connection to the processor - Async do - processor_connection = create_socket(domain, port) - - # Store the connection in the hash - connections[id] = processor_connection - - # Add the processor to the database - processors.insert(consumer_id: id, ip: domain, port: port) - end - - when 'REQUEST' - # Handle request from orchestrator - consumer_id, request_type, pcap_chunk_id = args - - # Get the connection for this consumer - connection = connections[consumer_id] - - # Send the request to the consumer - connection.puts "REQUEST #{request_type} #{pcap_chunk_id}" - - when 'UPSTATE' - # Handle upstate from consumer - consumer_id, state, pcap_chunk_id = args - - # Do something with the state update... - # For now, we'll just print it out - puts "Consumer #{consumer_id} is now #{state} for chunk #{pcap_chunk_id}" - - when 'FINISHED' - # Handle finished from consumer - consumer_id, pcap_chunk_id = args - - # Do something with the finished message... - # For now, we'll just print it out - puts "Consumer #{consumer_id} has finished chunk #{pcap_chunk_id}" - - when 'CHANGE' - # Handle change from orchestrator - consumer_id, state, urgency = args - - # Get the connection for this consumer - connection = connections[consumer_id] - - # Send the change to the consumer - connection.puts "CHANGE #{state} #{urgency}" - - when 'SHUTDOWN' - # Handle shutdown from orchestrator - consumer_id, urgency = args - - # Get the connection for this consumer - connection = connections[consumer_id] - - # Send the shutdown to the consumer - connection.puts "SHUTDOWN #{urgency}" - - else - puts "Unknown command: #{command}" - end -end diff --git a/lib/Gemfile.lock b/lib/Gemfile.lock index 7ae4793..731569f 100644 --- a/lib/Gemfile.lock +++ b/lib/Gemfile.lock @@ -2,25 +2,26 @@ GEM remote: https://rubygems.org/ specs: ast (2.4.2) - atk (4.1.8) - glib2 (= 4.1.8) + atk (4.2.0) + glib2 (= 4.2.0) backport (1.2.0) base64 (0.1.1) benchmark (0.2.1) + bigdecimal (3.1.4) cairo (1.17.12) native-package-installer (>= 1.0.3) pkg-config (>= 1.2.2) red-colors - cairo-gobject (4.1.8) + cairo-gobject (4.2.0) cairo (>= 1.16.2) - glib2 (= 4.1.8) - console (1.18.0) + glib2 (= 4.2.0) + console (1.23.2) fiber-annotation fiber-local curses (1.4.4) diff-lcs (1.5.0) dotenv (2.8.1) - dynamic_curses_input (1.1.0) + dynamic_curses_input (1.2.1) curses reline e2mmap (0.1.0) @@ -33,23 +34,23 @@ GEM path_expander (~> 1.0) ruby_parser (~> 3.0) sexp_processor (~> 4.0) - gdk3 (4.1.8) - cairo-gobject (= 4.1.8) - gdk_pixbuf2 (= 4.1.8) - pango (= 4.1.8) - gdk_pixbuf2 (4.1.8) - gio2 (= 4.1.8) - gio2 (4.1.8) + gdk3 (4.2.0) + cairo-gobject (= 4.2.0) + gdk_pixbuf2 (= 4.2.0) + pango (= 4.2.0) + gdk_pixbuf2 (4.2.0) + gio2 (= 4.2.0) + gio2 (4.2.0) fiddle - gobject-introspection (= 4.1.8) - glib2 (4.1.8) + gobject-introspection (= 4.2.0) + glib2 (4.2.0) native-package-installer (>= 1.0.3) pkg-config (>= 1.3.5) - gobject-introspection (4.1.8) - glib2 (= 4.1.8) - gtk3 (4.1.8) - atk (= 4.1.8) - gdk3 (= 4.1.8) + gobject-introspection (4.2.0) + glib2 (= 4.2.0) + gtk3 (4.2.0) + atk (= 4.2.0) + gdk3 (= 4.2.0) io-console (0.6.0) jaro_winkler (1.5.6) json (2.6.3) @@ -62,21 +63,21 @@ GEM matrix (0.4.2) mysql2 (0.5.5) native-package-installer (1.1.8) - nokogiri (1.15.3-x86_64-linux) + nokogiri (1.15.4-x86_64-linux) racc (~> 1.4) openssl (3.1.0) packetfu (2.0.0) pcaprub (~> 0.13.1) - pango (4.1.8) - cairo-gobject (= 4.1.8) - gobject-introspection (= 4.1.8) + pango (4.2.0) + cairo-gobject (= 4.2.0) + gobject-introspection (= 4.2.0) parallel (1.23.0) parser (3.2.2.3) ast (~> 2.4.1) racc path_expander (1.1.1) pcaprub (0.13.1) - pkg-config (1.5.2) + pkg-config (1.5.5) racc (1.7.1) rainbow (3.1.1) rbs (2.8.4) @@ -87,12 +88,13 @@ GEM parser (~> 3.2.0) rainbow (>= 2.0, < 4.0) regexp_parser (2.8.1) - reline (0.3.7) + reline (0.3.8) io-console (~> 0.5) reverse_markdown (2.1.1) nokogiri rexml (3.2.6) - rubocop (1.55.0) + rubocop (1.56.3) + base64 (~> 0.1.1) json (~> 2.3) language_server-protocol (>= 3.17.0) parallel (~> 1.10) @@ -109,7 +111,8 @@ GEM ruby_parser (3.20.3) sexp_processor (~> 4.16) securerandom (0.2.2) - sequel (5.70.0) + sequel (5.72.0) + bigdecimal sexp_processor (4.17.0) solargraph (0.49.0) backport (~> 1.2) @@ -128,7 +131,7 @@ GEM tilt (~> 2.0) yard (~> 0.9, >= 0.9.24) thor (1.2.2) - tilt (2.2.0) + tilt (2.3.0) tracer (0.2.2) unicode-display_width (2.4.2) yaml (0.2.1) @@ -158,4 +161,4 @@ DEPENDENCIES yaml (~> 0.2.1) BUNDLED WITH - 2.4.16 + 2.4.10 diff --git a/lib/bin/gopls b/lib/bin/gopls new file mode 100755 index 0000000..76bb18c Binary files /dev/null and b/lib/bin/gopls differ diff --git a/lib/pkg/sumdb/sum.golang.org/latest b/lib/pkg/sumdb/sum.golang.org/latest index 7473cbc..5749fc7 100644 --- a/lib/pkg/sumdb/sum.golang.org/latest +++ b/lib/pkg/sumdb/sum.golang.org/latest @@ -1,5 +1,5 @@ go.sum database tree -18772708 -zUQL+Z/7glb/JRJWTIOwKpTurAD4UkqBoSM/R6Wxx1U= +18982882 +5hq5RwlCLIiagi6hZScdHlumu5XIpITikSz2+tAr9LA= -— sum.golang.org Az3grlYd+7jVJcF9/zAb4Pytelt3fRZySKTY7/bjVB7mPom5EO6a2ir09DWokk6RPoUjmc7yd4zez+i6mUr2sYCddg8= +— sum.golang.org Az3grhw2ZJlhe/saIZhYrbHraYSRxzcFocBkNuDAACA6cT3QTOA1P1fa+rhYfXNq2KVoROv5wGw5Y/2naw4QGoEoYAE=