Skip to content

Commit

Permalink
[runtime] Fix TaskManager's BLOB service host lookup when connecting …
Browse files Browse the repository at this point in the history
…to the JobManager
  • Loading branch information
StephanEwen committed Apr 7, 2015
1 parent 09bd1f8 commit 4b89855
Show file tree
Hide file tree
Showing 2 changed files with 13 additions and 4 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -608,6 +608,16 @@ extends Actor with ActorLogMessages with ActorLogging {
id: InstanceID,
blobPort: Int): Unit = {

if (jobManager == null) {
throw new NullPointerException("jobManager may not be null")
}
if (id == null) {
throw new NullPointerException("instance ID may not be null")
}
if (blobPort <= 0 || blobPort > 65535) {
throw new IllegalArgumentException("blob port is out of range: " + blobPort)
}

// sanity check that we are not currently registered with a different JobManager
if (isConnected) {
if (currentJobManager.get == jobManager) {
Expand Down Expand Up @@ -644,9 +654,8 @@ extends Actor with ActorLogMessages with ActorLogging {

// start a blob service, if a blob server is specified
if (blobPort > 0) {
val address = new InetSocketAddress(
currentJobManager.flatMap(_.path.address.host).getOrElse("localhost"),
blobPort)
val jmHost = jobManager.path.address.host.getOrElse("localhost")
val address = new InetSocketAddress(jmHost, blobPort)

LOG.info("Determined BLOB server address to be {}. Starting BLOB cache.", address)

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -566,7 +566,7 @@ public void onReceive(Object message) throws Exception {
if (message instanceof RegistrationMessages.RegisterTaskManager) {
final InstanceID iid = new InstanceID();
final ActorRef self = getSelf();
getSender().tell(new RegistrationMessages.AcknowledgeRegistration(self, iid, -1), self);
getSender().tell(new RegistrationMessages.AcknowledgeRegistration(self, iid, 12345), self);
}
else if(message instanceof TaskMessages.UpdateTaskExecutionState){
getSender().tell(true, getSelf());
Expand Down

0 comments on commit 4b89855

Please sign in to comment.