Introduction
Introduction Statistics Contact Development Disclaimer Help
worker_manager.rb - warvox - VoIP based wardialing tool, forked from rapid7/war…
git clone git://jay.scot/warvox
Log
Files
Refs
README
---
worker_manager.rb (4573B)
---
1 #!/usr/bin/env ruby
2 ###################
3
4 #
5 # Load the library path
6 #
7 base = __FILE__
8 while File.symlink?(base)
9 base = File.expand_path(File.readlink(base), File.dirname(base))
10 end
11 $:.unshift(File.join(File.expand_path(File.dirname(base)), '..', 'lib'))
12
13 @worker_path = File.expand_path(File.join(File.dirname(base), "worker.rb…
14
15 require 'warvox'
16 require 'socket'
17
18 ENV['RAILS_ENV'] ||= 'production'
19
20 $:.unshift(File.join(File.expand_path(File.dirname(base)), '..'))
21 require 'config/boot'
22 require 'config/environment'
23
24
25 @jobs = []
26
27 def stop
28 WarVOX::Log.info("Worker Manager is terminating due to signal")
29
30 unless @jobs.length > 0
31 exit(0)
32 end
33
34 # Update the database
35 Job.update_all({ status: "stopped", completed_at: Time.now.utc}, { id:…
36
37 # Signal running jobs to shut down
38 @jobs.map{|j| Process.kill("TERM", j[:pid]) rescue nil }
39
40 # Sleep for five seconds
41 sleep(5)
42
43 # Forcibly kill any remaining job processes
44 @jobs.map{|j| Process.kill("KILL", j[:pid]) rescue nil }
45
46 exit(0)
47 end
48
49
50 def clear_zombies
51 while ( r = Process.waitpid(-1, Process::WNOHANG) rescue nil ) do
52 end
53 end
54
55 def schedule_job(j)
56 WarVOX::Log.debug("Worker Manager is launching job #{j.id}")
57 @jobs << {
58 id: j.id,
59 pid: Process.fork { exec("#{@worker_path} #{j.id}") }
60 }
61 end
62
63 def stop_cancelled_jobs
64 jids = []
65 @jobs.each do |x|
66 jids << x[:id]
67 end
68
69 return if jids.length == 0
70 Job.where(status: 'cancelled', id: jids).find_each do |j|
71 job = @jobs.select{ |o| o[:id] == j.id }.first
72 next unless job and job[:pid]
73 pid = job[:pid]
74
75 WarVOX::Log.debug("Worker Manager is killing job #{j.id} with PID #{…
76 Process.kill('TERM', pid)
77 end
78 end
79
80 def clear_completed_jobs
81 dead_pids = []
82 dead_jids = []
83
84 @jobs.each do |j|
85 alive = Process.kill(0, j[:pid]) rescue nil
86 next if alive
87 dead_pids << j[:pid]
88 dead_jids << j[:id]
89 end
90
91 return unless dead_jids.length > 0
92
93 WarVOX::Log.debug("Worker Manager is clearing #{dead_pids.length} comp…
94
95 @jobs = @jobs.reject{|x| dead_pids.include?( x[:pid] ) }
96
97 # Mark failed/crashed jobs as completed
98 Job.where(id: dead_jids, completed_at: nil).update_all({completed_at: …
99 end
100
101 def clear_stale_jobs
102 jids = @jobs.map{|x| x[:id] }
103 stale = nil
104
105 if jids.length > 0
106 stale = Job.where("completed_at IS NULL AND locked_by LIKE ? AND id …
107 else
108 stale = Job.where("completed_at IS NULL AND locked_by LIKE ?", Socke…
109 end
110
111 dead = []
112 pids = {}
113
114 # Extract the PID from the locked_by cookie for each job
115 stale.each do |j|
116 host, pid, uniq = j.locked_by.to_s.split("^", 3)
117 next unless (pid and uniq)
118 pids[pid] ||= []
119 pids[pid] << j
120 end
121
122 # Identify dead processes (must be same user or root)
123 pids.keys.each do |pid|
124 alive = Process.kill(0, pid.to_i) rescue nil
125 next if alive
126 pids[pid].each do |j|
127 dead << j.id
128 end
129 end
130
131 # Mark these jobs as abandoned
132 if dead.length > 0
133 WarVOX::Log.debug("Worker Manager is marking #{dead.length} jobs as …
134 Job.where(id: dead).update_all({locked_by: nil, status: 'abandoned'})
135 end
136 end
137
138 def schedule_submitted_jobs
139 loop do
140 # Look for a candidate job with no current owner
141 j = Job.where(status: 'submitted', locked_by: nil).limit(1).first
142 return unless j
143
144 # Try to get a lock on this job
145 Job.where(id: j.id, locked_by: nil).update_all({locked_by: @cookie, …
146
147 # See if we actually got the lock
148 j = Job.where(id: j.id, status: 'scheduled', locked_by: @cookie).li…
149
150 # Try again if we lost the race,
151 next unless j
152
153 # Hurray, we got a job, run it
154 schedule_job(j)
155
156 return true
157 end
158 end
159
160 #
161 # Main
162 #
163
164 trap("SIGINT") { Thread.new{ stop } }
165 trap("SIGTERM") { Thread.new{ stop } }
166
167 @cookie = Socket.gethostname + "^" + $$.to_s + "^" + sprintf("%.8x", r…
168 @max_jobs = 3
169
170
171 WarVOX::Log.info("Worker Manager initialized with cookie #{@cookie}")
172
173 loop do
174 $0 = "warvox manager: #{@jobs.length} active jobs (cookie : #{@cookie}…
175
176 # Clear any zombie processes
177 clear_zombies()
178
179 # Clear any completed jobs
180 clear_completed_jobs()
181
182 # Stop any jobs cancelled by the user
183 stop_cancelled_jobs()
184
185 # Clear locks on any stale jobs from this host
186 clear_stale_jobs()
187
188 while @jobs.length < @max_jobs
189 break unless schedule_submitted_jobs
190 end
191
192 # Sleep between 3-8 seconds before re-entering the loop
193 sleep(rand(5) + 3)
194 end
You are viewing proxied material from jay.scot. The copyright of proxied material belongs to its original authors. Any comments or complaints in relation to proxied material should be directed to the original authors of the content concerned. Please see the disclaimer for more details.