Compare commits

...

1 Commits

22 changed files with 1018 additions and 0 deletions

View File

@ -0,0 +1,22 @@

Microsoft Visual Studio Solution File, Format Version 12.00
Project("{FAE04EC0-301F-11D3-BF4B-00C04F79EFBC}") = "Logic", "Logic\Logic.csproj", "{1EF89BE7-709C-420A-9FDE-6AED2D0FBF2E}"
EndProject
Project("{FAE04EC0-301F-11D3-BF4B-00C04F79EFBC}") = "Data", "Data\Data.csproj", "{0687300A-A514-43FE-924C-29E9203EBD50}"
EndProject
Global
GlobalSection(SolutionConfigurationPlatforms) = preSolution
Debug|Any CPU = Debug|Any CPU
Release|Any CPU = Release|Any CPU
EndGlobalSection
GlobalSection(ProjectConfigurationPlatforms) = postSolution
{1EF89BE7-709C-420A-9FDE-6AED2D0FBF2E}.Debug|Any CPU.ActiveCfg = Debug|Any CPU
{1EF89BE7-709C-420A-9FDE-6AED2D0FBF2E}.Debug|Any CPU.Build.0 = Debug|Any CPU
{1EF89BE7-709C-420A-9FDE-6AED2D0FBF2E}.Release|Any CPU.ActiveCfg = Release|Any CPU
{1EF89BE7-709C-420A-9FDE-6AED2D0FBF2E}.Release|Any CPU.Build.0 = Release|Any CPU
{0687300A-A514-43FE-924C-29E9203EBD50}.Debug|Any CPU.ActiveCfg = Debug|Any CPU
{0687300A-A514-43FE-924C-29E9203EBD50}.Debug|Any CPU.Build.0 = Debug|Any CPU
{0687300A-A514-43FE-924C-29E9203EBD50}.Release|Any CPU.ActiveCfg = Release|Any CPU
{0687300A-A514-43FE-924C-29E9203EBD50}.Release|Any CPU.Build.0 = Release|Any CPU
EndGlobalSection
EndGlobal

View File

@ -0,0 +1,11 @@
<Project Sdk="Microsoft.NET.Sdk">
<PropertyGroup>
<TargetFramework>netcoreapp2.1</TargetFramework>
</PropertyGroup>
<ItemGroup>
<PackageReference Include="Newtonsoft.Json" Version="12.0.2" />
</ItemGroup>
</Project>

View File

@ -0,0 +1,32 @@
using System;
namespace Data
{
/// <summary>
/// A generic range-based filter for querying data in ITicketData
/// </summary>
public class Filter
{
public Filter(string key, double min, double max)
{
Key = key;
Min = min;
Max = max;
}
/// <summary>
/// The attribute to query
/// </summary>
public string Key { get; set; }
/// <summary>
/// The minimum value [inclusive] of the range
/// </summary>
public double Min { get; set; }
/// <summary>
/// The maximum value [exclusive] of the range
/// </summary>
public double Max { get; set; }
}
}

View File

@ -0,0 +1,55 @@
using System;
using System.Collections.Generic;
using System.Threading.Tasks;
namespace Data
{
/// <summary>
/// An interface for creating, deleting, querying, and updating ticket information
/// </summary>
public interface ITicketData
{
/// <summary>
/// Marks the provided ticket ids as ignorable in queries for a period of time
/// </summary>
/// <param name="ticketIds">The ticket ids to be ignored by queries</param>
/// <param name="durationMs"></param>
Task AwaitingAssignmentAsync(IEnumerable<Guid> ticketIds, long durationMs);
/// <summary>
/// Populates the assignment field of the target tickets
/// </summary>
/// <param name="ticketIds">A list of ticket ids to be assigned</param>
/// <param name="assignment">A string containing the assignment of the tickets</param>
/// <returns></returns>
Task AssignTicketsAsync(IEnumerable<Guid> ticketIds, string assignment);
/// <summary>
/// Executes a collection of queries on the ticket data and returns the union of the results
/// </summary>
/// <param name="query">The query to execute</param>
/// <returns>A collection of tickets union of the results</returns>
Task<IEnumerable<Ticket>> QueryTicketsAsync(Query query);
/// <summary>
/// Creates a ticket and adds it to the ticket datastore
/// </summary>
/// <param name="ticket">The ticket to create</param>
/// <remarks>This may create indexes for the data in the ticket to make it queryable</remarks>
Task CreateTicketAsync(Ticket ticket);
/// <summary>
/// Returns a ticket by id
/// </summary>
/// <param name="id">The id of the ticket</param>
/// <returns>The requested ticket if it exists</returns>
Task<Ticket> GetTicketAsync(Guid id);
/// <summary>
/// Deletes a ticket by id
/// </summary>
/// <param name="id">The id of the ticket</param>
/// <remarks>This may delete indexes for the data in the ticket</remarks>
Task DeleteTicketAsync(Guid id);
}
}

View File

@ -0,0 +1,202 @@
using System;
using System.Collections.Concurrent;
using System.Collections.Generic;
using System.Linq;
using System.Threading.Tasks;
namespace Data
{
public class MemoryData : ITicketData
{
private const string awaitingIndex = "awaitingAssignment";
private const string createdIndex = "created";
protected ConcurrentDictionary<Guid, Ticket> m_Tickets = new ConcurrentDictionary<Guid, Ticket>();
protected ConcurrentDictionary<string, SortedDictionary<Guid, double>> m_Indexes = new ConcurrentDictionary<string, SortedDictionary<Guid, double>>();
public Task AssignTicketsAsync(IEnumerable<Guid> ticketIds, string assignment)
{
foreach (var ticketId in ticketIds)
{
Ticket ticket = m_Tickets[ticketId];
RemoveTicketIndexes(ticket);
ticket.Assignment = assignment;
}
return Task.CompletedTask;
}
public Task AwaitingAssignmentAsync(IEnumerable<Guid> ticketIds, long durationMs)
{
long unixNowMs = DateTimeOffset.UtcNow.ToUnixTimeMilliseconds();
AddOrCreateIndex(ticketIds, new KeyValuePair<string, double>(awaitingIndex, unixNowMs + durationMs));
return Task.CompletedTask;
}
public Task<IEnumerable<Ticket>> QueryTicketsAsync(Query query)
{
// Validate the query
if (query.Filters == null || query.Filters.Count < 1) throw new ArgumentException("Must specify at least 1 filter");
// Get a copy of the indexes and tickets
Dictionary<string, SortedDictionary<Guid, double>> indexes = new Dictionary<string, SortedDictionary<Guid, double>>();
foreach (var keyValueIndex in m_Indexes)
{
lock (keyValueIndex.Value)
{
indexes.Add(keyValueIndex.Key, new SortedDictionary<Guid, double>(keyValueIndex.Value));
}
}
List<List<Guid>> hits = new List<List<Guid>>();
foreach (var filter in query.Filters)
{
if (indexes.ContainsKey(filter.Key))
{
IEnumerable<Guid> hit = indexes[filter.Key]
.Where(i => i.Value >= filter.Min && i.Value <= filter.Max)
.Select(k => k.Key);
hits.Add(hit.ToList());
}
}
List<Guid> pool = hits.FirstOrDefault();
if (pool == null)
return Task.FromResult<IEnumerable<Ticket>>(new List<Ticket>());
for (int i = 1; i < hits.Count; i++)
{
pool = pool.Intersect(hits[i]).ToList();
}
// Built-in ignore index
if (indexes.ContainsKey(awaitingIndex))
{
long unixNowMs = DateTimeOffset.UtcNow.ToUnixTimeMilliseconds();
IEnumerable<Guid> ignore = indexes[awaitingIndex].Where(i => i.Value >= unixNowMs).Select(k => k.Key);
pool = pool.Except(ignore).ToList(); // TODO: the list is ordered and except doesn't take advantage of that
}
List<Ticket> ticketList = new List<Ticket>();
foreach (var guid in pool)
{
ticketList.Add(m_Tickets[guid]);
}
return Task.FromResult<IEnumerable<Ticket>>(ticketList);
}
public Task CreateTicketAsync(Ticket ticket)
{
// Validate the ticket
if (ticket == null) throw new ArgumentNullException(nameof(ticket));
if (ticket.Attributes == null) throw new ArgumentNullException(paramName: "ticketAttributes");
if (ticket.Attributes.Count == 0) throw new ArgumentException("There must be at least 1 attribute to index", paramName: "ticketAttributes");
Ticket newTicket = new Ticket()
{
Id = ticket.Id,
Attributes = new Dictionary<string, double>(ticket.Attributes),
Properties = ticket.Properties,
Created = DateTimeOffset.UtcNow.ToUnixTimeMilliseconds(),
Assignment = string.IsNullOrEmpty(ticket.Assignment) ? string.Empty : ticket.Assignment
};
AddTicketIndexes(newTicket);
m_Tickets.TryAdd(newTicket.Id, newTicket);
return Task.CompletedTask;
}
public Task<Ticket> GetTicketAsync(Guid id)
{
if (m_Tickets.TryGetValue(id, out Ticket ticket))
{
return Task.FromResult(ticket);
}
throw new Exception("Not Found");
}
public Task DeleteTicketAsync(Guid id)
{
Ticket ticket = m_Tickets[id];
RemoveTicketIndexes(ticket);
m_Tickets.TryRemove(id, out ticket);
return Task.CompletedTask;
}
private void AddTicketIndexes(Ticket ticket)
{
foreach (var attribute in ticket.Attributes)
{
AddOrCreateIndex(ticket.Id, attribute);
}
// Built-in indexes
AddOrCreateIndex(ticket.Id, new KeyValuePair<string, double>(createdIndex, ticket.Created));
}
/// <summary>
/// The bulk version for reducing locks when updating lots of records at once with the same attribute
/// </summary>
private void AddOrCreateIndex(IEnumerable<Guid> ids, KeyValuePair<string, double> attribute)
{
if (!m_Indexes.ContainsKey(attribute.Key))
{
m_Indexes.TryAdd(attribute.Key, new SortedDictionary<Guid, double>());
}
lock (m_Indexes[attribute.Key])
{
foreach (var id in ids)
{
m_Indexes[attribute.Key].Add(id, attribute.Value);
}
}
}
private void AddOrCreateIndex(Guid id, KeyValuePair<string, double> attribute)
{
if (m_Indexes.ContainsKey(attribute.Key))
{
lock (m_Indexes[attribute.Key])
{
m_Indexes[attribute.Key].Add(id, attribute.Value);
}
}
else
{
m_Indexes.TryAdd(attribute.Key, new SortedDictionary<Guid, double>() { { id, attribute.Value } });
}
}
private void RemoveTicketIndexes(Ticket ticket)
{
foreach (var attribute in ticket.Attributes)
{
DeleteIndex(ticket.Id, attribute.Key);
}
}
private void DeleteIndex(Guid id, string attributeKey)
{
if (m_Indexes.ContainsKey(attributeKey))
{
lock (m_Indexes[attributeKey])
{
m_Indexes[attributeKey].Remove(id);
}
}
}
public async Task CreateTicketsAsync(IEnumerable<Ticket> tickets)
{
foreach (var ticket in tickets)
{
await CreateTicketAsync(ticket);
}
}
}
}

View File

@ -0,0 +1,25 @@
using System;
using System.Collections.Generic;
namespace Data
{
/// <summary>
/// Captures a searching behavior for ITicketData
/// </summary>
public class Query
{
public Query()
{
}
public Query(List<Filter> filters)
{
Filters = filters;
}
/// <summary>
/// A list of hard filters to be applied to the provided searchable attributes
/// </summary>
public List<Filter> Filters { get; set; }
}
}

View File

@ -0,0 +1,37 @@
using System;
using System.Collections.Generic;
using Newtonsoft.Json.Linq;
namespace Data
{
/// <summary>
/// Intended to be used as a data model abstraction for handling groups of players organized into indexable tickets
/// </summary>
public class Ticket
{
/// <summary>
/// The identifier of the ticket tracked by clients
/// </summary>
public Guid Id { get; set; }
/// <summary>
/// A contract for allowing a backend to provide assignment information
/// </summary>
public string Assignment { get; set; }
/// <summary>
/// Range indexes
/// </summary>
public IDictionary<string, double> Attributes { get; set; }
/// <summary>
/// The milliseconds in unix utc representing when this ticket was created
/// </summary>
public long Created {get;set;}
/// <summary>
/// Custom data provided by the ticket creator
/// </summary>
public JObject Properties { get; set; }
}
}

View File

@ -0,0 +1,32 @@
using System;
using System.Net.Http;
using Data;
using Logic.InternalContracts;
using Microsoft.Extensions.Logging;
namespace Logic
{
public class FunctionClientResolver
{
readonly HttpClient m_HttpClient;
ILoggerFactory m_LoggerFactory;
public FunctionClientResolver(IServiceProvider serviceProvider, HttpClient httpClient, ITicketData ticketData, ILoggerFactory loggerFactory)
{
m_HttpClient = httpClient;
m_LoggerFactory = loggerFactory;
}
public IFunctionClient GetFunctionClientByTarget(TargetFunction target)
{
switch (target.Kind)
{
case FunctionKind.Rest:
return new FunctionRestClient(m_HttpClient, target, m_LoggerFactory.CreateLogger<FunctionRestClient>());
default:
throw new ArgumentOutOfRangeException();
}
}
}
}

View File

@ -0,0 +1,62 @@
using System;
using System.Collections.Generic;
using System.Net;
using System.Net.Http;
using System.Threading;
using System.Threading.Tasks;
using Logic.InternalContracts;
using Microsoft.Extensions.Logging;
using Newtonsoft.Json;
namespace Logic
{
public class FunctionRestClient : IFunctionClient
{
HttpClient m_Client;
string m_Address;
ILogger<FunctionRestClient> m_Log;
public FunctionRestClient(HttpClient client, TargetFunction targetFunction, ILogger<FunctionRestClient> log)
{
m_Log = log;
m_Client = client;
IPHostEntry dns = Dns.GetHostEntry(targetFunction.Name);
m_Address = dns.AddressList[0].ToString();
}
public async Task<IEnumerable<Match>> RunAsync(MatchSpec spec, CancellationToken cancellationToken)
{
FunctionRestParams context = new FunctionRestParams()
{
Pools = new List<Pool>(),
Config = spec.Config
};
foreach (var specPool in spec.Pools)
{
context.Pools.Add(new Pool() { Name = specPool.Key, Filters = specPool.Value });
}
string json = JsonConvert.SerializeObject(context);
string url = "http://" + m_Address + ":8080" + "/api/function";
m_Log.LogDebug("Calling {Url} with body {Body}", url, json);
HttpRequestMessage request = new HttpRequestMessage(HttpMethod.Post, url);
request.Content = new StringContent(json);
request.Content.Headers.Clear();
request.Content.Headers.Add("Content-Type", "application/json");
HttpResponseMessage message = await m_Client.SendAsync(request, cancellationToken);
if (!message.IsSuccessStatusCode)
{
m_Log.LogWarning("{StatusCode} received from function {Url}. {Reason}", message.StatusCode, m_Address, message.ReasonPhrase);
}
string body = await message.Content.ReadAsStringAsync();
return JsonConvert.DeserializeObject<List<Match>>(body);
}
}
}

View File

@ -0,0 +1,12 @@
using System;
using System.Collections.Generic;
using System.Threading.Tasks;
using Logic.InternalContracts;
namespace Logic
{
public interface IEvaluator
{
Task<List<Match>> Evaluate(List<Match> Matches);
}
}

View File

@ -0,0 +1,13 @@
using System;
using System.Collections.Generic;
using System.Threading;
using System.Threading.Tasks;
using Logic.InternalContracts;
namespace Logic
{
public interface IFunctionClient
{
Task<IEnumerable<Match>> RunAsync(MatchSpec config, CancellationToken cancellationToken);
}
}

View File

@ -0,0 +1,13 @@
using System;
using System.Collections.Generic;
using System.Threading;
using System.Threading.Tasks;
using Logic.InternalContracts;
namespace Logic
{
public interface IMatchmakingBackend
{
Task<List<Match>> GetMatchesAsync(List<MatchSpec> matchSpecs, CancellationToken cancellationToken);
}
}

View File

@ -0,0 +1,13 @@
using System;
using System.Collections.Generic;
using Newtonsoft.Json.Linq;
namespace Logic.InternalContracts
{
public class FunctionRestParams
{
public JObject Config { get; set; }
public List<Pool> Pools { get; set; }
}
}

View File

@ -0,0 +1,16 @@
using System;
using System.Collections.Generic;
using Data;
using Newtonsoft.Json.Linq;
namespace Logic.InternalContracts
{
public class Match
{
public Guid Id { get; set; }
public List<Ticket> Tickets { get; set; }
public JObject Properties { get; set; }
}
}

View File

@ -0,0 +1,16 @@
using System;
using System.Collections.Generic;
using Data;
using Newtonsoft.Json.Linq;
namespace Logic.InternalContracts
{
public class MatchSpec
{
public TargetFunction Target { get; set; }
public JObject Config { get; set; }
public IDictionary<string, List<Filter>> Pools { get; set; }
}
}

View File

@ -0,0 +1,22 @@
using System;
using System.Collections.Generic;
using Data;
namespace Logic.InternalContracts
{
/// <summary>
/// A generalized matchmaking "hard" filtering description. Consists of sets of filters
/// </summary>
public class Pool
{
/// <summary>
/// A friendly name identifier for the pool
/// </summary>
public string Name { get; set; }
/// <summary>
/// The collection of generic filters for performing query logic
/// </summary>
public List<Filter> Filters { get; set; }
}
}

View File

@ -0,0 +1,21 @@
using System;
namespace Logic.InternalContracts
{
public class TargetFunction
{
public string Name { get; set; }
public string Version { get; set; }
public FunctionKind Kind { get; set; }
}
public enum FunctionKind
{
None,
Rest,
Grpc,
Memory
}
}

View File

@ -0,0 +1,16 @@
<Project Sdk="Microsoft.NET.Sdk">
<PropertyGroup>
<TargetFramework>netcoreapp2.1</TargetFramework>
</PropertyGroup>
<ItemGroup>
<ProjectReference Include="..\Data\Data.csproj" />
</ItemGroup>
<ItemGroup>
<PackageReference Include="Microsoft.Extensions.Logging.Abstractions" Version="2.2.0" />
<PackageReference Include="Microsoft.Extensions.Options" Version="2.2.0" />
</ItemGroup>
</Project>

View File

@ -0,0 +1,78 @@
using System;
using System.Collections.Generic;
using System.Diagnostics;
using System.Linq;
using System.Threading;
using System.Threading.Tasks;
using Data;
using Logic.InternalContracts;
using Microsoft.Extensions.Logging;
using Newtonsoft.Json.Linq;
namespace Logic
{
public class MatchmakingBackend : IMatchmakingBackend
{
ITicketData m_TicketData;
SynchronizationContext m_SyncContext;
ILogger<MatchmakingBackend> m_Logger;
FunctionClientResolver m_FunctionClientResolver;
public MatchmakingBackend(ITicketData ticketData, ILogger<MatchmakingBackend> logger, FunctionClientResolver resolver, SynchronizationContext syncContext)
{
m_TicketData = ticketData;
m_Logger = logger;
m_FunctionClientResolver = resolver;
m_SyncContext = syncContext;
}
public async Task<List<Match>> GetMatchesAsync(List<MatchSpec> matchSpecs, CancellationToken cancellationToken)
{
// Generate a cancellation time for all the functions. TODO: Make the global timeout configurable
CancellationToken token = AddTimeCancellationToken(cancellationToken, 60000);
Guid contextRegistrationId = await m_SyncContext.AcquireContext();
// Execute functions in parallel
Stopwatch watch = Stopwatch.StartNew();
List<Task<IEnumerable<Match>>> tasks = new List<Task<IEnumerable<Match>>>();
foreach (var matchSpec in matchSpecs)
{
IFunctionClient client = m_FunctionClientResolver.GetFunctionClientByTarget(matchSpec.Target);
m_Logger.LogInformation("Running target {Target} as {Kind}", matchSpec.Target.Name, matchSpec.Target.Kind);
tasks.Add(Task.Run(() => client.RunAsync(matchSpec, token)));
}
// Wait for all the Matches to come back in
List<Match> Matches = (await Task.WhenAll(tasks)).SelectMany(r => r.AsEnumerable()).ToList();
m_Logger.LogInformation("Function run time {ElapsedMs}ms. Submitting {MatchCount} Matches for evaluation.", watch.ElapsedMilliseconds, Matches.Count);
watch.Restart();
// Send the Matches to the evaluator, which will automatically synchronize the Matches
List<Match> goodMatches = await m_SyncContext.EvaluateAsync(contextRegistrationId, Matches);
m_Logger.LogDebug("Evaluator waiting time {ElapsedMs}ms", watch.ElapsedMilliseconds);
// Tell the data api so it can start de-indexing those players
List<Match> matches = new List<Match>();
foreach (var Match in goodMatches)
{
matches.Add(new Match()
{
Properties = JObject.FromObject(Match.Properties),
Tickets = Match.Tickets,
// MatchSpec = null TODO: Maybe re-associate the original matchSpec
});
}
return matches;
}
static CancellationToken AddTimeCancellationToken(CancellationToken token, int ms)
{
CancellationTokenSource cts = new CancellationTokenSource(ms);
return CancellationTokenSource.CreateLinkedTokenSource(token, cts.Token).Token;
}
}
}

View File

@ -0,0 +1,53 @@
using System;
using System.Collections.Generic;
using System.Linq;
using System.Threading.Tasks;
using Logic.InternalContracts;
using Microsoft.Extensions.Logging;
namespace Logic
{
/// <summary>
/// A Match de-collider takes non-colliding Matches in descending score order
/// </summary>
public class ScoreEvaluator : IEvaluator
{
ILogger<ScoreEvaluator> m_Log { get; }
public ScoreEvaluator(ILogger<ScoreEvaluator> log)
{
m_Log = log;
}
public Task<List<Match>> Evaluate(List<Match> Matches)
{
m_Log.LogDebug("{MatchCount} Matches to be evaluated", Matches.Count);
// Sort the Matches by score
Matches = Matches.OrderByDescending(p => p.Properties["score"]).ToList();
List<Match> goodMatches = new List<Match>();
HashSet<Guid> ticketsPresent = new HashSet<Guid>();
foreach (var nextMatch in Matches)
{
// Optimize by converting the prop tickets to a hashset
var propTickets = nextMatch.Tickets.Select(t => t.Id).ToHashSet();
// Check if any of the tickets in the Match are already spoken for
if (ticketsPresent.Overlaps(propTickets))
continue;
// If not, the Match is a good match and mark the tickets as spoken for
goodMatches.Add(nextMatch);
foreach (var ticketId in propTickets)
{
ticketsPresent.Add(ticketId);
}
}
m_Log.LogDebug("{MatchesApproved} Matches approved in evaluation. {TicketsApproved} tickets approved", goodMatches.Count, ticketsPresent.Count);
return Task.FromResult(goodMatches);
}
}
}

View File

@ -0,0 +1,252 @@
using System;
using System.Collections.Concurrent;
using System.Collections.Generic;
using System.Diagnostics;
using System.Linq;
using System.Threading;
using System.Threading.Tasks;
using Data;
using Logic.InternalContracts;
using Microsoft.Extensions.Logging;
using Microsoft.Extensions.Options;
namespace Logic
{
/// <summary>
/// A threadsafe way to run an evaluator. Intended to be used as a singleton, or shared context behind a service
/// </summary>
public class SynchronizationContext
{
int m_MinRunMs;
int m_MaxRunMs;
ITicketData m_TicketData;
IEvaluator m_Evaluator;
ILogger<SynchronizationContext> m_Logger;
ConcurrentDictionary<Guid, List<Match>> m_ContextMatches = new ConcurrentDictionary<Guid, List<Match>>();
ConcurrentDictionary<Guid, List<Guid>> m_ContextResults = new ConcurrentDictionary<Guid, List<Guid>>();
ConcurrentDictionary<Guid, bool> m_ExistingContexts = new ConcurrentDictionary<Guid, bool>();
ManualResetEvent m_NewContextsAvailable = new ManualResetEvent(false);
ManualResetEvent m_ResultsAvailable = new ManualResetEvent(false);
bool m_AcceptingMatches = false;
Timer m_Timer;
Stopwatch m_Watch = new Stopwatch();
Task m_EvalTask = null;
SyncState m_State = SyncState.NotRunning;
object startLock = new object();
enum SyncState
{
NotRunning,
AcceptingContexts,
AcceptingMatches,
Evaluating
}
public SynchronizationContext(ILogger<SynchronizationContext> logger, ITicketData ticketData, IEvaluator evaluator, IOptions<SynchronizationOptions> options)
{
m_Logger = logger;
m_MinRunMs = options.Value.MinWindowSizeMs;
m_MaxRunMs = options.Value.MaxWindowSizeMs;
m_Evaluator = evaluator;
m_TicketData = ticketData;
// TODO: Make the loop event schedulable instead of loop driven
m_Timer = new Timer(
UpdateState,
new AutoResetEvent(true),
options.Value.StateMachineUpdateMs,
options.Value.StateMachineUpdateMs
);
}
/// <summary>
/// Thread safe way to acquire a contextId and register for the evaluator
/// </summary>
/// <returns>A contextId</returns>
public async Task<Guid> AcquireContext()
{
Stopwatch watch = Stopwatch.StartNew();
Guid contextId = await WaitRegisterContextAsync();
m_Logger.LogDebug("{ElapsedMs}ms to acquire context", watch.ElapsedMilliseconds);
return contextId;
}
/// <summary>
/// Thread safe way to let the evaluator de-collide the passed in Matches with other contexts
/// </summary>
/// <param name="contextId">The id of this context</param>
/// <param name="Matches">Matches to de-collide</param>
/// <returns>A list of de-collided Matches</returns>
/// <exception cref="Exception"></exception>
public async Task<List<Match>> EvaluateAsync(Guid contextId, List<Match> Matches)
{
Stopwatch watch = Stopwatch.StartNew();
// Try to register the Matches with the machine
if (TryRegisterMatches(contextId, Matches))
{
m_Logger.LogDebug("{ElapsedMs}ms to register Matches", watch.ElapsedMilliseconds);
watch.Restart();
// Wait for the machine to run and return my results
List<Guid> good = await WaitResultsAsync(contextId);
m_Logger.LogDebug("{ElapsedMs}ms evaluation results available", watch.ElapsedMilliseconds);
List<Match> goodMatches = new List<Match>();
foreach (var prop in Matches)
{
if (good.Contains(prop.Id))
{
goodMatches.Add(prop);
}
}
return goodMatches;
}
throw new Exception("Match registration failed");
}
/// <summary>
/// Attempt to wait for context acquisition to become available and register for one. If the evaluator
/// is not running, it will set the state machine into a runnable state
/// </summary>
/// <returns></returns>
private Task<Guid> WaitRegisterContextAsync()
{
// If the machine isn't started, try to start it
if (m_State == SyncState.NotRunning)
{
lock(startLock)
{
// Make sure this call got the lock in time, otherwise bail
if (m_State == SyncState.NotRunning)
{
// The machine isn't running so clear the current results, any registrations, and any Matches
m_ContextResults.Clear();
m_ExistingContexts.Clear();
m_ContextMatches.Clear();
// Allow new contexts to register, allow new Matches, and disallow results reading
m_State = SyncState.AcceptingContexts;
m_NewContextsAvailable.Set();
m_ResultsAvailable.Reset();
m_AcceptingMatches = true;
m_Watch = Stopwatch.StartNew();
}
}
}
m_NewContextsAvailable.WaitOne(5000); // TODO: Make this wait timeout automated
Guid newId = Guid.NewGuid();
m_ExistingContexts.TryAdd(newId, false);
return Task.FromResult(newId);
}
private Task<List<Guid>> WaitResultsAsync(Guid contextId)
{
m_ResultsAvailable.WaitOne(5000); // TODO: Make this wait timeout automated
return Task.FromResult(m_ContextResults[contextId]);
}
private bool TryRegisterMatches(Guid contextId, List<Match> Matches)
{
if (!m_ExistingContexts.ContainsKey(contextId)) return false;
if (!m_AcceptingMatches) return false;
m_ExistingContexts[contextId] = true;
m_ContextResults.TryAdd(contextId, new List<Guid>());
return m_ContextMatches.TryAdd(contextId, Matches);
}
private void UpdateState(object state)
{
switch (m_State)
{
case SyncState.NotRunning:
break;
case SyncState.AcceptingContexts:
if (m_Watch.ElapsedMilliseconds > m_MinRunMs)
{
m_Logger.LogDebug("Min window passed at {ElapsedMs}ms", m_Watch.ElapsedMilliseconds);
m_State = SyncState.AcceptingMatches;
m_NewContextsAvailable.Reset();
UpdateState(state); // Just go ahead and check the accepting state
}
break;
case SyncState.AcceptingMatches:
bool maxWindowExceeded = m_Watch.ElapsedMilliseconds > m_MaxRunMs;
bool allIn = m_ExistingContexts.Values.All(b => b);
if (m_Watch.ElapsedMilliseconds > m_MaxRunMs || m_ExistingContexts.Values.All(b => b))
{
if (maxWindowExceeded) m_Logger.LogDebug("Max window exceeded. Moving to eval at {ElapsedMs}ms", m_Watch.ElapsedMilliseconds);
if (allIn) m_Logger.LogDebug("All contexts reported in. Moving to eval at {ElapsedMs}ms", m_Watch.ElapsedMilliseconds);
m_State = SyncState.Evaluating;
m_AcceptingMatches = false;
m_EvalTask = Task.Run(async () =>
{
// Run the evaluator in parallel to this state system
await RunEvaluation(m_Evaluator);
// Once done, clear the other threads to read from the results. Set the machine back to doing nothing
m_State = SyncState.NotRunning;
m_ResultsAvailable.Set();
m_Logger.LogDebug("Evaluation completed at {ElapsedMs}ms", m_Watch.ElapsedMilliseconds);
m_Watch.Reset();
});
}
break;
case SyncState.Evaluating:
break;
default:
throw new ArgumentOutOfRangeException();
}
}
private async Task RunEvaluation(IEvaluator evaluator)
{
// Create a reverse map of context to Match (to rebuild the results at the end)
Dictionary<Guid, Guid> MatchIdToContextId = new Dictionary<Guid, Guid>();
List<Match> allMatches = new List<Match>();
foreach (var contextMatch in m_ContextMatches)
{
foreach (var Match in contextMatch.Value)
{
allMatches.Add(Match);
MatchIdToContextId.Add(Match.Id, contextMatch.Key);
}
}
// Run the evaluator
List<Match> matches = await evaluator.Evaluate(allMatches);
// Flag the selected tickets as un-queryable for a set period of time. // TODO: Make configurable
List<Guid> ticketsTaken = matches.SelectMany(m => m.Tickets.Select(t => t.Id)).ToList();
await m_TicketData.AwaitingAssignmentAsync(ticketsTaken, 60000);
// Put the match results into the proper results context. TODO: Failure handled good enough by unqueryable timeout?
foreach (var match in matches)
{
Guid contextId = MatchIdToContextId[match.Id];
m_ContextResults[contextId].Add(match.Id);
}
}
}
}

View File

@ -0,0 +1,15 @@
using System;
namespace Logic
{
public class SynchronizationOptions
{
public const string SectionName = "SynchronizationOptions";
public int MinWindowSizeMs { get; set; }
public int MaxWindowSizeMs { get; set; }
public int StateMachineUpdateMs { get; set; }
}
}