| worker_manager.rb - warvox - VoIP based wardialing tool, forked from rapid7/war… | |
| git clone git://jay.scot/warvox | |
| Log | |
| Files | |
| Refs | |
| README | |
| --- | |
| worker_manager.rb (4573B) | |
| --- | |
| 1 #!/usr/bin/env ruby | |
| 2 ################### | |
| 3 | |
| 4 # | |
| 5 # Load the library path | |
| 6 # | |
# Follow symlinks so that the paths below are resolved relative to the real
# location of this script rather than the location of a link to it.
| 7 base = __FILE__ | |
| 8 while File.symlink?(base) | |
| 9 base = File.expand_path(File.readlink(base), File.dirname(base)) | |
| 10 end | |
# Prepend ../lib (relative to this script) to the load path.
| 11 $:.unshift(File.join(File.expand_path(File.dirname(base)), '..', 'lib')) | |
| 12 | |
# Path of the worker script that schedule_job executes for each job.
# NOTE(review): this line is truncated in this view; the full expression is not visible.
| 13 @worker_path = File.expand_path(File.join(File.dirname(base), "worker.rb… | |
| 14 | |
| 15 require 'warvox' | |
| 16 require 'socket' | |
| 17 | |
# Default to the production environment unless RAILS_ENV is already set.
| 18 ENV['RAILS_ENV'] ||= 'production' | |
| 19 | |
# Prepend the parent directory of this script to the load path so that
# config/boot and config/environment can be required by relative name.
| 20 $:.unshift(File.join(File.expand_path(File.dirname(base)), '..')) | |
| 21 require 'config/boot' | |
| 22 require 'config/environment' | |
| 23 | |
| 24 | |
# Worker processes launched by this manager. schedule_job appends one hash per
# worker with the keys :id (job id) and :pid (process id of the forked worker).
| 25 @jobs = [] | |
| 26 | |
# Shut the manager down; invoked from the SIGINT/SIGTERM traps.
#
# Logs the termination, updates job rows in the database, sends TERM to every
# tracked worker, waits five seconds, sends KILL to every tracked worker, and
# exits with status 0. Never returns.
| 27 def stop | |
| 28 WarVOX::Log.info("Worker Manager is terminating due to signal") | |
| 29 | |
# No workers are tracked, so there is nothing to update or signal.
| 30 unless @jobs.length > 0 | |
| 31 exit(0) | |
| 32 end | |
| 33 | |
| 34 # Update the database | |
# Sets status to "stopped" and completed_at to the current UTC time on the rows
# selected by the second argument (an id condition; truncated in this view).
# NOTE(review): this is the two-argument update_all(updates, conditions) form,
# whereas the rest of the file uses Job.where(...).update_all(...). Confirm the
# installed ActiveRecord version still accepts a conditions argument.
| 35 Job.update_all({ status: "stopped", completed_at: Time.now.utc}, { id:… | |
| 36 | |
| 37 # Signal running jobs to shut down | |
# Any error raised by Process.kill is discarded by the inline rescue.
| 38 @jobs.map{|j| Process.kill("TERM", j[:pid]) rescue nil } | |
| 39 | |
| 40 # Sleep for five seconds | |
| 41 sleep(5) | |
| 42 | |
| 43 # Forcibly kill any remaining job processes | |
# KILL is sent to every tracked pid; errors are again discarded by the inline rescue.
| 44 @jobs.map{|j| Process.kill("KILL", j[:pid]) rescue nil } | |
| 45 | |
| 46 exit(0) | |
| 47 end | |
| 48 | |
| 49 | |
# Reap every child process that has already exited, without blocking.
#
# Process.waitpid(-1, Process::WNOHANG) returns the pid of a reaped child, or
# nil when children exist but none has exited yet. It raises Errno::ECHILD when
# this process has no children at all. Only that expected condition is rescued;
# any other error propagates instead of being silently discarded.
#
# Returns nil.
def clear_zombies
  # Keep reaping until waitpid reports that nothing more is ready to collect.
  loop do
    break unless Process.waitpid(-1, Process::WNOHANG)
  end
  nil
rescue Errno::ECHILD
  # There are no child processes, so there is nothing to reap.
  nil
end
| 54 | |
# Launch a worker process for a job and start tracking it.
#
# j - job record; only j.id is read here.
#
# Forks a child that replaces itself with the worker script at @worker_path,
# passing the job id as its single argument. The program and its argument are
# handed to exec as separate strings, so no shell is involved and a worker path
# containing spaces or shell metacharacters is executed as-is.
#
# Appends { id: <job id>, pid: <worker pid> } to @jobs and returns @jobs.
def schedule_job(j)
  WarVOX::Log.debug("Worker Manager is launching job #{j.id}")
  worker_pid = Process.fork { exec(@worker_path, j.id.to_s) }
  @jobs << { id: j.id, pid: worker_pid }
end
| 62 | |
# Send TERM to the worker of every tracked job whose database status is 'cancelled'.
#
# Returns early when no jobs are tracked in @jobs.
| 63 def stop_cancelled_jobs | |
# Collect the ids of all jobs this manager is currently tracking.
| 64 jids = [] | |
| 65 @jobs.each do |x| | |
| 66 jids << x[:id] | |
| 67 end | |
| 68 | |
| 69 return if jids.length == 0 | |
# Only rows that are both tracked here and marked 'cancelled' are visited.
| 70 Job.where(status: 'cancelled', id: jids).find_each do |j| | |
# Look up the tracking entry for this row to obtain the worker pid.
| 71 job = @jobs.select{ |o| o[:id] == j.id }.first | |
| 72 next unless job and job[:pid] | |
| 73 pid = job[:pid] | |
| 74 | |
| 75 WarVOX::Log.debug("Worker Manager is killing job #{j.id} with PID #{… | |
# NOTE(review): unlike the kills in stop, this call has no rescue. Process.kill
# raises if the pid no longer exists, which would propagate out of this method.
# Confirm whether that is intended.
| 76 Process.kill('TERM', pid) | |
| 77 end | |
| 78 end | |
| 79 | |
# Stop tracking workers whose process can no longer be signalled and update
# their database rows.
#
# Returns early when every tracked worker still responds to signal 0.
| 80 def clear_completed_jobs | |
| 81 dead_pids = [] | |
| 82 dead_jids = [] | |
| 83 | |
| 84 @jobs.each do |j| | |
# Signal 0 only probes the pid. If Process.kill raises, the inline rescue turns
# the result into nil and the worker is treated as dead.
| 85 alive = Process.kill(0, j[:pid]) rescue nil | |
| 86 next if alive | |
| 87 dead_pids << j[:pid] | |
| 88 dead_jids << j[:id] | |
| 89 end | |
| 90 | |
| 91 return unless dead_jids.length > 0 | |
| 92 | |
| 93 WarVOX::Log.debug("Worker Manager is clearing #{dead_pids.length} comp… | |
| 94 | |
# Drop the dead workers from the tracking list.
| 95 @jobs = @jobs.reject{|x| dead_pids.include?( x[:pid] ) } | |
| 96 | |
| 97 # Mark failed/crashed jobs as completed | |
# Only rows whose completed_at is still nil are updated. The values written are
# truncated in this view (completed_at is set; anything further is not visible).
| 98 Job.where(id: dead_jids, completed_at: nil).update_all({completed_at: … | |
| 99 end | |
| 100 | |
# Release incomplete jobs whose locked_by cookie names a process that can no
# longer be signalled, marking them 'abandoned'.
| 101 def clear_stale_jobs | |
| 102 jids = @jobs.map{|x| x[:id] } | |
| 103 stale = nil | |
| 104 | |
# Select rows with no completed_at whose locked_by matches a LIKE pattern.
# NOTE(review): both queries are truncated in this view. The pattern appears to
# be built from the local hostname (the visible argument starts with "Socke"),
# and the first branch presumably also excludes the ids tracked in @jobs
# (its SQL continues with "AND id"). Confirm against the full source.
| 105 if jids.length > 0 | |
| 106 stale = Job.where("completed_at IS NULL AND locked_by LIKE ? AND id … | |
| 107 else | |
| 108 stale = Job.where("completed_at IS NULL AND locked_by LIKE ?", Socke… | |
| 109 end | |
| 110 | |
| 111 dead = [] | |
| 112 pids = {} | |
| 113 | |
| 114 # Extract the PID from the locked_by cookie for each job | |
# The cookie is split on "^" into at most three parts. Rows lacking the second
# or third part are skipped. host is parsed but not used below.
| 115 stale.each do |j| | |
| 116 host, pid, uniq = j.locked_by.to_s.split("^", 3) | |
| 117 next unless (pid and uniq) | |
# Group rows by the pid string so each pid is probed only once.
| 118 pids[pid] ||= [] | |
| 119 pids[pid] << j | |
| 120 end | |
| 121 | |
| 122 # Identify dead processes (must be same user or root) | |
# Signal 0 only probes the pid. If Process.kill raises, the inline rescue turns
# the result into nil and every row grouped under that pid is collected as dead.
| 123 pids.keys.each do |pid| | |
| 124 alive = Process.kill(0, pid.to_i) rescue nil | |
| 125 next if alive | |
| 126 pids[pid].each do |j| | |
| 127 dead << j.id | |
| 128 end | |
| 129 end | |
| 130 | |
| 131 # Mark these jobs as abandoned | |
| 132 if dead.length > 0 | |
| 133 WarVOX::Log.debug("Worker Manager is marking #{dead.length} jobs as … | |
# Clearing locked_by releases the lock; status becomes 'abandoned'.
| 134 Job.where(id: dead).update_all({locked_by: nil, status: 'abandoned'}) | |
| 135 end | |
| 136 end | |
| 137 | |
# Claim one submitted, unlocked job for this manager and launch a worker for it.
#
# Returns true after a job has been launched, or nil when no submitted,
# unlocked job is found. Retries when another process claims the candidate first.
| 138 def schedule_submitted_jobs | |
| 139 loop do | |
| 140 # Look for a candidate job with no current owner | |
| 141 j = Job.where(status: 'submitted', locked_by: nil).limit(1).first | |
| 142 return unless j | |
| 143 | |
| 144 # Try to get a lock on this job | |
# The update only matches while locked_by is still nil, so at most one claimant
# can succeed. It writes this manager's @cookie into locked_by; the remaining
# values are truncated in this view.
# NOTE(review): presumably status is set to 'scheduled' here, since the
# re-read below filters on that value. Confirm against the full source.
| 145 Job.where(id: j.id, locked_by: nil).update_all({locked_by: @cookie, … | |
| 146 | |
| 147 # See if we actually got the lock | |
# Re-read the row; it only matches if it now carries this manager's cookie and
# status 'scheduled'. The end of this line is truncated in this view.
| 148 j = Job.where(id: j.id, status: 'scheduled', locked_by: @cookie).li… | |
| 149 | |
| 150 # Try again if we lost the race, | |
| 151 next unless j | |
| 152 | |
| 153 # Hurray, we got a job, run it | |
| 154 schedule_job(j) | |
| 155 | |
| 156 return true | |
| 157 end | |
| 158 end | |
| 159 | |
| 160 # | |
| 161 # Main | |
| 162 # | |
| 163 | |
# On SIGINT or SIGTERM, run stop in a new thread.
# NOTE(review): presumably a thread is used because stop logs, sleeps and
# queries the database, which is unsuitable directly inside a trap handler.
# Confirm the original intent.
| 164 trap("SIGINT") { Thread.new{ stop } } | |
| 165 trap("SIGTERM") { Thread.new{ stop } } | |
| 166 | |
# Lock cookie identifying this manager: hostname, "^", this process id, "^",
# and an eight-digit hex value (its source expression is truncated in this view).
| 167 @cookie = Socket.gethostname + "^" + $$.to_s + "^" + sprintf("%.8x", r… | |
# Upper bound on the number of workers tracked at the same time.
| 168 @max_jobs = 3 | |
| 169 | |
| 170 | |
| 171 WarVOX::Log.info("Worker Manager initialized with cookie #{@cookie}") | |
| 172 | |
| 173 loop do | |
# Assigning $0 updates the process title with the active job count and cookie.
| 174 $0 = "warvox manager: #{@jobs.length} active jobs (cookie : #{@cookie}… | |
| 175 | |
| 176 # Clear any zombie processes | |
| 177 clear_zombies() | |
| 178 | |
| 179 # Clear any completed jobs | |
| 180 clear_completed_jobs() | |
| 181 | |
| 182 # Stop any jobs cancelled by the user | |
| 183 stop_cancelled_jobs() | |
| 184 | |
| 185 # Clear locks on any stale jobs from this host | |
| 186 clear_stale_jobs() | |
| 187 | |
# Launch workers until @max_jobs are tracked or no submitted job can be claimed.
| 188 while @jobs.length < @max_jobs | |
| 189 break unless schedule_submitted_jobs | |
| 190 end | |
| 191 | |
# NOTE(review): rand(5) yields an integer from 0 to 4, so the sleep below lasts
# 3 to 7 seconds, not 3 to 8 as the following comment states.
| 192 # Sleep between 3-8 seconds before re-entering the loop | |
| 193 sleep(rand(5) + 3) | |
| 194 end | |