Skip to content

Commit

Permalink
Force queue rules to form a tree
Browse files Browse the repository at this point in the history
Previously, queue rules could create DAGs of queues. This leads to
confusing edge cases and will not be supported in the v2 resource
management system that is being worked on.
  • Loading branch information
cberner committed Apr 27, 2016
1 parent 1380637 commit 41b482e
Show file tree
Hide file tree
Showing 7 changed files with 69 additions and 26 deletions.
16 changes: 10 additions & 6 deletions presto-docs/src/main/sphinx/admin/queue.rst
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@ execution by the previous queue. A slot for the query is reserved in all queues.
The query is rejected if no slot is available in any of the queues.

Rules are processed sequentially and the first one that matches will be used.
In the example configuration below, there are four queue templates.
In the example configuration below, there are five queue templates.
In the ``user.${USER}`` queue, ``${USER}`` will be expanded to the name of the
user that submitted the query. ``${SOURCE}`` is also supported, which expands
to the source submitting the query. The source name can be set as follows:
Expand All @@ -28,16 +28,16 @@ There are three rules that define which queries go into which queues:
* The first rule makes ``bob`` an admin.

* The second rule states that all queries that come from a source that includes ``pipeline``
should first be queued in the user's personal queue, then the ``pipeline`` queue. When a
query acquires a permit from a new queue, it doesn't release permits from previous queues
until the query finishes execution.
should first be queued in the user's personal pipeline queue, then the ``pipeline`` queue.
When a query acquires a permit from a new queue, it doesn't release permits from previous
queues until the query finishes execution.

* The last rule is a catch all, which puts all queries into the user's personal queue.

All together these rules implement the policy that ``bob`` is an admin and
all other users are subject to the follow limits:

* Users are allowed to have up to 5 queries running.
* Users are allowed to have up to 5 queries running. Additionally, they may run one pipeline.

* No more than 10 ``pipeline`` queries may run at once.

Expand All @@ -51,6 +51,10 @@ all other users are subject to the follow limits:
"maxConcurrent": 5,
"maxQueued": 20
},
"user_pipeline.${USER}": {
"maxConcurrent": 1,
"maxQueued": 10
},
"pipeline": {
"maxConcurrent": 10,
"maxQueued": 100
Expand All @@ -72,7 +76,7 @@ all other users are subject to the follow limits:
{
"source": ".*pipeline.*",
"queues": [
"user.${USER}",
"user_pipeline.${USER}",
"pipeline",
"global"
]
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,8 +28,8 @@
import org.jgrapht.GraphPath;
import org.jgrapht.Graphs;
import org.jgrapht.alg.FloydWarshallShortestPaths;
import org.jgrapht.graph.DefaultDirectedGraph;
import org.jgrapht.graph.DefaultEdge;
import org.jgrapht.graph.DirectedPseudograph;

import javax.annotation.Nullable;
import javax.annotation.concurrent.ThreadSafe;
Expand All @@ -42,6 +42,7 @@
import java.util.Map;
import java.util.Objects;
import java.util.regex.Pattern;
import java.util.stream.Collectors;

import static com.google.common.base.Preconditions.checkArgument;
import static java.lang.String.format;
Expand Down Expand Up @@ -81,13 +82,13 @@ public QueryQueueRuleFactory(QueryManagerConfig config, ObjectMapper mapper)
rules.add(QueryQueueRule.createRule(rule.getUserRegex(), rule.getSourceRegex(), rule.getSessionPropertyRegexes(), rule.getQueues(), definitions));
}
}
checkIsDAG(rules.build());
checkIsTree(rules.build());
this.selectors = rules.build();
}

private static void checkIsDAG(List<QueryQueueRule> rules)
private static void checkIsTree(List<QueryQueueRule> rules)
{
DirectedPseudograph<String, DefaultEdge> graph = new DirectedPseudograph<>(DefaultEdge.class);
DirectedGraph<String, DefaultEdge> graph = new DefaultDirectedGraph<>(DefaultEdge.class);
for (QueryQueueRule rule : rules) {
String lastQueueName = null;
for (QueryQueueDefinition queue : rule.getQueues()) {
Expand All @@ -100,6 +101,15 @@ private static void checkIsDAG(List<QueryQueueRule> rules)
}
}

for (String vertex : graph.vertexSet()) {
if (graph.outDegreeOf(vertex) > 1) {
List<String> targets = graph.outgoingEdgesOf(vertex).stream()
.map(graph::getEdgeTarget)
.collect(Collectors.toList());
throw new IllegalArgumentException(format("Queues must form a tree. Queue %s feeds into %s", vertex, targets));
}
}

List<String> shortestCycle = shortestCycle(graph);

if (shortestCycle != null) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@ public void testJsonParsing()
parse("queue_config.json");
assertFails("queue_config_bad_cycle.json", "Queues must not contain a cycle. The shortest cycle found is \\[q(.), q., q., q., q\\1\\]");
assertFails("queue_config_bad_selfcycle.json", "Queues must not contain a cycle. The shortest cycle found is \\[q1, q1\\]");
assertFails("queue_config_bad_out_degree.json", "Queues must form a tree. Queue q0 feeds into \\[q1, q2\\]");
}

private void parse(String fileName)
Expand Down
21 changes: 10 additions & 11 deletions presto-main/src/test/resources/queue_config.json
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,14 @@
"maxConcurrent": 5,
"maxQueued": 20
},
"user_big.${USER}": {
"maxConcurrent": 1,
"maxQueued": 1
},
"user_pipeline.${USER}": {
"maxConcurrent": 5,
"maxQueued": 20
},
"pipeline": {
"maxConcurrent": 10,
"maxQueued": 100
Expand All @@ -30,27 +38,18 @@
"source": "ping_query",
"queues": ["admin"]
},
{
"session.experimental_big_query": "true",
"source": ".*pipeline.*",
"queues": [
"user.${USER}",
"pipeline",
"big"
]
},
{
"source": ".*pipeline.*",
"queues": [
"user.${USER}",
"user_pipeline.${USER}",
"pipeline",
"global"
]
},
{
"session.experimental_big_query": "true",
"queues": [
"user.${USER}",
"user_big.${USER}",
"big"
]
},
Expand Down
31 changes: 31 additions & 0 deletions presto-main/src/test/resources/queue_config_bad_out_degree.json
Original file line number Diff line number Diff line change
@@ -0,0 +1,31 @@
{
"queues": {
"q0": {
"maxConcurrent": 5,
"maxQueued": 20
},
"q1": {
"maxConcurrent": 5,
"maxQueued": 20
},
"q2": {
"maxConcurrent": 5,
"maxQueued": 20
}
},
"rules": [
{
"user": "bob",
"queues": [
"q0",
"q1"
]
},
{
"queues": [
"q0",
"q2"
]
}
]
}
Original file line number Diff line number Diff line change
Expand Up @@ -63,15 +63,13 @@ public void testSqlQueryQueueManager()
// submit second non "dashboard" query
QueryId secondNonDashboardQuery = createQuery(queryRunner, newSession(), LONG_LASTING_QUERY);

// wait for the second non "dashboard" query to be queued ("user.${USER}" queue strategy only allows three user queries to be accepted for execution,
// two "dashboard" and one non "dashboard" queries are already accepted by "user.${USER}" queue)
waitForQueryState(queryRunner, secondNonDashboardQuery, QUEUED);
// wait for the second non "dashboard" query to start
waitForQueryState(queryRunner, secondNonDashboardQuery, RUNNING);

// cancel first "dashboard" query, second "dashboard" query and second non "dashboard" query should start running
cancelQuery(queryRunner, firstDashboardQuery);
waitForQueryState(queryRunner, firstDashboardQuery, FAILED);
waitForQueryState(queryRunner, secondDashboardQuery, RUNNING);
waitForQueryState(queryRunner, secondNonDashboardQuery, RUNNING);
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,8 +17,8 @@
{
"source": "(?i).*dashboard.*",
"queues": [
"user.${USER}",
"dashboard.${USER}",
"user.${USER}",
"global"
]
},
Expand Down

0 comments on commit 41b482e

Please sign in to comment.