OpenSim
 All Classes Namespaces Files Functions Variables Typedefs Enumerations Enumerator Properties Events Macros
PollServiceRequestManager.cs
Go to the documentation of this file.
1 /*
2  * Copyright (c) Contributors, http://opensimulator.org/
3  * See CONTRIBUTORS.TXT for a full list of copyright holders.
4  *
5  * Redistribution and use in source and binary forms, with or without
6  * modification, are permitted provided that the following conditions are met:
7  * * Redistributions of source code must retain the above copyright
8  * notice, this list of conditions and the following disclaimer.
9  * * Redistributions in binary form must reproduce the above copyright
10  * notice, this list of conditions and the following disclaimer in the
11  * documentation and/or other materials provided with the distribution.
12  * * Neither the name of the OpenSimulator Project nor the
13  * names of its contributors may be used to endorse or promote products
14  * derived from this software without specific prior written permission.
15  *
16  * THIS SOFTWARE IS PROVIDED BY THE DEVELOPERS ``AS IS'' AND ANY
17  * EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
18  * WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE
19  * DISCLAIMED. IN NO EVENT SHALL THE CONTRIBUTORS BE LIABLE FOR ANY
20  * DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES
21  * (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES;
22  * LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND
23  * ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
24  * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS
25  * SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
26  */
27 
28 using System;
29 using System.Collections;
30 using System.Threading;
31 using System.Reflection;
32 using log4net;
33 using HttpServer;
34 using OpenSim.Framework;
35 using OpenSim.Framework.Monitoring;
36 using Amib.Threading;
37 using System.IO;
38 using System.Text;
39 using System.Collections.Generic;
40 
41 namespace OpenSim.Framework.Servers.HttpServer
42 {
44  {
45  private static readonly ILog m_log = LogManager.GetLogger(MethodBase.GetCurrentMethod().DeclaringType);
46 
47  private readonly BaseHttpServer m_server;
48 
49  private Dictionary<PollServiceHttpRequest, Queue<PollServiceHttpRequest>> m_bycontext;
50  private BlockingQueue<PollServiceHttpRequest> m_requests = new BlockingQueue<PollServiceHttpRequest>();
51  private static Queue<PollServiceHttpRequest> m_slowRequests = new Queue<PollServiceHttpRequest>();
52  private static Queue<PollServiceHttpRequest> m_retryRequests = new Queue<PollServiceHttpRequest>();
53 
54  private uint m_WorkerThreadCount = 0;
55  private Thread[] m_workerThreads;
56  private Thread m_retrysThread;
57 
58  private bool m_running = false;
59  private int slowCount = 0;
60 
61  private SmartThreadPool m_threadPool;
62 
63 
65  BaseHttpServer pSrv, bool performResponsesAsync, uint pWorkerThreadCount, int pTimeout)
66  {
67  m_server = pSrv;
68  m_WorkerThreadCount = pWorkerThreadCount;
69  m_workerThreads = new Thread[m_WorkerThreadCount];
70 
72  m_bycontext = new Dictionary<PollServiceHttpRequest, Queue<PollServiceHttpRequest>>(preqCp);
73 
74  STPStartInfo startInfo = new STPStartInfo();
75  startInfo.IdleTimeout = 30000;
76  startInfo.MaxWorkerThreads = 15;
77  startInfo.MinWorkerThreads = 1;
78  startInfo.ThreadPriority = ThreadPriority.Normal;
79  startInfo.StartSuspended = true;
80  startInfo.ThreadPoolName = "PoolService";
81 
82  m_threadPool = new SmartThreadPool(startInfo);
83 
84  }
85 
86  public void Start()
87  {
88  m_running = true;
89  m_threadPool.Start();
90  //startup worker threads
91  for (uint i = 0; i < m_WorkerThreadCount; i++)
92  {
93  m_workerThreads[i]
94  = WorkManager.StartThread(
95  PoolWorkerJob,
96  string.Format("PollServiceWorkerThread {0}:{1}", i, m_server.Port),
97  ThreadPriority.Normal,
98  false,
99  false,
100  null,
101  int.MaxValue);
102  }
103 
104  m_retrysThread = WorkManager.StartThread(
105  this.CheckRetries,
106  string.Format("PollServiceWatcherThread:{0}", m_server.Port),
107  ThreadPriority.Normal,
108  false,
109  true,
110  null,
111  1000 * 60 * 10);
112 
113 
114  }
115 
116  private void ReQueueEvent(PollServiceHttpRequest req)
117  {
118  if (m_running)
119  {
120  lock (m_retryRequests)
121  m_retryRequests.Enqueue(req);
122  }
123  }
124 
125  public void Enqueue(PollServiceHttpRequest req)
126  {
127  lock (m_bycontext)
128  {
129  Queue<PollServiceHttpRequest> ctxQeueue;
130  if (m_bycontext.TryGetValue(req, out ctxQeueue))
131  {
132  ctxQeueue.Enqueue(req);
133  }
134  else
135  {
136  ctxQeueue = new Queue<PollServiceHttpRequest>();
137  m_bycontext[req] = ctxQeueue;
138  EnqueueInt(req);
139  }
140  }
141  }
142 
144  {
145  Queue<PollServiceHttpRequest> ctxQeueue;
146  lock (m_bycontext)
147  {
148  if (m_bycontext.TryGetValue(req, out ctxQeueue))
149  {
150  if (ctxQeueue.Count > 0)
151  {
152  PollServiceHttpRequest newreq = ctxQeueue.Dequeue();
153  EnqueueInt(newreq);
154  }
155  else
156  {
157  m_bycontext.Remove(req);
158  }
159  }
160  }
161  }
162 
163 
165  {
166  if (m_running)
167  {
169  {
170  m_requests.Enqueue(req);
171  }
172  else
173  {
174  lock (m_slowRequests)
175  m_slowRequests.Enqueue(req);
176  }
177  }
178  }
179 
180  private void CheckRetries()
181  {
182  while (m_running)
183 
184  {
185  Thread.Sleep(100); // let the world move .. back to faster rate
186  Watchdog.UpdateThread();
187  lock (m_retryRequests)
188  {
189  while (m_retryRequests.Count > 0 && m_running)
190  m_requests.Enqueue(m_retryRequests.Dequeue());
191  }
192  slowCount++;
193  if (slowCount >= 10)
194  {
195  slowCount = 0;
196 
197  lock (m_slowRequests)
198  {
199  while (m_slowRequests.Count > 0 && m_running)
200  m_requests.Enqueue(m_slowRequests.Dequeue());
201  }
202  }
203  }
204  }
205 
206  public void Stop()
207  {
208  m_running = false;
209 
210  Thread.Sleep(1000); // let the world move
211 
212  foreach (Thread t in m_workerThreads)
213  Watchdog.AbortThread(t.ManagedThreadId);
214 
215  // any entry in m_bycontext should have a active request on the other queues
216  // so just delete contents to easy GC
217  foreach (Queue<PollServiceHttpRequest> qu in m_bycontext.Values)
218  qu.Clear();
219  m_bycontext.Clear();
220 
221  try
222  {
223  foreach (PollServiceHttpRequest req in m_retryRequests)
224  {
225  req.DoHTTPstop(m_server);
226  }
227  }
228  catch
229  {
230  }
231 
233  m_retryRequests.Clear();
234 
235  lock (m_slowRequests)
236  {
237  while (m_slowRequests.Count > 0)
238  m_requests.Enqueue(m_slowRequests.Dequeue());
239 
240  }
241 
242  while (m_requests.Count() > 0)
243  {
244  try
245  {
246  wreq = m_requests.Dequeue(0);
247  wreq.DoHTTPstop(m_server);
248 
249  }
250  catch
251  {
252  }
253  }
254 
255  m_requests.Clear();
256  }
257 
258  // work threads
259 
260  private void PoolWorkerJob()
261  {
262  while (m_running)
263  {
264  PollServiceHttpRequest req = m_requests.Dequeue(5000);
265 
266  Watchdog.UpdateThread();
267  if (req != null)
268  {
269  try
270  {
272  {
273  Hashtable responsedata = req.PollServiceArgs.GetEvents(req.RequestID, req.PollServiceArgs.Id);
274 
275  if (req.PollServiceArgs.Type == PollServiceEventArgs.EventType.LongPoll) // This is the event queue
276  {
277  try
278  {
279  req.DoHTTPGruntWork(m_server, responsedata);
280  byContextDequeue(req);
281  }
282  catch (ObjectDisposedException) // Browser aborted before we could read body, server closed the stream
283  {
284  // Ignore it, no need to reply
285  }
286  }
287  else
288  {
289  m_threadPool.QueueWorkItem(x =>
290  {
291  try
292  {
293  req.DoHTTPGruntWork(m_server, responsedata);
294  byContextDequeue(req);
295  }
296  catch (ObjectDisposedException) // Browser aborted before we could read body, server closed the stream
297  {
298  // Ignore it, no need to reply
299  }
300 
301  return null;
302  }, null);
303  }
304  }
305  else
306  {
307  if ((Environment.TickCount - req.RequestTime) > req.PollServiceArgs.TimeOutms)
308  {
309  req.DoHTTPGruntWork(m_server,
310  req.PollServiceArgs.NoEvents(req.RequestID, req.PollServiceArgs.Id));
311  byContextDequeue(req);
312  }
313  else
314  {
315  ReQueueEvent(req);
316  }
317  }
318  }
319  catch (Exception e)
320  {
321  m_log.ErrorFormat("Exception in poll service thread: " + e.ToString());
322  }
323  }
324  }
325  }
326 
327  }
328 }
PollServiceRequestManager(BaseHttpServer pSrv, bool performResponsesAsync, uint pWorkerThreadCount, int pTimeout)