OpenSim
 All Classes Namespaces Files Functions Variables Typedefs Enumerations Enumerator Properties Events Macros
JobEngine.cs
Go to the documentation of this file.
1 /*
2  * Copyright (c) Contributors, http://opensimulator.org/
3  * See CONTRIBUTORS.TXT for a full list of copyright holders.
4  *
5  * Redistribution and use in source and binary forms, with or without
6  * modification, are permitted provided that the following conditions are met:
7  * * Redistributions of source code must retain the above copyright
8  * notice, this list of conditions and the following disclaimer.
9  * * Redistributions in binary form must reproduce the above copyright
10  * notice, this list of conditions and the following disclaimer in the
11  * documentation and/or other materials provided with the distribution.
12  * * Neither the name of the OpenSimulator Project nor the
13  * names of its contributors may be used to endorse or promote products
14  * derived from this software without specific prior written permission.
15  *
16  * THIS SOFTWARE IS PROVIDED BY THE DEVELOPERS ``AS IS'' AND ANY
17  * EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
18  * WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE
19  * DISCLAIMED. IN NO EVENT SHALL THE CONTRIBUTORS BE LIABLE FOR ANY
20  * DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES
21  * (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES;
22  * LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND
23  * ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
24  * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS
25  * SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
26  */
27 
28 using System;
29 using System.Collections.Concurrent;
30 using System.Reflection;
31 using System.Threading;
32 using log4net;
33 using OpenSim.Framework;
34 
35 namespace OpenSim.Framework.Monitoring
36 {
37  public class JobEngine
38  {
/// <summary>Logger used for all messages written by this class.</summary>
private static readonly ILog m_log = LogManager.GetLogger(MethodBase.GetCurrentMethod().DeclaringType);

/// <summary>
/// Verbosity of per-job logging. At 1 or above, a debug line is written before and after each job runs.
/// </summary>
public int LogLevel { get; set; }

/// <summary>
/// Serialises Start() and Stop(). Read-only so the lock target can never be swapped while in use.
/// </summary>
private readonly object JobLock = new object();

/// <summary>
/// Name of this engine. Passed to the worker thread start call and used in log messages.
/// </summary>
public string Name { get; private set; }

/// <summary>
/// Text placed in the square-bracket prefix of this engine's log messages.
/// </summary>
public string LoggingName { get; private set; }

/// <summary>
/// True between a successful Start() and the following Stop().
/// </summary>
public bool IsRunning { get; private set; }

/// <summary>
/// The job the worker thread is currently executing, or null when no job is being executed.
/// </summary>
public Job CurrentJob { get; private set; }

/// <summary>
/// Number of jobs queued and not yet taken for processing.
/// </summary>
public int JobsWaiting { get { return m_jobQueue.Count; } }

/// <summary>
/// Maximum time, in milliseconds, that Stop() waits for the worker thread to finish.
/// </summary>
public int RequestProcessTimeoutOnStop { get; set; }

/// <summary>
/// Whether the next rejected job should produce a "queue at maximum capacity" warning.
/// </summary>
/// <remarks>
/// Set to false once a warning has been logged and back to true when a job is next accepted,
/// so a full queue produces one warning rather than one per rejected job.
/// </remarks>
private bool m_warnOverMaxQueue = true;

/// <summary>
/// Pending jobs, in FIFO order, bounded at 5000 entries.
/// </summary>
private readonly BlockingCollection<Job> m_jobQueue = new BlockingCollection<Job>(new ConcurrentQueue<Job>(), 5000);

/// <summary>
/// Cancels the worker's blocking take. Created by Start() and disposed by Stop(); null before the first Start().
/// </summary>
private CancellationTokenSource m_cancelSource;

/// <summary>
/// Signalled by the worker thread when it leaves its processing loop; Stop() waits on it.
/// </summary>
private readonly ManualResetEvent m_finishedProcessingAfterStop = new ManualResetEvent(false);
89 
/// <summary>
/// Create a job engine. No worker thread is started here; call Start() to begin processing.
/// </summary>
/// <param name="name">Value stored in <see cref="Name"/>.</param>
/// <param name="loggingName">Value stored in <see cref="LoggingName"/>.</param>
public JobEngine(string name, string loggingName)
{
    // Default wait used by Stop(); callers can change it through the property.
    this.RequestProcessTimeoutOnStop = 5000;

    this.LoggingName = loggingName;
    this.Name = name;
}
97 
/// <summary>
/// Start the worker thread that processes queued jobs. Does nothing if the engine is already running.
/// </summary>
public void Start()
{
    lock (JobLock)
    {
        if (!IsRunning)
        {
            IsRunning = true;

            // Clear any signal left by a previous run so the next Stop() really waits for this worker.
            m_finishedProcessingAfterStop.Reset();

            // A fresh source is needed on every start because Stop() disposes the previous one.
            m_cancelSource = new CancellationTokenSource();

            // NOTE(review): the meaning of the trailing arguments is defined by WorkManager.StartThread,
            // which is not visible here; they are passed through unchanged.
            WorkManager.StartThread(
                ProcessRequests, Name, ThreadPriority.Normal, false, true, null, int.MaxValue);
        }
    }
}
121 
/// <summary>
/// Stop the engine and wait up to <see cref="RequestProcessTimeoutOnStop"/> milliseconds for the
/// worker thread to finish. Does nothing if the engine is not running.
/// </summary>
/// <remarks>
/// If jobs are still queued the worker is left to drain them; the blocking take is only cancelled
/// when the queue is already empty.
/// </remarks>
public void Stop()
{
    lock (JobLock)
    {
        // Checked outside the try block: when the engine was never started m_cancelSource is still
        // null, so the finally clause below must not run on this path.
        if (!IsRunning)
            return;

        try
        {
            m_log.DebugFormat("[JobEngine] Stopping {0}", Name);

            // Reset before clearing IsRunning. The worker only signals after it has seen
            // IsRunning == false, so resetting afterwards could wipe out its signal and make
            // the wait below run to the full timeout.
            m_finishedProcessingAfterStop.Reset();

            IsRunning = false;

            // With nothing queued the worker is (or soon will be) blocked in Take(); wake it up.
            if (m_jobQueue.Count <= 0)
                m_cancelSource.Cancel();

            m_finishedProcessingAfterStop.WaitOne(RequestProcessTimeoutOnStop);
        }
        finally
        {
            m_cancelSource.Dispose();
        }
    }
}
147 
/// <summary>
/// Make a job without queueing it. Forwards to <see cref="Job.MakeJob"/>.
/// </summary>
/// <param name="name">Name of the job, used in log messages.</param>
/// <param name="action">Action the job performs when it is run.</param>
/// <param name="commonId">Optional identifier stored on the job; null if not supplied.</param>
/// <returns>The new job.</returns>
public static Job MakeJob(string name, Action action, string commonId = null)
{
    Job made = Job.MakeJob(name, action, commonId);
    return made;
}
163 
/// <summary>
/// Remove the next job queued for processing.
/// </summary>
/// <returns>The job taken from the queue, or null if no job was waiting.</returns>
public Job RemoveNextJob()
{
    Job nextJob;

    // Non-blocking: when the queue is empty TryTake leaves nextJob as null.
    m_jobQueue.TryTake(out nextJob);

    return nextJob;
}
178 
/// <summary>
/// Make a job from the given details and queue it for processing.
/// </summary>
/// <param name="name">Name of the job, used in log messages.</param>
/// <param name="action">Action the job performs when it is run.</param>
/// <param name="commonId">Optional identifier stored on the job; null if not supplied.</param>
/// <returns>true if the job was queued, false if the queue was full.</returns>
public bool QueueJob(string name, Action action, string commonId = null)
{
    Job job = MakeJob(name, action, commonId);
    return QueueJob(job);
}
193 
/// <summary>
/// Queue the job for processing.
/// </summary>
/// <param name="job">The job to queue.</param>
/// <returns>true if the job was queued, false if the queue was at maximum capacity.</returns>
public bool QueueJob(Job job)
{
    // TryAdd checks capacity and inserts as one non-blocking operation. Testing Count and then
    // calling Add could block the caller if another thread filled the queue in between.
    if (m_jobQueue.TryAdd(job))
    {
        // There is room again, so re-arm the warning for the next time the queue fills.
        m_warnOverMaxQueue = true;

        return true;
    }

    // Warn only once per "queue full" episode to avoid flooding the log.
    if (m_warnOverMaxQueue)
    {
        m_log.WarnFormat(
            "[{0}]: Job queue at maximum capacity, not recording job from {1} in {2}",
            LoggingName, job.Name, Name);

        m_warnOverMaxQueue = false;
    }

    return false;
}
225 
/// <summary>
/// Body of the worker thread: take jobs from the queue and run them one at a time until the engine
/// has been stopped and the queue is empty, or until the take is cancelled.
/// </summary>
/// <remarks>
/// An exception thrown by a job is logged and does not end the loop.
/// </remarks>
private void ProcessRequests()
{
    // Keep going after Stop() while jobs remain, so queued work is drained.
    while (IsRunning || m_jobQueue.Count > 0)
    {
        try
        {
            // Blocks until a job is available or the token is cancelled.
            CurrentJob = m_jobQueue.Take(m_cancelSource.Token);
        }
        catch (ObjectDisposedException)
        {
            // If we see this whilst not running then it may be due to a race where this thread checks
            // IsRunning after the stopping thread sets it to false and disposes of the cancellation source.
            if (IsRunning)
            {
                // Rethrow with "throw;" so the original stack trace is preserved.
                throw;
            }

            m_log.DebugFormat("[JobEngine] {0} stopping ignoring {1} jobs in queue",
                Name, m_jobQueue.Count);
            break;
        }
        catch (OperationCanceledException)
        {
            // Stop() cancelled the take because the queue was empty.
            break;
        }

        if (LogLevel >= 1)
            m_log.DebugFormat("[{0}]: Processing job {1}", LoggingName, CurrentJob.Name);

        try
        {
            CurrentJob.Action();
        }
        catch (Exception e)
        {
            m_log.Error(
                string.Format(
                    "[{0}]: Job {1} failed, continuing.  Exception ", LoggingName, CurrentJob.Name), e);
        }

        if (LogLevel >= 1)
            m_log.DebugFormat("[{0}]: Processed job {1}", LoggingName, CurrentJob.Name);

        CurrentJob = null;
    }

    // NOTE(review): presumably deregisters this thread from the watchdog; Watchdog is not visible here.
    Watchdog.RemoveThread(false);

    // Let a waiting Stop() know the loop has ended.
    m_finishedProcessingAfterStop.Set();
}
275 
/// <summary>
/// A named unit of work. Instances are created through <see cref="MakeJob"/>.
/// </summary>
public class Job
{
    /// <summary>
    /// Name of the job, as supplied to <see cref="MakeJob"/>.
    /// </summary>
    public string Name { get; private set; }

    /// <summary>
    /// Identifier supplied by the creator of the job; null if none was given.
    /// </summary>
    public string CommonId { get; private set; }

    /// <summary>
    /// The work to perform when the job is run.
    /// </summary>
    public Action Action { get; private set; }

    // Private so that MakeJob is the only way to construct a job.
    private Job(string name, string commonId, Action action)
    {
        this.Action = action;
        this.CommonId = commonId;
        this.Name = name;
    }

    /// <summary>
    /// Make a job. It needs to be separately queued.
    /// </summary>
    /// <param name="name">Name of the job.</param>
    /// <param name="action">Action the job performs when it is run.</param>
    /// <param name="commonId">Optional identifier stored on the job; null if not supplied.</param>
    /// <returns>The new job.</returns>
    public static Job MakeJob(string name, Action action, string commonId = null)
    {
        Job created = new Job(name, commonId, action);
        return created;
    }
}
323  }
324 }
Job RemoveNextJob()
Remove the next job queued for processing.
Definition: JobEngine.cs:171
bool QueueJob(Job job)
Queue the job for processing.
Definition: JobEngine.cs:200
bool QueueJob(string name, Action action, string commonId=null)
Queue the job for processing.
Definition: JobEngine.cs:189
static Job MakeJob(string name, Action action, string commonId=null)
Make a job.
Definition: JobEngine.cs:159
JobEngine(string name, string loggingName)
Definition: JobEngine.cs:90
static Job MakeJob(string name, Action action, string commonId=null)
Make a job. It needs to be separately queued.
Definition: JobEngine.cs:318