Skip to content

Commit

Permalink
Add RESET session property command
Browse files Browse the repository at this point in the history
  • Loading branch information
dain committed Dec 26, 2014
1 parent 79d5da8 commit 181ee43
Show file tree
Hide file tree
Showing 23 changed files with 298 additions and 13 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -237,9 +237,10 @@ private static void process(QueryRunner queryRunner, String sql, OutputFormat ou
query.renderOutput(System.out, outputFormat, interactive);

// update session properties if present
if (!query.getSetSessionProperties().isEmpty()) {
if (!query.getSetSessionProperties().isEmpty() || !query.getResetSessionProperties().isEmpty()) {
Map<String, String> sessionProperties = new HashMap<>(queryRunner.getSession().getProperties());
sessionProperties.putAll(query.getSetSessionProperties());
sessionProperties.keySet().removeAll(query.getResetSessionProperties());
queryRunner.setSession(withProperties(queryRunner.getSession(), sessionProperties));
}
}
Expand Down
6 changes: 6 additions & 0 deletions presto-cli/src/main/java/com/facebook/presto/cli/Query.java
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,7 @@
import java.io.Writer;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.atomic.AtomicBoolean;

import static com.facebook.presto.cli.ConsolePrinter.REAL_TERMINAL;
Expand Down Expand Up @@ -63,6 +64,11 @@ public Map<String, String> getSetSessionProperties()
return client.getSetSessionProperties();
}

public Set<String> getResetSessionProperties()
{
return client.getResetSessionProperties();
}

public void renderOutput(PrintStream out, OutputFormat outputFormat, boolean interactive)
{
SignalHandler oldHandler = Signal.handle(SIGINT, new SignalHandler()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ public final class PrestoHeaders
public static final String PRESTO_LANGUAGE = "X-Presto-Language";
public static final String PRESTO_SESSION = "X-Presto-Session";
public static final String PRESTO_SET_SESSION = "X-Presto-Set-Session";
public static final String PRESTO_CLEAR_SESSION = "X-Presto-Clear-Session";

public static final String PRESTO_CURRENT_STATE = "X-Presto-Current-State";
public static final String PRESTO_MAX_WAIT = "X-Presto-Max-Wait";
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,8 @@
import com.google.common.base.Objects;
import com.google.common.base.Splitter;
import com.google.common.collect.ImmutableMap;
import com.google.common.collect.ImmutableSet;
import com.google.common.collect.Sets;
import io.airlift.http.client.FullJsonResponseHandler;
import io.airlift.http.client.HttpClient;
import io.airlift.http.client.HttpStatus;
Expand All @@ -29,10 +31,12 @@
import java.util.List;
import java.util.Map;
import java.util.Map.Entry;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicReference;

import static com.facebook.presto.client.PrestoHeaders.PRESTO_CLEAR_SESSION;
import static com.facebook.presto.client.PrestoHeaders.PRESTO_SET_SESSION;
import static com.google.common.base.Preconditions.checkNotNull;
import static com.google.common.base.Preconditions.checkState;
Expand Down Expand Up @@ -69,6 +73,7 @@ public class StatementClient
private final String query;
private final AtomicReference<QueryResults> currentResults = new AtomicReference<>();
private final Map<String, String> setSessionProperties = new ConcurrentHashMap<>();
private final Set<String> resetSessionProperties = Sets.newConcurrentHashSet();
private final AtomicBoolean closed = new AtomicBoolean();
private final AtomicBoolean gone = new AtomicBoolean();
private final AtomicBoolean valid = new AtomicBoolean(true);
Expand Down Expand Up @@ -174,6 +179,11 @@ public Map<String, String> getSetSessionProperties()
return ImmutableMap.copyOf(setSessionProperties);
}

public Set<String> getResetSessionProperties()
{
return ImmutableSet.copyOf(resetSessionProperties);
}

public boolean isValid()
{
return valid.get() && (!isGone()) && (!isClosed());
Expand Down Expand Up @@ -235,6 +245,9 @@ private void processResponse(JsonResponse<QueryResults> response)
}
setSessionProperties.put(keyValue.get(0), keyValue.size() > 1 ? keyValue.get(1) : "");
}
for (String clearSession : response.getHeaders().get(PRESTO_CLEAR_SESSION)) {
resetSessionProperties.add(clearSession);
}
currentResults.set(response.getValue());
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@

import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.atomic.AtomicReference;

Expand Down Expand Up @@ -113,7 +114,7 @@ public void addResults(QueryResults results)
}

@Override
public Void build(Map<String, String> setSessionProperties)
public Void build(Map<String, String> setSessionProperties, Set<String> resetSessionProperties)
{
return null;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,7 @@ public class QueryInfo
private final String query;
private final QueryStats queryStats;
private final Map<String, String> setSessionProperties;
private final Set<String> resetSessionProperties;
private final StageInfo outputStage;
private final FailureInfo failureInfo;
private final ErrorCode errorCode;
Expand All @@ -61,6 +62,7 @@ public QueryInfo(
@JsonProperty("query") String query,
@JsonProperty("queryStats") QueryStats queryStats,
@JsonProperty("setSessionProperties") Map<String, String> setSessionProperties,
@JsonProperty("resetSessionProperties") Set<String> resetSessionProperties,
@JsonProperty("outputStage") StageInfo outputStage,
@JsonProperty("failureInfo") FailureInfo failureInfo,
@JsonProperty("errorCode") ErrorCode errorCode,
Expand All @@ -73,6 +75,7 @@ public QueryInfo(
Preconditions.checkNotNull(fieldNames, "fieldNames is null");
Preconditions.checkNotNull(queryStats, "queryStats is null");
Preconditions.checkNotNull(setSessionProperties, "setSessionProperties is null");
Preconditions.checkNotNull(resetSessionProperties, "resetSessionProperties is null");
Preconditions.checkNotNull(query, "query is null");
Preconditions.checkNotNull(inputs, "inputs is null");

Expand All @@ -85,6 +88,7 @@ public QueryInfo(
this.query = query;
this.queryStats = queryStats;
this.setSessionProperties = ImmutableMap.copyOf(setSessionProperties);
this.resetSessionProperties = ImmutableSet.copyOf(resetSessionProperties);
this.outputStage = outputStage;
this.failureInfo = failureInfo;
this.errorCode = errorCode;
Expand Down Expand Up @@ -151,6 +155,12 @@ public Map<String, String> getSetSessionProperties()
return setSessionProperties;
}

@JsonProperty
public Set<String> getResetSessionProperties()
{
return resetSessionProperties;
}

@JsonProperty
public StageInfo getOutputStage()
{
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,7 @@

import java.net.URI;
import java.util.LinkedHashMap;
import java.util.LinkedHashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
Expand Down Expand Up @@ -85,6 +86,9 @@ public class QueryStateMachine
@GuardedBy("this")
private final Map<String, String> setSessionProperties = new LinkedHashMap<>();

@GuardedBy("this")
private final Set<String> resetSessionProperties = new LinkedHashSet<>();

@GuardedBy("this")
private Throwable failureCause;

Expand Down Expand Up @@ -247,6 +251,7 @@ public synchronized QueryInfo getQueryInfo(StageInfo rootStage)
query,
queryStats,
setSessionProperties,
resetSessionProperties,
rootStage,
failureInfo,
errorCode,
Expand Down Expand Up @@ -275,6 +280,16 @@ public synchronized void addSetSessionProperties(String key, String value)
setSessionProperties.put(checkNotNull(key, "key is null"), checkNotNull(value, "value is null"));
}

public synchronized Set<String> getResetSessionProperties()
{
return resetSessionProperties;
}

public synchronized void addResetSessionProperties(String name)
{
resetSessionProperties.add(checkNotNull(name, "name is null"));
}

public synchronized QueryState getQueryState()
{
return queryState.get();
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,35 @@
/*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http:https://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package com.facebook.presto.execution;

import com.facebook.presto.Session;
import com.facebook.presto.metadata.Metadata;
import com.facebook.presto.sql.analyzer.SemanticException;
import com.facebook.presto.sql.tree.ResetSession;

import static com.facebook.presto.sql.analyzer.SemanticErrorCode.INVALID_SESSION_PROPERTY;

public class ResetSessionTask
implements DataDefinitionTask<ResetSession>
{
@Override
public void execute(ResetSession statement, Session session, Metadata metadata, QueryStateMachine stateMachine)
{
if (statement.getName().getParts().size() > 2) {
throw new SemanticException(INVALID_SESSION_PROPERTY, statement, "Invalid session property '%s'", statement.getName());
}

stateMachine.addResetSessionProperties(statement.getName().toString());
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@
import com.facebook.presto.execution.QueryManager;
import com.facebook.presto.execution.QueryManagerConfig;
import com.facebook.presto.execution.RenameTableTask;
import com.facebook.presto.execution.ResetSessionTask;
import com.facebook.presto.execution.SetSessionTask;
import com.facebook.presto.execution.SqlQueryManager;
import com.facebook.presto.metadata.DiscoveryNodeManager;
Expand All @@ -43,6 +44,7 @@
import com.facebook.presto.sql.tree.Insert;
import com.facebook.presto.sql.tree.Query;
import com.facebook.presto.sql.tree.RenameTable;
import com.facebook.presto.sql.tree.ResetSession;
import com.facebook.presto.sql.tree.SetSession;
import com.facebook.presto.sql.tree.ShowCatalogs;
import com.facebook.presto.sql.tree.ShowColumns;
Expand Down Expand Up @@ -139,6 +141,7 @@ public void configure(Binder binder)
bindDataDefinitionTask(binder, executionBinder, CreateView.class, CreateViewTask.class);
bindDataDefinitionTask(binder, executionBinder, DropView.class, DropViewTask.class);
bindDataDefinitionTask(binder, executionBinder, SetSession.class, SetSessionTask.class);
bindDataDefinitionTask(binder, executionBinder, ResetSession.class, ResetSessionTask.class);

jsonCodecBinder(binder).bindJsonCodec(ViewDefinition.class);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -86,6 +86,7 @@
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.atomic.AtomicLong;

import static com.facebook.presto.client.PrestoHeaders.PRESTO_CLEAR_SESSION;
import static com.facebook.presto.client.PrestoHeaders.PRESTO_SET_SESSION;
import static com.facebook.presto.server.ResourceUtil.assertRequest;
import static com.facebook.presto.server.ResourceUtil.createSessionForRequest;
Expand Down Expand Up @@ -187,6 +188,10 @@ private static Response getQueryResults(Query query, Optional<Long> token, UriIn
query.getSetSessionProperties().entrySet().stream()
.forEach(entry -> response.header(PRESTO_SET_SESSION, entry.getKey() + '=' + entry.getValue()));

// add clear session properties
query.getResetSessionProperties().stream()
.forEach(name -> response.header(PRESTO_CLEAR_SESSION, name));

return response.build();
}

Expand Down Expand Up @@ -227,6 +232,9 @@ public static class Query
@GuardedBy("this")
private Map<String, String> setSessionProperties;

@GuardedBy("this")
private Set<String> resetSessionProperties;

public Query(Session session,
String query,
QueryManager queryManager,
Expand Down Expand Up @@ -261,6 +269,11 @@ public synchronized Map<String, String> getSetSessionProperties()
return setSessionProperties;
}

public synchronized Set<String> getResetSessionProperties()
{
return resetSessionProperties;
}

public synchronized QueryResults getResults(long token, UriInfo uriInfo, Duration maxWaitTime)
throws InterruptedException
{
Expand Down Expand Up @@ -328,6 +341,7 @@ else if (queryInfo.getOutputStage() == null) {

// update setSessionProperties
setSessionProperties = queryInfo.getSetSessionProperties();
resetSessionProperties = queryInfo.getResetSessionProperties();

// first time through, self is null
QueryResults queryResults = new QueryResults(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@
import com.google.common.base.Objects;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableMap;
import com.google.common.collect.ImmutableSet;
import org.joda.time.DateTimeZone;

import java.sql.Date;
Expand All @@ -37,6 +38,7 @@
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.TimeUnit;

import static com.google.common.base.MoreObjects.toStringHelper;
Expand All @@ -51,17 +53,19 @@ public class MaterializedResult
private final List<MaterializedRow> rows;
private final List<Type> types;
private final Map<String, String> setSessionProperties;
private final Set<String> resetSessionProperties;

public MaterializedResult(List<MaterializedRow> rows, List<? extends Type> types)
{
this(rows, types, ImmutableMap.of());
this(rows, types, ImmutableMap.of(), ImmutableSet.of());
}

public MaterializedResult(List<MaterializedRow> rows, List<? extends Type> types, Map<String, String> setSessionProperties)
public MaterializedResult(List<MaterializedRow> rows, List<? extends Type> types, Map<String, String> setSessionProperties, Set<String> resetSessionProperties)
{
this.rows = ImmutableList.copyOf(checkNotNull(rows, "rows is null"));
this.types = ImmutableList.copyOf(checkNotNull(types, "types is null"));
this.setSessionProperties = ImmutableMap.copyOf(checkNotNull(setSessionProperties, "setSessionProperties is null"));
this.resetSessionProperties = ImmutableSet.copyOf(checkNotNull(resetSessionProperties, "resetSessionProperties is null"));
}

public int getRowCount()
Expand Down Expand Up @@ -90,6 +94,11 @@ public Map<String, String> getSetSessionProperties()
return setSessionProperties;
}

public Set<String> getResetSessionProperties()
{
return resetSessionProperties;
}

@Override
public boolean equals(Object obj)
{
Expand All @@ -102,13 +111,14 @@ public boolean equals(Object obj)
MaterializedResult o = (MaterializedResult) obj;
return Objects.equal(types, o.types) &&
Objects.equal(rows, o.rows) &&
Objects.equal(setSessionProperties, o.setSessionProperties);
Objects.equal(setSessionProperties, o.setSessionProperties) &&
Objects.equal(resetSessionProperties, o.resetSessionProperties);
}

@Override
public int hashCode()
{
return Objects.hashCode(rows, types, setSessionProperties);
return Objects.hashCode(rows, types, setSessionProperties, resetSessionProperties);
}

@Override
Expand All @@ -118,6 +128,7 @@ public String toString()
.add("rows", rows)
.add("types", types)
.add("setSessionProperties", setSessionProperties)
.add("resetSessionProperties", resetSessionProperties)
.toString();
}

Expand All @@ -127,7 +138,7 @@ public MaterializedResult toJdbcTypes()
for (MaterializedRow row : rows) {
jdbcRows.add(convertToJdbcTypes(row));
}
return new MaterializedResult(jdbcRows.build(), types, setSessionProperties);
return new MaterializedResult(jdbcRows.build(), types, setSessionProperties, resetSessionProperties);
}

private static MaterializedRow convertToJdbcTypes(MaterializedRow prestoRow)
Expand Down
Loading

0 comments on commit 181ee43

Please sign in to comment.