scala - Elasticsearch connector works in IDE but not on local cluster


I am trying to write a Twitter stream to an Elasticsearch 2.3 index using the provided elasticsearch2 connector.

Running the job from IntelliJ works fine, but when I run the JAR as a job on a local cluster I get the following error:

05/09/2016 13:26:58	Job execution switched to status RUNNING.
05/09/2016 13:26:58	Source: Custom Source -> (Sink: Unnamed, Sink: Unnamed, Sink: Unnamed)(1/1) switched to SCHEDULED
05/09/2016 13:26:58	Source: Custom Source -> (Sink: Unnamed, Sink: Unnamed, Sink: Unnamed)(1/1) switched to DEPLOYING
05/09/2016 13:26:58	Source: Custom Source -> (Sink: Unnamed, Sink: Unnamed, Sink: Unnamed)(1/1) switched to RUNNING
05/09/2016 13:26:59	Source: Custom Source -> (Sink: Unnamed, Sink: Unnamed, Sink: Unnamed)(1/1) switched to FAILED
java.lang.RuntimeException: Client is not connected to any Elasticsearch nodes!
	at org.apache.flink.streaming.connectors.elasticsearch2.ElasticsearchSink.open(ElasticsearchSink.java:172)
	at org.apache.flink.api.common.functions.util.FunctionUtils.openFunction(FunctionUtils.java:38)
	at org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator.open(AbstractUdfStreamOperator.java:91)
	at org.apache.flink.streaming.runtime.tasks.StreamTask.openAllOperators(StreamTask.java:317)
	at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:215)
	at org.apache.flink.runtime.taskmanager.Task.run(Task.java:579)
	at java.lang.Thread.run(Thread.java:745)

05/09/2016 13:26:59	Job execution switched to status FAILING.
java.lang.RuntimeException: Client is not connected to any Elasticsearch nodes!
	at org.apache.flink.streaming.connectors.elasticsearch2.ElasticsearchSink.open(ElasticsearchSink.java:172)
	at org.apache.flink.api.common.functions.util.FunctionUtils.openFunction(FunctionUtils.java:38)
	at org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator.open(AbstractUdfStreamOperator.java:91)
	at org.apache.flink.streaming.runtime.tasks.StreamTask.openAllOperators(StreamTask.java:317)
	at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:215)
	at org.apache.flink.runtime.taskmanager.Task.run(Task.java:579)
	at java.lang.Thread.run(Thread.java:745)
05/09/2016 13:26:59	Job execution switched to status FAILED.

------------------------------------------------------------
The program finished with the following exception:

org.apache.flink.client.program.ProgramInvocationException: The program execution failed: Job execution failed.
	at org.apache.flink.client.program.Client.runBlocking(Client.java:381)
	at org.apache.flink.client.program.Client.runBlocking(Client.java:355)
	at org.apache.flink.streaming.api.environment.StreamContextEnvironment.execute(StreamContextEnvironment.java:65)
	at org.apache.flink.streaming.api.scala.StreamExecutionEnvironment.execute(StreamExecutionEnvironment.scala:541)
	at com.pl.greeny.flink.TwitterAnalysis$.main(TwitterAnalysis.scala:69)
	at com.pl.greeny.flink.TwitterAnalysis.main(TwitterAnalysis.scala)
	at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
	at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
	at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
	at java.lang.reflect.Method.invoke(Method.java:498)
	at org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:505)
	at org.apache.flink.client.program.PackagedProgram.invokeInteractiveModeForExecution(PackagedProgram.java:403)
	at org.apache.flink.client.program.Client.runBlocking(Client.java:248)
	at org.apache.flink.client.CliFrontend.executeProgramBlocking(CliFrontend.java:860)
	at org.apache.flink.client.CliFrontend.run(CliFrontend.java:327)
	at org.apache.flink.client.CliFrontend.parseParameters(CliFrontend.java:1187)
	at org.apache.flink.client.CliFrontend.main(CliFrontend.java:1238)
Caused by: org.apache.flink.runtime.client.JobExecutionException: Job execution failed.
	at org.apache.flink.runtime.jobmanager.JobManager$$anonfun$handleMessage$1$$anonfun$applyOrElse$7.apply$mcV$sp(JobManager.scala:807)
	at org.apache.flink.runtime.jobmanager.JobManager$$anonfun$handleMessage$1$$anonfun$applyOrElse$7.apply(JobManager.scala:753)
	at org.apache.flink.runtime.jobmanager.JobManager$$anonfun$handleMessage$1$$anonfun$applyOrElse$7.apply(JobManager.scala:753)
	at scala.concurrent.impl.Future$PromiseCompletingRunnable.liftedTree1$1(Future.scala:24)
	at scala.concurrent.impl.Future$PromiseCompletingRunnable.run(Future.scala:24)
	at akka.dispatch.TaskInvocation.run(AbstractDispatcher.scala:41)
	at akka.dispatch.ForkJoinExecutorConfigurator$AkkaForkJoinTask.exec(AbstractDispatcher.scala:401)
	at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
	at scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
	at scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
	at scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)
Caused by: java.lang.RuntimeException: Client is not connected to any Elasticsearch nodes!
	at org.apache.flink.streaming.connectors.elasticsearch2.ElasticsearchSink.open(ElasticsearchSink.java:172)
	at org.apache.flink.api.common.functions.util.FunctionUtils.openFunction(FunctionUtils.java:38)
	at org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator.open(AbstractUdfStreamOperator.java:91)
	at org.apache.flink.streaming.runtime.tasks.StreamTask.openAllOperators(StreamTask.java:317)
	at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:215)
	at org.apache.flink.runtime.taskmanager.Task.run(Task.java:579)
	at java.lang.Thread.run(Thread.java:745)

My code in Scala:

val config = new java.util.HashMap[String, String]()
config.put("bulk.flush.max.actions", "1")
config.put("cluster.name", "elasticsearch")
config.put("node.name", "node-1")
config.put("path.home", "/media/user/e5e05ab5-28f3-4cee-a57c-444e32b99f04/thesis/elasticsearch-2.3.2/bin")

val transports = new util.ArrayList[InetSocketAddress]()
transports.add(new InetSocketAddress(InetAddress.getLocalHost(), 9300))
transports.add(new InetSocketAddress(InetAddress.getLoopbackAddress(), 9300))
transports.add(new InetSocketAddress(InetAddress.getByName("127.0.0.1"), 9300))
transports.add(new InetSocketAddress(InetAddress.getByName("localhost"), 9300))

stream.addSink(new ElasticsearchSink(config, transports, new ElasticsearchSinkTwitter()))

What is the difference between running the program from the IDE and on a local cluster?

Problems like this are usually caused by the different ways dependencies are managed and included: IDEs (IntelliJ, Eclipse) put each dependency JAR on the classpath separately, whereas Flink's job submission uses a single fat JAR.

I had the same problem the other day, and the TaskManager log file revealed the following root cause:

java.lang.IllegalArgumentException: An SPI class of type org.apache.lucene.codecs.PostingsFormat with name 'Lucene50' does not exist. You need to add the corresponding JAR file supporting this SPI to your classpath. The current classpath supports the following names: [es090, completion090, XBloomFilter]

Searching for this error, I found the answer in this solved issue:

https://stackoverflow.com/a/38354027/3609571

It was fixed by adding the following dependency to pom.xml:

<dependency>
    <groupId>org.apache.lucene</groupId>
    <artifactId>lucene-core</artifactId>
    <version>5.4.1</version>
</dependency>

Note that the order of dependencies matters in this case: it only worked when I put the lucene-core dependency at the top; adding it at the end did not work for me. So this is more of a "hack" than a proper fix.
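A less order-sensitive alternative (my suggestion, not part of the original answer): if the fat JAR is built with maven-shade-plugin, its `ServicesResourceTransformer` merges the `META-INF/services` provider files from all dependencies instead of letting one overwrite another, which is exactly what the Lucene SPI lookup needs. A sketch, with the plugin version as an example only:

```xml
<!-- Sketch: merge SPI service files from all dependencies when shading. -->
<plugin>
  <groupId>org.apache.maven.plugins</groupId>
  <artifactId>maven-shade-plugin</artifactId>
  <version>2.4.3</version>
  <executions>
    <execution>
      <phase>package</phase>
      <goals><goal>shade</goal></goals>
      <configuration>
        <transformers>
          <!-- Concatenates META-INF/services entries instead of overwriting. -->
          <transformer implementation="org.apache.maven.plugins.shade.resource.ServicesResourceTransformer"/>
        </transformers>
      </configuration>
    </execution>
  </executions>
</plugin>
```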

