JCascalog/Pail shredding stage works locally, but not in Hadoop
Following the "Big Data" Lambda Architecture book, I've got an incoming directory full of typed Thrift data objects, with a DataPailStructure defined in the pail.meta file.
I take a snapshot of the data:

    Pail snapshotPail = newDataPail.snapshot(PailFactory.snapshot);
The incoming files and metadata files are duplicated, and the pail.meta file has:

    structure: DataPailStructure
Now I want to shred this data and split it into vertical partitions. As per the book, I create two PailTap objects: one for the snapshot, and one with a SplitDataPailStructure for a new /shredded folder.
    PailTap source = dataTap(PailFactory.snapshot);
    PailTap sink = splitDataTap(PailFactory.shredded);
The /shredded folder has a pail.meta file with:

    structure: SplitDataPailStructure
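For context, "vertical partitioning" here means each record gets routed into a subdirectory path derived from its Thrift union field ids. The class and method names below (VerticalPartitionSketch, targetDir) are hypothetical; this is just a minimal plain-Java sketch of the routing idea, not the actual SplitDataPailStructure:

```java
import java.util.Arrays;
import java.util.List;

public class VerticalPartitionSketch {
    // Hypothetical stand-in for a SplitDataPailStructure-style getTarget():
    // map a record's Thrift union/property field ids onto a nested directory
    // path, so all records of one type land together, e.g. ids (1, 1) -> "1/1".
    static List<String> targetDir(int unionFieldId, int propertyFieldId) {
        return Arrays.asList(Integer.toString(unionFieldId),
                             Integer.toString(propertyFieldId));
    }

    public static void main(String[] args) {
        System.out.println(String.join("/", targetDir(1, 1))); // prints "1/1"
    }
}
```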
Following the book's instructions, I execute a JCascalog query to force a reducer:

    Api.execute(sink, new Subquery("?data").predicate(reduced, "_", "?data"));
Now, in local mode, this works fine. A "temporary" subfolder is created under /shredded, and the data is vertically partitioned with the expected "1/1" structure. In local mode this then gets moved up into the /shredded folder, and I can consolidate and merge to master without problems.
But running inside Hadoop, it fails at this point with the error:

    cascading.tuple.TupleException: unable to sink into output identifier: /tmp/swa/shredded
    ...
    Caused by: java.lang.IllegalArgumentException: 1/1/part-000000 is not valid with the pail structure {structure=com.hibu.pail.SplitDataPailStructure, args={}, format=SequenceFile} --> [1, _temporary, attempt_1393854491571_12900_r_000000_1, 1, 1]
        at com.backtype.hadoop.pail.Pail.checkValidStructure(Pail.java:563)
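My reading of that trace: the validation walks every path component under the pail root and checks it against the structure, and on Hadoop the job's in-flight _temporary/attempt_* staging directories appear in the middle of the 1/1 partition path, so the walk sees [1, _temporary, attempt_..., 1, 1] instead of [1, 1]. Below is a minimal, self-contained sketch of why such a walk rejects the file; the names (PailCheckSketch, isValidComponent) and the digits-only rule are my assumptions, not the real Pail code:

```java
import java.util.Arrays;
import java.util.List;

public class PailCheckSketch {
    // Assumed SplitDataPailStructure-style rule: every directory component
    // in a partition path must be a numeric field id like "1".
    static boolean isValidComponent(String dir) {
        return !dir.isEmpty() && dir.chars().allMatch(Character::isDigit);
    }

    // Mimics a checkValidStructure-style walk: all components must validate.
    static boolean checkValidStructure(List<String> components) {
        return components.stream().allMatch(PailCheckSketch::isValidComponent);
    }

    public static void main(String[] args) {
        // Local mode: the finished output path validates fine.
        System.out.println(checkValidStructure(Arrays.asList("1", "1"))); // true
        // On Hadoop: the in-flight path still contains staging directories.
        System.out.println(checkValidStructure(Arrays.asList(
            "1", "_temporary", "attempt_1393854491571_12900_r_000000_1",
            "1", "1"))); // false
    }
}
```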
Needless to say, if I change the shredded sink's structure type to DataPailStructure then it works fine, but that's a pointless operation, since it just reproduces what's already in the incoming folder. That's okay for now, as I'm only working with one data type, but that's going to change and then I'll need the partitioning.
Any ideas? I didn't want to post all my source code here initially, in case I'm just missing something obvious.