29 using System.Collections.Concurrent;
30 using System.Reflection;
31 using System.Threading;
33 using OpenSim.Framework;
35 namespace OpenSim.Framework.Monitoring
/// <summary>
/// Logger bound to the type that declares this member.
/// </summary>
private static readonly ILog m_log =
    LogManager.GetLogger(MethodBase.GetCurrentMethod().DeclaringType);

/// <summary>
/// Logging verbosity setting for this engine.
/// </summary>
/// <remarks>
/// NOTE(review): how this value gates log output is not visible in this section - confirm at the call sites.
/// </remarks>
public int LogLevel { get; set; }

// NOTE(review): presumably the monitor object guarding start/stop transitions;
// the lock sites are not visible in this section - confirm.
private object JobLock = new object();

/// <summary>
/// Name of this engine, as written into its log messages.
/// </summary>
public string Name { get; private set; }

/// <summary>
/// Name used as the bracketed prefix of this engine's log messages.
/// </summary>
public string LoggingName { get; private set; }

/// <summary>
/// Whether the engine is running. The processing loop keeps taking jobs while this is true.
/// </summary>
public bool IsRunning { get; private set; }

/// <summary>
/// The job most recently taken from the queue by the processing loop.
/// </summary>
public Job CurrentJob { get; private set; }

/// <summary>
/// Number of jobs currently held in the queue.
/// </summary>
public int JobsWaiting
{
    get
    {
        return m_jobQueue.Count;
    }
}

/// <summary>
/// Timeout, in milliseconds, handed to the wait on the finished-processing event when stopping.
/// </summary>
public int RequestProcessTimeoutOnStop { get; set; }

/// <summary>
/// Whether the "Job queue at maximum capacity" warning should still be logged.
/// Cleared once the warning has been written and set back to true by the queueing code.
/// </summary>
private bool m_warnOverMaxQueue = true;

/// <summary>
/// Pending jobs: a blocking collection over a concurrent queue, bounded to 5000 entries.
/// </summary>
private BlockingCollection<Job> m_jobQueue =
    new BlockingCollection<Job>(new ConcurrentQueue<Job>(), 5000);

/// <summary>
/// Source of the cancellation token passed to the blocking take in the processing loop.
/// </summary>
private CancellationTokenSource m_cancelSource;

/// <summary>
/// Signalled by the processing thread once its loop has finished; the stop path waits on it.
/// </summary>
private ManualResetEvent m_finishedProcessingAfterStop = new ManualResetEvent(false);
93 LoggingName = loggingName;
95 RequestProcessTimeoutOnStop = 5000;
107 m_finishedProcessingAfterStop.Reset();
109 m_cancelSource =
new CancellationTokenSource();
111 WorkManager.StartThread(
114 ThreadPriority.Normal,
131 m_log.DebugFormat(
"[JobEngine] Stopping {0}", Name);
135 m_finishedProcessingAfterStop.Reset();
136 if(m_jobQueue.Count <= 0)
137 m_cancelSource.Cancel();
139 m_finishedProcessingAfterStop.WaitOne(RequestProcessTimeoutOnStop);
143 m_cancelSource.Dispose();
/// <summary>
/// Make a job. The job is only created here; it is not placed on any queue.
/// </summary>
/// <param name="name">Name of the job.</param>
/// <param name="action">Action the job carries.</param>
/// <param name="commonId">Optional common identifier for the job; defaults to null.</param>
/// <returns>The job produced by <c>Job.MakeJob</c>.</returns>
public static Job MakeJob(string name, Action action, string commonId = null)
{
    // Pure forwarder to the factory on Job.
    Job made = Job.MakeJob(name, action, commonId);
    return made;
}
174 m_jobQueue.TryTake(out nextJob);
/// <summary>
/// Build a job from the given parts and queue it for processing.
/// </summary>
/// <param name="name">Name of the job.</param>
/// <param name="action">Action the job carries.</param>
/// <param name="commonId">Optional common identifier for the job; defaults to null.</param>
/// <returns>Whatever the <c>QueueJob(Job)</c> overload returns for the new job.</returns>
public bool QueueJob(string name, Action action, string commonId = null)
{
    Job pending = MakeJob(name, action, commonId);
    return QueueJob(pending);
}
202 if (m_jobQueue.Count < m_jobQueue.BoundedCapacity)
206 if (!m_warnOverMaxQueue)
207 m_warnOverMaxQueue =
true;
213 if (m_warnOverMaxQueue)
216 "[{0}]: Job queue at maximum capacity, not recording job from {1} in {2}",
217 LoggingName, job.Name, Name);
219 m_warnOverMaxQueue =
false;
/// <summary>
/// Processing loop: repeatedly takes the next job from the queue into CurrentJob
/// and logs around its processing, then signals that processing has finished.
/// </summary>
/// <remarks>
/// NOTE(review): this listing omits some of the method's original lines (block delimiters,
/// try statements and whatever sits between the "Processing job" and "Processed job" messages),
/// so the comments below describe only the statements that are visible.
/// </remarks>
226 private void ProcessRequests()
// Keep looping while the engine is running, and afterwards until the queue has drained.
228 while(IsRunning || m_jobQueue.Count > 0)
// Blocking take; the token from m_cancelSource lets a cancel request interrupt the wait.
232 CurrentJob = m_jobQueue.Take(m_cancelSource.Token);
// The take can fail with ObjectDisposedException; the handler logs how many jobs remain queued.
234 catch(ObjectDisposedException e)
242 m_log.DebugFormat(
"[JobEngine] {0} stopping ignoring {1} jobs in queue",
243 Name,m_jobQueue.Count);
// Raised by the take when the token is cancelled.
247 catch(OperationCanceledException)
253 m_log.DebugFormat(
"[{0}]: Processing job {1}",LoggingName,CurrentJob.Name);
// A failing job is logged together with its exception; per the message text, processing continues.
263 "[{0}]: Job {1} failed, continuing. Exception ",LoggingName,CurrentJob.Name),e);
267 m_log.DebugFormat(
"[{0}]: Processed job {1}",LoggingName,CurrentJob.Name);
// Unregister this thread from the watchdog.
272 Watchdog.RemoveThread(
false);
// Signal the event that the stop path waits on (WaitOne with RequestProcessTimeoutOnStop).
273 m_finishedProcessingAfterStop.Set();
/// <summary>
/// Name of the job, as written into the engine's log messages about it.
/// </summary>
public string Name { get; private set; }

/// <summary>
/// Optional common identifier supplied when the job is made; may be null.
/// </summary>
/// <remarks>
/// NOTE(review): presumably shared by related jobs - its consumers are not visible in this section.
/// </remarks>
public string CommonId { get; private set; }

/// <summary>
/// The action this job carries.
/// </summary>
public Action Action { get; private set; }
300 private Job(
string name,
string commonId, Action action)
/// <summary>
/// Make a job. The job is only constructed here and still has to be queued separately.
/// </summary>
/// <param name="name">Name of the job.</param>
/// <param name="action">Action the job carries.</param>
/// <param name="commonId">Optional common identifier for the job; defaults to null.</param>
/// <returns>A newly constructed job.</returns>
public static Job MakeJob(string name, Action action, string commonId = null)
{
    // The private constructor declares commonId before action; naming the
    // arguments keeps that difference from this method's parameter order explicit.
    return new Job(name: name, commonId: commonId, action: action);
}
Job RemoveNextJob()
Remove the next job queued for processing.
bool QueueJob(Job job)
Queue the job for processing.
bool QueueJob(string name, Action action, string commonId=null)
Queue the job for processing.
static Job MakeJob(string name, Action action, string commonId=null)
Make a job.
JobEngine(string name, string loggingName)
static Job MakeJob(string name, Action action, string commonId=null)
Make a job. It needs to be separately queued.