
"TypeError: a bytes-like object is required, not 'str'" with HDFS/Hadoop #2402

Closed
Meffi42 opened this issue Apr 14, 2018 · 2 comments


Meffi42 commented Apr 14, 2018

I have seen several issues mentioning the error message "TypeError: a bytes-like object is required, not 'str'", but I could not work out from them how to resolve my problem.

For me, the error message occurs when reading and writing HDFS targets:

ERROR: [pid 855] Worker Worker(salt=806869798, workers=1, host=quickstart.cloudera, username=cloudera, pid=855) failed    ATask(tasklist=, inputfile=Person.json, is_local=False)
Traceback (most recent call last):
  File "/usr/local/lib/python3.6/site-packages/luigi/worker.py", line 194, in run
    new_deps = self._run_get_new_deps()
  File "/usr/local/lib/python3.6/site-packages/luigi/worker.py", line 131, in _run_get_new_deps
    task_gen = self.task.run()
  File "/usr/local/lib/python3.6/site-packages/luigi/contrib/hadoop.py", line 740, in run
    self.job_runner().run_job(self)
  File "/usr/local/lib/python3.6/site-packages/luigi/contrib/hadoop.py", line 579, in run_job
    job.dump(self.tmp_dir)
  File "/usr/local/lib/python3.6/site-packages/luigi/contrib/hadoop.py", line 983, in dump
    d = d.replace(b'(c__main__', "(c" + module_name)
TypeError: a bytes-like object is required, not 'str'
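
The failing line in luigi/contrib/hadoop.py calls bytes.replace() with a str argument, which Python 3 rejects (Python 2 allowed mixing the two). Here is a standalone sketch of the problem and a bytes-safe variant (Dummy and mymodule are placeholders, and this is not necessarily the upstream fix):

import pickle

class Dummy:
    pass

d = pickle.dumps(Dummy())   # a bytes object
module_name = "mymodule"

# What hadoop.py's dump() does -- raises TypeError on Python 3,
# because bytes.replace() requires bytes arguments:
#   d.replace(b'(c__main__', "(c" + module_name)

# Bytes-safe variant (sketch):
d = d.replace(b'(c__main__', b'(c' + module_name.encode('ascii'))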

Consequently, one of my tasks fails:

===== Luigi Execution Summary =====

Scheduled 4 tasks of which:
* 1 present dependencies were encountered:
    - 1 InputFile(filename=Person.json, is_local=False)
* 1 failed:
    - 1 ATask(tasklist=, inputfile=Person.json, is_local=False)
* 2 were left pending, among these:
    * 2 had failed dependencies:
        - 2 ATask(tasklist=A, inputfile=Person.json, is_local=False) and ATask(tasklist=AA, inputfile=Person.json, is_local=False)

This progress looks :( because there were failed tasks

===== Luigi Execution Summary =====

My code parses a sequence of characters, e.g., "AAA", and dynamically chains three "ATask" MapReduce jobs (later, I want to be able to produce chains of different jobs).

It all works fine when I read and write local targets: the jobs get chained and exchange data between tasks via intermediate temporary files "tmp0.json", "tmp1.json", ...

But when reading and writing HDFS targets, I get the above error message.

This is my code:


import json

import luigi
import luigi.contrib.hadoop
import luigi.contrib.hdfs

'''
Read the original input as a local target or an HDFS target.
'''
class InputFile(luigi.ExternalTask):
    filename = luigi.Parameter()
    is_local = luigi.BoolParameter()

    def output(self):
        if self.is_local:
            return luigi.LocalTarget(self.filename)
        else:
            return luigi.contrib.hdfs.HdfsTarget(self.filename)

'''
The ATask takes input of the form "relationname \t json-encoded-dictionary",
and shrinks the dictionary by removing one key-value pair.

(This is just a dummy functionality, later I want to chain more interesting MapReduce jobs.)
'''
class ATask(luigi.contrib.hadoop.JobTask):
    tasklist = luigi.Parameter()
    inputfile = luigi.Parameter()
    is_local = luigi.BoolParameter()

    def requires(self):
        step = len(self.tasklist)

        if step == 0:
            return [InputFile(filename = self.inputfile, is_local = self.is_local)]

        else:
            first = self.tasklist[0]

            if first == 'A':
                return [ATask(self.tasklist[1:], inputfile=self.inputfile, is_local = self.is_local)]
            else:
                raise ValueError("Unknown task identifier.")

    def output(self):
        output_filename = "tmp" + str(len(self.tasklist)) + ".json"
        if self.is_local:
            return luigi.LocalTarget(output_filename)
        else:
            return luigi.contrib.hdfs.HdfsTarget(output_filename)


    def mapper(self, line):
        assert len(self.tasklist) >= 0

        # Input line: "relationname \t json-encoded-dictionary".
        relation_name, doc_str = line.split('\t')

        # Shrink the dictionary by removing one key-value pair.
        doc = json.loads(doc_str)
        if any(doc):
            doc.pop(list(doc.keys())[0])

        yield relation_name, json.dumps(doc)


if __name__ == '__main__':
    tasklist = "AAA" # process tasks from right to left, currently just A-tasks
    is_local = True # does not work for False

    '''
    Dynamically build a chain of ATasks, as many as there are in the tasklist.
    Later, we'll have other kinds of tasks as well...
    '''

    luigi.build([ ATask(tasklist = tasklist[1:], inputfile = 'Person.json', is_local=is_local) ], local_scheduler=True)

My textual input is of the form

Person  {"name": "Amy", "age": 16, "gender": "male"}
Person  {"name": "Cal", "age": 33, "gender": "male"}
Person  {"name": "Dan", "age": 13, "gender": "male"}
Meffi42 commented Apr 15, 2018

Smaller code example, same problem:

import luigi
import luigi.contrib.hadoop
import luigi.contrib.hdfs

class InputFile(luigi.ExternalTask):
    def output(self):
        return luigi.contrib.hdfs.HdfsTarget('Person.json')

class Test(luigi.contrib.hadoop.JobTask):
    def requires(self):
        return [InputFile()]

    def output(self):
        return luigi.contrib.hdfs.HdfsTarget("out")

    def mapper(self, line):
        yield "foo", "bar"

if __name__ == '__main__':
    luigi.build([ Test() ], local_scheduler=True)

Meffi42 commented Apr 19, 2018

It turns out this has nothing to do with HdfsTarget, but with my main function.

I can execute the task without problems when calling it from the command line as "PYTHONPATH=. luigi ".
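
That is consistent with the traceback: the crashing replace() in hadoop.py's dump() rewrites the pickled job's reference to __main__, so it presumably only runs when the task classes live in the directly executed script (as with the luigi.build call above). Loading the module through the luigi command-line tool avoids that code path. A hypothetical invocation, assuming the code is saved as tasks.py (module and task names are placeholders):

PYTHONPATH=. luigi --module tasks Test --local-scheduler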
