29 using System.Collections.Generic;
31 using System.Threading;
33 using OpenSim.Framework;
34 using OpenSim.Framework.Monitoring;
36 using OpenMetaverse.Packets;
40 namespace OpenSim.
Region.ClientStack.LindenUDP
53 public delegate
void PacketStats(
int inPackets,
int outPackets,
int unAckedBytes);
71 const float STATE_TASK_PERCENTAGE = 0.8f;
73 private static readonly ILog m_log = LogManager.GetLogger(System.Reflection.MethodBase.GetCurrentMethod().DeclaringType);
77 const int THROTTLE_CATEGORY_COUNT = 8;
83 public int DebugDataOutLevel {
get; set; }
89 public int ThrottleDebugLevel
93 return m_throttleDebugLevel;
98 m_throttleDebugLevel = value;
106 private int m_throttleDebugLevel;
114 public event Func<ThrottleOutPacketTypeFlags, bool>
HasUpdates;
129 public readonly DoubleLocklessQueue<uint> PendingAcks =
new DoubleLocklessQueue<uint>();
136 public bool IsConnected =
true;
165 private int m_packetsReceivedReported;
167 private int m_packetsSentReported;
169 private int m_nextOnQueueEmpty = 1;
175 get {
return m_throttleClient; }
179 private readonly
TokenBucket[] m_throttleCategories;
181 private readonly DoubleLocklessQueue<OutgoingPacket>[] m_packetOutboxes =
new DoubleLocklessQueue<OutgoingPacket>[THROTTLE_CATEGORY_COUNT];
189 private byte[] m_packedThrottles;
191 private int m_defaultRTO = 1000;
192 private int m_maxRTO = 60000;
193 public bool m_deliverPackets =
true;
195 private float m_burstTime;
200 public int PingTimeMS
216 private double m_cannibalrate = 0.0;
239 IPEndPoint remoteEndPoint,
int defaultRTO,
int maxRTO)
242 RemoteEndPoint = remoteEndPoint;
243 CircuitCode = circuitCode;
244 m_udpServer = server;
246 m_defaultRTO = defaultRTO;
250 m_burstTime = rates.BrustTime;
251 float m_burst = rates.ClientMaxRate * m_burstTime;
257 m_throttleCategories =
new TokenBucket[THROTTLE_CATEGORY_COUNT];
259 m_cannibalrate = rates.CannibalizeTextureRate;
261 m_burst = rates.Total * rates.BrustTime;
263 for (
int i = 0; i < THROTTLE_CATEGORY_COUNT; i++)
268 m_packetOutboxes[i] =
new DoubleLocklessQueue<OutgoingPacket>();
270 m_throttleCategories[i] =
new TokenBucket(m_throttleClient, rates.
GetRate(type), m_burst);
277 TickLastPacketReceived = Environment.TickCount & Int32.MaxValue;
287 for (
int i = 0; i < THROTTLE_CATEGORY_COUNT; i++)
289 m_packetOutboxes[i].Clear();
290 m_nextPackets[i] = null;
294 m_throttleClient.Parent.UnregisterRequest(m_throttleClient);
295 OnPacketStats = null;
309 m_info.resendThrottle = (int)m_throttleCategories[(
int)ThrottleOutPacketType.Resend].DripRate;
310 m_info.landThrottle = (int)m_throttleCategories[(
int)ThrottleOutPacketType.Land].DripRate;
311 m_info.windThrottle = (int)m_throttleCategories[(
int)ThrottleOutPacketType.Wind].DripRate;
312 m_info.cloudThrottle = (int)m_throttleCategories[(
int)ThrottleOutPacketType.Cloud].DripRate;
313 m_info.taskThrottle = (int)m_throttleCategories[(
int)ThrottleOutPacketType.Task].DripRate;
314 m_info.assetThrottle = (int)m_throttleCategories[(
int)ThrottleOutPacketType.Asset].DripRate;
315 m_info.textureThrottle = (int)m_throttleCategories[(
int)ThrottleOutPacketType.Texture].DripRate;
316 m_info.totalThrottle = (int)m_throttleClient.DripRate;
329 throw new NotImplementedException();
341 total += m_packetOutboxes[i].Count;
353 int icat = (int)throttleType;
354 if (icat > 0 && icat < THROTTLE_CATEGORY_COUNT)
355 return m_packetOutboxes[icat].Count;
369 return string.Format(
370 "{0,7} {1,7} {2,7} {3,9} {4,7} {5,7} {6,7} {7,7} {8,7} {9,8} {10,7} {11,7}",
371 Util.EnvironmentTickCountSubtract(TickLastPacketReceived),
376 m_packetOutboxes[(
int)ThrottleOutPacketType.Resend].Count,
378 m_packetOutboxes[(
int)ThrottleOutPacketType.Wind].Count,
380 m_packetOutboxes[(
int)ThrottleOutPacketType.Task].Count,
382 m_packetOutboxes[(
int)ThrottleOutPacketType.Asset].Count);
387 PacketStats callback = OnPacketStats;
388 if (callback != null)
390 int newPacketsReceived = PacketsReceived - m_packetsReceivedReported;
391 int newPacketsSent = PacketsSent - m_packetsSentReported;
393 callback(newPacketsReceived, newPacketsSent, UnackedBytes);
395 m_packetsReceivedReported += newPacketsReceived;
396 m_packetsSentReported += newPacketsSent;
402 SetThrottles(throttleData, 1.0f);
410 if (!BitConverter.IsLittleEndian)
412 byte[] newData =
new byte[7 * 4];
413 Buffer.BlockCopy(throttleData, 0, newData, 0, 7 * 4);
415 for (
int i = 0; i < 7; i++)
416 Array.Reverse(newData, i * 4, 4);
422 adjData = throttleData;
426 float scale = 0.125f * factor;
427 int resend = (int)(BitConverter.ToSingle(adjData, pos) * scale); pos += 4;
428 int land = (int)(BitConverter.ToSingle(adjData, pos) * scale); pos += 4;
429 int wind = (int)(BitConverter.ToSingle(adjData, pos) * scale); pos += 4;
430 int cloud = (int)(BitConverter.ToSingle(adjData, pos) * scale); pos += 4;
431 int task = (int)(BitConverter.ToSingle(adjData, pos) * scale); pos += 4;
432 int texture = (int)(BitConverter.ToSingle(adjData, pos) * scale); pos += 4;
433 int asset = (int)(BitConverter.ToSingle(adjData, pos) * scale);
453 task = task + (int)(m_cannibalrate * texture);
454 texture = (int)((1 - m_cannibalrate) * texture);
456 int total = resend + land + wind + cloud + task + texture + asset;
458 float m_burst = total * m_burstTime;
460 if (ThrottleDebugLevel > 0)
463 "[LLUDPCLIENT]: {0} is setting throttles in {1} to Resend={2}, Land={3}, Wind={4}, Cloud={5}, Task={6}, Texture={7}, Asset={8}, TOTAL = {9}",
464 AgentID, m_udpServer.Scene.Name, resend, land, wind, cloud, task, texture, asset, total);
473 bucket = m_throttleCategories[(
int)ThrottleOutPacketType.Land];
474 bucket.RequestedDripRate = land;
475 bucket.RequestedBurst = m_burst;
481 bucket = m_throttleCategories[(
int)ThrottleOutPacketType.Cloud];
482 bucket.RequestedDripRate = cloud;
483 bucket.RequestedBurst = m_burst;
489 bucket = m_throttleCategories[(
int)ThrottleOutPacketType.Task];
490 bucket.RequestedDripRate = task;
491 bucket.RequestedBurst = m_burst;
498 m_packedThrottles = null;
503 byte[] data = m_packedThrottles;
509 data =
new byte[7 * 4];
515 rate = (float)m_throttleCategories[(
int)ThrottleOutPacketType.Resend].RequestedDripRate * multiplier;
516 Buffer.BlockCopy(Utils.FloatToBytes(rate), 0, data, i, 4); i += 4;
518 rate = (float)m_throttleCategories[(
int)ThrottleOutPacketType.Land].RequestedDripRate * multiplier;
519 Buffer.BlockCopy(Utils.FloatToBytes(rate), 0, data, i, 4); i += 4;
521 rate = (float)m_throttleCategories[(
int)ThrottleOutPacketType.Wind].RequestedDripRate * multiplier;
522 Buffer.BlockCopy(Utils.FloatToBytes(rate), 0, data, i, 4); i += 4;
524 rate = (float)m_throttleCategories[(
int)ThrottleOutPacketType.Cloud].RequestedDripRate * multiplier;
525 Buffer.BlockCopy(Utils.FloatToBytes(rate), 0, data, i, 4); i += 4;
527 rate = (float)m_throttleCategories[(
int)ThrottleOutPacketType.Task].RequestedDripRate * multiplier;
528 Buffer.BlockCopy(Utils.FloatToBytes(rate), 0, data, i, 4); i += 4;
530 rate = (float)m_throttleCategories[(
int)ThrottleOutPacketType.Texture].RequestedDripRate * multiplier;
531 Buffer.BlockCopy(Utils.FloatToBytes(rate), 0, data, i, 4); i += 4;
533 rate = (float)m_throttleCategories[(
int)ThrottleOutPacketType.Asset].RequestedDripRate * multiplier;
534 Buffer.BlockCopy(Utils.FloatToBytes(rate), 0, data, i, 4); i += 4;
536 m_packedThrottles = data;
545 if (icat > 0 && icat < THROTTLE_CATEGORY_COUNT)
548 return bucket.GetCatBytesCanSend(timeMS);
565 return EnqueueOutgoing(packet, forceQueue,
false);
570 int category = (int)packet.
Category;
572 if (category >= 0 && category < m_packetOutboxes.Length)
574 DoubleLocklessQueue<OutgoingPacket> queue = m_packetOutboxes[category];
576 if (m_deliverPackets ==
false)
578 queue.Enqueue(packet, highPriority);
582 TokenBucket bucket = m_throttleCategories[category];
585 if (queue.Count > 0 || m_nextPackets[category] != null)
587 queue.Enqueue(packet, highPriority);
594 bucket.RemoveTokens(packet.Buffer.DataLength);
600 queue.Enqueue(packet, highPriority);
633 DoubleLocklessQueue<OutgoingPacket> queue;
635 bool packetSent =
false;
640 for (
int i = 0; i < THROTTLE_CATEGORY_COUNT; i++)
642 bucket = m_throttleCategories[i];
645 if (m_nextPackets[i] != null)
654 m_udpServer.SendPacketFinal(nextPacket);
655 m_nextPackets[i] = null;
658 if (m_packetOutboxes[i].Count < 5)
659 emptyCategories |= CategoryToFlag(i);
666 queue = m_packetOutboxes[i];
669 bool success =
false;
672 success = queue.Dequeue(out packet);
676 m_packetOutboxes[i] =
new DoubleLocklessQueue<OutgoingPacket>();
685 m_udpServer.SendPacketFinal(packet);
689 emptyCategories |= CategoryToFlag(i);
694 m_nextPackets[i] = packet;
702 emptyCategories |= CategoryToFlag(i);
707 m_packetOutboxes[i] =
new DoubleLocklessQueue<OutgoingPacket>();
708 emptyCategories |= CategoryToFlag(i);
713 if (emptyCategories != 0)
714 BeginFireQueueEmpty(emptyCategories);
730 const float ALPHA = 0.125f;
731 const float BETA = 0.25f;
732 const float K = 4.0f;
743 RTTVAR = (1.0f - BETA) * RTTVAR + BETA * Math.Abs(SRTT - r);
744 SRTT = (1.0f - ALPHA) * SRTT + ALPHA * r;
747 int rto = (int)(SRTT + Math.Max(m_udpServer.TickCountResolution, K * RTTVAR));
750 rto = Utils.Clamp(rto, m_defaultRTO, m_maxRTO);
771 RTO = Math.Min(RTO * 2, m_maxRTO);
775 const int MIN_CALLBACK_MS = 10;
784 if (!m_isQueueEmptyRunning)
786 int start = Environment.TickCount & Int32.MaxValue;
788 if (start < m_nextOnQueueEmpty)
791 m_isQueueEmptyRunning =
true;
793 m_nextOnQueueEmpty = start + MIN_CALLBACK_MS;
794 if (m_nextOnQueueEmpty == 0)
795 m_nextOnQueueEmpty = 1;
797 if (HasUpdates(categories))
799 if (!m_udpServer.OqrEngine.IsRunning)
802 Util.FireAndForget(FireQueueEmpty, categories,
"LLUDPClient.BeginFireQueueEmpty");
806 m_udpServer.OqrEngine.QueueJob(AgentID.ToString(), () => FireQueueEmpty(categories));
811 m_isQueueEmptyRunning =
false;
816 private bool m_isQueueEmptyRunning;
829 QueueEmpty callback = OnQueueEmpty;
831 if (callback != null)
835 try { callback(categories); }
836 catch (Exception e) { m_log.Error(
"[LLUDPCLIENT]: OnQueueEmpty(" + categories +
") threw an exception: " + e.Message, e); }
840 m_isQueueEmptyRunning =
false;
843 internal void ForceThrottleSetting(
int throttle,
int setting)
845 if (throttle > 0 && throttle < THROTTLE_CATEGORY_COUNT)
846 m_throttleCategories[throttle].RequestedDripRate = Math.Max(setting, LLUDPServer.MTU);
849 internal int GetThrottleSetting(
int throttle)
851 if (throttle > 0 && throttle < THROTTLE_CATEGORY_COUNT)
852 return (
int)m_throttleCategories[throttle].RequestedDripRate;
883 case ThrottleOutPacketType.Land:
884 return ThrottleOutPacketTypeFlags.Land;
885 case ThrottleOutPacketType.Wind:
886 return ThrottleOutPacketTypeFlags.Wind;
887 case ThrottleOutPacketType.Cloud:
888 return ThrottleOutPacketTypeFlags.Cloud;
889 case ThrottleOutPacketType.Task:
890 return ThrottleOutPacketTypeFlags.Task;
891 case ThrottleOutPacketType.Texture:
892 return ThrottleOutPacketTypeFlags.Texture;
893 case ThrottleOutPacketType.Asset:
894 return ThrottleOutPacketTypeFlags.Asset;
901 public class DoubleLocklessQueue<T> :
OpenSim.Framework.LocklessQueue<T>
903 OpenSim.Framework.LocklessQueue<T> highQueue =
new OpenSim.Framework.LocklessQueue<T>();
905 public override int Count
909 return base.Count + highQueue.Count;
915 if (highQueue.Dequeue(out item))
918 return base.Dequeue(out item);
921 public void Enqueue(T item,
bool highPriority)
924 highQueue.Enqueue(item);
void BackoffRTO()
Exponential backoff of the retransmission timeout, per section 5.5 of RFC 2988
A circular buffer and hashset for tracking incoming packet sequence numbers
readonly UUID AgentID
AgentID for this client
A hierarchical token bucket for bandwidth throttling. See http://en.wikipedia.org/wiki/Token_bucket f...
int m_lastStartpingTimeMS
PacketStats OnPacketStats
Fired when updated networking stats are produced for this client
LLUDPClient(LLUDPServer server, ThrottleRates rates, TokenBucket parentThrottle, uint circuitCode, UUID agentID, IPEndPoint remoteEndPoint, int defaultRTO, int maxRTO)
Default constructor
int PacketsReceived
Number of packets received from this client
int TickLastPacketReceived
Environment.TickCount when the last packet was received for this client
void SetThrottles(byte[] throttleData, float factor)
bool AdaptiveThrottlesEnabled
Flag used to enable adaptive throttles
virtual float RequestedDripRate
bool CheckTokens(int amount)
float SRTT
Smoothed round-trip time. A smoothed average of the round-trip time for sending a reliable packet to ...
void SetClientInfo(ClientInfo info)
Modifies the UDP throttles
int GetCatBytesCanSend(ThrottleOutPacketType cat, int timeMS)
readonly IPEndPoint RemoteEndPoint
The remote address of the connected client
int UnackedBytes
Total byte count of unacked packets sent to this client
void Enqueue(T item, bool highPriority)
delegate void PacketStats(int inPackets, int outPackets, int unAckedBytes)
Fired when updated networking stats are produced for this client
string GetStats()
Return statistics information about client packet queues.
bool IsPaused
True when this connection is paused, otherwise false
override bool Dequeue(out T item)
OpenSim.Region.ClientStack.LindenUDP.TokenBucket TokenBucket
Tracks state for a client UDP connection and provides client-specific methods
ClientInfo GetClientInfo()
Gets information about this client connection
int GetRate(ThrottleOutPacketType type)
bool EnqueueOutgoing(OutgoingPacket packet, bool forceQueue)
Queue an outgoing packet if appropriate.
byte[] GetThrottlesPacked(float multiplier)
int RTO
Retransmission timeout. Packets that have not been acknowledged in this number of milliseconds or lon...
byte CurrentPingSequence
Current ping sequence number
bool RemoveTokens(int amount)
Remove a given number of tokens from the bucket
Holds drip rates and maximum burst rates for throttling with hierarchical token buckets. The maximum burst rates set here are hard limits and can not be overridden by client requests
bool DequeueOutgoing()
Loops through all of the packet queues for this client and tries to send an outgoing packet from each...
UDPPacketBuffer Buffer
Packet data to send
The LLUDP server for a region. This handles incoming and outgoing packets for all UDP connections to ...
float RTTVAR
Round-trip time variance. Measures the consistency of round-trip times
Interactive OpenSim region server
void UpdateRoundTrip(float r)
Called when an ACK packet is received and a round-trip time for a packet is calculated. This is used to calculate the smoothed round-trip time, round trip time variance, and finally the retransmission timeout
int GetTotalPacketsQueuedCount()
Get the total number of pakcets queued for this client.
int PacketsResent
Number of packets resent to this client
readonly float TickCountResolution
The measured resolution of Environment.TickCount
ThrottleOutPacketTypeFlags
ThrottleOutPacketType Category
Category this packet belongs to
void SetThrottles(byte[] throttleData)
delegate void QueueEmpty(ThrottleOutPacketTypeFlags categories)
Fired when the queue for one or more packet categories is empty. This event can be hooked to put more...
Special collection that is optimized for tracking unacknowledged packets
readonly uint CircuitCode
Circuit code that this client is connected on
void Shutdown()
Shuts down this client connection
void FireQueueEmpty(object o)
Fires the OnQueueEmpty callback and sets the minimum time that it can be called again ...
int PacketsSent
Number of packets sent to this client
int GetPacketsQueuedCount(ThrottleOutPacketType throttleType)
Get the number of packets queued for the given throttle type.
QueueEmpty OnQueueEmpty
Fired when the queue for a packet category is empty. This event can be hooked to put more data on the...
int BytesSinceLastACK
Number of bytes received since the last acknowledgement was sent out. This is used to loosely follow ...
int CurrentSequence
Current packet sequence number
Holds a reference to the LLUDPClientthis packet is destined for, along with the serialized packet dat...
bool EnqueueOutgoing(OutgoingPacket packet, bool forceQueue, bool highPriority)
Func< ThrottleOutPacketTypeFlags, bool > HasUpdates