OpenSim
 All Classes Namespaces Files Functions Variables Typedefs Enumerations Enumerator Properties Events Macros
AssetServicesConnector.cs
Go to the documentation of this file.
1 /*
2  * Copyright (c) Contributors, http://opensimulator.org/
3  * See CONTRIBUTORS.TXT for a full list of copyright holders.
4  *
5  * Redistribution and use in source and binary forms, with or without
6  * modification, are permitted provided that the following conditions are met:
7  * * Redistributions of source code must retain the above copyright
8  * notice, this list of conditions and the following disclaimer.
9  * * Redistributions in binary form must reproduce the above copyright
10  * notice, this list of conditions and the following disclaimer in the
11  * documentation and/or other materials provided with the distribution.
12  * * Neither the name of the OpenSimulator Project nor the
13  * names of its contributors may be used to endorse or promote products
14  * derived from this software without specific prior written permission.
15  *
16  * THIS SOFTWARE IS PROVIDED BY THE DEVELOPERS ``AS IS'' AND ANY
17  * EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
18  * WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE
19  * DISCLAIMED. IN NO EVENT SHALL THE CONTRIBUTORS BE LIABLE FOR ANY
20  * DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES
21  * (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES;
22  * LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND
23  * ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
24  * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS
25  * SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
26  */
27 
28 using log4net;
29 using System;
30 using System.Threading;
31 using System.Collections.Generic;
32 using System.IO;
33 using System.Reflection;
34 using System.Timers;
35 using Nini.Config;
36 using OpenSim.Framework;
37 using OpenSim.Framework.Console;
38 using OpenSim.Services.Interfaces;
39 using OpenMetaverse;
40 
41 namespace OpenSim.Services.Connectors
42 {
44  {
45  private static readonly ILog m_log =
46  LogManager.GetLogger(
47  MethodBase.GetCurrentMethod().DeclaringType);
48 
49  private string m_ServerURI = String.Empty;
50  private IImprovedAssetCache m_Cache = null;
51  private int m_retryCounter;
52  private Dictionary<int, List<AssetBase>> m_retryQueue = new Dictionary<int, List<AssetBase>>();
53  private System.Timers.Timer m_retryTimer;
54  private int m_maxAssetRequestConcurrency = 30;
55 
56  private delegate void AssetRetrievedEx(AssetBase asset);
57 
58  // Keeps track of concurrent requests for the same asset, so that it's only loaded once.
59  // Maps: Asset ID -> Handlers which will be called when the asset has been loaded
60 // private Dictionary<string, AssetRetrievedEx> m_AssetHandlers = new Dictionary<string, AssetRetrievedEx>();
61 
62  private Dictionary<string, List<AssetRetrievedEx>> m_AssetHandlers = new Dictionary<string, List<AssetRetrievedEx>>();
63 
64  private Dictionary<string, string> m_UriMap = new Dictionary<string, string>();
65 
66  private Thread[] m_fetchThreads;
67 
/// <summary>
/// Gets or sets the value held in m_maxAssetRequestConcurrency.
/// </summary>
public int MaxAssetRequestConcurrency
{
    get
    {
        return m_maxAssetRequestConcurrency;
    }

    set
    {
        m_maxAssetRequestConcurrency = value;
    }
}
73 
75  {
76  }
77 
/// <summary>
/// Creates a connector that talks to the given asset server.
/// </summary>
/// <param name="serverURI">Base URI of the asset server; trailing '/' characters are stripped.</param>
public AssetServicesConnector(string serverURI)
{
    string withoutTrailingSlash = serverURI.TrimEnd('/');
    m_ServerURI = withoutTrailingSlash;
}
82 
/// <summary>
/// Creates a connector from configuration: passes the source and the
/// "AssetService" section name to the base class, then runs Initialise.
/// </summary>
/// <param name="source">Configuration source handed to the base constructor and to Initialise.</param>
public AssetServicesConnector(IConfigSource source)
    : base(source, "AssetService")
{
    this.Initialise(source);
}
88 
/// <summary>
/// Reads the connector settings, prepares the upload retry timer, builds the
/// UUID-prefix to host map and starts the asset fetch worker threads.
/// </summary>
/// <param name="source">
/// Configuration source. The optional [Network] section supplies
/// MaxRequestConcurrency; the [AssetService] section must exist and must
/// contain a non-empty AssetServerURI. Optional AssetServerHost_xx keys
/// (xx = two hex digits) override the host used for ids starting with xx.
/// </param>
/// <exception cref="Exception">
/// Thrown when the AssetService section or its AssetServerURI value is missing.
/// </exception>
public virtual void Initialise(IConfigSource source)
{
    IConfig netconfig = source.Configs["Network"];
    if (netconfig != null)
        m_maxAssetRequestConcurrency = netconfig.GetInt("MaxRequestConcurrency", m_maxAssetRequestConcurrency);

    IConfig assetConfig = source.Configs["AssetService"];
    if (assetConfig == null)
    {
        m_log.Error("[ASSET CONNECTOR]: AssetService missing from OpenSim.ini");
        throw new Exception("Asset connector init error");
    }

    string serviceURI = assetConfig.GetString("AssetServerURI", String.Empty);

    m_ServerURI = serviceURI;

    if (String.IsNullOrEmpty(serviceURI))
    {
        m_log.Error("[ASSET CONNECTOR]: No Server URI named in section AssetService");
        throw new Exception("Asset connector init error");
    }

    // The timer is only started by Store() once an upload has failed.
    m_retryTimer = new System.Timers.Timer();
    m_retryTimer.Elapsed += new ElapsedEventHandler(retryCheck);
    m_retryTimer.Interval = 60000;

    Uri serverUri = new Uri(m_ServerURI);

    // Each prefix inherits the host of the previous prefix unless the
    // configuration names an explicit AssetServerHost_xx for it.
    string groupHost = serverUri.Host;

    for (int i = 0; i < 256; i++)
    {
        string prefix = i.ToString("x2");
        groupHost = assetConfig.GetString("AssetServerHost_" + prefix, groupHost);

        m_UriMap[prefix] = groupHost;
    }

    m_fetchThreads = new Thread[2];

    for (int i = 0; i < m_fetchThreads.Length; i++)
    {
        Thread worker = new Thread(AssetRequestProcessor);

        // The workers loop forever on the request queue; as foreground
        // threads they would keep the process alive after the main thread
        // has finished, so they must be background threads.
        worker.IsBackground = true;

        m_fetchThreads[i] = worker;
        worker.Start();
    }
}
139 
/// <summary>
/// Resolves the base URI (without a trailing '/') of the server responsible
/// for the given asset id.
/// </summary>
/// <param name="id">
/// Asset id. Its first two characters, lower-cased, select the host from
/// m_UriMap. Ids that are null, shorter than two characters, or whose prefix
/// is not in the map fall back to the host registered for "00".
/// </param>
/// <returns>
/// m_ServerURI unchanged when no prefix map has been built; otherwise
/// m_ServerURI with its host replaced by the mapped host.
/// </returns>
private string MapServer(string id)
{
    if (m_UriMap.Count == 0)
        return m_ServerURI;

    string host = null;

    // HG URLs will not be valid UUIDs, and very short ids have no usable
    // prefix at all; both cases use the default "00" host below.
    if (id != null && id.Length >= 2)
    {
        string prefix = id.Substring(0, 2).ToLowerInvariant();
        m_UriMap.TryGetValue(prefix, out host);
    }

    if (host == null)
        host = m_UriMap["00"];

    UriBuilder serverUri = new UriBuilder(m_ServerURI);
    serverUri.Host = host;

    string ret = serverUri.Uri.AbsoluteUri;
    if (ret.EndsWith("/"))
        ret = ret.Substring(0, ret.Length - 1);
    return ret;
}
166 
/// <summary>
/// Elapsed handler of the retry timer (one tick per minute, see the timer
/// interval). Re-submits queued assets whose back-off period has elapsed and
/// stops the timer once the retry queue is empty.
/// </summary>
/// <remarks>
/// The retry queue is keyed by the number of failed upload attempts. A bucket
/// with n failed attempts is retried every n*n ticks, capped at 60, i.e. the
/// frequency falls back until it reaches one attempt per hour. With the limit
/// of 30 attempts enforced by Store, an asset stays queued for roughly
/// 24 hours.
/// </remarks>
/// <param name="source">Timer that raised the event (unused).</param>
/// <param name="e">Event data (unused).</param>
protected void retryCheck(object source, ElapsedEventArgs e)
{
    List<AssetBase> due = new List<AssetBase>();

    // The timer handler can run on a different thread than callers of
    // Store(), so the dictionary is only touched under its lock.
    lock (m_retryQueue)
    {
        m_retryCounter++;
        if (m_retryCounter > 60)
            m_retryCounter -= 60;

        // Snapshot the keys: buckets are removed while we walk them.
        List<int> attemptCounts = new List<int>(m_retryQueue.Keys);

        foreach (int attempts in attemptCounts)
        {
            // Square of the attempt count ('^' is XOR in C#, not a power).
            int timefactor = attempts * attempts;
            if (timefactor > 60)
                timefactor = 60;
            if (timefactor < 1)
                timefactor = 1;

            // Only buckets whose period divides the current tick are due.
            if (m_retryCounter % timefactor != 0)
                continue;

            due.AddRange(m_retryQueue[attempts]);
            m_retryQueue.Remove(attempts);
        }
    }

    // Upload outside the lock; Store() re-queues the asset if it fails again.
    foreach (AssetBase asset in due)
    {
        Store(asset);
    }

    lock (m_retryQueue)
    {
        if (m_retryQueue.Count == 0)
        {
            // Nothing left to retry: reset the tick counter and stop ticking.
            m_retryCounter = 0;
            m_retryTimer.Stop();
        }
    }
}
213 
/// <summary>
/// Installs the asset cache consulted by the Get/Store methods.
/// </summary>
/// <param name="cache">Cache to use; null leaves the connector without a cache.</param>
protected void SetCache(IImprovedAssetCache cache)
{
    this.m_Cache = cache;
}
218 
/// <summary>
/// Gets an asset synchronously, preferring the local cache.
/// </summary>
/// <param name="id">Asset id.</param>
/// <returns>
/// The cached asset when it carries data; otherwise whatever the asset
/// server returns for a GET on /assets/{id}, which may be null.
/// </returns>
public AssetBase Get(string id)
{
    string uri = MapServer(id) + "/assets/" + id;

    AssetBase asset = null;
    if (m_Cache != null)
        asset = m_Cache.Get(id);

    if (asset != null && asset.Data != null && asset.Data.Length != 0)
        return asset;

    // XXX: m_maxAssetRequestConcurrency is intentionally not applied here.
    // It used to be passed as the request timeout rather than as a
    // concurrency limit, so it never worked as intended; this needs to be
    // revisited before a real concurrency limit is introduced.
    asset = SynchronousRestObjectRequester.MakeRequest<int, AssetBase>("GET", uri, 0, m_Auth);

    // Only cache real results, as AssetRequestProcessor does; a null
    // result must not be handed to the cache.
    if (m_Cache != null && asset != null)
        m_Cache.Cache(asset);

    return asset;
}
245 
/// <summary>
/// Looks the asset up in the local cache only; never contacts the server.
/// </summary>
/// <param name="id">Asset id.</param>
/// <returns>The cache's answer, or null when no cache is installed.</returns>
public AssetBase GetCached(string id)
{
    return (m_Cache == null) ? null : m_Cache.Get(id);
}
255 
/// <summary>
/// Gets an asset's metadata, from the cached asset when available and
/// otherwise via a GET on /assets/{id}/metadata.
/// </summary>
/// <param name="id">Asset id.</param>
/// <returns>The metadata of the cached asset, or the server's response.</returns>
public AssetMetadata GetMetadata(string id)
{
    AssetBase cached = (m_Cache != null) ? m_Cache.Get(id) : null;
    if (cached != null)
        return cached.Metadata;

    string server = MapServer(id);
    string uri = server + "/assets/" + id + "/metadata";

    return SynchronousRestObjectRequester.MakeRequest<int, AssetMetadata>("GET", uri, 0, m_Auth);
}
271 
/// <summary>
/// Gets an asset's raw data, ignoring the metadata.
/// </summary>
/// <param name="id">Asset id.</param>
/// <returns>
/// The Data of the cached asset when the asset is cached; otherwise the
/// bytes returned by a GET on /assets/{id}/data, or null when the server
/// returns no stream or an empty one.
/// </returns>
public byte[] GetData(string id)
{
    if (m_Cache != null)
    {
        AssetBase fullAsset = m_Cache.Get(id);

        if (fullAsset != null)
            return fullAsset.Data;
    }

    using (RestClient rc = new RestClient(MapServer(id)))
    {
        rc.AddResourcePath("assets");
        rc.AddResourcePath(id);
        rc.AddResourcePath("data");

        rc.RequestMethod = "GET";

        using (Stream s = rc.Request(m_Auth))
        {
            if (s == null || s.Length <= 0)
                return null;

            byte[] ret = new byte[s.Length];

            // Stream.Read may deliver fewer bytes than requested, so keep
            // reading until the buffer is full or the stream ends.
            int filled = 0;
            while (filled < ret.Length)
            {
                int got = s.Read(ret, filled, ret.Length - filled);
                if (got <= 0)
                    break;
                filled += got;
            }

            if (filled == 0)
                return null;

            if (filled < ret.Length)
            {
                // Do not hand back a zero-padded tail as if it were asset data.
                m_log.WarnFormat("[ASSET CONNECTOR]: Short read for asset {0}: got {1} of {2} bytes", id, filled, ret.Length);
                Array.Resize(ref ret, filled);
            }

            return ret;
        }
    }
}
306 
// One pending asynchronous fetch, queued by Get(id, sender, handler) and
// consumed by AssetRequestProcessor.
private class QueuedAssetRequest
{
    // Asset id; also the key under which waiting handlers are registered.
    public string id;

    // Full URL the asset is fetched from.
    public string uri;
}
312 
313  private OpenMetaverse.BlockingQueue<QueuedAssetRequest> m_requestQueue =
314  new OpenMetaverse.BlockingQueue<QueuedAssetRequest>();
315 
/// <summary>
/// Body of a fetch worker thread: takes queued requests forever, fetches
/// each asset from the server, caches it and invokes the handlers that were
/// registered for its id.
/// </summary>
/// <remarks>
/// When the fetch fails or returns null, the handlers registered for the id
/// are discarded without being invoked.
/// </remarks>
private void AssetRequestProcessor()
{
    while (true)
    {
        QueuedAssetRequest r = m_requestQueue.Dequeue();

        string uri = r.uri;
        string id = r.id;

        bool success = false;
        try
        {
            AssetBase a = SynchronousRestObjectRequester.MakeRequest<int, AssetBase>("GET", uri, 0, 30000, m_Auth);
            if (a != null)
            {
                if (m_Cache != null)
                    m_Cache.Cache(a);

                List<AssetRetrievedEx> handlers = TakeAssetHandlers(id);

                if (handlers != null)
                {
                    // Run the callbacks off this thread so a slow handler
                    // does not hold up the fetch queue.
                    Util.FireAndForget(x =>
                    {
                        foreach (AssetRetrievedEx h in handlers)
                        {
                            try
                            {
                                h.Invoke(a);
                            }
                            catch (Exception e)
                            {
                                // One failing handler must not prevent the
                                // remaining handlers from being notified.
                                m_log.WarnFormat("[ASSET CONNECTOR]: Handler for asset {0} failed: {1}", id, e.Message);
                            }
                        }

                        handlers.Clear();
                    });
                }

                success = true;
            }
        }
        catch (Exception e)
        {
            // Keep the worker alive: an escaping exception would end this
            // thread and leave the queue with one consumer fewer.
            m_log.ErrorFormat("[ASSET CONNECTOR]: Exception while fetching asset {0}: {1}", id, e.Message);
        }
        finally
        {
            if (!success)
            {
                List<AssetRetrievedEx> handlers = TakeAssetHandlers(id);
                if (handlers != null)
                    handlers.Clear();
            }
        }
    }
}

// Atomically removes and returns the handler list registered for an asset
// id, or null when none is registered.
private List<AssetRetrievedEx> TakeAssetHandlers(string id)
{
    List<AssetRetrievedEx> handlers;
    lock (m_AssetHandlers)
    {
        if (m_AssetHandlers.TryGetValue(id, out handlers))
            m_AssetHandlers.Remove(id);
    }
    return handlers;
}
381 
/// <summary>
/// Gets an asset and reports it through a callback. A cached asset that
/// carries data is delivered immediately on the calling thread; otherwise
/// the request is queued for the fetch workers, and concurrent requests for
/// the same id share a single fetch.
/// </summary>
/// <param name="id">Asset id.</param>
/// <param name="sender">Opaque value passed back to the handler.</param>
/// <param name="handler">Callback receiving the id, the sender and the asset.</param>
/// <returns>Always true.</returns>
public bool Get(string id, Object sender, AssetRetrieved handler)
{
    string uri = MapServer(id) + "/assets/" + id;

    AssetBase cached = (m_Cache != null) ? m_Cache.Get(id) : null;

    bool usable = cached != null && cached.Data != null && cached.Data.Length != 0;
    if (usable)
    {
        handler(id, sender, cached);
        return true;
    }

    AssetRetrievedEx callback = delegate(AssetBase fetched) { handler(id, sender, fetched); };

    lock (m_AssetHandlers)
    {
        List<AssetRetrievedEx> waiting;
        if (m_AssetHandlers.TryGetValue(id, out waiting))
        {
            // A fetch for this asset is already pending; it will notify
            // our callback together with the others.
            waiting.Add(callback);
            return true;
        }

        // First request for this id: register the callback and fetch below.
        m_AssetHandlers.Add(id, new List<AssetRetrievedEx> { callback });
    }

    m_requestQueue.Enqueue(new QueuedAssetRequest { id = id, uri = uri });

    return true;
}
427 
/// <summary>
/// Asks the asset server which of the given assets exist, via a POST to
/// /get_assets_exist.
/// </summary>
/// <param name="ids">Asset ids to check.</param>
/// <returns>
/// The server's answer; when the request throws or yields null, an array of
/// ids.Length entries that are all false.
/// </returns>
public virtual bool[] AssetsExist(string[] ids)
{
    bool[] answer;

    try
    {
        answer = SynchronousRestObjectRequester.MakeRequest<string[], bool[]>(
            "POST", m_ServerURI + "/get_assets_exist", ids, m_Auth);
    }
    catch (Exception)
    {
        // Most likely the server does not support this call, so quietly
        // report "doesn't exist" for every asset.
        answer = null;
    }

    return answer ?? new bool[ids.Length];
}
448 
449  string stringUUIDZero = UUID.Zero.ToString();
450 
/// <summary>
/// Stores an asset: caches it locally and, unless it is temporary or local,
/// uploads it to the asset server. A failed upload is queued for retry by
/// the retry timer, up to 30 attempts.
/// </summary>
/// <param name="asset">
/// Asset to store. Missing or zero ids are filled in here, and ID is
/// replaced by the id the server returns when that differs.
/// </param>
/// <returns>
/// The asset's ID after the call. It is returned even when the upload
/// failed and the asset was queued for retry or dropped.
/// </returns>
public string Store(AssetBase asset)
{
    // Have to assign the asset ID here. This isn't likely to
    // trigger since current callers don't pass empty IDs.
    // We need the asset ID to route the request to the proper
    // cluster member, so we can't have the server assign one.
    if (asset.ID == string.Empty || asset.ID == stringUUIDZero)
    {
        if (asset.FullID == UUID.Zero)
        {
            asset.FullID = UUID.Random();
        }
        m_log.WarnFormat("[Assets] Zero ID: {0}", asset.Name);
        asset.ID = asset.FullID.ToString();
    }

    if (asset.FullID == UUID.Zero)
    {
        UUID uuid = UUID.Zero;
        if (UUID.TryParse(asset.ID, out uuid))
        {
            asset.FullID = uuid;
        }
        if (asset.FullID == UUID.Zero)
        {
            m_log.WarnFormat("[Assets] Zero IDs: {0}", asset.Name);
            asset.FullID = UUID.Random();
            asset.ID = asset.FullID.ToString();
        }
    }

    if (m_Cache != null)
        m_Cache.Cache(asset);

    // Temporary and local assets live in the cache only.
    if (asset.Temporary || asset.Local)
    {
        return asset.ID;
    }

    string uri = MapServer(asset.FullID.ToString()) + "/assets/";

    string newID = null;
    try
    {
        newID = SynchronousRestObjectRequester.
                MakeRequest<AssetBase, string>("POST", uri, asset, 100000, m_Auth);
    }
    catch (Exception e)
    {
        // Treated like any other failed upload below; log it so the
        // cause is not lost.
        m_log.WarnFormat("[Assets] Upload of {0} threw: {1}", asset.ID, e.Message);
    }

    if (String.IsNullOrEmpty(newID) || newID == stringUUIDZero)
    {
        // The asset upload failed, put it in a queue for later
        asset.UploadAttempts++;
        if (asset.UploadAttempts > 30)
        {
            // By this stage we've been in the queue for a good few hours;
            // we're going to drop the asset.
            m_log.ErrorFormat("[Assets] Dropping asset {0} - Upload has been in the queue for too long.", asset.ID.ToString());
        }
        else if (m_retryTimer == null)
        {
            // The retry timer is only created by Initialise(); a connector
            // built from a bare server URI has no way to retry.
            m_log.ErrorFormat("[Assets] Upload failed: {0} - No retry timer available, asset will not be retried.", asset.ID.ToString());
        }
        else
        {
            // retryCheck() runs from the timer, possibly on another thread,
            // so the queue is only modified under its lock.
            lock (m_retryQueue)
            {
                List<AssetBase> bucket;
                if (!m_retryQueue.TryGetValue(asset.UploadAttempts, out bucket))
                {
                    bucket = new List<AssetBase>();
                    m_retryQueue.Add(asset.UploadAttempts, bucket);
                }
                bucket.Add(asset);
                m_retryTimer.Start();
            }
            m_log.WarnFormat("[Assets] Upload failed: {0} - Requeuing asset for another run.", asset.ID.ToString());
        }
    }
    else
    {
        if (asset.UploadAttempts > 0)
        {
            m_log.InfoFormat("[Assets] Upload of {0} succeeded after {1} failed attempts", asset.ID.ToString(), asset.UploadAttempts.ToString());
        }
        if (newID != asset.ID)
        {
            // Placed here so that this works with old asset servers that don't
            // send any reply back: SynchronousRestObjectRequester then returns
            // something that is not an empty string.
            // NOTE(review): FullID is left unchanged here - confirm whether it
            // should follow the new ID.
            asset.ID = newID;
            if (m_Cache != null)
                m_Cache.Cache(asset);
        }
    }
    return asset.ID;
}
541 
/// <summary>
/// Replaces an asset's data and posts the updated asset to the server.
/// </summary>
/// <param name="id">Asset id.</param>
/// <param name="data">New content for the asset.</param>
/// <returns>
/// True when the server accepted the update; false when the asset's
/// metadata could not be obtained or the server reported failure.
/// </returns>
public bool UpdateContent(string id, byte[] data)
{
    AssetBase asset = (m_Cache != null) ? m_Cache.Get(id) : null;

    if (asset == null)
    {
        // Not cached: rebuild an asset object from the server-side metadata.
        AssetMetadata metadata = GetMetadata(id);
        if (metadata == null)
            return false;

        asset = new AssetBase(metadata.FullID, metadata.Name, metadata.Type, UUID.Zero.ToString());
        asset.Metadata = metadata;
    }
    asset.Data = data;

    string uri = MapServer(id) + "/assets/" + id;

    bool accepted = SynchronousRestObjectRequester.MakeRequest<AssetBase, bool>("POST", uri, asset, m_Auth);
    if (!accepted)
        return false;

    if (m_Cache != null)
        m_Cache.Cache(asset);

    return true;
}
571 
/// <summary>
/// Deletes an asset on the server and, on success, expires it from the cache.
/// </summary>
/// <param name="id">Asset id.</param>
/// <returns>True when the server reported a successful delete; otherwise false.</returns>
public bool Delete(string id)
{
    string uri = MapServer(id) + "/assets/" + id;

    bool deleted = SynchronousRestObjectRequester.MakeRequest<int, bool>("DELETE", uri, 0, m_Auth);
    if (!deleted)
        return false;

    if (m_Cache != null)
        m_Cache.Expire(id);

    return true;
}
585  }
586 }
AssetBase Get(string id)
Get an asset synchronously.
bool Get(string id, Object sender, AssetRetrieved handler)
Get an asset synchronously or asynchronously (depending on whether it is locally cached) and fire a c...
string Store(AssetBase asset)
Creates a new asset
virtual bool[] AssetsExist(string[] ids)
Check if assets exist in the database.
AssetMetadata GetMetadata(string id)
Get an asset's metadata
bool Local
Is this a region only asset, or does this exist on the asset server also
Definition: AssetBase.cs:213
Asset class. All Assets are reference by this class or a class derived from this class ...
Definition: AssetBase.cs:49
bool UpdateContent(string id, byte[] data)
Update an asset's content
byte[] GetData(string id)
Get an asset's data, ignoring the metadata.
Implementation of a generic REST client
Definition: RestClient.cs:59
void retryCheck(object source, ElapsedEventArgs e)
UUID FullID
Asset UUID
Definition: AssetBase.cs:168
delegate void AssetRetrieved(string id, Object sender, AssetBase asset)
string ID
Asset MetaData ID (transferring from UUID to string ID)
Definition: AssetBase.cs:177
bool Temporary
Is this asset going to be saved to the asset database?
Definition: AssetBase.cs:222
AssetBase GetCached(string id)
Synchronously fetches an asset from the local cache only.