29 using System.Collections;
30 using System.Threading;
31 using System.Reflection;
34 using OpenSim.Framework;
35 using OpenSim.Framework.Monitoring;
39 using System.Collections.Generic;
41 namespace OpenSim.Framework.Servers.HttpServer
45 private static readonly ILog m_log = LogManager.GetLogger(MethodBase.GetCurrentMethod().DeclaringType);
49 private Dictionary<PollServiceHttpRequest, Queue<PollServiceHttpRequest>> m_bycontext;
50 private BlockingQueue<PollServiceHttpRequest> m_requests =
new BlockingQueue<PollServiceHttpRequest>();
51 private static Queue<PollServiceHttpRequest> m_slowRequests =
new Queue<PollServiceHttpRequest>();
52 private static Queue<PollServiceHttpRequest> m_retryRequests =
new Queue<PollServiceHttpRequest>();
54 private uint m_WorkerThreadCount = 0;
55 private Thread[] m_workerThreads;
56 private Thread m_retrysThread;
58 private bool m_running =
false;
59 private int slowCount = 0;
65 BaseHttpServer pSrv,
bool performResponsesAsync, uint pWorkerThreadCount,
int pTimeout)
68 m_WorkerThreadCount = pWorkerThreadCount;
69 m_workerThreads =
new Thread[m_WorkerThreadCount];
72 m_bycontext =
new Dictionary<PollServiceHttpRequest, Queue<PollServiceHttpRequest>>(preqCp);
74 STPStartInfo startInfo =
new STPStartInfo();
75 startInfo.IdleTimeout = 30000;
76 startInfo.MaxWorkerThreads = 15;
77 startInfo.MinWorkerThreads = 1;
78 startInfo.ThreadPriority = ThreadPriority.Normal;
79 startInfo.StartSuspended =
true;
80 startInfo.ThreadPoolName =
"PoolService";
91 for (uint i = 0; i < m_WorkerThreadCount; i++)
94 = WorkManager.StartThread(
96 string.Format(
"PollServiceWorkerThread {0}:{1}", i, m_server.Port),
97 ThreadPriority.Normal,
104 m_retrysThread = WorkManager.StartThread(
106 string.Format(
"PollServiceWatcherThread:{0}", m_server.Port),
107 ThreadPriority.Normal,
120 lock (m_retryRequests)
121 m_retryRequests.Enqueue(req);
129 Queue<PollServiceHttpRequest> ctxQeueue;
130 if (m_bycontext.TryGetValue(req, out ctxQeueue))
132 ctxQeueue.Enqueue(req);
136 ctxQeueue =
new Queue<PollServiceHttpRequest>();
137 m_bycontext[req] = ctxQeueue;
145 Queue<PollServiceHttpRequest> ctxQeueue;
148 if (m_bycontext.TryGetValue(req, out ctxQeueue))
150 if (ctxQeueue.Count > 0)
157 m_bycontext.Remove(req);
170 m_requests.Enqueue(req);
174 lock (m_slowRequests)
175 m_slowRequests.Enqueue(req);
180 private void CheckRetries()
186 Watchdog.UpdateThread();
187 lock (m_retryRequests)
189 while (m_retryRequests.Count > 0 && m_running)
190 m_requests.Enqueue(m_retryRequests.Dequeue());
197 lock (m_slowRequests)
199 while (m_slowRequests.Count > 0 && m_running)
200 m_requests.Enqueue(m_slowRequests.Dequeue());
212 foreach (
Thread t
in m_workerThreads)
213 Watchdog.AbortThread(t.ManagedThreadId);
217 foreach (Queue<PollServiceHttpRequest> qu
in m_bycontext.Values)
225 req.DoHTTPstop(m_server);
233 m_retryRequests.Clear();
235 lock (m_slowRequests)
237 while (m_slowRequests.Count > 0)
238 m_requests.Enqueue(m_slowRequests.Dequeue());
242 while (m_requests.Count() > 0)
246 wreq = m_requests.Dequeue(0);
247 wreq.DoHTTPstop(m_server);
260 private void PoolWorkerJob()
266 Watchdog.UpdateThread();
273 Hashtable responsedata = req.PollServiceArgs.GetEvents(req.RequestID, req.PollServiceArgs.Id);
279 req.DoHTTPGruntWork(m_server, responsedata);
280 byContextDequeue(req);
282 catch (ObjectDisposedException)
289 m_threadPool.QueueWorkItem(x =>
293 req.DoHTTPGruntWork(m_server, responsedata);
294 byContextDequeue(req);
296 catch (ObjectDisposedException)
307 if ((Environment.TickCount - req.
RequestTime) > req.PollServiceArgs.TimeOutms)
309 req.DoHTTPGruntWork(m_server,
310 req.PollServiceArgs.NoEvents(req.RequestID, req.PollServiceArgs.Id));
311 byContextDequeue(req);
321 m_log.ErrorFormat(
"Exception in poll service thread: " + e.ToString());
PollServiceRequestManager(BaseHttpServer pSrv, bool performResponsesAsync, uint pWorkerThreadCount, int pTimeout)
readonly PollServiceEventArgs PollServiceArgs
HasEventsMethod HasEvents
void byContextDequeue(PollServiceHttpRequest req)
void EnqueueInt(PollServiceHttpRequest req)