Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

upgrade akka libs #5491

Open
wants to merge 9 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 4 additions & 0 deletions LICENSE.txt
Original file line number Diff line number Diff line change
Expand Up @@ -213,6 +213,10 @@ Spray Caching 1.3.4 (io.spray:spray-caching_2.11:1.3.4 - http:https://spray.io/documen
License included at licenses/LICENSE-spray.txt, or https://github.com/spray/spray/blob/master/LICENSE
Copyright (C) 2011-2015 the spray project <http:https://spray.io>

common/scala/src/main/scala/org/apache/openwhisk/core/database/s3/S3Exception.scala
is based on https://github.com/akka/alpakka/blob/v1.0.2/s3/src/main/scala/akka/stream/alpakka/s3/S3Exception.scala
Copyright (C) 2016-2019 Lightbend Inc. <http:https://www.lightbend.com>

This product bundles the files gradlew and gradlew.bat from Gradle v5.5
which are distributed under the Apache License, Version 2.0.
For details see ./gradlew and ./gradlew.bat.
6 changes: 3 additions & 3 deletions common/scala/build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,7 @@ dependencies {
exclude group: 'org.scala-lang', module: 'scala-compiler'
exclude group: 'org.scala-lang', module: 'scala-reflect'
}
api "io.spray:spray-json_${gradle.scala.depVersion}:1.3.5"
api "io.spray:spray-json_${gradle.scala.depVersion}:1.3.6"
api "com.lihaoyi:fastparse_${gradle.scala.depVersion}:2.3.0"
api "com.typesafe.akka:akka-actor_${gradle.scala.depVersion}:${gradle.akka.version}"
api "com.typesafe.akka:akka-stream_${gradle.scala.depVersion}:${gradle.akka.version}"
Expand All @@ -53,7 +53,7 @@ dependencies {
api "com.typesafe.akka:akka-http-core_${gradle.scala.depVersion}:${gradle.akka_http.version}"
api "com.typesafe.akka:akka-http-spray-json_${gradle.scala.depVersion}:${gradle.akka_http.version}"

api "com.lightbend.akka:akka-stream-alpakka-file_${gradle.scala.depVersion}:1.1.2"
api "com.lightbend.akka:akka-stream-alpakka-file_${gradle.scala.depVersion}:3.0.4"

api "ch.qos.logback:logback-classic:1.2.11"
api "org.slf4j:jcl-over-slf4j:1.7.25"
Expand Down Expand Up @@ -95,7 +95,7 @@ dependencies {
api "io.reactivex:rxjava-reactive-streams:1.2.1"


api ("com.lightbend.akka:akka-stream-alpakka-s3_${gradle.scala.depVersion}:1.1.2") {
api ("com.lightbend.akka:akka-stream-alpakka-s3_${gradle.scala.depVersion}:3.0.4") {
exclude group: 'org.apache.httpcomponents' //Not used as alpakka uses akka-http
exclude group: 'com.fasterxml.jackson.core'
exclude group: 'com.fasterxml.jackson.dataformat'
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -174,7 +174,7 @@ object OwSink {
*/
def combine[T, U, M1, M2](first: Sink[U, M1], second: Sink[U, M2])(
strategy: Int => Graph[UniformFanOutShape[T, U], NotUsed]): Sink[T, (M1, M2)] = {
Sink.fromGraph(GraphDSL.create(first, second)((_, _)) { implicit b => (s1, s2) =>
Sink.fromGraph(GraphDSL.createGraph(first, second)((_, _)) { implicit b => (s1, s2) =>
import GraphDSL.Implicits._
val d = b.add(strategy(2))

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,6 @@ import akka.http.scaladsl.model.Uri.Path
import akka.http.scaladsl.model.headers.Authorization
import akka.http.scaladsl.model.headers.BasicHttpCredentials
import akka.http.scaladsl.unmarshalling.Unmarshal
import akka.stream.OverflowStrategy
import akka.stream.QueueOfferResult
import akka.stream.scaladsl.Flow
import akka.stream.scaladsl.Keep
Expand Down Expand Up @@ -73,7 +72,7 @@ case class SplunkLogStoreConfig(host: String,
disableSNI: Boolean)
case class SplunkResponse(results: Vector[JsObject])
object SplunkResponseJsonProtocol extends DefaultJsonProtocol {
implicit val orderFormat = jsonFormat1(SplunkResponse)
implicit val orderFormat: RootJsonFormat[SplunkResponse] = jsonFormat1(SplunkResponse)
}

/**
Expand Down Expand Up @@ -180,7 +179,7 @@ class SplunkLogStore(
//based on http:https://doc.akka.io/docs/akka-http/10.0.6/scala/http/client-side/host-level.html
val queue =
Source
.queue[(HttpRequest, Promise[HttpResponse])](maxPendingRequests, OverflowStrategy.dropNew)
.queue[(HttpRequest, Promise[HttpResponse])](maxPendingRequests)
.via(httpFlow.getOrElse(defaultHttpFlow))
.toMat(Sink.foreach({
case ((Success(resp), p)) => p.success(resp)
Expand All @@ -190,7 +189,7 @@ class SplunkLogStore(

def queueRequest(request: HttpRequest): Future[HttpResponse] = {
val responsePromise = Promise[HttpResponse]()
queue.offer(request -> responsePromise).flatMap {
queue.offer(request -> responsePromise) match {
case QueueOfferResult.Enqueued => responsePromise.future
case QueueOfferResult.Dropped =>
Future.failed(new RuntimeException("Splunk API Client Queue overflowed. Try again later."))
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ package org.apache.openwhisk.core.database
import akka.Done
import akka.actor.ActorSystem

import scala.annotation.nowarn
import scala.collection.immutable.Queue
import scala.concurrent.{ExecutionContext, Future, Promise}
import scala.util.{Failure, Success}
Expand Down Expand Up @@ -59,6 +60,7 @@ class Batcher[T, R](batchSize: Int, concurrency: Int, retry: Int)(operation: (Se
CompletionStrategy.immediately
}

@nowarn("msg=deprecated")
private val stream = Source
.actorRef[(T, Promise[R])](
completionMatcher = cm,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -84,7 +84,7 @@ private[database] object StoreUtils {

def combinedSink[T](dest: Sink[ByteString, Future[T]])(
implicit ec: ExecutionContext): Sink[ByteString, Future[AttachmentUploadResult[T]]] = {
Sink.fromGraph(GraphDSL.create(digestSink(), lengthSink(), dest)(combineResult) {
Sink.fromGraph(GraphDSL.createGraph(digestSink(), lengthSink(), dest)(combineResult) {
implicit builder => (dgs, ls, dests) =>
import GraphDSL.Implicits._

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@ import akka.http.scaladsl.model.{ContentType, HttpRequest, HttpResponse, Uri}
import akka.http.scaladsl.unmarshalling.Unmarshal
import akka.stream.alpakka.s3.headers.CannedAcl
import akka.stream.alpakka.s3.scaladsl.S3
import akka.stream.alpakka.s3.{S3Attributes, S3Exception, S3Headers, S3Settings}
import akka.stream.alpakka.s3.{S3Attributes, S3Headers, S3Settings}
import akka.stream.scaladsl.{Sink, Source}
import akka.util.ByteString
import com.typesafe.config.Config
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,50 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http:https://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

/*
* Copyright (C) 2016-2019 Lightbend Inc. <http:https://www.lightbend.com>
*/

package org.apache.openwhisk.core.database.s3

import scala.util.Try
import scala.xml.{Elem, XML}

/**
* Exception thrown by S3 operations.
*
* Copied from https://github.com/akka/alpakka/blob/v1.0.2/s3/src/main/scala/akka/stream/alpakka/s3/S3Exception.scala
*/
private[s3] class S3Exception(val code: String, val message: String, val requestId: String, val hostId: String)
extends RuntimeException(message) {

def this(xmlResponse: Elem) =
this(
(xmlResponse \ "Code").text,
(xmlResponse \ "Message").text,
(xmlResponse \ "RequestID").text,
(xmlResponse \ "HostID").text)

def this(response: String) =
this(
Try(XML.loadString(response)).getOrElse(
<Error><Code>-</Code><Message>{response}</Message><RequestID>-</RequestID><HostID>-</HostID></Error>))

override def toString: String =
s"${super.toString} (Code: $code, RequestID: $requestId, HostID: $hostId)"

}
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@ package org.apache.openwhisk.core.containerpool.v2
import java.net.InetSocketAddress
import java.time.Instant
import akka.actor.Status.{Failure => FailureMessage}
import akka.actor.{actorRef2Scala, ActorRef, ActorRefFactory, ActorSystem, FSM, Props, Stash}
import akka.actor.{ActorRef, ActorRefFactory, ActorSystem, FSM, Props, Stash}
import akka.event.Logging.InfoLevel
import akka.io.{IO, Tcp}
import akka.pattern.pipe
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -130,7 +130,7 @@ object Invoker {
}

// load values for the required properties from the environment
implicit val config = new WhiskConfig(requiredProperties, optionalProperties)
implicit val config: WhiskConfig = new WhiskConfig(requiredProperties, optionalProperties)

def abort(message: String) = {
logger.error(this, message)(TransactionId.invoker)
Expand Down
8 changes: 4 additions & 4 deletions settings.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -99,10 +99,10 @@ gradle.ext.scalafmt = [
config: new File(rootProject.projectDir, '.scalafmt.conf')
]

gradle.ext.akka = [version : '2.6.12']
gradle.ext.akka_kafka = [version : '2.0.5']
gradle.ext.akka_http = [version : '10.2.4']
gradle.ext.akka_management = [version : '1.0.5']
gradle.ext.akka = [version : '2.6.21']
gradle.ext.akka_kafka = [version : '2.1.1']
gradle.ext.akka_http = [version : '10.2.10']
gradle.ext.akka_management = [version : '1.1.4']

gradle.ext.curator = [version : '4.3.0']
gradle.ext.kube_client = [version: '4.10.3']
Expand Down
Loading