Scala - Elasticsearch connector works in IDE but not on local cluster
I am trying to write a Twitter stream to an Elasticsearch 2.3 index using the provided elasticsearch2 connector.
Running the job in IntelliJ works fine, but when I run the jar on a local cluster I get the following error:
05/09/2016 13:26:58  Job execution switched to status RUNNING.
05/09/2016 13:26:58  Source: Custom Source -> (Sink: Unnamed, Sink: Unnamed, Sink: Unnamed)(1/1) switched to SCHEDULED
05/09/2016 13:26:58  Source: Custom Source -> (Sink: Unnamed, Sink: Unnamed, Sink: Unnamed)(1/1) switched to DEPLOYING
05/09/2016 13:26:58  Source: Custom Source -> (Sink: Unnamed, Sink: Unnamed, Sink: Unnamed)(1/1) switched to RUNNING
05/09/2016 13:26:59  Source: Custom Source -> (Sink: Unnamed, Sink: Unnamed, Sink: Unnamed)(1/1) switched to FAILED
java.lang.RuntimeException: Client is not connected to any Elasticsearch nodes!
    at org.apache.flink.streaming.connectors.elasticsearch2.ElasticsearchSink.open(ElasticsearchSink.java:172)
    at org.apache.flink.api.common.functions.util.FunctionUtils.openFunction(FunctionUtils.java:38)
    at org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator.open(AbstractUdfStreamOperator.java:91)
    at org.apache.flink.streaming.runtime.tasks.StreamTask.openAllOperators(StreamTask.java:317)
    at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:215)
    at org.apache.flink.runtime.taskmanager.Task.run(Task.java:579)
    at java.lang.Thread.run(Thread.java:745)

05/09/2016 13:26:59  Job execution switched to status FAILING.
java.lang.RuntimeException: Client is not connected to any Elasticsearch nodes!
    at org.apache.flink.streaming.connectors.elasticsearch2.ElasticsearchSink.open(ElasticsearchSink.java:172)
    at org.apache.flink.api.common.functions.util.FunctionUtils.openFunction(FunctionUtils.java:38)
    at org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator.open(AbstractUdfStreamOperator.java:91)
    at org.apache.flink.streaming.runtime.tasks.StreamTask.openAllOperators(StreamTask.java:317)
    at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:215)
    at org.apache.flink.runtime.taskmanager.Task.run(Task.java:579)
    at java.lang.Thread.run(Thread.java:745)

05/09/2016 13:26:59  Job execution switched to status FAILED.

------------------------------------------------------------
The program finished with the following exception:

org.apache.flink.client.program.ProgramInvocationException: The program execution failed: Job execution failed.
    at org.apache.flink.client.program.Client.runBlocking(Client.java:381)
    at org.apache.flink.client.program.Client.runBlocking(Client.java:355)
    at org.apache.flink.streaming.api.environment.StreamContextEnvironment.execute(StreamContextEnvironment.java:65)
    at org.apache.flink.streaming.api.scala.StreamExecutionEnvironment.execute(StreamExecutionEnvironment.scala:541)
    at com.pl.greeny.flink.TwitterAnalysis$.main(TwitterAnalysis.scala:69)
    at com.pl.greeny.flink.TwitterAnalysis.main(TwitterAnalysis.scala)
    at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
    at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
    at java.lang.reflect.Method.invoke(Method.java:498)
    at org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:505)
    at org.apache.flink.client.program.PackagedProgram.invokeInteractiveModeForExecution(PackagedProgram.java:403)
    at org.apache.flink.client.program.Client.runBlocking(Client.java:248)
    at org.apache.flink.client.CliFrontend.executeProgramBlocking(CliFrontend.java:860)
    at org.apache.flink.client.CliFrontend.run(CliFrontend.java:327)
    at org.apache.flink.client.CliFrontend.parseParameters(CliFrontend.java:1187)
    at org.apache.flink.client.CliFrontend.main(CliFrontend.java:1238)
Caused by: org.apache.flink.runtime.client.JobExecutionException: Job execution failed.
    at org.apache.flink.runtime.jobmanager.JobManager$$anonfun$handleMessage$1$$anonfun$applyOrElse$7.apply$mcV$sp(JobManager.scala:807)
    at org.apache.flink.runtime.jobmanager.JobManager$$anonfun$handleMessage$1$$anonfun$applyOrElse$7.apply(JobManager.scala:753)
    at org.apache.flink.runtime.jobmanager.JobManager$$anonfun$handleMessage$1$$anonfun$applyOrElse$7.apply(JobManager.scala:753)
    at scala.concurrent.impl.Future$PromiseCompletingRunnable.liftedTree1$1(Future.scala:24)
    at scala.concurrent.impl.Future$PromiseCompletingRunnable.run(Future.scala:24)
    at akka.dispatch.TaskInvocation.run(AbstractDispatcher.scala:41)
    at akka.dispatch.ForkJoinExecutorConfigurator$AkkaForkJoinTask.exec(AbstractDispatcher.scala:401)
    at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
    at scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
    at scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
    at scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)
Caused by: java.lang.RuntimeException: Client is not connected to any Elasticsearch nodes!
    at org.apache.flink.streaming.connectors.elasticsearch2.ElasticsearchSink.open(ElasticsearchSink.java:172)
    at org.apache.flink.api.common.functions.util.FunctionUtils.openFunction(FunctionUtils.java:38)
    at org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator.open(AbstractUdfStreamOperator.java:91)
    at org.apache.flink.streaming.runtime.tasks.StreamTask.openAllOperators(StreamTask.java:317)
    at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:215)
    at org.apache.flink.runtime.taskmanager.Task.run(Task.java:579)
    at java.lang.Thread.run(Thread.java:745)
My code in Scala:
val config = new java.util.HashMap[String, String]()
config.put("bulk.flush.max.actions", "1")
config.put("cluster.name", "elasticsearch")
config.put("node.name", "node-1")
config.put("path.home", "/media/user/e5e05ab5-28f3-4cee-a57c-444e32b99f04/thesis/elasticsearch-2.3.2/bin")

val transports = new util.ArrayList[InetSocketAddress]()
transports.add(new InetSocketAddress(InetAddress.getLocalHost(), 9300))
transports.add(new InetSocketAddress(InetAddress.getLoopbackAddress(), 9300))
transports.add(new InetSocketAddress(InetAddress.getByName("127.0.0.1"), 9300))
transports.add(new InetSocketAddress(InetAddress.getByName("localhost"), 9300))

stream.addSink(new ElasticsearchSink(config, transports, new ElasticsearchSinkTwitter()))
What is the difference between running the program in the IDE and on a local cluster?
Problems like this are usually caused by the different ways dependencies are managed and included: IDEs (IntelliJ, Eclipse) put each dependency on the classpath separately, while Flink's job submission uses a single fat JAR.
I had the same problem the other day, and the TaskManager log file revealed the following root cause:
java.lang.IllegalArgumentException: An SPI class of type org.apache.lucene.codecs.PostingsFormat with name 'Lucene50' does not exist. You need to add the corresponding JAR file supporting this SPI to your classpath. The current classpath supports the following names: [es090, completion090, XBloomFilter]
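The name 'Lucene50' above is resolved through Java's SPI mechanism, which reads plain-text registration files under META-INF/services on the classpath; when several dependencies are merged into one fat JAR, one dependency's copy of such a file can overwrite another's, making implementations "disappear". As a sketch of how to check what a JAR actually registers (the jar name and the registration entry below are illustrative, not taken from this job):

```scala
import java.io.{File, FileOutputStream}
import java.util.zip.{ZipEntry, ZipFile, ZipOutputStream}

// SPI registrations are ordinary text files under META-INF/services inside a jar.
// Build a tiny demo jar containing one illustrative registration entry...
val jarFile = new File("demo.jar")
val out = new ZipOutputStream(new FileOutputStream(jarFile))
out.putNextEntry(new ZipEntry("META-INF/services/org.apache.lucene.codecs.PostingsFormat"))
out.write("org.apache.lucene.codecs.lucene50.Lucene50PostingsFormat\n".getBytes("UTF-8"))
out.closeEntry()
out.close()

// ...then inspect it the same way you would inspect a Flink fat jar:
// the listed class names are exactly the implementations the SPI lookup can find.
val zip = new ZipFile(jarFile)
val entry = zip.getEntry("META-INF/services/org.apache.lucene.codecs.PostingsFormat")
val registered = scala.io.Source.fromInputStream(zip.getInputStream(entry)).mkString.trim
println(s"Registered PostingsFormat implementations: $registered")
zip.close()
```

If your fat JAR's META-INF/services/org.apache.lucene.codecs.PostingsFormat file lacks the Lucene50 entry, you have found the problem.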
Searching for this error, I found the answer in an already solved issue:
https://stackoverflow.com/a/38354027/3609571
The fix was to add the following dependency to the pom.xml:
<dependency>
    <groupId>org.apache.lucene</groupId>
    <artifactId>lucene-core</artifactId>
    <version>5.4.1</version>
</dependency>
Note that the order of dependencies matters in this case: it only worked when I put the lucene-core dependency at the top; adding it at the end did not work for me. This is more of a hack than a proper fix.
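A more robust fix than relying on dependency order is to make the fat JAR merge, rather than overwrite, the META-INF/services files of all dependencies. If the JAR is built with the maven-shade-plugin, its ServicesResourceTransformer does exactly that; a sketch of the relevant build section (the plugin version shown and the rest of your POM are assumptions):

<plugin>
    <groupId>org.apache.maven.plugins</groupId>
    <artifactId>maven-shade-plugin</artifactId>
    <version>2.4.3</version>
    <executions>
        <execution>
            <phase>package</phase>
            <goals>
                <goal>shade</goal>
            </goals>
            <configuration>
                <transformers>
                    <!-- Concatenate META-INF/services files from all dependencies
                         instead of letting one copy overwrite another -->
                    <transformer implementation="org.apache.maven.plugins.shade.resource.ServicesResourceTransformer"/>
                </transformers>
            </configuration>
        </execution>
    </executions>
</plugin>

With the service files merged, both the Elasticsearch and the Lucene PostingsFormat implementations stay visible to the SPI lookup, and the dependency order in the POM no longer matters for this particular error.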