August 10th 2023 - Proto_handler update
Fix the proto_handler to separate request handling from input handling Make the proto_handler just pass messages between. We don't need to actually process anything Add some logic to determine if the request is coming from the orchestrator or consumer Some other various proto_handler fixes
This commit is contained in:
parent
674aa5df91
commit
140860e1ea
|
@ -49,6 +49,7 @@ def create_socket(ip, port) # rubocop:disable Metrics/MethodLength
|
||||||
else
|
else
|
||||||
ssl_context = OpenSSL::SSL::SSLContext.new
|
ssl_context = OpenSSL::SSL::SSLContext.new
|
||||||
ssl_context.verify_mode = OpenSSL::SSL::VERIFY_PEER
|
ssl_context.verify_mode = OpenSSL::SSL::VERIFY_PEER
|
||||||
|
# Add certificate and key configuration here if needed
|
||||||
tcp_socket = TCPSocket.new(ip, port)
|
tcp_socket = TCPSocket.new(ip, port)
|
||||||
ssl_socket = OpenSSL::SSL::SSLSocket.new(tcp_socket, ssl_context)
|
ssl_socket = OpenSSL::SSL::SSLSocket.new(tcp_socket, ssl_context)
|
||||||
ssl_socket.sync_close = true
|
ssl_socket.sync_close = true
|
||||||
|
@ -57,96 +58,51 @@ def create_socket(ip, port) # rubocop:disable Metrics/MethodLength
|
||||||
end
|
end
|
||||||
end
|
end
|
||||||
|
|
||||||
def handle_input(line, connections, consumer_uuids, blacklisted_ips, client, processors, blacklist, recently_connected) # rubocop:disable Metrics/AbcSize, Metrics/MethodLength, Metrics/CyclomaticComplexity, Metrics/PerceivedComplexity, Metrics/ParameterLists
|
def handle_input(line, processors, orchestrators, blacklist, recently_connected) # rubocop:disable Metrics/AbcSize, Metrics/CyclomaticComplexity, Metrics/MethodLength, Metrics/PerceivedComplexity
|
||||||
command, *args = line.split
|
command, *args = line.split
|
||||||
uuid = args.shift if command != 'REGISTER'
|
uuid = args.shift if command != 'REGISTER'
|
||||||
|
|
||||||
# Check if the UUID is blacklisted
|
# Check if the UUID is blacklisted
|
||||||
if blacklisted_ips.include?(client.peeraddr[3]) || blacklist.where(uuid:).count.positive?
|
return 'TERMINATE' if blacklist.where(uuid:).count.positive?
|
||||||
client.puts 'TERMINATE'
|
|
||||||
return
|
|
||||||
end
|
|
||||||
|
|
||||||
# Check if the UUID is in the recently connected cache
|
# Check if the UUID is in the recently connected cache
|
||||||
if recently_connected[uuid] && Time.now - recently_connected[uuid] < 120
|
if recently_connected[uuid] && Time.now - recently_connected[uuid] < 120
|
||||||
# UUID is recently connected and within 2 minutes, no need to re-validate
|
# UUID is recently connected and within 2 minutes, no need to re-validate
|
||||||
elsif command != 'REGISTER' && (uuid.nil? || !consumer_uuids.values.include?(uuid))
|
elsif command != 'REGISTER' && (uuid.nil? || processors.where(uuid:).count.zero?)
|
||||||
# UUID is not recently connected or is invalid, add to blacklist
|
# UUID is not recently connected or is invalid, add to blacklist
|
||||||
blacklisted_ips.add(client.peeraddr[3])
|
|
||||||
blacklist.insert(uuid:)
|
blacklist.insert(uuid:)
|
||||||
client.puts 'ERROR Unrecognized UUID'
|
return 'ERROR Unrecognized UUID'
|
||||||
return
|
|
||||||
else
|
else
|
||||||
# UUID is valid, update the recently connected cache
|
# UUID is valid, update the recently connected cache
|
||||||
recently_connected[uuid] = Time.now
|
recently_connected[uuid] = Time.now
|
||||||
end
|
end
|
||||||
|
|
||||||
case command
|
handle_command(command, args, processors, orchestrators)
|
||||||
when 'REGISTER'
|
end
|
||||||
id = processors.max(:proto_handler_id).to_i + 1
|
|
||||||
domain, port = args
|
def handle_command(command, args, processors, orchestrators) # rubocop:disable Metrics/MethodLength
|
||||||
uuid = SecureRandom.uuid
|
if command.start_with?('ORCHESTRATOR')
|
||||||
client.puts "UUID #{uuid}"
|
orchestrator_command = command.split('_', 2).last
|
||||||
consumer_uuids[id] = uuid
|
orchestrator = orchestrators.first # Assuming you want to pass to the first available orchestrator
|
||||||
Async do
|
case orchestrator_command
|
||||||
processor_connection = create_socket(domain, port)
|
when 'REQUEST', 'UPSTATE', 'FINISHED'
|
||||||
connections[id] = processor_connection
|
orchestrator.puts "#{orchestrator_command} #{args.join(' ')}"
|
||||||
processors.insert(proto_handler_id: id, uuid:, domain:, port:)
|
else
|
||||||
|
puts "Unknown orchestrator command: #{orchestrator_command}"
|
||||||
end
|
end
|
||||||
|
else
|
||||||
when 'REQUEST'
|
consumer = processors.where(uuid: args.last).first # Assuming UUID is the last argument for CHANGE and SHUTDOWN
|
||||||
# Handle request from orchestrator
|
case command
|
||||||
consumer_id, request_type, pcap_chunk_id = args
|
when 'CHANGE', 'SHUTDOWN'
|
||||||
|
consumer.puts "#{command} #{args.join(' ')}"
|
||||||
# Get the connection for this consumer
|
|
||||||
connection = connections[consumer_id]
|
|
||||||
|
|
||||||
# Send the request to the consumer
|
|
||||||
connection.puts "REQUEST #{request_type} #{pcap_chunk_id}"
|
|
||||||
|
|
||||||
when 'UPSTATE'
|
|
||||||
# Handle upstate from consumer
|
|
||||||
consumer_id, state, pcap_chunk_id = args
|
|
||||||
|
|
||||||
# Do something with the state update...
|
|
||||||
# For now, we'll just print it out
|
|
||||||
puts "Consumer #{consumer_id} is now #{state} for chunk #{pcap_chunk_id}"
|
|
||||||
|
|
||||||
when 'FINISHED'
|
|
||||||
# Handle finished from consumer
|
|
||||||
consumer_id, pcap_chunk_id = args
|
|
||||||
|
|
||||||
# Do something with the finished message...
|
|
||||||
# For now, we'll just print it out
|
|
||||||
puts "Consumer #{consumer_id} has finished chunk #{pcap_chunk_id}"
|
|
||||||
|
|
||||||
when 'CHANGE'
|
|
||||||
# Handle change from orchestrator
|
|
||||||
consumer_id, state, urgency = args
|
|
||||||
|
|
||||||
# Get the connection for this consumer
|
|
||||||
connection = connections[consumer_id]
|
|
||||||
|
|
||||||
# Send the change to the consumer
|
|
||||||
connection.puts "CHANGE #{state} #{urgency}"
|
|
||||||
|
|
||||||
when 'SHUTDOWN'
|
|
||||||
# Handle shutdown from orchestrator
|
|
||||||
consumer_id, urgency = args
|
|
||||||
|
|
||||||
# Get the connection for this consumer
|
|
||||||
connection = connections[consumer_id]
|
|
||||||
|
|
||||||
# Send the shutdown to the consumer
|
|
||||||
connection.puts "SHUTDOWN #{urgency}"
|
|
||||||
|
|
||||||
else
|
else
|
||||||
puts "Unknown command: #{command}"
|
puts "Unknown command: #{command}"
|
||||||
end
|
end
|
||||||
|
end
|
||||||
end
|
end
|
||||||
|
|
||||||
def register_orchestrator(line, orchestrators)
|
def register_orchestrator(line, orchestrators)
|
||||||
domain, port = line.split
|
_, domain, port = line.split
|
||||||
id = orchestrators.max(:id).to_i + 1
|
id = orchestrators.max(:id).to_i + 1
|
||||||
orchestrators.insert(id:, domain:, port:)
|
orchestrators.insert(id:, domain:, port:)
|
||||||
puts "Orchestrator registered with domain: #{domain}, port: #{port}"
|
puts "Orchestrator registered with domain: #{domain}, port: #{port}"
|
||||||
|
@ -160,11 +116,11 @@ Async do
|
||||||
begin
|
begin
|
||||||
line = client.gets
|
line = client.gets
|
||||||
# Determine if the connection is from an orchestrator for registration
|
# Determine if the connection is from an orchestrator for registration
|
||||||
if is_orchestrator_registration?(line) # Define this method to identify orchestrator registration
|
if is_orchestrator_registration?(line)
|
||||||
register_orchestrator(line, orchestrators)
|
register_orchestrator(line, orchestrators)
|
||||||
else
|
else
|
||||||
# Here we handle each line of input from the client
|
# Here we handle each line of input from the client
|
||||||
handle_input(line, client, processors, blacklist, recently_connected) # Pass the variables here
|
handle_input(line, processors, blacklist, recently_connected)
|
||||||
end
|
end
|
||||||
ensure
|
ensure
|
||||||
# This code will be executed when the fiber is finished, regardless of whether an exception was raised
|
# This code will be executed when the fiber is finished, regardless of whether an exception was raised
|
||||||
|
|
Loading…
Reference in New Issue
Block a user