google cloud datastore put_multi couldn't insert data #3115

Closed
daiyueweng opened this issue Mar 7, 2017 · 12 comments

Labels
api: datastore Issues related to the Datastore API.

Comments

@daiyueweng

daiyueweng commented Mar 7, 2017

Hi, I am trying to insert 6000 rows/entities into Google Cloud Datastore, using the Datastore emulator as the local server on Linux Mint 18.1.

In the code, I created an insert function that inserts entities in batches using put_multi, with the batch size set to 50. I use Python multiprocessing to spawn processes that execute the function.

A slice function divides the workload based on how many CPU cores are available, e.g. with 3 cores the workload (6000 entities) is divided into 3 parts of 2000 entities each, and each part is inserted by a spawned process executing the insert function.

After the insertion finished, I checked the Cloud Datastore Admin console but could not find the kinds that should have been inserted.

I am wondering what the issue is here and how to solve it.

I am using Python 3.5.2.

The code snippet is as follows:

import multiprocessing

cores_to_use = 3  # how many CPU cores are available for dividing the workload

# a datastore client is passed in as the argument
inserter = FastInsertGCDatastore(client)

# entities is a list of datastore entities to be inserted
# the number of entities is 6000 here
input_size = len(entities)
slice_size = int(input_size / cores_to_use)
entity_blocks = []

iterator = iter(entities)
for i in range(cores_to_use):
    entity_blocks.append([])
    for j in range(slice_size):
        entity_blocks[i].append(next(iterator))

for block in entity_blocks:
    p = multiprocessing.Process(target=inserter.execute, args=(block,))
    p.start()


class FastInsertGCDatastore:
    """
    batch insert entities into gc datastore based on batch_size and number_of_entities
    """
    def __init__(self, client):
        """
        initialize with datastore client
        :param client: the datastore client
        """
        self.client = client

    def execute(self, entities):
        """
        batch insert entities
        :param entities: a list of datastore entities that need to be inserted
        """
        number_of_entities = len(entities)

        batch_size = 50

        batch_documents = [0] * batch_size
        rowct = 0  # entity count as index for accessing rows
        for index in range(number_of_entities):
            try:
                batch_documents[index % batch_size] = entities[rowct]
                rowct += 1
                if (index + 1) % batch_size == 0:
                    self.client.put_multi(batch_documents)
                index += 1  # keep index one past the last handled entity for the remainder check below
            except Exception as e:
                print('Unexpected error for index ', index, ' message reads', str(e))
                raise e

        # insert any remaining entities
        if not index % batch_size == 0:
            self.client.put_multi(batch_documents[:index % batch_size])
@daspecster
Contributor

@daiyueweng the library isn't thread-safe 😞. It uses httplib2 (though it is in the process of being removed, #1998). If you run this in a single process, does it work?

daspecster added the api: datastore label Mar 7, 2017
@dhermes
Contributor

dhermes commented Mar 7, 2017

@daiyueweng The Datastore Admin console will be eventually consistent, so you may not see your inserted entities right away.

As @daspecster mentioned, there are some issues with thread-safety, so sharing a Client across subprocesses may be problematic. Instead, your task should create a client that is local to the subprocess and then you should have no threading issues.
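For illustration, a minimal sketch of that pattern, with each subprocess constructing its own client (insert_block and insert_in_parallel are illustrative names, not part of the library):

import multiprocessing

from google.cloud import datastore


def insert_block(block):
    # Create the client inside the subprocess so that no connection
    # state is shared across process boundaries.
    client = datastore.Client()
    client.put_multi(block)  # each block must hold at most 500 entities


def insert_in_parallel(entity_blocks):
    processes = [multiprocessing.Process(target=insert_block, args=(block,))
                 for block in entity_blocks]
    for p in processes:
        p.start()
    for p in processes:
        p.join()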

@daiyueweng
Author

Hi, I changed the code to use a single process and set the batch size to 500 (the maximum). It took 74 seconds to insert 6946 entities, which is slow compared to the others. I am wondering whether there is a way to improve the efficiency of inserting into Datastore.

One other thing: when I use the Datastore Admin console to delete entities of a certain kind, one entity is always left undeleted. It is still visible in a Datastore query and I have to delete it manually. I am wondering how to resolve this issue.

@daiyueweng
Author

If I use Python multiprocessing, I get the following errors:

@contextlib.contextmanager
def _grpc_catch_rendezvous():
    """Re-map gRPC exceptions that happen in context.

    .. _code.proto: https://github.com/googleapis/googleapis/blob/\
                    master/google/rpc/code.proto

    Remaps gRPC exceptions to the classes defined in
    :mod:`~google.cloud.exceptions` (according to the description
    in `code.proto`_).
    """
    try:
        yield
    except exceptions.GrpcRendezvous as exc:
        error_code = exc.code()
        error_class = _GRPC_ERROR_MAPPING.get(error_code)
        if error_class is None:
            raise
        else:
>               raise error_class(exc.details())
E               google.cloud.exceptions.BadRequest: 400 cannot write more than 500 entities in a single call

../../.local/lib/python3.5/site-packages/google/cloud/datastore/_http.py:260: BadRequest

@dhermes
Contributor

dhermes commented Mar 7, 2017

@daiyueweng That error seems to indicate a bug in your calling code.
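For reference, the server rejects commits of more than 500 entities, so each put_multi call must receive at most 500 entities at a time. A minimal sketch that enforces the limit before calling the API (chunked and safe_put_multi are illustrative helpers, not library functions):

def chunked(seq, size=500):
    # Yield successive slices of at most `size` items from `seq`.
    for start in range(0, len(seq), size):
        yield seq[start:start + size]


def safe_put_multi(client, entities):
    # Datastore refuses commits with more than 500 entities, so split
    # the input into compliant chunks before handing it to put_multi.
    for chunk in chunked(entities):
        client.put_multi(chunk)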

@daiyueweng
Author

daiyueweng commented Mar 8, 2017

@dhermes I updated the issue with the calling code and got some errors with Python multiprocessing:

def multiprocess_insert(val_dicts):
    client = datastore.Client()
    # val_dicts is a list of dictionaries for updating the entities
    entities = [Entity(client.key(self.kind)) for i in range(len(val_dicts))]

    for entity, update_dict in zip(entities, val_dicts):
        entity.update(update_dict)

    cores_to_use = 3  # how many CPU cores are available for dividing the workload

    # a datastore client is passed in as the argument
    inserter = FastInsertGCDatastore(client)  # FastInsertGCDatastore class code is in my OP

    # entities is a list of datastore entities to be inserted
    # the number of entities is 6000 here
    input_size = len(entities)
    slice_size = int(input_size / cores_to_use)
    entity_blocks = []

    iterator = iter(entities)
    for i in range(cores_to_use):
        entity_blocks.append([])
        for j in range(slice_size):
            entity_blocks[i].append(next(iterator))

    for block in entity_blocks:
        p = multiprocessing.Process(target=inserter.execute, args=(block,))
        p.start()

Error

    Unexpected error for index  49  message reads 503 {"created":"@1488965334.282616744","description":"Secure read failed","file":"src/core/lib/security/transport/secure_endpoint.c","file_line":157,"grpc_status":14,"referenced_errors":[{"created":"@1488965334.282613183","description":"OS Error","errno":104,"file":"src/core/lib/iomgr/tcp_posix.c","file_line":229,"os_error":"Connection reset by peer","syscall":"recvmsg"}]}
    Unexpected error for index  49  message reads 503 {"created":"@1488965334.282625975","description":"Secure read failed","file":"src/core/lib/security/transport/secure_endpoint.c","file_line":157,"grpc_status":14,"referenced_errors":[{"created":"@1488965334.282624336","description":"EOF","file":"src/core/lib/iomgr/tcp_posix.c","file_line":235}]}
    Unexpected error for index  49  message reads 500 {"created":"@1488965334.282910547","description":"OS Error","errno":32,"file":"src/core/lib/iomgr/tcp_posix.c","file_line":340,"os_error":"Broken pipe","syscall":"sendmsg"}
    E0308 09:28:54.282614345    5842 ssl_transport_security.c:437] SSL_read returned 0 unexpectedly.
    E0308 09:28:54.282627602    5842 secure_endpoint.c:176]      Decryption error: TSI_INTERNAL_ERROR
    Process Process-2:
    Process Process-1:
    Process Process-3:
    Traceback (most recent call last):
    Traceback (most recent call last):
      File "/home/daiyue/.local/lib/python3.5/site-packages/google/cloud/datastore/_http.py", line 253, in _grpc_catch_rendezvous
    yield
      File "/home/daiyue/.local/lib/python3.5/site-packages/google/cloud/datastore/_http.py", line 253, in _grpc_catch_rendezvous
    yield
      File "/home/daiyue/.local/lib/python3.5/site-packages/google/cloud/datastore/_http.py", line 356, in commit
    return self._stub.Commit(request_pb)
      File "/home/daiyue/.local/lib/python3.5/site-packages/google/cloud/datastore/_http.py", line 356, in commit
    return self._stub.Commit(request_pb)
      File "/home/daiyue/.local/lib/python3.5/site-packages/grpc/_channel.py", line 481, in __call__
    return _end_unary_response_blocking(state, False, deadline)
      File "/home/daiyue/.local/lib/python3.5/site-packages/grpc/_channel.py", line 481, in __call__
    return _end_unary_response_blocking(state, False, deadline)
      File "/home/daiyue/.local/lib/python3.5/site-packages/grpc/_channel.py", line 432, in _end_unary_response_blocking
    raise _Rendezvous(state, None, None, deadline)
      File "/home/daiyue/.local/lib/python3.5/site-packages/grpc/_channel.py", line 432, in _end_unary_response_blocking
    raise _Rendezvous(state, None, None, deadline)
    grpc._channel._Rendezvous: <_Rendezvous of RPC that terminated with (StatusCode.UNAVAILABLE, {"created":"@1488965334.282616744","description":"Secure read failed","file":"src/core/lib/security/transport/secure_endpoint.c","file_line":157,"grpc_status":14,"referenced_errors":[{"created":"@1488965334.282613183","description":"OS Error","errno":104,"file":"src/core/lib/iomgr/tcp_posix.c","file_line":229,"os_error":"Connection reset by peer","syscall":"recvmsg"}]})>
    grpc._channel._Rendezvous: <_Rendezvous of RPC that terminated with (StatusCode.UNAVAILABLE, {"created":"@1488965334.282625975","description":"Secure read failed","file":"src/core/lib/security/transport/secure_endpoint.c","file_line":157,"grpc_status":14,"referenced_errors":[{"created":"@1488965334.282624336","description":"EOF","file":"src/core/lib/iomgr/tcp_posix.c","file_line":235}]})>

@dhermes
Contributor

dhermes commented Mar 8, 2017

@daiyueweng Your code still does not create a new client within each subprocess.

@daiyueweng
Author

@dhermes That was my mistake, I didn't put the correct code snippet in; I have updated my last post now. I am still getting those errors, even though I pass in a datastore.Client as the argument.

@dhermes
Contributor

dhermes commented Mar 8, 2017

@daiyueweng Your snippet still uses self.client in

    entities = [Entity(self.client.key(self.kind)) for i in range(len(val_dicts))]

That means the code won't run.

@dhermes
Contributor

dhermes commented Mar 8, 2017

@daiyueweng I'm saying inserter.execute needs to create an instance of Client, not the code that spawns each process.

@daiyueweng
Author

daiyueweng commented Mar 9, 2017

@dhermes I changed the code so that the client is now created within the execute method of the FastInsertGCDatastore class:

class FastInsertGCDatastore:
    """
    batch insert entities into gc datastore
    """
    def execute(self, entities):
        client = datastore.Client()

        number_of_entities = len(entities)

        batch_size = 50

        batch_documents = [0] * batch_size
        rowct = 0  # entity count as index for accessing rows
        for index in range(number_of_entities):
            try:
                batch_documents[index % batch_size] = entities[rowct]
                rowct += 1
                if (index + 1) % batch_size == 0:
                    # self.client.put_multi(batch_documents)
                    client.put_multi(batch_documents)
                index += 1
            except Exception as e:
                print('Unexpected error for index ', index, ' message reads', str(e))
                raise e

        # insert any remaining entities
        if not index % batch_size == 0:
            # self.client.put_multi(batch_documents[:index % batch_size])
            client.put_multi(batch_documents[:index % batch_size])

I tested the entire piece of code by inserting over 1 million entities, using the Datastore emulator as the local server. Memory was consumed very quickly (the machine had to swap), and these errors were generated:

Process Process-1:
Traceback (most recent call last):
  File "/home/daiyue/.local/lib/python3.5/site-packages/google/cloud/datastore/_http.py", line 253, in _grpc_catch_rendezvous
    yield
  File "/home/daiyue/.local/lib/python3.5/site-packages/google/cloud/datastore/_http.py", line 356, in commit
    return self._stub.Commit(request_pb)
  File "/home/daiyue/.local/lib/python3.5/site-packages/grpc/_channel.py", line 481, in __call__
    return _end_unary_response_blocking(state, False, deadline)
  File "/home/daiyue/.local/lib/python3.5/site-packages/grpc/_channel.py", line 432, in _end_unary_response_blocking
    raise _Rendezvous(state, None, None, deadline)
grpc._channel._Rendezvous: <_Rendezvous of RPC that terminated with (StatusCode.UNAVAILABLE, {"created":"@1489058568.826120836","description":"Secure read failed","file":"src/core/lib/security/transport/secure_endpoint.c","file_line":157,"grpc_status":14,"referenced_errors":[{"created":"@1489058568.217166049","description":"EOF","file":"src/core/lib/iomgr/tcp_posix.c","file_line":235}]})>

During handling of the above exception, another exception occurred:

Traceback (most recent call last):
  File "/usr/lib/python3.5/multiprocessing/process.py", line 249, in _bootstrap
    self.run()
  File "/usr/lib/python3.5/multiprocessing/process.py", line 93, in run
    self._target(*self._args, **self._kwargs)
  File "/home/daiyue/PycharmProjects/lumar_ingestion/ingestion_db/fast_insert_gc_datastore.py", line 63, in execute
    raise e
  File "/home/daiyue/PycharmProjects/lumar_ingestion/ingestion_db/fast_insert_gc_datastore.py", line 59, in execute
    client.put_multi(batch_documents)
  File "/home/daiyue/.local/lib/python3.5/site-packages/google/cloud/datastore/client.py", line 362, in put_multi
    current.commit()
  File "/home/daiyue/.local/lib/python3.5/site-packages/google/cloud/datastore/batch.py", line 265, in commit
    self._commit()
  File "/home/daiyue/.local/lib/python3.5/site-packages/google/cloud/datastore/batch.py", line 242, in _commit
    self.project, self._commit_request, self._id)
  File "/home/daiyue/.local/lib/python3.5/site-packages/google/cloud/datastore/_http.py", line 627, in commit
    response = self._datastore_api.commit(project, request)
  File "/home/daiyue/.local/lib/python3.5/site-packages/google/cloud/datastore/_http.py", line 356, in commit
    return self._stub.Commit(request_pb)
  File "/usr/lib/python3.5/contextlib.py", line 77, in __exit__
    self.gen.throw(type, value, traceback)
  File "/home/daiyue/.local/lib/python3.5/site-packages/google/cloud/datastore/_http.py", line 260, in _grpc_catch_rendezvous
    raise error_class(exc.details())
google.cloud.exceptions.ServiceUnavailable: 503 {"created":"@1489058568.826120836","description":"Secure read failed","file":"src/core/lib/security/transport/secure_endpoint.c","file_line":157,"grpc_status":14,"referenced_errors":[{"created":"@1489058568.217166049","description":"EOF","file":"src/core/lib/iomgr/tcp_posix.c","file_line":235}]}
F
self = <lumar_ingestion.ingestion_workflow_modules.insert_into_database_test.TestInsertIntoDatabase object at 0x7f72c174cb00>

    def test_sap_data_gc_datastore(self):
        # read db config file
        with open('linux_mint_config_datastore.json') as db_config_file:
            db_config = json.load(db_config_file)
    
        # create the input data packages from standard sap test rows
        data_set = create_test_data_objects(erp='sap', data_folder='copaco', stage_to_test='insert_into_database',
                                            use_cache=False, db_config=db_config)
    
        # init google datastore client
        gc_datastore_client = datastore.Client()
    
        # insert every table into gc datastore using InsertIntoDatabase module and db_handler
        for tab in data_set:
            tab.time.start_time = time.time()
            insert_into_db = InsertIntoDatabase(
                db_handler=DatabaseHandler(database=gc_datastore_client, table=tab['table_name']))
>           tab_output = insert_into_db.execute(tab)

ingestion_workflow_modules/insert_into_database_test.py:92: 
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ 
ingestion_workflow_modules/insert_into_database.py:72: in execute
    if data.table_name in self.db_handler.get_tables():
ingestion_db/db_handler.py:45: in get_tables
    return self.db_handler.get_tables()
ingestion_db/back_end_interfaces/google_db_handler.py:60: in get_tables
    kinds = [entity.key.id_or_name for entity in query.fetch()
ingestion_db/back_end_interfaces/google_db_handler.py:60: in <listcomp>
    kinds = [entity.key.id_or_name for entity in query.fetch()
../../.local/lib/python3.5/site-packages/google/cloud/iterator.py:210: in _items_iter
    for page in self._page_iter(increment=False):
../../.local/lib/python3.5/site-packages/google/cloud/iterator.py:239: in _page_iter
    page = self._next_page()
../../.local/lib/python3.5/site-packages/google/cloud/datastore/query.py:499: in _next_page
    transaction_id=transaction and transaction.id,
../../.local/lib/python3.5/site-packages/google/cloud/datastore/_http.py:573: in run_query
    response = self._datastore_api.run_query(project, request)
../../.local/lib/python3.5/site-packages/google/cloud/datastore/_http.py:321: in run_query
    return self._stub.RunQuery(request_pb)
/usr/lib/python3.5/contextlib.py:77: in __exit__
    self.gen.throw(type, value, traceback)
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ 

    @contextlib.contextmanager
    def _grpc_catch_rendezvous():
        """Re-map gRPC exceptions that happen in context.
    
        .. _code.proto: https://github.com/googleapis/googleapis/blob/\
                        master/google/rpc/code.proto
    
        Remaps gRPC exceptions to the classes defined in
        :mod:`~google.cloud.exceptions` (according to the description
        in `code.proto`_).
        """
        try:
            yield
        except exceptions.GrpcRendezvous as exc:
            error_code = exc.code()
            error_class = _GRPC_ERROR_MAPPING.get(error_code)
            if error_class is None:
                raise
            else:
>               raise error_class(exc.details())
E               google.cloud.exceptions.ServiceUnavailable: 503 {"created":"@1489058833.599921395","description":"Secure read failed","file":"src/core/lib/security/transport/secure_endpoint.c","file_line":157,"grpc_status":14,"referenced_errors":[{"created":"@1489058833.376186909","description":"EOF","file":"src/core/lib/iomgr/tcp_posix.c","file_line":235}]}

../../.local/lib/python3.5/site-packages/google/cloud/datastore/_http.py:260: ServiceUnavailable

The code seems to have partially worked, as I can see from the Cloud Datastore console that some kinds and entities were inserted.
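One way to keep memory bounded with inputs of this size would be to build and insert the entities in fixed-size slices instead of materialising the whole list up front. A sketch (stream_insert is an illustrative name; val_dict_iter is assumed to be any iterable of update dictionaries, and kind the target kind name):

import itertools

from google.cloud import datastore


def stream_insert(client, val_dict_iter, kind, batch_size=500):
    # Consume the input lazily, building at most batch_size entities
    # at a time, so memory stays bounded regardless of input size.
    while True:
        chunk = list(itertools.islice(val_dict_iter, batch_size))
        if not chunk:
            break
        entities = []
        for val_dict in chunk:
            entity = datastore.Entity(client.key(kind))
            entity.update(val_dict)
            entities.append(entity)
        client.put_multi(entities)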

@dhermes
Contributor

dhermes commented Mar 9, 2017

The failures may just be from the emulator not being able to handle a certain level of throughput? That is outside our purview. From here, it seems like a retry strategy (on errors) would help you. We are planning to add a retry feature but as of right now you'd need to implement your own.
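A simple retry wrapper along those lines might look like this sketch (retry_put_multi is an illustrative name and the backoff parameters are arbitrary):

import time

from google.cloud import exceptions


def retry_put_multi(client, entities, attempts=5, base_delay=1.0):
    # Retry transient 503s with exponential backoff: 1s, 2s, 4s, ...
    for attempt in range(attempts):
        try:
            client.put_multi(entities)
            return
        except exceptions.ServiceUnavailable:
            if attempt == attempts - 1:
                raise
            time.sleep(base_delay * 2 ** attempt)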
