
Error while running copy job in standalone mode #41

Open
akshatthakar opened this issue Jun 8, 2017 · 0 comments

akshatthakar commented Jun 8, 2017

I am running the streamx master branch against an Apache Kafka 0.10 broker and trying to copy Kafka topic messages to S3. I get the error below when starting the connector. I suspect the S3 filesystem bucket and key names are not being passed correctly to the underlying APIs. Please let me know if any Hadoop configuration property needs to be added (hdfs-site.xml is included below).

Command to start:
./connect-standalone.sh /app/kafka_2.11-0.10.2.1/config/connect-standalone.properties /app/streamx/streamx-0.1.0-SNAPSHOT-package/etc/streamx/connector1.properties

Connector properties (connector1.properties):
name=s3-sink
connector.class=com.qubole.streamx.s3.S3SinkConnector
format.class=com.qubole.streamx.SourceFormat
tasks.max=1
topics=test
topics.dir=test
logs.dir=logs
flush.size=3
s3.url=s3:https://platform.com/data/rawdata
hadoop.conf.dir=/app/streamx/streamx-0.1.0-SNAPSHOT-package/etc/streamx/hadoop-conf
partitioner.class=io.confluent.connect.hdfs.partitioner.DailyPartitioner
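For reference, Hadoop's legacy org.apache.hadoop.fs.s3.S3FileSystem (the implementation configured in hdfs-site.xml below) expects the target location as an s3:// URI of the form s3://<bucket>/<path>, and the s3.url value above does not follow that shape, which may be why the filesystem resolves the path as "/". A minimal sketch of how these two properties usually look, assuming a hypothetical bucket named my-bucket:

# my-bucket is a placeholder, not the actual bucket from this setup
s3.url=s3://my-bucket/data/rawdata
hadoop.conf.dir=/app/streamx/streamx-0.1.0-SNAPSHOT-package/etc/streamx/hadoop-conf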

Error:
Caused by: java.io.IOException: / doesn't exist
at org.apache.hadoop.fs.s3.Jets3tFileSystemStore.get(Jets3tFileSystemStore.java:170)
at org.apache.hadoop.fs.s3.Jets3tFileSystemStore.retrieveINode(Jets3tFileSystemStore.java:221)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57)
at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:606)
at org.apache.hadoop.io.retry.RetryInvocationHandler.invokeMethod(RetryInvocationHandler.java:191)
at org.apache.hadoop.io.retry.RetryInvocationHandler.invoke(RetryInvocationHandler.java:102)
at com.sun.proxy.$Proxy41.retrieveINode(Unknown Source)
at org.apache.hadoop.fs.s3.S3FileSystem.mkdir(S3FileSystem.java:165)
at org.apache.hadoop.fs.s3.S3FileSystem.mkdirs(S3FileSystem.java:154)
at org.apache.hadoop.fs.FileSystem.mkdirs(FileSystem.java:1877)
at com.qubole.streamx.s3.S3Storage.mkdirs(S3Storage.java:68)
at io.confluent.connect.hdfs.DataWriter.createDir(DataWriter.java:374)
at io.confluent.connect.hdfs.DataWriter.<init>(DataWriter.java:174)

Worker configuration (connect-standalone.properties):
bootstrap.servers=localhost:9092
key.converter=com.qubole.streamx.ByteArrayConverter
value.converter=com.qubole.streamx.ByteArrayConverter

key.converter.schemas.enable=true
value.converter.schemas.enable=true

internal.key.converter=org.apache.kafka.connect.json.JsonConverter
internal.value.converter=org.apache.kafka.connect.json.JsonConverter
internal.key.converter.schemas.enable=false
internal.value.converter.schemas.enable=false

offset.storage.file.filename=/kafkadata/kafka/connect.offsets

offset.flush.interval.ms=10000

hdfs-site.xml:
<configuration>
  <property>
    <name>fs.s3.impl</name>
    <value>org.apache.hadoop.fs.s3.S3FileSystem</value>
  </property>
  <property>
    <name>fs.s3.awsAccessKeyId</name>
    <value>secret</value>
  </property>
  <property>
    <name>fs.s3.awsSecretAccessKey</name>
    <value>secret</value>
  </property>
</configuration>
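One way to sanity-check the bucket and credentials independently of Kafka Connect is to list the target path with the Hadoop CLI against the same configuration directory, for example (the bucket name is a placeholder, not the real one from this setup):

# point the CLI at the same hadoop-conf directory the connector uses
HADOOP_CONF_DIR=/app/streamx/streamx-0.1.0-SNAPSHOT-package/etc/streamx/hadoop-conf hadoop fs -ls s3://my-bucket/data/rawdata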
